@@ -14,7 +14,7 @@ import {
   VulcanExtensionId,
 } from '@vulcan-sql/core';
 import * as path from 'path';
-import { buildSQL } from './sqlBuilder';
+import { buildSQL, chunkSize } from './sqlBuilder';
 import { DuckDBExtensionLoader } from './duckdbExtensionLoader';
 
 const getType = (value: any) => {
@@ -99,49 +99,31 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
     }
     const { db, configurationParameters, ...options } =
       this.dbMapping.get(profileName)!;
-    const builtSQL = buildSQL(sql, operations);
+    const [firstDataSQL, restDataSQL] = buildSQL(sql, operations);
+
     // create new connection for each query
+    const parameters = Array.from(bindParams.values());
+    this.logRequest(firstDataSQL, parameters, options);
     const connection = db.connect();
     await this.loadExtensions(connection, configurationParameters);
-    const statement = connection.prepare(builtSQL);
-    const parameters = Array.from(bindParams.values());
-    this.logRequest(builtSQL, parameters, options);
-
-    const result = await statement.stream(...parameters);
-    const firstChunk = await result.nextChunk();
+    if (restDataSQL) this.logRequest(restDataSQL, parameters, options);
+    const [firstData, restDataStream] = await this.acquireData(
+      firstDataSQL,
+      restDataSQL,
+      parameters,
+      db
+    );
+    const readable = this.createReadableStream(firstData, restDataStream);
     return {
       getColumns: () => {
-        if (!firstChunk || firstChunk.length === 0) return [];
-        return Object.keys(firstChunk[0]).map((columnName) => ({
+        if (!firstData || firstData.length === 0) return [];
+        return Object.keys(firstData[0]).map((columnName) => ({
           name: columnName,
-          type: getType(firstChunk[0][columnName as any]),
+          type: getType(firstData[0][columnName as any]),
         }));
       },
       getData: () => {
-        const stream = new Readable({
-          objectMode: true,
-          read() {
-            result.nextChunk().then((chunk) => {
-              if (!chunk) {
-                this.push(null);
-                return;
-              }
-              for (const row of chunk) {
-                this.push(row);
-              }
-            });
-          },
-        });
-        // Send the first chunk
-        if (firstChunk) {
-          for (const row of firstChunk) {
-            stream.push(row);
-          }
-        } else {
-          // If there is no data, close the stream.
-          stream.push(null);
-        }
-        return stream;
+        return readable;
       },
     };
   }
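Note: the rewrite above assumes buildSQL now returns a [firstDataSQL, restDataSQL] pair instead of a single statement. Its implementation lives in ./sqlBuilder and is not part of this diff, so the sketch below is only an assumed shape: the chunkSize value and the LIMIT/OFFSET split are guesses to make the control flow above readable, not the actual code.

```ts
// Hypothetical sketch of './sqlBuilder': names and values are assumptions.
export const chunkSize = 2000; // assumed default

export const buildSQL = (
  sql: string,
  operations: any // pagination/filter operations, elided here
): [string, string | undefined] => {
  const base = `SELECT * FROM (${sql})`;
  // First statement fetches up to chunkSize rows eagerly; the second, if
  // present, streams the remainder.
  return [`${base} LIMIT ${chunkSize}`, `${base} OFFSET ${chunkSize}`];
};
```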
@@ -150,6 +132,73 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
     return `$${parameterIndex}`;
   }
 
+  private createReadableStream(
+    firstData: duckdb.TableData,
+    restDataStream: duckdb.QueryResult | undefined
+  ) {
+    // Default path: everything fit in the first chunk, so emit those rows
+    // and close the stream.
+    const readable = new Readable({
+      objectMode: true,
+      read: function () {
+        for (const row of firstData) {
+          this.push(row);
+        }
+        this.push(null);
+      },
+    });
+    // A full first chunk means more rows may follow: push the first chunk
+    // immediately and let subsequent reads drain the streamed remainder.
+    if (firstData.length >= chunkSize) {
+      readable._read = async function () {
+        if (restDataStream) {
+          for await (const row of restDataStream) {
+            this.push(row);
+          }
+          this.push(null);
+        }
+      };
+      for (const row of firstData) {
+        readable.push(row);
+      }
+    }
+    return readable;
+  }
+
+  private async acquireData(
+    firstDataSql: string,
+    restDataSql: string | undefined,
+    parameters: any[],
+    db: duckdb.Database
+  ) {
+    // conn.all() is faster than stream.nextChunk(), so for small result
+    // sets we use conn.all() to get the data at once.
+    // To limit memory use and prevent server crashes, we use conn.all() to
+    // acquire the initial chunk of data, then conn.stream() to receive the
+    // remainder of the data.
+    return await Promise.all([
+      new Promise<duckdb.TableData>((resolve, reject) => {
+        const c = db.connect();
+        c.all(
+          firstDataSql,
+          ...parameters,
+          (err: duckdb.DuckDbError | null, res: duckdb.TableData) => {
+            if (err) return reject(err);
+            resolve(res);
+          }
+        );
+      }),
+      new Promise<duckdb.QueryResult | undefined>((resolve, reject) => {
+        if (!restDataSql) return resolve(undefined);
+        try {
+          const c = db.connect();
+          const result = c.stream(restDataSql, ...parameters);
+          resolve(result);
+        } catch (err: any) {
+          reject(err);
+        }
+      }),
+    ]);
+  }
+
   private logRequest(
     sql: string,
     parameters: string[],
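Taken together, acquireData and createReadableStream implement an eager-first-chunk / lazy-remainder pattern. A minimal standalone sketch of the same idea against the duckdb Node bindings (the range() table, row counts, and CHUNK value are arbitrary example choices; conn.stream() returning an async-iterable QueryResult is the documented duckdb behavior):

```ts
import * as duckdb from 'duckdb';
import { Readable } from 'stream';

const CHUNK = 2000; // arbitrary example value
const db = new duckdb.Database(':memory:');

// Fetch the first CHUNK rows eagerly via all(); stream the rest only if the
// first chunk came back full.
db.connect().all(`SELECT * FROM range(5000) LIMIT ${CHUNK}`, (err, first) => {
  if (err) throw err;
  if (first.length < CHUNK) {
    console.log(`all ${first.length} rows fit in one chunk`);
    return;
  }
  const rest = db.connect().stream(`SELECT * FROM range(5000) OFFSET ${CHUNK}`);
  const readable = Readable.from(
    (async function* () {
      yield* first; // eager rows first
      for await (const row of rest) yield row; // then the streamed remainder
    })()
  );
  readable.on('data', () => void 0);
  readable.on('end', () => console.log('done'));
});
```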
@@ -246,9 +295,25 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
     });
   }
 
+  // The default number of DuckDB threads is 16.
+  // Setting the thread count below your CPU core count may improve
+  // performance, according to our observations.
+  private async setThread(db: duckdb.Database) {
+    const thread = process.env['DUCKDB_THREADS'];
+
+    if (!thread) return;
+    await new Promise((resolve, reject) => {
+      db.run(`SET threads=${Number(thread)}`, (err: any) => {
+        if (err) return reject(err);
+        this.logger.debug(`Set thread to ${thread}`);
+        resolve(true);
+      });
+    });
+  }
+
   private async initDatabase(dbPath: string) {
     const db = new duckdb.Database(dbPath);
     const conn = db.connect();
+    await this.setThread(db);
     await this.installExtensions(conn);
     return db;
   }
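To sanity-check the SET threads pragma that setThread() issues, here is a standalone sketch (DUCKDB_THREADS is the env var introduced above; current_setting is a built-in DuckDB function):

```ts
import * as duckdb from 'duckdb';

process.env['DUCKDB_THREADS'] = '4'; // example value

const db = new duckdb.Database(':memory:');
db.run(`SET threads=${Number(process.env['DUCKDB_THREADS'])}`, (err) => {
  if (err) throw err;
  // Read the setting back to confirm it was applied.
  db.all(`SELECT current_setting('threads') AS threads`, (err2, rows) => {
    if (err2) throw err2;
    console.log(rows); // e.g. [ { threads: 4 } ]
  });
});
```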