From b8cf1cf7b8310490a3fdd167841e73e73efb1591 Mon Sep 17 00:00:00 2001 From: Rhys Howell Date: Tue, 25 Feb 2025 12:49:58 -0500 Subject: [PATCH 1/3] feat(analysis): occasionally unblock thread while analysing documents to allow abort --- src/schema-analyzer.ts | 37 +++++++++++++++------ src/schema-converters/internalToExpanded.ts | 2 +- src/schema-converters/internalToMongoDB.ts | 2 +- src/schema-converters/internalToStandard.ts | 2 +- src/{schema-converters => }/util.ts | 2 ++ 5 files changed, 32 insertions(+), 13 deletions(-) rename src/{schema-converters => }/util.ts (83%) diff --git a/src/schema-analyzer.ts b/src/schema-analyzer.ts index 631cefa..5f68d56 100644 --- a/src/schema-analyzer.ts +++ b/src/schema-analyzer.ts @@ -18,6 +18,7 @@ import { import semanticTypes from './semantic-types'; import { AnyIterable } from './types'; +import { allowAbort, ALLOW_ABORT_INTERVAL_COUNT } from './util'; type TypeCastMap = { Array: unknown[]; @@ -484,6 +485,10 @@ export class SchemaAnalyzer { fields: [] }; + // Increments when every field or type is analyzed. + // Useful for occasionally checking if the analysis should be aborted. + fieldAndTypeAnalysisCounter = 0; + constructor(options?: SchemaParseOptions) { // Set default options. this.options = { ...defaultSchemaParseOptions, ...options }; @@ -512,6 +517,13 @@ export class SchemaAnalyzer { } } + allowAbort() { + // Allow aborting the analysis. + if (this.fieldAndTypeAnalysisCounter++ % ALLOW_ABORT_INTERVAL_COUNT === 0) { + allowAbort(); + } + } + increaseFieldCount() { if (!this.options.distinctFieldsAbortThreshold) return; this.fieldsCount++; @@ -531,14 +543,15 @@ export class SchemaAnalyzer { return returnValue; } - analyzeDoc(doc: Document) { + async analyzeDoc(doc: Document) { this.finalized = false; /** * Takes a field value, determines the correct type, handles recursion into * nested arrays and documents, and passes the value down to `addToValue`. * Note: This mutates the `schema` argument. */ - const addToType = (path: string[], value: BSONValue, schema: SchemaAnalysisFieldTypes) => { + const addToType = async(path: string[], value: BSONValue, schema: SchemaAnalysisFieldTypes) => { + await this.allowAbort(); const bsonType = getBSONType(value); // If semantic type detection is enabled, the type is the semantic type // or the original bson type if no semantic type was detected. If disabled, @@ -560,13 +573,16 @@ export class SchemaAnalyzer { type.types = type.types ?? Object.create(null); type.lengths = type.lengths ?? []; type.lengths.push((value as BSONValue[]).length); - (value as BSONValue[]).forEach((v: BSONValue) => addToType(path, v, type.types)); + for (const v of (value as BSONValue[])) { + await addToType(path, v, type.types); + } } else if (isDocumentType(type)) { // Recurse into nested documents by calling `addToField` for all sub-fields. type.fields = type.fields ?? Object.create(null); - Object.entries(value as Document).forEach( - ([fieldName, v]) => addToField(fieldName, [...path, fieldName], v, type.fields) - ); + + for (const [fieldName, v] of Object.entries(value as Document)) { + await addToField(fieldName, [...path, fieldName], v, type.fields); + } } else if (this.options.storeValues && !isNullType(type)) { // When the `storeValues` option is enabled, store some example values. if (!type.values) { @@ -584,7 +600,8 @@ export class SchemaAnalyzer { * Handles a field from a document. Passes the value to `addToType`. * Note: This mutates the `schema` argument. */ - const addToField = (fieldName: string, path: string[], value: BSONValue, schema: SchemaAnalysisFieldsMap) => { + const addToField = async(fieldName: string, path: string[], value: BSONValue, schema: SchemaAnalysisFieldsMap) => { + await this.allowAbort(); if (!schema[fieldName]) { schema[fieldName] = { name: fieldName, @@ -597,11 +614,11 @@ export class SchemaAnalyzer { const field = schema[fieldName]; field.count++; - addToType(path, value, field.types); + await addToType(path, value, field.types); }; for (const key of Object.keys(doc)) { - addToField(key, [key], doc[key], this.schemaAnalysisRoot.fields); + await addToField(key, [key], doc[key], this.schemaAnalysisRoot.fields); } this.schemaAnalysisRoot.count += 1; } @@ -652,7 +669,7 @@ export async function getCompletedSchemaAnalyzer( const analyzer = new SchemaAnalyzer(options); for await (const doc of verifyStreamSource(source)) { if (options?.signal?.aborted) throw options.signal.reason; - analyzer.analyzeDoc(doc); + await analyzer.analyzeDoc(doc); } return analyzer; } diff --git a/src/schema-converters/internalToExpanded.ts b/src/schema-converters/internalToExpanded.ts index fbe8217..0124e8c 100644 --- a/src/schema-converters/internalToExpanded.ts +++ b/src/schema-converters/internalToExpanded.ts @@ -2,7 +2,7 @@ import { ArraySchemaType, DocumentSchemaType, Schema as InternalSchema, SchemaTy import { type ExpandedJSONSchema } from '../types'; import { InternalTypeToStandardTypeMap, RELAXED_EJSON_DEFINITIONS } from './internalToStandard'; import { InternalTypeToBsonTypeMap } from './internalToMongoDB'; -import { allowAbort } from './util'; +import { allowAbort } from '../util'; const createConvertInternalToExpanded = function() { const usedDefinitions = new Set(); diff --git a/src/schema-converters/internalToMongoDB.ts b/src/schema-converters/internalToMongoDB.ts index e85f9ca..c40447f 100644 --- a/src/schema-converters/internalToMongoDB.ts +++ b/src/schema-converters/internalToMongoDB.ts @@ -3,7 +3,7 @@ */ import { ArraySchemaType, DocumentSchemaType, Schema as InternalSchema, SchemaType } from '../schema-analyzer'; import { MongoDBJSONSchema } from '../types'; -import { allowAbort } from './util'; +import { allowAbort } from '../util'; export const InternalTypeToBsonTypeMap: Record< SchemaType['name'] | 'Double' | 'BSONSymbol', diff --git a/src/schema-converters/internalToStandard.ts b/src/schema-converters/internalToStandard.ts index a1ec463..923604c 100644 --- a/src/schema-converters/internalToStandard.ts +++ b/src/schema-converters/internalToStandard.ts @@ -1,7 +1,7 @@ import { JSONSchema4TypeName } from 'json-schema'; import { ArraySchemaType, DocumentSchemaType, Schema as InternalSchema, SchemaType } from '../schema-analyzer'; import { StandardJSONSchema } from '../types'; -import { allowAbort } from './util'; +import { allowAbort } from '../util'; type StandardTypeDefinition = { type: JSONSchema4TypeName, $ref?: never; } | { $ref: string, type?: never }; diff --git a/src/schema-converters/util.ts b/src/util.ts similarity index 83% rename from src/schema-converters/util.ts rename to src/util.ts index 941ceee..f1521de 100644 --- a/src/schema-converters/util.ts +++ b/src/util.ts @@ -1,3 +1,5 @@ +export const ALLOW_ABORT_INTERVAL_COUNT = 1000; + export async function allowAbort(signal?: AbortSignal) { return new Promise((resolve, reject) => setTimeout(() => { From 3ba2471b8bb37bcdb8384bf5c0d24151cd7698d0 Mon Sep 17 00:00:00 2001 From: Rhys Howell Date: Wed, 26 Feb 2025 02:29:20 -0500 Subject: [PATCH 2/3] fixup: update allow abort methods so there's no overlap with the class member and global --- src/schema-analyzer.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/schema-analyzer.ts b/src/schema-analyzer.ts index 5f68d56..c44a648 100644 --- a/src/schema-analyzer.ts +++ b/src/schema-analyzer.ts @@ -517,7 +517,7 @@ export class SchemaAnalyzer { } } - allowAbort() { + allowAbortDuringAnalysis() { // Allow aborting the analysis. if (this.fieldAndTypeAnalysisCounter++ % ALLOW_ABORT_INTERVAL_COUNT === 0) { allowAbort(); @@ -551,7 +551,7 @@ export class SchemaAnalyzer { * Note: This mutates the `schema` argument. */ const addToType = async(path: string[], value: BSONValue, schema: SchemaAnalysisFieldTypes) => { - await this.allowAbort(); + await this.allowAbortDuringAnalysis(); const bsonType = getBSONType(value); // If semantic type detection is enabled, the type is the semantic type // or the original bson type if no semantic type was detected. If disabled, From 649a6fc9ab1f3e46a675536a14f2bec5d4f72249 Mon Sep 17 00:00:00 2001 From: Rhys Howell Date: Wed, 26 Feb 2025 02:43:42 -0500 Subject: [PATCH 3/3] fixup: missed a function renaming --- src/schema-analyzer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/schema-analyzer.ts b/src/schema-analyzer.ts index c44a648..7d0d5d7 100644 --- a/src/schema-analyzer.ts +++ b/src/schema-analyzer.ts @@ -601,7 +601,7 @@ export class SchemaAnalyzer { * Note: This mutates the `schema` argument. */ const addToField = async(fieldName: string, path: string[], value: BSONValue, schema: SchemaAnalysisFieldsMap) => { - await this.allowAbort(); + await this.allowAbortDuringAnalysis(); if (!schema[fieldName]) { schema[fieldName] = { name: fieldName,