Agent Identity Recipes
Practical recipes for managing agent identities, API keys, and access control policies in HatiData.
Recipe 1: Create Agent Keys with Scoped Permissions
Every agent should have its own API key with the minimum required permissions. Never share keys across agents.
from hatidata import HatiDataClient
# Admin client used to mint and manage the per-agent keys in these recipes.
admin = HatiDataClient(host="localhost", port=5439, api_key="hd_live_admin_key")

# Reporting agent: read-only scope, restricted to the tables it reports on.
reporting_key = admin.keys.create(
    name="reporting-agent",
    description="Read-only access for the weekly report generator",
    scope="read_only",
    allowed_tables=["orders", "customers", "products"],
    rate_limit=100,  # queries per minute
    expires_in_days=90,
)
print(f"Reporting key: {reporting_key.api_key}")

# Data pipeline agent: admin scope, with a shorter expiry than the reporting key.
pipeline_key = admin.keys.create(
    name="pipeline-agent",
    description="Full access for the ETL pipeline",
    scope="admin",
    rate_limit=500,
    expires_in_days=30,
)
print(f"Pipeline key: {pipeline_key.api_key}")
Key Scopes
| Scope | Permissions |
|---|---|
| `read_only` | SELECT queries only |
| `admin` | SELECT, INSERT, UPDATE, DELETE, CREATE TABLE |
Recipe 2: RBAC Role Assignment
Assign roles to agents for granular access control:
# Define an "analyst" role that grants SELECT on each listed table.
admin.roles.create(
    name="analyst",
    permissions=[
        {"action": "SELECT", "resource": table}
        for table in ("orders", "customers", "_hatidata_agent_memory")
    ],
)

# Attach the role to the reporting agent's key.
admin.keys.assign_role(key_id=reporting_key.key_id, role="analyst")

# Connect with the agent's own key: reads on `orders` go through...
analyst = HatiDataClient(
    host="localhost",
    port=5439,
    api_key=reporting_key.api_key,
)
rows = analyst.query("SELECT COUNT(*) FROM orders")

# ...while a write attempt is expected to be rejected.
try:
    analyst.execute("DELETE FROM orders WHERE id = 1")
except Exception as exc:
    print(f"Blocked: {exc}")  # Permission denied
Recipe 3: Key Rotation
Rotate keys without downtime using the overlap window pattern:
import time
def rotate_key(admin_client, old_key_id: str, agent_name: str) -> str:
    """Create a replacement key for an agent and return its API key.

    The new key copies the old key's scope, rate limit and role assignments.
    The old key is NOT revoked here: both keys stay valid until the caller
    revokes the old one, and that gap is the overlap window (24 hours is the
    recommended length).

    Args:
        admin_client: Client exposing ``keys.get``, ``keys.create``,
            ``keys.list_roles`` and ``keys.assign_role``.
        old_key_id: Identifier of the key being replaced.
        agent_name: Agent name, used in the new key's name and description.

    Returns:
        The ``api_key`` value of the newly created key.
    """
    # Fetch the old key once so scope and rate limit come from the same
    # snapshot (and we avoid a second round-trip).
    old_key = admin_client.keys.get(old_key_id)

    # Step 1: Create the new key with the old key's scope and rate limit.
    # NOTE(review): other settings of the old key (e.g. a table allowlist or
    # attached policies) are not copied here -- confirm whether they should be.
    new_key = admin_client.keys.create(
        name=f"{agent_name}-rotated",
        scope=old_key.scope,
        description=f"Rotated key for {agent_name}",
        rate_limit=old_key.rate_limit,
        expires_in_days=90,
    )
    print(f"New key created: {new_key.key_id}")

    # Step 2: Copy role assignments from the old key to the new one.
    for role in admin_client.keys.list_roles(old_key_id):
        admin_client.keys.assign_role(key_id=new_key.key_id, role=role.name)

    # Step 3 (caller): deploy new_key.api_key to the agent runtime.
    # Step 4 (caller): once the agent is confirmed to use the new key, revoke
    # the old one with admin_client.keys.revoke(old_key_id).
    return new_key.api_key
