Picture this: You're scrolling through crypto Twitter at 3 AM, watching ETH price charts bounce like a caffeinated kangaroo. Sound familiar? Welcome to the wild world of Ethereum trading, where emotions run high and wallets run dry faster than free pizza at a developer conference.
Traditional technical analysis feels like reading tea leaves in a hurricane. But what if you could combine the raw power of on-chain data with AI-powered sentiment analysis? That's exactly what we'll build today using Ollama and real blockchain metrics.
This guide walks you through building an Ethereum market analysis system that combines social sentiment, on-chain metrics, and market indicators. You'll learn to build predictive models and, just as importantly, to evaluate them honestly against a volatile crypto market.
What Makes Ethereum Analysis Different in 2025?
Ethereum's market behavior has evolved dramatically since the Merge and subsequent protocol upgrades. Traditional indicators miss crucial signals that modern traders need to understand.
The Three Pillars of Modern ETH Analysis
On-Chain Metrics: Network activity, validator behavior, and transaction patterns reveal true market sentiment before price movements occur.
Social Sentiment: Community discussions, developer activity, and media coverage create momentum that precedes major price shifts.
Market Microstructure: DEX flows, staking patterns, and institutional activity provide early warning signals for trend changes.
Setting Up Your Ethereum Analysis Environment
Before diving into complex models, let's establish a solid foundation with the right tools and data sources.
Required Tools and Dependencies
# Core analysis libraries
import pandas as pd
import numpy as np
import requests
import json
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import seaborn as sns
# Ollama integration
from ollama import Client
import asyncio
import aiohttp
# On-chain data sources
from web3 import Web3
import etherscan
from pycoingecko import CoinGeckoAPI
# Machine learning components
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
Connecting to Data Sources
class EthereumDataCollector:
    def __init__(self):
        self.etherscan_api = etherscan.Etherscan("YOUR_ETHERSCAN_API_KEY")
        self.coingecko = CoinGeckoAPI()
        self.ollama_client = Client(host='http://localhost:11434')
        self.web3 = Web3(Web3.HTTPProvider('YOUR_RPC_ENDPOINT'))

    def get_price_data(self, days=30):
        """Fetch historical ETH price data"""
        data = self.coingecko.get_coin_market_chart_by_id(
            id='ethereum',
            vs_currency='usd',
            days=days
        )
        prices = pd.DataFrame(data['prices'], columns=['timestamp', 'price'])
        prices['timestamp'] = pd.to_datetime(prices['timestamp'], unit='ms')
        return prices

    def get_network_metrics(self):
        """Collect key on-chain metrics"""
        # Get current network stats
        latest_block = self.web3.eth.get_block('latest')
        metrics = {
            'block_number': latest_block['number'],
            'gas_used': latest_block['gasUsed'],
            'gas_limit': latest_block['gasLimit'],
            'base_fee': latest_block.get('baseFeePerGas', 0),
            'timestamp': latest_block['timestamp']
        }
        return metrics
Building the Sentiment Analysis Pipeline
Ollama provides powerful local language models that can process crypto-specific content without sending sensitive data to external APIs.
Setting Up Ollama for Crypto Sentiment
class CryptoSentimentAnalyzer:
    def __init__(self, model_name='llama3.1'):
        self.client = Client(host='http://localhost:11434')
        self.model = model_name

    def analyze_sentiment(self, text):
        """Analyze crypto-specific sentiment using Ollama"""
        prompt = f"""
        Analyze the sentiment of this cryptocurrency-related text about Ethereum.
        Rate the sentiment from -1 (very bearish) to +1 (very bullish).
        Consider crypto-specific terms, market conditions, and technical indicators.
        Text: "{text}"
        Provide only a numerical score between -1 and +1, followed by a brief explanation.
        """
        response = self.client.generate(
            model=self.model,
            prompt=prompt,
            options={'temperature': 0.3}  # Low temperature for consistent scoring
        )
        return self.parse_sentiment_response(response['response'])

    def parse_sentiment_response(self, response):
        """Extract numerical sentiment score from Ollama response"""
        try:
            import re
            # Look for the first numerical score in the response
            score_match = re.search(r'(-?\d+\.?\d*)', response)
            if score_match:
                score = float(score_match.group(1))
                return max(-1, min(1, score))  # Clamp between -1 and 1
            return 0  # Neutral if no score found
        except (ValueError, TypeError):
            return 0
Processing Social Media Data
class SocialDataProcessor:
    def __init__(self, sentiment_analyzer):
        self.sentiment_analyzer = sentiment_analyzer

    def process_reddit_data(self, subreddit='ethereum', limit=100):
        """Process Reddit posts for sentiment analysis"""
        # Using PRAW (Python Reddit API Wrapper)
        import praw

        reddit = praw.Reddit(
            client_id="YOUR_CLIENT_ID",
            client_secret="YOUR_CLIENT_SECRET",
            user_agent="ethereum_analyzer_v1.0"
        )
        posts = []
        for post in reddit.subreddit(subreddit).hot(limit=limit):
            sentiment_score = self.sentiment_analyzer.analyze_sentiment(
                post.title + " " + post.selftext
            )
            posts.append({
                'title': post.title,
                'score': post.score,
                'num_comments': post.num_comments,
                'sentiment': sentiment_score,
                'created_utc': post.created_utc
            })
        return pd.DataFrame(posts)

    def aggregate_sentiment(self, social_data):
        """Aggregate sentiment scores with weights"""
        # Weight by engagement (upvotes + comments)
        social_data['engagement'] = social_data['score'] + social_data['num_comments']
        social_data['weighted_sentiment'] = social_data['sentiment'] * social_data['engagement']
        total_sentiment = social_data['weighted_sentiment'].sum()
        total_engagement = social_data['engagement'].sum()
        return total_sentiment / total_engagement if total_engagement > 0 else 0
Advanced On-Chain Analytics
On-chain data provides objective insights into network health and user behavior that sentiment analysis alone cannot capture.
Key Metrics Collection
class OnChainAnalyzer:
    def __init__(self, etherscan_key):
        self.etherscan = etherscan.Etherscan(etherscan_key)
        self.web3 = Web3(Web3.HTTPProvider('YOUR_RPC_ENDPOINT'))

    def get_validator_metrics(self):
        """Analyze validator and staking behavior"""
        # Get beacon chain data
        beacon_url = "https://beaconcha.in/api/v1/epoch/latest"
        response = requests.get(beacon_url)
        if response.status_code == 200:
            data = response.json()['data']
            return {
                # This endpoint exposes a single validator count per epoch
                'total_validators': data.get('validatorscount', 0),
                'total_staked_eth': data.get('totalvalidatorbalance', 0) / 1e9,  # Gwei -> ETH
                'epoch': data.get('epoch', 0)
            }
        return {}

    def analyze_transaction_patterns(self, blocks_to_analyze=100):
        """Analyze recent transaction patterns"""
        latest_block = self.web3.eth.get_block('latest')
        metrics = {
            'avg_gas_price': [],
            'tx_count': [],
            'avg_tx_value': [],
            'block_utilization': []
        }
        for i in range(blocks_to_analyze):
            block_num = latest_block['number'] - i
            block = self.web3.eth.get_block(block_num, full_transactions=True)
            if block['transactions']:
                gas_prices = [tx['gasPrice'] for tx in block['transactions']]
                tx_values = [tx['value'] for tx in block['transactions']]
                metrics['avg_gas_price'].append(np.mean(gas_prices))
                metrics['tx_count'].append(len(block['transactions']))
                metrics['avg_tx_value'].append(np.mean(tx_values))
                metrics['block_utilization'].append(block['gasUsed'] / block['gasLimit'])
        return {
            'avg_gas_price': np.mean(metrics['avg_gas_price']),
            'avg_tx_count': np.mean(metrics['tx_count']),
            'avg_tx_value': np.mean(metrics['avg_tx_value']),
            'avg_utilization': np.mean(metrics['block_utilization'])
        }

    def get_defi_metrics(self):
        """Analyze DeFi protocol metrics"""
        # Total Value Locked (TVL) data
        tvl_url = "https://api.llama.fi/protocol/ethereum"
        response = requests.get(tvl_url)
        if response.status_code == 200:
            data = response.json()
            return {
                'total_tvl': data.get('tvl', 0),
                'change_1d': data.get('change_1d', 0),
                'change_7d': data.get('change_7d', 0)
            }
        return {}
Creating the Integrated Analysis Model
Now we combine sentiment analysis with on-chain metrics to create a comprehensive market analysis model.
Feature Engineering
class EthereumMarketModel:
    def __init__(self):
        self.scaler = StandardScaler()
        self.model = RandomForestRegressor(n_estimators=100, random_state=42)
        self.features = []

    def create_features(self, price_data, sentiment_data, onchain_data):
        """Create comprehensive feature set"""
        features = pd.DataFrame()

        # Price-based features
        features['price'] = price_data['price']
        features['price_change_1h'] = price_data['price'].pct_change(periods=1)
        features['price_change_24h'] = price_data['price'].pct_change(periods=24)
        features['price_volatility'] = price_data['price'].rolling(24).std()

        # Technical indicators
        features['sma_20'] = price_data['price'].rolling(20).mean()
        features['sma_50'] = price_data['price'].rolling(50).mean()
        features['rsi'] = self.calculate_rsi(price_data['price'])

        # Sentiment features
        features['sentiment_score'] = sentiment_data['sentiment_score']
        features['sentiment_volume'] = sentiment_data['post_volume']
        features['sentiment_trend'] = sentiment_data['sentiment_score'].rolling(5).mean()

        # On-chain features
        features['gas_price'] = onchain_data['avg_gas_price']
        features['tx_count'] = onchain_data['tx_count']
        features['network_utilization'] = onchain_data['utilization']
        features['staked_eth_ratio'] = onchain_data['staked_eth_ratio']

        return features.dropna()

    def calculate_rsi(self, prices, period=14):
        """Calculate Relative Strength Index"""
        delta = prices.diff()
        gain = delta.where(delta > 0, 0).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    def train_model(self, features, target):
        """Train the market prediction model"""
        X = features.drop(['price'], axis=1)
        y = target
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Scale features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # Train model
        self.model.fit(X_train_scaled, y_train)

        # Evaluate
        y_pred = self.model.predict(X_test_scaled)
        mse = mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        return {
            'mse': mse,
            'r2': r2,
            'feature_importance': dict(zip(X.columns, self.model.feature_importances_))
        }
Real-Time Analysis Pipeline
class RealTimeAnalyzer:
    def __init__(self):
        self.data_collector = EthereumDataCollector()
        self.sentiment_analyzer = CryptoSentimentAnalyzer()
        self.onchain_analyzer = OnChainAnalyzer("YOUR_ETHERSCAN_KEY")
        self.model = EthereumMarketModel()

    async def run_analysis(self):
        """Run complete analysis pipeline"""
        print("Starting Ethereum market analysis...")

        # Collect data
        price_data = self.data_collector.get_price_data(days=30)
        sentiment_data = await self.collect_sentiment_data()
        onchain_data = self.collect_onchain_data()

        # Process and analyze
        features = self.model.create_features(price_data, sentiment_data, onchain_data)

        # Generate predictions
        current_features = features.iloc[-1:].drop(['price'], axis=1)
        current_features_scaled = self.model.scaler.transform(current_features)
        prediction = self.model.model.predict(current_features_scaled)[0]

        # Create analysis report
        report = self.generate_report(
            current_price=price_data['price'].iloc[-1],
            prediction=prediction,
            sentiment_score=sentiment_data['sentiment_score'].iloc[-1],
            onchain_metrics=onchain_data
        )
        return report

    def generate_report(self, current_price, prediction, sentiment_score, onchain_metrics):
        """Generate comprehensive analysis report"""
        price_change = ((prediction - current_price) / current_price) * 100
        report = {
            'timestamp': datetime.now().isoformat(),
            'current_price': current_price,
            'predicted_price': prediction,
            'price_change_percent': price_change,
            'sentiment_score': sentiment_score,
            'market_signal': self.get_market_signal(price_change, sentiment_score),
            'onchain_health': self.assess_network_health(onchain_metrics),
            'confidence_level': self.calculate_confidence(sentiment_score, onchain_metrics)
        }
        return report

    def get_market_signal(self, price_change, sentiment):
        """Generate trading signal based on analysis"""
        if price_change > 5 and sentiment > 0.3:
            return "STRONG_BUY"
        elif price_change > 2 and sentiment > 0.1:
            return "BUY"
        elif price_change < -5 and sentiment < -0.3:
            return "STRONG_SELL"
        elif price_change < -2 and sentiment < -0.1:
            return "SELL"
        else:
            return "HOLD"
Advanced Analysis Techniques
Correlation Analysis
def analyze_correlations(price_data, sentiment_data, onchain_data):
    """Analyze correlations between different data sources"""
    # Combine all data
    combined_data = pd.merge(price_data, sentiment_data, on='timestamp', how='inner')
    combined_data = pd.merge(combined_data, onchain_data, on='timestamp', how='inner')

    # Calculate correlation matrix (numeric columns only, skipping timestamps)
    correlation_matrix = combined_data.corr(numeric_only=True)

    # Plot heatmap
    plt.figure(figsize=(12, 8))
    sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
    plt.title('Ethereum Market Data Correlation Matrix')
    plt.tight_layout()
    plt.show()

    return correlation_matrix
Market Regime Detection
class MarketRegimeDetector:
    def __init__(self):
        self.regimes = ['Bull', 'Bear', 'Sideways', 'Volatile']

    def detect_regime(self, price_data, sentiment_data, volatility_threshold=0.05):
        """Detect current market regime"""
        # Calculate metrics
        price_trend = price_data['price'].pct_change(periods=20).mean()
        volatility = price_data['price'].pct_change().std()
        sentiment_avg = sentiment_data['sentiment_score'].mean()

        # Classify regime
        if price_trend > 0.02 and sentiment_avg > 0.2:
            return 'Bull'
        elif price_trend < -0.02 and sentiment_avg < -0.2:
            return 'Bear'
        elif volatility > volatility_threshold:
            return 'Volatile'
        else:
            return 'Sideways'

    def regime_specific_analysis(self, regime, features):
        """Adjust analysis based on market regime"""
        if regime == 'Bull':
            # In bull markets, focus on momentum indicators
            return self.bull_market_signals(features)
        elif regime == 'Bear':
            # In bear markets, focus on support levels
            return self.bear_market_signals(features)
        elif regime == 'Volatile':
            # In volatile markets, focus on risk management
            return self.volatile_market_signals(features)
        else:
            # In sideways markets, focus on range trading
            return self.sideways_market_signals(features)
Performance Optimization and Deployment
Efficient Data Processing
import time

class OptimizedDataProcessor:
    def __init__(self, sentiment_analyzer):
        self.sentiment_analyzer = sentiment_analyzer  # used by batch_process_sentiment
        self.cache = {}
        self.cache_ttl = 300  # 5 minutes

    def cached_request(self, url, cache_key):
        """Implement caching for API requests"""
        current_time = time.time()
        if cache_key in self.cache:
            cached_data, timestamp = self.cache[cache_key]
            if current_time - timestamp < self.cache_ttl:
                return cached_data
        # Make fresh request
        response = requests.get(url)
        if response.status_code == 200:
            data = response.json()
            self.cache[cache_key] = (data, current_time)
            return data
        return None

    def batch_process_sentiment(self, texts, batch_size=10):
        """Process sentiment analysis in batches"""
        results = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            batch_results = [self.sentiment_analyzer.analyze_sentiment(text) for text in batch]
            results.extend(batch_results)
            time.sleep(0.1)  # Rate limiting between batches
        return results
Model Deployment Strategy
class ModelDeployment:
    def __init__(self):
        self.model_path = "ethereum_market_model.pkl"
        self.config_path = "model_config.json"

    def save_model(self, model, scaler, feature_names):
        """Save trained model and preprocessing components"""
        import pickle
        model_data = {
            'model': model,
            'scaler': scaler,
            'feature_names': feature_names,
            'timestamp': datetime.now().isoformat()
        }
        with open(self.model_path, 'wb') as f:
            pickle.dump(model_data, f)
        print(f"Model saved to {self.model_path}")

    def load_model(self):
        """Load saved model for inference"""
        import pickle
        with open(self.model_path, 'rb') as f:
            model_data = pickle.load(f)
        return model_data['model'], model_data['scaler'], model_data['feature_names']

    def create_api_endpoint(self):
        """Create FastAPI endpoint for model inference"""
        from fastapi import FastAPI
        from pydantic import BaseModel

        app = FastAPI()

        class PredictionRequest(BaseModel):
            features: dict

        @app.post("/predict")
        async def predict(request: PredictionRequest):
            model, scaler, feature_names = self.load_model()
            # Prepare features in the order the model was trained on
            feature_values = [request.features.get(name, 0) for name in feature_names]
            features_scaled = scaler.transform([feature_values])
            # Make prediction
            prediction = model.predict(features_scaled)[0]
            return {
                'prediction': prediction,
                'timestamp': datetime.now().isoformat(),
                'model_version': '1.0'
            }

        return app
Monitoring and Alerting System
Real-Time Monitoring
import time

class MarketMonitor:
    def __init__(self):
        self.alert_thresholds = {
            'price_change': 0.05,    # 5% price change
            'sentiment_shift': 0.3,  # Significant sentiment change
            'volume_spike': 2.0,     # 2x normal volume
            'gas_spike': 3.0         # 3x normal gas prices
        }

    def monitor_market(self, interval_minutes=5):
        """Continuous market monitoring"""
        while True:
            try:
                # Collect current data
                current_data = self.collect_current_data()
                # Check for alerts
                alerts = self.check_alerts(current_data)
                if alerts:
                    self.send_alerts(alerts)
                # Log status
                self.log_status(current_data)
                time.sleep(interval_minutes * 60)
            except Exception as e:
                print(f"Monitoring error: {e}")
                time.sleep(60)  # Wait before retrying

    def check_alerts(self, data):
        """Check for alert conditions"""
        alerts = []
        # Price change alert
        if abs(data['price_change_1h']) > self.alert_thresholds['price_change']:
            alerts.append({
                'type': 'price_change',
                'severity': 'high',
                'message': f"ETH price changed {data['price_change_1h']:.2%} in 1 hour"
            })
        # Sentiment shift alert
        if abs(data['sentiment_change']) > self.alert_thresholds['sentiment_shift']:
            alerts.append({
                'type': 'sentiment_shift',
                'severity': 'medium',
                'message': f"Sentiment shifted by {data['sentiment_change']:.2f}"
            })
        return alerts

    def send_alerts(self, alerts):
        """Send alerts via multiple channels"""
        for alert in alerts:
            # Console output
            print(f"ALERT: {alert['message']}")
            # Email notification (implement as needed)
            # self.send_email_alert(alert)
            # Discord/Slack notification (implement as needed)
            # self.send_discord_alert(alert)
Performance Metrics and Backtesting
Model Evaluation
class ModelEvaluator:
    def __init__(self):
        self.metrics = {}

    def backtest_model(self, model, historical_data, test_period_days=30):
        """Backtest model performance"""
        # Split data for backtesting
        cutoff_date = datetime.now() - timedelta(days=test_period_days)
        test_data = historical_data[historical_data['timestamp'] >= cutoff_date]

        predictions = []
        actual_prices = []
        for i in range(len(test_data) - 1):
            # Use current data to predict next period
            current_features = test_data.iloc[i:i+1].drop(['price', 'timestamp'], axis=1)
            prediction = model.predict(current_features)[0]
            actual = test_data.iloc[i+1]['price']
            predictions.append(prediction)
            actual_prices.append(actual)

        # Calculate metrics
        actual_arr = np.array(actual_prices)
        pred_arr = np.array(predictions)
        return {
            'mse': mean_squared_error(actual_prices, predictions),
            'mae': np.mean(np.abs(actual_arr - pred_arr)),
            'mape': np.mean(np.abs((actual_arr - pred_arr) / actual_arr)) * 100,
            'predictions': predictions,
            'actual': actual_prices
        }

    def calculate_trading_performance(self, signals, price_data):
        """Calculate trading performance based on signals"""
        portfolio_value = 10000  # Starting with $10,000
        position = 0  # 0 = no position, 1 = long, -1 = short
        trades = []

        for i, signal in enumerate(signals):
            current_price = price_data.iloc[i]['price']
            if signal == 'BUY' and position <= 0:
                # Buy signal
                position = 1
                trades.append({
                    'action': 'BUY',
                    'price': current_price,
                    'timestamp': price_data.iloc[i]['timestamp']
                })
            elif signal == 'SELL' and position >= 0:
                # Sell signal: realize P&L when closing a long
                # (short-side P&L is not tracked in this simplified backtest)
                if position == 1:
                    last_buy = trades[-1]['price']
                    pnl = (current_price - last_buy) / last_buy
                    portfolio_value *= (1 + pnl)
                position = -1
                trades.append({
                    'action': 'SELL',
                    'price': current_price,
                    'timestamp': price_data.iloc[i]['timestamp']
                })

        return {
            'final_portfolio_value': portfolio_value,
            'total_return': (portfolio_value - 10000) / 10000,
            'number_of_trades': len(trades),
            'trades': trades
        }
Visualization and Reporting
Interactive Dashboard
class DashboardGenerator:
    def __init__(self):
        self.fig_size = (15, 10)

    def create_comprehensive_dashboard(self, analysis_results):
        """Create comprehensive analysis dashboard"""
        fig, axes = plt.subplots(2, 3, figsize=self.fig_size)

        # Price and prediction chart
        self.plot_price_prediction(axes[0, 0], analysis_results['price_data'],
                                   analysis_results['predictions'])
        # Sentiment over time
        self.plot_sentiment_trend(axes[0, 1], analysis_results['sentiment_data'])
        # On-chain metrics
        self.plot_onchain_metrics(axes[0, 2], analysis_results['onchain_data'])
        # Correlation heatmap
        self.plot_correlation_heatmap(axes[1, 0], analysis_results['correlations'])
        # Feature importance
        self.plot_feature_importance(axes[1, 1], analysis_results['feature_importance'])
        # Trading signals
        self.plot_trading_signals(axes[1, 2], analysis_results['signals'])

        plt.tight_layout()
        plt.show()

    def plot_price_prediction(self, ax, price_data, predictions):
        """Plot price vs predictions"""
        ax.plot(price_data['timestamp'], price_data['price'], label='Actual Price', color='blue')
        ax.plot(price_data['timestamp'][-len(predictions):], predictions,
                label='Predicted Price', color='red', linestyle='--')
        ax.set_title('ETH Price vs Predictions')
        ax.set_xlabel('Time')
        ax.set_ylabel('Price (USD)')
        ax.legend()
        ax.grid(True, alpha=0.3)

    def generate_html_report(self, analysis_results):
        """Generate HTML report for web viewing"""
        html_template = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>Ethereum Market Analysis Report</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .metric {{ background: #f0f0f0; padding: 10px; margin: 10px 0; }}
                .alert {{ background: #ffcccc; padding: 10px; margin: 10px 0; }}
                .positive {{ color: green; }}
                .negative {{ color: red; }}
            </style>
        </head>
        <body>
            <h1>Ethereum Market Analysis Report</h1>
            <p>Generated: {timestamp}</p>
            <div class="metric">
                <h3>Current Price Analysis</h3>
                <p>Current Price: ${current_price:.2f}</p>
                <p>Predicted Price: ${predicted_price:.2f}</p>
                <p class="{price_class}">Price Change: {price_change:.2f}%</p>
            </div>
            <div class="metric">
                <h3>Sentiment Analysis</h3>
                <p>Sentiment Score: {sentiment_score:.2f}</p>
                <p>Market Signal: {market_signal}</p>
            </div>
            <div class="metric">
                <h3>On-Chain Metrics</h3>
                <p>Network Utilization: {network_utilization:.1f}%</p>
                <p>Average Gas Price: {gas_price:.2f} Gwei</p>
                <p>Transaction Count: {tx_count}</p>
            </div>
            <div class="metric">
                <h3>Model Performance</h3>
                <p>R² Score: {r2_score:.3f}</p>
                <p>Mean Absolute Error: ${mae:.2f}</p>
                <p>Confidence Level: {confidence:.1f}%</p>
            </div>
        </body>
        </html>
        """
        return html_template.format(
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            current_price=analysis_results['current_price'],
            predicted_price=analysis_results['predicted_price'],
            price_change=analysis_results['price_change_percent'],
            price_class='positive' if analysis_results['price_change_percent'] > 0 else 'negative',
            sentiment_score=analysis_results['sentiment_score'],
            market_signal=analysis_results['market_signal'],
            network_utilization=analysis_results['network_utilization'],
            gas_price=analysis_results['gas_price'] / 1e9,  # Convert wei to Gwei
            tx_count=analysis_results['tx_count'],
            r2_score=analysis_results['r2_score'],
            mae=analysis_results['mae'],
            confidence=analysis_results['confidence_level']
        )
Advanced Trading Strategies
Multi-Signal Trading Strategy
class AdvancedTradingStrategy:
    def __init__(self):
        self.position_size = 0.1  # 10% of portfolio per trade
        self.stop_loss = 0.05     # 5% stop loss
        self.take_profit = 0.15   # 15% take profit
        self.max_positions = 3    # Maximum concurrent positions

    def generate_trading_signals(self, market_data, sentiment_data, onchain_data):
        """Generate advanced trading signals using multiple indicators"""
        signals = []
        for i in range(len(market_data)):
            # Technical indicators
            rsi = market_data.iloc[i]['rsi']
            macd_signal = self.calculate_macd_signal(market_data.iloc[max(0, i-25):i+1])

            # Sentiment indicators
            sentiment_score = sentiment_data.iloc[i]['sentiment_score']
            sentiment_momentum = self.calculate_sentiment_momentum(sentiment_data.iloc[max(0, i-5):i+1])

            # On-chain indicators
            network_growth = self.calculate_network_growth(onchain_data.iloc[max(0, i-7):i+1])
            whale_activity = self.detect_whale_activity(onchain_data.iloc[i])

            # Combine signals
            signal_strength = self.combine_signals(
                rsi, macd_signal, sentiment_score, sentiment_momentum,
                network_growth, whale_activity
            )

            # Generate final signal
            if signal_strength > 0.7:
                signals.append('STRONG_BUY')
            elif signal_strength > 0.3:
                signals.append('BUY')
            elif signal_strength < -0.7:
                signals.append('STRONG_SELL')
            elif signal_strength < -0.3:
                signals.append('SELL')
            else:
                signals.append('HOLD')
        return signals

    def calculate_macd_signal(self, price_data):
        """Calculate MACD signal"""
        if len(price_data) < 26:
            return 0
        prices = price_data['price']
        ema_12 = prices.ewm(span=12).mean()
        ema_26 = prices.ewm(span=26).mean()
        macd_line = ema_12 - ema_26
        signal_line = macd_line.ewm(span=9).mean()
        return 1 if macd_line.iloc[-1] > signal_line.iloc[-1] else -1

    def calculate_sentiment_momentum(self, sentiment_data):
        """Calculate sentiment momentum"""
        if len(sentiment_data) < 3:
            return 0
        recent_sentiment = sentiment_data['sentiment_score'].iloc[-3:].mean()
        older_sentiment = sentiment_data['sentiment_score'].iloc[:-3].mean() if len(sentiment_data) > 3 else 0
        return (recent_sentiment - older_sentiment) / (abs(older_sentiment) + 0.01)

    def calculate_network_growth(self, onchain_data):
        """Calculate network growth metrics"""
        if len(onchain_data) < 7:
            return 0
        current_metrics = onchain_data.iloc[-1]
        week_ago_metrics = onchain_data.iloc[0]
        tx_growth = (current_metrics['tx_count'] - week_ago_metrics['tx_count']) / week_ago_metrics['tx_count']
        utilization_change = current_metrics['network_utilization'] - week_ago_metrics['network_utilization']
        return (tx_growth + utilization_change) / 2

    def detect_whale_activity(self, onchain_data):
        """Detect unusual whale activity"""
        avg_tx_value = onchain_data.get('avg_tx_value', 0)
        large_tx_threshold = 1000 * 1e18  # 1000 ETH in wei
        return 1 if avg_tx_value > large_tx_threshold else 0

    def combine_signals(self, rsi, macd, sentiment, sentiment_momentum, network_growth, whale_activity):
        """Combine multiple signals into single strength score"""
        # Weight different signals
        weights = {
            'technical': 0.3,
            'sentiment': 0.25,
            'momentum': 0.2,
            'network': 0.15,
            'whale': 0.1
        }
        # Normalize RSI to a -1 to 1 scale
        rsi_signal = (50 - rsi) / 50
        # Combine weighted signals
        combined_signal = (
            weights['technical'] * (rsi_signal + macd) / 2 +
            weights['sentiment'] * sentiment +
            weights['momentum'] * sentiment_momentum +
            weights['network'] * network_growth +
            weights['whale'] * whale_activity
        )
        return np.clip(combined_signal, -1, 1)
Risk Management System
class RiskManager:
    def __init__(self):
        self.max_portfolio_risk = 0.02       # 2% maximum portfolio risk per trade
        self.max_correlation_exposure = 0.6  # Maximum 60% exposure to correlated assets
        self.volatility_lookback = 20        # Days for volatility calculation

    def calculate_position_size(self, portfolio_value, entry_price, stop_loss_price, volatility):
        """Calculate optimal position size based on risk parameters"""
        # Calculate risk per share
        risk_per_share = abs(entry_price - stop_loss_price)
        # Calculate maximum dollar risk
        max_dollar_risk = portfolio_value * self.max_portfolio_risk
        # Adjust for volatility: reduce size in high-volatility conditions
        volatility_adjustment = min(1.0, 0.2 / volatility)
        # Calculate position size
        raw_position_size = max_dollar_risk / risk_per_share
        adjusted_position_size = raw_position_size * volatility_adjustment
        return min(adjusted_position_size, portfolio_value * 0.1)  # Max 10% of portfolio

    def check_risk_limits(self, current_positions, new_trade):
        """Check if new trade violates risk limits"""
        # Calculate total portfolio risk
        total_risk = sum(pos['risk_amount'] for pos in current_positions)
        new_risk = new_trade['risk_amount']
        if (total_risk + new_risk) / new_trade['portfolio_value'] > self.max_portfolio_risk * 5:
            return False, "Portfolio risk limit exceeded"
        # Check correlation limits
        correlation_exposure = self.calculate_correlation_exposure(current_positions, new_trade)
        if correlation_exposure > self.max_correlation_exposure:
            return False, "Correlation exposure limit exceeded"
        return True, "Risk limits satisfied"

    def calculate_correlation_exposure(self, positions, new_trade):
        """Calculate exposure to correlated assets"""
        # Simplified calculation; in practice, use actual correlation coefficients
        crypto_exposure = sum(pos['value'] for pos in positions if pos['asset_type'] == 'crypto')
        total_exposure = sum(pos['value'] for pos in positions)
        return crypto_exposure / total_exposure if total_exposure > 0 else 0
Production Deployment Guide
Docker Configuration
# Dockerfile for Ethereum Market Analysis
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Install Ollama
RUN curl -fsSL https://ollama.ai/install.sh | sh

# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Expose ports
EXPOSE 8000 11434

# Start services
CMD ["python", "main.py"]
Configuration Management
# config.py
import os
from dataclasses import dataclass

@dataclass
class APIConfig:
    etherscan_key: str
    coingecko_api_key: str
    reddit_client_id: str
    reddit_client_secret: str
    web3_rpc_url: str

@dataclass
class ModelConfig:
    ollama_host: str
    ollama_model: str
    update_interval_minutes: int
    backtesting_days: int

@dataclass
class TradingConfig:
    max_position_size: float
    stop_loss_percent: float
    take_profit_percent: float
    risk_per_trade: float

@dataclass
class AppConfig:
    api: APIConfig
    model: ModelConfig
    trading: TradingConfig

    @classmethod
    def from_env(cls):
        """Load configuration from environment variables"""
        return cls(
            api=APIConfig(
                etherscan_key=os.getenv('ETHERSCAN_API_KEY'),
                coingecko_api_key=os.getenv('COINGECKO_API_KEY'),
                reddit_client_id=os.getenv('REDDIT_CLIENT_ID'),
                reddit_client_secret=os.getenv('REDDIT_CLIENT_SECRET'),
                web3_rpc_url=os.getenv('WEB3_RPC_URL')
            ),
            model=ModelConfig(
                ollama_host=os.getenv('OLLAMA_HOST', 'http://localhost:11434'),
                ollama_model=os.getenv('OLLAMA_MODEL', 'llama3.1'),
                update_interval_minutes=int(os.getenv('UPDATE_INTERVAL', '5')),
                backtesting_days=int(os.getenv('BACKTESTING_DAYS', '30'))
            ),
            trading=TradingConfig(
                max_position_size=float(os.getenv('MAX_POSITION_SIZE', '0.1')),
                stop_loss_percent=float(os.getenv('STOP_LOSS_PERCENT', '0.05')),
                take_profit_percent=float(os.getenv('TAKE_PROFIT_PERCENT', '0.15')),
                risk_per_trade=float(os.getenv('RISK_PER_TRADE', '0.02'))
            )
        )
Main Application
# main.py
import asyncio
import logging
from datetime import datetime

from config import AppConfig
from ethereum_analyzer import EthereumMarketAnalyzer

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('ethereum_analysis.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class EthereumAnalysisApp:
    def __init__(self):
        self.config = AppConfig.from_env()
        self.analyzer = EthereumMarketAnalyzer(self.config)
        self.running = False

    async def start(self):
        """Start the analysis application"""
        logger.info("Starting Ethereum Market Analysis Application")
        self.running = True
        # Initialize components
        await self.analyzer.initialize()
        # Start main analysis loop
        await self.main_loop()

    async def main_loop(self):
        """Main application loop"""
        while self.running:
            try:
                # Run analysis
                results = await self.analyzer.run_analysis()
                # Log results
                logger.info(f"Analysis complete - Signal: {results['market_signal']}, "
                            f"Price: ${results['current_price']:.2f}, "
                            f"Sentiment: {results['sentiment_score']:.2f}")
                # Generate reports
                await self.generate_reports(results)
                # Wait for next iteration
                await asyncio.sleep(self.config.model.update_interval_minutes * 60)
            except Exception as e:
                logger.error(f"Analysis error: {e}")
                await asyncio.sleep(60)  # Wait before retrying

    async def generate_reports(self, results):
        """Generate and save analysis reports"""
        # Generate HTML report
        html_report = self.analyzer.generate_html_report(results)
        # Save to file
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"ethereum_analysis_{timestamp}.html"
        with open(filename, 'w') as f:
            f.write(html_report)
        logger.info(f"Report saved to {filename}")

    def stop(self):
        """Stop the application"""
        logger.info("Stopping Ethereum Market Analysis Application")
        self.running = False

async def main():
    """Main entry point"""
    app = EthereumAnalysisApp()
    try:
        await app.start()
    except KeyboardInterrupt:
        logger.info("Received interrupt signal")
    finally:
        app.stop()

if __name__ == "__main__":
    asyncio.run(main())
Best Practices and Optimization Tips
Performance Optimization
Data Caching: Implement intelligent caching for API responses to reduce latency and API costs.
Batch Processing: Process multiple sentiment analyses in batches to improve throughput.
Async Operations: Use asynchronous programming for I/O-bound operations like API calls.
Memory Management: Implement rolling windows for historical data to prevent memory bloat.
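The caching tip above can be sketched as a small time-to-live (TTL) cache keyed by request URL or endpoint name. This is a minimal illustration, not part of the system's code; the class name and 300-second TTL are assumptions:

```python
import time

class TTLCache:
    """Cache API responses for a fixed time-to-live (in seconds)."""

    def __init__(self, ttl_seconds=60):
        self.ttl = ttl_seconds
        self._store = {}  # key -> (timestamp, value)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        ts, value = entry
        if time.monotonic() - ts > self.ttl:
            del self._store[key]  # expired: evict and force a fresh fetch
            return None
        return value

    def set(self, key, value):
        self._store[key] = (time.monotonic(), value)

# Cache a hypothetical price lookup for five minutes
cache = TTLCache(ttl_seconds=300)
cache.set("eth_price", 2500.0)
print(cache.get("eth_price"))
```

Checking the cache before each API call cuts both latency and metered request counts; `time.monotonic()` is used so clock adjustments can't prematurely expire entries.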
Model Improvement Strategies
Feature Engineering: Continuously test new features based on market conditions.
Ensemble Methods: Combine multiple models for better prediction accuracy.
Regular Retraining: Retrain models periodically to adapt to changing market conditions.
Cross-Validation: Use time-series cross-validation for more robust model evaluation.
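The cross-validation tip matters because shuffled folds leak future information into training data, which inflates backtest scores on price series. A minimal expanding-window splitter shows the idea (the fold arithmetic here is illustrative, not the system's actual evaluation code):

```python
def walk_forward_splits(n_samples, n_splits):
    """Expanding-window splits: every test fold comes strictly after its training data."""
    fold_size = n_samples // (n_splits + 1)
    for i in range(1, n_splits + 1):
        train_idx = list(range(0, i * fold_size))                  # all history so far
        test_idx = list(range(i * fold_size, (i + 1) * fold_size)) # next unseen window
        yield train_idx, test_idx

# 100 observations, 4 folds: training windows grow 20 -> 80 samples
for train_idx, test_idx in walk_forward_splits(100, n_splits=4):
    # No lookahead: every training index precedes every test index
    print(len(train_idx), len(test_idx))
```

Libraries such as scikit-learn offer the same scheme as `TimeSeriesSplit`, if you'd rather not roll your own.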
Error Handling and Reliability
import asyncio
import logging

logger = logging.getLogger(__name__)

class RobustAnalyzer:
    def __init__(self):
        self.max_retries = 3
        self.retry_delay = 1

    async def robust_api_call(self, api_func, *args, **kwargs):
        """Make API calls with exponential-backoff retry logic"""
        for attempt in range(self.max_retries):
            try:
                return await api_func(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise
                logger.warning(f"API call failed (attempt {attempt + 1}): {e}")
                await asyncio.sleep(self.retry_delay * (2 ** attempt))
        return None

    def validate_data(self, data):
        """Validate data quality before analysis"""
        if data is None or len(data) == 0:
            raise ValueError("Empty data received")

        # Flag DataFrames where more than 10% of cells are missing
        if hasattr(data, 'isnull') and data.isnull().sum().sum() > len(data) * 0.1:
            logger.warning("High percentage of missing values detected")

        return True
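A quick way to see the retry pattern in action is to run it against a deliberately flaky coroutine. The self-contained helper below mirrors the backoff logic of `robust_api_call`; `flaky_fetch` and its returned payload are stand-ins for a real API call:

```python
import asyncio

async def retry_call(api_func, max_retries=3, base_delay=0.01):
    """Retry an async call with exponential backoff, mirroring robust_api_call."""
    for attempt in range(max_retries):
        try:
            return await api_func()
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))

calls = {"n": 0}

async def flaky_fetch():
    # Fails on the first attempt, succeeds on the second --
    # simulates a transient API error
    calls["n"] += 1
    if calls["n"] < 2:
        raise ConnectionError("transient failure")
    return {"price": 2500.0}

result = asyncio.run(retry_call(flaky_fetch))
print(result, "after", calls["n"], "calls")
```

Doubling the delay each attempt (`base_delay * 2 ** attempt`) spaces retries out so a briefly rate-limited API isn't hammered while it recovers.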
Conclusion
This comprehensive Ethereum market analysis system combines the power of Ollama's local AI capabilities with real-time on-chain data to provide actionable trading insights. The system integrates multiple data sources, applies advanced machine learning techniques, and provides robust risk management tools.
Key benefits of this approach include:
- Privacy-First Analysis: Using Ollama ensures sensitive trading data never leaves your environment
- Multi-Signal Approach: Combines technical, sentiment, and on-chain indicators for comprehensive analysis
- Real-Time Monitoring: Continuous market monitoring with automated alerting
- Backtesting Capabilities: Validate strategies with historical data before deployment
- Production-Ready Architecture: Scalable, maintainable codebase with proper error handling
The system provides a solid foundation for serious cryptocurrency analysis while maintaining the flexibility to adapt to changing market conditions. Regular model updates and ongoing feature engineering will help maintain performance as the Ethereum ecosystem evolves.
Remember that cryptocurrency markets are highly volatile and unpredictable. Always conduct thorough testing, implement proper risk management, and never invest more than you can afford to lose. This analysis system should be used as a tool to inform decisions, not as a guarantee of trading success.
For optimal results, combine this technical analysis with fundamental research, market news, and your own trading experience. The most successful traders use multiple tools and approaches to make informed decisions in the dynamic cryptocurrency market.