Skip to content

Commit 2df0e05

Browse files
Niall Egan authored and susodapop committed
.description attribute
This PR implements the `description` attribute. To quote PEP-249:

```
This read-only attribute is a sequence of 7-item sequences.

Each of these sequences contains information describing one result column:

  name
  type_code
  display_size
  internal_size
  precision
  scale
  null_ok

The first two items (name and type_code) are mandatory, the other five are optional and are set to None if no meaningful values can be provided.

This attribute will be None for operations that do not return rows or if the cursor has not had an operation invoked via the .execute*() method yet.

The type_code can be interpreted by comparing it to the Type Objects specified in the section below.
```

I only implemented the first two attributes for now because the others are optional; we can revisit the others later.

For now, the actual `description` `type_code` will return the name of the Scala types, as this is what the cmdexec server returns. However, once we put the translation layer in place, it will return the appropriate value in the Thrift type enum. This will bring it in line with the PyHive client.

* Runs against the ODBC smoke test
* New integration test to check comparison to type objects

Author: Niall Egan <niall.egan@databricks.com>
1 parent 6a06556 commit 2df0e05

File tree

4 files changed

+79
-28
lines changed

4 files changed

+79
-28
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,21 @@
11
from .client import Connection
22

33

4+
class _DBAPITypeObject(object):
    """
    PEP-249 "Type Object": a group of type names that compares equal to each of its members.

    A ``type_code`` taken from ``Cursor.description`` can be compared directly against one of
    the module-level type objects, e.g. ``type_code == NUMBER``.

    :param values: the type names this object should compare equal to.
    """

    def __init__(self, *values):
        self.values = values

    def __eq__(self, other):
        # The identity check keeps equality reflexive even for a type object created with no
        # values (such as ROWID), where the membership test alone would make
        # ``ROWID == ROWID`` evaluate to False.
        return other is self or other in self.values

    def __repr__(self):
        return "%s(%s)" % (type(self).__name__, ", ".join(repr(value) for value in self.values))

    # One type object equals several different strings, so no hash can be consistent with
    # __eq__. Python already drops __hash__ when __eq__ is overridden; this spells it out.
    __hash__ = None


# Type objects for comparison against the type_code entries of Cursor.description.
STRING = _DBAPITypeObject('string')
BINARY = _DBAPITypeObject('binary')
NUMBER = _DBAPITypeObject('boolean', 'byte', 'short', 'integer', 'long', 'double', 'decimal')
DATETIME = _DBAPITypeObject('timestamp')
DATE = _DBAPITypeObject('date')
# No type name is registered for row ids, so ROWID only compares equal to itself.
ROWID = _DBAPITypeObject()
18+
19+
420
def connect(**kwargs):
    """
    Create a new Connection, forwarding every keyword argument to its constructor unchanged.

    :param kwargs: keyword arguments accepted by ``Connection``.
    :returns: the newly constructed ``Connection``.
    """
    connection = Connection(**kwargs)
    return connection

cmdexec/clients/python/src/dbsql/connector/client.py

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ def close(self):
6464
class Cursor:
6565
def __init__(self, connection, result_buffer_size_bytes=DEFAULT_RESULT_BUFFER_SIZE_BYTES):
6666
self.connection = connection
67-
self.description = None
6867
self.rowcount = -1
6968
self.arraysize = 1
7069
self.buffer_size_bytes = result_buffer_size_bytes
@@ -88,12 +87,15 @@ def __iter__(self):
8887
def _response_to_result_set(self, execute_command_response, status):
8988
command_id = execute_command_response.command_id
9089
arrow_results = execute_command_response.results.arrow_ipc_stream
90+
schema_message = execute_command_response.results.metadata \
91+
and execute_command_response.results.metadata.schema
9192
has_been_closed_server_side = execute_command_response.closed
9293
has_more_rows = execute_command_response.results.has_more_rows
9394
num_valid_rows = execute_command_response.results.num_valid_rows
9495