Always maintain a 24-hour overlap window where both old and new keys are valid. This prevents downtime during deployment rollouts.
Recipe 4: Multi-Tenant Agent Isolation
Isolate agents by organization so that one tenant's agent cannot access another tenant's data:
# Each organization gets its own namespace
def create_tenant_agent(org_id: str, agent_name: str):
    """Create a read-only agent key for one organization plus its row filter.

    The key is created first, then a ``row_filter`` policy restricting it to
    rows whose ``org_id`` matches. If the policy cannot be created, the key is
    revoked again so that no key is left active without its row filter.

    Args:
        org_id: Organization identifier; used in the key name, as the key's
            ``org_id`` and as the value the row filter compares against.
        agent_name: Name of the agent within the organization.

    Returns:
        The key object returned by ``admin.keys.create``.

    Raises:
        Exception: Whatever ``admin.policies.create`` raised, re-raised after
            the new key has been revoked.
    """
    key = admin.keys.create(
        name=f"{org_id}/{agent_name}",
        scope="read_only",
        org_id=org_id,
        description=f"Agent for org {org_id}",
    )

    # Double any embedded single quote (standard SQL string-literal escaping)
    # so org_id cannot terminate the literal and alter the filter condition.
    org_literal = org_id.replace("'", "''")

    # Apply row-level security: agent can only see rows for its org
    try:
        admin.policies.create(
            name=f"rls-{org_id}-{agent_name}",
            key_id=key.key_id,
            type="row_filter",
            condition=f"org_id = '{org_literal}'",
            tables=["orders", "customers", "events"],
        )
    except Exception:
        # Without the filter this key is not isolated to its tenant; take it
        # out of service before surfacing the error.
        admin.keys.revoke(
            key_id=key.key_id,
            reason="Row-level security policy could not be created",
        )
        raise
    return key


# Tenant A's agent can only see Tenant A's data
tenant_a_key = create_tenant_agent("org_acme", "support-agent")
# Tenant B's agent can only see Tenant B's data
tenant_b_key = create_tenant_agent("org_beta", "support-agent")
Column Masking
Mask sensitive columns for certain agents:
# Column-mask policy bound to the reporting key: `customers.email` is passed
# through the sha256 mask function.
admin.policies.create(
    name="mask-email-for-analytics",
    type="column_mask",
    key_id=reporting_key.key_id,
    tables=["customers"],
    column="email",
    mask_function="sha256",  # Hash the email column
)

# Query as the reporting agent to see the masked output.
analyst = HatiDataClient(
    host="localhost",
    port=5439,
    api_key=reporting_key.api_key,
)
rows = analyst.query("SELECT name, email FROM customers LIMIT 3")
# email column contains cryptographic hashes
Recipe 5: Audit Logging for Agent Actions
Query the audit log to track what each agent has done:
-- Activity of one agent over the past 24 hours, newest first
SELECT
    agent_id,
    query_text,
    rows_returned,
    credits_consumed,
    latency_ms,
    executed_at
FROM _hatidata_audit_log
WHERE agent_id = 'reporting-agent'
  AND executed_at > NOW() - INTERVAL '24 hours'
ORDER BY executed_at DESC;

-- The 20 busiest agents of the past 7 days, by number of queries
SELECT
    agent_id,
    COUNT(*)              AS query_count,
    SUM(credits_consumed) AS total_credits,
    AVG(latency_ms)       AS avg_latency_ms
FROM _hatidata_audit_log
WHERE executed_at > NOW() - INTERVAL '7 days'
GROUP BY agent_id
ORDER BY query_count DESC
LIMIT 20;

-- Unusual access patterns: agents with more than 100 queries in the past hour
SELECT
    agent_id,
    COUNT(DISTINCT query_text) AS unique_queries,
    COUNT(*)                   AS total_queries,
    MAX(rows_returned)         AS max_rows
FROM _hatidata_audit_log
WHERE executed_at > NOW() - INTERVAL '1 hour'
GROUP BY agent_id
HAVING COUNT(*) > 100
ORDER BY total_queries DESC;
Recipe 6: Emergency Key Revocation
Immediately revoke a compromised key:
# Revocation applies to new queries right away: queries already in flight
# with this key will complete, but no new queries will be accepted.
admin.keys.revoke(
    reason="Key exposed in public repository",
    key_id="key_compromised_abc",
)

# Print every key whose status is "revoked", with when and why it was revoked.
revoked = admin.keys.list(status="revoked")
for entry in revoked:
    print(f" {entry.name}: revoked at {entry.revoked_at}, reason: {entry.revoke_reason}")
Recipe 7: Agent Offboarding Without Memory Loss
When decommissioning an agent, preserve its accumulated knowledge by exporting memories to a shared namespace or transferring them to a replacement agent.
from hatidata import HatiDataClient
admin = HatiDataClient(host="localhost", port=5439, api_key="hd_live_admin_key")


