logo资料库

对于Raft的四个修改优化论文.pdf

第1页 / 共14页
第2页 / 共14页
第3页 / 共14页
第4页 / 共14页
第5页 / 共14页
第6页 / 共14页
第7页 / 共14页
第8页 / 共14页
资料共14页,剩余部分请下载后查看
  Four modifications for the Raft consensus algorithm   Henrik Ingo  henrik.ingo@openlife.cc  September 2015    1. Background In my work at MongoDB, I've been involved in a team that is adapting our replication protocol to  conform to the principles set out in the academic paper that describes the Raft algorithm​(Ongaro,  2014)​. Breaking with the academia's long standing obsession with Paxos, of which I'm yet to hear  about a robust real world and open source implementation, Raft describes a simple, easy to  understand leader based consensus algorithm. (In the vocabulary used outside of academia,  Raft describes a single­master, synchronous replication protocol using majority elections.)   Hence, while my original interest in Raft is purely work related, clearly it is meaningful to  consider Raft itself as a valuable contribution to the brittle art of making databases highly  available. As such, this paper is written purely within the context of Raft itself, without any  reference to the replication implementation in MongoDB, and ignoring the multitude of  differences that a MongoDB implementation will have compared to Raft. (One of which is  MongoDB being pull based, while Raft is a push based algorithm.)  This is the second version of this paper and supersedes its predecessor from a month ago,  which was called "Three modifications for Raft consensus". This version adds an explicit cluster  initialization step, hence making it four modifications. Adding the cluster initialization as an  explicit part of the algorithm, makes the description of the database id more straightforward. As  part of simplifying the creation of the database id, this paper no longer proposes the option to  automatically delete old data from a server ­ this was seen as a an unsafe operation by several  reviewers of the first version and therefore became cause for much unnecessary confusion . For  the benefit of those who already read the previous version, the new and changed parts of this  paper are colored dark red.  1 I would like to particularly thank Oren Eini for his thorough review of the first version of this  paper and for proposing to codify the cluster initialization to a more explicit part of the algorithm.  1 ​https://groups.google.com/forum/#!topic/raft­dev/2OGYyUmjRFY   © 2015 by MongoDB Inc. © 2014 by Diego Andres Ongaro. (RPC call tables)  This work is licensed under a Creative Commons Attribution­3.0 License.  http://creativecommons.org/licenses/by/3.0/  1 
