fix: honoring read_timeout at the cosmos client level #44472
Conversation
/azp run python - cosmos - tests

Azure Pipelines successfully started running 1 pipeline(s).
Pull request overview
This PR implements client-level read timeout configuration for Azure Cosmos DB operations. The fix ensures that when a read timeout is specified at the client level, it automatically applies to all queries and operations unless explicitly overridden at the request level; a usage sketch follows the key changes below.
Key Changes
- Added read timeout handling in CosmosClient initialization to propagate client-level timeouts to the connection policy
- Extended container property getters to pass through read_timeout options from request level
- Added comprehensive test coverage for client-level, request-level, and policy-level timeout behaviors
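As a quick illustration of the intended behavior, here is a minimal usage sketch. The endpoint and key values are placeholders, and the client-level read_timeout keyword is the one this PR introduces:

```python
from azure.cosmos import CosmosClient

# Client-level read timeout (in seconds) becomes the default for all operations.
client = CosmosClient(
    url="https://myaccount.documents.azure.com:443/",  # placeholder endpoint
    credential="<account-key>",                        # placeholder credential
    read_timeout=5,
)
container = client.get_database_client("db").get_container_client("items")

# A request-level read_timeout overrides the client-level value for this call only.
item = container.read_item(item="item-id", partition_key="pk-value", read_timeout=30)
```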
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py | Added logic to extract and apply client-level read_timeout to ConnectionPolicy |
| sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py | Added async version of client-level read_timeout handling |
| sdk/cosmos/azure-cosmos/azure/cosmos/container.py | Extended _get_properties_with_options to propagate read_timeout from options |
| sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py | Extended async _get_properties_with_options to propagate read_timeout |
| sdk/cosmos/azure-cosmos/tests/test_crud.py | Added three test methods covering request-level override, client-level timeout, and policy-level timeout scenarios |
| sdk/cosmos/azure-cosmos/tests/test_crud_async.py | Added async versions of the three timeout test methods |
| print(f"test_crud_async got the client") | ||
| database = timeout_client.get_database_client(self.database_for_test.id) | ||
| container = database.get_container_client(normal_container.id) | ||
| print(f"test_crud_async about to execute read operation") |
Copilot AI (Dec 18, 2025):
Debug print statement should be removed before merging. This statement appears to be leftover debugging code.
| print(f"test_crud_async got the client") | |
| database = timeout_client.get_database_client(self.database_for_test.id) | |
| container = database.get_container_client(normal_container.id) | |
| print(f"test_crud_async about to execute read operation") | |
| database = timeout_client.get_database_client(self.database_for_test.id) | |
| container = database.get_container_client(normal_container.id) |
```python
timeout_client = cosmos_client.CosmosClient(
    url=self.host,
    credential=self.masterKey,
    read_timeout=0.001  # Very short timeout to force failure
)
database = timeout_client.get_database_client(self.databaseForTest.id)
container = database.get_container_client(normal_container.id)

# Test 1: Point read operation should time out
with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
    container.read_item(
        item='test_item_0',
        partition_key='partition0'
    )

# Test 2: Query operation should time out
with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
    list(container.query_items(
        query="SELECT * FROM c WHERE c.pk = @pk",
        parameters=[{"name": "@pk", "value": "partition0"}]
    ))
```
Copilot AI (Dec 18, 2025):
The timeout_client is not being properly closed, and the container is not being cleaned up. This could lead to resource leaks. Consider wrapping the client in a try-finally block to ensure proper cleanup, and add container deletion at the end.
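A minimal sketch of the cleanup this comment asks for, reusing the names from the quoted test above (the fixture API is assumed from that code, not confirmed elsewhere):

```python
timeout_client = cosmos_client.CosmosClient(
    url=self.host,
    credential=self.masterKey,
    read_timeout=0.001
)
try:
    # ... the timeout assertions from the test body go here ...
    pass
finally:
    # Best-effort cleanup so a failed assertion does not leak resources.
    try:
        self.databaseForTest.delete_container(normal_container)
    except Exception:
        pass
    timeout_client.close()
```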
```python
# Create client with very short read timeout
connection_policy = documents.ConnectionPolicy()
connection_policy.read_timeout = 0.001
```
Copilot AI (Dec 18, 2025):
The attribute name 'read_timeout' should be 'ReadTimeout' to match the naming convention used in line 1246 of test_crud_async.py and align with the ConnectionPolicy's property naming convention (ReadTimeout with capital R and T).
Suggested change:
```diff
-connection_policy.read_timeout = 0.001
+connection_policy.ReadTimeout = 0.001
```
| print(f"test_crud got the client") | ||
| database = timeout_client.get_database_client(self.databaseForTest.id) | ||
| container = database.get_container_client(normal_container.id) | ||
| print(f"test_crud about to execute read operation") |
Copilot AI (Dec 18, 2025):
Debug print statement should be removed before merging. This statement appears to be leftover debugging code.
| print(f"test_crud got the client") | |
| database = timeout_client.get_database_client(self.databaseForTest.id) | |
| container = database.get_container_client(normal_container.id) | |
| print(f"test_crud about to execute read operation") | |
| database = timeout_client.get_database_client(self.databaseForTest.id) | |
| container = database.get_container_client(normal_container.id) |
| print(f"test_crud got the client") | ||
| database = timeout_client.get_database_client(self.databaseForTest.id) | ||
| container = database.get_container_client(normal_container.id) | ||
| print(f"test_crud about to execute read operation") |
Copilot AI (Dec 18, 2025):
Debug print statement should be removed before merging. This statement appears to be leftover debugging code.
| print(f"test_crud got the client") | |
| database = timeout_client.get_database_client(self.databaseForTest.id) | |
| container = database.get_container_client(normal_container.id) | |
| print(f"test_crud about to execute read operation") | |
| database = timeout_client.get_database_client(self.databaseForTest.id) | |
| container = database.get_container_client(normal_container.id) |
| print(f"test_crud_async got the client") | ||
| database = timeout_client.get_database_client(self.database_for_test.id) | ||
| container = database.get_container_client(normal_container.id) | ||
| print(f"test_crud_async about to execute read operation") |
Copilot AI (Dec 18, 2025):
Debug print statement should be removed before merging. This statement appears to be leftover debugging code.
| print(f"test_crud_async got the client") | |
| database = timeout_client.get_database_client(self.database_for_test.id) | |
| container = database.get_container_client(normal_container.id) | |
| print(f"test_crud_async about to execute read operation") | |
| database = timeout_client.get_database_client(self.database_for_test.id) | |
| container = database.get_container_client(normal_container.id) |
```python
# Check if read_timeout is explicitly provided in kwargs (client-level)
if 'read_timeout' in kwargs:
    policy.ReadTimeout = kwargs.pop('read_timeout')
# Otherwise, check if policy has the new read_timeout property
```
Copilot AI (Dec 18, 2025):
Comment is misplaced. This comment belongs before the elif statement on line 90, not after the if block on line 88. It should be moved to maintain code readability.
Suggested change (relocating the comment so it directly precedes the elif):
```diff
-# Otherwise, check if policy has the new read_timeout property
+# Otherwise, check if policy has the new read_timeout property
```
```python
timeout_client = cosmos_client.CosmosClient(
    url=self.host,
    credential=self.masterKey,
    read_timeout=0.001  # Very short timeout that would normally fail
)
print(f"test_crud got the client")
database = timeout_client.get_database_client(self.databaseForTest.id)
container = database.get_container_client(normal_container.id)
print(f"test_crud about to execute read operation")
# Test 1: Point read with request-level timeout should succeed (overrides client timeout)
result = container.read_item(
    item='test_item_0',
    partition_key='partition0',
    read_timeout=30  # Higher timeout at request level
)
self.assertEqual(result['id'], 'test_item_0')

# Test 2: Query with request-level timeout should succeed (overrides client timeout)
results = list(container.query_items(
    query="SELECT * FROM c WHERE c.pk = @pk",
    parameters=[{"name": "@pk", "value": "partition1"}],
    read_timeout=30  # Higher timeout at request level
))
self.assertEqual(len(results), 1)
self.assertEqual(results[0]['id'], 'test_item_1')

# Test 3: Upsert (write) with request-level timeout should succeed
upsert_item = {
    'id': 'test_item_0',
    'pk': 'partition0',
    'data': 'updated_data'
}
result = container.upsert_item(
    body=upsert_item,
    read_timeout=30  # Higher timeout at request level
)
self.assertEqual(result['data'], 'updated_data')

# Test 4: Create (write) with request-level timeout should succeed
new_item = {
    'id': 'new_test_item',
    'pk': 'new_partition',
    'data': 'new_data'
}
result = container.create_item(
    body=new_item,
    read_timeout=30  # Higher timeout at request level
)
self.assertEqual(result['id'], 'new_test_item')
```
Copilot AI (Dec 18, 2025):
The timeout_client is not being properly closed. This could lead to resource leaks. Consider wrapping it in a try-finally block or using a context manager to ensure proper cleanup, similar to the async version which uses 'async with'.
Suggested change (replacing the block quoted above with a context-manager version):
```python
with cosmos_client.CosmosClient(
    url=self.host,
    credential=self.masterKey,
    read_timeout=0.001  # Very short timeout that would normally fail
) as timeout_client:
    print(f"test_crud got the client")
    database = timeout_client.get_database_client(self.databaseForTest.id)
    container = database.get_container_client(normal_container.id)
    print(f"test_crud about to execute read operation")
    # Test 1: Point read with request-level timeout should succeed (overrides client timeout)
    result = container.read_item(
        item='test_item_0',
        partition_key='partition0',
        read_timeout=30  # Higher timeout at request level
    )
    self.assertEqual(result['id'], 'test_item_0')
    # Test 2: Query with request-level timeout should succeed (overrides client timeout)
    results = list(container.query_items(
        query="SELECT * FROM c WHERE c.pk = @pk",
        parameters=[{"name": "@pk", "value": "partition1"}],
        read_timeout=30  # Higher timeout at request level
    ))
    self.assertEqual(len(results), 1)
    self.assertEqual(results[0]['id'], 'test_item_1')
    # Test 3: Upsert (write) with request-level timeout should succeed
    upsert_item = {
        'id': 'test_item_0',
        'pk': 'partition0',
        'data': 'updated_data'
    }
    result = container.upsert_item(
        body=upsert_item,
        read_timeout=30  # Higher timeout at request level
    )
    self.assertEqual(result['data'], 'updated_data')
    # Test 4: Create (write) with request-level timeout should succeed
    new_item = {
        'id': 'new_test_item',
        'pk': 'new_partition',
        'data': 'new_data'
    }
    result = container.create_item(
        body=new_item,
        read_timeout=30  # Higher timeout at request level
    )
    self.assertEqual(result['id'], 'new_test_item')
```
```python
    container.read_item(
        item='test_item_0',
        partition_key='partition0'
    )
except Exception as e:
    print(f"Exception is {e}")

# Test 1: Point read operation should time out
with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
    container.read_item(
        item='test_item_0',
        partition_key='partition0'
    )

# Test 2: Query operation should time out
with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
    list(container.query_items(
        query="SELECT * FROM c WHERE c.pk = @pk",
        parameters=[{"name": "@pk", "value": "partition0"}]
    ))
```
Copilot AI (Dec 18, 2025):
The timeout_client is not being properly closed, and the container is not being cleaned up. This could lead to resource leaks. Consider wrapping the client in a try-finally block to ensure proper cleanup, and add container deletion at the end.
Suggested change (wrapping the quoted block in a try/finally; applied under the enclosing try from the existing test, indentation reconstructed):
```python
try:
    try:
        container.read_item(
            item='test_item_0',
            partition_key='partition0'
        )
    except Exception as e:
        print(f"Exception is {e}")
    # Test 1: Point read operation should time out
    with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
        container.read_item(
            item='test_item_0',
            partition_key='partition0'
        )
    # Test 2: Query operation should time out
    with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
        list(container.query_items(
            query="SELECT * FROM c WHERE c.pk = @pk",
            parameters=[{"name": "@pk", "value": "partition0"}]
        ))
finally:
    # Ensure resources are cleaned up
    try:
        self.databaseForTest.delete_container(normal_container)
    except Exception:
        # Best-effort cleanup; ignore failures to avoid masking test errors
        pass
    try:
        timeout_client.close()
    except Exception:
        pass
```
```python
try:
    container.read_item(
        item='test_item_0',
        partition_key='partition0'
    )
except Exception as e:
    print(f"Exception is {e}")
```
Copilot AI (Dec 18, 2025):
This try-except block that catches and prints the exception serves no testing purpose and should be removed. The actual test assertions are on lines 1644-1656, making this block redundant.
Suggested change:
```diff
-try:
-    container.read_item(
-        item='test_item_0',
-        partition_key='partition0'
-    )
-except Exception as e:
-    print(f"Exception is {e}")
```
```python
    credential=self.masterKey,
    connection_policy=policy
)
await timeout_client.__aenter__()
```
Shouldn't it have timed out at this point? This causes get-database-account calls.
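If the client-level timeout is indeed expected to apply during initialization, the test could assert the failure at context entry. A sketch of that idea, reusing the exception types from the other tests in this PR:

```python
# If the 0.001s read timeout already applies to the get-database-account calls
# made on __aenter__, entering the client context should itself raise.
with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
    await timeout_client.__aenter__()
```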
```python
# Check if read_timeout is explicitly provided in kwargs (client-level)
if 'read_timeout' in kwargs:
    policy.ReadTimeout = kwargs.pop('read_timeout')
```
Should it be popped? Wouldn't popping prevent calls made during initialization from having the timeout? Same for the existing ones above.
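One reading of the concern: pop removes the kwarg before the pipeline ever sees it, so requests issued during client initialization would not carry the timeout. A sketch of the non-destructive alternative the question implies (whether the pipeline then honors the leftover kwarg is an assumption):

```python
# Read the client-level value without removing it from kwargs, so calls made
# during initialization can still see 'read_timeout' as a request option.
if 'read_timeout' in kwargs:
    policy.ReadTimeout = kwargs['read_timeout']  # plain lookup instead of kwargs.pop(...)
```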
Shouldn't we just use the timeout values in ConnectionPolicy instead of the one in kwargs? It looks like we are passing the ConnectionPolicy into later workflows.
```python
if 'read_timeout' in kwargs:
    policy.ReadTimeout = kwargs.pop('read_timeout')
# Otherwise, check if policy has the new read_timeout property
elif hasattr(policy, 'read_timeout') and policy.read_timeout is not None:
```
Sorry, I don't understand what scenario this is targeting. Is this for the case where a customer adds a field called read_timeout to the connection policy, so we would then grab it from there?
I agree with Tomas: if read_timeout was set in the policy by mistake, it is a user error and we shouldn't use it.
Also, what is the proper behavior when users set ReadTimeout in the policy and also provide read_timeout in kwargs? With the current change, we overwrite the one in the policy with the one in kwargs. Have you confirmed this is the expected behavior?
```python
database = timeout_client.get_database_client(self.databaseForTest.id)
container = database.get_container_client(normal_container.id)
print(f"test_crud about to execute read operation")
# Test 1: Point read with request-level timeout should succeed (overrides client timeout)
```
Shouldn't we also add a case where a request fails with the short client-level timeout? That would show that the other requests succeeded because of the request-level timeouts.
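A sketch of that missing negative case, mirroring the assertions used elsewhere in these tests:

```python
# Without a request-level override, the 0.001s client-level timeout
# should make the same point read fail.
with self.assertRaises((exceptions.CosmosClientTimeoutError, ServiceResponseError)):
    container.read_item(
        item='test_item_0',
        partition_key='partition0'
    )
```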
```diff
@@ -107,6 +107,8 @@ async def _get_properties_with_options(self, options: Optional[dict[str, Any]] =
     kwargs[Constants.OperationStartTime] = options[Constants.OperationStartTime]
     if "timeout" in options:
         kwargs['timeout'] = options['timeout']
+    if "read_timeout" in options:
+        kwargs['read_timeout'] = options['read_timeout']
```
Please use a constant like Constants.OperationStartTime instead of a hard-coded string here.
Ideally, all other options should be replaced with constants as well; we can do that in a separate PR.
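A sketch of what the constant-based lookup could look like; note that Constants.ReadTimeout is hypothetical here, as only Constants.OperationStartTime is known to exist from the diff above:

```python
# Hypothetical constant replacing the hard-coded "read_timeout" string;
# Constants.ReadTimeout would need to be defined next to OperationStartTime.
if Constants.ReadTimeout in options:
    kwargs[Constants.ReadTimeout] = options[Constants.ReadTimeout]
```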
@dibahlfi Another comment not directly related to this PR. Shouldn't Also, please make both
```diff
@@ -107,6 +107,8 @@ async def _get_properties_with_options(self, options: Optional[dict[str, Any]] =
     kwargs[Constants.OperationStartTime] = options[Constants.OperationStartTime]
     if "timeout" in options:
         kwargs['timeout'] = options['timeout']
+    if "read_timeout" in options:
```
We should add a changelog entry. The fix enables client-level read timeout configuration that automatically applies to all queries unless explicitly overridden at the request level.