From 01938d76639266d29dfe41129e6d814af445a3c8 Mon Sep 17 00:00:00 2001 From: Akshat Harit Date: Fri, 3 Oct 2025 14:40:46 -0700 Subject: [PATCH 1/2] Remove oauth2 folder Make changes in the sdk to accept the new class Test changes Enable full javac linting and resolve legacy warnings upgrade maven-compiler-plugin to 3.13.0, compile with --release 8, and turn on -Xlint:all with targeted suppressions add serialVersionUID, swap Jackson currentName() usage, refine retry policy constructors, and avoid deprecated SSL provider APIs Add script and pom to run validation and then fix warnings Added static analysis checks Add more rules for static analysis Fix tests --- .gitignore | 1 + CHANGES.md | 5 + README.md | 8 +- config/checkstyle/unused-imports.xml | 12 + config/pmd/ruleset.xml | 13 + pom.xml | 139 ++++++- scripts/validate-lts.ps1 | 24 ++ .../azure/datalake/store/ADLException.java | 13 + .../datalake/store/ADLFileInputStream.java | 7 +- .../datalake/store/ADLFileOutputStream.java | 3 - .../azure/datalake/store/ADLStoreClient.java | 235 ++++++----- .../azure/datalake/store/ADLStoreOptions.java | 28 +- .../azure/datalake/store/ContentSummary.java | 16 +- .../store/ContentSummaryProcessor.java | 5 - .../microsoft/azure/datalake/store/Core.java | 15 +- .../azure/datalake/store/DirectoryEntry.java | 17 + .../datalake/store/DirectoryEntryType.java | 4 +- .../azure/datalake/store/HttpTransport.java | 16 +- .../datalake/store/OperationResponse.java | 6 + .../azure/datalake/store/QueryParams.java | 33 +- .../datalake/store/ReadBufferManager.java | 4 +- .../datalake/store/ReadBufferWorker.java | 9 +- .../azure/datalake/store/RequestOptions.java | 6 + .../datalake/store/SSLSocketFactoryEx.java | 53 ++- .../microsoft/azure/datalake/store/Utils.java | 2 - .../azure/datalake/store/acl/AclAction.java | 14 +- .../azure/datalake/store/acl/AclEntry.java | 20 +- .../azure/datalake/store/acl/AclScope.java | 4 +- .../azure/datalake/store/acl/AclStatus.java | 11 +- .../azure/datalake/store/acl/AclType.java | 6 +- .../store/oauth2/AccessTokenProvider.java | 80 ---- .../store/oauth2/AzureADAuthenticator.java | 388 ------------------ .../datalake/store/oauth2/AzureADToken.java | 18 - .../oauth2/ClientCredsTokenProvider.java | 41 -- .../store/oauth2/DeviceCodeCallback.java | 41 -- .../store/oauth2/DeviceCodeTokenProvider.java | 64 --- .../oauth2/DeviceCodeTokenProviderHelper.java | 241 ----------- .../store/oauth2/MsiTokenProvider.java | 97 ----- .../RefreshTokenBasedTokenProvider.java | 66 --- .../store/oauth2/RefreshTokenInfo.java | 20 - .../oauth2/UserPasswordTokenProvider.java | 34 -- .../datalake/store/oauth2/package-info.java | 10 - .../ExponentialBackoffPolicy.java | 43 +- .../ExponentialBackoffPolicyforMSI.java | 11 +- .../store/retrypolicies/NoRetryPolicy.java | 13 + .../NonIdempotentRetryPolicy.java | 13 + src/test/configfiles/.gitignore | 2 +- .../java/com/contoso/helpers/HelperUtils.java | 81 ++++ .../contoso/liveservicetests/TestCore.java | 28 +- .../contoso/liveservicetests/TestFileSdk.java | 94 +++-- .../liveservicetests/TestPositionedReads.java | 47 +-- .../com/contoso/mocktests/TestSdkMock.java | 13 +- 52 files changed, 778 insertions(+), 1396 deletions(-) create mode 100644 config/checkstyle/unused-imports.xml create mode 100644 config/pmd/ruleset.xml create mode 100644 scripts/validate-lts.ps1 delete mode 100644 src/main/java/com/microsoft/azure/datalake/store/oauth2/AccessTokenProvider.java delete mode 100644 src/main/java/com/microsoft/azure/datalake/store/oauth2/AzureADAuthenticator.java delete mode 100644 src/main/java/com/microsoft/azure/datalake/store/oauth2/AzureADToken.java delete mode 100644 src/main/java/com/microsoft/azure/datalake/store/oauth2/ClientCredsTokenProvider.java delete mode 100644 src/main/java/com/microsoft/azure/datalake/store/oauth2/DeviceCodeCallback.java delete mode 100644 src/main/java/com/microsoft/azure/datalake/store/oauth2/DeviceCodeTokenProvider.java delete mode 100644 src/main/java/com/microsoft/azure/datalake/store/oauth2/DeviceCodeTokenProviderHelper.java delete mode 100644 src/main/java/com/microsoft/azure/datalake/store/oauth2/MsiTokenProvider.java delete mode 100644 src/main/java/com/microsoft/azure/datalake/store/oauth2/RefreshTokenBasedTokenProvider.java delete mode 100644 src/main/java/com/microsoft/azure/datalake/store/oauth2/RefreshTokenInfo.java delete mode 100644 src/main/java/com/microsoft/azure/datalake/store/oauth2/UserPasswordTokenProvider.java delete mode 100644 src/main/java/com/microsoft/azure/datalake/store/oauth2/package-info.java diff --git a/.gitignore b/.gitignore index 0f4d742..e274ea0 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ target node_modules .idea +.vscode *.iml *.pydevproject diff --git a/CHANGES.md b/CHANGES.md index 21d2c4c..9b4876f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,4 +1,9 @@ # Changes to the SDK +### Version 3.0.0 +1. Replaced custom OAuth2 implementation with Azure Identity `TokenCredential` support. +2. Updated `ADLStoreClient` factory methods to accept Azure Identity credentials and added dependency on `com.azure:azure-identity` 1.17.0. +3. Updated live service tests to authenticate with `DefaultAzureCredential`. + ### Version 2.3.10 1. Update log4j to mitigate CVE-2021-44228. Also update junit. 2. ERRATA: This impacts only tests in this repository. SLF4J(See http://slf4j.org/log4shell.html) interface is used for logging and if log4j is available, it can be used depending on customer configuration. The version depends on customer application built using the sdk. Previous versions of sdk(<2.3.10) are not impacted by log4j CVE diff --git a/README.md b/README.md index 7676d28..0d6729c 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,10 @@ # Azure Datalake Store client for Java - - - For an introduction to Azure Data Lake, see here: https://azure.microsoft.com/en-us/services/data-lake-store/ - For getting started introduction to the SDK, see here: https://github.com/Azure-samples/data-lake-store-java-upload-download-get-started - For the SDK Javadoc, see here: http://azure.github.io/azure-data-lake-store-java/javadoc - - - +## Dependency maintenance +- To refresh dependencies to the latest stable releases, run `mvn versions:use-latest-releases -DincludeScope=compile "-Dmaven.version.ignore=.*-alpha.*,.*-beta.*,.*-rc.*"` from the project root. +- Review the resulting changes in `pom.xml` before committing to confirm no prerelease versions slipped through. \ No newline at end of file diff --git a/config/checkstyle/unused-imports.xml b/config/checkstyle/unused-imports.xml new file mode 100644 index 0000000..0905d4c --- /dev/null +++ b/config/checkstyle/unused-imports.xml @@ -0,0 +1,12 @@ + + + + + + + + + + diff --git a/config/pmd/ruleset.xml b/config/pmd/ruleset.xml new file mode 100644 index 0000000..fbebeef --- /dev/null +++ b/config/pmd/ruleset.xml @@ -0,0 +1,13 @@ + + + Rules focused on catching unused code and common correctness issues. + + + + + + + diff --git a/pom.xml b/pom.xml index c1c9c71..2f45aac 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,8 @@ UTF-8 + 2.20.0 + 8 @@ -56,6 +58,10 @@ **/*.properties + + src/test/resources + false + @@ -63,16 +69,24 @@ org.apache.maven.plugins maven-compiler-plugin - 3.5.1 + 3.14.1 - 1.8 - 1.8 + ${maven.compiler.release} + true + true + + -Xlint:all + + -Xlint:-processing + + -Xlint:-options + org.apache.maven.plugins maven-install-plugin - 2.5.2 + 3.1.4 true @@ -80,7 +94,7 @@ org.apache.maven.plugins maven-jar-plugin - 2.5 + 3.4.2 @@ -96,7 +110,7 @@ org.apache.maven.plugins maven-source-plugin - 3.0.0 + 3.3.1 attach-sources @@ -109,7 +123,7 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.0.1 + 3.12.0 true @@ -125,7 +139,7 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M7 + 3.5.4 true true @@ -134,7 +148,7 @@ org.apache.maven.plugins maven-resources-plugin - 2.6 + 3.3.1 copy-resources @@ -152,6 +166,13 @@ .gitignore + + src/test/resources + false + + log4j2.xml + + @@ -164,53 +185,133 @@ com.fasterxml.jackson.core jackson-core - 2.8.6 + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} org.slf4j slf4j-api - 1.7.21 + 2.0.17 org.wildfly.openssl wildfly-openssl - 1.0.7.Final + 2.2.5.Final + + + com.azure + azure-identity + 1.18.0 junit junit - 4.13.1 + 4.13.2 test com.squareup.okhttp3 mockwebserver - 3.2.0 + 5.1.0 test org.apache.logging.log4j - log4j-slf4j-impl - 2.15.0 + log4j-slf4j2-impl + 2.25.2 test org.apache.logging.log4j log4j-api - 2.17.0 + 2.25.2 test org.apache.logging.log4j log4j-core - 2.17.1 + 2.25.2 test + + java11 + + 11 + + + + java17 + + 17 + + + + java21 + + 21 + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + 3.6.0 + + + com.puppycrawl.tools + checkstyle + 11.1.0 + + + + + check-unused-imports + validate + + check + + + true + config/checkstyle/unused-imports.xml + true + true + + + + + + org.apache.maven.plugins + maven-pmd-plugin + 3.27.0 + + 21 + true + + config/pmd/ruleset.xml + + + + + pmd-check + validate + + check + + + + + + + adlsRelease @@ -218,7 +319,7 @@ org.apache.maven.plugins maven-gpg-plugin - 1.6 + 3.2.8 sign-artifacts diff --git a/scripts/validate-lts.ps1 b/scripts/validate-lts.ps1 new file mode 100644 index 0000000..09fe899 --- /dev/null +++ b/scripts/validate-lts.ps1 @@ -0,0 +1,24 @@ +param( + [string]$MavenExecutable = "mvn" +) + +$commands = @( + @{ Name = "Java 8"; Args = @("clean", "compile", "-DskipTests") }, + @{ Name = "Java 11"; Args = @("-Pjava11", "clean", "compile", "-DskipTests") }, + @{ Name = "Java 17"; Args = @("-Pjava17", "clean", "compile", "-DskipTests") }, + @{ Name = "Java 21"; Args = @("-Pjava21", "clean", "compile", "-DskipTests") }, + @{ Name = "Restore Java 8"; Args = @("clean", "compile", "-DskipTests") } +) + +foreach ($command in $commands) { + Write-Host "==> Building with $($command.Name)" -ForegroundColor Cyan + Write-Host " $MavenExecutable $($command.Args -join ' ')" -ForegroundColor DarkGray + + & $MavenExecutable @($command.Args) + if ($LASTEXITCODE -ne 0) { + Write-Error "Build failed for $($command.Name)" + exit $LASTEXITCODE + } +} + +Write-Host "All LTS compilation checks completed successfully." -ForegroundColor Green diff --git a/src/main/java/com/microsoft/azure/datalake/store/ADLException.java b/src/main/java/com/microsoft/azure/datalake/store/ADLException.java index 2f902dd..ab3c9b9 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ADLException.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ADLException.java @@ -16,6 +16,8 @@ */ public class ADLException extends IOException { + private static final long serialVersionUID = 1L; + /** * the HTTP response code returned by the server */ @@ -65,10 +67,21 @@ public class ADLException extends IOException { */ public String remoteExceptionJavaClassName = null; + /** + * Creates a new exception with the supplied message. + * + * @param message description of the failure condition. + */ public ADLException(String message) { super(message); } + /** + * Creates a new exception with the supplied message and root cause. + * + * @param message description of the failure condition. + * @param initCause underlying exception that triggered this failure. + */ public ADLException(String message, Throwable initCause) { super(message, initCause); } diff --git a/src/main/java/com/microsoft/azure/datalake/store/ADLFileInputStream.java b/src/main/java/com/microsoft/azure/datalake/store/ADLFileInputStream.java index 4d16ff5..9063a65 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ADLFileInputStream.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ADLFileInputStream.java @@ -432,8 +432,11 @@ public void close() throws IOException { streamClosed = true; buffer = null; // de-reference the buffer so it can be GC'ed sooner } - - + /** + * Returns the fully qualified path for the file backing this stream. + * + * @return path of the file represented by the stream. + */ public String getFilename() { return this.filename; } diff --git a/src/main/java/com/microsoft/azure/datalake/store/ADLFileOutputStream.java b/src/main/java/com/microsoft/azure/datalake/store/ADLFileOutputStream.java index d5f0ea8..5df7c88 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ADLFileOutputStream.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ADLFileOutputStream.java @@ -6,7 +6,6 @@ package com.microsoft.azure.datalake.store; -import com.microsoft.azure.datalake.store.retrypolicies.ExponentialBackoffPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,7 +27,6 @@ public class ADLFileOutputStream extends OutputStream { private final String filename; private final ADLStoreClient client; - private final boolean isCreate; private final String leaseId; private int blocksize = 4 * 1024 *1024; // default buffer size of 4MB @@ -46,7 +44,6 @@ public class ADLFileOutputStream extends OutputStream { String leaseId) throws IOException { this.filename = filename; this.client = client; - this.isCreate = isCreate; this.leaseId = (leaseId == null)? UUID.randomUUID().toString() : leaseId; if (!isCreate) initializeAppendStream(); diff --git a/src/main/java/com/microsoft/azure/datalake/store/ADLStoreClient.java b/src/main/java/com/microsoft/azure/datalake/store/ADLStoreClient.java index 75b7dcb..252ac91 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ADLStoreClient.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ADLStoreClient.java @@ -6,9 +6,11 @@ package com.microsoft.azure.datalake.store; +import com.azure.core.credential.AccessToken; +import com.azure.core.credential.TokenCredential; +import com.azure.core.credential.TokenRequestContext; import com.microsoft.azure.datalake.store.acl.AclEntry; import com.microsoft.azure.datalake.store.acl.AclStatus; -import com.microsoft.azure.datalake.store.oauth2.*; import com.microsoft.azure.datalake.store.retrypolicies.ExponentialBackoffPolicy; import com.microsoft.azure.datalake.store.retrypolicies.NonIdempotentRetryPolicy; import com.microsoft.azure.datalake.store.SSLSocketFactoryEx.SSLChannelMode; @@ -21,6 +23,9 @@ import java.lang.reflect.Constructor; import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.*; import java.util.concurrent.atomic.AtomicLong; @@ -33,8 +38,9 @@ public class ADLStoreClient { private final String accountFQDN; - private String accessToken; - private final AccessTokenProvider tokenProvider; + private final TokenCredential tokenCredential; + private final StaticTokenCredential staticTokenCredential; + private AccessToken cachedAccessToken; private static final Logger log = LoggerFactory.getLogger("com.microsoft.azure.datalake.store"); // package-default logging policy private static final AtomicLong clientIdCounter = new AtomicLong(0); private final long clientId; @@ -56,6 +62,9 @@ public class ADLStoreClient { private boolean enableConditionalCreate = false; + private static final String DATA_LAKE_SCOPE = "https://datalake.azure.net/.default"; + private static final Duration TOKEN_REFRESH_SKEW = Duration.ofMinutes(5); + private static String sdkVersion = null; static { InputStream is = ADLStoreClient.class.getResourceAsStream("/adlsdkversion.properties"); @@ -100,10 +109,10 @@ public class ADLStoreClient { // private constructor, references should be obtained using the createClient factory method - private ADLStoreClient(String accountFQDN, String accessToken, long clientId, AccessTokenProvider tokenProvider) { + private ADLStoreClient(String accountFQDN, TokenCredential tokenCredential, StaticTokenCredential staticTokenCredential, long clientId) { this.accountFQDN = accountFQDN; - this.accessToken = "Bearer " + accessToken; - this.tokenProvider = tokenProvider; + this.tokenCredential = Objects.requireNonNull(tokenCredential, "tokenCredential cannot be null"); + this.staticTokenCredential = staticTokenCredential; this.clientId = clientId; this.userAgentString = userAgent; } @@ -113,19 +122,19 @@ private ADLStoreClient(String accountFQDN, String accessToken, long clientId, Ac * * @param accountFQDN string containing the fully qualified domain name of the account. * e.g., contoso.azuredatalakestore.net - * @param token {@link AzureADToken} object that contains the AAD token to use + * @param tokenCredential {@link TokenCredential} used to acquire AAD tokens * @return the client object */ - public static ADLStoreClient createClient(String accountFQDN, AzureADToken token) { + public static ADLStoreClient createClient(String accountFQDN, TokenCredential tokenCredential) { if (accountFQDN == null || accountFQDN.trim().equals("")) { throw new IllegalArgumentException("account name is required"); } - if (token == null || token.accessToken == null || token.accessToken.equals("")) { - throw new IllegalArgumentException("token is required"); + if (tokenCredential == null) { + throw new IllegalArgumentException("token credential is required"); } long clientId = clientIdCounter.incrementAndGet(); log.trace("ADLStoreClient {} created for {} using SDK version {}", clientId, accountFQDN, sdkVersion); - return new ADLStoreClient(accountFQDN, token.accessToken, clientId, null); + return new ADLStoreClient(accountFQDN, tokenCredential, null, clientId); } /** @@ -140,98 +149,15 @@ public static ADLStoreClient createClient(String accountFQDN, String accessToken if (accountFQDN == null || accountFQDN.trim().equals("")) { throw new IllegalArgumentException("account name is required"); } - if (accessToken == null || accessToken.equals("")) { throw new IllegalArgumentException("token is required"); } long clientId = clientIdCounter.incrementAndGet(); log.trace("ADLStoreClient {} created for {} using SDK version {}", clientId, accountFQDN, sdkVersion); - return new ADLStoreClient(accountFQDN, accessToken, clientId, null); + StaticTokenCredential credential = new StaticTokenCredential(accessToken); + return new ADLStoreClient(accountFQDN, credential, credential, clientId); } - /** - * gets an {@code ADLStoreClient} object. - * - * @param accountFQDN string containing the fully qualified domain name of the account. - * For example, contoso.azuredatalakestore.net - * @param tokenProvider {@link AccessTokenProvider} that can provide the AAD token - * @return the client object - */ - public static ADLStoreClient createClient(String accountFQDN, AccessTokenProvider tokenProvider) { - if (accountFQDN == null || accountFQDN.trim().equals("")) { - throw new IllegalArgumentException("account name is required"); - } - - if (tokenProvider == null) { - throw new IllegalArgumentException("token provider is required"); - } - long clientId = clientIdCounter.incrementAndGet(); - log.trace("ADLStoreClient {} created for {} using SDK version {}", clientId, accountFQDN, sdkVersion); - return new ADLStoreClient(accountFQDN, null, clientId, tokenProvider); - } - - - /** - * gets an {@code ADLStoreClient} object. - * - * @param accountFQDN string containing the fully qualified domain name of the account. - * For example, contoso.azuredatalakestore.net - * @param tokenProvider {@link ClientCredsTokenProvider} that can provide the AAD token - * @return the client object - */ - public static ADLStoreClient createClient(String accountFQDN, ClientCredsTokenProvider tokenProvider) { - // just a convenience overload, for easy discoverability in IDE's autocompletion. - return createClient(accountFQDN, (AccessTokenProvider) tokenProvider); - } - - /** - * gets an {@code ADLStoreClient} object. - * - * @param accountFQDN string containing the fully qualified domain name of the account. - * For example, contoso.azuredatalakestore.net - * @param tokenProvider {@link RefreshTokenBasedTokenProvider} that can provide the AAD token - * @return the client object - */ - public static ADLStoreClient createClient(String accountFQDN, RefreshTokenBasedTokenProvider tokenProvider) { - // just a convenience overload, for easy discoverability in IDE's autocompletion. - return createClient(accountFQDN, (AccessTokenProvider) tokenProvider); } - - /** - * gets an {@code ADLStoreClient} object. - * - * @param accountFQDN string containing the fully qualified domain name of the account. - * For example, contoso.azuredatalakestore.net - * @param tokenProvider {@link UserPasswordTokenProvider} that can provide the AAD token - * @return the client object - */ - public static ADLStoreClient createClient(String accountFQDN, UserPasswordTokenProvider tokenProvider) { - // just a convenience overload, for easy discoverability in IDE's autocompletion. - return createClient(accountFQDN, (AccessTokenProvider) tokenProvider); } - - /** - * gets an {@code ADLStoreClient} object. - * - * @param accountFQDN string containing the fully qualified domain name of the account. - * For example, contoso.azuredatalakestore.net - * @param tokenProvider {@link DeviceCodeTokenProvider} that can provide the AAD token - * @return the client object - */ - public static ADLStoreClient createClient(String accountFQDN, DeviceCodeTokenProvider tokenProvider) { - // just a convenience overload, for easy discoverability in IDE's autocompletion. - return createClient(accountFQDN, (AccessTokenProvider) tokenProvider); } - - /** - * gets an {@code ADLStoreClient} object. - * - * @param accountFQDN string containing the fully qualified domain name of the account. - * For example, contoso.azuredatalakestore.net - * @param tokenProvider {@link DeviceCodeTokenProvider} that can provide the AAD token - * @return the client object - */ - public static ADLStoreClient createClient(String accountFQDN, MsiTokenProvider tokenProvider) { - // just a convenience overload, for easy discoverability in IDE's autocompletion. - return createClient(accountFQDN, (AccessTokenProvider) tokenProvider); } - ExponentialBackoffPolicy makeExponentialBackoffPolicy() { return new ExponentialBackoffPolicy(maxRetries, 0, exponentialRetryInterval, exponentialFactor); } @@ -1090,20 +1016,26 @@ public synchronized void setOptions(ADLStoreOptions o) throws IOException { * * @param token The OAuth2 Token */ - public synchronized void updateToken(AzureADToken token) { - log.trace("AAD Token Updated for client client {} for account {}", clientId, accountFQDN); - this.accessToken = "Bearer " + token.accessToken; + public synchronized void updateToken(AccessToken token) { + if (token == null || token.getToken() == null || token.getToken().isEmpty()) { + throw new IllegalArgumentException("token is required"); + } + updateToken(token.getToken()); } /** - * update token on existing client. + * update token on existing client when a static token credential is used. * This is useful if the client is expected to be used over long time, and token has expired. * * @param accessToken The AAD Token string */ public synchronized void updateToken(String accessToken) { + if (this.staticTokenCredential == null) { + throw new UnsupportedOperationException("updateToken is only supported when the client was created with a static access token"); + } log.trace("AAD Token Updated for client client {} for account {}", clientId, accountFQDN); - this.accessToken = "Bearer " + accessToken; + this.staticTokenCredential.setToken(accessToken); + this.cachedAccessToken = this.staticTokenCredential.getCurrentToken(); } /* ----------------------------------------------------------------------------------------------------------*/ @@ -1125,11 +1057,22 @@ synchronized int getReadAheadQueueDepth() { * @throws IOException thrown if a token provider is being used and the token provider has problem getting token */ synchronized String getAccessToken() throws IOException { - if (tokenProvider != null ) { - return "Bearer " + tokenProvider.getToken().accessToken; - } else { - return accessToken; + boolean needsRefresh = cachedAccessToken == null || shouldRefreshAccessToken(cachedAccessToken); + if (needsRefresh) { + OffsetDateTime previousExpiry = cachedAccessToken == null ? null : cachedAccessToken.getExpiresAt(); + if (cachedAccessToken == null) { + log.info("Client {} acquiring initial access token for {}", clientId, accountFQDN); + } else if (previousExpiry != null) { + log.info("Client {} refreshing access token expiring at {}", clientId, previousExpiry); + } else { + log.info("Client {} refreshing access token with unknown expiry", clientId); + } + cachedAccessToken = acquireAccessToken(); + } else if (log.isDebugEnabled()) { + OffsetDateTime expiry = cachedAccessToken.getExpiresAt(); + log.debug("Client {} reusing cached access token expiring at {}", clientId, expiry); } + return "Bearer " + cachedAccessToken.getToken(); } @@ -1293,14 +1236,82 @@ public IOException getExceptionFromResponse(OperationResponse resp, String defau private static IOException getRemoteException(String className, String message) { try { - Class clazz = Class.forName(className); - if (!IOException.class.isAssignableFrom(clazz)) { return new IOException(message); } - Constructor c = clazz.getConstructor(String.class); - return (IOException) c.newInstance(message); - } catch (Exception ex) { - return new IOException(message); - } + Class clazz = Class.forName(className); + if (!IOException.class.isAssignableFrom(clazz)) { + return new IOException(message); + } + Constructor constructor = clazz.asSubclass(IOException.class).getConstructor(String.class); + return constructor.newInstance(message); + } catch (Exception ex) { + return new IOException(message); + } + } + + private synchronized AccessToken acquireAccessToken() throws IOException { + try { + log.debug("Client {} requesting token from credential", clientId); + AccessToken token = tokenCredential.getTokenSync(createTokenRequestContext()); + if (token == null || token.getToken() == null || token.getToken().isEmpty()) { + throw new IOException("TokenCredential returned an empty access token"); + } + if (log.isInfoEnabled()) { + OffsetDateTime expiresAt = token.getExpiresAt(); + log.info("Client {} acquired access token expiring at {}", clientId, expiresAt); + } + return token; + } catch (RuntimeException ex) { + throw new IOException("Failed to acquire token from TokenCredential", ex); + } + } + + private boolean shouldRefreshAccessToken(AccessToken token) { + if (token == null) { + return true; + } + OffsetDateTime expiresAt = token.getExpiresAt(); + if (expiresAt == null) { + return false; + } + OffsetDateTime now = OffsetDateTime.now(ZoneOffset.UTC); + OffsetDateTime refreshOn = expiresAt.minus(TOKEN_REFRESH_SKEW); + return !refreshOn.isAfter(now); + } + + private TokenRequestContext createTokenRequestContext() { + TokenRequestContext context = new TokenRequestContext(); + context.addScopes(DATA_LAKE_SCOPE); + return context; + } + + private static final class StaticTokenCredential implements TokenCredential { + private static final Duration STATIC_TOKEN_EXPIRY = Duration.ofDays(3650); + private volatile AccessToken token; + + private StaticTokenCredential(String accessToken) { + setToken(accessToken); + } + + @Override + public reactor.core.publisher.Mono getToken(TokenRequestContext request) { + AccessToken current = token; + if (current == null) { + return reactor.core.publisher.Mono.error(new IllegalStateException("Access token not initialized")); } + return reactor.core.publisher.Mono.just(current); + } + + private synchronized void setToken(String accessToken) { + if (accessToken == null || accessToken.trim().isEmpty()) { + throw new IllegalArgumentException("accessToken is required"); + } + OffsetDateTime expires = OffsetDateTime.now(ZoneOffset.UTC).plus(STATIC_TOKEN_EXPIRY); + this.token = new AccessToken(accessToken, expires); + } + + private AccessToken getCurrentToken() { + return token; + } + } } \ No newline at end of file diff --git a/src/main/java/com/microsoft/azure/datalake/store/ADLStoreOptions.java b/src/main/java/com/microsoft/azure/datalake/store/ADLStoreOptions.java index 57c699d..77fbab5 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ADLStoreOptions.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ADLStoreOptions.java @@ -31,6 +31,9 @@ public class ADLStoreOptions { static final int DEFAULT_EXPONENTIAL_FACTOR = 4; private boolean enableConditionalCreate = false; + /** + * Creates a new options instance with the SDK defaults applied. + */ public ADLStoreOptions() { } @@ -173,6 +176,7 @@ int getDefaultTimeout() { * If set to false then SDK would use default cipher suite. * * @param alterCipherSuits true if the cipher suite alteration is required. + * @return {@code this} */ public ADLStoreOptions alterCipherSuits(boolean alterCipherSuits) { this.alterCipherSuits = alterCipherSuits; @@ -184,9 +188,10 @@ boolean shouldAlterCipherSuits() { } /** - * Set SSL Channel mode - * @param sslChannelMode SSL Channel mdoe to set - * @return + * Sets the SSL channel mode used by the transport layer. + * + * @param sslChannelMode name of the {@link SSLChannelMode} to apply. + * @return {@code this} */ public ADLStoreOptions setSSLChannelMode(String sslChannelMode) { SSLChannelMode[] sslChannelModes = SSLChannelMode.values(); @@ -212,9 +217,10 @@ int getMaxRetries() { } /** - * sets the number of retries for exponential retry policy used by methods in ADLStoreClient objects - * @param maxRetries number of retries for exponential retry policy + * Sets the number of retries for the exponential retry policy used by client operations. * + * @param maxRetries number of retries for exponential retry policy + * @return {@code this} */ public ADLStoreOptions setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; @@ -226,9 +232,10 @@ int getExponentialRetryInterval() { } /** - * sets the retry interval for exponential retry policy used by methods in ADLStoreClient objects - * @param exponentialRetryInterval retry interval for exponential retry policy in milliseconds + * Sets the initial retry interval for the exponential retry policy used by client operations. * + * @param exponentialRetryInterval retry interval for exponential retry policy in milliseconds + * @return {@code this} */ public ADLStoreOptions setExponentialRetryInterval(int exponentialRetryInterval) { this.exponentialRetryInterval = exponentialRetryInterval; @@ -240,9 +247,10 @@ int getExponentialFactor() { } /** - * sets the factor of backoff for exponential retry policy used by methods in ADLStoreClient objects - * @param exponentialFactor retry backoff factor for exponential retry policy + * Sets the multiplier applied on each retry attempt for the exponential retry policy. * + * @param exponentialFactor retry backoff factor for exponential retry policy + * @return {@code this} */ public ADLStoreOptions setExponentialFactor(int exponentialFactor) { this.exponentialFactor = exponentialFactor; @@ -252,7 +260,7 @@ public ADLStoreOptions setExponentialFactor(int exponentialFactor) { /** * Config to enable create only if conditional delete has passed * @param enableConditionalCreate boolean value to set - * @return + * @return {@code this} */ public ADLStoreOptions setEnableConditionalCreate(boolean enableConditionalCreate) { this.enableConditionalCreate = enableConditionalCreate; diff --git a/src/main/java/com/microsoft/azure/datalake/store/ContentSummary.java b/src/main/java/com/microsoft/azure/datalake/store/ContentSummary.java index 73329b7..8debf6b 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ContentSummary.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ContentSummary.java @@ -31,11 +31,19 @@ public class ContentSummary { */ public final long spaceConsumed; + /** + * Creates a new summary populated with the statistics returned from the service. + * + * @param length length of the file in bytes. + * @param directoryCount number of child directories. + * @param fileCount number of child files. + * @param spaceConsumed space consumed by the directory hierarchy in bytes. + */ public ContentSummary( - long length, - long directoryCount, - long fileCount, - long spaceConsumed + long length, + long directoryCount, + long fileCount, + long spaceConsumed ) { this.length = length; this.directoryCount = directoryCount; diff --git a/src/main/java/com/microsoft/azure/datalake/store/ContentSummaryProcessor.java b/src/main/java/com/microsoft/azure/datalake/store/ContentSummaryProcessor.java index 94a8013..a699e03 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ContentSummaryProcessor.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ContentSummaryProcessor.java @@ -5,13 +5,8 @@ */ package com.microsoft.azure.datalake.store; - - -import com.microsoft.azure.datalake.store.retrypolicies.ExponentialBackoffPolicy; - import java.io.IOException; import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicLong; /** diff --git a/src/main/java/com/microsoft/azure/datalake/store/Core.java b/src/main/java/com/microsoft/azure/datalake/store/Core.java index fe9e8aa..d03631b 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/Core.java +++ b/src/main/java/com/microsoft/azure/datalake/store/Core.java @@ -526,7 +526,7 @@ public static ContentSummary getContentSummary(String path, jp.nextToken(); while (jp.hasCurrentToken()) { if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { - fieldName = jp.getCurrentName(); + fieldName = jp.currentName(); jp.nextToken(); fieldValue = jp.getText(); @@ -723,7 +723,7 @@ public static DirectoryEntry getFileStatus(String path, jp.nextToken(); while (jp.hasCurrentToken()) { if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { - fieldName = jp.getCurrentName(); + fieldName = jp.currentName(); jp.nextToken(); fieldValue = jp.getText(); @@ -943,7 +943,7 @@ static DirectoryEntryListWithContinuationToken listStatusWithToken(String path, list.add(entry); } if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { - fieldName = jp.getCurrentName(); + fieldName = jp.currentName(); jp.nextToken(); fieldValue = jp.getText(); @@ -1116,6 +1116,13 @@ public static void setPermission(String path, } private static final Pattern octalPattern = Pattern.compile("[01]?[0-7]?[0-7]?[0-7]"); + + /** + * Checks whether the supplied string is a valid octal permission specification understood by the service. + * + * @param input candidate permission string to validate. + * @return {@code true} when the input matches the expected octal pattern; otherwise {@code false}. + */ public static boolean isValidOctal(String input) { return octalPattern.matcher(input).matches(); } @@ -1417,7 +1424,7 @@ public static AclStatus getAclStatus(String path, jp.nextToken(); while (jp.hasCurrentToken()) { if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { - fieldName = jp.getCurrentName(); + fieldName = jp.currentName(); if (fieldName.equals("entries")) { jp.nextToken(); // START_ARRAY - [ jp.nextToken(); diff --git a/src/main/java/com/microsoft/azure/datalake/store/DirectoryEntry.java b/src/main/java/com/microsoft/azure/datalake/store/DirectoryEntry.java index 40350c8..bc5a289 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/DirectoryEntry.java +++ b/src/main/java/com/microsoft/azure/datalake/store/DirectoryEntry.java @@ -88,6 +88,23 @@ public class DirectoryEntry { final String fileContextId ; + /** + * Creates a directory entry populated with the metadata returned from the service. + * + * @param name short name of the entry without the parent path. + * @param fullName fully qualified path of the entry. + * @param length length of the file in bytes. + * @param group owning group identifier. + * @param user owning user identifier. + * @param lastAccessTime timestamp of the last access operation. + * @param lastModifiedTime timestamp of the last modification. + * @param type entry type (file or directory). + * @param blocksize logical block size reported by the service. + * @param replicationFactor replication factor reported by the service. + * @param permission unix-style permission string for the entry. + * @param aclBit {@code true} when ACLs are set on the entry. + * @param expiryTime time at which the file expires, or {@code null} if none. + */ public DirectoryEntry(String name, String fullName, long length, diff --git a/src/main/java/com/microsoft/azure/datalake/store/DirectoryEntryType.java b/src/main/java/com/microsoft/azure/datalake/store/DirectoryEntryType.java index e461f4d..191757c 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/DirectoryEntryType.java +++ b/src/main/java/com/microsoft/azure/datalake/store/DirectoryEntryType.java @@ -8,9 +8,11 @@ /** - * enum to indicate whether a directory entry is a file or a directory. + * Indicates whether a directory entry represents a file or a directory. */ public enum DirectoryEntryType { + /** Entry represents a file. */ FILE, + /** Entry represents a directory. */ DIRECTORY } diff --git a/src/main/java/com/microsoft/azure/datalake/store/HttpTransport.java b/src/main/java/com/microsoft/azure/datalake/store/HttpTransport.java index 821e024..6128a61 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/HttpTransport.java +++ b/src/main/java/com/microsoft/azure/datalake/store/HttpTransport.java @@ -33,6 +33,10 @@ */ class HttpTransport { + private HttpTransport() { + throw new AssertionError("Utility class"); + } + private static final String API_VERSION = "2018-12-01"; // API version used in REST requests private static final Logger log = LoggerFactory.getLogger("com.microsoft.azure.datalake.store.HttpTransport"); private static final Logger tokenlog = LoggerFactory.getLogger("com.microsoft.azure.datalake.store.HttpTransport.tokens"); @@ -87,6 +91,10 @@ static void makeCall (ADLStoreClient client, opts.requestid = clientRequestId + "." + Integer.toString(retryCount); resp.reset(); long start = System.nanoTime(); + if (log.isInfoEnabled()) { + log.info("ADLS request start op={} path={} clientId={} timeout={}ms retry={}" , + op.name, path, client.getClientId(), opts.timeout, retryCount); + } makeSingleCall(client, op, path, queryParams, requestBody, offsetWithinContentsArray, length, opts, resp); resp.lastCallLatency = System.nanoTime() - start; resp.lastCallLatency = resp.lastCallLatency / 1000000; // convert from nanoseconds to milliseconds @@ -112,6 +120,10 @@ static void makeCall (ADLStoreClient client, retryCount++; resp.exceptionHistory = resp.exceptionHistory == null ? error : resp.exceptionHistory + "," + error; } + if (log.isInfoEnabled()) { + log.info("ADLS request end op={} path={} clientId={} status={} httpCode={} latency={}ms error={}", + op.name, path, client.getClientId(), outcome, resp.httpResponseCode, resp.lastCallLatency, error); + } if (log.isDebugEnabled()) { String logline = "HTTPRequest," + outcome + @@ -241,7 +253,7 @@ private static void makeSingleCall(ADLStoreClient client, URL url; try { - url = new URL(urlString.toString()); + url = URI.create(urlString.toString()).toURL(); } catch (MalformedURLException ex) { resp.ex = ex; resp.successful = false; @@ -371,7 +383,7 @@ private static void getCodesFromJSon(InputStream s, OperationResponse resp) { jp.nextToken(); while (jp.hasCurrentToken()) { if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { - fieldName = jp.getCurrentName(); + fieldName = jp.currentName(); jp.nextToken(); fieldValue = jp.getText(); diff --git a/src/main/java/com/microsoft/azure/datalake/store/OperationResponse.java b/src/main/java/com/microsoft/azure/datalake/store/OperationResponse.java index ddb9de8..fa2de7d 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/OperationResponse.java +++ b/src/main/java/com/microsoft/azure/datalake/store/OperationResponse.java @@ -16,6 +16,12 @@ * */ public class OperationResponse { + /** + * Creates a response container with default values indicating a successful call. + */ + public OperationResponse() { + } + /** * whether the request was successful. Callers should always check for success before using any return value from * any of the calls. diff --git a/src/main/java/com/microsoft/azure/datalake/store/QueryParams.java b/src/main/java/com/microsoft/azure/datalake/store/QueryParams.java index a3e441a..bca246c 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/QueryParams.java +++ b/src/main/java/com/microsoft/azure/datalake/store/QueryParams.java @@ -9,7 +9,6 @@ import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.Hashtable; -import java.util.Map; /** @@ -23,18 +22,48 @@ public class QueryParams { String separator = ""; String serializedString = null; + /** + * Creates an empty query parameter collection. + */ + public QueryParams() { + } + + /** + * Adds a name/value pair to the query string. + * + * @param name parameter name to include. + * @param value parameter value associated with the name. + */ public void add(String name, String value) { params.put(name, value); serializedString = null; } + /** + * Sets the ADLS operation whose name should be included in the query string. + * + * @param op operation to encode. + */ public void setOp(Operation op) { this.op = op; serializedString = null; } - public void setApiVersion(String apiVersion) { this.apiVersion = apiVersion; serializedString = null; } + /** + * Sets the API version that should be appended to the query string. + * + * @param apiVersion service API version identifier. + */ + public void setApiVersion(String apiVersion) { + this.apiVersion = apiVersion; + serializedString = null; + } + /** + * Serializes the configured parameters into a URL query string suitable for HTTP calls. + * + * @return serialized representation of the query parameters. + */ public String serialize() { if (serializedString == null) { StringBuilder sb = new StringBuilder(); diff --git a/src/main/java/com/microsoft/azure/datalake/store/ReadBufferManager.java b/src/main/java/com/microsoft/azure/datalake/store/ReadBufferManager.java index d283848..1285499 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ReadBufferManager.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ReadBufferManager.java @@ -47,7 +47,6 @@ class ReadBufferManager { private final static int numThreads = 8; private final static int thresholdAgeMilliseconds = 3000; // have to see if 3 seconds is a good threshold - private Thread[] threads = new Thread[numThreads]; private byte[][] buffers; // array of byte[] buffers, to hold the data that is read private Stack freeList = new Stack(); // indices in buffers[] array that are available @@ -72,9 +71,8 @@ private void init() { freeList.add(i); } for (int i = 0; i < numThreads; i++) { - Thread t = new Thread(new ReadBufferWorker(i)); + Thread t = new Thread(new ReadBufferWorker()); t.setDaemon(true); - threads[i] = t; t.setName("ADLS-prefetch-" + i); t.start(); } diff --git a/src/main/java/com/microsoft/azure/datalake/store/ReadBufferWorker.java b/src/main/java/com/microsoft/azure/datalake/store/ReadBufferWorker.java index 1f14a63..a50848e 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ReadBufferWorker.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ReadBufferWorker.java @@ -1,10 +1,6 @@ package com.microsoft.azure.datalake.store; -import com.microsoft.azure.datalake.store.ReadBuffer; -import com.microsoft.azure.datalake.store.ReadBufferManager; -import com.microsoft.azure.datalake.store.ReadBufferStatus; - import java.util.concurrent.CountDownLatch; /** @@ -16,10 +12,7 @@ class ReadBufferWorker implements Runnable { static final CountDownLatch unleashWorkers = new CountDownLatch(1); - private int id; - - ReadBufferWorker(int id) { - this.id = id; + ReadBufferWorker() { } /** diff --git a/src/main/java/com/microsoft/azure/datalake/store/RequestOptions.java b/src/main/java/com/microsoft/azure/datalake/store/RequestOptions.java index 7bf2e7e..8827fb3 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/RequestOptions.java +++ b/src/main/java/com/microsoft/azure/datalake/store/RequestOptions.java @@ -13,6 +13,12 @@ * common options to control the behavior of server calls */ public class RequestOptions { + /** + * Creates a new set of request options populated with default values. + */ + public RequestOptions() { + } + /** * the timeout (in milliseconds) to use for the request. This is used for both * the readTimeout and the connectTimeout for the request, so diff --git a/src/main/java/com/microsoft/azure/datalake/store/SSLSocketFactoryEx.java b/src/main/java/com/microsoft/azure/datalake/store/SSLSocketFactoryEx.java index dc27dd9..9d19963 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/SSLSocketFactoryEx.java +++ b/src/main/java/com/microsoft/azure/datalake/store/SSLSocketFactoryEx.java @@ -16,8 +16,11 @@ import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; +import java.security.Provider; import java.security.SecureRandom; import java.util.ArrayList; import java.util.logging.Level; @@ -28,13 +31,15 @@ */ public class SSLSocketFactoryEx extends SSLSocketFactory { + /** + * Defines which SSL implementation should be used for outbound connections. + */ public enum SSLChannelMode { + /** Always use the native OpenSSL provider. */ OpenSSL, - /** - * Ordered, preferred OpenSSL, if failed to load then fall back to - * Default_JSE - */ + /** Prefer the native OpenSSL provider, falling back to the JDK implementation on failure. */ Default, + /** Always use the JDK SSL implementation. */ Default_JSE } @@ -47,6 +52,13 @@ public enum SSLChannelMode { private String[] m_ciphers; private SSLChannelMode channelMode; + /** + * Returns a cached {@link SSLSocketFactoryEx} configured for the requested SSL channel mode. + * + * @param sslChannelMode SSL implementation preference. + * @return shared socket factory instance. + * @throws IOException when the SSL context cannot be initialized. + */ public static SSLSocketFactoryEx getDefaultFactory(SSLChannelMode sslChannelMode) throws IOException { if (instance == null) { synchronized (lock) { @@ -63,6 +75,12 @@ public static SSLSocketFactoryEx getDefaultFactory(SSLChannelMode sslChannelMode OpenSSLProvider.register(); } + /** + * Creates a new socket factory honoring the specified SSL channel mode. + * + * @param channelMode SSL implementation preference. + * @throws IOException when the SSL context cannot be initialized. + */ public SSLSocketFactoryEx(SSLChannelMode channelMode) throws IOException { this.channelMode = channelMode; try { @@ -73,9 +91,15 @@ public SSLSocketFactoryEx(SSLChannelMode channelMode) throws IOException { throw new IOException(e); } - userAgent = m_ctx.getProvider().getName() + "-" + m_ctx.getProvider().getVersion(); + Provider provider = m_ctx.getProvider(); + userAgent = provider.getName() + "-" + resolveProviderVersion(provider); } + /** + * Returns the user-agent string describing the active SSL provider. + * + * @return user-agent fragment identifying the SSL provider. + */ public String getUserAgent() { return userAgent; } @@ -205,4 +229,23 @@ private String[] alterCipherList(String[] defaultCiphers) { m_ciphers = preferredSuits.toArray(new String[0]); return m_ciphers; } + + private String resolveProviderVersion(Provider provider) { + try { + Method versionMethod = Provider.class.getMethod("getVersionStr"); + Object version = versionMethod.invoke(provider); + if (version != null) { + return version.toString(); + } + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { + // fall back to other mechanisms when version string is unavailable + } + + Object versionProperty = provider.get("ProviderVersion"); + if (versionProperty != null) { + return versionProperty.toString(); + } + + return provider.getInfo(); + } } diff --git a/src/main/java/com/microsoft/azure/datalake/store/Utils.java b/src/main/java/com/microsoft/azure/datalake/store/Utils.java index 686b0ff..4585836 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/Utils.java +++ b/src/main/java/com/microsoft/azure/datalake/store/Utils.java @@ -5,8 +5,6 @@ */ package com.microsoft.azure.datalake.store; - -import com.microsoft.azure.datalake.store.retrypolicies.ExponentialBackoffPolicy; import com.microsoft.azure.datalake.store.retrypolicies.NonIdempotentRetryPolicy; import java.io.*; diff --git a/src/main/java/com/microsoft/azure/datalake/store/acl/AclAction.java b/src/main/java/com/microsoft/azure/datalake/store/acl/AclAction.java index a6ad726..8f8f863 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/acl/AclAction.java +++ b/src/main/java/com/microsoft/azure/datalake/store/acl/AclAction.java @@ -8,21 +8,33 @@ /** * Specifies the possible combinations of actions allowed in an ACL. - * */ public enum AclAction { + /** No permissions are granted. */ NONE ("---"), + /** Execute permission only. */ EXECUTE ("--x"), + /** Write permission only. */ WRITE ("-w-"), + /** Write and execute permissions. */ WRITE_EXECUTE ("-wx"), + /** Read permission only. */ READ ("r--"), + /** Read and execute permissions. */ READ_EXECUTE ("r-x"), + /** Read and write permissions. */ READ_WRITE ("rw-"), + /** All permissions (read, write, and execute). */ ALL ("rwx"); private final String rwx; private static final AclAction[] values = AclAction.values(); + /** + * Creates a new {@code AclAction} with the provided rwx representation. + * + * @param rwx Unix-style permission triplet corresponding to the action. + */ AclAction(String rwx) { this.rwx = rwx; } diff --git a/src/main/java/com/microsoft/azure/datalake/store/acl/AclEntry.java b/src/main/java/com/microsoft/azure/datalake/store/acl/AclEntry.java index 8713f5f..d766439 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/acl/AclEntry.java +++ b/src/main/java/com/microsoft/azure/datalake/store/acl/AclEntry.java @@ -10,18 +10,12 @@ import java.util.List; /** - * Contains one ACL entry. An ACL entry consists of a scope (access or default), - * the type of the ACL (user, group, other or mask), the name of the user or group - * associated with this ACL (can be blank to specify the default permissions for - * users and groups, and must be blank for mask entries), and the action permitted - * by this ACL entry. - *

