Skip to content

Commit fc74f55

Browse files
Merge pull request #283 from Canner/feature/caching-layer-persistent
Fix: caching layer persistent
2 parents de4fb81 + 032c133 commit fc74f55

File tree

6 files changed

+135
-18
lines changed

6 files changed

+135
-18
lines changed

packages/core/src/containers/modules/dataSource.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ import {
88
import { Profile } from '../../models/profile';
99
import { ClassType } from '../../lib/utils/module';
1010
import { ConfigurationError } from '@vulcan-sql/core/utils';
11-
import { ICacheLayerOptions, cacheProfileName } from '@vulcan-sql/core/models';
11+
import {
12+
ICacheLayerOptions,
13+
cacheLayerPersistentFileName,
14+
cacheProfileName,
15+
} from '@vulcan-sql/core/models';
1216
import 'reflect-metadata';
1317

1418
export const dataSourceModule = (
@@ -26,6 +30,7 @@ export const dataSourceModule = (
2630
type: options.loader!.toLocaleLowerCase(),
2731
// allow '*' to make every user request could use the cache-layer data source.
2832
allow: '*',
33+
connection: { ['persistent-path']: cacheLayerPersistentFileName },
2934
} as Profile);
3035
}
3136

packages/core/src/lib/cache-layer/cacheLayerLoader.ts

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,31 +43,40 @@ export class CacheLayerLoader implements ICacheLayerLoader {
4343
templateName: string,
4444
cache: CacheLayerInfo
4545
): Promise<void> {
46-
const { cacheTableName, sql, profile, indexes } = cache;
46+
const { cacheTableName, sql, profile, indexes, folderSubpath } = cache;
4747
const type = this.options.type!;
4848
const dataSource = this.dataSourceFactory(profile);
4949

5050
// generate directory for cache file path to export
5151
// format => [folderPath]/[schema.templateSource]/[profileName]/[cacheTableName]]/[timestamp]
52+
const subpath = folderSubpath || moment.utc().format('YYYYMMDDHHmmss');
5253
const directory = path.resolve(
5354
this.options.folderPath!,
5455
templateName,
5556
profile,
5657
cacheTableName,
57-
moment.utc().format('YYYYMMDDHHmmss')
58+
subpath
5859
);
59-
60-
if (!fs.existsSync(directory!))
61-
fs.mkdirSync(directory!, { recursive: true });
62-
63-
// 1. export to cache files according to each schema set the cache value
64-
this.logger.debug(`Start to export to ${type} file in "${directory}"`);
65-
await dataSource.export({
66-
sql,
67-
directory,
68-
profileName: profile,
69-
type,
70-
});
60+
const parquetFiles = this.getParquetFiles(directory);
61+
if (!parquetFiles.length) {
62+
if (!fs.existsSync(directory!)) {
63+
fs.mkdirSync(directory!, { recursive: true });
64+
}
65+
// 1. export to cache files according to each schema set the cache value
66+
this.logger.debug(`Start to export to ${type} file in "${directory}"`);
67+
await dataSource.export({
68+
sql,
69+
directory,
70+
profileName: profile,
71+
type,
72+
});
73+
} else {
74+
this.logger.debug(
75+
`Parquet file \n ${parquetFiles.join(
76+
'\n '
77+
)} found in ${directory}, skip export`
78+
);
79+
}
7180
this.logger.debug(`Start to load ${cacheTableName} in "${directory}"`);
7281
// 2. load the files to cache data source
7382
await this.cacheStorage.import({
@@ -81,4 +90,11 @@ export class CacheLayerLoader implements ICacheLayerLoader {
8190
indexes,
8291
});
8392
}
93+
94+
private getParquetFiles(directory: string): string[] {
95+
if (!directory || !fs.existsSync(directory)) return [];
96+
const files = fs.readdirSync(directory);
97+
const parquetFiles = files.filter((file) => /\.parquet$/.test(file));
98+
return parquetFiles;
99+
}
84100
}

packages/core/src/models/artifact.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ export class CacheLayerInfo {
114114
refreshExpression?: RefreshExpression;
115115
// index key name -> index column
116116
indexes?: Record<string, string>;
117+
// cache folder subpath
118+
folderSubpath?: string;
117119
}
118120

