在 Node.js 和 React 中使用 S3 进行大文件/分段上传

分段上传是一种有效的、官方推荐的、可控的方式来处理大文件的上传。在使用 S3 预签名 URL 时尤其如此,它允许您以安全的方式执行分段上传,而不会暴露有关您的存储桶的任何信息

在 Node.js 和 React 中使用 S3 进行大文件/分段上传

在云存储中存储文件现在是一种标准,允许用户上传文件是 Web 应用程序的常见功能。具体来说,文件一般先上传到服务器,再上传到云存储服务,或者直接上传到云存储服务。在处理小文件时,这是一项容易完成的任务。另一方面,在上传大文件时,它可能会变得具有挑战性。

这就是 AWS 云存储和其他类似 Amazon S3 的云存储服务支持分段上传的原因。此技术允许您将文件拆分为几个小块,然后按顺序或并行上传它们,从而使您能够简洁地处理大文件。

首先,让我们了解什么是分段上传,它是如何工作的,以及为什么最好的方法涉及 S3 预签名 URL。然后,让我们看看如何通过在 Node.js 和 React 中构建的演示应用程序来实现开始分段上传所需的一切,包括后端和前端。

  • 什么是分段上传?
    • 分段上传如何工作?
    • 使用 S3 预签名URL 进行分段上传
  • 使用 S3 预签名 URL实现分段上传
    • Node.js 中的分段上传
    • React 中的分段上传先决条件

准备条件:

  • valid S3 credentials
  • Node.js and npm 5.2+
  • React >= 17.x
  • aws-sdk >= 2.x

aws-sdk 是适用于 JavaScript 的 AWS 开发工具包,您可以使用以下 npm 命令安它:

npm install --save aws-sdk

什么是分段上传?


如 Amazon S3 文档中所述,分段上传允许您将单个对象作为一组部分上传,它通常用于大文件。这种技术允许您将文件拆分为多个部分并并行上传,在需要时暂停和恢复操作,甚至在知道对象的全局大小之前就开始上传对象。

换句话说,分段上传可以更快、更可控、更灵活地上传到任何类似 Amazon S3 的云存储中。

分段上传如何工作?


上传对象的所有部分后,您必须告诉您的云存储操作已完成。然后,上传的数据将呈现为单个对象,等于上传的文件。

因此,这些是分段上传请求中涉及的步骤:

  1. 将对象拆分为多个部分
  2. 启动分段上传
  3. 过程上传每个部分
  4. 完成分段上传过程

这种上传方式一般用在前端,让用户可以上传大文件。详细来说,官方文档建议对任何大于 1GB 的文件使用 multipart 方法。但请注意,您可以对任何文件使用分段上传,无论其大小如何。

使用 S3 预签名 URL 进行分段上传


分段上传的最佳前端方法涉及预签名 URL。 S3 预签名 URL 是使用 AWS 访问密钥签名的 URL,它临时授予您对特定 S3 对象的受限访问权限。使用 S3 预签名 URL,您可以在预定义的时间限制内执行 GET 或 PUT 操作。默认情况下,S3 预签名 URL 在 15 分钟后过期。

S3 预签名 URL 特别有用,因为它们允许您将 S3 凭证和存储桶保密,仅在有限的时间内授予对资源的访问权限。您所要做的就是在后端生成它们,然后将它们提供给前端。

因此,预签名 URL 是将任何类型的文件上传到 S3 存储桶的安全方式。此外,它们允许您避免创建和管理角色,以及更改存储桶 ACL 或为用户提供特殊帐户来上传文件。

可以想象,它们在分段上传时特别有用。这是因为您可以为原始对象分割成的每个部分生成一个预签名的 URL,然后使用其各自的 URL 上传该部分。

如果您想采用涉及预签名 URL 的分段上传方法,则必须考虑一个新步骤。这是所需的所有步骤的列表:

  1. 将对象拆分为多个部分
  2. 启动分段上传
  3. 为每个部分创建预签名 URL
  4. 通过其预签名的 URL 上传每个部分
  5. 完成分段上传

现在,让我们深入研究使用预签名 URL 方法进行分段上传的优缺点。

优点
  • 它具有分段上传的所有好处,例如通过并行上传提高吞吐量,在网络问题的情况下快速恢复,因为与整个文件相比,每个部分都很小并且更容易重新发送,以及暂停和恢复对象上传的能力
  • 它本质上是安全的
  • 它消除了对具有特殊限制的角色或帐户的需要
缺点
  • 它涉及额外的逻辑,包括后端和前端
  • 它要求您事先知道对象大小以生成所有预签名 URL

使用 S3 预签名 URL 实现分段上传

您可以复制 GitHub 存储库,并通过启动以下命令安装程序:

