Python SDK
HatiData provides two Python packages:
| Package | Version | Style | Use Case |
|---|---|---|---|
| hatidata | v0.5.0+ | Async-first | Recommended for new projects. Full agent-native features. |
| hatidata-agent | v0.3.x | Sync | Legacy sync client. Still supported but no new features. |
Async SDK (Recommended)
The hatidata package (v0.5.0+) is the recommended client. It is async-first, supports parameterized queries, and includes built-in modules for agent memory, chain-of-thought, and semantic triggers.
Installation
pip install hatidata
Requirements: Python 3.9+.
Connection
from hatidata import HatiData
db = HatiData(
    host="preprod.hatidata.com",
    port=5439,
    api_key="hd_live_xxx",
    database="main",
)
When connecting in cloud mode, the agent auto-registers with the control plane and receives a unique fingerprint. The fingerprint is used for billing, audit, and policy targeting without any manual setup.
Queries
# Parameterized queries (safe from injection)
rows = await db.query("SELECT * FROM users WHERE id = $1", [42])
# Insert helper
await db.insert("products", {"id": 1, "name": "Widget"})
# Execute DML
count = await db.execute("UPDATE users SET active = true WHERE id = $1", [42])
Agent Memory
Store and retrieve long-term memories with hybrid vector + SQL search.
# Store a memory
await db.memory.store(content="Enterprise customers renew at 95% in Q4", agent_id="my-agent", importance=0.8)
# Search memories by semantic similarity
results = await db.memory.search("enterprise renewal rates", top_k=5)
for r in results:
    print(r["content"], r["similarity_score"])
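Hybrid retrieval combines vector similarity with structured filters. The following is a toy illustration of that scoring idea in pure Python, not the SDK's actual implementation; the in-memory store and `search` helper are invented for the sketch:

```python
import math

def cosine(a, b):
    # Cosine similarity between two equal-length vectors.
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Toy memory store: each entry has an embedding plus structured metadata.
memories = [
    {"content": "Enterprise renewals hit 95% in Q4", "embedding": [0.9, 0.1], "importance": 0.8},
    {"content": "Office plants watered on Fridays", "embedding": [0.1, 0.9], "importance": 0.2},
]

def search(query_embedding, top_k=5, min_importance=0.0):
    # Hybrid search: apply the SQL-style metadata filter first,
    # then rank the surviving candidates by vector similarity.
    candidates = [m for m in memories if m["importance"] >= min_importance]
    ranked = sorted(candidates, key=lambda m: cosine(query_embedding, m["embedding"]), reverse=True)
    return ranked[:top_k]

results = search([1.0, 0.0], min_importance=0.5)
```

The metadata filter prunes the candidate set before the (more expensive) vector ranking runs, which is the same ordering a hybrid vector + SQL engine benefits from at scale.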
Chain-of-Thought
Log immutable, hash-chained reasoning traces for audit and compliance.
# Log a reasoning step
await db.cot.log_step(session_id="s1", step_type="reasoning", content="Revenue is up 15% but concentrated in enterprise.")
# Replay a full session
trace = await db.cot.replay(session_id="s1")
print(trace["chain_valid"]) # True if hash chain is intact
Semantic Triggers
Register event-driven rules that fire when agent behavior matches a concept.
# Register a trigger
await db.triggers.register(
    name="alert",
    concept="accessing personally identifiable information",
    action="webhook",
    webhook_url="https://hooks.acme.com/pii-alert",
)
Legacy Sync Client
The hatidata-agent package is the legacy sync client. It remains supported but is not receiving new features. Prefer the async hatidata package for new projects.
Installation
pip install hatidata-agent
Requirements: Python 3.9+. Includes psycopg2-binary for Postgres connectivity.
Optional extras:
pip install hatidata-agent[langchain] # LangChain integration
pip install hatidata-agent[mcp] # MCP server support
pip install hatidata-agent[async] # Async support via asyncpg
pip install hatidata-agent[all] # Everything
HatiDataAgent
Constructor
from hatidata_agent import HatiDataAgent
agent = HatiDataAgent(
    host="localhost",        # Proxy hostname
    port=5439,               # Proxy port (default: 5439)
    agent_id="my-agent",     # Unique agent identifier
    framework="custom",      # AI framework name
    database="hatidata",     # Database name
    user="agent",            # Username
    password="hd_live_...",  # API key
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | str | "localhost" | Proxy hostname |
| port | int | 5439 | Proxy port |
| agent_id | str | Auto-generated | Unique agent identifier |
| framework | str | "custom" | AI framework (langchain, crewai, autogen, custom) |
| database | str | "hatidata" | Database name |
| user | str | "agent" | Username |
| password | str | "" | Password or API key |
| priority | str | "normal" | Query priority: low, normal, high, critical |
| connect_timeout | int | 10 | Connection timeout in seconds |
The agent_id and framework are passed to the proxy as startup parameters. The proxy uses them for billing, scheduling, policy evaluation, and audit.
query()
Execute a SELECT query. Returns a list of dicts.
rows = agent.query("SELECT * FROM customers WHERE status = 'active' LIMIT 10")
for row in rows:
    print(row["name"], row["email"])
With parameterized queries (recommended to prevent SQL injection):
rows = agent.query(
    "SELECT * FROM orders WHERE customer_id = %s AND status = %s",
    params=(42, "active"),
)
With request tracing:
rows = agent.query(
    "SELECT * FROM orders",
    request_id="trace-abc-123",
)
The request_id appears in the audit log, letting you correlate agent actions with specific queries.
| Parameter | Type | Description |
|---|---|---|
| sql | str | SQL query (Snowflake syntax auto-transpiled) |
| params | tuple | Query parameters for parameterized queries |
| request_id | str | Optional trace identifier |

Returns: list[dict], the rows as a list of dicts.
execute()
Execute INSERT, UPDATE, or DELETE. Returns the affected row count.
count = agent.execute("INSERT INTO logs VALUES (1, 'agent started', NOW())")
print(f"{count} rows inserted")
count = agent.execute(
    "UPDATE customers SET status = %s WHERE customer_id = %s",
    params=("churned", 42),
)
store_memory()
Store a fact, episode, preference, or context entry in long-term agent memory.
memory_id = agent.store_memory(
    content="Enterprise customers have a 95% renewal rate in Q4",
    memory_type="fact",
    metadata={"source": "quarterly-review", "quarter": "Q4-2025"},
    importance=0.9,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| content | str | required | Memory content |
| memory_type | str | "fact" | fact, episode, preference, or context |
| metadata | dict | {} | Arbitrary key-value metadata |
| importance | float | 0.5 | Importance score 0.0–1.0 |

Returns: str, the UUID of the stored memory.
search_memory()
Search memories using natural language with optional filters. Uses hybrid vector ANN + structured metadata search.
memories = agent.search_memory(
    query="customer renewal rates",
    top_k=5,
    memory_type="fact",
    min_importance=0.5,
)
for m in memories:
    print(m["content"], m["similarity_score"])
| Parameter | Type | Default | Description |
|---|---|---|---|
| query | str | required | Natural language search query |
| top_k | int | 10 | Maximum results |
| memory_type | str | None | Filter by type |
| min_importance | float | None | Minimum importance score |
| agent_id | str | current agent | Filter by agent |

Returns: list[dict], memories ranked by similarity score.
log_reasoning_step()
Log a step in an agent's reasoning chain. Steps are cryptographically hash-chained per session for tamper detection.
trace = agent.log_reasoning_step(
    session_id="analysis-2026-02-25",
    step_type="thought",
    content="Revenue is up 15% but concentrated in enterprise. Need to check SMB trends.",
    metadata={"confidence": 0.85},
    importance=0.7,
)
print(trace["step_number"], trace["hash"])
Valid step_type values: observation, thought, action, result, reflection, planning, tool_call, tool_result, decision, error, correction, summary.
Steps with type decision, error, or correction are always embedded for semantic search regardless of the configured sampling rate.
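The always-embed rule described above can be pictured as a small selection function. This is a hypothetical sketch of the documented behavior, not the SDK's actual code; the `sampling_rate` parameter is assumed:

```python
import random

# Step types the proxy always embeds, per the documented behavior.
ALWAYS_EMBED = {"decision", "error", "correction"}

def should_embed(step_type: str, sampling_rate: float) -> bool:
    # Critical step types bypass sampling entirely; all other
    # step types are embedded with probability sampling_rate.
    if step_type in ALWAYS_EMBED:
        return True
    return random.random() < sampling_rate

print(should_embed("decision", sampling_rate=0.0))  # critical steps always embed
```

Even with a sampling rate of zero, decision, error, and correction steps remain searchable, which keeps the highest-value audit events in the semantic index.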
Branch Operations
branch_create()
Create an isolated copy-on-write branch for speculative operations.
branch = agent.branch_create(
    name="what-if-pricing",
    description="Simulate 10% price increase on enterprise tier",
    ttl_seconds=7200,
)
branch_id = branch["branch_id"]
branch_query()
Execute SQL within the branch's isolated schema.
rows = agent.branch_query(
    branch_id="br_a1b2c3d4",
    sql="UPDATE products SET price = price * 1.10 WHERE tier = 'enterprise'",
)
Writes within a branch use copy-on-write isolation. The main data layer is never affected until you merge.
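Conceptually, copy-on-write means a branch reads through to the main data until it first writes a table, at which point only that table is copied into the branch. A toy illustration of the semantics (not the engine's actual mechanism; the `Branch` class is invented for this sketch):

```python
class Branch:
    """Toy copy-on-write branch over a dict-of-tables main store."""

    def __init__(self, main):
        self.main = main
        self.overlay = {}  # tables copied into the branch on first write

    def read(self, table):
        # Reads prefer the branch's private copy, falling back to main.
        return self.overlay.get(table, self.main[table])

    def write(self, table, rows):
        # The first write copies the table into the branch; main is untouched.
        self.overlay.setdefault(table, list(self.main[table]))
        self.overlay[table] = rows

    def merge(self):
        # branch_wins strategy: overlay tables replace their main counterparts.
        self.main.update(self.overlay)

main = {"products": [{"id": 1, "price": 100}]}
br = Branch(main)
br.write("products", [{"id": 1, "price": 110}])
assert main["products"][0]["price"] == 100  # main unaffected before merge
br.merge()
assert main["products"][0]["price"] == 110  # merge applies branch changes
```

Unwritten tables never get copied, so a branch that only reads costs almost nothing; discarding a branch is just dropping the overlay.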
branch_merge()
Merge branch changes back into the main schema.
result = agent.branch_merge(
    branch_id="br_a1b2c3d4",
    strategy="branch_wins",  # or "main_wins", "abort"
)
print(result["tables_merged"], result["conflicts"])
branch_discard()
Discard a branch and clean up its isolated schema.
agent.branch_discard(branch_id="br_a1b2c3d4")
Agent State
Get and set named state values scoped to the agent identity.
# Set state
agent.set_agent_state("current_task", "Analyzing Q4 revenue")
# Get state
state = agent.get_agent_state("current_task")
print(state["value"], state["updated_at"])
State is stored in _hatidata_agent_state, scoped by agent_id.
Context Manager
with HatiDataAgent(host="localhost", agent_id="my-agent") as agent:
    rows = agent.query("SELECT COUNT(*) FROM orders")
# Connection automatically closed when the block exits
Reasoning Chains
Tag related queries with a shared request_id for end-to-end tracing across multi-step operations:
with agent.reasoning_chain("req-001") as chain:
    tables = chain.query("SELECT table_name FROM information_schema.tables")
    data = chain.query("SELECT * FROM customers LIMIT 10", step=1)
    summary = chain.query(
        "SELECT status, COUNT(*) AS cnt FROM customers GROUP BY status",
        step=2,
    )
Nested chains are supported via parent_request_id:
with agent.reasoning_chain("req-002", parent_request_id="req-001") as sub_chain:
    detail = sub_chain.query("SELECT * FROM orders WHERE customer_id = 42")
RAG Context Retrieval
Full-Text Search
context = agent.get_context(
    table="knowledge_base",
    search_query="enterprise pricing tiers",
    top_k=5,
)
# Returns: [{"id": 1, "content": "...", "title": "..."}, ...]
Vector Similarity Search
context = agent.get_rag_context(
    table="documents",
    embedding_col="embedding",
    vector=[0.1, 0.2, 0.3, ...],  # Query embedding vector
    top_k=10,
)
Methods Reference
| Method | Returns | Description |
|---|---|---|
| query(sql, params, request_id) | list[dict] | Execute SELECT, return rows as dicts |
| execute(sql, params) | int | Execute INSERT/UPDATE/DELETE, return row count |
| store_memory(content, memory_type, metadata, importance) | str | Store memory, return UUID |
| search_memory(query, top_k, memory_type, min_importance) | list[dict] | Search memories by semantic similarity |
| log_reasoning_step(session_id, step_type, content, metadata, importance) | dict | Log CoT step, return trace metadata |
| get_agent_state(key) | dict | Get named state value |
| set_agent_state(key, value) | dict | Set named state value |
| branch_create(name, description, ttl_seconds) | dict | Create isolated branch |
| branch_query(branch_id, sql) | list[dict] | Execute SQL within a branch |
| branch_merge(branch_id, strategy) | dict | Merge branch to main data layer |
| branch_discard(branch_id) | dict | Discard branch |
| get_context(table, search_query, top_k) | list[dict] | Full-text RAG retrieval |
| get_rag_context(table, embedding_col, vector, top_k) | list[dict] | Vector similarity retrieval |
| reasoning_chain(request_id, parent_request_id) | Context manager | Multi-step reasoning tracking |
| close() | None | Close the database connection |
Using Standard psycopg2
HatiData speaks the Postgres wire protocol. Any Postgres driver connects directly:
import psycopg2
conn = psycopg2.connect(
    host="localhost",
    port=5439,
    user="agent",
    password="hd_live_...",
    dbname="hatidata",
    application_name="my-agent/custom",  # agent_id/framework convention
)
with conn.cursor() as cur:
    cur.execute("SELECT * FROM customers LIMIT 10")
    rows = cur.fetchall()
conn.close()
The proxy parses application_name to extract agent identity for billing and audit when connecting outside the SDK.
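The agent_id/framework convention is a simple delimited string. A sketch of how such a value could be split (the parsing rule is an assumption based on the stated convention, not the proxy's source):

```python
def parse_application_name(app_name: str) -> tuple[str, str]:
    # "my-agent/custom" -> ("my-agent", "custom").
    # If no framework segment is present, assume the "custom" default.
    agent_id, sep, framework = app_name.partition("/")
    return agent_id, framework if sep else "custom"

print(parse_application_name("my-agent/custom"))  # ('my-agent', 'custom')
```

Setting application_name at connect time costs nothing on the client, so even raw-driver connections stay attributable in billing and audit.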
ControlPlaneClient
REST API client for the HatiData Control Plane — manages agent memory, triggers, branches, CoT traces, JIT access, and agent keys.
from hatidata_agent import ControlPlaneClient
cp = ControlPlaneClient(
    base_url="https://api.hatidata.com",
    api_key="hd_live_...",
    org_id="org_abc123",
    env_id="env_prod_x1y2",
)
Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| base_url | str | http://localhost:8080 | Control plane API endpoint |
| email | str | "" | Email for JWT login (alternative to api_key) |
| password | str | "" | Password for JWT login |
| api_key | str | "" | API key (hd_live_*) for authentication |
| org_id | str | "" | Organization ID |
| env_id | str | "" | Environment ID |
| timeout | int | 15 | Request timeout (seconds) |
Memory Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| create_memory() | agent_id, content, memory_type="observation" | dict | Store new memory |
| list_memories() | agent_id=None, memory_type=None, limit=50 | list[dict] | List memories |
| search_memory() | query, agent_id=None, top_k=10 | list[dict] | Semantic search |
| delete_memory() | memory_id | dict | Delete memory |
Trigger Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| create_trigger() | name, concept, threshold=0.85, actions=None, cooldown_secs=60 | dict | Register trigger |
| list_triggers() | — | list[dict] | List triggers |
| test_trigger() | trigger_id, text | dict | Test trigger (dry-run) |
| delete_trigger() | trigger_id | dict | Delete trigger |
Branch Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| create_branch() | agent_id, tables, description=None | dict | Create branch |
| list_branches() | — | list[dict] | List branches |
| get_branch() | branch_id | dict | Get details |
| merge_branch() | branch_id, strategy="branch_wins" | dict | Merge to main |
| discard_branch() | branch_id | dict | Discard branch |
| branch_diff() | branch_id | dict | Get diff |
| branch_conflicts() | branch_id | dict | Get conflicts |
Chain-of-Thought Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| ingest_cot() | traces (list of dicts) | dict | Ingest reasoning traces |
| list_cot_sessions() | — | list[dict] | List sessions |
| replay_cot() | session_id | dict | Replay session |
| verify_cot() | session_id | dict | Verify hash chain |
JIT Access Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| request_jit() | target_role, reason, duration_hours=1 | dict | Request JIT access |
| list_jit_grants() | — | list[dict] | List active grants |
Agent Key Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| create_agent_key() | agent_name, framework="custom", allowed_tables=None, expires_in_days=None | dict | Create agent key |
| list_agent_keys() | — | list[dict] | List agent keys |
| rotate_agent_key() | key_id | dict | Rotate key |
| revoke_agent_key() | key_id | dict | Revoke key |
Static Helpers
| Method | Parameters | Returns | Description |
|---|---|---|---|
| build_cot_trace() | session_id, agent_id, org_id, step_index, step_type, content, prev_hash="" | dict | Build single CoT trace with SHA-256 hash |
| build_cot_session() | agent_id, org_id, steps | (str, list[dict]) | Build hash-chained session |
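These helpers link steps by feeding each step's SHA-256 hash into the next step as prev_hash. The following is a minimal sketch of that chaining pattern with hashlib; the exact hash preimage (field order and serialization) is an assumption for illustration, so consult the SDK source for the real scheme:

```python
import hashlib
import json

def hash_step(step: dict, prev_hash: str) -> str:
    # Hash the step's fields together with the previous hash,
    # using a canonical (sorted-key) JSON serialization.
    payload = json.dumps({**step, "prev_hash": prev_hash}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

def build_session(contents):
    # Build a hash-chained list of steps from plain content strings.
    prev = ""
    chained = []
    for i, content in enumerate(contents):
        step = {"step_index": i, "content": content}
        step["hash"] = hash_step(step, prev)
        chained.append(step)
        prev = step["hash"]
    return chained

def verify(chained) -> bool:
    # Recompute every hash in order; any edit breaks the chain
    # from the tampered step onward.
    prev = ""
    for step in chained:
        body = {k: v for k, v in step.items() if k != "hash"}
        if step["hash"] != hash_step(body, prev):
            return False
        prev = step["hash"]
    return True

session = build_session(["observe revenue trend", "decide to restock"])
assert verify(session)
session[0]["content"] = "tampered"
assert not verify(session)
```

Because each hash commits to its predecessor, verifying the final hash is enough to attest the entire session, which is what makes replayed traces usable as audit evidence.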
Example
# Store and search memories
cp.create_memory("research-agent", "Customer churn increased 15% in Q4")
results = cp.search_memory("churn trends", top_k=5)
# Create and test a semantic trigger
trigger = cp.create_trigger(
    name="fraud-detector",
    concept="suspicious transaction patterns",
    threshold=0.85,
)
test = cp.test_trigger(trigger["trigger_id"], "Large wire transfer to offshore account")
print(f"Would fire: {test['would_fire']}, Score: {test['similarity_score']}")
LocalEngine
Fully offline engine for local agent workflows. Available in the public SDK (pip install hatidata-agent). Provides the same agent features (memory, CoT, triggers, branches) without requiring a running proxy or control plane.
from hatidata_agent.local_engine import LocalEngine
engine = LocalEngine(db_path=".hati/local.db")
Query Interface
| Method | Parameters | Returns | Description |
|---|---|---|---|
| query() | sql, params=None | list[dict] | Execute SELECT |
| execute() | sql, params=None | int | Execute DML, returns row count |
Memory
| Method | Parameters | Returns | Description |
|---|---|---|---|
| store_memory() | agent_id, content, memory_type="fact", metadata=None, importance=0.5 | str | Store, returns UUID |
| search_memory() | agent_id, query_text, top_k=10, memory_type=None, min_importance=None | list[dict] | Text search |
| get_state() | agent_id, key | Any or None | Get key-value state |
| set_state() | agent_id, key, value | None | Set key-value state |
| delete_memory() | memory_id | bool | Delete memory |
Chain-of-Thought
| Method | Parameters | Returns | Description |
|---|---|---|---|
| log_reasoning_step() | agent_id, session_id, step_type, content, metadata=None, importance=0.5 | str | Log step, returns trace ID |
| replay_session() | session_id, verify_chain=False | dict | Replay with chain_valid flag |
| list_sessions() | agent_id=None, limit=50, since=None | list[dict] | List sessions |
Triggers
| Method | Parameters | Returns | Description |
|---|---|---|---|
| register_trigger() | name, concept, threshold=0.7, action_type="flag_for_review" | str | Register, returns ID |
| list_triggers() | status=None | list[dict] | List triggers |
| delete_trigger() | trigger_id | bool | Delete trigger |
| test_trigger() | trigger_id, content | dict | Test trigger |
Branches
| Method | Parameters | Returns | Description |
|---|---|---|---|
| branch_create() | name=None, description=None, ttl_seconds=3600 | dict | Create branch |
| branch_query() | branch_id, sql | list[dict] | Query in branch |
| branch_merge() | branch_id, strategy="branch_wins" | dict | Merge to main |
| branch_discard() | branch_id | bool | Discard branch |
| branch_list() | status=None | list[dict] | List branches |
Example
with LocalEngine() as engine:
    # Store and search memories locally
    engine.store_memory("my-agent", "Revenue grew 20% in Q1", importance=0.9)
    results = engine.search_memory("my-agent", "revenue growth")
    # Log reasoning with hash-chained integrity
    engine.log_reasoning_step("my-agent", "session-1", "observation", "Sales data shows upward trend")
    engine.log_reasoning_step("my-agent", "session-1", "decision", "Recommend increasing inventory")
    replay = engine.replay_session("session-1", verify_chain=True)
    print(f"Chain valid: {replay['chain_valid']}")
The local engine stores all data in a single database file. Memory, CoT traces, triggers, and branches each use dedicated internal schemas for isolation.