Commit 753c299
Add Storage Layer In Python (#3224)
This PR adds a storage layer implementation on the Python side of
Texera's codebase, mirroring the implementation of our Java-based
storage layer.
## Motivation
- The primary motivation of having a storage layer in Python so that we
can let Python UDF operators' ports write directly to result tables
without needing to send the results back to Java.
- In the future we will also use the Python storage layer for UDF logs
and workflow runtime statistics.
## Storage APIs
- There are 3 abstract classes in Java's storage implementation:
- `ReadOnlyVirtualDocument` for read-only tables
- `VirtualDocument` for tables supporting both read and write
operations.
- `BufferedItemWriter` as a writer class of `VirtualDocument`
- We mirror the implementation in Python, but keep only the APIs
relevant to table storage (e.g., APIs related to dataset storage are not
kept in Python.)
## Iceberg Document
Following #3147, we add a table-storage implementation based on Apache
Iceberg (pyiceberg), including `IcebergDocument`, `IcebergTableWriter`,
`IcebergCatalogInstance`, and related util functions and tests.
### Limitations of / TODOs for python implementation
pyiceberg is less mature than its java-based counterpart. As a result
there are a few functionalities not supported in our current Python
storage implementation.
#### Incremental Read
Incremental Read is not supported by pyiceberg. It will be supported [in
the future](apache/iceberg-python#533). Before
then we will not include incremental read in our Python codebase (it is
also not currently needed)
#### Concurrent writers
Iceberg uses optimistic concurrency control for concurrent writers. Java
Iceberg natively supports retry with configurable retry parameters,
using exponential backoff (without randomness). However pyiceberg does
not currently support retry. We implemented an ad-hoc custom retry
mechanism in `IcebergTableWriter`, using exponential random backoff
based on the [tenacity](https://tenacity.readthedocs.io/en/latest/)
library. It has a good speed (~0.6s for 10 concurrent writers writing
20K tuples) and is faster than Java’s iceberg-native retry (~6 seconds
for the same test). We may need to re-evaluate this custom
implementation if pyiceberg supports retry natively in the future.
## Iceberg Catalog
pyiceberg only supports SQL catalog (postgreSQL to be specific) and REST
catalog for production. We use postgresql based SQL catalog in this
implementation for the following reasons:
- It supports local storage.
- We tested that it is works with both Java and Python iceberg storage.
- It is easier to set up for developers (compared to REST services).
### PostgreSQL setup
Python storage layer requires a running postgreSQL service in the
environment, and an empty database for iceberg to work.
- **A script to set up a new postgres database for Texera's iceberg
storage has been added for CI tests.**
- The database will be used by pyiceberg to manage the catalog.
- The logic to setup the database is added in GitHub CI config.
- Java side can continue using Hadoop-based catalog for now until we add
storage on operator ports for both Java and Python.
- As the Python storage is not currently used by Python workers, no
action is required for developers for now.
### REST catalogs (feel free to skip this section)
I also explored 3 major REST catalog implementations
([lakekeeper](https://lakekeeper.io),
[polaris](https://polaris.apache.org), and
[gravitino](https://gravitino.apache.org)) and here are some
observations:
- REST catalogs are the trend primarily because different query engines
(Spark, Flink, Snowflake, etc.) relying on iceberg need a central place
to keep and manage the catalogs. Under the hood they all still use some
database as their storage layer.
- Most of them support / recommend cloud storage only in production and
do not support local storage.
- They are incubating projects and lack documentation. For example I
find it very hard to set up authentication (as pyiceberg requires
authentication to work with REST catalogs) using gravitino, and using
them will add a lot more burden to our developers.
- I have successfully made polaris work with our implementation after
setting up auth, but somehow it was very very slow.
- As postgres catalog is working, we will explore more about REST
catalog in the future if have migrated to cloud storage and have
scalability issues.
## Storage configurations
A static class `StorageConfigs` is added to manage storage-related
configurations. We do NOT read the configs from files. Instead we will
let Java pass the configs to Python worker, and the config will be
filled when initializing the worker. The storage config is hardcoded in
CI tests.
## Other items
`VFSURIFactory` and `DocumentFactory` are added in Python storage layer
mirroring the Java implementations.
## TODO for Java Storage
- Add SQL catalog as another type of iceberg catalog
---------
Co-authored-by: Jiadong Bai <43344272+bobbai00@users.noreply.github.com>1 parent a536172 commit 753c299
File tree
19 files changed
+1328
-3
lines changed- .github/workflows
- core
- amber
- src/main/python/core
- models/schema
- storage
- iceberg
- model
- scripts/sql
19 files changed
+1328
-3
lines changed| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
111 | 111 | | |
112 | 112 | | |
113 | 113 | | |
| 114 | + | |
| 115 | + | |
| 116 | + | |
| 117 | + | |
| 118 | + | |
| 119 | + | |
| 120 | + | |
114 | 121 | | |
115 | 122 | | |
116 | 123 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
26 | 26 | | |
27 | 27 | | |
28 | 28 | | |
29 | | - | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
| 34 | + | |
Lines changed: 3 additions & 1 deletion
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
42 | 42 | | |
43 | 43 | | |
44 | 44 | | |
45 | | - | |
| 45 | + | |
46 | 46 | | |
47 | 47 | | |
48 | 48 | | |
49 | 49 | | |
50 | 50 | | |
51 | 51 | | |
| 52 | + | |
52 | 53 | | |
53 | 54 | | |
54 | 55 | | |
| 56 | + | |
55 | 57 | | |
56 | 58 | | |
57 | 59 | | |
| |||
Lines changed: 1 addition & 1 deletion
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
27 | 27 | | |
28 | 28 | | |
29 | 29 | | |
30 | | - | |
| 30 | + | |
31 | 31 | | |
32 | 32 | | |
33 | 33 | | |
| |||
Whitespace-only changes.
Lines changed: 103 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
| 1 | + | |
| 2 | + | |
| 3 | + | |
| 4 | + | |
| 5 | + | |
| 6 | + | |
| 7 | + | |
| 8 | + | |
| 9 | + | |
| 10 | + | |
| 11 | + | |
| 12 | + | |
| 13 | + | |
| 14 | + | |
| 15 | + | |
| 16 | + | |
| 17 | + | |
| 18 | + | |
| 19 | + | |
| 20 | + | |
| 21 | + | |
| 22 | + | |
| 23 | + | |
| 24 | + | |
| 25 | + | |
| 26 | + | |
| 27 | + | |
| 28 | + | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
| 34 | + | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
| 38 | + | |
| 39 | + | |
| 40 | + | |
| 41 | + | |
| 42 | + | |
| 43 | + | |
| 44 | + | |
| 45 | + | |
| 46 | + | |
| 47 | + | |
| 48 | + | |
| 49 | + | |
| 50 | + | |
| 51 | + | |
| 52 | + | |
| 53 | + | |
| 54 | + | |
| 55 | + | |
| 56 | + | |
| 57 | + | |
| 58 | + | |
| 59 | + | |
| 60 | + | |
| 61 | + | |
| 62 | + | |
| 63 | + | |
| 64 | + | |
| 65 | + | |
| 66 | + | |
| 67 | + | |
| 68 | + | |
| 69 | + | |
| 70 | + | |
| 71 | + | |
| 72 | + | |
| 73 | + | |
| 74 | + | |
| 75 | + | |
| 76 | + | |
| 77 | + | |
| 78 | + | |
| 79 | + | |
| 80 | + | |
| 81 | + | |
| 82 | + | |
| 83 | + | |
| 84 | + | |
| 85 | + | |
| 86 | + | |
| 87 | + | |
| 88 | + | |
| 89 | + | |
| 90 | + | |
| 91 | + | |
| 92 | + | |
| 93 | + | |
| 94 | + | |
| 95 | + | |
| 96 | + | |
| 97 | + | |
| 98 | + | |
| 99 | + | |
| 100 | + | |
| 101 | + | |
| 102 | + | |
| 103 | + | |
Whitespace-only changes.
Lines changed: 43 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
| 1 | + | |
| 2 | + | |
| 3 | + | |
| 4 | + | |
| 5 | + | |
| 6 | + | |
| 7 | + | |
| 8 | + | |
| 9 | + | |
| 10 | + | |
| 11 | + | |
| 12 | + | |
| 13 | + | |
| 14 | + | |
| 15 | + | |
| 16 | + | |
| 17 | + | |
| 18 | + | |
| 19 | + | |
| 20 | + | |
| 21 | + | |
| 22 | + | |
| 23 | + | |
| 24 | + | |
| 25 | + | |
| 26 | + | |
| 27 | + | |
| 28 | + | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
| 34 | + | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
| 38 | + | |
| 39 | + | |
| 40 | + | |
| 41 | + | |
| 42 | + | |
| 43 | + | |
0 commit comments