Your FastAPI endpoint times out after 30 seconds. Your LLM job takes 90. Every user hitting that endpoint gets a 504. Celery + Redis makes the job async — and your users stop rage-quitting.
You're not alone. Python is the #1 most-used language for 4 consecutive years (Stack Overflow 2025), and with 42% of new Python API projects using FastAPI (JetBrains Dev Ecosystem 2025), thousands of developers are hitting this exact wall. That sleek FastAPI endpoint that worked perfectly with your 7B parameter model crumbles when you upgrade to Llama 3.1 70B. The synchronous request-response cycle wasn't built for AI workloads that think in seconds, not milliseconds.
Let's fix this with a production-ready Celery and Redis stack that queues, tracks, and recovers long-running AI jobs. You'll get async processing, progress tracking, and resilience against failures — without rewriting your entire application.
Why Synchronous LLM Endpoints Fail Under Load (With Timeout Math)
Your cloud provider's 30-second timeout isn't arbitrary — it's a load balancer's mercy killing. When you block a worker thread for 90 seconds on an LLM inference, you're not just serving one slow request; you're crippling your entire application's capacity.
Do the math: a typical FastAPI deployment with 4 Uvicorn workers (under Gunicorn) has 4 worker processes, each able to serve one blocking request at a time. If each LLM request holds a worker for 90 seconds:
- Request 1 arrives: Worker 1 busy for 90s
- Request 2 arrives at 10s: Worker 2 busy for 90s
- Request 3 arrives at 20s: Worker 3 busy for 90s
- Request 4 arrives at 30s: Worker 4 busy for 90s
- Request 5 arrives at 31s: All workers busy → Queue or 503
Now imagine 10 concurrent users. Requests pile up faster than workers free, and your error rate climbs steeply. Even Python 3.11's 10–60% interpreter speedup (python.org benchmarks) won't save you — the 90 seconds is spent waiting on the GPU or a remote API, not executing Python.
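That ceiling is just arithmetic. A quick sketch (hypothetical helper, numbers from the scenario above):

```python
def max_sustainable_rps(workers: int, seconds_per_request: float) -> float:
    """Blocking workers cap throughput at workers / latency (Little's law)."""
    return workers / seconds_per_request

# 4 workers x 90 s per LLM call: under ~0.05 requests/second sustainable,
# i.e. roughly 160 requests/hour, before requests start queueing or 503ing
ceiling = max_sustainable_rps(4, 90.0)
print(f"{ceiling:.3f} req/s, {ceiling * 3600:.0f} req/hour")
```

Adding more workers only moves the ceiling linearly; it doesn't fix the architecture.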
The fix is architectural: decouple request acceptance from job execution. Enter Celery.
Celery + Redis Architecture for AI Task Queues
Celery is a distributed task queue that uses Redis (or RabbitMQ) as a message broker. Here's how it transforms your AI workflow:
[Client] → [FastAPI Endpoint] → [Redis Queue] → [Celery Worker] → [LLM/GPU]
- Client: submits the job, receives a task ID, polls for status
- FastAPI endpoint: validates the request and pushes the job to the queue
- Redis: stores the queued task and its metadata
- Celery worker: pulls jobs and processes them asynchronously
- LLM/GPU: the long-running computation itself
Redis serves dual purpose: message broker (task queue) and result backend (task status/results). For AI workloads, this separation is critical — your web tier stays responsive while workers grind through compute.
First, set up your environment with modern Python tooling:
uv init ai-queue-project
cd ai-queue-project
uv add "celery[redis]" fastapi "pydantic>=2.0" pydantic-settings redis uvicorn sse-starlette
# Install dev tools
uv add --dev ruff mypy pytest
Now create your Celery app configuration:
# celery_app.py
from celery import Celery
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    redis_url: str = "redis://localhost:6379/0"
    model_cache_dir: str = "./model_cache"
settings = Settings()
# Create Celery instance
celery_app = Celery(
"ai_worker",
broker=settings.redis_url,
backend=settings.redis_url,
include=["ai_tasks"]
)
# Configure Celery
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True, # Critical for progress tracking
task_time_limit=300, # 5 minute hard limit per task
worker_max_tasks_per_child=100, # Prevent memory leaks
worker_prefetch_multiplier=1, # Fair scheduling for long tasks
)
FastAPI Integration: Submitting Jobs and Returning Task IDs
Your FastAPI endpoint becomes a job submission portal, not an execution engine. Here's the complete integration:
# main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from celery.result import AsyncResult
from celery_app import celery_app, settings
from ai_tasks import process_llm_request
app = FastAPI(title="Async LLM Queue")
class LLMRequest(BaseModel):
prompt: str = Field(..., min_length=1, max_length=4000)
model: str = Field("llama3.1-70b")
temperature: float = Field(0.7, ge=0.0, le=2.0)
max_tokens: int = Field(1024, ge=1, le=4096)
class TaskResponse(BaseModel):
task_id: str
status_url: str
@app.post("/generate", response_model=TaskResponse)
async def submit_llm_job(request: LLMRequest) -> TaskResponse:
"""
Submit an LLM generation job to the queue.
Returns immediately with a task ID for status checking.
"""
# Submit task to Celery
task = process_llm_request.delay(
prompt=request.prompt,
model=request.model,
temperature=request.temperature,
max_tokens=request.max_tokens
)
return TaskResponse(
task_id=task.id,
status_url=f"/tasks/{task.id}/status"
)
@app.get("/tasks/{task_id}/status")
async def get_task_status(task_id: str):
"""Check the status and result of a submitted task."""
task_result = AsyncResult(task_id, app=celery_app)
response = {
"task_id": task_id,
"status": task_result.status,
"result": None,
"error": None,
}
if task_result.status == "SUCCESS":
response["result"] = task_result.result
elif task_result.status == "FAILURE":
response["error"] = str(task_result.result)
# task_result.traceback for full trace in debug
return response
The key insight: process_llm_request.delay() returns immediately with a task ID. Your endpoint response time drops from 90 seconds to <100ms.
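On the client side, the pattern is submit-then-poll. A sketch with the HTTP call abstracted out (hypothetical helper; in real use, fetch_status would wrap requests.get on the status_url returned above):

```python
import time
from typing import Any, Callable, Dict

def poll_until_done(fetch_status: Callable[[], Dict[str, Any]],
                    interval: float = 1.0, timeout: float = 300.0) -> Dict[str, Any]:
    """Poll /tasks/{task_id}/status until the task reaches a final state."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status()
        if status["status"] in ("SUCCESS", "FAILURE"):
            return status
        time.sleep(interval)
    raise TimeoutError("task did not finish within timeout")

# Simulated status responses standing in for real HTTP calls
responses = iter([
    {"status": "PENDING"},
    {"status": "STARTED"},
    {"status": "SUCCESS", "result": {"text": "..."}},
])
final = poll_until_done(lambda: next(responses), interval=0.0)
print(final["status"])  # SUCCESS
```

Injecting the fetcher keeps the loop testable without a running server.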
Progress Tracking: Updating Job State in Redis During LLM Streaming
LLM inference isn't binary — users want to see tokens streaming. Celery's default task states (PENDING, STARTED, SUCCESS, FAILURE) are too coarse. We need custom progress tracking.
Here's how to implement real-time progress updates using Redis directly:
# ai_tasks.py
import json
import time
from typing import Dict, Any
import redis
from celery_app import celery_app, settings
# Connect to Redis for progress updates
redis_client = redis.from_url(settings.redis_url)
@celery_app.task(bind=True)
def process_llm_request(self, prompt: str, model: str,
temperature: float, max_tokens: int) -> Dict[str, Any]:
"""
Long-running LLM processing task with progress tracking.
"""
task_id = self.request.id
# Update task metadata in Redis
    def update_progress(step: str, progress: float, metadata: Dict | None = None):
progress_data = {
"task_id": task_id,
"step": step,
"progress": progress,
"timestamp": time.time(),
"metadata": metadata or {}
}
# Store in Redis with 10 minute expiry
redis_client.setex(
f"task:progress:{task_id}",
600, # 10 minutes
json.dumps(progress_data)
)
try:
# Step 1: Load model (0-30% progress)
update_progress("loading_model", 0.1)
# Simulated model loading
time.sleep(2)
update_progress("model_loaded", 0.3, {"model": model})
# Step 2: Generate tokens with streaming (30-100% progress)
# In reality, you'd hook into your LLM's streaming callback
generated_text = ""
total_tokens = 100 # Example total
for i in range(total_tokens):
# Simulate token generation
time.sleep(0.05)
generated_text += f" token{i}"
# Update progress every 10 tokens
if i % 10 == 0:
progress = 0.3 + (0.7 * (i / total_tokens))
update_progress(
"generating",
progress,
{"tokens_generated": i, "text_so_far": generated_text}
)
update_progress("complete", 1.0, {"total_tokens": total_tokens})
return {
"text": generated_text,
"model": model,
"tokens_generated": total_tokens
}
except Exception as e:
update_progress("failed", 0.0, {"error": str(e)})
raise # Celery will handle retry logic
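The 30-to-100% mapping inside the generation loop is easy to get wrong; pulling it into a helper keeps the weighting in one place (a sketch mirroring the numbers above):

```python
def overall_progress(tokens_done: int, total_tokens: int,
                     generation_start: float = 0.3) -> float:
    """Map token progress into the window after model loading (30%-100%)."""
    return generation_start + (1.0 - generation_start) * (tokens_done / total_tokens)

print(overall_progress(0, 100))              # 0.3
print(round(overall_progress(50, 100), 2))   # 0.65
```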
Now add a progress endpoint to FastAPI. Note that main.py also needs json and a Redis client for this route:
# main.py (additions)
import json

import redis

redis_client = redis.from_url(settings.redis_url)

@app.get("/tasks/{task_id}/progress")
async def get_task_progress(task_id: str):
    """Get real-time progress updates for a task."""
    progress_data = redis_client.get(f"task:progress:{task_id}")
if not progress_data:
raise HTTPException(
status_code=404,
detail="Task progress not found or expired"
)
return json.loads(progress_data)
Streaming Results to the Client via WebSocket or SSE
Polling /progress works, but Server-Sent Events (SSE) provide better UX. Here's an SSE implementation:
# streaming.py
import asyncio
import json

import redis.asyncio as aioredis
from celery.result import AsyncResult
from sse_starlette.sse import EventSourceResponse

from celery_app import celery_app, settings
from main import app
async def stream_task_progress(task_id: str):
"""SSE stream for task progress updates."""
redis_client = aioredis.from_url(settings.redis_url)
last_progress = 0
while True:
# Check task completion first
task_result = AsyncResult(task_id, app=celery_app)
if task_result.ready():
if task_result.status == "SUCCESS":
yield {
"event": "complete",
"data": json.dumps(task_result.result)
}
else:
yield {
"event": "error",
"data": json.dumps({"error": str(task_result.result)})
}
break
# Check for progress updates
progress_data = await redis_client.get(f"task:progress:{task_id}")
if progress_data:
data = json.loads(progress_data)
if data["progress"] > last_progress:
yield {
"event": "progress",
"data": json.dumps(data)
}
last_progress = data["progress"]
await asyncio.sleep(0.5) # Poll interval
    await redis_client.aclose()  # release the connection once the stream ends
@app.get("/tasks/{task_id}/stream")
async def stream_progress(task_id: str):
"""Server-Sent Events stream for real-time progress."""
return EventSourceResponse(stream_task_progress(task_id))
Client-side JavaScript can now subscribe to real-time updates:
// Client-side example
const eventSource = new EventSource(`/tasks/${taskId}/stream`);
// Named SSE events ("progress", "complete", "error") don't trigger onmessage —
// subscribe to each event type with addEventListener
eventSource.addEventListener("progress", (event) => {
    updateUI(JSON.parse(event.data)); // Your update function
});
eventSource.addEventListener("complete", (event) => {
    showResult(JSON.parse(event.data));
    eventSource.close();
});
eventSource.onerror = () => {
    eventSource.close();
    showError("Connection lost");
};
Retry Logic: Exponential Backoff for Rate-Limited LLM Calls
When calling external LLM APIs (OpenAI, Anthropic, etc.), rate limits are inevitable. Celery's retry mechanism with exponential backoff saves you:
@celery_app.task(bind=True, max_retries=5)
def call_external_llm_api(self, prompt: str, api_key: str) -> str:
"""
Task with automatic retry for rate limits and transient failures.
"""
import requests
from requests.exceptions import RequestException
try:
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers={"Authorization": f"Bearer {api_key}"},
json={"model": "gpt-4", "messages": [{"role": "user", "content": prompt}]},
timeout=30
)
if response.status_code == 429: # Rate limited
# Exponential backoff: 4s, 16s, 64s, 256s, 1024s
retry_delay = 4 ** (self.request.retries + 1)
raise self.retry(
countdown=retry_delay,
exc=Exception(f"Rate limited, retrying in {retry_delay}s")
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
except RequestException as exc:
# Network errors get retried with exponential backoff
raise self.retry(
            countdown=2 ** (self.request.retries + 1),  # 2, 4, 8, 16, 32
exc=exc
)
Common Error Fix: If you see ModuleNotFoundError: No module named 'requests', check your venv activation and run uv add requests in the correct environment.
Configure Celery-wide retry policies in your settings:
celery_app.conf.update(
task_acks_late=True, # Don't ack until task completes
task_reject_on_worker_lost=True, # Retry if worker dies
task_compression="gzip", # Compress large prompts
broker_transport_options={
"visibility_timeout": 3600, # 1 hour for long tasks
"fanout_prefix": True,
},
)
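One interaction to watch: with the Redis broker and task_acks_late, a task still unacked when visibility_timeout expires gets redelivered to another worker and runs twice. The timeout must outlive your hard task limit — a sketch of the sanity check (hypothetical helper):

```python
def visibility_timeout_is_safe(task_time_limit: int, visibility_timeout: int) -> bool:
    """Redis redelivers unacked tasks after visibility_timeout, so it
    must exceed the longest a task is allowed to run."""
    return visibility_timeout > task_time_limit

print(visibility_timeout_is_safe(300, 3600))  # True — the settings above are safe
```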
Monitoring: Flower Dashboard for Celery Worker Health
You can't manage what you can't measure. Flower is Celery's real-time monitoring tool that shows:
- Active, scheduled, and reserved tasks
- Worker status and resource usage
- Task history and statistics
- Rate of task processing
Install and run it alongside your stack:
uv add flower
uv run celery -A celery_app flower --port=5555
Access it at http://localhost:5555 to see your task queue in action. For production, add authentication and run behind your web server.
Performance Comparison: Task Queue vs Synchronous
| Metric | Synchronous Endpoint | Celery + Redis Queue |
|---|---|---|
| Request latency | 90s (full processing) | <100ms (job submission) |
| Error rate under load | Exponential growth | Near-zero (queued) |
| Resource utilization | Workers blocked | Workers efficient |
| User experience | Timeout errors | Progress indicators |
| Failure recovery | Manual restart | Automatic retry |
| Scalability | Vertical scaling only | Horizontal scaling |
The numbers speak for themselves. And remember: FastAPI's raw throughput edge over Flask in benchmarks is irrelevant if you block every worker with 90-second LLM calls.
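Queued beats rejected, but users still wait. A rough estimate you can surface next to the progress bar (hypothetical helper, assuming FIFO dispatch):

```python
def expected_wait_seconds(jobs_ahead: int, avg_task_seconds: float, workers: int) -> float:
    """Rough FIFO wait estimate: jobs ahead of you, spread across workers."""
    return (jobs_ahead / workers) * avg_task_seconds

print(expected_wait_seconds(8, 90.0, 4))  # 180.0 — about 3 minutes
```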
Next Steps: Production Deployment Patterns
You've got the basics working locally. Now let's talk production:
1. Multiple queues for priority — separate urgent vs batch jobs:

celery_app.conf.task_routes = {
    "ai_tasks.high_priority": {"queue": "high_priority"},
    "ai_tasks.batch": {"queue": "batch"},
}

2. Redis persistence — enable AOF persistence so queued tasks survive restarts:

# redis.conf
appendonly yes
appendfsync everysec

3. Health checks — implement /health endpoints for all services:

@app.get("/health")
async def health_check():
    # Check Redis connection before reporting healthy
    try:
        redis_client.ping()
        return {"status": "healthy", "services": ["redis", "celery"]}
    except redis.ConnectionError:
        raise HTTPException(status_code=503, detail="Redis unavailable")

4. Memory management — watch for MemoryError when tasks load large DataFrames; fix by reading in chunks with chunksize or switching to Polars for better memory efficiency.

5. Deployment — use Docker Compose or Kubernetes with resource limits:

# docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
  worker:
    build: .
    command: celery -A celery_app worker --loglevel=info
    deploy:
      resources:
        limits:
          memory: 8G
Final Error Fix: If you encounter TypeError: 'NoneType' object is not subscriptable when accessing task results, add a None guard: if result and 'key' in result: value = result['key'].
Your AI application is now production-ready. Users get instant feedback, jobs queue gracefully under load, and you can sleep through the night knowing failed tasks will retry automatically. The queue handles the chaos so you don't have to.
Remember: Async isn't just about performance — it's about user experience. A progress bar beats a loading spinner, and a queued job beats a 504 error every time. Now go deploy that Llama 3.1 70B model without taking down your entire API.