diff --git a/src/gen/http/http.ts b/src/gen/http/http.ts index 3ea1ea692c..41aa4f84ee 100644 --- a/src/gen/http/http.ts +++ b/src/gen/http/http.ts @@ -159,6 +159,7 @@ export class RequestContext { export interface ResponseBody { text(): Promise; binary(): Promise; + stream?(): import('node:stream').Readable; } /** diff --git a/src/index.ts b/src/index.ts index fb6b1bba01..99beccc21b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,6 +3,7 @@ export * from './cache.js'; export * from './api.js'; export * from './attach.js'; export * from './watch.js'; +export * from './watch_api.js'; export * from './exec.js'; export * from './portforward.js'; export * from './types.js'; diff --git a/src/watch_api.ts b/src/watch_api.ts new file mode 100644 index 0000000000..ac4489e974 --- /dev/null +++ b/src/watch_api.ts @@ -0,0 +1,249 @@ +import { createInterface } from 'node:readline'; +import { KubernetesObject } from './types.js'; +import { ApiException, Configuration, HttpMethod } from './gen/index.js'; + +/** + * Represents the type of watch event received from the Kubernetes API. + * + * - `ADDED`: A new object was added. + * - `MODIFIED`: An existing object was modified. + * - `DELETED`: An object was deleted. + * - `BOOKMARK`: A bookmark event for efficient reconnection (contains only resourceVersion). + * - `ERROR`: An error occurred during the watch. + */ +export type WatchEventType = 'ADDED' | 'MODIFIED' | 'DELETED' | 'BOOKMARK' | 'ERROR'; + +/** + * Represents a single watch event from the Kubernetes API. + * + * @typeParam T - The Kubernetes object type (e.g., V1Pod, V1Deployment). + * + * @example + * ```typescript + * import { WatchEvent, V1Pod } from '@kubernetes/client-node'; + * + * const event: WatchEvent = { + * type: 'ADDED', + * object: { apiVersion: 'v1', kind: 'Pod', metadata: { name: 'my-pod' } } + * }; + * ``` + */ +export interface WatchEvent { + /** + * The type of event that occurred. + */ + type: WatchEventType; + + /** + * The Kubernetes object associated with this event. + * For ERROR events, this may contain error details rather than a standard K8s object. + */ + object: T; +} + +/** + * A watch API implementation that uses async iterators and follows the generated + * Kubernetes API client pattern. This allows users to use it with `makeApiClient` + * and override the HTTP library via `wrapHttpLibrary` and `createConfiguration`. + * + * The class uses the configuration's `httpApi` to send requests, enabling custom + * HTTP implementations. For optimal streaming support, custom HTTP libraries should + * return a response body with a `stream()` method that returns a Readable stream. + * + * @example Using with makeApiClient: + * ```typescript + * import { KubeConfig, WatchApi, V1Pod } from '@kubernetes/client-node'; + * + * const kubeConfig = new KubeConfig(); + * kubeConfig.loadFromDefault(); + * + * const watchApi = kubeConfig.makeApiClient(WatchApi); + * + * for await (const event of watchApi.watch('/api/v1/namespaces/default/pods')) { + * console.log(`${event.type}: ${event.object.metadata?.name}`); + * } + * ``` + * + * @example With custom HTTP library: + * ```typescript + * import { KubeConfig, WatchApi, V1Pod, wrapHttpLibrary, createConfiguration, ServerConfiguration, ResponseContext } from '@kubernetes/client-node'; + * import { Readable } from 'node:stream'; + * import ky from 'ky'; + * + * const httpApi = wrapHttpLibrary({ + * async send(request) { + * const response = await ky(request.getUrl(), { + * method: request.getHttpMethod(), + * headers: request.getHeaders(), + * body: request.getBody(), + * }); + * + * return new ResponseContext( + * response.status, + * Object.fromEntries(response.headers.entries()), + * { + * text: () => response.text(), + * binary: async () => Buffer.from(await response.arrayBuffer()), + * stream: () => Readable.fromWeb(response.body), // Enable streaming for watch + * }, + * ); + * }, + * }); + * + * const kubeConfig = new KubeConfig(); + * kubeConfig.loadFromDefault(); + * + * const configuration = createConfiguration({ + * baseServer: new ServerConfiguration(kubeConfig.getCurrentCluster()!.server, {}), + * authMethods: { default: kubeConfig }, + * httpApi, + * }); + * + * const watchApi = new WatchApi(configuration); + * + * for await (const event of watchApi.watch('/api/v1/namespaces/default/pods')) { + * console.log(`${event.type}: ${event.object.metadata?.name}`); + * } + * ``` + * + * @example With query parameters: + * ```typescript + * for await (const event of watchApi.watch('/api/v1/namespaces/default/pods', { + * labelSelector: 'app=nginx', + * resourceVersion: '12345', + * allowWatchBookmarks: true, + * })) { + * switch (event.type) { + * case 'ADDED': + * console.log('Pod added:', event.object.metadata?.name); + * break; + * case 'MODIFIED': + * console.log('Pod modified:', event.object.metadata?.name); + * break; + * case 'DELETED': + * console.log('Pod deleted:', event.object.metadata?.name); + * break; + * } + * } + * ``` + */ +export class WatchApi { + private configuration: Configuration; + private requestTimeoutMs: number = 30000; + + /** + * Creates a new WatchApi instance. + * + * @param configuration - The API configuration object from `createConfiguration()` or via `makeApiClient`. + */ + constructor(configuration: Configuration) { + this.configuration = configuration; + } + + /** + * Sets the request timeout in milliseconds. + * + * @param timeout - Timeout in milliseconds. + */ + public setRequestTimeout(timeout: number): void { + this.requestTimeoutMs = timeout; + } + + /** + * Watches for changes to Kubernetes resources at the specified path. + * Returns an async iterator that yields watch events. + * + * @typeParam T - The Kubernetes object type to expect (e.g., V1Pod, V1Deployment). + * + * @param path - The API path to watch (e.g., '/api/v1/namespaces/default/pods'). + * @param queryParams - Optional query parameters for the watch request. + * Supports any query parameter accepted by the Kubernetes API. + * + * @yields {WatchEvent} Events as they are received from the API server. + * + * @throws {ApiException} When the watch request fails or the server returns an error status. + * + * @example + * ```typescript + * for await (const event of watchApi.watch('/api/v1/namespaces/default/pods', { + * labelSelector: 'app=nginx', + * resourceVersion: '12345', + * })) { + * console.log(`${event.type}: ${event.object.metadata?.name}`); + * } + * ``` + */ + async *watch( + path: string, + queryParams: Record = {}, + ): AsyncGenerator, void, undefined> { + const requestContext = this.configuration.baseServer.makeRequestContext(path, HttpMethod.GET); + + requestContext.setQueryParam('watch', 'true'); + + for (const [key, val] of Object.entries(queryParams)) { + if (val !== undefined) { + requestContext.setQueryParam(key, val.toString()); + } + } + + const authMethod = this.configuration.authMethods.default; + + if (authMethod?.applySecurityAuthentication) { + await authMethod.applySecurityAuthentication(requestContext); + } + + const controller = new AbortController(); + + const timeoutSignal = AbortSignal.timeout(this.requestTimeoutMs); + + requestContext.setSignal(AbortSignal.any([controller.signal, timeoutSignal])); + + try { + const response = await this.configuration.httpApi.send(requestContext).toPromise(); + + if (response.httpStatusCode !== 200) { + const body = await response.body.text(); + + throw new ApiException( + response.httpStatusCode, + 'Watch request failed', + body, + response.headers, + ); + } + + if (response.body.stream) { + // Use streaming if available, otherwise fall back to text parsing + const stream = response.body.stream(); + + const lines = createInterface(stream); + + for await (const line of lines) { + const data = JSON.parse(line.toString()) as { type: WatchEventType; object: T }; + + yield { + type: data.type, + object: data.object, + }; + } + } else { + // Fallback: parse full text response line by line + const text = await response.body.text(); + + const lines = text.split('\n').filter((line) => line.trim() !== ''); + + for (const line of lines) { + const data = JSON.parse(line) as { type: WatchEventType; object: T }; + + yield { + type: data.type, + object: data.object, + }; + } + } + } finally { + controller.abort(); + } + } +} diff --git a/src/watch_api_test.ts b/src/watch_api_test.ts new file mode 100644 index 0000000000..ab5d48e175 --- /dev/null +++ b/src/watch_api_test.ts @@ -0,0 +1,335 @@ +import { describe, it } from 'node:test'; +import { Readable } from 'node:stream'; +import { deepStrictEqual, strictEqual, rejects } from 'node:assert'; +import { WatchApi, WatchEvent } from './watch_api.js'; +import { + ApiException, + createConfiguration, + wrapHttpLibrary, + ServerConfiguration, + ResponseContext, + RequestContext, +} from './gen/index.js'; + +const server = 'https://foo.company.com'; + +/** + * Creates a mock configuration with a custom HTTP library for testing. + */ +function createMockConfiguration( + baseUrl: string, + responseBody: string, + statusCode: number = 200, + useStreaming: boolean = false, +) { + const httpApi = wrapHttpLibrary({ + async send(_request: RequestContext): Promise { + const body = useStreaming + ? { + text: () => Promise.resolve(responseBody), + binary: () => Promise.resolve(Buffer.from(responseBody)), + stream: () => Readable.from(responseBody), + } + : { + text: () => Promise.resolve(responseBody), + binary: () => Promise.resolve(Buffer.from(responseBody)), + }; + + return new ResponseContext(statusCode, {}, body); + }, + }); + + // Create a mock auth method that does nothing + const mockAuth = { + getName: () => 'mock', + applySecurityAuthentication: async (_context: RequestContext): Promise => {}, + }; + + return createConfiguration({ + baseServer: new ServerConfiguration(baseUrl, {}), + authMethods: { default: mockAuth }, + httpApi, + }); +} + +describe('WatchApi', () => { + it('should construct correctly', () => { + const config = createMockConfiguration(server, ''); + const watchApi = new WatchApi(config); + strictEqual(watchApi instanceof WatchApi, true); + }); + + it('should iterate over watch events using text fallback', async () => { + const events = [ + { type: 'ADDED', object: { apiVersion: 'v1', kind: 'Pod', metadata: { name: 'pod1' } } }, + { type: 'MODIFIED', object: { apiVersion: 'v1', kind: 'Pod', metadata: { name: 'pod1' } } }, + { type: 'DELETED', object: { apiVersion: 'v1', kind: 'Pod', metadata: { name: 'pod1' } } }, + ]; + + const responseBody = events.map((e) => JSON.stringify(e)).join('\n'); + const config = createMockConfiguration(server, responseBody); + const watchApi = new WatchApi(config); + + const receivedEvents: WatchEvent[] = []; + for await (const event of watchApi.watch('/api/v1/namespaces/default/pods')) { + receivedEvents.push(event); + } + + strictEqual(receivedEvents.length, 3); + deepStrictEqual(receivedEvents[0].type, 'ADDED'); + deepStrictEqual(receivedEvents[1].type, 'MODIFIED'); + deepStrictEqual(receivedEvents[2].type, 'DELETED'); + deepStrictEqual(receivedEvents[0].object.metadata?.name, 'pod1'); + }); + + it('should iterate over watch events using streaming', async () => { + const events = [ + { type: 'ADDED', object: { apiVersion: 'v1', kind: 'Pod', metadata: { name: 'pod1' } } }, + { type: 'MODIFIED', object: { apiVersion: 'v1', kind: 'Pod', metadata: { name: 'pod1' } } }, + { type: 'DELETED', object: { apiVersion: 'v1', kind: 'Pod', metadata: { name: 'pod1' } } }, + ]; + + const responseBody = events.map((e) => JSON.stringify(e)).join('\n'); + const config = createMockConfiguration(server, responseBody, 200, true); + const watchApi = new WatchApi(config); + + const receivedEvents: WatchEvent[] = []; + for await (const event of watchApi.watch('/api/v1/namespaces/default/pods')) { + receivedEvents.push(event); + } + + strictEqual(receivedEvents.length, 3); + deepStrictEqual(receivedEvents[0].type, 'ADDED'); + deepStrictEqual(receivedEvents[1].type, 'MODIFIED'); + deepStrictEqual(receivedEvents[2].type, 'DELETED'); + deepStrictEqual(receivedEvents[0].object.metadata?.name, 'pod1'); + }); + + it('should handle BOOKMARK events', async () => { + const events = [ + { type: 'ADDED', object: { apiVersion: 'v1', kind: 'Pod', metadata: { name: 'pod1' } } }, + { + type: 'BOOKMARK', + object: { apiVersion: 'v1', kind: 'Pod', metadata: { resourceVersion: '12345' } }, + }, + ]; + + const responseBody = events.map((e) => JSON.stringify(e)).join('\n'); + const config = createMockConfiguration(server, responseBody); + const watchApi = new WatchApi(config); + + const receivedEvents: WatchEvent[] = []; + for await (const event of watchApi.watch('/api/v1/namespaces/default/pods')) { + receivedEvents.push(event); + } + + strictEqual(receivedEvents.length, 2); + deepStrictEqual(receivedEvents[1].type, 'BOOKMARK'); + }); + + it('should handle ERROR events in the watch stream', async () => { + const events = [ + { type: 'ADDED', object: { apiVersion: 'v1', kind: 'Pod', metadata: { name: 'pod1' } } }, + { type: 'ERROR', object: { code: 410, message: 'Gone', reason: 'Expired' } }, + ]; + + const responseBody = events.map((e) => JSON.stringify(e)).join('\n'); + const config = createMockConfiguration(server, responseBody); + const watchApi = new WatchApi(config); + + const receivedEvents: WatchEvent[] = []; + for await (const event of watchApi.watch('/api/v1/namespaces/default/pods')) { + receivedEvents.push(event); + } + + strictEqual(receivedEvents.length, 2); + deepStrictEqual(receivedEvents[1].type, 'ERROR'); + deepStrictEqual(receivedEvents[1].object.code, 410); + }); + + it('should throw ApiException on non-200 status', async () => { + const config = createMockConfiguration(server, 'Internal Server Error', 500); + const watchApi = new WatchApi(config); + + await rejects( + async () => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const event of watchApi.watch('/api/v1/namespaces/default/pods')) { + // Should not reach here + } + }, + (err: Error) => { + strictEqual(err instanceof ApiException, true); + strictEqual((err as ApiException).code, 500); + return true; + }, + ); + }); + + it('should throw on invalid JSON lines', async () => { + const validEvent = { + type: 'ADDED', + object: { apiVersion: 'v1', kind: 'Pod', metadata: { name: 'pod1' } }, + }; + const responseBody = `${JSON.stringify(validEvent)}\n{"invalid json\n${JSON.stringify(validEvent)}`; + const config = createMockConfiguration(server, responseBody); + const watchApi = new WatchApi(config); + + const iterator = watchApi.watch('/api/v1/namespaces/default/pods')[Symbol.asyncIterator](); + + // First event is valid + const first = await iterator.next(); + strictEqual(first.done, false); + strictEqual(first.value.type, 'ADDED'); + + // Second line is invalid, so next() should throw + await rejects(async () => { + await iterator.next(); + }, SyntaxError); + }); + + it('should pass query parameters correctly', async () => { + let capturedUrl: string = ''; + + const httpApi = wrapHttpLibrary({ + async send(request: RequestContext): Promise { + capturedUrl = request.getUrl(); + const event = { + type: 'ADDED', + object: { apiVersion: 'v1', kind: 'Pod', metadata: { name: 'pod1' } }, + }; + return new ResponseContext( + 200, + {}, + { + text: () => Promise.resolve(JSON.stringify(event)), + binary: () => Promise.resolve(Buffer.from(JSON.stringify(event))), + }, + ); + }, + }); + + const mockAuth = { + getName: () => 'mock', + applySecurityAuthentication: async (_context: RequestContext): Promise => {}, + }; + + const config = createConfiguration({ + baseServer: new ServerConfiguration(server, {}), + authMethods: { default: mockAuth }, + httpApi, + }); + + const watchApi = new WatchApi(config); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const event of watchApi.watch('/api/v1/namespaces/default/pods', { + resourceVersion: '12345', + labelSelector: 'app=nginx', + fieldSelector: 'metadata.name=my-pod', + allowWatchBookmarks: true, + })) { + // Just consume the event + } + + strictEqual(capturedUrl.includes('watch=true'), true); + strictEqual(capturedUrl.includes('resourceVersion=12345'), true); + strictEqual(capturedUrl.includes('labelSelector=app%3Dnginx'), true); + strictEqual(capturedUrl.includes('fieldSelector=metadata.name%3Dmy-pod'), true); + strictEqual(capturedUrl.includes('allowWatchBookmarks=true'), true); + }); + + it('should handle empty response', async () => { + const config = createMockConfiguration(server, ''); + const watchApi = new WatchApi(config); + + const receivedEvents: WatchEvent[] = []; + for await (const event of watchApi.watch('/api/v1/namespaces/default/pods')) { + receivedEvents.push(event); + } + + strictEqual(receivedEvents.length, 0); + }); +}); + +describe('WatchApi with custom HTTP library', () => { + it('should work with custom HTTP implementation', async () => { + const events = [ + { type: 'ADDED', object: { apiVersion: 'v1', kind: 'Pod', metadata: { name: 'custom-pod' } } }, + ]; + const responseBody = events.map((e) => JSON.stringify(e)).join('\n'); + + // Custom HTTP implementation + const customHttpApi = wrapHttpLibrary({ + async send(request: RequestContext): Promise { + // Verify we receive the request correctly + strictEqual(request.getHttpMethod(), 'GET'); + strictEqual(request.getUrl().includes('/api/v1/namespaces/default/pods'), true); + + return new ResponseContext( + 200, + { 'content-type': 'application/json' }, + { + text: () => Promise.resolve(responseBody), + binary: () => Promise.resolve(Buffer.from(responseBody)), + stream: () => Readable.from(responseBody), + }, + ); + }, + }); + + const mockAuth = { + getName: () => 'mock', + applySecurityAuthentication: async (_context: RequestContext): Promise => {}, + }; + + const configuration = createConfiguration({ + baseServer: new ServerConfiguration(server, {}), + authMethods: { default: mockAuth }, + httpApi: customHttpApi, + }); + + const watchApi = new WatchApi(configuration); + + const receivedEvents: WatchEvent[] = []; + for await (const event of watchApi.watch('/api/v1/namespaces/default/pods')) { + receivedEvents.push(event); + } + + strictEqual(receivedEvents.length, 1); + deepStrictEqual(receivedEvents[0].object.metadata?.name, 'custom-pod'); + }); +}); + +describe('WatchApi type safety', () => { + it('should preserve generic type through iteration', async () => { + interface CustomResource { + apiVersion?: string; + kind?: string; + metadata?: { name: string; namespace: string }; + spec?: { replicas: number }; + } + + const events = [ + { + type: 'ADDED', + object: { + apiVersion: 'custom.io/v1', + kind: 'CustomResource', + metadata: { name: 'my-resource', namespace: 'default' }, + spec: { replicas: 3 }, + }, + }, + ]; + + const responseBody = events.map((e) => JSON.stringify(e)).join('\n'); + const config = createMockConfiguration(server, responseBody); + const watchApi = new WatchApi(config); + + for await (const event of watchApi.watch('/apis/custom.io/v1/customresources')) { + // Type should be correctly inferred + strictEqual(event.object.spec?.replicas, 3); + strictEqual(event.object.metadata?.name, 'my-resource'); + } + }); +});