Skip to main content

Trigger Recipes

Practical recipes for building event-driven agent systems with HatiData's semantic triggers.


Recipe 1: Multi-Tier Alert Pipeline

Route triggered events to different channels based on severity:

from hatidata import HatiDataClient

# Shared client used by every recipe below.
# Point api_key/host/port at your own HatiData deployment.
client = HatiDataClient(
    api_key="hd_live_your_api_key",
    host="localhost",
    port=5439,
)

# Low severity: just record the event in the audit table.
log_only_action = {"type": "write_event", "table": "_hatidata_trigger_events"}
client.triggers.register(
    name="info-anomaly",
    concept="minor anomaly, slight deviation, informational alert",
    threshold=0.72,
    action=log_only_action,
    cooldown_seconds=600,  # at most one event per 10 minutes
    metadata={"severity": "low"},
)

# Medium severity: hand the event to the monitoring agent for triage.
monitoring_notify = {"type": "agent_notify", "agent_id": "monitoring-agent"}
client.triggers.register(
    name="warning-anomaly",
    concept="significant anomaly, unexpected pattern, warning level issue",
    threshold=0.78,
    action=monitoring_notify,
    cooldown_seconds=300,  # five-minute quiet period between notifications
    metadata={"severity": "medium"},
)

# High severity: page on-call through the PagerDuty Events API.
pagerduty_webhook = {
    "type": "webhook",
    "url": "https://events.pagerduty.com/v2/enqueue",
    "secret": "whsec_pagerduty_secret",
    "headers": {"Content-Type": "application/json"},
}
client.triggers.register(
    name="critical-anomaly",
    concept="critical failure, data loss, security breach, system down",
    threshold=0.82,
    action=pagerduty_webhook,
    cooldown_seconds=60,  # short cooldown: critical pages should not be suppressed long
    metadata={"severity": "critical"},
)

Recipe 2: Content Moderation Triggers

Flag content that matches moderation categories before it reaches downstream agents:

# One trigger per moderation category: (trigger name, concept text, threshold).
moderation_categories = [
    ("mod-pii", "social security number, credit card number, personal identification, passport number", 0.80),
    ("mod-toxic", "hate speech, harassment, threats, violent content, discrimination", 0.82),
    ("mod-financial", "insider trading, market manipulation, unauthorized financial advice", 0.78),
]

# All categories share the same webhook endpoint; the category name travels
# in the trigger metadata so the handler can dispatch on it.
for trigger_name, concept_text, score_threshold in moderation_categories:
    client.triggers.register(
        name=trigger_name,
        concept=concept_text,
        threshold=score_threshold,
        action={
            "type": "webhook",
            "url": "https://your-app.com/webhooks/moderation",
            "secret": "whsec_moderation_secret",
        },
        cooldown_seconds=0,  # no cooldown -- flag every occurrence
        metadata={"category": trigger_name},
    )

Moderation Webhook Handler

from flask import Flask, request, jsonify
import hmac, hashlib

app = Flask(__name__)

# Must match the `secret` passed when registering the moderation triggers.
MODERATION_WEBHOOK_SECRET = "whsec_moderation_secret"

@app.route("/webhooks/moderation", methods=["POST"])
def moderation_webhook():
    """Handle moderation-trigger webhooks from HatiData.

    Verifies the request's HMAC-SHA256 signature against the shared
    secret before trusting the payload, then dispatches on the category
    attached to the trigger's metadata at registration time.
    """
    # Reject requests that were not signed with our shared secret.
    # NOTE(review): the signature header name is assumed here -- confirm
    # "X-HatiData-Signature" against HatiData's webhook documentation.
    provided = request.headers.get("X-HatiData-Signature", "")
    expected = hmac.new(
        MODERATION_WEBHOOK_SECRET.encode(),
        request.get_data(),  # raw body, before JSON parsing
        hashlib.sha256,
    ).hexdigest()
    if not hmac.compare_digest(provided, expected):
        return jsonify({"error": "invalid signature"}), 401

    payload = request.json
    category = payload["metadata"]["category"]

    if category == "mod-pii":
        # Quarantine the content and redact PII
        quarantine_content(payload["content_id"])
        notify_privacy_team(payload)
    elif category == "mod-toxic":
        flag_for_review(payload["content_id"], payload["agent_id"])
    elif category == "mod-financial":
        block_agent_output(payload["content_id"])
        notify_compliance_team(payload)

    return jsonify({"handled": True}), 200

Recipe 3: Competitive Intelligence Trigger

Detect when agents process content mentioning competitors:

# Competitive intelligence: route competitor mentions to the product team,
# rate-limited to one alert per topic per hour.
intel_webhook = {
    "type": "webhook",
    "url": "https://your-app.com/webhooks/competitive-intel",
    "secret": "whsec_intel_secret",
}
client.triggers.register(
    name="competitor-mention",
    concept="competitor product, alternative solution, switching provider, market comparison",
    threshold=0.75,
    action=intel_webhook,
    cooldown_seconds=3600,  # At most once per hour per concept
    metadata={"team": "product"},
)

Recipe 4: Trigger Testing Workflow

