TradingView Integration with Ollama: Custom Indicator Development Guide

Learn how to integrate TradingView with Ollama for custom indicator development. Build AI-powered trading indicators with step-by-step code examples.

Ever stared at a chart wondering if your trading indicator could be smarter? What if your moving averages could learn from market patterns, or your RSI could adapt to volatility changes? Welcome to the world where artificial intelligence meets technical analysis.

Combining TradingView's powerful charting platform with Ollama's local AI capabilities opens new doors for custom indicator development. This integration lets you create indicators that don't just calculate—they learn, adapt, and evolve with market conditions.

What You'll Learn

This guide shows you how to build AI-enhanced trading indicators using TradingView's Pine Script and Ollama's machine learning models. You'll create indicators that process market data through AI models for better signal generation and pattern recognition.

Understanding TradingView and Ollama Integration

TradingView Platform Overview

TradingView serves as your charting foundation with Pine Script, its proprietary programming language. Pine Script handles data visualization, user interactions, and basic calculations. However, it lacks advanced machine learning capabilities, and that is the gap Ollama fills.

Ollama's Role in Trading Analysis

Ollama provides local AI model execution without cloud dependencies. This setup ensures data privacy and reduces latency for real-time trading decisions. Because Pine Script cannot open arbitrary network connections itself, the integration relays data to your local Ollama instance through a small HTTP bridge service.

Integration Architecture

The system architecture follows this flow:

  • TradingView collects market data
  • Pine Script processes initial calculations
  • Data is relayed to Ollama via HTTP requests (through the local bridge)
  • Ollama processes data through AI models
  • Results return to TradingView for visualization
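The middle of that flow boils down to packaging bars into a request for Ollama's /api/generate endpoint. A minimal payload builder as a sketch: `build_ollama_payload` and the example symbol are illustrative names, but the `model`, `prompt`, and `stream` fields follow the Ollama API's request format.

```python
import json

def build_ollama_payload(prices, volumes, symbol="BTCUSD", timeframe="1h"):
    """Package market data into a body for Ollama's /api/generate endpoint."""
    context = {"prices": prices, "volumes": volumes,
               "symbol": symbol, "timeframe": timeframe}
    return {
        "model": "llama2",  # any locally pulled model name works here
        "prompt": "Analyze this market data and provide trading signals: "
                  + json.dumps(context),
        "stream": False,  # ask for one JSON reply instead of a token stream
    }

payload = build_ollama_payload([101.2, 101.9, 102.4], [1500, 1720, 1610])
```

POSTing this dict as JSON to http://localhost:11434/api/generate is exactly what the bridge service in the next sections does.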

Prerequisites and Setup Requirements

System Requirements

Your development environment needs:

  • Windows 10/11, macOS 10.14+, or Linux Ubuntu 18.04+
  • 8GB RAM minimum (16GB recommended)
  • 10GB free disk space
  • Stable internet connection
  • Python 3.8 or higher

Installing Ollama

Download and install Ollama from the official website:

# Linux/macOS
curl -fsSL https://ollama.ai/install.sh | sh

# Windows
# Download installer from https://ollama.ai/download

Verify installation:

ollama --version

Setting Up Development Environment

Create a project directory and install required dependencies:

mkdir tradingview-ollama
cd tradingview-ollama
pip install requests flask numpy pandas

Creating Your First AI-Enhanced Indicator

Basic HTTP Bridge Setup

Create a Flask application to handle requests between TradingView and Ollama:

# app.py
from flask import Flask, request, jsonify
import requests
import json
import numpy as np

app = Flask(__name__)

@app.route('/analyze', methods=['POST'])
def analyze_market_data():
    """
    Processes market data through Ollama AI model
    Returns trading signals and confidence scores
    """
    try:
        data = request.json
        prices = data.get('prices', [])
        volumes = data.get('volumes', [])
        
        # Prepare data for AI analysis
        market_context = {
            "prices": prices,
            "volumes": volumes,
            "timeframe": data.get('timeframe', '1h'),
            "symbol": data.get('symbol', 'UNKNOWN')
        }
        
        # Send to Ollama for analysis
        ollama_response = requests.post('http://localhost:11434/api/generate',
            json={
                "model": "llama2",
                "prompt": f"Analyze this market data and provide trading signals: {json.dumps(market_context)}",
                "stream": False
            }
        )
        
        if ollama_response.status_code == 200:
            ai_analysis = ollama_response.json()
            
            # Process AI response into trading signals
            signals = parse_ai_signals(ai_analysis['response'])
            
            return jsonify({
                "success": True,
                "signals": signals,
                "confidence": calculate_confidence(signals)
            })
        else:
            return jsonify({"success": False, "error": "AI analysis failed"}), 502
            
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500

def parse_ai_signals(ai_response):
    """
    Converts AI text response into structured trading signals
    """
    # Simple signal extraction (enhance based on your AI model)
    signals = {
        "direction": "neutral",
        "strength": 0.5,
        "stop_loss": 0,
        "take_profit": 0
    }
    
    # Basic keyword analysis
    if "bullish" in ai_response.lower() or "buy" in ai_response.lower():
        signals["direction"] = "bullish"
        signals["strength"] = 0.7
    elif "bearish" in ai_response.lower() or "sell" in ai_response.lower():
        signals["direction"] = "bearish"
        signals["strength"] = 0.7
    
    return signals

def calculate_confidence(signals):
    """
    Calculates confidence score based on signal strength
    """
    return min(signals["strength"] * 100, 100)

if __name__ == '__main__':
    app.run(debug=True, port=5000)
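The keyword parser is worth sanity-checking without the server running. This standalone copy mirrors the bridge's parse_ai_signals logic (trimmed to the two fields it actually sets):

```python
def parse_ai_signals(ai_response):
    """Convert AI text into a structured signal (mirror of the bridge's parser)."""
    signals = {"direction": "neutral", "strength": 0.5}
    text = ai_response.lower()
    if "bullish" in text or "buy" in text:
        signals["direction"], signals["strength"] = "bullish", 0.7
    elif "bearish" in text or "sell" in text:
        signals["direction"], signals["strength"] = "bearish", 0.7
    return signals

print(parse_ai_signals("Momentum looks bullish above the 20-day average"))
# → {'direction': 'bullish', 'strength': 0.7}
```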

Pine Script Integration Code

Create a Pine Script indicator that communicates with your Flask bridge:

//@version=5
indicator("AI Enhanced RSI", shorttitle="AI-RSI", overlay=false)

// Input parameters
rsi_length = input.int(14, "RSI Length", minval=1)
ai_lookback = input.int(50, "AI Analysis Lookback", minval=10)

// Calculate traditional RSI
rsi = ta.rsi(close, rsi_length)

// Prepare data for AI analysis
var prices = array.new<float>()
var volumes = array.new<float>()

// Collect recent price and volume data
if barstate.isconfirmed
    array.push(prices, close)
    array.push(volumes, volume)
    
    // Maintain lookback window
    if array.size(prices) > ai_lookback
        array.shift(prices)
        array.shift(volumes)

// AI Analysis Function
// NOTE: Pine Script has no general-purpose HTTP client; the request.*
// namespace only reads TradingView-hosted data (request.security, etc.).
// The body below is pseudocode that documents the intended round trip.
// In practice, push data out through alert() webhooks to the Flask bridge
// and bring the AI signal back in through an external data feed.
ai_signal(price_data, volume_data) =>
    // Serialize arrays to comma-separated strings for the JSON payload
    price_str = str.tostring(array.get(price_data, 0))
    for i = 1 to array.size(price_data) - 1
        price_str := price_str + "," + str.tostring(array.get(price_data, i))
    
    volume_str = str.tostring(array.get(volume_data, 0))
    for i = 1 to array.size(volume_data) - 1
        volume_str := volume_str + "," + str.tostring(array.get(volume_data, i))
    
    // Request payload the bridge expects
    payload = '{"prices":[' + price_str + '],"volumes":[' + volume_str + '],"symbol":"' + syminfo.ticker + '","timeframe":"' + timeframe.period + '"}'
    
    // PSEUDOCODE: request.post() does not exist in Pine Script.
    // response = request.post("http://localhost:5000/analyze", body=payload)
    response = ""  // placeholder; the bridge's reply arrives via the webhook round trip
    
    // Parse response (simplified)
    signal_strength = 0.5
    if str.contains(response, "bullish")
        signal_strength := 0.8
    else if str.contains(response, "bearish")
        signal_strength := 0.2
    
    signal_strength

// Get AI-enhanced signal
ai_strength = ai_signal(prices, volumes)

// Combine traditional RSI with AI signals
enhanced_rsi = rsi * ai_strength

// Plot indicators
plot(rsi, "Traditional RSI", color.gray, linewidth=1)
plot(enhanced_rsi, "AI Enhanced RSI", color.blue, linewidth=2)

// Add signal zones
hline(70, "Overbought", color.red, linestyle=hline.style_dashed)
hline(30, "Oversold", color.green, linestyle=hline.style_dashed)
hline(50, "Midline", color.gray, linestyle=hline.style_dotted)

// Background highlighting for strong signals
bgcolor(enhanced_rsi > 70 and ai_strength > 0.7 ? color.new(color.red, 80) : na)
bgcolor(enhanced_rsi < 30 and ai_strength > 0.7 ? color.new(color.green, 80) : na)
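Because Pine Script cannot POST directly, the realistic transport is a TradingView alert whose message is a JSON template; `{{ticker}}`, `{{close}}`, and `{{volume}}` are TradingView's own alert placeholders. The sketch below simulates the substitution TradingView performs before firing the webhook at your bridge (the template's field names are assumptions, so shape them to whatever your bridge expects):

```python
import json

# Message authored in the TradingView alert dialog
ALERT_TEMPLATE = '{"symbol": "{{ticker}}", "close": {{close}}, "volume": {{volume}}}'

def render_alert(template, ticker, close, volume):
    """Simulate TradingView's placeholder substitution for offline testing."""
    filled = (template.replace("{{ticker}}", ticker)
                      .replace("{{close}}", str(close))
                      .replace("{{volume}}", str(volume)))
    return json.loads(filled)  # the bridge parses the webhook body the same way

alert = render_alert(ALERT_TEMPLATE, "BTCUSD", 64250.5, 1234)
```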

Advanced AI Model Integration

Custom Model Training

Train a specialized model for your trading strategy:

# model_trainer.py
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import pickle

class TradingModelTrainer:
    def __init__(self):
        self.model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.scaler = StandardScaler()
        
    def prepare_features(self, df):
        """
        Creates technical indicators as features
        """
        # Price-based features
        df['sma_10'] = df['close'].rolling(10).mean()
        df['sma_20'] = df['close'].rolling(20).mean()
        df['rsi'] = self.calculate_rsi(df['close'])
        df['macd'] = self.calculate_macd(df['close'])
        
        # Volume features
        df['volume_sma'] = df['volume'].rolling(10).mean()
        df['volume_ratio'] = df['volume'] / df['volume_sma']
        
        # Volatility features
        df['volatility'] = df['close'].rolling(20).std()
        
        # Price change features
        df['price_change'] = df['close'].pct_change()
        df['price_change_5'] = df['close'].pct_change(5)
        
        return df
    
    def calculate_rsi(self, prices, period=14):
        """RSI calculation"""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))
    
    def calculate_macd(self, prices):
        """MACD calculation"""
        exp1 = prices.ewm(span=12).mean()
        exp2 = prices.ewm(span=26).mean()
        return exp1 - exp2
    
    def create_labels(self, df, forward_periods=5, threshold=0.02):
        """
        Creates trading labels based on future price movements
        """
        future_returns = df['close'].shift(-forward_periods) / df['close'] - 1
        
        labels = np.where(future_returns > threshold, 1,  # Buy signal
                 np.where(future_returns < -threshold, -1, 0))  # Sell signal, Hold
        
        return labels
    
    def train_model(self, data_file):
        """
        Trains the trading model
        """
        df = pd.read_csv(data_file)
        df = self.prepare_features(df)
        
        # Attach labels before dropping NaNs so features and labels stay aligned
        df['label'] = self.create_labels(df)
        
        # Select features
        feature_columns = ['sma_10', 'sma_20', 'rsi', 'macd', 'volume_ratio', 
                          'volatility', 'price_change', 'price_change_5']
        
        # Remove rows with incomplete indicator values
        df = df.dropna(subset=feature_columns)
        features = df[feature_columns]
        labels = df['label']
        
        # Scale features
        features_scaled = self.scaler.fit_transform(features)
        
        # Train model
        self.model.fit(features_scaled, labels)
        
        # Save model and scaler
        with open('trading_model.pkl', 'wb') as f:
            pickle.dump(self.model, f)
        with open('scaler.pkl', 'wb') as f:
            pickle.dump(self.scaler, f)
        
        print("Model training completed successfully!")
        
    def predict_signal(self, features):
        """
        Predicts trading signal for given features
        """
        features_scaled = self.scaler.transform([features])
        prediction = self.model.predict(features_scaled)[0]
        probability = self.model.predict_proba(features_scaled)[0]
        
        return {
            'signal': int(prediction),
            'confidence': max(probability)
        }

# Usage example
if __name__ == "__main__":
    trainer = TradingModelTrainer()
    trainer.train_model('market_data.csv')
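The labeling rule in create_labels is easy to verify on a toy series where the 5-bar-forward returns are known by construction:

```python
import numpy as np
import pandas as pd

def create_labels(close, forward_periods=5, threshold=0.02):
    """Label +1/-1/0 by whether the forward return clears the ±threshold."""
    future_returns = close.shift(-forward_periods) / close - 1
    # Bars with no future data have NaN returns; both comparisons are False,
    # so they fall through to 0 (hold)
    return np.where(future_returns > threshold, 1,
           np.where(future_returns < -threshold, -1, 0))

# Bar 0 sees 103 five bars ahead (+3%), bar 1 sees 97 (-3%), bar 2 sees 100 (0%)
close = pd.Series([100.0, 100, 100, 100, 100, 103, 97, 100, 100, 100])
labels = create_labels(close)
```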

Enhanced Flask Bridge with Custom Model

Update your Flask application to use the trained model:

# enhanced_app.py
from flask import Flask, request, jsonify
import pickle
import numpy as np
import pandas as pd

app = Flask(__name__)

# Load trained model and scaler
with open('trading_model.pkl', 'rb') as f:
    model = pickle.load(f)
with open('scaler.pkl', 'rb') as f:
    scaler = pickle.load(f)

@app.route('/analyze_advanced', methods=['POST'])
def analyze_advanced():
    """
    Advanced analysis using trained ML model
    """
    try:
        data = request.json
        prices = data.get('prices', [])
        volumes = data.get('volumes', [])
        
        if len(prices) < 26:  # Minimum data for technical indicators
            return jsonify({"success": False, "error": "Insufficient data"})
        
        # Create DataFrame
        df = pd.DataFrame({
            'close': prices,
            'volume': volumes
        })
        
        # Calculate features
        features = calculate_features(df)
        
        # Get latest feature values
        latest_features = [
            features['sma_10'].iloc[-1],
            features['sma_20'].iloc[-1],
            features['rsi'].iloc[-1],
            features['macd'].iloc[-1],
            features['volume_ratio'].iloc[-1],
            features['volatility'].iloc[-1],
            features['price_change'].iloc[-1],
            features['price_change_5'].iloc[-1]
        ]
        
        # Make prediction
        features_scaled = scaler.transform([latest_features])
        prediction = model.predict(features_scaled)[0]
        probabilities = model.predict_proba(features_scaled)[0]
        
        # Map class labels to probabilities via model.classes_ rather than
        # assuming a fixed [-1, 0, 1] column order
        signal_map = {-1: "sell", 0: "hold", 1: "buy"}
        class_probs = {signal_map[c]: float(p)
                       for c, p in zip(model.classes_, probabilities)}
        
        return jsonify({
            "success": True,
            "signal": signal_map[prediction],
            "confidence": float(max(probabilities)),
            "probabilities": class_probs
        })
        
    except Exception as e:
        return jsonify({"success": False, "error": str(e)})

def calculate_features(df):
    """Calculate technical indicators"""
    df['sma_10'] = df['close'].rolling(10).mean()
    df['sma_20'] = df['close'].rolling(20).mean()
    df['rsi'] = calculate_rsi(df['close'])
    df['macd'] = calculate_macd(df['close'])
    df['volume_sma'] = df['volume'].rolling(10).mean()
    df['volume_ratio'] = df['volume'] / df['volume_sma']
    df['volatility'] = df['close'].rolling(20).std()
    df['price_change'] = df['close'].pct_change()
    df['price_change_5'] = df['close'].pct_change(5)
    return df

def calculate_rsi(prices, period=14):
    delta = prices.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))

def calculate_macd(prices):
    exp1 = prices.ewm(span=12).mean()
    exp2 = prices.ewm(span=26).mean()
    return exp1 - exp2

if __name__ == '__main__':
    app.run(debug=True, port=5000)
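A quick property test of the shared RSI helper: on a strictly rising series every change is a gain, so once the warm-up window fills, RSI should pin at 100.

```python
import pandas as pd

def calculate_rsi(prices, period=14):
    """Simple-moving-average RSI, same formula as the helper above."""
    delta = prices.diff()
    gain = delta.where(delta > 0, 0).rolling(window=period).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
    rs = gain / loss  # a loss of 0 gives rs = inf, which maps to RSI 100
    return 100 - (100 / (1 + rs))

rising = pd.Series(range(1, 31), dtype=float)
rsi = calculate_rsi(rising)
```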

Real-Time Data Processing

WebSocket Integration

Implement real-time data streaming for live trading:

# websocket_handler.py
import asyncio
import websockets
import json
import pandas as pd
from datetime import datetime

class RealTimeProcessor:
    def __init__(self):
        self.clients = set()
        self.data_buffer = []
        self.model = None  # Load your trained model here
        
    async def register_client(self, websocket, path):
        """Register new WebSocket client"""
        self.clients.add(websocket)
        try:
            await websocket.wait_closed()
        finally:
            self.clients.remove(websocket)
    
    async def process_market_data(self, data):
        """Process incoming market data"""
        self.data_buffer.append(data)
        
        # Keep only last 100 data points
        if len(self.data_buffer) > 100:
            self.data_buffer.pop(0)
        
        # Process if we have enough data
        if len(self.data_buffer) >= 50:
            signal = await self.generate_signal()
            await self.broadcast_signal(signal)
    
    async def generate_signal(self):
        """Generate trading signal from buffered data"""
        df = pd.DataFrame(self.data_buffer)
        
        # Calculate features (reuse from previous functions)
        features = calculate_features(df)
        
        # Get prediction
        # ... prediction logic here ...
        
        return {
            'timestamp': datetime.now().isoformat(),
            'signal': 'buy',  # or 'sell', 'hold'
            'confidence': 0.85,
            'price': df['close'].iloc[-1]
        }
    
    async def broadcast_signal(self, signal):
        """Broadcast signal to all connected clients"""
        if self.clients:
            message = json.dumps(signal)
            await asyncio.gather(
                *[client.send(message) for client in self.clients],
                return_exceptions=True
            )

# Start WebSocket server
async def main():
    processor = RealTimeProcessor()
    
    # Serve until the process is stopped; without the await below the
    # coroutine would return immediately and asyncio.run() would shut
    # the server down. (Recent versions of the websockets library call
    # the handler with only the connection object, without `path`.)
    async with websockets.serve(processor.register_client, "localhost", 8765):
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())
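The buffer trimming in process_market_data (append, then pop the oldest) is exactly what `collections.deque` provides for free; a sketch of the same sliding window:

```python
from collections import deque

buffer = deque(maxlen=100)  # oldest bars fall off automatically

for price in range(250):    # stream 250 ticks through the window
    buffer.append(price)

# Only the most recent 100 ticks remain: 150 .. 249
```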

Performance Optimization

Caching Strategies

Implement caching to reduce API calls and improve response times:

# cache_manager.py
import time
import json
from functools import wraps

class CacheManager:
    def __init__(self, ttl=300):  # 5 minutes default TTL
        self.cache = {}
        self.ttl = ttl
    
    def get_cache_key(self, data):
        """Generate cache key from data (default=str handles non-JSON types)"""
        return hash(json.dumps(data, sort_keys=True, default=str))
    
    def get(self, key):
        """Get cached value if not expired"""
        if key in self.cache:
            value, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl:
                return value
            else:
                del self.cache[key]
        return None
    
    def set(self, key, value):
        """Set cache value with timestamp"""
        self.cache[key] = (value, time.time())
    
    def clear_expired(self):
        """Clear expired cache entries"""
        current_time = time.time()
        expired_keys = [
            key for key, (value, timestamp) in self.cache.items()
            if current_time - timestamp >= self.ttl
        ]
        for key in expired_keys:
            del self.cache[key]

# Cache decorator
def cached_analysis(cache_manager):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache key from arguments
            cache_key = cache_manager.get_cache_key({
                'args': args,
                'kwargs': kwargs
            })
            
            # Check cache first (explicit None check so falsy results still count as hits)
            cached_result = cache_manager.get(cache_key)
            if cached_result is not None:
                return cached_result
            
            # Execute function and cache result
            result = func(*args, **kwargs)
            cache_manager.set(cache_key, result)
            
            return result
        return wrapper
    return decorator
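A trimmed mirror of CacheManager plus the decorator confirms the intended behavior: a repeated call with the same arguments never reaches the wrapped function a second time.

```python
import json
import time
from functools import wraps

class MiniCache:
    """Trimmed mirror of CacheManager: a TTL-stamped dict."""
    def __init__(self, ttl=300):
        self.store, self.ttl = {}, ttl

    def key(self, data):
        return hash(json.dumps(data, sort_keys=True, default=str))

    def get(self, k):
        hit = self.store.get(k)
        if hit is not None and time.time() - hit[1] < self.ttl:
            return hit[0]
        return None

    def set(self, k, v):
        self.store[k] = (v, time.time())

def cached(cache):
    def decorator(func):
        @wraps(func)
        def wrapper(*args):
            k = cache.key({"args": args})
            result = cache.get(k)
            if result is None:
                result = func(*args)
                cache.set(k, result)
            return result
        return wrapper
    return decorator

calls = {"n": 0}

@cached(MiniCache())
def slow_analysis(x):
    calls["n"] += 1  # counts real (uncached) executions
    return x * 2

slow_analysis(21)
slow_analysis(21)  # served from cache; the counter stays at 1
```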

Error Handling and Monitoring

Comprehensive Error Management

# error_handler.py
import logging
import traceback
from datetime import datetime
from functools import wraps

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('trading_integration.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

class TradingError(Exception):
    """Custom exception for trading-related errors"""
    pass

class ModelError(TradingError):
    """Model-specific errors"""
    pass

class DataError(TradingError):
    """Data-related errors"""
    pass

def error_handler(func):
    """Decorator for comprehensive error handling"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except DataError as e:
            logger.error(f"Data error in {func.__name__}: {str(e)}")
            return {"success": False, "error": "Data processing failed", "details": str(e)}
        except ModelError as e:
            logger.error(f"Model error in {func.__name__}: {str(e)}")
            return {"success": False, "error": "Model prediction failed", "details": str(e)}
        except Exception as e:
            logger.error(f"Unexpected error in {func.__name__}: {str(e)}")
            logger.error(traceback.format_exc())
            return {"success": False, "error": "Internal server error", "details": str(e)}
    return wrapper

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'requests_count': 0,
            'success_count': 0,
            'error_count': 0,
            'avg_response_time': 0,
            'last_reset': datetime.now()
        }
    
    def record_request(self, success=True, response_time=0):
        """Record request metrics"""
        self.metrics['requests_count'] += 1
        if success:
            self.metrics['success_count'] += 1
        else:
            self.metrics['error_count'] += 1
        
        # Update average response time
        current_avg = self.metrics['avg_response_time']
        self.metrics['avg_response_time'] = (
            (current_avg * (self.metrics['requests_count'] - 1) + response_time) /
            self.metrics['requests_count']
        )
    
    def get_metrics(self):
        """Get current performance metrics"""
        return {
            **self.metrics,
            'success_rate': self.metrics['success_count'] / max(self.metrics['requests_count'], 1) * 100
        }
    
    def reset_metrics(self):
        """Reset all metrics"""
        self.metrics = {
            'requests_count': 0,
            'success_count': 0,
            'error_count': 0,
            'avg_response_time': 0,
            'last_reset': datetime.now()
        }
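The monitor's running average is the standard incremental mean, and it is worth checking against a hand-computed value:

```python
def fold_mean(current_avg, count, new_value):
    """Incremental mean used by record_request: no history list required."""
    return (current_avg * (count - 1) + new_value) / count

avg, n = 0.0, 0
for response_time in [10, 20, 30, 40]:
    n += 1
    avg = fold_mean(avg, n, response_time)
# avg is now (10 + 20 + 30 + 40) / 4 = 25.0
```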

Testing and Validation

Backtesting Framework

Create a backtesting system to validate your AI indicators:

# backtester.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

class AIIndicatorBacktester:
    def __init__(self, initial_capital=10000):
        self.initial_capital = initial_capital
        self.capital = initial_capital
        self.positions = []
        self.trades = []
        self.equity_curve = []
        
    def run_backtest(self, data, signals, transaction_cost=0.001):
        """
        Run backtest with AI-generated signals
        """
        position = 0
        entry_price = 0
        
        for i, row in data.iterrows():
            current_price = row['close']
            signal = signals.get(i, 'hold')
            
            # Calculate current equity
            current_equity = self.capital
            if position != 0:
                current_equity += position * (current_price - entry_price)
            
            self.equity_curve.append({
                'timestamp': row['timestamp'],
                'equity': current_equity,
                'price': current_price,
                'signal': signal
            })
            
            # Process signals. self.capital tracks realized P&L only; costs
            # are charged on the notional traded. (Debiting the full principal
            # here would double-count against the equity formula above.)
            if signal == 'buy' and position <= 0:
                if position < 0:  # Close short position
                    pnl = position * (current_price - entry_price)
                    self.capital += pnl - (abs(position) * current_price * transaction_cost)
                    self.record_trade('cover', current_price, position, pnl)
                
                # Open long position
                position = self.capital / current_price * 0.95  # 95% of capital
                entry_price = current_price
                self.capital -= position * current_price * transaction_cost
                self.record_trade('buy', current_price, position, 0)
                
            elif signal == 'sell' and position >= 0:
                if position > 0:  # Close long position
                    pnl = position * (current_price - entry_price)
                    self.capital += pnl - (position * current_price * transaction_cost)
                    self.record_trade('sell', current_price, position, pnl)
                
                # Open short position
                position = -(self.capital / current_price * 0.95)
                entry_price = current_price
                self.capital -= abs(position) * current_price * transaction_cost
                self.record_trade('short', current_price, position, 0)
        
        # Close final position
        if position != 0:
            final_price = data.iloc[-1]['close']
            pnl = position * (final_price - entry_price)
            self.capital += pnl - (abs(position) * final_price * transaction_cost)
            self.record_trade('close', final_price, position, pnl)
        
        return self.calculate_performance_metrics()
    
    def record_trade(self, action, price, quantity, pnl):
        """Record trade details"""
        self.trades.append({
            'action': action,
            'price': price,
            'quantity': quantity,
            'pnl': pnl,
            'timestamp': datetime.now()
        })
    
    def calculate_performance_metrics(self):
        """Calculate comprehensive performance metrics"""
        equity_df = pd.DataFrame(self.equity_curve)
        
        # Basic metrics
        total_return = (self.capital / self.initial_capital - 1) * 100
        
        # Calculate returns
        equity_df['returns'] = equity_df['equity'].pct_change()
        
        # Risk metrics
        sharpe_ratio = self.calculate_sharpe_ratio(equity_df['returns'])
        max_drawdown = self.calculate_max_drawdown(equity_df['equity'])
        
        # Trade statistics
        winning_trades = [t for t in self.trades if t['pnl'] > 0]
        losing_trades = [t for t in self.trades if t['pnl'] < 0]
        
        win_rate = len(winning_trades) / len(self.trades) * 100 if self.trades else 0
        
        return {
            'total_return': total_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'win_rate': win_rate,
            'total_trades': len(self.trades),
            'winning_trades': len(winning_trades),
            'losing_trades': len(losing_trades),
            'final_capital': self.capital,
            'equity_curve': equity_df
        }
    
    def calculate_sharpe_ratio(self, returns, risk_free_rate=0.02):
        """Calculate Sharpe ratio"""
        if returns.std() == 0:
            return 0
        
        excess_returns = returns.mean() - risk_free_rate / 252
        return excess_returns / returns.std() * np.sqrt(252)
    
    def calculate_max_drawdown(self, equity):
        """Calculate maximum drawdown"""
        peak = equity.expanding().max()
        drawdown = (equity - peak) / peak
        return drawdown.min() * 100
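calculate_max_drawdown is easy to cross-check by hand: an equity curve that peaks at 120 and troughs at 90 has a 25% drawdown.

```python
import pandas as pd

def calculate_max_drawdown(equity):
    """Largest peak-to-trough decline, in percent (same as the method above)."""
    peak = equity.expanding().max()
    drawdown = (equity - peak) / peak
    return drawdown.min() * 100

equity = pd.Series([100.0, 120.0, 90.0, 110.0])
# (90 - 120) / 120 = -0.25, i.e. a -25% maximum drawdown
```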

Deployment and Production Considerations

Docker Configuration

Create a containerized deployment setup:

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Ollama
RUN curl -fsSL https://ollama.ai/install.sh | sh

# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Expose ports
EXPOSE 5000 8765

# Start script
COPY start.sh .
RUN chmod +x start.sh

CMD ["./start.sh"]

# start.sh
#!/bin/bash

# Start Ollama service
ollama serve &

# Wait for Ollama to be ready
sleep 10

# Pull required models
ollama pull llama2

# Start Flask application
python enhanced_app.py &

# Start WebSocket server
python websocket_handler.py &

# Keep container running
wait

Production Monitoring

# monitoring.py
import psutil
import time
import json
from datetime import datetime

class SystemMonitor:
    def __init__(self):
        self.metrics_history = []
        
    def collect_metrics(self):
        """Collect system performance metrics"""
        metrics = {
            'timestamp': datetime.now().isoformat(),
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_usage': psutil.disk_usage('/').percent,
            'network_io': psutil.net_io_counters()._asdict(),
            'process_count': len(psutil.pids())
        }
        
        self.metrics_history.append(metrics)
        
        # Keep only last 1000 metrics (about 16 minutes at 1-second intervals)
        if len(self.metrics_history) > 1000:
            self.metrics_history.pop(0)
        
        return metrics
    
    def get_health_status(self):
        """Get current system health status"""
        if not self.metrics_history:
            return {'status': 'unknown', 'message': 'No metrics available'}
        
        latest = self.metrics_history[-1]
        
        # Define health thresholds
        if latest['cpu_percent'] > 80 or latest['memory_percent'] > 85:
            return {'status': 'critical', 'message': 'High resource usage detected'}
        elif latest['cpu_percent'] > 60 or latest['memory_percent'] > 70:
            return {'status': 'warning', 'message': 'Moderate resource usage'}
        else:
            return {'status': 'healthy', 'message': 'System running normally'}
    
    def generate_report(self):
        """Generate performance report"""
        if len(self.metrics_history) < 2:
            return {'error': 'Insufficient data for report'}
        
        recent_metrics = self.metrics_history[-60:]  # Last 60 measurements
        
        avg_cpu = sum(m['cpu_percent'] for m in recent_metrics) / len(recent_metrics)
        avg_memory = sum(m['memory_percent'] for m in recent_metrics) / len(recent_metrics)
        
        return {
            'average_cpu_usage': round(avg_cpu, 2),
            'average_memory_usage': round(avg_memory, 2),
            'current_disk_usage': recent_metrics[-1]['disk_usage'],
            'uptime_minutes': len(self.metrics_history),
            'health_status': self.get_health_status()
        }

Security Best Practices

API Security Implementation

# security.py
import jwt
import hashlib
import hmac
import time
from functools import wraps
from flask import request, jsonify

class SecurityManager:
    def __init__(self, secret_key):
        self.secret_key = secret_key
        self.rate_limits = {}
        
    def generate_api_key(self, user_id):
        """Generate API key for user"""
        payload = {
            'user_id': user_id,
            'created_at': time.time(),
            'expires_at': time.time() + 86400  # 24 hours
        }
        return jwt.encode(payload, self.secret_key, algorithm='HS256')
    
    def validate_api_key(self, token):
        """Validate API key"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            if payload['expires_at'] < time.time():
                return False, "Token expired"
            return True, payload
        except jwt.InvalidTokenError:
            return False, "Invalid token"
    
    def rate_limit(self, limit=60, window=60):
        """Rate limiting decorator"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                client_ip = request.remote_addr
                current_time = time.time()
                
                # Drop IPs whose requests have all aged out of the window
                self.rate_limits = {
                    ip: times for ip, times in self.rate_limits.items()
                    if any(t > current_time - window for t in times)
                }
                
                # Keep only this client's requests that fall inside the window
                recent_requests = [
                    t for t in self.rate_limits.get(client_ip, [])
                    if t > current_time - window
                ]
                
                if len(recent_requests) >= limit:
                    return jsonify({
                        'error': 'Rate limit exceeded',
                        'retry_after': window
                    }), 429
                
                # Record the current request against the filtered list, so
                # stale timestamps don't accumulate per client
                recent_requests.append(current_time)
                self.rate_limits[client_ip] = recent_requests
                
                return func(*args, **kwargs)
            return wrapper
        return decorator
    
    def require_auth(self, func):
        """Authentication decorator"""
        @wraps(func)
        def wrapper(*args, **kwargs):
            auth_header = request.headers.get('Authorization')
            if not auth_header or not auth_header.startswith('Bearer '):
                return jsonify({'error': 'Missing or invalid authorization header'}), 401
            
            token = auth_header.split(' ')[1]
            valid, payload = self.validate_api_key(token)
            
            if not valid:
                return jsonify({'error': payload}), 401
            
            # Add user info to request context
            request.user = payload
            return func(*args, **kwargs)
        return wrapper
    
    def validate_request_signature(self, request_data, signature, timestamp):
        """Validate request signature for webhook security"""
        # Check timestamp (prevent replay attacks)
        if abs(time.time() - timestamp) > 300:  # 5 minutes
            return False
        
        # Create expected signature
        message = f"{timestamp}.{request_data}"
        expected_signature = hmac.new(
            self.secret_key.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()
        
        return hmac.compare_digest(signature, expected_signature)
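The `validate_request_signature` check above only works if the webhook sender signs its payloads the same way. As a rough sketch of the client side of that handshake (the `WEBHOOK_SECRET` value here is a placeholder you would share with the sender, not anything defined by TradingView or Ollama):

```python
# sign_webhook.py - client-side counterpart to validate_request_signature
import hashlib
import hmac
import time

WEBHOOK_SECRET = "replace-with-your-shared-secret"  # placeholder

def sign_payload(payload: str, secret: str = WEBHOOK_SECRET):
    """Sign a webhook payload; returns (signature, timestamp)."""
    timestamp = int(time.time())
    # Same "timestamp.payload" message format the server reconstructs
    message = f"{timestamp}.{payload}"
    signature = hmac.new(
        secret.encode(), message.encode(), hashlib.sha256
    ).hexdigest()
    return signature, timestamp

def verify_payload(payload: str, signature: str, timestamp: int,
                   secret: str = WEBHOOK_SECRET, tolerance: int = 300):
    """Mirror of the server-side check: timestamp window plus HMAC compare."""
    if abs(time.time() - timestamp) > tolerance:
        return False  # reject replayed requests
    message = f"{timestamp}.{payload}"
    expected = hmac.new(
        secret.encode(), message.encode(), hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature, expected)

if __name__ == "__main__":
    sig, ts = sign_payload('{"signal": "buy"}')
    print(verify_payload('{"signal": "buy"}', sig, ts))    # valid signature
    print(verify_payload('{"signal": "sell"}', sig, ts))   # tampered payload
```

Signing over `timestamp.payload` rather than the payload alone is what makes the five-minute replay window in `validate_request_signature` enforceable.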

Troubleshooting Common Issues

Connection Problems

# diagnostics.py
import requests
import socket
import subprocess
import json
from datetime import datetime

class DiagnosticTools:
    def __init__(self):
        self.test_results = []
    
    def test_ollama_connection(self):
        """Test Ollama API connectivity"""
        try:
            response = requests.get('http://localhost:11434/api/tags', timeout=5)
            if response.status_code == 200:
                models = response.json()
                return {
                    'status': 'success',
                    'message': 'Ollama connected successfully',
                    'available_models': [model['name'] for model in models.get('models', [])]
                }
            else:
                return {
                    'status': 'error',
                    'message': f'Ollama API returned status {response.status_code}'
                }
        except requests.exceptions.ConnectionError:
            return {
                'status': 'error',
                'message': 'Cannot connect to Ollama service. Is it running?'
            }
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Unexpected error: {str(e)}'
            }
    
    def test_model_inference(self, model_name='llama2'):
        """Test model inference capability"""
        try:
            test_prompt = "Analyze this simple market data: price went from 100 to 105. What's the signal?"
            
            response = requests.post('http://localhost:11434/api/generate', json={
                'model': model_name,
                'prompt': test_prompt,
                'stream': False
            }, timeout=30)
            
            if response.status_code == 200:
                result = response.json()
                return {
                    'status': 'success',
                    'message': 'Model inference working',
                    'response_length': len(result.get('response', '')),
                    'sample_response': result.get('response', '')[:100] + '...'
                }
            else:
                return {
                    'status': 'error',
                    'message': f'Model inference failed with status {response.status_code}'
                }
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Model inference error: {str(e)}'
            }
    
    def test_port_availability(self, ports=(5000, 8765, 11434)):
        """Check whether the required service ports are listening"""
        results = {}
        for port in ports:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            result = sock.connect_ex(('localhost', port))
            sock.close()
            
            results[port] = {
                'status': 'open' if result == 0 else 'closed',
                'service': self.get_service_name(port)
            }
        
        return results
    
    def get_service_name(self, port):
        """Get service name for port"""
        service_map = {
            5000: 'Flask API',
            8765: 'WebSocket Server',
            11434: 'Ollama API'
        }
        return service_map.get(port, 'Unknown')
    
    def run_full_diagnostic(self):
        """Run comprehensive diagnostic check"""
        results = {
            'timestamp': datetime.now().isoformat(),
            'tests': {}
        }
        
        # Test Ollama connection
        results['tests']['ollama_connection'] = self.test_ollama_connection()
        
        # Test model inference if Ollama is connected
        if results['tests']['ollama_connection']['status'] == 'success':
            results['tests']['model_inference'] = self.test_model_inference()
        
        # Test port availability
        results['tests']['port_availability'] = self.test_port_availability()
        
        # System resource check
        results['tests']['system_resources'] = self.check_system_resources()
        
        return results
    
    def check_system_resources(self):
        """Check system resource availability"""
        try:
            import psutil
            
            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
            
            return {
                'status': 'success',
                'cpu_usage': cpu_percent,
                'memory_usage': memory.percent,
                'memory_available': memory.available / (1024**3),  # GB
                'disk_usage': disk.percent,
                'disk_free': disk.free / (1024**3)  # GB
            }
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Resource check failed: {str(e)}'
            }

Advanced Configuration Options

Environment Configuration

# config.py
import os
from dataclasses import dataclass, field

@dataclass
class OllamaConfig:
    """Ollama service configuration"""
    host: str = "localhost"
    port: int = 11434
    model: str = "llama2"
    timeout: int = 30
    max_tokens: int = 2048
    temperature: float = 0.7
    
    @classmethod
    def from_env(cls):
        return cls(
            host=os.getenv('OLLAMA_HOST', 'localhost'),
            port=int(os.getenv('OLLAMA_PORT', '11434')),
            model=os.getenv('OLLAMA_MODEL', 'llama2'),
            timeout=int(os.getenv('OLLAMA_TIMEOUT', '30')),
            max_tokens=int(os.getenv('OLLAMA_MAX_TOKENS', '2048')),
            temperature=float(os.getenv('OLLAMA_TEMPERATURE', '0.7'))
        )

@dataclass
class TradingConfig:
    """Trading-specific configuration"""
    risk_per_trade: float = 0.02  # 2% risk per trade
    max_positions: int = 5
    stop_loss_pct: float = 0.05  # 5% stop loss
    take_profit_pct: float = 0.10  # 10% take profit
    min_confidence: float = 0.7  # Minimum AI confidence for signals
    
    @classmethod
    def from_env(cls):
        return cls(
            risk_per_trade=float(os.getenv('RISK_PER_TRADE', '0.02')),
            max_positions=int(os.getenv('MAX_POSITIONS', '5')),
            stop_loss_pct=float(os.getenv('STOP_LOSS_PCT', '0.05')),
            take_profit_pct=float(os.getenv('TAKE_PROFIT_PCT', '0.10')),
            min_confidence=float(os.getenv('MIN_CONFIDENCE', '0.7'))
        )

@dataclass
class AppConfig:
    """Main application configuration"""
    debug: bool = False
    secret_key: str = "your-secret-key-here"
    database_url: str = "sqlite:///trading.db"
    redis_url: str = "redis://localhost:6379"
    
    # Nested dataclass defaults need default_factory; a bare OllamaConfig()
    # here would be a shared mutable default and raises ValueError on
    # recent Python versions
    ollama: OllamaConfig = field(default_factory=OllamaConfig)
    trading: TradingConfig = field(default_factory=TradingConfig)
    
    @classmethod
    def from_env(cls):
        return cls(
            debug=os.getenv('DEBUG', 'false').lower() == 'true',
            secret_key=os.getenv('SECRET_KEY', 'your-secret-key-here'),
            database_url=os.getenv('DATABASE_URL', 'sqlite:///trading.db'),
            redis_url=os.getenv('REDIS_URL', 'redis://localhost:6379'),
            ollama=OllamaConfig.from_env(),
            trading=TradingConfig.from_env()
        )
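A quick sanity check of the `from_env` pattern: environment variables override the dataclass defaults, and anything unset falls back. The sketch below inlines a trimmed-down `OllamaConfig` so it runs on its own, outside the project:

```python
# config_demo.py - standalone check of the from_env override pattern
import os
from dataclasses import dataclass

@dataclass
class OllamaConfig:
    host: str = "localhost"
    port: int = 11434
    model: str = "llama2"

    @classmethod
    def from_env(cls):
        # Environment values win; the os.getenv fallbacks fill the gaps
        return cls(
            host=os.getenv('OLLAMA_HOST', 'localhost'),
            port=int(os.getenv('OLLAMA_PORT', '11434')),
            model=os.getenv('OLLAMA_MODEL', 'llama2'),
        )

if __name__ == "__main__":
    os.environ['OLLAMA_PORT'] = '8080'   # simulate a deployment override
    config = OllamaConfig.from_env()
    print(config.port)   # overridden by the environment
    print(config.model)  # falls back to the dataclass default
```

Keeping every tunable behind an environment variable like this lets the same image run against a laptop Ollama instance or a dedicated inference host without code changes.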

Performance Optimization Strategies

Model Response Caching

# model_cache.py
import redis
import json
import time
import hashlib

class ModelCache:
    def __init__(self, redis_url='redis://localhost:6379'):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 300  # 5 minutes
    
    def generate_key(self, model_name, prompt, parameters=None):
        """Generate cache key for model request"""
        key_data = {
            'model': model_name,
            'prompt': prompt,
            'parameters': parameters or {}
        }
        key_string = json.dumps(key_data, sort_keys=True)
        return f"model_cache:{hashlib.md5(key_string.encode()).hexdigest()}"
    
    def get(self, model_name, prompt, parameters=None):
        """Get cached model response"""
        key = self.generate_key(model_name, prompt, parameters)
        cached_data = self.redis_client.get(key)
        
        if cached_data:
            return json.loads(cached_data)
        return None
    
    def set(self, model_name, prompt, response, parameters=None, ttl=None):
        """Cache model response"""
        key = self.generate_key(model_name, prompt, parameters)
        cache_data = {
            'response': response,
            'timestamp': time.time(),
            'parameters': parameters
        }
        
        self.redis_client.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(cache_data)
        )
    
    def invalidate_pattern(self, pattern):
        """Invalidate cache entries matching pattern"""
        keys = self.redis_client.keys(f"model_cache:{pattern}*")
        if keys:
            self.redis_client.delete(*keys)
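The key scheme above only deduplicates requests if it is deterministic. This standalone sketch reproduces just the `generate_key` logic (no Redis needed) to show that identical requests hash to the same key regardless of parameter ordering, while any change to the prompt produces a new one:

```python
# cache_key_demo.py - deterministic cache keys without a Redis dependency
import hashlib
import json

def generate_key(model_name, prompt, parameters=None):
    """Same scheme as ModelCache.generate_key: canonical JSON, then MD5."""
    key_data = {
        'model': model_name,
        'prompt': prompt,
        'parameters': parameters or {}
    }
    # sort_keys makes the JSON canonical, so dict ordering can't change the key
    key_string = json.dumps(key_data, sort_keys=True)
    return f"model_cache:{hashlib.md5(key_string.encode()).hexdigest()}"

if __name__ == "__main__":
    a = generate_key('llama2', 'Analyze BTCUSD', {'temperature': 0.7})
    b = generate_key('llama2', 'Analyze BTCUSD', {'temperature': 0.7})
    c = generate_key('llama2', 'Analyze ETHUSD', {'temperature': 0.7})
    print(a == b)  # True - identical requests share one cache entry
    print(a == c)  # False - a different prompt gets its own key
```

Note that exact-match keying like this only helps with repeated identical prompts; two prompts that differ by one character are separate cache entries.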

Async Processing with Celery

# tasks.py
from celery import Celery
import pandas as pd
from datetime import datetime

# Initialize Celery
celery_app = Celery('trading_tasks', broker='redis://localhost:6379')

@celery_app.task
def process_historical_data(symbol, start_date, end_date):
    """Process historical data asynchronously"""
    try:
        # Fetch historical data (implement your data source)
        data = fetch_market_data(symbol, start_date, end_date)
        
        # Process with AI model
        signals = []
        for i in range(len(data)):
            if i >= 50:  # Minimum data for analysis
                window_data = data.iloc[i-50:i]
                signal = analyze_with_ai(window_data)
                signals.append({
                    'timestamp': data.iloc[i]['timestamp'],
                    'signal': signal['signal'],
                    'confidence': signal['confidence']
                })
        
        # Store results
        store_signals(symbol, signals)
        
        return {
            'status': 'success',
            'symbol': symbol,
            'signals_generated': len(signals),
            'processed_at': datetime.now().isoformat()
        }
        
    except Exception as e:
        return {
            'status': 'error',
            'error': str(e),
            'symbol': symbol
        }

@celery_app.task
def retrain_model(training_data_path):
    """Retrain AI model with new data"""
    try:
        # Load training data
        df = pd.read_csv(training_data_path)
        
        # Retrain model (implement your training logic)
        model_metrics = train_model(df)
        
        # Deploy new model
        deploy_model()
        
        return {
            'status': 'success',
            'metrics': model_metrics,
            'retrained_at': datetime.now().isoformat()
        }
        
    except Exception as e:
        return {
            'status': 'error',
            'error': str(e)
        }

Binance API Integration

# binance_integration.py
import ccxt
import pandas as pd
from datetime import datetime, timedelta

class BinanceIntegration:
    def __init__(self, api_key=None, api_secret=None, sandbox=True):
        self.exchange = ccxt.binance({
            'apiKey': api_key,
            'secret': api_secret,
            'enableRateLimit': True,
        })
        if sandbox:
            # Route all requests to Binance's testnet
            self.exchange.set_sandbox_mode(True)
    
    def fetch_ohlcv(self, symbol, timeframe='1h', limit=100):
        """Fetch OHLCV data from Binance"""
        try:
            ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
            df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            return df
        except Exception as e:
            print(f"Error fetching OHLCV data: {e}")
            return None
    
    def place_order(self, symbol, side, amount, price=None, order_type='market'):
        """Place order on Binance"""
        try:
            if order_type == 'market':
                order = self.exchange.create_market_order(symbol, side, amount)
            else:
                order = self.exchange.create_limit_order(symbol, side, amount, price)
            
            return {
                'success': True,
                'order_id': order['id'],
                'symbol': symbol,
                'side': side,
                'amount': amount,
                'price': price,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
    
    def get_account_balance(self):
        """Get account balance"""
        try:
            balance = self.exchange.fetch_balance()
            return {
                'success': True,
                'balance': balance['total'],
                'free': balance['free'],
                'used': balance['used']
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

Best Practices and Recommendations

Code Organization

Structure your project for maintainability:

tradingview-ollama/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── models/
│   │   ├── __init__.py
│   │   ├── ai_models.py
│   │   └── trading_models.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── ollama_service.py
│   │   ├── trading_service.py
│   │   └── market_data_service.py
│   ├── utils/
│   │   ├── __init__.py
│   │   ├── cache.py
│   │   ├── security.py
│   │   └── diagnostics.py
│   └── config/
│       ├── __init__.py
│       ├── settings.py
│       └── logging.py
├── tests/
│   ├── __init__.py
│   ├── test_models.py
│   ├── test_services.py
│   └── test_integration.py
├── scripts/
│   ├── deploy.sh
│   ├── backup.sh
│   └── maintenance.py
├── docker/
│   ├── Dockerfile
│   ├── docker-compose.yml
│   └── requirements.txt
└── docs/
    ├── API.md
    ├── DEPLOYMENT.md
    └── TROUBLESHOOTING.md

Testing Strategy

# test_integration.py
import unittest
import requests
import json
from unittest.mock import patch, MagicMock

class TestTradingViewOllamaIntegration(unittest.TestCase):
    def setUp(self):
        self.base_url = "http://localhost:5000"
        self.test_data = {
            'prices': [100, 101, 102, 103, 104, 105],
            'volumes': [1000, 1100, 1200, 1300, 1400, 1500],
            'symbol': 'BTCUSD',
            'timeframe': '1h'
        }
    
    def test_api_health_check(self):
        """Test API health endpoint"""
        response = requests.get(f"{self.base_url}/health")
        self.assertEqual(response.status_code, 200)
        
        data = response.json()
        self.assertIn('status', data)
        self.assertEqual(data['status'], 'healthy')
    
    @patch('requests.post')
    def test_ollama_integration(self, mock_post):
        """Test the analyze endpoint with the HTTP call mocked out"""
        # Patching requests.post here intercepts this test's own HTTP call,
        # so the mock must return the analyze endpoint's response shape
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            'success': True,
            'signals': [{'signal': 'buy', 'confidence': 0.8}]
        }
        mock_post.return_value = mock_response
        
        # Test analysis endpoint
        response = requests.post(
            f"{self.base_url}/analyze",
            json=self.test_data
        )
        
        self.assertEqual(response.status_code, 200)
        data = response.json()
        self.assertTrue(data['success'])
        self.assertIn('signals', data)
    
    def test_invalid_data_handling(self):
        """Test handling of invalid input data"""
        invalid_data = {'prices': []}  # Empty prices array
        
        response = requests.post(
            f"{self.base_url}/analyze",
            json=invalid_data
        )
        
        self.assertEqual(response.status_code, 200)
        data = response.json()
        self.assertFalse(data['success'])
        self.assertIn('error', data)
    
    def test_rate_limiting(self):
        """Test rate limiting functionality"""
        # Make multiple rapid requests
        responses = []
        for i in range(65):  # Exceed rate limit
            response = requests.post(
                f"{self.base_url}/analyze",
                json=self.test_data
            )
            responses.append(response)
        
        # Check that some requests were rate limited
        rate_limited = [r for r in responses if r.status_code == 429]
        self.assertTrue(len(rate_limited) > 0)

if __name__ == '__main__':
    unittest.main()

Conclusion

Integrating TradingView with Ollama creates powerful possibilities for AI-enhanced trading indicators. This combination provides local AI processing, maintaining data privacy while delivering sophisticated market analysis capabilities.

Key benefits of this integration include:

Enhanced Decision Making: AI models can identify complex patterns that traditional indicators miss, improving signal accuracy and timing.

Privacy and Security: Local processing through Ollama keeps sensitive trading data on your systems, avoiding cloud-based AI services.

Customization Flexibility: You can train models specifically for your trading strategy, market conditions, and risk tolerance.

Real-time Processing: The integration supports live market data analysis, enabling responsive trading decisions.

Scalable Architecture: The modular design allows for easy expansion and modification as your trading needs evolve.

Remember to thoroughly backtest any AI-enhanced indicators before live trading. Start with paper trading to validate performance and gradually increase position sizes as confidence builds.
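A backtest does not need a framework to be useful as a first pass. The sketch below is a deliberately simple, hypothetical example (made-up prices and signals, no fees, slippage, or position sizing) of replaying stored AI signals over a price series to estimate a strategy's return before any capital is at risk:

```python
# backtest_sketch.py - minimal signal-replay backtest (no fees or slippage)

def backtest(prices, signals, initial_cash=10_000.0):
    """Replay buy/sell signals over a price series; returns final equity."""
    cash, units = initial_cash, 0.0
    for price, signal in zip(prices, signals):
        if signal == 'buy' and cash > 0:
            units, cash = cash / price, 0.0     # go all-in on a buy
        elif signal == 'sell' and units > 0:
            cash, units = units * price, 0.0    # flatten on a sell
    # Mark any open position to the last price
    return cash + units * prices[-1]

if __name__ == "__main__":
    prices = [100, 102, 105, 103, 108]            # hypothetical closes
    signals = ['buy', 'hold', 'hold', 'hold', 'sell']
    final = backtest(prices, signals)
    print(f"Final equity: {final:.2f}")           # bought at 100, sold at 108
```

Even a toy loop like this catches signal streams that lose money on clean historical data, which is the cheapest possible place to find out.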

This integration represents the future of algorithmic trading—where artificial intelligence enhances human decision-making rather than replacing it. The combination of TradingView's visualization capabilities with Ollama's AI processing creates a powerful toolkit for modern traders.

For ongoing development, consider implementing continuous learning systems that adapt to changing market conditions, and always maintain proper risk management regardless of AI confidence levels.

The trading landscape continues evolving, and tools like these help traders stay competitive while maintaining control over their data and strategies.