diff --git a/infrastructure/terraform/components/api/README.md b/infrastructure/terraform/components/api/README.md
index 150af054..59cd0c85 100644
--- a/infrastructure/terraform/components/api/README.md
+++ b/infrastructure/terraform/components/api/README.md
@@ -12,6 +12,8 @@ No requirements.
| [aws\_account\_id](#input\_aws\_account\_id) | The AWS Account ID (numeric) | `string` | n/a | yes |
| [ca\_pem\_filename](#input\_ca\_pem\_filename) | Filename for the CA truststore file within the s3 bucket | `string` | `null` | no |
| [component](#input\_component) | The variable encapsulating the name of this component | `string` | `"supapi"` | no |
+| [core\_account\_id](#input\_core\_account\_id) | AWS Account ID for Core | `string` | `"000000000000"` | no |
+| [core\_environment](#input\_core\_environment) | Environment of Core | `string` | `"prod"` | no |
| [default\_tags](#input\_default\_tags) | A map of default tags to apply to all taggable resources within the component | `map(string)` | `{}` | no |
| [enable\_backups](#input\_enable\_backups) | Enable backups | `bool` | `false` | no |
| [environment](#input\_environment) | The name of the tfscaffold environment | `string` | n/a | yes |
diff --git a/infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf b/infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf
index 7cf6853c..a0c6471e 100644
--- a/infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf
+++ b/infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf
@@ -36,7 +36,8 @@ module "letter_updates_transformer" {
log_subscription_role_arn = local.acct.log_subscription_role_arn
lambda_env_vars = merge(local.common_lambda_env_vars, {
- EVENTPUB_SNS_TOPIC_ARN = "${module.eventpub.sns_topic.arn}"
+ EVENTPUB_SNS_TOPIC_ARN = "${module.eventpub.sns_topic.arn}",
+ EVENT_SOURCE = "/data-plane/supplier-api/${var.group}/${var.environment}/letters"
})
}
diff --git a/internal/events/package.json b/internal/events/package.json
index 2d799646..8005d13e 100644
--- a/internal/events/package.json
+++ b/internal/events/package.json
@@ -50,5 +50,5 @@
"typecheck": "tsc --noEmit"
},
"types": "dist/index.d.ts",
- "version": "1.0.5"
+ "version": "1.0.6"
}
diff --git a/internal/events/schemas/examples/letter.ACCEPTED.json b/internal/events/schemas/examples/letter.ACCEPTED.json
index f9a1177c..a16cc536 100644
--- a/internal/events/schemas/examples/letter.ACCEPTED.json
+++ b/internal/events/schemas/examples/letter.ACCEPTED.json
@@ -17,7 +17,7 @@
"recordedtime": "2025-08-28T08:45:00.000Z",
"severitynumber": 2,
"severitytext": "INFO",
- "source": "/data-plane/supplier-api/prod/update-status",
+ "source": "/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status",
"specversion": "1.0",
"subject": "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479",
"time": "2025-08-28T08:45:00.000Z",
diff --git a/internal/events/schemas/examples/letter.FORWARDED.json b/internal/events/schemas/examples/letter.FORWARDED.json
index bf12ed69..1ed34281 100644
--- a/internal/events/schemas/examples/letter.FORWARDED.json
+++ b/internal/events/schemas/examples/letter.FORWARDED.json
@@ -19,7 +19,7 @@
"recordedtime": "2025-08-28T08:45:00.000Z",
"severitynumber": 2,
"severitytext": "INFO",
- "source": "/data-plane/supplier-api/prod/update-status",
+ "source": "/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status",
"specversion": "1.0",
"subject": "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479",
"time": "2025-08-28T08:45:00.000Z",
diff --git a/internal/events/schemas/examples/letter.RETURNED.json b/internal/events/schemas/examples/letter.RETURNED.json
index e273029d..87bcaf40 100644
--- a/internal/events/schemas/examples/letter.RETURNED.json
+++ b/internal/events/schemas/examples/letter.RETURNED.json
@@ -19,7 +19,7 @@
"recordedtime": "2025-08-28T08:45:00.000Z",
"severitynumber": 2,
"severitytext": "INFO",
- "source": "/data-plane/supplier-api/prod/update-status",
+ "source": "/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status",
"specversion": "1.0",
"subject": "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479",
"time": "2025-08-28T08:45:00.000Z",
diff --git a/lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts b/lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts
index 17c271a0..ba27a9e7 100644
--- a/lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts
+++ b/lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts
@@ -26,15 +26,18 @@ jest.mock("crypto", () => ({
randomBytes: (size: number) => randomBytes[String(size)],
}));
-describe("letter-updates-transformer Lambda", () => {
- const mockedDeps: jest.Mocked = {
- snsClient: { send: jest.fn() } as unknown as SNSClient,
- logger: { info: jest.fn(), error: jest.fn() } as unknown as pino.Logger,
- env: {
- EVENTPUB_SNS_TOPIC_ARN: "arn:aws:sns:region:account:topic",
- } as unknown as EnvVars,
- } as Deps;
+const eventSource =
+ "/data-plane/supplier-api/nhs-supplier-api-dev/main/letters";
+const mockedDeps: jest.Mocked = {
+ snsClient: { send: jest.fn() } as unknown as SNSClient,
+ logger: { info: jest.fn(), error: jest.fn() } as unknown as pino.Logger,
+ env: {
+ EVENTPUB_SNS_TOPIC_ARN: "arn:aws:sns:region:account:topic",
+ EVENT_SOURCE: eventSource,
+ } as unknown as EnvVars,
+} as Deps;
+describe("letter-updates-transformer Lambda", () => {
beforeEach(() => {
jest.useFakeTimers();
});
@@ -50,7 +53,9 @@ describe("letter-updates-transformer Lambda", () => {
const newLetter = generateLetter("PRINTED");
const expectedEntries = [
expect.objectContaining({
- Message: JSON.stringify(mapLetterToCloudEvent(newLetter)),
+ Message: JSON.stringify(
+ mapLetterToCloudEvent(newLetter, eventSource),
+ ),
}),
];
@@ -76,7 +81,9 @@ describe("letter-updates-transformer Lambda", () => {
newLetter.reasonCode = "R1";
const expectedEntries = [
expect.objectContaining({
- Message: JSON.stringify(mapLetterToCloudEvent(newLetter)),
+ Message: JSON.stringify(
+ mapLetterToCloudEvent(newLetter, eventSource),
+ ),
}),
];
@@ -103,7 +110,9 @@ describe("letter-updates-transformer Lambda", () => {
newLetter.reasonCode = "R2";
const expectedEntries = [
expect.objectContaining({
- Message: JSON.stringify(mapLetterToCloudEvent(newLetter)),
+ Message: JSON.stringify(
+ mapLetterToCloudEvent(newLetter, eventSource),
+ ),
}),
];
@@ -135,14 +144,28 @@ describe("letter-updates-transformer Lambda", () => {
expect(mockedDeps.snsClient.send).not.toHaveBeenCalled();
});
- it("does not publish non-modify events", async () => {
+ it("publishes INSERT events", async () => {
const handler = createHandler(mockedDeps);
const newLetter = generateLetter("ACCEPTED");
+ const expectedEntries = [
+ expect.objectContaining({
+ Message: JSON.stringify(
+ mapLetterToCloudEvent(newLetter, eventSource),
+ ),
+ }),
+ ];
const testData = generateKinesisEvent([generateInsertRecord(newLetter)]);
await handler(testData, mockDeep(), jest.fn());
- expect(mockedDeps.snsClient.send).not.toHaveBeenCalled();
+ expect(mockedDeps.snsClient.send).toHaveBeenCalledWith(
+ expect.objectContaining({
+ input: expect.objectContaining({
+ TopicArn: "arn:aws:sns:region:account:topic",
+ PublishBatchRequestEntries: expectedEntries,
+ }),
+ }),
+ );
});
it("does not publish invalid letter data", async () => {
@@ -159,6 +182,53 @@ describe("letter-updates-transformer Lambda", () => {
expect(mockedDeps.snsClient.send).not.toHaveBeenCalled();
});
+
+ it("throws error when kinesis data contains malformed JSON", async () => {
+ const handler = createHandler(mockedDeps);
+
+ // Create a Kinesis event with malformed JSON data
+ const malformedKinesisEvent: KinesisStreamEvent = {
+ Records: [{
+ kinesis: {
+ data: Buffer.from("invalid-json-data").toString("base64"),
+ sequenceNumber: "12345"
+ }
+ } as any]
+ };
+
+ await expect(
+ handler(malformedKinesisEvent, mockDeep(), jest.fn())
+ ).rejects.toThrow();
+
+ expect(mockedDeps.logger.error).toHaveBeenCalledWith(
+ expect.objectContaining({
+ description: "Error extracting payload",
+ error: expect.any(Error),
+ record: expect.objectContaining({
+ kinesis: expect.objectContaining({
+ data: Buffer.from("invalid-json-data").toString("base64")
+ })
+ })
+ })
+ );
+ });
+
+ it("handles events with no records", async () => {
+ const handler = createHandler(mockedDeps);
+
+ // Create a Kinesis event with empty Records array
+ const emptyKinesisEvent: KinesisStreamEvent = { Records: [] };
+
+ await handler(emptyKinesisEvent, mockDeep(), jest.fn());
+
+ expect(mockedDeps.logger.info).toHaveBeenCalledWith(
+ expect.objectContaining({
+ description: "Number of records",
+ count: 0
+ })
+ );
+ expect(mockedDeps.snsClient.send).not.toHaveBeenCalled();
+ });
});
describe("Batching", () => {
@@ -168,7 +238,7 @@ describe("letter-updates-transformer Lambda", () => {
const newLetters = generateLetters(10, "PRINTED");
const expectedEntries = newLetters.map((letter) =>
expect.objectContaining({
- Message: JSON.stringify(mapLetterToCloudEvent(letter)),
+ Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)),
}),
);
@@ -197,19 +267,19 @@ describe("letter-updates-transformer Lambda", () => {
newLetters.slice(0, 10).map((letter, index) =>
expect.objectContaining({
Id: expect.stringMatching(new RegExp(`-${index}$`)),
- Message: JSON.stringify(mapLetterToCloudEvent(letter)),
+ Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)),
}),
),
newLetters.slice(10, 20).map((letter, index) =>
expect.objectContaining({
Id: expect.stringMatching(new RegExp(`-${index}$`)),
- Message: JSON.stringify(mapLetterToCloudEvent(letter)),
+ Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)),
}),
),
newLetters.slice(20).map((letter, index) =>
expect.objectContaining({
Id: expect.stringMatching(new RegExp(`-${index}$`)),
- Message: JSON.stringify(mapLetterToCloudEvent(letter)),
+ Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)),
}),
),
];
diff --git a/lambdas/letter-updates-transformer/src/env.ts b/lambdas/letter-updates-transformer/src/env.ts
index f93bbf39..a9138186 100644
--- a/lambdas/letter-updates-transformer/src/env.ts
+++ b/lambdas/letter-updates-transformer/src/env.ts
@@ -2,6 +2,7 @@ import { z } from "zod";
const EnvVarsSchema = z.object({
EVENTPUB_SNS_TOPIC_ARN: z.string(),
+ EVENT_SOURCE: z.string(),
});
export type EnvVars = z.infer;
diff --git a/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts b/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts
index 85c6d2c9..3d6e670a 100644
--- a/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts
+++ b/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts
@@ -20,17 +20,17 @@ const BATCH_SIZE = 10;
export default function createHandler(deps: Deps): Handler {
return async (streamEvent: KinesisStreamEvent) => {
deps.logger.info({ description: "Received event", streamEvent });
+ deps.logger.info({ description: "Number of records", count: streamEvent.Records?.length || 0 });
- const cloudEvents: LetterEvent[] = streamEvent.Records.map((record) =>
+ // Ensure logging by extracting all records first
+ const ddbRecords: DynamoDBRecord[] = streamEvent.Records.map((record) =>
extractPayload(record, deps),
- )
- .filter((record) => record.eventName === "MODIFY")
- .filter(
- (record) =>
- isChanged(record, "status") || isChanged(record, "reasonCode"),
- )
+ );
+
+ const cloudEvents: LetterEvent[] = ddbRecords
+ .filter((record) => filterRecord(record, deps))
.map((element) => extractNewLetter(element))
- .map((element) => mapLetterToCloudEvent(element));
+ .map((element) => mapLetterToCloudEvent(element, deps.env.EVENT_SOURCE));
for (const batch of generateBatches(cloudEvents)) {
deps.logger.info({
@@ -50,14 +50,47 @@ export default function createHandler(deps: Deps): Handler {
};
}
+function filterRecord(record: DynamoDBRecord, deps: Deps): boolean {
+ let allowEvent = false;
+ if (record.eventName === "INSERT") {
+ allowEvent = true;
+ }
+
+ if (
+ record.eventName === "MODIFY" &&
+ (isChanged(record, "status") || isChanged(record, "reasonCode"))
+ ) {
+ allowEvent = true;
+ }
+
+ deps.logger.info({
+ description: "Filtering record",
+ eventName: record.eventName,
+ eventId: record.eventID,
+ allowEvent,
+ });
+
+ return allowEvent;
+}
+
function extractPayload(
record: KinesisStreamRecord,
deps: Deps,
): DynamoDBRecord {
- // Kinesis data is base64 encoded
- const payload = Buffer.from(record.kinesis.data, "base64").toString("utf8");
- deps.logger.info({ description: "Extracted dynamoDBRecord", payload });
- return JSON.parse(payload);
+ try {
+ deps.logger.info({ description: "Processing Kinesis record", recordId: record.kinesis.sequenceNumber });
+
+ // Kinesis data is base64 encoded
+ const payload = Buffer.from(record.kinesis.data, "base64").toString("utf8");
+ deps.logger.info({ description: "Decoded payload", payload });
+
+ const jsonParsed = JSON.parse(payload);
+ deps.logger.info({ description: "Extracted dynamoDBRecord", jsonParsed });
+ return jsonParsed;
+ } catch (error) {
+ deps.logger.error({ description: "Error extracting payload", error, record });
+ throw error;
+ }
}
function isChanged(record: DynamoDBRecord, property: string): boolean {
diff --git a/lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts b/lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts
index fe5b5a79..d69524d0 100644
--- a/lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts
+++ b/lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts
@@ -14,7 +14,8 @@ describe("letter-mapper", () => {
reasonText: "Reason text",
updatedAt: "2025-11-24T15:55:18.000Z",
} as Letter;
- const event = mapLetterToCloudEvent(letter);
+ const source = "/data-plane/supplier-api/nhs-supplier-api-dev/main/letters";
+ const event = mapLetterToCloudEvent(letter, source);
// Check it conforms to the letter event schema - parse will throw an error if not
$LetterEvent.parse(event);
@@ -22,7 +23,7 @@ describe("letter-mapper", () => {
expect(event.dataschema).toBe(
`https://notify.nhs.uk/cloudevents/schemas/supplier-api/letter.PRINTED.${event.dataschemaversion}.schema.json`,
);
- expect(event.dataschemaversion).toBe("1.0.5");
+ expect(event.dataschemaversion).toBe("1.0.6");
expect(event.subject).toBe("letter-origin/supplier-api/letter/id1");
expect(event.time).toBe("2025-11-24T15:55:18.000Z");
expect(event.recordedtime).toBe("2025-11-24T15:55:18.000Z");
@@ -41,5 +42,6 @@ describe("letter-mapper", () => {
event: event.id,
},
});
+ expect(event.source).toBe(source);
});
});
diff --git a/lambdas/letter-updates-transformer/src/mappers/letter-mapper.ts b/lambdas/letter-updates-transformer/src/mappers/letter-mapper.ts
index 759f5f0f..10885a9f 100644
--- a/lambdas/letter-updates-transformer/src/mappers/letter-mapper.ts
+++ b/lambdas/letter-updates-transformer/src/mappers/letter-mapper.ts
@@ -5,6 +5,7 @@ import { LetterForEventPub } from "../types";
export default function mapLetterToCloudEvent(
letter: LetterForEventPub,
+ source: string,
): LetterEvent {
const eventId = randomUUID();
const dataschemaversion = eventSchemaPackage.version;
@@ -15,7 +16,7 @@ export default function mapLetterToCloudEvent(
plane: "data",
dataschema: `https://notify.nhs.uk/cloudevents/schemas/supplier-api/letter.${letter.status}.${dataschemaversion}.schema.json`,
dataschemaversion,
- source: "/data-plane/supplier-api/letters",
+ source,
subject: `letter-origin/supplier-api/letter/${letter.id}`,
data: {