全国免费咨询:

13245491521

VR图标白色 VR图标黑色
X

中高端软件定制开发服务商

与我们取得联系

13245491521     13245491521

2023-09-29_使用golang实现一个MapReduce

您的位置:首页 >> 新闻 >> 行业资讯

使用golang实现一个MapReduce 点击关注公众号,回复”福利”即可参与文末抽奖 背景 在日常业务开发中,我们经常遇到需要并发处理的场景。例如: 依据id列表查询db,获取数据。为了保证查询性能,单次查询的id列表长度最好不要超过50(依据业务来判断),当id列表长度超过50时,拆分成并发请求,减少耗时和提高性能,返回聚合后的结果外部提供的接口不支持批量写入/读取数据,当需要批量处理数据时,为了减少耗时和提高性能,并发请求外部接口以上处理数据的场景,都可以分成两个阶段: 请求阶段。基本都是IO操作,请求db,或者是调用外部接口处理阶段。对返回的数据进行转换,过滤,聚合等操作同步调用,调用耗时增长明显 并发调用,可以减少调用耗时 分析上面说的处理数据的场景,都可以分成两个阶段: 请求阶段。IO操作,可以并发的去进行,互不干扰处理阶段。同步进行,保证聚合结果的正确性这种是一种特殊的MapReduce 为了处理这类场景,我们需要明确以下几个部分: 列表长度。代表有多少数据需要进行处理map函数。并发处理的函数,互不干扰reduce函数。同步处理的函数最大并发数。决定需要开多少线程/协程来处理拆分长度。列表长度 / 拆分长度 = 子任务数由于我在日常开发中常使用golang语言,下面梳理下使用golang来解决这类问题的一个思路 函数签名 func ChunkProcess(length int, procedure func(start, end int) (interface{}, error), reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) 核心逻辑: 当最大并发数 = 1 或者子任务数(列表长度 / 拆分长度) = 1时,同步执行map函数和reduce函数即可 其余情况,并发处理map函数,同步执行reduce函数 获取并发处理的子任务数量:lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))通过sync.Mutex保证reduce同步执行通过sync.WaitGroup保证等待子任务全部执行完成通过chan控制最大并发数代码实现package test import ( "math" "sync" ) func ChunkProcess(length int, procedure func(start, end int) (interface{}, error), reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) { if length 1 { return } if maxConcurrent = 1 || length = chunkSize { doChunkProcessSerially(length, procedure, reduce, chunkSize) } else { doChunkProcessConcurrently(length, procedure, reduce, maxConcurrent, chunkSize) } } // 同步处理 func doChunkProcessSerially(length int, procedure func(start, end int) (interface{}, error), reduce func(partialResult interface{}, partialErr error, start, end int), chunkSize int) { // 拆分的子任务数 chunkNums := int(math.Ceil(float64(length) / float64(chunkSize))) for i := 0; i chunkNums; i++ { func(chunkIndex int) { defer func() { if err := recover(); err != nil { // 自定义错误处理 } }() start := chunkIndex * chunkSize end := start + chunkSize if end length { end = length } // 执行map response, err := procedure(start, end) // 执行reduce if reduce != nil { reduce(response, err, start, end) } }(i) } } // 并发处理 func doChunkProcessConcurrently(length int, procedure func(start, end int) (interface{}, error), reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) { index := 0 chunkIndex := 0 // 拆分的子任务数 lengthTask := int(math.Ceil(float64(length) / float64(chunkSize))) // 保证reduce同步执行 var lock sync.Mutex // 保证子任务全部执行完成 var wg sync.WaitGroup wg.Add(lengthTask) // 控制并发数 throttleChan := make(chan struct{}, maxConcurrent) for { start := index end := index + chunkSize if end length { end = length } throttleChan - struct{}{} go func(chunkIndex int) { defer func() { -throttleChan if err := recover(); err != nil { // 自定义错误处理 } wg.Done() }() // 执行map response, err := procedure(start, end) // 执行reduce if reduce != nil { lock.Lock() defer lock.Unlock() reduce(response, err, start, end) } }(chunkIndex) chunkIndex++ index = index + chunkSize if index = length { break } } wg.Wait() close(throttleChan) } 测试: func TestChunkProcess(t *testing.T) { trackIDs := []int64{123, 456, 789} results := make([]int64, 0) ChunkProcess(len(trackIDs), func(start, end int) (interface{}, error) { result := trackIDs[start] + 100 return result, nil }, func(partialResult interface{}, partialErr error, start, end int) { results = append(results, partialResult.(int64)) }, 2, 1) fmt.Println(results) } 总结多对业务场景进行抽象分析,为这一类场景提供解决方案 点击小卡片,参与粉丝专属福利!! 如果文章对你有帮助的话欢迎「关注+点赞+收藏」 阅读原文

上一篇:2025-05-17_“体面的牛马”:中国高校教师生存图像 下一篇:2024-11-27_国产大模型首发中文逻辑推理,「天工大模型4.0」o1版来了

TAG标签:

17
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设网站改版域名注册主机空间手机网站建设网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。
项目经理在线

相关阅读 更多>>

猜您喜欢更多>>

我们已经准备好了,你呢?
2022我们与您携手共赢,为您的企业营销保驾护航!

不达标就退款

高性价比建站

免费网站代备案

1对1原创设计服务

7×24小时售后支持

 

全国免费咨询:

13245491521

业务咨询:13245491521 / 13245491521

节假值班:13245491521()

联系地址:

Copyright © 2019-2025      ICP备案:沪ICP备19027192号-6 法律顾问:律师XXX支持

在线
客服

技术在线服务时间:9:00-20:00

在网站开发,您对接的直接是技术员,而非客服传话!

电话
咨询

13245491521
7*24小时客服热线

13245491521
项目经理手机

微信
咨询

加微信获取报价