
Arrow & Polars Integration

HatiData exposes an Arrow-native query endpoint that returns results in the Apache Arrow IPC stream format. This enables zero-copy loading into Polars, Pandas, and any other Arrow-compatible library without intermediate serialization. For analytical workloads and ML feature pipelines, this path is significantly faster than fetching rows over the Postgres wire protocol.

Installation

pip install hatidata pyarrow polars pandas

Requirements: Python 3.10+, a running HatiData proxy with the Arrow endpoint enabled.

export HATIDATA_API_KEY="hd_live_your_api_key"
export HATIDATA_HOST="localhost"
export HATIDATA_PORT=5439

Arrow Query Endpoint

The HatiData proxy exposes an HTTP endpoint at /v1/query/arrow that accepts SQL and returns results as an Arrow IPC byte stream. The underlying query engine produces Arrow record batches natively, so no row-to-columnar conversion is needed.

import pyarrow as pa
import pyarrow.ipc as ipc
import requests
import os

HATIDATA_URL = f"http://{os.environ['HATIDATA_HOST']}:{os.environ['HATIDATA_PORT']}"
API_KEY = os.environ["HATIDATA_API_KEY"]

def query_arrow(sql: str) -> pa.Table:
    """Execute a SQL query and return an Arrow Table."""
    response = requests.post(
        f"{HATIDATA_URL}/v1/query/arrow",
        json={"sql": sql},
        headers={"Authorization": f"Bearer {API_KEY}"},
    )
    response.raise_for_status()

    reader = ipc.open_stream(response.content)
    return reader.read_all()

# Execute a query
table = query_arrow("SELECT * FROM orders WHERE status = 'completed' LIMIT 10000")
print(f"Rows: {table.num_rows}, Columns: {table.num_columns}")
print(table.schema)

Loading into Polars

Polars can read directly from Arrow tables with zero data copying:

import polars as pl

# Zero-copy conversion from Arrow to Polars
arrow_table = query_arrow("""
    SELECT
        customer_id,
        order_date,
        total,
        product_category
    FROM orders
    WHERE order_date >= '2025-01-01'
""")

df = pl.from_arrow(arrow_table)

# Now use Polars for fast local analytics
summary = (
    df.group_by("product_category")
    .agg(
        pl.col("total").sum().alias("total_revenue"),
        pl.col("total").mean().alias("avg_order_value"),
        pl.col("customer_id").n_unique().alias("unique_customers"),
    )
    .sort("total_revenue", descending=True)
)

print(summary)

Lazy Frame Pipeline

For large datasets, use Polars lazy frames to defer computation:

# Load a large dataset
arrow_table = query_arrow("SELECT * FROM events WHERE event_date >= '2025-01-01'")

# Build a lazy pipeline
result = (
    pl.from_arrow(arrow_table)
    .lazy()
    .filter(pl.col("event_type") == "purchase")
    .group_by("user_id")
    .agg(
        pl.col("amount").sum().alias("total_spent"),
        pl.col("event_id").count().alias("purchase_count"),
    )
    .filter(pl.col("total_spent") > 100)
    .sort("total_spent", descending=True)
    .collect()
)
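
To confirm that Polars pushes the filter down before execution, you can inspect the optimized plan with explain() (a quick sketch reusing the pipeline above):

lazy = (
    pl.from_arrow(arrow_table)
    .lazy()
    .filter(pl.col("event_type") == "purchase")
)

# Print the optimized logical plan without executing it
print(lazy.explain())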

Loading into Pandas

Convert Arrow tables to Pandas DataFrames using PyArrow's built-in conversion:

import pandas as pd

arrow_table = query_arrow("SELECT * FROM customers LIMIT 5000")

# Convert to Pandas (uses Arrow-backed columns for efficiency)
df = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)

# Standard Pandas operations
print(df.describe())
print(df.groupby("tier")["revenue"].mean())
Note: Using types_mapper=pd.ArrowDtype keeps the Arrow memory layout under the hood, avoiding a full data copy. This requires Pandas 2.0+.
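
A quick way to see the difference (a sketch reusing the customers query above; the printed dtypes assume revenue is a floating-point column):

df_arrow = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)
df_numpy = arrow_table.to_pandas()  # default: copies into NumPy-backed columns

print(df_arrow["revenue"].dtype)  # e.g. double[pyarrow] (Arrow-backed, no copy)
print(df_numpy["revenue"].dtype)  # e.g. float64 (NumPy-backed)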


Batch Processing

For datasets too large to fit in memory, process Arrow record batches incrementally:

import pyarrow.ipc as ipc

response = requests.post(
    f"{HATIDATA_URL}/v1/query/arrow",
    json={"sql": "SELECT * FROM large_events_table"},
    headers={"Authorization": f"Bearer {API_KEY}"},
    stream=True,
)
response.raise_for_status()

# Read from the raw socket so record batches are consumed incrementally;
# response.content would buffer the entire result in memory first
reader = ipc.open_stream(response.raw)
total_rows = 0

for i, batch in enumerate(reader):
    # Process each RecordBatch independently
    df = pl.from_arrow(batch)
    total_rows += len(df)

    # Example: write each batch to Parquet
    df.write_parquet(f"output/batch_{i}.parquet")

print(f"Processed {total_rows} rows in batches")

ML Feature Pipeline

Use Arrow queries to build feature pipelines for machine learning:

import polars as pl

# Fetch features from HatiData
features_table = query_arrow("""
    SELECT
        customer_id,
        COUNT(*) AS order_count,
        SUM(total) AS total_spent,
        AVG(total) AS avg_order_value,
        MAX(order_date) AS last_order_date,
        MIN(order_date) AS first_order_date
    FROM orders
    GROUP BY customer_id
""")

df = pl.from_arrow(features_table)

# Engineer features
features = df.with_columns(
    (pl.col("last_order_date") - pl.col("first_order_date"))
    .dt.total_days()
    .alias("customer_tenure_days"),
    (pl.col("total_spent") / pl.col("order_count")).alias("revenue_per_order"),
)

# Convert to NumPy for scikit-learn
X = features.select(
    "order_count", "total_spent", "avg_order_value",
    "customer_tenure_days", "revenue_per_order",
).to_numpy()

print(f"Feature matrix shape: {X.shape}")

Performance Comparison

| Method                    | 100K rows | 1M rows | Format                |
|---------------------------|-----------|---------|-----------------------|
| Postgres wire (psycopg2)  | ~2.1s     | ~18s    | Row-based             |
| Arrow endpoint + PyArrow  | ~0.3s     | ~2.4s   | Columnar              |
| Arrow endpoint + Polars   | ~0.3s     | ~2.2s   | Columnar (zero-copy)  |

The Arrow endpoint is 5-8x faster for analytical workloads because the query engine produces Arrow record batches natively and no row-to-column conversion is needed on either side.
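
To get a rough sense of these numbers on your own data, you can time the Arrow path directly (results will vary with network, schema, and hardware):

import time

start = time.perf_counter()
table = query_arrow("SELECT * FROM orders LIMIT 100000")
elapsed = time.perf_counter() - start
print(f"Fetched {table.num_rows:,} rows in {elapsed:.2f}s")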

