Skip to content

Commit e904de2

Browse files
committed
extends CacheLayerInfo interface & Profile interface
to store properties passed to datasource.
1 parent 23aa7df commit e904de2

File tree

8 files changed

+68
-7
lines changed

8 files changed

+68
-7
lines changed

packages/core/src/lib/cache-layer/cacheLayerLoader.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,14 @@ export class CacheLayerLoader implements ICacheLayerLoader {
4343
templateName: string,
4444
cache: CacheLayerInfo
4545
): Promise<void> {
46-
const { cacheTableName, sql, profile, indexes, folderSubpath } = cache;
46+
const {
47+
cacheTableName,
48+
sql,
49+
profile,
50+
indexes,
51+
folderSubpath,
52+
options: cacheOptions,
53+
} = cache;
4754
const type = this.options.type!;
4855
const dataSource = this.dataSourceFactory(profile);
4956

@@ -82,6 +89,7 @@ export class CacheLayerLoader implements ICacheLayerLoader {
8289
directory,
8390
profileName: profile,
8491
type,
92+
options: cacheOptions,
8593
});
8694
} else {
8795
this.logger.debug(

packages/core/src/models/artifact.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ export class CacheLayerInfo {
118118
indexes?: Record<string, string>;
119119
// cache folder subpath
120120
folderSubpath?: string;
121+
// options pass to the data source
122+
options?: any;
121123
}
122124

123125
export class APISchema {

packages/core/src/models/extensions/dataSource.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ export interface ExportOptions {
1919
directory: string;
2020
// The profile name to select to export data
2121
profileName: string;
22+
// data source options
23+
options?: any;
2224
// export file format type
2325
type: CacheLayerStoreFormatType | string;
2426
}

packages/core/src/models/profile.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,14 @@ export interface Profile<C = Record<string, any>> {
2929
cache?: C;
3030
/** What users have access to this profile */
3131
allow: ProfileAllowConstraints;
32+
/** Properties that can be used when involking the dataSource method */
33+
properties?: Record<string, any>;
3234
}
35+
36+
// profile : by connection/pool/client setting 的變動
37+
// vulcan.yaml: by project 設定
38+
// api.yaml: by api/cache 執行設定
39+
40+
// => use additional information when refreshing cache
41+
// => the userId changed by each api
42+
// => the root_user_id changed by project

packages/extension-driver-canner/src/lib/cannerAdapter.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,18 @@ export class CannerAdapter {
3131
// When querying Canner enterprise, the Canner enterprise will save the query result as parquet files,
3232
// and store them in S3. This method will return the S3 urls of the query result.
3333
// For more Canner API ref: https://docs.cannerdata.com/reference/restful
34-
public async createAsyncQueryResultUrls(sql: string): Promise<string[]> {
34+
public async createAsyncQueryResultUrls(
35+
sql: string,
36+
headers?: Record<string, string>
37+
): Promise<string[]> {
3538
this.logger.debug(`Create async request to Canner.`);
3639
let data = await this.getWorkspaceRequestData('post', '/v2/async-queries', {
3740
data: {
3841
sql,
3942
timeout: 600,
4043
noLimit: true,
4144
},
45+
headers,
4246
});
4347

4448
const { id: requestId } = data;
@@ -60,12 +64,14 @@ export class CannerAdapter {
6064
private async getWorkspaceRequestData(
6165
method: string,
6266
urlPath: string,
63-
options?: Record<string, any>
67+
options?: Record<string, any>,
68+
headers?: Record<string, string>
6469
) {
6570
await this.prepare();
6671
try {
6772
const response = await axios({
6873
headers: {
74+
...headers,
6975
Authorization: `Token ${this.PAT}`,
7076
},
7177
params: {

packages/extension-driver-canner/src/lib/cannerDataSource.ts

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export class CannerDataSource extends DataSource<any, PGOptions> {
2626
private logger = this.getLogger();
2727
protected poolMapping = new Map<
2828
string,
29-
{ pool: Pool; options?: PGOptions }
29+
{ pool: Pool; options?: PGOptions; properties?: Record<string, any> }
3030
>();
3131
protected UserPool = new Map<string, Pool>();
3232

@@ -52,6 +52,7 @@ export class CannerDataSource extends DataSource<any, PGOptions> {
5252
this.poolMapping.set(profile.name, {
5353
pool,
5454
options: profile.connection,
55+
properties: profile.properties,
5556
});
5657
this.logger.debug(`Profile ${profile.name} initialized`);
5758
}
@@ -61,6 +62,7 @@ export class CannerDataSource extends DataSource<any, PGOptions> {
6162
sql,
6263
directory,
6364
profileName,
65+
options: cannerpOtions,
6466
}: ExportOptions): Promise<void> {
6567
if (!this.poolMapping.has(profileName)) {
6668
throw new InternalError(`Profile instance ${profileName} not found`);
@@ -69,12 +71,16 @@ export class CannerDataSource extends DataSource<any, PGOptions> {
6971
if (!fs.existsSync(directory)) {
7072
throw new InternalError(`Directory ${directory} not found`);
7173
}
72-
const { options: connection } = this.poolMapping.get(profileName)!;
73-
74+
const { options: connection, properties } =
75+
this.poolMapping.get(profileName)!;
7476
const cannerAdapter = new CannerAdapter(connection);
7577
try {
7678
this.logger.debug('Send the async query to the Canner Enterprise');
77-
const presignedUrls = await cannerAdapter.createAsyncQueryResultUrls(sql);
79+
const header = this.getCannerRequestHeader(properties, cannerpOtions);
80+
const presignedUrls = await cannerAdapter.createAsyncQueryResultUrls(
81+
sql,
82+
header
83+
);
7884
this.logger.debug(
7985
'Start fetching the query result parquet files from URLs'
8086
);
@@ -85,6 +91,21 @@ export class CannerDataSource extends DataSource<any, PGOptions> {
8591
throw error;
8692
}
8793
}
94+
private getCannerRequestHeader(
95+
properties?: Record<string, any>,
96+
cannerOptions?: any
97+
) {
98+
const header: Record<string, string> = {};
99+
const { userId } = cannerOptions;
100+
const rootUserId = properties?.['rootUserId'];
101+
if (userId && rootUserId) {
102+
header[
103+
'x-trino-session'
104+
] = `root_user_id=${rootUserId}, canner_user_id=${userId}`;
105+
this.logger.debug(`Impersonate used: ${userId}`);
106+
}
107+
return header;
108+
}
88109

89110
private async downloadFiles(urls: string[], directory: string) {
90111
await Promise.all(

packages/extension-store-canner/src/lib/canner/profileReader.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export class CannerProfileReader extends ProfileReader {
4444

4545
// generate profiles from the indicator files of each workspaces
4646
const { user, password, host, port, max } = this.envConfig.profile;
47+
const { rootUserId } = this.envConfig.properties;
4748
if (!user || !password || !host)
4849
throw new ConfigurationError(
4950
'Canner profile reader needs username, password, host properties.'
@@ -67,6 +68,9 @@ export class CannerProfileReader extends ProfileReader {
6768
max,
6869
},
6970
allow: '*',
71+
properties: {
72+
rootUserId,
73+
},
7074
} as Profile<Record<string, any>>;
7175
this.logger.debug(`created "${profile.name}".`);
7276
return profile;

packages/extension-store-canner/src/lib/config.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
export interface CannerStoreConfig {
22
storage: StorageServiceOptions;
3+
properties: CannnerDriverProfileProperties;
34
profile: CannerDriverProfileOptions;
45
}
56

7+
export interface CannnerDriverProfileProperties {
8+
rootUserId?: string;
9+
}
10+
611
export interface CannerDriverProfileOptions {
712
// user to connect to canner enterprise. Default is canner
813
user?: string;
@@ -64,6 +69,9 @@ export const getEnvConfig = (): CannerStoreConfig => {
6469
max:
6570
Number(process.env['PROFILE_CANNER_DRIVER_CONNECTION_POOL_MAX']) || 10,
6671
},
72+
properties: {
73+
rootUserId: process.env['PROFILE_ROOT_USER_ID'],
74+
},
6775
storage: {
6876
provider: process.env['STORAGE_PROVIDER'],
6977
// MINIO Provider options

0 commit comments

Comments
 (0)