@@ -43,6 +43,9 @@ internal sealed class SqlTableChangeMonitor<T> : IDisposable
4343 private const int LeaseIntervalInSeconds = 60 ;
4444 private const int LeaseRenewalIntervalInSeconds = 15 ;
4545 private const int MaxRetryReleaseLeases = 3 ;
46+
47+ public const int DefaultBatchSize = 100 ;
48+ public const int DefaultPollingIntervalMs = 1000 ;
4649 #endregion Constants
4750
4851 private readonly string _connectionString ;
@@ -58,11 +61,11 @@ internal sealed class SqlTableChangeMonitor<T> : IDisposable
5861 /// <summary>
5962 /// Number of changes to process in each iteration of the loop
6063 /// </summary>
61- private readonly int _batchSize = 10 ;
64+ private readonly int _batchSize = DefaultBatchSize ;
6265 /// <summary>
6366 /// Delay in ms between processing each batch of changes
6467 /// </summary>
65- private readonly int _pollingIntervalInMs = 5000 ;
68+ private readonly int _pollingIntervalInMs = DefaultPollingIntervalMs ;
6669
6770 private readonly CancellationTokenSource _cancellationTokenSourceCheckForChanges = new CancellationTokenSource ( ) ;
6871 private readonly CancellationTokenSource _cancellationTokenSourceRenewLeases = new CancellationTokenSource ( ) ;
@@ -117,16 +120,31 @@ public SqlTableChangeMonitor(
117120 this . _logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
118121
119122 this . _userTableId = userTableId ;
123+ this . _telemetryProps = telemetryProps ?? new Dictionary < TelemetryPropertyName , string > ( ) ;
124+
120125 // Check if there's config settings to override the default batch size/polling interval values
121- this . _batchSize = configuration . GetValue < int ? > ( SqlTriggerConstants . ConfigKey_SqlTrigger_BatchSize ) ?? this . _batchSize ;
122- this . _pollingIntervalInMs = configuration . GetValue < int ? > ( SqlTriggerConstants . ConfigKey_SqlTrigger_PollingInterval ) ?? this . _pollingIntervalInMs ;
126+ int ? configuredBatchSize = configuration . GetValue < int ? > ( SqlTriggerConstants . ConfigKey_SqlTrigger_BatchSize ) ;
127+ int ? configuredPollingInterval = configuration . GetValue < int ? > ( SqlTriggerConstants . ConfigKey_SqlTrigger_BatchSize ) ;
128+ this . _batchSize = configuredBatchSize ?? this . _batchSize ;
129+ this . _pollingIntervalInMs = configuredPollingInterval ?? this . _pollingIntervalInMs ;
130+ var monitorStartProps = new Dictionary < TelemetryPropertyName , string > ( telemetryProps )
131+ {
132+ { TelemetryPropertyName . HasConfiguredBatchSize , ( configuredBatchSize != null ) . ToString ( ) } ,
133+ { TelemetryPropertyName . HasConfiguredPollingInterval , ( configuredPollingInterval != null ) . ToString ( ) } ,
134+ } ;
135+ TelemetryInstance . TrackEvent (
136+ TelemetryEventName . TriggerMonitorStart ,
137+ monitorStartProps ,
138+ new Dictionary < TelemetryMeasureName , double > ( ) {
139+ { TelemetryMeasureName . BatchSize , this . _batchSize } ,
140+ { TelemetryMeasureName . PollingIntervalMs , this . _pollingIntervalInMs }
141+ } ) ;
142+
123143 // Prep search-conditions that will be used besides WHERE clause to match table rows.
124144 this . _rowMatchConditions = Enumerable . Range ( 0 , this . _batchSize )
125145 . Select ( rowIndex => string . Join ( " AND " , this . _primaryKeyColumns . Select ( ( col , colIndex ) => $ "{ col . AsBracketQuotedString ( ) } = @{ rowIndex } _{ colIndex } ") ) )
126146 . ToList ( ) ;
127147
128- this . _telemetryProps = telemetryProps ?? new Dictionary < TelemetryPropertyName , string > ( ) ;
129-
130148#pragma warning disable CS4014 // Queue the below tasks and exit. Do not wait for their completion.
131149 _ = Task . Run ( ( ) =>
132150 {
@@ -591,8 +609,8 @@ private long RecomputeLastSyncVersion()
591609 // have leases acquired on them by another worker.
592610 // Therefore, if there are more than one version numbers in the set, return the second highest one. Otherwise, return
593611 // the only version number in the set.
594- // Also this LastSyncVersion is actually updated in the GlobalState table only after verifying that the changes with
595- // changeVersion <= newLastSyncVersion have been processed in BuildUpdateTablesPostInvocation query.
612+ // Also this LastSyncVersion is actually updated in the GlobalState table only after verifying that the changes with
613+ // changeVersion <= newLastSyncVersion have been processed in BuildUpdateTablesPostInvocation query.
596614 long lastSyncVersion = changeVersionSet . ElementAt ( changeVersionSet . Count > 1 ? changeVersionSet . Count - 2 : 0 ) ;
597615 this . _logger . LogDebugWithThreadId ( $ "RecomputeLastSyncVersion. LastSyncVersion={ lastSyncVersion } ChangeVersionSet={ string . Join ( "," , changeVersionSet ) } ") ;
598616 return lastSyncVersion ;
0 commit comments