Build an MCP-Enabled Python Tool for Real-Time Data Analysis in 30 Minutes

Create a Model Context Protocol server in Python that streams live data analysis to Claude with proper error handling and WebSocket support.

Problem: Claude Can't Access Your Live Data Streams

You need Claude to analyze real-time metrics, logs, or sensor data, but every MCP server example you've seen only works with static files or APIs that return complete responses.

You'll learn:

  • How to stream live data through MCP's stdio transport
  • Proper error handling for production deployments
  • When MCP is better than direct API integration

Time: 30 min | Level: Intermediate


Why This Happens

MCP (Model Context Protocol) uses stdio by default, which seems incompatible with streaming. Most examples show one-shot data fetching, not continuous monitoring or real-time analysis.

Common symptoms:

  • MCP tools time out on long-running operations
  • Data updates don't reach Claude without restarting
  • Unclear how to handle WebSocket or event streams in MCP

Reality: MCP stdio can handle these workloads if you collect data over a bounded window with proper async patterns and return one aggregated response.
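The core pattern can be sketched in a few lines: sample in an async loop until a deadline, then return one aggregated JSON payload. This standalone sketch uses illustrative timings and a stand-in metric; the full server built in the steps below follows the same shape.

```python
import asyncio
import json
from datetime import datetime, timezone

async def collect_then_return(duration_s: float, interval_s: float) -> str:
    """Collect samples over a window, then return one JSON payload.

    The MCP tool call blocks while sampling; Claude receives the
    aggregated result once the window closes.
    """
    samples = []
    loop = asyncio.get_running_loop()
    deadline = loop.time() + duration_s
    while loop.time() < deadline:
        samples.append({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "value": len(samples),  # stand-in for a real metric read
        })
        await asyncio.sleep(interval_s)
    return json.dumps({"count": len(samples), "samples": samples})

result = json.loads(asyncio.run(collect_then_return(0.3, 0.1)))
print(result["count"])
```

The tool call stays synchronous from Claude's point of view; the "streaming" happens entirely inside the window.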


Solution

Step 1: Install MCP SDK and Dependencies

# Create project
mkdir mcp-realtime-analyzer
cd mcp-realtime-analyzer
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# Install the MCP Python SDK
pip install mcp

# For real-time data sources
pip install websockets pandas

Expected: SDK installs without errors. If you see dependency conflicts, use Python 3.11+.

If it fails:

  • Error: "No module named 'mcp'": Make sure the venv is activated; the SDK is published on PyPI as mcp
  • Windows path issues: Use forward slashes in file paths

Step 2: Create the MCP Server Structure

# server.py
import asyncio
import json
from datetime import datetime
from typing import Any
from mcp.server import Server
from mcp.types import Tool, TextContent

# Initialize MCP server
app = Server("realtime-analyzer")

@app.list_tools()
async def list_tools() -> list[Tool]:
    """Register available tools with Claude."""
    return [
        Tool(
            name="stream_metrics",
            description="Stream real-time system metrics (CPU, memory, network) with 1-second intervals. Returns analysis-ready JSON.",
            inputSchema={
                "type": "object",
                "properties": {
                    "duration_seconds": {
                        "type": "number",
                        "description": "How long to monitor (max 60s for safety)",
                        "default": 10
                    },
                    "metric_type": {
                        "type": "string",
                        "enum": ["cpu", "memory", "network", "all"],
                        "description": "Which metrics to track",
                        "default": "all"
                    }
                },
                "required": []
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    """Execute tool and return streaming results."""
    if name == "stream_metrics":
        duration = min(arguments.get("duration_seconds", 10), 60)  # Cap at 60s
        metric_type = arguments.get("metric_type", "all")
        
        # Stream data in chunks
        results = await stream_system_metrics(duration, metric_type)
        
        return [TextContent(
            type="text",
            text=json.dumps(results, indent=2)
        )]
    
    raise ValueError(f"Unknown tool: {name}")

async def stream_system_metrics(duration: int, metric_type: str) -> dict:
    """Collect metrics over time and return aggregated results."""
    import psutil  # Imported lazily so the server can still start if psutil is missing
    
    metrics = {
        "start_time": datetime.now().isoformat(),
        "duration_seconds": duration,
        "samples": []
    }
    
    for i in range(duration):
        sample = {"timestamp": datetime.now().isoformat()}
        
        # Collect requested metrics
        if metric_type in ["cpu", "all"]:
            sample["cpu_percent"] = psutil.cpu_percent(interval=0.1)
        
        if metric_type in ["memory", "all"]:
            mem = psutil.virtual_memory()
            sample["memory_percent"] = mem.percent
            sample["memory_available_gb"] = mem.available / (1024**3)
        
        if metric_type in ["network", "all"]:
            net = psutil.net_io_counters()
            sample["bytes_sent"] = net.bytes_sent
            sample["bytes_recv"] = net.bytes_recv
        
        metrics["samples"].append(sample)
        
        # Sleep until next second (non-blocking)
        if i < duration - 1:
            await asyncio.sleep(1)
    
    # Add summary statistics
    if metrics["samples"]:
        metrics["summary"] = calculate_summary(metrics["samples"], metric_type)
    
    return metrics

def calculate_summary(samples: list[dict], metric_type: str) -> dict:
    """Calculate min/max/avg for collected metrics."""
    summary = {}
    
    if metric_type in ["cpu", "all"] and "cpu_percent" in samples[0]:
        cpu_values = [s["cpu_percent"] for s in samples]
        summary["cpu"] = {
            "min": min(cpu_values),
            "max": max(cpu_values),
            "avg": sum(cpu_values) / len(cpu_values)
        }
    
    if metric_type in ["memory", "all"] and "memory_percent" in samples[0]:
        mem_values = [s["memory_percent"] for s in samples]
        summary["memory"] = {
            "min": min(mem_values),
            "max": max(mem_values),
            "avg": sum(mem_values) / len(mem_values)
        }
    
    return summary
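To make the aggregation concrete, here is what the min/max/avg reduction produces for a small hand-made sample set (a standalone sketch; the helper mirrors calculate_summary above rather than importing it):

```python
samples = [
    {"cpu_percent": 10.0, "memory_percent": 40.0},
    {"cpu_percent": 30.0, "memory_percent": 50.0},
    {"cpu_percent": 20.0, "memory_percent": 60.0},
]

def summarize(values: list[float]) -> dict:
    # The same per-metric reduction that calculate_summary applies
    return {"min": min(values), "max": max(values), "avg": sum(values) / len(values)}

cpu = summarize([s["cpu_percent"] for s in samples])
mem = summarize([s["memory_percent"] for s in samples])
print(cpu)  # {'min': 10.0, 'max': 30.0, 'avg': 20.0}
print(mem)  # {'min': 40.0, 'max': 60.0, 'avg': 50.0}
```

Returning the summary alongside the raw samples lets Claude answer "anything unusual?" without re-deriving the statistics itself.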

# Run server on stdio (MCP standard)
if __name__ == "__main__":
    import mcp.server.stdio
    
    async def main():
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await app.run(
                read_stream,
                write_stream,
                app.create_initialization_options()
            )
    
    asyncio.run(main())

Why this works: MCP doesn't require WebSockets for "streaming": you collect data over time and return the full result in one response. Claude waits for the complete response, which can take up to 60 seconds.


Step 3: Install Missing Dependency

pip install psutil

Expected: Installs system monitoring library.


Step 4: Configure MCP in Claude Desktop

Create or edit ~/Library/Application Support/Claude/claude_desktop_config.json (macOS) or %APPDATA%\Claude\claude_desktop_config.json (Windows):

{
  "mcpServers": {
    "realtime-analyzer": {
      "command": "python",
      "args": [
        "/absolute/path/to/mcp-realtime-analyzer/server.py"
      ],
      "env": {
        "PYTHONUNBUFFERED": "1"
      }
    }
  }
}

Critical: Replace /absolute/path/to/ with your actual project path. Use forward slashes even on Windows.

If it fails:

  • Server not appearing: Restart Claude Desktop completely (not just refresh)
  • "Module not found": Activate venv first: "command": "/path/to/venv/bin/python"
  • Silent failure: Check logs at ~/Library/Logs/Claude/mcp-*.log
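Putting the last two fixes together, a config that points directly at the venv interpreter looks like this (paths are illustrative; substitute your own):

```json
{
  "mcpServers": {
    "realtime-analyzer": {
      "command": "/absolute/path/to/mcp-realtime-analyzer/venv/bin/python",
      "args": ["/absolute/path/to/mcp-realtime-analyzer/server.py"],
      "env": { "PYTHONUNBUFFERED": "1" }
    }
  }
}
```

Pointing command at the venv's python avoids "Module not found" errors when Claude Desktop launches the server outside your activated shell.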

Step 5: Test from Claude

Restart Claude Desktop, then ask:

Monitor my system CPU and memory for 10 seconds and tell me if anything looks unusual.

Claude will call your stream_metrics tool and analyze the results.


Verification

Test it:

# Run server directly to test (not through Claude)
python server.py

Then paste these MCP protocol messages into the terminal, one line at a time. The server requires an initialize handshake before it will answer tools/list (the protocolVersion string must match one your SDK version supports):

{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"manual-test","version":"0.0.1"}}}
{"jsonrpc":"2.0","method":"notifications/initialized"}
{"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}

You should see: a JSON response listing the stream_metrics tool.

Production check:

  • Server starts without errors
  • Tool appears in Claude's MCP menu
  • 10-second monitoring completes successfully
  • Results include summary statistics

Advanced: Handle External Data Streams

WebSocket Integration

# Extend server.py. Note: merge the new tool into your existing
# list_tools/call_tool functions; registering a second handler with the
# same decorator replaces the first one.
import websockets

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        # ... existing tools ...
        Tool(
            name="monitor_websocket",
            description="Connect to a WebSocket and analyze messages over time",
            inputSchema={
                "type": "object",
                "properties": {
                    "ws_url": {"type": "string", "description": "WebSocket URL"},
                    "duration_seconds": {"type": "number", "default": 30}
                },
                "required": ["ws_url"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    if name == "monitor_websocket":
        url = arguments["ws_url"]
        duration = min(arguments.get("duration_seconds", 30), 60)
        
        messages = []
        start_time = asyncio.get_event_loop().time()
        
        try:
            async with websockets.connect(url, ping_interval=10) as ws:
                while asyncio.get_event_loop().time() - start_time < duration:
                    try:
                        # Wait for message with timeout
                        msg = await asyncio.wait_for(ws.recv(), timeout=5.0)
                        messages.append({
                            "timestamp": datetime.now().isoformat(),
                            "data": msg
                        })
                    except asyncio.TimeoutError:
                        continue  # No message in 5s, keep waiting
        except Exception as e:
            return [TextContent(
                type="text",
                text=f"WebSocket error: {str(e)}"
            )]
        
        return [TextContent(
            type="text",
            text=json.dumps({
                "total_messages": len(messages),
                "duration_seconds": duration,
                "messages": messages[-50:]  # Last 50 only to avoid huge responses
            }, indent=2)
        )]
    
    # ... existing tools ...

Why this pattern: Collect all data during the duration, then return once. MCP doesn't support true streaming responses, but this approach works for up to 60-second monitoring windows.
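The receive-with-timeout loop above can be exercised without a live WebSocket by draining an asyncio.Queue instead (a standalone sketch of the same pattern; names and timings are illustrative):

```python
import asyncio

async def drain_with_timeout(queue: asyncio.Queue, duration: float) -> list:
    """Collect messages until the window closes; short recv timeouts keep
    the loop responsive even when the source goes quiet."""
    messages = []
    loop = asyncio.get_running_loop()
    deadline = loop.time() + duration
    while loop.time() < deadline:
        try:
            msg = await asyncio.wait_for(queue.get(), timeout=0.05)
            messages.append(msg)
        except asyncio.TimeoutError:
            continue  # no message this tick; keep waiting until the deadline
    return messages

async def demo() -> list:
    q = asyncio.Queue()
    for i in range(3):
        q.put_nowait(f"msg-{i}")
    return await drain_with_timeout(q, duration=0.2)

print(asyncio.run(demo()))  # ['msg-0', 'msg-1', 'msg-2']
```

The per-receive timeout matters: without it, a silent source would block recv forever and the deadline would never be checked.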


Error Handling for Production

Add Timeout Protection

# Wrap long-running operations (asyncio.timeout requires Python 3.11+)
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
    try:
        # Set maximum execution time
        async with asyncio.timeout(65):  # Slightly over max duration
            if name == "stream_metrics":
                # ... existing code ...
                pass
    except asyncio.TimeoutError:
        return [TextContent(
            type="text",
            text="Operation timed out. Reduce duration_seconds or check data source."
        )]
    except Exception as e:
        # Log error but return useful message
        import traceback
        error_details = traceback.format_exc()
        
        # In production, log to file
        with open("/tmp/mcp-errors.log", "a") as f:
            f.write(f"{datetime.now()}: {error_details}\n")
        
        return [TextContent(
            type="text",
            text=f"Tool error: {str(e)}. Check server logs for details."
        )]

What You Learned

  • MCP stdio handles "streaming" by collecting data over time then returning it
  • Use async patterns to avoid blocking the server during long operations
  • Cap durations at 60 seconds to prevent timeouts
  • Production MCP servers need comprehensive error handling

Limitations:

  • Not true real-time streaming (no incremental updates to Claude)
  • 60-second practical limit per tool call
  • Claude must wait for complete response before analyzing

When NOT to use this:

  • Data that updates faster than 1/second (use direct API instead)
  • Continuous monitoring >60 seconds (batch into multiple calls)
  • Binary data or large files (use resources, not tools)

Troubleshooting

"Server appears but tool calls fail"

  • Check PYTHONUNBUFFERED=1 in config (prevents output buffering)
  • Verify virtual environment Python path is correct
  • Look at MCP logs: ~/Library/Logs/Claude/mcp-realtime-analyzer.log

"ModuleNotFoundError: psutil"

  • Install in the correct venv: ./venv/bin/pip install psutil
  • Use full path to venv Python in claude_desktop_config.json

"Tool times out after 30 seconds"

  • This is expected behavior for longer operations
  • Reduce duration_seconds to 20 or less
  • For longer monitoring, batch into multiple 30s calls

"Results show empty samples array"

  • psutil might need sudo on some Linux systems for network stats
  • Try metric_type: "cpu" only to isolate the issue
  • Check if psutil works outside MCP: python -c "import psutil; print(psutil.cpu_percent())"

Tested on Python 3.11, MCP SDK 0.9.x, macOS Sonoma & Ubuntu 24.04

GitHub: Example repository for full implementation with tests.