Human-in-the-Loop Integration
HatiData's semantic trigger system lets you insert human checkpoints into agent workflows. When an agent's action or query matches a concept you define, HatiData can flag it for human review, fire a webhook, notify the agent, or write an audit event -- all without the agent needing to implement the review logic itself.
This guide covers the full HITL flow: defining triggers, receiving notifications, reviewing flagged items, and approving or rejecting them.
How It Works
Agent writes data or calls MCP tool
│
▼
Semantic Trigger Evaluator
(cosine similarity against registered concepts)
│
▼ threshold exceeded
Trigger Action fires:
├── FlagForReview → Dashboard review queue + API
├── Webhook → HMAC-signed POST to your endpoint
├── AgentNotify → Agent's offline inbox
└── WriteEvent → Audit log entry
- You register a semantic trigger with a concept (natural language description) and a threshold (0.0 to 1.0).
- When an agent performs an action, the trigger evaluator computes the cosine similarity between the action's embedding and all registered trigger concepts.
- If the similarity exceeds the threshold, the configured action fires.
- For
FlagForReview, the item appears in the dashboard review queue and is accessible via the API.
Trigger Actions
FlagForReview
The primary HITL action. Creates a review item that a human must approve or reject before the agent can proceed (if your workflow is configured to block on review).
# Register a trigger that flags high-value operations for review
import requests
response = requests.post(
"https://your-env.hatidata.com/mcp",
headers={"ApiKey": "hd_live_...", "Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "register_trigger",
"arguments": {
"name": "high-value-transaction",
"concept": "financial transaction exceeding $50,000 or bulk account modification",
"threshold": 0.82,
"action": {
"type": "FlagForReview",
"metadata": {
"priority": "high",
"team": "finance-ops"
}
}
}
},
"id": 1
},
)
print(response.json())
Flagged items include the original action context, the trigger that fired, the similarity score, and any metadata you attached to the trigger.
Webhook (HMAC-SHA256)
Sends a signed HTTP POST to your endpoint when the trigger fires. Use this to integrate with Slack, PagerDuty, custom review systems, or any external workflow.
{
"name": "pii-detection",
"concept": "query accessing personally identifiable information such as SSN, email, phone, or address",
"threshold": 0.80,
"action": {
"type": "Webhook",
"url": "https://your-app.com/webhooks/hatidata-review",
"secret": "whsec_your_signing_secret"
}
}
The webhook payload follows the format documented in Webhook Events. Every request includes an X-HatiData-Signature header with the HMAC-SHA256 digest.
AgentNotify
Deposits a notification in the agent's offline inbox. The agent retrieves pending notifications on its next poll. Use this when you want the agent to self-correct or pause without requiring synchronous human review.
{
"name": "schema-drift-warning",
"concept": "agent creating or altering table schemas outside expected patterns",
"threshold": 0.78,
"action": {
"type": "AgentNotify",
"message": "Your schema change was flagged. Review the change and confirm it matches the approved data model."
}
}
Agents poll their inbox via the MCP server:
response = requests.post(
"https://your-env.hatidata.com/mcp",
headers={"ApiKey": "hd_live_...", "Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "get_agent_state",
"arguments": {
"agent_id": "analytics-bot",
"key": "_notifications"
}
},
"id": 2
},
)
notifications = response.json()["result"]
Inbox entries have a configurable TTL (default: 24 hours) and a bounded queue size (default: 100 entries per agent). Oldest entries are evicted when the queue is full.
WriteEvent
Writes a structured event to the audit log without blocking the agent or notifying anyone. Use this for compliance logging where you need a record that a trigger fired but do not need human intervention.
{
"name": "cross-schema-access",
"concept": "agent querying tables across multiple schemas in a single session",
"threshold": 0.75,
"action": {
"type": "WriteEvent",
"event_type": "trigger.cross_schema_access"
}
}
Dashboard: Review Queue
The HatiData Dashboard at app.hatidata.com includes a Triggers page that shows:
- Active Triggers: All registered triggers with their concept, threshold, and action type.
- Fired Triggers: A chronological log of every trigger evaluation that exceeded its threshold.
- Review Queue: Items flagged via
FlagForReviewthat are awaiting human decision (approve or reject).
From the review queue, a reviewer can:
- View the full context: the agent action that triggered the flag, the similarity score, and the trigger definition.
- Approve the action -- the review record is updated and the agent can proceed.
- Reject the action -- the review record is updated with a rejection reason.
- Add a comment for the agent or other reviewers.
API: Managing Reviews Programmatically
List Pending Reviews
curl -X GET "https://your-env.hatidata.com/v1/triggers/reviews?status=pending" \
-H "Authorization: Bearer <jwt>"
Response:
{
"reviews": [
{
"id": "rev_8f3a2b1c-4d5e-6f7a-8b9c-0d1e2f3a4b5c",
"trigger_id": "trg_a1b2c3d4",
"trigger_name": "high-value-transaction",
"agent_id": "finance-bot",
"status": "pending",
"similarity_score": 0.91,
"context": {
"query": "INSERT INTO transactions (amount, recipient) VALUES (75000, 'ACME Corp')",
"table": "transactions",
"action": "write"
},
"metadata": {
"priority": "high",
"team": "finance-ops"
},
"created_at": "2025-01-15T10:30:00.123Z"
}
],
"total": 1,
"has_more": false
}
Approve a Review
curl -X POST "https://your-env.hatidata.com/v1/triggers/reviews/rev_8f3a2b1c/approve" \
-H "Authorization: Bearer <jwt>" \
-H "Content-Type: application/json" \
-d '{
"reviewer_id": "alice@company.com",
"comment": "Approved. Verified with finance team that this is the expected quarterly payment."
}'
Response:
{
"id": "rev_8f3a2b1c-4d5e-6f7a-8b9c-0d1e2f3a4b5c",
"status": "approved",
"reviewer_id": "alice@company.com",
"comment": "Approved. Verified with finance team that this is the expected quarterly payment.",
"reviewed_at": "2025-01-15T10:45:00.456Z"
}
Reject a Review
curl -X POST "https://your-env.hatidata.com/v1/triggers/reviews/rev_8f3a2b1c/reject" \
-H "Authorization: Bearer <jwt>" \
-H "Content-Type: application/json" \
-d '{
"reviewer_id": "alice@company.com",
"reason": "Amount exceeds single-transaction limit. Agent should split into two transfers."
}'
Webhook Receiver Example
A complete Python webhook receiver that verifies the HMAC signature and routes trigger events to a Slack channel:
import hmac
import hashlib
import json
from flask import Flask, request, jsonify
app = Flask(__name__)
WEBHOOK_SECRET = "whsec_your_signing_secret"
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/T.../B.../xxx"
def verify_signature(payload: bytes, signature: str) -> bool:
"""Verify the HMAC-SHA256 signature from HatiData."""
expected = hmac.new(
WEBHOOK_SECRET.encode("utf-8"),
payload,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
@app.route("/webhooks/hatidata-review", methods=["POST"])
def handle_trigger_webhook():
# 1. Verify signature
signature = request.headers.get("X-HatiData-Signature", "")
if not verify_signature(request.data, signature):
return jsonify({"error": "Invalid signature"}), 401
# 2. Parse event
event = request.json
event_type = request.headers.get("X-HatiData-Event-Type")
event_id = request.headers.get("X-HatiData-Event-Id")
# 3. Route based on event type
if event_type == "trigger.fired":
trigger_name = event["trigger"]["name"]
agent_id = event["agent_id"]
score = event["similarity_score"]
context = event.get("context", {})
# Post to Slack
import requests as req
req.post(SLACK_WEBHOOK_URL, json={
"text": (
f"*HatiData Trigger Fired*\n"
f"Trigger: {trigger_name}\n"
f"Agent: {agent_id}\n"
f"Score: {score:.2f}\n"
f"Query: `{context.get('query', 'N/A')}`\n"
f"Review: https://app.hatidata.com/triggers/reviews"
)
})
# 4. Acknowledge receipt
return jsonify({"received": True, "event_id": event_id}), 200
if __name__ == "__main__":
app.run(port=8000)
Approval via Python SDK
The Python SDK provides helper methods for the review workflow:
from hatidata import Client
client = Client(
host="your-env.hatidata.com",
port=5439,
api_key="hd_live_...",
ssl=True,
)
# List pending reviews
pending = client.list_reviews(status="pending")
for review in pending:
print(f"[{review.id}] {review.trigger_name} - score: {review.similarity_score:.2f}")
print(f" Agent: {review.agent_id}")
print(f" Query: {review.context.get('query')}")
print()
# Approve a specific review
client.approve_review(
review_id="rev_8f3a2b1c",
reviewer_id="alice@company.com",
comment="Verified and approved.",
)
# Reject a specific review
client.reject_review(
review_id="rev_9d4e5f6a",
reviewer_id="alice@company.com",
reason="Unauthorized data access pattern. Agent should use the restricted view instead.",
)
Best Practices
Threshold Tuning
- Start with a threshold of 0.80 and adjust based on false positive rates.
- Thresholds below 0.70 produce too many false positives for most concepts.
- Thresholds above 0.90 may miss relevant actions due to embedding variance.
- Use the
test_triggerMCP tool to evaluate your threshold against sample inputs before deploying.
Trigger Design
- Write concepts in clear, specific natural language. "Financial transaction over $50,000" is better than "big money."
- Use multiple narrow triggers rather than one broad trigger. Three triggers for "PII access," "financial threshold," and "schema modification" are more useful than one for "anything suspicious."
- Attach metadata (priority, team, category) to triggers so your review workflow can route appropriately.
Cooldown and Deduplication
Triggers include a configurable cooldown period (default: 60 seconds) to prevent the same action from firing the same trigger repeatedly:
{
"name": "pii-access",
"concept": "accessing PII fields",
"threshold": 0.82,
"cooldown_seconds": 300,
"action": {"type": "FlagForReview"}
}
During the cooldown window, subsequent matches against the same trigger for the same agent are suppressed. The cooldown resets after it expires.
Blocking vs Non-Blocking
By default, triggers are non-blocking: the agent's action proceeds and the trigger fires asynchronously. If you need the agent to wait for human approval before proceeding, implement a polling loop in your agent code:
# Agent-side: wait for approval after a flagged action
import time
# Perform the action (trigger fires asynchronously)
client.query("INSERT INTO transactions (amount) VALUES (75000)")
# Check for pending review
while True:
state = client.mcp_call("get_agent_state", {
"agent_id": "finance-bot",
"key": "_pending_review"
})
if state and state.get("status") == "approved":
print("Action approved, proceeding")
break
elif state and state.get("status") == "rejected":
print(f"Action rejected: {state.get('reason')}")
# Handle rejection (rollback, alternative action, etc.)
break
time.sleep(5) # Poll every 5 seconds
This pattern keeps the blocking logic in the agent rather than in HatiData, giving you full control over timeout behavior and fallback strategies.