diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/requests.py b/google/cloud/pubsub_v1/subscriber/_protocol/requests.py
index 6fd35896b..9a0ba5a50 100644
--- a/google/cloud/pubsub_v1/subscriber/_protocol/requests.py
+++ b/google/cloud/pubsub_v1/subscriber/_protocol/requests.py
@@ -32,6 +32,7 @@ class AckRequest(NamedTuple):
     ordering_key: Optional[str]
     future: Optional["futures.Future"]
     opentelemetry_data: Optional[SubscribeOpenTelemetry] = None
+    message_id: Optional[str] = None
 
 
 class DropRequest(NamedTuple):
@@ -52,6 +53,7 @@ class ModAckRequest(NamedTuple):
     seconds: float
     future: Optional["futures.Future"]
     opentelemetry_data: Optional[SubscribeOpenTelemetry] = None
+    message_id: Optional[str] = None
 
 
 class NackRequest(NamedTuple):
diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
index de3ac3780..d509d8074 100644
--- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
@@ -21,7 +21,16 @@
 import logging
 import threading
 import typing
-from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple
+from typing import (
+    Any,
+    Dict,
+    Callable,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+)
 import uuid
 
 from opentelemetry import trace
@@ -60,6 +69,12 @@
 
 _LOGGER = logging.getLogger(__name__)
 
+_SLOW_ACK_LOGGER = logging.getLogger("slow-ack")
+_STREAMS_LOGGER = logging.getLogger("subscriber-streams")
+_FLOW_CONTROL_LOGGER = logging.getLogger("subscriber-flow-control")
+_CALLBACK_DELIVERY_LOGGER = logging.getLogger("callback-delivery")
+_CALLBACK_EXCEPTION_LOGGER = logging.getLogger("callback-exceptions")
+_EXPIRY_LOGGER = logging.getLogger("expiry")
 _REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
 _RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
 _RETRYABLE_STREAM_ERRORS = (
@@ -145,6 +160,14 @@ def _wrap_callback_errors(
         callback: The user callback.
         message: The Pub/Sub message.
     """
+    _CALLBACK_DELIVERY_LOGGER.debug(
+        "Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s) received by subscriber callback",
+        message.message_id,
+        message.ack_id,
+        message.ordering_key,
+        message.exactly_once_enabled,
+    )
+
     try:
         if message.opentelemetry_data:
             message.opentelemetry_data.end_subscribe_concurrency_control_span()
@@ -156,9 +179,15 @@ def _wrap_callback_errors(
         # Note: the likelihood of this failing is extremely low. This just adds
         # a message to a queue, so if this doesn't work the world is in an
         # unrecoverable state and this thread should just bail.
-        _LOGGER.exception(
-            "Top-level exception occurred in callback while processing a message"
+
+        _CALLBACK_EXCEPTION_LOGGER.exception(
+            "Message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s)'s callback threw exception, nacking message.",
+            message.message_id,
+            message.ack_id,
+            message.ordering_key,
+            message.exactly_once_enabled,
         )
+
         message.nack()
         on_callback_error(exc)
 
@@ -199,6 +228,9 @@ def _process_requests(
     error_status: Optional["status_pb2.Status"],
     ack_reqs_dict: Dict[str, requests.AckRequest],
     errors_dict: Optional[Dict[str, str]],
+    ack_histogram: Optional[histogram.Histogram] = None,
+    # TODO - Change this param to a Union of Literals when we drop p3.7 support
+    req_type: str = "ack",
 ):
     """Process requests when exactly-once delivery is enabled by referring to
     error_status and errors_dict.
@@ -209,28 +241,40 @@ def _process_requests(
     """
     requests_completed = []
     requests_to_retry = []
-    for ack_id in ack_reqs_dict:
+    for ack_id, ack_request in ack_reqs_dict.items():
+        # Debug logging: slow acks
+        if (
+            req_type == "ack"
+            and ack_histogram
+            and ack_request.time_to_ack > ack_histogram.percentile(percent=99)
+        ):
+            _SLOW_ACK_LOGGER.debug(
+                "Message (id=%s, ack_id=%s) ack duration of %s s is higher than the p99 ack duration",
+                ack_request.message_id,
+                ack_request.ack_id,
+                ack_request.time_to_ack,
+            )
         # Handle special errors returned for ack/modack RPCs via the ErrorInfo
         # sidecar metadata when exactly-once delivery is enabled.
         if errors_dict and ack_id in errors_dict:
             exactly_once_error = errors_dict[ack_id]
             if exactly_once_error.startswith("TRANSIENT_"):
-                requests_to_retry.append(ack_reqs_dict[ack_id])
+                requests_to_retry.append(ack_request)
             else:
                 if exactly_once_error == "PERMANENT_FAILURE_INVALID_ACK_ID":
                     exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None)
                 else:
                     exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error)
-                future = ack_reqs_dict[ack_id].future
+                future = ack_request.future
                 if future is not None:
                     future.set_exception(exc)
-                requests_completed.append(ack_reqs_dict[ack_id])
+                requests_completed.append(ack_request)
         # Temporary GRPC errors are retried
         elif (
             error_status
             and error_status.code in _EXACTLY_ONCE_DELIVERY_TEMPORARY_RETRY_ERRORS
         ):
-            requests_to_retry.append(ack_reqs_dict[ack_id])
+            requests_to_retry.append(ack_request)
         # Other GRPC errors are NOT retried
         elif error_status:
             if error_status.code == code_pb2.PERMISSION_DENIED:
@@ -239,20 +283,20 @@ def _process_requests(
                 exc = AcknowledgeError(AcknowledgeStatus.FAILED_PRECONDITION, info=None)
             else:
                 exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status))
-            future = ack_reqs_dict[ack_id].future
+            future = ack_request.future
             if future is not None:
                 future.set_exception(exc)
-            requests_completed.append(ack_reqs_dict[ack_id])
+            requests_completed.append(ack_request)
         # Since no error occurred, requests with futures are completed successfully.
-        elif ack_reqs_dict[ack_id].future:
-            future = ack_reqs_dict[ack_id].future
+        elif ack_request.future:
+            future = ack_request.future
             # success
             assert future is not None
             future.set_result(AcknowledgeStatus.SUCCESS)
-            requests_completed.append(ack_reqs_dict[ack_id])
+            requests_completed.append(ack_request)
         # All other requests are considered completed.
         else:
-            requests_completed.append(ack_reqs_dict[ack_id])
+            requests_completed.append(ack_request)
 
     return requests_completed, requests_to_retry
 
@@ -560,8 +604,10 @@ def maybe_pause_consumer(self) -> None:
         with self._pause_resume_lock:
             if self.load >= _MAX_LOAD:
                 if self._consumer is not None and not self._consumer.is_paused:
-                    _LOGGER.debug(
-                        "Message backlog over load at %.2f, pausing.", self.load
+                    _FLOW_CONTROL_LOGGER.debug(
+                        "Message backlog over load at %.2f (threshold %.2f), initiating client-side flow control",
+                        self.load,
+                        _MAX_LOAD,
                     )
                     self._consumer.pause()
 
@@ -588,10 +634,18 @@ def maybe_resume_consumer(self) -> None:
             self._maybe_release_messages()
 
             if self.load < _RESUME_THRESHOLD:
-                _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
+                _FLOW_CONTROL_LOGGER.debug(
+                    "Current load is %.2f (threshold %.2f), suspending client-side flow control.",
+                    self.load,
+                    _RESUME_THRESHOLD,
+                )
                 self._consumer.resume()
             else:
-                _LOGGER.debug("Did not resume, current load is %.2f.", self.load)
+                _FLOW_CONTROL_LOGGER.debug(
+                    "Current load is %.2f (threshold %.2f), retaining client-side flow control.",
+                    self.load,
+                    _RESUME_THRESHOLD,
+                )
 
     def _maybe_release_messages(self) -> None:
         """Release (some of) the held messages if the current load allows for it.
@@ -702,7 +756,7 @@ def send_unary_ack(
 
         if self._exactly_once_delivery_enabled():
             requests_completed, requests_to_retry = _process_requests(
-                error_status, ack_reqs_dict, ack_errors_dict
+                error_status, ack_reqs_dict, ack_errors_dict, self.ack_histogram, "ack"
             )
         else:
             requests_completed = []
@@ -796,7 +850,11 @@ def send_unary_modack(
 
         if self._exactly_once_delivery_enabled():
             requests_completed, requests_to_retry = _process_requests(
-                error_status, ack_reqs_dict, modack_errors_dict
+                error_status,
+                ack_reqs_dict,
+                modack_errors_dict,
+                self.ack_histogram,
+                "modack",
             )
         else:
             requests_completed = []
@@ -1239,6 +1297,11 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
                 receipt_modack=True,
             )
 
+        if expired_ack_ids:
+            _EXPIRY_LOGGER.debug(
+                "ack ids %s were dropped as they have already expired.", expired_ack_ids
+            )
+
         with self._pause_resume_lock:
             if self._scheduler is None or self._leaser is None:
                 _LOGGER.debug(
@@ -1304,9 +1367,13 @@ def _should_recover(self, exception: BaseException) -> bool:
         # If this is in the list of idempotent exceptions, then we want to
        # recover.
         if isinstance(exception, _RETRYABLE_STREAM_ERRORS):
-            _LOGGER.debug("Observed recoverable stream error %s", exception)
+            _STREAMS_LOGGER.debug(
+                "Observed recoverable stream error %s, reopening stream", exception
+            )
             return True
-        _LOGGER.debug("Observed non-recoverable stream error %s", exception)
+        _STREAMS_LOGGER.debug(
+            "Observed non-recoverable stream error %s, shutting down stream", exception
+        )
         return False
 
     def _should_terminate(self, exception: BaseException) -> bool:
@@ -1326,9 +1393,13 @@ def _should_terminate(self, exception: BaseException) -> bool:
 
         is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
         # Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
         if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
-            _LOGGER.debug("Observed terminating stream error %s", exception)
+            _STREAMS_LOGGER.debug(
+                "Observed terminating stream error %s, shutting down stream", exception
+            )
             return True
-        _LOGGER.debug("Observed non-terminating stream error %s", exception)
+        _STREAMS_LOGGER.debug(
+            "Observed non-terminating stream error %s, attempting to reopen", exception
+        )
         return False
 
     def _on_rpc_done(self, future: Any) -> None:
diff --git a/google/cloud/pubsub_v1/subscriber/message.py b/google/cloud/pubsub_v1/subscriber/message.py
index 61f60c4d9..aa715ac67 100644
--- a/google/cloud/pubsub_v1/subscriber/message.py
+++ b/google/cloud/pubsub_v1/subscriber/message.py
@@ -16,6 +16,7 @@
 
 import datetime as dt
 import json
+import logging
 import math
 import time
 import typing
@@ -43,6 +44,8 @@
   attributes: {}
 }}"""
 
+_ACK_NACK_LOGGER = logging.getLogger("ack-nack")
+
 _SUCCESS_FUTURE = futures.Future()
 _SUCCESS_FUTURE.set_result(AcknowledgeStatus.SUCCESS)
 
@@ -274,6 +277,7 @@ def ack(self) -> None:
         time_to_ack = math.ceil(time.time() - self._received_timestamp)
         self._request_queue.put(
             requests.AckRequest(
+                message_id=self.message_id,
                 ack_id=self._ack_id,
                 byte_size=self.size,
                 time_to_ack=time_to_ack,
@@ -282,6 +286,12 @@ def ack(self) -> None:
                 opentelemetry_data=self.opentelemetry_data,
             )
         )
+        _ACK_NACK_LOGGER.debug(
+            "Called ack for message (id=%s, ack_id=%s, ordering_key=%s)",
+            self.message_id,
+            self.ack_id,
+            self.ordering_key,
+        )
 
     def ack_with_response(self) -> "futures.Future":
         """Acknowledge the given message.
@@ -322,6 +332,12 @@ def ack_with_response(self) -> "futures.Future":
             pubsub_v1.subscriber.exceptions.AcknowledgeError exception will be
             thrown.
         """
+        _ACK_NACK_LOGGER.debug(
+            "Called ack for message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=True)",
+            self.message_id,
+            self.ack_id,
+            self.ordering_key,
+        )
         if self.opentelemetry_data:
             self.opentelemetry_data.add_process_span_event("ack called")
             self.opentelemetry_data.end_process_span()
@@ -335,6 +351,7 @@ def ack_with_response(self) -> "futures.Future":
         time_to_ack = math.ceil(time.time() - self._received_timestamp)
         self._request_queue.put(
             requests.AckRequest(
+                message_id=self.message_id,
                 ack_id=self._ack_id,
                 byte_size=self.size,
                 time_to_ack=time_to_ack,
@@ -382,6 +399,7 @@ def modify_ack_deadline(self, seconds: int) -> None:
         """
         self._request_queue.put(
             requests.ModAckRequest(
+                message_id=self.message_id,
                 ack_id=self._ack_id,
                 seconds=seconds,
                 future=None,
@@ -445,6 +463,7 @@ def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future":
 
         self._request_queue.put(
             requests.ModAckRequest(
+                message_id=self.message_id,
                 ack_id=self._ack_id,
                 seconds=seconds,
                 future=req_future,
@@ -461,6 +480,13 @@ def nack(self) -> None:
         may take place immediately or after a delay, and may arrive at this
         subscriber or another.
""" + _ACK_NACK_LOGGER.debug( + "Called nack for message (id=%s, ack_id=%s, ordering_key=%s, exactly_once=%s)", + self.message_id, + self.ack_id, + self.ordering_key, + self._exactly_once_delivery_enabled_func(), + ) if self.opentelemetry_data: self.opentelemetry_data.add_process_span_event("nack called") self.opentelemetry_data.end_process_span() @@ -530,3 +556,7 @@ def nack_with_response(self) -> "futures.Future": ) return future + + @property + def exactly_once_enabled(self): + return self._exactly_once_delivery_enabled_func() diff --git a/pytest.ini b/pytest.ini index 09a522efe..fc17230ef 100644 --- a/pytest.ini +++ b/pytest.ini @@ -21,4 +21,6 @@ filterwarnings = # Remove once https://github.com/googleapis/gapic-generator-python/issues/2303 is fixed ignore:The python-bigquery library will stop supporting Python 3.7:PendingDeprecationWarning # Remove once we move off credential files https://github.com/googleapis/google-auth-library-python/pull/1812 + # Note that these are used in tests only ignore:Your config file at [/home/kbuilder/.docker/config.json] contains these credential helper entries:DeprecationWarning + ignore:The `credentials_file` argument is deprecated because of a potential security risk:DeprecationWarning \ No newline at end of file diff --git a/tests/unit/pubsub_v1/subscriber/test_message.py b/tests/unit/pubsub_v1/subscriber/test_message.py index 8d9d2566e..03bdc1514 100644 --- a/tests/unit/pubsub_v1/subscriber/test_message.py +++ b/tests/unit/pubsub_v1/subscriber/test_message.py @@ -289,6 +289,7 @@ def test_ack(): msg.ack() put.assert_called_once_with( requests.AckRequest( + message_id=msg.message_id, ack_id="bogus_ack_id", byte_size=30, time_to_ack=mock.ANY, @@ -305,6 +306,7 @@ def test_ack_with_response_exactly_once_delivery_disabled(): future = msg.ack_with_response() put.assert_called_once_with( requests.AckRequest( + message_id=msg.message_id, ack_id="bogus_ack_id", byte_size=30, time_to_ack=mock.ANY, @@ -325,6 +327,7 @@ def test_ack_with_response_exactly_once_delivery_enabled(): future = msg.ack_with_response() put.assert_called_once_with( requests.AckRequest( + message_id=msg.message_id, ack_id="bogus_ack_id", byte_size=30, time_to_ack=mock.ANY, @@ -350,7 +353,12 @@ def test_modify_ack_deadline(): with mock.patch.object(msg._request_queue, "put") as put: msg.modify_ack_deadline(60) put.assert_called_once_with( - requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60, future=None) + requests.ModAckRequest( + message_id=msg.message_id, + ack_id="bogus_ack_id", + seconds=60, + future=None, + ) ) check_call_types(put, requests.ModAckRequest) @@ -360,7 +368,12 @@ def test_modify_ack_deadline_with_response_exactly_once_delivery_disabled(): with mock.patch.object(msg._request_queue, "put") as put: future = msg.modify_ack_deadline_with_response(60) put.assert_called_once_with( - requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60, future=None) + requests.ModAckRequest( + message_id=msg.message_id, + ack_id="bogus_ack_id", + seconds=60, + future=None, + ) ) assert future.result() == AcknowledgeStatus.SUCCESS assert future == message._SUCCESS_FUTURE @@ -374,7 +387,12 @@ def test_modify_ack_deadline_with_response_exactly_once_delivery_enabled(): with mock.patch.object(msg._request_queue, "put") as put: future = msg.modify_ack_deadline_with_response(60) put.assert_called_once_with( - requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60, future=future) + requests.ModAckRequest( + message_id=msg.message_id, + ack_id="bogus_ack_id", + seconds=60, + future=future, + 
+            )
         )
         check_call_types(put, requests.ModAckRequest)
diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
index f45959637..b9561d747 100644
--- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
+++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import functools
 import logging
 import sys
 import threading
@@ -61,6 +60,15 @@
 from google.rpc import error_details_pb2
 
 
+def create_mock_message(**kwargs):
+    # Autospec the real Message class so attribute access keeps matching its API.
+    msg = mock.create_autospec(message.Message, instance=True)
+    for k, v in kwargs.items():
+        setattr(msg, k, v)
+
+    return msg
+
+
 @pytest.mark.parametrize(
     "exception,expected_cls",
     [
@@ -80,7 +88,7 @@ def test__wrap_as_exception(exception, expected_cls):
 
 
 def test__wrap_callback_errors_no_error():
-    msg = mock.create_autospec(message.Message, instance=True)
+    msg = create_mock_message()
     callback = mock.Mock()
     on_callback_error = mock.Mock()
 
@@ -99,7 +107,7 @@
     ],
 )
 def test__wrap_callback_errors_error(callback_error):
-    msg = mock.create_autospec(message.Message, instance=True)
+    msg = create_mock_message()
     callback = mock.Mock(side_effect=callback_error)
     on_callback_error = mock.Mock()
 
@@ -511,8 +519,8 @@ def test__maybe_release_messages_on_overload():
     manager = make_manager(
         flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
     )
+    msg = create_mock_message(ack_id="ack", size=11)
 
-    msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=11)
     manager._messages_on_hold.put(msg)
     manager._on_hold_bytes = msg.size
 
@@ -539,9 +547,8 @@ def test_opentelemetry__maybe_release_messages_subscribe_scheduler_span(span_exporter):
     # max load is hit.
     _leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
     fake_leaser_add(_leaser, init_msg_count=8, assumed_msg_size=10)
-    msg = mock.create_autospec(
-        message.Message, instance=True, ack_id="ack_foo", size=10
-    )
+    msg = create_mock_message(ack_id="ack_foo", size=10)
+    msg.message_id = 3
     opentelemetry_data = SubscribeOpenTelemetry(msg)
     msg.opentelemetry_data = opentelemetry_data
 
@@ -611,7 +618,7 @@ def test__maybe_release_messages_negative_on_hold_bytes_warning(
     )
     manager._callback = lambda msg: msg  # pragma: NO COVER
 
-    msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=17)
+    msg = create_mock_message(ack_id="ack", size=17)
     manager._messages_on_hold.put(msg)
     manager._on_hold_bytes = 5  # too low for some reason
 
@@ -1633,8 +1640,11 @@ def test_close_nacks_internally_queued_messages():
     def fake_nack(self):
         nacked_messages.append(self.data)
 
-    MockMsg = functools.partial(mock.create_autospec, message.Message, instance=True)
-    messages = [MockMsg(data=b"msg1"), MockMsg(data=b"msg2"), MockMsg(data=b"msg3")]
+    messages = [
+        create_mock_message(data=b"msg1"),
+        create_mock_message(data=b"msg2"),
+        create_mock_message(data=b"msg3"),
+    ]
     for msg in messages:
         msg.nack = stdlib_types.MethodType(fake_nack, msg)
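
Usage note: the loggers introduced above only emit at DEBUG and are looked up by bare name, so they can be enabled selectively. A minimal sketch of opting in from an application, assuming the logger names land exactly as written in this diff ("slow-ack", "subscriber-streams", "subscriber-flow-control", "callback-delivery", "callback-exceptions", "expiry", "ack-nack"):

    import logging

    # basicConfig installs a root StreamHandler; the handler itself has no
    # level filter, so DEBUG records that propagate up from the loggers below
    # are still printed even though the root logger level stays at INFO.
    logging.basicConfig(level=logging.INFO)

    # Opt in to the per-component debug streams added in this change without
    # enabling DEBUG for the whole process.
    for name in (
        "slow-ack",
        "subscriber-streams",
        "subscriber-flow-control",
        "callback-delivery",
        "callback-exceptions",
        "expiry",
        "ack-nack",
    ):
        logging.getLogger(name).setLevel(logging.DEBUG)

Because the names are top level rather than children of an existing package logger, each stream has to be switched on individually as shown.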