|
2 | 2 | Tests for SEA-related queue classes. |
3 | 3 |
|
4 | 4 | This module contains tests for the JsonQueue, SeaResultSetQueueFactory, and SeaCloudFetchQueue classes. |
| 5 | +It also tests the Hybrid disposition, which creates either an ArrowQueue or a SeaCloudFetchQueue |
| 6 | +depending on whether an attachment is present. |
5 | 7 | """ |
6 | 8 |
|
7 | 9 | import pytest |
|
20 | 22 | from databricks.sql.backend.sea.utils.constants import ResultFormat |
21 | 23 | from databricks.sql.exc import ProgrammingError, ServerOperationError |
22 | 24 | from databricks.sql.types import SSLOptions |
| 25 | +from databricks.sql.utils import ArrowQueue |
23 | 26 |
|
24 | 27 |
|
25 | 28 | class TestJsonQueue: |
@@ -418,3 +421,161 @@ def test_create_next_table_success(self, mock_logger): |
418 | 421 |
|
419 | 422 | # Verify the result is the table |
420 | 423 | assert result == mock_table |
| 424 | + |
| 425 | + |
class TestHybridDisposition:
    """Checks which queue type SeaResultSetQueueFactory builds for Hybrid results.

    The tests cover three inputs: an inline attachment (ArrowQueue expected),
    external links with no attachment (SeaCloudFetchQueue expected), and an
    inline attachment built with ``lz4_compressed=True`` (ArrowQueue expected,
    fed by the patched decompression step).
    """

    @pytest.fixture
    def arrow_manifest(self):
        """Provide an Arrow-stream manifest describing one chunk of five rows."""
        manifest_fields = {
            "format": ResultFormat.ARROW_STREAM.value,
            "schema": {},
            "total_row_count": 5,
            "total_byte_count": 1000,
            "total_chunk_count": 1,
        }
        return ResultManifest(**manifest_fields)

    @pytest.fixture
    def description(self):
        """Provide three 7-field column descriptions (name, type, then Nones)."""
        name_and_type = (
            ("col1", "string"),
            ("col2", "int"),
            ("col3", "boolean"),
        )
        return [
            (column_name, type_name, None, None, None, None, None)
            for column_name, type_name in name_and_type
        ]

    @pytest.fixture
    def ssl_options(self):
        """Provide SSL options with TLS verification switched on."""
        options = SSLOptions(tls_verify=True)
        return options

    @pytest.fixture
    def mock_sea_client(self):
        """Provide a mocked SEA client that reports ten download threads."""
        return Mock(max_download_threads=10)

    @staticmethod
    def _build_queue(
        result_data, manifest, ssl_options, description, sea_client, lz4_compressed
    ):
        """Invoke the factory with the arguments shared by every test here.

        Only the result data, the fixtures and the compression flag vary
        between tests; the statement id and thread count are fixed.
        """
        return SeaResultSetQueueFactory.build_queue(
            result_data=result_data,
            manifest=manifest,
            statement_id="test-statement",
            ssl_options=ssl_options,
            description=description,
            max_download_threads=10,
            sea_client=sea_client,
            lz4_compressed=lz4_compressed,
        )

    @patch("databricks.sql.backend.sea.queue.create_arrow_table_from_arrow_file")
    def test_hybrid_disposition_with_attachment(
        self,
        mock_create_table,
        arrow_manifest,
        description,
        ssl_options,
        mock_sea_client,
    ):
        """An inline attachment must yield an ArrowQueue."""
        # The patched table builder hands back a stand-in table with five rows.
        mock_create_table.return_value = Mock(num_rows=5)

        # Result payload carrying only the inline attachment bytes.
        raw_attachment = b"mock_arrow_data"
        inline_result = ResultData(attachment=raw_attachment)

        built_queue = self._build_queue(
            inline_result,
            arrow_manifest,
            ssl_options,
            description,
            mock_sea_client,
            lz4_compressed=False,
        )

        # The attachment bytes go to the table builder unchanged.
        assert isinstance(built_queue, ArrowQueue)
        mock_create_table.assert_called_once_with(raw_attachment, description)

    @patch("databricks.sql.backend.sea.queue.ResultFileDownloadManager")
    @patch.object(SeaCloudFetchQueue, "_create_table_from_link", return_value=None)
    def test_hybrid_disposition_with_external_links(
        self,
        mock_create_table,
        mock_download_manager,
        arrow_manifest,
        description,
        ssl_options,
        mock_sea_client,
    ):
        """External links without an attachment must yield a SeaCloudFetchQueue."""
        # A single link describing chunk 0.
        chunk_link = ExternalLink(
            external_link="https://example.com/data/chunk0",
            expiration="2025-07-03T05:51:18.118009",
            row_count=100,
            byte_count=1024,
            row_offset=0,
            chunk_index=0,
            next_chunk_index=1,
            http_headers={"Authorization": "Bearer token123"},
        )

        # Attachment is explicitly None, so only the links are available.
        linked_result = ResultData(external_links=[chunk_link], attachment=None)

        built_queue = self._build_queue(
            linked_result,
            arrow_manifest,
            ssl_options,
            description,
            mock_sea_client,
            lz4_compressed=False,
        )

        # The patched link-to-table step is expected to run exactly once.
        assert isinstance(built_queue, SeaCloudFetchQueue)
        mock_create_table.assert_called_once()

    @patch("databricks.sql.backend.sea.queue.ResultSetDownloadHandler._decompress_data")
    @patch("databricks.sql.backend.sea.queue.create_arrow_table_from_arrow_file")
    def test_hybrid_disposition_with_compressed_attachment(
        self,
        mock_create_table,
        mock_decompress,
        arrow_manifest,
        description,
        ssl_options,
        mock_sea_client,
    ):
        """A compressed attachment must be decompressed before the ArrowQueue is built."""
        # The patched table builder hands back a stand-in table with five rows.
        mock_create_table.return_value = Mock(num_rows=5)

        # The patched decompressor maps the attachment bytes to known output.
        compressed_data = b"compressed_data"
        decompressed_data = b"decompressed_data"
        mock_decompress.return_value = decompressed_data

        compressed_result = ResultData(attachment=compressed_data)

        built_queue = self._build_queue(
            compressed_result,
            arrow_manifest,
            ssl_options,
            description,
            mock_sea_client,
            lz4_compressed=True,
        )

        # Decompression sees the raw attachment; the table builder sees its output.
        assert isinstance(built_queue, ArrowQueue)
        mock_decompress.assert_called_once_with(compressed_data)
        mock_create_table.assert_called_once_with(decompressed_data, description)
0 commit comments