Handling Long-Running AI Jobs with Redis and Celery: Queue, Track, and Recover

Build a production-grade async AI task queue — Celery workers for LLM jobs, Redis for state tracking, progress streaming via WebSocket, and automatic retry on failure.

Your FastAPI endpoint times out after 30 seconds. Your LLM job takes 90. Every user hitting that endpoint gets a 504. Celery + Redis makes the job async — and your users stop rage-quitting.

You're not alone. Python sits at or near the top of developer-survey language rankings year after year (Stack Overflow), and FastAPI is among the fastest-growing Python web frameworks (JetBrains Developer Ecosystem survey) — which means thousands of developers are hitting this exact wall. That sleek FastAPI endpoint that worked perfectly with your 7B-parameter model crumbles when you upgrade to Llama 3.1 70B. The synchronous request-response cycle wasn't built for AI workloads that think in seconds, not milliseconds.

Let's fix this with a production-ready Celery and Redis stack that queues, tracks, and recovers long-running AI jobs. You'll get async processing, progress tracking, and resilience against failures — without rewriting your entire application.

Why Synchronous LLM Endpoints Fail Under Load (With Timeout Math)

Your cloud provider's 30-second timeout isn't arbitrary — it's a load balancer's mercy killing. When you block a worker thread for 90 seconds on an LLM inference, you're not just serving one slow request; you're crippling your entire application's capacity.

Do the math: A typical FastAPI deployment with 4 workers (Gunicorn/Uvicorn) has 4 request threads. If each LLM request takes 90 seconds:

  • Request 1 arrives: Worker 1 busy for 90s
  • Request 2 arrives at 10s: Worker 2 busy for 90s
  • Request 3 arrives at 20s: Worker 3 busy for 90s
  • Request 4 arrives at 30s: Worker 4 busy for 90s
  • Request 5 arrives at 31s: All workers busy → Queue or 503

Now imagine 10 concurrent users. Your error rate climbs sharply. Even CPython's recent interpreter speedups (the 3.11 release notes cite 10–60% on typical workloads) won't save you — the worker isn't computing, it's blocked waiting on a GPU or a remote API.
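That ceiling is easy to quantify. With the numbers above — 4 workers, 90 seconds per request — the blocking pool sustains less than 0.05 requests per second:

```python
# Back-of-envelope capacity for a blocking worker pool
workers = 4
seconds_per_request = 90

max_throughput = workers / seconds_per_request  # sustained requests/second
print(f"{max_throughput:.3f} req/s")  # any higher arrival rate queues or 503s
```

At 10 concurrent users making even one request a minute each, arrivals (~0.17 req/s) outpace capacity (~0.044 req/s) by roughly 4x, and the backlog only grows.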

The fix is architectural: decouple request acceptance from job execution. Enter Celery.

Celery + Redis Architecture for AI Task Queues

Celery is a distributed task queue that uses Redis (or RabbitMQ) as a message broker. Here's how it transforms your AI workflow:

[Client] → [FastAPI Endpoint] → [Redis Queue] → [Celery Worker] → [LLM/GPU]
    ↑              ↑                  ↑               ↑              ↑
 Gets task ID   Submits job      Stores task      Processes     Long-running
 polls status   to queue         metadata         job async     computation

Redis serves dual purpose: message broker (task queue) and result backend (task status/results). For AI workloads, this separation is critical — your web tier stays responsive while workers grind through compute.

First, set up your environment with modern Python tooling:


uv init ai-queue-project
cd ai-queue-project
uv add "celery[redis]" fastapi "pydantic>=2.0" pydantic-settings redis uvicorn sse-starlette

# Install dev tools
uv add --dev ruff mypy pytest

Now create your Celery app configuration:

# celery_app.py
from celery import Celery
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # Pydantic v2 style: model_config replaces the old nested `class Config`
    model_config = SettingsConfigDict(env_file=".env")

    redis_url: str = "redis://localhost:6379/0"
    model_cache_dir: str = "./model_cache"

settings = Settings()

# Create Celery instance
celery_app = Celery(
    "ai_worker",
    broker=settings.redis_url,
    backend=settings.redis_url,
    include=["ai_tasks"]
)

# Configure Celery
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,  # Critical for progress tracking
    task_time_limit=300,  # 5 minute hard limit per task
    worker_max_tasks_per_child=100,  # Prevent memory leaks
    worker_prefetch_multiplier=1,  # Fair scheduling for long tasks
)
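One thing worth making explicit: nothing in this setup executes until a Celery worker process is running alongside the API. Assuming the file layout used here (celery_app.py and ai_tasks.py at the project root), a local worker starts with:

```shell
# Start a Celery worker that consumes from the default queue
celery -A celery_app worker --loglevel=info --concurrency=2
```

Keep concurrency low for GPU-bound tasks — two 70B inferences rarely fit on one card.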

FastAPI Integration: Submitting Jobs and Returning Task IDs

Your FastAPI endpoint becomes a job submission portal, not an execution engine. Here's the complete integration:

# main.py
import json

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from celery.result import AsyncResult
import redis

from celery_app import celery_app, settings
from ai_tasks import process_llm_request

app = FastAPI(title="Async LLM Queue")

# Redis client for the progress endpoint below
redis_client = redis.from_url(settings.redis_url)

class LLMRequest(BaseModel):
    prompt: str = Field(..., min_length=1, max_length=4000)
    model: str = Field("llama3.1-70b")
    temperature: float = Field(0.7, ge=0.0, le=2.0)
    max_tokens: int = Field(1024, ge=1, le=4096)

class TaskResponse(BaseModel):
    task_id: str
    status_url: str

@app.post("/generate", response_model=TaskResponse)
async def submit_llm_job(request: LLMRequest) -> TaskResponse:
    """
    Submit an LLM generation job to the queue.
    Returns immediately with a task ID for status checking.
    """
    # Submit task to Celery
    task = process_llm_request.delay(
        prompt=request.prompt,
        model=request.model,
        temperature=request.temperature,
        max_tokens=request.max_tokens
    )
    
    return TaskResponse(
        task_id=task.id,
        status_url=f"/tasks/{task.id}/status"
    )

@app.get("/tasks/{task_id}/status")
async def get_task_status(task_id: str):
    """Check the status and result of a submitted task."""
    task_result = AsyncResult(task_id, app=celery_app)
    
    response = {
        "task_id": task_id,
        "status": task_result.status,
        "result": None,
        "error": None,
    }
    
    if task_result.status == "SUCCESS":
        response["result"] = task_result.result
    elif task_result.status == "FAILURE":
        response["error"] = str(task_result.result)
        # task_result.traceback for full trace in debug
        
    return response

The key insight: process_llm_request.delay() returns immediately with a task ID. Your endpoint response time drops from 90 seconds to <100ms.
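On the client side, the natural counterpart is a poll loop against the status URL. Here's a small transport-agnostic polling helper — the function name and the httpx usage in the comment are illustrative, not part of the API above:

```python
import time
from typing import Any, Callable, Dict

def poll_until_done(get_status: Callable[[], Dict[str, Any]],
                    interval: float = 1.0, timeout: float = 300.0) -> Dict[str, Any]:
    """Call get_status() until the task reaches a terminal Celery state."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status["status"] in ("SUCCESS", "FAILURE"):
            return status
        time.sleep(interval)
    raise TimeoutError("task did not finish within the polling timeout")

# Usage against the endpoint above (requires an HTTP client such as httpx):
#   result = poll_until_done(
#       lambda: httpx.get(f"http://localhost:8000/tasks/{task_id}/status").json()
#   )
```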

Progress Tracking: Updating Job State in Redis During LLM Streaming

LLM inference isn't binary — users want to see tokens streaming. Celery's default task states (PENDING, STARTED, SUCCESS, FAILURE) are too coarse. We need custom progress tracking.

Here's how to implement real-time progress updates using Redis directly:

# ai_tasks.py
import json
import time
from typing import Dict, Any
import redis
from celery_app import celery_app, settings

# Connect to Redis for progress updates
redis_client = redis.from_url(settings.redis_url)

@celery_app.task(bind=True)
def process_llm_request(self, prompt: str, model: str, 
                        temperature: float, max_tokens: int) -> Dict[str, Any]:
    """
    Long-running LLM processing task with progress tracking.
    """
    task_id = self.request.id
    
    # Update task metadata in Redis
    def update_progress(step: str, progress: float, metadata: Dict[str, Any] | None = None):
        progress_data = {
            "task_id": task_id,
            "step": step,
            "progress": progress,
            "timestamp": time.time(),
            "metadata": metadata or {}
        }
        # Store in Redis with 10 minute expiry
        redis_client.setex(
            f"task:progress:{task_id}",
            600,  # 10 minutes
            json.dumps(progress_data)
        )
    
    try:
        # Step 1: Load model (0-30% progress)
        update_progress("loading_model", 0.1)
        # Simulated model loading
        time.sleep(2)
        update_progress("model_loaded", 0.3, {"model": model})
        
        # Step 2: Generate tokens with streaming (30-100% progress)
        # In reality, you'd hook into your LLM's streaming callback
        generated_text = ""
        total_tokens = 100  # Example total
        for i in range(total_tokens):
            # Simulate token generation
            time.sleep(0.05)
            generated_text += f" token{i}"
            
            # Update progress every 10 tokens
            if i % 10 == 0:
                progress = 0.3 + (0.7 * (i / total_tokens))
                update_progress(
                    "generating",
                    progress,
                    {"tokens_generated": i, "text_so_far": generated_text}
                )
        
        update_progress("complete", 1.0, {"total_tokens": total_tokens})
        
        return {
            "text": generated_text,
            "model": model,
            "tokens_generated": total_tokens
        }
        
    except Exception as e:
        update_progress("failed", 0.0, {"error": str(e)})
        raise  # Celery will handle retry logic
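The generating branch above maps token progress into the 0.3–1.0 band. That arithmetic is easy to get wrong, so it can help to isolate it in a pure helper (the function name is mine, not part of the task above):

```python
def overall_progress(tokens_done: int, tokens_total: int,
                     load_fraction: float = 0.3) -> float:
    """Map token progress into the overall 0.0-1.0 range, reserving the
    first `load_fraction` of the bar for model loading."""
    if tokens_total <= 0:
        raise ValueError("tokens_total must be positive")
    return load_fraction + (1.0 - load_fraction) * (tokens_done / tokens_total)
```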

Now add a progress endpoint to FastAPI:

# main.py (continued) - needs `import json` and a redis_client (redis.from_url(settings.redis_url))
@app.get("/tasks/{task_id}/progress")
async def get_task_progress(task_id: str):
    """Get real-time progress updates for a task."""
    progress_data = redis_client.get(f"task:progress:{task_id}")
    
    if not progress_data:
        raise HTTPException(
            status_code=404,
            detail="Task progress not found or expired"
        )
    
    return json.loads(progress_data)

Streaming Results to the Client via WebSocket or SSE

Polling /progress works, but Server-Sent Events (SSE) provide better UX. Here's an SSE implementation:

# streaming.py
import asyncio
import json

import redis.asyncio as aioredis
from celery.result import AsyncResult
from sse_starlette.sse import EventSourceResponse  # uv add sse-starlette

from celery_app import celery_app, settings

async def stream_task_progress(task_id: str):
    """SSE stream for task progress updates."""
    redis_client = aioredis.from_url(settings.redis_url)
    
    last_progress = 0
    while True:
        # Check task completion first
        task_result = AsyncResult(task_id, app=celery_app)
        if task_result.ready():
            if task_result.status == "SUCCESS":
                yield {
                    "event": "complete",
                    "data": json.dumps(task_result.result)
                }
            else:
                yield {
                    "event": "error", 
                    "data": json.dumps({"error": str(task_result.result)})
                }
            break
        
        # Check for progress updates
        progress_data = await redis_client.get(f"task:progress:{task_id}")
        if progress_data:
            data = json.loads(progress_data)
            if data["progress"] > last_progress:
                yield {
                    "event": "progress",
                    "data": json.dumps(data)
                }
                last_progress = data["progress"]
        
        await asyncio.sleep(0.5)  # Poll interval

# main.py (continued) - import stream_task_progress from streaming.py
@app.get("/tasks/{task_id}/stream")
async def stream_progress(task_id: str):
    """Server-Sent Events stream for real-time progress."""
    return EventSourceResponse(stream_task_progress(task_id))

Client-side JavaScript can now subscribe to real-time updates:

// Client-side example. The server sends named events ("progress",
// "complete", "error"), so use addEventListener - onmessage only
// fires for unnamed events and would never trigger here.
const eventSource = new EventSource(`/tasks/${taskId}/stream`);

eventSource.addEventListener("progress", (event) => {
    const data = JSON.parse(event.data);
    updateUI(data); // Your update function
});

eventSource.addEventListener("complete", (event) => {
    showResult(JSON.parse(event.data));
    eventSource.close();
});

eventSource.onerror = () => {
    eventSource.close();
    showError("Connection lost");
};
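Under the hood, SSE is just a text protocol: event: and data: lines separated by blank lines. A deliberately minimal parser sketch (not a full SSE implementation — it ignores id:, retry:, and multi-line data fields) shows what the browser's EventSource is doing:

```python
def parse_sse_events(lines):
    """Yield (event, data) tuples from 'event:'/'data:' line pairs.
    Minimal sketch: ignores id:, retry:, and multi-line data."""
    event, data = "message", None  # "message" is the SSE default event name
    for line in lines:
        line = line.strip()
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data = line[len("data:"):].strip()
        elif line == "" and data is not None:
            # Blank line terminates an event
            yield event, data
            event, data = "message", None
```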

Retry Logic: Exponential Backoff for Rate-Limited LLM Calls

When calling external LLM APIs (OpenAI, Anthropic, etc.), rate limits are inevitable. Celery's retry mechanism with exponential backoff saves you:

@celery_app.task(bind=True, max_retries=5)
def call_external_llm_api(self, prompt: str, api_key: str) -> str:
    """
    Task with automatic retry for rate limits and transient failures.
    """
    import requests
    from requests.exceptions import RequestException
    
    try:
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"model": "gpt-4", "messages": [{"role": "user", "content": prompt}]},
            timeout=30
        )
        
        if response.status_code == 429:  # Rate limited
            # Exponential backoff: 4s, 16s, 64s, 256s, 1024s
            retry_delay = 4 ** (self.request.retries + 1)
            raise self.retry(
                countdown=retry_delay,
                exc=Exception(f"Rate limited, retrying in {retry_delay}s")
            )
        
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
        
    except RequestException as exc:
        # Network errors retried with exponential backoff: 2s, 4s, 8s, 16s, 32s
        raise self.retry(
            countdown=2 ** (self.request.retries + 1),
            exc=exc
        )
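The two schedules above (base 4 for rate limits, base 2 for network errors) can be precomputed and sanity-checked with a tiny helper (the name is illustrative). Note that Celery can also do this declaratively via the autoretry_for, retry_backoff, and retry_jitter task options if you'd rather not hand-roll the math:

```python
def backoff_schedule(base: int, max_retries: int, cap: int = 3600) -> list:
    """Exponential retry delays base**1 .. base**max_retries, capped (seconds)."""
    return [min(base ** (n + 1), cap) for n in range(max_retries)]

print(backoff_schedule(4, 5))  # rate-limit schedule
print(backoff_schedule(2, 5))  # network-error schedule
```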

Common Error Fix: If you see ModuleNotFoundError: No module named 'requests', check your venv activation and run uv add requests in the correct environment.

Configure Celery-wide retry policies in your settings:

celery_app.conf.update(
    task_acks_late=True,  # Don't ack until task completes
    task_reject_on_worker_lost=True,  # Retry if worker dies
    task_compression="gzip",  # Compress large prompts
    broker_transport_options={
        "visibility_timeout": 3600,  # 1 hour for long tasks
        "fanout_prefix": True,
    },
)

Monitoring: Flower Dashboard for Celery Worker Health

You can't manage what you can't measure. Flower is Celery's real-time monitoring tool that shows:

  • Active, scheduled, and reserved tasks
  • Worker status and resource usage
  • Task history and statistics
  • Rate of task processing

Install and run it alongside your stack:

uv add flower
celery -A celery_app flower --port=5555

Access it at http://localhost:5555 to see your task queue in action. For production, add authentication (Flower supports --basic-auth=user:password) and run it behind your web server.

Performance Comparison: Task Queue vs Synchronous

| Metric | Synchronous Endpoint | Celery + Redis Queue |
|---|---|---|
| Request latency | 90s (full processing) | <100ms (job submission) |
| Error rate under load | Exponential growth | Near-zero (queued) |
| Resource utilization | Workers blocked | Workers efficient |
| User experience | Timeout errors | Progress indicators |
| Failure recovery | Manual restart | Automatic retry |
| Scalability | Vertical scaling only | Horizontal scaling |
The numbers speak for themselves. FastAPI's async stack can serve tens of thousands of requests per second in benchmark conditions — but only if you don't block those workers with 90-second LLM calls.

Next Steps: Production Deployment Patterns

You've got the basics working locally. Now let's talk production:

  1. Multiple Queues for Priority: Separate urgent vs batch jobs

    celery_app.conf.task_routes = {
        'ai_tasks.high_priority': {'queue': 'high_priority'},
        'ai_tasks.batch': {'queue': 'batch'},
    }
    
  2. Redis Persistence: Enable AOF persistence so tasks survive restarts

    # redis.conf
    appendonly yes
    appendfsync everysec
    
  3. Health Checks: Implement /health endpoints for all services

    @app.get("/health")
    async def health_check():
        # Check Redis connection
        try:
            redis_client.ping()
            return {"status": "healthy", "services": ["redis", "celery"]}
        except redis.ConnectionError:
            raise HTTPException(status_code=503, detail="Redis unavailable")
    
  4. Memory Management: Long-lived workers that repeatedly load large models can leak memory. worker_max_tasks_per_child (set earlier) recycles workers periodically; pair it with container memory limits so a leak takes down one worker, not the host.

  5. Deployment: Use Docker Compose or Kubernetes with resource limits:

    # docker-compose.yml
    version: '3.8'
    services:
      redis:
        image: redis:7-alpine
        command: redis-server --appendonly yes
    
      worker:
        build: .
        command: celery -A celery_app worker --loglevel=info
        deploy:
          resources:
            limits:
              memory: 8G
    

Final Error Fix: If you hit TypeError: 'NoneType' object is not subscriptable when reading task results, you're usually indexing into a result that doesn't exist yet — AsyncResult.result is None until the task finishes. Add a None guard: if result and 'key' in result: value = result['key'].

Your AI application is now production-ready. Users get instant feedback, jobs queue gracefully under load, and you can sleep through the night knowing failed tasks will retry automatically. The queue handles the chaos so you don't have to.

Remember: Async isn't just about performance — it's about user experience. A progress bar beats a loading spinner, and a queued job beats a 504 error every time. Now go deploy that Llama 3.1 70B model without taking down your entire API.