Build a Customer Journey Agent for E-commerce in 45 Minutes

Create an AI agent that tracks, predicts, and personalizes every step of the customer journey — from first visit to repeat purchase.

Problem: Your E-commerce Store Treats Every Customer the Same

Most e-commerce stacks fire generic email sequences and show static recommendations. Meanwhile, 80% of customers abandon after one session because the experience never adapts.

A customer journey agent fixes this — it tracks behavioral signals, infers intent, and takes personalized actions at every stage automatically.

You'll learn:

  • How to model customer journey stages as agent state
  • How to wire up tools for cart data, product search, and messaging
  • How to deploy the agent on a real event stream

Time: 45 min | Level: Advanced


Why This Happens

Static personalization engines run batch jobs — they update recommendations every 24 hours and can't react to what a customer just did. A customer who just viewed a sold-out item in size M gets the same homepage as everyone else.

Agents solve this by maintaining state and running tools in response to live events.

Common symptoms of the missing-agent problem:

  • Email open rates below 15%
  • Cart abandonment above 70%
  • Zero personalization on the first session

Solution

We'll build this in Python using LangChain's tool-calling agent and a simple Postgres event store. The architecture looks like this:

Customer journey agent architecture diagram Events flow in from your storefront, the agent decides what action to take, tools execute it


Step 1: Model the Journey Stages

Define the stages your agent will reason about. Keep it to 5 or fewer — agents degrade with too many states.

# journey_stages.py
from enum import Enum

class JourneyStage(str, Enum):
    DISCOVERY     = "discovery"      # First visit, no account
    CONSIDERATION = "consideration"  # Viewed 2+ products, no cart
    INTENT        = "intent"         # Added to cart, no checkout
    CHECKOUT      = "checkout"       # Started checkout
    RETENTION     = "retention"      # Past purchaser, returning

def infer_stage(events: list[dict]) -> JourneyStage:
    """
    Infer stage from raw event history.
    Events are ordered oldest-first.
    """
    event_types = {e["type"] for e in events}
    
    if "purchase_completed" in event_types:
        return JourneyStage.RETENTION
    if "checkout_started" in event_types:
        return JourneyStage.CHECKOUT
    if "cart_item_added" in event_types:
        return JourneyStage.INTENT
    
    product_views = [e for e in events if e["type"] == "product_viewed"]
    if len(product_views) >= 2:
        return JourneyStage.CONSIDERATION
    
    return JourneyStage.DISCOVERY

Expected: infer_stage([{"type": "cart_item_added"}]) returns JourneyStage.INTENT


Step 2: Build the Agent Tools

Each tool is an action the agent can take. Start with three — you can add more once the core loop works.

# tools.py
from langchain.tools import tool
from typing import Optional

@tool
def fetch_cart(customer_id: str) -> dict:
    """
    Returns the current cart contents for a customer.
    Use this before sending any cart-related message.
    """
    # Replace with your actual cart service call
    cart = db.query(
        "SELECT product_id, quantity, price FROM cart_items WHERE customer_id = %s",
        (customer_id,)
    )
    return {
        "items": cart,
        "total": sum(item["price"] * item["quantity"] for item in cart),
        "item_count": len(cart)
    }


@tool
def search_related_products(product_id: str, limit: int = 3) -> list[dict]:
    """
    Returns products frequently bought with the given product.
    Use when customer is in CONSIDERATION or INTENT stage.
    """
    return recommendation_engine.similar(product_id, k=limit)


@tool
def send_targeted_message(
    customer_id: str,
    channel: str,          # "email" | "sms" | "push"
    template_key: str,
    context: dict
) -> bool:
    """
    Sends a personalized message. Always call fetch_cart first
    if the template involves cart items.
    """
    template = load_template(template_key)
    rendered = template.render(**context)
    return messaging_service.send(customer_id, channel, rendered)

If it fails:

  • TypeError: db is not defined: Wire up your actual DB client before running — these tools need real dependencies injected.
  • template not found: Create a /templates folder with at least cart_reminder.jinja2 and welcome.jinja2.

