Go语言中消息总线和sync.WaitGroup的简单使用

世间万物都别等失去了,再来睹物思人

Posted by yishuifengxiao on 2024-12-15

golang中消息总线

EventBus 是 Go 语言中轻量级的事件总线库,核心用于实现组件间的解耦通信(发布 - 订阅模式),支持普通事件、异步事件和定时事件。

首先通过 go get 安装依赖,在终端执行以下命令:

go get github.com/asaskevich/EventBus

核心使用步骤

初始化 EventBus

创建一个事件总线实例,有两种常用方式:

  • 默认全局总线:直接使用 EventBus.Default() 获取全局单例,适合简单场景。

  • 自定义实例:通过 EventBus.New() 创建新实例,适合多总线隔离的场景。

package main

import (
"fmt"
"github.com/asaskevich/EventBus"
)

func main() {
// 方式1:使用全局默认总线
globalBus := EventBus.Default()

// 方式2:创建自定义总线实例
customBus := EventBus.New()
}

订阅事件(Subscribe)

通过 Subscribe(topic string, handler interface{}) 绑定「事件主题(topic)」和「处理函数(handler)」,一个主题可绑定多个处理函数。

注意:处理函数需满足「参数数量与发布事件的参数一致」,且返回值可选(若有返回值,发布时需处理)。

// 定义事件处理函数(无返回值)
func handleHello(name string) {
fmt.Printf("Hello, %s!\n", name)
}

// 定义带返回值的处理函数
func handleAdd(a, b int) int {
return a + b
}

func main() {
bus := EventBus.Default()

// 订阅 "hello" 主题,绑定 handleHello 函数
_ = bus.Subscribe("hello", handleHello)

// 订阅 "add" 主题,绑定 handleAdd 函数
_ = bus.Subscribe("add", handleAdd)
}

发布事件(Publish)

通过 Publish(topic string, args …interface{}) 触发指定主题的所有订阅函数,并传递参数。若处理函数有返回值,Publish 会返回所有返回值的切片。

func main() {
bus := EventBus.Default()
_ = bus.Subscribe("hello", handleHello)
_ = bus.Subscribe("add", handleAdd)

// 1. 发布 "hello" 事件(无返回值,忽略结果)
_ = bus.Publish("hello", "Go Developer") // 输出:Hello, Go Developer!

// 2. 发布 "add" 事件(有返回值,需接收结果)
results, _ := bus.Publish("add", 3, 5)
fmt.Println("3 + 5 =", results[0].(int)) // 输出:3 + 5 = 8
}

取消订阅(Unsubscribe)

通过 Unsubscribe(topic string, handler interface{}) 解除主题与处理函数的绑定,需确保传入的「主题」和「处理函数」与订阅时完全一致。

func main() {
bus := EventBus.Default()
_ = bus.Subscribe("hello", handleHello)

// 发布事件(正常触发)
_ = bus.Publish("hello", "Test") // 输出:Hello, Test!

// 取消 "hello" 主题与 handleHello 的订阅
_ = bus.Unsubscribe("hello", handleHello)

// 再次发布(无处理函数,无输出)
_ = bus.Publish("hello", "After Unsubscribe")
}

进阶特性

异步发布事件(PublishAsync)

PublishAsync(topic string, args ...interface{}) 会在新的 Goroutine 中执行处理函数,避免阻塞当前流程(适合耗时操作)。

import "time"

func slowHandle(msg string) {
time.Sleep(2 * time.Second) // 模拟耗时操作
fmt.Println("Slow handle:", msg)
}

func main() {
bus := EventBus.Default()
_ = bus.Subscribe("slow", slowHandle)

// 异步发布:当前流程不阻塞,2秒后输出结果
_ = bus.PublishAsync("slow", "Async Task")
fmt.Println("Main process continues...") // 先输出此行
time.Sleep(3 * time.Second) // 等待异步任务完成
}

定时事件(SubscribeOnceAsync)

SubscribeOnceAsync(topic string, handler interface{}, delay time.Duration) 实现「延迟指定时间后执行一次处理函数」,且仅执行一次(类似定时任务)。

func main() {
bus := EventBus.Default()

// 3秒后执行一次 handleHello,主题为 "delayed"
_ = bus.SubscribeOnceAsync("delayed", handleHello, 3*time.Second)
fmt.Println("Waiting for delayed event...")
time.Sleep(4 * time.Second) // 等待定时任务执行(输出:Hello, Delayed!)
}

检查订阅状态

  • HasCallback(topic string) bool:判断指定主题是否有订阅函数。

  • Callbacks(topic string) []interface{}:获取指定主题的所有订阅函数。

func main() {
bus := EventBus.Default()
_ = bus.Subscribe("hello", handleHello)

// 检查 "hello" 主题是否有订阅
fmt.Println("Has 'hello' callback?", bus.HasCallback("hello")) // 输出:true

// 获取 "hello" 主题的所有订阅函数
callbacks := bus.Callbacks("hello")
fmt.Println("Number of callbacks:", len(callbacks)) // 输出:1
}

