paint-brush
Cách tôi xây dựng hệ thống Pub/Sub tương thích với Redis bằng Golangtừ tác giả@kelvinm
2,869 lượt đọc
2,869 lượt đọc

Cách tôi xây dựng hệ thống Pub/Sub tương thích với Redis bằng Golang

từ tác giả Kelvin Clement M.18m2024/04/28
Read on Terminal Reader

dài quá đọc không nổi

Trong năm qua, tôi đã xây dựng EchoVault, một giải pháp thay thế Redis có thể nhúng cho hệ sinh thái Golang. EchoVault nhằm mục đích sao chép hầu hết các tính năng của Redis trong khi cung cấp cả giao diện nhúng và giao diện máy khách-máy chủ tương thích với các máy khách Redis hiện có bằng giao thức RESP. Một trong những tính năng được triển khai trong EchoVault là tính năng Pub/Sub. Bài viết này là hướng dẫn ngắn gọn về cách triển khai mô-đun Pub/Sub tại thời điểm viết bài.
featured image - Cách tôi xây dựng hệ thống Pub/Sub tương thích với Redis bằng Golang
Kelvin Clement M. HackerNoon profile picture
0-item

Trong năm qua, tôi đã xây dựng EchoVault , một giải pháp thay thế Redis có thể nhúng cho hệ sinh thái Golang. EchoVault nhằm mục đích sao chép hầu hết các tính năng của Redis trong khi cung cấp cả giao diện nhúng và giao diện máy khách-máy chủ tương thích với các máy khách Redis hiện có bằng giao thức RESP.


Một trong những tính năng được triển khai trong EchoVault là tính năng Pub/Sub. Bài viết này là hướng dẫn ngắn gọn về cách triển khai mô-đun Pub/Sub tại thời điểm viết bài.

Pub/Sub là gì?

Pub/Sub là viết tắt của xuất bản/đăng ký. Mô hình này cho phép người tiêu dùng đăng ký một số kênh nhất định. Nhà sản xuất xuất bản tin nhắn tới các kênh và tất cả người tiêu dùng đã đăng ký kênh đó đều nhận được tin nhắn.


Trong ngữ cảnh của chúng tôi, chúng tôi phải kích hoạt quy trình Go để đăng ký các kênh được chỉ định khi EchoVault được nhúng. Khi chạy ở chế độ máy khách-máy chủ, chúng tôi phải cho phép kết nối TCP của máy khách đăng ký các kênh.


Về mặt xuất bản, quy trình Go sẽ có thể xuất bản lên kênh ở chế độ nhúng và kết nối máy khách TCP sẽ có thể xuất bản tin nhắn lên kênh ở chế độ máy khách-máy chủ.

Yêu cầu

Trước khi bắt đầu, chúng ta cần xác định các yêu cầu và điểm số của việc thực hiện này. EchoVault không triển khai tất cả các tính năng có sẵn trong Redis Pub/Sub tại thời điểm viết bài. Tuy nhiên, các tính năng cốt lõi quan trọng nhất được thực hiện. Đây là những gì chúng ta có thể làm với EchoVault PubSub:


  1. Cho phép máy khách TCP đăng ký danh sách các kênh bằng lệnh SUBSCRIBE channel [channel …] . Máy chủ sẽ gửi cho kết nối máy khách xác nhận đăng ký.


  2. Một phiên bản EchoVault được nhúng sẽ có thể đăng ký danh sách các kênh.


  3. Cho phép máy khách TCP đăng ký một mẫu bằng cách sử dụng PSUBSCRIBE pattern [pattern …] trong đó mẫu là một chuỗi toàn cầu cho phép khách hàng nhận các tin nhắn được xuất bản tới tất cả các kênh thỏa mãn mẫu đó.


  4. Một phiên bản EchoVault được nhúng sẽ có thể đăng ký vào danh sách các mẫu.


  5. Cho phép máy khách TCP xuất bản tin nhắn lên một kênh bằng lệnh PUBLISH channel message .


  6. Xuất bản lên kênh từ phiên bản EchoVault được nhúng.


  7. Cho phép máy khách TCP hủy đăng ký khỏi các kênh và mẫu bằng cách sử dụng các lệnh UNSUBSCRIBE channel [channel …]PUNSUBSCRIBE pattern [pattern …] tương ứng.


  8. Cho phép kết nối máy khách TCP sử dụng lệnh PUBSUB CHANNELS [pattern] để xem một mảng chứa các kênh khớp với mẫu đã cho. Nếu không có mẫu nào được cung cấp thì tất cả các kênh đang hoạt động sẽ được trả về. Kênh hoạt động là kênh có một hoặc nhiều người đăng ký.


  9. Cho phép máy khách TCP sử dụng lệnh PUBSUB NUMPAT để xem số lượng mẫu hiện được máy khách đăng ký.


  10. Cung cấp API nhúng để xem số lượng mẫu hoạt động.


  11. Cho phép máy khách TCP sử dụng lệnh PUBSUB NUMSUB [channel [channel ...]] để xem một mảng các mảng chứa tên kênh được cung cấp và số lượng máy khách hiện đang đăng ký kênh.


  12. Cung cấp API nhúng để xem số lượng người đăng ký trong các kênh nhất định.

