Every new support ticket in your system should trigger an LLM to categorise, prioritise, and suggest a response. Polling a database for new tickets adds delays of up to 30 seconds; Kafka can deliver the same event in under 50ms. Yet your architecture stays bottlenecked by a SELECT * FROM tickets WHERE processed = false on a 30-second cron job, while your message broker, capable of pushing that event to a waiting consumer in milliseconds, sits idle. The shift from scheduled polling to event-driven triggers isn't just an optimization; it's the difference between an AI that reacts and one that merely ruminates.
This is about wiring your LLM's cortex directly into your system's nervous system. We'll ditch the cron jobs, build Kafka consumers that handle backpressure from OpenAI's rate limits, and ensure that a network blip doesn't make you pay for the same 70B parameter inference twice.
Why Your Database Poll is a 30-Second Millstone Around Your LLM's Neck
Let's autopsy the polling pattern. Your service runs a script every N seconds. It scans a table, finds new rows, and fires off tasks. The average latency for a new ticket to be seen is half your poll interval. If you poll every 30 seconds, the average ticket waits 15 seconds before any processing even begins. This is before the LLM's own generation time. Furthermore, you're hammering your database with queries that return nothing 99% of the time, and you have no built-in mechanism for backpressure—if the LLM API starts throwing 429s, your poller will blissfully keep fetching more work and failing.
An event-driven architecture flips this. When the ticket is created, the application publishes an event—a small JSON payload—to a Kafka topic like tickets.created. A consumer group, already running and listening, receives the event in single-digit milliseconds. The latency from creation to processing start collapses. More importantly, the system gains flow control. The consumer can pause consumption on that specific topic partition when it hits a rate limit, preventing a cascade of failures. This is the foundation: events as the source of truth, not the state of a database column.
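The producer side is small. Here's a sketch of what the ticket service might publish at creation time; the helper names, event fields, and the localhost broker address are illustrative assumptions, not part of the consumer code later in this article. Note the event is keyed by ticket_id so all events for one ticket stay ordered within a single partition.

```python
import json
import uuid


def build_ticket_event(ticket_id: str, description: str) -> dict:
    """Assemble the small JSON payload published to events.tickets.created."""
    return {
        "event_id": str(uuid.uuid4()),  # unique per event, useful for tracing
        "ticket_id": ticket_id,
        "description": description,
        "event_type": "ticket.created",
    }


def publish_ticket_created(ticket_id: str, description: str,
                           bootstrap_servers: str = "localhost:9092") -> None:
    """Publish at ticket-creation time, keyed by ticket_id for per-ticket ordering."""
    from kafka import KafkaProducer  # deferred so build_ticket_event works without a broker
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        key_serializer=lambda k: k.encode("utf-8"),
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
        acks="all",  # don't acknowledge the write until the broker has it durably
    )
    producer.send("events.tickets.created",
                  key=ticket_id,
                  value=build_ticket_event(ticket_id, description))
    producer.flush()
```

In production you'd create the producer once at startup rather than per call; it's shown inline here to keep the sketch self-contained.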
Designing Kafka Topics for Your AI Pipeline: More Than Just llm-jobs
Throwing everything into a single topic is the quickest path to a tangled, unmanageable mess. Your topic design is your pipeline's blueprint. Think in stages and outcomes.
You'll want at least these core topics:
- events.{domain}.created: The raw trigger (events.tickets.created, events.documents.uploaded).
- tasks.llm.inference: The work queue for actual LLM calls. Consumers here handle the HTTP calls, prompt templating, and token management.
- tasks.llm.results: Successful LLM outputs, ready for post-processing (saving to DB, sending notifications).
- dlq.llm.failed: The dead letter queue for jobs that failed after all retries (malformed prompts, persistent API errors, unsupported requests).
Use compacted topics for events.* only when the latest event per key is enough to rebuild state; compaction discards older records for a key, so it cannot replay a full event sequence. Use standard, high-retention topics for tasks.* to allow for debugging and manual replay. Partition keys are critical. For events.tickets.created, the key should be the ticket_id. This guarantees all events for a specific ticket are ordered within a single partition, preventing race conditions if you have multiple processing steps. For tasks.llm.inference, key by user_id or api_key so a single user's requests land in one partition and are processed serially, which makes per-user rate-limit and backpressure logic simpler.
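To make the layout concrete, here's one way to declare those topics with kafka-python's admin client. The partition counts, replication factor, and retention values are illustrative starting points you'd tune for your own cluster, not recommendations from a benchmark.

```python
def topic_specs() -> dict:
    """Partition counts and per-topic configs matching the layout above."""
    week_ms = str(7 * 24 * 3600 * 1000)
    return {
        "events.tickets.created": {
            "num_partitions": 12,
            "config": {"cleanup.policy": "compact"},  # keep latest state per ticket_id
        },
        "tasks.llm.inference": {
            "num_partitions": 12,
            "config": {"retention.ms": week_ms},  # keep a week for replay/debugging
        },
        "tasks.llm.results": {
            "num_partitions": 12,
            "config": {"retention.ms": week_ms},
        },
        "dlq.llm.failed": {
            "num_partitions": 3,  # low volume, long memory
            "config": {"retention.ms": str(30 * 24 * 3600 * 1000)},
        },
    }


def create_topics(bootstrap_servers: str = "localhost:9092") -> None:
    """Apply the specs via kafka-python's KafkaAdminClient."""
    from kafka.admin import KafkaAdminClient, NewTopic  # deferred: specs stay usable without a broker
    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    admin.create_topics([
        NewTopic(name=name,
                 num_partitions=spec["num_partitions"],
                 replication_factor=3,  # assumes a 3-broker cluster; use 1 locally
                 topic_configs=spec["config"])
        for name, spec in topic_specs().items()
    ])
```

Declaring topics in code (rather than relying on auto-creation) keeps partition counts and retention under version control.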
The Python Consumer: Async, Robust, and Ready for Backpressure
Here’s a Kafka consumer using kafka-python and asyncio that is closer to production than a toy example: it handles graceful shutdown, async processing, and commits offsets only after successful work. One caveat: kafka-python's client is blocking, so each message's async HTTP call is awaited in turn; for a fully asynchronous consumer, look at aiokafka.
import asyncio
import json
import logging
import os
from typing import Any
from kafka import KafkaConsumer, KafkaProducer
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = os.environ.get("OPENAI_API_KEY", "")  # never hard-code the key
class LLMEventConsumer:
def __init__(self, bootstrap_servers: str, topic: str, result_topic: str, dlq_topic: str):
self.bootstrap_servers = bootstrap_servers
self.topic = topic
self.result_topic = result_topic
self.dlq_topic = dlq_topic
self._consumer = None
self._producer = None
self._session = None
self._rate_limit_hit = False
async def initialize(self):
"""Async initialization of clients."""
self._consumer = KafkaConsumer(
self.topic,
bootstrap_servers=self.bootstrap_servers,
group_id='llm-processor-group',
            enable_auto_commit=False,  # Manual commit: at-least-once delivery; pair with idempotency for effectively-once
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
max_poll_records=10, # Control batch size
max_poll_interval_ms=300000,
)
self._producer = KafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all', # Ensure message durability
retries=5,
)
self._session = aiohttp.ClientSession()
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError))
)
async def call_llm_api(self, prompt: str) -> dict[str, Any]:
"""Call the LLM API with retry logic."""
if self._rate_limit_hit:
await asyncio.sleep(60) # Back off for a minute if we know we're rate-limited
payload = {"model": "gpt-4", "messages": [{"role": "user", "content": prompt}]}
headers = {"Authorization": f"Bearer {API_KEY}"}
try:
async with self._session.post(
"https://api.openai.com/v1/chat/completions",
json=payload,
headers=headers,
timeout=30
) as response:
if response.status == 429:
self._rate_limit_hit = True
logger.warning("Rate limit hit. Pausing consumption.")
self._consumer.pause(*self._consumer.assignment()) # BACKPRESSURE
await asyncio.sleep(60)
self._rate_limit_hit = False
self._consumer.resume(*self._consumer.assignment())
raise aiohttp.ClientResponseError(response.request_info, response.history, status=429)
response.raise_for_status()
return await response.json()
except aiohttp.ClientConnectorError:
logger.error("Connection failed. Circuit breaker would trip here.")
raise
async def process_message(self, message):
"""Process a single Kafka message."""
ticket_event = message.value
ticket_id = ticket_event.get('ticket_id')
user_text = ticket_event.get('description')
prompt = f"Categorise and prioritise this support ticket: {user_text}"
try:
llm_response = await self.call_llm_api(prompt)
# Send successful result to the next topic
result_event = {
'ticket_id': ticket_id,
'original_event': ticket_event,
'llm_analysis': llm_response['choices'][0]['message']['content'],
                'processed_at': message.timestamp / 1000.0,  # Kafka record timestamp, epoch seconds (loop.time() is monotonic, not wall-clock)
}
self._producer.send(self.result_topic, key=ticket_id.encode(), value=result_event)
self._producer.flush()
# Only commit offset after successful processing and forwarding
self._consumer.commit()
logger.info(f"Processed ticket {ticket_id}")
except Exception as e:
logger.error(f"Failed to process ticket {ticket_id}: {e}")
# Send to Dead Letter Queue for manual inspection
dlq_event = {
'original_message': ticket_event,
'error': str(e),
'topic': message.topic,
'partition': message.partition,
'offset': message.offset
}
self._producer.send(self.dlq_topic, value=dlq_event)
self._producer.flush()
# Still commit offset to avoid re-processing a poisonous message forever
self._consumer.commit()
async def run(self):
"""Main consumption loop."""
await self.initialize()
logger.info(f"Started consumer on topic {self.topic}")
try:
for message in self._consumer:
await self.process_message(message)
except KeyboardInterrupt:
logger.info("Shutting down gracefully...")
finally:
await self._session.close()
self._consumer.close()
self._producer.close()
if __name__ == "__main__":
consumer = LLMEventConsumer(
bootstrap_servers='localhost:9092',
topic='events.tickets.created',
result_topic='tasks.llm.results',
dlq_topic='dlq.llm.failed'
)
asyncio.run(consumer.run())
Real Error & Fix: When scaling, you'll hit redis.exceptions.ConnectionError: max number of clients reached. This happens when rapidly spawned async tasks each open a new Redis connection for caching prompts. Fix: implement a connection pool. With aioredis, use aioredis.from_url("redis://localhost", max_connections=20) to cap the pool and share connections across all tasks in the process.
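One way to apply that fix is a process-wide accessor so every coroutine shares one bounded pool instead of opening its own connection. This is a sketch; the `factory` parameter is just an illustrative seam that makes the sharing logic testable without a Redis server.

```python
_redis_client = None  # one shared, pool-backed client per process


def get_redis(factory=None):
    """Return the shared Redis client, creating it on first use.

    By default builds the bounded aioredis pool from the fix above;
    pass `factory` to substitute a stub in tests.
    """
    global _redis_client
    if _redis_client is None:
        if factory is None:
            import aioredis  # deferred import; on redis-py >= 4.2 use redis.asyncio instead
            factory = lambda: aioredis.from_url("redis://localhost", max_connections=20)
        _redis_client = factory()
    return _redis_client
```

Every `await get_redis().get(...)` then draws from the same capped pool, so the client count stays flat no matter how many tasks you spawn.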
Implementing a Dead Letter Queue: Your LLM's Safety Net
Not every failure is retryable. A user might submit an image when your pipeline expects text, or a prompt might consistently trigger a content filter. The Dead Letter Queue (DLQ) is where these messages go to die—or more accurately, to be examined.
The consumer code above routes failures to the DLQ after retries are exhausted. The DLQ message must be a rich envelope containing the original event, the error, and metadata (topic, partition, offset) to trace its origin. In practice, you'd have a separate, low-priority service consuming the DLQ, perhaps sending alerts to Slack or creating tickets in your engineering support system. Because only transient errors are retried and poison messages are parked immediately, you stop paying for blind LLM retries that were never going to succeed.
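A sketch of that low-priority DLQ service, assuming a generic chat webhook; the webhook URL, consumer group id, and helper names are illustrative choices, not a prescribed API.

```python
import json


def summarize_dlq_event(dlq_event: dict) -> str:
    """Render a DLQ envelope into a one-line alert message."""
    origin = f"{dlq_event.get('topic')}[{dlq_event.get('partition')}]@{dlq_event.get('offset')}"
    ticket_id = (dlq_event.get("original_message") or {}).get("ticket_id", "unknown")
    return f"LLM DLQ: ticket {ticket_id} failed at {origin}: {dlq_event.get('error')}"


def run_dlq_alerter(webhook_url: str, bootstrap_servers: str = "localhost:9092") -> None:
    """Consume dlq.llm.failed and post each failure to a chat webhook."""
    import requests
    from kafka import KafkaConsumer  # deferred so the summarizer works standalone
    consumer = KafkaConsumer(
        "dlq.llm.failed",
        bootstrap_servers=bootstrap_servers,
        group_id="dlq-alerter",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    for message in consumer:
        requests.post(webhook_url, json={"text": summarize_dlq_event(message.value)})
```

The partition/offset in the alert is what lets an engineer replay exactly that message once the underlying bug is fixed.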
Achieving Exactly-Once Semantics in a World of Rebalances
Kafka consumers work in groups. If a consumer dies, its partitions are re-assigned to others. This can lead to duplicate processing if the consumer died after performing work but before committing its offset. For an LLM, this means a double charge and potentially conflicting actions.
The pattern to avoid this is idempotent processing. Make the operation safe to repeat. For our ticket categorizer, the key is to store the LLM result with a deterministic key derived from the ticket_id and the prompt version. Before calling the expensive LLM API, check a fast cache (Redis) for this key.
import hashlib
import json
import aioredis  # on redis-py >= 4.2, the same API lives at redis.asyncio

def get_idempotent_key(ticket_id: str, prompt_template_version: str = "v1") -> str:
    """Create a deterministic key for idempotent LLM calls. No I/O, so no need for async."""
    composite = f"{ticket_id}:{prompt_template_version}"
    return hashlib.sha256(composite.encode()).hexdigest()

async def process_idempotently(ticket_event):
    ticket_id = ticket_event['ticket_id']
    idempotency_key = get_idempotent_key(ticket_id, "categorize_v2")
    redis = aioredis.from_url("redis://localhost")  # synchronous constructor (no await); reuse one pooled client in production
    # Check if this work was already done
    cached_result = await redis.get(idempotency_key)
    if cached_result:
        logger.info(f"Found cached LLM result for {ticket_id}, skipping call.")
        return json.loads(cached_result)
    prompt = f"Categorise and prioritise this support ticket: {ticket_event['description']}"
    llm_result = await call_llm_api(prompt)
    # Cache the result with an expiry (e.g., 24 hours)
    await redis.setex(idempotency_key, 86400, json.dumps(llm_result))
    return llm_result
Combine this with manual offset commits only after the result is cached and forwarded. This ensures that even if a rebalance happens, the new consumer will find the cached result and avoid the API call.
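That ordering matters enough to isolate it: cache first, forward second, commit last. A dependency-injected sketch (the cache, producer, and consumer arguments are stand-ins for the real clients, which makes the invariant testable without any infrastructure):

```python
import json


async def finish_message(cache, producer, consumer, key: str, result: dict,
                         result_topic: str, ttl_seconds: int = 86400) -> None:
    """Cache, forward, then commit -- in that order.

    If a rebalance fires at any point before the commit, the successor
    replays the message and finds the cached result instead of
    re-calling the LLM API.
    """
    await cache.setex(key, ttl_seconds, json.dumps(result))  # 1. durable idempotency record
    producer.send(result_topic, value=result)                # 2. forward downstream
    producer.flush()
    consumer.commit()                                        # 3. only now advance the offset
```

Swapping steps 1 and 3 would reintroduce the double-charge window: an offset committed before the cache write means a crash in between loses both the result and the record that it was ever computed.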
Real Error & Fix: You'll see celery.exceptions.SoftTimeLimitExceeded if you port this pattern to Celery workers. LLM tasks are long. Fix: Explicitly configure timeouts: app.conf.task_soft_time_limit = 280 and app.conf.task_time_limit = 300. This gives the task 280 seconds before a soft interrupt (allowing cleanup) and hard kills it at 300.
Benchmark: Choosing Your Event Bus for AI Triggers
Don't just choose Kafka because it's popular. The right tool depends on your latency requirements, team expertise, and need for replay. Here's how the main options stack up; the latency figures are ballpark numbers for a healthy deployment, not a formal benchmark.
| Tool | Latency (Publish -> Consume) | Key Strength for AI Pipelines | Key Weakness for AI Pipelines | Best For |
|---|---|---|---|---|
| Apache Kafka | ~20ms | Durability, replayability, strict ordering per key, high throughput. | Operational complexity, heavier resource footprint. | Core pipeline where order and replay matter (user conversation history, multi-step reasoning). |
| Redis Streams | <1ms | Blazing fast, simple API, great for real-time streaming of tokens. | Data is volatile (can be configured for persistence), less mature tooling for operations. | Real-time token streaming to WebSocket clients, intra-service signaling where loss is acceptable. |
| AWS SQS/SNS | ~50-100ms | Fully managed, serverless, trivial to set up. | Limited ordering guarantees (FIFO has lower throughput), vendor lock-in, less control over retry logic. | Getting started quickly, serverless architectures on AWS, where managed service overhead is acceptable. |
| In-Memory (asyncio.Queue) | <0.1ms | Zero latency, no serialization. | No persistence, dies with the process, single-node only. | Within a single Python process, e.g., routing between async tasks in an agent workflow. |
Kafka adds about 20ms latency but gives you replay capability, which is invaluable for debugging a pipeline that mis-processed a batch of prompts. Redis Streams are phenomenal for the final leg—streaming LLM tokens to a waiting WebSocket frontend with sub-millisecond latency.
Next Steps: From Pipeline to Robust System
You now have a Kafka consumer that triggers LLM calls, handles backpressure, routes failures, and avoids duplicates. This is the core. To move to production, wrap it in a process manager like Supervisor or run it in a Docker container orchestrated by Kubernetes. Implement a circuit breaker (using a library like aiocircuitbreaker) around your LLM API call: failing fast while the downstream service is unhealthy keeps one sick dependency from cascading failures through the rest of your services.
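If you'd rather not pull in a library, the core of a circuit breaker is small. Here's a minimal async sketch; the thresholds and class name are arbitrary choices of this example, not aiocircuitbreaker's API.

```python
import time


class CircuitOpenError(Exception):
    """Raised while the breaker is open and calls are rejected fast."""


class AsyncCircuitBreaker:
    """Open after `max_failures` consecutive failures, reject calls for
    `reset_timeout` seconds, then let one trial call through (half-open)."""

    def __init__(self, max_failures: int = 5, reset_timeout: float = 30.0):
        self.max_failures = max_failures
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None

    async def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("downstream unhealthy; failing fast")
            self.opened_at = None  # half-open: permit one trial call
        try:
            result = await fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()  # trip the breaker
            raise
        self.failures = 0  # success closes the circuit fully
        return result
```

Wrapping the consumer's API call as `await breaker.call(self.call_llm_api, prompt)` means a dead OpenAI endpoint costs you one fast exception per message instead of a full retry-and-timeout cycle each time.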
Next, integrate observability. Log to structured JSON. Emit metrics (using Prometheus) for message lag per partition, LLM call latency, and error rates by type. Connect your DLQ to an alerting system. Finally, consider your deployment strategy for the pipeline itself. Use a pattern like the Consumer Supervisor, where you can dynamically adjust the number of consumers based on the lag in your tasks.llm.inference topic, ensuring you scale horizontally when a backlog forms.
Your LLM is no longer a passive endpoint polled on a schedule. It's an active, listening component of your event-driven architecture, reacting to the world in milliseconds, with the robustness to handle the chaos of real-world traffic.