Skip to main content

Scenario 4: Async Operations

This scenario demonstrates how to use powermem's async API to perform high-performance, concurrent memory operations.

Prerequisites

  • Completed Scenario 1
  • Understanding of async/await in Python
  • powermem installed

Understanding Async Operations

Async operations enable:

  • Non-blocking memory operations
  • Concurrent processing
  • High-throughput scenarios
  • Better performance for I/O-bound operations

Step 1: Initialize Async Memory

First, let's create and initialize an AsyncMemory instance:

# async_operations_example.py
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    """Create an AsyncMemory instance from the auto-detected config and initialize it."""
    config = auto_config()

    # Create async memory instance
    async_memory = AsyncMemory(config=config)

    # Initialize (required for async)
    await async_memory.initialize()

    print("✓ AsyncMemory initialized successfully!")

# Run async function
asyncio.run(main())

Run this code:

python async_operations_example.py

Expected output:

✓ AsyncMemory initialized successfully!

Step 2: Add Memories Asynchronously

Add memories using async methods:

# async_operations_example.py
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    """Add a single memory asynchronously and print the ID of the stored entry."""
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    # Add memory asynchronously
    result = await async_memory.add(
        "User likes Python programming",  # messages parameter (first positional argument)
        user_id="user123",
    )

    # Handle result - check if results list is not empty
    results_list = result.get('results', [])
    if results_list:
        memory_id = results_list[0].get('id', 'N/A')
        print(f"✓ Memory added! ID: {memory_id}")
    else:
        print("✓ Memory operation completed (may have been deduplicated)")

asyncio.run(main())

Run this code:

python async_operations_example.py

Expected output:

✓ Memory added! ID: mem_xxx

Step 3: Concurrent Memory Operations

Add multiple memories concurrently:

# async_operations_example.py
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    """Add several memories concurrently with asyncio.gather and report the count."""
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    # Add multiple memories concurrently
    memories = [
        "User likes Python programming",
        "User prefers email support",
        "User works as a software engineer",
        "User favorite color is blue",
    ]

    # Create one add() coroutine per memory; nothing runs until they are awaited
    tasks = [
        async_memory.add(mem, user_id="user123")  # messages as first positional argument
        for mem in memories
    ]

    # Execute all tasks concurrently
    results = await asyncio.gather(*tasks)

    print(f"✓ Added {len(results)} memories concurrently")

asyncio.run(main())

Run this code:

python async_operations_example.py

Expected output:

✓ Added 4 memories concurrently

Step 4: Async Search

Search memories asynchronously:

# async_operations_example.py
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    """Add two memories, then search them asynchronously and print the matches."""
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    # Add some memories first
    await async_memory.add("User likes Python", user_id="user123")
    await async_memory.add("User prefers email", user_id="user123")

    # Search asynchronously
    results = await async_memory.search(
        query="user preferences",
        user_id="user123",
        limit=5,
    )

    print(f"Found {len(results.get('results', []))} memories:")
    for result in results.get('results', []):
        print(f" - {result['memory']}")

asyncio.run(main())

Run this code:

python async_operations_example.py

Expected output:

Found 2 memories:
- User prefers email
- User likes Python

Step 5: Batch Processing

Process memories in batches:

# async_operations_example.py
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    """Add 100 memories in batches of 10, running each batch concurrently."""
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    # Large list of memories
    memories = [
        f"Memory {i}: User preference {i}"
        for i in range(100)
    ]

    # Process in batches: each batch finishes before the next one starts,
    # so at most batch_size add() calls are in flight at a time
    batch_size = 10
    for i in range(0, len(memories), batch_size):
        batch = memories[i:i+batch_size]

        # Create tasks for batch
        tasks = [
            async_memory.add(mem, user_id="user123")  # messages as first positional argument
            for mem in batch
        ]

        # Execute batch concurrently
        await asyncio.gather(*tasks)

        print(f"✓ Processed batch {i//batch_size + 1}")

asyncio.run(main())

Run this code:

python async_operations_example.py

Expected output:

✓ Processed batch 1
✓ Processed batch 2
✓ Processed batch 3
...

Step 6: Async with Intelligent Memory

Use async operations with intelligent memory features:

# async_operations_example.py
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    """Add a conversation with infer=True and print the memories returned."""
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    # Add memory with intelligent processing
    result = await async_memory.add(
        messages=[
            {"role": "user", "content": "I'm Alice, a software engineer"},
            {"role": "assistant", "content": "Nice to meet you!"},
        ],
        user_id="user123",
        infer=True,  # Enable intelligent fact extraction
    )

    print(f"✓ Processed conversation, extracted {len(result.get('results', []))} memories:")
    for mem in result.get('results', []):
        print(f" - {mem.get('memory', '')}")

asyncio.run(main())

Run this code:

python async_operations_example.py

Expected output:

✓ Processed conversation, extracted 1 memories:
- Is Alice, a software engineer

Step 7: Async Update and Delete

Update and delete memories asynchronously:

# async_operations_example.py
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    """Add one memory, then update and delete it asynchronously.

    Raises:
        ValueError: If the add call returns no results, or the first result
            carries no ID, so there is nothing to update or delete.
    """
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    # Add memory (using infer=False to ensure it's added for demo purposes)
    # In production, you might want infer=True for intelligent deduplication
    result = await async_memory.add(
        "User likes Python",  # messages as first positional argument
        user_id="user123",
        infer=False,  # Disable intelligent deduplication for demo
    )

    # Handle result - check if results list is not empty
    results_list = result.get('results', [])
    if not results_list:
        print("Error: No memory was added")
        raise ValueError("Cannot update/delete: memory was not added")

    memory_id = results_list[0].get('id')
    if not memory_id:
        print("Error: Memory ID not found in result")
        raise ValueError("Cannot update/delete: memory ID not found")

    # Update memory
    updated = await async_memory.update(
        memory_id=memory_id,
        content="User loves Python programming",
    )
    print(f"✓ Updated memory: {updated.get('memory', 'N/A')}")

    # Delete memory
    success = await async_memory.delete(memory_id)
    if success:
        print(f"✓ Deleted memory {memory_id}")

asyncio.run(main())

Run this code:

python async_operations_example.py

Expected output:

✓ Updated memory: User loves Python programming
✓ Deleted memory mem_xxx

Step 8: FastAPI Integration Example

Use async memory in FastAPI:

# fastapi_example.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from powermem import AsyncMemory, auto_config

