Skip to content

Comments

Add TableSink operator with Java/Spark implementations#665

Draft
harrygav wants to merge 5 commits intoapache:mainfrom
harrygav:introduce_postgresql_sink
Draft

Add TableSink operator with Java/Spark implementations#665
harrygav wants to merge 5 commits intoapache:mainfrom
harrygav:introduce_postgresql_sink

Conversation

@harrygav
Copy link

Summary

This PR introduces a new TableSink operator for writing Record data into a database table via JDBC, with implementations for the Java and Spark platforms.

Opening as Draft to start discussion on the operator design and expected behavior.

Changes

  • New operator: TableSink (in wayang-basic)

    • A UnarySink<Record> that targets a table name and accepts JDBC connection Properties
    • Supports a write mode (e.g. overwrite) and optional column names
  • Java platform: JavaTableSink (in wayang-java)

    • JDBC-based implementation that can create the target table (if missing) and batch-insert records
    • Supports overwrite by dropping the target table first
  • Spark platform: SparkTableSink (in wayang-spark)

    • Spark-side implementation of the same TableSink operator

Notes / open questions

  • This started as a PostgreSQL sink, but the intention should likely be a generic JDBC sink that works across multiple databases.
  • DDL generation is currently basic (e.g., columns are auto-created as VARCHARs)
  • mode behavior (overwrite vs append, etc.) should be agreed on and formalized.

How to use / test

To run end-to-end locally, you currently need an external PostgreSQL instance available and provide JDBC connection details (driver/url/user/password) in the test setup/environment.

@juripetersen
Copy link
Contributor

Thanks @harrygav, this is great!

Could we make TableSink generic over its input type and thus make DDL generation easier with reflections on the given type?

@novatechflow
Copy link
Member

Thank you - just to make the tests running, how's about mocking the JDBC layer?

Wrap DriverManager.getConnection/Connection in a small interface (e.g., JdbcClient) and inject a fake in tests. Then assert SQL statements and batch parameters without a real DB.

@harrygav
Copy link
Author

Thanks @harrygav, this is great!

Could we make TableSink generic over its input type and thus make DDL generation easier with reflections on the given type?

I will take a look and update the PR to continue the discussion!

@zkaoudi
Copy link
Contributor

zkaoudi commented Feb 11, 2026

Hey @harrygav, any news on this? Apparently a table sink is crucial for many things and we would like to start using it already.

@zkaoudi
Copy link
Contributor

zkaoudi commented Feb 11, 2026

Also, another question: wouldn't it make sense to also have a sink for a database? Now you have implemented one execution operator for Java and one for Spark but why not for a database?
One reason where that would be desirable could be if you have data in one database and you would like to extract some data from it and import it into another one (or think an ETL pipeline).

@harrygav
Copy link
Author

Hi @zkaoudi, nice to hear that the PR will be useful, I will follow up on this by the end of the week!

Thanks for your input, I think there are many things to be clarified for the sink operator, but I guess we will figure them out once we know more about the targeted use cases we want to cover. With the current implementation, you could do the ETL pipeline you mention through the Java or Spark platforms: e.g., Source(Java/Spark from DBMS1)->ETL(Java/Spark)->Sink(Java/Spark to DBMS2). Or where you thinking to write from DBMS1 into DBMS2 directly without any intermediate Java/Spark platform step? That would also be interesting for some use cases (improved perf) but becomes cumbersome to maintain in terms of interoperability.

Let me know what you think!

@zkaoudi
Copy link
Contributor

zkaoudi commented Feb 12, 2026

Yes I was thinking about directly writing from DBMS1 to DBMS2. For example, if you have two tables in two DBMSs and you want to join them and write the result into DBMS2 without doing the join in Spark or java. What do you mean with issues of interoperability?

@zkaoudi
Copy link
Contributor

zkaoudi commented Feb 12, 2026

But on a second thought, what I described above as a scenario is more like a conversion operator in addition to a sink. You would ideally want to create a temp table to do the join and then persist the result.

sujayxbarui and others added 4 commits February 23, 2026 22:01
Introduce generic type support and dialect-aware SQL mapping using Calcite. Add extensive H2 integration tests for Java and Spark covering various edge cases.
@harrygav
Copy link
Author

harrygav commented Feb 23, 2026

Hi all, picking up this one again. I just pushed a commit with the update:

  • Support for more types, using reflection. Currently we support POJOs but also Wayang Records.
  • Support for different backend DBMSes, added the Calcite dependency to wayang-basic for that
  • Introduced more tests for the JavaTableSink and SparkTableSink
  • Added the H2 in-memory DBMS for the tests locally, removing the dependency on an external PostgreSQL DBMS running

I think it would be wise to add a couple of DBMSes for the tests, which would also be useful for the source operators or supporting JDBC platforms themselves. This could be done either through their embedded versions or through maven testcontainers. Then, we could add support for sinks on other platforms, e.g., the JDBC platform, to also support DBMS->DBMS workloads.

Let me know what you think about the PR, and if we want to do some of the next steps (e.g., testing) here or in another PR.

description: Apache Wayang is the first cross-platform data processing system.
homepage: https://wayang.apache.org/
description: Apache Wayang(incubating) is the first cross-platform data processing system.
homepage: https://wayang.incubator.apache.org/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not modify this file, it seems it is an old version.

@zkaoudi
Copy link
Contributor

zkaoudi commented Feb 24, 2026

Thanks a lot for your contribution Harry!
Everything seems in order, except an override to a modified file (see review above).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants