গত বছর ধরে, আমি EchoVault তৈরি করছি, গোলং ইকোসিস্টেমের জন্য একটি এম্বেডযোগ্য রেডিস বিকল্প। EchoVault লক্ষ্য করে যে RESP প্রোটোকল ব্যবহার করে বিদ্যমান Redis ক্লায়েন্টদের সাথে সামঞ্জস্যপূর্ণ একটি এমবেডেড ইন্টারফেস এবং একটি ক্লায়েন্ট-সার্ভার ইন্টারফেস উভয়ই প্রদান করার সময় বেশিরভাগ Redis বৈশিষ্ট্যের প্রতিলিপি করা।
ইকোভল্টে বাস্তবায়িত বৈশিষ্ট্যগুলির মধ্যে একটি হল পাব/সাব বৈশিষ্ট্য। এই নিবন্ধটি লেখার সময় পাব/সাব মডিউলটি কীভাবে প্রয়োগ করা হয়েছে তার একটি সংক্ষিপ্ত ওয়াকথ্রু।
পাব/সাব মানে প্রকাশ/সাবস্ক্রাইব। এই প্যাটার্নটি গ্রাহকদের নির্দিষ্ট চ্যানেলে সদস্যতা নিতে দেয়। প্রযোজকরা চ্যানেলগুলিতে বার্তা প্রকাশ করে এবং সেই চ্যানেলে সদস্যতা নেওয়া সমস্ত গ্রাহকরা বার্তাটি গ্রহণ করে।
আমাদের প্রেক্ষাপটে, EchoVault এম্বেড করা হলে নির্দিষ্ট চ্যানেলগুলিতে সদস্যতা নেওয়ার জন্য আমাদের অবশ্যই একটি Go প্রক্রিয়া সক্রিয় করতে হবে। ক্লায়েন্ট-সার্ভার মোডে চলার সময়, আমাদের চ্যানেলে সাবস্ক্রাইব করার জন্য একটি ক্লায়েন্ট TCP সংযোগের অনুমতি দিতে হবে।
প্রকাশনার দিকে, একটি Go প্রক্রিয়া এমবেডেড মোডে একটি চ্যানেলে প্রকাশ করতে সক্ষম হওয়া উচিত এবং একটি TCP ক্লায়েন্ট সংযোগ ক্লায়েন্ট-সার্ভার মোডে একটি চ্যানেলে একটি বার্তা প্রকাশ করতে সক্ষম হওয়া উচিত।
আমরা শুরু করার আগে, আমাদের এই বাস্তবায়নের প্রয়োজনীয়তা এবং স্কোর নির্ধারণ করতে হবে। EchoVault লেখার সময় Redis Pub/Sub-এ উপলব্ধ সমস্ত বৈশিষ্ট্য বাস্তবায়ন করে না। যাইহোক, সবচেয়ে গুরুত্বপূর্ণ মূল বৈশিষ্ট্য বাস্তবায়ন করা হয়. EchoVault PubSub-এর সাথে আমাদের কী করা উচিত তা এখানে:
SUBSCRIBE channel [channel …]
কমান্ডটি ব্যবহার করে একটি TCP ক্লায়েন্টকে চ্যানেলের একটি তালিকায় সদস্যতা নেওয়ার অনুমতি দিন। সার্ভার ক্লায়েন্ট সংযোগ সাবস্ক্রিপশন একটি নিশ্চিতকরণ পাঠাতে হবে.
একটি এমবেডেড EchoVault উদাহরণ চ্যানেলগুলির একটি তালিকায় সদস্যতা নিতে সক্ষম হওয়া উচিত।
একটি TCP ক্লায়েন্টকে PSUBSCRIBE pattern [pattern …]
ব্যবহার করে একটি প্যাটার্নে সাবস্ক্রাইব করার অনুমতি দিন যেখানে প্যাটার্ন হল একটি গ্লোব স্ট্রিং যা ক্লায়েন্টকে প্যাটার্নটি সন্তুষ্ট করে এমন সমস্ত চ্যানেলে প্রকাশিত বার্তাগুলি গ্রহণ করতে দেয়।
একটি এমবেডেড EchoVault উদাহরণ নিদর্শনগুলির একটি তালিকায় সদস্যতা নিতে সক্ষম হওয়া উচিত।
একটি টিসিপি ক্লায়েন্টকে PUBLISH channel message
প্রকাশ করুন কমান্ডটি ব্যবহার করে একটি চ্যানেলে একটি বার্তা প্রকাশ করার অনুমতি দিন।
একটি এম্বেড করা EchoVault উদাহরণ থেকে একটি চ্যানেলে প্রকাশ করুন৷
TCP ক্লায়েন্টদের যথাক্রমে UNSUBSCRIBE channel [channel …]
এবং PUNSUBSCRIBE pattern [pattern …]
কমান্ড ব্যবহার করে চ্যানেল এবং প্যাটার্ন থেকে সদস্যতা ত্যাগ করার অনুমতি দিন।
প্রদত্ত প্যাটার্নের সাথে মেলে এমন চ্যানেল সমন্বিত একটি অ্যারে দেখার জন্য TCP ক্লায়েন্ট সংযোগকে PUBSUB CHANNELS [pattern]
কমান্ড ব্যবহার করার অনুমতি দিন। কোনো প্যাটার্ন প্রদান করা না হলে, সমস্ত সক্রিয় চ্যানেল ফেরত দেওয়া হয়। সক্রিয় চ্যানেল হল এক বা একাধিক গ্রাহকের চ্যানেল।
TCP ক্লায়েন্টদের PUBSUB NUMPAT
কমান্ড ব্যবহার করার অনুমতি দিন যাতে ক্লায়েন্টদের দ্বারা বর্তমানে সাবস্ক্রাইব করা প্যাটার্নের সংখ্যা দেখা যায়।
সক্রিয় প্যাটার্নের সংখ্যা দেখতে একটি এমবেডেড API প্রদান করুন।
TCP ক্লায়েন্টদের PUBSUB NUMSUB [channel [channel ...]]
কমান্ড ব্যবহার করার অনুমতি দিন প্রদত্ত চ্যানেলের নাম সম্বলিত অ্যারে এবং বর্তমানে কতজন ক্লায়েন্ট চ্যানেলটিতে সদস্যতা নিয়েছেন তা দেখতে।
প্রদত্ত চ্যানেলগুলিতে গ্রাহকের সংখ্যা দেখতে একটি এমবেডেড API প্রদান করুন৷
এখন যেহেতু আমরা প্রয়োজনীয়তা সেট করেছি, আসুন বাস্তবায়নে আসা যাক। একটি উচ্চ স্তরে, আমাদের একটি PubSub স্ট্রাকট থাকবে যা PubSub কার্যকারিতা প্রদান করে। আমাদের একটি চ্যানেল স্ট্রাকটও থাকবে যা চ্যানেল এবং প্যাটার্ন চ্যানেলগুলির কার্যকারিতা প্রদান করে।
এই বিভাগের জন্য, আমরা tidwall/resp প্যাকেজ ব্যবহার করব। EchoVault ব্যবহারকারীরা চ্যানেল গ্রাহকদের RESP প্রতিক্রিয়া পাঠাতে এই প্যাকেজটি ব্যবহার করে। গবওয়াস/গ্লোব প্যাকেজ গ্লোব প্যাটার্ন লজিক পরিচালনা করে।
প্রথমে, আমরা Channel
স্ট্রাকট এবং এর সমস্ত পদ্ধতি তৈরি করব। আমাদের দুটি ধরণের চ্যানেল থাকবে: নিয়মিত চ্যানেল এবং প্যাটার্ন চ্যানেল।
নিয়মিত চ্যানেলগুলি নাম সহ চ্যানেল এবং তাদের সাথে সম্পর্কিত কোন নিদর্শন নেই৷ প্যাটার্ন চ্যানেলগুলি প্যাটার্নটিকে নাম হিসাবে ব্যবহার করবে এবং একটি সংকলিত গ্লোব প্যাটার্ন চ্যানেলের সাথে যুক্ত হবে।
প্যাটার্ন চ্যানেলগুলি প্যাটার্নের সদস্যতার জন্য ব্যবহৃত হয়। অন্যথায়, নিয়মিত চ্যানেল ব্যবহার করা হয়।
Channel
কাঠামোর নিম্নলিখিত আকার রয়েছে:
type Channel struct { name string // Channel name. This can be a glob pattern string. pattern glob.Glob // Compiled glob pattern. This is nil if the channel is not a pattern channel. subscribersRWMut sync.RWMutex // RWMutex to concurrency control when accessing channel subscribers. subscribers map[*net.Conn]*resp.Conn // Map containing the channel subscribers. messageChan *chan string // Messages published to this channel will be sent to this channel. }
আমরা একটি নতুন চ্যানেল উদাহরণ তৈরি করতে বিকল্প প্যাটার্ন ব্যবহার করব। এখানে দুটি বিকল্প উপলব্ধ:
// WithName option sets the channels name. func WithName(name string) func(channel *Channel) { return func(channel *Channel) { channel.name = name } } // WithPattern option sets the compiled glob pattern for the channel if it's a pattern channel. func WithPattern(pattern string) func(channel *Channel) { return func(channel *Channel) { channel.name = pattern channel.pattern = glob.MustCompile(pattern) } } func NewChannel(options ...func(channel *Channel)) *Channel { messageChan := make(chan string, 4096) // messageChan is buffered. This could be a configurable value. channel := &Channel{ name: "", pattern: nil, subscribersRWMut: sync.RWMutex{}, subscribers: make(map[*net.Conn]*resp.Conn), messageChan: &messageChan, } for _, option := range options { option(channel) } return channel }
Channel struct-এর প্রথম পদ্ধতি হল Start
পদ্ধতি। এই পদ্ধতিটি একটি গোরুটিন শুরু করে যা messageChan
এ তার সমস্ত গ্রাহকদের কাছে বার্তা সম্প্রচারের জন্য শোনে। এখানে বাস্তবায়ন হল:
func (ch *Channel) Start() { go func() { for { message := <-*ch.messageChan ch.subscribersRWMut.RLock() for _, conn := range ch.subscribers { go func(conn *resp.Conn) { if err := conn.WriteArray([]resp.Value{ resp.StringValue("message"), resp.StringValue(ch.name), resp.StringValue(message), }); err != nil { log.Println(err) } }(conn) } ch.subscribersRWMut.RUnlock() } }() }
গোরুটিন messageChan
থেকে পরবর্তী বার্তাটি তুলে নেয়, গ্রাহকদের জন্য একটি রিড-লক অর্জন করে, প্রতিটি গ্রাহকের কাছে বার্তাটি সম্প্রচার করে এবং তারপরে রিড-লক রিলিজ করে। আপনি দেখতে পাচ্ছেন, tidwall/resp প্যাকেজ আমাদের সহজেই ক্লায়েন্ট সংযোগে RESP মান পাঠাতে দেয়।
বেশিরভাগ রেডিস ক্লায়েন্ট আশা করে যে পাব/সাব মেসেজ ফরম্যাট নিম্নলিখিত আকারে একটি অ্যারে হবে: [“বার্তা”, <চ্যানেলের নাম>, <বার্তা স্ট্রিং>]। এটি ইকোভল্ট দ্বারা সম্প্রচারিত হয়।
আপনি হয়তো লক্ষ্য করবেন যে আমরা বার্তাটি resp.Conn
এ পাঠাচ্ছি এবং net.Conn
নয়। আমরা resp.Conn
ব্যবহার করছি কারণ এটি এতে RESP মান লেখার জন্য সহায়ক পদ্ধতি প্রদান করে, যেমন WriteArray
। এটি মূলত net.Conn
চারপাশে একটি মোড়ক যা আমরা Subscribe
পদ্ধতিতে দেখব:
func (ch *Channel) Subscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() // Acquire write-lock because we'll be modifying the subscriber map. defer ch.subscribersRWMut.Unlock() // If the connection does not exist in the subscriber map, add it. if _, ok := ch.subscribers[conn]; !ok { ch.subscribers[conn] = resp.NewConn(*conn) } _, ok := ch.subscribers[conn] return ok }
সাবস্ক্রাইব পদ্ধতির পাশাপাশি, আমাদের একটি আনসাবস্ক্রাইব পদ্ধতিও প্রয়োজন:
func (ch *Channel) Unsubscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() if _, ok := ch.subscribers[conn]; !ok { return false } delete(ch.subscribers, conn) return true }
সংযোগটি পাওয়া গেলে এবং গ্রাহকদের মানচিত্র থেকে মুছে ফেলা হলে সদস্যতা ত্যাগ করার পদ্ধতিটি true
হয়৷ অন্যথায়, এটি false
ফেরত দেয়। উপরের পদ্ধতিগুলি ছাড়াও, Channel
কাঠামোর জন্য আরও কিছু সহায়ক পদ্ধতি রয়েছে:
func (ch *Channel) Name() string { return ch.name } func (ch *Channel) Pattern() glob.Glob { return ch.pattern } func (ch *Channel) Publish(message string) { *ch.messageChan <- message } // IsActive returns true when the channel has 1 or more subscribers. func (ch *Channel) IsActive() bool { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() active := len(ch.subscribers) > 0 return active } // NumSubs returns the number of subscribers for this channel. func (ch *Channel) NumSubs() int { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() n := len(ch.subscribers) return n } // Subscribers returns a copy of the subscriber map. func (ch *Channel) Subscribers() map[*net.Conn]*resp.Conn { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() subscribers := make(map[*net.Conn]*resp.Conn, len(ch.subscribers)) for k, v := range ch.subscribers { subscribers[k] = v } return subscribers }
চ্যানেলগুলির সাথে ইন্টারঅ্যাক্ট করতে আমরা PubSub মডিউল ব্যবহার করব। PubSub কাঠামোর আকৃতিটি নিম্নরূপ:
type PubSub struct { channels []*Channel // Slice of references to channels channelsRWMut sync.RWMutex // RWMutex for concurrency controls when accessing channels } func NewPubSub() *PubSub { return &PubSub{ channels: []*Channel{}, channelsRWMut: sync.RWMutex{}, } }
প্রথম পদ্ধতি হল Subscribe
পদ্ধতি। এই পদ্ধতিটি ক্লায়েন্টকে নির্দিষ্ট চ্যানেলে সাবস্ক্রাইব করা পরিচালনা করে।
func (ps *PubSub) Subscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) { ps.channelsRWMut.Lock() // Acquire write-lock as we may edit the slice of channels. defer ps.channelsRWMut.Unlock() r := resp.NewConn(*conn) // Wrap net.Conn connection with resp.Conn. action := "subscribe" if withPattern { action = "psubscribe" } // Loop through all the channels that the client has requested to subscribe to. for i := 0; i < len(channels); i++ { // Check if channel with given name exists // If it does, subscribe the connection to the channel // If it does not, create the channel and subscribe to it channelIdx := slices.IndexFunc(ps.channels, func(channel *Channel) bool { return channel.name == channels[i] }) if channelIdx == -1 { // If the channel does not exist, create new channel, start it, and subscribe to it. var newChan *Channel if withPattern { newChan = NewChannel(WithPattern(channels[i])) } else { newChan = NewChannel(WithName(channels[i])) } newChan.Start() if newChan.Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(newChan.name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } ps.channels = append(ps.channels, newChan) // Append the new channel to the list of channels. } else { // Subscribe to existing channel if ps.channels[channelIdx].Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(ps.channels[channelIdx].name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } } } }
একটি চ্যানেলে ক্লায়েন্ট সাবস্ক্রাইব করতে, EchoVault একটি SUBSCRIBE
বা PSUBSCRIBE
কমান্ড শোনে। ক্লায়েন্টের কাছ থেকে কমান্ড প্রাপ্ত হলে, চ্যানেলের তালিকা সহ ক্লায়েন্টের TCP সংযোগ Subscribe
পদ্ধতিতে প্রেরণ করা হয়। যখন একটি ক্লায়েন্ট PSUBSCRIBE
কমান্ড ব্যবহার করে সদস্যতা নেয়, তখন withPatterns
প্যারামিটারটি true
হবে এবং action
ভেরিয়েবলের মান "সাবস্ক্রাইব" এ সেট করা হবে।
Unsubscribe
পদ্ধতি একটি ক্লায়েন্টকে চ্যানেল বা প্যাটার্ন থেকে সদস্যতা ত্যাগ করতে দেয় যে তারা SUBSCRIBE
বা PSUBSCRIBE
ব্যবহার করেছে তার উপর ভিত্তি করে। এর বাস্তবায়ন নিম্নরূপ:
func (ps *PubSub) Unsubscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() action := "unsubscribe" if withPattern { action = "punsubscribe" } unsubscribed := make(map[int]string) // A map of all the channels/patterns successfully unsubscribed from. idx := 1 // idx holds the 1-based index of the channel/pattern unsubscribed from. if len(channels) <= 0 { if !withPattern { // If the channels slice is empty, and no pattern is provided // unsubscribe from all channels. for _, channel := range ps.channels { if channel.pattern != nil { // Skip pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } else { // If the channels slice is empty, and pattern is provided // unsubscribe from all patterns. for _, channel := range ps.channels { if channel.pattern == nil { // Skip non-pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } // Unsubscribe from channels where the name exactly matches channel name. // If unsubscribing from a pattern, also unsubscribe from all channel whose // names exactly matches the pattern name. for _, channel := range ps.channels { // For each channel in PubSub for _, c := range channels { // For each channel name provided if channel.name == c && channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } // If withPattern is true, unsubscribe from channels where pattern matches pattern provided, // also unsubscribe from channels where the name matches the given pattern. if withPattern { for _, pattern := range channels { g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If it's a pattern channel, directly compare the patterns if channel.pattern != nil && channel.name == pattern { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } continue } // If this is a regular channel, check if the channel name matches the pattern given if g.Match(channel.name) { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } } // Construct a RESP response confirming the channels/patterns unsubscribed from. res := fmt.Sprintf("*%d\r\n", len(unsubscribed)) for key, value := range unsubscribed { res += fmt.Sprintf("*3\r\n+%s\r\n$%d\r\n%s\r\n:%d\r\n", action, len(value), value, key) } return []byte(res) }
যখন ক্লায়েন্ট চ্যানেল/প্যাটার্নগুলি থেকে সদস্যতা ত্যাগ করে, তখন তারা অ্যারের অ্যারে সহ একটি প্রতিক্রিয়া পাবে। প্রতিটি অভ্যন্তরীণ অ্যারেতে অ্যাকশন (আনসাবস্ক্রাইব/পানসাবস্ক্রাইব), চ্যানেল/প্যাটার্নের নাম এবং সূচী থাকে।
পরবর্তী পদ্ধতি হল প্রকাশ পদ্ধতি। এই পদ্ধতিটি বার্তা এবং চ্যানেলের নাম গ্রহণ করে এবং তারপর অভ্যন্তরীণভাবে বার্তার প্রকাশনা পরিচালনা করে।
func (ps *PubSub) Publish(_ context.Context, message string, channelName string) { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() // Loop through all of the existing channels. for _, channel := range ps.channels { // If it's a regular channel, check if the channel name matches the name given. if channel.pattern == nil { if channel.name == channelName { channel.Publish(message) // Publish the message to the channel. } continue } // If it's a glob pattern channel, check if the provided channel name matches the pattern. if channel.pattern.Match(channelName) { channel.Publish(message) // Publish the message to the channel } } }
পরবর্তী পদ্ধতি হল Channels
পদ্ধতি, যা PUBSUB CHANNELS [pattern]
কমান্ড পরিচালনা করে:
func (ps *PubSub) Channels(pattern string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int var res string // If pattern is an empty string, return all the active channels. if pattern == "" { for _, channel := range ps.channels { if channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } res = fmt.Sprintf("*%d\r\n%s", count, res) return []byte(res) } g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If channel is a pattern channel, then directly compare the channel name to pattern. if channel.pattern != nil && channel.name == pattern && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 continue } // Channel is not a pattern channel. Check if the channel name matches the provided glob pattern. if g.Match(channel.name) && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } // Return a RESP array containing all the active channel names. return []byte(fmt.Sprintf("*%d\r\n%s", count, res)) }
নিম্নলিখিত 2টি পদ্ধতি হল PUBSUB NUMPAT
পরিচালনা করার জন্য NumPat
এবং NumSub
পদ্ধতি এবং
যথাক্রমে PUBSUB NUMSUB [channel [channel …]]
কমান্ড।
func (ps *PubSub) NumPat() int { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int for _, channel := range ps.channels { if channel.pattern != nil && channel.IsActive() { count += 1 } } return count } func (ps *PubSub) NumSub(channels []string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() res := fmt.Sprintf("*%d\r\n", len(channels)) for _, channel := range channels { // If it's a pattern channel, skip it chanIdx := slices.IndexFunc(ps.channels, func(c *Channel) bool { return c.name == channel }) if chanIdx == -1 { res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:0\r\n", len(channel), channel) continue } res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:%d\r\n", len(channel), channel, ps.channels[chanIdx].NumSubs()) } return []byte(res) }
PubSub মডিউল একটি চ্যানেলে একটি ক্লায়েন্ট সাবস্ক্রাইব করার জন্য একটি সংযোগ পাস করার আশা করে। যাইহোক, যখন EchoVault এম্বেড করা হয়, তখন কোনও ক্লায়েন্টের সাথে কোনও TCP সংযোগ নেই যা সাবস্ক্রিপশনের জন্য ব্যবহার করা যেতে পারে। এটির কাছাকাছি যেতে, আমরা সংযোগের উভয় প্রান্ত পেতে net.Pipe
ব্যবহার করি।
সংযোগের এক প্রান্ত কমান্ড হ্যান্ডলারে প্রেরণ করা হয়, যা নির্দিষ্ট চ্যানেলে সদস্যতা নিতে এটি ব্যবহার করে। সংযোগের অন্য প্রান্তটি ফিরে আসা ReadPubSubMessage
ফাংশনে ব্যবহৃত হয়।
type conn struct { readConn *net.Conn writeConn *net.Conn } var connections map[string]conn // ReadPubSubMessage is returned by the SUBSCRIBE and PSUBSCRIBE functions. // // This function is lazy, therefore it needs to be invoked in order to read the next message. // When the message is read, the function returns a string slice with 3 elements. // Index 0 holds the event type which in this case will be "message". Index 1 holds the channel name. // Index 2 holds the actual message. type ReadPubSubMessage func() []string // SUBSCRIBE subscribes the caller to the list of provided channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"SUBSCRIBE"}, channels...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // UNSUBSCRIBE unsubscribes the caller from the given channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to unsubscribe from. func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"UNSUBSCRIBE"}, channels...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) } // PSUBSCRIBE subscribes the caller to the list of provided glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"PSUBSCRIBE"}, patterns...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // PUNSUBSCRIBE unsubscribes the caller from the given glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to unsubscribe from. func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"PUNSUBSCRIBE"}, patterns...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }
SUBSCRIBE এবং PUBLISH API কীভাবে ব্যবহার করবেন তার একটি উদাহরণ এখানে দেওয়া হল:
// Subscribe to multiple EchoVault channels. readMessage := server.SUBSCRIBE("subscriber1", "channel_1", "channel_2", "channel_3") wg.Add(1) go func() { wg.Done() for { message := readMessage() fmt.Printf("EVENT: %s, CHANNEL: %s, MESSAGE: %s\n", message[0], message[1], message[2]) } }() wg.Wait() wg.Add(1) go func() { for i := 1; i <= 3; i++ { // Simulating delay. <-time.After(1 * time.Second) // Publish message to each EchoVault channel. _, _ = server.PUBLISH(fmt.Sprintf("channel_%d", i), "Hello!") } wg.Done() }() wg.Wait()
এই নিবন্ধটি EchoVault-এ পাব/সাব বাস্তবায়নের উপর একটি দ্রুত রানডাউন ছিল। অবশ্যই, এখানে শেয়ার করা কোডটিকে ঘিরে অনেক প্রসঙ্গ রয়েছে যা একটি নিবন্ধে মাপসই করা যায় না। আপনি যদি প্রসঙ্গে আগ্রহী হন, EchoVault এর GitHub সংগ্রহস্থলটি দেখুন এবং আমরা কী তৈরি করছি তা একবার দেখুন। আপনি যা দেখেন তা পছন্দ করলে, একটি গিথুব তারকা ব্যাপকভাবে প্রশংসা করা হবে!