从0到1实现 Raft — 日志压缩 (MIT 6.5840 Lab3 PartD)
?我想学习下分布式系统的经典入门课程 MIT 6.824 ,正好看到木鸟(https://www.zhihu.com/people/qtmuniao)的课程 —— 基于 MIT 6.824 的课程,从零实现分布式 KV。门课会,手把手带你看论文写代码。所以这篇文章的主要内容是我的课程心得,介绍什么是Raft,并实现一个基本的框架。
希望这篇文章能带你进入分布式系统的大门,作为一个新手,我深知学习分布式系统是一个挑战,但我也坚信通过不断地学习和实践,我们可以逐步掌握其中的要领不断进步。期待与大家一起探索分布式系统的奥秘,共同成长。这篇文章只讨论怎么实现日志压缩部分,代码在这:https://github.com/Maricaya/raft/tree/partD
如果对你有帮助,请给我点赞 评论,这是我继续更新的最大动力~
?点击关注公众号,“技术干货”及时达!
我们前面介绍了 Raft 算法 part A Leader 选举、B 日志同步、C持久化,现在来看最后一章——partD 日志压缩。在 Raft 中,每个节点都会维护一个日志,用于记录节点的状态变化和通信历史。
随着系统运行时间的增长,日志可能会变得非常庞大。当日志过大时,不仅会占用大量的「存储空间」,还会「增加数据传输和处理的开销」。为了解决这个问题,Raft引入了日志压缩 Snapshotting 机制。
那什么是日志压缩 Snapshotting 呢?就是将日志中的旧数据转化为快照 Snapshot 的过程。快照是当前系统状态的静态副本,它包括系统的完整信息。
「简单来说,就是将某一时刻系统的状态保存下来并落地存储,这样该时刻之前的所有日志就都可以丢弃了。」
但是,这时候新的问题来了:
如果 Leader 想给从节点发送日志时,发现日志条目已经被截断怎么办?
很简单,我们先将 Leader 的 Snapshot 无脑同步给 Follower,再做之后其他的日志同步。我们把这个过程引入一个新的 RPCInstallSnapshot
图解snapshot那么具体怎么把日志压缩为 snapshot 呢?
举个例子你就明白了,来看这张 Raft 论文里的原图:
假设我们当前的日志存储了变量 x 和 y 的更新信息。其中,x 的更新依次是 3、2、0、5,而 y 的更新依次是 1、9、7。当前,日志下标为 1 到 5 的日志已经被提交 commited,表示这段日志已经过时,当前节点不再需要它们。
我们可以将这段日志的最后一次存储信息作为日志的快照,即 x=0,y=9。同时,我们记录最后包含的日志下标(last included index)以及其对应的任期。此时,我们的新日志仅包含未提交的部分,即 6、7,log 的长度也从 7 缩减为 2。
因此,我们可以观察到快照存储是根据 Raft 节点的数量确定的。每个节点都会保存自己的快照,其中快照的信息相当于 commit 后的日志。
快照机制快照机制通常包括以下几个关键步骤:
生成快照:首先,Leader 节点会生成当前系统状态的快照。快照包含了系统的状态信息,比如存储在节点中的数据以及节点的配置信息。传输快照:一旦快照生成完成,Leader 节点会将快照传输给其他节点。这一步,可能包括传递整个快照,或者通过正常的日志同步逐步更新其他节点的状态(和PartB一样)。应用快照:接收到快照的节点会将快照应用到自己的状态机中。这样,节点就可以从快照中恢复出系统的状态。更新日志:一旦快照被应用到状态机中,节点会更新自己的日志。旧的日志条目就可以被删除,从而释放存储空间。「持久化快照(这一步没画出来):」 为了防止节点在重新启动后丢失快照,系统会将快照持久化存储。总的来说,快照机制的实现可以显著减少系统中存储的日志大小,从而提高系统的性能和效率。还可以简化系统的故障恢复过程,因为节点在重新启动后只需加载最新的快照即可恢复状态,不用逐个应用历史日志。
开始写代码下面,我们开始愉快的写代码了~
定义快照,使用 snapsohot 重构日志代码定义 log 压缩之后的结构体RaftLog,主要包含三部分:日志截断后压缩成的 snapshot,剩余日志 tailLog 和 分界线 snapLastIdx、snapLastTerm。
「在初始化 tailLog 的时候,我们把第一个日志设置为」 「{Term: snapLastIdx}」 「,真正的下标从 1 开始,对应」 「snapLastIdx + 1」 「,这样做转化的时候更容易。」
typeRaftLogstruct{
snapLastIdxint
snapLastTermint
//containsindex[1,snapLastIdx]
snapshot[]byte
//thefirstentryis`snapLastIdx`,butonlycontainsthesnapLastTerm
//theentriesbetween(snapLastIdx,snapLastIdx+len(tailLog)-1]haverealdata
tailLog[]LogEntry
}
把原来的日志类型为 RaftLog
typeRaftstruct{
//loginthePeer'slocal
log[]LogEntry
log*RaftLog
}
把原来的日志操作为 RaftLog 的操作
//thedummylogiscounted
func(rl*RaftLog)size()int{
returnrl.snapLastIdx+len(rl.tailLog)
}
//accesstheindex`rl.snapLastIdx`isallowed,althoughit'snotexistactually.
func(rl*RaftLog)idx(logicIdxint)int{
iflogicIdxrl.snapLastIdx||logicIdx=rl.size(){
panic(fmt.Sprintf("%disoutof[%d,%d]",logicIdx,rl.snapLastIdx+1,rl.size()-1))
}
returnlogicIdx-rl.snapLastIdx
}
func(rl*RaftLog)at(logicIdxint)LogEntry{
returnrl.tailLog[rl.idx(logicIdx)]
}
func(rl*RaftLog)last()(idx,termint){
returnrl.size()-1,rl.tailLog[len(rl.tailLog)-1].Term
}
func(rl*RaftLog)tail(startIdxint)[]LogEntry{
ifstartIdx=rl.size(){
returnnil
}
returnrl.tailLog[rl.idx(startIdx):]
}
func(rl*RaftLog)firstLogFor(termint)int{
foridx,entry:=rangerl.tailLog{
ifentry.Term==term{
returnidx
}elseifentry.Termterm{
break
}
}
returnInvalidIndex
}
func(rl*RaftLog)append(eLogEntry){
rl.tailLog=append(rl.tailLog,e)
}
func(rl*RaftLog)appendFrom(prevIdxint,entries[]LogEntry){
rl.tailLog=append(rl.tailLog[:rl.idx(prevIdx)+1],entries...)
}
「生成snapshot,截断日志」Snapshot 方法接收快照的索引和快照数据。
不能在 commitIndex 之前的位置创建快照,也就是说 index commitIndex 时,直接返回。不能在已经有快照的地方再次进行快照,所以 index = 已经存在的最新的快照索引 rl.log.snapLastIdx 时,直接返回。先进行一些条件检查:
然后调用 doSnapshot 方法进行实际的快照操作。
func(rf*Raft)Snapshot(indexint,snapshot[]byte){
//Yourcodehere(PartD).
rf.mu.Lock()
deferrf.mu.Unlock()
LOG(rf.me,rf.currentTerm,DSnap,"Snapon%d",index)
ifindexrf.commitIndex{
LOG(rf.me,rf.currentTerm,DSnap,"Couldn'tsnapshotbeforeCommitIdx:%d%d",index,rf.commitIndex)
return
}
ifindex=rf.log.snapLastIdx{
LOG(rf.me,rf.currentTerm,DSnap,"Alreadysnapshotin%d=%d",index,rf.log.snapLastIdx)
return
}
rf.log.doSnapshot(index,snapshot)
//todo持久化
}
doSnapshot 方法执行了快照的实际操作
先更新快照的最后一个索引和对应的任期。接着,对 tailLog 进行更新,即删除快照索引之前的所有日志条目,只保留快照索引后的日志条目,并重新构建 tailLog。这段代码的关键逻辑是对日志进行压缩,「通过删除已经快照的日志条目来减少存储和传输的数据量。」
func(rl*RaftLog)doSnapshot(indexint,snapshot[]byte){
//sinceidx()willuserl.snapLastIdx,soweshouldkeepitfirst
idx:=rl.idx(index)
rl.snapLastTerm=rl.tailLog[idx].Term
rl.snapLastIdx=index
rl.snapshot=snapshot
//allocateanewslice
newLog:=make([]LogEntry,0,rl.size()-rl.snapLastIdx)
newLog=append(newLog,LogEntry{
Term:rl.snapLastTerm,
})
newLog=append(newLog,rl.tailLog[idx+1:]...)
rl.tailLog=newLog
}
发送 snapshot接下来从 Leader 发送 snapshot 给 Follower。Raft 论文里给我们提供了 InstallSnapshot RPC 的写法,我们按照要求实现即可。整体代码逻辑如下:
第一步,发送snapshot,当 Follower 的日志落后于 Leader snapshot 的时候,Leader 直接向 Follower 发送全部快照,Follower 替换当前的 snapshot。
检查条件:if prevIdx rf.log.snapLastIdx { ... }:上一条日志的索引是否小于快照的最后索引。
在此之后,生成 InstallSnapshotArgs 对象,定期的向 Followers 发送InstallSnapshot RPC
func(rf*Raft)startReplication(termint)bool{
//...
//todoinstallOnPeer
//...
prevIdx:=rf.nextIndex[peer]-1
ifprevIdxrf.log.snapLastIdx{
args:=&InstallSnapshotArgs{
Term:rf.currentTerm,
LeaderId:rf.me,
LastIncludedIndex:rf.log.snapLastIdx,
LastIncludedTerm:rf.log.snapLastTerm,
Snapshot:rf.log.snapshot,
}
LOG(rf.me,rf.currentTerm,DDebug,"-S%d,InstallSnap,Args=%v",peer,args.String())
goinstallOnPeer(peer,term,args)
continue
}
//...
}
「Followers」 接收 snapshot, 「处理 AppendEntries 的 RPC」Follower 接收到 Leader RPC 后替换本地日志,我们可以参考 AppendEntries 来写这段代码。
InstallSnapshot进行边界检查,首先是 Leader 选举中的 Term 逻辑检查:
Leader 的 Term 必须大于当前节点的 Term,如果小于则返回。如果 Leader Term 大于等于当前节点的 Term,则将当前节点转变为 Follower。然后检查 snapshot 是不是已经安装在当前节点中,如果是,就拒绝接受snapshot 。
检查完成后,开始安装快照,将接收到的快照安装到当前节点的内存、持久化存储和应用层中。调用 rf.log.installSnapshot。
标记快照为待处理状态,并发送信号通知应用层快照已经准备好,可以进行处理。
typeInstallSnapshotReplystruct{
Termint
}
//follower——参照AppendEntries
func(rf*Raft)InstallSnapshot(args*InstallSnapshotArgs,reply*InstallSnapshotReply){
rf.mu.Lock()
deferrf.mu.Unlock()
LOG(rf.me,rf.currentTerm,DDebug,"-S%d,RecvSnap,Args=%v",args.LeaderId,args.String())
//aligntheterm
reply.Term=rf.currentTerm
ifargs.Termrf.currentTerm{
LOG(rf.me,rf.currentTerm,DSnap,"-S%d,RejectSnap,HigherTerm,T%dT%d",args.LeaderId,rf.currentTerm,args.Term)
return
}
ifargs.Term=rf.currentTerm{//handlethecasewhenthepeeriscandidate
rf.becomeFollowerLocked(args.Term)
}
//checkifitisaRPCwhichisoutoforder
ifrf.log.snapLastIdx=args.LastIncludedIndex{
LOG(rf.me,rf.currentTerm,DSnap,"-S%d,RejectSnap,Alreadyinstalled,Last:%d=%d",args.LeaderId,rf.log.snapLastIdx,args.LastIncludedIndex)
return
}
//installthesnapshotinthememory/persister/app
rf.log.installSnapshot(args.LastIncludedIndex,args.LastIncludedTerm,args.Snapshot)
//todo持久化
rf.applyCond.Signal()
}
完成 rf.log.installSnapshot(args.LastIncludedIndex, args.LastIncludedTerm, args.Snapshot) 代码
先设置快照的snapLastIdx、snapLastTerm和snapshot,表示当前快照的数据内容。
再创建一个新的日志数组newLog,初始化为只包含 snapLastTerm 的数据,并将其赋值给 RaftLog 结构体中的 tailLog 字段,代替原有的日志数组。
//isntallsnapshotfromtheleadertothefollower
func(rl*RaftLog)installSnapshot(index,termint,snapshot[]byte){
rl.snapLastIdx=index
rl.snapLastTerm=term
rl.snapshot=snapshot
//makeanewlogarray
//justdiscardallthelocallog,andusetheleader'ssnapshot
newLog:=make([]LogEntry,0,1)
newLog=append(newLog,LogEntry{
Term:rl.snapLastTerm,
})
rl.tailLog=newLog
}
返回,处理错误Leader 节点执行 InstallSnapshot RPC 后的处理过程。
参考 startReplication - replicateToPeer,进行边界检查 Term 和当前状态。「然后,更新 matchIndex 和 nextIndex,如果snapshot 比当前 matchIndex 大,就更新 matchIndex 和nextIndex。」 最后,不需尝试再次更新 commitIndex,因为 snapshot 包含所有已提交的 index。
func(rf*Raft)sendInstallSnapshot(serverint,args*InstallSnapshotArgs,reply*InstallSnapshotReply)bool{
ok:=rf.peers[server].Call("Raft.InstallSnapshot",args,reply)
returnok
}
//参考 startReplication - replicateToPeer,进行边界检查 Term 和当前状态。
//然后,更新matchIndex和nextIndex
installOnPeer:=func(peerint,termint,args*InstallSnapshotArgs){reply:=&InstallSnapshotReply{}ok:=rf.sendInstallSnapshot(peer,args,reply)rf.mu.Lock()deferrf.mu.Unlock()if!ok{LOG(rf.me,rf.currentTerm,DSnap,"-S%d,Lostorcrashed",peer)return}LOG(rf.me,rf.currentTerm,DDebug,"-S%d,Append,Reply=%v",peer,reply.String())//alignthetermifreply.Termrf.currentTerm{rf.becomeFollowerLocked(reply.Term)return}ifrf.contextLostLocked(Leader,term){LOG(rf.me,rf.currentTerm,DLog,"-S%d,ContextLost,T%d:Leader-T%d:%s",peer,term,rf.currentTerm,rf.role)return}//updatethematchandnextifargs.LastIncludedIndexrf.matchIndex[peer]{//toavoiddisorderreplyrf.matchIndex[peer]=args.LastIncludedIndexrf.nextIndex[peer]=args.LastIncludedIndex+1}//note:weneednottrytoupdatethecommitIndexagain,//becausethesnapshotincludedindexesareallcommitted}
应用 snapshot将已经 commit 但尚未 apply 到状态机的日志条目 apply 到状态机上。
我们在 partC 的 applicationTicker 基础上进行改造, 我们加入一个新的标记变量 snapPending,判断当前 apply 的内容是 snapshot,还是普通的 log。
typeRaftstruct{
//...
snapPendingbool
//...
}
//follower
func(rf*Raft)InstallSnapshot(args*InstallSnapshotArgs,reply*InstallSnapshotReply){
//...
rf.snapPending=true
rf.applyCond.Signal()
}
//初始化
funcMake(peers[]*labrpc.ClientEnd,meint,
persister*Persister,applyChchanApplyMsg)*Raft{
//...
rf.commitIndex=0
rf.lastApplied=0
rf.snapPending=false
//initializefromstatepersistedbeforeacrash
rf.readPersist(persister.ReadRaftState())
//...
returnrf
}
func(rf*Raft)applicationTicker(){
for!rf.killed(){
rf.mu.Lock()
rf.applyCond.Wait()
entries:=make([]LogEntry,0)
snapPendingApply:=rf.snapPending
if!snapPendingApply{
//改造log,找到entries
ifrf.lastAppliedrf.log.snapLastIdx{
rf.lastApplied=rf.log.snapLastIdx
}
//makesurethattherf.loghavealltheentries
start:=rf.lastApplied+1
end:=rf.commitIndex
ifend=rf.log.size(){
end=rf.log.size()-1
}
fori:=start;i=i++{
entries=append(entries,rf.log.at(i))
}
}
rf.mu.Unlock()
if!snapPendingApply{
fori,entry:=rangeentries{
rf.applyCh-ApplyMsg{
CommandValid:entry.CommandValid,
Command:entry.Command,
CommandIndex:rf.lastApplied+1+i,//mustbecautious
}
}
}else{
rf.applyCh-ApplyMsg{
SnapshotValid:true,
Snapshot:rf.log.snapshot,
SnapshotIndex:rf.log.snapLastIdx,
SnapshotTerm:rf.log.snapLastTerm,
}
}
rf.mu.Lock()
if!snapPendingApply{
LOG(rf.me,rf.currentTerm,DApply,"Applylogfor[%d,%d]",rf.lastApplied+1,rf.lastApplied+len(entries))
rf.lastApplied+=len(entries)
}else{
LOG(rf.me,rf.currentTerm,DApply,"InstallSnapshotfor[0,%d]",rf.log.snapLastIdx)
//更新rf.lastApplied和rf.commitIndex
rf.lastApplied=rf.log.snapLastIdx
ifrf.commitIndexrf.lastApplied{
rf.commitIndex=rf.lastApplied
}
rf.snapPending=false
}
rf.mu.Unlock()
}
}
「持久化快照」最后,对 snapshot 进行持久化改造。
//---raft_compaction.go
func(rf*Raft)Snapshot(indexint,snapshot[]byte){
//Yourcodehere(PartD).
rf.mu.Lock()
deferrf.mu.Unlock()
rf.log.doSnapshot(index,snapshot)
rf.persistLocked()
}
//---raft_log.go
func(rl*RaftLog)doSnapshot(indexint,snapshot[]byte){
ifindex=rl.snapLastIdx{
return
}
//sinceidx()willuserl.snapLastIdx,soweshouldkeepitfirst
idx:=rl.idx(index)
rl.snapLastTerm=rl.tailLog[idx].Term
rl.snapLastIdx=index
rl.snapshot=snapshot
//allocateanewslice
newLog:=make([]LogEntry,0,rl.size()-rl.snapLastIdx)
newLog=append(newLog,LogEntry{
Term:rl.snapLastTerm,
})
newLog=append(newLog,rl.tailLog[idx+1:]...)
rl.tailLog=newLog
}
//---raft_persistence.go
func(rf*Raft)persistLocked(){
w:=new(bytes.Buffer)
e:=labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
rf.log.persist(e)
raftstate:=w.Bytes()
rf.persister.Save(raftstate,rf.log.snapshot)
}
func(rf*Raft)InstallSnapshot(args*InstallSnapshotArgs,reply*InstallSnapshotReply){
//...
rf.log.installSnapshot(args.LastIncludedIndex,args.LastIncludedTerm,args.Snapshot)
rf.persistLocked()
//...
}
点击关注公众号,“技术干货”及时达!
阅读原文
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设、网站改版、域名注册、主机空间、手机网站建设、网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。 项目经理在线