
Multi-Agent Coordination

Patterns for building agent systems where multiple agents collaborate, hand off work, and share state through the HatiData agent data layer.


Pattern 1: Shared Memory Between Agents

Agents in the same org can read from and write to a shared memory namespace while keeping their private memories isolated.

Architecture:

Agent A (writer)                     Agent B (reader)
      │                                    │
      │ store(agent_id="shared-kb")        │ search(agent_id="shared-kb")
      └──────────────┐        ┌────────────┘
                     ▼        ▼
               _hatidata_memory
      (shared-knowledge-base namespace)

import hatidata

SHARED_NS = "shared-knowledge-base"

# Agent A writes a discovery to the shared namespace
agent_a = hatidata.Client(api_key="hd_agent_a_key")
agent_a.memory.store(
    agent_id=SHARED_NS,
    content="Customer segment 'enterprise' has 42% higher LTV than mid-market.",
    metadata={"source": "agent-a", "topic": "customer-segments", "confidence": 0.91},
)

# Agent B reads the shared knowledge without knowing which agent wrote it
agent_b = hatidata.Client(api_key="hd_agent_b_key")
results = agent_b.memory.search(
    agent_id=SHARED_NS,
    query="Which customer segment has the highest lifetime value?",
    top_k=5,
)
for r in results:
    print(f"[{r.score:.2f}] {r.content}")

Considerations:

  • Apply an ABAC write-restriction policy so only designated writer agents can populate the shared namespace.
  • Run cleanup with a higher minimum importance threshold (e.g., 0.5) on shared stores to prevent noise accumulation.
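The first consideration can be sketched in plain Python. This is an illustration of the write-restriction rule only; the allowlist shape is hypothetical and is not the HatiData ABAC policy API:

```python
# Illustrative only: model the "designated writers" rule for a shared
# namespace as a simple allowlist check.
SHARED_NS = "shared-knowledge-base"
DESIGNATED_WRITERS = {"agent-a"}  # hypothetical set of writer agent IDs

def can_write(agent_id: str, namespace: str) -> bool:
    """Allow writes to the shared namespace only for designated writers."""
    if namespace != SHARED_NS:
        return True  # private namespaces are unrestricted in this sketch
    return agent_id in DESIGNATED_WRITERS
```

In a real deployment this decision would live in an ABAC policy evaluated server-side, so a reader agent cannot bypass it from the client.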

Pattern 2: Trigger-Based Coordination

Agent A writes data; a semantic trigger automatically notifies Agent B to act on the new information — no polling required.

Architecture:

Agent A                 HatiData Trigger Engine                Agent B
   │                            │                                 │
   │ store(content="...")       │                                 │
   │───────────────────────────►│                                 │
   │                            │ cosine_similarity ≥ 0.85        │
   │                            │────────────────────────────────►│
   │                            │   AgentNotify inbox push        │
   │                            │                                 │ poll_inbox()
   │                            │                                 │◄──────────

# Register the trigger once (e.g., at startup or via the dashboard)
coordinator = hatidata.Client(api_key="hd_coordinator_key")
coordinator.triggers.register(
    name="new-segment-analysis",
    description="Notify Agent B when Agent A stores customer segment insights",
    concept="customer segment lifetime value LTV churn analysis",
    threshold=0.85,
    action={
        "type": "agent_notify",
        "target_agent_id": "agent-b",
    },
    cooldown_seconds=60,
)

# Agent B polls its inbox for notifications
import time

agent_b = hatidata.Client(api_key="hd_agent_b_key")

def agent_b_loop():
    while True:
        notifications = agent_b.inbox.poll(agent_id="agent-b", limit=10)
        for notif in notifications:
            handle_notification(notif)
        time.sleep(5)

def handle_notification(notif):
    # Retrieve the memory that triggered the notification
    mem = agent_b.memory.get(memory_id=notif.source_memory_id)
    print(f"Agent B received: {mem.content}")
    # ... act on the new information

Considerations:

  • Set cooldown_seconds to avoid flooding Agent B's inbox during high-write bursts.
  • Use the webhook action type instead of agent_notify for external systems that need real-time push.
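The cooldown_seconds behavior can be modeled in a few lines of plain Python. This is a sketch of the suppression logic only, not how the trigger engine implements it; the clock is injectable so the logic is easy to test:

```python
import time

class Cooldown:
    """Suppress repeat firings of a trigger within a cooldown window."""

    def __init__(self, cooldown_seconds: float, clock=time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self._last_fired = None  # timestamp of the last allowed firing

    def should_fire(self) -> bool:
        """Return True and start a new window if the previous one has elapsed."""
        now = self.clock()
        if self._last_fired is not None and now - self._last_fired < self.cooldown_seconds:
            return False
        self._last_fired = now
        return True
```

A 60-second cooldown means a burst of matching writes produces at most one inbox notification per minute, which keeps Agent B's poll loop cheap.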

Pattern 3: Branch-Based Collaboration

Agents collaborate on experimental changes in isolated branches before promoting results to the main data layer.

Architecture:

Agent A (creator)          Agent B (reviewer)         Agent C (merger)
      │                          │                          │
      │ branch_create()          │                          │
      │ branch_query/write()     │                          │
      │                          │                          │
      │ branch_id → handoff ────►│                          │
      │                          │ branch_query() (read)    │
      │                          │ approve/reject           │
      │                          │─────────────────────────►│
      │                          │                          │ branch_merge()
      │                          │                          │ (or branch_discard)

# Agent A: create an experimental branch and write to it
agent_a = hatidata.Client(api_key="hd_agent_a_key")

branch = agent_a.branches.create(
    label="q1-forecast-experiment",
    description="Test updated revenue forecast model",
)
branch_id = branch.branch_id

# Write experimental results into the branch (does not affect main)
agent_a.branches.write(
    branch_id=branch_id,
    table="revenue_forecast",
    rows=[
        {"period": "2026-Q1", "model": "v2", "predicted_arr": 4_200_000},
    ],
)

# Hand off branch_id to Agent B for review
# (pass via shared memory, message queue, or direct API call)
agent_a.memory.store(
    agent_id="shared-knowledge-base",
    content=f"Branch {branch_id} ready for review: q1-forecast-experiment",
    metadata={"branch_id": branch_id, "status": "pending-review"},
)

# Agent B: review the branch
agent_b = hatidata.Client(api_key="hd_agent_b_key")
preview = agent_b.branches.query(
    branch_id=branch_id,
    sql="SELECT * FROM revenue_forecast WHERE model = 'v2'",
)
approved = validate_forecast(preview.rows)

# Agent C: merge or discard based on review outcome
agent_c = hatidata.Client(api_key="hd_agent_c_key")
if approved:
    agent_c.branches.merge(branch_id=branch_id, strategy="branch_wins")
else:
    agent_c.branches.discard(branch_id=branch_id)

Considerations:

  • Branches are copy-on-write: creating a branch is near-zero cost until the first write.
  • Set a branch TTL at the org level to automatically discard abandoned branches (see Cost Optimization).
  • Use strategy="manual" in merge to surface conflicts for human review rather than auto-resolving.
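The review workflow above is easy to get wrong when several agents touch the same branch (e.g., merging a branch nobody approved). A minimal sketch of the lifecycle as a state machine, with status names matching the "pending-review" metadata used in the example (the transition table itself is an illustration, not part of the HatiData API):

```python
# Legal branch-status transitions for the creator → reviewer → merger flow.
VALID_TRANSITIONS = {
    "pending-review": {"approved", "rejected"},
    "approved": {"merged"},
    "rejected": {"discarded"},
}

def advance(status: str, next_status: str) -> str:
    """Move a branch to next_status, rejecting out-of-order transitions."""
    if next_status not in VALID_TRANSITIONS.get(status, set()):
        raise ValueError(f"illegal transition: {status} -> {next_status}")
    return next_status
```

Enforcing this in the handoff metadata means Agent C can refuse to call branches.merge unless the status it reads back is "approved".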

Pattern 4: Agent Handoff via State Transfer

One agent completes a phase of work and passes its full execution state to a successor agent, which resumes without reprocessing.

Architecture:

Agent A (phase 1)                    Agent B (phase 2)
      │                                    │
      │ set_agent_state(key, value)        │
      │───────────────────────────────────►│
      │                                    │ get_agent_state(key)
      │                                    │◄─────────────────────

# Agent A completes phase 1 and saves its state
agent_a = hatidata.Client(api_key="hd_agent_a_key")

agent_a.state.set(
    agent_id="pipeline-agent-a",
    key="phase1_output",
    value={
        "processed_records": 15_420,
        "anomalies_detected": 3,
        "output_table": "cleaned_transactions_2026_q1",
        "completed_at": "2026-02-25T10:30:00Z",
    },
)

# Notify Agent B (via trigger or message queue)
# ...

# Agent B resumes from Agent A's saved state
agent_b = hatidata.Client(api_key="hd_agent_b_key")

phase1 = agent_b.state.get(agent_id="pipeline-agent-a", key="phase1_output")
print(f"Agent B resuming from: {phase1['output_table']}")
print(f"Anomalies to investigate: {phase1['anomalies_detected']}")

# Agent B processes the output table Agent A produced
agent_b.query(f"SELECT * FROM {phase1['output_table']} LIMIT 100")

Considerations:

  • State values are durable and survive agent restarts — suitable for long-running pipelines.
  • Use namespaced keys (e.g., phase1_output, phase2_results) to avoid key collisions in shared state.
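One convention for namespaced keys, sketched in plain Python (the delimiter and helper names are a suggestion, not a HatiData requirement; the example above uses flat keys like phase1_output, which also works at small scale):

```python
def state_key(phase: str, name: str) -> str:
    """Build a collision-resistant state key such as 'phase1:output'."""
    if ":" in phase or ":" in name:
        raise ValueError("phase and name must not contain ':'")
    return f"{phase}:{name}"

def parse_state_key(key: str) -> tuple:
    """Split a namespaced key back into (phase, name)."""
    phase, _, name = key.partition(":")
    return (phase, name)
```

A structured key makes it cheap to list or expire all state for one phase without touching another phase's entries.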

Pattern 5: Supervisor Pattern

A supervisor agent monitors other agents' activity by querying the audit log and CoT ledger, escalating anomalies to a human operator.

Architecture:

Agent A ─── writes ──► _hatidata_audit_log ◄─── queries ─── Supervisor Agent
Agent B ─── writes ──► _hatidata_cot       ◄─── queries ─────────┘
Agent C ─── writes ──►                                           │
                                                                 │ alert on anomaly
                                                                 ▼
                                                          Human Operator

import time

supervisor = hatidata.Client(api_key="hd_supervisor_key")

ANOMALY_SQL = """
SELECT
    agent_id,
    COUNT(*) AS denied_queries,
    MAX(denied_at) AS last_denial
FROM _hatidata_policy_denials
WHERE denied_at >= NOW() - INTERVAL '1 hour'
GROUP BY agent_id
HAVING COUNT(*) > 10
ORDER BY denied_queries DESC;
"""

def supervision_loop():
    while True:
        anomalies = supervisor.query(ANOMALY_SQL).rows
        for row in anomalies:
            print(f"[ALERT] Agent {row['agent_id']} had {row['denied_queries']} "
                  f"denied queries in the last hour")
            escalate_to_operator(row)

        # Verify CoT integrity for active sessions
        active_sessions = supervisor.query(
            "SELECT DISTINCT session_id FROM _hatidata_cot "
            "WHERE logged_at >= NOW() - INTERVAL '1 hour'"
        ).rows
        for session in active_sessions:
            result = supervisor.cot.verify_chain(session_id=session["session_id"])
            if not result.chain_intact:
                escalate_chain_break(session["session_id"], result.broken_links)

        time.sleep(60)

Considerations:

  • The supervisor agent should have read-only ABAC permissions — it should observe, not modify.
  • Combine with trigger-based alerts (Recipe in Governance) for real-time notification rather than polling.
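The grouping logic in ANOMALY_SQL (GROUP BY agent_id, HAVING COUNT(*) > 10) can be expressed in plain Python for unit-testing the escalation path without a live database. This is an equivalent sketch over in-memory rows, not part of the HatiData client:

```python
from collections import Counter

def find_anomalous_agents(denials, threshold=10):
    """Count policy denials per agent and flag agents over the threshold.

    `denials` is a list of rows like {"agent_id": "agent-b"}; the return
    value mirrors the SQL output, sorted by denied_queries descending.
    """
    counts = Counter(d["agent_id"] for d in denials)
    flagged = [(agent, n) for agent, n in counts.items() if n > threshold]
    return sorted(flagged, key=lambda pair: pair[1], reverse=True)
```

Keeping the threshold logic testable in isolation makes it safer to tune (e.g., lowering the cutoff for high-privilege agents) before changing the production SQL.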
