Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 133 additions & 2 deletions packages/firestore/src/api/pipeline_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,22 @@
* limitations under the License.
*/

import { Pipeline } from '../api/pipeline';
import { firestoreClientExecutePipeline } from '../core/firestore_client';
// Re-adding necessary imports that were removed previously
import {
CompleteFn,
ErrorFn,
isPartialObserver,
NextFn,
PartialObserver
} from '../api/observer';
import {
firestoreClientExecutePipeline,
firestoreClientListen
} from '../core/firestore_client';
import { ListenerDataSource } from '../core/event_manager';

Check failure on line 30 in packages/firestore/src/api/pipeline_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

`../core/event_manager` import should occur before import of `../core/firestore_client`

Check failure on line 30 in packages/firestore/src/api/pipeline_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

`../core/event_manager` import should occur before import of `../core/firestore_client`
import { toCorePipeline } from '../core/pipeline-util';
import { ViewSnapshot } from '../core/view_snapshot';
import {

Check failure on line 33 in packages/firestore/src/api/pipeline_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

`../core/structured_pipeline` import should occur before import of `../core/view_snapshot`

Check failure on line 33 in packages/firestore/src/api/pipeline_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

`../core/structured_pipeline` import should occur before import of `../core/view_snapshot`
StructuredPipeline,
StructuredPipelineOptions
} from '../core/structured_pipeline';
Expand All @@ -31,10 +44,15 @@
UserDataReader,
UserDataSource
} from '../lite-api/user_data_reader';
import { FirestoreError } from '../util/error';
import { cast } from '../util/input_validation';

import { ensureFirestoreConfigured, Firestore } from './database';
import { Pipeline } from './pipeline'; // Keep this specific Pipeline import if needed alongside LitePipeline
import { RealtimePipeline } from './realtime_pipeline';
import { DocumentReference } from './reference';
import { SnapshotListenOptions, Unsubscribe } from './reference_impl';
import { RealtimePipelineSnapshot } from './snapshot';
import { ExpUserDataWriter } from './user_data_writer';

declare module './database' {
Expand All @@ -49,6 +67,7 @@
*/
interface Firestore {
pipeline(): PipelineSource<Pipeline>;
realtimePipeline(): PipelineSource<RealtimePipeline>;
}
}

Expand Down Expand Up @@ -179,3 +198,115 @@
}
);
};

Firestore.prototype.realtimePipeline =
function (): PipelineSource<RealtimePipeline> {
const userDataReader = newUserDataReader(this);
return new PipelineSource<RealtimePipeline>(
this._databaseId,
userDataReader,
(stages: Stage[]) => {
return new RealtimePipeline(
this,
newUserDataReader(this),
new ExpUserDataWriter(this),
stages
);
}
);
};