config = auto_config()
async_memory = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create and initialize the module-level AsyncMemory before the app serves requests."""
    # Startup: initialize async memory
    global async_memory
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()
    yield
    # Shutdown: cleanup (if needed)
    # async_memory cleanup can be added here if needed

app = FastAPI(lifespan=lifespan)

# Request body for POST /memories.
class MemoryRequest(BaseModel):
    # Text passed to async_memory.add() as the messages argument
    memory: str
    # Owner of the memory, forwarded as user_id
    user_id: str

@app.post("/memories")
async def add_memory(request: MemoryRequest):
    # Stores request.memory for request.user_id and returns the add() result as-is.
    # Any failure is reported to the client as HTTP 500 with the error text.
    try:
        result = await async_memory.add(
            request.memory,  # messages as first positional argument
            user_id=request.user_id,
        )
        return result
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback
        raise HTTPException(status_code=500, detail=str(e)) from e

@app.post("/memories/search")
async def search_memories(query: str, user_id: str):
    # Searches the memories of user_id for query and returns the search() result as-is.
    # Any failure is reported to the client as HTTP 500 with the error text.
    try:
        results = await async_memory.search(
            query=query,
            user_id=user_id,
        )
        return results
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback
        raise HTTPException(status_code=500, detail=str(e)) from e

Complete Example

Here's a complete async example:

# complete_async_example.py
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    """Run the full demo: concurrent adds, an async search, then batched adds."""
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    print("=" * 80)
    print("Async Memory Operations Demo")
    print("=" * 80)

    # Step 1: Add memories concurrently
    print("\n[Step 1] Adding Memories Concurrently")
    print("-" * 60)

    memories = [
        "User likes Python programming",
        "User prefers email support",
        "User works as a software engineer",
    ]

    tasks = [
        async_memory.add(mem, user_id="user123")  # messages as first positional argument
        for mem in memories
    ]

    results = await asyncio.gather(*tasks)
    print(f"✓ Added {len(results)} memories concurrently")

    # Step 2: Search asynchronously
    print("\n[Step 2] Searching Memories")
    print("-" * 60)

    search_results = await async_memory.search(
        query="user preferences",
        user_id="user123",
    )

    print(f"Found {len(search_results.get('results', []))} memories:")
    for result in search_results.get('results', []):
        print(f" - {result['memory']}")

    # Step 3: Batch processing
    print("\n[Step 3] Batch Processing")
    print("-" * 60)

    batch_memories = [f"Memory {i}" for i in range(20)]
    batch_size = 5

    # Each batch is awaited before the next starts, so at most
    # batch_size add() calls are in flight at a time
    for i in range(0, len(batch_memories), batch_size):
        batch = batch_memories[i:i+batch_size]
        tasks = [
            async_memory.add(mem, user_id="user123")  # messages as first positional argument
            for mem in batch
        ]
        await asyncio.gather(*tasks)
        print(f"✓ Processed batch {i//batch_size + 1}")

    print("\n" + "=" * 80)
    print("Demo completed successfully!")
    print("=" * 80)

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())

Run this code:

python complete_async_example.py

Extension Exercises

Exercise 1: Concurrent Searches

Perform multiple searches concurrently:

async def concurrent_searches():
    """Run three searches for user123 concurrently.

    Returns:
        The list produced by asyncio.gather: one search result per query,
        in the same order as the queries.
    """
    # Build the config here, as the other examples do, so the function does
    # not depend on a `config` name defined elsewhere
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    queries = [
        "user preferences",
        "user interests",
        "user information",
    ]

    tasks = [
        async_memory.search(query=q, user_id="user123")
        for q in queries
    ]

    results = await asyncio.gather(*tasks)
    return results

Exercise 2: Async with Error Handling

Add error handling to async operations:

async def safe_add(memory, user_id):
    """Add a memory, returning None instead of raising on failure.

    Relies on an `async_memory` object from the enclosing scope
    (assumed to be an already-initialized AsyncMemory).

    Args:
        memory: Content passed to add() as the messages argument.
        user_id: Owner of the memory.

    Returns:
        The add() result, or None if any exception was raised.
    """
    try:
        result = await async_memory.add(memory, user_id=user_id)  # messages as first positional argument
        return result
    except Exception as e:
        # Deliberate best-effort: report the error and let the caller continue
        print(f"Error adding memory: {e}")
        return None

Exercise 3: Rate Limiting

Implement rate limiting for async operations by using a semaphore to cap how many add calls run at the same time:

import asyncio

async def rate_limited_add(memories, user_id, rate=5):
    """Add all memories while allowing at most `rate` add() calls at once.

    Relies on an `async_memory` object from the enclosing scope
    (assumed to be an already-initialized AsyncMemory).

    Args:
        memories: Iterable of memory contents to add.
        user_id: Owner of the memories.
        rate: Maximum number of add() calls allowed to run concurrently.

    Returns:
        The list produced by asyncio.gather: one add() result per memory,
        in input order.
    """
    # The semaphore bounds concurrency (calls in flight), not calls per second
    semaphore = asyncio.Semaphore(rate)

    async def add_with_limit(memory):
        async with semaphore:
            return await async_memory.add(memory, user_id=user_id)  # messages as first positional argument

    tasks = [add_with_limit(mem) for mem in memories]
    return await asyncio.gather(*tasks)

When to Use Async

Use AsyncMemory when:

  • Processing many memories concurrently
  • Building async web applications (FastAPI, aiohttp)
  • Implementing batch processing pipelines
  • Needing non-blocking operations

Use Memory when:

  • Simple synchronous scripts
  • Interactive notebooks
  • Simple use cases without concurrency