From bc2bcd5c3a7b549c23911337a8431e2535e7030b Mon Sep 17 00:00:00 2001 From: Levi Szamek Date: Mon, 27 Jan 2025 15:48:51 +0100 Subject: [PATCH 1/6] feat: use ProcessPoolExecutor for download_and_process_tile --- .../utils/process_mapillary.py | 144 +++++++----------- 1 file changed, 59 insertions(+), 85 deletions(-) diff --git a/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py b/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py index 1faf0b23b..ffec6de8d 100644 --- a/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py +++ b/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py @@ -1,18 +1,11 @@ import os -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ProcessPoolExecutor +from functools import partial import mercantile import pandas as pd import requests -from shapely import ( - LineString, - MultiLineString, - MultiPolygon, - Point, - Polygon, - box, - unary_union, -) +from shapely import MultiPolygon, Point, Polygon, box, unary_union from shapely.geometry import shape from vt2geojson import tools as vt2geojson_tools @@ -44,7 +37,7 @@ def create_tiles(polygon, level): return tiles -def download_and_process_tile(row, attempt_limit=3): +def download_and_process_tile(row, polygon, kwargs, attempt_limit=3): z = row["z"] x = row["x"] y = row["y"] @@ -59,31 +52,37 @@ def download_and_process_tile(row, attempt_limit=3): "features", [] ) data = [] - for feature in features: - geometry = feature.get("geometry", {}) - properties = feature.get("properties", {}) - geometry_type = geometry.get("type", None) - coordinates = geometry.get("coordinates", []) - - element_geometry = None - if geometry_type == "Point": - element_geometry = Point(coordinates) - elif geometry_type == "LineString": - element_geometry = LineString(coordinates) - elif geometry_type == "MultiLineString": - element_geometry = MultiLineString(coordinates) - elif geometry_type == "Polygon": - element_geometry = Polygon(coordinates[0]) - elif geometry_type == "MultiPolygon": - element_geometry = MultiPolygon(coordinates) - - # Append the dictionary with geometry and properties - row = {"geometry": element_geometry, **properties} - data.append(row) + data.extend( + [ + { + "geometry": Point(feature["geometry"]["coordinates"]), + **feature.get("properties", {}), + } + for feature in features + if feature.get("geometry", {}).get("type") == "Point" + ] + ) data = pd.DataFrame(data) - if not data.empty: + if data.isna().all().all() is False or data.empty is False: + data = data[data["geometry"].apply(lambda point: point.within(polygon))] + target_columns = [ + "id", + "geometry", + "captured_at", + "is_pano", + "compass_angle", + "sequence", + "organization_id", + ] + for col in target_columns: + if col not in data.columns: + data[col] = None + + if data.isna().all().all() is False or data.empty is False: + data = filter_results(data, **kwargs) + return data except Exception as e: print(f"An exception occurred while requesting a tile: {e}") @@ -94,7 +93,7 @@ def download_and_process_tile(row, attempt_limit=3): def coordinate_download( - polygon, level, use_concurrency=True, attempt_limit=3, workers=os.cpu_count() * 4 + polygon, level, kwargs: dict, use_concurrency=True, workers=os.cpu_count() * 4 ): tiles = create_tiles(polygon, level) @@ -104,45 +103,22 @@ def coordinate_download( if not use_concurrency: workers = 1 - futures = [] - with ThreadPoolExecutor(max_workers=workers) as executor: - for index, row in tiles.iterrows(): - futures.append( - 
-                executor.submit(download_and_process_tile, row, attempt_limit)
-            )
-
-        for future in as_completed(futures):
-            if future is not None:
-                df = future.result()
+    process_tile_with_args = partial(
+        download_and_process_tile, polygon=polygon, kwargs=kwargs
+    )
+    with ProcessPoolExecutor(max_workers=workers) as executor:
+        futures = list(
+            executor.map(process_tile_with_args, tiles.to_dict(orient="records"))
+        )

-            if df is not None and not df.empty:
-                downloaded_metadata.append(df)
+    for df in futures:
+        if df is not None and not df.empty:
+            downloaded_metadata.append(df)

     if len(downloaded_metadata):
         downloaded_metadata = pd.concat(downloaded_metadata, ignore_index=True)
     else:
         return pd.DataFrame(downloaded_metadata)

-    target_columns = [
-        "id",
-        "geometry",
-        "captured_at",
-        "is_pano",
-        "compass_angle",
-        "sequence",
-        "organization_id",
-    ]
-    for col in target_columns:
-        if col not in downloaded_metadata.columns:
-            downloaded_metadata[col] = None
-    if (
-        downloaded_metadata.isna().all().all() is False
-        or downloaded_metadata.empty is False
-    ):
-        downloaded_metadata = downloaded_metadata[
-            downloaded_metadata["geometry"].apply(
-                lambda point: point.within(polygon)
-            )
-        ]
     return downloaded_metadata


@@ -198,13 +174,12 @@ def filter_results(
         )
         return None
     df = df[df["creator_id"] == creator_id]
-
     if is_pano is not None:
         if df["is_pano"].isna().all():
+            print(df)
             logger.exception("No Mapillary Feature in the AoI has a 'is_pano' value.")
             return None
         df = df[df["is_pano"] == is_pano]
-
     if organization_id is not None:
         if df["organization_id"].isna().all():
             logger.exception(
             )
             return None
         df = df[df["organization_id"] == organization_id]
-
     if start_time is not None:
         if df["captured_at"].isna().all():
             logger.exception(
             )
             return None
         df = filter_by_timerange(df, start_time, end_time)
-
     return df


 def get_image_metadata(
     aoi_geojson,
     level=14,
-    attempt_limit=3,
     is_pano: bool = None,
     creator_id: int = None,
     organization_id: str = None,
     end_time: str = None,
     sampling_threshold=None,
 ):
+    kwargs = {
+        "is_pano": is_pano,
+        "creator_id": creator_id,
+        "organization_id": organization_id,
+        "start_time": start_time,
+        "end_time": end_time,
+    }
     aoi_polygon = geojson_to_polygon(aoi_geojson)
-    downloaded_metadata = coordinate_download(aoi_polygon, level, attempt_limit)
-
+    downloaded_metadata = coordinate_download(aoi_polygon, level, kwargs)
     if downloaded_metadata.empty or downloaded_metadata.isna().all().all():
         raise ValueError("No Mapillary Features in the AoI.")

         downloaded_metadata["geometry"].apply(lambda geom: isinstance(geom, Point))
     ]

-    filtered_metadata = filter_results(
-        downloaded_metadata, creator_id, is_pano, organization_id, start_time, end_time
-    )
-
     if (
-        filtered_metadata is None
-        or filtered_metadata.empty
-        or filtered_metadata.isna().all().all()
+        downloaded_metadata is None
+        or downloaded_metadata.empty
+        or downloaded_metadata.isna().all().all()
     ):
         raise ValueError("No Mapillary Features in the AoI match the filter criteria.")

     if sampling_threshold is not None:
-        filtered_metadata = spatial_sampling(filtered_metadata, sampling_threshold)
+        downloaded_metadata = spatial_sampling(downloaded_metadata, sampling_threshold)

-    total_images = len(filtered_metadata)
+    total_images = len(downloaded_metadata)
     if total_images > 100000:
         raise ValueError(
             f"Too many Images with selected filter options for the AoI: {total_images}"
         )
     return {
-        "ids": filtered_metadata["id"].tolist(),
-        "geometries": filtered_metadata["geometry"].tolist(),
+        "ids": downloaded_metadata["id"].tolist(),
+        "geometries": downloaded_metadata["geometry"].tolist(),
     }

From d4dd6a0655dec987e98877c39a5099683ba8b8e2 Mon Sep 17 00:00:00 2001
From: Levi Szamek
Date: Tue, 28 Jan 2025 13:36:17 +0100
Subject: [PATCH 2/6] refactor: extract functions for improved testing

---
 .../utils/process_mapillary.py                | 74 ++++++++++---------
 1 file changed, 39 insertions(+), 35 deletions(-)

diff --git a/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py b/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
index ffec6de8d..a8f7f62f6 100644
--- a/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
+++ b/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
@@ -46,25 +46,7 @@ def download_and_process_tile(row, polygon, kwargs, attempt_limit=3):
     attempt = 0
     while attempt < attempt_limit:
         try:
-            r = requests.get(url)
-            assert r.status_code == 200, r.content
-            features = vt2geojson_tools.vt_bytes_to_geojson(r.content, x, y, z).get(
-                "features", []
-            )
-            data = []
-            data.extend(
-                [
-                    {
-                        "geometry": Point(feature["geometry"]["coordinates"]),
-                        **feature.get("properties", {}),
-                    }
-                    for feature in features
-                    if feature.get("geometry", {}).get("type") == "Point"
-                ]
-            )
-
-            data = pd.DataFrame(data)
-
+            data = get_mapillary_data(url, x, y, z)
             if data.isna().all().all() is False or data.empty is False:
                 data = data[data["geometry"].apply(lambda point: point.within(polygon))]
                 target_columns = [
@@ -79,7 +61,6 @@ def download_and_process_tile(row, polygon, kwargs, attempt_limit=3):
             for col in target_columns:
                 if col not in data.columns:
                     data[col] = None
-
             if data.isna().all().all() is False or data.empty is False:
                 data = filter_results(data, **kwargs)

@@ -92,6 +73,26 @@ def download_and_process_tile(row, polygon, kwargs, attempt_limit=3):
     return None


+def get_mapillary_data(url, x, y, z):
+    r = requests.get(url)
+    assert r.status_code == 200, r.content
+    features = vt2geojson_tools.vt_bytes_to_geojson(r.content, x, y, z).get(
+        "features", []
+    )
+    data = []
+    data.extend(
+        [
+            {
+                "geometry": Point(feature["geometry"]["coordinates"]),
+                **feature.get("properties", {}),
+            }
+            for feature in features
+            if feature.get("geometry", {}).get("type") == "Point"
+        ]
+    )
+    return pd.DataFrame(data)
+
+
 def coordinate_download(
     polygon, level, kwargs: dict, use_concurrency=True, workers=os.cpu_count() * 4
 ):
@@ -103,18 +104,11 @@ def coordinate_download(
     if not use_concurrency:
         workers = 1

-    process_tile_with_args = partial(
-        download_and_process_tile, polygon=polygon, kwargs=kwargs
+    downloaded_metadata = parallelized_processing(
+        downloaded_metadata, kwargs, polygon, tiles, workers
     )
-    with ProcessPoolExecutor(max_workers=workers) as executor:
-        futures = list(
-            executor.map(process_tile_with_args, tiles.to_dict(orient="records"))
-        )
-
-    for df in futures:
-        if df is not None and not df.empty:
-            downloaded_metadata.append(df)
     if len(downloaded_metadata):
+        breakpoint()
         downloaded_metadata = pd.concat(downloaded_metadata, ignore_index=True)
     else:
         return pd.DataFrame(downloaded_metadata)
@@ -122,6 +116,21 @@ def coordinate_download(
     return downloaded_metadata


+def parallelized_processing(data, kwargs, polygon, tiles, workers):
+    process_tile_with_args = partial(
+        download_and_process_tile, polygon=polygon, kwargs=kwargs
+    )
+    with ProcessPoolExecutor(max_workers=workers) as executor:
+        futures = list(
+            executor.map(process_tile_with_args, tiles.to_dict(orient="records"))
+        )
+
+    for df in futures:
+        if df is not None and not df.empty:
+            data.append(df)
+    return data
+
+
 def geojson_to_polygon(geojson_data):
     if geojson_data["type"] == "FeatureCollection":
         features = geojson_data["features"]
@@ -176,7 +185,6 @@ def filter_results(
     df = df[df["creator_id"] == creator_id]
     if is_pano is not None:
         if df["is_pano"].isna().all():
-            print(df)
             logger.exception("No Mapillary Feature in the AoI has a 'is_pano' value.")
             return None
         df = df[df["is_pano"] == is_pano]
@@ -219,10 +227,6 @@ def get_image_metadata(
     if downloaded_metadata.empty or downloaded_metadata.isna().all().all():
         raise ValueError("No Mapillary Features in the AoI.")

-    downloaded_metadata = downloaded_metadata[
-        downloaded_metadata["geometry"].apply(lambda geom: isinstance(geom, Point))
-    ]
-

From 87d65cb895a3551dfa7cd66807d8c561f62bfca3 Mon Sep 17 00:00:00 2001
From: Levi Szamek
Date: Tue, 28 Jan 2025 13:37:47 +0100
Subject: [PATCH 3/6] fix: adapt tests to new multiprocessing

---
 .../tests/unittests/test_process_mapillary.py | 62 +++++--------------
 1 file changed, 16 insertions(+), 46 deletions(-)

diff --git a/mapswipe_workers/tests/unittests/test_process_mapillary.py b/mapswipe_workers/tests/unittests/test_process_mapillary.py
index 32c1bad46..9fd1b4d1c 100644
--- a/mapswipe_workers/tests/unittests/test_process_mapillary.py
+++ b/mapswipe_workers/tests/unittests/test_process_mapillary.py
@@ -53,6 +53,7 @@ def setUp(self):
         )
         self.empty_polygon = Polygon()
         self.empty_geometry = GeometryCollection()
+        self.row = pd.Series({"x": 1, "y": 1, "z": self.level})

     def test_create_tiles_with_valid_polygon(self):
         tiles = create_tiles(self.test_polygon, self.level)
@@ -171,26 +172,26 @@ def test_download_and_process_tile_success(self, mock_get, mock_vt2geojson):

         row = {"x": 1, "y": 1, "z": 14}

-        result = download_and_process_tile(row)
+        polygon = wkt.loads("POLYGON ((-1 -1, -1 1, 1 1, 1 -1, -1 -1))")
+        result = download_and_process_tile(row, polygon, {})
         self.assertIsInstance(result, pd.DataFrame)
         self.assertEqual(len(result), 1)
         self.assertEqual(result["geometry"][0].wkt, "POINT (0 0)")

     @patch("mapswipe_workers.utils.process_mapillary.requests.get")
     def test_download_and_process_tile_failure(self, mock_get):
-        # Mock a failed response
+
         mock_response = MagicMock()
         mock_response.status_code = 500
         mock_get.return_value = mock_response

-        row = pd.Series({"x": 1, "y": 1, "z": self.level})
-        result = download_and_process_tile(row)
+        result = download_and_process_tile(self.row, self.test_polygon, {})

         self.assertIsNone(result)

-    @patch("mapswipe_workers.utils.process_mapillary.download_and_process_tile")
-    def test_coordinate_download(self, mock_download_and_process_tile):
+    @patch("mapswipe_workers.utils.process_mapillary.get_mapillary_data")
+    def test_download_and_process_tile_spatial_filtering(self, mock_get_mapillary_data):
         inside_points = [
             (0.2, 0.2),
             (0.5, 0.5),
             (0.7, 0.7),
         ]

         data = [
             {"geometry": Point(x, y), "first": "a", "second": "b", "third": "c"}
             for x, y in points
         ]

-        mock_download_and_process_tile.return_value = pd.DataFrame(data)
+        mock_get_mapillary_data.return_value = pd.DataFrame(data)

-        metadata = coordinate_download(self.test_polygon, self.level)
+        metadata = download_and_process_tile(self.row, self.test_polygon, {})
         metadata = metadata.drop_duplicates()

         self.assertEqual(len(metadata), len(inside_points))
         self.assertIsInstance(metadata, pd.DataFrame)

-    @patch("mapswipe_workers.utils.process_mapillary.download_and_process_tile")
-    def test_coordinate_download_with_failures(self, mock_download_and_process_tile):
-        mock_download_and_process_tile.return_value = pd.DataFrame()
+    @patch("mapswipe_workers.utils.process_mapillary.parallelized_processing")
+    def test_coordinate_download_with_failures(self, mock_parallelized_processing):
+        mock_parallelized_processing.return_value = pd.DataFrame()

-        metadata = coordinate_download(self.test_polygon, self.level)
+        metadata = coordinate_download(self.test_polygon, self.level, {})

         self.assertTrue(metadata.empty)

             "is_pano",
             "organization_id",
             "captured_at",
-        ]  # Add your column names here
+        ]
         for column in columns_to_check:
             df_copy = self.fixture_df.copy()
             df_copy[column] = None

     @patch("mapswipe_workers.utils.process_mapillary.coordinate_download")
     def test_get_image_metadata(self, mock_coordinate_download):
         mock_coordinate_download.return_value = self.fixture_df

         result = get_image_metadata(self.fixture_data)
         self.assertIsInstance(result, dict)
         self.assertIn("ids", result)
         self.assertIn("geometries", result)

-    @patch("mapswipe_workers.utils.process_mapillary.coordinate_download")
-    def test_get_image_metadata_filtering(self, mock_coordinate_download):
-        mock_coordinate_download.return_value = self.fixture_df
-
-        params = {
-            "is_pano": True,
-            "start_time": "2016-01-20 00:00:00",
-            "end_time": "2022-01-21 23:59:59",
-        }
-
-        result = get_image_metadata(self.fixture_data, **params)
-        self.assertIsInstance(result, dict)
-        self.assertIn("ids", result)
-        self.assertIn("geometries", result)
-
-    @patch("mapswipe_workers.utils.process_mapillary.coordinate_download")
-    def test_get_image_metadata_no_rows(self, mock_coordinate_download):
-        mock_coordinate_download.return_value = self.fixture_df
-
-        params = {
-            "is_pano": True,
-            "start_time": "1916-01-20 00:00:00",
-            "end_time": "1922-01-21 23:59:59",
-        }
-        with self.assertRaises(ValueError):
-            get_image_metadata(self.fixture_data, **params)
-
     @patch("mapswipe_workers.utils.process_mapillary.coordinate_download")
     def test_get_image_metadata_empty_response(self, mock_coordinate_download):
         df = self.fixture_df.copy()
         df.loc[:] = None
         mock_coordinate_download.return_value = df

         with self.assertRaises(ValueError):
             get_image_metadata(self.fixture_data)

-    @patch("mapswipe_workers.utils.process_mapillary.filter_results")
     @patch("mapswipe_workers.utils.process_mapillary.coordinate_download")
-    def test_get_image_metadata_size_restriction(
-        self, mock_coordinate_download, mock_filter_results
-    ):
-        mock_filter_results.return_value = pd.DataFrame({"ID": range(1, 100002)})
-        mock_coordinate_download.return_value = self.fixture_df
+    def test_get_image_metadata_size_restriction(self, mock_coordinate_download):
+        mock_coordinate_download.return_value = pd.DataFrame({"ID": range(1, 100002)})

         with self.assertRaises(ValueError):
             get_image_metadata(self.fixture_data)

From 41d641363d7eecf78acc6d8b8e111e7ea5fe2111 Mon Sep 17 00:00:00 2001
From: Levi Szamek
Date: Tue, 28 Jan 2025 13:58:59 +0100
Subject: [PATCH 4/6] fix: remove debugging artifact

---
 mapswipe_workers/mapswipe_workers/utils/process_mapillary.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py b/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
index a8f7f62f6..b2dd3c76c 100644
--- a/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
+++ b/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
@@ -108,7 +108,6 @@ def coordinate_download(
     )
     if len(downloaded_metadata):
-        breakpoint()
         downloaded_metadata = pd.concat(downloaded_metadata, ignore_index=True)
     else:
         return pd.DataFrame(downloaded_metadata)

From 4d7790bf02b5ee5a342767e09e4854a03ee8fd8f Mon Sep 17 00:00:00 2001
From: Levi Szamek
Date: Thu, 30 Jan 2025 14:09:13 +0100
Subject: [PATCH 5/6] fix: remove unnecessary check for empty dataframe

---
 .../mapswipe_workers/utils/process_mapillary.py | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py b/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
index b2dd3c76c..a87c1970b 100644
--- a/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
+++ b/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
@@ -224,14 +224,9 @@ def get_image_metadata(
     aoi_polygon = geojson_to_polygon(aoi_geojson)
     downloaded_metadata = coordinate_download(aoi_polygon, level, kwargs)
     if downloaded_metadata.empty or downloaded_metadata.isna().all().all():
-        raise ValueError("No Mapillary Features in the AoI.")
-
-    if (
-        downloaded_metadata is None
-        or downloaded_metadata.empty
-        or downloaded_metadata.isna().all().all()
-    ):
-        raise ValueError("No Mapillary Features in the AoI match the filter criteria.")
+        raise ValueError(
+            "No Mapillary Features in the AoI or no Features match the filter criteria."
+        )

     if sampling_threshold is not None:
         downloaded_metadata = spatial_sampling(downloaded_metadata, sampling_threshold)

From 916d4f703b5a9c3731b004eb592f904b69d2cff4 Mon Sep 17 00:00:00 2001
From: Levi Szamek
Date: Thu, 30 Jan 2025 14:21:49 +0100
Subject: [PATCH 6/6] feat: drop duplicated images at exact same location

---
 .../utils/process_mapillary.py                   |  2 +-
 .../tests/unittests/test_process_mapillary.py    | 19 ++++++++++++++++++-
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py b/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
index a87c1970b..ac06f435e 100644
--- a/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
+++ b/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
@@ -227,7 +227,7 @@ def get_image_metadata(
         raise ValueError(
             "No Mapillary Features in the AoI or no Features match the filter criteria."
         )
-
+    downloaded_metadata = downloaded_metadata.drop_duplicates(subset=["geometry"])
     if sampling_threshold is not None:
         downloaded_metadata = spatial_sampling(downloaded_metadata, sampling_threshold)

diff --git a/mapswipe_workers/tests/unittests/test_process_mapillary.py b/mapswipe_workers/tests/unittests/test_process_mapillary.py
index 9fd1b4d1c..e6894fae1 100644
--- a/mapswipe_workers/tests/unittests/test_process_mapillary.py
+++ b/mapswipe_workers/tests/unittests/test_process_mapillary.py
@@ -314,11 +314,28 @@ def test_get_image_metadata_empty_response(self, mock_coordinate_download):

     @patch("mapswipe_workers.utils.process_mapillary.coordinate_download")
     def test_get_image_metadata_size_restriction(self, mock_coordinate_download):
-        mock_coordinate_download.return_value = pd.DataFrame({"ID": range(1, 100002)})
+        mock_coordinate_download.return_value = pd.DataFrame(
+            {"geometry": range(1, 100002)}
+        )

         with self.assertRaises(ValueError):
             get_image_metadata(self.fixture_data)

+    @patch("mapswipe_workers.utils.process_mapillary.coordinate_download")
+    def test_get_image_metadata_drop_duplicates(self, mock_coordinate_download):
+        test_df = pd.DataFrame(
+            {
+                "id": [1, 2, 2, 3, 4, 4, 5],
+                "geometry": ["a", "b", "b", "c", "d", "d", "e"],
+            }
+        )
+        mock_coordinate_download.return_value = test_df
+        return_dict = get_image_metadata(self.fixture_data)
+
+        return_df = pd.DataFrame(return_dict)
+
+        self.assertNotEqual(len(return_df), len(test_df))
+

 if __name__ == "__main__":
     unittest.main()
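
Appendix (editor's sketch, not part of the patches above): the concurrency pattern this
series settles on — functools.partial binding the shared arguments so the per-tile worker
stays picklable, then ProcessPoolExecutor.map fanning out over plain dict rows — shown in
a minimal, self-contained form. The process_tile body, fan_out helper, and sample rows are
illustrative stand-ins, not the MapSwipe implementation.

from concurrent.futures import ProcessPoolExecutor
from functools import partial


def process_tile(row, polygon, kwargs):
    # Stand-in for download_and_process_tile: do per-tile work and
    # return a result, or None when the tile should be skipped.
    return {"tile": (row["x"], row["y"], row["z"]), **kwargs}


def fan_out(rows, polygon, kwargs, workers=4):
    # partial() fixes polygon/kwargs; the result wraps a module-level
    # function, so it can be pickled and sent to worker processes.
    worker = partial(process_tile, polygon=polygon, kwargs=kwargs)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # map() preserves input order and collects one result per row.
        results = list(executor.map(worker, rows))
    # Drop failed tiles, mirroring the None/empty checks in coordinate_download.
    return [r for r in results if r is not None]


if __name__ == "__main__":
    # The guard matters: under the spawn start method, worker processes
    # re-import this module, and unguarded code would run again.
    rows = [{"x": 1, "y": 1, "z": 14}, {"x": 2, "y": 1, "z": 14}]
    print(fan_out(rows, polygon=None, kwargs={"is_pano": True}))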