How to Create a Multi-Agent Trading System Using Ollama and CrewAI: Complete 2025 Guide

Build automated trading systems with AI agents using Ollama and CrewAI. Step-by-step tutorial with code examples and deployment tips.

Your portfolio just lost 15% because you missed a crucial market signal while sleeping. Sound familiar?

Manual trading has a hard limit: no human can monitor markets around the clock. A multi-agent trading system addresses this by deploying specialized AI agents that work together continuously.

This guide shows you how to build an automated trading system using Ollama and CrewAI. You'll create agents that analyze markets, execute trades, and manage risk without human intervention.

What you'll learn:

  • Set up Ollama and CrewAI for trading applications
  • Create specialized trading agents with distinct roles
  • Implement agent coordination and communication
  • Deploy your system for live trading
  • Monitor and optimize agent performance

What is a Multi-Agent Trading System?

A multi-agent trading system uses multiple AI agents to handle different trading tasks. Each agent specializes in specific functions like market analysis, risk management, or trade execution.

Why Multi-Agent Systems Beat Single-Agent Approaches

Single trading bots often fail because they try to do everything. Multi-agent systems distribute responsibilities:

  • Market Analyst Agent: Processes news and technical indicators
  • Risk Manager Agent: Calculates position sizes and stop losses
  • Execution Agent: Places and monitors trades
  • Portfolio Manager: Balances asset allocation

This specialization keeps each agent's task narrow, which can improve decision quality and makes failures easier to isolate.

Key Benefits of Multi-Agent Trading

Continuous Operation: Agents monitor markets 24/7 across global exchanges.

Specialized Expertise: Each agent focuses on specific trading aspects.

Fault Tolerance: If one agent fails, others continue operating.

Scalability: Add new agents for additional markets or strategies.

Prerequisites and Environment Setup

Technical Requirements

System Requirements:

  • Python 3.10 or higher (required by current CrewAI releases)
  • 16GB RAM or more for the 13B model used in this guide (8GB is enough only for 7B-class models)
  • Stable internet connection
  • API access to trading platforms

Required Knowledge:

  • Basic Python programming
  • Trading concepts (support, resistance, risk management)
  • Command line operations

API Keys and Accounts

You'll need accounts with these services:

  1. Trading Platform: Alpaca, Interactive Brokers, or paper trading account
  2. Market Data Provider: Alpha Vantage, Polygon, or Yahoo Finance
  3. News API: NewsAPI or similar service
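
Once the accounts exist, the keys have to reach your code without being committed to the repository. A minimal sketch, assuming the keys are exported as environment variables; the variable names ALPACA_API_KEY and ALPACA_SECRET_KEY are hypothetical and not mandated by any of the services above:

```python
import os

# Hypothetical variable names; use whatever names your deployment defines.
REQUIRED_VARS = ("ALPACA_API_KEY", "ALPACA_SECRET_KEY")


def load_credentials(env=None) -> dict:
    """Return the required credentials, failing fast if any are missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Calling load_credentials() at startup surfaces a missing key immediately instead of at the first order attempt.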

Installing Ollama and CrewAI

Step 1: Install Ollama

Ollama provides local language model inference for your trading agents.

# Install Ollama (macOS/Linux)
curl -fsSL https://ollama.ai/install.sh | sh

# Windows users: Download from https://ollama.ai/download

Download a model. Note that llama2:13b is a general-purpose model, not one tuned for finance:

# Pull the general-purpose model used throughout this guide
ollama pull llama2:13b

# Verify installation
ollama list
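
The same check can be done from Python before any agent starts. The sketch below assumes Ollama is listening on its default local address (http://localhost:11434) and uses its /api/tags endpoint, which lists installed models; the helper names are illustrative only:

```python
import json
import urllib.request

OLLAMA_URL = "http://localhost:11434"  # Ollama's default local address


def fetch_tags(base_url: str = OLLAMA_URL, timeout: float = 5.0) -> dict:
    """Fetch the list of installed models from a running Ollama server."""
    with urllib.request.urlopen(f"{base_url}/api/tags", timeout=timeout) as resp:
        return json.load(resp)


def installed_models(payload: dict) -> list:
    """Extract model names from an /api/tags response payload."""
    return [model.get("name", "") for model in payload.get("models", [])]


def model_available(payload: dict, model: str) -> bool:
    """Return True if `model` appears in the /api/tags payload."""
    return model in installed_models(payload)
```

With the server running, model_available(fetch_tags(), "llama2:13b") reports whether the pull succeeded; fetch_tags() raises an error from urllib if the server is not reachable.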

Step 2: Install CrewAI Framework

CrewAI orchestrates multiple AI agents working together.

# Create virtual environment
python -m venv trading_env
source trading_env/bin/activate  # On Windows: trading_env\Scripts\activate

# Install CrewAI (with the tools extra) and dependencies
# Quote the extra so shells such as zsh don't treat the brackets as a glob
pip install "crewai[tools]"
pip install yfinance alpaca-trade-api pandas numpy

Step 3: Project Structure Setup

# Create project directory
mkdir multi_agent_trading
cd multi_agent_trading

# Create folder structure
mkdir -p {agents,tools,config,data,logs}
touch config/settings.py agents/__init__.py tools/__init__.py

Your project structure should look like:

multi_agent_trading/
├── agents/
│   ├── __init__.py
│   ├── market_analyst.py
│   ├── risk_manager.py
│   └── execution_agent.py
├── tools/
│   ├── market_data.py
│   └── trading_api.py
├── config/
│   └── settings.py
├── main.py
└── requirements.txt

Creating Your First Trading Agent

Market Analyst Agent Implementation

The Market Analyst Agent processes market data and generates trading signals.

# agents/market_analyst.py
from crewai import Agent, Task, Crew
from langchain.llms import Ollama
import yfinance as yf
import pandas as pd

class MarketAnalyst:
    def __init__(self):
        self.llm = Ollama(model="llama2:13b")
        self.agent = Agent(
            role="Market Analyst",
            goal="Analyze market conditions and generate trading signals",
            backstory="""You are an expert market analyst with 10 years of experience.
            You specialize in technical analysis and market sentiment evaluation.""",
            llm=self.llm,
            verbose=True
        )
    
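    def analyze_stock(self, symbol: str, period: str = "6mo") -> dict:
        """Analyze stock and return trading recommendation.

        The period must cover at least 50 trading days; with less data
        the 50-day SMA computed below is NaN.
        """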
        
        # Fetch market data
        stock = yf.Ticker(symbol)
        data = stock.history(period=period)
        
        # Calculate technical indicators
        data['SMA_20'] = data['Close'].rolling(window=20).mean()
        data['SMA_50'] = data['Close'].rolling(window=50).mean()
        
        # Current price analysis
        current_price = data['Close'].iloc[-1]
        sma_20 = data['SMA_20'].iloc[-1]
        sma_50 = data['SMA_50'].iloc[-1]
        
        # Create analysis prompt
        analysis_prompt = f"""
        Analyze this stock data for {symbol}:
        Current Price: ${current_price:.2f}
        20-day SMA: ${sma_20:.2f}
        50-day SMA: ${sma_50:.2f}
        
        Provide a trading recommendation (BUY/SELL/HOLD) with reasoning.
        Include confidence level (1-10) and suggested position size.
        """
        
        # Get AI analysis
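        # expected_output is a required Task field in recent CrewAI releases
        task = Task(
            description=analysis_prompt,
            expected_output="A BUY, SELL, or HOLD recommendation with reasoning, "
                            "a confidence level (1-10), and a suggested position size",
            agent=self.agent
        )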
        
        crew = Crew(agents=[self.agent], tasks=[task])
        result = crew.kickoff()
        
        return {
            'symbol': symbol,
            'current_price': current_price,
            'analysis': str(result),  # kickoff() may return an output object, not a str
            'timestamp': pd.Timestamp.now()
        }

Testing Your Market Analyst

# test_analyst.py
from agents.market_analyst import MarketAnalyst

# Initialize analyst
analyst = MarketAnalyst()

# Analyze Apple stock
result = analyst.analyze_stock('AAPL')
print(f"Analysis for {result['symbol']}: {result['analysis']}")
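
A rolling mean with a 50-row window stays NaN until 50 rows are available, and one month of daily data holds only about 21 trading days. The sketch below shows the same moving-average comparison in plain Python with an explicit insufficient-data result; the function names and the signal labels are illustrative, not part of the analyst class:

```python
from typing import List, Optional


def sma(values: List[float], window: int) -> Optional[float]:
    """Mean of the last `window` values, or None if there are too few."""
    if window <= 0 or len(values) < window:
        return None
    return sum(values[-window:]) / window


def trend_signal(closes: List[float], fast: int = 20, slow: int = 50) -> str:
    """Compare a fast and a slow moving average of closing prices."""
    fast_avg, slow_avg = sma(closes, fast), sma(closes, slow)
    if fast_avg is None or slow_avg is None:
        return "INSUFFICIENT_DATA"
    if fast_avg > slow_avg:
        return "UPTREND"
    if fast_avg < slow_avg:
        return "DOWNTREND"
    return "FLAT"
```

A deterministic pre-check like this can be run before the prompt is built, so the model is never asked to reason about a missing indicator.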

Building Multiple Specialized Agents

Risk Manager Agent

The Risk Manager calculates position sizes and manages portfolio risk.

# agents/risk_manager.py
from crewai import Agent, Task, Crew
from langchain.llms import Ollama

class RiskManager:
    def __init__(self, max_position_size: float = 0.1):
        self.llm = Ollama(model="llama2:13b")
        self.max_position_size = max_position_size  # 10% max per position
        
        self.agent = Agent(
            role="Risk Manager",
            goal="Calculate optimal position sizes and manage portfolio risk",
            backstory="""You are a conservative risk manager focused on capital preservation.
            You never risk more than necessary and always consider worst-case scenarios.""",
            llm=self.llm,
            verbose=True
        )
    
    def calculate_position_size(self, account_balance: float, entry_price: float, 
                              stop_loss: float, confidence: int) -> dict:
        """Calculate position size based on risk parameters"""
        
        # Risk per trade: 2% of account
        risk_per_trade = account_balance * 0.02
        
        # Price risk per share
        price_risk = abs(entry_price - stop_loss)
        
        # Base position size
        base_shares = risk_per_trade / price_risk if price_risk > 0 else 0
        
        # Adjust for confidence level
        confidence_multiplier = confidence / 10.0
        adjusted_shares = int(base_shares * confidence_multiplier)
        
        # Apply maximum position size limit
        max_shares = int((account_balance * self.max_position_size) / entry_price)
        final_shares = min(adjusted_shares, max_shares)
        
        return {
            'shares': final_shares,
            'total_cost': final_shares * entry_price,
            'risk_amount': final_shares * price_risk,
            'percentage_risk': (final_shares * price_risk) / account_balance * 100
        }
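
To see how the risk budget, the confidence scaling, and the position cap interact, the arithmetic can be traced with concrete numbers. This is a standalone restatement of the formula above for illustration; the account size and prices are made-up inputs:

```python
def position_size(account_balance: float, entry_price: float, stop_loss: float,
                  confidence: int, max_position_size: float = 0.1,
                  risk_fraction: float = 0.02) -> dict:
    """Standalone version of the sizing arithmetic, for tracing by hand."""
    risk_per_trade = account_balance * risk_fraction       # dollars at risk
    price_risk = abs(entry_price - stop_loss)               # loss per share at the stop
    base_shares = risk_per_trade / price_risk if price_risk > 0 else 0
    adjusted_shares = int(base_shares * confidence / 10.0)  # scale by confidence
    max_shares = int(account_balance * max_position_size / entry_price)
    shares = min(adjusted_shares, max_shares)               # position cap wins if lower
    return {
        "base_shares": base_shares,
        "adjusted_shares": adjusted_shares,
        "max_shares": max_shares,
        "shares": shares,
        "risk_amount": shares * price_risk,
    }


# $50,000 account, entry at $100, stop at $95, confidence 7
example = position_size(50_000, 100.0, 95.0, 7)
```

Here the 2% budget is $1,000 and the per-share risk is $5, so the base size is 200 shares and confidence 7 scales it to 140. The 10% position cap allows only 50 shares ($5,000 / $100), so 50 is the final size and the actual risk is $250, or 0.5% of the account. With these defaults the cap, not the risk budget, is the binding constraint whenever the stop is closer than 20% to the entry price.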

Execution Agent

The Execution Agent handles order placement and monitoring.

# agents/execution_agent.py
from crewai import Agent, Task, Crew
from langchain.llms import Ollama
import alpaca_trade_api as tradeapi

class ExecutionAgent:
    def __init__(self, api_key: str, secret_key: str, paper: bool = True):
        self.llm = Ollama(model="llama2:13b")
        
        # Initialize Alpaca API
        base_url = 'https://paper-api.alpaca.markets' if paper else 'https://api.alpaca.markets'
        self.api = tradeapi.REST(api_key, secret_key, base_url, api_version='v2')
        
        self.agent = Agent(
            role="Trade Executor",
            goal="Execute trades efficiently and monitor order status",
            backstory="""You are a precise trade executor who ensures orders are 
            placed correctly and monitors execution quality.""",
            llm=self.llm,
            verbose=True
        )
    
    def place_order(self, symbol: str, quantity: int, side: str, 
                   order_type: str = 'market') -> dict:
        """Place trading order through Alpaca API"""
        
        try:
            order = self.api.submit_order(
                symbol=symbol,
                qty=quantity,
                side=side,  # 'buy' or 'sell'
                type=order_type,
                time_in_force='day'
            )
            
            return {
                'success': True,
                'order_id': order.id,
                'symbol': symbol,
                'quantity': quantity,
                'side': side,
                'status': order.status
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'symbol': symbol,
                'quantity': quantity,
                'side': side
            }
    
    def get_account_info(self) -> dict:
        """Retrieve account balance and buying power"""
        account = self.api.get_account()
        
        return {
            'cash': float(account.cash),
            'buying_power': float(account.buying_power),
            'portfolio_value': float(account.portfolio_value)
        }
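
Because place_order forwards its arguments straight to the broker, it can help to reject malformed requests locally first. The following is a sketch of such a pre-submission check; validate_order is a hypothetical helper, not part of the Alpaca client:

```python
VALID_SIDES = {"buy", "sell"}


def validate_order(symbol: str, quantity: int, side: str) -> list:
    """Return a list of problems; an empty list means the order looks well-formed."""
    problems = []
    if not isinstance(symbol, str) or not symbol.strip():
        problems.append("symbol is empty")
    # bool is a subclass of int, so exclude it explicitly
    if isinstance(quantity, bool) or not isinstance(quantity, int) or quantity <= 0:
        problems.append("quantity must be a positive integer")
    if side not in VALID_SIDES:
        problems.append(f"side must be one of {sorted(VALID_SIDES)}")
    return problems
```

An orchestrator could call this before place_order and skip the API call whenever the returned list is non-empty.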

Implementing Agent Coordination

CrewAI Orchestration

Create a main controller that coordinates all agents:

# main.py
from crewai import Agent, Task, Crew
from agents.market_analyst import MarketAnalyst
from agents.risk_manager import RiskManager
from agents.execution_agent import ExecutionAgent
import time

class TradingSystemOrchestrator:
    def __init__(self, api_key: str, secret_key: str, paper_trading: bool = True):
        self.analyst = MarketAnalyst()
        self.risk_manager = RiskManager()
        self.executor = ExecutionAgent(api_key, secret_key, paper_trading)
        
        # Watchlist of stocks to monitor
        self.watchlist = ['AAPL', 'GOOGL', 'MSFT', 'TSLA', 'NVDA']
        
    def run_trading_cycle(self):
        """Execute one complete trading cycle"""
        
        # Get account information
        account_info = self.executor.get_account_info()
        print(f"Account Balance: ${account_info['cash']:.2f}")
        
        # Analyze each stock in watchlist
        for symbol in self.watchlist:
            print(f"\n--- Analyzing {symbol} ---")
            
            # Step 1: Market Analysis
            analysis = self.analyst.analyze_stock(symbol)
            print(f"Analysis: {analysis['analysis']}")
            
            # Step 2: Risk Assessment (example values)
            if "BUY" in analysis['analysis'].upper():
                entry_price = analysis['current_price']
                stop_loss = entry_price * 0.95  # 5% stop loss
                confidence = 7  # Extract from analysis in real implementation
                
                position_calc = self.risk_manager.calculate_position_size(
                    account_balance=account_info['cash'],
                    entry_price=entry_price,
                    stop_loss=stop_loss,
                    confidence=confidence
                )
                
                print(f"Position calculation: {position_calc}")
                
                # Step 3: Execute Trade
                if position_calc['shares'] > 0:
                    order_result = self.executor.place_order(
                        symbol=symbol,
                        quantity=position_calc['shares'],
                        side='buy'
                    )
                    print(f"Order result: {order_result}")
            
            # Wait between analyses to avoid rate limits
            time.sleep(2)
    
    def start_trading_system(self, interval_minutes: int = 15):
        """Start the automated trading system"""
        print("Starting Multi-Agent Trading System...")
        
        while True:
            try:
                self.run_trading_cycle()
                print(f"\nWaiting {interval_minutes} minutes until next cycle...")
                time.sleep(interval_minutes * 60)
                
            except KeyboardInterrupt:
                print("\nTrading system stopped by user.")
                break
            except Exception as e:
                print(f"Error in trading cycle: {e}")
                time.sleep(60)  # Wait 1 minute before retrying

# Usage example
if __name__ == "__main__":
    # Replace with your Alpaca API credentials
    API_KEY = "your_alpaca_api_key"
    SECRET_KEY = "your_alpaca_secret_key"
    
    # Initialize trading system
    trading_system = TradingSystemOrchestrator(API_KEY, SECRET_KEY, paper_trading=True)
    
    # Start automated trading
    trading_system.start_trading_system(interval_minutes=15)
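
The orchestrator hard-codes the confidence value and treats any occurrence of "BUY" in the analysis as a buy signal, so a sentence like "do not buy" would trigger a trade. One possible way to tighten this is a small text parser, sketched below. It is a heuristic that assumes the model states a single action and writes the word "confidence" shortly before the number; asking the model for structured output would be more robust:

```python
import re

_ACTION_RE = re.compile(r"\b(BUY|SELL|HOLD)\b", re.IGNORECASE)
_CONFIDENCE_RE = re.compile(r"confidence[^0-9]{0,20}(\d{1,2})", re.IGNORECASE)


def parse_recommendation(text: str) -> dict:
    """Extract an unambiguous action and a 1-10 confidence from free text."""
    actions = {match.upper() for match in _ACTION_RE.findall(text)}
    # Only accept the action when exactly one distinct action word appears
    action = actions.pop() if len(actions) == 1 else None

    confidence = None
    match = _CONFIDENCE_RE.search(text)
    if match:
        value = int(match.group(1))
        if 1 <= value <= 10:
            confidence = value
    return {"action": action, "confidence": confidence}
```

When either field comes back as None, the safest reaction is to skip the trade for that cycle rather than fall back to a default.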

Agent Communication Protocol

Implement structured communication between agents:

# tools/agent_communication.py
from dataclasses import dataclass
from typing import Dict, Any
import json

@dataclass
class Message:
    sender: str
    recipient: str
    message_type: str
    content: Dict[str, Any]
    timestamp: str

class MessageBroker:
    def __init__(self):
        self.message_queue = []
        self.agent_subscriptions = {}
    
    def subscribe(self, agent_name: str, message_types: list):
        """Subscribe agent to specific message types"""
        self.agent_subscriptions[agent_name] = message_types
    
    def publish(self, message: Message):
        """Publish message to subscribed agents"""
        self.message_queue.append(message)
        
        # Notify subscribed agents
        for agent, subscriptions in self.agent_subscriptions.items():
            if message.message_type in subscriptions:
                self.deliver_message(agent, message)
    
    def deliver_message(self, agent_name: str, message: Message):
        """Deliver message to specific agent"""
        print(f"Delivering {message.message_type} to {agent_name}: {message.content}")
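
The broker above only prints each delivery. For agents to act on messages, something has to hold them until they are read. A minimal sketch with one in-memory inbox per agent follows; InboxBroker and drain are illustrative names, and unlike the subscribe method above, this version adds to an agent's subscriptions instead of replacing them:

```python
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List


@dataclass
class Message:
    sender: str
    recipient: str
    message_type: str
    content: Dict[str, Any]
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


class InboxBroker:
    def __init__(self):
        self.subscriptions = defaultdict(set)  # agent name -> message types
        self.inboxes = defaultdict(deque)      # agent name -> pending messages

    def subscribe(self, agent_name: str, message_types: List[str]) -> None:
        """Add message types to an agent's subscriptions."""
        self.subscriptions[agent_name].update(message_types)

    def publish(self, message: Message) -> int:
        """Queue the message for every subscribed agent; return the count."""
        delivered = 0
        for agent, types in self.subscriptions.items():
            if message.message_type in types:
                self.inboxes[agent].append(message)
                delivered += 1
        return delivered

    def drain(self, agent_name: str) -> List[Message]:
        """Return and clear all pending messages for one agent, oldest first."""
        inbox = self.inboxes[agent_name]
        messages = list(inbox)
        inbox.clear()
        return messages
```

Everything lives in process memory, so queued messages are lost on restart; a persistent queue would be needed if that matters.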

Backtesting Your Multi-Agent System

Historical Data Testing

Before live trading, test your system with historical data:

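# backtesting/backtest_engine.py
import pandas as pd
import yfinance as yf
from datetime import datetime

class BacktestEngine:
    def __init__(self, initial_capital: float = 100000):
        self.initial_capital = initial_capital
        self.current_capital = initial_capital  # Cash on hand
        self.positions = {}       # symbol -> shares held
        self.last_prices = {}     # symbol -> most recent close seen
        self.trade_history = []

    def run_backtest(self, symbol: str, start_date: str, end_date: str):
        """Run backtest on historical data"""

        # Ticker.history returns flat column names for a single symbol
        data = yf.Ticker(symbol).history(start=start_date, end=end_date)

        # Simulate trading decisions for each day
        for date, row in data.iterrows():
            self.last_prices[symbol] = row['Close']

            # Your trading logic here
            decision = self.make_trading_decision(symbol, row, date)

            if decision['action'] == 'BUY':
                self.execute_buy(symbol, decision['shares'], row['Close'], date)
            elif decision['action'] == 'SELL':
                self.execute_sell(symbol, decision['shares'], row['Close'], date)

        return self.calculate_performance()

    def make_trading_decision(self, symbol: str, price_data: pd.Series, date: datetime):
        """Simulate your agents' decision-making process"""
        # Simplified example - replace with your agent logic
        if len(self.trade_history) == 0:  # First trade
            return {'action': 'BUY', 'shares': 100}

        # Placeholder: hold after the initial purchase
        return {'action': 'HOLD', 'shares': 0}

    def execute_buy(self, symbol: str, shares: int, price: float, date):
        """Buy shares at the given price if there is enough cash"""
        cost = shares * price
        if shares <= 0 or cost > self.current_capital:
            return
        self.current_capital -= cost
        self.positions[symbol] = self.positions.get(symbol, 0) + shares
        self.trade_history.append({'date': date, 'symbol': symbol,
                                   'action': 'BUY', 'shares': shares, 'price': price})

    def execute_sell(self, symbol: str, shares: int, price: float, date):
        """Sell up to the number of shares currently held"""
        shares = min(shares, self.positions.get(symbol, 0))
        if shares <= 0:
            return
        self.current_capital += shares * price
        self.positions[symbol] -= shares
        self.trade_history.append({'date': date, 'symbol': symbol,
                                   'action': 'SELL', 'shares': shares, 'price': price})

    def calculate_performance(self):
        """Calculate backtest performance metrics"""
        # Value open positions at the last close so unrealized gains count
        holdings = sum(shares * self.last_prices.get(sym, 0)
                       for sym, shares in self.positions.items())
        final_capital = self.current_capital + holdings
        total_return = ((final_capital - self.initial_capital) / self.initial_capital) * 100

        return {
            'initial_capital': self.initial_capital,
            'final_capital': final_capital,
            'total_return': total_return,
            'total_trades': len(self.trade_history)
        }

# Run backtest example
backtest = BacktestEngine(initial_capital=50000)
results = backtest.run_backtest('AAPL', '2024-01-01', '2024-12-31')
print(f"Backtest Results: {results}")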
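
Total return says nothing about how rough the ride was. Maximum drawdown, the largest peak-to-trough decline, is a common companion metric. A minimal sketch follows, assuming you record the account value once per simulated day in a list; that equity curve is not something the engine above stores:

```python
from typing import Iterable


def max_drawdown(equity_curve: Iterable[float]) -> float:
    """Largest peak-to-trough decline, as a percentage of the peak."""
    peak = None
    worst = 0.0
    for value in equity_curve:
        if peak is None or value > peak:
            peak = value  # new high-water mark
        if peak > 0:
            worst = max(worst, (peak - value) / peak)
    return worst * 100


# Peak of 120 followed by a low of 80: a drawdown of one third
drawdown = max_drawdown([100, 120, 90, 110, 80])
```

Two strategies with the same total return can have very different drawdowns, which matters when sizing positions for live trading.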

Deployment and Monitoring

Cloud Deployment Options

Option 1: AWS EC2 Deployment

# Launch EC2 instance with Ubuntu
# Install dependencies
sudo apt update
sudo apt install python3-pip python3-venv

# Clone your trading system
git clone your-repo-url
cd multi_agent_trading

# Setup environment
python3 -m venv trading_env
source trading_env/bin/activate
pip install -r requirements.txt

# Install Ollama on EC2
curl -fsSL https://ollama.ai/install.sh | sh
ollama pull llama2:13b

# Run with screen for persistence
screen -S trading_system
python main.py

Option 2: Docker Deployment

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y curl

# Install Ollama
RUN curl -fsSL https://ollama.ai/install.sh | sh

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy application code
COPY . .

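# Pull Ollama model at build time (the server must be running for the pull)
RUN ollama serve & sleep 10 && ollama pull llama2:13b

# Start the Ollama server, then the trading system
# (a background server started in a RUN step does not persist into the container)
CMD ["sh", "-c", "ollama serve & sleep 10 && python main.py"]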

Monitoring and Alerting

# monitoring/system_monitor.py
import logging
import os
import smtplib
from email.mime.text import MIMEText

class TradingSystemMonitor:
    def __init__(self, email_config: dict, initial_balance: float):
        self.email_config = email_config
        self.initial_balance = initial_balance  # Baseline for the drawdown alert
        self.setup_logging()

    def setup_logging(self):
        """Configure logging for system monitoring"""
        os.makedirs('logs', exist_ok=True)  # FileHandler needs the directory to exist
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('logs/trading_system.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def check_system_health(self, trading_system):
        """Monitor system health and send alerts"""
        try:
            # Check account balance
            account_info = trading_system.executor.get_account_info()

            # Alert if the portfolio is more than 10% below the baseline
            if account_info['portfolio_value'] < self.initial_balance * 0.9:
                self.send_alert(f"Portfolio down 10%: ${account_info['portfolio_value']:.2f}")

            # Log system status
            self.logger.info(f"Health check OK - Portfolio: ${account_info['portfolio_value']:.2f}")

        except Exception as e:
            self.send_alert(f"System error: {str(e)}")
            self.logger.error(f"Health check failed: {e}")

    def send_alert(self, message: str):
        """Send email alert for important events"""
        try:
            msg = MIMEText(message)
            msg['Subject'] = 'Trading System Alert'
            msg['From'] = self.email_config['from_email']
            msg['To'] = self.email_config['to_email']

            server = smtplib.SMTP('smtp.gmail.com', 587)
            server.starttls()
            server.login(self.email_config['username'], self.email_config['password'])
            server.send_message(msg)
            server.quit()

            self.logger.info(f"Alert sent: {message}")

        except Exception as e:
            self.logger.error(f"Failed to send alert: {e}")

Advanced Features and Optimization

Machine Learning Integration

Enhance your agents with machine learning capabilities:

# ml_models/price_predictor.py
from sklearn.ensemble import RandomForestRegressor
import pandas as pd
import numpy as np
import yfinance as yf

class PricePredictor:
    FEATURE_COLS = ['SMA_5', 'SMA_20', 'RSI', 'price_change', 'volume_change']

    def __init__(self):
        self.model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.is_trained = False

    @staticmethod
    def calculate_rsi(close: pd.Series, window: int = 14) -> pd.Series:
        """Relative Strength Index using simple moving averages of gains and losses"""
        delta = close.diff()
        gain = delta.clip(lower=0).rolling(window).mean()
        loss = (-delta.clip(upper=0)).rolling(window).mean()
        rs = gain / loss
        return 100 - 100 / (1 + rs)

    def prepare_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Create features for ML model (indexed by date, input left unmodified)"""
        features = pd.DataFrame(index=data.index)

        # Technical indicators
        features['SMA_5'] = data['Close'].rolling(5).mean()
        features['SMA_20'] = data['Close'].rolling(20).mean()
        features['RSI'] = self.calculate_rsi(data['Close'])

        # Price changes
        features['price_change'] = data['Close'].pct_change()
        features['volume_change'] = data['Volume'].pct_change()

        # Drop warm-up rows and any infinities from zero-volume days
        return features.replace([np.inf, -np.inf], np.nan).dropna()

    def train(self, symbol: str, period: str = "2y"):
        """Train model on historical data"""
        stock = yf.Ticker(symbol)
        data = stock.history(period=period)

        # Join features and targets on the date index so each row of X
        # is paired with the following day's close
        features = self.prepare_features(data)
        target = data['Close'].shift(-1).rename('target')  # Next day's price
        dataset = features.join(target).dropna()

        X = dataset[self.FEATURE_COLS].values
        y = dataset['target'].values

        # Train model
        self.model.fit(X, y)
        self.is_trained = True

        # In-sample R^2: an optimistic figure, not an out-of-sample estimate
        return self.model.score(X, y)

    def predict_price(self, current_data: pd.DataFrame) -> float:
        """Predict the next closing price"""
        if not self.is_trained:
            raise ValueError("Model must be trained first")

        features = self.prepare_features(current_data)
        return self.model.predict(features.iloc[[-1]].values)[0]

Performance Optimization Tips

1. Reduce API Calls

# Cache market data to reduce API calls
import functools
import time
import yfinance as yf

@functools.lru_cache(maxsize=128)
def get_cached_market_data(symbol: str, timestamp: int):
    """Cache market data for 5 minutes"""
    return yf.Ticker(symbol).history(period="1d")

# Usage
current_time = int(time.time() // 300) * 300  # 5-minute intervals
data = get_cached_market_data('AAPL', current_time)
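
The timestamp argument is never used inside the function; it exists only to be part of the cache key. All calls that fall in the same 5-minute bucket share one key and therefore one fetch. The sketch below demonstrates the effect with a stand-in fetch function (fetch_quote is hypothetical and makes no network call):

```python
import functools

CACHE_SECONDS = 300  # bucket width: 5 minutes
_fetch_count = 0


def fetch_quote(symbol: str) -> dict:
    """Stand-in for a real market data call; counts how often it runs."""
    global _fetch_count
    _fetch_count += 1
    return {"symbol": symbol, "fetch_number": _fetch_count}


@functools.lru_cache(maxsize=128)
def _cached_quote(symbol: str, bucket: int) -> dict:
    # `bucket` is unused here on purpose: it only varies the cache key
    return fetch_quote(symbol)


def get_quote(symbol: str, now: float) -> dict:
    """Return a quote, refetching at most once per time bucket."""
    return _cached_quote(symbol, int(now // CACHE_SECONDS))


first = get_quote("AAPL", now=1000.0)   # bucket 3: fetches
second = get_quote("AAPL", now=1199.0)  # still bucket 3: served from cache
third = get_quote("AAPL", now=1200.0)   # bucket 4: fetches again
```

Note that lru_cache hands every caller the same object, so a cached DataFrame should be copied before it is modified.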

2. Parallel Agent Processing

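import asyncio

async def analyze_symbol_async(analyst, symbol: str) -> dict:
    """Run the blocking analyze_stock call in a worker thread"""
    return await asyncio.to_thread(analyst.analyze_stock, symbol)

async def analyze_symbols_parallel(analyst, symbols: list) -> list:
    """Analyze multiple symbols concurrently"""
    tasks = []
    for symbol in symbols:
        task = asyncio.create_task(analyze_symbol_async(analyst, symbol))
        tasks.append(task)

    results = await asyncio.gather(*tasks)
    return results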

Troubleshooting Common Issues

Connection Problems

Ollama Connection Issues:

# Check if Ollama is running
ollama list

# Restart Ollama service
sudo systemctl restart ollama

# Check Ollama logs
sudo journalctl -u ollama -f

API Rate Limits:

# Implement exponential backoff
import time
import random

def api_call_with_retry(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise e
            
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            time.sleep(wait_time)

Memory Management

Ollama Memory Issues:

# Clear Ollama memory periodically
import subprocess

def clear_ollama_memory(model: str = "llama2:13b"):
    """Unload the model from memory; Ollama reloads it on the next request"""
    # `ollama stop` needs a model name and leaves the server itself running
    subprocess.run(['ollama', 'stop', model], capture_output=True)

Security Best Practices

API Key Protection

# config/secure_config.py
import os
from cryptography.fernet import Fernet

class SecureConfig:
    def __init__(self):
        self.key = os.environ.get('ENCRYPTION_KEY')
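        if not self.key:
            # No key configured: generate one. Store it in ENCRYPTION_KEY
            # (for example via a secrets manager) and avoid leaving it in logs;
            # values encrypted with a key that is later lost cannot be decrypted.
            self.key = Fernet.generate_key()
            print(f"Generated encryption key: {self.key.decode()}")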
        
        self.cipher = Fernet(self.key)
    
    def encrypt_api_key(self, api_key: str) -> str:
        """Encrypt API key for storage"""
        return self.cipher.encrypt(api_key.encode()).decode()
    
    def decrypt_api_key(self, encrypted_key: str) -> str:
        """Decrypt API key for use"""
        return self.cipher.decrypt(encrypted_key.encode()).decode()

# Usage
config = SecureConfig()
encrypted_key = config.encrypt_api_key("your_api_key")

Network Security

# Enforce timeouts and TLS certificate verification on API requests
import requests

def secure_api_request(url: str, headers: dict = None):
    """Make API request through secure connection"""
    session = requests.Session()
    
    # Set secure headers
    secure_headers = {
        'User-Agent': 'TradingBot/1.0',
        'Accept': 'application/json',
        'Connection': 'close'
    }
    
    if headers:
        secure_headers.update(headers)
    
    # Use timeout and SSL verification
    response = session.get(
        url, 
        headers=secure_headers,
        timeout=30,
        verify=True
    )
    
    session.close()
    return response

Conclusion

You've successfully built a multi-agent trading system using Ollama and CrewAI. This automated trading platform combines specialized AI agents to analyze markets, manage risk, and execute trades without human intervention.

Key achievements:

  • Created specialized trading agents with distinct roles
  • Implemented agent coordination and communication
  • Built backtesting and monitoring capabilities
  • Deployed a secure, scalable trading system

Next steps:

  • Add more sophisticated trading strategies
  • Integrate additional data sources (news, social sentiment)
  • Implement portfolio rebalancing agents
  • Explore multi-asset trading capabilities

Your multi-agent trading system provides the foundation for automated trading success. Start with paper trading to validate performance before deploying real capital.

Remember: successful algorithmic trading requires continuous monitoring, regular strategy updates, and strict risk management. Always test thoroughly before risking real money.

Ready to deploy your trading system? Start with small position sizes and gradually scale up as you gain confidence in your agents' performance.


Disclaimer: This tutorial is for educational purposes only. Trading involves substantial risk of loss. Never trade with money you cannot afford to lose.