Wall Street analysts can spend 40+ hours per week screening investments by hand. What if you could automate that process and add AI-powered ESG insights in under 2 hours?
This guide shows you how to build a comprehensive investment screener with Ollama that combines traditional factor analysis with modern ESG (Environmental, Social, Governance) evaluation. You'll create a tool that screens thousands of stocks, analyzes sustainability metrics, and generates investment recommendations automatically.
What You'll Learn
- Set up Ollama for financial data processing
- Build automated ESG scoring systems
- Implement multi-factor investment screening
- Create custom ranking algorithms
- Deploy your screener with real-time updates
Prerequisites and Setup
System Requirements
Before building your investment screener, ensure you have:
- Python 3.8 or higher
- 8GB RAM minimum (16GB recommended)
- Ollama installed locally
- API keys for financial data providers
Installing Ollama
# Install Ollama (macOS/Linux)
curl -fsSL https://ollama.com/install.sh | sh
# Pull the recommended model for financial analysis
ollama pull llama3.1:8b
# Verify installation
ollama --version
Python Dependencies
# requirements.txt
ollama==0.2.1
yfinance==0.2.18
pandas==2.0.3
numpy==1.24.3
requests==2.31.0
python-dotenv==1.0.0
matplotlib==3.7.1
seaborn==0.12.2
Install dependencies:
pip install -r requirements.txt
Understanding Investment Screening Fundamentals
What is Factor Analysis?
Factor analysis identifies underlying variables that explain stock performance patterns. Common factors include:
- Value factors: Price-to-earnings, price-to-book ratios
- Growth factors: Revenue growth, earnings growth rates
- Quality factors: Return on equity, debt-to-equity ratios
- Momentum factors: Price momentum, earnings revisions
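Each factor family can be turned into a comparable 0-1 score by percentile ranking, regardless of the raw metric's scale. A toy sketch with hypothetical P/E ratios (for valuation ratios, lower is better, so the ranking is descending):

```python
import pandas as pd

# Hypothetical trailing P/E ratios for four stocks
pe = pd.Series({"AAPL": 28.0, "XOM": 9.5, "KO": 22.0, "F": 6.1})

# Percentile-rank descending: the cheapest stock scores 1.0, the priciest 0.25
value_score = pe.rank(pct=True, ascending=False)
print(value_score.sort_values(ascending=False))
```

The full screener below applies the same idea across value, growth, quality, and momentum metrics and then blends the per-factor scores with configurable weights.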
ESG Integration Benefits
ESG analysis adds sustainability considerations to traditional screening:
- Environmental: Carbon footprint, resource efficiency
- Social: Employee satisfaction, community impact
- Governance: Board composition, executive compensation
Companies with strong ESG scores often show better long-term performance and lower risk profiles.
Building the Core Investment Screener
Step 1: Create the Base Screener Class
# investment_screener.py
import ollama
import yfinance as yf
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple
import json
import time
import logging
class InvestmentScreener:
"""
AI-powered investment screener using Ollama for ESG and factor analysis
"""
def __init__(self, model_name: str = "llama3.1:8b"):
self.model_name = model_name
self.client = ollama.Client()
self.logger = self._setup_logging()
# Initialize screening criteria
self.factor_weights = {
'value': 0.25,
'growth': 0.25,
'quality': 0.25,
'momentum': 0.25
}
self.esg_weights = {
'environmental': 0.33,
'social': 0.33,
'governance': 0.34
}
def _setup_logging(self) -> logging.Logger:
"""Configure logging for screening operations"""
logger = logging.getLogger('investment_screener')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
Step 2: Implement Data Collection
def fetch_stock_data(self, symbols: List[str]) -> pd.DataFrame:
"""
Fetch comprehensive stock data for analysis
Args:
symbols: List of stock symbols to analyze
Returns:
DataFrame with stock metrics
"""
stock_data = []
for symbol in symbols:
try:
ticker = yf.Ticker(symbol)
info = ticker.info
# Get financial metrics
stock_metrics = {
'symbol': symbol,
'company_name': info.get('longName', 'N/A'),
'sector': info.get('sector', 'N/A'),
'industry': info.get('industry', 'N/A'),
# Value factors
'pe_ratio': info.get('trailingPE', np.nan),
'pb_ratio': info.get('priceToBook', np.nan),
'price_to_sales': info.get('priceToSalesTrailing12Months', np.nan),
# Growth factors
'revenue_growth': info.get('revenueGrowth', np.nan),
'earnings_growth': info.get('earningsGrowth', np.nan),
# Quality factors
'roe': info.get('returnOnEquity', np.nan),
'debt_to_equity': info.get('debtToEquity', np.nan),
'current_ratio': info.get('currentRatio', np.nan),
# Momentum factors
'price_change_1y': info.get('52WeekChange', np.nan),
'beta': info.get('beta', np.nan),
# Basic info
'market_cap': info.get('marketCap', np.nan),
'current_price': info.get('currentPrice', np.nan)
}
stock_data.append(stock_metrics)
self.logger.info(f"Fetched data for {symbol}")
except Exception as e:
self.logger.error(f"Error fetching {symbol}: {str(e)}")
continue
return pd.DataFrame(stock_data)
Step 3: Implement Factor Analysis
def calculate_factor_scores(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Calculate factor scores for each stock
Args:
df: DataFrame with stock data
Returns:
DataFrame with factor scores added
"""
df_scored = df.copy()
# Value Score (lower is better for ratios)
df_scored['value_score'] = (
self._rank_inverse(df_scored['pe_ratio']) * 0.4 +
self._rank_inverse(df_scored['pb_ratio']) * 0.3 +
self._rank_inverse(df_scored['price_to_sales']) * 0.3
)
# Growth Score (higher is better)
df_scored['growth_score'] = (
self._rank_normal(df_scored['revenue_growth']) * 0.6 +
self._rank_normal(df_scored['earnings_growth']) * 0.4
)
# Quality Score (higher ROE, lower debt is better)
df_scored['quality_score'] = (
self._rank_normal(df_scored['roe']) * 0.4 +
self._rank_inverse(df_scored['debt_to_equity']) * 0.3 +
self._rank_normal(df_scored['current_ratio']) * 0.3
)
# Momentum Score (higher price change is better, beta closer to 1 is moderate)
df_scored['momentum_score'] = (
self._rank_normal(df_scored['price_change_1y']) * 0.7 +
self._rank_beta(df_scored['beta']) * 0.3
)
# Calculate composite factor score
df_scored['factor_score'] = (
df_scored['value_score'] * self.factor_weights['value'] +
df_scored['growth_score'] * self.factor_weights['growth'] +
df_scored['quality_score'] * self.factor_weights['quality'] +
df_scored['momentum_score'] * self.factor_weights['momentum']
)
return df_scored
def _rank_normal(self, series: pd.Series) -> pd.Series:
    """Rank series where higher values are better; na_option='top' sends missing data to the lowest percentile"""
    return series.rank(pct=True, na_option='top')
def _rank_inverse(self, series: pd.Series) -> pd.Series:
    """Rank series where lower values are better; missing data again scores lowest"""
    return series.rank(pct=True, ascending=False, na_option='top')
def _rank_beta(self, series: pd.Series) -> pd.Series:
"""Rank beta where values closer to 1 are better"""
return 1 - abs(series - 1).rank(pct=True, na_option='bottom')
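The beta ranking is the least obvious of the three helpers: distance from 1.0 is percentile-ranked and then inverted, so near-market betas score highest. A toy illustration with hypothetical betas:

```python
import pandas as pd

# Hypothetical betas: a defensive utility, an index-like stock, a volatile name
beta = pd.Series({"UTIL": 0.4, "INDEX": 1.02, "MEME": 2.3})

# Same computation as _rank_beta: smaller |beta - 1| -> higher score
score = 1 - (beta - 1).abs().rank(pct=True)
print(score.round(2))
```

The index-like stock wins, the extreme beta scores zero, and the defensive name lands in between.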
Implementing AI-Powered ESG Analysis
Step 4: Create ESG Scoring with Ollama
def analyze_esg_factors(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Use Ollama to analyze ESG factors for each stock
Args:
df: DataFrame with stock data
Returns:
DataFrame with ESG scores added
"""
df_esg = df.copy()
esg_scores = []
for _, row in df_esg.iterrows():
try:
esg_score = self._get_esg_analysis(
row['symbol'],
row['company_name'],
row['sector']
)
esg_scores.append(esg_score)
except Exception as e:
self.logger.error(f"ESG analysis failed for {row['symbol']}: {str(e)}")
esg_scores.append(self._default_esg_score())
# Add ESG scores to dataframe (reset the index first so the two
# frames align positionally rather than by a possibly stale index)
esg_df = pd.DataFrame(esg_scores)
df_esg = pd.concat([df_esg.reset_index(drop=True), esg_df], axis=1)
# Calculate composite ESG score
df_esg['esg_score'] = (
df_esg['environmental_score'] * self.esg_weights['environmental'] +
df_esg['social_score'] * self.esg_weights['social'] +
df_esg['governance_score'] * self.esg_weights['governance']
)
return df_esg
def _get_esg_analysis(self, symbol: str, company_name: str, sector: str) -> Dict:
"""
Get ESG analysis from Ollama
Args:
symbol: Stock symbol
company_name: Company name
sector: Company sector
Returns:
Dictionary with ESG scores
"""
prompt = f"""
Analyze the ESG (Environmental, Social, Governance) factors for {company_name} ({symbol}) in the {sector} sector.
Please provide scores from 1-10 for each category and explain your reasoning:
Environmental (1-10):
- Carbon footprint and emissions
- Resource efficiency
- Waste management
- Climate change initiatives
Social (1-10):
- Employee relations and diversity
- Community impact
- Product safety and quality
- Human rights practices
Governance (1-10):
- Board composition and independence
- Executive compensation
- Shareholder rights
- Ethics and transparency
Format your response as JSON:
{{
"environmental_score": <score>,
"social_score": <score>,
"governance_score": <score>,
"environmental_reasoning": "<explanation>",
"social_reasoning": "<explanation>",
"governance_reasoning": "<explanation>"
}}
"""
response = self.client.generate(
model=self.model_name,
prompt=prompt,
options={'temperature': 0.3}
)
try:
# Parse JSON response
esg_data = json.loads(response['response'])
return esg_data
except json.JSONDecodeError:
self.logger.error(f"Failed to parse ESG JSON for {symbol}")
return self._default_esg_score()
def _default_esg_score(self) -> Dict:
"""Return default ESG scores when analysis fails"""
return {
'environmental_score': 5.0,
'social_score': 5.0,
'governance_score': 5.0,
'environmental_reasoning': 'Analysis unavailable',
'social_reasoning': 'Analysis unavailable',
'governance_reasoning': 'Analysis unavailable'
}
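Local models don't always return bare JSON; the object sometimes arrives wrapped in prose or Markdown fences, which makes the plain `json.loads` above fall through to the default scores. A small hedged helper (names are illustrative) that pulls the first JSON object out of the raw response before parsing:

```python
import json
import re

def extract_json(text: str) -> dict:
    """Extract and parse the first {...} object embedded in model output."""
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found in model output")
    return json.loads(match.group(0))

# Example with a fenced response, as a local model might produce:
raw = 'Here are the scores:\n```json\n{"environmental_score": 7}\n```'
print(extract_json(raw))  # {'environmental_score': 7}
```

You could call `extract_json(response['response'])` in place of the bare `json.loads` inside `_get_esg_analysis` to make the parsing step more forgiving.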
Step 5: Generate Investment Recommendations
def generate_recommendations(self, df: pd.DataFrame, top_n: int = 10) -> pd.DataFrame:
"""
Generate final investment recommendations
Args:
df: DataFrame with factor and ESG scores
top_n: Number of top recommendations to return
Returns:
DataFrame with ranked recommendations
"""
df_recommendations = df.copy()
    # Calculate composite score (factor + ESG)
    # Factor scores are on a 0-1 scale while ESG scores run 1-10, so
    # rescale ESG to 0-1 before blending -- otherwise ESG dominates
    df_recommendations['composite_score'] = (
        df_recommendations['factor_score'] * 0.6 +
        (df_recommendations['esg_score'] / 10) * 0.4
    )
# Rank stocks by composite score
df_recommendations['rank'] = df_recommendations['composite_score'].rank(
ascending=False, na_option='bottom'
)
# Get top recommendations
top_recommendations = df_recommendations.nsmallest(top_n, 'rank')
# Generate AI-powered investment thesis for each
recommendations_with_thesis = []
for _, row in top_recommendations.iterrows():
thesis = self._generate_investment_thesis(row)
row_dict = row.to_dict()
row_dict['investment_thesis'] = thesis
recommendations_with_thesis.append(row_dict)
return pd.DataFrame(recommendations_with_thesis)
def _generate_investment_thesis(self, stock_data: pd.Series) -> str:
"""
Generate investment thesis using Ollama
Args:
stock_data: Series with stock metrics
Returns:
Investment thesis string
"""
prompt = f"""
Based on the following stock analysis, write a concise investment thesis (2-3 sentences):
Company: {stock_data['company_name']} ({stock_data['symbol']})
Sector: {stock_data['sector']}
Factor Analysis:
- Value Score: {stock_data['value_score']:.2f}
- Growth Score: {stock_data['growth_score']:.2f}
- Quality Score: {stock_data['quality_score']:.2f}
- Momentum Score: {stock_data['momentum_score']:.2f}
ESG Analysis:
- Environmental Score: {stock_data['environmental_score']:.1f}
- Social Score: {stock_data['social_score']:.1f}
- Governance Score: {stock_data['governance_score']:.1f}
Key Metrics:
- P/E Ratio: {stock_data['pe_ratio']:.2f}
- Revenue Growth: {stock_data['revenue_growth']:.2%}
- ROE: {stock_data['roe']:.2%}
Write a brief investment thesis focusing on the strongest factors and ESG considerations.
"""
    response = self.client.generate(
        model=self.model_name,
        prompt=prompt,
        options={'temperature': 0.5, 'num_predict': 200}  # Ollama's token cap is num_predict, not max_tokens
    )
    return response['response'].strip()
Running Your Investment Screener
Step 6: Main Execution Script
# main.py
from investment_screener import InvestmentScreener
import pandas as pd
def main():
# Initialize screener
screener = InvestmentScreener()
# Define universe of stocks to screen
# Example: S&P 500 subset
stock_symbols = [
'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA',
'META', 'NVDA', 'JPM', 'JNJ', 'PG',
'UNH', 'V', 'HD', 'MA', 'DIS'
]
print("🔍 Starting investment screening process...")
# Step 1: Fetch stock data
print("📊 Fetching stock data...")
stock_data = screener.fetch_stock_data(stock_symbols)
print(f"✅ Fetched data for {len(stock_data)} stocks")
# Step 2: Calculate factor scores
print("🧮 Calculating factor scores...")
factor_data = screener.calculate_factor_scores(stock_data)
print("✅ Factor analysis complete")
# Step 3: Analyze ESG factors
print("🌱 Analyzing ESG factors with AI...")
esg_data = screener.analyze_esg_factors(factor_data)
print("✅ ESG analysis complete")
# Step 4: Generate recommendations
print("💡 Generating investment recommendations...")
recommendations = screener.generate_recommendations(esg_data, top_n=5)
print("✅ Recommendations generated")
# Display results
print("\n📈 TOP INVESTMENT RECOMMENDATIONS:")
print("=" * 50)
for i, rec in recommendations.iterrows():
print(f"\n{int(rec['rank'])}. {rec['company_name']} ({rec['symbol']})")
print(f" Sector: {rec['sector']}")
print(f" Composite Score: {rec['composite_score']:.3f}")
print(f" Factor Score: {rec['factor_score']:.3f}")
print(f" ESG Score: {rec['esg_score']:.3f}")
print(f" Investment Thesis: {rec['investment_thesis']}")
# Save results
recommendations.to_csv('investment_recommendations.csv', index=False)
print(f"\n💾 Results saved to 'investment_recommendations.csv'")
if __name__ == "__main__":
main()
Expected Output
🔍 Starting investment screening process...
📊 Fetching stock data...
✅ Fetched data for 15 stocks
🧮 Calculating factor scores...
✅ Factor analysis complete
🌱 Analyzing ESG factors with AI...
✅ ESG analysis complete
💡 Generating investment recommendations...
✅ Recommendations generated
📈 TOP INVESTMENT RECOMMENDATIONS:
==================================================
1. Microsoft Corporation (MSFT)
Sector: Technology
Composite Score: 0.819
Factor Score: 0.785
ESG Score: 8.7
Investment Thesis: Microsoft demonstrates strong fundamentals with excellent growth prospects in cloud computing and AI, while maintaining industry-leading governance standards and sustainable business practices.
2. Apple Inc. (AAPL)
Sector: Technology
Composite Score: 0.762
Factor Score: 0.723
ESG Score: 8.2
Investment Thesis: Apple combines solid value metrics with consistent innovation, supported by strong environmental initiatives and robust corporate governance, making it an attractive long-term investment.
Advanced Features and Customization
Custom Factor Weights
# Customize factor importance based on investment strategy
screener.factor_weights = {
'value': 0.40, # Higher weight for value investing
'growth': 0.20,
'quality': 0.30,
'momentum': 0.10
}
# Adjust ESG importance
screener.esg_weights = {
'environmental': 0.50, # Higher environmental focus
'social': 0.25,
'governance': 0.25
}
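One pitfall when tweaking weights: if they no longer sum to 1.0, composite scores drift off the 0-1 scale and runs stop being comparable. A small hedged helper (not part of the screener above) to renormalize after any adjustment:

```python
def normalize_weights(weights: dict) -> dict:
    """Scale a weight dict so its values sum to exactly 1.0."""
    total = sum(weights.values())
    if total == 0:
        raise ValueError("weights must not all be zero")
    return {k: v / total for k, v in weights.items()}

# The custom value-tilted weights above actually sum to 1.0 only if you
# keep the books; this illustrative set sums to 1.10 and gets rescaled.
print(normalize_weights({'value': 0.40, 'growth': 0.20,
                         'quality': 0.30, 'momentum': 0.20}))
```

Calling `screener.factor_weights = normalize_weights(screener.factor_weights)` after any manual tweak keeps the composite score well-behaved.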
Sector-Specific Screening
def screen_by_sector(self, sector: str, symbols: List[str]) -> pd.DataFrame:
"""
Screen stocks within a specific sector
Args:
sector: Target sector (e.g., 'Technology', 'Healthcare')
symbols: List of symbols to screen
Returns:
Sector-specific recommendations
"""
# Adjust factor weights for sector
sector_weights = self._get_sector_weights(sector)
original_weights = self.factor_weights.copy()
self.factor_weights = sector_weights
# Run screening
stock_data = self.fetch_stock_data(symbols)
sector_stocks = stock_data[stock_data['sector'] == sector]
if len(sector_stocks) == 0:
return pd.DataFrame()
factor_data = self.calculate_factor_scores(sector_stocks)
esg_data = self.analyze_esg_factors(factor_data)
recommendations = self.generate_recommendations(esg_data)
# Restore original weights
self.factor_weights = original_weights
return recommendations
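Note that `screen_by_sector` calls `self._get_sector_weights`, which this guide never defines. Here is one hypothetical sketch with illustrative tilts (adapt it into the class as `_get_sector_weights(self, sector)` and pass `self.factor_weights` as the default):

```python
def get_sector_weights(sector: str, default_weights: dict) -> dict:
    """Return factor weights tilted for a sector, or the defaults if unknown."""
    sector_tilts = {
        'Technology': {'value': 0.15, 'growth': 0.40,
                       'quality': 0.25, 'momentum': 0.20},
        'Utilities':  {'value': 0.40, 'growth': 0.10,
                       'quality': 0.35, 'momentum': 0.15},
    }
    # Unknown sectors fall back to the screener's current weights
    return sector_tilts.get(sector, dict(default_weights))

print(get_sector_weights('Technology', {})['growth'])  # 0.4
```

The tilts themselves are illustrative, not recommendations; the point is the fallback so an unrecognized sector never crashes the screen.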
Real-Time Data Integration
import schedule
import time
def schedule_screening():
"""Schedule regular screening runs"""
def run_screening():
screener = InvestmentScreener()
# Your screening logic here
print(f"Screening completed at {time.strftime('%Y-%m-%d %H:%M:%S')}")
# Schedule screening every day at 9 AM
schedule.every().day.at("09:00").do(run_screening)
while True:
schedule.run_pending()
time.sleep(60)
Performance Optimization Tips
Caching ESG Results
import pickle
import os
from datetime import datetime, timedelta
class CachedInvestmentScreener(InvestmentScreener):
def __init__(self, cache_dir: str = "esg_cache", cache_days: int = 7):
super().__init__()
self.cache_dir = cache_dir
self.cache_days = cache_days
os.makedirs(cache_dir, exist_ok=True)
def _get_cached_esg(self, symbol: str) -> Dict:
"""Get ESG data from cache if available and fresh"""
cache_file = os.path.join(self.cache_dir, f"{symbol}_esg.pkl")
if os.path.exists(cache_file):
# Check if cache is still valid
file_time = datetime.fromtimestamp(os.path.getmtime(cache_file))
if datetime.now() - file_time < timedelta(days=self.cache_days):
with open(cache_file, 'rb') as f:
return pickle.load(f)
return None
def _cache_esg(self, symbol: str, esg_data: Dict):
"""Cache ESG data for future use"""
cache_file = os.path.join(self.cache_dir, f"{symbol}_esg.pkl")
with open(cache_file, 'wb') as f:
pickle.dump(esg_data, f)
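To wire the cache in, override `_get_esg_analysis` in the subclass so it checks the cache before calling the model. The pattern is easier to see in a standalone sketch, where the expensive model call is stood in for by a counter (all names here are illustrative):

```python
import os
import pickle
import tempfile
from datetime import datetime, timedelta

def cached_esg(cache_dir: str, symbol: str, compute, max_age_days: int = 7) -> dict:
    """Return a fresh cached result if present; otherwise compute and store it."""
    path = os.path.join(cache_dir, f"{symbol}_esg.pkl")
    if os.path.exists(path):
        age = datetime.now() - datetime.fromtimestamp(os.path.getmtime(path))
        if age < timedelta(days=max_age_days):
            with open(path, "rb") as f:
                return pickle.load(f)
    result = compute(symbol)
    with open(path, "wb") as f:
        pickle.dump(result, f)
    return result

with tempfile.TemporaryDirectory() as d:
    calls = []
    def fake_compute(sym):
        calls.append(sym)          # stands in for the slow Ollama call
        return {"environmental_score": 8.0}
    cached_esg(d, "MSFT", fake_compute)   # miss: computes and stores
    cached_esg(d, "MSFT", fake_compute)   # hit: served from the pickle
    print(len(calls))  # 1
```

In `CachedInvestmentScreener`, the same shape becomes: check `_get_cached_esg`, fall back to `super()._get_esg_analysis(...)`, then `_cache_esg` the result.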
Batch Processing
def batch_process_stocks(self, symbols: List[str], batch_size: int = 5) -> pd.DataFrame:
"""
Process stocks in batches to avoid rate limits
Args:
symbols: List of stock symbols
batch_size: Number of stocks to process at once
Returns:
Combined results DataFrame
"""
all_results = []
    num_batches = -(-len(symbols) // batch_size)  # ceiling division
    for i in range(0, len(symbols), batch_size):
        batch = symbols[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1}/{num_batches}")
try:
batch_data = self.fetch_stock_data(batch)
batch_factor = self.calculate_factor_scores(batch_data)
batch_esg = self.analyze_esg_factors(batch_factor)
all_results.append(batch_esg)
# Brief pause between batches
time.sleep(1)
except Exception as e:
self.logger.error(f"Batch processing error: {str(e)}")
continue
return pd.concat(all_results, ignore_index=True)
Deployment Options
Web Interface with Streamlit
# app.py
import streamlit as st
from investment_screener import InvestmentScreener
import plotly.express as px
def main():
st.title("🚀 AI Investment Screener")
st.write("Powered by Ollama for ESG and Factor Analysis")
# Sidebar controls
st.sidebar.header("Screening Parameters")
# Stock selection
symbols_input = st.sidebar.text_area(
"Enter stock symbols (comma-separated)",
"AAPL,MSFT,GOOGL,AMZN,TSLA"
)
symbols = [s.strip().upper() for s in symbols_input.split(",")]
# Factor weights
st.sidebar.subheader("Factor Weights")
value_weight = st.sidebar.slider("Value", 0.0, 1.0, 0.25)
growth_weight = st.sidebar.slider("Growth", 0.0, 1.0, 0.25)
quality_weight = st.sidebar.slider("Quality", 0.0, 1.0, 0.25)
momentum_weight = st.sidebar.slider("Momentum", 0.0, 1.0, 0.25)
# ESG weights
st.sidebar.subheader("ESG Weights")
env_weight = st.sidebar.slider("Environmental", 0.0, 1.0, 0.33)
social_weight = st.sidebar.slider("Social", 0.0, 1.0, 0.33)
gov_weight = st.sidebar.slider("Governance", 0.0, 1.0, 0.34)
if st.sidebar.button("🔍 Run Screening"):
with st.spinner("Running analysis..."):
screener = InvestmentScreener()
# Update weights
screener.factor_weights = {
'value': value_weight,
'growth': growth_weight,
'quality': quality_weight,
'momentum': momentum_weight
}
screener.esg_weights = {
'environmental': env_weight,
'social': social_weight,
'governance': gov_weight
}
# Run screening
stock_data = screener.fetch_stock_data(symbols)
factor_data = screener.calculate_factor_scores(stock_data)
esg_data = screener.analyze_esg_factors(factor_data)
recommendations = screener.generate_recommendations(esg_data)
# Display results
st.header("📈 Investment Recommendations")
# Top recommendations table
st.subheader("Top 5 Recommendations")
display_cols = ['rank', 'symbol', 'company_name', 'sector',
'composite_score', 'factor_score', 'esg_score']
st.dataframe(recommendations[display_cols])
# Visualization
st.subheader("📊 Score Comparison")
fig = px.scatter(
recommendations,
x='factor_score',
y='esg_score',
size='composite_score',
hover_data=['symbol', 'company_name'],
title="Factor vs ESG Scores"
)
st.plotly_chart(fig)
# Investment thesis
st.subheader("💡 Investment Thesis")
for _, rec in recommendations.head(3).iterrows():
st.write(f"**{rec['symbol']}**: {rec['investment_thesis']}")
if __name__ == "__main__":
main()
Run with: streamlit run app.py
API Deployment with FastAPI
# api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from investment_screener import InvestmentScreener
import uvicorn
app = FastAPI(title="Investment Screener API", version="1.0.0")
class ScreeningRequest(BaseModel):
symbols: List[str]
factor_weights: Optional[dict] = None
esg_weights: Optional[dict] = None
top_n: int = 10
class ScreeningResponse(BaseModel):
recommendations: List[dict]
total_analyzed: int
processing_time: float
@app.post("/screen", response_model=ScreeningResponse)
async def screen_investments(request: ScreeningRequest):
"""
Screen investments with custom parameters
"""
try:
import time
start_time = time.time()
screener = InvestmentScreener()
# Update weights if provided
if request.factor_weights:
screener.factor_weights = request.factor_weights
if request.esg_weights:
screener.esg_weights = request.esg_weights
# Run screening
stock_data = screener.fetch_stock_data(request.symbols)
factor_data = screener.calculate_factor_scores(stock_data)
esg_data = screener.analyze_esg_factors(factor_data)
recommendations = screener.generate_recommendations(esg_data, request.top_n)
processing_time = time.time() - start_time
return ScreeningResponse(
recommendations=recommendations.to_dict('records'),
total_analyzed=len(stock_data),
processing_time=processing_time
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "service": "investment_screener"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
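Once the API is running (e.g. `uvicorn api:app`), clients POST a JSON body shaped like `ScreeningRequest` to `/screen`. A sketch of such a payload (the symbols and weights are illustrative):

```python
import json

payload = {
    "symbols": ["AAPL", "MSFT", "GOOGL"],
    "factor_weights": {"value": 0.4, "growth": 0.2,
                       "quality": 0.3, "momentum": 0.1},
    "top_n": 5,
}
print(json.dumps(payload))
```

Send it with any HTTP client, for example `curl -X POST http://localhost:8000/screen -H 'Content-Type: application/json' -d @payload.json`.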
Best Practices and Considerations
Data Quality Management
Always validate your data inputs:
def validate_stock_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Validate and clean stock data
Args:
df: Raw stock data
Returns:
Cleaned DataFrame
"""
    # Remove stocks with insufficient data (70% of fields must be present).
    # Build a boolean mask first: dropping rows while iterating over them
    # mutates the frame mid-loop and can silently skip rows.
    min_data_threshold = 0.7
    complete_enough = df.notna().mean(axis=1) >= min_data_threshold
    for symbol in df.loc[~complete_enough, 'symbol']:
        self.logger.warning(f"Removed {symbol} due to insufficient data")
    df = df[complete_enough].copy()
# Handle outliers
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
# Define outlier bounds
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
# Cap outliers instead of removing
df[col] = df[col].clip(lower=lower_bound, upper=upper_bound)
return df.reset_index(drop=True)
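The 70% completeness rule can be checked on toy data in a few lines (hypothetical rows; same vectorized mask idea as above):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "symbol": ["AAPL", "ZZZZ"],
    "pe_ratio": [28.0, np.nan],
    "roe": [1.5, np.nan],
    "beta": [1.2, np.nan],
})
# Fraction of non-null fields per row; keep rows at or above 70%
complete = df.notna().mean(axis=1) >= 0.7
print(df[complete]["symbol"].tolist())  # ['AAPL']
```

The fully populated row survives while the mostly-empty ticker is filtered out.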
Error Handling and Monitoring
import logging
from functools import wraps
def retry_on_failure(max_retries: int = 3, delay: float = 1.0):
"""
Decorator for retrying failed operations
Args:
max_retries: Maximum number of retry attempts
delay: Delay between retries in seconds
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise e
logging.warning(f"Attempt {attempt + 1} failed for {func.__name__}: {str(e)}")
time.sleep(delay * (2 ** attempt)) # Exponential backoff
return wrapper
return decorator
# Apply to critical methods
@retry_on_failure(max_retries=3)
def fetch_stock_data_with_retry(self, symbols: List[str]) -> pd.DataFrame:
"""Fetch stock data with automatic retry"""
return self.fetch_stock_data(symbols)
Performance Monitoring
import time
import logging
import psutil  # third-party dependency: pip install psutil
from contextlib import contextmanager
@contextmanager
def performance_monitor(operation_name: str):
"""
Context manager for monitoring operation performance
Args:
operation_name: Name of the operation being monitored
"""
start_time = time.time()
start_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
try:
yield
finally:
end_time = time.time()
end_memory = psutil.Process().memory_info().rss / 1024 / 1024 # MB
duration = end_time - start_time
memory_usage = end_memory - start_memory
logging.info(f"{operation_name} completed in {duration:.2f}s, "
f"memory delta: {memory_usage:.2f}MB")
# Usage example
def run_full_screening(self, symbols: List[str]) -> pd.DataFrame:
"""Run complete screening with performance monitoring"""
with performance_monitor("Stock Data Fetch"):
stock_data = self.fetch_stock_data(symbols)
with performance_monitor("Factor Analysis"):
factor_data = self.calculate_factor_scores(stock_data)
with performance_monitor("ESG Analysis"):
esg_data = self.analyze_esg_factors(factor_data)
with performance_monitor("Recommendation Generation"):
recommendations = self.generate_recommendations(esg_data)
return recommendations
Advanced ESG Integration
Custom ESG Metrics
def calculate_custom_esg_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Calculate custom ESG metrics based on quantitative data
Args:
df: DataFrame with stock data
Returns:
DataFrame with custom ESG metrics
"""
df_custom = df.copy()
# Environmental metrics
df_custom['carbon_intensity'] = self._get_carbon_intensity(df_custom['symbol'])
df_custom['renewable_energy_usage'] = self._get_renewable_energy_data(df_custom['symbol'])
# Social metrics
df_custom['employee_satisfaction'] = self._get_employee_ratings(df_custom['symbol'])
df_custom['diversity_score'] = self._calculate_diversity_score(df_custom['symbol'])
# Governance metrics
df_custom['board_independence'] = self._get_board_independence(df_custom['symbol'])
df_custom['executive_pay_ratio'] = self._get_pay_ratio(df_custom['symbol'])
return df_custom
def _get_carbon_intensity(self, symbol: str) -> float:
"""
Get carbon intensity data from external API
Args:
symbol: Stock symbol
Returns:
Carbon intensity score
"""
    # Example integration with a sustainability data provider such as
    # Sustainalytics, MSCI ESG, or Bloomberg ESG (the URL and key below
    # are placeholders; requires `import requests` at module level)
    try:
        api_url = f"https://api.sustainability-provider.com/carbon/{symbol}"
        response = requests.get(
            api_url,
            headers={'Authorization': 'Bearer YOUR_API_KEY'},
            timeout=10  # don't let one slow API call stall the screening run
        )
        if response.status_code == 200:
            data = response.json()
            return data.get('carbon_intensity', 5.0)
except Exception as e:
self.logger.error(f"Failed to get carbon data for {symbol}: {str(e)}")
return 5.0 # Default middle score
ESG Risk Assessment
def assess_esg_risks(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Assess ESG-related investment risks
Args:
df: DataFrame with ESG scores
Returns:
DataFrame with risk assessments
"""
df_risk = df.copy()
# Define risk thresholds
risk_thresholds = {
'environmental': {'high': 3.0, 'medium': 6.0},
'social': {'high': 3.0, 'medium': 6.0},
'governance': {'high': 3.0, 'medium': 6.0}
}
# Calculate risk levels
for category in ['environmental', 'social', 'governance']:
score_col = f'{category}_score'
risk_col = f'{category}_risk'
df_risk[risk_col] = df_risk[score_col].apply(
lambda x: self._categorize_risk(x, risk_thresholds[category])
)
# Overall ESG risk
df_risk['overall_esg_risk'] = df_risk.apply(
lambda row: self._calculate_overall_risk(
row['environmental_risk'],
row['social_risk'],
row['governance_risk']
), axis=1
)
return df_risk
def _categorize_risk(self, score: float, thresholds: dict) -> str:
"""Categorize ESG risk level"""
if score <= thresholds['high']:
return 'HIGH'
elif score <= thresholds['medium']:
return 'MEDIUM'
else:
return 'LOW'
def _calculate_overall_risk(self, env_risk: str, social_risk: str, gov_risk: str) -> str:
"""Calculate overall ESG risk"""
risk_weights = {'HIGH': 3, 'MEDIUM': 2, 'LOW': 1}
total_risk = (risk_weights[env_risk] + risk_weights[social_risk] + risk_weights[gov_risk]) / 3
if total_risk >= 2.5:
return 'HIGH'
elif total_risk >= 1.5:
return 'MEDIUM'
else:
return 'LOW'
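The threshold logic in `_categorize_risk` is easy to sanity-check standalone, using the same cutoffs as `risk_thresholds` above:

```python
def categorize_risk(score: float, high: float = 3.0, medium: float = 6.0) -> str:
    """Map an ESG sub-score (1-10) to a risk bucket; low scores mean high risk."""
    if score <= high:
        return 'HIGH'
    if score <= medium:
        return 'MEDIUM'
    return 'LOW'

print([categorize_risk(s) for s in (2.0, 5.0, 8.5)])  # ['HIGH', 'MEDIUM', 'LOW']
```

Note the boundaries are inclusive: a score of exactly 3.0 still lands in the HIGH bucket.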
Creating Interactive Dashboards
Plotly Dashboard
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
def create_interactive_dashboard(self, recommendations: pd.DataFrame):
"""
Create interactive dashboard for investment analysis
Args:
recommendations: DataFrame with recommendations
"""
# Create subplots
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('Factor vs ESG Scores', 'Sector Distribution',
'Risk-Return Profile', 'ESG Breakdown'),
specs=[[{"secondary_y": False}, {"type": "pie"}],
[{"secondary_y": True}, {"type": "bar"}]]
)
# 1. Factor vs ESG scatter plot
fig.add_trace(
go.Scatter(
x=recommendations['factor_score'],
y=recommendations['esg_score'],
mode='markers+text',
text=recommendations['symbol'],
textposition="top center",
marker=dict(
size=recommendations['composite_score'] * 50,
color=recommendations['composite_score'],
colorscale='Viridis',
showscale=True
),
name='Stocks'
),
row=1, col=1
)
# 2. Sector distribution pie chart
sector_counts = recommendations['sector'].value_counts()
fig.add_trace(
go.Pie(
labels=sector_counts.index,
values=sector_counts.values,
name="Sectors"
),
row=1, col=2
)
# 3. Risk-return profile
fig.add_trace(
go.Scatter(
x=recommendations['beta'],
y=recommendations['revenue_growth'],
mode='markers+text',
text=recommendations['symbol'],
textposition="top center",
marker=dict(
size=10,
color=recommendations['esg_score'],
colorscale='RdYlGn',
showscale=True
),
name='Risk-Return'
),
row=2, col=1
)
# 4. ESG breakdown
esg_avg = recommendations[['environmental_score', 'social_score', 'governance_score']].mean()
fig.add_trace(
go.Bar(
x=['Environmental', 'Social', 'Governance'],
y=esg_avg.values,
marker_color=['green', 'blue', 'orange'],
name='ESG Scores'
),
row=2, col=2
)
# Update layout
fig.update_layout(
title_text="Investment Screening Dashboard",
showlegend=True,
height=800
)
# Update axes labels
fig.update_xaxes(title_text="Factor Score", row=1, col=1)
fig.update_yaxes(title_text="ESG Score", row=1, col=1)
fig.update_xaxes(title_text="Beta (Risk)", row=2, col=1)
fig.update_yaxes(title_text="Revenue Growth", row=2, col=1)
fig.update_yaxes(title_text="Average Score", row=2, col=2)
return fig
# Usage
dashboard = screener.create_interactive_dashboard(recommendations)
dashboard.show()
Real-Time Portfolio Tracking
import ollama
import yfinance as yf
import pandas as pd
from datetime import datetime

class PortfolioTracker:
    """
    Track performance of screened investments
    """
    def __init__(self, recommendations: pd.DataFrame):
        self.portfolio = recommendations.copy()
        self.client = ollama.Client()  # used by generate_performance_report
        self.initial_prices = self._get_current_prices()
        self.purchase_date = datetime.now()
def _get_current_prices(self) -> dict:
"""Get current prices for portfolio stocks"""
prices = {}
for symbol in self.portfolio['symbol']:
try:
ticker = yf.Ticker(symbol)
prices[symbol] = ticker.history(period='1d')['Close'].iloc[-1]
            except Exception:
                # Fall back to the price captured when the stock was screened
                prices[symbol] = self.portfolio[self.portfolio['symbol'] == symbol]['current_price'].iloc[0]
return prices
def update_performance(self) -> pd.DataFrame:
"""Update portfolio performance"""
current_prices = self._get_current_prices()
self.portfolio['current_price'] = self.portfolio['symbol'].map(current_prices)
self.portfolio['initial_price'] = self.portfolio['symbol'].map(self.initial_prices)
# Calculate returns
self.portfolio['return_pct'] = (
(self.portfolio['current_price'] - self.portfolio['initial_price']) /
self.portfolio['initial_price']
)
        # Calculate days held (floor at 1 so day-zero runs don't divide by zero)
        self.portfolio['days_held'] = max((datetime.now() - self.purchase_date).days, 1)
        # Simple linear annualization of the holding-period return
        self.portfolio['annualized_return'] = (
            self.portfolio['return_pct'] * 365 / self.portfolio['days_held']
        )
return self.portfolio[['symbol', 'return_pct', 'annualized_return', 'composite_score']]
def generate_performance_report(self) -> str:
"""Generate performance report using Ollama"""
performance_data = self.update_performance()
prompt = f"""
Generate a portfolio performance report based on the following data:
Portfolio Performance:
{performance_data.to_string()}
Analysis Period: {self.purchase_date.strftime('%Y-%m-%d')} to {datetime.now().strftime('%Y-%m-%d')}
Please provide:
1. Overall portfolio performance summary
2. Best and worst performing stocks
3. ESG impact on performance
4. Recommendations for portfolio adjustments
Keep the report concise and actionable.
"""
response = self.client.generate(
model="llama3.1:8b",
prompt=prompt,
options={'temperature': 0.3}
)
return response['response']
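The annualization in `update_performance` is a simple linear scaling, easy to sanity-check with toy numbers (it deliberately ignores compounding, which overstates short-horizon results):

```python
# A 2% holding-period gain over 30 days, scaled linearly to a year
return_pct = 0.02
days_held = 30
annualized = return_pct * 365 / days_held
print(round(annualized, 4))  # 0.2433
```

For long-lived portfolios you might prefer the geometric form `(1 + return_pct) ** (365 / days_held) - 1`, which accounts for compounding.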
Troubleshooting Common Issues
Memory Management
import gc
import psutil
def optimize_memory_usage(self):
"""
Optimize memory usage during screening
"""
# Clear unnecessary variables
gc.collect()
# Monitor memory usage
memory_usage = psutil.Process().memory_info().rss / 1024 / 1024 # MB
if memory_usage > 1000: # 1GB threshold
self.logger.warning(f"High memory usage: {memory_usage:.2f}MB")
# Implement memory-saving strategies
# Process in smaller batches
# Use data types optimization
# Clear intermediate results
API Rate Limiting
import time
from functools import wraps
def rate_limit(calls_per_minute: int = 60):
"""
Rate limiting decorator
Args:
calls_per_minute: Maximum calls per minute
"""
min_interval = 60.0 / calls_per_minute
last_called = [0.0]
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_called[0]
left_to_wait = min_interval - elapsed
if left_to_wait > 0:
time.sleep(left_to_wait)
ret = func(*args, **kwargs)
last_called[0] = time.time()
return ret
return wrapper
return decorator
# Apply to data fetching methods
@rate_limit(calls_per_minute=30)
def fetch_stock_data_rate_limited(self, symbols: List[str]) -> pd.DataFrame:
"""Fetch stock data with rate limiting"""
return self.fetch_stock_data(symbols)
Conclusion
Building an investment screener with Ollama transforms traditional stock analysis by integrating AI-powered ESG evaluation with quantitative factor analysis. This approach provides several key advantages:
Enhanced Decision Making: AI analysis of ESG factors provides nuanced insights that pure quantitative metrics might miss. The combination of traditional financial metrics with sustainability considerations creates a more comprehensive investment framework.
Scalability: Your screener can analyze hundreds of stocks simultaneously, something that would take analysts weeks to complete manually. The automated ESG analysis ensures consistent evaluation criteria across all investments.
Customization: The flexible architecture allows you to adjust factor weights, ESG criteria, and screening parameters based on your investment philosophy or client requirements.
Real-time Insights: With proper deployment, your screener can provide continuous monitoring and alerts for portfolio management and new investment opportunities.
The investment screener you've built combines the analytical power of AI with proven financial analysis techniques, creating a tool that's both sophisticated and practical for modern portfolio management.
Next Steps
- Expand Data Sources: Integrate additional ESG data providers and alternative datasets
- Implement Machine Learning: Add predictive models for future performance estimation
- Build Portfolio Optimization: Create position sizing and risk management features
- Add Backtesting: Test strategies against historical data
- Create Mobile App: Develop mobile interface for on-the-go screening
Your investment screener with Ollama represents the future of AI-assisted investment research, where traditional analysis meets modern sustainability considerations for better long-term outcomes.
Ready to revolutionize your investment process? Start screening with AI-powered ESG analysis today.