Problem: RAG Demos Work, Production Pipelines Break
Most Gemini + LangChain tutorials get you to a working chatbot in 50 lines. Then you hit production: retrieval returns irrelevant chunks, the model hallucinates on edge cases, latency spikes under load, and there's no visibility into what's failing.
This guide skips the demo. You'll build a pipeline designed to survive real workloads.
You'll learn:
- How to configure ChatGoogleGenerativeAI and GoogleGenerativeAIEmbeddings correctly for Gemini 2.0 Flash
- How to structure a retrieval chain with reranking and metadata filtering
- How to add streaming, retry logic, and LangSmith tracing from the start
Time: 25 min | Difficulty: Intermediate
Why Gemini 2.0 Flash for RAG
Gemini 2.0 Flash has a 1M token context window, which changes RAG tradeoffs — you can stuff more context per call and reduce retrieval errors. It also costs significantly less than GPT-4o at equivalent quality for retrieval-augmented tasks.
The catch: LangChain's langchain-google-genai package has several configuration footguns that aren't obvious from the docs. This guide covers them.
Solution
Step 1: Install Dependencies
# Use uv for fast installs (pip works too)
uv add langchain langchain-google-genai langchain-community chromadb tiktoken
# Used in later steps: env loading, retries, hybrid retrieval, and the API layer
uv add python-dotenv tenacity rank-bm25 fastapi uvicorn
# For LangSmith tracing (recommended for production)
uv add langsmith
Verify the key packages:
python -c "import langchain_google_genai; print(langchain_google_genai.__version__)"
# Expected: 2.x.x
If you see ModuleNotFoundError: langchain-google-genai is a separate package from langchain-community, so install it explicitly; it's not a transitive dependency.
Step 2: Configure Environment Variables
# .env
GOOGLE_API_KEY=your_key_here
# Optional but strongly recommended
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=your_langsmith_key
LANGCHAIN_PROJECT=rag-production
Get your Google API key from Google AI Studio. Gemini 2.0 Flash is available on the free tier with rate limits, and on pay-as-you-go with higher throughput.
Step 3: Initialize the LLM and Embedding Model
import os
from dotenv import load_dotenv
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
load_dotenv()
# Gemini 2.0 Flash: fast, cheap, 1M context
llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash",
    temperature=0.1,  # Low temp for factual RAG answers
    max_output_tokens=2048,
    # convert_system_message_to_human was only needed for older Gemini versions;
    # Gemini 2.0 supports system messages natively, so leave it out
)
# text-embedding-004 outperforms Google's earlier embedding-001 on MTEB benchmarks
embeddings = GoogleGenerativeAIEmbeddings(
    model="models/text-embedding-004",
    task_type="retrieval_document",  # Optimizes embeddings for retrieval, not similarity
)
Common mistake: Using task_type="similarity" for document embeddings. Set retrieval_document for the embedding store and retrieval_query for query-time embeddings. This alone can improve retrieval precision by 10–15%.
Step 4: Build the Vector Store
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader, TextLoader
# Load documents from a local directory
loader = DirectoryLoader(
    "./docs",
    glob="**/*.txt",
    loader_cls=TextLoader,
    show_progress=True,
)
raw_docs = loader.load()
# Chunk size 1000 with 200 overlap is a solid default for most content
# Increase chunk_size for technical docs; decrease for Q&A datasets
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    length_function=len,
    add_start_index=True,  # Adds char offset to metadata, useful for debugging
)
docs = splitter.split_documents(raw_docs)
print(f"Split {len(raw_docs)} documents into {len(docs)} chunks")
# Persist to disk so you don't re-embed on every restart
vectorstore = Chroma.from_documents(
    documents=docs,
    embedding=embeddings,
    persist_directory="./chroma_db",
    collection_name="production_rag",
)
Expected output:
Split 42 documents into 387 chunks
For production, replace Chroma with pgvector (Postgres) or Qdrant. Chroma is fine for development and up to ~100k documents locally.
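To build intuition for how chunk_size and chunk_overlap interact, here is a minimal pure-Python sliding-window splitter. It is an illustration only: the real RecursiveCharacterTextSplitter additionally prefers paragraph and sentence boundaries before falling back to raw character windows.

```python
def sliding_window_chunks(text: str, chunk_size: int = 1000, overlap: int = 200):
    """Naive character-window chunker showing how size and overlap interact."""
    step = chunk_size - overlap  # Each window starts 800 chars after the last
    return [text[i:i + chunk_size] for i in range(0, max(len(text) - overlap, 1), step)]

doc = "x" * 2500
chunks = sliding_window_chunks(doc)
print(len(chunks))     # 3 windows cover 2500 chars
print(len(chunks[0]))  # 1000
print(len(chunks[-1])) # 900: the final window holds the tail
```

The 200-character overlap means the end of each chunk is repeated at the start of the next, so a sentence that straddles a boundary is still retrievable from at least one chunk.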
Step 5: Build the Retrieval Chain
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate
# Load an existing vectorstore (skip re-embedding)
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=embeddings,
    collection_name="production_rag",
)
# MMR retrieval reduces redundancy vs plain similarity search
# fetch_k=20 retrieves 20 candidates, then reranks to k=5
retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={
        "k": 5,
        "fetch_k": 20,
        "lambda_mult": 0.7,  # 1.0 = pure similarity, 0.0 = pure diversity
    },
)
# System prompt: be explicit about what to do when context is insufficient
system_prompt = """You are a helpful assistant answering questions based only on the provided context.
Context:
{context}
Rules:
- Answer only from the context above
- If the context doesn't contain the answer, say "I don't have enough information to answer that"
- Cite the source document when possible
- Be concise: 2–4 sentences unless asked for more
"""
prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    ("human", "{input}"),
])
# create_stuff_documents_chain concatenates retrieved docs into {context}
question_answer_chain = create_stuff_documents_chain(llm, prompt)
rag_chain = create_retrieval_chain(retriever, question_answer_chain)
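The MMR behavior behind search_type="mmr" and lambda_mult can be sketched in plain Python. This is a simplified greedy version over precomputed similarity scores, not LangChain's actual implementation:

```python
def mmr_select(relevance, pairwise, k=2, lambda_mult=0.7):
    """Greedily pick docs balancing query relevance against redundancy.

    relevance[i] = sim(query, doc i); pairwise[i][j] = sim(doc i, doc j).
    """
    selected, candidates = [], list(range(len(relevance)))
    while candidates and len(selected) < k:
        def score(i):
            redundancy = max((pairwise[i][j] for j in selected), default=0.0)
            return lambda_mult * relevance[i] - (1 - lambda_mult) * redundancy
        best = max(candidates, key=score)
        selected.append(best)
        candidates.remove(best)
    return selected

relevance = [0.9, 0.85, 0.6]                 # doc 1 is a near-duplicate of doc 0
pairwise = [[1.0, 0.95, 0.1],
            [0.95, 1.0, 0.1],
            [0.1, 0.1, 1.0]]
print(mmr_select(relevance, pairwise, k=2))  # [0, 2]: the near-duplicate is skipped
```

With lambda_mult=1.0 the same call returns [0, 1], i.e. pure similarity keeps the redundant document; 0.7 trades a little relevance for diversity.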
Step 6: Add Streaming and Retry Logic
Plain .invoke() blocks until the full response is ready — unacceptable for web UIs. Use .astream() with a retry wrapper.
import asyncio
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
from langchain_core.runnables import RunnableConfig
from google.api_core.exceptions import ResourceExhausted, ServiceUnavailable

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    # Retry only on Google API rate limit errors (429) and transient 5xx
    retry=retry_if_exception_type((ResourceExhausted, ServiceUnavailable)),
    reraise=True,
)
async def stream_rag_response(query: str):
    """Stream tokens from the RAG chain with automatic retry."""
    config = RunnableConfig(
        tags=["production", "rag"],  # Appears in LangSmith trace
        metadata={"query_length": len(query)},
    )
    full_answer = ""
    async for chunk in rag_chain.astream({"input": query}, config=config):
        # Retrieval chain streams: first the docs, then the answer tokens
        if "answer" in chunk:
            token = chunk["answer"]
            full_answer += token
            print(token, end="", flush=True)
    print()  # Newline after stream completes
    return full_answer

# Run it
asyncio.run(stream_rag_response("What are the main features of the product?"))
Expected behavior: Tokens stream to stdout as Gemini generates them. Latency to first token is typically 200–400ms on Gemini 2.0 Flash.
If it fails:
- google.api_core.exceptions.ResourceExhausted → You've hit the free tier rate limit. The retry decorator handles this, but consider upgrading to pay-as-you-go for production.
- AttributeError: 'str' object has no attribute 'content' → You're on an old langchain-google-genai version. Run uv add langchain-google-genai --upgrade.
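As a rough mental model (not tenacity's exact internals), the wait_exponential(multiplier=1, min=2, max=10) schedule grows like this:

```python
def backoff_delays(attempts: int, multiplier: float = 1, min_s: float = 2, max_s: float = 10):
    """Approximate delays: multiplier * 2**attempt, clamped to [min_s, max_s]."""
    return [min(max(multiplier * 2 ** n, min_s), max_s) for n in range(1, attempts + 1)]

print(backoff_delays(5))  # [2, 4, 8, 10, 10]; the cap keeps waits bounded
```

The max=10 cap matters under sustained rate limiting: without it, a few consecutive failures would push waits into minutes.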
Step 7: Add Query-Time Embedding Optimization
Using the same embedding model for both documents and queries is correct, but the task_type must differ:
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
# Query-optimized embeddings for the retriever
query_embeddings = GoogleGenerativeAIEmbeddings(
    model="models/text-embedding-004",
    task_type="retrieval_query",  # Different from the document embeddings above
)

# Hybrid retrieval: BM25 (keyword) + vector (semantic) = better coverage
bm25_retriever = BM25Retriever.from_documents(docs)
bm25_retriever.k = 5

# Re-open the store with query-task embeddings so they're used at search time
vectorstore = Chroma(
    persist_directory="./chroma_db",
    embedding_function=query_embeddings,
    collection_name="production_rag",
)
vector_retriever = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={"k": 5, "fetch_k": 20},
)
# EnsembleRetriever merges results with Reciprocal Rank Fusion
hybrid_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_retriever],
    weights=[0.4, 0.6],  # Favor semantic search; adjust based on your content
)

# Rebuild the chain with hybrid retrieval
rag_chain = create_retrieval_chain(
    hybrid_retriever,
    question_answer_chain,
)
Hybrid retrieval consistently outperforms pure vector search on keyword-heavy queries (product names, version numbers, technical terms). The BM25 component handles exact matches that embeddings can miss.
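The Reciprocal Rank Fusion that EnsembleRetriever uses is simple enough to sketch directly. This is a minimal standalone version with hypothetical doc IDs, not LangChain's code:

```python
def reciprocal_rank_fusion(rankings, weights=None, k=60):
    """Score each doc as the weighted sum of 1 / (k + rank) across retrievers."""
    weights = weights or [1.0] * len(rankings)
    scores = {}
    for weight, ranking in zip(weights, rankings):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

bm25_hits = ["doc3", "doc1", "doc7"]    # keyword ranking
vector_hits = ["doc1", "doc5", "doc3"]  # semantic ranking
print(reciprocal_rank_fusion([bm25_hits, vector_hits], weights=[0.4, 0.6]))
# doc1 wins: it ranks highly in both lists
```

Because RRF only looks at ranks, not raw scores, it fuses BM25 and cosine-similarity results without any score normalization step.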
Step 8: Wrap in a FastAPI Endpoint
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import json
app = FastAPI()
class QueryRequest(BaseModel):
    question: str
    max_length: int = 500

@app.post("/query")
async def query_rag(request: QueryRequest):
    if not request.question.strip():
        raise HTTPException(status_code=400, detail="Question cannot be empty")

    async def generate():
        async for chunk in rag_chain.astream({"input": request.question}):
            if "answer" in chunk:
                # Server-Sent Events format for easy frontend consumption
                yield f"data: {json.dumps({'token': chunk['answer']})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

@app.get("/health")
async def health():
    return {"status": "ok", "model": "gemini-2.0-flash"}
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
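The SSE wire format the endpoint emits is just prefixed text lines; a quick offline round-trip shows what a frontend must parse (illustration only, no server required):

```python
import json

def sse_event(payload: dict) -> str:
    """Encode one Server-Sent Events data frame."""
    return f"data: {json.dumps(payload)}\n\n"

def parse_sse(stream: str):
    """Decode token payloads back out, skipping the [DONE] sentinel."""
    tokens = []
    for line in stream.splitlines():
        if line.startswith("data: ") and line != "data: [DONE]":
            tokens.append(json.loads(line[6:])["token"])
    return tokens

wire = sse_event({"token": "Hello"}) + sse_event({"token": " world"}) + "data: [DONE]\n\n"
print(parse_sse(wire))  # ['Hello', ' world']
```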
Verification
Run a quick end-to-end test before deploying:
import json
import requests

response = requests.post(
    "http://localhost:8000/query",
    json={"question": "What is this product used for?"},
    stream=True,
)
for line in response.iter_lines():
    if line and line.startswith(b"data: ") and line != b"data: [DONE]":
        data = json.loads(line[6:])
        print(data["token"], end="", flush=True)
You should see: Streamed tokens appearing word-by-word, completing in under 3 seconds for most queries.
Check LangSmith for trace data:
open https://smith.langchain.com
# Navigate to your project: rag-production
# Inspect retrieval latency, chunk quality, and token counts per query
Caption: LangSmith trace showing retrieval latency (148ms), reranking, and Gemini generation time per query
Production Considerations
Context window vs. retrieval quality: Gemini 2.0 Flash's 1M token window tempts you to retrieve 50+ chunks. Resist. More chunks increase cost, latency, and the risk of the model losing focus. 5–10 well-reranked chunks outperform 30 mediocre ones.
Embedding drift: If you update your documents, re-embed everything. Mixing chunks from different embedding model versions in the same collection causes subtle retrieval bugs that are hard to diagnose.
Rate limits: Free tier is 15 requests/minute. Pay-as-you-go allows 1,000+ RPM. For anything beyond testing, enable billing in Google AI Studio.
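If you stay on the free tier during development, a small client-side limiter keeps you under the 15 RPM cap. A minimal sliding-window sketch (the class name and interface are made up for illustration):

```python
import time
from collections import deque

class SlidingWindowLimiter:
    """Block until a call slot is free within the window, e.g. 15 calls / 60s."""

    def __init__(self, max_calls: int, window_s: float):
        self.max_calls, self.window_s = max_calls, window_s
        self.calls = deque()  # timestamps of recent calls

    def acquire(self) -> None:
        now = time.monotonic()
        # Drop timestamps that have aged out of the window
        while self.calls and now - self.calls[0] >= self.window_s:
            self.calls.popleft()
        if len(self.calls) >= self.max_calls:
            # Sleep until the oldest call exits the window
            time.sleep(self.window_s - (now - self.calls[0]))
            self.calls.popleft()
        self.calls.append(time.monotonic())

limiter = SlidingWindowLimiter(max_calls=15, window_s=60.0)
# Call limiter.acquire() before each Gemini request
```

This throttles a single process only; with multiple uvicorn workers you would need a shared limiter (e.g. Redis-backed) instead.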
Cost estimate: At current Gemini 2.0 Flash pricing, a typical RAG query (5 chunks × 1000 tokens + 500 token response) costs roughly $0.0004. A production app handling 10,000 queries/day runs under $5/day.
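Token pricing changes, so rather than trusting any fixed figure, plug the current rates from the Gemini pricing page into a quick calculator. The rates below are illustrative placeholders, not quoted prices:

```python
def query_cost(input_tokens: int, output_tokens: int,
               usd_per_m_input: float, usd_per_m_output: float) -> float:
    """Per-query cost given per-million-token rates."""
    return (input_tokens * usd_per_m_input + output_tokens * usd_per_m_output) / 1_000_000

# 5 chunks x 1000 tokens in, 500 tokens out; rates are placeholders
c = query_cost(5_000, 500, usd_per_m_input=0.10, usd_per_m_output=0.40)
print(f"${c:.4f} per query")
```

Multiply by daily query volume to sanity-check your budget before launch.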
What You Learned
- task_type on embeddings (retrieval_document vs retrieval_query) is not optional: it directly affects retrieval accuracy
- Hybrid BM25 + vector retrieval handles keyword queries that pure semantic search misses
- Streaming with .astream() and tenacity retry logic is the minimum viable production setup
- LangSmith tracing from day one saves hours of debugging retrieval quality issues
When not to use this stack: If your documents change in real time (live support tickets, dynamic databases), a pure vector store approach won't keep up. Look at streaming ingestion pipelines with Kafka + pgvector instead.
Tested on langchain 0.3.x, langchain-google-genai 2.x, Gemini 2.0 Flash, Python 3.12, Ubuntu 24.04