It also provides very simple semantics for joining multiple data sources based on their output and input types, while keeping the data sources completely decoupled from each other. This makes creating new APIs, or resolvers for GraphQL APIs, a breeze.
We often write web apps that connect to different data sources, combine the data obtained from them, and then do some further work. Along the way we write a lot of boilerplate to transform one data type into another. And in the absence of a proper job scheduler, we spawn goroutines haphazardly, without proper management. The result is unmanageable code, which becomes even harder to update later, especially for a new team member.
Rio tries to solve this problem by introducing two concepts.
The first is the balancer, the piece that runs multiple jobs asynchronously. It has a priority queue (balancer.go and pool.go) that hands off incoming requests to a set of managed workers.
// The Balancer struct. It is used inside the GetBalancer method to provide a load balancer to the caller.
type Balancer struct {
    // The pool of Workers, which is itself a priority queue based on a min heap.
    pool Pool
    // This channel is used to receive a request instance from the caller. After getting the request, it is
    // dispatched to the most lightly loaded worker.
    jobChannel chan *Request
    // This channel is used by the workers. After processing a task, a worker uses this channel to let the balancer
    // know that it is done and able to take new requests from its request channel.
    done chan *Worker
    // The number of queued requests.
    queuedItems int
    // The close channel. When the Close method is called by any goroutine, passing in a channel of boolean, the
    // balancer waits for all pending requests to be processed, closes all the workers, shuts down all of its own
    // loops and finally responds by sending true on the passed channel, confirming that all the inner loops are
    // closed and the balancer is shut down.
    closeChannel chan chan bool
}
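Putting those fields together, the balancer’s main loop multiplexes over the three channels. Here is a minimal, illustrative sketch of what such a loop could look like; the dispatch and completed helpers (sketched after the pool code below) and the shutdown details are assumptions, not Rio’s actual source:
func (b *Balancer) balance() {
    for {
        select {
        case req := <-b.jobChannel: // a caller posted a new request
            b.queuedItems++
            b.dispatch(req)
        case w := <-b.done: // a worker reports a finished task
            b.queuedItems--
            b.completed(w)
        case respChan := <-b.closeChannel: // Close was called
            // The real implementation drains all queued requests and stops
            // the workers before confirming the shutdown.
            respChan <- true
            return
        }
    }
}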
The balancer is implemented on top of a min-heap priority queue, and when assigning a new task it picks the least loaded worker.
To implement the min heap, we just need to implement the five handy methods of the standard heap.Interface on the pool, like this:
// The pool is a list of workers. The pool is also a priority queue,
// ordered by each worker's number of pending tasks.
type Pool []*Worker

func (p Pool) Len() int {
    return len(p)
}

func (p Pool) Less(i, j int) bool {
    return p[i].pending < p[j].pending
}

func (p Pool) Swap(i, j int) {
    p[i], p[j] = p[j], p[i]
}

func (p *Pool) Push(x interface{}) {
    item := x.(*Worker)
    *p = append(*p, item)
}

func (p *Pool) Pop() interface{} {
    old := *p
    n := len(old)
    item := old[n-1]
    *p = old[0 : n-1]
    return item
}
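With the pool satisfying heap.Interface from container/heap, picking the least loaded worker is just a heap.Pop. Here is a minimal sketch (not Rio’s actual source) of what dispatching and completion could look like; the worker’s requests channel is an assumed field, and pending is the counter the pool orders by:
import "container/heap"

// dispatch hands a request to the least loaded worker.
func (b *Balancer) dispatch(req *Request) {
    w := heap.Pop(&b.pool).(*Worker) // the heap root is the least loaded worker
    w.requests <- req                // hand the request to this worker's own channel (assumed field)
    w.pending++                      // it now has one more outstanding task
    heap.Push(&b.pool, w)            // reinsert so the heap re-orders by the new load
}

// completed is called when a worker reports on the done channel.
func (b *Balancer) completed(w *Worker) {
    w.pending--        // one task finished, so this worker's priority improves
    heap.Init(&b.pool) // restore heap order (tracking item indices and using heap.Fix would be cheaper)
}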
How many times do we do this:
call service 1 in goroutine 1, wait and get the response from goroutine 1
call service 2 in goroutine 2, taking a piece of data from service call 1, wait and get the response from goroutine 2
call service 3 in goroutine 3, taking a piece of data from service call 2, wait and get the response from goroutine 3
You get the idea: this only delays things and causes a lot of context switching. Rio helps here by chaining multiple calls together using closures and function types, and running the whole chain in a single goroutine.
Now, many might think: isn’t that going to be slower than making multiple goroutine calls? Let’s see.
Think of the previous example. If you do not get a response from service 1, can you invoke service 2? If service 2 fails, can you call service 3? No, because there are data dependencies between these calls, so the goroutines would end up waiting on each other anyway.
This is Rio’s second concept: it chains dependent jobs together by introducing this pattern:
request := rio.BuildRequests(context,
    (<callback of service 1>.WithTimeOut(100 ms).WithRetry(3)).
        FollowedBy(<function transforming service 1's response into the (partial) request of service 2>,
            <callback of service 2>).
        FollowedBy(<function transforming service 2's response into the (partial) request of service 3>,
            <callback of service 3>))
Let’s look at a concrete example:
func SampleHandler(w http.ResponseWriter, r *http.Request) {
    // Create the load balancer with a pool of 10 workers. In real code this
    // should be created only once (at startup), not per request.
    balancer := rio.GetBalancer(10, 2)

    // Set up the callbacks
    callback1 := GetNameById("Some Name")
    callback2 := GetStreetAddressByNameAndLocationId(rio.EMPTY_ARG_PLACEHOLDER, "Some Location ID")

    // Set up the pipeline
    request := rio.BuildRequests(context.Background(),
        rio.NewFutureTask(callback1).WithMilliSecondTimeout(10).WithRetry(3), 2).
        FollowedBy(Call1ToCall2, rio.NewFutureTask(callback2).WithMilliSecondTimeout(20))

    // Post the job
    balancer.PostJob(request)

    // Wait for the whole chain to complete
    <-request.CompletedChannel

    // Responses
    response1, err := request.GetResponse(0)
    if err == nil {
        // Do something with the response
        fmt.Println(response1)
    }

    response2, err := request.GetResponse(1)
    if err == nil {
        // Do something with the response
        fmt.Println(response2)
    }
}
Once the chain is built, we post the job like this:
balancer.PostJob(request)
And finally we wait for the chain to complete:
<-request.CompletedChannel
Once the call chain completes, the request comes back with the responses of all the calls in a slice, and we can access them like this:
For a single job response:
request.GetOnlyResponse()
For multiple job responses:
request.GetResponse(index) // index 0, 1, 2, ...
If any job fails, its response will be the empty response, specifically rio.EMPTY_CALLBACK_RESPONSE.
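So a caller can guard against a failed link in the chain like this; a hedged usage sketch, assuming the empty response value can be compared directly:
// Usage sketch: treat the empty response as a failed or skipped job.
response, err := request.GetResponse(1)
if err != nil || response == rio.EMPTY_CALLBACK_RESPONSE {
    // The second call in the chain failed or was skipped; fall back or report an error.
    return
}
fmt.Println(response)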
Check out the full example in the example folder of the repository if you want to use it.
Thanks!!!
Also published on Medium’s subdomain: https://medium.com/geekculture/rio-a-lightweight-job-scheduler-in-go-with-batteries-included-fe1040d5a3c3