Skip to content

Commit 3fa8ab2

Browse files
committed
fixup! feat(etl): refactor existing extraction into ETL.
1 parent 8a66125 commit 3fa8ab2

File tree

7 files changed

+73
-59
lines changed

7 files changed

+73
-59
lines changed

alert_system/etl/Gdacs_flood/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ def __init__(self):
66
self.impact_endpoint = "/gdacs-impacts/items"
77
self.people_exposed_threshold = 5
88

9+
910
gdacs_flood_config = GdacsFloodConfig()

alert_system/etl/Gdacs_flood/extraction.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
1+
import logging
2+
13
from alert_system.etl.base.extraction import BaseExtractionClass
2-
from alert_system.etl.base.transform import BaseTransformerClass
34
from alert_system.etl.base.loader import BaseLoaderClass
4-
import logging
5-
from .transform import GdacsTransformer
6-
from .loader import GdacsLoader
5+
from alert_system.etl.base.transform import BaseTransformerClass
6+
77
from .config import gdacs_flood_config
8+
from .loader import GdacsLoader
9+
from .transform import GdacsTransformer
810

911
logger = logging.getLogger(__name__)
1012

13+
1114
class GdacsFloodExtraction(BaseExtractionClass):
1215
event_endpoint = gdacs_flood_config.event_endpoint
1316
hazard_endpoint = getattr(gdacs_flood_config, "hazard_endpoint", None)
1417
impact_endpoint = getattr(gdacs_flood_config, "impact_endpoint", None)
15-
18+
1619
def get_transformer_class(self) -> type[BaseTransformerClass]:
1720
return GdacsTransformer
18-
21+
1922
def get_loader_class(self) -> type[BaseLoaderClass]:
2023
return GdacsLoader
21-

alert_system/etl/Gdacs_flood/loader.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from alert_system.etl.base.loader import BaseLoaderClass
2+
23
from .config import gdacs_flood_config
34

5+
46
class GdacsLoader(BaseLoaderClass):
57
people_exposed_threshold = gdacs_flood_config.people_exposed_threshold
68

alert_system/etl/Gdacs_flood/transform.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
from typing import Dict, List, Tuple, Any
2-
from alert_system.etl.base.transform import BaseTransformerClass
31
import logging
2+
from typing import Dict, Tuple
3+
4+
from alert_system.etl.base.transform import BaseTransformerClass
45

56
logger = logging.getLogger(__name__)
67

@@ -21,23 +22,29 @@ class GdacsTransformer(BaseTransformerClass):
2122
("people", "highest_risk"): "people.highest_risk",
2223
("buildings", "destroyed"): "buildings.destroyed",
2324
}
24-
25+
2526
# NOTE: This logic might change in future
2627
def compute_people_exposed(self, impacts: dict) -> int:
27-
value = next((impacts.get(key) for key in ["people.affected_total", "people.potentially_affected", "people.affected_direct"] if impacts.get(key)), 0)
28+
value = next(
29+
(
30+
impacts.get(key)
31+
for key in ["people.affected_total", "people.potentially_affected", "people.affected_direct"]
32+
if impacts.get(key)
33+
),
34+
0,
35+
)
2836
if not isinstance(value, int):
2937
logger.warning(f"people_exposed value is not int: {value}")
3038
return 0
3139
return value
3240

