Picture this: USDC trades at $1.002 on Binance while sitting at $0.998 on Coinbase. That's a $4 gross profit per 1,000 USDC traded, before fees. Now imagine capturing these opportunities 24/7 with an AI-powered bot that never sleeps, never gets tired, and never misses a profitable trade.
Stablecoin arbitrage presents one of crypto's most consistent profit opportunities. Unlike volatile altcoins, stablecoins maintain predictable price ranges while still experiencing micro-fluctuations across exchanges. This guide shows you how to build an intelligent arbitrage bot using Ollama's local AI capabilities to identify and execute profitable trades across multiple exchanges.
What Is Stablecoin Arbitrage and Why It Works
Stablecoin arbitrage exploits price differences of the same asset across different exchanges. While stablecoins like USDT, USDC, and BUSD target $1.00, in normal conditions they fluctuate between roughly $0.998 and $1.002 based on supply, demand, and exchange-specific factors.
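To make that arithmetic concrete, here is a minimal sketch of the net result once trading fees are included. The function name `net_profit` and the flat 0.1% fee on each leg are assumptions for illustration; real fee schedules vary by exchange and account tier.

```python
def net_profit(buy_ask: float, sell_bid: float, amount: float,
               fee_rate: float = 0.001) -> float:
    """Net profit in quote currency for buying `amount` units at buy_ask
    on one exchange and selling them at sell_bid on another.

    fee_rate is an assumed flat fee charged on the notional of each leg.
    """
    buy_cost = buy_ask * amount
    sell_revenue = sell_bid * amount
    fees = (buy_cost + sell_revenue) * fee_rate
    return sell_revenue - buy_cost - fees


if __name__ == "__main__":
    # 1,000 units bought at $0.998 and sold at $1.002
    print(f"{net_profit(0.998, 1.002, 1000):.2f}")
    # A 0.1% spread is not enough to cover two 0.1% fees
    print(f"{net_profit(0.9995, 1.0005, 1000):.2f}")
```

Under these assumed fees, half of the $4 gross spread in the first case goes to fees, and the second case loses money, which is why the spread has to be compared against total round-trip costs rather than against zero.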
Why Stablecoin Arbitrage Opportunities Exist
Price discrepancies occur due to:
- Exchange liquidity differences: Lower liquidity creates higher spreads
- Regional demand variations: Geographic factors affect local pricing
- Trading volume imbalances: High buy/sell pressure on specific platforms
- Withdrawal/deposit delays: Transfer times create temporary price gaps
- Market maker inefficiencies: Automated systems don't always sync perfectly
Advantages Over Traditional Crypto Arbitrage
Stablecoin arbitrage offers several benefits:
- Lower volatility risk: Minimal price movement during transfer times
- Predictable profit margins: Spreads typically fall in the 0.1% to 0.5% range before fees
- Reduced slippage: Stable prices mean better execution
- Higher trade frequency: Spreads are smaller, so returns come from many small trades rather than a few large ones
Setting Up Your Development Environment
Prerequisites
Before building your stablecoin arbitrage bot, ensure you have:
- Python 3.8 or higher installed
- API keys from supported exchanges (Binance, Coinbase Pro, Kraken)
- Ollama installed locally
- Basic understanding of cryptocurrency trading concepts
Installing Required Dependencies
# Install core dependencies (asyncio and logging ship with Python and must not be pip-installed)
pip install ccxt pandas numpy requests websockets aiohttp
# Install Ollama Python client
pip install ollama
# Install additional utilities
pip install python-dotenv schedule
Environment Configuration
Create a .env file for sensitive credentials:
# Exchange API credentials
BINANCE_API_KEY=your_binance_api_key
BINANCE_SECRET_KEY=your_binance_secret_key
COINBASE_API_KEY=your_coinbase_api_key
COINBASE_SECRET_KEY=your_coinbase_secret_key
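COINBASE_PASSPHRASE=your_coinbase_passphrase
KRAKEN_API_KEY=your_kraken_api_key
KRAKEN_SECRET_KEY=your_kraken_secret_key
# Trading parameters
# Minimum net profit per trade in USD (the bot compares this against net_profit in dollars)
MIN_PROFIT_THRESHOLD=1.0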
MAX_POSITION_SIZE=1000
RISK_PERCENTAGE=0.02
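One way to read these trading parameters back in is sketched below. It assumes the variables are already present in the process environment (for example after calling `load_dotenv()` from python-dotenv); the `TradingParams` class and its validation rules are hypothetical helpers, not part of the bot shown later.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class TradingParams:
    min_profit_threshold: float
    max_position_size: float
    risk_percentage: float

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "TradingParams":
        """Parse the three trading parameters, failing early on bad values."""
        params = cls(
            min_profit_threshold=float(env["MIN_PROFIT_THRESHOLD"]),
            max_position_size=float(env["MAX_POSITION_SIZE"]),
            risk_percentage=float(env["RISK_PERCENTAGE"]),
        )
        # Assumed sanity checks; adjust to your own risk rules
        if params.max_position_size <= 0:
            raise ValueError("MAX_POSITION_SIZE must be positive")
        if not 0 < params.risk_percentage <= 1:
            raise ValueError("RISK_PERCENTAGE must be a fraction between 0 and 1")
        return params
```

Parsing once at startup means a typo in the .env file surfaces as an immediate error instead of as a silently wrong trade size.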
Understanding Ollama for Trading Intelligence
Ollama provides local AI model execution without relying on external APIs. For trading applications, this offers several advantages:
Benefits of Local AI Processing
- Privacy: Trading strategies and data remain on your machine
- Speed: No network round trip for AI decision-making, though inference time still depends on model size and your hardware
- Cost: No per-request charges for AI processing
- Reliability: No dependency on external AI services
Selecting the Right Ollama Model
Choose models based on your system capabilities:
# Recommended models for trading analysis
LIGHTWEIGHT_MODEL = "llama2:7b" # Fast decisions, basic analysis
BALANCED_MODEL = "llama2:13b" # Good balance of speed and accuracy
ADVANCED_MODEL = "llama2:70b" # Complex analysis, slower processing
Building the Core Arbitrage Detection System
Exchange Price Monitoring
Create a robust price monitoring system that tracks stablecoin prices across multiple exchanges:
import asyncio
import time
from dataclasses import dataclass
from typing import Dict, List, Optional

# Use ccxt's async API: the fetch calls below are awaited, and the
# synchronous ccxt classes do not return awaitables
import ccxt.async_support as ccxt


@dataclass
class PriceData:
    exchange: str
    symbol: str
    bid: float
    ask: float
    timestamp: float  # milliseconds since the epoch, as reported by ccxt
    volume: float


class ExchangeMonitor:
    def __init__(self, exchanges: Dict[str, ccxt.Exchange]):
        self.exchanges = exchanges
        self.price_data = {}
        self.running = False

    async def fetch_ticker(self, exchange_name: str, symbol: str) -> Optional[PriceData]:
        """Fetch current ticker data from exchange"""
        try:
            exchange = self.exchanges[exchange_name]
            ticker = await exchange.fetch_ticker(symbol)
            return PriceData(
                exchange=exchange_name,
                symbol=symbol,
                bid=ticker['bid'],
                ask=ticker['ask'],
                timestamp=ticker['timestamp'],
                volume=ticker['baseVolume']
            )
        except Exception as e:
            print(f"Error fetching {symbol} from {exchange_name}: {e}")
            return None

    async def monitor_prices(self, symbols: List[str]):
        """Continuously monitor prices across all exchanges"""
        while self.running:
            tasks = []
            for exchange_name in self.exchanges:
                for symbol in symbols:
                    task = self.fetch_ticker(exchange_name, symbol)
                    tasks.append(task)
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # Process results and update price data
            for result in results:
                if isinstance(result, PriceData):
                    key = f"{result.exchange}_{result.symbol}"
                    self.price_data[key] = result
            await asyncio.sleep(5)  # Update every 5 seconds
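Because the monitor stores the last quote it saw for each exchange, a failed fetch can leave an old price in `price_data`. The sketch below shows one way to reject stale quotes before comparing them. The helper name `is_fresh` and the 10-second default are assumptions; the only thing taken from ccxt is that ticker timestamps are expressed in milliseconds.

```python
import time
from typing import Optional


def is_fresh(timestamp_ms: Optional[float], max_age_s: float = 10.0,
             now_s: Optional[float] = None) -> bool:
    """Return True if a millisecond timestamp is at most max_age_s old."""
    if timestamp_ms is None:  # some exchanges omit the ticker timestamp
        return False
    if now_s is None:
        now_s = time.time()
    # Convert milliseconds to seconds before comparing with time.time()
    age_s = now_s - timestamp_ms / 1000.0
    # Timestamps slightly in the future (clock skew) count as fresh
    return age_s <= max_age_s
```

A detector could call `is_fresh(data.timestamp)` on both legs and skip the pair if either quote is too old.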
Arbitrage Opportunity Detection
Implement logic to identify profitable arbitrage opportunities:
class ArbitrageDetector:
def __init__(self, min_profit_threshold: float = 0.001):
self.min_profit_threshold = min_profit_threshold
self.opportunities = []
def calculate_arbitrage_profit(self, buy_price: PriceData, sell_price: PriceData,
amount: float) -> Dict:
"""Calculate potential profit from arbitrage opportunity"""
# Buy from lower price exchange (use ask price)
buy_cost = buy_price.ask * amount
# Sell on higher price exchange (use bid price)
sell_revenue = sell_price.bid * amount
# Calculate gross profit
gross_profit = sell_revenue - buy_cost
# Estimate trading fees (0.1% per trade is typical)
trading_fees = (buy_cost + sell_revenue) * 0.001
# Calculate net profit
net_profit = gross_profit - trading_fees
profit_percentage = (net_profit / buy_cost) * 100
return {
'buy_exchange': buy_price.exchange,
'sell_exchange': sell_price.exchange,
'symbol': buy_price.symbol,
'buy_price': buy_price.ask,
'sell_price': sell_price.bid,
'amount': amount,
'gross_profit': gross_profit,
'trading_fees': trading_fees,
'net_profit': net_profit,
'profit_percentage': profit_percentage,
'timestamp': time.time()
}
def find_opportunities(self, price_data: Dict[str, PriceData],
symbol: str) -> List[Dict]:
"""Find arbitrage opportunities for a specific symbol"""
opportunities = []
exchange_prices = {}
# Group prices by exchange
for key, data in price_data.items():
if data.symbol == symbol:
exchange_prices[data.exchange] = data
# Compare all exchange pairs
exchanges = list(exchange_prices.keys())
for i in range(len(exchanges)):
for j in range(i + 1, len(exchanges)):
exchange1 = exchanges[i]
exchange2 = exchanges[j]
price1 = exchange_prices[exchange1]
price2 = exchange_prices[exchange2]
# Check if price1 is lower (buy from exchange1, sell on exchange2)
if price1.ask < price2.bid:
profit_calc = self.calculate_arbitrage_profit(
price1, price2, 1000 # Test with $1000
)
if profit_calc['net_profit'] > self.min_profit_threshold:
opportunities.append(profit_calc)
# Check reverse direction
if price2.ask < price1.bid:
profit_calc = self.calculate_arbitrage_profit(
price2, price1, 1000
)
if profit_calc['net_profit'] > self.min_profit_threshold:
opportunities.append(profit_calc)
return opportunities
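The pairwise loop above checks every exchange pair in both directions. If only the single widest spread matters, it can be found in one pass by taking the lowest ask and the highest bid. This is a simplified sketch, not a replacement for the detector: the function name and the `(bid, ask)` tuple layout are assumptions, and it presumes each exchange's own bid is not above its own ask.

```python
from typing import Dict, Optional, Tuple

Quote = Tuple[float, float]  # (bid, ask)


def best_cross_exchange_spread(
    quotes: Dict[str, Quote]
) -> Optional[Tuple[str, str, float]]:
    """Return (buy_exchange, sell_exchange, spread) for the widest positive
    spread, or None if no bid on one exchange exceeds the ask on another."""
    if len(quotes) < 2:
        return None
    buy_exchange = min(quotes, key=lambda name: quotes[name][1])   # lowest ask
    sell_exchange = max(quotes, key=lambda name: quotes[name][0])  # highest bid
    spread = quotes[sell_exchange][0] - quotes[buy_exchange][1]
    if buy_exchange == sell_exchange or spread <= 0:
        return None
    return buy_exchange, sell_exchange, spread


if __name__ == "__main__":
    sample = {
        "binance": (0.9990, 0.9992),
        "coinbase": (1.0010, 1.0012),
        "kraken": (0.9998, 1.0000),
    }
    print(best_cross_exchange_spread(sample))
```

The spread returned here is gross; fees still have to be subtracted before deciding whether the trade is worth taking.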
Integrating Ollama for Smart Decision Making
Setting Up Ollama Integration
Connect your arbitrage bot to Ollama for intelligent trade analysis:
import ollama
import json
from typing import Dict, List


class OllamaTradeAnalyzer:
    def __init__(self, model_name: str = "llama2:7b"):
        self.model_name = model_name
        self.client = ollama.Client()

    def analyze_opportunity(self, opportunity: Dict) -> Dict:
        """Analyze arbitrage opportunity using Ollama"""
        prompt = f"""
        Analyze this stablecoin arbitrage opportunity:
        Symbol: {opportunity['symbol']}
        Buy Exchange: {opportunity['buy_exchange']} at ${opportunity['buy_price']:.4f}
        Sell Exchange: {opportunity['sell_exchange']} at ${opportunity['sell_price']:.4f}
        Net Profit: ${opportunity['net_profit']:.2f}
        Profit Percentage: {opportunity['profit_percentage']:.3f}%
        Consider these factors:
        1. Market conditions and volatility
        2. Exchange reliability and liquidity
        3. Transfer time risks
        4. Historical performance of similar opportunities
        Provide a recommendation (BUY, HOLD, AVOID) with reasoning.
        Format response as JSON with 'recommendation' and 'reasoning' fields.
        """
        try:
            response = self.client.chat(model=self.model_name, messages=[
                {'role': 'user', 'content': prompt}
            ])
            # Parse AI response
            content = response['message']['content']
            # Extract JSON from response
            start_idx = content.find('{')
            end_idx = content.rfind('}') + 1
            # rfind returns -1 when no '}' exists, which makes end_idx 0,
            # so compare against start_idx instead of -1
            if start_idx != -1 and end_idx > start_idx:
                json_str = content[start_idx:end_idx]
                analysis = json.loads(json_str)
                # Fill in missing keys so callers can index the result safely
                analysis.setdefault('recommendation', 'HOLD')
                analysis.setdefault('reasoning', '')
                return analysis
            else:
                return {
                    'recommendation': 'HOLD',
                    'reasoning': 'Unable to parse AI response'
                }
        except Exception as e:
            print(f"Error analyzing opportunity: {e}")
            return {
                'recommendation': 'AVOID',
                'reasoning': f'Analysis error: {str(e)}'
            }

    def assess_market_conditions(self, price_history: List[Dict]) -> str:
        """Assess overall market conditions using historical data"""
        prompt = f"""
        Analyze these recent stablecoin price movements:
        {json.dumps(price_history[-10:], indent=2)}
        Determine:
        1. Market stability level (HIGH, MEDIUM, LOW)
        2. Arbitrage opportunity frequency
        3. Risk assessment for automated trading
        Provide brief analysis focusing on trading safety.
        """
        try:
            response = self.client.chat(model=self.model_name, messages=[
                {'role': 'user', 'content': prompt}
            ])
            return response['message']['content']
        except Exception as e:
            return f"Market analysis unavailable: {e}"
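Language models do not always follow formatting instructions, so the reply is worth validating before it influences a trade. The following is a minimal sketch of a stricter parser that works on plain strings; the function name `parse_recommendation` and the choice to fall back to HOLD on anything unexpected are assumptions, not part of the Ollama client.

```python
import json
from typing import Dict

ALLOWED = {"BUY", "HOLD", "AVOID"}


def parse_recommendation(text: str) -> Dict[str, str]:
    """Extract a {'recommendation', 'reasoning'} dict from free-form model
    output, falling back to HOLD whenever the reply cannot be trusted."""
    fallback = {"recommendation": "HOLD",
                "reasoning": "Unable to parse AI response"}
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        return fallback
    try:
        data = json.loads(text[start:end + 1])
    except json.JSONDecodeError:
        return fallback
    if not isinstance(data, dict):
        return fallback
    # Normalize case so "buy" and "Buy" are accepted
    recommendation = str(data.get("recommendation", "")).strip().upper()
    if recommendation not in ALLOWED:
        return fallback
    return {"recommendation": recommendation,
            "reasoning": str(data.get("reasoning", ""))}
```

Normalizing and whitelisting the recommendation means a reply such as "SELL" or "maybe" can never be mistaken for an approval.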
Risk Management with AI
Implement AI-powered risk management:
class RiskManager:
    def __init__(self, ollama_analyzer: OllamaTradeAnalyzer):
        self.analyzer = ollama_analyzer
        self.position_limits = {
            'max_position_size': 5000,
            'max_daily_trades': 50,
            'max_exposure_per_exchange': 10000
        }
        self.active_positions = {}
        self.daily_trades = 0

    def evaluate_trade_risk(self, opportunity: Dict) -> Dict:
        """Evaluate risk factors for a specific trade"""
        # Get AI analysis
        ai_analysis = self.analyzer.analyze_opportunity(opportunity)
        # Check position limits
        risk_factors = {
            'ai_recommendation': ai_analysis['recommendation'],
            'ai_reasoning': ai_analysis['reasoning'],
            'within_position_limits': self._check_position_limits(opportunity),
            'exchange_exposure_ok': self._check_exchange_exposure(opportunity),
            'daily_trade_limit_ok': self.daily_trades < self.position_limits['max_daily_trades']
        }
        # Calculate overall risk score
        risk_score = self._calculate_risk_score(risk_factors)
        return {
            'risk_factors': risk_factors,
            'risk_score': risk_score,
            'trade_approved': risk_score < 0.3 and ai_analysis['recommendation'] == 'BUY'
        }

    def _check_position_limits(self, opportunity: Dict) -> bool:
        """Check if trade is within position size limits"""
        return opportunity['amount'] <= self.position_limits['max_position_size']

    def _check_exchange_exposure(self, opportunity: Dict) -> bool:
        """Check exchange exposure limits"""
        buy_exchange = opportunity['buy_exchange']
        sell_exchange = opportunity['sell_exchange']
        buy_exposure = self.active_positions.get(buy_exchange, 0)
        sell_exposure = self.active_positions.get(sell_exchange, 0)
        limit = self.position_limits['max_exposure_per_exchange']
        # Both legs add exposure, so check each exchange against the limit
        return (buy_exposure + opportunity['amount'] <= limit and
                sell_exposure + opportunity['amount'] <= limit)

    def _calculate_risk_score(self, risk_factors: Dict) -> float:
        """Calculate numerical risk score (0-1, lower is better)"""
        score = 0.0
        # AI recommendation weight
        if risk_factors['ai_recommendation'] == 'AVOID':
            score += 0.5
        elif risk_factors['ai_recommendation'] == 'HOLD':
            score += 0.2
        # Position limit violations
        if not risk_factors['within_position_limits']:
            score += 0.3
        if not risk_factors['exchange_exposure_ok']:
            score += 0.2
        if not risk_factors['daily_trade_limit_ok']:
            score += 0.1
        return min(score, 1.0)
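The risk manager reads `daily_trades`, but nothing in the code shown increments or resets it, so the daily limit never takes effect. One possible way to track it is sketched below. `DailyTradeCounter` is a hypothetical helper, and the optional `today` argument exists only so the behavior can be exercised without waiting for midnight.

```python
from datetime import date
from typing import Optional


class DailyTradeCounter:
    """Counts trades per calendar day and resets when the date changes."""

    def __init__(self, max_daily_trades: int):
        self.max_daily_trades = max_daily_trades
        self._day: Optional[date] = None
        self._count = 0

    def _roll(self, today: date) -> None:
        # Start a fresh count the first time a new date is seen
        if today != self._day:
            self._day = today
            self._count = 0

    def can_trade(self, today: Optional[date] = None) -> bool:
        self._roll(today or date.today())
        return self._count < self.max_daily_trades

    def record_trade(self, today: Optional[date] = None) -> None:
        self._roll(today or date.today())
        self._count += 1
```

In this arrangement the risk check would call `can_trade()` and the executor would call `record_trade()` once orders are actually placed.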
Implementing Multi-Exchange Trading Logic
Exchange API Integration
Create a unified interface for multiple exchanges:
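import os
from dotenv import load_dotenv
# Load the credentials from the .env file created earlier
load_dotenv()
class ExchangeManager: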
def __init__(self):
self.exchanges = {}
self.initialize_exchanges()
def initialize_exchanges(self):
"""Initialize exchange connections with API credentials"""
# Binance
self.exchanges['binance'] = ccxt.binance({
'apiKey': os.getenv('BINANCE_API_KEY'),
'secret': os.getenv('BINANCE_SECRET_KEY'),
'sandbox': False, # Set to True for testing
'enableRateLimit': True,
})
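# Coinbase Pro (Coinbase has retired the Pro platform, and recent ccxt releases may no longer include the coinbasepro class; check your installed version)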
self.exchanges['coinbase'] = ccxt.coinbasepro({
'apiKey': os.getenv('COINBASE_API_KEY'),
'secret': os.getenv('COINBASE_SECRET_KEY'),
'password': os.getenv('COINBASE_PASSPHRASE'),
'sandbox': False,
'enableRateLimit': True,
})
# Kraken
self.exchanges['kraken'] = ccxt.kraken({
'apiKey': os.getenv('KRAKEN_API_KEY'),
'secret': os.getenv('KRAKEN_SECRET_KEY'),
'enableRateLimit': True,
})
async def execute_buy_order(self, exchange_name: str, symbol: str,
amount: float, price: float) -> Dict:
"""Execute buy order on specified exchange"""
try:
exchange = self.exchanges[exchange_name]
# Place limit buy order
order = await exchange.create_limit_buy_order(
symbol, amount, price
)
return {
'success': True,
'order_id': order['id'],
'exchange': exchange_name,
'symbol': symbol,
'side': 'buy',
'amount': amount,
'price': price,
'timestamp': time.time()
}
except Exception as e:
return {
'success': False,
'error': str(e),
'exchange': exchange_name,
'symbol': symbol,
'timestamp': time.time()
}
async def execute_sell_order(self, exchange_name: str, symbol: str,
amount: float, price: float) -> Dict:
"""Execute sell order on specified exchange"""
try:
exchange = self.exchanges[exchange_name]
# Place limit sell order
order = await exchange.create_limit_sell_order(
symbol, amount, price
)
return {
'success': True,
'order_id': order['id'],
'exchange': exchange_name,
'symbol': symbol,
'side': 'sell',
'amount': amount,
'price': price,
'timestamp': time.time()
}
except Exception as e:
return {
'success': False,
'error': str(e),
'exchange': exchange_name,
'symbol': symbol,
'timestamp': time.time()
}
async def check_order_status(self, exchange_name: str, order_id: str) -> Dict:
"""Check status of existing order"""
try:
exchange = self.exchanges[exchange_name]
# Note: some exchanges, Binance among them, also require the symbol: fetch_order(order_id, symbol)
order = await exchange.fetch_order(order_id)
return {
'success': True,
'order_id': order_id,
'status': order['status'],
'filled': order['filled'],
'remaining': order['remaining'],
'timestamp': time.time()
}
except Exception as e:
return {
'success': False,
'error': str(e),
'order_id': order_id,
'timestamp': time.time()
}
Trade Execution Engine
Build the core trading engine that executes arbitrage opportunities:
class ArbitrageExecutor:
    def __init__(self, exchange_manager: ExchangeManager,
                 risk_manager: RiskManager):
        self.exchange_manager = exchange_manager
        self.risk_manager = risk_manager
        self.active_trades = {}
        self.trade_history = []

    async def execute_arbitrage(self, opportunity: Dict) -> Dict:
        """Execute arbitrage trade based on opportunity"""
        # Risk assessment
        risk_assessment = self.risk_manager.evaluate_trade_risk(opportunity)
        if not risk_assessment['trade_approved']:
            return {
                'success': False,
                'reason': 'Trade rejected by risk management',
                'risk_factors': risk_assessment['risk_factors']
            }
        buy_exchange = opportunity['buy_exchange']
        sell_exchange = opportunity['sell_exchange']
        symbol = opportunity['symbol']
        amount = opportunity['amount']
        # Create trade ID
        trade_id = f"{int(time.time())}_{symbol}_{buy_exchange}_{sell_exchange}"
        try:
            # Submit both legs concurrently so neither order waits on the other
            buy_result, sell_result = await asyncio.gather(
                self.exchange_manager.execute_buy_order(
                    buy_exchange, symbol, amount, opportunity['buy_price']
                ),
                self.exchange_manager.execute_sell_order(
                    sell_exchange, symbol, amount, opportunity['sell_price']
                ),
            )
            both_placed = buy_result['success'] and sell_result['success']
            # Track trade
            trade_record = {
                'trade_id': trade_id,
                'buy_order': buy_result,
                'sell_order': sell_result,
                'opportunity': opportunity,
                'timestamp': time.time(),
                'status': 'pending' if both_placed else 'incomplete'
            }
            # Keep watching a one-legged trade too: it leaves an open position
            if buy_result['success'] or sell_result['success']:
                self.active_trades[trade_id] = trade_record
            return {
                'success': both_placed,
                'trade_id': trade_id,
                'buy_order': buy_result,
                'sell_order': sell_result,
                'reason': None if both_placed else 'One or both orders were not placed'
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'trade_id': trade_id
            }

    async def monitor_active_trades(self):
        """Monitor and update status of active trades"""
        # Iterate over a copy: _update_trade_status removes completed trades
        for trade_id, trade in list(self.active_trades.items()):
            try:
                # Check buy order status
                if trade['buy_order']['success']:
                    buy_status = await self.exchange_manager.check_order_status(
                        trade['buy_order']['exchange'],
                        trade['buy_order']['order_id']
                    )
                    trade['buy_status'] = buy_status
                # Check sell order status
                if trade['sell_order']['success']:
                    sell_status = await self.exchange_manager.check_order_status(
                        trade['sell_order']['exchange'],
                        trade['sell_order']['order_id']
                    )
                    trade['sell_status'] = sell_status
                # Update trade status
                self._update_trade_status(trade)
            except Exception as e:
                print(f"Error monitoring trade {trade_id}: {e}")

    def _update_trade_status(self, trade: Dict):
        """Update trade status based on order execution"""
        buy_filled = (trade.get('buy_status', {}).get('status') == 'closed')
        sell_filled = (trade.get('sell_status', {}).get('status') == 'closed')
        if buy_filled and sell_filled:
            trade['status'] = 'completed'
            trade['completion_time'] = time.time()
            # Calculate actual profit
            self._calculate_actual_profit(trade)
            # Move to history
            self.trade_history.append(trade)
            del self.active_trades[trade['trade_id']]

    def _calculate_actual_profit(self, trade: Dict):
        """Calculate actual profit from completed trade"""
        buy_filled = trade['buy_status']['filled']
        sell_filled = trade['sell_status']['filled']
        # Calculate actual costs and revenues
        actual_buy_cost = buy_filled * trade['buy_order']['price']
        actual_sell_revenue = sell_filled * trade['sell_order']['price']
        # Estimate actual fees (would need to fetch from exchange)
        estimated_fees = (actual_buy_cost + actual_sell_revenue) * 0.001
        trade['actual_profit'] = actual_sell_revenue - actual_buy_cost - estimated_fees
        trade['actual_profit_percentage'] = (trade['actual_profit'] / actual_buy_cost) * 100
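The profit calculation above multiplies filled amounts by the requested limit prices and estimates fees at a flat rate. ccxt's unified order structure also carries a `cost` field (the filled notional) and a `fee` dictionary with `cost` and `currency`, so realized profit can be computed from what the exchange reports. The sketch below works on plain dictionaries shaped like those orders. The function name is hypothetical, and it deliberately returns `None` when a fee is missing or charged in another currency, which does happen on some exchanges.

```python
from typing import Any, Dict, Optional


def realized_profit(buy_order: Dict[str, Any], sell_order: Dict[str, Any],
                    quote_currency: str = "USD") -> Optional[float]:
    """Profit in quote currency from two filled orders, or None if the
    exchange did not report enough data to compute it reliably."""

    def fee_in_quote(order: Dict[str, Any]) -> Optional[float]:
        fee = order.get("fee")
        if not fee or fee.get("cost") is None:
            return None
        if fee.get("currency") != quote_currency:
            return None  # converting other fee currencies is out of scope here
        return float(fee["cost"])

    buy_cost, sell_cost = buy_order.get("cost"), sell_order.get("cost")
    buy_fee, sell_fee = fee_in_quote(buy_order), fee_in_quote(sell_order)
    if buy_cost is None or sell_cost is None or buy_fee is None or sell_fee is None:
        return None
    return float(sell_cost) - float(buy_cost) - buy_fee - sell_fee
```

Returning `None` instead of guessing lets the caller fall back to the flat-rate estimate and flag the trade for manual review.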
Testing and Optimization Strategies
Backtesting Framework
Create a backtesting system to validate your arbitrage strategy:
class ArbitrageBacktester:
def __init__(self, initial_capital: float = 10000):
self.initial_capital = initial_capital
self.current_capital = initial_capital
self.trades = []
self.performance_metrics = {}
def simulate_historical_trading(self, historical_data: List[Dict],
detector: ArbitrageDetector) -> Dict:
"""Simulate trading on historical price data"""
total_trades = 0
profitable_trades = 0
total_profit = 0
for data_point in historical_data:
# Find opportunities in historical data
opportunities = detector.find_opportunities(
data_point['price_data'],
'USDT/USD'
)
# Simulate trades
for opportunity in opportunities:
if self._should_execute_trade(opportunity):
trade_result = self._simulate_trade(opportunity)
self.trades.append(trade_result)
total_trades += 1
if trade_result['profit'] > 0:
profitable_trades += 1
total_profit += trade_result['profit']
self.current_capital += trade_result['profit']
# Calculate performance metrics
self.performance_metrics = {
'total_trades': total_trades,
'profitable_trades': profitable_trades,
'win_rate': profitable_trades / total_trades if total_trades > 0 else 0,
'total_profit': total_profit,
'total_return': (self.current_capital - self.initial_capital) / self.initial_capital,
'average_profit_per_trade': total_profit / total_trades if total_trades > 0 else 0
}
return self.performance_metrics
def _should_execute_trade(self, opportunity: Dict) -> bool:
"""Determine if trade should be executed in backtest"""
# Simple criteria for backtesting
return (opportunity['profit_percentage'] > 0.1 and
opportunity['net_profit'] > 5)
def _simulate_trade(self, opportunity: Dict) -> Dict:
"""Simulate trade execution with realistic assumptions"""
# Add slippage and execution delays
slippage = 0.0001 # 0.01% slippage
execution_delay_impact = 0.00005 # Price movement during execution
adjusted_profit = (opportunity['net_profit'] -
(opportunity['amount'] * slippage) -
(opportunity['amount'] * execution_delay_impact))
return {
'timestamp': opportunity['timestamp'],
'symbol': opportunity['symbol'],
'buy_exchange': opportunity['buy_exchange'],
'sell_exchange': opportunity['sell_exchange'],
'amount': opportunity['amount'],
'expected_profit': opportunity['net_profit'],
'profit': adjusted_profit,
'success': adjusted_profit > 0
}
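The backtester reports win rate and total return but not how deep the losses ran along the way. A common companion metric is maximum drawdown, sketched here as a standalone function. The name `max_drawdown` and the idea of feeding it the running capital after each simulated trade are assumptions about how you might wire it in.

```python
from typing import Sequence


def max_drawdown(equity_curve: Sequence[float]) -> float:
    """Largest peak-to-trough decline as a fraction of the peak (0.05 == 5%)."""
    peak = float("-inf")
    worst = 0.0
    for equity in equity_curve:
        # Track the highest value seen so far
        peak = max(peak, equity)
        if peak > 0:
            worst = max(worst, (peak - equity) / peak)
    return worst


if __name__ == "__main__":
    curve = [10000, 10050, 9900, 10100, 10000]
    print(f"{max_drawdown(curve):.4%}")
```

Collecting `self.current_capital` into a list after each simulated trade would give the equity curve this function expects.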
Performance Monitoring
Implement comprehensive performance tracking:
class PerformanceMonitor:
def __init__(self):
self.metrics = {
'total_trades': 0,
'successful_trades': 0,
'total_profit': 0,
'total_fees': 0,
'average_profit_per_trade': 0,
'win_rate': 0,
'daily_profits': {},
'exchange_performance': {}
}
self.start_time = time.time()
def record_trade(self, trade_result: Dict):
"""Record trade result for performance analysis"""
self.metrics['total_trades'] += 1
if trade_result.get('actual_profit', 0) > 0:
self.metrics['successful_trades'] += 1
self.metrics['total_profit'] += trade_result['actual_profit']
# Update daily profit tracking
date = time.strftime('%Y-%m-%d', time.localtime(trade_result['timestamp']))
if date not in self.metrics['daily_profits']:
self.metrics['daily_profits'][date] = 0
self.metrics['daily_profits'][date] += trade_result.get('actual_profit', 0)
# Update exchange performance
buy_exchange = trade_result['buy_order']['exchange']
sell_exchange = trade_result['sell_order']['exchange']
exchange_pair = f"{buy_exchange}-{sell_exchange}"
if exchange_pair not in self.metrics['exchange_performance']:
self.metrics['exchange_performance'][exchange_pair] = {
'trades': 0,
'profit': 0
}
self.metrics['exchange_performance'][exchange_pair]['trades'] += 1
self.metrics['exchange_performance'][exchange_pair]['profit'] += trade_result.get('actual_profit', 0)
# Recalculate derived metrics
self._update_derived_metrics()
def _update_derived_metrics(self):
"""Update calculated performance metrics"""
if self.metrics['total_trades'] > 0:
self.metrics['win_rate'] = (self.metrics['successful_trades'] /
self.metrics['total_trades'])
self.metrics['average_profit_per_trade'] = (self.metrics['total_profit'] /
self.metrics['total_trades'])
def generate_performance_report(self) -> str:
"""Generate comprehensive performance report"""
runtime_hours = (time.time() - self.start_time) / 3600
report = f"""
=== ARBITRAGE BOT PERFORMANCE REPORT ===
Runtime: {runtime_hours:.2f} hours
Total Trades: {self.metrics['total_trades']}
Successful Trades: {self.metrics['successful_trades']}
Win Rate: {self.metrics['win_rate']:.2%}
Financial Performance:
Total Profit: ${self.metrics['total_profit']:.2f}
Average Profit per Trade: ${self.metrics['average_profit_per_trade']:.2f}
Hourly Profit Rate: ${self.metrics['total_profit'] / runtime_hours:.2f}
Daily Profit Breakdown:
"""
for date, profit in sorted(self.metrics['daily_profits'].items()):
report += f" {date}: ${profit:.2f}\n"
report += "\n Exchange Pair Performance:\n"
for pair, data in self.metrics['exchange_performance'].items():
avg_profit = data['profit'] / data['trades'] if data['trades'] > 0 else 0
report += f" {pair}: {data['trades']} trades, ${avg_profit:.2f} avg profit\n"
return report
Complete Bot Implementation
Main Bot Class
Combine all components into a complete arbitrage bot:
import logging
class StablecoinArbitrageBot:
def __init__(self, config: Dict):
self.config = config
self.running = False
# Initialize components
self.exchange_manager = ExchangeManager()
self.ollama_analyzer = OllamaTradeAnalyzer(config.get('ollama_model', 'llama2:7b'))
self.risk_manager = RiskManager(self.ollama_analyzer)
self.detector = ArbitrageDetector(config.get('min_profit_threshold', 0.001))
self.executor = ArbitrageExecutor(self.exchange_manager, self.risk_manager)
self.monitor = ExchangeMonitor(self.exchange_manager.exchanges)
self.performance_monitor = PerformanceMonitor()
# Trading parameters
self.symbols = config.get('symbols', ['USDT/USD', 'USDC/USD', 'BUSD/USD'])
self.scan_interval = config.get('scan_interval', 10) # seconds
# Logging setup
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('arbitrage_bot.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
async def start(self):
"""Start the arbitrage bot"""
self.logger.info("Starting Stablecoin Arbitrage Bot...")
self.running = True
# Start price monitoring
self.monitor.running = True
monitor_task = asyncio.create_task(
self.monitor.monitor_prices(self.symbols)
)
# Start main trading loop
trading_task = asyncio.create_task(self.main_trading_loop())
# Start trade monitoring
trade_monitor_task = asyncio.create_task(self.trade_monitoring_loop())
try:
await asyncio.gather(monitor_task, trading_task, trade_monitor_task)
except KeyboardInterrupt:
self.logger.info("Bot stopped by user")
finally:
await self.stop()
async def main_trading_loop(self):
"""Main trading logic loop"""
while self.running:
try:
# Scan for opportunities
for symbol in self.symbols:
opportunities = self.detector.find_opportunities(
self.monitor.price_data, symbol
)
# Process each opportunity
for opportunity in opportunities:
if self.running:
await self.process_opportunity(opportunity)
# Wait before next scan
await asyncio.sleep(self.scan_interval)
except Exception as e:
self.logger.error(f"Error in main trading loop: {e}")
await asyncio.sleep(5)
async def process_opportunity(self, opportunity: Dict):
"""Process a single arbitrage opportunity"""
self.logger.info(f"Processing opportunity: {opportunity['symbol']} "
f"({opportunity['buy_exchange']} -> {opportunity['sell_exchange']}) "
f"Profit: ${opportunity['net_profit']:.2f}")
# Execute arbitrage if conditions are met
execution_result = await self.executor.execute_arbitrage(opportunity)
if execution_result['success']:
self.logger.info(f"Trade executed successfully: {execution_result['trade_id']}")
else:
self.logger.warning(f"Trade execution failed: {execution_result.get('reason', 'Unknown error')}")
async def trade_monitoring_loop(self):
"""Monitor active trades and update performance metrics"""
while self.running:
try:
# Monitor active trades
await self.executor.monitor_active_trades()
# Update performance metrics for completed trades
for trade in self.executor.trade_history:
if not trade.get('recorded', False):
self.performance_monitor.record_trade(trade)
trade['recorded'] = True
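# Generate periodic performance reports once an hour has elapsed
# (a modulo test on the clock would almost never hit with a 30-second sleep)
last_report = getattr(self, '_last_report_time', self.performance_monitor.start_time)
if time.time() - last_report >= 3600:
self._last_report_time = time.time()
report = self.performance_monitor.generate_performance_report()
self.logger.info(f"Performance Report:\n{report}")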
await asyncio.sleep(30) # Check every 30 seconds
except Exception as e:
self.logger.error(f"Error in trade monitoring loop: {e}")
await asyncio.sleep(10)
async def stop(self):
"""Stop the arbitrage bot"""
self.logger.info("Stopping Stablecoin Arbitrage Bot...")
self.running = False
self.monitor.running = False
# Generate final performance report
final_report = self.performance_monitor.generate_performance_report()
self.logger.info(f"Final Performance Report:\n{final_report}")
# Save trade history
self.save_trade_history()
def save_trade_history(self):
"""Save trade history to file"""
try:
import json
history_data = {
'trades': self.executor.trade_history,
'performance_metrics': self.performance_monitor.metrics,
'timestamp': time.time()
}
with open('trade_history.json', 'w') as f:
json.dump(history_data, f, indent=2)
self.logger.info("Trade history saved to trade_history.json")
except Exception as e:
self.logger.error(f"Error saving trade history: {e}")
# Configuration and startup
def main():
"""Main function to start the arbitrage bot"""
# Load configuration
config = {
'ollama_model': 'llama2:7b',
'min_profit_threshold': 1.0, # Minimum $1 profit
'symbols': ['USDT/USD', 'USDC/USD'],
'scan_interval': 5, # Scan every 5 seconds
'max_position_size': 1000, # Maximum $1000 per trade
}
# Create and start bot
bot = StablecoinArbitrageBot(config)
try:
asyncio.run(bot.start())
except KeyboardInterrupt:
print("\nBot stopped by user")
except Exception as e:
print(f"Bot error: {e}")
if __name__ == "__main__":
main()
Deployment and Monitoring
Production Deployment Setup
Set up your bot for production deployment:
# production_config.py
import os
from typing import Dict
class ProductionConfig:
def __init__(self):
self.config = {
# Trading parameters
'min_profit_threshold': float(os.getenv('MIN_PROFIT_THRESHOLD', '2.0')),
'max_position_size': float(os.getenv('MAX_POSITION_SIZE', '1000')),
'scan_interval': int(os.getenv('SCAN_INTERVAL', '5')),
# Risk management
'max_daily_trades': int(os.getenv('MAX_DAILY_TRADES', '100')),
'max_drawdown_percentage': float(os.getenv('MAX_DRAWDOWN_PERCENTAGE', '5.0')),
# Ollama configuration
'ollama_model': os.getenv('OLLAMA_MODEL', 'llama2:7b'),
'ollama_host': os.getenv('OLLAMA_HOST', 'localhost:11434'),
# Monitoring
'enable_telegram_alerts': os.getenv('ENABLE_TELEGRAM_ALERTS', 'false').lower() == 'true',
'telegram_bot_token': os.getenv('TELEGRAM_BOT_TOKEN'),
'telegram_chat_id': os.getenv('TELEGRAM_CHAT_ID'),
# Logging
'log_level': os.getenv('LOG_LEVEL', 'INFO'),
'log_file': os.getenv('LOG_FILE', 'arbitrage_bot.log')
}
def get_config(self) -> Dict:
return self.config
Monitoring and Alerting System
Add comprehensive monitoring capabilities:
class TelegramNotifier:
def __init__(self, bot_token: str, chat_id: str):
self.bot_token = bot_token
self.chat_id = chat_id
self.enabled = bool(bot_token and chat_id)
async def send_alert(self, message: str):
"""Send alert message via Telegram"""
if not self.enabled:
return
try:
import aiohttp
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
data = {
'chat_id': self.chat_id,
'text': message,
'parse_mode': 'HTML'
}
async with aiohttp.ClientSession() as session:
async with session.post(url, data=data) as response:
if response.status == 200:
print(f"Alert sent: {message}")
else:
print(f"Failed to send alert: {response.status}")
except Exception as e:
print(f"Error sending Telegram alert: {e}")
async def send_trade_alert(self, trade_result: Dict):
"""Send trade execution alert"""
if trade_result['success']:
message = f"""
🟢 <b>Trade Executed Successfully</b>
Trade ID: {trade_result['trade_id']}
Symbol: {trade_result['buy_order']['symbol']}
Buy Exchange: {trade_result['buy_order']['exchange']}
Sell Exchange: {trade_result['sell_order']['exchange']}
Amount: ${trade_result['buy_order']['amount']:.2f}
Expected Profit: ${trade_result.get('expected_profit', 0):.2f}
"""
else:
message = f"""
🔴 <b>Trade Execution Failed</b>
Error: {trade_result.get('error', 'Unknown error')}
Trade ID: {trade_result.get('trade_id', 'N/A')}
"""
await self.send_alert(message)
async def send_performance_alert(self, performance_data: Dict):
"""Send performance summary alert"""
message = f"""
📊 <b>Performance Update</b>
Total Trades: {performance_data['total_trades']}
Win Rate: {performance_data['win_rate']:.2%}
Total Profit: ${performance_data['total_profit']:.2f}
Average Profit: ${performance_data['average_profit_per_trade']:.2f}
"""
await self.send_alert(message)
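Since these alerts are sent with `parse_mode` set to HTML, any `<`, `>`, or `&` inside interpolated values (an exception message, for instance) has to be escaped or Telegram may reject the message. A minimal sketch using the standard library follows; `format_error_alert` is a hypothetical helper, not part of the notifier above.

```python
import html


def format_error_alert(context: str, error: Exception) -> str:
    """Build an HTML-safe alert body for Telegram's HTML parse mode."""
    # Escape only the dynamic parts; the <b> tags are intentional markup
    return "🚨 <b>Error in {}</b>\n{}".format(
        html.escape(context), html.escape(str(error))
    )


if __name__ == "__main__":
    print(format_error_alert("order execution", ValueError("price < 1 & amount > 0")))
```

The resulting string can be passed to `send_alert` unchanged.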
Best Practices and Security
Security Considerations
Implement essential security measures:
import time
from typing import Dict


class SecurityManager:
    def __init__(self):
        self.max_api_calls_per_minute = 100
        self.api_call_timestamps = []
        self.emergency_stop_conditions = {
            'max_consecutive_losses': 5,
            'max_daily_loss': 1000,
            'max_drawdown_percentage': 10
        }
        self.consecutive_losses = 0
        # Note: daily_loss is never reset in this class; reset it at the start of each trading day
        self.daily_loss = 0
        # Set this to the real account balance, otherwise the drawdown check is skipped
        self.starting_balance = 0

    def check_api_rate_limits(self) -> bool:
        """Check if API rate limits are respected"""
        current_time = time.time()

        # Remove timestamps older than 1 minute
        self.api_call_timestamps = [
            timestamp for timestamp in self.api_call_timestamps
            if current_time - timestamp < 60
        ]

        # Check if under rate limit
        if len(self.api_call_timestamps) >= self.max_api_calls_per_minute:
            return False

        self.api_call_timestamps.append(current_time)
        return True

    def check_emergency_stop_conditions(self, trade_result: Dict) -> bool:
        """Check if emergency stop conditions are met"""
        # Track consecutive losses
        if trade_result.get('actual_profit', 0) < 0:
            self.consecutive_losses += 1
            self.daily_loss += abs(trade_result.get('actual_profit', 0))
        else:
            self.consecutive_losses = 0

        # Check stop conditions
        if self.consecutive_losses >= self.emergency_stop_conditions['max_consecutive_losses']:
            return True
        if self.daily_loss >= self.emergency_stop_conditions['max_daily_loss']:
            return True

        # Check drawdown percentage
        if self.starting_balance > 0:
            current_loss_percentage = (self.daily_loss / self.starting_balance) * 100
            if current_loss_percentage >= self.emergency_stop_conditions['max_drawdown_percentage']:
                return True

        return False

    def validate_trade_parameters(self, opportunity: Dict) -> bool:
        """Validate trade parameters for safety"""
        # Check minimum profit threshold
        if opportunity['net_profit'] < 1.0:
            return False

        # Check maximum position size
        if opportunity['amount'] > 5000:
            return False

        # Check profit percentage is reasonable
        if opportunity['profit_percentage'] > 2.0:  # > 2% seems suspicious
            return False

        return True
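One gap worth noting: `daily_loss` in the class above only ever grows, so the daily limit behaves like a lifetime limit unless something resets it. One possible approach, sketched here with a hypothetical `DailyLossTracker` helper that is not part of the original bot, is to reset the counter whenever the UTC date changes. The `today` parameter is an assumption added so the clock can be replaced in tests.

```python
from datetime import date, datetime, timezone
from typing import Callable


def utc_today() -> date:
    """Return the current calendar date in UTC."""
    return datetime.now(timezone.utc).date()


class DailyLossTracker:
    """Accumulates losses and resets them when the UTC date rolls over."""

    def __init__(self, max_daily_loss: float, today: Callable[[], date] = utc_today):
        self.max_daily_loss = max_daily_loss
        self._today = today
        self._current_day = today()
        self.daily_loss = 0.0

    def record(self, actual_profit: float) -> bool:
        """Record a trade result; return True if the daily loss limit is reached."""
        day = self._today()
        if day != self._current_day:
            # New trading day: start counting losses from zero again
            self._current_day = day
            self.daily_loss = 0.0
        if actual_profit < 0:
            self.daily_loss += abs(actual_profit)
        return self.daily_loss >= self.max_daily_loss
```

`SecurityManager` could delegate its daily-loss bookkeeping to a tracker like this, or apply the same date comparison inline at the top of `check_emergency_stop_conditions`.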
Error Handling and Recovery
Implement robust error handling:
import asyncio
import logging
import re
from typing import Optional


class ErrorHandler:
    def __init__(self, telegram_notifier: Optional[TelegramNotifier] = None):
        self.telegram_notifier = telegram_notifier
        self.error_counts = {}
        self.max_error_threshold = 5
        # Keys are snake_case exception class names, e.g. ConnectionError -> 'connection_error'
        self.recovery_strategies = {
            'connection_error': self.handle_connection_error,
            'insufficient_balance': self.handle_insufficient_balance,
            'order_execution_error': self.handle_order_execution_error
        }

    async def handle_error(self, error: Exception, context: str) -> bool:
        """Handle errors with appropriate recovery strategies"""
        error_type = type(error).__name__
        error_key = f"{context}_{error_type}"

        # Track error frequency
        self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1

        # Log error
        logging.error(f"Error in {context}: {error}")

        # Send alert if available
        if self.telegram_notifier:
            await self.telegram_notifier.send_alert(
                f"🚨 Error in {context}: {str(error)}"
            )

        # Check if error threshold exceeded
        if self.error_counts[error_key] >= self.max_error_threshold:
            await self.handle_critical_error(error_key)
            return False

        # Apply recovery strategy. Convert the class name to snake_case first:
        # a plain .lower() would produce 'connectionerror' and never match a key.
        strategy_key = re.sub(r'(?<=[a-z0-9])(?=[A-Z])', '_', error_type).lower()
        recovery_strategy = self.recovery_strategies.get(
            strategy_key, self.default_recovery
        )
        return await recovery_strategy(error, context)

    async def handle_connection_error(self, error: Exception, context: str) -> bool:
        """Handle connection errors with retry logic"""
        logging.info(f"Attempting to recover from connection error in {context}")

        # Wait before retry
        await asyncio.sleep(30)

        # Attempt to reconnect
        try:
            # This would involve reinitializing exchange connections
            return True
        except Exception as e:
            logging.error(f"Connection recovery failed: {e}")
            return False

    async def handle_insufficient_balance(self, error: Exception, context: str) -> bool:
        """Handle insufficient balance errors"""
        logging.warning(f"Insufficient balance detected in {context}")

        # Send critical alert
        if self.telegram_notifier:
            await self.telegram_notifier.send_alert(
                "🚨 Critical: Insufficient balance detected. Manual intervention required."
            )

        # Pause trading temporarily
        await asyncio.sleep(300)  # Wait 5 minutes
        return True

    async def handle_order_execution_error(self, error: Exception, context: str) -> bool:
        """Handle order execution errors"""
        logging.error(f"Order execution error in {context}: {error}")

        # Wait before retrying
        await asyncio.sleep(10)
        return True

    async def default_recovery(self, error: Exception, context: str) -> bool:
        """Default recovery strategy"""
        logging.info(f"Applying default recovery for {context}")
        await asyncio.sleep(5)
        return True

    async def handle_critical_error(self, error_key: str):
        """Handle critical errors that require immediate attention"""
        logging.critical(f"Critical error threshold exceeded: {error_key}")
        if self.telegram_notifier:
            await self.telegram_notifier.send_alert(
                f"🚨 CRITICAL: Error threshold exceeded for {error_key}. Bot may need manual intervention."
            )
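The handlers above wait a fixed interval before reporting that a retry is possible. A common alternative is exponential backoff, where the wait doubles after each failed attempt. The sketch below is one way this might look. The `retry_with_backoff` helper, its parameters, and the choice to retry only on Python's built-in `ConnectionError` are all assumptions for illustration, so substitute whatever exception types your exchange client actually raises.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Retry an async operation, doubling the wait after each failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # Out of attempts: let the caller decide what to do
            # Delay grows as base_delay * 1, 2, 4, ... capped at max_delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Add up to 10% jitter so parallel tasks do not retry in lockstep
            await asyncio.sleep(delay + random.uniform(0, delay * 0.1))
    raise RuntimeError("max_attempts must be at least 1")
```

A call such as `await retry_with_backoff(lambda: fetch_prices(exchange))` would wrap a price fetch, where `fetch_prices` stands in for your own coroutine. Once the final attempt fails, the exception propagates, at which point `ErrorHandler.handle_error` can take over.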
Conclusion
A stablecoin arbitrage bot built on Ollama combines local AI analysis with automated execution. Running the model locally keeps your trading data on your own machine and avoids round trips to a hosted API, while the multi-exchange architecture lets the bot act on price gaps across platforms.
Key benefits of this approach include consistent profit opportunities from stablecoin price differences, reduced risk compared to volatile cryptocurrency arbitrage, and intelligent decision-making through Ollama's AI analysis. The system provides 24/7 automated trading with comprehensive risk management and performance monitoring.
Start with paper trading to validate your strategy, put risk controls such as the emergency stop conditions and position limits above in place before trading real funds, and keep monitoring performance metrics once the bot is live. With those safeguards, the combination of AI analysis and automated execution gives you a systematic way to pursue the small, frequent price gaps that stablecoin markets produce.
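The paper-trading step recommended above can be as simple as running detected opportunities through a simulated account instead of real exchange orders. The following is a minimal sketch under stated assumptions: the `PaperAccount` class is hypothetical, the 0.1% fee per leg is a placeholder rather than any exchange's actual rate, and transfer fees, slippage, and partial fills are ignored.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PaperAccount:
    balance: float                  # Simulated USD balance
    fee_rate: float = 0.001         # Assumed 0.1% fee per leg (placeholder value)
    history: List[float] = field(default_factory=list)

    def simulate_arbitrage(self, amount: float, buy_price: float, sell_price: float) -> float:
        """Simulate buying `amount` USD of a stablecoin and selling it elsewhere."""
        if amount > self.balance:
            raise ValueError("amount exceeds simulated balance")
        # Coins received after paying the buy-side fee
        units = amount * (1 - self.fee_rate) / buy_price
        # USD received after paying the sell-side fee
        proceeds = units * sell_price * (1 - self.fee_rate)
        profit = proceeds - amount
        self.balance += profit
        self.history.append(profit)
        return profit


account = PaperAccount(balance=1000.0)
profit = account.simulate_arbitrage(1000.0, buy_price=0.998, sell_price=1.002)
print(f"Simulated profit: ${profit:.2f}, balance: ${account.balance:.2f}")
```

With these assumed fees, the $4 gross spread from the example in the introduction shrinks to roughly $2 per $1,000 traded, which is why fees belong in the simulation from the start.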