55 */
66package org .hibernate .reactive .query .sqm .mutation .internal .cte ;
77
8+ import org .hibernate .dialect .Dialect ;
9+ import org .hibernate .dialect .PostgreSQLDialect ;
10+ import org .hibernate .dialect .function .array .DdlTypeHelper ;
811import org .hibernate .internal .util .MutableObject ;
912import org .hibernate .query .spi .DomainQueryExecutionContext ;
13+ import org .hibernate .query .spi .QueryOptions ;
1014import org .hibernate .query .sqm .internal .DomainParameterXref ;
1115import org .hibernate .query .sqm .internal .SqmJdbcExecutionContextAdapter ;
1216import org .hibernate .query .sqm .mutation .internal .cte .CteInsertHandler ;
1822import org .hibernate .reactive .sql .exec .internal .StandardReactiveSelectExecutor ;
1923import org .hibernate .reactive .sql .results .spi .ReactiveListResultsConsumer ;
2024import org .hibernate .sql .ast .tree .cte .CteTable ;
25+ import org .hibernate .sql .ast .tree .expression .JdbcParameter ;
26+ import org .hibernate .sql .exec .spi .ExecutionContext ;
27+ import org .hibernate .sql .exec .spi .JdbcLockStrategy ;
28+ import org .hibernate .sql .exec .spi .JdbcParameterBinder ;
29+ import org .hibernate .sql .exec .spi .JdbcParameterBinding ;
2130import org .hibernate .sql .exec .spi .JdbcParameterBindings ;
31+ import org .hibernate .sql .exec .spi .JdbcSelect ;
32+ import org .hibernate .sql .exec .spi .LoadedValuesCollector ;
33+ import org .hibernate .sql .exec .spi .StatementAccess ;
34+ import org .hibernate .sql .results .jdbc .spi .JdbcValuesMappingProducer ;
35+ import org .hibernate .type .spi .TypeConfiguration ;
2236
2337import java .lang .invoke .MethodHandles ;
38+ import java .sql .Connection ;
39+ import java .util .List ;
40+ import java .util .Set ;
2441import java .util .concurrent .CompletionStage ;
2542
2643public class ReactiveCteInsertHandler extends CteInsertHandler implements ReactiveHandler {
2744
2845 private static final Log LOG = LoggerFactory .make ( Log .class , MethodHandles .lookup () );
2946
47+ private final Dialect dialect ;
48+
3049 public ReactiveCteInsertHandler (
3150 CteTable cteTable ,
3251 SqmInsertStatement <?> sqmStatement ,
3352 DomainParameterXref domainParameterXref ,
3453 DomainQueryExecutionContext context ,
3554 MutableObject <JdbcParameterBindings > firstJdbcParameterBindingsConsumer ) {
3655 super ( cteTable , sqmStatement , domainParameterXref , context , firstJdbcParameterBindingsConsumer );
56+ this .dialect = context .getSession ().getDialect ();
3757 }
3858
3959 @ Override
@@ -45,11 +65,21 @@ public int execute(DomainQueryExecutionContext executionContext) {
4565 public CompletionStage <Integer > reactiveExecute (
4666 JdbcParameterBindings jdbcParameterBindings ,
4767 DomainQueryExecutionContext context ) {
68+ JdbcSelect jdbcSelect ;
69+
70+ if ( dialect instanceof PostgreSQLDialect ) {
71+ // need to replace parameters with explicit casts see https://github.com/eclipse-vertx/vertx-sql-client/issues/1540
72+ jdbcSelect = new PostgreSQLCteMutationSelect ( getSelect (), jdbcParameterBindings , context );
73+ }
74+ else {
75+ jdbcSelect = getSelect ();
76+ }
77+
4878 return ( (ReactiveSharedSessionContractImplementor ) context .getSession () )
49- .reactiveAutoFlushIfRequired ( getSelect () .getAffectedTableNames () )
79+ .reactiveAutoFlushIfRequired ( jdbcSelect .getAffectedTableNames () )
5080 .thenCompose ( v -> StandardReactiveSelectExecutor .INSTANCE
5181 .list (
52- getSelect () ,
82+ jdbcSelect ,
5383 jdbcParameterBindings ,
5484 SqmJdbcExecutionContextAdapter .omittingLockingAndPaging ( context ),
5585 row -> row [0 ],
@@ -60,4 +90,145 @@ public CompletionStage<Integer> reactiveExecute(
6090 .thenApply ( list -> ( (Number ) list .get ( 0 ) ).intValue () )
6191 );
6292 }
93+
94+ /*
95+ * A JdbcSelect wrapper that adds explicit type casts to parameters in the original SQL Select string.
96+ * This is needed for PostgreSQL when using CTEs for mutation statements,
97+ * See https://github.com/eclipse-vertx/vertx-sql-client/issues/1540 .
98+ */
99+ public static class PostgreSQLCteMutationSelect implements JdbcSelect {
100+ private final JdbcSelect delegate ;
101+ private final String sqlString ;
102+
103+ public PostgreSQLCteMutationSelect (
104+ JdbcSelect delegate ,
105+ JdbcParameterBindings jdbcParameterBindings ,
106+ DomainQueryExecutionContext context ) {
107+ this .delegate = delegate ;
108+ this .sqlString = getSqlStringWithExplicitParameterCasting ( delegate , jdbcParameterBindings , context );
109+ }
110+
111+ private static String getSqlStringWithExplicitParameterCasting (
112+ JdbcSelect original ,
113+ JdbcParameterBindings jdbcParameterBindings ,
114+ DomainQueryExecutionContext context ) {
115+ final StringBuilder newSelect = new StringBuilder ( original .getSqlString () );
116+ addExplicitCastToParameters (
117+ jdbcParameterBindings ,
118+ newSelect ,
119+ context .getSession ().getSessionFactory ().getMappingMetamodel ().getTypeConfiguration ()
120+ );
121+ return newSelect .toString ();
122+ }
123+
124+ private static void addExplicitCastToParameters (
125+ JdbcParameterBindings jdbcParameterBindings ,
126+ StringBuilder newSelect ,
127+ TypeConfiguration typeConfiguration ) {
128+ jdbcParameterBindings .visitBindings (
129+ (jdbcParameter , jdbcParameterBinding ) ->
130+ addExplicitCastToParameter (
131+ newSelect ,
132+ typeConfiguration ,
133+ jdbcParameter ,
134+ jdbcParameterBinding
135+ )
136+ );
137+ }
138+
139+ private static void addExplicitCastToParameter (
140+ StringBuilder newSelect ,
141+ TypeConfiguration typeConfiguration ,
142+ JdbcParameter jdbcParameter ,
143+ JdbcParameterBinding jdbcParameterBinding ) {
144+ final int index = jdbcParameter .getParameterId () + 1 ;
145+ final String parameterToReplace = "$" + index ;
146+ final int start = newSelect .indexOf ( parameterToReplace );
147+ newSelect .replace (
148+ start ,
149+ start + parameterToReplace .length (),
150+ parameterToReplace + "::" + DdlTypeHelper .getCastTypeName (
151+ jdbcParameterBinding .getBindType (),
152+ typeConfiguration
153+ )
154+ );
155+ }
156+
157+ @ Override
158+ public JdbcValuesMappingProducer getJdbcValuesMappingProducer () {
159+ return delegate .getJdbcValuesMappingProducer ();
160+ }
161+
162+ @ Override
163+ public JdbcLockStrategy getLockStrategy () {
164+ return delegate .getLockStrategy ();
165+ }
166+
167+ @ Override
168+ public boolean usesLimitParameters () {
169+ return delegate .usesLimitParameters ();
170+ }
171+
172+ @ Override
173+ public JdbcParameter getLimitParameter () {
174+ return delegate .getLimitParameter ();
175+ }
176+
177+ @ Override
178+ public int getRowsToSkip () {
179+ return delegate .getRowsToSkip ();
180+ }
181+
182+ @ Override
183+ public int getMaxRows () {
184+ return delegate .getMaxRows ();
185+ }
186+
187+ @ Override
188+ public LoadedValuesCollector getLoadedValuesCollector () {
189+ return delegate .getLoadedValuesCollector ();
190+ }
191+
192+ @ Override
193+ public void performPreActions (
194+ StatementAccess jdbcStatementAccess ,
195+ Connection jdbcConnection ,
196+ ExecutionContext executionContext ) {
197+ delegate .performPreActions ( jdbcStatementAccess , jdbcConnection , executionContext );
198+ }
199+
200+ @ Override
201+ public void performPostAction (
202+ boolean succeeded ,
203+ StatementAccess jdbcStatementAccess ,
204+ Connection jdbcConnection ,
205+ ExecutionContext executionContext ) {
206+ delegate .performPostAction ( succeeded , jdbcStatementAccess , jdbcConnection , executionContext );
207+ }
208+
209+ @ Override
210+ public boolean dependsOnParameterBindings () {
211+ return delegate .dependsOnParameterBindings ();
212+ }
213+
214+ @ Override
215+ public boolean isCompatibleWith (JdbcParameterBindings jdbcParameterBindings , QueryOptions queryOptions ) {
216+ return delegate .isCompatibleWith ( jdbcParameterBindings , queryOptions );
217+ }
218+
219+ @ Override
220+ public Set <String > getAffectedTableNames () {
221+ return delegate .getAffectedTableNames ();
222+ }
223+
224+ @ Override
225+ public String getSqlString () {
226+ return sqlString ;
227+ }
228+
229+ @ Override
230+ public List <JdbcParameterBinder > getParameterBinders () {
231+ return delegate .getParameterBinders ();
232+ }
233+ }
63234}
0 commit comments