From 502a491a292ba01dc47f578e1d0f45ce63441df3 Mon Sep 17 00:00:00 2001 From: jonaslagoni Date: Thu, 8 Jan 2026 21:20:04 +0100 Subject: [PATCH 1/4] wip --- assets/videos/remotion-test | 1 + .../protocols/amqp/publishExchange.ts | 15 +- .../channels/protocols/amqp/publishQueue.ts | 15 +- .../channels/protocols/kafka/publish.ts | 33 +- .../channels/protocols/mqtt/publish.ts | 32 +- .../channels/protocols/websocket/subscribe.ts | 2 +- src/codegen/generators/typescript/payloads.ts | 9 +- src/codegen/modelina/presets/index.ts | 6 + src/codegen/modelina/presets/primitives.ts | 170 +++++ .../__snapshots__/channels.spec.ts.snap | 8 +- test/runtime/asyncapi-regular.json | 135 ++++ test/runtime/typescript/src/Types.ts | 21 +- test/runtime/typescript/src/channels/amqp.ts | 336 ++++++++- .../typescript/src/channels/event_source.ts | 294 +++++++- test/runtime/typescript/src/channels/kafka.ts | 291 +++++++- test/runtime/typescript/src/channels/mqtt.ts | 289 +++++++- test/runtime/typescript/src/channels/nats.ts | 693 +++++++++++++++++- .../typescript/src/channels/websocket.ts | 531 +++++++++++++- .../typescript/src/client/NatsClient.ts | 470 ++++++++++++ ...FindPetsByStatusAndCategoryResponse_200.ts | 40 + .../src/payloads/AnonymousSchema_9.ts | 71 ++ .../typescript/src/payloads/ArrayMessage.ts | 33 + .../typescript/src/payloads/StringMessage.ts | 30 + .../typescript/src/payloads/UnionMessage.ts | 38 + test/runtime/typescript/test/payloads.spec.ts | 116 +++ 25 files changed, 3619 insertions(+), 60 deletions(-) create mode 160000 assets/videos/remotion-test create mode 100644 src/codegen/modelina/presets/primitives.ts create mode 100644 test/runtime/typescript/src/payloads/AnonymousSchema_9.ts create mode 100644 test/runtime/typescript/src/payloads/ArrayMessage.ts create mode 100644 test/runtime/typescript/src/payloads/StringMessage.ts create mode 100644 test/runtime/typescript/src/payloads/UnionMessage.ts diff --git a/assets/videos/remotion-test b/assets/videos/remotion-test new file mode 160000 index 00000000..58d77ef5 --- /dev/null +++ b/assets/videos/remotion-test @@ -0,0 +1 @@ +Subproject commit 58d77ef55f0ecc86b70ccd2b149abd1e779f21c2 diff --git a/src/codegen/generators/typescript/channels/protocols/amqp/publishExchange.ts b/src/codegen/generators/typescript/channels/protocols/amqp/publishExchange.ts index f03d0297..56f19b0c 100644 --- a/src/codegen/generators/typescript/channels/protocols/amqp/publishExchange.ts +++ b/src/codegen/generators/typescript/channels/protocols/amqp/publishExchange.ts @@ -24,10 +24,9 @@ export function renderPublishExchange({ messageMarshalling = `${messageModule}.marshal(message)`; } messageType = messageModule ? `${messageModule}.${messageType}` : messageType; - const publishOperation = `let dataToSend: any = ${messageType === 'null' ? 'null' : messageMarshalling}; -const channel = await amqp.createChannel(); -const routingKey = ${addressToUse}; -// Set up message properties (headers) if provided + + const headersHandling = channelHeaders + ? `// Set up message properties (headers) if provided let publishOptions = { ...options }; if (headers) { const headerData = headers.marshal(); @@ -38,7 +37,13 @@ if (headers) { publishOptions.headers[key] = value; } } -} +}` + : `let publishOptions = { ...options };`; + + const publishOperation = `let dataToSend: any = ${messageType === 'null' ? 'null' : messageMarshalling}; +const channel = await amqp.createChannel(); +const routingKey = ${addressToUse}; +${headersHandling} channel.publish(exchange, routingKey, Buffer.from(dataToSend), publishOptions);`; const functionParameters = [ diff --git a/src/codegen/generators/typescript/channels/protocols/amqp/publishQueue.ts b/src/codegen/generators/typescript/channels/protocols/amqp/publishQueue.ts index 2fc05a83..15dece26 100644 --- a/src/codegen/generators/typescript/channels/protocols/amqp/publishQueue.ts +++ b/src/codegen/generators/typescript/channels/protocols/amqp/publishQueue.ts @@ -21,10 +21,9 @@ export function renderPublishQueue({ messageMarshalling = `${messageModule}.marshal(message)`; } messageType = messageModule ? `${messageModule}.${messageType}` : messageType; - const publishOperation = `let dataToSend: any = ${messageType === 'null' ? 'null' : messageMarshalling}; -const channel = await amqp.createChannel(); -const queue = ${addressToUse}; -// Set up message properties (headers) if provided + + const headersHandling = channelHeaders + ? `// Set up message properties (headers) if provided let publishOptions = { ...options }; if (headers) { const headerData = headers.marshal(); @@ -35,7 +34,13 @@ if (headers) { publishOptions.headers[key] = value; } } -} +}` + : `let publishOptions = { ...options };`; + + const publishOperation = `let dataToSend: any = ${messageType === 'null' ? 'null' : messageMarshalling}; +const channel = await amqp.createChannel(); +const queue = ${addressToUse}; +${headersHandling} channel.sendToQueue(queue, Buffer.from(dataToSend), publishOptions);`; const functionParameters = [ diff --git a/src/codegen/generators/typescript/channels/protocols/kafka/publish.ts b/src/codegen/generators/typescript/channels/protocols/kafka/publish.ts index 867d83cb..bea7bcc6 100644 --- a/src/codegen/generators/typescript/channels/protocols/kafka/publish.ts +++ b/src/codegen/generators/typescript/channels/protocols/kafka/publish.ts @@ -58,6 +58,23 @@ export function renderPublish({ } ]; + const headersHandling = channelHeaders + ? `// Set up headers if provided + let messageHeaders: Record | undefined = undefined; + if (headers) { + const headerData = headers.marshal(); + const parsedHeaders = typeof headerData === 'string' ? JSON.parse(headerData) : headerData; + messageHeaders = {}; + for (const [key, value] of Object.entries(parsedHeaders)) { + if (value !== undefined) { + messageHeaders[key] = String(value); + } + } + }` + : ''; + + const headersInMessage = channelHeaders ? 'headers: messageHeaders' : ''; + const code = `/** * Kafka publish operation for \`${topic}\` * @@ -73,25 +90,13 @@ function ${functionName}({ ${publishOperation} const producer = kafka.producer(); await producer.connect(); - // Set up headers if provided - let messageHeaders: Record | undefined = undefined; - if (headers) { - const headerData = headers.marshal(); - const parsedHeaders = typeof headerData === 'string' ? JSON.parse(headerData) : headerData; - messageHeaders = {}; - for (const [key, value] of Object.entries(parsedHeaders)) { - if (value !== undefined) { - messageHeaders[key] = String(value); - } - } - } + ${headersHandling} await producer.send({ topic: ${addressToUse}, messages: [ { - value: dataToSend, - headers: messageHeaders + value: dataToSend${channelHeaders ? ',\n ' + headersInMessage : ''} }, ], }); diff --git a/src/codegen/generators/typescript/channels/protocols/mqtt/publish.ts b/src/codegen/generators/typescript/channels/protocols/mqtt/publish.ts index 76e4f063..271364d3 100644 --- a/src/codegen/generators/typescript/channels/protocols/mqtt/publish.ts +++ b/src/codegen/generators/typescript/channels/protocols/mqtt/publish.ts @@ -21,35 +21,29 @@ export function renderPublish({ messageMarshalling = `${messageModule}.marshal(message)`; } messageType = messageModule ? `${messageModule}.${messageType}` : messageType; - const publishOperation = - messageType === 'null' - ? `// Set up user properties (headers) if provided - let publishOptions: Mqtt.IClientPublishOptions = {}; - if (headers) { - const headerData = headers.marshal(); - const parsedHeaders = typeof headerData === 'string' ? JSON.parse(headerData) : headerData; - publishOptions.properties = { userProperties: {} }; - for (const [key, value] of Object.entries(parsedHeaders)) { - if (value !== undefined) { - publishOptions.properties.userProperties[key] = String(value); - } - } - } - mqtt.publish(${addressToUse}, '', publishOptions);` - : `let dataToSend: any = ${messageMarshalling}; - // Set up user properties (headers) if provided + + const headersHandling = channelHeaders + ? `// Set up user properties (headers) if provided let publishOptions: Mqtt.IClientPublishOptions = {}; if (headers) { const headerData = headers.marshal(); const parsedHeaders = typeof headerData === 'string' ? JSON.parse(headerData) : headerData; - const userProperties = {}; + const userProperties: Record = {}; for (const [key, value] of Object.entries(parsedHeaders)) { if (value !== undefined) { userProperties[key] = String(value); } } publishOptions.properties = { userProperties }; - } + }` + : `let publishOptions: Mqtt.IClientPublishOptions = {};`; + + const publishOperation = + messageType === 'null' + ? `${headersHandling} + mqtt.publish(${addressToUse}, '', publishOptions);` + : `let dataToSend: any = ${messageMarshalling}; + ${headersHandling} mqtt.publish(${addressToUse}, dataToSend, publishOptions);`; const functionParameters = [ diff --git a/src/codegen/generators/typescript/channels/protocols/websocket/subscribe.ts b/src/codegen/generators/typescript/channels/protocols/websocket/subscribe.ts index f040c415..a86b4cba 100644 --- a/src/codegen/generators/typescript/channels/protocols/websocket/subscribe.ts +++ b/src/codegen/generators/typescript/channels/protocols/websocket/subscribe.ts @@ -99,7 +99,7 @@ function ${functionName}({ // Validate message if validation is enabled if (!skipMessageValidation) { - const messageToValidate = parsedMessage.marshal(); + const messageToValidate = ${messageModule ? `${messageModule}.marshal(parsedMessage)` : `parsedMessage.marshal()`}; const {valid, errors} = ${messageModule ? `${messageModule}.validate({data: messageToValidate, ajvValidatorFunction: validator})` : `${messageType}.validate({data: messageToValidate, ajvValidatorFunction: validator})`}; if (!valid) { onDataCallback({ diff --git a/src/codegen/generators/typescript/payloads.ts b/src/codegen/generators/typescript/payloads.ts index 9dd2db06..8b5df5f7 100644 --- a/src/codegen/generators/typescript/payloads.ts +++ b/src/codegen/generators/typescript/payloads.ts @@ -17,7 +17,8 @@ import {OpenAPIV2, OpenAPIV3, OpenAPIV3_1} from 'openapi-types'; import {TS_COMMON_PRESET} from '@asyncapi/modelina'; import { createValidationPreset, - createUnionPreset + createUnionPreset, + createPrimitivesPreset } from '../../modelina/presets'; export const zodTypeScriptPayloadGenerator = z.object({ @@ -144,6 +145,12 @@ export async function generateTypescriptPayloadsCoreFromSchemas({ includeValidation: generator.includeValidation }, context + ), + createPrimitivesPreset( + { + includeValidation: generator.includeValidation + }, + context ) ], enumType: generator.enum, diff --git a/src/codegen/modelina/presets/index.ts b/src/codegen/modelina/presets/index.ts index 35b93013..69ab474b 100644 --- a/src/codegen/modelina/presets/index.ts +++ b/src/codegen/modelina/presets/index.ts @@ -8,3 +8,9 @@ export { // Core union marshalling/unmarshalling presets export {createUnionPreset, type UnionPresetOptions} from './union'; + +// Primitive and array type marshalling/unmarshalling presets +export { + createPrimitivesPreset, + type PrimitivesPresetOptions +} from './primitives'; diff --git a/src/codegen/modelina/presets/primitives.ts b/src/codegen/modelina/presets/primitives.ts new file mode 100644 index 00000000..d0ff05b2 --- /dev/null +++ b/src/codegen/modelina/presets/primitives.ts @@ -0,0 +1,170 @@ +import { + ConstrainedMetaModel, + ConstrainedStringModel, + ConstrainedIntegerModel, + ConstrainedFloatModel, + ConstrainedBooleanModel, + ConstrainedArrayModel +} from '@asyncapi/modelina'; +import {TypeScriptRenderer} from '@asyncapi/modelina/lib/types/generators/typescript/TypeScriptRenderer'; +import { + BaseGeneratorContext, + generateTypescriptValidationCode +} from './validation'; + +/** + * Configuration options for the primitives preset + */ +export interface PrimitivesPresetOptions { + /** Whether to include validation methods in generated primitive types */ + includeValidation: boolean; +} + +/** + * Check if the model is a primitive type (string, integer, float, boolean) + */ +function isPrimitiveModel(model: ConstrainedMetaModel): boolean { + return ( + model instanceof ConstrainedStringModel || + model instanceof ConstrainedIntegerModel || + model instanceof ConstrainedFloatModel || + model instanceof ConstrainedBooleanModel + ); +} + +/** + * Render marshal function for primitive types + */ +function renderPrimitiveMarshal(model: ConstrainedMetaModel): string { + return `export function marshal(payload: ${model.name}): string { + return JSON.stringify(payload); +}`; +} + +/** + * Render unmarshal function for primitive types + */ +function renderPrimitiveUnmarshal(model: ConstrainedMetaModel): string { + // For string types, the input can be a JSON string (quoted) or the raw value + // We use 'any' for the json parameter since JSON.parse returns 'any' + return `export function unmarshal(json: string): ${model.name} { + return JSON.parse(json) as ${model.name}; +}`; +} + +/** + * Render marshal function for array types + */ +function renderArrayMarshal(model: ConstrainedArrayModel): string { + const valueModel = model.valueModel; + + // Check if array items have a marshal method (object types) + const hasItemMarshal = + valueModel.type !== 'string' && + valueModel.type !== 'number' && + valueModel.type !== 'boolean'; + + if (hasItemMarshal) { + return `export function marshal(payload: ${model.name}): string { + return JSON.stringify(payload.map((item) => { + if (item && typeof item === 'object' && 'marshal' in item && typeof item.marshal === 'function') { + return JSON.parse(item.marshal()); + } + return item; + })); +}`; + } + + return `export function marshal(payload: ${model.name}): string { + return JSON.stringify(payload); +}`; +} + +/** + * Render unmarshal function for array types + */ +function renderArrayUnmarshal(model: ConstrainedArrayModel): string { + const valueModel = model.valueModel; + + // Check if array items have an unmarshal method (object types) + const hasItemUnmarshal = + valueModel.type !== 'string' && + valueModel.type !== 'number' && + valueModel.type !== 'boolean'; + + if (hasItemUnmarshal) { + return `export function unmarshal(json: string | any[]): ${model.name} { + const arr = typeof json === 'string' ? JSON.parse(json) : json; + return arr.map((item: any) => { + if (item && typeof item === 'object') { + // Try to use unmarshal if available on the type + return item; + } + return item; + }) as ${model.name}; +}`; + } + + return `export function unmarshal(json: string | any[]): ${model.name} { + if (typeof json === 'string') { + return JSON.parse(json) as ${model.name}; + } + return json as ${model.name}; +}`; +} + +/** + * Creates a preset that adds marshalling/unmarshalling and validation methods + * to primitive types (string, number, boolean) and array types + * + * @param options Configuration for primitive generation + * @param context Generator context containing input type information + * @returns Modelina preset object with primitive marshalling functionality + * + * @example + * ```typescript + * const preset = createPrimitivesPreset({ + * includeValidation: true + * }, context); + * ``` + */ +export function createPrimitivesPreset( + options: PrimitivesPresetOptions, + context: BaseGeneratorContext +) { + return { + type: { + self({ + model, + content, + renderer + }: { + model: ConstrainedMetaModel; + content: string; + renderer: TypeScriptRenderer; + }) { + // Handle primitive types (string, integer, float, boolean) + if (isPrimitiveModel(model)) { + return `${content} + +${renderPrimitiveUnmarshal(model)} +${renderPrimitiveMarshal(model)} +${options.includeValidation ? generateTypescriptValidationCode({model, renderer, asClassMethods: false, context: context as any}) : ''} +`; + } + + // Handle array types + if (model instanceof ConstrainedArrayModel) { + return `${content} + +${renderArrayUnmarshal(model)} +${renderArrayMarshal(model)} +${options.includeValidation ? generateTypescriptValidationCode({model, renderer, asClassMethods: false, context: context as any}) : ''} +`; + } + + return content; + } + } + }; +} diff --git a/test/codegen/generators/typescript/__snapshots__/channels.spec.ts.snap b/test/codegen/generators/typescript/__snapshots__/channels.spec.ts.snap index 33056eae..b4f2e39f 100644 --- a/test/codegen/generators/typescript/__snapshots__/channels.spec.ts.snap +++ b/test/codegen/generators/typescript/__snapshots__/channels.spec.ts.snap @@ -1956,7 +1956,7 @@ function publishToUserSignedup({ if (headers) { const headerData = headers.marshal(); const parsedHeaders = typeof headerData === 'string' ? JSON.parse(headerData) : headerData; - const userProperties = {}; + const userProperties: Record = {}; for (const [key, value] of Object.entries(parsedHeaders)) { if (value !== undefined) { userProperties[key] = String(value); @@ -2074,7 +2074,7 @@ function publishToNoParameter({ if (headers) { const headerData = headers.marshal(); const parsedHeaders = typeof headerData === 'string' ? JSON.parse(headerData) : headerData; - const userProperties = {}; + const userProperties: Record = {}; for (const [key, value] of Object.entries(parsedHeaders)) { if (value !== undefined) { userProperties[key] = String(value); @@ -2879,7 +2879,7 @@ function subscribeToUserSignedup({ // Validate message if validation is enabled if (!skipMessageValidation) { - const messageToValidate = parsedMessage.marshal(); + const messageToValidate = UserSignedUpPayloadModule.marshal(parsedMessage); const {valid, errors} = UserSignedUpPayloadModule.validate({data: messageToValidate, ajvValidatorFunction: validator}); if (!valid) { onDataCallback({ @@ -3062,7 +3062,7 @@ function subscribeToNoParameter({ // Validate message if validation is enabled if (!skipMessageValidation) { - const messageToValidate = parsedMessage.marshal(); + const messageToValidate = UserSignedUpPayloadModule.marshal(parsedMessage); const {valid, errors} = UserSignedUpPayloadModule.validate({data: messageToValidate, ajvValidatorFunction: validator}); if (!valid) { onDataCallback({ diff --git a/test/runtime/asyncapi-regular.json b/test/runtime/asyncapi-regular.json index 244b82e1..66f23506 100644 --- a/test/runtime/asyncapi-regular.json +++ b/test/runtime/asyncapi-regular.json @@ -29,6 +29,30 @@ "$ref": "#/components/messages/UserSignedUp" } } + }, + "stringPayload": { + "address": "string/payload", + "messages": { + "StringMessage": { + "$ref": "#/components/messages/StringMessage" + } + } + }, + "arrayPayload": { + "address": "array/payload", + "messages": { + "ArrayMessage": { + "$ref": "#/components/messages/ArrayMessage" + } + } + }, + "unionPayload": { + "address": "union/payload", + "messages": { + "UnionMessage": { + "$ref": "#/components/messages/UnionMessage" + } + } } }, "operations": { @@ -53,6 +77,72 @@ "$ref": "#/channels/userSignedup/messages/UserSignedUp" } ] + }, + "sendStringPayload": { + "action": "send", + "channel": { + "$ref": "#/channels/stringPayload" + }, + "messages": [ + { + "$ref": "#/channels/stringPayload/messages/StringMessage" + } + ] + }, + "receiveStringPayload": { + "action": "receive", + "channel": { + "$ref": "#/channels/stringPayload" + }, + "messages": [ + { + "$ref": "#/channels/stringPayload/messages/StringMessage" + } + ] + }, + "sendArrayPayload": { + "action": "send", + "channel": { + "$ref": "#/channels/arrayPayload" + }, + "messages": [ + { + "$ref": "#/channels/arrayPayload/messages/ArrayMessage" + } + ] + }, + "receiveArrayPayload": { + "action": "receive", + "channel": { + "$ref": "#/channels/arrayPayload" + }, + "messages": [ + { + "$ref": "#/channels/arrayPayload/messages/ArrayMessage" + } + ] + }, + "sendUnionPayload": { + "action": "send", + "channel": { + "$ref": "#/channels/unionPayload" + }, + "messages": [ + { + "$ref": "#/channels/unionPayload/messages/UnionMessage" + } + ] + }, + "receiveUnionPayload": { + "action": "receive", + "channel": { + "$ref": "#/channels/unionPayload" + }, + "messages": [ + { + "$ref": "#/channels/unionPayload/messages/UnionMessage" + } + ] } }, "components": { @@ -64,6 +154,21 @@ "headers": { "$ref": "#/components/schemas/UserSignedUpHeaders" } + }, + "StringMessage": { + "payload": { + "$ref": "#/components/schemas/StringPayload" + } + }, + "ArrayMessage": { + "payload": { + "$ref": "#/components/schemas/ArrayPayload" + } + }, + "UnionMessage": { + "payload": { + "$ref": "#/components/schemas/UnionPayload" + } } }, "schemas": { @@ -89,6 +194,36 @@ "description": "Test header" } } + }, + "StringPayload": { + "type": "string", + "description": "A simple string payload" + }, + "ArrayPayload": { + "type": "array", + "items": { + "type": "string" + }, + "description": "An array of strings payload" + }, + "UnionPayload": { + "oneOf": [ + { + "type": "string" + }, + { + "type": "number" + }, + { + "type": "object", + "properties": { + "name": { + "type": "string" + } + } + } + ], + "description": "A union type payload" } } } diff --git a/test/runtime/typescript/src/Types.ts b/test/runtime/typescript/src/Types.ts index 1800c880..58ffd8e1 100644 --- a/test/runtime/typescript/src/Types.ts +++ b/test/runtime/typescript/src/Types.ts @@ -1,11 +1,17 @@ -export type Topics = 'user/signedup/{my_parameter}/{enum_parameter}' | 'noparameters'; -export type TopicIds = 'userSignedup' | 'noParameter'; +export type Topics = 'user/signedup/{my_parameter}/{enum_parameter}' | 'noparameters' | 'string/payload' | 'array/payload' | 'union/payload'; +export type TopicIds = 'userSignedup' | 'noParameter' | 'stringPayload' | 'arrayPayload' | 'unionPayload'; export function ToTopicIds(topic: Topics): TopicIds { switch (topic) { case 'user/signedup/{my_parameter}/{enum_parameter}': return 'userSignedup'; case 'noparameters': return 'noParameter'; + case 'string/payload': + return 'stringPayload'; + case 'array/payload': + return 'arrayPayload'; + case 'union/payload': + return 'unionPayload'; default: throw new Error('Unknown topic: ' + topic); } @@ -16,11 +22,20 @@ export function ToTopics(topicId: TopicIds): Topics { return 'user/signedup/{my_parameter}/{enum_parameter}'; case 'noParameter': return 'noparameters'; + case 'stringPayload': + return 'string/payload'; + case 'arrayPayload': + return 'array/payload'; + case 'unionPayload': + return 'union/payload'; default: throw new Error('Unknown topic ID: ' + topicId); } } export const TopicsMap: Record = { 'userSignedup': 'user/signedup/{my_parameter}/{enum_parameter}', - 'noParameter': 'noparameters' + 'noParameter': 'noparameters', + 'stringPayload': 'string/payload', + 'arrayPayload': 'array/payload', + 'unionPayload': 'union/payload' }; diff --git a/test/runtime/typescript/src/channels/amqp.ts b/test/runtime/typescript/src/channels/amqp.ts index ec89d6cc..ae8cee5e 100644 --- a/test/runtime/typescript/src/channels/amqp.ts +++ b/test/runtime/typescript/src/channels/amqp.ts @@ -1,4 +1,8 @@ import {UserSignedUp} from './../payloads/UserSignedUp'; +import * as StringMessageModule from './../payloads/StringMessage'; +import * as ArrayMessageModule from './../payloads/ArrayMessage'; +import * as UnionMessageModule from './../payloads/UnionMessage'; +import {AnonymousSchema_9} from './../payloads/AnonymousSchema_9'; import {UserSignedupParameters} from './../parameters/UserSignedupParameters'; import {UserSignedUpHeaders} from './../headers/UserSignedUpHeaders'; import * as Amqp from 'amqplib'; @@ -308,4 +312,334 @@ channel.consume(queue, (msg) => { }); } -export { publishToSendUserSignedupExchange, publishToSendUserSignedupQueue, subscribeToReceiveUserSignedupQueue, publishToNoParameterExchange, publishToNoParameterQueue, subscribeToNoParameterQueue }; +/** + * AMQP publish operation for exchange `string/payload` + * + * @param message to publish + * @param amqp the AMQP connection to send over + * @param options for the AMQP publish exchange operation + */ +function publishToSendStringPayloadExchange({ + message, + amqp, + options +}: { + message: StringMessageModule.StringMessage, + amqp: Amqp.Connection, + options?: {exchange: string | undefined} & Amqp.Options.Publish +}): Promise { + return new Promise(async (resolve, reject) => { + const exchange = options?.exchange ?? 'undefined'; + if(!exchange) { + return reject('No exchange value found, please provide one') + } + try { + let dataToSend: any = StringMessageModule.marshal(message); +const channel = await amqp.createChannel(); +const routingKey = 'string/payload'; +let publishOptions = { ...options }; +channel.publish(exchange, routingKey, Buffer.from(dataToSend), publishOptions); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * AMQP publish operation for queue `string/payload` + * + * @param message to publish + * @param amqp the AMQP connection to send over + * @param options for the AMQP publish queue operation + */ +function publishToSendStringPayloadQueue({ + message, + amqp, + options +}: { + message: StringMessageModule.StringMessage, + amqp: Amqp.Connection, + options?: Amqp.Options.Publish +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = StringMessageModule.marshal(message); +const channel = await amqp.createChannel(); +const queue = 'string/payload'; +let publishOptions = { ...options }; +channel.sendToQueue(queue, Buffer.from(dataToSend), publishOptions); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * AMQP subscribe operation for queue `string/payload` + * + * @param {subscribeToReceiveStringPayloadQueueCallback} onDataCallback to call when messages are received + * @param amqp the AMQP connection to receive from + * @param options for the AMQP subscribe queue operation + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function subscribeToReceiveStringPayloadQueue({ + onDataCallback, + amqp, + options, + skipMessageValidation = false +}: { + onDataCallback: (params: {err?: Error, msg?: StringMessageModule.StringMessage, amqpMsg?: Amqp.ConsumeMessage}) => void, + amqp: Amqp.Connection, + options?: Amqp.Options.Consume, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const channel = await amqp.createChannel(); +const queue = 'string/payload'; +await channel.assertQueue(queue, { durable: true }); +const validator = StringMessageModule.createValidator(); +channel.consume(queue, (msg) => { + if (msg !== null) { + const receivedData = msg.content.toString() + + if(!skipMessageValidation) { + const {valid, errors} = StringMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback({err: new Error(`Invalid message payload received ${JSON.stringify({cause: errors})}`), msg: undefined, amqpMsg: msg}); return; + } + } + const message = StringMessageModule.unmarshal(receivedData); + onDataCallback({err: undefined, msg: message, amqpMsg: msg}); + } +}, options); + resolve(channel); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * AMQP publish operation for exchange `array/payload` + * + * @param message to publish + * @param amqp the AMQP connection to send over + * @param options for the AMQP publish exchange operation + */ +function publishToSendArrayPayloadExchange({ + message, + amqp, + options +}: { + message: ArrayMessageModule.ArrayMessage, + amqp: Amqp.Connection, + options?: {exchange: string | undefined} & Amqp.Options.Publish +}): Promise { + return new Promise(async (resolve, reject) => { + const exchange = options?.exchange ?? 'undefined'; + if(!exchange) { + return reject('No exchange value found, please provide one') + } + try { + let dataToSend: any = ArrayMessageModule.marshal(message); +const channel = await amqp.createChannel(); +const routingKey = 'array/payload'; +let publishOptions = { ...options }; +channel.publish(exchange, routingKey, Buffer.from(dataToSend), publishOptions); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * AMQP publish operation for queue `array/payload` + * + * @param message to publish + * @param amqp the AMQP connection to send over + * @param options for the AMQP publish queue operation + */ +function publishToSendArrayPayloadQueue({ + message, + amqp, + options +}: { + message: ArrayMessageModule.ArrayMessage, + amqp: Amqp.Connection, + options?: Amqp.Options.Publish +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = ArrayMessageModule.marshal(message); +const channel = await amqp.createChannel(); +const queue = 'array/payload'; +let publishOptions = { ...options }; +channel.sendToQueue(queue, Buffer.from(dataToSend), publishOptions); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * AMQP subscribe operation for queue `array/payload` + * + * @param {subscribeToReceiveArrayPayloadQueueCallback} onDataCallback to call when messages are received + * @param amqp the AMQP connection to receive from + * @param options for the AMQP subscribe queue operation + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function subscribeToReceiveArrayPayloadQueue({ + onDataCallback, + amqp, + options, + skipMessageValidation = false +}: { + onDataCallback: (params: {err?: Error, msg?: ArrayMessageModule.ArrayMessage, amqpMsg?: Amqp.ConsumeMessage}) => void, + amqp: Amqp.Connection, + options?: Amqp.Options.Consume, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const channel = await amqp.createChannel(); +const queue = 'array/payload'; +await channel.assertQueue(queue, { durable: true }); +const validator = ArrayMessageModule.createValidator(); +channel.consume(queue, (msg) => { + if (msg !== null) { + const receivedData = msg.content.toString() + + if(!skipMessageValidation) { + const {valid, errors} = ArrayMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback({err: new Error(`Invalid message payload received ${JSON.stringify({cause: errors})}`), msg: undefined, amqpMsg: msg}); return; + } + } + const message = ArrayMessageModule.unmarshal(receivedData); + onDataCallback({err: undefined, msg: message, amqpMsg: msg}); + } +}, options); + resolve(channel); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * AMQP publish operation for exchange `union/payload` + * + * @param message to publish + * @param amqp the AMQP connection to send over + * @param options for the AMQP publish exchange operation + */ +function publishToSendUnionPayloadExchange({ + message, + amqp, + options +}: { + message: UnionMessageModule.UnionMessage, + amqp: Amqp.Connection, + options?: {exchange: string | undefined} & Amqp.Options.Publish +}): Promise { + return new Promise(async (resolve, reject) => { + const exchange = options?.exchange ?? 'undefined'; + if(!exchange) { + return reject('No exchange value found, please provide one') + } + try { + let dataToSend: any = UnionMessageModule.marshal(message); +const channel = await amqp.createChannel(); +const routingKey = 'union/payload'; +let publishOptions = { ...options }; +channel.publish(exchange, routingKey, Buffer.from(dataToSend), publishOptions); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * AMQP publish operation for queue `union/payload` + * + * @param message to publish + * @param amqp the AMQP connection to send over + * @param options for the AMQP publish queue operation + */ +function publishToSendUnionPayloadQueue({ + message, + amqp, + options +}: { + message: UnionMessageModule.UnionMessage, + amqp: Amqp.Connection, + options?: Amqp.Options.Publish +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = UnionMessageModule.marshal(message); +const channel = await amqp.createChannel(); +const queue = 'union/payload'; +let publishOptions = { ...options }; +channel.sendToQueue(queue, Buffer.from(dataToSend), publishOptions); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * AMQP subscribe operation for queue `union/payload` + * + * @param {subscribeToReceiveUnionPayloadQueueCallback} onDataCallback to call when messages are received + * @param amqp the AMQP connection to receive from + * @param options for the AMQP subscribe queue operation + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function subscribeToReceiveUnionPayloadQueue({ + onDataCallback, + amqp, + options, + skipMessageValidation = false +}: { + onDataCallback: (params: {err?: Error, msg?: UnionMessageModule.UnionMessage, amqpMsg?: Amqp.ConsumeMessage}) => void, + amqp: Amqp.Connection, + options?: Amqp.Options.Consume, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const channel = await amqp.createChannel(); +const queue = 'union/payload'; +await channel.assertQueue(queue, { durable: true }); +const validator = UnionMessageModule.createValidator(); +channel.consume(queue, (msg) => { + if (msg !== null) { + const receivedData = msg.content.toString() + + if(!skipMessageValidation) { + const {valid, errors} = UnionMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback({err: new Error(`Invalid message payload received ${JSON.stringify({cause: errors})}`), msg: undefined, amqpMsg: msg}); return; + } + } + const message = UnionMessageModule.unmarshal(receivedData); + onDataCallback({err: undefined, msg: message, amqpMsg: msg}); + } +}, options); + resolve(channel); + } catch (e: any) { + reject(e); + } + }); +} + +export { publishToSendUserSignedupExchange, publishToSendUserSignedupQueue, subscribeToReceiveUserSignedupQueue, publishToNoParameterExchange, publishToNoParameterQueue, subscribeToNoParameterQueue, publishToSendStringPayloadExchange, publishToSendStringPayloadQueue, subscribeToReceiveStringPayloadQueue, publishToSendArrayPayloadExchange, publishToSendArrayPayloadQueue, subscribeToReceiveArrayPayloadQueue, publishToSendUnionPayloadExchange, publishToSendUnionPayloadQueue, subscribeToReceiveUnionPayloadQueue }; diff --git a/test/runtime/typescript/src/channels/event_source.ts b/test/runtime/typescript/src/channels/event_source.ts index a5567609..61e8622f 100644 --- a/test/runtime/typescript/src/channels/event_source.ts +++ b/test/runtime/typescript/src/channels/event_source.ts @@ -1,4 +1,8 @@ import {UserSignedUp} from './../payloads/UserSignedUp'; +import * as StringMessageModule from './../payloads/StringMessage'; +import * as ArrayMessageModule from './../payloads/ArrayMessage'; +import * as UnionMessageModule from './../payloads/UnionMessage'; +import {AnonymousSchema_9} from './../payloads/AnonymousSchema_9'; import {UserSignedupParameters} from './../parameters/UserSignedupParameters'; import {UserSignedUpHeaders} from './../headers/UserSignedUpHeaders'; import { NextFunction, Request, Response, Router } from 'express'; @@ -223,4 +227,292 @@ function registerNoParameter({ } -export { registerSendUserSignedup, listenForReceiveUserSignedup, listenForNoParameter, registerNoParameter }; +function registerSendStringPayload({ + router, + callback +}: { + router: Router, + callback: ((req: Request, res: Response, next: NextFunction, sendEvent: (message: StringMessageModule.StringMessage) => void) => void) | ((req: Request, res: Response, next: NextFunction, sendEvent: (message: StringMessageModule.StringMessage) => void) => Promise) +}): void { + const event = '/string/payload'; + router.get(event, async (req, res, next) => { + + res.writeHead(200, { + 'Cache-Control': 'no-cache, no-transform', + 'Content-Type': 'text/event-stream', + Connection: 'keep-alive', + 'Access-Control-Allow-Origin': '*', + }) + const sendEventCallback = (message: StringMessageModule.StringMessage) => { + if (res.closed) { + return + } + res.write(`event: ${event}\n`) + res.write(`data: ${StringMessageModule.marshal(message)}\n\n`) + } + await callback(req, res, next, sendEventCallback) + }) +} + + +/** + * Event source fetch for `string/payload` + * + * @param callback to call when receiving events + * @param options additionally used to handle the event source + * @param skipMessageValidation turn off runtime validation of incoming messages + * @returns A cleanup function to abort the connection + */ +function listenForReceiveStringPayload({ + callback, + options, + skipMessageValidation = false +}: { + callback: (params: {error?: Error, messageEvent?: StringMessageModule.StringMessage}) => void, + options: {authorization?: string, onClose?: (err?: string) => void, baseUrl: string, headers?: Record}, + skipMessageValidation?: boolean +}): (() => void) { + const controller = new AbortController(); + let eventsUrl: string = 'string/payload'; + const url = `${options.baseUrl}/${eventsUrl}` + const requestHeaders: Record = { + ...options.headers ?? {}, + Accept: 'text/event-stream' + } + if(options.authorization) { + requestHeaders['authorization'] = `Bearer ${options?.authorization}`; + } + + const validator = StringMessageModule.createValidator(); + fetchEventSource(`${url}`, { + method: 'GET', + headers: requestHeaders, + signal: controller.signal, + onmessage: (ev: EventSourceMessage) => { + const receivedData = ev.data; + if(!skipMessageValidation) { + const {valid, errors} = StringMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + return callback({error: new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), messageEvent: undefined}); + } + } + const callbackData = StringMessageModule.unmarshal(receivedData); + callback({error: undefined, messageEvent: callbackData}); + }, + onerror: (err) => { + options.onClose?.(err); + }, + onclose: () => { + options.onClose?.(); + }, + async onopen(response: { ok: any; headers: any; status: number }) { + if (response.ok && response.headers.get('content-type') === EventStreamContentType) { + return // everything's good + } else if (response.status >= 400 && response.status < 500 && response.status !== 429) { + // client-side errors are usually non-retriable: + callback({error: new Error('Client side error, could not open event connection'), messageEvent: undefined}) + } else { + callback({error: new Error('Unknown error, could not open event connection'), messageEvent: undefined}); + } + }, + }); + + return () => { + controller.abort(); + }; +} + + +function registerSendArrayPayload({ + router, + callback +}: { + router: Router, + callback: ((req: Request, res: Response, next: NextFunction, sendEvent: (message: ArrayMessageModule.ArrayMessage) => void) => void) | ((req: Request, res: Response, next: NextFunction, sendEvent: (message: ArrayMessageModule.ArrayMessage) => void) => Promise) +}): void { + const event = '/array/payload'; + router.get(event, async (req, res, next) => { + + res.writeHead(200, { + 'Cache-Control': 'no-cache, no-transform', + 'Content-Type': 'text/event-stream', + Connection: 'keep-alive', + 'Access-Control-Allow-Origin': '*', + }) + const sendEventCallback = (message: ArrayMessageModule.ArrayMessage) => { + if (res.closed) { + return + } + res.write(`event: ${event}\n`) + res.write(`data: ${ArrayMessageModule.marshal(message)}\n\n`) + } + await callback(req, res, next, sendEventCallback) + }) +} + + +/** + * Event source fetch for `array/payload` + * + * @param callback to call when receiving events + * @param options additionally used to handle the event source + * @param skipMessageValidation turn off runtime validation of incoming messages + * @returns A cleanup function to abort the connection + */ +function listenForReceiveArrayPayload({ + callback, + options, + skipMessageValidation = false +}: { + callback: (params: {error?: Error, messageEvent?: ArrayMessageModule.ArrayMessage}) => void, + options: {authorization?: string, onClose?: (err?: string) => void, baseUrl: string, headers?: Record}, + skipMessageValidation?: boolean +}): (() => void) { + const controller = new AbortController(); + let eventsUrl: string = 'array/payload'; + const url = `${options.baseUrl}/${eventsUrl}` + const requestHeaders: Record = { + ...options.headers ?? {}, + Accept: 'text/event-stream' + } + if(options.authorization) { + requestHeaders['authorization'] = `Bearer ${options?.authorization}`; + } + + const validator = ArrayMessageModule.createValidator(); + fetchEventSource(`${url}`, { + method: 'GET', + headers: requestHeaders, + signal: controller.signal, + onmessage: (ev: EventSourceMessage) => { + const receivedData = ev.data; + if(!skipMessageValidation) { + const {valid, errors} = ArrayMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + return callback({error: new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), messageEvent: undefined}); + } + } + const callbackData = ArrayMessageModule.unmarshal(receivedData); + callback({error: undefined, messageEvent: callbackData}); + }, + onerror: (err) => { + options.onClose?.(err); + }, + onclose: () => { + options.onClose?.(); + }, + async onopen(response: { ok: any; headers: any; status: number }) { + if (response.ok && response.headers.get('content-type') === EventStreamContentType) { + return // everything's good + } else if (response.status >= 400 && response.status < 500 && response.status !== 429) { + // client-side errors are usually non-retriable: + callback({error: new Error('Client side error, could not open event connection'), messageEvent: undefined}) + } else { + callback({error: new Error('Unknown error, could not open event connection'), messageEvent: undefined}); + } + }, + }); + + return () => { + controller.abort(); + }; +} + + +function registerSendUnionPayload({ + router, + callback +}: { + router: Router, + callback: ((req: Request, res: Response, next: NextFunction, sendEvent: (message: UnionMessageModule.UnionMessage) => void) => void) | ((req: Request, res: Response, next: NextFunction, sendEvent: (message: UnionMessageModule.UnionMessage) => void) => Promise) +}): void { + const event = '/union/payload'; + router.get(event, async (req, res, next) => { + + res.writeHead(200, { + 'Cache-Control': 'no-cache, no-transform', + 'Content-Type': 'text/event-stream', + Connection: 'keep-alive', + 'Access-Control-Allow-Origin': '*', + }) + const sendEventCallback = (message: UnionMessageModule.UnionMessage) => { + if (res.closed) { + return + } + res.write(`event: ${event}\n`) + res.write(`data: ${UnionMessageModule.marshal(message)}\n\n`) + } + await callback(req, res, next, sendEventCallback) + }) +} + + +/** + * Event source fetch for `union/payload` + * + * @param callback to call when receiving events + * @param options additionally used to handle the event source + * @param skipMessageValidation turn off runtime validation of incoming messages + * @returns A cleanup function to abort the connection + */ +function listenForReceiveUnionPayload({ + callback, + options, + skipMessageValidation = false +}: { + callback: (params: {error?: Error, messageEvent?: UnionMessageModule.UnionMessage}) => void, + options: {authorization?: string, onClose?: (err?: string) => void, baseUrl: string, headers?: Record}, + skipMessageValidation?: boolean +}): (() => void) { + const controller = new AbortController(); + let eventsUrl: string = 'union/payload'; + const url = `${options.baseUrl}/${eventsUrl}` + const requestHeaders: Record = { + ...options.headers ?? {}, + Accept: 'text/event-stream' + } + if(options.authorization) { + requestHeaders['authorization'] = `Bearer ${options?.authorization}`; + } + + const validator = UnionMessageModule.createValidator(); + fetchEventSource(`${url}`, { + method: 'GET', + headers: requestHeaders, + signal: controller.signal, + onmessage: (ev: EventSourceMessage) => { + const receivedData = ev.data; + if(!skipMessageValidation) { + const {valid, errors} = UnionMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + return callback({error: new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), messageEvent: undefined}); + } + } + const callbackData = UnionMessageModule.unmarshal(receivedData); + callback({error: undefined, messageEvent: callbackData}); + }, + onerror: (err) => { + options.onClose?.(err); + }, + onclose: () => { + options.onClose?.(); + }, + async onopen(response: { ok: any; headers: any; status: number }) { + if (response.ok && response.headers.get('content-type') === EventStreamContentType) { + return // everything's good + } else if (response.status >= 400 && response.status < 500 && response.status !== 429) { + // client-side errors are usually non-retriable: + callback({error: new Error('Client side error, could not open event connection'), messageEvent: undefined}) + } else { + callback({error: new Error('Unknown error, could not open event connection'), messageEvent: undefined}); + } + }, + }); + + return () => { + controller.abort(); + }; +} + + +export { registerSendUserSignedup, listenForReceiveUserSignedup, listenForNoParameter, registerNoParameter, registerSendStringPayload, listenForReceiveStringPayload, registerSendArrayPayload, listenForReceiveArrayPayload, registerSendUnionPayload, listenForReceiveUnionPayload }; diff --git a/test/runtime/typescript/src/channels/kafka.ts b/test/runtime/typescript/src/channels/kafka.ts index b1bccf31..7c48276a 100644 --- a/test/runtime/typescript/src/channels/kafka.ts +++ b/test/runtime/typescript/src/channels/kafka.ts @@ -1,4 +1,8 @@ import {UserSignedUp} from './../payloads/UserSignedUp'; +import * as StringMessageModule from './../payloads/StringMessage'; +import * as ArrayMessageModule from './../payloads/ArrayMessage'; +import * as UnionMessageModule from './../payloads/UnionMessage'; +import {AnonymousSchema_9} from './../payloads/AnonymousSchema_9'; import {UserSignedupParameters} from './../parameters/UserSignedupParameters'; import {UserSignedUpHeaders} from './../headers/UserSignedUpHeaders'; import * as Kafka from 'kafkajs'; @@ -256,4 +260,289 @@ onDataCallback(undefined, callbackData, extractedHeaders, kafkaMessage); }); } -export { produceToSendUserSignedup, consumeFromReceiveUserSignedup, produceToNoParameter, consumeFromNoParameter }; +/** + * Kafka publish operation for `string.payload` + * + * @param message to publish + * @param kafka the KafkaJS client to publish from + */ +function produceToSendStringPayload({ + message, + kafka +}: { + message: StringMessageModule.StringMessage, + kafka: Kafka.Kafka +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = StringMessageModule.marshal(message); + const producer = kafka.producer(); + await producer.connect(); + + + await producer.send({ + topic: 'string.payload', + messages: [ + { + value: dataToSend + }, + ], + }); + resolve(producer); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback consumeFromReceiveStringPayloadCallback + * @param err if any error occurred this will be sat + * @param msg that was received + * @param kafkaMsg + */ + +/** + * Kafka subscription for `string.payload` + * + * @param {consumeFromReceiveStringPayloadCallback} onDataCallback to call when messages are received + * @param kafka the KafkaJS client to subscribe through + * @param options when setting up the subscription + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function consumeFromReceiveStringPayload({ + onDataCallback, + kafka, + options = {fromBeginning: true, groupId: ''}, + skipMessageValidation = false +}: { + onDataCallback: (err?: Error, msg?: StringMessageModule.StringMessage, kafkaMsg?: Kafka.EachMessagePayload) => void, + kafka: Kafka.Kafka, + options: {fromBeginning: boolean, groupId: string}, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + if(!options.groupId) { + return reject('No group ID provided'); + } + const consumer = kafka.consumer({ groupId: options.groupId }); + + const validator = StringMessageModule.createValidator(); + await consumer.connect(); + await consumer.subscribe({ topic: 'string.payload', fromBeginning: options.fromBeginning }); + await consumer.run({ + eachMessage: async (kafkaMessage: Kafka.EachMessagePayload) => { + const { topic, message } = kafkaMessage; + const receivedData = message.value?.toString()!; + + if(!skipMessageValidation) { + const {valid, errors} = StringMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + return onDataCallback(new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), undefined, kafkaMessage); + } + } +const callbackData = StringMessageModule.unmarshal(receivedData); +onDataCallback(undefined, callbackData, kafkaMessage); + } + }); + resolve(consumer); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Kafka publish operation for `array.payload` + * + * @param message to publish + * @param kafka the KafkaJS client to publish from + */ +function produceToSendArrayPayload({ + message, + kafka +}: { + message: ArrayMessageModule.ArrayMessage, + kafka: Kafka.Kafka +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = ArrayMessageModule.marshal(message); + const producer = kafka.producer(); + await producer.connect(); + + + await producer.send({ + topic: 'array.payload', + messages: [ + { + value: dataToSend + }, + ], + }); + resolve(producer); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback consumeFromReceiveArrayPayloadCallback + * @param err if any error occurred this will be sat + * @param msg that was received + * @param kafkaMsg + */ + +/** + * Kafka subscription for `array.payload` + * + * @param {consumeFromReceiveArrayPayloadCallback} onDataCallback to call when messages are received + * @param kafka the KafkaJS client to subscribe through + * @param options when setting up the subscription + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function consumeFromReceiveArrayPayload({ + onDataCallback, + kafka, + options = {fromBeginning: true, groupId: ''}, + skipMessageValidation = false +}: { + onDataCallback: (err?: Error, msg?: ArrayMessageModule.ArrayMessage, kafkaMsg?: Kafka.EachMessagePayload) => void, + kafka: Kafka.Kafka, + options: {fromBeginning: boolean, groupId: string}, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + if(!options.groupId) { + return reject('No group ID provided'); + } + const consumer = kafka.consumer({ groupId: options.groupId }); + + const validator = ArrayMessageModule.createValidator(); + await consumer.connect(); + await consumer.subscribe({ topic: 'array.payload', fromBeginning: options.fromBeginning }); + await consumer.run({ + eachMessage: async (kafkaMessage: Kafka.EachMessagePayload) => { + const { topic, message } = kafkaMessage; + const receivedData = message.value?.toString()!; + + if(!skipMessageValidation) { + const {valid, errors} = ArrayMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + return onDataCallback(new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), undefined, kafkaMessage); + } + } +const callbackData = ArrayMessageModule.unmarshal(receivedData); +onDataCallback(undefined, callbackData, kafkaMessage); + } + }); + resolve(consumer); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Kafka publish operation for `union.payload` + * + * @param message to publish + * @param kafka the KafkaJS client to publish from + */ +function produceToSendUnionPayload({ + message, + kafka +}: { + message: UnionMessageModule.UnionMessage, + kafka: Kafka.Kafka +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = UnionMessageModule.marshal(message); + const producer = kafka.producer(); + await producer.connect(); + + + await producer.send({ + topic: 'union.payload', + messages: [ + { + value: dataToSend + }, + ], + }); + resolve(producer); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback consumeFromReceiveUnionPayloadCallback + * @param err if any error occurred this will be sat + * @param msg that was received + * @param kafkaMsg + */ + +/** + * Kafka subscription for `union.payload` + * + * @param {consumeFromReceiveUnionPayloadCallback} onDataCallback to call when messages are received + * @param kafka the KafkaJS client to subscribe through + * @param options when setting up the subscription + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function consumeFromReceiveUnionPayload({ + onDataCallback, + kafka, + options = {fromBeginning: true, groupId: ''}, + skipMessageValidation = false +}: { + onDataCallback: (err?: Error, msg?: UnionMessageModule.UnionMessage, kafkaMsg?: Kafka.EachMessagePayload) => void, + kafka: Kafka.Kafka, + options: {fromBeginning: boolean, groupId: string}, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + if(!options.groupId) { + return reject('No group ID provided'); + } + const consumer = kafka.consumer({ groupId: options.groupId }); + + const validator = UnionMessageModule.createValidator(); + await consumer.connect(); + await consumer.subscribe({ topic: 'union.payload', fromBeginning: options.fromBeginning }); + await consumer.run({ + eachMessage: async (kafkaMessage: Kafka.EachMessagePayload) => { + const { topic, message } = kafkaMessage; + const receivedData = message.value?.toString()!; + + if(!skipMessageValidation) { + const {valid, errors} = UnionMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + return onDataCallback(new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), undefined, kafkaMessage); + } + } +const callbackData = UnionMessageModule.unmarshal(receivedData); +onDataCallback(undefined, callbackData, kafkaMessage); + } + }); + resolve(consumer); + } catch (e: any) { + reject(e); + } + }); +} + +export { produceToSendUserSignedup, consumeFromReceiveUserSignedup, produceToNoParameter, consumeFromNoParameter, produceToSendStringPayload, consumeFromReceiveStringPayload, produceToSendArrayPayload, consumeFromReceiveArrayPayload, produceToSendUnionPayload, consumeFromReceiveUnionPayload }; diff --git a/test/runtime/typescript/src/channels/mqtt.ts b/test/runtime/typescript/src/channels/mqtt.ts index 4e215e06..796eb1e6 100644 --- a/test/runtime/typescript/src/channels/mqtt.ts +++ b/test/runtime/typescript/src/channels/mqtt.ts @@ -1,4 +1,8 @@ import {UserSignedUp} from './../payloads/UserSignedUp'; +import * as StringMessageModule from './../payloads/StringMessage'; +import * as ArrayMessageModule from './../payloads/ArrayMessage'; +import * as UnionMessageModule from './../payloads/UnionMessage'; +import {AnonymousSchema_9} from './../payloads/AnonymousSchema_9'; import {UserSignedupParameters} from './../parameters/UserSignedupParameters'; import {UserSignedUpHeaders} from './../headers/UserSignedUpHeaders'; import * as Mqtt from 'mqtt'; @@ -30,7 +34,7 @@ function publishToSendUserSignedup({ if (headers) { const headerData = headers.marshal(); const parsedHeaders = typeof headerData === 'string' ? JSON.parse(headerData) : headerData; - const userProperties = {}; + const userProperties: Record = {}; for (const [key, value] of Object.entries(parsedHeaders)) { if (value !== undefined) { userProperties[key] = String(value); @@ -153,7 +157,7 @@ function publishToNoParameter({ if (headers) { const headerData = headers.marshal(); const parsedHeaders = typeof headerData === 'string' ? JSON.parse(headerData) : headerData; - const userProperties = {}; + const userProperties: Record = {}; for (const [key, value] of Object.entries(parsedHeaders)) { if (value !== undefined) { userProperties[key] = String(value); @@ -248,4 +252,283 @@ function subscribeToNoParameter({ }); } -export { publishToSendUserSignedup, subscribeToReceiveUserSignedup, publishToNoParameter, subscribeToNoParameter }; +/** + * MQTT publish operation for `string/payload` + * + * @param message to publish + * @param mqtt the MQTT client to publish from + */ +function publishToSendStringPayload({ + message, + mqtt +}: { + message: StringMessageModule.StringMessage, + mqtt: Mqtt.MqttClient +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = StringMessageModule.marshal(message); + let publishOptions: Mqtt.IClientPublishOptions = {}; + mqtt.publish('string/payload', dataToSend, publishOptions); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback subscribeToReceiveStringPayloadCallback + * @param err if any error occurred this will be set + * @param msg that was received + * @param mqttMsg the raw MQTT message packet + */ + +/** + * MQTT subscription for `string/payload` + * + * @param onDataCallback to call when messages are received + * @param mqtt the MQTT client to subscribe with + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function subscribeToReceiveStringPayload({ + onDataCallback, + mqtt, + skipMessageValidation = false +}: { + onDataCallback: (params: {err?: Error, msg?: StringMessageModule.StringMessage, mqttMsg?: Mqtt.IPublishPacket}) => void, + mqtt: Mqtt.MqttClient, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const validator = StringMessageModule.createValidator(); + + // Set up message listener + const messageHandler = (topic: string, message: Buffer, packet: Mqtt.IPublishPacket) => { + + // Check if the received topic matches this subscription's pattern + const topicPattern = /^string\/payload$/; + if (!topicPattern.test(topic)) { + return; // Ignore messages not matching this subscription's topic pattern + } + + const receivedData = message.toString(); + + + + try { + const parsedMessage = StringMessageModule.unmarshal(receivedData); + if(!skipMessageValidation) { + const {valid, errors} = StringMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback({err: new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), msg: undefined, mqttMsg: packet}); return; + } + } + onDataCallback({err: undefined, msg: parsedMessage, mqttMsg: packet}); + } catch (err: any) { + onDataCallback({err: new Error(`Failed to parse message: ${err.message}`), msg: undefined, mqttMsg: packet}); + } + }; + + mqtt.on('message', messageHandler); + + // Subscribe to the topic + await mqtt.subscribeAsync('string/payload'); + + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * MQTT publish operation for `array/payload` + * + * @param message to publish + * @param mqtt the MQTT client to publish from + */ +function publishToSendArrayPayload({ + message, + mqtt +}: { + message: ArrayMessageModule.ArrayMessage, + mqtt: Mqtt.MqttClient +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = ArrayMessageModule.marshal(message); + let publishOptions: Mqtt.IClientPublishOptions = {}; + mqtt.publish('array/payload', dataToSend, publishOptions); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback subscribeToReceiveArrayPayloadCallback + * @param err if any error occurred this will be set + * @param msg that was received + * @param mqttMsg the raw MQTT message packet + */ + +/** + * MQTT subscription for `array/payload` + * + * @param onDataCallback to call when messages are received + * @param mqtt the MQTT client to subscribe with + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function subscribeToReceiveArrayPayload({ + onDataCallback, + mqtt, + skipMessageValidation = false +}: { + onDataCallback: (params: {err?: Error, msg?: ArrayMessageModule.ArrayMessage, mqttMsg?: Mqtt.IPublishPacket}) => void, + mqtt: Mqtt.MqttClient, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const validator = ArrayMessageModule.createValidator(); + + // Set up message listener + const messageHandler = (topic: string, message: Buffer, packet: Mqtt.IPublishPacket) => { + + // Check if the received topic matches this subscription's pattern + const topicPattern = /^array\/payload$/; + if (!topicPattern.test(topic)) { + return; // Ignore messages not matching this subscription's topic pattern + } + + const receivedData = message.toString(); + + + + try { + const parsedMessage = ArrayMessageModule.unmarshal(receivedData); + if(!skipMessageValidation) { + const {valid, errors} = ArrayMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback({err: new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), msg: undefined, mqttMsg: packet}); return; + } + } + onDataCallback({err: undefined, msg: parsedMessage, mqttMsg: packet}); + } catch (err: any) { + onDataCallback({err: new Error(`Failed to parse message: ${err.message}`), msg: undefined, mqttMsg: packet}); + } + }; + + mqtt.on('message', messageHandler); + + // Subscribe to the topic + await mqtt.subscribeAsync('array/payload'); + + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * MQTT publish operation for `union/payload` + * + * @param message to publish + * @param mqtt the MQTT client to publish from + */ +function publishToSendUnionPayload({ + message, + mqtt +}: { + message: UnionMessageModule.UnionMessage, + mqtt: Mqtt.MqttClient +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = UnionMessageModule.marshal(message); + let publishOptions: Mqtt.IClientPublishOptions = {}; + mqtt.publish('union/payload', dataToSend, publishOptions); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback subscribeToReceiveUnionPayloadCallback + * @param err if any error occurred this will be set + * @param msg that was received + * @param mqttMsg the raw MQTT message packet + */ + +/** + * MQTT subscription for `union/payload` + * + * @param onDataCallback to call when messages are received + * @param mqtt the MQTT client to subscribe with + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function subscribeToReceiveUnionPayload({ + onDataCallback, + mqtt, + skipMessageValidation = false +}: { + onDataCallback: (params: {err?: Error, msg?: UnionMessageModule.UnionMessage, mqttMsg?: Mqtt.IPublishPacket}) => void, + mqtt: Mqtt.MqttClient, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const validator = UnionMessageModule.createValidator(); + + // Set up message listener + const messageHandler = (topic: string, message: Buffer, packet: Mqtt.IPublishPacket) => { + + // Check if the received topic matches this subscription's pattern + const topicPattern = /^union\/payload$/; + if (!topicPattern.test(topic)) { + return; // Ignore messages not matching this subscription's topic pattern + } + + const receivedData = message.toString(); + + + + try { + const parsedMessage = UnionMessageModule.unmarshal(receivedData); + if(!skipMessageValidation) { + const {valid, errors} = UnionMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback({err: new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), msg: undefined, mqttMsg: packet}); return; + } + } + onDataCallback({err: undefined, msg: parsedMessage, mqttMsg: packet}); + } catch (err: any) { + onDataCallback({err: new Error(`Failed to parse message: ${err.message}`), msg: undefined, mqttMsg: packet}); + } + }; + + mqtt.on('message', messageHandler); + + // Subscribe to the topic + await mqtt.subscribeAsync('union/payload'); + + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +export { publishToSendUserSignedup, subscribeToReceiveUserSignedup, publishToNoParameter, subscribeToNoParameter, publishToSendStringPayload, subscribeToReceiveStringPayload, publishToSendArrayPayload, subscribeToReceiveArrayPayload, publishToSendUnionPayload, subscribeToReceiveUnionPayload }; diff --git a/test/runtime/typescript/src/channels/nats.ts b/test/runtime/typescript/src/channels/nats.ts index f874cb33..e3007d66 100644 --- a/test/runtime/typescript/src/channels/nats.ts +++ b/test/runtime/typescript/src/channels/nats.ts @@ -1,4 +1,8 @@ import {UserSignedUp} from './../payloads/UserSignedUp'; +import * as StringMessageModule from './../payloads/StringMessage'; +import * as ArrayMessageModule from './../payloads/ArrayMessage'; +import * as UnionMessageModule from './../payloads/UnionMessage'; +import {AnonymousSchema_9} from './../payloads/AnonymousSchema_9'; import {UserSignedupParameters} from './../parameters/UserSignedupParameters'; import {UserSignedUpHeaders} from './../headers/UserSignedUpHeaders'; import * as Nats from 'nats'; @@ -649,4 +653,691 @@ await js.publish('noparameters', dataToSend, options); }); } -export { publishToSendUserSignedup, jetStreamPublishToSendUserSignedup, subscribeToReceiveUserSignedup, jetStreamPullSubscribeToReceiveUserSignedup, jetStreamPushSubscriptionFromReceiveUserSignedup, publishToNoParameter, subscribeToNoParameter, jetStreamPullSubscribeToNoParameter, jetStreamPushSubscriptionFromNoParameter, jetStreamPublishToNoParameter }; +/** + * NATS publish operation for `string.payload` + * + * @param message to publish + * @param nc the NATS client to publish from + * @param codec the serialization codec to use while transmitting the message + * @param options to use while publishing the message + */ +function publishToSendStringPayload({ + message, + nc, + codec = Nats.JSONCodec(), + options +}: { + message: StringMessageModule.StringMessage, + nc: Nats.NatsConnection, + codec?: Nats.Codec, + options?: Nats.PublishOptions +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = StringMessageModule.marshal(message); + +dataToSend = codec.encode(dataToSend); +nc.publish('string.payload', dataToSend, options); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * JetStream publish operation for `string.payload` + * + * @param message to publish over jetstream + * @param js the JetStream client to publish from + * @param codec the serialization codec to use while transmitting the message + * @param options to use while publishing the message + */ +function jetStreamPublishToSendStringPayload({ + message, + js, + codec = Nats.JSONCodec(), + options = {} +}: { + message: StringMessageModule.StringMessage, + js: Nats.JetStreamClient, + codec?: Nats.Codec, + options?: Partial +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = StringMessageModule.marshal(message); + +dataToSend = codec.encode(dataToSend); +await js.publish('string.payload', dataToSend, options); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback subscribeToReceiveStringPayloadCallback + * @param err if any error occurred this will be sat + * @param msg that was received + * @param natsMsg + */ + +/** + * Core subscription for `string.payload` + * + * @param {subscribeToReceiveStringPayloadCallback} onDataCallback to call when messages are received + * @param nc the nats client to setup the subscribe for + * @param codec the serialization codec to use while receiving the message + * @param options when setting up the subscription + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function subscribeToReceiveStringPayload({ + onDataCallback, + nc, + codec = Nats.JSONCodec(), + options, + skipMessageValidation = false +}: { + onDataCallback: (err?: Error, msg?: StringMessageModule.StringMessage, natsMsg?: Nats.Msg) => void, + nc: Nats.NatsConnection, + codec?: Nats.Codec, + options?: Nats.SubscriptionOptions, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const subscription = nc.subscribe('string.payload', options); + const validator = StringMessageModule.createValidator(); + (async () => { + for await (const msg of subscription) { + + let receivedData: any = codec.decode(msg.data); +if(!skipMessageValidation) { + const {valid, errors} = StringMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback(new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), undefined, msg); continue; + } + } +onDataCallback(undefined, StringMessageModule.unmarshal(receivedData), msg); + } + })(); + resolve(subscription); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback jetStreamPullSubscribeToReceiveStringPayloadCallback + * @param err if any error occurred this will be sat + * @param msg that was received + * @param jetstreamMsg + */ + +/** + * JetStream pull subscription for `string.payload` + * + * @param {jetStreamPullSubscribeToReceiveStringPayloadCallback} onDataCallback to call when messages are received + * @param js the JetStream client to pull subscribe through + * @param options when setting up the subscription + * @param codec the serialization codec to use while transmitting the message + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function jetStreamPullSubscribeToReceiveStringPayload({ + onDataCallback, + js, + options, + codec = Nats.JSONCodec(), + skipMessageValidation = false +}: { + onDataCallback: (err?: Error, msg?: StringMessageModule.StringMessage, jetstreamMsg?: Nats.JsMsg) => void, + js: Nats.JetStreamClient, + options: Nats.ConsumerOptsBuilder | Partial, + codec?: Nats.Codec, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const subscription = await js.pullSubscribe('string.payload', options); + const validator = StringMessageModule.createValidator(); + (async () => { + for await (const msg of subscription) { + + let receivedData: any = codec.decode(msg.data); +if(!skipMessageValidation) { + const {valid, errors} = StringMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback(new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), undefined, msg); continue; + } + } +onDataCallback(undefined, StringMessageModule.unmarshal(receivedData), msg); + } + })(); + resolve(subscription); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback jetStreamPushSubscriptionFromReceiveStringPayloadCallback + * @param err if any error occurred this will be sat + * @param msg that was received + * @param jetstreamMsg + */ + +/** + * JetStream push subscription for `string.payload` + * + * @param {jetStreamPushSubscriptionFromReceiveStringPayloadCallback} onDataCallback to call when messages are received + * @param js the JetStream client to pull subscribe through + * @param options when setting up the subscription + * @param codec the serialization codec to use while transmitting the message + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function jetStreamPushSubscriptionFromReceiveStringPayload({ + onDataCallback, + js, + options, + codec = Nats.JSONCodec(), + skipMessageValidation = false +}: { + onDataCallback: (err?: Error, msg?: StringMessageModule.StringMessage, jetstreamMsg?: Nats.JsMsg) => void, + js: Nats.JetStreamClient, + options: Nats.ConsumerOptsBuilder | Partial, + codec?: Nats.Codec, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const subscription = await js.subscribe('string.payload', options); + const validator = StringMessageModule.createValidator(); + (async () => { + for await (const msg of subscription) { + + let receivedData: any = codec.decode(msg.data); +if(!skipMessageValidation) { + const {valid, errors} = StringMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback(new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), undefined, msg); continue; + } + } +onDataCallback(undefined, StringMessageModule.unmarshal(receivedData), msg); + } + })(); + resolve(subscription); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * NATS publish operation for `array.payload` + * + * @param message to publish + * @param nc the NATS client to publish from + * @param codec the serialization codec to use while transmitting the message + * @param options to use while publishing the message + */ +function publishToSendArrayPayload({ + message, + nc, + codec = Nats.JSONCodec(), + options +}: { + message: ArrayMessageModule.ArrayMessage, + nc: Nats.NatsConnection, + codec?: Nats.Codec, + options?: Nats.PublishOptions +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = ArrayMessageModule.marshal(message); + +dataToSend = codec.encode(dataToSend); +nc.publish('array.payload', dataToSend, options); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * JetStream publish operation for `array.payload` + * + * @param message to publish over jetstream + * @param js the JetStream client to publish from + * @param codec the serialization codec to use while transmitting the message + * @param options to use while publishing the message + */ +function jetStreamPublishToSendArrayPayload({ + message, + js, + codec = Nats.JSONCodec(), + options = {} +}: { + message: ArrayMessageModule.ArrayMessage, + js: Nats.JetStreamClient, + codec?: Nats.Codec, + options?: Partial +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = ArrayMessageModule.marshal(message); + +dataToSend = codec.encode(dataToSend); +await js.publish('array.payload', dataToSend, options); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback subscribeToReceiveArrayPayloadCallback + * @param err if any error occurred this will be sat + * @param msg that was received + * @param natsMsg + */ + +/** + * Core subscription for `array.payload` + * + * @param {subscribeToReceiveArrayPayloadCallback} onDataCallback to call when messages are received + * @param nc the nats client to setup the subscribe for + * @param codec the serialization codec to use while receiving the message + * @param options when setting up the subscription + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function subscribeToReceiveArrayPayload({ + onDataCallback, + nc, + codec = Nats.JSONCodec(), + options, + skipMessageValidation = false +}: { + onDataCallback: (err?: Error, msg?: ArrayMessageModule.ArrayMessage, natsMsg?: Nats.Msg) => void, + nc: Nats.NatsConnection, + codec?: Nats.Codec, + options?: Nats.SubscriptionOptions, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const subscription = nc.subscribe('array.payload', options); + const validator = ArrayMessageModule.createValidator(); + (async () => { + for await (const msg of subscription) { + + let receivedData: any = codec.decode(msg.data); +if(!skipMessageValidation) { + const {valid, errors} = ArrayMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback(new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), undefined, msg); continue; + } + } +onDataCallback(undefined, ArrayMessageModule.unmarshal(receivedData), msg); + } + })(); + resolve(subscription); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback jetStreamPullSubscribeToReceiveArrayPayloadCallback + * @param err if any error occurred this will be sat + * @param msg that was received + * @param jetstreamMsg + */ + +/** + * JetStream pull subscription for `array.payload` + * + * @param {jetStreamPullSubscribeToReceiveArrayPayloadCallback} onDataCallback to call when messages are received + * @param js the JetStream client to pull subscribe through + * @param options when setting up the subscription + * @param codec the serialization codec to use while transmitting the message + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function jetStreamPullSubscribeToReceiveArrayPayload({ + onDataCallback, + js, + options, + codec = Nats.JSONCodec(), + skipMessageValidation = false +}: { + onDataCallback: (err?: Error, msg?: ArrayMessageModule.ArrayMessage, jetstreamMsg?: Nats.JsMsg) => void, + js: Nats.JetStreamClient, + options: Nats.ConsumerOptsBuilder | Partial, + codec?: Nats.Codec, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const subscription = await js.pullSubscribe('array.payload', options); + const validator = ArrayMessageModule.createValidator(); + (async () => { + for await (const msg of subscription) { + + let receivedData: any = codec.decode(msg.data); +if(!skipMessageValidation) { + const {valid, errors} = ArrayMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback(new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), undefined, msg); continue; + } + } +onDataCallback(undefined, ArrayMessageModule.unmarshal(receivedData), msg); + } + })(); + resolve(subscription); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback jetStreamPushSubscriptionFromReceiveArrayPayloadCallback + * @param err if any error occurred this will be sat + * @param msg that was received + * @param jetstreamMsg + */ + +/** + * JetStream push subscription for `array.payload` + * + * @param {jetStreamPushSubscriptionFromReceiveArrayPayloadCallback} onDataCallback to call when messages are received + * @param js the JetStream client to pull subscribe through + * @param options when setting up the subscription + * @param codec the serialization codec to use while transmitting the message + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function jetStreamPushSubscriptionFromReceiveArrayPayload({ + onDataCallback, + js, + options, + codec = Nats.JSONCodec(), + skipMessageValidation = false +}: { + onDataCallback: (err?: Error, msg?: ArrayMessageModule.ArrayMessage, jetstreamMsg?: Nats.JsMsg) => void, + js: Nats.JetStreamClient, + options: Nats.ConsumerOptsBuilder | Partial, + codec?: Nats.Codec, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const subscription = await js.subscribe('array.payload', options); + const validator = ArrayMessageModule.createValidator(); + (async () => { + for await (const msg of subscription) { + + let receivedData: any = codec.decode(msg.data); +if(!skipMessageValidation) { + const {valid, errors} = ArrayMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback(new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), undefined, msg); continue; + } + } +onDataCallback(undefined, ArrayMessageModule.unmarshal(receivedData), msg); + } + })(); + resolve(subscription); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * NATS publish operation for `union.payload` + * + * @param message to publish + * @param nc the NATS client to publish from + * @param codec the serialization codec to use while transmitting the message + * @param options to use while publishing the message + */ +function publishToSendUnionPayload({ + message, + nc, + codec = Nats.JSONCodec(), + options +}: { + message: UnionMessageModule.UnionMessage, + nc: Nats.NatsConnection, + codec?: Nats.Codec, + options?: Nats.PublishOptions +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = UnionMessageModule.marshal(message); + +dataToSend = codec.encode(dataToSend); +nc.publish('union.payload', dataToSend, options); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * JetStream publish operation for `union.payload` + * + * @param message to publish over jetstream + * @param js the JetStream client to publish from + * @param codec the serialization codec to use while transmitting the message + * @param options to use while publishing the message + */ +function jetStreamPublishToSendUnionPayload({ + message, + js, + codec = Nats.JSONCodec(), + options = {} +}: { + message: UnionMessageModule.UnionMessage, + js: Nats.JetStreamClient, + codec?: Nats.Codec, + options?: Partial +}): Promise { + return new Promise(async (resolve, reject) => { + try { + let dataToSend: any = UnionMessageModule.marshal(message); + +dataToSend = codec.encode(dataToSend); +await js.publish('union.payload', dataToSend, options); + resolve(); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback subscribeToReceiveUnionPayloadCallback + * @param err if any error occurred this will be sat + * @param msg that was received + * @param natsMsg + */ + +/** + * Core subscription for `union.payload` + * + * @param {subscribeToReceiveUnionPayloadCallback} onDataCallback to call when messages are received + * @param nc the nats client to setup the subscribe for + * @param codec the serialization codec to use while receiving the message + * @param options when setting up the subscription + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function subscribeToReceiveUnionPayload({ + onDataCallback, + nc, + codec = Nats.JSONCodec(), + options, + skipMessageValidation = false +}: { + onDataCallback: (err?: Error, msg?: UnionMessageModule.UnionMessage, natsMsg?: Nats.Msg) => void, + nc: Nats.NatsConnection, + codec?: Nats.Codec, + options?: Nats.SubscriptionOptions, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const subscription = nc.subscribe('union.payload', options); + const validator = UnionMessageModule.createValidator(); + (async () => { + for await (const msg of subscription) { + + let receivedData: any = codec.decode(msg.data); +if(!skipMessageValidation) { + const {valid, errors} = UnionMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback(new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), undefined, msg); continue; + } + } +onDataCallback(undefined, UnionMessageModule.unmarshal(receivedData), msg); + } + })(); + resolve(subscription); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback jetStreamPullSubscribeToReceiveUnionPayloadCallback + * @param err if any error occurred this will be sat + * @param msg that was received + * @param jetstreamMsg + */ + +/** + * JetStream pull subscription for `union.payload` + * + * @param {jetStreamPullSubscribeToReceiveUnionPayloadCallback} onDataCallback to call when messages are received + * @param js the JetStream client to pull subscribe through + * @param options when setting up the subscription + * @param codec the serialization codec to use while transmitting the message + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function jetStreamPullSubscribeToReceiveUnionPayload({ + onDataCallback, + js, + options, + codec = Nats.JSONCodec(), + skipMessageValidation = false +}: { + onDataCallback: (err?: Error, msg?: UnionMessageModule.UnionMessage, jetstreamMsg?: Nats.JsMsg) => void, + js: Nats.JetStreamClient, + options: Nats.ConsumerOptsBuilder | Partial, + codec?: Nats.Codec, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const subscription = await js.pullSubscribe('union.payload', options); + const validator = UnionMessageModule.createValidator(); + (async () => { + for await (const msg of subscription) { + + let receivedData: any = codec.decode(msg.data); +if(!skipMessageValidation) { + const {valid, errors} = UnionMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback(new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), undefined, msg); continue; + } + } +onDataCallback(undefined, UnionMessageModule.unmarshal(receivedData), msg); + } + })(); + resolve(subscription); + } catch (e: any) { + reject(e); + } + }); +} + +/** + * Callback for when receiving messages + * + * @callback jetStreamPushSubscriptionFromReceiveUnionPayloadCallback + * @param err if any error occurred this will be sat + * @param msg that was received + * @param jetstreamMsg + */ + +/** + * JetStream push subscription for `union.payload` + * + * @param {jetStreamPushSubscriptionFromReceiveUnionPayloadCallback} onDataCallback to call when messages are received + * @param js the JetStream client to pull subscribe through + * @param options when setting up the subscription + * @param codec the serialization codec to use while transmitting the message + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function jetStreamPushSubscriptionFromReceiveUnionPayload({ + onDataCallback, + js, + options, + codec = Nats.JSONCodec(), + skipMessageValidation = false +}: { + onDataCallback: (err?: Error, msg?: UnionMessageModule.UnionMessage, jetstreamMsg?: Nats.JsMsg) => void, + js: Nats.JetStreamClient, + options: Nats.ConsumerOptsBuilder | Partial, + codec?: Nats.Codec, + skipMessageValidation?: boolean +}): Promise { + return new Promise(async (resolve, reject) => { + try { + const subscription = await js.subscribe('union.payload', options); + const validator = UnionMessageModule.createValidator(); + (async () => { + for await (const msg of subscription) { + + let receivedData: any = codec.decode(msg.data); +if(!skipMessageValidation) { + const {valid, errors} = UnionMessageModule.validate({data: receivedData, ajvValidatorFunction: validator}); + if(!valid) { + onDataCallback(new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), undefined, msg); continue; + } + } +onDataCallback(undefined, UnionMessageModule.unmarshal(receivedData), msg); + } + })(); + resolve(subscription); + } catch (e: any) { + reject(e); + } + }); +} + +export { publishToSendUserSignedup, jetStreamPublishToSendUserSignedup, subscribeToReceiveUserSignedup, jetStreamPullSubscribeToReceiveUserSignedup, jetStreamPushSubscriptionFromReceiveUserSignedup, publishToNoParameter, subscribeToNoParameter, jetStreamPullSubscribeToNoParameter, jetStreamPushSubscriptionFromNoParameter, jetStreamPublishToNoParameter, publishToSendStringPayload, jetStreamPublishToSendStringPayload, subscribeToReceiveStringPayload, jetStreamPullSubscribeToReceiveStringPayload, jetStreamPushSubscriptionFromReceiveStringPayload, publishToSendArrayPayload, jetStreamPublishToSendArrayPayload, subscribeToReceiveArrayPayload, jetStreamPullSubscribeToReceiveArrayPayload, jetStreamPushSubscriptionFromReceiveArrayPayload, publishToSendUnionPayload, jetStreamPublishToSendUnionPayload, subscribeToReceiveUnionPayload, jetStreamPullSubscribeToReceiveUnionPayload, jetStreamPushSubscriptionFromReceiveUnionPayload }; diff --git a/test/runtime/typescript/src/channels/websocket.ts b/test/runtime/typescript/src/channels/websocket.ts index a0c4f302..a69fb42f 100644 --- a/test/runtime/typescript/src/channels/websocket.ts +++ b/test/runtime/typescript/src/channels/websocket.ts @@ -1,4 +1,8 @@ import {UserSignedUp} from './../payloads/UserSignedUp'; +import * as StringMessageModule from './../payloads/StringMessage'; +import * as ArrayMessageModule from './../payloads/ArrayMessage'; +import * as UnionMessageModule from './../payloads/UnionMessage'; +import {AnonymousSchema_9} from './../payloads/AnonymousSchema_9'; import {UserSignedupParameters} from './../parameters/UserSignedupParameters'; import {UserSignedUpHeaders} from './../headers/UserSignedUpHeaders'; import * as WebSocket from 'ws'; @@ -366,4 +370,529 @@ function registerNoParameter({ }); } -export { publishToSendUserSignedup, registerSendUserSignedup, subscribeToReceiveUserSignedup, publishToNoParameter, subscribeToNoParameter, registerNoParameter }; +/** + * WebSocket client-side function to publish messages to `/string/payload` + * + * @param message to publish + * @param ws the WebSocket connection (assumed to be already connected) + */ +function publishToSendStringPayload({ + message, + ws +}: { + message: StringMessageModule.StringMessage, + ws: WebSocket.WebSocket +}): Promise { + return new Promise((resolve, reject) => { + // Check if WebSocket is open + if (ws.readyState !== WebSocket.WebSocket.OPEN) { + reject(new Error('WebSocket is not open')); + return; + } + + // Send message directly + ws.send(StringMessageModule.marshal(message), (err) => { + if (err) { + reject(new Error(`Failed to send message: ${err.message}`)); + } + resolve(); + }); + }); +} + +/** + * WebSocket server-side function to handle messages for `/string/payload` + * + * @param wss the WebSocket server instance + * @param onConnection callback when a client connects to this channel + * @param onMessage callback when a message is received on this channel + */ +function registerSendStringPayload({ + wss, + onConnection, + onMessage +}: { + wss: WebSocket.WebSocketServer, + onConnection: (params: {ws: WebSocket.WebSocket, request: IncomingMessage}) => void, + onMessage: (params: {message: StringMessageModule.StringMessage, ws: WebSocket.WebSocket}) => void +}): void { + const channelPattern = /^\/string\/payload(?:\?.*)?$/; + + wss.on('connection', (ws: WebSocket.WebSocket, request: IncomingMessage) => { + try { + const url = request.url || ''; + const match = url.match(channelPattern); + if (match) { + try { + + onConnection({ + ws, + request + }); + } catch (connectionError) { + console.error('Error in onConnection callback:', connectionError); + ws.close(1011, 'Connection error'); + return; + } + + ws.on('message', (data: WebSocket.RawData) => { + try { + const receivedData = data.toString(); + const parsedMessage = StringMessageModule.unmarshal(receivedData); + onMessage({ + message: parsedMessage, + ws + }); + } catch (error: any) { + // Ignore parsing errors + } + }); + } + } catch (error: any) { + ws.close(1011, 'Server error'); + } + }); +} + +/** + * WebSocket client-side function to subscribe to messages from `/string/payload` + * + * @param onDataCallback callback when messages are received + * @param ws the WebSocket connection (assumed to be already connected) + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function subscribeToReceiveStringPayload({ + onDataCallback, + ws, + skipMessageValidation = false +}: { + onDataCallback: (params: {err?: Error, msg?: StringMessageModule.StringMessage, ws?: WebSocket.WebSocket}) => void, + ws: WebSocket.WebSocket, + skipMessageValidation?: boolean +}): void { + try { + // Check if WebSocket is open + if (ws.readyState !== WebSocket.WebSocket.OPEN) { + onDataCallback({ + err: new Error('WebSocket is not open'), + msg: undefined, + ws + }); + return; + } + + const validator = StringMessageModule.createValidator(); + + ws.on('message', (data: WebSocket.RawData) => { + try { + const receivedData = data.toString(); + const parsedMessage = StringMessageModule.unmarshal(receivedData); + + // Validate message if validation is enabled + if (!skipMessageValidation) { + const messageToValidate = StringMessageModule.marshal(parsedMessage); + const {valid, errors} = StringMessageModule.validate({data: messageToValidate, ajvValidatorFunction: validator}); + if (!valid) { + onDataCallback({ + err: new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), + msg: undefined, + ws + }); + return; + } + } + + onDataCallback({ + err: undefined, + msg: parsedMessage, + ws + }); + + } catch (error: any) { + onDataCallback({ + err: new Error(`Failed to parse message: ${error.message}`), + msg: undefined, + ws + }); + } + }); + + ws.on('error', (error: Error) => { + onDataCallback({ + err: new Error(`WebSocket error: ${error.message}`), + msg: undefined, + ws + }); + }); + + ws.on('close', (code: number, reason: Buffer) => { + // Only report as error if it's not a normal closure (1000) or going away (1001) + if (code !== 1000 && code !== 1001 && code !== 1005) { // 1005 is no status received + onDataCallback({ + err: new Error(`WebSocket closed unexpectedly: ${code} ${reason.toString()}`), + msg: undefined, + ws + }); + } + }); + + } catch (error: any) { + onDataCallback({ + err: new Error(`Failed to set up WebSocket subscription: ${error.message}`), + msg: undefined, + ws + }); + } +} + +/** + * WebSocket client-side function to publish messages to `/array/payload` + * + * @param message to publish + * @param ws the WebSocket connection (assumed to be already connected) + */ +function publishToSendArrayPayload({ + message, + ws +}: { + message: ArrayMessageModule.ArrayMessage, + ws: WebSocket.WebSocket +}): Promise { + return new Promise((resolve, reject) => { + // Check if WebSocket is open + if (ws.readyState !== WebSocket.WebSocket.OPEN) { + reject(new Error('WebSocket is not open')); + return; + } + + // Send message directly + ws.send(ArrayMessageModule.marshal(message), (err) => { + if (err) { + reject(new Error(`Failed to send message: ${err.message}`)); + } + resolve(); + }); + }); +} + +/** + * WebSocket server-side function to handle messages for `/array/payload` + * + * @param wss the WebSocket server instance + * @param onConnection callback when a client connects to this channel + * @param onMessage callback when a message is received on this channel + */ +function registerSendArrayPayload({ + wss, + onConnection, + onMessage +}: { + wss: WebSocket.WebSocketServer, + onConnection: (params: {ws: WebSocket.WebSocket, request: IncomingMessage}) => void, + onMessage: (params: {message: ArrayMessageModule.ArrayMessage, ws: WebSocket.WebSocket}) => void +}): void { + const channelPattern = /^\/array\/payload(?:\?.*)?$/; + + wss.on('connection', (ws: WebSocket.WebSocket, request: IncomingMessage) => { + try { + const url = request.url || ''; + const match = url.match(channelPattern); + if (match) { + try { + + onConnection({ + ws, + request + }); + } catch (connectionError) { + console.error('Error in onConnection callback:', connectionError); + ws.close(1011, 'Connection error'); + return; + } + + ws.on('message', (data: WebSocket.RawData) => { + try { + const receivedData = data.toString(); + const parsedMessage = ArrayMessageModule.unmarshal(receivedData); + onMessage({ + message: parsedMessage, + ws + }); + } catch (error: any) { + // Ignore parsing errors + } + }); + } + } catch (error: any) { + ws.close(1011, 'Server error'); + } + }); +} + +/** + * WebSocket client-side function to subscribe to messages from `/array/payload` + * + * @param onDataCallback callback when messages are received + * @param ws the WebSocket connection (assumed to be already connected) + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function subscribeToReceiveArrayPayload({ + onDataCallback, + ws, + skipMessageValidation = false +}: { + onDataCallback: (params: {err?: Error, msg?: ArrayMessageModule.ArrayMessage, ws?: WebSocket.WebSocket}) => void, + ws: WebSocket.WebSocket, + skipMessageValidation?: boolean +}): void { + try { + // Check if WebSocket is open + if (ws.readyState !== WebSocket.WebSocket.OPEN) { + onDataCallback({ + err: new Error('WebSocket is not open'), + msg: undefined, + ws + }); + return; + } + + const validator = ArrayMessageModule.createValidator(); + + ws.on('message', (data: WebSocket.RawData) => { + try { + const receivedData = data.toString(); + const parsedMessage = ArrayMessageModule.unmarshal(receivedData); + + // Validate message if validation is enabled + if (!skipMessageValidation) { + const messageToValidate = ArrayMessageModule.marshal(parsedMessage); + const {valid, errors} = ArrayMessageModule.validate({data: messageToValidate, ajvValidatorFunction: validator}); + if (!valid) { + onDataCallback({ + err: new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), + msg: undefined, + ws + }); + return; + } + } + + onDataCallback({ + err: undefined, + msg: parsedMessage, + ws + }); + + } catch (error: any) { + onDataCallback({ + err: new Error(`Failed to parse message: ${error.message}`), + msg: undefined, + ws + }); + } + }); + + ws.on('error', (error: Error) => { + onDataCallback({ + err: new Error(`WebSocket error: ${error.message}`), + msg: undefined, + ws + }); + }); + + ws.on('close', (code: number, reason: Buffer) => { + // Only report as error if it's not a normal closure (1000) or going away (1001) + if (code !== 1000 && code !== 1001 && code !== 1005) { // 1005 is no status received + onDataCallback({ + err: new Error(`WebSocket closed unexpectedly: ${code} ${reason.toString()}`), + msg: undefined, + ws + }); + } + }); + + } catch (error: any) { + onDataCallback({ + err: new Error(`Failed to set up WebSocket subscription: ${error.message}`), + msg: undefined, + ws + }); + } +} + +/** + * WebSocket client-side function to publish messages to `/union/payload` + * + * @param message to publish + * @param ws the WebSocket connection (assumed to be already connected) + */ +function publishToSendUnionPayload({ + message, + ws +}: { + message: UnionMessageModule.UnionMessage, + ws: WebSocket.WebSocket +}): Promise { + return new Promise((resolve, reject) => { + // Check if WebSocket is open + if (ws.readyState !== WebSocket.WebSocket.OPEN) { + reject(new Error('WebSocket is not open')); + return; + } + + // Send message directly + ws.send(UnionMessageModule.marshal(message), (err) => { + if (err) { + reject(new Error(`Failed to send message: ${err.message}`)); + } + resolve(); + }); + }); +} + +/** + * WebSocket server-side function to handle messages for `/union/payload` + * + * @param wss the WebSocket server instance + * @param onConnection callback when a client connects to this channel + * @param onMessage callback when a message is received on this channel + */ +function registerSendUnionPayload({ + wss, + onConnection, + onMessage +}: { + wss: WebSocket.WebSocketServer, + onConnection: (params: {ws: WebSocket.WebSocket, request: IncomingMessage}) => void, + onMessage: (params: {message: UnionMessageModule.UnionMessage, ws: WebSocket.WebSocket}) => void +}): void { + const channelPattern = /^\/union\/payload(?:\?.*)?$/; + + wss.on('connection', (ws: WebSocket.WebSocket, request: IncomingMessage) => { + try { + const url = request.url || ''; + const match = url.match(channelPattern); + if (match) { + try { + + onConnection({ + ws, + request + }); + } catch (connectionError) { + console.error('Error in onConnection callback:', connectionError); + ws.close(1011, 'Connection error'); + return; + } + + ws.on('message', (data: WebSocket.RawData) => { + try { + const receivedData = data.toString(); + const parsedMessage = UnionMessageModule.unmarshal(receivedData); + onMessage({ + message: parsedMessage, + ws + }); + } catch (error: any) { + // Ignore parsing errors + } + }); + } + } catch (error: any) { + ws.close(1011, 'Server error'); + } + }); +} + +/** + * WebSocket client-side function to subscribe to messages from `/union/payload` + * + * @param onDataCallback callback when messages are received + * @param ws the WebSocket connection (assumed to be already connected) + * @param skipMessageValidation turn off runtime validation of incoming messages + */ +function subscribeToReceiveUnionPayload({ + onDataCallback, + ws, + skipMessageValidation = false +}: { + onDataCallback: (params: {err?: Error, msg?: UnionMessageModule.UnionMessage, ws?: WebSocket.WebSocket}) => void, + ws: WebSocket.WebSocket, + skipMessageValidation?: boolean +}): void { + try { + // Check if WebSocket is open + if (ws.readyState !== WebSocket.WebSocket.OPEN) { + onDataCallback({ + err: new Error('WebSocket is not open'), + msg: undefined, + ws + }); + return; + } + + const validator = UnionMessageModule.createValidator(); + + ws.on('message', (data: WebSocket.RawData) => { + try { + const receivedData = data.toString(); + const parsedMessage = UnionMessageModule.unmarshal(receivedData); + + // Validate message if validation is enabled + if (!skipMessageValidation) { + const messageToValidate = UnionMessageModule.marshal(parsedMessage); + const {valid, errors} = UnionMessageModule.validate({data: messageToValidate, ajvValidatorFunction: validator}); + if (!valid) { + onDataCallback({ + err: new Error(`Invalid message payload received; ${JSON.stringify({cause: errors})}`), + msg: undefined, + ws + }); + return; + } + } + + onDataCallback({ + err: undefined, + msg: parsedMessage, + ws + }); + + } catch (error: any) { + onDataCallback({ + err: new Error(`Failed to parse message: ${error.message}`), + msg: undefined, + ws + }); + } + }); + + ws.on('error', (error: Error) => { + onDataCallback({ + err: new Error(`WebSocket error: ${error.message}`), + msg: undefined, + ws + }); + }); + + ws.on('close', (code: number, reason: Buffer) => { + // Only report as error if it's not a normal closure (1000) or going away (1001) + if (code !== 1000 && code !== 1001 && code !== 1005) { // 1005 is no status received + onDataCallback({ + err: new Error(`WebSocket closed unexpectedly: ${code} ${reason.toString()}`), + msg: undefined, + ws + }); + } + }); + + } catch (error: any) { + onDataCallback({ + err: new Error(`Failed to set up WebSocket subscription: ${error.message}`), + msg: undefined, + ws + }); + } +} + +export { publishToSendUserSignedup, registerSendUserSignedup, subscribeToReceiveUserSignedup, publishToNoParameter, subscribeToNoParameter, registerNoParameter, publishToSendStringPayload, registerSendStringPayload, subscribeToReceiveStringPayload, publishToSendArrayPayload, registerSendArrayPayload, subscribeToReceiveArrayPayload, publishToSendUnionPayload, registerSendUnionPayload, subscribeToReceiveUnionPayload }; diff --git a/test/runtime/typescript/src/client/NatsClient.ts b/test/runtime/typescript/src/client/NatsClient.ts index 11626807..30452a7f 100644 --- a/test/runtime/typescript/src/client/NatsClient.ts +++ b/test/runtime/typescript/src/client/NatsClient.ts @@ -1,5 +1,13 @@ import {UserSignedUp} from './../payloads/UserSignedUp'; +import * as StringMessageModule from './../payloads/StringMessage'; +import * as ArrayMessageModule from './../payloads/ArrayMessage'; +import * as UnionMessageModule from './../payloads/UnionMessage'; +import {AnonymousSchema_9} from './../payloads/AnonymousSchema_9'; export {UserSignedUp}; +export {StringMessageModule}; +export {ArrayMessageModule}; +export {UnionMessageModule}; +export {AnonymousSchema_9}; import {UserSignedupParameters} from './../parameters/UserSignedupParameters'; export {UserSignedupParameters}; @@ -427,4 +435,466 @@ export class NatsClient { } } + + /** + * + * + * @param message to publish + * @param options to use while publishing the message + */ + public async publishToSendStringPayload({ + message, + options + }: { + message: StringMessageModule.StringMessage, + options?: Nats.PublishOptions + }): Promise { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined) { + await nats.publishToSendStringPayload({ + message, + nc: this.nc, + codec: this.codec, + options + }); + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + } + + /** + * + * + * @param message to publish + * @param options to use while publishing the message + */ + public async jetStreamPublishToSendStringPayload({ + message, + options = {} + }: { + message: StringMessageModule.StringMessage, + options?: Partial + }): Promise { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + return nats.jetStreamPublishToSendStringPayload({ + message, + js: this.js, + codec: this.codec, + options + }); + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + } + + + /** + * + * + * @param {subscribeToReceiveStringPayloadCallback} onDataCallback to call when messages are received + * @param options when setting up the subscription + * @param options when setting up the subscription + */ + public subscribeToReceiveStringPayload({ + onDataCallback, + options, + flush + }: { + onDataCallback: (err?: Error, msg?: StringMessageModule.StringMessage, natsMsg?: Nats.Msg) => void, + options?: Nats.SubscriptionOptions, + flush?: boolean + }): Promise { + return new Promise(async (resolve, reject) => { + if(!this.isClosed() && this.nc !== undefined && this.codec !== undefined){ + try { + const sub = await nats.subscribeToReceiveStringPayload({ + onDataCallback, + nc: this.nc, + codec: this.codec, + options + }); + if(flush){ + await this.nc.flush(); + } + resolve(sub); + }catch(e: any){ + reject(e); + } + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + }); +} + + /** + * + * + * @param {jetStreamPullSubscribeToReceiveStringPayloadCallback} onDataCallback to call when messages are received + * @param options when setting up the subscription + */ + public jetStreamPullSubscribeToReceiveStringPayload({ + onDataCallback, + options = {} + }: { + onDataCallback: (err?: Error, msg?: StringMessageModule.StringMessage, jetstreamMsg?: Nats.JsMsg) => void, + options: Nats.ConsumerOptsBuilder | Partial + }): Promise { + return new Promise(async (resolve, reject) => { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + try { + const sub = await nats.jetStreamPullSubscribeToReceiveStringPayload({ + onDataCallback, + js: this.js, + codec: this.codec, + options + }); + resolve(sub); + } catch (e: any) { + reject(e); + } + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + }); + } + + + /** + * + * + * @param {jetStreamPushSubscriptionFromReceiveStringPayloadCallback} onDataCallback to call when messages are received + * @param options when setting up the subscription + */ + public jetStreamPushSubscriptionFromReceiveStringPayload({ + onDataCallback, + options = {} + }: { + onDataCallback: (err?: Error, msg?: StringMessageModule.StringMessage, jetstreamMsg?: Nats.JsMsg) => void, + options: Nats.ConsumerOptsBuilder | Partial + }): Promise { + return new Promise(async (resolve, reject) => { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + try { + const sub = await nats.jetStreamPushSubscriptionFromReceiveStringPayload({ + onDataCallback, + js: this.js, + codec: this.codec, + options + }); + resolve(sub); + } catch (e: any) { + reject(e); + } + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + }); + } + + /** + * + * + * @param message to publish + * @param options to use while publishing the message + */ + public async publishToSendArrayPayload({ + message, + options + }: { + message: ArrayMessageModule.ArrayMessage, + options?: Nats.PublishOptions + }): Promise { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined) { + await nats.publishToSendArrayPayload({ + message, + nc: this.nc, + codec: this.codec, + options + }); + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + } + + /** + * + * + * @param message to publish + * @param options to use while publishing the message + */ + public async jetStreamPublishToSendArrayPayload({ + message, + options = {} + }: { + message: ArrayMessageModule.ArrayMessage, + options?: Partial + }): Promise { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + return nats.jetStreamPublishToSendArrayPayload({ + message, + js: this.js, + codec: this.codec, + options + }); + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + } + + + /** + * + * + * @param {subscribeToReceiveArrayPayloadCallback} onDataCallback to call when messages are received + * @param options when setting up the subscription + * @param options when setting up the subscription + */ + public subscribeToReceiveArrayPayload({ + onDataCallback, + options, + flush + }: { + onDataCallback: (err?: Error, msg?: ArrayMessageModule.ArrayMessage, natsMsg?: Nats.Msg) => void, + options?: Nats.SubscriptionOptions, + flush?: boolean + }): Promise { + return new Promise(async (resolve, reject) => { + if(!this.isClosed() && this.nc !== undefined && this.codec !== undefined){ + try { + const sub = await nats.subscribeToReceiveArrayPayload({ + onDataCallback, + nc: this.nc, + codec: this.codec, + options + }); + if(flush){ + await this.nc.flush(); + } + resolve(sub); + }catch(e: any){ + reject(e); + } + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + }); +} + + /** + * + * + * @param {jetStreamPullSubscribeToReceiveArrayPayloadCallback} onDataCallback to call when messages are received + * @param options when setting up the subscription + */ + public jetStreamPullSubscribeToReceiveArrayPayload({ + onDataCallback, + options = {} + }: { + onDataCallback: (err?: Error, msg?: ArrayMessageModule.ArrayMessage, jetstreamMsg?: Nats.JsMsg) => void, + options: Nats.ConsumerOptsBuilder | Partial + }): Promise { + return new Promise(async (resolve, reject) => { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + try { + const sub = await nats.jetStreamPullSubscribeToReceiveArrayPayload({ + onDataCallback, + js: this.js, + codec: this.codec, + options + }); + resolve(sub); + } catch (e: any) { + reject(e); + } + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + }); + } + + + /** + * + * + * @param {jetStreamPushSubscriptionFromReceiveArrayPayloadCallback} onDataCallback to call when messages are received + * @param options when setting up the subscription + */ + public jetStreamPushSubscriptionFromReceiveArrayPayload({ + onDataCallback, + options = {} + }: { + onDataCallback: (err?: Error, msg?: ArrayMessageModule.ArrayMessage, jetstreamMsg?: Nats.JsMsg) => void, + options: Nats.ConsumerOptsBuilder | Partial + }): Promise { + return new Promise(async (resolve, reject) => { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + try { + const sub = await nats.jetStreamPushSubscriptionFromReceiveArrayPayload({ + onDataCallback, + js: this.js, + codec: this.codec, + options + }); + resolve(sub); + } catch (e: any) { + reject(e); + } + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + }); + } + + /** + * + * + * @param message to publish + * @param options to use while publishing the message + */ + public async publishToSendUnionPayload({ + message, + options + }: { + message: UnionMessageModule.UnionMessage, + options?: Nats.PublishOptions + }): Promise { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined) { + await nats.publishToSendUnionPayload({ + message, + nc: this.nc, + codec: this.codec, + options + }); + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + } + + /** + * + * + * @param message to publish + * @param options to use while publishing the message + */ + public async jetStreamPublishToSendUnionPayload({ + message, + options = {} + }: { + message: UnionMessageModule.UnionMessage, + options?: Partial + }): Promise { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + return nats.jetStreamPublishToSendUnionPayload({ + message, + js: this.js, + codec: this.codec, + options + }); + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + } + + + /** + * + * + * @param {subscribeToReceiveUnionPayloadCallback} onDataCallback to call when messages are received + * @param options when setting up the subscription + * @param options when setting up the subscription + */ + public subscribeToReceiveUnionPayload({ + onDataCallback, + options, + flush + }: { + onDataCallback: (err?: Error, msg?: UnionMessageModule.UnionMessage, natsMsg?: Nats.Msg) => void, + options?: Nats.SubscriptionOptions, + flush?: boolean + }): Promise { + return new Promise(async (resolve, reject) => { + if(!this.isClosed() && this.nc !== undefined && this.codec !== undefined){ + try { + const sub = await nats.subscribeToReceiveUnionPayload({ + onDataCallback, + nc: this.nc, + codec: this.codec, + options + }); + if(flush){ + await this.nc.flush(); + } + resolve(sub); + }catch(e: any){ + reject(e); + } + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + }); +} + + /** + * + * + * @param {jetStreamPullSubscribeToReceiveUnionPayloadCallback} onDataCallback to call when messages are received + * @param options when setting up the subscription + */ + public jetStreamPullSubscribeToReceiveUnionPayload({ + onDataCallback, + options = {} + }: { + onDataCallback: (err?: Error, msg?: UnionMessageModule.UnionMessage, jetstreamMsg?: Nats.JsMsg) => void, + options: Nats.ConsumerOptsBuilder | Partial + }): Promise { + return new Promise(async (resolve, reject) => { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + try { + const sub = await nats.jetStreamPullSubscribeToReceiveUnionPayload({ + onDataCallback, + js: this.js, + codec: this.codec, + options + }); + resolve(sub); + } catch (e: any) { + reject(e); + } + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + }); + } + + + /** + * + * + * @param {jetStreamPushSubscriptionFromReceiveUnionPayloadCallback} onDataCallback to call when messages are received + * @param options when setting up the subscription + */ + public jetStreamPushSubscriptionFromReceiveUnionPayload({ + onDataCallback, + options = {} + }: { + onDataCallback: (err?: Error, msg?: UnionMessageModule.UnionMessage, jetstreamMsg?: Nats.JsMsg) => void, + options: Nats.ConsumerOptsBuilder | Partial + }): Promise { + return new Promise(async (resolve, reject) => { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + try { + const sub = await nats.jetStreamPushSubscriptionFromReceiveUnionPayload({ + onDataCallback, + js: this.js, + codec: this.codec, + options + }); + resolve(sub); + } catch (e: any) { + reject(e); + } + } else { + Promise.reject('Nats client not available yet, please connect or set the client'); + } + }); + } } \ No newline at end of file diff --git a/test/runtime/typescript/src/openapi/payloads/FindPetsByStatusAndCategoryResponse_200.ts b/test/runtime/typescript/src/openapi/payloads/FindPetsByStatusAndCategoryResponse_200.ts index 1c6e70d4..286bb5a6 100644 --- a/test/runtime/typescript/src/openapi/payloads/FindPetsByStatusAndCategoryResponse_200.ts +++ b/test/runtime/typescript/src/openapi/payloads/FindPetsByStatusAndCategoryResponse_200.ts @@ -1,3 +1,43 @@ import {APet} from './APet'; +import {Ajv, Options as AjvOptions, ErrorObject, ValidateFunction} from 'ajv'; +import addFormats from 'ajv-formats'; type FindPetsByStatusAndCategoryResponse_200 = APet[]; + +export function unmarshal(json: string | any[]): FindPetsByStatusAndCategoryResponse_200 { + const arr = typeof json === 'string' ? JSON.parse(json) : json; + return arr.map((item: any) => { + if (item && typeof item === 'object') { + // Try to use unmarshal if available on the type + return item; + } + return item; + }) as FindPetsByStatusAndCategoryResponse_200; +} +export function marshal(payload: FindPetsByStatusAndCategoryResponse_200): string { + return JSON.stringify(payload.map((item) => { + if (item && typeof item === 'object' && 'marshal' in item && typeof item.marshal === 'function') { + return JSON.parse(item.marshal()); + } + return item; + })); +} +export const theCodeGenSchema = {"type":"array","items":{"title":"a Pet","description":"A pet for sale in the pet store","type":"object","required":["name","photoUrls"],"properties":{"id":{"type":"integer","format":"int64"},"category":{"title":"Pet category","description":"A category for a pet","type":"object","properties":{"id":{"type":"integer","format":"int64"},"name":{"type":"string","pattern":"^[a-zA-Z0-9]+[a-zA-Z0-9\\.\\-_]*[a-zA-Z0-9]+$"}},"xml":{"name":"Category"}},"name":{"type":"string","example":"doggie"},"photoUrls":{"type":"array","xml":{"name":"photoUrl","wrapped":true},"items":{"type":"string"}},"tags":{"type":"array","xml":{"name":"tag","wrapped":true},"items":{"title":"Pet Tag","description":"A tag for a pet","type":"object","properties":{"id":{"type":"integer","format":"int64"},"name":{"type":"string"}},"xml":{"name":"Tag"}}},"status":{"type":"string","description":"pet status in the store","deprecated":true,"enum":["available","pending","sold"]}},"xml":{"name":"Pet"}},"$id":"findPetsByStatusAndCategory_Response_200","$schema":"http://json-schema.org/draft-07/schema"}; +export function validate(context?: {data: any, ajvValidatorFunction?: ValidateFunction, ajvInstance?: Ajv, ajvOptions?: AjvOptions}): { valid: boolean; errors?: ErrorObject[]; } { + const {data, ajvValidatorFunction} = context ?? {}; + const parsedData = typeof data === 'string' ? JSON.parse(data) : data; + const validate = ajvValidatorFunction ?? createValidator(context) + return { + valid: validate(parsedData), + errors: validate.errors ?? undefined, + }; +} +export function createValidator(context?: {ajvInstance?: Ajv, ajvOptions?: AjvOptions}): ValidateFunction { + const {ajvInstance} = {...context ?? {}, ajvInstance: new Ajv(context?.ajvOptions ?? {})}; + addFormats(ajvInstance); + ajvInstance.addVocabulary(["xml", "example"]) + const validate = ajvInstance.compile(theCodeGenSchema); + return validate; +} + + export { FindPetsByStatusAndCategoryResponse_200 }; \ No newline at end of file diff --git a/test/runtime/typescript/src/payloads/AnonymousSchema_9.ts b/test/runtime/typescript/src/payloads/AnonymousSchema_9.ts new file mode 100644 index 00000000..911067ff --- /dev/null +++ b/test/runtime/typescript/src/payloads/AnonymousSchema_9.ts @@ -0,0 +1,71 @@ +import {Ajv, Options as AjvOptions, ErrorObject, ValidateFunction} from 'ajv'; +import addFormats from 'ajv-formats'; +class AnonymousSchema_9 { + private _name?: string; + private _additionalProperties?: Record; + + constructor(input: { + name?: string, + additionalProperties?: Record, + }) { + this._name = input.name; + this._additionalProperties = input.additionalProperties; + } + + get name(): string | undefined { return this._name; } + set name(name: string | undefined) { this._name = name; } + + get additionalProperties(): Record | undefined { return this._additionalProperties; } + set additionalProperties(additionalProperties: Record | undefined) { this._additionalProperties = additionalProperties; } + + public marshal() : string { + let json = '{' + if(this.name !== undefined) { + json += `"name": ${typeof this.name === 'number' || typeof this.name === 'boolean' ? this.name : JSON.stringify(this.name)},`; + } + if(this.additionalProperties !== undefined) { + for (const [key, value] of this.additionalProperties.entries()) { + //Only unwrap those that are not already a property in the JSON object + if(["name","additionalProperties"].includes(String(key))) continue; + json += `"${key}": ${typeof value === 'number' || typeof value === 'boolean' ? value : JSON.stringify(value)},`; + } + } + //Remove potential last comma + return `${json.charAt(json.length-1) === ',' ? json.slice(0, json.length-1) : json}}`; + } + + public static unmarshal(json: string | object): AnonymousSchema_9 { + const obj = typeof json === "object" ? json : JSON.parse(json); + const instance = new AnonymousSchema_9({} as any); + + if (obj["name"] !== undefined) { + instance.name = obj["name"]; + } + + instance.additionalProperties = new Map(); + const propsToCheck = Object.entries(obj).filter((([key,]) => {return !["name","additionalProperties"].includes(key);})); + for (const [key, value] of propsToCheck) { + instance.additionalProperties.set(key, value as any); + } + return instance; + } + public static theCodeGenSchema = {"type":"object","properties":{"name":{"type":"string"}}}; + public static validate(context?: {data: any, ajvValidatorFunction?: ValidateFunction, ajvInstance?: Ajv, ajvOptions?: AjvOptions}): { valid: boolean; errors?: ErrorObject[]; } { + const {data, ajvValidatorFunction} = context ?? {}; + const parsedData = typeof data === 'string' ? JSON.parse(data) : data; + const validate = ajvValidatorFunction ?? this.createValidator(context) + return { + valid: validate(parsedData), + errors: validate.errors ?? undefined, + }; + } + public static createValidator(context?: {ajvInstance?: Ajv, ajvOptions?: AjvOptions}): ValidateFunction { + const {ajvInstance} = {...context ?? {}, ajvInstance: new Ajv(context?.ajvOptions ?? {})}; + addFormats(ajvInstance); + + const validate = ajvInstance.compile(this.theCodeGenSchema); + return validate; + } + +} +export { AnonymousSchema_9 }; \ No newline at end of file diff --git a/test/runtime/typescript/src/payloads/ArrayMessage.ts b/test/runtime/typescript/src/payloads/ArrayMessage.ts new file mode 100644 index 00000000..32762a49 --- /dev/null +++ b/test/runtime/typescript/src/payloads/ArrayMessage.ts @@ -0,0 +1,33 @@ +import {Ajv, Options as AjvOptions, ErrorObject, ValidateFunction} from 'ajv'; +import addFormats from 'ajv-formats'; +type ArrayMessage = string[]; + +export function unmarshal(json: string | any[]): ArrayMessage { + if (typeof json === 'string') { + return JSON.parse(json) as ArrayMessage; + } + return json as ArrayMessage; +} +export function marshal(payload: ArrayMessage): string { + return JSON.stringify(payload); +} +export const theCodeGenSchema = {"type":"array","$schema":"http://json-schema.org/draft-07/schema","items":{"type":"string"},"description":"An array of strings payload","$id":"ArrayMessage"}; +export function validate(context?: {data: any, ajvValidatorFunction?: ValidateFunction, ajvInstance?: Ajv, ajvOptions?: AjvOptions}): { valid: boolean; errors?: ErrorObject[]; } { + const {data, ajvValidatorFunction} = context ?? {}; + const parsedData = typeof data === 'string' ? JSON.parse(data) : data; + const validate = ajvValidatorFunction ?? createValidator(context) + return { + valid: validate(parsedData), + errors: validate.errors ?? undefined, + }; +} +export function createValidator(context?: {ajvInstance?: Ajv, ajvOptions?: AjvOptions}): ValidateFunction { + const {ajvInstance} = {...context ?? {}, ajvInstance: new Ajv(context?.ajvOptions ?? {})}; + addFormats(ajvInstance); + + const validate = ajvInstance.compile(theCodeGenSchema); + return validate; +} + + +export { ArrayMessage }; \ No newline at end of file diff --git a/test/runtime/typescript/src/payloads/StringMessage.ts b/test/runtime/typescript/src/payloads/StringMessage.ts new file mode 100644 index 00000000..e651fe4c --- /dev/null +++ b/test/runtime/typescript/src/payloads/StringMessage.ts @@ -0,0 +1,30 @@ +import {Ajv, Options as AjvOptions, ErrorObject, ValidateFunction} from 'ajv'; +import addFormats from 'ajv-formats'; +type StringMessage = string; + +export function unmarshal(json: string): StringMessage { + return JSON.parse(json) as StringMessage; +} +export function marshal(payload: StringMessage): string { + return JSON.stringify(payload); +} +export const theCodeGenSchema = {"type":"string","$schema":"http://json-schema.org/draft-07/schema","description":"A simple string payload","$id":"StringMessage"}; +export function validate(context?: {data: any, ajvValidatorFunction?: ValidateFunction, ajvInstance?: Ajv, ajvOptions?: AjvOptions}): { valid: boolean; errors?: ErrorObject[]; } { + const {data, ajvValidatorFunction} = context ?? {}; + const parsedData = typeof data === 'string' ? JSON.parse(data) : data; + const validate = ajvValidatorFunction ?? createValidator(context) + return { + valid: validate(parsedData), + errors: validate.errors ?? undefined, + }; +} +export function createValidator(context?: {ajvInstance?: Ajv, ajvOptions?: AjvOptions}): ValidateFunction { + const {ajvInstance} = {...context ?? {}, ajvInstance: new Ajv(context?.ajvOptions ?? {})}; + addFormats(ajvInstance); + + const validate = ajvInstance.compile(theCodeGenSchema); + return validate; +} + + +export { StringMessage }; \ No newline at end of file diff --git a/test/runtime/typescript/src/payloads/UnionMessage.ts b/test/runtime/typescript/src/payloads/UnionMessage.ts new file mode 100644 index 00000000..71b4060d --- /dev/null +++ b/test/runtime/typescript/src/payloads/UnionMessage.ts @@ -0,0 +1,38 @@ +import {AnonymousSchema_9} from './AnonymousSchema_9'; +import {Ajv, Options as AjvOptions, ErrorObject, ValidateFunction} from 'ajv'; +import addFormats from 'ajv-formats'; +type UnionMessage = string | number | AnonymousSchema_9; + +export function unmarshal(json: any): UnionMessage { + + return JSON.parse(json); +} +export function marshal(payload: UnionMessage) { + + +if(payload instanceof AnonymousSchema_9) { +return payload.marshal(); +} + return JSON.stringify(payload); +} + +export const theCodeGenSchema = {"type":"object","$schema":"http://json-schema.org/draft-07/schema","oneOf":[{"type":"string"},{"type":"number"},{"type":"object","properties":{"name":{"type":"string"}}}],"description":"A union type payload","$id":"UnionMessage"}; +export function validate(context?: {data: any, ajvValidatorFunction?: ValidateFunction, ajvInstance?: Ajv, ajvOptions?: AjvOptions}): { valid: boolean; errors?: ErrorObject[]; } { + const {data, ajvValidatorFunction} = context ?? {}; + const parsedData = typeof data === 'string' ? JSON.parse(data) : data; + const validate = ajvValidatorFunction ?? createValidator(context) + return { + valid: validate(parsedData), + errors: validate.errors ?? undefined, + }; +} +export function createValidator(context?: {ajvInstance?: Ajv, ajvOptions?: AjvOptions}): ValidateFunction { + const {ajvInstance} = {...context ?? {}, ajvInstance: new Ajv(context?.ajvOptions ?? {})}; + addFormats(ajvInstance); + + const validate = ajvInstance.compile(theCodeGenSchema); + return validate; +} + + +export { UnionMessage }; \ No newline at end of file diff --git a/test/runtime/typescript/test/payloads.spec.ts b/test/runtime/typescript/test/payloads.spec.ts index 55f349f6..dcd3f022 100644 --- a/test/runtime/typescript/test/payloads.spec.ts +++ b/test/runtime/typescript/test/payloads.spec.ts @@ -1,4 +1,7 @@ import { UserSignedUp } from '../src/payloads/UserSignedUp'; +import * as StringMessage from '../src/payloads/StringMessage'; +import * as ArrayMessage from '../src/payloads/ArrayMessage'; +import * as UnionMessage from '../src/payloads/UnionMessage'; describe('payloads', () => { describe('should be able to serialize and deserialize the model', () => { @@ -76,4 +79,117 @@ describe('payloads', () => { }).valid).toBe(true); }); }); + + describe('StringMessage (primitive type)', () => { + test('should marshal a string payload', () => { + const testString: StringMessage.StringMessage = 'Hello World'; + const serialized = StringMessage.marshal(testString); + expect(serialized).toEqual('"Hello World"'); + }); + + test('should unmarshal a string payload', () => { + const serialized = '"Hello World"'; + const result = StringMessage.unmarshal(serialized); + expect(result).toEqual('Hello World'); + }); + + test('should validate correct string payload (as JSON string)', () => { + // When passing data as a string, it must be valid JSON (i.e., the marshalled form) + const result = StringMessage.validate({ + data: '"Hello World"' + }); + expect(result.valid).toBe(true); + }); + + test('should invalidate non-string payload', () => { + // Passing the raw value (not as a JSON string) - validator expects JSON when string + const result = StringMessage.validate({ + data: 123 // This is not a string, so will fail validation + }); + expect(result.valid).toBe(false); + }); + + test('should provide validation function', () => { + const validate = StringMessage.createValidator(); + expect(typeof validate).toBe('function'); + }); + }); + + describe('ArrayMessage (array type)', () => { + test('should marshal an array payload', () => { + const testArray: ArrayMessage.ArrayMessage = ['item1', 'item2', 'item3']; + const serialized = ArrayMessage.marshal(testArray); + expect(serialized).toEqual('["item1","item2","item3"]'); + }); + + test('should unmarshal an array payload from string', () => { + const serialized = '["item1","item2","item3"]'; + const result = ArrayMessage.unmarshal(serialized); + expect(result).toEqual(['item1', 'item2', 'item3']); + }); + + test('should unmarshal an array payload from array', () => { + const array = ['item1', 'item2', 'item3']; + const result = ArrayMessage.unmarshal(array); + expect(result).toEqual(['item1', 'item2', 'item3']); + }); + + test('should validate correct array payload', () => { + const result = ArrayMessage.validate({ + data: ['item1', 'item2'] + }); + expect(result.valid).toBe(true); + }); + + test('should invalidate array with wrong item types', () => { + const result = ArrayMessage.validate({ + data: [1, 2, 3] // Numbers instead of strings + }); + expect(result.valid).toBe(false); + }); + + test('should provide validation function', () => { + const validate = ArrayMessage.createValidator(); + expect(typeof validate).toBe('function'); + }); + }); + + describe('UnionMessage (union type)', () => { + test('should marshal a string union member', () => { + const testValue: UnionMessage.UnionMessage = 'Hello World'; + const serialized = UnionMessage.marshal(testValue); + expect(serialized).toEqual('"Hello World"'); + }); + + test('should marshal a number union member', () => { + const testValue: UnionMessage.UnionMessage = 42; + const serialized = UnionMessage.marshal(testValue); + expect(serialized).toEqual('42'); + }); + + test('should unmarshal a string union member', () => { + const serialized = '"Hello World"'; + const result = UnionMessage.unmarshal(serialized); + expect(result).toEqual('Hello World'); + }); + + test('should unmarshal a number union member', () => { + const serialized = '42'; + const result = UnionMessage.unmarshal(serialized); + expect(result).toEqual(42); + }); + + test('should validate object union member', () => { + // oneOf validation: objects are one of the valid union members + const result = UnionMessage.validate({ + data: { name: 'Test' } + }); + expect(result.valid).toBe(true); + }); + + test('should provide validation function', () => { + const validate = UnionMessage.createValidator(); + expect(typeof validate).toBe('function'); + }); + }); }); From 303b17c89e1daec30ff1cc5f23ae6be8e2f8b765 Mon Sep 17 00:00:00 2001 From: jonaslagoni Date: Thu, 8 Jan 2026 21:23:08 +0100 Subject: [PATCH 2/4] wip --- assets/videos/remotion-test | 1 - 1 file changed, 1 deletion(-) delete mode 160000 assets/videos/remotion-test diff --git a/assets/videos/remotion-test b/assets/videos/remotion-test deleted file mode 160000 index 58d77ef5..00000000 --- a/assets/videos/remotion-test +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 58d77ef55f0ecc86b70ccd2b149abd1e779f21c2 From 7a1ba5b0f44437177b8d1f1a9f98e21b0a01de84 Mon Sep 17 00:00:00 2001 From: jonaslagoni Date: Thu, 8 Jan 2026 21:26:58 +0100 Subject: [PATCH 3/4] wip --- .../generators/typescript/channels/protocols/kafka/publish.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/codegen/generators/typescript/channels/protocols/kafka/publish.ts b/src/codegen/generators/typescript/channels/protocols/kafka/publish.ts index bea7bcc6..3e3bf4df 100644 --- a/src/codegen/generators/typescript/channels/protocols/kafka/publish.ts +++ b/src/codegen/generators/typescript/channels/protocols/kafka/publish.ts @@ -96,7 +96,7 @@ function ${functionName}({ topic: ${addressToUse}, messages: [ { - value: dataToSend${channelHeaders ? ',\n ' + headersInMessage : ''} + value: dataToSend${channelHeaders ? `,\n ${ headersInMessage}` : ''} }, ], }); From 21179aa01005720e82161b53e599374cc5da6587 Mon Sep 17 00:00:00 2001 From: jonaslagoni Date: Thu, 8 Jan 2026 21:42:40 +0100 Subject: [PATCH 4/4] wip --- .../channels/protocols/kafka/publish.ts | 2 +- src/codegen/modelina/presets/primitives.ts | 4 +- ...FindPetsByStatusAndCategoryResponse_200.ts | 3 +- test/runtime/typescript/test/payloads.spec.ts | 327 ++++++++++++++++++ 4 files changed, 331 insertions(+), 5 deletions(-) diff --git a/src/codegen/generators/typescript/channels/protocols/kafka/publish.ts b/src/codegen/generators/typescript/channels/protocols/kafka/publish.ts index 3e3bf4df..6ed24022 100644 --- a/src/codegen/generators/typescript/channels/protocols/kafka/publish.ts +++ b/src/codegen/generators/typescript/channels/protocols/kafka/publish.ts @@ -96,7 +96,7 @@ function ${functionName}({ topic: ${addressToUse}, messages: [ { - value: dataToSend${channelHeaders ? `,\n ${ headersInMessage}` : ''} + value: dataToSend${channelHeaders ? `,\n ${headersInMessage}` : ''} }, ], }); diff --git a/src/codegen/modelina/presets/primitives.ts b/src/codegen/modelina/presets/primitives.ts index d0ff05b2..d150db5d 100644 --- a/src/codegen/modelina/presets/primitives.ts +++ b/src/codegen/modelina/presets/primitives.ts @@ -93,12 +93,12 @@ function renderArrayUnmarshal(model: ConstrainedArrayModel): string { valueModel.type !== 'boolean'; if (hasItemUnmarshal) { + const itemTypeName = valueModel.name; return `export function unmarshal(json: string | any[]): ${model.name} { const arr = typeof json === 'string' ? JSON.parse(json) : json; return arr.map((item: any) => { if (item && typeof item === 'object') { - // Try to use unmarshal if available on the type - return item; + return ${itemTypeName}.unmarshal(item); } return item; }) as ${model.name}; diff --git a/test/runtime/typescript/src/openapi/payloads/FindPetsByStatusAndCategoryResponse_200.ts b/test/runtime/typescript/src/openapi/payloads/FindPetsByStatusAndCategoryResponse_200.ts index 286bb5a6..119f0774 100644 --- a/test/runtime/typescript/src/openapi/payloads/FindPetsByStatusAndCategoryResponse_200.ts +++ b/test/runtime/typescript/src/openapi/payloads/FindPetsByStatusAndCategoryResponse_200.ts @@ -7,8 +7,7 @@ export function unmarshal(json: string | any[]): FindPetsByStatusAndCategoryResp const arr = typeof json === 'string' ? JSON.parse(json) : json; return arr.map((item: any) => { if (item && typeof item === 'object') { - // Try to use unmarshal if available on the type - return item; + return APet.unmarshal(item); } return item; }) as FindPetsByStatusAndCategoryResponse_200; diff --git a/test/runtime/typescript/test/payloads.spec.ts b/test/runtime/typescript/test/payloads.spec.ts index dcd3f022..38ad2f97 100644 --- a/test/runtime/typescript/test/payloads.spec.ts +++ b/test/runtime/typescript/test/payloads.spec.ts @@ -2,6 +2,8 @@ import { UserSignedUp } from '../src/payloads/UserSignedUp'; import * as StringMessage from '../src/payloads/StringMessage'; import * as ArrayMessage from '../src/payloads/ArrayMessage'; import * as UnionMessage from '../src/payloads/UnionMessage'; +import { APet } from '../src/openapi/payloads/APet'; +import * as FindPetsByStatusAndCategoryResponse_200 from '../src/openapi/payloads/FindPetsByStatusAndCategoryResponse_200'; describe('payloads', () => { describe('should be able to serialize and deserialize the model', () => { @@ -93,6 +95,35 @@ describe('payloads', () => { expect(result).toEqual('Hello World'); }); + test('should marshal and unmarshal roundtrip correctly', () => { + const original: StringMessage.StringMessage = 'Test roundtrip'; + const serialized = StringMessage.marshal(original); + const deserialized = StringMessage.unmarshal(serialized); + expect(deserialized).toEqual(original); + }); + + test('should handle empty string', () => { + const emptyString: StringMessage.StringMessage = ''; + const serialized = StringMessage.marshal(emptyString); + expect(serialized).toEqual('""'); + const deserialized = StringMessage.unmarshal(serialized); + expect(deserialized).toEqual(''); + }); + + test('should handle special characters', () => { + const specialChars: StringMessage.StringMessage = 'Hello\nWorld\t"Quoted"'; + const serialized = StringMessage.marshal(specialChars); + const deserialized = StringMessage.unmarshal(serialized); + expect(deserialized).toEqual(specialChars); + }); + + test('should handle unicode characters', () => { + const unicode: StringMessage.StringMessage = '你好世界 🌍 émojis'; + const serialized = StringMessage.marshal(unicode); + const deserialized = StringMessage.unmarshal(serialized); + expect(deserialized).toEqual(unicode); + }); + test('should validate correct string payload (as JSON string)', () => { // When passing data as a string, it must be valid JSON (i.e., the marshalled form) const result = StringMessage.validate({ @@ -101,6 +132,13 @@ describe('payloads', () => { expect(result.valid).toBe(true); }); + test('should validate empty string', () => { + const result = StringMessage.validate({ + data: '""' + }); + expect(result.valid).toBe(true); + }); + test('should invalidate non-string payload', () => { // Passing the raw value (not as a JSON string) - validator expects JSON when string const result = StringMessage.validate({ @@ -109,10 +147,39 @@ describe('payloads', () => { expect(result.valid).toBe(false); }); + test('should invalidate object payload', () => { + const result = StringMessage.validate({ + data: { message: 'not a string' } + }); + expect(result.valid).toBe(false); + }); + + test('should invalidate array payload', () => { + const result = StringMessage.validate({ + data: ['not', 'a', 'string'] + }); + expect(result.valid).toBe(false); + }); + test('should provide validation function', () => { const validate = StringMessage.createValidator(); expect(typeof validate).toBe('function'); }); + + test('should validate multiple times with reusable validator', () => { + const validate = StringMessage.createValidator(); + const result1 = StringMessage.validate({ + data: '"First string"', + ajvValidatorFunction: validate + }); + expect(result1.valid).toBe(true); + + const result2 = StringMessage.validate({ + data: '"Second string"', + ajvValidatorFunction: validate + }); + expect(result2.valid).toBe(true); + }); }); describe('ArrayMessage (array type)', () => { @@ -134,6 +201,36 @@ describe('payloads', () => { expect(result).toEqual(['item1', 'item2', 'item3']); }); + test('should marshal and unmarshal roundtrip correctly', () => { + const original: ArrayMessage.ArrayMessage = ['a', 'b', 'c']; + const serialized = ArrayMessage.marshal(original); + const deserialized = ArrayMessage.unmarshal(serialized); + expect(deserialized).toEqual(original); + }); + + test('should handle empty array', () => { + const emptyArray: ArrayMessage.ArrayMessage = []; + const serialized = ArrayMessage.marshal(emptyArray); + expect(serialized).toEqual('[]'); + const deserialized = ArrayMessage.unmarshal(serialized); + expect(deserialized).toEqual([]); + }); + + test('should handle single item array', () => { + const singleItem: ArrayMessage.ArrayMessage = ['only']; + const serialized = ArrayMessage.marshal(singleItem); + expect(serialized).toEqual('["only"]'); + const deserialized = ArrayMessage.unmarshal(serialized); + expect(deserialized).toEqual(['only']); + }); + + test('should handle array with special characters', () => { + const specialChars: ArrayMessage.ArrayMessage = ['hello\nworld', 'tab\there', '"quoted"']; + const serialized = ArrayMessage.marshal(specialChars); + const deserialized = ArrayMessage.unmarshal(serialized); + expect(deserialized).toEqual(specialChars); + }); + test('should validate correct array payload', () => { const result = ArrayMessage.validate({ data: ['item1', 'item2'] @@ -141,6 +238,13 @@ describe('payloads', () => { expect(result.valid).toBe(true); }); + test('should validate empty array', () => { + const result = ArrayMessage.validate({ + data: [] + }); + expect(result.valid).toBe(true); + }); + test('should invalidate array with wrong item types', () => { const result = ArrayMessage.validate({ data: [1, 2, 3] // Numbers instead of strings @@ -148,10 +252,40 @@ describe('payloads', () => { expect(result.valid).toBe(false); }); + test('should invalidate non-array payload (object)', () => { + const result = ArrayMessage.validate({ + data: { items: ['a', 'b'] } + }); + expect(result.valid).toBe(false); + }); + + test('should invalidate non-array payload (string)', () => { + // The validate function tries to JSON.parse string inputs, so we pass a valid JSON string + const result = ArrayMessage.validate({ + data: '"not an array"' // This is valid JSON for a string, but not an array + }); + expect(result.valid).toBe(false); + }); + test('should provide validation function', () => { const validate = ArrayMessage.createValidator(); expect(typeof validate).toBe('function'); }); + + test('should validate multiple times with reusable validator', () => { + const validate = ArrayMessage.createValidator(); + const result1 = ArrayMessage.validate({ + data: ['first', 'array'], + ajvValidatorFunction: validate + }); + expect(result1.valid).toBe(true); + + const result2 = ArrayMessage.validate({ + data: ['second', 'array'], + ajvValidatorFunction: validate + }); + expect(result2.valid).toBe(true); + }); }); describe('UnionMessage (union type)', () => { @@ -167,6 +301,13 @@ describe('payloads', () => { expect(serialized).toEqual('42'); }); + test('should marshal an object union member', () => { + // Plain objects work with marshal at runtime (it uses JSON.stringify for non-class instances) + const testValue = { name: 'Test Object' } as UnionMessage.UnionMessage; + const serialized = UnionMessage.marshal(testValue); + expect(JSON.parse(serialized)).toEqual({ name: 'Test Object' }); + }); + test('should unmarshal a string union member', () => { const serialized = '"Hello World"'; const result = UnionMessage.unmarshal(serialized); @@ -179,6 +320,34 @@ describe('payloads', () => { expect(result).toEqual(42); }); + test('should unmarshal an object union member', () => { + const serialized = '{"name":"Test Object"}'; + const result = UnionMessage.unmarshal(serialized); + expect(result).toEqual({ name: 'Test Object' }); + }); + + test('should marshal and unmarshal string roundtrip', () => { + const original: UnionMessage.UnionMessage = 'roundtrip test'; + const serialized = UnionMessage.marshal(original); + const deserialized = UnionMessage.unmarshal(serialized); + expect(deserialized).toEqual(original); + }); + + test('should marshal and unmarshal number roundtrip', () => { + const original: UnionMessage.UnionMessage = 3.14159; + const serialized = UnionMessage.marshal(original); + const deserialized = UnionMessage.unmarshal(serialized); + expect(deserialized).toEqual(original); + }); + + test('should marshal and unmarshal object roundtrip', () => { + // Plain objects work with marshal/unmarshal at runtime + const original = { name: 'roundtrip' } as UnionMessage.UnionMessage; + const serialized = UnionMessage.marshal(original); + const deserialized = UnionMessage.unmarshal(serialized); + expect(deserialized).toEqual(original); + }); + test('should validate object union member', () => { // oneOf validation: objects are one of the valid union members const result = UnionMessage.validate({ @@ -187,9 +356,167 @@ describe('payloads', () => { expect(result.valid).toBe(true); }); + test('should validate empty object union member', () => { + // Empty object should match the object schema with no required properties + const result = UnionMessage.validate({ + data: {} + }); + expect(result.valid).toBe(true); + }); + + test('should validate object with name property', () => { + const result = UnionMessage.validate({ + data: { name: 'John Doe' } + }); + expect(result.valid).toBe(true); + }); + + test('should invalidate boolean (not in union)', () => { + // Boolean is not part of the union (string | number | object) + const result = UnionMessage.validate({ + data: true + }); + expect(result.valid).toBe(false); + }); + + test('should invalidate array (not in union)', () => { + // Array is not part of the union + const result = UnionMessage.validate({ + data: [1, 2, 3] + }); + expect(result.valid).toBe(false); + }); + + test('should invalidate null (not in union)', () => { + const result = UnionMessage.validate({ + data: null + }); + expect(result.valid).toBe(false); + }); + test('should provide validation function', () => { const validate = UnionMessage.createValidator(); expect(typeof validate).toBe('function'); }); + + test('should validate multiple times with reusable validator for objects', () => { + const validate = UnionMessage.createValidator(); + + // Validate object 1 + const result1 = UnionMessage.validate({ + data: { name: 'test1' }, + ajvValidatorFunction: validate + }); + expect(result1.valid).toBe(true); + + // Validate object 2 + const result2 = UnionMessage.validate({ + data: { name: 'test2' }, + ajvValidatorFunction: validate + }); + expect(result2.valid).toBe(true); + + // Validate empty object + const result3 = UnionMessage.validate({ + data: {}, + ajvValidatorFunction: validate + }); + expect(result3.valid).toBe(true); + }); + }); + + describe('FindPetsByStatusAndCategoryResponse_200 (array of complex types)', () => { + const testPet1 = new APet({ + name: 'Fluffy', + photoUrls: ['http://example.com/fluffy.jpg'], + id: 1, + status: 'available' + }); + + const testPet2 = new APet({ + name: 'Buddy', + photoUrls: ['http://example.com/buddy.jpg', 'http://example.com/buddy2.jpg'], + id: 2, + status: 'pending' + }); + + test('should marshal an array of APet instances', () => { + const petsArray = [testPet1, testPet2]; + const serialized = FindPetsByStatusAndCategoryResponse_200.marshal(petsArray); + const parsed = JSON.parse(serialized); + + expect(parsed).toHaveLength(2); + expect(parsed[0].name).toBe('Fluffy'); + expect(parsed[1].name).toBe('Buddy'); + }); + + test('should unmarshal and return proper APet instances with marshal method', () => { + const jsonData = JSON.stringify([ + { name: 'Fluffy', photoUrls: ['http://example.com/fluffy.jpg'], id: 1, status: 'available' }, + { name: 'Buddy', photoUrls: ['http://example.com/buddy.jpg'], id: 2, status: 'pending' } + ]); + + const result = FindPetsByStatusAndCategoryResponse_200.unmarshal(jsonData); + + // Verify it's an array with correct length + expect(result).toHaveLength(2); + + // Verify items are proper APet instances with marshal method + expect(typeof result[0].marshal).toBe('function'); + expect(typeof result[1].marshal).toBe('function'); + + // Verify marshalling works on the unmarshalled instances + const pet1Marshalled = result[0].marshal(); + expect(pet1Marshalled).toContain('Fluffy'); + + const pet2Marshalled = result[1].marshal(); + expect(pet2Marshalled).toContain('Buddy'); + }); + + test('should unmarshal from array and return proper APet instances', () => { + const arrayData = [ + { name: 'Whiskers', photoUrls: ['http://example.com/whiskers.jpg'] }, + { name: 'Max', photoUrls: ['http://example.com/max.jpg'] } + ]; + + const result = FindPetsByStatusAndCategoryResponse_200.unmarshal(arrayData); + + // Verify items have getters and setters (instance properties) + expect(result[0].name).toBe('Whiskers'); + expect(result[1].name).toBe('Max'); + + // Verify items have marshal method + expect(typeof result[0].marshal).toBe('function'); + expect(typeof result[1].marshal).toBe('function'); + }); + + test('should marshal and unmarshal roundtrip correctly with proper instances', () => { + const original = [testPet1, testPet2]; + const serialized = FindPetsByStatusAndCategoryResponse_200.marshal(original); + const deserialized = FindPetsByStatusAndCategoryResponse_200.unmarshal(serialized); + + // Verify roundtrip preserves data + expect(deserialized[0].name).toBe(testPet1.name); + expect(deserialized[0].id).toBe(testPet1.id); + expect(deserialized[1].name).toBe(testPet2.name); + expect(deserialized[1].id).toBe(testPet2.id); + + // Verify deserialized items are proper instances with marshal method + expect(typeof deserialized[0].marshal).toBe('function'); + expect(typeof deserialized[1].marshal).toBe('function'); + + // Verify re-marshalling produces valid JSON + const remarshalled = FindPetsByStatusAndCategoryResponse_200.marshal(deserialized); + expect(JSON.parse(remarshalled)).toEqual(JSON.parse(serialized)); + }); + + test('should handle empty array', () => { + const emptyArray: FindPetsByStatusAndCategoryResponse_200.FindPetsByStatusAndCategoryResponse_200 = []; + const serialized = FindPetsByStatusAndCategoryResponse_200.marshal(emptyArray); + expect(serialized).toEqual('[]'); + + const deserialized = FindPetsByStatusAndCategoryResponse_200.unmarshal(serialized); + expect(deserialized).toEqual([]); + }); }); });