Im letzten Jahr habe ich EchoVault entwickelt, eine einbettbare Redis-Alternative für das Golang-Ökosystem. EchoVault zielt darauf ab, die meisten Redis-Funktionen zu replizieren und gleichzeitig sowohl eine eingebettete Schnittstelle als auch eine Client-Server-Schnittstelle bereitzustellen, die mit vorhandenen Redis-Clients kompatibel ist, die das RESP-Protokoll verwenden.
Eine der in EchoVault implementierten Funktionen ist die Pub/Sub-Funktion. Dieser Artikel ist eine kurze Einführung in die Implementierung des Pub/Sub-Moduls zum Zeitpunkt des Schreibens.
Pub/Sub steht für Publizieren/Abonnieren. Mit diesem Muster können Verbraucher bestimmte Kanäle abonnieren. Produzenten veröffentlichen Nachrichten in Kanälen und alle Verbraucher, die diesen Kanal abonniert haben, erhalten die Nachricht.
In unserem Kontext müssen wir einem Go-Prozess ermöglichen, bestimmte Kanäle zu abonnieren, wenn EchoVault eingebettet ist. Beim Ausführen im Client-Server-Modus müssen wir einer Client-TCP-Verbindung erlauben, Kanäle zu abonnieren.
Auf der Veröffentlichungsseite sollte ein Go-Prozess in der Lage sein, im eingebetteten Modus in einem Kanal zu veröffentlichen, und eine TCP-Client-Verbindung sollte in der Lage sein, eine Nachricht in einem Kanal im Client-Server-Modus zu veröffentlichen.
Bevor wir beginnen, müssen wir die Anforderungen und die Bewertung dieser Implementierung ermitteln. EchoVault implementiert zum Zeitpunkt des Schreibens nicht alle in Redis Pub/Sub verfügbaren Funktionen. Die wichtigsten Kernfunktionen sind jedoch implementiert. Folgendes sollten wir mit EchoVault PubSub tun können:
Erlauben Sie einem TCP-Client, eine Liste von Kanälen mit dem Befehl SUBSCRIBE channel [channel …]
zu abonnieren. Der Server sollte der Client-Verbindung eine Bestätigung der Abonnements senden.
Eine eingebettete EchoVault-Instanz sollte in der Lage sein, eine Liste von Kanälen zu abonnieren.
Erlauben Sie einem TCP-Client, ein Muster mit PSUBSCRIBE pattern [pattern …]
zu abonnieren, wobei das Muster eine Glob-Zeichenfolge ist, die es dem Client ermöglicht, Nachrichten zu empfangen, die an alle Kanäle gesendet werden, die dem Muster entsprechen.
Eine eingebettete EchoVault-Instanz sollte in der Lage sein, eine Liste von Mustern zu abonnieren.
Erlauben Sie einem TCP-Client, mit dem Befehl PUBLISH channel message
eine Nachricht in einem Kanal zu veröffentlichen.
Veröffentlichen Sie von einer eingebetteten EchoVault-Instanz in einem Kanal.
Erlauben Sie TCP-Clients, sich mit den Befehlen UNSUBSCRIBE channel [channel …]
bzw. PUNSUBSCRIBE pattern [pattern …]
von Kanälen und Mustern abzumelden.
Erlaubt der TCP-Clientverbindung, den Befehl PUBSUB CHANNELS [pattern]
zu verwenden, um ein Array mit den Kanälen anzuzeigen, die dem angegebenen Muster entsprechen. Wenn kein Muster angegeben ist, werden alle aktiven Kanäle zurückgegeben. Aktive Kanäle sind Kanäle mit einem oder mehreren Abonnenten.
Erlauben Sie TCP-Clients, den Befehl PUBSUB NUMPAT
zu verwenden, um die Anzahl der Muster anzuzeigen, die aktuell von Clients abonniert werden.
Stellen Sie eine eingebettete API bereit, um die Anzahl der aktiven Muster anzuzeigen.
Erlauben Sie TCP-Clients, den Befehl PUBSUB NUMSUB [channel [channel ...]]
zu verwenden, um ein Array von Arrays anzuzeigen, das den angegebenen Kanalnamen und die Anzahl der Clients enthält, die aktuell den Kanal abonniert haben.
Stellen Sie eine eingebettete API bereit, um die Anzahl der Abonnenten in den angegebenen Kanälen anzuzeigen.
Nachdem wir nun die Anforderungen festgelegt haben, können wir mit der Implementierung beginnen. Auf hoher Ebene verfügen wir über eine PubSub-Struktur, die die gesamte PubSub-Funktionalität bereitstellt. Außerdem verfügen wir über eine Channel-Struktur, die die Funktionalität für Kanäle und Musterkanäle bereitstellt.
Für diesen Abschnitt verwenden wir das Paket tidwall/resp. EchoVault-Benutzer verwenden dieses Paket, um RESP-Antworten an Kanalabonnenten zu senden. Das Paket gobwas/glob behandelt die Glob-Musterlogik.
Zuerst erstellen wir die Channel
und alle ihre Methoden. Wir werden zwei Arten von Kanälen haben: normale Kanäle und Musterkanäle.
Normale Kanäle sind Kanäle mit Namen und ohne zugeordnete Muster. Musterkanäle verwenden das Muster als Namen und dem Kanal wird ein kompiliertes Glob-Muster zugeordnet.
Musterkanäle werden für das Abonnement von Mustern verwendet. Andernfalls werden reguläre Kanäle verwendet.
Die Channel
hat die folgende Form:
type Channel struct { name string // Channel name. This can be a glob pattern string. pattern glob.Glob // Compiled glob pattern. This is nil if the channel is not a pattern channel. subscribersRWMut sync.RWMutex // RWMutex to concurrency control when accessing channel subscribers. subscribers map[*net.Conn]*resp.Conn // Map containing the channel subscribers. messageChan *chan string // Messages published to this channel will be sent to this channel. }
Wir verwenden das Optionsmuster, um eine neue Kanalinstanz zu erstellen. Hier sind die beiden verfügbaren Optionen:
// WithName option sets the channels name. func WithName(name string) func(channel *Channel) { return func(channel *Channel) { channel.name = name } } // WithPattern option sets the compiled glob pattern for the channel if it's a pattern channel. func WithPattern(pattern string) func(channel *Channel) { return func(channel *Channel) { channel.name = pattern channel.pattern = glob.MustCompile(pattern) } } func NewChannel(options ...func(channel *Channel)) *Channel { messageChan := make(chan string, 4096) // messageChan is buffered. This could be a configurable value. channel := &Channel{ name: "", pattern: nil, subscribersRWMut: sync.RWMutex{}, subscribers: make(map[*net.Conn]*resp.Conn), messageChan: &messageChan, } for _, option := range options { option(channel) } return channel }
Die erste Methode für die Channel-Struktur ist die Start
Methode. Diese Methode startet eine Goroutine, die auf messageChan
nach Nachrichten lauscht, die an alle ihre Abonnenten gesendet werden sollen. Hier ist die Implementierung:
func (ch *Channel) Start() { go func() { for { message := <-*ch.messageChan ch.subscribersRWMut.RLock() for _, conn := range ch.subscribers { go func(conn *resp.Conn) { if err := conn.WriteArray([]resp.Value{ resp.StringValue("message"), resp.StringValue(ch.name), resp.StringValue(message), }); err != nil { log.Println(err) } }(conn) } ch.subscribersRWMut.RUnlock() } }() }
Die Goroutine nimmt die nächste Nachricht von messageChan
entgegen, erwirbt eine Lesesperre für die Abonnenten, sendet die Nachricht an jeden Abonnenten und gibt dann die Lesesperre frei. Wie Sie sehen, können wir mit dem Paket tidwall/resp problemlos RESP-Werte an die Clientverbindung senden.
Die meisten Redis-Clients erwarten, dass das Pub/Sub-Nachrichtenformat ein Array in der folgenden Form ist: [„Nachricht“, <Kanalname>, <Nachrichtenzeichenfolge>]. Dies wird von EchoVault gesendet.
Möglicherweise fällt Ihnen auf, dass wir die Nachricht an resp.Conn
und nicht net.Conn
senden. Wir verwenden resp.Conn
, weil es Hilfsmethoden zum Schreiben von RESP-Werten bereitstellt, wie z. B. WriteArray
. Es ist im Wesentlichen ein Wrapper um net.Conn
, wie wir in der Subscribe
Methode sehen werden:
func (ch *Channel) Subscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() // Acquire write-lock because we'll be modifying the subscriber map. defer ch.subscribersRWMut.Unlock() // If the connection does not exist in the subscriber map, add it. if _, ok := ch.subscribers[conn]; !ok { ch.subscribers[conn] = resp.NewConn(*conn) } _, ok := ch.subscribers[conn] return ok }
Neben der Subscribe-Methode benötigen wir auch eine Unsubscribe-Methode:
func (ch *Channel) Unsubscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() if _, ok := ch.subscribers[conn]; !ok { return false } delete(ch.subscribers, conn) return true }
Die Methode Unsubscribe gibt true
zurück, wenn die Verbindung gefunden und aus der Abonnentenzuordnung gelöscht wurde. Andernfalls gibt sie false
zurück. Zusätzlich zu den oben genannten Methoden gibt es noch einige weitere Hilfsmethoden für die Channel
Struktur:
func (ch *Channel) Name() string { return ch.name } func (ch *Channel) Pattern() glob.Glob { return ch.pattern } func (ch *Channel) Publish(message string) { *ch.messageChan <- message } // IsActive returns true when the channel has 1 or more subscribers. func (ch *Channel) IsActive() bool { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() active := len(ch.subscribers) > 0 return active } // NumSubs returns the number of subscribers for this channel. func (ch *Channel) NumSubs() int { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() n := len(ch.subscribers) return n } // Subscribers returns a copy of the subscriber map. func (ch *Channel) Subscribers() map[*net.Conn]*resp.Conn { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() subscribers := make(map[*net.Conn]*resp.Conn, len(ch.subscribers)) for k, v := range ch.subscribers { subscribers[k] = v } return subscribers }
Wir werden das PubSub-Modul verwenden, um mit den Kanälen zu interagieren. Die Form der PubSub-Struktur ist wie folgt:
type PubSub struct { channels []*Channel // Slice of references to channels channelsRWMut sync.RWMutex // RWMutex for concurrency controls when accessing channels } func NewPubSub() *PubSub { return &PubSub{ channels: []*Channel{}, channelsRWMut: sync.RWMutex{}, } }
Die erste Methode ist die Subscribe
Methode. Diese Methode kümmert sich um das Abonnieren des Clients für die angegebenen Kanäle.
func (ps *PubSub) Subscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) { ps.channelsRWMut.Lock() // Acquire write-lock as we may edit the slice of channels. defer ps.channelsRWMut.Unlock() r := resp.NewConn(*conn) // Wrap net.Conn connection with resp.Conn. action := "subscribe" if withPattern { action = "psubscribe" } // Loop through all the channels that the client has requested to subscribe to. for i := 0; i < len(channels); i++ { // Check if channel with given name exists // If it does, subscribe the connection to the channel // If it does not, create the channel and subscribe to it channelIdx := slices.IndexFunc(ps.channels, func(channel *Channel) bool { return channel.name == channels[i] }) if channelIdx == -1 { // If the channel does not exist, create new channel, start it, and subscribe to it. var newChan *Channel if withPattern { newChan = NewChannel(WithPattern(channels[i])) } else { newChan = NewChannel(WithName(channels[i])) } newChan.Start() if newChan.Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(newChan.name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } ps.channels = append(ps.channels, newChan) // Append the new channel to the list of channels. } else { // Subscribe to existing channel if ps.channels[channelIdx].Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(ps.channels[channelIdx].name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } } } }
Um einen Client für einen Kanal zu abonnieren, wartet EchoVault auf einen SUBSCRIBE
oder PSUBSCRIBE
Befehl. Wenn der Befehl von einem Client empfangen wird, wird die TCP-Verbindung des Clients zusammen mit der Liste der Kanäle an die Subscribe
Methode übergeben. Wenn ein Client mit dem PSUBSCRIBE
Befehl abonniert, ist der Parameter withPatterns
true
und der Wert der action
wird auf „psubscribe“ gesetzt.
Mit der Methode Unsubscribe
kann ein Client Kanäle oder Muster abbestellen, je nachdem, ob er SUBSCRIBE
oder PSUBSCRIBE
verwendet hat. Die Implementierung erfolgt wie folgt:
func (ps *PubSub) Unsubscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() action := "unsubscribe" if withPattern { action = "punsubscribe" } unsubscribed := make(map[int]string) // A map of all the channels/patterns successfully unsubscribed from. idx := 1 // idx holds the 1-based index of the channel/pattern unsubscribed from. if len(channels) <= 0 { if !withPattern { // If the channels slice is empty, and no pattern is provided // unsubscribe from all channels. for _, channel := range ps.channels { if channel.pattern != nil { // Skip pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } else { // If the channels slice is empty, and pattern is provided // unsubscribe from all patterns. for _, channel := range ps.channels { if channel.pattern == nil { // Skip non-pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } // Unsubscribe from channels where the name exactly matches channel name. // If unsubscribing from a pattern, also unsubscribe from all channel whose // names exactly matches the pattern name. for _, channel := range ps.channels { // For each channel in PubSub for _, c := range channels { // For each channel name provided if channel.name == c && channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } // If withPattern is true, unsubscribe from channels where pattern matches pattern provided, // also unsubscribe from channels where the name matches the given pattern. if withPattern { for _, pattern := range channels { g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If it's a pattern channel, directly compare the patterns if channel.pattern != nil && channel.name == pattern { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } continue } // If this is a regular channel, check if the channel name matches the pattern given if g.Match(channel.name) { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } } // Construct a RESP response confirming the channels/patterns unsubscribed from. res := fmt.Sprintf("*%d\r\n", len(unsubscribed)) for key, value := range unsubscribed { res += fmt.Sprintf("*3\r\n+%s\r\n$%d\r\n%s\r\n:%d\r\n", action, len(value), value, key) } return []byte(res) }
Wenn der Client sich von Kanälen/Mustern abmeldet, erhält er eine Antwort, die ein Array von Arrays enthält. Jedes innere Array enthält die Aktion (Abbestellen/Abmelden aufheben), den Kanal-/Musternamen und den Index.
Die nächste Methode ist die Publish-Methode. Diese Methode akzeptiert den Nachrichten- und Kanalnamen und kümmert sich dann intern um die Veröffentlichung der Nachricht.
func (ps *PubSub) Publish(_ context.Context, message string, channelName string) { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() // Loop through all of the existing channels. for _, channel := range ps.channels { // If it's a regular channel, check if the channel name matches the name given. if channel.pattern == nil { if channel.name == channelName { channel.Publish(message) // Publish the message to the channel. } continue } // If it's a glob pattern channel, check if the provided channel name matches the pattern. if channel.pattern.Match(channelName) { channel.Publish(message) // Publish the message to the channel } } }
Die nächste Methode ist die Channels
Methode, die den PUBSUB CHANNELS [pattern]
verarbeitet:
func (ps *PubSub) Channels(pattern string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int var res string // If pattern is an empty string, return all the active channels. if pattern == "" { for _, channel := range ps.channels { if channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } res = fmt.Sprintf("*%d\r\n%s", count, res) return []byte(res) } g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If channel is a pattern channel, then directly compare the channel name to pattern. if channel.pattern != nil && channel.name == pattern && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 continue } // Channel is not a pattern channel. Check if the channel name matches the provided glob pattern. if g.Match(channel.name) && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } // Return a RESP array containing all the active channel names. return []byte(fmt.Sprintf("*%d\r\n%s", count, res)) }
Die folgenden 2 Methoden sind die NumPat
und NumSub
Methoden zur Handhabung der PUBSUB NUMPAT
und
PUBSUB NUMSUB [channel [channel …]]
Befehle jeweils.
func (ps *PubSub) NumPat() int { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int for _, channel := range ps.channels { if channel.pattern != nil && channel.IsActive() { count += 1 } } return count } func (ps *PubSub) NumSub(channels []string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() res := fmt.Sprintf("*%d\r\n", len(channels)) for _, channel := range channels { // If it's a pattern channel, skip it chanIdx := slices.IndexFunc(ps.channels, func(c *Channel) bool { return c.name == channel }) if chanIdx == -1 { res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:0\r\n", len(channel), channel) continue } res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:%d\r\n", len(channel), channel, ps.channels[chanIdx].NumSubs()) } return []byte(res) }
Das PubSub-Modul erwartet, dass eine Verbindung übergeben wird, um einen Client für einen Kanal zu abonnieren. Wenn EchoVault jedoch eingebettet ist, gibt es keine TCP-Verbindung zu einem Client, die für ein Abonnement verwendet werden kann. Um dies zu umgehen, verwenden wir net.Pipe
, um beide Enden einer Verbindung abzurufen.
Ein Ende der Verbindung wird an den Befehlshandler übergeben, der es zum Abonnieren des angegebenen Kanals verwendet. Das andere Ende der Verbindung wird in der zurückgegebenen Funktion ReadPubSubMessage
verwendet.
type conn struct { readConn *net.Conn writeConn *net.Conn } var connections map[string]conn // ReadPubSubMessage is returned by the SUBSCRIBE and PSUBSCRIBE functions. // // This function is lazy, therefore it needs to be invoked in order to read the next message. // When the message is read, the function returns a string slice with 3 elements. // Index 0 holds the event type which in this case will be "message". Index 1 holds the channel name. // Index 2 holds the actual message. type ReadPubSubMessage func() []string // SUBSCRIBE subscribes the caller to the list of provided channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"SUBSCRIBE"}, channels...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // UNSUBSCRIBE unsubscribes the caller from the given channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to unsubscribe from. func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"UNSUBSCRIBE"}, channels...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) } // PSUBSCRIBE subscribes the caller to the list of provided glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"PSUBSCRIBE"}, patterns...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // PUNSUBSCRIBE unsubscribes the caller from the given glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to unsubscribe from. func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"PUNSUBSCRIBE"}, patterns...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }
Hier ist ein Beispiel für die Verwendung der SUBSCRIBE- und PUBLISH-API:
// Subscribe to multiple EchoVault channels. readMessage := server.SUBSCRIBE("subscriber1", "channel_1", "channel_2", "channel_3") wg.Add(1) go func() { wg.Done() for { message := readMessage() fmt.Printf("EVENT: %s, CHANNEL: %s, MESSAGE: %s\n", message[0], message[1], message[2]) } }() wg.Wait() wg.Add(1) go func() { for i := 1; i <= 3; i++ { // Simulating delay. <-time.After(1 * time.Second) // Publish message to each EchoVault channel. _, _ = server.PUBLISH(fmt.Sprintf("channel_%d", i), "Hello!") } wg.Done() }() wg.Wait()
Dieser Artikel war ein kurzer Überblick über die Pub/Sub-Implementierung in EchoVault. Natürlich gibt es zu dem hier geteilten Code viel Kontext, der nicht in einen Artikel passt. Wenn Sie sich für den Kontext interessieren, sehen Sie sich das GitHub-Repository von EchoVault an und sehen Sie sich an, was wir bauen. Wenn Ihnen gefällt, was Sie sehen, würden wir uns riesig über einen Github-Stern freuen!