监听链上区块、交易、事件,支持多链(ETH/BSC/Polygon),WebSocket推送实时数据
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│  Blockchain │────▶│  Go Monitor  │────▶│  WebSocket  │
│  (ETH/BSC)  │     │   Service    │     │   Clients   │
└─────────────┘     └──────────────┘     └─────────────┘
                           │
                    ┌──────┴──────┐
                    ▼             ▼
               ┌──────────┐  ┌──────────┐
               │PostgreSQL│  │  Redis   │
               └──────────┘  └──────────┘
1package main2 3import (4 "context"5 "log"6 7 "github.com/ethereum/go-ethereum/ethclient"8)9 10type ChainConfig struct {11 Name string12 RPCURL string13 ChainID int6414 WSUrl string15}16 17type BlockchainMonitor struct {18 chains map[string]*ethclient.Client19 configs []ChainConfig20}21 22func NewBlockchainMonitor(configs []ChainConfig) *BlockchainMonitor {23 return &BlockchainMonitor{24 chains: make(map[string]*ethclient.Client),25 configs: configs,26 }27}28 29func (m *BlockchainMonitor) Connect(ctx context.Context) error {30 for _, cfg := range m.configs {31 client, err := ethclient.DialContext(ctx, cfg.RPCURL)32 if err != nil {33 return fmt.Errorf("failed to connect %s: %w", cfg.Name, err)34 }35 m.chains[cfg.Name] = client36 log.Printf("Connected to %s (ChainID: %d)", cfg.Name, cfg.ChainID)37 }38 return nil39}1func (m *BlockchainMonitor) SubscribeBlocks(ctx context.Context, chainName string) error {2 client, ok := m.chains[chainName]3 if !ok {4 return fmt.Errorf("chain %s not found", chainName)5 }6 7 headers := make(chan *types.Header)8 sub, err := client.SubscribeNewHead(ctx, headers)9 if err != nil {10 return fmt.Errorf("subscribe failed: %w", err)11 }12 13 go func() {14 for {15 select {16 case err := <-sub.Err():17 log.Printf("Subscription error: %v", err)18 // 重连逻辑19 return20 case header := <-headers:21 m.processBlock(ctx, chainName, header)22 case <-ctx.Done():23 return24 }25 }26 }()27 28 return nil29}30 31func (m *BlockchainMonitor) processBlock(ctx context.Context, chain string, header *types.Header) {32 block, err := m.chains[chain].BlockByHash(ctx, header.Hash())33 if err != nil {34 log.Printf("Failed to get block: %v", err)35 return36 }37 38 log.Printf("[%s] Block #%d: %d txs, gas used: %d", 39 chain, block.Number(), len(block.Transactions()), block.GasUsed())40 41 for _, tx := range block.Transactions() {42 m.processTransaction(chain, tx)43 }44}1type WSServer struct {2 clients map[*websocket.Conn]bool3 broadcast chan []byte4 register chan 
*websocket.Conn5 unregister chan *websocket.Conn6 mu sync.RWMutex7}8 9func (s *WSServer) Run() {10 for {11 select {12 case client := <-s.register:13 s.mu.Lock()14 s.clients[client] = true15 s.mu.Unlock()16 17 case client := <-s.unregister:18 s.mu.Lock()19 if _, ok := s.clients[client]; ok {20 delete(s.clients, client)21 client.Close()22 }23 s.mu.Unlock()24 25 case message := <-s.broadcast:26 s.mu.RLock()27 for client := range s.clients {28 err := client.WriteMessage(websocket.TextMessage, message)29 if err != nil {30 client.Close()31 delete(s.clients, client)32 }33 }34 s.mu.RUnlock()35 }36 }37}38 39func (s *WSServer) BroadcastBlock(block *BlockEvent) {40 data, _ := json.Marshal(block)41 s.broadcast <- data42}1type Storage struct {2 db *sql.DB3 redis *redis.Client4}5 6func (s *Storage) SaveBlock(ctx context.Context, block *BlockData) error {7 // 存储到 PostgreSQL8 _, err := s.db.ExecContext(ctx, `9 INSERT INTO blocks (chain, number, hash, parent_hash, timestamp, tx_count, gas_used)10 VALUES ($1, $2, $3, $4, $5, $6, $7)11 ON CONFLICT (chain, number) DO NOTHING12 `, block.Chain, block.Number, block.Hash, block.ParentHash, 13 block.Timestamp, block.TxCount, block.GasUsed)14 15 if err != nil {16 return err17 }18 19 // 缓存到 Redis (最近100个区块)20 key := fmt.Sprintf("block:%s:latest", block.Chain)21 data, _ := json.Marshal(block)22 s.redis.LPush(ctx, key, data)23 s.redis.LTrim(ctx, key, 0, 99)24 25 return nil26}27 28func (s *Storage) GetLatestBlocks(ctx context.Context, chain string, limit int) ([]*BlockData, error) {29 key := fmt.Sprintf("block:%s:latest", chain)30 results, err := s.redis.LRange(ctx, key, 0, int64(limit-1)).Result()31 if err != nil {32 return nil, err33 }34 35 blocks := make([]*BlockData, len(results))36 for i, r := range results {37 json.Unmarshal([]byte(r), &blocks[i])38 }39 return blocks, nil40}