9596
return ResultSet(self.connection, command_id, status, has_been_closed_server_side,
96-
has_more_rows, arrow_results, num_valid_rows, self.buffer_size_bytes)
97+
has_more_rows, arrow_results, num_valid_rows, schema_message,
98+
self.buffer_size_bytes)
9799

98100
def _close_and_clear_active_result_set(self):
99101
try:
@@ -145,8 +147,10 @@ def execute(self, operation, query_params=None, metadata=None):
145147
command=operation,
146148
conf_overlay=None,
147149
row_limit=None,
148-
result_options=None,
149-
)
150+
result_options=messages_pb2.CommandResultOptions(
151+
max_bytes=self.buffer_size_bytes,
152+
include_metadata=True,
153+
))
150154

151155
execute_command_response = self.connection.base_client.make_request(
152156
self.connection.base_client.stub.ExecuteCommand, execute_command_request)
@@ -186,6 +190,13 @@ def close(self):
186190
if self.active_result_set:
187191
self._close_and_clear_active_result_set()
188192

193+
@property
def description(self):
    """
    The description of the active result set, or None when there is no active result set.
    """
    current_result_set = self.active_result_set
    return current_result_set.description if current_result_set else None
199+
189200

190201
class ResultSet:
191202
def __init__(self,
@@ -196,6 +207,7 @@ def __init__(self,
196207
has_more_rows,
197208
arrow_ipc_stream=None,
198209
num_valid_rows=None,
210+
schema_message=None,
199211
result_buffer_size_bytes=DEFAULT_RESULT_BUFFER_SIZE_BYTES):
200212
self.connection = connection
201213
self.command_id = command_id
@@ -204,14 +216,16 @@ def __init__(self,
204216
self.has_more_rows = has_more_rows
205217
self.buffer_size_bytes = result_buffer_size_bytes
206218
self._row_index = 0
219+
self.description = None
207220

208221
assert (self.status not in [messages_pb2.PENDING, messages_pb2.RUNNING])
209222

210223
if arrow_ipc_stream:
211-
# In the case we are passed in an initial result set, the server has taken the
212-
# fast path and has no more rows to send
224+
# In this case the server has taken the fast path and returned an initial batch of
225+
# results
213226
self.results = ArrowQueue(
214227
pyarrow.ipc.open_stream(arrow_ipc_stream).read_all(), num_valid_rows, 0)
228+
self.description = self._get_schema_description(schema_message)
215229
else:
216230
# In this case, there are results waiting on the server so we fetch now for simplicity
217231
self._fill_results_buffer()
@@ -239,17 +253,19 @@ def _fetch_and_deserialize_results(self):
239253
arrow_table = pyarrow.ipc.open_stream(result_message.arrow_ipc_stream).read_all()
240254
results = ArrowQueue(arrow_table, num_valid_rows,
241255
self._row_index - result_message.start_row_offset)
242-
return results, result_message.has_more_rows
256+
description = self._get_schema_description(result_message.metadata.schema)
257+
return results, result_message.has_more_rows, description
243258

244259
def _fill_results_buffer(self):
245260
if self.status == messages_pb2.CLOSED:
246261
raise Error("Can't fetch results on closed command %s" % self.command_id)
247262
elif self.status == messages_pb2.ERROR:
248263
raise DatabaseError("Command %s failed" % self.command_id)
249264
else:
250-
results, has_more_rows = self._fetch_and_deserialize_results()
265+
results, has_more_rows, description = self._fetch_and_deserialize_results()
251266
self.results = results
252267
self.has_more_rows = has_more_rows
268+
self.description = description
253269

254270
@staticmethod
255271
def _convert_arrow_table(table):
@@ -334,6 +350,21 @@ def close(self):
334350
self.has_been_closed_server_side = True
335351
self.status = messages_pb2.CLOSED
336352

353+
@staticmethod
def _get_schema_description(table_schema_message):
    """
    Takes a TableSchema message and returns the PEP-249 description for it: a list holding one
    7-tuple per column, in column order.

    Only the first two items of each tuple (name and type_code) are filled in; the remaining
    five optional items are always None.

    :param table_schema_message: object exposing ``columns``, each with ``name`` and ``type``
        attributes, or None when no schema was supplied.
    :returns: list of ``(name, type_code, None, None, None, None, None)`` tuples, or None if
        ``table_schema_message`` is None.
    """
    # ResultSet defaults schema_message to None; report "no description" in that case
    # instead of failing with an AttributeError on ``None.columns``.
    if table_schema_message is None:
        return None

    def map_col_type(type_):
        # Any type name starting with 'decimal' is reported as plain 'decimal'.
        return 'decimal' if type_.startswith('decimal') else type_

    return [(column.name, map_col_type(column.type), None, None, None, None, None)
            for column in table_schema_message.columns]
367+
337368

338369
class ArrowQueue:
339370
def __init__(self, arrow_table, n_valid_rows, start_row_index):

cmdexec/clients/python/tests/test_fetches.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import unittest
2+
from unittest.mock import MagicMock
23
from collections import deque
34

45
import pyarrow as pa
@@ -40,13 +41,14 @@ def make_dummy_result_set_from_initial_results(initial_results):
4041
# If the initial results have been set, then we should never try and fetch more
4142
arrow_ipc_stream = FetchTests.make_arrow_ipc_stream(initial_results)
4243
return client.ResultSet(
43-
None,
44-
None,
45-
None,
46-
True,
44+
connection=None,
45+
command_id=None,
46+
status=None,
47+
has_been_closed_server_side=True,
48+
has_more_rows=False,
4749
arrow_ipc_stream=arrow_ipc_stream,
4850
num_valid_rows=len(initial_results),
49-
has_more_rows=False)
51+
schema_message=MagicMock())
5052

5153
@staticmethod
5254
def make_dummy_result_set_from_batch_list(batch_list):
@@ -58,7 +60,8 @@ def _fetch_and_deserialize_results(self):
5860
results = FetchTests.make_arrow_queue(batch_list[batch_index])
5961
batch_index += 1
6062

61-
return results, batch_index < len(batch_list)
63+
return results, batch_index < len(batch_list), \
64+
[('id', 'integer', None, None, None, None, None)]
6265

6366
return SemiFakeResultSet(None, None, None, False, False)
6467

cmdexec/clients/python/tests/tests.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def test_missing_params_throws_interface_exception(self):
3939
@patch("%s.client.CmdExecBaseHttpClient" % PACKAGE_NAME)
4040
def test_close_uses_the_correct_session_id(self, mock_client_class):
4141
instance = mock_client_class.return_value
42-
mock_response = Mock()
42+
mock_response = MagicMock()
4343
mock_response.id = b'\x22'
4444
instance.make_request.return_value = mock_response
4545
good_connection_args = {"HOST": 1, "PORT": 1}
@@ -58,7 +58,7 @@ def test_closing_connection_closes_commands(self, mock_result_set_class, mock_cl
5858
for closed in (True, False):
5959
with self.subTest(closed=closed):
6060
instance = mock_client_class.return_value
61-
mock_response = Mock()
61+
mock_response = MagicMock()
6262
mock_response.id = b'\x22'
6363
instance.make_request.return_value = mock_response
6464
instance.stub.CloseCommand = Mock()
@@ -79,7 +79,7 @@ def test_closing_connection_closes_commands(self, mock_result_set_class, mock_cl
7979
@patch("%s.client.CmdExecBaseHttpClient" % PACKAGE_NAME)
8080
def test_cant_open_cursor_on_closed_connection(self, mock_client_class):
8181
instance = mock_client_class.return_value
82-
mock_response = Mock()
82+
mock_response = MagicMock()
8383
mock_response.id = b'\x22'
8484
instance.make_request.return_value = mock_response
8585
good_connection_args = {"HOST": 1, "PORT": 1}
@@ -95,17 +95,18 @@ def test_cant_open_cursor_on_closed_connection(self, mock_client_class):
9595
def test_closing_result_set_with_closed_connection_soft_closes_commands(
9696
self, pyarrow_ipc_open_stream):
9797
mock_connection = Mock()
98-
mock_response = Mock()
98+
mock_response = MagicMock()
9999
mock_response.id = b'\x22'
100100
mock_connection.base_client.make_request.return_value = mock_response
101101
result_set = client.ResultSet(
102-
mock_connection,
103-
b'\x10',
104-
command_pb2.SUCCESS,
105-
False,
102+
connection=mock_connection,
103+
command_id=b'\x10',
104+
status=command_pb2.SUCCESS,
105+
has_been_closed_server_side=False,
106106
arrow_ipc_stream=Mock(),
107107
num_valid_rows=0,
108-
has_more_rows=False)
108+
has_more_rows=False,
109+
schema_message=MagicMock())
109110
mock_connection.open = False
110111

111112
result_set.close()
@@ -118,7 +119,7 @@ def test_closing_result_set_with_closed_connection_soft_closes_commands(
118119
@patch("pyarrow.ipc.open_stream")
119120
def test_closing_result_set_hard_closes_commands(self, pyarrow_ipc_open_stream):
120121
mock_connection = Mock()
121-
mock_response = Mock()
122+
mock_response = MagicMock()
122123
mock_response.id = b'\x22'
123124
mock_response.results.start_row_offset = 0
124125
mock_connection.base_client.make_request.return_value = mock_response
@@ -135,7 +136,7 @@ def test_closing_result_set_hard_closes_commands(self, pyarrow_ipc_open_stream):
135136
@patch("%s.client.ResultSet" % PACKAGE_NAME)
136137
def test_executing_multiple_commands_uses_the_most_recent_command(self, mock_result_set_class):
137138
mock_client = Mock()
138-
mock_response = Mock()
139+
mock_response = MagicMock()
139140
mock_connection = Mock()
140141
mock_response.id = b'\x22'
141142
mock_response.status.state = command_pb2.SUCCESS
@@ -159,7 +160,7 @@ def test_executing_multiple_commands_uses_the_most_recent_command(self, mock_res
159160

160161
def test_closed_cursor_doesnt_allow_operations(self):
161162
mock_connection = Mock()
162-
mock_response = Mock()
163+
mock_response = MagicMock()
163164
mock_response.id = b'\x22'
164165
mock_response.status.state = command_pb2.SUCCESS
165166
mock_connection.base_client.make_request.return_value = mock_response
@@ -178,7 +179,7 @@ def test_closed_cursor_doesnt_allow_operations(self):
178179
@patch("pyarrow.ipc.open_stream")
179180
def test_negative_fetch_throws_exception(self, pyarrow_ipc_open_stream_mock):
180181
mock_connection = Mock()
181-
mock_response = Mock()
182+
mock_response = MagicMock()
182183
mock_response.id = b'\x22'
183184
mock_response.results.start_row_offset = 0
184185
mock_response.status.state = command_pb2.SUCCESS
@@ -199,7 +200,7 @@ def test_context_manager_closes_cursor(self):
199200
@patch("%s.client.CmdExecBaseHttpClient" % PACKAGE_NAME)
200201
def test_context_manager_closes_connection(self, mock_client_class):
201202
instance = mock_client_class.return_value
202-
mock_response = Mock()
203+
mock_response = MagicMock()
203204
mock_response.id = b'\x22'
204205
instance.make_request.return_value = mock_response
205206
good_connection_args = {"HOST": 1, "PORT": 1}

0 commit comments

Comments
 (0)