def offboard_agent(
    old_agent_id: str,
    transfer_to: str = "shared-knowledge-base",
    archive: bool = True,
):
    """Offboard an agent: revoke its keys, copy its memories, then clean up.

    Steps, in order:
      1. Revoke every active key of ``old_agent_id``.
      2. List up to 10000 of the agent's memories.
      3. Store a copy of each under ``transfer_to``, tagging the metadata with
         the original agent id and a UTC transfer timestamp.
      4. If ``archive`` is true, copy the agent's rows from
         ``_hatidata_memory`` into ``_hatidata_memory_archive``.
      5. Delete the agent's original memories, unless the listing in step 2
         returned as many rows as the limit (some may not have been copied).

    Args:
        old_agent_id: Identifier of the agent being decommissioned.
        transfer_to: Agent id or shared namespace that receives the copies.
        archive: Whether to also write the memories to the archive table.
    """
    # Imported here because this snippet has no module-level datetime import.
    from datetime import datetime, timezone

    export_limit = 10000

    # Step 1: Revoke the agent's keys (blocks new queries immediately)
    for key in admin.keys.list(agent_id=old_agent_id, status="active"):
        admin.keys.revoke(key.key_id, reason=f"Agent {old_agent_id} offboarded")
        print(f"Revoked key {key.key_id}")

    # Step 2: Export all memories
    memories = admin.memory.list(agent_id=old_agent_id, limit=export_limit)
    print(f"Found {len(memories)} memories to transfer")

    # Step 3: Transfer memories to replacement agent or shared namespace.
    # One timezone-aware timestamp is shared by the whole batch.
    transferred_at = datetime.now(timezone.utc).isoformat()
    for mem in memories:
        admin.memory.store(
            agent_id=transfer_to,
            content=mem.content,
            metadata={
                **(mem.metadata or {}),  # tolerate memories without metadata
                "original_agent": old_agent_id,
                "transferred_at": transferred_at,
            },
        )

    # Step 4 (optional): Archive to a dedicated table for compliance
    if archive:
        # Double embedded single quotes so the id cannot break out of the
        # SQL string literal.
        agent_literal = old_agent_id.replace("'", "''")
        admin.execute(f"""
            INSERT INTO _hatidata_memory_archive
            SELECT *, '{agent_literal}' AS archived_from, NOW() AS archived_at
            FROM _hatidata_memory
            WHERE agent_id = '{agent_literal}'
        """)

    # Step 5: Clean up the original memories -- but only when the export was
    # not cut off. Presumably `limit` caps the listing, so a full page means
    # there may be memories that were never copied; deleting them would lose
    # data.
    if len(memories) >= export_limit:
        print(
            f"WARNING: export reached the limit of {export_limit} memories; "
            f"original memories of {old_agent_id} were not cleaned up because "
            "some may not have been transferred."
        )
    else:
        admin.memory.cleanup(agent_id=old_agent_id, max_age_days=0)

    print(f"Agent {old_agent_id} offboarded. Memories transferred to {transfer_to}.")


# Usage
offboard_agent("legacy-support-agent", transfer_to="support-agent-v2")
Recipe 8: Built-in Key Rotation with Grace Period
Use the control plane's built-in rotation endpoint for zero-downtime key rotation with a 72-hour grace period where both old and new keys are valid.
import requests
# Base URL that the rotation endpoints below are appended to.
CP_URL = "https://cp.hatidata.com"
# Sent verbatim as the Authorization header. The value shown is a truncated
# placeholder; supply a real admin JWT from your secret store instead of
# committing it to source.
ADMIN_TOKEN = "Bearer eyJ..." # Admin JWT
def rotate_with_grace_period(
    key_id: str,
    grace_period_hours: int = 72,
    reason: str = "Scheduled quarterly rotation",
    timeout: float = 30.0,
) -> dict:
    """Rotate an API key using the control plane's built-in rotation.

    Args:
        key_id: Identifier of the key to rotate.
        grace_period_hours: Hours during which both old and new keys are valid.
        reason: Free-text reason sent with the rotation request.
        timeout: Seconds to wait for the control plane before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The decoded JSON response; it is read here for ``new_key.api_key``,
        ``old_key_expires_at`` and ``rotation_id``.

    Raises:
        requests.HTTPError: If the control plane answers with an error status.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
    """
    # Step 1: Trigger rotation (creates new key, old key enters grace period)
    resp = requests.post(
        f"{CP_URL}/v1/api-keys/{key_id}/rotate",
        headers={"Authorization": ADMIN_TOKEN},
        json={
            "grace_period_hours": grace_period_hours,
            "reason": reason,
        },
        timeout=timeout,
    )
    resp.raise_for_status()

    rotation = resp.json()
    print(f"New key: {rotation['new_key']['api_key']}")
    print(f"Old key valid until: {rotation['old_key_expires_at']}")
    print(f"Rotation ID: {rotation['rotation_id']}")
    return rotation
