# Multi-Agent Coordination

Patterns for building agent systems where multiple agents collaborate, hand off work, and share state through the HatiData agent data layer.
## Pattern 1: Shared Memory Between Agents

Agents in the same org can read from and write to a shared memory namespace while keeping their private memories isolated.
Architecture:

```
Agent A (writer)                     Agent B (reader)
      │                                    │
      │ store(agent_id="shared-kb")        │ search(agent_id="shared-kb")
      └──────────────┐      ┌──────────────┘
                     ▼      ▼
               _hatidata_memory
       (shared-knowledge-base namespace)
```
```python
import hatidata

SHARED_NS = "shared-knowledge-base"

# Agent A writes a discovery to the shared namespace
agent_a = hatidata.Client(api_key="hd_agent_a_key")
agent_a.memory.store(
    agent_id=SHARED_NS,
    content="Customer segment 'enterprise' has 42% higher LTV than mid-market.",
    metadata={"source": "agent-a", "topic": "customer-segments", "confidence": 0.91},
)

# Agent B reads the shared knowledge without knowing which agent wrote it
agent_b = hatidata.Client(api_key="hd_agent_b_key")
results = agent_b.memory.search(
    agent_id=SHARED_NS,
    query="Which customer segment has the highest lifetime value?",
    top_k=5,
)
for r in results:
    print(f"[{r.score:.2f}] {r.content}")
```
Considerations:

- Apply an ABAC write-restriction policy so only designated writer agents can populate the shared namespace.
- Run cleanup with a higher minimum importance threshold (e.g., 0.5) on shared stores to prevent noise accumulation.
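The two considerations above can be sketched together. This is a minimal in-memory stand-in, not the HatiData service: the writer allowlist approximates the ABAC write restriction, and `cleanup` approximates the importance-threshold sweep; all class and field names here are illustrative assumptions.

```python
from dataclasses import dataclass


@dataclass
class Memory:
    content: str
    source: str
    importance: float


class SharedNamespace:
    """In-memory sketch of a shared namespace with a writer allowlist."""

    def __init__(self, writers):
        self.writers = set(writers)  # agents allowed to write (ABAC-style)
        self.memories = []

    def store(self, agent, content, importance):
        if agent not in self.writers:
            raise PermissionError(f"{agent} is not a designated writer")
        self.memories.append(Memory(content, agent, importance))

    def cleanup(self, min_importance):
        # Drop low-importance entries to keep the shared store signal-dense
        self.memories = [m for m in self.memories if m.importance >= min_importance]


ns = SharedNamespace(writers={"agent-a"})
ns.store("agent-a", "enterprise LTV is 42% higher than mid-market", importance=0.91)
ns.store("agent-a", "scratch note", importance=0.2)
ns.cleanup(min_importance=0.5)  # only the high-importance memory survives
```

In a real deployment the allowlist lives in policy, not code, but the invariant is the same: readers are many, writers are few, and the cleanup threshold keeps the shared store from drifting toward noise.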
## Pattern 2: Trigger-Based Coordination

Agent A writes data; a semantic trigger automatically notifies Agent B to act on the new information, with no polling of the raw data required.
Architecture:

```
Agent A                   HatiData Trigger Engine                Agent B
   │                                │                               │
   │ store(content="...")           │                               │
   │───────────────────────────────►│                               │
   │                                │ cosine_similarity ≥ 0.85      │
   │                                │ AgentNotify inbox push        │
   │                                │──────────────────────────────►│
   │                                │                               │ poll_inbox()
   │                                │◄──────────────────────────────│
```
```python
import time

import hatidata

# Register the trigger once (e.g., at startup or via the dashboard)
coordinator = hatidata.Client(api_key="hd_coordinator_key")
coordinator.triggers.register(
    name="new-segment-analysis",
    description="Notify Agent B when Agent A stores customer segment insights",
    concept="customer segment lifetime value LTV churn analysis",
    threshold=0.85,
    action={
        "type": "agent_notify",
        "target_agent_id": "agent-b",
    },
    cooldown_seconds=60,
)

# Agent B polls its inbox for notifications
agent_b = hatidata.Client(api_key="hd_agent_b_key")

def agent_b_loop():
    while True:
        notifications = agent_b.inbox.poll(agent_id="agent-b", limit=10)
        for notif in notifications:
            handle_notification(notif)
        time.sleep(5)

def handle_notification(notif):
    # Retrieve the memory that triggered the notification
    mem = agent_b.memory.get(memory_id=notif.source_memory_id)
    print(f"Agent B received: {mem.content}")
    # ... act on the new information
```
Considerations:

- Set `cooldown_seconds` to avoid flooding Agent B's inbox during high-write bursts.
- Use the `webhook` action type instead of `agent_notify` for external systems that need real-time push.
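To make the threshold-plus-cooldown behavior concrete, here is a minimal sketch of the trigger logic itself. It is an assumption-laden stand-in for the real engine: Jaccard token overlap substitutes for embedding cosine similarity, and the class and method names are illustrative, not part of the HatiData API.

```python
import time


class SemanticTrigger:
    """Sketch: fire when stored text is similar enough to a concept,
    rate-limited by a cooldown window."""

    def __init__(self, concept, threshold, cooldown_seconds):
        self.concept = set(concept.lower().split())
        self.threshold = threshold
        self.cooldown = cooldown_seconds
        self.last_fired = float("-inf")
        self.inbox = []  # stand-in for the target agent's inbox

    def similarity(self, text):
        # Jaccard overlap as a cheap proxy for cosine similarity on embeddings
        words = set(text.lower().split())
        return len(words & self.concept) / len(words | self.concept)

    def on_store(self, content, now=None):
        now = time.monotonic() if now is None else now
        if self.similarity(content) < self.threshold:
            return False  # not semantically relevant
        if now - self.last_fired < self.cooldown:
            return False  # still cooling down; suppress the notification
        self.last_fired = now
        self.inbox.append(content)
        return True


trig = SemanticTrigger("customer segment lifetime value", threshold=0.3, cooldown_seconds=60)
trig.on_store("customer segment lifetime value analysis", now=0)    # fires
trig.on_store("customer segment lifetime value update", now=30)     # suppressed by cooldown
trig.on_store("customer segment lifetime value update", now=120)    # fires again
```

The cooldown is per-trigger here; a production engine would typically scope it per trigger-and-target pair so one noisy writer cannot starve notifications for unrelated concepts.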
## Pattern 3: Branch-Based Collaboration

Agents collaborate on experimental changes in isolated branches before promoting results to the main data layer.
Architecture:

```
Agent A (creator)         Agent B (reviewer)         Agent C (merger)
   │                            │                          │
   │ branch_create()            │                          │
   │ branch_query/write()       │                          │
   │                            │                          │
   │ branch_id → handoff ──────►│                          │
   │                            │ branch_query() (read)    │
   │                            │ approve/reject           │
   │                            │─────────────────────────►│
   │                            │                          │ branch_merge()
   │                            │                          │ (or branch_discard)
```
```python
import hatidata

# Agent A: create an experimental branch and write to it
agent_a = hatidata.Client(api_key="hd_agent_a_key")
branch = agent_a.branches.create(
    label="q1-forecast-experiment",
    description="Test updated revenue forecast model",
)
branch_id = branch.branch_id

# Write experimental results into the branch (does not affect main)
agent_a.branches.write(
    branch_id=branch_id,
    table="revenue_forecast",
    rows=[
        {"period": "2026-Q1", "model": "v2", "predicted_arr": 4_200_000},
    ],
)

# Hand off branch_id to Agent B for review
# (pass via shared memory, message queue, or direct API call)
agent_a.memory.store(
    agent_id="shared-knowledge-base",
    content=f"Branch {branch_id} ready for review: q1-forecast-experiment",
    metadata={"branch_id": branch_id, "status": "pending-review"},
)

# Agent B: review the branch
agent_b = hatidata.Client(api_key="hd_agent_b_key")
preview = agent_b.branches.query(
    branch_id=branch_id,
    sql="SELECT * FROM revenue_forecast WHERE model = 'v2'",
)
approved = validate_forecast(preview.rows)

# Agent C: merge or discard based on review outcome
agent_c = hatidata.Client(api_key="hd_agent_c_key")
if approved:
    agent_c.branches.merge(branch_id=branch_id, strategy="branch_wins")
else:
    agent_c.branches.discard(branch_id=branch_id)
```
Considerations:

- Branches are copy-on-write: creating a branch is near-zero cost until the first write.
- Set a branch TTL at the org level to automatically discard abandoned branches (see Cost Optimization).
- Use `strategy="manual"` in merge to surface conflicts for human review rather than auto-resolving.
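The copy-on-write and merge-strategy semantics above can be sketched in a few lines. This is an illustrative in-memory model under assumed semantics, not the HatiData implementation: the branch holds only a delta of its own writes and falls back to main for reads, which is why creation costs nothing until the first write.

```python
class Branch:
    """Copy-on-write sketch: the branch stores only its own writes (a delta)
    and falls back to main for everything else."""

    def __init__(self, main):
        self.main = main   # dict: key -> value (the shared "main" data)
        self.delta = {}    # branch-local writes only

    def write(self, key, value):
        self.delta[key] = value  # main is never touched by branch writes

    def read(self, key):
        # Branch writes shadow main; everything else reads through
        return self.delta.get(key, self.main.get(key))

    def merge(self, strategy="branch_wins"):
        conflicts = [
            k for k in self.delta if k in self.main and self.main[k] != self.delta[k]
        ]
        if strategy == "manual" and conflicts:
            return conflicts          # surface conflicts for human review
        self.main.update(self.delta)  # branch_wins: branch values overwrite main
        self.delta.clear()
        return []


main = {"2026-Q1": 4_000_000}
br = Branch(main)
br.write("2026-Q1", 4_200_000)   # experimental forecast, isolated from main
```

With `strategy="manual"`, conflicting keys are returned instead of merged; `branch_wins` resolves them by overwriting, mirroring the merge call in the recipe above.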
## Pattern 4: Agent Handoff via State Transfer

One agent completes a phase of work and passes its full execution state to a successor agent, which resumes without reprocessing.
Architecture:

```
Agent A (phase 1)                      Agent B (phase 2)
   │                                        │
   │ set_agent_state(key, value)            │
   │───────────────────────────────────────►│
   │                                        │ get_agent_state(key)
   │                                        │◄──────────────────────
```
```python
import hatidata

# Agent A completes phase 1 and saves its state
agent_a = hatidata.Client(api_key="hd_agent_a_key")
agent_a.state.set(
    agent_id="pipeline-agent-a",
    key="phase1_output",
    value={
        "processed_records": 15_420,
        "anomalies_detected": 3,
        "output_table": "cleaned_transactions_2026_q1",
        "completed_at": "2026-02-25T10:30:00Z",
    },
)

# Notify Agent B (via trigger or message queue)
# ...

# Agent B resumes from Agent A's saved state
agent_b = hatidata.Client(api_key="hd_agent_b_key")
phase1 = agent_b.state.get(agent_id="pipeline-agent-a", key="phase1_output")
print(f"Agent B resuming from: {phase1['output_table']}")
print(f"Anomalies to investigate: {phase1['anomalies_detected']}")

# Agent B processes the output table Agent A produced
agent_b.query(f"SELECT * FROM {phase1['output_table']} LIMIT 100")
Considerations:

- State values are durable and survive agent restarts, which makes them suitable for long-running pipelines.
- Use namespaced keys (e.g., `phase1_output`, `phase2_results`) to avoid key collisions in shared state.
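The namespacing advice above amounts to scoping every entry by `(agent_id, key)` rather than by key alone. A minimal sketch, using an in-memory dict as a stand-in for the durable store (names here are illustrative, not the HatiData API):

```python
class StateStore:
    """Sketch of a key-value state store with per-agent namespacing.
    Keys are scoped as (agent_id, key), so two pipelines writing the
    same key under different agent IDs cannot collide."""

    def __init__(self):
        self._data = {}

    def set(self, agent_id, key, value):
        self._data[(agent_id, key)] = value

    def get(self, agent_id, key, default=None):
        return self._data.get((agent_id, key), default)


store = StateStore()
store.set(
    "pipeline-agent-a",
    "phase1_output",
    {"output_table": "cleaned_transactions_2026_q1", "anomalies_detected": 3},
)

# The successor agent reads the predecessor's namespace explicitly
phase1 = store.get("pipeline-agent-a", "phase1_output")
```

A durable service would add persistence and versioning on top, but the collision-avoidance property comes entirely from the composite key.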
## Pattern 5: Supervisor Pattern

A supervisor agent monitors other agents' activity by querying the audit log and CoT ledger, escalating anomalies to a human operator.
Architecture:
Agent A ─── writes ──► _hatidata_audit_log ◄─── queries ─── Supervisor Agent
Agent B ─── writes ──► _hatidata_cot ◄─── queries ─┘
Agent C ─── writes ──► │
│ alert on anomaly
Human Operator
```python
import time

import hatidata

supervisor = hatidata.Client(api_key="hd_supervisor_key")

ANOMALY_SQL = """
SELECT
    agent_id,
    COUNT(*) AS denied_queries,
    MAX(denied_at) AS last_denial
FROM _hatidata_policy_denials
WHERE denied_at >= NOW() - INTERVAL '1 hour'
GROUP BY agent_id
HAVING COUNT(*) > 10
ORDER BY denied_queries DESC;
"""

def supervision_loop():
    while True:
        anomalies = supervisor.query(ANOMALY_SQL).rows
        for row in anomalies:
            print(f"[ALERT] Agent {row['agent_id']} had {row['denied_queries']} "
                  f"denied queries in the last hour")
            escalate_to_operator(row)

        # Verify CoT integrity for active sessions
        active_sessions = supervisor.query(
            "SELECT DISTINCT session_id FROM _hatidata_cot "
            "WHERE logged_at >= NOW() - INTERVAL '1 hour'"
        ).rows
        for session in active_sessions:
            result = supervisor.cot.verify_chain(session_id=session["session_id"])
            if not result.chain_intact:
                escalate_chain_break(session["session_id"], result.broken_links)

        time.sleep(60)
```
Considerations:

- The supervisor agent should have read-only ABAC permissions: it should observe, not modify.
- Combine with trigger-based alerts (see the recipe in Governance) for real-time notification rather than polling.
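The denial-count check that `ANOMALY_SQL` performs is simple enough to state in plain Python, which also makes the threshold logic unit-testable without a database. A sketch, assuming denial records are dicts with `agent_id` and a numeric `denied_at` timestamp (field names match the SQL above; everything else is illustrative):

```python
from collections import Counter


def find_anomalies(denials, window_start, max_denials=10):
    """Mirror of ANOMALY_SQL: count policy denials per agent within the
    window and flag agents that exceed the threshold, most-denied first."""
    counts = Counter(
        d["agent_id"] for d in denials if d["denied_at"] >= window_start
    )
    return sorted(
        ((agent, n) for agent, n in counts.items() if n > max_denials),
        key=lambda pair: -pair[1],
    )


# agent-c trips the threshold with 12 denials; agent-b stays under it
denials = [{"agent_id": "agent-c", "denied_at": 100 + i} for i in range(12)]
denials += [{"agent_id": "agent-b", "denied_at": 105}]
alerts = find_anomalies(denials, window_start=100)
```

Keeping the threshold (`> 10` per hour) in one place, whether in SQL or code, makes it easy to tune as the fleet grows without rewriting the supervision loop.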