@@ -28,14 +28,15 @@ const exportsManagerConfig: ExportsManagerConfig = {
2828
2929function getExportNameAndPath (
3030 sessionId : string ,
31- timestamp : number
31+ timestamp : number = Date . now ( ) ,
32+ objectId : string = new ObjectId ( ) . toString ( )
3233) : {
3334 sessionExportsPath : string ;
3435 exportName : string ;
3536 exportPath : string ;
3637 exportURI : string ;
3738} {
38- const exportName = `foo.bar.${ timestamp } .json` ;
39+ const exportName = `foo.bar.${ timestamp } .${ objectId } . json` ;
3940 const sessionExportsPath = path . join ( exportsPath , sessionId ) ;
4041 const exportPath = path . join ( sessionExportsPath , exportName ) ;
4142 return {
@@ -48,22 +49,21 @@ function getExportNameAndPath(
4849
4950function createDummyFindCursor (
5051 dataArray : unknown [ ] ,
51- chunkPushTimeoutMs ?: number
52+ beforeEachChunk ?: ( chunkIndex : number ) => void | Promise < void >
5253) : { cursor : FindCursor ; cursorCloseNotification : Promise < void > } {
5354 let index = 0 ;
5455 const readable = new Readable ( {
5556 objectMode : true ,
5657 async read ( ) : Promise < void > {
57- if ( index < dataArray . length ) {
58- if ( chunkPushTimeoutMs ) {
59- await timeout ( chunkPushTimeoutMs ) ;
58+ try {
59+ await beforeEachChunk ?.( index ) ;
60+ if ( index < dataArray . length ) {
61+ this . push ( dataArray [ index ++ ] ) ;
62+ } else {
63+ this . push ( null ) ;
6064 }
61- this . push ( dataArray [ index ++ ] ) ;
62- } else {
63- if ( chunkPushTimeoutMs ) {
64- await timeout ( chunkPushTimeoutMs ) ;
65- }
66- this . push ( null ) ;
65+ } catch ( error ) {
66+ this . destroy ( error as Error ) ;
6767 }
6868 } ,
6969 } ) ;
@@ -90,6 +90,13 @@ function createDummyFindCursor(
9090 } ;
9191}
9292
93+ function createDummyFindCursorWithDelay (
94+ dataArray : unknown [ ] ,
95+ delayMs : number
96+ ) : { cursor : FindCursor ; cursorCloseNotification : Promise < void > } {
97+ return createDummyFindCursor ( dataArray , ( ) => timeout ( delayMs ) ) ;
98+ }
99+
93100async function fileExists ( filePath : string ) : Promise < boolean > {
94101 try {
95102 await fs . access ( filePath ) ;
@@ -125,15 +132,15 @@ describe("ExportsManager unit test", () => {
125132 describe ( "#availableExport" , ( ) => {
126133 it ( "should list only the exports that are in ready state" , async ( ) => {
127134 // This export will finish in at-least 1 second
128- const { exportName : exportName1 } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ;
135+ const { exportName : exportName1 } = getExportNameAndPath ( session . sessionId ) ;
129136 manager . createJSONExport ( {
130- input : createDummyFindCursor ( [ { name : "Test1" } ] , 1000 ) . cursor ,
137+ input : createDummyFindCursorWithDelay ( [ { name : "Test1" } ] , 1000 ) . cursor ,
131138 exportName : exportName1 ,
132139 jsonExportFormat : "relaxed" ,
133140 } ) ;
134141
135142 // This export will finish way sooner than the first one
136- const { exportName : exportName2 } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ;
143+ const { exportName : exportName2 } = getExportNameAndPath ( session . sessionId ) ;
137144 const { cursor, cursorCloseNotification } = createDummyFindCursor ( [ { name : "Test1" } ] ) ;
138145 manager . createJSONExport ( {
139146 input : cursor ,
@@ -154,8 +161,8 @@ describe("ExportsManager unit test", () => {
154161 } ) ;
155162
156163 it ( "should throw if the resource is still being generated" , async ( ) => {
157- const { exportName } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ;
158- const { cursor } = createDummyFindCursor ( [ { name : "Test1" } ] , 100 ) ;
164+ const { exportName } = getExportNameAndPath ( session . sessionId ) ;
165+ const { cursor } = createDummyFindCursorWithDelay ( [ { name : "Test1" } ] , 100 ) ;
159166 manager . createJSONExport ( {
160167 input : cursor ,
161168 exportName,
@@ -168,7 +175,7 @@ describe("ExportsManager unit test", () => {
168175 } ) ;
169176
170177 it ( "should return the resource content if the resource is ready to be consumed" , async ( ) => {
171- const { exportName } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ;
178+ const { exportName } = getExportNameAndPath ( session . sessionId ) ;
172179 const { cursor, cursorCloseNotification } = createDummyFindCursor ( [ ] ) ;
173180 manager . createJSONExport ( {
174181 input : cursor ,
@@ -198,7 +205,7 @@ describe("ExportsManager unit test", () => {
198205 longNumber : Long . fromNumber ( 123456 ) ,
199206 } ,
200207 ] ) ) ;
201- ( { exportName, exportPath, exportURI } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ) ;
208+ ( { exportName, exportPath, exportURI } = getExportNameAndPath ( session . sessionId ) ) ;
202209 } ) ;
203210
204211 describe ( "when cursor is empty" , ( ) => {
@@ -304,31 +311,37 @@ describe("ExportsManager unit test", () => {
304311 } ) ;
305312 } ) ;
306313
307- describe ( "when there is an error in export generation " , ( ) => {
314+ describe ( "when there is an error during stream transform " , ( ) => {
308315 it ( "should remove the partial export and never make it available" , async ( ) => {
309316 const emitSpy = vi . spyOn ( manager , "emit" ) ;
310317 // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any
311318 ( manager as any ) . docToEJSONStream = function ( ejsonOptions : EJSONOptions | undefined ) : Transform {
312319 let docsTransformed = 0 ;
313320 return new Transform ( {
314321 objectMode : true ,
315- transform : function ( chunk : unknown , encoding , callback ) : void {
316- ++ docsTransformed ;
322+ transform ( chunk : unknown , encoding , callback ) : void {
317323 try {
318- if ( docsTransformed === 1 ) {
324+ const doc = EJSON . stringify ( chunk , undefined , undefined , ejsonOptions ) ;
325+ if ( docsTransformed === 0 ) {
326+ this . push ( "[" + doc ) ;
327+ } else if ( docsTransformed === 1 ) {
319328 throw new Error ( "Could not transform the chunk!" ) ;
329+ } else {
330+ this . push ( ",\n" + doc ) ;
320331 }
321- const doc : string = EJSON . stringify ( chunk , undefined , 2 , ejsonOptions ) ;
322- const line = `${ docsTransformed > 1 ? ",\n" : "" } ${ doc } ` ;
323-
324- callback ( null , line ) ;
325- } catch ( err : unknown ) {
332+ docsTransformed ++ ;
333+ callback ( ) ;
334+ } catch ( err ) {
326335 callback ( err as Error ) ;
327336 }
328337 } ,
329- final : function ( callback ) : void {
330- this . push ( "]" ) ;
331- callback ( null ) ;
338+ flush ( this : Transform , cb ) : void {
339+ if ( docsTransformed === 0 ) {
340+ this . push ( "[]" ) ;
341+ } else {
342+ this . push ( "]" ) ;
343+ }
344+ cb ( ) ;
332345 } ,
333346 } ) ;
334347 } ;
@@ -348,6 +361,33 @@ describe("ExportsManager unit test", () => {
348361 expect ( await fileExists ( exportPath ) ) . toEqual ( false ) ;
349362 } ) ;
350363 } ) ;
364+
365+ describe ( "when there is an error on read stream" , ( ) => {
366+ it ( "should remove the partial export and never make it available" , async ( ) => {
367+ const emitSpy = vi . spyOn ( manager , "emit" ) ;
368+ // A cursor that will make the read stream fail after the first chunk
369+ const { cursor, cursorCloseNotification } = createDummyFindCursor ( [ { name : "Test1" } ] , ( chunkIndex ) => {
370+ if ( chunkIndex > 0 ) {
371+ return Promise . reject ( new Error ( "Connection timedout!" ) ) ;
372+ }
373+ return Promise . resolve ( ) ;
374+ } ) ;
375+ manager . createJSONExport ( {
376+ input : cursor ,
377+ exportName,
378+ jsonExportFormat : "relaxed" ,
379+ } ) ;
380+ await cursorCloseNotification ;
381+
382+ // Because the export was never populated in the available exports.
383+ await expect ( ( ) => manager . readExport ( exportName ) ) . rejects . toThrow (
384+ "Requested export has either expired or does not exist!"
385+ ) ;
386+ expect ( emitSpy ) . not . toHaveBeenCalled ( ) ;
387+ expect ( manager . availableExports ) . toEqual ( [ ] ) ;
388+ expect ( await fileExists ( exportPath ) ) . toEqual ( false ) ;
389+ } ) ;
390+ } ) ;
351391 } ) ;
352392
353393 describe ( "#cleanupExpiredExports" , ( ) => {
@@ -368,7 +408,7 @@ describe("ExportsManager unit test", () => {
368408 } ) ;
369409
370410 it ( "should not clean up in-progress exports" , async ( ) => {
371- const { exportName } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ;
411+ const { exportName } = getExportNameAndPath ( session . sessionId ) ;
372412 const manager = ExportsManager . init (
373413 session . sessionId ,
374414 {
@@ -378,7 +418,7 @@ describe("ExportsManager unit test", () => {
378418 } ,
379419 new CompositeLogger ( )
380420 ) ;
381- const { cursor } = createDummyFindCursor ( [ { name : "Test" } ] , 2000 ) ;
421+ const { cursor } = createDummyFindCursorWithDelay ( [ { name : "Test" } ] , 2000 ) ;
382422 manager . createJSONExport ( {
383423 input : cursor ,
384424 exportName,
@@ -395,7 +435,7 @@ describe("ExportsManager unit test", () => {
395435 } ) ;
396436
397437 it ( "should cleanup expired exports" , async ( ) => {
398- const { exportName, exportPath, exportURI } = getExportNameAndPath ( session . sessionId , Date . now ( ) ) ;
438+ const { exportName, exportPath, exportURI } = getExportNameAndPath ( session . sessionId ) ;
399439 const manager = ExportsManager . init (
400440 session . sessionId ,
401441 {
0 commit comments