封装优雅的缓存组件库(Redis 缓存与内存缓存)
点击关注公众号,“技术干货”及时达!写作背景很早前就想写一篇关于缓存组件库的文章,开发中总觉得对于缓存组件封装、使用这块都还差点意思,最近思考了一些方案,顺便记录下来。
为什么需要缓存如果接口qps不高、接口内部逻辑没有密集IO/高频热点数据,另外就是数据变动非常频繁,我理解并不需要缓存。需要缓存一般是下面几点。
提高应用程序的性能:通过存储常用数据在内存中(内存缓存/分布式缓存),避免了每次请求时都从慢速存储设备(如磁盘)中读取数据,提高了应用程序的响应时间。
减少数据库负载:频繁访问数据库会给数据库带来压力,使用缓存可以减少对数据库的查询次数,从而减轻数据库的负担。
提高用户体验:快速响应和减少用户的等待时间。
需求背景按照我平时的使用习惯需求比较简单,如下
支持多类型缓存,比如:分布式缓存(redis、memcached)、内存等缓存等,设计方便后续扩展;
支持多种缓存组合使用,比如内存缓存和分布式缓存组合,内存缓存命中直接返回数据,否则查询分布式缓存,直到找到对应数据,然后再回写缓存;
使用方数据结构不同,所以,需要支持多种数据结构,需要使用范型;
若缓存 Miss,从数据库/下游回查数据,需要支持自动查询数据库/下游接口,并且回写缓存。(为什么会有这个需求?看下面伪代码,这种方式对业务方来说太繁琐了并且研发同学如果不留意可能会忘记回写缓存);
先从缓存中查询
if缓存命中{
return
}
if缓存miss{
查询数据库/下游
if成功{
写回缓存
return
}
}
5. 分布式组件支持监控、自动重连机制;
6. 支持日志、metrics等能力,这期先不支持。
类图简单画了一个类图,只画了核心几个类,我用了适配器模式和责任链模式,后面详细讲解。
关键描述
Adaptor定义适配器接口,被子类实现,常见子类:redis适配器子类、内存适配器子类等,主是对缓存的适配,接口很简单只有三个,理解成增、删、改就可以了,后面可以扩展更多适配器。
缓存接口ICache,定义了几个外部方法(方法定义很克制,没有必要定义太多)Get、Set、Del、GetAndSet(这个比较重要,一般业务开发中用的多),留意,有一个实现类(MultiCache[Tany])含义是多级缓存,是对分布式缓存和内存缓存的编排,用了责任链模式。
其它类都是被引用的类,不太重要,这里不赘述。
核心代码完整代码参考:GitHub-PycMono/go-cache:基于三方库的二次实现(https://github.com/PycMono/go-cache)
缓存适配器适配器接口(Idaptor)import (
"context"
"time"
)
//IAdaptor接口转换器
typeIAdaptorinterface {
Set(ctxcontext.Context,paramsmap[string][]byte,expiretime.Duration) error
Get(ctxcontext.Context,k[]string) (map[string][]byte, error)
Del(ctxcontext.Context,k[]string) error
}
接口定义非常克制,就3个方法,不用定义太多,完全够用了。
分布式缓存适配器实现类以redis为案例,我主要分redisClient、IAdaptor接口实现2部分来讲。
Client意如名称,redis连接客户端,主要负责连接redis服务器、监控、重连机制。
Config配置文件,我用了构建者设计模式(建造者模式)、也可以用go的选项模式可以看下这篇文章。深入GO选项模式「附详细案例」-掘金
//Config配置文件
typeConfigstruct {
namestring //appname
addrstring //redisaddr,例如127.0.0.1:6379
passwordstring //redispassword
dbint //redisdb
poolSizeint //redispoolsize
poolTimeouttime.Duration//redis超时(单位秒,默认为0)
readTimeouttime.Duration//redis超时(单位秒,默认为0)
writeTimeouttime.Duration//redis超时(单位秒,默认为0)
}
//...省略一些代码
func (cConfig) build() (*redis.Options, error) {
if len(c.name) == 0 {
return nil,fmt.Errorf("name为空")
}
if len(c.addr) == 0 {
return nil,fmt.Errorf("addr为空")
}
if len(c.password) == 0 {
return nil,fmt.Errorf("password为空")
}
ifc.db== 0 {
return nil,fmt.Errorf("db为空")
}
ifc.poolSize== 0 {
c.poolSize= 30
}
return &redis.Options{
Addr:c.addr,
ClientName:c.name,
Password:c.password,
DB:c.db,
WriteTimeout:c.writeTimeout,
PoolSize:c.poolSize,
PoolTimeout:c.poolTimeout,
ReadTimeout:c.readTimeout,
}, nil
}
typeClientstruct {
redisClientredis.UniversalClient
conf*Config
ctxcontext.Context
smsync.RWMutex
}
func NewRedisClient(conf*Config) (*Client, error) {
ctx:=context.Background()
redisClient,err:= connect(ctx,conf)
iferr!= nil {
return nil,err
}
c:= &Client{
conf:conf,
ctx:ctx,
redisClient:redisClient, //首次初始化不用加锁
}
goc.monitoring() //监控
returnc, nil
}
func (c*Client) GetRedisClient()redis.UniversalClient{
c.sm.RLock()
deferc.sm.RUnlock()
returnc.redisClient
}
//monitoring重连监控,无限循环,检查redis客服端是否断开连接,如果断开重新连接
func (c*Client) monitoring() {
defer func() {
iferr:= recover();err!= nil {
fmt.Println(err)
return
}
}()
for {
//先休眠30秒
time.Sleep(30 *time.Second)
ifc.ping() {
continue
}
fmt.Println("redis异常断开,正在尝试重连~~~~~")
c.reConnect()
}
}
func (c*Client) ping() bool {
_,err:=c.redisClient.Ping(c.ctx).Result()
returnerr== nil
}
//reConnect重连
func (c*Client) reConnect() {
redisClient,err:= connect(c.ctx,c.conf)
iferr!= nil {
fmt.Println("redis重连失败...")
}
//尝试关闭历史连接
c.redisClient.Close()
c.sm.Lock()
deferc.sm.Unlock()
c.redisClient=redisClient
}
//connect建立redis连接
func connect(ctxcontext.Context,conf*Config) (*redis.Client, error) {
opt,err:=conf.build()
iferr!= nil {
return nil,err
}
redisClient:=redis.NewClient(opt)
_,err=redisClient.Ping(ctx).Result()
iferr!= nil {
return nil,err
}
//设置redisClientname(appname),方便定位问题
iferr=redisClient.Process(ctx,redis.NewStringCmd(ctx, "client", "setname",fmt.Sprintf("%s",conf.name)));err!= nil {
return nil,err
}
returnredisClient, nil
}
redis监控,我简单写了一个monitoring 方法,死循环每隔30sping一次redis服务器,若失败,则重连。
Redis适配器以Redis为案例,实现IAdaptor接口
typeCachestruct {
client*Client
}
func NewRedisAdaptor(client*Client)client.IAdaptor{
return &Cache{client:client}
}
func (r*Cache) Set(ctxcontext.Context,paramsmap[string][]byte,expiretime.Duration) error {
m:= make(map[string]interface{})
fork,v:= rangeparams{
m[k] = string(v)
}
_,err:=r.client.GetRedisClient().Pipelined(ctx, func(piperedis.Pipeliner) error {
forkey,value:= rangem{
err:=pipe.Set(ctx,key,value,expire).Err()
iferr!= nil {
returnerr
}
}
return nil
})
returnerr
}
func (r*Cache) Del(ctxcontext.Context,k[]string) error {
out:=r.client.GetRedisClient().Del(ctx,k...)
returnout.Err()
}
func (r*Cache) Get(ctxcontext.Context,k[]string) (map[string][]byte, error) {
pipe:=r.client.GetRedisClient().Pipeline()
for _,key:= rangek{
_, _ =pipe.Get(ctx,key).Result()
}
cmds,err:=pipe.Exec(ctx)
iferr!= nil && !errors.Is(err,redis.Nil) {
return nil,err
}
out:= make(map[string][]byte)
for _,cmd:= rangecmds{
args:=cmd.Args()
ifv,ok:=args[1].(string);ok{
err=cmd.Err()
iferrors.Is(err,redis.Nil) {
continue
}
iferr!= nil {
return nil,err
}
ifcmd,ok:=cmd.(*redis.StringCmd);ok{
str:=cmd.Val()
out[v] = []byte(str)
}
}
}
returnout, nil
}
Redis适配器实现3个接口,Del方法比较简单,不多赘述。Get和Set方法均采用管道的方式,对管道不熟悉的看下Redis文档命令篇就会了。
缓存编排管理器缓存接口(ICache),我定义为缓存编排器接口,主要是负责对缓存适配器的编排。接口非常简单,仅有下面几个方法,需要注意的是GetAndSet、GetAndSetSingle2个方法,主要是提供获取缓存并且设置缓存能力。
typeICache[Tany]interface{
Set(ctxcontext.Context,paramsmap[string]T)error
Get(ctxcontext.Context,keys[]string)(map[string]T,error)
GetAndSet(ctxcontext.Context,keys[]string,ffunc(keys[]string)(map[string]T,error))(map[string]T,error)
GetAndSetSingle(ctxcontext.Context,kstring,ffunc(kstring)(T,bool,error))(T,bool,error)
Del(ctxcontext.Context,keys[]string)error
}
缓存编排器实现类我以多级缓存为案例(更复杂一些),编排Redis缓存和内存缓存,流程如下
优先查内存缓存,内存缓命中则返回业务方;
内存缓存miss再查Redis缓存,Redis缓存命中则返回,并且回写内存缓存;
Redis缓存miss再查业务下游/数据库;
数据库/业务下游查回数据,回写Redis缓存和内存缓存,ps:数据库/业务下游未成功查询数据根据实际情况是否写入缓存标志。
//MultiCacheOptions可选参数
typeMultiCacheOptionsstruct {
Base
EnableLogbool //是否输出日志
WriteNilbool //缓存miss是否写入nil防止缓存穿透,默认不写入
Expiretime.Duration//过期时间
}
//MultiCache多级缓存
typeMultiCache[Tany] struct {
//多级缓存适配器
//后续处理逻辑是根据数组的顺序遍历,建议把离用户最近的缓存设置到下标0的位置,依次排列。需注意,一定要保证离用户最近的缓存有数据
//假设缓存顺序,内存缓存、redis缓存、缓存miss透传数据库
//若内存缓存未miss,直接返回
//若内存缓存miss,从redis中查询,redis缓存miss再从数据库中查询,一定要回写内存缓存
handlers[]client.IAdaptor
opts*MultiCacheOptions//基础配置
sfsingleflight.Group
//还可以增加一些中间件比如日志输出、埋点之类的
}
funcNewMultiCache[Tany](opts*MultiCacheOptions,handlers...client.IAdaptor)ICache[T] {
return &MultiCache[T]{
handlers:handlers,
opts:opts,
}
}
func (c*MultiCache[T]) Set(ctxcontext.Context,paramsmap[string]T) error {
kv:= make(map[string][]byte)
fork,v:= rangeparams{
key:=c.opts.buildKey(k)
b,err:=sonic.Marshal(v)
iferr!= nil {
returnerr
}
kv[key] =b
}
for _,v:= rangec.handlers{
err:=v.Set(ctx,kv,c.opts.Expire)
iferr!= nil {
returnerr
}
}
return nil
}
func (c*MultiCache[T]) Get(ctxcontext.Context,keys[]string) (map[string]T, error) {
var (
tmpKeys=c.opts.buildKeys(keys)
)
//多级缓存查询
//思路:第一个client先查找,若miss,将miss的key集合投递下一个client查找,直到所有client查找完成,或者keys全部找到
var (
kvMap= make(map[string][]byte)
missKeys=tmpKeys
preClientclient.IAdaptor
)
for _,cli:= rangec.handlers{
if len(missKeys) == 0 {
break //退出循环
}
tmpKvMap,err:=cli.Get(ctx,missKeys)
iferr!= nil {
return nil,err
}
fork,v:= rangetmpKvMap{
kvMap[k] =v
}
missKeys= []string{} //重新设置值
for _,key:= rangetmpKeys{
if _,ok:=tmpKvMap[key]; !ok{
missKeys= append(missKeys,key)
}
}
if len(tmpKvMap) == 0 ||preClient== nil {
preClient=cli
continue
}
err=preClient.Set(ctx,tmpKvMap,c.opts.Expire) //如果1级缓存miss了,2级缓存加载后,回写1级缓存
iferr!= nil {
fmt.Println(err)
}
preClient=cli
}
//处理数据返回
out:= make(map[string]T)
fork,v:= rangekvMap{
varobjT
err:=sonic.Unmarshal(v, &obj)
iferr!= nil {
return nil,err
}
key:=c.opts.splitKey(k) //切割key
out[key] =obj
}
returnout, nil
}
//GetAndSet缓存miss,支持调用f函数从其它db中获取数据
func (c*MultiCache[T]) GetAndSet(ctxcontext.Context,k[]string,ffunc(k[]string) (map[string]T, error)) (map[string]T, error) {
kvMap,err:=c.Get(ctx,k)
iferr!= nil {
return nil,err
}
var (
missKeys= []string{}
)
for _,v:= rangek{
if _,ok:=kvMap[v];ok{
continue
}
missKeys= append(missKeys,v)
}
if len(missKeys) == 0 {
returnkvMap, nil
}
tmpKvMap,err:= f(missKeys)
iferr!= nil {
return nil,err
}
fork,v:= rangetmpKvMap{
kvMap[k] =v
}
//检查外部数据源数据查询是否一致
ifc.opts.WriteNil&& len(missKeys) != len(tmpKvMap) {
for _,v:= rangemissKeys{
if _,ok:=tmpKvMap[v];ok{
continue
}
varobjT
tmpKvMap[v] =obj
}
}
if len(tmpKvMap) 0 {
err=c.Set(ctx,tmpKvMap)
iferr!= nil {
//todo打印日志就好了,不影响后续流程,下次请求再次尝试加载到缓存
fmt.Println(err)
}
}
returnkvMap, nil
}
func (c*MultiCache[T]) GetAndSetSingle(ctxcontext.Context,kstring,ffunc(kstring) (T, bool, error)) (T, bool, error) {
var (
valT
okbool
)
kvMap,err:=c.Get(ctx, []string{k})
iferr!= nil {
returnval, false,err
}
val,ok=kvMap[k]
ifok{
returnval, true, nil
}
//缓存miss从外部查询
iff== nil {
returnval, false, nil
}
//单飞查询
_,err, _ =c.sf.Do(k, func() (interface{}, error) {
val,ok,err= f(k)
iferr!= nil {
return nil,err
}
returnval, nil
})
iferr!= nil {
returnval, false,err
}
//写入缓存
var (
tmpKvMap= make(map[string]T)
)
//写入缓存条件:1、数据存在;2、数据不存在并且WriteNil为true
ifok|| (c.opts.WriteNil&& !ok) {
tmpKvMap[k] =val
err=c.Set(ctx,tmpKvMap)
iferr!= nil {
fmt.Println(err)
}
}
returnval,ok, nil
}
func (c*MultiCache[T]) Del(ctxcontext.Context,k[]string) error {
var (
tmpKeys=c.opts.buildKeys(k)
)
for _,v:= rangec.handlers{
err:=v.Del(ctx,tmpKeys)
iferr!= nil {
returnerr
}
}
return nil
}
代码不多大概230行,抽几个比较简单的来讲
GetAndSetSingle:获取/设置单条缓存记录,支持给业务方传入一个函数,若缓存均命中失败,调用参数函数从数据库/业务下游查询。
a. 查询业务下游用了单飞(golang.org/x/sync/singleflight),简单解释下同一个POD可以把多次请求合并成一个,高QPS的情况下缓存miss避免对数据库/业务下游造成冲击。感兴趣可以看看这篇文章;GOsingleflight你真的会用吗?「源码分析+详细案例」-掘金(https://juejin.cn/post/7337963242415259660)
b. 数据查询成功/失败回写缓存。
GetAndSet:获取/设置多条缓存记录,跟GetAndSetSingle类似,不一样的点是批量操作。
a. 查询并为用单飞,单飞是对同一个id多次查询操作;
b. 若缓存miss,对misskeys特殊处理,调用参数函数查询数据库/业务下游接口;
c. 数据查询成功/失败回写缓存。
下面看看如何使用,初始化Adaptor。
func getRedisAdaptor()client.IAdaptor{
conf:=redis.Config{}.WithName("test").
WithDB(10).
WithAddr(":6379").
WithPassword("12345").
WithPoolSize(100)
redisClient,err:=redis.NewRedisClient(&conf)
iferr!= nil {
panic(err)
}
returnredis.NewRedisAdaptor(redisClient)
}
func getMemAdaptor()client.IAdaptor{
conf:=mem.Config{}.WithCacheSize(1024 * 1024 * 1024).WithGCPercent(20)
returnmem.NewMemoryAdaptor(mem.NewMemCache(&conf))
}
增、删、改、查
func main() {
typePersonstruct {
Namestring `json:"name"`
Ageint `json:"age"`
}
cache:=goCache.NewMultiCache[*Person](&goCache.MultiCacheOptions{
Base:goCache.Base{Prefix: "demo"},
EnableLog: false,
WriteNil: false,
Expire:time.Minute* 10,
}, getMemAdaptor(), getRedisAdaptor())
var (
m= make(map[string]*Person)
)
m["xxxxx_test1"] = &Person{
Name: "李四",
Age: 20,
}
err:=cache.Set(context.TODO(),m)
iferr!= nil {
panic(err)
}
kvMap,err:=cache.Get(context.TODO(), []string{"12344pyc-test1"})
iferr!= nil {
panic(err)
}
b, _ :=sonic.Marshal(kvMap)
fmt.Println(string(b))
time.Sleep(time.Second* 10)
err=cache.Del(context.TODO(), []string{"12344pyc-test1"})
iferr!= nil {
panic(err)
}
kvMap,err=cache.Get(context.TODO(), []string{"12344pyc-test1"})
iferr!= nil {
panic(err)
}
b, _ =sonic.Marshal(kvMap)
fmt.Println(string(b))
}
好了,整体讲完啦
关于缓存与数据库不一致问题很多、很多、很多文章都在讲缓存与数据库不一致的问题,我先解释下缓存的场景
对业务下游数据缓存:比如A应用调用B应用,B应用就是业务下游。这种情况可能比较少,比如:组织架构被所有系统依赖,QPS非常高,某些特定情况下客户端做了缓存。但是,删除缓存非常复杂,B应用将数据变更如何通知A应用问题,常见的做法是广播(抛一个事件到MQ),使用缓存客户端均消费MQ数据删除数据,容易出现缓存一致性问题,一般不建议这么做;
对数据库缓存数据:一般情况增、删、改、查均在一个应用内。所以,缓存的设置和删除都比较简单。
不建议大家对业务下游数据缓存,除非固定不变的数据。比如:租户信息,开户之后不会变化。
缓存与数据库不一致问题盘点下来应该就是下面几种情况造成的。
先更新缓存,再更新数据库;
先更新数据库,再更新缓存;
先删除缓存,再更新数据库;
先更新数据库,再删除缓存。
分析下每种方案的最坏情况
缓存更新成功,数据库更新失败。导致缓存中数据是最新的,数据库还是历史数据,缓存中明显就是脏数据了,并且后期很难和数据库保持一致,一般不用。
先更新数据库,再更新缓存。数据库更新成功,缓存更新失败,查询结果是历史数据。勉强能接受,但是一般也不会用,QPS高的情况下,可能会导致Redis被多次更新。
先删除缓存,再更新数据库。第一步失败和第二步失败都不会造成缓存和数据库一致性问题。但,在QPS高的情况,第二步删除数据库的同时,可能存在有线程将历史数据查询后回写缓存造成数据不一致问题,一般不用。
先更新数据库,再删除缓存。数据库更新成功,缓存删除失败,数据不一致了,这种是最常用的方案,提供几种方案解决一致性问题
a. QPS低的场景可以用事务,第二步删除失败回滚数据库就可以了,QPS高的场景不适合;
b. 用重试机制,如果删除缓存失败,尝试多次即可,比如:每次删除失败间隔3s,删除3次任然失败则不处理了。
c. 异步方案,更新投递MQ或采集数据库更新日志(binlog、oplog等投递MQ,消费MQ删除缓存。
异步方案很早前我觉得很牛逼,有几个业务场景用了,并且是采集数据库binlog日志投递MQ。但是,但是后面我发现这玩意儿太坑了。
每次发布有灰度问题,不能灰度,跟基建有关;
链路复杂,更新数据库--采集日志投递mq--消费mq删除缓存,线上出现数据一致性问题,整个链路都排查一通,可能引入兄弟团队协助排查;
对后面接手业务的同学不友好,后续同学接手快,设计越简单越好。
我并不建议大家用异步方案,别给后面同学留坑,删除数据用重试机制,另外缓存都设置过期时间(根据业务来定时间长短),保证数据最终一致性,营销自动化QPS每秒400-500左右没出现一次缓存一致性问题。另外,想想整个业务团队都用异步方案,多少MQ?每个业务方都有一条MQ链路,太复杂了。
总结本文讲了封装Redis基础组件思路,主要以简单为主,尽量别提供一对乱七八糟的操作,用了一些设计模式,比如:构建者模式、适配器模式、责任链模式。
缓存不一致问题,常见方案是先更新数据库,再删除缓存,另外缓存设置过期时间保证数据最终一致性,建议不使用MQ方案。
点击关注公众号,“技术干货”及时达!
阅读原文
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设、网站改版、域名注册、主机空间、手机网站建设、网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。 项目经理在线