Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/fix-on-demand-isready.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@tanstack/db': patch
'@tanstack/query-db-collection': patch
---

Fix `isReady` tracking for on-demand live queries without orderBy. Previously, non-ordered live queries using `syncMode: 'on-demand'` were incorrectly marked as ready before data finished loading. Also fix `preload()` promises hanging when cleanup occurs before the collection becomes ready. Additionally, fix concurrent live queries subscribing to the same source collection - each now independently tracks loading state.
14 changes: 14 additions & 0 deletions packages/db/src/collection/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,21 @@ export class CollectionLifecycleManager<
}

this.hasBeenReady = false

// Call any pending onFirstReady callbacks before clearing them.
// This ensures preload() promises resolve during cleanup instead of hanging.
const callbacks = [...this.onFirstReadyCallbacks]
this.onFirstReadyCallbacks = []
callbacks.forEach((callback) => {
try {
callback()
} catch (error) {
console.error(
`${this.config.id ? `[${this.config.id}] ` : ``}Error in onFirstReady callback during cleanup:`,
error,
)
}
})

// Set status to cleaned-up after everything is cleaned up
// This fires the status:change event to notify listeners
Expand Down
52 changes: 49 additions & 3 deletions packages/db/src/query/live/collection-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,64 @@ export class CollectionSubscriber<
this.sendChangesToPipeline(changes)
}

// Create subscription with onStatusChange - listener is registered before snapshot
// Note: For non-ordered queries (no limit/offset), we use trackLoadSubsetPromise: false
// which is the default behavior in subscribeChanges
// Create subscription with includeInitialState. This uses trackLoadSubsetPromise: false
// internally, which is required for truncate handling to work correctly.
const subscription = this.collection.subscribeChanges(sendChanges, {
...(includeInitialState && { includeInitialState }),
whereExpression,
onStatusChange,
})

// Track loading state for the live query's isReady status.
// We can't rely on subscription status changes (trackLoadSubsetPromise: false breaks that),
// so we check if the source collection is loading and track when it finishes.
// Each live query needs its own tracking even if another query already started loading,
// since each query's isReady state is independent.
if (includeInitialState && this.collection.isLoadingSubset) {
this.trackCollectionLoading()
}

return subscription
}

/**
* Track the source collection's loading state for the live query's isReady.
* Creates a promise that resolves when the collection finishes loading.
*/
private trackCollectionLoading(): void {
// Handle race condition: if loading already ended, no tracking needed
if (!this.collection.isLoadingSubset) {
return
}

let resolve: () => void
const promise = new Promise<void>((res) => {
resolve = res
})

this.collectionConfigBuilder.liveQueryCollection!._sync.trackLoadPromise(
promise,
)

const unsubscribe = this.collection.on(`loadingSubset:change`, (event) => {
if (event.loadingSubsetTransition === `end`) {
cleanup()
}
})

// Cleanup function to unsubscribe and resolve promise
const cleanup = () => {
unsubscribe()
resolve()
}

// Register cleanup for when the subscription is unsubscribed early
// (e.g., component unmounts before loading completes)
this.collectionConfigBuilder.currentSyncState!.unsubscribeCallbacks.add(
cleanup,
)
}

private subscribeToOrderedChanges(
whereExpression: BasicExpression<boolean> | undefined,
orderByInfo: OrderByOptimizationInfo,
Expand Down
85 changes: 69 additions & 16 deletions packages/db/tests/query/live-query-collection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,71 @@ describe(`createLiveQueryCollection`, () => {
expect(liveQuery.status).toBe(`ready`)
expect(liveQuery.isLoadingSubset).toBe(false)
})

it(`concurrent live queries should each track loading state independently`, async () => {
// This tests the fix for the !wasLoadingBefore bug:
// When multiple live queries subscribe to the same source collection,
// each must independently track when loading finishes.
// Previously, only the first live query would track loading because
// wasLoadingBefore was true for subsequent queries.

let resolveLoadSubset: () => void
const loadSubsetPromise = new Promise<void>((resolve) => {
resolveLoadSubset = resolve
})

const sourceCollection = createCollection<{ id: number; value: number }>({
id: `source-concurrent-lq`,
getKey: (item) => item.id,
syncMode: `on-demand`,
startSync: true,
sync: {
sync: ({ markReady, begin, write, commit }) => {
begin()
write({ type: `insert`, value: { id: 1, value: 10 } })
commit()
markReady()

return {
loadSubset: () => loadSubsetPromise,
}
},
},
})

// Create TWO live queries that subscribe to the same source collection
const liveQuery1 = createLiveQueryCollection({
query: (q) => q.from({ item: sourceCollection }),
startSync: true,
})

const liveQuery2 = createLiveQueryCollection({
query: (q) => q.from({ item: sourceCollection }),
startSync: true,
})

// Wait for both subscriptions to start and trigger loadSubset
await flushPromises()
await new Promise((resolve) => setTimeout(resolve, 10))

// Source should be ready
expect(sourceCollection.isReady()).toBe(true)

// Both live queries should be loading (not ready yet)
// KEY ASSERTION: Without the fix, liveQuery2 would be 'ready' here
// because it skipped tracking when wasLoadingBefore was true
expect(liveQuery1.status).toBe(`loading`)
expect(liveQuery2.status).toBe(`loading`)

// Resolve the loadSubset promise
resolveLoadSubset!()
await flushPromises()
await new Promise((resolve) => setTimeout(resolve, 10))

// Now both should be ready
expect(liveQuery1.status).toBe(`ready`)
expect(liveQuery2.status).toBe(`ready`)
})
})

