From 2ae6937eb31559ea57b2d932b751791767261b0e Mon Sep 17 00:00:00 2001 From: Mahesh S Date: Tue, 4 Feb 2025 14:14:11 +0530 Subject: [PATCH 1/4] revert --- sdm/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdm/pom.xml b/sdm/pom.xml index 2cc3ec24..15be91a7 100644 --- a/sdm/pom.xml +++ b/sdm/pom.xml @@ -547,7 +547,7 @@ BRANCH COVEREDRATIO - 0.90 + 0.80 CLASS From 22fd021c2774f2df179d33b8a145a1c10fd236e0 Mon Sep 17 00:00:00 2001 From: Mahesh S Date: Sat, 8 Mar 2025 11:24:44 +0530 Subject: [PATCH 2/4] Fix for large file upload issue --- sdm/pom.xml | 72 ++- .../cds/sdm/configuration/Registration.java | 5 +- .../com/sap/cds/sdm/handler/TokenHandler.java | 126 +++++ .../com/sap/cds/sdm/model/CmisDocument.java | 1 + .../sdm/service/DocumentUploadService.java | 448 ++++++++++++++++++ .../ReReadableInputStreamResource.java | 37 ++ .../cds/sdm/service/ReadAheadInputStream.java | 215 +++++++++ .../handler/SDMAttachmentsServiceHandler.java | 37 +- .../SDMAttachmentsServiceHandlerTest.java | 7 +- 9 files changed, 942 insertions(+), 6 deletions(-) create mode 100644 sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java create mode 100644 sdm/src/main/java/com/sap/cds/sdm/service/ReReadableInputStreamResource.java create mode 100644 sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java diff --git a/sdm/pom.xml b/sdm/pom.xml index 15be91a7..d5bfe6ed 100644 --- a/sdm/pom.xml +++ b/sdm/pom.xml @@ -93,6 +93,52 @@ + + org.apache.httpcomponents.client5 + httpclient5 + 5.4.2 + + + org.apache.httpcomponents.core5 + httpcore5 + 5.3.3 + + + + org.apache.httpcomponents + httpmime + 4.3.1 + + + + org.apache.httpcomponents + httpasyncclient + 4.1.5 + + + + log4j + log4j + 1.2.17 + + + + org.springframework.boot + spring-boot-starter-web + 3.4.3 + + + + org.springframework + spring-web + 6.0.11 + + + org.apache.httpcomponents + httpclient + + + org.projectlombok lombok @@ -376,6 +422,31 @@ + + + + io.reactivex.rxjava2 + rxjava + 2.2.21 + + + + org.apache.httpcomponents + httpcore + 4.3.2 + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + org.springframework.boot + spring-boot-starter-data-jpa + 3.0.0 + + @@ -607,5 +678,4 @@ https://common.repositories.cloud.sap/artifactory/cap-sdm-java - diff --git a/sdm/src/main/java/com/sap/cds/sdm/configuration/Registration.java b/sdm/src/main/java/com/sap/cds/sdm/configuration/Registration.java index f1e0acaf..a708eff6 100644 --- a/sdm/src/main/java/com/sap/cds/sdm/configuration/Registration.java +++ b/sdm/src/main/java/com/sap/cds/sdm/configuration/Registration.java @@ -6,6 +6,7 @@ import com.sap.cds.sdm.handler.applicationservice.SDMCreateAttachmentsHandler; import com.sap.cds.sdm.handler.applicationservice.SDMReadAttachmentsHandler; import com.sap.cds.sdm.handler.applicationservice.SDMUpdateAttachmentsHandler; +import com.sap.cds.sdm.service.DocumentUploadService; import com.sap.cds.sdm.service.SDMAttachmentsService; import com.sap.cds.sdm.service.SDMService; import com.sap.cds.sdm.service.SDMServiceImpl; @@ -59,10 +60,12 @@ public void eventHandlers(CdsRuntimeConfigurer configurer) { var connectionPool = getConnectionPool(environment); SDMService sdmService = new SDMServiceImpl(binding, connectionPool); + DocumentUploadService documentService = new DocumentUploadService(); configurer.eventHandler(buildReadHandler()); configurer.eventHandler(new SDMCreateAttachmentsHandler(sdmService)); configurer.eventHandler(new SDMUpdateAttachmentsHandler(persistenceService, sdmService)); - configurer.eventHandler(new SDMAttachmentsServiceHandler(persistenceService, sdmService)); + configurer.eventHandler( + new SDMAttachmentsServiceHandler(persistenceService, sdmService, documentService)); } private AttachmentService buildAttachmentService() { diff --git a/sdm/src/main/java/com/sap/cds/sdm/handler/TokenHandler.java b/sdm/src/main/java/com/sap/cds/sdm/handler/TokenHandler.java index c53941ec..33121d76 100644 --- a/sdm/src/main/java/com/sap/cds/sdm/handler/TokenHandler.java +++ b/sdm/src/main/java/com/sap/cds/sdm/handler/TokenHandler.java @@ -8,6 +8,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.sap.cds.sdm.caching.CacheConfig; +import com.sap.cds.sdm.caching.CacheKey; import com.sap.cds.sdm.caching.TokenCacheKey; import com.sap.cds.sdm.constants.SDMConstants; import com.sap.cds.sdm.model.SDMCredentials; @@ -19,16 +20,29 @@ import com.sap.cloud.sdk.cloudplatform.connectivity.OAuth2DestinationBuilder; import com.sap.cloud.sdk.cloudplatform.connectivity.OnBehalfOf; import com.sap.cloud.security.config.ClientCredentials; +import com.sap.cloud.security.xsuaa.client.OAuth2ServiceException; +import com.sap.cloud.security.xsuaa.http.HttpHeaders; +import com.sap.cloud.security.xsuaa.http.MediaType; import java.io.*; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpClient; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.message.BasicNameValuePair; +import org.json.JSONObject; public class TokenHandler { @@ -64,10 +78,17 @@ public static SDMCredentials getSDMCredentials() { Map uaaCredentials = sdmBinding.getCredentials(); Map uaa = (Map) uaaCredentials.get("uaa"); + System.out.println("sdmCredentials=" + sdmCredentials.toString()); + System.out.println("url:" + uaa.get("url").toString()); sdmCredentials.setBaseTokenUrl(uaa.get("url").toString()); sdmCredentials.setUrl(sdmBinding.getCredentials().get("uri").toString()); sdmCredentials.setClientId(uaa.get("clientid").toString()); sdmCredentials.setClientSecret(uaa.get("clientsecret").toString()); + System.out.println( + "Client id and secret are " + + sdmCredentials.getClientId() + + ":" + + sdmCredentials.getClientSecret()); return sdmCredentials; } @@ -135,6 +156,111 @@ public static String getDITokenUsingAuthorities( return cachedToken; } + public static String getDIToken(String token, SDMCredentials sdmCredentials) throws IOException { + System.out.println("jwt token = " + token); + JsonObject payloadObj = getTokenFields(token); + String email = payloadObj.get("email").getAsString(); + JsonObject tenantDetails = payloadObj.get("ext_attr").getAsJsonObject(); + String subdomain = tenantDetails.get("zdn").getAsString(); + String tokenexpiry = payloadObj.get("exp").getAsString(); + CacheKey cacheKey = new CacheKey(); + cacheKey.setKey(email + "_" + subdomain); + cacheKey.setExpiration(tokenexpiry); + String cachedToken = CacheConfig.getUserTokenCache().get(cacheKey); + if (cachedToken == null) { + cachedToken = generateDITokenFromTokenExchange(token, sdmCredentials, payloadObj); + System.out.println("cachedToken token = " + cachedToken); + } + return cachedToken; + } + + private static Map fillTokenExchangeBody(String token, SDMCredentials sdmEnv) { + Map parameters = new HashMap<>(); + // parameters.put("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer"); + // parameters.put(CLIENT_ID, sdmEnv.getClientId()); + // parameters.put(CLIENT_SECRET, sdmEnv.getClientSecret()); + parameters.put("assertion", token); + return parameters; + } + + private static String generateDITokenFromTokenExchange( + String token, SDMCredentials sdmCredentials, JsonObject payloadObj) + throws OAuth2ServiceException { + String cachedToken = null; + CloseableHttpClient httpClient = null; + try { + httpClient = HttpClients.createDefault(); + if (sdmCredentials.getClientId() == null) { + throw new IOException("No SDM binding found"); + } + Map parameters = fillTokenExchangeBody(token, sdmCredentials); + HttpPost httpPost = + new HttpPost( + sdmCredentials.getBaseTokenUrl() + + "/oauth/token?grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer"); + httpPost.setHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON.value()); + httpPost.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED.value()); + httpPost.setHeader("X-zid", getTokenFields(token).get("zid").getAsString()); + + String encoded = + java.util.Base64.getEncoder() + .encodeToString( + (sdmCredentials.getClientId() + ":" + sdmCredentials.getClientSecret()) + .getBytes()); + httpPost.setHeader("Authorization", "Basic " + encoded); + + List basicNameValuePairs = + parameters.entrySet().stream() + .map(entry -> new BasicNameValuePair(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); + httpPost.setEntity(new UrlEncodedFormEntity(basicNameValuePairs)); + + HttpResponse response = httpClient.execute(httpPost); + String responseBody = extractResponseBodyAsString(response); + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + System.out.println("Error fetching token with JWT bearer : " + responseBody); + } + Map accessTokenMap = new JSONObject(responseBody).toMap(); + cachedToken = String.valueOf(accessTokenMap.get("access_token")); + String expiryTime = payloadObj.get("exp").getAsString(); + CacheKey cacheKey = new CacheKey(); + JsonObject tenantDetails = payloadObj.get("ext_attr").getAsJsonObject(); + String subdomain = tenantDetails.get("zdn").getAsString(); + cacheKey.setKey(payloadObj.get("email").getAsString() + "_" + subdomain); + cacheKey.setExpiration(expiryTime); + CacheConfig.getUserTokenCache().put(cacheKey, cachedToken); + } catch (UnsupportedEncodingException e) { + throw new OAuth2ServiceException("Unexpected error parsing URI: " + e.getMessage()); + } catch (ClientProtocolException e) { + throw new OAuth2ServiceException( + "Unexpected error while fetching client protocol: " + e.getMessage()); + } catch (IOException e) { + System.out.println( + "Error in POST request while fetching token with JWT bearer " + e.getMessage()); + } finally { + safeClose(httpClient); + } + return cachedToken; + } + + private static void safeClose(CloseableHttpClient httpClient) { + if (httpClient != null) { + try { + httpClient.close(); + } catch (IOException ex) { + System.out.println("Failed to close httpclient " + ex.getMessage()); + } + } + } + + private static String extractResponseBodyAsString(HttpResponse response) throws IOException { + // Ensure that InputStream and BufferedReader are automatically closed + try (InputStream inputStream = response.getEntity().getContent(); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream))) { + return bufferedReader.lines().collect(Collectors.joining(System.lineSeparator())); + } + } + public static JsonObject getTokenFields(String token) { String[] chunks = token.split("\\."); java.util.Base64.Decoder decoder = java.util.Base64.getUrlDecoder(); diff --git a/sdm/src/main/java/com/sap/cds/sdm/model/CmisDocument.java b/sdm/src/main/java/com/sap/cds/sdm/model/CmisDocument.java index 43ac2a59..2eac3e5e 100644 --- a/sdm/src/main/java/com/sap/cds/sdm/model/CmisDocument.java +++ b/sdm/src/main/java/com/sap/cds/sdm/model/CmisDocument.java @@ -22,4 +22,5 @@ public class CmisDocument { private String repositoryId; private String status; private String mimeType; + private long contentLength; } diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java b/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java new file mode 100644 index 00000000..1d4cb563 --- /dev/null +++ b/sdm/src/main/java/com/sap/cds/sdm/service/DocumentUploadService.java @@ -0,0 +1,448 @@ +package com.sap.cds.sdm.service; + +import com.sap.cds.sdm.constants.SDMConstants; +import com.sap.cds.sdm.handler.TokenHandler; +import com.sap.cds.sdm.model.CmisDocument; +import com.sap.cds.sdm.model.SDMCredentials; +import com.sap.cds.services.ServiceException; +import io.reactivex.Single; +import java.io.*; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.util.*; +import java.util.concurrent.TimeUnit; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; +import org.json.JSONObject; +import org.springframework.core.io.InputStreamResource; +import org.springframework.http.*; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.client.RestTemplate; + +public class DocumentUploadService { + + private final RestTemplate restTemplate; + + MemoryMXBean memoryMXBean; + + public DocumentUploadService() { + System.out.println("DocumentUploadService is instantiated"); + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); + connectionManager.setMaxTotal(20); + connectionManager.setDefaultMaxPerRoute(5); + + // Configure request with timeouts + RequestConfig requestConfig = + RequestConfig.custom() + .setConnectionRequestTimeout(60, TimeUnit.MINUTES) + .setResponseTimeout(60, TimeUnit.MINUTES) + .build(); + + ConnectionConfig connectionConfig = + ConnectionConfig.custom().setConnectTimeout(60, TimeUnit.MINUTES).build(); + connectionManager.setDefaultConnectionConfig(connectionConfig); + + // Create a HttpClient using the connection manager + CloseableHttpClient httpClient = + HttpClients.custom() + .setConnectionManager(connectionManager) + .setDefaultRequestConfig(requestConfig) + .build(); + + // Pass the HttpClient to the request factory + // Create the factory with the HttpClient + HttpComponentsClientHttpRequestFactory requestFactory = + new HttpComponentsClientHttpRequestFactory(httpClient); + + requestFactory.setConnectTimeout(3600000); + requestFactory.setConnectionRequestTimeout(3600000); + + // Create the RestTemplate with this request factory + restTemplate = new RestTemplate(requestFactory); + + // Add interceptors if needed. May be if for debug logs etc. + restTemplate + .getInterceptors() + .add( + (request, body, execution) -> { + // Log, modify headers, etc. + return execution.execute(request, body); + }); + // Getting the handle to Mem management bean to print out heap mem used at required intervals. + memoryMXBean = ManagementFactory.getMemoryMXBean(); + } + + /* + * Reactive Java implementation to create document. + */ + public Single createDocumentRx( + CmisDocument cmisDocument, SDMCredentials sdmCredentials, String jwtToken) { + return Single.defer( + () -> { + try { + // Obtain DI token + String accessToken = TokenHandler.getDIToken(jwtToken, sdmCredentials); + String sdmUrl = + sdmCredentials.getUrl() + "browser/" + cmisDocument.getRepositoryId() + "/root"; + + // Set HTTP headers + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.MULTIPART_FORM_DATA); + headers.set("Authorization", "Bearer " + accessToken); + headers.setConnection("keep-alive"); + + long totalSize = cmisDocument.getContentLength(); + int chunkSize = 100 * 1024 * 1024; // 100MB chunks + + if (totalSize <= chunkSize) { + // Upload directly if file is ≤ 100MB + return uploadSingleChunk(cmisDocument, headers, sdmUrl); + } else { + // Upload in chunks if file is > 100MB + return uploadLargeFileInChunks(cmisDocument, headers, sdmUrl, chunkSize); + } + } catch (Exception e) { + return Single.error( + new IOException(" Error uploading document: " + e.getMessage(), e)); + } + }) + .subscribeOn(io.reactivex.schedulers.Schedulers.io()); + } + + /* + * CMIS call to appending content stream + */ + private void appendContentStream( + CmisDocument cmisDocument, + HttpHeaders headers, + String sdmUrl, + byte[] chunkBuffer, + int bytesRead, + boolean isLastChunk, + int chunkIndex) + throws IOException { + + MultiValueMap body = new LinkedMultiValueMap<>(); + body.add("cmisaction", "appendContent"); + body.add("objectId", cmisDocument.getObjectId()); + + body.add("propertyId[0]", "cmis:name"); + body.add("propertyValue[0]", cmisDocument.getFileName()); + body.add("propertyId[1]", "cmis:objectTypeId"); + body.add("propertyValue[1]", "cmis:document"); + + body.add("isLastChunk", String.valueOf(isLastChunk)); + body.add("filename", cmisDocument.getFileName()); + body.add("succinct", "true"); + InputStreamResource chunkResource = + new InputStreamResource(new ByteArrayInputStream(chunkBuffer, 0, bytesRead)) { + @Override + public long contentLength() { + return bytesRead; + } + + @Override + public String getFilename() { + return cmisDocument.getFileName(); + } + }; + + body.add( + "media", + chunkResource); // In multi part chunking directly adding the chunk as body instead of + // wrapping each chunk by mimetype HttpHeader + + /*HttpHeaders fileHeaders = new HttpHeaders(); + fileHeaders.setContentType( + MediaType.parseMediaType(cmisDocument.getMimeType())); // Ensures correct type + body.add("media", new HttpEntity<>(chunkResource, fileHeaders)); // Preserve file metadata + */ + + long startChunkUploadTime = System.currentTimeMillis(); + HttpEntity> requestEntity = new HttpEntity<>(body, headers); + ResponseEntity response = + restTemplate.exchange(sdmUrl, HttpMethod.POST, requestEntity, String.class); + long endChunkUploadTime = System.currentTimeMillis(); + + System.out.println( + " Chunk " + + chunkIndex + + " Uploaded. Response: " + + response.getBody() + + " and it took " + + ((int) ((endChunkUploadTime - startChunkUploadTime) / 1000)) + + " seconds"); + } + + private ResponseEntity createEmptyDocument( + CmisDocument cmisDocument, HttpHeaders headers, String sdmUrl) throws IOException { + + MultiValueMap body = new LinkedMultiValueMap<>(); + body.add("cmisaction", "createDocument"); + body.add("objectId", cmisDocument.getFolderId()); + body.add("propertyId[0]", "cmis:name"); + body.add("propertyValue[0]", cmisDocument.getFileName()); + body.add("propertyId[1]", "cmis:objectTypeId"); + body.add("propertyValue[1]", "cmis:document"); + body.add("succinct", "true"); + + HttpEntity> requestEntity = new HttpEntity<>(body, headers); + ResponseEntity response = + restTemplate.exchange(sdmUrl, HttpMethod.POST, requestEntity, String.class); + System.out.println(" Empty Document Created: " + response.getBody()); + + return response; + } + + private Single uploadSingleChunk( + CmisDocument cmisDocument, HttpHeaders headers, String sdmUrl) { + + return Single.defer( + () -> { + try { + // Initialize ReadAheadInputStream + InputStream originalStream = cmisDocument.getContent(); + if (originalStream == null) { + return Single.error(new IOException(" File stream is null!")); + } + + ReadAheadInputStream reReadableStream = + new ReadAheadInputStream(originalStream, cmisDocument.getContentLength()); + // Need to wrap known content length InputStreamResource with a custom class because if + // not InputStream will be read by InputStreamResource multiple times just to know the + // length! + ReReadableInputStreamResource fileResource = + new ReReadableInputStreamResource( + reReadableStream, + cmisDocument.getFileName(), + cmisDocument.getContentLength(), + cmisDocument.getMimeType()); + + // Prepare Multipart Request + MultiValueMap body = new LinkedMultiValueMap<>(); + body.add("cmisaction", "createDocument"); + body.add("objectId", cmisDocument.getFolderId()); + body.add("propertyId[0]", "cmis:name"); + body.add("propertyValue[0]", cmisDocument.getFileName()); + body.add("propertyId[1]", "cmis:objectTypeId"); + body.add("propertyValue[1]", "cmis:document"); + body.add("succinct", "true"); + + HttpHeaders fileHeaders = new HttpHeaders(); + fileHeaders.setContentType( + MediaType.parseMediaType(cmisDocument.getMimeType())); // Ensures correct type + + // body.add("media", fileResource); //Just keeping media added directly to the body + // commented for now + body.add( + "media", + new HttpEntity<>( + fileResource, + fileHeaders)); // To preserve file metadata wrap media content with the + // HttpHeader explicitly stating mimetype + + HttpEntity> requestEntity = + new HttpEntity<>(body, headers); + + // Send Request + ResponseEntity response = + restTemplate.exchange(sdmUrl, HttpMethod.POST, requestEntity, String.class); + System.out.println(" Upload Response: " + response.getBody()); + + Map finalResMap = new HashMap<>(); + this.formResponse( + cmisDocument, finalResMap, response); // Use formResponse to for the custom Response + return Single.just(new JSONObject(finalResMap)); + } catch (Exception e) { + return Single.error( + new IOException(" Error uploading small document: " + e.getMessage(), e)); + } + }); + } + + private Single uploadLargeFileInChunks( + CmisDocument cmisDocument, HttpHeaders headers, String sdmUrl, int chunkSize) { + + return Single.defer( + () -> { + try { + InputStream originalStream = cmisDocument.getContent(); + if (originalStream == null) { + return Single.error(new IOException(" File stream is null!")); + } + + try (ReadAheadInputStream chunkedStream = + new ReadAheadInputStream(originalStream, cmisDocument.getContentLength())) { + // Step 1: Initial Request (Without Content) and Get `objectId`. It is required to + // set in every chunk appendContent + ResponseEntity responseEntity = + createEmptyDocument(cmisDocument, headers, sdmUrl); + String objectId = + (new JSONObject(responseEntity.getBody())) + .getJSONObject("succinctProperties") + .getString("cmis:objectId"); + cmisDocument.setObjectId(objectId); + + // Step 2: Upload Chunks Sequentially + int chunkIndex = 0; + byte[] chunkBuffer = new byte[chunkSize]; + int bytesRead; + boolean hasMoreChunks = true; + while (hasMoreChunks) { + long startChunkUBytesReaddTime = System.currentTimeMillis(); + + // Step 3: Read next chunk + bytesRead = chunkedStream.read(chunkBuffer, 0, chunkSize); + + // Step 4: Fetch remaining bytes before checking EOF + long remainingBytes = chunkedStream.getRemainingBytes(); + + // Step 5: Check if it's the last chunk + boolean isLastChunk = bytesRead < chunkSize || chunkedStream.isEOFReached(); + + // Step 6: If no bytes were read AND queue still has data, fetch from queue + if (bytesRead == -1 && !chunkedStream.isChunkQueueEmpty()) { + System.out.println(" Premature exit detected. Fetching last chunk from queue..."); + byte[] lastChunk = chunkedStream.getLastChunkFromQueue(); + bytesRead = lastChunk.length; + System.arraycopy(lastChunk, 0, chunkBuffer, 0, bytesRead); + isLastChunk = true; // It has to be the last chunk + } + + // 🔹 Log every chunk details + System.out.println( + "🔹 Chunk " + + chunkIndex + + " | BytesRead: " + + bytesRead + + " | RemainingBytes: " + + remainingBytes + + " | isLastChunk? " + + isLastChunk); + + // Step 7: Append Chunk. Call cmis api to append content stream + if (bytesRead > 0) { + appendContentStream( + cmisDocument, + headers, + sdmUrl, + chunkBuffer, + bytesRead, + isLastChunk, + chunkIndex); + } + + long endChunkUBytesReaddTime = System.currentTimeMillis(); + System.out.println( + " Chunk " + + chunkIndex + + " having " + + bytesRead + + " bytes is read and it took " + + ((int) (endChunkUBytesReaddTime - startChunkUBytesReaddTime) / 1000) + + " seconds"); + + chunkIndex++; + // Just for debug purpose log the heap consumption details. + if (isLastChunk || chunkIndex % 5 == 0) { + System.out.println( + "Heap Memory Usage during the Upload when chunkIndex is " + chunkIndex); + printMemoryConsumption(); + } + + if (isLastChunk) { + + System.out.println(" Last chunk processed, exiting upload."); + break; + } + } + // Step 8: Finally use the custom formResponse to return + Map finalResMap = new HashMap<>(); + this.formResponse(cmisDocument, finalResMap, responseEntity); + return Single.just(new JSONObject(finalResMap)); + } + + } catch (Exception e) { + return Single.error( + new IOException(" Error uploading document in chunks: " + e.getMessage(), e)); + } + }); + } + + private void formResponse( + CmisDocument cmisDocument, + Map finalResponse, + ResponseEntity response) { + String status = "success"; + String name = cmisDocument.getFileName(); + String id = cmisDocument.getAttachmentId(); + String objectId = ""; + String error = ""; + + try { + + String responseString = response.getBody(); + JSONObject jsonResponse = new JSONObject(responseString); + int responseCode = response.getStatusCode().value(); + System.out.println("responseString=" + responseString); + System.out.println("responseCode=" + responseCode); + if (responseCode == 201 || responseCode == 200) { + if (jsonResponse.has("succinctProperties")) { + JSONObject succinctProperties = jsonResponse.getJSONObject("succinctProperties"); + objectId = succinctProperties.getString("cmis:objectId"); + } else if (jsonResponse.has("properties") + && jsonResponse.getJSONObject("properties").has("cmis:objectId")) + objectId = + jsonResponse + .getJSONObject("properties") + .getJSONObject("cmis:objectId") + .getString("value"); + } else { + String message = jsonResponse.optString("message", "Unknown error"); + if (responseCode == 409 + && "Malware Service Exception: Virus found in the file!".equals(message)) { + status = "virus"; + } else if (responseCode == 409) { + status = "duplicate"; + } else { + status = "fail"; + error = message; + } + } + + finalResponse.put("name", name); + finalResponse.put("id", id); + finalResponse.put("status", status); + finalResponse.put("message", error); + if (!objectId.isEmpty()) { + finalResponse.put("objectId", objectId); + } + } catch (Exception e) { + throw new ServiceException(SDMConstants.getGenericError("upload")); + } + } + + // Helper method to convert bytes to megabytes + private static long bytesToMegabytes(long bytes) { + return bytes / (1024 * 1024); + } + + /* + * Utility method to log the memory usage details + */ + private void printMemoryConsumption() { + MemoryUsage heapMemoryUsage = this.memoryMXBean.getHeapMemoryUsage(); + // Print the heap memory usage details + System.out.printf("Init: %d MB\n", bytesToMegabytes(heapMemoryUsage.getInit())); + System.out.printf("Used: %d MB\n", bytesToMegabytes(heapMemoryUsage.getUsed())); + System.out.printf("Committed: %d MB\n", bytesToMegabytes(heapMemoryUsage.getCommitted())); + System.out.printf("Max: %d MB\n", bytesToMegabytes(heapMemoryUsage.getMax())); + System.out.println("--------------------------------------------------------------------"); + } +} diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/ReReadableInputStreamResource.java b/sdm/src/main/java/com/sap/cds/sdm/service/ReReadableInputStreamResource.java new file mode 100644 index 00000000..db090ca2 --- /dev/null +++ b/sdm/src/main/java/com/sap/cds/sdm/service/ReReadableInputStreamResource.java @@ -0,0 +1,37 @@ +package com.sap.cds.sdm.service; + +import java.io.InputStream; +import org.springframework.core.io.InputStreamResource; +import org.springframework.http.MediaType; + +/* + * Overriding InputStreamResource to avoid contentLength to be calculated by reading the InputStream. + * Note that we already know the content length + */ +public class ReReadableInputStreamResource extends InputStreamResource { + private final String filename; + private final long contentLength; + private final String mimeType; + + public ReReadableInputStreamResource( + InputStream inputStream, String filename, long contentLength, String mimeType) { + super(inputStream); + this.filename = filename; + this.contentLength = contentLength; + this.mimeType = mimeType; + } + + @Override + public long contentLength() { + return contentLength; + } + + @Override + public String getFilename() { + return filename; + } + + public MediaType getMediaType() { + return MediaType.parseMediaType(mimeType); + } +} diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java b/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java new file mode 100644 index 00000000..b3b6dd94 --- /dev/null +++ b/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java @@ -0,0 +1,215 @@ +package com.sap.cds.sdm.service; + +import java.io.*; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.util.concurrent.*; + +public class ReadAheadInputStream extends InputStream { + private final BufferedInputStream originalStream; + private final long totalSize; + private final int chunkSize = 100 * 1024 * 1024; // 100MB Chunk Size + private long totalBytesRead = 0; + private boolean lastChunkLoaded = false; + private byte[] currentBuffer; + private long currentBufferSize = 0; + private long position = 0; + private MemoryMXBean memoryMXBean; + + private final ExecutorService executor = + Executors.newCachedThreadPool(); // Thread pool to Read next chunk + private final BlockingQueue chunkQueue = + new LinkedBlockingQueue<>(3); // Next chunk is read to a queue + + public ReadAheadInputStream(InputStream inputStream, long totalSize) throws IOException { + if (inputStream == null) { + throw new IllegalArgumentException(" InputStream cannot be null"); + } + + this.originalStream = new BufferedInputStream(inputStream, chunkSize); + this.totalSize = totalSize; + this.currentBuffer = new byte[chunkSize]; + + System.out.println(" Initializing ReadAheadInputStream..."); // Once per one file upload + preloadChunks(); // preload one chunk + loadNextChunk(); // Ensure first chunk is available + } + + public boolean isChunkQueueEmpty() { + return this.chunkQueue.isEmpty(); + } + + private void preloadChunks() { + executor.submit( + () -> { + try { + while (totalBytesRead < totalSize) { + byte[] buffer = new byte[chunkSize]; + long bytesRead = 0; + int readAttempt; + + // 🔹 Keep reading until full chunk is read (unless EOF) + while (bytesRead < chunkSize + && (readAttempt = + originalStream.read(buffer, (int) bytesRead, chunkSize - (int) bytesRead)) + > 0) { + bytesRead += readAttempt; + } + + // Ensure any data read is processed + if (bytesRead > 0) { + totalBytesRead += bytesRead; + + // Trim buffer if last chunk is smaller + if (bytesRead < chunkSize) { + byte[] trimmedBuffer = new byte[(int) bytesRead]; + System.arraycopy(buffer, 0, trimmedBuffer, 0, (int) bytesRead); + buffer = trimmedBuffer; + } + + // Ensure last chunk is enqueued + chunkQueue.put(buffer); + System.out.println(" Background Loaded Chunk: " + bytesRead + " bytes"); + + // Only mark as last chunk after enqueuing the last chunk + if (totalBytesRead >= totalSize) { + lastChunkLoaded = true; + System.out.println(" Last chunk successfully queued and marked."); + break; + } + } + } + } catch (Exception e) { + System.err.println(" Error in background loading: "); + e.printStackTrace(); + } + }); + } + + public synchronized byte[] getLastChunkFromQueue() throws IOException { + try { + if (!chunkQueue.isEmpty()) { + byte[] lastChunk = chunkQueue.poll(2, TimeUnit.SECONDS); // Wait briefly if needed + if (lastChunk != null) { + System.out.println(" Fetching last chunk from queue: " + lastChunk.length + " bytes"); + return lastChunk; + } + } + } catch (InterruptedException e) { + System.err.println(" Interrupted while fetching last chunk from queue"); + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while fetching last chunk", e); + } + + System.err.println("⚠️ No last chunk found in queue. Returning empty."); + return new byte[0]; // Return empty array if queue is unexpectedly empty + } + + public synchronized boolean isEOFReached() { + // True if the last chunk has been read and no bytes are left + return lastChunkLoaded + && chunkQueue.isEmpty() + && position >= currentBufferSize; // && position >= currentBufferSize && totalBytesRead >= + // totalSize; + } + + public synchronized long getRemainingBytes() { + long remaining = totalSize - totalBytesRead; + System.out.println("🔹 Remaining Bytes: " + remaining); + return remaining > 0 ? remaining : 0; + } + + private synchronized void loadNextChunk() throws IOException { + try { + if (chunkQueue.isEmpty() && lastChunkLoaded) { + return; // No more data, return EOF + } + + currentBuffer = chunkQueue.take(); // Fetch from preloaded queue + currentBufferSize = currentBuffer.length; + position = 0; + System.out.println(" Loaded Chunk | Size: " + currentBufferSize); + + // forceGc(); // If the GC is slow, possibly in the busy Read Ahead chunking process it could + // be + // possible that the dequeued items not yet garbage collected. check the heap size to + // do any forceful garbage collection. + + // Ensure the last chunk is processed + if (lastChunkLoaded && chunkQueue.isEmpty()) { + System.out.println(" Last chunk successfully processed and uploaded."); + } + } catch (InterruptedException e) { + e.printStackTrace(); + throw new IOException(" Interrupted while loading next chunk", e); + } + } + + @Override + public synchronized int read() throws IOException { + if (position >= currentBufferSize) { + if (lastChunkLoaded) return -1; // EOF + loadNextChunk(); + } + return currentBuffer[(int) position++] + & 0xFF; // Read the byte buffer into the integer number taking only least significant byte + // into account + } + + @Override + public synchronized int read(byte[] b, int off, int len) throws IOException { + if (position >= currentBufferSize) { + System.out.println("position = " + position + " >= currentBufferSize = " + currentBufferSize); + if (lastChunkLoaded) return -1; + loadNextChunk(); + } + + int bytesToRead = (int) Math.min(len, currentBufferSize - position); + System.arraycopy( + currentBuffer, + (int) position, + b, + off, + bytesToRead); // Read the input stream byte array into the buffer + position += bytesToRead; + + return bytesToRead; + } + + /* + * Close the original input stream and shutdown thread pool + */ + @Override + public void close() throws IOException { + try { + executor.shutdown(); + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + System.err.println("⚠️ Forcing executor shutdown..."); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + throw new IOException(" Error shutting down executor", e); + } + originalStream.close(); + } + + public synchronized void resetStream() throws IOException { + originalStream.reset(); + totalBytesRead = 0; + lastChunkLoaded = false; + position = 0; + System.out.println(" Stream Reset!"); + } + + private void forceGc() { + if (this.memoryMXBean == null) this.memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage heapMemoryUsage = this.memoryMXBean.getHeapMemoryUsage(); + // If the heap has still 1G and above in used section, better to call forceful garbage + // collection. Ideally shouldnt have happened. + if (heapMemoryUsage.getUsed() >= 1073741824) { + System.gc(); + System.out.println("Forceful garbage collection called from ReadAheadInputStream"); + } + } +} diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java b/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java index 5de820a4..e7634946 100644 --- a/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java +++ b/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java @@ -16,6 +16,7 @@ import com.sap.cds.sdm.model.CmisDocument; import com.sap.cds.sdm.model.SDMCredentials; import com.sap.cds.sdm.persistence.DBQuery; +import com.sap.cds.sdm.service.DocumentUploadService; import com.sap.cds.sdm.service.SDMService; import com.sap.cds.sdm.utilities.SDMUtils; import com.sap.cds.services.ServiceException; @@ -25,6 +26,7 @@ import com.sap.cds.services.handler.annotations.On; import com.sap.cds.services.handler.annotations.ServiceName; import com.sap.cds.services.persistence.PersistenceService; +import com.sap.cds.services.utils.StringUtils; import java.io.IOException; import java.io.InputStream; import java.util.Collections; @@ -38,15 +40,29 @@ public class SDMAttachmentsServiceHandler implements EventHandler { private final PersistenceService persistenceService; private final SDMService sdmService; + private final DocumentUploadService documentService; public SDMAttachmentsServiceHandler( - PersistenceService persistenceService, SDMService sdmService) { + PersistenceService persistenceService, + SDMService sdmService, + DocumentUploadService documentService) { this.persistenceService = persistenceService; this.sdmService = sdmService; + this.documentService = documentService; } @On(event = AttachmentService.EVENT_CREATE_ATTACHMENT) public void createAttachment(AttachmentCreateEventContext context) throws IOException { + System.out.println( + "🔹 Event Received: " + + context.getAttachmentEntity().getKey() + + " at " + + System.currentTimeMillis()); + System.out.println( + "content-length is " + context.getParameterInfo().getHeaders().get("content-length")); + String len = context.getParameterInfo().getHeaders().get("content-length"); + long contentLen = !StringUtils.isEmpty(len) ? Long.parseLong(len) : -1; + System.out.println("contentLen = " + contentLen); String subdomain = ""; String repositoryId = SDMConstants.REPOSITORY_ID; AuthenticationInfo authInfo = context.getAuthenticationInfo(); @@ -90,9 +106,24 @@ public void createAttachment(AttachmentCreateEventContext context) throws IOExce cmisDocument.setRepositoryId(repositoryId); cmisDocument.setFolderId(folderId); cmisDocument.setMimeType(mimeType); + cmisDocument.setContentLength(contentLen); SDMCredentials sdmCredentials = TokenHandler.getSDMCredentials(); - JSONObject createResult = - sdmService.createDocument(cmisDocument, sdmCredentials, jwtToken); + // JSONObject createResult = + // sdmService.createDocument(cmisDocument, sdmCredentials, jwtToken); + // JSONObject createResult = + // documentCreator.createDocument(cmisDocument, sdmCredentials, jwtToken); + JSONObject createResult = null; + try { + createResult = + documentService + .createDocumentRx(cmisDocument, sdmCredentials, jwtToken) + .blockingGet(); + System.out.println( + "Synchronous Response from documentServiceRx: " + createResult.toString()); + System.out.println("✅ Upload Finished at: " + System.currentTimeMillis()); + } catch (Exception e) { + System.err.println("Error in documentServiceRx: " + e.getMessage()); + } if (createResult.get("status") == "duplicate") { throw new ServiceException(SDMConstants.getDuplicateFilesError(filename)); diff --git a/sdm/src/test/java/unit/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandlerTest.java b/sdm/src/test/java/unit/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandlerTest.java index cd1cd15a..7bfa453c 100644 --- a/sdm/src/test/java/unit/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandlerTest.java +++ b/sdm/src/test/java/unit/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandlerTest.java @@ -26,6 +26,7 @@ import com.sap.cds.sdm.model.CmisDocument; import com.sap.cds.sdm.model.SDMCredentials; import com.sap.cds.sdm.persistence.DBQuery; +import com.sap.cds.sdm.service.DocumentUploadService; import com.sap.cds.sdm.service.SDMService; import com.sap.cds.sdm.service.SDMServiceImpl; import com.sap.cds.sdm.service.handler.SDMAttachmentsServiceHandler; @@ -59,6 +60,8 @@ public class SDMAttachmentsServiceHandlerTest { @Mock private AttachmentMarkAsDeletedEventContext attachmentMarkAsDeletedEventContext; @Mock private AttachmentRestoreEventContext restoreEventContext; private SDMService sdmService; + + private DocumentUploadService documentService; @Mock private CdsModel cdsModel; @Mock private CdsEntity cdsEntity; @@ -87,13 +90,15 @@ public void setUp() { MockitoAnnotations.openMocks(this); persistenceService = mock(PersistenceService.class); sdmService = mock(SDMServiceImpl.class); + documentService = mock(DocumentUploadService.class); when(attachmentMarkAsDeletedEventContext.getContentId()) .thenReturn("objectId:folderId:entity:subdomain"); when(attachmentMarkAsDeletedEventContext.getDeletionUserInfo()).thenReturn(deletionUserInfo); when(deletionUserInfo.getName()).thenReturn(userEmail); when(mockContext.getUserInfo()).thenReturn(userInfo); when(userInfo.getName()).thenReturn(userEmail); - handlerSpy = spy(new SDMAttachmentsServiceHandler(persistenceService, sdmService)); + handlerSpy = + spy(new SDMAttachmentsServiceHandler(persistenceService, sdmService, documentService)); } @Test From b87b0b319578a059fd9c6654ecd181e5ab0c1600 Mon Sep 17 00:00:00 2001 From: Mahesh S Date: Sat, 8 Mar 2025 12:42:37 +0530 Subject: [PATCH 3/4] EventHandler to call DocumentUploadService instead of SDMServiceImpl for file upload --- .../cds/sdm/service/handler/SDMAttachmentsServiceHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java b/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java index e7634946..f7b87639 100644 --- a/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java +++ b/sdm/src/main/java/com/sap/cds/sdm/service/handler/SDMAttachmentsServiceHandler.java @@ -54,7 +54,7 @@ public SDMAttachmentsServiceHandler( @On(event = AttachmentService.EVENT_CREATE_ATTACHMENT) public void createAttachment(AttachmentCreateEventContext context) throws IOException { System.out.println( - "🔹 Event Received: " + "Event Received: " + context.getAttachmentEntity().getKey() + " at " + System.currentTimeMillis()); From 4dc8b77c5c3b0dd8fbff0834216fbef1edfcc0e9 Mon Sep 17 00:00:00 2001 From: Mahesh S Date: Thu, 13 Mar 2025 17:54:15 +0530 Subject: [PATCH 4/4] Adding retry for every appendContent call so that just with one appendcontent failing the whole operation wont fail --- .../cds/sdm/service/ReadAheadInputStream.java | 4 +-- .../com/sap/cds/sdm/service/RetryUtils.java | 36 +++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) create mode 100644 sdm/src/main/java/com/sap/cds/sdm/service/RetryUtils.java diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java b/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java index b3b6dd94..3ba4dea7 100644 --- a/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java +++ b/sdm/src/main/java/com/sap/cds/sdm/service/ReadAheadInputStream.java @@ -49,7 +49,7 @@ private void preloadChunks() { long bytesRead = 0; int readAttempt; - // 🔹 Keep reading until full chunk is read (unless EOF) + // Keep reading until full chunk is read until EOF while (bytesRead < chunkSize && (readAttempt = originalStream.read(buffer, (int) bytesRead, chunkSize - (int) bytesRead)) @@ -116,7 +116,7 @@ public synchronized boolean isEOFReached() { public synchronized long getRemainingBytes() { long remaining = totalSize - totalBytesRead; - System.out.println("🔹 Remaining Bytes: " + remaining); + System.out.println(" Remaining Bytes: " + remaining); return remaining > 0 ? remaining : 0; } diff --git a/sdm/src/main/java/com/sap/cds/sdm/service/RetryUtils.java b/sdm/src/main/java/com/sap/cds/sdm/service/RetryUtils.java new file mode 100644 index 00000000..3bd96728 --- /dev/null +++ b/sdm/src/main/java/com/sap/cds/sdm/service/RetryUtils.java @@ -0,0 +1,36 @@ +package com.sap.cds.sdm.service; + +import io.reactivex.Flowable; +import io.reactivex.functions.Function; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import org.reactivestreams.Publisher; +import org.springframework.web.client.HttpClientErrorException; +import org.springframework.web.client.HttpServerErrorException; + +public class RetryUtils { + + public static Predicate shouldRetry() { + return throwable -> + throwable instanceof HttpClientErrorException + || throwable instanceof HttpServerErrorException; + } + + public static Function, Publisher> retryLogic(int maxAttempts) { + return errors -> + errors.flatMap( + error -> + Flowable.range(1, maxAttempts + 1) + .concatMap( + attempt -> { + if (shouldRetry().test(error) && attempt <= maxAttempts) { + long delay = + (long) + Math.pow(2, attempt); // Exponential backoff: 2^attempt seconds + return Flowable.timer(delay, TimeUnit.SECONDS).map(ignored -> error); + } else { + return Flowable.error(error); + } + })); + } +}