diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py index 2ce07c61c9c9..4096abd2504a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py @@ -107,6 +107,8 @@ async def _get_properties_with_options(self, options: Optional[dict[str, Any]] = kwargs[Constants.OperationStartTime] = options[Constants.OperationStartTime] if "timeout" in options: kwargs['timeout'] = options['timeout'] + if "read_timeout" in options: + kwargs['read_timeout'] = options['read_timeout'] return await self._get_properties(**kwargs) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py index 86b6c33ccf2b..52da7f051cca 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py @@ -82,6 +82,14 @@ def _build_connection_policy(kwargs: dict[str, Any]) -> ConnectionPolicy: policy.RequestTimeout = kwargs.pop('request_timeout') / 1000.0 else: policy.RequestTimeout = kwargs.pop('connection_timeout', policy.RequestTimeout) + + # Check if read_timeout is explicitly provided in kwargs (client-level) + if 'read_timeout' in kwargs: + policy.ReadTimeout = kwargs.pop('read_timeout') + # Otherwise, check if policy has the new read_timeout property + elif hasattr(policy, 'read_timeout') and policy.read_timeout is not None: + policy.ReadTimeout = policy.read_timeout + policy.ConnectionMode = kwargs.pop('connection_mode', policy.ConnectionMode) policy.ProxyConfiguration = kwargs.pop('proxy_config', policy.ProxyConfiguration) policy.EnableEndpointDiscovery = kwargs.pop('enable_endpoint_discovery', policy.EnableEndpointDiscovery) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index 1bb4e95caa8a..627722929c79 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -108,6 +108,8 @@ def _get_properties_with_options(self, options: Optional[dict[str, Any]] = None) kwargs[Constants.OperationStartTime] = options[Constants.OperationStartTime] if "timeout" in options: kwargs['timeout'] = options['timeout'] + if "read_timeout" in options: + kwargs['read_timeout'] = options['read_timeout'] return self._get_properties(**kwargs) def _get_properties(self, **kwargs: Any) -> dict[str, Any]: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py index 879675522a6d..a9a9a93d6f54 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py @@ -93,6 +93,14 @@ def _build_connection_policy(kwargs: dict[str, Any]) -> ConnectionPolicy: policy.RequestTimeout = kwargs.pop('request_timeout') / 1000.0 else: policy.RequestTimeout = kwargs.pop('connection_timeout', policy.RequestTimeout) + + # Check if read_timeout is explicitly provided in kwargs (client-level) + if 'read_timeout' in kwargs: + policy.ReadTimeout = kwargs.pop('read_timeout') + # Otherwise, check if policy has the new read_timeout property + elif hasattr(policy, 'read_timeout') and policy.read_timeout is not None: + policy.ReadTimeout = policy.read_timeout + policy.ConnectionMode = kwargs.pop('connection_mode', policy.ConnectionMode) policy.ProxyConfiguration = kwargs.pop('proxy_config', policy.ProxyConfiguration) policy.EnableEndpointDiscovery = kwargs.pop('enable_endpoint_discovery', policy.EnableEndpointDiscovery) diff --git a/sdk/cosmos/azure-cosmos/tests/test_crud.py b/sdk/cosmos/azure-cosmos/tests/test_crud.py index dc9cb5882747..4204c1cd3f56 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_crud.py +++ b/sdk/cosmos/azure-cosmos/tests/test_crud.py @@ -1466,6 +1466,78 @@ def test_timeout_for_point_operation(self): ) self.assertEqual(result['id'], 'test_item_1') + def test_request_level_timeout_overrides_client_read_timeout(self): + """Test that request-level read_timeout overrides client-level timeout for reads and writes""" + + # Create container with normal client + normal_container = self.databaseForTest.create_container( + id='request_timeout_container_' + str(uuid.uuid4()), + partition_key=PartitionKey(path="/pk") + ) + + # Create test items + for i in range(5): + test_item = { + 'id': f'test_item_{i}', + 'pk': f'partition{i}', + 'data': f'test_data_{i}' + } + normal_container.create_item(test_item) + + # Create client with very short read timeout at client level + timeout_client = cosmos_client.CosmosClient( + url=self.host, + credential=self.masterKey, + read_timeout=0.001 # Very short timeout that would normally fail + ) + print(f"test_crud got the client") + database = timeout_client.get_database_client(self.databaseForTest.id) + container = database.get_container_client(normal_container.id) + print(f"test_crud about to execute read operation") + # Test 1: Point read with request-level timeout should succeed (overrides client timeout) + result = container.read_item( + item='test_item_0', + partition_key='partition0', + read_timeout=30 # Higher timeout at request level + ) + self.assertEqual(result['id'], 'test_item_0') + + # Test 2: Query with request-level timeout should succeed (overrides client timeout) + results = list(container.query_items( + query="SELECT * FROM c WHERE c.pk = @pk", + parameters=[{"name": "@pk", "value": "partition1"}], + read_timeout=30 # Higher timeout at request level + )) + self.assertEqual(len(results), 1) + self.assertEqual(results[0]['id'], 'test_item_1') + + # Test 3: Upsert (write) with request-level timeout should succeed + upsert_item = { + 'id': 'test_item_0', + 'pk': 'partition0', + 'data': 'updated_data' + } + result = container.upsert_item( + body=upsert_item, + read_timeout=30 # Higher timeout at request level + ) + self.assertEqual(result['data'], 'updated_data') + + # Test 4: Create (write) with request-level timeout should succeed + new_item = { + 'id': 'new_test_item', + 'pk': 'new_partition', + 'data': 'new_data' + } + result = container.create_item( + body=new_item, + read_timeout=30 # Higher timeout at request level + ) + self.assertEqual(result['id'], 'new_test_item') + + # Cleanup + self.databaseForTest.delete_container(normal_container.id) + def test_point_operation_read_timeout(self): """Test that point operations respect client provided read timeout""" @@ -1491,6 +1563,98 @@ def test_point_operation_read_timeout(self): except Exception as e: print(f"Exception is {e}") + def test_client_level_read_timeout_on_queries_and_point_operations(self): + """Test that queries and point operations respect client-level read timeout""" + + # Create container with normal client + normal_container = self.databaseForTest.create_container( + id='read_timeout_container_' + str(uuid.uuid4()), + partition_key=PartitionKey(path="/pk") + ) + + # Create test items + for i in range(5): + test_item = { + 'id': f'test_item_{i}', + 'pk': f'partition{i}', + 'data': f'test_data_{i}' + } + normal_container.create_item(test_item) + + # Create client with very short read timeout + timeout_client = cosmos_client.CosmosClient( + url=self.host, + credential=self.masterKey, + read_timeout=0.001 # Very short timeout to force failure + ) + database = timeout_client.get_database_client(self.databaseForTest.id) + container = database.get_container_client(normal_container.id) + + # Test 1: Point read operation should time out + with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)): + container.read_item( + item='test_item_0', + partition_key='partition0' + ) + + # Test 2: Query operation should time out + with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)): + list(container.query_items( + query="SELECT * FROM c WHERE c.pk = @pk", + parameters=[{"name": "@pk", "value": "partition0"}] + )) + + def test_policy_level_read_timeout_on_queries_and_point_operations(self): + """Test that queries and point operations respect connection-policy level read timeout""" + + # Create container with normal client + normal_container = self.databaseForTest.create_container( + id='read_timeout_container_' + str(uuid.uuid4()), + partition_key=PartitionKey(path="/pk") + ) + + # Create test items + for i in range(5): + test_item = { + 'id': f'test_item_{i}', + 'pk': f'partition{i}', + 'data': f'test_data_{i}' + } + normal_container.create_item(test_item) + + # Create client with very short read timeout + connection_policy = documents.ConnectionPolicy() + connection_policy.read_timeout = 0.001 + timeout_client = cosmos_client.CosmosClient( + url=self.host, + credential=self.masterKey, + connection_policy=connection_policy + ) + database = timeout_client.get_database_client(self.databaseForTest.id) + container = database.get_container_client(normal_container.id) + + try: + container.read_item( + item='test_item_0', + partition_key='partition0' + ) + except Exception as e: + print(f"Exception is {e}") + + # Test 1: Point read operation should time out + with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)): + container.read_item( + item='test_item_0', + partition_key='partition0' + ) + + # Test 2: Query operation should time out + with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)): + list(container.query_items( + query="SELECT * FROM c WHERE c.pk = @pk", + parameters=[{"name": "@pk", "value": "partition0"}] + )) + # TODO: for read timeouts azure-core returns a ServiceResponseError, needs to be fixed in azure-core and then this test can be enabled @unittest.skip def test_query_operation_single_partition_read_timeout(self): diff --git a/sdk/cosmos/azure-cosmos/tests/test_crud_async.py b/sdk/cosmos/azure-cosmos/tests/test_crud_async.py index 8e6a3529ff86..b278f3fa462a 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_crud_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_crud_async.py @@ -1112,6 +1112,165 @@ async def send(self, request, **kwargs): self.assertLess(elapsed_time, 7) # Allow some overhead self.assertGreater(elapsed_time, 5) # Should wait at least close to timeout + async def test_request_level_timeout_overrides_client_read_timeout_async(self): + """Test that request-level read_timeout overrides client-level timeout for reads and writes """ + + # Create container with normal client + normal_container = await self.database_for_test.create_container( + id='request_timeout_container_async_' + str(uuid.uuid4()), + partition_key=PartitionKey(path="/pk") + ) + + # Create test items + for i in range(5): + test_item = { + 'id': f'test_item_{i}', + 'pk': f'partition{i}', + 'data': f'test_data_{i}' + } + await normal_container.create_item(test_item) + + # Create async client with very short read timeout at client level + async with CosmosClient( + url=self.host, + credential=self.masterKey, + read_timeout=0.001 # Very short timeout that would normally fail + ) as timeout_client: + print(f"test_crud_async got the client") + database = timeout_client.get_database_client(self.database_for_test.id) + container = database.get_container_client(normal_container.id) + print(f"test_crud_async about to execute read operation") + + # Test 1: Point read with request-level timeout should succeed (overrides client timeout) + result = await container.read_item( + item='test_item_0', + partition_key='partition0', + read_timeout=30 # Higher timeout at request level + ) + self.assertEqual(result['id'], 'test_item_0') + + # Test 2: Query with request-level timeout should succeed (overrides client timeout) + results = [] + async for item in container.query_items( + query="SELECT * FROM c WHERE c.pk = @pk", + parameters=[{"name": "@pk", "value": "partition1"}], + read_timeout=30 # Higher timeout at request level + ): + results.append(item) + self.assertEqual(len(results), 1) + self.assertEqual(results[0]['id'], 'test_item_1') + + # Test 3: Upsert (write) with request-level timeout should succeed + upsert_item = { + 'id': 'test_item_0', + 'pk': 'partition0', + 'data': 'updated_data' + } + result = await container.upsert_item( + body=upsert_item, + read_timeout=30 # Higher timeout at request level + ) + self.assertEqual(result['data'], 'updated_data') + + # Test 4: Create (write) with request-level timeout should succeed + new_item = { + 'id': 'new_test_item', + 'pk': 'new_partition', + 'data': 'new_data' + } + result = await container.create_item( + body=new_item, + read_timeout=30 # Higher timeout at request level + ) + self.assertEqual(result['id'], 'new_test_item') + + # Cleanup + await self.database_for_test.delete_container(normal_container.id) + + async def test_client_level_read_timeout_on_queries_and_point_operations_async(self): + """Test that queries and point operations respect client-level read timeout""" + + # Create container with normal client + normal_container = self.database_for_test.get_container_client( + self.configs.TEST_MULTI_PARTITION_CONTAINER_ID + ) + + # Create test items + for i in range(5): + await normal_container.create_item( + body={'id': f'item{i}', 'pk': f'partition{i % 2}'} + ) + + # Create client with very short read timeout + timeout_client = CosmosClient( + url=self.host, + credential=self.masterKey, + read_timeout=0.001 # 1ms - should time out + ) + await timeout_client.__aenter__() + + try: + database = timeout_client.get_database_client(self.database_for_test.id) + container = database.get_container_client(normal_container.id) + + # Test 1: Point read operation should time out + with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)): + await container.read_item(item='item0', partition_key='partition0') + + # Test 2: Query operation should time out + with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)): + items = [doc async for doc in container.query_items( + query="SELECT * FROM c WHERE c.pk = @pk", + parameters=[{"name": "@pk", "value": "partition0"}] + )] + + finally: + await timeout_client.close() + + async def test_policy_level_read_timeout_on_queries_and_point_operations_async(self): + """Test that queries and point operations respect policy-level read timeout""" + + # Create container with normal client + normal_container = self.database_for_test.get_container_client( + self.configs.TEST_MULTI_PARTITION_CONTAINER_ID + ) + + # Create test items + for i in range(5): + await normal_container.create_item( + body={'id': f'policy_item{i}', 'pk': f'policy_partition{i % 2}'} + ) + + # Create connection policy with very short read timeout + policy = documents.ConnectionPolicy() + policy.ReadTimeout = 0.001 # 1ms - should timeout + + # Create client with the policy + timeout_client = CosmosClient( + url=self.host, + credential=self.masterKey, + connection_policy=policy + ) + await timeout_client.__aenter__() + + try: + database = timeout_client.get_database_client(self.database_for_test.id) + container = database.get_container_client(normal_container.id) + + # Test 1: Point read operation should time out + with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)): + await container.read_item(item='policy_item0', partition_key='policy_partition0') + + # Test 2: Query operation should time out + with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)): + items = [doc async for doc in container.query_items( + query="SELECT * FROM c WHERE c.pk = @pk", + parameters=[{"name": "@pk", "value": "policy_partition0"}] + )] + + finally: + await timeout_client.close() + async def test_timeout_for_point_operation_async(self): """Test that point operations respect client timeout"""