@@ -300,11 +300,36 @@ def upload_filepath(self, local_filepath):
300300 )
301301 else :
302302 # upload the file and create its tracking entry
303- self ._upload_file (
304- local_filepath ,
305- self ._make_external_filepath (relative_filepath ),
306- metadata = {"contents_hash" : str (contents_hash ) if contents_hash else "" },
307- )
303+ external_path = self ._make_external_filepath (relative_filepath )
304+ already_uploaded = False
305+ if self .spec ["protocol" ] == "s3" :
306+ stat = self .s3 .stat (str (external_path ))
307+ if stat is not None and stat .size == file_size :
308+ # Verify contents_hash from S3 metadata when available
309+ if skip_checksum :
310+ already_uploaded = True
311+ else :
312+ remote_meta = {
313+ k .lower ().lstrip ("x-amz-meta-" ): v
314+ for k , v in (stat .metadata or {}).items ()
315+ }
316+ remote_hash = remote_meta .get ("contents_hash" , "" )
317+ if remote_hash == str (contents_hash ):
318+ already_uploaded = True
319+ if already_uploaded :
320+ logger .info (
321+ f"File already exists on S3 with matching size"
322+ f"{ '' if skip_checksum else ' and checksum' } "
323+ f", skipping upload: '{ relative_filepath } '"
324+ )
325+ if not already_uploaded :
326+ self ._upload_file (
327+ local_filepath ,
328+ external_path ,
329+ metadata = {
330+ "contents_hash" : str (contents_hash ) if contents_hash else ""
331+ },
332+ )
308333 self .connection .query (
309334 "INSERT INTO {tab} (hash, size, filepath, contents_hash) VALUES (%s, {size}, '{filepath}', %s)" .format (
310335 tab = self .full_table_name ,
0 commit comments