Your portfolio just lost 15% because you missed a crucial market signal while sleeping. Sound familiar?
Manual trading has a hard limit: no human can monitor markets 24/7. A multi-agent trading system addresses this by deploying specialized AI agents that work together continuously.
This guide shows you how to build an automated trading system using Ollama and CrewAI. You'll create agents that analyze markets, execute trades, and manage risk without human intervention.
What you'll learn:
- Set up Ollama and CrewAI for trading applications
- Create specialized trading agents with distinct roles
- Implement agent coordination and communication
- Deploy your system for live trading
- Monitor and optimize agent performance
What is a Multi-Agent Trading System?
A multi-agent trading system uses multiple AI agents to handle different trading tasks. Each agent specializes in specific functions like market analysis, risk management, or trade execution.
Why Multi-Agent Systems Beat Single-Agent Approaches
Single trading bots often fail because they try to do everything. Multi-agent systems distribute responsibilities:
- Market Analyst Agent: Processes news and technical indicators
- Risk Manager Agent: Calculates position sizes and stop losses
- Execution Agent: Places and monitors trades
- Portfolio Manager: Balances asset allocation
Giving each agent a narrow job makes its prompts and logic easier to test, and it confines a failure to one stage instead of the whole bot.
Key Benefits of Multi-Agent Trading
Continuous Operation: Agents monitor markets 24/7 across global exchanges.
Specialized Expertise: Each agent focuses on specific trading aspects.
Fault Tolerance: If one agent fails, others continue operating.
Scalability: Add new agents for additional markets or strategies.
Prerequisites and Environment Setup
Technical Requirements
System Requirements:
- Python 3.9 or higher
- 8GB RAM minimum for 7B models; 16GB or more for the 13B model used in this guide
- Stable internet connection
- API access to trading platforms
Required Knowledge:
- Basic Python programming
- Trading concepts (support, resistance, risk management)
- Command line operations
API Keys and Accounts
You'll need accounts with these services:
- Trading Platform: Alpaca, Interactive Brokers, or paper trading account
- Market Data Provider: Alpha Vantage, Polygon, or Yahoo Finance
- News API: NewsAPI or similar service
Installing Ollama and CrewAI
Step 1: Install Ollama
Ollama provides local language model inference for your trading agents.
# Install Ollama (macOS/Linux)
curl -fsSL https://ollama.ai/install.sh | sh
# Windows users: Download from https://ollama.ai/download
Download a model. Note that llama2:13b is a general-purpose model, not one trained specifically for trading:
# Pull a general-purpose model to use for financial analysis
ollama pull llama2:13b
# Verify installation
ollama list
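To confirm from Python that the Ollama server is reachable and the model was pulled, a minimal sketch is shown here. It assumes Ollama is listening on its default local address (http://localhost:11434) and uses the /api/tags endpoint, which lists local models. The helper names parse_model_names and list_local_models are illustrative and not part of any library.

```python
import json
import urllib.request


def parse_model_names(payload: dict) -> list:
    """Extract model names from an /api/tags style response body."""
    return [model["name"] for model in payload.get("models", [])]


def list_local_models(base_url: str = "http://localhost:11434") -> list:
    """Ask a running Ollama server which models it has pulled.

    base_url is an assumption: the default address of a local install.
    Raises OSError (for example URLError) if the server is not reachable.
    """
    with urllib.request.urlopen(f"{base_url}/api/tags", timeout=5) as resp:
        return parse_model_names(json.load(resp))
```

Calling list_local_models() and checking that "llama2:13b" is in the result gives the same information as ollama list, in a form a startup script can act on.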
Step 2: Install CrewAI Framework
CrewAI orchestrates multiple AI agents working together.
# Create virtual environment
python -m venv trading_env
source trading_env/bin/activate # On Windows: trading_env\Scripts\activate
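# Install CrewAI and dependencies
pip install crewai
pip install 'crewai[tools]' # Quotes stop shells such as zsh from expanding the brackets
pip install langchain langchain-community yfinance alpaca-trade-api pandas numpy
# Used in later sections (ML model, key encryption, HTTP requests)
pip install scikit-learn cryptography requests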
Step 3: Project Structure Setup
# Create project directory
mkdir multi_agent_trading
cd multi_agent_trading
# Create folder structure
mkdir -p {agents,tools,config,data,logs}
touch config/settings.py agents/__init__.py tools/__init__.py
Your project structure should look like:
multi_agent_trading/
├── agents/
│   ├── __init__.py
│   ├── market_analyst.py
│   ├── risk_manager.py
│   └── execution_agent.py
├── tools/
│   ├── __init__.py
│   ├── market_data.py
│   └── trading_api.py
├── config/
│   └── settings.py
├── data/
├── logs/
├── main.py
└── requirements.txt
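The guide creates config/settings.py but never fills it in. One possible shape for it is sketched below: credentials are read from environment variables so they stay out of source control. The variable names (ALPACA_API_KEY, ALPACA_SECRET_KEY, PAPER_TRADING, OLLAMA_MODEL) and the Settings class are assumptions made for illustration.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    alpaca_api_key: str
    alpaca_secret_key: str
    paper_trading: bool = True
    ollama_model: str = "llama2:13b"


def load_settings(env=None) -> Settings:
    """Build Settings from environment variables (names are hypothetical)."""
    env = os.environ if env is None else env
    required = ("ALPACA_API_KEY", "ALPACA_SECRET_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return Settings(
        alpaca_api_key=env["ALPACA_API_KEY"],
        alpaca_secret_key=env["ALPACA_SECRET_KEY"],
        # Paper trading stays on unless it is explicitly set to "false"
        paper_trading=env.get("PAPER_TRADING", "true").lower() != "false",
        ollama_model=env.get("OLLAMA_MODEL", "llama2:13b"),
    )
```

Defaulting to paper trading means a missing or mistyped variable cannot switch the system to live orders by accident.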
Creating Your First Trading Agent
Market Analyst Agent Implementation
The Market Analyst Agent processes market data and generates trading signals.
# agents/market_analyst.py
from crewai import Agent, Task, Crew
# Older LangChain releases expose this class as langchain.llms.Ollama
from langchain_community.llms import Ollama
import yfinance as yf
import pandas as pd


class MarketAnalyst:
    def __init__(self):
        self.llm = Ollama(model="llama2:13b")
        self.agent = Agent(
            role="Market Analyst",
            goal="Analyze market conditions and generate trading signals",
            backstory="""You are an expert market analyst with 10 years of experience.
            You specialize in technical analysis and market sentiment evaluation.""",
            llm=self.llm,
            verbose=True
        )

    def analyze_stock(self, symbol: str, period: str = "6mo") -> dict:
        """Analyze stock and return trading recommendation"""
        # Fetch market data. The period must cover at least 50 trading days,
        # otherwise the 50-day SMA below is NaN.
        stock = yf.Ticker(symbol)
        data = stock.history(period=period)
        # Calculate technical indicators
        data['SMA_20'] = data['Close'].rolling(window=20).mean()
        data['SMA_50'] = data['Close'].rolling(window=50).mean()
        # Current price analysis
        current_price = float(data['Close'].iloc[-1])
        sma_20 = data['SMA_20'].iloc[-1]
        sma_50 = data['SMA_50'].iloc[-1]
        # Create analysis prompt
        analysis_prompt = f"""
        Analyze this stock data for {symbol}:
        Current Price: ${current_price:.2f}
        20-day SMA: ${sma_20:.2f}
        50-day SMA: ${sma_50:.2f}
        Provide a trading recommendation (BUY/SELL/HOLD) with reasoning.
        Include confidence level (1-10) and suggested position size.
        """
        # Get AI analysis (recent CrewAI versions require expected_output)
        task = Task(
            description=analysis_prompt,
            expected_output="A BUY/SELL/HOLD recommendation with reasoning, "
                            "a confidence level (1-10), and a suggested position size",
            agent=self.agent
        )
        crew = Crew(agents=[self.agent], tasks=[task])
        result = crew.kickoff()
        return {
            'symbol': symbol,
            'current_price': current_price,
            # kickoff() may return a result object rather than a string
            'analysis': str(result),
            'timestamp': pd.Timestamp.now()
        }
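Because the LLM's recommendation is free-form text, it can help to compute a deterministic signal from the same two averages and compare the two. The sketch below is an illustrative rule, not part of the analyst above: it compares the latest 20-day and 50-day averages (a level comparison, not detection of the day they cross). The function names sma and sma_trend_signal are assumptions.

```python
from typing import List, Optional


def sma(values: List[float], window: int) -> Optional[float]:
    """Simple moving average of the last `window` values, or None if too few."""
    if window <= 0 or len(values) < window:
        return None
    return sum(values[-window:]) / window


def sma_trend_signal(closes: List[float], fast: int = 20, slow: int = 50) -> str:
    """Return 'BUY', 'SELL', or 'HOLD' by comparing the fast and slow SMA."""
    fast_avg = sma(closes, fast)
    slow_avg = sma(closes, slow)
    if fast_avg is None or slow_avg is None:
        return "HOLD"  # Not enough history to compare
    if fast_avg > slow_avg:
        return "BUY"
    if fast_avg < slow_avg:
        return "SELL"
    return "HOLD"
```

If the rule and the LLM disagree, the conservative choice is to skip the trade for that cycle.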
Testing Your Market Analyst
# test_analyst.py
from agents.market_analyst import MarketAnalyst
# Initialize analyst
analyst = MarketAnalyst()
# Analyze Apple stock
result = analyst.analyze_stock('AAPL')
print(f"Analysis for {result['symbol']}: {result['analysis']}")
Building Multiple Specialized Agents
Risk Manager Agent
The Risk Manager calculates position sizes and manages portfolio risk.
# agents/risk_manager.py
from crewai import Agent, Task, Crew
# Older LangChain releases expose this class as langchain.llms.Ollama
from langchain_community.llms import Ollama


class RiskManager:
    def __init__(self, max_position_size: float = 0.1):
        self.llm = Ollama(model="llama2:13b")
        self.max_position_size = max_position_size  # 10% max per position
        self.agent = Agent(
            role="Risk Manager",
            goal="Calculate optimal position sizes and manage portfolio risk",
            backstory="""You are a conservative risk manager focused on capital preservation.
            You never risk more than necessary and always consider worst-case scenarios.""",
            llm=self.llm,
            verbose=True
        )

    def calculate_position_size(self, account_balance: float, entry_price: float,
                                stop_loss: float, confidence: int) -> dict:
        """Calculate position size based on risk parameters"""
        # Risk per trade (2% of account)
        risk_per_trade = account_balance * 0.02
        # Price risk per share
        price_risk = abs(entry_price - stop_loss)
        # Base position size
        base_shares = risk_per_trade / price_risk if price_risk > 0 else 0
        # Adjust for confidence level (1-10 scale)
        confidence_multiplier = confidence / 10.0
        adjusted_shares = int(base_shares * confidence_multiplier)
        # Apply maximum position size limit
        max_shares = int((account_balance * self.max_position_size) / entry_price)
        final_shares = min(adjusted_shares, max_shares)
        return {
            'shares': final_shares,
            'total_cost': final_shares * entry_price,
            'risk_amount': final_shares * price_risk,
            'percentage_risk': (final_shares * price_risk) / account_balance * 100
        }
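A worked example makes the interaction between the 2% risk budget and the 10% position cap easier to see. The standalone function below copies the arithmetic from calculate_position_size so it runs without CrewAI or Ollama; the name position_size and the sample numbers are illustrative assumptions.

```python
def position_size(account_balance: float, entry_price: float, stop_loss: float,
                  confidence: int, risk_fraction: float = 0.02,
                  max_position_fraction: float = 0.1) -> int:
    """Standalone copy of the sizing arithmetic, for experimenting with inputs."""
    risk_per_trade = account_balance * risk_fraction
    price_risk = abs(entry_price - stop_loss)
    base_shares = risk_per_trade / price_risk if price_risk > 0 else 0
    adjusted_shares = int(base_shares * (confidence / 10.0))
    max_shares = int((account_balance * max_position_fraction) / entry_price)
    return min(adjusted_shares, max_shares)


# $100,000 account, entry at $150, stop 5% lower at $142.50, confidence 7:
#   risk budget   = 100,000 * 0.02      = $2,000
#   risk per share = 150 - 142.50       = $7.50
#   risk-based size = 2,000 / 7.50 * 0.7 = 186 shares (rounded down)
#   position cap  = 10,000 / 150        = 66 shares (rounded down)
shares = position_size(100_000, 150.0, 142.5, confidence=7)
print(shares)  # 66
```

With a 5% stop, the 10% position cap is the binding limit here, so the amount actually at risk (66 shares times $7.50, or $495) is about 0.5% of the account rather than 2%.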
Execution Agent
The Execution Agent handles order placement and monitoring.
# agents/execution_agent.py
from crewai import Agent, Task, Crew
# Older LangChain releases expose this class as langchain.llms.Ollama
from langchain_community.llms import Ollama
import alpaca_trade_api as tradeapi


class ExecutionAgent:
    def __init__(self, api_key: str, secret_key: str, paper: bool = True):
        self.llm = Ollama(model="llama2:13b")
        # Initialize Alpaca API
        base_url = 'https://paper-api.alpaca.markets' if paper else 'https://api.alpaca.markets'
        self.api = tradeapi.REST(api_key, secret_key, base_url, api_version='v2')
        self.agent = Agent(
            role="Trade Executor",
            goal="Execute trades efficiently and monitor order status",
            backstory="""You are a precise trade executor who ensures orders are
            placed correctly and monitors execution quality.""",
            llm=self.llm,
            verbose=True
        )

    def place_order(self, symbol: str, quantity: int, side: str,
                    order_type: str = 'market') -> dict:
        """Place trading order through Alpaca API"""
        try:
            order = self.api.submit_order(
                symbol=symbol,
                qty=quantity,
                side=side,  # 'buy' or 'sell'
                type=order_type,
                time_in_force='day'
            )
            return {
                'success': True,
                'order_id': order.id,
                'symbol': symbol,
                'quantity': quantity,
                'side': side,
                'status': order.status
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'symbol': symbol,
                'quantity': quantity,
                'side': side
            }

    def get_account_info(self) -> dict:
        """Retrieve account balance and buying power"""
        account = self.api.get_account()
        return {
            'cash': float(account.cash),
            'buying_power': float(account.buying_power),
            'portfolio_value': float(account.portfolio_value)
        }
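Since place_order forwards whatever it receives to the broker, a small local check before submission can catch malformed requests early. The following is a sketch under the assumption that only whole-share 'buy' and 'sell' orders are used, as in this guide; validate_order is a hypothetical helper, not an Alpaca API call.

```python
def validate_order(symbol: str, quantity: int, side: str) -> list:
    """Return a list of problems; an empty list means the order looks sane."""
    problems = []
    if not isinstance(symbol, str) or not symbol.strip():
        problems.append("symbol must be a non-empty string")
    if not isinstance(quantity, int) or quantity <= 0:
        problems.append("quantity must be a positive whole number of shares")
    if side not in ("buy", "sell"):
        problems.append("side must be 'buy' or 'sell'")
    return problems
```

A caller could skip submission and log the returned problems whenever the list is not empty.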
Implementing Agent Coordination
CrewAI Orchestration
Create a main controller that coordinates all agents:
# main.py
from crewai import Agent, Task, Crew
from agents.market_analyst import MarketAnalyst
from agents.risk_manager import RiskManager
from agents.execution_agent import ExecutionAgent
import time


class TradingSystemOrchestrator:
    def __init__(self, api_key: str, secret_key: str, paper_trading: bool = True):
        self.analyst = MarketAnalyst()
        self.risk_manager = RiskManager()
        self.executor = ExecutionAgent(api_key, secret_key, paper_trading)
        # Watchlist of stocks to monitor
        self.watchlist = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'NVDA']

    def run_trading_cycle(self):
        """Execute one complete trading cycle"""
        # Get account information
        account_info = self.executor.get_account_info()
        print(f"Account Balance: ${account_info['cash']:.2f}")
        # Analyze each stock in watchlist
        for symbol in self.watchlist:
            print(f"\n--- Analyzing {symbol} ---")
            # Step 1: Market Analysis
            analysis = self.analyst.analyze_stock(symbol)
            analysis_text = str(analysis['analysis'])
            print(f"Analysis: {analysis_text}")
            # Step 2: Risk Assessment (example values)
            # Naive keyword check: it also matches text such as "do not buy"
            if "BUY" in analysis_text.upper():
                entry_price = analysis['current_price']
                stop_loss = entry_price * 0.95  # 5% stop loss
                confidence = 7  # Extract from analysis in real implementation
                position_calc = self.risk_manager.calculate_position_size(
                    account_balance=account_info['cash'],
                    entry_price=entry_price,
                    stop_loss=stop_loss,
                    confidence=confidence
                )
                print(f"Position calculation: {position_calc}")
                # Step 3: Execute Trade
                if position_calc['shares'] > 0:
                    order_result = self.executor.place_order(
                        symbol=symbol,
                        quantity=position_calc['shares'],
                        side='buy'
                    )
                    print(f"Order result: {order_result}")
            # Wait between analyses to avoid rate limits
            time.sleep(2)

    def start_trading_system(self, interval_minutes: int = 15):
        """Start the automated trading system"""
        print("Starting Multi-Agent Trading System...")
        while True:
            try:
                self.run_trading_cycle()
                print(f"\nWaiting {interval_minutes} minutes until next cycle...")
                time.sleep(interval_minutes * 60)
            except KeyboardInterrupt:
                print("\nTrading system stopped by user.")
                break
            except Exception as e:
                print(f"Error in trading cycle: {e}")
                time.sleep(60)  # Wait 1 minute before retrying


# Usage example
if __name__ == "__main__":
    # Replace with your Alpaca API credentials
    API_KEY = "your_alpaca_api_key"
    SECRET_KEY = "your_alpaca_secret_key"
    # Initialize trading system
    trading_system = TradingSystemOrchestrator(API_KEY, SECRET_KEY, paper_trading=True)
    # Start automated trading
    trading_system.start_trading_system(interval_minutes=15)
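The orchestrator hardcodes the confidence value and notes that it should be extracted from the analysis. One possible way to do that with regular expressions is sketched here. It assumes the model follows the prompt and writes the word "confidence" shortly before the number; parse_recommendation is a hypothetical helper, and real model output may need a stricter output format.

```python
import re
from typing import Optional


def parse_recommendation(text: str) -> dict:
    """Pull an action and a 1-10 confidence out of free-form analysis text."""
    upper = text.upper()
    actions = re.findall(r"\b(BUY|SELL|HOLD)\b", upper)
    # If the text names no action, or names conflicting ones, fall back to HOLD
    action = actions[0] if len(set(actions)) == 1 else "HOLD"
    confidence: Optional[int] = None
    match = re.search(r"CONFIDENCE[^0-9]{0,20}(\d{1,2})", upper)
    if match and 1 <= int(match.group(1)) <= 10:
        confidence = int(match.group(1))
    return {"action": action, "confidence": confidence}
```

Treating ambiguous text as HOLD means a parsing failure results in no trade rather than an unintended one.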
Agent Communication Protocol
Implement structured communication between agents:
# tools/agent_communication.py
from dataclasses import dataclass
from typing import Dict, Any
import json


@dataclass
class Message:
    sender: str
    recipient: str
    message_type: str
    content: Dict[str, Any]
    timestamp: str


class MessageBroker:
    def __init__(self):
        self.message_queue = []
        self.agent_subscriptions = {}

    def subscribe(self, agent_name: str, message_types: list):
        """Subscribe agent to specific message types"""
        self.agent_subscriptions[agent_name] = message_types

    def publish(self, message: Message):
        """Publish message to subscribed agents"""
        self.message_queue.append(message)
        # Notify subscribed agents
        for agent, subscriptions in self.agent_subscriptions.items():
            if message.message_type in subscriptions:
                self.deliver_message(agent, message)

    def deliver_message(self, agent_name: str, message: Message):
        """Deliver message to specific agent"""
        print(f"Delivering {message.message_type} to {agent_name}: {message.content}")
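The broker above only prints on delivery. For agents to act on messages, each one needs somewhere to read them from. The sketch below is one possible variant that keeps a queue per agent; InboxBroker and its receive method are assumptions made for illustration, and the Message class is repeated (with a default timestamp) so the example runs on its own.

```python
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict


@dataclass
class Message:
    sender: str
    recipient: str
    message_type: str
    content: Dict[str, Any]
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat())


class InboxBroker:
    """Hypothetical broker variant that stores pending messages per agent."""

    def __init__(self):
        self.subscriptions = defaultdict(set)  # agent name -> message types
        self.inboxes = defaultdict(deque)      # agent name -> pending messages

    def subscribe(self, agent_name: str, message_types: list):
        self.subscriptions[agent_name].update(message_types)

    def publish(self, message: Message):
        """Queue the message for every agent subscribed to its type."""
        for agent, types in self.subscriptions.items():
            if message.message_type in types:
                self.inboxes[agent].append(message)

    def receive(self, agent_name: str):
        """Pop the oldest pending message for an agent, or None if empty."""
        inbox = self.inboxes[agent_name]
        return inbox.popleft() if inbox else None


broker = InboxBroker()
broker.subscribe("risk_manager", ["signal"])
broker.publish(Message("market_analyst", "risk_manager", "signal",
                       {"symbol": "AAPL", "action": "BUY"}))
first = broker.receive("risk_manager")
```

Each agent can then poll receive at the start of its step and handle messages in the order they were published.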
Backtesting Your Multi-Agent System
Historical Data Testing
Before live trading, test your system with historical data:
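# backtesting/backtest_engine.py
import pandas as pd
import yfinance as yf
from datetime import datetime


class BacktestEngine:
    def __init__(self, initial_capital: float = 100000):
        self.initial_capital = initial_capital
        self.current_capital = initial_capital  # Uninvested cash
        self.positions = {}  # symbol -> shares held
        self.last_prices = {}  # symbol -> most recent close seen
        self.trade_history = []

    def run_backtest(self, symbol: str, start_date: str, end_date: str):
        """Run backtest on historical data"""
        # Download historical data (Ticker.history returns plain column names)
        data = yf.Ticker(symbol).history(start=start_date, end=end_date)
        # Simulate trading decisions for each day
        for date, row in data.iterrows():
            self.last_prices[symbol] = row['Close']
            # Your trading logic here
            decision = self.make_trading_decision(symbol, row, date)
            if decision['action'] == 'BUY':
                self.execute_buy(symbol, decision['shares'], row['Close'], date)
            elif decision['action'] == 'SELL':
                self.execute_sell(symbol, decision['shares'], row['Close'], date)
        return self.calculate_performance()

    def make_trading_decision(self, symbol: str, price_data: pd.Series, date: datetime):
        """Simulate your agents' decision-making process"""
        # Simplified example - replace with your agent logic
        if len(self.trade_history) == 0:  # First trade
            return {'action': 'BUY', 'shares': 100}
        # Placeholder: hold after the initial purchase
        return {'action': 'HOLD', 'shares': 0}

    def execute_buy(self, symbol: str, shares: int, price: float, date):
        """Buy shares if cash allows and record the trade"""
        cost = shares * price
        if shares <= 0 or cost > self.current_capital:
            return
        self.current_capital -= cost
        self.positions[symbol] = self.positions.get(symbol, 0) + shares
        self.trade_history.append({'date': date, 'symbol': symbol, 'action': 'BUY',
                                   'shares': shares, 'price': price})

    def execute_sell(self, symbol: str, shares: int, price: float, date):
        """Sell up to the held quantity and record the trade"""
        shares = min(shares, self.positions.get(symbol, 0))
        if shares <= 0:
            return
        self.current_capital += shares * price
        self.positions[symbol] -= shares
        self.trade_history.append({'date': date, 'symbol': symbol, 'action': 'SELL',
                                   'shares': shares, 'price': price})

    def calculate_performance(self):
        """Calculate backtest performance metrics"""
        # Value open positions at the last seen close so held shares count
        holdings = sum(qty * self.last_prices.get(sym, 0)
                       for sym, qty in self.positions.items())
        final_value = self.current_capital + holdings
        total_return = ((final_value - self.initial_capital) / self.initial_capital) * 100
        return {
            'initial_capital': self.initial_capital,
            'final_capital': final_value,
            'total_return': total_return,
            'total_trades': len(self.trade_history)
        }


# Run backtest example
backtest = BacktestEngine(initial_capital=50000)
results = backtest.run_backtest('AAPL', '2024-01-01', '2024-12-31')
print(f"Backtest Results: {results}")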
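Total return alone hides how rough the ride was. Maximum drawdown, the largest peak-to-trough decline, is a common companion metric. The sketch below computes it from a list of daily portfolio values; recording such an equity curve during the backtest is left as an assumption, and max_drawdown is an illustrative helper name.

```python
from typing import List


def max_drawdown(equity_curve: List[float]) -> float:
    """Largest peak-to-trough decline, as a fraction of the peak (0.0 to 1.0)."""
    peak = None
    worst = 0.0
    for value in equity_curve:
        if peak is None or value > peak:
            peak = value  # New high-water mark
        if peak > 0:
            worst = max(worst, (peak - value) / peak)
    return worst


# Portfolio rises to 110, falls to 99, then recovers to 120:
# the worst decline is (110 - 99) / 110, about 10%.
curve = [100.0, 110.0, 99.0, 120.0]
drawdown = max_drawdown(curve)
```

A strategy with a good total return but a deep drawdown may still be unacceptable under the position limits set by the Risk Manager.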
Deployment and Monitoring
Cloud Deployment Options
Option 1: AWS EC2 Deployment
# Launch EC2 instance with Ubuntu
# Install dependencies
sudo apt update
sudo apt install python3-pip python3-venv
# Clone your trading system
git clone your-repo-url
cd multi_agent_trading
# Setup environment
python3 -m venv trading_env
source trading_env/bin/activate
pip install -r requirements.txt
# Install Ollama on EC2
curl -fsSL https://ollama.ai/install.sh | sh
ollama pull llama2:13b
# Run with screen for persistence
screen -S trading_system
python main.py
Option 2: Docker Deployment
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y curl
# Install Ollama
RUN curl -fsSL https://ollama.ai/install.sh | sh
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy application code
COPY . .
# Pull Ollama model
RUN ollama serve & sleep 10 && ollama pull llama2:13b
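# Start the Ollama server in the background, then the trading system
# (the server started during the build step is not running in the container)
CMD ["sh", "-c", "ollama serve & sleep 10 && python main.py"]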
Monitoring and Alerting
# monitoring/system_monitor.py
import os
import smtplib
from email.mime.text import MIMEText
import logging


class TradingSystemMonitor:
    def __init__(self, email_config: dict, initial_balance: float):
        self.email_config = email_config
        # Baseline portfolio value that the drawdown alert compares against
        self.initial_balance = initial_balance
        self.setup_logging()

    def setup_logging(self):
        """Configure logging for system monitoring"""
        os.makedirs('logs', exist_ok=True)  # FileHandler fails if the directory is missing
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('logs/trading_system.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def check_system_health(self, trading_system):
        """Monitor system health and send alerts"""
        try:
            # Check account balance
            account_info = trading_system.executor.get_account_info()
            # Alert if the portfolio has dropped more than 10% below the baseline
            if account_info['portfolio_value'] < self.initial_balance * 0.9:
                self.send_alert(f"Portfolio down more than 10%: ${account_info['portfolio_value']:.2f}")
            # Log system status
            self.logger.info(f"Health check completed - Portfolio: ${account_info['portfolio_value']:.2f}")
        except Exception as e:
            self.send_alert(f"System error: {str(e)}")
            self.logger.error(f"Health check failed: {e}")

    def send_alert(self, message: str):
        """Send email alert for important events"""
        try:
            msg = MIMEText(message)
            msg['Subject'] = 'Trading System Alert'
            msg['From'] = self.email_config['from_email']
            msg['To'] = self.email_config['to_email']
            server = smtplib.SMTP('smtp.gmail.com', 587)
            server.starttls()
            server.login(self.email_config['username'], self.email_config['password'])
            server.send_message(msg)
            server.quit()
            self.logger.info(f"Alert sent: {message}")
        except Exception as e:
            self.logger.error(f"Failed to send alert: {e}")
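If the health check runs every few minutes, a portfolio that stays below the threshold would trigger an email on every run. A cooldown per alert type is one way to avoid that. The following is a minimal sketch; AlertThrottle is a hypothetical helper, and the clock is injectable only so the behavior can be tested without waiting.

```python
import time


class AlertThrottle:
    """Allow one alert per key within each cooldown window."""

    def __init__(self, cooldown_seconds: float = 3600, clock=time.monotonic):
        self.cooldown = cooldown_seconds
        self.clock = clock
        self._last_sent = {}  # alert key -> time the last alert was allowed

    def should_send(self, key: str) -> bool:
        now = self.clock()
        last = self._last_sent.get(key)
        if last is not None and now - last < self.cooldown:
            return False  # Still inside the cooldown window
        self._last_sent[key] = now
        return True
```

A monitor could call should_send("drawdown") before sending the drawdown email and skip the email when it returns False.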
Advanced Features and Optimization
Machine Learning Integration
Enhance your agents with machine learning capabilities:
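# ml_models/price_predictor.py
from sklearn.ensemble import RandomForestRegressor
import pandas as pd
import numpy as np
import yfinance as yf

FEATURE_COLS = ['SMA_5', 'SMA_20', 'RSI', 'price_change', 'volume_change']


class PricePredictor:
    def __init__(self):
        self.model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.is_trained = False

    def calculate_rsi(self, close: pd.Series, period: int = 14) -> pd.Series:
        """Relative Strength Index from rolling averages of gains and losses"""
        delta = close.diff()
        avg_gain = delta.clip(lower=0).rolling(period).mean()
        avg_loss = (-delta.clip(upper=0)).rolling(period).mean()
        return 100 - 100 / (1 + avg_gain / avg_loss)

    def build_feature_frame(self, data: pd.DataFrame) -> pd.DataFrame:
        """Compute feature columns on a copy, keeping the original index"""
        df = data.copy()
        # Technical indicators
        df['SMA_5'] = df['Close'].rolling(5).mean()
        df['SMA_20'] = df['Close'].rolling(20).mean()
        df['RSI'] = self.calculate_rsi(df['Close'])
        # Price changes
        df['price_change'] = df['Close'].pct_change()
        df['volume_change'] = df['Volume'].pct_change()
        # Treat infinite ratios (for example after zero volume) as missing
        return df[FEATURE_COLS].replace([np.inf, -np.inf], np.nan)

    def prepare_features(self, data: pd.DataFrame) -> np.ndarray:
        """Create features for ML model"""
        return self.build_feature_frame(data).dropna().values

    def train(self, symbol: str, period: str = "2y"):
        """Train model on historical data"""
        stock = yf.Ticker(symbol)
        data = stock.history(period=period)
        # Build features and the next-day target on the same index, then drop
        # incomplete rows together so each feature row keeps its own target
        frame = self.build_feature_frame(data)
        frame['target'] = data['Close'].shift(-1)  # Next day's price
        frame = frame.dropna()
        X, y = frame[FEATURE_COLS].values, frame['target'].values
        # Train model
        self.model.fit(X, y)
        self.is_trained = True
        # In-sample R^2: optimistic, not a measure of out-of-sample accuracy
        return self.model.score(X, y)

    def predict_price(self, current_data: pd.DataFrame) -> float:
        """Predict the next closing price"""
        if not self.is_trained:
            raise ValueError("Model must be trained first")
        features = self.prepare_features(current_data)
        return self.model.predict(features[-1].reshape(1, -1))[0]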
Performance Optimization Tips
1. Reduce API Calls
# Cache market data to reduce API calls
import functools
import time
import yfinance as yf


@functools.lru_cache(maxsize=128)
def get_cached_market_data(symbol: str, timestamp: int):
    """Cache market data per 5-minute bucket; timestamp only serves as the cache key"""
    return yf.Ticker(symbol).history(period="1d")


# Usage
current_time = int(time.time() // 300) * 300  # 5-minute intervals
data = get_cached_market_data('AAPL', current_time)
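The caching trick works because lru_cache keys on all arguments: rounding the time down to a 5-minute bucket makes every call inside the same bucket hit the cache. The sketch below demonstrates this with a stand-in fetch function that counts real fetches instead of calling a market data API; fetch, time_bucket, and the sample times are illustrative assumptions.

```python
import functools

calls = []  # Records every real (uncached) fetch


@functools.lru_cache(maxsize=128)
def fetch(symbol: str, bucket: int) -> str:
    """Stand-in for a market data request; runs only on a cache miss."""
    calls.append((symbol, bucket))
    return f"{symbol}@{bucket}"


def time_bucket(now: float, ttl_seconds: int = 300) -> int:
    """Round a Unix time down to the start of its TTL window."""
    return int(now // ttl_seconds) * ttl_seconds


fetch("AAPL", time_bucket(1000.0))  # Bucket 900: real fetch
fetch("AAPL", time_bucket(1100.0))  # Bucket 900 again: served from cache
fetch("AAPL", time_bucket(1200.0))  # Bucket 1200: real fetch
print(len(calls))  # 2
```

Note that entries expire at bucket boundaries, not 5 minutes after they were fetched, so data fetched late in a bucket is refreshed sooner than data fetched at its start.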
2. Parallel Agent Processing
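import asyncio


async def analyze_symbol_async(analyst, symbol: str):
    """Run the blocking analyze_stock call in a worker thread"""
    return await asyncio.to_thread(analyst.analyze_stock, symbol)


async def analyze_symbols_parallel(analyst, symbols: list):
    """Analyze multiple symbols concurrently"""
    tasks = [asyncio.create_task(analyze_symbol_async(analyst, symbol))
             for symbol in symbols]
    return await asyncio.gather(*tasks)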
Troubleshooting Common Issues
Connection Problems
Ollama Connection Issues:
# Check if Ollama is running
ollama list
# Restart Ollama service
sudo systemctl restart ollama
# Check Ollama logs
sudo journalctl -u ollama -f
API Rate Limits:
# Implement exponential backoff
import time
import random


def api_call_with_retry(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            time.sleep(wait_time)
Memory Management
Ollama Memory Issues:
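# Unload the Ollama model from memory periodically
import subprocess


def clear_ollama_memory(model: str = "llama2:13b"):
    """Unload an Ollama model from memory; the server keeps running"""
    # 'ollama stop' takes the name of the model to unload. The model is
    # loaded again automatically on the next request.
    subprocess.run(['ollama', 'stop', model], capture_output=True)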
Security Best Practices
API Key Protection
# config/secure_config.py
import os
from cryptography.fernet import Fernet


class SecureConfig:
    def __init__(self):
        key = os.environ.get('ENCRYPTION_KEY')
        if not key:
            # Do not generate a fresh key per run or print it: data encrypted
            # with a lost key cannot be decrypted, and a logged key is exposed
            raise RuntimeError(
                "Set ENCRYPTION_KEY to a key created once with Fernet.generate_key()"
            )
        self.key = key.encode()
        self.cipher = Fernet(self.key)

    def encrypt_api_key(self, api_key: str) -> str:
        """Encrypt API key for storage"""
        return self.cipher.encrypt(api_key.encode()).decode()

    def decrypt_api_key(self, encrypted_key: str) -> str:
        """Decrypt API key for use"""
        return self.cipher.decrypt(encrypted_key.encode()).decode()


# Usage (requires ENCRYPTION_KEY in the environment)
config = SecureConfig()
encrypted_key = config.encrypt_api_key("your_api_key")
Network Security
# Enforce TLS certificate verification and a timeout on API requests
import requests


def secure_api_request(url: str, headers: dict = None):
    """Make API request over a verified TLS connection"""
    # Set request headers
    secure_headers = {
        'User-Agent': 'TradingBot/1.0',
        'Accept': 'application/json',
        'Connection': 'close'
    }
    if headers:
        secure_headers.update(headers)
    # Use timeout and SSL verification; the context manager closes the
    # session even if the request raises
    with requests.Session() as session:
        return session.get(
            url,
            headers=secure_headers,
            timeout=30,
            verify=True
        )
Conclusion
You've successfully built a multi-agent trading system using Ollama and CrewAI. This automated trading platform combines specialized AI agents to analyze markets, manage risk, and execute trades without human intervention.
Key achievements:
- Created specialized trading agents with distinct roles
- Implemented agent coordination and communication
- Built backtesting and monitoring capabilities
- Deployed a secure, scalable trading system
Next steps:
- Add more sophisticated trading strategies
- Integrate additional data sources (news, social sentiment)
- Implement portfolio rebalancing agents
- Explore multi-asset trading capabilities
Your multi-agent trading system provides the foundation for automated trading success. Start with paper trading to validate performance before deploying real capital.
Remember: successful algorithmic trading requires continuous monitoring, regular strategy updates, and strict risk management. Always test thoroughly before risking real money.
Ready to deploy your trading system? Start with small position sizes and gradually scale up as you gain confidence in your agents' performance.
Disclaimer: This tutorial is for educational purposes only. Trading involves substantial risk of loss. Never trade with money you cannot afford to lose.