Systematically test trigger thresholds before deploying to production:

def test_trigger_threshold(trigger_name: str, test_cases: list[tuple[str, bool]]):
    """Run a trigger against labelled content and report pass/fail per case.

    Each test case is ``(content, expected_fire)``; a case passes when the
    trigger's would-fire decision matches the expectation. Prints a summary
    plus one row per case and returns the list of per-case result dicts.
    """
    outcomes = []
    for sample, should_fire in test_cases:
        verdict = client.triggers.test(name=trigger_name, content=sample)
        fired = verdict["would_fire"]
        outcomes.append({
            "content": sample[:60],  # truncate for readable report rows
            "expected": should_fire,
            "actual": fired,
            "score": verdict["similarity_score"],
            "status": "PASS" if fired == should_fire else "FAIL",
        })

    # Summary line, then one row per case with the raw similarity score.
    pass_count = sum(1 for row in outcomes if row["status"] == "PASS")
    print(f"\n{trigger_name}: {pass_count}/{len(outcomes)} tests passed")
    for row in outcomes:
        flag = " " if row["status"] == "PASS" else "X "
        print(f" {flag} [{row['score']:.3f}] fire={row['actual']:<5} {row['content']}")

    return outcomes

# Exercise the churn detector with labelled positives and negatives.
churn_cases = [
    # Should fire (positive cases)
    ("I want to cancel my subscription immediately", True),
    ("We are evaluating other providers and may switch", True),
    ("This product is not meeting our needs anymore", True),
    # Should NOT fire (negative cases)
    ("Thanks for the great support, everything works well", False),
    ("Can you help me set up a new integration?", False),
    ("What are the pricing options for upgrading?", False),
]
test_trigger_threshold("churn-risk-detector", churn_cases)

Recipe 5: Cooldown Strategies

Different use cases require different cooldown configurations:

| Use Case | Cooldown | Rationale |
| --- | --- | --- |
| PII detection | 0s | Flag every single occurrence |
| Churn risk alerts | 300s (5 min) | Avoid flooding CS team during an angry conversation |
| Competitive intel | 3600s (1 hr) | One alert per topic is enough |
| Opportunity detection | 600s (10 min) | Let sales prioritize before next alert |
| System health | 120s (2 min) | Balance alerting vs. noise |

Dynamic Cooldown Adjustment

def adjust_cooldown(trigger_name: str, firing_rate_per_hour: float):
    """Retune a trigger's cooldown from its observed firing rate.

    Leaves the trigger untouched when the rate is in the acceptable
    1-10 fires/hour band; otherwise pushes the new cooldown to the
    server and logs the change.
    """
    # Map the observed rate onto a target cooldown in seconds.
    if firing_rate_per_hour > 20:
        target = 1800  # very noisy: back off to 30 minutes
    elif firing_rate_per_hour > 10:
        target = 600   # moderately noisy: 10 minutes
    elif firing_rate_per_hour < 1:
        target = 60    # barely firing: tighten to 1 minute
    else:
        return  # current setting is fine

    client.triggers.update(name=trigger_name, cooldown_seconds=target)
    print(f"Adjusted {trigger_name} cooldown to {target}s "
          f"(firing rate: {firing_rate_per_hour:.1f}/hr)")

Recipe 6: Trigger Monitoring Dashboard

SQL queries for monitoring trigger health:

-- Trigger firing rate over the last 24 hours (hourly buckets)
-- Useful for spotting sudden spikes or newly silent triggers at a glance.
SELECT
trigger_name,
DATE_TRUNC('hour', fired_at) AS hour,
COUNT(*) AS fire_count,
AVG(similarity_score) AS avg_score
FROM _hatidata_trigger_events
WHERE fired_at > NOW() - INTERVAL '24 hours'
GROUP BY trigger_name, DATE_TRUNC('hour', fired_at)
ORDER BY hour DESC, fire_count DESC;

-- Triggers that have not fired in 7 days (may need threshold adjustment)
-- The LEFT JOIN keeps triggers with no events at all; those rows have
-- MAX(e.fired_at) IS NULL and are caught by the first HAVING branch.
SELECT t.name, t.threshold, t.cooldown_seconds, MAX(e.fired_at) AS last_fired
FROM _hatidata_triggers t
LEFT JOIN _hatidata_trigger_events e ON t.name = e.trigger_name
GROUP BY t.name, t.threshold, t.cooldown_seconds
HAVING MAX(e.fired_at) IS NULL
OR MAX(e.fired_at) < NOW() - INTERVAL '7 days';

-- Average similarity scores by trigger (threshold tuning data)
-- An avg_score far above the configured threshold suggests the threshold
-- can be raised; a min_score hugging the threshold indicates borderline fires.
SELECT
trigger_name,
COUNT(*) AS fires,
AVG(similarity_score) AS avg_score,
MIN(similarity_score) AS min_score,
MAX(similarity_score) AS max_score
FROM _hatidata_trigger_events
WHERE fired_at > NOW() - INTERVAL '30 days'
GROUP BY trigger_name
ORDER BY avg_score DESC;

Stay in the loop

Product updates, engineering deep-dives, and agent-native insights. No spam.