
Build a Fraud Detection Agent

In this tutorial you will build a fraud detection agent that monitors transaction queries for suspicious patterns, logs every reasoning step to an immutable audit ledger, and fires webhook alerts when anomalies are detected.

By the end you will have an agent that:

  • Registers semantic triggers to catch unusual query patterns, high-value transactions, and geographic anomalies
  • Queries a transaction data layer with per-agent access controls enforced automatically
  • Logs all reasoning steps to a hash-chained chain-of-thought ledger for regulatory compliance
  • Sends webhook alerts when triggers fire and replays decisions for post-incident audit

Prerequisites

  • Python 3.10 or later
  • A HatiData account and API key (sign up)
  • The hatidata Python SDK, installed with pip:
pip install hatidata

Set your credentials as environment variables:

export HATIDATA_API_KEY="hd_your_api_key"
export HATIDATA_ORG="your-org-slug"
export FRAUD_WEBHOOK_URL="https://your-alert-endpoint.example.com/webhooks/fraud"
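
Before running the later steps, it can help to fail fast when one of these variables is missing. The following is a minimal sketch that assumes only the three variable names above; `load_settings` and `REQUIRED_VARS` are hypothetical names for illustration, not part of the hatidata SDK.

```python
import os

# The three variables exported in the prerequisites above.
REQUIRED_VARS = ("HATIDATA_API_KEY", "HATIDATA_ORG", "FRAUD_WEBHOOK_URL")


def load_settings(env=None):
    """Return the required settings as a dict, or raise if any are missing.

    `env` defaults to os.environ; passing a plain dict makes this easy to test.
    """
    env = os.environ if env is None else env
    # Treat unset and empty values the same way.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Calling `load_settings()` once at startup surfaces a misconfigured shell immediately instead of as a `KeyError` partway through the tutorial.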

Step 1: Set Up HatiData with Transaction Data

Create the client and define the transaction data layer. The fraud agent will only ever receive a read-only scoped API key so it cannot modify records.

import os
from hatidata import HatiDataClient

# Full admin client for schema setup only
admin_client = HatiDataClient(
    api_key=os.environ["HATIDATA_API_KEY"],
    org=os.environ["HATIDATA_ORG"],
)

admin_client.execute("""
    CREATE TABLE IF NOT EXISTS transactions (
        txn_id TEXT PRIMARY KEY,
        account_id TEXT NOT NULL,
        amount_usd NUMERIC(18, 2) NOT NULL,
        merchant TEXT NOT NULL,
        country_code TEXT NOT NULL,
        ip_address TEXT,
        txn_type TEXT NOT NULL, -- purchase, withdrawal, transfer
        flagged BOOLEAN DEFAULT false,
        created_at TIMESTAMPTZ DEFAULT now()
    )
""")

admin_client.execute("""
    CREATE TABLE IF NOT EXISTS accounts (
        account_id TEXT PRIMARY KEY,
        owner_name TEXT NOT NULL,
        home_country TEXT NOT NULL,
        risk_tier TEXT DEFAULT 'standard', -- standard, elevated, restricted
        created_at TIMESTAMPTZ DEFAULT now()
    )
""")

print("Transaction data layer ready.")

Seed sample transactions that include a few anomalies:

# datetime and timezone are used later, when exporting the compliance report
from datetime import datetime, timezone

# Columns: txn_id, account_id, amount_usd, merchant, country_code, ip_address, txn_type, flagged
transactions = [
    ("txn-001", "acct-42", 29.99, "Amazon", "US", "192.168.1.1", "purchase", False),
    ("txn-002", "acct-42", 14500.00, "Wire Transfer Co", "RU", "45.33.21.99", "transfer", False),
    ("txn-003", "acct-99", 9.99, "Netflix", "US", "10.0.0.5", "purchase", False),
    ("txn-004", "acct-42", 8750.00, "Crypto Exchange", "NG", "212.5.88.11", "withdrawal", False),
    ("txn-005", "acct-77", 199.00, "Apple Store", "GB", "86.24.11.2", "purchase", False),
    ("txn-006", "acct-42", 350.00, "ATM Withdrawal", "CN", "103.44.22.9", "withdrawal", False),
]

admin_client.executemany(
    """INSERT INTO transactions
    (txn_id, account_id, amount_usd, merchant, country_code, ip_address, txn_type, flagged)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
    transactions,
)

admin_client.executemany(
    "INSERT INTO accounts (account_id, owner_name, home_country) VALUES (?, ?, ?)",
    [
        ("acct-42", "Jordan Lee", "US"),
        ("acct-77", "Sam Reeves", "GB"),
        ("acct-99", "Alex Kim", "US"),
    ],
)

print(f"Seeded {len(transactions)} transactions.")

Create a read-only API key scoped to the fraud detection agent:

fraud_key = admin_client.create_api_key(
    name="fraud-agent-v1",
    scope="read",
    tables=["transactions", "accounts"],
)
print(f"Fraud agent key: {fraud_key['key'][:12]}...")

# Instantiate a restricted client for the agent itself
fraud_client = HatiDataClient(
    api_key=fraud_key["key"],
    org=os.environ["HATIDATA_ORG"],
)
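
The scoped key is what actually prevents writes. As an optional extra layer, the agent's own code can refuse to send anything that is not a single SELECT statement. The sketch below is a deliberately coarse check; `is_read_only` is a hypothetical helper, not an SDK function, and it is no substitute for the server-side scope.

```python
def is_read_only(sql: str) -> bool:
    """Coarse check: exactly one statement, and its first keyword is SELECT.

    Deliberately conservative: any ';' other than a trailing one is rejected,
    even if it appears inside a string literal.
    """
    statement = sql.strip().rstrip(";").strip()
    if not statement or ";" in statement:
        return False  # empty input, or more than one statement
    first_keyword = statement.split(None, 1)[0]
    return first_keyword.upper() == "SELECT"
```

A wrapper around the agent's query calls could raise an error whenever `is_read_only(sql)` returns False, so a prompt-injected write attempt is stopped before it leaves the process.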

Step 2: Register Semantic Triggers

Semantic triggers watch for query patterns that match a natural-language concept. When a trigger fires, HatiData can call a webhook, notify the agent, or flag a row for review.

from hatidata.triggers import TriggerClient, TriggerAction

triggers = TriggerClient(admin_client)

# Trigger 1: unusual query patterns (e.g., bulk enumeration of accounts)
triggers.register(
    agent_id="fraud-agent-v1",
    name="unusual-query-pattern",
    concept="bulk enumeration of accounts or sequential scan of transaction IDs",
    action=TriggerAction.WEBHOOK,
    webhook_url=os.environ["FRAUD_WEBHOOK_URL"],
    cooldown_seconds=60,
    metadata={"severity": "medium", "team": "fraud-ops"},
)

# Trigger 2: high-value transaction in a foreign country
triggers.register(
    agent_id="fraud-agent-v1",
    name="high-value-foreign-transaction",
    concept="large monetary transfer or withdrawal in a country different from the account home country",
    action=TriggerAction.WEBHOOK,
    webhook_url=os.environ["FRAUD_WEBHOOK_URL"],
    cooldown_seconds=30,
    metadata={"severity": "high", "team": "fraud-ops"},
)

# Trigger 3: geographic anomaly (multiple countries in a short window)
triggers.register(
    agent_id="fraud-agent-v1",
    name="geographic-anomaly",
    concept="transactions originating from multiple different countries within a short time period",
    action=TriggerAction.FLAG_FOR_REVIEW,
    cooldown_seconds=120,
    metadata={"severity": "high", "auto_restrict": True},
)

print("3 semantic triggers registered.")

You can verify the registered triggers at any time:

registered = triggers.list(agent_id="fraud-agent-v1")
for t in registered:
    print(f" [{t['name']}] action={t['action']} cooldown={t['cooldown_seconds']}s")
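
Each trigger above is registered with a cooldown_seconds value, which suggests that a trigger will not fire again until that many seconds have passed since its last firing. The tutorial does not describe how HatiData implements this, so the following is only a toy model of the usual meaning of a cooldown; `CooldownGate` is a hypothetical class, not an SDK type.

```python
import time


class CooldownGate:
    """Let an event through at most once per cooldown window, per trigger name."""

    def __init__(self, cooldown_seconds, clock=time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock      # injectable so tests can control time
        self._last_fired = {}    # trigger name -> clock reading at last firing

    def should_fire(self, name):
        now = self._clock()
        last = self._last_fired.get(name)
        if last is not None and now - last < self.cooldown_seconds:
            return False         # still cooling down: suppress this firing
        self._last_fired[name] = now
        return True
```

Under this model, a 30-second cooldown on high-value-foreign-transaction means that a burst of matching queries produces one alert rather than one alert per query, while other triggers keep their own independent timers.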

Step 3: Build the Agent Query Loop

The agent periodically scans for anomalies using structured SQL queries. HatiData evaluates semantic triggers automatically against each query before execution.

from hatidata.cot import CotClient, StepType
import uuid

cot = CotClient(fraud_client)

def scan_high_value_foreign(session_id: str) -> list[dict]:
    """
    Find transactions above $5,000 where the transaction country
    differs from the account's registered home country.
    """
    rows = fraud_client.query("""
        SELECT
            t.txn_id,
            t.account_id,
            t.amount_usd,
            t.merchant,
            t.country_code AS txn_country,
            a.home_country,
            t.created_at
        FROM transactions t
        JOIN accounts a ON t.account_id = a.account_id
        WHERE t.amount_usd > 5000
          AND t.country_code <> a.home_country
        ORDER BY t.amount_usd DESC
    """)

    results = [dict(row) for row in rows]

    cot.log_step(
        agent_id="fraud-agent-v1",
        session_id=session_id,
        step_type=StepType.TOOL_CALL,
        content=f"Scanned for high-value foreign transactions. Found {len(results)} candidates.",
        metadata={"query": "high_value_foreign", "result_count": len(results)},
    )

    return results


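import re  # used below to validate account_id before it is formatted into SQL


def scan_geographic_velocity(session_id: str, account_id: str, window_hours: int = 24) -> list[dict]:
    """
    Detect accounts with transactions in more than 2 distinct countries
    within the specified time window.
    """
    # Both values are formatted into the SQL text below, so validate them
    # first to rule out SQL injection.
    if not re.fullmatch(r"[A-Za-z0-9_-]+", account_id):
        raise ValueError(f"Invalid account_id: {account_id!r}")
    window_hours = int(window_hours)

    rows = fraud_client.query(f"""
        SELECT
            account_id,
            COUNT(DISTINCT country_code) AS country_count,
            ARRAY_AGG(DISTINCT country_code) AS countries,
            MIN(created_at) AS first_txn,
            MAX(created_at) AS last_txn
        FROM transactions
        WHERE account_id = '{account_id}'
          AND created_at > now() - INTERVAL '{window_hours} hours'
        GROUP BY account_id
        HAVING COUNT(DISTINCT country_code) > 2
    """)

    results = [dict(row) for row in rows]

    # An empty result means the account did not exceed the threshold,
    # not that it transacted in zero countries.
    country_summary = str(results[0]["country_count"]) if results else "2 or fewer"

    cot.log_step(
        agent_id="fraud-agent-v1",
        session_id=session_id,
        step_type=StepType.TOOL_CALL,
        content=(
            f"Velocity check for account {account_id} over {window_hours}h window. "
            f"Distinct countries: {country_summary}"
        ),
        metadata={"query": "geo_velocity", "account_id": account_id},
    )

    return results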
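
To see what the two scans return for the seed data without a HatiData connection, the same logic can be run against an in-memory SQLite database. This is a sketch under stated assumptions: SQLite stands in for the data layer, only the columns the queries need are created, the time-window filter is omitted because every seed row is recent, and ARRAY_AGG is dropped because SQLite does not provide it. The account id is bound with a ? placeholder instead of being formatted into the SQL text.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE accounts (account_id TEXT PRIMARY KEY, home_country TEXT NOT NULL);
    CREATE TABLE transactions (
        txn_id TEXT PRIMARY KEY,
        account_id TEXT NOT NULL,
        amount_usd NUMERIC NOT NULL,
        country_code TEXT NOT NULL
    );
""")
conn.executemany(
    "INSERT INTO accounts VALUES (?, ?)",
    [("acct-42", "US"), ("acct-77", "GB"), ("acct-99", "US")],
)
conn.executemany(
    "INSERT INTO transactions VALUES (?, ?, ?, ?)",
    [
        ("txn-001", "acct-42", 29.99, "US"),
        ("txn-002", "acct-42", 14500.00, "RU"),
        ("txn-003", "acct-99", 9.99, "US"),
        ("txn-004", "acct-42", 8750.00, "NG"),
        ("txn-005", "acct-77", 199.00, "GB"),
        ("txn-006", "acct-42", 350.00, "CN"),
    ],
)

# Scan 1: amount above 5000 and transaction country differs from home country.
high_value_foreign = conn.execute("""
    SELECT t.txn_id, t.amount_usd, t.country_code, a.home_country
    FROM transactions t
    JOIN accounts a ON t.account_id = a.account_id
    WHERE t.amount_usd > 5000
      AND t.country_code <> a.home_country
    ORDER BY t.amount_usd DESC
""").fetchall()

# Scan 2: more than 2 distinct countries for one account (parameterized).
geo_velocity = conn.execute("""
    SELECT account_id, COUNT(DISTINCT country_code) AS country_count
    FROM transactions
    WHERE account_id = ?
    GROUP BY account_id
    HAVING COUNT(DISTINCT country_code) > 2
""", ("acct-42",)).fetchall()

print([row[0] for row in high_value_foreign])  # ['txn-002', 'txn-004']
print(geo_velocity)                            # [('acct-42', 4)]
```

Both findings belong to acct-42: two large foreign transactions (RU and NG) and activity in four countries (US, RU, NG, CN), which is why that account appears in every finding of the scan in Step 4.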

Step 4: Log All Reasoning to the CoT Ledger

Wrap the full detection logic in a function that records every reasoning step. Regulators often require that automated decisions be explainable — the CoT ledger provides that evidence.

def run_fraud_scan() -> dict:
    """
    Execute a full fraud scan cycle with complete chain-of-thought logging.
    Returns a summary of findings.
    """
    session_id = f"fraud-scan-{uuid.uuid4().hex[:8]}"
    findings = []

    cot.log_step(
        agent_id="fraud-agent-v1",
        session_id=session_id,
        step_type=StepType.OBSERVATION,
        content="Starting scheduled fraud scan cycle.",
        metadata={"scan_type": "full", "triggered_by": "scheduler"},
    )

    # --- High-value foreign transactions ---
    cot.log_step(
        agent_id="fraud-agent-v1",
        session_id=session_id,
        step_type=StepType.REASONING,
        content="Checking for high-value transactions in countries that differ from the account home country.",
    )

    foreign_txns = scan_high_value_foreign(session_id)

    for txn in foreign_txns:
        risk_level = "critical" if txn["amount_usd"] > 10000 else "high"

        cot.log_step(
            agent_id="fraud-agent-v1",
            session_id=session_id,
            step_type=StepType.REASONING,
            content=(
                f"Transaction {txn['txn_id']}: ${txn['amount_usd']:,.2f} at {txn['merchant']} "
                f"in {txn['txn_country']} (home: {txn['home_country']}). Risk: {risk_level}."
            ),
            metadata={"txn_id": txn["txn_id"], "risk_level": risk_level},
        )

        findings.append({
            "type": "high_value_foreign",
            "txn_id": txn["txn_id"],
            "account_id": txn["account_id"],
            "amount_usd": float(txn["amount_usd"]),
            "risk_level": risk_level,
        })

    # --- Geographic velocity check on flagged accounts ---
    flagged_accounts = {f["account_id"] for f in findings}

    for account_id in flagged_accounts:
        velocity_hits = scan_geographic_velocity(session_id, account_id, window_hours=48)

        if velocity_hits:
            hit = velocity_hits[0]
            cot.log_step(
                agent_id="fraud-agent-v1",
                session_id=session_id,
                step_type=StepType.REASONING,
                content=(
                    f"Account {account_id} transacted in {hit['country_count']} countries "
                    f"within 48 hours: {hit['countries']}. Geographic anomaly confirmed."
                ),
                metadata={"account_id": account_id, "countries": hit["countries"]},
            )

            findings.append({
                "type": "geo_anomaly",
                "account_id": account_id,
                "country_count": hit["country_count"],
                "countries": hit["countries"],
                "risk_level": "critical",
            })

    # --- Conclusion ---
    critical_count = sum(1 for f in findings if f.get("risk_level") == "critical")
    cot.log_step(
        agent_id="fraud-agent-v1",
        session_id=session_id,
        step_type=StepType.CONCLUSION,
        content=f"Scan complete. {len(findings)} findings. Critical: {critical_count}.",
        metadata={"session_id": session_id, "finding_count": len(findings)},
    )

    return {"session_id": session_id, "findings": findings}


result = run_fraud_scan()
print(f"\nScan session: {result['session_id']}")
print(f"Findings: {len(result['findings'])}")
for f in result["findings"]:
    print(f" [{f['risk_level'].upper()}] {f['type']} - account {f['account_id']}")
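
The ledger is described as hash-chained, which is what makes after-the-fact edits detectable. The tutorial does not specify HatiData's hashing scheme, so the following is a generic sketch of the technique rather than the ledger's actual format: each entry stores the previous entry's hash, and its own hash covers that value plus a canonical JSON form of the step. All names here (`GENESIS_HASH`, `entry_hash`, `append_step`, `chain_is_valid`) are hypothetical.

```python
import hashlib
import json

GENESIS_HASH = "0" * 64  # placeholder "previous hash" for the first entry


def entry_hash(prev_hash, step):
    """SHA-256 over the previous hash plus a canonical JSON encoding of the step."""
    payload = json.dumps(step, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + payload).encode("utf-8")).hexdigest()


def append_step(ledger, step):
    """Append a step, linking it to the hash of the entry before it."""
    prev_hash = ledger[-1]["hash"] if ledger else GENESIS_HASH
    ledger.append({
        "step": step,
        "prev_hash": prev_hash,
        "hash": entry_hash(prev_hash, step),
    })


def chain_is_valid(ledger):
    """Recompute every hash in order; any edited or reordered entry breaks the chain."""
    prev_hash = GENESIS_HASH
    for entry in ledger:
        if entry["prev_hash"] != prev_hash:
            return False
        if entry["hash"] != entry_hash(prev_hash, entry["step"]):
            return False
        prev_hash = entry["hash"]
    return True
```

Because every hash depends on all earlier entries, changing the content of one reasoning step invalidates that entry and everything after it, which is the property an auditor relies on when a trace reports its chain as valid.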

Step 5: Set Up Webhook Alerts

When a semantic trigger fires, HatiData sends a signed POST request to your webhook URL. Verify the HMAC-SHA256 signature before processing.

import hashlib
import hmac
import json
import os

from flask import Flask, request, abort

app = Flask(__name__)
# Fail fast if the secret is missing rather than falling back to a guessable default.
WEBHOOK_SECRET = os.environ["HATIDATA_WEBHOOK_SECRET"]


@app.route("/webhooks/fraud", methods=["POST"])
def handle_fraud_alert():
    # Verify the signature over the raw request body
    signature = request.headers.get("X-HatiData-Signature", "")
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        request.data,
        hashlib.sha256,
    ).hexdigest()

    # Compare as bytes: compare_digest rejects non-ASCII str input
    if not hmac.compare_digest(f"sha256={expected}".encode(), signature.encode()):
        abort(403)

    # silent=True returns None instead of raising on a malformed or non-JSON body
    payload = request.get_json(silent=True) or {}
    trigger_name = payload.get("trigger_name")
    agent_id = payload.get("agent_id")
    context = payload.get("context", {})

    print(f"[ALERT] Trigger '{trigger_name}' fired for agent '{agent_id}'")
    print(f" Context: {json.dumps(context, indent=2)}")

    # Route to the appropriate on-call queue. The triggers registered in
    # Step 2 use the severities "medium" and "high", so page on "high" too.
    if context.get("severity") in ("high", "critical"):
        notify_on_call(payload)
    else:
        create_ops_ticket(payload)

    return {"status": "received"}, 200


def notify_on_call(payload: dict):
    """Placeholder: page the on-call fraud analyst."""
    print(f" => Paging on-call analyst for: {payload.get('trigger_name')}")


def create_ops_ticket(payload: dict):
    """Placeholder: create a ticket in the fraud ops queue."""
    print(f" => Creating ops ticket for: {payload.get('trigger_name')}")
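
To exercise the handler without waiting for a real trigger, you can produce a signature the same way the handler expects it: HMAC-SHA256 over the raw body, hex-encoded, with a sha256= prefix. The sketch below uses only the standard library; `sign_body` and `signature_is_valid` are hypothetical helper names, and the payload fields are the ones the handler above reads.

```python
import hashlib
import hmac
import json


def sign_body(secret: str, body: bytes) -> str:
    """Return the header value the handler expects: 'sha256=' + hex HMAC-SHA256."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def signature_is_valid(secret: str, body: bytes, header_value: str) -> bool:
    """Constant-time comparison of the expected and received signatures."""
    expected = sign_body(secret, body).encode("utf-8")
    return hmac.compare_digest(expected, header_value.encode("utf-8"))


# Build a sample alert body and sign it with a throwaway test secret.
body = json.dumps({
    "trigger_name": "high-value-foreign-transaction",
    "agent_id": "fraud-agent-v1",
    "context": {"severity": "high"},
}).encode("utf-8")
header = sign_body("test-secret", body)
```

Sending `body` with `header` in the X-HatiData-Signature header should pass verification when the server's secret is also test-secret. The signature covers the exact bytes, so re-serializing the JSON before verifying (different key order or whitespace) would make a valid request fail.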

Test the webhook locally before deploying by using the HatiData trigger test utility:

test_result = triggers.test(
name="high-value-foreign-transaction",
agent_id="fraud-agent-v1",
sample_query="SELECT * FROM transactions WHERE amount_usd > 8000 AND country_code = 'NG'",
)
print(f"Test fire: {test_result['fired']} — score: {test_result['score']:.3f}")

Step 6: Replay Decisions for Audit

Regulators may require a full explanation of why a specific account was flagged. Replay any session to reconstruct the exact reasoning chain.

def audit_scan_session(session_id: str):
    """Replay and verify the full reasoning trace for a fraud scan session."""
    trace = cot.replay_session(
        agent_id="fraud-agent-v1",
        session_id=session_id,
    )

    print(f"\nAudit trace - session: {session_id}")
    print(f"Steps recorded : {len(trace.steps)}")
    print(f"Chain integrity: {'VALID' if trace.chain_valid else 'BROKEN - EVIDENCE MAY BE INADMISSIBLE'}")
    print()

    for i, step in enumerate(trace.steps, 1):
        print(f" {i:02d}. [{step.step_type:<12}] {step.timestamp}")
        print(f" {step.content[:140]}")
        if step.metadata:
            print(f" metadata: {step.metadata}")


audit_scan_session(result["session_id"])

Export the trace as a JSON report for submission to compliance teams:

import json  # needed here if the webhook handler lives in a separate file


def export_compliance_report(session_id: str, output_path: str):
    """Export a full CoT trace as a compliance-ready JSON file."""
    trace = cot.replay_session(agent_id="fraud-agent-v1", session_id=session_id)

    report = {
        "session_id": session_id,
        "agent_id": "fraud-agent-v1",
        "chain_valid": trace.chain_valid,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "steps": [
            {
                "index": i,
                "step_type": s.step_type,
                "timestamp": s.timestamp,
                "content": s.content,
                "hash": s.hash,
                "metadata": s.metadata,
            }
            for i, s in enumerate(trace.steps, 1)
        ],
    }

    # default=str serializes values json cannot handle natively, such as datetimes
    with open(output_path, "w") as f:
        json.dump(report, f, indent=2, default=str)

    print(f"Compliance report written to {output_path}")


export_compliance_report(result["session_id"], "/tmp/fraud-audit-report.json")
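
Once the report leaves the ledger as a plain JSON file, nothing in the file itself prevents later edits. One common safeguard, sketched here with the standard library only, is to record a SHA-256 digest of the exported file alongside the submission so that recipients can confirm they hold the same bytes. `file_sha256` is a hypothetical helper, not part of the SDK.

```python
import hashlib


def file_sha256(path, chunk_size=65536):
    """Return the hex SHA-256 digest of a file, reading it in fixed-size chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # iter() with a sentinel keeps calling f.read until it returns b"" (EOF)
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

For example, `file_sha256("/tmp/fraud-audit-report.json")` can be stored in the compliance ticket at export time and recomputed whenever the report is reviewed.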

What You Built

Capability                        | HatiData Feature
----------------------------------|------------------------------------------------
Detect suspicious query patterns  | Semantic triggers with TriggerClient.register()
Per-agent read-only access        | Scoped API keys with table-level restrictions
Structured anomaly detection      | Standard SQL against the transaction data layer
Immutable audit trail             | CotClient.log_step() with hash chaining
Webhook alerts with HMAC          | TriggerAction.WEBHOOK + signature verification
Compliance report export          | CotClient.replay_session() + JSON export
