在当今微服务和分布式系统盛行的背景下,事件驱动架构(Event-Driven Architecture,EDA)扮演着一个至关重要的角色,此架构的设计使得服务间可以通过事件进行同步或异步通信,替代了传统的直接接口调用。基于事件的交互方式,促进了服务之间的松耦合,提高系统的可扩展性。
发布-订阅模式是实现事件驱动架构的模式之一,它允许系统的不同组件或服务发布事件,而其他组件或服务可以订阅这些事件并根据事件内容进行响应。相信大部分开发者都接触过这一模式,常见的技术实现有消息队列(MQ)和 Redis 发布/订阅(PUB/SUB)功能等。
在 Go 语言中,我们可以利用其强大的 channel 和并发机制来实现发布-订阅模式。本文将深入探讨如何在 Go 中实现一个简单的事件总线,这是发布-订阅模式的具体实现。
准备好了吗?准备一杯你最喜欢的咖啡或茶,随着本文一探究竟吧。
事件总线是发布-订阅模式的具体实现,它作为发布者和订阅者的中间件,管理着事件传递与分发,确保事件从发布者顺利地传达到订阅者。
图片
事件总线的优势主要包括:
接下来将介绍如何在 Go 语言中实现一个简单的事件总线,它包含以下关键功能:
项目源码地址:https://github.com/chenmingyong0423/go-eventbus
type Event struct { Payload any}
Event 是一个封装事件的结构体,其中 Payload 为事件的上下文信息,类型是 any。
type ( EventChan chan Event)type EventBus struct { mu sync.RWMutex subscribers map[string][]EventChan}func NewEventBus() *EventBus { return &EventBus{ subscribers: make(map[string][]EventChan), }}
EventChan 是一个类型别名,定义为传递 Event 结构体的通道 chan Event。
EventBus 为事件总线的定义,它包含两个属性:
NewEventBus 函数用于创建一个新的 EventBus 事件总线。
事件总线实现了三个方法,分别为发布事件(Publish)和订阅事件(Subscribe)以及取消订阅事件(Unsubscribe)。
func (eb *EventBus) Publish(topic string, event Event) { eb.mu.RLock() defer eb.mu.RUnlock() // 复制一个新的订阅者列表,避免在发布事件时修改订阅者列表 subscribers := append([]EventChan{}, eb.subscribers[topic]...) gofunc() { for _, subscriber := range subscribers { subscriber <- event } }()}
Publish 方法用于发布事件。该方法接收两个参数:topic(主题)和 event (封装事件的对象)。
在 Publish 方法的实现中,首先通过 mu 属性获取读锁,以确保接下来的 subscribers 写操作是协程安全的。然后复制一份当前主题的订阅者列表 subscribers。接下来开启一个新 goroutine,在这个 goroutine 中遍历复制的订阅者列表,将事件通过通道发送给所有订阅者。完成这些操作后,释放读锁。
为什么会复制一个新的订阅者列表?
答:复制订阅者列表是为了在发送事件时保持数据的一致性和稳定性。由于向通道发送数据的操作是在一个新的 goroutine 中进行的,在发送数据时,读锁已经被释放,原来的订阅者列表可能会由于添加或删除订阅者而发生变化。如果直接使用原来的订阅者列表,可能会发生预料之外的错误(如向一个已经关闭的通道发送数据会产生 panic)。
func (eb *EventBus) Subscribe(topic string) EventChan { eb.mu.Lock() defer eb.mu.Unlock() ch := make(EventChan) eb.subscribers[topic] = append(eb.subscribers[topic], ch) return ch}
Subscribe 方法用于订阅特定主题的事件。该方法有接收一个 topic 参数,表示希望订阅的主题。通过此方法,可以获得一个 EventChan 通道,用于接收该主题的事件。
在 Subscribe 方法的实现中,首先通过 mu 属性获取写锁,以保证接下来的 subscribers 读写操作是协程安全的;接着创建一个新的 EventChan 通道 ch,将其添加到相应主题的订阅者切片中。完成这些操作后,释放写锁。
func (eb *EventBus) Unsubscribe(topic string, ch EventChan) { eb.mu.Lock() defer eb.mu.Unlock() if subscribers, ok := eb.subscribers[topic]; ok { for i, subscriber := range subscribers { if ch == subscriber { eb.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...) close(ch) // 清空通道 forrange ch { } return } } }}
Unsubscribe 方法用于取消订阅事件。该方法接收两个参数:topic(已订阅的主题)和 ch(被颁发的通道)。
在 Unsubscribe 方法里,首先通过 mu 属性获取写锁,以保证接下来的 subscribers 读写操作是协程安全的;然后检查 topic 主题是否存在对应的订阅者。如果存在,遍历该主题的订阅者切片,找到与 ch 相匹配的通道,将其从订阅者切片里移除并关闭该通道。然后清空通道。完成这些操作后,释放写锁。
// https://github.com/chenmingyong0423/blog/blob/master/tutorial-code/go/eventbus/main.gopackage mainimport ( "fmt" "time" "github.com/chenmingyong0423/go-eventbus")func main() { eventBus := eventbus.NewEventBus() // 订阅 post 主题事件 subscribe := eventBus.Subscribe("post") gofunc() { for event := range subscribe { fmt.Println(event.Payload) } }() eventBus.Publish("post", eventbus.Event{Payload: map[string]any{ "postId": 1, "title": "Go 事件驱动编程:实现一个简单的事件总线", "author": "陈明勇", }}) // 不存在订阅者的 topic eventBus.Publish("pay", eventbus.Event{Payload: "pay"}) time.Sleep(time.Second * 2) // 取消订阅 post 主题事件 eventBus.Unsubscribe("post", subscribe)}
本文实现的事件总线较为简单,如果要增强时间总线的灵活性,可靠性和易用性等方面,我们可以考虑扩展它,以下是一些建议:
本文深入探讨了在 Go 语言中实现简单事件总线的过程。通过利用 Go 语言的强大特性,如 channel 和并发机制,我们可以轻松地实现发布-订阅模式。
文章从事件总线的优势开始,介绍了其解耦、异步处理、可扩展性和错误隔离等特点。然后详细解释了如何定义事件数据结构和事件总线结构,并实现了发布、订阅和取消订阅事件的方法。最后,提出了一些可能的扩展方向,如事件持久化、通配符订阅、负载均衡和插件支持,以增强事件总线的灵活性和功能性。
通过阅读本文,你可以学会在 Go 语言中实现一个简单但功能强大的事件总线,并根据可能的需求进行扩展。
★项目源码地址:https://github.com/chenmingyong0423/go-eventbus
本文链接:http://www.28at.com/showinfo-26-88374-0.htmlGo 事件驱动编程:实现一个简单的事件总线
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com
上一篇: 要不要升级?Java 21强大的新特性,代码量减半
下一篇: 剖析 Figma 图形对象的基本属性