@@ -189,17 +189,15 @@ public void Dispose()
189189 /// </summary>
190190 private async Task RunChangeConsumptionLoopAsync ( )
191191 {
192- this . _logger . LogInformation ( $ "Starting change consumption loop. MaxBatchSize: { this . _maxBatchSize } PollingIntervalMs: { this . _pollingIntervalInMs } ") ;
192+ this . _logger . LogDebug ( $ "Starting change consumption loop. MaxBatchSize: { this . _maxBatchSize } PollingIntervalMs: { this . _pollingIntervalInMs } ") ;
193193
194194 try
195195 {
196196 CancellationToken token = this . _cancellationTokenSourceCheckForChanges . Token ;
197197
198198 using ( var connection = new SqlConnection ( this . _connectionString ) )
199199 {
200- this . _logger . LogDebug ( "BEGIN OpenChangeConsumptionConnection" ) ;
201200 await connection . OpenAsync ( token ) ;
202- this . _logger . LogDebug ( "END OpenChangeConsumptionConnection" ) ;
203201
204202 bool forceReconnect = false ;
205203 // Check for cancellation request only after a cycle of checking and processing of changes completes.
@@ -216,7 +214,6 @@ private async Task RunChangeConsumptionLoopAsync()
216214 {
217215 forceReconnect = false ;
218216 }
219- this . _logger . LogDebug ( $ "BEGIN ProcessingChanges State={ this . _state } ") ;
220217
221218 try
222219 {
@@ -243,8 +240,6 @@ private async Task RunChangeConsumptionLoopAsync()
243240 this . _logger . LogError ( $ "Fatal SQL Client exception processing changes. Will attempt to reestablish connection in { this . _pollingIntervalInMs } ms. Exception = { e . Message } ") ;
244241 forceReconnect = true ;
245242 }
246- this . _logger . LogDebug ( "END ProcessingChanges" ) ;
247- this . _logger . LogDebug ( $ "Delaying for { this . _pollingIntervalInMs } ms") ;
248243 await Task . Delay ( TimeSpan . FromMilliseconds ( this . _pollingIntervalInMs ) , token ) ;
249244 }
250245 }
@@ -276,7 +271,6 @@ private async Task RunChangeConsumptionLoopAsync()
276271 /// </summary>
277272 private async Task GetTableChangesAsync ( SqlConnection connection , CancellationToken token )
278273 {
279- this . _logger . LogDebug ( "BEGIN GetTableChanges" ) ;
280274 try
281275 {
282276 var transactionSw = Stopwatch . StartNew ( ) ;
@@ -289,19 +283,16 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
289283 // Update the version number stored in the global state table if necessary before using it.
290284 using ( SqlCommand updateTablesPreInvocationCommand = this . BuildUpdateTablesPreInvocation ( connection , transaction ) )
291285 {
292- this . _logger . LogDebug ( $ "BEGIN UpdateTablesPreInvocation Query={ updateTablesPreInvocationCommand . CommandText } ") ;
293286 var commandSw = Stopwatch . StartNew ( ) ;
294- await updateTablesPreInvocationCommand . ExecuteNonQueryAsync ( token ) ;
287+ await updateTablesPreInvocationCommand . ExecuteNonQueryAsyncWithLogging ( this . _logger , token ) ;
295288 setLastSyncVersionDurationMs = commandSw . ElapsedMilliseconds ;
296289 }
297- this . _logger . LogDebug ( $ "END UpdateTablesPreInvocation Duration={ setLastSyncVersionDurationMs } ms") ;
298290
299291 var rows = new List < IReadOnlyDictionary < string , object > > ( ) ;
300292
301293 // Use the version number to query for new changes.
302294 using ( SqlCommand getChangesCommand = this . BuildGetChangesCommand ( connection , transaction ) )
303295 {
304- this . _logger . LogDebug ( $ "BEGIN GetChanges Query={ getChangesCommand . CommandText } ") ;
305296 var commandSw = Stopwatch . StartNew ( ) ;
306297
307298 using ( SqlDataReader reader = await getChangesCommand . ExecuteReaderAsync ( token ) )
@@ -314,19 +305,16 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
314305
315306 getChangesDurationMs = commandSw . ElapsedMilliseconds ;
316307 }
317- this . _logger . LogDebug ( $ "END GetChanges Duration={ getChangesDurationMs } ms ChangedRows={ rows . Count } ") ;
318308
319309 // If changes were found, acquire leases on them.
320310 if ( rows . Count > 0 )
321311 {
322312 using ( SqlCommand acquireLeasesCommand = this . BuildAcquireLeasesCommand ( connection , transaction , rows ) )
323313 {
324- this . _logger . LogDebug ( $ "BEGIN AcquireLeases Query={ acquireLeasesCommand . CommandText } ") ;
325314 var commandSw = Stopwatch . StartNew ( ) ;
326- await acquireLeasesCommand . ExecuteNonQueryAsync ( token ) ;
315+ await acquireLeasesCommand . ExecuteNonQueryAsyncWithLogging ( this . _logger , token ) ;
327316 acquireLeasesDurationMs = commandSw . ElapsedMilliseconds ;
328317 }
329- this . _logger . LogDebug ( $ "END AcquireLeases Duration={ acquireLeasesDurationMs } ms") ;
330318
331319 // Only send event if we got changes to reduce the overall number of events sent since we generally
332320 // only care about the times that we had to actually retrieve and process rows
@@ -375,12 +363,10 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
375363 throw ;
376364 }
377365 }
378- this . _logger . LogDebug ( "END GetTableChanges" ) ;
379366 }
380367
381368 private async Task ProcessTableChangesAsync ( )
382369 {
383- this . _logger . LogDebug ( "BEGIN ProcessTableChanges" ) ;
384370 if ( this . _rowsToProcess . Count > 0 )
385371 {
386372 IReadOnlyList < SqlChange < T > > changes = null ;
@@ -407,7 +393,6 @@ private async Task ProcessTableChangesAsync()
407393 {
408394 var input = new TriggeredFunctionData ( ) { TriggerValue = changes } ;
409395
410- this . _logger . LogDebug ( "Executing triggered function" ) ;
411396 var stopwatch = Stopwatch . StartNew ( ) ;
412397
413398 FunctionResult result = await this . _executor . TryExecuteAsync ( input , this . _cancellationTokenSourceExecutor . Token ) ;
@@ -419,7 +404,6 @@ private async Task ProcessTableChangesAsync()
419404 } ;
420405 if ( result . Succeeded )
421406 {
422- this . _logger . LogDebug ( $ "Successfully triggered function. Duration={ durationMs } ms") ;
423407 TelemetryInstance . TrackEvent ( TelemetryEventName . TriggerFunction , this . _telemetryProps , measures ) ;
424408 // We've successfully fully processed these so set them to be released in the cleanup phase
425409 this . _rowsToRelease = this . _rowsToProcess ;
@@ -429,7 +413,6 @@ private async Task ProcessTableChangesAsync()
429413 {
430414 // In the future might make sense to retry executing the function, but for now we just let
431415 // another worker try.
432- this . _logger . LogError ( $ "Failed to trigger user function for table: '{ this . _userTable . FullName } due to exception: { result . Exception . GetType ( ) } . Exception message: { result . Exception . Message } ") ;
433416 TelemetryInstance . TrackException ( TelemetryErrorName . TriggerFunction , result . Exception , this . _telemetryProps , measures ) ;
434417 }
435418 this . _state = State . Cleanup ;
@@ -441,7 +424,6 @@ private async Task ProcessTableChangesAsync()
441424 // any we still ensure everything is reset to a clean state
442425 await this . ClearRowsAsync ( ) ;
443426 }
444- this . _logger . LogDebug ( "END ProcessTableChanges" ) ;
445427 }
446428
447429 /// <summary>
@@ -450,17 +432,15 @@ private async Task ProcessTableChangesAsync()
450432 /// </summary>
451433 private async void RunLeaseRenewalLoopAsync ( )
452434 {
453- this . _logger . LogInformation ( "Starting lease renewal loop." ) ;
435+ this . _logger . LogDebug ( "Starting lease renewal loop." ) ;
454436
455437 try
456438 {
457439 CancellationToken token = this . _cancellationTokenSourceRenewLeases . Token ;
458440
459441 using ( var connection = new SqlConnection ( this . _connectionString ) )
460442 {
461- this . _logger . LogDebug ( "BEGIN OpenLeaseRenewalLoopConnection" ) ;
462443 await connection . OpenAsync ( token ) ;
463- this . _logger . LogDebug ( "END OpenLeaseRenewalLoopConnection" ) ;
464444
465445 bool forceReconnect = false ;
466446 while ( ! token . IsCancellationRequested )
@@ -509,9 +489,7 @@ private async void RunLeaseRenewalLoopAsync()
509489
510490 private async Task RenewLeasesAsync ( SqlConnection connection , CancellationToken token )
511491 {
512- this . _logger . LogDebug ( "BEGIN WaitRowsLock - RenewLeases" ) ;
513492 await this . _rowsLock . WaitAsync ( token ) ;
514- this . _logger . LogDebug ( "END WaitRowsLock - RenewLeases" ) ;
515493
516494 if ( this . _state == State . ProcessingChanges && this . _rowsToProcess . Count > 0 )
517495 {
@@ -522,13 +500,11 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
522500 {
523501 using ( SqlCommand renewLeasesCommand = this . BuildRenewLeasesCommand ( connection , transaction ) )
524502 {
525- this . _logger . LogDebug ( $ "BEGIN RenewLeases Query={ renewLeasesCommand . CommandText } ") ;
526503 var stopwatch = Stopwatch . StartNew ( ) ;
527504
528- int rowsAffected = await renewLeasesCommand . ExecuteNonQueryAsync ( token ) ;
505+ int rowsAffected = await renewLeasesCommand . ExecuteNonQueryAsyncWithLogging ( this . _logger , token ) ;
529506
530507 long durationMs = stopwatch . ElapsedMilliseconds ;
531- this . _logger . LogDebug ( $ "END RenewLeases Duration={ durationMs } ms RowsAffected={ rowsAffected } ") ;
532508
533509 if ( rowsAffected > 0 )
534510 {
@@ -588,7 +564,6 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
588564 }
589565
590566 // Want to always release the lock at the end, even if renewing the leases failed.
591- this . _logger . LogDebug ( "ReleaseRowsLock - RenewLeases" ) ;
592567 this . _rowsLock . Release ( ) ;
593568 }
594569
@@ -597,15 +572,12 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
597572 /// </summary>
598573 private async Task ClearRowsAsync ( )
599574 {
600- this . _logger . LogDebug ( "BEGIN WaitRowsLock - ClearRows" ) ;
601575 await this . _rowsLock . WaitAsync ( ) ;
602- this . _logger . LogDebug ( "END WaitRowsLock - ClearRows" ) ;
603576
604577 this . _leaseRenewalCount = 0 ;
605578 this . _state = State . CheckingForChanges ;
606579 this . _rowsToProcess = new List < IReadOnlyDictionary < string , object > > ( ) ;
607580
608- this . _logger . LogDebug ( "ReleaseRowsLock - ClearRows" ) ;
609581 this . _rowsLock . Release ( ) ;
610582 }
611583
@@ -632,22 +604,18 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke
632604 // Release the leases held on "_rowsToRelease".
633605 using ( SqlCommand releaseLeasesCommand = this . BuildReleaseLeasesCommand ( connection , transaction ) )
634606 {
635- this . _logger . LogDebug ( $ "BEGIN ReleaseLeases Query={ releaseLeasesCommand . CommandText } ") ;
636607 var commandSw = Stopwatch . StartNew ( ) ;
637- int rowsUpdated = await releaseLeasesCommand . ExecuteNonQueryAsync ( token ) ;
608+ int rowsUpdated = await releaseLeasesCommand . ExecuteNonQueryAsyncWithLogging ( this . _logger , token ) ;
638609 releaseLeasesDurationMs = commandSw . ElapsedMilliseconds ;
639- this . _logger . LogDebug ( $ "END ReleaseLeases Duration={ releaseLeasesDurationMs } ms RowsUpdated={ rowsUpdated } ") ;
640610 }
641611
642612 // Update the global state table if we have processed all changes with ChangeVersion <= newLastSyncVersion,
643613 // and clean up the leases table to remove all rows with ChangeVersion <= newLastSyncVersion.
644614 using ( SqlCommand updateTablesPostInvocationCommand = this . BuildUpdateTablesPostInvocation ( connection , transaction , newLastSyncVersion ) )
645615 {
646- this . _logger . LogDebug ( $ "BEGIN UpdateTablesPostInvocation Query={ updateTablesPostInvocationCommand . CommandText } ") ;
647616 var commandSw = Stopwatch . StartNew ( ) ;
648- await updateTablesPostInvocationCommand . ExecuteNonQueryAsync ( token ) ;
617+ await updateTablesPostInvocationCommand . ExecuteNonQueryAsyncWithLogging ( this . _logger , token ) ;
649618 updateLastSyncVersionDurationMs = commandSw . ElapsedMilliseconds ;
650- this . _logger . LogDebug ( $ "END UpdateTablesPostInvocation Duration={ updateLastSyncVersionDurationMs } ms") ;
651619 }
652620 transaction . Commit ( ) ;
653621
@@ -717,9 +685,7 @@ private long RecomputeLastSyncVersion()
717685 // the only version number in the set.
718686 // Also this LastSyncVersion is actually updated in the GlobalState table only after verifying that the changes with
719687 // changeVersion <= newLastSyncVersion have been processed in BuildUpdateTablesPostInvocation query.
720- long lastSyncVersion = changeVersionSet . ElementAt ( changeVersionSet . Count > 1 ? changeVersionSet . Count - 2 : 0 ) ;
721- this . _logger . LogDebug ( $ "RecomputeLastSyncVersion. LastSyncVersion={ lastSyncVersion } ChangeVersionSet={ string . Join ( "," , changeVersionSet ) } ") ;
722- return lastSyncVersion ;
688+ return changeVersionSet . ElementAt ( changeVersionSet . Count > 1 ? changeVersionSet . Count - 2 : 0 ) ;
723689 }
724690
725691 /// <summary>
@@ -730,7 +696,6 @@ private long RecomputeLastSyncVersion()
730696 /// <returns>The list of changes</returns>
731697 private IReadOnlyList < SqlChange < T > > ProcessChanges ( )
732698 {
733- this . _logger . LogDebug ( "BEGIN ProcessChanges" ) ;
734699 var changes = new List < SqlChange < T > > ( ) ;
735700 foreach ( IReadOnlyDictionary < string , object > row in this . _rowsToProcess )
736701 {
@@ -744,7 +709,6 @@ private IReadOnlyList<SqlChange<T>> ProcessChanges()
744709
745710 changes . Add ( new SqlChange < T > ( operation , Utils . JsonDeserializeObject < T > ( Utils . JsonSerializeObject ( item ) ) ) ) ;
746711 }
747- this . _logger . LogDebug ( "END ProcessChanges" ) ;
748712 return changes ;
749713 }
750714
0 commit comments