Skip to content

Commit e9ba942

Browse files
Merge branch 'triggerbindings' into release/LogicApps
# Conflicts: # performance/SqlBindingBenchmarks.cs # src/TriggerBinding/SqlTableChangeMonitor.cs # src/TriggerBinding/SqlTriggerConstants.cs # test/Integration/IntegrationTestBase.cs # test/Integration/SqlTriggerBindingIntegrationTests.cs
2 parents 78328c6 + de73eed commit e9ba942

File tree

9 files changed

+444
-158
lines changed

9 files changed

+444
-158
lines changed

performance/SqlBindingBenchmarks.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public static void Main()
1111
{
1212
BenchmarkRunner.Run<SqlInputBindingPerformance>();
1313
BenchmarkRunner.Run<SqlOutputBindingPerformance>();
14+
BenchmarkRunner.Run<SqlTriggerBindingPerformance>();
1415
}
1516
}
1617
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System.Threading.Tasks;
5+
using Microsoft.Azure.WebJobs.Extensions.Sql.Samples.TriggerBindingSamples;
6+
using Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Common;
7+
using Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration;
8+
using BenchmarkDotNet.Attributes;
9+
10+
namespace Microsoft.Azure.WebJobs.Extensions.Sql.Performance
11+
{
12+
public class SqlTriggerBindingPerformance : SqlTriggerBindingIntegrationTests
13+
{
14+
[GlobalSetup]
15+
public void GlobalSetup()
16+
{
17+
this.EnableChangeTrackingForTable("Products");
18+
this.StartFunctionHost(nameof(ProductsTrigger), SupportedLanguages.CSharp);
19+
}
20+
21+
[Benchmark]
22+
[Arguments(1)]
23+
[Arguments(10)]
24+
[Arguments(100)]
25+
[Arguments(1000)]
26+
public async Task ProductsTriggerTest(int count)
27+
{
28+
await this.WaitForProductChanges(
29+
1,
30+
count,
31+
SqlChangeOperation.Insert,
32+
() => { this.InsertProducts(1, count); return Task.CompletedTask; },
33+
id => $"Product {id}",
34+
id => id * 100,
35+
GetBatchProcessingTimeout(1, count));
36+
}
37+
38+
[IterationCleanup]
39+
public void IterationCleanup()
40+
{
41+
// Delete all rows in Products table after each iteration
42+
this.ExecuteNonQuery("TRUNCATE TABLE Products");
43+
}
44+
45+
[GlobalCleanup]
46+
public void GlobalCleanup()
47+
{
48+
this.Dispose();
49+
}
50+
}
51+
}

