
CrewAI Integration

HatiData integrates with CrewAI as an agent-native data layer — giving multi-agent crews governed access to your structured data, persistent memory that survives between runs, and per-agent billing attribution for cost tracking. Each agent in a crew gets its own `agent_id`, so HatiData can audit, meter, and enforce policy per role rather than per crew.

The `crewai-hatidata` package provides four `BaseTool` implementations and a persistent memory backend:

| Component | Purpose |
| --- | --- |
| `HatiDataQueryTool` | Execute SQL against the data layer |
| `HatiDataListTablesTool` | List tables the agent has permission to see |
| `HatiDataDescribeTableTool` | Get column schema for a table |
| `HatiDataContextSearchTool` | Full-text search over table content for RAG workflows |
| `HatiDataMemory` | Persistent task and conversation memory |

Installation

```shell
pip install crewai-hatidata
```

Dependencies: `hatidata-agent`, `crewai >= 0.30.0`


Tools

All four tools share the same connection parameters:

| Parameter | Default | Description |
| --- | --- | --- |
| `host` | `"localhost"` | HatiData proxy hostname |
| `port` | `5439` | Proxy port |
| `agent_id` | `"crewai-agent"` | Agent identifier for billing and audit |
| `database` | `"hatidata"` | Database name |
| `user` | `"agent"` | Username |
| `password` | `""` | API key (`hd_live_*` or `hd_test_*`) |
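Rather than hard-coding an API key next to the connection parameters, you can assemble them from the environment. A minimal sketch — the environment variable names here are assumptions for illustration, not part of the package:

```python
import os

# Hypothetical environment variable names -- choose whatever fits your deployment.
# Defaults mirror the table above.
conn = {
    "host": os.environ.get("HATIDATA_HOST", "localhost"),
    "port": int(os.environ.get("HATIDATA_PORT", "5439")),
    "password": os.environ.get("HATIDATA_API_KEY", ""),
}
```

The resulting dict can be splatted into any of the tool constructors below with `**conn`.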

HatiDataQueryTool

Execute SQL queries against the data layer. Supports both standard SQL and legacy dialect syntax (NVL, IFF, DATEDIFF) — auto-transpiled to native equivalents. The full multi-stage query pipeline runs on every call.

```python
from crewai_hatidata import HatiDataQueryTool

query_tool = HatiDataQueryTool(
    host="your-org.proxy.hatidata.com",
    agent_id="analyst",
    password="hd_live_your_api_key",
)

result = query_tool._run(
    "SELECT customer_id, SUM(total) as revenue FROM orders GROUP BY 1 ORDER BY 2 DESC LIMIT 5"
)
```
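The dialect transpilation happens on the proxy, so agents never see it. Conceptually, a legacy call like `NVL(x, y)` is rewritten to its standard equivalent before execution. A toy regex sketch of that one mapping — this is only an illustration of the idea, not the proxy's actual implementation:

```python
import re

def transpile(sql: str) -> str:
    """Toy rewrite: NVL(a, b) is the legacy spelling of standard COALESCE(a, b)."""
    return re.sub(r"\bNVL\s*\(", "COALESCE(", sql, flags=re.IGNORECASE)

print(transpile("SELECT NVL(total, 0) FROM orders"))
# SELECT COALESCE(total, 0) FROM orders
```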

HatiDataListTablesTool

List all tables the agent has permission to access (filtered by ABAC policy).

```python
from crewai_hatidata import HatiDataListTablesTool

list_tool = HatiDataListTablesTool(
    host="your-org.proxy.hatidata.com",
    agent_id="analyst",
    password="hd_live_your_api_key",
)

result = list_tool._run("")
# Returns: "customers, orders, products, events"
```

HatiDataDescribeTableTool

Get column names, data types, and nullability for a specific table.

```python
from crewai_hatidata import HatiDataDescribeTableTool

describe_tool = HatiDataDescribeTableTool(
    host="your-org.proxy.hatidata.com",
    agent_id="analyst",
    password="hd_live_your_api_key",
)

result = describe_tool._run("orders")
# Returns: "id INTEGER NOT NULL\ncustomer_id INTEGER NOT NULL\ntotal DECIMAL(10,2)\ncreated_at TIMESTAMP"
```
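The one-line-per-column format is convenient to drop into a prompt, but it is also easy to parse if you are building schema-aware logic around the tool. A small parsing sketch over the example output above:

```python
# Example output from the describe tool (one "name TYPE [constraints]" line per column).
schema_text = (
    "id INTEGER NOT NULL\n"
    "customer_id INTEGER NOT NULL\n"
    "total DECIMAL(10,2)\n"
    "created_at TIMESTAMP"
)

# Split each line into (column_name, type_and_constraints).
columns = [line.split(maxsplit=1) for line in schema_text.splitlines()]
print([name for name, _ in columns])
# ['id', 'customer_id', 'total', 'created_at']
```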

HatiDataContextSearchTool

Full-text search over a table's text columns. Useful for RAG workflows where the agent needs context before constructing SQL queries.

```python
from crewai_hatidata import HatiDataContextSearchTool

search_tool = HatiDataContextSearchTool(
    host="your-org.proxy.hatidata.com",
    agent_id="analyst",
    password="hd_live_your_api_key",
)

result = search_tool._run({"table": "knowledge_base", "query": "enterprise pricing"})
```

HatiDataMemory

Persistent task and conversation memory backed by HatiData's SQL engine. Unlike CrewAI's default in-memory storage, `HatiDataMemory` persists across runs and can be shared between agents in a crew through a common `session_id`.

```python
from crewai_hatidata import HatiDataMemory

memory = HatiDataMemory(
    host="your-org.proxy.hatidata.com",
    port=5439,
    agent_id="crewai-crew",
    password="hd_live_your_api_key",
    session_id="project-alpha",
)
```

Memory entries are stored in `_hatidata_agent_memory` and can be queried directly with SQL for debugging or analysis.
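For example, to inspect what a crew remembered for a given session — the column names here are assumptions based on the parameters above, so check the actual schema with the describe tool first:

```sql
-- Hypothetical columns; verify against the real table schema.
SELECT agent_id, created_at, content
FROM _hatidata_agent_memory
WHERE session_id = 'project-alpha'
ORDER BY created_at DESC
LIMIT 20;
```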

Memory Parameters

| Parameter | Default | Description |
| --- | --- | --- |
| `host` | `"localhost"` | HatiData proxy hostname |
| `port` | `5439` | Proxy port |
| `agent_id` | `"crewai-agent"` | Agent identifier |
| `password` | `""` | API key |
| `session_id` | Auto-generated | Groups memories by session or project |
| `max_entries` | `1000` | Maximum memory entries to retain |

Complete Example: Multi-Agent Data Analysis Crew

A three-agent crew where a researcher discovers the schema, an analyst runs queries, and a writer produces the report:

```python
from crewai import Agent, Task, Crew, Process
from crewai_hatidata import (
    HatiDataQueryTool,
    HatiDataListTablesTool,
    HatiDataDescribeTableTool,
    HatiDataContextSearchTool,
    HatiDataMemory,
)

# Shared connection config
conn = {
    "host": "your-org.proxy.hatidata.com",
    "password": "hd_live_your_api_key",
}

# Each agent gets its own agent_id for per-role billing attribution
research_tools = [
    HatiDataListTablesTool(**conn, agent_id="researcher"),
    HatiDataDescribeTableTool(**conn, agent_id="researcher"),
    HatiDataContextSearchTool(**conn, agent_id="researcher"),
]

analysis_tools = [
    HatiDataQueryTool(**conn, agent_id="analyst"),
    HatiDataDescribeTableTool(**conn, agent_id="analyst"),
]

# Define agents
researcher = Agent(
    role="Data Researcher",
    goal="Discover relevant tables and understand the schema for the analysis task",
    backstory="You are an expert at exploring data infrastructure and understanding data models.",
    tools=research_tools,
    verbose=True,
)

analyst = Agent(
    role="SQL Analyst",
    goal="Write and execute precise SQL queries to answer business questions",
    backstory="You are a senior data analyst who writes efficient, accurate SQL.",
    tools=analysis_tools,
    verbose=True,
)

writer = Agent(
    role="Report Writer",
    goal="Synthesize data analysis results into clear business reports",
    backstory="You are a business intelligence specialist who turns data into insights.",
    verbose=True,
)

# Define tasks
research_task = Task(
    description="""Explore the data layer to find tables related to customer revenue.
    List all tables, then describe the most relevant ones. Identify key columns
    for revenue analysis.""",
    agent=researcher,
    expected_output="A summary of relevant tables and their schemas",
)

analysis_task = Task(
    description="""Using the schema information from the researcher, write and execute SQL queries to:
    1. Calculate total revenue by customer segment for Q4
    2. Find month-over-month revenue growth rates
    3. Identify the top 10 customers by lifetime value""",
    agent=analyst,
    expected_output="Query results with revenue by segment, growth rates, and top customers",
    context=[research_task],
)

report_task = Task(
    description="""Write a concise executive summary of the revenue analysis.
    Include key metrics, trends, and actionable recommendations.""",
    agent=writer,
    expected_output="A 1-page executive summary with key findings and recommendations",
    context=[analysis_task],
)

# Create and run the crew
crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, report_task],
    process=Process.sequential,
    memory=True,
    verbose=True,
)

result = crew.kickoff()
print(result)
```

Per-Agent Billing and Audit

Each tool instance carries its own `agent_id`, so HatiData tracks costs and enforces quotas separately per agent role:

```sql
-- Query audit log to see per-role costs
SELECT agent_id, COUNT(*) as queries, SUM(credits_used) as total_credits
FROM _hatidata_audit_log
WHERE framework = 'crewai'
GROUP BY agent_id
ORDER BY total_credits DESC;
```

This enables:

  • Cost attribution — See which agent roles consume the most credits
  • Quota enforcement — Set per-agent credit limits to prevent runaway queries
  • Audit trails — Every query logged with the specific agent and crew context
  • Policy evaluation — ABAC rules can grant different data permissions per agent role

Error Handling

CrewAI tools return error messages as strings so agents can reason about failures and adjust their approach:

```python
result = query_tool._run("SELECT * FROM nonexistent_table")
# Returns: "Error: Table 'nonexistent_table' not found"
# The agent can then list tables and retry with the correct name
```
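Because failures come back as plain strings rather than exceptions, the same recover-and-retry logic can also live in supervising code outside the agent's own reasoning. A stubbed sketch of that pattern — `run_query` and `list_tables` are stand-ins for the real tool calls, not part of the package:

```python
def run_query(sql: str) -> str:
    # Stub standing in for HatiDataQueryTool._run.
    if "nonexistent_table" in sql:
        return "Error: Table 'nonexistent_table' not found"
    return "customer_id,revenue\n42,1000.00"

def list_tables() -> str:
    # Stub standing in for HatiDataListTablesTool._run.
    return "customers, orders, products, events"

result = run_query("SELECT * FROM nonexistent_table")
if result.startswith("Error:"):
    # Same recovery an agent would reason through: discover tables, fix, retry.
    available = list_tables().split(", ")
    result = run_query(f"SELECT * FROM {available[1]} LIMIT 5")

print(result)
# customer_id,revenue
# 42,1000.00
```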

If AI healing is enabled on the proxy (`HATIDATA_AI_HEAL=true`), the proxy will attempt to auto-correct failed queries and the agent receives corrected results transparently.