git clone https://github.com/Tonel/multipart-upload-js-demo
cd multipart-upload-js-demo
npm i
npm backend:start
npm frontend:start

要使应用程序正常工作,您必须解决 /backend/controllers/upload.js 文件中留下的“TODO”问题,要求您在代码中添加 S3 凭据。

否则,请继续按照此分步教程学习如何构建它以及它是如何工作的。

Node.js 中的分段上传


要在 S3 中实现分段上传签名 URL,您需要三个 API。但首先,您需要一个 AWS.S3 实例。

您可以使用 S3 凭证和 aws-sdk 库对其进行初始化,如下所示:

const AWS = require("aws-sdk")

const s3Endpoint = new AWS.Endpoint("<YOUR_ENDPOINT>")

const s3Credentials = new AWS.Credentials({
  accessKeyId: "<YOUR_ACCESS_KEY>",
  secretAccessKey: "<YOUR_SECRET_KEY>",
})

const s3 = new AWS.S3({
  endpoint: s3Endpoint,
  credentials: s3Credentials,
})

现在,让我们深入研究如何构建三个 API 来实现分段上传:

POST /uploads/initializeMultipartUpload

async function initializeMultipartUpload(req, res) {
    const { name } = req.body

    const multipartParams = {
        Bucket:"",
        Key: ${name},
        ACL: "public-read"
    }

    const multipartUpload = await s3.createMultipartUpload(multipartParams).promise()

    res.send({
        fileId: multipartUpload.UploadId,
        fileKey: multipartUpload.Key
    })
}

请求正文中传递的name参数表示在分段上传操作结束时将在云存储中创建的文件的名称。此 API 通过调用先前创建的 s3 对象中可用的 createMultipartUpload() 函数来负责初始化分段上传请求。

此 API 是必需的,因为要执行分段上传请求,您需要 UploadId 值。云存储服务使用它来关联过程结束时分段上传所涉及的所有部分,而 Key 参数表示文件的全名。

  • POST /uploads/getMultipartPreSignedUrls
async function getMultipartPreSignedUrls(req, res) {
    const { fileKey, fileId, parts } = req.body
    const multipartParams = {
        Bucket: BUCKET_NAME,
        Key: fileKey,
        UploadId: fileId,
    }
    const promises = []
    for (let index = 0; index < parts; index++) {
        promises.push(
            s3.getSignedUrlPromise("uploadPart", {
                ...multipartParams,
                PartNumber: index + 1,
            })
        )
    }
    const signedUrls = await Promise.all(promises)
    // assign to each URL the index of the part to which it corresponds
    const partSignedUrlList = signedUrls.map((signedUrl, index) => {
        return {
            signedUrl: signedUrl,
            PartNumber: index + 1,
        }
    })
    res.send({
        parts: partSignedUrlList,
    })
}

此 API 负责返回与多部分请求中涉及的部分相关联的预签名 URL。它需要从之前的API中获取的fileKey和fileId参数,以及将原始文件分割成的部分数量通过分段上传进行上传。

然后,它使用此信息通过调用 s3 对象上的 getSignedUrlPromise() 函数来生成 S3 预签名 URL。如您所见,告诉 URL 与哪个部分关联的 PartNumber 参数是从 1 开始的索引。使用从 0 开始的索引会导致错误。

POST /uploads/finalizeMultipartUpload

async function finalizeMultipartUpload(req, res) {
    const { fileId, fileKey, parts } = req.body
    const multipartParams = {
        Bucket: BUCKET_NAME,
        Key: fileKey,
        UploadId: fileId,
        MultipartUpload: {
            // ordering the parts to make sure they are in the right order
            Parts: _.orderBy(parts, ["PartNumber"], ["asc"]),
        },
    }
    const completeMultipartUploadOutput = await s3.completeMultipartUpload(multipartParams).promise()
    // completeMultipartUploadOutput.Location represents the
    // URL to the resource just uploaded to the cloud storage
    res.send()
},

最后一个 API 最终确定了分段上传请求。同样,它需要来自第一个 API 的 fileId 和 fileKey。此外,它需要 parts 参数,它是具有以下类型的对象列表:

{
  PartNumber: number
  ETag: string
}

正如官方文档中定义的那样,ETag 是一个标识新创建对象数据的 ID。正如您很快将看到的,这可以在使用预签名 URL 执行的成功上传请求的响应标头中检索。

然后,此数据用于调用 completeMultipartUpload() 函数,该函数最终确定分段上传请求并使上传的对象在云存储中可用。请注意,MultipartUpload 的 Parts 字段必须有一个有序列表,并且 Lodash orderBy() 用于确保这一点。

现在,您拥有开始在前端应用程序上执行分段上传请求所需的一切。

React 中的分段上传

