Skip to content

Comments

feat: Handle memory errors in batch_process_dataset#1602

Open
jcpitre wants to merge 11 commits intomainfrom
1538-memory-limit-exceeded-in-batch-process-dataset-prod-function
Open

feat: Handle memory errors in batch_process_dataset#1602
jcpitre wants to merge 11 commits intomainfrom
1538-memory-limit-exceeded-in-batch-process-dataset-prod-function

Conversation

@jcpitre
Copy link
Collaborator

@jcpitre jcpitre commented Feb 16, 2026

Summary:

closes #1538
Added some memory management functionalities to limit the available memory of the process. The result is that any out-of-memory error will happen earlier, with some memory left (a security margin currently set at 200MB by default) so the exception can be handled properly and the http call to the function can return a 200 code. This (in theory) prevents automatic retries.
Also log messages will still be printed since there is memory left, which allows printing the stable_id that caused the error.

Also modified the way gtfs datasets are unzipped. Each .txt file within the dataset is unzipped separately on the in-memory file system, then uploaded to GCP storage then immediately deleted locally. This reduces the number of out of disk space errors and (apparently) does not make the process significantly slower.

From Copilot:

This pull request introduces significant improvements to memory management and disk usage in the dataset batch processing function. The main change is the introduction of a memory limiting utility and a new approach for extracting and uploading files from ZIP archives, which minimizes local disk usage and helps prevent out-of-memory errors. Several method names and flows have been updated to reflect these improvements. Additionally, error handling and environment variable parsing have been enhanced for robustness.

Memory Management Enhancements:

  • Added shared/common/gcp_memory_utils.py with functions to calculate available process memory and set memory limits using cgroups and tmpfs information, and integrated limit_gcp_memory() at startup to restrict process memory usage. [1] [2]

Efficient ZIP Extraction and Upload:

  • Replaced bulk extraction of ZIP files with extract_and_upload_files_from_zip, which extracts and uploads files one at a time, immediately deleting temporary files to minimize disk usage. This change is reflected in both dataset upload and bucket processing flows, and the old unzip_files method was removed. [1] [2] [3] [4]

Robustness and Error Handling:

  • Improved error handling in dataset processing, including logging exception types and messages, and more robust parsing of environment variables (e.g., MAXIMUM_EXECUTIONS). [1] [2]

Database Integration Updates:

  • Updated database imports and configuration to include Gtfsdataset and its relationship with gtfsfiles. [1] [2]

These changes collectively make the batch processing function more reliable, efficient, and scalable, especially in environments with constrained memory and disk resources.
Expected behavior:

Testing tips:

For the memory limitation change, increased the in-memory disk space to 7 GB (out of 8 GB for the whole process). This left 1GB of memory for running the code, of which 200MB were kept as a security margin. Testing with mdb-2014, we now get these errors:

image Where we see that there was a memory error, but it was caught the logged with the stable_id.

For the separate zip upload improvement, used mdb-2014. WIth the original code and 6 GB of in-memory disk space, it would originally have an out of disk space exception. With the changes the files were extracted properly.

Please make sure these boxes are checked before submitting your pull request - thanks!

  • Run the unit tests with ./scripts/api-tests.sh to make sure you didn't break anything
  • Add or update any needed documentation to the repo
  • Format the title like "feat: [new feature short description]". Title must follow the Conventional Commit Specification(https://www.conventionalcommits.org/en/v1.0.0/).
  • Linked all relevant issues
  • Include screenshot(s) showing how this pull request works and fixes the issue(s)

@jcpitre jcpitre linked an issue Feb 16, 2026 that may be closed by this pull request
@davidgamez davidgamez requested a review from cka-y February 17, 2026 15:02
with open("/proc/mounts", "r") as f:
for line in f:
parts = line.split()
if len(parts) >= 3 and parts[2] == "tmpfs" and "in-memory" in parts[1]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this very restictive the have mounts with "in-memory" in the name. The main function used

working_dir = os.getenv("WORKING_DIR", "/tmp/in-memory")

If the WORKING_DIR doesn't contain "in-memory" this function will not return the desired result. To make this process more resilient, we can explore using shutil.disk_usage as the caller, or one of the indirect callers should know the path(s) of the mounted volume.

self.dataset_stable_id,
extracted_files_path,
public=public,
skip_dataset_upload=True, # Skip the upload of the dataset file
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to remove the functionality of skipping the dataset upload? This is used in the rebuild_missing_dataset_files task.

Comment on lines +214 to +250
with zf.open(member, "r") as src, open(
temp_extracted_path, "wb"
) as dst:
shutil.copyfileobj(src, dst)

# Upload this single file to GCS under extracted/
if os.path.isfile(temp_extracted_path):
target_path = f"{self.feed_stable_id}/{dataset_stable_id}/extracted/{member.filename}"
file_blob = bucket.blob(target_path)
file_blob.upload_from_filename(temp_extracted_path)
if public:
file_blob.make_public()
self.logger.info(
"Uploaded extracted file %s to %s",
member.filename,
file_blob.public_url,
)

extracted_files.append(
Gtfsfile(
id=str(uuid.uuid4()),
file_name=member.filename,
file_size_bytes=os.path.getsize(temp_extracted_path),
hosted_url=file_blob.public_url if public else None,
hash=get_hash_from_file(temp_extracted_path),
)
)

# Remove the local temporary extracted file to free disk space
try:
if os.path.exists(temp_extracted_path):
os.remove(temp_extracted_path)
except Exception as cleanup_err:
self.logger.warning(
"Failed to remove temporary file %s: %s",
temp_extracted_path,
cleanup_err,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[suggestion]: This can be placed in its own function.

Comment on lines +567 to +570
try:
maximum_executions = int(os.getenv("MAXIMUM_EXECUTIONS", "1"))
except (ValueError, TypeError):
maximum_executions = 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines +650 to +681
def simulate(request) -> dict: # pragma: no cover
"""HTTP endpoint to simulate a process_dataset call for testing."""
# Hardcoded test values
payload = {
"execution_id": "task-executor-uuid-af993d49-0d95-42cb-96a4-9cffc5301e87",
"producer_url": "https://data.bus-data.dft.gov.uk/timetable/download/gtfs-file/all/",
"feed_stable_id": "mdb-2014",
"feed_id": "34434a73-0ba7-4070-b01f-dfadb6e30d42",
"dataset_stable_id": "mdb-2014-202408202259",
"dataset_hash": "abc",
"authentication_type": "0",
"authentication_info_url": "",
"api_key_parameter_name": "",
}

# Create CloudEvent
encoded_data = base64.b64encode(json.dumps(payload).encode()).decode()
attributes = {
"type": "google.cloud.pubsub.topic.v1.messagePublished",
"source": "//pubsub.googleapis.com/test",
"specversion": "1.0",
}
data = {"message": {"data": encoded_data}}
cloud_event = CloudEvent(attributes, data)

# Call process_dataset
process_dataset(cloud_event)
return {"status": "completed"}


def main(): # pragma: no cover
simulate(None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be part of the deployed code. We have many ways to send a pub sun message. If we need a better way of testing, we can create a script that sends this kind of message.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Memory limit exceeded in batch-process-dataset-prod function

2 participants