大文件的并发上传

如何实现高效率大文件上传?

整体流程概述

未命名文件 (4).png

整体的流程实现大致分为:

  1. hash分包;
  2. 并发上传;
  3. 处理请求响应,统计上传进度

下面针对不同情况,对这几个流程做剖析,此外根据http原理实现断点续传、重传机制、分包机制、慢启动机制做对应优化

hash分包

  • 创建文件hash方法

单个文件的情况下, 利用spark-md5最简单的分包

import SparkMD5 from 'spark-md5'
const spark = new SparkMD5.ArrayBuffer()

  function hash(file) {
    return new Promise(resolve => {
      const read = new FileReader()
      read.readAsArrayBuffer(file)
      read.onload = function () {
        spark.append(this.result)
        const result = spark.end();
        resolve(result)
      }
    })
  }
复制代码

在文件过大的情况下,计算文件的总hash耗时会很长,例如10G的文件的hash计算时间需要几分钟的时间,这样的成本过高,所以我们需要对大文件做切片hash

  • 大文件抽样hash

未命名文件 (3).png
对于大于10M的文件,我们分片取样后再计算hash,计算的效率提升数十倍

var blobSlice = File.prototype.slice || File.prototype.mozSlice || File.prototype.webkitSlice;

  /** 独立分包 */
  function hash(file) {
    return new Promise(async resolve => {
      let file_ = file
      const read = new FileReader()
      if (file.size > 10 * 1024 * 1024) {
        file_ = await getSample(file)
      }
      read.readAsArrayBuffer(file_)
      read.onload = function () {
        spark.append(this.result)
        const result = spark.end();
        resolve(result)
      }
    })
  }
  /** 大文件取样 */
  function getSample(file) {
    return new Promise(resolve => {
      const start = 2 * 1024 * 1024
      const chunks = [blobSlice.call(file, 0, start)]
      let surplusFile = blobSlice.call(file, start)
      const step = parseInt(surplusFile.size / 10) // 切片10%
      const useSize = parseInt(step / 5) // 取每个切片的20%
      let i = 0

      while (i < 10) {
        const slice = blobSlice.call(surplusFile, start + step * i, start + step * (i + 1))
        chunks.push(
          blobSlice.call(slice, 0, useSize)
        )
        i++
      }

      resolve(new Blob(chunks, { type: file.type || '' }))
    })
  }
复制代码
  • 文件分包批量hash

文件分包hash有两种方式,一种是单个文件hash完成触发一次request
未命名文件 (5).png

这样导致频繁的函数栈的切换,还有一种是分包后连续计算hash,然后统一request,
然而批量一次性hash势必会占据js线程,我们可以采用下面两种方式来优化:

  1. 利用requestIdleCallback在空闲时间中去调度hash文件的计算
  2. webWorker中去计算hash,计算完成在通知js主进程

前者可能会被其它长期占据js线程的任务打断
后者能够避免前者的问题,但是无法掌控hash进度的统计

那么我们采用requestIdleCallback

  /** 批量hash分包 */
  function splitHash(files, chunkSize) {
    const res = []
    return new Promise(resolve => {
      let current = 0
      var chunksCount = Math.ceil(files.size / chunkSize);
      const read = new FileReader()
      function fn() {
        let start = chunkSize * current
        let end = start + chunkSize > files.size ? files.size : start + chunkSize
        read.readAsArrayBuffer(files, start, end)
      }
      read.onload = function () {
        current++;
        // 设置hash进度
        setProgress1((current / chunksCount).toFixed(2) * 100)
        spark.append(this.result)
        res.push({
          hash: spark.end(),
          file: this.result
        })
        if (current >= chunksCount) {
          return resolve(res)
        }
        requestIdleCallback(fn)
      }
      requestIdleCallback(fn)
    })
  }
复制代码

请求的封装

  • 本地控制请求的超时
  /** 本地控制超时上传 */
  function request(options, timeOut = 30 * 1000) {
    const controller = new AbortController()
    const single = controller.single
    const timeoutFunction = function (timer) {
      return new Promise(resolve => {
        setTimeout(() => {
          resolve(new Response( // 模拟请求返回的内容
            JSON.stringify({
              code: 999,
              message: 'time out',
              success: false,
            })
          ))
          controller.abort() // 本地关闭请求,请求状态设置为‘cancel’
        }, timer)
      })
    }
    return Promise.race([
      timeoutFunction(timeOut),
      fetch(options.url, {
        single: single,
        method: "post",
        body: options.data,
      })
    ])
  }
复制代码
  • 并发请求
    关于浏览器的http并发数量,一个HTTP 连接请求在同一时间只能被一个线程访问,所以需要一个管理器去管理,这就是连接池管理器。

image.png
线程池在还没有任务到来之前,创建一定数量的线程,放入空闲队列中。这些线程都是处于睡眠状态,即均为启动,不消耗CPU,而只是占用较小的内存空间。当客户端有一个新请求时,就会唤醒线程池中的某一个睡眠线程,让它来处理客户端的这个请求,当处理完这个请求后,线程又处于睡眠状态

不同浏览器的控制并发的策略有所不同

image.png

