江夏
主 题
前言
主要步骤:
- 前端获取到File对象之后,对其进行文件hash值的计算和把文件按照一片2m大小进行文件切片;
- 切片完成直接调用切片文件上传接口,全部切片上传完成之后,请求文件合并接口,后端根据文件hash值和文件名进行文件合并并生成对应文件并且入库保存上传记录;
主要难点和坑点
- 前端批量上传文件切片时,对并发量进行控制,推荐使用队列来进行控制
- 后端使用nest生成流来合并文件的时候,当切文件太多了,生成的监听器太多了,node会报异常,解决方法时直接用递归,完成一个再执行下一个;
前端实现
具体代码参考
前端主要步骤:
- 对文件对象进行文件切片和文件hash值计算;
- 调用检查文件是否已经上传接口,如果之前已经上传过的相关切片直接跳过;
- 对切片文件进行并发上传和并发量的控制;上传完成请求文件合并接口;
处理文件
获取到文件对象之后,使用Worker开启一个线程去执行文件的切片和文件hash值得计算;使用的库为spark-md5
worker执行文件 worker.js
// 使用线程创建切片
const createChunksByWorker = (file) => {
return new Promise((resolve, reject) => {
const url = new URL('./worker.js', import.meta.url).href
const myWorker = new Worker(url)
myWorker.postMessage({ file, chunkSize: ChunkSize, })
myWorker.onmessage = (e) => {
// console.log('线程切片完成', e.data)
resolve(e.data)
myWorker.terminate()
}
})
}
上传执行函数
上传成功之后直接请求合并文件接口
const uploadFile = async (file) => {
// 设置文件名
fileName.value = file.name
// 获取文件hash值
const { chunkList, hash, } = await createChunksByWorker(file)
fileHash.value = hash
chunkTotal.value = chunkList.length
// console.log(chunks, fileName)
const [err, data] = await checkFile({ hash, })
const { isExist, chunks = [], } = data
curProgress.value = chunks.length
if (err) {
console.log(err, data)
return
}
console.log(
'curProgress==================>',
curProgress.value,
chunkTotal.value,
curProgress.value / chunkTotal.value
)
if (isExist) {
messageDanger('文件已存在')
loading.value = false
return
}
// 只上传没上传成功的
const filterChunkList = chunkList.filter(v => !chunks.includes(v.index))
const uploaded = await uploadChunks(filterChunkList)
if (uploaded) {
currentIndex = 0
const [err, data] = await mergeFile({
chunks: chunkTotal.value,
fileName: fileName.value,
hash,
})
if (!err) {
messageSuccess('上传成功')
}
setTimeout(() => {
starting.value = false
loading.value = false
}, 1000)
}
}
上传切片文件
并发数控制
限制请求并发数队列实现;首次最大并发数请求,后续一个请求完成则继续从队列取出数据接着请求,先进先出
let currentIndex = 0;
function limitRequests2 (chunks, fn, maxRequest) {
requests = [...chunks]
const totalRequests = chunks.length
const requestFn = fn
let errorNum = 0 // 错误数
currentIndex = 0
return new Promise((resolve, reject) => {
const makeRequest = async (chunk) => {
if (stopUpload) {
reject(new Error('停止'))
return
}
try {
await requestFn(toFormData(chunk))
currentIndex++
curProgress.value++
// console.log('Request to completed', response)
} catch (error) {
errorNum++
// 请求失败重试逻辑
if (errorNum < 10) {
console.log('Retrying request to', errorNum)
await makeRequest(chunk) // 重新发起请求
}
} finally {
// await之后执行
// 如果还有待处理的请求,则发起下一个请求
if (requests.length > 0) {
await makeRequest(requests.shift())
}
if (errorNum > 10) {
reject(new Error('错误数太多'))
}
if (requests.length === 0 && currentIndex === totalRequests) {
resolve(true)
}
}
}
// 初始化拿出最大最大并发数请求
for (let i = 0; i < maxRequest; i++) {
makeRequest(requests.shift())
}
})
}
// 批量上传切片
const uploadChunks = async (chunks) => {
try {
const resList = await limitRequests2(chunks, uploadHandler, MaxRequest)
// const resList = await limitRequests1(chunks)
console.log('resList====>', resList)
return resList
} catch (error) {
// console.log(error)
}
}
// 转为formData
const toFormData = (chunk) => {
const fd = new FormData()
Object.keys(chunk).forEach(k => fd.append(k, chunk[k]))
return fd
}
// 上传
const uploadHandler = (formData) => {
return uploadFileRequest(formData)
}
后端实现
具体代码实现
需要实现三个接口:
- 上传文件切片接口;
- 检查文件是否已经上传过;
- 文件合并接口
Multer相关配置
import * as fs from 'fs';
@Module({
imports: [
TypeOrmModule.forFeature([FileStore]),
MulterModule.registerAsync({
imports: [],
useFactory: async () => ({
storage: diskStorage({
// 配置文件上传后的文件夹路径
destination: (req, file, callback) => {
const { hash, index } = req.query;
const path = `${Config.fileConfig.filePath}tempFolder/${hash}`;
// 创建文件夹
fs.mkdir(path, { recursive: true }, (err) => {
if (err) {
console.log('创建文件夹失败', err);
return;
}
});
callback(null, path);
},
filename: (req, file, cb) => {
const { hash, index } = req.query;
const filename = `${hash}-${index}`;
return cb(null, filename);
},
}),
}),
inject: [],
}),
]
})
export class FileModule {}
处理合并文件操作
async mergeFile(body: any) {
return await new Promise(async (resolve, reject) => {
const { hash, fileName, chunks } = body;
const basePath = `${Config.fileConfig.filePath}tempFolder`;
const slicePath = `${basePath}/${hash}`;
const folderSize = await getFolderSizeBin(basePath);
const sliceSize = await getFolderSizeBin(basePath);
const status = HttpStatus.INTERNAL_SERVER_ERROR;
// console.log('folderSize=============>', folderSize);
// 4194304 4M 524288000 500M 2147483648 2G
if (folderSize > 2147483648) {
// promise 响应错误
reject(new HttpException('硬盘内存不足了~', status));
return;
}
let files = [];
try {
// 文件夹不存在会报错
files = fs.readdirSync(slicePath);
} catch (error) {
reject(new HttpException('文件不存在!', status));
return;
}
if (chunks !== files.length) {
reject(new HttpException('前后切片数量不一致,禁止合并', status));
return;
}
const sortedFiles = files.sort((a, b) => {
const [aIndex, bIndex] = [parseInt(a.split('-')[1]), parseInt(b.split('-')[1])];
return aIndex - bIndex;
});
// console.log('sortedFiles=============>', sortedFiles, fileName);
// 保存合成文件路径
const datePath = `${Config.fileConfig.filePath}${dayjs().format('YYYY-MM')}`;
const fileNameTargetPath = `${datePath}/${hash}-${fileName}`;
const writeStream = fs.createWriteStream(fileNameTargetPath);
// 一次合并一块切片
const mergeChunk = (index: number) => {
if (index >= sortedFiles.length) {
// 全部读取完写入
// 完结束写入执行end()才会触发finish时间
writeStream.end();
return;
}
const filePath = path.join(slicePath, sortedFiles[index]);
const readStream = fs.createReadStream(filePath);
// https://www.nodeapp.cn/stream.html#stream_class_stream_readable
readStream.pipe(writeStream, { end: false });
readStream.on('end', () => {
// 当切片文件过多时会报监听器超过最大值
// 删除已合并的切片文件(单个删除) 每个事件独立的
fs.unlinkSync(filePath);
// 处理下一个切片
mergeChunk(index + 1);
});
};
mergeChunk(0);
// 写入完成事件
writeStream.on('finish', () => {
console.log('Merge complete');
const list: Partial<Express.Multer.File>[] = [];
const fName = `${hash}-${fileName}`;
const saveFile = {
originalname: fileName,
filename: fName,
destination: datePath,
mimetype: fileName.split('.').slice(-1)[0],
size: sliceSize,
};
list.push(saveFile);
this.resourcesService.uploadFile(list, '19f66b84-8841-4cf5-8932-d11b95947d2d');
resolve(saveFile);
// 移除切片文件夹
deleteFolderRecursive(slicePath);
});
});
}
全部评论(0)