diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPAFUnitTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPAFUnitTests.java index 15da7043bd87..c2ef4713edb8 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPAFUnitTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPAFUnitTests.java @@ -4,7 +4,6 @@ package com.azure.cosmos; import com.azure.cosmos.implementation.AvailabilityStrategyContext; -import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ConnectionPolicy; import com.azure.cosmos.implementation.CrossRegionAvailabilityContextForRxDocumentServiceRequest; import com.azure.cosmos.implementation.GlobalEndpointManager; @@ -18,7 +17,9 @@ import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; import com.azure.cosmos.implementation.perPartitionAutomaticFailover.GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover; -import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelFailoverInfo; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelAutomaticFailoverInfo; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionAutomaticFailoverInfoHolder; +import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import com.azure.cosmos.rx.TestSuiteBase; import org.apache.commons.lang3.tuple.Pair; @@ -37,7 +38,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; @@ -148,11 +148,11 @@ public void tryMarkEndpointAsUnavailableForPartitionKeyRange( String maxExclusive = "BB"; String collectionResourceId = "dbs/db1/colls/coll1"; - Field failedRegionalRoutingContextsField = PartitionLevelFailoverInfo.class.getDeclaredField("failedRegionalRoutingContexts"); + Field failedRegionalRoutingContextsField = PartitionLevelAutomaticFailoverInfo.class.getDeclaredField("failedRegionalRoutingContexts"); assertThat(failedRegionalRoutingContextsField).isNotNull(); - Field currentRegionalContextField = PartitionLevelFailoverInfo.class.getDeclaredField("current"); + Field currentRegionalContextField = PartitionLevelAutomaticFailoverInfo.class.getDeclaredField("current"); assertThat(currentRegionalContextField).isNotNull(); @@ -362,11 +362,11 @@ private static void validateAllRegionsAreNotUnavailableAfterExceptionInLocation( logger.warn("Handling exception for {}", locationWithFailure.getPath()); globalPartitionEndpointManagerForPerPartitionAutomaticFailover.tryMarkEndpointAsUnavailableForPartitionKeyRange(request, false); - Field failedLocationsField = PartitionLevelFailoverInfo.class.getDeclaredField("failedRegionalRoutingContexts"); + Field failedLocationsField = PartitionLevelAutomaticFailoverInfo.class.getDeclaredField("failedRegionalRoutingContexts"); assertThat(failedLocationsField).isNotNull(); - Field currentField = PartitionLevelFailoverInfo.class.getDeclaredField("current"); + Field currentField = PartitionLevelAutomaticFailoverInfo.class.getDeclaredField("current"); assertThat(currentField).isNotNull(); @@ -421,8 +421,10 @@ private RxDocumentServiceRequest constructRxDocumentServiceRequestInstance( false, collectionLink, new SerializationDiagnosticsContext()), - new AvailabilityStrategyContext(true, true) - ) + new AvailabilityStrategyContext(true, true), + new AtomicBoolean(false), + new PerPartitionCircuitBreakerInfoHolder(), + new PerPartitionAutomaticFailoverInfoHolder()) ); return request; diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPCBUnitTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPCBUnitTests.java index 11d14758f352..cc7d589836eb 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPCBUnitTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/GlobalPartitionEndpointManagerForPPCBUnitTests.java @@ -15,10 +15,12 @@ import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.SerializationDiagnosticsContext; import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionAutomaticFailoverInfoHolder; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationHealthStatus; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext; import com.azure.cosmos.implementation.guava25.collect.ImmutableList; +import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import org.apache.commons.lang3.tuple.Pair; import org.mockito.Mockito; @@ -1056,7 +1058,10 @@ private RxDocumentServiceRequest constructRxDocumentServiceRequestInstance( false, collectionLink, new SerializationDiagnosticsContext()), - new AvailabilityStrategyContext(false, false))); + new AvailabilityStrategyContext(false, false), + new AtomicBoolean(false), + new PerPartitionCircuitBreakerInfoHolder(), + new PerPartitionAutomaticFailoverInfoHolder())); return request; } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/HubRegionProcessingOnlyTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/HubRegionProcessingOnlyTests.java new file mode 100644 index 000000000000..cd4fc3248d6b --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/HubRegionProcessingOnlyTests.java @@ -0,0 +1,796 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.implementation.AsyncDocumentClient; +import com.azure.cosmos.implementation.DatabaseAccount; +import com.azure.cosmos.implementation.DatabaseAccountLocation; +import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; +import com.azure.cosmos.models.CosmosItemIdentity; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosReadManyRequestOptions; +import com.azure.cosmos.models.FeedRange; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.rx.TestSuiteBase; +import com.fasterxml.jackson.databind.JsonNode; +import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; +import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType; +import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; +import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders; +import com.azure.cosmos.test.faultinjection.FaultInjectionRule; +import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorResult; +import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import org.testng.SkipException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for validating hub region processing behavior when using the hub region processing only header. + * These tests verify that when a read operation encounters a 404-1002 error in a non-hub region, + * the operation correctly fails over to the partition-set level hub region. + */ +public class HubRegionProcessingOnlyTests extends TestSuiteBase { + + private volatile CosmosAsyncDatabase database; + private volatile CosmosAsyncContainer container; + private List preferredRegions; + private Map regionNameToEndpoint; + private String partitionKeyValue = "12345"; + + private static final ImplementationBridgeHelpers.CosmosAsyncClientHelper.CosmosAsyncClientAccessor cosmosAsyncClientAccessor + = ImplementationBridgeHelpers.CosmosAsyncClientHelper.getCosmosAsyncClientAccessor(); + + @Factory(dataProvider = "clientBuildersWithDirectSession") + public HubRegionProcessingOnlyTests(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"multi-region"}) + public void beforeClass() { + try (CosmosAsyncClient dummy = getClientBuilder().buildAsyncClient()) { + AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(dummy); + GlobalEndpointManager globalEndpointManager = asyncDocumentClient.getGlobalEndpointManager(); + + DatabaseAccount databaseAccount = globalEndpointManager.getLatestDatabaseAccount(); + + AccountLevelLocationContext accountLevelContext = getAccountLevelLocationContext(databaseAccount, false); + + // Ensure we have at least 3 regions for this test + if (accountLevelContext.serviceOrderedReadableRegions.size() < 3) { + throw new SkipException("Test requires at least 3 readable regions"); + } + + // Set preferred regions - start with third region, then first, then second + this.preferredRegions = new ArrayList<>(); + this.preferredRegions.add(accountLevelContext.serviceOrderedReadableRegions.get(0)); // Third region + this.preferredRegions.add(accountLevelContext.serviceOrderedReadableRegions.get(2)); // First region + this.preferredRegions.add(accountLevelContext.serviceOrderedReadableRegions.get(1)); // Second region + + this.regionNameToEndpoint = accountLevelContext.regionNameToEndpoint; + this.database = getSharedCosmosDatabase(dummy); + this.container = getSharedSinglePartitionCosmosContainer(dummy); + } + } + + @AfterClass(groups = {"multi-region"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + } + + /** + * Validates that when a read operation encounters a 404-1002 (READ_SESSION_NOT_AVAILABLE) error + * in a non-hub region, the operation correctly identifies and contacts the partition-set level hub region. + * + *

Test Flow:

+ *
    + *
  1. Pre-creates a container (expected to already exist)
  2. + *
  3. Injects 404-1002 fault on document reads from the third preferred region (first in preferred list)
  4. + *
  5. Performs a read operation and lets it complete
  6. + *
  7. Extracts the CosmosDiagnosticsContext to determine which regions were contacted
  8. + *
  9. Validates that the partition-set level hub region was contacted
  10. + *
  11. Determines the partition-set hub region as ground truth by: + *
      + *
    • Creating another CosmosClient instance
    • + *
    • Performing an Upsert on the same partition-set
    • + *
    • Extracting the region from which success is obtained
    • + *
    • Using that region as the base truth for validation in step 4
    • + *
    + *
  12. + *
+ * + *

Expected Behavior:

+ * + * + * @throws Exception if test setup fails or unexpected errors occur + */ + @Test(groups = {"multi-region"}) + public void validateHubRegionProcessingOnReadItemWith404_1002() throws Exception { + + // Skip if we don't have at least 3 regions + if (this.preferredRegions.size() < 3) { + throw new SkipException("Test requires at least 3 readable regions"); + } + + System.setProperty("COSMOS.IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF", "false"); + + // Create test client with preferred regions (third region first) + CosmosAsyncClient testClient = getClientBuilder() + .preferredRegions(this.preferredRegions) + .consistencyLevel(ConsistencyLevel.SESSION) + .sessionRetryOptions(new SessionRetryOptionsBuilder().regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED).build()) + .buildAsyncClient(); + + ConnectionMode connectionModeForTestClient + = ConnectionMode.valueOf(cosmosAsyncClientAccessor.getConnectionMode(testClient)); + + try { + + String databaseId = "testDatabase"; + String containerId = "testContainer"; + + // Step 1: Container is pre-created (using shared container) + CosmosAsyncDatabase targetDatabase = testClient.getDatabase(databaseId); + CosmosAsyncContainer targetContainer = targetDatabase.getContainer(containerId); + + TestObject testObject = TestObject.create(this.partitionKeyValue); + + // Create the document using the test client + targetContainer.createItem(testObject).block(); + + // Step 4: Determine partition-set hub region as base truth + String hubRegion = determinePartitionSetHubRegion(this.partitionKeyValue, databaseId, containerId); + + logger.info("Determined hub region for partition '{}': {}", this.partitionKeyValue, hubRegion); + + // Step 2: Inject 404-1002 fault in the third preferred region (first in list) + String thirdRegion = this.preferredRegions.get(0); // Third region is first in our preferred list + injectReadSessionNotAvailableError( + targetContainer, + thirdRegion, + FaultInjectionOperationType.READ_ITEM, + ConnectionMode.DIRECT.equals(connectionModeForTestClient) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY); + + // Wait a bit for fault injection to be active + Thread.sleep(1000); + + // Step 3: Perform read operation and let it complete + CosmosItemRequestOptions requestOptions = new CosmosItemRequestOptions(); + CosmosItemResponse response = targetContainer + .readItem(testObject.getId(), new PartitionKey(this.partitionKeyValue), requestOptions, TestObject.class) + .block(); + + // Validate operation succeeded + assertThat(response).isNotNull(); + assertThat(response.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + + // Step 4: Extract CosmosDiagnosticsContext and validate contacted regions + CosmosDiagnostics diagnostics = response.getDiagnostics(); + assertThat(diagnostics).isNotNull(); + + CosmosDiagnosticsContext diagnosticsContext = diagnostics.getDiagnosticsContext(); + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotEmpty(); + + logger.info("ReadItem contacted regions: {}", diagnosticsContext.getContactedRegionNames()); + + // Validate that hub region was contacted + assertThat(diagnosticsContext.getContactedRegionNames()) + .as("Hub region should be contacted") + .contains(hubRegion); + + // Validate that we contacted more than one region (due to failover from third region) + assertThat(diagnosticsContext.getContactedRegionNames().size()) + .as("Should contact multiple regions due to failover") + .isGreaterThan(1); + + // Validate Create operation only contacts the hub region + TestObject createTestObject = TestObject.create(this.partitionKeyValue); + CosmosItemResponse createResponse = targetContainer.createItem(createTestObject).block(); + assertThat(createResponse).isNotNull(); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + CosmosDiagnostics createDiagnostics = createResponse.getDiagnostics(); + assertThat(createDiagnostics).isNotNull(); + CosmosDiagnosticsContext createDiagnosticsContext = createDiagnostics.getDiagnosticsContext(); + assertThat(createDiagnosticsContext).isNotNull(); + logger.info("Create contacted regions: {}", createDiagnosticsContext.getContactedRegionNames()); + assertThat(createDiagnosticsContext.getContactedRegionNames()) + .as("Create should only contact hub region") + .containsExactly(hubRegion); + + } finally { + safeClose(testClient); + } + } + + /** + * Validates that when a Query operation encounters a 404-1002 (READ_SESSION_NOT_AVAILABLE) error + * in a non-hub region, the operation correctly identifies and contacts the partition-set level hub region. + * + *

The query is scoped to partition key "12345".

+ * + * @throws Exception if test setup fails or unexpected errors occur + */ + @Test(groups = {"multi-region"}) + public void validateHubRegionProcessingOnQueryWith404_1002() throws Exception { + + // Skip if we don't have at least 3 regions + if (this.preferredRegions.size() < 3) { + throw new SkipException("Test requires at least 3 readable regions"); + } + + System.setProperty("COSMOS.IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF", "false"); + + // Create test client with preferred regions (third region first) + CosmosAsyncClient testClient = getClientBuilder() + .preferredRegions(this.preferredRegions) + .consistencyLevel(ConsistencyLevel.SESSION) + .sessionRetryOptions(new SessionRetryOptionsBuilder().regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED).build()) + .buildAsyncClient(); + + ConnectionMode connectionModeForTestClient + = ConnectionMode.valueOf(cosmosAsyncClientAccessor.getConnectionMode(testClient)); + + try { + + String databaseId = "testDatabase"; + String containerId = "testContainer"; + + CosmosAsyncDatabase targetDatabase = testClient.getDatabase(databaseId); + CosmosAsyncContainer targetContainer = targetDatabase.getContainer(containerId); + + TestObject testObject = TestObject.create(this.partitionKeyValue); + + // Create the document using the test client + targetContainer.createItem(testObject).block(); + + // Determine partition-set hub region as base truth + String hubRegion = determinePartitionSetHubRegion(this.partitionKeyValue, databaseId, containerId); + + logger.info("Determined hub region for partition '{}': {}", this.partitionKeyValue, hubRegion); + + // Inject 404-1002 fault in the third preferred region (first in list) + String thirdRegion = this.preferredRegions.get(0); + injectReadSessionNotAvailableError( + targetContainer, + thirdRegion, + FaultInjectionOperationType.QUERY_ITEM, + ConnectionMode.DIRECT.equals(connectionModeForTestClient) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY); + + Thread.sleep(1000); + + logger.info("Testing Query operation with partition key '{}'", this.partitionKeyValue); + + CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions(); + queryRequestOptions.setPartitionKey(new PartitionKey(this.partitionKeyValue)); + + String query = "SELECT * FROM c WHERE c.mypk = '" + this.partitionKeyValue + "'"; + List queryResults = targetContainer + .queryItems(query, queryRequestOptions, TestObject.class) + .byPage() + .flatMapIterable(FeedResponse::getResults) + .collectList() + .block(); + + assertThat(queryResults).isNotNull(); + assertThat(queryResults).isNotEmpty(); + + // Get diagnostics from query - need to use byPage to access diagnostics + CosmosDiagnostics queryDiagnostics = targetContainer + .queryItems(query, queryRequestOptions, TestObject.class) + .byPage() + .blockLast() + .getCosmosDiagnostics(); + + assertThat(queryDiagnostics).isNotNull(); + CosmosDiagnosticsContext queryDiagnosticsContext = queryDiagnostics.getDiagnosticsContext(); + assertThat(queryDiagnosticsContext).isNotNull(); + + logger.info("Query contacted regions: {}", queryDiagnosticsContext.getContactedRegionNames()); + + assertThat(queryDiagnosticsContext.getContactedRegionNames()) + .as("Hub region should be contacted for Query") + .contains(hubRegion); + + // Validate Create operation only contacts the hub region + TestObject createTestObject = TestObject.create(this.partitionKeyValue); + CosmosItemResponse createResponse = targetContainer.createItem(createTestObject).block(); + assertThat(createResponse).isNotNull(); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + CosmosDiagnostics createDiagnostics = createResponse.getDiagnostics(); + assertThat(createDiagnostics).isNotNull(); + CosmosDiagnosticsContext createDiagnosticsContext = createDiagnostics.getDiagnosticsContext(); + assertThat(createDiagnosticsContext).isNotNull(); + logger.info("Create contacted regions: {}", createDiagnosticsContext.getContactedRegionNames()); + assertThat(createDiagnosticsContext.getContactedRegionNames()) + .as("Create should only contact hub region") + .containsExactly(hubRegion); + + } finally { + safeClose(testClient); + } + } + + /** + * Validates that when a Change Feed operation encounters a 404-1002 (READ_SESSION_NOT_AVAILABLE) error + * in a non-hub region, the operation correctly identifies and contacts the partition-set level hub region. + * + *

The Change Feed is read from the beginning and scoped to partition key "12345".

+ * + * @throws Exception if test setup fails or unexpected errors occur + */ + @Test(groups = {"multi-region"}) + public void validateHubRegionProcessingOnChangeFeedWith404_1002() throws Exception { + + // Skip if we don't have at least 3 regions + if (this.preferredRegions.size() < 3) { + throw new SkipException("Test requires at least 3 readable regions"); + } + + System.setProperty("COSMOS.IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF", "false"); + + // Create test client with preferred regions (third region first) + CosmosAsyncClient testClient = getClientBuilder() + .preferredRegions(this.preferredRegions) + .consistencyLevel(ConsistencyLevel.SESSION) + .sessionRetryOptions(new SessionRetryOptionsBuilder().regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED).build()) + .buildAsyncClient(); + + ConnectionMode connectionModeForTestClient + = ConnectionMode.valueOf(cosmosAsyncClientAccessor.getConnectionMode(testClient)); + + try { + + String databaseId = "testDatabase"; + String containerId = "testContainer"; + + CosmosAsyncDatabase targetDatabase = testClient.getDatabase(databaseId); + CosmosAsyncContainer targetContainer = targetDatabase.getContainer(containerId); + + TestObject testObject = TestObject.create(this.partitionKeyValue); + + // Create the document using the test client + targetContainer.createItem(testObject).block(); + + // Determine partition-set hub region as base truth + String hubRegion = determinePartitionSetHubRegion(this.partitionKeyValue, databaseId, containerId); + + logger.info("Determined hub region for partition '{}': {}", this.partitionKeyValue, hubRegion); + + // Inject 404-1002 fault in the third preferred region (first in list) + String thirdRegion = this.preferredRegions.get(0); + injectReadSessionNotAvailableError( + targetContainer, + thirdRegion, + FaultInjectionOperationType.READ_FEED_ITEM, + ConnectionMode.DIRECT.equals(connectionModeForTestClient) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY); + + Thread.sleep(1000); + + logger.info("Testing Change Feed operation with partition key '{}'", this.partitionKeyValue); + + // Create ChangeFeedRequestOptions scoped to the specific partition key + CosmosChangeFeedRequestOptions changeFeedRequestOptions = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(this.partitionKeyValue))); + + List changeFeedResults = new ArrayList<>(); + FeedResponse changeFeedResponse = targetContainer + .queryChangeFeed(changeFeedRequestOptions, JsonNode.class) + .byPage() + .blockLast(); + + assertThat(changeFeedResponse).isNotNull(); + changeFeedResults.addAll(changeFeedResponse.getResults()); + + CosmosDiagnostics changeFeedDiagnostics = changeFeedResponse.getCosmosDiagnostics(); + assertThat(changeFeedDiagnostics).isNotNull(); + CosmosDiagnosticsContext changeFeedDiagnosticsContext = changeFeedDiagnostics.getDiagnosticsContext(); + assertThat(changeFeedDiagnosticsContext).isNotNull(); + + logger.info("Change Feed contacted regions: {}", changeFeedDiagnosticsContext.getContactedRegionNames()); + + // Change Feed should return 200 or 304 (Not Modified if no changes) + int changeFeedStatusCode = changeFeedDiagnosticsContext.getStatusCode(); + assertThat(changeFeedStatusCode) + .as("Change Feed should return 200 or 304") + .isIn(HttpConstants.StatusCodes.OK, HttpConstants.StatusCodes.NOT_MODIFIED); + + assertThat(changeFeedDiagnosticsContext.getContactedRegionNames()) + .as("Hub region should be contacted for Change Feed") + .contains(hubRegion); + + // Validate Create operation only contacts the hub region + TestObject createTestObject = TestObject.create(this.partitionKeyValue); + CosmosItemResponse createResponse = targetContainer.createItem(createTestObject).block(); + assertThat(createResponse).isNotNull(); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + CosmosDiagnostics createDiagnostics = createResponse.getDiagnostics(); + assertThat(createDiagnostics).isNotNull(); + CosmosDiagnosticsContext createDiagnosticsContext = createDiagnostics.getDiagnosticsContext(); + assertThat(createDiagnosticsContext).isNotNull(); + logger.info("Create contacted regions: {}", createDiagnosticsContext.getContactedRegionNames()); + assertThat(createDiagnosticsContext.getContactedRegionNames()) + .as("Create should only contact hub region") + .containsExactly(hubRegion); + + } finally { + safeClose(testClient); + } + } + + /** + * Validates that when a readMany operation encounters a 404-1002 (READ_SESSION_NOT_AVAILABLE) error + * in a non-hub region, the operation correctly identifies and contacts the partition-set level hub region. + * + *

The readMany operation is scoped to partition key "12345".

+ * + * @throws Exception if test setup fails or unexpected errors occur + */ + @Test(groups = {"multi-region"}) + public void validateHubRegionProcessingOnReadManyWith404_1002() throws Exception { + + // Skip if we don't have at least 3 regions + if (this.preferredRegions.size() < 3) { + throw new SkipException("Test requires at least 3 readable regions"); + } + + System.setProperty("COSMOS.IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF", "false"); + + // Create test client with preferred regions (third region first) + CosmosAsyncClient testClient = getClientBuilder() + .preferredRegions(this.preferredRegions) + .consistencyLevel(ConsistencyLevel.SESSION) + .sessionRetryOptions(new SessionRetryOptionsBuilder().regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED).build()) + .buildAsyncClient(); + + ConnectionMode connectionModeForTestClient + = ConnectionMode.valueOf(cosmosAsyncClientAccessor.getConnectionMode(testClient)); + + try { + + String databaseId = "testDatabase"; + String containerId = "testContainer"; + + CosmosAsyncDatabase targetDatabase = testClient.getDatabase(databaseId); + CosmosAsyncContainer targetContainer = targetDatabase.getContainer(containerId); + + TestObject testObject = TestObject.create(this.partitionKeyValue); + + // Create the document using the test client + targetContainer.createItem(testObject).block(); + + // Determine partition-set hub region as base truth + String hubRegion = determinePartitionSetHubRegion(this.partitionKeyValue, databaseId, containerId); + + logger.info("Determined hub region for partition '{}': {}", this.partitionKeyValue, hubRegion); + + // Inject 404-1002 fault in the third preferred region (first in list) + String thirdRegion = this.preferredRegions.get(0); + injectReadSessionNotAvailableError( + targetContainer, + thirdRegion, + FaultInjectionOperationType.READ_ITEM, + ConnectionMode.DIRECT.equals(connectionModeForTestClient) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY); + + Thread.sleep(1000); + + logger.info("Testing readMany operation with partition key '{}'", this.partitionKeyValue); + + List itemIdentities = new ArrayList<>(); + itemIdentities.add(new CosmosItemIdentity( + new PartitionKey(this.partitionKeyValue), + testObject.getId() + )); + + FeedResponse readManyResponse = targetContainer + .readMany(itemIdentities, new CosmosReadManyRequestOptions(), TestObject.class) + .block(); + + assertThat(readManyResponse).isNotNull(); + assertThat(readManyResponse.getResults()).isNotEmpty(); + + CosmosDiagnostics readManyDiagnostics = readManyResponse.getCosmosDiagnostics(); + assertThat(readManyDiagnostics).isNotNull(); + CosmosDiagnosticsContext readManyDiagnosticsContext = readManyDiagnostics.getDiagnosticsContext(); + assertThat(readManyDiagnosticsContext).isNotNull(); + + logger.info("ReadMany contacted regions: {}", readManyDiagnosticsContext.getContactedRegionNames()); + + assertThat(readManyDiagnosticsContext.getStatusCode()) + .as("ReadMany should return 200") + .isEqualTo(HttpConstants.StatusCodes.OK); + + assertThat(readManyDiagnosticsContext.getContactedRegionNames()) + .as("Hub region should be contacted for readMany") + .contains(hubRegion); + + // Validate Create operation only contacts the hub region + TestObject createTestObject = TestObject.create(this.partitionKeyValue); + CosmosItemResponse createResponse = targetContainer.createItem(createTestObject).block(); + assertThat(createResponse).isNotNull(); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + CosmosDiagnostics createDiagnostics = createResponse.getDiagnostics(); + assertThat(createDiagnostics).isNotNull(); + CosmosDiagnosticsContext createDiagnosticsContext = createDiagnostics.getDiagnosticsContext(); + assertThat(createDiagnosticsContext).isNotNull(); + logger.info("Create contacted regions: {}", createDiagnosticsContext.getContactedRegionNames()); + assertThat(createDiagnosticsContext.getContactedRegionNames()) + .as("Create should only contact hub region") + .containsExactly(hubRegion); + + } finally { + safeClose(testClient); + } + } + + /** + * Validates that when a readAll operation encounters a 404-1002 (READ_SESSION_NOT_AVAILABLE) error + * in a non-hub region, the operation correctly identifies and contacts the partition-set level hub region. + * + *

The readAll operation is scoped to partition key "12345".

+ * + * @throws Exception if test setup fails or unexpected errors occur + */ + @Test(groups = {"multi-region"}) + public void validateHubRegionProcessingOnReadAllWith404_1002() throws Exception { + + // Skip if we don't have at least 3 regions + if (this.preferredRegions.size() < 3) { + throw new SkipException("Test requires at least 3 readable regions"); + } + + System.setProperty("COSMOS.IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF", "false"); + + // Create test client with preferred regions (third region first) + CosmosAsyncClient testClient = getClientBuilder() + .preferredRegions(this.preferredRegions) + .consistencyLevel(ConsistencyLevel.SESSION) + .sessionRetryOptions(new SessionRetryOptionsBuilder().regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED).build()) + .buildAsyncClient(); + + ConnectionMode connectionModeForTestClient + = ConnectionMode.valueOf(cosmosAsyncClientAccessor.getConnectionMode(testClient)); + + try { + + String databaseId = "testDatabase"; + String containerId = "testContainer"; + + CosmosAsyncDatabase targetDatabase = testClient.getDatabase(databaseId); + CosmosAsyncContainer targetContainer = targetDatabase.getContainer(containerId); + + TestObject testObject = TestObject.create(this.partitionKeyValue); + + // Create the document using the test client + targetContainer.createItem(testObject).block(); + + // Determine partition-set hub region as base truth + String hubRegion = determinePartitionSetHubRegion(this.partitionKeyValue, databaseId, containerId); + + logger.info("Determined hub region for partition '{}': {}", this.partitionKeyValue, hubRegion); + + // Inject 404-1002 fault in the third preferred region (first in list) + // readAll uses QUERY_ITEM fault injection type + String thirdRegion = this.preferredRegions.get(0); + injectReadSessionNotAvailableError( + targetContainer, + thirdRegion, + FaultInjectionOperationType.QUERY_ITEM, + ConnectionMode.DIRECT.equals(connectionModeForTestClient) ? FaultInjectionConnectionType.DIRECT : FaultInjectionConnectionType.GATEWAY); + + Thread.sleep(1000); + + logger.info("Testing readAll operation with partition key '{}'", this.partitionKeyValue); + + CosmosQueryRequestOptions readAllRequestOptions = new CosmosQueryRequestOptions(); + readAllRequestOptions.setPartitionKey(new PartitionKey(this.partitionKeyValue)); + + FeedResponse readAllResponse = targetContainer + .readAllItems(readAllRequestOptions, TestObject.class) + .byPage() + .blockLast(); + + assertThat(readAllResponse).isNotNull(); + assertThat(readAllResponse.getResults()).isNotEmpty(); + + CosmosDiagnostics readAllDiagnostics = readAllResponse.getCosmosDiagnostics(); + assertThat(readAllDiagnostics).isNotNull(); + CosmosDiagnosticsContext readAllDiagnosticsContext = readAllDiagnostics.getDiagnosticsContext(); + assertThat(readAllDiagnosticsContext).isNotNull(); + + logger.info("ReadAll contacted regions: {}", readAllDiagnosticsContext.getContactedRegionNames()); + + assertThat(readAllDiagnosticsContext.getStatusCode()) + .as("ReadAll should return 200") + .isEqualTo(HttpConstants.StatusCodes.OK); + + assertThat(readAllDiagnosticsContext.getContactedRegionNames()) + .as("Hub region should be contacted for readAll") + .contains(hubRegion); + + // Validate Create operation only contacts the hub region + TestObject createTestObject = TestObject.create(this.partitionKeyValue); + CosmosItemResponse createResponse = targetContainer.createItem(createTestObject).block(); + assertThat(createResponse).isNotNull(); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + CosmosDiagnostics createDiagnostics = createResponse.getDiagnostics(); + assertThat(createDiagnostics).isNotNull(); + CosmosDiagnosticsContext createDiagnosticsContext = createDiagnostics.getDiagnosticsContext(); + assertThat(createDiagnosticsContext).isNotNull(); + logger.info("Create contacted regions: {}", createDiagnosticsContext.getContactedRegionNames()); + assertThat(createDiagnosticsContext.getContactedRegionNames()) + .as("Create should only contact hub region") + .containsExactly(hubRegion); + + } finally { + safeClose(testClient); + } + } + + /** + * Determines the partition-set hub region by creating a separate client and performing an upsert operation. + * The region that successfully processes the upsert is considered the hub region for that partition-set. + * + * @param partitionKeyValue the partition key value to test + * @return the name of the hub region + */ + private String determinePartitionSetHubRegion(String partitionKeyValue, String databaseId, String containerId) { + // Create a client with a different preferred region order to find the hub + List hubDiscoveryRegions = new ArrayList<>(this.preferredRegions); + + CosmosAsyncClient hubDiscoveryClient = getClientBuilder() + .preferredRegions(hubDiscoveryRegions) + .consistencyLevel(ConsistencyLevel.SESSION) + .buildAsyncClient(); + + try { + CosmosAsyncDatabase targetDatabase = hubDiscoveryClient.getDatabase(databaseId); + CosmosAsyncContainer targetContainer = targetDatabase.getContainer(containerId); + + // Perform an upsert on the same partition-set + String testDocId = UUID.randomUUID().toString(); + TestObject testObject = TestObject.create(partitionKeyValue); + + CosmosItemResponse upsertResponse = targetContainer + .upsertItem(testObject) + .block(); + + assertThat(upsertResponse).isNotNull(); + assertThat(upsertResponse.getStatusCode()).isIn( + HttpConstants.StatusCodes.OK, + HttpConstants.StatusCodes.CREATED); + + // Extract the region from diagnostics + CosmosDiagnostics diagnostics = upsertResponse.getDiagnostics(); + assertThat(diagnostics).isNotNull(); + + CosmosDiagnosticsContext diagnosticsContext = diagnostics.getDiagnosticsContext(); + + assertThat(diagnosticsContext).isNotNull(); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotNull(); + assertThat(diagnosticsContext.getContactedRegionNames()).isNotEmpty(); + + TreeSet contactedRegionNames = (TreeSet) diagnosticsContext.getContactedRegionNames(); + + return contactedRegionNames.pollLast(); + } finally { + safeClose(hubDiscoveryClient); + } + } + + /** + * Injects READ_SESSION_NOT_AVAILABLE (404-1002) errors into a specific region for read operations. + * + * @param container the container to inject faults into + * @param region the region where faults should be injected + * @param faultInjectionOperationType the operation type to inject faults for + */ + private void injectReadSessionNotAvailableError( + CosmosAsyncContainer container, + String region, + FaultInjectionOperationType faultInjectionOperationType, + FaultInjectionConnectionType faultInjectionConnectionType) { + + String ruleName = "serverErrorRule-read-session-unavailable-" + UUID.randomUUID(); + + FaultInjectionServerErrorResult errorResult = FaultInjectionResultBuilders + .getResultBuilder(FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE) + .times(FaultInjectionConnectionType.DIRECT.equals(faultInjectionConnectionType) ? 8 : 1) + .build(); + + FaultInjectionRule rule = new FaultInjectionRuleBuilder(ruleName) + .condition( + new FaultInjectionConditionBuilder() + .operationType(faultInjectionOperationType) + .connectionType(faultInjectionConnectionType) + .region(region) + .build() + ) + .result(errorResult) + .duration(Duration.ofSeconds(60)) + .build(); + + List rules = new ArrayList<>(); + rules.add(rule); + + CosmosFaultInjectionHelper + .configureFaultInjectionRules(container, rules) + .block(); + + logger.info("Injected 404-1002 fault for region '{}', operationType '{}'", region, faultInjectionOperationType); + } + + /** + * Retrieves account-level location context including readable/writable regions and their endpoints. + * + * @param databaseAccount the database account + * @param writeOnly whether to get write regions only + * @return account level location context + */ + private AccountLevelLocationContext getAccountLevelLocationContext(DatabaseAccount databaseAccount, boolean writeOnly) { + Iterator locationIterator = + writeOnly ? databaseAccount.getWritableLocations().iterator() : databaseAccount.getReadableLocations().iterator(); + + List serviceOrderedReadableRegions = new ArrayList<>(); + List serviceOrderedWriteableRegions = new ArrayList<>(); + Map regionMap = new ConcurrentHashMap<>(); + + while (locationIterator.hasNext()) { + DatabaseAccountLocation accountLocation = locationIterator.next(); + regionMap.put(accountLocation.getName(), accountLocation.getEndpoint()); + + if (writeOnly) { + serviceOrderedWriteableRegions.add(accountLocation.getName()); + } else { + serviceOrderedReadableRegions.add(accountLocation.getName()); + } + } + + return new AccountLevelLocationContext( + serviceOrderedReadableRegions, + serviceOrderedWriteableRegions, + regionMap); + } + + private static class AccountLevelLocationContext { + private final List serviceOrderedReadableRegions; + private final List serviceOrderedWriteableRegions; + private final Map regionNameToEndpoint; + + public AccountLevelLocationContext( + List serviceOrderedReadableRegions, + List serviceOrderedWriteableRegions, + Map regionNameToEndpoint) { + + this.serviceOrderedReadableRegions = serviceOrderedReadableRegions; + this.serviceOrderedWriteableRegions = serviceOrderedWriteableRegions; + this.regionNameToEndpoint = regionNameToEndpoint; + } + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java index 94575922f652..9292ec16e4dc 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java @@ -86,7 +86,7 @@ public void requestRateTooLarge( Mockito .doReturn(new RegionalRoutingContext(new URI("http://localhost"))) - .when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + .when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class), Mockito.eq(false)); Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy( mockDiagnosticsClientContext(), @@ -156,7 +156,7 @@ public static Object[][] tcpNetworkFailureOnWriteArgProvider() { public void networkFailureOnRead() throws Exception { ThrottlingRetryOptions throttlingRetryOptions = new ThrottlingRetryOptions(); GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class); - Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class), Mockito.eq(false)); GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker globalPartitionEndpointManagerForPerPartitionCircuitBreaker = Mockito.mock(GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.class); @@ -213,7 +213,7 @@ public void shouldRetryOnGatewayTimeout( GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover = Mockito.mock(GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.class); - Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class), Mockito.eq(false)); Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(true)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy( @@ -260,7 +260,7 @@ public void tcpNetworkFailureOnRead() throws Exception { GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover = Mockito.mock(GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.class); - Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class), Mockito.eq(false)); Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount(); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy( @@ -318,7 +318,7 @@ public void networkFailureOnWrite() throws Exception { GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover = Mockito.mock(GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.class); - Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class), Mockito.eq(false)); Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy( mockDiagnosticsClientContext(), @@ -363,7 +363,7 @@ public void tcpNetworkFailureOnWrite( GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover = Mockito.mock(GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.class); - Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class), Mockito.eq(false)); Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount(); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy( @@ -433,7 +433,7 @@ public void networkFailureOnUpsert() throws Exception { GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover = Mockito.mock(GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.class); - Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class), Mockito.eq(false)); Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy( mockDiagnosticsClientContext(), @@ -476,7 +476,7 @@ public void tcpNetworkFailureOnUpsert() throws Exception { GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover = Mockito.mock(GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.class); - Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class), Mockito.eq(false)); Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount(); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy( @@ -522,7 +522,7 @@ public void networkFailureOnDelete() throws Exception { GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover = Mockito.mock(GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.class); - Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class), Mockito.eq(false)); Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy( mockDiagnosticsClientContext(), @@ -566,7 +566,7 @@ public void tcpNetworkFailureOnDelete() throws Exception { GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover = Mockito.mock(GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.class); - Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class), Mockito.eq(false)); Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false)); Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount(); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy( @@ -651,7 +651,7 @@ public void returnWithInternalServerErrorOnPpcbFailure(CosmosException cosmosExc Mockito.when(globalPartitionEndpointManagerForPerPartitionCircuitBreaker.isPerPartitionLevelCircuitBreakingApplicable(Mockito.any())).thenReturn(true); Mockito.doThrow(cosmosEx).when(globalPartitionEndpointManagerForPerPartitionCircuitBreaker).handleLocationExceptionForPartitionKeyRange(Mockito.any(), Mockito.any(), Mockito.anyBoolean()); - Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(new RegionalRoutingContext(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class), Mockito.eq(false)); Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(true)); ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy( diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/DocumentProducerTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/DocumentProducerTest.java index 754a4aa21c9e..e6df02c59bd2 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/DocumentProducerTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/DocumentProducerTest.java @@ -126,7 +126,7 @@ private IRetryPolicyFactory mockDocumentClientIRetryPolicyFactory() { GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover = Mockito.mock(GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.class); - Mockito.doReturn(regionalRoutingContext).when(globalEndpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class)); + Mockito.doReturn(regionalRoutingContext).when(globalEndpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class), Mockito.eq(false)); doReturn(false).when(globalEndpointManager).isClosed(); return new RetryPolicy(mockDiagnosticsClientContext(), globalEndpointManager, ConnectionPolicy.getDefaultPolicy(), globalPartitionEndpointManagerForPerPartitionCircuitBreaker, globalPartitionEndpointManagerForPerPartitionAutomaticFailover); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/ApplicableRegionEvaluatorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/ApplicableRegionEvaluatorTest.java index 3312dcba3eda..718b75f44744 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/ApplicableRegionEvaluatorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/routing/ApplicableRegionEvaluatorTest.java @@ -24,7 +24,9 @@ import com.azure.cosmos.implementation.ServiceUnavailableException; import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; import com.azure.cosmos.implementation.perPartitionAutomaticFailover.GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionAutomaticFailoverInfoHolder; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker; +import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder; import org.assertj.core.api.Assertions; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; @@ -299,8 +301,10 @@ private static RxDocumentServiceRequest createRequest( true, collectionResourceId, new SerializationDiagnosticsContext()), - new AvailabilityStrategyContext(true, false) - )); + new AvailabilityStrategyContext(true, false), + new AtomicBoolean(false), + new PerPartitionCircuitBreakerInfoHolder(), + new PerPartitionAutomaticFailoverInfoHolder())); } else { request.requestContext.setCrossRegionAvailabilityContext( new CrossRegionAvailabilityContextForRxDocumentServiceRequest( @@ -310,8 +314,10 @@ private static RxDocumentServiceRequest createRequest( false, collectionResourceId, new SerializationDiagnosticsContext()), - new AvailabilityStrategyContext(false, false) - )); + new AvailabilityStrategyContext(false, false), + new AtomicBoolean(false), + new PerPartitionCircuitBreakerInfoHolder(), + new PerPartitionAutomaticFailoverInfoHolder())); } return request; diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java index baf6b455c0ab..8352435d424c 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/QueryValidationTests.java @@ -356,7 +356,7 @@ public void queryPlanCacheSinglePartitionParameterizedQueriesCorrectness() { assertThat(contextClient.getQueryPlanCache().containsKey(sqlQuerySpec.getQueryText())).isFalse(); // group by should not be cached - sqlQuerySpec.setQueryText("select max(c.id) from c order by c.name group by c.name"); + sqlQuerySpec.setQueryText("select max(c.id) from c group by c.name"); values1 = queryAndGetResults(sqlQuerySpec, options, TestObject.class); assertThat(contextClient.getQueryPlanCache().containsKey(sqlQuerySpec.getQueryText())).isFalse(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java index dc2adf1dce7b..eb5d3110f8e1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; @@ -51,6 +52,7 @@ class ChangeFeedQueryImpl { private final OperationContextAndListenerTuple operationContextAndListener; private final CosmosItemSerializer itemSerializer; private final DiagnosticsClientContext diagnosticsClientContext; + private final CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContext; public ChangeFeedQueryImpl( RxDocumentClientImpl client, @@ -59,7 +61,8 @@ public ChangeFeedQueryImpl( String collectionLink, String collectionRid, CosmosChangeFeedRequestOptions requestOptions, - DiagnosticsClientContext diagnosticsClientContext) { + DiagnosticsClientContext diagnosticsClientContext, CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContext) { + this.crossRegionAvailabilityContext = crossRegionAvailabilityContext; checkNotNull(client, "Argument 'client' must not be null."); checkNotNull(resourceType, "Argument 'resourceType' must not be null."); @@ -181,11 +184,7 @@ private RxDocumentServiceRequest createDocumentServiceRequest() { if (request.requestContext != null) { request.requestContext.setExcludeRegions(options.getExcludedRegions()); request.requestContext.setKeywordIdentifiers(options.getKeywordIdentifiers()); - request.requestContext.setCrossRegionAvailabilityContext( - new CrossRegionAvailabilityContextForRxDocumentServiceRequest( - new FeedOperationContextForCircuitBreaker(new ConcurrentHashMap<>(), false, collectionLink), - null, - new AvailabilityStrategyContext(false, false))); + request.requestContext.setCrossRegionAvailabilityContext(this.crossRegionAvailabilityContext); } return request; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java index ca9731f86e50..ee42eda8210e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java @@ -97,6 +97,7 @@ public Mono shouldRetry(Exception e) { } this.retryContext = null; + this.setPerPartitionAutomaticFailoverOverrideForReads(this.request, false); // Received 403.3 on write region, initiate the endpoint re-discovery CosmosException clientException = Utils.as(e, CosmosException.class); if (clientException != null && clientException.getDiagnostics() != null) { @@ -107,11 +108,12 @@ public Mono shouldRetry(Exception e) { Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN)) { logger.info("Endpoint not writable. Will refresh cache and retry ", e); - if (this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.tryMarkEndpointAsUnavailableForPartitionKeyRange(this.request, false)) { + this.setPerPartitionAutomaticFailoverOverrideForReads(this.request, true); + if (this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.tryMarkEndpointAsUnavailableForPartitionKeyRange(this.request, false, true)) { return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO)); } - return this.shouldRetryOnEndpointFailureAsync(false, true, false); + return this.shouldRetryOnEndpointFailureAsync(this.isReadRequest, true, false); } // Regional endpoint is not available yet for reads (e.g. add/ online of region is in progress) @@ -261,11 +263,21 @@ private ShouldRetryResult shouldRetryOnSessionNotAvailable(RxDocumentServiceRequ checkNotNull(request, "Argument 'request' cannot be null!"); checkNotNull(request.requestContext, "Argument 'request' cannot be null!"); - CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequest - = request.requestContext.getCrossRegionAvailabilityContext(); - checkNotNull(request.requestContext, "Argument 'crossRegionAvailabilityContextForRequest' cannot be null!"); - crossRegionAvailabilityContextForRequest.shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable(true); + this.setPerPartitionAutomaticFailoverOverrideForReads(request, true); + } + + if (!this.globalEndpointManager.canUseMultipleWriteLocations(request) && this.isReadRequest) { + + if (request.requestContext != null) { + + CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContext + = request.requestContext.getCrossRegionAvailabilityContext(); + + if (crossRegionAvailabilityContext != null) { + crossRegionAvailabilityContext.setShouldAddHubRegionProcessingOnlyHeader(true); + } + } } return ShouldRetryResult.retryAfter(Duration.ZERO); @@ -516,7 +528,22 @@ public void onBeforeSendRequest(RxDocumentServiceRequest request) { // Resolve the endpoint for the request and pin the resolution to the resolved endpoint // This enables marking the endpoint unavailability on endpoint failover/unreachability - this.regionalRoutingContext = this.globalEndpointManager.resolveServiceEndpoint(request); + + boolean isInHubRegionDiscoveryMode = false; + + if (request.requestContext != null) { + CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContext + = request.requestContext.getCrossRegionAvailabilityContext(); + + if (crossRegionAvailabilityContext != null) { + if (crossRegionAvailabilityContext.shouldAddHubRegionProcessingOnlyHeader()) { + isInHubRegionDiscoveryMode = true; + request.getHeaders().put(HttpConstants.HttpHeaders.HUB_REGION_PROCESSING_ONLY, "true"); + } + } + } + + this.regionalRoutingContext = this.globalEndpointManager.resolveServiceEndpoint(request, isInHubRegionDiscoveryMode); if (request.requestContext != null) { request.requestContext.routeToLocation(this.regionalRoutingContext); @@ -526,6 +553,28 @@ public void onBeforeSendRequest(RxDocumentServiceRequest request) { this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.tryAddPartitionLevelLocationOverride(request); } + private void setPerPartitionAutomaticFailoverOverrideForReads(RxDocumentServiceRequest request, boolean followPerPartitionHub) { + + if (request == null) { + return; + } + + if (request.requestContext == null) { + return; + } + + if (!request.isReadOnlyRequest() || request.isMetadataRequest()) { + return; + } + + CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContext = request.requestContext.getCrossRegionAvailabilityContext(); + request.requestContext.resolvedPartitionKeyRangeForPerPartitionAutomaticFailover = request.requestContext.resolvedPartitionKeyRange; + + if (crossRegionAvailabilityContext != null) { + crossRegionAvailabilityContext.setShouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable(followPerPartitionHub); + } + } + @Override public com.azure.cosmos.implementation.RetryContext getRetryContext() { return BridgeInternal.getRetryContext(this.getCosmosDiagnostics()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java index fbfaf776edc8..cdd97adcf18c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientSideRequestStatistics.java @@ -3,7 +3,7 @@ package com.azure.cosmos.implementation; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; -import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionFailoverInfoHolder; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionAutomaticFailoverInfoHolder; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder; import com.azure.cosmos.implementation.cpu.CpuMemoryMonitor; import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics; @@ -172,7 +172,18 @@ public void recordResponse(RxDocumentServiceRequest request, StoreResultDiagnost this.approximateInsertionCountInBloomFilter = request.requestContext.getApproximateBloomFilterInsertionCount(); storeResponseStatistics.sessionTokenEvaluationResults = request.requestContext.getSessionTokenEvaluationResults(); storeResponseStatistics.perPartitionCircuitBreakerInfoHolder = request.requestContext.getPerPartitionCircuitBreakerInfoHolder(); - storeResponseStatistics.perPartitionFailoverInfoHolder = request.requestContext.getPerPartitionFailoverContextHolder(); + storeResponseStatistics.perPartitionAutomaticFailoverInfoHolder = request.requestContext.getPerPartitionFailoverContextHolder(); + + if (request.requestContext.getCrossRegionAvailabilityContext() != null) { + CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequest + = request.requestContext.getCrossRegionAvailabilityContext(); + + if (crossRegionAvailabilityContextForRequest.shouldAddHubRegionProcessingOnlyHeader()) { + storeResponseStatistics.isHubRegionProcessingOnly = "true"; + } else { + storeResponseStatistics.isHubRegionProcessingOnly = "false"; + } + } if (request.requestContext.getEndToEndOperationLatencyPolicyConfig() != null) { storeResponseStatistics.e2ePolicyCfg = @@ -255,7 +266,17 @@ public void recordGatewayResponse( if (rxDocumentServiceRequest.requestContext != null) { gatewayStatistics.sessionTokenEvaluationResults = rxDocumentServiceRequest.requestContext.getSessionTokenEvaluationResults(); gatewayStatistics.perPartitionCircuitBreakerInfoHolder = rxDocumentServiceRequest.requestContext.getPerPartitionCircuitBreakerInfoHolder(); - gatewayStatistics.perPartitionFailoverInfoHolder = rxDocumentServiceRequest.requestContext.getPerPartitionFailoverContextHolder(); + gatewayStatistics.perPartitionAutomaticFailoverInfoHolder = rxDocumentServiceRequest.requestContext.getPerPartitionFailoverContextHolder(); + gatewayStatistics.isHubRegionProcessingOnly = "false"; + + CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequest + = rxDocumentServiceRequest.requestContext.getCrossRegionAvailabilityContext(); + + if (crossRegionAvailabilityContextForRequest != null) { + if (crossRegionAvailabilityContextForRequest.shouldAddHubRegionProcessingOnlyHeader()) { + gatewayStatistics.isHubRegionProcessingOnly = "true"; + } + } } } gatewayStatistics.statusCode = storeResponseDiagnostics.getStatusCode(); @@ -700,8 +721,11 @@ public static class StoreResponseStatistics { @JsonSerialize(using = PerPartitionCircuitBreakerInfoHolder.PerPartitionCircuitBreakerInfoHolderSerializer.class) private PerPartitionCircuitBreakerInfoHolder perPartitionCircuitBreakerInfoHolder; - @JsonSerialize(using = PerPartitionFailoverInfoHolder.PerPartitionFailoverInfoHolderSerializer.class) - private PerPartitionFailoverInfoHolder perPartitionFailoverInfoHolder; + @JsonSerialize(using = PerPartitionAutomaticFailoverInfoHolder.PerPartitionFailoverInfoHolderSerializer.class) + private PerPartitionAutomaticFailoverInfoHolder perPartitionAutomaticFailoverInfoHolder; + + @JsonSerialize + private String isHubRegionProcessingOnly; public String getExcludedRegions() { return this.excludedRegions; @@ -743,8 +767,16 @@ public PerPartitionCircuitBreakerInfoHolder getPerPartitionCircuitBreakerInfoHol return perPartitionCircuitBreakerInfoHolder; } - public PerPartitionFailoverInfoHolder getPerPartitionFailoverInfoHolder() { - return perPartitionFailoverInfoHolder; + public PerPartitionAutomaticFailoverInfoHolder getPerPartitionFailoverInfoHolder() { + return perPartitionAutomaticFailoverInfoHolder; + } + + public String getIsHubRegionProcessingOnly() { + return isHubRegionProcessingOnly; + } + + public void setIsHubRegionProcessingOnly(String isHubRegionProcessingOnly) { + this.isHubRegionProcessingOnly = isHubRegionProcessingOnly; } @JsonIgnore @@ -910,10 +942,11 @@ public static class GatewayStatistics { private List faultInjectionEvaluationResults; private Set sessionTokenEvaluationResults; private PerPartitionCircuitBreakerInfoHolder perPartitionCircuitBreakerInfoHolder; - private PerPartitionFailoverInfoHolder perPartitionFailoverInfoHolder; + private PerPartitionAutomaticFailoverInfoHolder perPartitionAutomaticFailoverInfoHolder; private String endpoint; private String requestThroughputControlGroupName; private String requestThroughputControlGroupConfig; + private String isHubRegionProcessingOnly; public String getSessionToken() { return sessionToken; @@ -975,8 +1008,8 @@ public PerPartitionCircuitBreakerInfoHolder getPerPartitionCircuitBreakerInfoHol return perPartitionCircuitBreakerInfoHolder; } - public PerPartitionFailoverInfoHolder getPerPartitionFailoverInfoHolder() { - return perPartitionFailoverInfoHolder; + public PerPartitionAutomaticFailoverInfoHolder getPerPartitionFailoverInfoHolder() { + return perPartitionAutomaticFailoverInfoHolder; } public String getEndpoint() { @@ -1016,6 +1049,7 @@ public void serialize(GatewayStatistics gatewayStatistics, this.writeNonNullStringField(jsonGenerator, "exceptionResponseHeaders", gatewayStatistics.getExceptionResponseHeaders()); this.writeNonNullStringField(jsonGenerator, "faultInjectionRuleId", gatewayStatistics.getFaultInjectionRuleId()); this.writeNonNullStringField(jsonGenerator, "endpoint", gatewayStatistics.getEndpoint()); + this.writeNonNullStringField(jsonGenerator, "isHubRegionProcessingOnly", gatewayStatistics.isHubRegionProcessingOnly); if (StringUtils.isEmpty(gatewayStatistics.getFaultInjectionRuleId())) { this.writeNonEmptyStringArrayField( @@ -1026,7 +1060,7 @@ public void serialize(GatewayStatistics gatewayStatistics, this.writeNonEmptyStringSetField(jsonGenerator, "sessionTokenEvaluationResults", gatewayStatistics.getSessionTokenEvaluationResults()); this.writeNonNullObjectField(jsonGenerator, "perPartitionCircuitBreakerInfoHolder", gatewayStatistics.getPerPartitionCircuitBreakerInfoHolder()); - this.writeNonNullObjectField(jsonGenerator, "perPartitionFailoverInfoHolder", gatewayStatistics.getPerPartitionFailoverInfoHolder()); + this.writeNonNullObjectField(jsonGenerator, "perPartitionAutomaticFailoverInfoHolder", gatewayStatistics.getPerPartitionFailoverInfoHolder()); this.writeNonNullStringField(jsonGenerator, "requestTCG", gatewayStatistics.getRequestThroughputControlGroupName()); this.writeNonNullStringField(jsonGenerator, "requestTCGConfig", gatewayStatistics.getRequestThroughputControlGroupConfig()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java index 8053957e3051..b5faa8cb5ded 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java @@ -3,6 +3,12 @@ package com.azure.cosmos.implementation; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelAutomaticFailoverInfo; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionAutomaticFailoverInfoHolder; +import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext; +import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder; + +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; public class CrossRegionAvailabilityContextForRxDocumentServiceRequest { @@ -11,20 +17,32 @@ public class CrossRegionAvailabilityContextForRxDocumentServiceRequest { private final AtomicBoolean hasPerPartitionAutomaticFailoverBeenAppliedForReads = new AtomicBoolean(false); + private final AtomicBoolean shouldAddHubRegionProcessingOnlyHeader; + private final FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreaker; private final PointOperationContextForCircuitBreaker pointOperationContextForCircuitBreaker; private final AvailabilityStrategyContext availabilityStrategyContext; + private final PerPartitionCircuitBreakerInfoHolder perPartitionCircuitBreakerInfoHolder; + + private final PerPartitionAutomaticFailoverInfoHolder perPartitionAutomaticFailoverInfoHolder; + public CrossRegionAvailabilityContextForRxDocumentServiceRequest( FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreaker, PointOperationContextForCircuitBreaker pointOperationContextForCircuitBreaker, - AvailabilityStrategyContext availabilityStrategyContext) { + AvailabilityStrategyContext availabilityStrategyContext, + AtomicBoolean shouldAddHubRegionProcessingOnlyHeader, + PerPartitionCircuitBreakerInfoHolder perPartitionCircuitBreakerInfoHolder, + PerPartitionAutomaticFailoverInfoHolder perPartitionAutomaticFailoverInfoHolder) { this.feedOperationContextForCircuitBreaker = feedOperationContextForCircuitBreaker; this.pointOperationContextForCircuitBreaker = pointOperationContextForCircuitBreaker; this.availabilityStrategyContext = availabilityStrategyContext; + this.shouldAddHubRegionProcessingOnlyHeader = shouldAddHubRegionProcessingOnlyHeader; + this.perPartitionCircuitBreakerInfoHolder = perPartitionCircuitBreakerInfoHolder; + this.perPartitionAutomaticFailoverInfoHolder = perPartitionAutomaticFailoverInfoHolder; } public FeedOperationContextForCircuitBreaker getFeedOperationContextForCircuitBreaker() { @@ -43,10 +61,18 @@ public boolean shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicabl return shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable.get(); } - public void shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable(boolean shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable) { + public void setShouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable(boolean shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable) { this.shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable.set(shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable); } + public void setShouldAddHubRegionProcessingOnlyHeader(boolean shouldAddHubRegionProcessingOnlyHeader) { + this.shouldAddHubRegionProcessingOnlyHeader.set(shouldAddHubRegionProcessingOnlyHeader); + } + + public boolean shouldAddHubRegionProcessingOnlyHeader() { + return this.shouldAddHubRegionProcessingOnlyHeader.get(); + } + public void setPerPartitionAutomaticFailoverAppliedStatusForReads(boolean perPartitionAutomaticFailoverAppliedStatus) { this.hasPerPartitionAutomaticFailoverBeenAppliedForReads.set(perPartitionAutomaticFailoverAppliedStatus); } @@ -54,4 +80,20 @@ public void setPerPartitionAutomaticFailoverAppliedStatusForReads(boolean perPar public boolean hasPerPartitionAutomaticFailoverBeenAppliedForReads() { return this.hasPerPartitionAutomaticFailoverBeenAppliedForReads.get(); } + + public void setPerPartitionCircuitBreakerInfo(Map locationToLocationSpecificHealthContext) { + this.perPartitionCircuitBreakerInfoHolder.setPerPartitionCircuitBreakerInfoHolder(locationToLocationSpecificHealthContext); + } + + public PerPartitionCircuitBreakerInfoHolder getPerPartitionCircuitBreakerInfoHolder() { + return this.perPartitionCircuitBreakerInfoHolder; + } + + public void setPerPartitionFailoverInfo(PartitionLevelAutomaticFailoverInfo partitionLevelAutomaticFailoverInfo) { + this.perPartitionAutomaticFailoverInfoHolder.setPartitionLevelFailoverInfo(partitionLevelAutomaticFailoverInfo); + } + + public PerPartitionAutomaticFailoverInfoHolder getPerPartitionAutomaticFailoverInfoHolder() { + return this.perPartitionAutomaticFailoverInfoHolder; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java index 18da5250c458..a4a9ea563389 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DocumentServiceRequestContext.java @@ -8,8 +8,8 @@ import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig; import com.azure.cosmos.CosmosException; import com.azure.cosmos.ReadConsistencyStrategy; -import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelFailoverInfo; -import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionFailoverInfoHolder; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelAutomaticFailoverInfo; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionAutomaticFailoverInfoHolder; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; @@ -76,9 +76,6 @@ public class DocumentServiceRequestContext implements Cloneable { private volatile Supplier clientRetryPolicySupplier; - private volatile PerPartitionCircuitBreakerInfoHolder perPartitionCircuitBreakerInfoHolder; - private volatile PerPartitionFailoverInfoHolder perPartitionFailoverInfoHolder; - public DocumentServiceRequestContext() {} /** @@ -240,30 +237,32 @@ public void setClientRetryPolicySupplier(Supplier cli } public PerPartitionCircuitBreakerInfoHolder getPerPartitionCircuitBreakerInfoHolder() { - return this.perPartitionCircuitBreakerInfoHolder; + + if (this.crossRegionAvailabilityContextForRequest == null) { + return new PerPartitionCircuitBreakerInfoHolder(); + } + + return this.crossRegionAvailabilityContextForRequest.getPerPartitionCircuitBreakerInfoHolder(); } public void setPerPartitionCircuitBreakerInfoHolder(Map locationToLocationSpecificHealthContext) { - - if (this.perPartitionCircuitBreakerInfoHolder == null) { - this.perPartitionCircuitBreakerInfoHolder = new PerPartitionCircuitBreakerInfoHolder(); - this.perPartitionCircuitBreakerInfoHolder.setPerPartitionCircuitBreakerInfoHolder(locationToLocationSpecificHealthContext); - } else { - this.perPartitionCircuitBreakerInfoHolder.setPerPartitionCircuitBreakerInfoHolder(locationToLocationSpecificHealthContext); + if (this.crossRegionAvailabilityContextForRequest != null) { + this.crossRegionAvailabilityContextForRequest.setPerPartitionCircuitBreakerInfo(locationToLocationSpecificHealthContext); } } - public PerPartitionFailoverInfoHolder getPerPartitionFailoverContextHolder() { - return this.perPartitionFailoverInfoHolder; - } + public PerPartitionAutomaticFailoverInfoHolder getPerPartitionFailoverContextHolder() { + + if (this.crossRegionAvailabilityContextForRequest == null) { + return new PerPartitionAutomaticFailoverInfoHolder(); + } - public void setPerPartitionAutomaticFailoverInfoHolder(PartitionLevelFailoverInfo partitionLevelFailoverInfo) { + return this.crossRegionAvailabilityContextForRequest.getPerPartitionAutomaticFailoverInfoHolder(); + } - if (this.perPartitionFailoverInfoHolder == null) { - this.perPartitionFailoverInfoHolder = new PerPartitionFailoverInfoHolder(); - this.perPartitionFailoverInfoHolder.setPartitionLevelFailoverInfo(partitionLevelFailoverInfo); - } else { - this.perPartitionFailoverInfoHolder.setPartitionLevelFailoverInfo(partitionLevelFailoverInfo); + public void setPerPartitionAutomaticFailoverInfoHolder(PartitionLevelAutomaticFailoverInfo partitionLevelAutomaticFailoverInfo) { + if (this.crossRegionAvailabilityContextForRequest != null) { + this.crossRegionAvailabilityContextForRequest.setPerPartitionFailoverInfo(partitionLevelAutomaticFailoverInfo); } } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java index ec0ceb536615..cdbbaa45bfcc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GlobalEndpointManager.java @@ -154,7 +154,11 @@ public static Mono getDatabaseAccountFromAnyLocationsAsync( } public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest request) { - RegionalRoutingContext serviceEndpoints = this.locationCache.resolveServiceEndpoint(request); + return this.resolveServiceEndpoint(request, false); + } + + public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest request, boolean isInHubRegionDiscoveryMode) { + RegionalRoutingContext serviceEndpoints = this.locationCache.resolveServiceEndpoint(request, isInHubRegionDiscoveryMode); if (request.faultInjectionRequestContext != null) { // TODO: integrate thin client into fault injection request.faultInjectionRequestContext.setRegionalRoutingContextToRoute(serviceEndpoints); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index 174b155485f8..b70b2812ee30 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -294,6 +294,9 @@ public static class HttpHeaders { // Throughput bucket header public static final String THROUGHPUT_BUCKET = "x-ms-cosmos-throughput-bucket"; + + // Region affinity headers + public static final String HUB_REGION_PROCESSING_ONLY = "x-ms-cosmos-hub-region-processing-only"; } public static class A_IMHeaderValues { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 9b035b28dff6..d1433025eb27 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -50,8 +50,10 @@ import com.azure.cosmos.implementation.interceptor.ITransportClientInterceptor; import com.azure.cosmos.implementation.patch.PatchUtil; import com.azure.cosmos.implementation.perPartitionAutomaticFailover.GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionAutomaticFailoverInfoHolder; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PartitionLevelCircuitBreakerConfig; +import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder; import com.azure.cosmos.implementation.query.DocumentQueryExecutionContextFactory; import com.azure.cosmos.implementation.query.IDocumentQueryClient; import com.azure.cosmos.implementation.query.IDocumentQueryExecutionContext; @@ -2176,6 +2178,10 @@ private Mono getBatchDocumentRequest(DocumentClientRet request.requestContext.setKeywordIdentifiers(options.getKeywordIdentifiers()); } + AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false); + PerPartitionCircuitBreakerInfoHolder perPartitionCircuitBreakerInfoHolder = new PerPartitionCircuitBreakerInfoHolder(); + PerPartitionAutomaticFailoverInfoHolder perPartitionAutomaticFailoverInfoHolder = new PerPartitionAutomaticFailoverInfoHolder(); + + if (request.requestContext != null) { + request.requestContext.setCrossRegionAvailabilityContext( + new CrossRegionAvailabilityContextForRxDocumentServiceRequest( + null, + new PointOperationContextForCircuitBreaker( + new AtomicBoolean(false), + false, + documentCollectionLink, + serializationDiagnosticsContext), + null, + shouldAddHubRegionProcessingOnlyHeader, + perPartitionCircuitBreakerInfoHolder, + perPartitionAutomaticFailoverInfoHolder)); + } + // note: calling onBeforeSendRequest is a cheap operation which injects a CosmosDiagnostics // instance into 'request' amongst other things - this way metadataDiagnosticsContext is not // null and can be used for metadata-related telemetry (partition key range, container and server address lookups) @@ -2245,17 +2270,6 @@ private Mono getBatchDocumentRequest(DocumentClientRet MetadataDiagnosticsContext metadataDiagnosticsContext = BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics); - request.requestContext.setCrossRegionAvailabilityContext( - - new CrossRegionAvailabilityContextForRxDocumentServiceRequest( - null, - new PointOperationContextForCircuitBreaker( - new AtomicBoolean(false), - false, - documentCollectionLink, - serializationDiagnosticsContext), - null)); - return this.collectionCache.resolveCollectionAsync(metadataDiagnosticsContext, request) .flatMap(documentCollectionValueHolder -> { @@ -2637,7 +2651,7 @@ private Mono> createDocumentCore( crossRegionAvailabilityContextForRxDocumentServiceRequest), finalRetryPolicyInstance), scopedDiagnosticsFactory - ), requestReference, endToEndPolicyConfig); + ), requestReference); } private Mono> createDocumentInternal( @@ -2682,8 +2696,6 @@ private Mono> createDocumentInternal( options.setPartitionKeyDefinition(documentCollectionValueHolder.v.getPartitionKey()); - request.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); - PartitionKeyRange preResolvedPartitionKeyRangeIfAny = setPartitionKeyRangeForPointOperationRequestForPerPartitionAutomaticFailover( request, options, @@ -2699,6 +2711,8 @@ private Mono> createDocumentInternal( requestRetryPolicy, preResolvedPartitionKeyRangeIfAny); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); + documentServiceRequestReference.set(request); // needs to be after onBeforeSendRequest since CosmosDiagnostics instance needs to be wired @@ -2747,10 +2761,7 @@ private static Mono getPointOperationResponseMonoWithE2ETimeout( private Mono handleCircuitBreakingFeedbackForPointOperation( Mono response, - AtomicReference requestReference, - CosmosEndToEndOperationLatencyPolicyConfig effectiveEndToEndPolicyConfig) { - - applyEndToEndLatencyPolicyCfgToRequestContext(requestReference.get(), effectiveEndToEndPolicyConfig); + AtomicReference requestReference) { return response .doOnSuccess(ignore -> { @@ -3022,7 +3033,7 @@ private Mono> upsertDocumentCore( requestReference, crossRegionAvailabilityContextForRequest), finalRetryPolicyInstance), - scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); + scopedDiagnosticsFactory), requestReference); } private Mono> upsertDocumentInternal( @@ -3066,7 +3077,6 @@ private Mono> upsertDocumentInternal( } options.setPartitionKeyDefinition(documentCollectionValueHolder.v.getPartitionKey()); - request.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); PartitionKeyRange preResolvedPartitionKeyRangeIfAny = setPartitionKeyRangeForPointOperationRequestForPerPartitionAutomaticFailover( request, @@ -3083,6 +3093,8 @@ private Mono> upsertDocumentInternal( retryPolicyInstance, preResolvedPartitionKeyRangeIfAny); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); + requestReference.set(request); // needs to be after onBeforeSendRequest since CosmosDiagnostics instance needs to be wired @@ -3163,7 +3175,7 @@ private Mono> replaceDocumentCore( requestReference, crossRegionAvailabilityContextForRequest), finalRequestRetryPolicy), - scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); + scopedDiagnosticsFactory), requestReference); } private Mono> replaceDocumentInternal( @@ -3245,7 +3257,7 @@ private Mono> replaceDocumentCore( clientContextOverride, requestReference, crossRegionAvailabilityContextForRequest), - requestRetryPolicy), requestReference, cosmosEndToEndOperationLatencyPolicyConfig); + requestRetryPolicy), requestReference); } private Mono> replaceDocumentInternal( @@ -3336,6 +3348,10 @@ private Mono> replaceDocumentInternal( serializationDiagnosticsContext.addSerializationDiagnostics(serializationDiagnostics); } + if (request.requestContext != null) { + request.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); + } + if (retryPolicyInstance != null) { retryPolicyInstance.onBeforeSendRequest(request); } @@ -3365,8 +3381,6 @@ private Mono> replaceDocumentInternal( options.setPartitionKeyDefinition(documentCollectionValueHolder.v.getPartitionKey()); - req.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); - PartitionKeyRange preResolvedPartitionKeyRangeIfAny = setPartitionKeyRangeForPointOperationRequestForPerPartitionAutomaticFailover( req, options, @@ -3382,6 +3396,8 @@ private Mono> replaceDocumentInternal( retryPolicyInstance, preResolvedPartitionKeyRangeIfAny); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); + requestReference.set(req); // needs to be after onBeforeSendRequest since CosmosDiagnostics instance needs to be wired @@ -3498,7 +3514,7 @@ private Mono> patchDocumentCore( requestReference, crossRegionAvailabilityContextForRequest), documentClientRetryPolicy), - scopedDiagnosticsFactory), requestReference, cosmosEndToEndOperationLatencyPolicyConfig); + scopedDiagnosticsFactory), requestReference); } private Mono> patchDocumentInternal( @@ -3553,6 +3569,10 @@ private Mono> patchDocumentInternal( requestContext.setKeywordIdentifiers(options.getKeywordIdentifiers()); } + if (request.requestContext != null) { + request.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); + } + if (retryPolicyInstance != null) { retryPolicyInstance.onBeforeSendRequest(request); } @@ -3597,8 +3617,6 @@ private Mono> patchDocumentInternal( options.setPartitionKeyDefinition(documentCollectionValueHolder.v.getPartitionKey()); - req.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); - PartitionKeyRange preResolvedPartitionKeyRangeIfAny = setPartitionKeyRangeForPointOperationRequestForPerPartitionAutomaticFailover( req, options, @@ -3614,6 +3632,8 @@ private Mono> patchDocumentInternal( retryPolicyInstance, preResolvedPartitionKeyRangeIfAny); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); + requestReference.set(req); // needs to be after onBeforeSendRequest since CosmosDiagnostics instance needs to be wired @@ -3710,7 +3730,7 @@ private Mono> deleteDocumentCore( requestReference, crossRegionAvailabilityContextForRequest), requestRetryPolicy), - scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); + scopedDiagnosticsFactory), requestReference); } private Mono> deleteDocumentInternal( @@ -3748,6 +3768,10 @@ private Mono> deleteDocumentInternal( requestContext.setKeywordIdentifiers(options.getKeywordIdentifiers()); } + if (request.requestContext != null) { + request.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); + } + if (retryPolicyInstance != null) { retryPolicyInstance.onBeforeSendRequest(request); } @@ -3769,8 +3793,6 @@ private Mono> deleteDocumentInternal( options.setPartitionKeyDefinition(documentCollectionValueHolder.v.getPartitionKey()); - req.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); - PartitionKeyRange preResolvedPartitionKeyRangeIfAny = setPartitionKeyRangeForPointOperationRequestForPerPartitionAutomaticFailover( req, options, @@ -3786,6 +3808,8 @@ private Mono> deleteDocumentInternal( retryPolicyInstance, preResolvedPartitionKeyRangeIfAny); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); + requestReference.set(req); // needs to be after onBeforeSendRequest since CosmosDiagnostics instance needs to be wired @@ -3902,7 +3926,7 @@ private Mono> readDocumentCore( crossRegionAvailabilityContextForRequest), retryPolicyInstance), scopedDiagnosticsFactory - ), requestReference, endToEndPolicyConfig); + ), requestReference); } private Mono> readDocumentInternal( @@ -3933,6 +3957,10 @@ private Mono> readDocumentInternal( requestContext.setExcludeRegions(options.getExcludedRegions()); requestContext.setKeywordIdentifiers(options.getKeywordIdentifiers()); + if (requestContext != null) { + requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); + } + if (retryPolicyInstance != null) { retryPolicyInstance.onBeforeSendRequest(request); } @@ -3959,7 +3987,7 @@ private Mono> readDocumentInternal( options.setPartitionKeyDefinition(documentCollection.getPartitionKey()); req.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); - PartitionKeyRange preResolvedPartionKeyRangeIfAny = setPartitionKeyRangeForPointOperationRequestForPerPartitionAutomaticFailover( + PartitionKeyRange preResolvedPartitionKeyRangeIfAny = setPartitionKeyRangeForPointOperationRequestForPerPartitionAutomaticFailover( req, options, collectionRoutingMapValueHolder.v, @@ -3972,7 +4000,9 @@ private Mono> readDocumentInternal( options, collectionRoutingMapValueHolder.v, retryPolicyInstance, - preResolvedPartionKeyRangeIfAny); + preResolvedPartitionKeyRangeIfAny); + + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); requestReference.set(req); @@ -4691,6 +4721,18 @@ public Flux> queryDocumentChangeFeed( checkNotNull(collection, "Argument 'collection' must not be null."); + AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false); + PerPartitionCircuitBreakerInfoHolder perPartitionCircuitBreakerInfoHolder = new PerPartitionCircuitBreakerInfoHolder(); + PerPartitionAutomaticFailoverInfoHolder perPartitionAutomaticFailoverInfoHolder = new PerPartitionAutomaticFailoverInfoHolder(); + + + CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequest = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( + new FeedOperationContextForCircuitBreaker(new ConcurrentHashMap<>(), false, collection.getAltLink()), + null, + new AvailabilityStrategyContext(false, false), + shouldAddHubRegionProcessingOnlyHeader, + perPartitionCircuitBreakerInfoHolder, + perPartitionAutomaticFailoverInfoHolder); ChangeFeedQueryImpl changeFeedQueryImpl = new ChangeFeedQueryImpl<>( this, @@ -4699,7 +4741,8 @@ public Flux> queryDocumentChangeFeed( collection.getAltLink(), collection.getResourceId(), requestOptions, - diagnosticsClientContext); + diagnosticsClientContext, + crossRegionAvailabilityContextForRequest); return changeFeedQueryImpl.executeAsync(); } @@ -5215,7 +5258,7 @@ public Mono executeBatchRequest(String collectionLink, requestReference), documentClientRetryPolicy), scopedDiagnosticsFactory ), - requestReference, endToEndPolicyConfig); + requestReference); } private Mono executeStoredProcedureInternal(String storedProcedureLink, @@ -5271,6 +5314,7 @@ private Mono executeBatchRequestInternal(String collectionL Mono responseObservable = requestObs.flatMap(request -> { requestReference.set(request); + applyEndToEndLatencyPolicyCfgToRequestContext(request, options.getCosmosEndToEndLatencyPolicyConfig()); return create(request, requestRetryPolicy, getOperationContextAndListenerTuple(options)); }); @@ -7230,6 +7274,10 @@ private Mono> wrapPointOperationWithAvailabilityStrat nonNullRequestOptions); AtomicBoolean isOperationSuccessful = new AtomicBoolean(false); + AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false); + PerPartitionCircuitBreakerInfoHolder perPartitionCircuitBreakerInfoHolder = new PerPartitionCircuitBreakerInfoHolder(); + PerPartitionAutomaticFailoverInfoHolder perPartitionAutomaticFailoverInfoHolder = new PerPartitionAutomaticFailoverInfoHolder(); + if (orderedApplicableRegionsForSpeculation.size() < 2) { // There is at most one applicable region - no hedging possible @@ -7247,7 +7295,10 @@ private Mono> wrapPointOperationWithAvailabilityStrat = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( null, pointOperationContextForCircuitBreakerForMainRequest, - availabilityStrategyContextForMainRequest); + availabilityStrategyContextForMainRequest, + shouldAddHubRegionProcessingOnlyHeader, + perPartitionCircuitBreakerInfoHolder, + perPartitionAutomaticFailoverInfoHolder); return callback.apply(nonNullRequestOptions, endToEndPolicyConfig, innerDiagnosticsFactory, crossRegionAvailabilityContextForMainRequest); } @@ -7283,7 +7334,10 @@ private Mono> wrapPointOperationWithAvailabilityStrat = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( null, pointOperationContextForCircuitBreakerForMainRequest, - availabilityStrategyContextForMainRequest); + availabilityStrategyContextForMainRequest, + shouldAddHubRegionProcessingOnlyHeader, + perPartitionCircuitBreakerInfoHolder, + perPartitionAutomaticFailoverInfoHolder); Mono initialMonoAcrossAllRegions = callback.apply(clonedOptions, endToEndPolicyConfig, diagnosticsFactory, crossRegionAvailabilityContextForMainRequest) @@ -7327,7 +7381,10 @@ private Mono> wrapPointOperationWithAvailabilityStrat CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForHedgedRequest = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( null, pointOperationContextForCircuitBreakerForHedgedRequest, - availabilityStrategyContextForHedgedRequest); + availabilityStrategyContextForHedgedRequest, + shouldAddHubRegionProcessingOnlyHeader, + perPartitionCircuitBreakerInfoHolder, + perPartitionAutomaticFailoverInfoHolder); Mono regionalCrossRegionRetryMono = callback.apply(clonedOptions, endToEndPolicyConfig, diagnosticsFactory, crossRegionAvailabilityContextForHedgedRequest) @@ -7505,7 +7562,7 @@ private CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatenc checkNotNull(networkRequestTimeout, "Argument 'networkRequestTimeout' cannot be null!"); - Duration overallE2eLatencyTimeout = networkRequestTimeout.plus(Utils.ONE_SECOND); + Duration overallE2eLatencyTimeout = Duration.ofSeconds(500); Duration threshold = Utils.min(networkRequestTimeout.dividedBy(2), Utils.ONE_SECOND); Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND); @@ -7514,7 +7571,7 @@ private CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatenc .build(); } else { - Duration httpNetworkRequestTimeout = connectionPolicy.getHttpNetworkRequestTimeout(); + Duration httpNetworkRequestTimeout = Duration.ofSeconds(500); checkNotNull(httpNetworkRequestTimeout, "Argument 'httpNetworkRequestTimeout' cannot be null!"); @@ -7649,6 +7706,10 @@ private Mono executeFeedOperationWithAvailabilityStrategy( this.getEffectiveEndToEndOperationLatencyPolicyConfig( req.requestContext.getEndToEndOperationLatencyPolicyConfig(), resourceType, operationType); + AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false); + PerPartitionCircuitBreakerInfoHolder perPartitionCircuitBreakerInfoHolder = new PerPartitionCircuitBreakerInfoHolder(); + PerPartitionAutomaticFailoverInfoHolder perPartitionAutomaticFailoverInfoHolder = new PerPartitionAutomaticFailoverInfoHolder(); + req.requestContext.setEndToEndOperationLatencyPolicyConfig(endToEndPolicyConfig); List initialExcludedRegions = req.requestContext.getExcludeRegions(); @@ -7676,7 +7737,10 @@ private Mono executeFeedOperationWithAvailabilityStrategy( = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( feedOperationContextForCircuitBreakerForRequestOutsideOfAvailabilityStrategyFlow, null, - availabilityStrategyContext); + availabilityStrategyContext, + shouldAddHubRegionProcessingOnlyHeader, + perPartitionCircuitBreakerInfoHolder, + perPartitionAutomaticFailoverInfoHolder); req.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); @@ -7697,7 +7761,10 @@ private Mono executeFeedOperationWithAvailabilityStrategy( CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequest = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( feedOperationContextForCircuitBreakerForParentRequestInAvailabilityStrategyFlow, null, - availabilityStrategyContext); + availabilityStrategyContext, + shouldAddHubRegionProcessingOnlyHeader, + perPartitionCircuitBreakerInfoHolder, + perPartitionAutomaticFailoverInfoHolder); req.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest); @@ -7728,7 +7795,10 @@ private Mono executeFeedOperationWithAvailabilityStrategy( CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequestForNonHedgedRequest = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( feedOperationContextForCircuitBreakerForNonHedgedRequest, null, - availabilityStrategyContextForNonHedgedRequest); + availabilityStrategyContextForNonHedgedRequest, + shouldAddHubRegionProcessingOnlyHeader, + perPartitionCircuitBreakerInfoHolder, + perPartitionAutomaticFailoverInfoHolder); clonedRequest.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequestForNonHedgedRequest); @@ -7771,8 +7841,10 @@ private Mono executeFeedOperationWithAvailabilityStrategy( = new CrossRegionAvailabilityContextForRxDocumentServiceRequest( feedOperationContextForCircuitBreakerForHedgedRequest, null, - availabilityStrategyContextForHedgedRequest - ); + availabilityStrategyContextForHedgedRequest, + shouldAddHubRegionProcessingOnlyHeader, + perPartitionCircuitBreakerInfoHolder, + perPartitionAutomaticFailoverInfoHolder); clonedRequest.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequestForHedgedRequest); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index 4c71d49fd5e6..786e08199d19 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -1185,6 +1185,10 @@ public void setThroughputBucket(Integer throughputBucket) { } } + public void setHubRegionProcessingOnly(boolean hubRegionProcessingOnly) { + this.headers.put(HttpConstants.HttpHeaders.HUB_REGION_PROCESSING_ONLY, Boolean.toString(hubRegionProcessingOnly)); + } + public Duration getResponseTimeout() { return responseTimeout; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java index 0e9dfa7b86de..38d889c8a2df 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java @@ -597,7 +597,8 @@ public enum RntbdRequestHeader implements RntbdHeader { ChangeFeedWireFormatVersion((short) 0x00B2, RntbdTokenType.String, false), PriorityLevel((short) 0x00BF, RntbdTokenType.Byte, false), GlobalDatabaseAccountName((short) 0x00CE, RntbdTokenType.String, false), - ThroughputBucket((short)0x00DB, RntbdTokenType.Byte, false); + ThroughputBucket((short)0x00DB, RntbdTokenType.Byte, false), + HubRegionProcessingOnly((short)0x00EF, RntbdTokenType.Byte , false); public static final List thinClientHeadersInOrderList = Arrays.asList( EffectivePartitionKey, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java index 0c3f9615312f..6f6e46ee695d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java @@ -133,6 +133,7 @@ final class RntbdRequestHeaders extends RntbdTokenStream { this.addPriorityLevel(headers); this.addGlobalDatabaseAccountName(headers); this.addThroughputBucket(headers); + this.addHubRegionProcessingOnly(headers); // Normal headers (Strings, Ints, Longs, etc.) @@ -294,6 +295,8 @@ private RntbdToken getCorrelatedActivityId() { private RntbdToken getThroughputBucket() { return this.get(RntbdRequestHeader.ThroughputBucket); } + private RntbdToken getHubRegionProcessingOnly() { return this.get(RntbdRequestHeader.HubRegionProcessingOnly); } + private RntbdToken getGlobalDatabaseAccountName() { return this.get(RntbdRequestHeader.GlobalDatabaseAccountName); } @@ -795,8 +798,7 @@ private void addPriorityLevel(final Map headers) } } - private void addThroughputBucket(final Map headers) - { + private void addThroughputBucket(final Map headers) { final String value = headers.get(HttpHeaders.THROUGHPUT_BUCKET); if (StringUtils.isNotEmpty(value)) { @@ -805,6 +807,15 @@ private void addThroughputBucket(final Map headers) } } + private void addHubRegionProcessingOnly(final Map headers) { + final String value = headers.get(HttpHeaders.HUB_REGION_PROCESSING_ONLY); + + if (StringUtils.isNotEmpty(value)) { + final boolean hubRegionProcessingOnly = Boolean.parseBoolean(value); + this.getHubRegionProcessingOnly().setValue(hubRegionProcessingOnly); + } + } + private void addGlobalDatabaseAccountName(final Map headers) { final String value = headers.get(HttpHeaders.GLOBAL_DATABASE_ACCOUNT_NAME); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java index d50643f0e5a9..5a92ac4d8ad4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java @@ -28,7 +28,7 @@ public class GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover { private static final Logger logger = LoggerFactory.getLogger(GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.class); - private final ConcurrentHashMap partitionKeyRangeToFailoverInfo; + private final ConcurrentHashMap partitionKeyRangeToFailoverInfo; private final ConcurrentHashMap partitionKeyRangeToEndToEndTimeoutErrorTracker; private final GlobalEndpointManager globalEndpointManager; private final AtomicBoolean isPerPartitionAutomaticFailoverEnabled; @@ -150,17 +150,12 @@ public boolean tryAddPartitionLevelLocationOverride(RxDocumentServiceRequest req if (!crossRegionAvailabilityContextForRequest.shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable()) { return false; } - - // apply PPAF override for reads once - in retry flows stick to applicable regions - if (crossRegionAvailabilityContextForRequest.hasPerPartitionAutomaticFailoverBeenAppliedForReads()) { - return false; - } } PartitionKeyRangeWrapper partitionKeyRangeWrapper = new PartitionKeyRangeWrapper(partitionKeyRange, resolvedCollectionRid); - PartitionLevelFailoverInfo partitionLevelFailoverInfo = this.partitionKeyRangeToFailoverInfo.get(partitionKeyRangeWrapper); + PartitionLevelAutomaticFailoverInfo partitionLevelAutomaticFailoverInfo = this.partitionKeyRangeToFailoverInfo.get(partitionKeyRangeWrapper); - if (partitionLevelFailoverInfo != null) { + if (partitionLevelAutomaticFailoverInfo != null) { if (request.isReadOnlyRequest()) { @@ -172,8 +167,8 @@ public boolean tryAddPartitionLevelLocationOverride(RxDocumentServiceRequest req crossRegionAvailabilityContextForRequest.setPerPartitionAutomaticFailoverAppliedStatusForReads(true); } - request.requestContext.routeToLocation(partitionLevelFailoverInfo.getCurrent()); - request.requestContext.setPerPartitionAutomaticFailoverInfoHolder(partitionLevelFailoverInfo); + request.requestContext.routeToLocation(partitionLevelAutomaticFailoverInfo.getCurrent()); + request.requestContext.setPerPartitionAutomaticFailoverInfoHolder(partitionLevelAutomaticFailoverInfo); return true; } @@ -182,13 +177,17 @@ public boolean tryAddPartitionLevelLocationOverride(RxDocumentServiceRequest req public boolean tryMarkEndpointAsUnavailableForPartitionKeyRange(RxDocumentServiceRequest request, boolean isEndToEndTimeoutHit) { + return this.tryMarkEndpointAsUnavailableForPartitionKeyRange(request, isEndToEndTimeoutHit, false); + } + + public boolean tryMarkEndpointAsUnavailableForPartitionKeyRange(RxDocumentServiceRequest request, boolean isEndToEndTimeoutHit, boolean forceFailoverThroughReads) { boolean isPerPartitionAutomaticFailoverEnabledSnapshot = this.isPerPartitionAutomaticFailoverEnabled.get(); if (!isPerPartitionAutomaticFailoverEnabledSnapshot) { return false; } - if (!isPerPartitionAutomaticFailoverApplicable(request)) { + if (!isPerPartitionAutomaticFailoverApplicable(request, forceFailoverThroughReads)) { return false; } @@ -236,13 +235,13 @@ public boolean tryMarkEndpointAsUnavailableForPartitionKeyRange(RxDocumentServic return false; } - PartitionLevelFailoverInfo partitionLevelFailoverInfo - = this.partitionKeyRangeToFailoverInfo.computeIfAbsent(partitionKeyRangeWrapper, partitionKeyRangeWrapper1 -> new PartitionLevelFailoverInfo(failedRegionalRoutingContext, this.globalEndpointManager)); + PartitionLevelAutomaticFailoverInfo partitionLevelAutomaticFailoverInfo + = this.partitionKeyRangeToFailoverInfo.computeIfAbsent(partitionKeyRangeWrapper, partitionKeyRangeWrapper1 -> new PartitionLevelAutomaticFailoverInfo(failedRegionalRoutingContext, this.globalEndpointManager)); // Rely on account-level read endpoints for new write region discovery List accountLevelReadRoutingContexts = this.globalEndpointManager.getAvailableReadRoutingContexts(); - if (partitionLevelFailoverInfo.tryMoveToNextLocation(accountLevelReadRoutingContexts, failedRegionalRoutingContext)) { + if (partitionLevelAutomaticFailoverInfo.tryMoveToNextLocation(accountLevelReadRoutingContexts, failedRegionalRoutingContext)) { if (logger.isWarnEnabled()) { logger.warn("Marking region {} as failed for partition key range {} and collection rid {}", @@ -251,7 +250,7 @@ public boolean tryMarkEndpointAsUnavailableForPartitionKeyRange(RxDocumentServic partitionKeyRangeWrapper.getCollectionResourceId()); } - request.requestContext.setPerPartitionAutomaticFailoverInfoHolder(partitionLevelFailoverInfo); + request.requestContext.setPerPartitionAutomaticFailoverInfoHolder(partitionLevelAutomaticFailoverInfo); this.partitionKeyRangeToEndToEndTimeoutErrorTracker.remove(partitionKeyRangeWrapper); return true; @@ -269,6 +268,11 @@ public boolean isPerPartitionAutomaticFailoverEnabled() { public boolean isPerPartitionAutomaticFailoverApplicable(RxDocumentServiceRequest request) { + return this.isPerPartitionAutomaticFailoverApplicable(request, false); + } + + public boolean isPerPartitionAutomaticFailoverApplicable(RxDocumentServiceRequest request, boolean forceFailoverThroughReads) { + boolean isPerPartitionAutomaticFailoverEnabledSnapshot = this.isPerPartitionAutomaticFailoverEnabled.get(); if (!isPerPartitionAutomaticFailoverEnabledSnapshot) { @@ -288,7 +292,14 @@ public boolean isPerPartitionAutomaticFailoverApplicable(RxDocumentServiceReques } if (request.isReadOnlyRequest()) { - return false; + + CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRxDocumentServiceRequest = + request.requestContext.getCrossRegionAvailabilityContext(); + + if (!forceFailoverThroughReads + && (crossRegionAvailabilityContextForRxDocumentServiceRequest != null && !crossRegionAvailabilityContextForRxDocumentServiceRequest.shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable())) { + return false; + } } if (this.globalEndpointManager.getApplicableReadRegionalRoutingContexts(Collections.emptyList()).size() <= 1) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/PartitionLevelFailoverInfo.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/PartitionLevelAutomaticFailoverInfo.java similarity index 80% rename from sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/PartitionLevelFailoverInfo.java rename to sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/PartitionLevelAutomaticFailoverInfo.java index 237915272008..b59c136b9723 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/PartitionLevelFailoverInfo.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/PartitionLevelAutomaticFailoverInfo.java @@ -5,8 +5,6 @@ import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.OperationType; -import com.azure.cosmos.implementation.PartitionKeyRangeWrapper; -import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; @@ -15,16 +13,12 @@ import java.io.IOException; import java.io.Serializable; import java.net.URI; -import java.time.Instant; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -@JsonSerialize(using = PartitionLevelFailoverInfo.PartitionLevelFailoverInfoSerializer.class) -public class PartitionLevelFailoverInfo implements Serializable { +@JsonSerialize(using = PartitionLevelAutomaticFailoverInfo.PartitionLevelFailoverInfoSerializer.class) +public class PartitionLevelAutomaticFailoverInfo implements Serializable { // Set of URIs which have seen 503s (specific to document writes) or 403/3s private final Set failedRegionalRoutingContexts = ConcurrentHashMap.newKeySet(); @@ -33,7 +27,7 @@ public class PartitionLevelFailoverInfo implements Serializable { private RegionalRoutingContext current; private final GlobalEndpointManager globalEndpointManager; - PartitionLevelFailoverInfo(RegionalRoutingContext current, GlobalEndpointManager globalEndpointManager) { + PartitionLevelAutomaticFailoverInfo(RegionalRoutingContext current, GlobalEndpointManager globalEndpointManager) { this.current = current; this.globalEndpointManager = globalEndpointManager; } @@ -69,10 +63,10 @@ public RegionalRoutingContext getCurrent() { return this.current; } - static class PartitionLevelFailoverInfoSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + static class PartitionLevelFailoverInfoSerializer extends com.fasterxml.jackson.databind.JsonSerializer { @Override - public void serialize(PartitionLevelFailoverInfo value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + public void serialize(PartitionLevelAutomaticFailoverInfo value, JsonGenerator gen, SerializerProvider serializers) throws IOException { gen.writeStartObject(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/PerPartitionFailoverInfoHolder.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/PerPartitionAutomaticFailoverInfoHolder.java similarity index 53% rename from sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/PerPartitionFailoverInfoHolder.java rename to sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/PerPartitionAutomaticFailoverInfoHolder.java index 37eeed6acb5a..0d892e73a238 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/PerPartitionFailoverInfoHolder.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/PerPartitionAutomaticFailoverInfoHolder.java @@ -11,27 +11,27 @@ import java.io.IOException; import java.io.Serializable; -@JsonSerialize(using = PerPartitionFailoverInfoHolder.PerPartitionFailoverInfoHolderSerializer.class) -public class PerPartitionFailoverInfoHolder implements Serializable { +@JsonSerialize(using = PerPartitionAutomaticFailoverInfoHolder.PerPartitionFailoverInfoHolderSerializer.class) +public class PerPartitionAutomaticFailoverInfoHolder implements Serializable { - private final Utils.ValueHolder partitionLevelFailoverInfoValueHolder = new Utils.ValueHolder<>(); + private final Utils.ValueHolder partitionLevelFailoverInfoValueHolder = new Utils.ValueHolder<>(); - public synchronized PartitionLevelFailoverInfo getPartitionLevelFailoverInfo() { + public synchronized PartitionLevelAutomaticFailoverInfo getPartitionLevelFailoverInfo() { return partitionLevelFailoverInfoValueHolder.v; } - public synchronized void setPartitionLevelFailoverInfo(PartitionLevelFailoverInfo partitionLevelFailoverInfo) { - this.partitionLevelFailoverInfoValueHolder.v = partitionLevelFailoverInfo; + public synchronized void setPartitionLevelFailoverInfo(PartitionLevelAutomaticFailoverInfo partitionLevelAutomaticFailoverInfo) { + this.partitionLevelFailoverInfoValueHolder.v = partitionLevelAutomaticFailoverInfo; } - public static class PerPartitionFailoverInfoHolderSerializer extends com.fasterxml.jackson.databind.JsonSerializer { + public static class PerPartitionFailoverInfoHolderSerializer extends com.fasterxml.jackson.databind.JsonSerializer { @Override - public void serialize(PerPartitionFailoverInfoHolder value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + public void serialize(PerPartitionAutomaticFailoverInfoHolder value, JsonGenerator gen, SerializerProvider serializers) throws IOException { - PartitionLevelFailoverInfo partitionLevelFailoverInfo = value.getPartitionLevelFailoverInfo(); + PartitionLevelAutomaticFailoverInfo partitionLevelAutomaticFailoverInfo = value.getPartitionLevelFailoverInfo(); - if (partitionLevelFailoverInfo != null) { + if (partitionLevelAutomaticFailoverInfo != null) { gen.writeStartObject(); gen.writeObjectField("perPartitionAutomaticFailoverCtx", value.getPartitionLevelFailoverInfo()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java index 6d61cca536d1..bcbf8f35b731 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/LocationCache.java @@ -173,6 +173,10 @@ public void onDatabaseAccountRead(DatabaseAccount databaseAccount) { * @return Resolved getEndpoint */ public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest request) { + return this.resolveServiceEndpoint(request, false); + } + + public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest request, boolean isInHubRegionDiscoveryMode) { Objects.requireNonNull(request.requestContext, "RxDocumentServiceRequest.requestContext is required and cannot be null."); @@ -189,10 +193,20 @@ public RegionalRoutingContext resolveServiceEndpoint(RxDocumentServiceRequest re // first and the second writable region in DatabaseAccount (for manual failover) DatabaseAccountLocationsInfo currentLocationInfo = this.locationInfo; - if (this.enableEndpointDiscovery && !currentLocationInfo.availableWriteLocations.isEmpty()) { - locationIndex = Math.min(locationIndex%2, currentLocationInfo.availableWriteLocations.size()-1); - String writeLocation = currentLocationInfo.availableWriteLocations.get(locationIndex); - return currentLocationInfo.availableWriteRegionalRoutingContextsByRegionName.get(writeLocation); + if (this.enableEndpointDiscovery) { + + if (isInHubRegionDiscoveryMode && !currentLocationInfo.availableReadLocations.isEmpty()) { + locationIndex = locationIndex % currentLocationInfo.availableReadLocations.size(); + String potentialHubLocation = currentLocationInfo.availableReadLocations.get(locationIndex); + return currentLocationInfo.availableReadRegionalRoutingContextsByRegionName.get(potentialHubLocation); + } else if (!currentLocationInfo.availableWriteLocations.isEmpty()) { + locationIndex = Math.min(locationIndex%2, currentLocationInfo.availableWriteLocations.size()-1); + String writeLocation = currentLocationInfo.availableWriteLocations.get(locationIndex); + return currentLocationInfo.availableWriteRegionalRoutingContextsByRegionName.get(writeLocation); + } + + return this.defaultRoutingContext; + } else { return this.defaultRoutingContext; } @@ -455,7 +469,7 @@ private UnmodifiableList reevaluate( // user wishes to exclude all regions - use partition-set level primary region [or] account-level primary region // no cross region retries applicable if (!userConfiguredExcludeRegions.isEmpty() && regionalRoutingContextsRemovedByInternalExcludeRegions.isEmpty()) { - crossRegionAvailabilityContextForRequest.shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable(true); + crossRegionAvailabilityContextForRequest.setShouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable(true); return applicableRegionalRoutingContexts; } @@ -464,10 +478,9 @@ private UnmodifiableList reevaluate( if (effectivePreferredLocations != null && !effectivePreferredLocations.isEmpty()) { if (crossRegionAvailabilityContextForRequest.hasPerPartitionAutomaticFailoverBeenAppliedForReads()) { - crossRegionAvailabilityContextForRequest.shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable(false); modifiedRegionalRoutingContexts.add(firstApplicableRegionalRoutingContext); } else { - crossRegionAvailabilityContextForRequest.shouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable(true); + crossRegionAvailabilityContextForRequest.setShouldUsePerPartitionAutomaticFailoverOverrideForReadsIfApplicable(true); } } }