Original article available at https://habrahabr.ru/company/mailru/blog/325582/
Many of you may have already heard about the high performance of the Tarantool DBMS, its rich toolset, and certain features. Say, it has a really cool on-disk storage engine called Vinyl, and it knows how to work with JSON documents. However, most articles out there tend to overlook one crucial thing: usually, Tarantool is regarded simply as storage, whereas its killer feature is the ability to write code inside it, which makes working with your data extremely efficient. If you'd like to know how igorcoding and I built a system almost entirely inside Tarantool, read on.
If you've ever used the Mail.Ru email service, you probably know that it allows collecting emails from other accounts. If the OAuth protocol is supported, we don't need to ask a user for their third-party service credentials to do that: we can use OAuth tokens instead. Besides, Mail.Ru Group has lots of projects that require authorization via third-party services and need users' OAuth tokens to work with certain applications. That's why we decided to build a service for storing and updating tokens.
I guess everybody knows what an OAuth token looks like. To refresh your memory, it's a structure consisting of 3-4 fields:
{
    "token_type"    : "bearer",
    "access_token"  : "XXXXXX",
    "refresh_token" : "YYYYYY",
    "expires_in"    : 3600
}
- access_token: allows you to perform an action, obtain user data, download a user's friend list, and so on;
- refresh_token: lets you obtain a new access_token an unlimited number of times;
- expires_in: the token's expiration time (or lifetime). Once your access_token expires, you can no longer access the required resource.
Now let's take a look at the approximate architecture of our service. Imagine there are some frontends that can write tokens to our service and read them from it. There's also a separate entity called a refresher. Its main purpose is to obtain new access tokens from an OAuth provider once the old ones expire.
The database is structured quite simply as well. There are two database nodes (master and slave) separated by a vertical dotted line in the image above, which signifies that they are located in two data centers: one contains the master with its frontend and refresher, the other contains the slave with its frontend and refresher, which accesses the master.
What difficulties we faced
The main problem had to do with the token lifetime (one hour). After taking a closer look at the project, one might think: "Is 10 million records that need to be refreshed within an hour really a high-load service? Dividing one by the other gives about 3,000 RPS." However, things get tough once something stops being refreshed due to database maintenance or failure, or even server failure (anything can happen!). The thing is, if our service (that is, the master database) stays down for 15 minutes for some reason, we get a 25% outage: a quarter of our tokens become invalid and can't be used anymore. After a 30-minute downtime, half the data isn't refreshed. After an hour, there isn't a single valid token left. Imagine the database has been down for an hour, we get it up and running again, and now all 10 million tokens need to be refreshed really fast. How's that for a high-load service?
I must say that at first everything worked quite smoothly, but two years later we had extended the logic, added extra indices, and started implementing some auxiliary logic... Long story short, Tarantool ran out of CPU resources. Although every resource is depletable, it did take us aback.
Luckily, the system administrators helped us out by installing the most powerful CPU they had in stock, which allowed us to keep growing for the next six months. In the meantime, we had to come up with a solution to the problem. At that time, we learned about a new version of Tarantool (our system was written with Tarantool 1.5, which is hardly used outside Mail.Ru Group). Tarantool 1.6 boasted master-master replication, which gave us an idea: why not keep a database copy in each of the three data centers, connected with master-master replication? That sounded like a great plan.
Three masters, three data centers, three refreshers, each interacting with its own master. If one or even two masters go down, everything should still work, right? What pitfalls does this scheme have? The main issue is that we're effectively tripling the number of requests to an OAuth provider: we refresh almost the same tokens as many times as there are replicas. That won't do. The most straightforward workaround is to somehow make the nodes decide among themselves who the leader is, so that tokens get refreshed only on that node.
Electing a leader
There exist numerous consensus algorithms. One of them is called Paxos. Quite complicated stuff. We couldn't figure out how to make something simple out of it, so we decided to use Raft instead. Raft is a very easy-to-understand algorithm: a leader is elected based on whether the nodes can communicate with it, and it remains the leader until a new one is elected due to a connection failure or some other reason. Here's how we implemented it:
Tarantool doesn't have either Raft or Paxos out of the box. But we can take the built-in net.box module, which lets us connect nodes into a full mesh (each node is connected to all the other nodes), and simply implement Raft's leader election on top of these connections. Each node will consequently be either a leader, a follower, or a node that sees neither the leader nor the followers.
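To give a sense of what this full mesh looks like in code, here is a minimal sketch built on net.box. It assumes the call conventions of recent Tarantool versions; the peer URIs and the request_vote function are illustrative names rather than the article's actual implementation, which wraps the connections into a pool:

local net_box = require('net.box')

-- example peer URIs; in reality they come from the configuration
local peers = {'tnt1:3301', 'tnt2:3301', 'tnt3:3301'}
local connections = {}

for _, uri in ipairs(peers) do
    -- reconnect_after makes net.box re-establish a dropped link automatically
    connections[uri] = net_box.connect(uri, {reconnect_after = 1})
end

-- ask every peer for a vote and count how many we have received;
-- 'request_vote' is assumed to be a global function exposed on every node
local function collect_votes(term, uuid)
    local votes = 1  -- we vote for ourselves
    for _, conn in pairs(connections) do
        local ok, granted = pcall(conn.call, conn, 'request_vote',
                                  {term, uuid}, {timeout = 0.5})
        if ok and granted then
            votes = votes + 1
        end
    end
    return votes
end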
If you think implementing Raft is difficult, here's the Lua code that does exactly that:
local r = self.pool.call(self.FUNC.request_vote, self.term, self.uuid)
self._vote_count = self:count_votes(r)

if self._vote_count > self._nodes_count / 2 then
    log.info('[raft-srv] node %d won elections', self.id)
    self:_set_state(self.S.LEADER)
    self:_set_leader({ id = self.id, uuid = self.uuid })
    self._vote_count = 0
    self:stop_election_timer()
    self:start_heartbeater()
else
    log.info('[raft-srv] node %d lost elections', self.id)
    self:_set_state(self.S.IDLE)
    self:_set_leader(msgpack.NULL)
    self._vote_count = 0
    self:start_election_timer()
end
Here we're sending requests to remote servers (other Tarantool replicas) and counting the votes received from each node. If we have a quorum, we're elected leader and start sending heartbeats: notifications to the other nodes that we're still alive. If we lose the election, we initiate another one, and after some time we'll again be able to vote or get elected.
Once we have a quorum and elect a leader, we can direct our refreshers to all the nodes but instruct them to work only with the leader.
This way we normalize the traffic: since tasks are handed out by a single node, each refresher gets approximately a third of them. This setup allows us to lose any of the masters, which would result in another election and the refreshers switching to the new leader. However, as in any distributed system, there are certain issues associated with a quorum.
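Before moving on to those issues, here is a rough sketch of a refresher loop that sticks to the leader. The function names is_leader and queue_take are hypothetical stand-ins for what a node could expose, and refresh_token_via_oauth is a placeholder for the actual call to the OAuth provider; the real refreshers are separate processes, so treat this as an illustration of the idea rather than production code:

local net_box = require('net.box')
local fiber = require('fiber')

local conn = net_box.connect('localhost:3301')  -- the refresher's own master

local function refresher_loop()
    while true do
        if conn:call('is_leader', {}) then
            -- only the leader hands out the next token that needs refreshing
            local task = conn:call('queue_take', {1})  -- wait up to 1 second
            if task ~= nil then
                refresh_token_via_oauth(task)  -- placeholder for the provider call
            end
        else
            fiber.sleep(1)  -- not the leader: nothing to hand out, check again later
        end
    end
end

fiber.create(refresher_loop)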
"Abandoned" node
If connectivity between data centers is lost, it's necessary to have some mechanism in place that can keep the whole system functioning properly, as well as a mechanism for restoring system integrity. Raft successfully does that.
Suppose the connection to the Dataline data center goes down. The node located there then becomes "abandoned": it can't see the other nodes. The remaining nodes in the cluster see that the node is lost, which triggers another election, and a new cluster node (say, the upper one in the diagram) is elected leader. The system remains functional, since there's still a consensus among the nodes (more than half of them can still see each other).
The main question is what happens to the refresher associated with the disconnected data center. The Raft specification doesn't have a separate name for such a node. Normally, a node without a quorum that can't communicate with the leader sits idle. However, this one can still establish network connections and update tokens on its own. Tokens are usually updated only when there is connectivity, but maybe it's possible to update them with a refresher connected to an "abandoned" node as well? At first we weren't sure it made much sense. Wouldn't it result in superfluous updates?
We had to figure this out while implementing the system. The first thought was not to update anything: we have a consensus, a quorum, and if we lose any member, updates shouldn't take place. But then we had another idea. Let's take a look at master-master replication in Tarantool. Suppose there are two master nodes and a variable (key) X with a value of 1. We simultaneously assign new values to this variable on each node: 2 on one, and 3 on the other. Then the nodes exchange their replication logs (that is, the values of the X variable), and each applies the other's change: the first master ends up with X = 3 and the second with X = 2, so the values simply swap. Consistency-wise, such an implementation of master-master replication is terrible (no offence to the Tarantool developers).
If we need strict consistency, this won't work. However, recall that our OAuth token consists of two important elements:
- refresh token, with essentially an unlimited lifetime;
- access token, which is valid for 1 hour.
Our refresher has a refresh function that can obtain any number of access tokens from a refresh token. Once issued, they will all remain valid for 1 hour.
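For illustration, here is roughly what a single refresh call might look like using Tarantool's built-in http.client and json modules (available in recent versions). The endpoint and parameter names follow the generic OAuth 2.0 refresh flow and differ from provider to provider; this is a hedged sketch, not the article's code:

local http_client = require('http.client').new()
local json = require('json')

local function refresh_access_token(provider_url, client_id, client_secret, refresh_token)
    local body = string.format(
        'grant_type=refresh_token&refresh_token=%s&client_id=%s&client_secret=%s',
        refresh_token, client_id, client_secret)

    local resp = http_client:post(provider_url, body, {
        headers = {['Content-Type'] = 'application/x-www-form-urlencoded'},
        timeout = 5,
    })
    if resp.status ~= 200 then
        return nil, resp.status
    end
    -- the provider answers with the same structure we store:
    -- {token_type, access_token, refresh_token, expires_in}
    return json.decode(resp.body)
end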
Let's consider the following scenario: two follower nodes are interacting with a leader; they refresh their tokens and receive the first access token, which gets replicated, so now every node has it. Then the connection is lost, and one of the followers becomes an "abandoned" node: it has no quorum and sees neither the leader nor the other follower. However, we allow its refresher to keep updating the tokens sitting on the "abandoned" node. If that node has no network connection at all, the whole scheme won't work. In case of a simple network split, though, the refresher will be able to do its job.
Once the network split is over and the "abandoned" node rejoins the cluster, either another election or a data exchange takes place. Note that the second and the third tokens are equally "good."
After the original cluster membership is restored, the next update occurs on only one node and gets replicated. In other words, while the cluster is split, its parts perform updates independently, but once it's whole again, data consistency is restored. Normally, it takes N / 2 + 1 active nodes (two nodes for a 3-node cluster) to keep a cluster functional. In our case, though, even one active node is enough: it will send as many external requests as necessary.
To reiterate, we've discussed the issue of the growing number of requests. During a network split or node downtime, we can afford to have a single active node, which we'll keep updating as usual. In case of a complete split (when the cluster falls apart into individual nodes, each still having an external network connection), we're tripling the number of requests to the OAuth provider, as mentioned above. But since this event is relatively short-lived, it's not that bad: we don't expect to work in split mode all the time. Normally, the system has a quorum and connectivity, with all the nodes up and running.
Sharding
One issue still remains: we've hit the CPU limit. An obvious solution is sharding.
Let's say we have two database shards, each being replicated, and there's a function that, given some key, helps us figure out which shard has the required data. If we shard by email, addresses are stored partly on one shard and partly on the other, and we always know where our data is.
There are two approaches to sharding. The first is client-side sharding: we pick a consistent sharding function that returns a shard number for a given key, for example CRC32, Guava, or Sumbur, and implement it in the same way on all the clients. A clear advantage of this approach is that the database doesn't know anything about sharding: you have your database working as usual, and the sharding happens somewhere off to the side.
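As a quick illustration of such a function (written in Lua here, although with client-side sharding it lives in every client, whatever language that client is written in):

local digest = require('digest')

local N_SHARDS = 2  -- example value

-- the same key always maps to the same shard on every client
local function shard_of(email)
    return digest.crc32(email) % N_SHARDS + 1
end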
However, this approach also has a serious drawback. To begin with, the clients get pretty thick: if you want to write a new one, you need to add the sharding logic to it. But the gravest issue is that some clients may be using one sharding scheme while others use a completely different one, and the database itself has no idea that sharding is being done in two different ways.
We chose another approach: intra-database sharding. In this case, the database code grows more complex, but in exchange we get simple clients. Every client connecting to the database is routed to an arbitrary node, where a special function calculates which node should handle the request and hands control over to it. As mentioned, clients become simpler at the expense of a more complex database, but the database is then fully responsible for its data. Besides, the most difficult thing of all is resharding, and it is much easier to do when the database is responsible for its data than when you have a bunch of clients that you can't even update.
How did we implement it?
In the diagram, hexagons represent Tarantool instances. Let's take three nodes and call them shard 1, and another three-node cluster and call it shard 2. If we connect all the nodes to one another, what does that give us? We have Raft in place, so within each cluster we know its status: who the leader is and who the followers are. Thanks to the inter-cluster connections, we also know the state of the other shard (its leader and its followers). Generally speaking, we always know where to redirect a user who came to the first shard when that's not the shard they need.
Let's consider a couple of simple examples.
Suppose a user requests a key residing on the first shard. The request is received by one of the nodes in the first shard, and since this node knows who the leader is, it reroutes the request to the shard leader, which in turn reads or writes the key and returns a response to the user.
Another scenario: the user's request arrives at the same node in the first shard, but the requested key sits on the second shard. This case is handled the same way: the first shard knows who the leader of the second shard is, so the request is forwarded there, processed, and a response is returned to the user.
It's a pretty straightforward scheme, but it has its downsides. The greatest issue is the number of connections. In our two-shard case, where each node is connected to all the other nodes, there are 6 * 5 = 30 connections. Add one more three-node shard, and the number soars to 9 * 8 = 72 connections. Isn't that too many?
Here's how we solved this problem: we added a few more Tarantool instances, but called them proxies rather than shards or databases, and made them handle all the sharding: they compute the key and locate the shard leader. The Raft clusters, on the other hand, remain self-contained and work only within their own shard. When a user comes to a proxy, it calculates which shard they need: if they need the leader, it redirects them to the leader; if not, it redirects them to any node within that shard.
The resulting complexity is linear and depends on the number of nodes. Given three shards of three nodes each, the number of connections is several times smaller.
The proxy scheme was designed with further scaling in mind (when the number of shards is greater than two). With just two shards the number of connections stays the same, but as the number of shards grows, the savings in connections become significant. The list of shards is stored in a Lua configuration file, so to pick up a new list we just reload the code, and everything keeps working.
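A simplified sketch of what a proxy does, with assumed names: the shard list mirrors the Lua configuration file mentioned above, get_leader is a hypothetical function each shard node exposes from its Raft state, and store_token stands in for the actual storage call:

local net_box = require('net.box')
local digest = require('digest')

-- example configuration: one list of node URIs per shard
local shards = {
    {'shard1a:3301', 'shard1b:3301', 'shard1c:3301'},
    {'shard2a:3301', 'shard2b:3301', 'shard2c:3301'},
}

local function shard_for(key)
    return shards[digest.crc32(key) % #shards + 1]
end

local function write_token(key, token)
    local nodes = shard_for(key)
    -- ask any node of the shard who its Raft leader is, then forward the write;
    -- a real proxy would cache these connections instead of reconnecting
    local any = net_box.connect(nodes[1])
    local leader_uri = any:call('get_leader', {})
    local leader = net_box.connect(leader_uri)
    return leader:call('store_token', {key, token})
end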
To sum up, we started with master-master replication, implemented Raft, then added sharding and proxies. What we got is a single building block, a cluster, so our scheme now looks quite simple.
What's left are the frontends, which only write and read tokens, and the refreshers, which update tokens: they take the refresh token, pass it to the OAuth provider, and then write back a new access token.
We mentioned that we have some auxiliary logic that depleted our CPU resources. Let's move it to another cluster.
This auxiliary logic mainly has to do with address books. For every user token there is a corresponding address book for that user, so the amount of data is about the same as for the tokens. To avoid running out of CPU resources on one machine, we obviously need the same kind of cluster with replication. We just add a bunch of refreshers that update address books (this is a rarer task, so address books aren't updated together with tokens).
As a result, by combining two such clusters we got this relatively simple overall structure:
Token refresh queue
Why did we need to implement our own queue when we could have used something standard? It's all about our token update model. Once issued, a token is valid for one hour. When the expiration date is near, the token needs to be updated, and it must be done before a certain point in time.
Suppose an outage occurs and we have a bunch of expired tokens on our hands. While we're updating them, more tokens keep expiring. Sure enough, we'll catch up eventually, but wouldn't it be better to first update the ones about to expire (within 60 seconds) and only then use the remaining resources on the already expired ones? Tokens with a longer remaining lifetime (4-5 minutes until expiration) get the lowest priority.
Implementing this logic with third-party software wouldn't be easy. Tarantool makes it a breeze, though. Consider a simple scheme: there is a tuple that stores data in Tarantool, with an ID and a primary key defined on it. To turn this into the queue we need, we just add two fields: status (the state of a queued token) and time (the expiration time or some other predefined timestamp).
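Here is what such a space could look like; the names and the exact field layout are illustrative and match the code below only loosely, but the important part is the secondary index on (status, time), which lets take walk the ready tasks in deadline order:

box.schema.space.create('queue', {if_not_exists = true})

-- field 1: id, field 2: status, field 3: time, field 4: payload
box.space.queue:create_index('primary', {
    parts = {1, 'unsigned'},
    if_not_exists = true,
})
box.space.queue:create_index('status', {
    parts = {2, 'string', 3, 'number'},
    unique = false,
    if_not_exists = true,
})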
Now let's consider the two main functions of the queue: put and take. Put writes new data: given a payload, it sets the status and time itself and then writes the data, that is, creates a new tuple.
As for take, it creates an index-based iterator and starts picking the tasks that await processing (those with the ready status), checking whether it's time to take them or whether they have already expired. If there are no tasks, take switches to wait mode. Besides standard Lua facilities, Tarantool has so-called channels, which are essentially inter-fiber synchronization primitives. Any fiber can create a channel and say, "I'm waiting over here." Any other fiber can wake this channel up and send a message through it.
A function that is waiting for something (for tasks to be released, for an appointed time, or for anything else) creates a channel, labels it appropriately, puts it somewhere, and then listens on it. If we receive a token that urgently needs to be updated, put sends a notification to this channel, and take picks up the task.
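A condensed sketch of this handoff, using the built-in fiber.channel primitive (the bookkeeping around it is simplified, and the helper names are illustrative):

local fiber = require('fiber')

local waiters = {}  -- channels of consumers currently blocked inside take()

-- inside take(), when there is nothing to pick up yet:
local function wait_for_task(timeout)
    local ch = fiber.channel(1)
    table.insert(waiters, ch)
    return ch:get(timeout)  -- blocks this fiber until put() signals or the timeout expires
end

-- inside put(), right after a new tuple has been written:
local function notify_one(task)
    local ch = table.remove(waiters)
    if ch ~= nil then
        ch:put(task, 0)  -- wake up one waiting consumer without blocking
    end
end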
Tarantool has one more special feature: whenever a task is picked up by take for a refresh, or somebody just takes a task, it's possible to keep track of client disconnects. We associate each connection with the tasks assigned to it and keep these mappings in the session stash. Suppose a refresh process fails due to a network split, and we don't know whether it will update the token and write it back to the database. A disconnect event is then triggered, which searches the session stash for all the tasks associated with the failed process and automatically releases them. After that, any released task can be announced through the same channel mechanism, just as put does, so that a waiting take quickly picks it up and processes it.
In fact, implementing this scheme doesn't require too much code:
function put(data)
    local t = box.space.queue:auto_increment({
        'r',          -- [[ status ]]
        util.time(),  -- [[ time ]]
        data          -- [[ any payload ]]
    })
    return t
end

function take(timeout)
    local start_time = util.time()
    local q_ind = box.space.tokens.index.queue
    local _, t

    while true do
        local it = util.iter(q_ind, {'r'}, {iterator = box.index.GE})
        _, t = it()
        if t and t[F.tokens.status] ~= 't' then
            break
        end

        local left = (start_time + timeout) - util.time()
        if left <= 0 then
            return
        end
        t = q:wait(left)
        if t then
            break
        end
    end
    t = q:taken(t)
    return t
end

function queue:taken(task)
    local sid = box.session.id()
    if self._consumers[sid] == nil then
        self._consumers[sid] = {}
    end
    local k = task[self.f_id]
    local t = self:set_status(k, 't')

    self._consumers[sid][k] = {util.time(), box.session.peer(sid), t}
    self._taken[k] = sid
    return t
end

function on_disconnect()
    local sid = box.session.id()
    local now = util.time()

    if self._consumers[sid] then
        local consumers = self._consumers[sid]
        for k, rec in pairs(consumers) do
            local time, peer, task = unpack(rec)

            local v = box.space[self.space].index[self.index_primary]:get({k})
            if v and v[self.f_status] == 't' then
                v = self:release(v[self.f_id])
            end
        end
        self._consumers[sid] = nil
    end
end
Put simply takes all the data a user wants to enqueue, writes it to the space, setting the status and the current time (which gives us a simple indexed FIFO queue), and returns the task.
Things get a little more involved with take, but it's still quite straightforward: we create an iterator and wait for new tasks to pick up. The taken function marks a task as taken, but more importantly, it also remembers which tasks were taken by which processes. The on_disconnect function handles a dropped connection and releases all the tasks taken by that client.
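To put these pieces to work, the disconnect handler has to be registered as a session trigger, and clients then simply call put and take. A rough usage sketch (the payload and the wait time are illustrative; since the functions above reference the queue object through self, the real registration goes through that object):

-- register the trigger so Tarantool calls it whenever a client connection drops
box.session.on_disconnect(on_disconnect)

-- producer side: enqueue a token refresh task
put({user_id = 42, refresh_token = 'YYYYYY'})

-- consumer side (a refresher): block for up to 60 seconds waiting for a task
local task = take(60)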
Are there any alternatives?
Of course, there are. We could have used any database, but regardless of the choice we would still have had to create a queue for working with the external system, for updates and so on. We can't simply update tokens on demand, since that would generate an unpredictable workload. Either way we need to keep the system alive, so we would have had to enqueue deferred tasks and ensure consistency between the database and the queue, and we would have been forced to implement a fault-tolerant queue with a quorum anyway. Besides, if we kept our data both in RAM and in a queue that, given our workload, would also have to be in-memory, we would consume more resources.
In our case, the database stores the tokens, and the queue logic costs only 7 extra bytes per tuple, and that's the whole queue! Any other queue implementation would have cost us much more, up to double the memory footprint.
Wrapping it up
First we tackled the outage problem, which used to come up pretty often. Deploying the system described above rid us of this nuisance.
Sharding enabled us to scale horizontally. Then we brought the number of connections down from quadratic to linear and adapted the queue logic to our business task: update everything we still can update when delays occur. These delays aren't always our fault: Google, Microsoft, or other services may change something on their OAuth provider side, which leaves us with lots of tokens that couldn't be updated.
Performing computations inside the database, close to the data, is very convenient, efficient, scalable, and flexible. Tarantool is really cool!
Thanks for reading this article.