Skip to content

Commit a3b83a8

Browse files
Use transaction for all queries (#489)
* Use transaction for renewing leases * more * events * Add comment
1 parent 0575ed9 commit a3b83a8

File tree

2 files changed

+92
-53
lines changed

2 files changed

+92
-53
lines changed

src/Telemetry/Telemetry.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,7 @@ public enum TelemetryErrorName
422422
GetPrimaryKeys,
423423
GetScaleStatus,
424424
GetUnprocessedChangeCount,
425+
GetUnprocessedChangeCountRollback,
425426
InvalidConfigurationValue,
426427
MissingPrimaryKeys,
427428
NoPrimaryKeys,
@@ -432,6 +433,7 @@ public enum TelemetryErrorName
432433
ReleaseLeasesRollback,
433434
RenewLeases,
434435
RenewLeasesLoop,
436+
RenewLeasesRollback,
435437
StartListener,
436438
Upsert,
437439
UpsertRollback,

src/TriggerBinding/SqlTableChangeMonitor.cs

Lines changed: 90 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,37 @@ public async Task<long> GetUnprocessedChangeCountAsync()
185185
await connection.OpenAsync();
186186
this._logger.LogDebugWithThreadId("END OpenGetUnprocessedChangesConnection");
187187

188-
using (SqlCommand getUnprocessedChangesCommand = this.BuildGetUnprocessedChangesCommand(connection))
188+
// Use a transaction to automatically release the app lock when we're done executing the query
189+
using (SqlTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead))
189190
{
190-
this._logger.LogDebugWithThreadId($"BEGIN GetUnprocessedChangeCount Query={getUnprocessedChangesCommand.CommandText}");
191-
var commandSw = Stopwatch.StartNew();
192-
unprocessedChangeCount = (long)await getUnprocessedChangesCommand.ExecuteScalarAsync();
193-
getUnprocessedChangesDurationMs = commandSw.ElapsedMilliseconds;
194-
}
191+
try
192+
{
193+
using (SqlCommand getUnprocessedChangesCommand = this.BuildGetUnprocessedChangesCommand(connection, transaction))
194+
{
195+
this._logger.LogInformation("Getting change count");
196+
this._logger.LogDebugWithThreadId($"BEGIN GetUnprocessedChangeCount Query={getUnprocessedChangesCommand.CommandText}");
197+
var commandSw = Stopwatch.StartNew();
198+
unprocessedChangeCount = (long)await getUnprocessedChangesCommand.ExecuteScalarAsync();
199+
getUnprocessedChangesDurationMs = commandSw.ElapsedMilliseconds;
200+
}
195201

196-
this._logger.LogDebugWithThreadId($"END GetUnprocessedChangeCount Duration={getUnprocessedChangesDurationMs}ms Count={unprocessedChangeCount}");
202+
this._logger.LogDebugWithThreadId($"END GetUnprocessedChangeCount Duration={getUnprocessedChangesDurationMs}ms Count={unprocessedChangeCount}");
203+
transaction.Commit();
204+
}
205+
catch (Exception)
206+
{
207+
try
208+
{
209+
transaction.Rollback();
210+
}
211+
catch (Exception ex2)
212+
{
213+
this._logger.LogError($"GetUnprocessedChangeCount : Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}");
214+
TelemetryInstance.TrackException(TelemetryErrorName.GetUnprocessedChangeCountRollback, ex2, this._telemetryProps);
215+
}
216+
throw;
217+
}
218+
}
197219
}
198220

199221
var measures = new Dictionary<TelemetryMeasureName, double>
@@ -483,56 +505,69 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
483505

