A Practical Guide to Fetching Ethereum Blockchain Data

·

This guide demonstrates how to use SpringBoot and Web3J to retrieve transaction data from the Ethereum blockchain. The data obtained is primarily used for calculating real-time popular smart contracts.

Prerequisites for Implementation

To begin, ensure you have the following components ready:

Project Overview and Approach

The goal is to fetch Ethereum transaction data and write it to a Kafka queue. We can break this down into a clear, logical workflow:

  1. Establish a connection to the Ethereum network.
  2. Fetch the desired block data.
  3. Format the retrieved data and write it to Kafka.
  4. Initiate a service to synchronize data continuously.

Establishing a Connection to the Ethereum Network

There are multiple methods to connect to the Ethereum network. You can run your own node and connect to it, or use a connection endpoint provided by a third-party service. This tutorial utilizes the latter approach for its simplicity and reliability.

Using a Third-Party Service Provider

A service like Alchemy provides robust and scalable access to the Ethereum blockchain.

  1. Begin by creating an account on the Alchemy platform.
  2. Once registered, create a new application within your dashboard to generate a unique API key and HTTP endpoint.

Fetching Block Data

We will use the Web3J library, a lightweight Java and Android library for integrating with Ethereum clients, to interact with the blockchain and fetch data.

Integrating Web3J Dependencies

To incorporate Web3J into your SpringBoot project, add the following dependencies to your pom.xml file. These packages provide the core functionality for blockchain interaction.

<dependency>
    <groupId>org.web3j</groupId>
    <artifactId>web3j-spring-boot-starter</artifactId>
    <version>4.0.3</version>
</dependency>

<dependency>
    <groupId>org.web3j</groupId>
    <artifactId>crypto</artifactId>
    <version>4.9.7</version>
</dependency>

<dependency>
    <groupId>org.web3j</groupId>
    <artifactId>core</artifactId>
    <version>4.9.7</version>
</dependency>

Configuring Application Properties

The connection to Ethereum and Kafka is configured in the application.yml file. Replace the placeholder with the API key from your Alchemy application.

web3j:
  client-address: https://eth-mainnet.g.alchemy.com/v2/your-api-key-here

server:
  port: 8900

spring:
  application:
    name: ethereum-data-pipeline
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: ods_transaction
      enable-auto-commit: true
      auto-commit-interval: 100ms
      properties:
        session.timeout.ms: 15000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Defining the Data Model

A Java class is used to model the structure of an Ethereum transaction. This object will be serialized into JSON format before being sent to Kafka.

import java.math.BigInteger;

public class EthTransactions {
    private String hash;
    private String blockHash;
    private BigInteger blockNumber;
    private String from;
    private String to;
    private BigInteger value;
    private BigInteger timestamp;
    private Integer transactionsType;

    // Constructor
    public EthTransactions(String hash, String blockHash, BigInteger blockNumber, String from, String to, BigInteger value, BigInteger timestamp, Integer transactionsType) {
        this.hash = hash;
        this.blockHash = blockHash;
        this.blockNumber = blockNumber;
        this.from = from;
        this.to = to;
        this.value = value;
        this.timestamp = timestamp;
        this.transactionsType = transactionsType;
    }

    // Standard Getter and Setter methods for each field
    public String getHash() { return hash; }
    public void setHash(String hash) { this.hash = hash; }
    // ... other getters and setters
}

Data Processing and Kafka Integration

The core logic involves fetching blocks, processing their transactions, and publishing them to a Kafka topic.

The Data Synchronization Class

