@@ -4,8 +4,6 @@
import static com.sap.cds.sdm.constants.SDMConstants.TECHNICAL_USER_FLOW;

import com.sap.cds.feature.attachments.service.model.servicehandler.AttachmentCreateEventContext;
import com.sap.cds.reflect.CdsEntity;
import com.sap.cds.reflect.CdsModel;
import com.sap.cds.sdm.constants.SDMConstants;
import com.sap.cds.sdm.constants.SDMErrorMessages;
import com.sap.cds.sdm.handler.TokenHandler;
@@ -67,9 +65,6 @@ public JSONObject createDocument(
}
long totalSize = cmisDocument.getContentLength();
int chunkSize = SDMConstants.CHUNK_SIZE;
CdsModel model = eventContext.getModel();
Optional<CdsEntity> attachmentDraftEntity =
model.findEntity(eventContext.getAttachmentEntity() + "_drafts");
cmisDocument.setUploadStatus(SDMConstants.UPLOAD_STATUS_IN_PROGRESS);
if (totalSize <= 400 * 1024 * 1024) {

@@ -278,15 +273,27 @@ private JSONObject uploadLargeFileInChunks(

// Step 7: Append Chunk. Call cmis api to append content stream
if (bytesRead > 0) {
responseBody =
appendContentStream(
cmisDocument,
sdmUrl,
chunkBuffer,
bytesRead,
isLastChunk,
chunkIndex,
isSystemUser);
// Only capture response from the last chunk to avoid unnecessary object allocation
if (isLastChunk) {
responseBody =
appendContentStream(
cmisDocument,
sdmUrl,
chunkBuffer,
bytesRead,
isLastChunk,
chunkIndex,
isSystemUser);
} else {
appendContentStream(
cmisDocument,
sdmUrl,
chunkBuffer,
bytesRead,
isLastChunk,
chunkIndex,
isSystemUser);
}
}

long endChunkUploadTime = System.currentTimeMillis();
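
The change above keeps only the final appendContentStream response; intermediate replies are no longer assigned, so each chunk stops allocating a retained response object. A minimal sketch of that pattern, with a hypothetical sendChunk helper standing in for the CMIS append call (not the PR's code):

import org.json.JSONObject;

// Minimal sketch: keep only the final chunk's response.
class ChunkUploadSketch {

  // Hypothetical stand-in for appendContentStream.
  JSONObject sendChunk(byte[] chunk, int index, boolean isLast) {
    return new JSONObject().put("chunkIndex", index).put("isLastChunk", isLast);
  }

  JSONObject upload(byte[][] chunks) {
    JSONObject lastResponse = null;
    for (int i = 0; i < chunks.length; i++) {
      boolean isLast = (i == chunks.length - 1);
      if (isLast) {
        lastResponse = sendChunk(chunks[i], i, true); // only reply that is kept
      } else {
        sendChunk(chunks[i], i, false); // intermediate replies are discarded
      }
    }
    return lastResponse;
  }
}
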
75 changes: 41 additions & 34 deletions sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java
@@ -2,9 +2,7 @@

import com.sap.cds.sdm.constants.SDMConstants;
import com.sap.cds.sdm.service.exceptions.InsufficientDataException;
import io.reactivex.Flowable;
import java.io.*;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -25,7 +23,8 @@ public class ReadAheadInputStream extends InputStream {
private final ExecutorService executor =
Executors.newFixedThreadPool(2); // Thread pool to Read next chunk
private final BlockingQueue<byte[]> chunkQueue =
new LinkedBlockingQueue<>(50); // Next chunk is read to a queue
new LinkedBlockingQueue<>(
4); // Reduced from 50 to 4 (80MB) - balances read-ahead performance with heap constraints

public ReadAheadInputStream(InputStream inputStream, long totalSize) throws IOException {
if (inputStream == null) {
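
Shrinking the queue from 50 to 4 slots bounds how far the read-ahead can run in front of the uploader: once 4 chunks are waiting, the producer blocks instead of growing the heap. A minimal sketch of the idea, assuming roughly 20 MB chunks (implied by the "4 (80MB)" comment, not the actual SDMConstants.CHUNK_SIZE value):

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch (not the PR's code); chunk size is an assumption.
class BoundedReadAheadSketch {
  static final int CHUNK = 20 * 1024 * 1024;
  final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>(4); // at most ~80 MB buffered

  void produce(InputStream in) throws IOException, InterruptedException {
    byte[] buf = new byte[CHUNK];
    int n;
    while ((n = in.read(buf)) != -1) {
      queue.put(Arrays.copyOf(buf, n)); // blocks once 4 chunks are queued, capping heap use
    }
  }

  byte[] consume() throws InterruptedException {
    return queue.take(); // the uploader drains chunks in arrival order
  }
}
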
@@ -91,42 +90,50 @@ private void preloadChunks() {

private void readChunk(AtomicReference<byte[]> bufferRef, AtomicLong bytesReadAtomic)
throws IOException {
int maxRetries = 5;
int retryCount = 0;

while (bytesReadAtomic.get() < CHUNK_SIZE) {
try {
List<Integer> results =
Flowable.fromCallable(
() -> {
byte[] buffer = bufferRef.get();
// Read from stream and update bytesReadAtomic
int result =
originalStream.read(
buffer,
(int) bytesReadAtomic.get(),
CHUNK_SIZE - (int) bytesReadAtomic.get());
if (result > 0) {
bytesReadAtomic.addAndGet(result);
} else if (result == 0) {
throw new InsufficientDataException("Read returned 0 bytes");
}
return result;
})
.retryWhen(RetryUtils.retryLogic(5)) // Apply retry logic with 5 attempts
.toList()
.blockingGet();

if (results == null || results.isEmpty())
throw new IOException("Failed to read chunk: results is null or empty");
// Check if the read was successful

int readAttempt = results.get(0);

if (readAttempt == -1) {
byte[] buffer = bufferRef.get();
int result =
originalStream.read(
buffer, (int) bytesReadAtomic.get(), CHUNK_SIZE - (int) bytesReadAtomic.get());

if (result > 0) {
bytesReadAtomic.addAndGet(result);
retryCount = 0; // Reset retry count on successful read
} else if (result == -1) {
logger.info("EOF reached while reading the stream.");
break;
} else if (result == 0) {
// Treat 0 bytes read as InsufficientDataException (matches original behavior)
throw new InsufficientDataException("Read returned 0 bytes");
}
} catch (EOFException | InsufficientDataException e) {
// These exceptions should be retried (matching RetryUtils.shouldRetry())
retryCount++;
if (retryCount >= maxRetries) {
logger.error("Failed to read chunk after {} retries: {}", maxRetries, e.getMessage(), e);
throw new IOException("Failed to read chunk after retries", e);
}
long delaySeconds =
(long) Math.pow(2, retryCount); // Exponential backoff: 2, 4, 8, 16, 32 seconds
logger.info(
"Retry attempt {} failed. Retrying in {} seconds. Error: {}",
retryCount,
delaySeconds,
e.getMessage());
try {
Thread.sleep(delaySeconds * 1000); // Convert to milliseconds
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted during retry backoff", ie);
}
} catch (Exception e) {
logger.error("Failed to read chunk after retries: {}", e.getMessage(), e);
throw new IOException("Failed to read chunk", e);
} catch (IOException e) {
// Other IOExceptions should fail immediately (not retried in original)
logger.error("Non-retryable IOException: {}", e.getMessage(), e);
throw e;
}
}
}
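
The rewritten readChunk drops the RxJava Flowable/retryWhen pipeline in favour of a plain loop: only transient failures (EOFException, InsufficientDataException) are retried, up to five times with exponential backoff, while other IOExceptions fail immediately. A generic sketch of that retry shape, not the PR's code:

import java.io.EOFException;
import java.io.IOException;
import java.util.concurrent.Callable;

// Generic sketch of the retry shape used in readChunk above:
// retry only transient failures with exponential backoff, rethrow everything else.
class RetryWithBackoffSketch {

  static <T> T callWithRetry(Callable<T> op, int maxRetries) throws IOException {
    int attempt = 0;
    while (true) {
      try {
        return op.call();
      } catch (EOFException e) { // in the PR, InsufficientDataException is retried the same way
        attempt++;
        if (attempt >= maxRetries) {
          throw new IOException("Failed after " + maxRetries + " retries", e);
        }
        long delayMs = (long) Math.pow(2, attempt) * 1000; // 2 s, 4 s, 8 s, 16 s, 32 s
        try {
          Thread.sleep(delayMs);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted during retry backoff", ie);
        }
      } catch (Exception e) { // non-retryable failures propagate immediately
        throw new IOException("Non-retryable failure", e);
      }
    }
  }
}

A caller would wrap the blocking read, for example callWithRetry(() -> stream.read(buffer, offset, length), 5).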