那么我们默认并发数量为3,为其它同级域名下的http操作给出一定空间

  /** 并发上传 */
  function concurrentRequest(formDataArr) {
    let loadedNum = 0;
    const total = formDataArr.length
    const results = []
    const concurrentNun = 3
    let i = 0;
    let failMap = {} // 上传错误记录
    return new Promise(resolve => {
      async function doing() {
        while (i < concurrentNun && formDataArr.length) {
          i++
          const data = formDataArr.shift()
          const response = await request({
            url: url_,
            data: data,
          })
          const res = await response.json();
          i--;
          if (res.success && res.code === 0) {
            loadedNum++; // 上传进度更新
            setProgress2((loadedNum / total).toFixed(2) * 100)
          } else if(failMap(data.get('hash'))  < 3) {
            if (data && data.get) {
              failMap[data.get('hash')] = failMap(data.get('hash')) ? failMap[data.get('hash')] + 1 : 1
              formDataArr.unshift(data) // 失败次数不超过三次,就重传
            }
          }else { // 自定义上传失败错误码
            resolve({
              code: -1,
              success: false
            })
          }
          if (formDataArr.length) doing()
          results.push({
            hash: data.hash,
            code: res.code
          })
          if (loadedNum === total) resolve({
            code: 0,
            data: results,
            success: true

          })
        }
      }
      doing()
    })
  }
复制代码

慢启动找到期望包的大小

这里我们参考http的慢启动原理,最初的TCP的实现方式是,在连接建立成功后便会向网络中发送大尺寸的数据包,假如网络出现问题,很多这样的大包会积攒在路由器上,很容易导致网络中路由器缓存空间耗尽,从而发生拥塞。因此现在的TCP协议规定了,新建立的连接不能够一开始就发送大尺寸的数据包,而只能从一个小尺寸的包开始发送,在发送和数据被对方确认的过程中去计算对方的接收速度,来逐步增加每次发送的数据量(最后到达一个稳定的值,进入高速传输阶段。相应的,慢启动过程中,TCP通道处在低速传输阶段),以避免上述现象的发生。这个策略就是慢启动。然而慢启动导致客户端与服务器之间经过几百毫秒才能达到接近最大速度的问题,对于大型流式下载服务的影响倒不显著,因为慢启动的时间可以分摊到整个传输周期内消化掉。可是,对于很多HTTP连接,特别是一些短暂、突发的连接而言,常常会出现还没有达到最大窗口请求就被终止的情况。换句话说,很多Web应用的性能经常受到服务器与客户端之间往返时间的制约。因为慢启动限制了可用的吞吐量,而这对于小文件传输非常不利。慢启动重启除了调节新连接的传输速度,TCP还实现了SSR(Slow-StartRestart,慢启动重启)机制。
慢启动又一定的缺陷性,所以我们只在开始进行慢启动,而不是分包的时候
默认期待10秒能够上传完成单个包,可以根据服务器和网络来适配传入的参数

  /** 慢启动 | 获取已经上传过的包 */
  function slowBegin(file, totalHash, chunkSize = 1024 * 1024 * 2, expectTime = 10 * 1000) {
    let slowTestCount = 0 // 慢启动上传包的数量
    return new Promise(resolve => {
      let lastFile = file
      let size = chunkSize
      let startTime = new Date()
      const request_test = async function () {
        let f = blobSlice.call(lastFile, 0, chunkSize)
        lastFile = blobSlice.call(lastFile, chunkSize)
        let formDataArr = new FormData();
        formDataArr.append("totalHash", totalHash);
        formDataArr.append("hash", await hash(f));
        formDataArr.append("file", f);
        formDataArr.append("fileName", file.name);
        formDataArr.append("index", slowTestCount);
        request({
          url: url_,
          data: formDataArr,
        }).then(async response => {
          slowTestCount++
          let res = await response.json();
          if (res && res.success) {
            if ((new Date() - startTime) < expectTime) {
              resolve({
                chunkSize: size,
                lastFile,
                loadedHash: res.data,
                slowTestCount,
              })
            } else {
              size = size / 2
              request_test()
            }
          }
        })
      }
      request_test()
    })

  }
复制代码
  • 入口开启
  /** 入口 */
  async function work({ file }) {
    if (file) {
      const totalHash = await hash(file)
      const { lastFile, chunkSize, loadedHash, slowTestCount } = await slowBegin(file, totalHash)
      let splitRes = await splitHash(lastFile, chunkSize)
      // 已经上传过的打上标记
      splitRes = splitRes.forEach(item => { 
        if (loadedHash.includes(item.hash)) {
          item.uploaded = true 
        }
       })
      const formDataArr = [];
      splitRes.forEach((item, index) => {
        if(!item.uploader) {
          let formData = new FormData();
          formData.append("hash", item.hash);
          formData.append("file", item.file);
          formData.append("totalHash", totalHash);
          formData.append("index", index + slowTestCount);
          formDataArr.push(formData);
        }
      })
      concurrentRequest(formDataArr)
    }
  }
复制代码

思考

总体实现过程无非是模拟了http的慢启动、断点续传、hash分包。
断点续传需要服务端返回已经上传过了的包hash的数组,并且每次都需要重新分包做标记。
其它的方式:可以将分好的包存在本地的indexDB里面,但是经过实验存入indexDB后取出会被转义成乱码,后面发现可以将文件转base64存入,但是这样加大了传输的成本

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享