182 changes: 75 additions & 107 deletions mapswipe_workers/mapswipe_workers/utils/process_mapillary.py
@@ -1,18 +1,11 @@
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor
from functools import partial

import mercantile
import pandas as pd
import requests
from shapely import (
LineString,
MultiLineString,
MultiPolygon,
Point,
Polygon,
box,
unary_union,
)
from shapely import MultiPolygon, Point, Polygon, box, unary_union
from shapely.geometry import shape
from vt2geojson import tools as vt2geojson_tools

@@ -44,7 +37,7 @@ def create_tiles(polygon, level):
return tiles


def download_and_process_tile(row, attempt_limit=3):
def download_and_process_tile(row, polygon, kwargs, attempt_limit=3):
z = row["z"]
x = row["x"]
y = row["y"]
@@ -53,37 +46,24 @@ def download_and_process_tile(row, attempt_limit=3):
attempt = 0
while attempt < attempt_limit:
try:
r = requests.get(url)
assert r.status_code == 200, r.content
features = vt2geojson_tools.vt_bytes_to_geojson(r.content, x, y, z).get(
"features", []
)
data = []
for feature in features:
geometry = feature.get("geometry", {})
properties = feature.get("properties", {})
geometry_type = geometry.get("type", None)
coordinates = geometry.get("coordinates", [])

element_geometry = None
if geometry_type == "Point":
element_geometry = Point(coordinates)
elif geometry_type == "LineString":
element_geometry = LineString(coordinates)
elif geometry_type == "MultiLineString":
element_geometry = MultiLineString(coordinates)
elif geometry_type == "Polygon":
element_geometry = Polygon(coordinates[0])
elif geometry_type == "MultiPolygon":
element_geometry = MultiPolygon(coordinates)

# Append the dictionary with geometry and properties
row = {"geometry": element_geometry, **properties}
data.append(row)

data = pd.DataFrame(data)

if not data.empty:
data = get_mapillary_data(url, x, y, z)
if data.isna().all().all() is False or data.empty is False:
data = data[data["geometry"].apply(lambda point: point.within(polygon))]
target_columns = [
"id",
"geometry",
"captured_at",
"is_pano",
"compass_angle",
"sequence",
"organization_id",
]
for col in target_columns:
if col not in data.columns:
data[col] = None
if data.isna().all().all() is False or data.empty is False:
data = filter_results(data, **kwargs)

return data
except Exception as e:
print(f"An exception occurred while requesting a tile: {e}")
@@ -93,8 +73,28 @@ def download_and_process_tile(row, attempt_limit=3):
return None


def get_mapillary_data(url, x, y, z):
r = requests.get(url)
assert r.status_code == 200, r.content
features = vt2geojson_tools.vt_bytes_to_geojson(r.content, x, y, z).get(
"features", []
)
data = []
data.extend(
[
{
"geometry": Point(feature["geometry"]["coordinates"]),
**feature.get("properties", {}),
}
for feature in features
if feature.get("geometry", {}).get("type") == "Point"
]
)
return pd.DataFrame(data)


def coordinate_download(
polygon, level, use_concurrency=True, attempt_limit=3, workers=os.cpu_count() * 4
polygon, level, kwargs: dict, use_concurrency=True, workers=os.cpu_count() * 4
):
tiles = create_tiles(polygon, level)

@@ -104,48 +104,32 @@ def coordinate_download(
if not use_concurrency:
workers = 1

futures = []
with ThreadPoolExecutor(max_workers=workers) as executor:
for index, row in tiles.iterrows():
futures.append(
executor.submit(download_and_process_tile, row, attempt_limit)
)

for future in as_completed(futures):
if future is not None:
df = future.result()

if df is not None and not df.empty:
downloaded_metadata.append(df)
downloaded_metadata = parallelized_processing(
downloaded_metadata, kwargs, polygon, tiles, workers
)
if len(downloaded_metadata):
downloaded_metadata = pd.concat(downloaded_metadata, ignore_index=True)
else:
return pd.DataFrame(downloaded_metadata)

target_columns = [
"id",
"geometry",
"captured_at",
"is_pano",
"compass_angle",
"sequence",
"organization_id",
]
for col in target_columns:
if col not in downloaded_metadata.columns:
downloaded_metadata[col] = None
if (
downloaded_metadata.isna().all().all() is False
or downloaded_metadata.empty is False
):
downloaded_metadata = downloaded_metadata[
downloaded_metadata["geometry"].apply(
lambda point: point.within(polygon)
)
]
return downloaded_metadata


def parallelized_processing(data, kwargs, polygon, tiles, workers):
process_tile_with_args = partial(
download_and_process_tile, polygon=polygon, kwargs=kwargs
)
with ProcessPoolExecutor(max_workers=workers) as executor:
futures = list(
executor.map(process_tile_with_args, tiles.to_dict(orient="records"))
)

for df in futures:
if df is not None and not df.empty:
data.append(df)
return data


def geojson_to_polygon(geojson_data):
if geojson_data["type"] == "FeatureCollection":
features = geojson_data["features"]
@@ -198,36 +182,31 @@ def filter_results(
)
return None
df = df[df["creator_id"] == creator_id]

if is_pano is not None:
if df["is_pano"].isna().all():
logger.exception("No Mapillary Feature in the AoI has a 'is_pano' value.")
return None
df = df[df["is_pano"] == is_pano]

if organization_id is not None:
if df["organization_id"].isna().all():
logger.exception(
"No Mapillary Feature in the AoI has an 'organization_id' value."
)
return None
df = df[df["organization_id"] == organization_id]

if start_time is not None:
if df["captured_at"].isna().all():
logger.exception(
"No Mapillary Feature in the AoI has a 'captured_at' value."
)
return None
df = filter_by_timerange(df, start_time, end_time)

return df


def get_image_metadata(
aoi_geojson,
level=14,
attempt_limit=3,
is_pano: bool = None,
creator_id: int = None,
organization_id: str = None,
@@ -236,33 +215,22 @@ def get_image_metadata(
randomize_order=False,
sampling_threshold=None,
):
kwargs = {
"is_pano": is_pano,
"creator_id": creator_id,
"organization_id": organization_id,
"start_time": start_time,
"end_time": end_time,
}
aoi_polygon = geojson_to_polygon(aoi_geojson)
downloaded_metadata = coordinate_download(aoi_polygon, level, attempt_limit)

downloaded_metadata = coordinate_download(aoi_polygon, level, kwargs)
if downloaded_metadata.empty or downloaded_metadata.isna().all().all():
raise ValueError("No Mapillary Features in the AoI.")

downloaded_metadata = downloaded_metadata[
downloaded_metadata["geometry"].apply(lambda geom: isinstance(geom, Point))
]

downloaded_metadata = filter_results(
downloaded_metadata,
creator_id,
is_pano,
organization_id,
start_time,
end_time,
)

if (
downloaded_metadata is None
or downloaded_metadata.empty
or downloaded_metadata.isna().all().all()
):
raise ValueError("No Mapillary Features in the AoI match the filter criteria.")

downloaded_metadata = spatial_sampling(downloaded_metadata, sampling_threshold)
raise ValueError(
"No Mapillary Features in the AoI or no Features match the filter criteria."
)
downloaded_metadata = downloaded_metadata.drop_duplicates(subset=["geometry"])
if sampling_threshold is not None:
downloaded_metadata = spatial_sampling(downloaded_metadata, sampling_threshold)

if randomize_order is True:
downloaded_metadata = downloaded_metadata.sample(frac=1).reset_index(drop=True)