
Memory Patterns

Practical patterns for building agents that remember — from RAG pipelines to entity stores to cross-agent shared memory.


Pattern 1: Hybrid Semantic Retrieval

Combine semantic similarity with structured SQL filters to retrieve only the most relevant memories for a given agent context.

When to use: Question-answering agents that need to ground responses in stored knowledge while also filtering by metadata such as owner, date, tag, or project.

import hatidata

client = hatidata.Client(api_key="hd_...")

# Store a memory with metadata
client.memory.store(
    agent_id="research-agent",
    content="Quarterly revenue grew 18% YoY driven by enterprise tier expansion.",
    metadata={
        "source": "q4-report",
        "department": "finance",
        "date": "2025-12-31",
        "importance": 0.9,
    },
)

# Hybrid search: semantic match + structured filter
results = client.memory.search(
    agent_id="research-agent",
    query="What drove revenue growth last quarter?",
    filters={
        "department": "finance",
        "date": {"gte": "2025-10-01"},
    },
    top_k=5,
    min_score=0.75,
)

for mem in results:
    print(f"[{mem.score:.2f}] {mem.content}")

Configuration tips:

| Parameter | Recommended | Notes |
|---|---|---|
| top_k | 5–10 | More results increase latency |
| min_score | 0.70–0.80 | Lower = broader recall, higher = precision |
| filters | Always include | Scope to agent or org to prevent cross-contamination |
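
The interplay between these parameters can be sketched as client-side post-filtering over scored candidates. The helper below is illustrative only (not part of the SDK): `min_score` trades recall for precision, and `top_k` caps the result count and hence latency.

```python
def apply_retrieval_policy(candidates, top_k=5, min_score=0.75):
    """Rank scored (score, content) candidates, drop weak matches, cap the count.

    Mirrors what top_k/min_score do server-side.
    """
    kept = [c for c in candidates if c[0] >= min_score]
    kept.sort(key=lambda c: c[0], reverse=True)
    return kept[:top_k]

candidates = [
    (0.91, "enterprise tier expansion"),
    (0.72, "office relocation"),
    (0.80, "new pricing page"),
]
print(apply_retrieval_policy(candidates, top_k=2, min_score=0.75))
# -> [(0.91, 'enterprise tier expansion'), (0.80, 'new pricing page')]
```

Raising `min_score` to 0.85 here would leave only one result: broader recall and higher precision pull in opposite directions.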

Pattern 2: Conversation Memory

Maintain per-session context that auto-summarizes when the session grows beyond a token budget.

When to use: Chatbots and interactive agents where the full conversation history is too large to pass in every prompt.

from datetime import datetime

session_id = "session-abc123"

def on_user_message(user_input: str, agent_response: str):
    client.memory.store(
        agent_id="chat-agent",
        content=f"User: {user_input}\nAssistant: {agent_response}",
        metadata={
            "session_id": session_id,
            "turn_type": "dialog",
            "timestamp": datetime.utcnow().isoformat(),
        },
    )

def get_conversation_context(query: str) -> list[str]:
    results = client.memory.search(
        agent_id="chat-agent",
        query=query,
        filters={"session_id": session_id},
        top_k=8,
    )
    return [r.content for r in results]

def maybe_summarize(turn_count: int, summarize_with_llm):
    """Auto-summarize every 20 turns to keep context size manageable."""
    if turn_count % 20 == 0:
        history = client.memory.list(
            agent_id="chat-agent",
            filters={"session_id": session_id, "turn_type": "dialog"},
        )
        summary = summarize_with_llm([h.content for h in history])
        client.memory.store(
            agent_id="chat-agent",
            content=summary,
            metadata={
                "session_id": session_id,
                "turn_type": "summary",
                "importance": 1.0,
            },
        )

Configuration tips:

| Concern | Recommendation |
|---|---|
| Summarization trigger | Every 20 turns or when estimated token count exceeds 4,000 |
| Summary importance | Set to 1.0 so summaries always outrank individual turns |
| Session scoping | Always filter by session_id to avoid cross-session leakage |
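
The token-budget trigger can be sketched with a rough character-based estimate (the ~4 characters per token figure is a common English-text heuristic, not an exact count; `should_summarize` is a hypothetical helper):

```python
def estimate_tokens(text: str) -> int:
    """Rough token estimate: ~4 characters per token for English text."""
    return len(text) // 4

def should_summarize(turn_count: int, history: list[str],
                     turn_interval: int = 20, token_budget: int = 4000) -> bool:
    """Trigger summarization every N turns or when history exceeds the token budget."""
    if turn_count > 0 and turn_count % turn_interval == 0:
        return True
    return sum(estimate_tokens(h) for h in history) > token_budget

print(should_summarize(20, ["short turn"]))  # fires on the turn-count trigger
print(should_summarize(3, ["x" * 20000]))    # fires on the token-budget trigger
```

Using whichever trigger fires first keeps both long sessions and verbose short sessions under control.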

Pattern 3: Entity Memory

Maintain a per-entity (customer, project, product) fact store that agents can read and update independently.

When to use: CRM agents, project management agents, or any agent that accumulates knowledge about specific real-world entities over time.

def upsert_entity_fact(entity_id: str, fact: str, source: str):
    """Store or overwrite a fact about an entity."""
    client.memory.store(
        agent_id="crm-agent",
        content=fact,
        metadata={
            "entity_id": entity_id,
            "entity_type": "customer",
            "source": source,
            "updated_at": datetime.utcnow().isoformat(),
        },
    )

def get_entity_profile(entity_id: str) -> dict:
    """Retrieve all known facts about an entity."""
    results = client.memory.search(
        agent_id="crm-agent",
        query=f"facts about entity {entity_id}",
        filters={"entity_id": entity_id},
        top_k=20,
        min_score=0.0,  # Retrieve all records, not just semantic matches
    )
    return {
        "entity_id": entity_id,
        "facts": [r.content for r in results],
    }

# Usage
upsert_entity_fact("cust-001", "Prefers email over phone for communication.", "support-ticket-442")
upsert_entity_fact("cust-001", "On the Growth plan as of 2025-11-01.", "billing-event")
profile = get_entity_profile("cust-001")
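
If the same source writes repeatedly (e.g., a new billing event superseding an old one), the reading side may want only the newest fact per source. A minimal sketch, assuming each record carries the `source` and `updated_at` metadata from above (`latest_facts_by_source` is a hypothetical helper):

```python
def latest_facts_by_source(records: list[dict]) -> dict:
    """Collapse accumulated facts so each source contributes only its newest entry.

    Each record: {"content": str, "metadata": {"source": str, "updated_at": str}}.
    ISO-8601 timestamps sort lexicographically, so string comparison suffices.
    """
    latest: dict = {}
    for rec in records:
        src = rec["metadata"]["source"]
        if src not in latest or rec["metadata"]["updated_at"] > latest[src]["metadata"]["updated_at"]:
            latest[src] = rec
    return {src: rec["content"] for src, rec in latest.items()}

records = [
    {"content": "On the Starter plan.",
     "metadata": {"source": "billing-event", "updated_at": "2025-06-01T00:00:00"}},
    {"content": "On the Growth plan.",
     "metadata": {"source": "billing-event", "updated_at": "2025-11-01T00:00:00"}},
]
print(latest_facts_by_source(records))  # -> {'billing-event': 'On the Growth plan.'}
```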

Pattern 4: Temporal Decay

Weight memories by recency so recently accessed facts rank higher than stale ones, even when semantic similarity scores are equal.

When to use: Agents operating in fast-changing domains — market data, support queues, operational alerts — where stale knowledge is worse than no knowledge.

import math
from datetime import datetime, timezone

DECAY_HALF_LIFE_DAYS = 30  # Importance halves every 30 days

def decayed_importance(base_importance: float, last_accessed_iso: str) -> float:
    last = datetime.fromisoformat(last_accessed_iso).replace(tzinfo=timezone.utc)
    days_old = (datetime.now(timezone.utc) - last).days
    decay = math.exp(-0.693 * days_old / DECAY_HALF_LIFE_DAYS)  # 0.693 ≈ ln(2)
    return round(base_importance * decay, 4)

# Store with initial importance and access timestamp
client.memory.store(
    agent_id="market-agent",
    content="NVDA Q3 EPS beat consensus by 12%.",
    metadata={
        "importance": 0.85,
        "last_accessed": datetime.utcnow().isoformat(),
        "domain": "earnings",
    },
)

def get_relevant_memories(query: str, min_decayed_importance: float = 0.2):
    results = client.memory.search(agent_id="market-agent", query=query, top_k=20)
    scored = []
    for r in results:
        di = decayed_importance(r.metadata["importance"], r.metadata["last_accessed"])
        if di >= min_decayed_importance:
            scored.append((di * r.score, r))
    scored.sort(key=lambda x: x[0], reverse=True)
    return [r for _, r in scored[:5]]

| Half-life | Domain fit |
|---|---|
| 7 days | News, alerts, operational events |
| 30 days | Market data, project updates |
| 180 days | Customer profiles, product knowledge |
| No decay | Compliance records, audit trails |
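
To make the half-life choice concrete: `exp(-ln(2) · t / T)` is the same as `0.5^(t/T)`, so after one half-life exactly half the importance remains. A quick sketch of how a 30-day-old fact fares under each setting from the table:

```python
def decay_factor(days_old: float, half_life_days: float) -> float:
    """Fraction of original importance remaining after days_old days."""
    return 0.5 ** (days_old / half_life_days)

# A fact that is 30 days old under different half-lives:
for half_life in (7, 30, 180):
    print(f"{half_life:>3}-day half-life: {decay_factor(30, half_life):.3f}")
```

Under the 7-day setting the fact has lost roughly 95% of its weight, while under the 180-day setting it retains almost 90%: pick the half-life that matches how fast your domain actually goes stale.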

Pattern 5: Memory Cleanup

Evict stale and low-importance memories on a schedule to control storage costs.

When to use: Long-running production agents accumulating memory over weeks or months.

# Option A: SDK cleanup helper
client.memory.cleanup(
    agent_id="research-agent",
    max_age_days=90,
    min_importance=0.3,
)

# Option B: Direct SQL via HatiData query interface
CLEANUP_SQL = """
DELETE FROM _hatidata_memory
WHERE
    agent_id = 'research-agent'
    AND CAST(metadata->>'importance' AS FLOAT) < 0.3
    AND CAST(metadata->>'last_accessed' AS TIMESTAMP)
        < NOW() - INTERVAL '90 days';
"""
client.query(CLEANUP_SQL)

Run cleanup as a scheduled task (cron or background worker) rather than inline with agent requests to avoid latency spikes.
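
Note that the SQL deletes records that are both old and low-importance. If you prefilter candidates in application code before deleting, the same rule can be sketched as a predicate (`should_evict` is a hypothetical helper, not an SDK call):

```python
from datetime import datetime, timedelta, timezone

def should_evict(importance: float, last_accessed_iso: str,
                 max_age_days: int = 90, min_importance: float = 0.3) -> bool:
    """Evict only records that are BOTH older than max_age_days AND below min_importance."""
    last = datetime.fromisoformat(last_accessed_iso).replace(tzinfo=timezone.utc)
    age = datetime.now(timezone.utc) - last
    return importance < min_importance and age > timedelta(days=max_age_days)

old = (datetime.now(timezone.utc) - timedelta(days=120)).isoformat()
print(should_evict(0.1, old))  # old and unimportant -> evict
print(should_evict(0.9, old))  # old but important -> keep
```

The AND is deliberate: an important-but-old record (e.g., a summary with importance 1.0) survives cleanup.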


Pattern 6: Cross-Agent Memory Sharing

Allow multiple agents in the same org to read from a shared memory namespace without exposing private per-agent memories.

When to use: Agent pipelines where a writer agent populates knowledge and reader agents consume it.

SHARED_AGENT_ID = "shared-knowledge-base"

# Writer agent publishes to shared namespace
writer_client.memory.store(
    agent_id=SHARED_AGENT_ID,
    content="Product roadmap: HatiData v2 ships Q2 2026.",
    metadata={"visibility": "org-wide", "owner": "product-team"},
)

# Any reader agent in the same org can query shared memories
results = reader_client.memory.search(
    agent_id=SHARED_AGENT_ID,
    query="When does v2 ship?",
    top_k=5,
)

Configuration tips:

| Concern | Recommendation |
|---|---|
| Write access | Use ABAC policy to restrict writes to shared-* agent IDs |
| Namespace conventions | Prefix shared agent IDs with shared- to distinguish them |
| Cleanup thresholds | Use a higher minimum importance (e.g., 0.5) on shared stores |
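
The write-access rule can be sketched as a simple attribute check. This is an illustrative stand-in for a real ABAC policy, not HatiData's policy engine; `can_write` and the allow-list are assumptions:

```python
def can_write(agent_id: str, target_namespace: str, shared_writers: set[str]) -> bool:
    """Private namespaces accept only their owning agent;
    shared-* namespaces accept only explicitly allow-listed writers."""
    if target_namespace.startswith("shared-"):
        return agent_id in shared_writers
    return agent_id == target_namespace

writers = {"roadmap-writer"}
print(can_write("roadmap-writer", "shared-knowledge-base", writers))  # True
print(can_write("reader-agent", "shared-knowledge-base", writers))    # False
print(can_write("chat-agent", "chat-agent", writers))                 # True
```

Enforcing this at the policy layer (rather than by convention alone) is what keeps reader agents from polluting the shared store.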

Pattern 7: Memory Hydration from External Systems

Seed an agent's memory from external data sources — CSV exports, CRM systems, ticketing platforms, chat archives — so agents start with context rather than building knowledge from scratch.

When to use: Bootstrapping a new agent with historical data, migrating from another memory system, or continuously syncing external knowledge into HatiData.

CSV Import

import csv
from datetime import datetime

import hatidata

client = hatidata.Client(api_key="hd_...")

def import_csv(file_path: str, agent_id: str, source_name: str, batch_size: int = 200):
    """Bulk import memories from a CSV file with columns: content, category, importance."""
    batch = []
    total = 0

    with open(file_path, "r") as f:
        reader = csv.DictReader(f)
        for row in reader:
            batch.append({
                "agent_id": agent_id,
                "content": row["content"],
                "metadata": {
                    "source": source_name,
                    "category": row.get("category", "imported"),
                    "importance": float(row.get("importance") or 0.5),
                    "imported_at": datetime.utcnow().isoformat(),
                },
            })
            total += 1

            if len(batch) >= batch_size:
                client.memory.store_batch(batch)
                print(f"Imported {total} records...")
                batch = []

    # Flush remaining
    if batch:
        client.memory.store_batch(batch)
    print(f"Import complete: {total} total records from {file_path}")

# Usage
import_csv("support_knowledge_base.csv", agent_id="support-agent", source_name="zendesk-export")

API Import (Zendesk, Slack, etc.)

import requests

def import_from_zendesk(
    zendesk_domain: str,
    zendesk_token: str,
    agent_id: str,
    batch_size: int = 100,
):
    """Import resolved Zendesk tickets as agent memories."""
    url = f"https://{zendesk_domain}.zendesk.com/api/v2/tickets.json"
    headers = {"Authorization": f"Bearer {zendesk_token}"}
    params = {"status": "solved", "sort_by": "updated_at", "sort_order": "desc"}

    batch = []
    page = 1

    while True:
        resp = requests.get(url, headers=headers, params={**params, "page": page})
        resp.raise_for_status()
        data = resp.json()

        for ticket in data["tickets"]:
            content = f"Ticket #{ticket['id']}: {ticket['subject']}\n{ticket['description']}"
            batch.append({
                "agent_id": agent_id,
                "content": content,
                "metadata": {
                    "source": "zendesk",
                    "ticket_id": str(ticket["id"]),
                    "category": ticket.get("type", "general"),
                    "importance": 0.7 if ticket.get("priority") in ("high", "urgent") else 0.4,
                    "original_date": ticket["updated_at"],
                },
            })

            if len(batch) >= batch_size:
                client.memory.store_batch(batch)
                batch = []

        if not data.get("next_page"):
            break
        page += 1

    if batch:
        client.memory.store_batch(batch)

def import_from_slack(
    slack_token: str,
    channel_id: str,
    agent_id: str,
    batch_size: int = 100,
):
    """Import Slack messages from a channel as agent memories."""
    url = "https://slack.com/api/conversations.history"
    headers = {"Authorization": f"Bearer {slack_token}"}
    batch = []
    cursor = None

    while True:
        params = {"channel": channel_id, "limit": 200}
        if cursor:
            params["cursor"] = cursor

        resp = requests.get(url, headers=headers, params=params)
        resp.raise_for_status()
        data = resp.json()
        if not data.get("ok"):  # Slack returns HTTP 200 with ok=false on API errors
            raise RuntimeError(f"Slack API error: {data.get('error')}")

        for msg in data.get("messages", []):
            if msg.get("subtype"):  # Skip system messages
                continue
            batch.append({
                "agent_id": agent_id,
                "content": msg["text"],
                "metadata": {
                    "source": "slack",
                    "channel": channel_id,
                    "user": msg.get("user", "unknown"),
                    "importance": 0.3,
                    "original_date": msg["ts"],
                },
            })

            if len(batch) >= batch_size:
                client.memory.store_batch(batch)
                batch = []

        cursor = data.get("response_metadata", {}).get("next_cursor")
        if not cursor:
            break

    if batch:
        client.memory.store_batch(batch)

Best Practices for Bulk Import

| Concern | Recommendation |
|---|---|
| Batch size | 100–500 records per store_batch call. Larger batches reduce HTTP overhead; smaller batches provide faster progress feedback. |
| Importance scoring | Assign importance based on source signal: high-priority tickets = 0.7–0.9, routine messages = 0.2–0.4, curated knowledge base articles = 0.8–1.0. |
| Deduplication | Include a stable source_id in metadata (e.g., ticket ID, message ID). Before importing, query existing memories to skip duplicates. |
| Rate limiting | Respect source API rate limits. Add a time.sleep() between pages if needed. HatiData itself handles concurrent writes gracefully. |
| Incremental sync | Store a high-water mark (last imported timestamp or ID) and use it on subsequent runs to import only new records. |
| Content quality | Filter out empty, trivial, or duplicate content before import. Short messages (< 20 characters) rarely add retrieval value. |
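
The deduplication and content-quality recommendations can be combined into one pre-import filter. A minimal sketch, assuming each outgoing record carries a stable `source_id` in its metadata (`prepare_for_import` is a hypothetical helper):

```python
def prepare_for_import(records: list[dict], existing_ids: set[str],
                       min_length: int = 20) -> list[dict]:
    """Drop records already imported (by stable source_id) and trivially short content."""
    fresh = []
    seen = set(existing_ids)  # Copy so the caller's set is not mutated
    for rec in records:
        sid = rec["metadata"]["source_id"]
        if sid in seen or len(rec["content"].strip()) < min_length:
            continue
        seen.add(sid)  # Also dedupes within this batch
        fresh.append(rec)
    return fresh

records = [
    {"content": "Customer asked about SSO configuration for the Growth plan.",
     "metadata": {"source_id": "t-1"}},
    {"content": "ok", "metadata": {"source_id": "t-2"}},
    {"content": "Refund processed after a duplicate billing charge was confirmed.",
     "metadata": {"source_id": "t-3"}},
]
print(len(prepare_for_import(records, existing_ids={"t-3"})))  # -> 1
```

Run this between fetching a page from the source API and calling `store_batch`; the `existing_ids` set doubles as the high-water mark for incremental syncs.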

Import Source Reference

| Source | Import Method | Recommended Batch Size | Notes |
|---|---|---|---|
| CSV / TSV files | csv.DictReader + store_batch | 200–500 | Fastest method for one-time imports |
| Zendesk | REST API (/api/v2/tickets.json) | 100 | Filter by status=solved for resolved knowledge |
| Slack | Web API (conversations.history) | 100 | Skip system messages; thread replies need separate calls |
| Notion | REST API (/v1/databases/{id}/query) | 50 | Pages can be large; use smaller batches |
| Confluence | REST API (/wiki/rest/api/content) | 50 | Extract body from storage format, strip HTML tags |
| Google Docs | Drive API + Docs API | 20 | One doc per API call; batch the memory writes |
| Intercom | REST API (/conversations) | 100 | Similar to Zendesk; include conversation parts |
| Custom database | Direct SQL export to CSV, then CSV import | 200–500 | Export first, then use the CSV path |
