How to Create an Investment Screener with Ollama: ESG and Factor Analysis

Build a powerful investment screener using Ollama for ESG and factor analysis. Complete guide with code examples and practical implementation.

Equity analysts can spend much of their working week screening investments by hand. What if you could automate that process and layer in AI-powered ESG insights in an afternoon?

This guide shows you how to build a comprehensive investment screener with Ollama that combines traditional factor analysis with modern ESG (Environmental, Social, Governance) evaluation. You'll create a tool that screens lists of stocks at scale, analyzes sustainability metrics, and generates investment recommendations automatically.

What You'll Learn

  • Set up Ollama for financial data processing
  • Build automated ESG scoring systems
  • Implement multi-factor investment screening
  • Create custom ranking algorithms
  • Deploy your screener with real-time updates

Prerequisites and Setup

System Requirements

Before building your investment screener, ensure you have:

  • Python 3.8 or higher
  • 8GB RAM minimum (16GB recommended)
  • Ollama installed locally
  • API keys for financial data providers
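
Before installing anything, a quick sanity check is worth running. This sketch verifies the Python version and looks for the Ollama CLI on your PATH (the exact install locations vary by platform):

```python
import shutil
import sys

# Fail fast if the Python version is too old for the libraries used below
assert sys.version_info >= (3, 8), "Python 3.8+ is required"

# shutil.which returns None when the ollama CLI is not on PATH
print("ollama on PATH:", shutil.which("ollama") is not None)
```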

Installing Ollama

# Install Ollama (macOS/Linux)
curl -fsSL https://ollama.ai/install.sh | sh

# Pull the recommended model for financial analysis
ollama pull llama3.1:8b

# Verify installation
ollama --version

Python Dependencies

# requirements.txt
ollama==0.2.1
yfinance==0.2.18
pandas==2.0.3
numpy==1.24.3
requests==2.31.0
python-dotenv==1.0.0
matplotlib==3.7.1
seaborn==0.12.2

Install dependencies:

pip install -r requirements.txt

Understanding Investment Screening Fundamentals

What is Factor Analysis?

Factor analysis identifies underlying variables that explain stock performance patterns. Common factors include:

  • Value factors: Price-to-earnings, price-to-book ratios
  • Growth factors: Revenue growth, earnings growth rates
  • Quality factors: Return on equity, debt-to-equity ratios
  • Momentum factors: Price momentum, earnings revisions
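
To compare factors measured on different scales, the screener built below converts each raw metric into a percentile rank. A minimal pandas sketch with made-up P/E ratios shows the idea:

```python
import pandas as pd

# Hypothetical trailing P/E ratios (a value factor: lower is better)
pe = pd.Series({"AAA": 12.0, "BBB": 30.0, "CCC": 18.0, "DDD": 45.0})

# Rank descending so the cheapest stock earns the highest percentile score
value_score = pe.rank(pct=True, ascending=False)

print(value_score)  # AAA (cheapest) scores 1.0, DDD (priciest) scores 0.25
```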

ESG Integration Benefits

ESG analysis adds sustainability considerations to traditional screening:

  • Environmental: Carbon footprint, resource efficiency
  • Social: Employee satisfaction, community impact
  • Governance: Board composition, executive compensation

Companies with strong ESG scores often show better long-term performance and lower risk profiles.
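
A composite ESG score is simply a weighted average of the three pillar scores. A toy example using the same default weights as the screener class below, with illustrative (made-up) pillar scores:

```python
# Pillar weights (the same defaults the screener class uses below)
esg_weights = {"environmental": 0.33, "social": 0.33, "governance": 0.34}

# Illustrative pillar scores on a 1-10 scale
pillar_scores = {"environmental": 8.0, "social": 6.5, "governance": 9.0}

esg_score = sum(pillar_scores[p] * esg_weights[p] for p in esg_weights)
print(round(esg_score, 3))  # 7.845
```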

Building the Core Investment Screener

Step 1: Create the Base Screener Class

# investment_screener.py
import ollama
import yfinance as yf
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple
import json
import logging

class InvestmentScreener:
    """
    AI-powered investment screener using Ollama for ESG and factor analysis
    """
    
    def __init__(self, model_name: str = "llama3.1:8b"):
        self.model_name = model_name
        self.client = ollama.Client()
        self.logger = self._setup_logging()
        
        # Initialize screening criteria
        self.factor_weights = {
            'value': 0.25,
            'growth': 0.25,
            'quality': 0.25,
            'momentum': 0.25
        }
        
        self.esg_weights = {
            'environmental': 0.33,
            'social': 0.33,
            'governance': 0.34
        }
    
    def _setup_logging(self) -> logging.Logger:
        """Configure logging for screening operations"""
        logger = logging.getLogger('investment_screener')
        logger.setLevel(logging.INFO)
        
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        
        return logger

Step 2: Implement Data Collection

    def fetch_stock_data(self, symbols: List[str]) -> pd.DataFrame:
        """
        Fetch comprehensive stock data for analysis
        
        Args:
            symbols: List of stock symbols to analyze
            
        Returns:
            DataFrame with stock metrics
        """
        stock_data = []
        
        for symbol in symbols:
            try:
                ticker = yf.Ticker(symbol)
                info = ticker.info
                
                # Get financial metrics
                stock_metrics = {
                    'symbol': symbol,
                    'company_name': info.get('longName', 'N/A'),
                    'sector': info.get('sector', 'N/A'),
                    'industry': info.get('industry', 'N/A'),
                    
                    # Value factors
                    'pe_ratio': info.get('trailingPE', np.nan),
                    'pb_ratio': info.get('priceToBook', np.nan),
                    'price_to_sales': info.get('priceToSalesTrailing12Months', np.nan),
                    
                    # Growth factors
                    'revenue_growth': info.get('revenueGrowth', np.nan),
                    'earnings_growth': info.get('earningsGrowth', np.nan),
                    
                    # Quality factors
                    'roe': info.get('returnOnEquity', np.nan),
                    'debt_to_equity': info.get('debtToEquity', np.nan),
                    'current_ratio': info.get('currentRatio', np.nan),
                    
                    # Momentum factors
                    'price_change_1y': info.get('52WeekChange', np.nan),
                    'beta': info.get('beta', np.nan),
                    
                    # Basic info
                    'market_cap': info.get('marketCap', np.nan),
                    'current_price': info.get('currentPrice', np.nan)
                }
                
                stock_data.append(stock_metrics)
                self.logger.info(f"Fetched data for {symbol}")
                
            except Exception as e:
                self.logger.error(f"Error fetching {symbol}: {str(e)}")
                continue
        
        return pd.DataFrame(stock_data)

Step 3: Implement Factor Analysis

    def calculate_factor_scores(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Calculate factor scores for each stock
        
        Args:
            df: DataFrame with stock data
            
        Returns:
            DataFrame with factor scores added
        """
        df_scored = df.copy()
        
        # Value Score (lower is better for ratios)
        df_scored['value_score'] = (
            self._rank_inverse(df_scored['pe_ratio']) * 0.4 +
            self._rank_inverse(df_scored['pb_ratio']) * 0.3 +
            self._rank_inverse(df_scored['price_to_sales']) * 0.3
        )
        
        # Growth Score (higher is better)
        df_scored['growth_score'] = (
            self._rank_normal(df_scored['revenue_growth']) * 0.6 +
            self._rank_normal(df_scored['earnings_growth']) * 0.4
        )
        
        # Quality Score (higher ROE, lower debt is better)
        df_scored['quality_score'] = (
            self._rank_normal(df_scored['roe']) * 0.4 +
            self._rank_inverse(df_scored['debt_to_equity']) * 0.3 +
            self._rank_normal(df_scored['current_ratio']) * 0.3
        )
        
        # Momentum Score (higher price change is better, beta closer to 1 is moderate)
        df_scored['momentum_score'] = (
            self._rank_normal(df_scored['price_change_1y']) * 0.7 +
            self._rank_beta(df_scored['beta']) * 0.3
        )
        
        # Calculate composite factor score
        df_scored['factor_score'] = (
            df_scored['value_score'] * self.factor_weights['value'] +
            df_scored['growth_score'] * self.factor_weights['growth'] +
            df_scored['quality_score'] * self.factor_weights['quality'] +
            df_scored['momentum_score'] * self.factor_weights['momentum']
        )
        
        return df_scored
    
    def _rank_normal(self, series: pd.Series) -> pd.Series:
        """Rank series where higher values are better (missing data ranks worst)"""
        # na_option='top' assigns NaNs the lowest rank, i.e. the worst score;
        # 'bottom' would hand stocks with missing data the best percentile
        return series.rank(pct=True, na_option='top')
    
    def _rank_inverse(self, series: pd.Series) -> pd.Series:
        """Rank series where lower values are better (missing data ranks worst)"""
        return series.rank(pct=True, ascending=False, na_option='top')
    
    def _rank_beta(self, series: pd.Series) -> pd.Series:
        """Rank beta where values closer to 1 are better"""
        return 1 - abs(series - 1).rank(pct=True, na_option='bottom')

Implementing AI-Powered ESG Analysis

Step 4: Create ESG Scoring with Ollama

    def analyze_esg_factors(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Use Ollama to analyze ESG factors for each stock
        
        Args:
            df: DataFrame with stock data
            
        Returns:
            DataFrame with ESG scores added
        """
        df_esg = df.copy()
        esg_scores = []
        
        for _, row in df_esg.iterrows():
            try:
                esg_score = self._get_esg_analysis(
                    row['symbol'], 
                    row['company_name'], 
                    row['sector']
                )
                esg_scores.append(esg_score)
                
            except Exception as e:
                self.logger.error(f"ESG analysis failed for {row['symbol']}: {str(e)}")
                esg_scores.append(self._default_esg_score())
        
        # Add ESG scores to dataframe (align on the original index so concat
        # doesn't misplace rows when df_esg has a non-default index)
        esg_df = pd.DataFrame(esg_scores, index=df_esg.index)
        df_esg = pd.concat([df_esg, esg_df], axis=1)
        
        # Calculate composite ESG score
        df_esg['esg_score'] = (
            df_esg['environmental_score'] * self.esg_weights['environmental'] +
            df_esg['social_score'] * self.esg_weights['social'] +
            df_esg['governance_score'] * self.esg_weights['governance']
        )
        
        return df_esg
    
    def _get_esg_analysis(self, symbol: str, company_name: str, sector: str) -> Dict:
        """
        Get ESG analysis from Ollama
        
        Args:
            symbol: Stock symbol
            company_name: Company name
            sector: Company sector
            
        Returns:
            Dictionary with ESG scores
        """
        prompt = f"""
        Analyze the ESG (Environmental, Social, Governance) factors for {company_name} ({symbol}) in the {sector} sector.
        
        Please provide scores from 1-10 for each category and explain your reasoning:
        
        Environmental (1-10):
        - Carbon footprint and emissions
        - Resource efficiency
        - Waste management
        - Climate change initiatives
        
        Social (1-10):
        - Employee relations and diversity
        - Community impact
        - Product safety and quality
        - Human rights practices
        
        Governance (1-10):
        - Board composition and independence
        - Executive compensation
        - Shareholder rights
        - Ethics and transparency
        
        Format your response as JSON:
        {{
            "environmental_score": <score>,
            "social_score": <score>,
            "governance_score": <score>,
            "environmental_reasoning": "<explanation>",
            "social_reasoning": "<explanation>",
            "governance_reasoning": "<explanation>"
        }}
        """
        
        response = self.client.generate(
            model=self.model_name,
            prompt=prompt,
            options={'temperature': 0.3}
        )
        
        try:
            # Parse JSON response
            esg_data = json.loads(response['response'])
            return esg_data
            
        except json.JSONDecodeError:
            self.logger.error(f"Failed to parse ESG JSON for {symbol}")
            return self._default_esg_score()
    
    def _default_esg_score(self) -> Dict:
        """Return default ESG scores when analysis fails"""
        return {
            'environmental_score': 5.0,
            'social_score': 5.0,
            'governance_score': 5.0,
            'environmental_reasoning': 'Analysis unavailable',
            'social_reasoning': 'Analysis unavailable',
            'governance_reasoning': 'Analysis unavailable'
        }
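
Local models don't always return bare JSON; they often wrap it in explanatory prose, which makes a plain `json.loads` on the raw response brittle. A defensive parsing sketch (a hypothetical helper, not part of the class above) that pulls the brace-delimited span out of the response before parsing:

```python
import json
import re
from typing import Optional

def extract_json(text: str) -> Optional[dict]:
    """Extract the outermost {...} span from text and parse it, or return None."""
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        return None
    try:
        return json.loads(match.group(0))
    except json.JSONDecodeError:
        return None

raw = 'Here is my analysis:\n{"environmental_score": 7, "social_score": 6}\nHope that helps!'
print(extract_json(raw))  # {'environmental_score': 7, 'social_score': 6}
```

You could call this inside `_get_esg_analysis` in place of the bare `json.loads`, falling back to `_default_esg_score()` when it returns None.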

Step 5: Generate Investment Recommendations

    def generate_recommendations(self, df: pd.DataFrame, top_n: int = 10) -> pd.DataFrame:
        """
        Generate final investment recommendations
        
        Args:
            df: DataFrame with factor and ESG scores
            top_n: Number of top recommendations to return
            
        Returns:
            DataFrame with ranked recommendations
        """
        df_recommendations = df.copy()
        
        # Calculate composite score (factor + ESG)
        # Factor scores are percentile ranks in [0, 1], while ESG scores are on
        # a 1-10 scale, so normalize ESG to [0, 1] before blending
        df_recommendations['composite_score'] = (
            df_recommendations['factor_score'] * 0.6 +
            (df_recommendations['esg_score'] / 10.0) * 0.4
        )
        
        # Rank stocks by composite score
        df_recommendations['rank'] = df_recommendations['composite_score'].rank(
            ascending=False, na_option='bottom'
        )
        
        # Get top recommendations
        top_recommendations = df_recommendations.nsmallest(top_n, 'rank')
        
        # Generate AI-powered investment thesis for each
        recommendations_with_thesis = []
        
        for _, row in top_recommendations.iterrows():
            thesis = self._generate_investment_thesis(row)
            row_dict = row.to_dict()
            row_dict['investment_thesis'] = thesis
            recommendations_with_thesis.append(row_dict)
        
        return pd.DataFrame(recommendations_with_thesis)
    
    def _generate_investment_thesis(self, stock_data: pd.Series) -> str:
        """
        Generate investment thesis using Ollama
        
        Args:
            stock_data: Series with stock metrics
            
        Returns:
            Investment thesis string
        """
        prompt = f"""
        Based on the following stock analysis, write a concise investment thesis (2-3 sentences):
        
        Company: {stock_data['company_name']} ({stock_data['symbol']})
        Sector: {stock_data['sector']}
        
        Factor Analysis:
        - Value Score: {stock_data['value_score']:.2f}
        - Growth Score: {stock_data['growth_score']:.2f}
        - Quality Score: {stock_data['quality_score']:.2f}
        - Momentum Score: {stock_data['momentum_score']:.2f}
        
        ESG Analysis:
        - Environmental Score: {stock_data['environmental_score']:.1f}
        - Social Score: {stock_data['social_score']:.1f}
        - Governance Score: {stock_data['governance_score']:.1f}
        
        Key Metrics:
        - P/E Ratio: {stock_data['pe_ratio']:.2f}
        - Revenue Growth: {stock_data['revenue_growth']:.2%}
        - ROE: {stock_data['roe']:.2%}
        
        Write a brief investment thesis focusing on the strongest factors and ESG considerations.
        """
        
        response = self.client.generate(
            model=self.model_name,
            prompt=prompt,
            options={'temperature': 0.5, 'num_predict': 200}
        )
        
        return response['response'].strip()

Running Your Investment Screener

Step 6: Main Execution Script

# main.py
from investment_screener import InvestmentScreener
import pandas as pd

def main():
    # Initialize screener
    screener = InvestmentScreener()
    
    # Define universe of stocks to screen
    # Example: S&P 500 subset
    stock_symbols = [
        'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA',
        'META', 'NVDA', 'JPM', 'JNJ', 'PG',
        'UNH', 'V', 'HD', 'MA', 'DIS'
    ]
    
    print("🔍 Starting investment screening process...")
    
    # Step 1: Fetch stock data
    print("📊 Fetching stock data...")
    stock_data = screener.fetch_stock_data(stock_symbols)
    print(f"✅ Fetched data for {len(stock_data)} stocks")
    
    # Step 2: Calculate factor scores
    print("🧮 Calculating factor scores...")
    factor_data = screener.calculate_factor_scores(stock_data)
    print("✅ Factor analysis complete")
    
    # Step 3: Analyze ESG factors
    print("🌱 Analyzing ESG factors with AI...")
    esg_data = screener.analyze_esg_factors(factor_data)
    print("✅ ESG analysis complete")
    
    # Step 4: Generate recommendations
    print("💡 Generating investment recommendations...")
    recommendations = screener.generate_recommendations(esg_data, top_n=5)
    print("✅ Recommendations generated")
    
    # Display results
    print("\n📈 TOP INVESTMENT RECOMMENDATIONS:")
    print("=" * 50)
    
    for _, rec in recommendations.iterrows():
        print(f"\n{int(rec['rank'])}. {rec['company_name']} ({rec['symbol']})")
        print(f"   Sector: {rec['sector']}")
        print(f"   Composite Score: {rec['composite_score']:.3f}")
        print(f"   Factor Score: {rec['factor_score']:.3f}")
        print(f"   ESG Score: {rec['esg_score']:.3f}")
        print(f"   Investment Thesis: {rec['investment_thesis']}")
    
    # Save results
    recommendations.to_csv('investment_recommendations.csv', index=False)
    print(f"\n💾 Results saved to 'investment_recommendations.csv'")

if __name__ == "__main__":
    main()

Expected Output

🔍 Starting investment screening process...
📊 Fetching stock data...
✅ Fetched data for 15 stocks
🧮 Calculating factor scores...
✅ Factor analysis complete
🌱 Analyzing ESG factors with AI...
✅ ESG analysis complete
💡 Generating investment recommendations...
✅ Recommendations generated

📈 TOP INVESTMENT RECOMMENDATIONS:
==================================================

1. Microsoft Corporation (MSFT)
   Sector: Technology
   Composite Score: 0.819
   Factor Score: 0.785
   ESG Score: 8.7
   Investment Thesis: Microsoft demonstrates strong fundamentals with excellent growth prospects in cloud computing and AI, while maintaining industry-leading governance standards and sustainable business practices.

2. Apple Inc. (AAPL)
   Sector: Technology
   Composite Score: 0.762
   Factor Score: 0.723
   ESG Score: 8.2
   Investment Thesis: Apple combines solid value metrics with consistent innovation, supported by strong environmental initiatives and robust corporate governance, making it an attractive long-term investment.

Advanced Features and Customization

Custom Factor Weights

# Customize factor importance based on investment strategy
screener.factor_weights = {
    'value': 0.40,      # Higher weight for value investing
    'growth': 0.20,
    'quality': 0.30,
    'momentum': 0.10
}

# Adjust ESG importance
screener.esg_weights = {
    'environmental': 0.50,  # Higher environmental focus
    'social': 0.25,
    'governance': 0.25
}
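
When hand-tuning weights like this, it's easy to end up with values that don't sum to 1.0, which silently rescales every composite score. A small normalization helper (hypothetical, not part of the screener class) guards against that:

```python
def normalize_weights(weights: dict) -> dict:
    """Rescale a weight dict so the values sum to 1.0."""
    total = sum(weights.values())
    if total <= 0:
        raise ValueError("weights must have a positive sum")
    return {name: w / total for name, w in weights.items()}

# 0.5 + 0.3 + 0.4 + 0.2 = 1.4, so every weight is divided by 1.4
weights = normalize_weights({'value': 0.5, 'growth': 0.3, 'quality': 0.4, 'momentum': 0.2})
print(round(sum(weights.values()), 10))  # 1.0
```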

Sector-Specific Screening

def screen_by_sector(self, sector: str, symbols: List[str]) -> pd.DataFrame:
    """
    Screen stocks within a specific sector
    
    Args:
        sector: Target sector (e.g., 'Technology', 'Healthcare')
        symbols: List of symbols to screen
        
    Returns:
        Sector-specific recommendations
    """
    # Adjust factor weights for the sector
    # (_get_sector_weights is assumed to return a weight dict tuned per sector;
    # implement it with your own sector presets)
    sector_weights = self._get_sector_weights(sector)
    original_weights = self.factor_weights.copy()
    
    self.factor_weights = sector_weights
    
    # Run screening
    stock_data = self.fetch_stock_data(symbols)
    sector_stocks = stock_data[stock_data['sector'] == sector]
    
    if len(sector_stocks) == 0:
        return pd.DataFrame()
    
    factor_data = self.calculate_factor_scores(sector_stocks)
    esg_data = self.analyze_esg_factors(factor_data)
    recommendations = self.generate_recommendations(esg_data)
    
    # Restore original weights
    self.factor_weights = original_weights
    
    return recommendations

Real-Time Data Integration

import schedule
import time

def schedule_screening():
    """Schedule regular screening runs"""
    
    def run_screening():
        screener = InvestmentScreener()
        # Your screening logic here
        print(f"Screening completed at {time.strftime('%Y-%m-%d %H:%M:%S')}")
    
    # Schedule screening every day at 9 AM
    schedule.every().day.at("09:00").do(run_screening)
    
    while True:
        schedule.run_pending()
        time.sleep(60)

Performance Optimization Tips

Caching ESG Results

import pickle
import os
from datetime import datetime, timedelta
from typing import Dict, Optional

class CachedInvestmentScreener(InvestmentScreener):
    def __init__(self, cache_dir: str = "esg_cache", cache_days: int = 7):
        super().__init__()
        self.cache_dir = cache_dir
        self.cache_days = cache_days
        os.makedirs(cache_dir, exist_ok=True)
    
    def _get_cached_esg(self, symbol: str) -> Optional[Dict]:
        """Get ESG data from cache if available and fresh"""
        cache_file = os.path.join(self.cache_dir, f"{symbol}_esg.pkl")
        
        if os.path.exists(cache_file):
            # Check if cache is still valid
            file_time = datetime.fromtimestamp(os.path.getmtime(cache_file))
            if datetime.now() - file_time < timedelta(days=self.cache_days):
                with open(cache_file, 'rb') as f:
                    return pickle.load(f)
        
        return None
    
    def _cache_esg(self, symbol: str, esg_data: Dict):
        """Cache ESG data for future use"""
        cache_file = os.path.join(self.cache_dir, f"{symbol}_esg.pkl")
        with open(cache_file, 'wb') as f:
            pickle.dump(esg_data, f)

Batch Processing

import time

def batch_process_stocks(self, symbols: List[str], batch_size: int = 5) -> pd.DataFrame:
    """
    Process stocks in batches to avoid rate limits
    
    Args:
        symbols: List of stock symbols
        batch_size: Number of stocks to process at once
        
    Returns:
        Combined results DataFrame
    """
    all_results = []
    total_batches = (len(symbols) + batch_size - 1) // batch_size  # ceiling division
    
    for i in range(0, len(symbols), batch_size):
        batch = symbols[i:i + batch_size]
        
        print(f"Processing batch {i//batch_size + 1}/{total_batches}")
        
        try:
            batch_data = self.fetch_stock_data(batch)
            batch_factor = self.calculate_factor_scores(batch_data)
            batch_esg = self.analyze_esg_factors(batch_factor)
            
            all_results.append(batch_esg)
            
            # Brief pause between batches
            time.sleep(1)
            
        except Exception as e:
            self.logger.error(f"Batch processing error: {str(e)}")
            continue
    
    return pd.concat(all_results, ignore_index=True)

Deployment Options

Web Interface with Streamlit

# app.py
import streamlit as st
from investment_screener import InvestmentScreener
import plotly.express as px

def main():
    st.title("🚀 AI Investment Screener")
    st.write("Powered by Ollama for ESG and Factor Analysis")
    
    # Sidebar controls
    st.sidebar.header("Screening Parameters")
    
    # Stock selection
    symbols_input = st.sidebar.text_area(
        "Enter stock symbols (comma-separated)",
        "AAPL,MSFT,GOOGL,AMZN,TSLA"
    )
    
    symbols = [s.strip().upper() for s in symbols_input.split(",")]
    
    # Factor weights
    st.sidebar.subheader("Factor Weights")
    value_weight = st.sidebar.slider("Value", 0.0, 1.0, 0.25)
    growth_weight = st.sidebar.slider("Growth", 0.0, 1.0, 0.25)
    quality_weight = st.sidebar.slider("Quality", 0.0, 1.0, 0.25)
    momentum_weight = st.sidebar.slider("Momentum", 0.0, 1.0, 0.25)
    
    # ESG weights
    st.sidebar.subheader("ESG Weights")
    env_weight = st.sidebar.slider("Environmental", 0.0, 1.0, 0.33)
    social_weight = st.sidebar.slider("Social", 0.0, 1.0, 0.33)
    gov_weight = st.sidebar.slider("Governance", 0.0, 1.0, 0.34)
    
    if st.sidebar.button("🔍 Run Screening"):
        with st.spinner("Running analysis..."):
            screener = InvestmentScreener()
            
            # Update weights
            screener.factor_weights = {
                'value': value_weight,
                'growth': growth_weight,
                'quality': quality_weight,
                'momentum': momentum_weight
            }
            
            screener.esg_weights = {
                'environmental': env_weight,
                'social': social_weight,
                'governance': gov_weight
            }
            
            # Run screening
            stock_data = screener.fetch_stock_data(symbols)
            factor_data = screener.calculate_factor_scores(stock_data)
            esg_data = screener.analyze_esg_factors(factor_data)
            recommendations = screener.generate_recommendations(esg_data, top_n=5)
            
            # Display results
            st.header("📈 Investment Recommendations")
            
            # Top recommendations table
            st.subheader("Top 5 Recommendations")
            display_cols = ['rank', 'symbol', 'company_name', 'sector', 
                          'composite_score', 'factor_score', 'esg_score']
            st.dataframe(recommendations[display_cols])
            
            # Visualization
            st.subheader("📊 Score Comparison")
            fig = px.scatter(
                recommendations, 
                x='factor_score', 
                y='esg_score',
                size='composite_score',
                hover_data=['symbol', 'company_name'],
                title="Factor vs ESG Scores"
            )
            st.plotly_chart(fig)
            
            # Investment thesis
            st.subheader("💡 Investment Thesis")
            for _, rec in recommendations.head(3).iterrows():
                st.write(f"**{rec['symbol']}**: {rec['investment_thesis']}")

if __name__ == "__main__":
    main()

Run with: streamlit run app.py

API Deployment with FastAPI

# api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from investment_screener import InvestmentScreener
import uvicorn

app = FastAPI(title="Investment Screener API", version="1.0.0")

class ScreeningRequest(BaseModel):
    symbols: List[str]
    factor_weights: Optional[dict] = None
    esg_weights: Optional[dict] = None
    top_n: int = 10

class ScreeningResponse(BaseModel):
    recommendations: List[dict]
    total_analyzed: int
    processing_time: float

@app.post("/screen", response_model=ScreeningResponse)
async def screen_investments(request: ScreeningRequest):
    """
    Screen investments with custom parameters
    """
    try:
        import time
        start_time = time.time()
        
        screener = InvestmentScreener()
        
        # Update weights if provided
        if request.factor_weights:
            screener.factor_weights = request.factor_weights
        
        if request.esg_weights:
            screener.esg_weights = request.esg_weights
        
        # Run screening
        stock_data = screener.fetch_stock_data(request.symbols)
        factor_data = screener.calculate_factor_scores(stock_data)
        esg_data = screener.analyze_esg_factors(factor_data)
        recommendations = screener.generate_recommendations(esg_data, request.top_n)
        
        processing_time = time.time() - start_time
        
        return ScreeningResponse(
            recommendations=recommendations.to_dict('records'),
            total_analyzed=len(stock_data),
            processing_time=processing_time
        )
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "service": "investment_screener"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
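
With the server running (`python api.py`), the endpoint can be exercised with curl; the payload fields match the `ScreeningRequest` model above:

```shell
curl -s -X POST http://localhost:8000/screen \
  -H "Content-Type: application/json" \
  -d '{"symbols": ["AAPL", "MSFT", "GOOGL"], "top_n": 3}'
```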

Best Practices and Considerations

Data Quality Management

Always validate your data inputs:

def validate_stock_data(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Validate and clean stock data
    
    Args:
        df: Raw stock data
        
    Returns:
        Cleaned DataFrame
    """
    # Remove stocks with insufficient data
    min_data_threshold = 0.7  # 70% of fields must be present
    
    # Build a keep-mask first; dropping rows while iterating over them is unsafe
    non_null_ratio = df.notna().sum(axis=1) / df.shape[1]
    for symbol in df.loc[non_null_ratio < min_data_threshold, 'symbol']:
        self.logger.warning(f"Removed {symbol} due to insufficient data")
    df = df[non_null_ratio >= min_data_threshold].copy()
    
    # Handle outliers
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    
    for col in numeric_cols:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        
        # Define outlier bounds
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        # Cap outliers instead of removing
        df[col] = df[col].clip(lower=lower_bound, upper=upper_bound)
    
    return df.reset_index(drop=True)

Error Handling and Monitoring

import logging
import time
from functools import wraps

def retry_on_failure(max_retries: int = 3, delay: float = 1.0):
    """
    Decorator for retrying failed operations
    
    Args:
        max_retries: Maximum number of retry attempts
        delay: Delay between retries in seconds
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise  # re-raise with the original traceback
                    
                    logging.warning(f"Attempt {attempt + 1} failed for {func.__name__}: {str(e)}")
                    time.sleep(delay * (2 ** attempt))  # Exponential backoff
            
        return wrapper
    return decorator

# Apply to critical methods
@retry_on_failure(max_retries=3)
def fetch_stock_data_with_retry(self, symbols: List[str]) -> pd.DataFrame:
    """Fetch stock data with automatic retry"""
    return self.fetch_stock_data(symbols)

Performance Monitoring

import time
import psutil  # third-party: pip install psutil
from contextlib import contextmanager

@contextmanager
def performance_monitor(operation_name: str):
    """
    Context manager for monitoring operation performance
    
    Args:
        operation_name: Name of the operation being monitored
    """
    start_time = time.time()
    start_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
    
    try:
        yield
    finally:
        end_time = time.time()
        end_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
        
        duration = end_time - start_time
        memory_usage = end_memory - start_memory
        
        logging.info(f"{operation_name} completed in {duration:.2f}s, "
                    f"memory delta: {memory_usage:.2f}MB")

# Usage example
def run_full_screening(self, symbols: List[str]) -> pd.DataFrame:
    """Run complete screening with performance monitoring"""
    
    with performance_monitor("Stock Data Fetch"):
        stock_data = self.fetch_stock_data(symbols)
    
    with performance_monitor("Factor Analysis"):
        factor_data = self.calculate_factor_scores(stock_data)
    
    with performance_monitor("ESG Analysis"):
        esg_data = self.analyze_esg_factors(factor_data)
    
    with performance_monitor("Recommendation Generation"):
        recommendations = self.generate_recommendations(esg_data)
    
    return recommendations

Advanced ESG Integration

Custom ESG Metrics

def calculate_custom_esg_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate custom ESG metrics based on quantitative data
    
    Args:
        df: DataFrame with stock data
        
    Returns:
        DataFrame with custom ESG metrics
    """
    df_custom = df.copy()
    
    # The helpers below expect a single symbol, so map them over the column
    # Environmental metrics
    df_custom['carbon_intensity'] = df_custom['symbol'].apply(self._get_carbon_intensity)
    df_custom['renewable_energy_usage'] = df_custom['symbol'].apply(self._get_renewable_energy_data)
    
    # Social metrics
    df_custom['employee_satisfaction'] = df_custom['symbol'].apply(self._get_employee_ratings)
    df_custom['diversity_score'] = df_custom['symbol'].apply(self._calculate_diversity_score)
    
    # Governance metrics
    df_custom['board_independence'] = df_custom['symbol'].apply(self._get_board_independence)
    df_custom['executive_pay_ratio'] = df_custom['symbol'].apply(self._get_pay_ratio)
    
    return df_custom

def _get_carbon_intensity(self, symbol: str) -> float:
    """
    Get carbon intensity data from external API
    
    Args:
        symbol: Stock symbol
        
    Returns:
        Carbon intensity score
    """
    # Example integration with sustainability API
    try:
        # This would integrate with actual sustainability data providers
        # like Sustainalytics, MSCI ESG, or Bloomberg ESG
        api_url = f"https://api.sustainability-provider.com/carbon/{symbol}"
        response = requests.get(api_url, headers={'Authorization': 'Bearer YOUR_API_KEY'}, timeout=10)
        
        if response.status_code == 200:
            data = response.json()
            return data.get('carbon_intensity', 5.0)
        
    except Exception as e:
        self.logger.error(f"Failed to get carbon data for {symbol}: {str(e)}")
    
    return 5.0  # Default middle score
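Sustainability providers typically rate-limit their APIs, and ESG data changes slowly, so caching per-symbol lookups avoids repeated network calls when the same symbol appears in several screens. A sketch using `functools.lru_cache` (the stubbed return value stands in for the real provider response; `CALL_COUNT` is demo instrumentation only):

```python
from functools import lru_cache

CALL_COUNT = {"n": 0}  # counts "network fetches" for the demo

@lru_cache(maxsize=1024)
def cached_carbon_intensity(symbol: str) -> float:
    """Fetch carbon intensity once per symbol; repeat lookups hit the cache.

    The network call is stubbed out here -- replace the body with the
    real provider request from _get_carbon_intensity.
    """
    CALL_COUNT["n"] += 1
    return 5.0  # placeholder standing in for the API response

# Three lookups for the same symbol trigger only one "fetch"
scores = [cached_carbon_intensity("AAPL") for _ in range(3)]
```

For long-running deployments you would also want a time-based expiry, which `lru_cache` does not provide on its own.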

ESG Risk Assessment

def assess_esg_risks(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Assess ESG-related investment risks
    
    Args:
        df: DataFrame with ESG scores
        
    Returns:
        DataFrame with risk assessments
    """
    df_risk = df.copy()
    
    # Define risk thresholds
    risk_thresholds = {
        'environmental': {'high': 3.0, 'medium': 6.0},
        'social': {'high': 3.0, 'medium': 6.0},
        'governance': {'high': 3.0, 'medium': 6.0}
    }
    
    # Calculate risk levels
    for category in ['environmental', 'social', 'governance']:
        score_col = f'{category}_score'
        risk_col = f'{category}_risk'
        
        df_risk[risk_col] = df_risk[score_col].apply(
            lambda x: self._categorize_risk(x, risk_thresholds[category])
        )
    
    # Overall ESG risk
    df_risk['overall_esg_risk'] = df_risk.apply(
        lambda row: self._calculate_overall_risk(
            row['environmental_risk'],
            row['social_risk'],
            row['governance_risk']
        ), axis=1
    )
    
    return df_risk

def _categorize_risk(self, score: float, thresholds: dict) -> str:
    """Categorize ESG risk level"""
    if score <= thresholds['high']:
        return 'HIGH'
    elif score <= thresholds['medium']:
        return 'MEDIUM'
    else:
        return 'LOW'

def _calculate_overall_risk(self, env_risk: str, social_risk: str, gov_risk: str) -> str:
    """Calculate overall ESG risk"""
    risk_weights = {'HIGH': 3, 'MEDIUM': 2, 'LOW': 1}
    
    total_risk = (risk_weights[env_risk] + risk_weights[social_risk] + risk_weights[gov_risk]) / 3
    
    if total_risk >= 2.5:
        return 'HIGH'
    elif total_risk >= 1.5:
        return 'MEDIUM'
    else:
        return 'LOW'
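To see how the thresholds interact, here are the two helpers as standalone functions with a worked example. Note the inversion: a *low* ESG score means *high* risk, which is why the comparisons use `<=`. The thresholds match those defined above; the sample scores are illustrative.

```python
def categorize_risk(score: float, thresholds: dict) -> str:
    """Low ESG scores mean high risk: at or below the 'high' cutoff -> HIGH."""
    if score <= thresholds['high']:
        return 'HIGH'
    elif score <= thresholds['medium']:
        return 'MEDIUM'
    return 'LOW'

def overall_risk(env: str, social: str, gov: str) -> str:
    """Average the three category risk weights, then re-bucket."""
    weights = {'HIGH': 3, 'MEDIUM': 2, 'LOW': 1}
    total = (weights[env] + weights[social] + weights[gov]) / 3
    if total >= 2.5:
        return 'HIGH'
    elif total >= 1.5:
        return 'MEDIUM'
    return 'LOW'

thresholds = {'high': 3.0, 'medium': 6.0}
# A company scoring 2.5 environmental, 7.0 social, 4.0 governance:
env = categorize_risk(2.5, thresholds)   # 2.5 <= 3.0        -> HIGH
soc = categorize_risk(7.0, thresholds)   # 7.0 > 6.0         -> LOW
gov = categorize_risk(4.0, thresholds)   # 3.0 < 4.0 <= 6.0  -> MEDIUM
combined = overall_risk(env, soc, gov)   # (3 + 1 + 2) / 3 = 2.0 -> MEDIUM
```

One weak category can therefore be offset by two strong ones; if you want a single HIGH category to dominate, replace the average with `max()`.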

Creating Interactive Dashboards

Plotly Dashboard

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px

def create_interactive_dashboard(self, recommendations: pd.DataFrame):
    """
    Create interactive dashboard for investment analysis
    
    Args:
        recommendations: DataFrame with recommendations
    """
    # Create subplots
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('Factor vs ESG Scores', 'Sector Distribution', 
                       'Risk-Return Profile', 'ESG Breakdown'),
        specs=[[{"secondary_y": False}, {"type": "pie"}],
               [{"secondary_y": True}, {"type": "bar"}]]
    )
    
    # 1. Factor vs ESG scatter plot
    fig.add_trace(
        go.Scatter(
            x=recommendations['factor_score'],
            y=recommendations['esg_score'],
            mode='markers+text',
            text=recommendations['symbol'],
            textposition="top center",
            marker=dict(
                size=recommendations['composite_score'] * 50,
                color=recommendations['composite_score'],
                colorscale='Viridis',
                showscale=True
            ),
            name='Stocks'
        ),
        row=1, col=1
    )
    
    # 2. Sector distribution pie chart
    sector_counts = recommendations['sector'].value_counts()
    fig.add_trace(
        go.Pie(
            labels=sector_counts.index,
            values=sector_counts.values,
            name="Sectors"
        ),
        row=1, col=2
    )
    
    # 3. Risk-return profile
    fig.add_trace(
        go.Scatter(
            x=recommendations['beta'],
            y=recommendations['revenue_growth'],
            mode='markers+text',
            text=recommendations['symbol'],
            textposition="top center",
            marker=dict(
                size=10,
                color=recommendations['esg_score'],
                colorscale='RdYlGn',
                showscale=True
            ),
            name='Risk-Return'
        ),
        row=2, col=1
    )
    
    # 4. ESG breakdown
    esg_avg = recommendations[['environmental_score', 'social_score', 'governance_score']].mean()
    fig.add_trace(
        go.Bar(
            x=['Environmental', 'Social', 'Governance'],
            y=esg_avg.values,
            marker_color=['green', 'blue', 'orange'],
            name='ESG Scores'
        ),
        row=2, col=2
    )
    
    # Update layout
    fig.update_layout(
        title_text="Investment Screening Dashboard",
        showlegend=True,
        height=800
    )
    
    # Update axes labels
    fig.update_xaxes(title_text="Factor Score", row=1, col=1)
    fig.update_yaxes(title_text="ESG Score", row=1, col=1)
    fig.update_xaxes(title_text="Beta (Risk)", row=2, col=1)
    fig.update_yaxes(title_text="Revenue Growth", row=2, col=1)
    fig.update_yaxes(title_text="Average Score", row=2, col=2)
    
    return fig

# Usage
dashboard = screener.create_interactive_dashboard(recommendations)
dashboard.show()

Real-Time Portfolio Tracking

class PortfolioTracker:
    """
    Track performance of screened investments
    """
    
    def __init__(self, recommendations: pd.DataFrame):
        self.portfolio = recommendations.copy()
        self.initial_prices = self._get_current_prices()
        self.purchase_date = datetime.now()
        self.client = ollama.Client()  # used by generate_performance_report
    
    def _get_current_prices(self) -> dict:
        """Get current prices for portfolio stocks"""
        prices = {}
        for symbol in self.portfolio['symbol']:
            try:
                ticker = yf.Ticker(symbol)
                prices[symbol] = ticker.history(period='1d')['Close'].iloc[-1]
            except Exception:
                # Fall back to the price captured during screening
                prices[symbol] = self.portfolio[self.portfolio['symbol'] == symbol]['current_price'].iloc[0]
        return prices
    
    def update_performance(self) -> pd.DataFrame:
        """Update portfolio performance"""
        current_prices = self._get_current_prices()
        
        self.portfolio['current_price'] = self.portfolio['symbol'].map(current_prices)
        self.portfolio['initial_price'] = self.portfolio['symbol'].map(self.initial_prices)
        
        # Calculate returns
        self.portfolio['return_pct'] = (
            (self.portfolio['current_price'] - self.portfolio['initial_price']) / 
            self.portfolio['initial_price']
        )
        
        # Calculate days held (at least 1, to avoid division by zero on the purchase day)
        self.portfolio['days_held'] = max((datetime.now() - self.purchase_date).days, 1)
        
        # Annualized return (simple linear scaling of the holding-period return)
        self.portfolio['annualized_return'] = (
            self.portfolio['return_pct'] * 365 / self.portfolio['days_held']
        )
        
        return self.portfolio[['symbol', 'return_pct', 'annualized_return', 'composite_score']]
    
    def generate_performance_report(self) -> str:
        """Generate performance report using Ollama"""
        performance_data = self.update_performance()
        
        prompt = f"""
        Generate a portfolio performance report based on the following data:
        
        Portfolio Performance:
        {performance_data.to_string()}
        
        Analysis Period: {self.purchase_date.strftime('%Y-%m-%d')} to {datetime.now().strftime('%Y-%m-%d')}
        
        Please provide:
        1. Overall portfolio performance summary
        2. Best and worst performing stocks
        3. ESG impact on performance
        4. Recommendations for portfolio adjustments
        
        Keep the report concise and actionable.
        """
        
        response = self.client.generate(
            model="llama3.1:8b",
            prompt=prompt,
            options={'temperature': 0.3}
        )
        
        return response['response']
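The annualization used in update_performance scales the holding-period return linearly to 365 days, which is easy to verify in isolation. A compounded version would instead use `(1 + return_pct) ** (365 / days_held) - 1`; the linear form is kept here to match the tracker. The helper below is a standalone restatement for checking the arithmetic:

```python
def annualized_return(initial_price: float, current_price: float, days_held: int) -> float:
    """Linear annualization of a simple holding-period return.

    max(days_held, 1) guards against division by zero on the purchase day.
    """
    return_pct = (current_price - initial_price) / initial_price
    return return_pct * 365 / max(days_held, 1)

# A stock bought at $100, now $110, held 73 days:
# return_pct = 0.10; annualized = 0.10 * 365 / 73 = 0.50 (50%)
result = annualized_return(100.0, 110.0, 73)
```

Beware that linear annualization wildly exaggerates short holding periods: the same 10% gain after one day annualizes to 3650%.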

Troubleshooting Common Issues

Memory Management

import gc
import psutil

def optimize_memory_usage(self):
    """
    Optimize memory usage during screening
    """
    # Clear unnecessary variables
    gc.collect()
    
    # Monitor memory usage
    memory_usage = psutil.Process().memory_info().rss / 1024 / 1024  # MB
    
    if memory_usage > 1000:  # 1GB threshold
        self.logger.warning(f"High memory usage: {memory_usage:.2f}MB")
        
        # Possible mitigations:
        # - process symbols in smaller batches
        # - downcast numeric columns (e.g., float64 -> float32)
        # - drop intermediate DataFrames once they are scored
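"Process in smaller batches" can be as simple as slicing the symbol list before each fetch, so only one batch's raw data is in memory at a time. A stdlib sketch (the batch size and symbol names are illustrative):

```python
from typing import Iterator, List

def batched(items: List[str], batch_size: int) -> Iterator[List[str]]:
    """Yield successive fixed-size slices of a list.

    Screening 500 symbols in batches of 50 means fetch_stock_data only
    ever holds 50 symbols' worth of raw data at once.
    """
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]

symbols = [f"SYM{i}" for i in range(10)]
batches = list(batched(symbols, 4))  # 4 + 4 + 2 symbols
```

Inside the screener you would score each batch, keep only the resulting summary rows, and let the raw batch data be garbage-collected before fetching the next one.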

API Rate Limiting

import time
from functools import wraps

def rate_limit(calls_per_minute: int = 60):
    """
    Rate limiting decorator
    
    Args:
        calls_per_minute: Maximum calls per minute
    """
    min_interval = 60.0 / calls_per_minute
    last_called = [0.0]
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            left_to_wait = min_interval - elapsed
            
            if left_to_wait > 0:
                time.sleep(left_to_wait)
            
            ret = func(*args, **kwargs)
            last_called[0] = time.time()
            return ret
        
        return wrapper
    return decorator

# Apply to data fetching methods
@rate_limit(calls_per_minute=30)
def fetch_stock_data_rate_limited(self, symbols: List[str]) -> pd.DataFrame:
    """Fetch stock data with rate limiting"""
    return self.fetch_stock_data(symbols)
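The decorator enforces a minimum interval between consecutive calls by sleeping for the remainder of the window (note it is not thread-safe: the shared `last_called` list is mutated without a lock). A quick timing check, reproducing the decorator so the demo is self-contained and using a high rate so it runs fast:

```python
import time
from functools import wraps

def rate_limit(calls_per_minute: int = 60):
    """Same decorator as above, repeated here for a self-contained demo."""
    min_interval = 60.0 / calls_per_minute
    last_called = [0.0]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            left_to_wait = min_interval - (time.time() - last_called[0])
            if left_to_wait > 0:
                time.sleep(left_to_wait)
            ret = func(*args, **kwargs)
            last_called[0] = time.time()
            return ret
        return wrapper
    return decorator

@rate_limit(calls_per_minute=600)  # at most one call per 0.1s
def fetch(symbol: str) -> str:
    return symbol

start = time.time()
results = [fetch("AAPL") for _ in range(3)]
elapsed = time.time() - start  # first call is free; the next two wait ~0.1s each
```

For multi-threaded fetching you would wrap the wait-and-update section in a `threading.Lock`, or use a token-bucket limiter instead.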

Conclusion

Building an investment screener with Ollama transforms traditional stock analysis by integrating AI-powered ESG evaluation with quantitative factor analysis. This approach provides several key advantages:

Enhanced Decision Making: AI analysis of ESG factors provides nuanced insights that pure quantitative metrics might miss. The combination of traditional financial metrics with sustainability considerations creates a more comprehensive investment framework.

Scalability: Your screener can analyze hundreds of stocks simultaneously, something that would take analysts weeks to complete manually. The automated ESG analysis ensures consistent evaluation criteria across all investments.

Customization: The flexible architecture allows you to adjust factor weights, ESG criteria, and screening parameters based on your investment philosophy or client requirements.

Real-time Insights: With proper deployment, your screener can provide continuous monitoring and alerts for portfolio management and new investment opportunities.

The investment screener you've built combines the analytical power of AI with proven financial analysis techniques, creating a tool that's both sophisticated and practical for modern portfolio management.

Next Steps

  1. Expand Data Sources: Integrate additional ESG data providers and alternative datasets
  2. Implement Machine Learning: Add predictive models for future performance estimation
  3. Build Portfolio Optimization: Create position sizing and risk management features
  4. Add Backtesting: Test strategies against historical data
  5. Create Mobile App: Develop mobile interface for on-the-go screening

Your investment screener with Ollama represents the future of AI-assisted investment research, where traditional analysis meets modern sustainability considerations for better long-term outcomes.

Ready to revolutionize your investment process? Start screening with AI-powered ESG analysis today.