golang中消息总线 EventBus 是 Go 语言中轻量级的事件总线库,核心用于实现组件间的解耦通信(发布 - 订阅模式),支持普通事件、异步事件和定时事件。
首先通过 go get 安装依赖,在终端执行以下命令:
go get github.com/asaskevich/EventBus
核心使用步骤 初始化 EventBus 创建一个事件总线实例,有两种常用方式:
package mainimport ( "fmt" "github.com/asaskevich/EventBus" ) func main () { globalBus := EventBus.Default() customBus := EventBus.New() }
订阅事件(Subscribe) 通过 Subscribe(topic string, handler interface{}) 绑定「事件主题(topic)」和「处理函数(handler)」,一个主题可绑定多个处理函数。
注意 :处理函数需满足「参数数量与发布事件的参数一致」,且返回值可选(若有返回值,发布时需处理)。
func handleHello (name string ) { fmt.Printf("Hello, %s!\n" , name) } func handleAdd (a, b int ) int { return a + b } func main () { bus := EventBus.Default() _ = bus.Subscribe("hello" , handleHello) _ = bus.Subscribe("add" , handleAdd) }
发布事件(Publish) 通过 Publish(topic string, args …interface{}) 触发指定主题的所有订阅函数,并传递参数。若处理函数有返回值,Publish 会返回所有返回值的切片。
func main () { bus := EventBus.Default() _ = bus.Subscribe("hello" , handleHello) _ = bus.Subscribe("add" , handleAdd) _ = bus.Publish("hello" , "Go Developer" ) results, _ := bus.Publish("add" , 3 , 5 ) fmt.Println("3 + 5 =" , results[0 ].(int )) }
取消订阅(Unsubscribe) 通过 Unsubscribe(topic string, handler interface{}) 解除主题与处理函数的绑定,需确保传入的「主题」和「处理函数」与订阅时完全一致。
func main () { bus := EventBus.Default() _ = bus.Subscribe("hello" , handleHello) _ = bus.Publish("hello" , "Test" ) _ = bus.Unsubscribe("hello" , handleHello) _ = bus.Publish("hello" , "After Unsubscribe" ) }
进阶特性 异步发布事件(PublishAsync) PublishAsync(topic string, args ...interface{})
会在新的 Goroutine 中执行处理函数,避免阻塞当前流程(适合耗时操作)。
import "time" func slowHandle (msg string ) { time.Sleep(2 * time.Second) fmt.Println("Slow handle:" , msg) } func main () { bus := EventBus.Default() _ = bus.Subscribe("slow" , slowHandle) _ = bus.PublishAsync("slow" , "Async Task" ) fmt.Println("Main process continues..." ) time.Sleep(3 * time.Second) }
定时事件(SubscribeOnceAsync) SubscribeOnceAsync(topic string, handler interface{}, delay time.Duration) 实现「延迟指定时间后执行一次处理函数」,且仅执行一次(类似定时任务)。
func main () { bus := EventBus.Default() _ = bus.SubscribeOnceAsync("delayed" , handleHello, 3 *time.Second) fmt.Println("Waiting for delayed event..." ) time.Sleep(4 * time.Second) }
检查订阅状态
func main () { bus := EventBus.Default() _ = bus.Subscribe("hello" , handleHello) fmt.Println("Has 'hello' callback?" , bus.HasCallback("hello" )) callbacks := bus.Callbacks("hello" ) fmt.Println("Number of callbacks:" , len (callbacks)) }
注意事项
参数匹配 :发布事件的参数数量、类型必须与订阅函数的参数一致,否则会触发运行时错误。
并发安全 :EventBus 内部已做并发安全处理,支持多 Goroutine 同时订阅 / 发布。
返回值处理 :若订阅函数有返回值,Publish 会返回 []interface{}
类型的结果切片,需通过类型断言(如 results[0].(int)
)转换为具体类型。
资源释放 :不再使用的总线,可通过 bus.Close()
关闭(释放内部 Goroutine,避免内存泄漏)。
Golang 协程并发限制的简单方法 除了工作池模式,Go 语言还提供了一些更简单的方法来限制并发协程数量。以下是几种简单易用的方法:
使用带缓冲的通道作为信号量 这是最简单直接的方法,使用一个带缓冲的通道作为计数信号量:
package mainimport ( "fmt" "sync" "time" ) func main () { const maxConcurrent = 3 const totalTasks = 10 sem := make (chan struct {}, maxConcurrent) var wg sync.WaitGroup for i := 1 ; i <= totalTasks; i++ { wg.Add(1 ) go func (taskID int ) { defer wg.Done() sem <- struct {}{} defer func () { <-sem }() fmt.Printf("开始处理任务 %d\n" , taskID) time.Sleep(2 * time.Second) fmt.Printf("完成任务 %d\n" , taskID) }(i) } wg.Wait() fmt.Println("所有任务已完成" ) }
使用 errgroup 包 (Go 1.20+) 如果你使用 Go 1.20 或更高版本,可以使用 golang.org/x/sync/errgroup
包中的 SetLimit
方法:
package mainimport ( "context" "fmt" "time" "golang.org/x/sync/errgroup" ) func main () { const maxConcurrent = 3 const totalTasks = 10 g, _ := errgroup.WithContext(context.Background()) g.SetLimit(maxConcurrent) for i := 1 ; i <= totalTasks; i++ { taskID := i g.Go(func () error { fmt.Printf("开始处理任务 %d\n" , taskID) time.Sleep(2 * time.Second) fmt.Printf("完成任务 %d\n" , taskID) return nil }) } if err := g.Wait(); err != nil { fmt.Printf("任务执行出错: %v\n" , err) } else { fmt.Println("所有任务已完成" ) } }
使用简单的计数器 使用原子计数器或互斥锁来限制并发数:
package mainimport ( "fmt" "sync" "sync/atomic" "time" ) func main () { const maxConcurrent = 3 const totalTasks = 10 var ( wg sync.WaitGroup current int32 ) for i := 1 ; i <= totalTasks; i++ { for { if atomic.LoadInt32(¤t) < maxConcurrent { break } time.Sleep(10 * time.Millisecond) } atomic.AddInt32(¤t, 1 ) wg.Add(1 ) go func (taskID int ) { defer func () { atomic.AddInt32(¤t, -1 ) wg.Done() }() fmt.Printf("开始处理任务 %d (当前并发: %d)\n" , taskID, atomic.LoadInt32(¤t)) time.Sleep(2 * time.Second) fmt.Printf("完成任务 %d\n" , taskID) }(i) } wg.Wait() fmt.Println("所有任务已完成" ) }
使用第三方库 (如 “go.uber.org/zap
“ 作者的 “golang.org/x/sync/semaphore
“)
package mainimport ( "context" "fmt" "time" "golang.org/x/sync/semaphore" ) func main () { const maxConcurrent = 3 const totalTasks = 10 sem := semaphore.NewWeighted(int64 (maxConcurrent)) ctx := context.Background() for i := 1 ; i <= totalTasks; i++ { taskID := i go func () { if err := sem.Acquire(ctx, 1 ); err != nil { fmt.Printf("获取信号量失败: %v\n" , err) return } defer sem.Release(1 ) fmt.Printf("开始处理任务 %d\n" , taskID) time.Sleep(2 * time.Second) fmt.Printf("完成任务 %d\n" , taskID) }() } time.Sleep(time.Duration(totalTasks/maxConcurrent+1 ) * 2 * time.Second) fmt.Println("所有任务可能已完成" ) }
sync.WaitGroup 详细说明与示例 sync.WaitGroup
是 Go 语言中用于等待一组 goroutine 完成执行的同步原语。它非常适用于需要等待多个并发操作全部完成后才能继续执行的场景。
核心方法 sync.WaitGroup
提供三个主要方法:
Add(delta int) :增加或减少等待的 goroutine 数量
Done() :相当于 Add(-1),表示一个 goroutine 已完成
Wait() :阻塞当前 goroutine,直到所有等待的 goroutine 都完成
基本使用模式
var wg sync.WaitGroupfor i := 0 ; i < n; i++ { wg.Add(1 ) go func () { defer wg.Done() }() } wg.Wait()
详细示例 示例 1: 基本用法 package mainimport ( "fmt" "sync" "time" ) func main () { var wg sync.WaitGroup for i := 1 ; i <= 3 ; i++ { wg.Add(1 ) go func (id int ) { defer wg.Done() fmt.Printf("Worker %d: 开始工作\n" , id) time.Sleep(time.Duration(id) * time.Second) fmt.Printf("Worker %d: 工作完成\n" , id) }(i) } fmt.Println("主goroutine: 等待所有worker完成..." ) wg.Wait() fmt.Println("主goroutine: 所有worker已完成!" ) }
输出结果: 主goroutine: 等待所有worker完成... Worker 1: 开始工作 Worker 3: 开始工作 Worker 2: 开始工作 Worker 1: 工作完成 Worker 2: 工作完成 Worker 3: 工作完成 主goroutine: 所有worker已完成!
示例 2: 并行处理任务 package mainimport ( "fmt" "math/rand" "sync" "time" ) func processTask (id int , wg *sync.WaitGroup) { defer wg.Done() sleepTime := time.Duration(rand.Intn(3 )+1 ) * time.Second fmt.Printf("任务 %d: 需要 %v 处理时间\n" , id, sleepTime) time.Sleep(sleepTime) fmt.Printf("任务 %d: 完成\n" , id) } func main () { rand.Seed(time.Now().UnixNano()) var wg sync.WaitGroup for i := 1 ; i <= 10 ; i++ { wg.Add(1 ) go processTask(i, &wg) } fmt.Println("主程序: 等待所有任务完成..." ) wg.Wait() fmt.Println("主程序: 所有任务已完成!" ) }
示例 3: 分批处理与错误处理 package mainimport ( "errors" "fmt" "sync" "time" ) func main () { var wg sync.WaitGroup var mu sync.Mutex var errorsList []error tasks := []string {"任务A" , "任务B" , "任务C" , "任务D" , "任务E" } for i, task := range tasks { wg.Add(1 ) go func (taskID int , taskName string ) { defer wg.Done() if taskID%3 == 0 { mu.Lock() errorsList = append (errorsList, errors.New(fmt.Sprintf("任务 %s 执行失败" , taskName))) mu.Unlock() return } fmt.Printf("正在处理: %s\n" , taskName) time.Sleep(500 * time.Millisecond) fmt.Printf("完成: %s\n" , taskName) }(i, task) } wg.Wait() if len (errorsList) > 0 { fmt.Printf("处理完成,但有 %d 个错误:\n" , len (errorsList)) for _, err := range errorsList { fmt.Printf(" - %v\n" , err) } } else { fmt.Println("所有任务成功完成!" ) } }
示例 4: 结合通道限制并发数 package mainimport ( "fmt" "sync" "time" ) func main () { var wg sync.WaitGroup maxConcurrent := 3 semaphore := make (chan struct {}, maxConcurrent) totalTasks := 10 for i := 1 ; i <= totalTasks; i++ { wg.Add(1 ) semaphore <- struct {}{} go func (taskID int ) { defer func () { <-semaphore wg.Done() }() fmt.Printf("任务 %d 开始 (当前并发: %d/%d)\n" , taskID, len (semaphore), maxConcurrent) time.Sleep(2 * time.Second) fmt.Printf("任务 %d 完成\n" , taskID) }(i) } wg.Wait() close (semaphore) fmt.Println("所有任务处理完毕!" ) }
重要注意事项
Add 必须在 goroutine 外部调用 :在启动 goroutine 之前调用 Add,否则可能出现竞态条件
wg.Add(1 ) go func () { defer wg.Done() }() go func () { wg.Add(1 ) defer wg.Done() }()
使用 defer 调用 Done :确保即使在 goroutine 发生 panic 时也能减少计数器
不要复制 WaitGroup :总是传递 WaitGroup 的指针
计数器不能为负 :如果 Done() 调用次数超过 Add(),会引发 panic