全国免费咨询:

13245491521

VR图标白色 VR图标黑色
X

中高端软件定制开发服务商

与我们取得联系

13245491521     13245491521

2024-01-11_从理论到实践:Queue 和 Simple Queue 的详细解析

您的位置:首页 >> 新闻 >> 行业资讯

从理论到实践:Queue 和 Simple Queue 的详细解析 点击关注公众号,”技术干货”及时达!我是 LEE,老李,一个在 IT 行业摸爬滚打 17 年的技术老兵。 这篇是WorkQueue项目介绍的第二篇文章,上一篇是《简约而不简单:WorkQueue 的轻量级高效之道》。在上一篇文章中,我们介绍了WorkQueue的基本原理和架构,但是我还没有介绍WorkQueue核心模块Queue和Simple Queue的实现原理。本篇文章就是要介绍这两个模块的实现原理,以及如何使用它们。 事件背景在日常开发工作中,经常会使用Queue的数据结构,比如在消息队列中,我们会使用Queue来存储消息,然后再从Queue中取出消息进行处理。在 Kubernetes 中,也有很多地方使用了Queue,比如Controller中的WorkQueue,Scheduler中的SchedulingQueue,Kubelet中的PodWorkers等等。在 Kubernetes 中,Queue的使用场景非常多,都是基于FIFO的队列,只是在具体的实现上有所差异。Queue在我们日常开发使用中也是一个非常常见的组件,甚至可以说是必不可少的组件。 如果从数据存储和使用角度中,可以将Queue中的数据分为两种方式: 「可以重复」:Queue中的数据可以重复,比如应用在不同时间产生的两条内容相同的日志。WorkQueue中使用了Simple Queue作为实现。「保证唯一」:Queue中的数据保证唯一,比如消息队列中的消息,每条消息和事件都是唯一的。WorkQueue中使用了Queue作为实现Queue 介绍Queue和Simple Queue的逻辑结构是一样的,都是基于FIFO的队列,只是在具体的实现上有所差异。Queue的实现是基于set和deque的,Simple Queue的实现是基于deque的。通俗的说:Queue不运行存储的数据有重复,而Simple Queue可以存储重复的数据。 1. Queue1.1 Queue 简介Queue是一个FIFO的队列,基于标准的Interface实现所有的功能接口,包括Add,Get,Len,GetWithBlock,Done,Stop,IsClosed。Queue使用set来保证数据的唯一性,deque用来按顺序存储数据。Queue实现非常简单。 // 队列方法接口 // Queue interface type Interface interface { Add(element any) error // 添加元素 Len() int // 队列长度 Get() (element any, err error) // 获取元素 GetWithBlock() (element any, err error) // 阻塞获取元素 Done(element any) // 完成处理 Stop() // 停止队列 IsClosed() bool // 队列是否关闭 } Queue在保证数据唯一上采用了processing和dirty是两个set,processing用来存储正在处理的数据,dirty用来存储等待理完成的数据。processing和dirty两个set的数据都是唯一的,Queue的deque中存储的数据是dirty+processing的数据。 Queue在数据处理的生命周期中还包含可以被自己定义的 Callback 方法,方便使用者干预或者介入数据处理的生命周期。Queue的 Callback 方法包括OnAdd,OnGet,OnDone。 // 队列的回调接口 // Callback interface type Callback interface { OnAdd(any) // 添加元素回调 OnGet(any) // 获取元素回调 OnDone(any) // 完成处理回调 } 1.2 Queue 实现Queue整体结构设计如下: queue-0.png1.3 Queue 使用举例当然 Queue 使用起来也非常简单,没有太多复杂的参数初始化过程。但是这里有「一个非常需要注意的地方」:不论使用Get还是GetWithBlock之后一定要记得使用Done方法,来标记这个数据已经处理完成,否则再往Queue添加相同的数据的时候,会返回ErrorQueueElementExist错误。 「代码举例」 package main import ( "fmt" "time" "github.com/shengyanli1982/workqueue" ) func main() { q := workqueue.NewQueue(nil) // create a queue go func() { for { element, err := q.Get() // get element from queue if err != nil { fmt.Println(err) return } fmt.Println("get element:", element) q.Done(element) // mark element as done, 'Done' is required after 'Get' } }() _ = q.Add("hello") // add element to queue _ = q.Add("world") time.Sleep(time.Second * 2) // wait for element to be executed q.Stop() } 如果你不想使用workqueue.NewQueue(nil)这个方式来创建队列,还有一个函数偷懒workqueue.DefaultQueue(),两者是等效的。 如果你想在Queue过程中使用Callback方法,这个使用的方法: // 实现 Callback 接口 type callback struct {} func (cb *callback) OnAdd(item any) {} func (cb *callback) OnGet(item any) {} func (cb *callback) OnDone(item any) {} // 创建 Config conf := NewQConfig() // 关联 Callback conf.WithCallback(&callback{}) // 创建 Queue 的时候传入这个 conf 对象,而不是在用 nil q := NewQueue(conf) 后续代码跟上面的代码举例中的内容一样。 2.4 Queue 代码解析「Add」 // 添加元素到队列 // Add element to queue func (q *Q) Add(element any) error { if q.IsClosed() { // 队列已经关闭,返回 ErrorQueueClosed 错误 return ErrorQueueClosed } if q.isElementMarked(element) { // 判断元素是否已经被标记,如果已经被标记,返回 ErrorQueueElementExist 错误 // 判断元素是否在 processing 和 dirty 中 return ErrorQueueElementExist } // 从 nodepool 中获取一个 node,node 是一个双向链表的节点,nodepool 是一个 sync.Pool n := q.nodepool.Get() n.SetData(element) q.cond.L.Lock() q.queue.Push(n) // 将 node 添加到 queue 中 q.cond.Signal() // 发送信号,通过 sync.Cond 通知等待的方法 q.cond.L.Unlock() q.prepare(element) // 将数据放入 dirty 中 q.config.cb.OnAdd(element) // 执行添加回调 return nil } 「Get」 // 从队列中获取一个元素, 如果队列为空,不阻塞等待 // Get an element from the queue. func (q *Q) Get() (element any, err error) { if q.IsClosed() { // 队列已经关闭,返回 ErrorQueueClosed 错误 return nil, ErrorQueueClosed } q.qlock.Lock() n := q.queue.Pop() // 从 queue 中获取一个 node q.qlock.Unlock() if n == nil { // 如果 node 为空,返回 ErrorQueueEmpty 错误, 这里不会阻塞等待 return nil, ErrorQueueEmpty } element = n.Data() // 获取 node 中的数据 q.todo(element) // 将数据从 dirty 删除,放入 processing 中 q.config.cb.OnGet(element) // 执行获取回调 q.nodepool.Put(n) // 将 node 放回 nodepool 中 return element, nil } 「GetWithBlock」 // 从队列中获取一个元素,如果队列为空,阻塞等待 // Get an element from the queue, if the queue is empty, block and wait. func (q *Q) GetWithBlock() (element any, err error) { if q.IsClosed() { return nil, ErrorQueueClosed } q.cond.L.Lock() for q.queue.Len() == 0 { // 如果 queue 为空,阻塞等待 q.cond.Wait() // 等待信号 } n := q.queue.Pop() q.cond.L.Unlock() if n == nil { return nil, ErrorQueueEmpty } element = n.Data() q.todo(element) q.config.cb.OnGet(element) q.nodepool.Put(n) return element, nil } 2. Simple Queue2.1 Simple Queue 简介Simple Queue跟Queue十分类似,代码也大量复用。Simple Queue是一个FIFO的队列,基于标准的Interface实现所有的功能接口,包括Add,Get,Len,GetWithBlock,Done,Stop,IsClosed。Simple Queue使用deque来存储数据,deque,但是没有set数据结构,也是说Simple Queue中的数据可以重复。 // 队列方法接口 // Queue interface type Interface interface { Add(element any) error // 添加元素 Len() int // 队列长度 Get() (element any, err error) // 获取元素 GetWithBlock() (element any, err error) // 阻塞获取元素 Done(element any) // 完成处理 Stop() // 停止队列 IsClosed() bool // 队列是否关闭 } Simple Queue在数据处理的生命周期中还包含可以被自己定义的 Callback 方法,方便使用者干预或者介入数据处理的生命周期。Simple Queue的 Callback 方法包括OnAdd,OnGet,OnDone。 // 队列的回调接口 // Callback interface type Callback interface { OnAdd(any) // 添加元素回调 OnGet(any) // 获取元素回调 OnDone(any) // 完成处理回调 } 2.2 Simple Queue 实现Simple Queue整体结构设计如下: queue-1.png2.3 Simple Queue 使用举例当然 Queue 使用起来也非常简单,没有太多复杂的初始化参数过程。但是这里有「一个非常需要注意的地方」:这里Get还是GetWithBlock可以不使用Done方法。Done方法,是一个空壳方法,没有任何实际的逻辑,只是为了保持Simple Queue和Queue的接口一致。Simple Queue中的数据可以重复,所以不需要使用Done方法来标记数据已经处理完成。 「代码举例」 package main import ( "fmt" "time" "github.com/shengyanli1982/workqueue" ) func main() { q := workqueue.NewSimpleQueue(nil) // create a queue go func() { for { element, err := q.Get() // get element from queue if err != nil { fmt.Println(err) return } fmt.Println("get element:", element) q.Done(element) // mark element as done, 'Done' is required after 'Get' } }() _ = q.Add("hello") // add element to queue _ = q.Add("world") time.Sleep(time.Second * 2) // wait for element to be executed q.Stop() } 如果你不想使用workqueue.NewSimpleQueue(nil)这个方式来创建队列,还有一个函数偷懒workqueue.DefaultSimpleQueue(),两者是等效的。 如果你想在Simple Queue过程中使用Callback方法,这个使用的方法: // 实现 Callback 接口 type callback struct {} func (cb *callback) OnAdd(item any) {} func (cb *callback) OnGet(item any) {} func (cb *callback) OnDone(item any) {} // 创建 Config conf := NewQConfig() // 关联 Callback conf.WithCallback(&callback{}) // 创建 Simple Queue 的时候传入这个 conf 对象,而不是在用 nil q := NewSimpleQueue(conf) 后续代码跟上面的代码举例中的内容一样。 2.4 Simple Queue 代码解析「Add」 // 添加元素到队列 // Add element to queue func (q *SimpleQ) Add(element any) error { if q.IsClosed() { // 队列已经关闭,返回 ErrorQueueClosed 错误 return ErrorQueueClosed } // 对比 Queue 没有操作 set 相关的部分 // 从 nodepool 中获取一个 node,node 是一个双向链表的节点,nodepool 是一个 sync.Pool n := q.nodepool.Get() n.SetData(element) q.cond.L.Lock() q.queue.Push(n) // 将 node 添加到 queue 中 q.cond.Signal() // 发送信号,通过 sync.Cond 通知等待的方法 q.cond.L.Unlock() // 对比 Queue 没有操作 set 相关的部分 q.config.cb.OnAdd(element) // 执行添加回调 return nil } 「Get」 // 从队列中获取一个元素, 如果队列为空,不阻塞等待 // Get an element from the queue. func (q *SimpleQ) Get() (element any, err error) { if q.IsClosed() { return nil, ErrorQueueClosed } q.qlock.Lock() n := q.queue.Pop() q.qlock.Unlock() if n == nil { // 队列为空 (queue is empty) return nil, ErrorQueueEmpty } element = n.Data() // 获取 node 中的数据 q.config.cb.OnGet(element) // 执行获取回调 q.nodepool.Put(n) // 将 node 放回 nodepool 中 // 对比 Queue 没有操作 set 相关的部分 return element, nil } 「GetWithBlock」 // 从队列中获取一个元素,如果队列为空,阻塞等待 // Get an element from the queue, if the queue is empty, block and wait. func (q *Q) GetWithBlock() (element any, err error) { if q.IsClosed() { return nil, ErrorQueueClosed } q.cond.L.Lock() for q.queue.Len() == 0 { // 如果 queue 为空,阻塞等待 q.cond.Wait() // 等待信号 } n := q.queue.Pop() q.cond.L.Unlock() if n == nil { return nil, ErrorQueueEmpty } element = n.Data() q.config.cb.OnGet(element) q.nodepool.Put(n) // 对比 Queue 没有操作 set 相关的部分 return element, nil } 总结谢谢你,到此你已经看完了两篇文章了。通过本篇文章向你介绍了Queue还是Simple Queue的定义、使用方法和实现原理。希望能够让你对Queue还是Simple Queue有一个清晰的认识。 Queue和Simple Queue作为WorkQueue的核心模块,它们的实现非常简单,也非常高效,可以满足大部分的使用场景。在实际的使用过程中,还是需要根据实际的业务场景来选择使用Queue还是Simple Queue。如果你的数据需要保证唯一性,那么你就需要使用Queue,如果你的数据可以重复,那么你就可以使用Simple Queue。 下一篇将讲围绕介绍Delaying Queue和Priority Queue模块,包括使用方法和代码讲解。这两个高级的 Queue 都是基于Queue和Simple Queue实现的,所以你对Queue和Simple Queue有了清晰的认识之后,对Delaying Queue和Priority Queue的理解就会非常容易了。 点击关注公众号,”技术干货”及时达! 阅读原文

上一篇:2020-04-22_马化腾为《产业区块链》作序:区块链全面拥抱产业互联网 下一篇:2025-08-20_月薪1W和10W的市场人区别!

TAG标签:

17
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设网站改版域名注册主机空间手机网站建设网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。
项目经理在线

相关阅读 更多>>

猜您喜欢更多>>

我们已经准备好了,你呢?
2022我们与您携手共赢,为您的企业营销保驾护航!

不达标就退款

高性价比建站

免费网站代备案

1对1原创设计服务

7×24小时售后支持

 

全国免费咨询:

13245491521

业务咨询:13245491521 / 13245491521

节假值班:13245491521()

联系地址:

Copyright © 2019-2025      ICP备案:沪ICP备19027192号-6 法律顾问:律师XXX支持

在线
客服

技术在线服务时间:9:00-20:00

在网站开发,您对接的直接是技术员,而非客服传话!

电话
咨询

13245491521
7*24小时客服热线

13245491521
项目经理手机

微信
咨询

加微信获取报价