Anomaly Detection in Trading Data: Ollama Outlier and Pattern Recognition Guide

Learn how to detect trading anomalies using Ollama's AI models. Build robust pattern recognition systems for market outliers and financial data analysis.

Picture this: Your trading algorithm just flagged a $50,000 anomaly at 3 AM while you were dreaming about profits. Was it a genuine market opportunity or a data glitch that could cost you everything?

Trading data flows like a digital river—millions of data points rushing past every second. Hidden within this stream are anomalies that can make or break your trading strategy. Anomaly detection in trading data helps you spot these crucial outliers before they impact your bottom line.

This guide shows you how to build robust anomaly detection systems using Ollama's local AI models. You'll learn to identify market outliers, recognize suspicious patterns, and create automated alerts that protect your trading operations.

What Makes Trading Data Anomalies So Dangerous?

The Hidden Costs of Undetected Anomalies

Undetected anomalies can cost trading firms dearly. A single outlier that slips through can:

  • Trigger false buy/sell signals
  • Corrupt your algorithm's learning process
  • Generate massive losses from bad data
  • Violate regulatory compliance requirements

Common Trading Data Anomalies

Price Anomalies

  • Flash crashes and sudden spikes
  • Bid-ask spread irregularities
  • Volume-price disconnects

Volume Anomalies

  • Unusual trading volume bursts
  • Dark pool activity patterns
  • Market maker manipulation signals

Time Series Anomalies

  • Missing data gaps
  • Delayed price updates
  • Timestamp inconsistencies
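
Before reaching for an AI model, each of these categories can be flagged with a few lines of pandas. The sketch below assumes an hourly OHLCV DataFrame with a DatetimeIndex (the shape yfinance returns later in this guide); the thresholds are illustrative starting points, not tuned values.

```python
import pandas as pd
import numpy as np

def flag_basic_anomalies(df, z_thresh=3.0, max_gap='2h'):
    """Flag price spikes, volume bursts, and timestamp gaps in an OHLCV frame."""
    flags = pd.DataFrame(index=df.index)

    # Price anomalies: bar-to-bar returns far outside their own distribution
    returns = df['Close'].pct_change()
    flags['price_spike'] = (returns - returns.mean()).abs() > z_thresh * returns.std()

    # Volume anomalies: volume far above the rolling median
    median_vol = df['Volume'].rolling(24, min_periods=1).median()
    flags['volume_burst'] = df['Volume'] > 5 * median_vol

    # Time series anomalies: gaps between consecutive timestamps
    deltas = df.index.to_series().diff()
    flags['time_gap'] = deltas > pd.Timedelta(max_gap)

    return flags
```

These checks make a useful pre-filter: rows that trip any flag are worth feeding into the heavier detectors built in the sections below.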

Why Ollama Excels at Trading Anomaly Detection

Local Processing Advantages

Ollama processes your trading data locally, providing:

  • Low latency: No cloud round-trips or API rate limits
  • Complete privacy: Your data never leaves your system
  • Cost efficiency: No per-request charges
  • Offline capability: Works without internet connections

AI Model Flexibility

Ollama supports multiple models optimized for different anomaly types:

  • Llama 3.1: Best for pattern recognition
  • Mistral: Excellent for numerical analysis
  • CodeLlama: Perfect for algorithm debugging

Setting Up Your Ollama Trading Anomaly Detection System

Prerequisites and Installation

# Install Ollama on your trading server
curl -fsSL https://ollama.com/install.sh | sh

# Pull the recommended model for trading analysis
ollama pull llama3.1:8b

# Verify installation
ollama list

Required Python Libraries

# Install required packages
pip install ollama pandas numpy scikit-learn matplotlib yfinance

# Import essential libraries
import ollama
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt
import yfinance as yf
from datetime import datetime, timedelta

Building Your First Trading Anomaly Detector

Step 1: Data Collection and Preparation

class TradingDataCollector:
    def __init__(self, symbols=['AAPL', 'GOOGL', 'MSFT']):
        self.symbols = symbols
        self.data = {}
    
    def fetch_market_data(self, period='1mo'):
        """Collect trading data from multiple sources"""
        for symbol in self.symbols:
            try:
                # Download stock data
                ticker = yf.Ticker(symbol)
                df = ticker.history(period=period, interval='1h')
                
                # Add technical indicators
                df['price_change'] = df['Close'].pct_change()
                df['volume_ma'] = df['Volume'].rolling(window=24).mean()
                df['price_volatility'] = df['Close'].rolling(window=24).std()
                
                self.data[symbol] = df
                print(f"✓ Collected {len(df)} data points for {symbol}")
                
            except Exception as e:
                print(f"✗ Error collecting {symbol}: {e}")
    
    def prepare_features(self, symbol):
        """Extract features for anomaly detection"""
        df = self.data[symbol].copy()
        
        # Create anomaly detection features
        features = pd.DataFrame({
            'price_change': df['price_change'],
            'volume_ratio': df['Volume'] / df['volume_ma'],
            'volatility_score': df['price_volatility'] / df['price_volatility'].mean(),
            'hour': df.index.hour,
            'day_of_week': df.index.dayofweek
        })
        
        # Remove NaN values
        features = features.dropna()
        return features

# Initialize and collect data
collector = TradingDataCollector()
collector.fetch_market_data()

Step 2: Statistical Anomaly Detection

class StatisticalAnomalyDetector:
    def __init__(self, contamination=0.1):
        self.contamination = contamination
        self.scaler = StandardScaler()
        self.detector = IsolationForest(
            contamination=contamination,
            random_state=42
        )
    
    def detect_anomalies(self, features):
        """Detect anomalies using Isolation Forest"""
        # Scale features
        features_scaled = self.scaler.fit_transform(features)
        
        # Detect anomalies
        anomaly_labels = self.detector.fit_predict(features_scaled)
        anomaly_scores = self.detector.decision_function(features_scaled)
        
        # Create results dataframe
        results = features.copy()
        results['anomaly'] = anomaly_labels == -1
        results['anomaly_score'] = anomaly_scores
        
        return results
    
    def analyze_anomalies(self, results):
        """Analyze detected anomalies"""
        anomalies = results[results['anomaly'] == True]
        
        analysis = {
            'total_anomalies': len(anomalies),
            'anomaly_rate': len(anomalies) / len(results) * 100,
            'avg_anomaly_score': anomalies['anomaly_score'].mean(),
            'most_anomalous': anomalies.nsmallest(5, 'anomaly_score')
        }
        
        return analysis

# Detect anomalies for each symbol
detector = StatisticalAnomalyDetector()
anomaly_results = {}

for symbol in collector.symbols:
    features = collector.prepare_features(symbol)
    results = detector.detect_anomalies(features)
    analysis = detector.analyze_anomalies(results)
    
    anomaly_results[symbol] = {
        'results': results,
        'analysis': analysis
    }
    
    print(f"\n{symbol} Anomaly Analysis:")
    print(f"Total anomalies: {analysis['total_anomalies']}")
    print(f"Anomaly rate: {analysis['anomaly_rate']:.2f}%")

Step 3: AI-Powered Pattern Recognition with Ollama

class OllamaPatternAnalyzer:
    def __init__(self, model='llama3.1:8b'):
        self.model = model
        self.client = ollama.Client()
    
    def analyze_anomaly_pattern(self, anomaly_data, symbol):
        """Use Ollama to analyze anomaly patterns"""
        
        # Prepare context for AI analysis
        context = f"""
        Trading Symbol: {symbol}
        Anomaly Data Summary:
        - Total anomalies detected: {len(anomaly_data)}
        - Average price change: {anomaly_data['price_change'].mean():.4f}
        - Average volume ratio: {anomaly_data['volume_ratio'].mean():.2f}
        - Time distribution: {anomaly_data['hour'].value_counts().to_dict()}
        
        Recent anomalous events:
        {anomaly_data.head(10).to_string()}
        """
        
        prompt = f"""
        Analyze this trading anomaly data and provide insights:
        
        {context}
        
        Please provide:
        1. Pattern identification (what type of anomalies are these?)
        2. Potential causes (market events, technical issues, etc.)
        3. Risk assessment (low/medium/high)
        4. Recommended actions
        
        Keep your response focused and actionable for traders.
        """
        
        try:
            response = self.client.generate(
                model=self.model,
                prompt=prompt,
                options={'temperature': 0.3}
            )
            
            return response['response']
            
        except Exception as e:
            return f"Error analyzing pattern: {e}"
    
    def generate_trading_alert(self, symbol, anomaly_score, pattern_analysis):
        """Generate trading alert with AI insights"""
        
        risk_level = "HIGH" if anomaly_score < -0.5 else "MEDIUM" if anomaly_score < -0.2 else "LOW"
        
        alert_prompt = f"""
        Generate a concise trading alert for {symbol}:
        
        Anomaly Score: {anomaly_score:.3f}
        Risk Level: {risk_level}
        Pattern Analysis: {pattern_analysis[:200]}...
        
        Create a brief alert (max 100 words) that includes:
        - Alert type and urgency
        - Key metrics
        - Recommended action
        """
        
        try:
            response = self.client.generate(
                model=self.model,
                prompt=alert_prompt,
                options={'temperature': 0.1}
            )
            
            return response['response']
            
        except Exception as e:
            return f"Error generating alert: {e}"

# Initialize Ollama analyzer
analyzer = OllamaPatternAnalyzer()

# Analyze patterns for each symbol
for symbol, data in anomaly_results.items():
    anomalies = data['results'][data['results']['anomaly'] == True]
    
    if len(anomalies) > 0:
        # Get AI analysis
        pattern_analysis = analyzer.analyze_anomaly_pattern(anomalies, symbol)
        
        # Generate alert for most severe anomaly
        worst_anomaly = anomalies.nsmallest(1, 'anomaly_score')
        alert = analyzer.generate_trading_alert(
            symbol, 
            worst_anomaly['anomaly_score'].iloc[0],
            pattern_analysis
        )
        
        print(f"\n{'='*50}")
        print(f"PATTERN ANALYSIS FOR {symbol}")
        print(f"{'='*50}")
        print(pattern_analysis)
        print(f"\n{'='*20}")
        print("TRADING ALERT")
        print(f"{'='*20}")
        print(alert)

Advanced Anomaly Detection Techniques

Real-Time Streaming Detection

class StreamingAnomalyDetector:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.data_buffer = []
        self.baseline_stats = {}
    
    def update_baseline(self, new_data):
        """Update baseline statistics with new data"""
        self.data_buffer.append(new_data)
        
        # Maintain rolling window
        if len(self.data_buffer) > self.window_size:
            self.data_buffer.pop(0)
        
        # Calculate updated statistics
        if len(self.data_buffer) >= 20:  # Minimum samples needed
            df = pd.DataFrame(self.data_buffer)
            self.baseline_stats = {
                'price_mean': df['price'].mean(),
                'price_std': df['price'].std(),
                'volume_mean': df['volume'].mean(),
                'volume_std': df['volume'].std()
            }
    
    def detect_realtime_anomaly(self, current_data):
        """Detect anomalies in real-time data"""
        if not self.baseline_stats:
            return False, 0.0
        
        # Calculate z-scores (guard against a zero-variance baseline)
        price_std = self.baseline_stats['price_std'] or 1e-9
        volume_std = self.baseline_stats['volume_std'] or 1e-9
        
        price_zscore = abs(
            (current_data['price'] - self.baseline_stats['price_mean']) / price_std
        )
        
        volume_zscore = abs(
            (current_data['volume'] - self.baseline_stats['volume_mean']) / volume_std
        )
        
        # Combined anomaly score
        anomaly_score = max(price_zscore, volume_zscore)
        
        # Threshold for anomaly detection
        is_anomaly = anomaly_score > 3.0
        
        return is_anomaly, anomaly_score

# Example usage
streaming_detector = StreamingAnomalyDetector()

# Simulate real-time data processing
print("Real-time Anomaly Detection Demo:")
print("-" * 40)

for i in range(10):
    # Simulate new market data
    new_data = {
        'price': np.random.normal(100, 2),
        'volume': np.random.normal(1000, 200),
        'timestamp': datetime.now()
    }
    
    # Add occasional anomaly
    if i == 7:
        new_data['price'] = 150  # Price spike
        new_data['volume'] = 5000  # Volume spike
    
    # Update baseline and detect anomalies
    streaming_detector.update_baseline(new_data)
    is_anomaly, score = streaming_detector.detect_realtime_anomaly(new_data)
    
    status = "🚨 ANOMALY" if is_anomaly else "✓ Normal"
    print(f"Data point {i+1}: {status} (Score: {score:.2f})")

Multi-Timeframe Anomaly Detection

class MultiTimeframeDetector:
    def __init__(self, timeframes=['1h', '4h', '1d']):
        self.timeframes = timeframes
        self.detectors = {}
        
        # Initialize detector for each timeframe
        for tf in timeframes:
            self.detectors[tf] = StatisticalAnomalyDetector()
    
    def detect_across_timeframes(self, symbol, data):
        """Detect anomalies across multiple timeframes"""
        results = {}
        
        for timeframe in self.timeframes:
            # Map each timeframe label to a pandas resample rule
            rule = {'1h': '1h', '4h': '4h', '1d': '1D'}[timeframe]
            resampled = data.resample(rule).agg({
                'Open': 'first',
                'High': 'max',
                'Low': 'min',
                'Close': 'last',
                'Volume': 'sum'
            })
            
            # Prepare features
            features = pd.DataFrame({
                'price_change': resampled['Close'].pct_change(),
                'volume_change': resampled['Volume'].pct_change(),
                'high_low_ratio': resampled['High'] / resampled['Low']
            }).dropna()
            
            # Detect anomalies
            if len(features) > 10:
                anomalies = self.detectors[timeframe].detect_anomalies(features)
                results[timeframe] = anomalies
        
        return results
    
    def cross_timeframe_analysis(self, symbol, multi_results):
        """Analyze anomalies across timeframes"""
        analysis = {}
        
        for timeframe, results in multi_results.items():
            anomalies = results[results['anomaly'] == True]
            analysis[timeframe] = {
                'count': len(anomalies),
                'severity': anomalies['anomaly_score'].mean() if len(anomalies) > 0 else 0
            }
        
        # Find consensus anomalies (anomalies detected in multiple timeframes)
        consensus_times = set()
        for tf, results in multi_results.items():
            anomaly_times = results[results['anomaly'] == True].index
            if len(consensus_times) == 0:
                consensus_times = set(anomaly_times)
            else:
                consensus_times = consensus_times.intersection(set(anomaly_times))
        
        analysis['consensus_anomalies'] = len(consensus_times)
        
        return analysis

# Example multi-timeframe analysis
multi_detector = MultiTimeframeDetector()

for symbol in collector.symbols:
    if symbol in collector.data:
        multi_results = multi_detector.detect_across_timeframes(
            symbol, collector.data[symbol]
        )
        
        analysis = multi_detector.cross_timeframe_analysis(symbol, multi_results)
        
        print(f"\n{symbol} Multi-Timeframe Analysis:")
        for timeframe, stats in analysis.items():
            if timeframe != 'consensus_anomalies':
                print(f"  {timeframe}: {stats['count']} anomalies, severity: {stats['severity']:.3f}")
        
        print(f"  Consensus anomalies: {analysis['consensus_anomalies']}")

Visualization and Monitoring Dashboard

Creating Anomaly Visualizations

class AnomalyVisualizer:
    def __init__(self, figsize=(15, 10)):
        self.figsize = figsize
        plt.style.use('seaborn-v0_8')
    
    def plot_anomaly_timeline(self, symbol, data, anomaly_results):
        """Create timeline visualization of anomalies"""
        fig, axes = plt.subplots(3, 1, figsize=self.figsize)
        
        # Price chart with anomalies
        axes[0].plot(data.index, data['Close'], label='Price', alpha=0.7)
        
        # Highlight anomalies
        anomalies = anomaly_results[anomaly_results['anomaly'] == True]
        if len(anomalies) > 0:
            # The feature frame shares the price data's DatetimeIndex,
            # so anomaly timestamps can be looked up directly
            anomaly_times = anomalies.index.intersection(data.index)
            anomaly_prices = data.loc[anomaly_times, 'Close']
            
            axes[0].scatter(anomaly_times, anomaly_prices,
                            color='red', s=50, alpha=0.8, label='Anomalies')
        
        axes[0].set_title(f'{symbol} Price with Anomalies')
        axes[0].set_ylabel('Price ($)')
        axes[0].legend()
        
        # Volume chart
        axes[1].plot(data.index, data['Volume'], label='Volume', alpha=0.7)
        axes[1].set_title(f'{symbol} Volume')
        axes[1].set_ylabel('Volume')
        
        # Anomaly score distribution
        axes[2].hist(anomaly_results['anomaly_score'], bins=30, alpha=0.7)
        axes[2].axvline(x=anomaly_results['anomaly_score'].mean(), 
                       color='red', linestyle='--', label='Mean Score')
        axes[2].set_title('Anomaly Score Distribution')
        axes[2].set_xlabel('Anomaly Score')
        axes[2].set_ylabel('Frequency')
        axes[2].legend()
        
        plt.tight_layout()
        plt.savefig(f'{symbol}_anomaly_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()
    
    def create_anomaly_heatmap(self, multi_symbol_results):
        """Create heatmap of anomaly patterns"""
        # Prepare data for heatmap
        heatmap_data = []
        symbols = []
        
        for symbol, data in multi_symbol_results.items():
            if 'analysis' in data:
                analysis = data['analysis']
                heatmap_data.append([
                    analysis['total_anomalies'],
                    analysis['anomaly_rate'],
                    abs(analysis['avg_anomaly_score'])
                ])
                symbols.append(symbol)
        
        if heatmap_data:
            heatmap_df = pd.DataFrame(
                heatmap_data,
                columns=['Total Anomalies', 'Anomaly Rate (%)', 'Avg Score'],
                index=symbols
            )
            
            plt.figure(figsize=(10, 6))
            plt.imshow(heatmap_df.values, cmap='YlOrRd', aspect='auto')
            plt.colorbar(label='Intensity')
            plt.xticks(range(len(heatmap_df.columns)), heatmap_df.columns)
            plt.yticks(range(len(heatmap_df.index)), heatmap_df.index)
            plt.title('Anomaly Detection Heatmap')
            
            # Add text annotations
            for i in range(len(symbols)):
                for j in range(len(heatmap_df.columns)):
                    plt.text(j, i, f'{heatmap_df.iloc[i, j]:.2f}', 
                            ha='center', va='center', color='white')
            
            plt.tight_layout()
            plt.savefig('anomaly_heatmap.png', dpi=300, bbox_inches='tight')
            plt.show()

# Generate visualizations
visualizer = AnomalyVisualizer()

# Create individual symbol visualizations
for symbol in collector.symbols:
    if symbol in collector.data and symbol in anomaly_results:
        visualizer.plot_anomaly_timeline(
            symbol, 
            collector.data[symbol],
            anomaly_results[symbol]['results']
        )

# Create multi-symbol heatmap
visualizer.create_anomaly_heatmap(anomaly_results)

Production Deployment Strategies

Docker Container Setup

# Dockerfile for trading anomaly detection
FROM python:3.9-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Ollama
RUN curl -fsSL https://ollama.com/install.sh | sh

# Set working directory
WORKDIR /app

# Copy requirements
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy application code
COPY . .

# Expose port
EXPOSE 8000

# Start the Ollama server in the background, then run the detector
# (models must already be pulled into the image or a mounted volume)
CMD ["sh", "-c", "ollama serve & sleep 5 && python anomaly_detector.py"]

Requirements File

# requirements.txt
ollama>=0.2.0
pandas>=2.0.0
numpy>=1.24.0
scikit-learn>=1.3.0
matplotlib>=3.7.0
yfinance>=0.2.0
fastapi>=0.100.0
uvicorn>=0.23.0
pydantic>=2.0.0
python-multipart>=0.0.6

API Service Implementation

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Dict, Optional
import asyncio
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Trading Anomaly Detection API")

class AnomalyRequest(BaseModel):
    symbol: str
    timeframe: str = "1h"
    lookback_days: int = 30

class AnomalyResponse(BaseModel):
    symbol: str
    total_anomalies: int
    anomaly_rate: float
    risk_level: str
    ai_analysis: str
    recommendations: List[str]

@app.post("/detect-anomalies", response_model=AnomalyResponse)
async def detect_anomalies(request: AnomalyRequest):
    """API endpoint for anomaly detection"""
    try:
        # Initialize detectors
        collector = TradingDataCollector([request.symbol])
        detector = StatisticalAnomalyDetector()
        analyzer = OllamaPatternAnalyzer()
        
        # Collect and analyze data
        collector.fetch_market_data(period=f"{request.lookback_days}d")
        features = collector.prepare_features(request.symbol)
        results = detector.detect_anomalies(features)
        analysis = detector.analyze_anomalies(results)
        
        # Get AI insights
        anomalies = results[results['anomaly'] == True]
        ai_analysis = analyzer.analyze_anomaly_pattern(anomalies, request.symbol)
        
        # Determine risk level
        risk_level = "HIGH" if analysis['anomaly_rate'] > 15 else "MEDIUM" if analysis['anomaly_rate'] > 5 else "LOW"
        
        # Generate recommendations
        recommendations = []
        if analysis['anomaly_rate'] > 10:
            recommendations.append("Review trading algorithms for potential issues")
        if analysis['total_anomalies'] > 50:
            recommendations.append("Increase monitoring frequency")
        if len(recommendations) == 0:
            recommendations.append("Continue normal operations")
        
        return AnomalyResponse(
            symbol=request.symbol,
            total_anomalies=analysis['total_anomalies'],
            anomaly_rate=analysis['anomaly_rate'],
            risk_level=risk_level,
            ai_analysis=ai_analysis,
            recommendations=recommendations
        )
        
    except Exception as e:
        logger.error(f"Error detecting anomalies: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "service": "anomaly-detector"}

# Background task for continuous monitoring
async def continuous_monitoring():
    """Background task for continuous anomaly monitoring"""
    while True:
        try:
            # Monitor key symbols
            symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA']
            
            for symbol in symbols:
                # Perform anomaly detection
                # Send alerts if necessary
                logger.info(f"Monitored {symbol} for anomalies")
            
            # Wait before next check
            await asyncio.sleep(300)  # 5 minutes
            
        except Exception as e:
            logger.error(f"Error in continuous monitoring: {e}")
            await asyncio.sleep(60)  # Wait 1 minute before retry

# Start background monitoring
@app.on_event("startup")
async def startup_event():
    asyncio.create_task(continuous_monitoring())

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Performance Optimization and Scaling

Memory-Efficient Data Processing

class MemoryEfficientDetector:
    def __init__(self, chunk_size=1000):
        self.chunk_size = chunk_size
        self.running_stats = {}
    
    def process_large_dataset(self, file_path):
        """Process large datasets in chunks to avoid memory issues"""
        anomaly_count = 0
        total_rows = 0
        
        # Process data in chunks
        for chunk in pd.read_csv(file_path, chunksize=self.chunk_size):
            # Process chunk
            chunk_anomalies = self.detect_chunk_anomalies(chunk)
            anomaly_count += chunk_anomalies
            total_rows += len(chunk)
            
            # Update running statistics
            self.update_running_stats(chunk)
            
            # Memory cleanup
            del chunk
        
        anomaly_rate = (anomaly_count / total_rows) * 100
        return anomaly_count, anomaly_rate
    
    def detect_chunk_anomalies(self, chunk):
        """Detect anomalies in a data chunk"""
        # Simple statistical detection
        Q1 = chunk['price'].quantile(0.25)
        Q3 = chunk['price'].quantile(0.75)
        IQR = Q3 - Q1
        
        # Define outlier bounds
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        # Count anomalies
        anomalies = chunk[(chunk['price'] < lower_bound) | (chunk['price'] > upper_bound)]
        return len(anomalies)

Caching and Performance Optimization

import redis
import pickle
from functools import wraps

class CacheManager:
    def __init__(self, redis_host='localhost', redis_port=6379):
        self.redis_client = redis.Redis(
            host=redis_host, 
            port=redis_port, 
            decode_responses=False
        )
    
    def cache_result(self, key, data, expiry=3600):
        """Cache analysis results"""
        serialized_data = pickle.dumps(data)
        self.redis_client.setex(key, expiry, serialized_data)
    
    def get_cached_result(self, key):
        """Retrieve cached results"""
        cached_data = self.redis_client.get(key)
        if cached_data:
            return pickle.loads(cached_data)
        return None

def cache_anomaly_detection(cache_manager):
    """Decorator for caching anomaly detection results"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build a cache key that is stable across processes
            # (the built-in hash() is randomized per interpreter run)
            cache_key = f"anomaly_{func.__name__}_{args}_{sorted(kwargs.items())}"
            
            # Try to get cached result
            cached_result = cache_manager.get_cached_result(cache_key)
            if cached_result:
                return cached_result
            
            # Compute result
            result = func(*args, **kwargs)
            
            # Cache result
            cache_manager.cache_result(cache_key, result)
            
            return result
        return wrapper
    return decorator

# Usage example
cache_manager = CacheManager()

@cache_anomaly_detection(cache_manager)
def cached_anomaly_detection(symbol, timeframe):
    """Cached anomaly detection function"""
    # Your anomaly detection logic here
    pass

Monitoring and Alerting System

Real-Time Alert System

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import json
import requests

class AlertManager:
    def __init__(self, config):
        self.config = config
        self.alert_thresholds = {
            'HIGH': 0.8,
            'MEDIUM': 0.5,
            'LOW': 0.2
        }
    
    def send_email_alert(self, subject, body, recipient):
        """Send email alert"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.config['email']['sender']
            msg['To'] = recipient
            msg['Subject'] = subject
            
            msg.attach(MIMEText(body, 'plain'))
            
            server = smtplib.SMTP(self.config['email']['smtp_server'], 587)
            server.starttls()
            server.login(
                self.config['email']['username'],
                self.config['email']['password']
            )
            
            server.send_message(msg)
            server.quit()
            
            return True
            
        except Exception as e:
            print(f"Email alert failed: {e}")
            return False
    
    def send_slack_alert(self, message):
        """Send Slack alert"""
        try:
            webhook_url = self.config['slack']['webhook_url']
            
            payload = {
                'text': message,
                'channel': '#trading-alerts',
                'username': 'AnomalyBot'
            }
            
            response = requests.post(webhook_url, json=payload)
            return response.status_code == 200
            
        except Exception as e:
            print(f"Slack alert failed: {e}")
            return False
    
    def process_anomaly_alert(self, symbol, anomaly_data, ai_analysis):
        """Process and send anomaly alerts"""
        severity = self.calculate_severity(anomaly_data)
        
        if severity >= self.alert_thresholds['HIGH']:
            alert_type = "🚨 HIGH PRIORITY"
        elif severity >= self.alert_thresholds['MEDIUM']:
            alert_type = "⚠️ MEDIUM PRIORITY"
        else:
            alert_type = "ℹ️ LOW PRIORITY"
        
        # Create alert message
        alert_message = f"""
        {alert_type} TRADING ANOMALY DETECTED
        
        Symbol: {symbol}
        Severity Score: {severity:.3f}
        Total Anomalies: {anomaly_data['total_anomalies']}
        Anomaly Rate: {anomaly_data['anomaly_rate']:.2f}%
        
        AI Analysis:
        {ai_analysis[:500]}...
        
        Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        """
        
        # Send alerts based on severity
        if severity >= self.alert_thresholds['HIGH']:
            self.send_email_alert(
                f"HIGH PRIORITY: {symbol} Trading Anomaly",
                alert_message,
                self.config['email']['recipients']['high_priority']
            )
        
        # Always send to Slack
        self.send_slack_alert(alert_message)
    
    def calculate_severity(self, anomaly_data):
        """Calculate alert severity score"""
        # Combine multiple factors
        rate_score = min(anomaly_data['anomaly_rate'] / 20, 1.0)  # Normalize to 0-1
        count_score = min(anomaly_data['total_anomalies'] / 100, 1.0)
        avg_score = min(abs(anomaly_data['avg_anomaly_score']) / 2, 1.0)
        
        # Weighted combination
        severity = (rate_score * 0.4) + (count_score * 0.3) + (avg_score * 0.3)
        return severity

# Alert configuration
alert_config = {
    'email': {
        'sender': 'alerts@yourcompany.com',
        'username': 'your_email@gmail.com',
        'password': 'your_app_password',
        'smtp_server': 'smtp.gmail.com',
        'recipients': {
            'high_priority': 'trader@yourcompany.com'
        }
    },
    'slack': {
        'webhook_url': 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL'
    }
}

# Initialize alert manager
alert_manager = AlertManager(alert_config)

# Example alert processing
for symbol, data in anomaly_results.items():
    if data['analysis']['total_anomalies'] > 0:
        # Get AI analysis
        anomalies = data['results'][data['results']['anomaly'] == True]
        ai_analysis = analyzer.analyze_anomaly_pattern(anomalies, symbol)
        
        # Process alert
        alert_manager.process_anomaly_alert(
            symbol, 
            data['analysis'], 
            ai_analysis
        )

Best Practices and Common Pitfalls

Data Quality Considerations

Data Validation Checklist:

  • ✅ Check for missing timestamps
  • ✅ Validate price ranges (no negative prices)
  • ✅ Verify volume consistency
  • ✅ Handle market holidays and closures
  • ✅ Account for stock splits and dividends
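
That checklist can be automated. Below is a minimal sketch assuming an OHLCV DataFrame with a DatetimeIndex; split and dividend handling is assumed to happen upstream (for example by requesting adjusted prices), and `max_gap` should match your bar interval.

```python
import pandas as pd

def validate_ohlcv(df, max_gap='1h'):
    """Run basic data-quality checks and return issue name -> row count."""
    issues = {}

    # Missing timestamps: gaps larger than the expected bar interval
    deltas = df.index.to_series().diff().dropna()
    issues['timestamp_gaps'] = int((deltas > pd.Timedelta(max_gap)).sum())

    # Invalid prices: non-positive values, or a High below the Low
    prices = df[['Open', 'High', 'Low', 'Close']]
    issues['nonpositive_prices'] = int((prices <= 0).any(axis=1).sum())
    issues['high_below_low'] = int((df['High'] < df['Low']).sum())

    # Volume consistency: negative or missing volume
    issues['bad_volume'] = int(((df['Volume'] < 0) | df['Volume'].isna()).sum())

    return issues
```

Running this before feature extraction keeps obviously broken bars from ever reaching the detectors.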

Model Tuning Guidelines

Isolation Forest Parameters:

  • contamination: Start with 0.1 (10% anomalies expected)
  • n_estimators: Use 100+ for production
  • max_samples: Set to 'auto' for large datasets
  • random_state: Fix for reproducible results
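
Taken together, those guidelines translate into a configuration along these lines; the values are starting points to tune against your own data, not fixed truths.

```python
from sklearn.ensemble import IsolationForest

# Production-leaning starting configuration for the parameters above
detector = IsolationForest(
    contamination=0.1,    # expect roughly 10% anomalies at first
    n_estimators=200,     # 100+ trees for more stable scores
    max_samples='auto',   # min(256, n_samples) per tree; scales to large data
    random_state=42,      # fixed seed for reproducible results
    n_jobs=-1,            # use all available cores when fitting
)
```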

Ollama Model Selection:

  • Llama 3.1:8b: Best for general pattern analysis
  • Mistral 7B: Faster processing, good for real-time
  • CodeLlama: Use for debugging trading algorithms
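
One lightweight way to act on this guidance is a task-to-model map. The task labels here are hypothetical names used for illustration; the model tags must match models you have actually pulled with `ollama pull`.

```python
# Hypothetical task labels mapped to the model suggestions above
MODEL_BY_TASK = {
    'pattern_analysis': 'llama3.1:8b',   # general pattern recognition
    'realtime_alerts': 'mistral:7b',     # faster responses for streaming use
    'algo_debugging': 'codellama:7b',    # reviewing trading algorithm code
}

def pick_model(task):
    """Return the suggested model tag for a task, defaulting to llama3.1:8b."""
    return MODEL_BY_TASK.get(task, 'llama3.1:8b')
```

The returned tag can be passed straight to the analyzer, e.g. `OllamaPatternAnalyzer(model=pick_model('realtime_alerts'))`.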

Common Pitfalls to Avoid

  1. Over-fitting to Historical Data

    • Use rolling windows instead of fixed training sets
    • Validate on out-of-sample data
    • Regularly retrain models

  2. Ignoring Market Conditions

    • Adjust thresholds during high volatility periods
    • Consider market hours and holidays
    • Account for earnings announcements

  3. Alert Fatigue

    • Implement severity levels
    • Use escalation policies
    • Provide clear action items
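
The first pitfall is worth a concrete sketch: instead of fitting one model on all history, retrain on a rolling window and score only the out-of-sample batch that follows it. The window and step sizes below are illustrative, not recommendations.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

def rolling_anomaly_labels(X, window=200, step=50):
    """Label each step-sized batch with a model trained only on the
    `window` rows immediately before it (out-of-sample scoring)."""
    indices, labels = [], []
    for start in range(window, len(X), step):
        train = X[start - window:start]        # recent history only
        batch = X[start:start + step]          # unseen data to score
        model = IsolationForest(contamination=0.1, random_state=42)
        model.fit(train)
        preds = model.predict(batch)           # -1 = anomaly, 1 = normal
        indices.extend(range(start, start + len(batch)))
        labels.extend(preds)
    return np.array(indices), np.array(labels)
```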

Conclusion

Anomaly detection in trading data transforms raw market signals into actionable insights. By combining statistical methods with Ollama's AI capabilities, you create a robust system that:

  • Identifies critical market anomalies before they impact your portfolio
  • Provides AI-powered pattern analysis for better decision-making
  • Scales efficiently from single-symbol monitoring to portfolio-wide surveillance
  • Delivers real-time alerts that protect your trading operations

The techniques covered in this guide give you a solid foundation for building production-ready anomaly detection systems. Start with the basic implementation, then gradually add advanced features like multi-timeframe analysis and real-time streaming as your needs grow.

Remember that effective anomaly detection requires continuous refinement. Monitor your system's performance, tune parameters based on market conditions, and always validate alerts against actual trading outcomes.

Your trading data contains valuable signals hidden within the noise. With proper anomaly detection, you can discover these patterns and gain a competitive edge in the markets.


Ready to implement these techniques in your own trading system? Start with the basic setup and gradually add advanced features as you gain experience with the platform.