Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
2374611
First draft
emmyzhou-db May 28, 2025
5148136
Update test
emmyzhou-db May 28, 2025
78c03a8
Clean up unit tests
emmyzhou-db May 30, 2025
5a37c59
Clean up comments
emmyzhou-db May 30, 2025
f5e4f8a
Add Javadoc to Token.java
emmyzhou-db May 30, 2025
3e65109
Add a token expiry buffer field
emmyzhou-db May 30, 2025
dfce414
Fix for comments
emmyzhou-db Jun 2, 2025
5bd4215
Update tests
emmyzhou-db Jun 3, 2025
93e0baf
Clean up tests
emmyzhou-db Jun 3, 2025
15e221d
Add logging
emmyzhou-db Jun 3, 2025
7589dab
Performance optimization
emmyzhou-db Jun 3, 2025
d97c734
Furter optimizations
emmyzhou-db Jun 3, 2025
105bc99
Add extra token state check in async refresh
emmyzhou-db Jun 3, 2025
b24a0fc
Change LocalDateTime to Instant
emmyzhou-db Jun 3, 2025
6f81b4c
Update parseExpiry in CilTokenSource
emmyzhou-db Jun 4, 2025
daac1b2
Update javadoc
emmyzhou-db Jun 4, 2025
f3d4b8a
Update javadoc
emmyzhou-db Jun 4, 2025
12123a9
Retrigger tests
emmyzhou-db Jun 5, 2025
def03c5
Merge branch 'main' into emmyzhou-db/localdatetime-to-instant
parthban-db Jun 5, 2025
8705ff5
Save progress
emmyzhou-db Jun 6, 2025
0ce06d8
Revert "Save progress"
emmyzhou-db Jun 6, 2025
fdc50ef
Removed redundant date formattters
emmyzhou-db Jun 6, 2025
48a5310
Change all usage of LocalDateTime to Instant
emmyzhou-db Jun 6, 2025
70934b2
Change clock supplier to use UTC time
emmyzhou-db Jun 6, 2025
1bd052f
Add support for space separated expiry strings
emmyzhou-db Jun 7, 2025
408f3b4
revert test data
emmyzhou-db Jun 7, 2025
14d2a8f
Update CilTokenSource
emmyzhou-db Jun 10, 2025
7fccff9
Update exception handling
emmyzhou-db Jun 11, 2025
64313f8
Update Javadoc
emmyzhou-db Jun 11, 2025
447eae2
Added more tests to CilTokenSourceTest
emmyzhou-db Jun 11, 2025
66335a7
Add test to verify perserved behaviour
emmyzhou-db Jun 12, 2025
3a824eb
Generate all timezones
emmyzhou-db Jun 12, 2025
c1367bf
Merge branch 'emmyzhou-db/test_time' into emmyzhou-db/localdatetime-t…
emmyzhou-db Jun 12, 2025
4d31c1e
Merge branch 'emmyzhou-db/test_time' into emmyzhou-db/localdatetime-t…
emmyzhou-db Jun 12, 2025
95a3c6d
Update test
emmyzhou-db Jun 12, 2025
a5c65c5
update tests
emmyzhou-db Jun 12, 2025
a165b72
Date formats are generated at run-time
emmyzhou-db Jun 12, 2025
d1c1a6c
Generate date formats at run-time
emmyzhou-db Jun 12, 2025
785c400
Update stream of test cases
emmyzhou-db Jun 13, 2025
f35988d
Merge branch 'emmyzhou-db/test_time' into emmyzhou-db/localdatetime-t…
emmyzhou-db Jun 13, 2025
9d5c85e
Add comment
emmyzhou-db Jun 13, 2025
2a7cd95
Add comment
emmyzhou-db Jun 13, 2025
257998c
Update comment
emmyzhou-db Jun 13, 2025
ac99bbe
Merge branch 'emmyzhou-db/test_time' into emmyzhou-db/localdatetime-t…
emmyzhou-db Jun 13, 2025
ceea58a
update stream of tests
emmyzhou-db Jun 13, 2025
0697fae
Polish comments
emmyzhou-db Jun 13, 2025
451bd27
Merge branch 'emmyzhou-db/localdatetime-to-instant' into emmyzhou-db/…
emmyzhou-db Jun 13, 2025
9845308
Merge branch 'main' into emmyzhou-db/async_token_cache
emmyzhou-db Jun 16, 2025
60c0890
Merge branch 'main' into emmyzhou-db/async_token_cache
emmyzhou-db Jun 16, 2025
490091c
Small fixes
emmyzhou-db Jun 16, 2025
1b88b29
More small fixes
emmyzhou-db Jun 16, 2025
c07669b
Update comment
emmyzhou-db Jun 16, 2025
d8db181
Small fix
emmyzhou-db Jun 16, 2025
87800e6
Rename SystemClockSupplier
emmyzhou-db Jun 16, 2025
99ba2b1
Update test to use TestClockSupplier instead
emmyzhou-db Jun 16, 2025
536a283
Small refactor of getToken
emmyzhou-db Jun 17, 2025
f6f0dd0
Trigger CI: empty commit
emmyzhou-db Jun 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,254 @@
import com.databricks.sdk.core.http.FormRequest;
import com.databricks.sdk.core.http.HttpClient;
import com.databricks.sdk.core.http.Request;
import com.databricks.sdk.core.utils.ClockSupplier;
import com.databricks.sdk.core.utils.UtcClockSupplier;
import java.time.Duration;
import java.time.Instant;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.http.HttpHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An OAuth TokenSource which can be refreshed.
*
* <p>Calls to getToken() will first check if the token is still valid (currently defined by having
* at least 10 seconds until expiry). If not, refresh() is called first to refresh the token.
* <p>This class supports both synchronous and asynchronous token refresh. When async is enabled,
* stale tokens will trigger a background refresh, while expired tokens will block until a new token
* is fetched.
*/
public abstract class RefreshableTokenSource implements TokenSource {
protected Token token;

/**
* Enum representing the state of the token. FRESH: Token is valid and not close to expiry. STALE:
* Token is valid but will expire soon - an async refresh will be triggered if enabled. EXPIRED:
* Token has expired and must be refreshed using a blocking call.
*/
protected enum TokenState {
FRESH,
STALE,
EXPIRED
}

private static final Logger logger = LoggerFactory.getLogger(RefreshableTokenSource.class);
// Default duration before expiry to consider a token as 'stale'.
private static final Duration DEFAULT_STALE_DURATION = Duration.ofMinutes(3);
// Default additional buffer before expiry to consider a token as expired.
private static final Duration DEFAULT_EXPIRY_BUFFER = Duration.ofSeconds(40);

// The current OAuth token. May be null if not yet fetched.
protected volatile Token token;
// Whether asynchronous refresh is enabled.
private boolean asyncEnabled = false;
// Duration before expiry to consider a token as 'stale'.
private Duration staleDuration = DEFAULT_STALE_DURATION;
// Additional buffer before expiry to consider a token as expired.
private Duration expiryBuffer = DEFAULT_EXPIRY_BUFFER;
// Whether a refresh is currently in progress (for async refresh).
private boolean refreshInProgress = false;
// Whether the last refresh attempt succeeded.
private boolean lastRefreshSucceeded = true;
// Clock supplier for current time.
private ClockSupplier clockSupplier = new UtcClockSupplier();

/** Constructs a new {@code RefreshableTokenSource} with no initial token. */
public RefreshableTokenSource() {}

/**
* Constructor with initial token.
*
* @param token The initial token to use.
*/
public RefreshableTokenSource(Token token) {
this.token = token;
}

/**
* Set the clock supplier for current time.
*
* <p><b>Experimental:</b> This method may change or be removed in future releases.
*
* @param clockSupplier The clock supplier to use.
* @return this instance for chaining
*/
public RefreshableTokenSource withClockSupplier(ClockSupplier clockSupplier) {
this.clockSupplier = clockSupplier;
return this;
}

/**
* Enable or disable asynchronous token refresh.
*
* <p><b>Experimental:</b> This method may change or be removed in future releases.
*
* @param enabled true to enable async refresh, false to disable
* @return this instance for chaining
*/
public RefreshableTokenSource withAsyncRefresh(boolean enabled) {
this.asyncEnabled = enabled;
return this;
}

/**
* Set the expiry buffer. If the token's lifetime is less than this buffer, it is considered
* expired.
*
* <p><b>Experimental:</b> This method may change or be removed in future releases.
*
* @param buffer the expiry buffer duration
* @return this instance for chaining
*/
public RefreshableTokenSource withExpiryBuffer(Duration buffer) {
this.expiryBuffer = buffer;
return this;
}

/**
* Refresh the OAuth token. Subclasses must implement this to define how the token is refreshed.
*
* <p>This method may throw an exception if the token cannot be refreshed. The specific exception
* type depends on the implementation.
*
* @return The newly refreshed Token.
*/
protected abstract Token refresh();

/**
* Gets the current token, refreshing if necessary. If async refresh is enabled, may return a
* stale token while a refresh is in progress.
*
* <p>This method may throw an exception if the token cannot be refreshed, depending on the
* implementation of {@link #refresh()}.
*
* @return The current valid token
*/
public Token getToken() {
if (asyncEnabled) {
return getTokenAsync();
}
return getTokenBlocking();
}

/**
* Determine the state of the current token (fresh, stale, or expired).
*
* @return The token state
*/
protected TokenState getTokenState(Token t) {
if (t == null) {
return TokenState.EXPIRED;
}
Duration lifeTime = Duration.between(Instant.now(clockSupplier.getClock()), t.getExpiry());
if (lifeTime.compareTo(expiryBuffer) <= 0) {
return TokenState.EXPIRED;
}
if (lifeTime.compareTo(staleDuration) <= 0) {
return TokenState.STALE;
}
return TokenState.FRESH;
}

/**
* Get the current token, blocking to refresh if expired.
*
* <p>This method may throw an exception if the token cannot be refreshed, depending on the
* implementation of {@link #refresh()}.
*
* @return The current valid token
*/
protected Token getTokenBlocking() {
// Use double-checked locking to minimize synchronization overhead on reads:
// 1. Check if the token is expired without locking.
// 2. If expired, synchronize and check again (another thread may have refreshed it).
// 3. If still expired, perform the refresh.
if (getTokenState(token) != TokenState.EXPIRED) {
return token;
}
synchronized (this) {
if (getTokenState(token) != TokenState.EXPIRED) {
return token;
}
lastRefreshSucceeded = false;
try {
token = refresh();
} catch (Exception e) {
logger.error("Failed to refresh token synchronously", e);
throw e;
}
lastRefreshSucceeded = true;
return token;
}
}

/**
* Get the current token, possibly triggering an async refresh if stale. If the token is expired,
* blocks to refresh.
*
* <p>This method may throw an exception if the token cannot be refreshed, depending on the
* implementation of {@link #refresh()}.
*
* @return The current valid or stale token
*/
protected Token getTokenAsync() {
Token currentToken = token;

switch (getTokenState(currentToken)) {
case FRESH:
return currentToken;
case STALE:
triggerAsyncRefresh();
return currentToken;
case EXPIRED:
return getTokenBlocking();
default:
throw new IllegalStateException("Invalid token state.");
}
}

/**
* Trigger an asynchronous refresh of the token if not already in progress and last refresh
* succeeded.
*/
private synchronized void triggerAsyncRefresh() {
// Check token state again inside the synchronized block to avoid triggering a refresh if
// another thread updated the token in the meantime.
if (!refreshInProgress && lastRefreshSucceeded && getTokenState(token) != TokenState.FRESH) {
refreshInProgress = true;
CompletableFuture.runAsync(
() -> {
try {
// Attempt to refresh the token in the background
Token newToken = refresh();
synchronized (this) {
token = newToken;
refreshInProgress = false;
}
} catch (Exception e) {
synchronized (this) {
lastRefreshSucceeded = false;
refreshInProgress = false;
logger.error("Asynchronous token refresh failed", e);
}
}
});
}
}

/**
* Helper method implementing OAuth token refresh.
*
* @param hc The HTTP client to use for the request.
* @param clientId The client ID to authenticate with.
* @param clientSecret The client secret to authenticate with.
* @param tokenUrl The authorization URL for fetching tokens.
* @param params Additional request parameters.
* @param headers Additional headers.
* @param position The position of the authentication parameters in the request.
* @return The newly fetched Token.
* @throws DatabricksException if the refresh fails
* @throws IllegalArgumentException if the OAuth response contains an error
*/
protected static Token retrieveToken(
HttpClient hc,
Expand Down Expand Up @@ -75,13 +293,4 @@ protected static Token retrieveToken(
throw new DatabricksException("Failed to refresh credentials: " + e.getMessage(), e);
}
}

protected abstract Token refresh();

public synchronized Token getToken() {
if (token == null || !token.isValid()) {
token = refresh();
}
return token;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.databricks.sdk.core.oauth;

import com.databricks.sdk.core.utils.ClockSupplier;
import com.databricks.sdk.core.utils.SystemClockSupplier;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
Expand All @@ -23,16 +21,9 @@ public class Token {
*/
@JsonProperty private Instant expiry;

private final ClockSupplier clockSupplier;

/** Constructor for non-refreshable tokens (e.g. M2M). */
public Token(String accessToken, String tokenType, Instant expiry) {
this(accessToken, tokenType, null, expiry, new SystemClockSupplier());
}

/** Constructor for non-refreshable tokens (e.g. M2M) with ClockSupplier */
public Token(String accessToken, String tokenType, Instant expiry, ClockSupplier clockSupplier) {
this(accessToken, tokenType, null, expiry, clockSupplier);
this(accessToken, tokenType, null, expiry);
}

/** Constructor for refreshable tokens. */
Expand All @@ -42,51 +33,48 @@ public Token(
@JsonProperty("tokenType") String tokenType,
@JsonProperty("refreshToken") String refreshToken,
@JsonProperty("expiry") Instant expiry) {
this(accessToken, tokenType, refreshToken, expiry, new SystemClockSupplier());
}

/** Constructor for refreshable tokens with ClockSupplier. */
public Token(
String accessToken,
String tokenType,
String refreshToken,
Instant expiry,
ClockSupplier clockSupplier) {
Objects.requireNonNull(accessToken, "accessToken must be defined");
Objects.requireNonNull(tokenType, "tokenType must be defined");
Objects.requireNonNull(expiry, "expiry must be defined");
Objects.requireNonNull(clockSupplier, "clockSupplier must be defined");
this.accessToken = accessToken;
this.tokenType = tokenType;
this.refreshToken = refreshToken;
this.expiry = expiry;
this.clockSupplier = clockSupplier;
}

public boolean isExpired() {
if (expiry == null) {
return false;
}
// Azure Databricks rejects tokens that expire in 30 seconds or less,
// so we refresh the token 40 seconds before it expires.
Instant potentiallyExpired = expiry.minusSeconds(40);
Instant now = Instant.now(clockSupplier.getClock());
return potentiallyExpired.isBefore(now);
}

public boolean isValid() {
return accessToken != null && !isExpired();
}

/**
* Returns the type of the token (e.g., "Bearer").
*
* @return the token type
*/
public String getTokenType() {
return tokenType;
}

/**
* Returns the refresh token, if available. May be null for non-refreshable tokens.
*
* @return the refresh token or null
*/
public String getRefreshToken() {
return refreshToken;
}

/**
* Returns the access token string.
*
* @return the access token
*/
public String getAccessToken() {
return accessToken;
}

/**
* Returns the expiry time of the token as a Instant.
*
* @return the expiry time
*/
public Instant getExpiry() {
return this.expiry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.time.Clock;

public class SystemClockSupplier implements ClockSupplier {
public class UtcClockSupplier implements ClockSupplier {
@Override
public Clock getClock() {
return Clock.systemUTC();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void testRefreshWithExpiry(
Token token = tokenSource.refresh();
assertEquals("Bearer", token.getTokenType());
assertEquals("test-token", token.getAccessToken());
assertEquals(shouldBeExpired, token.isExpired());
assertEquals(shouldBeExpired, token.getExpiry().isBefore(Instant.now()));
}
}
}
Expand Down
Loading
Loading