将任务分发给多个 worker 并行处理,然后汇总结果。
单个处理器无法满足吞吐量需求,需要并行加速。
Fan-out 分发任务,Fan-in 合并结果。
1func FanOut(ctx context.Context, in <-chan Task, workers int) []<-chan Result {2 channels := make([]<-chan Result, workers)3 for i := 0; i < workers; i++ {4 channels[i] = worker(ctx, in)5 }6 return channels7}8 9func FanIn(ctx context.Context, channels ...<-chan Result) <-chan Result {10 var wg sync.WaitGroup11 out := make(chan Result)12 13 output := func(c <-chan Result) {14 defer wg.Done()15 for result := range c {16 select {17 case out <- result:18 case <-ctx.Done():19 return20 }21 }22 }23 24 wg.Add(len(channels))25 for _, c := range channels {26 go output(c)27 }28 29 go func() {30 wg.Wait()31 close(out)32 }()33 34 return out35}36 37// 使用:并行获取多个代币价格38func GetTokenPrices(ctx context.Context, tokens []string) map[string]*big.Int {39 in := make(chan Task)40 go func() {41 defer close(in)42 for _, token := range tokens {43 in <- Task{ID: token, Payload: token}44 }45 }()46 47 workers := FanOut(ctx, in, 10)48 results := FanIn(ctx, workers...)49 50 prices := make(map[string]*big.Int)51 for result := range results {52 prices[result.TaskID] = result.Data.(*big.Int)53 }54 return prices55}并行查询多个链的数据、批量获取代币价格。