/**
* @internal
* @private
*/
export function _onRealtimePipelineSnapshot(
pipeline: RealtimePipeline,
observer: {
next?: (snapshot: RealtimePipelineSnapshot) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe;
/**
* @internal
* @private
*/
export function _onRealtimePipelineSnapshot(
pipeline: RealtimePipeline,
options: SnapshotListenOptions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our design doc was updated to use PipelineListenOptions, which will add serverTimestamps config and remove source. Though, if we want to support source, we can. go/firestore-api-realtime-pipelines

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be refactored in a future PR, as long as we track it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More generally, we will need to put together a design for how pipeline options are passed to onSnapshot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I forgot about serverTimestamp here. Let's proceed without this for now. I added one item to the tracking sheet.

observer: {
next?: (snapshot: RealtimePipelineSnapshot) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe;
/**
* @internal
* @private
*/
export function _onRealtimePipelineSnapshot(
pipeline: RealtimePipeline,
onNext: (snapshot: RealtimePipelineSnapshot) => void,
onError?: (error: FirestoreError) => void,
onComplete?: () => void
): Unsubscribe;
/**
* @internal
* @private
*/
export function _onRealtimePipelineSnapshot(
pipeline: RealtimePipeline,
options: SnapshotListenOptions,
onNext: (snapshot: RealtimePipelineSnapshot) => void,
onError?: (error: FirestoreError) => void,
onComplete?: () => void
): Unsubscribe;
export function _onRealtimePipelineSnapshot(
pipeline: RealtimePipeline,
...args: unknown[]
): Unsubscribe {
let options: SnapshotListenOptions = {
includeMetadataChanges: false,
source: 'default'
};
let currArg = 0;
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
options = args[currArg] as SnapshotListenOptions;
currArg++;
}

const internalOptions = {
includeMetadataChanges: options.includeMetadataChanges,
source: options.source as ListenerDataSource
};

let userObserver: PartialObserver<RealtimePipelineSnapshot>;
if (isPartialObserver(args[currArg])) {
userObserver = args[currArg] as PartialObserver<RealtimePipelineSnapshot>;
} else {
userObserver = {
next: args[currArg] as NextFn<RealtimePipelineSnapshot>,
error: args[currArg + 1] as ErrorFn,
complete: args[currArg + 2] as CompleteFn
};
}

const client = ensureFirestoreConfigured(pipeline._db as Firestore);
const observer = {
next: (snapshot: ViewSnapshot) => {
if (userObserver.next) {
userObserver.next(new RealtimePipelineSnapshot(pipeline, snapshot));
}
},
error: userObserver.error,
complete: userObserver.complete
};

return firestoreClientListen(
client,
toCorePipeline(pipeline),
internalOptions, // Pass parsed options here
observer
);
}
101 changes: 99 additions & 2 deletions packages/firestore/src/api/reference_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
firestoreClientListen,
firestoreClientWrite
} from '../core/firestore_client';
import { newQueryForPath, Query as InternalQuery } from '../core/query';
import { QueryOrPipeline, toCorePipeline } from '../core/pipeline-util';

Check failure on line 38 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'QueryOrPipeline' is defined but never used

Check failure on line 38 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'QueryOrPipeline' is defined but never used. Allowed unused vars must match /^_/u

Check failure on line 38 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'QueryOrPipeline' is defined but never used

Check failure on line 38 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'QueryOrPipeline' is defined but never used. Allowed unused vars must match /^_/u
import { Query as InternalQuery, newQueryForPath } from '../core/query';
import { ViewSnapshot } from '../core/view_snapshot';
import { FieldPath } from '../lite-api/field_path';
import { validateHasExplicitOrderByForLimitToLast } from '../lite-api/query';
Expand Down Expand Up @@ -69,9 +70,12 @@
DocumentSnapshot,
FirestoreDataConverter,
QuerySnapshot,
RealtimePipelineSnapshot,
SnapshotMetadata
} from './snapshot';
import { ExpUserDataWriter } from './user_data_writer';
import { RealtimePipeline } from './realtime_pipeline';

Check failure on line 77 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

`./realtime_pipeline` import should occur before import of `./snapshot`

Check failure on line 77 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

There should be at least one empty line between import groups

Check failure on line 77 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

`./realtime_pipeline` import should occur before import of `./snapshot`

Check failure on line 77 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

There should be at least one empty line between import groups
import { CorePipeline } from '../core/pipeline';

Check failure on line 78 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

`../core/pipeline` import should occur before import of `../core/pipeline-util`

Check failure on line 78 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

`../core/pipeline` import should occur before import of `../core/pipeline-util`

/**
* An options object that can be passed to {@link (onSnapshot:1)} and {@link
Expand Down Expand Up @@ -197,6 +201,10 @@
*
* @returns A `Promise` that will be resolved with the results of the query.
*/
export function getDocs<AppModelType, DbModelType extends DocumentData>(
query: Query<AppModelType, DbModelType>
): Promise<QuerySnapshot<AppModelType, DbModelType>>;

export function getDocs<AppModelType, DbModelType extends DocumentData>(
query: Query<AppModelType, DbModelType>
): Promise<QuerySnapshot<AppModelType, DbModelType>> {
Expand All @@ -214,7 +222,7 @@
new QuerySnapshot<AppModelType, DbModelType>(
firestore,
userDataWriter,
query,
query as Query<AppModelType, DbModelType>,
snapshot
)
);
Expand Down Expand Up @@ -642,6 +650,7 @@
onError?: (error: FirestoreError) => void,
onCompletion?: () => void
): Unsubscribe;

