解码队列精髓:Delaying Queue 与 Priority Queue 的深度剖析
点击关注公众号,”技术干货”及时达!我是 LEE,老李,一个在 IT 行业摸爬滚打 17 年的技术老兵。
这篇是 WorkQueue 项目介绍的第三篇文章,上一篇是《从理论到实践:Queue 和 Simple Queue 的详细解析》。在上一篇文章中,我向大家介绍了 WorkQueue 核心模块 Queue 和 Simple Queue 的实现原理,以及如何使用它们。本篇文章将继续介绍 WorkQueue 中高级模块,Delaying Queue 和 Priority Queue 模块的实现原理和使用方法。
Delaying Queue 和 Priority Queue 在 WorkQueue 中都是从 Queue 派生的,所以它们都具备 Queue 的所有功能,同时还有自己的特殊功能。Delaying Queue 实现了延迟队列的功能,Priority Queue 实现了优先级队列的功能。
如果你还没有看过 WorkQueue 的前两篇文章,建议先看一下,方便你对后续内容的理解:
《简约而不简单:WorkQueue 的轻量级高效之道》
https://juejin.cn/post/7317870445322223652
《从理论到实践:Queue 和 Simple Queue 的详细解析》https://juejin.cn/post/7319702157459030016事件背景在实际的业务场景中,我们经常会遇到这样的需求:「需要延迟一段时间后再执行某个任务或者需要按照优先级来执行任务」。
「就举两个小例子:」
任务执行失败后,需要延迟一段时间后再重试一段时间内有更重要的任务需要优先执行,而不是按照先进先出的顺序执行作为研发的同学,我相信此时大脑里的画面感已经出现了。这些需求在实际的业务场景中是非常常见的,我相信大家也都有自己的解决方案,但是这些方案都有一个共同的特点:「都是在业务代码中实现的」。
这样做有什么问题呢?
业务代码中的实现不够优雅,通常只适用于特定场景,缺乏通用性。这些实现往往重复,每个场景都需单独实现,导致代码冗余。实现的不可控性高,直接嵌入业务代码中,一旦出错,会影响整个业务流程。那么,有没有一种通用的解决方案呢?
答案是肯定的,这就是 WorkQueue 中的 Delaying Queue 和 Priority Queue 模块。
为什么要使用 Quadruple HeapQuadruple Heap 中文名叫“四叉堆”,它是一种特殊的 Heap。
Quadruple Heap 是一种高效的数据结构,主要用于管理一组数据以快速执行特定操作,如找到最小或最大元素。它是堆(一种特殊的树形数据结构)的一种形式,但与常见的二叉堆(每个节点最多有两个子节点)不同,四叉堆的每个节点最多有四个子节点。
想象你在管理一个大型数据库,需要迅速找到最高或最低的值。四叉堆就像是一个组织良好的档案柜,其中最顶部的抽屉(根节点)总是包含你需要的最值(最小或最大),而下面的每个抽屉(子节点)则包含稍大或稍小的值。
「四叉堆的关键特性是它的结构和维护规则」:
「结构性」:它是一个完全四叉树,这意味着除了最底层,其他每层都被节点充满,且最底层的节点尽可能从左到右填充。「堆性质」:在最小四叉堆中,每个节点的值都小于或等于其子节点的值;在最大四叉堆中,每个节点的值都大于或等于其子节点的值。四叉堆在处理优先队列、调度任务和实现某些图算法(如 Dijkstra 的最短路径算法)时特别有用。由于它的高度比二叉堆更低,某些操作(如插入、删除最大/最小元素)可能更快,尤其是在处理大量数据时。
「(★)大叔白话」:
可以想象成一棵特别的树。在这棵树里,每个节点最多有四个孩子。这跟我们常见的“二叉树”不一样,因为二叉树的每个节点最多只有两个孩子。
想象一下你正在组织一场比赛,需要快速找出最好的选手。在四叉堆中,每个节点(比如每个参赛选手)都必须比它的孩子节点(比如它的对手)更好或者至少一样好。这样,最顶部的节点,也就是树的根部,总是最好的那个。
四叉堆的特点是,它能让我们快速进行几个操作:找出最好的元素、添加新元素、移除元素。这就像在比赛中快速确定谁是最佳选手,或者快速让新选手加入和离开比赛。因为每个节点最多有四个孩子,这使得树的高度比较低,所以我们可以更快地在树中上下移动,完成这些操作。
Queue 介绍Delaying Queue 和 Priority Queue 都是要依赖任务执行顺序来实现的,所以他们都有一个核心模块:Heap。
传统的 Heap 是一个二叉树,它的每个节点都有一个权重值,这个权重值决定了节点的位置。Heap 有两种类型:Min Heap 和 Max Heap,它们的区别在于:Min Heap 中的节点权重值越小,节点越靠近根节点;Max Heap 中的节点权重值越大,节点越靠近根节点。
而 WorkQueue 在综合考虑了性能和易用性后,选择了 Min Quadruple Heap 作为 Heap 的实现。
Delaying Queue 利用 Min Quadruple Heap 来实现投递元素按照时间即将到期的顺序来执行。Priority Queue 利用 Min Quadruple Heap 来实现投递元素按照优先级的顺序来执行。1. Delaying Queue1.1 Delaying Queue 介绍Delaying Queue 是一个延迟队列,它的特点是:「投递的元素会在指定的时间到期后才会被执行」。由于 Delaying Queue 是从 Queue 派生的,所以它具备 Queue 的所有功能,也就是说 Delaying Queue 继承 Queue 所有接口和 Callback 方法。
「新增接口」:
AddAfter,它的作用是添加指定的时间后执行的元素。//DelayingInterface是Queue方法接口的延迟版本
//DelayingInterfaceisthedelayedversionoftheQueuemethodinterface
typeDelayingInterfaceinterface{
Interface
//AddAfter添加一个元素,延迟一段时间后再执行
//Addanelement,executeitafteradelay
AddAfter(elementany,delaytime.Duration)error
}
Delaying Queue 在数据处理的生命周期中还包含可以被自己定义的 Callback 方法,方便使用者干预或者介入数据处理的生命周期。
「新增 Callback 方法」:
OnAddAfter,它的作用是在元素被添加到 Delaying Queue 之后执行。//DelayingCallback是Queue的回调接口的延迟版本
//DelayingCallbackisthedelayedversionoftheQueuecallbackinterface
typeDelayingCallbackinterface{
Callback
//OnAddAfter添加元素后的回调
//Callbackafteraddingelement
OnAddAfter(any,time.Duration)
}
1.2 Delaying Queue 实现原理Delaying Queue 整体结构设计如下:
1.3 Delaying Queue 使用举例当然 Delaying Queue 使用起来也非常简单,没有太多复杂的参数初始化过程。它的注意事项和 Queue 是一样的,就是不论使用 Get 还是 GetWithBlock 之后一定要记得使用 Done 方法,来标记这个数据已经处理完成,否则再往 Delaying Queue 添加相同的数据的时候,会返回 ErrorQueueElementExist 错误。如果使用 Simple Queue,则不需要使用 Done 方法。
「代码举例」
packagemain
import(
"fmt"
"time"
"github.com/shengyanli1982/workqueue"
)
funcmain(){
q:=workqueue.NewDelayingQueue(nil)//createaqueue
gofunc(){
for{
element,err:=q.Get()//getelementfromqueue
iferr!=nil{
fmt.Println(err)
return
}
fmt.Println("getelement:",element)
q.Done(element)//markelementasdone,'Done'isrequiredafter'Get'
}
}()
_=q.Add("hello")//addelementtoqueue,immediatelyexecute
_=q.Add("world")
_=q.AddAfter("delay",time.Second)//addelementtoqueue,executeafter1seconds
time.Sleep(time.Second*2)//waitforelementtobeexecuted
q.Stop()
}
如果你不想使用 workqueue.NewDelayingQueue(nil) 这个方式来创建队列,还有一个函数偷懒 workqueue.DefaultDelayingQueue(),两者是等效的。
?「注意」:创建一个 Delaying Queue 的实例,是允许绑定自定义的 Queue 的,只要你的 Queue 实现了 Interface 接口,就可以绑定到 Delaying Queue 上。
?自定义绑定 Queue 可以参考 NewDelayingQueue 函数的实现:
//创建一个DelayingQueue实例
//CreateanewDelayingQueueconfig
funcNewDelayingQueue(conf*DelayingQConfig)*DelayingQ{
conf=isDelayingQConfigValid(conf)
conf.QConfig.cb=conf.cb
returnNewDelayingQueueWithCustomQueue(conf,NewQueue(&conf.QConfig))
}
如果你想在 Queue 过程中使用 Callback 方法,这个使用的方法:
//实现Callback接口
typecallbackstruct{}
func(cb*callback)OnAdd(itemany){}
func(cb*callback)OnGet(itemany){}
func(cb*callback)OnDone(itemany){}
func(cb*callback)OnAddAfter(itemany,ttime.Duration){}
//创建一个配置对象
conf:=NewDelayingQConfig()
//设置Callback
conf.WithCallback(&callback{})
//创建Queue的时候传入这个conf对象,而不是在用nil
q:=NewDelayingQueue(conf)
后续代码跟上面的 代码举例 中的内容一样。
1.4 Delaying Queue 代码解析「AddAfter」
//AddAfter将元素添加到队列中,在延迟一段时间后处理
//Addanelementtothequeueandprocessitafteraspecifieddelay
func(q*DelayingQ)AddAfter(elementany,delaytime.Duration)error{
ifq.IsClosed(){
returnErrorQueueClosed
}
//如果延迟时间小于等于0,直接添加到队列中,立即执行
ifdelay=0{
returnq.Add(element)
}
ele:=q.elepool.Get()
ele.SetData(element)
ele.SetValue(time.Now().Add(delay).UnixMilli())//设置元素的过期时间
q.lock.Lock()
q.waiting.Push(ele)//添加到等待队列中,等待被执行
q.lock.Unlock()
q.config.cb.OnAddAfter(element,delay)//执行回调
returnnil
}
2. Priority Queue2.1 Priority Queue 介绍Priority Queue 是一个优先级队列,它的特点是:「投递的元素会按照优先级的顺序来执行」。由于 Priority Queue 是从 Queue 派生的,所以它具备 Queue 的所有功能,也就是说 Priority Queue 继承 Queue 所有接口和 Callback 方法。
?在 Priority Queue 中,元素的顺序是通过元素的权重值来决定的,权重值越小,优先级越高(「权重值如果为:0,这个元素将立即执行」)。Priority Queue 根据元素的权重值来排序,但是排序过程中等待的时间是有一个固定长度的,也就是说 Priority Queue 保证通过 AddWeight 方法添加的元素在一段时间内是有顺序的。
这样做的目的是为了保证在 Priority Queue 中的元素都有机会被执行,而不是只有优先级最高的元素被执行。
「默认排序窗口时间:」 500 毫秒,这个值可以通过 PriorityQConfig 的 WithWindow 方法来设置。
?「新增接口」:
AddWeight,它的作用是添加指定的优先级执行元素。//优先级队列方法接口
//Priorityqueueinterface
typePriorityInterfaceinterface{
Interface
//AddWeight添加一个元素,指定权重,并在一段时间内排序
//Addanelementwithspecifiedweightandsortitwithinaperiodoftime
AddWeight(elementany,weightint)error
}
Priority Queue 在数据处理的生命周期中还包含可以被自己定义的 Callback 方法,方便使用者干预或者介入数据处理的生命周期。
「新增 Callback 方法」:
OnAddWeight,它的作用是在元素被添加到 Priority Queue 之后执行。//优先级队列的回调接口
//Priorityqueuecallbackinterface
typePriorityCallbackinterface{
Callback
//OnAddWeight添加元素后的回调
//Callbackafteraddinganelement
OnAddWeight(elementany,weightint)
}
2.2 Priority Queue 实现原理Priority Queue 整体结构设计如下:
2.3 Priority Queue 使用举例当然 Priority Queue 使用起来也非常简单,没有太多复杂的参数初始化过程,只有一天要注意:Priority Queue 只保证排序窗口内的元素是有序的,也就是说在排序窗口外的元素是无序的,不同排序窗口内的元素也是无序的。
还有就是和 Queue 是一样的,就是不论使用 Get 还是 GetWithBlock 之后一定要记得使用 Done 方法,来标记这个数据已经处理完成,否则再往 Priority Queue 添加相同的数据的时候,会返回 ErrorQueueElementExist 错误。如果使用 Simple Queue,则不需要使用 Done 方法。
「代码举例」
packagemain
import(
"fmt"
"time"
"github.com/shengyanli1982/workqueue"
)
funcmain(){
conf:=workqueue.NewPriorityQConfig().WithWindow(time.Second)//设置排序窗口时间为1秒
q:=workqueue.NewPriorityQueue(conf)//createaqueue
gofunc(){
for{
element,err:=q.Get()//getelementfromqueue
iferr!=nil{
fmt.Println(err)
return
}
fmt.Println("getelement:",element)
q.Done(element)//markelementasdone,'Done'isrequiredafter'Get'
}
}()
_=q.Add("hello")//addelementtoqueue,immediatelyexecute
_=q.Add("world")
_=q.AddWeight("delay",10)//addelementwithweightis10toqueue,executeafter500ms(sortwindow)
time.Sleep(time.Second*2)//waitforelementtobeexecuted
q.Stop()
}
如果你不想使用 workqueue.NewPriorityQueue(nil) 这个方式来创建队列,还有一个函数偷懒 workqueue.DefaultPriorityQueue(),两者是等效的。
?「注意」:创建一个 Priority Queue 的实例,是允许绑定自定义的 Queue 的,只要你的 Queue 实现了 Interface 接口,就可以绑定到 Priority Queue 上。
?自定义绑定 Queue 可以参考 NewPriorityQueue 函数的实现:
//创建一个PriorityQueue实例
//CreateanewPriorityQueueconfig
funcNewPriorityQueue(conf*PriorityQConfig)*PriorityQ{
conf=isPriorityQConfigValid(conf)
conf.QConfig.cb=conf.cb
returnNewPriorityQueueWithCustomQueue(conf,NewQueue(&conf.QConfig))
}
2.4 Priority Queue 代码解析「AddWeight」
//AddWeight添加一个元素,指定权重,并在一段时间内排序
//Addanelement,addituseweightandsortitinaperiodoftime
func(q*PriorityQ)AddWeight(elementany,weightint)error{
ifq.IsClosed(){
returnErrorQueueClosed
}
ifweight=0{
returnq.Add(element)//权重值小于等于0,直接添加到队列中,立即执行
}
ele:=q.elepool.Get()
ele.SetData(element)
ele.SetValue(int64(weight))//设置元素的权重值
q.lock.Lock()
q.waiting.Push(ele)//添加到等待队列中, 等待被执行。在一段时间内,会根据权重值进行排序。
q.lock.Unlock()
q.config.cb.OnAddWeight(element,weight)//执行回调
returnnil
}
总结谢谢你,到此你已经看完了三篇文章了。通过本篇文章向你介绍了 Delaying Queue 和 Priority Queue 的定义、使用方法和实现原理。希望能够让你对 Delaying Queue 和 Priority Queue 有一个清晰的认识。
Delaying Queue 和 Priority Queue 都是 WorkQueue 中的核心模块,它们都是从 Queue 派生的,所以它们都具备 Queue 的所有功能,同时还有自己的特殊功能。Delaying Queue 实现了延迟队列的功能,Priority Queue 实现了优先级队列的功能。
当然 Delaying Queue 和 Priority Queue 在创建实例的时候,到底选用 Queue 还是 Simple Queue,请根据实际业务情况来决定。
下一篇将是 WorkQueue 这个项目系列的 「最后一篇文章」,我将向你介绍 WorkQueue 中的 RateLimiting Queue 模块的实现原理和使用方法。
点击关注公众号,”技术干货”及时达!
阅读原文
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设、网站改版、域名注册、主机空间、手机网站建设、网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。 项目经理在线