Skip to main content

Human-in-the-Loop Integration

HatiData's semantic trigger system lets you insert human checkpoints into agent workflows. When an agent's action or query matches a concept you define, HatiData can flag it for human review, fire a webhook, notify the agent, or write an audit event -- all without the agent needing to implement the review logic itself.

This guide covers the full HITL flow: defining triggers, receiving notifications, reviewing flagged items, and approving or rejecting them.

How It Works

Agent writes data or calls MCP tool


Semantic Trigger Evaluator
(cosine similarity against registered concepts)

▼ threshold exceeded
Trigger Action fires:
├── FlagForReview → Dashboard review queue + API
├── Webhook → HMAC-signed POST to your endpoint
├── AgentNotify → Agent's offline inbox
└── WriteEvent → Audit log entry
  1. You register a semantic trigger with a concept (natural language description) and a threshold (0.0 to 1.0).
  2. When an agent performs an action, the trigger evaluator computes the cosine similarity between the action's embedding and all registered trigger concepts.
  3. If the similarity exceeds the threshold, the configured action fires.
  4. For FlagForReview, the item appears in the dashboard review queue and is accessible via the API.

Trigger Actions

FlagForReview

The primary HITL action. Creates a review item that a human must approve or reject before the agent can proceed (if your workflow is configured to block on review).

# Register a trigger that flags high-value operations for review
import requests

response = requests.post(
"https://your-env.hatidata.com/mcp",
headers={"ApiKey": "hd_live_...", "Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "register_trigger",
"arguments": {
"name": "high-value-transaction",
"concept": "financial transaction exceeding $50,000 or bulk account modification",
"threshold": 0.82,
"action": {
"type": "FlagForReview",
"metadata": {
"priority": "high",
"team": "finance-ops"
}
}
}
},
"id": 1
},
)
print(response.json())

Flagged items include the original action context, the trigger that fired, the similarity score, and any metadata you attached to the trigger.

Webhook (HMAC-SHA256)

Sends a signed HTTP POST to your endpoint when the trigger fires. Use this to integrate with Slack, PagerDuty, custom review systems, or any external workflow.

{
"name": "pii-detection",
"concept": "query accessing personally identifiable information such as SSN, email, phone, or address",
"threshold": 0.80,
"action": {
"type": "Webhook",
"url": "https://your-app.com/webhooks/hatidata-review",
"secret": "whsec_your_signing_secret"
}
}

The webhook payload follows the format documented in Webhook Events. Every request includes an X-HatiData-Signature header with the HMAC-SHA256 digest.

AgentNotify

Deposits a notification in the agent's offline inbox. The agent retrieves pending notifications on its next poll. Use this when you want the agent to self-correct or pause without requiring synchronous human review.

{
"name": "schema-drift-warning",
"concept": "agent creating or altering table schemas outside expected patterns",
"threshold": 0.78,
"action": {
"type": "AgentNotify",
"message": "Your schema change was flagged. Review the change and confirm it matches the approved data model."
}
}

Agents poll their inbox via the MCP server:

response = requests.post(
"https://your-env.hatidata.com/mcp",
headers={"ApiKey": "hd_live_...", "Content-Type": "application/json"},
json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "get_agent_state",
"arguments": {
"agent_id": "analytics-bot",
"key": "_notifications"
}
},
"id": 2
},
)
notifications = response.json()["result"]

Inbox entries have a configurable TTL (default: 24 hours) and a bounded queue size (default: 100 entries per agent). Oldest entries are evicted when the queue is full.

WriteEvent

Writes a structured event to the audit log without blocking the agent or notifying anyone. Use this for compliance logging where you need a record that a trigger fired but do not need human intervention.

{
"name": "cross-schema-access",
"concept": "agent querying tables across multiple schemas in a single session",
"threshold": 0.75,
"action": {
"type": "WriteEvent",
"event_type": "trigger.cross_schema_access"
}
}

Dashboard: Review Queue

The HatiData Dashboard at app.hatidata.com includes a Triggers page that shows:

  • Active Triggers: All registered triggers with their concept, threshold, and action type.
  • Fired Triggers: A chronological log of every trigger evaluation that exceeded its threshold.
  • Review Queue: Items flagged via FlagForReview that are awaiting human decision (approve or reject).

From the review queue, a reviewer can:

  1. View the full context: the agent action that triggered the flag, the similarity score, and the trigger definition.
  2. Approve the action -- the review record is updated and the agent can proceed.
  3. Reject the action -- the review record is updated with a rejection reason.
  4. Add a comment for the agent or other reviewers.

API: Managing Reviews Programmatically

List Pending Reviews

curl -X GET "https://your-env.hatidata.com/v1/triggers/reviews?status=pending" \
-H "Authorization: Bearer <jwt>"

Response:

{
"reviews": [
{
"id": "rev_8f3a2b1c-4d5e-6f7a-8b9c-0d1e2f3a4b5c",
"trigger_id": "trg_a1b2c3d4",
"trigger_name": "high-value-transaction",
"agent_id": "finance-bot",
"status": "pending",
"similarity_score": 0.91,
"context": {
"query": "INSERT INTO transactions (amount, recipient) VALUES (75000, 'ACME Corp')",
"table": "transactions",
"action": "write"
},
"metadata": {
"priority": "high",
"team": "finance-ops"
},
"created_at": "2025-01-15T10:30:00.123Z"
}
],
"total": 1,
"has_more": false
}

Approve a Review

curl -X POST "https://your-env.hatidata.com/v1/triggers/reviews/rev_8f3a2b1c/approve" \
-H "Authorization: Bearer <jwt>" \
-H "Content-Type: application/json" \
-d '{
"reviewer_id": "alice@company.com",
"comment": "Approved. Verified with finance team that this is the expected quarterly payment."
}'