注意事项

  1. 参数匹配:发布事件的参数数量、类型必须与订阅函数的参数一致,否则会触发运行时错误。

  2. 并发安全:EventBus 内部已做并发安全处理,支持多 Goroutine 同时订阅 / 发布。

  3. 返回值处理:若订阅函数有返回值,Publish 会返回 []interface{} 类型的结果切片,需通过类型断言(如 results[0].(int))转换为具体类型。

  4. 资源释放:不再使用的总线,可通过 bus.Close() 关闭(释放内部 Goroutine,避免内存泄漏)。


Golang 协程并发限制的简单方法

除了工作池模式,Go 语言还提供了一些更简单的方法来限制并发协程数量。以下是几种简单易用的方法:

使用带缓冲的通道作为信号量

这是最简单直接的方法,使用一个带缓冲的通道作为计数信号量:

package main

import (
"fmt"
"sync"
"time"
)

func main() {
const maxConcurrent = 3 // 最大并发数
const totalTasks = 10 // 总任务数

sem := make(chan struct{}, maxConcurrent) // 信号量通道
var wg sync.WaitGroup

for i := 1; i <= totalTasks; i++ {
wg.Add(1)

go func(taskID int) {
defer wg.Done()

// 获取信号量
sem <- struct{}{}
defer func() { <-sem }() // 释放信号量

fmt.Printf("开始处理任务 %d\n", taskID)
time.Sleep(2 * time.Second) // 模拟耗时操作
fmt.Printf("完成任务 %d\n", taskID)
}(i)
}

wg.Wait()
fmt.Println("所有任务已完成")
}

使用 errgroup 包 (Go 1.20+)

如果你使用 Go 1.20 或更高版本,可以使用 golang.org/x/sync/errgroup 包中的 SetLimit 方法:

package main

import (
"context"
"fmt"
"time"

"golang.org/x/sync/errgroup"
)

func main() {
const maxConcurrent = 3
const totalTasks = 10

g, _ := errgroup.WithContext(context.Background())
g.SetLimit(maxConcurrent) // 设置最大并发数

for i := 1; i <= totalTasks; i++ {
taskID := i
g.Go(func() error {
fmt.Printf("开始处理任务 %d\n", taskID)
time.Sleep(2 * time.Second) // 模拟耗时操作
fmt.Printf("完成任务 %d\n", taskID)
return nil
})
}

if err := g.Wait(); err != nil {
fmt.Printf("任务执行出错: %v\n", err)
} else {
fmt.Println("所有任务已完成")
}
}

使用简单的计数器

使用原子计数器或互斥锁来限制并发数:

package main

import (
"fmt"
"sync"
"sync/atomic"
"time"
)

func main() {
const maxConcurrent = 3
const totalTasks = 10

var (
wg sync.WaitGroup
current int32
)

for i := 1; i <= totalTasks; i++ {
// 等待直到有可用的并发槽位
for {
if atomic.LoadInt32(&current) < maxConcurrent {
break
}
time.Sleep(10 * time.Millisecond) // 短暂休眠避免忙等待
}

atomic.AddInt32(&current, 1)
wg.Add(1)

go func(taskID int) {
defer func() {
atomic.AddInt32(&current, -1)
wg.Done()
}()

fmt.Printf("开始处理任务 %d (当前并发: %d)\n", taskID, atomic.LoadInt32(&current))
time.Sleep(2 * time.Second) // 模拟耗时操作
fmt.Printf("完成任务 %d\n", taskID)
}(i)
}

wg.Wait()
fmt.Println("所有任务已完成")
}

使用第三方库

(如 “go.uber.org/zap“ 作者的 “golang.org/x/sync/semaphore“)

package main

import (
"context"
"fmt"
"time"

"golang.org/x/sync/semaphore"
)

func main() {
const maxConcurrent = 3
const totalTasks = 10

sem := semaphore.NewWeighted(int64(maxConcurrent))
ctx := context.Background()

for i := 1; i <= totalTasks; i++ {
taskID := i
go func() {
// 获取信号量
if err := sem.Acquire(ctx, 1); err != nil {
fmt.Printf("获取信号量失败: %v\n", err)
return
}
defer sem.Release(1) // 释放信号量

fmt.Printf("开始处理任务 %d\n", taskID)
time.Sleep(2 * time.Second) // 模拟耗时操作
fmt.Printf("完成任务 %d\n", taskID)
}()
}

// 等待所有任务完成
time.Sleep(time.Duration(totalTasks/maxConcurrent+1) * 2 * time.Second)
fmt.Println("所有任务可能已完成")
}

sync.WaitGroup 详细说明与示例

sync.WaitGroup 是 Go 语言中用于等待一组 goroutine 完成执行的同步原语。它非常适用于需要等待多个并发操作全部完成后才能继续执行的场景。

核心方法

sync.WaitGroup 提供三个主要方法:

  1. Add(delta int):增加或减少等待的 goroutine 数量
  2. Done():相当于 Add(-1),表示一个 goroutine 已完成
  3. Wait():阻塞当前 goroutine,直到所有等待的 goroutine 都完成

