Semantic Trigger Pipeline

In this tutorial you will build an event-driven AI pipeline using HatiData's semantic triggers. Triggers fire when stored content crosses a cosine similarity threshold against a registered concept, enabling automated responses to semantically meaningful events.

By the end you will have:

  • Registered triggers that watch for specific semantic concepts
  • Configured webhook delivery with HMAC-SHA256 signature verification
  • Tested triggers with different content to verify firing behavior
  • Written SQL queries that monitor trigger activity

Prerequisites

  • Python 3.10+
  • HatiData proxy running with an embedding provider configured
  • hatidata SDK installed, plus Flask for the webhook receiver in Step 3

pip install hatidata requests flask
export HATIDATA_API_KEY="hd_live_your_api_key"
export HATIDATA_HOST="localhost"

Step 1: Register Semantic Triggers

A semantic trigger watches for content that is semantically similar to a registered concept. When a new memory or event crosses the similarity threshold, the trigger fires.

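import os

from hatidata import HatiDataClient

# Read connection settings from the environment variables exported in Prerequisites
client = HatiDataClient(
    host=os.environ.get("HATIDATA_HOST", "localhost"),
    port=5439,
    api_key=os.environ["HATIDATA_API_KEY"],
)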

# Register a trigger that fires when content related to customer churn is detected
client.triggers.register(
    name="churn-risk-detector",
    concept="customer is unhappy, wants to cancel, considering alternatives, churn risk",
    threshold=0.78,
    action={
        "type": "webhook",
        "url": "https://your-app.com/webhooks/churn-alert",
        "secret": "whsec_your_webhook_secret",
    },
    cooldown_seconds=300,  # Prevent repeated firing within 5 minutes
    metadata={
        "team": "customer-success",
        "severity": "high",
    },
)

# Register a trigger for compliance-sensitive content
client.triggers.register(
    name="compliance-flag",
    concept="regulatory violation, data breach, unauthorized access, compliance failure",
    threshold=0.80,
    action={
        "type": "webhook",
        "url": "https://your-app.com/webhooks/compliance-alert",
        "secret": "whsec_compliance_secret",
    },
    cooldown_seconds=60,
    metadata={
        "team": "legal",
        "severity": "critical",
    },
)

# Register a trigger that notifies an agent instead of calling a webhook
client.triggers.register(
    name="opportunity-detector",
    concept="upsell opportunity, expansion revenue, customer wants more features",
    threshold=0.75,
    action={
        "type": "agent_notify",
        "agent_id": "sales-agent",
    },
    cooldown_seconds=600,
)

print("Triggers registered.")
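To make the cooldown_seconds parameter concrete, here is a minimal sketch of how per-trigger debouncing could behave. It is an illustration only, not HatiData's implementation: the CooldownGate class and its allow method are hypothetical names, and the sketch assumes the cooldown is tracked separately for each trigger name.

```python
from typing import Dict


class CooldownGate:
    """Hypothetical per-trigger debouncer; not part of the hatidata SDK."""

    def __init__(self) -> None:
        # Maps trigger name -> timestamp (in seconds) of its last firing
        self._last_fired: Dict[str, float] = {}

    def allow(self, trigger_name: str, cooldown_seconds: float, now: float) -> bool:
        """Return True and record the firing if the cooldown window has elapsed."""
        last = self._last_fired.get(trigger_name)
        if last is not None and now - last < cooldown_seconds:
            return False  # still inside the cooldown window: suppress this firing
        self._last_fired[trigger_name] = now
        return True


gate = CooldownGate()
first = gate.allow("churn-risk-detector", cooldown_seconds=300, now=0.0)
repeat = gate.allow("churn-risk-detector", cooldown_seconds=300, now=120.0)
later = gate.allow("churn-risk-detector", cooldown_seconds=300, now=301.0)
print(first, repeat, later)  # True False True
```

With a 300-second cooldown, the match at 120 seconds is suppressed and the one at 301 seconds fires again. A shorter cooldown, such as the 60 seconds used for compliance-flag, trades more alerts for faster repeat notification.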

Step 2: Understand Trigger Evaluation

Triggers are evaluated using a two-stage process:

  1. ANN Pre-filter: When new content is stored, its embedding is compared against all registered trigger concept embeddings using approximate nearest neighbor search in the vector index. This returns the top-K trigger candidates.

  2. Exact Cosine Verification: Each candidate's cosine similarity is computed exactly. If the score exceeds the trigger's threshold and the cooldown period has elapsed, the trigger fires.

New Content → Embed → ANN Pre-filter (top-K triggers) → Exact Cosine → Fire if > threshold

This two-stage approach scales to thousands of registered triggers with sub-50ms evaluation latency.
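The two stages above can be sketched in a few lines of plain Python. This is a toy model under stated assumptions, not HatiData's code: the embeddings are made-up 3-dimensional vectors, the prefilter function is a brute-force stand-in for a real approximate nearest neighbor index, and the cooldown check is omitted for brevity. All function names are hypothetical.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Exact cosine similarity between two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def prefilter(content: Sequence[float], triggers: Dict[str, dict], k: int) -> List[str]:
    """Stage 1 stand-in: brute-force top-k trigger names by dot product.

    A real system would query an approximate nearest neighbor index here.
    """
    def score(name: str) -> float:
        return sum(x * y for x, y in zip(content, triggers[name]["embedding"]))

    return sorted(triggers, key=score, reverse=True)[:k]


def evaluate(content: Sequence[float], triggers: Dict[str, dict], k: int = 2) -> List[Tuple[str, float]]:
    """Stage 2: compute exact cosine for each candidate and compare to its threshold."""
    fired = []
    for name in prefilter(content, triggers, k):
        similarity = cosine(content, triggers[name]["embedding"])
        if similarity > triggers[name]["threshold"]:
            fired.append((name, round(similarity, 3)))
    return fired


# Made-up embeddings; real concept embeddings come from the embedding provider
triggers = {
    "churn-risk-detector": {"embedding": [1.0, 0.0, 0.0], "threshold": 0.78},
    "compliance-flag": {"embedding": [0.0, 1.0, 0.0], "threshold": 0.80},
    "opportunity-detector": {"embedding": [0.0, 0.0, 1.0], "threshold": 0.75},
}
content_embedding = [0.9, 0.3, 0.1]

print(evaluate(content_embedding, triggers))  # [('churn-risk-detector', 0.943)]
```

The pre-filter keeps only two of the three triggers, so the exact cosine is computed twice instead of three times. The saving is trivial here but is the reason the approach scales when thousands of triggers are registered.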


Step 3: Configure Webhook Delivery

Webhook triggers deliver a POST request to the configured URL. Each request includes an HMAC-SHA256 signature for verification.

Webhook Payload

{
  "trigger_id": "trg_abc123",
  "trigger_name": "churn-risk-detector",
  "fired_at": "2025-12-15T10:30:00Z",
  "similarity_score": 0.84,
  "threshold": 0.78,
  "content": "Customer expressed frustration and mentioned they are evaluating competitors...",
  "content_id": "mem_xyz789",
  "agent_id": "support-agent",
  "metadata": {
    "team": "customer-success",
    "severity": "high"
  }
}
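A receiver may want to parse this payload into a typed object before acting on it. The following is a minimal sketch that assumes the field names shown in the sample payload above. The TriggerEvent dataclass and parse_event function are hypothetical helpers, not part of the hatidata SDK.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict


@dataclass
class TriggerEvent:
    """Hypothetical typed view of the webhook payload."""
    trigger_name: str
    similarity_score: float
    threshold: float
    content_id: str
    fired_at: datetime
    metadata: Dict[str, Any] = field(default_factory=dict)


def parse_event(body: str) -> TriggerEvent:
    """Parse a webhook JSON body; raises KeyError if a required field is missing."""
    data = json.loads(body)
    return TriggerEvent(
        trigger_name=data["trigger_name"],
        similarity_score=float(data["similarity_score"]),
        threshold=float(data["threshold"]),
        content_id=data["content_id"],
        # Python 3.10's fromisoformat does not accept a trailing "Z"
        fired_at=datetime.fromisoformat(data["fired_at"].replace("Z", "+00:00")),
        metadata=data.get("metadata", {}),
    )


sample_body = json.dumps({
    "trigger_name": "churn-risk-detector",
    "fired_at": "2025-12-15T10:30:00Z",
    "similarity_score": 0.84,
    "threshold": 0.78,
    "content_id": "mem_xyz789",
    "metadata": {"team": "customer-success", "severity": "high"},
})
event = parse_event(sample_body)
margin = event.similarity_score - event.threshold
print(f"{event.trigger_name} exceeded its threshold by {margin:.2f}")
```

Failing early on a missing field keeps malformed requests from reaching downstream alerting logic.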

Signature Verification

The X-HatiData-Signature header contains an HMAC-SHA256 signature of the raw request body, in the form sha256=<hex digest>. Verify it before trusting the payload:

import hmac
import hashlib
from flask import Flask, request, jsonify

app = Flask(__name__)
WEBHOOK_SECRET = "whsec_your_webhook_secret"

@app.route("/webhooks/churn-alert", methods=["POST"])
def churn_alert():
    # Verify the signature
    signature = request.headers.get("X-HatiData-Signature", "")
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        request.data,
        hashlib.sha256,
    ).hexdigest()

    if not hmac.compare_digest(f"sha256={expected}", signature):
        return jsonify({"error": "Invalid signature"}), 401

    payload = request.json
    print(f"Churn risk detected! Score: {payload['similarity_score']:.2f}")
    print(f"Content: {payload['content'][:200]}")

    # Take action: notify customer success team, create ticket, etc.
    return jsonify({"received": True}), 200
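To exercise the handler locally before any real trigger fires, you can sign a test body yourself. The sketch below mirrors the scheme the handler above checks (HMAC-SHA256 over the raw body, hex-encoded, prefixed with sha256=). The sign_body and verify helpers are hypothetical names introduced for illustration.

```python
import hashlib
import hmac
import json


def sign_body(secret: str, body: bytes) -> str:
    """Build the header value the handler expects: 'sha256=<hex digest>'."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def verify(secret: str, body: bytes, header_value: str) -> bool:
    """Constant-time comparison of the expected and received signatures."""
    return hmac.compare_digest(sign_body(secret, body), header_value)


body = json.dumps(
    {"trigger_name": "churn-risk-detector", "similarity_score": 0.84, "content": "test"}
).encode()
header = sign_body("whsec_your_webhook_secret", body)

print(verify("whsec_your_webhook_secret", body, header))         # True
print(verify("whsec_your_webhook_secret", body + b" ", header))  # False: body was altered
print(verify("wrong_secret", body, header))                      # False: secret mismatch
```

The signed body can then be posted to the handler, for example with Flask's test client: app.test_client().post("/webhooks/churn-alert", data=body, headers={"X-HatiData-Signature": header}, content_type="application/json"). Sign the exact bytes you send, since re-serializing the JSON can change whitespace and invalidate the signature.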

Step 4: Test Triggers

Use the test endpoint to verify triggers fire correctly without persisting test content:

# Test the churn detector with content that should trigger
result = client.triggers.test(
    name="churn-risk-detector",
    content="I am very disappointed with the service and considering switching to another provider.",
)
print(f"Would fire: {result['would_fire']}")
print(f"Similarity: {result['similarity_score']:.3f}")
print(f"Threshold: {result['threshold']}")

# Test with content that should NOT trigger
result = client.triggers.test(
    name="churn-risk-detector",
    content="Thanks for the quick response, everything is working great now!",
)
print(f"Would fire: {result['would_fire']}")
print(f"Similarity: {result['similarity_score']:.3f}")
print(f"Threshold: {result['threshold']}")

Expected output (exact similarity scores vary with the embedding provider):

Would fire: True
Similarity: 0.842
Threshold: 0.78

Would fire: False
Similarity: 0.321
Threshold: 0.78

Step 5: Store Content and Watch Triggers Fire

When you store memories or log events through the normal API, triggers evaluate automatically:

from hatidata.memory import MemoryClient

memory = MemoryClient(client)

# This should fire the churn-risk-detector trigger
memory.store(
    agent_id="support-agent",
    content=(
        "Customer called in very frustrated. They mentioned that competitors "
        "offer better pricing and they are actively evaluating alternatives. "
        "Requested a call with account management."
    ),
    metadata={
        "customer_id": "cust-456",
        "channel": "phone",
        "sentiment": "negative",
    },
)

# This should fire the opportunity-detector trigger
memory.store(
    agent_id="sales-agent",
    content=(
        "Customer asked about enterprise features and wants to add 50 more seats. "
        "They are expanding their AI team and need higher rate limits."
    ),
    metadata={
        "customer_id": "cust-789",
        "channel": "email",
    },
)

Step 6: Monitor Trigger Activity

Query trigger events directly with SQL:

-- Recent trigger firings
SELECT
    trigger_name,
    similarity_score,
    content_id,
    fired_at
FROM _hatidata_trigger_events
ORDER BY fired_at DESC
LIMIT 20;

-- Trigger firing frequency by name
SELECT
    trigger_name,
    COUNT(*) AS fire_count,
    AVG(similarity_score) AS avg_score,
    MIN(fired_at) AS first_fired,
    MAX(fired_at) AS last_fired
FROM _hatidata_trigger_events
GROUP BY trigger_name
ORDER BY fire_count DESC;

-- List all registered triggers
SELECT name, concept, threshold, cooldown_seconds, action_type
FROM _hatidata_triggers
ORDER BY name;
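To see what the frequency query returns without a running proxy, you can run it against a stand-in table. The sketch below uses Python's built-in sqlite3 module with invented sample rows. It only illustrates the shape of the result: SQLite is a substitute here, the column types are assumptions, and HatiData's SQL dialect may differ in details.

```python
import sqlite3

# In-memory stand-in for the _hatidata_trigger_events table (column types assumed)
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE _hatidata_trigger_events ("
    "trigger_name TEXT, similarity_score REAL, content_id TEXT, fired_at TEXT)"
)
# Invented sample rows, for illustration only
conn.executemany(
    "INSERT INTO _hatidata_trigger_events VALUES (?, ?, ?, ?)",
    [
        ("churn-risk-detector", 0.84, "mem_001", "2025-12-15T10:30:00Z"),
        ("churn-risk-detector", 0.80, "mem_002", "2025-12-15T11:00:00Z"),
        ("opportunity-detector", 0.79, "mem_003", "2025-12-15T11:05:00Z"),
    ],
)

rows = conn.execute(
    """
    SELECT
        trigger_name,
        COUNT(*) AS fire_count,
        AVG(similarity_score) AS avg_score,
        MIN(fired_at) AS first_fired,
        MAX(fired_at) AS last_fired
    FROM _hatidata_trigger_events
    GROUP BY trigger_name
    ORDER BY fire_count DESC
    """
).fetchall()

for name, fire_count, avg_score, first_fired, last_fired in rows:
    print(f"{name}: fire_count={fire_count}, avg_score={avg_score:.2f}")
```

A trigger whose average score sits just above its threshold may be firing on borderline matches, which is a reasonable signal to revisit the threshold or the concept text.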

What You Built

| Capability | HatiData Feature |
| --- | --- |
| Concept-based event detection | triggers.register() with cosine similarity |
| Webhook delivery with auth | HMAC-SHA256 signed payloads |
| Agent-to-agent notifications | agent_notify action type |
| Cooldown debouncing | cooldown_seconds parameter |
| Dry-run testing | triggers.test() endpoint |
