全国免费咨询:

13245491521

VR图标白色 VR图标黑色
X

中高端软件定制开发服务商

与我们取得联系

13245491521     13245491521

2024-03-17_从0到1实现 Raft — 日志压缩 (MIT 6.5840 Lab3 PartD)

您的位置:首页 >> 新闻 >> 行业资讯

从0到1实现 Raft — 日志压缩 (MIT 6.5840 Lab3 PartD) ?我想学习下分布式系统的经典入门课程 MIT 6.824 ,正好看到木鸟(https://www.zhihu.com/people/qtmuniao)的课程 —— 基于 MIT 6.824 的课程,从零实现分布式 KV。门课会,手把手带你看论文写代码。所以这篇文章的主要内容是我的课程心得,介绍什么是Raft,并实现一个基本的框架。 希望这篇文章能带你进入分布式系统的大门,作为一个新手,我深知学习分布式系统是一个挑战,但我也坚信通过不断地学习和实践,我们可以逐步掌握其中的要领不断进步。期待与大家一起探索分布式系统的奥秘,共同成长。这篇文章只讨论怎么实现日志压缩部分,代码在这:https://github.com/Maricaya/raft/tree/partD 如果对你有帮助,请给我点赞 评论,这是我继续更新的最大动力~ ?点击关注公众号,“技术干货”及时达! 我们前面介绍了 Raft 算法 part A Leader 选举、B 日志同步、C持久化,现在来看最后一章——partD 日志压缩。在 Raft 中,每个节点都会维护一个日志,用于记录节点的状态变化和通信历史。 随着系统运行时间的增长,日志可能会变得非常庞大。当日志过大时,不仅会占用大量的「存储空间」,还会「增加数据传输和处理的开销」。为了解决这个问题,Raft引入了日志压缩 Snapshotting 机制。 那什么是日志压缩 Snapshotting 呢?就是将日志中的旧数据转化为快照 Snapshot 的过程。快照是当前系统状态的静态副本,它包括系统的完整信息。 「简单来说,就是将某一时刻系统的状态保存下来并落地存储,这样该时刻之前的所有日志就都可以丢弃了。」 但是,这时候新的问题来了: 如果 Leader 想给从节点发送日志时,发现日志条目已经被截断怎么办? 很简单,我们先将 Leader 的 Snapshot 无脑同步给 Follower,再做之后其他的日志同步。我们把这个过程引入一个新的 RPCInstallSnapshot 图解snapshot那么具体怎么把日志压缩为 snapshot 呢? 举个例子你就明白了,来看这张 Raft 论文里的原图: 假设我们当前的日志存储了变量 x 和 y 的更新信息。其中,x 的更新依次是 3、2、0、5,而 y 的更新依次是 1、9、7。当前,日志下标为 1 到 5 的日志已经被提交 commited,表示这段日志已经过时,当前节点不再需要它们。 我们可以将这段日志的最后一次存储信息作为日志的快照,即 x=0,y=9。同时,我们记录最后包含的日志下标(last included index)以及其对应的任期。此时,我们的新日志仅包含未提交的部分,即 6、7,log 的长度也从 7 缩减为 2。 因此,我们可以观察到快照存储是根据 Raft 节点的数量确定的。每个节点都会保存自己的快照,其中快照的信息相当于 commit 后的日志。 快照机制快照机制通常包括以下几个关键步骤: 生成快照:首先,Leader 节点会生成当前系统状态的快照。快照包含了系统的状态信息,比如存储在节点中的数据以及节点的配置信息。传输快照:一旦快照生成完成,Leader 节点会将快照传输给其他节点。这一步,可能包括传递整个快照,或者通过正常的日志同步逐步更新其他节点的状态(和PartB一样)。应用快照:接收到快照的节点会将快照应用到自己的状态机中。这样,节点就可以从快照中恢复出系统的状态。更新日志:一旦快照被应用到状态机中,节点会更新自己的日志。旧的日志条目就可以被删除,从而释放存储空间。「持久化快照(这一步没画出来):」 为了防止节点在重新启动后丢失快照,系统会将快照持久化存储。总的来说,快照机制的实现可以显著减少系统中存储的日志大小,从而提高系统的性能和效率。还可以简化系统的故障恢复过程,因为节点在重新启动后只需加载最新的快照即可恢复状态,不用逐个应用历史日志。 开始写代码下面,我们开始愉快的写代码了~ 定义快照,使用 snapsohot 重构日志代码定义 log 压缩之后的结构体RaftLog,主要包含三部分:日志截断后压缩成的 snapshot,剩余日志 tailLog 和 分界线 snapLastIdx、snapLastTerm。 「在初始化 tailLog 的时候,我们把第一个日志设置为」 「{Term: snapLastIdx}」 「,真正的下标从 1 开始,对应」 「snapLastIdx + 1」 「,这样做转化的时候更容易。」 typeRaftLogstruct{ snapLastIdxint snapLastTermint //containsindex[1,snapLastIdx] snapshot[]byte //thefirstentryis`snapLastIdx`,butonlycontainsthesnapLastTerm //theentriesbetween(snapLastIdx,snapLastIdx+len(tailLog)-1]haverealdata tailLog[]LogEntry } 把原来的日志类型为 RaftLog typeRaftstruct{ //loginthePeer'slocal log[]LogEntry log*RaftLog } 把原来的日志操作为 RaftLog 的操作 //thedummylogiscounted func(rl*RaftLog)size()int{ returnrl.snapLastIdx+len(rl.tailLog) } //accesstheindex`rl.snapLastIdx`isallowed,althoughit'snotexistactually. func(rl*RaftLog)idx(logicIdxint)int{ iflogicIdxrl.snapLastIdx||logicIdx=rl.size(){ panic(fmt.Sprintf("%disoutof[%d,%d]",logicIdx,rl.snapLastIdx+1,rl.size()-1)) } returnlogicIdx-rl.snapLastIdx } func(rl*RaftLog)at(logicIdxint)LogEntry{ returnrl.tailLog[rl.idx(logicIdx)] } func(rl*RaftLog)last()(idx,termint){ returnrl.size()-1,rl.tailLog[len(rl.tailLog)-1].Term } func(rl*RaftLog)tail(startIdxint)[]LogEntry{ ifstartIdx=rl.size(){ returnnil } returnrl.tailLog[rl.idx(startIdx):] } func(rl*RaftLog)firstLogFor(termint)int{ foridx,entry:=rangerl.tailLog{ ifentry.Term==term{ returnidx }elseifentry.Termterm{ break } } returnInvalidIndex } func(rl*RaftLog)append(eLogEntry){ rl.tailLog=append(rl.tailLog,e) } func(rl*RaftLog)appendFrom(prevIdxint,entries[]LogEntry){ rl.tailLog=append(rl.tailLog[:rl.idx(prevIdx)+1],entries...) } 「生成snapshot,截断日志」Snapshot 方法接收快照的索引和快照数据。 不能在 commitIndex 之前的位置创建快照,也就是说 index commitIndex 时,直接返回。不能在已经有快照的地方再次进行快照,所以 index = 已经存在的最新的快照索引 rl.log.snapLastIdx 时,直接返回。先进行一些条件检查: 然后调用 doSnapshot 方法进行实际的快照操作。 func(rf*Raft)Snapshot(indexint,snapshot[]byte){ //Yourcodehere(PartD). rf.mu.Lock() deferrf.mu.Unlock() LOG(rf.me,rf.currentTerm,DSnap,"Snapon%d",index) ifindexrf.commitIndex{ LOG(rf.me,rf.currentTerm,DSnap,"Couldn'tsnapshotbeforeCommitIdx:%d%d",index,rf.commitIndex) return } ifindex=rf.log.snapLastIdx{ LOG(rf.me,rf.currentTerm,DSnap,"Alreadysnapshotin%d=%d",index,rf.log.snapLastIdx) return } rf.log.doSnapshot(index,snapshot) //todo持久化 } doSnapshot 方法执行了快照的实际操作 先更新快照的最后一个索引和对应的任期。接着,对 tailLog 进行更新,即删除快照索引之前的所有日志条目,只保留快照索引后的日志条目,并重新构建 tailLog。这段代码的关键逻辑是对日志进行压缩,「通过删除已经快照的日志条目来减少存储和传输的数据量。」 func(rl*RaftLog)doSnapshot(indexint,snapshot[]byte){ //sinceidx()willuserl.snapLastIdx,soweshouldkeepitfirst idx:=rl.idx(index) rl.snapLastTerm=rl.tailLog[idx].Term rl.snapLastIdx=index rl.snapshot=snapshot //allocateanewslice newLog:=make([]LogEntry,0,rl.size()-rl.snapLastIdx) newLog=append(newLog,LogEntry{ Term:rl.snapLastTerm, }) newLog=append(newLog,rl.tailLog[idx+1:]...) rl.tailLog=newLog } 发送 snapshot接下来从 Leader 发送 snapshot 给 Follower。Raft 论文里给我们提供了 InstallSnapshot RPC 的写法,我们按照要求实现即可。整体代码逻辑如下: 第一步,发送snapshot,当 Follower 的日志落后于 Leader snapshot 的时候,Leader 直接向 Follower 发送全部快照,Follower 替换当前的 snapshot。 检查条件:if prevIdx rf.log.snapLastIdx { ... }:上一条日志的索引是否小于快照的最后索引。 在此之后,生成 InstallSnapshotArgs 对象,定期的向 Followers 发送InstallSnapshot RPC func(rf*Raft)startReplication(termint)bool{ //... //todoinstallOnPeer //... prevIdx:=rf.nextIndex[peer]-1 ifprevIdxrf.log.snapLastIdx{ args:=&InstallSnapshotArgs{ Term:rf.currentTerm, LeaderId:rf.me, LastIncludedIndex:rf.log.snapLastIdx, LastIncludedTerm:rf.log.snapLastTerm, Snapshot:rf.log.snapshot, } LOG(rf.me,rf.currentTerm,DDebug,"-S%d,InstallSnap,Args=%v",peer,args.String()) goinstallOnPeer(peer,term,args) continue } //... } 「Followers」 接收 snapshot, 「处理 AppendEntries 的 RPC」Follower 接收到 Leader RPC 后替换本地日志,我们可以参考 AppendEntries 来写这段代码。 InstallSnapshot进行边界检查,首先是 Leader 选举中的 Term 逻辑检查: Leader 的 Term 必须大于当前节点的 Term,如果小于则返回。如果 Leader Term 大于等于当前节点的 Term,则将当前节点转变为 Follower。然后检查 snapshot 是不是已经安装在当前节点中,如果是,就拒绝接受snapshot 。 检查完成后,开始安装快照,将接收到的快照安装到当前节点的内存、持久化存储和应用层中。调用 rf.log.installSnapshot。 标记快照为待处理状态,并发送信号通知应用层快照已经准备好,可以进行处理。 typeInstallSnapshotReplystruct{ Termint } //follower——参照AppendEntries func(rf*Raft)InstallSnapshot(args*InstallSnapshotArgs,reply*InstallSnapshotReply){ rf.mu.Lock() deferrf.mu.Unlock() LOG(rf.me,rf.currentTerm,DDebug,"-S%d,RecvSnap,Args=%v",args.LeaderId,args.String()) //aligntheterm reply.Term=rf.currentTerm ifargs.Termrf.currentTerm{ LOG(rf.me,rf.currentTerm,DSnap,"-S%d,RejectSnap,HigherTerm,T%dT%d",args.LeaderId,rf.currentTerm,args.Term) return } ifargs.Term=rf.currentTerm{//handlethecasewhenthepeeriscandidate rf.becomeFollowerLocked(args.Term) } //checkifitisaRPCwhichisoutoforder ifrf.log.snapLastIdx=args.LastIncludedIndex{ LOG(rf.me,rf.currentTerm,DSnap,"-S%d,RejectSnap,Alreadyinstalled,Last:%d=%d",args.LeaderId,rf.log.snapLastIdx,args.LastIncludedIndex) return } //installthesnapshotinthememory/persister/app rf.log.installSnapshot(args.LastIncludedIndex,args.LastIncludedTerm,args.Snapshot) //todo持久化 rf.applyCond.Signal() } 完成 rf.log.installSnapshot(args.LastIncludedIndex, args.LastIncludedTerm, args.Snapshot) 代码 先设置快照的snapLastIdx、snapLastTerm和snapshot,表示当前快照的数据内容。 再创建一个新的日志数组newLog,初始化为只包含 snapLastTerm 的数据,并将其赋值给 RaftLog 结构体中的 tailLog 字段,代替原有的日志数组。 //isntallsnapshotfromtheleadertothefollower func(rl*RaftLog)installSnapshot(index,termint,snapshot[]byte){ rl.snapLastIdx=index rl.snapLastTerm=term rl.snapshot=snapshot //makeanewlogarray //justdiscardallthelocallog,andusetheleader'ssnapshot newLog:=make([]LogEntry,0,1) newLog=append(newLog,LogEntry{ Term:rl.snapLastTerm, }) rl.tailLog=newLog } 返回,处理错误Leader 节点执行 InstallSnapshot RPC 后的处理过程。 参考 startReplication - replicateToPeer,进行边界检查 Term 和当前状态。「然后,更新 matchIndex 和 nextIndex,如果snapshot 比当前 matchIndex 大,就更新 matchIndex 和nextIndex。」 最后,不需尝试再次更新 commitIndex,因为 snapshot 包含所有已提交的 index。 func(rf*Raft)sendInstallSnapshot(serverint,args*InstallSnapshotArgs,reply*InstallSnapshotReply)bool{ ok:=rf.peers[server].Call("Raft.InstallSnapshot",args,reply) returnok } //参考 startReplication - replicateToPeer,进行边界检查 Term 和当前状态。 //然后,更新matchIndex和nextIndex installOnPeer:=func(peerint,termint,args*InstallSnapshotArgs){reply:=&InstallSnapshotReply{}ok:=rf.sendInstallSnapshot(peer,args,reply)rf.mu.Lock()deferrf.mu.Unlock()if!ok{LOG(rf.me,rf.currentTerm,DSnap,"-S%d,Lostorcrashed",peer)return}LOG(rf.me,rf.currentTerm,DDebug,"-S%d,Append,Reply=%v",peer,reply.String())//alignthetermifreply.Termrf.currentTerm{rf.becomeFollowerLocked(reply.Term)return}ifrf.contextLostLocked(Leader,term){LOG(rf.me,rf.currentTerm,DLog,"-S%d,ContextLost,T%d:Leader-T%d:%s",peer,term,rf.currentTerm,rf.role)return}//updatethematchandnextifargs.LastIncludedIndexrf.matchIndex[peer]{//toavoiddisorderreplyrf.matchIndex[peer]=args.LastIncludedIndexrf.nextIndex[peer]=args.LastIncludedIndex+1}//note:weneednottrytoupdatethecommitIndexagain,//becausethesnapshotincludedindexesareallcommitted} 应用 snapshot将已经 commit 但尚未 apply 到状态机的日志条目 apply 到状态机上。 我们在 partC 的 applicationTicker 基础上进行改造, 我们加入一个新的标记变量 snapPending,判断当前 apply 的内容是 snapshot,还是普通的 log。 typeRaftstruct{ //... snapPendingbool //... } //follower func(rf*Raft)InstallSnapshot(args*InstallSnapshotArgs,reply*InstallSnapshotReply){ //... rf.snapPending=true rf.applyCond.Signal() } //初始化 funcMake(peers[]*labrpc.ClientEnd,meint, persister*Persister,applyChchanApplyMsg)*Raft{ //... rf.commitIndex=0 rf.lastApplied=0 rf.snapPending=false //initializefromstatepersistedbeforeacrash rf.readPersist(persister.ReadRaftState()) //... returnrf } func(rf*Raft)applicationTicker(){ for!rf.killed(){ rf.mu.Lock() rf.applyCond.Wait() entries:=make([]LogEntry,0) snapPendingApply:=rf.snapPending if!snapPendingApply{ //改造log,找到entries ifrf.lastAppliedrf.log.snapLastIdx{ rf.lastApplied=rf.log.snapLastIdx } //makesurethattherf.loghavealltheentries start:=rf.lastApplied+1 end:=rf.commitIndex ifend=rf.log.size(){ end=rf.log.size()-1 } fori:=start;i=i++{ entries=append(entries,rf.log.at(i)) } } rf.mu.Unlock() if!snapPendingApply{ fori,entry:=rangeentries{ rf.applyCh-ApplyMsg{ CommandValid:entry.CommandValid, Command:entry.Command, CommandIndex:rf.lastApplied+1+i,//mustbecautious } } }else{ rf.applyCh-ApplyMsg{ SnapshotValid:true, Snapshot:rf.log.snapshot, SnapshotIndex:rf.log.snapLastIdx, SnapshotTerm:rf.log.snapLastTerm, } } rf.mu.Lock() if!snapPendingApply{ LOG(rf.me,rf.currentTerm,DApply,"Applylogfor[%d,%d]",rf.lastApplied+1,rf.lastApplied+len(entries)) rf.lastApplied+=len(entries) }else{ LOG(rf.me,rf.currentTerm,DApply,"InstallSnapshotfor[0,%d]",rf.log.snapLastIdx) //更新rf.lastApplied和rf.commitIndex rf.lastApplied=rf.log.snapLastIdx ifrf.commitIndexrf.lastApplied{ rf.commitIndex=rf.lastApplied } rf.snapPending=false } rf.mu.Unlock() } } 「持久化快照」最后,对 snapshot 进行持久化改造。 //---raft_compaction.go func(rf*Raft)Snapshot(indexint,snapshot[]byte){ //Yourcodehere(PartD). rf.mu.Lock() deferrf.mu.Unlock() rf.log.doSnapshot(index,snapshot) rf.persistLocked() } //---raft_log.go func(rl*RaftLog)doSnapshot(indexint,snapshot[]byte){ ifindex=rl.snapLastIdx{ return } //sinceidx()willuserl.snapLastIdx,soweshouldkeepitfirst idx:=rl.idx(index) rl.snapLastTerm=rl.tailLog[idx].Term rl.snapLastIdx=index rl.snapshot=snapshot //allocateanewslice newLog:=make([]LogEntry,0,rl.size()-rl.snapLastIdx) newLog=append(newLog,LogEntry{ Term:rl.snapLastTerm, }) newLog=append(newLog,rl.tailLog[idx+1:]...) rl.tailLog=newLog } //---raft_persistence.go func(rf*Raft)persistLocked(){ w:=new(bytes.Buffer) e:=labgob.NewEncoder(w) e.Encode(rf.currentTerm) e.Encode(rf.votedFor) rf.log.persist(e) raftstate:=w.Bytes() rf.persister.Save(raftstate,rf.log.snapshot) } func(rf*Raft)InstallSnapshot(args*InstallSnapshotArgs,reply*InstallSnapshotReply){ //... rf.log.installSnapshot(args.LastIncludedIndex,args.LastIncludedTerm,args.Snapshot) rf.persistLocked() //... } 点击关注公众号,“技术干货”及时达! 阅读原文

上一篇:2022-12-09_远程办公灵感不断 雷孜LaCie助你创意不停 下一篇:2024-12-09_深度揭秘“快稳省”背后的数仓硬核技术

TAG标签:

18
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设网站改版域名注册主机空间手机网站建设网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。
项目经理在线

相关阅读 更多>>

猜您喜欢更多>>

我们已经准备好了,你呢?
2022我们与您携手共赢,为您的企业营销保驾护航!

不达标就退款

高性价比建站

免费网站代备案

1对1原创设计服务

7×24小时售后支持

 

全国免费咨询:

13245491521

业务咨询:13245491521 / 13245491521

节假值班:13245491521()

联系地址:

Copyright © 2019-2025      ICP备案:沪ICP备19027192号-6 法律顾问:律师XXX支持

在线
客服

技术在线服务时间:9:00-20:00

在网站开发,您对接的直接是技术员,而非客服传话!

电话
咨询

13245491521
7*24小时客服热线

13245491521
项目经理手机

微信
咨询

加微信获取报价