Skip to main content

Arrow Query Recipes

Practical recipes for using HatiData's Arrow-native query endpoint with Polars, Pandas, and PyArrow.


Recipe 1: Zero-Copy DataFrame Loading

Load large result sets directly into Polars without intermediate serialization:

import pyarrow as pa
import pyarrow.ipc as ipc
import polars as pl
import requests
import os

# Endpoint and credentials come from the environment. os.environ[...] raises
# KeyError immediately if a variable is missing, so misconfiguration fails at
# import time instead of at the first request.
HATIDATA_URL = f"http://{os.environ['HATIDATA_HOST']}:{os.environ['HATIDATA_PORT']}"
API_KEY = os.environ["HATIDATA_API_KEY"]

def arrow_query(sql: str, timeout: float = 60.0) -> pa.Table:
    """Execute SQL against the Arrow-native endpoint and return a pyarrow Table.

    Args:
        sql: SQL statement to execute.
        timeout: Seconds to wait for the HTTP response. `requests` has no
            default timeout, so without this an unresponsive server would
            block the caller forever.

    Returns:
        The full result set as a single pyarrow.Table.

    Raises:
        requests.HTTPError: If the server returns a non-2xx status.
        requests.Timeout: If the server does not respond within `timeout`.
    """
    resp = requests.post(
        f"{HATIDATA_URL}/v1/query/arrow",
        json={"sql": sql},
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=timeout,
    )
    resp.raise_for_status()
    # The response body is an Arrow IPC stream; read_all() materializes
    # every record batch into one Table.
    return ipc.open_stream(resp.content).read_all()

def polars_query(sql: str) -> pl.DataFrame:
    """Run `sql` and wrap the Arrow result in a Polars DataFrame.

    pl.from_arrow reuses the Arrow buffers where possible, so no extra
    serialization step occurs (zero-copy).
    """
    arrow_table = arrow_query(sql)
    return pl.from_arrow(arrow_table)

# Example: pull a date-bounded slice of `orders` straight into Polars.
# Load 1M rows in under 3 seconds
df = polars_query("""
SELECT order_id, customer_id, product_id, quantity, total, order_date
FROM orders
WHERE order_date >= '2025-01-01'
""")
# estimated_size('mb') reports the DataFrame's in-memory footprint.
print(f"Loaded {len(df):,} rows, {df.estimated_size('mb'):.1f} MB")

Recipe 2: Incremental Batch Processing

Process large datasets in batches without loading everything into memory:

def process_in_batches(sql: str, batch_size: int = 100_000):
    """Yield query results as Polars DataFrames, one batch at a time.

    Appends LIMIT/OFFSET to `sql` so that only a single batch is resident
    in memory at once.

    NOTE(review): without a deterministic ORDER BY inside `sql`,
    LIMIT/OFFSET paging may skip or repeat rows across batches — confirm
    the server guarantees a stable scan order.

    Args:
        sql: Base SELECT statement (no LIMIT/OFFSET of its own).
        batch_size: Maximum rows fetched per round trip.

    Yields:
        pl.DataFrame: One DataFrame per fetched batch.
    """
    offset = 0
    total_processed = 0

    while True:
        page = arrow_query(f"{sql} LIMIT {batch_size} OFFSET {offset}")
        rows = page.num_rows
        if rows == 0:
            break

        yield pl.from_arrow(page)

        total_processed += rows
        offset += batch_size
        # A short page means the source is exhausted; skip the extra round trip.
        if rows < batch_size:
            break

    print(f"Processed {total_processed:,} rows total")

# Example: streaming aggregate — average order value without ever holding
# the full `orders` table in memory.
running_total = 0
running_count = 0

for chunk in process_in_batches("SELECT total FROM orders"):
    running_total += chunk["total"].sum()
    running_count += len(chunk)

print(f"Average order value: ${running_total / running_count:.2f}")

Recipe 3: ML Feature Engineering Pipeline

Build feature matrices for machine learning models:

import numpy as np

