Remember when investment advice cost $200 per hour and came with a side of judgment about your spending habits? Those days died faster than a tech stock during market correction. Today, you can build robo-advisor with Ollama that manages portfolios smarter than most human advisors—and it won't lecture you about that daily coffee purchase.
Automated investment management systems now handle over $1.4 trillion in assets globally. Building your own robo-advisor gives you complete control over investment strategies, fee structures, and risk parameters. This guide shows you exactly how to create a sophisticated robo-advisor using Ollama's local AI capabilities.
You'll learn to build portfolio optimization algorithms, implement risk assessment systems, and create automated rebalancing features. By the end, you'll have a working robo-advisor that can manage investment portfolios with institutional-grade sophistication.
What Is a Robo-Advisor and Why Build One?
A robo-advisor automates investment management through algorithmic portfolio construction and rebalancing. Traditional robo-advisors like Betterment and Wealthfront charge 0.25-0.50% annually. Building your own eliminates these fees permanently.
Core robo-advisor functions include:
- Automated portfolio allocation based on risk tolerance
- Periodic rebalancing to maintain target allocations
- Tax-loss harvesting optimization
- Goal-based investment planning
- Risk assessment and adjustment
Benefits of custom robo-advisor development:
- Zero ongoing management fees
- Complete control over investment algorithms
- Custom risk models and strategies
- Integration with any brokerage API
- Privacy protection for financial data
Why Choose Ollama for Robo-Advisory Systems?
Ollama provides local AI processing without sending sensitive financial data to external services. This approach protects client information while delivering sophisticated investment analysis.
Ollama advantages for financial applications:
- Local processing ensures data privacy
- No per-API-call costs for investment analysis
- Customizable models for specific investment strategies
- Real-time decision making without latency
- Complete control over AI model behavior
Supported Ollama models for financial analysis:
- Llama 3.1: Best for complex investment reasoning
- Mistral: Excellent for risk assessment calculations
- CodeLlama: Optimal for automated trading logic
- Phi-3: Lightweight option for basic portfolio management
Prerequisites and Development Environment Setup
Required technical skills:
- Python programming (intermediate level)
- Basic understanding of investment concepts
- API integration experience
- Database management knowledge
System requirements:
- Python 3.9 or higher
- 16GB RAM minimum (32GB recommended)
- Ollama installed and configured
- PostgreSQL or SQLite database
Essential Python libraries:
pip install ollama pandas numpy yfinance alpaca-trade-api
pip install scikit-learn matplotlib plotly streamlit
pip install sqlalchemy psycopg2-binary python-dotenv
Install and configure Ollama:
# Install Ollama
curl -fsSL https://ollama.ai/install.sh | sh
# Download financial analysis model
ollama pull llama3.1:8b
# Verify installation
ollama list
Building the Core Robo-Advisor Architecture
Project structure setup:
robo_advisor/
├── src/
│ ├── portfolio/
│ │ ├── optimizer.py
│ │ ├── rebalancer.py
│ │ └── risk_analyzer.py
│ ├── data/
│ │ ├── market_data.py
│ │ └── user_profiles.py
│ ├── ai/
│ │ ├── ollama_client.py
│ │ └── investment_advisor.py
│ └── api/
│ └── broker_integration.py
├── config/
│ └── settings.py
└── main.py
Core system configuration:
# config/settings.py
import os
from dataclasses import dataclass
@dataclass
class RoboAdvisorConfig:
"""Configuration settings for robo-advisor system"""
# Ollama settings
ollama_base_url: str = "http://localhost:11434"
model_name: str = "llama3.1:8b"
# Database settings
database_url: str = os.getenv("DATABASE_URL", "sqlite:///robo_advisor.db")
# Risk management settings
max_position_size: float = 0.10 # 10% maximum per position
rebalance_threshold: float = 0.05 # 5% drift triggers rebalancing
# Market data settings
data_provider: str = "yfinance"
update_frequency: int = 3600 # Update every hour
# Portfolio settings
default_risk_tolerance: str = "moderate"
emergency_fund_months: int = 6
Implementing Ollama Integration for Investment Analysis
Ollama client setup:
# src/ai/ollama_client.py
import ollama
import json
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class InvestmentRecommendation:
"""Structure for AI investment recommendations"""
ticker: str
allocation_percentage: float
reasoning: str
risk_score: float
confidence_level: float
class OllamaInvestmentAnalyzer:
"""AI-powered investment analysis using Ollama"""
def __init__(self, model_name: str = "llama3.1:8b"):
self.model_name = model_name
self.client = ollama.Client()
def analyze_portfolio_allocation(self,
user_profile: Dict,
market_data: Dict) -> List[InvestmentRecommendation]:
"""Generate portfolio allocation recommendations"""
prompt = self._build_allocation_prompt(user_profile, market_data)
response = self.client.chat(
model=self.model_name,
messages=[{
'role': 'user',
'content': prompt
}],
stream=False
)
return self._parse_allocation_response(response['message']['content'])
def _build_allocation_prompt(self, user_profile: Dict, market_data: Dict) -> str:
"""Build comprehensive prompt for portfolio allocation"""
return f"""
You are an expert financial advisor. Analyze this user profile and recommend portfolio allocation.
User Profile:
- Age: {user_profile['age']}
- Risk Tolerance: {user_profile['risk_tolerance']}
- Investment Goals: {user_profile['goals']}
- Time Horizon: {user_profile['time_horizon']} years
- Current Portfolio Value: ${user_profile['portfolio_value']:,.2f}
Market Conditions:
- VIX Level: {market_data['vix']}
- 10-Year Treasury Yield: {market_data['treasury_10y']}%
- S&P 500 PE Ratio: {market_data['sp500_pe']}
Provide allocation recommendations in JSON format:
{{
"recommendations": [
{{
"ticker": "VTI",
"allocation_percentage": 60.0,
"reasoning": "Core equity exposure for long-term growth",
"risk_score": 7.5,
"confidence_level": 0.85
}}
]
}}
Focus on diversified ETFs and consider current market conditions.
"""
def _parse_allocation_response(self, response: str) -> List[InvestmentRecommendation]:
"""Parse AI response into structured recommendations"""
try:
data = json.loads(response)
recommendations = []
for rec in data['recommendations']:
recommendations.append(InvestmentRecommendation(
ticker=rec['ticker'],
allocation_percentage=rec['allocation_percentage'],
reasoning=rec['reasoning'],
risk_score=rec['risk_score'],
confidence_level=rec['confidence_level']
))
return recommendations
except (json.JSONDecodeError, KeyError) as e:
print(f"Error parsing AI response: {e}")
return []
Portfolio Optimization and Risk Management
Modern Portfolio Theory implementation:
# src/portfolio/optimizer.py
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from typing import Dict, List, Tuple
import yfinance as yf
class PortfolioOptimizer:
"""Advanced portfolio optimization using Modern Portfolio Theory"""
def __init__(self, risk_free_rate: float = 0.02):
self.risk_free_rate = risk_free_rate
def optimize_portfolio(self,
tickers: List[str],
target_return: float = None,
risk_tolerance: str = "moderate") -> Dict:
"""Optimize portfolio allocation based on risk-return profile"""
# Download historical price data
price_data = self._get_price_data(tickers)
returns = self._calculate_returns(price_data)
# Calculate expected returns and covariance matrix
expected_returns = returns.mean() * 252 # Annualized
cov_matrix = returns.cov() * 252 # Annualized
# Set optimization target based on risk tolerance
if target_return is None:
target_return = self._get_target_return(risk_tolerance)
# Optimize portfolio weights
optimal_weights = self._optimize_weights(
expected_returns, cov_matrix, target_return
)
# Calculate portfolio metrics
portfolio_return = np.sum(optimal_weights * expected_returns)
portfolio_volatility = np.sqrt(
np.dot(optimal_weights.T, np.dot(cov_matrix, optimal_weights))
)
sharpe_ratio = (portfolio_return - self.risk_free_rate) / portfolio_volatility
return {
'weights': dict(zip(tickers, optimal_weights)),
'expected_return': portfolio_return,
'volatility': portfolio_volatility,
'sharpe_ratio': sharpe_ratio,
'risk_tolerance': risk_tolerance
}
def _get_price_data(self, tickers: List[str], period: str = "2y") -> pd.DataFrame:
"""Download historical price data for portfolio assets"""
data = yf.download(tickers, period=period)['Adj Close']
return data.dropna()
def _calculate_returns(self, price_data: pd.DataFrame) -> pd.DataFrame:
"""Calculate daily returns from price data"""
return price_data.pct_change().dropna()
def _get_target_return(self, risk_tolerance: str) -> float:
"""Set target return based on risk tolerance level"""
targets = {
'conservative': 0.06, # 6% annual return
'moderate': 0.08, # 8% annual return
'aggressive': 0.12 # 12% annual return
}
return targets.get(risk_tolerance, 0.08)
def _optimize_weights(self,
expected_returns: pd.Series,
cov_matrix: pd.DataFrame,
target_return: float) -> np.ndarray:
"""Optimize portfolio weights using quadratic programming"""
num_assets = len(expected_returns)
# Objective function: minimize portfolio variance
def objective(weights):
return np.dot(weights.T, np.dot(cov_matrix, weights))
# Constraints
constraints = [
{'type': 'eq', 'fun': lambda x: np.sum(x) - 1}, # Weights sum to 1
{'type': 'eq', 'fun': lambda x: np.sum(x * expected_returns) - target_return}
]
# Bounds: weights between 0 and 1
bounds = tuple((0, 1) for _ in range(num_assets))
# Initial guess: equal weights
initial_guess = np.array([1/num_assets] * num_assets)
# Optimize
result = minimize(
objective,
initial_guess,
method='SLSQP',
bounds=bounds,
constraints=constraints
)
return result.x
Automated Rebalancing System
Portfolio rebalancing implementation:
# src/portfolio/rebalancer.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class RebalanceAction:
"""Structure for rebalancing actions"""
ticker: str
current_weight: float
target_weight: float
action: str # 'buy', 'sell', 'hold'
shares_to_trade: int
dollar_amount: float
class PortfolioRebalancer:
"""Automated portfolio rebalancing system"""
def __init__(self, rebalance_threshold: float = 0.05):
self.rebalance_threshold = rebalance_threshold
def check_rebalance_needed(self,
current_portfolio: Dict,
target_allocation: Dict) -> bool:
"""Check if portfolio needs rebalancing"""
for ticker, target_weight in target_allocation.items():
current_weight = current_portfolio.get(ticker, 0)
drift = abs(current_weight - target_weight)
if drift > self.rebalance_threshold:
return True
return False
def generate_rebalance_actions(self,
current_portfolio: Dict,
target_allocation: Dict,
current_prices: Dict,
total_portfolio_value: float) -> List[RebalanceAction]:
"""Generate specific rebalancing actions"""
actions = []
for ticker, target_weight in target_allocation.items():
current_weight = current_portfolio.get(ticker, 0)
target_value = total_portfolio_value * target_weight
current_value = total_portfolio_value * current_weight
dollar_difference = target_value - current_value
current_price = current_prices[ticker]
shares_to_trade = int(dollar_difference / current_price)
if abs(dollar_difference) > (total_portfolio_value * 0.01): # 1% minimum
action_type = 'buy' if shares_to_trade > 0 else 'sell'
actions.append(RebalanceAction(
ticker=ticker,
current_weight=current_weight,
target_weight=target_weight,
action=action_type,
shares_to_trade=abs(shares_to_trade),
dollar_amount=abs(dollar_difference)
))
return actions
def calculate_tax_impact(self, actions: List[RebalanceAction]) -> Dict:
"""Calculate potential tax implications of rebalancing"""
total_realized_gains = 0
total_realized_losses = 0
# This would integrate with actual cost basis tracking
# Simplified calculation for demonstration
for action in actions:
if action.action == 'sell':
# Estimate gain/loss (would use actual cost basis in production)
estimated_gain = action.dollar_amount * 0.1 # 10% average gain
if estimated_gain > 0:
total_realized_gains += estimated_gain
else:
total_realized_losses += abs(estimated_gain)
return {
'total_realized_gains': total_realized_gains,
'total_realized_losses': total_realized_losses,
'net_tax_impact': total_realized_gains * 0.20 # 20% capital gains rate
}
Risk Assessment and User Profiling
Comprehensive risk assessment system:
# src/portfolio/risk_analyzer.py
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple
from enum import Enum
class RiskLevel(Enum):
VERY_LOW = 1
LOW = 2
MODERATE = 3
HIGH = 4
VERY_HIGH = 5
class RiskProfileAnalyzer:
"""Advanced risk assessment and user profiling system"""
def __init__(self):
self.risk_questions = self._load_risk_questions()
def assess_risk_tolerance(self, questionnaire_responses: Dict) -> Dict:
"""Comprehensive risk tolerance assessment"""
risk_score = 0
max_score = 0
for question_id, response in questionnaire_responses.items():
question = self.risk_questions[question_id]
risk_score += question['scoring'][response]
max_score += max(question['scoring'].values())
# Calculate normalized risk score (0-100)
normalized_score = (risk_score / max_score) * 100
# Determine risk level
risk_level = self._calculate_risk_level(normalized_score)
# Generate investment recommendations
recommendations = self._generate_risk_based_recommendations(risk_level)
return {
'risk_score': normalized_score,
'risk_level': risk_level.name,
'recommendations': recommendations,
'suggested_allocation': self._get_allocation_template(risk_level)
}
def _load_risk_questions(self) -> Dict:
"""Load risk assessment questionnaire"""
return {
'age_group': {
'question': 'What is your age group?',
'options': ['18-30', '31-45', '46-60', '60+'],
'scoring': {'18-30': 4, '31-45': 3, '46-60': 2, '60+': 1}
},
'investment_experience': {
'question': 'How much investment experience do you have?',
'options': ['None', 'Some', 'Experienced', 'Expert'],
'scoring': {'None': 1, 'Some': 2, 'Experienced': 3, 'Expert': 4}
},
'income_stability': {
'question': 'How stable is your income?',
'options': ['Very unstable', 'Somewhat stable', 'Stable', 'Very stable'],
'scoring': {'Very unstable': 1, 'Somewhat stable': 2, 'Stable': 3, 'Very stable': 4}
},
'investment_horizon': {
'question': 'When do you plan to use this money?',
'options': ['< 2 years', '2-5 years', '5-10 years', '> 10 years'],
'scoring': {'< 2 years': 1, '2-5 years': 2, '5-10 years': 3, '> 10 years': 4}
},
'loss_tolerance': {
'question': 'How would you react to a 20% portfolio loss?',
'options': ['Panic and sell', 'Very concerned', 'Somewhat concerned', 'Stay the course'],
'scoring': {'Panic and sell': 1, 'Very concerned': 2, 'Somewhat concerned': 3, 'Stay the course': 4}
}
}
def _calculate_risk_level(self, score: float) -> RiskLevel:
"""Determine risk level from score"""
if score <= 20:
return RiskLevel.VERY_LOW
elif score <= 40:
return RiskLevel.LOW
elif score <= 60:
return RiskLevel.MODERATE
elif score <= 80:
return RiskLevel.HIGH
else:
return RiskLevel.VERY_HIGH
def _get_allocation_template(self, risk_level: RiskLevel) -> Dict:
"""Get asset allocation template based on risk level"""
templates = {
RiskLevel.VERY_LOW: {
'bonds': 70, 'stocks': 20, 'cash': 10
},
RiskLevel.LOW: {
'bonds': 50, 'stocks': 40, 'cash': 10
},
RiskLevel.MODERATE: {
'bonds': 30, 'stocks': 60, 'alternatives': 10
},
RiskLevel.HIGH: {
'bonds': 20, 'stocks': 70, 'alternatives': 10
},
RiskLevel.VERY_HIGH: {
'bonds': 10, 'stocks': 80, 'alternatives': 10
}
}
return templates[risk_level]
Market Data Integration and Analysis
Real-time market data system:
# src/data/market_data.py
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import requests
class MarketDataProvider:
"""Comprehensive market data integration system"""
def __init__(self):
self.cache_duration = 300 # 5 minutes cache
self._price_cache = {}
self._last_update = {}
def get_current_prices(self, tickers: List[str]) -> Dict[str, float]:
"""Get current market prices for tickers"""
current_time = datetime.now()
prices = {}
for ticker in tickers:
# Check cache first
if (ticker in self._price_cache and
ticker in self._last_update and
(current_time - self._last_update[ticker]).seconds < self.cache_duration):
prices[ticker] = self._price_cache[ticker]
else:
# Fetch fresh data
try:
stock = yf.Ticker(ticker)
price = stock.info['regularMarketPrice']
prices[ticker] = price
# Update cache
self._price_cache[ticker] = price
self._last_update[ticker] = current_time
except Exception as e:
print(f"Error fetching price for {ticker}: {e}")
prices[ticker] = 0.0
return prices
def get_market_indicators(self) -> Dict[str, float]:
"""Get key market indicators for investment decisions"""
indicators = {}
try:
# VIX (Fear Index)
vix = yf.Ticker("^VIX")
indicators['vix'] = vix.info['regularMarketPrice']
# 10-Year Treasury Yield
treasury = yf.Ticker("^TNX")
indicators['treasury_10y'] = treasury.info['regularMarketPrice']
# S&P 500 P/E Ratio (approximate)
sp500 = yf.Ticker("^GSPC")
indicators['sp500_price'] = sp500.info['regularMarketPrice']
# Dollar Index
dxy = yf.Ticker("DX-Y.NYB")
indicators['dollar_index'] = dxy.info['regularMarketPrice']
except Exception as e:
print(f"Error fetching market indicators: {e}")
return indicators
def calculate_portfolio_metrics(self,
portfolio: Dict[str, float],
prices: Dict[str, float]) -> Dict:
"""Calculate comprehensive portfolio performance metrics"""
total_value = sum(shares * prices[ticker]
for ticker, shares in portfolio.items()
if ticker in prices)
# Get historical data for performance calculation
tickers = list(portfolio.keys())
historical_data = yf.download(tickers, period="1y")['Adj Close']
if len(tickers) == 1:
historical_data = historical_data.to_frame(tickers[0])
# Calculate portfolio weights
weights = {}
for ticker, shares in portfolio.items():
if ticker in prices:
weights[ticker] = (shares * prices[ticker]) / total_value
# Calculate historical portfolio returns
portfolio_returns = []
for date in historical_data.index:
daily_return = 0
for ticker, weight in weights.items():
if ticker in historical_data.columns:
stock_return = historical_data[ticker].pct_change().loc[date]
if not pd.isna(stock_return):
daily_return += weight * stock_return
portfolio_returns.append(daily_return)
portfolio_returns = pd.Series(portfolio_returns).dropna()
# Calculate metrics
annual_return = portfolio_returns.mean() * 252
annual_volatility = portfolio_returns.std() * np.sqrt(252)
sharpe_ratio = annual_return / annual_volatility if annual_volatility > 0 else 0
max_drawdown = self._calculate_max_drawdown(portfolio_returns)
return {
'total_value': total_value,
'annual_return': annual_return,
'annual_volatility': annual_volatility,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'weights': weights
}
def _calculate_max_drawdown(self, returns: pd.Series) -> float:
"""Calculate maximum drawdown from returns series"""
cumulative = (1 + returns).cumprod()
running_max = cumulative.expanding().max()
drawdown = (cumulative - running_max) / running_max
return drawdown.min()
Building the User Interface
Streamlit web application:
# main.py
import streamlit as st
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from src.ai.ollama_client import OllamaInvestmentAnalyzer
from src.portfolio.optimizer import PortfolioOptimizer
from src.portfolio.rebalancer import PortfolioRebalancer
from src.portfolio.risk_analyzer import RiskProfileAnalyzer
from src.data.market_data import MarketDataProvider
st.set_page_config(
page_title="AI Robo-Advisor",
page_icon="🤖",
layout="wide"
)
def main():
"""Main application interface"""
st.title("🤖 AI-Powered Robo-Advisor")
st.subheader("Automated Investment Management with Ollama")
# Initialize components
ai_analyzer = OllamaInvestmentAnalyzer()
optimizer = PortfolioOptimizer()
rebalancer = PortfolioRebalancer()
risk_analyzer = RiskProfileAnalyzer()
market_data = MarketDataProvider()
# Sidebar for user inputs
with st.sidebar:
st.header("User Profile")
age = st.slider("Age", 18, 80, 35)
risk_tolerance = st.selectbox("Risk Tolerance",
["conservative", "moderate", "aggressive"])
investment_amount = st.number_input("Investment Amount ($)",
min_value=1000, value=10000)
time_horizon = st.slider("Investment Time Horizon (years)", 1, 40, 10)
goals = st.multiselect("Investment Goals",
["Retirement", "House Down Payment", "Education",
"Wealth Building", "Emergency Fund"])
# Main content area
col1, col2 = st.columns([2, 1])
with col1:
st.header("Portfolio Recommendations")
if st.button("Generate AI Recommendations"):
with st.spinner("Analyzing market conditions and generating recommendations..."):
# Build user profile
user_profile = {
'age': age,
'risk_tolerance': risk_tolerance,
'investment_amount': investment_amount,
'time_horizon': time_horizon,
'goals': goals,
'portfolio_value': investment_amount
}
# Get market data
market_indicators = market_data.get_market_indicators()
# Generate AI recommendations
recommendations = ai_analyzer.analyze_portfolio_allocation(
user_profile, market_indicators
)
# Display recommendations
if recommendations:
st.success("✅ AI Analysis Complete!")
rec_df = pd.DataFrame([
{
'Ticker': rec.ticker,
'Allocation %': f"{rec.allocation_percentage:.1f}%",
'Risk Score': rec.risk_score,
'Confidence': f"{rec.confidence_level:.1%}",
'Reasoning': rec.reasoning
}
for rec in recommendations
])
st.dataframe(rec_df, use_container_width=True)
# Create allocation pie chart
fig = px.pie(
values=[rec.allocation_percentage for rec in recommendations],
names=[rec.ticker for rec in recommendations],
title="Recommended Portfolio Allocation"
)
st.plotly_chart(fig, use_container_width=True)
else:
st.error("❌ Unable to generate recommendations. Please check Ollama connection.")
with col2:
st.header("Market Overview")
# Display market indicators
indicators = market_data.get_market_indicators()
for indicator, value in indicators.items():
if indicator == 'vix':
st.metric("VIX (Fear Index)", f"{value:.2f}")
elif indicator == 'treasury_10y':
st.metric("10-Year Treasury", f"{value:.2f}%")
elif indicator == 'sp500_price':
st.metric("S&P 500", f"{value:,.2f}")
st.header("Risk Assessment")
if st.button("Take Risk Assessment"):
# Simplified risk assessment for demo
risk_responses = {
'age_group': '31-45' if age < 45 else '46-60',
'investment_experience': 'Some',
'income_stability': 'Stable',
'investment_horizon': '> 10 years' if time_horizon > 10 else '5-10 years',
'loss_tolerance': 'Somewhat concerned'
}
risk_profile = risk_analyzer.assess_risk_tolerance(risk_responses)
st.success(f"Risk Level: {risk_profile['risk_level']}")
st.info(f"Risk Score: {risk_profile['risk_score']:.1f}/100")
# Display suggested allocation
allocation = risk_profile['suggested_allocation']
fig = px.bar(
x=list(allocation.keys()),
y=list(allocation.values()),
title="Suggested Asset Allocation"
)
st.plotly_chart(fig, use_container_width=True)
if __name__ == "__main__":
main()
Testing and Deployment Strategies
Comprehensive testing framework:
# tests/test_portfolio_optimizer.py
import unittest
import pandas as pd
import numpy as np
from src.portfolio.optimizer import PortfolioOptimizer
class TestPortfolioOptimizer(unittest.TestCase):
"""Unit tests for portfolio optimization system"""
def setUp(self):
self.optimizer = PortfolioOptimizer()
self.test_tickers = ['VTI', 'VTIAX', 'BND']
def test_portfolio_optimization(self):
"""Test portfolio optimization returns valid results"""
result = self.optimizer.optimize_portfolio(
self.test_tickers,
target_return=0.08
)
# Check that weights sum to 1
total_weight = sum(result['weights'].values())
self.assertAlmostEqual(total_weight, 1.0, places=2)
# Check that all weights are non-negative
for weight in result['weights'].values():
self.assertGreaterEqual(weight, 0)
# Check that expected return is positive
self.assertGreater(result['expected_return'], 0)
# Check that volatility is positive
self.assertGreater(result['volatility'], 0)
def test_risk_tolerance_mapping(self):
"""Test different risk tolerance levels produce different allocations"""
conservative = self.optimizer.optimize_portfolio(
self.test_tickers,
risk_tolerance="conservative"
)
aggressive = self.optimizer.optimize_portfolio(
self.test_tickers,
risk_tolerance="aggressive"
)
# Aggressive should have higher expected return
self.assertGreater(
aggressive['expected_return'],
conservative['expected_return']
)
if __name__ == '__main__':
unittest.main()
Docker deployment configuration:
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
# Install Ollama
RUN curl -fsSL https://ollama.ai/install.sh | sh
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose port for Streamlit
EXPOSE 8501
# Start services
CMD ["sh", "-c", "ollama serve & sleep 10 && ollama pull llama3.1:8b && streamlit run main.py --server.port=8501 --server.address=0.0.0.0"]
Performance Monitoring and Analytics
Portfolio performance tracking:
# src/analytics/performance_tracker.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List
import sqlite3
class PerformanceTracker:
"""Track and analyze portfolio performance over time"""
def __init__(self, db_path: str = "portfolio_performance.db"):
self.db_path = db_path
self._init_database()
def _init_database(self):
"""Initialize performance tracking database"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS portfolio_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME,
user_id TEXT,
total_value REAL,
daily_return REAL,
allocation TEXT,
benchmark_return REAL
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS rebalance_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME,
user_id TEXT,
actions TEXT,
cost REAL,
reason TEXT
)
""")
conn.commit()
conn.close()
def record_portfolio_snapshot(self,
user_id: str,
portfolio_value: float,
allocation: Dict,
benchmark_return: float):
"""Record daily portfolio performance snapshot"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Calculate daily return (simplified - would use previous day's value in production)
daily_return = 0.0 # Placeholder for actual calculation
cursor.execute("""
INSERT INTO portfolio_snapshots
(timestamp, user_id, total_value, daily_return, allocation, benchmark_return)
VALUES (?, ?, ?, ?, ?, ?)
""", (
datetime.now(),
user_id,
portfolio_value,
daily_return,
str(allocation),
benchmark_return
))
conn.commit()
conn.close()
def generate_performance_report(self, user_id: str, period_days: int = 30) -> Dict:
"""Generate comprehensive performance report"""
conn = sqlite3.connect(self.db_path)
query = """
SELECT * FROM portfolio_snapshots
WHERE user_id = ? AND timestamp >= datetime('now', '-{} days')
ORDER BY timestamp
""".format(period_days)
df = pd.read_sql_query(query, conn, params=(user_id,))
conn.close()
if df.empty:
return {'error': 'No performance data available'}
# Calculate performance metrics
total_return = (df['total_value'].iloc[-1] / df['total_value'].iloc[0]) - 1
benchmark_return = df['benchmark_return'].mean() * period_days / 365
alpha = total_return - benchmark_return
volatility = df['daily_return'].std() * np.sqrt(252) # Annualized
sharpe_ratio = (total_return * 365 / period_days) / volatility if volatility > 0 else 0
return {
'period_days': period_days,
'total_return': total_return,
'benchmark_return': benchmark_return,
'alpha': alpha,
'volatility': volatility,
'sharpe_ratio': sharpe_ratio,
'max_value': df['total_value'].max(),
'min_value': df['total_value'].min(),
'current_value': df['total_value'].iloc[-1]
}
Advanced Features and Extensions
Tax-loss harvesting implementation:
# src/portfolio/tax_optimizer.py
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import pandas as pd
class TaxLossHarvester:
"""Automated tax-loss harvesting system"""
def __init__(self, wash_sale_days: int = 30):
self.wash_sale_days = wash_sale_days
def identify_harvest_opportunities(self,
portfolio: Dict,
cost_basis: Dict,
current_prices: Dict) -> List[Dict]:
"""Identify tax-loss harvesting opportunities"""
opportunities = []
for ticker, shares in portfolio.items():
if ticker in cost_basis and ticker in current_prices:
current_value = shares * current_prices[ticker]
original_cost = shares * cost_basis[ticker]['price']
unrealized_loss = original_cost - current_value
if unrealized_loss > 100: # Minimum $100 loss threshold
# Check wash sale rule
purchase_date = cost_basis[ticker]['date']
days_held = (datetime.now() - purchase_date).days
if days_held > self.wash_sale_days:
opportunities.append({
'ticker': ticker,
'shares': shares,
'unrealized_loss': unrealized_loss,
'tax_benefit': unrealized_loss * 0.22, # 22% tax rate
'replacement_suggestion': self._suggest_replacement(ticker)
})
return sorted(opportunities, key=lambda x: x['tax_benefit'], reverse=True)
def _suggest_replacement(self, ticker: str) -> str:
"""Suggest replacement ETF to avoid wash sale rule"""
replacements = {
'VTI': 'ITOT', # Total stock market alternatives
'VTIAX': 'FTIHX', # International alternatives
'BND': 'AGG', # Bond alternatives
'QQQ': 'ONEQ', # NASDAQ alternatives
}
return replacements.get(ticker, 'CASH')
Security and Compliance Considerations
Data protection and security measures:
# src/security/data_protection.py
import hashlib
import secrets
from cryptography.fernet import Fernet
import os
class DataProtectionManager:
"""Handle sensitive financial data protection"""
def __init__(self):
self.encryption_key = self._get_or_create_key()
self.cipher = Fernet(self.encryption_key)
def _get_or_create_key(self) -> bytes:
"""Get or create encryption key for sensitive data"""
key_file = "encryption.key"
if os.path.exists(key_file):
with open(key_file, 'rb') as f:
return f.read()
else:
key = Fernet.generate_key()
with open(key_file, 'wb') as f:
f.write(key)
return key
def encrypt_sensitive_data(self, data: str) -> str:
"""Encrypt sensitive financial information"""
encrypted_data = self.cipher.encrypt(data.encode())
return encrypted_data.decode()
def decrypt_sensitive_data(self, encrypted_data: str) -> str:
"""Decrypt sensitive financial information"""
decrypted_data = self.cipher.decrypt(encrypted_data.encode())
return decrypted_data.decode()
def hash_user_id(self, user_identifier: str) -> str:
"""Create secure hash for user identification"""
salt = secrets.token_bytes(32)
hash_object = hashlib.pbkdf2_hmac('sha256',
user_identifier.encode(),
salt,
100000)
return (salt + hash_object).hex()
Conclusion
You now have a complete blueprint to build robo-advisor with Ollama for automated investment management. This system combines modern portfolio theory, AI-powered analysis, and automated rebalancing to create institutional-grade investment management capabilities.
Key benefits achieved:
- Zero ongoing management fees compared to traditional robo-advisors
- Complete data privacy through local AI processing
- Customizable investment strategies and risk models
- Advanced features like tax-loss harvesting and performance analytics
- Scalable architecture supporting multiple users
Next steps for enhancement:
- Integrate with real brokerage APIs for live trading
- Add alternative investment classes (REITs, commodities, crypto)
- Implement more sophisticated risk models
- Build mobile applications for portfolio monitoring
- Add social trading and advisor marketplace features
Your robo-advisor can now manage portfolios with the sophistication of services costing thousands annually. The combination of Ollama's local AI processing and modern portfolio management techniques provides a powerful foundation for automated investment management.
Ready to revolutionize your investment management? Deploy this robo-advisor system and start building wealth with AI-powered automation.