diff --git a/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx b/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx new file mode 100644 index 00000000..1015966e --- /dev/null +++ b/examples/AnalyticsReactNativeExample/plugins/ExampleWaitingPlugin.tsx @@ -0,0 +1,33 @@ +import { + WaitingPlugin, + PluginType, + Plugin, + +} from '@segment/analytics-react-native'; + +import type {SegmentAPISettings, SegmentClient, SegmentEvent, UpdateType} from '@segment/analytics-react-native'; +export class ExampleWaitingPlugin extends WaitingPlugin { + type = PluginType.enrichment; + analytics = undefined; + tracked = false; + + /** + * Called when settings are updated + */ + update(_settings: SegmentAPISettings, _type: UpdateType) { + if (this.type === PluginType.before) { + // delay 3 seconds, then resume event processing + setTimeout(() => { + this.resume(); + }, 3000); + } + } + + /** + * Called for track events + */ + track(event: SegmentEvent) { + this.tracked = true; + return event; + } +} \ No newline at end of file diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 1ded23b6..8975eb76 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -72,6 +72,7 @@ import { translateHTTPError, } from './errors'; import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin'; +import { WaitingPlugin } from './plugin'; type OnPluginAddedCallback = (plugin: Plugin) => void; @@ -200,6 +201,18 @@ export class SegmentClient { this.store = store; this.timeline = new Timeline(); + // // create and add waiting plugin immediately so early events get buffered. + // try { + // this.waitingPlugin = new WaitingPlugin(); + // // add directly to timeline via addPlugin to ensure configure() is called immediately + // this.addPlugin(this.waitingPlugin); + // // initial running state false until init completes (mirrors Kotlin semantics) + // this.isRunning = false; + // } catch (e) { + // // if WaitingPlugin instantiation or add fails, fallback to running=true + // this.isRunning = true; + // } + // Initialize the watchables this.context = { get: this.store.context.get, @@ -295,7 +308,6 @@ export class SegmentClient { if ((await this.store.isReady.get(true)) === false) { await this.storageReady(); } - // Get new settings from segment // It's important to run this before checkInstalledVersion and trackDeeplinks to give time for destination plugins // which make use of the settings object to initialize @@ -309,7 +321,6 @@ export class SegmentClient { ]); await this.onReady(); this.isReady.value = true; - // Process all pending events await this.processPendingEvents(); // Trigger manual flush @@ -465,10 +476,11 @@ export class SegmentClient { settings ); } - - if (!this.isReady.value) { + console.log('!this.isReady.value', !this.isReady.value); + if (!this.isReady.value && !(plugin instanceof WaitingPlugin)) { this.pluginsToAdd.push(plugin); } else { + console.log('this.addPlugin'); this.addPlugin(plugin); } } @@ -476,6 +488,12 @@ export class SegmentClient { private addPlugin(plugin: Plugin) { plugin.configure(this); this.timeline.add(plugin); + //check for waiting plugin here + if (plugin instanceof WaitingPlugin) { + console.log('add plugin'); + this.pauseEventProcessingForPlugin(plugin); + } + this.triggerOnPluginLoaded(plugin); } @@ -512,7 +530,7 @@ export class SegmentClient { ): Promise { const event = await this.applyContextData(incomingEvent); this.flushPolicyExecuter.notify(event); - return this.timeline.process(event); + return await this.timeline.process(event); } private async trackDeepLinks() { @@ -1027,4 +1045,58 @@ export class SegmentClient { return totalEventsCount; } + private resumeTimeoutId?: ReturnType; + private waitingPlugins = new Set(); + + /** + * Pause event processing globally. Events will be buffered into pendingEvents and WaitingPlugin. + * An auto-resume will be scheduled after `timeout` ms. + */ + pauseEventProcessingForPlugin(plugin?: WaitingPlugin) { + if (plugin) { + this.waitingPlugins.add(plugin); + } + this.pauseEventProcessing(); + } + async resumeEventProcessingForPlugin(plugin?: WaitingPlugin) { + if (plugin) { + this.waitingPlugins.delete(plugin); + } + if (this.waitingPlugins.size > 0) { + return; // still blocked + } + + await this.resumeEventProcessing(); + } + + pauseEventProcessing(timeout = 30000) { + // IMPORTANT: ignore repeated pauses + if (!this.isReady.value) { + return; + } + + this.isReady.value = false; + + // clear previous timeout if any + if (this.resumeTimeoutId) { + clearTimeout(this.resumeTimeoutId); + } + + this.resumeTimeoutId = setTimeout(async () => { + await this.resumeEventProcessing(); + }, timeout); + } + async resumeEventProcessing() { + if (this.isReady.value) { + return; + } + + if (this.resumeTimeoutId) { + clearTimeout(this.resumeTimeoutId); + this.resumeTimeoutId = undefined; + } + // this.waitingPlugins.clear(); + this.isReady.value = true; + await this.processPendingEvents(); + } } diff --git a/packages/core/src/plugin.ts b/packages/core/src/plugin.ts index d0328d19..d1bd1840 100644 --- a/packages/core/src/plugin.ts +++ b/packages/core/src/plugin.ts @@ -115,12 +115,14 @@ export class DestinationPlugin extends EventPlugin { key = ''; timeline = new Timeline(); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + store: any; private hasSettings() { return this.analytics?.settings.get()?.[this.key] !== undefined; } - private isEnabled(event: SegmentEvent): boolean { + protected isEnabled(event: SegmentEvent): boolean { let customerDisabled = false; if (event.integrations?.[this.key] === false) { customerDisabled = true; @@ -140,6 +142,10 @@ export class DestinationPlugin extends EventPlugin { if (analytics) { plugin.configure(analytics); } + + if (analytics && plugin instanceof WaitingPlugin) { + analytics.pauseEventProcessingForPlugin(plugin); + } this.timeline.add(plugin); return plugin; } @@ -179,7 +185,6 @@ export class DestinationPlugin extends EventPlugin { type: PluginType.before, event, }); - if (beforeResult === undefined) { return; } @@ -210,3 +215,27 @@ export class UtilityPlugin extends EventPlugin {} // For internal platform-specific bits export class PlatformPlugin extends Plugin {} + +export { PluginType }; + +/** + * WaitingPlugin + * Buffers events when paused and releases them when resumed. + */ +export class WaitingPlugin extends Plugin { + constructor() { + super(); + } + + configure(analytics: SegmentClient) { + super.configure(analytics); + } + + pause() { + this.analytics?.pauseEventProcessingForPlugin(this); + } + + async resume() { + await this.analytics?.resumeEventProcessingForPlugin(this); + } +} diff --git a/packages/core/src/plugins/__tests__/Waiting.test.ts b/packages/core/src/plugins/__tests__/Waiting.test.ts new file mode 100644 index 00000000..d97a7e87 --- /dev/null +++ b/packages/core/src/plugins/__tests__/Waiting.test.ts @@ -0,0 +1,338 @@ +import { SegmentClient } from '../../analytics'; +import { DestinationPlugin } from '../../plugin'; + +//import { SegmentDestination } from '../SegmentDestination'; +import { + ExampleWaitingPlugin, + ExampleWaitingPlugin1, + getMockLogger, + ManualResumeWaitingPlugin, + MockSegmentStore, + StubDestinationPlugin, +} from '../../test-helpers'; + +jest.useFakeTimers(); + +describe('WaitingPlugin', () => { + const store = new MockSegmentStore(); + const baseConfig = { + writeKey: 'test-key', + flushAt: 1, + flushInterval: 0, + trackAppLifecycleEvents: false, + autoAddSegmentDestination: false, + }; + + beforeEach(() => { + store.reset(); + jest.clearAllMocks(); + jest.clearAllTimers(); + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + test('test resume after timeout', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + client.pauseEventProcessing(1000); + + expect(client.isReady.value).toBe(false); + + jest.advanceTimersByTime(2000); + + // Allow microtasks from setTimeout → resumeEventProcessing + await Promise.resolve(); + + expect(client.isReady.value).toBe(true); + }); + test('test manual resume', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + client.pauseEventProcessing(); + + expect(client.isReady.value).toBe(false); + + await client.resumeEventProcessing(); + + expect(client.isReady.value).toBe(true); + }); + test('pause does not dispatch timeout if already paused', () => { + const setTimeoutSpy = jest.spyOn(global, 'setTimeout'); + + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + client.isReady.value = true; + + client.pauseEventProcessing(); + client.pauseEventProcessing(); + client.pauseEventProcessing(); + + expect(setTimeoutSpy).toHaveBeenCalledTimes(1); + }); + test('WaitingPlugin makes analytics wait', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + const plugin = new ExampleWaitingPlugin(); + + client.add({ plugin }); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const trackSpy = jest.spyOn(client as any, 'startTimelineProcessing'); + + client.track('foo'); + + expect(client.isReady.value).toBe(false); + + // Event should NOT be processed while paused + expect(trackSpy).not.toHaveBeenCalled(); + + await Promise.resolve(); + + jest.advanceTimersByTime(1000); + await Promise.resolve(); + + expect(client.isReady.value).toBe(true); + + // Event should now be processed + expect(trackSpy).toHaveBeenCalledTimes(1); + }); + + test('timeout force resume', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + const waitingPlugin = new ManualResumeWaitingPlugin(); + client.add({ plugin: waitingPlugin }); + + client.track('foo'); + + expect(client.isReady.value).toBe(false); + expect(waitingPlugin.tracked).toBe(false); + + await jest.advanceTimersByTimeAsync(30000); + + await Promise.resolve(); + await Promise.resolve(); + + expect(client.isReady.value).toBe(true); + expect(waitingPlugin.tracked).toBe(true); + }); + test('multiple WaitingPlugins', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // Initially, analytics is running + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + // Create two waiting plugins + const plugin1 = new ExampleWaitingPlugin1(); + const plugin2 = new ManualResumeWaitingPlugin(); + + // Add plugins to client + client.add({ plugin: plugin1 }); + client.add({ plugin: plugin2 }); + + // Track an event while waiting plugins are active + client.track('foo'); + + // Client should now be paused + expect(client.isReady.value).toBe(false); + + // Plugins should not have tracked the event yet + expect(plugin1.tracked).toBe(false); + expect(plugin2.tracked).toBe(false); + + // Resume the first plugin + await plugin1.resume(); + // Advance timers to simulate any internal delays + jest.advanceTimersByTime(6000); + await Promise.resolve(); + + // Still paused because plugin2 is waiting + expect(client.isReady.value).toBe(false); + expect(plugin1.tracked).toBe(false); + expect(plugin2.tracked).toBe(false); + + // Resume the second plugin + await plugin2.resume(); + // Advance timers to flush + jest.advanceTimersByTime(6000); + await Promise.resolve(); + + // Now analytics should be running + expect(client.isReady.value).toBe(true); + // Both plugins should have tracked the event + expect(plugin1.tracked).toBe(true); + expect(plugin2.tracked).toBe(true); + }); + test('WaitingPlugin makes analytics to wait on DestinationPlugin', async () => { + jest.useFakeTimers(); + + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // Initially, analytics is running + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + const waitingPlugin = new ExampleWaitingPlugin1(); + const stubDestinationPlugin: DestinationPlugin = + new StubDestinationPlugin(); + // Add destination to analytics + client.add({ plugin: stubDestinationPlugin }); + // Add waiting plugin inside destination + stubDestinationPlugin.add(waitingPlugin); + + // Track event + await client.track('foo'); + + // Analytics should pause + expect(client.isReady.value).toBe(false); + expect(waitingPlugin.tracked).toBe(false); + await Promise.resolve(); + + jest.advanceTimersByTime(30000); + await jest.runAllTimersAsync(); + + // 🔑 flush remaining promise chains + await Promise.resolve(); + await Promise.resolve(); + + // Analytics resumed + expect(client.isReady.value).toBe(true); + // Waiting plugin executed + expect(waitingPlugin.tracked).toBe(true); + + jest.useRealTimers(); + }); + test('timeout force resume on DestinationPlugin', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // analytics running initially + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + const waitingPlugin = new ExampleWaitingPlugin1(); // no manual resume + const destinationPlugin: DestinationPlugin = new StubDestinationPlugin(); + + // add destination + client.add({ plugin: destinationPlugin }); + + // add waiting plugin inside destination + destinationPlugin.add(waitingPlugin); + + // track event + await client.track('foo'); + + // analytics should pause + expect(client.isReady.value).toBe(false); + expect(waitingPlugin.tracked).toBe(false); + + await Promise.resolve(); + + jest.advanceTimersByTime(6000); + await jest.runAllTimersAsync(); + + await Promise.resolve(); + await Promise.resolve(); + + // analytics resumed + expect(client.isReady.value).toBe(true); + + // waiting plugin executed + expect(waitingPlugin.tracked).toBe(true); + }); + test('test multiple WaitingPlugin on DestinationPlugin', async () => { + const client = new SegmentClient({ + config: baseConfig, + logger: getMockLogger(), + store, + }); + + // analytics running initially + client.isReady.value = true; + expect(client.isReady.value).toBe(true); + + const destinationPlugin: DestinationPlugin = new StubDestinationPlugin(); + client.add({ plugin: destinationPlugin }); + + const plugin1 = new ExampleWaitingPlugin1(); + const plugin2 = new ManualResumeWaitingPlugin(); + + destinationPlugin.add(plugin1); + destinationPlugin.add(plugin2); + + // track event + await client.track('foo'); + + // analytics paused + expect(client.isReady.value).toBe(false); + expect(plugin1.tracked).toBe(false); + expect(plugin2.tracked).toBe(false); + // Resume the first plugin + await plugin1.resume(); + + jest.advanceTimersByTime(6000); + await Promise.resolve(); + + // still paused because plugin2 not resumed + expect(client.isReady.value).toBe(false); + expect(plugin1.tracked).toBe(false); + expect(plugin2.tracked).toBe(false); + plugin2.resume(); + + jest.advanceTimersByTime(6000); + await jest.runAllTimersAsync(); + await Promise.resolve(); + // analytics resumed + expect(client.isReady.value).toBe(true); + + // both plugins executed + expect(plugin1.tracked).toBe(true); + expect(plugin2.tracked).toBe(true); + }); +}); diff --git a/packages/core/src/test-helpers/exampleWaitingPlugin.ts b/packages/core/src/test-helpers/exampleWaitingPlugin.ts new file mode 100644 index 00000000..d47c02b0 --- /dev/null +++ b/packages/core/src/test-helpers/exampleWaitingPlugin.ts @@ -0,0 +1,62 @@ +import { PluginType, SegmentEvent } from '../types'; +import { SegmentClient } from '../analytics'; +import { DestinationPlugin, WaitingPlugin } from '../plugin'; + +export class ExampleWaitingPlugin extends WaitingPlugin { + public type = PluginType.enrichment; + public tracked = false; + + configure(analytics: SegmentClient) { + console.log('exampleWaitingPlugin configure'); + super.configure(analytics); + // Simulate async work (network, native module init, etc.) + setTimeout(() => { + console.log('ExampleWaitingPlugin: ready!'); + void analytics.resumeEventProcessing(); + }, 1000); + } + + execute(event: SegmentEvent): SegmentEvent | undefined { + super.execute(event); + this.tracked = true; + return event; + } +} +export class ExampleWaitingPlugin1 extends WaitingPlugin { + public type = PluginType.before; + public tracked = false; + + constructor() { + super(); + } + configure(analytics: SegmentClient) { + super.configure(analytics); + } + execute(event: SegmentEvent): SegmentEvent | undefined { + console.log('ExampleWaitingPlugin1 received event', event.type); + this.tracked = true; + return event; + } +} + +export class ManualResumeWaitingPlugin extends WaitingPlugin { + public type = PluginType.enrichment; + public tracked = false; + + configure(analytics: SegmentClient) { + super.configure(analytics); + } + + execute(event: SegmentEvent): SegmentEvent | undefined { + console.log('ManualResumeWaitingPlugin received event', event.type); + this.tracked = true; + return event; + } +} + +export class StubDestinationPlugin extends DestinationPlugin { + key = 'StubDestination'; + protected isEnabled(_event: SegmentEvent): boolean { + return true; + } +} diff --git a/packages/core/src/test-helpers/index.ts b/packages/core/src/test-helpers/index.ts index d8a6aff0..90b8d350 100644 --- a/packages/core/src/test-helpers/index.ts +++ b/packages/core/src/test-helpers/index.ts @@ -5,3 +5,5 @@ export * from './mockSegmentStore'; export * from './mockTimeline'; export * from './setupSegmentClient'; export * from './utils'; +export * from './exampleWaitingPlugin'; +