从理论到实践:Queue 和 Simple Queue 的详细解析
点击关注公众号,”技术干货”及时达!我是 LEE,老李,一个在 IT 行业摸爬滚打 17 年的技术老兵。
这篇是WorkQueue项目介绍的第二篇文章,上一篇是《简约而不简单:WorkQueue 的轻量级高效之道》。在上一篇文章中,我们介绍了WorkQueue的基本原理和架构,但是我还没有介绍WorkQueue核心模块Queue和Simple Queue的实现原理。本篇文章就是要介绍这两个模块的实现原理,以及如何使用它们。
事件背景在日常开发工作中,经常会使用Queue的数据结构,比如在消息队列中,我们会使用Queue来存储消息,然后再从Queue中取出消息进行处理。在 Kubernetes 中,也有很多地方使用了Queue,比如Controller中的WorkQueue,Scheduler中的SchedulingQueue,Kubelet中的PodWorkers等等。在 Kubernetes 中,Queue的使用场景非常多,都是基于FIFO的队列,只是在具体的实现上有所差异。Queue在我们日常开发使用中也是一个非常常见的组件,甚至可以说是必不可少的组件。
如果从数据存储和使用角度中,可以将Queue中的数据分为两种方式:
「可以重复」:Queue中的数据可以重复,比如应用在不同时间产生的两条内容相同的日志。WorkQueue中使用了Simple Queue作为实现。「保证唯一」:Queue中的数据保证唯一,比如消息队列中的消息,每条消息和事件都是唯一的。WorkQueue中使用了Queue作为实现Queue 介绍Queue和Simple Queue的逻辑结构是一样的,都是基于FIFO的队列,只是在具体的实现上有所差异。Queue的实现是基于set和deque的,Simple Queue的实现是基于deque的。通俗的说:Queue不运行存储的数据有重复,而Simple Queue可以存储重复的数据。
1. Queue1.1 Queue 简介Queue是一个FIFO的队列,基于标准的Interface实现所有的功能接口,包括Add,Get,Len,GetWithBlock,Done,Stop,IsClosed。Queue使用set来保证数据的唯一性,deque用来按顺序存储数据。Queue实现非常简单。
// 队列方法接口
// Queue interface
type Interface interface {
Add(element any) error // 添加元素
Len() int // 队列长度
Get() (element any, err error) // 获取元素
GetWithBlock() (element any, err error) // 阻塞获取元素
Done(element any) // 完成处理
Stop() // 停止队列
IsClosed() bool // 队列是否关闭
}
Queue在保证数据唯一上采用了processing和dirty是两个set,processing用来存储正在处理的数据,dirty用来存储等待理完成的数据。processing和dirty两个set的数据都是唯一的,Queue的deque中存储的数据是dirty+processing的数据。
Queue在数据处理的生命周期中还包含可以被自己定义的 Callback 方法,方便使用者干预或者介入数据处理的生命周期。Queue的 Callback 方法包括OnAdd,OnGet,OnDone。
// 队列的回调接口
// Callback interface
type Callback interface {
OnAdd(any) // 添加元素回调
OnGet(any) // 获取元素回调
OnDone(any) // 完成处理回调
}
1.2 Queue 实现Queue整体结构设计如下:
queue-0.png1.3 Queue 使用举例当然 Queue 使用起来也非常简单,没有太多复杂的参数初始化过程。但是这里有「一个非常需要注意的地方」:不论使用Get还是GetWithBlock之后一定要记得使用Done方法,来标记这个数据已经处理完成,否则再往Queue添加相同的数据的时候,会返回ErrorQueueElementExist错误。
「代码举例」
package main
import (
"fmt"
"time"
"github.com/shengyanli1982/workqueue"
)
func main() {
q := workqueue.NewQueue(nil) // create a queue
go func() {
for {
element, err := q.Get() // get element from queue
if err != nil {
fmt.Println(err)
return
}
fmt.Println("get element:", element)
q.Done(element) // mark element as done, 'Done' is required after 'Get'
}
}()
_ = q.Add("hello") // add element to queue
_ = q.Add("world")
time.Sleep(time.Second * 2) // wait for element to be executed
q.Stop()
}
如果你不想使用workqueue.NewQueue(nil)这个方式来创建队列,还有一个函数偷懒workqueue.DefaultQueue(),两者是等效的。
如果你想在Queue过程中使用Callback方法,这个使用的方法:
// 实现 Callback 接口
type callback struct {}
func (cb *callback) OnAdd(item any) {}
func (cb *callback) OnGet(item any) {}
func (cb *callback) OnDone(item any) {}
// 创建 Config
conf := NewQConfig()
// 关联 Callback
conf.WithCallback(&callback{})
// 创建 Queue 的时候传入这个 conf 对象,而不是在用 nil
q := NewQueue(conf)
后续代码跟上面的代码举例中的内容一样。
2.4 Queue 代码解析「Add」
// 添加元素到队列
// Add element to queue
func (q *Q) Add(element any) error {
if q.IsClosed() {
// 队列已经关闭,返回 ErrorQueueClosed 错误
return ErrorQueueClosed
}
if q.isElementMarked(element) {
// 判断元素是否已经被标记,如果已经被标记,返回 ErrorQueueElementExist 错误
// 判断元素是否在 processing 和 dirty 中
return ErrorQueueElementExist
}
// 从 nodepool 中获取一个 node,node 是一个双向链表的节点,nodepool 是一个 sync.Pool
n := q.nodepool.Get()
n.SetData(element)
q.cond.L.Lock()
q.queue.Push(n) // 将 node 添加到 queue 中
q.cond.Signal() // 发送信号,通过 sync.Cond 通知等待的方法
q.cond.L.Unlock()
q.prepare(element) // 将数据放入 dirty 中
q.config.cb.OnAdd(element) // 执行添加回调
return nil
}
「Get」
// 从队列中获取一个元素, 如果队列为空,不阻塞等待
// Get an element from the queue.
func (q *Q) Get() (element any, err error) {
if q.IsClosed() {
// 队列已经关闭,返回 ErrorQueueClosed 错误
return nil, ErrorQueueClosed
}
q.qlock.Lock()
n := q.queue.Pop() // 从 queue 中获取一个 node
q.qlock.Unlock()
if n == nil {
// 如果 node 为空,返回 ErrorQueueEmpty 错误, 这里不会阻塞等待
return nil, ErrorQueueEmpty
}
element = n.Data() // 获取 node 中的数据
q.todo(element) // 将数据从 dirty 删除,放入 processing 中
q.config.cb.OnGet(element) // 执行获取回调
q.nodepool.Put(n) // 将 node 放回 nodepool 中
return element, nil
}
「GetWithBlock」
// 从队列中获取一个元素,如果队列为空,阻塞等待
// Get an element from the queue, if the queue is empty, block and wait.
func (q *Q) GetWithBlock() (element any, err error) {
if q.IsClosed() {
return nil, ErrorQueueClosed
}
q.cond.L.Lock()
for q.queue.Len() == 0 { // 如果 queue 为空,阻塞等待
q.cond.Wait() // 等待信号
}
n := q.queue.Pop()
q.cond.L.Unlock()
if n == nil {
return nil, ErrorQueueEmpty
}
element = n.Data()
q.todo(element)
q.config.cb.OnGet(element)
q.nodepool.Put(n)
return element, nil
}
2. Simple Queue2.1 Simple Queue 简介Simple Queue跟Queue十分类似,代码也大量复用。Simple Queue是一个FIFO的队列,基于标准的Interface实现所有的功能接口,包括Add,Get,Len,GetWithBlock,Done,Stop,IsClosed。Simple Queue使用deque来存储数据,deque,但是没有set数据结构,也是说Simple Queue中的数据可以重复。
// 队列方法接口
// Queue interface
type Interface interface {
Add(element any) error // 添加元素
Len() int // 队列长度
Get() (element any, err error) // 获取元素
GetWithBlock() (element any, err error) // 阻塞获取元素
Done(element any) // 完成处理
Stop() // 停止队列
IsClosed() bool // 队列是否关闭
}
Simple Queue在数据处理的生命周期中还包含可以被自己定义的 Callback 方法,方便使用者干预或者介入数据处理的生命周期。Simple Queue的 Callback 方法包括OnAdd,OnGet,OnDone。
// 队列的回调接口
// Callback interface
type Callback interface {
OnAdd(any) // 添加元素回调
OnGet(any) // 获取元素回调
OnDone(any) // 完成处理回调
}
2.2 Simple Queue 实现Simple Queue整体结构设计如下:
queue-1.png2.3 Simple Queue 使用举例当然 Queue 使用起来也非常简单,没有太多复杂的初始化参数过程。但是这里有「一个非常需要注意的地方」:这里Get还是GetWithBlock可以不使用Done方法。Done方法,是一个空壳方法,没有任何实际的逻辑,只是为了保持Simple Queue和Queue的接口一致。Simple Queue中的数据可以重复,所以不需要使用Done方法来标记数据已经处理完成。
「代码举例」
package main
import (
"fmt"
"time"
"github.com/shengyanli1982/workqueue"
)
func main() {
q := workqueue.NewSimpleQueue(nil) // create a queue
go func() {
for {
element, err := q.Get() // get element from queue
if err != nil {
fmt.Println(err)
return
}
fmt.Println("get element:", element)
q.Done(element) // mark element as done, 'Done' is required after 'Get'
}
}()
_ = q.Add("hello") // add element to queue
_ = q.Add("world")
time.Sleep(time.Second * 2) // wait for element to be executed
q.Stop()
}
如果你不想使用workqueue.NewSimpleQueue(nil)这个方式来创建队列,还有一个函数偷懒workqueue.DefaultSimpleQueue(),两者是等效的。
如果你想在Simple Queue过程中使用Callback方法,这个使用的方法:
// 实现 Callback 接口
type callback struct {}
func (cb *callback) OnAdd(item any) {}
func (cb *callback) OnGet(item any) {}
func (cb *callback) OnDone(item any) {}
// 创建 Config
conf := NewQConfig()
// 关联 Callback
conf.WithCallback(&callback{})
// 创建 Simple Queue 的时候传入这个 conf 对象,而不是在用 nil
q := NewSimpleQueue(conf)
后续代码跟上面的代码举例中的内容一样。
2.4 Simple Queue 代码解析「Add」
// 添加元素到队列
// Add element to queue
func (q *SimpleQ) Add(element any) error {
if q.IsClosed() {
// 队列已经关闭,返回 ErrorQueueClosed 错误
return ErrorQueueClosed
}
// 对比 Queue 没有操作 set 相关的部分
// 从 nodepool 中获取一个 node,node 是一个双向链表的节点,nodepool 是一个 sync.Pool
n := q.nodepool.Get()
n.SetData(element)
q.cond.L.Lock()
q.queue.Push(n) // 将 node 添加到 queue 中
q.cond.Signal() // 发送信号,通过 sync.Cond 通知等待的方法
q.cond.L.Unlock()
// 对比 Queue 没有操作 set 相关的部分
q.config.cb.OnAdd(element) // 执行添加回调
return nil
}
「Get」
// 从队列中获取一个元素, 如果队列为空,不阻塞等待
// Get an element from the queue.
func (q *SimpleQ) Get() (element any, err error) {
if q.IsClosed() {
return nil, ErrorQueueClosed
}
q.qlock.Lock()
n := q.queue.Pop()
q.qlock.Unlock()
if n == nil { // 队列为空 (queue is empty)
return nil, ErrorQueueEmpty
}
element = n.Data() // 获取 node 中的数据
q.config.cb.OnGet(element) // 执行获取回调
q.nodepool.Put(n) // 将 node 放回 nodepool 中
// 对比 Queue 没有操作 set 相关的部分
return element, nil
}
「GetWithBlock」
// 从队列中获取一个元素,如果队列为空,阻塞等待
// Get an element from the queue, if the queue is empty, block and wait.
func (q *Q) GetWithBlock() (element any, err error) {
if q.IsClosed() {
return nil, ErrorQueueClosed
}
q.cond.L.Lock()
for q.queue.Len() == 0 { // 如果 queue 为空,阻塞等待
q.cond.Wait() // 等待信号
}
n := q.queue.Pop()
q.cond.L.Unlock()
if n == nil {
return nil, ErrorQueueEmpty
}
element = n.Data()
q.config.cb.OnGet(element)
q.nodepool.Put(n)
// 对比 Queue 没有操作 set 相关的部分
return element, nil
}
总结谢谢你,到此你已经看完了两篇文章了。通过本篇文章向你介绍了Queue还是Simple Queue的定义、使用方法和实现原理。希望能够让你对Queue还是Simple Queue有一个清晰的认识。
Queue和Simple Queue作为WorkQueue的核心模块,它们的实现非常简单,也非常高效,可以满足大部分的使用场景。在实际的使用过程中,还是需要根据实际的业务场景来选择使用Queue还是Simple Queue。如果你的数据需要保证唯一性,那么你就需要使用Queue,如果你的数据可以重复,那么你就可以使用Simple Queue。
下一篇将讲围绕介绍Delaying Queue和Priority Queue模块,包括使用方法和代码讲解。这两个高级的 Queue 都是基于Queue和Simple Queue实现的,所以你对Queue和Simple Queue有了清晰的认识之后,对Delaying Queue和Priority Queue的理解就会非常容易了。
点击关注公众号,”技术干货”及时达!
阅读原文
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设、网站改版、域名注册、主机空间、手机网站建设、网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。 项目经理在线