
Build a Research Agent with Branching

In this tutorial you will build a research agent that explores multiple competing hypotheses in parallel, compares the results across isolated branches, merges the winning hypothesis back to main, and stores validated discoveries as persistent memories.

By the end you will have an agent that:

  • Creates isolated branches in the data layer to explore independent hypotheses without interference
  • Runs speculative queries and transformations inside each branch safely
  • Compares branch results to select the strongest hypothesis
  • Merges the winning branch and discards the rest
  • Stores validated findings as searchable memories for future agents

Prerequisites

  • Python 3.10 or later
  • A HatiData account and API key (sign up)
  • hatidata Python SDK installed
pip install hatidata

Set your credentials as environment variables:

export HATIDATA_API_KEY="hd_your_api_key"
export HATIDATA_ORG="your-org-slug"

Step 1: Set Up HatiData with Market Data

Create the client and define a market data layer. The research agent will explore this data without ever modifying the main tables directly.

import os
from hatidata import HatiDataClient

client = HatiDataClient(
    api_key=os.environ["HATIDATA_API_KEY"],
    org=os.environ["HATIDATA_ORG"],
)

client.execute("""
    CREATE TABLE IF NOT EXISTS market_prices (
        symbol TEXT NOT NULL,
        price_usd NUMERIC(18, 4) NOT NULL,
        volume_24h NUMERIC(24, 2),
        market_cap NUMERIC(24, 2),
        recorded_at TIMESTAMPTZ DEFAULT now(),
        PRIMARY KEY (symbol, recorded_at)
    )
""")

client.execute("""
    CREATE TABLE IF NOT EXISTS market_signals (
        signal_id TEXT PRIMARY KEY,
        symbol TEXT NOT NULL,
        signal_type TEXT NOT NULL, -- momentum, reversal, breakout, anomaly
        strength NUMERIC(5, 4),    -- 0.0 to 1.0
        notes TEXT,
        created_at TIMESTAMPTZ DEFAULT now()
    )
""")

print("Market data layer ready.")

Seed historical price data for three symbols with distinct patterns:

from datetime import datetime, timedelta, timezone
import random

random.seed(42)
base_time = datetime.now(timezone.utc) - timedelta(days=30)

price_rows = []
for day in range(30):
    ts = base_time + timedelta(days=day)

    # BTC: steady uptrend
    btc_price = 42000 + (day * 150) + random.uniform(-300, 300)
    price_rows.append(("BTC", round(btc_price, 4), 28_000_000_000, 820_000_000_000, ts))

    # ETH: volatile with a mid-period spike
    eth_base = 2200 + (day * 30)
    spike = 400 if 12 <= day <= 15 else 0
    eth_price = eth_base + spike + random.uniform(-80, 80)
    price_rows.append(("ETH", round(eth_price, 4), 12_000_000_000, 265_000_000_000, ts))

    # SOL: declining, then gradually recovering after day 20
    sol_price = 95 - (day * 0.5) + ((day - 20) * 0.8 if day > 20 else 0) + random.uniform(-3, 3)
    price_rows.append(("SOL", round(sol_price, 4), 2_500_000_000, 42_000_000_000, ts))

client.executemany(
    """INSERT INTO market_prices (symbol, price_usd, volume_24h, market_cap, recorded_at)
       VALUES (?, ?, ?, ?, ?)""",
    price_rows,
)

print(f"Seeded {len(price_rows)} price records across 3 symbols over 30 days.")
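Before branching, it can help to sanity-check the synthetic generators offline. The sketch below replays only the BTC formula in isolation (because the seeding loop interleaves random draws across all three symbols, the exact values differ from the seeded table, but the trend is identical) and confirms the intended uptrend:

```python
import random

random.seed(42)

# Replay the BTC generator on its own: a ~150 USD/day linear trend plus noise.
btc = [42000 + day * 150 + random.uniform(-300, 300) for day in range(30)]

first_half = sum(btc[:15]) / 15
second_half = sum(btc[15:]) / 15

# The +150/day trend dwarfs the ±300 noise, so the later mean must be higher.
assert second_half > first_half
print(f"BTC mean price, days 0-14: {first_half:,.0f} vs days 15-29: {second_half:,.0f}")
```

The same approach works for the ETH spike and SOL recovery patterns if you want to eyeball the data the hypotheses will be tested against.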

Step 2: Create Branches for Different Hypotheses

Each hypothesis gets its own isolated branch. Branches start as zero-copy views of the main data layer — the first write to a branch materialises a private copy of the affected tables.

from hatidata.branches import BranchClient

branches = BranchClient(client)

AGENT_ID = "research-agent-v1"

# Hypothesis A: momentum-based — rising volume predicts price continuation
branch_a = branches.create(
    agent_id=AGENT_ID,
    name="hypothesis-momentum",
    description="Rising volume in the prior 7 days predicts price continuation over the next 7 days.",
)

# Hypothesis B: mean-reversion — large short-term deviations revert to the 30-day mean
branch_b = branches.create(
    agent_id=AGENT_ID,
    name="hypothesis-mean-reversion",
    description="Symbols deviating more than 2 standard deviations from their 30-day mean price revert within 5 days.",
)

# Hypothesis C: cross-asset correlation — BTC momentum leads ETH by 2-3 days
branch_c = branches.create(
    agent_id=AGENT_ID,
    name="hypothesis-btc-eth-lag",
    description="BTC price momentum leads ETH momentum by 2-3 days; a lagged correlation above 0.75 confirms.",
)

print(f"Created branches: {branch_a['id']}, {branch_b['id']}, {branch_c['id']}")

Step 3: Run Speculative Queries in Each Branch

Each branch gets its own execution context. Queries and writes inside a branch do not affect other branches or the main data layer.

def explore_momentum_hypothesis(branch_id: str) -> dict:
    """
    Test whether weekly volume growth correlates with weekly price return.
    Writes an intermediate results table inside the branch.
    """
    # Compute per-symbol volume and price change between consecutive calendar weeks
    rows = branches.query(branch_id, """
        WITH weekly AS (
            SELECT
                symbol,
                DATE_TRUNC('week', recorded_at) AS week,
                AVG(price_usd) AS avg_price,
                SUM(volume_24h) AS total_volume
            FROM market_prices
            GROUP BY symbol, DATE_TRUNC('week', recorded_at)
        ),
        changes AS (
            SELECT
                symbol,
                week,
                avg_price,
                total_volume,
                LAG(avg_price, 1) OVER (PARTITION BY symbol ORDER BY week) AS prev_price,
                LAG(total_volume, 1) OVER (PARTITION BY symbol ORDER BY week) AS prev_volume
            FROM weekly
        )
        SELECT
            symbol,
            week,
            ROUND((avg_price - prev_price) / NULLIF(prev_price, 0) * 100, 2) AS price_chg_pct,
            ROUND((total_volume - prev_volume) / NULLIF(prev_volume, 0) * 100, 2) AS volume_chg_pct
        FROM changes
        WHERE prev_price IS NOT NULL
        ORDER BY symbol, week
    """)

    result_rows = [dict(r) for r in rows]

    # Count weeks where volume and price both increased (momentum confirmation)
    confirmations = sum(
        1 for r in result_rows
        if r["price_chg_pct"] is not None
        and r["volume_chg_pct"] is not None
        and r["price_chg_pct"] > 0
        and r["volume_chg_pct"] > 0
    )
    confirmation_rate = confirmations / max(len(result_rows), 1)

    # Write finding into the branch (does not touch main)
    branches.write(branch_id, """
        CREATE TABLE IF NOT EXISTS hypothesis_results AS
        SELECT 'momentum' AS hypothesis, ? AS confirmation_rate, ? AS sample_size
    """, [confirmation_rate, len(result_rows)])

    return {
        "hypothesis": "momentum",
        "branch_id": branch_id,
        "confirmation_rate": confirmation_rate,
        "sample_size": len(result_rows),
        "score": confirmation_rate,
    }


def explore_mean_reversion_hypothesis(branch_id: str) -> dict:
    """
    Test whether prices more than 2 std devs from their 30-day mean revert within 5 days.
    """
    rows = branches.query(branch_id, """
        WITH stats AS (
            SELECT
                symbol,
                AVG(price_usd) AS mean_price,
                STDDEV(price_usd) AS std_price
            FROM market_prices
            GROUP BY symbol
        ),
        deviations AS (
            SELECT
                p.symbol,
                p.price_usd,
                p.recorded_at,
                s.mean_price,
                s.std_price,
                ABS(p.price_usd - s.mean_price) / NULLIF(s.std_price, 0) AS z_score
            FROM market_prices p
            JOIN stats s ON p.symbol = s.symbol
        ),
        extreme AS (
            SELECT
                symbol,
                recorded_at AS extreme_date,
                price_usd AS extreme_price,
                mean_price,
                z_score
            FROM deviations
            WHERE z_score > 2.0
        )
        -- Look up the actual price 5 days after each extreme event. A window
        -- function over the extreme rows alone would return the 5th-next
        -- extreme event, not the price 5 days later, so join back to the
        -- full price table instead.
        SELECT
            e.symbol,
            e.extreme_date,
            e.extreme_price,
            e.mean_price,
            e.z_score,
            f.price_usd AS price_5d_later
        FROM extreme e
        LEFT JOIN market_prices f
            ON f.symbol = e.symbol
            AND f.recorded_at = e.extreme_date + INTERVAL '5 days'
    """)

    result_rows = [dict(r) for r in rows]

    reversions = sum(
        1 for r in result_rows
        if r["price_5d_later"] is not None
        and abs(r["price_5d_later"] - r["mean_price"]) < abs(r["extreme_price"] - r["mean_price"])
    )
    reversion_rate = reversions / max(len(result_rows), 1)

    branches.write(branch_id, """
        CREATE TABLE IF NOT EXISTS hypothesis_results AS
        SELECT 'mean_reversion' AS hypothesis, ? AS reversion_rate, ? AS extreme_events
    """, [reversion_rate, len(result_rows)])

    return {
        "hypothesis": "mean_reversion",
        "branch_id": branch_id,
        "reversion_rate": reversion_rate,
        "extreme_events": len(result_rows),
        "score": reversion_rate,
    }


def explore_btc_eth_lag_hypothesis(branch_id: str) -> dict:
    """
    Test whether BTC daily returns lead ETH daily returns by 2-3 days.
    """
    rows = branches.query(branch_id, """
        WITH daily AS (
            SELECT
                symbol,
                DATE_TRUNC('day', recorded_at) AS day,
                AVG(price_usd) AS avg_price
            FROM market_prices
            GROUP BY symbol, DATE_TRUNC('day', recorded_at)
        ),
        returns AS (
            SELECT
                symbol,
                day,
                avg_price,
                LAG(avg_price, 1) OVER (PARTITION BY symbol ORDER BY day) AS prev_price
            FROM daily
        ),
        btc AS (
            SELECT day, (avg_price - prev_price) / NULLIF(prev_price, 0) AS ret
            FROM returns WHERE symbol = 'BTC' AND prev_price IS NOT NULL
        ),
        eth AS (
            SELECT day, (avg_price - prev_price) / NULLIF(prev_price, 0) AS ret
            FROM returns WHERE symbol = 'ETH' AND prev_price IS NOT NULL
        )
        SELECT
            btc.day,
            btc.ret AS btc_ret,
            eth.ret AS eth_ret_lag2
        FROM btc
        JOIN eth ON eth.day = btc.day + INTERVAL '2 days'
    """)

    result_rows = [dict(r) for r in rows]

    if len(result_rows) < 5:
        correlation = 0.0
    else:
        btc_rets = [float(r["btc_ret"]) for r in result_rows]
        eth_rets = [float(r["eth_ret_lag2"]) for r in result_rows]
        n = len(btc_rets)
        mean_b = sum(btc_rets) / n
        mean_e = sum(eth_rets) / n
        cov = sum((b - mean_b) * (e - mean_e) for b, e in zip(btc_rets, eth_rets)) / n
        std_b = (sum((b - mean_b) ** 2 for b in btc_rets) / n) ** 0.5
        std_e = (sum((e - mean_e) ** 2 for e in eth_rets) / n) ** 0.5
        correlation = cov / (std_b * std_e) if std_b * std_e > 0 else 0.0

    branches.write(branch_id, """
        CREATE TABLE IF NOT EXISTS hypothesis_results AS
        SELECT 'btc_eth_lag' AS hypothesis, ? AS lag2_correlation, ? AS data_points
    """, [correlation, len(result_rows)])

    return {
        "hypothesis": "btc_eth_lag",
        "branch_id": branch_id,
        "lag2_correlation": correlation,
        "data_points": len(result_rows),
        "score": abs(correlation),
    }


# Run all three explorations
result_a = explore_momentum_hypothesis(branch_a["id"])
result_b = explore_mean_reversion_hypothesis(branch_b["id"])
result_c = explore_btc_eth_lag_hypothesis(branch_c["id"])

results = [result_a, result_b, result_c]
print("\nHypothesis exploration complete:")
for r in results:
    print(f"  [{r['hypothesis']}] score={r['score']:.4f}")

Step 4: Compare Branch Results

Query the results from each branch side by side to select the strongest hypothesis.

def compare_hypotheses(branch_ids: list[str], results: list[dict]) -> dict:
    """
    Compare branch results and return the winning hypothesis.
    """
    # Read the hypothesis_results table from each branch for verification
    print("\nBranch result verification:")
    for branch_id in branch_ids:
        rows = branches.query(branch_id, "SELECT * FROM hypothesis_results LIMIT 1")
        row = dict(next(iter(rows), {}))
        print(f"  Branch {branch_id[:8]}: {row}")

    # Select winner by highest score
    winner = max(results, key=lambda r: r["score"])
    losers = [r for r in results if r["branch_id"] != winner["branch_id"]]

    print(f"\nWinner: '{winner['hypothesis']}' with score {winner['score']:.4f}")
    print(f"Discarding: {[r['hypothesis'] for r in losers]}")

    return {"winner": winner, "losers": losers}


comparison = compare_hypotheses(
    [branch_a["id"], branch_b["id"], branch_c["id"]],
    results,
)

Step 5: Merge the Winning Hypothesis

Merge the winning branch back to the main data layer. HatiData checks for conflicts using the configured merge strategy and reports any rows that need manual resolution.

def merge_winning_hypothesis(winner: dict, losers: list[dict]) -> dict:
    """
    Merge the winning branch into main using the BranchWins strategy,
    then discard all losing branches to free resources.
    """
    branch_id = winner["branch_id"]

    # Merge winner — BranchWins resolves any conflicts in favour of branch data
    merge_result = branches.merge(
        branch_id=branch_id,
        strategy="BranchWins",
        dry_run=False,
    )

    print(f"\nMerge result for branch {branch_id[:8]}:")
    print(f"  Tables merged : {merge_result['tables_merged']}")
    print(f"  Rows added    : {merge_result['rows_added']}")
    print(f"  Conflicts     : {merge_result['conflicts']}")
    print(f"  Status        : {merge_result['status']}")

    # Discard losing branches
    for loser in losers:
        branches.discard(loser["branch_id"])
        print(f"  Discarded branch: {loser['branch_id'][:8]} ({loser['hypothesis']})")

    return merge_result


merge_result = merge_winning_hypothesis(comparison["winner"], comparison["losers"])

Available merge strategies:

Strategy     Behaviour
BranchWins   Branch data overwrites conflicting main rows
MainWins     Main data is preserved; branch changes discarded on conflict
Manual       Conflicts returned as a diff for human review
Abort        Merge cancelled if any conflict is detected

Step 6: Store Validated Findings as Memories

Persist the validated discovery so future agents can find it through semantic search without re-running the full exploration.

from hatidata.memory import MemoryClient

memory = MemoryClient(client)

def store_validated_finding(winner: dict, merge_result: dict):
    """
    Store the validated hypothesis as a persistent, searchable memory.
    """
    hypothesis = winner["hypothesis"]

    # Build a natural-language summary that embeds well for future retrieval.
    # Only the winning hypothesis's summary is built: each result dict carries
    # only its own metric keys, so eagerly formatting all three would raise KeyError.
    if hypothesis == "momentum":
        summary = (
            f"Market momentum hypothesis validated: in {winner['sample_size']} weekly windows, "
            f"rising volume predicted continued price appreciation {winner['confirmation_rate']*100:.1f}% of the time. "
            "Use 7-day volume growth as a leading indicator for price continuation signals."
        )
    elif hypothesis == "mean_reversion":
        summary = (
            f"Mean reversion hypothesis validated: {winner['extreme_events']} extreme deviation events observed. "
            f"Prices reverted toward the 30-day mean within 5 days in {winner['reversion_rate']*100:.1f}% of cases. "
            "Positions taken against 2+ standard deviation moves have high reversion probability."
        )
    else:  # btc_eth_lag
        summary = (
            f"BTC-ETH lag correlation validated: BTC daily returns lead ETH returns by 2 days "
            f"with a Pearson correlation of {winner['lag2_correlation']:.3f} ({winner['data_points']} data points). "
            "BTC momentum shifts can be used as a 2-day leading signal for ETH positioning."
        )

    memory.store(
        agent_id=AGENT_ID,
        content=summary,
        metadata={
            "type": "validated_hypothesis",
            "hypothesis": hypothesis,
            "score": winner["score"],
            "branch_id": winner["branch_id"],
            "merge_status": merge_result["status"],
            "rows_added": merge_result["rows_added"],
        },
    )

    print(f"\nStored validated finding for hypothesis '{hypothesis}'.")
    print(f"Summary: {summary[:120]}...")


store_validated_finding(comparison["winner"], merge_result)

Verify the memory is searchable by future agents:

# Future agent retrieves validated market findings
future_results = memory.search(
    agent_id=AGENT_ID,
    query="which market indicators reliably predict price movements",
    top_k=3,
)

print("\nMemory search results for future agents:")
for r in future_results:
    print(f"  [{r['score']:.3f}] {r['content'][:100]}...")
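Semantic search ranks stored memories by vector similarity between the query and each memory's embedding. The toy sketch below illustrates the ranking idea using bag-of-words vectors and cosine similarity — HatiData's actual embeddings come from a learned model, so treat this purely as intuition for why the momentum summary above matches a query about "indicators that predict price movements":

```python
from collections import Counter
import math

def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse bag-of-words vectors."""
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

memories = [
    "rising volume predicts price continuation over the next week",
    "extreme deviations revert to the 30-day mean within 5 days",
    "the agent discarded two losing branches after the merge",
]

query = "which indicators predict price movements"
q_vec = Counter(query.lower().split())

# Rank memories by similarity to the query, best match first
ranked = sorted(memories, key=lambda m: cosine(q_vec, Counter(m.lower().split())), reverse=True)
for m in ranked:
    print(f"[{cosine(q_vec, Counter(m.lower().split())):.3f}] {m}")
```

Even this crude token-overlap model ranks the price-related memory first; learned embeddings additionally capture matches with no shared tokens (e.g. "predicts" vs "predict").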

What You Built

Capability                       HatiData Feature
Parallel hypothesis exploration  BranchClient.create() — schema-isolated branches
Zero-copy branch creation        Schema views (no data duplication on create)
Branch-isolated writes           BranchClient.write() — copy-on-write materialisation
Cross-branch result comparison   BranchClient.query() per branch
Conflict-aware merge             BranchClient.merge() with configurable strategies
Automatic branch cleanup         BranchClient.discard()
Validated finding persistence    MemoryClient.store() with semantic search index