src/Telemetry/Telemetry.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ public enum TelemetryEventName
346346
TableInfoCacheMiss,
347347
TriggerFunctionEnd,
348348
TriggerFunctionStart,
349+
TriggerMonitorStart,
349350
UpsertEnd,
350351
UpsertStart,
351352
}
@@ -359,6 +360,8 @@ public enum TelemetryPropertyName
359360
ErrorName,
360361
ExceptionType,
361362
HasIdentityColumn,
363+
HasConfiguredBatchSize,
364+
HasConfiguredPollingInterval,
362365
LeasesTableName,
363366
QueryType,
364367
ServerVersion,
@@ -373,6 +376,7 @@ public enum TelemetryMeasureName
373376
{
374377
AcquireLeasesDurationMs,
375378
BatchCount,
379+
BatchSize,
376380
CommandDurationMs,
377381
CreatedSchemaDurationMs,
378382
CreateGlobalStateTableDurationMs,
@@ -383,6 +387,7 @@ public enum TelemetryMeasureName
383387
GetColumnDefinitionsDurationMs,
384388
GetPrimaryKeysDurationMs,
385389
InsertGlobalStateTableRowDurationMs,
390+
PollingIntervalMs,
386391
ReleaseLeasesDurationMs,
387392
RetryAttemptNumber,
388393
SetLastSyncVersionDurationMs,

src/TriggerBinding/SqlTableChangeMonitor.cs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ internal sealed class SqlTableChangeMonitor<T> : IDisposable
4343
private const int LeaseIntervalInSeconds = 60;
4444
private const int LeaseRenewalIntervalInSeconds = 15;
4545
private const int MaxRetryReleaseLeases = 3;
46+
47+
public const int DefaultBatchSize = 100;
48+
public const int DefaultPollingIntervalMs = 1000;
4649
#endregion Constants
4750

4851
private readonly string _connectionString;
@@ -58,11 +61,11 @@ internal sealed class SqlTableChangeMonitor<T> : IDisposable
5861
/// <summary>
5962
/// Number of changes to process in each iteration of the loop
6063
/// </summary>
61-
private readonly int _batchSize = 10;
64+
private readonly int _batchSize = DefaultBatchSize;
6265
/// <summary>
6366
/// Delay in ms between processing each batch of changes
6467
/// </summary>
65-
private readonly int _pollingIntervalInMs = 5000;
68+
private readonly int _pollingIntervalInMs = DefaultPollingIntervalMs;
6669

6770
private readonly CancellationTokenSource _cancellationTokenSourceCheckForChanges = new CancellationTokenSource();
6871
private readonly CancellationTokenSource _cancellationTokenSourceRenewLeases = new CancellationTokenSource();
@@ -117,16 +120,31 @@ public SqlTableChangeMonitor(
117120
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
118121

119122
this._userTableId = userTableId;
123+
this._telemetryProps = telemetryProps ?? new Dictionary<TelemetryPropertyName, string>();
124+
120125
// Check if there's config settings to override the default batch size/polling interval values
121-
this._batchSize = configuration.GetValue<int?>(SqlTriggerConstants.ConfigKey_SqlTrigger_BatchSize) ?? this._batchSize;
122-
this._pollingIntervalInMs = configuration.GetValue<int?>(SqlTriggerConstants.ConfigKey_SqlTrigger_PollingInterval) ?? this._pollingIntervalInMs;
126+
int? configuredBatchSize = configuration.GetValue<int?>(SqlTriggerConstants.ConfigKey_SqlTrigger_BatchSize);
127+
int? configuredPollingInterval = configuration.GetValue<int?>(SqlTriggerConstants.ConfigKey_SqlTrigger_BatchSize);
128+
this._batchSize = configuredBatchSize ?? this._batchSize;
129+
this._pollingIntervalInMs = configuredPollingInterval ?? this._pollingIntervalInMs;
130+
var monitorStartProps = new Dictionary<TelemetryPropertyName, string>(telemetryProps)
131+
{
132+
{ TelemetryPropertyName.HasConfiguredBatchSize, (configuredBatchSize != null).ToString() },
133+
{ TelemetryPropertyName.HasConfiguredPollingInterval, (configuredPollingInterval != null).ToString() },
134+
};
135+
TelemetryInstance.TrackEvent(
136+
TelemetryEventName.TriggerMonitorStart,
137+
monitorStartProps,
138+
new Dictionary<TelemetryMeasureName, double>() {
139+
{ TelemetryMeasureName.BatchSize, this._batchSize },
140+
{ TelemetryMeasureName.PollingIntervalMs, this._pollingIntervalInMs }
141+
});
142+
123143
// Prep search-conditions that will be used besides WHERE clause to match table rows.
124144
this._rowMatchConditions = Enumerable.Range(0, this._batchSize)
125145
.Select(rowIndex => string.Join(" AND ", this._primaryKeyColumns.Select((col, colIndex) => $"{col.AsBracketQuotedString()} = @{rowIndex}_{colIndex}")))
126146
.ToList();
127147

128-
this._telemetryProps = telemetryProps ?? new Dictionary<TelemetryPropertyName, string>();
129-
130148
#pragma warning disable CS4014 // Queue the below tasks and exit. Do not wait for their completion.
131149
_ = Task.Run(() =>
132150
{
@@ -591,8 +609,8 @@ private long RecomputeLastSyncVersion()
591609
// have leases acquired on them by another worker.
592610
// Therefore, if there are more than one version numbers in the set, return the second highest one. Otherwise, return
593611
// the only version number in the set.
594-
// Also this LastSyncVersion is actually updated in the GlobalState table only after verifying that the changes with
595-
// changeVersion <= newLastSyncVersion have been processed in BuildUpdateTablesPostInvocation query.
612+
// Also this LastSyncVersion is actually updated in the GlobalState table only after verifying that the changes with
613+
// changeVersion <= newLastSyncVersion have been processed in BuildUpdateTablesPostInvocation query.
596614
long lastSyncVersion = changeVersionSet.ElementAt(changeVersionSet.Count > 1 ? changeVersionSet.Count - 2 : 0);
597615
this._logger.LogDebugWithThreadId($"RecomputeLastSyncVersion. LastSyncVersion={lastSyncVersion} ChangeVersionSet={string.Join(",", changeVersionSet)}");
598616
return lastSyncVersion;

src/TriggerBinding/SqlTriggerConstants.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,17 @@ internal static class SqlTriggerConstants
1515
public const string LeasesTableAttemptCountColumnName = "_az_func_AttemptCount";
1616
public const string LeasesTableLeaseExpirationTimeColumnName = "_az_func_LeaseExpirationTime";
1717

18+
/// <summary>
19+
/// The column names that are used in internal state tables and so can't exist in the target table
20+
/// since that shares column names with the primary keys from each user table being monitored.
21+
/// </summary>
22+
public static readonly string[] ReservedColumnNames = new string[]
23+
{
24+
LeasesTableChangeVersionColumnName,
25+
LeasesTableAttemptCountColumnName,
26+
LeasesTableLeaseExpirationTimeColumnName
27+
};
28+
1829
public const string ConfigKey_SqlTrigger_BatchSize = "Sql_Trigger_BatchSize";
1930
public const string ConfigKey_SqlTrigger_PollingInterval = "Sql_Trigger_PollingIntervalMs";
2031
}

src/TriggerBinding/SqlTriggerListener.cs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -257,22 +257,6 @@ FROM sys.indexes AS i
257257
throw new InvalidOperationException($"Could not find primary key created in table: '{this._userTable.FullName}'.");
258258
}
259259

260-
string[] reservedColumnNames = new[]
261-
{
262-
SqlTriggerConstants.LeasesTableChangeVersionColumnName,
263-
SqlTriggerConstants.LeasesTableAttemptCountColumnName,
264-
SqlTriggerConstants.LeasesTableLeaseExpirationTimeColumnName
265-
};
266-
267-
var conflictingColumnNames = primaryKeyColumns.Select(col => col.name).Intersect(reservedColumnNames).ToList();
268-
269-
if (conflictingColumnNames.Count > 0)
270-
{
271-
string columnNames = string.Join(", ", conflictingColumnNames.Select(col => $"'{col}'"));
272-
throw new InvalidOperationException($"Found reserved column name(s): {columnNames} in table: '{this._userTable.FullName}'." +
273-
" Please rename them to be able to use trigger binding.");
274-
}
275-
276260
this._logger.LogDebugWithThreadId($"END GetPrimaryKeyColumns ColumnNames(types) = {string.Join(", ", primaryKeyColumns.Select(col => $"'{col.name}({col.type})'"))}.");
277261
return primaryKeyColumns;
278262
}
@@ -317,6 +301,15 @@ FROM sys.columns AS c
317301
throw new InvalidOperationException($"Found column(s) with unsupported type(s): {columnNamesAndTypes} in table: '{this._userTable.FullName}'.");
318302
}
319303

