@@ -3,96 +3,131 @@ package io.exoquery.controller.r2dbc
33import io.exoquery.controller.*
44import io.r2dbc.spi.Connection
55import io.r2dbc.spi.ConnectionFactory
6- import io.r2dbc.spi.Result
6+ import io.r2dbc.spi.Row
7+ import io.r2dbc.spi.Statement
78import kotlinx.coroutines.flow.Flow
8- import kotlinx.coroutines.flow.collect
99import kotlinx.coroutines.flow.emptyFlow
10- import kotlinx.coroutines.flow.flatMapConcat
10+ import kotlinx.coroutines.flow.map
1111import kotlinx.coroutines.flow.toList
1212import kotlinx.coroutines.reactive.asFlow
1313import kotlinx.coroutines.reactive.awaitFirstOrNull
14- import kotlinx.coroutines.reactive.awaitSingle
15- import org.reactivestreams.Publisher
14+ import kotlinx.coroutines.reactive.collect
1615
1716// Minimal execution options placeholder for R2DBC
1817data class R2dbcExecutionOptions (
19- val debug : Boolean = false
18+ val sessionTimeout : Int? = null ,
19+ val fetchSize : Int? = null ,
20+ val queryTimeout : Int? = null
2021) {
2122 companion object {
2223 fun Default () = R2dbcExecutionOptions ()
2324 }
2425}
2526
26- class R2dbcController (private val connectionFactory : ConnectionFactory ): ControllerVerbs<R2dbcExecutionOptions> {
27+ class R2dbcController (
28+ override val encodingConfig : R2dbcEncodingConfig = R2dbcEncodingConfig .Default (),
29+ override val connectionFactory : ConnectionFactory
30+ ):
31+ WithEncoding <Connection , Statement , Row >,
32+ ControllerVerbs <R2dbcExecutionOptions >,
33+ HasTransactionalityR2dbc
34+ {
2735 override fun DefaultOpts (): R2dbcExecutionOptions = R2dbcExecutionOptions .Default ()
2836
37+ override val encodingApi: R2dbcSqlEncoding =
38+ object : JavaSqlEncoding <Connection , Statement , Row >,
39+ BasicEncoding <Connection , Statement , Row > by R2dbcBasicEncoding ,
40+ JavaTimeEncoding <Connection , Statement , Row > by R2dbcTimeEncoding ,
41+ JavaUuidEncoding <Connection , Statement , Row > by R2dbcUuidEncoding {}
42+
43+
2944 // Helper to create a connection and ensure closure
30- private suspend fun <T > withConnection (block : suspend (Connection ) -> T ): T {
31- val conn = connectionFactory.create().awaitSingle()
32- try {
33- return block(conn)
34- } finally {
35- conn.close().awaitFirstOrNull()
45+ // private suspend fun <T> withConnection(block: suspend (Connection) -> T): T {
46+ // val conn = connectionFactory.create().awaitSingle()
47+ // try {
48+ // return block(conn)
49+ // } finally {
50+ // conn.close().awaitFirstOrNull()
51+ // }
52+ // }
53+
54+ override fun extractColumnInfo (row : Row ): List <ColumnInfo >? {
55+ val meta = row.metadata
56+ val cols = meta.columnMetadatas
57+ return cols.map { cmd ->
58+ ColumnInfo (cmd.name, cmd.type.name)
3659 }
3760 }
3861
39- private fun bindParams (stmt : io.r2dbc.spi.Statement , params : List <StatementParam <* >>) {
40- var idx = 0
41- for (p in params) {
42- // Bind by index; most R2DBC drivers accept basic Kotlin/Java types directly
43- @Suppress(" UNCHECKED_CAST" )
44- val v: Any? = (p.value as Any? )
45- stmt.bind(idx, v)
46- idx + = 1
62+ override suspend fun <T > stream (act : ControllerQuery <T >, options : R2dbcExecutionOptions ): Flow <T > =
63+ withConnection(options) {
64+ val conn = localConnection()
65+ accessStmt(act.sql, conn) { stmt ->
66+ prepare(stmt, conn, act.params)
67+ val pub = stmt.execute() // TODO try-catch here?
68+ pub.awaitFirstOrNull()?.map { row, meta ->
69+ val resultMaker = act.resultMaker.makeExtractor(QueryDebugInfo (act.sql))
70+ PubResult (resultMaker(conn, row))
71+ }?.asFlow()?.map { it.value } ? : emptyFlow()
72+ }
4773 }
48- }
49-
50- override suspend fun <T > stream (query : ControllerQuery <T >, options : R2dbcExecutionOptions ): Flow <T > {
51- // Decoding using resultMaker requires a full encoding implementation which is out of scope here.
52- // Provided for API completeness.
53- throw UnsupportedOperationException (" R2dbcController.stream(query) decoding not yet implemented" )
54- }
5574
5675 override suspend fun <T > stream (query : ControllerBatchActionReturning <T >, options : R2dbcExecutionOptions ): Flow <T > {
5776 throw UnsupportedOperationException (" R2dbcController.stream(batchReturning) not yet implemented" )
5877 }
5978
60- override suspend fun <T > stream (query : ControllerActionReturning <T >, options : R2dbcExecutionOptions ): Flow <T > {
61- throw UnsupportedOperationException (" R2dbcController.stream(actionReturning) not yet implemented" )
62- }
79+ override suspend fun <T > stream (act : ControllerActionReturning <T >, options : R2dbcExecutionOptions ): Flow <T > =
80+ withConnection(options) {
81+ val conn = localConnection()
82+ accessStmt(act.sql, conn) { stmt ->
83+ prepare(stmt, conn, act.params)
84+ val results = mutableListOf<List <Pair <String , String ?>>>()
85+ val pub = stmt.execute() // TODO try-catch here?
86+ // Each Result may contain rows; map them to name->string pairs for all columns
87+
88+ // convert the publisher into a suspeding function
89+ pub.awaitFirstOrNull()?.map { row, meta ->
90+ val resultMaker = act.resultMaker.makeExtractor(QueryDebugInfo (act.sql))
91+ PubResult (resultMaker(conn, row))
92+ }?.asFlow()?.map { it.value } ? : emptyFlow()
93+ }
94+ }
95+
96+ /* * Need a temporary wrapper to work around limitation of pub-result being not-nullable */
97+ @JvmInline
98+ private value class PubResult <T >(val value : T )
6399
64100 override suspend fun <T > run (query : ControllerQuery <T >, options : R2dbcExecutionOptions ): List <T > = stream(query, options).toList()
65101
66- override suspend fun run (query : ControllerAction , options : R2dbcExecutionOptions ): Long =
67- withConnection { conn ->
68- val stmt = conn.createStatement(query.sql)
69- bindParams(stmt, query.params)
70- // Execute and sum rowsUpdated across possibly multiple results
71- val pub = stmt.execute()
72- var total = 0L
73- pub.asFlow().collect { result ->
74- val updated = result.rowsUpdated.awaitFirstOrNull() ? : 0
75- total + = updated.toLong()
102+ override suspend fun run (act : ControllerAction , options : R2dbcExecutionOptions ): Long =
103+ withConnection(options) {
104+ val conn = localConnection()
105+ accessStmt(act.sql, conn) { stmt ->
106+ prepare(stmt, conn, act.params)
107+ // Execute and sum rowsUpdated across possibly multiple results
108+ val pub = stmt.execute()
109+ pub.awaitFirstOrNull()?.rowsUpdated?.awaitFirstOrNull() ? : 0
76110 }
77- total
78111 }
79112
80113 override suspend fun run (query : ControllerBatchAction , options : R2dbcExecutionOptions ): List <Long > =
81- withConnection { conn ->
114+ withConnection(options) {
115+ val conn = localConnection()
82116 // TODO this statement works very well with caching, should look into reusing statements across calls
83- val stmt = conn.createStatement(query.sql)
84- // Add batches
85- query.params.forEach { batch ->
86- bindParams(stmt, batch)
87- stmt.add()
88- }
89- val results = mutableListOf<Long >()
90- val pub = stmt.execute()
91- pub.asFlow().collect { result ->
92- val updated = result.rowsUpdated.awaitFirstOrNull() ? : 0
93- results.add(updated.toLong())
117+ accessStmtReturning(query.sql, conn, options, emptyList()) { stmt ->
118+ query.params.forEach { batch ->
119+ prepare(stmt, conn, batch)
120+ stmt.add()
121+ }
122+ val results = mutableListOf<Long >()
123+ val pub = stmt.execute()
124+ // Here using the asFlow and connect actually makes sense because multiple results are expected
125+ pub.asFlow().collect { result ->
126+ val updated = result.rowsUpdated.awaitFirstOrNull() ? : 0
127+ results.add(updated.toLong())
128+ }
129+ results
94130 }
95- results
96131 }
97132
98133 override suspend fun <T > run (query : ControllerActionReturning <T >, options : R2dbcExecutionOptions ): T =
@@ -101,40 +136,25 @@ class R2dbcController(private val connectionFactory: ConnectionFactory): Control
101136 override suspend fun <T > run (query : ControllerBatchActionReturning <T >, options : R2dbcExecutionOptions ): List <T > =
102137 stream(query, options).toList()
103138
104- override suspend fun <T > runRaw (query : ControllerQuery <T >, options : R2dbcExecutionOptions ): List <List <Pair <String , String ?>>> =
105- withConnection { conn ->
139+ override suspend fun <T > runRaw (query : ControllerQuery <T >, options : R2dbcExecutionOptions ) =
140+ withConnection(options) {
141+ val conn = localConnection()
106142 val stmt = conn.createStatement(query.sql)
107- bindParams (stmt, query.params)
143+ prepare (stmt, conn , query.params)
108144 val results = mutableListOf<List <Pair <String , String ?>>>()
109145 val pub = stmt.execute()
110146 // Each Result may contain rows; map them to name->string pairs for all columns
111147
112148 // convert the publisher into a suspeding function
113- // pub.awaitFirstOrNull()?.map { row, meta ->
114- // val cols = meta.columnMetadatas
115- // val pairs = cols.mapIndexed { i, md ->
116- // val name = md.name
117- // val value = row.get(i, Any::class.java)
118- // name to value?.toString()
119- // }
120- // pairs
121- // }?.asFlow() ?: emptyFlow()
122-
123- pub.asFlow()
124- .flatMapConcat { r ->
125- r.map { row, meta ->
126- val cols = meta.columnMetadatas
127- val pairs = cols.mapIndexed { i, md ->
128- val name = md.name
129- val value = row.get(i, Any ::class .java)
130- name to value?.toString()
131- }
132- pairs
133- }.asFlow()
134- }
135- .collect { rowPairs ->
136- results.add(rowPairs)
149+ pub.awaitFirstOrNull()?.map { row, meta ->
150+ val cols = meta.columnMetadatas
151+ val pairs = cols.mapIndexed { i, md ->
152+ val name = md.name
153+ val value = row.get(i, Any ::class .java)
154+ name to value?.toString()
137155 }
156+ pairs
157+ }?.collect { rowPairs -> results.add(rowPairs) }
138158 results
139159 }
140160}
0 commit comments