全国免费咨询:

13245491521

VR图标白色 VR图标黑色
X

中高端软件定制开发服务商

与我们取得联系

13245491521     13245491521

2025-04-23_性能飞跃!Node.js 亿级文件读写优化

您的位置:首页 >> 新闻 >> 行业资讯

性能飞跃!Node.js 亿级文件读写优化 ??即日起至5.11发文瓜分万元奖金 ??主赛道推好文,AI命题叠福利 ??点击下方活动图了解详情 业务背景所负责的业务场景是从hdfs中捞取生成的数据生成文本文件(txt、csv)等格式,并且压缩上传到对象存储,文件大小从几kb到几十个gb, 数据行数几十亿,经常是不是的内存就是CPU告警,迫切解决服务稳定性问题。 ?? 痛点爆破:为何传统方案频发内存 / CPU 告警?内存黑洞:全量加载百 GB 文件导致堆内存爆仓CPU 过载:单线程核心利用率飙至 440%+I/O 风暴:频繁读写文件客户端(浏览器)上传场景回顾下前端上传文件时对文件的处理的常见方式 通过JSON传递优点:这种方式是最舒适的传递方式,首先前端把文件对象转换成base64,再以字符串的形式传递给服务端。通过这种方式可以享受到JSON的灵活性,需要考虑的问题较少。缺点:base64编码会导致传输体积增大,并且这个计算过程也需要消耗浏览器有限的资源。在移动端如果通过这个方式发送稍微大一些的文件,就会感觉到温度和卡顿了。通过formData传输优点:直接传递二进制文件给服务端,没有额外的带宽和性能开销。缺点:在TypeScript开发环境下,formData的使用体验就是不得劲。并且formData是扁平的,不能对象嵌套对象,但是JSON可以。以及服务端接收formData需要写的代码量也比JSON多。向服务端请求token后,将文件上传到对象存储优点:通常对象存储会封装很多逻辑,上传控制各方面也不需要前端关心了。缺点:逻辑会比较复杂,往往需要先和业务逻辑请求一次,再去请求对象存储服务器,上传成功后再请求业务逻辑。 如果文件很小,就几百KB,那毫不犹豫会选择方法 1,实在是太爽了。如果是 MB 级别,还是formData比较好。再大会倾向于方案 3 分片上传下载 + 断点续传 (这里就不展开描述了)Nodejs 读写文件同步读写(最简单直接)const fs = require('node:fs'); // 同步读取 const data = fs.readFileSync('/path/to/small/file.txt', 'utf8'); console.log(data); // 同步写入 fs.writeFileSync('/path/to/output.txt', 'Hello World', 'utf8'); 回调异步读写(传统方式)const fs = require('node:fs'); // 异步读取 fs.readFile('/path/to/small/file.txt', 'utf8', (err, data) = { if (err) throw err; console.log(data); // 异步写入 fs.writeFile('/path/to/output.txt', data, 'utf8', (err) = { if (err) throw err; }); Promise 异步读写(现代方式)const fs = require('node:fs/promises'); async function readWriteFile() { try { // Promise方式读取 const data = await fs.readFile('/path/to/small/file.txt', 'utf8'); console.log(data); // Promise方式写入 await fs.writeFile('/path/to/output.txt', data, 'utf8'); } catch (err) { console.error(err); } } readWriteFile(); 流式读写(适合稍大文件或需要逐行处理)const fs = require('node:fs'); const readline = require('node:readline'); // 流式读取 const readStream = fs.createReadStream('/path/to/small/file.txt', { encoding: 'utf8', highWaterMark: 1024 // 每次读取的字节数 }); // 流式写入 const writeStream = fs.createWriteStream('/path/to/output.txt'); // 逐行处理 const rl = readline.createInterface({ input: readStream, crlfDelay: Infinity }); rl.on('line', (line) = { writeStream.write(`${line}\n`); }); rl.on('close', () = { writeStream.end(); }); const fs = require('node:fs'); function copyLargeFile(src, dest) { return new Promise((resolve, reject) = { // 1. 创建可读流(128MB分块) const readStream = fs.createReadStream(src, { highWaterMark: 128 * 1024 * 1024 // 2. 创建可写流(64MB缓冲) const writeStream = fs.createWriteStream(dest, { highWaterMark: 64 * 1024 * 1024 // 3. 管道连接与错误处理 readStream .on('error', reject) .pipe(writeStream) .on('error', reject) .on('finish', () = { console.log(`文件 ${src} 已成功拷贝至 ${dest}`); resolve(); } // 使用示例 copyLargeFile('xxx.txt', 'copy_xxx.txt') .catch(err = console.error('处理失败:', err)); 内存映射(高性能方式)const fs = require('node:fs'); const { Buffer } = require('node:buffer'); // 内存映射读取 fs.open('/path/to/small/file.txt', 'r', (err, fd) = { if (err) throw err; const stats = fs.fstatSync(fd); const buffer = Buffer.alloc(stats.size); fs.read(fd, buffer, 0, buffer.length, 0, (err) = { if (err) throw err; console.log(buffer.toString('utf8')); fs.close(fd); }); 当前业务实现核心片段const fs = require('node:fs'); const through2 = require('through2'); const split = require('split'); // 1. 创建可读流 fs.createReadStream('ex.txt') .on('error', reject) .pipe(split(os.EOL)) .on('error', reject) .pipe( .pipe(through2(function (chunk, enc, callback) { // 逻辑处理 })) // 3. 创建写入流 .pipe(fs.createWriteStream('out.txt')) // 4. 完成回调 .on('finish', () = doSomethingSpecial()); through2[1]是Node.js流(Streams)工具库,用于快速创建Transform流(数据转换流),无需手动继承stream.Transform并编写子类。它通过简洁的函数式 API,简化了流的定义和操作,适用于对数据进行中间处理(如修改、过滤、转换等)。输入流:fs.createReadStream创建可读流转换流水线:split(os.EOL):按行分割文本输出流:fs.createWriteStream写入目标文件分析代码并处理代码问题使用createReadStream和pipe来处理文件,用到了split和through2库来按行处理。这可能存在性能问题因逐行处理字符串可能比较慢,尤其是在处理大文件时。避免使readline和字符串分割,改用直接处理字节,这样可以提高速度未开启多线程,可以尝试将文件分成多个块,每个工作线程处理一个块,独立处理其中的行和分隔符最终实现import * as fs from 'fs'; import * as os from 'os'; import { Worker, isMainThread, parentPort, workerData } from 'worker_threads'; // 配置常量 const FILE_DELIMITER = 'YOUR_FILE_DELIMITER'; const EOL = os.EOL; const EOL_BUFFER = Buffer.from(EOL); const DELIMITER_LINE = Buffer.from(FILE_DELIMITER + EOL); const WORKER_COUNT = os.cpus().length; // 主线程预处理函数 async function findDelimiterPositions(filePath: string) { const fileSize = (await fs.promises.stat(filePath)).size; const chunkSize = 1024 * 1024; const positions: number[] = []; let previousRemaining = Buffer.alloc(0); const fd = await fs.promises.open(filePath, 'r'); try { for (let offset = 0; offset fileSize;) { const buffer = Buffer.alloc(chunkSize + DELIMITER_LINE.length); const { bytesRead } = await fd.read(buffer, 0, chunkSize, offset); if (!bytesRead) break; const combined = Buffer.concat([previousRemaining, buffer.subarray(0, bytesRead)]); let pos = 0; while (pos combined.length) { const idx = combined.indexOf(DELIMITER_LINE, pos); if (idx === -1) { previousRemaining = combined.subarray(pos); break; } positions.push(offset - previousRemaining.length + idx); pos = idx + DELIMITER_LINE.length; } offset += bytesRead; } } finally { await fd.close(); } return positions; } // 工作线程处理逻辑 function workerProcess() { const { filePath, start, end, filePo } = workerData; const stream = fs.createReadStream(filePath, { start, end }); const result: Buffer[] = []; let stationBuffer = Buffer.alloc(100); let tempBuffer = Buffer.alloc(5); let linePo = 0; // 字节处理状态机 let state = 0; // 0: 正常行处理,1: 分隔符处理 let buffer = Buffer.alloc(0); return new Promisevoid((resolve) = { stream.on('data', (chunk: Buffer) = { buffer = Buffer.concat([buffer, chunk]); while (true) { olIndex = buffer.indexOf(EOL_BUFFER); if (eolIndex === -1) break; const line = buffer.subarray(0, eolIndex); buffer = buffer.subarray(eolIndex + EOL_BUFFER.length); if (line.equals(DELIMITER_LINE.subarray(0, line.length))) { // 分隔符行直接跳过 continue; } // 字节到字符串转换优化 const transformed = processLine(line, linePo, filePo); result.push(Buffer.from(transformed + EOL)); linePo++; } stream.on('end', () = { parentPort.postMessage({ filePo, data: Buffer.concat(result) resolve(); } // 优化的行处理函数 function processLine(lineBuffer: Buffer, linePo: number, filePo: number) { // 实现你的自定义转换逻辑 return lineBuffer.toString() + `_processed_${filePo}_${linePo}`; } // 主线程逻辑 async function main() { const filePath = process.argv[2]; const outputPath = process.argv[3]; const delimiterPositions = await findDelimiterPositions(filePath); const fileSize = (await fs.promises.stat(filePath)).size; const parts = []; let prevPos = 0; // 生成处理区间 for (const pos of delimiterPositions) { parts.push({ start: prevPos, end: pos, filePo: parts.length }); prevPos = pos + DELIMITER_LINE.length; } parts.push({ start: prevPos, end: fileSize, filePo: parts.length }); // 创建工作线程 const workers = new Map(); const results = new Map(); parts.forEach((part, idx) = { const worker = new Worker(__filename, { workerData: { ...part, filePath } worker.on('message', (msg) = { results.set(msg.filePo, msg.data); if (results.size === parts.length) { // 按顺序写入结果 const sorted = Array.from(results.keys()).sort((a, b) = a - b); const ws = fs.createWriteStream(outputPath); sorted.forEach(po = ws.write(results.get(po))); ws.end(); } workers.set(idx, worker); } // 启动入口 if (isMainThread) { main(); } else { workerProcess(); } 总结多用 ai 分析当前代码可能存在的问题替换逐行的字符串处理为基于 Buffer 的字节处理,减少 split 和字符串转换的开销尽可能的提升代码的执行速度,开启多线程参考文档?jackyef.com/posts/1brc-…[2] github.com/rvagg/throu…[3] ?关注更多AI编程资讯请去AI Coding专区:https://juejin.cn/aicoding 阅读原文

