Skip to main content

Branch Recipes

Practical recipes for using HatiData's branch isolation in production agent systems.


Recipe 1: A/B Testing with Branches

Compare two approaches by running each in its own branch:

from hatidata import HatiDataClient

# Shared client used by every recipe on this page.
client = HatiDataClient(
    host="localhost",
    port=5439,
    api_key="hd_live_your_api_key",  # replace with your own API key
)

# Create two branches for A/B testing -- each pricing model gets its own
# isolated copy-on-write view of the data, auto-expiring after 48 hours.
branch_a = client.branches.create(
    name="pricing-model-a",
    description="10% across-the-board increase",
    agent_id="pricing-agent",
    ttl_hours=48,
)

branch_b = client.branches.create(
    name="pricing-model-b",
    description="Tiered increase: 5% standard, 15% premium",
    agent_id="pricing-agent",
    ttl_hours=48,
)

# Apply changes to branch A: flat 10% increase on every product
client.branches.write(
    branch_id=branch_a.branch_id,
    sql="UPDATE products SET price = price * 1.10",
)

# Apply changes to branch B: tiered increase keyed on the product tier
client.branches.write(
    branch_id=branch_b.branch_id,
    sql="""
UPDATE products
SET price = CASE
WHEN tier = 'premium' THEN price * 1.15
ELSE price * 1.05
END
""",
)

# Compare revenue projections
def project_revenue(branch_id: str) -> dict:
    """Return the projected revenue and average price for one branch.

    Runs the revenue-projection aggregate against the given branch and
    returns its single result row as a plain dict.
    """
    projection_sql = """
SELECT
SUM(p.price * o.quantity) AS projected_revenue,
AVG(p.price) AS avg_price
FROM products p
JOIN orders o ON p.product_id = o.product_id
WHERE o.order_date >= '2025-10-01'
"""
    result_rows = client.branches.query(branch_id=branch_id, sql=projection_sql)
    first_row = result_rows[0]
    return dict(first_row)

# Project revenue under each model, plus the unchanged baseline on main.
rev_a = project_revenue(branch_a.branch_id)
rev_b = project_revenue(branch_b.branch_id)
rev_main = dict(client.query("""
SELECT SUM(p.price * o.quantity) AS projected_revenue, AVG(p.price) AS avg_price
FROM products p JOIN orders o ON p.product_id = o.product_id
WHERE o.order_date >= '2025-10-01'
""")[0])

# Side-by-side summary of baseline vs. the two candidate models.
print(f"Current: ${rev_main['projected_revenue']:,.0f} (avg: ${rev_main['avg_price']:.2f})")
print(f"Model A: ${rev_a['projected_revenue']:,.0f} (avg: ${rev_a['avg_price']:.2f})")
print(f"Model B: ${rev_b['projected_revenue']:,.0f} (avg: ${rev_b['avg_price']:.2f})")

Recipe 2: Safe Agent Exploration

Let an agent explore and modify data freely, with the ability to undo everything:

def run_with_branch(agent_id: str, task_fn, merge_if_successful: bool = False):
    """Execute task_fn inside a dedicated branch so every write can be undone.

    task_fn receives the branch id and its return value is passed through.
    If merge_if_successful is set and the result reports success, the branch
    is merged back with the branch_wins strategy; otherwise it is discarded.
    On any exception the branch is discarded and the error re-raised.
    """
    branch = client.branches.create(
        name=f"exploration-{agent_id}",
        agent_id=agent_id,
        ttl_hours=4,
    )

    try:
        result = task_fn(branch.branch_id)

        should_merge = merge_if_successful and result.get("success")
        if should_merge:
            merge = client.branches.merge(
                branch_id=branch.branch_id,
                strategy="branch_wins",
            )
            print(f"Branch merged: {merge.tables_merged} tables")
        else:
            # Either merging was not requested or the task reported failure.
            client.branches.discard(branch_id=branch.branch_id)
            print("Branch discarded.")

        return result
    except Exception as e:
        # Throw away every write the task made, then surface the error.
        client.branches.discard(branch_id=branch.branch_id)
        print(f"Error, branch discarded: {e}")
        raise

