Skip to content

Commit deb7f76

Browse files
authored
feat: pipeline
1 parent 27a2754 commit deb7f76

File tree

1 file changed

+151
-5
lines changed

1 file changed

+151
-5
lines changed

packages/pg/lib/client.js

Lines changed: 151 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ class Client extends EventEmitter {
6868
this._connectionError = false
6969
this._queryable = true
7070
this._activeQuery = null
71+
this._pipelining = false
72+
this._pipelineQueue = []
73+
this._pipelineSync = false
7174

7275
this.enableChannelBinding = Boolean(c.enableChannelBinding) // set true to use SCRAM-SHA-256-PLUS when offered
7376
this.connection =
@@ -123,8 +126,15 @@ class Client extends EventEmitter {
123126
this._activeQuery = null
124127
}
125128

129+
// Error all queued queries
126130
this._queryQueue.forEach(enqueueError)
127131
this._queryQueue.length = 0
132+
133+
// Error all pipeline queries
134+
if (this._pipelining) {
135+
this._pipelineQueue.forEach(enqueueError)
136+
this._pipelineQueue.length = 0
137+
}
128138
}
129139

130140
_connect(callback) {
@@ -354,12 +364,36 @@ class Client extends EventEmitter {
354364
}
355365
this.emit('connect')
356366
}
357-
const activeQuery = this._getActiveQuery()
358-
this._activeQuery = null
359-
this.readyForQuery = true
360-
if (activeQuery) {
361-
activeQuery.handleReadyForQuery(this.connection)
367+
368+
if (this._pipelining) {
369+
// In pipeline mode, readyForQuery indicates end of current query in pipeline
370+
const activeQuery = this._getActiveQuery()
371+
if (activeQuery) {
372+
activeQuery.handleReadyForQuery(this.connection)
373+
// Remove completed query from pipeline queue
374+
const index = this._pipelineQueue.indexOf(activeQuery)
375+
if (index > -1) {
376+
this._pipelineQueue.splice(index, 1)
377+
}
378+
}
379+
380+
// Set next query as active if available
381+
if (this._pipelineQueue.length > 0) {
382+
this._activeQuery = this._pipelineQueue[0]
383+
} else {
384+
this._activeQuery = null
385+
this.readyForQuery = true
386+
this.emit('drain')
387+
}
388+
} else {
389+
const activeQuery = this._getActiveQuery()
390+
this._activeQuery = null
391+
this.readyForQuery = true
392+
if (activeQuery) {
393+
activeQuery.handleReadyForQuery(this.connection)
394+
}
362395
}
396+
363397
this._pulseQueryQueue()
364398
}
365399

@@ -538,6 +572,10 @@ class Client extends EventEmitter {
538572
}
539573

540574
_pulseQueryQueue() {
575+
if (this._pipelining) {
576+
return this._pipelinePulseQueryQueue()
577+
}
578+
541579
if (this.readyForQuery === true) {
542580
this._activeQuery = this._queryQueue.shift()
543581
const activeQuery = this._getActiveQuery()
@@ -591,6 +629,16 @@ class Client extends EventEmitter {
591629
}
592630
}
593631

632+
// In pipeline mode, only extended query protocol is allowed
633+
if (this._pipelining) {
634+
if (typeof config === 'string') {
635+
throw new Error('Simple query protocol is not allowed in pipeline mode. Use parameterized queries instead.')
636+
}
637+
if (query.text && query.text.includes(';') && query.text.trim().split(';').filter(s => s.trim()).length > 1) {
638+
throw new Error('Multiple SQL commands in a single query are not allowed in pipeline mode.')
639+
}
640+
}
641+
594642
if (readTimeout) {
595643
queryCallback = query.callback
596644

@@ -689,6 +737,104 @@ class Client extends EventEmitter {
689737
queryQueueDeprecationNotice()
690738
return this._queryQueue
691739
}
740+
741+
// Pipeline mode support
742+
get pipelining() {
743+
return this._pipelining
744+
}
745+
746+
set pipelining(value) {
747+
if (typeof value !== 'boolean') {
748+
throw new TypeError('pipelining must be a boolean')
749+
}
750+
751+
if (value && !this._connected) {
752+
throw new Error('Cannot enable pipelining before connection is established')
753+
}
754+
755+
if (value && this._getActiveQuery()) {
756+
throw new Error('Cannot enable pipelining while a query is active')
757+
}
758+
759+
if (value && !this._pipelining) {
760+
this._enterPipelineMode()
761+
} else if (!value && this._pipelining) {
762+
this._exitPipelineMode()
763+
}
764+
}
765+
766+
pipelineStatus() {
767+
return this._pipelining ? 'PIPELINE_ON' : 'PIPELINE_OFF'
768+
}
769+
770+
_enterPipelineMode() {
771+
if (this._pipelining) {
772+
return
773+
}
774+
775+
if (!this._connected) {
776+
throw new Error('Cannot enter pipeline mode before connection is established')
777+
}
778+
779+
if (this._getActiveQuery()) {
780+
throw new Error('Cannot enter pipeline mode while a query is active')
781+
}
782+
783+
this._pipelining = true
784+
this._pipelineQueue = []
785+
this._pipelineSync = false
786+
}
787+
788+
_exitPipelineMode() {
789+
if (!this._pipelining) {
790+
return
791+
}
792+
793+
// Send sync to end pipeline if we have pending queries
794+
if (this._pipelineQueue.length > 0 && !this._pipelineSync) {
795+
this.connection.sync()
796+
this._pipelineSync = true
797+
}
798+
799+
this._pipelining = false
800+
}
801+
802+
_pipelinePulseQueryQueue() {
803+
if (!this._pipelining) {
804+
return this._pulseQueryQueue()
805+
}
806+
807+
// In pipeline mode, send all queued queries immediately without waiting for responses
808+
while (this._queryQueue.length > 0) {
809+
const query = this._queryQueue.shift()
810+
this._pipelineQueue.push(query)
811+
812+
// Force extended query protocol for pipeline mode
813+
if (!query.requiresPreparation()) {
814+
query.queryMode = 'extended'
815+
}
816+
817+
const queryError = query.submit(this.connection)
818+
if (queryError) {
819+
process.nextTick(() => {
820+
query.handleError(queryError, this.connection)
821+
})
822+
// Remove failed query from pipeline queue
823+
const index = this._pipelineQueue.indexOf(query)
824+
if (index > -1) {
825+
this._pipelineQueue.splice(index, 1)
826+
}
827+
continue
828+
}
829+
}
830+
831+
// Set active query to first in pipeline if we don't have one
832+
if (!this._getActiveQuery() && this._pipelineQueue.length > 0) {
833+
this._activeQuery = this._pipelineQueue[0]
834+
}
835+
836+
this.readyForQuery = true
837+
}
692838
}
693839

694840
// expose a Query constructor

0 commit comments

Comments
 (0)