Easy-Go-Web3
知识图谱Go 教程React Web3智能合约
需求分析系统设计设计模式Go 微服务
项目实战DevOps
Go 生态React 生态智能合约生态Web3 生态AI × Web3工具箱Web3 公司远程Web3求职
🎯 AA 工程师面试手册博客
GitHub
项目实战区块链事件索引器
中级基础设施5-6周

区块链事件索引器

高性能链上事件监听与索引服务,支持自定义 ABI 解析、事件过滤与全文检索

技术栈

GoClickHouseKafkaElasticsearchgRPC

核心功能

多链事件监听(EVM 兼容链)
自定义 ABI 动态解析
事件过滤与订阅机制
ClickHouse 高速写入与查询
Elasticsearch 全文检索
链重组(Reorg)处理

系统架构

┌─────────────────────────────────────────────────────────────┐
│                     gRPC API Layer                           │
├─────────────────────────────────────────────────────────────┤
│  Query Service  │  Subscribe Service  │  Admin Service      │
├─────────────────────────────────────────────────────────────┤
│                    Event Processor                           │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │ ABI Parser  │  │Event Filter │  │ Transformer │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
├─────────────────────────────────────────────────────────────┤
│  Block Fetcher  │  Kafka Queue   │  Reorg Handler          │
├─────────────────────────────────────────────────────────────┤
│  ClickHouse (Events)  │  Elasticsearch (Search)            │
└─────────────────────────────────────────────────────────────┘

课程章节

第一章:区块同步引擎

高效区块拉取策略3小时
断点续传与状态恢复2小时
链重组检测与处理3小时

第二章:事件解析系统

ABI 动态加载与缓存2小时
Event Log 解码实现3小时
自定义事件过滤器2小时

第三章:高速存储层

ClickHouse 表设计与分区3小时
Kafka 消息队列集成3小时
批量写入优化2小时

第四章:检索与订阅

Elasticsearch 索引设计2小时
gRPC 流式订阅实现3小时
查询性能优化2小时

核心代码实现

事件解析器

go
1// EventParser 动态解析链上事件
2type EventParser struct {
3 abiCache sync.Map // contract -> parsed ABI
4 client *ethclient.Client
5}
6
7// ParseEventLog 解析原始事件日志
8func (p *EventParser) ParseEventLog(
9 ctx context.Context,
10 log types.Log,
11) (*ParsedEvent, error) {
12 // 获取或加载合约 ABI
13 abi, err := p.getContractABI(ctx, log.Address)
14 if err != nil {
15 // 未知合约,返回原始数据
16 return &ParsedEvent{
17 Raw: log,
18 EventName: "Unknown",
19 Decoded: nil,
20 }, nil
21 }
22
23 // 根据 topic[0] 查找事件定义
24 eventSig := log.Topics[0]
25 event, err := abi.EventByID(eventSig)
26 if err != nil {
27 return nil, fmt.Errorf("event not found: %w", err)
28 }
29
30 // 解码 indexed 参数 (topics[1:])
31 indexed := make([]interface{}, 0)
32 for i, input := range event.Inputs {
33 if input.Indexed && i+1 < len(log.Topics) {
34 val, _ := p.decodeIndexed(input.Type, log.Topics[i+1])
35 indexed = append(indexed, val)
36 }
37 }
38
39 // 解码 non-indexed 参数 (data)
40 nonIndexed := make(map[string]interface{})
41 if len(log.Data) > 0 {
42 err = abi.UnpackIntoMap(nonIndexed, event.Name, log.Data)
43 if err != nil {
44 return nil, fmt.Errorf("unpack data: %w", err)
45 }
46 }
47
48 return &ParsedEvent{
49 Raw: log,
50 EventName: event.Name,
51 Contract: log.Address,
52 BlockNumber: log.BlockNumber,
53 TxHash: log.TxHash,
54 Indexed: indexed,
55 Decoded: nonIndexed,
56 Timestamp: time.Now(),
57 }, nil
58}

链重组处理

go
1// ReorgHandler 处理链重组事件
2type ReorgHandler struct {
3 db *ClickHouseDB
4 blockCache *lru.Cache
5 confirmations int
6}
7
8// DetectReorg 检测链重组
9func (h *ReorgHandler) DetectReorg(
10 ctx context.Context,
11 newBlock *types.Block,
12) (*ReorgEvent, error) {
13 // 获取缓存的父区块哈希
14 cachedParent, ok := h.blockCache.Get(newBlock.NumberU64() - 1)
15 if !ok {
16 return nil, nil // 无缓存,无法检测
17 }
18
19 // 比较父区块哈希
20 if cachedParent.(common.Hash) != newBlock.ParentHash() {
21 // 检测到重组!
22 reorgDepth := h.findReorgDepth(ctx, newBlock)
23
24 return &ReorgEvent{
25 DetectedAt: newBlock.NumberU64(),
26 ReorgDepth: reorgDepth,
27 OldBranch: h.getOldBranchBlocks(reorgDepth),
28 NewBranch: newBlock.Hash(),
29 }, nil
30 }
31
32 return nil, nil
33}
34
35// HandleReorg 执行重组回滚
36func (h *ReorgHandler) HandleReorg(
37 ctx context.Context,
38 event *ReorgEvent,
39) error {
40 log.Warn("Chain reorg detected",
41 "depth", event.ReorgDepth,
42 "from_block", event.DetectedAt-uint64(event.ReorgDepth),
43 )
44
45 // 1. 删除受影响区块的事件
46 startBlock := event.DetectedAt - uint64(event.ReorgDepth)
47 err := h.db.DeleteEventsAfterBlock(ctx, startBlock)
48 if err != nil {
49 return fmt.Errorf("delete events: %w", err)
50 }
51
52 // 2. 清理缓存
53 for i := startBlock; i <= event.DetectedAt; i++ {
54 h.blockCache.Remove(i)
55 }
56
57 // 3. 触发重新索引
58 h.triggerReindex(ctx, startBlock)
59
60 return nil
61}
DeFi 投资组合追踪器多链桥接 API 服务
Easy-Go-Web3

构建 Go 后端与 Web3 的学习之路。从基础到进阶,从理论到实践,助你成为全栈区块链开发者。

学习路径

  • 知识图谱
  • Go 教程
  • Go 微服务
  • 面试手册

资源中心

  • 工具箱
  • DevOps 工具
  • Web3 生态
  • 博客

© 2025 Easy-Go-Web3. All rights reserved.

Created withbyhardybao