Problem: Building Recommendations That Don't Lag
Your recommendation system returns stale suggestions because re-indexing takes minutes. Users who just watched a video or bought a product keep getting the same irrelevant results.
You'll learn:
- How to set up Milvus 3.0 with streaming index updates
- How to generate and store embeddings for real-time similarity search
- How to query recommendations with sub-10ms latency in production
Time: 45 min | Level: Advanced
Why This Happens
Traditional recommendation pipelines batch-process user events and rebuild indexes overnight. Milvus 3.0 introduces Streaming Node architecture, which separates write throughput from search latency. New vectors are queryable within milliseconds of insertion — no full re-index needed.
Common symptoms of the old approach:
- Recommendations don't reflect actions from the last hour
- Index rebuilds block writes during peak traffic
- Scaling search and ingestion requires the same hardware
Solution
Step 1: Install and Start Milvus 3.0
# Pull the Milvus 3.0 standalone image
docker pull milvusdb/milvus:v3.0.0
# Start with Docker Compose
curl -o docker-compose.yml https://raw.githubusercontent.com/milvus-io/milvus/v3.0.0/deployments/docker/standalone/docker-compose.yml
docker compose up -d
Expected: Milvus is running on localhost:19530 and the web UI on localhost:9091.
If it fails:
- Port 19530 in use: Change the host port in `docker-compose.yml` and update your connection string.
- etcd fails to start: Increase Docker memory allocation to at least 4GB.
Install the Python SDK:
pip install pymilvus==2.5.0
Step 2: Create the Collection Schema
A collection in Milvus is like a table — but built for vectors. This schema stores item embeddings alongside metadata for filtering.
from pymilvus import MilvusClient, DataType

client = MilvusClient(uri="http://localhost:19530")

# Define schema for a product recommendation collection
schema = client.create_schema(auto_id=False, enable_dynamic_field=True)
schema.add_field(
    field_name="item_id",
    datatype=DataType.INT64,
    is_primary=True,
)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,  # Match your embedding model's output dimension
)
schema.add_field(
    field_name="category",
    datatype=DataType.VARCHAR,
    max_length=64,
)

# HNSW index — best for real-time ANN search
index_params = client.prepare_index_params()
index_params.add_index(
    field_name="embedding",
    index_type="HNSW",
    metric_type="COSINE",
    params={"M": 16, "efConstruction": 200},
)

client.create_collection(
    collection_name="products",
    schema=schema,
    index_params=index_params,
)
Why HNSW here: Milvus 3.0's Streaming Node keeps an in-memory HNSW graph that merges new vectors without a full rebuild. This is what gives you real-time freshness.
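Because `M` and `ef` trade recall for speed, it helps to have an exact baseline to measure recall against. Here's a minimal brute-force cosine top-k in NumPy — independent of Milvus and practical only on small samples, but enough to spot-check that your index parameters aren't silently dropping good neighbors:

```python
import numpy as np

def brute_force_topk(query: np.ndarray, vectors: np.ndarray, k: int) -> np.ndarray:
    """Exact cosine top-k over a small sample, for measuring HNSW recall."""
    # Normalize so the dot product equals cosine similarity
    q = query / np.linalg.norm(query)
    v = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    sims = v @ q
    # Indices of the k most similar vectors, best first
    return np.argsort(-sims)[:k]
```

Recall@k is then the overlap between this result and the item IDs Milvus returns for the same query vector.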
Step 3: Generate and Ingest Embeddings
Use a sentence-transformer model to encode item descriptions into vectors. In production, run this step in your event pipeline (Kafka, Flink, etc.).
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-mpnet-base-v2")  # 768-dim output

def embed_items(items: list[dict]) -> list[dict]:
    """
    items: [{"item_id": 1, "description": "...", "category": "electronics"}]
    """
    descriptions = [item["description"] for item in items]
    embeddings = model.encode(descriptions, normalize_embeddings=True)
    return [
        {
            "item_id": item["item_id"],
            "embedding": emb.tolist(),
            "category": item["category"],
        }
        for item, emb in zip(items, embeddings)
    ]

# Ingest a batch
sample_items = [
    {"item_id": 1001, "description": "Wireless noise-cancelling headphones", "category": "electronics"},
    {"item_id": 1002, "description": "Over-ear studio monitor headphones", "category": "electronics"},
    {"item_id": 1003, "description": "Running shoes with carbon fiber plate", "category": "footwear"},
]
rows = embed_items(sample_items)
client.insert(collection_name="products", data=rows)
# Milvus 3.0: no manual flush needed — vectors are immediately searchable
Expected: No errors. New items are queryable within ~50ms on a local setup (single-digit ms on production hardware).
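For larger backfills, insert in fixed-size chunks rather than one giant call so request payloads stay bounded. A small chunking helper (the default batch size of 1,000 is an assumption — tune it for your row size):

```python
from itertools import islice
from typing import Iterable, Iterator

def batched(rows: Iterable[dict], size: int = 1000) -> Iterator[list[dict]]:
    """Yield successive fixed-size chunks of rows for batched inserts."""
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk
```

Then ingest with `for chunk in batched(rows): client.insert(collection_name="products", data=chunk)`.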
Step 4: Query Real-Time Recommendations
Given a user's recently viewed item, find the top-K most similar items in the catalog. Filter by category to keep results relevant.
def get_recommendations(
    viewed_item_id: int,
    top_k: int = 10,
    category_filter: str | None = None,
) -> list[dict]:
    # Fetch the embedding for the viewed item
    results = client.query(
        collection_name="products",
        filter=f"item_id == {viewed_item_id}",
        output_fields=["embedding", "category"],
    )
    if not results:
        return []
    query_vector = results[0]["embedding"]

    filter_expr = f'category == "{category_filter}"' if category_filter else ""

    # ANN search — this is where Milvus earns its keep
    hits = client.search(
        collection_name="products",
        data=[query_vector],
        limit=top_k + 1,  # +1 because the item itself may appear
        search_params={"params": {"ef": 64}},  # Higher ef = better recall, slightly slower
        filter=filter_expr,
        output_fields=["item_id", "category"],
    )

    # Remove the source item from results
    recommendations = [
        hit for hit in hits[0]
        if hit["entity"]["item_id"] != viewed_item_id
    ][:top_k]
    return recommendations

# Example call
recs = get_recommendations(viewed_item_id=1001, top_k=5, category_filter="electronics")
for rec in recs:
    print(f"item_id={rec['entity']['item_id']} score={rec['distance']:.4f}")
Expected output:
item_id=1002 score=0.9431
If it fails:
- "collection not loaded": Call
client.load_collection("products")before searching. Milvus auto-loads on first query in most configs, but standalone may require explicit loading. - Low recall: Increase
efinsearch_paramsup to 256 andefConstructionwhen re-creating the index.
Step 5: Handle User Event Streams
Real-time recommendation means ingesting new items and user events (reviews, interactions) as they happen. Here's a minimal async writer for a FastAPI event endpoint:
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ItemEvent(BaseModel):
    item_id: int
    description: str
    category: str

@app.post("/items")
async def ingest_item(event: ItemEvent):
    rows = embed_items([event.model_dump()])
    # Milvus 3.0 write path is non-blocking:
    # the Streaming Node buffers and indexes asynchronously
    client.insert(collection_name="products", data=rows)
    return {"status": "ingested", "item_id": event.item_id}
New items added via this endpoint are searchable in the next query — no manual trigger, no batch job.
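One caveat: `model.encode` is CPU-bound, and calling it directly inside an `async def` handler blocks the event loop for the duration of the forward pass. A small helper (standard library only) pushes the encoding to a worker thread so the server keeps accepting requests:

```python
import asyncio

async def encode_off_loop(embed_fn, items: list[dict]) -> list[dict]:
    """Run a CPU-bound embedding function in a worker thread,
    keeping the event loop free while the model encodes."""
    return await asyncio.to_thread(embed_fn, items)
```

In the handler, replace the direct call with `rows = await encode_off_loop(embed_items, [event.model_dump()])`. For higher throughput, a dedicated embedding worker behind a queue is the usual next step.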
Verification
Run the full pipeline end-to-end:
# Start the API server
uvicorn main:app --reload
# Insert a new item
curl -X POST http://localhost:8000/items \
-H "Content-Type: application/json" \
-d '{"item_id": 2001, "description": "Bluetooth sport earbuds IPX5", "category": "electronics"}'
# Query recommendations for item 1001 — item 2001 should appear
python -c "from main import get_recommendations; print(get_recommendations(1001, top_k=5))"
You should see: Item 2001 in the results within 1–2 seconds of insertion (dominated by embedding model latency, not Milvus).
What You Learned
- Milvus 3.0's Streaming Node eliminates the batch re-index bottleneck for real-time systems.
- HNSW with `M=16` and `efConstruction=200` balances build speed and recall for most catalogs up to ~10M items.
- The `ef` search parameter is your main latency/recall dial at query time — tune it per SLA.
Limitations to know:
- HNSW keeps both vectors and graph links in RAM. A 768-dim float32 vector is ~3KB before link overhead, so a 10M item catalog needs on the order of 30GB dedicated to Milvus.
- Cosine similarity requires normalized vectors — set `normalize_embeddings=True` in your encoder or results will be wrong.
- For catalogs over 50M items, move from standalone Docker to a distributed Milvus cluster.
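For capacity planning, a back-of-envelope HNSW memory estimate is raw float32 vector bytes plus per-node graph links. The 256-byte link figure below is an assumption (roughly `M=16` bidirectional 4-byte links across layers), not a Milvus-reported number:

```python
def hnsw_ram_gb(num_vectors: int, dim: int, link_bytes: int = 256) -> float:
    """Rough HNSW memory estimate: raw float32 vectors plus graph links.
    link_bytes is an assumed per-node overhead for M=16."""
    return num_vectors * (dim * 4 + link_bytes) / 1e9
```

For example, `hnsw_ram_gb(10_000_000, 768)` comes out to roughly 33GB — measure the real footprint on a staging load before sizing hardware.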
Tested on Milvus 3.0.0, pymilvus 2.5.0, sentence-transformers 3.x, Python 3.12, Ubuntu 24.04