paint-brush
Cómo construí un sistema Pub/Sub compatible con Redis usando Golangpor@kelvinm
2,869 lecturas
2,869 lecturas

Cómo construí un sistema Pub/Sub compatible con Redis usando Golang

por Kelvin Clement M.18m2024/04/28
Read on Terminal Reader

Demasiado Largo; Para Leer

Durante el último año, he estado construyendo EchoVault, una alternativa integrable a Redis para el ecosistema de Golang. EchoVault tiene como objetivo replicar la mayoría de las funciones de Redis y, al mismo tiempo, proporciona una interfaz integrada y una interfaz cliente-servidor que sea compatible con los clientes Redis existentes que utilizan el protocolo RESP. Una de las funciones implementadas en EchoVault es la función Pub/Sub. Este artículo es un breve recorrido por cómo se implementó el módulo Pub/Sub al momento de escribir este artículo.
featured image - Cómo construí un sistema Pub/Sub compatible con Redis usando Golang
Kelvin Clement M. HackerNoon profile picture
0-item

Durante el último año, he estado construyendo EchoVault , una alternativa integrable a Redis para el ecosistema de Golang. EchoVault tiene como objetivo replicar la mayoría de las funciones de Redis y al mismo tiempo proporcionar una interfaz integrada y una interfaz cliente-servidor que sea compatible con los clientes Redis existentes que utilizan el protocolo RESP.


Una de las funciones implementadas en EchoVault es la función Pub/Sub. Este artículo es un breve recorrido por cómo se implementó el módulo Pub/Sub al momento de escribir este artículo.

¿Qué es Pub/Sub?

Pub/Sub significa publicar/suscribir. Este patrón permite a los consumidores suscribirse a ciertos canales. Los productores publican mensajes en los canales y todos los consumidores suscritos a ese canal reciben el mensaje.


En nuestro contexto, debemos habilitar un proceso Go para suscribirse a canales específicos cuando EchoVault está integrado. Cuando se ejecuta en modo cliente-servidor, debemos permitir que una conexión TCP de cliente se suscriba a canales.


En el lado de la publicación, un proceso Go debería poder publicar en un canal en modo integrado, y una conexión de cliente TCP debería poder publicar un mensaje en un canal en modo cliente-servidor.

Requisitos

Antes de comenzar, debemos determinar los requisitos y la puntuación de esta implementación. EchoVault no implementa todas las funciones disponibles en Redis Pub/Sub al momento de escribir este artículo. Sin embargo, se implementan las características principales más importantes. Esto es lo que deberíamos poder hacer con EchoVault PubSub:


  1. Permita que un cliente TCP se suscriba a una lista de canales usando el comando SUBSCRIBE channel [channel …] . El servidor debe enviar a la conexión del cliente una confirmación de las suscripciones.


  2. Una instancia de EchoVault integrada debería poder suscribirse a una lista de canales.


  3. Permitir que un cliente TCP se suscriba a un patrón utilizando PSUBSCRIBE pattern [pattern …] donde el patrón es una cadena global que permite al cliente recibir mensajes publicados en todos los canales que satisfacen el patrón.


  4. Una instancia de EchoVault integrada debería poder suscribirse a una lista de patrones.


  5. Permita que un cliente TCP publique un mensaje en un canal usando el comando PUBLISH channel message .


  6. Publicar en un canal desde una instancia de EchoVault integrada.


  7. Permita que los clientes TCP se den de baja de canales y patrones usando los comandos UNSUBSCRIBE channel [channel …] y PUNSUBSCRIBE pattern [pattern …] , respectivamente.


  8. Permita que la conexión del cliente TCP utilice el comando PUBSUB CHANNELS [pattern] para ver una matriz que contiene los canales que coinciden con el patrón dado. Si no se proporciona ningún patrón, se devuelven todos los canales activos. Los canales activos son canales con uno o más suscriptores.


  9. Permita que los clientes TCP utilicen el comando PUBSUB NUMPAT para ver la cantidad de patrones a los que están suscritos actualmente los clientes.


  10. Proporcione una API integrada para ver la cantidad de patrones activos.


  11. Permita que los clientes TCP utilicen el comando PUBSUB NUMSUB [channel [channel ...]] para ver una serie de matrices que contienen el nombre del canal proporcionado y cuántos clientes están suscritos actualmente al canal.


  12. Proporcione una API integrada para ver la cantidad de suscriptores en los canales determinados.

Implementación

Ahora que tenemos los requisitos establecidos, pasemos a la implementación. En un nivel alto, tendremos una estructura PubSub que proporciona toda la funcionalidad de PubSub. También tendremos una estructura Channel que proporciona la funcionalidad para canales y canales de patrones.


