Imagine your portfolio dropping 15% overnight because you missed a critical market signal while automated systems kept trading. In modern markets, human reaction time rarely competes with automated execution.
Building autonomous trading agents with Ollama AI helps close that gap. This guide shows you how to develop, backtest, and deploy AI-powered trading bots that run unattended, follow their rules without panicking, and re-evaluate the market once per minute.
What Are Autonomous Trading Agents?
Autonomous trading agents are AI-powered systems that execute trading decisions without human intervention. These systems analyze market data, identify opportunities, and place trades based on predefined strategies and machine learning models.
Key Components of AI Trading Systems
Data Processing Engine: Ingests market data, news feeds, and technical indicators, and turns them into inputs for signal generation. The example in this guide polls one-minute price bars.
Decision Logic Module: Uses machine learning algorithms to evaluate market conditions and generate trading signals. This component applies risk management rules and portfolio optimization techniques.
Execution Framework: Connects to trading platforms through APIs to place orders, manage positions, and monitor trade execution. The framework handles order routing and trade confirmation.
Risk Management System: Monitors portfolio exposure, implements stop-loss mechanisms, and manages position sizing. This system prevents catastrophic losses through automated risk controls.
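To make the division of labor concrete, here is a minimal sketch of the four components wired into one pipeline. The function names, the toy price rule, and the 70-point confidence floor are illustrative assumptions; a real agent would replace each stage with the fuller implementations developed later in this guide.

```python
from dataclasses import dataclass
from statistics import mean


@dataclass
class Signal:
    action: str        # "BUY", "SELL" or "HOLD"
    confidence: float  # 0-100


def process_data(closes, window=3):
    """Data processing: reduce raw closes to the last close and a short average."""
    return {"last": closes[-1], "avg": mean(closes[-window:])}


def decide(features):
    """Decision logic: a toy rule standing in for the model-driven signal."""
    gap = (features["last"] - features["avg"]) / features["avg"]
    if gap > 0.01:
        return Signal("BUY", min(100.0, 50 + gap * 1000))
    if gap < -0.01:
        return Signal("SELL", min(100.0, 50 - gap * 1000))
    return Signal("HOLD", 0.0)


def apply_risk(signal, min_confidence=70):
    """Risk management: downgrade any signal below the confidence floor to HOLD."""
    if signal.confidence >= min_confidence:
        return signal
    return Signal("HOLD", signal.confidence)


def execute(signal, symbol):
    """Execution: a real framework would call a broker API; this only describes the order."""
    return f"{signal.action} {symbol}"


def run_pipeline(symbol, closes):
    """Chain the four stages in the order described above."""
    return execute(apply_risk(decide(process_data(closes))), symbol)
```

Calling `run_pipeline("AAPL", [100, 100, 106])` passes all four stages and yields a buy instruction, while a weaker move such as `[100, 100, 101.8]` is produced as a BUY by the decision stage and then vetoed by the risk stage.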
Setting Up Your Ollama AI Trading Environment
Prerequisites and Installation
First, install Ollama and required dependencies for your trading bot development environment:
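# Install Ollama
curl -fsSL https://ollama.com/install.sh | sh
# Pull the model referenced in config/trading_config.json
ollama pull llama3.1:8b
# Install Python dependencies (ta-lib also needs the native TA-Lib C library installed first)
pip install ollama pandas numpy yfinance ta-lib backtrader websockets scikit-learn scipy matplotlib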
# Install trading platform APIs
pip install alpaca-trade-api ccxt python-binance
Initial Configuration
Create your project structure for organized development:
# trading_bot_setup.py
import json
from pathlib import Path


def create_project_structure():
    """Create organized directory structure for trading bot"""
    directories = [
        'config',
        'data',
        'strategies',
        'models',
        'backtesting',
        'logs',
        'utils',
        'deployment',
        'optimization',
        'production'
    ]
    for directory in directories:
        Path(directory).mkdir(exist_ok=True)
    # Create configuration file (placeholders only; keep real keys out of version control)
    config = {
        "ollama_model": "llama3.1:8b",
        "api_keys": {
            "alpaca_key": "your_alpaca_key",
            "alpaca_secret": "your_alpaca_secret"
        },
        "trading_params": {
            "max_portfolio_risk": 0.02,
            "position_size": 0.05,
            "stop_loss": 0.03
        }
    }
    with open('config/trading_config.json', 'w') as f:
        json.dump(config, f, indent=2)


create_project_structure()
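Because the generated config file holds API credentials, one common precaution is to let environment variables override the values on disk. The sketch below assumes two variable names, `ALPACA_API_KEY` and `ALPACA_API_SECRET`, chosen for illustration only; `load_config` is a hypothetical helper, not part of any library.

```python
import json
import os


def load_config(path="config/trading_config.json", env=None):
    """Load the JSON config, letting environment variables override the API keys."""
    env = os.environ if env is None else env
    with open(path, "r") as f:
        config = json.load(f)
    keys = config.setdefault("api_keys", {})
    # The variable names below are assumptions made for this sketch
    keys["alpaca_key"] = env.get("ALPACA_API_KEY", keys.get("alpaca_key"))
    keys["alpaca_secret"] = env.get("ALPACA_API_SECRET", keys.get("alpaca_secret"))
    return config
```

With this in place the JSON file can keep its placeholder values, and the real keys only ever live in the shell environment of the machine that runs the bot.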
Developing Your Ollama AI Trading Strategy
Core Strategy Framework
Build the foundation for your AI trading agent with this comprehensive strategy class:
# strategies/ollama_strategy.py
import ollama
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import yfinance as yf
import json


class OllamaTradingStrategy:
    def __init__(self, config_path='config/trading_config.json'):
        """Initialize Ollama trading strategy with configuration"""
        with open(config_path, 'r') as f:
            self.config = json.load(f)
        self.model = self.config['ollama_model']
        self.client = ollama.Client()
        self.positions = {}
        self.market_data = {}

    def fetch_market_data(self, symbol, period='1d', interval='1m'):
        """Fetch recent intraday market data for analysis"""
        try:
            ticker = yf.Ticker(symbol)
            data = ticker.history(period=period, interval=interval)
            if data.empty:
                # An empty frame means no bars came back; treat it like a failed fetch
                print(f"No data returned for {symbol}")
                return None
            # Add technical indicators
            data['SMA_20'] = data['Close'].rolling(window=20).mean()
            data['SMA_50'] = data['Close'].rolling(window=50).mean()
            data['RSI'] = self.calculate_rsi(data['Close'])
            data['MACD'] = self.calculate_macd(data['Close'])
            return data
        except Exception as e:
            print(f"Error fetching data for {symbol}: {e}")
            return None

    def calculate_rsi(self, prices, period=14):
        """Calculate Relative Strength Index (simple moving-average variant)"""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    def calculate_macd(self, prices, fast=12, slow=26, signal=9):
        """Calculate the MACD histogram (MACD line minus its signal line)"""
        exp1 = prices.ewm(span=fast).mean()
        exp2 = prices.ewm(span=slow).mean()
        macd = exp1 - exp2
        signal_line = macd.ewm(span=signal).mean()
        return macd - signal_line
    def analyze_market_sentiment(self, symbol, market_data):
        """Use Ollama AI to analyze market sentiment and generate signals"""
        # Prepare market context for AI analysis
        latest_data = market_data.tail(10)
        prompt = f"""
Analyze the following market data for {symbol} and provide a trading signal:
Recent Price Data:
{latest_data[['Open', 'High', 'Low', 'Close', 'Volume']].to_string()}
Technical Indicators:
- RSI: {latest_data['RSI'].iloc[-1]:.2f}
- MACD: {latest_data['MACD'].iloc[-1]:.4f}
- SMA 20: {latest_data['SMA_20'].iloc[-1]:.2f}
- SMA 50: {latest_data['SMA_50'].iloc[-1]:.2f}
- Current Price: {latest_data['Close'].iloc[-1]:.2f}
Based on this data, provide:
1. Trading Signal (BUY/SELL/HOLD)
2. Confidence Level (0-100%)
3. Risk Assessment (LOW/MEDIUM/HIGH)
4. Key reasoning points
Format your response as JSON with these exact keys:
{{
"signal": "BUY/SELL/HOLD",
"confidence": 85,
"risk": "MEDIUM",
"reasoning": ["point1", "point2", "point3"]
}}
"""
        try:
            response = self.client.generate(
                model=self.model,
                prompt=prompt,
                format='json'
            )
            analysis = json.loads(response['response'])
            # Reject replies that lack the keys the rest of the strategy reads
            missing = {'signal', 'confidence', 'risk', 'reasoning'} - set(analysis)
            if missing:
                raise ValueError(f"missing keys: {sorted(missing)}")
            return analysis
        except Exception as e:
            print(f"Error in AI analysis: {e}")
            return {
                "signal": "HOLD",
                "confidence": 0,
                "risk": "HIGH",
                "reasoning": ["AI analysis failed"]
            }

    def execute_trade_signal(self, symbol, signal_data):
        """Execute trading decision based on AI signal"""
        signal = signal_data['signal']
        confidence = signal_data['confidence']
        risk = signal_data['risk']
        # Risk management checks
        if confidence < 70:
            print(f"Low confidence signal ({confidence}%) - skipping trade")
            return False
        if risk == "HIGH":
            print("High risk signal - reducing position size")
            position_size = self.config['trading_params']['position_size'] * 0.5
        else:
            position_size = self.config['trading_params']['position_size']
        # Execute trade based on signal
        if signal == "BUY":
            return self.place_buy_order(symbol, position_size)
        elif signal == "SELL":
            return self.place_sell_order(symbol, position_size)
        else:
            print(f"HOLD signal for {symbol}")
            return True
    def place_buy_order(self, symbol, position_size):
        """Place buy order with risk management"""
        # This would connect to your broker API
        current_price = self.get_current_price(symbol)
        if current_price is None:
            # Without a price the stop-loss level cannot be computed
            print(f"No price available for {symbol} - skipping buy")
            return False
        print(f"BUYING {symbol} with position size {position_size}")
        # Implement stop-loss
        stop_loss_price = current_price * (1 - self.config['trading_params']['stop_loss'])
        print(f"Stop loss set at ${stop_loss_price:.2f}")
        return True

    def place_sell_order(self, symbol, position_size):
        """Place sell order"""
        print(f"SELLING {symbol} with position size {position_size}")
        return True

    def get_current_price(self, symbol):
        """Get current market price, or None if the quote is unavailable"""
        ticker = yf.Ticker(symbol)
        return ticker.info.get('currentPrice')

    def run_strategy(self, symbols, interval=60):
        """Run one analysis pass over the symbols (interval is unused; the caller schedules repeats)"""
        print(f"Starting Ollama AI Trading Strategy for {symbols}")
        for symbol in symbols:
            print(f"\nAnalyzing {symbol}...")
            # Fetch market data
            market_data = self.fetch_market_data(symbol)
            if market_data is None:
                continue
            # AI analysis
            signal_data = self.analyze_market_sentiment(symbol, market_data)
            # Execute trading decision
            self.execute_trade_signal(symbol, signal_data)
            # Log results
            self.log_trading_decision(symbol, signal_data)

    def log_trading_decision(self, symbol, signal_data):
        """Log trading decisions for analysis (one JSON object per line)"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'symbol': symbol,
            'signal': signal_data['signal'],
            'confidence': signal_data['confidence'],
            'risk': signal_data['risk'],
            'reasoning': signal_data['reasoning']
        }
        with open('logs/trading_decisions.json', 'a') as f:
            f.write(json.dumps(log_entry) + '\n')
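The strategy asks the model for JSON with fixed keys, but a language model can still return out-of-range or malformed values. The following sketch shows one way to validate a reply before it reaches the order logic. `validate_signal` and its fallback dictionary are hypothetical helpers written for this guide, not part of the Ollama client.

```python
import json

VALID_SIGNALS = {"BUY", "SELL", "HOLD"}
VALID_RISKS = {"LOW", "MEDIUM", "HIGH"}
SAFE_DEFAULT = {"signal": "HOLD", "confidence": 0, "risk": "HIGH",
                "reasoning": ["invalid model output"]}


def validate_signal(raw_text):
    """Parse model output and return a well-formed signal dict, or a HOLD fallback."""
    try:
        data = json.loads(raw_text)
        signal = str(data["signal"]).upper()
        risk = str(data["risk"]).upper()
        confidence = float(data["confidence"])
    except (ValueError, KeyError, TypeError):
        # Covers invalid JSON, missing keys, and non-dict or non-numeric values
        return dict(SAFE_DEFAULT)
    if signal not in VALID_SIGNALS or risk not in VALID_RISKS:
        return dict(SAFE_DEFAULT)
    if not 0 <= confidence <= 100:
        return dict(SAFE_DEFAULT)
    reasoning = data.get("reasoning", [])
    if not isinstance(reasoning, list):
        reasoning = [str(reasoning)]
    return {"signal": signal, "confidence": confidence,
            "risk": risk, "reasoning": reasoning}
```

Falling back to HOLD with zero confidence means a bad reply can never pass the 70% confidence check in `execute_trade_signal`, so the failure mode is a skipped trade rather than an unintended order.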
Advanced Strategy Components
Enhance your trading agent with sophisticated features:
# strategies/advanced_features.py
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import pickle


class AdvancedTradingFeatures:
    def __init__(self):
        self.feature_scaler = StandardScaler()
        self.ml_model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.is_trained = False

    def extract_features(self, market_data):
        """Extract comprehensive features for ML model"""
        features = []
        # Price-based features
        features.extend([
            market_data['Close'].iloc[-1],
            market_data['Close'].pct_change().iloc[-1],
            market_data['Close'].pct_change(5).iloc[-1],
            market_data['Close'].pct_change(20).iloc[-1]
        ])
        # Volume features
        features.extend([
            market_data['Volume'].iloc[-1],
            market_data['Volume'].rolling(5).mean().iloc[-1],
            market_data['Volume'].pct_change().iloc[-1]
        ])
        # Technical indicators
        features.extend([
            market_data['RSI'].iloc[-1],
            market_data['MACD'].iloc[-1],
            market_data['SMA_20'].iloc[-1] - market_data['SMA_50'].iloc[-1]
        ])
        # Volatility features
        returns = market_data['Close'].pct_change()
        features.extend([
            returns.rolling(20).std().iloc[-1],
            returns.rolling(5).std().iloc[-1]
        ])
        return np.array(features).reshape(1, -1)

    def train_ml_model(self, historical_data, labels):
        """Train machine learning model on historical data"""
        features = []
        for data in historical_data:
            feature_vector = self.extract_features(data)
            features.append(feature_vector.flatten())
        X = np.array(features)
        y = np.array(labels)
        # Scale features
        X_scaled = self.feature_scaler.fit_transform(X)
        # Train model
        self.ml_model.fit(X_scaled, y)
        self.is_trained = True
        # Save model
        with open('models/ml_model.pkl', 'wb') as f:
            pickle.dump((self.ml_model, self.feature_scaler), f)
        # Scored on the training data itself, so this overstates out-of-sample accuracy
        print(f"ML model trained, in-sample accuracy: {self.ml_model.score(X_scaled, y):.2f}")

    def predict_signal(self, market_data):
        """Use ML model to predict trading signal"""
        if not self.is_trained:
            return None
        features = self.extract_features(market_data)
        features_scaled = self.feature_scaler.transform(features)
        prediction = self.ml_model.predict(features_scaled)[0]
        probability = self.ml_model.predict_proba(features_scaled)[0].max()
        return {
            'ml_signal': prediction,
            'ml_confidence': probability * 100
        }

    def portfolio_optimization(self, symbols, expected_returns, risk_matrix):
        """Weight assets by return-to-variance (a heuristic, not full mean-variance optimization)"""
        n_assets = len(symbols)
        # Score each asset by expected return per unit of variance; drop negative scores
        risk_adjusted_returns = np.asarray(expected_returns, dtype=float) / np.diag(risk_matrix)
        risk_adjusted_returns = np.clip(risk_adjusted_returns, 0, None)
        if risk_adjusted_returns.sum() > 0:
            weights = risk_adjusted_returns / risk_adjusted_returns.sum()
        else:
            # Equal-weight fallback when no asset has a positive score
            weights = np.ones(n_assets) / n_assets
        return dict(zip(symbols, weights))
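A model scored on the same rows it was trained on will look better than it is. One common remedy for time-ordered data is walk-forward evaluation, where every test window lies strictly after its training window. The generator below is a minimal sketch of that splitting scheme; `walk_forward_splits` is a hypothetical helper and only produces index lists, leaving the choice of model and features to the caller.

```python
def walk_forward_splits(n_samples, train_size, test_size):
    """Yield (train_indices, test_indices) pairs that always test on later data."""
    start = 0
    while start + train_size + test_size <= n_samples:
        train = list(range(start, start + train_size))
        test = list(range(start + train_size, start + train_size + test_size))
        yield train, test
        start += test_size  # slide the window forward by one test block


# Example: 10 observations, train on 4, test on the next 2
for train_idx, test_idx in walk_forward_splits(10, 4, 2):
    print(train_idx, test_idx)
```

Averaging the score over the test windows gives an estimate that never uses information from the future of the data it is evaluated on.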
Backtesting Your Trading Strategy
Historical Performance Analysis
Test your strategy against historical data before live deployment:
# backtesting/strategy_backtest.py
import backtrader as bt
import pandas as pd
import yfinance as yf
from datetime import datetime


class OllamaBacktestStrategy(bt.Strategy):
    params = (
        ('rsi_period', 14),
        ('sma_short', 20),
        ('sma_long', 50),
        ('stop_loss', 0.03),
        ('take_profit', 0.06)
    )

    def __init__(self):
        self.rsi = bt.indicators.RSI(self.data.close, period=self.params.rsi_period)
        self.sma_short = bt.indicators.SMA(self.data.close, period=self.params.sma_short)
        self.sma_long = bt.indicators.SMA(self.data.close, period=self.params.sma_long)
        self.crossover = bt.indicators.CrossOver(self.sma_short, self.sma_long)
        self.buy_signals = []
        self.sell_signals = []
        self.bracket = None  # [entry, stop, limit] orders of the open bracket

    def next(self):
        if not self.position:
            # Buy conditions
            if (self.crossover > 0 and
                    self.rsi < 70 and
                    self.data.close[0] > self.sma_short[0]):
                entry = self.data.close[0]
                # Bracket order: stop loss and take profit are linked, so filling one
                # cancels the other instead of leaving a stray sell order behind.
                # size=100 keeps the order within the 100,000 starting cash.
                self.bracket = self.buy_bracket(
                    size=100,
                    exectype=bt.Order.Market,
                    stopprice=entry * (1 - self.params.stop_loss),
                    limitprice=entry * (1 + self.params.take_profit))
                self.buy_signals.append(self.data.datetime.date(0))
        else:
            # Sell conditions
            if (self.crossover < 0 or
                    self.rsi > 80):
                # Cancel the pending stop; its sibling limit order is cancelled with it
                if self.bracket:
                    self.cancel(self.bracket[1])
                self.close()
                self.sell_signals.append(self.data.datetime.date(0))
def run_backtest(symbol, start_date, end_date):
    """Run comprehensive backtest"""
    # Initialize Cerebro engine
    cerebro = bt.Cerebro()
    # Add strategy
    cerebro.addstrategy(OllamaBacktestStrategy)
    # Get data
    data = yf.download(symbol, start=start_date, end=end_date)
    # Some yfinance versions return (field, ticker) column pairs; keep only the field names
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.get_level_values(0)
    data_feed = bt.feeds.PandasData(dataname=data)
    cerebro.adddata(data_feed)
    # Set initial capital
    cerebro.broker.setcash(100000)
    # Add analyzers
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
    # Run backtest
    results = cerebro.run()
    # Extract performance metrics
    strategy = results[0]
    trades = strategy.analyzers.trades.get_analysis()
    total_trades = trades.get('total', {}).get('total', 0)
    performance = {
        # The Sharpe analyzer reports None when it cannot compute a value
        'sharpe_ratio': strategy.analyzers.sharpe.get_analysis().get('sharperatio') or 0,
        'max_drawdown': strategy.analyzers.drawdown.get_analysis().get('max', {}).get('drawdown', 0),
        'total_return': strategy.analyzers.returns.get_analysis().get('rtot', 0),
        'total_trades': total_trades,
        'win_rate': trades.get('won', {}).get('total', 0) / max(total_trades, 1)
    }
    return performance, cerebro


# Run backtest example
if __name__ == "__main__":
    symbol = "AAPL"
    start_date = "2023-01-01"
    end_date = "2024-01-01"
    performance, cerebro = run_backtest(symbol, start_date, end_date)
    print("Backtest Results:")
    print(f"Sharpe Ratio: {performance['sharpe_ratio']:.2f}")
    # Drawdown is already a percentage; return and win rate are fractions
    print(f"Max Drawdown: {performance['max_drawdown']:.2f}%")
    print(f"Total Return (log): {performance['total_return'] * 100:.2f}%")
    print(f"Total Trades: {performance['total_trades']}")
    print(f"Win Rate: {performance['win_rate']:.2%}")
    # Plot results
    cerebro.plot()
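The two headline numbers in the backtest report, Sharpe ratio and maximum drawdown, are easy to compute by hand, which helps when checking an analyzer's output. The sketch below uses only the standard library and assumes per-period returns and an equity curve are already available as plain lists; the function names are illustrative.

```python
import math
from statistics import mean, stdev


def max_drawdown(equity):
    """Largest peak-to-trough decline of an equity curve, as a fraction of the peak."""
    peak = equity[0]
    worst = 0.0
    for value in equity:
        peak = max(peak, value)
        worst = max(worst, (peak - value) / peak)
    return worst


def sharpe_ratio(returns, periods_per_year=252, risk_free_per_period=0.0):
    """Annualized Sharpe ratio from per-period returns (sample standard deviation)."""
    excess = [r - risk_free_per_period for r in returns]
    spread = stdev(excess)
    if spread == 0:
        return 0.0
    return mean(excess) / spread * math.sqrt(periods_per_year)
```

For the equity curve `[100, 120, 90, 110, 130]` the peak of 120 is followed by a trough of 90, so `max_drawdown` reports 0.25, which is a 25% decline even though the curve ends at a new high.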
Risk Management and Position Sizing
Dynamic Risk Control System
Implement sophisticated risk management to protect your capital:
# utils/risk_management.py
import math
import numpy as np
import pandas as pd
from scipy import stats


class RiskManager:
    def __init__(self, max_portfolio_risk=0.02, max_position_size=0.05):
        self.max_portfolio_risk = max_portfolio_risk
        self.max_position_size = max_position_size
        self.positions = {}
        self.portfolio_value = 100000  # Starting capital

    def calculate_var(self, returns, confidence_level=0.95):
        """Calculate Value at Risk as a return quantile (negative for a loss)"""
        if len(returns) < 30:
            return -0.05  # Default VaR if insufficient data
        return np.percentile(returns, (1 - confidence_level) * 100)

    def calculate_position_size(self, symbol, signal_confidence, volatility):
        """Calculate position size with a Kelly-style fraction, capped at max_position_size"""
        # Kelly fraction calculation
        win_prob = signal_confidence / 100
        loss_prob = 1 - win_prob
        # Kelly fraction f = (b*p - q) / b with an assumed payoff ratio b = 2
        kelly_fraction = (win_prob * 2 - loss_prob) / 2
        if kelly_fraction <= 0:
            return 0.0  # No edge at this confidence, so take no position
        # Adjust for volatility (0.2 is the target level; guard against zero or NaN input)
        if volatility is None or math.isnan(volatility) or volatility <= 0:
            volatility_adjustment = 1.0
        else:
            volatility_adjustment = min(1.0, 0.2 / volatility)
        # Final position size
        return min(kelly_fraction * volatility_adjustment, self.max_position_size)

    def check_correlation_risk(self, new_symbol, existing_positions):
        """Check correlation risk with existing positions"""
        if len(existing_positions) < 2:
            return True
        # Simplified correlation check
        # In practice, you'd calculate actual correlations
        sector_exposure = {}
        for symbol in existing_positions:
            sector = self.get_sector(symbol)
            sector_exposure[sector] = sector_exposure.get(sector, 0) + 1
        new_sector = self.get_sector(new_symbol)
        if sector_exposure.get(new_sector, 0) >= 3:
            return False  # Too much sector concentration
        return True

    def get_sector(self, symbol):
        """Get sector for symbol (simplified)"""
        # In practice, you'd use a proper sector mapping
        tech_stocks = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'META']
        if symbol in tech_stocks:
            return 'Technology'
        return 'Other'

    def update_portfolio_value(self, new_value):
        """Update portfolio value for risk calculations"""
        self.portfolio_value = new_value

    def get_risk_metrics(self):
        """Get current portfolio risk metrics"""
        # Exposure in dollars; assumes each position is {'size': shares, 'value': dollars}
        total_exposure = sum(pos['value'] for pos in self.positions.values())
        return {
            'total_exposure': total_exposure,
            'portfolio_utilization': total_exposure / self.portfolio_value,
            'max_risk_per_trade': self.max_portfolio_risk,
            'position_count': len(self.positions)
        }
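The position-sizing method builds on the Kelly criterion, which for win probability p, loss probability q = 1 - p, and payoff ratio b gives the fraction f = (b*p - q) / b. The sketch below works that formula through and then converts a capital fraction into a whole number of shares. Both helper names are hypothetical, and the payoff ratio is an input the caller must estimate.

```python
import math


def kelly_fraction(win_prob, payoff_ratio):
    """Kelly fraction f = (b*p - q) / b, floored at zero when there is no edge."""
    if not 0 <= win_prob <= 1 or payoff_ratio <= 0:
        raise ValueError("win_prob must be in [0, 1] and payoff_ratio must be positive")
    loss_prob = 1 - win_prob
    return max(0.0, (payoff_ratio * win_prob - loss_prob) / payoff_ratio)


def shares_for_fraction(fraction, equity, price):
    """Whole shares that a given fraction of account equity can buy."""
    if price <= 0:
        return 0
    return math.floor(fraction * equity / price)
```

With p = 0.6 and b = 2 the formula gives (1.2 - 0.4) / 2 = 0.4, far above the 5% cap used in this guide, which is why the cap rather than the Kelly value usually determines the final size. At the cap, `shares_for_fraction(0.05, 100000, 187.5)` allocates 5,000 dollars and returns 26 shares.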
Deployment and Live Trading
Production Deployment Setup
Deploy your trading bot against Alpaca's paper-trading endpoint first, and switch to live execution only after it behaves as expected:
# deployment/live_trading.py
import asyncio
import websockets
import json
import logging
from datetime import datetime
import alpaca_trade_api as tradeapi
# Local modules from earlier sections (run from the project root so these packages resolve)
from strategies.ollama_strategy import OllamaTradingStrategy
from utils.risk_management import RiskManager


class LiveTradingBot:
    def __init__(self, config):
        self.config = config
        self.strategy = OllamaTradingStrategy()
        self.risk_manager = RiskManager()
        # Setup logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('logs/live_trading.log'),
                logging.StreamHandler()
            ]
        )
        # Initialize broker API
        self.api = tradeapi.REST(
            config['api_keys']['alpaca_key'],
            config['api_keys']['alpaca_secret'],
            base_url='https://paper-api.alpaca.markets'  # Paper trading
        )
        self.is_running = False
        self.symbols = ['AAPL', 'TSLA', 'MSFT', 'GOOGL']

    async def start_trading(self):
        """Start live trading loop"""
        self.is_running = True
        logging.info("Starting live trading bot...")
        while self.is_running:
            try:
                await self.trading_cycle()
                await asyncio.sleep(60)  # Run every minute
            except Exception as e:
                logging.error(f"Error in trading cycle: {e}")
                await asyncio.sleep(10)

    async def trading_cycle(self):
        """Execute one trading cycle"""
        market_hours = self.api.get_clock()
        if not market_hours.is_open:
            logging.info("Market is closed")
            return
        # Get account info
        account = self.api.get_account()
        self.risk_manager.update_portfolio_value(float(account.equity))
        # Process each symbol
        for symbol in self.symbols:
            try:
                await self.process_symbol(symbol)
            except Exception as e:
                logging.error(f"Error processing {symbol}: {e}")

    async def process_symbol(self, symbol):
        """Process trading signal for a symbol"""
        # Get market data (a blocking call, so the event loop waits while it runs)
        market_data = self.strategy.fetch_market_data(symbol)
        if market_data is None:
            return
        # Generate AI signal
        signal_data = self.strategy.analyze_market_sentiment(symbol, market_data)
        # Risk management checks
        if not self.risk_manager.check_correlation_risk(symbol, self.get_current_positions()):
            logging.info(f"Skipping {symbol} due to correlation risk")
            return
        # Calculate position size
        volatility = market_data['Close'].pct_change().std()
        position_size = self.risk_manager.calculate_position_size(
            symbol, signal_data['confidence'], volatility
        )
        # Execute trade
        await self.execute_trade(symbol, signal_data, position_size)
    async def execute_trade(self, symbol, signal_data, position_size):
        """Execute trade through broker API"""
        signal = signal_data['signal']
        # Apply the same confidence floor that OllamaTradingStrategy uses
        if signal_data.get('confidence', 0) < 70:
            logging.info(f"Low confidence signal for {symbol} - skipping trade")
            return
        # Placeholder sizing: scales the fraction to a share count; a production bot
        # would derive shares from account equity and the current price instead
        qty = int(position_size * 1000)
        if qty < 1:
            logging.info(f"Position size too small for {symbol} - skipping trade")
            return
        if signal == 'BUY':
            try:
                order = self.api.submit_order(
                    symbol=symbol,
                    qty=qty,
                    side='buy',
                    type='market',
                    time_in_force='gtc'
                )
                logging.info(f"Buy order submitted for {symbol}: {order.id}")
            except Exception as e:
                logging.error(f"Error submitting buy order for {symbol}: {e}")
        elif signal == 'SELL':
            try:
                # Check if we have position to sell (qty may be a fractional string)
                position = self.api.get_position(symbol)
                held = int(float(position.qty))
                if held > 0:
                    order = self.api.submit_order(
                        symbol=symbol,
                        qty=min(held, qty),
                        side='sell',
                        type='market',
                        time_in_force='gtc'
                    )
                    logging.info(f"Sell order submitted for {symbol}: {order.id}")
            except Exception as e:
                if "position does not exist" not in str(e):
                    logging.error(f"Error submitting sell order for {symbol}: {e}")

    def get_current_positions(self):
        """Get current positions from broker"""
        try:
            positions = self.api.list_positions()
            return {pos.symbol: {'size': float(pos.qty), 'value': float(pos.market_value)}
                    for pos in positions}
        except Exception as e:
            logging.error(f"Error getting positions: {e}")
            return {}

    def stop_trading(self):
        """Stop trading bot"""
        self.is_running = False
        logging.info("Trading bot stopped")


# Main execution
async def main():
    # Load configuration
    with open('config/trading_config.json', 'r') as f:
        config = json.load(f)
    # Create and start trading bot
    bot = LiveTradingBot(config)
    try:
        await bot.start_trading()
    finally:
        # Also runs on Ctrl+C: asyncio.run() cancels this task before re-raising
        bot.stop_trading()
        logging.info("Trading bot shutdown complete")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
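The trading loop is asynchronous, but the data download and the model request it calls are ordinary blocking functions, so symbols are processed one after another. A minimal sketch of one alternative, assuming Python 3.9 or later, is to push each blocking call into a worker thread with `asyncio.to_thread` and gather the results. `slow_analysis` below is a stand-in for the real per-symbol work, not a function from this guide.

```python
import asyncio
import time


def slow_analysis(symbol):
    """Stand-in for a blocking call such as a model request or an HTTP download."""
    time.sleep(0.05)
    return f"{symbol}:HOLD"


async def analyze_all(symbols):
    """Run the blocking analysis for every symbol in worker threads, concurrently."""
    tasks = [asyncio.to_thread(slow_analysis, s) for s in symbols]
    # gather() returns results in the same order as the input symbols
    return await asyncio.gather(*tasks)
```

Running `asyncio.run(analyze_all(["AAPL", "MSFT"]))` returns one result per symbol in input order. Whether this helps in practice depends on the backend: a single local model instance may still serve requests one at a time.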
Performance Monitoring and Optimization
Real-time Performance Tracking
Monitor your trading bot's performance with comprehensive analytics:
# utils/performance_monitor.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import json


class PerformanceMonitor:
    def __init__(self):
        self.trades = []
        self.portfolio_values = []
        self.drawdowns = []

    def record_trade(self, symbol, side, quantity, price, timestamp=None):
        """Record executed trade"""
        if timestamp is None:
            timestamp = datetime.now()
        trade = {
            'timestamp': timestamp,
            'symbol': symbol,
            'side': side,
            'quantity': quantity,
            'price': price,
            'value': quantity * price
        }
        self.trades.append(trade)
        self.save_trade_record(trade)

    def calculate_performance_metrics(self):
        """Calculate comprehensive performance metrics"""
        if not self.trades:
            return {}
        df = pd.DataFrame(self.trades)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        # Signed cash flow per fill (sells add cash, buys spend it). This is a proxy for
        # P&L, so the totals are only meaningful once positions have been closed.
        df['pnl'] = df.apply(lambda x: x['value'] if x['side'] == 'sell' else -x['value'], axis=1)
        df['cumulative_pnl'] = df['pnl'].cumsum()
        # Performance metrics
        total_trades = len(df)
        winning_trades = len(df[df['pnl'] > 0])
        losing_trades = len(df[df['pnl'] < 0])
        win_rate = winning_trades / total_trades if total_trades > 0 else 0
        avg_win = df[df['pnl'] > 0]['pnl'].mean() if winning_trades > 0 else 0
        avg_loss = df[df['pnl'] < 0]['pnl'].mean() if losing_trades > 0 else 0
        profit_factor = abs(avg_win * winning_trades / (avg_loss * losing_trades)) if losing_trades > 0 else float('inf')
        # Sharpe ratio from per-trade P&L (pct_change of P&L is undefined across sign flips)
        pnl_std = df['pnl'].std() if len(df) > 1 else 0
        sharpe_ratio = df['pnl'].mean() / pnl_std * np.sqrt(252) if pnl_std > 0 else 0
        # Maximum drawdown relative to the running peak (only defined while the peak is positive)
        peak = df['cumulative_pnl'].cummax()
        drawdown = (df['cumulative_pnl'] - peak) / peak.where(peak > 0)
        max_drawdown = drawdown.min()
        if pd.isna(max_drawdown):
            max_drawdown = 0.0
        return {
            'total_trades': total_trades,
            'winning_trades': winning_trades,
            'losing_trades': losing_trades,
            'win_rate': win_rate,
            'avg_win': avg_win,
            'avg_loss': avg_loss,
            'profit_factor': profit_factor,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'total_pnl': df['cumulative_pnl'].iloc[-1],
            'current_streak': self.calculate_current_streak(df)
        }
    def calculate_current_streak(self, df):
        """Calculate current winning/losing streak"""
        if df.empty:
            return 0
        recent_trades = df.tail(10)['pnl']
        streak = 0
        for pnl in reversed(recent_trades.tolist()):
            if pnl > 0:
                if streak >= 0:
                    streak += 1
                else:
                    break
            elif pnl < 0:
                if streak <= 0:
                    streak -= 1
                else:
                    break
        return streak

    def generate_performance_report(self):
        """Generate comprehensive performance report"""
        metrics = self.calculate_performance_metrics()
        if not metrics:
            return "No trading data available"
        report = f"""
📊 TRADING PERFORMANCE REPORT
================================
📈 Trade Statistics:
- Total Trades: {metrics['total_trades']}
- Winning Trades: {metrics['winning_trades']}
- Losing Trades: {metrics['losing_trades']}
- Win Rate: {metrics['win_rate']:.2%}
- Current Streak: {metrics['current_streak']}
💰 Profitability:
- Total P&L: ${metrics['total_pnl']:.2f}
- Average Win: ${metrics['avg_win']:.2f}
- Average Loss: ${metrics['avg_loss']:.2f}
- Profit Factor: {metrics['profit_factor']:.2f}
📊 Risk Metrics:
- Sharpe Ratio: {metrics['sharpe_ratio']:.2f}
- Max Drawdown: {metrics['max_drawdown']:.2%}
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
        return report

    def save_trade_record(self, trade):
        """Save trade record to file"""
        with open('logs/trade_history.json', 'a') as f:
            f.write(json.dumps(trade, default=str) + '\n')

    def plot_performance_charts(self):
        """Generate performance visualization charts"""
        if not self.trades:
            print("No trading data to plot")
            return
        df = pd.DataFrame(self.trades)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df['pnl'] = df.apply(lambda x: x['value'] if x['side'] == 'sell' else -x['value'], axis=1)
        df['cumulative_pnl'] = df['pnl'].cumsum()
        # Create subplots
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
        # Cumulative P&L
        ax1.plot(df['timestamp'], df['cumulative_pnl'], linewidth=2, color='blue')
        ax1.set_title('Cumulative P&L Over Time')
        ax1.set_ylabel('P&L ($)')
        ax1.grid(True, alpha=0.3)
        # Trade distribution
        ax2.hist(df['pnl'], bins=20, alpha=0.7, color='green', edgecolor='black')
        ax2.set_title('Trade P&L Distribution')
        ax2.set_xlabel('P&L ($)')
        ax2.set_ylabel('Frequency')
        ax2.grid(True, alpha=0.3)
        # Monthly performance
        df_monthly = df.groupby(df['timestamp'].dt.to_period('M'))['pnl'].sum()
        ax3.bar(range(len(df_monthly)), df_monthly.values, color='orange', alpha=0.7)
        ax3.set_title('Monthly Performance')
        ax3.set_xlabel('Month')
        ax3.set_ylabel('P&L ($)')
        ax3.grid(True, alpha=0.3)
        # Rolling Sharpe ratio
        rolling_returns = df['pnl'].rolling(window=20).mean()
        rolling_std = df['pnl'].rolling(window=20).std()
        rolling_sharpe = rolling_returns / rolling_std * np.sqrt(252)
        ax4.plot(df['timestamp'], rolling_sharpe, linewidth=2, color='purple')
        ax4.set_title('Rolling Sharpe Ratio (20-period)')
        ax4.set_ylabel('Sharpe Ratio')
        ax4.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig('logs/performance_charts.png', dpi=300, bbox_inches='tight')
        plt.show()
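The monitor treats each fill's signed cash value as its P&L, which only approximates profit. A more faithful figure pairs each sell with the buys it closes. The sketch below does this with first-in, first-out matching, one of several accounting conventions; `realized_pnl_fifo` is a hypothetical helper that expects trade dictionaries shaped like those `record_trade` stores and ignores short sales.

```python
from collections import deque


def realized_pnl_fifo(trades):
    """Match sells against earlier buys (first in, first out); return P&L per closed lot."""
    open_lots = {}  # symbol -> deque of [remaining_quantity, buy_price]
    closed = []
    for trade in trades:
        lots = open_lots.setdefault(trade["symbol"], deque())
        if trade["side"] == "buy":
            lots.append([trade["quantity"], trade["price"]])
            continue
        remaining = trade["quantity"]
        while remaining > 0 and lots:
            lot = lots[0]
            matched = min(remaining, lot[0])
            closed.append(matched * (trade["price"] - lot[1]))
            lot[0] -= matched
            remaining -= matched
            if lot[0] == 0:
                lots.popleft()
        # Any unmatched remainder would be a short sale, which this sketch ignores
    return closed


trades = [
    {"symbol": "AAPL", "side": "buy", "quantity": 10, "price": 100},
    {"symbol": "AAPL", "side": "buy", "quantity": 10, "price": 110},
    {"symbol": "AAPL", "side": "sell", "quantity": 15, "price": 120},
]
print(realized_pnl_fifo(trades))  # [200, 50]
```

The sale of 15 shares closes the whole first lot for a gain of 200 and half of the second lot for a gain of 50, leaving 5 shares open at a cost of 110 each.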
Advanced Strategy Optimization
Hyperparameter Tuning and Strategy Enhancement
The optimizer below grid-searches strategy parameters. Its backtest hook returns mock results until you connect it to your backtesting framework:
# optimization/strategy_optimizer.py
import itertools
import zlib
import numpy as np
from concurrent.futures import ProcessPoolExecutor
import json


class StrategyOptimizer:
    def __init__(self, strategy_class, data_source):
        self.strategy_class = strategy_class
        self.data_source = data_source
        self.optimization_results = []

    def define_parameter_space(self):
        """Define parameter space for optimization"""
        return {
            'rsi_period': range(10, 21, 2),
            'sma_short': range(15, 26, 5),
            'sma_long': range(45, 56, 5),
            'stop_loss': [0.02, 0.03, 0.04, 0.05],
            'take_profit': [0.04, 0.06, 0.08, 0.10],
            'confidence_threshold': [60, 65, 70, 75, 80]
        }

    def objective_function(self, params):
        """Objective function for optimization"""
        try:
            # Run backtest with parameters
            performance = self.run_backtest_with_params(params)
            # Multi-objective optimization
            sharpe_ratio = performance.get('sharpe_ratio', 0)
            max_drawdown = performance.get('max_drawdown', 100)
            win_rate = performance.get('win_rate', 0)
            total_trades = performance.get('total_trades', 0)
            # Penalty for too few trades
            if total_trades < 10:
                return -999
            # Penalty for excessive drawdown
            if max_drawdown > 20:
                return -999
            # Combined score
            score = (sharpe_ratio * 0.4 +
                     win_rate * 0.3 +
                     (100 - max_drawdown) / 100 * 0.3)
            return score
        except Exception as e:
            print(f"Error in objective function: {e}")
            return -999

    def run_backtest_with_params(self, params):
        """Run backtest with specific parameters"""
        # This would integrate with your backtesting framework
        # For demonstration, returning mock results. The generator is seeded from the
        # parameters, so each combination gets its own reproducible values; a single
        # fixed seed would hand every combination the identical result.
        seed = zlib.crc32(repr(sorted(params.items())).encode())
        rng = np.random.default_rng(seed)
        return {
            'sharpe_ratio': rng.uniform(0.5, 2.0),
            'max_drawdown': rng.uniform(5, 15),
            'win_rate': rng.uniform(0.4, 0.7),
            'total_trades': int(rng.integers(20, 100)),
            'total_return': rng.uniform(0.05, 0.25)
        }
    def grid_search_optimization(self):
        """Perform grid search optimization"""
        param_space = self.define_parameter_space()
        # Generate all parameter combinations
        param_names = list(param_space.keys())
        param_values = list(param_space.values())
        best_score = -float('inf')
        best_params = None
        total_combinations = np.prod([len(v) for v in param_values])
        print(f"Testing {total_combinations} parameter combinations...")
        for i, params in enumerate(itertools.product(*param_values)):
            param_dict = dict(zip(param_names, params))
            score = self.objective_function(param_dict)
            if score > best_score:
                best_score = score
                best_params = param_dict
            self.optimization_results.append({
                'params': param_dict,
                'score': score
            })
            if i % 100 == 0:
                print(f"Progress: {i}/{total_combinations} ({i/total_combinations:.1%})")
        return best_params, best_score

    def save_optimization_results(self):
        """Save optimization results to file"""
        import os
        # Make sure the output directory exists before writing
        os.makedirs('optimization', exist_ok=True)
        with open('optimization/optimization_results.json', 'w') as f:
            json.dump(self.optimization_results, f, indent=2)

    def analyze_parameter_sensitivity(self):
        """Analyze parameter sensitivity"""
        if not self.optimization_results:
            return {}
        param_analysis = {}
        for param_name in self.define_parameter_space().keys():
            param_scores = {}
            for result in self.optimization_results:
                param_value = result['params'][param_name]
                score = result['score']
                if param_value not in param_scores:
                    param_scores[param_value] = []
                param_scores[param_value].append(score)
            # Calculate average score for each parameter value
            param_analysis[param_name] = {
                value: np.mean(scores)
                for value, scores in param_scores.items()
            }
        return param_analysis


# Usage example
if __name__ == "__main__":
    # Strategy class from the earlier section (run from the project root so it resolves)
    from strategies.ollama_strategy import OllamaTradingStrategy
    optimizer = StrategyOptimizer(OllamaTradingStrategy, "historical_data.csv")
    # Run optimization
    best_params, best_score = optimizer.grid_search_optimization()
    print(f"Best parameters: {best_params}")
    print(f"Best score: {best_score:.4f}")
    # Save results
    optimizer.save_optimization_results()
    # Analyze sensitivity
    sensitivity = optimizer.analyze_parameter_sensitivity()
    print("\nParameter Sensitivity Analysis:")
    for param, values in sensitivity.items():
        print(f"\n{param}:")
        for value, score in sorted(values.items(), key=lambda x: x[1], reverse=True)[:3]:
            print(f"  {value}: {score:.4f}")
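The parameter space defined above contains 6 x 3 x 3 x 4 x 4 x 5 = 4,320 combinations, and a full grid search runs one backtest per combination. When each backtest is slow, random search, a different technique from the grid search above, evaluates only a sample of the grid. The helper below is a minimal sketch of the sampling step; `sample_parameter_sets` is a hypothetical name and the seed is only there to make runs repeatable.

```python
import random


def sample_parameter_sets(param_space, n_samples, seed=0):
    """Draw distinct random parameter combinations from a grid-style search space."""
    rng = random.Random(seed)
    names = sorted(param_space)
    total = 1
    for name in names:
        total *= len(param_space[name])
    n_samples = min(n_samples, total)  # cannot draw more distinct sets than exist
    seen = set()
    while len(seen) < n_samples:
        seen.add(tuple(rng.choice(list(param_space[name])) for name in names))
    # Sort so the returned order does not depend on set iteration order
    return [dict(zip(names, combo)) for combo in sorted(seen)]


space = {"rsi_period": range(10, 21, 2), "stop_loss": [0.02, 0.03, 0.04, 0.05]}
for params in sample_parameter_sets(space, 5, seed=1):
    print(params)
```

Each returned dictionary has the same shape as the `param_dict` values built in `grid_search_optimization`, so it could be scored with the same objective function.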
Real-World Implementation Considerations
Production-Ready Features
Implement essential features for professional trading bot deployment:
# production/production_features.py
import sqlite3
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests
import threading
import time
from datetime import datetime, timedelta
import alpaca_trade_api as tradeapi


class ProductionTradingBot:
    def __init__(self, config):
        self.config = config
        # Broker client used by emergency_stop() and check_account_health()
        # (paper-trading endpoint assumed, as in the live trading example)
        self.api = tradeapi.REST(
            config['api_keys']['alpaca_key'],
            config['api_keys']['alpaca_secret'],
            base_url='https://paper-api.alpaca.markets'
        )
        self.db_connection = self.setup_database()
        self.alert_system = AlertSystem(config)
        self.health_monitor = HealthMonitor(self)

    def setup_database(self):
        """Setup SQLite database for trade storage"""
        conn = sqlite3.connect('trading_bot.db')
        cursor = conn.cursor()
        # Create tables
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS trades (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                symbol TEXT NOT NULL,
                side TEXT NOT NULL,
                quantity REAL NOT NULL,
                price REAL NOT NULL,
                pnl REAL,
                strategy TEXT,
                confidence REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS system_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                level TEXT NOT NULL,
                message TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
        return conn

    def log_trade(self, trade_data):
        """Log trade to database"""
        cursor = self.db_connection.cursor()
        cursor.execute('''
            INSERT INTO trades (timestamp, symbol, side, quantity, price, pnl, strategy, confidence)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            trade_data['timestamp'],
            trade_data['symbol'],
            trade_data['side'],
            trade_data['quantity'],
            trade_data['price'],
            trade_data.get('pnl', 0),
            trade_data.get('strategy', 'ollama_ai'),
            trade_data.get('confidence', 0)
        ))
        self.db_connection.commit()

    def get_trade_history(self, days=30):
        """Get trade history from database"""
        cursor = self.db_connection.cursor()
        # Bind the interval as a parameter instead of formatting it into the SQL text
        cursor.execute('''
            SELECT * FROM trades
            WHERE datetime(timestamp) > datetime('now', ?)
            ORDER BY timestamp DESC
        ''', (f'-{int(days)} days',))
        return cursor.fetchall()

    def emergency_stop(self):
        """Emergency stop all trading activities"""
        self.alert_system.send_alert(
            "EMERGENCY STOP",
            "Trading bot has been emergency stopped!",
            priority="HIGH"
        )
        # Close all positions
        try:
            positions = self.api.list_positions()
            for position in positions:
                self.api.close_position(position.symbol)
        except Exception as e:
            self.alert_system.send_alert(
                "EMERGENCY STOP ERROR",
                f"Failed to close positions: {e}",
                priority="CRITICAL"
            )

    def check_account_health(self):
        """Check account health and risk levels"""
        try:
            account = self.api.get_account()
            equity = float(account.equity)
            # Alpaca's account object exposes `buying_power`; its day-trading
            # figure is named `daytrading_buying_power`, so the attribute
            # `day_trade_buying_power` would raise AttributeError.
            buying_power = float(account.buying_power)
            # Check for margin call
            if hasattr(account, 'initial_margin') and float(account.initial_margin) > equity * 0.8:
                self.alert_system.send_alert(
                    "MARGIN WARNING",
                    f"High margin usage detected. Equity: ${equity:.2f}",
                    priority="HIGH"
                )
            # Check for low buying power
            if buying_power < equity * 0.1:
                self.alert_system.send_alert(
                    "LOW BUYING POWER",
                    f"Buying power low: ${buying_power:.2f}",
                    priority="MEDIUM"
                )
            return True
        except Exception as e:
            self.alert_system.send_alert(
                "HEALTH CHECK ERROR",
                f"Failed to check account health: {e}",
                priority="HIGH"
            )
            return False


class AlertSystem:
    def __init__(self, config):
        self.config = config
        self.email_config = config.get('email', {})
        self.slack_webhook = config.get('slack_webhook')

    def send_alert(self, subject, message, priority="MEDIUM"):
        """Send alert through multiple channels"""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        alert_data = {
            'timestamp': timestamp,
            'subject': subject,
            'message': message,
            'priority': priority
        }
        # Send email alert
        if self.email_config:
            self.send_email_alert(alert_data)
        # Send Slack alert
        if self.slack_webhook:
            self.send_slack_alert(alert_data)
        # Log alert
        print(f"[{timestamp}] {priority}: {subject} - {message}")

    def send_email_alert(self, alert_data):
        """Send email alert"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email_config['from']
            msg['To'] = self.email_config['to']
            msg['Subject'] = f"Trading Bot Alert: {alert_data['subject']}"
            body = (
                "Trading Bot Alert\n"
                f"Time: {alert_data['timestamp']}\n"
                f"Priority: {alert_data['priority']}\n"
                f"Subject: {alert_data['subject']}\n"
                "Message:\n"
                f"{alert_data['message']}\n"
            )
            msg.attach(MIMEText(body, 'plain'))
            # The context manager closes the connection even if login or send fails;
            # the timeout keeps a dead mail server from blocking the caller.
            with smtplib.SMTP(self.email_config['smtp_server'],
                              self.email_config['smtp_port'], timeout=10) as server:
                server.starttls()
                server.login(self.email_config['username'], self.email_config['password'])
                server.send_message(msg)
        except Exception as e:
            print(f"Failed to send email alert: {e}")

    def send_slack_alert(self, alert_data):
        """Send Slack alert"""
        try:
            # CRITICAL alerts are raised by emergency_stop(), so colour them like HIGH
            is_urgent = alert_data['priority'] in ('HIGH', 'CRITICAL')
            payload = {
                'text': "🤖 Trading Bot Alert",
                'attachments': [{
                    'color': 'danger' if is_urgent else 'warning',
                    'fields': [
                        {'title': 'Priority', 'value': alert_data['priority'], 'short': True},
                        {'title': 'Subject', 'value': alert_data['subject'], 'short': True},
                        {'title': 'Message', 'value': alert_data['message'], 'short': False}
                    ],
                    'footer': 'Trading Bot',
                    'ts': int(time.time())
                }]
            }
            # Time out instead of hanging, and surface HTTP errors in the log
            response = requests.post(self.slack_webhook, json=payload, timeout=10)
            response.raise_for_status()
        except Exception as e:
            print(f"Failed to send Slack alert: {e}")


class HealthMonitor:
    def __init__(self, bot):
        self.bot = bot
        self.is_monitoring = False
        self.monitor_thread = None

    def start_monitoring(self):
        """Start health monitoring in separate thread"""
        self.is_monitoring = True
        self.monitor_thread = threading.Thread(target=self.monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def monitor_loop(self):
        """Main monitoring loop"""
        while self.is_monitoring:
            try:
                # Check account health
                if not self.bot.check_account_health():
                    self.bot.emergency_stop()
                # Check system resources
                self.check_system_resources()
                # Check for stuck orders
                self.check_stuck_orders()
            except Exception as e:
                self.bot.alert_system.send_alert(
                    "MONITOR ERROR",
                    f"Health monitor error: {e}",
                    priority="HIGH"
                )
            # Sleep outside the try block so a failing check cannot turn the
            # loop into a tight retry cycle that floods the alert channels.
            time.sleep(60)  # Check every minute

    def check_system_resources(self):
        """Check system resource usage"""
        # Third-party dependency: install with `pip install psutil`
        import psutil
        # Check memory usage
        memory_percent = psutil.virtual_memory().percent
        if memory_percent > 85:
            self.bot.alert_system.send_alert(
                "HIGH MEMORY USAGE",
                f"Memory usage: {memory_percent:.1f}%",
                priority="MEDIUM"
            )
        # Check CPU usage (blocks for one second while sampling)
        cpu_percent = psutil.cpu_percent(interval=1)
        if cpu_percent > 80:
            self.bot.alert_system.send_alert(
                "HIGH CPU USAGE",
                f"CPU usage: {cpu_percent:.1f}%",
                priority="MEDIUM"
            )
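
    def check_stuck_orders(self):
        """Check for orders that haven't been filled"""
        try:
            orders = self.bot.api.list_orders(status='open')
            for order in orders:
                created = order.created_at
                # The client may return an ISO-8601 string or an already parsed,
                # datetime-like object depending on its version; handle both.
                if isinstance(created, str):
                    order_time = datetime.fromisoformat(created.replace('Z', '+00:00'))
                else:
                    order_time = created
                # Use the order's own timezone so aware and naive datetimes are
                # never mixed (subtracting them raises TypeError).
                current_time = datetime.now(order_time.tzinfo)
                if current_time - order_time > timedelta(minutes=15):
                    self.bot.alert_system.send_alert(
                        "STUCK ORDER",
                        f"Order {order.id} for {order.symbol} has been open for >15 minutes",
                        priority="MEDIUM"
                    )
        except Exception as e:
            print(f"Error checking stuck orders: {e}")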

    def stop_monitoring(self):
        """Stop health monitoring"""
        self.is_monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
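
The trade log can be exercised without a broker connection. The sketch below is a minimal, standalone illustration of the time-window query used by get_trade_history, run against an in-memory SQLite database. The helper names `build_demo_db` and `recent_trades`, the reduced column set, and the sample rows are assumptions made for the example and are not part of the bot itself.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # a text format SQLite's datetime() can parse


def build_demo_db():
    """Create an in-memory trades table with one recent and one old row (sample data)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE trades ("
        "timestamp TEXT NOT NULL, symbol TEXT NOT NULL, side TEXT NOT NULL, "
        "quantity REAL NOT NULL, price REAL NOT NULL)"
    )
    # SQLite's 'now' is in UTC, so the sample timestamps are generated in UTC too.
    now = datetime.now(timezone.utc)
    rows = [
        ((now - timedelta(hours=1)).strftime(TIME_FORMAT), "AAPL", "buy", 10, 190.0),
        ((now - timedelta(days=40)).strftime(TIME_FORMAT), "MSFT", "sell", 5, 410.0),
    ]
    conn.executemany("INSERT INTO trades VALUES (?, ?, ?, ?, ?)", rows)
    conn.commit()
    return conn


def recent_trades(conn, days=30):
    """Return trades newer than `days` days, newest first."""
    cursor = conn.execute(
        "SELECT symbol, side, quantity, price FROM trades "
        "WHERE datetime(timestamp) > datetime('now', ?) "
        "ORDER BY timestamp DESC",
        (f"-{int(days)} days",),  # the window is bound, never formatted into the SQL
    )
    return cursor.fetchall()


if __name__ == "__main__":
    demo_conn = build_demo_db()
    for row in recent_trades(demo_conn, days=30):
        print(row)
```

Binding the window as a parameter keeps the SQL text constant, and the int() conversion rejects any value that is not a number before it reaches the database.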
Conclusion
Building autonomous trading agents with Ollama AI combines model-driven analysis with automated execution and risk management. The production layer shown above adds persistent trade logging, email and Slack alerts, an emergency stop, and background health monitoring.
The key benefits of implementing this Ollama AI trading bot include 24/7 market monitoring, emotion-free decision making, consistent strategy execution, and automated risk controls. Your trading system applies the same rules to every trade while using AI analysis as an input to its decisions.
Start with paper trading to validate your strategy, then gradually transition to live markets with proper risk controls. The combination of Ollama AI's analytical capabilities with systematic trading approaches creates a powerful foundation for autonomous trading success.
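One way to make the paper-first rollout hard to bypass is to default every run to the paper endpoint and require an explicit opt-in for live trading. The sketch below assumes Alpaca as the broker and uses its paper and live REST base URLs. The environment variable names `TRADING_MODE` and `CONFIRM_LIVE_TRADING` and the function `resolve_base_url` are hypothetical, chosen only for this illustration.

```python
import os

# Alpaca REST base URLs for paper and live trading
PAPER_URL = "https://paper-api.alpaca.markets"
LIVE_URL = "https://api.alpaca.markets"


def resolve_base_url(env=None):
    """Pick the broker endpoint, defaulting to paper trading.

    `env` is any mapping of settings; os.environ is used when it is omitted.
    TRADING_MODE and CONFIRM_LIVE_TRADING are hypothetical variable names.
    """
    env = os.environ if env is None else env
    mode = env.get("TRADING_MODE", "paper").strip().lower()
    if mode == "paper":
        return PAPER_URL
    if mode == "live":
        # Require a second, explicit switch before real money is involved.
        if env.get("CONFIRM_LIVE_TRADING") != "yes":
            raise RuntimeError(
                "Live trading requested without CONFIRM_LIVE_TRADING=yes"
            )
        return LIVE_URL
    raise ValueError(f"Unknown TRADING_MODE: {mode!r}")


if __name__ == "__main__":
    print(resolve_base_url())
```

With this arrangement a missing or mistyped setting falls back to paper trading or fails loudly, rather than silently placing live orders.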
Remember that successful algorithmic trading requires continuous monitoring, regular strategy optimization, and strict adherence to risk management principles. Your autonomous trading agent is now ready to capture market opportunities while you focus on strategy development and portfolio growth.
Ready to deploy your Ollama AI trading bot? Begin with the backtesting framework, optimize your parameters, and start building your path to automated trading success.