使用golang实现一个MapReduce
点击关注公众号,回复”福利”即可参与文末抽奖
背景
在日常业务开发中,我们经常遇到需要并发处理的场景。例如:
依据id列表查询db,获取数据。为了保证查询性能,单次查询的id列表长度最好不要超过50(依据业务来判断),当id列表长度超过50时,拆分成并发请求,减少耗时和提高性能,返回聚合后的结果外部提供的接口不支持批量写入/读取数据,当需要批量处理数据时,为了减少耗时和提高性能,并发请求外部接口以上处理数据的场景,都可以分成两个阶段:
请求阶段。基本都是IO操作,请求db,或者是调用外部接口处理阶段。对返回的数据进行转换,过滤,聚合等操作同步调用,调用耗时增长明显
并发调用,可以减少调用耗时
分析上面说的处理数据的场景,都可以分成两个阶段:
请求阶段。IO操作,可以并发的去进行,互不干扰处理阶段。同步进行,保证聚合结果的正确性这种是一种特殊的MapReduce
为了处理这类场景,我们需要明确以下几个部分:
列表长度。代表有多少数据需要进行处理map函数。并发处理的函数,互不干扰reduce函数。同步处理的函数最大并发数。决定需要开多少线程/协程来处理拆分长度。列表长度 / 拆分长度 = 子任务数由于我在日常开发中常使用golang语言,下面梳理下使用golang来解决这类问题的一个思路
函数签名
func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int)
核心逻辑:
当最大并发数 = 1 或者子任务数(列表长度 / 拆分长度) = 1时,同步执行map函数和reduce函数即可
其余情况,并发处理map函数,同步执行reduce函数
获取并发处理的子任务数量:lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))通过sync.Mutex保证reduce同步执行通过sync.WaitGroup保证等待子任务全部执行完成通过chan控制最大并发数代码实现package test
import (
"math"
"sync"
)
func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
if length 1 {
return
}
if maxConcurrent = 1 || length = chunkSize {
doChunkProcessSerially(length, procedure, reduce, chunkSize)
} else {
doChunkProcessConcurrently(length, procedure, reduce, maxConcurrent, chunkSize)
}
}
// 同步处理
func doChunkProcessSerially(length int, procedure func(start, end int) (interface{}, error),
reduce func(partialResult interface{}, partialErr error, start, end int), chunkSize int) {
// 拆分的子任务数
chunkNums := int(math.Ceil(float64(length) / float64(chunkSize)))
for i := 0; i chunkNums; i++ {
func(chunkIndex int) {
defer func() {
if err := recover(); err != nil {
// 自定义错误处理
}
}()
start := chunkIndex * chunkSize
end := start + chunkSize
if end length {
end = length
}
// 执行map
response, err := procedure(start, end)
// 执行reduce
if reduce != nil {
reduce(response, err, start, end)
}
}(i)
}
}
// 并发处理
func doChunkProcessConcurrently(length int, procedure func(start, end int) (interface{}, error),
reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
index := 0
chunkIndex := 0
// 拆分的子任务数
lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))
// 保证reduce同步执行
var lock sync.Mutex
// 保证子任务全部执行完成
var wg sync.WaitGroup
wg.Add(lengthTask)
// 控制并发数
throttleChan := make(chan struct{}, maxConcurrent)
for {
start := index
end := index + chunkSize
if end length {
end = length
}
throttleChan - struct{}{}
go func(chunkIndex int) {
defer func() {
-throttleChan
if err := recover(); err != nil {
// 自定义错误处理
}
wg.Done()
}()
// 执行map
response, err := procedure(start, end)
// 执行reduce
if reduce != nil {
lock.Lock()
defer lock.Unlock()
reduce(response, err, start, end)
}
}(chunkIndex)
chunkIndex++
index = index + chunkSize
if index = length {
break
}
}
wg.Wait()
close(throttleChan)
}
测试:
func TestChunkProcess(t *testing.T) {
trackIDs := []int64{123, 456, 789}
results := make([]int64, 0)
ChunkProcess(len(trackIDs), func(start, end int) (interface{}, error) {
result := trackIDs[start] + 100
return result, nil
}, func(partialResult interface{}, partialErr error, start, end int) {
results = append(results, partialResult.(int64))
}, 2, 1)
fmt.Println(results)
}
总结多对业务场景进行抽象分析,为这一类场景提供解决方案
点击小卡片,参与粉丝专属福利!!
如果文章对你有帮助的话欢迎「关注+点赞+收藏」
阅读原文
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设、网站改版、域名注册、主机空间、手机网站建设、网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。 项目经理在线