Para esta sección, usaremos el paquete tidwall/resp. Los usuarios de EchoVault utilizan este paquete para enviar respuestas RESP a los suscriptores del canal. El paquete gobwas/glob maneja la lógica del patrón glob.

Canal

Primero, crearemos la estructura Channel y todos sus métodos. Tendremos dos tipos de canales: canales regulares y canales de patrones.

Los canales regulares son canales con nombres y sin patrones asociados a ellos. Los canales de patrón utilizarán el patrón como nombre y se asociará un patrón global compilado con el canal.


Los canales de patrones se utilizan para suscripciones a patrones. De lo contrario, se utilizan canales regulares.


La estructura Channel tiene la siguiente forma:


 type Channel struct { name string // Channel name. This can be a glob pattern string. pattern glob.Glob // Compiled glob pattern. This is nil if the channel is not a pattern channel. subscribersRWMut sync.RWMutex // RWMutex to concurrency control when accessing channel subscribers. subscribers map[*net.Conn]*resp.Conn // Map containing the channel subscribers. messageChan *chan string // Messages published to this channel will be sent to this channel. }


Usaremos el patrón de opción para crear una nueva instancia de canal. Aquí están las dos opciones disponibles:


 // WithName option sets the channels name. func WithName(name string) func(channel *Channel) { return func(channel *Channel) { channel.name = name } } // WithPattern option sets the compiled glob pattern for the channel if it's a pattern channel. func WithPattern(pattern string) func(channel *Channel) { return func(channel *Channel) { channel.name = pattern channel.pattern = glob.MustCompile(pattern) } } func NewChannel(options ...func(channel *Channel)) *Channel { messageChan := make(chan string, 4096) // messageChan is buffered. This could be a configurable value. channel := &Channel{ name: "", pattern: nil, subscribersRWMut: sync.RWMutex{}, subscribers: make(map[*net.Conn]*resp.Conn), messageChan: &messageChan, } for _, option := range options { option(channel) } return channel }


El primer método para la estructura Channel es el método Start . Este método inicia una rutina que escucha en messageChan los mensajes para transmitirlos a todos sus suscriptores. Aquí está la implementación:


 func (ch *Channel) Start() { go func() { for { message := <-*ch.messageChan ch.subscribersRWMut.RLock() for _, conn := range ch.subscribers { go func(conn *resp.Conn) { if err := conn.WriteArray([]resp.Value{ resp.StringValue("message"), resp.StringValue(ch.name), resp.StringValue(message), }); err != nil { log.Println(err) } }(conn) } ch.subscribersRWMut.RUnlock() } }() }


La gorutina recoge el siguiente mensaje de messageChan , adquiere un bloqueo de lectura para los suscriptores, transmite el mensaje a cada suscriptor y luego libera el bloqueo de lectura. Como puede ver, el paquete tidwall/resp nos permite enviar valores RESP a la conexión del cliente fácilmente.


La mayoría de los clientes de Redis esperan que el formato del mensaje de publicación/suscripción sea una matriz con la siguiente forma: [“mensaje”, <nombre del canal>, <cadena de mensaje>]. Eso es lo que transmite EchoVault.


Es posible que notes que enviamos el mensaje a resp.Conn y no net.Conn . Estamos usando resp.Conn porque proporciona métodos auxiliares para escribirle valores RESP, como WriteArray . Es esencialmente un contenedor de net.Conn , como veremos en el método Subscribe :


 func (ch *Channel) Subscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() // Acquire write-lock because we'll be modifying the subscriber map. defer ch.subscribersRWMut.Unlock() // If the connection does not exist in the subscriber map, add it. if _, ok := ch.subscribers[conn]; !ok { ch.subscribers[conn] = resp.NewConn(*conn) } _, ok := ch.subscribers[conn] return ok }


Junto con el método de suscripción, también necesitamos un método para cancelar la suscripción:


 func (ch *Channel) Unsubscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() if _, ok := ch.subscribers[conn]; !ok { return false } delete(ch.subscribers, conn) return true }


El método Cancelar suscripción devuelve true si la conexión se encontró y se eliminó del mapa de suscriptores. De lo contrario, devuelve false . Además de los métodos anteriores, existen algunos métodos auxiliares más para la estructura Channel :


 func (ch *Channel) Name() string { return ch.name } func (ch *Channel) Pattern() glob.Glob { return ch.pattern } func (ch *Channel) Publish(message string) { *ch.messageChan <- message } // IsActive returns true when the channel has 1 or more subscribers. func (ch *Channel) IsActive() bool { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() active := len(ch.subscribers) > 0 return active } // NumSubs returns the number of subscribers for this channel. func (ch *Channel) NumSubs() int { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() n := len(ch.subscribers) return n } // Subscribers returns a copy of the subscriber map. func (ch *Channel) Subscribers() map[*net.Conn]*resp.Conn { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() subscribers := make(map[*net.Conn]*resp.Conn, len(ch.subscribers)) for k, v := range ch.subscribers { subscribers[k] = v } return subscribers }

