From 8823b9f1d77e36a23c054e69bf0c6226440109ef Mon Sep 17 00:00:00 2001 From: Wenxi Zeng Date: Wed, 16 Apr 2025 16:15:11 -0500 Subject: [PATCH 1/4] Fix issue where user-supplied enrichments were lost during the startup phase --- Analytics-CSharp/Segment/Analytics/Analytics.cs | 4 ++-- Analytics-CSharp/Segment/Analytics/Timeline.cs | 6 +++--- Analytics-CSharp/Segment/Analytics/Types.cs | 5 ++++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/Analytics-CSharp/Segment/Analytics/Analytics.cs b/Analytics-CSharp/Segment/Analytics/Analytics.cs index 17d9f9b..1ed41d9 100644 --- a/Analytics-CSharp/Segment/Analytics/Analytics.cs +++ b/Analytics-CSharp/Segment/Analytics/Analytics.cs @@ -86,10 +86,10 @@ public void Process(RawEvent incomingEvent, Func enrichment { if (!Enable) return; - incomingEvent.ApplyRawEventData(_userInfo); + incomingEvent.ApplyRawEventData(_userInfo, enrichment); AnalyticsScope.Launch(AnalyticsDispatcher, () => { - Timeline.Process(incomingEvent, enrichment); + Timeline.Process(incomingEvent); }); } diff --git a/Analytics-CSharp/Segment/Analytics/Timeline.cs b/Analytics-CSharp/Segment/Analytics/Timeline.cs index e0f7e70..09976aa 100644 --- a/Analytics-CSharp/Segment/Analytics/Timeline.cs +++ b/Analytics-CSharp/Segment/Analytics/Timeline.cs @@ -30,15 +30,15 @@ public class Timeline /// event to be processed /// a closure that enables enrichment on the generated event /// event after processing - internal RawEvent Process(RawEvent incomingEvent, Func enrichment = default) + internal RawEvent Process(RawEvent incomingEvent) { // Apply before and enrichment types first to start the timeline processing. RawEvent beforeResult = ApplyPlugins(PluginType.Before, incomingEvent); // Enrichment is like middleware, a chance to update the event across the board before going to destinations. RawEvent enrichmentResult = ApplyPlugins(PluginType.Enrichment, beforeResult); - if (enrichment != null) + if (enrichmentResult.Enrichment != null) { - enrichmentResult = enrichment(enrichmentResult); + enrichmentResult = enrichmentResult.Enrichment(enrichmentResult); } // Make sure not to update the events during this next cycle. Since each destination may want different diff --git a/Analytics-CSharp/Segment/Analytics/Types.cs b/Analytics-CSharp/Segment/Analytics/Types.cs index ae8baae..5a67838 100644 --- a/Analytics-CSharp/Segment/Analytics/Types.cs +++ b/Analytics-CSharp/Segment/Analytics/Types.cs @@ -18,6 +18,8 @@ public abstract class RawEvent public virtual string UserId { get; set; } public virtual string Timestamp { get; set; } + public Func Enrichment { get; set; } + // JSON types public JsonObject Context { get; set; } public JsonObject Integrations { get; set; } @@ -36,8 +38,9 @@ internal void ApplyRawEventData(RawEvent rawEvent) Integrations = rawEvent.Integrations; } - internal void ApplyRawEventData(UserInfo userInfo) + internal void ApplyRawEventData(UserInfo userInfo, Func enrichment) { + Enrichment = enrichment; MessageId = Guid.NewGuid().ToString(); Context = new JsonObject(); Timestamp = DateTime.UtcNow.ToString("o"); // iso8601 From 89669b6dbf5461ef2e563ae99b6b6cd26a024cae Mon Sep 17 00:00:00 2001 From: Wenxi Zeng Date: Tue, 22 Apr 2025 16:31:27 -0500 Subject: [PATCH 2/4] bug fix --- Analytics-CSharp/Segment/Analytics/Plugins/StartupQueue.cs | 2 +- Analytics-CSharp/Segment/Analytics/Timeline.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Analytics-CSharp/Segment/Analytics/Plugins/StartupQueue.cs b/Analytics-CSharp/Segment/Analytics/Plugins/StartupQueue.cs index 6e69655..fe6dc9e 100644 --- a/Analytics-CSharp/Segment/Analytics/Plugins/StartupQueue.cs +++ b/Analytics-CSharp/Segment/Analytics/Plugins/StartupQueue.cs @@ -58,7 +58,7 @@ private void ReplayEvents() { if (_queuedEvents.TryDequeue(out RawEvent e)) { - Analytics.Process(e); + Analytics.Process(e, e.Enrichment); } } } diff --git a/Analytics-CSharp/Segment/Analytics/Timeline.cs b/Analytics-CSharp/Segment/Analytics/Timeline.cs index 09976aa..ccfe86c 100644 --- a/Analytics-CSharp/Segment/Analytics/Timeline.cs +++ b/Analytics-CSharp/Segment/Analytics/Timeline.cs @@ -36,7 +36,7 @@ internal RawEvent Process(RawEvent incomingEvent) RawEvent beforeResult = ApplyPlugins(PluginType.Before, incomingEvent); // Enrichment is like middleware, a chance to update the event across the board before going to destinations. RawEvent enrichmentResult = ApplyPlugins(PluginType.Enrichment, beforeResult); - if (enrichmentResult.Enrichment != null) + if (enrichmentResult != null && enrichmentResult.Enrichment != null) { enrichmentResult = enrichmentResult.Enrichment(enrichmentResult); } From ddb5e35549466983a8abbc29bc08b859d8b4fe2c Mon Sep 17 00:00:00 2001 From: Wenxi Zeng Date: Tue, 22 Apr 2025 16:32:06 -0500 Subject: [PATCH 3/4] add unit tests to simulate startup queue replay --- Tests/EventsTest.cs | 262 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 261 insertions(+), 1 deletion(-) diff --git a/Tests/EventsTest.cs b/Tests/EventsTest.cs index 25e9387..8d3018c 100644 --- a/Tests/EventsTest.cs +++ b/Tests/EventsTest.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; -using System.Threading.Tasks; +using System.Diagnostics; +using System.Threading; using Moq; using Segment.Analytics; using Segment.Analytics.Utilities; @@ -690,4 +691,263 @@ public void TestAliasEnrichment() Assert.Equal("test", actual[0].AnonymousId); } } + + public class DelayedEventsTest + { + private readonly Analytics _analytics; + + private Settings? _settings; + + private readonly Mock _plugin; + + private readonly Mock _afterPlugin; + + private readonly SemaphoreSlim _httpSemaphore; + private readonly SemaphoreSlim _assertSemaphore; + private readonly List _actual; + + public DelayedEventsTest() + { + _httpSemaphore = new SemaphoreSlim(0); + _assertSemaphore = new SemaphoreSlim(0); + _settings = JsonUtility.FromJson( + "{\"integrations\":{\"Segment.io\":{\"apiKey\":\"1vNgUqwJeCHmqgI9S1sOm9UHCyfYqbaQ\"}},\"plan\":{},\"edgeFunction\":{}}"); + + var mockHttpClient = new Mock(null, null, null); + mockHttpClient + .Setup(httpClient => httpClient.Settings()) + .Returns(async () => + { + // suspend http calls until we tracked events + // this will force events get into startup queue + await _httpSemaphore.WaitAsync(); + return _settings; + }); + + _plugin = new Mock + { + CallBase = true + }; + + _afterPlugin = new Mock { CallBase = true }; + _actual = new List(); + _afterPlugin.Setup(o => o.Execute(Capture.In(_actual))) + .Returns((RawEvent e) => + { + // since this is an after plugin, when its execute function is called, + // it is guaranteed that the enrichment closure has been called. + // so we can release the semaphore on assertions. + _assertSemaphore.Release(); + return e; + }); + + var config = new Configuration( + writeKey: "123", + storageProvider: new DefaultStorageProvider("tests"), + autoAddSegmentDestination: false, + useSynchronizeDispatcher: false, // we need async analytics to buildup events on start queue + httpClientProvider: new MockHttpClientProvider(mockHttpClient) + ); + _analytics = new Analytics(config); + } + + [Fact] + public void TestTrackEnrichment() + { + string expectedEvent = "foo"; + string expectedAnonymousId = "bar"; + + _analytics.Add(_afterPlugin.Object); + _analytics.Track(expectedEvent, enrichment: @event => + { + @event.AnonymousId = expectedAnonymousId; + return @event; + }); + + // now we have tracked event, i.e. event added to startup queue + // release the semaphore put on http client, so we startup queue will replay the events + _httpSemaphore.Release(); + // now we need to wait for events being fully replayed before making assertions + _assertSemaphore.Wait(); + + Assert.NotEmpty(_actual); + Assert.IsType(_actual[0]); + var actual = _actual[0] as TrackEvent; + Debug.Assert(actual != null, nameof(actual) + " != null"); + Assert.True(actual.Properties.Count == 0); + Assert.Equal(expectedEvent, actual.Event); + Assert.Equal(expectedAnonymousId, actual.AnonymousId); + } + + [Fact] + public void TestIdentifyEnrichment() + { + var expected = new JsonObject + { + ["foo"] = "bar" + }; + string expectedUserId = "newUserId"; + + _analytics.Add(_afterPlugin.Object); + _analytics.Identify(expectedUserId, expected, @event => + { + if (@event is IdentifyEvent identifyEvent) + { + identifyEvent.Traits["foo"] = "baz"; + } + + return @event; + }); + + // now we have tracked event, i.e. event added to startup queue + // release the semaphore put on http client, so we startup queue will replay the events + _httpSemaphore.Release(); + // now we need to wait for events being fully replayed before making assertions + _assertSemaphore.Wait(); + + string actualUserId = _analytics.UserId(); + + Assert.NotEmpty(_actual); + var actual = _actual[0] as IdentifyEvent; + Debug.Assert(actual != null, nameof(actual) + " != null"); + Assert.Equal(expected, actual.Traits); + Assert.Equal(expectedUserId, actualUserId); + } + + [Fact] + public void TestScreenEnrichment() + { + var expected = new JsonObject + { + ["foo"] = "bar" + }; + string expectedTitle = "foo"; + string expectedCategory = "bar"; + + _analytics.Add(_afterPlugin.Object); + _analytics.Screen(expectedTitle, expected, expectedCategory, @event => + { + if (@event is ScreenEvent screenEvent) + { + screenEvent.Properties["foo"] = "baz"; + } + + return @event; + }); + + // now we have tracked event, i.e. event added to startup queue + // release the semaphore put on http client, so we startup queue will replay the events + _httpSemaphore.Release(); + // now we need to wait for events being fully replayed before making assertions + _assertSemaphore.Wait(); + + Assert.NotEmpty(_actual); + var actual = _actual[0] as ScreenEvent; + Debug.Assert(actual != null, nameof(actual) + " != null"); + Assert.Equal(expected, actual.Properties); + Assert.Equal(expectedTitle, actual.Name); + Assert.Equal(expectedCategory, actual.Category); + } + + [Fact] + public void TestPageEnrichment() + { + var expected = new JsonObject + { + ["foo"] = "bar" + }; + string expectedTitle = "foo"; + string expectedCategory = "bar"; + + _analytics.Add(_afterPlugin.Object); + _analytics.Page(expectedTitle, expected, expectedCategory, @event => + { + if (@event is PageEvent pageEvent) + { + pageEvent.Properties["foo"] = "baz"; + } + + return @event; + }); + + // now we have tracked event, i.e. event added to startup queue + // release the semaphore put on http client, so we startup queue will replay the events + _httpSemaphore.Release(); + // now we need to wait for events being fully replayed before making assertions + _assertSemaphore.Wait(); + + Assert.NotEmpty(_actual); + var actual = _actual[0] as PageEvent; + Debug.Assert(actual != null, nameof(actual) + " != null"); + Assert.Equal(expected, actual.Properties); + Assert.Equal(expectedTitle, actual.Name); + Assert.Equal(expectedCategory, actual.Category); + Assert.Equal("page", actual.Type); + } + + [Fact] + public void TestGroupEnrichment() + { + var expected = new JsonObject + { + ["foo"] = "bar" + }; + string expectedGroupId = "foo"; + + _analytics.Add(_afterPlugin.Object); + _analytics.Group(expectedGroupId, expected, @event => + { + if (@event is GroupEvent groupEvent) + { + groupEvent.Traits["foo"] = "baz"; + } + + return @event; + }); + + // now we have tracked event, i.e. event added to startup queue + // release the semaphore put on http client, so we startup queue will replay the events + _httpSemaphore.Release(); + // now we need to wait for events being fully replayed before making assertions + _assertSemaphore.Wait(); + + Assert.NotEmpty(_actual); + var actual = _actual[0] as GroupEvent; + Debug.Assert(actual != null, nameof(actual) + " != null"); + Assert.Equal(expected, actual.Traits); + Assert.Equal(expectedGroupId, actual.GroupId); + } + + [Fact] + public void TestAliasEnrichment() + { + string expectedPrevious = "foo"; + string expected = "bar"; + + _analytics.Add(_afterPlugin.Object); + _analytics.Identify(expectedPrevious); + _analytics.Alias(expected, @event => + { + if (@event is AliasEvent aliasEvent) + { + aliasEvent.AnonymousId = "test"; + } + + return @event; + }); + + // now we have tracked event, i.e. event added to startup queue + // release the semaphore put on http client, so we startup queue will replay the events + _httpSemaphore.Release(); + // now we need to wait for events being fully replayed before making assertions + _assertSemaphore.Wait(); + + Assert.NotEmpty(_actual); + var actual = _actual.Find(o => o is AliasEvent) as AliasEvent; + Debug.Assert(actual != null, nameof(actual) + " != null"); + Assert.Equal(expectedPrevious, actual.PreviousId); + Assert.Equal(expected, actual.UserId); + Assert.Equal("test", actual.AnonymousId); + } + } } From c1e6b8c174bf97cbd7e8aea53b3141ca312d0d4c Mon Sep 17 00:00:00 2001 From: Wenxi Zeng Date: Tue, 22 Apr 2025 16:43:03 -0500 Subject: [PATCH 4/4] fix unit test --- Tests/EventsTest.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/Tests/EventsTest.cs b/Tests/EventsTest.cs index 8d3018c..86dac8e 100644 --- a/Tests/EventsTest.cs +++ b/Tests/EventsTest.cs @@ -921,11 +921,9 @@ public void TestGroupEnrichment() [Fact] public void TestAliasEnrichment() { - string expectedPrevious = "foo"; string expected = "bar"; _analytics.Add(_afterPlugin.Object); - _analytics.Identify(expectedPrevious); _analytics.Alias(expected, @event => { if (@event is AliasEvent aliasEvent) @@ -945,7 +943,6 @@ public void TestAliasEnrichment() Assert.NotEmpty(_actual); var actual = _actual.Find(o => o is AliasEvent) as AliasEvent; Debug.Assert(actual != null, nameof(actual) + " != null"); - Assert.Equal(expectedPrevious, actual.PreviousId); Assert.Equal(expected, actual.UserId); Assert.Equal("test", actual.AnonymousId); }