在前端处理分段上传有点棘手。如果您想并行上传许多部分并计划为用户提供中止操作的能力,则尤其如此。因此,您应该调整来自该存储库的实用程序类,而不是重新发明轮子,以满足您的需求。

更具体地说,您可以实现一个实用程序类来执行分段上传:

import axios from "axios"

// initializing axios
const api = axios.create({
  baseURL: "http://localhost:3000",
})

// original source: https://github.com/pilovm/multithreaded-uploader/blob/master/frontend/uploader.js
export class Uploader {
  constructor(options) {
    // this must be bigger than or equal to 5MB,
    // otherwise AWS will respond with:
    // "Your proposed upload is smaller than the minimum allowed size"
    this.chunkSize = options.chunkSize || 1024 * 1024 * 5
    // number of parallel uploads
    this.threadsQuantity = Math.min(options.threadsQuantity || 5, 15)
    this.file = options.file
    this.fileName = options.fileName
    this.aborted = false
    this.uploadedSize = 0
    this.progressCache = {}
    this.activeConnections = {}
    this.parts = []
    this.uploadedParts = []
    this.fileId = null
    this.fileKey = null
    this.onProgressFn = () => {}
    this.onErrorFn = () => {}
  }

  // starting the multipart upload request
  start() {
    this.initialize()
  }

  async initialize() {
    try {
      // adding the the file extension (if present) to fileName
      let fileName = this.fileName
      const ext = this.file.name.split(".").pop()
      if (ext) {
        fileName += `.${ext}`
      }

      // initializing the multipart request
      const videoInitializationUploadInput = {
        name: fileName,
      }
      const initializeReponse = await api.request({
        url: "/uploads/initializeMultipartUpload",
        method: "POST",
        data: videoInitializationUploadInput,
      })

      const AWSFileDataOutput = initializeReponse.data

      this.fileId = AWSFileDataOutput.fileId
      this.fileKey = AWSFileDataOutput.fileKey

      // retrieving the pre-signed URLs
      const numberOfparts = Math.ceil(this.file.size / this.chunkSize)

      const AWSMultipartFileDataInput = {
        fileId: this.fileId,
        fileKey: this.fileKey,
        parts: numberOfparts,
      }

      const urlsResponse = await api.request({
        url: "/uploads/getMultipartPreSignedUrls",
        method: "POST",
        data: AWSMultipartFileDataInput,
      })

      const newParts = urlsResponse.data.parts
      this.parts.push(...newParts)

      this.sendNext()
    } catch (error) {
      await this.complete(error)
    }
  }

  sendNext() {
    const activeConnections = Object.keys(this.activeConnections).length

    if (activeConnections >= this.threadsQuantity) {
      return
    }

    if (!this.parts.length) {
      if (!activeConnections) {
        this.complete()
      }

      return
    }

    const part = this.parts.pop()
    if (this.file && part) {
      const sentSize = (part.PartNumber - 1) * this.chunkSize
      const chunk = this.file.slice(sentSize, sentSize + this.chunkSize)

      const sendChunkStarted = () => {
        this.sendNext()
      }

      this.sendChunk(chunk, part, sendChunkStarted)
        .then(() => {
          this.sendNext()
        })
        .catch((error) => {
          this.parts.push(part)

          this.complete(error)
        })
    }
  }

  // terminating the multipart upload request on success or failure
  async complete(error) {
    if (error && !this.aborted) {
      this.onErrorFn(error)
      return
    }

    if (error) {
      this.onErrorFn(error)
      return
    }

    try {
      await this.sendCompleteRequest()
    } catch (error) {
      this.onErrorFn(error)
    }
  }

  // finalizing the multipart upload request on success by calling
  // the finalization API
  async sendCompleteRequest() {
    if (this.fileId && this.fileKey) {
      const videoFinalizationMultiPartInput = {
        fileId: this.fileId,
        fileKey: this.fileKey,
        parts: this.uploadedParts,
      }

      await api.request({
        url: "/uploads/finalizeMultipartUpload",
        method: "POST",
        data: videoFinalizationMultiPartInput,
      })
    }
  }

  sendChunk(chunk, part, sendChunkStarted) {
    return new Promise((resolve, reject) => {
      this.upload(chunk, part, sendChunkStarted)
        .then((status) => {
          if (status !== 200) {
            reject(new Error("Failed chunk upload"))
            return
          }

          resolve()
        })
        .catch((error) => {
          reject(error)
        })
    })
  }

