Problem: Your AI Endpoint Stalls Under Load
You wire up a FastAPI endpoint that calls an LLM, logs the result to Postgres, and it works fine in testing. Under real traffic — 50 concurrent requests hitting an endpoint that waits 2–8 seconds for an LLM response — you start seeing:
```
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached,
connection timed out, timeout 30
```
Or worse: silent hangs where requests queue behind each other because your async code is blocking the event loop.
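That second failure mode is easy to reproduce without FastAPI at all. This stdlib-only sketch runs five concurrent "handlers" that each wait 200 ms: the version that calls `time.sleep()` blocks the event loop and serializes, while the one that awaits `asyncio.sleep()` overlaps. (The handler names and timings are illustrative, not from any real app.)

```python
import asyncio
import time

async def blocking_handler() -> None:
    # time.sleep() never yields to the event loop: nothing else runs meanwhile
    time.sleep(0.2)

async def safe_handler() -> None:
    # awaiting suspends this coroutine, so the other "requests" make progress
    await asyncio.sleep(0.2)

async def measure(handler) -> float:
    """Run 5 concurrent 'requests' and return total wall-clock time."""
    start = time.monotonic()
    await asyncio.gather(*(handler() for _ in range(5)))
    return time.monotonic() - start

async def main() -> tuple[float, float]:
    return await measure(blocking_handler), await measure(safe_handler)

blocked, safe = asyncio.run(main())
print(f"blocking: {blocked:.2f}s, async: {safe:.2f}s")  # blocking ~1.0s, async ~0.2s
```

Any synchronous driver call (psycopg2, a sync HTTP client) inside an async endpoint behaves like the `time.sleep()` variant here.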
You'll learn:

- How to configure `AsyncEngine` and `AsyncSession` correctly for AI workloads
- Why default pool settings fail when LLM calls hold connections open
- How to structure dependency injection so sessions don't leak
- How to tune `pool_size`, `max_overflow`, and `pool_pre_ping` for production
Time: 25 min | Difficulty: Advanced
Why Default SQLAlchemy Settings Break With AI Endpoints
Standard web endpoints spend ~5ms in the database. AI endpoints spend 2–8 seconds waiting for LLM responses — and if your session stays open across that wait, you're holding a DB connection for the full duration.
Default SQLAlchemy pool:
| Setting | Default | Problem under AI load |
|---|---|---|
| `pool_size` | 5 | Exhausted by 5 concurrent requests |
| `max_overflow` | 10 | Max 15 connections total |
| `pool_timeout` | 30s | Requests queue and time out |
| `pool_recycle` | -1 (never) | Stale connections after idle periods |
The fix is a combination of: acquiring DB sessions only when needed, releasing them before LLM calls, and sizing the pool to match your actual concurrency target.
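You can see the payoff of releasing early before touching SQLAlchemy at all. In this toy simulation, an `asyncio.Semaphore` stands in for the connection pool and `asyncio.sleep()` for latency; the numbers are scaled down, with the 0.1 s "LLM call" playing the role of the real 2–8 s wait:

```python
import asyncio
import time

POOL_SIZE = 5   # stand-in for pool_size
DB_S = 0.005    # stand-in for ~5 ms of real DB work
LLM_S = 0.1     # scaled-down stand-in for the 2-8 s LLM wait

async def held_across_llm(pool: asyncio.Semaphore) -> None:
    # Connection checked out for the whole request, LLM wait included
    async with pool:
        await asyncio.sleep(DB_S)   # read
        await asyncio.sleep(LLM_S)  # LLM call while holding a connection
        await asyncio.sleep(DB_S)   # write

async def released_early(pool: asyncio.Semaphore) -> None:
    # Connection held only for the two short DB phases
    async with pool:
        await asyncio.sleep(DB_S)   # read, then release
    await asyncio.sleep(LLM_S)      # LLM call with no connection held
    async with pool:
        await asyncio.sleep(DB_S)   # reacquire to write

async def run(worker, n: int = 50) -> float:
    pool = asyncio.Semaphore(POOL_SIZE)
    start = time.monotonic()
    await asyncio.gather(*(worker(pool) for _ in range(n)))
    return time.monotonic() - start

held = asyncio.run(run(held_across_llm))
early = asyncio.run(run(released_early))
print(f"held across LLM: {held:.2f}s, released early: {early:.2f}s")
```

With 50 requests and a pool of 5, holding the connection across the "LLM call" serializes into ten batches; releasing early lets all 50 LLM waits overlap.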
Solution
Step 1: Set Up the Async Engine With Correct Pool Config
Install dependencies first:
```bash
# Use uv for fast installs (Python 3.12+)
uv add fastapi sqlalchemy asyncpg pydantic-settings uvicorn
```
Create app/database.py:
```python
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase

# asyncpg is the async PostgreSQL driver — DO NOT use psycopg2 here
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"

engine = create_async_engine(
    DATABASE_URL,
    # Pool sizing for AI endpoints: target concurrency × average DB time / average total time
    # Example: 100 concurrent reqs × 50ms DB / 4000ms total = ~1.25 → use 10 as safe buffer
    pool_size=10,
    max_overflow=20,     # Allows burst to 30 total connections
    pool_timeout=10,     # Fail fast — don't let requests queue for 30s
    pool_pre_ping=True,  # Checks connection health before use (catches stale connections)
    pool_recycle=3600,   # Recycle connections every hour
    echo=False,          # Set True only during local debugging — logs every SQL statement
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Prevents lazy-load errors after commit in async context
)

class Base(DeclarativeBase):
    pass
```
Why `expire_on_commit=False`: In sync SQLAlchemy, commit expires loaded attributes, and the next access quietly re-queries the database. In async code that implicit I/O has no awaitable context, so it raises `MissingGreenlet`. Setting `expire_on_commit=False` keeps attributes available without a new query.
Step 2: Structure the Session Dependency to Release Early
The critical pattern: close the DB session before the LLM call, not after.
```python
# app/dependencies.py
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession

from app.database import AsyncSessionLocal

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
        except Exception:
            await session.rollback()
            raise
        # Session closes here automatically — context manager handles it
```
This is the wrong pattern for AI endpoints — it holds the connection open during LLM inference:
```python
# ❌ BAD: Session open the entire time, including 4-second LLM call
@router.post("/generate/{user_id}")
async def generate(user_id: int, db: AsyncSession = Depends(get_db)):
    user = await db.get(User, user_id)
    result = await llm_client.generate(user.prompt)  # 4 seconds — DB connection held!
    await save_result(db, result)
    return result
```
Step 3: Implement the Release-Before-LLM Pattern
Split your endpoint into three phases: read → release → call LLM → reacquire → write.
```python
# app/routers/generate.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import AsyncSessionLocal
from app.dependencies import get_db
from app.models import User, GenerationLog
from app.services.llm import call_llm  # your LLM wrapper

router = APIRouter()

@router.post("/generate/{user_id}")
async def generate_response(
    user_id: int,
    db: AsyncSession = Depends(get_db),
):
    # Phase 1: Read what you need, then close the session
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    prompt = user.prompt_template  # Read attributes while session is alive
    await db.close()  # Release connection back to pool NOW

    # Phase 2: LLM call — no DB connection held during this wait
    # This is where the 2–8 second wait happens
    llm_result = await call_llm(prompt)

    # Phase 3: Reacquire a connection to write results
    async with AsyncSessionLocal() as write_session:
        log = GenerationLog(
            user_id=user_id,
            result=llm_result.text,
            tokens_used=llm_result.usage.get("total_tokens", 0),
        )
        write_session.add(log)
        await write_session.commit()

    return {"result": llm_result.text, "tokens": llm_result.usage.get("total_tokens", 0)}
```
With this pattern, a connection is only held for ~5ms (read) + ~5ms (write), not for the full 4 seconds.
Step 4: Build a Robust LLM Service Wrapper
The LLM call needs timeout handling and retry logic. LLM APIs return 429s under load.
```python
# app/services/llm.py
import asyncio
import os
from dataclasses import dataclass

import httpx

ANTHROPIC_API_KEY = os.environ["ANTHROPIC_API_KEY"]  # fail loudly at import if unset

@dataclass
class LLMResult:
    text: str
    usage: dict

async def call_llm(
    prompt: str,
    model: str = "claude-sonnet-4-20250514",
    max_retries: int = 3,
    timeout: float = 30.0,
) -> LLMResult:
    # Exponential backoff handles rate limits (429) and transient 5xx errors
    for attempt in range(max_retries):
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.post(
                    "https://api.anthropic.com/v1/messages",
                    headers={
                        "x-api-key": ANTHROPIC_API_KEY,
                        "anthropic-version": "2023-06-01",
                        "content-type": "application/json",
                    },
                    json={
                        "model": model,
                        "max_tokens": 1024,
                        "messages": [{"role": "user", "content": prompt}],
                    },
                )
                response.raise_for_status()
                data = response.json()
                return LLMResult(
                    text=data["content"][0]["text"],
                    usage=data["usage"],
                )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429 and attempt < max_retries - 1:
                # 429 = rate limit; back off 2^attempt seconds
                await asyncio.sleep(2 ** attempt)
                continue
            raise
        except httpx.TimeoutException:
            if attempt < max_retries - 1:
                await asyncio.sleep(1)
                continue
            raise
    raise RuntimeError(f"LLM call failed after {max_retries} attempts")
```
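One refinement worth considering, which the wrapper above omits: add jitter to the backoff so that many workers rate-limited at the same instant don't all retry in lockstep. `backoff_delays` and its parameters are an illustrative sketch, not part of any library API:

```python
import random

def backoff_delays(max_retries: int = 3, base: float = 2.0, jitter: float = 0.5) -> list[float]:
    """Sleep schedule between attempts: base**attempt plus a random jitter,
    so simultaneously rate-limited workers spread out their retries."""
    return [base ** attempt + random.uniform(0, jitter) for attempt in range(max_retries - 1)]

print(backoff_delays())  # e.g. [1.31, 2.08] — two sleeps for three attempts
```

In the wrapper, you would replace `await asyncio.sleep(2 ** attempt)` with a sleep drawn from this schedule.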
Step 5: Configure Lifespan for Engine Disposal
FastAPI's lifespan context manager is the correct place to initialize and dispose the engine — not the older `@app.on_event("startup")` / `@app.on_event("shutdown")` hooks, which FastAPI has deprecated in favor of lifespan.
```python
# app/main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.database import engine, Base
from app.routers import generate

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create tables if they don't exist
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # Shutdown: dispose engine cleanly — closes all pooled connections
    await engine.dispose()

app = FastAPI(lifespan=lifespan)
app.include_router(generate.router, prefix="/api/v1")
```
Why `engine.dispose()` matters: Without it, you get asyncpg warnings about unclosed connections when the process exits. In Kubernetes, this causes slow pod shutdown and failed health checks.
Step 6: Add Background Tasks for Fire-and-Forget Logging
For non-critical logging (analytics, usage tracking), use FastAPI's `BackgroundTasks` so the response returns immediately:
```python
# app/routers/generate.py (updated)
import time

from fastapi import BackgroundTasks, HTTPException

from app.database import AsyncSessionLocal
from app.models import UsageLog

async def log_usage_background(user_id: int, tokens: int, latency_ms: int):
    """Runs after the response is sent — doesn't block the user."""
    async with AsyncSessionLocal() as session:
        log = UsageLog(user_id=user_id, tokens=tokens, latency_ms=latency_ms)
        session.add(log)
        await session.commit()

@router.post("/generate/{user_id}")
async def generate_response(
    user_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    start = time.monotonic()
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    prompt = user.prompt_template  # Read while the session is alive
    await db.close()

    llm_result = await call_llm(prompt)
    latency_ms = int((time.monotonic() - start) * 1000)

    # Schedule logging — doesn't delay the response
    background_tasks.add_task(
        log_usage_background,
        user_id=user_id,
        tokens=llm_result.usage.get("total_tokens", 0),
        latency_ms=latency_ms,
    )
    return {"result": llm_result.text}
```
Verification
Load test with httpx to confirm the pool holds under concurrent requests:
```bash
uv add httpx pytest pytest-asyncio
```
```python
# tests/test_concurrency.py
import asyncio

import httpx
import pytest

@pytest.mark.asyncio
async def test_concurrent_generate_requests():
    """50 concurrent requests should all complete without pool exhaustion."""
    async with httpx.AsyncClient(base_url="http://localhost:8000", timeout=60) as client:
        tasks = [
            client.post("/api/v1/generate/1")
            for _ in range(50)
        ]
        responses = await asyncio.gather(*tasks, return_exceptions=True)

    errors = [r for r in responses if isinstance(r, Exception)]
    timeouts = [r for r in responses if hasattr(r, "status_code") and r.status_code == 504]
    assert len(errors) == 0, f"Exceptions: {errors}"
    assert len(timeouts) == 0, f"Timeouts: {timeouts}"
```
Check pool stats during load:
```python
# Run this while the load test is active
from app.database import engine

pool = engine.pool
print(f"Pool size: {pool.size()}")          # Connections currently open
print(f"Checked out: {pool.checkedout()}")  # Connections in use
print(f"Overflow: {pool.overflow()}")       # Connections beyond pool_size
print(f"Checked in: {pool.checkedin()}")    # Connections idle and available
```
You should see: `checkedout` peaks at your actual DB usage (not LLM wait time), and never hits `pool_size + max_overflow`.
Production Pool Sizing Formula
For AI workloads, connection demand is driven by DB time, not total request time:
```
pool_size = (target_concurrent_requests × avg_db_time_ms) / avg_total_request_time_ms

# Example: 100 concurrent, 50ms DB work, 4000ms total
pool_size = (100 × 50) / 4000 = 1.25 → use 10 (with buffer)

max_overflow = pool_size × 2   # Burst headroom
pool_timeout = 5–10s           # Fail fast; don't queue for 30s
```
Set `pool_timeout` low. If requests queue at the pool, that's a signal to increase `pool_size`, not to let them wait longer.
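If you recompute this as traffic assumptions change, it can help to wrap the formula in a helper. `size_pool`, its `buffer_factor`, and the floor of 5 are illustrative choices (picked so the 1.25 raw estimate rounds up to the 10 used earlier), not an established rule:

```python
import math

def size_pool(
    concurrent_requests: int,
    avg_db_ms: float,
    avg_total_ms: float,
    buffer_factor: float = 8.0,  # assumption: pad the raw estimate heavily
    floor: int = 5,              # assumption: never go below the SQLAlchemy default
) -> dict:
    """Apply the DB-time-fraction formula, then pad with a safety buffer."""
    raw = concurrent_requests * avg_db_ms / avg_total_ms
    pool_size = max(floor, math.ceil(raw * buffer_factor))
    return {"raw_estimate": raw, "pool_size": pool_size, "max_overflow": pool_size * 2}

print(size_pool(100, avg_db_ms=50, avg_total_ms=4000))
# {'raw_estimate': 1.25, 'pool_size': 10, 'max_overflow': 20}
```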
What You Learned
- Default SQLAlchemy pool settings assume short-lived DB operations — AI endpoints violate this assumption
- The release-before-LLM pattern keeps connections held for only milliseconds, not seconds
- `expire_on_commit=False` is required in async SQLAlchemy to avoid `MissingGreenlet` errors
- `BackgroundTasks` is the right tool for non-blocking analytics logging
- Pool sizing for AI backends is calculated from the DB time fraction, not total latency
When NOT to use this pattern: If your LLM calls are very short (<200ms) and you're running a small instance with minimal concurrency, the added complexity isn't worth it. Use it when you're targeting >20 concurrent AI requests on a shared DB.
Tested on FastAPI 0.115, SQLAlchemy 2.0.36, asyncpg 0.30, Python 3.12, PostgreSQL 17