Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
376 changes: 283 additions & 93 deletions package-lock.json

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion tests/playwright/config/component/component.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,19 @@ export default defineConfig({
name: 'senders:setup',
testMatch: 'senders.setup.ts',
},
{
name: 'firehose:setup',
testMatch: 'firehose.setup.ts',
teardown: 'firehose:teardown',
},
{
name: 'firehose:teardown',
testMatch: 'firehose.teardown.ts',
},
{
name: 'component',
testMatch: '*.component.spec.ts',
dependencies: ['senders:setup'],
dependencies: ['senders:setup', 'firehose:setup'],
teardown: 'component:teardown',
},
{
Expand Down
21 changes: 21 additions & 0 deletions tests/playwright/config/component/firehose.setup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { test as setup } from '@playwright/test';
import {
MINIMUM_DESTINATION_BUFFER_INTERVAL,
MINIMUM_PROCESSOR_BUFFER_INTERVAL,
TERRAFORM_DESTINATION_BUFFER_INTERVAL,
TERRAFORM_PROCESSOR_BUFFER_INTERVAL,
} from 'constants/backend-constants';
import { alterFirehoseBufferIntervals } from 'helpers/data-firehose-helpers';

setup('Reduce Firehose buffer intervals', async () => {
await alterFirehoseBufferIntervals({
expected: {
destination: TERRAFORM_DESTINATION_BUFFER_INTERVAL,
processor: TERRAFORM_PROCESSOR_BUFFER_INTERVAL,
},
update: {
destination: MINIMUM_DESTINATION_BUFFER_INTERVAL,
processor: MINIMUM_PROCESSOR_BUFFER_INTERVAL,
},
});
});
21 changes: 21 additions & 0 deletions tests/playwright/config/component/firehose.teardown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { test as teardown } from '@playwright/test';
import {
MINIMUM_DESTINATION_BUFFER_INTERVAL,
MINIMUM_PROCESSOR_BUFFER_INTERVAL,
TERRAFORM_DESTINATION_BUFFER_INTERVAL,
TERRAFORM_PROCESSOR_BUFFER_INTERVAL,
} from 'constants/backend-constants';
import { alterFirehoseBufferIntervals } from 'helpers/data-firehose-helpers';

teardown('Restore Firehose buffer intervals', async () => {
await alterFirehoseBufferIntervals({
expected: {
destination: MINIMUM_DESTINATION_BUFFER_INTERVAL,
processor: MINIMUM_PROCESSOR_BUFFER_INTERVAL,
},
update: {
destination: TERRAFORM_DESTINATION_BUFFER_INTERVAL,
processor: TERRAFORM_PROCESSOR_BUFFER_INTERVAL,
},
});
});
16 changes: 16 additions & 0 deletions tests/playwright/constants/backend-constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ export const EVENT_BUS_LOG_GROUP_NAME = `/aws/vendedlogs/events/event-bus/${CSI}
// DynamoDB
export const TTL_TABLE_NAME = `${CSI}-ttl`;

// Glue
export const GLUE_DATABASE_NAME = `${CSI}-reporting`;
export const GLUE_TABLE_NAME = 'event_record';

// S3
export const LETTERS_S3_BUCKET_NAME = `nhs-${process.env.AWS_ACCOUNT_ID}-${REGION}-${ENV}-dl-letters`;
export const NON_PII_S3_BUCKET_NAME = `nhs-${process.env.AWS_ACCOUNT_ID}-${REGION}-${ENV}-dl-non-pii-data`;
export const PII_S3_BUCKET_NAME = `nhs-${process.env.AWS_ACCOUNT_ID}-${REGION}-${ENV}-dl-pii-data`;
export const REPORTING_S3_BUCKET_NAME = `nhs-${process.env.AWS_ACCOUNT_ID}-${REGION}-${ENV}-dl-reporting`;
export const REPORTING_S3_KEY_PREFIX = `kinesis-firehose-output/reporting/parquet/${GLUE_TABLE_NAME}`;
export const FILE_SAFE_S3_BUCKET_NAME = `nhs-${process.env.AWS_ACCOUNT_ID}-${REGION}-${ENV}-dl-file-safe`;
export const UNSCANNED_FILES_S3_BUCKET_NAME = `nhs-${process.env.AWS_ACCOUNT_ID}-${REGION}-main-acct-digi-unscanned-files`;
export const FILE_QUARANTINE_S3_BUCKET_NAME = `nhs-${process.env.AWS_ACCOUNT_ID}-${REGION}-${ENV}-dl-file-quarantine`;
Expand All @@ -61,3 +67,13 @@ export const PRINT_STATUS_HANDLER_LAMBDA_LOG_GROUP_NAME = `/aws/lambda/${CSI}-pr
export const PRINT_ANALYSER_LAMBDA_LOG_GROUP_NAME = `/aws/lambda/${CSI}-print-analyser`;
export const PRINT_SENDER_LAMBDA_LOG_GROUP_NAME = `/aws/lambda/${CSI}-print-sender`;
export const MOVE_SCANNED_FILES_LAMBDA_LOG_GROUP_NAME = `/aws/lambda/${CSI}-move-scanned-files`;

// Data Firehose
export const FIREHOSE_STREAM_NAME = `${CSI}-to-s3-reporting`;
export const TERRAFORM_DESTINATION_BUFFER_INTERVAL = 300;
export const TERRAFORM_PROCESSOR_BUFFER_INTERVAL = 301;
export const MINIMUM_DESTINATION_BUFFER_INTERVAL = 60;
export const MINIMUM_PROCESSOR_BUFFER_INTERVAL = 0;

// Athena
export const ATHENA_WORKGROUP_NAME = CSI;
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { expect, test } from '@playwright/test';
import {
ATHENA_WORKGROUP_NAME,
GLUE_DATABASE_NAME,
GLUE_TABLE_NAME,
REPORTING_S3_BUCKET_NAME,
REPORTING_S3_KEY_PREFIX,
} from 'constants/backend-constants';
import { MESHInboxMessageDownloaded } from 'digital-letters-events';
import messageDownloadedValidator from 'digital-letters-events/MESHInboxMessageDownloaded.js';
import {
QueryExecutionState,
getQueryState,
triggerTableMetadataRefresh,
} from 'helpers/athena-helpers';
import eventPublisher from 'helpers/event-bus-helpers';
import expectToPassEventually from 'helpers/expectations';
import { existsInS3 } from 'helpers/s3-helpers';
import { v4 as uuidv4 } from 'uuid';

test.describe('Digital Letters - Report Generator', () => {
test.beforeAll(async () => {
// We need to wait for events to make their way from EventBridge -> Firehose -> S3 -> Glue
test.setTimeout(300_000);

// Use a random sender ID, so we can be sure that if there are files with this prefix
// in S3 they've been created by this test.
const senderId = `report-generator-test-${uuidv4()}`;

const eventId = uuidv4();
await eventPublisher.sendEvents<MESHInboxMessageDownloaded>(
[
{
id: eventId,
specversion: '1.0',
source:
'/nhs/england/notify/production/primary/data-plane/digitalletters/mesh',
subject:
'customer/920fca11-596a-4eca-9c47-99f624614658/recipient/769acdd4-6a47-496f-999f-76a6fd2c3959',
type: 'uk.nhs.notify.digital.letters.mesh.inbox.message.downloaded.v1',
time: '2023-06-20T12:00:00Z',
recordedtime: '2023-06-20T12:00:00.250Z',
severitynumber: 2,
traceparent:
'00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01',
datacontenttype: 'application/json',
dataschema:
'https://notify.nhs.uk/cloudevents/schemas/digital-letters/2025-10-draft/data/digital-letters-mesh-inbox-message-downloaded-data.schema.json',
severitytext: 'INFO',
data: {
meshMessageId: '12345',
messageUri: `https://example.com/ttl/resource/${eventId}`,
messageReference: 'ref1',
senderId,
},
},
],
messageDownloadedValidator,
);

await expectToPassEventually(async () => {
const eventsHaveBeenWrittenToS3 = await existsInS3(
REPORTING_S3_BUCKET_NAME,
`${REPORTING_S3_KEY_PREFIX}/senderid=${senderId}`,
);

expect(eventsHaveBeenWrittenToS3).toBeTruthy();
}, 300_000);

// Trigger a metadata refresh for the Glue table, which will cause it to pick up any new files in S3
const refreshQueryExecutionId = await triggerTableMetadataRefresh(
GLUE_DATABASE_NAME,
GLUE_TABLE_NAME,
ATHENA_WORKGROUP_NAME,
);

await expectToPassEventually(async () => {
const refreshQueryState = await getQueryState(refreshQueryExecutionId);

expect(refreshQueryState).toEqual(QueryExecutionState.SUCCEEDED);
});
});

test('should test something', async () => {});
});
61 changes: 61 additions & 0 deletions tests/playwright/helpers/athena-helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import {
AthenaClient,
GetQueryExecutionCommand,
QueryExecutionState,
StartQueryExecutionCommand,
} from '@aws-sdk/client-athena';

export { QueryExecutionState } from '@aws-sdk/client-athena';

const client = new AthenaClient();

/**
* Triggers a metadata refresh for an Athena table using the MSCK REPAIR TABLE command.
*
* This will cause any new files in S3 to be picked up.
*
* @param database - The name of the Athena database
* @param tableName - The name of the table to repair
* @param workgroup - The Athena workgroup to run the query in
* @returns The query execution ID
*/
export async function triggerTableMetadataRefresh(
database: string,
tableName: string,
workgroup: string,
): Promise<string> {
const command = new StartQueryExecutionCommand({
QueryString: `MSCK REPAIR TABLE ${tableName};`,
QueryExecutionContext: {
Database: database,
Catalog: 'AwsDataCatalog',
},
WorkGroup: workgroup,
});

const response = await client.send(command);

if (!response.QueryExecutionId) {
throw new Error(
'Failed to start query execution - no query execution ID returned',
);
}

return response.QueryExecutionId;
}

export async function getQueryState(
queryExecutionId: string,
): Promise<QueryExecutionState> {
const queryExecutionInfo = await client.send(
new GetQueryExecutionCommand({
QueryExecutionId: queryExecutionId,
}),
);

if (!queryExecutionInfo.QueryExecution?.Status?.State) {
throw new Error('Failed to get query execution state');
}

return queryExecutionInfo.QueryExecution?.Status?.State;
}
116 changes: 116 additions & 0 deletions tests/playwright/helpers/data-firehose-helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import {
DescribeDeliveryStreamCommand,
DescribeDeliveryStreamCommandOutput,
ExtendedS3DestinationUpdate,
FirehoseClient,
UpdateDestinationCommand,
} from '@aws-sdk/client-firehose';
import { FIREHOSE_STREAM_NAME, REGION } from 'constants/backend-constants';

export async function alterFirehoseBufferIntervals(bufferIntervalConfig: {
expected: {
destination: number;
processor: number;
};
update: {
destination: number;
processor: number;
};
}) {
const client = new FirehoseClient({ region: REGION });

const deliveryStreamDetails: DescribeDeliveryStreamCommandOutput =
await client.send(
new DescribeDeliveryStreamCommand({
DeliveryStreamName: FIREHOSE_STREAM_NAME,
}),
);

const destinations =
deliveryStreamDetails.DeliveryStreamDescription?.Destinations ?? [];
if (destinations.length !== 1) {
throw new Error('expected a single delivery destination');
}

const destination = destinations[0];

const currentDestinationBufferInterval =
destination.ExtendedS3DestinationDescription?.BufferingHints
?.IntervalInSeconds;

if (
currentDestinationBufferInterval !==
bufferIntervalConfig.expected.destination
) {
throw new Error(
`Expected destination buffer size to be ${bufferIntervalConfig.expected.destination} - got ${currentDestinationBufferInterval} - cannot safely alter, has the default value changed in code or manually?`,
);
}

const processors =
destination.ExtendedS3DestinationDescription?.ProcessingConfiguration
?.Processors;

if (processors?.length !== 1) {
throw new Error('Expected one processor to be configured');
}

const processor = processors[0];

const currentProcessorBufferInterval = processor.Parameters?.find(
(p) => p.ParameterName === 'BufferIntervalInSeconds',
)?.ParameterValue;

const otherParams =
processor.Parameters?.filter(
(p) => p.ParameterName !== 'BufferIntervalInSeconds',
) ?? [];

if (
currentProcessorBufferInterval !==
bufferIntervalConfig.expected.processor.toString()
) {
throw new Error(
`Expected processor buffer size to be ${bufferIntervalConfig.expected.processor} - got ${currentProcessorBufferInterval} - cannot safely alter, has the default value changed in code or manually?`,
);
}

const destinationId = destination.DestinationId;

if (!destinationId) {
throw new Error('Destination ID not found');
}

const updatedDestinationConfig: ExtendedS3DestinationUpdate = {
...destination.ExtendedS3DestinationDescription,
BufferingHints: {
...destination.ExtendedS3DestinationDescription?.BufferingHints,
IntervalInSeconds: bufferIntervalConfig.update.destination,
},
ProcessingConfiguration: {
...destination.ExtendedS3DestinationDescription?.ProcessingConfiguration,
Processors: [
{
...processor,
Parameters: [
...otherParams,
{
ParameterName: 'BufferIntervalInSeconds',
ParameterValue: bufferIntervalConfig.update.processor.toString(),
},
],
},
],
},
};

await client.send(
new UpdateDestinationCommand({
DeliveryStreamName: FIREHOSE_STREAM_NAME,
DestinationId: destinationId,
CurrentDeliveryStreamVersionId:
deliveryStreamDetails.DeliveryStreamDescription?.VersionId,
ExtendedS3DestinationUpdate: updatedDestinationConfig,
}),
);
}
10 changes: 9 additions & 1 deletion tests/playwright/helpers/s3-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,12 @@ async function downloadFromS3(
};
}

export { downloadFromS3, uploadToS3 };
async function existsInS3(bucket: string, keyPrefix: string): Promise<boolean> {
const objects = await s3.send(
new ListObjectsV2Command({ Bucket: bucket, Prefix: keyPrefix }),
);

return (objects.Contents?.length ?? 0) > 0;
}

export { downloadFromS3, existsInS3, uploadToS3 };
2 changes: 2 additions & 0 deletions tests/playwright/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
{
"dependencies": {
"@aws-sdk/client-athena": "^3.900.0",
"@aws-sdk/client-cloudwatch-logs": "^3.900.0",
"@aws-sdk/client-dynamodb": "^3.900.0",
"@aws-sdk/client-firehose": "^3.900.0",
"@aws-sdk/client-lambda": "^3.900.0",
"@aws-sdk/client-s3": "^3.900.0",
"@aws-sdk/client-sqs": "^3.900.0",
Expand Down
Loading