# Example task function for run_with_branch
def data_cleanup_task(branch_id: str) -> dict:
    """Remove duplicate customer rows (same email) inside a branch.

    Keeps the earliest record per email address, deletes the rest, and
    reports how many customers remain.
    """
    dedupe_sql = """
DELETE FROM customers
WHERE customer_id IN (
SELECT customer_id FROM (
SELECT customer_id,
ROW_NUMBER() OVER (PARTITION BY email ORDER BY created_at) AS rn
FROM customers
) WHERE rn > 1
)
"""
    # Delete every row that is not the first occurrence of its email.
    client.branches.write(branch_id=branch_id, sql=dedupe_sql)

    # Count what survived so the caller can sanity-check the cleanup.
    survivors = client.branches.query(
        branch_id=branch_id,
        sql="SELECT COUNT(*) AS count FROM customers",
    )

    return {"success": True, "remaining_customers": survivors[0]["count"]}

result = run_with_branch("cleanup-agent", data_cleanup_task, merge_if_successful=True)

Recipe 3: Branch-Per-Request Pattern

Create a short-lived branch for each agent request, ensuring complete isolation:

from contextlib import contextmanager

@contextmanager
def request_branch(agent_id: str, request_id: str):
    """Yield a fresh one-hour branch for a single request, then clean it up.

    The branch is discarded on exit no matter how the body finishes; a
    failed discard (e.g. the branch was merged inside the body) is ignored.
    """
    scratch_branch = client.branches.create(
        name=f"req-{request_id}",
        agent_id=agent_id,
        ttl_hours=1,
    )
    try:
        yield scratch_branch
    finally:
        # Best-effort teardown: merging inside the body removes the branch,
        # in which case discard raises and we simply move on.
        try:
            client.branches.discard(branch_id=scratch_branch.branch_id)
        except Exception:
            pass

# Usage: every write inside the block lands only in the per-request branch.
with request_branch("analyst-agent", "req-12345") as branch:
    # All writes are isolated to this request
    client.branches.write(
        branch_id=branch.branch_id,
        sql="CREATE TABLE temp_analysis AS SELECT * FROM orders WHERE total > 1000",
    )

    results = client.branches.query(
        branch_id=branch.branch_id,
        sql="SELECT COUNT(*) AS high_value_orders FROM temp_analysis",
    )
    print(f"High-value orders: {results[0]['high_value_orders']}")
# Branch is automatically discarded when the context exits

Recipe 4: Merge Strategy Selection

Choose the right merge strategy based on the use case:

def smart_merge(branch_id: str, tables_modified: list[str]) -> dict:
    """Select merge strategy based on table characteristics.

    Previews the merge first; with no conflicts the branch is merged
    directly via "branch_wins". Otherwise a per-table resolution map is
    built and submitted -- unless any table falls through to "abort", in
    which case the preview is returned for manual review.

    NOTE(review): tables_modified is unused in the body -- confirm whether
    it was meant to scope the conflict loop below.
    """

    # First, check for conflicts (strategy="manual" previews without merging)
    preview = client.branches.merge(
        branch_id=branch_id,
        strategy="manual",  # Preview only
    )

    if not preview.has_conflicts:
        # No conflicts -- safe to merge directly
        return client.branches.merge(
            branch_id=branch_id,
            strategy="branch_wins",
        )

    # Resolve per-table based on data characteristics
    resolutions = {}
    for conflict in preview.conflicts:
        if conflict.table in ("_hatidata_agent_memory", "_hatidata_cot"):
            # Append-only tables: branch always wins (new data)
            resolutions[conflict.table] = "branch_wins"
        elif conflict.table in ("customers", "products"):
            # Reference data: main wins to preserve canonical records
            resolutions[conflict.table] = "main_wins"
        else:
            # Default: abort and review manually
            resolutions[conflict.table] = "abort"

    # Any single "abort" vetoes the automated merge entirely.
    if "abort" in resolutions.values():
        print("Manual review required for conflicting tables.")
        return preview

    return client.branches.resolve(
        branch_id=branch_id,
        merge_id=preview.merge_id,
        resolutions=resolutions,
    )

Recipe 5: Garbage Collection Monitoring

Monitor branch lifecycle and storage usage:

-- Active branches with age and size, largest consumers first
SELECT
    branch_id,
    name,
    agent_id,
    created_at,
    expires_at,
    modified_table_count,
    total_size_bytes,
    -- age in hours, derived from the creation timestamp
    EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600 AS age_hours
