Ever wondered why Bitcoin pumps when Elon tweets about Dogecoin? Welcome to the wild world of crypto sentiment analysis, where emotions drive markets faster than you can say "diamond hands." Many traders track social media buzz to try to anticipate price movements before they happen.
This guide shows you how to build a sentiment-driven crypto trading system that uses Ollama to process Twitter and Reddit data locally. You'll extract market emotions, generate trading signals, and potentially profit from social media sentiment patterns.
What You'll Learn
- Set up Ollama for local sentiment analysis processing
- Extract sentiment data from Twitter and Reddit APIs
- Build automated trading signals from social media emotions
- Optimize performance for real-time market analysis
- Deploy a complete sentiment-driven trading system
Understanding Crypto Sentiment Analysis
Cryptocurrency sentiment analysis examines social media posts, news articles, and forum discussions to gauge market emotions. This data reveals whether traders feel bullish, bearish, or neutral about specific cryptocurrencies.
Why Sentiment Matters for Crypto Trading
Social media appears to move crypto markets more than it moves traditional assets. A single high-profile tweet can be followed by a sharp Bitcoin price swing within minutes. Reddit communities like r/CryptoCurrency can influence altcoin prices through collective sentiment shifts.
Key sentiment indicators include:
- Tweet volume and engagement rates
- Reddit upvote ratios and comment sentiment
- Influencer opinion changes
- Fear and greed index fluctuations
- News sentiment correlation with price movements
Why Choose Ollama for Sentiment Analysis
Running sentiment analysis locally with Ollama offers several advantages over cloud-based solutions for crypto trading applications.
Local Processing Benefits
- Privacy: Keep sensitive trading data on your servers
- Speed: Remove network round trips to a remote API; inference speed itself depends on your hardware
- Cost: Avoid expensive cloud processing fees
- Reliability: No dependency on external service uptime
- Customization: Tailor prompts and model parameters to crypto-specific language
Supported Models for Crypto Analysis
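Ollama runs general-purpose language models that can be prompted to classify sentiment; none of the models below is trained specifically for sentiment detection:
# Install candidate models for crypto sentiment
ollama pull llama2:7b-chat
ollama pull mistral:7b-instruct
# codellama is tuned for code generation, so treat it as optional for sentiment work
ollama pull codellama:7b-instruct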
Setting Up Ollama for Crypto Sentiment Analysis
Installation and Configuration
First, install Ollama on your system and configure it for sentiment analysis tasks.
# Install Ollama (Linux/macOS)
curl -fsSL https://ollama.ai/install.sh | sh
# Start Ollama service
ollama serve
# Pull the sentiment analysis model
ollama pull llama2:7b-chat
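Before wiring up the Python code, it can help to confirm that the models you pulled are visible to the server. The sketch below assumes the Ollama HTTP API is listening on http://localhost:11434 and that its /api/tags endpoint returns a "models" list whose entries carry a "name" field. The helper names (`fetch_tags`, `missing_models`) are made up for this example, and the sample payload is hand-written rather than real server output.

```python
import json
import urllib.request
from typing import Iterable, List


def installed_models(tags_payload: dict) -> set:
    """Return the model names listed in an /api/tags response body."""
    return {m["name"] for m in tags_payload.get("models", [])}


def fetch_tags(base_url: str = "http://localhost:11434") -> dict:
    """Ask the local Ollama server which models are installed."""
    with urllib.request.urlopen(f"{base_url}/api/tags", timeout=5) as resp:
        return json.load(resp)


def missing_models(required: Iterable[str], tags_payload: dict) -> List[str]:
    """List the required models that the server does not report."""
    have = installed_models(tags_payload)
    return [name for name in required if name not in have]


# Offline illustration with a hand-written payload (not real server output)
sample_payload = {"models": [{"name": "llama2:7b-chat"}]}
print(missing_models(["llama2:7b-chat", "mistral:7b-instruct"], sample_payload))
```

With a running server you would pass `fetch_tags()` in place of `sample_payload` and pull whatever the function reports as missing.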
Python Environment Setup
Create a dedicated environment for your crypto sentiment analysis project:
# install_requirements.py
# Note: the Dockerfile later in this guide expects a requirements.txt
# containing the same package names, one per line.
import subprocess
import sys

def install_requirements():
    """Install required packages for crypto sentiment analysis"""
    packages = [
        'ollama',
        'tweepy',
        'praw',           # Reddit API wrapper
        'pandas',
        'numpy',
        'requests',
        'python-dotenv',
        'schedule',
        'ccxt',           # Crypto exchange API
        'psutil'          # Memory usage in the performance monitor
    ]
    # Install everything with a single pip invocation
    subprocess.check_call([sys.executable, "-m", "pip", "install", *packages])

if __name__ == "__main__":
    install_requirements()
Basic Ollama Integration
Connect Python to your local Ollama instance:
import ollama
import json
from typing import Dict, List
class CryptoSentimentAnalyzer:
def __init__(self, model_name: str = "llama2:7b-chat"):
"""Initialize Ollama sentiment analyzer for crypto trading"""
self.model_name = model_name
self.client = ollama.Client()
def analyze_sentiment(self, text: str, crypto_symbol: str) -> Dict:
"""
Analyze sentiment of crypto-related text
Returns sentiment score, confidence, and key emotions
"""
prompt = f"""
Analyze the sentiment of this cryptocurrency post about {crypto_symbol}:
Text: "{text}"
Provide analysis in JSON format:
{{
"sentiment": "positive/negative/neutral",
"confidence": 0.0-1.0,
"emotions": ["fear", "greed", "excitement", "uncertainty"],
"price_impact": "bullish/bearish/neutral",
"urgency": "low/medium/high"
}}
"""
        response = self.client.chat(
            model=self.model_name,
            messages=[{'role': 'user', 'content': prompt}],
            format='json'  # Ask Ollama to constrain the reply to valid JSON
        )
        content = response['message']['content']
        try:
            # Parse the JSON reply
            return json.loads(content)
        except json.JSONDecodeError:
            # Fallback parsing if the reply is not valid JSON
            return self._parse_fallback_sentiment(content)

    def _parse_fallback_sentiment(self, text: str) -> Dict:
        """Parse sentiment by keyword when JSON extraction fails"""
        lowered = text.lower()
        sentiment = "neutral"
        if "positive" in lowered or "bullish" in lowered:
            sentiment = "positive"
        elif "negative" in lowered or "bearish" in lowered:
            sentiment = "negative"
        # price_impact uses the bullish/bearish vocabulary from the prompt schema
        price_impact = {"positive": "bullish", "negative": "bearish"}.get(sentiment, "neutral")
        return {
            "sentiment": sentiment,
            "confidence": 0.7,
            "emotions": ["uncertainty"],
            "price_impact": price_impact,
            "urgency": "medium"
        }
# Example usage
analyzer = CryptoSentimentAnalyzer()
result = analyzer.analyze_sentiment(
"Bitcoin is going to the moon! 🚀 Just bought more BTC",
"BTC"
)
print(f"Sentiment Analysis: {result}")
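Chat models sometimes wrap the JSON in explanatory text, and in that case `json.loads` fails on the whole reply. One way to recover before falling back to keyword matching is to scan for the first decodable JSON object. This is a minimal sketch using only the standard library; `extract_json_object` is a hypothetical helper, not part of the ollama package.

```python
import json
from typing import Optional


def extract_json_object(reply: str) -> Optional[dict]:
    """Return the first decodable JSON object found in `reply`, or None."""
    decoder = json.JSONDecoder()
    start = reply.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value starting at `start`
            # and ignores whatever text follows it
            obj, _end = decoder.raw_decode(reply, start)
            if isinstance(obj, dict):
                return obj
        except json.JSONDecodeError:
            pass
        # Try the next opening brace
        start = reply.find("{", start + 1)
    return None


reply = 'Sure! Here is the analysis:\n{"sentiment": "positive", "confidence": 0.9}\nHope this helps.'
print(extract_json_object(reply))
```

You could call a helper like this inside the `except json.JSONDecodeError` branch and only use the keyword fallback when it returns `None`.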
Processing Twitter Data for Crypto Sentiment
Twitter crypto sentiment analysis requires API access and real-time processing capabilities.
Twitter API Setup
Configure Twitter API credentials for crypto data collection:
import tweepy
import os
from dotenv import load_dotenv
from datetime import datetime, timedelta
load_dotenv()
class TwitterCryptoCollector:
def __init__(self):
"""Initialize Twitter API client for crypto data collection"""
# Twitter API v2 credentials
self.bearer_token = os.getenv('TWITTER_BEARER_TOKEN')
self.client = tweepy.Client(bearer_token=self.bearer_token)
def collect_crypto_tweets(self, crypto_symbols: List[str], count: int = 100) -> List[Dict]:
"""
Collect recent tweets mentioning specific cryptocurrencies
Returns list of tweet data with metadata
"""
all_tweets = []
for symbol in crypto_symbols:
# Build search query for crypto mentions
query = f"({symbol} OR #{symbol} OR ${symbol}) -is:retweet lang:en"
try:
tweets = tweepy.Paginator(
self.client.search_recent_tweets,
query=query,
tweet_fields=['created_at', 'author_id', 'public_metrics', 'context_annotations'],
max_results=max(10, min(count, 100)) # search_recent_tweets accepts 10 to 100 results per request
).flatten(limit=count)
for tweet in tweets:
tweet_data = {
'id': tweet.id,
'text': tweet.text,
'created_at': tweet.created_at,
'author_id': tweet.author_id,
'retweet_count': tweet.public_metrics['retweet_count'],
'like_count': tweet.public_metrics['like_count'],
'reply_count': tweet.public_metrics['reply_count'],
'crypto_symbol': symbol,
'source': 'twitter'
}
all_tweets.append(tweet_data)
except Exception as e:
print(f"Error collecting tweets for {symbol}: {e}")
continue
return all_tweets
def get_trending_crypto_topics(self) -> List[str]:
"""Return a static watchlist of crypto symbols (not fetched from live Twitter trends)"""
# Common crypto symbols to monitor
trending_symbols = [
'BTC', 'ETH', 'ADA', 'SOL', 'DOT', 'MATIC', 'LINK', 'UNI',
'AVAX', 'ATOM', 'FTM', 'NEAR', 'ALGO', 'XRP', 'DOGE'
]
return trending_symbols
# Example usage
collector = TwitterCryptoCollector()
crypto_tweets = collector.collect_crypto_tweets(['BTC', 'ETH'], count=50)
print(f"Collected {len(crypto_tweets)} crypto tweets")
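Raw tweets carry links, @mentions, and near-duplicate reposts that add noise and waste model time. The sketch below shows one possible cleanup pass to run before sentiment analysis. The function names and regular expressions are illustrative assumptions, and the only thing it relies on is the `text` key produced by the collector above.

```python
import re
from typing import Dict, List

URL_RE = re.compile(r"https?://\S+")
MENTION_RE = re.compile(r"@\w+")
WHITESPACE_RE = re.compile(r"\s+")


def clean_tweet(text: str) -> str:
    """Strip links and @mentions, then collapse runs of whitespace."""
    text = URL_RE.sub("", text)
    text = MENTION_RE.sub("", text)
    return WHITESPACE_RE.sub(" ", text).strip()


def dedupe_tweets(tweets: List[Dict]) -> List[Dict]:
    """Keep the first tweet for each distinct cleaned, lowercased text."""
    seen = set()
    unique = []
    for tweet in tweets:
        key = clean_tweet(tweet["text"]).lower()
        if key and key not in seen:
            seen.add(key)
            unique.append(tweet)
    return unique


sample = [
    {"id": 1, "text": "@bob BTC to the moon https://t.co/abc  now"},
    {"id": 2, "text": "BTC to the moon now https://t.co/xyz"},
]
print(clean_tweet(sample[0]["text"]))
print(len(dedupe_tweets(sample)))
```

Hashtags and cashtags are left in place on purpose, since they often carry the symbol you are tracking.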
Real-time Twitter Sentiment Processing
Poll the Twitter search endpoint on a fixed interval and analyze each batch as it arrives:
import threading
import time
from queue import Queue
class RealTimeCryptoSentiment:
def __init__(self, sentiment_analyzer: CryptoSentimentAnalyzer):
"""Initialize real-time crypto sentiment processor"""
self.analyzer = sentiment_analyzer
self.twitter_collector = TwitterCryptoCollector()
self.sentiment_queue = Queue()
self.is_running = False
def start_monitoring(self, crypto_symbols: List[str], interval: int = 60):
"""
Start real-time crypto sentiment monitoring
Collects and analyzes tweets every 'interval' seconds
"""
self.is_running = True
def monitor_loop():
while self.is_running:
try:
# Collect recent tweets
tweets = self.twitter_collector.collect_crypto_tweets(
crypto_symbols, count=20
)
# Analyze sentiment for each tweet
for tweet in tweets:
sentiment_result = self.analyzer.analyze_sentiment(
tweet['text'],
tweet['crypto_symbol']
)
# Add metadata to sentiment result
sentiment_result.update({
'tweet_id': tweet['id'],
'timestamp': tweet['created_at'],
'engagement': tweet['like_count'] + tweet['retweet_count'],
'crypto_symbol': tweet['crypto_symbol'],
'source': 'twitter'
})
# Queue for processing
self.sentiment_queue.put(sentiment_result)
print(f"Processed {len(tweets)} tweets at {datetime.now()}")
time.sleep(interval)
except Exception as e:
print(f"Error in monitoring loop: {e}")
time.sleep(10) # Wait before retrying
# Start monitoring in separate thread
monitor_thread = threading.Thread(target=monitor_loop)
monitor_thread.daemon = True
monitor_thread.start()
def get_latest_sentiment(self, count: int = 10) -> List[Dict]:
"""Get latest sentiment analysis results"""
results = []
while not self.sentiment_queue.empty() and len(results) < count:
results.append(self.sentiment_queue.get())
return results
def stop_monitoring(self):
"""Stop real-time sentiment monitoring"""
self.is_running = False
# Example usage
sentiment_analyzer = CryptoSentimentAnalyzer()
real_time_monitor = RealTimeCryptoSentiment(sentiment_analyzer)
# Start monitoring Bitcoin and Ethereum
real_time_monitor.start_monitoring(['BTC', 'ETH'], interval=30)
# The monitor runs in a background thread, so give the first cycle time to
# finish before reading results (adjust the wait to your hardware)
time.sleep(60)
# Get latest sentiment data
latest_sentiment = real_time_monitor.get_latest_sentiment(5)
for sentiment in latest_sentiment:
    print(f"Crypto: {sentiment['crypto_symbol']}, Sentiment: {sentiment['sentiment']}")
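When a background thread fills the queue, checking `empty()` before `get()` is only a hint, because another thread can change the queue between the two calls. A non-blocking drain followed by a per-symbol tally is one way to read the results. In this sketch, `drain_queue` and `summarize_by_symbol` are hypothetical helpers, and the dictionaries only mimic the `crypto_symbol` and `sentiment` keys used above.

```python
from collections import Counter, defaultdict
from queue import Empty, Queue
from typing import Dict, List


def drain_queue(queue: Queue, max_items: int = 1000) -> List[Dict]:
    """Remove up to max_items results from the queue without blocking."""
    items = []
    while len(items) < max_items:
        try:
            items.append(queue.get_nowait())
        except Empty:
            break
    return items


def summarize_by_symbol(results: List[Dict]) -> Dict[str, Dict[str, int]]:
    """Count sentiment labels per crypto symbol."""
    summary = defaultdict(Counter)
    for result in results:
        summary[result["crypto_symbol"]][result["sentiment"]] += 1
    return {symbol: dict(counts) for symbol, counts in summary.items()}


# Illustration with hand-made results instead of live tweets
demo_queue = Queue()
for symbol, label in [("BTC", "positive"), ("BTC", "negative"), ("ETH", "positive")]:
    demo_queue.put({"crypto_symbol": symbol, "sentiment": label})
print(summarize_by_symbol(drain_queue(demo_queue)))
```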
Processing Reddit Data for Crypto Analysis
Reddit cryptocurrency analysis provides deeper community insights than Twitter's quick reactions.
Reddit API Setup
Configure Reddit API access for crypto community data:
import praw
from datetime import datetime
class RedditCryptoCollector:
def __init__(self):
"""Initialize Reddit API client for crypto data collection"""
self.reddit = praw.Reddit(
client_id=os.getenv('REDDIT_CLIENT_ID'),
client_secret=os.getenv('REDDIT_CLIENT_SECRET'),
user_agent=os.getenv('REDDIT_USER_AGENT')
)
# Popular crypto subreddits
self.crypto_subreddits = [
'CryptoCurrency', 'Bitcoin', 'ethereum', 'altcoin',
'CryptoMarkets', 'btc', 'ethtrader', 'SatoshiStreetBets'
]
def collect_reddit_posts(self, subreddit_name: str, limit: int = 50) -> List[Dict]:
"""
Collect recent posts from crypto subreddit
Returns list of post data with metadata
"""
posts = []
try:
subreddit = self.reddit.subreddit(subreddit_name)
# Get hot posts from subreddit
for post in subreddit.hot(limit=limit):
post_data = {
'id': post.id,
'title': post.title,
'text': post.selftext,
'score': post.score,
'upvote_ratio': post.upvote_ratio,
'num_comments': post.num_comments,
'created_utc': datetime.fromtimestamp(post.created_utc),
'subreddit': subreddit_name,
'author': str(post.author),
'url': post.url,
'source': 'reddit'
}
posts.append(post_data)
except Exception as e:
print(f"Error collecting Reddit posts from {subreddit_name}: {e}")
return posts
def collect_reddit_comments(self, post_id: str, limit: int = 20) -> List[Dict]:
"""Collect comments from specific Reddit post"""
comments = []
try:
submission = self.reddit.submission(id=post_id)
submission.comments.replace_more(limit=0) # Remove MoreComments objects
for comment in submission.comments.list()[:limit]:
if hasattr(comment, 'body') and comment.body != '[deleted]':
comment_data = {
'id': comment.id,
'body': comment.body,
'score': comment.score,
'created_utc': datetime.fromtimestamp(comment.created_utc),
'parent_id': post_id,
'author': str(comment.author),
'source': 'reddit_comment'
}
comments.append(comment_data)
except Exception as e:
print(f"Error collecting comments for post {post_id}: {e}")
return comments
# Example usage
reddit_collector = RedditCryptoCollector()
crypto_posts = reddit_collector.collect_reddit_posts('CryptoCurrency', limit=25)
print(f"Collected {len(crypto_posts)} Reddit posts")
Advanced Reddit Sentiment Analysis
Process Reddit data for deeper community sentiment insights:
class RedditSentimentProcessor:
def __init__(self, sentiment_analyzer: CryptoSentimentAnalyzer):
"""Initialize Reddit sentiment processor with Ollama analyzer"""
self.analyzer = sentiment_analyzer
self.reddit_collector = RedditCryptoCollector()
def analyze_subreddit_sentiment(self, subreddit_name: str, crypto_symbols: List[str]) -> Dict:
"""
Analyze overall sentiment for crypto symbols in specific subreddit
Returns aggregated sentiment data
"""
posts = self.reddit_collector.collect_reddit_posts(subreddit_name, limit=100)
symbol_sentiments = {symbol: [] for symbol in crypto_symbols}
for post in posts:
# Combine title and text for analysis
full_text = f"{post['title']} {post['text']}"
# Check which crypto symbols are mentioned
for symbol in crypto_symbols:
if symbol.lower() in full_text.lower():
sentiment_result = self.analyzer.analyze_sentiment(full_text, symbol)
# Add Reddit-specific metadata
sentiment_result.update({
'post_id': post['id'],
'score': post['score'],
'upvote_ratio': post['upvote_ratio'],
'num_comments': post['num_comments'],
'subreddit': subreddit_name,
'timestamp': post['created_utc']
})
symbol_sentiments[symbol].append(sentiment_result)
# Calculate aggregated sentiment for each symbol
aggregated_results = {}
for symbol, sentiments in symbol_sentiments.items():
if sentiments:
aggregated_results[symbol] = self._calculate_aggregated_sentiment(sentiments)
else:
aggregated_results[symbol] = None
return aggregated_results
def _calculate_aggregated_sentiment(self, sentiments: List[Dict]) -> Dict:
"""Calculate aggregated sentiment metrics from multiple posts"""
if not sentiments:
return None
# Count sentiment types
positive_count = sum(1 for s in sentiments if s['sentiment'] == 'positive')
negative_count = sum(1 for s in sentiments if s['sentiment'] == 'negative')
neutral_count = sum(1 for s in sentiments if s['sentiment'] == 'neutral')
total_count = len(sentiments)
# Calculate weighted sentiment based on post scores
weighted_score = 0
total_weight = 0
for sentiment in sentiments:
weight = max(1, sentiment.get('score', 1)) # Use post score as weight
sentiment_value = 1 if sentiment['sentiment'] == 'positive' else -1 if sentiment['sentiment'] == 'negative' else 0
weighted_score += sentiment_value * weight
total_weight += weight
overall_sentiment = "neutral"
if weighted_score > 0:
overall_sentiment = "positive"
elif weighted_score < 0:
overall_sentiment = "negative"
return {
'overall_sentiment': overall_sentiment,
'sentiment_score': weighted_score / total_weight if total_weight > 0 else 0,
'positive_ratio': positive_count / total_count,
'negative_ratio': negative_count / total_count,
'neutral_ratio': neutral_count / total_count,
'total_posts': total_count,
'confidence': sum(s.get('confidence', 0.5) for s in sentiments) / total_count
}
# Example usage
reddit_processor = RedditSentimentProcessor(sentiment_analyzer)
subreddit_sentiment = reddit_processor.analyze_subreddit_sentiment(
'CryptoCurrency',
['BTC', 'ETH', 'ADA']
)
for symbol, sentiment_data in subreddit_sentiment.items():
if sentiment_data:
print(f"{symbol}: {sentiment_data['overall_sentiment']} "
f"(Score: {sentiment_data['sentiment_score']:.2f})")
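A plain substring check such as `symbol.lower() in full_text.lower()` also matches "ada" inside "Canada" and "eth" inside "method", which sends unrelated posts to the model. A stricter match on word boundaries avoids most of these false hits. This is a sketch, not a complete solution: `mentions_symbol` is a hypothetical helper, and symbols that are also everyday words (LINK, NEAR) will still match ordinary sentences.

```python
import re


def mentions_symbol(text: str, symbol: str) -> bool:
    """True if `symbol` appears as a standalone token, optionally as a $cashtag."""
    # Lookarounds reject matches embedded in longer alphanumeric words
    pattern = rf"(?<![A-Za-z0-9])\$?{re.escape(symbol)}(?![A-Za-z0-9])"
    return re.search(pattern, text, flags=re.IGNORECASE) is not None


print(mentions_symbol("Moving to Canada next year", "ADA"))
print(mentions_symbol("Is $ADA undervalued right now?", "ADA"))
```

The first call should report no match and the second a match.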
Building Trading Signals from Social Media Sentiment
Convert sentiment analysis results into actionable crypto trading signals for automated strategies.
Sentiment-Based Signal Generation
Create trading signals based on sentiment analysis patterns:
import pandas as pd
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class SignalType(Enum):
BUY = "buy"
SELL = "sell"
HOLD = "hold"
@dataclass
class TradingSignal:
symbol: str
signal_type: SignalType
confidence: float
timestamp: datetime
reason: str
price_target: Optional[float] = None
stop_loss: Optional[float] = None
class CryptoSentimentTrader:
def __init__(self, sentiment_analyzer: CryptoSentimentAnalyzer):
"""Initialize crypto sentiment trading system"""
self.analyzer = sentiment_analyzer
self.sentiment_history = []
self.signal_history = []
# Trading parameters
self.min_confidence = 0.7
self.sentiment_threshold_buy = 0.6
self.sentiment_threshold_sell = -0.4
self.min_data_points = 10
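    def add_sentiment_data(self, sentiment_data: List[Dict]):
        """Add new sentiment data to historical record"""
        for data in sentiment_data:
            ts = data.get('timestamp') or datetime.now()
            # tweepy returns timezone-aware UTC datetimes; convert them to naive
            # local time so they compare cleanly with datetime.now() below
            if ts.tzinfo is not None:
                ts = ts.astimezone().replace(tzinfo=None)
            data['timestamp'] = ts
            self.sentiment_history.append(data)
        # Keep only recent data (last 24 hours)
        cutoff_time = datetime.now() - timedelta(hours=24)
        self.sentiment_history = [
            s for s in self.sentiment_history
            if s['timestamp'] > cutoff_time
        ]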
def calculate_sentiment_momentum(self, symbol: str, hours: int = 4) -> Dict:
"""
Calculate sentiment momentum for specific crypto symbol
Returns trend direction and strength
"""
# Filter sentiment data for symbol and timeframe
cutoff_time = datetime.now() - timedelta(hours=hours)
symbol_data = [
s for s in self.sentiment_history
if s.get('crypto_symbol') == symbol and s['timestamp'] > cutoff_time
]
if len(symbol_data) < self.min_data_points:
return {'momentum': 'insufficient_data', 'strength': 0.0}
# Convert sentiment to numerical scores
sentiment_scores = []
for data in symbol_data:
score = 1.0 if data['sentiment'] == 'positive' else -1.0 if data['sentiment'] == 'negative' else 0.0
# Weight by confidence and engagement
confidence = data.get('confidence', 0.5)
engagement = data.get('engagement', 1)
weight = confidence * (1 + engagement / 100) # Scale engagement
sentiment_scores.append(score * weight)
# Calculate moving average and trend
df = pd.DataFrame({
'timestamp': [s['timestamp'] for s in symbol_data],
'sentiment_score': sentiment_scores
})
df = df.sort_values('timestamp')
# Calculate short and long term averages
short_avg = df['sentiment_score'].tail(5).mean()
long_avg = df['sentiment_score'].mean()
momentum_strength = abs(short_avg - long_avg)
if short_avg > long_avg + 0.1:
momentum = 'bullish'
elif short_avg < long_avg - 0.1:
momentum = 'bearish'
else:
momentum = 'neutral'
return {
'momentum': momentum,
'strength': momentum_strength,
'short_avg': short_avg,
'long_avg': long_avg,
'data_points': len(symbol_data)
}
def generate_trading_signals(self, symbols: List[str]) -> List[TradingSignal]:
"""
Generate trading signals based on sentiment analysis
Returns list of trading signals for each symbol
"""
signals = []
for symbol in symbols:
momentum = self.calculate_sentiment_momentum(symbol)
if momentum['momentum'] == 'insufficient_data':
continue
# Generate signal based on momentum and strength
signal_type = SignalType.HOLD
confidence = momentum['strength']
reason = f"Sentiment momentum: {momentum['momentum']} (strength: {momentum['strength']:.2f})"
if momentum['momentum'] == 'bullish' and momentum['strength'] > 0.3:
if momentum['short_avg'] > self.sentiment_threshold_buy:
signal_type = SignalType.BUY
confidence = min(0.95, confidence + momentum['short_avg'])
elif momentum['momentum'] == 'bearish' and momentum['strength'] > 0.3:
if momentum['short_avg'] < self.sentiment_threshold_sell:
signal_type = SignalType.SELL
confidence = min(0.95, confidence + abs(momentum['short_avg']))
# Only create signal if confidence meets minimum threshold
if confidence >= self.min_confidence:
signal = TradingSignal(
symbol=symbol,
signal_type=signal_type,
confidence=confidence,
timestamp=datetime.now(),
reason=reason
)
signals.append(signal)
self.signal_history.append(signal)
return signals
# Example usage
sentiment_trader = CryptoSentimentTrader(sentiment_analyzer)
# Add sentiment data from Twitter and Reddit
combined_sentiment_data = latest_sentiment + [
# Add Reddit sentiment data here
]
sentiment_trader.add_sentiment_data(combined_sentiment_data)
# Generate trading signals
trading_signals = sentiment_trader.generate_trading_signals(['BTC', 'ETH', 'ADA'])
for signal in trading_signals:
print(f"Signal: {signal.signal_type.value} {signal.symbol} "
f"(Confidence: {signal.confidence:.2f}) - {signal.reason}")
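To see how the momentum rule behaves, it helps to run the same arithmetic on a few made-up score series. The sketch below repeats only the short-versus-long average comparison (last 5 points against the whole window, with a 0.1 band) on plain numbers. The `momentum` function and the sample series are illustrative, not output from the trader class.

```python
from statistics import mean
from typing import List, Tuple


def momentum(scores: List[float], short_window: int = 5,
             band: float = 0.1) -> Tuple[str, float, float]:
    """Compare the average of the latest scores with the whole-window average."""
    short_avg = mean(scores[-short_window:])
    long_avg = mean(scores)
    if short_avg > long_avg + band:
        label = "bullish"
    elif short_avg < long_avg - band:
        label = "bearish"
    else:
        label = "neutral"
    return label, short_avg, long_avg


turning_positive = [0.0] * 5 + [1.0] * 5   # neutral posts, then positive ones
steady_positive = [1.0] * 10               # positive the whole time

for series in (turning_positive, steady_positive):
    label, short_avg, long_avg = momentum(series)
    print(label, short_avg, long_avg, abs(short_avg - long_avg))
```

One consequence worth noticing: sentiment that stays positive throughout gives a neutral momentum label, because the rule reacts to changes in sentiment and not to its level.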
Portfolio Integration
Integrate sentiment signals with existing trading portfolios:
import ccxt
from typing import Dict, List
class SentimentPortfolioManager:
def __init__(self, exchange_id: str, api_credentials: Dict):
"""Initialize portfolio manager with exchange integration"""
exchange_class = getattr(ccxt, exchange_id)
self.exchange = exchange_class(api_credentials)
self.portfolio_allocation = {}
self.max_position_size = 0.1 # 10% max per position
def execute_sentiment_signals(self, signals: List[TradingSignal],
portfolio_balance: float) -> List[Dict]:
"""
Execute trading signals based on portfolio rules
Returns list of executed trades
"""
executed_trades = []
for signal in signals:
try:
# Get current market price
ticker = self.exchange.fetch_ticker(f"{signal.symbol}/USDT")
current_price = ticker['last']
# Calculate position size based on confidence and portfolio rules
position_value = (portfolio_balance * self.max_position_size *
signal.confidence)
if signal.signal_type == SignalType.BUY:
# Execute buy order
quantity = position_value / current_price
order = self.exchange.create_market_buy_order(
f"{signal.symbol}/USDT",
quantity
)
executed_trades.append({
'signal': signal,
'order': order,
'action': 'buy',
'quantity': quantity,
'price': current_price
})
elif signal.signal_type == SignalType.SELL:
# Execute sell order (if we have position)
current_position = self.get_position_size(signal.symbol)
if current_position > 0:
sell_quantity = current_position * signal.confidence
order = self.exchange.create_market_sell_order(
f"{signal.symbol}/USDT",
sell_quantity
)
executed_trades.append({
'signal': signal,
'order': order,
'action': 'sell',
'quantity': sell_quantity,
'price': current_price
})
except Exception as e:
print(f"Error executing signal for {signal.symbol}: {e}")
continue
return executed_trades
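    def get_position_size(self, symbol: str) -> float:
        """Get current position size for symbol"""
        try:
            balance = self.exchange.fetch_balance()
            return float(balance.get(symbol, {}).get('total') or 0.0)
        except Exception as e:
            # Treat lookup failures as "no position", but make them visible
            print(f"Error fetching balance for {symbol}: {e}")
            return 0.0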
# Example integration (demo mode - do not use real API keys)
# portfolio_manager = SentimentPortfolioManager('binance', {
#     'apiKey': 'your_api_key',
#     'secret': 'your_secret',
# })
# # Switch ccxt to the exchange's test environment before placing any order
# portfolio_manager.exchange.set_sandbox_mode(True)
#
# executed_trades = portfolio_manager.execute_sentiment_signals(
# trading_signals,
# portfolio_balance=10000.0
# )
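The position sizing rule in `execute_sentiment_signals` is easy to check by hand: the order value is the portfolio balance times the maximum position fraction times the signal confidence, and the quantity is that value divided by the price. Here is the same arithmetic as a standalone sketch. `buy_quantity` is a hypothetical helper, the numbers are made up, and real exchanges also enforce minimum order sizes and precision rules that this ignores.

```python
def buy_quantity(portfolio_balance: float, price: float, confidence: float,
                 max_position_size: float = 0.1) -> float:
    """Units to buy for one signal under a confidence-scaled position cap."""
    if price <= 0:
        raise ValueError("price must be positive")
    # Clamp confidence so a bad value cannot exceed the position cap
    confidence = min(max(confidence, 0.0), 1.0)
    position_value = portfolio_balance * max_position_size * confidence
    return position_value / price


# 10,000 USDT balance, 10% cap, confidence 0.8, price 50,000 USDT:
# position value is about 800 USDT, so roughly 0.016 units
print(buy_quantity(10_000.0, 50_000.0, 0.8))
```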
Performance Optimization and Monitoring
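Optimize your Ollama sentiment analysis setup for higher throughput. Local model inference typically takes far longer per text than an exchange round trip, so this pipeline suits signals on a scale of minutes, not true high-frequency trading.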
Batch Processing Optimization
Improve processing speed for large volumes of social media data:
import asyncio
import concurrent.futures
from typing import List, Dict
class OptimizedSentimentProcessor:
def __init__(self, sentiment_analyzer: CryptoSentimentAnalyzer,
max_workers: int = 4):
"""Initialize optimized processor with parallel processing"""
self.analyzer = sentiment_analyzer
self.max_workers = max_workers
async def process_batch_async(self, texts: List[Dict]) -> List[Dict]:
"""
Process multiple texts in parallel using async/await
Returns list of sentiment results
"""
tasks = []
for text_data in texts:
task = asyncio.create_task(
self._analyze_single_async(text_data)
)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter out exceptions and return valid results
valid_results = [r for r in results if not isinstance(r, Exception)]
return valid_results
    async def _analyze_single_async(self, text_data: Dict) -> Dict:
        """Analyze single text asynchronously"""
        loop = asyncio.get_running_loop()
        # Run the blocking Ollama call in the event loop's default thread pool
        # instead of creating a new executor for every text
        result = await loop.run_in_executor(
            None,
            self.analyzer.analyze_sentiment,
            text_data['text'],
            text_data.get('crypto_symbol', 'BTC')
        )
        # Add original metadata
        result.update(text_data)
        return result
def process_batch_parallel(self, texts: List[Dict]) -> List[Dict]:
"""
Process multiple texts in parallel using ThreadPoolExecutor
Synchronous alternative to async processing
"""
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all tasks
future_to_text = {
executor.submit(
self.analyzer.analyze_sentiment,
text_data['text'],
text_data.get('crypto_symbol', 'BTC')
): text_data
for text_data in texts
}
# Collect results as they complete
for future in concurrent.futures.as_completed(future_to_text):
text_data = future_to_text[future]
try:
result = future.result(timeout=30) # 30 second timeout
result.update(text_data)
results.append(result)
except Exception as e:
print(f"Error processing text: {e}")
continue
return results
# Example usage
optimized_processor = OptimizedSentimentProcessor(sentiment_analyzer, max_workers=8)
# Prepare batch data
batch_texts = [
{'text': tweet['text'], 'crypto_symbol': tweet['crypto_symbol'], 'id': tweet['id']}
for tweet in crypto_tweets[:50] # Process 50 tweets in parallel
]
# Process in parallel
start_time = time.time()
batch_results = optimized_processor.process_batch_parallel(batch_texts)
processing_time = time.time() - start_time
print(f"Processed {len(batch_results)} texts in {processing_time:.2f} seconds")
if batch_results:  # Avoid dividing by zero when every request failed
    print(f"Average processing time: {processing_time/len(batch_results):.3f} seconds per text")
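More client threads do not guarantee more throughput, because the Ollama server may queue requests instead of running them all at once. If you use the async route, a semaphore gives you an explicit cap on in-flight requests. The sketch below assumes Python 3.9 or newer for `asyncio.to_thread`. `analyze_all` and `fake_analyze` are hypothetical names, and the stub stands in for `analyze_sentiment` so the example runs without a model.

```python
import asyncio
from typing import Callable, Dict, List


async def analyze_all(items: List[Dict], analyze: Callable[[str, str], Dict],
                      max_concurrent: int = 4) -> List[Dict]:
    """Run a blocking analyze(text, symbol) call for each item, a few at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(item: Dict) -> Dict:
        async with semaphore:
            # to_thread moves the blocking call off the event loop
            return await asyncio.to_thread(
                analyze, item["text"], item.get("crypto_symbol", "BTC")
            )

    results = await asyncio.gather(*(run_one(i) for i in items),
                                   return_exceptions=True)
    # Drop failures; merge each result with its input (input keys win on clashes)
    return [{**res, **item} for item, res in zip(items, results)
            if not isinstance(res, Exception)]


def fake_analyze(text: str, symbol: str) -> Dict:
    """Stand-in for a real analyzer so the sketch runs offline."""
    if not text:
        raise ValueError("empty text")
    return {"sentiment": "positive", "confidence": 0.9}


demo_items = [{"text": "BTC looks strong", "crypto_symbol": "BTC"},
              {"text": ""},
              {"text": "ETH upgrade shipped", "crypto_symbol": "ETH"}]
print(len(asyncio.run(analyze_all(demo_items, fake_analyze, max_concurrent=2))))
```

Measuring throughput at a few different caps on your own machine is more reliable than picking a worker count up front.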
Real-time Performance Monitoring
Monitor sentiment analysis performance for trading optimization:
import logging
from collections import deque
from dataclasses import dataclass
import statistics
@dataclass
class PerformanceMetrics:
avg_processing_time: float
throughput: float # texts per second
accuracy_score: float
memory_usage: float
error_rate: float
class SentimentPerformanceMonitor:
def __init__(self, window_size: int = 100):
"""Initialize performance monitoring system"""
self.window_size = window_size
self.processing_times = deque(maxlen=window_size)
self.accuracy_scores = deque(maxlen=window_size)
self.error_count = 0
self.total_requests = 0
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def record_processing_time(self, processing_time: float):
"""Record processing time for performance analysis"""
self.processing_times.append(processing_time)
self.total_requests += 1
def record_accuracy(self, accuracy_score: float):
"""Record accuracy score for model performance tracking"""
self.accuracy_scores.append(accuracy_score)
def record_error(self):
"""Record processing error"""
self.error_count += 1
self.total_requests += 1
def get_performance_metrics(self) -> PerformanceMetrics:
"""Calculate current performance metrics"""
if not self.processing_times:
return PerformanceMetrics(0, 0, 0, 0, 0)
avg_processing_time = statistics.mean(self.processing_times)
throughput = 1.0 / avg_processing_time if avg_processing_time > 0 else 0
accuracy_score = (statistics.mean(self.accuracy_scores)
if self.accuracy_scores else 0.0)
error_rate = (self.error_count / self.total_requests
if self.total_requests > 0 else 0.0)
# Get memory usage (simplified)
import psutil
process = psutil.Process()
memory_usage = process.memory_info().rss / 1024 / 1024 # MB
return PerformanceMetrics(
avg_processing_time=avg_processing_time,
throughput=throughput,
accuracy_score=accuracy_score,
memory_usage=memory_usage,
error_rate=error_rate
)
def log_performance_report(self):
"""Log detailed performance report"""
metrics = self.get_performance_metrics()
self.logger.info(f"Performance Report:")
self.logger.info(f" Average Processing Time: {metrics.avg_processing_time:.3f}s")
self.logger.info(f" Throughput: {metrics.throughput:.1f} texts/second")
self.logger.info(f" Accuracy Score: {metrics.accuracy_score:.3f}")
self.logger.info(f" Memory Usage: {metrics.memory_usage:.1f} MB")
self.logger.info(f" Error Rate: {metrics.error_rate:.2%}")
self.logger.info(f" Total Requests: {self.total_requests}")
# Integration with sentiment analyzer
class MonitoredSentimentAnalyzer(CryptoSentimentAnalyzer):
def __init__(self, model_name: str = "llama2:7b-chat"):
super().__init__(model_name)
self.monitor = SentimentPerformanceMonitor()
def analyze_sentiment(self, text: str, crypto_symbol: str) -> Dict:
"""Analyze sentiment with performance monitoring"""
start_time = time.time()
try:
result = super().analyze_sentiment(text, crypto_symbol)
processing_time = time.time() - start_time
# Record performance metrics
self.monitor.record_processing_time(processing_time)
self.monitor.record_accuracy(result.get('confidence', 0.5))
return result
except Exception as e:
self.monitor.record_error()
raise e
def get_performance_report(self) -> PerformanceMetrics:
"""Get current performance metrics"""
return self.monitor.get_performance_metrics()
# Example usage
monitored_analyzer = MonitoredSentimentAnalyzer()
# Process some data with monitoring
for i in range(20):
result = monitored_analyzer.analyze_sentiment(
f"Bitcoin is looking bullish today! #{i}",
"BTC"
)
# Check performance
performance = monitored_analyzer.get_performance_report()
print(f"Throughput: {performance.throughput:.1f} texts/second")
print(f"Average model confidence (a proxy, not measured accuracy): {performance.accuracy_score:.3f}")
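An average hides slow outliers, and for a trading loop the slow requests are often the ones that matter. A tail percentile over the same window of processing times is a common complement. This sketch uses `statistics.quantiles` from the standard library. `latency_percentile` is a hypothetical helper and the sample timings are invented.

```python
from collections import deque
from statistics import quantiles
from typing import Iterable


def latency_percentile(times: Iterable[float], percentile: int = 95) -> float:
    """Return the given percentile (1-99) of the recorded processing times."""
    data = sorted(times)
    if not data:
        return 0.0
    if len(data) == 1:
        return data[0]
    # n=100 yields 99 cut points; index p-1 is the p-th percentile
    cuts = quantiles(data, n=100, method="inclusive")
    return cuts[percentile - 1]


# Same kind of bounded window the monitor keeps, filled with invented timings
window = deque([0.8, 0.9, 1.0, 1.1, 6.5], maxlen=100)
print(f"mean-hiding outlier, p95: {latency_percentile(window):.2f}s")
```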
Production Deployment Best Practices
Deploy your sentiment analysis system for reliable crypto trading operations.
Docker Configuration
Create a containerized deployment for consistent performance:
# Dockerfile for crypto sentiment analysis system
FROM python:3.9-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
curl \
git \
&& rm -rf /var/lib/apt/lists/*
# Install Ollama
RUN curl -fsSL https://ollama.ai/install.sh | sh
# Set working directory
WORKDIR /app
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create volume for Ollama models
VOLUME ["/root/.ollama"]
# Expose ports
EXPOSE 8000 11434
# Start script
COPY start.sh .
RUN chmod +x start.sh
CMD ["./start.sh"]
#!/bin/bash
# start.sh - Production startup script
# Start Ollama service
ollama serve &
# Wait until the Ollama API answers instead of sleeping for a fixed time
until ollama list >/dev/null 2>&1; do
  sleep 1
done
# Pull required models
ollama pull llama2:7b-chat
ollama pull mistral:7b-instruct
# Start the sentiment analysis application
python main.py
Environment Configuration
Set up production environment variables:
# .env.production
# Twitter API Configuration
TWITTER_BEARER_TOKEN=your_twitter_bearer_token
# Reddit API Configuration
REDDIT_CLIENT_ID=your_reddit_client_id
REDDIT_CLIENT_SECRET=your_reddit_client_secret
REDDIT_USER_AGENT=CryptoSentimentBot/1.0
# Exchange API Configuration (Demo/Sandbox)
EXCHANGE_API_KEY=your_exchange_api_key
EXCHANGE_SECRET=your_exchange_secret
EXCHANGE_SANDBOX=true
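# Ollama Configuration
# Ollama and its Python client read OLLAMA_HOST in host:port form.
# OLLAMA_PORT is not read by Ollama itself; keep it only if your own code uses it.
OLLAMA_HOST=localhost:11434
OLLAMA_PORT=11434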
# Application Settings
LOG_LEVEL=INFO
MAX_WORKERS=8
PROCESSING_INTERVAL=30
MIN_CONFIDENCE_THRESHOLD=0.75
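Environment variables always arrive as strings, so the application settings above need converting before use. One way to keep that in a single place is a small typed settings object. This is a sketch: the `Settings` class and `load_settings` function are hypothetical, and the defaults simply mirror the values in the file above.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    log_level: str
    max_workers: int
    processing_interval: int          # seconds between collection cycles
    min_confidence_threshold: float
    exchange_sandbox: bool


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read and convert application settings, failing fast on bad values."""
    sandbox_raw = env.get("EXCHANGE_SANDBOX", "true").strip().lower()
    return Settings(
        log_level=env.get("LOG_LEVEL", "INFO"),
        max_workers=int(env.get("MAX_WORKERS", "8")),
        processing_interval=int(env.get("PROCESSING_INTERVAL", "30")),
        min_confidence_threshold=float(env.get("MIN_CONFIDENCE_THRESHOLD", "0.75")),
        exchange_sandbox=sandbox_raw in {"1", "true", "yes"},
    )


# Passing a plain dict keeps the example independent of the real environment
print(load_settings({"MAX_WORKERS": "4", "EXCHANGE_SANDBOX": "false"}))
```

Call `load_dotenv()` first if you want the values to come from the .env file.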
Production Monitoring
Implement comprehensive monitoring for production deployment:
import json
import time
from datetime import datetime
from typing import Dict, List
class ProductionMonitor:
def __init__(self, log_file: str = "sentiment_analysis.log"):
"""Initialize production monitoring system"""
self.log_file = log_file
self.metrics = {
'total_processed': 0,
'successful_trades': 0,
'failed_trades': 0,
'avg_processing_time': 0.0,
'last_update': datetime.now().isoformat()
}
def log_trade_execution(self, signal: TradingSignal, success: bool,
execution_details: Dict):
"""Log trade execution for audit trail"""
log_entry = {
'timestamp': datetime.now().isoformat(),
'type': 'trade_execution',
'symbol': signal.symbol,
'signal_type': signal.signal_type.value,
'confidence': signal.confidence,
'success': success,
'execution_details': execution_details
}
self._write_log(log_entry)
# Update metrics
if success:
self.metrics['successful_trades'] += 1
else:
self.metrics['failed_trades'] += 1
    def log_sentiment_batch(self, batch_size: int, processing_time: float,
                            accuracy: float):
        """Log batch processing metrics"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'type': 'batch_processing',
            'batch_size': batch_size,
            'processing_time': processing_time,
            'accuracy': accuracy,
            # Guard against a zero duration on empty or instant batches
            'throughput': batch_size / processing_time if processing_time > 0 else 0.0
        }
        self._write_log(log_entry)
        # Update metrics
        self.metrics['total_processed'] += batch_size
        # Keep a true running mean over all batches logged so far
        batch_count = self.metrics.get('batch_count', 0)
        self.metrics['avg_processing_time'] = (
            (self.metrics['avg_processing_time'] * batch_count + processing_time)
            / (batch_count + 1)
        )
        self.metrics['batch_count'] = batch_count + 1
        self.metrics['last_update'] = datetime.now().isoformat()
def _write_log(self, log_entry: Dict):
"""Write log entry to file"""
with open(self.log_file, 'a') as f:
f.write(json.dumps(log_entry) + '\n')
def get_health_status(self) -> Dict:
"""Get system health status for monitoring dashboards"""
success_rate = (
self.metrics['successful_trades'] /
(self.metrics['successful_trades'] + self.metrics['failed_trades'])
if (self.metrics['successful_trades'] + self.metrics['failed_trades']) > 0
else 1.0
)
return {
'status': 'healthy' if success_rate > 0.8 else 'warning',
'success_rate': success_rate,
'total_processed': self.metrics['total_processed'],
'avg_processing_time': self.metrics['avg_processing_time'],
'last_update': self.metrics['last_update']
}
# Example production integration
# collector, reddit_collector, optimized_processor and sentiment_trader are
# expected to exist already (they are created in earlier sections of this guide)
import os
production_monitor = ProductionMonitor()
# Seconds between cycles; falls back to 60 when PROCESSING_INTERVAL is not set
PROCESSING_INTERVAL = int(os.getenv("PROCESSING_INTERVAL", "60"))
# Main application loop
def main_trading_loop():
    """Main production trading loop with monitoring"""
    while True:
        try:
            # Collect social media data
            start_time = time.time()
            # Twitter data collection
            twitter_data = collector.collect_crypto_tweets(['BTC', 'ETH'], count=100)
            # Reddit data collection
            reddit_data = []
            for subreddit in ['CryptoCurrency', 'Bitcoin']:
                posts = reddit_collector.collect_reddit_posts(subreddit, limit=50)
                reddit_data.extend(posts)
            # Process sentiment analysis
            combined_data = twitter_data + reddit_data
            sentiment_results = optimized_processor.process_batch_parallel(combined_data)
            processing_time = time.time() - start_time
            # Nothing to analyze this cycle: skip it instead of dividing by zero below
            if not sentiment_results:
                print("No sentiment results this cycle; skipping signal generation")
                time.sleep(PROCESSING_INTERVAL)
                continue
            # Generate trading signals
            sentiment_trader.add_sentiment_data(sentiment_results)
            signals = sentiment_trader.generate_trading_signals(['BTC', 'ETH'])
            # Log processing metrics (average model confidence stands in for accuracy)
            avg_confidence = sum(r.get('confidence', 0.5) for r in sentiment_results) / len(sentiment_results)
            production_monitor.log_sentiment_batch(
                len(combined_data),
                processing_time,
                avg_confidence
            )
            # Execute trades (in demo/sandbox mode)
            for signal in signals:
                try:
                    # Simulate trade execution
                    execution_result = {'order_id': f"demo_{int(time.time())}", 'status': 'filled'}
                    production_monitor.log_trade_execution(signal, True, execution_result)
                    print(f"Executed {signal.signal_type.value} signal for {signal.symbol}")
                except Exception as trade_error:
                    production_monitor.log_trade_execution(signal, False, {'error': str(trade_error)})
                    print(f"Failed to execute signal for {signal.symbol}: {trade_error}")
            # Wait before next iteration
            time.sleep(PROCESSING_INTERVAL)
        except Exception as e:
            print(f"Error in main trading loop: {e}")
            time.sleep(30)  # Wait before retrying
if __name__ == "__main__":
    print("Starting crypto sentiment analysis trading system...")
    main_trading_loop()
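The monitor keeps its metrics in memory, so they reset whenever the process restarts, while the log file persists. Here is a rough sketch of how you might rebuild summary statistics from that file. The `summarize_log` function is a hypothetical helper, and it assumes the one-JSON-object-per-line format and field names (`type`, `success`, `batch_size`) written by `_write_log` above.

```python
import json
from collections import Counter
from typing import Dict


def summarize_log(path: str) -> Dict:
    """Summarize a JSON Lines log file (hypothetical helper).

    Counts entries by type, totals the batch sizes, and computes the
    trade success rate (None when the log holds no trade entries).
    """
    counts: Counter = Counter()
    trades_ok = 0
    trades_failed = 0
    items_processed = 0
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            entry = json.loads(line)
            entry_type = entry.get("type", "unknown")
            counts[entry_type] += 1
            if entry_type == "trade_execution":
                if entry.get("success"):
                    trades_ok += 1
                else:
                    trades_failed += 1
            elif entry_type == "batch_processing":
                items_processed += entry.get("batch_size", 0)
    total_trades = trades_ok + trades_failed
    return {
        "entries_by_type": dict(counts),
        "items_processed": items_processed,
        "trade_success_rate": trades_ok / total_trades if total_trades else None,
    }
```

Calling `summarize_log("sentiment_analysis.log")` after a run gives you a quick audit without keeping the trading process alive.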
Conclusion
This guide showed how to build a sentiment-driven crypto trading system that uses Ollama to process Twitter and Reddit data locally. You've learned to extract market emotions, generate automated trading signals, and run the whole pipeline under monitoring in sandbox mode.
Key benefits of this approach include:
- Privacy: Process sensitive trading data locally without cloud dependencies
- Speed: Real-time sentiment analysis for immediate market opportunities
- Cost efficiency: Eliminate expensive cloud API fees for large-scale processing
- Customization: Fine-tune models specifically for cryptocurrency language patterns
- Reliability: Reduced dependency on external services for critical trading decisions
The system processes social media sentiment patterns, converts emotions into quantified trading signals, and integrates with popular cryptocurrency exchanges for automated execution. With proper risk management and backtesting, sentiment analysis can provide valuable insights for crypto trading strategies.
Start with paper trading to validate your sentiment signals before deploying real capital. Monitor performance metrics closely and adjust sensitivity thresholds based on market conditions and your risk tolerance.
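One way to put numbers on a paper-trading run is to record each simulated trade and compute a hit rate and average return. The sketch below is illustrative only: `PaperTrade`, `trade_return`, and `evaluate_paper_trades` are hypothetical names, the `"buy"`/`"sell"` labels are assumptions rather than the signal types used earlier, and fees and slippage are ignored.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class PaperTrade:
    """One simulated trade (hypothetical record type)."""
    symbol: str
    side: str           # "buy" or "sell" (assumed labels)
    entry_price: float
    exit_price: float


def trade_return(trade: PaperTrade) -> float:
    """Fractional return; a sell profits when the price falls."""
    change = (trade.exit_price - trade.entry_price) / trade.entry_price
    return change if trade.side == "buy" else -change


def evaluate_paper_trades(trades: List[PaperTrade]) -> Dict:
    """Hit rate and mean return over a list of paper trades."""
    if not trades:
        return {"trades": 0, "hit_rate": None, "avg_return": None}
    returns = [trade_return(t) for t in trades]
    wins = sum(1 for r in returns if r > 0)
    return {
        "trades": len(trades),
        "hit_rate": wins / len(trades),
        "avg_return": sum(returns) / len(returns),
    }
```

Comparing these figures across different `MIN_CONFIDENCE_THRESHOLD` values is a simple way to see whether stricter filtering actually improves your signals.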
Next steps for optimization:
- Implement advanced ML models for sentiment classification
- Add news sentiment analysis from crypto media sources
- Develop multi-timeframe sentiment trend analysis
- Create custom crypto vocabulary for improved accuracy
- Build portfolio risk management based on sentiment volatility
Master crypto sentiment analysis with Ollama to gain an edge in the fast-moving cryptocurrency markets through data-driven emotional intelligence.