Middleware Stack
What Your Agent Inherits
Your AI agent’s endpoints run inside a middleware pipeline that has already taken care of request IDs, structured logging, rate limiting, body size limits, timeouts, security headers, CORS, and trusted host validation. By the time a request reaches your route handler, it has already been identified, metered, and hardened.
The agent never registers middleware, never configures CORS origins, and never writes request logging. The chassis owns the entire request envelope, leaving the agent free to focus on the response body.
The Registration Order
Starlette applies middleware in reverse registration order: the last middleware added is the first to process incoming requests. The chassis takes advantage of this to create a deliberate layering where security concerns wrap everything and correlation IDs propagate through every layer.
```python
def setup_middleware(self) -> Self:
    """
    Configure the middleware stack.

    Starlette applies middleware in reverse registration order, so the
    last middleware added here will be the first one to process requests.
    """
    try:
        # Timeout stays closest to the handler so outer middleware can still
        # observe and annotate timeout responses consistently.
        self.app.add_middleware(
            TimeoutMiddleware,
            timeout=self.settings.request_timeout,
        )

        # Body limits run before auth/handler logic to reject oversized
        # requests as early as possible.
        self.app.add_middleware(
            BodySizeLimitMiddleware,
            max_request_body_bytes=self.settings.max_request_body_bytes,
        )

        if self.settings.rate_limit_enabled:
            # Rate limiting remains inside request ID/logging wrappers so
            # rejected requests still receive correlation headers and
            # access-style request logs.
            self.app.add_middleware(
                RateLimitMiddleware,
                limit=self.settings.rate_limit_requests,
                window_seconds=self.settings.rate_limit_window_seconds,
                key_strategy=self.settings.rate_limit_key_strategy,
                storage_url=self.settings.rate_limit_storage_url,
                trust_proxy_headers=self.settings.rate_limit_trust_proxy_headers,
                proxy_headers=self.settings.rate_limit_proxy_headers,
                trusted_proxies=self.settings.rate_limit_trusted_proxies,
                exempt_paths=[
                    self.settings.health_check_path,
                    self.settings.readiness_check_path,
                    METRICS_PATH,
                    "/favicon.ico",
                ],
            )

        # Request identity and request logging stay outside the rate limiter
        # so 429s still get correlation headers and one structured log.
        self.app.add_middleware(RequestIDMiddleware)
        self.app.add_middleware(
            RequestLoggingMiddleware, redact_headers=self.settings.log_redact_headers
        )

        if self.settings.security_headers_enabled:
            # Security headers are added late so they are applied to both
            # success and error responses.
            self.app.add_middleware(
                SecurityHeadersMiddleware,
                hsts_enabled=self.settings.security_hsts_enabled,
                hsts_max_age_seconds=self.settings.security_hsts_max_age_seconds,
                referrer_policy=self.settings.security_referrer_policy,
                permissions_policy=self.settings.security_permissions_policy,
                content_security_policy=self.settings.security_content_security_policy,
                trust_proxy_proto_header=self.settings.security_trust_proxy_proto_header,
                trusted_proxies=self.settings.security_trusted_proxies,
            )

        # Host validation stays on by default; local/test hosts are part of
        # the default settings so development still works without widening
        # the acceptance policy to "*".
        self.app.add_middleware(
            TrustedHostMiddleware,
            allowed_hosts=self.settings.trusted_hosts,
        )

        # CORS is outermost so preflight requests are handled before auth,
        # rate limiting, or route logic.
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=self.settings.cors_allowed_origins,
            allow_credentials=self.settings.cors_allow_credentials,
            allow_methods=self.settings.cors_allowed_methods,
            allow_headers=self.settings.cors_allowed_headers,
            expose_headers=self.settings.cors_expose_headers,
        )

        self.logger.info("Middleware stack configured successfully")
    except Exception as exc:
        self.logger.exception("Failed to configure middleware: %s", exc)
        raise
    return self
```

Take a close look at the inline comments; they explain why each middleware sits where it does. The key insight is that registration order is the inverse of execution order:
- CORS is registered last but executes first, so preflight requests never reach auth or rate limiting.
- Timeout is registered first but executes last. It wraps only the handler, which lets outer middleware still log and annotate timeout responses.
- Request ID and Logging sit outside rate limiting so that even 429 responses get correlation headers and a structured access log.
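This inversion is easy to demonstrate with a toy sketch (plain callables standing in for real middleware; none of this is chassis code). Each layer wraps the previous one, so the layer constructed last sits outermost and runs first:

```python
import asyncio

class TagMiddleware:
    """Toy ASGI middleware that records its name on the way in."""

    def __init__(self, app, name: str, trace: list[str]) -> None:
        self.app = app
        self.name = name
        self.trace = trace

    async def __call__(self, scope, receive, send) -> None:
        self.trace.append(self.name)  # runs before all inner layers
        await self.app(scope, receive, send)

async def handler(scope, receive, send) -> None:
    pass  # innermost "route handler"

trace: list[str] = []
# Wrap in chassis registration order: Timeout first, CORS last.
app = TagMiddleware(handler, "timeout", trace)
app = TagMiddleware(app, "rate_limit", trace)
app = TagMiddleware(app, "cors", trace)

asyncio.run(app({"type": "http"}, None, None))
print(trace)  # → ['cors', 'rate_limit', 'timeout']
```

Even though CORS was the last layer added, it is the first to see the request, exactly as in the chassis.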
Why Raw ASGI, Not BaseHTTPMiddleware
Every custom middleware in this chassis implements the raw ASGI protocol directly instead of subclassing Starlette’s BaseHTTPMiddleware. This is a deliberate choice, driven by three well-known problems with BaseHTTPMiddleware.
- It reads the entire request body into memory. `BaseHTTPMiddleware` calls `await request.body()` before your code runs, which breaks streaming uploads and consumes memory proportional to body size.
- It wraps exceptions in a way that hides the original. When a handler raises inside `BaseHTTPMiddleware`, the original traceback is lost because the middleware catches and re-raises it.
- It prevents streaming responses. The `call_next()` pattern buffers the entire response before returning it, which defeats any streaming response your handler produces.
The raw ASGI pattern is straightforward: accept scope, receive, and send, check if the request type is HTTP, do your work, and call the next app. Here is the canonical example from the Request ID middleware:
```python
class RequestIDMiddleware:
    """
    Middleware that assigns per-request and cross-request tracing IDs.

    ``request_id`` is always generated locally so each service hop has its
    own unique identifier. ``correlation_id`` is propagated from upstream
    when available so related requests across services can still be tied
    together.
    """

    HEADER_NAME = "X-Request-ID"
    CORRELATION_HEADER_NAME = "X-Correlation-ID"

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Inject tracing IDs into request state, context, and response headers."""
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        headers = {key.lower(): value for key, value in scope.get("headers", [])}
        request_id = str(uuid.uuid4())
        correlation_id = (
            headers.get(self.CORRELATION_HEADER_NAME.lower().encode("latin-1"))
            or headers.get(self.HEADER_NAME.lower().encode("latin-1"))
            or request_id.encode("utf-8")
        ).decode("utf-8")
```

Every middleware in the chassis follows this same structure: `__init__` stores the next app, and `__call__` checks the scope type, does its work, and delegates to `self.app(scope, receive, send)`. It can optionally wrap `send` or `receive` to intercept messages along the way.
The Six Middlewares
Request ID
Generates a fresh UUID for every request and propagates correlation IDs across service boundaries. If the caller sends X-Correlation-ID, that value is reused. If only X-Request-ID is present, it becomes the correlation ID. Both identifiers are then injected into the ASGI scope state, the logging context, and the response headers.
```python
state = scope.setdefault("state", {})
state["request_id"] = request_id
state["correlation_id"] = correlation_id

async def send_wrapper(message: Message) -> None:
    if message["type"] == "http.response.start":
        mutable_headers = list(message.get("headers", []))
        self._upsert_header(
            mutable_headers, self.HEADER_NAME.encode("latin-1"), request_id.encode("utf-8")
        )
        self._upsert_header(
            mutable_headers,
            self.CORRELATION_HEADER_NAME.encode("latin-1"),
            correlation_id.encode("utf-8"),
        )
        message["headers"] = mutable_headers
    await send(message)

try:
    await self.app(scope, receive, send_wrapper)
finally:
    reset_request_context(tokens)
```

The send_wrapper pattern is how raw ASGI middleware intercepts response messages. It inspects the http.response.start message to inject headers before the response is sent to the client.
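The `_upsert_header` helper referenced above is not shown in this excerpt. A minimal sketch of what a case-insensitive replace-or-append helper could look like (assumed behavior, not the chassis's actual implementation):

```python
def upsert_header(
    headers: list[tuple[bytes, bytes]], name: bytes, value: bytes
) -> None:
    """Replace an existing header in place, or append it if absent.

    Header name comparison is case-insensitive, matching HTTP semantics.
    """
    lowered = name.lower()
    for index, (existing_name, _) in enumerate(headers):
        if existing_name.lower() == lowered:
            headers[index] = (name, value)
            return
    headers.append((name, value))

raw = [(b"content-type", b"application/json")]
upsert_header(raw, b"X-Request-ID", b"abc-123")   # appends
upsert_header(raw, b"x-request-id", b"def-456")   # replaces, case-insensitively
print(raw)  # → [(b'content-type', b'application/json'), (b'x-request-id', b'def-456')]
```

Replacing rather than appending matters here: a response must carry exactly one `X-Request-ID`, even if an inner layer already set one.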
Request Logging
Emits one structured access log per request containing the method, path, status code, latency, client address, request ID, and correlation ID. This replaces Uvicorn’s default access log with a richer, JSON-structured format that includes correlation IDs for distributed tracing.
```python
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
    """Emit one structured app log for each HTTP request."""
    if scope["type"] != "http":
        await self.app(scope, receive, send)
        return

    start = perf_counter()
    status_code = 500
    request_id = "-"
    correlation_id = "-"
    response_bytes: int | None = None

    async def send_wrapper(message: Message) -> None:
        nonlocal status_code, request_id, correlation_id, response_bytes
        if message["type"] == "http.response.start":
            status_code = int(message["status"])
            headers = {key.lower(): value for key, value in message.get("headers", [])}
            request_id = headers.get(
                RequestIDMiddleware.HEADER_NAME.lower().encode("latin-1"), b"-"
            ).decode("utf-8")
            correlation_id = headers.get(
                RequestIDMiddleware.CORRELATION_HEADER_NAME.lower().encode("latin-1"),
                b"-",
            ).decode("utf-8")
            content_length = headers.get(b"content-length")
            if content_length is not None:
                try:
                    response_bytes = int(content_length.decode("ascii"))
                except ValueError:
                    response_bytes = None
        await send(message)
```

Notice that status_code defaults to 500. If the handler crashes before sending a response, the log still records the failure correctly. The perf_counter() timer captures sub-millisecond latency.
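To make the end result concrete, here is an illustrative sketch of the kind of structured record this middleware emits. The field names, path, and logger wiring are assumptions for illustration, not the chassis's exact schema:

```python
import json
import logging
from time import perf_counter

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("access")

start = perf_counter()
# ... handler work happens here ...
duration_ms = round((perf_counter() - start) * 1000, 3)

# One structured record per request, mirroring the fields collected
# by the send_wrapper above (illustrative values).
record = {
    "method": "GET",
    "path": "/healthz",
    "status_code": 200,
    "duration_ms": duration_ms,
    "request_id": "abc-123",
    "correlation_id": "abc-123",
    "response_bytes": 512,
}
logger.info(json.dumps(record))
```

A JSON-per-line format like this is what lets log aggregators filter by `correlation_id` across services.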
Security Headers
Injects a conservative set of security headers on every response, including X-Content-Type-Options: nosniff, X-Frame-Options: DENY, Referrer-Policy, Permissions-Policy, Cache-Control: no-store, and optionally Content-Security-Policy. HSTS is only added when the request arrived over HTTPS, detected via the scheme or trusted proxy headers.
```python
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
    if scope["type"] != "http":
        await self.app(scope, receive, send)
        return

    async def send_wrapper(message: Message) -> None:
        if message["type"] == "http.response.start":
            headers = MutableHeaders(raw=message.setdefault("headers", []))
            headers["X-Content-Type-Options"] = "nosniff"
            headers["X-Frame-Options"] = "DENY"
            headers["Referrer-Policy"] = self.referrer_policy
            headers["Permissions-Policy"] = self.permissions_policy
            headers["Cache-Control"] = "no-store"
            if self.content_security_policy:
                headers["Content-Security-Policy"] = self.content_security_policy

            request_headers = Headers(raw=scope.get("headers", []))
            client = scope.get("client")
            client_host = client[0] if client else "unknown"
            forwarded_proto = None
            if self.trust_proxy_proto_header and is_trusted_proxy(
                client_host, self.trusted_proxies
            ):
                forwarded_proto = normalize_forwarded_proto(
                    request_headers.get("x-forwarded-proto")
                )
            scheme = forwarded_proto or scope.get("scheme", "http")
            if self.hsts_enabled and scheme == "https":
                headers["Strict-Transport-Security"] = (
                    f"max-age={self.hsts_max_age_seconds}; includeSubDomains"
                )

        await send(message)

    await self.app(scope, receive, send_wrapper)
```

The HSTS check is proxy-aware. When the application sits behind a reverse proxy like Nginx, Traefik, or a cloud load balancer, it reads X-Forwarded-Proto but only from trusted proxy IPs. This prevents clients from spoofing the protocol header to trigger or suppress HSTS.
Rate Limiting
Implements a fixed-window rate limiting algorithm with pluggable backends: in-memory for development and Redis for production. The middleware exempts health check and metrics paths, and it is proxy-aware for accurate IP-based rate limiting behind reverse proxies.
```python
class RateLimitMiddleware:
    """Apply request rate limiting before the route handler executes."""

    def __init__(
        self,
        app: ASGIApp,
        *,
        limit: int,
        window_seconds: int,
        key_strategy: str,
        storage_url: str,
        trust_proxy_headers: bool,
        proxy_headers: list[str],
        trusted_proxies: list[str],
        exempt_paths: list[str],
    ) -> None:
        self.app = app
        self.limit = limit
        self.window_seconds = window_seconds
        self.key_strategy = key_strategy
        self.trust_proxy_headers = trust_proxy_headers
        self.proxy_headers = [header.lower() for header in proxy_headers]
        self.trusted_proxies = parse_trusted_proxies(trusted_proxies)
        self.exempt_paths = set(exempt_paths)
        self.store: RateLimitStore
        if storage_url:
            self.store = RedisRateLimitStore(storage_url)
        else:
            self.store = MemoryRateLimitStore()
```

The key_strategy setting supports two modes. "ip" (the default) hashes the client IP, while "authorization" hashes the Authorization header so authenticated clients share a per-token budget. Thanks to the store abstraction, switching from memory to Redis is a single configuration change with no code modifications required.
Body Size Limit
Rejects oversized request bodies before the handler ever reads them. The middleware checks the Content-Length header upfront for an early rejection, then wraps receive to count bytes as they stream in. This catches requests that underreport their size or omit Content-Length entirely.
```python
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
    if scope["type"] != "http":
        await self.app(scope, receive, send)
        return

    total_received = 0

    async def receive_wrapper() -> Message:
        nonlocal total_received
        message = await receive()
        if message["type"] == "http.request":
            body = message.get("body", b"")
            total_received += len(body)
            if total_received > self.max_request_body_bytes:
                raise RequestTooLargeError
        return message

    content_length = dict(scope.get("headers", [])).get(b"content-length")
    if content_length is not None:
        try:
            parsed_content_length = int(content_length)
        except ValueError:
            response = JSONResponse(
                status_code=400,
                content={
                    "error": "invalid_request",
                    "detail": "Content-Length header must be a valid integer",
                },
            )
            await response(scope, receive, send)
            return

        if parsed_content_length > self.max_request_body_bytes:
            response = JSONResponse(
                status_code=413,
                content={
                    "error": "request_too_large",
                    "detail": (
                        f"Request body exceeds {self.max_request_body_bytes} byte limit"
                    ),
                },
            )
            await response(scope, receive, send)
            return

    try:
        await self.app(scope, receive_wrapper, send)
    except RequestTooLargeError:
        response = JSONResponse(
            status_code=413,
            content={
                "error": "request_too_large",
                "detail": f"Request body exceeds {self.max_request_body_bytes} byte limit",
            },
        )
        await response(scope, receive, send)
```

This is the receive_wrapper pattern, the counterpart to the send_wrapper pattern seen in other middleware. Instead of intercepting outgoing response messages, it intercepts incoming request body chunks. It is fully streaming-aware and never buffers the entire body in memory; it simply counts bytes as they arrive.
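The streaming behavior can be exercised in isolation. This toy harness (not chassis code) feeds chunks through a byte-counting receive wrapper and raises once the limit is crossed, mid-stream:

```python
import asyncio

class RequestTooLargeError(Exception):
    """Raised mid-stream when the counted body exceeds the limit."""

async def demo(limit: int, chunks: list[bytes]) -> str:
    queue = list(chunks)
    total = 0

    async def receive():
        # Stand-in for the ASGI receive callable: one http.request per chunk.
        return {"type": "http.request", "body": queue.pop(0), "more_body": bool(queue)}

    async def receive_wrapper():
        nonlocal total
        message = await receive()
        total += len(message.get("body", b""))
        if total > limit:
            raise RequestTooLargeError
        return message

    try:
        while True:
            message = await receive_wrapper()
            if not message["more_body"]:
                return "ok"
    except RequestTooLargeError:
        return "413"

print(asyncio.run(demo(10, [b"12345", b"12345"])))   # → ok (exactly at the limit)
print(asyncio.run(demo(10, [b"123456", b"12345"])))  # → 413 (11 bytes, rejected)
```

Note that the second request is rejected on its final chunk even though no Content-Length check could have caught it up front.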
Timeout
Wraps request processing in asyncio.wait_for() to enforce a maximum duration. If the handler exceeds the timeout, the middleware returns a 504 Gateway Timeout with a structured JSON body. For Kubernetes deployments, set this value below the ingress controller’s timeout so the application returns a meaningful error before the infrastructure kills the connection.
```python
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
    """Process each request with timeout enforcement."""
    if scope["type"] != "http":
        await self.app(scope, receive, send)
        return

    response_started = False
    response_completed = False

    async def send_wrapper(message: Message) -> None:
        nonlocal response_started, response_completed
        if message["type"] == "http.response.start":
            response_started = True
        if message["type"] == "http.response.body" and not message.get("more_body", False):
            response_completed = True
        await send(message)

    try:
        await asyncio.wait_for(self.app(scope, receive, send_wrapper), timeout=self.timeout)
    except TimeoutError:
        if response_started:
            request = Request(scope, receive=receive)
            request_path = get_sanitized_request_path(request)
            logger.warning(
                "Request timed out after response started; closing partial response for %s",
                request_path,
            )
            if not response_completed:
                await send({"type": "http.response.body", "body": b"", "more_body": False})
            return

        request = Request(scope, receive=receive)
        request_path = get_sanitized_request_path(request)
        response = JSONResponse(
            status_code=504,
            content={
                "error": "gateway_timeout",
                "detail": f"Request processing exceeded {self.timeout}s limit",
                "path": request_path,
            },
        )
        await response(scope, receive, send)
```

The timeout middleware tracks whether the response has already started. If the timeout fires after response headers have been sent, it cannot send a 504, so it closes the partial response cleanly and logs a warning instead. This prevents the ASGI server from raising a protocol error due to duplicate response starts.
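Stripped of the partial-response bookkeeping, the core mechanic reduces to `asyncio.wait_for()` around the downstream app. A minimal sketch (the handler and status codes here are illustrative):

```python
import asyncio

async def slow_handler() -> int:
    """Stand-in for a route handler that takes 200 ms."""
    await asyncio.sleep(0.2)
    return 200

async def run_with_budget(timeout: float) -> int:
    try:
        # wait_for cancels the wrapped coroutine once the budget elapses.
        return await asyncio.wait_for(slow_handler(), timeout=timeout)
    except asyncio.TimeoutError:
        return 504  # the middleware would emit a structured JSON body here

print(asyncio.run(run_with_budget(0.05)))  # → 504 (handler exceeded the budget)
print(asyncio.run(run_with_budget(1.0)))   # → 200 (handler finished in time)
```

Cancellation, not just error reporting, is the point: the slow handler's task is actually torn down, freeing the worker for other requests.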
The Request Flow
The full request flow spans eight layers. An inbound request passes through each layer from the outside in: CORS, then Trusted Host, Security Headers, Request Logging, Request ID, Rate Limiting, Body Size Limit, and finally Timeout, before reaching your route handler. The response travels back through the same layers in reverse, picking up security headers, rate limit headers, correlation IDs, and an access log entry along the way.
Best Practices
- Always use raw ASGI middleware instead of `BaseHTTPMiddleware` for production code. `BaseHTTPMiddleware` buffers request bodies, hides exception tracebacks, and prevents streaming responses.
- Never change middleware registration order without understanding the execution model. Starlette applies middleware in reverse registration order: the last middleware added is the first to process requests.
- Always place CORS as the outermost middleware so preflight requests are handled before authentication, rate limiting, or route logic runs.
- Prefer the `send_wrapper` and `receive_wrapper` patterns for intercepting ASGI messages. These patterns enable streaming-compatible request/response modification without buffering.
- Always keep request ID and logging middleware outside the rate limiter so that even rejected 429 responses get correlation headers and a structured access log entry.
Further Reading
- Starlette Middleware Documentation
- ASGI Specification
- Starlette Discussion: BaseHTTPMiddleware Issues
- OWASP Secure Headers Project
What the Agent Never Implements
The middleware stack handles all of the following, so your agent just writes route handlers and benefits from these protections automatically:
- Request ID generation and correlation header propagation across service boundaries
- Structured access logging with method, path, status, latency, and correlation IDs
- Security headers including HSTS, CSP, X-Frame-Options, referrer policy, and permissions policy
- Rate limiting with fixed-window algorithm, memory and Redis backends, and proxy-aware IP detection
- Body size validation with streaming byte counting and early Content-Length rejection
- Request timeout enforcement with clean cancellation semantics and partial response handling
- CORS configuration for cross-origin preflight handling
- Trusted host validation preventing host header injection attacks