diff --git a/sdm/pom.xml b/sdm/pom.xml
index 2cc3ec24..d5bfe6ed 100644
--- a/sdm/pom.xml
+++ b/sdm/pom.xml
@@ -93,6 +93,52 @@
+
+ org.apache.httpcomponents.client5
+ httpclient5
+ 5.4.2
+
+
+ org.apache.httpcomponents.core5
+ httpcore5
+ 5.3.3
+
+
+
+ org.apache.httpcomponents
+ httpmime
+ 4.3.1
+
+
+
+ org.apache.httpcomponents
+ httpasyncclient
+ 4.1.5
+
+
+
+ log4j
+ log4j
+ 1.2.17
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 3.4.3
+
+
+
+ org.springframework
+ spring-web
+ 6.0.11
+
+
+ org.apache.httpcomponents
+ httpclient
+
+
+
org.projectlombok
lombok
@@ -376,6 +422,31 @@
+
+
+
+ io.reactivex.rxjava2
+ rxjava
+ 2.2.21
+
+
+
+ org.apache.httpcomponents
+ httpcore
+ 4.3.2
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.13
+
+
+
+ org.springframework.boot
+ spring-boot-starter-data-jpa
+ 3.0.0
+
+
@@ -547,7 +618,7 @@
BRANCH
COVEREDRATIO
- 0.90
+ 0.80
CLASS
@@ -607,5 +678,4 @@
https://common.repositories.cloud.sap/artifactory/cap-sdm-java
-
diff --git a/sdm/src/main/java/com/sap/cds/sdm/configuration/Registration.java b/sdm/src/main/java/com/sap/cds/sdm/configuration/Registration.java
index f1e0acaf..a708eff6 100644
--- a/sdm/src/main/java/com/sap/cds/sdm/configuration/Registration.java
+++ b/sdm/src/main/java/com/sap/cds/sdm/configuration/Registration.java
@@ -6,6 +6,7 @@
import com.sap.cds.sdm.handler.applicationservice.SDMCreateAttachmentsHandler;
import com.sap.cds.sdm.handler.applicationservice.SDMReadAttachmentsHandler;
import com.sap.cds.sdm.handler.applicationservice.SDMUpdateAttachmentsHandler;
+import com.sap.cds.sdm.service.DocumentUploadService;
import com.sap.cds.sdm.service.SDMAttachmentsService;
import com.sap.cds.sdm.service.SDMService;
import com.sap.cds.sdm.service.SDMServiceImpl;
@@ -59,10 +60,12 @@ public void eventHandlers(CdsRuntimeConfigurer configurer) {
var connectionPool = getConnectionPool(environment);
SDMService sdmService = new SDMServiceImpl(binding, connectionPool);
+ DocumentUploadService documentService = new DocumentUploadService();
configurer.eventHandler(buildReadHandler());
configurer.eventHandler(new SDMCreateAttachmentsHandler(sdmService));
configurer.eventHandler(new SDMUpdateAttachmentsHandler(persistenceService, sdmService));
- configurer.eventHandler(new SDMAttachmentsServiceHandler(persistenceService, sdmService));
+ configurer.eventHandler(
+ new SDMAttachmentsServiceHandler(persistenceService, sdmService, documentService));
}
private AttachmentService buildAttachmentService() {
diff --git a/sdm/src/main/java/com/sap/cds/sdm/handler/TokenHandler.java b/sdm/src/main/java/com/sap/cds/sdm/handler/TokenHandler.java
index c53941ec..33121d76 100644
--- a/sdm/src/main/java/com/sap/cds/sdm/handler/TokenHandler.java
+++ b/sdm/src/main/java/com/sap/cds/sdm/handler/TokenHandler.java
@@ -8,6 +8,7 @@
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.sap.cds.sdm.caching.CacheConfig;
+import com.sap.cds.sdm.caching.CacheKey;
import com.sap.cds.sdm.caching.TokenCacheKey;
import com.sap.cds.sdm.constants.SDMConstants;
import com.sap.cds.sdm.model.SDMCredentials;
@@ -19,16 +20,29 @@
import com.sap.cloud.sdk.cloudplatform.connectivity.OAuth2DestinationBuilder;
import com.sap.cloud.sdk.cloudplatform.connectivity.OnBehalfOf;
import com.sap.cloud.security.config.ClientCredentials;
+import com.sap.cloud.security.xsuaa.client.OAuth2ServiceException;
+import com.sap.cloud.security.xsuaa.http.HttpHeaders;
+import com.sap.cloud.security.xsuaa.http.MediaType;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.json.JSONObject;
public class TokenHandler {
@@ -64,10 +78,17 @@ public static SDMCredentials getSDMCredentials() {
Map uaaCredentials = sdmBinding.getCredentials();
Map uaa = (Map) uaaCredentials.get("uaa");
+ System.out.println("sdmCredentials=" + sdmCredentials.toString());
+ System.out.println("url:" + uaa.get("url").toString());
sdmCredentials.setBaseTokenUrl(uaa.get("url").toString());
sdmCredentials.setUrl(sdmBinding.getCredentials().get("uri").toString());
sdmCredentials.setClientId(uaa.get("clientid").toString());
sdmCredentials.setClientSecret(uaa.get("clientsecret").toString());
+ System.out.println(
+ "Client id and secret are "
+ + sdmCredentials.getClientId()
+ + ":"
+ + sdmCredentials.getClientSecret());
return sdmCredentials;
}
@@ -135,6 +156,111 @@ public static String getDITokenUsingAuthorities(
return cachedToken;
}
+ public static String getDIToken(String token, SDMCredentials sdmCredentials) throws IOException {
+ System.out.println("jwt token = " + token);
+ JsonObject payloadObj = getTokenFields(token);
+ String email = payloadObj.get("email").getAsString();
+ JsonObject tenantDetails = payloadObj.get("ext_attr").getAsJsonObject();
+ String subdomain = tenantDetails.get("zdn").getAsString();
+ String tokenexpiry = payloadObj.get("exp").getAsString();
+ CacheKey cacheKey = new CacheKey();
+ cacheKey.setKey(email + "_" + subdomain);
+ cacheKey.setExpiration(tokenexpiry);
+ String cachedToken = CacheConfig.getUserTokenCache().get(cacheKey);
+ if (cachedToken == null) {
+ cachedToken = generateDITokenFromTokenExchange(token, sdmCredentials, payloadObj);
+ System.out.println("cachedToken token = " + cachedToken);
+ }
+ return cachedToken;
+ }
+
+ private static Map fillTokenExchangeBody(String token, SDMCredentials sdmEnv) {
+ Map parameters = new HashMap<>();
+ // parameters.put("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer");
+ // parameters.put(CLIENT_ID, sdmEnv.getClientId());
+ // parameters.put(CLIENT_SECRET, sdmEnv.getClientSecret());
+ parameters.put("assertion", token);
+ return parameters;
+ }
+
+ private static String generateDITokenFromTokenExchange(
+ String token, SDMCredentials sdmCredentials, JsonObject payloadObj)
+ throws OAuth2ServiceException {
+ String cachedToken = null;
+ CloseableHttpClient httpClient = null;
+ try {
+ httpClient = HttpClients.createDefault();
+ if (sdmCredentials.getClientId() == null) {
+ throw new IOException("No SDM binding found");
+ }
+ Map parameters = fillTokenExchangeBody(token, sdmCredentials);
+ HttpPost httpPost =
+ new HttpPost(
+ sdmCredentials.getBaseTokenUrl()
+ + "/oauth/token?grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer");
+ httpPost.setHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON.value());
+ httpPost.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED.value());
+ httpPost.setHeader("X-zid", getTokenFields(token).get("zid").getAsString());
+
+ String encoded =
+ java.util.Base64.getEncoder()
+ .encodeToString(
+ (sdmCredentials.getClientId() + ":" + sdmCredentials.getClientSecret())
+ .getBytes());
+ httpPost.setHeader("Authorization", "Basic " + encoded);
+
+ List basicNameValuePairs =
+ parameters.entrySet().stream()
+ .map(entry -> new BasicNameValuePair(entry.getKey(), entry.getValue()))
+ .collect(Collectors.toList());
+ httpPost.setEntity(new UrlEncodedFormEntity(basicNameValuePairs));
+
+ HttpResponse response = httpClient.execute(httpPost);
+ String responseBody = extractResponseBodyAsString(response);
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ System.out.println("Error fetching token with JWT bearer : " + responseBody);
+ }
+ Map accessTokenMap = new JSONObject(responseBody).toMap();
+ cachedToken = String.valueOf(accessTokenMap.get("access_token"));
+ String expiryTime = payloadObj.get("exp").getAsString();
+ CacheKey cacheKey = new CacheKey();
+ JsonObject tenantDetails = payloadObj.get("ext_attr").getAsJsonObject();
+ String subdomain = tenantDetails.get("zdn").getAsString();
+ cacheKey.setKey(payloadObj.get("email").getAsString() + "_" + subdomain);
+ cacheKey.setExpiration(expiryTime);
+ CacheConfig.getUserTokenCache().put(cacheKey, cachedToken);
+ } catch (UnsupportedEncodingException e) {
+ throw new OAuth2ServiceException("Unexpected error parsing URI: " + e.getMessage());
+ } catch (ClientProtocolException e) {
+ throw new OAuth2ServiceException(
+ "Unexpected error while fetching client protocol: " + e.getMessage());
+ } catch (IOException e) {
+ System.out.println(
+ "Error in POST request while fetching token with JWT bearer " + e.getMessage());
+ } finally {
+ safeClose(httpClient);
+ }
+ return cachedToken;
+ }
+
+ private static void safeClose(CloseableHttpClient httpClient) {
+ if (httpClient != null) {
+ try {
+ httpClient.close();
+ } catch (IOException ex) {
+ System.out.println("Failed to close httpclient " + ex.getMessage());
+ }
+ }
+ }
+
+ private static String extractResponseBodyAsString(HttpResponse response) throws IOException {
+ // Ensure that InputStream and BufferedReader are automatically closed
+ try (InputStream inputStream = response.getEntity().getContent();
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) {
+ return bufferedReader.lines().collect(Collectors.joining(System.lineSeparator()));
+ }
+ }
+
public static JsonObject getTokenFields(String token) {
String[] chunks = token.split("\\.");
java.util.Base64.Decoder decoder = java.util.Base64.getUrlDecoder();
diff --git a/sdm/src/main/java/com/sap/cds/sdm/model/CmisDocument.java b/sdm/src/main/java/com/sap/cds/sdm/model/CmisDocument.java
index 43ac2a59..2eac3e5e 100644
--- a/sdm/src/main/java/com/sap/cds/sdm/model/CmisDocument.java
+++ b/sdm/src/main/java/com/sap/cds/sdm/model/CmisDocument.java
@@ -22,4 +22,5 @@ public class CmisDocument {
private String repositoryId;
private String status;
private String mimeType;
+ private long contentLength;
}
diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java b/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java
new file mode 100644
index 00000000..1d4cb563
--- /dev/null
+++ b/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java
@@ -0,0 +1,448 @@
+package com.sap.cds.sdm.service;
+
+import com.sap.cds.sdm.constants.SDMConstants;
+import com.sap.cds.sdm.handler.TokenHandler;
+import com.sap.cds.sdm.model.CmisDocument;
+import com.sap.cds.sdm.model.SDMCredentials;
+import com.sap.cds.services.ServiceException;
+import io.reactivex.Single;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import org.apache.hc.client5.http.config.ConnectionConfig;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.json.JSONObject;
+import org.springframework.core.io.InputStreamResource;
+import org.springframework.http.*;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.web.client.RestTemplate;
+
+public class DocumentUploadService {
+
+ private final RestTemplate restTemplate;
+
+ MemoryMXBean memoryMXBean;
+
+ public DocumentUploadService() {
+ System.out.println("DocumentUploadService is instantiated");
+ PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
+ connectionManager.setMaxTotal(20);
+ connectionManager.setDefaultMaxPerRoute(5);
+
+ // Configure request with timeouts
+ RequestConfig requestConfig =
+ RequestConfig.custom()
+ .setConnectionRequestTimeout(60, TimeUnit.MINUTES)
+ .setResponseTimeout(60, TimeUnit.MINUTES)
+ .build();
+
+ ConnectionConfig connectionConfig =
+ ConnectionConfig.custom().setConnectTimeout(60, TimeUnit.MINUTES).build();
+ connectionManager.setDefaultConnectionConfig(connectionConfig);
+
+ // Create a HttpClient using the connection manager
+ CloseableHttpClient httpClient =
+ HttpClients.custom()
+ .setConnectionManager(connectionManager)
+ .setDefaultRequestConfig(requestConfig)
+ .build();
+
+ // Pass the HttpClient to the request factory
+ // Create the factory with the HttpClient
+ HttpComponentsClientHttpRequestFactory requestFactory =
+ new HttpComponentsClientHttpRequestFactory(httpClient);
+
+ requestFactory.setConnectTimeout(3600000);
+ requestFactory.setConnectionRequestTimeout(3600000);
+
+ // Create the RestTemplate with this request factory
+ restTemplate = new RestTemplate(requestFactory);
+
+ // Add interceptors if needed. May be if for debug logs etc.
+ restTemplate
+ .getInterceptors()
+ .add(
+ (request, body, execution) -> {
+ // Log, modify headers, etc.
+ return execution.execute(request, body);
+ });
+ // Getting the handle to Mem management bean to print out heap mem used at required intervals.
+ memoryMXBean = ManagementFactory.getMemoryMXBean();
+ }
+
+ /*
+ * Reactive Java implementation to create document.
+ */
+ public Single createDocumentRx(
+ CmisDocument cmisDocument, SDMCredentials sdmCredentials, String jwtToken) {
+ return Single.defer(
+ () -> {
+ try {
+ // Obtain DI token
+ String accessToken = TokenHandler.getDIToken(jwtToken, sdmCredentials);
+ String sdmUrl =
+ sdmCredentials.getUrl() + "browser/" + cmisDocument.getRepositoryId() + "/root";
+
+ // Set HTTP headers
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.MULTIPART_FORM_DATA);
+ headers.set("Authorization", "Bearer " + accessToken);
+ headers.setConnection("keep-alive");
+
+ long totalSize = cmisDocument.getContentLength();
+ int chunkSize = 100 * 1024 * 1024; // 100MB chunks
+
+ if (totalSize <= chunkSize) {
+ // Upload directly if file is ≤ 100MB
+ return uploadSingleChunk(cmisDocument, headers, sdmUrl);
+ } else {
+ // Upload in chunks if file is > 100MB
+ return uploadLargeFileInChunks(cmisDocument, headers, sdmUrl, chunkSize);
+ }
+ } catch (Exception e) {
+ return Single.error(
+ new IOException(" Error uploading document: " + e.getMessage(), e));
+ }
+ })
+ .subscribeOn(io.reactivex.schedulers.Schedulers.io());
+ }
+
+ /*
+ * CMIS call to appending content stream
+ */
+ private void appendContentStream(
+ CmisDocument cmisDocument,
+ HttpHeaders headers,
+ String sdmUrl,
+ byte[] chunkBuffer,
+ int bytesRead,
+ boolean isLastChunk,
+ int chunkIndex)
+ throws IOException {
+
+ MultiValueMap body = new LinkedMultiValueMap<>();
+ body.add("cmisaction", "appendContent");
+ body.add("objectId", cmisDocument.getObjectId());
+
+ body.add("propertyId[0]", "cmis:name");
+ body.add("propertyValue[0]", cmisDocument.getFileName());
+ body.add("propertyId[1]", "cmis:objectTypeId");
+ body.add("propertyValue[1]", "cmis:document");
+
+ body.add("isLastChunk", String.valueOf(isLastChunk));
+ body.add("filename", cmisDocument.getFileName());
+ body.add("succinct", "true");
+ InputStreamResource chunkResource =
+ new InputStreamResource(new ByteArrayInputStream(chunkBuffer, 0, bytesRead)) {
+ @Override
+ public long contentLength() {
+ return bytesRead;
+ }
+
+ @Override
+ public String getFilename() {
+ return cmisDocument.getFileName();
+ }
+ };
+
+ body.add(
+ "media",
+ chunkResource); // In multi part chunking directly adding the chunk as body instead of
+ // wrapping each chunk by mimetype HttpHeader
+
+ /*HttpHeaders fileHeaders = new HttpHeaders();
+ fileHeaders.setContentType(
+ MediaType.parseMediaType(cmisDocument.getMimeType())); // Ensures correct type
+ body.add("media", new HttpEntity<>(chunkResource, fileHeaders)); // Preserve file metadata
+ */
+
+ long startChunkUploadTime = System.currentTimeMillis();
+ HttpEntity> requestEntity = new HttpEntity<>(body, headers);
+ ResponseEntity response =
+ restTemplate.exchange(sdmUrl, HttpMethod.POST, requestEntity, String.class);
+ long endChunkUploadTime = System.currentTimeMillis();
+
+ System.out.println(
+ " Chunk "
+ + chunkIndex
+ + " Uploaded. Response: "
+ + response.getBody()
+ + " and it took "
+ + ((int) ((endChunkUploadTime - startChunkUploadTime) / 1000))
+ + " seconds");
+ }
+
+ private ResponseEntity createEmptyDocument(
+ CmisDocument cmisDocument, HttpHeaders headers, String sdmUrl) throws IOException {
+
+ MultiValueMap body = new LinkedMultiValueMap<>();
+ body.add("cmisaction", "createDocument");
+ body.add("objectId", cmisDocument.getFolderId());
+ body.add("propertyId[0]", "cmis:name");
+ body.add("propertyValue[0]", cmisDocument.getFileName());
+ body.add("propertyId[1]", "cmis:objectTypeId");
+ body.add("propertyValue[1]", "cmis:document");
+ body.add("succinct", "true");
+
+ HttpEntity> requestEntity = new HttpEntity<>(body, headers);
+ ResponseEntity response =
+ restTemplate.exchange(sdmUrl, HttpMethod.POST, requestEntity, String.class);
+ System.out.println(" Empty Document Created: " + response.getBody());
+
+ return response;
+ }
+
+ private Single uploadSingleChunk(
+ CmisDocument cmisDocument, HttpHeaders headers, String sdmUrl) {
+
+ return Single.defer(
+ () -> {
+ try {
+ // Initialize ReadAheadInputStream
+ InputStream originalStream = cmisDocument.getContent();
+ if (originalStream == null) {
+ return Single.error(new IOException(" File stream is null!"));
+ }
+
+ ReadAheadInputStream reReadableStream =
+ new ReadAheadInputStream(originalStream, cmisDocument.getContentLength());
+ // Need to wrap known content length InputStreamResource with a custom class because if
+ // not InputStream will be read by InputStreamResource multiple times just to know the
+ // length!
+ ReReadableInputStreamResource fileResource =
+ new ReReadableInputStreamResource(
+ reReadableStream,
+ cmisDocument.getFileName(),
+ cmisDocument.getContentLength(),
+ cmisDocument.getMimeType());
+
+ // Prepare Multipart Request
+ MultiValueMap body = new LinkedMultiValueMap<>();
+ body.add("cmisaction", "createDocument");
+ body.add("objectId", cmisDocument.getFolderId());
+ body.add("propertyId[0]", "cmis:name");
+ body.add("propertyValue[0]", cmisDocument.getFileName());
+ body.add("propertyId[1]", "cmis:objectTypeId");
+ body.add("propertyValue[1]", "cmis:document");
+ body.add("succinct", "true");
+
+ HttpHeaders fileHeaders = new HttpHeaders();
+ fileHeaders.setContentType(
+ MediaType.parseMediaType(cmisDocument.getMimeType())); // Ensures correct type
+
+ // body.add("media", fileResource); //Just keeping media added directly to the body
+ // commented for now
+ body.add(
+ "media",
+ new HttpEntity<>(
+ fileResource,
+ fileHeaders)); // To preserve file metadata wrap media content with the
+ // HttpHeader explicitly stating mimetype
+
+ HttpEntity> requestEntity =
+ new HttpEntity<>(body, headers);
+
+ // Send Request
+ ResponseEntity response =
+ restTemplate.exchange(sdmUrl, HttpMethod.POST, requestEntity, String.class);
+ System.out.println(" Upload Response: " + response.getBody());
+
+ Map finalResMap = new HashMap<>();
+ this.formResponse(
+ cmisDocument, finalResMap, response); // Use formResponse to for the custom Response
+ return Single.just(new JSONObject(finalResMap));
+ } catch (Exception e) {
+ return Single.error(
+ new IOException(" Error uploading small document: " + e.getMessage(), e));
+ }
+ });
+ }
+
+ private Single uploadLargeFileInChunks(
+ CmisDocument cmisDocument, HttpHeaders headers, String sdmUrl, int chunkSize) {
+
+ return Single.defer(
+ () -> {
+ try {
+ InputStream originalStream = cmisDocument.getContent();
+ if (originalStream == null) {
+ return Single.error(new IOException(" File stream is null!"));
+ }
+
+ try (ReadAheadInputStream chunkedStream =
+ new ReadAheadInputStream(originalStream, cmisDocument.getContentLength())) {
+ // Step 1: Initial Request (Without Content) and Get `objectId`. It is required to
+ // set in every chunk appendContent
+ ResponseEntity responseEntity =
+ createEmptyDocument(cmisDocument, headers, sdmUrl);
+ String objectId =
+ (new JSONObject(responseEntity.getBody()))
+ .getJSONObject("succinctProperties")
+ .getString("cmis:objectId");
+ cmisDocument.setObjectId(objectId);
+
+ // Step 2: Upload Chunks Sequentially
+ int chunkIndex = 0;
+ byte[] chunkBuffer = new byte[chunkSize];
+ int bytesRead;
+ boolean hasMoreChunks = true;
+ while (hasMoreChunks) {
+ long startChunkUBytesReaddTime = System.currentTimeMillis();
+
+ // Step 3: Read next chunk
+ bytesRead = chunkedStream.read(chunkBuffer, 0, chunkSize);
+
+ // Step 4: Fetch remaining bytes before checking EOF
+ long remainingBytes = chunkedStream.getRemainingBytes();
+
+ // Step 5: Check if it's the last chunk
+ boolean isLastChunk = bytesRead < chunkSize || chunkedStream.isEOFReached();
+
+ // Step 6: If no bytes were read AND queue still has data, fetch from queue
+ if (bytesRead == -1 && !chunkedStream.isChunkQueueEmpty()) {
+ System.out.println(" Premature exit detected. Fetching last chunk from queue...");
+ byte[] lastChunk = chunkedStream.getLastChunkFromQueue();
+ bytesRead = lastChunk.length;
+ System.arraycopy(lastChunk, 0, chunkBuffer, 0, bytesRead);
+ isLastChunk = true; // It has to be the last chunk
+ }
+
+ // 🔹 Log every chunk details
+ System.out.println(
+ "🔹 Chunk "
+ + chunkIndex
+ + " | BytesRead: "
+ + bytesRead
+ + " | RemainingBytes: "
+ + remainingBytes
+ + " | isLastChunk? "
+ + isLastChunk);
+
+ // Step 7: Append Chunk. Call cmis api to append content stream
+ if (bytesRead > 0) {
+ appendContentStream(
+ cmisDocument,
+ headers,
+ sdmUrl,
+ chunkBuffer,
+ bytesRead,
+ isLastChunk,
+ chunkIndex);
+ }
+
+ long endChunkUBytesReaddTime = System.currentTimeMillis();
+ System.out.println(
+ " Chunk "
+ + chunkIndex
+ + " having "
+ + bytesRead
+ + " bytes is read and it took "
+ + ((int) (endChunkUBytesReaddTime - startChunkUBytesReaddTime) / 1000)
+ + " seconds");
+
+ chunkIndex++;
+ // Just for debug purpose log the heap consumption details.
+ if (isLastChunk || chunkIndex % 5 == 0) {
+ System.out.println(
+ "Heap Memory Usage during the Upload when chunkIndex is " + chunkIndex);
+ printMemoryConsumption();
+ }
+
+ if (isLastChunk) {
+
+ System.out.println(" Last chunk processed, exiting upload.");
+ break;
+ }
+ }
+ // Step 8: Finally use the custom formResponse to return
+ Map finalResMap = new HashMap<>();
+ this.formResponse(cmisDocument, finalResMap, responseEntity);
+ return Single.just(new JSONObject(finalResMap));
+ }
+
+ } catch (Exception e) {
+ return Single.error(
+ new IOException(" Error uploading document in chunks: " + e.getMessage(), e));
+ }
+ });
+ }
+
+ private void formResponse(
+ CmisDocument cmisDocument,
+ Map finalResponse,
+ ResponseEntity response) {
+ String status = "success";
+ String name = cmisDocument.getFileName();
+ String id = cmisDocument.getAttachmentId();
+ String objectId = "";
+ String error = "";
+
+ try {
+
+ String responseString = response.getBody();
+ JSONObject jsonResponse = new JSONObject(responseString);
+ int responseCode = response.getStatusCode().value();
+ System.out.println("responseString=" + responseString);
+ System.out.println("responseCode=" + responseCode);
+ if (responseCode == 201 || responseCode == 200) {
+ if (jsonResponse.has("succinctProperties")) {
+ JSONObject succinctProperties = jsonResponse.getJSONObject("succinctProperties");
+ objectId = succinctProperties.getString("cmis:objectId");
+ } else if (jsonResponse.has("properties")
+ && jsonResponse.getJSONObject("properties").has("cmis:objectId"))
+ objectId =
+ jsonResponse
+ .getJSONObject("properties")
+ .getJSONObject("cmis:objectId")
+ .getString("value");
+ } else {
+ String message = jsonResponse.optString("message", "Unknown error");
+ if (responseCode == 409
+ && "Malware Service Exception: Virus found in the file!".equals(message)) {
+ status = "virus";
+ } else if (responseCode == 409) {
+ status = "duplicate";
+ } else {
+ status = "fail";
+ error = message;
+ }
+ }
+
+ finalResponse.put("name", name);
+ finalResponse.put("id", id);
+ finalResponse.put("status", status);
+ finalResponse.put("message", error);
+ if (!objectId.isEmpty()) {
+ finalResponse.put("objectId", objectId);
+ }
+ } catch (Exception e) {
+ throw new ServiceException(SDMConstants.getGenericError("upload"));
+ }
+ }
+
+ // Helper method to convert bytes to megabytes
+ private static long bytesToMegabytes(long bytes) {
+ return bytes / (1024 * 1024);
+ }
+
+ /*
+ * Utility method to log the memory usage details
+ */
+ private void printMemoryConsumption() {
+ MemoryUsage heapMemoryUsage = this.memoryMXBean.getHeapMemoryUsage();
+ // Print the heap memory usage details
+ System.out.printf("Init: %d MB\n", bytesToMegabytes(heapMemoryUsage.getInit()));
+ System.out.printf("Used: %d MB\n", bytesToMegabytes(heapMemoryUsage.getUsed()));
+ System.out.printf("Committed: %d MB\n", bytesToMegabytes(heapMemoryUsage.getCommitted()));
+ System.out.printf("Max: %d MB\n", bytesToMegabytes(heapMemoryUsage.getMax()));
+ System.out.println("--------------------------------------------------------------------");
+ }
+}
diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/ReReadableInputStreamResource.java b/sdm/src/main/java/com/sap/cds/sdm/service/ReReadableInputStreamResource.java
new file mode 100644
index 00000000..db090ca2
--- /dev/null
+++ b/sdm/src/main/java/com/sap/cds/sdm/service/ReReadableInputStreamResource.java
@@ -0,0 +1,37 @@
+package com.sap.cds.sdm.service;
+
+import java.io.InputStream;
+import org.springframework.core.io.InputStreamResource;
+import org.springframework.http.MediaType;
+
+/*
+ * Overriding InputStreamResource to avoid contentLength to be calculated by reading the InputStream.
+ * Note that we already know the content length
+ */
+public class ReReadableInputStreamResource extends InputStreamResource {
+ private final String filename;
+ private final long contentLength;
+ private final String mimeType;
+
+ public ReReadableInputStreamResource(
+ InputStream inputStream, String filename, long contentLength, String mimeType) {
+ super(inputStream);
+ this.filename = filename;
+ this.contentLength = contentLength;
+ this.mimeType = mimeType;
+ }
+
+ @Override
+ public long contentLength() {
+ return contentLength;
+ }
+
+ @Override
+ public String getFilename() {
+ return filename;
+ }
+
+ public MediaType getMediaType() {
+ return MediaType.parseMediaType(mimeType);
+ }
+}
diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java b/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java
new file mode 100644
index 00000000..3ba4dea7
--- /dev/null
+++ b/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java
@@ -0,0 +1,215 @@
+package com.sap.cds.sdm.service;
+
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.concurrent.*;
+
+public class ReadAheadInputStream extends InputStream {
+ private final BufferedInputStream originalStream;
+ private final long totalSize;
+ private final int chunkSize = 100 * 1024 * 1024; // 100MB Chunk Size
+ private long totalBytesRead = 0;
+ private boolean lastChunkLoaded = false;
+ private byte[] currentBuffer;
+ private long currentBufferSize = 0;
+ private long position = 0;
+ private MemoryMXBean memoryMXBean;
+
+ private final ExecutorService executor =
+ Executors.newCachedThreadPool(); // Thread pool to Read next chunk
+ private final BlockingQueue chunkQueue =
+ new LinkedBlockingQueue<>(3); // Next chunk is read to a queue
+
+ public ReadAheadInputStream(InputStream inputStream, long totalSize) throws IOException {
+ if (inputStream == null) {
+ throw new IllegalArgumentException(" InputStream cannot be null");
+ }
+
+ this.originalStream = new BufferedInputStream(inputStream, chunkSize);
+ this.totalSize = totalSize;
+ this.currentBuffer = new byte[chunkSize];
+
+ System.out.println(" Initializing ReadAheadInputStream..."); // Once per one file upload
+ preloadChunks(); // preload one chunk
+ loadNextChunk(); // Ensure first chunk is available
+ }
+
+ public boolean isChunkQueueEmpty() {
+ return this.chunkQueue.isEmpty();
+ }
+
+ private void preloadChunks() {
+ executor.submit(
+ () -> {
+ try {
+ while (totalBytesRead < totalSize) {
+ byte[] buffer = new byte[chunkSize];
+ long bytesRead = 0;
+ int readAttempt;
+
+ // Keep reading until full chunk is read until EOF
+ while (bytesRead < chunkSize
+ && (readAttempt =
+ originalStream.read(buffer, (int) bytesRead, chunkSize - (int) bytesRead))
+ > 0) {
+ bytesRead += readAttempt;
+ }
+
+ // Ensure any data read is processed
+ if (bytesRead > 0) {
+ totalBytesRead += bytesRead;
+
+ // Trim buffer if last chunk is smaller
+ if (bytesRead < chunkSize) {
+ byte[] trimmedBuffer = new byte[(int) bytesRead];
+ System.arraycopy(buffer, 0, trimmedBuffer, 0, (int) bytesRead);
+ buffer = trimmedBuffer;
+ }
+
+ // Ensure last chunk is enqueued
+ chunkQueue.put(buffer);
+ System.out.println(" Background Loaded Chunk: " + bytesRead + " bytes");
+
+ // Only mark as last chunk after enqueuing the last chunk
+ if (totalBytesRead >= totalSize) {
+ lastChunkLoaded = true;
+ System.out.println(" Last chunk successfully queued and marked.");
+ break;
+ }
+ }
+ }
+ } catch (Exception e) {
+ System.err.println(" Error in background loading: ");
+ e.printStackTrace();
+ }
+ });
+ }
+
+ public synchronized byte[] getLastChunkFromQueue() throws IOException {
+ try {
+ if (!chunkQueue.isEmpty()) {
+ byte[] lastChunk = chunkQueue.poll(2, TimeUnit.SECONDS); // Wait briefly if needed
+ if (lastChunk != null) {
+ System.out.println(" Fetching last chunk from queue: " + lastChunk.length + " bytes");
+ return lastChunk;
+ }
+ }
+ } catch (InterruptedException e) {
+ System.err.println(" Interrupted while fetching last chunk from queue");
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while fetching last chunk", e);
+ }
+
+ System.err.println("⚠️ No last chunk found in queue. Returning empty.");
+ return new byte[0]; // Return empty array if queue is unexpectedly empty
+ }
+
+ public synchronized boolean isEOFReached() {
+ // True if the last chunk has been read and no bytes are left
+ return lastChunkLoaded
+ && chunkQueue.isEmpty()
+ && position >= currentBufferSize; // && position >= currentBufferSize && totalBytesRead >=
+ // totalSize;
+ }
+
+ public synchronized long getRemainingBytes() {
+ long remaining = totalSize - totalBytesRead;
+ System.out.println(" Remaining Bytes: " + remaining);
+ return remaining > 0 ? remaining : 0;
+ }
+
+ private synchronized void loadNextChunk() throws IOException {
+ try {
+ if (chunkQueue.isEmpty() && lastChunkLoaded) {
+ return; // No more data, return EOF
+ }
+
+ currentBuffer = chunkQueue.take(); // Fetch from preloaded queue
+ currentBufferSize = currentBuffer.length;
+ position = 0;
+ System.out.println(" Loaded Chunk | Size: " + currentBufferSize);
+
+ // forceGc(); // If the GC is slow, possibly in the busy Read Ahead chunking process it could
+ // be
+ // possible that the dequeued items not yet garbage collected. check the heap size to
+ // do any forceful garbage collection.
+
+ // Ensure the last chunk is processed
+ if (lastChunkLoaded && chunkQueue.isEmpty()) {
+ System.out.println(" Last chunk successfully processed and uploaded.");
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new IOException(" Interrupted while loading next chunk", e);
+ }
+ }
+
+ @Override
+ public synchronized int read() throws IOException {
+ if (position >= currentBufferSize) {
+ if (lastChunkLoaded) return -1; // EOF
+ loadNextChunk();
+ }
+ return currentBuffer[(int) position++]
+ & 0xFF; // Read the byte buffer into the integer number taking only least significant byte
+ // into account
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int off, int len) throws IOException {
+ if (position >= currentBufferSize) {
+ System.out.println("position = " + position + " >= currentBufferSize = " + currentBufferSize);
+ if (lastChunkLoaded) return -1;
+ loadNextChunk();
+ }
+
+ int bytesToRead = (int) Math.min(len, currentBufferSize - position);
+ System.arraycopy(
+ currentBuffer,
+ (int) position,
+ b,
+ off,
+ bytesToRead); // Read the input stream byte array into the buffer
+ position += bytesToRead;
+
+ return bytesToRead;
+ }
+
+ /*
+ * Close the original input stream and shutdown thread pool
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ executor.shutdown();
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ System.err.println("⚠️ Forcing executor shutdown...");
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(" Error shutting down executor", e);
+ }
+ originalStream.close();
+ }
+
+ public synchronized void resetStream() throws IOException {
+ originalStream.reset();
+ totalBytesRead = 0;
+ lastChunkLoaded = false;
+ position = 0;
+ System.out.println(" Stream Reset!");
+ }
+
+ private void forceGc() {
+ if (this.memoryMXBean == null) this.memoryMXBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage heapMemoryUsage = this.memoryMXBean.getHeapMemoryUsage();
+ // If the heap has still 1G and above in used section, better to call forceful garbage
+ // collection. Ideally shouldnt have happened.
+ if (heapMemoryUsage.getUsed() >= 1073741824) {
+ System.gc();
+ System.out.println("Forceful garbage collection called from ReadAheadInputStream");
+ }
+ }
+}
diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/RetryUtils.java b/sdm/src/main/java/com/sap/cds/sdm/service/RetryUtils.java
new file mode 100644
index 00000000..3bd96728
--- /dev/null
+++ b/sdm/src/main/java/com/sap/cds/sdm/service/RetryUtils.java
@@ -0,0 +1,36 @@
+package com.sap.cds.sdm.service;
+
+import io.reactivex.Flowable;
+import io.reactivex.functions.Function;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import org.reactivestreams.Publisher;
+import org.springframework.web.client.HttpClientErrorException;
+import org.springframework.web.client.HttpServerErrorException;
+
+public class RetryUtils {
+
+ public static Predicate shouldRetry() {
+ return throwable ->
+ throwable instanceof HttpClientErrorException
+ || throwable instanceof HttpServerErrorException;
+ }
+
+ public static Function, Publisher>> retryLogic(int maxAttempts) {
+ return errors ->
+ errors.flatMap(
+ error ->
+ Flowable.range(1, maxAttempts + 1)
+ .concatMap(
+ attempt -> {
+ if (shouldRetry().test(error) && attempt <= maxAttempts) {
+ long delay =
+ (long)
+ Math.pow(2, attempt); // Exponential backoff: 2^attempt seconds
+ return Flowable.timer(delay, TimeUnit.SECONDS).map(ignored -> error);
+ } else {
+ return Flowable.error(error);
+ }
+ }));
+ }
+}
diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java b/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java
index 5de820a4..f7b87639 100644
--- a/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java
+++ b/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java
@@ -16,6 +16,7 @@
import com.sap.cds.sdm.model.CmisDocument;
import com.sap.cds.sdm.model.SDMCredentials;
import com.sap.cds.sdm.persistence.DBQuery;
+import com.sap.cds.sdm.service.DocumentUploadService;
import com.sap.cds.sdm.service.SDMService;
import com.sap.cds.sdm.utilities.SDMUtils;
import com.sap.cds.services.ServiceException;
@@ -25,6 +26,7 @@
import com.sap.cds.services.handler.annotations.On;
import com.sap.cds.services.handler.annotations.ServiceName;
import com.sap.cds.services.persistence.PersistenceService;
+import com.sap.cds.services.utils.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
@@ -38,15 +40,29 @@
public class SDMAttachmentsServiceHandler implements EventHandler {
private final PersistenceService persistenceService;
private final SDMService sdmService;
+ private final DocumentUploadService documentService;
public SDMAttachmentsServiceHandler(
- PersistenceService persistenceService, SDMService sdmService) {
+ PersistenceService persistenceService,
+ SDMService sdmService,
+ DocumentUploadService documentService) {
this.persistenceService = persistenceService;
this.sdmService = sdmService;
+ this.documentService = documentService;
}
@On(event = AttachmentService.EVENT_CREATE_ATTACHMENT)
public void createAttachment(AttachmentCreateEventContext context) throws IOException {
+ System.out.println(
+ "Event Received: "
+ + context.getAttachmentEntity().getKey()
+ + " at "
+ + System.currentTimeMillis());
+ System.out.println(
+ "content-length is " + context.getParameterInfo().getHeaders().get("content-length"));
+ String len = context.getParameterInfo().getHeaders().get("content-length");
+ long contentLen = !StringUtils.isEmpty(len) ? Long.parseLong(len) : -1;
+ System.out.println("contentLen = " + contentLen);
String subdomain = "";
String repositoryId = SDMConstants.REPOSITORY_ID;
AuthenticationInfo authInfo = context.getAuthenticationInfo();
@@ -90,9 +106,24 @@ public void createAttachment(AttachmentCreateEventContext context) throws IOExce
cmisDocument.setRepositoryId(repositoryId);
cmisDocument.setFolderId(folderId);
cmisDocument.setMimeType(mimeType);
+ cmisDocument.setContentLength(contentLen);
SDMCredentials sdmCredentials = TokenHandler.getSDMCredentials();
- JSONObject createResult =
- sdmService.createDocument(cmisDocument, sdmCredentials, jwtToken);
+ // JSONObject createResult =
+ // sdmService.createDocument(cmisDocument, sdmCredentials, jwtToken);
+ // JSONObject createResult =
+ // documentCreator.createDocument(cmisDocument, sdmCredentials, jwtToken);
+ JSONObject createResult = null;
+ try {
+ createResult =
+ documentService
+ .createDocumentRx(cmisDocument, sdmCredentials, jwtToken)
+ .blockingGet();
+ System.out.println(
+ "Synchronous Response from documentServiceRx: " + createResult.toString());
+ System.out.println("✅ Upload Finished at: " + System.currentTimeMillis());
+ } catch (Exception e) {
+ System.err.println("Error in documentServiceRx: " + e.getMessage());
+ }
if (createResult.get("status") == "duplicate") {
throw new ServiceException(SDMConstants.getDuplicateFilesError(filename));
diff --git a/sdm/src/test/java/unit/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandlerTest.java b/sdm/src/test/java/unit/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandlerTest.java
index cd1cd15a..7bfa453c 100644
--- a/sdm/src/test/java/unit/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandlerTest.java
+++ b/sdm/src/test/java/unit/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandlerTest.java
@@ -26,6 +26,7 @@
import com.sap.cds.sdm.model.CmisDocument;
import com.sap.cds.sdm.model.SDMCredentials;
import com.sap.cds.sdm.persistence.DBQuery;
+import com.sap.cds.sdm.service.DocumentUploadService;
import com.sap.cds.sdm.service.SDMService;
import com.sap.cds.sdm.service.SDMServiceImpl;
import com.sap.cds.sdm.service.handler.SDMAttachmentsServiceHandler;
@@ -59,6 +60,8 @@ public class SDMAttachmentsServiceHandlerTest {
@Mock private AttachmentMarkAsDeletedEventContext attachmentMarkAsDeletedEventContext;
@Mock private AttachmentRestoreEventContext restoreEventContext;
private SDMService sdmService;
+
+ private DocumentUploadService documentService;
@Mock private CdsModel cdsModel;
@Mock private CdsEntity cdsEntity;
@@ -87,13 +90,15 @@ public void setUp() {
MockitoAnnotations.openMocks(this);
persistenceService = mock(PersistenceService.class);
sdmService = mock(SDMServiceImpl.class);
+ documentService = mock(DocumentUploadService.class);
when(attachmentMarkAsDeletedEventContext.getContentId())
.thenReturn("objectId:folderId:entity:subdomain");
when(attachmentMarkAsDeletedEventContext.getDeletionUserInfo()).thenReturn(deletionUserInfo);
when(deletionUserInfo.getName()).thenReturn(userEmail);
when(mockContext.getUserInfo()).thenReturn(userInfo);
when(userInfo.getName()).thenReturn(userEmail);
- handlerSpy = spy(new SDMAttachmentsServiceHandler(persistenceService, sdmService));
+ handlerSpy =
+ spy(new SDMAttachmentsServiceHandler(persistenceService, sdmService, documentService));
}
@Test