  // calculating the current progress of the multipart upload request
  handleProgress(part, event) {
    if (this.file) {
      if (event.type === "progress" || event.type === "error" || event.type === "abort") {
        this.progressCache[part] = event.loaded
      }

      if (event.type === "uploaded") {
        this.uploadedSize += this.progressCache[part] || 0
        delete this.progressCache[part]
      }

      const inProgress = Object.keys(this.progressCache)
        .map(Number)
        .reduce((memo, id) => (memo += this.progressCache[id]), 0)

      const sent = Math.min(this.uploadedSize + inProgress, this.file.size)

      const total = this.file.size

      const percentage = Math.round((sent / total) * 100)

      this.onProgressFn({
        sent: sent,
        total: total,
        percentage: percentage,
      })
    }
  }

  // uploading a part through its pre-signed URL
  upload(file, part, sendChunkStarted) {
    // uploading each part with its pre-signed URL
    return new Promise((resolve, reject) => {
      if (this.fileId && this.fileKey) {
        // - 1 because PartNumber is an index starting from 1 and not 0
        const xhr = (this.activeConnections[part.PartNumber - 1] = new XMLHttpRequest())

        sendChunkStarted()

        const progressListener = this.handleProgress.bind(this, part.PartNumber - 1)

        xhr.upload.addEventListener("progress", progressListener)

        xhr.addEventListener("error", progressListener)
        xhr.addEventListener("abort", progressListener)
        xhr.addEventListener("loadend", progressListener)

        xhr.open("PUT", part.signedUrl)

        xhr.onreadystatechange = () => {
          if (xhr.readyState === 4 && xhr.status === 200) {
            // retrieving the ETag parameter from the HTTP headers
            const ETag = xhr.getResponseHeader("ETag")

            if (ETag) {
              const uploadedPart = {
                PartNumber: part.PartNumber,
                // removing the " enclosing carachters from
                // the raw ETag
                ETag: ETag.replaceAll('"', ""),
              }

              this.uploadedParts.push(uploadedPart)

              resolve(xhr.status)
              delete this.activeConnections[part.PartNumber - 1]
            }
          }
        }

        xhr.onerror = (error) => {
          reject(error)
          delete this.activeConnections[part.PartNumber - 1]
        }

        xhr.onabort = () => {
          reject(new Error("Upload canceled by user"))
          delete this.activeConnections[part.PartNumber - 1]
        }

        xhr.send(file)
      }
    })
  }

  onProgress(onProgress) {
    this.onProgressFn = onProgress
    return this
  }

  onError(onError) {
    this.onErrorFn = onError
    return this
  }

  abort() {
    Object.keys(this.activeConnections)
      .map(Number)
      .forEach((id) => {
        this.activeConnections[id].abort()
      })

    this.aborted = true
  }
}

正如您即将看到的,这个实用程序类允许您在一堆代码中执行多部分请求。 Uploader 实用程序类使用 axios API 客户端,但任何其他基于 Promise 的 API 请求都可以。

该实用程序类负责拆分表示对象的文件参数,以便将构造函数中接收到的内容上传到每个 5 MB 的较小部分。然后初始化分段上传请求(一次最多并行上传15个分段),最后调用上一章定义的finalization API完成请求。

如果发生错误,sendNext() 函数负责将上传失败的部分放回队列中。如果出现致命错误或故意中断,上传过程将停止。

实用程序类中最相关的部分由 upload() 函数表示。这是通过预签名 URL 上传每个部分并检索其相应 ETag 值的地方。

现在,让我们看看如何使用 Uploader 类:

import "./App.css"
import { Uploader } from "./utils/upload"
import { useEffect, useState } from "react"

export default function App() {
  const [file, setFile] = useState(undefined)
  const [uploader, setUploader] = useState(undefined)

  useEffect(() => {
    if (file) {
      let percentage = undefined

     const videoUploaderOptions = {
        fileName: "foo",
        file: file,
      }
      const uploader = new Uploader(videoUploaderOptions)
      setUploader(uploader)

      uploader
        .onProgress(({ percentage: newPercentage }) => {
          // to avoid the same percentage to be logged twice
          if (newPercentage !== percentage) {
            percentage = newPercentage
            console.log(`${percentage}%`)
          }
        })
        .onError((error) => {
          setFile(undefined)
          console.error(error)
        })

      uploader.start()
    }
  }, [file])

  const onCancel = () => {
    if (uploader) {
      uploader.abort()
      setFile(undefined)
    }
  }

  return (
    <div className="App">
      <h1>Upload your file</h1>
      <div>
        <input
          type="file"
          onChange={(e) => {
            setFile(e.target?.files?.[0])
          }}
        />
      </div>
      <div>
        <button onClick={onCancel}>Cancel</button>
      </div>
    </div>
  )
}

一旦通过  上传了带有 type="file" HTML 元素的文件,就会执行 useEffect() 挂钩。在那里,使用 Uploader 实用程序类来相应地自动管理分段上传请求。在上传过程中,您可以按取消按钮中止操作。