Thực hiện

Bây giờ chúng ta đã đặt ra các yêu cầu, hãy bắt tay vào thực hiện. Ở cấp độ cao, chúng ta sẽ có cấu trúc PubSub cung cấp tất cả chức năng PubSub. Chúng ta cũng sẽ có cấu trúc Channel cung cấp chức năng cho các kênh và kênh mẫu.


Đối với phần này, chúng ta sẽ sử dụng gói tidwall/resp. Người dùng EchoVault sử dụng gói này để gửi phản hồi RESP tới người đăng ký kênh. Gói gobwas/glob xử lý logic mẫu hình cầu.

Kênh

Đầu tiên, chúng ta sẽ tạo cấu trúc Channel và tất cả các phương thức của nó. Chúng ta sẽ có hai loại kênh: kênh thông thường và kênh mẫu.

Kênh thông thường là các kênh có tên và không có mẫu nào liên quan đến chúng. Các kênh mẫu sẽ sử dụng mẫu làm tên và mẫu toàn cầu được biên dịch sẽ được liên kết với kênh.


Các kênh mẫu được sử dụng để đăng ký các mẫu. Nếu không, các kênh thông thường sẽ được sử dụng.


Cấu trúc Channel có dạng sau:


 type Channel struct { name string // Channel name. This can be a glob pattern string. pattern glob.Glob // Compiled glob pattern. This is nil if the channel is not a pattern channel. subscribersRWMut sync.RWMutex // RWMutex to concurrency control when accessing channel subscribers. subscribers map[*net.Conn]*resp.Conn // Map containing the channel subscribers. messageChan *chan string // Messages published to this channel will be sent to this channel. }


Chúng tôi sẽ sử dụng mẫu tùy chọn để tạo một phiên bản kênh mới. Dưới đây là hai tùy chọn có sẵn:


 // WithName option sets the channels name. func WithName(name string) func(channel *Channel) { return func(channel *Channel) { channel.name = name } } // WithPattern option sets the compiled glob pattern for the channel if it's a pattern channel. func WithPattern(pattern string) func(channel *Channel) { return func(channel *Channel) { channel.name = pattern channel.pattern = glob.MustCompile(pattern) } } func NewChannel(options ...func(channel *Channel)) *Channel { messageChan := make(chan string, 4096) // messageChan is buffered. This could be a configurable value. channel := &Channel{ name: "", pattern: nil, subscribersRWMut: sync.RWMutex{}, subscribers: make(map[*net.Conn]*resp.Conn), messageChan: &messageChan, } for _, option := range options { option(channel) } return channel }


Phương thức đầu tiên cho cấu trúc Channel là phương thức Start . Phương thức này bắt đầu một goroutine lắng nghe messageChan để phát các tin nhắn tới tất cả những người đăng ký của nó. Đây là cách thực hiện:


 func (ch *Channel) Start() { go func() { for { message := <-*ch.messageChan ch.subscribersRWMut.RLock() for _, conn := range ch.subscribers { go func(conn *resp.Conn) { if err := conn.WriteArray([]resp.Value{ resp.StringValue("message"), resp.StringValue(ch.name), resp.StringValue(message), }); err != nil { log.Println(err) } }(conn) } ch.subscribersRWMut.RUnlock() } }() }


Goroutine nhận tin nhắn tiếp theo từ messageChan , lấy khóa đọc cho người đăng ký, phát tin nhắn đến từng người đăng ký và sau đó mở khóa đọc. Như bạn có thể thấy, gói tidwall/resp cho phép chúng tôi gửi các giá trị RESP đến kết nối máy khách một cách dễ dàng.


Hầu hết khách hàng Redis mong muốn định dạng tin nhắn pub/sub là một mảng có hình dạng sau: [“message”, <channel name>, <message string>]. Đó là những gì được phát sóng bởi EchoVault.


Bạn có thể nhận thấy rằng chúng tôi đang gửi tin nhắn tới resp.Conn chứ không phải net.Conn . Chúng tôi đang sử dụng resp.Conn vì nó cung cấp các phương thức trợ giúp để ghi giá trị RESP vào nó, chẳng hạn như WriteArray . Về cơ bản, nó là một trình bao bọc xung quanh net.Conn như chúng ta sẽ thấy trong phương thức Subscribe :


 func (ch *Channel) Subscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() // Acquire write-lock because we'll be modifying the subscriber map. defer ch.subscribersRWMut.Unlock() // If the connection does not exist in the subscriber map, add it. if _, ok := ch.subscribers[conn]; !ok { ch.subscribers[conn] = resp.NewConn(*conn) } _, ok := ch.subscribers[conn] return ok }


Cùng với phương thức Đăng ký, chúng ta cũng cần phương thức Hủy đăng ký:


 func (ch *Channel) Unsubscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() if _, ok := ch.subscribers[conn]; !ok { return false } delete(ch.subscribers, conn) return true }