export function onSnapshot<AppModelType, DbModelType extends DocumentData>(
reference:
| Query<AppModelType, DbModelType>
Expand Down Expand Up @@ -1061,6 +1070,94 @@
}
}

export function onPipelineSnapshot(
query: RealtimePipeline,
observer: {
next?: (snapshot: RealtimePipelineSnapshot) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe;
export function onPipelineSnapshot(
query: RealtimePipeline,
options: SnapshotListenOptions,
observer: {
next?: (snapshot: RealtimePipelineSnapshot) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe;
export function onPipelineSnapshot(
query: RealtimePipeline,
onNext: (snapshot: RealtimePipelineSnapshot) => void,
onError?: (error: FirestoreError) => void,
onCompletion?: () => void
): Unsubscribe;
export function onPipelineSnapshot(
query: RealtimePipeline,
options: SnapshotListenOptions,
onNext: (snapshot: RealtimePipelineSnapshot) => void,
onError?: (error: FirestoreError) => void,
onCompletion?: () => void
): Unsubscribe;
export function onPipelineSnapshot(
reference: RealtimePipeline,
...args: unknown[]
): Unsubscribe {
reference = getModularInstance(reference);

let options: SnapshotListenOptions = {
includeMetadataChanges: false,
source: 'default'
};
let currArg = 0;
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
options = args[currArg] as SnapshotListenOptions;
currArg++;
}

const internalOptions = {
includeMetadataChanges: options.includeMetadataChanges,
source: options.source as ListenerDataSource
};

if (isPartialObserver(args[currArg])) {
const userObserver = args[
currArg
] as PartialObserver<RealtimePipelineSnapshot>;
args[currArg] = userObserver.next?.bind(userObserver);
args[currArg + 1] = userObserver.error?.bind(userObserver);
args[currArg + 2] = userObserver.complete?.bind(userObserver);
}

let observer: PartialObserver<ViewSnapshot>;
let firestore: Firestore;
let internalQuery: CorePipeline;

// RealtimePipeline
firestore = cast(reference._db, Firestore);

Check failure on line 1138 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'firestore' is never reassigned. Use 'const' instead

Check failure on line 1138 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'firestore' is never reassigned. Use 'const' instead
internalQuery = toCorePipeline(reference);

Check failure on line 1139 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'internalQuery' is never reassigned. Use 'const' instead

Check failure on line 1139 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'internalQuery' is never reassigned. Use 'const' instead
observer = {

Check failure on line 1140 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'observer' is never reassigned. Use 'const' instead

Check failure on line 1140 in packages/firestore/src/api/reference_impl.ts

View workflow job for this annotation

GitHub Actions / Lint

'observer' is never reassigned. Use 'const' instead
next: snapshot => {
if (args[currArg]) {
(args[currArg] as NextFn<RealtimePipelineSnapshot>)(
new RealtimePipelineSnapshot(reference as RealtimePipeline, snapshot)
);
}
},
error: args[currArg + 1] as ErrorFn,
complete: args[currArg + 2] as CompleteFn
};

const client = ensureFirestoreConfigured(firestore);
return firestoreClientListen(
client,
internalQuery,
internalOptions,
observer
);
}

// TODO(firestorexp): Make sure these overloads are tested via the Firestore
// integration tests

Expand Down
Loading
Loading