
Python SDK

HatiData provides two Python packages:

| Package | Version | Style | Use Case |
| --- | --- | --- | --- |
| hatidata | v0.5.0+ | Async-first | Recommended for new projects. Full agent-native features. |
| hatidata-agent | v0.3.x | Sync | Legacy sync client. Still supported but no new features. |

Async SDK (Recommended)

The hatidata package (v0.5.0+) is the recommended client. It is async-first, supports parameterized queries, and includes built-in modules for agent memory, chain-of-thought, and semantic triggers.

Installation

pip install hatidata

Requirements: Python 3.9+.

Connection

from hatidata import HatiData

db = HatiData(
    host="preprod.hatidata.com",
    port=5439,
    api_key="hd_live_xxx",
    database="main",
)

Self-Registration

When connecting in cloud mode, the agent auto-registers with the control plane and receives a unique fingerprint. The fingerprint is used for billing, audit, and policy targeting without any manual setup.

Queries

# Parameterized queries (safe from injection)
rows = await db.query("SELECT * FROM users WHERE id = $1", [42])

# Insert helper
await db.insert("products", {"id": 1, "name": "Widget"})

# Execute DML
count = await db.execute("UPDATE users SET active = true WHERE id = $1", [42])
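
The snippets above use await at the top level, which only works inside a coroutine or an async REPL. Below is a minimal sketch of wrapping a query in a coroutine and driving it with asyncio.run. StubDB and fetch_user are hypothetical names introduced for illustration; StubDB is an in-memory stand-in, not part of the SDK, so the example runs without a server. With the real client you would presumably pass the HatiData instance instead.

```python
import asyncio


class StubDB:
    """Hypothetical stand-in for the HatiData client (NOT part of the SDK)."""

    def __init__(self, users):
        self._users = users

    async def query(self, sql, params=None):
        # Pretends to run "SELECT * FROM users WHERE id = $1" in memory.
        user_id = params[0]
        return [u for u in self._users if u["id"] == user_id]


async def fetch_user(db, user_id):
    """Return the matching user row, or None when no row matches."""
    rows = await db.query("SELECT * FROM users WHERE id = $1", [user_id])
    return rows[0] if rows else None


# asyncio.run creates an event loop, runs the coroutine, and closes the loop.
user = asyncio.run(fetch_user(StubDB([{"id": 42, "name": "Ada"}]), 42))
print(user)
```

Keeping the query logic in a coroutine that takes the client as an argument also makes it easy to test against a stub like the one shown here.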

Agent Memory

Store and retrieve long-term memories with hybrid vector + SQL search.

# Store a memory
await db.memory.store(content="Enterprise customers renew at 95% in Q4", agent_id="my-agent", importance=0.8)

# Search memories by semantic similarity
results = await db.memory.search("organic coffee", top_k=5)
for r in results:
    print(r["content"], r["similarity_score"])

Chain-of-Thought

Log immutable, hash-chained reasoning traces for audit and compliance.

# Log a reasoning step
await db.cot.log_step(session_id="s1", step_type="reasoning", content="Revenue is up 15% but concentrated in enterprise.")

# Replay a full session
trace = await db.cot.replay(session_id="s1")
print(trace["chain_valid"]) # True if hash chain is intact

Semantic Triggers

Register event-driven rules that fire when agent behavior matches a concept.

# Register a trigger
await db.triggers.register(
    name="alert",
    concept="accessing personally identifiable information",
    action="webhook",
    webhook_url="https://hooks.acme.com/pii-alert",
)

Legacy Sync Client

The hatidata-agent package is the legacy sync client. It remains supported but is not receiving new features. Prefer the async hatidata package for new projects.

Installation

pip install hatidata-agent

Requirements: Python 3.9+. Includes psycopg2-binary for Postgres connectivity.

Optional extras:

pip install hatidata-agent[langchain]   # LangChain integration
pip install hatidata-agent[mcp]         # MCP server support
pip install hatidata-agent[async]       # Async support via asyncpg
pip install hatidata-agent[all]         # Everything

HatiDataAgent

Constructor

from hatidata_agent import HatiDataAgent

agent = HatiDataAgent(
    host="localhost",        # Proxy hostname
    port=5439,               # Proxy port (default: 5439)
    agent_id="my-agent",     # Unique agent identifier
    framework="custom",      # AI framework name
    database="hatidata",     # Database name
    user="agent",            # Username
    password="hd_live_...",  # API key
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| host | str | "localhost" | Proxy hostname |
| port | int | 5439 | Proxy port |
| agent_id | str | Auto-generated | Unique agent identifier |
| framework | str | "custom" | AI framework (langchain, crewai, autogen, custom) |
| database | str | "hatidata" | Database name |
| user | str | "agent" | Username |
| password | str | "" | Password or API key |
| priority | str | "normal" | Query priority: low, normal, high, critical |
| connect_timeout | int | 10 | Connection timeout in seconds |

The agent_id and framework are passed to the proxy as startup parameters. The proxy uses them for billing, scheduling, policy evaluation, and audit.


query()

Execute a SELECT query. Returns a list of dicts.

rows = agent.query("SELECT * FROM customers WHERE status = 'active' LIMIT 10")
for row in rows:
    print(row["name"], row["email"])

With parameterized queries (recommended to prevent SQL injection):

rows = agent.query(
    "SELECT * FROM orders WHERE customer_id = %s AND status = %s",
    params=(42, "active"),
)
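
To illustrate why parameter binding is recommended, here is a small sketch using the standard-library sqlite3 module as a stand-in for a real connection. This is an assumption made so the example runs anywhere: sqlite3 uses ? placeholders rather than the %s style shown above, and the table contents are made up. The principle is the same: a bound value is compared as data and cannot rewrite the query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, customer_id TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "42", "active"), (2, "43", "active")],
)

# Attacker-controlled input that tries to widen the WHERE clause.
malicious = "42' OR '1'='1"

# Unsafe: string formatting lets the input become part of the SQL text.
unsafe_sql = f"SELECT id FROM orders WHERE customer_id = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the driver binds the value, so it is treated as one literal string.
safe_rows = conn.execute(
    "SELECT id FROM orders WHERE customer_id = ?", (malicious,)
).fetchall()

print(len(unsafe_rows), len(safe_rows))  # prints: 2 0
conn.close()
```

The formatted query returns every row because the injected OR clause is always true, while the bound query returns none because no customer_id equals the literal input string.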

With request tracing:

rows = agent.query(
    "SELECT * FROM orders",
    request_id="trace-abc-123",
)

The request_id appears in the audit log, letting you correlate agent actions with specific queries.

| Parameter | Type | Description |
| --- | --- | --- |
| sql | str | SQL query (Snowflake syntax auto-transpiled) |
| params | tuple | Query parameters for parameterized queries |
| request_id | str | Optional trace identifier |
| Returns | list[dict] | Rows as list of dicts |

execute()

Execute INSERT, UPDATE, or DELETE. Returns the affected row count.

count = agent.execute("INSERT INTO logs VALUES (1, 'agent started', NOW())")
print(f"{count} rows inserted")

count = agent.execute(
    "UPDATE customers SET status = %s WHERE customer_id = %s",
    params=("churned", 42),
)

store_memory()

Store a fact, episode, preference, or context entry in long-term agent memory.

memory_id = agent.store_memory(
    content="Enterprise customers have a 95% renewal rate in Q4",
    memory_type="fact",
    metadata={"source": "quarterly-review", "quarter": "Q4-2025"},
    importance=0.9,
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| content | str | required | Memory content |
| memory_type | str | "fact" | fact, episode, preference, or context |
| metadata | dict | {} | Arbitrary key-value metadata |
| importance | float | 0.5 | Importance score 0.0–1.0 |
| Returns | str | | UUID of the stored memory |

search_memory()

Search memories using natural language with optional filters. Uses hybrid vector ANN + structured metadata search.

memories = agent.search_memory(
    query="customer renewal rates",
    top_k=5,
    memory_type="fact",
    min_importance=0.5,
)

for m in memories:
    print(m["content"], m["similarity_score"])

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| query | str | required | Natural language search query |
| top_k | int | 10 | Maximum results |
| memory_type | str | None | Filter by type |
| min_importance | float | None | Minimum importance score |
| agent_id | str | current agent | Filter by agent |
| Returns | list[dict] | | Memories ranked by similarity score |
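
As a rough illustration of what combining structured filters with similarity ranking means, the sketch below filters candidate memories by memory_type and min_importance and then ranks the survivors by cosine similarity. It is not HatiData's implementation: it uses brute-force scoring instead of an ANN index, the two-dimensional embeddings are toy values, and hybrid_search and cosine are hypothetical helper names.

```python
import math


def cosine(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is zero)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def hybrid_search(memories, query_vec, top_k=10, memory_type=None, min_importance=None):
    # Structured filters first: drop memories that fail the metadata criteria.
    candidates = [
        m for m in memories
        if (memory_type is None or m["memory_type"] == memory_type)
        and (min_importance is None or m["importance"] >= min_importance)
    ]
    # Then score the remaining candidates and keep the top_k most similar.
    scored = [
        dict(m, similarity_score=cosine(m["embedding"], query_vec))
        for m in candidates
    ]
    scored.sort(key=lambda m: m["similarity_score"], reverse=True)
    return scored[:top_k]


memories = [
    {"content": "Q4 renewal rate is 95%", "memory_type": "fact",
     "importance": 0.9, "embedding": [1.0, 0.0]},
    {"content": "User prefers weekly reports", "memory_type": "preference",
     "importance": 0.8, "embedding": [0.9, 0.1]},
    {"content": "Old pricing note", "memory_type": "fact",
     "importance": 0.2, "embedding": [0.0, 1.0]},
]
results = hybrid_search(memories, [1.0, 0.0], top_k=5,
                        memory_type="fact", min_importance=0.5)
for r in results:
    print(r["content"], r["similarity_score"])
```

Only the first memory survives both filters here, so it is the single result regardless of top_k.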

log_reasoning_step()

Log a step in an agent's reasoning chain. Steps are cryptographically hash-chained per session for tamper detection.

trace = agent.log_reasoning_step(
    session_id="analysis-2026-02-25",
    step_type="thought",
    content="Revenue is up 15% but concentrated in enterprise. Need to check SMB trends.",
    metadata={"confidence": 0.85},
    importance=0.7,
)
print(trace["step_number"], trace["hash"])

Valid step_type values: observation, thought, action, result, reflection, planning, tool_call, tool_result, decision, error, correction, summary.

Steps with type decision, error, or correction are always embedded for semantic search regardless of the configured sampling rate.
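
To show how hash chaining enables tamper detection, here is a minimal standard-library sketch. Each step's hash covers the previous step's hash plus the step's own content, so editing any earlier step invalidates everything after it. The exact fields and byte layout HatiData hashes are not documented here; the JSON payload, the empty-string genesis hash, and the helper names step_hash, build_chain, and verify_chain are all assumptions made for illustration.

```python
import hashlib
import json


def step_hash(prev_hash, step):
    """SHA-256 over the previous hash plus a canonical JSON form of the step."""
    payload = json.dumps(step, sort_keys=True).encode("utf-8")
    return hashlib.sha256(prev_hash.encode("utf-8") + payload).hexdigest()


def build_chain(steps):
    """Link steps (dicts with step_type and content) into a hash chain."""
    chain, prev = [], ""  # assumed: the first step chains from an empty hash
    for number, step in enumerate(steps, start=1):
        digest = step_hash(prev, step)
        chain.append({"step_number": number, "prev_hash": prev, "hash": digest, **step})
        prev = digest
    return chain


def verify_chain(chain):
    """Recompute every hash; return False at the first mismatch."""
    prev = ""
    for entry in chain:
        step = {"step_type": entry["step_type"], "content": entry["content"]}
        if entry["prev_hash"] != prev or entry["hash"] != step_hash(prev, step):
            return False
        prev = entry["hash"]
    return True


chain = build_chain([
    {"step_type": "observation", "content": "Revenue is up 15%."},
    {"step_type": "decision", "content": "Check SMB trends next."},
])
tampered = [dict(entry) for entry in chain]
tampered[0]["content"] = "Revenue is flat."  # edit a logged step after the fact

print(verify_chain(chain), verify_chain(tampered))  # prints: True False
```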


Branch Operations

branch_create()

Create an isolated copy-on-write branch for speculative operations.

branch = agent.branch_create(
    name="what-if-pricing",
    description="Simulate 10% price increase on enterprise tier",
    ttl_seconds=7200,
)
branch_id = branch["branch_id"]

branch_query()

Execute SQL within the branch's isolated schema.

rows = agent.branch_query(
    branch_id="br_a1b2c3d4",
    sql="UPDATE products SET price = price * 1.10 WHERE tier = 'enterprise'",
)

Writes within a branch use copy-on-write isolation. The main data layer is never affected until you merge.

branch_merge()

Merge branch changes back into the main schema.

result = agent.branch_merge(
    branch_id="br_a1b2c3d4",
    strategy="branch_wins",  # or "main_wins", "abort"
)
print(result["tables_merged"], result["conflicts"])

branch_discard()

Discard a branch and clean up its isolated schema.

agent.branch_discard(branch_id="br_a1b2c3d4")
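
The four branch calls above compose naturally into a create, run, then merge-or-discard workflow. The sketch below shows one way to wrap that pattern so a failing statement always discards the branch. run_in_branch is a hypothetical helper, and FakeAgent is a stand-in that only records calls so the example runs without a proxy; neither is part of the SDK. Whether a merged branch also needs an explicit discard is not stated in this document, so the sketch does not do one.

```python
class FakeAgent:
    """Hypothetical stand-in that records branch calls (NOT part of the SDK)."""

    def __init__(self):
        self.calls = []

    def branch_create(self, name, description=None, ttl_seconds=3600):
        self.calls.append("create")
        return {"branch_id": "br_test"}

    def branch_query(self, branch_id, sql):
        self.calls.append("query")
        if "boom" in sql:
            raise RuntimeError("simulated failure")
        return []

    def branch_merge(self, branch_id, strategy="branch_wins"):
        self.calls.append("merge")
        return {"tables_merged": 1, "conflicts": []}

    def branch_discard(self, branch_id):
        self.calls.append("discard")


def run_in_branch(agent, name, statements, strategy="branch_wins"):
    """Run statements in a fresh branch; merge on success, discard on error."""
    branch_id = agent.branch_create(name=name)["branch_id"]
    try:
        for sql in statements:
            agent.branch_query(branch_id=branch_id, sql=sql)
    except Exception:
        # Leave main untouched and clean up the isolated schema.
        agent.branch_discard(branch_id=branch_id)
        raise
    return agent.branch_merge(branch_id=branch_id, strategy=strategy)


agent = FakeAgent()
result = run_in_branch(agent, "what-if-pricing", ["UPDATE products SET price = 1"])
print(agent.calls, result["tables_merged"])
```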

Agent State

Get and set named state values scoped to the agent identity.

# Set state
agent.set_agent_state("current_task", "Analyzing Q4 revenue")

# Get state
state = agent.get_agent_state("current_task")
print(state["value"], state["updated_at"])

State is stored in _hatidata_agent_state, scoped by agent_id.


Context Manager

with HatiDataAgent(host="localhost", agent_id="my-agent") as agent:
    rows = agent.query("SELECT COUNT(*) FROM orders")
# Connection automatically closed when the block exits

Reasoning Chains

Tag related queries with a shared request_id for end-to-end tracing across multi-step operations:

with agent.reasoning_chain("req-001") as chain:
    tables = chain.query("SELECT table_name FROM information_schema.tables")
    data = chain.query("SELECT * FROM customers LIMIT 10", step=1)
    summary = chain.query(
        "SELECT status, COUNT(*) AS cnt FROM customers GROUP BY status",
        step=2,
    )

Nested chains are supported via parent_request_id:

with agent.reasoning_chain("req-002", parent_request_id="req-001") as sub_chain:
    detail = sub_chain.query("SELECT * FROM orders WHERE customer_id = 42")

RAG Context Retrieval

# Full-text retrieval
context = agent.get_context(
    table="knowledge_base",
    search_query="enterprise pricing tiers",
    top_k=5,
)
# Returns: [{"id": 1, "content": "...", "title": "..."}, ...]

# Vector similarity retrieval
context = agent.get_rag_context(
    table="documents",
    embedding_col="embedding",
    vector=[0.1, 0.2, 0.3, ...],  # Query embedding vector
    top_k=10,
)

Methods Reference

| Method | Returns | Description |
| --- | --- | --- |
| query(sql, params, request_id) | list[dict] | Execute SELECT, return rows as dicts |
| execute(sql, params) | int | Execute INSERT/UPDATE/DELETE, return row count |
| store_memory(content, memory_type, metadata, importance) | str | Store memory, return UUID |
| search_memory(query, top_k, memory_type, min_importance) | list[dict] | Search memories by semantic similarity |
| log_reasoning_step(session_id, step_type, content, metadata, importance) | dict | Log CoT step, return trace metadata |
| get_agent_state(key) | dict | Get named state value |
| set_agent_state(key, value) | dict | Set named state value |
| branch_create(name, description, ttl_seconds) | dict | Create isolated branch |
| branch_query(branch_id, sql) | list[dict] | Execute SQL within a branch |
| branch_merge(branch_id, strategy) | dict | Merge branch to main data layer |
| branch_discard(branch_id) | dict | Discard branch |
| get_context(table, search_query, top_k) | list[dict] | Full-text RAG retrieval |
| get_rag_context(table, embedding_col, vector, top_k) | list[dict] | Vector similarity retrieval |
| reasoning_chain(request_id, parent_request_id) | Context manager | Multi-step reasoning tracking |
| close() | None | Close the database connection |

Using Standard psycopg2

HatiData speaks the Postgres wire protocol. Any Postgres driver connects directly:

import psycopg2

conn = psycopg2.connect(
    host="localhost",
    port=5439,
    user="agent",
    password="hd_live_...",
    dbname="hatidata",
    application_name="my-agent/custom",  # agent_id/framework convention
)

with conn.cursor() as cur:
    cur.execute("SELECT * FROM customers LIMIT 10")
    rows = cur.fetchall()

conn.close()

The proxy parses application_name to extract agent identity for billing and audit when connecting outside the SDK.
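
A small sketch of building and splitting an application_name in the agent_id/framework convention shown above. Both helpers are hypothetical and are not part of the SDK. The proxy's actual parsing rules are not documented here, so rejecting a "/" inside either part and falling back to "custom" when the framework is missing are assumptions.

```python
def make_application_name(agent_id, framework="custom"):
    """Join agent_id and framework with '/' (assumed convention)."""
    if "/" in agent_id or "/" in framework:
        # Assumption: a slash inside either part would make the value ambiguous.
        raise ValueError("agent_id and framework must not contain '/'")
    return f"{agent_id}/{framework}"


def split_application_name(application_name, default_framework="custom"):
    """Split 'agent_id/framework'; fall back to a default framework."""
    agent_id, sep, framework = application_name.partition("/")
    return agent_id, (framework if sep and framework else default_framework)


name = make_application_name("my-agent", "langchain")
print(name, split_application_name(name))
```

Passing the result of make_application_name as application_name to psycopg2.connect keeps the identity string consistent across scripts.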


ControlPlaneClient

REST API client for the HatiData Control Plane — manages agent memory, triggers, branches, CoT traces, JIT access, and agent keys.

from hatidata_agent import ControlPlaneClient

cp = ControlPlaneClient(
    base_url="https://api.hatidata.com",
    api_key="hd_live_...",
    org_id="org_abc123",
    env_id="env_prod_x1y2",
)

Constructor Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| base_url | str | http://localhost:8080 | Control plane API endpoint |
| email | str | "" | Email for JWT login (alternative to api_key) |
| password | str | "" | Password for JWT login |
| api_key | str | "" | API key (hd_live_*) for authentication |
| org_id | str | "" | Organization ID |
| env_id | str | "" | Environment ID |
| timeout | int | 15 | Request timeout (seconds) |

Memory Methods

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| create_memory() | agent_id, content, memory_type="observation" | dict | Store new memory |
| list_memories() | agent_id=None, memory_type=None, limit=50 | list[dict] | List memories |
| search_memory() | query, agent_id=None, top_k=10 | list[dict] | Semantic search |
| delete_memory() | memory_id | dict | Delete memory |

Trigger Methods

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| create_trigger() | name, concept, threshold=0.85, actions=None, cooldown_secs=60 | dict | Register trigger |
| list_triggers() | | list[dict] | List triggers |
| test_trigger() | trigger_id, text | dict | Test trigger (dry-run) |
| delete_trigger() | trigger_id | dict | Delete trigger |

Branch Methods

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| create_branch() | agent_id, tables, description=None | dict | Create branch |
| list_branches() | | list[dict] | List branches |
| get_branch() | branch_id | dict | Get details |
| merge_branch() | branch_id, strategy="branch_wins" | dict | Merge to main |
| discard_branch() | branch_id | dict | Discard branch |
| branch_diff() | branch_id | dict | Get diff |
| branch_conflicts() | branch_id | dict | Get conflicts |

Chain-of-Thought Methods

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| ingest_cot() | traces (list of dicts) | dict | Ingest reasoning traces |
| list_cot_sessions() | | list[dict] | List sessions |
| replay_cot() | session_id | dict | Replay session |
| verify_cot() | session_id | dict | Verify hash chain |

JIT Access Methods

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| request_jit() | target_role, reason, duration_hours=1 | dict | Request JIT access |
| list_jit_grants() | | list[dict] | List active grants |
Agent Key Methods

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| create_agent_key() | agent_name, framework="custom", allowed_tables=None, expires_in_days=None | dict | Create agent key |
| list_agent_keys() | | list[dict] | List agent keys |
| rotate_agent_key() | key_id | dict | Rotate key |
| revoke_agent_key() | key_id | dict | Revoke key |

Static Helpers

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| build_cot_trace() | session_id, agent_id, org_id, step_index, step_type, content, prev_hash="" | dict | Build single CoT trace with SHA-256 hash |
| build_cot_session() | agent_id, org_id, steps | (str, list[dict]) | Build hash-chained session |

Example

# Store and search memories
cp.create_memory("research-agent", "Customer churn increased 15% in Q4")
results = cp.search_memory("churn trends", top_k=5)

# Create and test a semantic trigger
trigger = cp.create_trigger(
    name="fraud-detector",
    concept="suspicious transaction patterns",
    threshold=0.85,
)
test = cp.test_trigger(trigger["trigger_id"], "Large wire transfer to offshore account")
print(f"Would fire: {test['would_fire']}, Score: {test['similarity_score']}")
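
The threshold and cooldown_secs parameters of create_trigger suggest a simple gating rule: fire when the similarity score reaches the threshold, but not again until the cooldown has elapsed. The sketch below illustrates that idea only. TriggerGate is a hypothetical class, not SDK code, and both the >= comparison and the cooldown semantics are assumptions about behavior that this document does not spell out.

```python
class TriggerGate:
    """Hypothetical threshold + cooldown gate (illustration, NOT SDK code)."""

    def __init__(self, threshold=0.85, cooldown_secs=60):
        self.threshold = threshold
        self.cooldown_secs = cooldown_secs
        self._last_fired = None  # timestamp of the most recent firing

    def should_fire(self, similarity_score, now):
        # Below the threshold: the text does not match the concept closely enough.
        if similarity_score < self.threshold:
            return False
        # Matched, but still inside the cooldown window of the previous firing.
        if self._last_fired is not None and now - self._last_fired < self.cooldown_secs:
            return False
        self._last_fired = now
        return True


gate = TriggerGate(threshold=0.85, cooldown_secs=60)
decisions = [
    gate.should_fire(0.90, now=0.0),    # first match fires
    gate.should_fire(0.95, now=30.0),   # suppressed: within cooldown
    gate.should_fire(0.50, now=100.0),  # suppressed: below threshold
    gate.should_fire(0.90, now=100.0),  # fires: cooldown has elapsed
]
print(decisions)  # prints: [True, False, False, True]
```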

LocalEngine

Fully offline engine for local agent workflows. Available in the public SDK (pip install hatidata-agent). Provides the same agent features (memory, CoT, triggers, branches) without requiring a running proxy or control plane.

from hatidata_agent.local_engine import LocalEngine

engine = LocalEngine(db_path=".hati/local.db")

Query Interface

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| query() | sql, params=None | list[dict] | Execute SELECT |
| execute() | sql, params=None | int | Execute DML, returns row count |

Memory

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| store_memory() | agent_id, content, memory_type="fact", metadata=None, importance=0.5 | str | Store, returns UUID |
| search_memory() | agent_id, query_text, top_k=10, memory_type=None, min_importance=None | list[dict] | Text search |
| get_state() | agent_id, key | Any or None | Get key-value state |
| set_state() | agent_id, key, value | None | Set key-value state |
| delete_memory() | memory_id | bool | Delete memory |

Chain-of-Thought

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| log_reasoning_step() | agent_id, session_id, step_type, content, metadata=None, importance=0.5 | str | Log step, returns trace ID |
| replay_session() | session_id, verify_chain=False | dict | Replay with chain_valid flag |
| list_sessions() | agent_id=None, limit=50, since=None | list[dict] | List sessions |

Triggers

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| register_trigger() | name, concept, threshold=0.7, action_type="flag_for_review" | str | Register, returns ID |
| list_triggers() | status=None | list[dict] | List triggers |
| delete_trigger() | trigger_id | bool | Delete trigger |
| test_trigger() | trigger_id, content | dict | Test trigger |

Branches

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| branch_create() | name=None, description=None, ttl_seconds=3600 | dict | Create branch |
| branch_query() | branch_id, sql | list[dict] | Query in branch |
| branch_merge() | branch_id, strategy="branch_wins" | dict | Merge to main |
| branch_discard() | branch_id | bool | Discard branch |
| branch_list() | status=None | list[dict] | List branches |

Example

with LocalEngine() as engine:
    # Store and search memories locally
    engine.store_memory("my-agent", "Revenue grew 20% in Q1", importance=0.9)
    results = engine.search_memory("my-agent", "revenue growth")

    # Log reasoning with hash-chained integrity
    engine.log_reasoning_step("my-agent", "session-1", "observation", "Sales data shows upward trend")
    engine.log_reasoning_step("my-agent", "session-1", "decision", "Recommend increasing inventory")
    replay = engine.replay_session("session-1", verify_chain=True)
    print(f"Chain valid: {replay['chain_valid']}")

Note: The local engine stores all data in a single database file. Memory, CoT traces, triggers, and branches each use dedicated internal schemas for isolation.


Source Code

github.com/HatiOS-AI/HatiData-SDKs — sdk/python
