Browse Source

object-storage: @aws-sdk/lib-storage for multipart (#4903)

* object-storage: @aws-sdk/lib-storage for multipart

* gitignore: add .DS_Store

* test(object-storage): remove only

* test(object-storage/multipart): generate video

* fix lint issue

* test(obj-storage/video): ensure file size

* Styling

Co-authored-by: Chocobozzz <me@florianbigard.com>
kontrollanten 2 years ago
parent
commit
156cdbac22

+ 1 - 0
.gitignore

@@ -47,6 +47,7 @@ yarn-error.log
 /*.zip
 /*.tar.xz
 /*.asc
+*.DS_Store
 /server/tools/import-mediacore.ts
 /docker-volume/
 /init.mp4

+ 2 - 1
package.json

@@ -78,7 +78,8 @@
   },
   "dependencies": {
     "@aws-sdk/client-s3": "^3.23.0",
-    "@babel/parser": "7.17.9",
+    "@aws-sdk/lib-storage": "^3.72.0",
+    "@babel/parser": "7.17.8",
     "@peertube/feed": "^5.0.1",
     "@peertube/http-signature": "^1.4.0",
     "@uploadx/core": "^5.1.0",

+ 19 - 70
server/lib/object-storage/shared/object-storage-helpers.ts

@@ -1,19 +1,14 @@
-import { close, createReadStream, createWriteStream, ensureDir, open, ReadStream, stat } from 'fs-extra'
-import { min } from 'lodash'
+import { createReadStream, createWriteStream, ensureDir, ReadStream, stat } from 'fs-extra'
 import { dirname } from 'path'
 import { Readable } from 'stream'
 import {
-  CompletedPart,
-  CompleteMultipartUploadCommand,
-  CreateMultipartUploadCommand,
-  CreateMultipartUploadCommandInput,
   DeleteObjectCommand,
   GetObjectCommand,
   ListObjectsV2Command,
   PutObjectCommand,
-  PutObjectCommandInput,
-  UploadPartCommand
+  PutObjectCommandInput
 } from '@aws-sdk/client-s3'
+import { Upload } from '@aws-sdk/lib-storage'
 import { pipelinePromise } from '@server/helpers/core-utils'
 import { isArray } from '@server/helpers/custom-validators/misc'
 import { logger } from '@server/helpers/logger'
@@ -37,13 +32,12 @@ async function storeObject (options: {
   logger.debug('Uploading file %s to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
 
   const stats = await stat(inputPath)
+  const fileStream = createReadStream(inputPath)
 
-  // If bigger than max allowed size we do a multipart upload
   if (stats.size > CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART) {
-    return multiPartUpload({ inputPath, objectStorageKey, bucketInfo })
+    return multiPartUpload({ content: fileStream, objectStorageKey, bucketInfo })
   }
 
-  const fileStream = createReadStream(inputPath)
   return objectStoragePut({ objectStorageKey, content: fileStream, bucketInfo })
 }
 
@@ -163,18 +157,14 @@ async function objectStoragePut (options: {
 }
 
 async function multiPartUpload (options: {
-  inputPath: string
+  content: ReadStream
   objectStorageKey: string
   bucketInfo: BucketInfo
 }) {
-  const { objectStorageKey, inputPath, bucketInfo } = options
+  const { content, objectStorageKey, bucketInfo } = options
 
-  const key = buildKey(objectStorageKey, bucketInfo)
-  const s3Client = getClient()
-
-  const statResult = await stat(inputPath)
-
-  const input: CreateMultipartUploadCommandInput = {
+  const input: PutObjectCommandInput = {
+    Body: content,
     Bucket: bucketInfo.BUCKET_NAME,
     Key: buildKey(objectStorageKey, bucketInfo)
   }
@@ -183,60 +173,19 @@ async function multiPartUpload (options: {
     input.ACL = CONFIG.OBJECT_STORAGE.UPLOAD_ACL
   }
 
-  const createMultipartCommand = new CreateMultipartUploadCommand(input)
-  const createResponse = await s3Client.send(createMultipartCommand)
-
-  const fd = await open(inputPath, 'r')
-  let partNumber = 1
-  const parts: CompletedPart[] = []
-  const partSize = CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART
-
-  for (let start = 0; start < statResult.size; start += partSize) {
-    logger.debug(
-      'Uploading part %d of file to %s%s in bucket %s',
-      partNumber, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
-    )
-
-    // FIXME: Remove when https://github.com/aws/aws-sdk-js-v3/pull/2637 is released
-    // The s3 sdk needs to know the length of the http body beforehand, but doesn't support
-    // streams with start and end set, so it just tries to stat the file in stream.path.
-    // This fails for us because we only want to send part of the file. The stream type
-    // is modified so we can set the byteLength here, which s3 detects because array buffers
-    // have this field set
-    const stream: ReadStream & { byteLength: number } =
-      createReadStream(
-        inputPath,
-        { fd, autoClose: false, start, end: (start + partSize) - 1 }
-      ) as ReadStream & { byteLength: number }
-
-    // Calculate if the part size is more than what's left over, and in that case use left over bytes for byteLength
-    stream.byteLength = min([ statResult.size - start, partSize ])
-
-    const uploadPartCommand = new UploadPartCommand({
-      Bucket: bucketInfo.BUCKET_NAME,
-      Key: key,
-      UploadId: createResponse.UploadId,
-      PartNumber: partNumber,
-      Body: stream
-    })
-    const uploadResponse = await s3Client.send(uploadPartCommand)
-
-    parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
-    partNumber += 1
-  }
-  await close(fd)
-
-  const completeUploadCommand = new CompleteMultipartUploadCommand({
-    Bucket: bucketInfo.BUCKET_NAME,
-    Key: key,
-    UploadId: createResponse.UploadId,
-    MultipartUpload: { Parts: parts }
+  const parallelUploads3 = new Upload({
+    client: getClient(),
+    queueSize: 4,
+    partSize: CONFIG.OBJECT_STORAGE.MAX_UPLOAD_PART,
+    leavePartsOnError: false,
+    params: input
   })
-  await s3Client.send(completeUploadCommand)
+
+  await parallelUploads3.done()
 
   logger.debug(
-    'Completed %s%s in bucket %s in %d parts',
-    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, partNumber - 1, lTags()
+    'Completed %s%s in bucket %s',
+    bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()
   )
 
   return getPrivateUrl(bucketInfo, objectStorageKey)

+ 33 - 9
server/tests/api/object-storage/videos.ts

@@ -1,9 +1,17 @@
 /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
 
 import 'mocha'
+import bytes from 'bytes'
 import * as chai from 'chai'
+import { stat } from 'fs-extra'
 import { merge } from 'lodash'
-import { checkTmpIsEmpty, expectLogDoesNotContain, expectStartWith, MockObjectStorage } from '@server/tests/shared'
+import {
+  checkTmpIsEmpty,
+  expectLogDoesNotContain,
+  expectStartWith,
+  generateHighBitrateVideo,
+  MockObjectStorage
+} from '@server/tests/shared'
 import { areObjectStorageTestsDisabled } from '@shared/core-utils'
 import { HttpStatusCode, VideoDetails } from '@shared/models'
 import {
@@ -107,6 +115,10 @@ async function checkFiles (options: {
 }
 
 function runTestSuite (options: {
+  fixture?: string
+
+  maxUploadPart?: string
+
   playlistBucket: string
   playlistPrefix?: string
 
@@ -114,10 +126,9 @@ function runTestSuite (options: {
   webtorrentPrefix?: string
 
   useMockBaseUrl?: boolean
-
-  maxUploadPart?: string
 }) {
   const mockObjectStorage = new MockObjectStorage()
+  const { fixture } = options
   let baseMockUrl: string
 
   let servers: PeerTubeServer[]
@@ -144,7 +155,7 @@ function runTestSuite (options: {
 
         credentials: ObjectStorageCommand.getCredentialsConfig(),
 
-        max_upload_part: options.maxUploadPart || '2MB',
+        max_upload_part: options.maxUploadPart || '5MB',
 
         streaming_playlists: {
           bucket_name: options.playlistBucket,
@@ -181,7 +192,7 @@ function runTestSuite (options: {
   it('Should upload a video and move it to the object storage without transcoding', async function () {
     this.timeout(40000)
 
-    const { uuid } = await servers[0].videos.quickUpload({ name: 'video 1' })
+    const { uuid } = await servers[0].videos.quickUpload({ name: 'video 1', fixture })
     uuidsToDelete.push(uuid)
 
     await waitJobs(servers)
@@ -197,7 +208,7 @@ function runTestSuite (options: {
   it('Should upload a video and move it to the object storage with transcoding', async function () {
     this.timeout(120000)
 
-    const { uuid } = await servers[1].videos.quickUpload({ name: 'video 2' })
+    const { uuid } = await servers[1].videos.quickUpload({ name: 'video 2', fixture })
     uuidsToDelete.push(uuid)
 
     await waitJobs(servers)
@@ -390,12 +401,25 @@ describe('Object storage for videos', function () {
     })
   })
 
-  describe('Test object storage with small upload part', function () {
+  describe('Test object storage with file bigger than upload part', function () {
+    let fixture: string
+    const maxUploadPart = '5MB'
+
+    before(async function () {
+      fixture = await generateHighBitrateVideo()
+
+      const { size } = await stat(fixture)
+
+      if (bytes.parse(maxUploadPart) > size) {
+        throw Error(`Fixture file is too small (${size}) to make sense for this test.`)
+      }
+    })
+
     runTestSuite({
+      maxUploadPart,
       playlistBucket: 'streaming-playlists',
       webtorrentBucket: 'videos',
-
-      maxUploadPart: '5KB'
+      fixture
     })
   })
 })

+ 37 - 6
yarn.lock

@@ -484,6 +484,16 @@
   dependencies:
     tslib "^2.3.1"
 
+"@aws-sdk/lib-storage@^3.72.0":
+  version "3.72.0"
+  resolved "https://registry.yarnpkg.com/@aws-sdk/lib-storage/-/lib-storage-3.72.0.tgz#035c577e306d6472aa5cb15220936262cb394763"
+  integrity sha512-z2L//IMN9fkXMhFyC0F9SXTH0oHA7zsOsLOyQS2hqKXAE3TGTK6d0hj6vmut4RH0wGzXOQ9zrh0DexAVdv29pA==
+  dependencies:
+    buffer "5.6.0"
+    events "3.3.0"
+    stream-browserify "3.0.0"
+    tslib "^2.3.1"
+
 "@aws-sdk/md5-js@3.58.0":
   version "3.58.0"
   resolved "https://registry.yarnpkg.com/@aws-sdk/md5-js/-/md5-js-3.58.0.tgz#a7ecf5cc8a81ce247fd620f8c981802d0427737f"
@@ -989,7 +999,12 @@
   resolved "https://registry.yarnpkg.com/@babel/parser/-/parser-7.16.4.tgz#d5f92f57cf2c74ffe9b37981c0e72fee7311372e"
   integrity sha512-6V0qdPUaiVHH3RtZeLIsc+6pDhbYzHR8ogA8w+f+Wc77DuXto19g2QUwveINoS34Uw+W8/hQDGJCx+i4n7xcng==
 
-"@babel/parser@7.17.9", "@babel/parser@^7.16.4", "@babel/parser@^7.16.7", "@babel/parser@^7.17.9", "@babel/parser@^7.6.0", "@babel/parser@^7.9.6":
+"@babel/parser@7.17.8":
+  version "7.17.8"
+  resolved "https://registry.yarnpkg.com/@babel/parser/-/parser-7.17.8.tgz#2817fb9d885dd8132ea0f8eb615a6388cca1c240"
+  integrity sha512-BoHhDJrJXqcg+ZL16Xv39H9n+AqJ4pcDrQBGZN+wHxIysrLZ3/ECwCBUch/1zUNhnsXULcONU3Ei5Hmkfk6kiQ==
+
+"@babel/parser@^7.16.4", "@babel/parser@^7.16.7", "@babel/parser@^7.17.9", "@babel/parser@^7.6.0", "@babel/parser@^7.9.6":
   version "7.17.9"
   resolved "https://registry.yarnpkg.com/@babel/parser/-/parser-7.17.9.tgz#9c94189a6062f0291418ca021077983058e171ef"
   integrity sha512-vqUSBLP8dQHFPdPi9bc5GK9vRkYHJ49fsZdtoJ8EQ8ibpwk5rPKfvNIwChB0KVXcIjcepEBBd2VHC5r9Gy8ueg==
@@ -2555,7 +2570,7 @@ base32.js@0.1.0:
   resolved "https://registry.yarnpkg.com/base32.js/-/base32.js-0.1.0.tgz#b582dec693c2f11e893cf064ee6ac5b6131a2202"
   integrity sha1-tYLexpPC8R6JPPBk7mrFthMaIgI=
 
-base64-js@^1.2.0, base64-js@^1.3.1:
+base64-js@^1.0.2, base64-js@^1.2.0, base64-js@^1.3.1:
   version "1.5.1"
   resolved "https://registry.yarnpkg.com/base64-js/-/base64-js-1.5.1.tgz#1b1b440160a5bf7ad40b650f095963481903930a"
   integrity sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==
@@ -2818,6 +2833,14 @@ buffer-writer@2.0.0:
   resolved "https://registry.yarnpkg.com/buffer-writer/-/buffer-writer-2.0.0.tgz#ce7eb81a38f7829db09c873f2fbb792c0c98ec04"
   integrity sha512-a7ZpuTZU1TRtnwyCNW3I5dc0wWNC3VR9S++Ewyk2HHZdrO3CQJqSpd+95Us590V6AL7JqUAH2IwZ/398PmNFgw==
 
+buffer@5.6.0:
+  version "5.6.0"
+  resolved "https://registry.yarnpkg.com/buffer/-/buffer-5.6.0.tgz#a31749dc7d81d84db08abf937b6b8c4033f62786"
+  integrity sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==
+  dependencies:
+    base64-js "^1.0.2"
+    ieee754 "^1.1.4"
+
 buffer@^5.2.0:
   version "5.7.1"
   resolved "https://registry.yarnpkg.com/buffer/-/buffer-5.7.1.tgz#ba62e7c13133053582197160851a8f648e99eed0"
@@ -4228,7 +4251,7 @@ event-target-shim@^5.0.0:
   resolved "https://registry.yarnpkg.com/event-target-shim/-/event-target-shim-5.0.1.tgz#5d4d3ebdf9583d63a5333ce2deb7480ab2b05789"
   integrity sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==
 
-events@^3.3.0:
+events@3.3.0, events@^3.3.0:
   version "3.3.0"
   resolved "https://registry.yarnpkg.com/events/-/events-3.3.0.tgz#31a95ad0a924e2d2c419a813aeb2c4e878ea7400"
   integrity sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==
@@ -5006,7 +5029,7 @@ iconv-lite@0.6.3:
   dependencies:
     safer-buffer ">= 2.1.2 < 3.0.0"
 
-ieee754@^1.1.13, ieee754@^1.2.1:
+ieee754@^1.1.13, ieee754@^1.1.4, ieee754@^1.2.1:
   version "1.2.1"
   resolved "https://registry.yarnpkg.com/ieee754/-/ieee754-1.2.1.tgz#8eb7a10a63fff25d15a57b001586d177d1b0d352"
   integrity sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==
@@ -5066,7 +5089,7 @@ inflight@^1.0.4:
     once "^1.3.0"
     wrappy "1"
 
-inherits@2, inherits@2.0.4, inherits@^2.0.1, inherits@^2.0.3, inherits@^2.0.4, inherits@~2.0.1, inherits@~2.0.3:
+inherits@2, inherits@2.0.4, inherits@^2.0.1, inherits@^2.0.3, inherits@^2.0.4, inherits@~2.0.1, inherits@~2.0.3, inherits@~2.0.4:
   version "2.0.4"
   resolved "https://registry.yarnpkg.com/inherits/-/inherits-2.0.4.tgz#0fa2c64f932917c3433a0ded55363aae37416b7c"
   integrity sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==
@@ -7353,7 +7376,7 @@ readable-stream@^2.0.0, readable-stream@^2.2.2, readable-stream@~2.3.6:
     string_decoder "~1.1.1"
     util-deprecate "~1.0.1"
 
-readable-stream@^3.0.2, readable-stream@^3.0.6, readable-stream@^3.4.0, readable-stream@^3.6.0:
+readable-stream@^3.0.2, readable-stream@^3.0.6, readable-stream@^3.4.0, readable-stream@^3.5.0, readable-stream@^3.6.0:
   version "3.6.0"
   resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.6.0.tgz#337bbda3adc0706bd3e024426a286d4b4b2c9198"
   integrity sha512-BViHy7LKeTz4oNnkcLJ+lVSL6vpiFeX6/d3oSH8zCW7UxP2onchk+vTGB143xuFjHS3deTgkKoXXymXqymiIdA==
@@ -8017,6 +8040,14 @@ statuses@1.5.0, "statuses@>= 1.5.0 < 2", statuses@~1.5.0:
   resolved "https://registry.yarnpkg.com/statuses/-/statuses-1.5.0.tgz#161c7dac177659fd9811f43771fa99381478628c"
   integrity sha1-Fhx9rBd2Wf2YEfQ3cfqZOBR4Yow=
 
+stream-browserify@3.0.0:
+  version "3.0.0"
+  resolved "https://registry.yarnpkg.com/stream-browserify/-/stream-browserify-3.0.0.tgz#22b0a2850cdf6503e73085da1fc7b7d0c2122f2f"
+  integrity sha512-H73RAHsVBapbim0tU2JwwOiXUj+fikfiaoYAKHF3VJfA0pe2BCzkhAHBlLG6REzE+2WNZcxOXjK7lkso+9euLA==
+  dependencies:
+    inherits "~2.0.4"
+    readable-stream "^3.5.0"
+
 stream-combiner@~0.0.4:
   version "0.0.4"
   resolved "https://registry.yarnpkg.com/stream-combiner/-/stream-combiner-0.0.4.tgz#4d5e433c185261dde623ca3f44c586bcf5c4ad14"