def check_rotation_status(rotation_id: str, timeout: float = 30.0) -> dict:
    """Fetch and print the state of a key rotation.

    Use it to verify that the new key is in use before the grace period ends.

    Args:
        rotation_id: Identifier returned when the rotation was triggered.
        timeout: Seconds to wait for the control plane before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The decoded JSON status document.

    Raises:
        requests.HTTPError: If the control plane answers with an error status.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
    """
    resp = requests.get(
        f"{CP_URL}/v1/api-keys/rotations/{rotation_id}",
        headers={"Authorization": ADMIN_TOKEN},
        timeout=timeout,
    )
    resp.raise_for_status()

    status = resp.json()
    print(f"Status: {status['status']}")  # pending, active, completed, expired
    print(f"New key last used: {status['new_key_last_used_at']}")
    print(f"Old key last used: {status['old_key_last_used_at']}")
    print(f"Grace period ends: {status['grace_period_ends_at']}")

    # The old key has been used but the new one never has: the agent has not
    # switched over yet.
    if status["old_key_last_used_at"] and not status["new_key_last_used_at"]:
        print("WARNING: New key has not been used yet. Update agent config before grace period ends.")
    return status
# Usage
# Trigger the rotation and keep the response: its "rotation_id" is needed to
# poll the status afterwards.
rotation = rotate_with_grace_period("key_abc123")
# ... deploy new key to agent config ...
# Prints the rotation status and warns if only the old key has been used.
check_rotation_status(rotation["rotation_id"])
The grace period workflow:
| Time | Old Key | New Key | Action Required |
|---|---|---|---|
| T+0 | Valid | Valid | Rotation triggered. Deploy new key to agent. |
| T+0 to T+72h | Valid (grace) | Valid | Both keys work. Monitor that new key is being used. |
| T+72h | Revoked | Valid | Grace period ends. Old key stops working automatically. |
If the new key has not been used before the grace period ends, the control plane sends a webhook notification (if configured) but still revokes the old key. Always verify the new key is active before the grace period expires.
Recipe 9: Multi-Tenant Agent Isolation at Scale
For SaaS platforms where each customer gets their own agent, use per-customer keys with table allowlists and quota enforcement via AgentKeyMetadata.
# Quotas per subscription plan. A value of 0 means "unlimited".
PLAN_TIERS = {
    "free": {
        "queries_per_hour": 100,
        "memory_writes_per_hour": 50,
        "max_tables": 5,
    },
    "pro": {
        "queries_per_hour": 10_000,
        "memory_writes_per_hour": 5_000,
        "max_tables": 50,
    },
    # 0 = unlimited
    "enterprise": {
        "queries_per_hour": 0,
        "memory_writes_per_hour": 0,
        "max_tables": 0,
    },
}
def provision_customer_agent(customer_id: str, plan: str) -> str:
    """Create a per-customer agent key with plan-appropriate quotas.

    The key is created first, then a ``row_filter`` policy restricting it to
    the customer's rows. If the policy cannot be created, the key is revoked
    again so that no key is left active without its row filter.

    Args:
        customer_id: Customer identifier; used in the key name, as the key's
            ``org_id``, in the table names and in the row filter.
        plan: Key into ``PLAN_TIERS``.

    Returns:
        The ``api_key`` value of the newly created key.

    Raises:
        KeyError: If ``plan`` is not a key of ``PLAN_TIERS``.
        Exception: Whatever ``admin.policies.create`` raised, re-raised after
            the new key has been revoked.
    """
    tier = PLAN_TIERS[plan]
    allowed_tables = [
        f"{customer_id}_orders",
        f"{customer_id}_customers",
        f"{customer_id}_events",
    ]

    # A tier value of 0 means "unlimited" and is passed to the API as None.
    # NOTE(review): Recipe 1 annotates rate_limit as queries per minute while
    # this passes the plan's per-hour quota unchanged -- confirm the unit.
    key = admin.keys.create(
        name=f"agent-{customer_id}",
        scope="read_only",
        org_id=customer_id,
        description=f"Auto-provisioned agent for {customer_id} ({plan} plan)",
        allowed_tables=allowed_tables if tier["max_tables"] > 0 else None,
        rate_limit=tier["queries_per_hour"] if tier["queries_per_hour"] > 0 else None,
        metadata={
            "plan": plan,
            "customer_id": customer_id,
            "memory_writes_per_hour": tier["memory_writes_per_hour"],
        },
    )

    # Double any embedded single quote (standard SQL string-literal escaping)
    # so customer_id cannot terminate the literal and alter the condition.
    customer_literal = customer_id.replace("'", "''")

    # Apply row-level security scoped to this customer
    try:
        admin.policies.create(
            name=f"rls-{customer_id}",
            key_id=key.key_id,
            type="row_filter",
            condition=f"customer_id = '{customer_literal}'",
            tables=allowed_tables,
        )
    except Exception:
        # Without the filter this key is not isolated to its customer; take
        # it out of service before surfacing the error.
        admin.keys.revoke(
            key_id=key.key_id,
            reason="Row-level security policy could not be created",
        )
        raise
    return key.api_key