基本使用模式

var wg sync.WaitGroup

// 启动多个goroutine前
for i := 0; i < n; i++ {
wg.Add(1) // 增加计数器
go func() {
defer wg.Done() // 完成后减少计数器
// 执行任务
}()
}

// 等待所有goroutine完成
wg.Wait()

详细示例

示例 1: 基本用法

package main

import (
"fmt"
"sync"
"time"
)

func main() {
var wg sync.WaitGroup

// 启动3个goroutine
for i := 1; i <= 3; i++ {
wg.Add(1) // 计数器加1

go func(id int) {
defer wg.Done() // 函数退出时计数器减1

fmt.Printf("Worker %d: 开始工作\n", id)
time.Sleep(time.Duration(id) * time.Second) // 模拟工作时间
fmt.Printf("Worker %d: 工作完成\n", id)
}(i)
}

fmt.Println("主goroutine: 等待所有worker完成...")
wg.Wait() // 阻塞直到计数器为0
fmt.Println("主goroutine: 所有worker已完成!")
}

输出结果:

主goroutine: 等待所有worker完成...
Worker 1: 开始工作
Worker 3: 开始工作
Worker 2: 开始工作
Worker 1: 工作完成
Worker 2: 工作完成
Worker 3: 工作完成
主goroutine: 所有worker已完成!

示例 2: 并行处理任务

package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

func processTask(id int, wg *sync.WaitGroup) {
defer wg.Done() // 确保在函数退出时调用

// 模拟随机处理时间
sleepTime := time.Duration(rand.Intn(3)+1) * time.Second
fmt.Printf("任务 %d: 需要 %v 处理时间\n", id, sleepTime)

time.Sleep(sleepTime)
fmt.Printf("任务 %d: 完成\n", id)
}

func main() {
rand.Seed(time.Now().UnixNano())
var wg sync.WaitGroup

// 创建10个任务
for i := 1; i <= 10; i++ {
wg.Add(1)
go processTask(i, &wg) // 传递WaitGroup的指针
}

fmt.Println("主程序: 等待所有任务完成...")
wg.Wait()
fmt.Println("主程序: 所有任务已完成!")
}

示例 3: 分批处理与错误处理

package main

import (
"errors"
"fmt"
"sync"
"time"
)

func main() {
var wg sync.WaitGroup
var mu sync.Mutex
var errorsList []error

tasks := []string{"任务A", "任务B", "任务C", "任务D", "任务E"}

for i, task := range tasks {
wg.Add(1)

go func(taskID int, taskName string) {
defer wg.Done()

// 模拟可能失败的操作
if taskID%3 == 0 { // 每第3个任务模拟失败
mu.Lock()
errorsList = append(errorsList,
errors.New(fmt.Sprintf("任务 %s 执行失败", taskName)))
mu.Unlock()
return
}

fmt.Printf("正在处理: %s\n", taskName)
time.Sleep(500 * time.Millisecond)
fmt.Printf("完成: %s\n", taskName)
}(i, task)
}

wg.Wait()

// 检查是否有错误
if len(errorsList) > 0 {
fmt.Printf("处理完成,但有 %d 个错误:\n", len(errorsList))
for _, err := range errorsList {
fmt.Printf(" - %v\n", err)
}
} else {
fmt.Println("所有任务成功完成!")
}
}

示例 4: 结合通道限制并发数

package main

import (
"fmt"
"sync"
"time"
)

func main() {
var wg sync.WaitGroup
maxConcurrent := 3 // 最大并发数
semaphore := make(chan struct{}, maxConcurrent)
totalTasks := 10

for i := 1; i <= totalTasks; i++ {
wg.Add(1)

// 获取信号量(限制并发)
semaphore <- struct{}{}

go func(taskID int) {
defer func() {
<-semaphore // 释放信号量
wg.Done() // 通知任务完成
}()

fmt.Printf("任务 %d 开始 (当前并发: %d/%d)\n",
taskID, len(semaphore), maxConcurrent)
time.Sleep(2 * time.Second)
fmt.Printf("任务 %d 完成\n", taskID)
}(i)
}

wg.Wait()
close(semaphore)
fmt.Println("所有任务处理完毕!")
}

重要注意事项

  1. Add 必须在 goroutine 外部调用:在启动 goroutine 之前调用 Add,否则可能出现竞态条件

    // 正确做法
    wg.Add(1)
    go func() {
    defer wg.Done()
    // ...
    }()

    // 错误做法(可能导致 Wait 在 Add 之前返回)
    go func() {
    wg.Add(1) // 错误:在goroutine内部调用Add
    defer wg.Done()
    // ...
    }()
  2. 使用 defer 调用 Done:确保即使在 goroutine 发生 panic 时也能减少计数器

  3. 不要复制 WaitGroup:总是传递 WaitGroup 的指针

  4. 计数器不能为负:如果 Done() 调用次数超过 Add(),会引发 panic