CrewAI Integration

The crewai-hatidata package provides four data warehouse tools and a persistent memory backend for building CrewAI agents that can explore schemas, run SQL queries, and share state across multi-agent crews.

Installation

pip install crewai-hatidata

Dependencies: hatidata-agent, crewai >= 0.30.0

Tools

The package provides four tools, each implementing CrewAI's BaseTool interface:

| Tool Class | Tool Name | Description |
| --- | --- | --- |
| HatiDataQueryTool | hatidata_query | Execute SQL and return results |
| HatiDataListTablesTool | hatidata_list_tables | List available tables |
| HatiDataDescribeTableTool | hatidata_describe_table | Get column schema for a table |
| HatiDataContextSearchTool | hatidata_context_search | RAG full-text search over table content |

Connection Parameters

All four tools accept the same connection parameters:

| Parameter | Default | Description |
| --- | --- | --- |
| host | "localhost" | HatiData proxy hostname |
| port | 5439 | Proxy port |
| agent_id | "crewai-agent" | Agent identifier for billing and audit |
| database | "hatidata" | Database name |
| user | "agent" | Username |
| password | "" | API key (hd_live_* or hd_test_*) |
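Because every tool takes the same parameters, it can help to build the keyword arguments once. The following is a minimal sketch, not part of the package: the `connection_kwargs` helper and the `HATIDATA_*` environment variable names are hypothetical, and the defaults are simply copied from the table above.

```python
import os

# Defaults copied from the connection parameter table above.
DEFAULTS = {
    "host": "localhost",
    "port": 5439,
    "agent_id": "crewai-agent",
    "database": "hatidata",
    "user": "agent",
    "password": "",
}


def connection_kwargs(env=None, **overrides):
    """Build keyword arguments accepted by any of the four tools.

    The HATIDATA_* variable names are an assumption made for this sketch;
    the package itself does not define them.
    """
    env = os.environ if env is None else env
    kwargs = dict(DEFAULTS)
    for key in DEFAULTS:
        value = env.get(f"HATIDATA_{key.upper()}")
        if value is not None:
            # The port is the only non-string parameter in the table.
            kwargs[key] = int(value) if key == "port" else value
    # Explicit overrides win over both defaults and environment values.
    kwargs.update(overrides)
    return kwargs
```

With this helper, a tool could be created as `HatiDataQueryTool(**connection_kwargs(agent_id="analyst"))`, which keeps the API key out of source code.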

HatiDataQueryTool

Execute SQL queries against the data warehouse. The tool accepts standard SQL as well as legacy warehouse SQL syntax, which is transpiled automatically.

from crewai_hatidata import HatiDataQueryTool

query_tool = HatiDataQueryTool(
    host="your-org.proxy.hatidata.com",
    agent_id="analyst",
    password="hd_live_your_api_key",
)

# The tool is called by the agent with a SQL string
result = query_tool._run("SELECT customer_id, SUM(total) as revenue FROM orders GROUP BY 1 ORDER BY 2 DESC LIMIT 5")

HatiDataListTablesTool

List all tables the agent has permission to access.

from crewai_hatidata import HatiDataListTablesTool

list_tool = HatiDataListTablesTool(
    host="your-org.proxy.hatidata.com",
    agent_id="analyst",
    password="hd_live_your_api_key",
)

result = list_tool._run("")
# Returns: "customers, orders, products, events"
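When calling the tool from your own code rather than through an agent, the returned string usually needs to be split into table names. A minimal sketch, assuming the comma-separated format shown in the comment above is what the tool returns (`parse_table_list` is a hypothetical helper, not part of the package):

```python
def parse_table_list(result: str) -> list[str]:
    """Split a comma-separated table listing into clean table names."""
    # Strip whitespace around each name and drop empty fragments,
    # so an empty result yields an empty list.
    return [name.strip() for name in result.split(",") if name.strip()]
```

For example, `parse_table_list(list_tool._run(""))` would give a list that can be passed one name at a time to the describe tool.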

HatiDataDescribeTableTool

Get column names, data types, and nullability for a specific table.

from crewai_hatidata import HatiDataDescribeTableTool

describe_tool = HatiDataDescribeTableTool(
    host="your-org.proxy.hatidata.com",
    agent_id="analyst",
    password="hd_live_your_api_key",
)

result = describe_tool._run("orders")
# Returns: "id INTEGER NOT NULL\ncustomer_id INTEGER NOT NULL\ntotal DECIMAL(10,2)\ncreated_at TIMESTAMP"
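The schema string can be turned into structured records if you want to inspect it programmatically. This is a sketch under two assumptions drawn only from the sample output above: each line has the form `name TYPE [NOT NULL]`, and a column without `NOT NULL` is nullable. `Column` and `parse_describe_output` are hypothetical names.

```python
from typing import NamedTuple


class Column(NamedTuple):
    name: str
    data_type: str
    nullable: bool


def parse_describe_output(result: str) -> list[Column]:
    """Parse one 'name TYPE [NOT NULL]' entry per line into Column records."""
    columns = []
    for line in result.splitlines():
        line = line.strip()
        if not line:
            continue
        # The column name is the first token; the rest is the type
        # plus an optional NOT NULL suffix.
        name, _, rest = line.partition(" ")
        nullable = not rest.upper().endswith("NOT NULL")
        data_type = rest if nullable else rest[: -len("NOT NULL")].strip()
        columns.append(Column(name, data_type, nullable))
    return columns
```

Calling `parse_describe_output(describe_tool._run("orders"))` on the sample above would yield four records, with `total` and `created_at` marked nullable.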

HatiDataContextSearchTool

Search a table's text columns using full-text search. Useful for RAG workflows where the agent needs context before constructing SQL.

from crewai_hatidata import HatiDataContextSearchTool

search_tool = HatiDataContextSearchTool(
    host="your-org.proxy.hatidata.com",
    agent_id="analyst",
    password="hd_live_your_api_key",
)

result = search_tool._run({"table": "knowledge_base", "query": "enterprise pricing"})

HatiDataMemory

Persistent conversation and task memory backed by HatiData's SQL engine. Unlike CrewAI's default in-memory storage, HatiDataMemory persists across runs and can be shared between agents in a crew.

from crewai_hatidata import HatiDataMemory

memory = HatiDataMemory(
    host="your-org.proxy.hatidata.com",
    port=5439,
    agent_id="crewai-crew",
    password="hd_live_your_api_key",
    session_id="project-alpha",
)

Memory entries are stored in the _hatidata_agent_memory table and can be queried with SQL for debugging or analysis.

Memory Parameters

| Parameter | Default | Description |
| --- | --- | --- |
| host | "localhost" | HatiData proxy hostname |
| port | 5439 | Proxy port |
| agent_id | "crewai-agent" | Agent identifier |
| password | "" | API key |
| session_id | Auto-generated | Groups memories by session |
| max_entries | 1000 | Maximum memory entries to retain |

Complete Example: Data Analysis Crew

A multi-agent crew where a researcher discovers the schema, an analyst writes queries, and a writer produces a report:

from crewai import Agent, Task, Crew, Process
from crewai_hatidata import (
    HatiDataQueryTool,
    HatiDataListTablesTool,
    HatiDataDescribeTableTool,
    HatiDataContextSearchTool,
    HatiDataMemory,
)

# Shared connection config
conn = {
    "host": "your-org.proxy.hatidata.com",
    "password": "hd_live_your_api_key",
}

# Create tools -- each agent gets its own agent_id for billing attribution
research_tools = [
    HatiDataListTablesTool(**conn, agent_id="researcher"),
    HatiDataDescribeTableTool(**conn, agent_id="researcher"),
    HatiDataContextSearchTool(**conn, agent_id="researcher"),
]

analysis_tools = [
    HatiDataQueryTool(**conn, agent_id="analyst"),
    HatiDataDescribeTableTool(**conn, agent_id="analyst"),
]

# Define agents
researcher = Agent(
    role="Data Researcher",
    goal="Discover relevant tables and understand the schema for the analysis task",
    backstory="You are an expert at exploring data warehouses and understanding data models.",
    tools=research_tools,
    verbose=True,
)

analyst = Agent(
    role="SQL Analyst",
    goal="Write and execute precise SQL queries to answer business questions",
    backstory="You are a senior data analyst who writes efficient, accurate SQL.",
    tools=analysis_tools,
    verbose=True,
)

writer = Agent(
    role="Report Writer",
    goal="Synthesize data analysis results into clear business reports",
    backstory="You are a business intelligence specialist who turns data into insights.",
    verbose=True,
)

# Define tasks
research_task = Task(
    description="""Explore the data warehouse to find tables related to customer revenue.
List all tables, then describe the most relevant ones. Identify key columns
for revenue analysis.""",
    agent=researcher,
    expected_output="A summary of relevant tables and their schemas",
)

analysis_task = Task(
    description="""Using the schema information from the researcher, write and execute SQL queries to:
1. Calculate total revenue by customer segment for Q4
2. Find month-over-month revenue growth rates
3. Identify the top 10 customers by lifetime value""",
    agent=analyst,
    expected_output="Query results with revenue by segment, growth rates, and top customers",
    context=[research_task],
)

report_task = Task(
    description="""Write a concise executive summary of the revenue analysis.
Include key metrics, trends, and actionable recommendations.""",
    agent=writer,
    expected_output="A 1-page executive summary with key findings and recommendations",
    context=[analysis_task],
)

# Create and run the crew
crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, report_task],
    process=Process.sequential,
    memory=True,
    verbose=True,
)

result = crew.kickoff()
print(result)

Per-Agent Billing

Each tool instance carries its own agent_id, which means HatiData tracks query costs separately for each agent in the crew. This enables:

  • Cost attribution -- See which agents consume the most credits
  • Quota enforcement -- Set per-agent credit limits to prevent runaway queries
  • Audit trails -- Every query is logged with the specific agent and crew context
  • Policy evaluation -- ABAC rules can grant different permissions to different agents

View per-agent costs in the HatiData dashboard or query the audit log directly:

SELECT agent_id, COUNT(*) as queries, SUM(credits_used) as total_credits
FROM _hatidata_audit_log
WHERE framework = 'crewai'
GROUP BY agent_id
ORDER BY total_credits DESC;
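Since attribution depends on each tool instance carrying the right agent_id, a small factory can make that harder to get wrong. This is a minimal sketch; `tools_for_agent` is a hypothetical helper, and it assumes only what the examples above show, namely that every tool class accepts the connection parameters as keyword arguments.

```python
def tools_for_agent(conn: dict, agent_id: str, tool_classes: list) -> list:
    """Instantiate each tool class with shared settings and one agent_id."""
    # Merging into a new dict lets agent_id override any value already
    # present in conn and leaves the caller's dict unmodified.
    kwargs = {**conn, "agent_id": agent_id}
    return [tool_class(**kwargs) for tool_class in tool_classes]
```

For example, `tools_for_agent(conn, "researcher", [HatiDataListTablesTool, HatiDataDescribeTableTool])` would build a researcher tool set whose queries are all attributed to the same agent.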

Error Handling

The HatiData tools return error messages as strings rather than raising exceptions, so agents can read the message and reason about the failure:

# If a query fails, the tool returns the error message
result = query_tool._run("SELECT * FROM nonexistent_table")
# Returns: "Error: Table 'nonexistent_table' not found"

# The agent can then adjust its approach based on the error

HatiData's AI healing feature can also automatically attempt to correct failed queries. If enabled on the proxy, the agent receives corrected results transparently.
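When you call a tool directly instead of through an agent, you have to check for the error string yourself. The sketch below assumes that every error string begins with the "Error:" prefix seen in the example above, which the source does not guarantee. `is_tool_error` and `run_with_fallbacks` are hypothetical helpers.

```python
from typing import Callable

# Prefix taken from the example error message; assumed to be consistent.
ERROR_PREFIX = "Error:"


def is_tool_error(result) -> bool:
    """Return True if a tool result looks like an error message."""
    return isinstance(result, str) and result.startswith(ERROR_PREFIX)


def run_with_fallbacks(run: Callable[[str], str], queries: list[str]) -> str:
    """Try each query in order and return the first non-error result.

    If every query fails, the last error string is returned so the
    caller can still inspect it.
    """
    last = f"{ERROR_PREFIX} no queries supplied"
    for sql in queries:
        last = run(sql)
        if not is_tool_error(last):
            return last
    return last
```

A call such as `run_with_fallbacks(query_tool._run, ["SELECT * FROM order_items", "SELECT * FROM orders"])` would fall back to the second query only if the first one returned an error string.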