484506
if (this._state == State.ProcessingChanges)
485507
{
486-
try
508+
// Use a transaction to automatically release the app lock when we're done executing the query
509+
using (SqlTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead))
487510
{
488-
// I don't think I need a transaction for renewing leases. If this worker reads in a row from the
489-
// leases table and determines that it corresponds to its batch of changes, but then that row gets
490-
// deleted by a cleanup task, it shouldn't renew the lease on it anyways.
491-
using (SqlCommand renewLeasesCommand = this.BuildRenewLeasesCommand(connection))
511+
try
492512
{
493-
TelemetryInstance.TrackEvent(TelemetryEventName.RenewLeasesStart, this._telemetryProps);
494-
this._logger.LogDebugWithThreadId($"BEGIN RenewLeases Query={renewLeasesCommand.CommandText}");
495-
var stopwatch = Stopwatch.StartNew();
513+
using (SqlCommand renewLeasesCommand = this.BuildRenewLeasesCommand(connection, transaction))
514+
{
515+
TelemetryInstance.TrackEvent(TelemetryEventName.RenewLeasesStart, this._telemetryProps);
516+
this._logger.LogDebugWithThreadId($"BEGIN RenewLeases Query={renewLeasesCommand.CommandText}");
517+
var stopwatch = Stopwatch.StartNew();
496518

497-
await renewLeasesCommand.ExecuteNonQueryAsync(token);
519+
await renewLeasesCommand.ExecuteNonQueryAsync(token);
498520

499-
long durationMs = stopwatch.ElapsedMilliseconds;
500-
this._logger.LogDebugWithThreadId($"END RenewLeases Duration={durationMs}ms");
501-
var measures = new Dictionary<TelemetryMeasureName, double>
502-
{
503-
[TelemetryMeasureName.DurationMs] = durationMs,
504-
};
521+
long durationMs = stopwatch.ElapsedMilliseconds;
522+
this._logger.LogDebugWithThreadId($"END RenewLeases Duration={durationMs}ms");
523+
var measures = new Dictionary<TelemetryMeasureName, double>
524+
{
525+
[TelemetryMeasureName.DurationMs] = durationMs,
526+
};
527+
528+
TelemetryInstance.TrackEvent(TelemetryEventName.RenewLeasesEnd, this._telemetryProps, measures);
505529

506-
TelemetryInstance.TrackEvent(TelemetryEventName.RenewLeasesEnd, this._telemetryProps, measures);
530+
transaction.Commit();
531+
}
507532
}
508-
}
509-
catch (Exception e)
510-
{
511-
// This catch block is necessary so that the finally block is executed even in the case of an exception
512-
// (see https://docs.microsoft.com/dotnet/csharp/language-reference/keywords/try-finally, third
513-
// paragraph). If we fail to renew the leases, multiple workers could be processing the same change
514-
// data, but we have functionality in place to deal with this (see design doc).
515-
this._logger.LogError($"Failed to renew leases due to exception: {e.GetType()}. Exception message: {e.Message}");
516-
TelemetryInstance.TrackException(TelemetryErrorName.RenewLeases, e, this._telemetryProps);
517-
}
518-
finally
519-
{
520-
// Do we want to update this count even in the case of a failure to renew the leases? Probably,
521-
// because the count is simply meant to indicate how much time the other thread has spent processing
522-
// changes essentially.
523-
this._leaseRenewalCount += 1;
524-
525-
// If this thread has been cancelled, then the _cancellationTokenSourceExecutor could have already
526-
// been disposed so shouldn't cancel it.
527-
if (this._leaseRenewalCount == MaxLeaseRenewalCount && !token.IsCancellationRequested)
533+
catch (Exception e)
534+
{
535+
// This catch block is necessary so that the finally block is executed even in the case of an exception
536+
// (see https://docs.microsoft.com/dotnet/csharp/language-reference/keywords/try-finally, third
537+
// paragraph). If we fail to renew the leases, multiple workers could be processing the same change
538+
// data, but we have functionality in place to deal with this (see design doc).
539+
this._logger.LogError($"Failed to renew leases due to exception: {e.GetType()}. Exception message: {e.Message}");
540+
TelemetryInstance.TrackException(TelemetryErrorName.RenewLeases, e, this._telemetryProps);
541+
542+
try
543+
{
544+
transaction.Rollback();
545+
}
546+
catch (Exception e2)
547+
{
548+
this._logger.LogError($"RenewLeases - Failed to rollback transaction due to exception: {e2.GetType()}. Exception message: {e2.Message}");
549+
TelemetryInstance.TrackException(TelemetryErrorName.RenewLeasesRollback, e2, this._telemetryProps);
550+
}
551+
}
552+
finally
528553
{
529-
this._logger.LogWarning("Call to execute the function (TryExecuteAsync) seems to be stuck, so it is being cancelled");
554+
// Do we want to update this count even in the case of a failure to renew the leases? Probably,
555+
// because the count is simply meant to indicate how much time the other thread has spent processing
556+
// changes essentially.
557+
this._leaseRenewalCount += 1;
558+
559+
// If this thread has been cancelled, then the _cancellationTokenSourceExecutor could have already
560+
// been disposed so shouldn't cancel it.
561+
if (this._leaseRenewalCount == MaxLeaseRenewalCount && !token.IsCancellationRequested)
562+
{
563+
this._logger.LogWarning("Call to execute the function (TryExecuteAsync) seems to be stuck, so it is being cancelled");
530564

531-
// If we keep renewing the leases, the thread responsible for processing the changes is stuck.
532-
// If it's stuck, it has to be stuck in the function execution call (I think), so we should
533-
// cancel the call.
534-
this._cancellationTokenSourceExecutor.Cancel();
535-
this._cancellationTokenSourceExecutor = new CancellationTokenSource();
565+
// If we keep renewing the leases, the thread responsible for processing the changes is stuck.
566+
// If it's stuck, it has to be stuck in the function execution call (I think), so we should
567+
// cancel the call.
568+
this._cancellationTokenSourceExecutor.Cancel();
569+
this._cancellationTokenSourceExecutor = new CancellationTokenSource();
570+
}
536571
}
537572
}
538573
}
@@ -790,8 +825,9 @@ LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCo
790825
/// used by workers to get the changes for processing.
791826
/// </summary>
792827
/// <param name="connection">The connection to add to the returned SqlCommand</param>
828+
/// <param name="transaction">The transaction to add to the returned SqlCommand</param>
793829
/// <returns>The SqlCommand populated with the query and appropriate parameters</returns>
794-
private SqlCommand BuildGetUnprocessedChangesCommand(SqlConnection connection)
830+
private SqlCommand BuildGetUnprocessedChangesCommand(SqlConnection connection, SqlTransaction transaction)
795831
{
796832
string leasesTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.name.AsBracketQuotedString()} = l.{col.name.AsBracketQuotedString()}"));
797833