PubSub

Usaremos el módulo PubSub para interactuar con los canales. La forma de la estructura PubSub es la siguiente:


 type PubSub struct { channels []*Channel // Slice of references to channels channelsRWMut sync.RWMutex // RWMutex for concurrency controls when accessing channels } func NewPubSub() *PubSub { return &PubSub{ channels: []*Channel{}, channelsRWMut: sync.RWMutex{}, } }


El primer método es el método Subscribe . Este método maneja la suscripción del cliente a los canales especificados.


 func (ps *PubSub) Subscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) { ps.channelsRWMut.Lock() // Acquire write-lock as we may edit the slice of channels. defer ps.channelsRWMut.Unlock() r := resp.NewConn(*conn) // Wrap net.Conn connection with resp.Conn. action := "subscribe" if withPattern { action = "psubscribe" } // Loop through all the channels that the client has requested to subscribe to. for i := 0; i < len(channels); i++ { // Check if channel with given name exists // If it does, subscribe the connection to the channel // If it does not, create the channel and subscribe to it channelIdx := slices.IndexFunc(ps.channels, func(channel *Channel) bool { return channel.name == channels[i] }) if channelIdx == -1 { // If the channel does not exist, create new channel, start it, and subscribe to it. var newChan *Channel if withPattern { newChan = NewChannel(WithPattern(channels[i])) } else { newChan = NewChannel(WithName(channels[i])) } newChan.Start() if newChan.Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(newChan.name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } ps.channels = append(ps.channels, newChan) // Append the new channel to the list of channels. } else { // Subscribe to existing channel if ps.channels[channelIdx].Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(ps.channels[channelIdx].name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } } } }


Para suscribir un cliente a un canal, EchoVault escucha un comando SUBSCRIBE o PSUBSCRIBE . Cuando se recibe el comando de un cliente, la conexión TCP del cliente, junto con la lista de canales, se pasa al método Subscribe . Cuando un cliente se suscribe usando el comando PSUBSCRIBE , el parámetro withPatterns será true y el valor de la variable action se establecerá en "psubscribe".


El método Unsubscribe permite a un cliente cancelar la suscripción a canales o patrones en función de si utilizó SUBSCRIBE o PSUBSCRIBE . Su implementación es la siguiente:


 func (ps *PubSub) Unsubscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() action := "unsubscribe" if withPattern { action = "punsubscribe" } unsubscribed := make(map[int]string) // A map of all the channels/patterns successfully unsubscribed from. idx := 1 // idx holds the 1-based index of the channel/pattern unsubscribed from. if len(channels) <= 0 { if !withPattern { // If the channels slice is empty, and no pattern is provided // unsubscribe from all channels. for _, channel := range ps.channels { if channel.pattern != nil { // Skip pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } else { // If the channels slice is empty, and pattern is provided // unsubscribe from all patterns. for _, channel := range ps.channels { if channel.pattern == nil { // Skip non-pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } // Unsubscribe from channels where the name exactly matches channel name. // If unsubscribing from a pattern, also unsubscribe from all channel whose // names exactly matches the pattern name. for _, channel := range ps.channels { // For each channel in PubSub for _, c := range channels { // For each channel name provided if channel.name == c && channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } // If withPattern is true, unsubscribe from channels where pattern matches pattern provided, // also unsubscribe from channels where the name matches the given pattern. if withPattern { for _, pattern := range channels { g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If it's a pattern channel, directly compare the patterns if channel.pattern != nil && channel.name == pattern { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } continue } // If this is a regular channel, check if the channel name matches the pattern given if g.Match(channel.name) { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } } // Construct a RESP response confirming the channels/patterns unsubscribed from. res := fmt.Sprintf("*%d\r\n", len(unsubscribed)) for key, value := range unsubscribed { res += fmt.Sprintf("*3\r\n+%s\r\n$%d\r\n%s\r\n:%d\r\n", action, len(value), value, key) } return []byte(res) }


Cuando el cliente se da de baja de canales/patrones, recibirá una respuesta que contiene una serie de matrices. Cada matriz interna contiene la acción (cancelar suscripción/punsubscribe), el nombre del canal/patrón y el índice.


El siguiente método es el método Publicar. Este método acepta el mensaje y el nombre del canal y luego maneja la publicación del mensaje internamente.


 func (ps *PubSub) Publish(_ context.Context, message string, channelName string) { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() // Loop through all of the existing channels. for _, channel := range ps.channels { // If it's a regular channel, check if the channel name matches the name given. if channel.pattern == nil { if channel.name == channelName { channel.Publish(message) // Publish the message to the channel. } continue } // If it's a glob pattern channel, check if the provided channel name matches the pattern. if channel.pattern.Match(channelName) { channel.Publish(message) // Publish the message to the channel } } }


El siguiente método es el método Channels , que maneja el comando PUBSUB CHANNELS [pattern] :


 func (ps *PubSub) Channels(pattern string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int var res string // If pattern is an empty string, return all the active channels. if pattern == "" { for _, channel := range ps.channels { if channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } res = fmt.Sprintf("*%d\r\n%s", count, res) return []byte(res) } g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If channel is a pattern channel, then directly compare the channel name to pattern. if channel.pattern != nil && channel.name == pattern && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 continue } // Channel is not a pattern channel. Check if the channel name matches the provided glob pattern. if g.Match(channel.name) && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } // Return a RESP array containing all the active channel names. return []byte(fmt.Sprintf("*%d\r\n%s", count, res)) }


Los siguientes 2 métodos son los métodos NumPat y NumSub para manejar PUBSUB NUMPAT y

Comandos PUBSUB NUMSUB [channel [channel …]] respectivamente.


 func (ps *PubSub) NumPat() int { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int for _, channel := range ps.channels { if channel.pattern != nil && channel.IsActive() { count += 1 } } return count } func (ps *PubSub) NumSub(channels []string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() res := fmt.Sprintf("*%d\r\n", len(channels)) for _, channel := range channels { // If it's a pattern channel, skip it chanIdx := slices.IndexFunc(ps.channels, func(c *Channel) bool { return c.name == channel }) if chanIdx == -1 { res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:0\r\n", len(channel), channel) continue } res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:%d\r\n", len(channel), channel, ps.channels[chanIdx].NumSubs()) } return []byte(res) }


API integrada

El módulo PubSub espera que se pase una conexión para suscribir a un cliente a un canal. Sin embargo, cuando EchoVault está integrado, no hay ninguna conexión TCP con un cliente que pueda usarse para una suscripción. Para solucionar esto, usamos net.Pipe para obtener ambos extremos de una conexión.


Un extremo de la conexión se pasa al controlador de comandos, que lo utiliza para suscribirse al canal especificado. El otro extremo de la conexión se utiliza en la función ReadPubSubMessage devuelta.


 type conn struct { readConn *net.Conn writeConn *net.Conn } var connections map[string]conn // ReadPubSubMessage is returned by the SUBSCRIBE and PSUBSCRIBE functions. // // This function is lazy, therefore it needs to be invoked in order to read the next message. // When the message is read, the function returns a string slice with 3 elements. // Index 0 holds the event type which in this case will be "message". Index 1 holds the channel name. // Index 2 holds the actual message. type ReadPubSubMessage func() []string // SUBSCRIBE subscribes the caller to the list of provided channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"SUBSCRIBE"}, channels...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // UNSUBSCRIBE unsubscribes the caller from the given channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to unsubscribe from. func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"UNSUBSCRIBE"}, channels...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) } // PSUBSCRIBE subscribes the caller to the list of provided glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"PSUBSCRIBE"}, patterns...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // PUNSUBSCRIBE unsubscribes the caller from the given glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to unsubscribe from. func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"PUNSUBSCRIBE"}, patterns...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }


A continuación se muestra un ejemplo de cómo utilizar SUBSCRIBE y PUBLISH API:


 // Subscribe to multiple EchoVault channels. readMessage := server.SUBSCRIBE("subscriber1", "channel_1", "channel_2", "channel_3") wg.Add(1) go func() { wg.Done() for { message := readMessage() fmt.Printf("EVENT: %s, CHANNEL: %s, MESSAGE: %s\n", message[0], message[1], message[2]) } }() wg.Wait() wg.Add(1) go func() { for i := 1; i <= 3; i++ { // Simulating delay. <-time.After(1 * time.Second) // Publish message to each EchoVault channel. _, _ = server.PUBLISH(fmt.Sprintf("channel_%d", i), "Hello!") } wg.Done() }() wg.Wait()

Conclusión

Este artículo fue un resumen rápido de la implementación de Pub/Sub en EchoVault. Por supuesto, hay mucho contexto en torno al código compartido aquí que no cabe en un solo artículo. Si está interesado en el contexto, consulte el repositorio GitHub de EchoVault y observe lo que estamos creando. Si te gusta lo que ves, ¡una estrella de Github sería enormemente apreciada!