diff --git a/infrastructure/terraform/components/api/README.md b/infrastructure/terraform/components/api/README.md index 150af054..59cd0c85 100644 --- a/infrastructure/terraform/components/api/README.md +++ b/infrastructure/terraform/components/api/README.md @@ -12,6 +12,8 @@ No requirements. | [aws\_account\_id](#input\_aws\_account\_id) | The AWS Account ID (numeric) | `string` | n/a | yes | | [ca\_pem\_filename](#input\_ca\_pem\_filename) | Filename for the CA truststore file within the s3 bucket | `string` | `null` | no | | [component](#input\_component) | The variable encapsulating the name of this component | `string` | `"supapi"` | no | +| [core\_account\_id](#input\_core\_account\_id) | AWS Account ID for Core | `string` | `"000000000000"` | no | +| [core\_environment](#input\_core\_environment) | Environment of Core | `string` | `"prod"` | no | | [default\_tags](#input\_default\_tags) | A map of default tags to apply to all taggable resources within the component | `map(string)` | `{}` | no | | [enable\_backups](#input\_enable\_backups) | Enable backups | `bool` | `false` | no | | [environment](#input\_environment) | The name of the tfscaffold environment | `string` | n/a | yes | diff --git a/infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf b/infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf index 7cf6853c..a0c6471e 100644 --- a/infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf +++ b/infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf @@ -36,7 +36,8 @@ module "letter_updates_transformer" { log_subscription_role_arn = local.acct.log_subscription_role_arn lambda_env_vars = merge(local.common_lambda_env_vars, { - EVENTPUB_SNS_TOPIC_ARN = "${module.eventpub.sns_topic.arn}" + EVENTPUB_SNS_TOPIC_ARN = "${module.eventpub.sns_topic.arn}", + EVENT_SOURCE = "/data-plane/supplier-api/${var.group}/${var.environment}/letters" }) } diff --git a/internal/events/package.json b/internal/events/package.json index 2d799646..8005d13e 100644 --- a/internal/events/package.json +++ b/internal/events/package.json @@ -50,5 +50,5 @@ "typecheck": "tsc --noEmit" }, "types": "dist/index.d.ts", - "version": "1.0.5" + "version": "1.0.6" } diff --git a/internal/events/schemas/examples/letter.ACCEPTED.json b/internal/events/schemas/examples/letter.ACCEPTED.json index f9a1177c..a16cc536 100644 --- a/internal/events/schemas/examples/letter.ACCEPTED.json +++ b/internal/events/schemas/examples/letter.ACCEPTED.json @@ -17,7 +17,7 @@ "recordedtime": "2025-08-28T08:45:00.000Z", "severitynumber": 2, "severitytext": "INFO", - "source": "/data-plane/supplier-api/prod/update-status", + "source": "/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status", "specversion": "1.0", "subject": "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479", "time": "2025-08-28T08:45:00.000Z", diff --git a/internal/events/schemas/examples/letter.FORWARDED.json b/internal/events/schemas/examples/letter.FORWARDED.json index bf12ed69..1ed34281 100644 --- a/internal/events/schemas/examples/letter.FORWARDED.json +++ b/internal/events/schemas/examples/letter.FORWARDED.json @@ -19,7 +19,7 @@ "recordedtime": "2025-08-28T08:45:00.000Z", "severitynumber": 2, "severitytext": "INFO", - "source": "/data-plane/supplier-api/prod/update-status", + "source": "/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status", "specversion": "1.0", "subject": "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479", "time": "2025-08-28T08:45:00.000Z", diff --git a/internal/events/schemas/examples/letter.RETURNED.json b/internal/events/schemas/examples/letter.RETURNED.json index e273029d..87bcaf40 100644 --- a/internal/events/schemas/examples/letter.RETURNED.json +++ b/internal/events/schemas/examples/letter.RETURNED.json @@ -19,7 +19,7 @@ "recordedtime": "2025-08-28T08:45:00.000Z", "severitynumber": 2, "severitytext": "INFO", - "source": "/data-plane/supplier-api/prod/update-status", + "source": "/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status", "specversion": "1.0", "subject": "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479", "time": "2025-08-28T08:45:00.000Z", diff --git a/lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts b/lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts index 17c271a0..ba27a9e7 100644 --- a/lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts +++ b/lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts @@ -26,15 +26,18 @@ jest.mock("crypto", () => ({ randomBytes: (size: number) => randomBytes[String(size)], })); -describe("letter-updates-transformer Lambda", () => { - const mockedDeps: jest.Mocked = { - snsClient: { send: jest.fn() } as unknown as SNSClient, - logger: { info: jest.fn(), error: jest.fn() } as unknown as pino.Logger, - env: { - EVENTPUB_SNS_TOPIC_ARN: "arn:aws:sns:region:account:topic", - } as unknown as EnvVars, - } as Deps; +const eventSource = + "/data-plane/supplier-api/nhs-supplier-api-dev/main/letters"; +const mockedDeps: jest.Mocked = { + snsClient: { send: jest.fn() } as unknown as SNSClient, + logger: { info: jest.fn(), error: jest.fn() } as unknown as pino.Logger, + env: { + EVENTPUB_SNS_TOPIC_ARN: "arn:aws:sns:region:account:topic", + EVENT_SOURCE: eventSource, + } as unknown as EnvVars, +} as Deps; +describe("letter-updates-transformer Lambda", () => { beforeEach(() => { jest.useFakeTimers(); }); @@ -50,7 +53,9 @@ describe("letter-updates-transformer Lambda", () => { const newLetter = generateLetter("PRINTED"); const expectedEntries = [ expect.objectContaining({ - Message: JSON.stringify(mapLetterToCloudEvent(newLetter)), + Message: JSON.stringify( + mapLetterToCloudEvent(newLetter, eventSource), + ), }), ]; @@ -76,7 +81,9 @@ describe("letter-updates-transformer Lambda", () => { newLetter.reasonCode = "R1"; const expectedEntries = [ expect.objectContaining({ - Message: JSON.stringify(mapLetterToCloudEvent(newLetter)), + Message: JSON.stringify( + mapLetterToCloudEvent(newLetter, eventSource), + ), }), ]; @@ -103,7 +110,9 @@ describe("letter-updates-transformer Lambda", () => { newLetter.reasonCode = "R2"; const expectedEntries = [ expect.objectContaining({ - Message: JSON.stringify(mapLetterToCloudEvent(newLetter)), + Message: JSON.stringify( + mapLetterToCloudEvent(newLetter, eventSource), + ), }), ]; @@ -135,14 +144,28 @@ describe("letter-updates-transformer Lambda", () => { expect(mockedDeps.snsClient.send).not.toHaveBeenCalled(); }); - it("does not publish non-modify events", async () => { + it("publishes INSERT events", async () => { const handler = createHandler(mockedDeps); const newLetter = generateLetter("ACCEPTED"); + const expectedEntries = [ + expect.objectContaining({ + Message: JSON.stringify( + mapLetterToCloudEvent(newLetter, eventSource), + ), + }), + ]; const testData = generateKinesisEvent([generateInsertRecord(newLetter)]); await handler(testData, mockDeep(), jest.fn()); - expect(mockedDeps.snsClient.send).not.toHaveBeenCalled(); + expect(mockedDeps.snsClient.send).toHaveBeenCalledWith( + expect.objectContaining({ + input: expect.objectContaining({ + TopicArn: "arn:aws:sns:region:account:topic", + PublishBatchRequestEntries: expectedEntries, + }), + }), + ); }); it("does not publish invalid letter data", async () => { @@ -159,6 +182,53 @@ describe("letter-updates-transformer Lambda", () => { expect(mockedDeps.snsClient.send).not.toHaveBeenCalled(); }); + + it("throws error when kinesis data contains malformed JSON", async () => { + const handler = createHandler(mockedDeps); + + // Create a Kinesis event with malformed JSON data + const malformedKinesisEvent: KinesisStreamEvent = { + Records: [{ + kinesis: { + data: Buffer.from("invalid-json-data").toString("base64"), + sequenceNumber: "12345" + } + } as any] + }; + + await expect( + handler(malformedKinesisEvent, mockDeep(), jest.fn()) + ).rejects.toThrow(); + + expect(mockedDeps.logger.error).toHaveBeenCalledWith( + expect.objectContaining({ + description: "Error extracting payload", + error: expect.any(Error), + record: expect.objectContaining({ + kinesis: expect.objectContaining({ + data: Buffer.from("invalid-json-data").toString("base64") + }) + }) + }) + ); + }); + + it("handles events with no records", async () => { + const handler = createHandler(mockedDeps); + + // Create a Kinesis event with empty Records array + const emptyKinesisEvent: KinesisStreamEvent = { Records: [] }; + + await handler(emptyKinesisEvent, mockDeep(), jest.fn()); + + expect(mockedDeps.logger.info).toHaveBeenCalledWith( + expect.objectContaining({ + description: "Number of records", + count: 0 + }) + ); + expect(mockedDeps.snsClient.send).not.toHaveBeenCalled(); + }); }); describe("Batching", () => { @@ -168,7 +238,7 @@ describe("letter-updates-transformer Lambda", () => { const newLetters = generateLetters(10, "PRINTED"); const expectedEntries = newLetters.map((letter) => expect.objectContaining({ - Message: JSON.stringify(mapLetterToCloudEvent(letter)), + Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)), }), ); @@ -197,19 +267,19 @@ describe("letter-updates-transformer Lambda", () => { newLetters.slice(0, 10).map((letter, index) => expect.objectContaining({ Id: expect.stringMatching(new RegExp(`-${index}$`)), - Message: JSON.stringify(mapLetterToCloudEvent(letter)), + Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)), }), ), newLetters.slice(10, 20).map((letter, index) => expect.objectContaining({ Id: expect.stringMatching(new RegExp(`-${index}$`)), - Message: JSON.stringify(mapLetterToCloudEvent(letter)), + Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)), }), ), newLetters.slice(20).map((letter, index) => expect.objectContaining({ Id: expect.stringMatching(new RegExp(`-${index}$`)), - Message: JSON.stringify(mapLetterToCloudEvent(letter)), + Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)), }), ), ]; diff --git a/lambdas/letter-updates-transformer/src/env.ts b/lambdas/letter-updates-transformer/src/env.ts index f93bbf39..a9138186 100644 --- a/lambdas/letter-updates-transformer/src/env.ts +++ b/lambdas/letter-updates-transformer/src/env.ts @@ -2,6 +2,7 @@ import { z } from "zod"; const EnvVarsSchema = z.object({ EVENTPUB_SNS_TOPIC_ARN: z.string(), + EVENT_SOURCE: z.string(), }); export type EnvVars = z.infer; diff --git a/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts b/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts index 85c6d2c9..3d6e670a 100644 --- a/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts +++ b/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts @@ -20,17 +20,17 @@ const BATCH_SIZE = 10; export default function createHandler(deps: Deps): Handler { return async (streamEvent: KinesisStreamEvent) => { deps.logger.info({ description: "Received event", streamEvent }); + deps.logger.info({ description: "Number of records", count: streamEvent.Records?.length || 0 }); - const cloudEvents: LetterEvent[] = streamEvent.Records.map((record) => + // Ensure logging by extracting all records first + const ddbRecords: DynamoDBRecord[] = streamEvent.Records.map((record) => extractPayload(record, deps), - ) - .filter((record) => record.eventName === "MODIFY") - .filter( - (record) => - isChanged(record, "status") || isChanged(record, "reasonCode"), - ) + ); + + const cloudEvents: LetterEvent[] = ddbRecords + .filter((record) => filterRecord(record, deps)) .map((element) => extractNewLetter(element)) - .map((element) => mapLetterToCloudEvent(element)); + .map((element) => mapLetterToCloudEvent(element, deps.env.EVENT_SOURCE)); for (const batch of generateBatches(cloudEvents)) { deps.logger.info({ @@ -50,14 +50,47 @@ export default function createHandler(deps: Deps): Handler { }; } +function filterRecord(record: DynamoDBRecord, deps: Deps): boolean { + let allowEvent = false; + if (record.eventName === "INSERT") { + allowEvent = true; + } + + if ( + record.eventName === "MODIFY" && + (isChanged(record, "status") || isChanged(record, "reasonCode")) + ) { + allowEvent = true; + } + + deps.logger.info({ + description: "Filtering record", + eventName: record.eventName, + eventId: record.eventID, + allowEvent, + }); + + return allowEvent; +} + function extractPayload( record: KinesisStreamRecord, deps: Deps, ): DynamoDBRecord { - // Kinesis data is base64 encoded - const payload = Buffer.from(record.kinesis.data, "base64").toString("utf8"); - deps.logger.info({ description: "Extracted dynamoDBRecord", payload }); - return JSON.parse(payload); + try { + deps.logger.info({ description: "Processing Kinesis record", recordId: record.kinesis.sequenceNumber }); + + // Kinesis data is base64 encoded + const payload = Buffer.from(record.kinesis.data, "base64").toString("utf8"); + deps.logger.info({ description: "Decoded payload", payload }); + + const jsonParsed = JSON.parse(payload); + deps.logger.info({ description: "Extracted dynamoDBRecord", jsonParsed }); + return jsonParsed; + } catch (error) { + deps.logger.error({ description: "Error extracting payload", error, record }); + throw error; + } } function isChanged(record: DynamoDBRecord, property: string): boolean { diff --git a/lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts b/lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts index fe5b5a79..d69524d0 100644 --- a/lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts +++ b/lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts @@ -14,7 +14,8 @@ describe("letter-mapper", () => { reasonText: "Reason text", updatedAt: "2025-11-24T15:55:18.000Z", } as Letter; - const event = mapLetterToCloudEvent(letter); + const source = "/data-plane/supplier-api/nhs-supplier-api-dev/main/letters"; + const event = mapLetterToCloudEvent(letter, source); // Check it conforms to the letter event schema - parse will throw an error if not $LetterEvent.parse(event); @@ -22,7 +23,7 @@ describe("letter-mapper", () => { expect(event.dataschema).toBe( `https://notify.nhs.uk/cloudevents/schemas/supplier-api/letter.PRINTED.${event.dataschemaversion}.schema.json`, ); - expect(event.dataschemaversion).toBe("1.0.5"); + expect(event.dataschemaversion).toBe("1.0.6"); expect(event.subject).toBe("letter-origin/supplier-api/letter/id1"); expect(event.time).toBe("2025-11-24T15:55:18.000Z"); expect(event.recordedtime).toBe("2025-11-24T15:55:18.000Z"); @@ -41,5 +42,6 @@ describe("letter-mapper", () => { event: event.id, }, }); + expect(event.source).toBe(source); }); }); diff --git a/lambdas/letter-updates-transformer/src/mappers/letter-mapper.ts b/lambdas/letter-updates-transformer/src/mappers/letter-mapper.ts index 759f5f0f..10885a9f 100644 --- a/lambdas/letter-updates-transformer/src/mappers/letter-mapper.ts +++ b/lambdas/letter-updates-transformer/src/mappers/letter-mapper.ts @@ -5,6 +5,7 @@ import { LetterForEventPub } from "../types"; export default function mapLetterToCloudEvent( letter: LetterForEventPub, + source: string, ): LetterEvent { const eventId = randomUUID(); const dataschemaversion = eventSchemaPackage.version; @@ -15,7 +16,7 @@ export default function mapLetterToCloudEvent( plane: "data", dataschema: `https://notify.nhs.uk/cloudevents/schemas/supplier-api/letter.${letter.status}.${dataschemaversion}.schema.json`, dataschemaversion, - source: "/data-plane/supplier-api/letters", + source, subject: `letter-origin/supplier-api/letter/${letter.id}`, data: {