11from decimal import Decimal
22import logging
3+ import math
34import time
45import threading
56from uuid import uuid4
1415
1516from databricks .sql .thrift_api .TCLIService import TCLIService , ttypes
1617from databricks .sql import *
17- from databricks .sql .utils import ArrowQueue , ExecuteResponse
18+ from databricks .sql .utils import ArrowQueue , ExecuteResponse , _bound , RequestErrorInfo , NoRetryReason
1819
1920logger = logging .getLogger (__name__ )
2021
22+ THRIFT_ERROR_MESSAGE_HEADER = "x-thriftserver-error-message"
23+ DATABRICKS_ERROR_OR_REDIRECT_HEADER = "x-databricks-error-or-redirect-message"
24+ DATABRICKS_REASON_HEADER = "x-databricks-reason-phrase"
25+
26+ # see Connection.__init__ for parameter descriptions.
27+ # - Min/Max avoids unsustainable configs (sane values are far more constrained)
28+ # - 900s attempts-duration lines up w ODBC/JDBC drivers (for cluster startup > 10 mins)
29+ _retry_policy = { # (type, default, min, max)
30+ "_retry_delay_min" : (float , 1 , 0.1 , 60 ),
31+ "_retry_delay_max" : (float , 60 , 5 , 3600 ),
32+ "_retry_stop_after_attempts_count" : (int , 30 , 1 , 60 ),
33+ "_retry_stop_after_attempts_duration" : (float , 900 , 1 , 86400 ),
34+ }
35+
2136
2237class ThriftBackend :
2338 CLOSED_OP_STATE = ttypes .TOperationState .CLOSED_STATE
2439 ERROR_OP_STATE = ttypes .TOperationState .ERROR_STATE
2540 BIT_MASKS = [1 , 2 , 4 , 8 , 16 , 32 , 64 , 128 ]
26- ERROR_MSG_HEADER = "X-Thriftserver-Error-Message"
2741
2842 def __init__ (self , server_hostname : str , port , http_path : str , http_headers , ** kwargs ):
2943 # Internal arguments in **kwargs:
@@ -43,8 +57,18 @@ def __init__(self, server_hostname: str, port, http_path: str, http_headers, **k
4357 # See https://docs.python.org/3/library/ssl.html#ssl.SSLContext.load_cert_chain
4458 # _connection_uri
4559 # Overrides server_hostname and http_path.
46- # _retry_stop_after_attempts_count
47- # The maximum number of times we should retry retryable requests (defaults to 24)
60+ # RETRY/ATTEMPT POLICY
61+ # _retry_delay_min (default: 1)
62+ # _retry_delay_max (default: 60)
63+ # {min,max} pre-retry delay bounds
64+ # _retry_stop_after_attempts_count (default: 30)
65+ # total max attempts during retry sequence
66+ # _retry_stop_after_attempts_duration (default: 900)
67+ # total max wait duration during retry sequence
68+ # (Note this will stop _before_ intentionally exceeding; thus if the
69+ # next calculated pre-retry delay would go past
70+ # _retry_stop_after_attempts_duration, stop now.)
71+ #
4872
4973 port = port or 443
5074 if kwargs .get ("_connection_uri" ):
@@ -55,7 +79,7 @@ def __init__(self, server_hostname: str, port, http_path: str, http_headers, **k
5579 else :
5680 raise ValueError ("No valid connection settings." )
5781
58- self ._retry_stop_after_attempts_count = kwargs . get ( "_retry_stop_after_attempts_count" , 24 )
82+ self ._initialize_retry_args ( kwargs )
5983
6084 # Configure tls context
6185 ssl_context = create_default_context (cafile = kwargs .get ("_tls_trusted_ca_file" ))
@@ -95,55 +119,156 @@ def __init__(self, server_hostname: str, port, http_path: str, http_headers, **k
95119
96120 self ._request_lock = threading .RLock ()
97121
122+ def _initialize_retry_args (self , kwargs ):
123+ # Configure retries & timing: use user-settings or defaults, and bound
124+ # by policy. Log.warn when given param gets restricted.
125+ for (key , (type_ , default , min , max )) in _retry_policy .items ():
126+ given_or_default = type_ (kwargs .get (key , default ))
127+ bound = _bound (min , max , given_or_default )
128+ setattr (self , key , bound )
129+ logger .debug ('retry parameter: {} given_or_default {}' .format (key , given_or_default ))
130+ if bound != given_or_default :
131+ logger .warn ('Override out of policy retry parameter: ' +
132+ '{} given {}, restricted to {}' .format (key , given_or_default , bound ))
133+
134+ # Fail on retry delay min > max; consider later adding fail on min > duration?
135+ if self ._retry_stop_after_attempts_count > 1 \
136+ and self ._retry_delay_min > self ._retry_delay_max :
137+ raise ValueError (
138+ "Invalid configuration enables retries with retry delay min(={}) > max(={})" .format (
139+ self ._retry_delay_min , self ._retry_delay_max ))
140+
98141 @staticmethod
99142 def _check_response_for_error (response ):
100143 if response .status and response .status .statusCode in \
101144 [ttypes .TStatusCode .ERROR_STATUS , ttypes .TStatusCode .INVALID_HANDLE_STATUS ]:
102145 raise DatabaseError (response .status .errorMessage )
103146
147+ @staticmethod
148+ def _extract_error_message_from_headers (headers ):
149+ err_msg = ""
150+ if THRIFT_ERROR_MESSAGE_HEADER in headers :
151+ err_msg = headers [THRIFT_ERROR_MESSAGE_HEADER ]
152+ if DATABRICKS_ERROR_OR_REDIRECT_HEADER in headers :
153+ if err_msg : # We don't expect both to be set, but log both here just in case
154+ err_msg = "Thriftserver error: {}, Databricks error: {}" .format (
155+ err_msg , headers [DATABRICKS_ERROR_OR_REDIRECT_HEADER ])
156+ else :
157+ err_msg = headers [DATABRICKS_ERROR_OR_REDIRECT_HEADER ]
158+ if DATABRICKS_REASON_HEADER in headers :
159+ err_msg += ": " + headers [DATABRICKS_REASON_HEADER ]
160+ return err_msg
161+
162+ def _handle_request_error (self , error_info , attempt , elapsed ):
163+ max_attempts = self ._retry_stop_after_attempts_count
164+ max_duration_s = self ._retry_stop_after_attempts_duration
165+
166+ if error_info .retry_delay is not None and elapsed + error_info .retry_delay > max_duration_s :
167+ no_retry_reason = NoRetryReason .OUT_OF_TIME
168+ elif error_info .retry_delay is not None and attempt >= max_attempts :
169+ no_retry_reason = NoRetryReason .OUT_OF_ATTEMPTS
170+ elif error_info .retry_delay is None :
171+ no_retry_reason = NoRetryReason .NOT_RETRYABLE
172+ else :
173+ no_retry_reason = None
174+
175+ full_error_info_str = error_info .full_info_logging_str (
176+ no_retry_reason , attempt , max_attempts , elapsed , max_duration_s )
177+
178+ if no_retry_reason is not None :
179+ user_friendly_error_message = error_info .user_friendly_error_message (
180+ no_retry_reason , attempt , elapsed )
181+ logger .info ("{}: {}" .format (user_friendly_error_message , full_error_info_str ))
182+
183+ raise OperationalError (user_friendly_error_message , error_info .error )
184+
185+ logger .info ("Retrying request after error in {} seconds: {}" .format (
186+ error_info .retry_delay , full_error_info_str ))
187+ time .sleep (error_info .retry_delay )
188+
104189 # FUTURE: Consider moving to https://github.com/litl/backoff or
105- # https://github.com/jd/tenacity for retry logic. Otherwise, copy from
106- # v1 client.
107- def make_request (self , method , request , attempt_number = 1 ):
108- try :
190+ # https://github.com/jd/tenacity for retry logic.
191+ def make_request (self , method , request ):
192+ """Execute given request, attempting retries when receiving HTTP 429/503.
193+
194+ For delay between attempts, honor the given Retry-After header, but with bounds.
195+ Use lower bound of expontial-backoff based on _retry_delay_min,
196+ and upper bound of _retry_delay_max.
197+ Will stop retry attempts if total elapsed time + next retry delay would exceed
198+ _retry_stop_after_attempts_duration.
199+ """
200+ # basic strategy: build range iterator rep'ing number of available
201+ # retries. bounds can be computed from there. iterate over it with
202+ # retries until success or final failure achieved.
203+
204+ t0 = time .time ()
205+
206+ def get_elapsed ():
207+ return time .time () - t0
208+
209+ def extract_retry_delay (attempt ):
210+ # encapsulate retry checks, returns None || delay-in-secs
211+ # Retry IFF 429/503 code + Retry-After header set
212+ http_code = getattr (self ._transport , "code" , None )
213+ retry_after = getattr (self ._transport , "headers" , {}).get ("Retry-After" )
214+ if http_code in [429 , 503 ] and retry_after :
215+ # bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay]
216+ delay = int (retry_after )
217+ delay = max (delay , self ._retry_delay_min * math .pow (1.5 , attempt - 1 ))
218+ delay = min (delay , self ._retry_delay_max )
219+ return delay
220+ return None
221+
222+ def attempt_request (attempt ):
223+ # splits out lockable attempt, from delay & retry loop
224+ # returns tuple: (method_return, delay_fn(), error, error_message)
225+ # - non-None method_return -> success, return and be done
226+ # - non-None retry_delay -> sleep delay before retry
227+ # - error, error_message always set when available
228+ try :
229+ logger .debug ("Sending request: {}" .format (request ))
230+ response = method (request )
231+ logger .debug ("Received response: {}" .format (response ))
232+ return response
233+ except Exception as error :
234+ retry_delay = extract_retry_delay (attempt )
235+ error_message = ThriftBackend ._extract_error_message_from_headers (
236+ getattr (self ._transport , "headers" , {}))
237+ return RequestErrorInfo (
238+ error = error ,
239+ error_message = error_message ,
240+ retry_delay = retry_delay ,
241+ http_code = getattr (self ._transport , "code" , None ),
242+ method = method .__name__ ,
243+ request = request )
244+
245+ # The real work:
246+ # - for each available attempt:
247+ # lock-and-attempt
248+ # return on success
249+ # if available: bounded delay and retry
250+ # if not: raise error
251+ max_attempts = self ._retry_stop_after_attempts_count
252+
253+ # use index-1 counting for logging/human consistency
254+ for attempt in range (1 , max_attempts + 1 ):
109255 # We have a lock here because .cancel can be called from a separate thread.
110256 # We do not want threads to be simultaneously sharing the Thrift Transport
111257 # because we use its state to determine retries
112- self ._request_lock .acquire ()
113- logger .warning ("Sending request: {}" .format (request ))
114- response = method (request )
115- logger .warning ("Received response: {}" .format (response ))
116- ThriftBackend ._check_response_for_error (response )
117- return response
118- except Exception as error :
119- # _transport.code isn't necessarily set :(
120- code_and_headers_is_set = hasattr (self ._transport , 'code' ) \
121- and hasattr (self ._transport , 'headers' )
122- # We only retry if a Retry-After header is set
123- if code_and_headers_is_set and self ._transport .code in [503 , 429 ] and \
124- "Retry-After" in self ._transport .headers and \
125- attempt_number <= self ._retry_stop_after_attempts_count - 1 :
126- retry_time_seconds = int (self ._transport .headers ["Retry-After" ])
127- if self .ERROR_MSG_HEADER in self ._transport .headers :
128- error_message = self ._transport .headers [self .ERROR_MSG_HEADER ]
129- else :
130- error_message = str (error )
131- logger .warning ("Received retryable error during {}. Request: {} Error: {}" .format (
132- method , request , error_message ))
133- logger .warning ("Retrying in {} seconds. This is attempt number {}" .format (
134- retry_time_seconds , attempt_number ))
135- time .sleep (retry_time_seconds )
136- return self .make_request (method , request , attempt_number + 1 )
137- else :
138- logger .error ("Received error when issuing: {}" .format (request ))
139- if hasattr (self ._transport , "headers" ) and \
140- self .ERROR_MSG_HEADER in self ._transport .headers :
141- error_message = self ._transport .headers [self .ERROR_MSG_HEADER ]
142- raise OperationalError ("Error during Thrift request: {}" .format (error_message ))
143- else :
144- raise OperationalError ("Error during Thrift request" , error )
145- finally :
146- self ._request_lock .release ()
258+ with self ._request_lock :
259+ response_or_error_info = attempt_request (attempt )
260+ elapsed = get_elapsed ()
261+
262+ # conditions: success, non-retry-able, no-attempts-left, no-time-left, delay+retry
263+ if not isinstance (response_or_error_info , RequestErrorInfo ):
264+ # log nothing here, presume that main request logging covers
265+ response = response_or_error_info
266+ ThriftBackend ._check_response_for_error (response )
267+ return response
268+
269+ error_info = response_or_error_info
270+ # The error handler will either sleep or throw an exception
271+ self ._handle_request_error (error_info , attempt , elapsed )
147272
148273 def _check_protocol_version (self , t_open_session_resp ):
149274 protocol_version = t_open_session_resp .serverProtocolVersion
0 commit comments