Semantic Trigger Pipeline
In this tutorial you will build an event-driven AI pipeline using HatiData's semantic triggers. Triggers fire when stored content crosses a cosine similarity threshold against a registered concept, enabling automated responses to semantically meaningful events.
By the end you will have:
- Registered triggers that watch for specific semantic concepts
- Configured webhook delivery with HMAC-SHA256 signature verification
- Tested triggers with different content to verify firing behavior
- Written SQL queries that monitor trigger activity
Prerequisites
- Python 3.10+
- HatiData proxy running with an embedding provider configured
- hatidata SDK installed
pip install hatidata requests
export HATIDATA_API_KEY="hd_live_your_api_key"
export HATIDATA_HOST="localhost"
Step 1: Register Semantic Triggers
A semantic trigger watches for content that is semantically similar to a registered concept. When a new memory or event crosses the similarity threshold, the trigger fires.
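import os
from hatidata import HatiDataClient
# Read connection settings from the environment variables exported above
client = HatiDataClient(
    host=os.environ.get("HATIDATA_HOST", "localhost"),
    port=5439,
    api_key=os.environ["HATIDATA_API_KEY"],
)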
# Register a trigger that fires when content related to customer churn is detected
client.triggers.register(
    name="churn-risk-detector",
    concept="customer is unhappy, wants to cancel, considering alternatives, churn risk",
    threshold=0.78,
    action={
        "type": "webhook",
        "url": "https://your-app.com/webhooks/churn-alert",
        "secret": "whsec_your_webhook_secret",
    },
    cooldown_seconds=300,  # Prevent repeated firing within 5 minutes
    metadata={
        "team": "customer-success",
        "severity": "high",
    },
)

# Register a trigger for compliance-sensitive content
client.triggers.register(
    name="compliance-flag",
    concept="regulatory violation, data breach, unauthorized access, compliance failure",
    threshold=0.80,
    action={
        "type": "webhook",
        "url": "https://your-app.com/webhooks/compliance-alert",
        "secret": "whsec_compliance_secret",
    },
    cooldown_seconds=60,
    metadata={
        "team": "legal",
        "severity": "critical",
    },
)

# Register a trigger that notifies an agent instead of calling a webhook
client.triggers.register(
    name="opportunity-detector",
    concept="upsell opportunity, expansion revenue, customer wants more features",
    threshold=0.75,
    action={
        "type": "agent_notify",
        "agent_id": "sales-agent",
    },
    cooldown_seconds=600,
)

print("Triggers registered.")
Step 2: Understand Trigger Evaluation
Triggers are evaluated using a two-stage process:
- ANN Pre-filter: When new content is stored, its embedding is compared against all registered trigger concept embeddings using approximate nearest neighbor search in the vector index. This returns the top-K trigger candidates.
- Exact Cosine Verification: Each candidate's cosine similarity is computed exactly. If the score exceeds the trigger's threshold and the cooldown period has elapsed, the trigger fires.
New Content → Embed → ANN Pre-filter (top-K triggers) → Exact Cosine → Fire if > threshold
This two-stage approach scales to thousands of registered triggers with sub-50ms evaluation latency.
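The two-stage flow can be sketched in plain Python. This is a minimal illustration, not HatiData's implementation: the ANN pre-filter is stood in for by a brute-force top-K ranking, the embeddings are toy 3-dimensional vectors, and the names Trigger, cosine, and evaluate are hypothetical. Cooldown handling is left out to keep the sketch short.

```python
import math
from dataclasses import dataclass


@dataclass
class Trigger:
    name: str
    embedding: list[float]  # embedding of the trigger's concept text (toy values here)
    threshold: float


def cosine(a: list[float], b: list[float]) -> float:
    """Exact cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def evaluate(content_vec: list[float], triggers: list[Trigger], top_k: int = 2):
    # Stage 1 (stand-in for ANN): keep the top_k most similar triggers.
    # A real vector index would find these candidates approximately,
    # without scoring every registered trigger.
    candidates = sorted(
        triggers, key=lambda t: cosine(content_vec, t.embedding), reverse=True
    )[:top_k]
    # Stage 2: exact cosine check against each candidate's own threshold.
    fired = []
    for t in candidates:
        score = cosine(content_vec, t.embedding)
        if score > t.threshold:
            fired.append((t.name, score))
    return fired


# Hypothetical concept embeddings; real ones come from the embedding provider.
triggers = [
    Trigger("churn-risk-detector", [0.9, 0.1, 0.0], 0.78),
    Trigger("compliance-flag", [0.0, 1.0, 0.0], 0.80),
    Trigger("opportunity-detector", [0.1, 0.0, 0.9], 0.75),
]
fired = evaluate([0.8, 0.2, 0.1], triggers)
print(fired)
```

Only the churn trigger fires here: the compliance trigger survives the pre-filter as the second-closest candidate but fails its exact threshold check.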
Step 3: Configure Webhook Delivery
Webhook triggers deliver a POST request to the configured URL. Each request includes an HMAC-SHA256 signature for verification.
Webhook Payload
{
  "trigger_id": "trg_abc123",
  "trigger_name": "churn-risk-detector",
  "fired_at": "2025-12-15T10:30:00Z",
  "similarity_score": 0.84,
  "threshold": 0.78,
  "content": "Customer expressed frustration and mentioned they are evaluating competitors...",
  "content_id": "mem_xyz789",
  "agent_id": "support-agent",
  "metadata": {
    "team": "customer-success",
    "severity": "high"
  }
}
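A receiver may want to turn this payload into a typed object before acting on it. The sketch below assumes only the field names shown in the sample payload; the TriggerEvent class and parse_event helper are hypothetical names, not part of the HatiData SDK.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class TriggerEvent:
    trigger_id: str
    trigger_name: str
    fired_at: datetime
    similarity_score: float
    threshold: float
    content: str
    content_id: str
    agent_id: str
    metadata: dict = field(default_factory=dict)


def parse_event(body: bytes) -> TriggerEvent:
    """Parse a webhook body; raises KeyError if a required field is missing."""
    data = json.loads(body)
    return TriggerEvent(
        trigger_id=data["trigger_id"],
        trigger_name=data["trigger_name"],
        # Replace the "Z" suffix so fromisoformat also works before Python 3.11.
        fired_at=datetime.fromisoformat(data["fired_at"].replace("Z", "+00:00")),
        similarity_score=float(data["similarity_score"]),
        threshold=float(data["threshold"]),
        content=data["content"],
        content_id=data["content_id"],
        agent_id=data["agent_id"],
        metadata=data.get("metadata", {}),
    )


# The sample payload from this section, serialized as a request body.
sample = json.dumps({
    "trigger_id": "trg_abc123",
    "trigger_name": "churn-risk-detector",
    "fired_at": "2025-12-15T10:30:00Z",
    "similarity_score": 0.84,
    "threshold": 0.78,
    "content": "Customer expressed frustration and mentioned they are evaluating competitors...",
    "content_id": "mem_xyz789",
    "agent_id": "support-agent",
    "metadata": {"team": "customer-success", "severity": "high"},
}).encode()

event = parse_event(sample)
print(event.trigger_name, event.fired_at.isoformat(), event.metadata["severity"])
```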
Signature Verification
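The X-HatiData-Signature header carries an HMAC-SHA256 signature of the raw request body, hex-encoded and prefixed with sha256=. Recompute it with your webhook secret and compare before trusting the payload: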
import hashlib
import hmac

from flask import Flask, jsonify, request

app = Flask(__name__)
WEBHOOK_SECRET = "whsec_your_webhook_secret"


@app.route("/webhooks/churn-alert", methods=["POST"])
def churn_alert():
    # Recompute the signature over the raw request bytes
    signature = request.headers.get("X-HatiData-Signature", "")
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        request.data,
        hashlib.sha256,
    ).hexdigest()
    # compare_digest runs in constant time, which avoids timing leaks
    if not hmac.compare_digest(f"sha256={expected}", signature):
        return jsonify({"error": "Invalid signature"}), 401

    payload = request.json
    print(f"Churn risk detected! Score: {payload['similarity_score']:.2f}")
    print(f"Content: {payload['content'][:200]}")
    # Take action: notify customer success team, create ticket, etc.
    return jsonify({"received": True}), 200
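To exercise the handler before real triggers fire, you can sign a payload yourself. This sketch assumes the header value has the form sha256=<hex digest>, as implied by the comparison in the handler; sign_body and verify_body are hypothetical helper names.

```python
import hashlib
import hmac
import json


def sign_body(secret: str, body: bytes) -> str:
    """Build the header value the handler compares against (assumed format)."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def verify_body(secret: str, body: bytes, signature: str) -> bool:
    """Constant-time check that the signature matches the body."""
    return hmac.compare_digest(sign_body(secret, body), signature)


secret = "whsec_your_webhook_secret"
body = json.dumps(
    {"trigger_name": "churn-risk-detector", "similarity_score": 0.84}
).encode()
signature = sign_body(secret, body)

print(verify_body(secret, body, signature))         # True
print(verify_body(secret, body + b" ", signature))  # False: body was altered
```

To send the test request to a locally running handler, pass the same bytes as the body, for example with requests.post(url, data=body, headers={"X-HatiData-Signature": signature, "Content-Type": "application/json"}). Sign and send exactly the same bytes: re-serializing the JSON can change the body and invalidate the signature.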
Step 4: Test Triggers
Use the test endpoint to verify triggers fire correctly without persisting test content:
# Test the churn detector with content that should trigger
result = client.triggers.test(
    name="churn-risk-detector",
    content="I am very disappointed with the service and considering switching to another provider.",
)
print(f"Would fire: {result['would_fire']}")
print(f"Similarity: {result['similarity_score']:.3f}")
print(f"Threshold: {result['threshold']}")

# Test with content that should NOT trigger
result = client.triggers.test(
    name="churn-risk-detector",
    content="Thanks for the quick response, everything is working great now!",
)
print(f"Would fire: {result['would_fire']}")
print(f"Similarity: {result['similarity_score']:.3f}")
Expected output (exact scores depend on your embedding provider):
Would fire: True
Similarity: 0.842
Threshold: 0.78
Would fire: False
Similarity: 0.321
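When tuning a threshold, it can help to collect similarity scores for sample content that should and should not fire, then look for a value that separates the two groups. The helper below is only a sketch: suggest_threshold is a hypothetical function, and apart from the two scores shown in the output above, the numbers are placeholders you would replace with similarity_score values returned by client.triggers.test().

```python
from typing import Optional


def suggest_threshold(
    should_fire: list[float], should_not_fire: list[float]
) -> Optional[float]:
    """Return the midpoint between the two groups, or None if they overlap."""
    lowest_positive = min(should_fire)
    highest_negative = max(should_not_fire)
    if lowest_positive <= highest_negative:
        return None  # no single threshold separates these samples
    return round((lowest_positive + highest_negative) / 2, 3)


# 0.842 and 0.321 come from the test run above; 0.80 and 0.60 are placeholders.
positives = [0.842, 0.80]
negatives = [0.321, 0.60]
suggestion = suggest_threshold(positives, negatives)
print(suggestion)
```

A result of None means some content that should not fire scores at least as high as content that should, which usually points to a concept description that needs rewording rather than a different threshold.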
Step 5: Store Content and Watch Triggers Fire
When you store memories or log events through the normal API, triggers evaluate automatically:
from hatidata.memory import MemoryClient

memory = MemoryClient(client)

# This should fire the churn-risk-detector trigger
memory.store(
    agent_id="support-agent",
    content=(
        "Customer called in very frustrated. They mentioned that competitors "
        "offer better pricing and they are actively evaluating alternatives. "
        "Requested a call with account management."
    ),
    metadata={
        "customer_id": "cust-456",
        "channel": "phone",
        "sentiment": "negative",
    },
)

# This should fire the opportunity-detector trigger
memory.store(
    agent_id="sales-agent",
    content=(
        "Customer asked about enterprise features and wants to add 50 more seats. "
        "They are expanding their AI team and need higher rate limits."
    ),
    metadata={
        "customer_id": "cust-789",
        "channel": "email",
    },
)
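Because each trigger has a cooldown_seconds value, storing several similar memories in quick succession should produce only one firing per cooldown window. The sketch below illustrates that debouncing idea. It assumes the cooldown is tracked per trigger, which this tutorial does not specify, and CooldownGate is a hypothetical class rather than anything in the SDK.

```python
class CooldownGate:
    """Allow at most one firing per trigger within its cooldown window."""

    def __init__(self) -> None:
        self._last_fired: dict[str, float] = {}

    def should_fire(self, trigger_name: str, cooldown_seconds: float, now: float) -> bool:
        last = self._last_fired.get(trigger_name)
        if last is not None and now - last < cooldown_seconds:
            return False  # still cooling down; suppress this firing
        self._last_fired[trigger_name] = now
        return True


gate = CooldownGate()
# Timestamps are seconds on an arbitrary clock, passed in explicitly so the
# example is deterministic.
print(gate.should_fire("churn-risk-detector", 300, now=0))    # True
print(gate.should_fire("churn-risk-detector", 300, now=120))  # False
print(gate.should_fire("churn-risk-detector", 300, now=300))  # True
```

In this sketch a suppressed firing does not restart the window, so a steady stream of matching content still produces one firing every cooldown_seconds.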
Step 6: Monitor Trigger Activity
Query trigger events directly with SQL:
-- Recent trigger firings
SELECT
    trigger_name,
    similarity_score,
    content_id,
    fired_at
FROM _hatidata_trigger_events
ORDER BY fired_at DESC
LIMIT 20;

-- Trigger firing frequency by name
SELECT
    trigger_name,
    COUNT(*) AS fire_count,
    AVG(similarity_score) AS avg_score,
    MIN(fired_at) AS first_fired,
    MAX(fired_at) AS last_fired
FROM _hatidata_trigger_events
GROUP BY trigger_name
ORDER BY fire_count DESC;

-- List all registered triggers
SELECT name, concept, threshold, cooldown_seconds, action_type
FROM _hatidata_triggers
ORDER BY name;
What You Built
| Capability | HatiData Feature |
|---|---|
| Concept-based event detection | triggers.register() with cosine similarity |
| Webhook delivery with auth | HMAC-SHA256 signed payloads |
| Agent-to-agent notifications | agent_notify action type |
| Cooldown debouncing | cooldown_seconds parameter |
| Dry-run testing | triggers.test() endpoint |
Related Concepts
- Semantic Triggers -- How triggers work under the hood
- Trigger Recipes -- Advanced trigger patterns
- Webhook Events -- Full webhook event reference
- Embedding Pipeline -- How content is embedded
- MCP Tools Reference -- register_trigger, test_trigger MCP tools