Building an On-Chain Data Pipeline with AI Agents and The Graph

Step-by-step guide to querying, transforming, and analyzing Ethereum on-chain data using AI agents, web3.py, and The Graph subgraphs — with real DeFi protocol examples.

Dune Analytics is great until your query costs $40 in credits and returns stale data. Here's how to build your own on-chain data pipeline for $0. You’re a developer, not a finance department. You need fresh, queryable data to feed your trading bot, your dashboard, or your risk model, and waiting for a third-party platform to index the latest block is like watching paint dry on the blockchain. Let’s cut out the middleman, leverage The Graph for heavy lifting, and use a dash of AI to glue it all together without burning your wallet on RPC calls.

Why Your RPC Node is Begging for Mercy

You’ve probably started with web3.py or ethers.js, pointed it at a public endpoint, and hammered it with eth_getLogs for event history. It works until you hit the rate limit, or worse, you realize that Ethereum processes ~1.2M transactions/day (Etherscan, Q1 2026). Sifting through that via direct RPC calls for a specific DeFi protocol is like drinking from a firehose and asking for a specific molecule of water. Each call has latency and cost, especially if you’re using a managed service.

Let’s break down your options before we build:

  • Direct RPC (web3.py/ethers.js): Maximum flexibility, real-time data. Suffers from rate limits, requires complex data aggregation logic, and latency varies wildly by provider.
  • The Graph Subgraphs: Indexed, structured data served via GraphQL. Someone (maybe you) has to define the schema and indexing logic. Once deployed, queries are fast, cheap, and perfect for historical analysis.
  • Dune Analytics: Wizard-like for SQL lovers, but you’re at the mercy of their indexing speed, query engine costs, and abstraction layer. When you need data from a contract deployed 10 minutes ago, Dune is still having coffee.

The smart move is a hybrid approach. Use The Graph for historical, aggregated queries and RPC for real-time, edge-case checks. Your pipeline should know the difference.

Configuring web3.py with an AI Copilot for Smarter Queries

