Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import {
WaitingPlugin,
PluginType,
Plugin,

} from '@segment/analytics-react-native';

import type {SegmentAPISettings, SegmentClient, SegmentEvent, UpdateType} from '@segment/analytics-react-native';
export class ExampleWaitingPlugin extends WaitingPlugin {
type = PluginType.enrichment;
analytics = undefined;
tracked = false;

/**
* Called when settings are updated
*/
update(_settings: SegmentAPISettings, _type: UpdateType) {
if (this.type === PluginType.before) {
// delay 3 seconds, then resume event processing
setTimeout(() => {
this.resume();
}, 3000);
}
}

/**
* Called for track events
*/
track(event: SegmentEvent) {
this.tracked = true;
return event;
}
}
82 changes: 77 additions & 5 deletions packages/core/src/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import {
translateHTTPError,
} from './errors';
import { QueueFlushingPlugin } from './plugins/QueueFlushingPlugin';
import { WaitingPlugin } from './plugin';

type OnPluginAddedCallback = (plugin: Plugin) => void;

Expand Down Expand Up @@ -200,6 +201,18 @@ export class SegmentClient {
this.store = store;
this.timeline = new Timeline();

// // create and add waiting plugin immediately so early events get buffered.
// try {
// this.waitingPlugin = new WaitingPlugin();
// // add directly to timeline via addPlugin to ensure configure() is called immediately
// this.addPlugin(this.waitingPlugin);
// // initial running state false until init completes (mirrors Kotlin semantics)
// this.isRunning = false;
// } catch (e) {
// // if WaitingPlugin instantiation or add fails, fallback to running=true
// this.isRunning = true;
// }

// Initialize the watchables
this.context = {
get: this.store.context.get,
Expand Down Expand Up @@ -295,7 +308,6 @@ export class SegmentClient {
if ((await this.store.isReady.get(true)) === false) {
await this.storageReady();
}

// Get new settings from segment
// It's important to run this before checkInstalledVersion and trackDeeplinks to give time for destination plugins
// which make use of the settings object to initialize
Expand All @@ -309,7 +321,6 @@ export class SegmentClient {
]);
await this.onReady();
this.isReady.value = true;

// Process all pending events
await this.processPendingEvents();
// Trigger manual flush
Expand Down Expand Up @@ -465,17 +476,24 @@ export class SegmentClient {
settings
);
}

if (!this.isReady.value) {
console.log('!this.isReady.value', !this.isReady.value);
if (!this.isReady.value && !(plugin instanceof WaitingPlugin)) {
this.pluginsToAdd.push(plugin);
} else {
console.log('this.addPlugin');
this.addPlugin(plugin);
}
}

private addPlugin(plugin: Plugin) {
plugin.configure(this);
this.timeline.add(plugin);
//check for waiting plugin here
if (plugin instanceof WaitingPlugin) {
console.log('add plugin');
this.pauseEventProcessingForPlugin(plugin);
}

this.triggerOnPluginLoaded(plugin);
}

Expand Down Expand Up @@ -512,7 +530,7 @@ export class SegmentClient {
): Promise<SegmentEvent | undefined> {
const event = await this.applyContextData(incomingEvent);
this.flushPolicyExecuter.notify(event);
return this.timeline.process(event);
return await this.timeline.process(event);
}

private async trackDeepLinks() {
Expand Down Expand Up @@ -1027,4 +1045,58 @@ export class SegmentClient {

return totalEventsCount;
}
private resumeTimeoutId?: ReturnType<typeof setTimeout>;
private waitingPlugins = new Set<WaitingPlugin>();

/**
* Pause event processing globally. Events will be buffered into pendingEvents and WaitingPlugin.
* An auto-resume will be scheduled after `timeout` ms.
*/
pauseEventProcessingForPlugin(plugin?: WaitingPlugin) {
if (plugin) {
this.waitingPlugins.add(plugin);
}
this.pauseEventProcessing();
}
async resumeEventProcessingForPlugin(plugin?: WaitingPlugin) {
if (plugin) {
this.waitingPlugins.delete(plugin);
}
if (this.waitingPlugins.size > 0) {
return; // still blocked
}

await this.resumeEventProcessing();
}

pauseEventProcessing(timeout = 30000) {
// IMPORTANT: ignore repeated pauses
if (!this.isReady.value) {
return;
}

this.isReady.value = false;

// clear previous timeout if any
if (this.resumeTimeoutId) {
clearTimeout(this.resumeTimeoutId);
}

this.resumeTimeoutId = setTimeout(async () => {
await this.resumeEventProcessing();
}, timeout);
}
async resumeEventProcessing() {
if (this.isReady.value) {
return;
}

if (this.resumeTimeoutId) {
clearTimeout(this.resumeTimeoutId);
this.resumeTimeoutId = undefined;
}
// this.waitingPlugins.clear();
this.isReady.value = true;
await this.processPendingEvents();
}
}
33 changes: 31 additions & 2 deletions packages/core/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ export class DestinationPlugin extends EventPlugin {
key = '';

timeline = new Timeline();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
store: any;

private hasSettings() {
return this.analytics?.settings.get()?.[this.key] !== undefined;
}

private isEnabled(event: SegmentEvent): boolean {
protected isEnabled(event: SegmentEvent): boolean {
let customerDisabled = false;
if (event.integrations?.[this.key] === false) {
customerDisabled = true;
Expand All @@ -140,6 +142,10 @@ export class DestinationPlugin extends EventPlugin {
if (analytics) {
plugin.configure(analytics);
}

if (analytics && plugin instanceof WaitingPlugin) {
analytics.pauseEventProcessingForPlugin(plugin);
}
this.timeline.add(plugin);
return plugin;
}
Expand Down Expand Up @@ -179,7 +185,6 @@ export class DestinationPlugin extends EventPlugin {
type: PluginType.before,
event,
});

if (beforeResult === undefined) {
return;
}
Expand Down Expand Up @@ -210,3 +215,27 @@ export class UtilityPlugin extends EventPlugin {}

// For internal platform-specific bits
export class PlatformPlugin extends Plugin {}

export { PluginType };

/**
* WaitingPlugin
* Buffers events when paused and releases them when resumed.
*/
export class WaitingPlugin extends Plugin {
constructor() {
super();
}

configure(analytics: SegmentClient) {
super.configure(analytics);
}

pause() {
this.analytics?.pauseEventProcessingForPlugin(this);
}

async resume() {
await this.analytics?.resumeEventProcessingForPlugin(this);
}
}
Loading
Loading