Picture this: Your trading algorithm just flagged a $50,000 anomaly at 3 AM while you were dreaming about profits. Was it a genuine market opportunity or a data glitch that could cost you everything?
Trading data flows like a digital river—millions of data points rushing past every second. Hidden within this stream are anomalies that can make or break your trading strategy. Anomaly detection in trading data helps you spot these crucial outliers before they impact your bottom line.
This guide shows you how to build robust anomaly detection systems using Ollama's local AI models. You'll learn to identify market outliers, recognize suspicious patterns, and create automated alerts that protect your trading operations.
What Makes Trading Data Anomalies So Dangerous?
The Hidden Costs of Undetected Anomalies
Trading anomalies cost firms billions annually. A single undetected outlier can:
- Trigger false buy/sell signals
- Corrupt your algorithm's learning process
- Generate massive losses from bad data
- Violate regulatory compliance requirements
Common Trading Data Anomalies
Price Anomalies
- Flash crashes and sudden spikes
- Bid-ask spread irregularities
- Volume-price disconnects
Volume Anomalies
- Unusual trading volume bursts
- Dark pool activity patterns
- Market maker manipulation signals
Time Series Anomalies
- Missing data gaps
- Delayed price updates
- Timestamp inconsistencies
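Of the three categories, time series anomalies are often the easiest to screen for mechanically. As a quick illustration, missing bars and duplicated timestamps can be flagged straight from a DataFrame's DatetimeIndex. This is a simplified sketch that ignores market hours and holidays, so overnight gaps will show up as false positives:

```python
import pandas as pd

def find_data_gaps(df, expected_freq='1h'):
    """Return expected timestamps that are missing from the index."""
    expected = pd.date_range(df.index.min(), df.index.max(), freq=expected_freq)
    return expected.difference(df.index)

def find_duplicate_timestamps(df):
    """Return timestamps that appear more than once (a common feed bug)."""
    return df.index[df.index.duplicated()]
```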
Why Ollama Excels at Trading Anomaly Detection
Local Processing Advantages
Ollama processes your trading data locally, providing:
- Low latency: No network round-trips to cloud APIs
- Complete privacy: Your data never leaves your system
- Cost efficiency: No per-request charges
- Offline capability: Works without internet connections
AI Model Flexibility
Ollama supports multiple models optimized for different anomaly types:
- Llama 3.1: Strong general pattern recognition
- Mistral: Fast, well suited to numerical analysis
- CodeLlama: Geared toward code, useful for algorithm debugging
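Switching models is just a parameter change in the Python client. A small sketch, assuming the three models above have already been pulled with `ollama pull`:

```python
import ollama

client = ollama.Client()

# Illustrative task-to-model mapping; the names are examples,
# not a prescription.
MODELS = {
    'pattern_analysis': 'llama3.1:8b',
    'fast_numeric': 'mistral',
    'code_review': 'codellama',
}

response = client.generate(
    model=MODELS['fast_numeric'],
    prompt='In one sentence, what could cause a sudden 10x volume spike in a liquid stock?',
    options={'temperature': 0.2},
)
print(response['response'])
```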
Setting Up Your Ollama Trading Anomaly Detection System
Prerequisites and Installation
```bash
# Install Ollama on your trading server
curl -fsSL https://ollama.com/install.sh | sh

# Pull the recommended model for trading analysis
ollama pull llama3.1:8b

# Verify installation
ollama list
```
Required Python Libraries
```bash
# Install required packages
pip install ollama pandas numpy scikit-learn matplotlib yfinance
```

```python
# Import essential libraries
import ollama
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt
import yfinance as yf
from datetime import datetime, timedelta
```
Building Your First Trading Anomaly Detector
Step 1: Data Collection and Preparation
```python
class TradingDataCollector:
    def __init__(self, symbols=['AAPL', 'GOOGL', 'MSFT']):
        self.symbols = symbols
        self.data = {}

    def fetch_market_data(self, period='1mo'):
        """Collect trading data from multiple sources"""
        for symbol in self.symbols:
            try:
                # Download stock data
                ticker = yf.Ticker(symbol)
                df = ticker.history(period=period, interval='1h')

                # Add technical indicators
                df['price_change'] = df['Close'].pct_change()
                df['volume_ma'] = df['Volume'].rolling(window=24).mean()
                df['price_volatility'] = df['Close'].rolling(window=24).std()

                self.data[symbol] = df
                print(f"✓ Collected {len(df)} data points for {symbol}")
            except Exception as e:
                print(f"✗ Error collecting {symbol}: {e}")

    def prepare_features(self, symbol):
        """Extract features for anomaly detection"""
        df = self.data[symbol].copy()

        # Create anomaly detection features
        # (the frame keeps df's DatetimeIndex)
        features = pd.DataFrame({
            'price_change': df['price_change'],
            'volume_ratio': df['Volume'] / df['volume_ma'],
            'volatility_score': df['price_volatility'] / df['price_volatility'].mean(),
            'hour': df.index.hour,
            'day_of_week': df.index.dayofweek
        })

        # Remove NaN values
        features = features.dropna()
        return features


# Initialize and collect data
collector = TradingDataCollector()
collector.fetch_market_data()
```
Step 2: Statistical Anomaly Detection
```python
class StatisticalAnomalyDetector:
    def __init__(self, contamination=0.1):
        self.contamination = contamination
        self.scaler = StandardScaler()
        self.detector = IsolationForest(
            contamination=contamination,
            random_state=42
        )

    def detect_anomalies(self, features):
        """Detect anomalies using Isolation Forest"""
        # Scale features
        features_scaled = self.scaler.fit_transform(features)

        # Detect anomalies (-1 = anomaly, 1 = normal)
        anomaly_labels = self.detector.fit_predict(features_scaled)
        anomaly_scores = self.detector.decision_function(features_scaled)

        # Create results dataframe
        results = features.copy()
        results['anomaly'] = anomaly_labels == -1
        results['anomaly_score'] = anomaly_scores
        return results

    def analyze_anomalies(self, results):
        """Analyze detected anomalies"""
        anomalies = results[results['anomaly'] == True]

        analysis = {
            'total_anomalies': len(anomalies),
            'anomaly_rate': len(anomalies) / len(results) * 100,
            'avg_anomaly_score': anomalies['anomaly_score'].mean(),
            'most_anomalous': anomalies.nsmallest(5, 'anomaly_score')
        }
        return analysis


# Detect anomalies for each symbol
detector = StatisticalAnomalyDetector()
anomaly_results = {}

for symbol in collector.symbols:
    features = collector.prepare_features(symbol)
    results = detector.detect_anomalies(features)
    analysis = detector.analyze_anomalies(results)

    anomaly_results[symbol] = {
        'results': results,
        'analysis': analysis
    }

    print(f"\n{symbol} Anomaly Analysis:")
    print(f"Total anomalies: {analysis['total_anomalies']}")
    print(f"Anomaly rate: {analysis['anomaly_rate']:.2f}%")
```
Step 3: AI-Powered Pattern Recognition with Ollama
```python
class OllamaPatternAnalyzer:
    def __init__(self, model='llama3.1:8b'):
        self.model = model
        self.client = ollama.Client()

    def analyze_anomaly_pattern(self, anomaly_data, symbol):
        """Use Ollama to analyze anomaly patterns"""
        # Prepare context for AI analysis
        context = f"""
        Trading Symbol: {symbol}
        Anomaly Data Summary:
        - Total anomalies detected: {len(anomaly_data)}
        - Average price change: {anomaly_data['price_change'].mean():.4f}
        - Average volume ratio: {anomaly_data['volume_ratio'].mean():.2f}
        - Time distribution: {anomaly_data['hour'].value_counts().to_dict()}

        Recent anomalous events:
        {anomaly_data.head(10).to_string()}
        """

        prompt = f"""
        Analyze this trading anomaly data and provide insights:
        {context}

        Please provide:
        1. Pattern identification (what type of anomalies are these?)
        2. Potential causes (market events, technical issues, etc.)
        3. Risk assessment (low/medium/high)
        4. Recommended actions

        Keep your response focused and actionable for traders.
        """

        try:
            response = self.client.generate(
                model=self.model,
                prompt=prompt,
                options={'temperature': 0.3}
            )
            return response['response']
        except Exception as e:
            return f"Error analyzing pattern: {e}"

    def generate_trading_alert(self, symbol, anomaly_score, pattern_analysis):
        """Generate trading alert with AI insights"""
        risk_level = "HIGH" if anomaly_score < -0.5 else "MEDIUM" if anomaly_score < -0.2 else "LOW"

        alert_prompt = f"""
        Generate a concise trading alert for {symbol}:

        Anomaly Score: {anomaly_score:.3f}
        Risk Level: {risk_level}
        Pattern Analysis: {pattern_analysis[:200]}...

        Create a brief alert (max 100 words) that includes:
        - Alert type and urgency
        - Key metrics
        - Recommended action
        """

        try:
            response = self.client.generate(
                model=self.model,
                prompt=alert_prompt,
                options={'temperature': 0.1}
            )
            return response['response']
        except Exception as e:
            return f"Error generating alert: {e}"


# Initialize Ollama analyzer
analyzer = OllamaPatternAnalyzer()

# Analyze patterns for each symbol
for symbol, data in anomaly_results.items():
    anomalies = data['results'][data['results']['anomaly'] == True]

    if len(anomalies) > 0:
        # Get AI analysis
        pattern_analysis = analyzer.analyze_anomaly_pattern(anomalies, symbol)

        # Generate alert for most severe anomaly
        worst_anomaly = anomalies.nsmallest(1, 'anomaly_score')
        alert = analyzer.generate_trading_alert(
            symbol,
            worst_anomaly['anomaly_score'].iloc[0],
            pattern_analysis
        )

        print(f"\n{'='*50}")
        print(f"PATTERN ANALYSIS FOR {symbol}")
        print(f"{'='*50}")
        print(pattern_analysis)

        print(f"\n{'='*20}")
        print("TRADING ALERT")
        print(f"{'='*20}")
        print(alert)
```
Advanced Anomaly Detection Techniques
Real-Time Streaming Detection
```python
class StreamingAnomalyDetector:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.data_buffer = []
        self.baseline_stats = {}

    def update_baseline(self, new_data):
        """Update baseline statistics with new data"""
        self.data_buffer.append(new_data)

        # Maintain rolling window
        if len(self.data_buffer) > self.window_size:
            self.data_buffer.pop(0)

        # Calculate updated statistics
        if len(self.data_buffer) >= 20:  # Minimum samples needed
            df = pd.DataFrame(self.data_buffer)
            self.baseline_stats = {
                'price_mean': df['price'].mean(),
                'price_std': df['price'].std(),
                'volume_mean': df['volume'].mean(),
                'volume_std': df['volume'].std()
            }

    def detect_realtime_anomaly(self, current_data):
        """Detect anomalies in real-time data"""
        if not self.baseline_stats:
            return False, 0.0

        # Calculate z-scores (epsilon guards against a zero std)
        eps = 1e-9
        price_zscore = abs(
            (current_data['price'] - self.baseline_stats['price_mean']) /
            (self.baseline_stats['price_std'] + eps)
        )
        volume_zscore = abs(
            (current_data['volume'] - self.baseline_stats['volume_mean']) /
            (self.baseline_stats['volume_std'] + eps)
        )

        # Combined anomaly score
        anomaly_score = max(price_zscore, volume_zscore)

        # Threshold for anomaly detection
        is_anomaly = anomaly_score > 3.0
        return is_anomaly, anomaly_score


# Example usage
streaming_detector = StreamingAnomalyDetector()

# Simulate real-time data processing (30 ticks, so the 20-sample
# baseline warms up before the injected anomaly arrives)
print("Real-time Anomaly Detection Demo:")
print("-" * 40)

for i in range(30):
    # Simulate new market data
    new_data = {
        'price': np.random.normal(100, 2),
        'volume': np.random.normal(1000, 200),
        'timestamp': datetime.now()
    }

    # Add occasional anomaly
    if i == 25:
        new_data['price'] = 150    # Price spike
        new_data['volume'] = 5000  # Volume spike

    # Score against the existing baseline first, then fold the point in,
    # so a spike cannot dampen its own z-score
    is_anomaly, score = streaming_detector.detect_realtime_anomaly(new_data)
    streaming_detector.update_baseline(new_data)

    status = "🚨 ANOMALY" if is_anomaly else "✓ Normal"
    print(f"Data point {i+1}: {status} (Score: {score:.2f})")
```
Multi-Timeframe Anomaly Detection
```python
class MultiTimeframeDetector:
    def __init__(self, timeframes=['1h', '4h', '1d']):
        self.timeframes = timeframes
        # Map each timeframe to a pandas resample rule
        self.resample_rules = {'1h': '1h', '4h': '4h', '1d': '1D'}

        # Initialize detector for each timeframe
        self.detectors = {tf: StatisticalAnomalyDetector() for tf in timeframes}

    def detect_across_timeframes(self, symbol, data):
        """Detect anomalies across multiple timeframes"""
        results = {}

        for timeframe in self.timeframes:
            # Resample OHLCV data to the timeframe; drop the empty bars
            # that resampling creates for overnight and weekend periods
            resampled = data.resample(self.resample_rules[timeframe]).agg({
                'Open': 'first',
                'High': 'max',
                'Low': 'min',
                'Close': 'last',
                'Volume': 'sum'
            }).dropna()

            # Prepare features
            features = pd.DataFrame({
                'price_change': resampled['Close'].pct_change(),
                'volume_change': resampled['Volume'].pct_change(),
                'high_low_ratio': resampled['High'] / resampled['Low']
            }).dropna()

            # Detect anomalies
            if len(features) > 10:
                anomalies = self.detectors[timeframe].detect_anomalies(features)
                results[timeframe] = anomalies

        return results

    def cross_timeframe_analysis(self, symbol, multi_results):
        """Analyze anomalies across timeframes"""
        analysis = {}

        for timeframe, results in multi_results.items():
            anomalies = results[results['anomaly'] == True]
            analysis[timeframe] = {
                'count': len(anomalies),
                'severity': anomalies['anomaly_score'].mean() if len(anomalies) > 0 else 0
            }

        # Find consensus anomalies: bar-start timestamps flagged in every
        # timeframe (an exact-match intersection, so deliberately strict)
        consensus_times = None
        for tf, results in multi_results.items():
            anomaly_times = set(results[results['anomaly'] == True].index)
            consensus_times = anomaly_times if consensus_times is None \
                else consensus_times.intersection(anomaly_times)

        analysis['consensus_anomalies'] = len(consensus_times or set())
        return analysis


# Example multi-timeframe analysis
multi_detector = MultiTimeframeDetector()

for symbol in collector.symbols:
    if symbol in collector.data:
        multi_results = multi_detector.detect_across_timeframes(
            symbol, collector.data[symbol]
        )
        analysis = multi_detector.cross_timeframe_analysis(symbol, multi_results)

        print(f"\n{symbol} Multi-Timeframe Analysis:")
        for timeframe, stats in analysis.items():
            if timeframe != 'consensus_anomalies':
                print(f"  {timeframe}: {stats['count']} anomalies, severity: {stats['severity']:.3f}")
        print(f"  Consensus anomalies: {analysis['consensus_anomalies']}")
```
Visualization and Monitoring Dashboard
Creating Anomaly Visualizations
```python
class AnomalyVisualizer:
    def __init__(self, figsize=(15, 10)):
        self.figsize = figsize
        plt.style.use('seaborn-v0_8')

    def plot_anomaly_timeline(self, symbol, data, anomaly_results):
        """Create timeline visualization of anomalies"""
        fig, axes = plt.subplots(3, 1, figsize=self.figsize)

        # Price chart with anomalies
        axes[0].plot(data.index, data['Close'], label='Price', alpha=0.7)

        # Highlight anomalies (the feature frame shares the price frame's
        # DatetimeIndex, so anomaly timestamps can be looked up directly)
        anomalies = anomaly_results[anomaly_results['anomaly'] == True]
        if len(anomalies) > 0:
            anomaly_times = anomalies.index.intersection(data.index)
            anomaly_prices = data.loc[anomaly_times, 'Close']
            axes[0].scatter(anomaly_times, anomaly_prices,
                            color='red', s=50, alpha=0.8, label='Anomalies')

        axes[0].set_title(f'{symbol} Price with Anomalies')
        axes[0].set_ylabel('Price ($)')
        axes[0].legend()

        # Volume chart
        axes[1].plot(data.index, data['Volume'], label='Volume', alpha=0.7)
        axes[1].set_title(f'{symbol} Volume')
        axes[1].set_ylabel('Volume')

        # Anomaly score distribution
        axes[2].hist(anomaly_results['anomaly_score'], bins=30, alpha=0.7)
        axes[2].axvline(x=anomaly_results['anomaly_score'].mean(),
                        color='red', linestyle='--', label='Mean Score')
        axes[2].set_title('Anomaly Score Distribution')
        axes[2].set_xlabel('Anomaly Score')
        axes[2].set_ylabel('Frequency')
        axes[2].legend()

        plt.tight_layout()
        plt.savefig(f'{symbol}_anomaly_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()

    def create_anomaly_heatmap(self, multi_symbol_results):
        """Create heatmap of anomaly patterns"""
        # Prepare data for heatmap
        heatmap_data = []
        symbols = []

        for symbol, data in multi_symbol_results.items():
            if 'analysis' in data:
                analysis = data['analysis']
                heatmap_data.append([
                    analysis['total_anomalies'],
                    analysis['anomaly_rate'],
                    abs(analysis['avg_anomaly_score'])
                ])
                symbols.append(symbol)

        if heatmap_data:
            heatmap_df = pd.DataFrame(
                heatmap_data,
                columns=['Total Anomalies', 'Anomaly Rate (%)', 'Avg Score'],
                index=symbols
            )

            plt.figure(figsize=(10, 6))
            plt.imshow(heatmap_df.values, cmap='YlOrRd', aspect='auto')
            plt.colorbar(label='Intensity')
            plt.xticks(range(len(heatmap_df.columns)), heatmap_df.columns)
            plt.yticks(range(len(heatmap_df.index)), heatmap_df.index)
            plt.title('Anomaly Detection Heatmap')

            # Add text annotations
            for i in range(len(symbols)):
                for j in range(len(heatmap_df.columns)):
                    plt.text(j, i, f'{heatmap_df.iloc[i, j]:.2f}',
                             ha='center', va='center', color='white')

            plt.tight_layout()
            plt.savefig('anomaly_heatmap.png', dpi=300, bbox_inches='tight')
            plt.show()


# Generate visualizations
visualizer = AnomalyVisualizer()

# Create individual symbol visualizations
for symbol in collector.symbols:
    if symbol in collector.data and symbol in anomaly_results:
        visualizer.plot_anomaly_timeline(
            symbol,
            collector.data[symbol],
            anomaly_results[symbol]['results']
        )

# Create multi-symbol heatmap
visualizer.create_anomaly_heatmap(anomaly_results)
```
Production Deployment Strategies
Docker Container Setup
```dockerfile
# Dockerfile for trading anomaly detection
FROM python:3.9-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Ollama (this installs the binary only; the server
# still has to be started at runtime)
RUN curl -fsSL https://ollama.com/install.sh | sh

# Set working directory
WORKDIR /app

# Copy requirements
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy application code (including the start.sh wrapper shown below)
COPY . .

# Expose port
EXPOSE 8000

# Start via a wrapper script that launches `ollama serve` before the app
CMD ["sh", "./start.sh"]
```
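Note that the install script only installs the Ollama binary; nothing in the image starts the server or pulls a model. A hypothetical start.sh wrapper (the one the CMD above assumes, with anomaly_detector.py as the application entry point) could handle both:

```bash
#!/bin/sh
# start.sh (hypothetical wrapper): launch the Ollama server,
# make sure the model is present, then start the detector.
ollama serve &
sleep 5                       # crude readiness wait; poll the API in production
ollama pull llama3.1:8b       # ensure the model exists inside the container
exec python anomaly_detector.py
```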
Requirements File
```text
# requirements.txt
ollama>=0.2.0
pandas>=2.0.0
numpy>=1.24.0
scikit-learn>=1.3.0
matplotlib>=3.7.0
yfinance>=0.2.0
fastapi>=0.100.0
uvicorn>=0.23.0
pydantic>=2.0.0
python-multipart>=0.0.6
requests>=2.31.0
redis>=5.0.0
```
API Service Implementation
```python
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Dict, Optional
import asyncio
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Trading Anomaly Detection API")


class AnomalyRequest(BaseModel):
    symbol: str
    timeframe: str = "1h"
    lookback_days: int = 30


class AnomalyResponse(BaseModel):
    symbol: str
    total_anomalies: int
    anomaly_rate: float
    risk_level: str
    ai_analysis: str
    recommendations: List[str]


@app.post("/detect-anomalies", response_model=AnomalyResponse)
async def detect_anomalies(request: AnomalyRequest):
    """API endpoint for anomaly detection"""
    try:
        # Initialize detectors
        collector = TradingDataCollector([request.symbol])
        detector = StatisticalAnomalyDetector()
        analyzer = OllamaPatternAnalyzer()

        # Collect and analyze data. yfinance only accepts fixed period
        # strings, so map the requested lookback to the nearest one.
        period = '1mo' if request.lookback_days <= 30 else '3mo'
        collector.fetch_market_data(period=period)
        features = collector.prepare_features(request.symbol)
        results = detector.detect_anomalies(features)
        analysis = detector.analyze_anomalies(results)

        # Get AI insights
        anomalies = results[results['anomaly'] == True]
        ai_analysis = analyzer.analyze_anomaly_pattern(anomalies, request.symbol)

        # Determine risk level
        risk_level = "HIGH" if analysis['anomaly_rate'] > 15 else "MEDIUM" if analysis['anomaly_rate'] > 5 else "LOW"

        # Generate recommendations
        recommendations = []
        if analysis['anomaly_rate'] > 10:
            recommendations.append("Review trading algorithms for potential issues")
        if analysis['total_anomalies'] > 50:
            recommendations.append("Increase monitoring frequency")
        if len(recommendations) == 0:
            recommendations.append("Continue normal operations")

        return AnomalyResponse(
            symbol=request.symbol,
            total_anomalies=analysis['total_anomalies'],
            anomaly_rate=analysis['anomaly_rate'],
            risk_level=risk_level,
            ai_analysis=ai_analysis,
            recommendations=recommendations
        )

    except Exception as e:
        logger.error(f"Error detecting anomalies: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "service": "anomaly-detector"}


# Background task for continuous monitoring
async def continuous_monitoring():
    """Background task for continuous anomaly monitoring"""
    while True:
        try:
            # Monitor key symbols
            symbols = ['AAPL', 'GOOGL', 'MSFT', 'TSLA']
            for symbol in symbols:
                # Perform anomaly detection
                # Send alerts if necessary
                logger.info(f"Monitored {symbol} for anomalies")

            # Wait before the next pass over all symbols
            await asyncio.sleep(300)  # 5 minutes
        except Exception as e:
            logger.error(f"Error in continuous monitoring: {e}")
            await asyncio.sleep(60)  # Wait 1 minute before retry


# Start background monitoring
@app.on_event("startup")
async def startup_event():
    asyncio.create_task(continuous_monitoring())


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
```
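Once the service is running, the endpoint can be exercised with a plain HTTP request, for example:

```bash
# Assumes the service is running locally on port 8000
curl -X POST http://localhost:8000/detect-anomalies \
  -H "Content-Type: application/json" \
  -d '{"symbol": "AAPL", "timeframe": "1h", "lookback_days": 30}'
```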
Performance Optimization and Scaling
Memory-Efficient Data Processing
```python
class MemoryEfficientDetector:
    def __init__(self, chunk_size=1000):
        self.chunk_size = chunk_size
        self.running_stats = {}

    def process_large_dataset(self, file_path):
        """Process large datasets in chunks to avoid memory issues"""
        anomaly_count = 0
        total_rows = 0

        # Process data in chunks
        for chunk in pd.read_csv(file_path, chunksize=self.chunk_size):
            # Process chunk
            chunk_anomalies = self.detect_chunk_anomalies(chunk)
            anomaly_count += chunk_anomalies
            total_rows += len(chunk)

            # Update running statistics
            self.update_running_stats(chunk)

            # Memory cleanup
            del chunk

        anomaly_rate = (anomaly_count / total_rows) * 100
        return anomaly_count, anomaly_rate

    def update_running_stats(self, chunk):
        """Fold each chunk's price mean into a running average"""
        n_new = len(chunk)
        mean_new = chunk['price'].mean()
        if not self.running_stats:
            self.running_stats = {'n': n_new, 'price_mean': mean_new}
        else:
            n_total = self.running_stats['n'] + n_new
            self.running_stats['price_mean'] = (
                self.running_stats['price_mean'] * self.running_stats['n']
                + mean_new * n_new
            ) / n_total
            self.running_stats['n'] = n_total

    def detect_chunk_anomalies(self, chunk):
        """Detect anomalies in a data chunk using the IQR rule"""
        Q1 = chunk['price'].quantile(0.25)
        Q3 = chunk['price'].quantile(0.75)
        IQR = Q3 - Q1

        # Define outlier bounds
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        # Count anomalies
        anomalies = chunk[(chunk['price'] < lower_bound) | (chunk['price'] > upper_bound)]
        return len(anomalies)
```
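Hypothetical usage, assuming a ticks.csv file with a price column:

```python
# Process a large tick file without loading it all into memory
detector = MemoryEfficientDetector(chunk_size=5000)
count, rate = detector.process_large_dataset('ticks.csv')
print(f"Found {count} outliers ({rate:.2f}% of rows)")
```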
Caching and Performance Optimization
```python
import redis
import pickle
from functools import wraps


class CacheManager:
    def __init__(self, redis_host='localhost', redis_port=6379):
        self.redis_client = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=False  # keep raw bytes for pickle
        )

    def cache_result(self, key, data, expiry=3600):
        """Cache analysis results"""
        serialized_data = pickle.dumps(data)
        self.redis_client.setex(key, expiry, serialized_data)

    def get_cached_result(self, key):
        """Retrieve cached results"""
        cached_data = self.redis_client.get(key)
        if cached_data is not None:
            return pickle.loads(cached_data)
        return None


def cache_anomaly_detection(cache_manager):
    """Decorator for caching anomaly detection results"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache key
            cache_key = f"anomaly_{hash(str(args) + str(kwargs))}"

            # Try to get cached result (check against None so that
            # legitimately falsy results are still served from cache)
            cached_result = cache_manager.get_cached_result(cache_key)
            if cached_result is not None:
                return cached_result

            # Compute and cache the result
            result = func(*args, **kwargs)
            cache_manager.cache_result(cache_key, result)
            return result
        return wrapper
    return decorator


# Usage example
cache_manager = CacheManager()

@cache_anomaly_detection(cache_manager)
def cached_anomaly_detection(symbol, timeframe):
    """Cached anomaly detection function"""
    # Your anomaly detection logic here
    pass
```
Monitoring and Alerting System
Real-Time Alert System
```python
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import requests


class AlertManager:
    def __init__(self, config):
        self.config = config
        self.alert_thresholds = {
            'HIGH': 0.8,
            'MEDIUM': 0.5,
            'LOW': 0.2
        }

    def send_email_alert(self, subject, body, recipient):
        """Send email alert"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.config['email']['sender']
            msg['To'] = recipient
            msg['Subject'] = subject
            msg.attach(MIMEText(body, 'plain'))

            server = smtplib.SMTP(self.config['email']['smtp_server'], 587)
            server.starttls()
            server.login(
                self.config['email']['username'],
                self.config['email']['password']
            )
            server.send_message(msg)
            server.quit()
            return True
        except Exception as e:
            print(f"Email alert failed: {e}")
            return False

    def send_slack_alert(self, message):
        """Send Slack alert"""
        try:
            webhook_url = self.config['slack']['webhook_url']
            payload = {
                'text': message,
                'channel': '#trading-alerts',
                'username': 'AnomalyBot'
            }
            response = requests.post(webhook_url, json=payload)
            return response.status_code == 200
        except Exception as e:
            print(f"Slack alert failed: {e}")
            return False

    def process_anomaly_alert(self, symbol, anomaly_data, ai_analysis):
        """Process and send anomaly alerts"""
        severity = self.calculate_severity(anomaly_data)

        if severity >= self.alert_thresholds['HIGH']:
            alert_type = "🚨 HIGH PRIORITY"
        elif severity >= self.alert_thresholds['MEDIUM']:
            alert_type = "⚠️ MEDIUM PRIORITY"
        else:
            alert_type = "ℹ️ LOW PRIORITY"

        # Create alert message
        alert_message = f"""
        {alert_type} TRADING ANOMALY DETECTED

        Symbol: {symbol}
        Severity Score: {severity:.3f}
        Total Anomalies: {anomaly_data['total_anomalies']}
        Anomaly Rate: {anomaly_data['anomaly_rate']:.2f}%

        AI Analysis:
        {ai_analysis[:500]}...

        Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        """

        # Send email only for high-severity alerts
        if severity >= self.alert_thresholds['HIGH']:
            self.send_email_alert(
                f"HIGH PRIORITY: {symbol} Trading Anomaly",
                alert_message,
                self.config['email']['recipients']['high_priority']
            )

        # Always send to Slack
        self.send_slack_alert(alert_message)

    def calculate_severity(self, anomaly_data):
        """Calculate alert severity score"""
        # Normalize each factor to the 0-1 range
        rate_score = min(anomaly_data['anomaly_rate'] / 20, 1.0)
        count_score = min(anomaly_data['total_anomalies'] / 100, 1.0)
        avg_score = min(abs(anomaly_data['avg_anomaly_score']) / 2, 1.0)

        # Weighted combination
        severity = (rate_score * 0.4) + (count_score * 0.3) + (avg_score * 0.3)
        return severity


# Alert configuration
alert_config = {
    'email': {
        'sender': 'alerts@yourcompany.com',
        'username': 'your_email@gmail.com',
        'password': 'your_app_password',
        'smtp_server': 'smtp.gmail.com',
        'recipients': {
            'high_priority': 'trader@yourcompany.com'
        }
    },
    'slack': {
        'webhook_url': 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL'
    }
}

# Initialize alert manager
alert_manager = AlertManager(alert_config)

# Example alert processing
for symbol, data in anomaly_results.items():
    if data['analysis']['total_anomalies'] > 0:
        # Get AI analysis
        anomalies = data['results'][data['results']['anomaly'] == True]
        ai_analysis = analyzer.analyze_anomaly_pattern(anomalies, symbol)

        # Process alert
        alert_manager.process_anomaly_alert(
            symbol,
            data['analysis'],
            ai_analysis
        )
```
Best Practices and Common Pitfalls
Data Quality Considerations
Data Validation Checklist (a scripted sketch follows the list):
- ✅ Check for missing timestamps
- ✅ Validate price ranges (no negative prices)
- ✅ Verify volume consistency
- ✅ Handle market holidays and closures
- ✅ Account for stock splits and dividends
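A minimal sketch of the first four checks, assuming an OHLCV DataFrame with a DatetimeIndex like the one yfinance returns. Split and dividend handling needs corporate-action data, though recent yfinance versions adjust history() for both by default:

```python
import pandas as pd

def validate_market_data(df):
    """Run the checklist above; returns a list of detected issues."""
    issues = []
    # Missing timestamps: flag gaps beyond a long weekend
    if df.index.to_series().diff().max() > pd.Timedelta(days=4):
        issues.append('data gap longer than a holiday weekend')
    # Price sanity
    if (df[['Open', 'High', 'Low', 'Close']] <= 0).any().any():
        issues.append('non-positive prices')
    if (df['High'] < df['Low']).any():
        issues.append('High below Low (corrupt bar)')
    # Volume sanity
    if (df['Volume'] < 0).any():
        issues.append('negative volume')
    return issues
```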
Model Tuning Guidelines
Isolation Forest Parameters:
- contamination: start with 0.1 (expect roughly 10% anomalies)
- n_estimators: use 100+ for production
- max_samples: set to 'auto' for large datasets
- random_state: fix it for reproducible results
These settings are applied in the sketch below.
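A configuration reflecting these guidelines might look like this; the 0.05 contamination is illustrative, and should be tuned toward the anomaly rate you actually observe:

```python
from sklearn.ensemble import IsolationForest

# Production-leaning configuration per the guidelines above
tuned_detector = IsolationForest(
    contamination=0.05,   # tightened from the 0.1 starting point
    n_estimators=200,     # 100+ recommended for production
    max_samples='auto',   # lets sklearn size the subsamples
    random_state=42,      # reproducible scores
)
```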
Ollama Model Selection:
- Llama 3.1:8b: Best for general pattern analysis
- Mistral 7B: Faster processing, good for real-time
- CodeLlama: Use for debugging trading algorithms
Common Pitfalls to Avoid
Over-fitting to Historical Data
- Use rolling windows instead of fixed training sets
- Validate on out-of-sample data
- Regularly retrain models (a rolling-window sketch follows this list)
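A minimal sketch of that rolling-window approach, assuming a feature DataFrame like the one produced by prepare_features earlier; the window and step sizes are illustrative:

```python
from sklearn.ensemble import IsolationForest

def rolling_anomaly_scores(features, train_window=500, step=100):
    """Refit on a sliding window; score only the unseen next step."""
    scores = []
    for start in range(0, len(features) - train_window - step + 1, step):
        train = features.iloc[start:start + train_window]
        test = features.iloc[start + train_window:start + train_window + step]
        model = IsolationForest(contamination=0.05, random_state=42).fit(train)
        scores.extend(model.decision_function(test))  # out-of-sample only
    return scores
```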
Ignoring Market Conditions
- Adjust thresholds during high-volatility periods (one approach is sketched after this list)
- Consider market hours and holidays
- Account for earnings announcements
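As one illustration of volatility-aware thresholds (a hypothetical helper, not part of the earlier classes), the z-score cutoff used by the streaming detector could be scaled by recent realized volatility:

```python
def adaptive_zscore_threshold(returns, base_threshold=3.0, vol_window=24):
    """Scale the z-score cutoff up when recent volatility is elevated,
    so routine high-volatility swings don't all trigger alerts."""
    recent_vol = returns.rolling(vol_window).std().iloc[-1]
    long_run_vol = returns.std()
    ratio = recent_vol / long_run_vol if long_run_vol > 0 else 1.0
    return base_threshold * max(1.0, ratio)
```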
Alert Fatigue
- Implement severity levels
- Use escalation policies
- Provide clear action items
Conclusion
Anomaly detection in trading data transforms raw market signals into actionable insights. By combining statistical methods with Ollama's AI capabilities, you create a robust system that:
- Identifies critical market anomalies before they impact your portfolio
- Provides AI-powered pattern analysis for better decision-making
- Scales efficiently from single-symbol monitoring to portfolio-wide surveillance
- Delivers real-time alerts that protect your trading operations
The techniques covered in this guide give you a solid foundation for building production-ready anomaly detection systems. Start with the basic implementation, then gradually add advanced features like multi-timeframe analysis and real-time streaming as your needs grow.
Remember that effective anomaly detection requires continuous refinement. Monitor your system's performance, tune parameters based on market conditions, and always validate alerts against actual trading outcomes.
Your trading data contains valuable signals hidden within the noise. With proper anomaly detection, you can discover these patterns and gain a competitive edge in the markets.
Ready to implement these techniques in your own trading system? Start with the basic setup and gradually add advanced features as you gain experience with the platform.