Skip to content

Commit 1ed748a

Browse files
committed
evict unused parquet file after exporting cache data
1 parent c6ba0de commit 1ed748a

File tree

2 files changed

+131
-0
lines changed

2 files changed

+131
-0
lines changed

packages/core/src/lib/cache-layer/cacheLayerLoader.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,19 @@ export class CacheLayerLoader implements ICacheLayerLoader {
6262
if (!fs.existsSync(directory!)) {
6363
fs.mkdirSync(directory!, { recursive: true });
6464
}
65+
// Remove the parquet files in every subfolder of this cache table before exporting, because the old cache files will not be reused
66+
const folderPath = path.resolve(
67+
this.options.folderPath!,
68+
templateName,
69+
profile,
70+
cacheTableName
71+
);
72+
const folders = fs
73+
.readdirSync(folderPath)
74+
.filter((file) =>
75+
fs.statSync(path.resolve(folderPath, file)).isDirectory()
76+
);
77+
this.removeParquetFiles(folders, folderPath);
6578
// 1. export to cache files according to each schema set the cache value
6679
this.logger.debug(`Start to export to ${type} file in "${directory}"`);
6780
await dataSource.export({
@@ -97,4 +110,14 @@ export class CacheLayerLoader implements ICacheLayerLoader {
97110
const parquetFiles = files.filter((file) => /\.parquet$/.test(file));
98111
return parquetFiles;
99112
}
113+
114+
/**
 * Deletes every file that `getParquetFiles` reports for each given subfolder.
 * The subfolders themselves are left in place.
 *
 * @param folders Subfolder names, resolved against `folderPath`.
 * @param folderPath Directory that contains the subfolders.
 */
private removeParquetFiles(folders: string[], folderPath: string) {
  for (const subfolder of folders) {
    const subfolderPath = path.resolve(folderPath, subfolder);
    // Entries returned by getParquetFiles are resolved against the subfolder
    // before being unlinked.
    for (const fileName of this.getParquetFiles(subfolderPath)) {
      fs.unlinkSync(path.resolve(subfolderPath, fileName));
    }
  }
}
100123
}

packages/core/test/cache-layer/cacheLayerLoader.spec.ts

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,114 @@ describe('Test cache layer loader', () => {
169169
// Set 50s timeout to test cache loader export and load data
170170
50 * 10000
171171
);
172+
it(
  'Should remove the other parquet files after executing export',
  async () => {
    // Arrange: export the cache into the first subfolder
    const templateName = 'template-1';
    // Naming both subfolders up front avoids non-null assertions on
    // `folderSubpath` and on a string literal.
    const previousSubpath = '2023';
    const currentSubpath = '2024';
    const cache = {
      cacheTableName: 'employees',
      sql: sinon.default.stub() as any,
      profile: profiles[0].name,
      folderSubpath: previousSubpath,
    } as CacheLayerInfo;
    const { profile, cacheTableName } = cache;
    // Both subfolders live under the same cache table directory.
    const resolveSubfolder = (subpath: string) =>
      path.resolve(folderPath, templateName, profile, cacheTableName, subpath);
    const dir = resolveSubfolder(previousSubpath);
    const newDir = resolveSubfolder(currentSubpath);
    const loader = new CacheLayerLoader(options, stubFactory as any);
    await loader.load(templateName, cache);
    expect(fs.readdirSync(dir).length).toBeGreaterThan(0);

    // Act: load the same cache table again, targeting the second subfolder
    cache.folderSubpath = currentSubpath;
    await loader.load(templateName, cache);

    // Assert: the first subfolder was emptied and the second one was filled
    expect(fs.readdirSync(dir).length).toEqual(0);
    expect(fs.readdirSync(newDir).length).toBeGreaterThan(0);
  },
  // Timeout for the export and load round trip: 50 * 10000 = 500000
  // (500 s, assuming the runner takes milliseconds as Jest does)
  50 * 10000
);
213+
it(
  'Should not remove files if parquet files were reused',
  async () => {
    // Arrange: run a first load and record the resulting directory listing
    const templateName = 'template-1';
    const cache = {
      cacheTableName: 'employees',
      sql: sinon.default.stub() as any,
      profile: profiles[0].name,
      folderSubpath: '2023',
    } as CacheLayerInfo;
    const cacheDir = path.resolve(
      folderPath,
      templateName,
      cache.profile,
      cache.cacheTableName,
      cache.folderSubpath!
    );
    const loader = new CacheLayerLoader(options, stubFactory as any);
    await loader.load(templateName, cache);
    const filesAfterFirstLoad = fs.readdirSync(cacheDir);

    // Act: load again with the unchanged cache settings
    await loader.load(templateName, cache);

    // Assert: the directory listing is the same as after the first load
    const filesAfterSecondLoad = fs.readdirSync(cacheDir);
    expect(filesAfterSecondLoad).toEqual(filesAfterFirstLoad);
  },
  // Timeout for the export and load round trip: 50 * 10000 = 500000
  // (500 s, assuming the runner takes milliseconds as Jest does)
  50 * 10000
);
246+
247+
it(
  'Should remove parquet files of its own folder.',
  async () => {
    // Arrange: export the first cache table and confirm its folder has files
    const templateName = 'template-1';
    const cache = {
      cacheTableName: 'employees',
      sql: sinon.default.stub() as any,
      profile: profiles[0].name,
      folderSubpath: '2023',
    } as CacheLayerInfo;
    // Capture the first table's coordinates now; `cacheTableName` is
    // reassigned in the Act step.
    const {
      profile: profileName,
      cacheTableName: firstTableName,
      folderSubpath: subpath,
    } = cache;
    const loader = new CacheLayerLoader(options, stubFactory as any);
    await loader.load(templateName, cache);
    const firstTableDir = path.resolve(
      folderPath,
      templateName,
      profileName,
      firstTableName,
      subpath!
    );
    expect(fs.readdirSync(firstTableDir).length).toBeGreaterThan(0);

    // Act: load a different cache table with the same loader
    cache.cacheTableName = 'another_employees';
    await loader.load(templateName, cache);

    // Assert: the first cache table's folder still contains files
    expect(fs.readdirSync(firstTableDir).length).toBeGreaterThan(0);
  },
  // Timeout for the export and load round trip: 50 * 10000 = 500000
  // (500 s, assuming the runner takes milliseconds as Jest does)
  50 * 10000
);
172280
});
173281

174282
async function createParquetFile(path: string, fileName: string) {

0 commit comments

Comments
 (0)