Problem: Your RAG Pipeline Returns Outdated Context
Your real-time streaming RAG system ingests fresh documents continuously—but your LLM keeps citing information that was updated or deleted hours ago. Users notice. You don't know where the staleness lives.
You'll learn:
- How to pinpoint whether staleness is in the index, cache, or embedding layer
- How to verify freshness end-to-end without rewriting your pipeline
- Which fixes are safe for production and which require downtime
Time: 25 min | Level: Advanced
Why This Happens
Streaming RAG pipelines have three distinct places where data can go stale, and they fail in different ways.
Common symptoms:
- Retrieved chunks reference facts that changed in the last N hours
- Queries return documents marked as deleted in your source system
- Top-result similarity scores drift downward over time even though the documents haven't changed
The root cause is usually one of: index write lag (your vector store hasn't caught up to the stream), cache serving expired embeddings, or embedding model version mismatch after a silent upgrade.
The three zones where data goes stale: stream ingestion lag, cache TTL misconfiguration, and embedding model drift
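To keep triage ordered, the three zones can be captured as a small symptom-to-zone lookup. This is a sketch; the symptom labels are illustrative, not output from any tool:

```python
# Map each observable symptom to the pipeline zone to investigate first.
# Symptom keys are illustrative labels for the bullets above.
STALENESS_ZONES = {
    "facts_changed_recently": "index write lag",
    "deleted_docs_returned": "index write lag or cache",
    "scores_drift_without_changes": "embedding model mismatch",
}

def first_zone_to_check(symptom: str) -> str:
    """Return the zone to investigate for a given symptom label."""
    return STALENESS_ZONES.get(symptom, "unknown: measure index lag first")

print(first_zone_to_check("scores_drift_without_changes"))
```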
Solution
Step 1: Measure Your Index Lag
Before fixing anything, establish baseline lag between your event stream and your vector store.
import time

def measure_index_lag(stream_client, vector_store, probe_doc: dict) -> float:
    """
    Writes a probe document to the stream and polls until it appears
    in the vector index. Returns lag in seconds.
    """
    probe_id = f"probe-{int(time.time())}"
    probe_doc["id"] = probe_id       # so the probe can be deleted by id below
    probe_doc["_probe_id"] = probe_id
    ingested_at = time.monotonic()

    # Publish probe to stream
    stream_client.publish(probe_doc)

    # Poll vector store until probe appears
    for _ in range(60):
        results = vector_store.search(
            query=probe_doc["content"],
            filter={"_probe_id": probe_id},
            top_k=1,
        )
        if results:
            lag = time.monotonic() - ingested_at
            vector_store.delete(probe_id)  # Clean up probe
            return lag
        time.sleep(1)

    raise TimeoutError("Probe document never appeared in index after 60s")
lag = measure_index_lag(stream_client, vector_store, {"content": "lag probe 2026"})
print(f"Current index lag: {lag:.1f}s")
Expected: Under 5 seconds for most managed vector stores (Pinecone, Weaviate, Qdrant). Over 30 seconds means your consumer is backlogged.
If it fails:
- TimeoutError after 60s: Your consumer group has stopped processing—check Kafka consumer lag metrics
- Probe appears instantly but queries still return stale results: The bug is downstream of indexing (cache or retrieval logic)
A healthy pipeline shows under 5s lag. This output shows 47s—the consumer is backlogged.
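To rehearse the probe pattern before pointing it at production, you can run it against in-memory stubs. Everything below (`InMemoryStream`, `InMemoryVectorStore`, the simulated consumer delay) is a hypothetical stand-in, not a real client library:

```python
import time

class InMemoryStream:
    """Hypothetical stream client; docs become indexable after a fixed delay."""
    def __init__(self, delay_s: float = 0.2):
        self.delay_s = delay_s
        self.published_at = None
        self.pending = None

    def publish(self, doc: dict) -> None:
        self.published_at = time.monotonic()
        self.pending = doc

class InMemoryVectorStore:
    """Hypothetical vector store backed by a dict, filter-only search."""
    def __init__(self):
        self.docs = {}

    def search(self, query, filter, top_k):
        matches = [d for d in self.docs.values()
                   if all(d.get(k) == v for k, v in filter.items())]
        return matches[:top_k]

    def delete(self, doc_id):
        self.docs.pop(doc_id, None)

store = InMemoryVectorStore()
stream = InMemoryStream()

def measure_lag(stream, store, probe_doc):
    probe_id = f"probe-{int(time.time())}"
    probe_doc.update({"id": probe_id, "_probe_id": probe_id})
    start = time.monotonic()
    stream.publish(probe_doc)
    for _ in range(60):
        # Simulate the consumer indexing the doc once its delay has elapsed
        if stream.pending and time.monotonic() - stream.published_at >= stream.delay_s:
            store.docs[stream.pending["id"]] = stream.pending
            stream.pending = None
        results = store.search(query=probe_doc["content"],
                               filter={"_probe_id": probe_id}, top_k=1)
        if results:
            lag = time.monotonic() - start
            store.delete(probe_id)
            return lag
        time.sleep(0.05)
    raise TimeoutError("probe never indexed")

lag = measure_lag(stream, store, {"content": "lag probe"})
print(f"stub lag: {lag:.2f}s")  # roughly the simulated 0.2s consumer delay
```

Once the pattern works against the stubs, swap in your real stream client and vector store unchanged.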
Step 2: Audit Your Retrieval Cache
Most production RAG systems cache embedding lookups or retrieved chunks. A misconfigured TTL is the second most common cause of staleness.
import hashlib
import time
from typing import Optional

class AuditedRetrievalCache:
    """
    Drop-in wrapper for your existing cache that logs staleness signals.
    """

    def __init__(self, cache, ttl_seconds: int = 300):
        self.cache = cache
        self.ttl = ttl_seconds

    def get(self, query: str) -> Optional[list]:
        key = self._key(query)
        entry = self.cache.get(key)
        if entry is None:
            return None
        age = time.time() - entry["cached_at"]
        if age > self.ttl:
            # Log before evicting; this helps with post-incident analysis
            print(f"[CACHE EVICT] key={key[:8]} age={age:.0f}s ttl={self.ttl}s")
            self.cache.delete(key)
            return None
        print(f"[CACHE HIT] key={key[:8]} age={age:.0f}s")
        return entry["chunks"]

    def set(self, query: str, chunks: list) -> None:
        key = self._key(query)
        self.cache.set(key, {"chunks": chunks, "cached_at": time.time()})

    def _key(self, query: str) -> str:
        # Normalize before hashing so "Fix bug" and "fix bug" share a cache entry
        return hashlib.sha256(query.strip().lower().encode()).hexdigest()
Swap in AuditedRetrievalCache for a few minutes in staging. If you see cache hits with ages over your expected document update frequency, your TTL is too long.
Why this works: Most staleness from cache isn't a missing eviction—it's a TTL set once during initial setup and never revisited as document velocity increased.
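One way to pick a TTL that tracks document velocity instead of guessing: derive it from the observed gaps between source updates. The helper below is a sketch; `update_timestamps` is assumed to come from your source system's change feed, and the 0.5 safety factor is a starting point, not a rule:

```python
import statistics

def suggest_ttl(update_timestamps: list[float], safety_factor: float = 0.5) -> float:
    """
    Suggest a cache TTL as a fraction of the median gap between
    document updates. A safety_factor of 0.5 means cached entries
    live at most half a typical update interval.
    """
    gaps = [b - a for a, b in zip(update_timestamps, update_timestamps[1:])]
    return statistics.median(gaps) * safety_factor

# Hypothetical update times (epoch seconds): one update roughly every 10 minutes
ts = [0, 590, 1210, 1790, 2410, 3010]
print(f"Suggested TTL: {suggest_ttl(ts):.0f}s")  # 300s
```

Re-run the calculation whenever document velocity changes; a TTL chosen this way ages with the corpus instead of fossilizing.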
Step 3: Detect Embedding Model Drift
If your embedding model was silently upgraded (common with hosted providers), old embeddings and new query embeddings live in different latent spaces. Similarity scores become meaningless.
import numpy as np

def detect_embedding_drift(
    vector_store,
    embed_fn,
    sample_size: int = 100,
) -> dict:
    """
    Compares re-embedded samples against stored embeddings.
    A mean cosine distance > 0.05 indicates model drift.
    """
    stored = vector_store.sample(n=sample_size)  # Returns {id, text, embedding}
    distances = []
    for doc in stored:
        fresh_embedding = np.array(embed_fn(doc["text"]))
        stored_embedding = np.array(doc["embedding"])
        cos_sim = np.dot(fresh_embedding, stored_embedding) / (
            np.linalg.norm(fresh_embedding) * np.linalg.norm(stored_embedding)
        )
        # Cosine distance (0 = identical, 2 = opposite)
        distances.append(1 - cos_sim)
    mean_dist = float(np.mean(distances))
    return {
        "mean_cosine_distance": mean_dist,
        "drift_detected": mean_dist > 0.05,
        "sample_size": sample_size,
        "recommendation": "Re-index all documents" if mean_dist > 0.05 else "No drift detected",
    }
result = detect_embedding_drift(vector_store, embed_fn)
print(result)
Expected output (no drift):
{'mean_cosine_distance': 0.003, 'drift_detected': False, 'sample_size': 100, 'recommendation': 'No drift detected'}
If drift is detected: You need a full re-index. Do not do this live—spin up a shadow index, re-embed everything, then cut over with a blue/green swap on your retrieval layer.
Mean cosine distance spiked from 0.003 to 0.12 following a silent model version upgrade
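The blue/green cutover can be as small as a router object that flips which index serves reads once the shadow index is fully re-embedded. This is a sketch, not a production router; the index objects are assumed to expose the same `search` interface as your vector store:

```python
class BlueGreenRetriever:
    """Routes reads to the active index; cutover is a single attribute swap."""
    def __init__(self, blue, green=None):
        self.blue, self.green = blue, green
        self.active = "blue"

    def search(self, **kwargs):
        index = self.blue if self.active == "blue" else self.green
        return index.search(**kwargs)

    def cut_over(self) -> None:
        # Flip reads to the re-embedded shadow index in one step
        assert self.green is not None, "shadow index not ready"
        self.active = "green"

# Minimal stub indexes to demonstrate the swap
class StubIndex:
    def __init__(self, name):
        self.name = name
    def search(self, **kwargs):
        return [f"result from {self.name}"]

router = BlueGreenRetriever(StubIndex("old-embeddings"), StubIndex("re-embedded"))
print(router.search(query="x"))  # served by the old index
router.cut_over()
print(router.search(query="x"))  # served by the shadow index
```

Because reads flow through the router, the swap is invisible to callers and trivially reversible if the shadow index misbehaves.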
Step 4: Add a Freshness Metadata Filter
Once your index is healthy, prevent future staleness by attaching and filtering on ingestion timestamps.
from datetime import datetime, timezone, timedelta

def fresh_retrieve(
    query: str,
    vector_store,
    embed_fn,
    max_age_hours: int = 24,
    top_k: int = 5,
) -> list:
    """
    Retrieves only documents ingested within max_age_hours.
    Falls back to a broader window if results are sparse.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)
    cutoff_ts = int(cutoff.timestamp())
    query_embedding = embed_fn(query)

    results = vector_store.search(
        embedding=query_embedding,
        filter={"ingested_at": {"$gte": cutoff_ts}},
        top_k=top_k,
    )
    if len(results) < 2:
        # Sparse results: widen the window, but log the fallback
        print(f"[FRESHNESS FALLBACK] Only {len(results)} fresh results, widening window")
        results = vector_store.search(
            embedding=query_embedding,
            top_k=top_k,
        )
    return results
The fallback matters: without it, you'll silently return zero context to your LLM on topics that haven't been updated recently—a worse outcome than slightly stale data.
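The filter in fresh_retrieve only works if every writer stamps ingested_at at publish time. A minimal sketch of that producer-side step; the `publish` call is assumed to match your stream client, and `_NullStream` is a stand-in for demonstration:

```python
import time

def stamp_and_publish(stream_client, doc: dict) -> dict:
    """Attach an epoch-seconds ingestion timestamp before publishing."""
    doc["ingested_at"] = int(time.time())
    stream_client.publish(doc)
    return doc

class _NullStream:
    """Stand-in stream client for demonstration only."""
    def publish(self, doc):
        pass

doc = stamp_and_publish(_NullStream(), {"content": "release notes"})
print("ingested_at" in doc)  # True
```

Stamp at publish time rather than index time: if the consumer backlogs, index-time stamps would make stale documents look fresh.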
Verification
Run this end-to-end freshness check after applying fixes:
python -c "
from your_rag import stream_client, vector_store, embed_fn, fresh_retrieve
from troubleshoot_helpers import measure_index_lag, detect_embedding_drift
lag = measure_index_lag(stream_client, vector_store, {'content': 'verification probe'})
drift = detect_embedding_drift(vector_store, embed_fn)
results = fresh_retrieve('recent system events', vector_store, embed_fn, max_age_hours=1)
print(f'Index lag: {lag:.1f}s (target: <5s)')
print(f'Embedding drift: {drift[\"drift_detected\"]} (target: False)')
print(f'Fresh results count: {len(results)} (target: >0)')
"
You should see:
Index lag: 2.3s (target: <5s)
Embedding drift: False (target: False)
Fresh results count: 4 (target: >0)
What You Learned
- Index lag, cache TTL, and embedding drift are independent failure modes—test each separately before concluding
- The probe-and-measure pattern works without mocking or altering production traffic
- Freshness filters require a fallback; sparse results with hard cutoffs silently break retrieval quality
Limitation: The drift detection here works for cosine-space models (OpenAI, Cohere, most open-source). Sparse retrieval systems (BM25, SPLADE) drift differently and need a separate approach.
When NOT to use freshness filters: Time-insensitive corpora (legal reference, historical documentation) benefit from broader retrieval windows. Forcing freshness on stable content degrades recall without improving accuracy.
Tested with Pinecone v3, Weaviate 1.24, Kafka 3.7, Python 3.12, LangChain 0.2, sentence-transformers 3.0