def build_customer_features() -> pl.DataFrame:
    """Build a per-customer feature matrix from HatiData.

    Aggregates each customer's order history in SQL, then derives model
    features (tenure, recency, revenue per order, tier encoding) in Polars.

    Returns:
        pl.DataFrame: One row per customer with raw aggregates plus the
        engineered feature columns.
    """
    # Local import: only the recency feature needs a "today" reference date.
    from datetime import date

    raw = polars_query("""
        SELECT
            c.customer_id,
            c.tier,
            c.created_at AS signup_date,
            COUNT(o.order_id) AS order_count,
            COALESCE(SUM(o.total), 0) AS total_spent,
            COALESCE(AVG(o.total), 0) AS avg_order_value,
            MAX(o.order_date) AS last_order_date,
            MIN(o.order_date) AS first_order_date,
            COUNT(DISTINCT o.product_id) AS unique_products
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
        GROUP BY c.customer_id, c.tier, c.created_at
    """)

    # Feature engineering in Polars
    features = raw.with_columns(
        # Tenure: days between first and last order (0 when there are no orders).
        (pl.col("last_order_date") - pl.col("first_order_date"))
        .dt.total_days()
        .fill_null(0)
        .alias("customer_tenure_days"),

        # Recency: days from the last order to today.
        # BUG FIX: the original subtracted a *null* date literal
        # (pl.Series([None]).cast(pl.Date).item() evaluates to None), so the
        # difference was always null and fill_null(999) made this feature a
        # constant 999 for every customer.
        (pl.lit(date.today()) - pl.col("last_order_date"))
        .dt.total_days()
        .fill_null(999)
        .alias("days_since_last_order"),

        # Revenue per order; clip avoids division by zero for customers
        # with no orders (order_count == 0).
        (pl.col("total_spent") / pl.col("order_count").clip(lower_bound=1))
        .alias("revenue_per_order"),

        # Ordinal tier encoding; unknown tiers fall back to 0 ("free").
        pl.col("tier")
        .map_elements(lambda t: {"free": 0, "growth": 1, "enterprise": 2}.get(t, 0), return_dtype=pl.Int32)
        .alias("tier_encoded"),
    )

    return features

# Build the feature matrix and inspect its shape and first rows.
features = build_customer_features()
print(f"Feature matrix: {features.shape}")
print(features.head(5))

# Convert to NumPy for scikit-learn
# Only numeric feature columns enter the model matrix; identifiers and raw
# date columns are excluded.
numeric_cols = [
"order_count", "total_spent", "avg_order_value",
"unique_products", "customer_tenure_days",
"days_since_last_order", "revenue_per_order", "tier_encoded",
]
X = features.select(numeric_cols).to_numpy()
print(f"NumPy shape: {X.shape}")

Recipe 4: Time-Series Analysis

Use Arrow queries for efficient time-series data processing:

def load_time_series(
    table: str,
    timestamp_col: str,
    value_col: str,
    start_date: str,
    end_date: str,
    interval: str = "1 hour",
) -> pl.DataFrame:
    """Load time-series data bucketed by `interval` with per-bucket stats.

    Computes count, mean, min, max, and approximate p95 of `value_col`
    for each DATE_TRUNC bucket between `start_date` and `end_date`.

    NOTE(review): all arguments are interpolated directly into the SQL
    string — pass only trusted identifiers/literals; this is not
    injection-safe for untrusted input.
    """
    bucketed_sql = f"""
SELECT
DATE_TRUNC('{interval}', {timestamp_col}) AS bucket,
COUNT(*) AS event_count,
AVG({value_col}) AS avg_value,
MIN({value_col}) AS min_value,
MAX({value_col}) AS max_value,
APPROX_QUANTILE({value_col}, 0.95) AS p95_value
FROM {table}
WHERE {timestamp_col} BETWEEN '{start_date}' AND '{end_date}'
GROUP BY bucket
ORDER BY bucket
"""
    return polars_query(bucketed_sql)

# Load daily order totals for November 2025.
# (interval="1 day" buckets per day; use "1 hour" for hourly buckets.)
ts = load_time_series(
table="orders",
timestamp_col="order_date",
value_col="total",
start_date="2025-11-01",
end_date="2025-12-01",
interval="1 day",
)