Phương thức Hủy đăng ký trả về true nếu kết nối được tìm thấy và xóa khỏi bản đồ người đăng ký. Nếu không, nó sẽ trả về false . Ngoài các phương thức trên, còn có một số phương thức trợ giúp khác cho cấu trúc Channel :


 func (ch *Channel) Name() string { return ch.name } func (ch *Channel) Pattern() glob.Glob { return ch.pattern } func (ch *Channel) Publish(message string) { *ch.messageChan <- message } // IsActive returns true when the channel has 1 or more subscribers. func (ch *Channel) IsActive() bool { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() active := len(ch.subscribers) > 0 return active } // NumSubs returns the number of subscribers for this channel. func (ch *Channel) NumSubs() int { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() n := len(ch.subscribers) return n } // Subscribers returns a copy of the subscriber map. func (ch *Channel) Subscribers() map[*net.Conn]*resp.Conn { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() subscribers := make(map[*net.Conn]*resp.Conn, len(ch.subscribers)) for k, v := range ch.subscribers { subscribers[k] = v } return subscribers }

PubSub

Chúng tôi sẽ sử dụng mô-đun PubSub để tương tác với các kênh. Hình dạng của cấu trúc PubSub như sau:


 type PubSub struct { channels []*Channel // Slice of references to channels channelsRWMut sync.RWMutex // RWMutex for concurrency controls when accessing channels } func NewPubSub() *PubSub { return &PubSub{ channels: []*Channel{}, channelsRWMut: sync.RWMutex{}, } }


Phương thức đầu tiên là phương thức Subscribe . Phương pháp này xử lý việc đăng ký máy khách vào (các) kênh được chỉ định.


 func (ps *PubSub) Subscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) { ps.channelsRWMut.Lock() // Acquire write-lock as we may edit the slice of channels. defer ps.channelsRWMut.Unlock() r := resp.NewConn(*conn) // Wrap net.Conn connection with resp.Conn. action := "subscribe" if withPattern { action = "psubscribe" } // Loop through all the channels that the client has requested to subscribe to. for i := 0; i < len(channels); i++ { // Check if channel with given name exists // If it does, subscribe the connection to the channel // If it does not, create the channel and subscribe to it channelIdx := slices.IndexFunc(ps.channels, func(channel *Channel) bool { return channel.name == channels[i] }) if channelIdx == -1 { // If the channel does not exist, create new channel, start it, and subscribe to it. var newChan *Channel if withPattern { newChan = NewChannel(WithPattern(channels[i])) } else { newChan = NewChannel(WithName(channels[i])) } newChan.Start() if newChan.Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(newChan.name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } ps.channels = append(ps.channels, newChan) // Append the new channel to the list of channels. } else { // Subscribe to existing channel if ps.channels[channelIdx].Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(ps.channels[channelIdx].name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } } } }


Để đăng ký một ứng dụng khách vào một kênh, EchoVault sẽ lắng nghe lệnh SUBSCRIBE hoặc PSUBSCRIBE . Khi nhận được lệnh từ máy khách, kết nối TCP của máy khách cùng với danh sách các kênh sẽ được chuyển tới phương thức Subscribe . Khi khách hàng đăng ký bằng lệnh PSUBSCRIBE , tham số withPatterns sẽ là true và giá trị của biến action sẽ được đặt thành “psubscribe”.


Phương thức Unsubscribe cho phép khách hàng hủy đăng ký các kênh hoặc mẫu dựa trên việc họ đã sử dụng SUBSCRIBE hay PSUBSCRIBE . Việc thực hiện nó như sau:


 func (ps *PubSub) Unsubscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() action := "unsubscribe" if withPattern { action = "punsubscribe" } unsubscribed := make(map[int]string) // A map of all the channels/patterns successfully unsubscribed from. idx := 1 // idx holds the 1-based index of the channel/pattern unsubscribed from. if len(channels) <= 0 { if !withPattern { // If the channels slice is empty, and no pattern is provided // unsubscribe from all channels. for _, channel := range ps.channels { if channel.pattern != nil { // Skip pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } else { // If the channels slice is empty, and pattern is provided // unsubscribe from all patterns. for _, channel := range ps.channels { if channel.pattern == nil { // Skip non-pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } // Unsubscribe from channels where the name exactly matches channel name. // If unsubscribing from a pattern, also unsubscribe from all channel whose // names exactly matches the pattern name. for _, channel := range ps.channels { // For each channel in PubSub for _, c := range channels { // For each channel name provided if channel.name == c && channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } // If withPattern is true, unsubscribe from channels where pattern matches pattern provided, // also unsubscribe from channels where the name matches the given pattern. if withPattern { for _, pattern := range channels { g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If it's a pattern channel, directly compare the patterns if channel.pattern != nil && channel.name == pattern { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } continue } // If this is a regular channel, check if the channel name matches the pattern given if g.Match(channel.name) { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } } // Construct a RESP response confirming the channels/patterns unsubscribed from. res := fmt.Sprintf("*%d\r\n", len(unsubscribed)) for key, value := range unsubscribed { res += fmt.Sprintf("*3\r\n+%s\r\n$%d\r\n%s\r\n:%d\r\n", action, len(value), value, key) } return []byte(res) }


Khi khách hàng hủy đăng ký kênh/mẫu, họ sẽ nhận được phản hồi chứa một mảng. Mỗi mảng bên trong chứa hành động (hủy đăng ký/chấm đăng ký), tên kênh/mẫu và chỉ mục.


Phương pháp tiếp theo là phương pháp Xuất bản. Phương thức này chấp nhận tin nhắn và tên kênh, sau đó xử lý việc xuất bản tin nhắn nội bộ.


 func (ps *PubSub) Publish(_ context.Context, message string, channelName string) { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() // Loop through all of the existing channels. for _, channel := range ps.channels { // If it's a regular channel, check if the channel name matches the name given. if channel.pattern == nil { if channel.name == channelName { channel.Publish(message) // Publish the message to the channel. } continue } // If it's a glob pattern channel, check if the provided channel name matches the pattern. if channel.pattern.Match(channelName) { channel.Publish(message) // Publish the message to the channel } } }


Phương thức tiếp theo là phương thức Channels , xử lý lệnh PUBSUB CHANNELS [pattern] :


 func (ps *PubSub) Channels(pattern string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int var res string // If pattern is an empty string, return all the active channels. if pattern == "" { for _, channel := range ps.channels { if channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } res = fmt.Sprintf("*%d\r\n%s", count, res) return []byte(res) } g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If channel is a pattern channel, then directly compare the channel name to pattern. if channel.pattern != nil && channel.name == pattern && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 continue } // Channel is not a pattern channel. Check if the channel name matches the provided glob pattern. if g.Match(channel.name) && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } // Return a RESP array containing all the active channel names. return []byte(fmt.Sprintf("*%d\r\n%s", count, res)) }


2 phương thức sau đây là phương thức NumPatNumSub để xử lý PUBSUB NUMPAT

PUBSUB NUMSUB [channel [channel …]] lệnh tương ứng.


 func (ps *PubSub) NumPat() int { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int for _, channel := range ps.channels { if channel.pattern != nil && channel.IsActive() { count += 1 } } return count } func (ps *PubSub) NumSub(channels []string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() res := fmt.Sprintf("*%d\r\n", len(channels)) for _, channel := range channels { // If it's a pattern channel, skip it chanIdx := slices.IndexFunc(ps.channels, func(c *Channel) bool { return c.name == channel }) if chanIdx == -1 { res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:0\r\n", len(channel), channel) continue } res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:%d\r\n", len(channel), channel, ps.channels[chanIdx].NumSubs()) } return []byte(res) }


API nhúng

Mô-đun PubSub mong muốn một kết nối được chuyển để đăng ký một kênh khách hàng. Tuy nhiên, khi nhúng EchoVault, không có kết nối TCP nào tới máy khách có thể được sử dụng để đăng ký. Để giải quyết vấn đề này, chúng tôi sử dụng net.Pipe để có được cả hai đầu kết nối.


Một đầu của kết nối được chuyển tới bộ xử lý lệnh, bộ xử lý này sử dụng nó để đăng ký kênh được chỉ định. Đầu kia của kết nối được sử dụng trong hàm ReadPubSubMessage được trả về.


 type conn struct { readConn *net.Conn writeConn *net.Conn } var connections map[string]conn // ReadPubSubMessage is returned by the SUBSCRIBE and PSUBSCRIBE functions. // // This function is lazy, therefore it needs to be invoked in order to read the next message. // When the message is read, the function returns a string slice with 3 elements. // Index 0 holds the event type which in this case will be "message". Index 1 holds the channel name. // Index 2 holds the actual message. type ReadPubSubMessage func() []string // SUBSCRIBE subscribes the caller to the list of provided channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"SUBSCRIBE"}, channels...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // UNSUBSCRIBE unsubscribes the caller from the given channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to unsubscribe from. func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"UNSUBSCRIBE"}, channels...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) } // PSUBSCRIBE subscribes the caller to the list of provided glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"PSUBSCRIBE"}, patterns...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // PUNSUBSCRIBE unsubscribes the caller from the given glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to unsubscribe from. func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"PUNSUBSCRIBE"}, patterns...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }


Dưới đây là ví dụ về cách sử dụng API ĐĂNG KÝ và XUẤT BẢN:


 // Subscribe to multiple EchoVault channels. readMessage := server.SUBSCRIBE("subscriber1", "channel_1", "channel_2", "channel_3") wg.Add(1) go func() { wg.Done() for { message := readMessage() fmt.Printf("EVENT: %s, CHANNEL: %s, MESSAGE: %s\n", message[0], message[1], message[2]) } }() wg.Wait() wg.Add(1) go func() { for i := 1; i <= 3; i++ { // Simulating delay. <-time.After(1 * time.Second) // Publish message to each EchoVault channel. _, _ = server.PUBLISH(fmt.Sprintf("channel_%d", i), "Hello!") } wg.Done() }() wg.Wait()

Phần kết luận

Bài viết này tóm tắt nhanh về cách triển khai Pub/Sub trong EchoVault. Tất nhiên, rất nhiều bối cảnh xung quanh đoạn mã được chia sẻ ở đây không thể gói gọn trong một bài viết. Nếu bạn quan tâm đến ngữ cảnh, hãy xem kho lưu trữ GitHub của EchoVault và xem những gì chúng tôi đang xây dựng. Nếu bạn thích những gì bạn thấy, một ngôi sao Github sẽ được đánh giá rất cao!