В течение прошлого года я создавал EchoVault , встраиваемую альтернативу Redis для экосистемы Golang. EchoVault стремится воспроизвести большинство функций Redis, предоставляя при этом как встроенный интерфейс, так и клиент-серверный интерфейс, совместимый с существующими клиентами Redis, использующими протокол RESP.
Одной из функций, реализованных в EchoVault, является функция Pub/Sub. Эта статья представляет собой краткое описание реализации модуля Pub/Sub на момент написания.
Pub/Sub означает публикацию/подписку. Этот шаблон позволяет потребителям подписываться на определенные каналы. Производители публикуют сообщения в каналах, и все потребители, подписанные на этот канал, получают сообщение.
В нашем контексте мы должны разрешить процессу Go подписаться на указанные каналы при встроенном EchoVault. При работе в режиме клиент-сервер мы должны разрешить TCP-соединению клиента подписываться на каналы.
Что касается публикации, процесс Go должен иметь возможность публиковать сообщение в канале во встроенном режиме, а клиентское соединение TCP должно иметь возможность публиковать сообщение в канале в режиме клиент-сервер.
Прежде чем мы начнем, нам необходимо определить требования и оценку этой реализации. EchoVault не реализует все функции, доступные в Redis Pub/Sub на момент написания. Однако наиболее важные основные функции реализованы. Вот что мы сможем сделать с EchoVault PubSub:
Разрешите TCP-клиенту подписаться на список каналов с помощью команды SUBSCRIBE channel [channel …]
. Сервер должен отправить клиентскому соединению подтверждение подписки.
Встроенный экземпляр EchoVault должен иметь возможность подписаться на список каналов.
Разрешите TCP-клиенту подписаться на шаблон, используя PSUBSCRIBE pattern [pattern …]
, где шаблон представляет собой glob-строку, которая позволяет клиенту получать сообщения, опубликованные во всех каналах, соответствующих шаблону.
Встроенный экземпляр EchoVault должен иметь возможность подписаться на список шаблонов.
Разрешите TCP-клиенту публиковать сообщение в канале с помощью команды PUBLISH channel message
.
Публикуйте в канале из встроенного экземпляра EchoVault.
Разрешите TCP-клиентам отписываться от каналов и шаблонов с помощью команд UNSUBSCRIBE channel [channel …]
и PUNSUBSCRIBE pattern [pattern …]
соответственно.
Разрешите TCP-клиентскому соединению использовать команду PUBSUB CHANNELS [pattern]
для просмотра массива, содержащего каналы, соответствующие заданному шаблону. Если шаблон не указан, возвращаются все активные каналы. Активные каналы — это каналы с одним или несколькими подписчиками.
Разрешите TCP-клиентам использовать команду PUBSUB NUMPAT
для просмотра количества шаблонов, на которые в данный момент подписаны клиенты.
Предоставьте встроенный API для просмотра количества активных шаблонов.
Разрешить TCP-клиентам использовать команду PUBSUB NUMSUB [channel [channel ...]]
для просмотра массива массивов, содержащих указанное имя канала и количество клиентов, которые в настоящее время подписаны на этот канал.
Предоставьте встроенный API для просмотра количества подписчиков в определенных каналах.
Теперь, когда у нас есть набор требований, давайте перейдем к реализации. На высоком уровне у нас будет структура PubSub, обеспечивающая все функциональные возможности PubSub. У нас также будет структура Channel, которая обеспечивает функциональность каналов и каналов шаблонов.
Для этого раздела мы будем использовать пакет tidwall/resp. Пользователи EchoVault используют этот пакет для отправки ответов RESP подписчикам канала. Пакет gobwas/glob обрабатывает логику шаблона glob.
Сначала мы создадим структуру Channel
и все ее методы. У нас будет два типа каналов: обычные каналы и каналы шаблонов.
Обычные каналы — это каналы с именами, без связанных с ними шаблонов. Каналы шаблонов будут использовать шаблон в качестве имени, а скомпилированный шаблон будет связан с каналом.
Каналы шаблонов используются для подписки на шаблоны. В противном случае используются обычные каналы.
Структура Channel
имеет следующую форму:
type Channel struct { name string // Channel name. This can be a glob pattern string. pattern glob.Glob // Compiled glob pattern. This is nil if the channel is not a pattern channel. subscribersRWMut sync.RWMutex // RWMutex to concurrency control when accessing channel subscribers. subscribers map[*net.Conn]*resp.Conn // Map containing the channel subscribers. messageChan *chan string // Messages published to this channel will be sent to this channel. }
Мы будем использовать шаблон опций для создания нового экземпляра канала. Вот два доступных варианта:
// WithName option sets the channels name. func WithName(name string) func(channel *Channel) { return func(channel *Channel) { channel.name = name } } // WithPattern option sets the compiled glob pattern for the channel if it's a pattern channel. func WithPattern(pattern string) func(channel *Channel) { return func(channel *Channel) { channel.name = pattern channel.pattern = glob.MustCompile(pattern) } } func NewChannel(options ...func(channel *Channel)) *Channel { messageChan := make(chan string, 4096) // messageChan is buffered. This could be a configurable value. channel := &Channel{ name: "", pattern: nil, subscribersRWMut: sync.RWMutex{}, subscribers: make(map[*net.Conn]*resp.Conn), messageChan: &messageChan, } for _, option := range options { option(channel) } return channel }
Первый метод структуры Channel — это метод Start
. Этот метод запускает горутину, которая прослушивает messageChan
на предмет сообщений, которые будут транслироваться всем его подписчикам. Вот реализация:
func (ch *Channel) Start() { go func() { for { message := <-*ch.messageChan ch.subscribersRWMut.RLock() for _, conn := range ch.subscribers { go func(conn *resp.Conn) { if err := conn.WriteArray([]resp.Value{ resp.StringValue("message"), resp.StringValue(ch.name), resp.StringValue(message), }); err != nil { log.Println(err) } }(conn) } ch.subscribersRWMut.RUnlock() } }() }
Горутина получает следующее сообщение от messageChan
, получает блокировку чтения для подписчиков, передает сообщение каждому подписчику, а затем снимает блокировку чтения. Как видите, пакет tidwall/resp позволяет нам легко отправлять значения RESP клиентскому соединению.
Большинство клиентов Redis ожидают, что формат сообщения pub/sub будет массивом следующей формы: [«сообщение», <имя канала>, <строка сообщения>]. Вот что транслирует EchoVault.
Вы могли заметить, что мы отправляем сообщение на resp.Conn
, а не net.Conn
. Мы используем resp.Conn
поскольку он предоставляет вспомогательные методы для записи в него значений RESP, например WriteArray
. По сути, это оболочка net.Conn
, как мы увидим в методе Subscribe
:
func (ch *Channel) Subscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() // Acquire write-lock because we'll be modifying the subscriber map. defer ch.subscribersRWMut.Unlock() // If the connection does not exist in the subscriber map, add it. if _, ok := ch.subscribers[conn]; !ok { ch.subscribers[conn] = resp.NewConn(*conn) } _, ok := ch.subscribers[conn] return ok }
Наряду с методом Subscribe нам также понадобится метод Unsubscribe:
func (ch *Channel) Unsubscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() if _, ok := ch.subscribers[conn]; !ok { return false } delete(ch.subscribers, conn) return true }
Метод Unsubscribe возвращает true
, если соединение было найдено и удалено из карты подписчиков. В противном случае он возвращает false
. Помимо методов, описанных выше, для структуры Channel
существует еще несколько вспомогательных методов:
func (ch *Channel) Name() string { return ch.name } func (ch *Channel) Pattern() glob.Glob { return ch.pattern } func (ch *Channel) Publish(message string) { *ch.messageChan <- message } // IsActive returns true when the channel has 1 or more subscribers. func (ch *Channel) IsActive() bool { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() active := len(ch.subscribers) > 0 return active } // NumSubs returns the number of subscribers for this channel. func (ch *Channel) NumSubs() int { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() n := len(ch.subscribers) return n } // Subscribers returns a copy of the subscriber map. func (ch *Channel) Subscribers() map[*net.Conn]*resp.Conn { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() subscribers := make(map[*net.Conn]*resp.Conn, len(ch.subscribers)) for k, v := range ch.subscribers { subscribers[k] = v } return subscribers }
Мы будем использовать модуль PubSub для взаимодействия с каналами. Форма структуры PubSub следующая:
type PubSub struct { channels []*Channel // Slice of references to channels channelsRWMut sync.RWMutex // RWMutex for concurrency controls when accessing channels } func NewPubSub() *PubSub { return &PubSub{ channels: []*Channel{}, channelsRWMut: sync.RWMutex{}, } }
Первый метод — метод Subscribe
. Этот метод обрабатывает подписку клиента на указанные каналы.
func (ps *PubSub) Subscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) { ps.channelsRWMut.Lock() // Acquire write-lock as we may edit the slice of channels. defer ps.channelsRWMut.Unlock() r := resp.NewConn(*conn) // Wrap net.Conn connection with resp.Conn. action := "subscribe" if withPattern { action = "psubscribe" } // Loop through all the channels that the client has requested to subscribe to. for i := 0; i < len(channels); i++ { // Check if channel with given name exists // If it does, subscribe the connection to the channel // If it does not, create the channel and subscribe to it channelIdx := slices.IndexFunc(ps.channels, func(channel *Channel) bool { return channel.name == channels[i] }) if channelIdx == -1 { // If the channel does not exist, create new channel, start it, and subscribe to it. var newChan *Channel if withPattern { newChan = NewChannel(WithPattern(channels[i])) } else { newChan = NewChannel(WithName(channels[i])) } newChan.Start() if newChan.Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(newChan.name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } ps.channels = append(ps.channels, newChan) // Append the new channel to the list of channels. } else { // Subscribe to existing channel if ps.channels[channelIdx].Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(ps.channels[channelIdx].name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } } } }
Чтобы подписаться клиента на канал, EchoVault прослушивает команду SUBSCRIBE
или PSUBSCRIBE
. Когда команда получена от клиента, TCP-соединение клиента вместе со списком каналов передается методу Subscribe
. Когда клиент подписывается с помощью команды PSUBSCRIBE
, параметр withPatterns
будет иметь true
, а значение переменной action
будет установлено как «psubscribe».
Метод Unsubscribe
позволяет клиенту отказаться от подписки на каналы или шаблоны в зависимости от того, использовал ли он SUBSCRIBE
или PSUBSCRIBE
. Его реализация заключается в следующем:
func (ps *PubSub) Unsubscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() action := "unsubscribe" if withPattern { action = "punsubscribe" } unsubscribed := make(map[int]string) // A map of all the channels/patterns successfully unsubscribed from. idx := 1 // idx holds the 1-based index of the channel/pattern unsubscribed from. if len(channels) <= 0 { if !withPattern { // If the channels slice is empty, and no pattern is provided // unsubscribe from all channels. for _, channel := range ps.channels { if channel.pattern != nil { // Skip pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } else { // If the channels slice is empty, and pattern is provided // unsubscribe from all patterns. for _, channel := range ps.channels { if channel.pattern == nil { // Skip non-pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } // Unsubscribe from channels where the name exactly matches channel name. // If unsubscribing from a pattern, also unsubscribe from all channel whose // names exactly matches the pattern name. for _, channel := range ps.channels { // For each channel in PubSub for _, c := range channels { // For each channel name provided if channel.name == c && channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } // If withPattern is true, unsubscribe from channels where pattern matches pattern provided, // also unsubscribe from channels where the name matches the given pattern. if withPattern { for _, pattern := range channels { g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If it's a pattern channel, directly compare the patterns if channel.pattern != nil && channel.name == pattern { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } continue } // If this is a regular channel, check if the channel name matches the pattern given if g.Match(channel.name) { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } } // Construct a RESP response confirming the channels/patterns unsubscribed from. res := fmt.Sprintf("*%d\r\n", len(unsubscribed)) for key, value := range unsubscribed { res += fmt.Sprintf("*3\r\n+%s\r\n$%d\r\n%s\r\n:%d\r\n", action, len(value), value, key) } return []byte(res) }
Когда клиент отписывается от каналов/шаблонов, он получит ответ, содержащий массив массивов. Каждый внутренний массив содержит действие (отписаться/каламбур), имя канала/шаблона и индекс.
Следующий метод — метод публикации. Этот метод принимает сообщение и имя канала, а затем обрабатывает внутреннюю публикацию сообщения.
func (ps *PubSub) Publish(_ context.Context, message string, channelName string) { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() // Loop through all of the existing channels. for _, channel := range ps.channels { // If it's a regular channel, check if the channel name matches the name given. if channel.pattern == nil { if channel.name == channelName { channel.Publish(message) // Publish the message to the channel. } continue } // If it's a glob pattern channel, check if the provided channel name matches the pattern. if channel.pattern.Match(channelName) { channel.Publish(message) // Publish the message to the channel } } }
Следующий метод — это метод Channels
, который обрабатывает команду PUBSUB CHANNELS [pattern]
:
func (ps *PubSub) Channels(pattern string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int var res string // If pattern is an empty string, return all the active channels. if pattern == "" { for _, channel := range ps.channels { if channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } res = fmt.Sprintf("*%d\r\n%s", count, res) return []byte(res) } g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If channel is a pattern channel, then directly compare the channel name to pattern. if channel.pattern != nil && channel.name == pattern && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 continue } // Channel is not a pattern channel. Check if the channel name matches the provided glob pattern. if g.Match(channel.name) && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } // Return a RESP array containing all the active channel names. return []byte(fmt.Sprintf("*%d\r\n%s", count, res)) }
Следующие два метода — это NumPat
и NumSub
для обработки PUBSUB NUMPAT
и
Команды PUBSUB NUMSUB [channel [channel …]]
соответственно.
func (ps *PubSub) NumPat() int { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int for _, channel := range ps.channels { if channel.pattern != nil && channel.IsActive() { count += 1 } } return count } func (ps *PubSub) NumSub(channels []string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() res := fmt.Sprintf("*%d\r\n", len(channels)) for _, channel := range channels { // If it's a pattern channel, skip it chanIdx := slices.IndexFunc(ps.channels, func(c *Channel) bool { return c.name == channel }) if chanIdx == -1 { res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:0\r\n", len(channel), channel) continue } res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:%d\r\n", len(channel), channel, ps.channels[chanIdx].NumSubs()) } return []byte(res) }
Модуль PubSub ожидает передачи соединения для подписки клиента на канал. Однако когда EchoVault встроен, TCP-соединение с клиентом, которое можно использовать для подписки, отсутствует. Чтобы обойти эту проблему, мы используем net.Pipe
для получения обоих концов соединения.
Один конец соединения передается обработчику команд, который использует его для подписки на указанный канал. Другой конец соединения используется в возвращаемой функции ReadPubSubMessage
.
type conn struct { readConn *net.Conn writeConn *net.Conn } var connections map[string]conn // ReadPubSubMessage is returned by the SUBSCRIBE and PSUBSCRIBE functions. // // This function is lazy, therefore it needs to be invoked in order to read the next message. // When the message is read, the function returns a string slice with 3 elements. // Index 0 holds the event type which in this case will be "message". Index 1 holds the channel name. // Index 2 holds the actual message. type ReadPubSubMessage func() []string // SUBSCRIBE subscribes the caller to the list of provided channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"SUBSCRIBE"}, channels...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // UNSUBSCRIBE unsubscribes the caller from the given channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to unsubscribe from. func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"UNSUBSCRIBE"}, channels...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) } // PSUBSCRIBE subscribes the caller to the list of provided glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"PSUBSCRIBE"}, patterns...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // PUNSUBSCRIBE unsubscribes the caller from the given glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to unsubscribe from. func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"PUNSUBSCRIBE"}, patterns...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }
Вот пример использования API SUBSCRIBE и PUBLISH:
// Subscribe to multiple EchoVault channels. readMessage := server.SUBSCRIBE("subscriber1", "channel_1", "channel_2", "channel_3") wg.Add(1) go func() { wg.Done() for { message := readMessage() fmt.Printf("EVENT: %s, CHANNEL: %s, MESSAGE: %s\n", message[0], message[1], message[2]) } }() wg.Wait() wg.Add(1) go func() { for i := 1; i <= 3; i++ { // Simulating delay. <-time.After(1 * time.Second) // Publish message to each EchoVault channel. _, _ = server.PUBLISH(fmt.Sprintf("channel_%d", i), "Hello!") } wg.Done() }() wg.Wait()
В этой статье был представлен краткий обзор реализации Pub/Sub в EchoVault. Конечно, представленный здесь код окружен большим количеством контекста, который не может уместиться в одной статье. Если вас интересует контекст, посетите репозиторий EchoVault на GitHub и посмотрите, что мы создаем. Если вам нравится то, что вы видите, звезда Github будет очень признательна!