@@ -813,7 +849,7 @@ LEFT OUTER JOIN {this._leasesTableName} AS l ON {leasesTableJoinCondition}
813849
(l.{LeasesTableAttemptCountColumnName} IS NULL OR l.{LeasesTableAttemptCountColumnName} < {MaxChangeProcessAttemptCount});
814850
";
815851

816-
return new SqlCommand(getUnprocessedChangesQuery, connection);
852+
return new SqlCommand(getUnprocessedChangesQuery, connection, transaction);
817853
}
818854

819855
/// <summary>
@@ -868,8 +904,9 @@ WHEN NOT MATCHED THEN
868904
/// Builds the query to renew leases on the rows in "_rows" (<see cref="RenewLeasesAsync(CancellationToken)"/>).
869905
/// </summary>
870906
/// <param name="connection">The connection to add to the returned SqlCommand</param>
907+
/// <param name="transaction">The transaction to add to the returned SqlCommand</param>
871908
/// <returns>The SqlCommand populated with the query and appropriate parameters</returns>
872-
private SqlCommand BuildRenewLeasesCommand(SqlConnection connection)
909+
private SqlCommand BuildRenewLeasesCommand(SqlConnection connection, SqlTransaction transaction)
873910
{
874911
string matchCondition = string.Join(" OR ", this._rowMatchConditions.Take(this._rows.Count));
875912

@@ -881,7 +918,7 @@ private SqlCommand BuildRenewLeasesCommand(SqlConnection connection)
881918
WHERE {matchCondition};
882919
";
883920

884-
return this.GetSqlCommandWithParameters(renewLeasesQuery, connection, null, this._rows);
921+
return this.GetSqlCommandWithParameters(renewLeasesQuery, connection, transaction, this._rows);
885922
}
886923

887924
/// <summary>

0 commit comments

Comments
 (0)