Getting Started with Async Memory
Prerequisites
- Python 3.10+
- powermem installed (pip install powermem)
- Basic understanding of Python async/await syntax
When to Use Async Memory?
The async version of powermem (AsyncMemory) is ideal when you need:
- High-throughput applications: Processing many memory operations simultaneously
- Concurrent operations: Adding, searching, or updating multiple memories at once
- I/O-bound workloads: When operations involve network calls (API requests, database queries)
- Non-blocking operations: Keeping your application responsive while processing memories
Async vs Sync: Which Should You Use?
Use AsyncMemory (async) for:
- High-concurrency scenarios
- Batch processing large datasets
- When you need maximum throughput
Note: If you're new to powermem, start with the synchronous guide to learn the basics, then come back here for async operations.
Configuration
Before using async memory, you need to configure powermem. The configuration is the same for both sync and async versions. Powermem can automatically load configuration from a .env file in your project directory.
Creating a .env File
- Copy the example configuration file:
  cp .env.example .env
- Edit the .env file and configure your settings:
  - LLM Provider: Choose your language model provider
  - Embedding Provider: Select how text will be converted to vectors
  - Vector Store: Choose your database (SQLite for development, OceanBase for production)
Note: The auto_config() function automatically:
- Looks for a .env file in the current directory
- Loads configuration from environment variables
- Uses sensible defaults if no configuration is found
For more configuration options, see the full example in .env.example or refer to the Configuration Guide.
Understanding Async Operations
Async operations in powermem provide several key advantages:
Key Benefits
- Non-blocking: Operations don't block the event loop, allowing other tasks to run
- Concurrent processing: Execute multiple memory operations simultaneously
- High throughput: Process many operations in parallel, significantly improving performance
- Better I/O performance: Ideal for operations involving network calls or database queries
- Scalability: Handle more concurrent requests without creating multiple threads
How Async Works in Powermem
- Async methods: All memory operations (add, search, update, delete) are async
- Event loop: Operations run on Python's asyncio event loop
- Concurrent execution: Use asyncio.gather() to run multiple operations concurrently (see the sketch below)
- Resource management: Properly initialize and clean up async resources
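Before the detailed walkthroughs, here is that pattern in miniature. This is a sketch only; each step is covered with a full, runnable example in the sections below:

```python
import asyncio
from powermem import AsyncMemory, auto_config

async def demo():
    # Initialize once per application (see "Initialize Async Memory")
    async_memory = AsyncMemory(config=auto_config())
    await async_memory.initialize()

    # Await a single operation; the event loop stays free during I/O
    await async_memory.add("User likes Python", user_id="user123")

    # Or schedule several operations and run them concurrently
    results = await asyncio.gather(
        async_memory.search(query="preferences", user_id="user123"),
        async_memory.search(query="hobbies", user_id="user123"),
    )
    return results

asyncio.run(demo())
```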
Performance Considerations
- I/O-bound operations: Async shines when waiting for network or database responses
- CPU-bound operations: For CPU-intensive tasks, consider using asyncio.to_thread() (see the sketch below)
- Batch size: When processing many items, use batching to avoid overwhelming the system
- Connection pooling: Async operations can efficiently share database connections
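For the CPU-bound case, a minimal sketch of offloading work with asyncio.to_thread() (the summarize_text helper here is hypothetical, standing in for any CPU-heavy preprocessing step):

```python
import asyncio

def summarize_text(text: str) -> str:
    # Hypothetical CPU-heavy preprocessing (tokenization, summarization, ...)
    return text[:200]

async def add_with_preprocessing(async_memory, text: str, user_id: str):
    # Run the CPU-bound step in a worker thread so the event loop stays free
    summary = await asyncio.to_thread(summarize_text, text)
    # The I/O-bound memory write stays on the event loop
    return await async_memory.add(summary, user_id=user_id)
```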
Initialize Async Memory
The first step in using async memory is to create and initialize an AsyncMemory instance. Unlike the synchronous version, async memory requires explicit initialization.
Understanding AsyncMemory Initialization
The AsyncMemory class:
- Requires explicit initialization with await async_memory.initialize()
- Sets up async connections to vector stores and embedding services
- Must be initialized before any operations can be performed
- Should be properly closed when done (though Python's garbage collector handles this)
Important: Always Initialize
Unlike the sync Memory class, AsyncMemory requires you to call initialize() before use. This is because:
- Async connections need to be established
- Some vector stores require async connection pools
- Embedding services may need async HTTP clients
Let's create and initialize an AsyncMemory instance:
```python
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    # Load configuration from .env file
    config = auto_config()

    # Create async memory instance
    async_memory = AsyncMemory(config=config)

    # Initialize (required for async)
    await async_memory.initialize()

    print("✓ AsyncMemory initialized successfully!")

    # Your async operations go here

    # Don't forget to close when done (optional, but recommended)
    # await async_memory.close()

# Run async function
asyncio.run(main())
```
Using JSON/Dictionary Configuration
Instead of using .env files, you can also pass configuration directly as a Python dictionary (JSON-like format). This is useful when:
- You want to load configuration from a JSON file
- You need to programmatically generate configuration
- You're embedding configuration in your application code
Here's an example using a dictionary configuration:
```python
import asyncio
from powermem import AsyncMemory

async def main():
    # Define configuration as a dictionary (JSON-like format)
    config = {
        'llm': {
            'provider': 'qwen',
            'config': {
                'api_key': 'your_api_key_here',
                'model': 'qwen-plus',
                'temperature': 0.7
            }
        },
        'embedder': {
            'provider': 'qwen',
            'config': {
                'api_key': 'your_api_key_here',
                'model': 'text-embedding-v4'
            }
        },
        'vector_store': {
            'provider': 'oceanbase',
            'config': {
                'collection_name': 'memories',
                'connection_args': {
                    'host': '127.0.0.1',
                    'port': 2881,
                    'user': 'root@sys',
                    'password': 'password',
                    'db_name': 'powermem'
                },
                'embedding_model_dims': 1536,
                'index_type': 'IVF_FLAT',
                'vidx_metric_type': 'cosine'
            }
        }
    }

    # Create async memory instance with dictionary config
    async_memory = AsyncMemory(config=config)

    # Initialize (required for async)
    await async_memory.initialize()

    print("✓ AsyncMemory initialized with JSON config!")

    # Your async operations go here
    # await async_memory.close()

# Run async function
asyncio.run(main())
```
You can also load configuration from a JSON file:
```python
import asyncio
import json
from powermem import AsyncMemory

async def main():
    # Load configuration from JSON file
    with open('config.json', 'r') as f:
        config = json.load(f)

    # Create async memory instance
    async_memory = AsyncMemory(config=config)

    # Initialize (required for async)
    await async_memory.initialize()

    print("✓ AsyncMemory initialized from JSON file!")

    # Your async operations go here
    # await async_memory.close()

# Run async function
asyncio.run(main())
```
Note: When using dictionary/JSON configuration, make sure to include all required fields (llm, embedder, vector_store) with their respective provider and config sections. For more configuration options, see the Configuration Guide.
Tip: In production applications, consider using async context managers or ensuring proper cleanup. The initialize() method sets up connections that should ideally be closed when your application shuts down.
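A minimal sketch of that cleanup pattern with try/finally, assuming the close() method shown (commented out) in the examples above:

```python
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    async_memory = AsyncMemory(config=auto_config())
    await async_memory.initialize()
    try:
        # Your async operations go here
        ...
    finally:
        # Release connections even if an operation raises
        await async_memory.close()

asyncio.run(main())
```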
Add Memories Asynchronously
Adding memories asynchronously is similar to the synchronous version, but uses await to handle the async operation. This allows your application to continue processing other tasks while the memory is being stored.
Understanding Async Add Operations
When you call await async_memory.add():
- The operation is scheduled on the event loop
- Your code can yield control to other tasks
- The memory is processed (embedded, stored) asynchronously
- The result is returned when complete
Key Differences from Sync Version
- Use await: All operations must be awaited
- Inside async function: Must be called from an async function
- Non-blocking: Other async tasks can run while waiting
- Same parameters: Takes the same parameters as the sync version
Handling Results
The result format is identical to the sync version:
- Returns a dictionary with a results list
- Each result contains memory ID, content, and metadata
- Empty results list may indicate deduplication occurred
Let's add a memory asynchronously:
```python
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    # Add memory asynchronously
    result = await async_memory.add(
        "User likes Python programming",  # messages parameter (first positional argument)
        user_id="user123"
    )

    # Handle result - check if results list is not empty
    results_list = result.get('results', [])
    if results_list:
        memory_id = results_list[0].get('id', 'N/A')
        print(f"✓ Memory added! ID: {memory_id}")
    else:
        print("✓ Memory operation completed (may have been deduplicated)")

asyncio.run(main())
```
Note: The async add() method works exactly like the sync version, but doesn't block your event loop. This means other async operations can run concurrently while the memory is being processed.
Concurrent Memory Operations
One of the biggest advantages of async operations is the ability to process multiple memories concurrently. Instead of waiting for each operation to complete sequentially, you can run them all at the same time.
Why Concurrent Operations Matter
- Speed: Process multiple operations simultaneously instead of one-by-one
- Efficiency: Better utilization of I/O resources (network, database)
- Throughput: Handle more operations per second
- Scalability: Essential for high-performance applications
How Concurrent Execution Works
- Create tasks: Build a list of coroutine objects (don't await them yet)
- Gather tasks: Use asyncio.gather() to run all tasks concurrently
- Wait for completion: All tasks execute in parallel
- Collect results: Get results from all operations
Understanding asyncio.gather()
- Concurrent execution: All tasks run at the same time
- Result order: Results are returned in the same order as input tasks
- Error handling: If any task fails, you can choose to propagate the error or continue (see the sketch below)
- Resource limits: Be mindful of connection limits (database, API rate limits)
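If you want to continue past individual failures instead of letting the first exception propagate out of the gather, pass return_exceptions=True. A sketch, assuming a tasks list like the one built in the example below (this code runs inside an async function):

```python
# Failed tasks come back as exception objects instead of raising
results = await asyncio.gather(*tasks, return_exceptions=True)

for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"✗ Task {i} failed: {result}")
    else:
        print(f"✓ Task {i} succeeded")
```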
Best Practices
- Batch size: Don't create too many concurrent tasks at once
- Rate limiting: Respect API rate limits when using external services
- Error handling: Wrap in try-except to handle individual failures
- Resource management: Monitor connection pool usage
Let's add multiple memories concurrently:
```python
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    # Add multiple memories concurrently
    memories = [
        "User likes Python programming",
        "User prefers email support",
        "User works as a software engineer",
        "User favorite color is blue"
    ]

    # Create tasks for concurrent execution
    # Note: We're creating coroutine objects, not awaiting them yet
    tasks = [
        async_memory.add(mem, user_id="user123")  # messages as first positional argument
        for mem in memories
    ]

    # Execute all tasks concurrently
    # This runs all add operations in parallel
    results = await asyncio.gather(*tasks)
    print(f"✓ Added {len(results)} memories concurrently")

    # Process individual results if needed
    for i, result in enumerate(results):
        results_list = result.get('results', [])
        if results_list:
            print(f"  Memory {i+1}: {results_list[0].get('memory', 'N/A')}")

asyncio.run(main())
```
Tip: The *tasks syntax unpacks the list of tasks, which allows asyncio.gather() to receive multiple coroutine objects as separate arguments. All operations execute concurrently, which is significantly faster than running them sequentially.
Async Search
Searching memories asynchronously allows you to perform semantic searches without blocking your application. This is especially useful in web applications where you need to handle multiple search requests concurrently.
Understanding Async Search
Async search works the same way as sync search, but:
- Non-blocking: Doesn't block the event loop while searching
- Concurrent queries: Can handle multiple search requests simultaneously
- Same semantics: Uses the same semantic search algorithm
- Better performance: In high-concurrency scenarios
Search Performance with Async
- Multiple users: Handle search requests from multiple users concurrently
- Parallel searches: Run multiple searches at the same time
- Resource sharing: Efficiently share database connections
- Response time: Better response times under load
Use Cases for Async Search
- Web APIs: Handle multiple search requests in parallel
- Real-time applications: Search while maintaining responsiveness
- Batch queries: Search across multiple users concurrently
- Analytics: Run multiple analytical queries in parallel
Let's search memories asynchronously:
```python
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    # Add some memories first
    await async_memory.add("User likes Python", user_id="user123")
    await async_memory.add("User prefers email", user_id="user123")

    # Search asynchronously
    results = await async_memory.search(
        query="user preferences",
        user_id="user123",
        limit=5
    )

    print(f"Found {len(results.get('results', []))} memories:")
    for result in results.get('results', []):
        print(f"  - {result['memory']}")

asyncio.run(main())
```
Note: You can also run multiple searches concurrently using asyncio.gather(), just like with add() operations. This is useful when you need to search for different users or different queries simultaneously.
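A sketch of that pattern (inside an async function, with async_memory initialized as in the example above; the second user ID is illustrative):

```python
# Run several searches concurrently, e.g. different queries and users
queries = [
    ("user preferences", "user123"),
    ("user preferences", "user456"),
    ("work history", "user123"),
]

search_tasks = [
    async_memory.search(query=q, user_id=uid, limit=5)
    for q, uid in queries
]
all_results = await asyncio.gather(*search_tasks)

for (q, uid), res in zip(queries, all_results):
    print(f"{uid} / '{q}': {len(res.get('results', []))} memories")
```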
Batch Processing
When dealing with large datasets, it's important to process memories in batches rather than all at once. This prevents overwhelming your system and helps manage resources efficiently.
Why Batch Processing?
- Resource management: Avoids exhausting database connections or API rate limits
- Memory efficiency: Processes data in manageable chunks
- Progress tracking: Easier to track progress and handle errors
- Rate limiting: Respects API rate limits when using external services
- Error recovery: If one batch fails, others can still succeed
Batch Processing Strategy
- Divide into batches: Split your data into smaller chunks
- Process concurrently: Within each batch, process items concurrently
- Wait for completion: Wait for each batch to complete before starting the next
- Handle errors: Implement error handling for failed batches
Choosing Batch Size
The optimal batch size depends on:
- API rate limits: How many requests per second your API allows
- Database connections: Connection pool size
- Memory constraints: Available system memory
- Network bandwidth: Your network capacity
Common batch sizes:
- Small batches (5-10): For rate-limited APIs
- Medium batches (10-50): For most use cases
- Large batches (50-100): For high-capacity systems
Best Practices
- Monitor performance: Adjust batch size based on actual performance
- Handle failures: Implement retry logic for failed batches
- Progress reporting: Log progress to track long-running operations
- Resource cleanup: Ensure proper cleanup between batches
Let's process memories in batches:
```python
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    # Large list of memories
    memories = [
        f"Memory {i}: User preference {i}"
        for i in range(100)
    ]

    # Process in batches
    batch_size = 10
    total_batches = (len(memories) + batch_size - 1) // batch_size

    for i in range(0, len(memories), batch_size):
        batch = memories[i:i+batch_size]
        batch_num = i // batch_size + 1

        # Create tasks for batch
        tasks = [
            async_memory.add(mem, user_id="user123")  # messages as first positional argument
            for mem in batch
        ]

        # Execute batch concurrently
        try:
            results = await asyncio.gather(*tasks)
            print(f"✓ Processed batch {batch_num}/{total_batches} ({len(results)} memories)")
        except Exception as e:
            print(f"✗ Error processing batch {batch_num}: {e}")

asyncio.run(main())
```
Tip: For production applications, consider adding:
- Progress bars or logging
- Retry logic for failed operations (see the sketch below)
- Rate limiting to respect API limits
- Checkpointing to resume from failures
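As a sketch of the retry point, a small helper with exponential backoff (the delays are arbitrary; tune them to your API's limits):

```python
import asyncio

async def add_with_retry(async_memory, text, user_id, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await async_memory.add(text, user_id=user_id)
        except Exception as e:
            if attempt == max_retries - 1:
                raise  # Out of retries; let the caller handle it
            delay = 2 ** attempt  # 1s, 2s, 4s, ...
            print(f"Retrying after error: {e} (waiting {delay}s)")
            await asyncio.sleep(delay)
```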
Async with Intelligent Memory
Intelligent memory features work seamlessly with async operations. This allows you to leverage powermem's intelligent fact extraction and deduplication while maintaining high performance through async operations.
What is Intelligent Memory?
Intelligent memory (infer=True) enables:
- Automatic fact extraction: Extracts facts from conversations automatically
- Smart deduplication: Prevents storing duplicate or very similar memories
- Context understanding: Better understands context from conversation history
- Multi-turn conversations: Handles conversation threads effectively
Async + Intelligent Memory Benefits
- Non-blocking processing: LLM calls for fact extraction don't block your event loop
- Concurrent processing: Process multiple conversations simultaneously
- Better throughput: Handle more intelligent memory operations per second
- Scalability: Scale intelligent memory processing efficiently
When to Use Intelligent Memory
- Conversation processing: When processing chat logs or conversations
- Automatic extraction: When you want facts extracted automatically
- Deduplication: When you need to prevent duplicate memories
- Complex contexts: When memories depend on conversation context
Performance Considerations
- LLM calls: Intelligent memory requires LLM API calls, which are I/O-bound (perfect for async)
- Processing time: Takes longer than simple add operations
- Cost: Each intelligent operation uses LLM tokens
- Concurrent limits: Be mindful of LLM API rate limits (a rate-limiting sketch follows the example below)
Let's use async operations with intelligent memory features:
```python
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    # Add memory with intelligent processing
    # This will extract facts from the conversation automatically
    result = await async_memory.add(
        messages=[
            {"role": "user", "content": "I'm Alice, a software engineer"},
            {"role": "assistant", "content": "Nice to meet you!"}
        ],
        user_id="user123",
        infer=True  # Enable intelligent fact extraction
    )

    print(f"✓ Processed conversation, extracted {len(result.get('results', []))} memories:")
    for mem in result.get('results', []):
        print(f"  - {mem.get('memory', '')}")

asyncio.run(main())
```
Note: Intelligent memory operations are perfect for async because they involve network calls to LLM APIs. Async allows these operations to run concurrently, significantly improving throughput when processing multiple conversations.
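One way to respect those rate limits while still processing conversations concurrently is to cap in-flight operations with asyncio.Semaphore. A sketch (conversations is assumed to be a list of message lists like the one above; max_concurrent=5 is an arbitrary default):

```python
import asyncio

async def add_conversations(async_memory, conversations, user_id, max_concurrent=5):
    # Cap how many intelligent add operations hit the LLM at once
    semaphore = asyncio.Semaphore(max_concurrent)

    async def add_one(messages):
        async with semaphore:
            return await async_memory.add(messages, user_id=user_id, infer=True)

    # All conversations are scheduled, but at most max_concurrent run at a time
    return await asyncio.gather(*(add_one(conv) for conv in conversations))
```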
Async Update and Delete
Updating and deleting memories asynchronously follows the same patterns as other async operations. These operations are particularly useful in web applications where you need to handle multiple update/delete requests concurrently.
Understanding Async Update
The update() method:
- Requires memory ID: You need the ID from when the memory was created
- Regenerates embedding: Creates a new vector embedding for the updated content
- Non-blocking: Doesn't block while updating
- Same parameters: Takes the same parameters as the sync version
Understanding Async Delete
The delete() method:
- Requires memory ID: You need the specific memory ID to delete
- Permanent: Deletion is permanent (same as sync version)
- Returns boolean: Returns True on success, False on failure
- Non-blocking: Executes asynchronously
Use Cases for Async Update/Delete
- Web APIs: Handle multiple update/delete requests concurrently
- Batch operations: Update or delete multiple memories in parallel
- Real-time applications: Update memories without blocking the application
- Data management: Efficiently manage large numbers of memories
Best Practices
- Error handling: Always check return values and handle errors
- Concurrent operations: You can update/delete multiple memories concurrently
- Resource cleanup: Ensure proper cleanup in long-running applications
- Validation: Validate memory IDs before operations
Let's update and delete memories asynchronously:
```python
import asyncio
from powermem import AsyncMemory, auto_config

async def main():
    config = auto_config()
    async_memory = AsyncMemory(config=config)
    await async_memory.initialize()

    # Add memory (using infer=False to ensure it's added for demo purposes)
    # In production, you might want infer=True for intelligent deduplication
    result = await async_memory.add(
        "User likes Python",  # messages as first positional argument
        user_id="user123",
        infer=False  # Disable intelligent deduplication for demo
    )

    # Handle result - check if results list is not empty
    results_list = result.get('results', [])
    if not results_list:
        print("Error: No memory was added")
        raise ValueError("Cannot update/delete: memory was not added")

    memory_id = results_list[0].get('id')
    if not memory_id:
        print("Error: Memory ID not found in result")
        raise ValueError("Cannot update/delete: memory ID not found")

    # Update memory asynchronously
    updated = await async_memory.update(
        memory_id=memory_id,
        content="User loves Python programming"
    )
    print(f"✓ Updated memory: {updated.get('memory', 'N/A')}")

    # Delete memory asynchronously
    success = await async_memory.delete(memory_id)
    if success:
        print(f"✓ Deleted memory {memory_id}")
    else:
        print(f"✗ Failed to delete memory {memory_id}")

asyncio.run(main())
```
Concurrent Update/Delete Operations
You can also perform multiple update or delete operations concurrently:
```python
# Example: Delete multiple memories concurrently (inside an async function)
memory_ids = ["id1", "id2", "id3"]
tasks = [async_memory.delete(mid) for mid in memory_ids]
results = await asyncio.gather(*tasks)

success_count = sum(1 for r in results if r)
print(f"✓ Deleted {success_count}/{len(memory_ids)} memories")
```
Tip: Like other async operations, you can use asyncio.gather() to perform multiple update or delete operations concurrently. This is especially useful when managing large numbers of memories or handling batch operations.