From 10c28e4cad4be1b34860af6380e477ebf05edef9 Mon Sep 17 00:00:00 2001 From: jackleary Date: Thu, 19 Dec 2024 12:15:26 +0000 Subject: [PATCH 01/19] NRL-1187 Subscribe lambda to new firehose stream --- terraform/infrastructure/lambda.tf | 91 +++++++++++-------- .../infrastructure/modules/firehose/output.tf | 15 +++ 2 files changed, 67 insertions(+), 39 deletions(-) diff --git a/terraform/infrastructure/lambda.tf b/terraform/infrastructure/lambda.tf index 64b05c6bc..1aed7d7c5 100644 --- a/terraform/infrastructure/lambda.tf +++ b/terraform/infrastructure/lambda.tf @@ -20,9 +20,10 @@ module "consumer__readDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "read_document_reference.handler" retention = var.log_retention_period } @@ -49,9 +50,10 @@ module "consumer__countDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "count_document_reference.handler" retention = var.log_retention_period } @@ -78,9 +80,10 @@ module "consumer__searchDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "search_document_reference.handler" retention = var.log_retention_period } @@ -107,9 +110,10 @@ module "consumer__searchPostDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "search_post_document_reference.handler" retention = var.log_retention_period } @@ -137,9 +141,10 @@ module "producer__createDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "create_document_reference.handler" retention = var.log_retention_period } @@ -167,9 +172,10 @@ module "producer__deleteDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "delete_document_reference.handler" retention = var.log_retention_period } @@ -196,9 +202,10 @@ module "producer__readDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "read_document_reference.handler" retention = var.log_retention_period } @@ -225,9 +232,10 @@ module "producer__searchDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "search_document_reference.handler" retention = var.log_retention_period } @@ -254,9 +262,10 @@ module "producer__searchPostDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "search_post_document_reference.handler" retention = var.log_retention_period } @@ -284,9 +293,10 @@ module "producer__updateDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "update_document_reference.handler" retention = var.log_retention_period } @@ -314,9 +324,10 @@ module "producer__upsertDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "upsert_document_reference.handler" retention = var.log_retention_period } @@ -344,9 +355,10 @@ module "consumer__status" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "status.handler" retention = var.log_retention_period } @@ -375,9 +387,10 @@ module "producer__status" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = [ - module.firehose__processor.firehose_subscription - ] + firehose_subscriptions = compact([ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ]) handler = "status.handler" retention = var.log_retention_period } diff --git a/terraform/infrastructure/modules/firehose/output.tf b/terraform/infrastructure/modules/firehose/output.tf index 9e83e76a3..a0b594642 100644 --- a/terraform/infrastructure/modules/firehose/output.tf +++ b/terraform/infrastructure/modules/firehose/output.tf @@ -31,3 +31,18 @@ output "firehose_subscription" { } } } + +output "firehose_reporting_subscription" { + value = var.reporting_infra_toggle ? { + destination = { + arn = local.iam_subscriptions.firehose_reporting_stream_arn + } + role = { + arn = aws_iam_role.firehose_subscription.arn + } + filter = { + # At least two items, and the first not any of INIT_START, START, END, REPORT + pattern = "[first_item_on_this_log_line != \"INIT_START\" && first_item_on_this_log_line != \"START\" && first_item_on_this_log_line != \"END\" && first_item_on_this_log_line != \"REPORT\", everything_else_on_this_log_line]" + } + } : null +} From 8152f64a69c41e770ad197b8e191afe49555b856 Mon Sep 17 00:00:00 2001 From: jackleary Date: Fri, 20 Dec 2024 12:40:36 +0000 Subject: [PATCH 02/19] NRL-1187 Subscribe lambda to new firehose stream --- terraform/infrastructure/lambda.tf | 117 ++++++++++------------------- terraform/infrastructure/locals.tf | 6 ++ 2 files changed, 45 insertions(+), 78 deletions(-) diff --git a/terraform/infrastructure/lambda.tf b/terraform/infrastructure/lambda.tf index 1aed7d7c5..4658b4f78 100644 --- a/terraform/infrastructure/lambda.tf +++ b/terraform/infrastructure/lambda.tf @@ -20,12 +20,9 @@ module "consumer__readDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "read_document_reference.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "read_document_reference.handler" + retention = var.log_retention_period } module "consumer__countDocumentReference" { @@ -50,12 +47,9 @@ module "consumer__countDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "count_document_reference.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "count_document_reference.handler" + retention = var.log_retention_period } module "consumer__searchDocumentReference" { @@ -80,12 +74,9 @@ module "consumer__searchDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "search_document_reference.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "search_document_reference.handler" + retention = var.log_retention_period } module "consumer__searchPostDocumentReference" { @@ -110,12 +101,9 @@ module "consumer__searchPostDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "search_post_document_reference.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "search_post_document_reference.handler" + retention = var.log_retention_period } module "producer__createDocumentReference" { @@ -141,12 +129,9 @@ module "producer__createDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "create_document_reference.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "create_document_reference.handler" + retention = var.log_retention_period } module "producer__deleteDocumentReference" { @@ -172,12 +157,9 @@ module "producer__deleteDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "delete_document_reference.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "delete_document_reference.handler" + retention = var.log_retention_period } module "producer__readDocumentReference" { @@ -202,12 +184,9 @@ module "producer__readDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "read_document_reference.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "read_document_reference.handler" + retention = var.log_retention_period } module "producer__searchDocumentReference" { @@ -232,12 +211,9 @@ module "producer__searchDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "search_document_reference.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "search_document_reference.handler" + retention = var.log_retention_period } module "producer__searchPostDocumentReference" { @@ -262,12 +238,9 @@ module "producer__searchPostDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "search_post_document_reference.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "search_post_document_reference.handler" + retention = var.log_retention_period } module "producer__updateDocumentReference" { @@ -293,12 +266,9 @@ module "producer__updateDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "update_document_reference.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "update_document_reference.handler" + retention = var.log_retention_period } module "producer__upsertDocumentReference" { @@ -324,12 +294,9 @@ module "producer__upsertDocumentReference" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "upsert_document_reference.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "upsert_document_reference.handler" + retention = var.log_retention_period } module "consumer__status" { @@ -355,12 +322,9 @@ module "consumer__status" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "status.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "status.handler" + retention = var.log_retention_period } @@ -387,10 +351,7 @@ module "producer__status" { local.pointers_kms_read_write_arn, local.auth_store_read_policy_arn ] - firehose_subscriptions = compact([ - module.firehose__processor.firehose_subscription, - module.firehose__processor.firehose_reporting_subscription - ]) - handler = "status.handler" - retention = var.log_retention_period + firehose_subscriptions = local.firehose_lambda_subscriptions + handler = "status.handler" + retention = var.log_retention_period } diff --git a/terraform/infrastructure/locals.tf b/terraform/infrastructure/locals.tf index dd1cd0f06..7c7ea7855 100644 --- a/terraform/infrastructure/locals.tf +++ b/terraform/infrastructure/locals.tf @@ -30,6 +30,12 @@ locals { # Logic / vars for reporting reporting_bucket_arn = local.is_dev_env ? data.aws_s3_bucket.source-data-bucket[0].arn : null + firehose_lambda_subscriptions = local.is_dev_env ? [ + module.firehose__processor.firehose_subscription, + module.firehose__processor.firehose_reporting_subscription + ] : [ + module.firehose__processor.firehose_subscription + ] # Logic / vars for splunk environment splunk_environment = local.is_sandbox_env ? "${var.account_name}sandbox" : var.account_name From 12f5e13fb8679b404c3771505464396b920f4b4b Mon Sep 17 00:00:00 2001 From: jackleary Date: Mon, 30 Dec 2024 16:20:47 +0000 Subject: [PATCH 03/19] NRL-1187 update kms reosurces --- terraform/infrastructure/modules/firehose/iam_firehose.tf | 4 +--- terraform/infrastructure/modules/firehose/locals.tf | 7 +++++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/terraform/infrastructure/modules/firehose/iam_firehose.tf b/terraform/infrastructure/modules/firehose/iam_firehose.tf index 89e72587d..ac9fe75db 100644 --- a/terraform/infrastructure/modules/firehose/iam_firehose.tf +++ b/terraform/infrastructure/modules/firehose/iam_firehose.tf @@ -44,9 +44,7 @@ data "aws_iam_policy_document" "firehose" { "kms:Decrypt", ] - resources = [ - aws_kms_key.firehose.arn, - ] + resources = local.iam_kms_resources } statement { actions = [ diff --git a/terraform/infrastructure/modules/firehose/locals.tf b/terraform/infrastructure/modules/firehose/locals.tf index 4658e993a..548d5afe6 100644 --- a/terraform/infrastructure/modules/firehose/locals.tf +++ b/terraform/infrastructure/modules/firehose/locals.tf @@ -40,4 +40,11 @@ locals { firehose_reporting_stream_arn = var.reporting_infra_toggle ? aws_kinesis_firehose_delivery_stream.reporting_stream[0].arn : null } + iam_kms_resources = var.reporting_infra_toggle ? [ + aws_kms_key.firehose.arn, + aws_kms_key.glue.arn, + ] : [ + aws_kms_key.firehose.arn, + ] + } From 4c8322b4b7e9b617d77764d8eea744b2cb27241c Mon Sep 17 00:00:00 2001 From: jackleary Date: Mon, 30 Dec 2024 17:22:19 +0000 Subject: [PATCH 04/19] NRL-1187 pull correct kms key --- terraform/infrastructure/data.tf | 4 ++++ terraform/infrastructure/firehose.tf | 1 + terraform/infrastructure/locals.tf | 1 + terraform/infrastructure/modules/firehose/locals.tf | 8 +++----- terraform/infrastructure/modules/firehose/vars.tf | 5 +++++ 5 files changed, 14 insertions(+), 5 deletions(-) diff --git a/terraform/infrastructure/data.tf b/terraform/infrastructure/data.tf index 2c3512fa3..e9634aec8 100644 --- a/terraform/infrastructure/data.tf +++ b/terraform/infrastructure/data.tf @@ -46,3 +46,7 @@ data "aws_s3_bucket" "source-data-bucket" { count = local.is_dev_env ? 1 : 0 bucket = "${local.shared_prefix}-source-data-bucket" } + +data "aws_kms_key" "glue" { + count = local.is_dev_env ? 1 : 0 +} diff --git a/terraform/infrastructure/firehose.tf b/terraform/infrastructure/firehose.tf index fea8712ee..11792c439 100644 --- a/terraform/infrastructure/firehose.tf +++ b/terraform/infrastructure/firehose.tf @@ -9,5 +9,6 @@ module "firehose__processor" { splunk_index = local.splunk_index destination = "splunk" reporting_bucket_arn = local.reporting_bucket_arn + reporting_kms_arn = local.reporting_kms_arn reporting_infra_toggle = local.is_dev_env } diff --git a/terraform/infrastructure/locals.tf b/terraform/infrastructure/locals.tf index 7c7ea7855..13594ef0a 100644 --- a/terraform/infrastructure/locals.tf +++ b/terraform/infrastructure/locals.tf @@ -30,6 +30,7 @@ locals { # Logic / vars for reporting reporting_bucket_arn = local.is_dev_env ? data.aws_s3_bucket.source-data-bucket[0].arn : null + reporting_kms_arn = local.is_dev_env ? data.aws_kms_key.glue[0].arn : null firehose_lambda_subscriptions = local.is_dev_env ? [ module.firehose__processor.firehose_subscription, module.firehose__processor.firehose_reporting_subscription diff --git a/terraform/infrastructure/modules/firehose/locals.tf b/terraform/infrastructure/modules/firehose/locals.tf index 548d5afe6..2714ab2b7 100644 --- a/terraform/infrastructure/modules/firehose/locals.tf +++ b/terraform/infrastructure/modules/firehose/locals.tf @@ -40,11 +40,9 @@ locals { firehose_reporting_stream_arn = var.reporting_infra_toggle ? aws_kinesis_firehose_delivery_stream.reporting_stream[0].arn : null } - iam_kms_resources = var.reporting_infra_toggle ? [ + iam_kms_resources = compact([ aws_kms_key.firehose.arn, - aws_kms_key.glue.arn, - ] : [ - aws_kms_key.firehose.arn, - ] + data.aws_kms_key.glue.arn + ]) } diff --git a/terraform/infrastructure/modules/firehose/vars.tf b/terraform/infrastructure/modules/firehose/vars.tf index dec876c12..e98affd1d 100644 --- a/terraform/infrastructure/modules/firehose/vars.tf +++ b/terraform/infrastructure/modules/firehose/vars.tf @@ -40,6 +40,11 @@ variable "reporting_bucket_arn" { default = null } +variable "reporting_kms_arn" { + type = string + default = null +} + variable "reporting_infra_toggle" { type = bool } From 14331f0bb95b2556a31cc4c12507e5f861440387 Mon Sep 17 00:00:00 2001 From: jackleary Date: Tue, 31 Dec 2024 10:34:02 +0000 Subject: [PATCH 05/19] NRL-1187 pull correct kms key --- terraform/infrastructure/modules/firehose/locals.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/terraform/infrastructure/modules/firehose/locals.tf b/terraform/infrastructure/modules/firehose/locals.tf index 2714ab2b7..248129930 100644 --- a/terraform/infrastructure/modules/firehose/locals.tf +++ b/terraform/infrastructure/modules/firehose/locals.tf @@ -42,7 +42,7 @@ locals { iam_kms_resources = compact([ aws_kms_key.firehose.arn, - data.aws_kms_key.glue.arn + local.reporting_kms_arn ]) } From ba9367eeea1751870f125d01100dcd6f37ecbd31 Mon Sep 17 00:00:00 2001 From: jackleary Date: Tue, 31 Dec 2024 12:31:39 +0000 Subject: [PATCH 06/19] NRL-1187 pull correct kms key --- terraform/infrastructure/data.tf | 3 ++- terraform/infrastructure/modules/firehose/locals.tf | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/terraform/infrastructure/data.tf b/terraform/infrastructure/data.tf index e9634aec8..39c2cce04 100644 --- a/terraform/infrastructure/data.tf +++ b/terraform/infrastructure/data.tf @@ -48,5 +48,6 @@ data "aws_s3_bucket" "source-data-bucket" { } data "aws_kms_key" "glue" { - count = local.is_dev_env ? 1 : 0 + count = local.is_dev_env ? 1 : 0 + key_id = "alias/${local.shared_prefix}-glue" } diff --git a/terraform/infrastructure/modules/firehose/locals.tf b/terraform/infrastructure/modules/firehose/locals.tf index 248129930..3cc4c229e 100644 --- a/terraform/infrastructure/modules/firehose/locals.tf +++ b/terraform/infrastructure/modules/firehose/locals.tf @@ -42,7 +42,7 @@ locals { iam_kms_resources = compact([ aws_kms_key.firehose.arn, - local.reporting_kms_arn + var.reporting_kms_arn ]) } From 9f4f5d1a2425901465139c278ab9cdedbc094cba Mon Sep 17 00:00:00 2001 From: jackleary Date: Tue, 31 Dec 2024 16:42:49 +0000 Subject: [PATCH 07/19] NRL-1187 pull correct kms key --- terraform/infrastructure/modules/firehose/iam_firehose.tf | 1 + terraform/infrastructure/modules/firehose/locals.tf | 1 + 2 files changed, 2 insertions(+) diff --git a/terraform/infrastructure/modules/firehose/iam_firehose.tf b/terraform/infrastructure/modules/firehose/iam_firehose.tf index ac9fe75db..340b12fb6 100644 --- a/terraform/infrastructure/modules/firehose/iam_firehose.tf +++ b/terraform/infrastructure/modules/firehose/iam_firehose.tf @@ -31,6 +31,7 @@ data "aws_iam_policy_document" "firehose" { aws_s3_bucket.firehose.arn, "${aws_s3_bucket.firehose.arn}/*", var.reporting_bucket_arn, + local.iam_firehose.reporting_s3_arn, ]) effect = "Allow" } diff --git a/terraform/infrastructure/modules/firehose/locals.tf b/terraform/infrastructure/modules/firehose/locals.tf index 3cc4c229e..80a0f3367 100644 --- a/terraform/infrastructure/modules/firehose/locals.tf +++ b/terraform/infrastructure/modules/firehose/locals.tf @@ -34,6 +34,7 @@ locals { iam_firehose = { cloudwatch_reporting_log_group_arn = var.reporting_infra_toggle ? aws_cloudwatch_log_group.firehose_reporting[0].arn : null cloudwatch_reporting_log_stream_arn = var.reporting_infra_toggle ? aws_cloudwatch_log_stream.firehose_reporting[0].arn : null + reporting_s3_arn = var.reporting_infra_toggle ? "${var.reporting_bucket_arn}/*" : null } iam_subscriptions = { From 0c66f731da04285663a8ef1b73385e9153aa0c1e Mon Sep 17 00:00:00 2001 From: jackleary Date: Tue, 31 Dec 2024 17:07:22 +0000 Subject: [PATCH 08/19] NRL-1187 decompress logs --- terraform/infrastructure/modules/firehose/kinesis.tf | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/terraform/infrastructure/modules/firehose/kinesis.tf b/terraform/infrastructure/modules/firehose/kinesis.tf index 7c0c4a288..c164dc67d 100644 --- a/terraform/infrastructure/modules/firehose/kinesis.tf +++ b/terraform/infrastructure/modules/firehose/kinesis.tf @@ -67,7 +67,16 @@ resource "aws_kinesis_firehose_delivery_stream" "reporting_stream" { bucket_arn = var.reporting_bucket_arn processing_configuration { - enabled = "false" + enabled = "true" + + processors { + type = "Decompression" + parameters { + parameter_name = "CompressionFormat" + parameter_value = "GZIP" + } + } + } cloudwatch_logging_options { From 47e7516a7ba4f57948997eafccecb80f562cb51c Mon Sep 17 00:00:00 2001 From: jackleary Date: Tue, 31 Dec 2024 17:20:22 +0000 Subject: [PATCH 09/19] NRL-1187 decompress logs --- terraform/infrastructure/modules/firehose/kinesis.tf | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/terraform/infrastructure/modules/firehose/kinesis.tf b/terraform/infrastructure/modules/firehose/kinesis.tf index c164dc67d..b44735357 100644 --- a/terraform/infrastructure/modules/firehose/kinesis.tf +++ b/terraform/infrastructure/modules/firehose/kinesis.tf @@ -77,6 +77,15 @@ resource "aws_kinesis_firehose_delivery_stream" "reporting_stream" { } } + processors { + type = "CloudWatchLogProcessing" + + parameters { + parameter_name = "DataMessageExtraction" + parameter_value = "true" + } + } + } cloudwatch_logging_options { From 71816b2ffd68ffbafa09e5b1a97c3624783a6746 Mon Sep 17 00:00:00 2001 From: jackleary Date: Sun, 5 Jan 2025 11:29:15 +0000 Subject: [PATCH 10/19] NRL-1187 update glue script to format data, update infrastructure for athena --- .../modules/athena/athena.tf | 18 +++--- .../modules/athena/outputs.tf | 6 +- .../modules/glue/glue.tf | 24 +++---- .../modules/glue/iam.tf | 54 +++++++++++++++- .../modules/glue/s3.tf | 5 +- .../modules/glue/src/main.py | 15 +++-- .../modules/glue/src/pipeline.py | 8 ++- .../modules/glue/src/transformations.py | 63 ++++++++++++++++++- 8 files changed, 153 insertions(+), 40 deletions(-) diff --git a/terraform/account-wide-infrastructure/modules/athena/athena.tf b/terraform/account-wide-infrastructure/modules/athena/athena.tf index b5765e113..200d5c787 100644 --- a/terraform/account-wide-infrastructure/modules/athena/athena.tf +++ b/terraform/account-wide-infrastructure/modules/athena/athena.tf @@ -1,15 +1,15 @@ -resource "aws_athena_database" "reporting-db" { - name = var.database +# resource "aws_athena_database" "reporting-db" { +# name = var.database - bucket = var.target_bucket_name +# bucket = var.target_bucket_name - encryption_configuration { - encryption_option = "SSE_KMS" - kms_key = aws_kms_key.athena.arn - } +# encryption_configuration { +# encryption_option = "SSE_KMS" +# kms_key = aws_kms_key.athena.arn +# } - force_destroy = true -} +# force_destroy = true +# } resource "aws_athena_workgroup" "athena" { name = "${var.name_prefix}-athena-wg" diff --git a/terraform/account-wide-infrastructure/modules/athena/outputs.tf b/terraform/account-wide-infrastructure/modules/athena/outputs.tf index 574aeb3f8..aa2feb764 100644 --- a/terraform/account-wide-infrastructure/modules/athena/outputs.tf +++ b/terraform/account-wide-infrastructure/modules/athena/outputs.tf @@ -6,6 +6,6 @@ output "bucket" { value = aws_s3_bucket.athena } -output "database" { - value = aws_athena_database.reporting-db -} +# output "database" { +# value = aws_athena_database.reporting-db +# } diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf index 64cca24f6..c1e4c4065 100644 --- a/terraform/account-wide-infrastructure/modules/glue/glue.tf +++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf @@ -1,16 +1,16 @@ # Create Glue Data Catalog Database -resource "aws_glue_catalog_database" "raw_log_database" { - name = "${var.name_prefix}-raw_log" - location_uri = "${aws_s3_bucket.source-data-bucket.id}/" +resource "aws_glue_catalog_database" "log_database" { + name = "${var.name_prefix}-reporting" + location_uri = "${aws_s3_bucket.target-data-bucket.id}/logs/" } # Create Glue Crawler -resource "aws_glue_crawler" "raw_log_crawler" { - name = "${var.name_prefix}-raw-log-crawler" - database_name = aws_glue_catalog_database.raw_log_database.name +resource "aws_glue_crawler" "log_crawler" { + name = "${var.name_prefix}-log-crawler" + database_name = aws_glue_catalog_database.log_database.name role = aws_iam_role.glue_service_role.name s3_target { - path = "${aws_s3_bucket.source-data-bucket.id}/" + path = "${aws_s3_bucket.target-data-bucket.id}/logs/" } schema_change_policy { delete_behavior = "LOG" @@ -22,11 +22,11 @@ resource "aws_glue_crawler" "raw_log_crawler" { } }) } -resource "aws_glue_trigger" "raw_log_trigger" { +resource "aws_glue_trigger" "log_trigger" { name = "${var.name_prefix}-org-report-trigger" type = "ON_DEMAND" actions { - crawler_name = aws_glue_crawler.raw_log_crawler.name + crawler_name = aws_glue_crawler.log_crawler.name } } @@ -49,9 +49,9 @@ resource "aws_glue_job" "glue_job" { "--enable-auto-scaling" = "true" "--enable-continous-cloudwatch-log" = "true" "--datalake-formats" = "delta" - "--source-path" = "s3://${aws_s3_bucket.source-data-bucket.id}/" # Specify the source S3 path - "--destination-path" = "s3://${aws_s3_bucket.target-data-bucket.id}/" # Specify the destination S3 path - "--job-name" = "poc-glue-job" + "--source_path" = "s3://${aws_s3_bucket.target-data-bucket.id}/" # Specify the source S3 path + "--target_path" = "s3://${aws_s3_bucket.target-data-bucket.id}/logs" # Specify the destination S3 path + "--job_name" = "poc-glue-job" "--enable-continuous-log-filter" = "true" "--enable-metrics" = "true" "--extra-py-files" = "s3://${aws_s3_bucket.code-bucket.id}/src.zip" diff --git a/terraform/account-wide-infrastructure/modules/glue/iam.tf b/terraform/account-wide-infrastructure/modules/glue/iam.tf index 890b47593..00ca397a9 100644 --- a/terraform/account-wide-infrastructure/modules/glue/iam.tf +++ b/terraform/account-wide-infrastructure/modules/glue/iam.tf @@ -15,7 +15,57 @@ resource "aws_iam_role" "glue_service_role" { }) } +data "aws_iam_policy_document" "glue_service" { + statement { + actions = [ + "s3:AbortMultipartUpload", + "s3:GetBucketLocation", + "s3:GetObject", + "s3:ListBucket", + "s3:ListBucketMultipartUploads", + "s3:PutObject", + "s3:DeleteObject", + ] + + resources = compact([ + aws_s3_bucket.source-data-bucket.arn, + "${aws_s3_bucket.source-data-bucket.arn}/*", + aws_s3_bucket.target-data-bucket.arn, + "${aws_s3_bucket.target-data-bucket.arn}/*", + aws_s3_bucket.code-bucket.arn, + "${aws_s3_bucket.code-bucket.arn}/*", + ]) + effect = "Allow" + } + + statement { + actions = [ + "kms:DescribeKey", + "kms:GenerateDataKey*", + "kms:Encrypt", + "kms:ReEncrypt*", + "kms:Decrypt", + ] + + resources = [ + aws_kms_key.glue.arn, + ] + + effect = "Allow" + } +} + +resource "aws_iam_policy" "glue_service" { + name = "${var.name_prefix}-glue" + policy = data.aws_iam_policy_document.glue_service.json +} + resource "aws_iam_role_policy_attachment" "glue_service" { - role = aws_iam_role.glue_service_role.id - policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole" + role = aws_iam_role.glue_service_role.name + policy_arn = aws_iam_policy.glue_service.arn } + +# resource "aws_iam_role_policy_attachment" "glue_service" { +# role = aws_iam_role.glue_service_role.id +# policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole" +# } diff --git a/terraform/account-wide-infrastructure/modules/glue/s3.tf b/terraform/account-wide-infrastructure/modules/glue/s3.tf index 4695f2b5b..56ed72010 100644 --- a/terraform/account-wide-infrastructure/modules/glue/s3.tf +++ b/terraform/account-wide-infrastructure/modules/glue/s3.tf @@ -174,6 +174,7 @@ data "archive_file" "python" { resource "aws_s3_object" "zip" { bucket = aws_s3_bucket.code-bucket.bucket - key = "main.py" - source = "${path.module}/files/src.zip" + key = "src.zip" + source = data.archive_file.python.output_path + etag = filemd5(data.archive_file.python.output_path) } diff --git a/terraform/account-wide-infrastructure/modules/glue/src/main.py b/terraform/account-wide-infrastructure/modules/glue/src/main.py index a29ef78d8..3e030accb 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/main.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/main.py @@ -1,25 +1,24 @@ import sys from awsglue.utils import getResolvedOptions +from pipeline import LogPipeline from pyspark.context import SparkContext -from src.pipeline import LogPipeline -from src.transformations import placeholder +from transformations import logSchema, placeholder # Get arguments from AWS Glue job -args = getResolvedOptions( - sys.argv, ["JOB_NAME", "SOURCE_PATH", "TARGET_PATH", "PARTITION_COLS"] -) +args = getResolvedOptions(sys.argv, ["job_name", "source_path", "target_path"]) # Start Glue context sc = SparkContext() -partition_cols = args["PARTITION_COLS"].split(",") if "PARTITION_COLS" in args else [] +partition_cols = args["partition_cols"].split(",") if "partition_cols" in args else [] # Initialize ETL process etl_job = LogPipeline( spark_context=sc, - source_path=args["SOURCE_PATH"], - target_path=args["TARGET_PATH"], + source_path=args["source_path"], + target_path=args["target_path"], + schema=logSchema, partition_cols=partition_cols, transformations=[placeholder], ) diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py index 50c34af23..a0844fb27 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py @@ -1,4 +1,4 @@ -from src.instances import GlueContextSingleton, LoggerSingleton +from instances import GlueContextSingleton, LoggerSingleton class LogPipeline: @@ -7,7 +7,8 @@ def __init__( spark_context, source_path, target_path, - partition_cols=None, + schema, + partition_cols=[], transformations=[], ): """Initialize Glue context, Spark session, logger, and paths""" @@ -16,6 +17,7 @@ def __init__( self.logger = LoggerSingleton().logger self.source_path = source_path self.target_path = target_path + self.schema = schema self.partition_cols = partition_cols self.transformations = transformations @@ -36,7 +38,7 @@ def run(self): def extract(self): """Extract JSON data from S3""" self.logger.info(f"Extracting data from {self.source_path} as JSON") - return self.spark.read.json(self.source_path) + return self.spark.read.schema(self.schema).json(self.source_path) def transform(self, dataframe): """Apply a list of transformations on the dataframe""" diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py index 1d59d52bc..4e15d3a7e 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py @@ -1 +1,62 @@ -def placeholder(): ... +from pyspark.sql.types import ( + BooleanType, + StringType, + StructField, + StructType, + TimestampType, +) + +logSchema = StructType( + [ + StructField("time", TimestampType(), True), + StructField("index", StringType(), True), + StructField("host", StringType(), True), + StructField("source", StringType(), True), + StructField( + "event", + StructType( + [ + StructField("level", StringType(), True), + StructField("location", StringType(), True), + StructField("message", StringType(), True), + StructField("timestamp", TimestampType(), True), + StructField("service", StringType(), True), + StructField("cold_start", BooleanType(), True), + StructField("function_name", StringType(), True), + StructField("function_memory_size", StringType(), True), + StructField("function_arn", StringType(), True), + StructField("function_request_id", StringType(), True), + StructField("correlation_id", StringType(), True), + StructField("method", StringType(), True), + StructField("path", StringType(), True), + StructField( + "headers", + StructType( + [ + StructField("accept", StringType(), True), + StructField("accept-encoding", StringType(), True), + StructField("Authorization", StringType(), True), + StructField("Host", StringType(), True), + StructField( + "NHSD-Connection-Metadata", StringType(), True + ), + StructField("NHSD-Correlation-Id", StringType(), True), + StructField("User-Agent", StringType(), True), + StructField("X-Forwarded-For", StringType(), True), + StructField("X-Request-Id", StringType(), True), + ] + ), + True, + ), + StructField("log_reference", StringType(), True), + StructField("xray_trace_id", StringType(), True), + ] + ), + True, + ), + ] +) + + +def placeholder(df): + return df From 437e74aa650d70d25d9ef2bf341c111a8b93b6e7 Mon Sep 17 00:00:00 2001 From: jackleary Date: Mon, 13 Jan 2025 08:33:12 +0000 Subject: [PATCH 11/19] NRL-1187 update glue source path and iam role for crawler --- .../modules/glue/glue.tf | 2 +- .../modules/glue/iam.tf | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf index c1e4c4065..b2c4e262e 100644 --- a/terraform/account-wide-infrastructure/modules/glue/glue.tf +++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf @@ -49,7 +49,7 @@ resource "aws_glue_job" "glue_job" { "--enable-auto-scaling" = "true" "--enable-continous-cloudwatch-log" = "true" "--datalake-formats" = "delta" - "--source_path" = "s3://${aws_s3_bucket.target-data-bucket.id}/" # Specify the source S3 path + "--source_path" = "s3://${aws_s3_bucket.source-data-bucket.id}/" # Specify the source S3 path "--target_path" = "s3://${aws_s3_bucket.target-data-bucket.id}/logs" # Specify the destination S3 path "--job_name" = "poc-glue-job" "--enable-continuous-log-filter" = "true" diff --git a/terraform/account-wide-infrastructure/modules/glue/iam.tf b/terraform/account-wide-infrastructure/modules/glue/iam.tf index 00ca397a9..d86e18096 100644 --- a/terraform/account-wide-infrastructure/modules/glue/iam.tf +++ b/terraform/account-wide-infrastructure/modules/glue/iam.tf @@ -53,6 +53,21 @@ data "aws_iam_policy_document" "glue_service" { effect = "Allow" } + + statement { + actions = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ] + + resources = [ + "arn:aws:logs:*:*:*:/aws-glue/*", + # "arn:aws:logs:*:*:*:/customlogs/*" + ] + + effect = "Allow" + } } resource "aws_iam_policy" "glue_service" { From b4c073edb5f7aaf87b135ef171cf04dd05a3e4b5 Mon Sep 17 00:00:00 2001 From: jackleary Date: Mon, 13 Jan 2025 09:31:38 +0000 Subject: [PATCH 12/19] NRL-1187 update glue iam role --- .../account-wide-infrastructure/modules/glue/iam.tf | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/terraform/account-wide-infrastructure/modules/glue/iam.tf b/terraform/account-wide-infrastructure/modules/glue/iam.tf index d86e18096..87e1c5a66 100644 --- a/terraform/account-wide-infrastructure/modules/glue/iam.tf +++ b/terraform/account-wide-infrastructure/modules/glue/iam.tf @@ -68,6 +68,18 @@ data "aws_iam_policy_document" "glue_service" { effect = "Allow" } + + statement { + actions = [ + "glue: *", + ] + + resources = [ + "*" + ] + + effect = "Allow" + } } resource "aws_iam_policy" "glue_service" { From 224a1fd2087e5eca26c840255d4edd3da9c46996 Mon Sep 17 00:00:00 2001 From: jackleary Date: Mon, 13 Jan 2025 10:29:50 +0000 Subject: [PATCH 13/19] NRL-1187 update glue iam role --- terraform/account-wide-infrastructure/modules/glue/iam.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/terraform/account-wide-infrastructure/modules/glue/iam.tf b/terraform/account-wide-infrastructure/modules/glue/iam.tf index 87e1c5a66..30fdb1707 100644 --- a/terraform/account-wide-infrastructure/modules/glue/iam.tf +++ b/terraform/account-wide-infrastructure/modules/glue/iam.tf @@ -71,7 +71,7 @@ data "aws_iam_policy_document" "glue_service" { statement { actions = [ - "glue: *", + "glue:*", ] resources = [ From ab4e6b3e4434f71993d4921e5cbdd87bedba12cb Mon Sep 17 00:00:00 2001 From: jackleary Date: Mon, 13 Jan 2025 10:33:02 +0000 Subject: [PATCH 14/19] NRL-1187 remove unused resources --- .../modules/athena/athena.tf | 13 ------------- .../modules/athena/outputs.tf | 4 ---- .../account-wide-infrastructure/modules/glue/iam.tf | 5 ----- 3 files changed, 22 deletions(-) diff --git a/terraform/account-wide-infrastructure/modules/athena/athena.tf b/terraform/account-wide-infrastructure/modules/athena/athena.tf index 200d5c787..873ecdf7c 100644 --- a/terraform/account-wide-infrastructure/modules/athena/athena.tf +++ b/terraform/account-wide-infrastructure/modules/athena/athena.tf @@ -1,16 +1,3 @@ -# resource "aws_athena_database" "reporting-db" { -# name = var.database - -# bucket = var.target_bucket_name - -# encryption_configuration { -# encryption_option = "SSE_KMS" -# kms_key = aws_kms_key.athena.arn -# } - -# force_destroy = true -# } - resource "aws_athena_workgroup" "athena" { name = "${var.name_prefix}-athena-wg" diff --git a/terraform/account-wide-infrastructure/modules/athena/outputs.tf b/terraform/account-wide-infrastructure/modules/athena/outputs.tf index aa2feb764..40a8c7961 100644 --- a/terraform/account-wide-infrastructure/modules/athena/outputs.tf +++ b/terraform/account-wide-infrastructure/modules/athena/outputs.tf @@ -5,7 +5,3 @@ output "workgroup" { output "bucket" { value = aws_s3_bucket.athena } - -# output "database" { -# value = aws_athena_database.reporting-db -# } diff --git a/terraform/account-wide-infrastructure/modules/glue/iam.tf b/terraform/account-wide-infrastructure/modules/glue/iam.tf index 30fdb1707..267506ea7 100644 --- a/terraform/account-wide-infrastructure/modules/glue/iam.tf +++ b/terraform/account-wide-infrastructure/modules/glue/iam.tf @@ -91,8 +91,3 @@ resource "aws_iam_role_policy_attachment" "glue_service" { role = aws_iam_role.glue_service_role.name policy_arn = aws_iam_policy.glue_service.arn } - -# resource "aws_iam_role_policy_attachment" "glue_service" { -# role = aws_iam_role.glue_service_role.id -# policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole" -# } From f30450238bcb8dcf4ddc64353d591e56045df11c Mon Sep 17 00:00:00 2001 From: jackleary Date: Mon, 13 Jan 2025 12:05:17 +0000 Subject: [PATCH 15/19] NRL-1187 set correct output for athena --- terraform/account-wide-infrastructure/modules/athena/athena.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/terraform/account-wide-infrastructure/modules/athena/athena.tf b/terraform/account-wide-infrastructure/modules/athena/athena.tf index 873ecdf7c..d111611e5 100644 --- a/terraform/account-wide-infrastructure/modules/athena/athena.tf +++ b/terraform/account-wide-infrastructure/modules/athena/athena.tf @@ -6,7 +6,7 @@ resource "aws_athena_workgroup" "athena" { publish_cloudwatch_metrics_enabled = true result_configuration { - output_location = "s3://{aws_s3_bucket.athena.bucket}/output/" + output_location = "s3://${aws_s3_bucket.athena.id}/output/" encryption_configuration { encryption_option = "SSE_KMS" From 6d3fc8076dc60da4147a373808b6cea170c7cbd9 Mon Sep 17 00:00:00 2001 From: jackleary Date: Mon, 13 Jan 2025 12:15:49 +0000 Subject: [PATCH 16/19] NRL-1187 update reporting infra toggle --- terraform/infrastructure/data.tf | 4 ++-- terraform/infrastructure/firehose.tf | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/terraform/infrastructure/data.tf b/terraform/infrastructure/data.tf index 39c2cce04..e2d2d23d0 100644 --- a/terraform/infrastructure/data.tf +++ b/terraform/infrastructure/data.tf @@ -43,11 +43,11 @@ data "external" "current-info" { } data "aws_s3_bucket" "source-data-bucket" { - count = local.is_dev_env ? 1 : 0 + count = local.is_dev_env && !local.is_sandbox_env ? 1 : 0 bucket = "${local.shared_prefix}-source-data-bucket" } data "aws_kms_key" "glue" { - count = local.is_dev_env ? 1 : 0 + count = local.is_dev_env && !local.is_sandbox_env ? 1 : 0 key_id = "alias/${local.shared_prefix}-glue" } diff --git a/terraform/infrastructure/firehose.tf b/terraform/infrastructure/firehose.tf index 11792c439..db063e6f4 100644 --- a/terraform/infrastructure/firehose.tf +++ b/terraform/infrastructure/firehose.tf @@ -10,5 +10,5 @@ module "firehose__processor" { destination = "splunk" reporting_bucket_arn = local.reporting_bucket_arn reporting_kms_arn = local.reporting_kms_arn - reporting_infra_toggle = local.is_dev_env + reporting_infra_toggle = local.is_dev_env && !local.is_sandbox_env } From 31413b589c53f77b67ffc5eae888f06399e273b9 Mon Sep 17 00:00:00 2001 From: jackleary Date: Wed, 15 Jan 2025 12:14:24 +0000 Subject: [PATCH 17/19] NRL-1187 format data --- .../modules/glue/src/main.py | 4 ++-- .../modules/glue/src/pipeline.py | 2 +- .../modules/glue/src/transformations.py | 10 ++++++++++ 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/terraform/account-wide-infrastructure/modules/glue/src/main.py b/terraform/account-wide-infrastructure/modules/glue/src/main.py index 3e030accb..4f80fd078 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/main.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/main.py @@ -3,7 +3,7 @@ from awsglue.utils import getResolvedOptions from pipeline import LogPipeline from pyspark.context import SparkContext -from transformations import logSchema, placeholder +from transformations import flatten_df, logSchema # Get arguments from AWS Glue job args = getResolvedOptions(sys.argv, ["job_name", "source_path", "target_path"]) @@ -20,7 +20,7 @@ target_path=args["target_path"], schema=logSchema, partition_cols=partition_cols, - transformations=[placeholder], + transformations=[flatten_df], ) # Run the job diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py index a0844fb27..4c38553a9 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py @@ -50,6 +50,6 @@ def transform(self, dataframe): def load(self, dataframe): """Load transformed data into Parquet format""" self.logger.info(f"Loading data into {self.target_path} as Parquet") - dataframe.write.mode("overwrite").partitionBy(*self.partition_cols).parquet( + dataframe.write.mode("append").partitionBy(*self.partition_cols).parquet( self.target_path ) diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py index 4e15d3a7e..9f58d9f74 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py @@ -58,5 +58,15 @@ ) +def flatten_df(df): + cols = [] + for c in df.dtypes: + if "struct" in c[1]: + nested_col = c[0] + else: + cols.append(c[0]) + return df.select(*cols, f"{nested_col}.*") + + def placeholder(df): return df From 65f7a9651be355854892ad83206978dc7d1c6e5f Mon Sep 17 00:00:00 2001 From: jackleary Date: Wed, 15 Jan 2025 12:17:42 +0000 Subject: [PATCH 18/19] NRL-1187 read all files --- .../modules/glue/src/pipeline.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py index 4c38553a9..e1ba22215 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py @@ -38,7 +38,11 @@ def run(self): def extract(self): """Extract JSON data from S3""" self.logger.info(f"Extracting data from {self.source_path} as JSON") - return self.spark.read.schema(self.schema).json(self.source_path) + return ( + self.spark.read.option("recursiveFileLookup", "true") + .schema(self.schema) + .json(self.source_path) + ) def transform(self, dataframe): """Apply a list of transformations on the dataframe""" From 9007dccfa509dcaae4530792eb1104153f295b57 Mon Sep 17 00:00:00 2001 From: jackleary Date: Wed, 15 Jan 2025 12:47:14 +0000 Subject: [PATCH 19/19] NRL-1187 deal with timestamps --- .../account-wide-infrastructure/modules/glue/src/main.py | 4 ++-- .../modules/glue/src/transformations.py | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/terraform/account-wide-infrastructure/modules/glue/src/main.py b/terraform/account-wide-infrastructure/modules/glue/src/main.py index 4f80fd078..416cef5ef 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/main.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/main.py @@ -3,7 +3,7 @@ from awsglue.utils import getResolvedOptions from pipeline import LogPipeline from pyspark.context import SparkContext -from transformations import flatten_df, logSchema +from transformations import dtype_conversion, flatten_df, logSchema # Get arguments from AWS Glue job args = getResolvedOptions(sys.argv, ["job_name", "source_path", "target_path"]) @@ -20,7 +20,7 @@ target_path=args["target_path"], schema=logSchema, partition_cols=partition_cols, - transformations=[flatten_df], + transformations=[flatten_df, dtype_conversion], ) # Run the job diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py index 9f58d9f74..64bb4abe6 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py @@ -1,3 +1,4 @@ +from pyspark.sql.functions import to_timestamp from pyspark.sql.types import ( BooleanType, StringType, @@ -19,7 +20,7 @@ StructField("level", StringType(), True), StructField("location", StringType(), True), StructField("message", StringType(), True), - StructField("timestamp", TimestampType(), True), + StructField("timestamp", StringType(), True), StructField("service", StringType(), True), StructField("cold_start", BooleanType(), True), StructField("function_name", StringType(), True), @@ -68,5 +69,8 @@ def flatten_df(df): return df.select(*cols, f"{nested_col}.*") -def placeholder(df): +def dtype_conversion(df): + df = df.withColumn( + "timestamp", to_timestamp(df["timestamp"], "yyyy-MM-dd HH:mm:ss,SSSXXX") + ) return df