print(ts)

Recipe 5: Memory Search Results as DataFrames

Combine semantic search with Arrow for structured analysis of memories:

def search_memories_as_df(query: str, agent_id: str, top_k: int = 100) -> pl.DataFrame:
    """Semantic-search agent memories and return matches as a Polars DataFrame.

    Args:
        query: Natural-language search text (single quotes are SQL-escaped).
        agent_id: Agent whose memories are searched (escaped likewise).
        top_k: Maximum number of rows to return.

    Returns:
        pl.DataFrame of matching memories, ordered by descending relevance.
    """
    # Escape embedded single quotes for SQL string literals.
    # FIX: the original escaped only `query` and interpolated `agent_id`
    # raw, leaving an injection hole; escape both consistently. `top_k`
    # is coerced to int so a non-numeric value cannot reach the SQL.
    query_escaped = query.replace("'", "''")
    agent_escaped = agent_id.replace("'", "''")
    return polars_query(f"""
SELECT
memory_id,
agent_id,
content,
metadata,
created_at,
semantic_rank(content, '{query_escaped}') AS relevance
FROM _hatidata_agent_memory
WHERE agent_id = '{agent_escaped}'
AND semantic_match(content, '{query_escaped}', 0.6)
ORDER BY relevance DESC
LIMIT {int(top_k)}
""")

# Search and analyze
memories = search_memories_as_df("customer billing issues", "support-agent")

# Group by date: count matches and average relevance per calendar day.
# NOTE(review): pl.count() is deprecated in recent Polars releases in
# favor of pl.len() — confirm against the pinned Polars version.
daily = (
memories
.with_columns(pl.col("created_at").dt.date().alias("date"))
.group_by("date")
.agg(
pl.count().alias("memory_count"),
pl.col("relevance").mean().alias("avg_relevance"),
)
.sort("date")
)
print(daily)

Recipe 6: Export to Parquet

Export query results to Parquet files for data lake integration:

import pyarrow.parquet as pq

def export_to_parquet(sql: str, output_path: str, compression: str = "snappy"):
    """Run `sql` and write the full result to a single Parquet file.

    Args:
        sql: Query to execute.
        output_path: Destination .parquet file path.
        compression: Parquet codec name (e.g. "snappy", "zstd").
    """
    table = arrow_query(sql)
    # 100k-row groups balance scan granularity against per-group overhead.
    pq.write_table(table, output_path, compression=compression, row_group_size=100_000)
    print(f"Exported {table.num_rows:,} rows to {output_path}")

# Export monthly data
# (uses the default "snappy" compression; pass compression="zstd" for
#  smaller files)
export_to_parquet(
sql="SELECT * FROM orders WHERE order_date >= '2025-12-01'",
output_path="/tmp/orders_dec_2025.parquet",
)

# Export with partitioning
def export_partitioned(sql: str, output_dir: str, partition_col: str):
    """Export query results as a Hive-partitioned Parquet dataset.

    Writes one subdirectory per distinct value of `partition_col`
    (e.g. month=2025-12-01/) under `output_dir`.
    """
    result = arrow_query(sql)
    pq.write_to_dataset(result, output_dir, partition_cols=[partition_col])

# Partition by month: the SQL derives a `month` column for
# write_to_dataset to split the output on.
export_partitioned(
sql="SELECT *, DATE_TRUNC('month', order_date) AS month FROM orders",
output_dir="/tmp/orders_partitioned/",
partition_col="month",
)

Performance Tips

| Tip | Impact |
| --- | --- |
| Use LIMIT in Arrow queries | Reduces transfer size and memory usage |
| Prefer `SELECT col1, col2` over `SELECT *` | Arrow serializes only requested columns |
| Use `polars_query()` over `pandas_query()` | Polars is 2-5x faster for analytics |
| Pre-aggregate in SQL | Let the query engine do the heavy lifting |
| Use `compression='zstd'` for Parquet export | 20-30% smaller files vs snappy |
| Batch large exports | Avoid OOM with LIMIT/OFFSET batching |

Stay in the loop

Product updates, engineering deep-dives, and agent-native insights. No spam.