高性能链上事件监听与索引服务,支持自定义 ABI 解析、事件过滤与全文检索。
┌─────────────────────────────────────────────────────────────┐
│                       gRPC API Layer                        │
├─────────────────────────────────────────────────────────────┤
│  Query Service  │  Subscribe Service  │  Admin Service      │
├─────────────────────────────────────────────────────────────┤
│                       Event Processor                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │ ABI Parser  │  │Event Filter │  │ Transformer │          │
│  └─────────────┘  └─────────────┘  └─────────────┘          │
├─────────────────────────────────────────────────────────────┤
│  Block Fetcher  │  Kafka Queue  │  Reorg Handler            │
├─────────────────────────────────────────────────────────────┤
│  ClickHouse (Events)  │  Elasticsearch (Search)             │
└─────────────────────────────────────────────────────────────┘
1// EventParser 动态解析链上事件2type EventParser struct {3 abiCache sync.Map // contract -> parsed ABI4 client *ethclient.Client5}6 7// ParseEventLog 解析原始事件日志8func (p *EventParser) ParseEventLog(9 ctx context.Context,10 log types.Log,11) (*ParsedEvent, error) {12 // 获取或加载合约 ABI13 abi, err := p.getContractABI(ctx, log.Address)14 if err != nil {15 // 未知合约,返回原始数据16 return &ParsedEvent{17 Raw: log,18 EventName: "Unknown",19 Decoded: nil,20 }, nil21 }22 23 // 根据 topic[0] 查找事件定义24 eventSig := log.Topics[0]25 event, err := abi.EventByID(eventSig)26 if err != nil {27 return nil, fmt.Errorf("event not found: %w", err)28 }29 30 // 解码 indexed 参数 (topics[1:])31 indexed := make([]interface{}, 0)32 for i, input := range event.Inputs {33 if input.Indexed && i+1 < len(log.Topics) {34 val, _ := p.decodeIndexed(input.Type, log.Topics[i+1])35 indexed = append(indexed, val)36 }37 }38 39 // 解码 non-indexed 参数 (data)40 nonIndexed := make(map[string]interface{})41 if len(log.Data) > 0 {42 err = abi.UnpackIntoMap(nonIndexed, event.Name, log.Data)43 if err != nil {44 return nil, fmt.Errorf("unpack data: %w", err)45 }46 }47 48 return &ParsedEvent{49 Raw: log,50 EventName: event.Name,51 Contract: log.Address,52 BlockNumber: log.BlockNumber,53 TxHash: log.TxHash,54 Indexed: indexed,55 Decoded: nonIndexed,56 Timestamp: time.Now(),57 }, nil58}1// ReorgHandler 处理链重组事件2type ReorgHandler struct {3 db *ClickHouseDB4 blockCache *lru.Cache5 confirmations int6}7 8// DetectReorg 检测链重组9func (h *ReorgHandler) DetectReorg(10 ctx context.Context,11 newBlock *types.Block,12) (*ReorgEvent, error) {13 // 获取缓存的父区块哈希14 cachedParent, ok := h.blockCache.Get(newBlock.NumberU64() - 1)15 if !ok {16 return nil, nil // 无缓存,无法检测17 }18 19 // 比较父区块哈希20 if cachedParent.(common.Hash) != newBlock.ParentHash() {21 // 检测到重组!22 reorgDepth := h.findReorgDepth(ctx, newBlock)23 24 return &ReorgEvent{25 DetectedAt: newBlock.NumberU64(),26 ReorgDepth: reorgDepth,27 OldBranch: h.getOldBranchBlocks(reorgDepth),28 NewBranch: 
newBlock.Hash(),29 }, nil30 }31 32 return nil, nil33}34 35// HandleReorg 执行重组回滚36func (h *ReorgHandler) HandleReorg(37 ctx context.Context,38 event *ReorgEvent,39) error {40 log.Warn("Chain reorg detected",41 "depth", event.ReorgDepth,42 "from_block", event.DetectedAt-uint64(event.ReorgDepth),43 )44 45 // 1. 删除受影响区块的事件46 startBlock := event.DetectedAt - uint64(event.ReorgDepth)47 err := h.db.DeleteEventsAfterBlock(ctx, startBlock)48 if err != nil {49 return fmt.Errorf("delete events: %w", err)50 }51 52 // 2. 清理缓存53 for i := startBlock; i <= event.DetectedAt; i++ {54 h.blockCache.Remove(i)55 }56 57 // 3. 触发重新索引58 h.triggerReindex(ctx, startBlock)59 60 return nil61}