Browse Source

Sign JSON objects in worker threads

Chocobozzz 7 months ago
parent
commit
edc3ff6085

+ 48 - 16
packages/tests/src/api/activitypub/security.ts

@@ -9,13 +9,21 @@ import {
   buildGlobalHTTPHeaders,
   signAndContextify
 } from '@peertube/peertube-server/core/helpers/activity-pub-utils.js'
-import { buildDigest } from '@peertube/peertube-server/core/helpers/peertube-crypto.js'
+import { buildDigest, signJsonLDObject } from '@peertube/peertube-server/core/helpers/peertube-crypto.js'
 import { ACTIVITY_PUB, HTTP_SIGNATURE } from '@peertube/peertube-server/core/initializers/constants.js'
 import { makePOSTAPRequest } from '@tests/shared/requests.js'
 import { SQLCommand } from '@tests/shared/sql-command.js'
 import { expect } from 'chai'
 import { readJsonSync } from 'fs-extra/esm'
 
+function signJsonLDObjectWithoutAssertion (options: Parameters<typeof signJsonLDObject>[0]) {
+  return signJsonLDObject({
+    ...options,
+
+    disableWorkerThreadAssertion: true
+  })
+}
+
 function fakeFilter () {
   return (data: any) => Promise.resolve(data)
 }