2. Introduction The major contribution of the Raft authors has clearly been to finally introduce an easy to  understand, easy to implement protocol for distributed consensus. While more complicated  protocols may offer additional value in terms of feature set ­ such as supporting a multi­master  scheme ­ it is a valuable resource for the community of distributed databases to at least have a  robust alternative for the most basic kind of replication, leader based consensus. Somewhat  surprisingly, the level of elegance achieved by Raft has not been clearly documented  previously, let alone provided with proofs of correctness. The closest, and compared to Paxos  rather useful, cousin to Raft would be the family of ViewStamped replication algorithms​(Oki & Liskov,  1988)​, however Raft significantly simplifies compared to ViewStamped replication.  It is common for academic papers to focus mostly on the core replication of events themselves,  and the closely related mechanism of electing a leader or determining a quorum. On the other  hand they commonly neglect important surrounding topics, such as cluster maintenance  activities or "environmental corner cases", that in the real world are equally important  ingredients in creating a complete solution for a highly available database.  Also the Raft algorithm has evolved in similar fashion. This is evident when following the  succession of published papers from the original Usenix 2014 paper​(Ongaro & Ousterhout, Usenix 2014)​ to  the extended version of the same paper​(Ongaro & Ousterhout, 2014)​ to the thesis by Diego Ongaro  published later in 2014​(Ongaro, 2014)​.   For example, the original Usenix paper did not include any mention of snapshotting the state  machine, rather simply describes an indefinitely growing log of events ­ clearly not realistic for  most real world systems that might implement Raft, including the authors' own Ramcloud  database. The extended version then added a discussion on snapshotting, including an  InstallSnapshot RPC for sending the snapshot to followers when needed.   Similarly the original Usenix paper does include a discussion on cluster membership changes,  but it is obvious even to the casual reader that this part of the paper did not receive the same  amount of thought that went into the core of the algorithm, and certainly does not achieve the  simplicity goal the authors had set themselves. Ultimately the cluster membership change  protocol ends up in the curious state where members of the cluster are receiving (and  accepting!) events from a leader that's not even part of the current cluster. The Ongaro thesis  then replaces that complexity with 2 very simple RPCs to add and remove servers one at a time.  And as is often the case, the simpler algorithm also turns out to be more robust than the  complex one!  In the same spirit of evolving from a core protocol to a more complete and realistic  implementation, the goal of this paper is to introduce 4 modifications to Raft, that are relevant to  real­world distributed databases:  2 
1. Cluster initialization: A cluster initialization step that is the starting point of the lifecycle of  the cluster. Having this step explicitly described makes it more straightforward to  describe also the database id, and their relation to AddServer RPC.  2. Universally Unique Database Identifier: to identify whether a snapshot or a log on one  server is in fact some (predecessor or successor) state of the same state machine on  other servers of the cluster, or whether it's a snapshot of some completely different state  machine that has nothing to do with this cluster.  3. Pre­vote algorithm: this paper provides a more detailed specification of the idea  suggested only in passing in §9.6 in (Ongaro, 2014)  4. Leader stickiness: Building on the pre­vote algorithm, we also modify Raft to reject  servers that attempt to elect themselves as leader, if the current leader appears to still  be healthy to the rest of the cluster. This is to avoid flip­flopping between two competing  leaders.  The proposed modifications in this paper are written against the most recent publication of Raft,  Diego Ongaro's thesis paper​(Ongaro, 2014)​, which the reader is assumed to be familiar with. The  tables summarizing the algorithm have been reproduced on the next few pages. The additions  of this paper are highlighted​ ​in blue​.      InitializeCluster (NEW)  Called on a single server that becomes the first member of a  new cluster.    Set state:  databaseId currentTerm if unset, set 0  log[]   After initialization, move to follower state (then elect yourself  to leader).  keep current contents (which may be nothing)  generate a universally unique id  State  Persistent state on all servers:  (Updated on stable storage before responding to RPCs)  databaseId unique, constant identifier generated by  InitializeCluster.  currentTerm latest term server has seen (initialized to  votedFor log[] 0, increases monotonically)  candidateId that received vote in current  term (or null if none)  log entries; each entry contains command  for state machine, and term when entry  was received by leader (first index is 1)    Volatile state on all servers:  commitIndex index of highest log entry known to be  committed (initialized to 0, increases  monotonically)  lastApplied index of highest log entry applied to state  machine (initialized to 0, increases  monotonically)    Volatile state on all leaders:  (Reinitialized after election)  nextIndex[] for each server, index of the next log entry  to send to that server (initialized to leader  last log index + 1)  matchIndex[] for each server, index of highest log entry  known to be replicated on server  (initialized to 0, increases monotonically)        3 
        AppendEntries RPC  new ones  leaders term  so follower can redirect clients  Invoked by leader to replicate log entries (§3.5), also  used as heartbeat (§3.4).  Arguments:  term leaderId prevLogIndex​index of log entry immediately preceding  prevLogTermterm of prevLogIndex entry  entries​[] leaderCommit​leader's commitIndex    Results:  term success log entries to store (empty for heartbeat;  may send more than one for efficiency)  currentTerm, for leader to update itself  true if follower contained entry matching  prevLogIndex and prevLogTerm    Receiver implementation:  1. Reply false if term < currentTerm (§3.3)  2. Reply false if log doesn't contain an entry at  3. prevLogIndex whose term matches prevLogTerm  (§3.5)  If an existing entry conflicts with a new one (same  index but different terms), delete the existing entry  and all that follow it (§3.5)  4. Append any new entries not already in the log  5. If leaderCommit > commitIndex, set commitIndex =  min(leaderCommit, index of last new entry)    AddServer RPC  Invoked by admin to add a server to the cluster  configuration.    Arguments:  newServer databaseId address of server to add to configuration  the universally unique databaseId of the  cluster to which the new server is added.  OK if server was added successfully  address of recent leader, if known    Results:  status leaderHint   Receiver implementation:  1. Reply NOT_LEADER if not leader (§6.2)  2. If newServer.databaseId is set, and  leader.databaseId != newServer.databaseId, reply  with error and instruct admin to reset the new server  to uninitialized state (e.g. delete database).    4  Rules for Servers  All servers:  ● ● If commitIndex > lastApplied: increment lastApplied,  apply log[lastApplied] to state machine (§3.5)  If RPC request or response contains term T >  currentTerm: set currentTerm = T, convert to follower  (§3.3)  Followers (§3.4):  ● Respond to RPCs from candidates and leaders  ● If election timeout elapses without receiving  AppendEntries RPC from current leader or granting vote  to candidate: ​call PreVote RPC  Candidates (§3.4)  ● On conversion to candidate, start election:  Increment currentTerm  Vote for self  Send RequestVote RPCs to all other servers  ○ ○ ○ Reset election timer  ○ If votes received from majority of servers: become leader  If AppendEntries RPC received from new leader: convert  to follower  If election time elapses: start new election  ● Leaders:  ● Upon election: send initial empty AppendEntries RPC  ● ● ● ● ● (heartbeat) to each server, repeat during idle periods to  prevent election timeouts (§3.4)  If command received from client: append entry to local  log, respond after entry applied to state machine (§3.5)  If last log index >= nextIndex for a follower: send  AppendEntries RPC with log entries starting at nextIndex  If successful: update nextIndex and matchIndex for  ○ follower (§3.5)  If AppendEntries fails because of log inconsistency:  decrement nextIndex and retry (§3.5)  ○ If there exists an N such that N > commitIndex, a  majority of matchIndex[i] >= N, and log[N].term ==  currentTerm: set commitIndex = N (§3.5, §3.6)  RemoveServer RPC  address of server to remove from configuration  Invoked by admin to remove a server from the cluster  configuration.    Arguments:  oldServer   Results:  status leaderHint   Receiver implementation:  1. Reply NOT_LEADER if not leader (§6.2)  2. Wait until previous configuration in log is committed  OK if server was removed successfully  address of recent leader, if known  3. Append new configuration entry to log (old configuration  without oldServer), commit it using majority of new  configuration (§4.1)  4. Reply OK and, if this server was removed, step down  (§4.1)  (§4.2.2) 
  InstallSnapshot RPC  Invoked by leader to send chunks of a snapshot to a follower.  Leaders always send chunks in order. AddServer may call  InstallSnapshot also to copy an empty initial state, in this  case zero bytes are transferred, but the state is synced.  Arguments:  term leaderId lastIndex leader's term  so follower can redirect clients  the snapshot replaces all entries up to and  including this index  term of lastIndex  latest cluster configuration as of lastIndex  (include only with first chunk)  byte offset where chunk is positioned in the  snapshot file  raw bytes of the snapshot file, starting at  chunk  true if this is the last chunk  lastTerm lastConfig offset data​[] done Results:  term currentTerm, for leader to update itself  Receiver implementation:  1. Reply immediately if term < currentTerm  2. Create new snapshot file if first chunk (offset is 0)  3. Write data into snapshot file at given offset  4. Reply and wait for more data chunks if done is false  5. If lastIndex is larger than latest snapshot's, ​or if lastIndex  and lastTerm are zero​, save snapshot file and Raft state  (​databaseId,​ lastIndex, lastTerm, lastConfig). Discard  any existing or partial snapshot.  If existing log entry has same index and term as  lastIndex and lastTerm, discard log up to through  lastIndex (but retain any following entries) and reply  7. Discard the entire log  8. Reset state machine using snapshot contents (and load  6. lastConfig as cluster configuration)          3. If newServer.databaseId is unset, set it to  leader.databaseId, also set currentTerm and  log.index to 0.  4. Catch up new server for fixed number of rounds.  Reply TIMEOUT if new server does not make  progress for an election timeout or if the last round  takes longer than the election timeout. (§4.2.1)  5. Wait until previous configuration in log is committed  (§4.1)  6. Append new configuration entry to log (old  configuration plus newServer), commit it using  majority of new configuration (§4.1)  7. Reply OK  Raft State for Compaction  Persisted before discarding log entries. Also sent from  leader to slow followers when transmitting state.  prevIndex index of last discarded entry (initialized to  0 on first boot)  term of last discarded entry (initialized to 0  on first boot)  the unique identifier for this database  latest cluster membership configuration up  through prevIndex  prevTerm databaseId prevConfig   PreVote RPC (NEW)  caller's term + 1  Called by a server before changing itself to Candidate  status. If a majority of servers return true, proceed to  Candidate. Otherwise, wait for another election timeout.    Arguments:  nextTerm candidateId caller  lastLogIndex index of caller's last log entry  lastLogTerm term of caller's last log entry    Results:  term voteGranted true means caller would receive vote if it  currentTerm, for caller to update itself  was a candidate    Receiver implementation:  1. Reply false if last AppendEntries call was received  less than election timeout ago (leader stickiness)  2. Reply false if nextTerm < currentTerm  3. If caller's log is is at least as up­to­date as receiver's  log, return true    5 
        RequestVote RPC  candidate's term  Invoked by candidates to gather votes (§3.4).    Arguments:  term candidateId candidate requesting vote  lastLogIndex index of candidate's last log entry (§3.6)  lastLogTerm term of candidate's last log entry (§3.6)    Results:  term voteGranted true means candidate received vote    Receiver implementation:  1. Reply false if last AppendEntries call was received  currentTerm, for candidate to update itself  less than election timeout ago (leader stickiness)  2. Reply false if term < currentTerm (§3.3)  3. If votedFor is null or candidateId, and candidate's log  is at least as up­to­date as receiver's log, grant vote  (§3.4, §3.6)    Figure 1: Summary of the Raft algorithm, from (Ongaro, 2014). References to a section (like  §3.4) reference sections in (Ongaro, 2014). Additions proposed in this paper are ​highlighted in  blue​.​ InitializeCluster and​ PreVote RPC are new in their entirety, though the idea of a PreVote  was suggested in (Ongaro, 2014).  3. Cluster Initialization A real world implementation of Raft, or any other consensus protocol, would of course have  some kind of bootstrap procedure, that will execute multiple actions to initialize the state of a  new cluster. Arguably, such a bootstrap sequence is clearly implied in (Ongaro, 2014), as the  state is set to some default values "at first boot".  However, reviewers of previous versions of this paper pointed out that the descriptions of a  databaseId and the closely related AddServer RPC would benefit from having an explicit  initialization step as part of the Raft algorithm. In addition, the discussion resulted in a  consensus that all real world implementations we are familiar with, actually have an initialization  sequence that does exactly the steps we will document here. (Of course a real world  implementation would still do a number of other initialization tasks that will still not be relevant to  describe as part of Raft.)  The cluster initialization step is simple yet profound. When a server starts, it is incapable of  becoming a leader and hence incapable of processing any events from client. The server must  either join an existing cluster via an AddServer call, or explicitly become a new cluster with  InitializeCluster.  6 