This class handles the interaction with Web3J, processes block data, and manages Kafka producers.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameterNumber;
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.protocol.core.methods.response.EthBlockNumber;
import org.web3j.protocol.core.methods.response.EthGetCode;
import org.web3j.protocol.core.methods.response.Transaction;
import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class EtherDataPipeline {
    private static final Logger logger = LoggerFactory.getLogger(EtherDataPipeline.class);
    private KafkaTemplate<String, String> kafkaTemplate;
    private Web3j web3j;
    private BigInteger blockNumber;
    private Gson gson;

    public EtherDataPipeline(KafkaTemplate<String, String> kafkaTemplate, Web3j web3j) throws ExecutionException, InterruptedException {
        this.web3j = web3j;
        this.kafkaTemplate = kafkaTemplate;
        this.blockNumber = getLastBlockNumber();
        this.gson = new Gson();
    }

    private BigInteger getLastBlockNumber() throws ExecutionException, InterruptedException {
        EthBlockNumber ethBlockNumber = this.web3j.ethBlockNumber().sendAsync().get();
        BigInteger currentBlock = ethBlockNumber.getBlockNumber();
        logger.info("Current BlockNumber: {}", currentBlock);
        return currentBlock;
    }

    public void startSyncTransactionsData() throws ExecutionException, InterruptedException {
        BigInteger latestBlock = getLastBlockNumber();
        if (this.blockNumber.equals(latestBlock)) {
            return; // No new blocks
        }
        this.blockNumber = latestBlock;
        this.syncTransactionsDataToKafka();
    }

    private void syncTransactionsDataToKafka() throws ExecutionException, InterruptedException {
        DefaultBlockParameterNumber blockParam = new DefaultBlockParameterNumber(this.blockNumber);
        EthBlock ethBlock = this.web3j.ethGetBlockByNumber(blockParam, true).sendAsync().get();
        List<EthBlock.TransactionResult> transactionResults = ethBlock.getBlock().getTransactions();
        BigInteger timestamp = ethBlock.getBlock().getTimestamp();

        for (EthBlock.TransactionResult txResult : transactionResults) {
            Transaction tx = (Transaction) txResult;
            Integer transactionType = isContractTransaction(tx.getTo());
            EthTransactions ethTx = new EthTransactions(
                    tx.getHash(), tx.getBlockHash(), tx.getBlockNumber(),
                    tx.getFrom(), tx.getTo(), tx.getValue(),
                    timestamp, transactionType
            );
            String jsonMessage = this.gson.toJson(ethTx);
            this.kafkaTemplate.send("ods_transactions", jsonMessage);
        }
    }

    private Integer isContractTransaction(String address) throws ExecutionException, InterruptedException {
        if (address == null) {
            return 1; // Contract creation transaction
        }
        DefaultBlockParameterNumber blockParam = new DefaultBlockParameterNumber(this.blockNumber);
        EthGetCode ethGetCode = web3j.ethGetCode(address, blockParam).sendAsync().get();
        String code = ethGetCode.getResult();
        return (code.equals("0x")) ? 0 : 1; // 0 for normal, 1 for contract
    }
}

Initiating the Data Sync Service

To start the continuous data synchronization process as soon as the SpringBoot application is running, we implement an ApplicationRunner.

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.web3j.protocol.Web3j;

@Slf4j
@Component
public class InitApplicationDataSyncRunner implements ApplicationRunner {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private Web3j web3j;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        EtherDataPipeline etherDataPipeline = new EtherDataPipeline(kafkaTemplate, web3j);
        while (true) {
            etherDataPipeline.startSyncTransactionsData();
            Thread.sleep(12000); // Poll approximately every 12 seconds (avg block time)
        }
    }
}

This implementation provides a foundational workflow for streaming Ethereum data. For a production-ready system, critical enhancements like checkpointing for fault tolerance, managing backpressure, and optimizing connection pools are essential. 👉 Explore more strategies for robust data pipelines

Frequently Asked Questions

What is the primary use case for fetching Ethereum block data?

This data is crucial for on-chain analytics. Common use cases include identifying trending smart contracts by transaction volume, analyzing transaction flows for specific tokens or NFTs, tracking wallet activity, and building real-time dashboards that reflect the current state of the blockchain.

Why use a service like Alchemy instead of running my own node?

Running a self-hosted Ethereum node requires significant storage, bandwidth, and maintenance to stay synced. Service providers offer managed, scalable, and highly available access to the blockchain, which simplifies development, reduces infrastructure overhead, and ensures reliable connectivity for your applications.

How does the code differentiate between a regular and a contract transaction?

The code checks the bytecode at the recipient's to address. If the ethGetCode call returns 0x (meaning no code is present), it is classified as a regular value transfer between externally owned accounts (EOAs). If it returns any other bytecode, the transaction is interacting with a smart contract.

What is the purpose of the infinite loop in the ApplicationRunner?

The loop continuously polls the Ethereum network for the latest block number. If a new block has been mined, it fetches that block's data and sends its transactions to Kafka. In a real-world scenario, you would add a delay (e.g., Thread.sleep()) to align with the average Ethereum block time of around 12 seconds.

How can I make this data pipeline more robust for production?

Key improvements include implementing a persistence layer to store the last processed block number, enabling recovery from failures. Additionally, consider using reactive programming for better concurrency, adding comprehensive logging and monitoring, and implementing retry logic with exponential backoff for network calls.

Can this method be used to fetch historical block data?

Absolutely. Instead of always tracking the latest block, you can modify the logic to iterate through a range of historical block numbers. However, be mindful of the large volume of data and the potential rate limits imposed by your node provider when fetching historical information.