Open VS Code (`Ctrl+`` to open the terminal). We’re not just writing scripts; we’re building a resilient query engine. With extensions like Continue.dev or GitHub Copilot, you can offload the boilerplate and focus on the data logic. The AI can help you structure complex multi-call requests or parse abstruse ABI outputs.

First, set up a robust provider setup with fallbacks. Never rely on a single RPC endpoint.


import os
from web3 import Web3, HTTPProvider
from web3.middleware import geth_poa_middleware

class FallbackProvider:
    def __init__(self):
        # Get your free RPC URLs from services like Infura, Alchemy, or run a node.
        self.provider_urls = [
            os.getenv('ALCHEMY_URL'),
            os.getenv('INFURA_URL'),
            'https://eth.llamarpc.com',  # Public fallback
        ]
        self.active_provider_index = 0
        self.w3 = self._connect()

    def _connect(self):
        """Connect to the current active provider."""
        provider = HTTPProvider(self.provider_urls[self.active_provider_index])
        w3 = Web3(provider)
        # Essential for chains like Polygon PoS
        w3.middleware_onion.inject(geth_poa_middleware, layer=0)
        return w3

    def switch_provider(self):
        """Rotate to the next provider on failure."""
        self.active_provider_index = (self.active_provider_index + 1) % len(self.provider_urls)
        print(f"Switching to provider: {self.active_provider_index}")
        self.w3 = self._connect()

# Usage
provider = FallbackProvider()
w3 = provider.w3

# Always estimate gas first to avoid a costly, failed tx
try:
    gas_estimate = w3.eth.estimate_gas({'to': '0x...', 'from': '0x...', 'value': w3.to_wei(0.1, 'ether')})
except Exception as e:
    if "insufficient funds" in str(e):
        # Error: insufficient funds for gas * price + value
        # Fix: check msg.value and gas estimation before sending
        print("ABORT: Wallet doesn't have enough for gas + value. Check balance.")
        provider.switch_provider()  # Maybe it's a provider state issue

This pattern saves you from hitting a wall when your primary provider throttles you. Benchmark: web3.py RPC call latency shows why you need speed: self-hosted nodes can hit ~40ms, while managed services like Infura and Alchemy average ~120ms and ~95ms respectively. A fallback keeps you running.

From Raw Logs to a GraphQL Subgraph with AI Assistance

Now for the heavy lifting. Writing a subgraph for The Graph (on the decentralized network or the hosted service) can feel like writing a smart contract in a foreign language. This is where an AI agent in your IDE shines. You can prompt it with: "Generate a GraphQL schema and mapping for indexing Uniswap V3 Swap events, including derived USD values using a Chainlink price feed."

Here’s a distilled version of what you’ll create. First, the schema (schema.graphql):

type Swap @entity {
  id: ID! # transactionHash-logIndex
  transactionHash: Bytes!
  pool: Pool!
  sender: Bytes!
  recipient: Bytes!
  amount0: BigInt!
  amount1: BigInt!
  sqrtPriceX96: BigInt!
  liquidity: BigInt!
  tick: Int!
  timestamp: BigInt!
  derivedAmountUSD: BigDecimal! # Calculated in the mapping
}

type Pool @entity {
  id: ID! # pool address
  token0: Bytes!
  token1: Bytes!
  swaps: [Swap!]! @derivedFrom(field: "pool")
}

Next, the mapping logic (src/mapping.ts). The AI can help you handle BigInt conversions and price feed lookups:

import { Swap } from "../generated/schema"
import { Swap as SwapEvent } from "../generated/UniswapV3Pool/UniswapV3Pool"
import { BigInt, Address, BigDecimal } from "@graphprotocol/graph-ts"
import { ChainlinkPriceFeed } from "./utils/priceFeed"

export function handleSwap(event: SwapEvent): void {
  let entity = new Swap(event.transaction.hash.concatI32(event.logIndex.toI32()))
  entity.pool = event.address
  entity.sender = event.params.sender
  entity.recipient = event.params.recipient
  entity.amount0 = event.params.amount0
  entity.amount1 = event.params.amount1
  entity.sqrtPriceX96 = event.params.sqrtPriceX96
  entity.liquidity = event.params.liquidity
  entity.tick = event.params.tick
  entity.timestamp = event.block.timestamp

  // Use a mock or actual Chainlink aggregator to get USD value
  let priceFeed = new ChainlinkPriceFeed(Address.fromString("0x..."))
  let token0AmountUSD = priceFeed.getUSDValue(event.params.amount0, pool.token0)
  let token1AmountUSD = priceFeed.getUSDValue(event.params.amount1, pool.token1)
  // Use absolute value as one amount is negative (token in)
  entity.derivedAmountUSD = token0AmountUSD.abs().plus(token1AmountUSD.abs())

  entity.save()
}

Deploy this subgraph, and you’ve just created your own dedicated, queryable database for swap events. No more scanning logs.

Benchmark: The Latency Showdown – RPC vs. The Graph

Let’s get quantitative. When should you call your RPC and when should you query your subgraph? The answer is in the milliseconds and the complexity.

Query TypeMethodAvg LatencyData FreshnessComplexity Cost
Latest Block NumberDirect RPC (Alchemy)~95 msReal-timeTrivial
All Swaps in a Pool (Last 24h)Direct RPC (eth_getLogs)1200+ msReal-timeVery High (rate limits, data processing)
All Swaps in a Pool (Last 24h)The Graph Subgraph~350 ms~1-2 blocks behindLow (single GraphQL query)
Aggregated Daily VolumeThe Graph Subgraph~200 ms~1-2 blocks behindTrivial (pre-indexed)
Simulate a Contract CallDirect RPC (eth_call)~150 msReal-timeMedium

The table tells the story. For real-time actions (e.g., checking a wallet balance before executing a trade), use RPC. For any analytical, historical, or aggregated data (e.g., "show me volume trends"), the subgraph wins every time on performance and cost (Gas fees on Ethereum L2s average $0.01–$0.05 vs mainnet's $3–$15, so indexing is cheap).

Building a Real-Time Uniswap V3 Liquidity Monitor

Let’s stitch both worlds together. Your AI-augmented pipeline will use the subgraph for historical context and the RPC for the latest state.

Goal: Monitor the ETH/USDC pool for large liquidity withdrawals in real-time.

Step 1: Query the subgraph for the last hour's baseline.

# pipeline/liquidity_monitor.py
import requests
import json

GRAPHQL_ENDPOINT = "https://api.thegraph.com/subgraphs/name/your-username/uniswap-v3-mainnet"

query = """
{
  pools(where: {id: "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"}) {
    liquidity
    swaps(first: 100, orderBy: timestamp, orderDirection: desc) {
      derivedAmountUSD
    }
  }
}
"""
response = requests.post(GRAPHQL_ENDPOINT, json={'query': query})
data = response.json()
baseline_liquidity = int(data['data']['pools'][0]['liquidity'])

Step 2: Use web3.py to get the real-time liquidity.

from web3 import Web3
from pipeline.providers import FallbackProvider

provider = FallbackProvider()
w3 = provider.w3

UNISWAP_V3_POOL_ABI = [...] # Abridged ABI for the pool contract
POOL_ADDRESS = "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640"

pool_contract = w3.eth.contract(address=POOL_ADDRESS, abi=UNISWAP_V3_POOL_ABI)

try:
    current_liquidity = pool_contract.functions.liquidity().call(block_identifier='latest')
except Exception as e:
    # Handle potential node issues
    print(f"RPC call failed: {e}")
    provider.switch_provider()
    current_liquidity = pool_contract.functions.liquidity().call(block_identifier='latest')

# Calculate change
liquidity_change = ((current_liquidity - baseline_liquidity) / baseline_liquidity) * 100
if abs(liquidity_change) > 10:  # Alert on >10% change
    print(f"ALERT: Liquidity changed by {liquidity_change:.2f}%")

This pipeline gives you a historical baseline from the indexed data and a real-time check via RPC, creating a complete picture.

Handling the Inevitable: Rate Limits and Failed Calls

Your RPC provider will shut you down if you’re not careful. Beyond the fallback provider, implement intelligent batching and retry logic.

  • Batch Requests: Use web3.py's make_batch_request or a library like asyncio to bundle multiple eth_call requests.
  • Exponential Backoff: When you hit a rate limit error (HTTP 429), don’t just retry immediately. Wait and increase the delay.
  • Nonce Management: If you’re also writing transactions, this error will haunt you: Error: nonce too low. The fix: sync nonce with provider.getTransactionCount(address, 'pending') before building and sending every transaction.
# Get the correct nonce, accounting for pending tx
pending_nonce = w3.eth.get_transaction_count(your_address, 'pending')
transaction['nonce'] = pending_nonce
signed_txn = w3.eth.account.sign_transaction(transaction, private_key)

Next Steps: From Pipeline to Production

You’ve now got the blueprint. The next evolution is to containerize this pipeline with Docker, schedule it with Airflow or Prefect, and stream the results to a real-time dashboard or a Kafka topic for other services. Remember, DeFi TVL reached $180B in Jan 2026 (DefiLlama); the data is valuable, and your ability to parse it faster and cheaper is a tangible edge.

Start by forking a major protocol’s subgraph from The Graph Explorer and customizing it. Integrate the FallbackProvider class into your existing bots. Use your AI IDE assistant to generate the tedious parts of the mapping logic. Stop paying for queries and waiting for data. Build the pipeline, own the data, and move at the speed of the chain itself.