Au cours de la dernière année, j'ai construit EchoVault , une alternative Redis intégrable pour l'écosystème Golang. EchoVault vise à répliquer la plupart des fonctionnalités de Redis tout en fournissant à la fois une interface intégrée et une interface client-serveur compatible avec les clients Redis existants utilisant le protocole RESP.
L'une des fonctionnalités implémentées dans EchoVault est la fonctionnalité Pub/Sub. Cet article est une brève présentation de la façon dont le module Pub/Sub a été implémenté au moment de la rédaction.
Pub/Sub signifie publier/s'abonner. Ce modèle permet aux consommateurs de s'abonner à certaines chaînes. Les producteurs publient des messages sur les chaînes et tous les consommateurs abonnés à cette chaîne reçoivent le message.
Dans notre contexte, nous devons permettre à un processus Go de s'abonner aux canaux spécifiés lorsqu'EchoVault est intégré. Lors de l'exécution en mode client-serveur, nous devons autoriser une connexion TCP client à s'abonner aux canaux.
Côté publication, un processus Go doit pouvoir publier sur un canal en mode intégré, et une connexion client TCP doit pouvoir publier un message sur un canal en mode client-serveur.
Avant de commencer, nous devons déterminer les exigences et le score de cette implémentation. EchoVault n'implémente pas toutes les fonctionnalités disponibles dans Redis Pub/Sub au moment de la rédaction. Cependant, les fonctionnalités de base les plus importantes sont implémentées. Voici ce que nous devrions pouvoir faire avec EchoVault PubSub :
Autoriser un client TCP à s'abonner à une liste de canaux à l'aide de la commande SUBSCRIBE channel [channel …]
. Le serveur doit envoyer à la connexion client une confirmation des abonnements.
Une instance EchoVault intégrée doit pouvoir s'abonner à une liste de canaux.
Autoriser un client TCP à s'abonner à un modèle à l'aide PSUBSCRIBE pattern [pattern …]
où le modèle est une chaîne globale qui permet au client de recevoir des messages publiés sur tous les canaux qui satisfont au modèle.
Une instance EchoVault intégrée doit pouvoir s'abonner à une liste de modèles.
Autoriser un client TCP à publier un message sur un canal à l'aide de la commande PUBLISH channel message
.
Publiez sur un canal à partir d’une instance EchoVault intégrée.
Autorisez les clients TCP à se désabonner des canaux et des modèles à l'aide des commandes UNSUBSCRIBE channel [channel …]
et PUNSUBSCRIBE pattern [pattern …]
, respectivement.
Autorisez la connexion client TCP à utiliser la commande PUBSUB CHANNELS [pattern]
pour afficher un tableau contenant les canaux qui correspondent au modèle donné. Si aucun modèle n'est fourni, tous les canaux actifs sont renvoyés. Les chaînes actives sont des chaînes avec un ou plusieurs abonnés.
Autorisez les clients TCP à utiliser la commande PUBSUB NUMPAT
pour afficher le nombre de modèles actuellement abonnés par les clients.
Fournissez une API intégrée pour afficher le nombre de modèles actifs.
Autorisez les clients TCP à utiliser la commande PUBSUB NUMSUB [channel [channel ...]]
pour afficher un tableau de tableaux contenant le nom du canal fourni et le nombre de clients actuellement abonnés au canal.
Fournissez une API intégrée pour afficher le nombre d'abonnés dans les chaînes données.
Maintenant que nous avons défini les exigences, passons à la mise en œuvre. À un niveau élevé, nous aurons une structure PubSub qui fournit toutes les fonctionnalités PubSub. Nous aurons également une structure Channel qui fournit les fonctionnalités des canaux et des canaux de modèle.
Pour cette section, nous utiliserons le package tidwall/resp. Les utilisateurs d'EchoVault utilisent ce package pour envoyer des réponses RESP aux abonnés de la chaîne. Le package gobwas/glob gère la logique du modèle global.
Tout d’abord, nous allons créer la structure Channel
et toutes ses méthodes. Nous aurons deux types de canaux : les canaux réguliers et les canaux de modèles.
Les chaînes normales sont des chaînes avec des noms et aucun modèle ne leur est associé. Les canaux de modèle utiliseront le modèle comme nom et un modèle global compilé sera associé au canal.
Les canaux de modèles sont utilisés pour les abonnements aux modèles. Sinon, les canaux réguliers sont utilisés.
La structure Channel
a la forme suivante :
type Channel struct { name string // Channel name. This can be a glob pattern string. pattern glob.Glob // Compiled glob pattern. This is nil if the channel is not a pattern channel. subscribersRWMut sync.RWMutex // RWMutex to concurrency control when accessing channel subscribers. subscribers map[*net.Conn]*resp.Conn // Map containing the channel subscribers. messageChan *chan string // Messages published to this channel will be sent to this channel. }
Nous utiliserons le modèle d'option pour créer une nouvelle instance de canal. Voici les deux options disponibles :
// WithName option sets the channels name. func WithName(name string) func(channel *Channel) { return func(channel *Channel) { channel.name = name } } // WithPattern option sets the compiled glob pattern for the channel if it's a pattern channel. func WithPattern(pattern string) func(channel *Channel) { return func(channel *Channel) { channel.name = pattern channel.pattern = glob.MustCompile(pattern) } } func NewChannel(options ...func(channel *Channel)) *Channel { messageChan := make(chan string, 4096) // messageChan is buffered. This could be a configurable value. channel := &Channel{ name: "", pattern: nil, subscribersRWMut: sync.RWMutex{}, subscribers: make(map[*net.Conn]*resp.Conn), messageChan: &messageChan, } for _, option := range options { option(channel) } return channel }
La première méthode pour la structure Channel est la méthode Start
. Cette méthode démarre une goroutine qui écoute sur messageChan
les messages à diffuser à tous ses abonnés. Voici la mise en œuvre :
func (ch *Channel) Start() { go func() { for { message := <-*ch.messageChan ch.subscribersRWMut.RLock() for _, conn := range ch.subscribers { go func(conn *resp.Conn) { if err := conn.WriteArray([]resp.Value{ resp.StringValue("message"), resp.StringValue(ch.name), resp.StringValue(message), }); err != nil { log.Println(err) } }(conn) } ch.subscribersRWMut.RUnlock() } }() }
La goroutine récupère le message suivant de messageChan
, acquiert un verrou de lecture pour les abonnés, diffuse le message à chaque abonné, puis libère le verrou de lecture. Comme vous pouvez le voir, le package tidwall/resp nous permet d'envoyer facilement les valeurs RESP à la connexion client.
La plupart des clients Redis s'attendent à ce que le format du message pub/sub soit un tableau ayant la forme suivante : ["message", <channel name>, <message string>]. C'est ce qui est diffusé par EchoVault.
Vous remarquerez peut-être que nous envoyons le message à resp.Conn
et non net.Conn
. Nous utilisons resp.Conn
car il fournit des méthodes d'assistance pour y écrire des valeurs RESP, telles que WriteArray
. Il s'agit essentiellement d'un wrapper autour net.Conn
comme nous le verrons dans la méthode Subscribe
:
func (ch *Channel) Subscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() // Acquire write-lock because we'll be modifying the subscriber map. defer ch.subscribersRWMut.Unlock() // If the connection does not exist in the subscriber map, add it. if _, ok := ch.subscribers[conn]; !ok { ch.subscribers[conn] = resp.NewConn(*conn) } _, ok := ch.subscribers[conn] return ok }
En plus de la méthode Subscribe, nous avons également besoin d'une méthode Unsubscribe :
func (ch *Channel) Unsubscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() if _, ok := ch.subscribers[conn]; !ok { return false } delete(ch.subscribers, conn) return true }
La méthode Unsubscribe renvoie true
si la connexion a été trouvée et supprimée de la carte des abonnés. Sinon, il renvoie false
. En plus des méthodes ci-dessus, il existe d'autres méthodes d'assistance pour la structure Channel
:
func (ch *Channel) Name() string { return ch.name } func (ch *Channel) Pattern() glob.Glob { return ch.pattern } func (ch *Channel) Publish(message string) { *ch.messageChan <- message } // IsActive returns true when the channel has 1 or more subscribers. func (ch *Channel) IsActive() bool { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() active := len(ch.subscribers) > 0 return active } // NumSubs returns the number of subscribers for this channel. func (ch *Channel) NumSubs() int { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() n := len(ch.subscribers) return n } // Subscribers returns a copy of the subscriber map. func (ch *Channel) Subscribers() map[*net.Conn]*resp.Conn { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() subscribers := make(map[*net.Conn]*resp.Conn, len(ch.subscribers)) for k, v := range ch.subscribers { subscribers[k] = v } return subscribers }
Nous utiliserons le module PubSub pour interagir avec les chaînes. La forme de la structure PubSub est la suivante :
type PubSub struct { channels []*Channel // Slice of references to channels channelsRWMut sync.RWMutex // RWMutex for concurrency controls when accessing channels } func NewPubSub() *PubSub { return &PubSub{ channels: []*Channel{}, channelsRWMut: sync.RWMutex{}, } }
La première méthode est la méthode Subscribe
. Cette méthode gère l'abonnement du client au(x) canal(s) spécifié(s).
func (ps *PubSub) Subscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) { ps.channelsRWMut.Lock() // Acquire write-lock as we may edit the slice of channels. defer ps.channelsRWMut.Unlock() r := resp.NewConn(*conn) // Wrap net.Conn connection with resp.Conn. action := "subscribe" if withPattern { action = "psubscribe" } // Loop through all the channels that the client has requested to subscribe to. for i := 0; i < len(channels); i++ { // Check if channel with given name exists // If it does, subscribe the connection to the channel // If it does not, create the channel and subscribe to it channelIdx := slices.IndexFunc(ps.channels, func(channel *Channel) bool { return channel.name == channels[i] }) if channelIdx == -1 { // If the channel does not exist, create new channel, start it, and subscribe to it. var newChan *Channel if withPattern { newChan = NewChannel(WithPattern(channels[i])) } else { newChan = NewChannel(WithName(channels[i])) } newChan.Start() if newChan.Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(newChan.name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } ps.channels = append(ps.channels, newChan) // Append the new channel to the list of channels. } else { // Subscribe to existing channel if ps.channels[channelIdx].Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(ps.channels[channelIdx].name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } } } }
Pour abonner un client à un canal, EchoVault écoute une commande SUBSCRIBE
ou PSUBSCRIBE
. Lorsque la commande est reçue d'un client, la connexion TCP du client, ainsi que la liste des canaux, sont transmises à la méthode Subscribe
. Lorsqu'un client s'abonne à l'aide de la commande PSUBSCRIBE
, le paramètre withPatterns
sera true
et la valeur de la variable action
sera définie sur « psubscribe ».
La méthode Unsubscribe
permet à un client de se désabonner des canaux ou des modèles selon qu'il a utilisé SUBSCRIBE
ou PSUBSCRIBE
. Sa mise en œuvre est la suivante :
func (ps *PubSub) Unsubscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() action := "unsubscribe" if withPattern { action = "punsubscribe" } unsubscribed := make(map[int]string) // A map of all the channels/patterns successfully unsubscribed from. idx := 1 // idx holds the 1-based index of the channel/pattern unsubscribed from. if len(channels) <= 0 { if !withPattern { // If the channels slice is empty, and no pattern is provided // unsubscribe from all channels. for _, channel := range ps.channels { if channel.pattern != nil { // Skip pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } else { // If the channels slice is empty, and pattern is provided // unsubscribe from all patterns. for _, channel := range ps.channels { if channel.pattern == nil { // Skip non-pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } // Unsubscribe from channels where the name exactly matches channel name. // If unsubscribing from a pattern, also unsubscribe from all channel whose // names exactly matches the pattern name. for _, channel := range ps.channels { // For each channel in PubSub for _, c := range channels { // For each channel name provided if channel.name == c && channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } // If withPattern is true, unsubscribe from channels where pattern matches pattern provided, // also unsubscribe from channels where the name matches the given pattern. if withPattern { for _, pattern := range channels { g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If it's a pattern channel, directly compare the patterns if channel.pattern != nil && channel.name == pattern { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } continue } // If this is a regular channel, check if the channel name matches the pattern given if g.Match(channel.name) { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } } // Construct a RESP response confirming the channels/patterns unsubscribed from. res := fmt.Sprintf("*%d\r\n", len(unsubscribed)) for key, value := range unsubscribed { res += fmt.Sprintf("*3\r\n+%s\r\n$%d\r\n%s\r\n:%d\r\n", action, len(value), value, key) } return []byte(res) }
Lorsque le client se désabonne des canaux/modèles, il recevra une réponse contenant un tableau de tableaux. Chaque tableau interne contient l'action (désabonnement/jeu de mots), le nom du canal/modèle et l'index.
La méthode suivante est la méthode Publier. Cette méthode accepte le message et le nom du canal, puis gère la publication du message en interne.
func (ps *PubSub) Publish(_ context.Context, message string, channelName string) { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() // Loop through all of the existing channels. for _, channel := range ps.channels { // If it's a regular channel, check if the channel name matches the name given. if channel.pattern == nil { if channel.name == channelName { channel.Publish(message) // Publish the message to the channel. } continue } // If it's a glob pattern channel, check if the provided channel name matches the pattern. if channel.pattern.Match(channelName) { channel.Publish(message) // Publish the message to the channel } } }
La méthode suivante est la méthode Channels
, qui gère la commande PUBSUB CHANNELS [pattern]
:
func (ps *PubSub) Channels(pattern string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int var res string // If pattern is an empty string, return all the active channels. if pattern == "" { for _, channel := range ps.channels { if channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } res = fmt.Sprintf("*%d\r\n%s", count, res) return []byte(res) } g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If channel is a pattern channel, then directly compare the channel name to pattern. if channel.pattern != nil && channel.name == pattern && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 continue } // Channel is not a pattern channel. Check if the channel name matches the provided glob pattern. if g.Match(channel.name) && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } // Return a RESP array containing all the active channel names. return []byte(fmt.Sprintf("*%d\r\n%s", count, res)) }
Les 2 méthodes suivantes sont les méthodes NumPat
et NumSub
pour gérer le PUBSUB NUMPAT
et
Commandes PUBSUB NUMSUB [channel [channel …]]
respectivement.
func (ps *PubSub) NumPat() int { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int for _, channel := range ps.channels { if channel.pattern != nil && channel.IsActive() { count += 1 } } return count } func (ps *PubSub) NumSub(channels []string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() res := fmt.Sprintf("*%d\r\n", len(channels)) for _, channel := range channels { // If it's a pattern channel, skip it chanIdx := slices.IndexFunc(ps.channels, func(c *Channel) bool { return c.name == channel }) if chanIdx == -1 { res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:0\r\n", len(channel), channel) continue } res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:%d\r\n", len(channel), channel, ps.channels[chanIdx].NumSubs()) } return []byte(res) }
Le module PubSub attend qu'une connexion soit transmise pour abonner un client à un canal. Cependant, lorsque EchoVault est intégré, il n'existe aucune connexion TCP vers un client pouvant être utilisée pour un abonnement. Pour contourner ce problème, nous utilisons net.Pipe
pour obtenir les deux extrémités d'une connexion.
Une extrémité de la connexion est transmise au gestionnaire de commandes, qui l'utilise pour s'abonner au canal spécifié. L'autre extrémité de la connexion est utilisée dans la fonction ReadPubSubMessage
renvoyée.
type conn struct { readConn *net.Conn writeConn *net.Conn } var connections map[string]conn // ReadPubSubMessage is returned by the SUBSCRIBE and PSUBSCRIBE functions. // // This function is lazy, therefore it needs to be invoked in order to read the next message. // When the message is read, the function returns a string slice with 3 elements. // Index 0 holds the event type which in this case will be "message". Index 1 holds the channel name. // Index 2 holds the actual message. type ReadPubSubMessage func() []string // SUBSCRIBE subscribes the caller to the list of provided channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"SUBSCRIBE"}, channels...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // UNSUBSCRIBE unsubscribes the caller from the given channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to unsubscribe from. func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"UNSUBSCRIBE"}, channels...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) } // PSUBSCRIBE subscribes the caller to the list of provided glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"PSUBSCRIBE"}, patterns...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // PUNSUBSCRIBE unsubscribes the caller from the given glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to unsubscribe from. func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"PUNSUBSCRIBE"}, patterns...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }
Voici un exemple d'utilisation des API SUBSCRIBE et PUBLISH :
// Subscribe to multiple EchoVault channels. readMessage := server.SUBSCRIBE("subscriber1", "channel_1", "channel_2", "channel_3") wg.Add(1) go func() { wg.Done() for { message := readMessage() fmt.Printf("EVENT: %s, CHANNEL: %s, MESSAGE: %s\n", message[0], message[1], message[2]) } }() wg.Wait() wg.Add(1) go func() { for i := 1; i <= 3; i++ { // Simulating delay. <-time.After(1 * time.Second) // Publish message to each EchoVault channel. _, _ = server.PUBLISH(fmt.Sprintf("channel_%d", i), "Hello!") } wg.Done() }() wg.Wait()
Cet article était un bref aperçu de l'implémentation Pub/Sub dans EchoVault. Bien sûr, le code partagé ici est entouré de nombreux contextes qui ne peuvent pas tenir dans un seul article. Si le contexte vous intéresse, consultez le référentiel GitHub d'EchoVault et jetez un œil à ce que nous construisons. Si vous aimez ce que vous voyez, une star de Github serait extrêmement appréciée !