Build a Real-Time Recommendation Engine with Milvus 3.0

Use the Milvus 3.0 vector database to build a production-ready recommendation engine with sub-10ms query latency and live index updates.

Problem: Building Recommendations That Don't Lag

Your recommendation system returns stale suggestions because re-indexing takes minutes. Users who just watched a video or bought a product keep getting the same irrelevant results.

You'll learn:

  • How to set up Milvus 3.0 with streaming index updates
  • How to generate and store embeddings for real-time similarity search
  • How to query recommendations with sub-10ms latency in production

Time: 45 min | Level: Advanced


Why This Happens

Traditional recommendation pipelines batch-process user events and rebuild indexes overnight. Milvus 3.0 introduces Streaming Node architecture, which separates write throughput from search latency. New vectors are queryable within milliseconds of insertion — no full re-index needed.

Common symptoms of the old approach:

  • Recommendations don't reflect actions from the last hour
  • Index rebuilds block writes during peak traffic
  • Scaling search and ingestion requires the same hardware

Solution

Step 1: Install and Start Milvus 3.0

# Pull the Milvus 3.0 standalone image
docker pull milvusdb/milvus:v3.0.0

# Start with Docker Compose
curl -o docker-compose.yml https://raw.githubusercontent.com/milvus-io/milvus/v3.0.0/deployments/docker/standalone/docker-compose.yml

docker compose up -d

Expected: Milvus is running on localhost:19530 and the web UI on localhost:9091.

If it fails:

  • Port 19530 in use: Change the host port in docker-compose.yml and update your connection string.
  • etcd fails to start: Increase Docker memory allocation to at least 4GB.
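Before moving on, you can sanity-check that both ports are reachable. This is a minimal sketch using only the standard library; it assumes the default ports from the compose file (19530 for gRPC, 9091 for the web UI):

```python
import socket

def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Default Milvus standalone ports: 19530 (gRPC API), 9091 (web UI / metrics)
for port in (19530, 9091):
    status = "open" if port_open("localhost", port) else "closed"
    print(f"localhost:{port} is {status}")
```

If either port reports closed, fix the Docker setup before touching the SDK — everything below assumes a reachable server.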

Install the Python SDK:

pip install pymilvus==2.5.0

Step 2: Create the Collection Schema

A collection in Milvus is like a table — but built for vectors. This schema stores item embeddings alongside metadata for filtering.

from pymilvus import MilvusClient, DataType

client = MilvusClient(uri="http://localhost:19530")

# Define schema for a product recommendation collection
schema = client.create_schema(auto_id=False, enable_dynamic_field=True)

schema.add_field(
    field_name="item_id",
    datatype=DataType.INT64,
    is_primary=True,
)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,  # Match your embedding model's output dimension
)
schema.add_field(
    field_name="category",
    datatype=DataType.VARCHAR,
    max_length=64,
)

# HNSW index — best for real-time ANN search
index_params = client.prepare_index_params()
index_params.add_index(
    field_name="embedding",
    index_type="HNSW",
    metric_type="COSINE",
    params={"M": 16, "efConstruction": 200},
)

client.create_collection(
    collection_name="products",
    schema=schema,
    index_params=index_params,
)

Why HNSW here: Milvus 3.0's Streaming Node keeps an in-memory HNSW graph that merges new vectors without a full rebuild. This is what gives you real-time freshness.


Step 3: Generate and Ingest Embeddings

Use a sentence-transformer model to encode item descriptions into vectors. In production, run this step in your event pipeline (Kafka, Flink, etc.).

from sentence_transformers import SentenceTransformer
import numpy as np

model = SentenceTransformer("all-mpnet-base-v2")  # 768-dim output

def embed_items(items: list[dict]) -> list[dict]:
    """
    items: [{"item_id": 1, "description": "...", "category": "electronics"}]
    """
    descriptions = [item["description"] for item in items]
    embeddings = model.encode(descriptions, normalize_embeddings=True)
    
    return [
        {
            "item_id": item["item_id"],
            "embedding": emb.tolist(),
            "category": item["category"],
        }
        for item, emb in zip(items, embeddings)
    ]

# Ingest a batch
sample_items = [
    {"item_id": 1001, "description": "Wireless noise-cancelling headphones", "category": "electronics"},
    {"item_id": 1002, "description": "Over-ear studio monitor headphones", "category": "electronics"},
    {"item_id": 1003, "description": "Running shoes with carbon fiber plate", "category": "footwear"},
]

rows = embed_items(sample_items)
client.insert(collection_name="products", data=rows)

# Milvus 3.0: no manual flush needed — vectors are immediately searchable

Expected: No errors. New items are queryable within ~50ms on a local setup (single-digit ms on production hardware).
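A quick aside on normalize_embeddings=True: for unit-length vectors, cosine similarity reduces to a plain dot product, which is why normalized embeddings behave the same under COSINE and inner-product metrics. A small NumPy sketch with toy vectors (not real embeddings):

```python
import numpy as np

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    """Full cosine similarity: dot product over the product of norms."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

rng = np.random.default_rng(42)
a = rng.normal(size=768)
b = rng.normal(size=768)

# Normalize to unit length, as sentence-transformers does with normalize_embeddings=True
a_unit = a / np.linalg.norm(a)
b_unit = b / np.linalg.norm(b)

# For unit vectors, the dot product IS the cosine similarity
assert abs(float(np.dot(a_unit, b_unit)) - cosine(a, b)) < 1e-9
print(f"cosine={cosine(a, b):.6f}  dot(unit)={float(np.dot(a_unit, b_unit)):.6f}")
```

This is also why the scores you see later in search results fall in [-1, 1].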


Step 4: Query Real-Time Recommendations

Given a user's recently viewed item, find the top-K most similar items in the catalog. Filter by category to keep results relevant.

def get_recommendations(
    viewed_item_id: int,
    top_k: int = 10,
    category_filter: str | None = None,
) -> list[dict]:
    # Fetch the embedding for the viewed item
    results = client.query(
        collection_name="products",
        filter=f"item_id == {viewed_item_id}",
        output_fields=["embedding", "category"],
    )

    if not results:
        return []

    query_vector = results[0]["embedding"]
    filter_expr = f'category == "{category_filter}"' if category_filter else ""

    # ANN search — this is where Milvus earns its keep
    hits = client.search(
        collection_name="products",
        data=[query_vector],
        limit=top_k + 1,  # +1 because the item itself may appear
        search_params={"params": {"ef": 64}},  # Higher ef = better recall, slightly slower
        filter=filter_expr,
        output_fields=["item_id", "category"],
    )

    # Remove the source item from results
    recommendations = [
        hit for hit in hits[0]
        if hit["entity"]["item_id"] != viewed_item_id
    ][:top_k]

    return recommendations


# Example call
recs = get_recommendations(viewed_item_id=1001, top_k=5, category_filter="electronics")
for rec in recs:
    print(f"item_id={rec['entity']['item_id']}  score={rec['distance']:.4f}")

Expected output:

item_id=1002  score=0.9431

If it fails:

  • "collection not loaded": Call client.load_collection("products") before searching. Milvus auto-loads on first query in most configs, but standalone may require explicit loading.
  • Low recall: Increase ef in search_params up to 256 and efConstruction when re-creating the index.
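The post-filtering at the end of get_recommendations (dropping the source item, then truncating to top_k) can be exercised without a running server. A sketch using hypothetical mock hits shaped like MilvusClient.search results:

```python
def dedupe_hits(hits: list[dict], viewed_item_id: int, top_k: int) -> list[dict]:
    """Drop the source item from a hit list, then truncate to top_k."""
    return [h for h in hits if h["entity"]["item_id"] != viewed_item_id][:top_k]

# Mock hits in the shape MilvusClient.search returns for one query vector
mock_hits = [
    {"id": 1001, "distance": 1.0000, "entity": {"item_id": 1001}},  # the query item itself
    {"id": 1002, "distance": 0.9431, "entity": {"item_id": 1002}},
    {"id": 2001, "distance": 0.8712, "entity": {"item_id": 2001}},
]

recs = dedupe_hits(mock_hits, viewed_item_id=1001, top_k=2)
print([r["entity"]["item_id"] for r in recs])  # → [1002, 2001]
```

Keeping this step pure makes it trivial to unit-test the recommendation endpoint without standing up Milvus.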

Step 5: Handle User Event Streams

Real-time recommendations mean ingesting user-generated content (reviews, interactions) as it arrives. Here's a minimal async writer for a FastAPI event endpoint:

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ItemEvent(BaseModel):
    item_id: int
    description: str
    category: str

@app.post("/items")
async def ingest_item(event: ItemEvent):
    rows = embed_items([event.model_dump()])
    
    # Milvus 3.0 write path is non-blocking
    # The Streaming Node buffers and indexes asynchronously
    client.insert(collection_name="products", data=rows)
    
    return {"status": "ingested", "item_id": event.item_id}

New items added via this endpoint are searchable in the next query — no manual trigger, no batch job.
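Per-request inserts are fine at moderate traffic; at high event rates you may want to micro-batch writes before handing them to client.insert. A minimal sketch with a stubbed flush function — the buffer size and flush interval are illustrative, not Milvus recommendations:

```python
import time

class BatchWriter:
    """Buffer rows and flush in batches, by size or by age."""

    def __init__(self, flush_fn, max_rows: int = 100, max_age_s: float = 0.5):
        self.flush_fn = flush_fn  # e.g. lambda rows: client.insert("products", rows)
        self.max_rows = max_rows
        self.max_age_s = max_age_s
        self.buffer: list[dict] = []
        self.oldest = 0.0

    def add(self, row: dict) -> None:
        if not self.buffer:
            self.oldest = time.monotonic()
        self.buffer.append(row)
        if len(self.buffer) >= self.max_rows or time.monotonic() - self.oldest >= self.max_age_s:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.flush_fn(self.buffer)
            self.buffer = []

# Stub flush that just records batches, standing in for client.insert
batches = []
writer = BatchWriter(batches.append, max_rows=3)
for i in range(7):
    writer.add({"item_id": i})
writer.flush()  # drain the remainder
print([len(b) for b in batches])  # → [3, 3, 1]
```

In the FastAPI handler you would call writer.add(rows[0]) instead of client.insert; freshness drops from "immediate" to "within max_age_s", which is often an acceptable trade for write throughput.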


Verification

Run the full pipeline end-to-end:

# Start the API server
uvicorn main:app --reload

# Insert a new item
curl -X POST http://localhost:8000/items \
  -H "Content-Type: application/json" \
  -d '{"item_id": 2001, "description": "Bluetooth sport earbuds IPX5", "category": "electronics"}'

# Query recommendations for item 1001 — item 2001 should appear
python -c "from main import get_recommendations; print(get_recommendations(1001, top_k=5))"

You should see: Item 2001 in the results within 1–2 seconds of insertion (dominated by embedding model latency, not Milvus).


What You Learned

  • Milvus 3.0's Streaming Node eliminates the batch re-index bottleneck for real-time systems.
  • HNSW with M=16 and efConstruction=200 balances build speed and recall for most catalogs up to ~10M items.
  • The ef search parameter is your main latency/recall dial at query time — tune it per SLA.

Limitations to know:

  • HNSW is memory-hungry: a 768-dim float32 vector alone takes ~3 KB, plus roughly 128 bytes of graph links at M=16. A 10M item catalog needs roughly 30GB of RAM dedicated to Milvus.
  • With the IP (inner product) metric, vectors must be normalized (normalize_embeddings=True in your encoder) or scores won't be cosine similarities. Milvus's native COSINE metric handles unnormalized vectors, but normalizing at encode time keeps IP and COSINE interchangeable.
  • For catalogs over 50M items, move from standalone Docker to a distributed Milvus cluster.
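HNSW RAM needs can be estimated with back-of-the-envelope math: raw float32 vectors cost 4 bytes per dimension, and the base-layer graph adds roughly M × 2 neighbor links of 4 bytes each per vector. A rough estimator — an approximation only, as real usage varies by Milvus version and segment layout:

```python
def hnsw_memory_gb(n_vectors: int, dim: int, M: int = 16) -> float:
    """Rough HNSW RAM estimate: float32 vector data plus base-layer neighbor links."""
    bytes_per_vector = dim * 4       # float32 vector data
    bytes_per_vector += M * 2 * 4    # ~2M neighbor ids (4 bytes each) at layer 0
    return n_vectors * bytes_per_vector / 1024**3

# 10M items at 768 dimensions, M=16
print(f"{hnsw_memory_gb(10_000_000, 768):.1f} GB")  # roughly 30 GB
```

Run this against your own catalog size before choosing between standalone and a cluster.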

Tested on Milvus 3.0.0, pymilvus 2.5.0, sentence-transformers 3.x, Python 3.12, Ubuntu 24.04