Remember when a single Trump tweet could send Bitcoin soaring? Those days aren't over. Political rallies still move crypto markets faster than you can say "HODL." Today, we'll build a Trump rally detector that monitors social media sentiment and predicts crypto price movements.
This Trump rally detector uses Ollama's local AI models to analyze Twitter, Reddit, and Telegram posts. You'll learn to identify rally sentiment patterns and correlate them with cryptocurrency price changes.
## Why Trump Rally Sentiment Affects Crypto Markets
Political events create market volatility. Trump rallies generate massive social media buzz that directly impacts cryptocurrency prices. Traders who spot sentiment shifts early gain significant advantages.
### The Crypto-Politics Connection
Trump's pro-crypto stance during rallies often triggers buying frenzies. Key sentiment indicators include:
- Rally attendance predictions
- Policy announcement speculation
- Media coverage volume
- Social media engagement rates
- Cryptocurrency mentions during speeches
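Before reaching for a language model, these indicators can be sketched as a simple weighted keyword lookup. The keyword buckets and weights below are illustrative placeholders, not tuned values:

```python
# Illustrative keyword buckets for each indicator category.
# The keywords and weights are placeholder assumptions, not calibrated values.
INDICATOR_WEIGHTS = {
    "attendance": (["rally tonight", "packed arena", "huge crowd"], 0.15),
    "policy": (["executive order", "crypto policy", "announcement"], 0.25),
    "media": (["breaking", "live coverage", "headline"], 0.15),
    "engagement": (["retweet", "share", "viral"], 0.15),
    "crypto_mentions": (["bitcoin", "btc", "crypto", "ethereum"], 0.30),
}

def quick_indicator_score(text: str) -> float:
    """Sum the weights of indicator categories whose keywords appear in text."""
    text_lower = text.lower()
    score = 0.0
    for keywords, weight in INDICATOR_WEIGHTS.values():
        if any(kw in text_lower for kw in keywords):
            score += weight
    return round(score, 2)
```

The Ollama-based analyzer later in this guide replaces this kind of crude keyword matching, but a cheap heuristic like this is handy as a pre-filter before spending inference time on every post.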
## Setting Up Your Trump Rally Detector Environment

### Prerequisites

Install these tools before building your detector:

```bash
# Install Ollama
curl -fsSL https://ollama.ai/install.sh | sh

# Install the Python dependencies used throughout this guide
pip install tweepy praw pandas numpy matplotlib requests beautifulsoup4 schedule ccxt flask scikit-learn joblib

# Optional: Node.js packages, only needed if you add your own browser-based scrapers
npm install puppeteer axios cheerio
```
### Ollama Model Selection

Download a model for sentiment analysis:

```bash
# Download Llama 3.1 8B (the default tag is already instruct-tuned,
# which works well for structured sentiment prompts)
ollama pull llama3.1:8b

# Verify the installation
ollama list
```
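Before wiring up the full analyzer, it is worth a quick smoke test against Ollama's local REST API. This sketch assumes the default port 11434 and the `llama3.1:8b` model pulled above:

```python
import requests

def ollama_smoke_test(model: str = "llama3.1:8b",
                      url: str = "http://localhost:11434") -> bool:
    """Return True if the local Ollama server answers a trivial prompt."""
    try:
        resp = requests.post(
            f"{url}/api/generate",
            json={"model": model, "prompt": "Reply with OK", "stream": False},
            timeout=30,
        )
        resp.raise_for_status()
        # A healthy server returns a non-empty 'response' field
        return bool(resp.json().get("response"))
    except requests.RequestException as exc:
        print(f"Ollama not reachable: {exc}")
        return False
```

If this returns `False`, fix your Ollama install before continuing; everything below depends on it.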
## Building the Social Media Data Collection System

### Twitter API Integration

Create a Twitter data collector that monitors Trump-related keywords:
```python
import tweepy
import pandas as pd
from datetime import datetime, timedelta

class TrumpRallyTwitterCollector:
    def __init__(self, api_key, api_secret, access_token, access_secret):
        """Initialize Twitter API connection for rally monitoring"""
        self.auth = tweepy.OAuthHandler(api_key, api_secret)
        self.auth.set_access_token(access_token, access_secret)
        self.api = tweepy.API(self.auth, wait_on_rate_limit=True)

        # Keywords that indicate Trump rally activity
        self.rally_keywords = [
            "Trump rally", "MAGA rally", "Trump speech",
            "Trump crypto", "Bitcoin Trump", "Trump cryptocurrency"
        ]

    def collect_rally_tweets(self, hours_back=24, max_tweets=1000):
        """Collect tweets about Trump rallies from the given time window"""
        tweets_data = []
        since_date = (datetime.now() - timedelta(hours=hours_back)).strftime('%Y-%m-%d')

        for keyword in self.rally_keywords:
            try:
                # The standard search endpoint only supports "since" as a
                # query operator, so fold it into the query string
                tweets = tweepy.Cursor(
                    self.api.search_tweets,
                    q=f"{keyword} since:{since_date}",
                    lang="en",
                    result_type="mixed"
                ).items(max_tweets // len(self.rally_keywords))

                for tweet in tweets:
                    tweets_data.append({
                        'id': tweet.id,
                        'text': tweet.text,
                        'created_at': tweet.created_at,
                        'user': tweet.user.screen_name,
                        'followers': tweet.user.followers_count,
                        'retweets': tweet.retweet_count,
                        'likes': tweet.favorite_count,
                        'keyword': keyword
                    })
            except Exception as e:
                print(f"Error collecting tweets for {keyword}: {e}")
                continue

        return pd.DataFrame(tweets_data)

# Usage example
collector = TrumpRallyTwitterCollector(
    api_key="your_api_key",
    api_secret="your_api_secret",
    access_token="your_access_token",
    access_secret="your_access_secret"
)

tweets_df = collector.collect_rally_tweets(hours_back=48, max_tweets=500)
print(f"Collected {len(tweets_df)} tweets about Trump rallies")
```
### Reddit Rally Discussion Scraper

Monitor Reddit communities for rally sentiment:
```python
import praw
import pandas as pd
from datetime import datetime

class RedditRallyMonitor:
    def __init__(self, client_id, client_secret, user_agent):
        """Initialize Reddit API for rally discussion monitoring"""
        self.reddit = praw.Reddit(
            client_id=client_id,
            client_secret=client_secret,
            user_agent=user_agent
        )

        # Subreddits likely to discuss Trump rallies and crypto
        # (some political subreddits get banned or go private over time,
        # so expect occasional failures here)
        self.target_subreddits = [
            'Bitcoin', 'CryptoCurrency', 'wallstreetbets',
            'Conservative', 'politics'
        ]

    def scrape_rally_discussions(self, limit=100):
        """Scrape Reddit posts about Trump rallies"""
        rally_posts = []

        for subreddit_name in self.target_subreddits:
            try:
                subreddit = self.reddit.subreddit(subreddit_name)

                # Search for rally-related posts
                for post in subreddit.search("Trump rally OR MAGA rally", limit=limit):
                    rally_posts.append({
                        'subreddit': subreddit_name,
                        'title': post.title,
                        'selftext': post.selftext,
                        # 'text' and 'created_at' mirror the Twitter schema so
                        # both DataFrames can be concatenated later
                        'text': f"{post.title} {post.selftext}".strip(),
                        'created_at': datetime.fromtimestamp(post.created_utc),
                        'score': post.score,
                        'upvote_ratio': post.upvote_ratio,
                        'num_comments': post.num_comments,
                        'url': post.url
                    })
            except Exception as e:
                print(f"Error scraping {subreddit_name}: {e}")
                continue

        return pd.DataFrame(rally_posts)

# Initialize Reddit scraper
reddit_monitor = RedditRallyMonitor(
    client_id="your_client_id",
    client_secret="your_client_secret",
    user_agent="TrumpRallyDetector/1.0"
)

reddit_data = reddit_monitor.scrape_rally_discussions(limit=50)
print(f"Found {len(reddit_data)} rally discussions on Reddit")
```
## Implementing Ollama Sentiment Analysis

### Setting Up the Sentiment Analyzer

Create an Ollama-powered sentiment analyzer tuned for political rally content:
```python
import requests
import json
from datetime import datetime
from typing import List, Dict

class OllamaSentimentAnalyzer:
    def __init__(self, ollama_url="http://localhost:11434"):
        """Initialize Ollama sentiment analyzer for Trump rally content"""
        self.ollama_url = ollama_url
        self.model = "llama3.1:8b"

        # Custom prompt for rally sentiment analysis
        self.sentiment_prompt = """
You are an expert political sentiment analyzer specializing in Trump rally content.

Analyze the following text and provide:
1. Overall sentiment: POSITIVE, NEGATIVE, or NEUTRAL
2. Crypto relevance: HIGH, MEDIUM, or LOW
3. Rally excitement level: 1-10 scale
4. Key sentiment indicators found

Text to analyze: {text}

Respond in JSON format:
{{
    "sentiment": "POSITIVE/NEGATIVE/NEUTRAL",
    "crypto_relevance": "HIGH/MEDIUM/LOW",
    "excitement_level": 1-10,
    "key_indicators": ["indicator1", "indicator2"],
    "confidence": 0.0-1.0
}}
"""

    def analyze_text(self, text: str) -> Dict:
        """Analyze sentiment of Trump rally related text"""
        try:
            # Prepare Ollama request (truncate long inputs)
            prompt = self.sentiment_prompt.format(text=text[:1000])

            payload = {
                "model": self.model,
                "prompt": prompt,
                "stream": False,
                "options": {
                    "temperature": 0.1,  # Low temperature for consistent results
                    "top_p": 0.9
                }
            }

            response = requests.post(
                f"{self.ollama_url}/api/generate",
                json=payload,
                timeout=30
            )

            if response.status_code == 200:
                result = response.json()
                response_text = result.get('response', '{}')
                try:
                    # Extract the JSON object from the model's response
                    start_idx = response_text.find('{')
                    end_idx = response_text.rfind('}') + 1
                    json_str = response_text[start_idx:end_idx]

                    sentiment_data = json.loads(json_str)
                    sentiment_data['raw_text'] = text
                    sentiment_data['analysis_timestamp'] = datetime.now().isoformat()
                    return sentiment_data
                except json.JSONDecodeError:
                    # Fallback parsing if JSON extraction fails
                    return self._fallback_analysis(text, response_text)
            else:
                print(f"Ollama API error: {response.status_code}")
                return self._fallback_analysis(text, "API Error")

        except Exception as e:
            print(f"Sentiment analysis error: {e}")
            return self._fallback_analysis(text, str(e))

    def _fallback_analysis(self, text: str, error_msg: str) -> Dict:
        """Provide basic keyword-based fallback sentiment analysis"""
        positive_words = ['great', 'amazing', 'winning', 'best', 'fantastic', 'huge']
        negative_words = ['bad', 'terrible', 'failing', 'worst', 'disaster', 'sad']
        crypto_words = ['bitcoin', 'crypto', 'blockchain', 'btc', 'ethereum']

        text_lower = text.lower()
        positive_count = sum(1 for word in positive_words if word in text_lower)
        negative_count = sum(1 for word in negative_words if word in text_lower)
        crypto_count = sum(1 for word in crypto_words if word in text_lower)

        if positive_count > negative_count:
            sentiment = "POSITIVE"
        elif negative_count > positive_count:
            sentiment = "NEGATIVE"
        else:
            sentiment = "NEUTRAL"

        return {
            "sentiment": sentiment,
            "crypto_relevance": "HIGH" if crypto_count > 0 else "LOW",
            "excitement_level": min(10, max(1, positive_count * 2)),
            "key_indicators": ["fallback_analysis"],
            "confidence": 0.5,
            "error": error_msg
        }

    def batch_analyze(self, texts: List[str]) -> List[Dict]:
        """Analyze multiple texts in sequence"""
        results = []
        for i, text in enumerate(texts):
            print(f"Analyzing text {i+1}/{len(texts)}")
            results.append(self.analyze_text(text))
        return results

# Usage example
analyzer = OllamaSentimentAnalyzer()

# Test with a sample rally tweet
sample_tweet = "Trump's rally tonight was AMAZING! He mentioned Bitcoin and crypto adoption - this is HUGE for the markets! 🚀"
sentiment_result = analyzer.analyze_text(sample_tweet)
print(json.dumps(sentiment_result, indent=2))
```
## Building the Rally Detection Algorithm

### Sentiment Aggregation and Scoring

Combine individual sentiment scores into rally detection signals:
```python
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List

class TrumpRallyDetector:
    def __init__(self, sentiment_analyzer):
        """Initialize rally detector with a sentiment analyzer"""
        self.analyzer = sentiment_analyzer
        self.rally_threshold = 7.0  # Minimum score to trigger a rally alert
        self.time_window = 4        # Hours of data to analyze for rally detection

    def detect_rally_signals(self, social_media_data: pd.DataFrame) -> Dict:
        """Detect Trump rally signals from social media sentiment"""
        # Filter recent data within the time window
        cutoff_time = datetime.now() - timedelta(hours=self.time_window)
        recent_data = social_media_data[
            pd.to_datetime(social_media_data['created_at']) > cutoff_time
        ]

        if len(recent_data) == 0:
            return self._no_signal_response()

        # Analyze sentiment for all recent posts
        texts = recent_data['text'].tolist()
        sentiment_results = self.analyzer.batch_analyze(texts)

        # Calculate rally detection metrics
        rally_metrics = self._calculate_rally_metrics(sentiment_results, recent_data)

        # Determine whether a rally signal was detected
        rally_signal = rally_metrics['composite_score'] >= self.rally_threshold

        return {
            'rally_detected': rally_signal,
            'confidence': rally_metrics['confidence'],
            'composite_score': rally_metrics['composite_score'],
            'metrics': rally_metrics,
            'timestamp': datetime.now().isoformat(),
            'data_points': len(recent_data)
        }

    def _calculate_rally_metrics(self, sentiment_results: List[Dict],
                                 social_data: pd.DataFrame) -> Dict:
        """Calculate comprehensive rally detection metrics"""
        # Sentiment distribution
        sentiments = [r['sentiment'] for r in sentiment_results]
        positive_pct = sentiments.count('POSITIVE') / len(sentiments) * 100
        negative_pct = sentiments.count('NEGATIVE') / len(sentiments) * 100

        # Excitement levels
        excitement_scores = [r['excitement_level'] for r in sentiment_results]
        avg_excitement = np.mean(excitement_scores)

        # Crypto relevance
        crypto_relevance = [r['crypto_relevance'] for r in sentiment_results]
        high_crypto_pct = crypto_relevance.count('HIGH') / len(crypto_relevance) * 100

        # Social engagement metrics
        if 'retweets' in social_data.columns:
            avg_retweets = social_data['retweets'].mean()
            avg_likes = social_data['likes'].mean()
            engagement_score = (avg_retweets + avg_likes) / 2
        else:
            engagement_score = social_data.get('score', pd.Series([0])).mean()

        # Volume surge detection: more posts = higher volume score
        volume_score = min(10, len(sentiment_results) / 10)

        # Composite rally score
        composite_score = self._calculate_composite_score(
            positive_pct, avg_excitement, high_crypto_pct,
            engagement_score, volume_score
        )

        # Confidence based on data quality
        confidence = self._calculate_confidence(sentiment_results, social_data)

        return {
            'positive_sentiment_pct': positive_pct,
            'negative_sentiment_pct': negative_pct,
            'avg_excitement_level': avg_excitement,
            'crypto_relevance_pct': high_crypto_pct,
            'engagement_score': engagement_score,
            'volume_score': volume_score,
            'composite_score': composite_score,
            'confidence': confidence
        }

    def _calculate_composite_score(self, positive_pct: float, excitement: float,
                                   crypto_pct: float, engagement: float,
                                   volume: float) -> float:
        """Calculate the weighted composite rally score"""
        weights = {
            'sentiment': 0.25,   # 25% weight on positive sentiment
            'excitement': 0.20,  # 20% weight on excitement level
            'crypto': 0.20,      # 20% weight on crypto relevance
            'engagement': 0.20,  # 20% weight on social engagement
            'volume': 0.15       # 15% weight on post volume
        }

        # Normalize scores to a 0-10 scale
        normalized_scores = {
            'sentiment': (positive_pct / 100) * 10,
            'excitement': excitement,
            'crypto': (crypto_pct / 100) * 10,
            'engagement': min(10, engagement / 100),  # Engagement can vary widely
            'volume': volume
        }

        composite = sum(
            normalized_scores[metric] * weights[metric]
            for metric in weights
        )
        return round(composite, 2)

    def _calculate_confidence(self, sentiment_results: List[Dict],
                              social_data: pd.DataFrame) -> float:
        """Calculate the confidence level in the rally detection"""
        data_points = len(sentiment_results)
        avg_confidence = np.mean([r.get('confidence', 0.5) for r in sentiment_results])

        # More data points = higher confidence (up to a point)
        volume_confidence = min(1.0, data_points / 50)

        # Recent data is more relevant
        if 'created_at' in social_data.columns:
            latest_post = pd.to_datetime(social_data['created_at']).max()
            hours_ago = (datetime.now() - latest_post).total_seconds() / 3600
            recency_confidence = max(0.1, 1.0 - (hours_ago / 24))  # Decay over 24 hours
        else:
            recency_confidence = 0.5

        overall_confidence = (avg_confidence + volume_confidence + recency_confidence) / 3
        return round(overall_confidence, 3)

    def _no_signal_response(self) -> Dict:
        """Return a response when no data is available for analysis"""
        return {
            'rally_detected': False,
            'confidence': 0.0,
            'composite_score': 0.0,
            'metrics': {},
            'timestamp': datetime.now().isoformat(),
            'data_points': 0,
            'message': 'Insufficient recent data for rally detection'
        }

# Usage example
detector = TrumpRallyDetector(analyzer)

# Combine Twitter and Reddit data
combined_data = pd.concat([tweets_df, reddit_data], ignore_index=True)

# Detect rally signals
rally_result = detector.detect_rally_signals(combined_data)
print(f"Rally Detected: {rally_result['rally_detected']}")
print(f"Confidence: {rally_result['confidence']}")
print(f"Composite Score: {rally_result['composite_score']}")
```
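As a quick sanity check on the weighted scoring scheme above, the composite score for some made-up metric values works out by hand (all numbers here are hypothetical):

```python
# Weights matching _calculate_composite_score
weights = {'sentiment': 0.25, 'excitement': 0.20, 'crypto': 0.20,
           'engagement': 0.20, 'volume': 0.15}

# Hypothetical normalized scores on the 0-10 scale
scores = {'sentiment': 8.0, 'excitement': 7.0, 'crypto': 6.0,
          'engagement': 5.0, 'volume': 4.0}

# 8*0.25 + 7*0.20 + 6*0.20 + 5*0.20 + 4*0.15 = 6.2
composite = round(sum(scores[m] * weights[m] for m in weights), 2)
print(composite)
```

A 6.2 here would sit just below the default `rally_threshold` of 7.0, which is why strong crypto relevance or a volume surge is usually needed to push a signal over the line.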
## Cryptocurrency Market Integration

### Real-time Crypto Price Monitoring

Connect your rally detector to crypto price feeds:
```python
import ccxt
import time
from datetime import datetime
from typing import Dict

class CryptoMarketMonitor:
    def __init__(self):
        """Initialize cryptocurrency market monitoring"""
        self.exchange = ccxt.binance()

        # Major cryptocurrencies likely affected by Trump rallies
        self.crypto_symbols = [
            'BTC/USDT', 'ETH/USDT', 'ADA/USDT',
            'DOGE/USDT', 'XRP/USDT', 'SOL/USDT'
        ]
        self.price_history = {}

    def get_current_prices(self) -> Dict[str, float]:
        """Get current cryptocurrency prices"""
        prices = {}
        try:
            tickers = self.exchange.fetch_tickers(self.crypto_symbols)
            for symbol in self.crypto_symbols:
                if symbol in tickers:
                    prices[symbol] = tickers[symbol]['last']
        except Exception as e:
            print(f"Error fetching crypto prices: {e}")
        return prices

    def monitor_price_changes(self, rally_signal: Dict) -> Dict:
        """Monitor crypto price changes after a rally detection"""
        if not rally_signal.get('rally_detected', False):
            return {'status': 'no_rally', 'price_changes': {}}

        # Get baseline prices
        baseline_prices = self.get_current_prices()
        print("Rally detected! Monitoring crypto prices from baseline...")

        # Monitor for 30 minutes after rally detection
        monitoring_results = []
        for minute in range(30):
            time.sleep(60)  # Wait 1 minute
            current_prices = self.get_current_prices()

            changes = {}
            for symbol in current_prices:
                if symbol in baseline_prices:
                    change_pct = ((current_prices[symbol] - baseline_prices[symbol])
                                  / baseline_prices[symbol]) * 100
                    changes[symbol] = {
                        'price': current_prices[symbol],
                        'change_pct': round(change_pct, 2)
                    }

            monitoring_results.append({
                'minute': minute + 1,
                'timestamp': datetime.now().isoformat(),
                'changes': changes
            })
            print(f"Minute {minute + 1}: {changes}")

        return {
            'status': 'monitored',
            'rally_strength': rally_signal['composite_score'],
            'baseline_prices': baseline_prices,
            'monitoring_results': monitoring_results
        }

# Usage example
crypto_monitor = CryptoMarketMonitor()

# Get current prices
current_prices = crypto_monitor.get_current_prices()
print("Current crypto prices:", current_prices)

# Monitor prices after rally detection (note: this blocks for ~30 minutes)
if rally_result['rally_detected']:
    price_monitoring = crypto_monitor.monitor_price_changes(rally_result)
```
## Creating Real-time Alerts and Notifications

### Alert System Setup

Build a notification system for rally detections:
```python
import smtplib
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict

class RallyAlertSystem:
    def __init__(self, email_config=None, webhook_url=None):
        """Initialize alert system for Trump rally notifications"""
        self.email_config = email_config
        self.webhook_url = webhook_url

    def send_rally_alert(self, rally_data: Dict, crypto_changes: Dict = None):
        """Send alerts when a Trump rally is detected"""
        alert_message = self._format_alert_message(rally_data, crypto_changes)

        # Send email alert
        if self.email_config:
            self._send_email_alert(alert_message, rally_data)

        # Send webhook notification
        if self.webhook_url:
            self._send_webhook_alert(rally_data, crypto_changes)

        print("Rally alert sent!")

    def _format_alert_message(self, rally_data: Dict, crypto_changes: Dict = None) -> str:
        """Format the rally detection alert message"""
        message = f"""
🚨 TRUMP RALLY DETECTED 🚨

Rally Score: {rally_data['composite_score']}/10
Confidence: {rally_data['confidence']*100:.1f}%
Data Points: {rally_data['data_points']}

Key Metrics:
- Positive Sentiment: {rally_data['metrics'].get('positive_sentiment_pct', 0):.1f}%
- Avg Excitement: {rally_data['metrics'].get('avg_excitement_level', 0):.1f}/10
- Crypto Relevance: {rally_data['metrics'].get('crypto_relevance_pct', 0):.1f}%

Detection Time: {rally_data['timestamp']}
"""
        if crypto_changes:
            message += "\n📈 CRYPTO BASELINE PRICES:\n"
            for symbol, price in crypto_changes.get('baseline_prices', {}).items():
                message += f"- {symbol}: ${price:.2f}\n"
        return message

    def _send_email_alert(self, message: str, rally_data: Dict):
        """Send an email notification"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email_config['from']
            msg['To'] = self.email_config['to']
            msg['Subject'] = f"🚨 Trump Rally Detected - Score: {rally_data['composite_score']}"
            msg.attach(MIMEText(message, 'plain'))

            server = smtplib.SMTP(self.email_config['smtp_server'],
                                  self.email_config['smtp_port'])
            server.starttls()
            server.login(self.email_config['username'],
                         self.email_config['password'])
            server.sendmail(self.email_config['from'],
                            self.email_config['to'], msg.as_string())
            server.quit()
        except Exception as e:
            print(f"Email alert failed: {e}")

    def _send_webhook_alert(self, rally_data: Dict, crypto_changes: Dict = None):
        """Send a webhook notification to Discord/Slack/etc."""
        try:
            payload = {
                "embeds": [{
                    "title": "🚨 Trump Rally Detected",
                    "color": 16711680,  # Red
                    "fields": [
                        {
                            "name": "Rally Score",
                            "value": f"{rally_data['composite_score']}/10",
                            "inline": True
                        },
                        {
                            "name": "Confidence",
                            "value": f"{rally_data['confidence']*100:.1f}%",
                            "inline": True
                        },
                        {
                            "name": "Data Points",
                            "value": str(rally_data['data_points']),
                            "inline": True
                        }
                    ],
                    "timestamp": rally_data['timestamp']
                }]
            }
            response = requests.post(self.webhook_url, json=payload)
            response.raise_for_status()
        except Exception as e:
            print(f"Webhook alert failed: {e}")

# Setup alert system
email_config = {
    'from': 'your_email@gmail.com',
    'to': 'alerts@your_domain.com',
    'smtp_server': 'smtp.gmail.com',
    'smtp_port': 587,
    'username': 'your_email@gmail.com',
    'password': 'your_app_password'
}

alert_system = RallyAlertSystem(
    email_config=email_config,
    webhook_url="https://discord.com/api/webhooks/your_webhook_url"
)

# Send an alert if a rally was detected, passing the price-monitoring
# results from the previous section
if rally_result['rally_detected']:
    alert_system.send_rally_alert(rally_result, price_monitoring)
```
## Automated Rally Detection Pipeline

### Complete Pipeline Implementation

Combine all components into an automated detection system:
```python
import schedule
import time
import json
import pandas as pd
from datetime import datetime

class AutomatedRallyPipeline:
    def __init__(self, twitter_collector, reddit_monitor, analyzer,
                 detector, crypto_monitor, alert_system):
        """Initialize the complete automated rally detection pipeline"""
        self.twitter_collector = twitter_collector
        self.reddit_monitor = reddit_monitor
        self.analyzer = analyzer
        self.detector = detector
        self.crypto_monitor = crypto_monitor
        self.alert_system = alert_system

        self.last_rally_time = None
        self.rally_cooldown = 2  # Hours between rally alerts

    def run_detection_cycle(self):
        """Run one complete rally detection cycle"""
        print(f"\n--- Rally Detection Cycle: {datetime.now()} ---")
        try:
            # Step 1: Collect social media data
            print("🔍 Collecting social media data...")
            tweets_df = self.twitter_collector.collect_rally_tweets(
                hours_back=6, max_tweets=200
            )
            reddit_df = self.reddit_monitor.scrape_rally_discussions(limit=30)

            # Combine datasets
            combined_data = pd.concat([tweets_df, reddit_df], ignore_index=True)
            print(f"📊 Collected {len(combined_data)} social media posts")

            # Step 2: Run rally detection
            print("🤖 Analyzing sentiment with Ollama...")
            rally_result = self.detector.detect_rally_signals(combined_data)

            # Step 3: Check whether a rally was detected
            if rally_result['rally_detected']:
                print(f"🚨 RALLY DETECTED! Score: {rally_result['composite_score']}")

                # Check the cooldown period
                if self._should_send_alert():
                    # Step 4: Monitor crypto prices
                    print("💰 Monitoring cryptocurrency prices...")
                    crypto_changes = self.crypto_monitor.monitor_price_changes(rally_result)

                    # Step 5: Send alerts
                    print("📢 Sending rally alerts...")
                    self.alert_system.send_rally_alert(rally_result, crypto_changes)
                    self.last_rally_time = datetime.now()
                else:
                    print("⏰ Rally detected but in cooldown period")
            else:
                print(f"📉 No rally detected. Score: {rally_result['composite_score']}")

            # Log results
            self._log_detection_results(rally_result, len(combined_data))

        except Exception as e:
            print(f"❌ Detection cycle error: {e}")

    def _should_send_alert(self) -> bool:
        """Check whether enough time has passed since the last alert"""
        if self.last_rally_time is None:
            return True
        hours_passed = (datetime.now() - self.last_rally_time).total_seconds() / 3600
        return hours_passed >= self.rally_cooldown

    def _log_detection_results(self, rally_result: dict, data_points: int):
        """Append detection results to a JSON-lines log file"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'rally_detected': rally_result['rally_detected'],
            'composite_score': rally_result['composite_score'],
            'confidence': rally_result['confidence'],
            'data_points': data_points
        }
        with open('rally_detection_log.json', 'a') as f:
            f.write(json.dumps(log_entry) + '\n')

    def start_monitoring(self):
        """Start automated rally monitoring"""
        print("🚀 Starting Trump Rally Detection Pipeline...")

        # Schedule detection every 15 minutes
        schedule.every(15).minutes.do(self.run_detection_cycle)

        # Run an initial detection
        self.run_detection_cycle()

        # Keep running
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute

# Initialize the complete pipeline
pipeline = AutomatedRallyPipeline(
    twitter_collector=collector,
    reddit_monitor=reddit_monitor,
    analyzer=analyzer,
    detector=detector,
    crypto_monitor=crypto_monitor,
    alert_system=alert_system
)

# Start automated monitoring
# pipeline.start_monitoring()  # Uncomment to start continuous monitoring
```
## Deployment and Performance Optimization

### Docker Deployment

Create a containerized deployment for your Trump rally detector:
```dockerfile
# Dockerfile for Trump Rally Detector
FROM python:3.9-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    wget \
    && rm -rf /var/lib/apt/lists/*

# Install Ollama
RUN curl -fsSL https://ollama.ai/install.sh | sh

# Set working directory
WORKDIR /app

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Expose port for monitoring dashboard
EXPOSE 8080

# Start the Ollama server in the background, give it a moment to come up,
# then run the detector
CMD ["sh", "-c", "ollama serve & sleep 5 && python main.py"]
```
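Building and running the container might look like this. The image name, volume name, and port mapping below are assumptions, not fixed conventions:

```shell
# Build the detector image (the tag name is illustrative)
docker build -t rally-detector .

# Run it, exposing the dashboard port and persisting Ollama's model
# cache in a named volume so models survive container restarts
docker run -d \
  -p 8080:8080 \
  -v ollama_models:/root/.ollama \
  --name rally-detector \
  rally-detector
```

Persisting `/root/.ollama` matters: without the volume, the container re-downloads the multi-gigabyte model on every fresh start.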
### Performance Monitoring Dashboard

Create a simple monitoring interface:
```python
from flask import Flask, render_template, jsonify
import json
from datetime import datetime, timedelta

app = Flask(__name__)

class RallyDashboard:
    def __init__(self, log_file='rally_detection_log.json'):
        self.log_file = log_file

    def get_recent_detections(self, hours=24):
        """Get rally detections from the last N hours"""
        try:
            detections = []
            cutoff_time = datetime.now() - timedelta(hours=hours)

            with open(self.log_file, 'r') as f:
                for line in f:
                    detection = json.loads(line.strip())
                    detection_time = datetime.fromisoformat(detection['timestamp'])
                    if detection_time > cutoff_time:
                        detections.append(detection)

            return sorted(detections, key=lambda x: x['timestamp'], reverse=True)
        except FileNotFoundError:
            return []

    def get_rally_stats(self):
        """Get rally detection statistics for the last week"""
        detections = self.get_recent_detections(hours=168)  # Last week

        total_detections = len(detections)
        rally_count = sum(1 for d in detections if d['rally_detected'])

        if total_detections > 0:
            avg_score = sum(d['composite_score'] for d in detections) / total_detections
            avg_confidence = sum(d['confidence'] for d in detections) / total_detections
        else:
            avg_score = avg_confidence = 0

        return {
            'total_detections': total_detections,
            'rally_count': rally_count,
            'avg_score': round(avg_score, 2),
            'avg_confidence': round(avg_confidence, 3)
        }

dashboard = RallyDashboard()

@app.route('/')
def index():
    """Rally detection dashboard"""
    recent_detections = dashboard.get_recent_detections(hours=24)
    stats = dashboard.get_rally_stats()
    return render_template('dashboard.html',
                           detections=recent_detections,
                           stats=stats)

@app.route('/api/status')
def api_status():
    """API endpoint for current status"""
    recent_detections = dashboard.get_recent_detections(hours=1)
    latest_detection = recent_detections[0] if recent_detections else None
    return jsonify({
        'status': 'active',
        'latest_detection': latest_detection,
        'timestamp': datetime.now().isoformat()
    })

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=True)
```
## Testing and Validation

### Backtesting Historical Events

Validate your detector against historical Trump rally data:
```python
import pandas as pd
from typing import Dict, List

class RallyBacktester:
    def __init__(self, detector):
        self.detector = detector

    def backtest_historical_rallies(self, historical_data: List[Dict]) -> Dict:
        """Test detector accuracy against known rally events"""
        results = {
            'total_events': len(historical_data),
            'true_positives': 0,
            'false_positives': 0,
            'true_negatives': 0,
            'false_negatives': 0,
            'accuracy': 0,
            'precision': 0,
            'recall': 0
        }

        for event in historical_data:
            # Convert historical data to DataFrame format
            event_data = pd.DataFrame(event['social_media_posts'])

            # Run detection. Note: detect_rally_signals filters to the last
            # few hours, so for true backtesting you would parameterize the
            # cutoff time instead of using datetime.now()
            detection_result = self.detector.detect_rally_signals(event_data)

            # Compare with the known outcome
            predicted_rally = detection_result['rally_detected']
            actual_rally = event['was_rally']

            if predicted_rally and actual_rally:
                results['true_positives'] += 1
            elif predicted_rally and not actual_rally:
                results['false_positives'] += 1
            elif not predicted_rally and not actual_rally:
                results['true_negatives'] += 1
            else:
                results['false_negatives'] += 1

        # Calculate metrics, guarding against division by zero
        tp = results['true_positives']
        fp = results['false_positives']
        tn = results['true_negatives']
        fn = results['false_negatives']
        total = tp + fp + tn + fn

        results['accuracy'] = (tp + tn) / total if total > 0 else 0
        results['precision'] = tp / (tp + fp) if (tp + fp) > 0 else 0
        results['recall'] = tp / (tp + fn) if (tp + fn) > 0 else 0
        return results

# Example historical test data
historical_test_data = [
    {
        'date': '2024-10-15',
        'was_rally': True,
        'social_media_posts': [
            {'text': 'Trump rally tonight! Bitcoin to the moon! 🚀', 'created_at': '2024-10-15 18:00:00'},
            {'text': 'MAGA rally was incredible! Crypto policies announced!', 'created_at': '2024-10-15 21:00:00'},
        ]
    }
    # Add more historical events...
]

backtester = RallyBacktester(detector)
backtest_results = backtester.backtest_historical_rallies(historical_test_data)
print(f"Detector Accuracy: {backtest_results['accuracy']:.2%}")
```
## Advanced Features and Optimizations

### Machine Learning Enhancement

Improve detection accuracy by training on historical data:
```python
import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from typing import Dict, List, Tuple

class MLEnhancedDetector:
    def __init__(self, base_detector):
        self.base_detector = base_detector
        self.ml_model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.is_trained = False

    def prepare_training_data(self, historical_events: List[Dict]) -> Tuple:
        """Prepare training data from historical rally events"""
        features = []
        labels = []

        for event in historical_events:
            # Extract features using the base detector
            event_df = pd.DataFrame(event['social_media_posts'])
            detection_result = self.base_detector.detect_rally_signals(event_df)

            # Feature vector
            feature_vector = [
                detection_result['composite_score'],
                detection_result['confidence'],
                detection_result['data_points'],
                detection_result['metrics'].get('positive_sentiment_pct', 0),
                detection_result['metrics'].get('avg_excitement_level', 0),
                detection_result['metrics'].get('crypto_relevance_pct', 0),
                detection_result['metrics'].get('engagement_score', 0),
                detection_result['metrics'].get('volume_score', 0)
            ]
            features.append(feature_vector)
            labels.append(1 if event['was_rally'] else 0)

        return np.array(features), np.array(labels)

    def train_model(self, historical_events: List[Dict]):
        """Train the ML model on historical data"""
        X, y = self.prepare_training_data(historical_events)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Train model
        self.ml_model.fit(X_train, y_train)

        # Evaluate
        y_pred = self.ml_model.predict(X_test)
        print("ML Model Performance:")
        print(classification_report(y_test, y_pred))

        self.is_trained = True

        # Save model
        joblib.dump(self.ml_model, 'rally_detector_model.pkl')

    def enhanced_detection(self, social_media_data: pd.DataFrame) -> Dict:
        """Enhanced rally detection using the ML model"""
        # Get the base detection result
        base_result = self.base_detector.detect_rally_signals(social_media_data)

        if not self.is_trained:
            return base_result

        # Prepare features for the ML model
        feature_vector = [[
            base_result['composite_score'],
            base_result['confidence'],
            base_result['data_points'],
            base_result['metrics'].get('positive_sentiment_pct', 0),
            base_result['metrics'].get('avg_excitement_level', 0),
            base_result['metrics'].get('crypto_relevance_pct', 0),
            base_result['metrics'].get('engagement_score', 0),
            base_result['metrics'].get('volume_score', 0)
        ]]

        # ML prediction
        ml_prediction = self.ml_model.predict(feature_vector)[0]
        ml_probability = self.ml_model.predict_proba(feature_vector)[0][1]

        # Enhanced result
        enhanced_result = base_result.copy()
        enhanced_result['ml_prediction'] = bool(ml_prediction)
        enhanced_result['ml_probability'] = float(ml_probability)
        enhanced_result['enhanced_rally_detected'] = ml_prediction == 1
        return enhanced_result

# Usage with the enhanced detector
enhanced_detector = MLEnhancedDetector(detector)
# enhanced_detector.train_model(historical_test_data)  # Train with historical data
enhanced_result = enhanced_detector.enhanced_detection(combined_data)
```
## Conclusion
You've built a comprehensive Trump rally detector that monitors social media sentiment and predicts cryptocurrency market movements. This system combines Ollama's AI-powered sentiment analysis with real-time social media monitoring to identify rally signals.
Key benefits of your Trump rally detector:
- Real-time monitoring of Twitter, Reddit, and other platforms
- Advanced sentiment analysis using Ollama's local AI models
- Cryptocurrency correlation tracking for trading opportunities
- Automated alerts via email and webhooks
- Machine learning enhancement for improved accuracy
Your detector can surface rally sentiment shifts before they are widely covered by mainstream media, though any trading edge should be validated with backtesting before real money is at stake. The system's modular design also allows easy expansion to monitor other political events or cryptocurrency influencers.
Remember to comply with API rate limits and terms of service for all social media platforms. Consider implementing additional data sources like Telegram channels and YouTube comments for even broader coverage.
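To stay within those rate limits, a simple retry-with-backoff wrapper around the collectors helps absorb transient API errors. A minimal sketch (the retry counts and delays are arbitrary starting points, not recommendations from any platform):

```python
import time
import functools

def with_backoff(max_retries: int = 3, base_delay: float = 2.0):
    """Retry a flaky API call with exponential backoff between attempts."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    # Re-raise after the final attempt
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay}s")
                    time.sleep(delay)
        return wrapper
    return decorator
```

You could then decorate `collect_rally_tweets` or `scrape_rally_discussions` with `@with_backoff()` so a single rate-limit hiccup does not abort a whole detection cycle.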