- * An ACL for an object consists of a {@code List} of Acl entries. - *

+ * Represents a single ACL entry including scope, type, optional user or group name, and the + * permissions granted by that entry. *

- * This class also provides a number of convenience methods for converting ACL entries - * and ACLs to and back from strings. - *

+ * Instances of this class can be converted to or created from POSIX-style ACL strings to simplify + * integration with command-line tooling and service APIs. + *

*/ public class AclEntry { /** @@ -50,8 +44,10 @@ public class AclEntry { */ public AclAction action; + /** + * Creates an empty ACL entry that can be populated programmatically before use. + */ public AclEntry() { - } /** diff --git a/src/main/java/com/microsoft/azure/datalake/store/acl/AclScope.java b/src/main/java/com/microsoft/azure/datalake/store/acl/AclScope.java index 17b6679..b412519 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/acl/AclScope.java +++ b/src/main/java/com/microsoft/azure/datalake/store/acl/AclScope.java @@ -8,9 +8,11 @@ /** - * The scope of an ACL Entry (access or default). + * Scope of an ACL entry, indicating whether permissions apply to the resource itself or to newly created children. */ public enum AclScope { + /** Applies directly to the target resource. */ ACCESS, + /** Applies to child resources created under the target directory. */ DEFAULT } diff --git a/src/main/java/com/microsoft/azure/datalake/store/acl/AclStatus.java b/src/main/java/com/microsoft/azure/datalake/store/acl/AclStatus.java index 3f480ab..fc0bd73 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/acl/AclStatus.java +++ b/src/main/java/com/microsoft/azure/datalake/store/acl/AclStatus.java @@ -10,11 +10,16 @@ import java.util.List; /** - * Object returned by the {@link com.microsoft.azure.datalake.store.ADLStoreClient#getAclStatus(String)} getAclStatus} - * call, that contains the Acl and Permission information for that file or directory. - * + * Result containing the ACL and permission details for a given file or directory as returned by + * {@link com.microsoft.azure.datalake.store.ADLStoreClient#getAclStatus(String)}. */ public class AclStatus { + /** + * Creates an empty status container whose fields can be populated by a service response. + */ + public AclStatus() { + } + /** * {@code List} containing the list of Acl entries for a file */ diff --git a/src/main/java/com/microsoft/azure/datalake/store/acl/AclType.java b/src/main/java/com/microsoft/azure/datalake/store/acl/AclType.java index 7d2e638..8190aab 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/acl/AclType.java +++ b/src/main/java/com/microsoft/azure/datalake/store/acl/AclType.java @@ -8,11 +8,15 @@ /** - * Type of Acl entry (user, group, other, or mask). + * Entity type targeted by an ACL entry (user, group, other, or mask). */ public enum AclType { + /** ACL entry applies to a specific user. */ USER, + /** ACL entry applies to members of a specific group. */ GROUP, + /** ACL entry applies to all users not otherwise matched. */ OTHER, + /** ACL entry defines the mask (effective permissions) for named entries. */ MASK } diff --git a/src/main/java/com/microsoft/azure/datalake/store/oauth2/AccessTokenProvider.java b/src/main/java/com/microsoft/azure/datalake/store/oauth2/AccessTokenProvider.java deleted file mode 100644 index f27fb13..0000000 --- a/src/main/java/com/microsoft/azure/datalake/store/oauth2/AccessTokenProvider.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - * See License.txt in the project root for license information. - */ - -package com.microsoft.azure.datalake.store.oauth2; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Date; - -/** - * Returns an Azure Active Directory token when requested. The provider can cache the token if it has already - * retrieved one. If it does, then the provider is responsible for checking expiry and refreshing as needed. - *

- * In other words, this is is a token cache that fetches tokens when requested, if the cached token has expired. - *

- */ -public abstract class AccessTokenProvider { - - protected AzureADToken token; - private static final Logger log = LoggerFactory.getLogger("com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider"); - - /** - * returns the {@link AzureADToken} cached (or retrieved) by this instance. - * - * @return {@link AzureADToken} containing the access token - * @throws IOException if there is an error fetching the token - */ - public synchronized AzureADToken getToken() throws IOException { - if (isTokenAboutToExpire()) { - log.debug("AAD Token is missing or expired: Calling refresh-token from abstract base class"); - token = refreshToken(); - } - return token; - } - - /** - * the method to fetch the access token. Derived classes should override this method to - * actually get the token from Azure Active Directory. - *

- * This method will be called initially, and then once when the token is about to expire. - *

- * - * - * @return {@link AzureADToken} containing the access token - * @throws IOException if there is an error fetching the token - */ - protected abstract AzureADToken refreshToken() throws IOException; - - /** - * Checks if the token is about to expire in the next 5 minutes. The 5 minute allowance is to - * allow for clock skew and also to allow for token to be refreshed in that much time. - * - * - * @return true if the token is expiring in next 5 minutes - */ - protected boolean isTokenAboutToExpire() { - if (token==null) { - log.debug("AADToken: no token. Returning expiring=true"); - return true; // no token should have same response as expired token - } - if (token.expiry == null) { - log.debug("AADToken: no token expiry set. Returning expiring=true"); - return true; // if don't know expiry then assume expired (should not happen with a correctly implemented token) - } - long offset = FIVE_MINUTES; - long approximatelyNow = System.currentTimeMillis() + offset; // allow x minutes for clock skew, depends on type of provider - boolean expiring = (token.expiry.getTime() < approximatelyNow); - if (expiring) { - log.debug("AADToken: token expiring: " + token.expiry.toString() + " : " + offset + " milliseconds window: " + new Date(approximatelyNow).toString()); - } - - return expiring; - } - private static final long FIVE_MINUTES = 300 * 1000; // 5 minutes in milliseconds -} diff --git a/src/main/java/com/microsoft/azure/datalake/store/oauth2/AzureADAuthenticator.java b/src/main/java/com/microsoft/azure/datalake/store/oauth2/AzureADAuthenticator.java deleted file mode 100644 index 02313e5..0000000 --- a/src/main/java/com/microsoft/azure/datalake/store/oauth2/AzureADAuthenticator.java +++ /dev/null @@ -1,388 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - * See License.txt in the project root for license information. - */ - -package com.microsoft.azure.datalake.store.oauth2; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.microsoft.azure.datalake.store.QueryParams; -import com.microsoft.azure.datalake.store.retrypolicies.ExponentialBackoffPolicy; -import com.microsoft.azure.datalake.store.retrypolicies.ExponentialBackoffPolicyforMSI; -import com.microsoft.azure.datalake.store.retrypolicies.RetryPolicy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.Date; -import java.util.Hashtable; -import java.util.UUID; - -/** - * This class provides convenience methods to obtain AAD tokens. While convenient, it is not necessary to - * use these methods to obtain the tokens. Customers can use any other method (e.g., using the adal4j client) - * to obtain tokens. - */ - -public class AzureADAuthenticator { - - private static final Logger log = LoggerFactory.getLogger(AzureADAuthenticator.class.getName()); - static final String resource = "https://datalake.azure.net/"; - - /** - * gets Azure Active Directory token using the user ID and password of a service principal (that is, Web App - * in Azure Active Directory). - *

- * Azure Active Directory allows users to set up a web app as a service principal. Users can optionally - * obtain service principal keys from AAD. This method gets a token using a service principal's client ID - * and keys. In addition, it needs the token endpoint associated with the user's directory. - *

- * - * - * @param authEndpoint the OAuth 2.0 token endpoint associated with the user's directory - * (obtain from Active Directory configuration) - * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration - * @param clientSecret the secret key of the client web app - * @return {@link AzureADToken} obtained using the creds - * @throws IOException throws IOException if there is a failure in connecting to Azure AD - */ - public static AzureADToken getTokenUsingClientCreds(String authEndpoint, String clientId, String clientSecret) - throws IOException - { - QueryParams qp = new QueryParams(); - - qp.add("resource", resource); - qp.add("grant_type","client_credentials"); - qp.add("client_id", clientId); - qp.add("client_secret", clientSecret); - log.debug("AADToken: starting to fetch token using client creds for client ID " + clientId ); - - return getTokenCall(authEndpoint, qp.serialize(), null, null, null); - } - - - /** - * Gets AAD token from the local virtual machine's VM extension. This only works on an Azure VM with MSI extension - * enabled. - * - * @deprecated Deprecated, use the other overloads instead. With the change to the way MSI is done in - * Azure Active Directory, the parameters on this call (localPort) are not relevant anymore. - * - * @param localPort port at which the MSI extension is running. If 0 or negative number is specified, then assume - * default port number of 50342. - * @param tenantGuid (optional) The guid of the AAD tenant. Can be {@code null}. - * @return {@link AzureADToken} obtained using the creds - * @throws IOException throws IOException if there is a failure in obtaining the token - */ - @Deprecated - public static AzureADToken getTokenFromMsi(int localPort, String tenantGuid) throws IOException { - return getTokenFromMsi(tenantGuid, null, false); - } - - /** - * Gets AAD token from the local virtual machine's VM extension. This only works on an Azure VM with MSI extension - * enabled. - * - * @param tenantGuid (optional) The guid of the AAD tenant. Can be {@code null}. - * @param clientId (optional) The clientId guid of the MSI service principal to use. Can be {@code null}. - * @param bypassCache {@code boolean} specifying whether a cached token is acceptable or a fresh token - * request should me made to AAD - * @return {@link AzureADToken} obtained using the creds - * @throws IOException throws IOException if there is a failure in obtaining the token - */ - public static AzureADToken getTokenFromMsi(String tenantGuid, String clientId, boolean bypassCache) throws IOException { - String authEndpoint = "http://169.254.169.254/metadata/identity/oauth2/token"; - - QueryParams qp = new QueryParams(); - qp.add("api-version", "2018-02-01"); - qp.add("resource", resource); - - - if (tenantGuid != null && tenantGuid.length() > 0) { - String authority = "https://login.microsoftonline.com/" + tenantGuid; - qp.add("authority", authority); - } - - if (clientId != null && clientId.length() > 0) { - qp.add("client_id", clientId); - } - - if (bypassCache) { - qp.add("bypass_cache", "true"); - } - - Hashtable headers = new Hashtable(); - headers.put("Metadata", "true"); - - RetryPolicy retryPolicy = new ExponentialBackoffPolicyforMSI(3, 1000, 2); - - log.debug("AADToken: starting to fetch token using MSI"); - return getTokenCall(authEndpoint, qp.serialize(), headers, "GET", retryPolicy); - } - - /** - * gets Azure Active Directory token using refresh token - * - * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration - * @param refreshToken the refresh token - * @return {@link AzureADToken} obtained using the refresh token - * @throws IOException throws IOException if there is a failure in connecting to Azure AD - */ - public static AzureADToken getTokenUsingRefreshToken(String clientId, String refreshToken) - throws IOException - { - String authEndpoint = "https://login.microsoftonline.com/Common/oauth2/token"; - - QueryParams qp = new QueryParams(); - qp.add("grant_type", "refresh_token"); - qp.add("refresh_token", refreshToken); - if (clientId != null) qp.add("client_id", clientId); - log.debug("AADToken: starting to fetch token using refresh token for client ID " + clientId ); - - return getTokenCall(authEndpoint, qp.serialize(), null, null, null); - } - - /** - * gets Azure Active Directory token using the user's username and password. This only - * works if the identity can be authenticated directly by microsoftonline.com. It will likely - * not work if the domain is federated and/or multi-factor authentication or other form of - * strong authentication is configured for the user. - *

- * @deprecated - * Due to security concerns with user ID and password,this auth method is deprecated. Please use - * device code authentication instead for interactive user-based authentication. - *

- * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration - * @param username the user name of the user - * @param password the password of the user - * @return {@link AzureADToken} obtained using the user's creds - * @throws IOException throws IOException if there is a failure in connecting to Azure AD - */ - @Deprecated - public static AzureADToken getTokenUsingUserCreds(String clientId, String username, String password) - throws IOException - { - String authEndpoint = "https://login.microsoftonline.com/Common/oauth2/token"; - - QueryParams qp = new QueryParams(); - qp.add("grant_type", "password"); - qp.add("resource", resource); - qp.add("scope", "openid"); - qp.add("client_id", clientId); - qp.add("username",username); - qp.add("password",password); - log.debug("AADToken: starting to fetch token using username for user " + username ); - - return getTokenCall(authEndpoint, qp.serialize(), null, null, null); - } - - private static class HttpException extends IOException { - public int httpErrorCode; - public String requestId; - - public HttpException(int httpErrorCode, String requestId, String message) { - super(message); - this.httpErrorCode = httpErrorCode; - this.requestId = requestId; - } - } - - private static AzureADToken getTokenCall(String authEndpoint, String body, Hashtable headers, String httpMethod, RetryPolicy retryPolicy) - throws IOException { - AzureADToken token = null; - - RetryPolicy retryPolicyUsed; - retryPolicyUsed = (retryPolicy != null) ? - retryPolicy : new ExponentialBackoffPolicy(3, 0, 1000, 2); - - int httperror = 0; - String requestId; - String httpExceptionMessage = null; - IOException ex = null; - boolean succeeded = false; - String clientRequestId = UUID.randomUUID().toString(); - int retryCount = 0; - if(headers == null){ - headers = new Hashtable(); - } - do { - httperror = 0; - ex = null; - try { - headers.put("client-request-id", clientRequestId + "."+retryCount); - token = getTokenSingleCall(authEndpoint, body, headers, httpMethod); - retryCount++; - } catch (HttpException e) { - httperror = e.httpErrorCode; - requestId = e.requestId; - httpExceptionMessage = e.getMessage(); - } catch (IOException e) { - ex = e; - } - succeeded = ((httperror == 0) && (ex == null)); - } while (!succeeded && retryPolicyUsed.shouldRetry(httperror, ex)); - if (!succeeded) { - if (ex != null) throw ex; - if (httperror!=0) throw new IOException(httpExceptionMessage); - } - return token; - } - - private static AzureADToken getTokenSingleCall(String authEndpoint, String payload, Hashtable headers, String httpMethod) - throws IOException { - - AzureADToken token = null; - HttpURLConnection conn = null; - String urlString = authEndpoint; - - httpMethod = (httpMethod == null) ? "POST" : httpMethod; - if (httpMethod.equals("GET")) { - urlString = urlString + "?" + payload; - } - long startTime = System.nanoTime(); - long totalTime = 0; - - try { - URL url = new URL(urlString); - conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod(httpMethod); - conn.setReadTimeout(30000); - conn.setConnectTimeout(30000); - - if (headers != null && headers.size() > 0) { - for (String name : headers.keySet()) { - conn.setRequestProperty(name, headers.get(name)); - } - } - - conn.setRequestProperty("Connection", "close"); - - if (httpMethod.equals("POST")) { - conn.setDoOutput(true); - conn.getOutputStream().write(payload.getBytes("UTF-8")); - } - - int httpResponseCode = conn.getResponseCode(); - String requestId = conn.getHeaderField("x-ms-request-id"); - String responseContentType = conn.getHeaderField("Content-Type"); - long responseContentLength = conn.getHeaderFieldLong("Content-Length", 0); - requestId = requestId == null ? "" : requestId; - - if (httpResponseCode == 200 && responseContentType.startsWith("application/json") && responseContentLength > 0) { - InputStream httpResponseStream = conn.getInputStream(); - token = parseTokenFromStream(httpResponseStream); - - if (log.isDebugEnabled()) { - totalTime = System.nanoTime() - startTime; - String logMessage = - "AADToken: HTTP connection succeeded for getting token from AzureAD. Http response: " - + httpResponseCode + " " + conn.getResponseMessage() - + " Content-Type: " + responseContentType - + " Content-Length: " + responseContentLength - + " Request ID: " + requestId.toString() - + " Client Request Id: " + headers.get("client-request-id") - + " Latency(ns) : " + totalTime; - log.debug(logMessage); - } - } else { - String responseBody = consumeInputStream(conn.getInputStream(), 1024); - totalTime = System.nanoTime() - startTime; - String proxies = "none"; - String httpProxy=System.getProperty("http.proxy"); - String httpsProxy=System.getProperty("https.proxy"); - if (httpProxy!=null || httpsProxy!=null) { - proxies = "http:" + httpProxy + ";https:" + httpsProxy; - } - String logMessage = - "AADToken: HTTP connection failed for getting token from AzureAD. Http response: " - + httpResponseCode + " " + conn.getResponseMessage() - + " Content-Type: " + responseContentType - + " Content-Length: " + responseContentLength - + " Request ID: " + requestId.toString() - + " Client Request Id: " + headers.get("client-request-id") - + " Latency(ns) : " + totalTime - + " Proxies: " + proxies - + " First 1K of Body: " + responseBody; - log.debug(logMessage); - throw new HttpException(httpResponseCode, requestId, logMessage); - } - } catch(IOException e) { - totalTime = System.nanoTime() - startTime; - String logMessage = - "AADToken: HTTP connection failed for getting token from AzureAD due to timeout. " - + " Client Request Id :" + headers.get("client-request-id") - + " Latency(ns) : " + totalTime; - log.debug(logMessage); - throw new IOException(logMessage, e); - } - finally { - if (conn != null) conn.disconnect(); - } - return token; - } - - private static AzureADToken parseTokenFromStream(InputStream httpResponseStream) throws IOException { - AzureADToken token = new AzureADToken(); - try { - int expiryPeriodRelative = 0; - long expiryPeriodActual = -1; - JsonFactory jf = new JsonFactory(); - JsonParser jp = jf.createParser(httpResponseStream); - String fieldName, fieldValue; - jp.nextToken(); - while (jp.hasCurrentToken()) { - if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { - fieldName = jp.getCurrentName(); - jp.nextToken(); // field value - fieldValue = jp.getText(); - - if (fieldName.equals("access_token")) token.accessToken = fieldValue; - if (fieldName.equals("expires_in")) expiryPeriodRelative = Integer.parseInt(fieldValue); - if (fieldName.equals("expires_on")) expiryPeriodActual = Integer.parseInt(fieldValue); - } - jp.nextToken(); - } - jp.close(); - boolean expiresOn = false; - if(expiryPeriodActual>0) { - // convert expiryPeriodActual to milliseconds - token.expiry = new Date(expiryPeriodActual*1000); - expiresOn=true; - } - else { - long expiry = System.currentTimeMillis(); - expiry = expiry + expiryPeriodRelative * 1000L; // convert expiryPeriod to milliseconds and add - token.expiry = new Date(expiry); - } - log.debug("AADToken: fetched token with expiry " + token.expiry.toString() + " expiresOn passed: "+expiresOn); - } catch (Exception ex) { - log.debug("AADToken: got exception when parsing json token " + ex.toString()); - throw ex; - } finally { - httpResponseStream.close(); - } - return token; - } - - private static String consumeInputStream(InputStream inStream, int length) throws IOException { - byte[] b = new byte[length]; - int totalBytesRead = 0; - int bytesRead = 0; - - do { - bytesRead = inStream.read(b, totalBytesRead, length - totalBytesRead); - if (bytesRead > 0) { - totalBytesRead += bytesRead; - } - } while (bytesRead >= 0 && totalBytesRead < length); - - return new String(b, 0, totalBytesRead); - } -} - - diff --git a/src/main/java/com/microsoft/azure/datalake/store/oauth2/AzureADToken.java b/src/main/java/com/microsoft/azure/datalake/store/oauth2/AzureADToken.java deleted file mode 100644 index 2c19626..0000000 --- a/src/main/java/com/microsoft/azure/datalake/store/oauth2/AzureADToken.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - * See License.txt in the project root for license information. - */ - -package com.microsoft.azure.datalake.store.oauth2; - -import java.util.Date; - - -/** - * Object represnting the AAD access token to use when making HTTP requests to Azure Data Lake Storage. - */ -public class AzureADToken { - public String accessToken; - public Date expiry; -} \ No newline at end of file diff --git a/src/main/java/com/microsoft/azure/datalake/store/oauth2/ClientCredsTokenProvider.java b/src/main/java/com/microsoft/azure/datalake/store/oauth2/ClientCredsTokenProvider.java deleted file mode 100644 index dabe857..0000000 --- a/src/main/java/com/microsoft/azure/datalake/store/oauth2/ClientCredsTokenProvider.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - * See License.txt in the project root for license information. - */ - -package com.microsoft.azure.datalake.store.oauth2; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Provides tokens based on client credentials - */ -public class ClientCredsTokenProvider extends AccessTokenProvider { - - private static final Logger log = LoggerFactory.getLogger("com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider"); - private final String authEndpoint, clientId, clientSecret; - - /** - * constructs a token provider based on supplied credentials. - * - * @param authEndpoint the OAuth 2.0 token endpoint associated with the user's directory - * (obtain from Active Directory configuration) - * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration - * @param clientSecret the secret key of the client web app - */ - public ClientCredsTokenProvider(String authEndpoint, String clientId, String clientSecret) { - this.authEndpoint = authEndpoint; - this.clientId = clientId; - this.clientSecret = clientSecret; - } - - @Override - protected AzureADToken refreshToken() throws IOException { - log.debug("AADToken: refreshing client-credential based token"); - return AzureADAuthenticator.getTokenUsingClientCreds(authEndpoint, clientId, clientSecret); - } -} diff --git a/src/main/java/com/microsoft/azure/datalake/store/oauth2/DeviceCodeCallback.java b/src/main/java/com/microsoft/azure/datalake/store/oauth2/DeviceCodeCallback.java deleted file mode 100644 index 7e020a0..0000000 --- a/src/main/java/com/microsoft/azure/datalake/store/oauth2/DeviceCodeCallback.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - * See License.txt in the project root for license information. - */ - -package com.microsoft.azure.datalake.store.oauth2; - -/** - * - * Shows the login message for device code to user. The default implementation shows on the console. - * Subclasses can override the {@link DeviceCodeCallback#showDeviceCodeMessage(DeviceCodeInfo)} method to - * display the message in a different way, appropriate for the context the program is running in. - * - */ -class DeviceCodeCallback { - - private static DeviceCodeCallback defaultInstance = new DeviceCodeCallback(); - - /** - * Show the message to the user, instructing them to log in using the browser. - * This method displays the message on standard output; subclasses may display - * it differently. - * - * @param dcInfo {@link DeviceCodeInfo} object containing the info to display - */ - public void showDeviceCodeMessage(DeviceCodeInfo dcInfo) { - System.out.println(dcInfo.message); - } - - /** - * Returns an instance of the default {@link DeviceCodeCallback} - * - * @return an instance of the default {@link DeviceCodeCallback} - */ - public static DeviceCodeCallback getDefaultInstance() { - return defaultInstance; - } - -} - diff --git a/src/main/java/com/microsoft/azure/datalake/store/oauth2/DeviceCodeTokenProvider.java b/src/main/java/com/microsoft/azure/datalake/store/oauth2/DeviceCodeTokenProvider.java deleted file mode 100644 index 8c9b88e..0000000 --- a/src/main/java/com/microsoft/azure/datalake/store/oauth2/DeviceCodeTokenProvider.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - * See License.txt in the project root for license information. - */ - -package com.microsoft.azure.datalake.store.oauth2; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * - * Enables interactive login in non-browser based contexts. Displays a message asking the user to login using - * the browser, using the provided link and code. - * - * - */ -public class DeviceCodeTokenProvider extends AccessTokenProvider { - private static final Logger log = LoggerFactory.getLogger("com.microsoft.azure.datalake.store.oauth2.DeviceCodeTokenProvider"); - private RefreshTokenBasedTokenProvider tokenProviderInternal = null; - private String refreshTokenString = null; - - /** - * Prompts user to log in and constructs a tokenProvider based on the refresh token obtained from the login. - * - * @param appId the app ID whose name Azure AD will display on the login screen - * @throws IOException in case of errors - */ - - public DeviceCodeTokenProvider(String appId) throws IOException { - this(appId, null); - } - - /** - * Prompts user to log in and constructs a tokenProvider based on the refresh token obtained from the login. - * - * @param appId the app ID whose name Azure AD will display on the login screen. Can be null. - * If not provided, a default appId will be used. - * @param callback callback that can display the message to the user on how to login. Can be null. - * If not provided, the default callback will be used which diplays the message on standard output. - * @throws IOException in case of errors - */ - public DeviceCodeTokenProvider(String appId, DeviceCodeCallback callback) throws IOException { - if (appId == null || appId.trim().length() == 0) throw new IllegalArgumentException("appId is required"); - if (callback == null) callback = DeviceCodeCallback.getDefaultInstance(); - - RefreshTokenInfo token = DeviceCodeTokenProviderHelper.getRefreshToken(appId, callback); - refreshTokenString = token.refreshToken; - tokenProviderInternal = new RefreshTokenBasedTokenProvider(null, token); - } - - @Override - protected AzureADToken refreshToken() throws IOException { - return tokenProviderInternal.refreshToken(); - } - - public String getRefreshToken() { - return refreshTokenString; - } - -} diff --git a/src/main/java/com/microsoft/azure/datalake/store/oauth2/DeviceCodeTokenProviderHelper.java b/src/main/java/com/microsoft/azure/datalake/store/oauth2/DeviceCodeTokenProviderHelper.java deleted file mode 100644 index bf688bb..0000000 --- a/src/main/java/com/microsoft/azure/datalake/store/oauth2/DeviceCodeTokenProviderHelper.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - * See License.txt in the project root for license information. - */ - -package com.microsoft.azure.datalake.store.oauth2; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.microsoft.azure.datalake.store.QueryParams; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.Date; - -/** - * Internal use. Contains device code info. - */ -class DeviceCodeInfo { - String usercode; - String verificationUrl; - String message; - String devicecode; - int pollingInterval; - Date expiry; - String clientId; -} - - -/** - * Internal use. Methods to obtain the refresh token using interactive login (using device code AAD flow). - * - */ -class DeviceCodeTokenProviderHelper { - - /* - AAD .net sample: https://azure.microsoft.com/en-us/resources/samples/active-directory-dotnet-deviceprofile/ - How this stuff works: https://developers.google.com/identity/protocols/OAuth2ForDevices?hl=en - */ - - private static final Logger log = LoggerFactory.getLogger("com.microsoft.azure.datalake.store.oauth2.DeviceCodeTokenProvider"); - private static final String defaultAppId = "c8964590-6116-42e6-8a29-ec0865dff3d5"; - private static final String resource = AzureADAuthenticator.resource; - private static final String deviceCodeUrl = "https://login.microsoftonline.com/common/oauth2/devicecode"; - private static final String tokenUrl = "https://login.microsoftonline.com/common/oauth2/token"; - - public static RefreshTokenInfo getRefreshToken(String appId, DeviceCodeCallback callback) throws IOException { - if (appId == null) appId = defaultAppId; - if (callback == null) callback = DeviceCodeCallback.getDefaultInstance(); - - DeviceCodeInfo dcInfo = getDeviceCodeInfo(appId); - log.debug("AADToken: obtained device code, prompting user to login through browser"); - callback.showDeviceCodeMessage(dcInfo); - RefreshTokenInfo token = getTokenFromDeviceCode(dcInfo); - log.debug("AADToken: obtained refresh token from device-code based user login"); - return token; - } - - - private static DeviceCodeInfo getDeviceCodeInfo(String appId) throws IOException { - - QueryParams qp = new QueryParams(); - qp.add("resource", resource); - qp.add("client_id", appId); - String queryString = qp.serialize(); - - URL url = new URL(deviceCodeUrl + "?" + queryString); - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod("GET"); - - DeviceCodeInfo dcInfo = new DeviceCodeInfo(); - dcInfo.clientId = appId; - int httpResponseCode = conn.getResponseCode(); - if (httpResponseCode == 200) { - InputStream httpResponseStream = conn.getInputStream(); - try { - int expiryPeriod = 0; - - JsonFactory jf = new JsonFactory(); - JsonParser jp = jf.createParser(httpResponseStream); - String fieldName, fieldValue; - jp.nextToken(); - while (jp.hasCurrentToken()) { - if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { - fieldName = jp.getCurrentName(); - jp.nextToken(); // field value - fieldValue = jp.getText(); - - if (fieldName.equals("user_code")) dcInfo.usercode = fieldValue; - if (fieldName.equals("device_code")) dcInfo.devicecode = fieldValue; - if (fieldName.equals("verification_url")) dcInfo.verificationUrl = fieldValue; - if (fieldName.equals("message")) dcInfo.message = fieldValue; - if (fieldName.equals("expires_in")) expiryPeriod = Integer.parseInt(fieldValue); - if (fieldName.equals("interval")) dcInfo.pollingInterval = Integer.parseInt(fieldValue); - } - jp.nextToken(); - } - jp.close(); - long expiry = System.currentTimeMillis(); - expiry = expiry + expiryPeriod * 1000L; // convert expiryPeriod to milliseconds and add - dcInfo.expiry = new Date(expiry); - } finally { - httpResponseStream.close(); - } - } else { - String message = "Failed to get device code from AzureAD. Http response: " + httpResponseCode + " " + conn.getResponseMessage(); - log.debug(message); - throw new IOException(message); - } - log.debug("Obtained device code from AAD: " + dcInfo.usercode); - return dcInfo; - } - - private static RefreshTokenInfo getTokenFromDeviceCode(final DeviceCodeInfo dcInfo) throws IOException { - RefreshTokenInfo refreshToken = null; - int sleepDuration = (dcInfo.pollingInterval + 1) * 1000; - while (dcInfo.expiry.getTime() > (new Date()).getTime() && refreshToken == null) { - try { - Thread.sleep(sleepDuration); - refreshToken = getTokenInternal(dcInfo.devicecode, dcInfo.clientId); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); // http://www.ibm.com/developerworks/library/j-jtp05236/ - } catch (Exception ex) { - log.debug("Exception getting token from device code " + ex.toString()); - throw ex; - } - } - return refreshToken; - } - - private static RefreshTokenInfo getTokenInternal(final String deviceCode, final String clientId) throws IOException { - QueryParams qp = new QueryParams(); - qp.add("resource", resource); - qp.add("client_id", clientId); - qp.add("grant_type", "device_code"); - qp.add("code", deviceCode); - String bodyString = qp.serialize(); - - URL url = new URL(tokenUrl); - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod("POST"); - conn.setDoOutput(true); - conn.getOutputStream().write(bodyString.getBytes("UTF-8")); - - RefreshTokenInfo token = new RefreshTokenInfo(); - String tokentype = null; - String scope = null; - int httpResponseCode = conn.getResponseCode(); - if (httpResponseCode == 200) { - InputStream httpResponseStream = conn.getInputStream(); - try { - int expiryPeriod = 0; - JsonFactory jf = new JsonFactory(); - JsonParser jp = jf.createParser(httpResponseStream); - String fieldName, fieldValue; - jp.nextToken(); - while (jp.hasCurrentToken()) { - if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { - fieldName = jp.getCurrentName(); - jp.nextToken(); // field value - fieldValue = jp.getText(); - - if (fieldName.equals("token_type")) tokentype = fieldValue; - if (fieldName.equals("scope")) scope = fieldValue; - if (fieldName.equals("expires_in")) expiryPeriod = Integer.parseInt(fieldValue); - if (fieldName.equals("access_token")) token.accessToken = fieldValue; - if (fieldName.equals("refresh_token")) token.refreshToken = fieldValue; - } - jp.nextToken(); - } - jp.close(); - - if (!"Bearer".equals(tokentype) || !"user_impersonation".equals(scope) ) { - throw new IOException("not sure what kind of token we got"); - } - - long expiry = System.currentTimeMillis(); - expiry = expiry + expiryPeriod * 1000L; // convert expiryPeriod to milliseconds and add - token.accessTokenExpiry = new Date(expiry); - return token; - } catch (Exception ex) { - log.debug("Exception retrieving token from AAD response" + ex.toString()); - throw ex; - } finally { - httpResponseStream.close(); - } - } else if (httpResponseCode == 400) { - InputStream httpResponseStream = conn.getErrorStream(); - try { - String error = null; - - JsonFactory jf = new JsonFactory(); - JsonParser jp = jf.createParser(httpResponseStream); - String fieldName, fieldValue; - jp.nextToken(); - while (jp.hasCurrentToken()) { - if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { - fieldName = jp.getCurrentName(); - jp.nextToken(); // field value - fieldValue = jp.getText(); - - if (fieldName.equals("error")) error = fieldValue; - } - jp.nextToken(); - } - jp.close(); - - if (!"authorization_pending".equals(error)) { - String message = "Failed to acquire token from AzureAD. Http response: " + httpResponseCode + " Error: " + error; - log.debug(message); - throw new IOException(message); - } else { - log.debug("polled AAD for token, got authorization_pending (still waiting for user to complete login)"); - } - } finally { - httpResponseStream.close(); - } - } else { - String message = "Failed to acquire token from AzureAD. Http response: " + httpResponseCode + " " + conn.getResponseMessage(); - log.debug(message); - throw new IOException(message); - } - return null; - } - - private static void printDeviceCodeInfo(DeviceCodeInfo dcInfo) { - System.out.println("UserCode: " + dcInfo.usercode); - System.out.println("VerificationUrl: " + dcInfo.verificationUrl); - System.out.println("Polling Interval: " + dcInfo.devicecode); - System.out.println("Expires: " + dcInfo.expiry); - System.out.println("Message: " + dcInfo.message); - System.out.println("Devicecode: " + dcInfo.devicecode); - System.out.println(); - } -} diff --git a/src/main/java/com/microsoft/azure/datalake/store/oauth2/MsiTokenProvider.java b/src/main/java/com/microsoft/azure/datalake/store/oauth2/MsiTokenProvider.java deleted file mode 100644 index 74022f6..0000000 --- a/src/main/java/com/microsoft/azure/datalake/store/oauth2/MsiTokenProvider.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - * See License.txt in the project root for license information. - */ - -package com.microsoft.azure.datalake.store.oauth2; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Provides tokens based on Azure VM's Managed Service Identity - */ -public class MsiTokenProvider extends AccessTokenProvider { - - private static final Logger log = LoggerFactory.getLogger("com.microsoft.azure.datalake.store.oauth2.MsiTokenProvider"); - private final int localPort = -1; - private final String tenantGuid; - private final String clientId; - private long tokenFetchTime =-1; - /** - * Constructs a token provider that fetches tokens from the MSI token-service running on an Azure IaaS VM. This - * only works on an Azure VM with the MSI extansion enabled. - */ - public MsiTokenProvider() { - this(null, null); - } - - /** - * Constructs a token provider that fetches tokens from the MSI token-service running on an Azure IaaS VM. This - * only works on an Azure VM with the MSI extansion enabled. - * - * @deprecated localPort is not relevant anymore in the new MSI mechanism - * - * @param localPort port on localhost for the MSI token service. (the port that was set in the deployment template). - * If 0 or negative number is specified, then assume default port number of 50342. - */ - @Deprecated - public MsiTokenProvider(int localPort) { - this(null, null); - } - - /** - * Constructs a token provider that fetches tokens from the MSI token-service running on an Azure IaaS VM. This - * only works on an Azure VM with the MSI extansion enabled. - * - * @deprecated localPort is not relevant anymore in the new MSI mechanism - * - * @param localPort port on localhost for the MSI token service. (the port that was set in the deployment template). - * If 0 or negative number is specified, then assume default port number of 50342. - * @param tenantGuid (optional) AAD Tenant ID {@code guid}. Can be {@code null}. - */ - @Deprecated - public MsiTokenProvider(int localPort, String tenantGuid) { - this(tenantGuid, null); - } - - public MsiTokenProvider(String tenantGuid, String clientId) { - this.tenantGuid = tenantGuid; - this.clientId = clientId; - } - - /** - * Checks if the token is about to expire as per base expiry logic. Otherwise try to expire every 1 hour - * - * - * @return true if the token is expiring in next 5 minutes - */ - @Override - protected boolean isTokenAboutToExpire() { - if( super.isTokenAboutToExpire()){ - return true; - } - if (tokenFetchTime == -1){ - return true; - } - long offset = ONE_HOUR; - if ((tokenFetchTime +offset) < System.currentTimeMillis()) { - log.debug("MSIToken: token renewing : " + offset + " milliseconds window"); - return true; - } - - return false; - } - - @Override - protected AzureADToken refreshToken() throws IOException { - log.debug("AADToken: refreshing token from MSI with expiry"); - AzureADToken newToken = AzureADAuthenticator.getTokenFromMsi(tenantGuid, clientId, false); - tokenFetchTime=System.currentTimeMillis(); - return newToken; - } - private static final long ONE_HOUR = 3600 * 1000; // 5 minutes in milliseconds -} \ No newline at end of file diff --git a/src/main/java/com/microsoft/azure/datalake/store/oauth2/RefreshTokenBasedTokenProvider.java b/src/main/java/com/microsoft/azure/datalake/store/oauth2/RefreshTokenBasedTokenProvider.java deleted file mode 100644 index 06e6077..0000000 --- a/src/main/java/com/microsoft/azure/datalake/store/oauth2/RefreshTokenBasedTokenProvider.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - * See License.txt in the project root for license information. - */ - -package com.microsoft.azure.datalake.store.oauth2; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Provides tokens based on refresh token - */ -public class RefreshTokenBasedTokenProvider extends AccessTokenProvider { - - private static final Logger log = LoggerFactory.getLogger("com.microsoft.azure.datalake.store.oauth2.RefreshTokenBasedTokenProvider"); - private final String clientId, refreshToken; - - /** - * constructs a token provider based on the refresh token provided - * - * @param refreshToken the refresh token - */ - public RefreshTokenBasedTokenProvider(String refreshToken) { - this.clientId = null; - this.refreshToken = refreshToken; - } - - /** - * constructs a token provider based on the refresh token provided - * - * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration - * @param refreshToken the refresh token - */ - public RefreshTokenBasedTokenProvider(String clientId, String refreshToken) { - this.clientId = clientId; - this.refreshToken = refreshToken; - } - - /** - * constructs a token provider based on the refresh token provided - * - * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration - * @param refreshToken the refresh token - */ - public RefreshTokenBasedTokenProvider(String clientId, RefreshTokenInfo refreshToken) { - this.clientId = clientId; - this.refreshToken = refreshToken.refreshToken; - if (refreshToken.accessToken != null && - !refreshToken.accessToken.equals("") && - refreshToken.accessTokenExpiry != null) { - this.token = new AzureADToken(); - this.token.accessToken = refreshToken.accessToken; - this.token.expiry = refreshToken.accessTokenExpiry; - } - } - - @Override - protected AzureADToken refreshToken() throws IOException { - log.debug("AADToken: refreshing refresh-token based token"); - return AzureADAuthenticator.getTokenUsingRefreshToken(clientId, refreshToken); - } -} diff --git a/src/main/java/com/microsoft/azure/datalake/store/oauth2/RefreshTokenInfo.java b/src/main/java/com/microsoft/azure/datalake/store/oauth2/RefreshTokenInfo.java deleted file mode 100644 index 6b39fa8..0000000 --- a/src/main/java/com/microsoft/azure/datalake/store/oauth2/RefreshTokenInfo.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - * See License.txt in the project root for license information. - */ - -package com.microsoft.azure.datalake.store.oauth2; - - -import java.util.Date; - -/** - * Information about the refresh token, and the associated access token - * - */ -public class RefreshTokenInfo { - public String accessToken; - public String refreshToken; - public Date accessTokenExpiry; -} \ No newline at end of file diff --git a/src/main/java/com/microsoft/azure/datalake/store/oauth2/UserPasswordTokenProvider.java b/src/main/java/com/microsoft/azure/datalake/store/oauth2/UserPasswordTokenProvider.java deleted file mode 100644 index 7d7924a..0000000 --- a/src/main/java/com/microsoft/azure/datalake/store/oauth2/UserPasswordTokenProvider.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.microsoft.azure.datalake.store.oauth2; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Provides tokens based on username and password - */ -public class UserPasswordTokenProvider extends AccessTokenProvider { - - private static final Logger log = LoggerFactory.getLogger("com.microsoft.azure.datalake.store.oauth2.UserPasswordTokenProvider"); - private final String clientId, username, password; - - /** - * constructs a token provider based on supplied credentials. - * - * @param username the username - * @param clientId the client ID (GUID) obtained from Azure Active Directory configuration - * @param password the password - */ - public UserPasswordTokenProvider(String clientId, String username, String password) { - this.clientId = clientId; - this.username = username; - this.password = password; - } - - @Override - protected AzureADToken refreshToken() throws IOException { - log.debug("AADToken: refreshing user-password based token"); - return AzureADAuthenticator.getTokenUsingClientCreds(clientId, username, password); - } -} diff --git a/src/main/java/com/microsoft/azure/datalake/store/oauth2/package-info.java b/src/main/java/com/microsoft/azure/datalake/store/oauth2/package-info.java deleted file mode 100644 index 9c5c413..0000000 --- a/src/main/java/com/microsoft/azure/datalake/store/oauth2/package-info.java +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. - * See License.txt in the project root for license information. - */ - -/** - * Classes to help fetch and manage Azure ActiveDirectory tokens - */ -package com.microsoft.azure.datalake.store.oauth2; \ No newline at end of file diff --git a/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/ExponentialBackoffPolicy.java b/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/ExponentialBackoffPolicy.java index 9243547..62ac3b7 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/ExponentialBackoffPolicy.java +++ b/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/ExponentialBackoffPolicy.java @@ -23,26 +23,57 @@ public class ExponentialBackoffPolicy implements RetryPolicy { private int exponentialFactor = 4; private long lastAttemptStartTime = System.nanoTime(); + /** + * Creates a policy using the default retry counts and intervals. + */ public ExponentialBackoffPolicy() { } /** + * Creates a policy configured with a custom retry count and base backoff interval. + * * @param maxRetries maximum number of retries - * @param linearRetryInterval interval to use for linear retries (in milliseconds). - * Deprecated, not used in the retry policy. * @param exponentialRetryInterval (starting) interval to use for exponential backoff retries (in milliseconds) */ - public ExponentialBackoffPolicy(int maxRetries, @Deprecated int linearRetryInterval, int exponentialRetryInterval) { - this.maxRetries = maxRetries; - this.exponentialRetryInterval = exponentialRetryInterval; + public ExponentialBackoffPolicy(int maxRetries, int exponentialRetryInterval) { + this(maxRetries, 0, exponentialRetryInterval); } - public ExponentialBackoffPolicy(int maxRetries, @Deprecated int linearRetryInterval, int exponentialRetryInterval, int exponentialFactor) { + /** + * Creates a policy configured with a custom retry count and interval while retaining a now-ignored linear retry parameter. + * + * @param maxRetries maximum number of retries. + * @param linearRetryInterval retained for backwards compatibility and ignored. + * @param exponentialRetryInterval (starting) interval to use for exponential backoff retries (in milliseconds). + */ + public ExponentialBackoffPolicy(int maxRetries, int linearRetryInterval, int exponentialRetryInterval) { + if (linearRetryInterval < 0) { + throw new IllegalArgumentException("linearRetryInterval must be non-negative."); + } this.maxRetries = maxRetries; this.exponentialRetryInterval = exponentialRetryInterval; + } + + /** + * Creates a policy with custom retry count, interval, and multiplicative factor applied after each attempt. + * + * @param maxRetries maximum number of retries. + * @param linearRetryInterval retained for backwards compatibility and ignored. + * @param exponentialRetryInterval (starting) interval to use for exponential backoff retries (in milliseconds). + * @param exponentialFactor multiplier applied to the retry interval after each retry. + */ + public ExponentialBackoffPolicy(int maxRetries, int linearRetryInterval, int exponentialRetryInterval, int exponentialFactor) { + this(maxRetries, linearRetryInterval, exponentialRetryInterval); this.exponentialFactor = exponentialFactor; } + /** + * Determines whether another retry should be attempted for the supplied HTTP response code and exception. + * + * @param httpResponseCode HTTP status code received from the service. + * @param lastException exception thrown during the last attempt, if any. + * @return {@code true} when a retry should be attempted; otherwise {@code false}. + */ public boolean shouldRetry(int httpResponseCode, Exception lastException) { // Non-retryable error diff --git a/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/ExponentialBackoffPolicyforMSI.java b/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/ExponentialBackoffPolicyforMSI.java index eab8efe..7aa3e60 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/ExponentialBackoffPolicyforMSI.java +++ b/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/ExponentialBackoffPolicyforMSI.java @@ -23,6 +23,9 @@ public class ExponentialBackoffPolicyforMSI implements RetryPolicy { private int exponentialFactor = 4; private long lastAttemptStartTime = System.nanoTime(); + /** + * Creates a policy using the default retry counts and intervals. + */ public ExponentialBackoffPolicyforMSI() { } @@ -38,7 +41,13 @@ public ExponentialBackoffPolicyforMSI(int maxRetries, int exponentialRetryInterv this.exponentialFactor = exponentialFactor; } - + /** + * Determines whether another retry should be attempted for the supplied HTTP response code and exception. + * + * @param httpResponseCode HTTP status code received from the service. + * @param lastException exception thrown during the last attempt, if any. + * @return {@code true} when a retry should be attempted; otherwise {@code false}. + */ public boolean shouldRetry(int httpResponseCode, Exception lastException) { // Non-retryable error diff --git a/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/NoRetryPolicy.java b/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/NoRetryPolicy.java index ad835ae..dd1c2de 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/NoRetryPolicy.java +++ b/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/NoRetryPolicy.java @@ -21,6 +21,19 @@ public class NoRetryPolicy implements RetryPolicy { private int waitInterval = 100; + /** + * Creates a retry policy that retries at most once for an initial 401 response. + */ + public NoRetryPolicy() { + } + + /** + * Determines whether another retry should be attempted for the supplied HTTP response code and exception. + * + * @param httpResponseCode HTTP status code received from the service. + * @param lastException exception thrown during the last attempt, if any. + * @return {@code true} when a retry should be attempted; otherwise {@code false}. + */ public boolean shouldRetry(int httpResponseCode, Exception lastException) { if (httpResponseCode == 401 && retryCount == 0) { // to mitigate a special problem with intermittent 401's on calls diff --git a/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/NonIdempotentRetryPolicy.java b/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/NonIdempotentRetryPolicy.java index c5b4c32..8ef6a20 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/NonIdempotentRetryPolicy.java +++ b/src/main/java/com/microsoft/azure/datalake/store/retrypolicies/NonIdempotentRetryPolicy.java @@ -26,6 +26,19 @@ public class NonIdempotentRetryPolicy implements RetryPolicy { private int exponentialFactor = 4; + /** + * Creates a retry policy that only retries once for 401 responses and a limited number of times for 429 responses. + */ + public NonIdempotentRetryPolicy() { + } + + /** + * Determines whether another retry should be attempted for the supplied HTTP response code and exception. + * + * @param httpResponseCode HTTP status code received from the service. + * @param lastException exception thrown during the last attempt, if any. + * @return {@code true} when a retry should be attempted; otherwise {@code false}. + */ public boolean shouldRetry(int httpResponseCode, Exception lastException) { if (httpResponseCode == 401 && retryCount401 == 0) { // this could be because of call delay. Just retry once, in hope of token being renewed by now diff --git a/src/test/configfiles/.gitignore b/src/test/configfiles/.gitignore index 8b13789..600d2d3 100644 --- a/src/test/configfiles/.gitignore +++ b/src/test/configfiles/.gitignore @@ -1 +1 @@ - +.vscode \ No newline at end of file diff --git a/src/test/java/com/contoso/helpers/HelperUtils.java b/src/test/java/com/contoso/helpers/HelperUtils.java index 072ecf6..5b64042 100644 --- a/src/test/java/com/contoso/helpers/HelperUtils.java +++ b/src/test/java/com/contoso/helpers/HelperUtils.java @@ -7,15 +7,23 @@ package com.contoso.helpers; +import com.azure.core.credential.TokenCredential; +import com.azure.identity.ClientSecretCredentialBuilder; +import com.azure.identity.DefaultAzureCredentialBuilder; import com.microsoft.azure.datalake.store.*; import com.microsoft.azure.datalake.store.retrypolicies.ExponentialBackoffPolicy; import java.io.*; +import java.net.URI; import java.security.SecureRandom; import java.util.Properties; public class HelperUtils { + private HelperUtils() { + throw new AssertionError("Utility class"); + } + private static Properties prop = null; public static Properties getProperties() throws IOException { if (prop==null) { @@ -27,6 +35,32 @@ public static Properties getProperties() throws IOException { return prop; } + public static TokenCredential buildCredential(Properties prop) { + String clientId = trimOrNull(prop.getProperty("ClientId")); + String clientSecret = trimOrNull(prop.getProperty("ClientSecret")); + String tenantId = trimOrNull(prop.getProperty("TenantId")); + if (tenantId == null) { + tenantId = tryParseTenantId(prop.getProperty("OAuth2TokenUrl")); + } + + if (clientId != null && clientSecret != null && tenantId != null) { + ClientSecretCredentialBuilder builder = new ClientSecretCredentialBuilder() + .clientId(clientId) + .clientSecret(clientSecret) + .tenantId(tenantId); + String authorityHost = trimOrNull(prop.getProperty("AuthorityHost")); + if (authorityHost == null) { + authorityHost = tryParseAuthorityHost(prop.getProperty("OAuth2TokenUrl")); + } + if (authorityHost != null) { + builder.authorityHost(authorityHost); + } + return builder.build(); + } + + return new DefaultAzureCredentialBuilder().build(); + } + private static byte[] buf4mb = null; public static byte[] getRandom4mbBuffer() { if (buf4mb == null) { @@ -102,5 +136,52 @@ public static byte[] getSampleText2() { // length of returned array is 159 bytes } + private static String trimOrNull(String value) { + if (value == null) { + return null; + } + String trimmed = value.trim(); + return trimmed.isEmpty() ? null : trimmed; + } + + private static String tryParseTenantId(String oauthUrl) { + if (oauthUrl == null) { + return null; + } + try { + URI uri = URI.create(oauthUrl.trim()); + String path = uri.getPath(); + if (path == null) { + return null; + } + String[] segments = path.split("/"); + for (String segment : segments) { + if (segment != null && !segment.isEmpty()) { + if ("oauth2".equalsIgnoreCase(segment) || "token".equalsIgnoreCase(segment)) { + continue; + } + return segment; + } + } + } catch (IllegalArgumentException ignored) { + } + return null; + } + + private static String tryParseAuthorityHost(String oauthUrl) { + if (oauthUrl == null) { + return null; + } + try { + URI uri = URI.create(oauthUrl.trim()); + if (uri.getScheme() == null || uri.getHost() == null) { + return null; + } + return uri.getScheme() + "://" + uri.getHost(); + } catch (IllegalArgumentException ignored) { + return null; + } + } + } diff --git a/src/test/java/com/contoso/liveservicetests/TestCore.java b/src/test/java/com/contoso/liveservicetests/TestCore.java index 1c331bf..da18545 100644 --- a/src/test/java/com/contoso/liveservicetests/TestCore.java +++ b/src/test/java/com/contoso/liveservicetests/TestCore.java @@ -6,10 +6,9 @@ package com.contoso.liveservicetests; +import com.azure.core.credential.TokenCredential; import com.contoso.helpers.HelperUtils; import com.microsoft.azure.datalake.store.*; -import com.microsoft.azure.datalake.store.oauth2.AzureADAuthenticator; -import com.microsoft.azure.datalake.store.oauth2.AzureADToken; import com.microsoft.azure.datalake.store.retrypolicies.NonIdempotentRetryPolicy; import org.junit.AfterClass; @@ -32,7 +31,7 @@ public class TestCore { final UUID instanceGuid = UUID.randomUUID(); static Properties prop = null; - static AzureADToken aadToken = null; + static TokenCredential credential = null; static String directory = null; static ADLStoreClient client = null; static boolean testsEnabled = true; @@ -41,13 +40,14 @@ public class TestCore { @BeforeClass public static void setup() throws IOException { prop = HelperUtils.getProperties(); - aadToken = AzureADAuthenticator.getTokenUsingClientCreds(prop.getProperty("OAuth2TokenUrl"), - prop.getProperty("ClientId"), - prop.getProperty("ClientSecret") ); - UUID guid = UUID.randomUUID(); + credential = HelperUtils.buildCredential(prop); + String storeAccount = prop.getProperty("StoreAcct"); + if (storeAccount == null || storeAccount.trim().isEmpty()) { + Assume.assumeTrue("Skipping TestCore live tests", false); + } directory = "/" + prop.getProperty("dirName") + "/" + UUID.randomUUID(); - String account = prop.getProperty("StoreAcct") + ".azuredatalakestore.net"; - client = ADLStoreClient.createClient(account, aadToken); + String account = storeAccount + ".azuredatalakestore.net"; + client = ADLStoreClient.createClient(account, credential); testsEnabled = Boolean.parseBoolean(prop.getProperty("CoreTestsEnabled", "true")); } @@ -91,7 +91,6 @@ public void createSmallFileWithNoOverwrite() throws IOException { @Test public void createEmptyFileWithConcurrentAppend() throws IOException { - Assume.assumeTrue(false); // pending change to server behavior String filename = directory + "/" + "Core.createEmptyFileWithConcurrentAppend.txt"; System.out.println("Running createEmptyFileWithConcurrentAppend"); @@ -109,14 +108,13 @@ public void createEmptyFileWithConcurrentAppend() throws IOException { assertTrue("File type should be FILE", de.type == DirectoryEntryType.FILE); assertTrue("File length in DirectoryEntry should be 0 for null-content file", de.length == 0); - byte[] b = getFileContents(filename, contents.length * 2); + byte[] b = getFileContents(filename, 100); assertTrue("file length should be 0 for null content", b.length == 0); } @Test public void create0LengthFileWithConcurrentAppend() throws IOException { - Assume.assumeTrue(false); // pending change to server behavior String filename = directory + "/" + "Core.create0LengthFileWithConcurrentAppend.txt"; System.out.println("Running create0LengthFileWithConcurrentAppend"); @@ -234,7 +232,6 @@ public void concurrentAppendToExistingFile() throws IOException { @Test public void create4MBFile() throws IOException { - Assume.assumeTrue(false); // subsumed by TestFileSdk tests String filename = directory + "/" + "Core.Create4MBFile.txt"; System.out.println("Running create4MBFile"); @@ -248,7 +245,6 @@ public void create4MBFile() throws IOException { @Test public void create5MBFile() throws IOException { - Assume.assumeTrue(false); // subsumed by TestFileSdk tests String filename = directory + "/" + "Core.Create5MBFile.txt"; System.out.println("Running create5MBFile"); @@ -315,11 +311,15 @@ private byte[] getFileContents(String filename, int maxLength) throws IOExceptio } if (!resp.successful) throw client.getExceptionFromResponse(resp, "Error reading from file " + filename); int bytesRead; + int countBeforeRead = count; while ((bytesRead = in.read(b, count, b.length - count)) != -1) { count += bytesRead; if (count >= b.length) break; } in.close(); + if (count == countBeforeRead) { + eof = true; + } } byte[] b2 = Arrays.copyOfRange(b, 0, count); return b2; diff --git a/src/test/java/com/contoso/liveservicetests/TestFileSdk.java b/src/test/java/com/contoso/liveservicetests/TestFileSdk.java index 867748e..4fdd6f4 100644 --- a/src/test/java/com/contoso/liveservicetests/TestFileSdk.java +++ b/src/test/java/com/contoso/liveservicetests/TestFileSdk.java @@ -6,11 +6,10 @@ package com.contoso.liveservicetests; +import com.azure.core.credential.TokenCredential; import com.contoso.helpers.HelperUtils; import com.microsoft.azure.datalake.store.*; -import com.microsoft.azure.datalake.store.oauth2.AzureADAuthenticator; -import com.microsoft.azure.datalake.store.oauth2.AzureADToken; import com.microsoft.azure.datalake.store.retrypolicies.ExponentialBackoffPolicy; import org.junit.AfterClass; import org.junit.Assume; @@ -30,35 +29,35 @@ import java.lang.reflect.InvocationTargetException; import java.net.URISyntaxException; import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.regex.Pattern; @RunWith(Enclosed.class) public class TestFileSdk { - private final UUID instanceGuid = UUID.randomUUID(); - private static String directory = null; private static ADLStoreClient client = null; private static boolean testsEnabled = true; private static boolean symlinkTestsDisabled = true; + private TestFileSdk() { + throw new AssertionError("Utility class"); + } + @BeforeClass public static void setup() throws IOException { Properties prop = HelperUtils.getProperties(); - boolean useMsiForAuth = Boolean.parseBoolean(prop.getProperty("useMsi","false")); - AzureADToken aadToken; - if(useMsiForAuth){ - aadToken = AzureADAuthenticator.getTokenFromMsi(prop.getProperty("OAuth2TokenUrl"), prop.getProperty("ClientId"),false); - } - else { - aadToken = AzureADAuthenticator.getTokenUsingClientCreds(prop.getProperty("OAuth2TokenUrl"), - prop.getProperty("ClientId"), - prop.getProperty("ClientSecret")); + testsEnabled = Boolean.parseBoolean(prop.getProperty("SdkTestsEnabled", "true")); + String storeAccount = prop.getProperty("StoreAcct"); + if (!testsEnabled || storeAccount == null || storeAccount.trim().isEmpty()) { + Assume.assumeTrue("Skipping TestFileSdk live tests", false); } - UUID guid = UUID.randomUUID(); + TokenCredential credential = HelperUtils.buildCredential(prop); directory = "/" + prop.getProperty("dirName") + "/" + UUID.randomUUID(); - String account = prop.getProperty("StoreAcct") + ".azuredatalakestore.net"; - client = ADLStoreClient.createClient(account, aadToken.accessToken); - testsEnabled = Boolean.parseBoolean(prop.getProperty("SdkTestsEnabled", "true")); + String account = storeAccount + ".azuredatalakestore.net"; + client = ADLStoreClient.createClient(account, credential); client.createDirectory(directory); client.removeAllAcls(directory); @@ -70,10 +69,13 @@ public static void setup() throws IOException { @AfterClass public static void teardown() throws IOException { - client.deleteRecursive(directory); + if (client != null && directory != null) { + client.deleteRecursive(directory); + } } public static class NotParameterizedTests { + private static final int MAX_FILE_CREATION_THREADS = 16; @Test public void createDirectory() throws IOException { Assume.assumeTrue(testsEnabled); @@ -931,11 +933,9 @@ public void deleteDirectoryNonRecursive() throws IOException { @Test public void enumerateDirectory() throws IOException { Assume.assumeTrue(testsEnabled); - Assume.assumeTrue(false); // disable test for now; this is very long-running String dirname = directory + "/" + "enumerateDirectory"; System.out.println("Running enumerateDirectory"); - List list; // non-existent directory try { @@ -952,7 +952,6 @@ public void enumerateDirectory() throws IOException { list = client.enumerateDirectory(dirname, 1); assertTrue("empty directory should return 0 entries even with listsize", list.size() == 0); - // directory with single file String fn = dirname + "/f0001.txt"; HelperUtils.createEmptyFile(client, fn); @@ -977,10 +976,11 @@ public void enumerateDirectory() throws IOException { assertTrue("directory of 2 should return 1 entry with startAfter", list.size() == 1); // 1000-file directory - for (int i = 3; i<=1000; i++) { - fn = dirname + "/f" + String.format("%04d", i); - HelperUtils.createEmptyFile(client, fn); + List filesToCreate = new ArrayList<>(998); + for (int i = 3; i <= 1000; i++) { + filesToCreate.add(dirname + "/f" + String.format("%04d", i)); } + createFilesInParallel(filesToCreate, name -> HelperUtils.createEmptyFile(client, name)); list = client.enumerateDirectory(dirname); assertTrue("directory of 1000 should return 1000 entries", list.size() == 1000); list = client.enumerateDirectory(dirname, 1); @@ -995,10 +995,11 @@ public void enumerateDirectory() throws IOException { assertTrue("directory of 1000 should return 500 entries with startAfter f0500", list.size() == 500); // 4000-file directory - for (int i = 1001; i<=4000; i++) { - fn = dirname + "/f" + String.format("%04d", i); - HelperUtils.createEmptyFile(client, fn); + filesToCreate = new ArrayList<>(3000); + for (int i = 1001; i <= 4000; i++) { + filesToCreate.add(dirname + "/f" + String.format("%04d", i)); } + createFilesInParallel(filesToCreate, name -> HelperUtils.createEmptyFile(client, name)); list = client.enumerateDirectory(dirname); assertTrue("directory of 4000 should return 4000 entries", list.size() == 4000); list = client.enumerateDirectory(dirname, 1); @@ -1147,6 +1148,44 @@ public void pathPrefix() throws IOException, URISyntaxException { // delete file client.delete(fn4); } + + // Fan out large batches of file creation to shrink end-to-end test time. + private static void createFilesInParallel(List filenames, ThrowingConsumer fileCreator) throws IOException { + if (filenames == null || filenames.isEmpty()) { + return; + } + int targetThreads = Math.max(1, Runtime.getRuntime().availableProcessors()); + int threads = Math.min(MAX_FILE_CREATION_THREADS, Math.min(filenames.size(), targetThreads * 2)); + ExecutorService executor = Executors.newFixedThreadPool(threads); + try { + List> futures = new ArrayList<>(filenames.size()); + for (String name : filenames) { + futures.add(executor.submit(() -> { + fileCreator.accept(name); + return null; + })); + } + for (Future future : futures) { + future.get(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while creating files in parallel", e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } + throw new IOException("Failed to create files in parallel", cause); + } finally { + executor.shutdown(); + } + } + + @FunctionalInterface + private interface ThrowingConsumer { + void accept(T value) throws IOException; + } } @RunWith(value = Parameterized.class) public static class TestCreateWithOverwrite { @@ -1158,6 +1197,7 @@ public static Collection data() { } @Parameter public boolean useConditionalCreate; + @Test public void smallFileWithOverwrite() throws IOException { Assume.assumeTrue(testsEnabled); diff --git a/src/test/java/com/contoso/liveservicetests/TestPositionedReads.java b/src/test/java/com/contoso/liveservicetests/TestPositionedReads.java index 2ecb096..5486f03 100644 --- a/src/test/java/com/contoso/liveservicetests/TestPositionedReads.java +++ b/src/test/java/com/contoso/liveservicetests/TestPositionedReads.java @@ -6,11 +6,9 @@ package com.contoso.liveservicetests; +import com.azure.core.credential.TokenCredential; import com.contoso.helpers.HelperUtils; import com.microsoft.azure.datalake.store.*; - -import com.microsoft.azure.datalake.store.oauth2.AzureADAuthenticator; -import com.microsoft.azure.datalake.store.oauth2.AzureADToken; import org.junit.AfterClass; import org.junit.Assume; import org.junit.BeforeClass; @@ -22,7 +20,6 @@ import java.util.*; public class TestPositionedReads { - private final UUID instanceGuid = UUID.randomUUID(); private static String directory = null; private static ADLStoreClient client = null; @@ -31,22 +28,24 @@ public class TestPositionedReads { @BeforeClass public static void setup() throws IOException { Properties prop; - AzureADToken aadToken; prop = HelperUtils.getProperties(); - aadToken = AzureADAuthenticator.getTokenUsingClientCreds(prop.getProperty("OAuth2TokenUrl"), - prop.getProperty("ClientId"), - prop.getProperty("ClientSecret") ); - UUID guid = UUID.randomUUID(); - directory = "/" + prop.getProperty("dirName", "unitTests") + "/" + UUID.randomUUID(); - String account = prop.getProperty("StoreAcct") + ".azuredatalakestore.net"; - client = ADLStoreClient.createClient(account, aadToken); testsEnabled = Boolean.parseBoolean(prop.getProperty("PositionedReadsTestsEnabled", "true")); + String storeAccount = prop.getProperty("StoreAcct"); + if (!testsEnabled || storeAccount == null || storeAccount.trim().isEmpty()) { + Assume.assumeTrue("Skipping TestPositionedReads live tests", false); + } + TokenCredential credential = HelperUtils.buildCredential(prop); + directory = "/" + prop.getProperty("dirName", "unitTests") + "/" + UUID.randomUUID(); + String account = storeAccount + ".azuredatalakestore.net"; + client = ADLStoreClient.createClient(account, credential); } @AfterClass public static void teardown() throws IOException { - client.deleteRecursive(directory); + if (client != null && directory != null) { + client.deleteRecursive(directory); + } } @Test @@ -113,26 +112,4 @@ private static boolean checkByteAt(long pos, ADLFileInputStream in, byte[] conte return true; } } - - @Test - public void resizeBuffer() throws IOException { - Assume.assumeTrue(false); - String filename = directory + "/" + "PositionedReads.resizeBuffer.txt"; - - OutputStream stream = client.createFile(filename, IfExists.OVERWRITE); - byte[] content = HelperUtils.getSampleText1(); - stream.write(content); - stream.close(); - - ADLFileInputStream instr = client.getReadStream(filename); - byte[] b1 = new byte[200]; - instr.read(b1); - - instr.setBufferSize(87); - instr.read(b1); - - - - assertTrue("This test needs to be finished", false); - } } diff --git a/src/test/java/com/contoso/mocktests/TestSdkMock.java b/src/test/java/com/contoso/mocktests/TestSdkMock.java index 3e647a9..de22e2e 100644 --- a/src/test/java/com/contoso/mocktests/TestSdkMock.java +++ b/src/test/java/com/contoso/mocktests/TestSdkMock.java @@ -77,13 +77,13 @@ public void testExponentialRetryTiming() throws IOException { } long end = System.currentTimeMillis(); - long duration = end - start; - long expectedDuration = 85 * 1000; - long expectedDurationMin = expectedDuration - 1000; - long expectedDurationMax = expectedDuration + 1000; + long duration = end - start; + long expectedDuration = 85 * 1000; + long expectedDurationMin = expectedDuration - 1000; + long expectedDurationMax = expectedDuration + 1000; - assertTrue("Total time was more than max duration expected", expectedDuration < expectedDurationMax); - assertTrue("Total time was less than min duration expected", expectedDuration > expectedDurationMin); + assertTrue("Total time was more than max duration expected", duration < expectedDurationMax); + assertTrue("Total time was less than min duration expected", duration > expectedDurationMin); } @Test @@ -148,7 +148,6 @@ public void test500Then200PatternWithAtomicCreate() throws IOException { @Test public void testConnectionReset() throws IOException { - Assume.assumeTrue(false); // test half-done String filename = directory + "/" + "Mock.testConnectionReset"; server.enqueue(new MockResponse().setResponseCode(200)); // the first empty CREATE request From bd3f31ae929ca9e537780675a457131b00c72f7c Mon Sep 17 00:00:00 2001 From: Akshat Harit Date: Wed, 8 Oct 2025 11:02:00 -0700 Subject: [PATCH 2/2] Fix auto-gened code --- CHANGES.md | 3 + .../azure/datalake/store/ADLStoreClient.java | 90 +++++++++++-------- 2 files changed, 55 insertions(+), 38 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 9b4876f..9b2ecd0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,9 @@ 1. Replaced custom OAuth2 implementation with Azure Identity `TokenCredential` support. 2. Updated `ADLStoreClient` factory methods to accept Azure Identity credentials and added dependency on `com.azure:azure-identity` 1.17.0. 3. Updated live service tests to authenticate with `DefaultAzureCredential`. +4. Removed the deprecated `oauth2` module now that Azure Identity is the sole authentication mechanism. +5. Added an API to override the `TokenRequestContext` used for Azure Identity token acquisition with a default scope of `https://datalake.azure.net/.default`. +6. Simplified static access token handling to rely on caller-managed expiry. ### Version 2.3.10 1. Update log4j to mitigate CVE-2021-44228. Also update junit. diff --git a/src/main/java/com/microsoft/azure/datalake/store/ADLStoreClient.java b/src/main/java/com/microsoft/azure/datalake/store/ADLStoreClient.java index 252ac91..e1fcfd7 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ADLStoreClient.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ADLStoreClient.java @@ -39,8 +39,9 @@ public class ADLStoreClient { private final String accountFQDN; private final TokenCredential tokenCredential; - private final StaticTokenCredential staticTokenCredential; + private volatile String staticAccessToken; private AccessToken cachedAccessToken; + private volatile TokenRequestContext tokenRequestContextOverride; private static final Logger log = LoggerFactory.getLogger("com.microsoft.azure.datalake.store"); // package-default logging policy private static final AtomicLong clientIdCounter = new AtomicLong(0); private final long clientId; @@ -109,10 +110,17 @@ public class ADLStoreClient { // private constructor, references should be obtained using the createClient factory method - private ADLStoreClient(String accountFQDN, TokenCredential tokenCredential, StaticTokenCredential staticTokenCredential, long clientId) { + private ADLStoreClient(String accountFQDN, TokenCredential tokenCredential, String staticAccessToken, long clientId) { + if ((tokenCredential == null) == (staticAccessToken == null)) { + throw new IllegalArgumentException("Either tokenCredential or accessToken must be provided, but not both"); + } + if (staticAccessToken != null && staticAccessToken.trim().isEmpty()) { + throw new IllegalArgumentException("accessToken cannot be empty"); + } this.accountFQDN = accountFQDN; - this.tokenCredential = Objects.requireNonNull(tokenCredential, "tokenCredential cannot be null"); - this.staticTokenCredential = staticTokenCredential; + this.tokenCredential = tokenCredential; + this.staticAccessToken = staticAccessToken; + this.cachedAccessToken = null; this.clientId = clientId; this.userAgentString = userAgent; } @@ -149,13 +157,12 @@ public static ADLStoreClient createClient(String accountFQDN, String accessToken if (accountFQDN == null || accountFQDN.trim().equals("")) { throw new IllegalArgumentException("account name is required"); } - if (accessToken == null || accessToken.equals("")) { + if (accessToken == null || accessToken.trim().isEmpty()) { throw new IllegalArgumentException("token is required"); } long clientId = clientIdCounter.incrementAndGet(); log.trace("ADLStoreClient {} created for {} using SDK version {}", clientId, accountFQDN, sdkVersion); - StaticTokenCredential credential = new StaticTokenCredential(accessToken); - return new ADLStoreClient(accountFQDN, credential, credential, clientId); + return new ADLStoreClient(accountFQDN, null, accessToken, clientId); } ExponentialBackoffPolicy makeExponentialBackoffPolicy() { @@ -1009,6 +1016,18 @@ public synchronized void setOptions(ADLStoreOptions o) throws IOException { this.enableConditionalCreate = o.shouldEnableConditionalCreate(); } + /** + * Overrides the {@link TokenRequestContext} used when acquiring tokens from {@link TokenCredential}. + * + * @param tokenRequestContext context to use for token acquisition + */ + public synchronized void setTokenRequestContext(TokenRequestContext tokenRequestContext) { + if (tokenRequestContext == null) { + throw new IllegalArgumentException("tokenRequestContext cannot be null"); + } + this.tokenRequestContextOverride = cloneTokenRequestContext(tokenRequestContext); + } + /** * update token on existing client. @@ -1030,12 +1049,15 @@ public synchronized void updateToken(AccessToken token) { * @param accessToken The AAD Token string */ public synchronized void updateToken(String accessToken) { - if (this.staticTokenCredential == null) { + if (accessToken == null || accessToken.trim().isEmpty()) { + throw new IllegalArgumentException("token is required"); + } + if (this.staticAccessToken == null) { throw new UnsupportedOperationException("updateToken is only supported when the client was created with a static access token"); } log.trace("AAD Token Updated for client client {} for account {}", clientId, accountFQDN); - this.staticTokenCredential.setToken(accessToken); - this.cachedAccessToken = this.staticTokenCredential.getCurrentToken(); + this.staticAccessToken = accessToken; + this.cachedAccessToken = null; } /* ----------------------------------------------------------------------------------------------------------*/ @@ -1057,6 +1079,9 @@ synchronized int getReadAheadQueueDepth() { * @throws IOException thrown if a token provider is being used and the token provider has problem getting token */ synchronized String getAccessToken() throws IOException { + if (staticAccessToken != null) { + return "Bearer " + staticAccessToken; + } boolean needsRefresh = cachedAccessToken == null || shouldRefreshAccessToken(cachedAccessToken); if (needsRefresh) { OffsetDateTime previousExpiry = cachedAccessToken == null ? null : cachedAccessToken.getExpiresAt(); @@ -1248,9 +1273,11 @@ private static IOException getRemoteException(String className, String message) } private synchronized AccessToken acquireAccessToken() throws IOException { - try { + try + { log.debug("Client {} requesting token from credential", clientId); - AccessToken token = tokenCredential.getTokenSync(createTokenRequestContext()); + AccessToken token = Objects.requireNonNull(tokenCredential, "tokenCredential is required for dynamic token acquisition") + .getTokenSync(createTokenRequestContext()); if (token == null || token.getToken() == null || token.getToken().isEmpty()) { throw new IOException("TokenCredential returned an empty access token"); } @@ -1278,40 +1305,27 @@ private boolean shouldRefreshAccessToken(AccessToken token) { } private TokenRequestContext createTokenRequestContext() { + TokenRequestContext override = this.tokenRequestContextOverride; + if (override != null) { + return cloneTokenRequestContext(override); + } TokenRequestContext context = new TokenRequestContext(); context.addScopes(DATA_LAKE_SCOPE); return context; } - private static final class StaticTokenCredential implements TokenCredential { - private static final Duration STATIC_TOKEN_EXPIRY = Duration.ofDays(3650); - private volatile AccessToken token; - - private StaticTokenCredential(String accessToken) { - setToken(accessToken); + private static TokenRequestContext cloneTokenRequestContext(TokenRequestContext source) { + TokenRequestContext copy = new TokenRequestContext(); + if (source.getScopes() != null && !source.getScopes().isEmpty()) { + copy.addScopes(source.getScopes().toArray(new String[0])); } - - @Override - public reactor.core.publisher.Mono getToken(TokenRequestContext request) { - AccessToken current = token; - if (current == null) { - return reactor.core.publisher.Mono.error(new IllegalStateException("Access token not initialized")); - } - return reactor.core.publisher.Mono.just(current); + if (source.getClaims() != null && !source.getClaims().isEmpty()) { + copy.setClaims(source.getClaims()); } - - private synchronized void setToken(String accessToken) { - if (accessToken == null || accessToken.trim().isEmpty()) { - throw new IllegalArgumentException("accessToken is required"); - } - OffsetDateTime expires = OffsetDateTime.now(ZoneOffset.UTC).plus(STATIC_TOKEN_EXPIRY); - this.token = new AccessToken(accessToken, expires); - } - - private AccessToken getCurrentToken() { - return token; + if (source.getTenantId() != null && !source.getTenantId().isEmpty()) { + copy.setTenantId(source.getTenantId()); } + return copy; } - } \ No newline at end of file