将处理过程分解为多个阶段,每个阶段通过 channel 连接。
数据需要经过多个处理步骤,每个步骤可以并行执行。
每个阶段是一个 goroutine,通过 channel 传递数据。
1// 区块处理 Pipeline2func BlockPipeline(ctx context.Context, blocks <-chan *types.Block) <-chan *ProcessedBlock {3 // Stage 1: 解析交易4 txs := parseTxStage(ctx, blocks)5 6 // Stage 2: 解码日志7 logs := decodeLogsStage(ctx, txs)8 9 // Stage 3: 存储数据10 results := storeStage(ctx, logs)11 12 return results13}14 15func parseTxStage(ctx context.Context, in <-chan *types.Block) <-chan *ParsedTx {16 out := make(chan *ParsedTx)17 go func() {18 defer close(out)19 for block := range in {20 for _, tx := range block.Transactions() {21 parsed := parseTx(tx)22 select {23 case out <- parsed:24 case <-ctx.Done():25 return26 }27 }28 }29 }()30 return out31}32 33func decodeLogsStage(ctx context.Context, in <-chan *ParsedTx) <-chan *DecodedLog {34 out := make(chan *DecodedLog)35 go func() {36 defer close(out)37 for tx := range in {38 for _, log := range tx.Logs {39 decoded := decodeLog(log)40 select {41 case out <- decoded:42 case <-ctx.Done():43 return44 }45 }46 }47 }()48 return out49}区块链数据索引器,流式处理区块、交易、事件。