11package land .oras ;
22
3- import java .io .BufferedInputStream ;
4- import java .io .InputStream ;
3+ import java .io .*;
54import java .net .URI ;
6- import java .io .IOException ;
75import java .nio .file .Files ;
86import java .nio .file .Path ;
97import java .nio .file .StandardCopyOption ;
8+ import java .security .DigestInputStream ;
9+ import java .security .MessageDigest ;
10+ import java .security .NoSuchAlgorithmException ;
1011import java .util .ArrayList ;
1112import java .util .HashMap ;
1213import java .util .List ;
@@ -639,7 +640,6 @@ public Registry build() {
639640 return registry .build ();
640641 }
641642 }
642-
643643
644644 /**
645645 * Push a blob using input stream so to abpid loading the whole blob in memory
@@ -648,90 +648,103 @@ public Registry build() {
648648 * @param size the size of the blob
649649 */
650650 public Layer pushBlobStream (ContainerRef containerRef , InputStream input , long size ) {
651+ Path tempFile = null ;
651652 try {
652- // Wrap the input stream in a BufferedInputStream to support mark/reset
653- BufferedInputStream bufferedInputStream = new BufferedInputStream (input );
654-
655- // Calculate the digest directly from the buffered stream
656- String digest = DigestUtils .sha256 (bufferedInputStream );
657-
658- // Log the calculated digest to verify it
659- System .out .println ("Calculated Digest: " + digest );
653+ // Create a temporary file to store the stream content
654+ tempFile = Files .createTempFile ("oras-upload-" , ".tmp" );
655+
656+ // Copy input stream to temp file while calculating digest
657+ String digest ;
658+ try (InputStream bufferedInput = new BufferedInputStream (input );
659+ DigestInputStream digestInput = new DigestInputStream (bufferedInput , MessageDigest .getInstance ("SHA-256" ));
660+ OutputStream fileOutput = Files .newOutputStream (tempFile )) {
661+
662+ digestInput .transferTo (fileOutput );
663+ byte [] digestBytes = digestInput .getMessageDigest ().digest ();
664+ digest = "sha256:" + bytesToHex (digestBytes );
665+ }
660666
661667 // Check if the blob already exists
662668 if (hasBlob (containerRef .withDigest (digest ))) {
663669 LOG .info ("Blob already exists: {}" , digest );
664670 return Layer .fromDigest (digest , size );
665671 }
666672
667- // Construct the URI for uploading the blob
668- URI uri = URI .create ("%s://%s" .formatted (getScheme (),
669- containerRef .withDigest (digest ).getBlobsUploadDigestPath ()));
670- System .out .println ("Uploading blob to: " + uri );
671- System .out .println ("Uploading file size: " + size );
673+ // Construct the URI for initiating the upload
674+ URI baseUri = URI .create ("%s://%s" .formatted (getScheme (), containerRef .getBlobsUploadPath ()));
675+ System .out .println ("Initiating blob upload at: " + baseUri );
672676
673- // Mark the stream position before uploading
674- bufferedInputStream . mark ( Integer . MAX_VALUE ); // Mark the stream for resetting
677+ // Create an empty input stream for the initial POST request
678+ InputStream emptyStream = new ByteArrayInputStream ( new byte [ 0 ]);
675679
676- // Upload the stream (resetting it for the upload process)
677- try (BufferedInputStream uploadStream = bufferedInputStream ) {
678- OrasHttpClient .ResponseWrapper <String > response = client .uploadStream (
679- "POST" , uri , uploadStream , size ,
680- Map .of (Const .CONTENT_TYPE_HEADER , Const .APPLICATION_OCTET_STREAM_HEADER_VALUE ));
681-
682- System .out .println ("Upload response code: " + response .statusCode ());
683- System .out .println ("Upload response headers: " + response .headers ());
684-
685- // Check for upload errors based on response
686- if (response .statusCode () != 202 ) {
687- throw new OrasException ("Unexpected response code during upload: " + response .statusCode ());
688- }
680+ // Start with a POST request to initiate the upload
681+ OrasHttpClient .ResponseWrapper <String > initiateResponse = client .uploadStream (
682+ "POST" , baseUri , emptyStream , 0 ,
683+ Map .of (Const .CONTENT_TYPE_HEADER , Const .APPLICATION_OCTET_STREAM_HEADER_VALUE ));
689684
690- // Extract the location URL from the response headers
691- String locationUrl = response . headers (). get ( "location" );
692- System . out . println ( "Location URL: " + locationUrl );
685+ if ( initiateResponse . statusCode () != 202 ) {
686+ throw new OrasException ( "Failed to initiate blob upload: " + initiateResponse . statusCode () );
687+ }
693688
694- if (locationUrl != null && !locationUrl .isEmpty ()) {
695- // Extract the digest from the location URL (before ?)
696- String locationDigest = locationUrl .split ("\\ ?" )[0 ].split ("/" )[5 ];
697- System .out .println ("Digest from location URL: " + locationDigest );
689+ // Get the location URL for the actual upload
690+ String locationUrl = initiateResponse .headers ().get ("location" );
691+ if (locationUrl == null || locationUrl .isEmpty ()) {
692+ throw new OrasException ("No location URL provided for blob upload" );
693+ }
698694
699- // Check if the digest matches the one in the location URL
700- if (!locationDigest . equals ( digest )) {
701- throw new OrasException ( "Digest mismatch in location URL: expected " + digest + ", but found " + locationDigest );
702- }
695+ // Ensure the location URL is absolute
696+ if (!locationUrl . startsWith ( "http" )) {
697+ locationUrl = "%s://%s%s" . formatted ( getScheme (), containerRef . getRegistry (), locationUrl );
698+ }
703699
704- // Finalize the upload with a PUT request to the location URL
705- URI finalizeUri = URI .create (locationUrl );
706- try (BufferedInputStream uploadFinalizeStream = bufferedInputStream ) {
707- OrasHttpClient .ResponseWrapper <String > finalizeResponse = client .uploadStream (
708- "PUT" , finalizeUri , uploadFinalizeStream , size ,
709- Map .of (Const .CONTENT_TYPE_HEADER , Const .APPLICATION_OCTET_STREAM_HEADER_VALUE ));
700+ // Construct the final upload URI with the digest parameter
701+ String separator = locationUrl .contains ("?" ) ? "&" : "?" ;
702+ URI finalizeUri = URI .create (locationUrl + separator + "digest=" + digest );
710703
711- System .out .println ("Finalize upload response code: " + finalizeResponse .statusCode ());
712- System .out .println ("Finalize upload response body: " + finalizeResponse .response ());
704+ // Upload the content from the temporary file
705+ try (InputStream uploadStream = Files .newInputStream (tempFile )) {
706+ OrasHttpClient .ResponseWrapper <String > uploadResponse = client .uploadStream (
707+ "PUT" , finalizeUri , uploadStream , size ,
708+ Map .of (Const .CONTENT_TYPE_HEADER , Const .APPLICATION_OCTET_STREAM_HEADER_VALUE ));
713709
714- // Handle the finalization response
715- logResponse (finalizeResponse );
716- handleError (finalizeResponse );
717- }
710+ if (uploadResponse .statusCode () != 201 && uploadResponse .statusCode () != 202 ) {
711+ throw new OrasException ("Failed to upload blob: " + uploadResponse .statusCode () +
712+ " - Response: " + uploadResponse .response ());
718713 }
719714
720715 return Layer .fromDigest (digest , size );
721716 }
722- } catch (IOException e ) {
723- System .err .println ("IOException occurred during blob upload: " + e .getMessage ());
717+ } catch (IOException | NoSuchAlgorithmException e ) {
718+ System .err .println ("Error during blob upload: " + e .getMessage ());
719+ e .printStackTrace ();
724720 throw new OrasException ("Failed to push blob stream" , e );
721+ } finally {
722+ // Clean up the temporary file
723+ if (tempFile != null ) {
724+ try {
725+ Files .deleteIfExists (tempFile );
726+ } catch (IOException e ) {
727+ LOG .warn ("Failed to delete temporary file: {}" , tempFile , e );
728+ }
729+ }
725730 }
726731 }
727732
728-
729-
730-
731-
732-
733-
734-
733+ /**
734+ * Bites to hex string
735+ * @param bytes of bytes[]
736+ */
737+ private static String bytesToHex (byte [] bytes ) {
738+ StringBuilder hexString = new StringBuilder ();
739+ for (byte b : bytes ) {
740+ String hex = Integer .toHexString (0xff & b );
741+ if (hex .length () == 1 ) {
742+ hexString .append ('0' );
743+ }
744+ hexString .append (hex );
745+ }
746+ return hexString .toString ();
747+ }
735748
736749 /**
737750 * Get blob as stream to avoid loading into memory
0 commit comments