Commit c910244

init cloud fetch and a bunch of experiments
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 8da6f5a · commit c910244

15 files changed · +2191 -41 lines changed
Lines changed: 179 additions & 0 deletions
@@ -0,0 +1,179 @@
#!/usr/bin/env python3
"""
Test script specifically for testing SEA cloud fetch with multiple chunks.
This script focuses on verifying that the Python connector can properly handle
multiple chunks from the SEA API.
"""

import os
import sys
import logging
import time
import pyarrow
from databricks.sql.client import Connection

# Set up detailed logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def test_sea_cloud_fetch_multiple_chunks():
    """
    Test the SEA cloud fetch implementation with multiple chunks.

    This function specifically tests:
    1. Very large dataset to force multiple chunks in the manifest
    2. Detailed logging of chunk information
    3. Progressive fetching to observe chunk transitions
    """
    # Use the values from export-tests-e2e.sh
    server_hostname = "adb-6436897454825492.12.azuredatabricks.net"
    http_path = "/sql/1.0/warehouses/2f03dd43e35e2aa0"
    access_token = os.environ.get("DATABRICKS_TOKEN")
    catalog = "peco"
    schema = "default"  # Using default schema which should be available

    if not access_token:
        logger.error("Missing required environment variable DATABRICKS_TOKEN.")
        sys.exit(1)

    try:
        # Create connection with SEA backend
        logger.info("Creating connection with SEA backend...")
        connection = Connection(
            server_hostname=server_hostname,
            http_path=http_path,
            access_token=access_token,
            catalog=catalog,
            schema=schema,
            use_sea=True,
            use_cloud_fetch=True,  # Enable cloud fetch to trigger EXTERNAL_LINKS + ARROW
            user_agent_entry="SEA-Test-Client",
        )

        logger.info(f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}")

        # Create cursor with a moderate arraysize
        # We want chunks to be based on data size, not row count
        cursor = connection.cursor(arraysize=1000)

        # Execute a query that generates a VERY large dataset with large string values
        # This should force the server to split the result into multiple chunks
        query = """
        WITH large_dataset AS (
            SELECT
                id,
                id * 2 as double_id,
                id * 3 as triple_id,
                concat('value_', repeat(cast(id as string), 100)) as large_string_value,
                array_repeat(id, 50) as large_array_value,
                rand() as random_val,
                current_timestamp() as current_time
            FROM range(1, 100000) AS t(id)
        )
        SELECT * FROM large_dataset
        """
        logger.info(f"Executing query with large complex data: {query}")
        cursor.execute(query)

        # Access the underlying result set to log chunk information
        result_set = cursor.active_result_set

        # Directly examine the raw API response to see the full manifest
        if hasattr(result_set, 'results') and hasattr(result_set.results, '_sea_client') and hasattr(result_set.results, '_statement_id'):
            logger.info("Examining raw API response to see full manifest...")
            sea_client = result_set.results._sea_client
            statement_id = result_set.results._statement_id

            # Make a direct API call to get the statement status
            try:
                response = sea_client.http_client._make_request(
                    method="GET",
                    path=f"/api/2.0/sql/statements/{statement_id}",
                    data={}
                )

                # Log the manifest information
                if "manifest" in response:
                    manifest = response["manifest"]
                    logger.info(f"Manifest from API: {manifest}")

                    # Log chunk information
                    if "chunks" in manifest:
                        chunks = manifest["chunks"]
                        logger.info(f"Number of chunks in manifest: {len(chunks)}")
                        for i, chunk in enumerate(chunks):
                            logger.info(f"Chunk {i}: index={chunk.get('chunk_index')}, row_count={chunk.get('row_count')}, row_offset={chunk.get('row_offset')}, byte_count={chunk.get('byte_count')}")

                    # Log total_row_count and total_chunk_count
                    if "total_row_count" in manifest:
                        total_row_count = manifest["total_row_count"]
                        logger.info(f"Total row count: {total_row_count}")

                    if "total_chunk_count" in manifest:
                        total_chunk_count = manifest["total_chunk_count"]
                        logger.info(f"Total chunk count: {total_chunk_count}")

                # Log the external links information
                if "result" in response and "external_links" in response["result"]:
                    external_links = response["result"]["external_links"]
                    logger.info(f"Number of external links in response: {len(external_links)}")
                    for i, link in enumerate(external_links):
                        logger.info(f"Link {i}: chunk_index={link.get('chunk_index')}, row_count={link.get('row_count')}, next_chunk_index={link.get('next_chunk_index')}")
            except Exception as e:
                logger.warning(f"Error examining raw API response: {str(e)}")

        # Log initial chunk information
        if hasattr(result_set, 'results'):
            logger.info(f"Result set type: {type(result_set.results)}")

            if hasattr(result_set.results, '_total_chunk_count'):
                logger.info(f"Total chunks: {result_set.results._total_chunk_count}")

            if hasattr(result_set.results, 'initial_links'):
                logger.info(f"Initial links count: {len(result_set.results.initial_links)}")
                for i, link in enumerate(result_set.results.initial_links):
                    logger.info(f"Link {i}: chunk_index={link.chunk_index}, row_count={link.row_count}, next_chunk_index={link.next_chunk_index}")

            if hasattr(result_set.results, '_fetched_chunk_indices'):
                logger.info(f"Initial fetched chunk indices: {result_set.results._fetched_chunk_indices}")

        # Fetch data in progressively larger batches to force multiple chunk fetches
        # We'll fetch a large number of rows to ensure we need to fetch multiple chunks
        batch_sizes = [1000, 5000, 10000, 20000]
        total_rows_fetched = 0

        for i, batch_size in enumerate(batch_sizes):
            logger.info(f"Fetching batch {i+1} ({batch_size} rows)...")
            rows = cursor.fetchmany(batch_size)
            total_rows_fetched += len(rows)
            logger.info(f"Fetched {len(rows)} rows (total: {total_rows_fetched})")

            # Log chunk information after each fetch
            if hasattr(result_set, 'results') and hasattr(result_set.results, '_fetched_chunk_indices'):
                logger.info(f"Fetched chunk indices after batch {i+1}: {result_set.results._fetched_chunk_indices}")

                # If we have links, log information about them
                if hasattr(result_set.results, '_chunk_index_to_link'):
                    for chunk_index in sorted(result_set.results._fetched_chunk_indices):
                        if chunk_index in result_set.results._chunk_index_to_link:
                            link = result_set.results._chunk_index_to_link[chunk_index]
                            logger.info(f"  Chunk {chunk_index}: row_count={link.row_count}, next_chunk_index={link.next_chunk_index}")

        # Close cursor and connection
        cursor.close()
        connection.close()
        logger.info("Successfully closed SEA session")

    except Exception as e:
        logger.error(f"Error during SEA cloud fetch test: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        sys.exit(1)

    logger.info("SEA cloud fetch test with multiple chunks completed successfully")

if __name__ == "__main__":
    test_sea_cloud_fetch_multiple_chunks()
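
For reference, the statement-response shape the script expects can be inferred from the fields it reads (manifest.chunks with chunk_index, row_count, row_offset and byte_count; manifest.total_row_count and total_chunk_count; result.external_links with chunk_index, row_count and next_chunk_index). The sketch below only illustrates that assumed shape with made-up numbers; it is not an authoritative SEA response schema.

# Hypothetical shape of the GET /api/2.0/sql/statements/{statement_id} response,
# inferred from the keys the test script accesses above. Field names, nesting and
# all values here are illustrative assumptions, not a documented schema.
example_response = {
    "manifest": {
        "total_row_count": 99999,
        "total_chunk_count": 3,
        "chunks": [
            {"chunk_index": 0, "row_count": 40000, "row_offset": 0, "byte_count": 52428800},
            {"chunk_index": 1, "row_count": 40000, "row_offset": 40000, "byte_count": 52428800},
            {"chunk_index": 2, "row_count": 19999, "row_offset": 80000, "byte_count": 26214400},
        ],
    },
    "result": {
        "external_links": [
            # Each entry is assumed to carry a presigned URL for one Arrow chunk.
            {"chunk_index": 0, "row_count": 40000, "next_chunk_index": 1,
             "external_link": "https://..."},
        ],
    },
}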
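
For comparison, fetching a large result with cloud fetch through the connector's public entry point might look like the minimal sketch below. It assumes the released databricks.sql.connect() API and its use_cloud_fetch flag (availability and default depend on the connector version); the use_sea=True switch exercised by the script above is specific to this branch and is not used here.

import os

from databricks import sql

# Minimal sketch: stream a large result in batches with cloud fetch enabled,
# letting the connector download Arrow chunks from presigned links as needed.
with sql.connect(
    server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"],
    http_path=os.environ["DATABRICKS_HTTP_PATH"],
    access_token=os.environ["DATABRICKS_TOKEN"],
    use_cloud_fetch=True,  # assumed available in the installed connector version
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM range(1, 100000)")
        while True:
            rows = cursor.fetchmany(10000)
            if not rows:
                break
            print(f"fetched {len(rows)} rows")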