FROM _hatidata_branches
WHERE status = 'active'
ORDER BY total_size_bytes DESC;

-- Branches past their TTL (should have been cleaned up by garbage collection)
SELECT branch_id, name, expires_at
FROM _hatidata_branches
WHERE status = 'active'
AND expires_at < NOW();

-- Branch storage usage aggregated per agent
SELECT
    agent_id,
    COUNT(*) AS active_branches,
    SUM(total_size_bytes) AS total_bytes,
    AVG(modified_table_count) AS avg_modified_tables
FROM _hatidata_branches
WHERE status = 'active'
GROUP BY agent_id
ORDER BY total_bytes DESC;

Programmatic Cleanup

def cleanup_stale_branches(max_age_hours: int = 24):
    """Discard active branches older than max_age_hours.

    Lists all active branches, computes each one's age from created_at,
    discards any branch past the cutoff, and prints a line per cleanup
    plus a final summary count.

    NOTE(review): assumes branch.created_at is a naive UTC datetime (as
    utcnow() is) -- confirm against the client library before mixing with
    timezone-aware values.
    """
    # The original snippet used `datetime` without importing it anywhere,
    # which raises NameError; import locally so the recipe is self-contained.
    from datetime import datetime

    branches = client.branches.list(status="active")
    cleaned = 0

    for b in branches:
        # Branch age in hours since creation.
        age_hours = (datetime.utcnow() - b.created_at).total_seconds() / 3600
        if age_hours > max_age_hours:
            client.branches.discard(branch_id=b.branch_id)
            print(f" Cleaned: {b.name} (age: {age_hours:.1f}h)")
            cleaned += 1

    print(f"Cleaned {cleaned} stale branches.")

Recipe 6: Branch Diffing

Compare branch state against main before deciding to merge:

def diff_branch(branch_id: str, table: str) -> dict:
    """Compare a table between branch and main.

    Returns a dict with the number of rows present only in the branch
    ("added_rows"), only in main ("removed_rows"), and the net change.

    Raises:
        ValueError: if table is not a plain SQL identifier. The table name
            cannot be bound as a query parameter and is interpolated
            directly into the SQL text, so it must be validated to prevent
            SQL injection from untrusted callers.
    """
    import re

    # Whitelist the identifier shape instead of trusting the caller's string.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table):
        raise ValueError(f"Invalid table name: {table!r}")

    # Rows in branch but not in main
    added = client.branches.query(
        branch_id=branch_id,
        sql=f"""
SELECT * FROM {table}
EXCEPT
SELECT * FROM main.{table}
""",
    )

    # Rows in main but not in branch
    removed = client.branches.query(
        branch_id=branch_id,
        sql=f"""
SELECT * FROM main.{table}
EXCEPT
SELECT * FROM {table}
""",
    )

    return {
        "table": table,
        "added_rows": len(added),
        "removed_rows": len(removed),
        "net_change": len(added) - len(removed),
    }

# Diff all modified tables reported by the branch's metadata.
branch_info = client.branches.get(branch_id=branch.branch_id)
for table in branch_info.modified_tables:
    diff = diff_branch(branch.branch_id, table)
    print(f" {diff['table']}: +{diff['added_rows']} / -{diff['removed_rows']}")

Recipe 7: Schema Evolution via Branch

Use branches to propose, validate, and deploy schema changes (new tables, new columns) without risk to production data. This is especially useful when an agent needs to extend its own data model.

The Pattern

  1. Create a branch
  2. Apply DDL (CREATE TABLE, ALTER TABLE) inside the branch
  3. Validate the new schema by inserting test data and querying it
  4. Merge back to main -- the schema change propagates
def evolve_schema_in_branch(agent_id: str, description: str, ddl_statements: list[str]):
    """Propose and validate a schema change inside a branch before merging.

    Creates a 24-hour branch owned by the agent, applies each DDL statement
    to it in order, and returns the branch for later validation and merge.
    """
    branch_spec = dict(
        name=f"schema-evolution-{agent_id}",
        description=description,
        agent_id=agent_id,
        ttl_hours=24,
    )
    branch = client.branches.create(**branch_spec)
    print(f"Branch created: {branch.branch_id}")

    # Step 1: Apply DDL in the branch, echoing a truncated preview of each.
    for ddl in ddl_statements:
        client.branches.write(branch_id=branch.branch_id, sql=ddl)
        print(f" Applied: {ddl[:80]}...")

    return branch