If InitializeCluster is called on a server, it will generate a new databaseId (see next section) and  also set any other state variables to their default values. Note that after initialization, the server  will itself be a majority of a cluster with 1 server, so it will proceed to elect itself as a leader.  InitializeCluster marks the starting point of the lifecycle of a Raft cluster and state machine. The  life cycle ends when the last server is removed (or rather, removes itself) from the cluster. In  practice the lifecycle is more likely to end simply with the servers being shut down, never to be  restarted again.  However, it is also allowed to call InitializeCluster on a server that is already part of an initialized  cluster (that has one or more members). This could be necessary for a number of reasons. In  the Raft algorithm itself, it is for example possible to get into a state where a majority of servers  have been lost permanently. The cluster is now without a leader and will not be able to accept  new events from clients, and also will not be able to elect a new leader. Without a leader it is  also impossible to correct the situation, since AddServer and RemoveServer should be called  on the leader.  In such cases the solution is to call InitializeCluster again, essentially starting a new cluster from  the remains of the one that was just lost. As the databaseId is now re­generated, this also helps  prevent split brain. Or, in the case that the admin would call InitializeCluster on multiple parts of  the lost cluster, they would still split into separately identifiable parts, each with their own distinct  databaseId.  Therefore, when calling InitializeCluster, the server may or may not be empty of data. If there  already is some data on the server, the currentTerm and log index are preserved, but they  should be interpreted as the starting point in the lifecycle of a new cluster and detached from the  cluster they previously belonged to ­ the log now lives irrevocably in a parallel history.  Note that for the other route for a server to become part of a Raft cluster, via the AddServer call,  the opposite is true: The server must either already be in a state with a matching databaseId or  it must be in the empty and uninitialized state. It is not allowed to add a server that contains  some unrelated data (or state) ­ the administrator would have to first delete such database/state  from the new server.  Note that in the algorithm description above, we have specified InitializeCluster as an operation  that can only be called on a single server, which then becomes the sole member of its new  cluster. This is mostly an artefact of the way Raft is designed: it is always the Leader (or  sometimes a Candidate) that makes Remote Procedure Calls toward followers. But there is no  procedure where followers call to each other.  For real world implementations it may be desirable ­ in particular to avoid unnecessary  InstallSnapshot calls, which are expensive on large snapshot files ­ to allow a minority of  servers, still connected to each other, to enter into the new cluster together, rather than  initializing a single server first and then re­adding the others with AddServer RPC. This use  case is in fact implicitly supported: After calling InitializeCluster on the first server, it would be  7 
possible to copy the newly generated databaseId to any other servers one would want to bring  into the new cluster. For example, an implementation may provide some kind of setDatabaseId()  call for this purpose. Note that this would definitively be considered an "admin forceful override"  type of operation. The admin would only use this in unusual circumstances and must be sure to  only call it on nodes that actually share the same log history with the server the databaseId is  copied from. After setting the databaseId on a second server, it could then join the first server's  cluster normally via an AddServer call. Since the databaseId's of the servers match, the  AddServer call will succeed without an InstallSnapshot operation.  4. Universally Unique Database Identifier While the Raft algorithm as published in (Ongaro, 2014) does a great job in maintaining the  integrity of a single replicated state machine, in the real world database clusters don't live in a  vacuum. A sysadmin will be operating multiple servers in one or more datacenters, each server  belonging to some cluster. Failing to take this obvious fact into account, implementations will  often be left vulnerable to various split brain conditions, especially due to operational errors  such as misconfiguration. While we could blame many of these conditions on the sysadmin, it is  actually simple for an implementation to protect against such errors, and one should of course  therefore do so.  To illustrate a simple path to a split brain, one out of many possible, consider a 2 server cluster  and the following steps, that are fully legitimate operations as defined in the (Ongaro, 2014)  version of Raft:      Figure 2:  Sequence of steps leading to split brain, which goes undetected:  a) Two servers S1 and S2 form the cluster, with S1 being the leader. They commit  x<­1 and y<­2, after which there is a network split. Since neither server alone is a  majority, there is no leader and replication stops. At this point both servers have  (term=1, index=2).  b) To get the service back into available state, the sysadmin has to change the  cluster configuration by calling S1.RemoveServer(S2). (Strictly speaking this can  only be called on the leader, however since it is not possible for any server to  become leader in this state, a real world implementation would provide ­ or the  8 
分享到:
收藏