Skip to content

Commit 9b96ba2

Browse files
committed
feature(extension-driver-canner): connect to datasource using the connection info of the API requester
1 parent b08da4b commit 9b96ba2

File tree

5 files changed

+143
-18
lines changed

5 files changed

+143
-18
lines changed

packages/extension-driver-canner/src/lib/cannerDataSource.ts

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ export interface PGOptions extends PoolConfig {
2424
@VulcanExtensionId('canner')
2525
export class CannerDataSource extends DataSource<any, PGOptions> {
2626
private logger = this.getLogger();
27-
private poolMapping = new Map<string, { pool: Pool; options?: PGOptions }>();
27+
protected poolMapping = new Map<
28+
string,
29+
{ pool: Pool; options?: PGOptions }
30+
>();
31+
protected UserPool = new Map<string, Pool>();
2832

2933
public override async onActivate() {
3034
const profiles = this.getProfiles().values();
@@ -108,14 +112,14 @@ export class CannerDataSource extends DataSource<any, PGOptions> {
108112
bindParams,
109113
profileName,
110114
operations,
115+
headers,
111116
}: ExecuteOptions): Promise<DataResult> {
112-
if (!this.poolMapping.has(profileName)) {
113-
throw new InternalError(`Profile instance ${profileName} not found`);
114-
}
115-
const { pool, options } = this.poolMapping.get(profileName)!;
116-
this.logger.debug(`Acquiring connection from ${profileName}`);
117-
const client = await pool.connect();
118117
this.logger.debug(`Acquired connection from ${profileName}`);
118+
const { options } = this.poolMapping.get(profileName)!;
119+
const auth = headers?.['authorization'];
120+
const password = auth?.trim().split(' ')[1];
121+
const pool = this.getPool(profileName, password);
122+
const client = await pool.connect();
119123
try {
120124
const builtSQL = buildSQL(sql, operations);
121125
const cursor = client.query(
@@ -150,6 +154,38 @@ export class CannerDataSource extends DataSource<any, PGOptions> {
150154
}
151155
}
152156

157+
// use protected to make it testable
158+
protected getPool(profileName: string, password?: string): Pool {
159+
if (!this.poolMapping.has(profileName)) {
160+
throw new InternalError(`Profile instance ${profileName} not found`);
161+
}
162+
const { pool: defaultPool, options: poolOptions } =
163+
this.poolMapping.get(profileName)!;
164+
this.logger.debug(`Acquiring connection from ${profileName}`);
165+
if (!password) {
166+
return defaultPool;
167+
}
168+
const database = poolOptions?.database || '';
169+
const userPoolKey = this.getUserPoolKey(password, database);
170+
if (this.UserPool.has(userPoolKey)) {
171+
const userPool = this.UserPool.get(userPoolKey);
172+
if (!userPool) {
173+
throw new InternalError(
174+
`User pool ${userPoolKey} is not a Pool instance`
175+
);
176+
}
177+
return userPool;
178+
}
179+
const pool = new Pool({ ...poolOptions, password: password });
180+
this.UserPool.set(userPoolKey, pool);
181+
return pool;
182+
}
183+
184+
// use protected to make it testable
185+
protected getUserPoolKey(pat: string, database?: string) {
186+
return `${pat}-${database}`;
187+
}
188+
153189
private async getResultFromCursor(
154190
cursor: Cursor,
155191
options: PGOptions = {}

packages/extension-driver-canner/test/cannerDataSource.spec.ts

Lines changed: 66 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { CannerServer } from './cannerServer';
22
import { CannerDataSource, PGOptions } from '../src';
3+
import { MockCannerDataSource } from './mock';
34
import { ExportOptions, InternalError, streamToArray } from '@vulcan-sql/core';
45
import { Writable } from 'stream';
56
import * as sinon from 'ts-sinon';
@@ -8,7 +9,9 @@ import { CannerAdapter } from '../src/lib/cannerAdapter';
89

910
const pg = new CannerServer();
1011
let dataSource: CannerDataSource;
12+
let mockDataSource: MockCannerDataSource;
1113

14+
const directory = 'tmp_test_canner';
1215
// restore all sinon mock/stub before each test
1316
beforeEach(() => {
1417
sinon.default.restore();
@@ -42,22 +45,22 @@ it('Data source should throw error when activating if any profile is invalid', a
4245

4346
// export method should be executed successfully
4447
it('Data source should export successfully', async () => {
45-
fs.mkdirSync('tmp', { recursive: true });
48+
fs.mkdirSync(directory, { recursive: true });
4649
dataSource = new CannerDataSource({}, '', [pg.getProfile('profile1')]);
4750
await dataSource.activate();
4851

4952
// Act, Assert
5053
await expect(
5154
dataSource.export({
5255
sql: 'select 1',
53-
directory: 'tmp',
56+
directory,
5457
profileName: 'profile1',
5558
} as ExportOptions)
5659
).resolves.not.toThrow();
57-
expect(fs.readdirSync('tmp').length).toBe(1);
60+
expect(fs.readdirSync(directory).length).toBe(1);
5861

5962
// clean up
60-
fs.rmSync('tmp', { recursive: true, force: true });
63+
fs.rmSync(directory, { recursive: true, force: true });
6164
}, 100000);
6265

6366
it('Data source should throw error when fail to export data', async () => {
@@ -73,22 +76,22 @@ it('Data source should throw error when fail to export data', async () => {
7376
);
7477
});
7578

76-
fs.mkdirSync('tmp', { recursive: true });
79+
fs.mkdirSync(directory, { recursive: true });
7780
dataSource = new CannerDataSource({}, '', [pg.getProfile('profile1')]);
7881
await dataSource.activate();
7982

8083
// Act, Assert
8184
await expect(
8285
dataSource.export({
8386
sql: 'select 1',
84-
directory: 'tmp',
87+
directory,
8588
profileName: 'profile1',
8689
} as ExportOptions)
8790
).rejects.toThrow();
88-
expect(fs.readdirSync('tmp').length).toBe(0);
91+
expect(fs.readdirSync(directory).length).toBe(0);
8992

9093
// clean up
91-
fs.rmSync('tmp', { recursive: true, force: true });
94+
fs.rmSync(directory, { recursive: true, force: true });
9295
}, 100000);
9396

9497
it('Data source should throw error when given directory is not exist', async () => {
@@ -100,7 +103,7 @@ it('Data source should throw error when given directory is not exist', async ()
100103
await expect(
101104
dataSource.export({
102105
sql: 'select 1',
103-
directory: 'tmp',
106+
directory: directory,
104107
profileName: 'profile1',
105108
} as ExportOptions)
106109
).rejects.toThrow();
@@ -110,13 +113,13 @@ it('Data source should throw error when given profile name is not exist', async
110113
// Arrange
111114
dataSource = new CannerDataSource({}, '', [pg.getProfile('profile1')]);
112115
await dataSource.activate();
113-
fs.mkdirSync('tmp', { recursive: true });
116+
fs.mkdirSync(directory, { recursive: true });
114117

115118
// Act, Assert
116119
await expect(
117120
dataSource.export({
118121
sql: 'select 1',
119-
directory: 'tmp',
122+
directory,
120123
profileName: 'profile not exist',
121124
} as ExportOptions)
122125
).rejects.toThrow();
@@ -318,3 +321,55 @@ it('Data source should release connection when readable stream is destroyed', as
318321
expect(rows.length).toBe(1);
319322
// afterEach hook will timeout if any leak occurred.
320323
}, 300000);
324+
325+
it('Should return the same pool when the profile is the same', async () => {
326+
// Arrange
327+
mockDataSource = new MockCannerDataSource({}, '', [
328+
pg.getProfile('profile1'),
329+
]);
330+
await mockDataSource.activate();
331+
// Act
332+
const pool1 = mockDataSource.getPool('profile1');
333+
const pool2 = mockDataSource.getPool('profile1');
334+
// Assert
335+
expect(pool1 === pool2).toBeTruthy();
336+
}, 30000);
337+
338+
it('Should return the same pool when the profile and authentication is the same', async () => {
339+
// Arrange
340+
mockDataSource = new MockCannerDataSource({}, '', [
341+
pg.getProfile('profile1'),
342+
]);
343+
await mockDataSource.activate();
344+
// Act
345+
const pool1 = mockDataSource.getPool('profile1', 'the-same-authentication');
346+
const pool2 = mockDataSource.getPool('profile1', 'the-same-authentication');
347+
// Assert
348+
expect(pool1 === pool2).toBeTruthy();
349+
}, 30000);
350+
351+
it('Should return different pool if authentication exist in headers even the profile is the same', async () => {
352+
// Arrange
353+
mockDataSource = new MockCannerDataSource({}, '', [
354+
pg.getProfile('profile1'),
355+
]);
356+
await mockDataSource.activate();
357+
// Act
358+
const pool1 = mockDataSource.getPool('profile1');
359+
const pool2 = mockDataSource.getPool('profile1', 'my-authentication');
360+
// Assert
361+
expect(pool1 == pool2).toBeFalsy();
362+
}, 30000);
363+
364+
it('Should return different pool with different authentication even the profile is the same', async () => {
365+
// Arrange
366+
mockDataSource = new MockCannerDataSource({}, '', [
367+
pg.getProfile('profile1'),
368+
]);
369+
await mockDataSource.activate();
370+
// Act
371+
const pool1 = mockDataSource.getPool('profile1', 'authentication');
372+
const pool2 = mockDataSource.getPool('profile1', 'differ-authentication');
373+
// Assert
374+
expect(pool1 === pool2).toBeFalsy();
375+
}, 30000);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './mockCannerDataSource';
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { CannerDataSource } from '../../src';
2+
import { InternalError } from '@vulcan-sql/core';
3+
import { Pool } from 'pg';
4+
5+
export class MockCannerDataSource extends CannerDataSource {
6+
public override getPool(profileName: string, password?: string): Pool {
7+
if (!this.poolMapping.has(profileName)) {
8+
throw new InternalError(`Profile instance ${profileName} not found`);
9+
}
10+
const { pool: defaultPool, options: poolOptions } =
11+
this.poolMapping.get(profileName)!;
12+
if (!password) {
13+
return defaultPool;
14+
}
15+
const database = poolOptions?.database || '';
16+
const userPoolKey = this.getUserPoolKey(password, database);
17+
if (this.UserPool.has(userPoolKey)) {
18+
const userPool = this.UserPool.get(userPoolKey);
19+
if (!userPool) {
20+
throw new InternalError(
21+
`User pool ${userPoolKey} is not a Pool instance`
22+
);
23+
}
24+
return userPool;
25+
}
26+
const pool = new Pool({ ...poolOptions, password: password });
27+
this.UserPool.set(userPoolKey, pool);
28+
return pool;
29+
}
30+
}

packages/extension-driver-duckdb/src/lib/duckdbDataSource.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,13 @@ export class DuckDBDataSource extends DataSource<any, DuckDBOptions> {
9393
bindParams,
9494
profileName,
9595
operations,
96+
headers,
9697
}: ExecuteOptions): Promise<DataResult> {
9798
if (!this.dbMapping.has(profileName)) {
9899
throw new InternalError(`Profile instance ${profileName} not found`);
99100
}
101+
console.log(`execute duckdb: ${sql}`);
102+
console.log({ headers });
100103
const { db, configurationParameters, ...options } =
101104
this.dbMapping.get(profileName)!;
102105
const [firstDataSQL, restDataSQL] = buildSQL(sql, operations);

0 commit comments

Comments
 (0)