
Commit 06551a0

more comments
1 parent 63661f2 commit 06551a0

File tree: 5 files changed, +62 -21 lines


src/databricks/sql/client.py

Lines changed: 37 additions & 3 deletions
@@ -1402,17 +1402,42 @@ def _convert_columnar_table(self, table):
             result.append(ResultRow(*curr_row))
 
         return result
+
+    def print_mem(self):
+        import os
+        import psutil
+
+        process = psutil.Process(os.getpid())
+        mem_info = process.memory_info()
+        total_mem_mb = mem_info.rss / 1024 / 1024
+        cpu_percent = process.cpu_percent(interval=0.1)
+        print(f"Total memory usage: {total_mem_mb:.2f} MB")
+        print(f"CPU percent: {cpu_percent:.2f}%")
+        # total_size_bytes = table.get_total_buffer_size()
+        # total_size_mb = total_size_bytes / (1024 * 1024)
+
+        # print(f"Total PyArrow table size: {total_size_bytes} bytes ({total_size_mb:.2f} MB)")
 
     def _convert_arrow_table(self, table: "pyarrow.Table"):
+        import sys
+        from pympler import asizeof
+
+        self.print_mem()
+        print(f"Memory size table: {table.nbytes / (1024 ** 2):.2f} MB")
+        # Convert to MB for easier reading
         column_names = [c[0] for c in self.description]
         ResultRow = Row(*column_names)
 
         if self.connection.disable_pandas is True:
             start_time = time.time()
             columns_as_lists = [col.to_pylist() for col in table.itercolumns()]
+            self.print_mem()
+            print(f"Memory size columns_as_lists: {sum(sys.getsizeof(col) for col in columns_as_lists) / (1024 ** 2):.2f} MB")
             res = [ResultRow(*row) for row in zip(*columns_as_lists)]
+            self.print_mem()
             end_time = time.time()
             print(f"Time taken to convert arrow table to list: {end_time - start_time} seconds")
+            print(f"Memory size res: {sum(sys.getsizeof(row) for row in res) / (1024 ** 2):.2f} MB")
             return res
 
         start_time = time.time()
@@ -1436,14 +1461,23 @@ def _convert_arrow_table(self, table: "pyarrow.Table"):
 
         # Need to rename columns, as the to_pandas function cannot handle duplicate column names
         table_renamed = table.rename_columns([str(c) for c in range(table.num_columns)])
+        print(f"Memory size table_renamed: {table_renamed.nbytes / (1024 ** 2):.2f} MB")
         df = table_renamed.to_pandas(
             types_mapper=dtype_mapping.get,
             date_as_object=True,
             timestamp_as_object=True,
+            self_destruct=True,
         )
+        print(f"Memory size df: {df.memory_usage(deep=True).sum() / (1024 ** 2):.2f} MB")
+        self.print_mem()
+        # del table_renamed
 
         res = df.to_numpy(na_value=None, dtype="object")
+        print(f"Memory size res: {res.nbytes / (1024 ** 2):.2f} MB")
+        self.print_mem()
+        # del df
         tmp_res = [ResultRow(*v) for v in res]
+        self.print_mem()
         end_time = time.time()
         print(f"Time taken to convert arrow table to list: {end_time - start_time} seconds")
         return tmp_res
@@ -1471,7 +1505,7 @@ def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
             and not self.has_been_closed_server_side
             and self.has_more_rows
         ):
-            print(f"TOTAL DATA ROWS {TOTAL_SIZE}")
+            # print(f"TOTAL DATA ROWS {TOTAL_SIZE}")
             self._fill_results_buffer()
             partial_results = self.results.next_n_rows(n_remaining_rows)
             results.append(partial_results)
@@ -1515,11 +1549,11 @@ def fetchall_arrow(self) -> "pyarrow.Table":
         self._next_row_index += results.num_rows
 
         # partial_result_chunks = [results]
-        print("Server side has more rows", self.has_more_rows)
+        # print("Server side has more rows", self.has_more_rows)
         TOTAL_SIZE = results.num_rows
 
         while not self.has_been_closed_server_side and self.has_more_rows:
-            print(f"TOTAL DATA ROWS {TOTAL_SIZE}")
+            # print(f"TOTAL DATA ROWS {TOTAL_SIZE}")
             self._fill_results_buffer()
             partial_results = self.results.remaining_rows()
             results.append(partial_results)
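
Aside: a minimal, self-contained sketch of the psutil probe this commit threads through _convert_arrow_table, shown so the RSS readings in the diff are reproducible. It assumes only that psutil and pyarrow are installed; the label argument and the toy table are illustrative, not connector API.

import os

import psutil
import pyarrow as pa

def print_mem(label: str) -> None:
    # Report the current process's resident set size (RSS) in MB.
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
    print(f"[{label}] RSS: {rss_mb:.2f} MB")

table = pa.table({"x": list(range(1_000_000))})
print_mem("before to_pandas")
# self_destruct=True lets pyarrow release each Arrow column as soon as it has
# been converted, lowering peak memory; the source table must not be used
# after this call.
df = table.to_pandas(self_destruct=True)
print_mem("after to_pandas")

Without self_destruct=True, the Arrow buffers and the pandas copy are both resident at the conversion peak, which is exactly what the print_mem calls around to_pandas in the diff are measuring.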

src/databricks/sql/cloudfetch/download_manager.py

Lines changed: 2 additions & 2 deletions
@@ -84,8 +84,8 @@ def _schedule_downloads(self):
         """
         While download queue has a capacity, peek pending links and submit them to thread pool.
         """
-        print("Schedule_downloads")
-        logger.debug("ResultFileDownloadManager: schedule downloads")
+        # print("Schedule_downloads")
+        # logger.debug("ResultFileDownloadManager: schedule downloads")
         while (len(self._download_tasks) < self._max_download_threads) and (
             len(self._pending_links) > 0
         ):

src/databricks/sql/cloudfetch/downloader.py

Lines changed: 7 additions & 7 deletions
@@ -99,9 +99,9 @@ def run(self) -> DownloadedFile:
             verify=self._ssl_options.tls_verify,
             headers=self.link.httpHeaders
         ) as response:
-            print_text = [
+            # print_text = [
 
-            ]
+            # ]
 
             response.raise_for_status()
 
@@ -127,12 +127,12 @@ def run(self) -> DownloadedFile:
                 )
             )
 
-            print_text.append(
-                f"Downloaded file startRowOffset - {self.link.startRowOffset} - rowCount - {self.link.rowCount}"
-            )
+            # print_text.append(
+            #     f"Downloaded file startRowOffset - {self.link.startRowOffset} - rowCount - {self.link.rowCount}"
+            # )
 
-            for text in print_text:
-                print(text)
+            # for text in print_text:
+            #     print(text)
 
             return DownloadedFile(
                 decompressed_data,

src/databricks/sql/common/http.py

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ def execute(
             start_time = time.time()
             response = self.session.request(method.value, url, **kwargs)
             end_time = time.time()
-            print(f"Downloaded file in {end_time - start_time} seconds")
+            # print(f"Downloaded file in {end_time - start_time} seconds")
             yield response
         except Exception as e:
             logger.error("Error executing HTTP request in DatabricksHttpClient: %s", e)

src/databricks/sql/utils.py

Lines changed: 15 additions & 8 deletions
@@ -219,7 +219,12 @@ def batch_generator():
             yield self.convert_decimals_in_record_batch(batch)
 
         return pyarrow.Table.from_batches(batch_generator())
-
+
+    def remove_extraneous_rows(self):
+        # Trim rows beyond the expected self.num_rows from the tail of the buffered batches.
+        excess = sum(batch.num_rows for batch in self.record_batches) - self.num_rows
+        while excess > 0 and self.record_batches:
+            last = self.record_batches.pop()
+            if last.num_rows > excess:
+                self.record_batches.append(last.slice(0, last.num_rows - excess))
+            excess -= last.num_rows
 
 class ColumnQueue(ResultSetQueue):
     def __init__(self, column_table: ColumnTable):
@@ -319,8 +324,8 @@ def __init__(
                     result_link.startRowOffset, result_link.rowCount
                 )
             )
-        print("Initial Setup Cloudfetch Queue")
-        print(f"No of result links - {len(result_links)}")
+        # print("Initial Setup Cloudfetch Queue")
+        # print(f"No of result links - {len(result_links)}")
         self.download_manager = ResultFileDownloadManager(
             links=result_links or [],
             max_download_threads=self.max_download_threads,
@@ -383,8 +388,8 @@ def remaining_rows(self):
         # results = self.table.slice(0, 0)
         # result = self._create_empty_table()
 
-        print("remaining_rows call")
-        print(f"self.table.num_rows - {self.table.num_rows}")
+        # print("remaining_rows call")
+        # print(f"self.table.num_rows - {self.table.num_rows}")
         while self.table:
             # table_slice = self.table.slice(
             #     self.table_row_index, self.table.num_rows - self.table_row_index
@@ -393,7 +398,7 @@ def remaining_rows(self):
             # self.table_row_index += table_slice.num_rows
             self.table = self._create_next_table()
             # self.table_row_index = 0
-        print(f"result.num_rows - {result.num_rows}")
+        # print(f"result.num_rows - {result.num_rows}")
         return result
 
     def _create_next_table(self) -> ArrowStreamTable:
@@ -419,6 +424,8 @@ def _create_next_table(self) -> ArrowStreamTable:
             list(pyarrow.ipc.open_stream(downloaded_file.file_bytes)),
             downloaded_file.row_count,
             self.description)
+
+        arrow_stream_table.remove_extraneous_rows()
         # arrow_table = create_arrow_table_from_arrow_file(
         #     downloaded_file.file_bytes, self.description
         # )
@@ -439,8 +446,8 @@ def _create_next_table(self) -> ArrowStreamTable:
             )
         )
 
-        print("_create_next_table")
-        print(f"arrow_stream_table.num_rows - {arrow_stream_table.num_rows}")
+        # print("_create_next_table")
+        # print(f"arrow_stream_table.num_rows - {arrow_stream_table.num_rows}")
         return arrow_stream_table
 
     def _create_empty_table(self) -> ArrowStreamTable:
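
For reference, a self-contained sketch of the trimming technique remove_extraneous_rows applies above; trim_batches is a hypothetical free function, not part of the connector.

import pyarrow as pa

def trim_batches(batches, num_rows):
    # Drop or slice trailing record batches until exactly num_rows rows remain.
    excess = sum(b.num_rows for b in batches) - num_rows
    while excess > 0 and batches:
        last = batches.pop()
        if last.num_rows > excess:
            # Keep only the leading rows of the batch that crosses the limit.
            batches.append(last.slice(0, last.num_rows - excess))
        excess -= last.num_rows
    return batches

batch = pa.RecordBatch.from_pydict({"x": [1, 2, 3, 4]})
trimmed = trim_batches([batch, batch], 5)  # 8 rows trimmed down to 5
assert sum(b.num_rows for b in trimmed) == 5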
