
Commit 3d82d70

refactor(extraction): refactor extraction logic for hazards and impacts.
- Nested extraction with correlation id as filter.
- Self-retry with exponential backoff.
- Different models for Hazards and Impacts.
1 parent 53de547 commit 3d82d70

8 files changed (+432 / −122 lines)

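The commit message mentions self-retry with exponential backoff, but none of the five diffs shown below contains that logic; it presumably lives in one of the three changed files not rendered here. A minimal sketch of what such a helper could look like, assuming names (fetch_with_retry, MAX_RETRIES, BASE_DELAY) that are not taken from the repo:

import logging
import time
from typing import Dict, Optional

import httpx

logger = logging.getLogger(__name__)

MAX_RETRIES = 3  # assumed retry cap, not from the repo
BASE_DELAY = 1.0  # seconds; doubled after each failed attempt


def fetch_with_retry(url: str, params: Optional[Dict] = None) -> httpx.Response:
    """GET with retries: sleep 1s, 2s, 4s, ... between failed attempts."""
    for attempt in range(MAX_RETRIES):
        try:
            response = httpx.get(url, params=params, timeout=30)
            response.raise_for_status()
            return response
        except (httpx.TransportError, httpx.HTTPStatusError) as exc:
            if attempt == MAX_RETRIES - 1:
                raise  # retries exhausted; surface the error to the caller
            delay = BASE_DELAY * (2**attempt)
            logger.warning(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.0f}s")
            time.sleep(delay)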

alert_system/admin.py

Lines changed: 5 additions & 5 deletions
@@ -1,6 +1,6 @@
 from django.contrib import admin

-from .models import Connector, EligibleEventMonty
+from .models import Connector, StacItems


 @admin.register(Connector)
@@ -9,9 +9,9 @@ class ConnectorAdmin(admin.ModelAdmin):
     readonly_fields = ("last_success_run",)


-@admin.register(EligibleEventMonty)
-class MontyAdmin(admin.ModelAdmin):
-    list_display = ("event_id", "created_at")
+@admin.register(StacItems)
+class EventAdmin(admin.ModelAdmin):
+    list_display = ("stac_id", "created_at", "collection")
     list_filter = ("connector",)
     readonly_fields = ("connector",)
-    search_fields = ("event_id",)
+    search_fields = ("stac_id",)
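The StacItems model itself is not part of the diffs shown (models.py is presumably among the other changed files), so the following reconstruction is a sketch: every field type is inferred from how admin.py above and extraction.py below use the model, not copied from the repo. Note also that the docstrings in extraction.py mention HazardItems and ImpactItems while _save_stac_item writes everything through StacItems; the sketch assumes a single table holding event, hazard, and impact rows.

# Hypothetical sketch; field names come from the code in this commit, field types are guesses.
from django.db import models


class StacItems(models.Model):
    stac_id = models.CharField(max_length=255, unique=True)  # lookup key in update_or_create
    collection = models.CharField(max_length=255, null=True, blank=True)
    correlation_id = models.CharField(max_length=255, null=True, blank=True)  # monty:corr_id
    resp_data = models.JSONField()  # raw STAC feature
    connector = models.ForeignKey("Connector", on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)

    # Hazard detail, populated by build_hazard_defaults
    cluster = models.CharField(max_length=255, null=True, blank=True)
    estimate_type = models.CharField(max_length=255, null=True, blank=True)
    severity_unit = models.CharField(max_length=255, null=True, blank=True)
    severity_label = models.CharField(max_length=255, null=True, blank=True)
    severity_value = models.FloatField(null=True, blank=True)

    # Impact detail, populated by build_impact_defaults
    category = models.CharField(max_length=255, null=True, blank=True)
    type = models.CharField(max_length=255, null=True, blank=True)
    value = models.FloatField(null=True, blank=True)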

alert_system/extraction.py

Lines changed: 216 additions & 64 deletions
@@ -1,74 +1,226 @@
-import datetime
 import logging
+from datetime import datetime, timedelta, timezone
+from typing import Dict, Generator, Optional

 import httpx
-from django.utils import timezone
+from django.db import transaction

-from .models import Connector, EligibleEventMonty
+from .models import Connector, StacItems

 logger = logging.getLogger(__name__)


-def fetch_stac_data(url, params):
-    current_payload = params.copy()
-    current_url = url
-
-    while current_url:
-        response = httpx.get(current_url, params=current_payload, timeout=30)
-        response.raise_for_status()
-        data = response.json()
-
-        features = data.get("features", [])
-        yield from features
-
-        # Find the next page link
-        next_link = None
-        for link in data.get("links", []):
-            if link.get("rel") == "next":
-                next_link = link.get("href")
-                break
-        current_url = next_link
-        current_payload = None
-
-
-def process_connector(connector):
-    logger.info(f"Running ETL for {connector.type}")
-    connector.status = Connector.Status.RUNNING
-    connector.save(update_fields=["status"])
-
-    filters = dict(connector.filters or {})
-    start_time = (
-        connector.last_success_run.isoformat()
-        if connector.last_success_run
-        else (timezone.now() - datetime.timedelta(days=30)).isoformat()
-    )  # TODO: Assign start_time instead of timedelta?
-    end_time = timezone.now().isoformat()
-    filters["datetime"] = f"{start_time}/{end_time}"
-    logger.info(f"Fetching data from {start_time} to {end_time}")
-    count = 0
-    try:
-        result = fetch_stac_data(connector.source_url, filters)
-        for feature in result:
-            count += 1
+class BaseExtractionClass:
+    """Base class for extracting STAC data from various disaster monitoring sources."""
+
+    ENDPOINT_MAP = {
+        Connector.ConnectorType.GDACS_CYCLONE: {
+            "event_endpoint": "/gdacs-events/items",
+            "hazard_endpoint": "/gdacs-hazards/items",
+            "impact_endpoint": "/gdacs-impacts/items",
+        },
+        Connector.ConnectorType.GDACS_FLOOD: {
+            "event_endpoint": "/gdacs-events/items",
+            "hazard_endpoint": "/gdacs-hazards/items",
+            "impact_endpoint": "/gdacs-impacts/items",
+        },
+        Connector.ConnectorType.USGS_EARTHQUAKE: {
+            "event_endpoint": "/usgs-events/items",
+            "hazard_endpoint": "/usgs-hazards/items",
+            "impact_endpoint": "/usgs-impacts/items",
+        },
+    }
+
+    def __init__(self, connector: Connector):
+        self.connector = connector
+        self.base_url = connector.source_url.rstrip("/")
+        self.endpoints = self.build_endpoints(self.connector)
+
+    def build_endpoints(self, connector) -> Dict:
+        return self.ENDPOINT_MAP[connector.type]
+
+    def fetch_stac_data(self, url: str, filters: Optional[Dict] = None) -> Generator[Dict, None, None]:
+        """
+        Fetch STAC data with pagination support.
+
+        """
+        current_url = url
+        current_payload = filters.copy() if filters else None
+
+        while current_url:
+            response = httpx.get(current_url, params=current_payload, timeout=30)
+            response.raise_for_status()
+            data = response.json()
+
+            yield from data.get("features", [])
+
+            # Find next page link
+            current_url = next((link["href"] for link in data.get("links", []) if link.get("rel") == "next"), None)
+            current_payload = None  # Only use params on first request
+
+    def _get_correlation_id(self, feature: Dict) -> str:
+        """Extract correlation ID from feature properties."""
+        return feature.get("properties", {}).get("monty:corr_id")
+
+    def _build_base_defaults(self, feature: Dict) -> Dict:
+        """Build common default fields for all STAC items."""
+        return {
+            "collection": feature.get("collection"),
+            "correlation_id": self._get_correlation_id(feature),
+            "resp_data": feature,
+            "connector": self.connector,
+        }
+
+    def build_impact_defaults(self, feature: Dict) -> Dict:
+        """Build default values for ImpactItems creation/update."""
+        defaults = self._build_base_defaults(feature)
+
+        impact = feature.get("properties", {}).get("monty:impact_detail", {})
+        defaults.update(
+            {
+                "category": impact.get("category"),
+                "type": impact.get("type"),
+                "value": impact.get("value"),
+            }
+        )
+
+        return defaults
+
+    def build_hazard_defaults(self, feature: Dict) -> Dict:
+        """Build default values for HazardItems creation/update."""
+        defaults = self._build_base_defaults(feature)
+
+        detail = feature.get("properties", {}).get("monty:hazard_detail", {})
+        defaults.update(
+            {
+                "cluster": detail.get("cluster"),
+                "estimate_type": detail.get("estimate_type"),
+                "severity_unit": detail.get("severity_unit"),
+                "severity_label": detail.get("severity_label"),
+                "severity_value": detail.get("severity_value"),
+            }
+        )
+
+        return defaults
+
+    def get_datetime_filter(self) -> str:
+        """
+        Generate datetime filter string for STAC queries.
+
+        Returns:
+            ISO 8601 datetime range string
+        """
+        now = datetime.now(timezone.utc)
+        last_run = self.connector.last_success_run
+
+        start_time = last_run if last_run else (now - timedelta(days=15))
+        return f"{start_time.isoformat()}/{now.isoformat()}"
+
+    def _build_filter(self, base_filter: Optional[Dict], correlation_id: str) -> Dict:
+        """Build filter dict with correlation ID."""
+        filters = base_filter.copy() if base_filter else {}
+        filters["filter"] = f"monty:corr_id = '{correlation_id}'"
+        return filters
+
+    def _fetch_items(self, endpoint_key: str, filter_attr: str, correlation_id: str) -> Generator[Dict, None, None]:
+        """
+        Generic method to fetch items with correlation ID filtering.
+
+        """
+        url = f"{self.base_url}{self.endpoints[endpoint_key]}"
+        base_filter = getattr(self.connector, filter_attr, None)
+        filters = self._build_filter(base_filter, correlation_id)
+
+        return self.fetch_stac_data(url, filters)
+
+    def _save_stac_item(self, stac_id: str, defaults: Dict, item_type: str) -> Optional[StacItems]:
+        """
+        Generic method to save or update STAC items.
+
+        """
+        try:
+            obj, created = StacItems.objects.update_or_create(stac_id=stac_id, defaults=defaults)
+            action = "Created" if created else "Updated"
+            logger.info(f"{action} {item_type} {stac_id}")
+            return obj
+        except Exception as e:
+            logger.error(f"Failed to save {item_type} {stac_id}: {e}", exc_info=True)
+            return None
+
+    def process_impact_items(self, stac_obj: StacItems) -> None:
+        """Process impact items related to a STAC event object."""
+        try:
+            impact_features = self._fetch_items("impact_endpoint", "filter_impact", stac_obj.correlation_id)
+        except Exception as e:
+            logger.error(f"Failed to fetch impacts for event {stac_obj.stac_id}: {e}")
+            return
+
+        for feature in impact_features:
+            impact_id = feature.get("id", None)
+            if not impact_id:
+                logger.error(f"Impact feature missing 'id': {feature}")
+                continue
+
+            defaults = self.build_impact_defaults(feature)
+            self._save_stac_item(impact_id, defaults, "impact")
+
+    def process_hazard_items(self, stac_obj: StacItems) -> None:
+        """Process hazard items related to a STAC event object."""
+        try:
+            hazard_features = self._fetch_items("hazard_endpoint", "filter_hazard", stac_obj.correlation_id)
+        except Exception as e:
+            logger.error(f"Failed to fetch hazards for event {stac_obj.stac_id}: {e}")
+            raise
+
+        hazard_feature = next(hazard_features, None)
+        if not hazard_feature:
+            logger.info("No hazard features found — skipping hazard processing.")
+            return
+
+        hazard_id = hazard_feature.get("id", None)
+        if not hazard_id:
+            logger.error(f"No hazard id found for {hazard_feature}")
+            return
+
+        defaults = self.build_hazard_defaults(hazard_feature)
+        self._save_stac_item(hazard_id, defaults, "hazard")
+
+    def process_event_items(self) -> None:
+        """Process all event items from the connector source."""
+        event_url = f"{self.base_url}{self.endpoints['event_endpoint']}"
+        event_filter = (self.connector.filter_event or {}).copy()
+        event_filter["datetime"] = self.get_datetime_filter()
+
+        try:
+            event_items = self.fetch_stac_data(event_url, event_filter)
+        except Exception as e:
+            logger.error(f"Failed to fetch events: {e}")
+            raise
+
+        for feature in event_items:
+            event_id = feature.get("id", None)
+            if not event_id:
+                logger.error(f"No event id found for {feature}")
+                continue
+            defaults = self._build_base_defaults(feature=feature)

             try:
-                EligibleEventMonty.objects.update_or_create(
-                    event_id=feature.get("id"),
-                    connector=connector,
-                    defaults={
-                        "resp_data": feature,
-                        "metadata": {
-                            "retrieved_at": timezone.now().isoformat(),
-                            "source_url": connector.source_url,
-                        },
-                    },
-                )
+                with transaction.atomic():
+                    event_obj = self._save_stac_item(event_id, defaults, "event")
+                    if event_obj:
+                        self.process_hazard_items(event_obj)
+                        self.process_impact_items(event_obj)
             except Exception as e:
-                logger.warning(f"Failed to save event {feature.get('id')}: {e}")
-        connector.status = Connector.Status.SUCCESS
-        connector.last_success_run = timezone.now()
-        connector.save(update_fields=["status", "last_success_run"])
-        logger.info(f"{count} features processed for {connector.type}")
-    except Exception as e:
-        connector.status = Connector.Status.FAILURE
-        connector.save(update_fields=["status"])
-        logger.exception(f"ETL failed for {connector.type}: {e}")
+                logger.error(f"Failed to process event {event_id}: {e}", exc_info=True)
+                raise
+
+    def run(self) -> None:
+        """Main entry point for running the connector."""
+        logger.info(f"Starting connector run for {self.connector}")
+
+        try:
+            self.process_event_items()
+            logger.info("Connector run completed successfully")
+        except Exception as e:
+            logger.error(f"Connector run failed: {e}", exc_info=True)
+            raise
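Two notes on the new class. First, fetch_stac_data is a generator, so the try/except wrapped around the fetch_stac_data(...) call in process_event_items cannot catch HTTP errors at that point: httpx only issues requests once the for loop starts iterating, and errors surface there instead. Second, the status and last_success_run bookkeeping from the old process_connector no longer appears here, so it presumably moved to the calling layer. A plausible caller on the polling side, assuming BasePollingCommand resolves its Connector rows by SOURCE_TYPE (that base class is not part of this diff):

# Hypothetical usage sketch; the connector lookup is an assumption.
from alert_system.extraction import BaseExtractionClass
from alert_system.models import Connector


def poll(source_type: int) -> None:
    """Run one extraction pass for every connector of the given type."""
    for connector in Connector.objects.filter(type=source_type):
        # run() fetches events first, then nests hazard and impact
        # extraction per event, filtered by monty:corr_id.
        BaseExtractionClass(connector).run()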

alert_system/management/commands/poll_gdacs_cy.py renamed to alert_system/management/commands/poll_gdacs_cyclone.py

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@

 class Command(BasePollingCommand):
     help = "Poll data for gdacs cyclone"
-    SOURCE_TYPE = "GDACS_CYCLONE"
+    SOURCE_TYPE = 200

     @monitor(monitor_slug=SentryMonitor.POLL_GDACS_CY)
     def handle(self, *args, **options):

alert_system/management/commands/poll_gdacs_fl.py renamed to alert_system/management/commands/poll_gdacs_flood.py

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@

 class Command(BasePollingCommand):
     help = "Poll data for gdacs flood"
-    SOURCE_TYPE = "GDACS_FLOOD"
+    SOURCE_TYPE = 100

     @monitor(monitor_slug=SentryMonitor.POLL_GDACS_FL)
     def handle(self, *args, **options):

alert_system/management/commands/poll_usgs_eq.py renamed to alert_system/management/commands/poll_usgs_earthquake.py

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@

 class Command(BasePollingCommand):
     help = "Poll data for usgs earthquake"
-    SOURCE_TYPE = "USGS_EARTHQUAKE"
+    SOURCE_TYPE = 300

     @monitor(monitor_slug=SentryMonitor.POLL_USGS_EQ)
     def handle(self, *args, **options):
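Across the three commands, SOURCE_TYPE switches from strings to integers (GDACS_FLOOD 100, GDACS_CYCLONE 200, USGS_EARTHQUAKE 300), which suggests Connector.ConnectorType, referenced by ENDPOINT_MAP in extraction.py, is now an integer-backed enum. A sketch consistent with those values; the actual models.py change is not shown in this commit:

# Hypothetical sketch inferred from the SOURCE_TYPE values above.
from django.db import models


class Connector(models.Model):
    class ConnectorType(models.IntegerChoices):
        GDACS_FLOOD = 100, "GDACS Flood"
        GDACS_CYCLONE = 200, "GDACS Cyclone"
        USGS_EARTHQUAKE = 300, "USGS Earthquake"

    type = models.IntegerField(choices=ConnectorType.choices)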
