How to Build a Trump Rally Detector: Ollama Social Media Sentiment for Crypto

Build a Trump rally detector using Ollama sentiment analysis to track social media buzz and predict crypto market movements. Step-by-step guide included.

Remember when a single Trump tweet could send Bitcoin soaring? Those days aren't over. Political rallies still move crypto markets faster than you can say "HODL." Today, we'll build a Trump rally detector that monitors social media sentiment and predicts crypto price movements.

This Trump rally detector uses Ollama's local AI models to analyze Twitter and Reddit posts. You'll learn to identify rally sentiment patterns and track them against cryptocurrency price changes.

Why Trump Rally Sentiment Affects Crypto Markets

Political events can create market volatility. Trump rallies generate heavy social media buzz, and spikes in that buzz can coincide with sharp cryptocurrency price moves. Traders who spot sentiment shifts early have more time to react.

The Crypto-Politics Connection

Trump's pro-crypto stance during rallies often triggers buying frenzies. Key sentiment indicators include:

  • Rally attendance predictions
  • Policy announcement speculation
  • Media coverage volume
  • Social media engagement rates
  • Cryptocurrency mentions during speeches

Setting Up Your Trump Rally Detector Environment

Prerequisites

Install these tools before building your detector:

# Install Ollama
curl -fsSL https://ollama.ai/install.sh | sh

# Install Python dependencies (praw, ccxt, and flask are imported in later sections)
pip install tweepy praw ccxt flask pandas numpy matplotlib requests beautifulsoup4 schedule

# Optional: Node.js packages for browser-based scraping (not used by the Python code below)
npm install puppeteer axios cheerio

Ollama Model Selection

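Download the model the analyzer will prompt. Llama 3.1 is a general-purpose instruction-following model rather than a dedicated sentiment classifier; the prompt does the specialization:

# Download Llama 3.1 8B (the model the analyzer uses)
ollama pull llama3.1:8b

# Optional: pull a second variant for comparison (confirm the tag exists in the Ollama library first)
ollama pull llama3.1:instruct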

# Verify installations
ollama list
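
Before wiring the model into Python, it can help to confirm programmatically that the server is up and the model was pulled. The sketch below assumes Ollama's default address (http://localhost:11434) and its /api/tags endpoint, which lists locally installed models. The helper names `model_is_installed` and `fetch_installed_models` are illustrative and not part of any library.

```python
import json
import urllib.request


def model_is_installed(tags_payload: dict, model: str) -> bool:
    """Return True if `model` appears in a parsed /api/tags response."""
    names = [m.get("name", "") for m in tags_payload.get("models", [])]
    return model in names


def fetch_installed_models(base_url: str = "http://localhost:11434") -> dict:
    """Fetch the list of local models from a running Ollama server."""
    with urllib.request.urlopen(f"{base_url}/api/tags", timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Example payload in the shape /api/tags returns (values are made up)
sample = {"models": [{"name": "llama3.1:8b"}, {"name": "mistral:latest"}]}
print(model_is_installed(sample, "llama3.1:8b"))
print(model_is_installed(sample, "llama3.1:70b"))
```

Against a live server, `model_is_installed(fetch_installed_models(), "llama3.1:8b")` performs the same check on real data.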

Building the Social Media Data Collection System

Twitter API Integration

Create a Twitter data collector that monitors Trump-related keywords:

import tweepy
import pandas as pd
import json
from datetime import datetime, timedelta

class TrumpRallyTwitterCollector:
    def __init__(self, api_key, api_secret, access_token, access_secret):
        """Initialize Twitter API connection for rally monitoring"""
        self.auth = tweepy.OAuthHandler(api_key, api_secret)
        self.auth.set_access_token(access_token, access_secret)
        self.api = tweepy.API(self.auth, wait_on_rate_limit=True)
        
        # Keywords that indicate Trump rally activity
        self.rally_keywords = [
            "Trump rally", "MAGA rally", "Trump speech", 
            "Trump crypto", "Bitcoin Trump", "Trump cryptocurrency"
        ]
    
    def collect_rally_tweets(self, hours_back=24, max_tweets=1000):
        """Collect tweets about Trump rallies from specified time period"""
        tweets_data = []
        since_date = (datetime.now() - timedelta(hours=hours_back)).strftime('%Y-%m-%d')
        
        for keyword in self.rally_keywords:
            try:
                tweets = tweepy.Cursor(
                    self.api.search_tweets,
                    # search_tweets has no `since` parameter; use the since: query operator
                    q=f"{keyword} since:{since_date}",
                    lang="en",
                    result_type="mixed"
                ).items(max_tweets // len(self.rally_keywords))
                
                for tweet in tweets:
                    tweet_data = {
                        'id': tweet.id,
                        'text': tweet.text,
                        'created_at': tweet.created_at,
                        'user': tweet.user.screen_name,
                        'followers': tweet.user.followers_count,
                        'retweets': tweet.retweet_count,
                        'likes': tweet.favorite_count,
                        'keyword': keyword
                    }
                    tweets_data.append(tweet_data)
                    
            except Exception as e:
                print(f"Error collecting tweets for {keyword}: {e}")
                continue
        
        return pd.DataFrame(tweets_data)

# Usage example
collector = TrumpRallyTwitterCollector(
    api_key="your_api_key",
    api_secret="your_api_secret", 
    access_token="your_access_token",
    access_secret="your_access_secret"
)

tweets_df = collector.collect_rally_tweets(hours_back=48, max_tweets=500)
print(f"Collected {len(tweets_df)} tweets about Trump rallies")
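
Because the collector runs one search per keyword, a tweet that matches several keywords is stored more than once, which would inflate any volume-based score later. A minimal de-duplication sketch over plain dictionaries follows. `dedupe_by_id` is a hypothetical helper; on a DataFrame, `drop_duplicates(subset="id")` has the same effect.

```python
from typing import Dict, Iterable, List


def dedupe_by_id(records: Iterable[Dict]) -> List[Dict]:
    """Keep the first record seen for each tweet id, preserving order."""
    seen = set()
    unique = []
    for record in records:
        tweet_id = record["id"]
        if tweet_id in seen:
            continue
        seen.add(tweet_id)
        unique.append(record)
    return unique


# The first and third records are the same tweet found by two keyword searches
raw = [
    {"id": 1, "text": "Trump rally speech tonight", "keyword": "Trump rally"},
    {"id": 2, "text": "Bitcoin Trump news", "keyword": "Bitcoin Trump"},
    {"id": 1, "text": "Trump rally speech tonight", "keyword": "Trump speech"},
]
unique = dedupe_by_id(raw)
print(len(unique))
```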

Reddit Rally Discussion Scraper

Monitor Reddit communities for rally sentiment:

import praw
import pandas as pd
from datetime import datetime

class RedditRallyMonitor:
    def __init__(self, client_id, client_secret, user_agent):
        """Initialize Reddit API for rally discussion monitoring"""
        self.reddit = praw.Reddit(
            client_id=client_id,
            client_secret=client_secret,
            user_agent=user_agent
        )
        
        # Subreddits likely to discuss Trump rallies and crypto
        # (r/The_Donald was banned by Reddit in 2020, so it is omitted)
        self.target_subreddits = [
            'Bitcoin', 'CryptoCurrency', 'wallstreetbets', 
            'Conservative', 'politics'
        ]
    
    def scrape_rally_discussions(self, limit=100):
        """Scrape Reddit posts and comments about Trump rallies"""
        rally_posts = []
        
        for subreddit_name in self.target_subreddits:
            try:
                subreddit = self.reddit.subreddit(subreddit_name)
                
                # Search for rally-related posts
                for post in subreddit.search("Trump rally OR MAGA rally", limit=limit):
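                    post_data = {
                        'subreddit': subreddit_name,
                        'title': post.title,
                        'selftext': post.selftext,
                        # 'text' and 'created_at' match the tweet columns the detector reads
                        'text': f"{post.title}\n{post.selftext}".strip(),
                        'created_at': pd.to_datetime(post.created_utc, unit='s', utc=True),
                        'score': post.score,
                        'upvote_ratio': post.upvote_ratio,
                        'num_comments': post.num_comments,
                        'created_utc': post.created_utc,  # Raw epoch seconds (UTC)
                        'url': post.url
                    }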
                    rally_posts.append(post_data)
                    
            except Exception as e:
                print(f"Error scraping {subreddit_name}: {e}")
                continue
        
        return pd.DataFrame(rally_posts)

# Initialize Reddit scraper
reddit_monitor = RedditRallyMonitor(
    client_id="your_client_id",
    client_secret="your_client_secret",
    user_agent="TrumpRallyDetector/1.0"
)

reddit_data = reddit_monitor.scrape_rally_discussions(limit=50)
print(f"Found {len(reddit_data)} rally discussions on Reddit")
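
Reddit reports `created_utc` as seconds since the Unix epoch, in UTC. Converting it to a timezone-aware datetime keeps Reddit posts comparable with tweet timestamps when both are later filtered by a time window. The following is a small sketch of that conversion plus a recency check; `epoch_to_utc` and `is_recent` are illustrative names.

```python
from datetime import datetime, timedelta, timezone


def epoch_to_utc(epoch_seconds: float) -> datetime:
    """Convert Reddit's created_utc (epoch seconds) to an aware UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


def is_recent(created: datetime, now: datetime, hours: float) -> bool:
    """True if `created` falls within the last `hours` before `now`."""
    return timedelta(0) <= now - created <= timedelta(hours=hours)


now = datetime(2024, 7, 1, 12, 0, tzinfo=timezone.utc)
post_time = epoch_to_utc(1719828000)  # 2024-07-01 10:00:00 UTC
print(post_time.isoformat())
print(is_recent(post_time, now, hours=4))
```

Passing `now` in explicitly, rather than calling `datetime.now()` inside the function, keeps the check deterministic and easy to test.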

Implementing Ollama Sentiment Analysis

Setting Up the Sentiment Analyzer

Create an Ollama-powered sentiment analyzer specifically tuned for political rally content:

import requests
import json
from datetime import datetime
from typing import List, Dict

class OllamaSentimentAnalyzer:
    def __init__(self, ollama_url="http://localhost:11434"):
        """Initialize Ollama sentiment analyzer for Trump rally content"""
        self.ollama_url = ollama_url
        self.model = "llama3.1:8b"
        
        # Custom prompt for rally sentiment analysis
        self.sentiment_prompt = """
        You are an expert political sentiment analyzer specializing in Trump rally content.
        
        Analyze the following text and provide:
        1. Overall sentiment: POSITIVE, NEGATIVE, or NEUTRAL
        2. Crypto relevance: HIGH, MEDIUM, or LOW  
        3. Rally excitement level: 1-10 scale
        4. Key sentiment indicators found
        
        Text to analyze: {text}
        
        Respond in JSON format:
        {{
            "sentiment": "POSITIVE/NEGATIVE/NEUTRAL",
            "crypto_relevance": "HIGH/MEDIUM/LOW",
            "excitement_level": 1-10,
            "key_indicators": ["indicator1", "indicator2"],
            "confidence": 0.0-1.0
        }}
        """
    
    def analyze_text(self, text: str) -> Dict:
        """Analyze sentiment of Trump rally related text"""
        try:
            # Prepare Ollama request
            prompt = self.sentiment_prompt.format(text=text[:1000])  # Limit text length
            
            payload = {
                "model": self.model,
                "prompt": prompt,
                "stream": False,
                "format": "json",  # Ask Ollama to constrain the output to valid JSON
                "options": {
                    "temperature": 0.1,  # Low temperature for consistent results
                    "top_p": 0.9
                }
            }
            
            response = requests.post(
                f"{self.ollama_url}/api/generate",
                json=payload,
                timeout=30
            )
            
            if response.status_code == 200:
                result = response.json()
                
                # Extract JSON from Ollama response
                response_text = result.get('response', '{}')
                try:
                    # Find JSON in response text
                    start_idx = response_text.find('{')
                    end_idx = response_text.rfind('}') + 1
                    json_str = response_text[start_idx:end_idx]
                    
                    sentiment_data = json.loads(json_str)
                    sentiment_data['raw_text'] = text
                    sentiment_data['analysis_timestamp'] = datetime.now().isoformat()
                    
                    return sentiment_data
                    
                except json.JSONDecodeError:
                    # Fallback parsing if JSON extraction fails
                    return self._fallback_analysis(text, response_text)
            
            else:
                print(f"Ollama API error: {response.status_code}")
                return self._fallback_analysis(text, "API Error")
                
        except Exception as e:
            print(f"Sentiment analysis error: {e}")
            return self._fallback_analysis(text, str(e))
    
    def _fallback_analysis(self, text: str, error_msg: str) -> Dict:
        """Provide basic fallback sentiment analysis"""
        positive_words = ['great', 'amazing', 'winning', 'best', 'fantastic', 'huge']
        negative_words = ['bad', 'terrible', 'failing', 'worst', 'disaster', 'sad']
        crypto_words = ['bitcoin', 'crypto', 'blockchain', 'btc', 'ethereum']
        
        text_lower = text.lower()
        
        positive_count = sum(1 for word in positive_words if word in text_lower)
        negative_count = sum(1 for word in negative_words if word in text_lower)
        crypto_count = sum(1 for word in crypto_words if word in text_lower)
        
        if positive_count > negative_count:
            sentiment = "POSITIVE"
        elif negative_count > positive_count:
            sentiment = "NEGATIVE"
        else:
            sentiment = "NEUTRAL"
        
        return {
            "sentiment": sentiment,
            "crypto_relevance": "HIGH" if crypto_count > 0 else "LOW",
            "excitement_level": min(10, max(1, positive_count * 2)),
            "key_indicators": ["fallback_analysis"],
            "confidence": 0.5,
            "error": error_msg
        }
    
    def batch_analyze(self, texts: List[str]) -> List[Dict]:
        """Analyze multiple texts in batch"""
        results = []
        for i, text in enumerate(texts):
            print(f"Analyzing text {i+1}/{len(texts)}")
            result = self.analyze_text(text)
            results.append(result)
        return results

# Usage example
analyzer = OllamaSentimentAnalyzer()

# Test with sample rally tweet
sample_tweet = "Trump's rally tonight was AMAZING! He mentioned Bitcoin and crypto adoption - this is HUGE for the markets! 🚀"
sentiment_result = analyzer.analyze_text(sample_tweet)
print(json.dumps(sentiment_result, indent=2))
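
Local models do not always follow the requested JSON schema exactly: a label may come back in lowercase, or the excitement level as a string such as "8/10". Since downstream code indexes these fields directly, it may be worth coercing each result into a known shape first. The sketch below is one way to do that. `normalize_sentiment` and its default values are assumptions for illustration, not part of Ollama.

```python
import re
from typing import Any, Dict

VALID_SENTIMENTS = {"POSITIVE", "NEGATIVE", "NEUTRAL"}
VALID_RELEVANCE = {"HIGH", "MEDIUM", "LOW"}


def _first_number(value: Any, default: float) -> float:
    """Pull the first number out of a value like 8, "8", or "8/10"."""
    match = re.search(r"-?\d+(?:\.\d+)?", str(value))
    return float(match.group()) if match else default


def normalize_sentiment(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Coerce a model response into the fields the detector expects."""
    sentiment = str(raw.get("sentiment", "")).strip().upper()
    relevance = str(raw.get("crypto_relevance", "")).strip().upper()
    excitement = _first_number(raw.get("excitement_level"), default=1.0)
    confidence = _first_number(raw.get("confidence"), default=0.5)
    indicators = raw.get("key_indicators")
    return {
        "sentiment": sentiment if sentiment in VALID_SENTIMENTS else "NEUTRAL",
        "crypto_relevance": relevance if relevance in VALID_RELEVANCE else "LOW",
        "excitement_level": min(10.0, max(1.0, excitement)),
        "confidence": min(1.0, max(0.0, confidence)),
        "key_indicators": indicators if isinstance(indicators, list) else [],
    }


messy = {"sentiment": "positive", "excitement_level": "8/10", "confidence": "0.9"}
print(normalize_sentiment(messy))
```

Unknown labels fall back to NEUTRAL and LOW so that a malformed response lowers the signal instead of raising an exception.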

Building the Rally Detection Algorithm

Sentiment Aggregation and Scoring

Combine individual sentiment scores into rally detection signals:

import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List

class TrumpRallyDetector:
    def __init__(self, sentiment_analyzer):
        """Initialize rally detector with sentiment analyzer"""
        self.analyzer = sentiment_analyzer
        self.rally_threshold = 7.0  # Minimum score to trigger rally alert
        self.time_window = 4  # Hours to analyze for rally detection
        
    def detect_rally_signals(self, social_media_data: pd.DataFrame) -> Dict:
        """Detect Trump rally signals from social media sentiment"""
        
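        # Filter recent data within time window (compare in UTC; tweet timestamps are timezone-aware)
        cutoff_time = pd.Timestamp.now(tz='UTC') - pd.Timedelta(hours=self.time_window)
        created_at = pd.to_datetime(social_media_data['created_at'], utc=True)
        recent_data = social_media_data[created_at > cutoff_time]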
        
        if len(recent_data) == 0:
            return self._no_signal_response()
        
        # Analyze sentiment for all recent posts
        texts = recent_data['text'].tolist()
        sentiment_results = self.analyzer.batch_analyze(texts)
        
        # Calculate rally detection metrics
        rally_metrics = self._calculate_rally_metrics(sentiment_results, recent_data)
        
        # Determine if rally signal detected
        rally_signal = rally_metrics['composite_score'] >= self.rally_threshold
        
        return {
            'rally_detected': rally_signal,
            'confidence': rally_metrics['confidence'],
            'composite_score': rally_metrics['composite_score'],
            'metrics': rally_metrics,
            'timestamp': datetime.now().isoformat(),
            'data_points': len(recent_data)
        }
    
    def _calculate_rally_metrics(self, sentiment_results: List[Dict], 
                                social_data: pd.DataFrame) -> Dict:
        """Calculate comprehensive rally detection metrics"""
        
        # Sentiment distribution
        sentiments = [r['sentiment'] for r in sentiment_results]
        positive_pct = sentiments.count('POSITIVE') / len(sentiments) * 100
        negative_pct = sentiments.count('NEGATIVE') / len(sentiments) * 100
        
        # Excitement levels
        excitement_scores = [r['excitement_level'] for r in sentiment_results]
        avg_excitement = np.mean(excitement_scores)
        
        # Crypto relevance
        crypto_relevance = [r['crypto_relevance'] for r in sentiment_results]
        high_crypto_pct = crypto_relevance.count('HIGH') / len(crypto_relevance) * 100
        
        # Social engagement metrics
        if 'retweets' in social_data.columns:
            avg_retweets = social_data['retweets'].mean()
            avg_likes = social_data['likes'].mean()
            engagement_score = (avg_retweets + avg_likes) / 2
        else:
            engagement_score = social_data.get('score', pd.Series([0])).mean()
        
        # Volume surge detection
        volume_score = min(10, len(sentiment_results) / 10)  # More posts = higher volume
        
        # Calculate composite rally score
        composite_score = self._calculate_composite_score(
            positive_pct, avg_excitement, high_crypto_pct, 
            engagement_score, volume_score
        )
        
        # Confidence based on data quality
        confidence = self._calculate_confidence(sentiment_results, social_data)
        
        return {
            'positive_sentiment_pct': positive_pct,
            'negative_sentiment_pct': negative_pct,
            'avg_excitement_level': avg_excitement,
            'crypto_relevance_pct': high_crypto_pct,
            'engagement_score': engagement_score,
            'volume_score': volume_score,
            'composite_score': composite_score,
            'confidence': confidence
        }
    
    def _calculate_composite_score(self, positive_pct: float, excitement: float,
                                 crypto_pct: float, engagement: float, 
                                 volume: float) -> float:
        """Calculate weighted composite rally score"""
        
        # Weighted scoring system
        weights = {
            'sentiment': 0.25,    # 25% weight on positive sentiment
            'excitement': 0.20,   # 20% weight on excitement level
            'crypto': 0.20,       # 20% weight on crypto relevance
            'engagement': 0.20,   # 20% weight on social engagement
            'volume': 0.15        # 15% weight on post volume
        }
        
        # Normalize scores to 0-10 scale
        normalized_scores = {
            'sentiment': (positive_pct / 100) * 10,
            'excitement': excitement,
            'crypto': (crypto_pct / 100) * 10,
            'engagement': min(10, engagement / 100),  # Engagement can vary widely
            'volume': volume
        }
        
        # Calculate weighted composite score
        composite = sum(
            normalized_scores[metric] * weights[metric] 
            for metric in weights.keys()
        )
        
        return round(composite, 2)
    
    def _calculate_confidence(self, sentiment_results: List[Dict], 
                            social_data: pd.DataFrame) -> float:
        """Calculate confidence level in rally detection"""
        
        # Factors affecting confidence
        data_points = len(sentiment_results)
        avg_confidence = np.mean([r.get('confidence', 0.5) for r in sentiment_results])
        
        # More data points = higher confidence (up to a point)
        volume_confidence = min(1.0, data_points / 50)
        
        # Recent data is more relevant
        if 'created_at' in social_data.columns:
            latest_post = pd.to_datetime(social_data['created_at'], utc=True).max()
            hours_ago = (pd.Timestamp.now(tz='UTC') - latest_post).total_seconds() / 3600
            recency_confidence = max(0.1, 1.0 - (hours_ago / 24))  # Decay over 24 hours
        else:
            recency_confidence = 0.5
        
        # Overall confidence
        overall_confidence = (avg_confidence + volume_confidence + recency_confidence) / 3
        
        return round(overall_confidence, 3)
    
    def _no_signal_response(self) -> Dict:
        """Return response when no data available for analysis"""
        return {
            'rally_detected': False,
            'confidence': 0.0,
            'composite_score': 0.0,
            'metrics': {},
            'timestamp': datetime.now().isoformat(),
            'data_points': 0,
            'message': 'Insufficient recent data for rally detection'
        }

# Usage example
detector = TrumpRallyDetector(analyzer)

# Combine Twitter and Reddit data
combined_data = pd.concat([tweets_df, reddit_data], ignore_index=True)

# Detect rally signals
rally_result = detector.detect_rally_signals(combined_data)
print(f"Rally Detected: {rally_result['rally_detected']}")
print(f"Confidence: {rally_result['confidence']}")
print(f"Composite Score: {rally_result['composite_score']}")
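
To get a feel for how demanding the 7.0 threshold is, it helps to run the weighted sum by hand. The standalone sketch below reproduces the weights and normalization from `_calculate_composite_score`; the input values are invented for illustration.

```python
WEIGHTS = {
    "sentiment": 0.25,
    "excitement": 0.20,
    "crypto": 0.20,
    "engagement": 0.20,
    "volume": 0.15,
}


def composite_score(positive_pct, excitement, crypto_pct, engagement, volume):
    """Weighted 0-10 rally score, mirroring the detector's formula."""
    normalized = {
        "sentiment": positive_pct / 100 * 10,
        "excitement": excitement,
        "crypto": crypto_pct / 100 * 10,
        "engagement": min(10, engagement / 100),
        "volume": volume,
    }
    return round(sum(normalized[k] * WEIGHTS[k] for k in WEIGHTS), 2)


# 80% positive posts, excitement 7/10, 50% highly crypto-relevant,
# 400 average interactions per post, and 60 posts in the window (volume 6)
score = composite_score(80, 7, 50, 400, 6)
print(score)  # 6.1
```

The terms are 2.0 + 1.4 + 1.0 + 0.8 + 0.9 = 6.1, which is still below 7.0 even though four out of five posts are positive. Expect to tune `rally_threshold` against your own data.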

Cryptocurrency Market Integration

Real-time Crypto Price Monitoring

Connect your rally detector to crypto price feeds:

import ccxt
import time
from datetime import datetime
from typing import Dict, List

class CryptoMarketMonitor:
    def __init__(self):
        """Initialize cryptocurrency market monitoring"""
        self.exchange = ccxt.binance()
        
        # Major cryptocurrencies likely affected by Trump rallies
        self.crypto_symbols = [
            'BTC/USDT', 'ETH/USDT', 'ADA/USDT', 
            'DOGE/USDT', 'XRP/USDT', 'SOL/USDT'
        ]
        
        self.price_history = {}
    
    def get_current_prices(self) -> Dict[str, float]:
        """Get current cryptocurrency prices"""
        prices = {}
        
        try:
            tickers = self.exchange.fetch_tickers(self.crypto_symbols)
            
            for symbol in self.crypto_symbols:
                if symbol in tickers:
                    prices[symbol] = tickers[symbol]['last']
                    
        except Exception as e:
            print(f"Error fetching crypto prices: {e}")
            
        return prices
    
    def monitor_price_changes(self, rally_signal: Dict) -> Dict:
        """Monitor crypto price changes after rally detection"""
        
        if not rally_signal.get('rally_detected', False):
            return {'status': 'no_rally', 'price_changes': {}}
        
        # Get baseline prices
        baseline_prices = self.get_current_prices()
        print(f"Rally detected! Monitoring crypto prices from baseline...")
        
        # Monitor for 30 minutes after rally detection
        monitoring_results = []
        
        for minute in range(30):
            time.sleep(60)  # Wait 1 minute
            
            current_prices = self.get_current_prices()
            changes = {}
            
            for symbol in current_prices:
                if symbol in baseline_prices:
                    change_pct = ((current_prices[symbol] - baseline_prices[symbol]) 
                                / baseline_prices[symbol]) * 100
                    changes[symbol] = {
                        'price': current_prices[symbol],
                        'change_pct': round(change_pct, 2)
                    }
            
            monitoring_results.append({
                'minute': minute + 1,
                'timestamp': datetime.now().isoformat(),
                'changes': changes
            })
            
            print(f"Minute {minute + 1}: {changes}")
        
        return {
            'status': 'monitored',
            'rally_strength': rally_signal['composite_score'],
            'baseline_prices': baseline_prices,
            'monitoring_results': monitoring_results
        }

# Usage example
crypto_monitor = CryptoMarketMonitor()

# Get current prices
current_prices = crypto_monitor.get_current_prices()
print("Current crypto prices:", current_prices)

# Monitor prices after rally detection
if rally_result['rally_detected']:
    price_monitoring = crypto_monitor.monitor_price_changes(rally_result)
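
The dictionary returned by `monitor_price_changes` holds one snapshot per minute. A natural next step is to reduce it to the largest move per symbol. The helper below is a sketch that assumes the `monitoring_results` structure built above; `peak_moves` is a hypothetical name, and the prices are made up.

```python
from typing import Dict, List


def peak_moves(monitoring_results: List[Dict]) -> Dict[str, float]:
    """Return, per symbol, the percent change with the largest magnitude."""
    peaks: Dict[str, float] = {}
    for snapshot in monitoring_results:
        for symbol, data in snapshot["changes"].items():
            change = data["change_pct"]
            if symbol not in peaks or abs(change) > abs(peaks[symbol]):
                peaks[symbol] = change
    return peaks


sample_results = [
    {"minute": 1, "changes": {"BTC/USDT": {"price": 60300.0, "change_pct": 0.5}}},
    {"minute": 2, "changes": {"BTC/USDT": {"price": 59100.0, "change_pct": -1.5},
                              "ETH/USDT": {"price": 3030.0, "change_pct": 1.0}}},
]
print(peak_moves(sample_results))
```

Comparing by absolute value keeps large drops as visible as large gains, since a rally signal followed by a sell-off is just as informative.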

Creating Real-time Alerts and Notifications

Alert System Setup

Build a notification system for rally detections:

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict
import requests

class RallyAlertSystem:
    def __init__(self, email_config=None, webhook_url=None):
        """Initialize alert system for Trump rally notifications"""
        self.email_config = email_config
        self.webhook_url = webhook_url
        
    def send_rally_alert(self, rally_data: Dict, crypto_changes: Dict = None):
        """Send alert when Trump rally detected"""
        
        alert_message = self._format_alert_message(rally_data, crypto_changes)
        
        # Send email alert
        if self.email_config:
            self._send_email_alert(alert_message, rally_data)
        
        # Send webhook notification
        if self.webhook_url:
            self._send_webhook_alert(rally_data, crypto_changes)
        
        print("Rally alert sent!")
    
    def _format_alert_message(self, rally_data: Dict, crypto_changes: Dict = None) -> str:
        """Format rally detection alert message"""
        
        message = f"""
🚨 TRUMP RALLY DETECTED 🚨

Rally Score: {rally_data['composite_score']}/10
Confidence: {rally_data['confidence']*100:.1f}%
Data Points: {rally_data['data_points']}

Key Metrics:
- Positive Sentiment: {rally_data['metrics'].get('positive_sentiment_pct', 0):.1f}%
- Avg Excitement: {rally_data['metrics'].get('avg_excitement_level', 0):.1f}/10
- Crypto Relevance: {rally_data['metrics'].get('crypto_relevance_pct', 0):.1f}%

Detection Time: {rally_data['timestamp']}
"""
        
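        if crypto_changes:
            # Accept either the monitor result or a plain {symbol: price} dict
            prices = crypto_changes.get('baseline_prices', crypto_changes)
            message += "\n📈 BASELINE CRYPTO PRICES:\n"
            for symbol, price in prices.items():
                if isinstance(price, (int, float)):
                    message += f"- {symbol}: ${price:.2f}\n"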
        
        return message
    
    def _send_email_alert(self, message: str, rally_data: Dict):
        """Send email notification"""
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email_config['from']
            msg['To'] = self.email_config['to']
            msg['Subject'] = f"🚨 Trump Rally Detected - Score: {rally_data['composite_score']}"
            
            msg.attach(MIMEText(message, 'plain'))
            
            server = smtplib.SMTP(self.email_config['smtp_server'], 
                                self.email_config['smtp_port'])
            server.starttls()
            server.login(self.email_config['username'], 
                        self.email_config['password'])
            
            text = msg.as_string()
            server.sendmail(self.email_config['from'], 
                          self.email_config['to'], text)
            server.quit()
            
        except Exception as e:
            print(f"Email alert failed: {e}")
    
    def _send_webhook_alert(self, rally_data: Dict, crypto_changes: Dict = None):
        """Send webhook notification (the payload uses Discord's embed format)"""
        try:
            payload = {
                "embeds": [{
                    "title": "🚨 Trump Rally Detected",
                    "color": 16711680,  # Red color
                    "fields": [
                        {
                            "name": "Rally Score",
                            "value": f"{rally_data['composite_score']}/10",
                            "inline": True
                        },
                        {
                            "name": "Confidence",
                            "value": f"{rally_data['confidence']*100:.1f}%",
                            "inline": True
                        },
                        {
                            "name": "Data Points",
                            "value": str(rally_data['data_points']),
                            "inline": True
                        }
                    ],
                    "timestamp": rally_data['timestamp']
                }]
            }
            
            response = requests.post(self.webhook_url, json=payload)
            response.raise_for_status()
            
        except Exception as e:
            print(f"Webhook alert failed: {e}")

# Setup alert system
email_config = {
    'from': 'your_email@gmail.com',
    'to': 'alerts@your_domain.com',
    'smtp_server': 'smtp.gmail.com',
    'smtp_port': 587,
    'username': 'your_email@gmail.com',
    'password': 'your_app_password'
}

alert_system = RallyAlertSystem(
    email_config=email_config,
    webhook_url="https://discord.com/api/webhooks/your_webhook_url"
)

# Send alert if rally detected
if rally_result['rally_detected']:
    alert_system.send_rally_alert(rally_result, current_prices)
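
Hardcoding an SMTP password in source is easy to leak through version control. One alternative is to read the settings from environment variables. The variable names below (`RALLY_SMTP_USER` and so on) are made up for this sketch; use whatever naming fits your deployment.

```python
import os
from typing import Dict, Mapping, Optional


def load_email_config(env: Optional[Mapping[str, str]] = None) -> Optional[Dict]:
    """Build the email_config dict from environment variables.

    Returns None when credentials are missing so email alerts are skipped.
    """
    env = os.environ if env is None else env
    user = env.get("RALLY_SMTP_USER")
    password = env.get("RALLY_SMTP_PASSWORD")
    if not user or not password:
        return None
    return {
        "from": user,
        "to": env.get("RALLY_ALERT_TO", user),
        "smtp_server": env.get("RALLY_SMTP_SERVER", "smtp.gmail.com"),
        "smtp_port": int(env.get("RALLY_SMTP_PORT", "587")),
        "username": user,
        "password": password,
    }


# A stand-in mapping so the example runs without touching the real environment
fake_env = {"RALLY_SMTP_USER": "me@example.com", "RALLY_SMTP_PASSWORD": "app-password"}
config = load_email_config(fake_env)
print(config["smtp_port"])
```

The result can be passed as `email_config=load_email_config()` because `RallyAlertSystem` already skips the email step when `email_config` is falsy.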

Automated Rally Detection Pipeline

Complete Pipeline Implementation

Combine all components into an automated detection system:

import json
import schedule
import time
import pandas as pd
from datetime import datetime
from typing import Dict

class AutomatedRallyPipeline:
    def __init__(self, twitter_collector, reddit_monitor, analyzer, 
                 detector, crypto_monitor, alert_system):
        """Initialize complete automated rally detection pipeline"""
        self.twitter_collector = twitter_collector
        self.reddit_monitor = reddit_monitor
        self.analyzer = analyzer
        self.detector = detector
        self.crypto_monitor = crypto_monitor
        self.alert_system = alert_system
        
        self.last_rally_time = None
        self.rally_cooldown = 2  # Hours between rally alerts
    
    def run_detection_cycle(self):
        """Run complete rally detection cycle"""
        print(f"\n--- Rally Detection Cycle: {datetime.now()} ---")
        
        try:
            # Step 1: Collect social media data
            print("🔍 Collecting social media data...")
            tweets_df = self.twitter_collector.collect_rally_tweets(
                hours_back=6, max_tweets=200
            )
            reddit_df = self.reddit_monitor.scrape_rally_discussions(limit=30)
            
            # Combine datasets
            combined_data = pd.concat([tweets_df, reddit_df], ignore_index=True)
            print(f"📊 Collected {len(combined_data)} social media posts")
            
            # Step 2: Run rally detection
            print("🤖 Analyzing sentiment with Ollama...")
            rally_result = self.detector.detect_rally_signals(combined_data)
            
            # Step 3: Check if rally detected
            if rally_result['rally_detected']:
                print(f"🚨 RALLY DETECTED! Score: {rally_result['composite_score']}")
                
                # Check cooldown period
                if self._should_send_alert():
                    # Step 4: Monitor crypto prices (blocks for about 30 minutes, so alerts go out afterwards)
                    print("💰 Monitoring cryptocurrency prices...")
                    crypto_changes = self.crypto_monitor.monitor_price_changes(rally_result)
                    
                    # Step 5: Send alerts
                    print("📢 Sending rally alerts...")
                    self.alert_system.send_rally_alert(rally_result, crypto_changes)
                    
                    self.last_rally_time = datetime.now()
                else:
                    print("⏰ Rally detected but in cooldown period")
            else:
                print(f"📉 No rally detected. Score: {rally_result['composite_score']}")
            
            # Log results
            self._log_detection_results(rally_result, len(combined_data))
            
        except Exception as e:
            print(f"❌ Detection cycle error: {e}")
    
    def _should_send_alert(self) -> bool:
        """Check if enough time passed since last alert"""
        if self.last_rally_time is None:
            return True
        
        time_diff = datetime.now() - self.last_rally_time
        hours_passed = time_diff.total_seconds() / 3600
        
        return hours_passed >= self.rally_cooldown
    
    def _log_detection_results(self, rally_result: Dict, data_points: int):
        """Log detection results to file"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'rally_detected': rally_result['rally_detected'],
            'composite_score': rally_result['composite_score'],
            'confidence': rally_result['confidence'],
            'data_points': data_points
        }
        
        # Append to log file
        with open('rally_detection_log.json', 'a') as f:
            f.write(json.dumps(log_entry) + '\n')
    
    def start_monitoring(self):
        """Start automated rally monitoring"""
        print("🚀 Starting Trump Rally Detection Pipeline...")
        
        # Schedule detection every 15 minutes
        schedule.every(15).minutes.do(self.run_detection_cycle)
        
        # Run initial detection
        self.run_detection_cycle()
        
        # Keep running
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute

# Initialize complete pipeline
pipeline = AutomatedRallyPipeline(
    twitter_collector=collector,
    reddit_monitor=reddit_monitor,
    analyzer=analyzer,
    detector=detector,
    crypto_monitor=crypto_monitor,
    alert_system=alert_system
)

# Start automated monitoring
# pipeline.start_monitoring()  # Uncomment to start continuous monitoring
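
The cooldown rule is simple but easy to get wrong at the boundary. The sketch below isolates it in a small class whose clock is passed in, which makes the rule testable without waiting two hours. `AlertCooldown` is an illustrative name and not part of the pipeline above.

```python
from datetime import datetime, timedelta
from typing import Optional


class AlertCooldown:
    """Allow an alert only if `hours` have passed since the last one."""

    def __init__(self, hours: float):
        self.window = timedelta(hours=hours)
        self.last_alert: Optional[datetime] = None

    def try_acquire(self, now: datetime) -> bool:
        """Return True and record `now` if an alert may be sent."""
        if self.last_alert is not None and now - self.last_alert < self.window:
            return False
        self.last_alert = now
        return True


cooldown = AlertCooldown(hours=2)
start = datetime(2024, 7, 1, 12, 0)
first = cooldown.try_acquire(start)                           # no previous alert
second = cooldown.try_acquire(start + timedelta(minutes=30))  # inside the window
third = cooldown.try_acquire(start + timedelta(hours=2))      # window has elapsed
print(first, second, third)
```

A rejected attempt does not reset the timer, so repeated detections during the cooldown cannot postpone the next alert indefinitely.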

Deployment and Performance Optimization

Docker Deployment

Create a containerized deployment for your Trump rally detector:

# Dockerfile for Trump Rally Detector
FROM python:3.9-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    wget \
    && rm -rf /var/lib/apt/lists/*

# Install Ollama
RUN curl -fsSL https://ollama.ai/install.sh | sh

# Set working directory
WORKDIR /app

# Copy requirements
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Expose port for monitoring dashboard
EXPOSE 8080

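# Start the Ollama server in the background, pull the model, then run the detector
CMD ["sh", "-c", "ollama serve & sleep 5 && ollama pull llama3.1:8b && python main.py"]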
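
Inside a container the Ollama server needs a moment before it accepts requests, so the detector may be better off waiting for it than failing on the first call. The sketch below polls a caller-supplied probe with a fixed delay. `wait_until_ready` and its parameters are hypothetical; in practice the probe might be a GET request to the Ollama base URL.

```python
import time
from typing import Callable


def wait_until_ready(probe: Callable[[], bool], attempts: int = 10,
                     delay_seconds: float = 1.0,
                     sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call `probe` until it returns True or `attempts` run out."""
    for attempt in range(attempts):
        try:
            if probe():
                return True
        except Exception:
            pass  # Treat connection errors as "not ready yet"
        if attempt < attempts - 1:
            sleep(delay_seconds)
    return False


# Simulated server that becomes ready on the third check
calls = {"count": 0}


def fake_probe() -> bool:
    calls["count"] += 1
    return calls["count"] >= 3


ready = wait_until_ready(fake_probe, attempts=5, sleep=lambda _: None)
print(ready, calls["count"])
```

Calling this at the top of `main.py` would make the startup order less dependent on a fixed `sleep` in the container command.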

Performance Monitoring Dashboard

Create a simple monitoring interface:

from flask import Flask, render_template, jsonify
import json
from datetime import datetime, timedelta

app = Flask(__name__)

class RallyDashboard:
    def __init__(self, log_file='rally_detection_log.json'):
        self.log_file = log_file
    
    def get_recent_detections(self, hours=24):
        """Get rally detections from last N hours"""
        try:
            detections = []
            cutoff_time = datetime.now() - timedelta(hours=hours)
            
            with open(self.log_file, 'r') as f:
                for line in f:
                    detection = json.loads(line.strip())
                    detection_time = datetime.fromisoformat(detection['timestamp'])
                    
                    if detection_time > cutoff_time:
                        detections.append(detection)
            
            return sorted(detections, key=lambda x: x['timestamp'], reverse=True)
            
        except FileNotFoundError:
            return []
    
    def get_rally_stats(self):
        """Get rally detection statistics"""
        detections = self.get_recent_detections(hours=168)  # Last week
        
        total_detections = len(detections)
        rally_count = sum(1 for d in detections if d['rally_detected'])
        
        if total_detections > 0:
            avg_score = sum(d['composite_score'] for d in detections) / total_detections
            avg_confidence = sum(d['confidence'] for d in detections) / total_detections
        else:
            avg_score = avg_confidence = 0
        
        return {
            'total_detections': total_detections,
            'rally_count': rally_count,
            'avg_score': round(avg_score, 2),
            'avg_confidence': round(avg_confidence, 3)
        }

dashboard = RallyDashboard()

@app.route('/')
def index():
    """Rally detection dashboard"""
    recent_detections = dashboard.get_recent_detections(hours=24)
    stats = dashboard.get_rally_stats()
    
    return render_template('dashboard.html', 
                         detections=recent_detections, 
                         stats=stats)

@app.route('/api/status')
def api_status():
    """API endpoint for current status"""
    recent_detections = dashboard.get_recent_detections(hours=1)
    latest_detection = recent_detections[0] if recent_detections else None
    
    return jsonify({
        'status': 'active',
        'latest_detection': latest_detection,
        'timestamp': datetime.now().isoformat()
    })

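if __name__ == '__main__':
    # Keep debug off when binding to 0.0.0.0: Flask's debug mode exposes an
    # interactive debugger that can execute arbitrary code
    app.run(host='0.0.0.0', port=8080, debug=False)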
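
The dashboard reads the log as JSON Lines, one record per line. If it reads while the pipeline is in the middle of appending, or if the file contains a blank line, `json.loads` raises an error that the `FileNotFoundError` handler above does not catch. Here is a sketch of a more tolerant reader. `read_log_entries` is a hypothetical helper, and the `now` parameter is only there to make the function easy to test.

```python
import json
from datetime import datetime, timedelta


def read_log_entries(path, hours=24, now=None):
    """Return entries newer than `hours`, newest first, skipping unreadable lines."""
    now = now or datetime.now()
    cutoff = now - timedelta(hours=hours)
    entries = []
    try:
        with open(path, "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue  # blank line
                try:
                    entry = json.loads(line)
                    stamp = datetime.fromisoformat(entry["timestamp"])
                except (json.JSONDecodeError, KeyError, TypeError, ValueError):
                    continue  # truncated or malformed record
                if stamp > cutoff:
                    entries.append(entry)
    except FileNotFoundError:
        return []
    # ISO-8601 timestamps in one format sort correctly as strings.
    return sorted(entries, key=lambda e: e["timestamp"], reverse=True)
```

`RallyDashboard.get_recent_detections` could delegate to a function like this so that one bad line does not take down the whole page.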

Testing and Validation

Backtesting Historical Events

Validate your detector against historical Trump rally data:

class RallyBacktester:
    def __init__(self, detector):
        self.detector = detector
        
    def backtest_historical_rallies(self, historical_data: List[Dict]) -> Dict:
        """Test detector accuracy against known rally events"""
        
        results = {
            'total_events': len(historical_data),
            'true_positives': 0,
            'false_positives': 0,
            'true_negatives': 0,
            'false_negatives': 0,
            'accuracy': 0,
            'precision': 0,
            'recall': 0
        }
        
        for event in historical_data:
            # Convert historical data to DataFrame format
            event_data = pd.DataFrame(event['social_media_posts'])
            
            # Run detection
            detection_result = self.detector.detect_rally_signals(event_data)
            
            # Compare with known outcome
            predicted_rally = detection_result['rally_detected']
            actual_rally = event['was_rally']
            
            if predicted_rally and actual_rally:
                results['true_positives'] += 1
            elif predicted_rally and not actual_rally:
                results['false_positives'] += 1
            elif not predicted_rally and not actual_rally:
                results['true_negatives'] += 1
            else:
                results['false_negatives'] += 1
        
        # Calculate metrics
        tp = results['true_positives']
        fp = results['false_positives']
        tn = results['true_negatives']
        fn = results['false_negatives']
        
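        # Guard against an empty historical_data list (avoids division by zero)
        total = tp + fp + tn + fn
        results['accuracy'] = (tp + tn) / total if total > 0 else 0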
        results['precision'] = tp / (tp + fp) if (tp + fp) > 0 else 0
        results['recall'] = tp / (tp + fn) if (tp + fn) > 0 else 0
        
        return results

# Example historical test data
historical_test_data = [
    {
        'date': '2024-10-15',
        'was_rally': True,
        'social_media_posts': [
            {'text': 'Trump rally tonight! Bitcoin to the moon! 🚀', 'created_at': '2024-10-15 18:00:00'},
            {'text': 'MAGA rally was incredible! Crypto policies announced!', 'created_at': '2024-10-15 21:00:00'},
        ]
    }
    # Add more historical events...
]

backtester = RallyBacktester(detector)
backtest_results = backtester.backtest_historical_rallies(historical_test_data)
print(f"Detector Accuracy: {backtest_results['accuracy']:.2%}")
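
Accuracy alone can mislead when rallies are rare: a detector that never fires still scores high. The sketch below computes precision, recall, and F1 from confusion-matrix counts to make that visible. The function name `summarize_confusion` is hypothetical, and the counts are made up for illustration rather than taken from real backtests.

```python
def summarize_confusion(tp, fp, tn, fn):
    """Compute accuracy, precision, recall and F1 from confusion-matrix counts."""
    total = tp + fp + tn + fn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}


# Made-up counts: 100 time windows, 10 of which contain a real rally.
never_fires = summarize_confusion(tp=0, fp=0, tn=90, fn=10)
decent = summarize_confusion(tp=8, fp=4, tn=86, fn=2)

print(never_fires["accuracy"], never_fires["f1"])            # 0.9 0.0
print(round(decent["accuracy"], 2), round(decent["f1"], 2))  # 0.94 0.73
```

The two detectors are only four points apart on accuracy, yet the first one never detects a rally. Reporting F1 next to accuracy in the backtest output makes that difference hard to miss.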

Advanced Features and Optimizations

Machine Learning Enhancement

Improve detection accuracy with historical data training:

from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import joblib

class MLEnhancedDetector:
    def __init__(self, base_detector):
        self.base_detector = base_detector
        self.ml_model = RandomForestClassifier(n_estimators=100, random_state=42)
        self.is_trained = False
        
    def prepare_training_data(self, historical_events: List[Dict]) -> Tuple:
        """Prepare training data from historical rally events"""
        features = []
        labels = []
        
        for event in historical_events:
            # Extract features using base detector
            event_df = pd.DataFrame(event['social_media_posts'])
            detection_result = self.base_detector.detect_rally_signals(event_df)
            
            # Feature vector
            feature_vector = [
                detection_result['composite_score'],
                detection_result['confidence'],
                detection_result['data_points'],
                detection_result['metrics'].get('positive_sentiment_pct', 0),
                detection_result['metrics'].get('avg_excitement_level', 0),
                detection_result['metrics'].get('crypto_relevance_pct', 0),
                detection_result['metrics'].get('engagement_score', 0),
                detection_result['metrics'].get('volume_score', 0)
            ]
            
            features.append(feature_vector)
            labels.append(1 if event['was_rally'] else 0)
        
        return np.array(features), np.array(labels)
    
    def train_model(self, historical_events: List[Dict]):
        """Train ML model on historical data"""
        X, y = self.prepare_training_data(historical_events)
        
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        # Train model
        self.ml_model.fit(X_train, y_train)
        
        # Evaluate
        y_pred = self.ml_model.predict(X_test)
        print("ML Model Performance:")
        print(classification_report(y_test, y_pred))
        
        self.is_trained = True
        
        # Save model
        joblib.dump(self.ml_model, 'rally_detector_model.pkl')
    
    def enhanced_detection(self, social_media_data: pd.DataFrame) -> Dict:
        """Enhanced rally detection using ML model"""
        
        # Get base detection result
        base_result = self.base_detector.detect_rally_signals(social_media_data)
        
        if not self.is_trained:
            return base_result
        
        # Prepare features for ML model
        feature_vector = [[
            base_result['composite_score'],
            base_result['confidence'],
            base_result['data_points'],
            base_result['metrics'].get('positive_sentiment_pct', 0),
            base_result['metrics'].get('avg_excitement_level', 0),
            base_result['metrics'].get('crypto_relevance_pct', 0),
            base_result['metrics'].get('engagement_score', 0),
            base_result['metrics'].get('volume_score', 0)
        ]]
        
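        # ML prediction
        ml_prediction = self.ml_model.predict(feature_vector)[0]
        
        # predict_proba returns one column per class seen during training, so
        # look up the column for class 1 instead of assuming it sits at index 1
        probabilities = self.ml_model.predict_proba(feature_vector)[0]
        classes = list(self.ml_model.classes_)
        ml_probability = probabilities[classes.index(1)] if 1 in classes else 0.0
        
        # Enhanced result (cast NumPy types to plain Python so the dict stays JSON-serializable)
        enhanced_result = base_result.copy()
        enhanced_result['ml_prediction'] = bool(ml_prediction)
        enhanced_result['ml_probability'] = float(ml_probability)
        enhanced_result['enhanced_rally_detected'] = bool(ml_prediction == 1)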
        
        return enhanced_result

# Usage with enhanced detector
enhanced_detector = MLEnhancedDetector(detector)
# enhanced_detector.train_model(historical_test_data)  # Train with historical data
enhanced_result = enhanced_detector.enhanced_detection(combined_data)
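
One thing to consider when training on historical events: a random split can put later events in the training set and earlier ones in the test set, which lets information from the future leak into the evaluation. Here is a sketch of a chronological split instead. It assumes every event carries an ISO-format `'date'` key, as in `historical_test_data`; the function name `chronological_split` is hypothetical.

```python
from datetime import date


def chronological_split(events, test_fraction=0.2):
    """Split events into (train, test) so no test event predates a training event."""
    if not 0 < test_fraction < 1:
        raise ValueError("test_fraction must be between 0 and 1")
    ordered = sorted(events, key=lambda e: date.fromisoformat(e["date"]))
    if len(ordered) < 2:
        # Too few events to hold anything out.
        return ordered, []
    n_test = max(1, round(len(ordered) * test_fraction))
    split_at = len(ordered) - n_test
    return ordered[:split_at], ordered[split_at:]
```

The two lists could then each be passed through `prepare_training_data` to build the training and test arrays, in place of the `train_test_split` call. That is one possible arrangement, not the only one.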

Conclusion

You've built a Trump rally detector that monitors social media sentiment and relates it to cryptocurrency price movements. The system combines Ollama's local sentiment analysis with scheduled social media monitoring (every 15 minutes in the pipeline above) to identify rally signals.

Key benefits of your Trump rally detector:

  • Scheduled, near-real-time monitoring of Twitter and Reddit
  • Advanced sentiment analysis using Ollama's local AI models
  • Cryptocurrency correlation tracking for trading opportunities
  • Automated alerts via email and webhooks
  • Machine learning enhancement for improved accuracy

Your detector is designed to surface rally sentiment patterns early, ideally before mainstream media coverage. How much lead time it actually gives depends on your data sources and thresholds, so check it with the backtester before acting on its signals. The system's modular design allows easy expansion to monitor other political events or cryptocurrency influencers.

Remember to comply with API rate limits and terms of service for all social media platforms. Consider implementing additional data sources like Telegram channels and YouTube comments for even broader coverage.

[Image: Trump Rally Detector dashboard screenshot]
[Image: Mobile rally detection notification]