1+ // Copyright (c) Microsoft Corporation. All rights reserved.
2+ // Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+ using System ;
5+ using System . Collections . Generic ;
6+ using System . Diagnostics ;
7+ using System . Linq ;
8+ using System . Text ;
9+ using System . Threading . Tasks ;
10+ using Microsoft . Azure . WebJobs . Extensions . Sql . Samples . Common ;
11+ using Microsoft . Azure . WebJobs . Extensions . Sql . Tests . Common ;
12+ using Newtonsoft . Json ;
13+ using Xunit ;
14+ using Xunit . Abstractions ;
15+
16+ namespace Microsoft . Azure . WebJobs . Extensions . Sql . Tests . Integration
17+ {
18+ public class SqlTriggerBindingIntegrationTestBase : IntegrationTestBase
19+ {
20+ public SqlTriggerBindingIntegrationTestBase ( ITestOutputHelper output = null ) : base ( output )
21+ {
22+ this . EnableChangeTrackingForDatabase ( ) ;
23+ }
24+
25+ private void EnableChangeTrackingForDatabase ( )
26+ {
27+ this . ExecuteNonQuery ( $@ "
28+ ALTER DATABASE [{ this . DatabaseName } ]
29+ SET CHANGE_TRACKING = ON
30+ (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
31+ " ) ;
32+ }
33+
34+ public void SetChangeTrackingForTable ( string tableName , bool enable = true )
35+ {
36+ this . ExecuteNonQuery ( $@ "
37+ ALTER TABLE [dbo].[{ tableName } ]
38+ { ( enable ? "ENABLE" : "DISABLE" ) } CHANGE_TRACKING;
39+ " ) ;
40+ }
41+
42+ public void InsertProducts ( int firstId , int lastId )
43+ {
44+ // Only 1000 items are allowed to be inserted into a single INSERT statement so if we have more than 1000 batch them up into separate statements
45+ var builder = new StringBuilder ( ) ;
46+ do
47+ {
48+ int batchCount = Math . Min ( lastId - firstId + 1 , 1000 ) ;
49+ builder . Append ( $ "INSERT INTO [dbo].[Products] VALUES { string . Join ( ",\n " , Enumerable . Range ( firstId , batchCount ) . Select ( id => $ "({ id } , 'Product { id } ', { id * 100 } )") ) } ; ") ;
50+ firstId += batchCount ;
51+ } while ( firstId < lastId ) ;
52+ this . ExecuteNonQuery ( builder . ToString ( ) ) ;
53+ }
54+
55+ protected void UpdateProducts ( int firstId , int lastId )
56+ {
57+ int count = lastId - firstId + 1 ;
58+ this . ExecuteNonQuery (
59+ "UPDATE [dbo].[Products]\n " +
60+ "SET Name = 'Updated ' + Name\n " +
61+ "WHERE ProductId IN (" + string . Join ( ", " , Enumerable . Range ( firstId , count ) ) + ");" ) ;
62+ }
63+
64+ protected void DeleteProducts ( int firstId , int lastId )
65+ {
66+ int count = lastId - firstId + 1 ;
67+ this . ExecuteNonQuery (
68+ "DELETE FROM [dbo].[Products]\n " +
69+ "WHERE ProductId IN (" + string . Join ( ", " , Enumerable . Range ( firstId , count ) ) + ");" ) ;
70+ }
71+
72+ public async Task WaitForProductChanges (
73+ int firstId ,
74+ int lastId ,
75+ SqlChangeOperation operation ,
76+ Func < Task > actions ,
77+ Func < int , string > getName ,
78+ Func < int , int > getCost ,
79+ int timeoutMs ,
80+ string messagePrefix = "SQL Changes: " )
81+ {
82+ var expectedIds = Enumerable . Range ( firstId , lastId - firstId + 1 ) . ToHashSet ( ) ;
83+ int index = 0 ;
84+
85+ var taskCompletion = new TaskCompletionSource < bool > ( ) ;
86+
87+ void MonitorOutputData ( object sender , DataReceivedEventArgs e )
88+ {
89+ if ( e . Data != null && ( index = e . Data . IndexOf ( messagePrefix , StringComparison . Ordinal ) ) >= 0 )
90+ {
91+ string json = e . Data [ ( index + messagePrefix . Length ) ..] ;
92+ // Sometimes we'll get messages that have extra logging content on the same line - so to prevent that from breaking
93+ // the deserialization we look for the end of the changes array and only use that.
94+ // (This is fine since we control what content is in the array so know that none of the items have a ] in them)
95+ json = json [ ..( json . IndexOf ( ']' ) + 1 ) ] ;
96+ IReadOnlyList < SqlChange < Product > > changes ;
97+ try
98+ {
99+ changes = JsonConvert . DeserializeObject < IReadOnlyList < SqlChange < Product > > > ( json ) ;
100+ }
101+ catch ( Exception ex )
102+ {
103+ throw new InvalidOperationException ( $ "Exception deserializing JSON content. Error={ ex . Message } Json=\" { json } \" ", ex ) ;
104+ }
105+ foreach ( SqlChange < Product > change in changes )
106+ {
107+ Assert . Equal ( operation , change . Operation ) ; // Expected change operation
108+ Product product = change . Item ;
109+ Assert . NotNull ( product ) ; // Product deserialized correctly
110+ Assert . Contains ( product . ProductID , expectedIds ) ; // We haven't seen this product ID yet, and it's one we expected to see
111+ expectedIds . Remove ( product . ProductID ) ;
112+ Assert . Equal ( getName ( product . ProductID ) , product . Name ) ; // The product has the expected name
113+ Assert . Equal ( getCost ( product . ProductID ) , product . Cost ) ; // The product has the expected cost
114+ }
115+ if ( expectedIds . Count == 0 )
116+ {
117+ taskCompletion . SetResult ( true ) ;
118+ }
119+ }
120+ } ;
121+ // Set up listener for the changes coming in
122+ foreach ( Process functionHost in this . FunctionHostList )
123+ {
124+ functionHost . OutputDataReceived += MonitorOutputData ;
125+ }
126+
127+ // Now that we've set up our listener trigger the actions to monitor
128+ await actions ( ) ;
129+
130+ // Now wait until either we timeout or we've gotten all the expected changes, whichever comes first
131+ Console . WriteLine ( $ "[{ DateTime . UtcNow : u} ] Waiting for { operation } changes ({ timeoutMs } ms)") ;
132+ await taskCompletion . Task . TimeoutAfter ( TimeSpan . FromMilliseconds ( timeoutMs ) , $ "Timed out waiting for { operation } changes.") ;
133+
134+ // Unhook handler since we're done monitoring these changes so we aren't checking other changes done later
135+ foreach ( Process functionHost in this . FunctionHostList )
136+ {
137+ functionHost . OutputDataReceived -= MonitorOutputData ;
138+ }
139+ }
140+
141+ /// <summary>
142+ /// Launches the functions runtime host, waits for it to encounter error while starting the SQL trigger listener,
143+ /// and asserts that the logged error message matches with the supplied error message.
144+ /// </summary>
145+ /// <param name="functionName">Name of the user function that should cause error in trigger listener</param>
146+ /// <param name="useTestFolder">Whether the functions host should be launched from test folder</param>
147+ /// <param name="expectedErrorMessage">Expected error message string</param>
148+ protected void StartFunctionHostAndWaitForError ( string functionName , bool useTestFolder , string expectedErrorMessage )
149+ {
150+ string errorMessage = null ;
151+ var tcs = new TaskCompletionSource < bool > ( ) ;
152+
153+ void OutputHandler ( object sender , DataReceivedEventArgs e )
154+ {
155+ if ( errorMessage == null && e . Data ? . Contains ( "Failed to start SQL trigger listener" ) == true )
156+ {
157+ // SQL trigger listener throws exception of type InvalidOperationException for all error conditions.
158+ string exceptionPrefix = "Exception: System.InvalidOperationException: " ;
159+ int index = e . Data . IndexOf ( exceptionPrefix , StringComparison . Ordinal ) ;
160+ Assert . NotEqual ( - 1 , index ) ;
161+
162+ errorMessage = e . Data [ ( index + exceptionPrefix . Length ) ..] ;
163+ tcs . SetResult ( true ) ;
164+ }
165+ } ;
166+
167+ // All trigger integration tests are only using C# functions for testing at the moment.
168+ this . StartFunctionHost ( functionName , SupportedLanguages . CSharp , useTestFolder , OutputHandler ) ;
169+
170+ // The functions host generally logs the error message within a second after starting up.
171+ const int BufferTimeForErrorInSeconds = 15 ;
172+ bool isCompleted = tcs . Task . Wait ( TimeSpan . FromSeconds ( BufferTimeForErrorInSeconds ) ) ;
173+
174+ this . FunctionHost . OutputDataReceived -= OutputHandler ;
175+ this . FunctionHost . Kill ( true ) ;
176+
177+ Assert . True ( isCompleted , "Functions host did not log failure to start SQL trigger listener within specified time." ) ;
178+ Assert . Equal ( expectedErrorMessage , errorMessage ) ;
179+ }
180+
181+ /// <summary>
182+ /// Gets a timeout value to use when processing the given number of changes, based on the
183+ /// default batch size and polling interval.
184+ /// </summary>
185+ /// <param name="firstId">The first ID in the batch to process</param>
186+ /// <param name="lastId">The last ID in the batch to process</param>
187+ /// <param name="batchSize">The batch size if different than the default batch size</param>
188+ /// <param name="pollingIntervalMs">The polling interval in ms if different than the default polling interval</param>
189+ /// <returns></returns>
190+ protected int GetBatchProcessingTimeout ( int firstId , int lastId , int batchSize = SqlTableChangeMonitor < object > . DefaultBatchSize , int pollingIntervalMs = SqlTableChangeMonitor < object > . DefaultPollingIntervalMs )
191+ {
192+ int changesToProcess = lastId - firstId + 1 ;
193+ int calculatedTimeout = ( int ) ( Math . Ceiling ( ( double ) changesToProcess / batchSize // The number of batches to process
194+ / this . FunctionHostList . Count ) // The number of function host processes
195+ * pollingIntervalMs // The length to process each batch
196+ * 2 ) ; // Double to add buffer time for processing results & writing log messages
197+
198+ // Always have a timeout of at least 10sec since there's a certain amount of overhead
199+ // always expected from each run regardless of the number of batches being processed and the delay
200+ // These tests aren't testing performance so giving extra processing time is fine as long as the
201+ // results themselves are correct
202+ return Math . Max ( calculatedTimeout , 10000 ) ;
203+ }
204+ }
205+ }
0 commit comments