Trigger Recipes
Practical recipes for building event-driven agent systems with HatiData's semantic triggers.
Recipe 1: Multi-Tier Alert Pipeline
Route triggered events to different channels based on severity:
from hatidata import HatiDataClient
client = HatiDataClient(
host="localhost",
port=5439,
api_key="hd_live_your_api_key",
)
# Low severity: log only
client.triggers.register(
name="info-anomaly",
concept="minor anomaly, slight deviation, informational alert",
threshold=0.72,
action={"type": "write_event", "table": "_hatidata_trigger_events"},
cooldown_seconds=600,
metadata={"severity": "low"},
)
# Medium severity: notify the monitoring agent
client.triggers.register(
name="warning-anomaly",
concept="significant anomaly, unexpected pattern, warning level issue",
threshold=0.78,
action={"type": "agent_notify", "agent_id": "monitoring-agent"},
cooldown_seconds=300,
metadata={"severity": "medium"},
)
# High severity: webhook to PagerDuty
client.triggers.register(
name="critical-anomaly",
concept="critical failure, data loss, security breach, system down",
threshold=0.82,
action={
"type": "webhook",
"url": "https://events.pagerduty.com/v2/enqueue",
"secret": "whsec_pagerduty_secret",
"headers": {"Content-Type": "application/json"},
},
cooldown_seconds=60,
metadata={"severity": "critical"},
)
Recipe 2: Content Moderation Triggers
Flag content that matches moderation categories before it reaches downstream agents:
moderation_categories = [
("mod-pii", "social security number, credit card number, personal identification, passport number", 0.80),
("mod-toxic", "hate speech, harassment, threats, violent content, discrimination", 0.82),
("mod-financial", "insider trading, market manipulation, unauthorized financial advice", 0.78),
]
for name, concept, threshold in moderation_categories:
client.triggers.register(
name=name,
concept=concept,
threshold=threshold,
action={
"type": "webhook",
"url": "https://your-app.com/webhooks/moderation",
"secret": "whsec_moderation_secret",
},
cooldown_seconds=0, # No cooldown -- flag every occurrence
metadata={"category": name},
)
Moderation Webhook Handler
from flask import Flask, request, jsonify
import hmac, hashlib
app = Flask(__name__)
@app.route("/webhooks/moderation", methods=["POST"])
def moderation_webhook():
payload = request.json
category = payload["metadata"]["category"]
content = payload["content"]
if category == "mod-pii":
# Quarantine the content and redact PII
quarantine_content(payload["content_id"])
notify_privacy_team(payload)
elif category == "mod-toxic":
flag_for_review(payload["content_id"], payload["agent_id"])
elif category == "mod-financial":
block_agent_output(payload["content_id"])
notify_compliance_team(payload)
return jsonify({"handled": True}), 200
Recipe 3: Competitive Intelligence Trigger
Detect when agents process content mentioning competitors:
client.triggers.register(
name="competitor-mention",
concept="competitor product, alternative solution, switching provider, market comparison",
threshold=0.75,
action={
"type": "webhook",
"url": "https://your-app.com/webhooks/competitive-intel",
"secret": "whsec_intel_secret",
},
cooldown_seconds=3600, # At most once per hour per concept
metadata={"team": "product"},
)
Recipe 4: Trigger Testing Workflow
Systematically test trigger thresholds before deploying to production:
def test_trigger_threshold(trigger_name: str, test_cases: list[tuple[str, bool]]):
"""Test a trigger against known positive and negative cases."""
results = []
for content, expected_fire in test_cases:
result = client.triggers.test(name=trigger_name, content=content)
actual = result["would_fire"]
status = "PASS" if actual == expected_fire else "FAIL"
results.append({
"content": content[:60],
"expected": expected_fire,
"actual": actual,
"score": result["similarity_score"],
"status": status,
})
# Report
passed = sum(1 for r in results if r["status"] == "PASS")
print(f"\n{trigger_name}: {passed}/{len(results)} tests passed")
for r in results:
marker = " " if r["status"] == "PASS" else "X "
print(f" {marker} [{r['score']:.3f}] fire={r['actual']:<5} {r['content']}")
return results
# Test the churn risk detector
test_trigger_threshold("churn-risk-detector", [
# Should fire (positive cases)
("I want to cancel my subscription immediately", True),
("We are evaluating other providers and may switch", True),
("This product is not meeting our needs anymore", True),
# Should NOT fire (negative cases)
("Thanks for the great support, everything works well", False),
("Can you help me set up a new integration?", False),
("What are the pricing options for upgrading?", False),
])
Recipe 5: Cooldown Strategies
Different use cases require different cooldown configurations:
| Use Case | Cooldown | Rationale |
|---|---|---|
| PII detection | 0s | Flag every single occurrence |
| Churn risk alerts | 300s (5 min) | Avoid flooding CS team during an angry conversation |
| Competitive intel | 3600s (1 hr) | One alert per topic is enough |
| Opportunity detection | 600s (10 min) | Let sales prioritize before next alert |
| System health | 120s (2 min) | Balance alerting vs. noise |
Dynamic Cooldown Adjustment
def adjust_cooldown(trigger_name: str, firing_rate_per_hour: float):
"""Auto-adjust cooldown based on firing frequency."""
if firing_rate_per_hour > 20:
new_cooldown = 1800 # Too noisy, increase to 30 min
elif firing_rate_per_hour > 10:
new_cooldown = 600 # Moderate, 10 min
elif firing_rate_per_hour < 1:
new_cooldown = 60 # Under-firing, reduce to 1 min
else:
return # Current setting is fine
client.triggers.update(
name=trigger_name,
cooldown_seconds=new_cooldown,
)
print(f"Adjusted {trigger_name} cooldown to {new_cooldown}s "
f"(firing rate: {firing_rate_per_hour:.1f}/hr)")
Recipe 6: Trigger Monitoring Dashboard
SQL queries for monitoring trigger health:
-- Trigger firing rate over the last 24 hours (hourly buckets)
SELECT
trigger_name,
DATE_TRUNC('hour', fired_at) AS hour,
COUNT(*) AS fire_count,
AVG(similarity_score) AS avg_score
FROM _hatidata_trigger_events
WHERE fired_at > NOW() - INTERVAL '24 hours'
GROUP BY trigger_name, DATE_TRUNC('hour', fired_at)
ORDER BY hour DESC, fire_count DESC;
-- Triggers that have not fired in 7 days (may need threshold adjustment)
SELECT t.name, t.threshold, t.cooldown_seconds, MAX(e.fired_at) AS last_fired
FROM _hatidata_triggers t
LEFT JOIN _hatidata_trigger_events e ON t.name = e.trigger_name
GROUP BY t.name, t.threshold, t.cooldown_seconds
HAVING MAX(e.fired_at) IS NULL
OR MAX(e.fired_at) < NOW() - INTERVAL '7 days';
-- Average similarity scores by trigger (threshold tuning data)
SELECT
trigger_name,
COUNT(*) AS fires,
AVG(similarity_score) AS avg_score,
MIN(similarity_score) AS min_score,
MAX(similarity_score) AS max_score
FROM _hatidata_trigger_events
WHERE fired_at > NOW() - INTERVAL '30 days'
GROUP BY trigger_name
ORDER BY avg_score DESC;
Related Concepts
- Semantic Triggers -- Architecture and evaluation
- Semantic Trigger Pipeline -- Step-by-step tutorial
- Webhook Events -- Full webhook payload reference
- Embedding Pipeline -- How content is embedded
- MCP Tools Reference --
register_trigger,test_triggertools