从零开始,探索一次使用 Go 打造通用连接池的经历
点击关注公众号,“技术干货”及时达!我是 LEE,老李,一个在 IT 行业摸爬滚打 17 年的技术老兵。
在开始今天的话题之前,我想先从这段代码开始今天的内容分享。在日常的开发过程中,作为后端的小伙伴,肯定避免不了连接 Redis、MySQL、MongoDB 等数据库,或者调用远程服务的场景。在我们调用这些服务的时候,必定会使用它们的客户端 SDK。在使用这些 SDK 的过程中,有一个功能大家一定用过,可能平常确实没有注意到,那就是「连接池」。
「例如」:在使用 MySQL 数据库时,假设我们使用 github.com/go-sql-driver/mysql 这个库。在初始化连接后,我们会得到一个 DB 对象,这个对象就是一个连接池。在使用这个库时,我们会调用 DB 对象的 Query、Exec 等方法,这些方法会从连接池中获取一个连接,然后执行 SQL 语句,最后再将连接放回连接池。
「代码举例」
funcexecSQL()error{
//打开数据库
db,err:=sql.Open("mysql","user:password@/dbname")
iferr!=nil{
returnerr
}
//关闭数据库
deferdb.Close()
//执行SQL语句
db.Exec("INSERTINTOuser(name,age)VALUES(?,?)","老李",18)
returnnil
当然,这些优秀的客户端 SDK 已经帮助我们实现了连接池的功能,我们只需要调用它们提供的方法就可以了。
但是,如果我们要在自己的业务中实现部分连接管理,尤其是像 TCP/UDP这样的连接,我们就需要自己实现连接池了。另外,一些需要保持活动状态的连接,例如会话连接,也是需要的。虽然这个需求不是非常常见,但在一些特殊的场景下,每个项目都要实现一次,不仅浪费时间,而且不利于维护。
所以,我们是否可以有一个轻量级的通用连接池,适用于各种场景呢?这样可以减少重复造轮子的时间,提高开发效率。那么,如何使用 Golang 实现一个通用的连接池的想法就在我心中产生了。
事件背景有一段时间没有更新文章,主要是忙着改进自己的连接池模型。早在 2023 年的业务开发中,我发现很多业务代码中都有连接池的实现,但每个业务代码中的连接池实现都不一样。有的是 TCP 连接池,有的是 UDP 连接池,有的是数据库连接池,有的是 Redis 连接池,有的是 HTTP 连接池等等。这样的实现方式不仅浪费了很多时间,而且不利于维护。
各个小组的开发方式五花八门,实现方式可谓是用尽了奇门遁甲。有的使用sync.Pool,有的使用channel,有的使用sync.Map,有的使用map等等。看完代码后感到晕头转向,更可怕的是,每种实现方式都有自己的特点。有的支持连接池的最大连接数,有的支持连接的最大空闲时间,有的支持连接的最大存活时间,有的支持连接的最大空闲连接数等等。即便如此,最终的实现效果和性能也参差不齐。
在前段时间,我也在自己的项目中需要一个连接池。于是,我开始了自己的通用连接池实现。
痛点分析在之前提到的代码中,我们使用 github.com/go-sql-driver/mysql 这个库实现了一个 INSERT 的 SQL 语句。其中的连接池是由库自己实现的,我们只需要调用库提供的方法就可以了。但是,如果我们要在自己的业务中实现部分连接管理,就不可能这样使用了。
我的第一个想法是能否像 Cache 一样实现一个通用的连接池,使用 Get 和 Set 方法来获取和放回连接,而将其他功能交给连接池来实现。
「想要实现的效果」
//创建一个连接池
pool:=NewPool(10,30,10*time.Second,func()(interface{},error){
//创建一个连接
returnnet.Dial("tcp","192.168.0.1:80")
})
deferpool.Close()
//从连接池中获取一个连接
conn,err:=pool.Get()
//使用连接
if_,err:=conn.Write([]byte("hello"));err!=nil{
//发生错误,将连接关闭
conn.Close()
return
}
//将连接放回连接池
pool.Put(conn)
看上面的代码是不是很简单?只需要实现一个 NewPool 函数,传入最大连接数、最大空闲连接数、最大空闲时间和创建连接的函数,就可以实现一个通用的连接池了。然后像 Cache 一样,调用 Get 和 Put 方法就可以了。
Get 方法从连接池中获取一个连接,如果连接池中没有连接,就会调用创建连接的函数创建一个连接。Put 方法将连接放回连接池,如果连接池已满,就会关闭连接。
同时,这个连接池在工作的时候,可以根据我提供的探活函数定时检查连接的状态。如果连接已经关闭,要能关闭连接并创建一个新的连接。
想法很美好,但是实现起来确实各种打脸。底层有大量细节和问题需要考虑,曾经我也是一脸懵逼,不知道从何下手。
「曾经的探索经历」
#方案优点缺点放弃原因1map + lock简单易用,类似于 Cache,可以轻松使用 Get 和 Put 存储和获取连接,然后使用 Release 释放连接。容易遍历全部对象。需要对 map 存储的对象进行生命周期管理,同时在 Get 和 Put 操作时,需要保证读取的对象是唯一独立的,每次获取的连接不能是同一个连接。需要编写大量连接管理代码,成本高,复杂度高,容易出现 Bug。2channel简单易用,能够保证在 Get 和 Put 操作时,需要保证读取的对象是唯一独立的,每次获取的连接不能是同一个连接。无法对 channel 中的全部对象进行遍历,需要额外维护数据结构来存储全部对象。需要编写大量的状态维护和同步代码,复杂度不易控制,放弃。3第三方库包装底层存储和连接状态维护由第三方库实现,自己只需要编写少量控制代码。依赖第三方库,通用性完全依赖第三方库的实现程度,灵活性不够。无法满足自己的需求,放弃。通过不断尝试和探索,我终于找到了一种比较好的实现方式。类似于 channel 的数据存储机制保证每次取出的对象是唯一独立的,并且能够快速遍历并释放过期连接。存储的对象使用 interface{} 类型,可以存储任意类型的对象。
「以下是需要解决的问题和痛点:」
「独立数据结构的存储模仿 channel:」
设计一个类似于 channel 的数据存储机制,确保每个数据对象独立存取。确保数据存储机制可以线程安全地处理数据,类似于 channel 的同步访问。「高效遍历与过期连接处理:」
实现一个迭代器来遍历存储的所有对象。在遍历过程中,检查并释放那些已经过期的连接。「interface{} 对象的生命周期管理:」
创建用于管理 interface{} 类型对象的生命周期的具体策略(创建、释放、探活)。封装这些策略到一个统一的管理结构中,便于维护和更新。「逻辑的可靠性和简易性:」
设计清晰的接口和模块,将复杂逻辑封装内部,对外提供简单接口。采用单元测试和集成测试确保各部分逻辑的正确性和稳定性。「创建通用模型以适应不同场景:」
设计一个灵活配置的框架,允许用户根据业务需求调整模型参数。减少对特定场景的硬编码,采用插件或钩子(hook)机制来适应不同的业务逻辑和数据处理需求。「为解决以上问题,需要使用如下的解决方案:」
「链表存储机制:」
利用 list 链表存储每个数据项,确保存取操作中的数据独立性。链表结构支持元素的快速插入和删除,适合动态数据管理。「有效的数据遍历与过期管理:」
使用 list 链表实现高效的数据遍历,便于监控和处理。通过遍历快速识别并处理(如释放或更新)过期的连接。「对象生命周期的结构化管理:」
设计一个 struct,封装 interface{} 类型的对象及其状态,用于详细管理生命周期。结构内部包括创建、更新、和销毁等状态转换逻辑。「自定义连接操作:」
提供接口允许用户根据业务需求自定义连接的创建、维护和终止过程。确保通过用户输入的自定义函数,逻辑保持简洁并易于集成。「配置的可定制性:」
允许用户设置关键操作参数,如连接检测频率、重试间隔、初始加载量和最大失败尝试次数。这些参数可通过配置文件或在运行时通过 API 进行调整,提高应用的适应性和灵活性。「回调函数的扩展性:」
支持用户定义 Callback 函数,以便在连接生命周期的关键节点进行自定义处理。回调机制增加了系统的灵活性,允许用户根据实际情况插入特定逻辑或处理流程。解决方案「我的解题思路:」
使用链表实现的 Queue 作为连接存储的数据结构。使用一个 struct 作为 Queue 中的存储对象,这个对象使用目标对象的 interface{} 类型。使用 sync.Pool 管理 struct 的生命周期,避免频繁的 GC。使用 Get 和 Put 方法来获取和放回连接,额外提供 GetOrCreate 方法来获取或创建连接,解决并发问题。使用一个 goroutine 定时检查连接的状态,如果连接已经关闭,则关闭连接。在 Get 或者 GetOrCreate 时,如果连接已经关闭,则抛弃连接,重新获取一个新的连接。提供 WithNewFunc,WithPingFunc, WithCloseFunc 方法来自定义连接的创建、探活和销毁函数。提供 WithCallback 方法来自定义连接的回调函数,用于在连接的生命周期中执行特定的操作。提供 WithInitialize,WithPingMaxRetries,WithScanInterval 方法来设置连接池的初始化参数。「在有相关的解题思路后,在实现过程中,我遵循了以下设计原则:」
「简单易用:」
提供简单易用的接口,让用户可以轻松创建和管理连接池。尽量减少用户的配置和编码工作,提高开发效率。「灵活可扩展:」
提供灵活的配置选项,允许用户根据业务需求调整连接池的参数(未来可以根据实际情况扩展)。支持自定义连接的创建、销毁和验证函数,以满足不同的业务场景。「高效稳定:」
采用高效的数据结构和算法,确保连接池的性能和稳定性。通过单元测试和集成测试,确保连接池的正确性和可靠性。「易于维护:」
采用清晰的代码结构和模块化设计,便于维护和扩展。提供详细的文档和示例,帮助用户快速上手和使用。结构设计conecta-1.png结构阐述Conecta 这个项目在设计的时候就想用最简单的方式来实现一个通用的连接池。为了解决实际问题,我在设计的时候尽量保持简单和易用。所以整个模型就两个部分:「初始部分」和「工作单元」。
初始部分初始部分主要是根据使用者对连接池的需求来初始化连接池的参数。
Conecta 提供了一个配置对象,允许您自定义其行为。您可以使用以下方法来配置配置对象:
WithCallback:设置回调函数。默认值为 &emptyCallback{}。WithInitialize:设置初始化池时的最小对象数。默认值为 0。WithPingMaxRetries:设置对象验证失败的最大次数,超过该次数将销毁对象。默认值为 3。WithNewFunc:设置对象创建函数。默认值为 DefaultNewFunc。WithPingFunc:设置对象验证函数。默认值为 DefaultPingFunc。WithCloseFunc:设置对象销毁函数。默认值为 DefaultCloseFunc。WithScanInterval:设置两次扫描之间的间隔时间。默认值为 10000ms。最终使用 New 方法来创建一个连接池。在创建过程中,会根据配置对象的参数来初始化连接池。
Conecta 虽然内部使用了一个基于链表实现的 Queue 来存储连接,但是对外提供了一个简单的接口,可以让用户根据自己的情况将自己实现的 Queue 替换掉(有的用户可能对 Queue 有特别的需求)。
Conecta 使用的是 WorkQueue 这个项目中的 SimpleQueue。WorkQueue 项目包含了多种 Queue 类型,有兴趣的用户可以在 Github 上查找这个项目,也可以阅读这个项目的介绍文章:《简约而不简单:WorkQueue 的轻量级高效之道》。
工作单元Conecta 的工作单元主要用于处理连接的创建、销毁和验证。在工作单元中,主要包含以下几个部分:
Queue:用于存储连接的队列。Executor:用于执行存储中所有连接的验证和探活工作。Executor 会在初始化时启动一个 goroutine,用于定时检查连接的状态。这个检查间隔时间可以通过配置对象的 WithScanInterval 方法来设置。在检查的过程中,如果连接已经关闭,则关闭并标记这个连接。
「周期执行的定时器」
//创建一个定时器,每p.config.scanInterval毫秒触发一次
//Createatimerthattriggerseveryp.config.scanIntervalmilliseconds
ticker:=time.NewTicker(time.Millisecond*time.Duration(p.config.scanInterval))
「标记异常的连接」
//如果元素的Ping次数超过最大重试次数,则关闭连接
//IfthenumberofPingtimesoftheelementexceedsthemaximumnumberofretries,theconnectionisclosed
ifretryCount=p.config.maxRetries{
//如果元素的值不为nil,则关闭连接
//Ifthevalueoftheelementisnotnil,closetheconnection
ifvalue!=nil{
//关闭连接并调用回调函数
//Closetheconnectionandcallthecallbackfunction
err:=p.config.closeFunc(value)
//调用OnClose回调函数,传入value和err
//CalltheOnClosecallbackfunction,passinginvalueanderr
p.config.callback.OnClose(value,err)
//将元素的数据置为空
//Setthedataoftheelementtonil
element.SetData(nil)
}
}else{
//对元素进行Ping检测
//PerformaPingcheckontheelement
ifok:=p.config.pingFunc(value,retryCount);ok{
//如果Ping检测成功,将元素的值设为0
//IfthePingcheckissuccessful,setthevalueoftheelementto0
element.SetValue(0)
//调用OnPingSuccess回调函数,传入value
//CalltheOnPingSuccesscallbackfunction,passinginvalue
p.config.callback.OnPingSuccess(value)
}else{
//如果Ping检测失败,将元素的值加1
//IfthePingcheckfails,incrementthevalueoftheelementby1
element.SetValue(int64(retryCount)+1)
//调用OnPingFailure回调函数,传入value
//CalltheOnPingFailurecallbackfunction,passinginvalue
p.config.callback.OnPingFailure(value)
}
}
一旦一个连接被标记为关闭,就会执行 closeFunc 关闭连接,并指定对应的 OnClose 回调函数,同时将连接的数据置为空。但是此时这个有问题的连接仍然存在于队列中。当调用 Get 和 GetOrCreate 方法时,会将这个连接从队列中移除(这里可能会有些小伙伴觉得绕,可以查看项目中的源代码)。
「移除标记为关闭的连接」
//从队列中获取一个元素,如果元素的值不为nil,则返回元素的值,否则继续获取
//Getanelementfromthequeue.Ifthevalueoftheelementisnotnil,returnthevalueoftheelement,otherwisecontinuetogetit
for{
...
//将获取到的元素转换为Element类型
//ConverttheobtainedelementtotheElementtype
data:=element.(*pool.Element)
//获取元素中的数据
//Getthedatafromtheelement
value:=data.GetData()
//如果元素的数据不为nil
//Ifthedataoftheelementisnotnil
ifvalue!=nil{
//将元素放回元素池
//Puttheelementbackintotheelementpool
p.elementpool.Put(data)
//返回元素的数据和nil错误
//Returnthedataoftheelementandanilerror
returnvalue,nil
}
//如果元素的数据为nil,则将元素放回元素池,然后继续获取
//Ifthedataoftheelementisnil,puttheelementbackintotheelementpoolandcontinuetogetit
p.elementpool.Put(data)
}
在之前的 「标记异常的连接」 中,错误和异常的连接被标记 element.SetData(nil) 和执行关闭。因此,只需要判断数据是否为 nil,即可确定需要移除的连接,并继续循环获取下一个连接。
项目介绍Conecta:https://github.com/shengyanli1982/conecta
conecta-0.pngConecta 是一个轻量级的用于管理连接/会话池的 Go 模块。
Conecta 项目的目标是提供一个简单易用的通用连接池,让开发者可以更加方便地对连接进行管理。
Conecta 能够提供以下功能:
自定义对象创建函数自定义对象销毁函数自定义对象验证函数自定义池初始化期间的最小对象数量自定义对象验证失败的最大次数接口设计Conecta 的方法接口也非常简洁,只有几个方法,非常容易上手。
方法接口New:创建一个 Conecta 对象。Get:从池中获取一个对象。GetOrCreate:从池中获取一个对象,如果池为空,则创建一个新对象。Put:将一个对象放回池中。Stop:关闭池。Cleanup: 清理池,释放所有资源。?如果池已关闭,Get、GetOrCreate 和 Put 方法将返回错误。使用 Stop 关闭池会清理池并释放所有资源。
?CallbackCallback 接口用于定义 Conecta 的回调函数。它包括以下方法:
OnPingSuccess:当对象验证成功时调用。OnPingFailure:当对象验证失败时调用。OnClose:当对象销毁时调用。使用示例packagemain
import(
"fmt"
"github.com/google/uuid"
"github.com/shengyanli1982/conecta"
"github.com/shengyanli1982/workqueue"
)
//Demo是一个包含value字段的结构体
//Demoisastructthatcontainsavaluefield
typeDemostruct{
//value是一个字符串
//valueisastring
valuestring
//id是一个字符串
//idisastring
idstring
}
//GetValue是一个方法,它返回Demo结构体中的value字段
//GetValueisamethodthatreturnsthevaluefieldintheDemostruct
func(d*Demo)GetValue()string{
//返回value字段
//Returnthevaluefield
returnd.value
}
//GetID是一个方法,它返回Demo结构体中的id字段
//GetIDisamethodthatreturnstheidfieldintheDemostruct
func(d*Demo)GetID()string{
//返回id字段
//Returntheidfield
returnd.id
}
//NewFunc是一个函数,它创建并返回一个新的Demo结构体
//NewFuncisafunctionthatcreatesandreturnsanewDemostruct
funcNewFunc()(any,error){
//创建一个新的Demo结构体,其value字段被设置为"test"
//CreateanewDemostructwithitsvaluefieldsetto"test"
return&Demo{value:"test",id:uuid.NewString()},nil
}
funcmain(){
//创建一个工作队列
//Createaworkqueue.
baseQ:=workqueue.NewSimpleQueue(nil)
//创建一个Conecta池
//CreateaConectapool.
conf:=conecta.NewConfig().
//使用NewFunc函数作为新建连接的函数
//UsetheNewFuncfunctionasthefunctiontocreateanewconnection.
WithNewFunc(NewFunc)
//使用conecta.New函数创建一个新的连接池
//Usetheconecta.Newfunctiontocreateanewconnectionpool.
pool,err:=conecta.New(baseQ,conf)
//检查是否在创建连接池时出现错误
//Checkiftherewasanerrorwhilecreatingtheconnectionpool.
iferr!=nil{
//如果创建连接池时出错,打印错误并返回
//Ifanerroroccurswhilecreatingtheconnectionpool,printtheerrorandreturn.
fmt.Println("!![main]createpoolerror:",err)
return
}
//使用defer关键字确保在函数结束时停止池
//Usethedeferkeywordtoensurethatthepoolisstoppedwhenthefunctionends
deferpool.Stop()
//使用for循环从池中获取数据
//Useaforlooptogetdatafromthepool
fori:=0;i10;i++{
//使用GetOrCreate方法从池中获取数据
//UsetheGetOrCreatemethodtogetdatafromthepool
ifdata,err:=pool.GetOrCreate();err!=nil{
//如果从池中获取数据时出错,打印错误并返回
//Ifanerroroccurswhilegettingdatafromthepool,printtheerrorandreturn
fmt.Println("!![main]getdataerror:",err)
return
}else{
//打印从池中获取的数据
//Printthedataobtainedfromthepool
fmt.Printf("[main]getdata:%s,id:%s\n",fmt.Sprintf("%s_%v",data.(*Demo).GetValue(),i),data.(*Demo).GetID())
//使用Put方法将数据放回池中
//UsethePutmethodtoputthedatabackintothepool
iferr:=pool.Put(data);err!=nil{
//如果将数据放回池中时出错,打印错误并返回
//Ifanerroroccurswhileputtingthedatabackintothepool,printtheerrorandreturn
fmt.Println("!![main]putdataerror:",err)
return
}
}
}
}
「输出结果」
$gorundemo.go
[main]getdata:test_0,id:7b781fde-b392-470a-9c12-2e495429c1a0
[main]getdata:test_1,id:7b781fde-b392-470a-9c12-2e495429c1a0
[main]getdata:test_2,id:7b781fde-b392-470a-9c12-2e495429c1a0
[main]getdata:test_3,id:7b781fde-b392-470a-9c12-2e495429c1a0
[main]getdata:test_4,id:7b781fde-b392-470a-9c12-2e495429c1a0
[main]getdata:test_5,id:7b781fde-b392-470a-9c12-2e495429c1a0
[main]getdata:test_6,id:7b781fde-b392-470a-9c12-2e495429c1a0
[main]getdata:test_7,id:7b781fde-b392-470a-9c12-2e495429c1a0
[main]getdata:test_8,id:7b781fde-b392-470a-9c12-2e495429c1a0
[main]getdata:test_9,id:7b781fde-b392-470a-9c12-2e495429c1a0
总结Conecta 是一个通用的连接池实现,适用于各种场景。它提供了简单易用的接口,可以轻松地创建和管理连接。同时,它还支持自定义连接的创建和销毁过程,以满足不同业务需求。通过 Conecta,我们可以避免重复造轮子,提高开发效率,减少代码冗余。希望 Conecta 能够帮助更多的开发者,让他们的开发工作更加轻松和高效。
最后,如果您有任何问题或建议,请在 Conecta 的 GitHub 上提出 issue。我将尽快回复您的问题。
点击关注公众号,“技术干货”及时达!
阅读原文
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设、网站改版、域名注册、主机空间、手机网站建设、网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。 项目经理在线