@@ -132,7 +140,7 @@ describe('Test ActivityPub security', function () {
 
     it('Should fail with an invalid date', async function () {
       const body = await activityPubContextify(getAnnounceWithoutContext(servers[1]), 'Announce', fakeFilter())
-      const headers = buildGlobalHTTPHeaders(body)
+      const headers = buildGlobalHTTPHeaders(body, buildDigest)
       headers['date'] = 'Wed, 21 Oct 2015 07:28:00 GMT'
 
       try {
@@ -148,7 +156,7 @@ describe('Test ActivityPub security', function () {
       await setKeysOfServer(sqlCommands[1], servers[1].url, invalidKeys.publicKey, invalidKeys.privateKey)
 
       const body = await activityPubContextify(getAnnounceWithoutContext(servers[1]), 'Announce', fakeFilter())
-      const headers = buildGlobalHTTPHeaders(body)
+      const headers = buildGlobalHTTPHeaders(body, buildDigest)
 
       try {
         await makePOSTAPRequest(url, body, baseHttpSignature(), headers)
@@ -163,7 +171,7 @@ describe('Test ActivityPub security', function () {
       await setKeysOfServer(sqlCommands[1], servers[1].url, keys.publicKey, keys.privateKey)
 
       const body = await activityPubContextify(getAnnounceWithoutContext(servers[1]), 'Announce', fakeFilter())
-      const headers = buildGlobalHTTPHeaders(body)
+      const headers = buildGlobalHTTPHeaders(body, buildDigest)
 
       const signatureOptions = baseHttpSignature()
       const badHeadersMatrix = [
@@ -186,7 +194,7 @@ describe('Test ActivityPub security', function () {
 
     it('Should succeed with a valid HTTP signature draft 11 (without date but with (created))', async function () {
       const body = await activityPubContextify(getAnnounceWithoutContext(servers[1]), 'Announce', fakeFilter())
-      const headers = buildGlobalHTTPHeaders(body)
+      const headers = buildGlobalHTTPHeaders(body, buildDigest)
 
       const signatureOptions = baseHttpSignature()
       signatureOptions.headers = [ '(request-target)', '(created)', 'host', 'digest' ]
@@ -197,7 +205,7 @@ describe('Test ActivityPub security', function () {
 
     it('Should succeed with a valid HTTP signature', async function () {
       const body = await activityPubContextify(getAnnounceWithoutContext(servers[1]), 'Announce', fakeFilter())
-      const headers = buildGlobalHTTPHeaders(body)
+      const headers = buildGlobalHTTPHeaders(body, buildDigest)
 
       const { statusCode } = await makePOSTAPRequest(url, body, baseHttpSignature(), headers)
       expect(statusCode).to.equal(HttpStatusCode.NO_CONTENT_204)
@@ -216,7 +224,7 @@ describe('Test ActivityPub security', function () {
       await servers[1].run()
 
       const body = await activityPubContextify(getAnnounceWithoutContext(servers[1]), 'Announce', fakeFilter())
-      const headers = buildGlobalHTTPHeaders(body)
+      const headers = buildGlobalHTTPHeaders(body, buildDigest)
 
       try {
         await makePOSTAPRequest(url, body, baseHttpSignature(), headers)
@@ -247,9 +255,15 @@ describe('Test ActivityPub security', function () {
       body.actor = servers[2].url + '/accounts/peertube'
 
       const signer: any = { privateKey: invalidKeys.privateKey, url: servers[2].url + '/accounts/peertube' }
-      const signedBody = await signAndContextify(signer, body, 'Announce', fakeFilter())
+      const signedBody = await signAndContextify({
+        byActor: signer,
+        data: body,
+        contextType: 'Announce',
+        contextFilter: fakeFilter(),
+        signerFunction: signJsonLDObjectWithoutAssertion
+      })
 
-      const headers = buildGlobalHTTPHeaders(signedBody)
+      const headers = buildGlobalHTTPHeaders(signedBody, buildDigest)
 
       try {
         await makePOSTAPRequest(url, signedBody, baseHttpSignature(), headers)
@@ -267,11 +281,17 @@ describe('Test ActivityPub security', function () {
       body.actor = servers[2].url + '/accounts/peertube'
 
       const signer: any = { privateKey: keys.privateKey, url: servers[2].url + '/accounts/peertube' }
-      const signedBody = await signAndContextify(signer, body, 'Announce', fakeFilter())
+      const signedBody: any = await signAndContextify({
+        byActor: signer,
+        data: body,
+        contextType: 'Announce',
+        contextFilter: fakeFilter(),
+        signerFunction: signJsonLDObjectWithoutAssertion
+      })
 
       signedBody.actor = servers[2].url + '/account/peertube'
 
-      const headers = buildGlobalHTTPHeaders(signedBody)
+      const headers = buildGlobalHTTPHeaders(signedBody, buildDigest)
 
       try {
         await makePOSTAPRequest(url, signedBody, baseHttpSignature(), headers)
@@ -286,9 +306,15 @@ describe('Test ActivityPub security', function () {
       body.actor = servers[2].url + '/accounts/peertube'
 
       const signer: any = { privateKey: keys.privateKey, url: servers[2].url + '/accounts/peertube' }
-      const signedBody = await signAndContextify(signer, body, 'Announce', fakeFilter())
+      const signedBody = await signAndContextify({
+        byActor: signer,
+        data: body,
+        contextType: 'Announce',
+        contextFilter: fakeFilter(),
+        signerFunction: signJsonLDObjectWithoutAssertion
+      })
 
-      const headers = buildGlobalHTTPHeaders(signedBody)
+      const headers = buildGlobalHTTPHeaders(signedBody, buildDigest)
 
       const { statusCode } = await makePOSTAPRequest(url, signedBody, baseHttpSignature(), headers)
       expect(statusCode).to.equal(HttpStatusCode.NO_CONTENT_204)
@@ -308,9 +334,15 @@ describe('Test ActivityPub security', function () {
       body.actor = servers[2].url + '/accounts/peertube'
 
       const signer: any = { privateKey: keys.privateKey, url: servers[2].url + '/accounts/peertube' }
-      const signedBody = await signAndContextify(signer, body, 'Announce', fakeFilter())
-
-      const headers = buildGlobalHTTPHeaders(signedBody)
+      const signedBody = await signAndContextify({
+        byActor: signer,
+        data: body,
+        contextType: 'Announce',
+        contextFilter: fakeFilter(),
+        signerFunction: signJsonLDObjectWithoutAssertion
+      })
+
+      const headers = buildGlobalHTTPHeaders(signedBody, buildDigest)
 
       try {
         await makePOSTAPRequest(url, signedBody, baseHttpSignature(), headers)

+ 24 - 3
packages/tests/src/server-helpers/activitypub.ts

@@ -5,13 +5,22 @@ import { signAndContextify } from '@peertube/peertube-server/core/helpers/activi
 import {
   isHTTPSignatureVerified,
   isJsonLDSignatureVerified,
-  parseHTTPSignature
+  parseHTTPSignature,
+  signJsonLDObject
 } from '@peertube/peertube-server/core/helpers/peertube-crypto.js'
 import { buildRequestStub } from '@tests/shared/tests.js'
 import { expect } from 'chai'
 import { readJsonSync } from 'fs-extra/esm'
 import cloneDeep from 'lodash-es/cloneDeep.js'
 
+function signJsonLDObjectWithoutAssertion (options: Parameters<typeof signJsonLDObject>[0]) {
+  return signJsonLDObject({
+    ...options,
+
+    disableWorkerThreadAssertion: true
+  })
+}
+
 function fakeFilter () {
   return (data: any) => Promise.resolve(data)
 }
@@ -55,7 +64,13 @@ describe('Test activity pub helpers', function () {
       const body = readJsonSync(buildAbsoluteFixturePath('./ap-json/peertube/announce-without-context.json'))
 
       const actorSignature = { url: 'http://localhost:9002/accounts/peertube', privateKey: keys.privateKey }
-      const signedBody = await signAndContextify(actorSignature as any, body, 'Announce', fakeFilter())
+      const signedBody = await signAndContextify({
+        byActor: actorSignature as any,
+        data: body,
+        contextType: 'Announce',
+        contextFilter: fakeFilter(),
+        signerFunction: signJsonLDObjectWithoutAssertion
+      })
 
       const fromActor = { publicKey: keys.publicKey, url: 'http://localhost:9002/accounts/peertube' }
       const result = await isJsonLDSignatureVerified(fromActor as any, signedBody)
@@ -68,7 +83,13 @@ describe('Test activity pub helpers', function () {
       const body = readJsonSync(buildAbsoluteFixturePath('./ap-json/peertube/announce-without-context.json'))
 
       const actorSignature = { url: 'http://localhost:9002/accounts/peertube', privateKey: keys.privateKey }
-      const signedBody = await signAndContextify(actorSignature as any, body, 'Announce', fakeFilter())
+      const signedBody = await signAndContextify({
+        byActor: actorSignature as any,
+        data: body,
+        contextType: 'Announce',
+        contextFilter: fakeFilter(),
+        signerFunction: signJsonLDObjectWithoutAssertion
+      })
 
       const fromActor = { publicKey: keys.publicKey, url: 'http://localhost:9002/accounts/peertube' }
       const result = await isJsonLDSignatureVerified(fromActor as any, signedBody)

+ 15 - 9
server/core/helpers/activity-pub-utils.ts

@@ -2,11 +2,14 @@ import { ContextType } from '@peertube/peertube-models'
 import { ACTIVITY_PUB } from '@server/initializers/constants.js'
 import { buildDigest, signJsonLDObject } from './peertube-crypto.js'
 
-type ContextFilter = <T> (arg: T) => Promise<T>
+export type ContextFilter = <T> (arg: T) => Promise<T>
 
-export function buildGlobalHTTPHeaders (body: any) {
+export function buildGlobalHTTPHeaders (
+  body: any,
+  digestBuilder: typeof buildDigest
+) {
   return {
-    'digest': buildDigest(body),
+    'digest': digestBuilder(body),
     'content-type': 'application/activity+json',
     'accept': ACTIVITY_PUB.ACCEPT_HEADER
   }
@@ -16,17 +19,20 @@ export async function activityPubContextify <T> (data: T, type: ContextType, con
   return { ...await getContextData(type, contextFilter), ...data }
 }
 
-export async function signAndContextify <T> (
-  byActor: { url: string, privateKey: string },
-  data: T,
-  contextType: ContextType | null,
+export async function signAndContextify <T> (options: {
+  byActor: { url: string, privateKey: string }
+  data: T
+  contextType: ContextType | null
   contextFilter: ContextFilter
-) {
+  signerFunction: typeof signJsonLDObject<T>
+}) {
+  const { byActor, data, contextType, contextFilter, signerFunction } = options
+
   const activity = contextType
     ? await activityPubContextify(data, contextType, contextFilter)
     : data
 
-  return signJsonLDObject(byActor, activity)
+  return signerFunction({ byActor, data: activity })
 }
 
 // ---------------------------------------------------------------------------

+ 10 - 1
server/core/helpers/peertube-crypto.ts

@@ -7,6 +7,7 @@ import { BCRYPT_SALT_SIZE, ENCRYPTION, HTTP_SIGNATURE, PRIVATE_RSA_KEY_SIZE } fr
 import { MActor } from '../types/models/index.js'
 import { generateRSAKeyPairPromise, randomBytesPromise, scryptPromise } from './core-utils.js'
 import { logger } from './logger.js'
+import { assertIsInWorkerThread } from './threads.js'
 
 function createPrivateAndPublicKeys () {
   logger.info('Generating a RSA key...')
@@ -94,7 +95,15 @@ async function isJsonLDRSA2017Verified (fromActor: MActor, signedDocument: any)
   return verify.verify(fromActor.publicKey, signedDocument.signature.signatureValue, 'base64')
 }
 
-async function signJsonLDObject <T> (byActor: { url: string, privateKey: string }, data: T) {
+async function signJsonLDObject <T> (options: {
+  byActor: { url: string, privateKey: string }
+  data: T
+  disableWorkerThreadAssertion?: boolean
+}) {
+  const { byActor, data, disableWorkerThreadAssertion = false } = options
+
+  if (!disableWorkerThreadAssertion) assertIsInWorkerThread()
+
   const signature = {
     type: 'RsaSignature2017',
     creator: byActor.url,

+ 8 - 0
server/core/helpers/threads.ts

@@ -0,0 +1,8 @@
+import { isMainThread } from 'node:worker_threads'
+import { logger } from './logger.js'
+
+export function assertIsInWorkerThread () {
+  if (!isMainThread) return
+
+  logger.error('Caller is not in worker thread', { stack: new Error().stack })
+}

+ 8 - 0
server/core/initializers/constants.ts

@@ -976,6 +976,14 @@ const WORKER_THREADS = {
   GET_IMAGE_SIZE: {
     CONCURRENCY: 1,
     MAX_THREADS: 5
+  },
+  SIGN_JSON_LD_OBJECT: {
+    CONCURRENCY: 1,
+    MAX_THREADS: 2
+  },
+  BUILD_DIGEST: {
+    CONCURRENCY: 1,
+    MAX_THREADS: 2
   }
 }
 

+ 18 - 3
server/core/lib/activitypub/send/http.ts

@@ -1,10 +1,11 @@
 import { ContextType } from '@peertube/peertube-models'
-import { signAndContextify } from '@server/helpers/activity-pub-utils.js'
-import { HTTP_SIGNATURE } from '@server/initializers/constants.js'
+import { ACTIVITY_PUB, HTTP_SIGNATURE } from '@server/initializers/constants.js'
 import { ActorModel } from '@server/models/actor/actor.js'
 import { getServerActor } from '@server/models/application/application.js'
 import { MActor } from '@server/types/models/index.js'
 import { getContextFilter } from '../context.js'
+import { buildDigestFromWorker, signJsonLDObjectFromWorker } from '@server/lib/worker/parent-process.js'
+import { signAndContextify } from '@server/helpers/activity-pub-utils.js'
 
 type Payload <T> = { body: T, contextType: ContextType, signatureActorId?: number }
 
@@ -17,12 +18,26 @@ export async function computeBody <T> (
     const actorSignature = await ActorModel.load(payload.signatureActorId)
     if (!actorSignature) throw new Error('Unknown signature actor id.')
 
-    body = await signAndContextify(actorSignature, payload.body, payload.contextType, getContextFilter())
+    body = await signAndContextify({
+      byActor: { url: actorSignature.url, privateKey: actorSignature.privateKey },
+      data: payload.body,
+      contextType: payload.contextType,
+      contextFilter: getContextFilter(),
+      signerFunction: signJsonLDObjectFromWorker
+    })
   }
 
   return body
 }
 
+export async function buildGlobalHTTPHeaders (body: any) {
+  return {
+    'digest': await buildDigestFromWorker(body),
+    'content-type': 'application/activity+json',
+    'accept': ACTIVITY_PUB.ACCEPT_HEADER
+  }
+}
+
 export async function buildSignedRequestOptions (options: {
   signatureActorId?: number
   hasPayload: boolean

+ 2 - 3
server/core/lib/job-queue/handlers/activitypub-http-broadcast.ts

@@ -1,7 +1,6 @@
 import { Job } from 'bullmq'
 import { ActivitypubHttpBroadcastPayload } from '@peertube/peertube-models'
-import { buildGlobalHTTPHeaders } from '@server/helpers/activity-pub-utils.js'
-import { buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send/index.js'
+import { buildGlobalHTTPHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send/http.js'
 import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache.js'
 import { parallelHTTPBroadcastFromWorker, sequentialHTTPBroadcastFromWorker } from '@server/lib/worker/parent-process.js'
 import { logger } from '../../../helpers/logger.js'
@@ -45,6 +44,6 @@ async function buildRequestOptions (payload: ActivitypubHttpBroadcastPayload) {
     method: 'POST' as 'POST',
     json: body,
     httpSignature: httpSignatureOptions,
-    headers: buildGlobalHTTPHeaders(body)
+    headers: await buildGlobalHTTPHeaders(body)
   }
 }

+ 2 - 3
server/core/lib/job-queue/handlers/activitypub-http-unicast.ts

@@ -1,7 +1,6 @@
 import { Job } from 'bullmq'
 import { ActivitypubHttpUnicastPayload } from '@peertube/peertube-models'
-import { buildGlobalHTTPHeaders } from '@server/helpers/activity-pub-utils.js'
-import { buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send/index.js'
+import { buildGlobalHTTPHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send/http.js'
 import { logger } from '../../../helpers/logger.js'
 import { doRequest } from '../../../helpers/requests.js'
 import { ActorFollowHealthCache } from '../../actor-follow-health-cache.js'
@@ -19,7 +18,7 @@ async function processActivityPubHttpUnicast (job: Job) {
     method: 'POST' as 'POST',
     json: body,
     httpSignature: httpSignatureOptions,
-    headers: buildGlobalHTTPHeaders(body)
+    headers: await buildGlobalHTTPHeaders(body)
   }
 
   try {

+ 45 - 11
server/core/lib/worker/parent-process.ts

@@ -5,10 +5,12 @@ import type httpBroadcast from './workers/http-broadcast.js'
 import type downloadImage from './workers/image-downloader.js'
 import type processImage from './workers/image-processor.js'
 import type getImageSize from './workers/get-image-size.js'
+import type signJsonLDObject from './workers/sign-json-ld-object.js'
+import type buildDigest from './workers/build-digest.js'
 
 let downloadImageWorker: Piscina
 
-function downloadImageFromWorker (options: Parameters<typeof downloadImage>[0]): Promise<ReturnType<typeof downloadImage>> {
+export function downloadImageFromWorker (options: Parameters<typeof downloadImage>[0]): Promise<ReturnType<typeof downloadImage>> {
   if (!downloadImageWorker) {
     downloadImageWorker = new Piscina({
       filename: new URL(join('workers', 'image-downloader.js'), import.meta.url).href,
@@ -24,7 +26,7 @@ function downloadImageFromWorker (options: Parameters<typeof downloadImage>[0]):
 
 let processImageWorker: Piscina
 
-function processImageFromWorker (options: Parameters<typeof processImage>[0]): Promise<ReturnType<typeof processImage>> {
+export function processImageFromWorker (options: Parameters<typeof processImage>[0]): Promise<ReturnType<typeof processImage>> {
   if (!processImageWorker) {
     processImageWorker = new Piscina({
       filename: new URL(join('workers', 'image-processor.js'), import.meta.url).href,
@@ -40,7 +42,7 @@ function processImageFromWorker (options: Parameters<typeof processImage>[0]): P
 
 let getImageSizeWorker: Piscina
 
-function getImageSizeFromWorker (options: Parameters<typeof getImageSize>[0]): Promise<ReturnType<typeof getImageSize>> {
+export function getImageSizeFromWorker (options: Parameters<typeof getImageSize>[0]): Promise<ReturnType<typeof getImageSize>> {
   if (!getImageSizeWorker) {
     getImageSizeWorker = new Piscina({
       filename: new URL(join('workers', 'get-image-size.js'), import.meta.url).href,
@@ -56,7 +58,7 @@ function getImageSizeFromWorker (options: Parameters<typeof getImageSize>[0]): P
 
 let parallelHTTPBroadcastWorker: Piscina
 
-function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> {
+export function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> {
   if (!parallelHTTPBroadcastWorker) {
     parallelHTTPBroadcastWorker = new Piscina({
       filename: new URL(join('workers', 'http-broadcast.js'), import.meta.url).href,
@@ -73,7 +75,9 @@ function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadca
 
 let sequentialHTTPBroadcastWorker: Piscina
 
-function sequentialHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> {
+export function sequentialHTTPBroadcastFromWorker (
+  options: Parameters<typeof httpBroadcast>[0]
+): Promise<ReturnType<typeof httpBroadcast>> {
   if (!sequentialHTTPBroadcastWorker) {
     sequentialHTTPBroadcastWorker = new Piscina({
       filename: new URL(join('workers', 'http-broadcast.js'), import.meta.url).href,
@@ -86,10 +90,40 @@ function sequentialHTTPBroadcastFromWorker (options: Parameters<typeof httpBroad
   return sequentialHTTPBroadcastWorker.run(options)
 }
 
-export {
-  downloadImageFromWorker,
-  processImageFromWorker,
-  parallelHTTPBroadcastFromWorker,
-  getImageSizeFromWorker,
-  sequentialHTTPBroadcastFromWorker
+// ---------------------------------------------------------------------------
+
+let signJsonLDObjectWorker: Piscina
+
+export function signJsonLDObjectFromWorker <T> (
+  options: Parameters<typeof signJsonLDObject<T>>[0]
+): ReturnType<typeof signJsonLDObject<T>> {
+  if (!signJsonLDObjectWorker) {
+    signJsonLDObjectWorker = new Piscina({
+      filename: new URL(join('workers', 'sign-json-ld-object.js'), import.meta.url).href,
+      // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
+      concurrentTasksPerWorker: WORKER_THREADS.SIGN_JSON_LD_OBJECT.CONCURRENCY,
+      maxThreads: WORKER_THREADS.SIGN_JSON_LD_OBJECT.MAX_THREADS
+    })
+  }
+
+  return signJsonLDObjectWorker.run(options)
+}
+
+// ---------------------------------------------------------------------------
+
+let buildDigestWorker: Piscina
+
+export function buildDigestFromWorker (
+  options: Parameters<typeof buildDigest>[0]
+): Promise<ReturnType<typeof buildDigest>> {
+  if (!buildDigestWorker) {
+    buildDigestWorker = new Piscina({
+      filename: new URL(join('workers', 'build-digest.js'), import.meta.url).href,
+      // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
+      concurrentTasksPerWorker: WORKER_THREADS.BUILD_DIGEST.CONCURRENCY,
+      maxThreads: WORKER_THREADS.BUILD_DIGEST.MAX_THREADS
+    })
+  }
+
+  return buildDigestWorker.run(options)
 }

+ 3 - 0
server/core/lib/worker/workers/build-digest.ts

@@ -0,0 +1,3 @@
+import { buildDigest } from '@server/helpers/peertube-crypto.js'
+
+export default buildDigest

+ 3 - 0
server/core/lib/worker/workers/sign-json-ld-object.ts

@@ -0,0 +1,3 @@
+import { signJsonLDObject } from '@server/helpers/peertube-crypto.js'
+
+export default signJsonLDObject