# Arrow & Polars Integration
HatiData exposes an Arrow-native query endpoint that returns results in the Apache Arrow IPC stream format. This enables zero-copy loading into Polars, Pandas, and any other Arrow-compatible library, with no intermediate serialization. For analytical workloads and ML feature pipelines, this path is significantly faster than fetching the same results over the row-based Postgres wire protocol.
## Installation

```bash
pip install hatidata pyarrow polars pandas
```

Requirements: Python 3.10+ and a running HatiData proxy with the Arrow endpoint enabled.

```bash
export HATIDATA_API_KEY="hd_live_your_api_key"
export HATIDATA_HOST="localhost"
export HATIDATA_PORT=5439
```
## Arrow Query Endpoint
The HatiData proxy exposes an HTTP endpoint at /v1/query/arrow that accepts SQL and returns results as an Arrow IPC byte stream. The underlying query engine produces Arrow record batches natively, so no row-to-columnar conversion is needed.
```python
import os

import pyarrow as pa
import pyarrow.ipc as ipc
import requests

HATIDATA_URL = f"http://{os.environ['HATIDATA_HOST']}:{os.environ['HATIDATA_PORT']}"
API_KEY = os.environ["HATIDATA_API_KEY"]


def query_arrow(sql: str) -> pa.Table:
    """Execute a SQL query and return an Arrow Table."""
    response = requests.post(
        f"{HATIDATA_URL}/v1/query/arrow",
        json={"sql": sql},
        headers={"Authorization": f"Bearer {API_KEY}"},
    )
    response.raise_for_status()
    reader = ipc.open_stream(response.content)
    return reader.read_all()


# Execute a query
table = query_arrow("SELECT * FROM orders WHERE status = 'completed' LIMIT 10000")
print(f"Rows: {table.num_rows}, Columns: {table.num_columns}")
print(table.schema)
```
## Loading into Polars
Polars can read directly from Arrow tables with zero data copying:
```python
import polars as pl

# Zero-copy conversion from Arrow to Polars
arrow_table = query_arrow("""
    SELECT
        customer_id,
        order_date,
        total,
        product_category
    FROM orders
    WHERE order_date >= '2025-01-01'
""")
df = pl.from_arrow(arrow_table)

# Now use Polars for fast local analytics
summary = (
    df.group_by("product_category")
    .agg(
        pl.col("total").sum().alias("total_revenue"),
        pl.col("total").mean().alias("avg_order_value"),
        pl.col("customer_id").n_unique().alias("unique_customers"),
    )
    .sort("total_revenue", descending=True)
)
print(summary)
```
## Lazy Frame Pipeline
For large datasets, use Polars lazy frames to defer computation:
```python
# Load a large dataset
arrow_table = query_arrow("SELECT * FROM events WHERE event_date >= '2025-01-01'")

# Build a lazy pipeline
result = (
    pl.from_arrow(arrow_table)
    .lazy()
    .filter(pl.col("event_type") == "purchase")
    .group_by("user_id")
    .agg(
        pl.col("amount").sum().alias("total_spent"),
        pl.col("event_id").count().alias("purchase_count"),
    )
    .filter(pl.col("total_spent") > 100)
    .sort("total_spent", descending=True)
    .collect()
)
```
## Loading into Pandas
Convert Arrow tables to Pandas DataFrames using PyArrow's built-in conversion:
```python
import pandas as pd

arrow_table = query_arrow("SELECT * FROM customers LIMIT 5000")

# Convert to Pandas (uses Arrow-backed columns for efficiency)
df = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)

# Standard Pandas operations
print(df.describe())
print(df.groupby("tier")["revenue"].mean())
```
Using `types_mapper=pd.ArrowDtype` keeps the Arrow memory layout under the hood, avoiding a full data copy. This requires Pandas 2.0+.
## Batch Processing
For datasets too large to fit in memory, process Arrow record batches incrementally:
```python
import pyarrow.ipc as ipc

response = requests.post(
    f"{HATIDATA_URL}/v1/query/arrow",
    json={"sql": "SELECT * FROM large_events_table"},
    headers={"Authorization": f"Bearer {API_KEY}"},
    stream=True,
)
response.raise_for_status()

# Read from the raw response stream so batches are consumed as they
# arrive; response.content would buffer the entire payload in memory,
# defeating the purpose of stream=True.
reader = ipc.open_stream(response.raw)

total_rows = 0
for i, batch in enumerate(reader):
    # Process each RecordBatch independently
    df = pl.from_arrow(batch)
    total_rows += len(df)
    # Example: write each batch to Parquet
    df.write_parquet(f"output/batch_{i:05d}.parquet")

print(f"Processed {total_rows} rows in batches")
```
## ML Feature Pipeline
Use Arrow queries to build feature pipelines for machine learning:
```python
import polars as pl

# Fetch features from HatiData
features_table = query_arrow("""
    SELECT
        customer_id,
        COUNT(*) AS order_count,
        SUM(total) AS total_spent,
        AVG(total) AS avg_order_value,
        MAX(order_date) AS last_order_date,
        MIN(order_date) AS first_order_date
    FROM orders
    GROUP BY customer_id
""")
df = pl.from_arrow(features_table)

# Engineer features
features = df.with_columns(
    (pl.col("last_order_date") - pl.col("first_order_date"))
    .dt.total_days()
    .alias("customer_tenure_days"),
    (pl.col("total_spent") / pl.col("order_count")).alias("revenue_per_order"),
)

# Convert to NumPy for scikit-learn
X = features.select(
    "order_count", "total_spent", "avg_order_value",
    "customer_tenure_days", "revenue_per_order",
).to_numpy()
print(f"Feature matrix shape: {X.shape}")
```
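Most scikit-learn estimators work best on standardized inputs. A minimal NumPy sketch of the scaling step (equivalent to what `StandardScaler` computes), on a hypothetical feature matrix:

```python
import numpy as np

# Hypothetical feature matrix: 4 customers x 2 features
X = np.array([
    [1.0, 10.0],
    [2.0, 20.0],
    [3.0, 30.0],
    [4.0, 40.0],
])

# Column-wise standardization: zero mean, unit variance per feature
mean = X.mean(axis=0)
std = X.std(axis=0)
X_scaled = (X - mean) / std

print(X_scaled.mean(axis=0))  # ~[0, 0]
print(X_scaled.std(axis=0))   # [1, 1]
```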
## Performance Comparison
| Method | 100K rows | 1M rows | Format |
|---|---|---|---|
| Postgres wire (psycopg2) | ~2.1s | ~18s | Row-based |
| Arrow endpoint + PyArrow | ~0.3s | ~2.4s | Columnar |
| Arrow endpoint + Polars | ~0.3s | ~2.2s | Columnar (zero-copy) |
The Arrow endpoint is roughly 5-8x faster for analytical workloads because the query engine produces Arrow record batches natively, so no row-to-columnar conversion is needed on either side.
## Related Concepts
- Arrow Query API -- Full API reference for the Arrow endpoint
- Arrow Query Recipes -- Advanced Arrow usage patterns
- Query Pipeline -- How queries flow through the proxy
- Cost Model -- Understanding query costs
- Postgres Drivers -- Row-based alternative