Ever stared at a chart wondering if your trading indicator could be smarter? What if your moving averages could learn from market patterns, or your RSI could adapt to volatility changes? Welcome to the world where artificial intelligence meets technical analysis.
Combining TradingView's powerful charting platform with Ollama's local AI capabilities opens new doors for custom indicator development. This integration lets you create indicators that don't just calculate—they learn, adapt, and evolve with market conditions.
What You'll Learn
This guide shows you how to build AI-enhanced trading indicators using TradingView's Pine Script and Ollama's machine learning models. You'll create indicators that process market data through AI models for better signal generation and pattern recognition.
Understanding TradingView and Ollama Integration
TradingView Platform Overview
TradingView serves as your charting foundation with Pine Script, its proprietary programming language. Pine Script handles data visualization, user interactions, and basic calculations. However, it lacks advanced machine learning capabilities—where Ollama fills the gap.
Ollama's Role in Trading Analysis
Ollama provides local AI model execution without cloud dependencies. This setup ensures data privacy and reduces latency for real-time trading decisions. The integration works through HTTP requests from Pine Script to your local Ollama instance.
Integration Architecture
The system architecture follows this flow:
- TradingView collects market data
- Pine Script processes initial calculations
- Data sends to Ollama via HTTP requests
- Ollama processes data through AI models
- Results return to TradingView for visualization
Prerequisites and Setup Requirements
System Requirements
Your development environment needs:
- Windows 10/11, macOS 10.14+, or Linux Ubuntu 18.04+
- 8GB RAM minimum (16GB recommended)
- 10GB free disk space
- Stable internet connection
- Python 3.8 or higher
Installing Ollama
Download and install Ollama from the official website:
# Linux/macOS
curl -fsSL https://ollama.ai/install.sh | sh
# Windows
# Download installer from https://ollama.ai/download
Verify installation:
ollama --version
Setting Up Development Environment
Create a project directory and install required dependencies:
mkdir tradingview-ollama
cd tradingview-ollama
pip install requests flask numpy pandas
Creating Your First AI-Enhanced Indicator
Basic HTTP Bridge Setup
Create a Flask application to handle requests between TradingView and Ollama:
# app.py
from flask import Flask, request, jsonify
import requests
import json
import numpy as np
app = Flask(__name__)
@app.route('/analyze', methods=['POST'])
def analyze_market_data():
"""
Processes market data through Ollama AI model
Returns trading signals and confidence scores
"""
try:
data = request.json
prices = data.get('prices', [])
volumes = data.get('volumes', [])
# Prepare data for AI analysis
market_context = {
"prices": prices,
"volumes": volumes,
"timeframe": data.get('timeframe', '1h'),
"symbol": data.get('symbol', 'UNKNOWN')
}
# Send to Ollama for analysis
ollama_response = requests.post('http://localhost:11434/api/generate',
json={
"model": "llama2",
"prompt": f"Analyze this market data and provide trading signals: {json.dumps(market_context)}",
"stream": False
}
)
if ollama_response.status_code == 200:
ai_analysis = ollama_response.json()
# Process AI response into trading signals
signals = parse_ai_signals(ai_analysis['response'])
return jsonify({
"success": True,
"signals": signals,
"confidence": calculate_confidence(signals)
})
else:
return jsonify({"success": False, "error": "AI analysis failed"})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
def parse_ai_signals(ai_response):
"""
Converts AI text response into structured trading signals
"""
# Simple signal extraction (enhance based on your AI model)
signals = {
"direction": "neutral",
"strength": 0.5,
"stop_loss": 0,
"take_profit": 0
}
# Basic keyword analysis
if "bullish" in ai_response.lower() or "buy" in ai_response.lower():
signals["direction"] = "bullish"
signals["strength"] = 0.7
elif "bearish" in ai_response.lower() or "sell" in ai_response.lower():
signals["direction"] = "bearish"
signals["strength"] = 0.7
return signals
def calculate_confidence(signals):
"""
Calculates confidence score based on signal strength
"""
return min(signals["strength"] * 100, 100)
if __name__ == '__main__':
app.run(debug=True, port=5000)
Pine Script Integration Code
Create a Pine Script indicator that communicates with your Flask bridge:
//@version=5
indicator("AI Enhanced RSI", shorttitle="AI-RSI", overlay=false)
// Input parameters
rsi_length = input.int(14, "RSI Length", minval=1)
ai_lookback = input.int(50, "AI Analysis Lookback", minval=10)
// Calculate traditional RSI
rsi = ta.rsi(close, rsi_length)
// Prepare data for AI analysis
var prices = array.new<float>()
var volumes = array.new<float>()
// Collect recent price and volume data
if barstate.isconfirmed
array.push(prices, close)
array.push(volumes, volume)
// Maintain lookback window
if array.size(prices) > ai_lookback
array.shift(prices)
array.shift(volumes)
// AI Analysis Function
ai_signal(price_data, volume_data) =>
// Convert arrays to string format for HTTP request
price_str = str.tostring(array.get(price_data, 0))
for i = 1 to array.size(price_data) - 1
price_str := price_str + "," + str.tostring(array.get(price_data, i))
volume_str = str.tostring(array.get(volume_data, 0))
for i = 1 to array.size(volume_data) - 1
volume_str := volume_str + "," + str.tostring(array.get(volume_data, i))
// Prepare request payload
payload = '{"prices":[' + price_str + '],"volumes":[' + volume_str + '],"symbol":"' + syminfo.ticker + '","timeframe":"' + timeframe.period + '"}'
// Send HTTP request to Flask bridge
response = request.post("http://localhost:5000/analyze",
headers=map.new<string, string>(),
body=payload)
// Parse response (simplified)
signal_strength = 0.5
if str.contains(response, "bullish")
signal_strength := 0.8
else if str.contains(response, "bearish")
signal_strength := 0.2
signal_strength
// Get AI-enhanced signal
ai_strength = ai_signal(prices, volumes)
// Combine traditional RSI with AI signals
enhanced_rsi = rsi * ai_strength
// Plot indicators
plot(rsi, "Traditional RSI", color.gray, linewidth=1)
plot(enhanced_rsi, "AI Enhanced RSI", color.blue, linewidth=2)
// Add signal zones
hline(70, "Overbought", color.red, linestyle=hline.style_dashed)
hline(30, "Oversold", color.green, linestyle=hline.style_dashed)
hline(50, "Midline", color.gray, linestyle=hline.style_dotted)
// Background highlighting for strong signals
bgcolor(enhanced_rsi > 70 and ai_strength > 0.7 ? color.new(color.red, 80) : na)
bgcolor(enhanced_rsi < 30 and ai_strength > 0.7 ? color.new(color.green, 80) : na)
Advanced AI Model Integration
Custom Model Training
Train a specialized model for your trading strategy:
# model_trainer.py
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import pickle
class TradingModelTrainer:
def __init__(self):
self.model = RandomForestClassifier(n_estimators=100, random_state=42)
self.scaler = StandardScaler()
def prepare_features(self, df):
"""
Creates technical indicators as features
"""
# Price-based features
df['sma_10'] = df['close'].rolling(10).mean()
df['sma_20'] = df['close'].rolling(20).mean()
df['rsi'] = self.calculate_rsi(df['close'])
df['macd'] = self.calculate_macd(df['close'])
# Volume features
df['volume_sma'] = df['volume'].rolling(10).mean()
df['volume_ratio'] = df['volume'] / df['volume_sma']
# Volatility features
df['volatility'] = df['close'].rolling(20).std()
# Price change features
df['price_change'] = df['close'].pct_change()
df['price_change_5'] = df['close'].pct_change(5)
return df
def calculate_rsi(self, prices, period=14):
"""RSI calculation"""
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
return 100 - (100 / (1 + rs))
def calculate_macd(self, prices):
"""MACD calculation"""
exp1 = prices.ewm(span=12).mean()
exp2 = prices.ewm(span=26).mean()
return exp1 - exp2
def create_labels(self, df, forward_periods=5, threshold=0.02):
"""
Creates trading labels based on future price movements
"""
future_returns = df['close'].shift(-forward_periods) / df['close'] - 1
labels = np.where(future_returns > threshold, 1, # Buy signal
np.where(future_returns < -threshold, -1, 0)) # Sell signal, Hold
return labels
def train_model(self, data_file):
"""
Trains the trading model
"""
df = pd.read_csv(data_file)
df = self.prepare_features(df)
# Create labels
labels = self.create_labels(df)
# Select features
feature_columns = ['sma_10', 'sma_20', 'rsi', 'macd', 'volume_ratio',
'volatility', 'price_change', 'price_change_5']
# Remove NaN values
df = df.dropna()
features = df[feature_columns]
labels = labels[:len(features)]
# Scale features
features_scaled = self.scaler.fit_transform(features)
# Train model
self.model.fit(features_scaled, labels)
# Save model and scaler
with open('trading_model.pkl', 'wb') as f:
pickle.dump(self.model, f)
with open('scaler.pkl', 'wb') as f:
pickle.dump(self.scaler, f)
print("Model training completed successfully!")
def predict_signal(self, features):
"""
Predicts trading signal for given features
"""
features_scaled = self.scaler.transform([features])
prediction = self.model.predict(features_scaled)[0]
probability = self.model.predict_proba(features_scaled)[0]
return {
'signal': int(prediction),
'confidence': max(probability)
}
# Usage example
if __name__ == "__main__":
trainer = TradingModelTrainer()
trainer.train_model('market_data.csv')
Enhanced Flask Bridge with Custom Model
Update your Flask application to use the trained model:
# enhanced_app.py
from flask import Flask, request, jsonify
import pickle
import numpy as np
import pandas as pd
app = Flask(__name__)
# Load trained model and scaler
with open('trading_model.pkl', 'rb') as f:
model = pickle.load(f)
with open('scaler.pkl', 'rb') as f:
scaler = pickle.load(f)
@app.route('/analyze_advanced', methods=['POST'])
def analyze_advanced():
"""
Advanced analysis using trained ML model
"""
try:
data = request.json
prices = data.get('prices', [])
volumes = data.get('volumes', [])
if len(prices) < 26: # Minimum data for technical indicators
return jsonify({"success": False, "error": "Insufficient data"})
# Create DataFrame
df = pd.DataFrame({
'close': prices,
'volume': volumes
})
# Calculate features
features = calculate_features(df)
# Get latest feature values
latest_features = [
features['sma_10'].iloc[-1],
features['sma_20'].iloc[-1],
features['rsi'].iloc[-1],
features['macd'].iloc[-1],
features['volume_ratio'].iloc[-1],
features['volatility'].iloc[-1],
features['price_change'].iloc[-1],
features['price_change_5'].iloc[-1]
]
# Make prediction
features_scaled = scaler.transform([latest_features])
prediction = model.predict(features_scaled)[0]
probabilities = model.predict_proba(features_scaled)[0]
# Convert to trading signals
signal_map = {-1: "sell", 0: "hold", 1: "buy"}
return jsonify({
"success": True,
"signal": signal_map[prediction],
"confidence": float(max(probabilities)),
"probabilities": {
"sell": float(probabilities[0]),
"hold": float(probabilities[1]),
"buy": float(probabilities[2])
}
})
except Exception as e:
return jsonify({"success": False, "error": str(e)})
def calculate_features(df):
"""Calculate technical indicators"""
df['sma_10'] = df['close'].rolling(10).mean()
df['sma_20'] = df['close'].rolling(20).mean()
df['rsi'] = calculate_rsi(df['close'])
df['macd'] = calculate_macd(df['close'])
df['volume_sma'] = df['volume'].rolling(10).mean()
df['volume_ratio'] = df['volume'] / df['volume_sma']
df['volatility'] = df['close'].rolling(20).std()
df['price_change'] = df['close'].pct_change()
df['price_change_5'] = df['close'].pct_change(5)
return df
def calculate_rsi(prices, period=14):
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
return 100 - (100 / (1 + rs))
def calculate_macd(prices):
exp1 = prices.ewm(span=12).mean()
exp2 = prices.ewm(span=26).mean()
return exp1 - exp2
if __name__ == '__main__':
app.run(debug=True, port=5000)
Real-Time Data Processing
WebSocket Integration
Implement real-time data streaming for live trading:
# websocket_handler.py
import asyncio
import websockets
import json
import pandas as pd
from datetime import datetime
class RealTimeProcessor:
def __init__(self):
self.clients = set()
self.data_buffer = []
self.model = None # Load your trained model here
async def register_client(self, websocket, path):
"""Register new WebSocket client"""
self.clients.add(websocket)
try:
await websocket.wait_closed()
finally:
self.clients.remove(websocket)
async def process_market_data(self, data):
"""Process incoming market data"""
self.data_buffer.append(data)
# Keep only last 100 data points
if len(self.data_buffer) > 100:
self.data_buffer.pop(0)
# Process if we have enough data
if len(self.data_buffer) >= 50:
signal = await self.generate_signal()
await self.broadcast_signal(signal)
async def generate_signal(self):
"""Generate trading signal from buffered data"""
df = pd.DataFrame(self.data_buffer)
# Calculate features (reuse from previous functions)
features = calculate_features(df)
# Get prediction
# ... prediction logic here ...
return {
'timestamp': datetime.now().isoformat(),
'signal': 'buy', # or 'sell', 'hold'
'confidence': 0.85,
'price': df['close'].iloc[-1]
}
async def broadcast_signal(self, signal):
"""Broadcast signal to all connected clients"""
if self.clients:
message = json.dumps(signal)
await asyncio.gather(
*[client.send(message) for client in self.clients],
return_exceptions=True
)
# Start WebSocket server
async def main():
processor = RealTimeProcessor()
start_server = websockets.serve(
processor.register_client,
"localhost",
8765
)
await start_server
if __name__ == "__main__":
asyncio.run(main())
Performance Optimization
Caching Strategies
Implement caching to reduce API calls and improve response times:
# cache_manager.py
import time
import json
from functools import wraps
class CacheManager:
def __init__(self, ttl=300): # 5 minutes default TTL
self.cache = {}
self.ttl = ttl
def get_cache_key(self, data):
"""Generate cache key from data"""
return hash(json.dumps(data, sort_keys=True))
def get(self, key):
"""Get cached value if not expired"""
if key in self.cache:
value, timestamp = self.cache[key]
if time.time() - timestamp < self.ttl:
return value
else:
del self.cache[key]
return None
def set(self, key, value):
"""Set cache value with timestamp"""
self.cache[key] = (value, time.time())
def clear_expired(self):
"""Clear expired cache entries"""
current_time = time.time()
expired_keys = [
key for key, (value, timestamp) in self.cache.items()
if current_time - timestamp >= self.ttl
]
for key in expired_keys:
del self.cache[key]
# Cache decorator
def cached_analysis(cache_manager):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Create cache key from arguments
cache_key = cache_manager.get_cache_key({
'args': args,
'kwargs': kwargs
})
# Check cache first
cached_result = cache_manager.get(cache_key)
if cached_result:
return cached_result
# Execute function and cache result
result = func(*args, **kwargs)
cache_manager.set(cache_key, result)
return result
return wrapper
return decorator
Error Handling and Monitoring
Comprehensive Error Management
# error_handler.py
import logging
import traceback
from datetime import datetime
from functools import wraps
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('trading_integration.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class TradingError(Exception):
"""Custom exception for trading-related errors"""
pass
class ModelError(TradingError):
"""Model-specific errors"""
pass
class DataError(TradingError):
"""Data-related errors"""
pass
def error_handler(func):
"""Decorator for comprehensive error handling"""
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except DataError as e:
logger.error(f"Data error in {func.__name__}: {str(e)}")
return {"success": False, "error": "Data processing failed", "details": str(e)}
except ModelError as e:
logger.error(f"Model error in {func.__name__}: {str(e)}")
return {"success": False, "error": "Model prediction failed", "details": str(e)}
except Exception as e:
logger.error(f"Unexpected error in {func.__name__}: {str(e)}")
logger.error(traceback.format_exc())
return {"success": False, "error": "Internal server error", "details": str(e)}
return wrapper
class PerformanceMonitor:
def __init__(self):
self.metrics = {
'requests_count': 0,
'success_count': 0,
'error_count': 0,
'avg_response_time': 0,
'last_reset': datetime.now()
}
def record_request(self, success=True, response_time=0):
"""Record request metrics"""
self.metrics['requests_count'] += 1
if success:
self.metrics['success_count'] += 1
else:
self.metrics['error_count'] += 1
# Update average response time
current_avg = self.metrics['avg_response_time']
self.metrics['avg_response_time'] = (
(current_avg * (self.metrics['requests_count'] - 1) + response_time) /
self.metrics['requests_count']
)
def get_metrics(self):
"""Get current performance metrics"""
return {
**self.metrics,
'success_rate': self.metrics['success_count'] / max(self.metrics['requests_count'], 1) * 100
}
def reset_metrics(self):
"""Reset all metrics"""
self.metrics = {
'requests_count': 0,
'success_count': 0,
'error_count': 0,
'avg_response_time': 0,
'last_reset': datetime.now()
}
Testing and Validation
Backtesting Framework
Create a backtesting system to validate your AI indicators:
# backtester.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class AIIndicatorBacktester:
def __init__(self, initial_capital=10000):
self.initial_capital = initial_capital
self.capital = initial_capital
self.positions = []
self.trades = []
self.equity_curve = []
def run_backtest(self, data, signals, transaction_cost=0.001):
"""
Run backtest with AI-generated signals
"""
position = 0
entry_price = 0
for i, row in data.iterrows():
current_price = row['close']
signal = signals.get(i, 'hold')
# Calculate current equity
current_equity = self.capital
if position != 0:
current_equity += position * (current_price - entry_price)
self.equity_curve.append({
'timestamp': row['timestamp'],
'equity': current_equity,
'price': current_price,
'signal': signal
})
# Process signals
if signal == 'buy' and position <= 0:
if position < 0: # Close short position
pnl = position * (entry_price - current_price)
self.capital += pnl - (abs(position) * current_price * transaction_cost)
self.record_trade('cover', current_price, position, pnl)
# Open long position
position = self.capital / current_price * 0.95 # 95% of capital
entry_price = current_price
self.capital -= position * current_price * (1 + transaction_cost)
self.record_trade('buy', current_price, position, 0)
elif signal == 'sell' and position >= 0:
if position > 0: # Close long position
pnl = position * (current_price - entry_price)
self.capital += pnl - (position * current_price * transaction_cost)
self.record_trade('sell', current_price, position, pnl)
# Open short position
position = -(self.capital / current_price * 0.95)
entry_price = current_price
self.capital -= abs(position) * current_price * (1 + transaction_cost)
self.record_trade('short', current_price, position, 0)
# Close final position
if position != 0:
final_price = data.iloc[-1]['close']
pnl = position * (final_price - entry_price)
self.capital += pnl - (abs(position) * final_price * transaction_cost)
self.record_trade('close', final_price, position, pnl)
return self.calculate_performance_metrics()
def record_trade(self, action, price, quantity, pnl):
"""Record trade details"""
self.trades.append({
'action': action,
'price': price,
'quantity': quantity,
'pnl': pnl,
'timestamp': datetime.now()
})
def calculate_performance_metrics(self):
"""Calculate comprehensive performance metrics"""
equity_df = pd.DataFrame(self.equity_curve)
# Basic metrics
total_return = (self.capital / self.initial_capital - 1) * 100
# Calculate returns
equity_df['returns'] = equity_df['equity'].pct_change()
# Risk metrics
sharpe_ratio = self.calculate_sharpe_ratio(equity_df['returns'])
max_drawdown = self.calculate_max_drawdown(equity_df['equity'])
# Trade statistics
winning_trades = [t for t in self.trades if t['pnl'] > 0]
losing_trades = [t for t in self.trades if t['pnl'] < 0]
win_rate = len(winning_trades) / len(self.trades) * 100 if self.trades else 0
return {
'total_return': total_return,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'win_rate': win_rate,
'total_trades': len(self.trades),
'winning_trades': len(winning_trades),
'losing_trades': len(losing_trades),
'final_capital': self.capital,
'equity_curve': equity_df
}
def calculate_sharpe_ratio(self, returns, risk_free_rate=0.02):
"""Calculate Sharpe ratio"""
if returns.std() == 0:
return 0
excess_returns = returns.mean() - risk_free_rate / 252
return excess_returns / returns.std() * np.sqrt(252)
def calculate_max_drawdown(self, equity):
"""Calculate maximum drawdown"""
peak = equity.expanding().max()
drawdown = (equity - peak) / peak
return drawdown.min() * 100
Deployment and Production Considerations
Docker Configuration
Create a containerized deployment setup:
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
# Install Ollama
RUN curl -fsSL https://ollama.ai/install.sh | sh
# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Expose ports
EXPOSE 5000 8765
# Start script
COPY start.sh .
RUN chmod +x start.sh
CMD ["./start.sh"]
# start.sh
#!/bin/bash
# Start Ollama service
ollama serve &
# Wait for Ollama to be ready
sleep 10
# Pull required models
ollama pull llama2
# Start Flask application
python enhanced_app.py &
# Start WebSocket server
python websocket_handler.py &
# Keep container running
wait
Production Monitoring
# monitoring.py
import psutil
import time
import json
from datetime import datetime
class SystemMonitor:
def __init__(self):
self.metrics_history = []
def collect_metrics(self):
"""Collect system performance metrics"""
metrics = {
'timestamp': datetime.now().isoformat(),
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_usage': psutil.disk_usage('/').percent,
'network_io': psutil.net_io_counters()._asdict(),
'process_count': len(psutil.pids())
}
self.metrics_history.append(metrics)
# Keep only last 1000 metrics (about 16 minutes at 1-second intervals)
if len(self.metrics_history) > 1000:
self.metrics_history.pop(0)
return metrics
def get_health_status(self):
"""Get current system health status"""
if not self.metrics_history:
return {'status': 'unknown', 'message': 'No metrics available'}
latest = self.metrics_history[-1]
# Define health thresholds
if latest['cpu_percent'] > 80 or latest['memory_percent'] > 85:
return {'status': 'critical', 'message': 'High resource usage detected'}
elif latest['cpu_percent'] > 60 or latest['memory_percent'] > 70:
return {'status': 'warning', 'message': 'Moderate resource usage'}
else:
return {'status': 'healthy', 'message': 'System running normally'}
def generate_report(self):
"""Generate performance report"""
if len(self.metrics_history) < 2:
return {'error': 'Insufficient data for report'}
recent_metrics = self.metrics_history[-60:] # Last 60 measurements
avg_cpu = sum(m['cpu_percent'] for m in recent_metrics) / len(recent_metrics)
avg_memory = sum(m['memory_percent'] for m in recent_metrics) / len(recent_metrics)
return {
'average_cpu_usage': round(avg_cpu, 2),
'average_memory_usage': round(avg_memory, 2),
'current_disk_usage': recent_metrics[-1]['disk_usage'],
'uptime_minutes': len(self.metrics_history),
'health_status': self.get_health_status()
}
Security Best Practices
API Security Implementation
# security.py
import jwt
import hashlib
import hmac
import time
from functools import wraps
from flask import request, jsonify
class SecurityManager:
def __init__(self, secret_key):
self.secret_key = secret_key
self.rate_limits = {}
def generate_api_key(self, user_id):
"""Generate API key for user"""
payload = {
'user_id': user_id,
'created_at': time.time(),
'expires_at': time.time() + 86400 # 24 hours
}
return jwt.encode(payload, self.secret_key, algorithm='HS256')
def validate_api_key(self, token):
"""Validate API key"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
if payload['expires_at'] < time.time():
return False, "Token expired"
return True, payload
except jwt.InvalidTokenError:
return False, "Invalid token"
def rate_limit(self, limit=60, window=60):
"""Rate limiting decorator"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
client_ip = request.remote_addr
current_time = time.time()
# Clean old entries
self.rate_limits = {
ip: requests for ip, requests in self.rate_limits.items()
if any(req_time > current_time - window for req_time in requests)
}
# Check rate limit
if client_ip not in self.rate_limits:
self.rate_limits[client_ip] = []
# Filter recent requests
recent_requests = [
req_time for req_time in self.rate_limits[client_ip]
if req_time > current_time - window
]
if len(recent_requests) >= limit:
return jsonify({
'error': 'Rate limit exceeded',
'retry_after': window
}), 429
# Add current request
self.rate_limits[client_ip].append(current_time)
return func(*args, **kwargs)
return wrapper
return decorator
def require_auth(self, func):
"""Authentication decorator"""
@wraps(func)
def wrapper(*args, **kwargs):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({'error': 'Missing or invalid authorization header'}), 401
token = auth_header.split(' ')[1]
valid, payload = self.validate_api_key(token)
if not valid:
return jsonify({'error': payload}), 401
# Add user info to request context
request.user = payload
return func(*args, **kwargs)
return wrapper
def validate_request_signature(self, request_data, signature, timestamp):
"""Validate request signature for webhook security"""
# Check timestamp (prevent replay attacks)
if abs(time.time() - timestamp) > 300: # 5 minutes
return False
# Create expected signature
message = f"{timestamp}.{request_data}"
expected_signature = hmac.new(
self.secret_key.encode(),
message.encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected_signature)
Troubleshooting Common Issues
Connection Problems
# diagnostics.py
import requests
import socket
import subprocess
import json
from datetime import datetime
class DiagnosticTools:
def __init__(self):
self.test_results = []
def test_ollama_connection(self):
"""Test Ollama API connectivity"""
try:
response = requests.get('http://localhost:11434/api/tags', timeout=5)
if response.status_code == 200:
models = response.json()
return {
'status': 'success',
'message': 'Ollama connected successfully',
'available_models': [model['name'] for model in models.get('models', [])]
}
else:
return {
'status': 'error',
'message': f'Ollama API returned status {response.status_code}'
}
except requests.exceptions.ConnectionError:
return {
'status': 'error',
'message': 'Cannot connect to Ollama service. Is it running?'
}
except Exception as e:
return {
'status': 'error',
'message': f'Unexpected error: {str(e)}'
}
def test_model_inference(self, model_name='llama2'):
"""Test model inference capability"""
try:
test_prompt = "Analyze this simple market data: price went from 100 to 105. What's the signal?"
response = requests.post('http://localhost:11434/api/generate', json={
'model': model_name,
'prompt': test_prompt,
'stream': False
}, timeout=30)
if response.status_code == 200:
result = response.json()
return {
'status': 'success',
'message': 'Model inference working',
'response_length': len(result.get('response', '')),
'sample_response': result.get('response', '')[:100] + '...'
}
else:
return {
'status': 'error',
'message': f'Model inference failed with status {response.status_code}'
}
except Exception as e:
return {
'status': 'error',
'message': f'Model inference error: {str(e)}'
}
def test_port_availability(self, ports=[5000, 8765, 11434]):
"""Test if required ports are available"""
results = {}
for port in ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('localhost', port))
sock.close()
results[port] = {
'status': 'open' if result == 0 else 'closed',
'service': self.get_service_name(port)
}
return results
def get_service_name(self, port):
"""Get service name for port"""
service_map = {
5000: 'Flask API',
8765: 'WebSocket Server',
11434: 'Ollama API'
}
return service_map.get(port, 'Unknown')
def run_full_diagnostic(self):
"""Run comprehensive diagnostic check"""
results = {
'timestamp': datetime.now().isoformat(),
'tests': {}
}
# Test Ollama connection
results['tests']['ollama_connection'] = self.test_ollama_connection()
# Test model inference if Ollama is connected
if results['tests']['ollama_connection']['status'] == 'success':
results['tests']['model_inference'] = self.test_model_inference()
# Test port availability
results['tests']['port_availability'] = self.test_port_availability()
# System resource check
results['tests']['system_resources'] = self.check_system_resources()
return results
def check_system_resources(self):
"""Check system resource availability"""
try:
import psutil
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
return {
'status': 'success',
'cpu_usage': cpu_percent,
'memory_usage': memory.percent,
'memory_available': memory.available / (1024**3), # GB
'disk_usage': disk.percent,
'disk_free': disk.free / (1024**3) # GB
}
except Exception as e:
return {
'status': 'error',
'message': f'Resource check failed: {str(e)}'
}
Advanced Configuration Options
Environment Configuration
# config.py
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class OllamaConfig:
"""Ollama service configuration"""
host: str = "localhost"
port: int = 11434
model: str = "llama2"
timeout: int = 30
max_tokens: int = 2048
temperature: float = 0.7
@classmethod
def from_env(cls):
return cls(
host=os.getenv('OLLAMA_HOST', 'localhost'),
port=int(os.getenv('OLLAMA_PORT', '11434')),
model=os.getenv('OLLAMA_MODEL', 'llama2'),
timeout=int(os.getenv('OLLAMA_TIMEOUT', '30')),
max_tokens=int(os.getenv('OLLAMA_MAX_TOKENS', '2048')),
temperature=float(os.getenv('OLLAMA_TEMPERATURE', '0.7'))
)
@dataclass
class TradingConfig:
"""Trading-specific configuration"""
risk_per_trade: float = 0.02 # 2% risk per trade
max_positions: int = 5
stop_loss_pct: float = 0.05 # 5% stop loss
take_profit_pct: float = 0.10 # 10% take profit
min_confidence: float = 0.7 # Minimum AI confidence for signals
@classmethod
def from_env(cls):
return cls(
risk_per_trade=float(os.getenv('RISK_PER_TRADE', '0.02')),
max_positions=int(os.getenv('MAX_POSITIONS', '5')),
stop_loss_pct=float(os.getenv('STOP_LOSS_PCT', '0.05')),
take_profit_pct=float(os.getenv('TAKE_PROFIT_PCT', '0.10')),
min_confidence=float(os.getenv('MIN_CONFIDENCE', '0.7'))
)
@dataclass
class AppConfig:
"""Main application configuration"""
debug: bool = False
secret_key: str = "your-secret-key-here"
database_url: str = "sqlite:///trading.db"
redis_url: str = "redis://localhost:6379"
ollama: OllamaConfig = OllamaConfig()
trading: TradingConfig = TradingConfig()
@classmethod
def from_env(cls):
return cls(
debug=os.getenv('DEBUG', 'false').lower() == 'true',
secret_key=os.getenv('SECRET_KEY', 'your-secret-key-here'),
database_url=os.getenv('DATABASE_URL', 'sqlite:///trading.db'),
redis_url=os.getenv('REDIS_URL', 'redis://localhost:6379'),
ollama=OllamaConfig.from_env(),
trading=TradingConfig.from_env()
)
Performance Optimization Strategies
Model Response Caching
# model_cache.py
import redis
import json
import hashlib
from datetime import timedelta
class ModelCache:
def __init__(self, redis_url='redis://localhost:6379'):
self.redis_client = redis.from_url(redis_url)
self.default_ttl = 300 # 5 minutes
def generate_key(self, model_name, prompt, parameters=None):
"""Generate cache key for model request"""
key_data = {
'model': model_name,
'prompt': prompt,
'parameters': parameters or {}
}
key_string = json.dumps(key_data, sort_keys=True)
return f"model_cache:{hashlib.md5(key_string.encode()).hexdigest()}"
def get(self, model_name, prompt, parameters=None):
"""Get cached model response"""
key = self.generate_key(model_name, prompt, parameters)
cached_data = self.redis_client.get(key)
if cached_data:
return json.loads(cached_data)
return None
def set(self, model_name, prompt, response, parameters=None, ttl=None):
"""Cache model response"""
key = self.generate_key(model_name, prompt, parameters)
cache_data = {
'response': response,
'timestamp': time.time(),
'parameters': parameters
}
self.redis_client.setex(
key,
ttl or self.default_ttl,
json.dumps(cache_data)
)
def invalidate_pattern(self, pattern):
"""Invalidate cache entries matching pattern"""
keys = self.redis_client.keys(f"model_cache:{pattern}*")
if keys:
self.redis_client.delete(*keys)
Async Processing with Celery
# tasks.py
from celery import Celery
import pandas as pd
from datetime import datetime
# Initialize Celery
celery_app = Celery('trading_tasks', broker='redis://localhost:6379')
@celery_app.task
def process_historical_data(symbol, start_date, end_date):
"""Process historical data asynchronously"""
try:
# Fetch historical data (implement your data source)
data = fetch_market_data(symbol, start_date, end_date)
# Process with AI model
signals = []
for i in range(len(data)):
if i >= 50: # Minimum data for analysis
window_data = data.iloc[i-50:i]
signal = analyze_with_ai(window_data)
signals.append({
'timestamp': data.iloc[i]['timestamp'],
'signal': signal['signal'],
'confidence': signal['confidence']
})
# Store results
store_signals(symbol, signals)
return {
'status': 'success',
'symbol': symbol,
'signals_generated': len(signals),
'processed_at': datetime.now().isoformat()
}
except Exception as e:
return {
'status': 'error',
'error': str(e),
'symbol': symbol
}
@celery_app.task
def retrain_model(training_data_path):
"""Retrain AI model with new data"""
try:
# Load training data
df = pd.read_csv(training_data_path)
# Retrain model (implement your training logic)
model_metrics = train_model(df)
# Deploy new model
deploy_model()
return {
'status': 'success',
'metrics': model_metrics,
'retrained_at': datetime.now().isoformat()
}
except Exception as e:
return {
'status': 'error',
'error': str(e)
}
Integration with Popular Trading Platforms
Binance API Integration
# binance_integration.py
import ccxt
import pandas as pd
from datetime import datetime, timedelta
class BinanceIntegration:
def __init__(self, api_key=None, api_secret=None, sandbox=True):
self.exchange = ccxt.binance({
'apiKey': api_key,
'secret': api_secret,
'sandbox': sandbox,
'enableRateLimit': True,
})
def fetch_ohlcv(self, symbol, timeframe='1h', limit=100):
"""Fetch OHLCV data from Binance"""
try:
ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
return df
except Exception as e:
print(f"Error fetching OHLCV data: {e}")
return None
def place_order(self, symbol, side, amount, price=None, order_type='market'):
"""Place order on Binance"""
try:
if order_type == 'market':
order = self.exchange.create_market_order(symbol, side, amount)
else:
order = self.exchange.create_limit_order(symbol, side, amount, price)
return {
'success': True,
'order_id': order['id'],
'symbol': symbol,
'side': side,
'amount': amount,
'price': price,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def get_account_balance(self):
"""Get account balance"""
try:
balance = self.exchange.fetch_balance()
return {
'success': True,
'balance': balance['total'],
'free': balance['free'],
'used': balance['used']
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
Best Practices and Recommendations
Code Organization
Structure your project for maintainability:
tradingview-ollama/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── ai_models.py
│ │ └── trading_models.py
│ ├── services/
│ │ ├── __init__.py
│ │ ├── ollama_service.py
│ │ ├── trading_service.py
│ │ └── market_data_service.py
│ ├── utils/
│ │ ├── __init__.py
│ │ ├── cache.py
│ │ ├── security.py
│ │ └── diagnostics.py
│ └── config/
│ ├── __init__.py
│ ├── settings.py
│ └── logging.py
├── tests/
│ ├── __init__.py
│ ├── test_models.py
│ ├── test_services.py
│ └── test_integration.py
├── scripts/
│ ├── deploy.sh
│ ├── backup.sh
│ └── maintenance.py
├── docker/
│ ├── Dockerfile
│ ├── docker-compose.yml
│ └── requirements.txt
└── docs/
├── API.md
├── DEPLOYMENT.md
└── TROUBLESHOOTING.md
Testing Strategy
# test_integration.py
import unittest
import requests
import json
from unittest.mock import patch, MagicMock
class TestTradingViewOllamaIntegration(unittest.TestCase):
def setUp(self):
self.base_url = "http://localhost:5000"
self.test_data = {
'prices': [100, 101, 102, 103, 104, 105],
'volumes': [1000, 1100, 1200, 1300, 1400, 1500],
'symbol': 'BTCUSD',
'timeframe': '1h'
}
def test_api_health_check(self):
"""Test API health endpoint"""
response = requests.get(f"{self.base_url}/health")
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertIn('status', data)
self.assertEqual(data['status'], 'healthy')
@patch('requests.post')
def test_ollama_integration(self, mock_post):
"""Test Ollama API integration"""
# Mock Ollama response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
'response': 'The market data shows bullish momentum with increasing prices and volume.'
}
mock_post.return_value = mock_response
# Test analysis endpoint
response = requests.post(
f"{self.base_url}/analyze",
json=self.test_data
)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertTrue(data['success'])
self.assertIn('signals', data)
def test_invalid_data_handling(self):
"""Test handling of invalid input data"""
invalid_data = {'prices': []} # Empty prices array
response = requests.post(
f"{self.base_url}/analyze",
json=invalid_data
)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertFalse(data['success'])
self.assertIn('error', data)
def test_rate_limiting(self):
"""Test rate limiting functionality"""
# Make multiple rapid requests
responses = []
for i in range(65): # Exceed rate limit
response = requests.post(
f"{self.base_url}/analyze",
json=self.test_data
)
responses.append(response)
# Check that some requests were rate limited
rate_limited = [r for r in responses if r.status_code == 429]
self.assertTrue(len(rate_limited) > 0)
if __name__ == '__main__':
unittest.main()
Conclusion
Integrating TradingView with Ollama creates powerful possibilities for AI-enhanced trading indicators. This combination provides local AI processing, maintaining data privacy while delivering sophisticated market analysis capabilities.
Key benefits of this integration include:
Enhanced Decision Making: AI models can identify complex patterns that traditional indicators miss, improving signal accuracy and timing.
Privacy and Security: Local processing through Ollama keeps sensitive trading data on your systems, avoiding cloud-based AI services.
Customization Flexibility: You can train models specifically for your trading strategy, market conditions, and risk tolerance.
Real-time Processing: The integration supports live market Data Analysis, enabling responsive trading decisions.
Scalable Architecture: The modular design allows for easy expansion and modification as your trading needs evolve.
Remember to thoroughly backtest any AI-enhanced indicators before live trading. Start with paper trading to validate performance and gradually increase position sizes as confidence builds.
This integration represents the future of algorithmic trading—where artificial intelligence enhances human decision-making rather than replacing it. The combination of TradingView's visualization capabilities with Ollama's AI processing creates a powerful toolkit for modern traders.
For ongoing development, consider implementing continuous learning systems that adapt to changing market conditions, and always maintain proper risk management regardless of AI confidence levels.
The trading landscape continues evolving, and tools like these help traders stay competitive while maintaining control over their data and strategies.