diff --git a/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py b/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py index 29f4363f..879e6657 100644 --- a/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py +++ b/mapswipe_workers/mapswipe_workers/utils/process_mapillary.py @@ -1,18 +1,11 @@ import os -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ProcessPoolExecutor +from functools import partial import mercantile import pandas as pd import requests -from shapely import ( - LineString, - MultiLineString, - MultiPolygon, - Point, - Polygon, - box, - unary_union, -) +from shapely import MultiPolygon, Point, Polygon, box, unary_union from shapely.geometry import shape from vt2geojson import tools as vt2geojson_tools @@ -44,7 +37,7 @@ def create_tiles(polygon, level): return tiles -def download_and_process_tile(row, attempt_limit=3): +def download_and_process_tile(row, polygon, kwargs, attempt_limit=3): z = row["z"] x = row["x"] y = row["y"] @@ -53,37 +46,24 @@ def download_and_process_tile(row, attempt_limit=3): attempt = 0 while attempt < attempt_limit: try: - r = requests.get(url) - assert r.status_code == 200, r.content - features = vt2geojson_tools.vt_bytes_to_geojson(r.content, x, y, z).get( - "features", [] - ) - data = [] - for feature in features: - geometry = feature.get("geometry", {}) - properties = feature.get("properties", {}) - geometry_type = geometry.get("type", None) - coordinates = geometry.get("coordinates", []) - - element_geometry = None - if geometry_type == "Point": - element_geometry = Point(coordinates) - elif geometry_type == "LineString": - element_geometry = LineString(coordinates) - elif geometry_type == "MultiLineString": - element_geometry = MultiLineString(coordinates) - elif geometry_type == "Polygon": - element_geometry = Polygon(coordinates[0]) - elif geometry_type == "MultiPolygon": - element_geometry = MultiPolygon(coordinates) - - # Append the dictionary with geometry and properties - row = {"geometry": element_geometry, **properties} - data.append(row) - - data = pd.DataFrame(data) - - if not data.empty: + data = get_mapillary_data(url, x, y, z) + if data.isna().all().all() is False or data.empty is False: + data = data[data["geometry"].apply(lambda point: point.within(polygon))] + target_columns = [ + "id", + "geometry", + "captured_at", + "is_pano", + "compass_angle", + "sequence", + "organization_id", + ] + for col in target_columns: + if col not in data.columns: + data[col] = None + if data.isna().all().all() is False or data.empty is False: + data = filter_results(data, **kwargs) + return data except Exception as e: print(f"An exception occurred while requesting a tile: {e}") @@ -93,8 +73,28 @@ def download_and_process_tile(row, attempt_limit=3): return None +def get_mapillary_data(url, x, y, z): + r = requests.get(url) + assert r.status_code == 200, r.content + features = vt2geojson_tools.vt_bytes_to_geojson(r.content, x, y, z).get( + "features", [] + ) + data = [] + data.extend( + [ + { + "geometry": Point(feature["geometry"]["coordinates"]), + **feature.get("properties", {}), + } + for feature in features + if feature.get("geometry", {}).get("type") == "Point" + ] + ) + return pd.DataFrame(data) + + def coordinate_download( - polygon, level, use_concurrency=True, attempt_limit=3, workers=os.cpu_count() * 4 + polygon, level, kwargs: dict, use_concurrency=True, workers=os.cpu_count() * 4 ): tiles = create_tiles(polygon, level) @@ -104,48 +104,32 @@ def coordinate_download( if not use_concurrency: workers = 1 - futures = [] - with ThreadPoolExecutor(max_workers=workers) as executor: - for index, row in tiles.iterrows(): - futures.append( - executor.submit(download_and_process_tile, row, attempt_limit) - ) - - for future in as_completed(futures): - if future is not None: - df = future.result() - - if df is not None and not df.empty: - downloaded_metadata.append(df) + downloaded_metadata = parallelized_processing( + downloaded_metadata, kwargs, polygon, tiles, workers + ) if len(downloaded_metadata): downloaded_metadata = pd.concat(downloaded_metadata, ignore_index=True) else: return pd.DataFrame(downloaded_metadata) - target_columns = [ - "id", - "geometry", - "captured_at", - "is_pano", - "compass_angle", - "sequence", - "organization_id", - ] - for col in target_columns: - if col not in downloaded_metadata.columns: - downloaded_metadata[col] = None - if ( - downloaded_metadata.isna().all().all() is False - or downloaded_metadata.empty is False - ): - downloaded_metadata = downloaded_metadata[ - downloaded_metadata["geometry"].apply( - lambda point: point.within(polygon) - ) - ] return downloaded_metadata +def parallelized_processing(data, kwargs, polygon, tiles, workers): + process_tile_with_args = partial( + download_and_process_tile, polygon=polygon, kwargs=kwargs + ) + with ProcessPoolExecutor(max_workers=workers) as executor: + futures = list( + executor.map(process_tile_with_args, tiles.to_dict(orient="records")) + ) + + for df in futures: + if df is not None and not df.empty: + data.append(df) + return data + + def geojson_to_polygon(geojson_data): if geojson_data["type"] == "FeatureCollection": features = geojson_data["features"] @@ -198,13 +182,11 @@ def filter_results( ) return None df = df[df["creator_id"] == creator_id] - if is_pano is not None: if df["is_pano"].isna().all(): logger.exception("No Mapillary Feature in the AoI has a 'is_pano' value.") return None df = df[df["is_pano"] == is_pano] - if organization_id is not None: if df["organization_id"].isna().all(): logger.exception( @@ -212,7 +194,6 @@ def filter_results( ) return None df = df[df["organization_id"] == organization_id] - if start_time is not None: if df["captured_at"].isna().all(): logger.exception( @@ -220,14 +201,12 @@ def filter_results( ) return None df = filter_by_timerange(df, start_time, end_time) - return df def get_image_metadata( aoi_geojson, level=14, - attempt_limit=3, is_pano: bool = None, creator_id: int = None, organization_id: str = None, @@ -236,33 +215,22 @@ def get_image_metadata( randomize_order=False, sampling_threshold=None, ): + kwargs = { + "is_pano": is_pano, + "creator_id": creator_id, + "organization_id": organization_id, + "start_time": start_time, + "end_time": end_time, + } aoi_polygon = geojson_to_polygon(aoi_geojson) - downloaded_metadata = coordinate_download(aoi_polygon, level, attempt_limit) - + downloaded_metadata = coordinate_download(aoi_polygon, level, kwargs) if downloaded_metadata.empty or downloaded_metadata.isna().all().all(): - raise ValueError("No Mapillary Features in the AoI.") - - downloaded_metadata = downloaded_metadata[ - downloaded_metadata["geometry"].apply(lambda geom: isinstance(geom, Point)) - ] - - downloaded_metadata = filter_results( - downloaded_metadata, - creator_id, - is_pano, - organization_id, - start_time, - end_time, - ) - - if ( - downloaded_metadata is None - or downloaded_metadata.empty - or downloaded_metadata.isna().all().all() - ): - raise ValueError("No Mapillary Features in the AoI match the filter criteria.") - - downloaded_metadata = spatial_sampling(downloaded_metadata, sampling_threshold) + raise ValueError( + "No Mapillary Features in the AoI or no Features match the filter criteria." + ) + downloaded_metadata = downloaded_metadata.drop_duplicates(subset=["geometry"]) + if sampling_threshold is not None: + downloaded_metadata = spatial_sampling(downloaded_metadata, sampling_threshold) if randomize_order is True: downloaded_metadata = downloaded_metadata.sample(frac=1).reset_index(drop=True) diff --git a/mapswipe_workers/tests/unittests/test_process_mapillary.py b/mapswipe_workers/tests/unittests/test_process_mapillary.py index b5b8302f..6aacf7d1 100644 --- a/mapswipe_workers/tests/unittests/test_process_mapillary.py +++ b/mapswipe_workers/tests/unittests/test_process_mapillary.py @@ -50,6 +50,7 @@ def setUp(self): self.test_polygon = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]) self.empty_polygon = Polygon() self.empty_geometry = GeometryCollection() + self.row = pd.Series({"x": 1, "y": 1, "z": self.level}) def test_create_tiles_with_valid_polygon(self): tiles = create_tiles(self.test_polygon, self.level) @@ -178,26 +179,26 @@ def test_download_and_process_tile_success(self, mock_get, mock_vt2geojson): row = {"x": 1, "y": 1, "z": 14} - result = download_and_process_tile(row) + polygon = wkt.loads("POLYGON ((-1 -1, -1 1, 1 1, 1 -1, -1 -1))") + result = download_and_process_tile(row, polygon, {}) self.assertIsInstance(result, pd.DataFrame) self.assertEqual(len(result), 1) self.assertEqual(result["geometry"][0].wkt, "POINT (0 0)") @patch("mapswipe_workers.utils.process_mapillary.requests.get") def test_download_and_process_tile_failure(self, mock_get): - # Mock a failed response + mock_response = MagicMock() mock_response.status_code = 500 mock_get.return_value = mock_response - row = pd.Series({"x": 1, "y": 1, "z": self.level}) - result = download_and_process_tile(row) + result = download_and_process_tile(self.row, self.test_polygon, {}) self.assertIsNone(result) - @patch("mapswipe_workers.utils.process_mapillary.download_and_process_tile") - def test_coordinate_download(self, mock_download_and_process_tile): + @patch("mapswipe_workers.utils.process_mapillary.get_mapillary_data") + def test_download_and_process_tile_spatial_filtering(self, mock_get_mapillary_data): inside_points = [ (0.2, 0.2), (0.5, 0.5), @@ -215,20 +216,20 @@ def test_coordinate_download(self, mock_download_and_process_tile): for x, y in points ] - mock_download_and_process_tile.return_value = pd.DataFrame(data) + mock_get_mapillary_data.return_value = pd.DataFrame(data) - metadata = coordinate_download(self.test_polygon, self.level) + metadata = download_and_process_tile(self.row, self.test_polygon, {}) metadata = metadata.drop_duplicates() self.assertEqual(len(metadata), len(inside_points)) self.assertIsInstance(metadata, pd.DataFrame) - @patch("mapswipe_workers.utils.process_mapillary.download_and_process_tile") - def test_coordinate_download_with_failures(self, mock_download_and_process_tile): - mock_download_and_process_tile.return_value = pd.DataFrame() + @patch("mapswipe_workers.utils.process_mapillary.parallelized_processing") + def test_coordinate_download_with_failures(self, mock_parallelized_processing): + mock_parallelized_processing.return_value = pd.DataFrame() - metadata = coordinate_download(self.test_polygon, self.level) + metadata = coordinate_download(self.test_polygon, self.level, {}) self.assertTrue(metadata.empty) @@ -291,7 +292,7 @@ def test_filter_missing_columns(self): "is_pano", "organization_id", "captured_at", - ] # Add your column names here + ] for column in columns_to_check: df_copy = self.fixture_df.copy() df_copy[column] = None @@ -309,33 +310,6 @@ def test_get_image_metadata(self, mock_coordinate_download): self.assertIn("ids", result) self.assertIn("geometries", result) - @patch("mapswipe_workers.utils.process_mapillary.coordinate_download") - def test_get_image_metadata_filtering(self, mock_coordinate_download): - mock_coordinate_download.return_value = self.fixture_df - - params = { - "is_pano": True, - "start_time": "2016-01-20 00:00:00", - "end_time": "2022-01-21 23:59:59", - } - - result = get_image_metadata(self.fixture_data, **params) - self.assertIsInstance(result, dict) - self.assertIn("ids", result) - self.assertIn("geometries", result) - - @patch("mapswipe_workers.utils.process_mapillary.coordinate_download") - def test_get_image_metadata_no_rows(self, mock_coordinate_download): - mock_coordinate_download.return_value = self.fixture_df - - params = { - "is_pano": True, - "start_time": "1916-01-20 00:00:00", - "end_time": "1922-01-21 23:59:59", - } - with self.assertRaises(ValueError): - get_image_metadata(self.fixture_data, **params) - @patch("mapswipe_workers.utils.process_mapillary.coordinate_download") def test_get_image_metadata_empty_response(self, mock_coordinate_download): df = self.fixture_df.copy() @@ -350,16 +324,26 @@ def test_get_image_metadata_empty_response(self, mock_coordinate_download): def test_get_image_metadata_size_restriction( self, mock_coordinate_download, mock_filter_results ): - mock_df = pd.DataFrame({"ID": range(1, 100002)}) - mock_df["geometry"] = self.test_polygon - mock_df["captured_at"] = range(1, 100002) - mock_df["sequence_id"] = 1 - mock_filter_results.return_value = mock_df - mock_coordinate_download.return_value = self.fixture_df - + mock_df = pd.DataFrame({"id": range(1, 100002), "geometry": range(1, 100002)}) + mock_coordinate_download.return_value = mock_df with self.assertRaises(ValueError): get_image_metadata(self.fixture_data) + @patch("mapswipe_workers.utils.process_mapillary.coordinate_download") + def test_get_image_metadata_drop_duplicates(self, mock_coordinate_download): + test_df = pd.DataFrame( + { + "id": [1, 2, 2, 3, 4, 4, 5], + "geometry": ["a", "b", "b", "c", "d", "d", "e"], + } + ) + mock_coordinate_download.return_value = test_df + return_dict = get_image_metadata(self.fixture_data) + + return_df = pd.DataFrame(return_dict) + + self.assertNotEqual(len(return_df), len(test_df)) + if __name__ == "__main__": unittest.main()