上一篇:2022-04-04_「转」澜舟科技创始人周明:从感知智能跨越到认知智能,NLP领域要做哪些创新? 下一篇:2024-02-06_解谜 Dart VM中的线程池:并发编程艺术的详细分析

TAG标签:

19
网站开发网络凭借多年的网站建设经验,坚持以“帮助中小企业实现网络营销化”为宗旨,累计为4000多家客户提供品质建站服务,得到了客户的一致好评。如果您有网站建设网站改版域名注册主机空间手机网站建设网站备案等方面的需求...
请立即点击咨询我们或拨打咨询热线:13245491521 13245491521 ,我们会详细为你一一解答你心中的疑难。
项目经理在线

相关阅读 更多>>

猜您喜欢更多>>

我们已经准备好了,你呢?
2022我们与您携手共赢,为您的企业营销保驾护航!

不达标就退款

高性价比建站

免费网站代备案

1对1原创设计服务

7×24小时售后支持

 

全国免费咨询:

13245491521

业务咨询:13245491521 / 13245491521

节假值班:13245491521()

联系地址:

Copyright © 2019-2025      ICP备案:沪ICP备19027192号-6 法律顾问:律师XXX支持

在线
客服

技术在线服务时间:9:00-20:00

在网站开发,您对接的直接是技术员,而非客服传话!

电话
咨询

13245491521
7*24小时客服热线

13245491521
项目经理手机

微信
咨询

加微信获取报价