From 6f627c7e5af001de17ed0907c67be4597c69b61f Mon Sep 17 00:00:00 2001
From: Dimitri Yatsenko
Date: Tue, 17 Feb 2026 12:40:01 -0600
Subject: [PATCH] fix: Skip redundant S3 upload when file already exists (#1397)

After a transaction rollback, S3 files survive but DB tracking entries
are lost. On retry, upload_filepath would re-upload the entire file
(potentially multi-GB) because it only checked the DB.

Now checks S3 via a single stat_object call before uploading. If the
object exists with matching size and contents_hash metadata, the upload
is skipped. The DB tracking entry is always (re-)inserted regardless.

Also adds s3.Folder.stat() method and refactors exists() to use it,
avoiding redundant stat_object calls.

Co-Authored-By: Claude Opus 4.6
---
 datajoint/external.py | 35 ++++++++++++++++++++++++++++++-----
 datajoint/s3.py       | 17 ++++++++++-------
 2 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/datajoint/external.py b/datajoint/external.py
index ef4ce0be4..0f210556d 100644
--- a/datajoint/external.py
+++ b/datajoint/external.py
@@ -300,11 +300,36 @@ def upload_filepath(self, local_filepath):
                 )
         else:
             # upload the file and create its tracking entry
-            self._upload_file(
-                local_filepath,
-                self._make_external_filepath(relative_filepath),
-                metadata={"contents_hash": str(contents_hash) if contents_hash else ""},
-            )
+            external_path = self._make_external_filepath(relative_filepath)
+            already_uploaded = False
+            if self.spec["protocol"] == "s3":
+                stat = self.s3.stat(str(external_path))
+                if stat is not None and stat.size == file_size:
+                    # Verify contents_hash from S3 metadata when available
+                    if skip_checksum:
+                        already_uploaded = True
+                    else:
+                        remote_meta = {
+                            k.lower().lstrip("x-amz-meta-"): v
+                            for k, v in (stat.metadata or {}).items()
+                        }
+                        remote_hash = remote_meta.get("contents_hash", "")
+                        if remote_hash == str(contents_hash):
+                            already_uploaded = True
+            if already_uploaded:
+                logger.info(
+                    f"File already exists on S3 with matching size"
+                    f"{'' if skip_checksum else ' and checksum'}"
+                    f", skipping upload: '{relative_filepath}'"
+                )
+            if not already_uploaded:
+                self._upload_file(
+                    local_filepath,
+                    external_path,
+                    metadata={
+                        "contents_hash": str(contents_hash) if contents_hash else ""
+                    },
+                )
             self.connection.query(
                 "INSERT INTO {tab} (hash, size, filepath, contents_hash) VALUES (%s, {size}, '{filepath}', %s)".format(
                     tab=self.full_table_name,
diff --git a/datajoint/s3.py b/datajoint/s3.py
index 98dc75708..9f58f52f0 100644
--- a/datajoint/s3.py
+++ b/datajoint/s3.py
@@ -95,16 +95,19 @@ def fget(self, name, local_filepath):
         if "contents_hash" in meta:
             return uuid.UUID(meta["contents_hash"])
 
-    def exists(self, name):
-        logger.debug("exists: {}:{}".format(self.bucket, name))
+    def stat(self, name):
+        """Return stat result for an object, or None if it does not exist."""
+        logger.debug("stat: {}:{}".format(self.bucket, name))
         try:
-            self.client.stat_object(self.bucket, str(name))
+            return self.client.stat_object(self.bucket, str(name))
         except minio.error.S3Error as e:
             if e.code == "NoSuchKey":
-                return False
-            else:
-                raise e
-        return True
+                return None
+            raise e
+
+    def exists(self, name):
+        logger.debug("exists: {}:{}".format(self.bucket, name))
+        return self.stat(name) is not None
 
     def get_size(self, name):
         logger.debug("get_size: {}:{}".format(self.bucket, name))