FastAPI + SQLAlchemy Async: Production Patterns for High-Concurrency AI Endpoints

Configure SQLAlchemy async engine, tune connection pools, and structure FastAPI AI endpoints to handle concurrent LLM calls without exhausting DB connections.

Problem: Your AI Endpoint Stalls Under Load

You wire up a FastAPI endpoint that calls an LLM, logs the result to Postgres, and it works fine in testing. Under real traffic — 50 concurrent requests hitting an endpoint that waits 2–8 seconds for an LLM response — you start seeing:

sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached,
connection timed out, timeout 30

Or worse: silent hangs where requests queue behind each other because your async code is blocking the event loop.
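That second failure mode is easy to reproduce in miniature. The sketch below uses time.sleep as a stand-in for any synchronous call (a sync DB driver, a blocking HTTP client) inside an async handler; the numbers are illustrative and no web framework is involved:

```python
import asyncio
import time

async def blocking_handler():
    time.sleep(0.2)  # synchronous call: freezes the entire event loop

async def async_handler():
    await asyncio.sleep(0.2)  # yields: other requests keep progressing

async def measure(handler) -> float:
    """Run 5 'requests' concurrently and report wall-clock time."""
    start = time.monotonic()
    await asyncio.gather(*(handler() for _ in range(5)))
    return time.monotonic() - start

async def main():
    t_block = await measure(blocking_handler)  # ~1.0 s: fully serialized
    t_async = await measure(async_handler)     # ~0.2 s: truly concurrent
    print(f"blocking: {t_block:.2f}s  async: {t_async:.2f}s")

asyncio.run(main())
```

Five "concurrent" requests take five times as long when each one blocks the loop, which is exactly what a sync driver does inside an async endpoint.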

You'll learn:

  • How to configure AsyncEngine and AsyncSession correctly for AI workloads
  • Why default pool settings fail when LLM calls hold connections open
  • How to structure dependency injection so sessions don't leak
  • How to tune pool_size, max_overflow, and pool_pre_ping for production

Time: 25 min | Difficulty: Advanced


Why Default SQLAlchemy Settings Break With AI Endpoints

Standard web endpoints spend ~5ms in the database. AI endpoints spend 2–8 seconds waiting for LLM responses — and if your session stays open across that wait, you're holding a DB connection for the full duration.

Default SQLAlchemy pool:

Setting        Default      Problem under AI load
pool_size      5            Exhausted by 5 concurrent requests
max_overflow   10           Max 15 connections total
pool_timeout   30s          Requests queue, then time out
pool_recycle   -1 (never)   Stale connections after idle periods

The fix is a combination of: acquiring DB sessions only when needed, releasing them before LLM calls, and sizing the pool to match your actual concurrency target.
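A back-of-the-envelope check makes the failure concrete (the numbers are illustrative):

```python
# Default pool capacity
pool_size, max_overflow = 5, 10
max_connections = pool_size + max_overflow      # 15 total

concurrent = 50

# Session held across the LLM call: every in-flight request pins a
# connection, so demand equals concurrency.
demand_held = concurrent                        # 50 > 15 -> TimeoutError

# Connection released before the LLM call: it is pinned only for the
# DB fraction of the request (e.g. 50 ms of a 4000 ms request).
db_ms, total_ms = 50, 4000
demand_released = concurrent * db_ms / total_ms # 0.625, trivial load

print(demand_held > max_connections)       # True
print(demand_released > max_connections)   # False
```

Holding the session across the LLM wait multiplies connection demand by roughly total_time / db_time, which is why the same traffic that exhausts the default pool is trivial once connections are released early.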


Solution

Step 1: Set Up the Async Engine With Correct Pool Config

Install dependencies first:

# Use uv for fast installs (Python 3.12+)
uv add fastapi sqlalchemy asyncpg pydantic-settings uvicorn

Create app/database.py:

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase

# asyncpg is the async PostgreSQL driver — DO NOT use psycopg2 here
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"

engine = create_async_engine(
    DATABASE_URL,
    # Pool sizing for AI endpoints: target concurrency × average DB time / average total time
    # Example: 100 concurrent reqs × 50ms DB / 4000ms total = ~1.25 → use 10 as safe buffer
    pool_size=10,
    max_overflow=20,        # Allows burst to 30 total connections
    pool_timeout=10,        # Fail fast — don't let requests queue for 30s
    pool_pre_ping=True,     # Checks connection health before use (catches stale connections)
    pool_recycle=3600,      # Recycle connections every hour
    echo=False,             # Set True only during local debugging — logs every SQL statement
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Prevents lazy-load errors after commit in async context
)

class Base(DeclarativeBase):
    pass

Why expire_on_commit=False: By default, commit expires every instance attribute, and the next attribute access transparently re-queries the database. In sync SQLAlchemy that refresh just works; in async, the implicit IO raises MissingGreenlet. Setting expire_on_commit=False keeps attributes available after commit without a new query.


Step 2: Structure the Session Dependency to Release Early

The critical pattern: close the DB session before the LLM call, not after.

# app/dependencies.py
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import AsyncSessionLocal

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
        except Exception:
            await session.rollback()
            raise
        # Session closes here automatically — context manager handles it

The dependency itself is fine, but using it naively is the wrong pattern for AI endpoints: the session (and its connection) stays checked out for the entire request, including LLM inference:

# ❌ BAD: Session open the entire time, including 4-second LLM call
@router.post("/generate/{user_id}")
async def generate(user_id: int, db: AsyncSession = Depends(get_db)):
    user = await db.get(User, user_id)
    result = await llm_client.generate(user.prompt)  # 4 seconds — DB connection held!
    await save_result(db, result)
    return result

Step 3: Implement the Release-Before-LLM Pattern

Split your endpoint into three phases: read (then release the connection) → call the LLM → reacquire and write.

# app/routers/generate.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.dependencies import get_db
from app.models import User, GenerationLog
from app.services.llm import call_llm  # your LLM wrapper

router = APIRouter()

@router.post("/generate/{user_id}")
async def generate_response(
    user_id: int,
    db: AsyncSession = Depends(get_db),
):
    # Phase 1: Read what you need, then close the session
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    prompt = user.prompt_template  # Read attributes while session is alive
    await db.close()               # Release connection back to pool NOW

    # Phase 2: LLM call — no DB connection held during this wait
    # This is where the 2–8 second wait happens
    llm_result = await call_llm(prompt)

    # Phase 3: Reacquire a connection to write results
    async with AsyncSessionLocal() as write_session:
        log = GenerationLog(
            user_id=user_id,
            result=llm_result.text,
            tokens_used=llm_result.usage.get("total_tokens", 0),
        )
        write_session.add(log)
        await write_session.commit()

    return {"result": llm_result.text, "tokens": llm_result.usage.get("total_tokens", 0)}

With this pattern, a connection is only held for ~5ms (read) + ~5ms (write), not for the full 4 seconds.
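You can simulate the saving without a database by modeling the pool as a semaphore (timings are hypothetical, and Semaphore._value is a private attribute used here only for instrumentation):

```python
import asyncio

TOTAL = 30  # pool_size=10 + max_overflow=20, as configured in Step 1
pool = asyncio.Semaphore(TOTAL)
peak_in_use = 0

async def db_work(seconds: float):
    """Acquire a 'connection', do brief work, release."""
    global peak_in_use
    async with pool:
        peak_in_use = max(peak_in_use, TOTAL - pool._value)
        await asyncio.sleep(seconds)

async def request(i: int):
    await asyncio.sleep(i * 0.01)   # staggered arrivals
    await db_work(0.005)            # phase 1: read (~5 ms connection hold)
    await asyncio.sleep(0.1)        # phase 2: LLM call, no connection held
    await db_work(0.005)            # phase 3: write (~5 ms connection hold)

async def main():
    await asyncio.gather(*(request(i) for i in range(50)))
    print(f"peak connections in use: {peak_in_use}")

asyncio.run(main())
```

If you instead hold the "connection" across the 100 ms LLM sleep, the peak climbs toward the full 50, which is the queueing behavior the release-early pattern avoids.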


Step 4: Build a Robust LLM Service Wrapper

The LLM call needs timeout handling and retry logic. LLM APIs return 429s under load.

# app/services/llm.py
import asyncio
import os
from dataclasses import dataclass

import httpx

ANTHROPIC_API_KEY = os.environ["ANTHROPIC_API_KEY"]

@dataclass
class LLMResult:
    text: str
    usage: dict

async def call_llm(
    prompt: str,
    model: str = "claude-sonnet-4-20250514",
    max_retries: int = 3,
    timeout: float = 30.0,
) -> LLMResult:
    # One client reused across retries (keeps its connection pool warm);
    # exponential backoff handles rate limits (429) and transient errors
    async with httpx.AsyncClient(timeout=timeout) as client:
        for attempt in range(max_retries):
            try:
                response = await client.post(
                    "https://api.anthropic.com/v1/messages",
                    headers={
                        "x-api-key": ANTHROPIC_API_KEY,
                        "anthropic-version": "2023-06-01",
                        "content-type": "application/json",
                    },
                    json={
                        "model": model,
                        "max_tokens": 1024,
                        "messages": [{"role": "user", "content": prompt}],
                    },
                )
                response.raise_for_status()
                data = response.json()
                return LLMResult(
                    text=data["content"][0]["text"],
                    usage=data["usage"],
                )

            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429 and attempt < max_retries - 1:
                    # 429 = rate limit; back off 2^attempt seconds
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise
            except httpx.TimeoutException:
                if attempt < max_retries - 1:
                    await asyncio.sleep(1)
                    continue
                raise

    raise RuntimeError(f"LLM call failed after {max_retries} attempts")

Step 5: Configure Lifespan for Engine Disposal

FastAPI's lifespan context manager is the correct place to create and dispose the engine; the older @app.on_event("startup") and @app.on_event("shutdown") handlers are deprecated.

# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.database import engine, Base
from app.routers import generate

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create tables if they don't exist
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # Shutdown: dispose engine cleanly — closes all pooled connections
    await engine.dispose()

app = FastAPI(lifespan=lifespan)
app.include_router(generate.router, prefix="/api/v1")

Why engine.dispose() matters: Without it, you get asyncpg warnings about unclosed connections when the process exits. In Kubernetes, this causes slow pod shutdown and failed health checks.


Step 6: Add Background Tasks for Fire-and-Forget Logging

For non-critical logging (analytics, usage tracking), use FastAPI's BackgroundTasks so the response returns immediately:

# app/routers/generate.py (updated)
from fastapi import BackgroundTasks

from app.database import AsyncSessionLocal
from app.models import UsageLog

async def log_usage_background(user_id: int, tokens: int, latency_ms: int):
    """Runs after response is sent — doesn't block the user."""
    async with AsyncSessionLocal() as session:
        log = UsageLog(user_id=user_id, tokens=tokens, latency_ms=latency_ms)
        session.add(log)
        await session.commit()

@router.post("/generate/{user_id}")
async def generate_response(
    user_id: int,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    import time
    start = time.monotonic()

    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    prompt = user.prompt_template  # read while the session is alive
    await db.close()

    llm_result = await call_llm(prompt)

    latency_ms = int((time.monotonic() - start) * 1000)

    # Schedule logging — doesn't delay the response
    background_tasks.add_task(
        log_usage_background,
        user_id=user_id,
        tokens=llm_result.usage.get("total_tokens", 0),
        latency_ms=latency_ms,
    )

    return {"result": llm_result.text}

Verification

Load test with httpx to confirm the pool holds under concurrent requests:

uv add httpx pytest pytest-asyncio

# tests/test_concurrency.py
import asyncio
import httpx
import pytest

@pytest.mark.asyncio
async def test_concurrent_generate_requests():
    """50 concurrent requests should all complete without pool exhaustion."""
    async with httpx.AsyncClient(base_url="http://localhost:8000", timeout=60) as client:
        tasks = [
            client.post("/api/v1/generate/1")
            for _ in range(50)
        ]
        responses = await asyncio.gather(*tasks, return_exceptions=True)

    errors = [r for r in responses if isinstance(r, Exception)]
    timeouts = [r for r in responses if hasattr(r, "status_code") and r.status_code == 504]

    assert len(errors) == 0, f"Exceptions: {errors}"
    assert len(timeouts) == 0, f"Timeouts: {timeouts}"

Check pool stats during load:

# Run this while the load test is active
from app.database import engine

pool = engine.pool
print(f"Pool size:      {pool.size()}")        # Configured pool_size
print(f"Checked out:    {pool.checkedout()}")  # Connections currently in use
print(f"Overflow:       {pool.overflow()}")    # Connections open beyond pool_size
print(f"Checked in:     {pool.checkedin()}")   # Idle connections available in the pool

You should see: checkedout peaks at your actual DB usage (not LLM wait time), and never hits pool_size + max_overflow.


Production Pool Sizing Formula

For AI workloads, connection demand is driven by DB time, not total request time:

pool_size = (target_concurrent_requests × avg_db_time_ms) / avg_total_request_time_ms

# Example: 100 concurrent, 50ms DB work, 4000ms total
pool_size = (100 × 50) / 4000 = 1.25 → use 10 (with buffer)
max_overflow = pool_size × 2    # Burst headroom
pool_timeout = 5–10s            # Fail fast; don't queue for 30s

Set pool_timeout low. If requests queue at the pool, that's a signal to increase pool_size, not to let them wait longer.
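The rule of thumb can be packaged as a small helper (the 4x buffer and the floor of 10 are the heuristics from this section, not hard rules):

```python
import math

def recommend_pool(concurrent: int, db_ms: float, total_ms: float) -> dict:
    """Suggest pool settings from the DB-time fraction of each request."""
    raw = concurrent * db_ms / total_ms         # expected concurrent DB ops
    pool_size = max(10, math.ceil(raw * 4))     # generous buffer over the estimate
    return {
        "pool_size": pool_size,
        "max_overflow": pool_size * 2,  # burst headroom
        "pool_timeout": 10,             # fail fast instead of queueing
    }

# 100 concurrent requests, 50 ms DB work, 4000 ms total request time
print(recommend_pool(100, 50, 4000))
# {'pool_size': 10, 'max_overflow': 20, 'pool_timeout': 10}
```

Plugging in the example from this section reproduces the Step 1 configuration: raw demand of 1.25 connections, buffered up to a pool of 10 with overflow of 20.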


What You Learned

  • Default SQLAlchemy pool settings assume short-lived DB operations — AI endpoints violate this assumption
  • The release-before-LLM pattern keeps connections held for only milliseconds, not seconds
  • expire_on_commit=False is required in async SQLAlchemy to avoid MissingGreenlet errors
  • BackgroundTasks is the right tool for non-blocking analytics logging
  • Pool sizing for AI backends is calculated from DB time fraction, not total latency

When NOT to use this pattern: If your LLM calls are very short (<200ms) and you're running a small instance with minimal concurrency, the added complexity isn't worth it. Use it when you're targeting >20 concurrent AI requests on a shared DB.

Tested on FastAPI 0.115, SQLAlchemy 2.0.36, asyncpg 0.30, Python 3.12, PostgreSQL 17