
Introduction
Main steps:
- After the frontend obtains the File object, it computes the file's hash and slices the file into 2 MB chunks;
- Once slicing is done, each chunk is sent to the chunk-upload endpoint; after all chunks are uploaded, the merge endpoint is called, and the backend merges the chunks into the final file based on the file hash and file name, then saves an upload record to the database.
Main difficulties and pitfalls
- When the frontend uploads the chunks in batch, the number of concurrent requests has to be limited; a queue is the recommended way to do this.
- When the Nest backend merges the chunks with streams, piping too many read streams into one write stream attaches too many listeners and Node emits a MaxListenersExceededWarning; the fix is to merge recursively, starting the next chunk only after the previous one finishes.
Frontend implementation
Full code for reference
Main frontend steps:
- Slice the File object into chunks and compute the file's hash;
- Call the check endpoint to see whether the file has already been uploaded; chunks that were uploaded before are skipped;
- Upload the chunks concurrently with a concurrency limit; once every chunk is uploaded, call the merge endpoint.
Processing the file
After getting the File object, a Worker thread is started to do the slicing and the hash computation; the hashing library is spark-md5.
Worker script: worker.js
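The worker source itself is not included here, so below is a minimal sketch of what it might look like. It assumes the bundler supports ES module imports inside workers (as Vite does when the worker is created with `{ type: 'module' }`; otherwise spark-md5 would be pulled in via importScripts), and it assumes each chunk object carries `index`, `hash`, and the Blob chunk. Those field names are guesses based on how `toFormData` and the backend use the chunks.

```javascript
// worker.js (hypothetical sketch, not the original source)
import SparkMD5 from 'spark-md5'

self.onmessage = async (e) => {
  const { file, chunkSize } = e.data
  const spark = new SparkMD5.ArrayBuffer()
  const chunkList = []
  for (let index = 0; index * chunkSize < file.size; index++) {
    // Slice one chunk off the file
    const chunk = file.slice(index * chunkSize, (index + 1) * chunkSize)
    // Feed it into the incremental MD5 computation
    spark.append(await chunk.arrayBuffer())
    chunkList.push({ index, chunk })
  }
  // Hex digest of the whole file
  const hash = spark.end()
  // Stamp every chunk with the file hash so the server can group them
  chunkList.forEach(item => { item.hash = hash })
  self.postMessage({ chunkList, hash })
}
```

On the main thread, the worker is created, handed the file and chunk size, and awaited: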
            
            
```javascript
// Create the chunks and the file hash in a worker thread
const createChunksByWorker = (file) => {
    return new Promise((resolve, reject) => {
      const url = new URL('./worker.js', import.meta.url).href
      const myWorker = new Worker(url)
      myWorker.postMessage({ file, chunkSize: ChunkSize, })
      myWorker.onmessage = (e) => {
        resolve(e.data)
        myWorker.terminate()
      }
      // Reject on worker errors so the caller is not left hanging
      myWorker.onerror = (err) => {
        reject(err)
        myWorker.terminate()
      }
    })
}
```
Upload entry function
Once every chunk has been uploaded successfully, the merge endpoint is called right away.
            
            
```javascript
const uploadFile = async (file) => {
    // Record the file name
    fileName.value = file.name
    // Chunk the file and compute its hash in the worker
    const { chunkList, hash, } = await createChunksByWorker(file)
    fileHash.value = hash
    chunkTotal.value = chunkList.length
    // Ask the server whether the file, or some of its chunks, already exists;
    // check the error before destructuring the response
    const [err, data] = await checkFile({ hash, })
    if (err) {
      console.log(err)
      return
    }
    const { isExist, chunks = [], } = data
    curProgress.value = chunks.length
    if (isExist) {
      messageDanger('File already exists')
      loading.value = false
      return
    }
    // Only upload the chunks that have not been uploaded yet
    const filterChunkList = chunkList.filter(v => !chunks.includes(v.index))
    const uploaded = await uploadChunks(filterChunkList)
    if (uploaded) {
      currentIndex = 0
      const [mergeErr] = await mergeFile({
        chunks: chunkTotal.value,
        fileName: fileName.value,
        hash,
      })
      if (!mergeErr) {
        messageSuccess('Upload successful')
      }
      setTimeout(() => {
        starting.value = false
        loading.value = false
      }, 1000)
    }
  }
```
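The request helpers used above (checkFile, mergeFile, uploadFileRequest) are not shown; each resolves to an [err, data] tuple instead of throwing. A minimal sketch of such a wrapper, assuming fetch and hypothetical endpoint paths:

```javascript
// Hypothetical tuple wrapper: never throws, always resolves to [err, data]
const to = (promise) =>
  promise.then((data) => [null, data]).catch((err) => [err, null])

// Hypothetical endpoints; the real paths and payloads live in the project
const checkFile = ({ hash }) =>
  to(fetch(`/file/check?hash=${hash}`).then((r) => r.json()))

const mergeFile = (body) =>
  to(fetch('/file/merge', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
  }).then((r) => r.json()))
```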
Uploading the chunk files
Controlling concurrency
The concurrency limit is implemented with a queue: up to the maximum number of requests are fired at once, and each time a request finishes, the next chunk is taken from the front of the queue (first in, first out) and requested.
            
            
```javascript
let currentIndex = 0;
function limitRequests2 (chunks, fn, maxRequest) {
    const requests = [...chunks]
    const totalRequests = chunks.length
    const requestFn = fn
    let errorNum = 0 // failed-request counter
    currentIndex = 0
    return new Promise((resolve, reject) => {
      const makeRequest = async (chunk) => {
        // stopUpload is external state toggled by a cancel action
        if (stopUpload) {
          reject(new Error('stopped'))
          return
        }
        try {
          await requestFn(toFormData(chunk))
          currentIndex++
          curProgress.value++
        } catch (error) {
          errorNum++
          if (errorNum >= 10) {
            reject(new Error('Too many failed requests'))
            return
          }
          console.log('Retrying request, failure count:', errorNum)
          // Retry the failed chunk; the retry call takes over this lane,
          // so return here to avoid pulling a second chunk off the queue
          await makeRequest(chunk)
          return
        }
        // This lane is free again: pull the next chunk (first in, first out)
        if (requests.length > 0) {
          await makeRequest(requests.shift())
        } else if (currentIndex === totalRequests) {
          // Queue drained and every request finished
          resolve(true)
        }
      }
      // Kick off up to maxRequest concurrent lanes
      for (let i = 0; i < Math.min(maxRequest, requests.length); i++) {
        makeRequest(requests.shift())
      }
    })
  }
// Upload the chunks in batch
const uploadChunks = async (chunks) => {
    try {
      return await limitRequests2(chunks, uploadHandler, MaxRequest)
    } catch (error) {
      console.log(error)
      return false
    }
  }
// Convert a chunk object into FormData
const toFormData = (chunk) => {
    const fd = new FormData()
    Object.keys(chunk).forEach(k => fd.append(k, chunk[k]))
    return fd
  }
// Upload a single chunk
const uploadHandler = (formData) => {
    return uploadFileRequest(formData)
  }
```
Backend implementation
Full code implementation
Three endpoints are needed:
- an endpoint that uploads a file chunk;
- an endpoint that checks whether the file has already been uploaded;
- an endpoint that merges the chunks.
The chunk-upload and check endpoints are sketched after the Multer configuration below.
Multer configuration
            
            
```typescript
import { Module } from '@nestjs/common';
import { MulterModule } from '@nestjs/platform-express';
import { TypeOrmModule } from '@nestjs/typeorm';
import { diskStorage } from 'multer';
import * as fs from 'fs';
// FileStore (entity) and Config are project-specific modules

@Module({
  imports: [
    TypeOrmModule.forFeature([FileStore]),
    MulterModule.registerAsync({
      imports: [],
      useFactory: async () => ({
        storage: diskStorage({
          // Directory each uploaded chunk is written to
          destination: (req, file, callback) => {
            const { hash } = req.query;
            const path = `${Config.fileConfig.filePath}tempFolder/${hash}`;
            // Create the per-file temp folder, then hand the path to Multer;
            // calling back inside fs.mkdir guarantees the folder exists first
            fs.mkdir(path, { recursive: true }, (err) => {
              if (err) {
                console.log('Failed to create folder', err);
                return callback(err, path);
              }
              callback(null, path);
            });
          },
          // Name each chunk "<hash>-<index>" so the merge step can sort them
          filename: (req, file, cb) => {
            const { hash, index } = req.query;
            return cb(null, `${hash}-${index}`);
          },
        }),
      }),
      inject: [],
    }),
  ],
})
export class FileModule {}
```
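The chunk-upload and check endpoints themselves are not shown in the original, so here is a sketch of what a matching controller might look like. The routes, the 'file' field name, and the filesystem-only isExist placeholder are all assumptions; in the real project isExist would come from the saved upload records.

```typescript
import { Controller, Get, Post, Query, UploadedFile, UseInterceptors } from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import * as fs from 'fs';
// Config is the same project config used in the Multer setup above

@Controller('file')
export class FileController {
  // Receives one chunk; the Multer storage configured above writes it
  // to tempFolder/<hash>/<hash>-<index> using the query parameters
  @Post('upload')
  @UseInterceptors(FileInterceptor('file'))
  uploadChunk(@UploadedFile() file: Express.Multer.File) {
    return { uploaded: true };
  }

  // Reports which chunk indexes already exist so the frontend can skip
  // them, and whether the whole file was uploaded before
  @Get('check')
  checkFile(@Query('hash') hash: string) {
    const slicePath = `${Config.fileConfig.filePath}tempFolder/${hash}`;
    let chunks: number[] = [];
    if (fs.existsSync(slicePath)) {
      chunks = fs.readdirSync(slicePath).map((name) => parseInt(name.split('-')[1], 10));
    }
    const isExist = false; // placeholder: look the hash up in the database instead
    return { isExist, chunks };
  }
}
```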
            
            
Handling the file merge
```typescript
// Requires fs, path, dayjs, HttpException / HttpStatus from @nestjs/common,
// plus the project helpers getFolderSizeBin and deleteFolderRecursive
async mergeFile(body: any) {
    return await new Promise(async (resolve, reject) => {
      const { hash, fileName, chunks } = body;
      const basePath = `${Config.fileConfig.filePath}tempFolder`;
      const slicePath = `${basePath}/${hash}`;
      const folderSize = await getFolderSizeBin(basePath);
      const sliceSize = await getFolderSizeBin(slicePath);
      const status = HttpStatus.INTERNAL_SERVER_ERROR;
      // 4194304 = 4 MB, 524288000 = 500 MB, 2147483648 = 2 GB
      if (folderSize > 2147483648) {
        // Refuse to merge once the temp folder has grown past 2 GB
        reject(new HttpException('Insufficient disk space', status));
        return;
      }
      let files = [];
      try {
        // readdirSync throws if the folder does not exist
        files = fs.readdirSync(slicePath);
      } catch (error) {
        reject(new HttpException('File does not exist!', status));
        return;
      }
      if (chunks !== files.length) {
        reject(new HttpException('Chunk count mismatch, merge refused', status));
        return;
      }
      // Chunk files are named "<hash>-<index>"; sort them by index
      const sortedFiles = files.sort((a, b) => {
        const [aIndex, bIndex] = [parseInt(a.split('-')[1]), parseInt(b.split('-')[1])];
        return aIndex - bIndex;
      });
      // Target path of the merged file, grouped into a YYYY-MM folder
      const datePath = `${Config.fileConfig.filePath}${dayjs().format('YYYY-MM')}`;
      fs.mkdirSync(datePath, { recursive: true });
      const fileNameTargetPath = `${datePath}/${hash}-${fileName}`;
      const writeStream = fs.createWriteStream(fileNameTargetPath);
      // Merge one chunk at a time; piping every read stream into the write
      // stream at once would trip Node's max-listeners warning
      const mergeChunk = (index: number) => {
        if (index >= sortedFiles.length) {
          // Every chunk has been piped; end() triggers the 'finish' event
          writeStream.end();
          return;
        }
        const filePath = path.join(slicePath, sortedFiles[index]);
        const readStream = fs.createReadStream(filePath);
        readStream.on('error', reject);
        // https://www.nodeapp.cn/stream.html#stream_class_stream_readable
        // end: false keeps the write stream open for the next chunk
        readStream.pipe(writeStream, { end: false });
        readStream.on('end', () => {
          // Delete the chunk that was just merged
          fs.unlinkSync(filePath);
          // Process the next chunk
          mergeChunk(index + 1);
        });
      };
      mergeChunk(0);
      // Fires once end() is called and all data has been flushed
      writeStream.on('finish', () => {
        console.log('Merge complete');
        const list: Partial<Express.Multer.File>[] = [];
        const fName = `${hash}-${fileName}`;
        const saveFile = {
          originalname: fileName,
          filename: fName,
          destination: datePath,
          // Note: this stores the file extension, not a real MIME type
          mimetype: fileName.split('.').slice(-1)[0],
          size: sliceSize,
        };
        list.push(saveFile);
        this.resourcesService.uploadFile(list, '19f66b84-8841-4cf5-8932-d11b95947d2d');
        resolve(saveFile);
        // Remove the now-empty chunk folder
        deleteFolderRecursive(slicePath);
      });
    });
  }
```
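getFolderSizeBin and deleteFolderRecursive are project helpers that are not shown (the await suggests the real size helper is async, for example the get-folder-size package). A minimal synchronous sketch of both, assuming Node 14.14+ for fs.rmSync:

```typescript
import * as fs from 'fs';
import * as path from 'path';

// Hypothetical stand-in: delete a folder and everything inside it
export function deleteFolderRecursive(target: string): void {
  fs.rmSync(target, { recursive: true, force: true });
}

// Hypothetical stand-in: total size of a folder's contents, in bytes
export function getFolderSizeBin(target: string): number {
  return fs.readdirSync(target, { withFileTypes: true }).reduce((sum, entry) => {
    const full = path.join(target, entry.name);
    return sum + (entry.isDirectory() ? getFolderSizeBin(full) : fs.statSync(full).size);
  }, 0);
}
```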