119121
export class APISchema {

packages/core/src/models/cacheLayerOptions.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ export interface ICacheLayerOptions {
1919
// The cache layer profile name is used to load the cache data to table name from cache files
2020
export const cacheProfileName = 'vulcan.cache';
2121

22+
// The cache layer persistent file name, if the file name is set to ":memory:", it will use in-memory database
23+
export const cacheLayerPersistentFileName =
24+
process.env['VULCAN_CACHE_LAYER_PERSISTENT_FILE_NAME'] ||
25+
'vulcan_caching_layer.db';
26+
2227
// The schema name for vulcan used to create table when loading cache files to cache data source
2328
export const vulcanCacheSchemaName = 'vulcan';
2429

packages/core/test/cache-layer/cacheLayerLoader.spec.ts

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import * as fs from 'fs';
2+
import * as duckdb from 'duckdb';
23
import * as sinon from 'ts-sinon';
4+
import * as path from 'path';
35
import {
46
CacheLayerInfo,
57
CacheLayerLoader,
@@ -9,7 +11,7 @@ import {
911
vulcanCacheSchemaName,
1012
} from '@vulcan-sql/core';
1113
import { MockDataSource, getQueryResults } from './mockDataSource';
12-
14+
const db = new duckdb.Database(':memory:');
1315
describe('Test cache layer loader', () => {
1416
const folderPath = 'loader-test-exported-parquets';
1517
const profiles = [
@@ -108,4 +110,80 @@ describe('Test cache layer loader', () => {
108110
// Set 50s timeout to test cache loader export and load data
109111
50 * 10000
110112
);
113+
it.each([
114+
{
115+
templateName: 'template-1',
116+
cache: {
117+
cacheTableName: 'employees',
118+
sql: sinon.default.stub() as any,
119+
profile: profiles[0].name,
120+
folderSubpath: '2023',
121+
} as CacheLayerInfo,
122+
},
123+
{
124+
templateName: 'template-1',
125+
cache: {
126+
cacheTableName: 'departments',
127+
sql: sinon.default.stub() as any,
128+
profile: profiles[1].name,
129+
folderSubpath: '2023',
130+
} as CacheLayerInfo,
131+
},
132+
])(
133+
'Should use existed parquet to load cache table: $cache.cacheTableName',
134+
async ({ templateName, cache }) => {
135+
// Arrange
136+
const { profile, cacheTableName, folderSubpath } = cache;
137+
const dir = path.resolve(
138+
folderPath,
139+
templateName,
140+
profile,
141+
cacheTableName,
142+
folderSubpath!
143+
);
144+
await createParquetFile(dir, cacheTableName);
145+
// Act
146+
const loader = new CacheLayerLoader(options, stubFactory as any);
147+
await loader.load(templateName, cache);
148+
149+
// Assert
150+
const actual = (
151+
await getQueryResults(
152+
"select * from information_schema.tables where table_schema = 'vulcan'"
153+
)
154+
).map((row) => {
155+
return {
156+
table: row['table_name'],
157+
schema: row['table_schema'],
158+
};
159+
});
160+
expect(actual).toEqual(
161+
expect.arrayContaining([
162+
{
163+
table: cache.cacheTableName,
164+
schema: vulcanCacheSchemaName,
165+
},
166+
])
167+
);
168+
},
169+
// Set 50s timeout to test cache loader export and load data
170+
50 * 10000
171+
);
111172
});
173+
174+
async function createParquetFile(path: string, fileName: string) {
175+
if (!fs.existsSync(path)) {
176+
fs.mkdirSync(path, { recursive: true });
177+
}
178+
db.run(`CREATE OR REPLACE TABLE parquet_table (i integer)`);
179+
db.run(`INSERT INTO parquet_table (i) VALUES (1)`);
180+
return new Promise((resolve, reject) => {
181+
db.run(
182+
`COPY (SELECT * FROM parquet_table) TO '${path}/${fileName}.parquet' (FORMAT PARQUET);`,
183+
(err: any) => {
184+
if (err) reject(err);
185+
resolve(true);
186+
}
187+
);
188+
});
189+
}

packages/core/test/containers/cacheLayer.spec.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
CacheLayerLoader,
33
cacheLayerModule,
4+
cacheLayerPersistentFileName,
45
CacheLayerStoreFormatType,
56
cacheProfileName,
67
DataResult,
@@ -89,11 +90,21 @@ it('Cache layer module should add "vulcan.cache" profile and bind "duckdb" type
8990

9091
expect(dsFromTestDuck.injectedProfiles).toEqual([
9192
{ name: 'test-duck', type: 'duckdb', allow: '*' },
92-
{ name: cacheProfileName, type: 'duckdb', allow: '*' },
93+
{
94+
name: cacheProfileName,
95+
type: 'duckdb',
96+
allow: '*',
97+
connection: { ['persistent-path']: cacheLayerPersistentFileName },
98+
},
9399
]);
94100
expect(dsFromCacheLayer.injectedProfiles).toEqual([
95101
{ name: 'test-duck', type: 'duckdb', allow: '*' },
96-
{ name: cacheProfileName, type: 'duckdb', allow: '*' },
102+
{
103+
name: cacheProfileName,
104+
type: 'duckdb',
105+
allow: '*',
106+
connection: { ['persistent-path']: cacheLayerPersistentFileName },
107+
},
97108
]);
98109
});
99110

0 commit comments

Comments
 (0)