Step 3: Create the Agent

# agent.py
from langchain_anthropic import ChatAnthropic
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate

from tools import fetch_cart, search_related_products, send_targeted_message
from journey_stages import JourneyStage

SYSTEM_PROMPT = """You are a customer journey agent for an e-commerce store.

Your job: given a customer's current journey stage and recent events, 
decide on the SINGLE best action to take right now.

Rules:
- DISCOVERY: Welcome them, don't push products yet
- CONSIDERATION: Surface related products based on what they viewed
- INTENT: Remind about cart if 30+ minutes since last activity
- CHECKOUT: Never interrupt — let them finish
- RETENTION: Reward loyalty, surface new arrivals in their preferred categories

Always call fetch_cart before referencing specific cart items.
If no action is needed, respond with "no_action" and explain why.
"""

def build_agent():
    llm = ChatAnthropic(model="claude-opus-4-6", temperature=0)
    tools = [fetch_cart, search_related_products, send_targeted_message]
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", SYSTEM_PROMPT),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ])
    
    agent = create_tool_calling_agent(llm, tools, prompt)
    return AgentExecutor(agent=agent, tools=tools, verbose=True)


def run_journey_agent(customer_id: str, stage: JourneyStage, events: list[dict]):
    agent = build_agent()
    
    input_text = f"""
    Customer ID: {customer_id}
    Current Stage: {stage.value}
    Recent Events (last 10): {events[-10:]}
    
    What action should we take right now?
    """
    
    return agent.invoke({"input": input_text})

Step 4: Connect to Your Event Stream

Wire the agent to fire on incoming events. This example uses a simple Kafka consumer — swap for your event bus.

# consumer.py
from kafka import KafkaConsumer
import json

from journey_stages import infer_stage
from agent import run_journey_agent

TRIGGER_EVENTS = {
    "cart_item_added",
    "checkout_abandoned",
    "product_viewed",
    "purchase_completed",
    "session_started",
}

consumer = KafkaConsumer(
    "storefront-events",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda m: json.loads(m.decode("utf-8"))
)

for message in consumer:
    event = message.value
    
    # Only run agent on meaningful trigger events
    if event["type"] not in TRIGGER_EVENTS:
        continue
    
    customer_id = event["customer_id"]
    history = fetch_event_history(customer_id, limit=50)
    stage = infer_stage(history)
    
    result = run_journey_agent(customer_id, stage, history)
    print(f"[{customer_id}] Stage: {stage} | Action: {result['output']}")

Expected: Each qualifying event triggers one agent run. You'll see tool calls logged with verbose=True.

Terminal output showing agent reasoning through a cart abandonment event The agent fetches cart, picks the right template, and fires an SMS — all in one run


Verification

Run the test suite against a mock event stream:

pip install pytest pytest-asyncio langchain-anthropic kafka-python
pytest tests/test_journey_agent.py -v

You should see:

PASSED test_discovery_stage_no_push
PASSED test_intent_stage_sends_cart_reminder
PASSED test_checkout_stage_no_interruption
PASSED test_retention_stage_loyalty_message

If any test fails, check that your ANTHROPIC_API_KEY env var is set and that the mock DB fixtures match the schema in Step 1.


What You Learned

  • Journey stage inference from raw event data — keep it simple, 5 stages max
  • LangChain tool-calling agents work well when each tool has a clear, narrow purpose
  • The CHECKOUT stage rule (never interrupt) prevents the most common agent mistake: over-messaging at the worst time

Limitations to know:

  • This runs one agent per event — at high volume (10k+ events/sec), add a queue and batch similar customers
  • temperature=0 is intentional: you want deterministic behavior in production messaging, not creative variations
  • Don't add more than 6–7 tools without testing degradation — LLMs lose track of tool selection above that threshold

Tested on Python 3.12, LangChain 0.3.x, langchain-anthropic 0.3.x, Kafka 3.7