Caching

What Your Agent Inherits

Your AI agent can cache any serializable value through a single injected dependency. The cache layer is optional, pluggable (memory or Redis), and pre-wired with per-key TTL support, key prefix namespacing, and automatic health registration. When caching is enabled, a route handler declares Depends(get_cache) and receives a fully configured store. There is no connection management, backend selection, or TTL infrastructure to build.

When caching is disabled (the default), no cache resources are allocated and no readiness check is registered. The feature costs nothing when left unused.

The CacheStore Abstraction

The CacheStore abstract base class defines the contract that every cache backend must fulfill. Seven methods cover the complete lifecycle: read, write, delete, existence check, bulk clear, health ping, and resource cleanup.

src/app/cache/store.py
from abc import ABC, abstractmethod


class CacheStore(ABC):
    """Abstract cache store used by the application."""

    @abstractmethod
    async def get(self, key: str) -> bytes | None:
        """Return the cached value for *key*, or ``None`` if absent/expired."""

    @abstractmethod
    async def set(self, key: str, value: bytes, *, ttl_seconds: int) -> None:
        """Store *value* under *key* with a TTL in seconds."""

    @abstractmethod
    async def delete(self, key: str) -> None:
        """Remove *key* from the cache."""

    @abstractmethod
    async def exists(self, key: str) -> bool:
        """Return ``True`` if *key* is present and not expired."""

    @abstractmethod
    async def clear(self) -> None:
        """Remove all entries from this cache store."""

    @abstractmethod
    async def ping(self) -> bool:
        """Lightweight connectivity check used by the readiness probe."""

    @abstractmethod
    async def close(self) -> None:
        """Release resources held by the store (connections, memory)."""

All methods are async, even for the in-memory backend. This keeps the interface uniform so that route handlers never need to know which backend is active. Values are stored as bytes: callers serialize before writing and deserialize after reading, which keeps the cache layer encoding-agnostic.
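As an illustration of the bytes contract, a caller might wrap serialization in two small helpers. This is a minimal sketch: the names encode_value and decode_value are hypothetical and not part of the cache package, and JSON is only one possible encoding.

```python
from __future__ import annotations

import json
from typing import Any


def encode_value(value: Any) -> bytes:
    """Serialize a JSON-compatible value to compact UTF-8 bytes."""
    return json.dumps(value, separators=(",", ":")).encode("utf-8")


def decode_value(raw: bytes | None) -> Any:
    """Deserialize bytes read from the cache; a miss (None) passes through."""
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))
```

With this encoding a cached JSON null also decodes to None, so callers that need to cache "no value" should distinguish a hit from a miss before decoding.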

Memory Backend

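MemoryCacheStore implements the interface using a plain dictionary with passive TTL expiry. Each entry stores a value alongside an expiry timestamp based on time.monotonic(). Expired entries are removed lazily on read, so an expired entry that is never read again stays in memory until the cap forces it out or clear() is called. When a new key arrives and the store already holds max_entries items, the entry with the earliest expiry timestamp is evicted first.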

src/app/cache/store.py
import time


class MemoryCacheStore(CacheStore):
    """In-process cache with passive TTL expiry and max-entry eviction."""

    def __init__(self, *, max_entries: int = 10_000) -> None:
        self.max_entries = max_entries
        # key -> (value, expiry_timestamp)
        self._data: dict[str, tuple[bytes, float]] = {}

    async def get(self, key: str) -> bytes | None:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expiry = entry
        if time.monotonic() >= expiry:
            # Passive expiry: drop the stale entry the first time it is read.
            self._data.pop(key, None)
            return None
        return value

    async def set(self, key: str, value: bytes, *, ttl_seconds: int) -> None:
        # Evict only when inserting a new key into a full store.
        if len(self._data) >= self.max_entries and key not in self._data:
            self._evict_oldest()
        self._data[key] = (value, time.monotonic() + ttl_seconds)

    async def delete(self, key: str) -> None:
        self._data.pop(key, None)

    async def exists(self, key: str) -> bool:
        return await self.get(key) is not None

    async def clear(self) -> None:
        self._data.clear()

    async def ping(self) -> bool:
        return True

    async def close(self) -> None:
        self._data.clear()

    def _evict_oldest(self) -> None:
        """Remove the entry with the earliest expiry timestamp."""
        if not self._data:
            return
        oldest_key = min(self._data, key=lambda k: self._data[k][1])
        self._data.pop(oldest_key, None)

This backend is a good fit for single-process deployments and local development. The max_entries cap (default 10,000) prevents unbounded memory growth, and ping() always returns True because there is no external dependency to verify. TTL relies on time.monotonic() rather than wall-clock time, making it immune to clock adjustments.
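The eviction rule is worth a small worked example, because "oldest" here refers to the expiry timestamp rather than insertion order. The sketch below reuses the same min() expression as _evict_oldest on a hand-built dictionary; the function name pick_eviction_victim and the timestamps are made up for illustration.

```python
def pick_eviction_victim(data: dict[str, tuple[bytes, float]]) -> str:
    """Return the key with the earliest expiry (same rule as _evict_oldest)."""
    return min(data, key=lambda k: data[k][1])


# key -> (value, expiry_timestamp), mirroring the layout of MemoryCacheStore._data
data = {
    "long": (b"report", 90.0 + 3600),  # written first, at t=90, with a 1-hour TTL
    "short": (b"price", 100.0 + 10),   # written later, at t=100, with a 10-second TTL
}

victim = pick_eviction_victim(data)
print(victim)  # "short": it expires soonest even though it was written last
```

Entries with short TTLs are therefore the first to go under memory pressure, and already-expired entries are removed before any live ones.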

Redis Backend

RedisCacheStore wraps the async Redis client and adds key prefix namespacing. Every key gets a prefix (default cache:) to avoid collisions when multiple applications share a Redis instance. TTL is delegated natively to Redis via SETEX.

src/app/cache/store.py
from typing import Any


class RedisCacheStore(CacheStore):
    """Redis-backed distributed cache store."""

    def __init__(self, storage_url: str, *, key_prefix: str = "cache:") -> None:
        # Import lazily so the redis package stays an optional dependency.
        try:
            from redis import asyncio as redis_asyncio
        except ImportError:
            raise ImportError(
                "The 'redis' package is required for Redis-backed caching. "
                "Install it with: uv sync --extra redis"
            ) from None
        self._client = redis_asyncio.from_url(storage_url, encoding="utf-8", decode_responses=False)
        self._key_prefix = key_prefix

    def _prefixed(self, key: str) -> str:
        return f"{self._key_prefix}{key}"

    async def get(self, key: str) -> bytes | None:
        value = await self._client.get(self._prefixed(key))
        if value is None:
            return None
        return value if isinstance(value, bytes) else value.encode("utf-8")

    async def set(self, key: str, value: bytes, *, ttl_seconds: int) -> None:
        await self._client.setex(self._prefixed(key), ttl_seconds, value)

    async def delete(self, key: str) -> None:
        await self._client.delete(self._prefixed(key))

    async def exists(self, key: str) -> bool:
        return bool(await self._client.exists(self._prefixed(key)))

    async def clear(self) -> None:
        await self._client.flushdb()

    async def ping(self) -> bool:
        client: Any = self._client
        return bool(await client.ping())

    async def close(self) -> None:
        await self._client.aclose()

The redis package is an optional dependency (uv sync --extra redis). If it is missing, the ImportError guard surfaces a clear installation message. Key prefixing is handled transparently: callers pass bare keys like "user:123", and the store prepends the prefix, so Redis sees "cache:user:123". This keeps keys from colliding when several services share a single Redis instance. One caveat: clear() calls FLUSHDB, which removes every key in the selected Redis database, not only those under the prefix, so it should be used with care on a shared instance.
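A prefix-scoped clear can be sketched with SCAN instead of FLUSHDB. The helper below is a hypothetical addition, not part of the store shown above. It assumes a redis-py asyncio client (it calls that client's scan_iter() and delete() methods) and a prefix that contains no glob characters such as * or ?.

```python
from typing import Any


async def clear_prefixed(client: Any, key_prefix: str = "cache:") -> int:
    """Delete only keys that start with *key_prefix*; return how many were removed."""
    removed = 0
    # SCAN walks the keyspace incrementally rather than in one blocking call.
    async for key in client.scan_iter(match=f"{key_prefix}*"):
        # DELETE returns the number of keys it actually removed (0 or 1 here).
        removed += await client.delete(key)
    return removed
```

Deleting key by key costs one round trip per key; for large namespaces, batching the deletes would be a natural refinement.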

Factory Function

The create_cache_store() factory reads the application settings and decides which backend to instantiate. The cache_backend setting ("memory" or "redis") drives the selection, while backend-specific settings like max entries, Redis URL, and key prefix are passed through automatically.

src/app/cache/store.py
def create_cache_store(settings: Settings) -> CacheStore:
    """Instantiate a cache store based on the application settings."""
    if settings.cache_backend == "redis":
        return RedisCacheStore(
            settings.cache_storage_url,
            key_prefix=settings.cache_key_prefix,
        )
    return MemoryCacheStore(max_entries=settings.cache_max_entries)

This factory runs once during the lifespan startup. The resulting store is attached to app.state.cache_store and shared across all requests. Switching backends is a one-line environment variable change (APP_CACHE_BACKEND=redis) with no code modifications required.
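The lifespan code is not shown in this section, so the following is only a sketch of the startup and shutdown pattern described above. DemoStore is a hypothetical stand-in for the real store, and the app object is mimicked with SimpleNamespace so the example runs without FastAPI installed.

```python
from contextlib import asynccontextmanager
from types import SimpleNamespace


class DemoStore:
    """Hypothetical stand-in for a CacheStore; only close() matters here."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def lifespan(app):
    # Startup: build the store once. The real application would call
    # create_cache_store(settings) here instead of DemoStore().
    app.state.cache_store = DemoStore()
    try:
        yield  # the application serves requests while suspended here
    finally:
        # Shutdown: release connections or memory held by the store.
        await app.state.cache_store.close()


async def demo() -> bool:
    app = SimpleNamespace(state=SimpleNamespace())  # mimics FastAPI's app.state
    async with lifespan(app):
        assert app.state.cache_store.closed is False
    return app.state.cache_store.closed
```

With FastAPI, a context manager of this shape is passed to the application as FastAPI(lifespan=lifespan).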

Dependency Injection

Route handlers access the cache through a FastAPI dependency. The get_cache() function extracts the store from application state and returns it typed as CacheStore, so handlers always code against the abstract interface regardless of the active backend.

src/app/cache/dependencies.py
def get_cache(request: Request) -> CacheStore:
    """Return the configured cache store from application state."""
    return cast("CacheStore", request.app.state.cache_store)

A route handler uses it like this:

import json

from fastapi import APIRouter, Depends

from app.cache.dependencies import get_cache
from app.cache.store import CacheStore

router = APIRouter()


@router.get("/items/{item_id}")
async def get_item(item_id: str, cache: CacheStore = Depends(get_cache)):
    cached = await cache.get(f"item:{item_id}")
    if cached is not None:  # an explicit None check also treats empty bytes as a hit
        return json.loads(cached)
    # ... fetch from database, then cache
    item = {"id": item_id}  # placeholder standing in for the real database lookup
    await cache.set(f"item:{item_id}", json.dumps(item).encode(), ttl_seconds=300)
    return item

The dependency is synchronous (no async) because it only reads from app.state and involves no I/O. FastAPI handles both sync and async dependencies transparently.
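The read-then-populate pattern in the handler above tends to repeat across routes, so it can be factored into a helper. The sketch below is one possible shape, not part of the cache package: get_or_set and SupportsGetSet are hypothetical names, values are assumed to be JSON-serializable, and the loader is any coroutine function that produces the fresh value.

```python
from __future__ import annotations

import json
from typing import Any, Awaitable, Callable, Protocol


class SupportsGetSet(Protocol):
    """The two CacheStore methods this helper relies on."""

    async def get(self, key: str) -> bytes | None: ...

    async def set(self, key: str, value: bytes, *, ttl_seconds: int) -> None: ...


async def get_or_set(
    cache: SupportsGetSet,
    key: str,
    loader: Callable[[], Awaitable[Any]],
    *,
    ttl_seconds: int,
) -> Any:
    """Return the cached value for *key*, loading and caching it on a miss."""
    cached = await cache.get(key)
    if cached is not None:
        return json.loads(cached)
    value = await loader()  # only runs on a cache miss
    await cache.set(key, json.dumps(value).encode("utf-8"), ttl_seconds=ttl_seconds)
    return value
```

Concurrent misses for the same key will each call the loader; this sketch makes no attempt to deduplicate them.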


Best Practices

  • Always use an abstract CacheStore interface so backends are interchangeable. Coding against the abstraction means switching from memory to Redis is a single environment variable change with no application code modifications.
  • Prefer per-key TTL over global TTL. Different data has different freshness requirements: user profiles may tolerate 5 minutes of staleness, while pricing data may need a 10-second TTL.
  • Always use time.monotonic() for TTL expiry in in-memory caches. Monotonic clocks are immune to wall-clock adjustments (NTP corrections, manual clock changes) that can prematurely expire or extend cached entries.
  • Never cache without a max-entry cap on in-memory stores. Unbounded caches cause memory growth that eventually triggers OOM kills in containerized deployments.
  • Always use key prefix namespacing when sharing a Redis instance. Without prefixes, multiple applications using the same Redis instance risk key collisions and accidental data overwrites.

What the Agent Never Implements

The caching infrastructure takes care of everything listed below. Your agent only needs to write cache-aware business logic on top of this foundation:

  • CacheStore abstraction. A 7-method async interface that ensures backend interchangeability.
  • Memory backend. A dict-based store with monotonic TTL and max-entry eviction, ideal for single-process deployments.
  • Redis backend. A distributed store with key prefix namespacing and native TTL delegation.
  • Backend selection. A factory function that reads settings and instantiates the correct store.
  • Dependency injection. get_cache() provides the store to any route handler via Depends().
  • Health registration. The cache readiness check is auto-registered during the builder chain.
  • Lifespan management. Store initialization on startup and close() on shutdown, both handled by the lifespan manager.
  • Settings integration. Backend type, TTL defaults, max entries, Redis URL, and key prefix are all configurable through environment variables.
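
The readiness check itself is not shown in this section. As a rough sketch of what a probe built on ping() could look like, the helper below bounds the call with a timeout and treats any failure as "not ready". The name cache_is_ready and the one-second default are assumptions for illustration.

```python
import asyncio
from typing import Any


async def cache_is_ready(store: Any, *, timeout_seconds: float = 1.0) -> bool:
    """Return True only if store.ping() succeeds within the timeout."""
    try:
        return bool(await asyncio.wait_for(store.ping(), timeout=timeout_seconds))
    except Exception:
        # Timeouts and connection errors both mean the cache is not ready.
        return False
```

For the memory backend such a probe always passes, since ping() returns True without any I/O.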