1313# See the License for the specific language governing permissions and
1414# limitations under the License.
1515#
16- import json
17- import logging as std_logging
18- import pickle
1916import warnings
2017from typing import Callable , Dict , Optional , Sequence , Tuple , Union
2118
2421import google .auth # type: ignore
2522from google .auth import credentials as ga_credentials # type: ignore
2623from google .auth .transport .grpc import SslCredentials # type: ignore
27- from google .protobuf .json_format import MessageToJson
28- import google .protobuf .message
2924
3025import grpc # type: ignore
31- import proto # type: ignore
3226
3327from google .iam .v1 import iam_policy_pb2 # type: ignore
3428from google .iam .v1 import policy_pb2 # type: ignore
3529from google .protobuf import empty_pb2 # type: ignore
3630from google .pubsub_v1 .types import pubsub
3731from .base import PublisherTransport , DEFAULT_CLIENT_INFO
3832
39- try :
40- from google .api_core import client_logging # type: ignore
41-
42- CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
43- except ImportError : # pragma: NO COVER
44- CLIENT_LOGGING_SUPPORTED = False
45-
46- _LOGGER = std_logging .getLogger (__name__ )
47-
48-
49- class _LoggingClientInterceptor (grpc .UnaryUnaryClientInterceptor ): # pragma: NO COVER
50- def intercept_unary_unary (self , continuation , client_call_details , request ):
51- logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER .isEnabledFor (
52- std_logging .DEBUG
53- )
54- if logging_enabled : # pragma: NO COVER
55- request_metadata = client_call_details .metadata
56- if isinstance (request , proto .Message ):
57- request_payload = type (request ).to_json (request )
58- elif isinstance (request , google .protobuf .message .Message ):
59- request_payload = MessageToJson (request )
60- else :
61- request_payload = f"{ type (request ).__name__ } : { pickle .dumps (request )} "
62-
63- request_metadata = {
64- key : value .decode ("utf-8" ) if isinstance (value , bytes ) else value
65- for key , value in request_metadata
66- }
67- grpc_request = {
68- "payload" : request_payload ,
69- "requestMethod" : "grpc" ,
70- "metadata" : dict (request_metadata ),
71- }
72- _LOGGER .debug (
73- f"Sending request for { client_call_details .method } " ,
74- extra = {
75- "serviceName" : "google.pubsub.v1.Publisher" ,
76- "rpcName" : client_call_details .method ,
77- "request" : grpc_request ,
78- "metadata" : grpc_request ["metadata" ],
79- },
80- )
81-
82- response = continuation (client_call_details , request )
83- if logging_enabled : # pragma: NO COVER
84- response_metadata = response .trailing_metadata ()
85- # Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples
86- metadata = (
87- dict ([(k , str (v )) for k , v in response_metadata ])
88- if response_metadata
89- else None
90- )
91- result = response .result ()
92- if isinstance (result , proto .Message ):
93- response_payload = type (result ).to_json (result )
94- elif isinstance (result , google .protobuf .message .Message ):
95- response_payload = MessageToJson (result )
96- else :
97- response_payload = f"{ type (result ).__name__ } : { pickle .dumps (result )} "
98- grpc_response = {
99- "payload" : response_payload ,
100- "metadata" : metadata ,
101- "status" : "OK" ,
102- }
103- _LOGGER .debug (
104- f"Received response for { client_call_details .method } ." ,
105- extra = {
106- "serviceName" : "google.pubsub.v1.Publisher" ,
107- "rpcName" : client_call_details .method ,
108- "response" : grpc_response ,
109- "metadata" : grpc_response ["metadata" ],
110- },
111- )
112- return response
113-
11433
11534class PublisherGrpcTransport (PublisherTransport ):
11635 """gRPC backend transport for Publisher.
@@ -267,12 +186,7 @@ def __init__(
267186 ],
268187 )
269188
270- self ._interceptor = _LoggingClientInterceptor ()
271- self ._logged_channel = grpc .intercept_channel (
272- self ._grpc_channel , self ._interceptor
273- )
274-
275- # Wrap messages. This must be done after self._logged_channel exists
189+ # Wrap messages. This must be done after self._grpc_channel exists
276190 self ._prep_wrapped_messages (client_info )
277191
278192 @classmethod
@@ -346,7 +260,7 @@ def create_topic(self) -> Callable[[pubsub.Topic], pubsub.Topic]:
346260 # gRPC handles serialization and deserialization, so we just need
347261 # to pass in the functions for each.
348262 if "create_topic" not in self ._stubs :
349- self ._stubs ["create_topic" ] = self ._logged_channel .unary_unary (
263+ self ._stubs ["create_topic" ] = self .grpc_channel .unary_unary (
350264 "/google.pubsub.v1.Publisher/CreateTopic" ,
351265 request_serializer = pubsub .Topic .serialize ,
352266 response_deserializer = pubsub .Topic .deserialize ,
@@ -372,7 +286,7 @@ def update_topic(self) -> Callable[[pubsub.UpdateTopicRequest], pubsub.Topic]:
372286 # gRPC handles serialization and deserialization, so we just need
373287 # to pass in the functions for each.
374288 if "update_topic" not in self ._stubs :
375- self ._stubs ["update_topic" ] = self ._logged_channel .unary_unary (
289+ self ._stubs ["update_topic" ] = self .grpc_channel .unary_unary (
376290 "/google.pubsub.v1.Publisher/UpdateTopic" ,
377291 request_serializer = pubsub .UpdateTopicRequest .serialize ,
378292 response_deserializer = pubsub .Topic .deserialize ,
@@ -397,7 +311,7 @@ def publish(self) -> Callable[[pubsub.PublishRequest], pubsub.PublishResponse]:
397311 # gRPC handles serialization and deserialization, so we just need
398312 # to pass in the functions for each.
399313 if "publish" not in self ._stubs :
400- self ._stubs ["publish" ] = self ._logged_channel .unary_unary (
314+ self ._stubs ["publish" ] = self .grpc_channel .unary_unary (
401315 "/google.pubsub.v1.Publisher/Publish" ,
402316 request_serializer = pubsub .PublishRequest .serialize ,
403317 response_deserializer = pubsub .PublishResponse .deserialize ,
@@ -421,7 +335,7 @@ def get_topic(self) -> Callable[[pubsub.GetTopicRequest], pubsub.Topic]:
421335 # gRPC handles serialization and deserialization, so we just need
422336 # to pass in the functions for each.
423337 if "get_topic" not in self ._stubs :
424- self ._stubs ["get_topic" ] = self ._logged_channel .unary_unary (
338+ self ._stubs ["get_topic" ] = self .grpc_channel .unary_unary (
425339 "/google.pubsub.v1.Publisher/GetTopic" ,
426340 request_serializer = pubsub .GetTopicRequest .serialize ,
427341 response_deserializer = pubsub .Topic .deserialize ,
@@ -447,7 +361,7 @@ def list_topics(
447361 # gRPC handles serialization and deserialization, so we just need
448362 # to pass in the functions for each.
449363 if "list_topics" not in self ._stubs :
450- self ._stubs ["list_topics" ] = self ._logged_channel .unary_unary (
364+ self ._stubs ["list_topics" ] = self .grpc_channel .unary_unary (
451365 "/google.pubsub.v1.Publisher/ListTopics" ,
452366 request_serializer = pubsub .ListTopicsRequest .serialize ,
453367 response_deserializer = pubsub .ListTopicsResponse .deserialize ,
@@ -476,7 +390,7 @@ def list_topic_subscriptions(
476390 # gRPC handles serialization and deserialization, so we just need
477391 # to pass in the functions for each.
478392 if "list_topic_subscriptions" not in self ._stubs :
479- self ._stubs ["list_topic_subscriptions" ] = self ._logged_channel .unary_unary (
393+ self ._stubs ["list_topic_subscriptions" ] = self .grpc_channel .unary_unary (
480394 "/google.pubsub.v1.Publisher/ListTopicSubscriptions" ,
481395 request_serializer = pubsub .ListTopicSubscriptionsRequest .serialize ,
482396 response_deserializer = pubsub .ListTopicSubscriptionsResponse .deserialize ,
@@ -509,7 +423,7 @@ def list_topic_snapshots(
509423 # gRPC handles serialization and deserialization, so we just need
510424 # to pass in the functions for each.
511425 if "list_topic_snapshots" not in self ._stubs :
512- self ._stubs ["list_topic_snapshots" ] = self ._logged_channel .unary_unary (
426+ self ._stubs ["list_topic_snapshots" ] = self .grpc_channel .unary_unary (
513427 "/google.pubsub.v1.Publisher/ListTopicSnapshots" ,
514428 request_serializer = pubsub .ListTopicSnapshotsRequest .serialize ,
515429 response_deserializer = pubsub .ListTopicSnapshotsResponse .deserialize ,
@@ -538,7 +452,7 @@ def delete_topic(self) -> Callable[[pubsub.DeleteTopicRequest], empty_pb2.Empty]
538452 # gRPC handles serialization and deserialization, so we just need
539453 # to pass in the functions for each.
540454 if "delete_topic" not in self ._stubs :
541- self ._stubs ["delete_topic" ] = self ._logged_channel .unary_unary (
455+ self ._stubs ["delete_topic" ] = self .grpc_channel .unary_unary (
542456 "/google.pubsub.v1.Publisher/DeleteTopic" ,
543457 request_serializer = pubsub .DeleteTopicRequest .serialize ,
544458 response_deserializer = empty_pb2 .Empty .FromString ,
@@ -570,7 +484,7 @@ def detach_subscription(
570484 # gRPC handles serialization and deserialization, so we just need
571485 # to pass in the functions for each.
572486 if "detach_subscription" not in self ._stubs :
573- self ._stubs ["detach_subscription" ] = self ._logged_channel .unary_unary (
487+ self ._stubs ["detach_subscription" ] = self .grpc_channel .unary_unary (
574488 "/google.pubsub.v1.Publisher/DetachSubscription" ,
575489 request_serializer = pubsub .DetachSubscriptionRequest .serialize ,
576490 response_deserializer = pubsub .DetachSubscriptionResponse .deserialize ,
@@ -595,7 +509,7 @@ def set_iam_policy(
595509 # gRPC handles serialization and deserialization, so we just need
596510 # to pass in the functions for each.
597511 if "set_iam_policy" not in self ._stubs :
598- self ._stubs ["set_iam_policy" ] = self ._logged_channel .unary_unary (
512+ self ._stubs ["set_iam_policy" ] = self .grpc_channel .unary_unary (
599513 "/google.iam.v1.IAMPolicy/SetIamPolicy" ,
600514 request_serializer = iam_policy_pb2 .SetIamPolicyRequest .SerializeToString ,
601515 response_deserializer = policy_pb2 .Policy .FromString ,
@@ -621,7 +535,7 @@ def get_iam_policy(
621535 # gRPC handles serialization and deserialization, so we just need
622536 # to pass in the functions for each.
623537 if "get_iam_policy" not in self ._stubs :
624- self ._stubs ["get_iam_policy" ] = self ._logged_channel .unary_unary (
538+ self ._stubs ["get_iam_policy" ] = self .grpc_channel .unary_unary (
625539 "/google.iam.v1.IAMPolicy/GetIamPolicy" ,
626540 request_serializer = iam_policy_pb2 .GetIamPolicyRequest .SerializeToString ,
627541 response_deserializer = policy_pb2 .Policy .FromString ,
@@ -650,15 +564,15 @@ def test_iam_permissions(
650564 # gRPC handles serialization and deserialization, so we just need
651565 # to pass in the functions for each.
652566 if "test_iam_permissions" not in self ._stubs :
653- self ._stubs ["test_iam_permissions" ] = self ._logged_channel .unary_unary (
567+ self ._stubs ["test_iam_permissions" ] = self .grpc_channel .unary_unary (
654568 "/google.iam.v1.IAMPolicy/TestIamPermissions" ,
655569 request_serializer = iam_policy_pb2 .TestIamPermissionsRequest .SerializeToString ,
656570 response_deserializer = iam_policy_pb2 .TestIamPermissionsResponse .FromString ,
657571 )
658572 return self ._stubs ["test_iam_permissions" ]
659573
660574 def close (self ):
661- self ._logged_channel .close ()
575+ self .grpc_channel .close ()
662576
663577 @property
664578 def kind (self ) -> str :
0 commit comments