Ever tried to build a crypto trading bot only to realize you're feeding it data from a carrier pigeon? Welcome to the world of real-time cryptocurrency market data, where milliseconds matter and stale data costs money. Today, we'll transform your Ollama setup into a cryptocurrency market intelligence powerhouse using the CoinGecko API.
This guide shows you how to create a robust data pipeline that fetches live market data, processes it efficiently, and integrates seamlessly with your Ollama environment. You'll learn to handle rate limits, implement error recovery, and deploy a production-ready system that won't crash when Bitcoin decides to moonwalk.
Why CoinGecko API Integration Matters for Crypto Development
The cryptocurrency market never sleeps, and neither should your data pipeline. Manual data collection creates bottlenecks that kill trading opportunities and analysis accuracy. CoinGecko's comprehensive API provides access to over 10,000 cryptocurrencies with real-time pricing, market cap data, and historical trends.
Key Benefits:
- Real-time market data from 500+ exchanges
- Historical price tracking for technical analysis
- Market sentiment indicators and trending coins
- Free tier support with 50 calls per minute
- Comprehensive documentation with Python examples
Setting Up Your CoinGecko API Environment
Prerequisites and API Key Setup
First, create your CoinGecko account and generate an API key:
# requirements.txt
requests==2.31.0
python-dotenv==1.0.0
pandas==2.1.0
schedule==1.2.0
# Install dependencies
pip install -r requirements.txt
# Create environment file
touch .env
# .env configuration
COINGECKO_API_KEY=your_api_key_here
COINGECKO_BASE_URL=https://api.coingecko.com/api/v3
REQUEST_TIMEOUT=30
MAX_RETRIES=3
Core API Client Implementation
# coingecko_client.py
# Standalone CoinGecko REST client module; configuration comes from .env.
import requests
import time
import os
from typing import Dict, List, Optional
from dotenv import load_dotenv
# Read COINGECKO_* settings from a local .env file into the environment.
load_dotenv()
class CoinGeckoClient:
    """Thin CoinGecko REST client with rate limiting and retry/backoff.

    Configuration is read from environment variables: COINGECKO_API_KEY,
    COINGECKO_BASE_URL, REQUEST_TIMEOUT and MAX_RETRIES.
    """

    # Public API root, used when COINGECKO_BASE_URL is not configured.
    DEFAULT_BASE_URL = "https://api.coingecko.com/api/v3"

    def __init__(self):
        self.api_key = os.getenv('COINGECKO_API_KEY')
        # Bug fix: without a fallback, an unset COINGECKO_BASE_URL made
        # requests target the literal URL "None/<endpoint>".
        self.base_url = os.getenv('COINGECKO_BASE_URL') or self.DEFAULT_BASE_URL
        self.timeout = int(os.getenv('REQUEST_TIMEOUT', 30))
        self.max_retries = int(os.getenv('MAX_RETRIES', 3))
        # Rate limiting: 50 calls per minute for free tier
        self.rate_limit_delay = 1.2  # seconds between requests
        self.last_request_time = 0.0

    def _make_request(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
        """Make a rate-limited GET request with retry and exponential backoff.

        Raises:
            RuntimeError: once all retries are exhausted (subclass of
                Exception, so existing callers that catch Exception still work).
        """
        url = f"{self.base_url}/{endpoint}"

        # Enforce minimum spacing between consecutive requests.
        elapsed = time.time() - self.last_request_time
        if elapsed < self.rate_limit_delay:
            time.sleep(self.rate_limit_delay - elapsed)

        headers = {
            'accept': 'application/json',
            'x-cg-demo-api-key': self.api_key,
        }

        for attempt in range(self.max_retries):
            # Bug fix: record the attempt time even when it fails, so failed
            # attempts also count toward the rate limit (the original only
            # updated the timestamp on success).
            self.last_request_time = time.time()
            try:
                response = requests.get(
                    url,
                    headers=headers,
                    params=params,
                    timeout=self.timeout,
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException as e:
                if attempt == self.max_retries - 1:
                    raise RuntimeError(
                        f"API request failed after {self.max_retries} attempts: {e}"
                    ) from e
                time.sleep(2 ** attempt)  # Exponential backoff

        # Defensive: only reachable if MAX_RETRIES is misconfigured below 1
        # (the original silently returned None in that case).
        raise RuntimeError("MAX_RETRIES must be >= 1")

    def get_market_data(self, coin_ids: List[str]) -> Dict:
        """Fetch current USD market data for the given CoinGecko coin ids."""
        params = {
            'ids': ','.join(coin_ids),
            'vs_currencies': 'usd',
            'include_market_cap': 'true',
            'include_24hr_vol': 'true',
            'include_24hr_change': 'true',
            'include_last_updated_at': 'true',
        }
        return self._make_request('simple/price', params)
Building the Cryptocurrency Data Pipeline
Market Data Collection Module
# market_data_collector.py
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List
import json
class MarketDataCollector:
    """Fetches and analyzes market data through a CoinGeckoClient."""

    def __init__(self, client: 'CoinGeckoClient'):
        # Quoted annotation: CoinGeckoClient lives in coingecko_client.py,
        # which this module never imports — an unquoted name raises
        # NameError the moment the module is loaded.
        self.client = client
        self.data_cache = {}  # latest DataFrames keyed by dataset name

    def collect_top_coins(self, limit: int = 100) -> pd.DataFrame:
        """Collect market data for the top `limit` coins by market cap.

        Returns an empty DataFrame on failure rather than raising.
        """
        params = {
            'vs_currency': 'usd',
            'order': 'market_cap_desc',
            'per_page': limit,
            'page': 1,
            'sparkline': 'false',
            'price_change_percentage': '1h,24h,7d'
        }
        try:
            data = self.client._make_request('coins/markets', params)
            df = pd.DataFrame(data)
            df['last_updated'] = pd.to_datetime(df['last_updated'])
            df['data_collection_time'] = datetime.now()
            # Cache so analyze_market_trends can run without a refetch.
            self.data_cache['top_coins'] = df
            return df
        except Exception as e:
            print(f"Error collecting market data: {e}")
            return pd.DataFrame()

    def get_coin_history(self, coin_id: str, days: int = 30) -> pd.DataFrame:
        """Fetch daily historical prices for `coin_id` over `days` days."""
        params = {
            'vs_currency': 'usd',
            'days': days,
            'interval': 'daily'
        }
        try:
            data = self.client._make_request(f'coins/{coin_id}/market_chart', params)
            # API timestamps are in milliseconds since the epoch.
            prices = [(datetime.fromtimestamp(item[0] / 1000), item[1])
                      for item in data['prices']]
            df = pd.DataFrame(prices, columns=['timestamp', 'price'])
            df['coin_id'] = coin_id
            return df
        except Exception as e:
            print(f"Error fetching history for {coin_id}: {e}")
            return pd.DataFrame()

    def analyze_market_trends(self) -> Dict:
        """Summarize cached top-coin data.

        Bug fix: the original raised KeyError on partial frames (the
        article's own unit test feeds a frame without 'total_volume');
        missing columns now degrade to zeros / empty lists instead.
        """
        if 'top_coins' not in self.data_cache:
            return {}
        df = self.data_cache['top_coins']

        def ranked(n: int, by: str, cols: List[str], largest: bool = True) -> List[Dict]:
            # Empty result when the ranking column is absent from the frame.
            if by not in df.columns:
                return []
            subset = df.nlargest(n, by) if largest else df.nsmallest(n, by)
            present = [c for c in cols if c in df.columns]
            return subset[present].to_dict('records')

        change_col = 'price_change_percentage_24h'
        return {
            'total_market_cap': df['market_cap'].sum() if 'market_cap' in df.columns else 0,
            'average_24h_change': df[change_col].mean() if change_col in df.columns else 0.0,
            'gainers': ranked(5, change_col, ['name', 'symbol', change_col]),
            'losers': ranked(5, change_col, ['name', 'symbol', change_col], largest=False),
            'high_volume_coins': ranked(10, 'total_volume', ['name', 'symbol', 'total_volume'])
        }
Ollama Integration Layer
# ollama_integration.py
import requests
import json
from typing import Dict, List
class OllamaIntegration:
    """Generates natural-language market analysis via a local Ollama server."""

    def __init__(self, ollama_url: str = "http://localhost:11434"):
        self.ollama_url = ollama_url
        self.model_name = "llama2"  # or your preferred model
        # Bug fix: requests.post without a timeout can hang the whole
        # pipeline if Ollama stalls; generation is slow, so be generous.
        self.request_timeout = 300  # seconds

    def _generate(self, prompt: str):
        """POST a non-streaming /api/generate request; returns the Response.

        Shared by both public methods (the original duplicated this call).
        """
        return requests.post(
            f"{self.ollama_url}/api/generate",
            json={
                "model": self.model_name,
                "prompt": prompt,
                "stream": False,
            },
            timeout=self.request_timeout,
        )

    def generate_market_summary(self, market_data: Dict) -> str:
        """Generate natural language market summary using Ollama."""
        prompt = self._create_market_prompt(market_data)
        try:
            response = self._generate(prompt)
            if response.status_code == 200:
                return response.json()['response']
            return f"Error generating summary: {response.status_code}"
        except Exception as e:
            return f"Ollama connection error: {e}"

    def _create_market_prompt(self, market_data: Dict) -> str:
        """Create a structured prompt from an analyze_market_trends() dict."""
        prompt = f"""
Analyze this cryptocurrency market data and provide a concise summary:
Market Overview:
- Total Market Cap: ${market_data.get('total_market_cap', 0):,.0f}
- Average 24h Change: {market_data.get('average_24h_change', 0):.2f}%
Top Gainers:
"""
        for gainer in market_data.get('gainers', []):
            prompt += f"- {gainer['name']} ({gainer['symbol']}): +{gainer['price_change_percentage_24h']:.2f}%\n"
        prompt += "\nTop Losers:\n"
        for loser in market_data.get('losers', []):
            prompt += f"- {loser['name']} ({loser['symbol']}): {loser['price_change_percentage_24h']:.2f}%\n"
        prompt += """
Please provide:
1. A brief market sentiment analysis
2. Key trends and patterns
3. Notable observations
4. Potential implications for traders
Keep the response under 200 words and focus on actionable insights.
"""
        return prompt

    def analyze_coin_sentiment(self, coin_data: Dict) -> str:
        """Analyze sentiment for a specific cryptocurrency."""
        prompt = f"""
Analyze the sentiment for {coin_data['name']} ({coin_data['symbol']}):
Current Price: ${coin_data['current_price']}
24h Change: {coin_data['price_change_percentage_24h']:.2f}%
7d Change: {coin_data.get('price_change_percentage_7d', 0):.2f}%
Market Cap: ${coin_data['market_cap']:,.0f}
Volume: ${coin_data['total_volume']:,.0f}
Provide a sentiment score (Bullish/Bearish/Neutral) and brief reasoning.
"""
        try:
            response = self._generate(prompt)
            return response.json()['response'] if response.status_code == 200 else "Analysis unavailable"
        except Exception as e:
            return f"Sentiment analysis error: {e}"
Automated Data Pipeline Implementation
Scheduler and Automation
# automated_pipeline.py
# Orchestrator module: wires client, collector and Ollama into a schedule.
import schedule
import time
import json
from datetime import datetime
import logging
# Configure logging
# Logs go both to a file and to stdout (so `docker logs` shows them too).
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('crypto_pipeline.log'),
logging.StreamHandler()
]
)
class CryptoPipeline:
    """Orchestrates collection, trend analysis, AI summarization and saving."""

    def __init__(self):
        self.client = CoinGeckoClient()
        self.collector = MarketDataCollector(self.client)
        self.ollama = OllamaIntegration()

    def run_market_analysis(self):
        """One pipeline pass: fetch top 50 coins, analyze, summarize, save."""
        try:
            logging.info("Starting market data collection...")
            market_df = self.collector.collect_top_coins(50)
            if market_df.empty:
                logging.error("No market data collected")
                return
            trends = self.collector.analyze_market_trends()
            summary = self.ollama.generate_market_summary(trends)
            self._save_results(market_df, trends, summary)
            logging.info("Market analysis completed successfully")
        except Exception as e:
            logging.error(f"Pipeline execution failed: {e}")

    def _save_results(self, market_df, trends, summary):
        """Persist one run's artifacts under data/ with a shared timestamp."""
        import os
        # Bug fix: the original crashed with FileNotFoundError on a fresh
        # checkout because the data/ directory did not exist.
        os.makedirs('data', exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Raw market snapshot
        market_df.to_csv(f'data/market_data_{timestamp}.csv', index=False)
        # Trend analysis (default=str handles datetimes in the dict)
        with open(f'data/trends_{timestamp}.json', 'w') as f:
            json.dump(trends, f, indent=2, default=str)
        # AI-generated summary
        with open(f'data/summary_{timestamp}.txt', 'w') as f:
            f.write(summary)
        logging.info(f"Results saved with timestamp: {timestamp}")

    def setup_schedule(self):
        """Register the 15-minute pass and the 09:00 daily deep-dive."""
        # Run every 15 minutes during market hours
        schedule.every(15).minutes.do(self.run_market_analysis)
        # Daily comprehensive analysis at 9 AM
        schedule.every().day.at("09:00").do(self.run_comprehensive_analysis)
        logging.info("Pipeline schedule configured")

    def run_comprehensive_analysis(self):
        """Daily report: per-coin AI sentiment for the top 10 by market cap."""
        try:
            import os
            logging.info("Running comprehensive daily analysis...")
            market_df = self.collector.collect_top_coins(100)
            if market_df.empty:
                logging.error("No market data collected for daily analysis")
                return
            detailed_analysis = {}
            for _, coin in market_df.head(10).iterrows():
                detailed_analysis[coin['symbol']] = self.ollama.analyze_coin_sentiment(coin.to_dict())
            # Same fresh-checkout fix as _save_results, for reports/.
            os.makedirs('reports', exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d")
            with open(f'reports/daily_report_{timestamp}.json', 'w') as f:
                json.dump(detailed_analysis, f, indent=2)
            logging.info("Daily comprehensive analysis completed")
        except Exception as e:
            logging.error(f"Comprehensive analysis failed: {e}")
# Main execution: build the pipeline and poll the scheduler until Ctrl+C.
if __name__ == "__main__":
    pipeline = CryptoPipeline()
    pipeline.setup_schedule()
    print("Crypto pipeline started. Press Ctrl+C to stop.")
    try:
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute
    except KeyboardInterrupt:
        print("\nPipeline stopped by user")
Error Handling and Rate Limit Management
Robust Error Recovery
# error_handling.py
import functools
import time
import logging
from typing import Callable, Any
def retry_with_backoff(max_retries: int = 3, backoff_factor: float = 2.0):
    """Decorator: retry the wrapped callable with exponential backoff.

    Sleeps backoff_factor**attempt seconds between attempts and re-raises
    the last exception once max_retries attempts have all failed.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            final_attempt = max_retries - 1
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt == final_attempt:
                        logging.error(f"Function {func.__name__} failed after {max_retries} attempts: {exc}")
                        raise
                    delay = backoff_factor ** attempt
                    logging.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s: {exc}")
                    time.sleep(delay)
        return wrapper
    return decorator
class RateLimitManager:
    """Sliding-window limiter: at most `calls_per_minute` calls per 60s."""

    def __init__(self, calls_per_minute: int = 50):
        self.calls_per_minute = calls_per_minute
        self.call_times = []  # wall-clock times of calls within the window

    def wait_if_needed(self):
        """Block until a call is permitted, then record the call time.

        Bug fix: the original appended the *pre-sleep* timestamp, so a call
        made after waiting was back-dated — the stale entry aged out of the
        window too early and let subsequent callers exceed the limit.
        """
        now = time.time()
        # Drop window entries older than one minute.
        self.call_times = [t for t in self.call_times if now - t < 60]
        if len(self.call_times) >= self.calls_per_minute:
            # Wait until the oldest in-window call falls outside the window.
            wait_time = 60 - (now - self.call_times[0])
            if wait_time > 0:
                time.sleep(wait_time)
        # Record the time the call is actually allowed to proceed.
        self.call_times.append(time.time())
Testing and Validation
Unit Tests for Pipeline Components
# test_pipeline.py
import unittest
from unittest.mock import Mock, patch
import pandas as pd
from datetime import datetime
class TestCryptoPipeline(unittest.TestCase):
    """Unit tests for the API client and the market-data collector."""

    def setUp(self):
        self.client = CoinGeckoClient()
        self.collector = MarketDataCollector(self.client)

    @patch('requests.get')
    def test_api_client_request(self, mock_get):
        """The client should return the parsed JSON from a single GET."""
        fake_response = Mock()
        fake_response.json.return_value = {'test': 'data'}
        fake_response.raise_for_status.return_value = None
        mock_get.return_value = fake_response

        self.assertEqual(self.client._make_request('test/endpoint'), {'test': 'data'})
        mock_get.assert_called_once()

    def test_market_data_processing(self):
        """Trend analysis over a cached frame exposes the expected keys."""
        fixture = pd.DataFrame([{
            'id': 'bitcoin',
            'name': 'Bitcoin',
            'symbol': 'btc',
            'current_price': 45000,
            'market_cap': 850000000000,
            'price_change_percentage_24h': 2.5,
        }])
        self.collector.data_cache['top_coins'] = fixture

        trends = self.collector.analyze_market_trends()
        for key in ('total_market_cap', 'gainers', 'losers'):
            self.assertIn(key, trends)

    def test_error_handling(self):
        """An API failure should yield an empty DataFrame, not an exception."""
        with patch.object(self.client, '_make_request', side_effect=Exception("API Error")):
            self.assertTrue(self.collector.collect_top_coins().empty)

if __name__ == '__main__':
    unittest.main()
Deployment and Production Setup
Docker Configuration
# Dockerfile
# Slim production image for the automated crypto pipeline.
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies
# NOTE(review): curl is presumably kept for container health checks against
# the /health endpoint — confirm before removing.
RUN apt-get update && apt-get install -y \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create data directories
RUN mkdir -p data reports logs
# Set environment variables
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1
# Expose port for health checks
EXPOSE 8000
# Run the pipeline
CMD ["python", "automated_pipeline.py"]
Docker Compose with Ollama
# docker-compose.yml
# Runs Ollama alongside the pipeline container. Structural indentation was
# restored — as published, the flattened YAML would not parse.
version: '3.8'

services:
  ollama:
    image: ollama/ollama:latest
    ports:
      - "11434:11434"
    volumes:
      - ollama_data:/root/.ollama
    environment:
      - OLLAMA_ORIGINS=*

  crypto-pipeline:
    build: .
    depends_on:
      - ollama
    volumes:
      - ./data:/app/data
      - ./reports:/app/reports
      - ./logs:/app/logs
    environment:
      - COINGECKO_API_KEY=${COINGECKO_API_KEY}
      # Service-name DNS: the pipeline reaches Ollama inside the network.
      - OLLAMA_URL=http://ollama:11434
    restart: unless-stopped

volumes:
  ollama_data:
Health Monitoring
# health_monitor.py
# Lightweight Flask app exposing a /health endpoint for external monitors.
from flask import Flask, jsonify
import threading
import time
from datetime import datetime, timedelta
app = Flask(__name__)
class HealthMonitor:
    """Tracks pipeline run outcomes for the /health endpoint."""

    def __init__(self):
        self.last_successful_run = None  # datetime of the last good run
        self.error_count = 0             # consecutive failures since last success
        self.status = "starting"

    def record_success(self):
        """Mark a successful run: reset the failure count, become healthy."""
        self.last_successful_run = datetime.now()
        self.error_count = 0
        self.status = "healthy"

    def record_error(self):
        """Count a failure; more than five in a row flips us to unhealthy."""
        self.error_count += 1
        if self.error_count > 5:
            self.status = "unhealthy"
# Module-level singleton shared with the route below.
monitor = HealthMonitor()

@app.route('/health')
def health_check():
    """Health check endpoint for monitoring.

    Returns 200 only for "healthy"; both "warning" (stale last run) and
    "unhealthy" yield 503 so external monitors alert on staleness too.
    """
    current_time = datetime.now()
    health_data = {
        "status": monitor.status,
        "last_successful_run": monitor.last_successful_run.isoformat() if monitor.last_successful_run else None,
        "error_count": monitor.error_count,
        "uptime": current_time.isoformat()
    }
    # Downgrade to "warning" when the last run was more than 30 minutes ago.
    if monitor.last_successful_run:
        time_since_last = current_time - monitor.last_successful_run
        if time_since_last > timedelta(minutes=30):
            health_data["status"] = "warning"
    # Bug fix: derive the HTTP code from the possibly-downgraded status in
    # health_data — the original used monitor.status, so a stale "warning"
    # state still answered 200 and monitors never fired.
    status_code = 200 if health_data["status"] == "healthy" else 503
    return jsonify(health_data), status_code

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)
Advanced Features and Optimizations
Caching Strategy
# cache_manager.py
import redis
import json
import pickle
from datetime import datetime, timedelta
from typing import Any, Optional
class CacheManager:
    """Redis-backed object cache with a 15-minute default TTL."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 900  # 15 minutes

    def get_cached_data(self, key: str) -> Optional[Any]:
        """Return the cached object for *key*, or None on a miss or error."""
        try:
            raw = self.redis_client.get(key)
            if raw:
                # NOTE(review): pickle is only safe if this Redis instance is
                # private — never unpickle data an attacker could write.
                return pickle.loads(raw)
        except Exception as e:
            print(f"Cache retrieval error: {e}")
        return None

    def cache_data(self, key: str, data: Any, ttl: int = None) -> bool:
        """Store *data* under *key* for *ttl* seconds (default 900)."""
        try:
            expiry = ttl or self.default_ttl
            return self.redis_client.setex(key, expiry, pickle.dumps(data))
        except Exception as e:
            print(f"Cache storage error: {e}")
            return False

    def get_or_fetch(self, key: str, fetch_func: callable, ttl: int = None) -> Any:
        """Return the cached value, or call *fetch_func*, cache, and return."""
        hit = self.get_cached_data(key)
        if hit is not None:
            return hit
        fresh = fetch_func()
        if fresh is not None:
            self.cache_data(key, fresh, ttl)
        return fresh
Performance Optimization
# performance_optimizer.py
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict
class AsyncCoinGeckoClient:
    """Async context-manager client that fetches coin data concurrently."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.coingecko.com/api/v3"
        self.session = None  # aiohttp.ClientSession, created by __aenter__

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_multiple_coins(self, coin_ids: List[str]) -> List[Dict]:
        """Fetch all ids concurrently, silently dropping any that errored."""
        pending = [self.fetch_coin_data(cid) for cid in coin_ids]
        outcomes = await asyncio.gather(*pending, return_exceptions=True)
        return [item for item in outcomes if not isinstance(item, Exception)]

    async def fetch_coin_data(self, coin_id: str) -> Dict:
        """GET /coins/{coin_id}; raises on any non-200 response."""
        endpoint = f"{self.base_url}/coins/{coin_id}"
        request_headers = {'x-cg-demo-api-key': self.api_key}
        async with self.session.get(endpoint, headers=request_headers) as response:
            if response.status == 200:
                return await response.json()
            raise Exception(f"API error for {coin_id}: {response.status}")
# Usage example
async def optimized_data_collection():
    """Demonstrate optimized concurrent data collection."""
    targets = ['bitcoin', 'ethereum', 'cardano', 'polkadot', 'chainlink']
    async with AsyncCoinGeckoClient("your-api-key") as client:
        return await client.fetch_multiple_coins(targets)
Monitoring and Analytics Dashboard
Real-time Dashboard
# dashboard.py
# Flask + Plotly dashboard: market overview JSON and per-coin chart routes.
from flask import Flask, render_template, jsonify
import json
import pandas as pd
from datetime import datetime, timedelta
import plotly.graph_objs as go
import plotly.utils
app = Flask(__name__)
@app.route('/')
def dashboard():
    """Serve the main dashboard page (templates/dashboard.html)."""
    return render_template('dashboard.html')
@app.route('/api/market-overview')
def market_overview():
    """Serve an aggregated snapshot computed from the newest market CSV."""
    try:
        newest_csv = get_latest_data_file('data/market_data_*.csv')
        if not newest_csv:
            return jsonify({'error': 'No data available'}), 404

        frame = pd.read_csv(newest_csv)
        movers_cols = ['name', 'symbol', 'price_change_percentage_24h']
        overview = {
            'total_market_cap': frame['market_cap'].sum(),
            'total_volume': frame['total_volume'].sum(),
            'average_change': frame['price_change_percentage_24h'].mean(),
            'top_gainers': frame.nlargest(5, 'price_change_percentage_24h')[movers_cols].to_dict('records'),
            'top_losers': frame.nsmallest(5, 'price_change_percentage_24h')[movers_cols].to_dict('records'),
        }
        return jsonify(overview)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/price-chart/<coin_id>')
def price_chart(coin_id):
    """Build a Plotly price chart for one coin and return it as JSON."""
    try:
        # This would typically load from your database
        # For demo, using sample data
        day_index = pd.date_range(start='2024-01-01', periods=30, freq='D')
        sample_prices = [45000 + i * 100 + (i % 5) * 500 for i in range(30)]

        price_trace = go.Scatter(
            x=day_index,
            y=sample_prices,
            mode='lines+markers',
            name=coin_id.upper(),
            line=dict(color='#1f77b4', width=2),
        )
        chart_layout = go.Layout(
            title=f'{coin_id.upper()} Price History',
            xaxis=dict(title='Date'),
            yaxis=dict(title='Price (USD)'),
            hovermode='x unified',
        )
        figure = go.Figure(data=[price_trace], layout=chart_layout)
        return jsonify(json.loads(plotly.utils.PlotlyJSONEncoder().encode(figure)))
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def get_latest_data_file(pattern: str) -> str:
    """Return the newest file matching *pattern*, or None if none match.

    Bug fix: uses modification time (getmtime) instead of getctime — on
    Linux st_ctime is inode-*change* time, not creation time, so getctime
    could pick a file whose metadata (e.g. permissions) changed last
    rather than the most recently written data file.
    """
    import glob
    import os
    files = glob.glob(pattern)
    if files:
        return max(files, key=os.path.getmtime)
    return None
if __name__ == '__main__':
    # debug=True is for local development only; disable it in production.
    app.run(debug=True, host='0.0.0.0', port=5000)
Conclusion
Building a robust CoinGecko API integration with Ollama creates a powerful cryptocurrency market data pipeline that transforms raw market data into actionable insights. This implementation handles rate limiting, provides error recovery, and scales efficiently for production use.
The combination of real-time data collection, intelligent caching, and AI-powered analysis gives you a competitive edge in cryptocurrency markets. Your pipeline now automatically monitors market trends, identifies opportunities, and generates natural language summaries that make complex market data accessible.
Key takeaways:
- Implement proper rate limiting to avoid API restrictions
- Use caching strategies to reduce API calls and improve performance
- Build comprehensive error handling for production reliability
- Leverage Ollama integration for intelligent market analysis
- Monitor system health with automated alerts and dashboards
Ready to deploy your cryptocurrency market intelligence system? Start with the basic pipeline, then gradually add advanced features like sentiment analysis, portfolio tracking, and automated trading signals. The foundation you've built here scales from personal projects to enterprise-level market analysis platforms.
Production Deployment Checklist
Security Best Practices
# security_config.py
import os
import hashlib
import hmac
from cryptography.fernet import Fernet
class SecurityManager:
    """Encrypts API keys at rest and validates webhook signatures."""

    def __init__(self):
        # NOTE(review): if ENCRYPTION_KEY is unset, a fresh key is generated
        # per process, so values encrypted before a restart become
        # undecryptable — confirm a persistent key is set in production.
        self.encryption_key = os.getenv('ENCRYPTION_KEY') or Fernet.generate_key()
        self.cipher_suite = Fernet(self.encryption_key)

    def encrypt_api_key(self, api_key: str) -> str:
        """Return the Fernet-encrypted form of *api_key* as text."""
        token = self.cipher_suite.encrypt(api_key.encode())
        return token.decode()

    def decrypt_api_key(self, encrypted_key: str) -> str:
        """Invert encrypt_api_key, recovering the plaintext key."""
        plaintext = self.cipher_suite.decrypt(encrypted_key.encode())
        return plaintext.decode()

    def validate_webhook_signature(self, payload: str, signature: str, secret: str) -> bool:
        """Constant-time check of an HMAC-SHA256 hex webhook signature."""
        expected_signature = hmac.new(secret.encode(), payload.encode(), hashlib.sha256).hexdigest()
        return hmac.compare_digest(signature, expected_signature)
# Environment configuration
# Security feature flags (their consumers are not shown in this snippet).
# NOTE(review): LOG_SENSITIVE_DATA presumably gates credential logging —
# confirm against the code that reads this dict.
SECURITY_CONFIG = {
'API_KEY_ENCRYPTION': True,
'WEBHOOK_VALIDATION': True,
'RATE_LIMIT_STRICT': True,
'LOG_SENSITIVE_DATA': False,
# Comma-separated allow-list; defaults to the local dev frontend.
'CORS_ORIGINS': os.getenv('CORS_ORIGINS', 'http://localhost:3000').split(',')
}
Database Integration
# database_manager.py
import sqlite3
import pandas as pd
from datetime import datetime
from typing import List, Dict, Optional
import logging
class DatabaseManager:
    """SQLite persistence for market data, analysis results, and metrics."""

    def __init__(self, db_path: str = "crypto_data.db"):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the three tables if they do not already exist."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # Market data table: one row per (coin, collection timestamp)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS market_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    coin_id TEXT NOT NULL,
                    name TEXT NOT NULL,
                    symbol TEXT NOT NULL,
                    current_price REAL,
                    market_cap REAL,
                    total_volume REAL,
                    price_change_24h REAL,
                    price_change_percentage_24h REAL,
                    market_cap_rank INTEGER,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(coin_id, timestamp)
                )
            ''')
            # Analysis results table (AI sentiment, signals, ...)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS analysis_results (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    analysis_type TEXT NOT NULL,
                    coin_id TEXT,
                    result_data TEXT,
                    confidence_score REAL,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # System metrics table (pipeline health / performance numbers)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS system_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    metric_name TEXT NOT NULL,
                    metric_value REAL,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()
            logging.info("Database initialized successfully")

    def store_market_data(self, market_df: pd.DataFrame) -> bool:
        """Append a market-data frame; returns True on success."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                market_df.to_sql('market_data', conn, if_exists='append', index=False)
            logging.info(f"Stored {len(market_df)} market data records")
            return True
        except Exception as e:
            logging.error(f"Error storing market data: {e}")
            return False

    def get_historical_data(self, coin_id: str, days: int = 30) -> pd.DataFrame:
        """Return rows for `coin_id` newer than `days` days, newest first.

        Bug fix: the interval is bound as a parameter instead of being
        str.format-ed into the SQL text — `.format(days)` was an injection
        vector if `days` ever came from user input.
        """
        query = '''
            SELECT * FROM market_data
            WHERE coin_id = ?
              AND timestamp > datetime('now', ?)
            ORDER BY timestamp DESC
        '''
        try:
            with sqlite3.connect(self.db_path) as conn:
                # int() guarantees a well-formed modifier like "-30 days".
                return pd.read_sql_query(query, conn, params=(coin_id, f'-{int(days)} days'))
        except Exception as e:
            logging.error(f"Error retrieving historical data: {e}")
            return pd.DataFrame()

    def store_analysis_result(self, analysis_type: str, coin_id: str,
                              result_data: str, confidence_score: float = 0.0):
        """Insert one analysis-result row (timestamp defaults to now)."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    INSERT INTO analysis_results
                    (analysis_type, coin_id, result_data, confidence_score)
                    VALUES (?, ?, ?, ?)
                ''', (analysis_type, coin_id, result_data, confidence_score))
                conn.commit()
        except Exception as e:
            logging.error(f"Error storing analysis result: {e}")
Advanced Analytics Module
# advanced_analytics.py
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from typing import Dict, List, Tuple
import ta # Technical Analysis library
class AdvancedAnalytics:
    """Technical indicators, market clustering, and portfolio metrics."""

    def __init__(self, db_manager: 'DatabaseManager'):
        # Quoted annotation: DatabaseManager is defined in
        # database_manager.py, which this module does not import — an
        # unquoted name raised NameError at import time.
        self.db_manager = db_manager
        self.scaler = StandardScaler()

    def calculate_technical_indicators(self, coin_id: str) -> Dict:
        """Compute SMA/RSI/MACD/Bollinger/volume-SMA plus a trading signal.

        Returns {} when fewer than 20 historical rows are available.
        """
        df = self.db_manager.get_historical_data(coin_id, days=60)
        if df.empty or len(df) < 20:
            return {}
        # Oldest-first ordering, as the rolling indicator functions expect.
        df = df.sort_values('timestamp')
        # NOTE(review): ta.volume.volume_sma was removed in recent `ta`
        # releases — confirm the pinned version still exposes it.
        indicators = {
            'sma_20': ta.trend.sma_indicator(df['current_price'], window=20).iloc[-1],
            'sma_50': ta.trend.sma_indicator(df['current_price'], window=50).iloc[-1],
            'rsi': ta.momentum.rsi(df['current_price'], window=14).iloc[-1],
            'macd': ta.trend.macd_diff(df['current_price']).iloc[-1],
            'bollinger_upper': ta.volatility.bollinger_hband(df['current_price']).iloc[-1],
            'bollinger_lower': ta.volatility.bollinger_lband(df['current_price']).iloc[-1],
            'volume_sma': ta.volume.volume_sma(df['total_volume'], window=20).iloc[-1]
        }
        current_price = df['current_price'].iloc[-1]
        indicators['signal'] = self._generate_signal(current_price, indicators)
        return indicators

    def _generate_signal(self, current_price: float, indicators: Dict) -> str:
        """Vote bullish vs. bearish conditions into BUY / SELL / HOLD."""
        signals = []
        # RSI extremes
        if indicators['rsi'] > 70:
            signals.append('overbought')
        elif indicators['rsi'] < 30:
            signals.append('oversold')
        # MACD momentum direction
        if indicators['macd'] > 0:
            signals.append('bullish_momentum')
        else:
            signals.append('bearish_momentum')
        # Price vs. Bollinger Bands
        if current_price > indicators['bollinger_upper']:
            signals.append('overbought_bb')
        elif current_price < indicators['bollinger_lower']:
            signals.append('oversold_bb')
        # SMA crossover
        if indicators['sma_20'] > indicators['sma_50']:
            signals.append('golden_cross')
        elif indicators['sma_20'] < indicators['sma_50']:
            signals.append('death_cross')
        # Majority vote across the accumulated conditions.
        bullish_signals = ['oversold', 'bullish_momentum', 'oversold_bb', 'golden_cross']
        bearish_signals = ['overbought', 'bearish_momentum', 'overbought_bb', 'death_cross']
        bullish_count = sum(1 for s in signals if s in bullish_signals)
        bearish_count = sum(1 for s in signals if s in bearish_signals)
        if bullish_count > bearish_count:
            return 'BUY'
        if bearish_count > bullish_count:
            return 'SELL'
        return 'HOLD'

    def perform_market_clustering(self, n_clusters: int = 5) -> Dict:
        """K-means cluster coins on 7-day average change/volume/market cap."""
        # Bug fix: this module used sqlite3 and logging without importing
        # them anywhere (NameError at runtime); import locally so the
        # module stays self-contained.
        import sqlite3
        import logging
        query = '''
            SELECT coin_id, AVG(price_change_percentage_24h) as avg_change,
                   AVG(total_volume) as avg_volume,
                   AVG(market_cap) as avg_market_cap
            FROM market_data
            WHERE timestamp > datetime('now', '-7 days')
            GROUP BY coin_id
            HAVING COUNT(*) >= 5
        '''
        try:
            with sqlite3.connect(self.db_manager.db_path) as conn:
                df = pd.read_sql_query(query, conn)
            if df.empty:
                return {}
            # Standardize the features so K-means is not dominated by
            # market cap's much larger scale.
            features = ['avg_change', 'avg_volume', 'avg_market_cap']
            X = df[features].fillna(0)
            X_scaled = self.scaler.fit_transform(X)
            kmeans = KMeans(n_clusters=n_clusters, random_state=42)
            df['cluster'] = kmeans.fit_predict(X_scaled)
            # Summarize each cluster.
            cluster_analysis = {}
            for cluster_id in range(n_clusters):
                cluster_coins = df[df['cluster'] == cluster_id]
                cluster_analysis[f'cluster_{cluster_id}'] = {
                    'coin_count': len(cluster_coins),
                    'avg_price_change': cluster_coins['avg_change'].mean(),
                    'avg_volume': cluster_coins['avg_volume'].mean(),
                    'avg_market_cap': cluster_coins['avg_market_cap'].mean(),
                    'top_coins': cluster_coins.nlargest(5, 'avg_market_cap')['coin_id'].tolist()
                }
            return cluster_analysis
        except Exception as e:
            logging.error(f"Error in market clustering: {e}")
            return {}

    def calculate_portfolio_metrics(self, holdings: Dict[str, float]) -> Dict:
        """Value a {coin_id: quantity} portfolio over the last 30 days."""
        portfolio_data = []
        for coin_id, quantity in holdings.items():
            df = self.db_manager.get_historical_data(coin_id, days=30)
            if df.empty:
                continue
            # get_historical_data returns rows newest-first (ORDER BY DESC).
            current_price = df['current_price'].iloc[0]
            initial_price = df['current_price'].iloc[-1]
            portfolio_data.append({
                'coin_id': coin_id,
                'quantity': quantity,
                'current_value': current_price * quantity,
                'initial_value': initial_price * quantity,
                'return_pct': ((current_price - initial_price) / initial_price) * 100
            })
        if not portfolio_data:
            return {}
        df = pd.DataFrame(portfolio_data)
        total_current_value = df['current_value'].sum()
        total_initial_value = df['initial_value'].sum()
        portfolio_return = ((total_current_value - total_initial_value) / total_initial_value) * 100
        weights = df['current_value'] / total_current_value
        return {
            'total_value': total_current_value,
            'total_return_pct': portfolio_return,
            'best_performer': df.loc[df['return_pct'].idxmax(), 'coin_id'],
            'worst_performer': df.loc[df['return_pct'].idxmin(), 'coin_id'],
            'avg_return': df['return_pct'].mean(),
            'volatility': df['return_pct'].std(),
            'sharpe_ratio': df['return_pct'].mean() / df['return_pct'].std() if df['return_pct'].std() > 0 else 0,
            # 1 - Herfindahl index: higher = more evenly spread holdings.
            'diversification_score': 1 - (weights ** 2).sum(),
            'holdings_breakdown': df[['coin_id', 'quantity', 'current_value', 'return_pct']].to_dict('records')
        }
Real-time WebSocket Integration
# websocket_handler.py
import asyncio
import websockets
import json
import logging
from typing import Dict, Set
from datetime import datetime
class WebSocketHandler:
    """Serves WebSocket clients: topic subscriptions and market-data pushes.

    Clients connect, optionally subscribe to topics (e.g. 'market_updates')
    and receive JSON messages. The pipeline object supplies market snapshots
    via pipeline.collector.collect_top_coins().
    """

    def __init__(self, pipeline):
        self.pipeline = pipeline
        # All currently connected clients.
        self.connected_clients: Set[websockets.WebSocketServerProtocol] = set()
        # topic -> set of clients subscribed to that topic.
        self.subscriptions: Dict[str, Set[websockets.WebSocketServerProtocol]] = {}

    async def register_client(self, websocket, path):
        """Register a new client and service its messages until disconnect.

        Fix: the previous version only awaited wait_closed(), so incoming
        messages were never dispatched to handle_message(); we now consume
        the connection's message stream.
        """
        self.connected_clients.add(websocket)
        logging.info(f"New client connected: {websocket.remote_address}")
        try:
            async for message in websocket:
                await self.handle_message(websocket, message)
        finally:
            self.connected_clients.remove(websocket)
            # Drop the client from every topic it subscribed to.
            for subscription_set in self.subscriptions.values():
                subscription_set.discard(websocket)

    async def handle_message(self, websocket, message):
        """Parse one incoming message and dispatch to the action handler."""
        try:
            data = json.loads(message)
            action = data.get('action')
            if action == 'subscribe':
                await self.handle_subscription(websocket, data)
            elif action == 'unsubscribe':
                await self.handle_unsubscription(websocket, data)
            elif action == 'get_market_data':
                await self.send_market_data(websocket, data)
            else:
                await self.send_error(websocket, "Unknown action")
        except json.JSONDecodeError:
            await self.send_error(websocket, "Invalid JSON")
        except Exception as e:
            logging.error(f"WebSocket error: {e}")
            await self.send_error(websocket, "Internal server error")

    async def handle_subscription(self, websocket, data):
        """Add the client to the requested topic and confirm."""
        topic = data.get('topic')
        if topic:
            if topic not in self.subscriptions:
                self.subscriptions[topic] = set()
            self.subscriptions[topic].add(websocket)
            await websocket.send(json.dumps({
                'type': 'subscription_confirmed',
                'topic': topic
            }))

    async def handle_unsubscription(self, websocket, data):
        """Remove the client from the requested topic and confirm.

        Fix: this handler was dispatched to by handle_message() but never
        defined, so any 'unsubscribe' request raised AttributeError.
        """
        topic = data.get('topic')
        if topic and topic in self.subscriptions:
            self.subscriptions[topic].discard(websocket)
            await websocket.send(json.dumps({
                'type': 'unsubscription_confirmed',
                'topic': topic
            }))

    async def broadcast_market_update(self, market_data: Dict):
        """Broadcast a market update to all 'market_updates' subscribers,
        pruning clients whose connections have closed."""
        message = json.dumps({
            'type': 'market_update',
            'data': market_data,
            'timestamp': datetime.now().isoformat()
        })
        if 'market_updates' in self.subscriptions:
            disconnected = set()
            for websocket in self.subscriptions['market_updates']:
                try:
                    await websocket.send(message)
                except websockets.exceptions.ConnectionClosed:
                    disconnected.add(websocket)
            # Clean up after iterating — never mutate the set mid-loop.
            for websocket in disconnected:
                self.subscriptions['market_updates'].remove(websocket)

    async def send_market_data(self, websocket, data):
        """Send a snapshot of the top-50 coins to one client."""
        try:
            market_df = self.pipeline.collector.collect_top_coins(50)
            response = {
                'type': 'market_data',
                'data': market_df.to_dict('records') if not market_df.empty else [],
                'timestamp': datetime.now().isoformat()
            }
            await websocket.send(json.dumps(response))
        except Exception as e:
            await self.send_error(websocket, f"Error fetching market data: {e}")

    async def send_error(self, websocket, error_message):
        """Best-effort error notification to one client."""
        error_response = {
            'type': 'error',
            'message': error_message,
            'timestamp': datetime.now().isoformat()
        }
        try:
            await websocket.send(json.dumps(error_response))
        except Exception:
            # Fix: narrowed from a bare `except:` which also swallowed
            # KeyboardInterrupt/SystemExit. Client may already be gone.
            pass
# WebSocket server setup
async def start_websocket_server(pipeline, host='localhost', port=8765):
    """Spin up the WebSocket endpoint for the given pipeline.

    Returns the (server, handler) pair so callers can broadcast updates
    through the handler and shut the server down later.
    """
    handler = WebSocketHandler(pipeline)
    # The bound method already matches the (websocket, path) signature that
    # websockets.serve expects — no wrapper closure needed.
    server = await websockets.serve(handler.register_client, host, port)
    logging.info(f"WebSocket server started on {host}:{port}")
    return server, handler
Configuration Management
# config_manager.py
import os
import yaml
import json
from typing import Dict, Any
from pathlib import Path
class ConfigManager:
    """Loads, defaults, reads and persists the pipeline's YAML configuration.

    Values are addressed with dot notation, e.g. get('api.timeout').
    """

    def __init__(self, config_path: str = "config/pipeline.yaml"):
        self.config_path = Path(config_path)
        self.config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from file, falling back to defaults.

        Fix: yaml.safe_load returns None for an empty or comment-only file,
        which previously left self.config as None and broke get()/update().
        We now treat that case like a missing file and write defaults.
        """
        if self.config_path.exists():
            with open(self.config_path, 'r') as f:
                loaded = yaml.safe_load(f)
            if loaded is not None:
                return loaded
        return self._create_default_config()

    def _create_default_config(self) -> Dict[str, Any]:
        """Build the default configuration, persist it, and return it."""
        default_config = {
            'api': {
                'coingecko_api_key': os.getenv('COINGECKO_API_KEY', ''),
                'base_url': 'https://api.coingecko.com/api/v3',
                'timeout': 30,
                'max_retries': 3,
                'rate_limit_delay': 1.2
            },
            'ollama': {
                'url': os.getenv('OLLAMA_URL', 'http://localhost:11434'),
                'model': 'llama2',
                'temperature': 0.7,
                'max_tokens': 500
            },
            'pipeline': {
                'collection_interval': 15,  # minutes
                'top_coins_limit': 100,
                'historical_days': 30,
                'enable_caching': True,
                'cache_ttl': 900  # seconds
            },
            'database': {
                'path': 'crypto_data.db',
                'backup_interval': 3600,  # seconds
                'max_backup_files': 7
            },
            'monitoring': {
                'enable_health_check': True,
                'health_check_port': 8000,
                'log_level': 'INFO',
                'metrics_retention_days': 30
            },
            'security': {
                'encrypt_api_keys': True,
                'enable_webhook_validation': True,
                'cors_origins': ['http://localhost:3000'],
                'rate_limit_requests_per_minute': 60
            },
            'notifications': {
                'enable_email': False,
                'email_smtp_host': '',
                'email_smtp_port': 587,
                'enable_discord': False,
                'discord_webhook_url': '',
                'enable_slack': False,
                'slack_webhook_url': ''
            }
        }
        # Persist so the next run loads the same values.
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.config_path, 'w') as f:
            yaml.dump(default_config, f, default_flow_style=False)
        return default_config

    def get(self, key: str, default: Any = None) -> Any:
        """Return the value at dot-notation `key`, or `default` if absent."""
        keys = key.split('.')
        value = self.config
        for k in keys:
            if isinstance(value, dict) and k in value:
                value = value[k]
            else:
                return default
        return value

    def update(self, key: str, value: Any):
        """Set the value at dot-notation `key` and persist to disk.

        Fix: intermediate keys holding non-dict leaves are now replaced with
        a dict instead of raising when descending into them.
        """
        keys = key.split('.')
        config = self.config
        for k in keys[:-1]:
            if not isinstance(config.get(k), dict):
                config[k] = {}
            config = config[k]
        config[keys[-1]] = value
        with open(self.config_path, 'w') as f:
            yaml.dump(self.config, f, default_flow_style=False)
Production Deployment Guide
#!/bin/bash
# deploy.sh - Production deployment script for the CoinGecko-Ollama pipeline.
# Checks prerequisites, prepares directories/certs, builds and starts the
# Docker stack, verifies health, and installs the maintenance cron job.
set -e

echo "🚀 Starting CoinGecko-Ollama Pipeline Deployment"

# Check prerequisites
echo "📋 Checking prerequisites..."
command -v docker >/dev/null 2>&1 || { echo "Docker is required but not installed. Aborting." >&2; exit 1; }
command -v docker-compose >/dev/null 2>&1 || { echo "Docker Compose is required but not installed. Aborting." >&2; exit 1; }

# Create necessary directories
echo "📁 Creating directory structure..."
mkdir -p {data,reports,logs,config,backups}

# Set permissions
echo "🔒 Setting permissions..."
chmod 755 data reports logs config backups
# Fix: guard the chmod — with `set -e`, a missing .env previously aborted
# the whole deployment at this line.
if [ -f .env ]; then
    chmod 600 .env
fi

# Generate self-signed SSL certificates on first deploy only
echo "🔐 Generating SSL certificates..."
if [ ! -f "ssl/server.crt" ]; then
    mkdir -p ssl
    openssl req -x509 -newkey rsa:4096 -keyout ssl/server.key -out ssl/server.crt -days 365 -nodes -subj "/CN=localhost"
fi

# Build Docker images
echo "🏗️ Building Docker images..."
docker-compose build --no-cache

# Start services
echo "🚀 Starting services..."
docker-compose up -d

# Wait for services to be ready
echo "⏳ Waiting for services to be ready..."
sleep 30

# Run health checks
echo "🏥 Running health checks..."
curl -f http://localhost:8000/health || { echo "Health check failed!" >&2; exit 1; }

# Setup cron jobs for maintenance.
# Fix: made idempotent — redeploying no longer appends duplicate entries.
echo "⏰ Setting up maintenance cron jobs..."
CRON_LINE="0 2 * * * /usr/bin/docker-compose exec crypto-pipeline python maintenance.py"
if ! crontab -l 2>/dev/null | grep -Fq "$CRON_LINE"; then
    (crontab -l 2>/dev/null; echo "$CRON_LINE") | crontab -
fi

echo "✅ Deployment completed successfully!"
echo "📊 Dashboard available at: http://localhost:5000"
echo "🏥 Health check available at: http://localhost:8000/health"
echo "📝 Logs can be viewed with: docker-compose logs -f"
Maintenance and Monitoring
# maintenance.py
import gzip
import json
import logging
import os
import shutil
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
class MaintenanceManager:
    """Daily housekeeping for the pipeline: database backup, backup/log
    rotation, SQLite optimization, and a JSON maintenance report."""

    def __init__(self, config_manager):
        self.config = config_manager
        # Path to the SQLite database file (from config 'database.path').
        self.db_path = config_manager.get('database.path')
        self.backup_dir = Path('backups')
        self.log_dir = Path('logs')

    def run_daily_maintenance(self):
        """Run all maintenance tasks in order; logs and re-raises on failure."""
        logging.info("Starting daily maintenance tasks")
        try:
            self.backup_database()
            self.cleanup_old_backups()
            self.cleanup_old_logs()
            self.optimize_database()
            self.generate_maintenance_report()
            logging.info("Daily maintenance completed successfully")
        except Exception as e:
            logging.error(f"Maintenance failed: {e}")
            raise

    def backup_database(self):
        """Create a gzip-compressed, timestamped copy of the database.

        No-op when the database file does not exist yet.
        """
        if not os.path.exists(self.db_path):
            return
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = self.backup_dir / f"crypto_data_backup_{timestamp}.db"
        self.backup_dir.mkdir(exist_ok=True)
        shutil.copy2(self.db_path, backup_path)
        # Compress the copy, then drop the uncompressed intermediate.
        with open(backup_path, 'rb') as f_in:
            with gzip.open(f"{backup_path}.gz", 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        backup_path.unlink()
        logging.info(f"Database backup created: {backup_path}.gz")

    def cleanup_old_backups(self):
        """Keep only the N most recent compressed backups (config-driven)."""
        max_backup_files = self.config.get('database.max_backup_files', 7)
        backup_files = sorted(
            self.backup_dir.glob("crypto_data_backup_*.db.gz"),
            key=lambda x: x.stat().st_mtime,
            reverse=True
        )
        # Everything past the newest `max_backup_files` entries is deleted.
        for backup_file in backup_files[max_backup_files:]:
            backup_file.unlink()
            logging.info(f"Removed old backup: {backup_file}")

    def cleanup_old_logs(self):
        """Delete *.log files older than the configured retention window."""
        retention_days = self.config.get('monitoring.metrics_retention_days', 30)
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        for log_file in self.log_dir.glob("*.log"):
            if datetime.fromtimestamp(log_file.stat().st_mtime) < cutoff_date:
                log_file.unlink()
                logging.info(f"Removed old log file: {log_file}")

    def optimize_database(self):
        """VACUUM/ANALYZE the database and purge rows past retention.

        Failures are logged rather than raised so one bad optimization pass
        does not abort the whole maintenance run.
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                # Reclaim free pages and refresh planner statistics.
                cursor.execute("VACUUM")
                cursor.execute("ANALYZE")
                # Purge rows older than the retention window.
                retention_days = self.config.get('monitoring.metrics_retention_days', 30)
                cutoff_date = datetime.now() - timedelta(days=retention_days)
                cursor.execute('''
                    DELETE FROM market_data
                    WHERE timestamp < ?
                ''', (cutoff_date,))
                cursor.execute('''
                    DELETE FROM analysis_results
                    WHERE timestamp < ?
                ''', (cutoff_date,))
                conn.commit()
                logging.info("Database optimization completed")
        except Exception as e:
            logging.error(f"Database optimization failed: {e}")

    def generate_maintenance_report(self):
        """Write a JSON report with database and storage statistics.

        Fix: `json` was used here without being imported anywhere in the
        module (NameError); it is now imported at the top of the file. Also
        ensures the reports/ directory exists before writing.
        """
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT COUNT(*) FROM market_data")
                market_data_count = cursor.fetchone()[0]
                cursor.execute("SELECT COUNT(*) FROM analysis_results")
                analysis_count = cursor.fetchone()[0]
            # Disk usage; data/ and backups/ are created by the deploy script,
            # and rglob on a missing directory simply yields nothing.
            data_size = sum(f.stat().st_size for f in Path('data').rglob('*') if f.is_file())
            backup_size = sum(f.stat().st_size for f in self.backup_dir.rglob('*') if f.is_file())
            report = {
                'timestamp': datetime.now().isoformat(),
                'database_stats': {
                    'market_data_records': market_data_count,
                    'analysis_records': analysis_count,
                    'database_size_mb': os.path.getsize(self.db_path) / (1024 * 1024)
                },
                'storage_stats': {
                    'data_directory_size_mb': data_size / (1024 * 1024),
                    'backup_directory_size_mb': backup_size / (1024 * 1024),
                    'backup_files_count': len(list(self.backup_dir.glob('*.gz')))
                }
            }
            report_dir = Path('reports')
            report_dir.mkdir(parents=True, exist_ok=True)
            report_path = report_dir / f"maintenance_report_{datetime.now().strftime('%Y%m%d')}.json"
            with open(report_path, 'w') as f:
                json.dump(report, f, indent=2)
            logging.info(f"Maintenance report generated: {report_path}")
        except Exception as e:
            logging.error(f"Failed to generate maintenance report: {e}")
if __name__ == "__main__":
    # Entry point for the cron job: run one full daily maintenance cycle.
    from config_manager import ConfigManager

    MaintenanceManager(ConfigManager()).run_daily_maintenance()
Next Steps and Scaling Considerations
Your CoinGecko-Ollama cryptocurrency pipeline is now production-ready with enterprise-grade features. Here's how to scale and enhance the system further:
Horizontal Scaling Options
- Microservices Architecture: Split components into separate services
- Load Balancing: Distribute API requests across multiple instances
- Database Sharding: Partition data across multiple databases
- Container Orchestration: Use Kubernetes for auto-scaling
Performance Optimizations
- Redis Clustering: Scale caching layer horizontally
- Message Queues: Implement RabbitMQ or Apache Kafka for async processing
- CDN Integration: Cache static assets and API responses
- Database Indexing: Optimize queries with proper indexing strategies
Enterprise Features
- Multi-tenant Support: Isolate data for different organizations
- Advanced Security: Implement OAuth2, JWT tokens, and audit logging
- Compliance: Add GDPR, SOC2, and financial regulations compliance
- Business Intelligence: Integrate with Tableau, Power BI, or custom dashboards
Ready to dominate the crypto market with intelligent data analysis? Your pipeline foundation scales from hobby projects to institutional trading platforms.