# Example: Agent proposes a new table for tracking customer interactions
branch = evolve_schema_in_branch(
    agent_id="analytics-agent",
    description="Add interaction_events table for customer journey tracking",
    ddl_statements=[
        # Event table: one row per customer touchpoint.
        """
CREATE TABLE interaction_events (
event_id TEXT PRIMARY KEY,
customer_id TEXT NOT NULL,
event_type TEXT NOT NULL,
channel TEXT,
metadata TEXT,
occurred_at TIMESTAMP DEFAULT NOW()
)
""",
        # Index supporting per-customer timeline queries.
        """
CREATE INDEX idx_interactions_customer
ON interaction_events (customer_id, occurred_at)
""",
    ],
)

Validate the Schema Change

Before merging, insert test data and run validation queries to confirm the schema works:

def validate_schema(branch_id: str) -> bool:
    """Insert test data and validate the new schema works correctly.

    Exercises the branch's new interaction_events table three ways:
    inserts sample rows, aggregates them per customer, and joins against
    the pre-existing customers table. Returns True only if both the
    aggregate and the join return at least one row.
    """
    # Insert test records
    client.branches.write(
        branch_id=branch_id,
        sql="""
INSERT INTO interaction_events (event_id, customer_id, event_type, channel, occurred_at)
VALUES
('evt-001', 'cust-100', 'page_view', 'web', NOW()),
('evt-002', 'cust-100', 'support_ticket', 'chat', NOW()),
('evt-003', 'cust-200', 'purchase', 'mobile', NOW())
""",
    )

    # Verify the table structure and data via a per-customer aggregate
    rows = client.branches.query(
        branch_id=branch_id,
        sql="""
SELECT
customer_id,
COUNT(*) AS event_count,
COUNT(DISTINCT event_type) AS unique_types
FROM interaction_events
GROUP BY customer_id
ORDER BY event_count DESC
""",
    )

    # Verify join with existing tables works
    join_result = client.branches.query(
        branch_id=branch_id,
        sql="""
SELECT c.customer_id, c.name, COUNT(e.event_id) AS interactions
FROM customers c
LEFT JOIN interaction_events e ON c.customer_id = e.customer_id
GROUP BY c.customer_id, c.name
LIMIT 5
""",
    )

    print(f" Test rows inserted: {len(rows)} customer groups")
    print(f" Join validation: {len(join_result)} rows returned")
    return len(rows) > 0 and len(join_result) > 0


# Run validation against the branch created above, ahead of any merge.
is_valid = validate_schema(branch.branch_id)

Merge the Schema Change

Once validated, merge the branch. The new table and its data become part of main:

# Merge only after validation passed; otherwise roll everything back.
if is_valid:
    merge_result = client.branches.merge(
        branch_id=branch.branch_id,
        strategy="branch_wins",
    )
    print(f"Schema merged: {merge_result.tables_merged} tables updated")

    # Verify the table exists in main (queried without a branch id)
    check = client.query("SELECT COUNT(*) AS cnt FROM interaction_events")
    print(f" Rows in main after merge: {check[0]['cnt']}")
else:
    client.branches.discard(branch_id=branch.branch_id)
    print("Validation failed, branch discarded.")
DDL Merges vs DML Merges

Schema changes (DDL) and data changes (DML) behave differently during merge:

  • DML merges (INSERT, UPDATE, DELETE) use conflict detection. If the same row was modified in both main and the branch, the merge strategy determines which version wins.
  • DDL merges (CREATE TABLE, ALTER TABLE, CREATE INDEX) are applied as-is. If main already has a table with the same name (created after the branch was forked), the merge will fail with a conflict. In that case, discard the branch, create a new one from current main, and re-apply the DDL.
  • ALTER TABLE in branches works for adding columns. Dropping or renaming columns in a branch is not recommended -- these are destructive operations that can break queries running against main after merge.
  • Best practice: Keep DDL branches short-lived. Create the branch, apply the schema change, validate, and merge within a single session. Long-lived DDL branches increase the risk of conflicts with main.

Stay in the loop

Product updates, engineering deep-dives, and agent-native insights. No spam.