304+
var conflictingColumnNames = userTableColumns.Intersect(SqlTriggerConstants.ReservedColumnNames).ToList();
305+
306+
if (conflictingColumnNames.Count > 0)
307+
{
308+
string columnNames = string.Join(", ", conflictingColumnNames.Select(col => $"'{col}'"));
309+
throw new InvalidOperationException($"Found reserved column name(s): {columnNames} in table: '{this._userTable.FullName}'." +
310+
" Please rename them to be able to use trigger binding.");
311+
}
312+
320313
this._logger.LogDebugWithThreadId($"END GetUserTableColumns ColumnNames = {string.Join(", ", userTableColumns.Select(col => $"'{col}'"))}.");
321314
return userTableColumns;
322315
}

test/Common/TestUtils.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Data;
77
using System.Diagnostics;
88
using System.Threading;
9+
using System.Threading.Tasks;
910

1011
namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Common
1112
{
@@ -159,5 +160,22 @@ public static string CleanJsonString(string jsonStr)
159160
{
160161
return jsonStr.Trim().Replace(" ", "").Replace(Environment.NewLine, "");
161162
}
163+
164+
public static async Task<TResult> TimeoutAfter<TResult>(this Task<TResult> task, TimeSpan timeout, string message = "The operation has timed out.")
165+
{
166+
167+
using var timeoutCancellationTokenSource = new CancellationTokenSource();
168+
169+
Task completedTask = await Task.WhenAny(task, Task.Delay(timeout, timeoutCancellationTokenSource.Token));
170+
if (completedTask == task)
171+
{
172+
timeoutCancellationTokenSource.Cancel();
173+
return await task; // Very important in order to propagate exceptions
174+
}
175+
else
176+
{
177+
throw new TimeoutException(message);
178+
}
179+
}
162180
}
163181
}

test/Integration/IntegrationTestBase.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
using Microsoft.AspNetCore.WebUtilities;
1919
using System.Collections.Generic;
2020
using System.Linq;
21+
using static Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry.Telemetry;
2122

2223
namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration
2324
{
@@ -199,6 +200,10 @@ protected void StartFunctionHost(string functionName, SupportedLanguages languag
199200
{
200201
environmentVariables.ToList().ForEach(ev => startInfo.EnvironmentVariables[ev.Key] = ev.Value);
201202
}
203+
204+
// Always disable telemetry during test runs
205+
startInfo.EnvironmentVariables[TelemetryOptoutEnvVar] = "1";
206+
202207
this.LogOutput($"Starting {startInfo.FileName} {startInfo.Arguments} in {startInfo.WorkingDirectory}");
203208
this.FunctionHost = new Process
204209
{

0 commit comments

Comments
 (0)