@@ -413,7 +413,31 @@ export class LogicalReplicationClient {
413413 return false ;
414414 }
415415
416- if ( await this . #doesPublicationExist( ) ) {
416+ const publicationExists = await this . #doesPublicationExist( ) ;
417+
418+ if ( publicationExists ) {
419+ // Validate the existing publication is correctly configured
420+ const validationError = await this . #validatePublicationConfiguration( ) ;
421+
422+ if ( validationError ) {
423+ this . logger . error ( "Publication exists but is misconfigured" , {
424+ name : this . options . name ,
425+ table : this . options . table ,
426+ slotName : this . options . slotName ,
427+ publicationName : this . options . publicationName ,
428+ error : validationError ,
429+ } ) ;
430+
431+ this . events . emit ( "error" , new LogicalReplicationClientError ( validationError ) ) ;
432+ return false ;
433+ }
434+
435+ this . logger . info ( "Publication exists and is correctly configured" , {
436+ name : this . options . name ,
437+ table : this . options . table ,
438+ publicationName : this . options . publicationName ,
439+ } ) ;
440+
417441 return true ;
418442 }
419443
@@ -459,6 +483,90 @@ export class LogicalReplicationClient {
459483 return res . rows [ 0 ] . exists ;
460484 }
461485
486+ async #validatePublicationConfiguration( ) : Promise < string | null > {
487+ if ( ! this . client ) {
488+ return "Cannot validate publication configuration: client not connected" ;
489+ }
490+
491+ // Check if the publication has the correct table
492+ const tablesRes = await this . client . query (
493+ `SELECT schemaname, tablename
494+ FROM pg_publication_tables
495+ WHERE pubname = '${ this . options . publicationName } ';`
496+ ) ;
497+
498+ const tables = tablesRes . rows ;
499+ const expectedTable = this . options . table ;
500+
501+ // Check if the table is in the publication
502+ const hasTable = tables . some (
503+ ( row ) => row . tablename === expectedTable && row . schemaname === "public"
504+ ) ;
505+
506+ if ( ! hasTable ) {
507+ if ( tables . length === 0 ) {
508+ return `Publication '${ this . options . publicationName } ' exists but has NO TABLES configured. Expected table: "public.${ expectedTable } ". Run: ALTER PUBLICATION ${ this . options . publicationName } ADD TABLE "${ expectedTable } ";` ;
509+ } else {
510+ const tableList = tables . map ( ( t ) => `"${ t . schemaname } "."${ t . tablename } "` ) . join ( ", " ) ;
511+ return `Publication '${ this . options . publicationName } ' exists but does not include the required table "public.${ expectedTable } ". Current tables: ${ tableList } . Run: ALTER PUBLICATION ${ this . options . publicationName } ADD TABLE "${ expectedTable } ";` ;
512+ }
513+ }
514+
515+ // Check if the publication has the correct actions configured
516+ if ( this . options . publicationActions && this . options . publicationActions . length > 0 ) {
517+ const actionsRes = await this . client . query (
518+ `SELECT pubinsert, pubupdate, pubdelete, pubtruncate
519+ FROM pg_publication
520+ WHERE pubname = '${ this . options . publicationName } ';`
521+ ) ;
522+
523+ if ( actionsRes . rows . length === 0 ) {
524+ return `Publication '${ this . options . publicationName } ' not found when checking actions` ;
525+ }
526+
527+ const actualActions = actionsRes . rows [ 0 ] ;
528+ const missingActions : string [ ] = [ ] ;
529+
530+ for ( const action of this . options . publicationActions ) {
531+ switch ( action ) {
532+ case "insert" :
533+ if ( ! actualActions . pubinsert ) missingActions . push ( "insert" ) ;
534+ break ;
535+ case "update" :
536+ if ( ! actualActions . pubupdate ) missingActions . push ( "update" ) ;
537+ break ;
538+ case "delete" :
539+ if ( ! actualActions . pubdelete ) missingActions . push ( "delete" ) ;
540+ break ;
541+ case "truncate" :
542+ if ( ! actualActions . pubtruncate ) missingActions . push ( "truncate" ) ;
543+ break ;
544+ }
545+ }
546+
547+ if ( missingActions . length > 0 ) {
548+ const currentActions : string [ ] = [ ] ;
549+ if ( actualActions . pubinsert ) currentActions . push ( "insert" ) ;
550+ if ( actualActions . pubupdate ) currentActions . push ( "update" ) ;
551+ if ( actualActions . pubdelete ) currentActions . push ( "delete" ) ;
552+ if ( actualActions . pubtruncate ) currentActions . push ( "truncate" ) ;
553+
554+ return `Publication '${
555+ this . options . publicationName
556+ } ' is missing required actions. Expected: [${ this . options . publicationActions . join (
557+ ", "
558+ ) } ], Current: [${ currentActions . join ( ", " ) } ], Missing: [${ missingActions . join (
559+ ", "
560+ ) } ]. Run: ALTER PUBLICATION ${
561+ this . options . publicationName
562+ } SET (publish = '${ this . options . publicationActions . join ( ", " ) } ');`;
563+ }
564+ }
565+
566+ // All validations passed
567+ return null ;
568+ }
569+
462570 async #createSlot( ) : Promise < boolean > {
463571 if ( ! this . client ) {
464572 this . events . emit ( "error" , new LogicalReplicationClientError ( "Cannot create slot" ) ) ;
0 commit comments