Response:

{
"id": "rev_8f3a2b1c-4d5e-6f7a-8b9c-0d1e2f3a4b5c",
"status": "approved",
"reviewer_id": "alice@company.com",
"comment": "Approved. Verified with finance team that this is the expected quarterly payment.",
"reviewed_at": "2025-01-15T10:45:00.456Z"
}

Reject a Review

curl -X POST "https://your-env.hatidata.com/v1/triggers/reviews/rev_8f3a2b1c/reject" \
-H "Authorization: Bearer <jwt>" \
-H "Content-Type: application/json" \
-d '{
"reviewer_id": "alice@company.com",
"reason": "Amount exceeds single-transaction limit. Agent should split into two transfers."
}'

Webhook Receiver Example

A complete Python webhook receiver that verifies the HMAC signature and routes trigger events to a Slack channel:

import hmac
import hashlib
import json
from flask import Flask, request, jsonify

app = Flask(__name__)
WEBHOOK_SECRET = "whsec_your_signing_secret"
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/T.../B.../xxx"

def verify_signature(payload: bytes, signature: str) -> bool:
"""Verify the HMAC-SHA256 signature from HatiData."""
expected = hmac.new(
WEBHOOK_SECRET.encode("utf-8"),
payload,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)


@app.route("/webhooks/hatidata-review", methods=["POST"])
def handle_trigger_webhook():
# 1. Verify signature
signature = request.headers.get("X-HatiData-Signature", "")
if not verify_signature(request.data, signature):
return jsonify({"error": "Invalid signature"}), 401

# 2. Parse event
event = request.json
event_type = request.headers.get("X-HatiData-Event-Type")
event_id = request.headers.get("X-HatiData-Event-Id")

# 3. Route based on event type
if event_type == "trigger.fired":
trigger_name = event["trigger"]["name"]
agent_id = event["agent_id"]
score = event["similarity_score"]
context = event.get("context", {})

# Post to Slack
import requests as req
req.post(SLACK_WEBHOOK_URL, json={
"text": (
f"*HatiData Trigger Fired*\n"
f"Trigger: {trigger_name}\n"
f"Agent: {agent_id}\n"
f"Score: {score:.2f}\n"
f"Query: `{context.get('query', 'N/A')}`\n"
f"Review: https://app.hatidata.com/triggers/reviews"
)
})

# 4. Acknowledge receipt
return jsonify({"received": True, "event_id": event_id}), 200


if __name__ == "__main__":
app.run(port=8000)

Approval via Python SDK

The Python SDK provides helper methods for the review workflow:

from hatidata import Client

client = Client(
host="your-env.hatidata.com",
port=5439,
api_key="hd_live_...",
ssl=True,
)

# List pending reviews
pending = client.list_reviews(status="pending")
for review in pending:
print(f"[{review.id}] {review.trigger_name} - score: {review.similarity_score:.2f}")
print(f" Agent: {review.agent_id}")
print(f" Query: {review.context.get('query')}")
print()

# Approve a specific review
client.approve_review(
review_id="rev_8f3a2b1c",
reviewer_id="alice@company.com",
comment="Verified and approved.",
)

# Reject a specific review
client.reject_review(
review_id="rev_9d4e5f6a",
reviewer_id="alice@company.com",
reason="Unauthorized data access pattern. Agent should use the restricted view instead.",
)

Best Practices

Threshold Tuning

  • Start with a threshold of 0.80 and adjust based on false positive rates.
  • Thresholds below 0.70 produce too many false positives for most concepts.
  • Thresholds above 0.90 may miss relevant actions due to embedding variance.
  • Use the test_trigger MCP tool to evaluate your threshold against sample inputs before deploying.

Trigger Design

  • Write concepts in clear, specific natural language. "Financial transaction over $50,000" is better than "big money."
  • Use multiple narrow triggers rather than one broad trigger. Three triggers for "PII access," "financial threshold," and "schema modification" are more useful than one for "anything suspicious."
  • Attach metadata (priority, team, category) to triggers so your review workflow can route appropriately.

Cooldown and Deduplication

Triggers include a configurable cooldown period (default: 60 seconds) to prevent the same action from firing the same trigger repeatedly:

{
"name": "pii-access",
"concept": "accessing PII fields",
"threshold": 0.82,
"cooldown_seconds": 300,
"action": {"type": "FlagForReview"}
}

During the cooldown window, subsequent matches against the same trigger for the same agent are suppressed. The cooldown resets after it expires.

Blocking vs Non-Blocking

By default, triggers are non-blocking: the agent's action proceeds and the trigger fires asynchronously. If you need the agent to wait for human approval before proceeding, implement a polling loop in your agent code:

# Agent-side: wait for approval after a flagged action
import time

# Perform the action (trigger fires asynchronously)
client.query("INSERT INTO transactions (amount) VALUES (75000)")

# Check for pending review
while True:
state = client.mcp_call("get_agent_state", {
"agent_id": "finance-bot",
"key": "_pending_review"
})
if state and state.get("status") == "approved":
print("Action approved, proceeding")
break
elif state and state.get("status") == "rejected":
print(f"Action rejected: {state.get('reason')}")
# Handle rejection (rollback, alternative action, etc.)
break
time.sleep(5) # Poll every 5 seconds

This pattern keeps the blocking logic in the agent rather than in HatiData, giving you full control over timeout behavior and fallback strategies.

Stay in the loop

Product updates, engineering deep-dives, and agent-native insights. No spam.