diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java b/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java
index dc80e930..98e001d3 100644
--- a/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java
+++ b/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java
@@ -4,8 +4,6 @@ import static com.sap.cds.sdm.constants.SDMConstants.TECHNICAL_USER_FLOW;
 
 import com.sap.cds.feature.attachments.service.model.servicehandler.AttachmentCreateEventContext;
-import com.sap.cds.reflect.CdsEntity;
-import com.sap.cds.reflect.CdsModel;
 import com.sap.cds.sdm.constants.SDMConstants;
 import com.sap.cds.sdm.constants.SDMErrorMessages;
 import com.sap.cds.sdm.handler.TokenHandler;
@@ -67,9 +65,6 @@ public JSONObject createDocument(
     }
     long totalSize = cmisDocument.getContentLength();
     int chunkSize = SDMConstants.CHUNK_SIZE;
-    CdsModel model = eventContext.getModel();
-    Optional<CdsEntity> attachmentDraftEntity =
-        model.findEntity(eventContext.getAttachmentEntity() + "_drafts");
     cmisDocument.setUploadStatus(SDMConstants.UPLOAD_STATUS_IN_PROGRESS);
 
     if (totalSize <= 400 * 1024 * 1024) {
@@ -278,15 +273,27 @@ private JSONObject uploadLargeFileInChunks(
 
         // Step 7: Append Chunk. Call cmis api to append content stream
         if (bytesRead > 0) {
-          responseBody =
-              appendContentStream(
-                  cmisDocument,
-                  sdmUrl,
-                  chunkBuffer,
-                  bytesRead,
-                  isLastChunk,
-                  chunkIndex,
-                  isSystemUser);
+          // Only capture response from the last chunk to avoid unnecessary object allocation
+          if (isLastChunk) {
+            responseBody =
+                appendContentStream(
+                    cmisDocument,
+                    sdmUrl,
+                    chunkBuffer,
+                    bytesRead,
+                    isLastChunk,
+                    chunkIndex,
+                    isSystemUser);
+          } else {
+            appendContentStream(
+                cmisDocument,
+                sdmUrl,
+                chunkBuffer,
+                bytesRead,
+                isLastChunk,
+                chunkIndex,
+                isSystemUser);
+          }
         }
 
         long endChunkUploadTime = System.currentTimeMillis();
diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java b/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java
index 08a273cf..67f5e1b8 100644
--- a/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java
+++ b/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java
@@ -2,9 +2,7 @@
 
 import com.sap.cds.sdm.constants.SDMConstants;
 import com.sap.cds.sdm.service.exceptions.InsufficientDataException;
-import io.reactivex.Flowable;
 import java.io.*;
-import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -25,7 +23,8 @@ public class ReadAheadInputStream extends InputStream {
   private final ExecutorService executor =
       Executors.newFixedThreadPool(2); // Thread pool to Read next chunk
   private final BlockingQueue<byte[]> chunkQueue =
-      new LinkedBlockingQueue<>(50); // Next chunk is read to a queue
+      new LinkedBlockingQueue<>(
+          4); // Reduced from 50 to 4 (80MB) - balances read-ahead performance with heap constraints
 
   public ReadAheadInputStream(InputStream inputStream, long totalSize) throws IOException {
     if (inputStream == null) {
@@ -91,42 +90,50 @@ private void preloadChunks() {
 
   private void readChunk(AtomicReference<byte[]> bufferRef, AtomicLong bytesReadAtomic)
       throws IOException {
+    int maxRetries = 5;
+    int retryCount = 0;
+
     while (bytesReadAtomic.get() < CHUNK_SIZE) {
       try {
-        List<Integer> results =
-            Flowable.fromCallable(
-                    () -> {
-                      byte[] buffer = bufferRef.get();
-                      // Read from stream and update bytesReadAtomic
-                      int result =
-                          originalStream.read(
-                              buffer,
-                              (int) bytesReadAtomic.get(),
-                              CHUNK_SIZE - (int) bytesReadAtomic.get());
-                      if (result > 0) {
-                        bytesReadAtomic.addAndGet(result);
-                      } else if (result == 0) {
-                        throw new InsufficientDataException("Read returned 0 bytes");
-                      }
-                      return result;
-                    })
-                .retryWhen(RetryUtils.retryLogic(5)) // Apply retry logic with 5 attempts
-                .toList()
-                .blockingGet();
-
-        if (results == null || results.isEmpty())
-          throw new IOException("Failed to read chunk: results is null or empty");
-        // Check if the read was successful
-
-        int readAttempt = results.get(0);
-
-        if (readAttempt == -1) {
+        byte[] buffer = bufferRef.get();
+        int result =
+            originalStream.read(
+                buffer, (int) bytesReadAtomic.get(), CHUNK_SIZE - (int) bytesReadAtomic.get());
+
+        if (result > 0) {
+          bytesReadAtomic.addAndGet(result);
+          retryCount = 0; // Reset retry count on successful read
+        } else if (result == -1) {
           logger.info("EOF reached while reading the stream.");
           break;
+        } else if (result == 0) {
+          // Treat 0 bytes read as InsufficientDataException (matches original behavior)
+          throw new InsufficientDataException("Read returned 0 bytes");
+        }
+      } catch (EOFException | InsufficientDataException e) {
+        // These exceptions should be retried (matching RetryUtils.shouldRetry())
+        retryCount++;
+        if (retryCount >= maxRetries) {
+          logger.error("Failed to read chunk after {} retries: {}", maxRetries, e.getMessage(), e);
+          throw new IOException("Failed to read chunk after retries", e);
+        }
+        long delaySeconds =
+            (long) Math.pow(2, retryCount); // Exponential backoff: 2, 4, 8, 16, 32 seconds
+        logger.info(
+            "Retry attempt {} failed. Retrying in {} seconds. Error: {}",
+            retryCount,
+            delaySeconds,
+            e.getMessage());
+        try {
+          Thread.sleep(delaySeconds * 1000); // Convert to milliseconds
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted during retry backoff", ie);
         }
-      } catch (Exception e) {
-        logger.error("Failed to read chunk after retries: {}", e.getMessage(), e);
-        throw new IOException("Failed to read chunk", e);
-      }
+      } catch (IOException e) {
+        // Other IOExceptions should fail immediately (not retried in original)
+        logger.error("Non-retryable IOException: {}", e.getMessage(), e);
+        throw e;
+      }
     }
   }
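
Note (illustration only, not part of the patch): the `readChunk` change above replaces an RxJava `retryWhen` pipeline with a hand-rolled retry loop. The sketch below isolates that retry-with-exponential-backoff pattern in a self-contained, runnable form. The class and method names (`BackoffReadSketch`, `readWithBackoff`) are hypothetical, and `EOFException` stands in for the project-internal `InsufficientDataException` so the example compiles without project dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical sketch of the inlined retry-with-exponential-backoff pattern. */
public final class BackoffReadSketch {

  /** Fills buffer[offset..offset+length) from in; returns the bytes actually read. */
  static int readWithBackoff(InputStream in, byte[] buffer, int offset, int length)
      throws IOException {
    final int maxRetries = 5;
    int retryCount = 0;
    int totalRead = 0;

    while (totalRead < length) {
      try {
        int n = in.read(buffer, offset + totalRead, length - totalRead);
        if (n > 0) {
          totalRead += n;
          retryCount = 0; // any progress resets the retry budget, as in the patch
        } else if (n == -1) {
          break; // EOF: return whatever was gathered so far
        } else {
          throw new EOFException("read returned 0 bytes"); // treated as transient
        }
      } catch (EOFException e) {
        // Transient failure: back off exponentially, then retry the read.
        if (++retryCount >= maxRetries) {
          throw new IOException("failed to read chunk after " + maxRetries + " retries", e);
        }
        long delayMillis = (long) Math.pow(2, retryCount) * 1000; // 2s, 4s, 8s, 16s
        try {
          Thread.sleep(delayMillis);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // preserve interrupt status
          throw new IOException("interrupted during retry backoff", ie);
        }
      }
    }
    return totalRead;
  }

  public static void main(String[] args) throws IOException {
    byte[] src = "hello, chunked world".getBytes();
    byte[] dst = new byte[src.length];
    int n = readWithBackoff(new ByteArrayInputStream(src), dst, 0, dst.length);
    System.out.println("read " + n + " bytes: " + new String(dst, 0, n));
  }
}
```

The design point mirrored from the patch is that a successful read resets the retry budget, so only consecutive failures can exhaust it, while non-transient `IOException`s propagate immediately.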