|
1 | | -import { expect } from 'chai'; |
2 | 1 | import * as path from 'path'; |
3 | 2 |
|
4 | | -import { Document, MongoClient } from '../../../src'; |
5 | | -import { LEGACY_HELLO_COMMAND } from '../../../src/constants'; |
6 | 3 | import { loadSpecTests } from '../../spec'; |
7 | 4 | import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; |
8 | | -import { delay, setupDatabase } from '../shared'; |
9 | 5 |
|
10 | | -// TODO(NODE-4126): Fix change stream resumabilty in iterator mode |
11 | | -const skippedResumabilityTests = [ |
12 | | - 'change stream resumes after HostUnreachable', |
13 | | - 'change stream resumes after HostNotFound', |
14 | | - 'change stream resumes after NetworkTimeout', |
15 | | - 'change stream resumes after ShutdownInProgress', |
16 | | - 'change stream resumes after PrimarySteppedDown', |
17 | | - 'change stream resumes after ExceededTimeLimit', |
18 | | - 'change stream resumes after SocketException', |
19 | | - 'change stream resumes after NotWritablePrimary', |
20 | | - 'change stream resumes after InterruptedAtShutdown', |
21 | | - 'change stream resumes after InterruptedDueToReplStateChange', |
22 | | - 'change stream resumes after NotPrimaryNoSecondaryOk', |
23 | | - 'change stream resumes after NotPrimaryOrSecondary', |
24 | | - 'change stream resumes after StaleShardVersion', |
25 | | - 'change stream resumes after StaleEpoch', |
26 | | - 'change stream resumes after RetryChangeStream', |
27 | | - 'change stream resumes after FailedToSatisfyReadPreference', |
28 | | - 'change stream resumes if error contains ResumableChangeStreamError', |
29 | | - 'change stream resumes after a network error', |
30 | | - 'change stream resumes after CursorNotFound', |
31 | | - 'Test consecutive resume' |
32 | | -]; |
33 | 6 | describe('Change Streams Spec - Unified', function () { |
34 | | - runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')), skippedResumabilityTests); |
35 | | -}); |
36 | | - |
37 | | -// TODO(NODE-3819): Unskip flaky MacOS tests. |
38 | | -const maybeDescribe = process.platform === 'darwin' ? describe.skip : describe; |
39 | | -maybeDescribe('Change Stream Spec - v1', function () { |
40 | | - let globalClient; |
41 | | - let ctx; |
42 | | - let events; |
43 | | - |
44 | | - const TESTS_TO_SKIP = new Set([]); |
45 | | - |
46 | | - before(function () { |
47 | | - const configuration = this.configuration; |
48 | | - return setupDatabase(configuration).then(() => { |
49 | | - globalClient = configuration.newClient(); |
50 | | - return globalClient.connect(); |
51 | | - }); |
52 | | - }); |
53 | | - |
54 | | - after(function () { |
55 | | - const gc = globalClient; |
56 | | - globalClient = undefined; |
57 | | - return new Promise<void>(r => gc.close(() => r())); |
58 | | - }); |
59 | | - |
60 | | - loadSpecTests(path.join('change-streams', 'legacy')).forEach(suite => { |
61 | | - const ALL_DBS = [suite.database_name, suite.database2_name]; |
62 | | - |
63 | | - describe(suite.name, () => { |
64 | | - beforeEach(function () { |
65 | | - const gc = globalClient; |
66 | | - const sDB = suite.database_name; |
67 | | - const sColl = suite.collection_name; |
68 | | - const configuration = this.configuration; |
69 | | - return Promise.all( |
70 | | - ALL_DBS.map(db => gc.db(db).dropDatabase({ writeConcern: { w: 'majority' } })) |
71 | | - ) |
72 | | - .then(() => gc.db(sDB).createCollection(sColl)) |
73 | | - .then(() => { |
74 | | - if (suite.database2_name && suite.collection2_name) { |
75 | | - return gc.db(suite.database2_name).createCollection(suite.collection2_name); |
76 | | - } |
77 | | - }) |
78 | | - .then(() => |
79 | | - configuration |
80 | | - .newClient({}, { monitorCommands: true, heartbeatFrequencyMS: 100 }) |
81 | | - .connect() |
82 | | - ) |
83 | | - .then(client => { |
84 | | - ctx = { gc, client }; |
85 | | - events = []; |
86 | | - const _events = events; |
87 | | - |
88 | | - ctx.database = ctx.client.db(sDB); |
89 | | - ctx.collection = ctx.database.collection(sColl); |
90 | | - ctx.client.on('commandStarted', e => { |
91 | | - if (e.commandName !== LEGACY_HELLO_COMMAND) _events.push(e); |
92 | | - }); |
93 | | - }); |
94 | | - }); |
95 | | - |
96 | | - afterEach(function () { |
97 | | - const client = ctx.client; |
98 | | - ctx = undefined; |
99 | | - events = undefined; |
100 | | - |
101 | | - client.removeAllListeners('commandStarted'); |
102 | | - |
103 | | - return client && client.close(true); |
104 | | - }); |
105 | | - |
106 | | - suite.tests.forEach(test => { |
107 | | - const shouldSkip = test.skip || TESTS_TO_SKIP.has(test.description); |
108 | | - // There's no evidence of test.only being defined in the spec files |
109 | | - // But let's avoid removing it now to just be sure we aren't changing anything |
110 | | - // These tests will eventually be replaced by unified format versions. |
111 | | - const itFn = shouldSkip ? it.skip : test.only ? Reflect.get(it, 'only') : it; |
112 | | - const metadata = generateMetadata(test); |
113 | | - const testFn = generateTestFn(test); |
114 | | - |
115 | | - itFn(test.description, { metadata, test: testFn }); |
116 | | - }); |
117 | | - }); |
118 | | - }); |
119 | | - |
120 | | - // Fn Generator methods |
121 | | - |
122 | | - function generateMetadata(test) { |
123 | | - const topology = test.topology; |
124 | | - const requires: MongoDBMetadataUI['requires'] = {}; |
125 | | - const versionLimits = []; |
126 | | - if (test.minServerVersion) { |
127 | | - versionLimits.push(`>=${test.minServerVersion}`); |
128 | | - } |
129 | | - if (test.maxServerVersion) { |
130 | | - versionLimits.push(`<=${test.maxServerVersion}`); |
131 | | - } |
132 | | - if (versionLimits.length) { |
133 | | - requires.mongodb = versionLimits.join(' '); |
134 | | - } |
135 | | - |
136 | | - if (topology) { |
137 | | - requires.topology = topology; |
138 | | - } |
139 | | - |
140 | | - return { requires }; |
141 | | - } |
142 | | - |
143 | | - function generateTestFn(test) { |
144 | | - const configureFailPoint = makeFailPointCommand(test); |
145 | | - const testFnRunOperations = makeTestFnRunOperations(test); |
146 | | - const testSuccess = makeTestSuccess(test); |
147 | | - const testFailure = makeTestFailure(test); |
148 | | - const testAPM = makeTestAPM(test); |
149 | | - |
150 | | - return function testFn() { |
151 | | - return configureFailPoint(ctx) |
152 | | - .then(() => testFnRunOperations(ctx)) |
153 | | - .then(testSuccess, testFailure) |
154 | | - .then(() => testAPM(ctx, events)); |
155 | | - }; |
156 | | - } |
157 | | - |
158 | | - function makeFailPointCommand(test) { |
159 | | - if (!test.failPoint) { |
160 | | - return () => Promise.resolve(); |
161 | | - } |
162 | | - |
163 | | - return function (ctx) { |
164 | | - return ctx.gc.db('admin').command(test.failPoint); |
165 | | - }; |
166 | | - } |
167 | | - |
168 | | - function makeTestSuccess(test) { |
169 | | - const result = test.result; |
170 | | - |
171 | | - return function testSuccess(value) { |
172 | | - if (result.error) { |
173 | | - throw new Error(`Expected test to return error ${result.error}`); |
174 | | - } |
175 | | - |
176 | | - if (result.success) { |
177 | | - expect(value).to.have.a.lengthOf(result.success.length); |
178 | | - expect(value).to.matchMongoSpec(result.success); |
179 | | - } |
180 | | - }; |
181 | | - } |
182 | | - |
183 | | - function makeTestFailure(test) { |
184 | | - const result = test.result; |
185 | | - |
186 | | - return function testFailure(err) { |
187 | | - if (!result.error) { |
188 | | - throw err; |
189 | | - } |
190 | | - |
191 | | - expect(err).to.matchMongoSpec(result.error); |
192 | | - }; |
193 | | - } |
194 | | - |
195 | | - function makeTestAPM(test) { |
196 | | - const expectedEvents = test.expectations || []; |
197 | | - |
198 | | - return function testAPM(ctx, events) { |
199 | | - expectedEvents |
200 | | - .map(e => e.command_started_event) |
201 | | - .map(normalizeAPMEvent) |
202 | | - .forEach((expected, idx) => { |
203 | | - if (!events[idx]) { |
204 | | - throw new Error( |
205 | | - `Expected there to be an APM event at index ${idx}, but there was none` |
206 | | - ); |
207 | | - } |
208 | | - // killCursors events should be skipped |
209 | | - // (see https://github.com/mongodb/specifications/blob/master/source/change-streams/tests/README.rst#spec-test-runner) |
210 | | - if (events[idx].commandName === 'killCursors') { |
211 | | - return; |
212 | | - } |
213 | | - expect(events[idx]).to.matchMongoSpec(expected); |
214 | | - }); |
215 | | - }; |
216 | | - } |
217 | | - |
218 | | - function allSettled(promises) { |
219 | | - let err; |
220 | | - return Promise.all(promises.map(p => p.catch(x => (err = err || x)))).then(args => { |
221 | | - if (err) { |
222 | | - throw err; |
223 | | - } |
224 | | - return args; |
225 | | - }); |
226 | | - } |
227 | | - |
228 | | - function makeTestFnRunOperations(test) { |
229 | | - const target = test.target; |
230 | | - const operations = test.operations; |
231 | | - const success = test.result.success || []; |
232 | | - |
233 | | - return function testFnRunOperations(ctx) { |
234 | | - const changeStreamPipeline = test.changeStreamPipeline; |
235 | | - const changeStreamOptions = test.changeStreamOptions; |
236 | | - ctx.changeStream = ctx[target].watch(changeStreamPipeline, changeStreamOptions); |
237 | | - |
238 | | - const changeStreamPromise = readAndCloseChangeStream(ctx.changeStream, success.length); |
239 | | - const operationsPromise = runOperations(ctx.gc, operations); |
240 | | - |
241 | | - return allSettled([changeStreamPromise, operationsPromise]).then(args => args[0]); |
242 | | - }; |
243 | | - } |
244 | | - |
245 | | - function readAndCloseChangeStream(changeStream, numChanges) { |
246 | | - const close = makeChangeStreamCloseFn(changeStream); |
247 | | - let changeStreamPromise = changeStream.next().then(r => [r]); |
248 | | - |
249 | | - for (let i = 1; i < numChanges; i += 1) { |
250 | | - changeStreamPromise = changeStreamPromise.then(results => { |
251 | | - return changeStream.next().then(result => { |
252 | | - results.push(result); |
253 | | - return results; |
254 | | - }); |
255 | | - }); |
256 | | - } |
257 | | - |
258 | | - return changeStreamPromise.then( |
259 | | - result => close(null, result), |
260 | | - err => close(err) |
261 | | - ); |
262 | | - } |
263 | | - |
264 | | - function runOperations(client, operations) { |
265 | | - return operations |
266 | | - .map(op => makeOperation(client, op)) |
267 | | - .reduce((p, op) => p.then(op), delay(200)); |
268 | | - } |
269 | | - |
270 | | - function makeChangeStreamCloseFn(changeStream): (error?: any, value?: any) => Promise<unknown> { |
271 | | - return function close(error, value) { |
272 | | - return new Promise((resolve, reject) => { |
273 | | - changeStream.close(err => { |
274 | | - if (error || err) { |
275 | | - return reject(error || err); |
276 | | - } |
277 | | - return resolve(value); |
278 | | - }); |
279 | | - }); |
280 | | - }; |
281 | | - } |
282 | | - |
283 | | - function normalizeAPMEvent(raw) { |
284 | | - const rawKeys = Object.keys(raw); |
285 | | - rawKeys.sort(); |
286 | | - expect(rawKeys, 'test runner only supports these keys, is there a new one?').to.deep.equal([ |
287 | | - 'command', |
288 | | - 'command_name', |
289 | | - 'database_name' |
290 | | - ]); |
291 | | - return { |
292 | | - command: raw.command, |
293 | | - commandName: raw.command_name, |
294 | | - databaseName: raw.database_name |
295 | | - }; |
296 | | - } |
297 | | - |
298 | | - function makeOperation(client: MongoClient, op: Document) { |
299 | | - const collection = client.db(op.database).collection(op.collection); |
300 | | - switch (op.name) { |
301 | | - case 'insertOne': |
302 | | - expect(op.arguments).to.have.property('document').that.is.an('object'); |
303 | | - return () => collection.insertOne(op.arguments.document); |
304 | | - case 'updateOne': |
305 | | - expect(op.arguments).to.have.property('filter').that.is.an('object'); |
306 | | - expect(op.arguments).to.have.property('update').that.is.an('object'); |
307 | | - return () => collection.updateOne(op.arguments.filter, op.arguments.update); |
308 | | - case 'replaceOne': |
309 | | - expect(op.arguments).to.have.property('filter').that.is.an('object'); |
310 | | - expect(op.arguments).to.have.property('replacement').that.is.an('object'); |
311 | | - return () => collection.replaceOne(op.arguments.filter, op.arguments.replacement); |
312 | | - case 'deleteOne': |
313 | | - expect(op.arguments).to.have.property('filter').that.is.an('object'); |
314 | | - return () => collection.deleteOne(op.arguments.filter); |
315 | | - case 'rename': |
316 | | - expect(op.arguments).to.have.property('to').that.is.a('string'); |
317 | | - return () => collection.rename(op.arguments.to); |
318 | | - case 'drop': |
319 | | - return () => collection.drop(); |
320 | | - default: |
321 | | - throw new Error(`runner does not support ${op.name}`); |
322 | | - } |
323 | | - } |
| 7 | + runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified'))); |
324 | 8 | }); |
0 commit comments