33-
3441
# NOTE: This logic might change in future
3542
def compute_buildings_exposed(self, impacts: dict) -> int:
3643
"""
3744
Compute the 'buildings_exposed' field.
3845
"""
3946
return impacts.get("buildings.destroyed") or 0
40-
47+
4148
def process_impact(self, impact_items) -> BaseTransformerClass.ImpactType:
4249
raw_impacts, metadata = {}, {}
4350
for item in impact_items:
@@ -56,7 +63,7 @@ def process_impact(self, impact_items) -> BaseTransformerClass.ImpactType:
5663
"buildings_exposed": self.compute_buildings_exposed(raw_impacts),
5764
"impact_metadata": metadata,
5865
}
59-
66+
6067
def process_hazard(self, hazard_item) -> BaseTransformerClass.HazardType:
6168
if not hazard_item:
6269
return {
@@ -73,9 +80,9 @@ def process_hazard(self, hazard_item) -> BaseTransformerClass.HazardType:
7380
"severity_label": detail.get("severity_label", ""),
7481
"severity_value": detail.get("severity_value", 0),
7582
}
76-
83+
7784
def process_event(self, event_item) -> BaseTransformerClass.EventType:
78-
properties = event_item.resp_data.get("properties",{})
85+
properties = event_item.resp_data.get("properties", {})
7986
return {
8087
"title": properties.get("title", ""),
8188
"description": properties.get("description", ""),

alert_system/etl/base/extraction.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1-
import logging
2-
from datetime import datetime, timedelta, timezone
3-
from typing import Dict, Generator, Optional, List
41
import abc
2+
import logging
53
from abc import ABC
4+
from datetime import datetime, timedelta, timezone
5+
from typing import Dict, Generator, List, Optional
6+
67
import httpx
78
from django.db import transaction
9+
810
from alert_system.models import Connector, ExtractionItem
9-
from .transform import BaseTransformerClass
11+
1012
from .loader import BaseLoaderClass
13+
from .transform import BaseTransformerClass
14+
1115
logger = logging.getLogger(__name__)
1216

1317

@@ -49,7 +53,7 @@ def fetch_stac_data(self, url: str, filters: Optional[Dict] = None) -> Generator
4953
@abc.abstractmethod
5054
def get_transformer_class(self) -> type[BaseTransformerClass]:
5155
raise NotImplementedError()
52-
56+
5357
@abc.abstractmethod
5458
def get_loader_class(self) -> type[BaseLoaderClass]:
5559
raise NotImplementedError()
@@ -134,7 +138,6 @@ def _extract_impact_items(self, stac_obj: ExtractionItem) -> List[ExtractionItem
134138
if impact_object:
135139
impact_objects.append(impact_object)
136140
return impact_objects
137-
138141

139142
def _extract_hazard_items(self, stac_obj: ExtractionItem) -> ExtractionItem | None:
140143
"""Process hazard items related to a STAC event object."""
@@ -196,14 +199,14 @@ def process_event_items(self) -> None:
196199
impact_obj = self._extract_impact_items(event_obj)
197200

198201
transformer = transformer_class(
199-
event_obj = event_obj,
200-
hazard_obj = hazard_obj,
201-
impact_obj = impact_obj,
202+
event_obj=event_obj,
203+
hazard_obj=hazard_obj,
204+
impact_obj=impact_obj,
202205
)
203206
transformed_data = transformer.transform_stac_item()
204-
207+
205208
loader.load(transformed_data, self.connector)
206-
209+
207210
logger.info(f"Successfully processed event {event_id}")
208211

209212
except Exception as e:
@@ -219,4 +222,4 @@ def run(self) -> None:
219222
logger.info("Connector run completed successfully")
220223
except Exception as e:
221224
logger.error(f"Connector run failed: {e}", exc_info=True)
222-
raise
225+
raise

alert_system/etl/base/loader.py

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
from django.db import transaction
21
import logging
2+
from abc import ABC, abstractmethod
33
from typing import Dict
4+
45
from alert_system.models import Connector, LoadItems
5-
from abc import ABC, abstractmethod
66

77
logger = logging.getLogger(__name__)
88

@@ -13,40 +13,39 @@ class BaseLoaderClass(ABC):
1313
@abstractmethod
1414
def filter_eligible_items(self, load_obj):
1515
raise NotImplementedError()
16-
16+
1717
def load(self, transformed_data: Dict, connector: Connector) -> LoadItems:
1818
"""
1919
Save aggregated event.
20-
20+
2121
Args:
2222
transformed_data: Output from transformer.transform()
2323
connector: The connector this data came from
24-
24+
2525
Returns:
2626
Created DisasterEvent object
2727
"""
28-
correlation_id = transformed_data['correlation_id']
28+
correlation_id = transformed_data["correlation_id"]
2929
is_item_eligible = self.filter_eligible_items(transformed_data)
30-
30+
3131
load_obj, created = LoadItems.objects.update_or_create(
3232
correlation_id=correlation_id,
3333
defaults={
34-
'connector': connector,
35-
#'event_type': transformed_data.get('event_type'),
36-
'event_title': transformed_data.get('title'),
37-
'event_description': transformed_data.get('description'),
38-
'country': transformed_data.get('country'),
39-
'severity_value': transformed_data.get('severity_value'),
40-
'severity_label': transformed_data.get('severity_label'),
41-
'severity_unit': transformed_data.get('severity_unit'),
42-
'total_people_exposed': transformed_data.get('people_exposed'),
43-
'total_buildings_exposed': transformed_data.get('buildings_exposed'),
44-
'impact_metadata': transformed_data.get('impact_metadata'),
45-
'item_eligible': is_item_eligible,
46-
}
34+
"connector": connector,
35+
"event_title": transformed_data.get("title"),
36+
"event_description": transformed_data.get("description"),
37+
"country": transformed_data.get("country"),
38+
"severity_value": transformed_data.get("severity_value"),
39+
"severity_label": transformed_data.get("severity_label"),
40+
"severity_unit": transformed_data.get("severity_unit"),
41+
"total_people_exposed": transformed_data.get("people_exposed"),
42+
"total_buildings_exposed": transformed_data.get("buildings_exposed"),
43+
"impact_metadata": transformed_data.get("impact_metadata"),
44+
"item_eligible": is_item_eligible,
45+
},
4746
)
48-
47+
4948
action = "Created" if created else "Updated"
5049
logger.info(f"{action} Event for correlation_id={correlation_id}")
51-
50+
5251
return load_obj

alert_system/etl/base/transform.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
from abc import abstractmethod, ABC
21
import logging
3-
from typing import TypedDict, Optional, List
2+
from abc import ABC, abstractmethod
3+
from typing import List, Optional, TypedDict
4+
45
from alert_system.models import ExtractionItem
56

67
logger = logging.getLogger(__name__)
78

9+
810
class BaseTransformerClass(ABC):
911

1012
class ImpactType(TypedDict):
@@ -22,7 +24,9 @@ class EventType(TypedDict):
2224
description: str
2325
country: str
2426

25-
def __init__(self, event_obj: ExtractionItem, hazard_obj: Optional[ExtractionItem] = None, impact_obj: List[ExtractionItem] = []):
27+
def __init__(
28+
self, event_obj: ExtractionItem, hazard_obj: Optional[ExtractionItem] = None, impact_obj: List[ExtractionItem] = []
29+
):
2630
self.event_obj = event_obj
2731
self.hazard_obj = hazard_obj
2832
self.impact_obj = impact_obj
@@ -44,7 +48,7 @@ def transform_stac_item(self):
4448
"""
4549
Transform STAC items for a given extraction object.
4650
47-
Fetches event, hazard and impact items separately, processes them,
51+
Fetches event, hazard and impact items separately, processes them,
4852
and returns processed data if available.
4953
"""
5054
logger.info(f"Starting transformer for correlation_id={self.correlation_id}")
@@ -55,12 +59,8 @@ def transform_stac_item(self):
5559
impact_result = self.process_impact(self.impact_obj)
5660

5761
return {
58-
'correlation_id': self.correlation_id,
62+
"correlation_id": self.correlation_id,
5963
**event_result,
6064
**hazard_result,
6165
**impact_result,
6266
}
63-
64-
65-
66-

0 commit comments

Comments
 (0)