describe(`move functionality`, () => {
Expand Down Expand Up @@ -2160,10 +2225,6 @@ describe(`createLiveQueryCollection`, () => {
describe(`where clauses passed to loadSubset`, () => {
it(`passes eq where clause to loadSubset`, async () => {
const capturedOptions: Array<LoadSubsetOptions> = []
let resolveLoadSubset: () => void
const loadSubsetPromise = new Promise<void>((resolve) => {
resolveLoadSubset = resolve
})

const baseCollection = createCollection<{ id: number; name: string }>({
id: `test-base`,
Expand All @@ -2175,7 +2236,8 @@ describe(`createLiveQueryCollection`, () => {
return {
loadSubset: (options: LoadSubsetOptions) => {
capturedOptions.push(options)
return loadSubsetPromise
// Return true to indicate sync is complete (no async loading)
return true
},
}
},
Expand All @@ -2200,17 +2262,10 @@ describe(`createLiveQueryCollection`, () => {
if (lastCall?.where?.type === `func`) {
expect(lastCall.where.name).toBe(`eq`)
}

resolveLoadSubset!()
await flushPromises()
})

it(`passes ilike where clause to loadSubset`, async () => {
const capturedOptions: Array<LoadSubsetOptions> = []
let resolveLoadSubset: () => void
const loadSubsetPromise = new Promise<void>((resolve) => {
resolveLoadSubset = resolve
})

const baseCollection = createCollection<{ id: number; name: string }>({
id: `test-base`,
Expand All @@ -2222,7 +2277,8 @@ describe(`createLiveQueryCollection`, () => {
return {
loadSubset: (options: LoadSubsetOptions) => {
capturedOptions.push(options)
return loadSubsetPromise
// Return true to indicate sync is complete (no async loading)
return true
},
}
},
Expand Down Expand Up @@ -2252,9 +2308,6 @@ describe(`createLiveQueryCollection`, () => {
if (lastCall?.where?.type === `func`) {
expect(lastCall.where.name).toBe(`ilike`)
}

resolveLoadSubset!()
await flushPromises()
})

it(`passes single orderBy clause to loadSubset when using limit`, async () => {
Expand Down
21 changes: 21 additions & 0 deletions packages/query-db-collection/src/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,15 @@ export function queryCollectionOptions(
// Error already occurred, reject immediately
return Promise.reject(currentResult.error)
} else {
// Check QueryClient cache directly - observer's getCurrentResult() may show
// a loading state even when data exists in cache. This happens because observer
// state can lag behind the QueryClient cache during unsubscribe/resubscribe
// cycles (e.g., when a live query is cleaned up and recreated).
const cachedData = queryClient.getQueryData(key)
if (cachedData !== undefined) {
return true
}

// Query is still loading, wait for the first result
return new Promise<void>((resolve, reject) => {
const unsubscribe = observer.subscribe((result) => {
Expand Down Expand Up @@ -745,6 +754,18 @@ export function queryCollectionOptions(
(queryRefCounts.get(hashedQueryKey) || 0) + 1,
)

// Check if data already exists in QueryClient cache (persisted within gcTime from
// a previous observer). This avoids creating unnecessary promises and subscription
// delays when recreating an observer for data that's already cached.
const cachedData = queryClient.getQueryData(key)
if (cachedData !== undefined) {
// Still subscribe if sync is active so we receive future updates
if (syncStarted || collection.subscriberCount > 0) {
subscribeToQuery(localObserver, hashedQueryKey)
}
return true
}

// Create a promise that resolves when the query result is first available
const readyPromise = new Promise<void>((resolve, reject) => {
const unsubscribe = localObserver.subscribe((result) => {
Expand Down
Loading