Warning: This tutorial might make your wallet happier than a DeFi degen finding a 1000% APY pool that's actually legitimate.
Manually hunting yield farming opportunities feels like playing whack-a-mole with your portfolio. You refresh dozens of DeFi protocols, calculate gas fees, and by the time you act, the opportunity vanishes faster than your patience during a market crash.
Python yield farming scripts solve this problem by automating the entire process. This tutorial shows you how to build intelligent farming bots using Web3.py that monitor multiple protocols, execute trades, and maximize your DeFi returns while you sleep.
You'll learn to create scripts that interact with popular DeFi protocols, implement safety checks, and deploy automated strategies that react faster and more consistently than manual farming.
What Makes Python Perfect for Yield Farming Automation
Python is a popular choice for DeFi automation for three reasons: Web3.py provides direct blockchain interaction, libraries such as pandas handle the calculations, and readable syntax makes financial logic easier to debug.
Core Advantages of Python DeFi Scripts
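Speed and Efficiency: A script reacts to on-chain data within seconds and submits transactions without manual steps. Confirmation still depends on block time (about 12 seconds on Ethereum mainnet) and on the gas price you pay, so automation shortens your reaction time rather than guaranteeing that you win short-lived arbitrage opportunities.
Safety Mechanisms: Explicit checks and error handling can catch costly mistakes, such as a malformed address or an exceeded slippage limit, before a transaction is sent.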
Multi-Protocol Support: Single scripts can interact with Uniswap, Compound, Aave, and dozens of other protocols simultaneously.
Essential Web3.py Setup for Yield Farming
Before building yield farming scripts, you need a robust Web3.py environment that handles multiple RPC endpoints, manages private keys securely, and processes blockchain data efficiently.
Installing Required Dependencies
# Install core dependencies for yield farming scripts
pip install web3==6.15.1
pip install python-dotenv==1.0.0
pip install requests==2.31.0
pip install pandas==2.0.3
# asyncio ships with Python's standard library; do not install it from PyPI
Configuring Web3 Provider Connection
from web3 import Web3
from dotenv import load_dotenv
import os

# Load environment variables securely
load_dotenv()


class YieldFarmingBot:
    def __init__(self):
        # Initialize Web3 connection with fallback RPCs
        self.w3 = self._setup_web3_connection()
        self.account = self._load_account()

    def _setup_web3_connection(self):
        """Setup Web3 connection with multiple RPC endpoints"""
        rpc_endpoints = [
            f"https://mainnet.infura.io/v3/{os.getenv('INFURA_KEY')}",
            f"https://eth-mainnet.g.alchemy.com/v2/{os.getenv('ALCHEMY_KEY')}",
            "https://rpc.ankr.com/eth"  # Backup free endpoint
        ]
        for endpoint in rpc_endpoints:
            try:
                w3 = Web3(Web3.HTTPProvider(endpoint))
                if w3.is_connected():
                    # Note: the endpoint URL contains your API key; avoid
                    # printing it in shared logs
                    print(f"Connected to {endpoint}")
                    return w3
            except Exception as e:
                print(f"Failed to connect to {endpoint}: {e}")
                continue
        raise ConnectionError("Failed to connect to any RPC endpoint")

    def _load_account(self):
        """Load account from private key with safety checks"""
        private_key = os.getenv('PRIVATE_KEY')
        if not private_key:
            raise ValueError("PRIVATE_KEY not found in environment variables")
        account = self.w3.eth.account.from_key(private_key)
        return account
Building Your First Yield Farming Script
This section demonstrates how to create a basic yield farming script that monitors Uniswap V3 pools and provides liquidity to the pools with the highest estimated fee yield.
Monitoring Liquidity Pool APY Rates
import json
import requests
from datetime import datetime, timedelta


class LiquidityPoolMonitor:
    # Helper methods (_get_pool_abi, _get_pool_tvl, _get_pool_address)
    # are not shown in this tutorial.

    def __init__(self, web3_instance):
        self.w3 = web3_instance
        self.uniswap_v3_factory = "0x1F98431c8aD98523631AE4a59f267346ea31F984"
        self.monitored_pools = []

    def get_pool_apy(self, pool_address, days_back=7):
        """Estimate the annualized fee yield for a specific liquidity pool"""
        try:
            # Get pool contract instance
            pool_contract = self.w3.eth.contract(
                address=pool_address,
                abi=self._get_pool_abi()
            )
            # Fetch recent swap events. Many RPC providers cap the block
            # range of a single log query, so large ranges may need chunking.
            current_block = self.w3.eth.block_number
            blocks_back = days_back * 7200  # ~12 s per block since the Merge
            fee_events = pool_contract.events.Swap.get_logs(
                fromBlock=current_block - blocks_back,
                toBlock=current_block
            )
            # Estimate fees from swap volume. amount0 and amount1 are signed
            # and denominated in different tokens, so they cannot be added
            # together. Volume is measured in raw token0 units here.
            fee_rate = pool_contract.functions.fee().call() / 1_000_000
            volume_token0 = sum(abs(event.args.amount0) for event in fee_events)
            total_fees = volume_token0 * fee_rate
            # Get total value locked (TVL); must be in the same token0 units
            tvl = self._get_pool_tvl(pool_contract)
            # Annualize linearly: (fees / TVL) * (365 / days_back) * 100.
            # Strictly this is an APR, because it ignores compounding.
            daily_yield = total_fees / tvl / days_back
            apy = daily_yield * 365 * 100
            return {
                'pool_address': pool_address,
                'apy': apy,
                'tvl': tvl,
                'daily_fees': total_fees / days_back,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            print(f"Error calculating APY for {pool_address}: {e}")
            return None

    def scan_profitable_pools(self, min_apy=5.0, min_tvl=100000):
        """Scan for pools meeting profitability criteria"""
        profitable_pools = []
        # Popular token pairs to monitor
        token_pairs = [
            ("USDC", "WETH"),
            ("DAI", "USDC"),
            ("WBTC", "WETH"),
            ("USDT", "WETH")
        ]
        for token0, token1 in token_pairs:
            pool_address = self._get_pool_address(token0, token1)
            pool_data = self.get_pool_apy(pool_address)
            if pool_data and pool_data['apy'] >= min_apy and pool_data['tvl'] >= min_tvl:
                profitable_pools.append(pool_data)
        # Sort by APY descending
        profitable_pools.sort(key=lambda x: x['apy'], reverse=True)
        return profitable_pools
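The yield figure computed in `get_pool_apy` annualizes fees linearly, which is strictly an APR. As a minimal sketch of the arithmetic, using made-up numbers and hypothetical function names (`fee_apr`, `apr_to_apy`) that are not part of the classes above, the difference from a compounded APY looks like this:

```python
def fee_apr(total_fees: float, tvl: float, days: float) -> float:
    """Simple (non-compounded) annualized fee yield, in percent."""
    if tvl <= 0 or days <= 0:
        raise ValueError("tvl and days must be positive")
    return total_fees / tvl / days * 365 * 100


def apr_to_apy(apr_percent: float, periods_per_year: int = 365) -> float:
    """Convert an APR to the APY obtained by compounding every period."""
    rate = apr_percent / 100
    return ((1 + rate / periods_per_year) ** periods_per_year - 1) * 100


# Illustrative numbers only: 700 units of fees on 1,000,000 TVL over 7 days.
apr = fee_apr(total_fees=700, tvl=1_000_000, days=7)
apy = apr_to_apy(apr)
print(f"APR: {apr:.2f}%  APY with daily compounding: {apy:.2f}%")
```

The gap between the two grows with the rate, so it matters most for exactly the high-yield pools a scanner is designed to surface.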
Automated Liquidity Provision Strategy
class AutomatedLiquidityProvider:
    def __init__(self, web3_instance, account):
        self.w3 = web3_instance
        self.account = account
        self.position_manager = "0xC36442b4a4522E871399CD717aBDD847Ab11FE88"

    def provide_liquidity(self, pool_address, token0_amount, token1_amount,
                          tick_lower, tick_upper):
        """Provide liquidity to Uniswap V3 pool"""
        try:
            # Get position manager contract
            manager_contract = self.w3.eth.contract(
                address=self.position_manager,
                abi=self._get_position_manager_abi()
            )
            # Approve tokens for spending
            self._approve_tokens(token0_amount, token1_amount)
            # Build mint transaction
            mint_params = {
                'token0': self._get_token0_address(pool_address),
                'token1': self._get_token1_address(pool_address),
                'fee': 3000,  # 0.3% fee tier
                'tickLower': tick_lower,
                'tickUpper': tick_upper,
                'amount0Desired': token0_amount,
                'amount1Desired': token1_amount,
                'amount0Min': int(token0_amount * 0.95),  # 5% slippage
                'amount1Min': int(token1_amount * 0.95),
                'recipient': self.account.address,
                'deadline': int(datetime.now().timestamp()) + 1800  # 30 min
            }
            # Execute transaction
            transaction = manager_contract.functions.mint(mint_params)
            # Estimate gas and build transaction
            gas_estimate = transaction.estimate_gas({'from': self.account.address})
            tx_dict = {
                'from': self.account.address,
                'gas': int(gas_estimate * 1.2),  # 20% gas buffer
                'gasPrice': self.w3.eth.gas_price,
                'nonce': self.w3.eth.get_transaction_count(self.account.address)
            }
            # Sign and send transaction
            built_tx = transaction.build_transaction(tx_dict)
            signed_tx = self.account.sign_transaction(built_tx)
            tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
            # Wait for confirmation
            receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash)
            print(f"Liquidity provided successfully! Tx hash: {receipt.transactionHash.hex()}")
            return receipt
        except Exception as e:
            print(f"Error providing liquidity: {e}")
            return None
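`provide_liquidity` takes `tick_lower` and `tick_upper` but the tutorial does not show where they come from. The sketch below is one way to derive them from a price range, assuming the price is the raw token1-per-token0 ratio (token decimals are not adjusted here) and using the standard Uniswap V3 relation price = 1.0001^tick. The helper names `price_to_tick` and `tick_range` are hypothetical.

```python
import math

# Tick spacing per Uniswap V3 fee tier (fee in hundredths of a basis point)
TICK_SPACING_BY_FEE = {500: 10, 3000: 60, 10000: 200}


def price_to_tick(price: float) -> int:
    """Largest tick whose price 1.0001**tick does not exceed `price`."""
    if price <= 0:
        raise ValueError("price must be positive")
    return math.floor(math.log(price) / math.log(1.0001))


def tick_range(price_lower: float, price_upper: float, fee: int = 3000):
    """Return (tick_lower, tick_upper) aligned to the fee tier's spacing."""
    if price_lower >= price_upper:
        raise ValueError("price_lower must be below price_upper")
    spacing = TICK_SPACING_BY_FEE[fee]
    # Round the lower bound down and the upper bound up to a valid tick
    lower = price_to_tick(price_lower) // spacing * spacing
    upper = -(-price_to_tick(price_upper) // spacing) * spacing
    if upper == lower:
        upper += spacing  # keep the range at least one spacing wide
    return lower, upper


# A range of roughly +/-1% around a raw price of 1.0 in the 0.3% fee tier
print(tick_range(0.99, 1.01))
```

Ticks that are not multiples of the pool's tick spacing are rejected by the pool, which is why both bounds are snapped outward.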
Advanced Yield Farming Strategies
Professional yield farmers use sophisticated strategies that automatically rebalance positions, compound rewards, and optimize for multiple yield sources simultaneously.
Multi-Protocol Yield Optimization
class MultiProtocolYieldOptimizer:
    def __init__(self, web3_instance, account):
        self.w3 = web3_instance
        self.account = account
        # The protocol interface classes are not shown in this tutorial
        self.protocols = {
            'compound': CompoundInterface(web3_instance),
            'aave': AaveInterface(web3_instance),
            'yearn': YearnInterface(web3_instance),
            'convex': ConvexInterface(web3_instance)
        }

    def find_best_yield(self, token_address, amount):
        """Compare yields across multiple protocols"""
        yield_opportunities = []
        for protocol_name, protocol_interface in self.protocols.items():
            try:
                yield_data = protocol_interface.get_apy(token_address)
                if yield_data:
                    yield_opportunities.append({
                        'protocol': protocol_name,
                        'apy': yield_data['apy'],
                        'risk_score': yield_data['risk_score'],
                        'liquidity': yield_data['available_liquidity'],
                        'gas_cost': protocol_interface.estimate_deposit_gas(amount)
                    })
            except Exception as e:
                print(f"Error fetching yield from {protocol_name}: {e}")
        # Calculate net APY after gas costs.
        # gas_cost (gas units) * gas_price (wei) is a cost in wei, so `amount`
        # must also be expressed as an ETH value in wei for the percentage
        # below to be meaningful.
        for opportunity in yield_opportunities:
            gas_cost_yearly = opportunity['gas_cost'] * self.w3.eth.gas_price * 52  # Weekly rebalancing
            net_apy = opportunity['apy'] - (gas_cost_yearly / amount * 100)
            opportunity['net_apy'] = net_apy
        # Sort by net APY descending
        yield_opportunities.sort(key=lambda x: x['net_apy'], reverse=True)
        return yield_opportunities

    def execute_optimal_strategy(self, token_address, amount, max_risk_score=5):
        """Execute the highest yield strategy within risk tolerance"""
        opportunities = self.find_best_yield(token_address, amount)
        # Filter by risk tolerance
        safe_opportunities = [op for op in opportunities
                              if op['risk_score'] <= max_risk_score]
        if not safe_opportunities:
            print("No opportunities within risk tolerance")
            return None
        best_opportunity = safe_opportunities[0]
        protocol = self.protocols[best_opportunity['protocol']]
        print(f"Executing strategy on {best_opportunity['protocol']} "
              f"with {best_opportunity['net_apy']:.2f}% net APY")
        return protocol.deposit(token_address, amount)
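The net-APY step mixes gas units, wei, and token amounts, so it helps to see the same calculation with every quantity converted to USD first. This is a sketch under assumed inputs (the gas usage, gas price, and ETH price are invented for illustration), and `net_apy_after_gas` is a hypothetical helper rather than part of the optimizer class.

```python
def net_apy_after_gas(gross_apy_percent: float,
                      deposit_value_usd: float,
                      gas_units: int,
                      gas_price_wei: int,
                      eth_price_usd: float,
                      rebalances_per_year: int = 52) -> float:
    """Gross APY minus yearly gas spend, both as a percentage of the deposit."""
    if deposit_value_usd <= 0:
        raise ValueError("deposit_value_usd must be positive")
    gas_cost_eth = gas_units * gas_price_wei / 10**18   # wei -> ETH
    yearly_gas_usd = gas_cost_eth * eth_price_usd * rebalances_per_year
    return gross_apy_percent - yearly_gas_usd / deposit_value_usd * 100


# Illustrative inputs: 200k gas at 20 gwei, ETH at $2,000, weekly rebalancing
net = net_apy_after_gas(
    gross_apy_percent=5.0,
    deposit_value_usd=10_000,
    gas_units=200_000,
    gas_price_wei=20 * 10**9,
    eth_price_usd=2_000,
)
print(f"Net APY: {net:.2f}%")
```

With these numbers most of a 5% gross yield is consumed by gas, which is why small deposits rarely justify weekly rebalancing on mainnet.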
Automated Reward Compounding
class RewardCompounder:
    def __init__(self, web3_instance, account):
        self.w3 = web3_instance
        self.account = account
        self.compound_threshold = 100  # Minimum USD value to compound

    def check_pending_rewards(self, protocol_addresses):
        """Check all protocols for pending rewards worth compounding"""
        total_rewards = 0
        compound_actions = []
        for protocol_name, address in protocol_addresses.items():
            try:
                contract = self.w3.eth.contract(
                    address=address,
                    abi=self._get_protocol_abi(protocol_name)
                )
                # Get pending rewards
                pending = contract.functions.earned(self.account.address).call()
                reward_value_usd = self._get_token_price_usd(pending, protocol_name)
                if reward_value_usd >= self.compound_threshold:
                    compound_actions.append({
                        'protocol': protocol_name,
                        'amount': pending,
                        'value_usd': reward_value_usd,
                        'contract': contract
                    })
                    total_rewards += reward_value_usd
            except Exception as e:
                print(f"Error checking rewards for {protocol_name}: {e}")
        return compound_actions, total_rewards

    def execute_compound_strategy(self, compound_actions):
        """Execute compounding for all profitable rewards"""
        successful_compounds = 0
        for action in compound_actions:
            try:
                # Claim rewards
                claim_tx = action['contract'].functions.getReward()
                self._execute_transaction(claim_tx, "Claim rewards")
                # Swap rewards to base token if needed
                base_token_amount = self._swap_to_base_token(
                    action['amount'],
                    action['protocol']
                )
                # Reinvest in same protocol
                reinvest_tx = action['contract'].functions.stake(base_token_amount)
                self._execute_transaction(reinvest_tx, "Reinvest rewards")
                print(f"Compounded {action['value_usd']:.2f} USD on {action['protocol']}")
                successful_compounds += 1
            except Exception as e:
                print(f"Error compounding {action['protocol']}: {e}")
        return successful_compounds
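The fixed `compound_threshold` of 100 USD is a placeholder. One way to choose it, sketched here under the assumption that you cap gas at a fixed fraction of each reward, is to derive the threshold from current gas costs. The function name and all input numbers are illustrative.

```python
def compound_threshold_usd(gas_units_per_compound: int,
                           gas_price_gwei: float,
                           eth_price_usd: float,
                           max_gas_fraction: float = 0.02) -> float:
    """Smallest reward (USD) for which gas stays under max_gas_fraction."""
    if not 0 < max_gas_fraction <= 1:
        raise ValueError("max_gas_fraction must be in (0, 1]")
    gas_cost_eth = gas_units_per_compound * gas_price_gwei / 1e9  # gwei -> ETH
    gas_cost_usd = gas_cost_eth * eth_price_usd
    return gas_cost_usd / max_gas_fraction


# Assumed: claim + swap + restake uses about 400k gas in total
threshold = compound_threshold_usd(400_000, gas_price_gwei=25, eth_price_usd=2_000)
print(f"Compound only when rewards exceed ${threshold:,.0f}")
```

Recomputing the threshold on each monitoring pass lets the bot compound more often when gas is cheap and wait when it is expensive.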
Safety and Risk Management
Yield farming scripts handle real money, making robust safety mechanisms essential. This section covers position sizing, slippage protection, and emergency stops.
Implementing Position Size Limits
class RiskManager:
    def __init__(self, max_position_size=0.1, max_daily_trades=10):
        self.max_position_size = max_position_size  # 10% of portfolio max
        self.max_daily_trades = max_daily_trades
        # Note: nothing in this class increments or resets the counter;
        # the caller must update it after each executed trade and reset it daily
        self.daily_trade_count = 0
        self.positions = {}

    def validate_trade_size(self, amount, total_portfolio_value):
        """Validate trade size against position limits"""
        if total_portfolio_value <= 0:
            raise ValueError("Total portfolio value must be positive")
        position_percentage = amount / total_portfolio_value
        if position_percentage > self.max_position_size:
            raise ValueError(f"Position size {position_percentage:.1%} exceeds "
                             f"maximum allowed {self.max_position_size:.1%}")
        if self.daily_trade_count >= self.max_daily_trades:
            raise ValueError("Daily trade limit reached")
        return True

    def check_slippage_limits(self, expected_output, actual_output, max_slippage=0.05):
        """Verify transaction slippage is within acceptable limits"""
        actual_slippage = abs(expected_output - actual_output) / expected_output
        if actual_slippage > max_slippage:
            raise ValueError(f"Slippage {actual_slippage:.1%} exceeds "
                             f"maximum {max_slippage:.1%}")
        return True

    def emergency_exit_all_positions(self):
        """Emergency function to exit all positions immediately"""
        print("EMERGENCY EXIT TRIGGERED - Exiting all positions")
        exit_results = []
        for position_id, position_data in self.positions.items():
            try:
                result = self._exit_position(position_id, position_data)
                exit_results.append(result)
                print(f"Exited position {position_id}: {result['status']}")
            except Exception as e:
                print(f"Failed to exit position {position_id}: {e}")
        return exit_results
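`check_slippage_limits` verifies slippage after the fact; on-chain protection comes from the minimum-amount parameters passed with the transaction. Token amounts in wei often exceed the range where floats are exact, so a float expression such as `int(amount * 0.95)` can be off by a few units. A minimal sketch using integer basis points instead (`min_amount_out` is a hypothetical helper):

```python
def min_amount_out(expected_amount: int, slippage_bps: int) -> int:
    """Minimum acceptable output for a slippage tolerance in basis points.

    Uses integer arithmetic only, so the result is exact for any amount.
    """
    if expected_amount < 0:
        raise ValueError("expected_amount must be non-negative")
    if not 0 <= slippage_bps <= 10_000:
        raise ValueError("slippage_bps must be between 0 and 10000")
    return expected_amount * (10_000 - slippage_bps) // 10_000


# 1 token with 18 decimals and a 0.5% tolerance
print(min_amount_out(10**18, 50))

# A large raw amount where float math and integer math can disagree
large = 123_456_789_012_345_678_901_234_567
print(int(large * 0.95), min_amount_out(large, 500))
```

The integer result always rounds down, which errs on the side of a slightly looser bound rather than an unexpected revert.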
Price Oracle Integration
class PriceOracle:
    def __init__(self, web3_instance):
        self.w3 = web3_instance
        self.chainlink_feeds = {
            'ETH/USD': '0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419',
            'BTC/USD': '0xF4030086522a5bEEa4988F8cA5B36dbC97BeE88c',
            'USDC/USD': '0x8fFfFfd4AfB6115b954Bd326cbe7B4BA576818f6'
        }

    def get_price(self, token_pair):
        """Get current price from Chainlink oracle"""
        try:
            if token_pair not in self.chainlink_feeds:
                raise ValueError(f"Price feed not available for {token_pair}")
            feed_address = self.chainlink_feeds[token_pair]
            aggregator_contract = self.w3.eth.contract(
                address=feed_address,
                abi=self._get_chainlink_abi()
            )
            # Get latest price data:
            # (roundId, answer, startedAt, updatedAt, answeredInRound)
            latest_data = aggregator_contract.functions.latestRoundData().call()
            # USD-denominated Chainlink feeds use 8 decimals; call decimals()
            # on the feed before reusing this code for other pairs
            price = latest_data[1] / 10**8
            timestamp = latest_data[3]
            # Check if price is fresh (within last hour)
            current_time = datetime.now().timestamp()
            if current_time - timestamp > 3600:
                print(f"Warning: Price data for {token_pair} is stale")
            return {
                'price': price,
                'timestamp': timestamp,
                'is_fresh': current_time - timestamp <= 3600
            }
        except Exception as e:
            print(f"Error fetching price for {token_pair}: {e}")
            return None

    def validate_trade_price(self, token_amount, expected_usd_value, tolerance=0.02):
        """Validate trade pricing against oracle data"""
        # Implementation for price validation
        pass
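`validate_trade_price` is left as a stub above. One possible shape for it, written as a standalone function so it can be tested without a node, compares the value a trade implies against the oracle value and rejects deviations beyond the tolerance. This is a sketch, not the author's implementation; it assumes `token_amount` is in whole tokens (not wei) and that the oracle price is passed in by the caller.

```python
def validate_trade_price(token_amount: float,
                         expected_usd_value: float,
                         oracle_price_usd: float,
                         tolerance: float = 0.02) -> bool:
    """Return True if the trade's implied value is within `tolerance`
    (as a fraction) of the oracle-derived value."""
    oracle_value = token_amount * oracle_price_usd
    if oracle_value <= 0:
        raise ValueError("token_amount and oracle_price_usd must be positive")
    deviation = abs(expected_usd_value - oracle_value) / oracle_value
    return deviation <= tolerance


# 1 token quoted at $2,030 against an oracle price of $2,000 (1.5% off)
print(validate_trade_price(1.0, 2_030, 2_000))
# The same token quoted at $2,100 (5% off) fails the default 2% tolerance
print(validate_trade_price(1.0, 2_100, 2_000))
```

Inside the class, the oracle price would come from `get_price`, and a stale result (`is_fresh` false) is a reasonable additional reason to reject the trade.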
Deploying and Monitoring Your Yield Farming Bot
Production deployment requires robust monitoring, logging, and alerting systems to track performance and catch issues before they become costly.
Setting Up Continuous Monitoring
import logging
import os
import time
from datetime import datetime
import smtplib
from email.mime.text import MIMEText


class YieldFarmingMonitor:
    def __init__(self, bot_instance, alert_email=None):
        self.bot = bot_instance
        self.alert_email = alert_email
        self.performance_log = []
        self.setup_logging()

    def setup_logging(self):
        """Configure comprehensive logging system"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('yield_farming.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def run_monitoring_loop(self, check_interval=300):  # 5 minutes
        """Main monitoring loop for continuous operation"""
        self.logger.info("Starting yield farming monitor")
        while True:
            try:
                # Check all active positions
                position_health = self.check_position_health()
                # Monitor for new opportunities
                new_opportunities = self.scan_new_opportunities()
                # Check system health
                system_status = self.check_system_health()
                # Log performance metrics
                self.log_performance_metrics()
                # Send alerts if needed
                if not system_status['healthy']:
                    self.send_alert(f"System health issue: {system_status['issues']}")
                # Execute new opportunities if found
                if new_opportunities:
                    self.evaluate_and_execute_opportunities(new_opportunities)
                time.sleep(check_interval)
            except Exception as e:
                self.logger.error(f"Error in monitoring loop: {e}")
                self.send_alert(f"Monitoring loop error: {e}")
                time.sleep(60)  # Wait before retrying

    def check_position_health(self):
        """Monitor all active positions for issues"""
        unhealthy_positions = []
        for position_id, position in self.bot.active_positions.items():
            try:
                current_value = self.bot.get_position_value(position_id)
                entry_value = position['entry_value']
                # Check for significant losses
                loss_percentage = (entry_value - current_value) / entry_value
                if loss_percentage > 0.1:  # 10% loss threshold
                    unhealthy_positions.append({
                        'position_id': position_id,
                        'loss_percentage': loss_percentage,
                        'current_value': current_value,
                        'protocol': position['protocol']
                    })
            except Exception as e:
                self.logger.error(f"Error checking position {position_id}: {e}")
        return unhealthy_positions

    def send_alert(self, message):
        """Send email alert for critical issues"""
        if not self.alert_email:
            return
        try:
            body = (
                "Yield Farming Bot Alert\n"
                f"Time: {datetime.now()}\n"
                f"Message: {message}\n"
                "Please check the bot status immediately.\n"
            )
            msg = MIMEText(body)
            msg['Subject'] = 'Yield Farming Bot Alert'
            msg['From'] = 'bot@yourdefi.com'
            msg['To'] = self.alert_email
            # Configure your SMTP settings
            smtp_server = smtplib.SMTP('smtp.gmail.com', 587)
            smtp_server.starttls()
            smtp_server.login(os.getenv('EMAIL_USER'), os.getenv('EMAIL_PASS'))
            smtp_server.send_message(msg)
            smtp_server.quit()
            self.logger.info(f"Alert sent: {message}")
        except Exception as e:
            self.logger.error(f"Failed to send alert: {e}")
Performance Analytics Dashboard
class PerformanceAnalyzer:
    def __init__(self, bot_instance):
        self.bot = bot_instance
        self.start_time = datetime.now()

    def calculate_total_returns(self):
        """Calculate comprehensive performance metrics"""
        current_portfolio_value = self.bot.get_total_portfolio_value()
        initial_portfolio_value = self.bot.initial_portfolio_value
        # Calculate raw returns
        absolute_return = current_portfolio_value - initial_portfolio_value
        percentage_return = (absolute_return / initial_portfolio_value) * 100
        # Calculate time-adjusted returns (linear annualization)
        days_elapsed = (datetime.now() - self.start_time).days
        annualized_return = (percentage_return / days_elapsed) * 365 if days_elapsed > 0 else 0
        # Calculate metrics by protocol
        protocol_performance = {}
        for position in self.bot.active_positions.values():
            protocol = position['protocol']
            if protocol not in protocol_performance:
                protocol_performance[protocol] = {
                    'total_invested': 0,
                    'current_value': 0,
                    'positions': 0
                }
            protocol_performance[protocol]['total_invested'] += position['entry_value']
            protocol_performance[protocol]['current_value'] += self.bot.get_position_value(position['id'])
            protocol_performance[protocol]['positions'] += 1
        return {
            'absolute_return': absolute_return,
            'percentage_return': percentage_return,
            'annualized_return': annualized_return,
            'current_value': current_portfolio_value,
            'days_active': days_elapsed,
            'protocol_breakdown': protocol_performance,
            'timestamp': datetime.now().isoformat()
        }

    def generate_performance_report(self):
        """Generate detailed performance report"""
        metrics = self.calculate_total_returns()
        lines = [
            "═══════════════════════════════════════",
            "YIELD FARMING BOT PERFORMANCE REPORT",
            "═══════════════════════════════════════",
            "📊 OVERALL PERFORMANCE",
            f"├─ Total Return: ${metrics['absolute_return']:,.2f} ({metrics['percentage_return']:.2f}%)",
            f"├─ Annualized Return: {metrics['annualized_return']:.2f}%",
            f"├─ Current Portfolio Value: ${metrics['current_value']:,.2f}",
            f"└─ Days Active: {metrics['days_active']}",
            "🏛️ PROTOCOL BREAKDOWN",
        ]
        for protocol, data in metrics['protocol_breakdown'].items():
            protocol_return = ((data['current_value'] - data['total_invested']) / data['total_invested']) * 100
            lines += [
                f"{protocol.upper()}:",
                f"├─ Positions: {data['positions']}",
                f"├─ Invested: ${data['total_invested']:,.2f}",
                f"├─ Current Value: ${data['current_value']:,.2f}",
                f"└─ Return: {protocol_return:.2f}%",
            ]
        report = "\n".join(lines)
        return report
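`calculate_total_returns` annualizes by scaling the period return linearly, which understates what the same growth rate would produce if it were sustained and reinvested. For comparison, here is a sketch of geometric annualization; the function name is hypothetical and returns are expressed as fractions (0.10 means 10%).

```python
def annualized_return(total_return: float, days_elapsed: float) -> float:
    """Geometric annualization of a total return given as a fraction."""
    if days_elapsed <= 0:
        raise ValueError("days_elapsed must be positive")
    if total_return <= -1:
        return -1.0  # a total loss cannot be annualized further
    return (1 + total_return) ** (365 / days_elapsed) - 1


# Illustrative: a 10% gain over 30 days
linear = 0.10 / 30 * 365
compound = annualized_return(0.10, 30)
print(f"Linear: {linear:.1%}  Compounded: {compound:.1%}")
```

Both figures extrapolate a short window and become unreliable for the first few days of operation, so a report may want to suppress them until enough history exists.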
Advanced Troubleshooting and Optimization
Common issues in yield farming automation include failed transactions, price feed delays, and protocol changes. This section provides solutions for robust error handling.
Transaction Failure Recovery
import time
from datetime import datetime


class TransactionManager:
    def __init__(self, web3_instance, account, max_retries=3):
        self.w3 = web3_instance
        self.account = account  # needed to sign and send transactions
        self.max_retries = max_retries
        self.failed_transactions = []

    def execute_with_retry(self, transaction_function, *args, **kwargs):
        """Execute transaction with automatic retry logic"""
        for attempt in range(self.max_retries):
            try:
                # Current gas price plus a buffer that grows with each retry
                # (10% on the first attempt, 20% on the second, and so on)
                gas_price = self.w3.eth.gas_price * (110 + 10 * attempt) // 100
                # Build transaction
                tx = transaction_function(*args, **kwargs)
                tx_dict = tx.build_transaction({
                    'from': self.account.address,
                    'gasPrice': gas_price,
                    'nonce': self.w3.eth.get_transaction_count(self.account.address)
                })
                # Sign and send
                signed_tx = self.account.sign_transaction(tx_dict)
                tx_hash = self.w3.eth.send_raw_transaction(signed_tx.rawTransaction)
                # Wait for confirmation with timeout
                receipt = self.w3.eth.wait_for_transaction_receipt(tx_hash, timeout=300)
                if receipt.status == 1:
                    print(f"Transaction successful: {receipt.transactionHash.hex()}")
                    return receipt
                else:
                    raise Exception("Transaction failed")
            except Exception as e:
                print(f"Transaction attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(30)  # Wait, then retry with a higher gas price
                else:
                    self.failed_transactions.append({
                        # Store the callable itself so it can be retried later
                        'function': transaction_function,
                        'args': args,
                        'kwargs': kwargs,
                        'error': str(e),
                        'timestamp': datetime.now()
                    })
                    raise

    def recover_failed_transactions(self):
        """Attempt to recover failed transactions"""
        recovered = 0
        for failed_tx in self.failed_transactions.copy():
            # Remove the record first; execute_with_retry records the
            # failure again if the retry also fails
            self.failed_transactions.remove(failed_tx)
            try:
                # Retry failed transaction
                self.execute_with_retry(
                    failed_tx['function'],
                    *failed_tx['args'],
                    **failed_tx['kwargs']
                )
                recovered += 1
            except Exception as e:
                print(f"Failed to recover transaction: {e}")
        return recovered
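The retry loop above waits a fixed 30 seconds between attempts. A common refinement is to plan the gas price and delay for every attempt up front, with the delay doubling each time and the gas price capped so a congested network cannot run up costs. The sketch below only computes the schedule and sends nothing; `retry_schedule` and its parameters are hypothetical.

```python
from typing import List, Optional, Tuple


def retry_schedule(base_gas_price_wei: int,
                   max_retries: int = 3,
                   bump_percent: int = 10,
                   base_delay_s: int = 30,
                   max_gas_price_wei: Optional[int] = None
                   ) -> List[Tuple[int, int, int]]:
    """Return (attempt, gas_price_wei, delay_after_failure_s) per attempt.

    The gas price rises by `bump_percent` per attempt (integer math), and
    the delay doubles after each failure (exponential backoff).
    """
    schedule = []
    for attempt in range(max_retries):
        gas_price = base_gas_price_wei * (100 + bump_percent * (attempt + 1)) // 100
        if max_gas_price_wei is not None:
            gas_price = min(gas_price, max_gas_price_wei)
        delay = base_delay_s * 2 ** attempt
        schedule.append((attempt + 1, gas_price, delay))
    return schedule


for attempt, gas_price, delay in retry_schedule(20 * 10**9, max_gas_price_wei=25 * 10**9):
    print(f"attempt {attempt}: {gas_price / 1e9:.0f} gwei, wait {delay}s on failure")
```

A replacement transaction that reuses a pending nonce generally needs a meaningfully higher gas price than the one it replaces to be accepted by nodes, which is the reason for bumping on every attempt rather than resending unchanged.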
Conclusion
Python yield farming scripts with Web3.py integration transform manual DeFi farming into automated profit generation. You've learned to build monitoring systems that track multiple protocols, implement safety mechanisms that protect your capital, and deploy strategies that compound returns automatically.
The scripts in this tutorial provide a foundation for sophisticated automated yield farming strategies. Start with simple single-protocol farming, then expand to multi-protocol optimization as you gain confidence.
Once deployed, your yield farming bot can run 24/7, watching for opportunities while you focus on strategy rather than execution. The combination of Python's flexibility and Web3.py's blockchain integration gives you an automation platform that acts more consistently than manual farming, although returns still depend on market conditions, gas costs, and the quality of your risk controls.
Ready to deploy your first automated yield farming strategy? Test the scripts on testnets first, then start with small amounts to validate your risk management systems.
Disclaimer: Yield farming involves significant financial risk. Never invest more than you can afford to lose. This tutorial is for educational purposes only and does not constitute financial advice. Always conduct thorough testing before deploying scripts with real funds.