@@ -70,7 +70,6 @@ class Client extends EventEmitter {
7070 this . _activeQuery = null
7171 this . _pipelining = false
7272 this . _pipelineQueue = [ ]
73- this . _pipelineSync = false
7473
7574 this . enableChannelBinding = Boolean ( c . enableChannelBinding ) // set true to use SCRAM-SHA-256-PLUS when offered
7675 this . connection =
@@ -365,22 +364,18 @@ class Client extends EventEmitter {
365364 this . emit ( 'connect' )
366365 }
367366
368- if ( this . _pipelining ) {
369- // In pipeline mode, readyForQuery indicates end of current query in pipeline
370- const activeQuery = this . _getActiveQuery ( )
371- if ( activeQuery ) {
372- activeQuery . handleReadyForQuery ( this . connection )
373- // Remove completed query from pipeline queue
374- const index = this . _pipelineQueue . indexOf ( activeQuery )
375- if ( index > - 1 ) {
376- this . _pipelineQueue . splice ( index , 1 )
377- }
367+ if ( this . _pipelining && this . _pipelineQueue . length > 0 ) {
368+ // In pipeline mode, readyForQuery indicates completion of the current query
369+ const completedQuery = this . _pipelineQueue . shift ( )
370+ if ( completedQuery ) {
371+ completedQuery . handleReadyForQuery ( this . connection )
378372 }
379-
373+
380374 // Set next query as active if available
381375 if ( this . _pipelineQueue . length > 0 ) {
382376 this . _activeQuery = this . _pipelineQueue [ 0 ]
383377 } else {
378+ // All pipeline queries completed
384379 this . _activeQuery = null
385380 this . readyForQuery = true
386381 this . emit ( 'drain' )
@@ -631,11 +626,10 @@ class Client extends EventEmitter {
631626
632627 // In pipeline mode, only extended query protocol is allowed
633628 if ( this . _pipelining ) {
634- // TODO: better check!
635629 if ( typeof config === 'string' ) {
636630 throw new Error ( 'Simple query protocol is not allowed in pipeline mode. Use parameterized queries instead.' )
637631 }
638- if ( query . text && query . text . includes ( ';' ) ) {
632+ if ( query . text && query . text . split ( ';' ) . filter ( s => s . trim ( ) ) . length > 1 ) {
639633 throw new Error ( 'Multiple SQL commands in a single query are not allowed in pipeline mode.' )
640634 }
641635 }
@@ -809,12 +803,12 @@ class Client extends EventEmitter {
809803 while ( this . _queryQueue . length > 0 ) {
810804 const query = this . _queryQueue . shift ( )
811805 this . _pipelineQueue . push ( query )
812-
806+
813807 // Force extended query protocol for pipeline mode
814808 if ( ! query . requiresPreparation ( ) ) {
815809 query . queryMode = 'extended'
816810 }
817-
811+
818812 const queryError = query . submit ( this . connection )
819813 if ( queryError ) {
820814 process . nextTick ( ( ) => {
@@ -829,12 +823,16 @@ class Client extends EventEmitter {
829823 }
830824 }
831825
832- // Set active query to first in pipeline if we don't have one
833- if ( ! this . _getActiveQuery ( ) && this . _pipelineQueue . length > 0 ) {
834- this . _activeQuery = this . _pipelineQueue [ 0 ]
826+ // Send sync message to end the pipeline batch
827+ if ( this . _pipelineQueue . length > 0 && this . _queryQueue . length === 0 ) {
828+ this . connection . sync ( )
829+ // Set active query to first in pipeline to start processing responses
830+ if ( ! this . _getActiveQuery ( ) && this . _pipelineQueue . length > 0 ) {
831+ this . _activeQuery = this . _pipelineQueue [ 0 ]
832+ }
835833 }
836834
837- this . readyForQuery = true
835+ this . readyForQuery = false // We're not ready for more queries until pipeline completes
838836 }
839837}
840838
0 commit comments