Skip to content

Commit a874313

Browse files
committed
send activity log after refreshing cache
1 parent 2a000c2 commit a874313

File tree

3 files changed

+183
-16
lines changed

3 files changed

+183
-16
lines changed

packages/core/src/lib/cache-layer/cacheLayerLoader.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,15 @@ export class CacheLayerLoader implements ICacheLayerLoader {
2323
private options: ICacheLayerOptions;
2424
private cacheStorage: DataSource;
2525
private logger = getLogger({ scopeName: 'CORE' });
26-
private activityLoggers: IActivityLogger;
2726
constructor(
2827
@inject(TYPES.CacheLayerOptions) options: CacheLayerOptions,
2928
@inject(TYPES.Factory_DataSource)
30-
dataSourceFactory: interfaces.SimpleFactory<DataSource>,
31-
@inject(TYPES.Extension_ActivityLogger)
32-
activityLogger: IActivityLogger
29+
dataSourceFactory: interfaces.SimpleFactory<DataSource>
3330
) {
3431
this.dataSourceFactory = dataSourceFactory;
3532
this.options = options;
3633
// prepare cache data source
3734
this.cacheStorage = this.dataSourceFactory(cacheProfileName);
38-
this.activityLoggers = activityLogger;
3935
}
4036

4137
/**
@@ -50,7 +46,6 @@ export class CacheLayerLoader implements ICacheLayerLoader {
5046
const { cacheTableName, sql, profile, indexes, folderSubpath } = cache;
5147
const type = this.options.type!;
5248
const dataSource = this.dataSourceFactory(profile);
53-
await this.activityLoggers.log({ a: 1 });
5449

5550
// generate directory for cache file path to export
5651
// format => [folderPath]/[schema.templateSource]/[profileName]/[cacheTableName]]/[timestamp]

packages/core/src/lib/cache-layer/cacheLayerRefresher.ts

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,16 @@ import { uniq } from 'lodash';
33
import { ToadScheduler, SimpleIntervalJob, AsyncTask } from 'toad-scheduler';
44
import { inject, injectable } from 'inversify';
55
import { TYPES } from '@vulcan-sql/core/types';
6-
import { APISchema } from '@vulcan-sql/core/models';
6+
import { APISchema, IActivityLogger } from '@vulcan-sql/core/models';
77
import { ConfigurationError } from '../utils/errors';
88
import { ICacheLayerLoader } from './cacheLayerLoader';
9+
import { getLogger } from '../utils';
10+
import moment = require('moment');
911

12+
enum RefreshResult {
13+
SUCCESS = 'SUCCESS',
14+
FAILED = 'FAILED',
15+
}
1016
export interface ICacheLayerRefresher {
1117
/**
1218
* Start the job to load the data source to cache storage and created tables from cache settings in schemas
@@ -22,9 +28,15 @@ export interface ICacheLayerRefresher {
2228
export class CacheLayerRefresher implements ICacheLayerRefresher {
2329
private cacheLoader: ICacheLayerLoader;
2430
private scheduler = new ToadScheduler();
31+
private activityLogger: IActivityLogger;
32+
private logger = getLogger({ scopeName: 'CORE' });
2533

26-
constructor(@inject(TYPES.CacheLayerLoader) loader: ICacheLayerLoader) {
34+
constructor(
35+
@inject(TYPES.CacheLayerLoader) loader: ICacheLayerLoader,
36+
@inject(TYPES.Extension_ActivityLogger) activityLogger: IActivityLogger
37+
) {
2738
this.cacheLoader = loader;
39+
this.activityLogger = activityLogger;
2840
}
2941

3042
public async start(
@@ -40,9 +52,10 @@ export class CacheLayerRefresher implements ICacheLayerRefresher {
4052
schemas.map(async (schema) => {
4153
// skip the schema by return if not set the cache
4254
if (!schema.cache) return;
55+
const { urlPath } = schema;
4356
return await Promise.all(
4457
schema.cache.map(async (cache) => {
45-
const { cacheTableName, profile, refreshTime } = cache;
58+
const { cacheTableName, profile, refreshTime, sql } = cache;
4659
// replace the '/' tp '_' to avoid the file path issue for templateSource
4760
const templateName = schema.templateSource.replace('/', '_');
4861
// If refresh time is set, use the scheduler to schedule the load task for refresh
@@ -54,15 +67,56 @@ export class CacheLayerRefresher implements ICacheLayerRefresher {
5467
{ milliseconds, runImmediately },
5568
new AsyncTask(workerId, async () => {
5669
// load data the to cache storage
57-
58-
await this.cacheLoader.load(templateName, cache);
70+
let refreshResult = RefreshResult.SUCCESS;
71+
const now = moment.utc().format('YYYY-MM-DD HH:mm:ss');
72+
try {
73+
// get the current time in format of UTC
74+
await this.cacheLoader.load(templateName, cache);
75+
} catch (error: any) {
76+
refreshResult = RefreshResult.FAILED;
77+
this.logger.debug(`Failed to refresh cache: ${error}`);
78+
} finally {
79+
// send activity log
80+
const content = {
81+
logTime: now,
82+
urlPath,
83+
sql,
84+
refreshResult,
85+
};
86+
await this.activityLogger.log(content).catch((err: any) => {
87+
this.logger.debug(
88+
`Failed to log activity after refreshing cache: ${err}`
89+
);
90+
});
91+
}
5992
}),
6093
{ preventOverrun: true, id: workerId }
6194
);
6295
// add the job to schedule cache refresh task
6396
this.scheduler.addIntervalJob(refreshJob);
6497
} else {
65-
await this.cacheLoader.load(templateName, cache);
98+
let refreshResult = RefreshResult.SUCCESS;
99+
const now = moment.utc().format('YYYY-MM-DD HH:mm:ss');
100+
try {
101+
// get the current time in format of UTC
102+
await this.cacheLoader.load(templateName, cache);
103+
} catch (error: any) {
104+
refreshResult = RefreshResult.FAILED;
105+
this.logger.debug(`Failed to refresh cache: ${error}`);
106+
} finally {
107+
// send activity log
108+
const content = {
109+
logTime: now,
110+
urlPath,
111+
sql,
112+
refreshResult,
113+
};
114+
await this.activityLogger.log(content).catch((err: any) => {
115+
this.logger.debug(
116+
`Failed to log activity after refreshing cache: ${err}`
117+
);
118+
});
119+
}
66120
}
67121
})
68122
);

packages/core/test/cache-layer/cacheLayerRefresher.spec.ts

Lines changed: 122 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,32 @@ import {
1212
vulcanCacheSchemaName,
1313
} from '@vulcan-sql/core';
1414
import { MockDataSource, getQueryResults } from './mockDataSource';
15+
import { HttpLogger } from '../../src/lib/loggers/httpLogger';
1516

1617
// This is a helper function that will flush all pending promises in the event loop when use the setInterval and the callback is promise (jest > 27 version).
1718
// reference: https://gist.github.com/apieceofbart/e6dea8d884d29cf88cdb54ef14ddbcc4
1819
const flushPromises = () =>
1920
new Promise(jest.requireActual('timers').setImmediate);
2021

22+
jest.mock('../../src/lib/loggers/httpLogger', () => {
23+
const originalModule = jest.requireActual('../../src/lib/loggers/httpLogger');
24+
return {
25+
...originalModule,
26+
HttpLogger: jest.fn().mockImplementation(() => {
27+
return {
28+
log: jest.fn().mockResolvedValue(true), // Spy on the add method
29+
};
30+
}),
31+
};
32+
});
33+
const mockLogger = new HttpLogger(
34+
{
35+
enabled: true,
36+
options: { 'http-logger': { connection: { host: 'localhost' } } },
37+
},
38+
'http-logger'
39+
);
40+
2141
describe('Test cache layer refresher', () => {
2242
const folderPath = 'refresher-test-exported-parquets';
2343
const profiles = [
@@ -65,6 +85,10 @@ describe('Test cache layer refresher', () => {
6585
fs.rmSync(folderPath, { recursive: true, force: true });
6686
});
6787

88+
afterEach(() => {
89+
jest.clearAllMocks();
90+
});
91+
6892
it('Should fail to start when exist duplicate cache table name over than one API schema', async () => {
6993
// Arrange
7094
const schemas: Array<APISchema> = [
@@ -98,7 +122,7 @@ describe('Test cache layer refresher', () => {
98122
] as Array<CacheLayerInfo>,
99123
},
100124
];
101-
const refresher = new CacheLayerRefresher(stubCacheLoader);
125+
const refresher = new CacheLayerRefresher(stubCacheLoader, mockLogger);
102126

103127
// Act, Assert
104128
await expect(() => refresher.start(schemas)).rejects.toThrow(
@@ -149,7 +173,7 @@ describe('Test cache layer refresher', () => {
149173
] as Array<CacheLayerInfo>,
150174
},
151175
];
152-
const refresher = new CacheLayerRefresher(stubCacheLoader);
176+
const refresher = new CacheLayerRefresher(stubCacheLoader, mockLogger);
153177

154178
// Act, Assert
155179
await expect(() => refresher.start(schemas)).rejects.toThrow(
@@ -195,7 +219,7 @@ describe('Test cache layer refresher', () => {
195219
];
196220
// Act
197221
const loader = new CacheLayerLoader(options, stubFactory as any);
198-
const refresher = new CacheLayerRefresher(loader);
222+
const refresher = new CacheLayerRefresher(loader, mockLogger);
199223
await refresher.start(schemas);
200224

201225
// Assert
@@ -271,7 +295,7 @@ describe('Test cache layer refresher', () => {
271295

272296
// Stub the load method to not do any thing.
273297
stubCacheLoader.load.resolves();
274-
const refresher = new CacheLayerRefresher(stubCacheLoader);
298+
const refresher = new CacheLayerRefresher(stubCacheLoader, mockLogger);
275299
// Act
276300
await refresher.start(schemas);
277301

@@ -304,4 +328,98 @@ describe('Test cache layer refresher', () => {
304328
refresher.stop();
305329
jest.clearAllTimers();
306330
});
331+
332+
it(
333+
'Should send activity log after cacheLoader execute "load" successfully',
334+
async () => {
335+
// Arrange
336+
const schemas: Array<APISchema> = [
337+
{
338+
...sinon.stubInterface<APISchema>(),
339+
templateSource: 'template-1',
340+
profiles: [profiles[0].name, profiles[1].name],
341+
cache: [
342+
{
343+
cacheTableName: 'orders',
344+
sql: sinon.default.stub() as any,
345+
profile: profiles[0].name,
346+
},
347+
{
348+
cacheTableName: 'products',
349+
sql: sinon.default.stub() as any,
350+
profile: profiles[1].name,
351+
},
352+
] as Array<CacheLayerInfo>,
353+
},
354+
{
355+
...sinon.stubInterface<APISchema>(),
356+
templateSource: 'template-2',
357+
profiles: [profiles[2].name],
358+
cache: [
359+
{
360+
cacheTableName: 'users',
361+
sql: sinon.default.stub() as any,
362+
profile: profiles[2].name,
363+
},
364+
] as Array<CacheLayerInfo>,
365+
},
366+
];
367+
// Act
368+
const loader = new CacheLayerLoader(options, stubFactory as any);
369+
const refresher = new CacheLayerRefresher(loader, mockLogger);
370+
await refresher.start(schemas);
371+
372+
// Assert
373+
expect(mockLogger.log).toHaveBeenCalledTimes(3);
374+
refresher.stop();
375+
},
376+
100 * 1000
377+
);
378+
// Should send activity log when cacheLoader failed on executing "load"
379+
it(
380+
'Should send activity log after cacheLoader execute "load" failed',
381+
async () => {
382+
const schemas: Array<APISchema> = [
383+
{
384+
...sinon.stubInterface<APISchema>(),
385+
templateSource: 'template-1',
386+
profiles: [profiles[0].name, profiles[1].name],
387+
cache: [
388+
{
389+
cacheTableName: 'orders',
390+
sql: sinon.default.stub() as any,
391+
profile: profiles[0].name,
392+
},
393+
{
394+
cacheTableName: 'products',
395+
sql: sinon.default.stub() as any,
396+
profile: profiles[1].name,
397+
},
398+
] as Array<CacheLayerInfo>,
399+
},
400+
{
401+
...sinon.stubInterface<APISchema>(),
402+
templateSource: 'template-2',
403+
profiles: [profiles[2].name],
404+
cache: [
405+
{
406+
cacheTableName: 'users',
407+
sql: sinon.default.stub() as any,
408+
profile: profiles[2].name,
409+
},
410+
] as Array<CacheLayerInfo>,
411+
},
412+
];
413+
// Act
414+
const loader = new CacheLayerLoader(options, stubFactory as any);
415+
stubCacheLoader.load.throws();
416+
const refresher = new CacheLayerRefresher(loader, mockLogger);
417+
await refresher.start(schemas);
418+
419+
// Assert
420+
expect(mockLogger.log).toHaveBeenCalledTimes(3);
421+
refresher.stop();
422+
},
423+
100 * 1000
424+
);
307425
});

0 commit comments

Comments
 (0)