feat: Handle memory errors in batch_process_dataset #1602
Conversation
```python
# Look for a tmpfs mount whose mount point contains "in-memory"
with open("/proc/mounts", "r") as f:
    for line in f:
        parts = line.split()
        if len(parts) >= 3 and parts[2] == "tmpfs" and "in-memory" in parts[1]:
```
I find it very restrictive to require mounts with "in-memory" in the name. The main function uses
working_dir = os.getenv("WORKING_DIR", "/tmp/in-memory")
If the WORKING_DIR doesn't contain "in-memory", this function will not return the desired result. To make this process more resilient, we could explore using shutil.disk_usage; the caller, or one of the indirect callers, should know the path(s) of the mounted volume.
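A minimal sketch of that suggestion, assuming the caller already knows the working directory (the helper name get_working_dir_usage is hypothetical, not part of this PR):

```python
import os
import shutil


def get_working_dir_usage(path=None):
    """Return (total, used, free) bytes for the configured working directory."""
    working_dir = path or os.getenv("WORKING_DIR", "/tmp/in-memory")
    # shutil.disk_usage works for any path, regardless of the mount's name
    return shutil.disk_usage(working_dir)
```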
```python
self.dataset_stable_id,
extracted_files_path,
public=public,
skip_dataset_upload=True,  # Skip the upload of the dataset file
```
Why do we need to remove the functionality of skipping the dataset upload? This is used in the rebuild_missing_dataset_files task.
```python
with zf.open(member, "r") as src, open(
    temp_extracted_path, "wb"
) as dst:
    shutil.copyfileobj(src, dst)

# Upload this single file to GCS under extracted/
if os.path.isfile(temp_extracted_path):
    target_path = f"{self.feed_stable_id}/{dataset_stable_id}/extracted/{member.filename}"
    file_blob = bucket.blob(target_path)
    file_blob.upload_from_filename(temp_extracted_path)
    if public:
        file_blob.make_public()
    self.logger.info(
        "Uploaded extracted file %s to %s",
        member.filename,
        file_blob.public_url,
    )

    extracted_files.append(
        Gtfsfile(
            id=str(uuid.uuid4()),
            file_name=member.filename,
            file_size_bytes=os.path.getsize(temp_extracted_path),
            hosted_url=file_blob.public_url if public else None,
            hash=get_hash_from_file(temp_extracted_path),
        )
    )

# Remove the local temporary extracted file to free disk space
try:
    if os.path.exists(temp_extracted_path):
        os.remove(temp_extracted_path)
except Exception as cleanup_err:
    self.logger.warning(
        "Failed to remove temporary file %s: %s",
        temp_extracted_path,
        cleanup_err,
    )
```
[suggestion]: This can be placed in its own function.
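For example (a sketch only; the method name _remove_temp_file is a suggestion, not code from this PR):

```python
def _remove_temp_file(self, temp_extracted_path: str) -> None:
    """Remove a temporary extracted file, logging (but not raising) on failure."""
    try:
        if os.path.exists(temp_extracted_path):
            os.remove(temp_extracted_path)
    except Exception as cleanup_err:
        self.logger.warning(
            "Failed to remove temporary file %s: %s",
            temp_extracted_path,
            cleanup_err,
        )
```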
```python
# Parse MAXIMUM_EXECUTIONS, falling back to 1 if unset or invalid
try:
    maximum_executions = int(os.getenv("MAXIMUM_EXECUTIONS", "1"))
except (ValueError, TypeError):
    maximum_executions = 1
```
```python
def simulate(request) -> dict:  # pragma: no cover
    """HTTP endpoint to simulate a process_dataset call for testing."""
    # Hardcoded test values
    payload = {
        "execution_id": "task-executor-uuid-af993d49-0d95-42cb-96a4-9cffc5301e87",
        "producer_url": "https://data.bus-data.dft.gov.uk/timetable/download/gtfs-file/all/",
        "feed_stable_id": "mdb-2014",
        "feed_id": "34434a73-0ba7-4070-b01f-dfadb6e30d42",
        "dataset_stable_id": "mdb-2014-202408202259",
        "dataset_hash": "abc",
        "authentication_type": "0",
        "authentication_info_url": "",
        "api_key_parameter_name": "",
    }

    # Create CloudEvent
    encoded_data = base64.b64encode(json.dumps(payload).encode()).decode()
    attributes = {
        "type": "google.cloud.pubsub.topic.v1.messagePublished",
        "source": "//pubsub.googleapis.com/test",
        "specversion": "1.0",
    }
    data = {"message": {"data": encoded_data}}
    cloud_event = CloudEvent(attributes, data)

    # Call process_dataset
    process_dataset(cloud_event)
    return {"status": "completed"}


def main():  # pragma: no cover
    simulate(None)
```
This should not be part of the deployed code. We have many ways to send a Pub/Sub message. If we need a better way of testing, we can create a script that sends this kind of message.
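A rough sketch of such a script, assuming the google-cloud-pubsub client (the project and topic names are placeholders):

```python
import json

from google.cloud import pubsub_v1


def publish_test_message(project_id: str, topic_id: str) -> None:
    """Publish the same test payload that simulate() hardcodes, via Pub/Sub."""
    payload = {
        "feed_stable_id": "mdb-2014",
        "dataset_stable_id": "mdb-2014-202408202259",
        # ...remaining fields as in the simulate() payload above
    }
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    future = publisher.publish(topic_path, data=json.dumps(payload).encode("utf-8"))
    print(f"Published message {future.result()}")


if __name__ == "__main__":
    publish_test_message("my-gcp-project", "dataset-batch-topic")
```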
Summary:
closes #1538
Added memory-management functionality to limit the available memory of the process. As a result, any out-of-memory error happens earlier, while some memory is still left (a security margin, currently 200 MB by default), so the exception can be handled properly and the HTTP call to the function can return a 200 code. This (in theory) prevents automatic retries.
Log messages can also still be printed, since there is memory left, which allows logging the stable_id that caused the error.
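A minimal sketch of the idea, assuming a cgroup v2 environment (the actual implementation lives in shared/common/gcp_memory_utils.py and may differ; limit_process_memory and SAFETY_MARGIN_BYTES are illustrative names):

```python
import resource

SAFETY_MARGIN_BYTES = 200 * 1024 * 1024  # 200 MB security margin


def limit_process_memory(tmpfs_size_bytes: int) -> None:
    """Cap the process address space below the container memory limit."""
    with open("/sys/fs/cgroup/memory.max") as f:
        raw = f.read().strip()
    if raw == "max":  # no cgroup limit configured
        return
    # Usable memory: the container limit minus the in-memory filesystem
    # (which counts against the cgroup) and the safety margin.
    available = int(raw) - tmpfs_size_bytes - SAFETY_MARGIN_BYTES
    resource.setrlimit(resource.RLIMIT_AS, (available, available))
```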
Also modified the way GTFS datasets are unzipped. Each .txt file within the dataset is unzipped separately to the in-memory file system, uploaded to GCP storage, and then immediately deleted locally. This reduces the number of out-of-disk-space errors and (apparently) does not make the process significantly slower.
From Copilot:
This pull request introduces significant improvements to memory management and disk usage in the dataset batch processing function. The main change is the introduction of a memory limiting utility and a new approach for extracting and uploading files from ZIP archives, which minimizes local disk usage and helps prevent out-of-memory errors. Several method names and flows have been updated to reflect these improvements. Additionally, error handling and environment variable parsing have been enhanced for robustness.
Memory Management Enhancements:
Added shared/common/gcp_memory_utils.py with functions to calculate available process memory and set memory limits using cgroups and tmpfs information, and integrated limit_gcp_memory() at startup to restrict process memory usage. [1] [2]
Efficient ZIP Extraction and Upload:
Added extract_and_upload_files_from_zip, which extracts and uploads files one at a time, immediately deleting temporary files to minimize disk usage. This change is reflected in both the dataset upload and bucket processing flows, and the old unzip_files method was removed. [1] [2] [3] [4]
Robustness and Error Handling:
Hardened environment variable parsing (e.g. MAXIMUM_EXECUTIONS), falling back to safe defaults on invalid values. [1] [2]
Database Integration Updates:
Updated Gtfsdataset and its relationship with gtfsfiles (see the sketch below). [1] [2]
These changes collectively make the batch processing function more reliable, efficient, and scalable, especially in environments with constrained memory and disk resources.
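A hypothetical sketch of that relationship (the actual models live in the shared database layer; everything beyond the Gtfsfile fields visible in the diff above is an assumption):

```python
from sqlalchemy import BigInteger, Column, ForeignKey, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class Gtfsfile(Base):
    __tablename__ = "gtfsfile"
    id = Column(String, primary_key=True)
    file_name = Column(String)
    file_size_bytes = Column(BigInteger)
    hosted_url = Column(String, nullable=True)
    hash = Column(String)
    gtfs_dataset_id = Column(String, ForeignKey("gtfsdataset.id"))  # assumed FK


class Gtfsdataset(Base):
    __tablename__ = "gtfsdataset"
    id = Column(String, primary_key=True)
    gtfsfiles = relationship("Gtfsfile", backref="gtfs_dataset")
```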
Expected behavior:
Testing tips:
For the memory limitation change, I increased the in-memory disk space to 7 GB (out of 8 GB for the whole process). This left 1 GB of memory for running the code, of which 200 MB were kept as a security margin. Testing with mdb-2014, we now get these errors:
For the separate zip upload improvement, I used mdb-2014. With the original code and 6 GB of in-memory disk space, it would hit an out-of-disk-space exception. With the changes, the files were extracted properly.
Please make sure these boxes are checked before submitting your pull request - thanks!
Run ./scripts/api-tests.sh to make sure you didn't break anything