diff --git a/package-lock.json b/package-lock.json index 8ee7081d6a..d6d7593c82 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11578,6 +11578,12 @@ "node": ">= 0.8" } }, + "node_modules/comlink": { + "version": "4.4.2", + "resolved": "https://registry.npmjs.org/comlink/-/comlink-4.4.2.tgz", + "integrity": "sha512-OxGdvBmJuNKSCMO4NTl1L47VRp6xn2wG4F/2hYzB6tiCb709otOxtEYCSvK80PtjODfXXZu8ds+Nw5kVCjqd2g==", + "license": "Apache-2.0" + }, "node_modules/commander": { "version": "14.0.2", "resolved": "https://registry.npmjs.org/commander/-/commander-14.0.2.tgz", @@ -29067,6 +29073,7 @@ "@streamr/proto-rpc": "103.2.0", "@streamr/trackerless-network": "103.2.0", "@streamr/utils": "103.2.0", + "comlink": "^4.4.2", "core-js": "^3.47.0", "env-paths": "^2.2.1", "ethers": "^6.13.0", diff --git a/packages/sdk/package.json b/packages/sdk/package.json index e586b916f5..008af1e844 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -13,7 +13,9 @@ "script": "./dist/streamr-sdk.web.min.js", "browser": { "./src/utils/persistence/ServerPersistence.ts": "./src/utils/persistence/BrowserPersistence.mts", - "./dist/src/utils/persistence/ServerPersistence.js": "./dist/src/utils/persistence/BrowserPersistence.mjs" + "./dist/src/utils/persistence/ServerPersistence.js": "./dist/src/utils/persistence/BrowserPersistence.mjs", + "./src/signature/ServerSignatureValidation.ts": "./src/signature/BrowserSignatureValidation.mts", + "./dist/src/signature/ServerSignatureValidation.js": "./dist/src/signature/BrowserSignatureValidation.mjs" }, "exports": { "default": { @@ -103,6 +105,7 @@ "@streamr/proto-rpc": "103.2.0", "@streamr/trackerless-network": "103.2.0", "@streamr/utils": "103.2.0", + "comlink": "^4.4.2", "core-js": "^3.47.0", "env-paths": "^2.2.1", "ethers": "^6.13.0", diff --git a/packages/sdk/src/signature/BrowserSignatureValidation.mts b/packages/sdk/src/signature/BrowserSignatureValidation.mts new file mode 100644 index 0000000000..55080d3d46 --- /dev/null +++ b/packages/sdk/src/signature/BrowserSignatureValidation.mts @@ -0,0 +1,35 @@ +/** + * Browser implementation of signature validation using Web Worker. + * This offloads CPU-intensive cryptographic operations to a separate thread. + */ +import * as Comlink from 'comlink' +import { SignatureValidationContext } from './SignatureValidationContext.js' +import { SignatureValidationResult, toSignatureValidationData } from './signatureValidation.js' +import type { SignatureValidationWorkerApi } from './SignatureValidationWorker.js' +import { StreamMessage } from '../protocol/StreamMessage.js' + +export default class BrowserSignatureValidation implements SignatureValidationContext { + private worker: Worker + private workerApi: Comlink.Remote + + constructor() { + // Webpack 5 handles this pattern automatically, creating a separate chunk for the worker + this.worker = new Worker( + /* webpackChunkName: "signature-worker" */ + new URL('./SignatureValidationWorker.js', import.meta.url) + ) + this.workerApi = Comlink.wrap(this.worker) + } + + async validateSignature(message: StreamMessage): Promise { + // Convert class instance to plain serializable data before sending to worker + const data = toSignatureValidationData(message) + return this.workerApi.validateSignature(data) + } + + destroy(): void { + if (this.worker) { + this.worker.terminate() + } + } +} diff --git a/packages/sdk/src/signature/ServerSignatureValidation.ts b/packages/sdk/src/signature/ServerSignatureValidation.ts new file mode 100644 index 0000000000..4321c1a067 --- /dev/null +++ b/packages/sdk/src/signature/ServerSignatureValidation.ts @@ -0,0 +1,35 @@ +import * as Comlink from 'comlink' +// eslint-disable-next-line no-restricted-imports +import nodeEndpoint from 'comlink/dist/umd/node-adapter' +import { Worker } from 'worker_threads' +import { StreamMessage } from '../protocol/StreamMessage' +import { SignatureValidationContext } from './SignatureValidationContext' +import { SignatureValidationWorkerApi } from './SignatureValidationWorker' +import { SignatureValidationResult, toSignatureValidationData } from './signatureValidation' +import { join } from 'path' + +export default class ServerSignatureValidation implements SignatureValidationContext { + + private readonly worker: Worker + private readonly workerApi: Comlink.Remote + constructor() { + const isRunningFromDist = __dirname.includes('/dist/') + const workerPath = isRunningFromDist + ? join(__dirname, 'SignatureValidationWorker.js') + : join(__dirname, '../../dist/src/signature/SignatureValidationWorker.js') + this.worker = new Worker(workerPath) + this.workerApi = Comlink.wrap(nodeEndpoint(this.worker)) + } + + async validateSignature(message: StreamMessage): Promise { + // Convert class instance to plain serializable data before sending to worker + const data = toSignatureValidationData(message) + return this.workerApi.validateSignature(data) + } + + // eslint-disable-next-line class-methods-use-this + destroy(): void { + // No-op for server implementation + } +} + diff --git a/packages/sdk/src/signature/SignatureValidationContext.ts b/packages/sdk/src/signature/SignatureValidationContext.ts new file mode 100644 index 0000000000..77f8afb4a0 --- /dev/null +++ b/packages/sdk/src/signature/SignatureValidationContext.ts @@ -0,0 +1,12 @@ +/** + * Interface for signature validation backend. + * Browser implementation uses a Web Worker, Node.js runs on main thread. + */ +import { StreamMessage } from '../protocol/StreamMessage' +import { SignatureValidationResult } from './signatureValidation' + +export interface SignatureValidationContext { + validateSignature(message: StreamMessage): Promise + destroy(): void +} + diff --git a/packages/sdk/src/signature/SignatureValidationWorker.ts b/packages/sdk/src/signature/SignatureValidationWorker.ts new file mode 100644 index 0000000000..11c8cf1626 --- /dev/null +++ b/packages/sdk/src/signature/SignatureValidationWorker.ts @@ -0,0 +1,31 @@ +import * as Comlink from 'comlink' +import { validateSignatureData, SignatureValidationResult, SignatureValidationData } from './signatureValidation' + +const workerApi = { + validateSignature: async (data: SignatureValidationData): Promise => { + return validateSignatureData(data) + } +} + +export type SignatureValidationWorkerApi = typeof workerApi + +// Detect environment and expose accordingly +// Check for Node.js worker_threads first, since `self` is defined in both environments +// but only browser Web Workers have WorkerGlobalScope with addEventListener +let parentPort: import('worker_threads').MessagePort | null = null +try { + // eslint-disable-next-line @typescript-eslint/no-require-imports + parentPort = require('worker_threads').parentPort +} catch { + // Not in Node.js environment +} + +if (parentPort) { + // Node.js Worker Thread + // eslint-disable-next-line @typescript-eslint/no-require-imports + const nodeEndpoint = require('comlink/dist/umd/node-adapter') + Comlink.expose(workerApi, nodeEndpoint(parentPort)) +} else { + // Browser Web Worker + Comlink.expose(workerApi) +} diff --git a/packages/sdk/src/signature/SignatureValidator.ts b/packages/sdk/src/signature/SignatureValidator.ts index 0c7ba17605..bbbf0ac3d9 100644 --- a/packages/sdk/src/signature/SignatureValidator.ts +++ b/packages/sdk/src/signature/SignatureValidator.ts @@ -1,26 +1,30 @@ -import { toEthereumAddress, toUserIdRaw, SigningUtil } from '@streamr/utils' +import { toEthereumAddress } from '@streamr/utils' import { Lifecycle, scoped } from 'tsyringe' import { ERC1271ContractFacade } from '../contracts/ERC1271ContractFacade' +import { DestroySignal } from '../DestroySignal' import { StreamMessage } from '../protocol/StreamMessage' import { StreamrClientError } from '../StreamrClientError' -import { createLegacySignaturePayload } from './createLegacySignaturePayload' import { createSignaturePayload } from './createSignaturePayload' +import { SignatureValidationContext } from './SignatureValidationContext' +// This import will be swapped to BrowserSignatureValidation.mts in browser builds +import SignatureValidation from './ServerSignatureValidation' import { SignatureType } from '@streamr/trackerless-network' -import { IDENTITY_MAPPING } from '../identity/IdentityMapping' - -// Lookup structure SignatureType -> SigningUtil -const signingUtilBySignatureType: Record = Object.fromEntries( - IDENTITY_MAPPING.map((idMapping) => [idMapping.signatureType, SigningUtil.getInstance(idMapping.keyType)]) -) - -const evmSigner = SigningUtil.getInstance('ECDSA_SECP256K1_EVM') @scoped(Lifecycle.ContainerScoped) export class SignatureValidator { private readonly erc1271ContractFacade: ERC1271ContractFacade + private validationContext: SignatureValidationContext | undefined - constructor(erc1271ContractFacade: ERC1271ContractFacade) { + constructor( + erc1271ContractFacade: ERC1271ContractFacade, + destroySignal: DestroySignal + ) { this.erc1271ContractFacade = erc1271ContractFacade + destroySignal.onDestroy.listen(() => this.destroy()) + } + + private getValidationContext(): SignatureValidationContext { + return this.validationContext ??= new SignatureValidation() } /** @@ -41,36 +45,33 @@ export class SignatureValidator { } private async validate(streamMessage: StreamMessage): Promise { - const signingUtil = signingUtilBySignatureType[streamMessage.signatureType] - - // Common case - if (signingUtil) { - return signingUtil.verifySignature( - toUserIdRaw(streamMessage.getPublisherId()), - createSignaturePayload(streamMessage), - streamMessage.signature - ) - } - - // Special handling: different payload computation, same SigningUtil - if (streamMessage.signatureType === SignatureType.ECDSA_SECP256K1_LEGACY) { - return evmSigner.verifySignature( - // publisherId is hex encoded Ethereum address string - toUserIdRaw(streamMessage.getPublisherId()), - createLegacySignaturePayload(streamMessage), - streamMessage.signature - ) - } - - // Special handling: check signature with ERC-1271 contract facade if (streamMessage.signatureType === SignatureType.ERC_1271) { return this.erc1271ContractFacade.isValidSignature( - toEthereumAddress(streamMessage.getPublisherId()), + toEthereumAddress(streamMessage.messageId.publisherId), createSignaturePayload(streamMessage), streamMessage.signature ) } + const result = await this.getValidationContext().validateSignature(streamMessage) + switch (result.type) { + case 'valid': + return true + case 'invalid': + return false + case 'error': + throw new Error(result.message) + default: + throw new Error(`Unknown signature validation result type '${result}'`) + } + } - throw new Error(`Cannot validate message signature, unsupported signatureType: "${streamMessage.signatureType}"`) + /** + * Cleanup worker resources when the validator is no longer needed. + */ + destroy(): void { + if (this.validationContext) { + this.validationContext.destroy() + this.validationContext = undefined + } } } diff --git a/packages/sdk/src/signature/createLegacySignaturePayload.ts b/packages/sdk/src/signature/createLegacySignaturePayload.ts index 3aadd0828e..42c785c2fe 100644 --- a/packages/sdk/src/signature/createLegacySignaturePayload.ts +++ b/packages/sdk/src/signature/createLegacySignaturePayload.ts @@ -1,7 +1,6 @@ import { binaryToHex, binaryToUtf8 } from '@streamr/utils' import { EncryptedGroupKey, EncryptionType } from '@streamr/trackerless-network' -import { MessageID } from '../protocol/MessageID' -import { MessageRef } from '../protocol/MessageRef' +import { MessageIdLike, MessageRefLike } from './createSignaturePayload' const serializeGroupKey = ({ id, data }: EncryptedGroupKey): string => { return JSON.stringify([id, binaryToHex(data)]) @@ -11,10 +10,10 @@ const serializeGroupKey = ({ id, data }: EncryptedGroupKey): string => { * Only to be used for LEGACY_SECP256K1 signature type. */ export const createLegacySignaturePayload = (opts: { - messageId: MessageID + messageId: MessageIdLike content: Uint8Array encryptionType: EncryptionType - prevMsgRef?: MessageRef + prevMsgRef?: MessageRefLike newGroupKey?: EncryptedGroupKey }): Uint8Array => { const prev = ((opts.prevMsgRef !== undefined) ? `${opts.prevMsgRef.timestamp}${opts.prevMsgRef.sequenceNumber}` : '') diff --git a/packages/sdk/src/signature/createSignaturePayload.ts b/packages/sdk/src/signature/createSignaturePayload.ts index 9cfadf7d32..0162139680 100644 --- a/packages/sdk/src/signature/createSignaturePayload.ts +++ b/packages/sdk/src/signature/createSignaturePayload.ts @@ -3,16 +3,34 @@ import { GroupKeyRequest as NewGroupKeyRequest, GroupKeyResponse as NewGroupKeyResponse } from '@streamr/trackerless-network' -import { utf8ToBinary } from '@streamr/utils' -import { MessageID } from '../protocol/MessageID' -import { MessageRef } from '../protocol/MessageRef' +import { StreamID, UserID, utf8ToBinary } from '@streamr/utils' import { StreamMessageType } from '../protocol/StreamMessage' +/** + * Plain data for message ID - accepts class instances or plain objects with the same properties. + */ +export interface MessageIdLike { + streamId: StreamID + streamPartition: number + timestamp: number + sequenceNumber: number + publisherId: UserID + msgChainId: string +} + +/** + * Plain data for message reference - accepts class instances or plain objects with the same properties. + */ +export interface MessageRefLike { + timestamp: number + sequenceNumber: number +} + export const createSignaturePayload = (opts: { - messageId: MessageID + messageId: MessageIdLike content: Uint8Array messageType: StreamMessageType - prevMsgRef?: MessageRef + prevMsgRef?: MessageRefLike newGroupKey?: EncryptedGroupKey }): Uint8Array | never => { const header = Buffer.concat([ diff --git a/packages/sdk/src/signature/signatureValidation.ts b/packages/sdk/src/signature/signatureValidation.ts new file mode 100644 index 0000000000..6f1cd3d348 --- /dev/null +++ b/packages/sdk/src/signature/signatureValidation.ts @@ -0,0 +1,112 @@ +/** + * Core signature validation logic - shared between worker and main thread implementations. + * This file contains pure cryptographic validation functions without any network dependencies. + */ +import { SigningUtil, toUserIdRaw } from '@streamr/utils' +import { EncryptedGroupKey, EncryptionType, SignatureType } from '@streamr/trackerless-network' +import { IDENTITY_MAPPING } from '../identity/IdentityMapping' +import { createSignaturePayload, MessageIdLike, MessageRefLike } from './createSignaturePayload' +import { createLegacySignaturePayload } from './createLegacySignaturePayload' +import { StreamMessage, StreamMessageType } from '../protocol/StreamMessage' + +// Lookup structure SignatureType -> SigningUtil +const signingUtilBySignatureType: Record = Object.fromEntries( + IDENTITY_MAPPING.map((idMapping) => [idMapping.signatureType, SigningUtil.getInstance(idMapping.keyType)]) +) + +const evmSigner = SigningUtil.getInstance('ECDSA_SECP256K1_EVM') + +/** + * Result of signature validation + */ +export type SignatureValidationResult = + | { type: 'valid' } + | { type: 'invalid' } + | { type: 'error', message: string } + +/** + * Plain data type for signature validation that can be serialized to a worker. + * This contains only primitive values and simple objects (no class instances). + */ +export interface SignatureValidationData { + messageId: MessageIdLike + prevMsgRef?: MessageRefLike + messageType: StreamMessageType + content: Uint8Array + signature: Uint8Array + signatureType: SignatureType + encryptionType: EncryptionType + newGroupKey?: EncryptedGroupKey +} + +/** + * Extract plain serializable data from a StreamMessage for worker communication. + */ +export function toSignatureValidationData(message: StreamMessage): SignatureValidationData { + return { + messageId: { + streamId: message.messageId.streamId, + streamPartition: message.messageId.streamPartition, + timestamp: message.messageId.timestamp, + sequenceNumber: message.messageId.sequenceNumber, + publisherId: message.messageId.publisherId, + msgChainId: message.messageId.msgChainId, + }, + prevMsgRef: message.prevMsgRef ? { + timestamp: message.prevMsgRef.timestamp, + sequenceNumber: message.prevMsgRef.sequenceNumber, + } : undefined, + messageType: message.messageType, + content: message.content, + signature: message.signature, + signatureType: message.signatureType, + encryptionType: message.encryptionType, + newGroupKey: message.newGroupKey, + } +} + +/** + * Validate signature using extracted data. + * This is the core validation logic that can be run in a worker. + */ +export async function validateSignatureData(data: SignatureValidationData): Promise { + try { + const signingUtil = signingUtilBySignatureType[data.signatureType] + // Common case: standard signature types + if (signingUtil) { + const payload = createSignaturePayload({ + messageId: data.messageId, + content: data.content, + messageType: data.messageType, + prevMsgRef: data.prevMsgRef, + newGroupKey: data.newGroupKey, + }) + const isValid = await signingUtil.verifySignature( + toUserIdRaw(data.messageId.publisherId), + payload, + data.signature + ) + return isValid ? { type: 'valid' } : { type: 'invalid' } + } + // Special handling: legacy signature type + if (data.signatureType === SignatureType.ECDSA_SECP256K1_LEGACY) { + const payload = createLegacySignaturePayload({ + messageId: data.messageId, + content: data.content, + encryptionType: data.encryptionType, + prevMsgRef: data.prevMsgRef, + newGroupKey: data.newGroupKey, + }) + const isValid = await evmSigner.verifySignature( + toUserIdRaw(data.messageId.publisherId), + payload, + data.signature + ) + return isValid ? { type: 'valid' } : { type: 'invalid' } + } + return { type: 'error', message: `Unsupported signatureType: "${data.signatureType}"` } + } catch (err) { + return { type: 'error', message: String(err) } + } +} + diff --git a/packages/sdk/test/unit/MessageFactory.test.ts b/packages/sdk/test/unit/MessageFactory.test.ts index 93c17213a4..2d93a05696 100644 --- a/packages/sdk/test/unit/MessageFactory.test.ts +++ b/packages/sdk/test/unit/MessageFactory.test.ts @@ -16,6 +16,7 @@ import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMes import { EthereumKeyPairIdentity } from '../../src/identity/EthereumKeyPairIdentity' import { EncryptionType, SignatureType, ContentType } from '@streamr/trackerless-network' import { StrictStreamrClientConfig } from '../../src/Config' +import { DestroySignal } from '../../src/DestroySignal' const CONTENT = { foo: 'bar' } const TIMESTAMP = Date.parse('2001-02-03T04:05:06Z') @@ -58,7 +59,7 @@ describe('MessageFactory', () => { isStreamPublisher: true }), groupKeyQueue: await createGroupKeyQueue(identity, GROUP_KEY), - signatureValidator: new SignatureValidator(opts?.erc1271ContractFacade ?? mock()), + signatureValidator: new SignatureValidator(opts?.erc1271ContractFacade ?? mock(), new DestroySignal()), messageSigner: new MessageSigner(identity), config: { validation: { diff --git a/packages/sdk/test/unit/SignatureValidator.test.ts b/packages/sdk/test/unit/SignatureValidator.test.ts index f303546dea..8b199679b7 100644 --- a/packages/sdk/test/unit/SignatureValidator.test.ts +++ b/packages/sdk/test/unit/SignatureValidator.test.ts @@ -11,6 +11,7 @@ import { StreamrClientError } from './../../src/StreamrClientError' import { MessageID } from './../../src/protocol/MessageID' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { ContentType, EncryptionType, SignatureType } from '@streamr/trackerless-network' +import { DestroySignal } from '../../src/DestroySignal' describe('SignatureValidator', () => { let erc1271ContractFacade: MockProxy @@ -18,7 +19,7 @@ describe('SignatureValidator', () => { beforeEach(() => { erc1271ContractFacade = mock() - signatureValidator = new SignatureValidator(erc1271ContractFacade) + signatureValidator = new SignatureValidator(erc1271ContractFacade, new DestroySignal()) }) describe('SECP256K1', () => { diff --git a/packages/sdk/test/unit/messagePipeline.test.ts b/packages/sdk/test/unit/messagePipeline.test.ts index ad7c0e5699..5cf75a458a 100644 --- a/packages/sdk/test/unit/messagePipeline.test.ts +++ b/packages/sdk/test/unit/messagePipeline.test.ts @@ -88,7 +88,7 @@ describe('messagePipeline', () => { getStorageNodes: undefined as any, resends: undefined as any, streamRegistry: streamRegistry as any, - signatureValidator: new SignatureValidator(mock()), + signatureValidator: new SignatureValidator(mock(), destroySignal), groupKeyManager: new GroupKeyManager( mock(), groupKeyStore, diff --git a/packages/sdk/test/unit/validateStreamMessage.test.ts b/packages/sdk/test/unit/validateStreamMessage.test.ts index 851521fb8c..e60c253d82 100644 --- a/packages/sdk/test/unit/validateStreamMessage.test.ts +++ b/packages/sdk/test/unit/validateStreamMessage.test.ts @@ -12,6 +12,7 @@ import { validateStreamMessage } from '../../src/utils/validateStreamMessage' import { createMockMessage } from '../test-utils/utils' import { StreamMessage } from './../../src/protocol/StreamMessage' import { StrictStreamrClientConfig } from '../../src/Config' +import { DestroySignal } from '../../src/DestroySignal' const PARTITION_COUNT = 3 @@ -47,7 +48,7 @@ describe('Validator', () => { await validateStreamMessage( msg, streamRegistry as any, - new SignatureValidator(mock()), + new SignatureValidator(mock(), new DestroySignal()), { validation: { permissions: true, diff --git a/packages/sdk/test/unit/validateStreamMessage2.test.ts b/packages/sdk/test/unit/validateStreamMessage2.test.ts index 1b570fe421..cd2cfec32f 100644 --- a/packages/sdk/test/unit/validateStreamMessage2.test.ts +++ b/packages/sdk/test/unit/validateStreamMessage2.test.ts @@ -14,6 +14,7 @@ import { MessageID } from './../../src/protocol/MessageID' import { MessageRef } from './../../src/protocol/MessageRef' import { StreamMessage, StreamMessageType } from './../../src/protocol/StreamMessage' import { StrictStreamrClientConfig } from '../../src/Config' +import { DestroySignal } from '../../src/DestroySignal' const groupKeyRequestToStreamMessage = async ( groupKeyRequest: GroupKeyRequest, @@ -71,7 +72,7 @@ describe('Validator2', () => { isStreamPublisher: (streamId: string, userId: UserID) => isPublisher(userId, streamId), isStreamSubscriber: (streamId: string, userId: UserID) => isSubscriber(userId, streamId) } as any, - new SignatureValidator(mock()), + new SignatureValidator(mock(), new DestroySignal()), { validation: { permissions: true, diff --git a/packages/sdk/webpack.config.js b/packages/sdk/webpack.config.js index d7005da143..d996bbe75d 100644 --- a/packages/sdk/webpack.config.js +++ b/packages/sdk/webpack.config.js @@ -67,7 +67,8 @@ module.exports = (env, argv) => { GIT_BRANCH: gitRevisionPlugin.branch(), }), new webpack.optimize.LimitChunkCountPlugin({ - maxChunks: 1 + // Allow 2 chunks: main bundle + signature validation worker + maxChunks: 2 }) ], performance: {