diff --git a/.github/workflows/github-action-build.yml b/.github/workflows/github-action-build.yml index 4c07bd728ca..3bdd0a40295 100644 --- a/.github/workflows/github-action-build.yml +++ b/.github/workflows/github-action-build.yml @@ -111,6 +111,13 @@ jobs: if [ -f core/amber/requirements.txt ]; then pip install -r core/amber/requirements.txt; fi if [ -f core/amber/r-requirements.txt ]; then pip install -r core/amber/r-requirements.txt; fi if [ -f core/amber/operator-requirements.txt ]; then pip install -r core/amber/operator-requirements.txt; fi + - name: Install PostgreSQL + run: sudo apt-get update && sudo apt-get install -y postgresql + - name: Start PostgreSQL Service + run: sudo systemctl start postgresql + - name: Create Database and User + run: | + cd core/scripts/sql && sudo -u postgres psql -f iceberg_postgres_catalog.sql - name: Lint with flake8 and black run: | cd core/amber/src/main/python && flake8 && black . --check diff --git a/core/amber/requirements.txt b/core/amber/requirements.txt index cd79a6157d4..bc79a49365c 100644 --- a/core/amber/requirements.txt +++ b/core/amber/requirements.txt @@ -26,4 +26,9 @@ bidict==0.22.0 cached_property==1.5.2 psutil==5.9.0 transformers==4.44.2 -tzlocal==2.1 \ No newline at end of file +tzlocal==2.1 +pyiceberg==0.8.1 +readerwriterlock==1.0.9 +tenacity==8.5.0 +SQLAlchemy==2.0.37 +psycopg2==2.9.10 \ No newline at end of file diff --git a/core/amber/src/main/python/core/models/schema/attribute_type.py b/core/amber/src/main/python/core/models/schema/attribute_type.py index f12a3b92f5b..d2aac8fb55c 100644 --- a/core/amber/src/main/python/core/models/schema/attribute_type.py +++ b/core/amber/src/main/python/core/models/schema/attribute_type.py @@ -42,16 +42,18 @@ class AttributeType(Enum): AttributeType.DOUBLE: pa.float64(), AttributeType.BOOL: pa.bool_(), AttributeType.BINARY: pa.binary(), - AttributeType.TIMESTAMP: pa.timestamp("ms", tz="UTC"), + AttributeType.TIMESTAMP: pa.timestamp("us"), } FROM_ARROW_MAPPING = { lib.Type_INT32: AttributeType.INT, lib.Type_INT64: AttributeType.LONG, lib.Type_STRING: AttributeType.STRING, + lib.Type_LARGE_STRING: AttributeType.STRING, lib.Type_DOUBLE: AttributeType.DOUBLE, lib.Type_BOOL: AttributeType.BOOL, lib.Type_BINARY: AttributeType.BINARY, + lib.Type_LARGE_BINARY: AttributeType.BINARY, lib.Type_TIMESTAMP: AttributeType.TIMESTAMP, } diff --git a/core/amber/src/main/python/core/models/schema/test_schema.py b/core/amber/src/main/python/core/models/schema/test_schema.py index 1b0798192af..ca9f14ae9a3 100644 --- a/core/amber/src/main/python/core/models/schema/test_schema.py +++ b/core/amber/src/main/python/core/models/schema/test_schema.py @@ -27,7 +27,7 @@ def arrow_schema(self): pa.field("field-3", pa.int64()), pa.field("field-4", pa.float64()), pa.field("field-5", pa.bool_()), - pa.field("field-6", pa.timestamp("ms", tz="UTC")), + pa.field("field-6", pa.timestamp("us")), pa.field("field-7", pa.binary()), ] ) diff --git a/core/amber/src/main/python/core/storage/__init__.py b/core/amber/src/main/python/core/storage/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/core/amber/src/main/python/core/storage/document_factory.py b/core/amber/src/main/python/core/storage/document_factory.py new file mode 100644 index 00000000000..ed522df02b5 --- /dev/null +++ b/core/amber/src/main/python/core/storage/document_factory.py @@ -0,0 +1,103 @@ +from urllib.parse import urlparse + +from typing import Optional + +from core.models import Schema, Tuple +from 
core.storage.iceberg.iceberg_catalog_instance import IcebergCatalogInstance +from core.storage.iceberg.iceberg_document import IcebergDocument +from core.storage.iceberg.iceberg_utils import ( + create_table, + amber_tuples_to_arrow_table, + arrow_table_to_amber_tuples, + load_table_metadata, +) +from core.storage.model.virtual_document import VirtualDocument +from core.storage.storage_config import StorageConfig +from core.storage.vfs_uri_factory import VFSURIFactory, VFSResourceType + + +class DocumentFactory: + """ + Factory class to create and open documents. + Currently only iceberg documents are supported. + """ + + ICEBERG = "iceberg" + + @staticmethod + def sanitize_uri_path(uri): + return uri.path.lstrip("/").replace("/", "_") + + @staticmethod + def create_document(uri: str, schema: Schema) -> VirtualDocument: + parsed_uri = urlparse(uri) + if parsed_uri.scheme == VFSURIFactory.VFS_FILE_URI_SCHEME: + _, _, _, _, resource_type = VFSURIFactory.decode_uri(uri) + + if resource_type in { + VFSResourceType.RESULT, + VFSResourceType.MATERIALIZED_RESULT, + }: + storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) + + iceberg_schema = Schema.as_arrow_schema(schema) + + create_table( + IcebergCatalogInstance.get_instance(), + StorageConfig.ICEBERG_TABLE_NAMESPACE, + storage_key, + iceberg_schema, + override_if_exists=True, + ) + + return IcebergDocument[Tuple]( + StorageConfig.ICEBERG_TABLE_NAMESPACE, + storage_key, + iceberg_schema, + amber_tuples_to_arrow_table, + arrow_table_to_amber_tuples, + ) + else: + raise ValueError(f"Resource type {resource_type} is not supported") + else: + raise NotImplementedError( + f"Unsupported URI scheme: {parsed_uri.scheme} for creating the document" + ) + + @staticmethod + def open_document(uri: str) -> (VirtualDocument, Optional[Schema]): + parsed_uri = urlparse(uri) + if parsed_uri.scheme == "vfs": + _, _, _, _, resource_type = VFSURIFactory.decode_uri(uri) + + if resource_type in { + VFSResourceType.RESULT, + VFSResourceType.MATERIALIZED_RESULT, + }: + storage_key = DocumentFactory.sanitize_uri_path(parsed_uri) + + table = load_table_metadata( + IcebergCatalogInstance.get_instance(), + StorageConfig.ICEBERG_TABLE_NAMESPACE, + storage_key, + ) + + if table is None: + raise ValueError("No storage is found for the given URI") + + amber_schema = Schema(table.schema().as_arrow()) + + document = IcebergDocument( + StorageConfig.ICEBERG_TABLE_NAMESPACE, + storage_key, + table.schema(), + amber_tuples_to_arrow_table, + arrow_table_to_amber_tuples, + ) + return document, amber_schema + else: + raise ValueError(f"Resource type {resource_type} is not supported") + else: + raise NotImplementedError( + f"Unsupported URI scheme: {parsed_uri.scheme} for opening the document" + ) diff --git a/core/amber/src/main/python/core/storage/iceberg/__init__.py b/core/amber/src/main/python/core/storage/iceberg/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/core/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py b/core/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py new file mode 100644 index 00000000000..a00358a71ea --- /dev/null +++ b/core/amber/src/main/python/core/storage/iceberg/iceberg_catalog_instance.py @@ -0,0 +1,43 @@ +from pyiceberg.catalog import Catalog +from typing import Optional + +from core.storage.iceberg.iceberg_utils import create_postgres_catalog +from core.storage.storage_config import StorageConfig + + +class IcebergCatalogInstance: + """ + IcebergCatalogInstance is a 
singleton that manages the Iceberg catalog instance. + Currently only postgres SQL catalog is supported. + - Provides a single shared catalog for all Iceberg table-related operations. + - Lazily initializes the catalog on first access. + - Supports replacing the catalog instance for testing or reconfiguration. + """ + + _instance: Optional[Catalog] = None + + @classmethod + def get_instance(cls): + """ + Retrieves the singleton Iceberg catalog instance. + - If the catalog is not initialized, it is lazily created using the configured + properties. + :return: the Iceberg catalog instance. + """ + if cls._instance is None: + cls._instance = create_postgres_catalog( + "texera_iceberg", + StorageConfig.ICEBERG_FILE_STORAGE_DIRECTORY_PATH, + StorageConfig.ICEBERG_POSTGRES_CATALOG_USERNAME, + StorageConfig.ICEBERG_POSTGRES_CATALOG_PASSWORD, + ) + return cls._instance + + @classmethod + def replace_instance(cls, catalog: Catalog): + """ + Replaces the existing Iceberg catalog instance. + - This method is useful for testing or dynamically updating the catalog. + :param catalog: the new Iceberg catalog instance to replace the current one. + """ + cls._instance = catalog diff --git a/core/amber/src/main/python/core/storage/iceberg/iceberg_document.py b/core/amber/src/main/python/core/storage/iceberg/iceberg_document.py new file mode 100644 index 00000000000..9d15336dff5 --- /dev/null +++ b/core/amber/src/main/python/core/storage/iceberg/iceberg_document.py @@ -0,0 +1,260 @@ +from itertools import islice +from threading import RLock +from typing import Iterator, Optional, Callable, Iterable +from typing import TypeVar +from urllib.parse import ParseResult, urlparse + +import pyarrow as pa +from pyiceberg.catalog import Catalog +from pyiceberg.schema import Schema +from pyiceberg.table import Table, FileScanTask +from readerwriterlock import rwlock + +from core.storage.iceberg.iceberg_catalog_instance import IcebergCatalogInstance +from core.storage.iceberg.iceberg_table_writer import IcebergTableWriter +from core.storage.iceberg.iceberg_utils import ( + load_table_metadata, + read_data_file_as_arrow_table, +) +from core.storage.model.virtual_document import VirtualDocument + +# Define a type variable +T = TypeVar("T") + + +class IcebergDocument(VirtualDocument[T]): + """ + IcebergDocument is used to read and write a set of T as an Iceberg table. + It provides iterator-based read methods and supports multiple writers to write to + the same table. + + - On construction, the table will be created if it does not exist. + - If the table exists, it will be overridden. + + :param table_namespace: Namespace of the table. + :param table_name: Name of the table. + :param table_schema: Schema of the table. + :param serde: A function to convert a T iterable into a pyarrow Table. Note the + conversion is not based on a single T item (unlike Texera's Java IcebergDocument.) + :param deserde: A function to convert a pyarrow Table back into a T iterable. 
+ """ + + def __init__( + self, + table_namespace: str, + table_name: str, + table_schema: Schema, + serde: Callable[[Schema, Iterable[T]], pa.Table], + deserde: Callable[[Schema, pa.Table], Iterable[T]], + ): + self.table_namespace = table_namespace + self.table_name = table_name + self.table_schema = table_schema + self.serde = serde + self.deserde = deserde + + self.lock = rwlock.RWLockFair() + self.catalog = IcebergCatalogInstance.get_instance() + + def get_uri(self) -> ParseResult: + """Returns the URI of the table location.""" + table = load_table_metadata(self.catalog, self.table_namespace, self.table_name) + if not table: + raise Exception( + f"table {self.table_namespace}.{self.table_name} doesn't exist." + ) + return urlparse(table.location()) + + def clear(self): + """Deletes the table and clears its contents.""" + with self.lock.gen_wlock(): + table_identifier = f"{self.table_namespace}.{self.table_name}" + if self.catalog.table_exists(table_identifier): + self.catalog.drop_table(table_identifier) + + def get(self) -> Iterator[T]: + """Get an iterator for reading all records from the table.""" + return self._get_using_file_sequence_order(0, None) + + def get_range(self, from_index: int, until_index: int) -> Iterator[T]: + """Get records within a specified range [from, until).""" + return self._get_using_file_sequence_order(from_index, until_index) + + def get_after(self, offset: int) -> Iterator[T]: + """Get records starting after a specified offset.""" + return self._get_using_file_sequence_order(offset, None) + + def get_count(self) -> int: + """Get the total count of records in the table.""" + table = load_table_metadata(self.catalog, self.table_namespace, self.table_name) + if not table: + return 0 + return sum(f.file.record_count for f in table.scan().plan_files()) + + def writer(self, writer_identifier: str): + """ + Creates a BufferedItemWriter for writing data to the table. + :param writer_identifier: The writer's ID. It should be unique within the same + table, as each writer will use it as the prefix of the files they append + :return: An IcebergTableWriter + """ + return IcebergTableWriter[T]( + writer_identifier=writer_identifier, + catalog=self.catalog, + table_namespace=self.table_namespace, + table_name=self.table_name, + table_schema=self.table_schema, + serde=self.serde, + ) + + def _get_using_file_sequence_order( + self, from_index: int, until_index: Optional[int] + ) -> Iterator[T]: + """Utility to get records within a specified range.""" + with self.lock.gen_rlock(): + return IcebergIterator[T]( + from_index, + until_index, + self.catalog, + self.table_namespace, + self.table_name, + self.table_schema, + self.deserde, + ) + + +class IcebergIterator(Iterator[T]): + """ + A custom iterator class to read items from an iceberg table based on an index range. 
+ """ + + def __init__( + self, + from_index: int, + until_index: int, + catalog: Catalog, + table_namespace: str, + table_name: str, + table_schema: Schema, + deserde: Callable[[Schema, pa.Table], Iterable[T]], + ): + self.from_index = from_index + self.until_index = until_index + self.catalog = catalog + self.table_namespace = table_namespace + self.table_name = table_name + self.table_schema = table_schema + self.deserde = deserde + self.lock = RLock() + # Counter for how many records have been skipped + self.num_of_skipped_records = 0 + # Counter for how many records have been returned + self.num_of_returned_records = 0 + # Total number of records to return, used for termination condition + self.total_records_to_return = ( + self.until_index - self.from_index if until_index else float("inf") + ) + # Load the table instance, initially the table instance may not exist + self.table = self._load_table_metadata() + # Iterator for usable file scan tasks + self.usable_file_iterator = self._seek_to_usable_file() + # Current record iterator for the active file + self.current_record_iterator = iter([]) + + def _load_table_metadata(self) -> Optional[Table]: + """Util function to load the table's metadata.""" + return load_table_metadata(self.catalog, self.table_namespace, self.table_name) + + def _seek_to_usable_file(self) -> Iterator[FileScanTask]: + """Find usable file scan tasks starting from the specified record index.""" + with self.lock: + if self.num_of_skipped_records > self.from_index: + raise RuntimeError("seek operation should not be called") + + # Load the table for the first time + if not self.table: + self.table = self._load_table_metadata() + + # If the table still does not exist after loading, end iterator. + if self.table: + try: + self.table.refresh() + current_snapshot = self.table.current_snapshot() + if current_snapshot is None: + return iter([]) + sorted_file_scan_tasks = self._extract_sorted_file_scan_tasks( + current_snapshot + ) + # Skip records in files before the `from_index` + for task in sorted_file_scan_tasks: + record_count = task.file.record_count + if ( + self.num_of_skipped_records + record_count + <= self.from_index + ): + self.num_of_skipped_records += record_count + continue + yield task + except Exception: + print("Could not read iceberg table:\n") + raise Exception + else: + return iter([]) + + def _extract_sorted_file_scan_tasks(self, current_snapshot): + """ + As self.table.inspect.entries() does not work with java files, this method + implements the logic to find file_sequence_number for each data file ourselves + :param current_snapshot: The current snapshot of the table. + :return: The file scan tasks of the file sorted by file_sequence_number + """ + file_sequence_map = {} + for manifest in current_snapshot.manifests(self.table.io): + for entry in manifest.fetch_manifest_entry(io=self.table.io): + file_sequence_map[entry.data_file.file_path] = entry.sequence_number + # Retrieve and sort the file scan tasks by file sequence number + file_scan_tasks = list(self.table.scan().plan_files()) + # Sort files by their sequence number. Files without a sequence + # number will be read last. 
+ sorted_file_scan_tasks = sorted( + file_scan_tasks, + key=lambda t: file_sequence_map.get(t.file.file_path, float("inf")), + ) + return sorted_file_scan_tasks + + def __iter__(self) -> Iterator[T]: + return self + + def __next__(self) -> T: + if self.num_of_returned_records >= self.total_records_to_return: + raise StopIteration("No more records available") + + while True: + try: + record = next(self.current_record_iterator) + self.num_of_returned_records += 1 + return record + except StopIteration: + # current_record_iterator is exhausted, need to go to the next file + try: + next_file = next(self.usable_file_iterator) + arrow_table = read_data_file_as_arrow_table(next_file, self.table) + self.current_record_iterator = self.deserde( + self.table_schema, arrow_table + ) + # Skip records within the file if necessary + records_to_skip_in_file = ( + self.from_index - self.num_of_skipped_records + ) + if records_to_skip_in_file > 0: + self.current_record_iterator = self._skip_records( + self.current_record_iterator, records_to_skip_in_file + ) + self.num_of_skipped_records += records_to_skip_in_file + except StopIteration: + # no more files left in this table + raise StopIteration("No more records available") + + @staticmethod + def _skip_records(iterator, count): + return islice(iterator, count, None) diff --git a/core/amber/src/main/python/core/storage/iceberg/iceberg_table_writer.py b/core/amber/src/main/python/core/storage/iceberg/iceberg_table_writer.py new file mode 100644 index 00000000000..8f547a6af63 --- /dev/null +++ b/core/amber/src/main/python/core/storage/iceberg/iceberg_table_writer.py @@ -0,0 +1,110 @@ +import pyarrow as pa +from pyiceberg.catalog import Catalog +from pyiceberg.schema import Schema +from pyiceberg.table import Table +from tenacity import retry, stop_after_attempt, wait_random_exponential +from typing import List, TypeVar, Callable, Iterable + +from core.storage.model.buffered_item_writer import BufferedItemWriter +from core.storage.storage_config import StorageConfig + +# Define a type variable for the data type T +T = TypeVar("T") + + +class IcebergTableWriter(BufferedItemWriter[T]): + """ + IcebergTableWriter writes data to the given Iceberg table in an append-only way. + - Each time the buffer is flushed, a new data file is created using pyarrow + - Iceberg data files are immutable once created. So each flush will create a + distinct file. + + **Thread Safety**: This writer is NOT thread-safe, so only one thread should call + this writer. + + :param writer_identifier: A unique identifier used to prefix the created files. + :param catalog: The Iceberg catalog to manage table metadata. + :param table_namespace: The namespace of the Iceberg table. + :param table_name: The name of the Iceberg table. + :param table_schema: The schema of the Iceberg table. 
+ """ + + def __init__( + self, + writer_identifier: str, + catalog: Catalog, + table_namespace: str, + table_name: str, + table_schema: pa.Schema, + serde: Callable[[Schema, Iterable[T]], pa.Table], + ): + self.writer_identifier = writer_identifier + self.catalog = catalog + self.table_namespace = table_namespace + self.table_name = table_name + self.table_schema = table_schema + self.serde = serde + self.buffer_size = StorageConfig.ICEBERG_TABLE_COMMIT_BATCH_SIZE + + # Internal state + self.buffer: List[T] = [] + + # Load the Iceberg table + self.table: Table = self.catalog.load_table( + f"{self.table_namespace}.{self.table_name}" + ) + + @property + def buffer_size(self) -> int: + return self._buffer_size + + def open(self) -> None: + """Open the writer and clear the buffer.""" + self.buffer.clear() + + def put_one(self, item: T) -> None: + """Add a single item to the buffer.""" + self.buffer.append(item) + if len(self.buffer) >= self.buffer_size: + self._flush_buffer() + + def remove_one(self, item: T) -> None: + """Remove a single item from the buffer.""" + self.buffer.remove(item) + + def _flush_buffer(self) -> None: + """ + Flush the current buffer to a new Iceberg data file. The buffer is first + converted to a pyarrow table, and then appended to the iceberg table as a + parquet file. Note in the case of concurrent writers, as iceberg uses + optimistic concurrency control, we use a random exponential backoff mechanism + when commit failure happens because currently pyiceberg does not natively + support retry. + """ + if not self.buffer: + return + df = self.serde(self.table_schema, self.buffer) + + def append_to_table_with_retry(pa_df: pa.Table) -> None: + @retry( + wait=wait_random_exponential(0.001, 10), + stop=stop_after_attempt(10), + reraise=True, + ) + def append_with_retry(): + self.table.refresh() + self.table.append(pa_df) + + append_with_retry() + + append_to_table_with_retry(df) + self.buffer.clear() + + def close(self) -> None: + """Close the writer, ensuring any remaining buffered items are flushed.""" + if self.buffer: + self._flush_buffer() + + @buffer_size.setter + def buffer_size(self, value): + self._buffer_size = value diff --git a/core/amber/src/main/python/core/storage/iceberg/iceberg_utils.py b/core/amber/src/main/python/core/storage/iceberg/iceberg_utils.py new file mode 100644 index 00000000000..54db90f856b --- /dev/null +++ b/core/amber/src/main/python/core/storage/iceberg/iceberg_utils.py @@ -0,0 +1,139 @@ +import pyarrow as pa +import pyiceberg.table +from pyiceberg.catalog import Catalog +from pyiceberg.catalog.sql import SqlCatalog +from pyiceberg.expressions import AlwaysTrue +from pyiceberg.io.pyarrow import ArrowScan +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC +from pyiceberg.schema import Schema +from pyiceberg.table import Table +from typing import Optional, Iterable + +import core +from core.models import ArrowTableTupleProvider, Tuple + + +def create_postgres_catalog( + catalog_name: str, warehouse_path: str, username: str, password: str +) -> SqlCatalog: + """ + Creates a Postgres SQL catalog instance by connecting to the database named + "texera_iceberg_catalog". + - The only requirement of the database is that it already exists. Once pyiceberg + can connect to the database, it will handle the initializations. + :param catalog_name: the name of the catalog. + :param warehouse_path: the root path for the warehouse where the tables are stored. + :param username: the username of the postgres database. 
+ :param password: the password of the postgres database. + :return: a SQLCatalog instance. + """ + return SqlCatalog( + catalog_name, + **{ + "uri": f"postgresql+psycopg2://{username}:" + f"{password}@localhost/texera_iceberg_catalog", + "warehouse": f"file://{warehouse_path}", + }, + ) + + +def create_table( + catalog: Catalog, + table_namespace: str, + table_name: str, + table_schema: Schema, + override_if_exists: bool = False, +) -> Table: + """ + Creates a new Iceberg table with the specified schema and properties. + - Drops the existing table if `override_if_exists` is true and the table already + exists. + - Creates an unpartitioned table with custom commit retry properties. + + :param catalog: The Iceberg catalog to manage the table. + :param table_namespace: The namespace of the table. + :param table_name: The name of the table. + :param table_schema: The schema of the table. + :param override_if_exists: Whether to drop and recreate the table if it exists. + :return: The created Iceberg table. + """ + + identifier = f"{table_namespace}.{table_name}" + + catalog.create_namespace_if_not_exists(table_namespace) + + if catalog.table_exists(identifier) and override_if_exists: + catalog.drop_table(identifier) + + table = catalog.create_table( + identifier=identifier, + schema=table_schema, + partition_spec=UNPARTITIONED_PARTITION_SPEC, + ) + + return table + + +def load_table_metadata( + catalog: Catalog, table_namespace: str, table_name: str +) -> Optional[Table]: + """ + Loads metadata for an existing Iceberg table. + - Returns the table if it exists and is successfully loaded. + - Returns None if the table does not exist or cannot be loaded. + + :param catalog: The Iceberg catalog to load the table from. + :param table_namespace: The namespace of the table. + :param table_name: The name of the table. + :return: The table if found, or None if not found. + """ + identifier = f"{table_namespace}.{table_name}" + try: + return catalog.load_table(identifier) + except Exception: + return None + + +def read_data_file_as_arrow_table( + planfile: pyiceberg.table.FileScanTask, iceberg_table: pyiceberg.table.Table +) -> pa.Table: + """Reads a data file as a pyarrow table and returns an iterator over its records.""" + arrow_table: pa.Table = ArrowScan( + iceberg_table.metadata, + iceberg_table.io, + iceberg_table.schema(), + AlwaysTrue(), + True, + ).to_table([planfile]) + return arrow_table + + +def amber_tuples_to_arrow_table( + iceberg_schema: Schema, tuple_list: Iterable[Tuple] +) -> pa.Table: + """ + Converts a list of amber tuples to a pyarrow table for serialization. + """ + return pa.Table.from_pydict( + { + name: [t[name] for t in tuple_list] + for name in iceberg_schema.as_arrow().names + }, + schema=iceberg_schema.as_arrow(), + ) + + +def arrow_table_to_amber_tuples( + iceberg_schema: Schema, arrow_table: pa.Table +) -> Iterable[Tuple]: + """ + Converts an arrow table to a list of amber tuples for deserialization. 
+ """ + tuple_provider = ArrowTableTupleProvider(arrow_table) + return ( + Tuple( + {name: field_accessor for name in arrow_table.column_names}, + schema=core.models.Schema(iceberg_schema.as_arrow()), + ) + for field_accessor in tuple_provider + ) diff --git a/core/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/core/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py new file mode 100644 index 00000000000..ef41c820615 --- /dev/null +++ b/core/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py @@ -0,0 +1,289 @@ +import datetime +import random +import uuid +from concurrent.futures import as_completed +from concurrent.futures.thread import ThreadPoolExecutor + +import pytest + +from core.models import Schema, Tuple +from core.storage.document_factory import DocumentFactory +from core.storage.storage_config import StorageConfig +from core.storage.vfs_uri_factory import VFSURIFactory +from proto.edu.uci.ics.amber.core import ( + WorkflowIdentity, + ExecutionIdentity, + OperatorIdentity, + PortIdentity, +) + +# Hardcoded storage config only for test purposes. +StorageConfig.initialize( + postgres_username="texera_iceberg_admin", + postgres_password="password", + table_namespace="operator-port-result", + directory_path="../../../../../../core/amber/user-resources/workflow-results", + commit_batch_size=4096, +) + + +class TestIcebergDocument: + + @pytest.fixture + def amber_schema(self): + """Sample Amber schema""" + return Schema( + raw_schema={ + "col-string": "STRING", + "col-int": "INTEGER", + "col-bool": "BOOLEAN", + "col-long": "LONG", + "col-double": "DOUBLE", + "col-timestamp": "TIMESTAMP", + "col-binary": "BINARY", + } + ) + + @pytest.fixture + def iceberg_document(self, amber_schema): + """ + Creates an iceberg document of operator port results using the sample schema + with a random operator id + """ + operator_uuid = str(uuid.uuid4()).replace("-", "") + uri = VFSURIFactory.create_result_uri( + WorkflowIdentity(id=0), + ExecutionIdentity(id=0), + OperatorIdentity(id=f"test_table_{operator_uuid}"), + PortIdentity(id=0), + ) + DocumentFactory.create_document(uri, amber_schema) + document, _ = DocumentFactory.open_document(uri) + return document + + @pytest.fixture + def sample_items(self, amber_schema) -> [Tuple]: + """ + Generates a list of sample tuples + """ + base_tuples = [ + Tuple( + { + "col-string": "Hello World", + "col-int": 42, + "col-bool": True, + "col-long": 1123213213213, + "col-double": 214214.9969346, + "col-timestamp": datetime.datetime.now(), + "col-binary": b"hello", + }, + schema=amber_schema, + ), + Tuple( + { + "col-string": "", + "col-int": -1, + "col-bool": False, + "col-long": -98765432109876, + "col-double": -0.001, + "col-timestamp": datetime.datetime.fromtimestamp(100000000), + "col-binary": bytearray([255, 0, 0, 64]), + }, + schema=amber_schema, + ), + Tuple( + { + "col-string": "Special Characters: \n\t\r", + "col-int": 2147483647, + "col-bool": True, + "col-long": 9223372036854775807, + "col-double": 1.7976931348623157e308, + "col-timestamp": datetime.datetime.fromtimestamp(1234567890), + "col-binary": bytearray([1, 2, 3, 4, 5]), + }, + schema=amber_schema, + ), + ] + + # Function to generate random binary data + def generate_random_binary(size): + return bytearray(random.getrandbits(8) for _ in range(size)) + + # Generate additional tuples + additional_tuples = [ + Tuple( + { + "col-string": None if i % 7 == 0 else f"Generated String {i}", + "col-int": None if i % 5 == 0 else i, + "col-bool": None if 
i % 6 == 0 else i % 2 == 0, + "col-long": None if i % 4 == 0 else i * 1000000, + "col-double": None if i % 3 == 0 else i * 0.12345, + "col-timestamp": ( + None + if i % 8 == 0 + else datetime.datetime.fromtimestamp( + datetime.datetime.now().timestamp() + i + ) + ), + "col-binary": None if i % 9 == 0 else generate_random_binary(10), + }, + schema=amber_schema, + ) + for i in range(1, 20001) + ] + + return base_tuples + additional_tuples + + def test_basic_read_and_write(self, iceberg_document, sample_items): + """ + Create an iceberg document, write sample items, and read it back. + """ + writer = iceberg_document.writer(str(uuid.uuid4())) + writer.open() + for item in sample_items: + writer.put_one(item) + writer.close() + retrieved_items = list(iceberg_document.get()) + assert sample_items == retrieved_items + + def test_clear_document(self, iceberg_document, sample_items): + """ + Create an iceberg document, write sample items, and clear the document. + """ + writer = iceberg_document.writer(str(uuid.uuid4())) + writer.open() + for item in sample_items: + writer.put_one(item) + writer.close() + assert len(list(iceberg_document.get())) > 0 + + iceberg_document.clear() + assert len(list(iceberg_document.get())) == 0 + + def test_handle_empty_read(self, iceberg_document): + """ + The iceberg document should handle empty reads gracefully + """ + retrieved_items = list(iceberg_document.get()) + assert retrieved_items == [] + + def test_concurrent_writes_followed_by_read(self, iceberg_document, sample_items): + """ + Tests multiple concurrent writers writing to the same iceberg document + """ + all_items = sample_items + num_writers = 10 + # Calculate the batch size and the remainder + batch_size = len(all_items) // num_writers + remainder = len(all_items) % num_writers + # Create writer's batches + item_batches = [ + all_items[ + i * batch_size + + min(i, remainder) : i * batch_size + + min(i, remainder) + + batch_size + + (1 if i < remainder else 0) + ] + for i in range(num_writers) + ] + + assert ( + len(item_batches) == num_writers + ), f"Expected {num_writers} batches but got {len(item_batches)}" + + # Perform concurrent writes + def write_batch(batch): + writer = iceberg_document.writer(str(uuid.uuid4())) + writer.open() + for item in batch: + writer.put_one(item) + writer.close() + + with ThreadPoolExecutor(max_workers=num_writers) as executor: + futures = [executor.submit(write_batch, batch) for batch in item_batches] + for future in as_completed(futures): + future.result() # Wait for each future to complete + + # Read all items back + retrieved_items = list(iceberg_document.get()) + # Verify that the retrieved items match the original items + assert set(retrieved_items) == set( + all_items + ), "All items should be read correctly after concurrent writes." + + def test_read_using_range(self, iceberg_document, sample_items): + """ + The iceberg document should read all items using rages correctly. 
+ """ + writer = iceberg_document.writer(str(uuid.uuid4())) + writer.open() + for item in sample_items: + writer.put_one(item) + writer.close() + # Read all items using ranges + batch_size = 1500 + # Generate ranges + ranges = [ + range(i, min(i + batch_size, len(sample_items))) + for i in range(0, len(sample_items), batch_size) + ] + + # Retrieve items using ranges + retrieved_items = [ + item for r in ranges for item in iceberg_document.get_range(r.start, r.stop) + ] + + assert len(retrieved_items) == len( + sample_items + ), "The number of retrieved items does not match the number of all items." + + # Verify that the retrieved items match the original items + assert set(retrieved_items) == set( + sample_items + ), "All items should be retrieved correctly using ranges." + + def test_get_after(self, iceberg_document, sample_items): + """ + The iceberg document should retrieve items correctly using get_after + """ + writer = iceberg_document.writer(str(uuid.uuid4())) + writer.open() + for item in sample_items: + writer.put_one(item) + writer.close() + # Test get_after for various offsets + offsets = [0, len(sample_items) // 2, len(sample_items) - 1] + for offset in offsets: + if offset < len(sample_items): + expected_items = sample_items[offset:] + else: + expected_items = [] + + retrieved_items = list(iceberg_document.get_after(offset)) + assert retrieved_items == expected_items, ( + f"get_after({offset}) did not return the expected items. " + f"Expected: {expected_items}, Got: {retrieved_items}" + ) + + # Test get_after for an offset beyond the range + invalid_offset = len(sample_items) + retrieved_items = list(iceberg_document.get_after(invalid_offset)) + assert not retrieved_items, ( + f"get_after({invalid_offset}) should return " + f"an empty list, but got: {retrieved_items}" + ) + + def test_get_counts(self, iceberg_document, sample_items): + """ + The iceberg document should correctly return the count of items. + """ + writer = iceberg_document.writer(str(uuid.uuid4())) + writer.open() + for item in sample_items: + writer.put_one(item) + writer.close() + + assert iceberg_document.get_count() == len( + sample_items + ), "get_count should return the same number as the length of sample_items" diff --git a/core/amber/src/main/python/core/storage/model/__init__.py b/core/amber/src/main/python/core/storage/model/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/core/amber/src/main/python/core/storage/model/buffered_item_writer.py b/core/amber/src/main/python/core/storage/model/buffered_item_writer.py new file mode 100644 index 00000000000..7f00eb52d3a --- /dev/null +++ b/core/amber/src/main/python/core/storage/model/buffered_item_writer.py @@ -0,0 +1,58 @@ +from abc import ABC, abstractmethod +from typing import Generic, TypeVar + +# Define a type variable +T = TypeVar("T") + + +class BufferedItemWriter(ABC, Generic[T]): + """ + BufferedItemWriter provides an interface for writing items to a buffer and + performing I/O operations. + The items are buffered before being written to the underlying storage to + optimize performance. + """ + + @property + @abstractmethod + def buffer_size(self) -> int: + """ + The size of the buffer. + :return: the buffer size. + """ + pass + + @abstractmethod + def open(self) -> None: + """ + Open the writer, initializing any necessary resources. + This method should be called before any write operations. 
+ """ + pass + + @abstractmethod + def close(self) -> None: + """ + Close the writer, flushing any remaining items in the buffer + to the underlying storage and releasing any held resources. + """ + pass + + @abstractmethod + def put_one(self, item: T) -> None: + """ + Put one item into the buffer. If the buffer is full, it should be flushed to + the underlying storage. + :param item: the data item to be written. + """ + pass + + @abstractmethod + def remove_one(self, item: T) -> None: + """ + Remove one item from the buffer. If the item is not found in the buffer, an + appropriate action should be taken, + such as throwing an exception or ignoring the request. + :param item: the data item to be removed. + """ + pass diff --git a/core/amber/src/main/python/core/storage/model/readonly_virtual_document.py b/core/amber/src/main/python/core/storage/model/readonly_virtual_document.py new file mode 100644 index 00000000000..dcaca3b941b --- /dev/null +++ b/core/amber/src/main/python/core/storage/model/readonly_virtual_document.py @@ -0,0 +1,69 @@ +from abc import ABC, abstractmethod +from urllib.parse import ParseResult + +from typing import Generic, TypeVar, Iterator + +# Define a type variable +T = TypeVar("T") + + +class ReadonlyVirtualDocument(ABC, Generic[T]): + """ + ReadonlyVirtualDocument provides an abstraction for read operations over a single + resource. + This class can be implemented by resources that only need to support read-related + functionality. + """ + + @abstractmethod + def get_uri(self) -> ParseResult: + """ + Get the URI of the corresponding document. + :return: the URI of the document as a ParseResult object + """ + pass + + @abstractmethod + def get_item(self, i: int) -> T: + """ + Find the ith item and return. + :param i: index starting from 0 + :return: data item of type T + """ + pass + + @abstractmethod + def get(self) -> Iterator[T]: + """ + Get an iterator that iterates over all indexed items. + :return: an iterator that returns data items of type T + """ + pass + + @abstractmethod + def get_range(self, from_index: int, until_index: int) -> Iterator[T]: + """ + Get an iterator of a sequence starting from index `from_index`, until index + `until`. + :param from_index: the starting index (inclusive) + :param until_index: the ending index (exclusive) + :return: an iterator that returns data items of type T + """ + pass + + @abstractmethod + def get_after(self, offset: int) -> Iterator[T]: + """ + Get an iterator of all items after the specified index `offset`. + :param offset: the starting index (exclusive) + :return: an iterator that returns data items of type T + """ + pass + + @abstractmethod + def get_count(self) -> int: + """ + Get the count of items in the document. 
+ :return: the count of items + """ + pass diff --git a/core/amber/src/main/python/core/storage/model/virtual_document.py b/core/amber/src/main/python/core/storage/model/virtual_document.py new file mode 100644 index 00000000000..10b953eda97 --- /dev/null +++ b/core/amber/src/main/python/core/storage/model/virtual_document.py @@ -0,0 +1,60 @@ +from abc import ABC, abstractmethod +from urllib.parse import ParseResult + +from overrides import overrides +from typing import TypeVar, Iterator + +from core.storage.model.buffered_item_writer import BufferedItemWriter +from core.storage.model.readonly_virtual_document import ReadonlyVirtualDocument + +# Define a type variable +T = TypeVar("T") + + +class VirtualDocument(ReadonlyVirtualDocument[T], ABC): + """ + VirtualDocument provides the abstraction of performing read/write/copy/delete + operations over a single resource. + Note that all methods have a default implementation. This is because one document + implementation may not be able to reasonably support all methods. + """ + + @overrides + def get_uri(self) -> ParseResult: + raise NotImplementedError("get_uri method is not implemented") + + @overrides + def get_item(self, i: int) -> T: + raise NotImplementedError("get_item method is not implemented") + + @overrides + def get(self) -> Iterator[T]: + raise NotImplementedError("get method is not implemented") + + @overrides + def get_range(self, from_index: int, until_index: int) -> Iterator[T]: + raise NotImplementedError("get_range method is not implemented") + + @overrides + def get_after(self, offset: int) -> Iterator[T]: + raise NotImplementedError("get_after method is not implemented") + + @overrides + def get_count(self) -> int: + raise NotImplementedError("get_count method is not implemented") + + def writer(self, writer_identifier: str) -> BufferedItemWriter[T]: + """ + return a writer that buffers the items and performs the flush operation at + close time + :param writer_identifier: the id of the writer + :return: a buffered item writer + """ + raise NotImplementedError("writer method is not implemented") + + @abstractmethod + def clear(self) -> None: + """ + Physically remove the current document. + """ + pass diff --git a/core/amber/src/main/python/core/storage/storage_config.py b/core/amber/src/main/python/core/storage/storage_config.py new file mode 100644 index 00000000000..19300feaf51 --- /dev/null +++ b/core/amber/src/main/python/core/storage/storage_config.py @@ -0,0 +1,38 @@ +class StorageConfig: + """ + A static class to keep the storage-related configs. + This class should be initialized with the configs passed from Java side and + is used by all storage-related classes. + """ + + _initialized = False + + ICEBERG_POSTGRES_CATALOG_USERNAME = None + ICEBERG_POSTGRES_CATALOG_PASSWORD = None + ICEBERG_TABLE_NAMESPACE = None + ICEBERG_FILE_STORAGE_DIRECTORY_PATH = None + ICEBERG_TABLE_COMMIT_BATCH_SIZE = None + + @classmethod + def initialize( + cls, + postgres_username, + postgres_password, + table_namespace, + directory_path, + commit_batch_size, + ): + if cls._initialized: + raise RuntimeError( + "Storage config has already been initialized" "and cannot be modified." 
+ ) + + cls.ICEBERG_POSTGRES_CATALOG_USERNAME = postgres_username + cls.ICEBERG_POSTGRES_CATALOG_PASSWORD = postgres_password + cls.ICEBERG_TABLE_NAMESPACE = table_namespace + cls.ICEBERG_FILE_STORAGE_DIRECTORY_PATH = directory_path + cls.ICEBERG_TABLE_COMMIT_BATCH_SIZE = commit_batch_size + cls._initialized = True + + def __new__(cls, *args, **kwargs): + raise TypeError(f"{cls.__name__} is a static class and cannot be instantiated.") diff --git a/core/amber/src/main/python/core/storage/vfs_uri_factory.py b/core/amber/src/main/python/core/storage/vfs_uri_factory.py new file mode 100644 index 00000000000..59c95ad3d11 --- /dev/null +++ b/core/amber/src/main/python/core/storage/vfs_uri_factory.py @@ -0,0 +1,126 @@ +from enum import Enum +from typing import Optional +from urllib.parse import urlparse + +from proto.edu.uci.ics.amber.core import ( + WorkflowIdentity, + ExecutionIdentity, + OperatorIdentity, + PortIdentity, +) + + +class VFSResourceType(str, Enum): + RESULT = "result" + MATERIALIZED_RESULT = "materialized_result" + + +class VFSURIFactory: + VFS_FILE_URI_SCHEME = "vfs" + + @staticmethod + def decode_uri( + uri: str, + ) -> ( + WorkflowIdentity, + ExecutionIdentity, + OperatorIdentity, + Optional[PortIdentity], + VFSResourceType, + ): + """ + Parses a VFS URI and extracts its components. + """ + parsed_uri = urlparse(uri) + + if parsed_uri.scheme != VFSURIFactory.VFS_FILE_URI_SCHEME: + raise ValueError(f"Invalid URI scheme: {parsed_uri.scheme}") + + segments = parsed_uri.path.lstrip("/").split("/") + + def extract_value(key: str) -> str: + try: + index = segments.index(key) + return segments[index + 1] + except (ValueError, IndexError): + raise ValueError(f"Missing value for key: {key} in URI: {uri}") + + workflow_id = WorkflowIdentity(int(extract_value("wid"))) + execution_id = ExecutionIdentity(int(extract_value("eid"))) + operator_id = OperatorIdentity(extract_value("opid")) + + port_identity = VFSURIFactory._extract_optional_port_identity(segments, uri) + + resource_type_str = segments[-1].lower() + try: + resource_type = VFSResourceType(resource_type_str) + except ValueError: + raise ValueError(f"Unknown resource type: {resource_type_str}") + + return workflow_id, execution_id, operator_id, port_identity, resource_type + + @staticmethod + def _extract_optional_port_identity(segments, uri): + if "pid" in segments: + try: + pid_index = segments.index("pid") + port_id_str, port_type = segments[pid_index + 1].split("_") + port_id = int(port_id_str) + if port_type != "I" and port_type != "E": + raise ValueError(f"Invalid port type: {port_type} in URI: {uri}") + is_internal = port_type == "I" + return PortIdentity(port_id, is_internal) + except (ValueError, IndexError): + raise ValueError(f"Invalid port information in URI: {uri}") + else: + return None + + @staticmethod + def create_result_uri(workflow_id, execution_id, operator_id, port_identity) -> str: + """Creates a URI pointing to a result storage.""" + return VFSURIFactory._create_vfs_uri( + VFSResourceType.RESULT, + workflow_id, + execution_id, + operator_id, + port_identity, + ) + + @staticmethod + def create_materialized_result_uri( + workflow_id, execution_id, operator_id, port_identity + ) -> str: + """Creates a URI pointing to a materialized storage.""" + return VFSURIFactory._create_vfs_uri( + VFSResourceType.MATERIALIZED_RESULT, + workflow_id, + execution_id, + operator_id, + port_identity, + ) + + @staticmethod + def _create_vfs_uri( + resource_type, workflow_id, execution_id, operator_id, port_identity=None + ) -> 
str: + """Internal helper to create URI pointing to a VFS resource.""" + if ( + resource_type + in (VFSResourceType.RESULT, VFSResourceType.MATERIALIZED_RESULT) + and port_identity is None + ): + raise ValueError( + "PortIdentity must be provided when resourceType is RESULT or " + "MATERIALIZED_RESULT." + ) + + base_uri = ( + f"{VFSURIFactory.VFS_FILE_URI_SCHEME}:///wid/{workflow_id.id}" + f"/eid/{execution_id.id}/opid/{operator_id.id}" + ) + + if port_identity is not None: + port_type = "I" if port_identity.internal else "E" + base_uri += f"/pid/{port_identity.id}_{port_type}" + + return f"{base_uri}/{resource_type.value}" diff --git a/core/scripts/sql/iceberg_postgres_catalog.sql b/core/scripts/sql/iceberg_postgres_catalog.sql new file mode 100755 index 00000000000..374da5b0f82 --- /dev/null +++ b/core/scripts/sql/iceberg_postgres_catalog.sql @@ -0,0 +1,16 @@ +-- Important: replace "texera_iceberg_admin" and "password" with your own username +-- and password before executing the script. +\set db_user texera_iceberg_admin +\set db_password '\'password\'' + +CREATE DATABASE texera_iceberg_catalog; + +CREATE USER :db_user WITH ENCRYPTED PASSWORD :db_password; + +GRANT ALL PRIVILEGES ON DATABASE texera_iceberg_catalog TO :db_user; + +ALTER DATABASE texera_iceberg_catalog OWNER TO :db_user; + +\c texera_iceberg_catalog; + +GRANT ALL ON SCHEMA public TO :db_user;
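
For reference, a minimal usage sketch of the Python storage layer introduced in this diff, adapted from the new `test_iceberg_document.py`. The credentials, warehouse path, and workflow/operator identities below are illustrative placeholders, and the example assumes the Postgres catalog database created by `iceberg_postgres_catalog.sql` is reachable locally.

```python
# Minimal end-to-end sketch (assumptions: placeholder credentials, warehouse
# path, and workflow/operator IDs; a local Postgres with the
# texera_iceberg_catalog database already created by the SQL script above).
import uuid

from core.models import Schema, Tuple
from core.storage.document_factory import DocumentFactory
from core.storage.storage_config import StorageConfig
from core.storage.vfs_uri_factory import VFSURIFactory
from proto.edu.uci.ics.amber.core import (
    WorkflowIdentity,
    ExecutionIdentity,
    OperatorIdentity,
    PortIdentity,
)

# Register storage settings once per process (normally passed in from the Java side).
StorageConfig.initialize(
    postgres_username="texera_iceberg_admin",
    postgres_password="password",
    table_namespace="operator-port-result",
    directory_path="/tmp/texera-iceberg-warehouse",
    commit_batch_size=4096,
)

# Build a VFS URI identifying an operator's output port result.
uri = VFSURIFactory.create_result_uri(
    WorkflowIdentity(id=1),
    ExecutionIdentity(id=1),
    OperatorIdentity(id="example_operator"),
    PortIdentity(id=0),
)

schema = Schema(raw_schema={"name": "STRING", "score": "DOUBLE"})

# Create the backing Iceberg table, then open it for reading and writing.
DocumentFactory.create_document(uri, schema)
document, _ = DocumentFactory.open_document(uri)

# Writes are buffered and committed as immutable Parquet data files.
writer = document.writer(str(uuid.uuid4()))
writer.open()
writer.put_one(Tuple({"name": "alice", "score": 0.9}, schema=schema))
writer.close()

# Reads iterate over data files in file-sequence (commit) order.
print(list(document.get()))
```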