@@ -189,17 +189,17 @@ public void Dispose()
189189 /// </summary>
190190 private async Task RunChangeConsumptionLoopAsync ( )
191191 {
192- this . _logger . LogInformationWithThreadId ( $ "Starting change consumption loop. MaxBatchSize: { this . _maxBatchSize } PollingIntervalMs: { this . _pollingIntervalInMs } ") ;
192+ this . _logger . LogInformation ( $ "Starting change consumption loop. MaxBatchSize: { this . _maxBatchSize } PollingIntervalMs: { this . _pollingIntervalInMs } ") ;
193193
194194 try
195195 {
196196 CancellationToken token = this . _cancellationTokenSourceCheckForChanges . Token ;
197197
198198 using ( var connection = new SqlConnection ( this . _connectionString ) )
199199 {
200- this . _logger . LogDebugWithThreadId ( "BEGIN OpenChangeConsumptionConnection" ) ;
200+ this . _logger . LogDebug ( "BEGIN OpenChangeConsumptionConnection" ) ;
201201 await connection . OpenAsync ( token ) ;
202- this . _logger . LogDebugWithThreadId ( "END OpenChangeConsumptionConnection" ) ;
202+ this . _logger . LogDebug ( "END OpenChangeConsumptionConnection" ) ;
203203
204204 bool forceReconnect = false ;
205205 // Check for cancellation request only after a cycle of checking and processing of changes completes.
@@ -216,7 +216,7 @@ private async Task RunChangeConsumptionLoopAsync()
216216 {
217217 forceReconnect = false ;
218218 }
219- this . _logger . LogDebugWithThreadId ( $ "BEGIN ProcessingChanges State={ this . _state } ") ;
219+ this . _logger . LogDebug ( $ "BEGIN ProcessingChanges State={ this . _state } ") ;
220220
221221 try
222222 {
@@ -243,8 +243,8 @@ private async Task RunChangeConsumptionLoopAsync()
243243 this . _logger . LogError ( $ "Fatal SQL Client exception processing changes. Will attempt to reestablish connection in { this . _pollingIntervalInMs } ms. Exception = { e . Message } ") ;
244244 forceReconnect = true ;
245245 }
246- this . _logger . LogDebugWithThreadId ( "END ProcessingChanges" ) ;
247- this . _logger . LogDebugWithThreadId ( $ "Delaying for { this . _pollingIntervalInMs } ms") ;
246+ this . _logger . LogDebug ( "END ProcessingChanges" ) ;
247+ this . _logger . LogDebug ( $ "Delaying for { this . _pollingIntervalInMs } ms") ;
248248 await Task . Delay ( TimeSpan . FromMilliseconds ( this . _pollingIntervalInMs ) , token ) ;
249249 }
250250 }
@@ -276,7 +276,7 @@ private async Task RunChangeConsumptionLoopAsync()
276276 /// </summary>
277277 private async Task GetTableChangesAsync ( SqlConnection connection , CancellationToken token )
278278 {
279- this . _logger . LogDebugWithThreadId ( "BEGIN GetTableChanges" ) ;
279+ this . _logger . LogDebug ( "BEGIN GetTableChanges" ) ;
280280 try
281281 {
282282 var transactionSw = Stopwatch . StartNew ( ) ;
@@ -289,19 +289,19 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
289289 // Update the version number stored in the global state table if necessary before using it.
290290 using ( SqlCommand updateTablesPreInvocationCommand = this . BuildUpdateTablesPreInvocation ( connection , transaction ) )
291291 {
292- this . _logger . LogDebugWithThreadId ( $ "BEGIN UpdateTablesPreInvocation Query={ updateTablesPreInvocationCommand . CommandText } ") ;
292+ this . _logger . LogDebug ( $ "BEGIN UpdateTablesPreInvocation Query={ updateTablesPreInvocationCommand . CommandText } ") ;
293293 var commandSw = Stopwatch . StartNew ( ) ;
294294 await updateTablesPreInvocationCommand . ExecuteNonQueryAsync ( token ) ;
295295 setLastSyncVersionDurationMs = commandSw . ElapsedMilliseconds ;
296296 }
297- this . _logger . LogDebugWithThreadId ( $ "END UpdateTablesPreInvocation Duration={ setLastSyncVersionDurationMs } ms") ;
297+ this . _logger . LogDebug ( $ "END UpdateTablesPreInvocation Duration={ setLastSyncVersionDurationMs } ms") ;
298298
299299 var rows = new List < IReadOnlyDictionary < string , object > > ( ) ;
300300
301301 // Use the version number to query for new changes.
302302 using ( SqlCommand getChangesCommand = this . BuildGetChangesCommand ( connection , transaction ) )
303303 {
304- this . _logger . LogDebugWithThreadId ( $ "BEGIN GetChanges Query={ getChangesCommand . CommandText } ") ;
304+ this . _logger . LogDebug ( $ "BEGIN GetChanges Query={ getChangesCommand . CommandText } ") ;
305305 var commandSw = Stopwatch . StartNew ( ) ;
306306
307307 using ( SqlDataReader reader = await getChangesCommand . ExecuteReaderAsync ( token ) )
@@ -314,19 +314,19 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
314314
315315 getChangesDurationMs = commandSw . ElapsedMilliseconds ;
316316 }
317- this . _logger . LogDebugWithThreadId ( $ "END GetChanges Duration={ getChangesDurationMs } ms ChangedRows={ rows . Count } ") ;
317+ this . _logger . LogDebug ( $ "END GetChanges Duration={ getChangesDurationMs } ms ChangedRows={ rows . Count } ") ;
318318
319319 // If changes were found, acquire leases on them.
320320 if ( rows . Count > 0 )
321321 {
322322 using ( SqlCommand acquireLeasesCommand = this . BuildAcquireLeasesCommand ( connection , transaction , rows ) )
323323 {
324- this . _logger . LogDebugWithThreadId ( $ "BEGIN AcquireLeases Query={ acquireLeasesCommand . CommandText } ") ;
324+ this . _logger . LogDebug ( $ "BEGIN AcquireLeases Query={ acquireLeasesCommand . CommandText } ") ;
325325 var commandSw = Stopwatch . StartNew ( ) ;
326326 await acquireLeasesCommand . ExecuteNonQueryAsync ( token ) ;
327327 acquireLeasesDurationMs = commandSw . ElapsedMilliseconds ;
328328 }
329- this . _logger . LogDebugWithThreadId ( $ "END AcquireLeases Duration={ acquireLeasesDurationMs } ms") ;
329+ this . _logger . LogDebug ( $ "END AcquireLeases Duration={ acquireLeasesDurationMs } ms") ;
330330
331331 // Only send event if we got changes to reduce the overall number of events sent since we generally
332332 // only care about the times that we had to actually retrieve and process rows
@@ -375,12 +375,12 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
375375 throw ;
376376 }
377377 }
378- this . _logger . LogDebugWithThreadId ( "END GetTableChanges" ) ;
378+ this . _logger . LogDebug ( "END GetTableChanges" ) ;
379379 }
380380
381381 private async Task ProcessTableChangesAsync ( )
382382 {
383- this . _logger . LogDebugWithThreadId ( "BEGIN ProcessTableChanges" ) ;
383+ this . _logger . LogDebug ( "BEGIN ProcessTableChanges" ) ;
384384 if ( this . _rowsToProcess . Count > 0 )
385385 {
386386 IReadOnlyList < SqlChange < T > > changes = null ;
@@ -407,7 +407,7 @@ private async Task ProcessTableChangesAsync()
407407 {
408408 var input = new TriggeredFunctionData ( ) { TriggerValue = changes } ;
409409
410- this . _logger . LogDebugWithThreadId ( "Executing triggered function" ) ;
410+ this . _logger . LogDebug ( "Executing triggered function" ) ;
411411 var stopwatch = Stopwatch . StartNew ( ) ;
412412
413413 FunctionResult result = await this . _executor . TryExecuteAsync ( input , this . _cancellationTokenSourceExecutor . Token ) ;
@@ -419,7 +419,7 @@ private async Task ProcessTableChangesAsync()
419419 } ;
420420 if ( result . Succeeded )
421421 {
422- this . _logger . LogDebugWithThreadId ( $ "Successfully triggered function. Duration={ durationMs } ms") ;
422+ this . _logger . LogDebug ( $ "Successfully triggered function. Duration={ durationMs } ms") ;
423423 TelemetryInstance . TrackEvent ( TelemetryEventName . TriggerFunction , this . _telemetryProps , measures ) ;
424424 // We've successfully fully processed these so set them to be released in the cleanup phase
425425 this . _rowsToRelease = this . _rowsToProcess ;
@@ -441,7 +441,7 @@ private async Task ProcessTableChangesAsync()
441441 // any we still ensure everything is reset to a clean state
442442 await this . ClearRowsAsync ( ) ;
443443 }
444- this . _logger . LogDebugWithThreadId ( "END ProcessTableChanges" ) ;
444+ this . _logger . LogDebug ( "END ProcessTableChanges" ) ;
445445 }
446446
447447 /// <summary>
@@ -458,9 +458,9 @@ private async void RunLeaseRenewalLoopAsync()
458458
459459 using ( var connection = new SqlConnection ( this . _connectionString ) )
460460 {
461- this . _logger . LogDebugWithThreadId ( "BEGIN OpenLeaseRenewalLoopConnection" ) ;
461+ this . _logger . LogDebug ( "BEGIN OpenLeaseRenewalLoopConnection" ) ;
462462 await connection . OpenAsync ( token ) ;
463- this . _logger . LogDebugWithThreadId ( "END OpenLeaseRenewalLoopConnection" ) ;
463+ this . _logger . LogDebug ( "END OpenLeaseRenewalLoopConnection" ) ;
464464
465465 bool forceReconnect = false ;
466466 while ( ! token . IsCancellationRequested )
@@ -509,9 +509,9 @@ private async void RunLeaseRenewalLoopAsync()
509509
510510 private async Task RenewLeasesAsync ( SqlConnection connection , CancellationToken token )
511511 {
512- this . _logger . LogDebugWithThreadId ( "BEGIN WaitRowsLock - RenewLeases" ) ;
512+ this . _logger . LogDebug ( "BEGIN WaitRowsLock - RenewLeases" ) ;
513513 await this . _rowsLock . WaitAsync ( token ) ;
514- this . _logger . LogDebugWithThreadId ( "END WaitRowsLock - RenewLeases" ) ;
514+ this . _logger . LogDebug ( "END WaitRowsLock - RenewLeases" ) ;
515515
516516 if ( this . _state == State . ProcessingChanges && this . _rowsToProcess . Count > 0 )
517517 {
@@ -522,13 +522,13 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
522522 {
523523 using ( SqlCommand renewLeasesCommand = this . BuildRenewLeasesCommand ( connection , transaction ) )
524524 {
525- this . _logger . LogDebugWithThreadId ( $ "BEGIN RenewLeases Query={ renewLeasesCommand . CommandText } ") ;
525+ this . _logger . LogDebug ( $ "BEGIN RenewLeases Query={ renewLeasesCommand . CommandText } ") ;
526526 var stopwatch = Stopwatch . StartNew ( ) ;
527527
528528 int rowsAffected = await renewLeasesCommand . ExecuteNonQueryAsync ( token ) ;
529529
530530 long durationMs = stopwatch . ElapsedMilliseconds ;
531- this . _logger . LogDebugWithThreadId ( $ "END RenewLeases Duration={ durationMs } ms RowsAffected={ rowsAffected } ") ;
531+ this . _logger . LogDebug ( $ "END RenewLeases Duration={ durationMs } ms RowsAffected={ rowsAffected } ") ;
532532
533533 if ( rowsAffected > 0 )
534534 {
@@ -588,7 +588,7 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
588588 }
589589
590590 // Want to always release the lock at the end, even if renewing the leases failed.
591- this . _logger . LogDebugWithThreadId ( "ReleaseRowsLock - RenewLeases" ) ;
591+ this . _logger . LogDebug ( "ReleaseRowsLock - RenewLeases" ) ;
592592 this . _rowsLock . Release ( ) ;
593593 }
594594
@@ -597,15 +597,15 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
597597 /// </summary>
598598 private async Task ClearRowsAsync ( )
599599 {
600- this . _logger . LogDebugWithThreadId ( "BEGIN WaitRowsLock - ClearRows" ) ;
600+ this . _logger . LogDebug ( "BEGIN WaitRowsLock - ClearRows" ) ;
601601 await this . _rowsLock . WaitAsync ( ) ;
602- this . _logger . LogDebugWithThreadId ( "END WaitRowsLock - ClearRows" ) ;
602+ this . _logger . LogDebug ( "END WaitRowsLock - ClearRows" ) ;
603603
604604 this . _leaseRenewalCount = 0 ;
605605 this . _state = State . CheckingForChanges ;
606606 this . _rowsToProcess = new List < IReadOnlyDictionary < string , object > > ( ) ;
607607
608- this . _logger . LogDebugWithThreadId ( "ReleaseRowsLock - ClearRows" ) ;
608+ this . _logger . LogDebug ( "ReleaseRowsLock - ClearRows" ) ;
609609 this . _rowsLock . Release ( ) ;
610610 }
611611
@@ -632,22 +632,22 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke
632632 // Release the leases held on "_rowsToRelease".
633633 using ( SqlCommand releaseLeasesCommand = this . BuildReleaseLeasesCommand ( connection , transaction ) )
634634 {
635- this . _logger . LogDebugWithThreadId ( $ "BEGIN ReleaseLeases Query={ releaseLeasesCommand . CommandText } ") ;
635+ this . _logger . LogDebug ( $ "BEGIN ReleaseLeases Query={ releaseLeasesCommand . CommandText } ") ;
636636 var commandSw = Stopwatch . StartNew ( ) ;
637637 int rowsUpdated = await releaseLeasesCommand . ExecuteNonQueryAsync ( token ) ;
638638 releaseLeasesDurationMs = commandSw . ElapsedMilliseconds ;
639- this . _logger . LogDebugWithThreadId ( $ "END ReleaseLeases Duration={ releaseLeasesDurationMs } ms RowsUpdated={ rowsUpdated } ") ;
639+ this . _logger . LogDebug ( $ "END ReleaseLeases Duration={ releaseLeasesDurationMs } ms RowsUpdated={ rowsUpdated } ") ;
640640 }
641641
642642 // Update the global state table if we have processed all changes with ChangeVersion <= newLastSyncVersion,
643643 // and clean up the leases table to remove all rows with ChangeVersion <= newLastSyncVersion.
644644 using ( SqlCommand updateTablesPostInvocationCommand = this . BuildUpdateTablesPostInvocation ( connection , transaction , newLastSyncVersion ) )
645645 {
646- this . _logger . LogDebugWithThreadId ( $ "BEGIN UpdateTablesPostInvocation Query={ updateTablesPostInvocationCommand . CommandText } ") ;
646+ this . _logger . LogDebug ( $ "BEGIN UpdateTablesPostInvocation Query={ updateTablesPostInvocationCommand . CommandText } ") ;
647647 var commandSw = Stopwatch . StartNew ( ) ;
648648 await updateTablesPostInvocationCommand . ExecuteNonQueryAsync ( token ) ;
649649 updateLastSyncVersionDurationMs = commandSw . ElapsedMilliseconds ;
650- this . _logger . LogDebugWithThreadId ( $ "END UpdateTablesPostInvocation Duration={ updateLastSyncVersionDurationMs } ms") ;
650+ this . _logger . LogDebug ( $ "END UpdateTablesPostInvocation Duration={ updateLastSyncVersionDurationMs } ms") ;
651651 }
652652 transaction . Commit ( ) ;
653653
@@ -718,7 +718,7 @@ private long RecomputeLastSyncVersion()
718718 // Also this LastSyncVersion is actually updated in the GlobalState table only after verifying that the changes with
719719 // changeVersion <= newLastSyncVersion have been processed in BuildUpdateTablesPostInvocation query.
720720 long lastSyncVersion = changeVersionSet . ElementAt ( changeVersionSet . Count > 1 ? changeVersionSet . Count - 2 : 0 ) ;
721- this . _logger . LogDebugWithThreadId ( $ "RecomputeLastSyncVersion. LastSyncVersion={ lastSyncVersion } ChangeVersionSet={ string . Join ( "," , changeVersionSet ) } ") ;
721+ this . _logger . LogDebug ( $ "RecomputeLastSyncVersion. LastSyncVersion={ lastSyncVersion } ChangeVersionSet={ string . Join ( "," , changeVersionSet ) } ") ;
722722 return lastSyncVersion ;
723723 }
724724
@@ -730,7 +730,7 @@ private long RecomputeLastSyncVersion()
730730 /// <returns>The list of changes</returns>
731731 private IReadOnlyList < SqlChange < T > > ProcessChanges ( )
732732 {
733- this . _logger . LogDebugWithThreadId ( "BEGIN ProcessChanges" ) ;
733+ this . _logger . LogDebug ( "BEGIN ProcessChanges" ) ;
734734 var changes = new List < SqlChange < T > > ( ) ;
735735 foreach ( IReadOnlyDictionary < string , object > row in this . _rowsToProcess )
736736 {
@@ -744,7 +744,7 @@ private IReadOnlyList<SqlChange<T>> ProcessChanges()
744744
745745 changes . Add ( new SqlChange < T > ( operation , Utils . JsonDeserializeObject < T > ( Utils . JsonSerializeObject ( item ) ) ) ) ;
746746 }
747- this . _logger . LogDebugWithThreadId ( "END ProcessChanges" ) ;
747+ this . _logger . LogDebug ( "END ProcessChanges" ) ;
748748 return changes ;
749749 }
750750
0 commit comments