def upgrade_customer_plan(customer_id: str, new_plan: str):
    """Apply a plan's quotas to a customer's active keys without rotating them.

    Every active key listed for ``agent-{customer_id}`` gets the new rate
    limit and has its metadata updated with the plan name, the memory-write
    quota and a UTC ``upgraded_at`` timestamp. Existing metadata entries are
    kept.

    Args:
        customer_id: Customer whose agent keys are updated.
        new_plan: Key into ``PLAN_TIERS``.

    Raises:
        KeyError: If ``new_plan`` is not a key of ``PLAN_TIERS``.
    """
    # Imported here because this snippet has no module-level datetime import.
    from datetime import datetime, timezone

    tier = PLAN_TIERS[new_plan]
    # A quota of 0 means "unlimited" and is passed to the API as None.
    rate_limit = tier["queries_per_hour"] if tier["queries_per_hour"] > 0 else None
    upgraded_at = datetime.now(timezone.utc).isoformat()

    for key in admin.keys.list(agent_id=f"agent-{customer_id}", status="active"):
        admin.keys.update(
            key_id=key.key_id,
            rate_limit=rate_limit,
            metadata={
                **(key.metadata or {}),  # tolerate keys without metadata
                "plan": new_plan,
                "memory_writes_per_hour": tier["memory_writes_per_hour"],
                "upgraded_at": upgraded_at,
            },
        )
# Provision agents for three customers
# Each call returns the new key's API key; the return values are discarded
# here, so capture them if the key must be handed to the customer's agent.
provision_customer_agent("acme-corp", "pro")
provision_customer_agent("startup-xyz", "free")
provision_customer_agent("megacorp", "enterprise")
# Later: upgrade startup-xyz to pro
# Updates the existing active key(s) in place; no new key is issued.
upgrade_customer_plan("startup-xyz", "pro")
Plan Tier Limits
| Plan | Queries/Hour | Memory Writes/Hour | Max Tables | Column Masking | Audit Export |
|---|---|---|---|---|---|
| Free | 100 | 50 | 5 | No | No |
| Pro | 10,000 | 5,000 | 50 | Yes | Yes |
| Enterprise | Unlimited | Unlimited | Unlimited | Yes | Yes |
Monitoring Per-Customer Usage
-- Queries per customer in the last hour.
-- Per-customer agents are named 'agent-<customer_id>'. The customer id may
-- itself contain hyphens (e.g. 'acme-corp'), so strip the fixed 6-character
-- 'agent-' prefix instead of splitting on '-'. Agents that do not follow this
-- naming scheme are excluded.
SELECT
    SUBSTRING(agent_id, 7) AS customer_id,
    COUNT(*)               AS query_count,
    SUM(credits_consumed)  AS credits
FROM _hatidata_audit_log
WHERE executed_at > NOW() - INTERVAL '1 hour'
  AND agent_id LIKE 'agent-%'
GROUP BY customer_id
ORDER BY query_count DESC;
Related Concepts
- Agent Identity Model -- Full identity architecture
- Security Model -- Authentication and authorization
- Governance Recipes -- Policy and compliance patterns
- Audit Guarantees -- Enterprise audit features
- MCP Tools Reference -- Agent key management via MCP