全国免费咨询:

13245491521

VR图标白色 VR图标黑色
X

中高端软件定制开发服务商

与我们取得联系

13245491521     13245491521

2024-11-16_超详细的大文件分片上传—实战与优化(前端部分)

您的位置:首页 >> 新闻 >> 行业资讯

超详细的大文件分片上传—实战与优化(前端部分) 点击关注公众号,“技术干货”及时达! 大文件分片上传实战与优化(前端Part)1. 需求分析1.1 分析实现大文件上传, 其中包括以下功能 秒传: 文件已存在, 直接给前端返回文件 url 记录文件的 hash 与元数据到数据库中上传文件前先计算 hash 和获取文件元数据请求接口进行比对若比对成功则说明文件已存在, 直接返回前端文件 url断点续传: 上传过程意外中断, 下次上传时不需要从头上传整个文件 前端将文件分片上传, 后端接收分片然后进行合并上传分片前先请求接口查询需要上传的分片即实现断点续传1.2 优化与亮点实现了「基于文件真实上传进度的进度条」实现了「可控制 Promise 并发数量的 PromisePool」实现了「基于 WebWorker 的 WorkerPool / ThreadPool」「解决了前端计算大文件 hash 速度过慢的痛点」, 最终实现 5GB 文件计算 Hash 仅需 3.5 秒解决了 Node 中合并文件分片导致文件损坏的问题 (思路: 在 Minio 中做合并而不是在 Node 里合并) (详见后续文章)其中计算文件 hash 的过程已经封装了一个库 hash-worker, 详见: https://github.com/Tkunl/hash-worker 1.3 技术选型「具体使用什么技术栈并不重要, 如果你使用的是 Vue/React 同样适用」 前端: Angular + NG-Zorro 后端: Nest.js + Prisma + MySQL + Minio(文件存储) (OOP 的大胜利) 2. 实现文件切片2.1 目标就是将文件按指定的切片大小进行切片, 最终拿到文件的 ArrayBuffer 数组用于上传和分片 Hash 计算 2.2 实现传入文件然后返回文件切片后的 Blob 数组, 不过 Blob 数组不能直接用于计算分片 hash, 还需要将它们转成 ArrayBuffer 数组 /** *分割文件 *@paramfile *@parambaseSize默认分块大小为1MB *@private */ functionsliceFile(file:File,baseSize=1):Blob[]{ constchunkSize=baseSize*1024*1024//KB constchunks:Blob[]=[] letstartPos=0 while(startPosfile.size){ chunks.push(file.slice(startPos,startPos+chunkSize)) startPos+=chunkSize } returnchunks } 可以使用 FileReader 转换, 可以直接转换, 二者没有性能差距 区别只是 FileReader API 的兼容性可能会更好一点 以下两种实现二选一即可 /** *将File转成ArrayBuffer *注意:Blob无法直接移交到Worker中,所以需要放到主线程中执行 *@paramchunks *@private */ asyncfunctiongetArrayBufFromBlobs(chunks:Blob[]):PromiseArrayBuffer[]{ asyncfunctionreadAsArrayBuffer(file:Blob){ returnnewPromiseArrayBuffer((rs)={ constfileReader=newFileReader() fileReader.onload=(e)=rs(e.target!.resultasArrayBuffer) fileReader.readAsArrayBuffer(file) }) } returnawaitPromise.all(chunks.map((chunk:Blob)=readAsArrayBuffer(chunk))) } /** *功能同上但语法更简洁 *@paramchunks */ asyncfunctiongetArrayBufFromBlobsV2(chunks:Blob[]):PromiseArrayBuffer[]{ returnPromise.all(chunks.map(chunk=chunk.arrayBuffer())) } 至此已实现了前端文件切片 2.3 踩坑有些文章将可以把切片过程放到 WebWorker 中, 以避免阻塞主线程 实际情况是: 切片过程不会消耗太久时间, 其中主要是 IO 瓶颈 而 Blob[] 转 ArrayBuffer[] 的过程是基于 Promise 的, 这并不会阻塞主线程 「注意 ! ! !」 如果你将这个过程放到 Worker 中, 由于 File 或 Blob 并不是 Worker 中的可 Transfer 对象 此处会导致 主线程与 Worker 通信时进行结构化克隆, 由此会产生额外的CPU性能消耗和内存消耗 而且如果文件很大时(大概超过2GB)会导致 Worker 线程 OOM (内存溢出错误) 3. 前端计算分片 Hash3.1 目标使用文件分片的 Hash 来标识文件分片, 用来判断这个分片是否已经上传过了 3.2 存在的问题与解决思路计算文件分片 Hash 是一个 CPU 密集型任务, 直接在主线程中计算 hash 必定会导致 UI 卡死, 考虑「放到 WebWorker 中计算 Hash」 ArrayBuffer 是可 Transfer 的对象, 在主线程与 Worker 线程通信时, 可以通过移交控制权的方式通信, 「避免线程通信引起的结构化克隆」 分片之间的 Hash 计算没有关联, 而 WebWorker 可以用来开额外的计算线程, 考虑「基于 WebWorker 实现线程池(WorkerPool)来加速计算分片 Hash」 当文件较大时计算使用分片的 MD5值作为 Hash 计算速度仍然较慢, 但分片的 hash 其实只是为了标识分片, 对于唯一性要求并不高, 「考虑在文件较大的场景下使用 CRC32 值作为分片的 Hash」 CRC32的十六进制表示只有8位(MD5有32位), 且 CPU 对计算 CRC32 有硬件加速, 速度会比计算 MD5 快得多 3.3 Web Worker用于计算 MD5 的 Worker 这里使用了 SparkMD5 计算文件的 MD5 //md5.worker.ts ///referencelib="webworker"/ import{WorkerMessage}from'./util/worker-message' import{WorkerLabelsEnum}from'./types/worker-labels.enum' importSparkMD5from'spark-md5' addEventListener('message',({data}:{data:ArrayBuffer})={ consthash=SparkMD5.ArrayBuffer.hash(data) postMessage( newWorkerMessage(WorkerLabelsEnum.DONE,{ result:hash, chunk:data, }), [data],//用于transfer的数据,以避免结构化克隆 ) }) 用于计算 CRC32 的 Worker //crc32.worker.ts ///referencelib="webworker"/ import{getCrc,getCrcHex}from'../utils/upload-helper' import{WorkerMessage}from'./util/worker-message' import{WorkerLabelsEnum}from'./types/worker-labels.enum' addEventListener('message',({data}:{data:ArrayBuffer})={ constcrc=getCrc(data) consthash=getCrcHex(crc) postMessage( newWorkerMessage(WorkerLabelsEnum.DONE,{ result:hash, chunk:data, }), [data],//用于transfer的数据,以避免结构化克隆 ) }) WorkerMessage: 用于 Worker 线程向主线程通信 //WorkerMessage.ts import{WorkerLabelsEnum}from'../types/worker-labels.enum' exportclassWorkerMessageT=any{ label:WorkerLabelsEnum content?:T constructor(label:WorkerLabelsEnum,content?:T){ this.label=label this.content=content } } WorkerLabelsEnum: 用于标识 Worker Message 的类型 //WorkerLabelsEnum.ts exportenumWorkerLabelsEnum{ INIT, CHUNK, DONE, } WorkerRep: WorkerMessage 的进一步封装, 方便传泛型 //WorkerRep.ts exportinterfaceWorkerRepT=any{ data:WorkerMessage } 3.4 Worker Pool 的实现使用 Worker Pool 来复用 Worker 而不是每次计算 hash 都开新的 Worker WorkerWrapper: 基于 Promise 追踪当前 Worker 的运行状态 import{WorkerRep}from'./worker-message' import{WorkerLabelsEnum}from'../types/worker-labels.enum' exportenumStatusEnum{ RUNNING='running', WAITING='waiting', } exportclassWorkerWrapper{ worker:Worker status:StatusEnum constructor( worker:Worker, ){ this.worker=worker this.status=StatusEnum.WAITING } run(param:ArrayBuffer,params:ArrayBuffer[],index:number){ this.status=StatusEnum.RUNNING returnnewPromise((rs,rj)={ this.worker.onmessage=({data}:WorkerRep{result:string;chunk:ArrayBuffer})={ const{label,content}=data if(label===WorkerLabelsEnum.DONEcontent){ params[index]=content.chunk//归还分片的所有权 this.status=StatusEnum.WAITING rs(content.resultasT) } } this.worker.onerror=(e)={ this.status=StatusEnum.WAITING rj(e) } this.worker.postMessage(param,[param])//用于transfer的数据,以避免结构化克隆 }) } } WorkerPool: 用于管理 WorkerWrapper, 实现 Worker 复用 核心思路是使用发布订阅模式来订阅当前 正在跑的 Worker 的数量(curRunningCount) 此处使用了 Rxjs 中的 BehaviorSubject, 也可以自己写一个 发布订阅模式来实现 只需要实现两个方法 subscribe() 和 next(), 其中 subscribe 用来订阅, next 用于发布新值 import{StatusEnum,WorkerWrapper}from'./worker-wrapper' import{BehaviorSubject}from'rxjs' exportabstractclassWorkerPool{ pool:WorkerWrapper[]=[] maxWorkerCount:number curRunningCount=newBehaviorSubject(0) results:any[]=[] protectedconstructor( maxWorkers=navigator.hardwareConcurrency||4, ){ this.maxWorkerCount=maxWorkers } exec(params:ArrayBuffer[]){ this.results.length=0 constworkerParams=params.map( (param,index)=({data:param,index}), ) returnnewPromiseT[]((rs)={ this.curRunningCount.subscribe(count={ if(countthis.maxWorkerCountworkerParams.length!==0){ //当前能跑的任务数量 letcurTaskCount=this.maxWorkerCount-count if(curTaskCountparams.length){ curTaskCount=params.length } //此时可以用来执行任务的Worker constcanUseWorker:WorkerWrapper[]=[] for(constworkerofthis.pool){ if(worker.status===StatusEnum.WAITING){ canUseWorker.push(worker) if(canUseWorker.length===curTaskCount){ break } } } constparamsToRun=workerParams.splice(0,curTaskCount) //更新当前正在跑起来的worker数量 this.curRunningCount.next(this.curRunningCount.value+curTaskCount) canUseWorker.forEach((workerApp,index)={ constparam=paramsToRun[index] workerApp.run(param.data,params,param.index) .then((res)={ this.results[param.index]=res }) .catch((e)={ this.results[param.index]=e }) .finally(()={ this.curRunningCount.next(this.curRunningCount.value-1) }) }) } if(this.curRunningCount.value===0workerParams.length===0){ rs(this.resultsasT[]) } }) }) } } WorkerPoolForMd5s: 用于实现使用 Worker Pool 计算所有分片的 MD5 值 import{WorkerWrapper}from'./util/worker-wrapper' import{WorkerPool}from'./util/worker-pool' exportclassWorkerPoolForMd5sextendsWorkerPool{ constructor(maxWorkers:number){ super(maxWorkers) this.pool=Array.from({length:this.maxWorkerCount}).map( ()= newWorkerWrapper( newWorker(newURL('./md5-single.worker',import.meta.url)), ), ) } } WorkerPoolForCrc32s: 用于实现使用 Worker Pool 计算所有分片的 CRC32 值 import{WorkerPool}from'./util/worker-pool' import{WorkerWrapper}from'./util/worker-wrapper' exportclassWorkerPoolForCrc32sextendsWorkerPool{ constructor( maxWorkers=navigator.hardwareConcurrency||4, ){ super(maxWorkers) this.pool=Array.from({length:this.maxWorkerCount}).map( ()= newWorkerWrapper( newWorker(newURL('./crc32-single.worker',import.meta.url)), ), ) } } 3.5 使用 Worker Pool 计算分片的 hash 值exportclassWorkerService{ readonlyMAX_WORKERS=8 md5SingleWorkerPool:WorkerPoolForMd5s|undefined crc32SingleWorkerPool:WorkerPoolForCrc32s|undefined //计算所有分片的MD5 getMD5ForFiles(chunks:ArrayBuffer[]):stirng[]{ if(this.md5SingleWorkerPool===undefined){ this.md5SingleWorkerPool=newWorkerPoolForMd5s(this.MAX_WORKERS) } returnthis.md5SingleWorkerPool.execstring(chunks) } //计算所有分片的CRC32 getCRC32ForFiles(chunks:ArrayBuffer[]):stirng[]{ if(this.crc32SingleWorkerPool===undefined){ this.crc32SingleWorkerPool=newWorkerPoolForCrc32s(this.MAX_WORKERS) } returnthis.crc32SingleWorkerPool.execstring(chunks) } } 3.6 踩坑这里如果将 new Worker(new URL('./crc32-single.worker', import.meta.url) 中的 url 拆出来作为一个变量传进去会导致在运行时浏览器无法正确拿到 Worker 文件, 具体原因未知, 可能是打包工具引起的问题 确保 WorkerPoolForMd5s 和 WorkerPoolForCrc32s 是单例的, 否则会导致浏览器创建过多的 Web Worker Worker 是需要手动关闭的, 可以找到合适的时机去关掉所有的 Worker 3.7 性能实测硬件情况: Ryzen9 5900HX + 32Gb DDR4 3种方式各算了两次 计算 1.8GB 文件分片的 MD5 [主线程中直接算(单线程)] 14585.115966796875 ms 14066.404052734375 ms 8线程 WebWorker = 比单线程快了 670% 2174.992919921875 ms 2169.323974609375 ms 12线程 WebWorker = 比 8 单线程快了 19%, 比单线程快了 776% 1825.158935546875 ms 1878.386962890625 ms 总结: 「使用多线程的方式可以使 hash 计算性能提高 6 ~ 7 倍」 4. 前端计算文件 Hash4.1 目标计算文件的 Hash 用来标识这个文件是否已经上传过了 4.2 存在的问题与解决思路计算全部文件的 hash 无法采用并行计算的方式, 实测假定用户上传 1.8GB 文件, 仅算文件 MD5 就要消耗 15秒 时间(不包括计算文件分片 hash 的时间) 考虑 使用 wasm「使用 MerkleTree(默克尔树) 的树根 hash 作为 文件的 hash」 (本文采用)4.3 MerkleTree每个叶子节点是对应数据分片的 Hash, 非叶子结点为它 2 个子节点的哈希, 从叶子结点层层向上计算 hash, 即得到 默克尔树根, 它可以用来校验数据集的完整性 如图所示, 其中的 hA hB hC hD hE hF hG hH 即为各个文件分片的 Hash hAB 只是基于文件分片的 Hash 计算 Hash 所以得到默克尔树根的 hash 速度会非常快, 因为并没有直接计算全部文件的 hash, 只是根据全部分片的 hash 进行计算 最后使用 树根的 hash 作为 文件 hash, 这样即实现了标识文件的唯一, 计算速度又非常快 4.4 MerkleTree 的实现此处仍然使用 SparkMD5 来计算 MD5 作为 hash 这里实现了 MerkleTree 的序列化和反序列化方法, 如果想保存整个 MerkleTree 树可以将这个序列化后的结果存入数据库, 我在这里只是用到了计算 MerkleTree 的树根 importSparkMD5from'spark-md5' //定义Merkle树节点的接口 interfaceIMerkleNode{ h:string l:IMerkleNode|null r:IMerkleNode|null } //定义Merkle树的接口 interfaceIMerkleTree{ root:IMerkleNode leafs:IMerkleNode[] //你可以根据需要添加其他属性或方法,例如校验、添加和生成树等功能 } //Merkle树节点的类实现 classMerkleNodeimplementsIMerkleNode{ h:string l:IMerkleNode|null r:IMerkleNode|null constructor(hash:string,left:IMerkleNode|null=null,right:IMerkleNode|null=null){ this.h=hash this.l=left this.r=right } } //Merkle树的类实现 exportclassMerkleTreeimplementsIMerkleTree{ root:IMerkleNode leafs:IMerkleNode[] constructor(hashList:string[]) constructor(leafNodes:IMerkleNode[]) constructor(nodes:string[]|IMerkleNode[]){ if(nodes.length===0){ thrownewError('EmptyNodes') } if(typeofnodes[0]==='string'){ this.leafs=nodes.map((node)=newMerkleNode(nodeasstring)) }else{ this.leafs=nodesasIMerkleNode[] } this.root=this.buildTree() } getRootHash(){ returnthis.root.h } buildTree():IMerkleNode{ //实现构建Merkle树的逻辑。根据叶子节点创建父节点,一直到根节点。 letcurrentLevelNodes=this.leafs while(currentLevelNodes.length1){ constparentNodes:IMerkleNode[]=[] for(leti=0;icurrentLevelNodes.length;i+=2){ constleft=currentLevelNodes[i] constright=i+1currentLevelNodes.length?currentLevelNodes[i+1]:null //具体的哈希计算方法 constparentHash=this.calculateHash(left,right) parentNodes.push(newMerkleNode(parentHash,left,right)) } currentLevelNodes=parentNodes } returncurrentLevelNodes[0]//返回根节点 } //序列化Merkle树 serialize():string{ constserializeNode=(node:IMerkleNode|null):any={ if(node===null){ returnnull } return{ h:node.h, l:serializeNode(node.l), r:serializeNode(node.r), } } constserializedRoot=serializeNode(this.root) returnJSON.stringify(serializedRoot) } //反序列化Merkle树 staticdeserialize(serializedTree:string):MerkleTree{ constparsedData=JSON.parse(serializedTree) constdeserializeNode=(data:any):IMerkleNode|null={ if(data===null){ returnnull } returnnewMerkleNode(data.h,deserializeNode(data.l),deserializeNode(data.r)) } constroot=deserializeNode(parsedData) if(!root){ thrownewError('Invalidserializedtreedata') } //创建一个包含所有叶子节点的数组,这是为了与MerkleTree的构造函数兼容 //没有保存这些叶子节点的序列化版本,所以这里需要一些额外的逻辑来处理 //如果你需要将整个树的所有节点存储为序列化版本,那么可能需要修改这部分逻辑 constextractLeafNodes=(node:IMerkleNode):IMerkleNode[]={ if(node.l===nullnode.r===null){ return[node] } return[ ...(node.l?extractLeafNodes(node.l):[]), ...(node.r?extractLeafNodes(node.r):[]), ] } constleafNodes=extractLeafNodes(root) returnnewMerkleTree(leafNodes) } privatecalculateHash(left:IMerkleNode,right:IMerkleNode|null):string{ returnright?SparkMD5.hash(left.h+right.h):left.h } } 4.5 使用 MerkleTree 树根的 Hash 作为文件 Hash//chunksHash为所有文件分片的hash数组 constmerkleTree=newMerkleTree(chunksHash) constfileHash=merkleTree.getRootHash() 5. 文件分片的并发上传5.1 目标多个文件分片可以同时上传到后端, 但不能使用 Promise.all() 直接将所有分片一起传到后端, 当文件分片数量较多时, 会导致同时开启的 HTTP 链接过多 「使用一个 PromisePool 来控制同时处于 pending 状态的 Promise 的数量」 「注意:」 使用 Promise.all() 只是用来收集 Promise 数组的执行结果, 它并不能用来控制同时处于 Pending 状态 Promise 的数量, 而 Promise 一旦创建了就会立即执行其中 new Promise() 中的同步代码(即发送网络请求), 所以需要创建 Promise 这个过程用函数包起来, 以实现当需要的时候再去执行 即函数调用的时候才会创建这个 Promise 5.2 实现实现思路同 Worker Pool 接收一个 () = Promiseany 数组作为任务 import{BehaviorSubject}from'rxjs' typeAsyncFunction=()=Promiseany exportclassPromisePool{ privatereadonlyqueue:{fn:AsyncFunction,index:number}[]=[] privatereadonlymaxConcurrentTasks:number privateresults:any[]=[] curRunningCount=newBehaviorSubject(0) constructor( functions:AsyncFunction[], maxConcurrentTasks:number=navigator.hardwareConcurrency||8, ){ this.queue=functions.map((fn,index)=({fn,index})) this.maxConcurrentTasks=maxConcurrentTasks } exec(){ returnnewPromiseT[]((rs)={ this.curRunningCount.subscribe((count)={ if(countthis.maxConcurrentTasksthis.queue.length!==0){ //当前需要跑的任务数量 letcurTaskCount=this.maxConcurrentTasks-count if(curTaskCountthis.queue.length){ curTaskCount=this.queue.length } //当前要跑的任务 consttasks=this.queue.splice(0,curTaskCount) this.curRunningCount.next(this.curRunningCount.value+curTaskCount) //执行任务 tasks.forEach((taskWrap)={ const{fn,index}=taskWrap fn().then((result)={ this.results[index]=result }).catch((error)={ this.results[index]=error }).finally(()= this.curRunningCount.next(this.curRunningCount.value-1) ) }) } if(this.curRunningCount.value===0this.queue.length===0){ rs(this.resultsasT[]) } }) }) } } 5.3 使用示例可以看见同时处于 跑起来了状态的 Promise 只有 4 个 asynctestPromisePool(){ constarr=[1,2,3,4,5,6,7,8,9,10] constasyncFns=arr.map( (num)=async()={ awaitnewPromisenumber((rs)={ console.log('跑起来了:'+num) setTimeout(()={ rs(num*2) },100) }) returnnewPromise((rs)={ setTimeout(()={ rs('结果:'+num*10) },2000) }) }, ) constpool=newPromisePool(asyncFns,4) pool.exec().then((res)={ console.log(res) }) } 6. 基于实际上传进度的进度计算6.1 思路 (Axios)使用可以设置其中的上传参数: onUploadProgress, 用于处理上传进度事件, 详见 https://axios-http.com/zh/docs/req_config 6.2 思路 (Angular HTTP Client)通过 Rxjs 的 pipe() 来处理上传事件, 详见 https://angular.cn/guide/http#tracking-and-showing-request-progress 6.3 基于 HTTP Client 的实现封装 HttpClient, 其中 cb(event.loaded) 中的 event.loaded 即为当前请求已经上传了多少数据 //http.service.ts import{HttpClient,HttpEvent,HttpEventType,HttpRequest}from'@angular/common/http' import{BehaviorSubject,last,lastValueFrom,map,Observable}from'rxjs' exportclassHttpService{ constructor(privatehttp:HttpClient){} privategetEventMessage(event:HttpEventany,cb?:(current:any)=void){ if(event.type===HttpEventType.UploadProgress){ cbcb(event.loaded) } } postPWithProgress( url:string, body:any, extra:HttpExtraParam={}, cb?:(current:number)=void, ){ consturlWithExtra=appendQueryParams(url,extra) returnlastValueFrom( this.http .request( newHttpRequest('POST',urlWithExtra,body,{ reportProgress:true, }), ) .pipe( map((event)=this.getEventMessage(event,cb)), last(), ), ) } } 完整应用详见 前端上传流程 7. 前端上传流程与策略7.1 流程获取文件元数据文件分片计算分片 Hash 与 文件 Hash检查文件是否已经上传过查询需要上传的文件分片构建上传参数上传实际需要上传的分片待全部分片上传完成后校验分片合并分片7.2 策略文件分片大小设定为 10MB 一个分片 Hash 策略 当文件分片数量 为1片时 (10MB 以下) 直接计算整个文件的 MD5当文件分片数量小于 100 片时(1GB 以下) 基于分片的 MD5 计算 默克尔树根作为 文件 Hash当文件分片数量大于 100 片时(1GB 以上) 基于分片的 CRC32 Hex 计算 默克尔树根作为 文件 Hash7.3 实时上传进度的思路其实类似于多线程状态下去刷一个数据(已上传数据的大小) 使用一个数组去存放所有请求的已上传文件大小, 初始值全是 0 按数组顺序刷写整个数组中各个请求的上传进度, 最后将数组中各元素求和, 即得到当前总的已上传大小 使用一个 定时器每隔 100ms 读一下当前上传的进度, 更新进度条即可 并不需要每个子上传进度上传时就更新进度条, 类似节流的思想 详见整体实现部分的代码 7.4 整体实现interfaceIMetaData{ size:number, lastModified:number, type:string } exportclassMinioUploaderService{ //用于追踪当前的上传阶段 uploadStatus=newBehaviorSubjectstring('Pleaseselectafile.') constructor(privateuploadApiSvc:UploadApiService){} asyncdoUpload( file:File, chunkSize:number, cb:(progress:number)=void, ){ //分片数量小于borderCount用MD5,否则用CRC32算Hash constBORDER_COUNT=100 //文件大小 constfileSize=file.size/1000 //文件元数据 constmetadata:IMetaData={ size:file.size, lastModified:file.lastModified, type:file.type, } //文件分片 this.uploadStatus.next('Parsingfile...') constchunksBlob=sliceFile(file,chunkSize) constchunksBuf=awaitgetArrayBufFromBlobsV2(chunksBlob) //按文件分片数量执行不同Hash策略 letchunksHash:string[]=[] if(chunksBuf.length===1){ chunksHash=[getMD5FromArrayBuffer(chunksBuf[0])] }elseif(chunksBuf.length=BORDER_COUNT){ chunksHash=awaitthis.workerSvc.getMD5ForFiles(chunksBuf) }else{ chunksHash=awaitthis.workerSvc.getCRC32ForFiles(chunksBuf) } constmerkleTree=newMerkleTree(chunksHash) constfileHash=merkleTree.getRootHash() //检查文件是否已经上传过 this.uploadStatus.next('Checkingfileifexist...') const{data:existUrl}=awaitthis.uploadApiSvc.checkFileIfExist(fileHash,fileSize) if(existUrl){ this.uploadStatus.next('Completed.') returnexistUrl } //查询需要上传的分片 this.uploadStatus.next('Getthechunksthatneedtobeuploaded...') const{data:_chunksNeedUpload}=awaitthis.uploadApiSvc.getExistChunks( fileHash, chunksHash, ) //完整的上传参数 this.uploadStatus.next('Buildinguploadparams...') constparamsMap=newMapstring,FormData() chunksBlob.forEach((chunk,index)={ constdata=newFormData() data.append('files',chunk) data.set('name',file.name) data.set('index',index.toString()) data.set('fileHash',fileHash) data.set('chunkHash',chunksHash[index]) paramsMap.set(chunksHash[index],data) }) //获取实际需要上传的分片 constparams=_chunksNeedUpload.map((chunkHash)=paramsMap.get(chunkHash)!) this.uploadStatus.next('Uploading...') //基于实时上传进度的进度 consttotal=file.size constcurrentProgressList:number[]=[] constintervalId=setInterval(()={ constcurrent=currentProgressList.reduce((acc,cur)=acc+cur,0) cb(Math.ceil((current/total)*100)) },150) awaitnewPromisePool(params.map((param,index)=()= this.uploadApiSvc.uploadChunks(param,(current)={ currentProgressList[index]=current }) )).exec() clearInterval(intervalId) cb(100) //获取校验失败的分块并尝试重新上传 this.uploadStatus.next('Verifyuploadedchunks...') const{data:brokenChunksList}=awaitthis.uploadApiSvc.verifyChunks2(fileHash,chunksHash) if(brokenChunksList.length!==0){ console.log('brokenChunksList:',brokenChunksList) return'' } //合并分片 this.uploadStatus.next('Mergingchunks...') const{data:url}=awaitthis.uploadApiSvc.mergeChunks(fileHash,file.name,fileSize,metadata) this.uploadStatus.next('Completed.') returnurl } } 8. 总结目前存在的问题: 使用默克尔树计算树根 Hash 的方式依赖于分片大小, 如果分片大小改变 Hash 值会不同在计算文件分片之前获取了全部文件分片的 ArrayBuffer, 这会导致将整个文件都读入到内存中, 后续会尝试优化一下完整 Demo 仓库: 前端项目: https://github.com/Tkunl/kun-uploader 后端项目: https://github.com/Tkunl/kun-uploader-server 此外本文可能有少许不准确或者有误的地方,欢迎评论区赐教。最后,如果觉得还不错,对你有帮助的话,欢迎点赞、收藏、转发 ??? 后端部分详见: # 超详细的大文件分片上传?实战与优化(后端部分) https://juejin.cn/post/7354362021428117519 文章参考: 一文吃透??大文件分片上传、断点续传、秒传? https://juejin.cn/post/7324140839780433932 转载请注明原作者, 和原文链接, 谢谢 ! ! ! 9. 重大性能优化更新解决了在计算hash时浏览器内存占用过高的问题, 使用以下逻辑替换掉之前的 Hash 策略现在每轮 hash 计算开始前会释放掉上一次计算 hash 时使用的 ArrayBuffer 从而大量减少内存占用 (之前会将文件的全部分片转为 ArrayBuffer 数组然后都存到内存中, 从而产生大量内存占用) //文件分片 this.uploadStatus.next('Parsingfile...') constchunksBlob=sliceFile(file,chunkSize) letchunksHash:string[]=[] if(chunksBlob.length===1){ chunksHash=[getMD5FromArrayBuffer(awaitchunksBlob[0].arrayBuffer())] }else{ letchunksBuf:ArrayBuffer[]=[] //将文件分片进行分组,组内任务并行执行,组外任务串行执行 constchunksPart=getArrPartsBlob(chunksBlob,this.workerSvc.MAX_WORKERS) consttasks=chunksPart.map( (part)=async()={ //手动释放上一次用于计算Hash的ArrayBuffer //!!!现在只会占用MAX_WORKERS*分片数量大小的内存!!! chunksBuf.length=0 chunksBuf=awaitgetArrayBufFromBlobsV2(part) //按文件分片数量执行不同Hash策略 returnchunksBlob.length=BORDER_COUNT? awaitthis.workerSvc.getMD5ForFiles(chunksBuf): awaitthis.workerSvc.getCRC32ForFiles(chunksBuf) }, ) for(consttaskoftasks){ constresult=awaittask() chunksHash.push(...result) } } 其中用到的 getArrParts 方法的实现 /** *用于将数组分块[1,2,3,4]=[[1,2],[3,4]] *@paramchunks原始数组 *@paramsize分part大小 */ functiongetArrPartsT(chunks:any,size:number){ constresult:T[][]=[] lettempPart:T[]=[] chunks.forEach((chunk:T)={ tempPart.push(chunk) if(tempPart.length===size){ result.push(tempPart) tempPart=[] } }) if(tempPart.length!==0)result.push(tempPart) returnresult } 现在计算 hash 的速度更快了使用 hash-wasm 替换掉原来的 spark-md5 //md5.worker.ts ///referencelib="webworker"/ import{md5}from'hash-wasm' import{WorkerMessage}from'./util/worker-message' import{WorkerLabelsEnum}from'./types/worker-labels.enum' addEventListener('message',async({data}:{data:ArrayBuffer})={ consthash=awaitmd5(newUint8Array(data)) postMessage( newWorkerMessage(WorkerLabelsEnum.DONE,{ result:hash, chunk:data, }), [data], ) }) 点击关注公众号,“技术干货”及时达! 阅读原文

上一篇:2019-05-30_黑转粉,有时只需要一点折扣 下一篇:2020-06-02_「转」洞悉2020年数据团队建设,我们和清华、领英一起搞了个大事情,你也可以参与!

TAG标签:

17
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设网站改版域名注册主机空间手机网站建设网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。
项目经理在线

相关阅读 更多>>

猜您喜欢更多>>

我们已经准备好了,你呢?
2022我们与您携手共赢,为您的企业营销保驾护航!

不达标就退款

高性价比建站

免费网站代备案

1对1原创设计服务

7×24小时售后支持

 

全国免费咨询:

13245491521

业务咨询:13245491521 / 13245491521

节假值班:13245491521()

联系地址:

Copyright © 2019-2025      ICP备案:沪ICP备19027192号-6 法律顾问:律师XXX支持

在线
客服

技术在线服务时间:9:00-20:00

在网站开发,您对接的直接是技术员,而非客服传话!

电话
咨询

13245491521
7*24小时客服热线

13245491521
项目经理手机

微信
咨询

加微信获取报价