# Memory Patterns

Practical patterns for building agents that remember, from RAG pipelines to entity stores to cross-agent shared memory.
## Pattern 1: RAG with Hybrid Search

Combine semantic similarity with structured SQL filters to retrieve only the most relevant memories for a given agent context.

**When to use:** Question-answering agents that need to ground responses in stored knowledge while also filtering by metadata such as owner, date, tag, or project.
```python
import hatidata

client = hatidata.Client(api_key="hd_...")

# Store a memory with metadata
client.memory.store(
    agent_id="research-agent",
    content="Quarterly revenue grew 18% YoY driven by enterprise tier expansion.",
    metadata={
        "source": "q4-report",
        "department": "finance",
        "date": "2025-12-31",
        "importance": 0.9,
    },
)

# Hybrid search: semantic match + structured filter
results = client.memory.search(
    agent_id="research-agent",
    query="What drove revenue growth last quarter?",
    filters={
        "department": "finance",
        "date": {"gte": "2025-10-01"},
    },
    top_k=5,
    min_score=0.75,
)

for mem in results:
    print(f"[{mem.score:.2f}] {mem.content}")
```
Configuration tips:

| Parameter | Recommended | Notes |
|---|---|---|
| `top_k` | 5–10 | More results increase latency |
| `min_score` | 0.70–0.80 | Lower = broader recall; higher = tighter precision |
| `filters` | Always include | Scope to the agent or org to prevent cross-contamination |
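Once results come back, they still have to be assembled into the prompt. Here is a minimal sketch of that step; `build_grounded_prompt` is a hypothetical helper (not part of the SDK), and results are modeled as `(score, content)` tuples mirroring the fields used above:

```python
# Hypothetical helper: assemble retrieved memories into a grounded prompt.
def build_grounded_prompt(question: str, results: list[tuple[float, str]]) -> str:
    """Render memories as a numbered context block above the question."""
    context = "\n".join(
        f"{i + 1}. {content}" for i, (_, content) in enumerate(results)
    )
    return (
        "Answer using only the context below.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}"
    )

prompt = build_grounded_prompt(
    "What drove revenue growth last quarter?",
    [(0.91, "Quarterly revenue grew 18% YoY driven by enterprise tier expansion.")],
)
```

Numbering the context lines makes it easy to ask the model to cite which memory supported its answer.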
## Pattern 2: Conversation Memory

Maintain per-session context that auto-summarizes when the session grows beyond a token budget.

**When to use:** Chatbots and interactive agents where the full conversation history is too large to pass in every prompt.
```python
from datetime import datetime

session_id = "session-abc123"

def on_user_message(user_input: str, agent_response: str):
    client.memory.store(
        agent_id="chat-agent",
        content=f"User: {user_input}\nAssistant: {agent_response}",
        metadata={
            "session_id": session_id,
            "turn_type": "dialog",
            "timestamp": datetime.utcnow().isoformat(),
        },
    )

def get_conversation_context(query: str) -> list[str]:
    results = client.memory.search(
        agent_id="chat-agent",
        query=query,
        filters={"session_id": session_id},
        top_k=8,
    )
    return [r.content for r in results]

def maybe_summarize(turn_count: int, summarize_with_llm):
    """Auto-summarize every 20 turns to keep context size manageable."""
    if turn_count % 20 == 0:
        history = client.memory.list(
            agent_id="chat-agent",
            filters={"session_id": session_id, "turn_type": "dialog"},
        )
        summary = summarize_with_llm([h.content for h in history])
        client.memory.store(
            agent_id="chat-agent",
            content=summary,
            metadata={
                "session_id": session_id,
                "turn_type": "summary",
                "importance": 1.0,
            },
        )
```
Configuration tips:
| Concern | Recommendation |
|---|---|
| Summarization trigger | Every 20 turns or when estimated token count exceeds 4,000 |
| Summary importance | Set to 1.0 so summaries always outrank individual turns |
| Session scoping | Always filter by session_id to avoid cross-session leakage |
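The token-budget half of the trigger can be checked without calling a tokenizer. A rough sketch, assuming the common ~4-characters-per-token heuristic (not the SDK's actual tokenizer):

```python
# Illustrative trigger check: 4 chars per token is a rough heuristic.
def should_summarize(turns: list[str], turn_budget: int = 20, token_budget: int = 4000) -> bool:
    """Summarize when either the turn count or the estimated token count is exceeded."""
    estimated_tokens = sum(len(t) for t in turns) // 4
    return len(turns) >= turn_budget or estimated_tokens > token_budget

print(should_summarize(["hi"] * 5))        # few short turns: no summary yet
print(should_summarize(["x" * 900] * 19))  # under 20 turns, but over the token budget
```

Checking both conditions means long-winded sessions summarize early instead of waiting for turn 20.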
## Pattern 3: Entity Memory

Maintain a per-entity (customer, project, product) fact store that agents can read and update independently.

**When to use:** CRM agents, project management agents, or any agent that accumulates knowledge about specific real-world entities over time.
```python
def upsert_entity_fact(entity_id: str, fact: str, source: str):
    """Record a fact about an entity. Facts accumulate on repeated calls;
    key on a stable metadata field if you need true overwrite semantics."""
    client.memory.store(
        agent_id="crm-agent",
        content=fact,
        metadata={
            "entity_id": entity_id,
            "entity_type": "customer",
            "source": source,
            "updated_at": datetime.utcnow().isoformat(),
        },
    )

def get_entity_profile(entity_id: str) -> dict:
    """Retrieve all known facts about an entity."""
    results = client.memory.search(
        agent_id="crm-agent",
        query=f"facts about entity {entity_id}",
        filters={"entity_id": entity_id},
        top_k=20,
        min_score=0.0,  # Retrieve all records, not just strong semantic matches
    )
    return {
        "entity_id": entity_id,
        "facts": [r.content for r in results],
    }

# Usage
upsert_entity_fact("cust-001", "Prefers email over phone for communication.", "support-ticket-442")
upsert_entity_fact("cust-001", "On the Growth plan as of 2025-11-01.", "billing-event")
profile = get_entity_profile("cust-001")
```
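A profile dict is most useful once rendered into prompt context. A minimal sketch; `format_entity_context` is a hypothetical helper operating on the shape returned by `get_entity_profile` above:

```python
# Hypothetical helper: render an entity profile as a prompt context block.
def format_entity_context(profile: dict) -> str:
    """One header line plus one bullet per known fact."""
    lines = [f"Known facts about {profile['entity_id']}:"]
    lines += [f"- {fact}" for fact in profile["facts"]]
    return "\n".join(lines)

context = format_entity_context({
    "entity_id": "cust-001",
    "facts": [
        "Prefers email over phone for communication.",
        "On the Growth plan as of 2025-11-01.",
    ],
})
```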
## Pattern 4: Temporal Decay

Weight memories by recency so recently accessed facts rank higher than stale ones, even when semantic similarity scores are equal.

**When to use:** Agents operating in fast-changing domains — market data, support queues, operational alerts — where stale knowledge is worse than no knowledge.
```python
import math
from datetime import datetime, timezone

DECAY_HALF_LIFE_DAYS = 30  # Importance halves every 30 days

def decayed_importance(base_importance: float, last_accessed_iso: str) -> float:
    last = datetime.fromisoformat(last_accessed_iso).replace(tzinfo=timezone.utc)
    days_old = (datetime.now(timezone.utc) - last).days
    decay = math.exp(-math.log(2) * days_old / DECAY_HALF_LIFE_DAYS)
    return round(base_importance * decay, 4)

# Store with initial importance and access timestamp
client.memory.store(
    agent_id="market-agent",
    content="NVDA Q3 EPS beat consensus by 12%.",
    metadata={
        "importance": 0.85,
        "last_accessed": datetime.utcnow().isoformat(),
        "domain": "earnings",
    },
)

def get_relevant_memories(query: str, min_decayed_importance: float = 0.2):
    results = client.memory.search(agent_id="market-agent", query=query, top_k=20)
    scored = []
    for r in results:
        di = decayed_importance(r.metadata["importance"], r.metadata["last_accessed"])
        if di >= min_decayed_importance:
            scored.append((di * r.score, r))
    scored.sort(key=lambda x: x[0], reverse=True)
    return [r for _, r in scored[:5]]
```
| Half-life | Domain fit |
|---|---|
| 7 days | News, alerts, operational events |
| 30 days | Market data, project updates |
| 180 days | Customer profiles, product knowledge |
| No decay | Compliance records, audit trails |
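The half-life values above plug straight into the decay formula. A quick self-contained check that importance halves at one half-life and quarters at two:

```python
import math

def decay_factor(days_old: float, half_life_days: float) -> float:
    """Exponential decay: importance halves every half_life_days."""
    return math.exp(-math.log(2) * days_old / half_life_days)

print(round(decay_factor(30, 30), 3))  # one 30-day half-life: ~0.5
print(round(decay_factor(60, 30), 3))  # two half-lives: ~0.25
print(round(decay_factor(7, 7), 3))    # same shape for a 7-day news half-life
```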
## Pattern 5: Memory Cleanup

Evict stale, low-importance memories on a schedule to control storage costs.

**When to use:** Long-running production agents accumulating memory over weeks or months.
```python
# Option A: SDK cleanup helper
client.memory.cleanup(
    agent_id="research-agent",
    max_age_days=90,
    min_importance=0.3,
)

# Option B: Direct SQL via the HatiData query interface
CLEANUP_SQL = """
DELETE FROM _hatidata_memory
WHERE
    agent_id = 'research-agent'
    AND CAST(metadata->>'importance' AS FLOAT) < 0.3
    AND CAST(metadata->>'last_accessed' AS TIMESTAMP)
        < NOW() - INTERVAL '90 days';
"""
client.query(CLEANUP_SQL)
```
Run cleanup as a scheduled task (cron or background worker) rather than inline with agent requests to avoid latency spikes.
## Pattern 6: Cross-Agent Memory Sharing

Allow multiple agents in the same org to read from a shared memory namespace without exposing private per-agent memories.

**When to use:** Agent pipelines where a writer agent populates knowledge and reader agents consume it.
```python
SHARED_AGENT_ID = "shared-knowledge-base"

# writer_client and reader_client are separate SDK clients, each
# authenticated with its own agent's credentials.

# Writer agent publishes to the shared namespace
writer_client.memory.store(
    agent_id=SHARED_AGENT_ID,
    content="Product roadmap: HatiData v2 ships Q2 2026.",
    metadata={"visibility": "org-wide", "owner": "product-team"},
)

# Any reader agent in the same org can query shared memories
results = reader_client.memory.search(
    agent_id=SHARED_AGENT_ID,
    query="When does v2 ship?",
    top_k=5,
)
```
Configuration tips:

| Concern | Recommendation |
|---|---|
| Write access | Use an ABAC policy to restrict writes to `shared-*` agent IDs |
| Namespace conventions | Prefix shared agent IDs with `shared-` to distinguish them |
| Cleanup thresholds | Use a higher minimum importance (e.g., 0.5) on shared stores |
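The naming convention can also be enforced client-side as a cheap guard before any write. A hypothetical sketch (the real enforcement should live in the server-side ABAC policy, as the table recommends):

```python
# Hypothetical client-side guard for the shared- naming convention.
SHARED_PREFIX = "shared-"

def assert_shared_namespace(agent_id: str) -> None:
    """Raise if a write is about to target a non-shared namespace."""
    if not agent_id.startswith(SHARED_PREFIX):
        raise ValueError(f"{agent_id!r} is not a shared namespace")

assert_shared_namespace("shared-knowledge-base")  # passes silently
```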
## Pattern 7: Memory Hydration from External Systems

Seed an agent's memory from external data sources — CSV exports, CRM systems, ticketing platforms, chat archives — so agents start with context rather than building knowledge from scratch.

**When to use:** Bootstrapping a new agent with historical data, migrating from another memory system, or continuously syncing external knowledge into HatiData.
### CSV Import
```python
import csv
from datetime import datetime

import hatidata

client = hatidata.Client(api_key="hd_...")

def import_csv(file_path: str, agent_id: str, source_name: str, batch_size: int = 200):
    """Bulk import memories from a CSV file with columns: content, category, importance."""
    batch = []
    total = 0
    with open(file_path, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            batch.append({
                "agent_id": agent_id,
                "content": row["content"],
                "metadata": {
                    "source": source_name,
                    "category": row.get("category", "imported"),
                    "importance": float(row.get("importance") or 0.5),
                    "imported_at": datetime.utcnow().isoformat(),
                },
            })
            total += 1
            if len(batch) >= batch_size:
                client.memory.store_batch(batch)
                print(f"Imported {total} records...")
                batch = []
    # Flush remaining records
    if batch:
        client.memory.store_batch(batch)
    print(f"Import complete: {total} total records from {file_path}")

# Usage
import_csv("support_knowledge_base.csv", agent_id="support-agent", source_name="zendesk-export")
```
### API Import (Zendesk, Slack, etc.)
```python
import requests

def import_from_zendesk(
    zendesk_domain: str,
    zendesk_token: str,
    agent_id: str,
    batch_size: int = 100,
):
    """Import resolved Zendesk tickets as agent memories."""
    url = f"https://{zendesk_domain}.zendesk.com/api/v2/tickets.json"
    headers = {"Authorization": f"Bearer {zendesk_token}"}
    params = {"status": "solved", "sort_by": "updated_at", "sort_order": "desc"}
    batch = []
    page = 1
    while True:
        resp = requests.get(url, headers=headers, params={**params, "page": page})
        resp.raise_for_status()
        data = resp.json()
        for ticket in data["tickets"]:
            content = f"Ticket #{ticket['id']}: {ticket['subject']}\n{ticket['description']}"
            batch.append({
                "agent_id": agent_id,
                "content": content,
                "metadata": {
                    "source": "zendesk",
                    "ticket_id": str(ticket["id"]),
                    "category": ticket.get("type", "general"),
                    "importance": 0.7 if ticket.get("priority") in ("high", "urgent") else 0.4,
                    "original_date": ticket["updated_at"],
                },
            })
            if len(batch) >= batch_size:
                client.memory.store_batch(batch)
                batch = []
        if not data.get("next_page"):
            break
        page += 1
    if batch:
        client.memory.store_batch(batch)

def import_from_slack(
    slack_token: str,
    channel_id: str,
    agent_id: str,
    batch_size: int = 100,
):
    """Import Slack messages from a channel as agent memories."""
    url = "https://slack.com/api/conversations.history"
    headers = {"Authorization": f"Bearer {slack_token}"}
    batch = []
    cursor = None
    while True:
        params = {"channel": channel_id, "limit": 200}
        if cursor:
            params["cursor"] = cursor
        resp = requests.get(url, headers=headers, params=params)
        data = resp.json()
        for msg in data.get("messages", []):
            if msg.get("subtype"):  # Skip system messages
                continue
            batch.append({
                "agent_id": agent_id,
                "content": msg["text"],
                "metadata": {
                    "source": "slack",
                    "channel": channel_id,
                    "user": msg.get("user", "unknown"),
                    "importance": 0.3,
                    "original_date": msg["ts"],
                },
            })
            if len(batch) >= batch_size:
                client.memory.store_batch(batch)
                batch = []
        cursor = data.get("response_metadata", {}).get("next_cursor")
        if not cursor:
            break
    if batch:
        client.memory.store_batch(batch)
```
### Best Practices for Bulk Import
| Concern | Recommendation |
|---|---|
| Batch size | 100–500 records per `store_batch` call. Larger batches reduce HTTP overhead; smaller batches provide faster progress feedback. |
| Importance scoring | Assign importance based on source signal: high-priority tickets = 0.7–0.9, routine messages = 0.2–0.4, curated knowledge base articles = 0.8–1.0. |
| Deduplication | Include a stable `source_id` in metadata (e.g., ticket ID, message ID). Before importing, query existing memories to skip duplicates. |
| Rate limiting | Respect source API rate limits. Add a `time.sleep()` between pages if needed. HatiData itself handles concurrent writes gracefully. |
| Incremental sync | Store a high-water mark (last imported timestamp or ID) and use it on subsequent runs to import only new records. |
| Content quality | Filter out empty, trivial, or duplicate content before import. Short messages (< 20 characters) rarely add retrieval value. |
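The deduplication recommendation can be sketched as a pre-filter run on each batch before `store_batch`. The helper name is illustrative; `records` follows the batch shape used in the import functions above:

```python
# Illustrative pre-filter: keep the first record per stable source_id.
def dedupe_by_source_id(records: list[dict]) -> list[dict]:
    """Drop repeats by metadata source_id; pass through records without one."""
    seen: set[str] = set()
    out = []
    for rec in records:
        sid = rec.get("metadata", {}).get("source_id")
        if sid is not None:
            if sid in seen:
                continue
            seen.add(sid)
        out.append(rec)
    return out

batch = [
    {"content": "a", "metadata": {"source_id": "t-1"}},
    {"content": "a again", "metadata": {"source_id": "t-1"}},
    {"content": "b", "metadata": {"source_id": "t-2"}},
]
deduped = dedupe_by_source_id(batch)
print(len(deduped))  # 2
```

This only catches duplicates within a single run; cross-run deduplication still needs the query-before-import check from the table above.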
### Import Source Reference

| Source | Import Method | Recommended Batch Size | Notes |
|---|---|---|---|
| CSV / TSV files | `csv.DictReader` + `store_batch` | 200–500 | Fastest method for one-time imports |
| Zendesk | REST API (`/api/v2/tickets.json`) | 100 | Filter by `status=solved` for resolved knowledge |
| Slack | Web API (`conversations.history`) | 100 | Skip system messages; thread replies need separate calls |
| Notion | REST API (`/v1/databases/{id}/query`) | 50 | Pages can be large; use smaller batches |
| Confluence | REST API (`/wiki/rest/api/content`) | 50 | Extract the body from storage format and strip HTML tags |
| Google Docs | Drive API + Docs API | 20 | One doc per API call; batch the memory writes |
| Intercom | REST API (`/conversations`) | 100 | Similar to Zendesk; include conversation parts |
| Custom database | SQL export to CSV, then CSV import | 200–500 | Export first, then use the CSV path |