diff --git a/docs/env.md b/docs/env.md index 322bdae92..520502578 100644 --- a/docs/env.md +++ b/docs/env.md @@ -137,6 +137,8 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of [ { "socketPath": "/var/run/docker.sock", + "imageRetentionDays": 7, + "imageCleanupInterval": 86400, "resources": [ { "id": "disk", @@ -193,6 +195,8 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of #### Configuration Options - **socketPath**: Path to the Docker socket (e.g., docker.sock). +- **imageRetentionDays** - how long docker images are kept, in days. Default: 7 +- **imageCleanupInterval** - how often to run cleanup for docker images, in seconds. Min: 3600 (1hour), Default: 86400 (24 hours) - **storageExpiry**: Amount of seconds for storage expiry.(Mandatory) - **maxJobDuration**: Maximum duration in seconds for a job.(Mandatory) - **minJobDuration**: Minimum duration in seconds for a job.(Mandatory) diff --git a/src/@types/C2D/C2D.ts b/src/@types/C2D/C2D.ts index 7dc780b57..6a6213a2e 100644 --- a/src/@types/C2D/C2D.ts +++ b/src/@types/C2D/C2D.ts @@ -142,6 +142,8 @@ export interface C2DDockerConfig { resources?: ComputeResource[] // optional, owner can overwrite free?: ComputeEnvironmentFreeOptions access: ComputeAccessList + imageRetentionDays?: number // Default: 7 days + imageCleanupInterval?: number // Default: 86400 seconds (24 hours) } export type ComputeResultType = diff --git a/src/components/c2d/compute_engine_docker.ts b/src/components/c2d/compute_engine_docker.ts index 50a71941d..7459e7ce2 100644 --- a/src/components/c2d/compute_engine_docker.ts +++ b/src/components/c2d/compute_engine_docker.ts @@ -59,7 +59,11 @@ export class C2DEngineDocker extends C2DEngine { private cronTimer: any private cronTime: number = 2000 private jobImageSizes: Map = new Map() + private isInternalLoopRunning: boolean = false + private imageCleanupTimer: NodeJS.Timeout | null = null private static DEFAULT_DOCKER_REGISTRY = 'https://registry-1.docker.io' + private retentionDays: number + private cleanupInterval: number public constructor( clusterConfig: C2DClusterInfo, @@ -77,6 +81,8 @@ export class C2DEngineDocker extends C2DEngine { CORE_LOGGER.error('Could not create Docker container: ' + e.message) } } + this.retentionDays = clusterConfig.connection.imageRetentionDays || 7 + this.cleanupInterval = clusterConfig.connection.imageCleanupInterval || 86400 // 24 hours if ( clusterConfig.connection.protocol && clusterConfig.connection.host && @@ -278,6 +284,96 @@ export class C2DEngineDocker extends C2DEngine { if (!this.cronTimer) { this.setNewTimer() } + // Start image cleanup timer + this.startImageCleanupTimer() + } + + public override stop(): Promise { + // Clear the timer and reset the flag + if (this.cronTimer) { + clearTimeout(this.cronTimer) + this.cronTimer = null + } + this.isInternalLoopRunning = false + // Stop image cleanup timer + if (this.imageCleanupTimer) { + clearInterval(this.imageCleanupTimer) + this.imageCleanupTimer = null + CORE_LOGGER.debug('Image cleanup timer stopped') + } + return Promise.resolve() + } + + public async updateImageUsage(image: string): Promise { + try { + await this.db.updateImage(image) + } catch (e) { + CORE_LOGGER.error(`Failed to update image usage for ${image}: ${e.message}`) + } + } + + private async cleanupOldImages(): Promise { + if (!this.docker) return + + try { + const oldImages = await this.db.getOldImages(this.retentionDays) + if (oldImages.length === 0) { + CORE_LOGGER.debug('No old images to clean up') + return + } + + CORE_LOGGER.info(`Starting cleanup of ${oldImages.length} old Docker images`) + let cleaned = 0 + let failed = 0 + + for (const image of oldImages) { + try { + const dockerImage = this.docker.getImage(image) + await dockerImage.remove({ force: true }) + cleaned++ + CORE_LOGGER.info(`Successfully removed old image: ${image}`) + } catch (e) { + failed++ + // Image might be in use or already deleted - log but don't throw + CORE_LOGGER.debug(`Could not remove image ${image}: ${e.message}`) + } + } + + CORE_LOGGER.info( + `Image cleanup completed: ${cleaned} removed, ${failed} failed (may be in use)` + ) + } catch (e) { + CORE_LOGGER.error(`Error during image cleanup: ${e.message}`) + } + } + + private startImageCleanupTimer(): void { + if (this.imageCleanupTimer) { + return // Already running + } + + if (!this.docker) { + CORE_LOGGER.debug('Docker not available, skipping image cleanup timer') + return + } + + // Run initial cleanup after a short delay + setTimeout(() => { + this.cleanupOldImages().catch((e) => { + CORE_LOGGER.error(`Initial image cleanup failed: ${e.message}`) + }) + }, 60000) // Wait 1 minute after start + + // Set up periodic cleanup + this.imageCleanupTimer = setInterval(() => { + this.cleanupOldImages().catch((e) => { + CORE_LOGGER.error(`Periodic image cleanup failed: ${e.message}`) + }) + }, this.cleanupInterval * 1000) + + CORE_LOGGER.info( + `Image cleanup timer started (interval: ${this.cleanupInterval / 60} minutes)` + ) } // eslint-disable-next-line require-await @@ -817,6 +913,17 @@ export class C2DEngineDocker extends C2DEngine { private async InternalLoop() { // this is the internal loop of docker engine // gets list of all running jobs and process them one by one + + // Prevent concurrent execution + if (this.isInternalLoopRunning) { + CORE_LOGGER.debug( + `InternalLoop already running for engine ${this.getC2DConfig().hash}, skipping this execution` + ) + return + } + + this.isInternalLoopRunning = true + if (this.cronTimer) { clearTimeout(this.cronTimer) this.cronTimer = null @@ -826,22 +933,26 @@ export class C2DEngineDocker extends C2DEngine { const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash) if (jobs.length === 0) { - CORE_LOGGER.info('No C2D jobs found for engine ' + this.getC2DConfig().hash) - this.setNewTimer() - return + CORE_LOGGER.debug('No C2D jobs found for engine ' + this.getC2DConfig().hash) } else { - CORE_LOGGER.info(`Got ${jobs.length} jobs for engine ${this.getC2DConfig().hash}`) - CORE_LOGGER.debug(JSON.stringify(jobs)) + CORE_LOGGER.debug( + `Got ${jobs.length} jobs for engine ${this.getC2DConfig().hash}` + ) } - const promises: any = [] - for (const job of jobs) { - promises.push(this.processJob(job)) + + if (jobs.length > 0) { + const promises: any = [] + for (const job of jobs) { + promises.push(this.processJob(job)) + } + // wait for all promises, there is no return + await Promise.all(promises) } - // wait for all promises, there is no return - await Promise.all(promises) } catch (e) { CORE_LOGGER.error(`Error in C2D InternalLoop: ${e.message}`) } finally { + // Reset the flag before setting the timer + this.isInternalLoopRunning = false // set the cron again this.setNewTimer() } @@ -1356,7 +1467,7 @@ export class C2DEngineDocker extends C2DEngine { } } } catch (e) { - // console.error('Container volume not found! ' + e.message) + CORE_LOGGER.error('Container volume not found! ' + e.message) } if (job.algorithm?.meta.container && job.algorithm?.meta.container.dockerfile) { const image = getAlgorithmImage(job.algorithm, job.jobId) @@ -1547,6 +1658,10 @@ export class C2DEngineDocker extends C2DEngine { const logText = `Successfully pulled image: ${job.containerImage}` CORE_LOGGER.debug(logText) appendFileSync(imageLogFile, logText + '\n') + // Track image usage + this.updateImageUsage(job.containerImage).catch((e) => { + CORE_LOGGER.debug(`Failed to track image usage: ${e.message}`) + }) resolve(res) }, (progress: any) => { @@ -1621,7 +1736,6 @@ export class C2DEngineDocker extends C2DEngine { await new Promise((resolve, reject) => { buildStream.on('end', () => { CORE_LOGGER.debug(`Image '${job.containerImage}' built successfully.`) - resolve() }) buildStream.on('error', (err) => { diff --git a/src/components/database/C2DDatabase.ts b/src/components/database/C2DDatabase.ts index 20ac3d7e5..01eff5a09 100644 --- a/src/components/database/C2DDatabase.ts +++ b/src/components/database/C2DDatabase.ts @@ -26,6 +26,7 @@ export class C2DDatabase extends AbstractDatabase { } this.provider = new SQLiteCompute('databases/c2dDatabase.sqlite') await this.provider.createTable() + await this.provider.createImageTable() return this })() as unknown as C2DDatabase @@ -83,6 +84,14 @@ export class C2DDatabase extends AbstractDatabase { return await this.provider.getJobs(environments, fromTimestamp, consumerAddrs) } + async updateImage(image: string): Promise { + return await this.provider.updateImage(image) + } + + async getOldImages(retentionDays: number): Promise { + return await this.provider.getOldImages(retentionDays) + } + /** * * @param environment compute environment to check for diff --git a/src/components/database/sqliteCompute.ts b/src/components/database/sqliteCompute.ts index 74dcbe84a..0ae9823dc 100644 --- a/src/components/database/sqliteCompute.ts +++ b/src/components/database/sqliteCompute.ts @@ -19,6 +19,8 @@ interface ComputeDatabaseProvider { fromTimestamp?: string, consumerAddrs?: string[] ): Promise + updateImage(image: string): Promise + getOldImages(retentionDays: number): Promise } function getInternalStructure(job: DBComputeJob): any { @@ -123,6 +125,66 @@ export class SQLiteCompute implements ComputeDatabaseProvider { }) } + createImageTable(): Promise { + const createTableSQL = ` + CREATE TABLE IF NOT EXISTS docker_images ( + image TEXT PRIMARY KEY, + lastUsedTimestamp INTEGER NOT NULL + ); + ` + return new Promise((resolve, reject) => { + this.db.run(createTableSQL, (err) => { + if (err) { + DATABASE_LOGGER.error('Could not create docker_images table: ' + err.message) + reject(err) + } else { + resolve() + } + }) + }) + } + + updateImage(image: string): Promise { + const timestamp = Math.floor(Date.now() / 1000) // Unix timestamp in seconds + const insertSQL = ` + INSERT OR REPLACE INTO docker_images (image, lastUsedTimestamp) + VALUES (?, ?); + ` + return new Promise((resolve, reject) => { + this.db.run(insertSQL, [image, timestamp], (err) => { + if (err) { + DATABASE_LOGGER.error( + `Could not update image usage for ${image}: ${err.message}` + ) + reject(err) + } else { + DATABASE_LOGGER.debug(`Updated image usage timestamp for ${image}`) + resolve() + } + }) + }) + } + + getOldImages(retentionDays: number = 7): Promise { + const cutoffTimestamp = Math.floor(Date.now() / 1000) - retentionDays * 24 * 60 * 60 + const selectSQL = ` + SELECT image FROM docker_images + WHERE lastUsedTimestamp < ? + ORDER BY lastUsedTimestamp ASC; + ` + return new Promise((resolve, reject) => { + this.db.all(selectSQL, [cutoffTimestamp], (err, rows: any[] | undefined) => { + if (err) { + DATABASE_LOGGER.error(`Could not get old images: ${err.message}`) + reject(err) + } else { + const images = rows ? rows.map((row) => row.image) : [] + resolve(images) + } + }) + }) + } + newJob(job: DBComputeJob): Promise { // TO DO C2D const insertSQL = ` @@ -307,7 +369,7 @@ export class SQLiteCompute implements ComputeDatabaseProvider { }) resolve(filtered) } else { - DATABASE_LOGGER.info('Could not find any running C2D jobs!') + // DATABASE_LOGGER.info('Could not find any running C2D jobs!') resolve([]) } } diff --git a/src/test/integration/imageCleanup.test.ts b/src/test/integration/imageCleanup.test.ts new file mode 100644 index 000000000..00245525b --- /dev/null +++ b/src/test/integration/imageCleanup.test.ts @@ -0,0 +1,325 @@ +import { expect, assert } from 'chai' +import { SQLiteCompute } from '../../components/database/sqliteCompute.js' +import { C2DDatabase } from '../../components/database/C2DDatabase.js' +import { typesenseSchemas } from '../../components/database/TypesenseSchemas.js' +import { getConfiguration } from '../../utils/config.js' +import { + buildEnvOverrideConfig, + OverrideEnvConfig, + setupEnvironment, + tearDownEnvironment, + TEST_ENV_CONFIG_FILE +} from '../utils/utils.js' +import { OceanNodeConfig } from '../../@types/OceanNode.js' +import { ENVIRONMENT_VARIABLES } from '../../utils/constants.js' +import { C2DEngineDocker } from '../../components/c2d/compute_engine_docker.js' +import { Escrow } from '../../components/core/utils/escrow.js' +import { KeyManager } from '../../components/KeyManager/index.js' +import { C2DClusterInfo } from '../../@types/C2D/C2D.js' +import Dockerode from 'dockerode' + +describe('Docker Image Cleanup Integration Tests', () => { + let envOverrides: OverrideEnvConfig[] + let config: OceanNodeConfig + let db: C2DDatabase + let sqliteProvider: SQLiteCompute + let dockerEngine: C2DEngineDocker + let docker: Dockerode + + before(async () => { + envOverrides = buildEnvOverrideConfig( + [ENVIRONMENT_VARIABLES.DOCKER_COMPUTE_ENVIRONMENTS], + [ + JSON.stringify([ + { + socketPath: '/var/run/docker.sock', + resources: [{ id: 'disk', total: 10 }], + storageExpiry: 604800, + maxJobDuration: 3600, + minJobDuration: 60, + fees: { + '1': [ + { + feeToken: '0x123', + prices: [{ id: 'cpu', price: 1 }] + } + ] + }, + access: { + addresses: [], + accessLists: null + }, + imageRetentionDays: 7, + imageCleanupInterval: 60 // 1 minute for testing + } + ]) + ] + ) + envOverrides = await setupEnvironment(TEST_ENV_CONFIG_FILE, envOverrides) + config = await getConfiguration(true) + db = await new C2DDatabase(config.dbConfig, typesenseSchemas.c2dSchemas) + sqliteProvider = (db as any).provider as SQLiteCompute + + // Initialize Docker connection for testing + docker = new Dockerode() + }) + + after(async () => { + await tearDownEnvironment(envOverrides) + if (dockerEngine) { + await dockerEngine.stop() + } + }) + + describe('Image Tracking Database Methods', () => { + it('should create docker_images table', async () => { + await sqliteProvider.createImageTable() + // If no error is thrown, table creation succeeded + assert(true, 'Table creation should succeed') + }) + + it('should update image usage timestamp', async () => { + const testImage = 'test-image:latest' + await sqliteProvider.updateImage(testImage) + + // Verify image was recorded by querying directly + // getOldImages(0) looks for images older than now, so we query the DB directly + const imageRecord = await new Promise((resolve, reject) => { + const { db } = sqliteProvider as any + db.get( + 'SELECT image, lastUsedTimestamp FROM docker_images WHERE image = ?', + [testImage], + (err: Error | null, row: any) => { + if (err) reject(err) + else resolve(row) + } + ) + }) + + assert(imageRecord, 'Image should be recorded in database') + expect(imageRecord.image).to.equal(testImage) + expect(imageRecord.lastUsedTimestamp).to.be.a('number') + const currentTimestamp = Math.floor(Date.now() / 1000) + expect(imageRecord.lastUsedTimestamp).to.be.at.least(currentTimestamp - 1) + }) + + it('should update existing image timestamp', async () => { + const testImage = 'test-image-update:latest' + const firstTimestamp = Math.floor(Date.now() / 1000) + + // Insert image first time + await sqliteProvider.updateImage(testImage) + + // Wait a bit to ensure timestamp difference + await new Promise((resolve) => setTimeout(resolve, 1100)) + + // Update image again - timestamp should be newer + await sqliteProvider.updateImage(testImage) + + // Verify image exists with updated timestamp + const imageRecord = await new Promise((resolve, reject) => { + const { db } = sqliteProvider as any + db.get( + 'SELECT image, lastUsedTimestamp FROM docker_images WHERE image = ?', + [testImage], + (err: Error | null, row: any) => { + if (err) reject(err) + else resolve(row) + } + ) + }) + + assert(imageRecord, 'Image should be recorded in database') + expect(imageRecord.image).to.equal(testImage) + expect(imageRecord.lastUsedTimestamp).to.be.greaterThan(firstTimestamp) + }) + + it('should return old images based on retention days', async () => { + const recentImage = 'recent-image:latest' + const oldImage = 'old-image:latest' + + // Update recent image + await sqliteProvider.updateImage(recentImage) + + // Manually insert an old image by directly updating the database + const oldTimestamp = Math.floor(Date.now() / 1000) - 8 * 24 * 60 * 60 // 8 days ago + await new Promise((resolve, reject) => { + const { db } = sqliteProvider as any + db.run( + 'INSERT OR REPLACE INTO docker_images (image, lastUsedTimestamp) VALUES (?, ?)', + [oldImage, oldTimestamp], + (err: Error | null) => { + if (err) reject(err) + else resolve(undefined) + } + ) + }) + + // Get images older than 7 days + const oldImages = await sqliteProvider.getOldImages(7) + expect(oldImages).to.include(oldImage) + expect(oldImages).to.not.include(recentImage) + }) + + it('should return empty array when no old images exist', async () => { + const recentImage = 'very-recent-image:latest' + await sqliteProvider.updateImage(recentImage) + + const oldImages = await sqliteProvider.getOldImages(30) // 30 days retention + expect(oldImages).to.not.include(recentImage) + }) + }) + + describe('C2DEngineDocker Image Cleanup', () => { + let clusterConfig: C2DClusterInfo + let escrow: Escrow + let keyManager: KeyManager + + before(() => { + // Create minimal cluster config for testing + clusterConfig = { + type: 'docker' as any, + hash: 'test-cluster-hash', + connection: config.dockerComputeEnvironments[0], + tempFolder: '/tmp/test-c2d' + } + + // Create mock escrow and keyManager (minimal setup) + escrow = {} as Escrow + keyManager = {} as KeyManager + + dockerEngine = new C2DEngineDocker(clusterConfig, db, escrow, keyManager) + }) + + it('should track image usage when image is pulled', async () => { + const testImage = 'alpine:latest' + + // Call updateImageUsage directly (using private method access for testing) + await (dockerEngine as any).updateImageUsage(testImage) + + // Verify image was recorded in database + const imageRecord = await new Promise((resolve, reject) => { + const { db } = sqliteProvider as any + db.get( + 'SELECT image, lastUsedTimestamp FROM docker_images WHERE image = ?', + [testImage], + (err: Error | null, row: any) => { + if (err) reject(err) + else resolve(row) + } + ) + }) + + assert(imageRecord, 'Image should be recorded in database after updateImageUsage') + expect(imageRecord.image).to.equal(testImage) + }) + + it('should start image cleanup timer on engine start', () => { + // Check if timer property exists + assert(dockerEngine, 'dockerEngine should be initialized') + expect(dockerEngine).to.have.property('imageCleanupTimer') + }) + + it('should stop image cleanup timer on engine stop', async () => { + await dockerEngine.stop() + // Timer should be cleared + const timer = (dockerEngine as any).imageCleanupTimer + assert(timer === null, 'Timer should be cleared after stop') + }) + + it('should handle cleanup of non-existent images gracefully', async () => { + const nonExistentImage = 'non-existent-image:999999' + + // Manually insert into database + await sqliteProvider.updateImage(nonExistentImage) + + // Try to clean it up - should not throw error + try { + await (dockerEngine as any).cleanupOldImages() + assert(true, 'cleanupOldImages should complete without error') + } catch (e) { + // Cleanup should handle errors gracefully + assert.fail('cleanupOldImages should not throw errors for non-existent images') + } + }) + }) + + describe('Image Cleanup with Real Docker (if available)', () => { + let testImageName: string + + before(async () => { + // Check if Docker is available + try { + await docker.info() + } catch (e) { + // Skip tests if Docker is not available + } + }) + + it('should cleanup old images from Docker', async function () { + // Skip if Docker not available + try { + await docker.info() + } catch (e) { + this.skip() + } + + testImageName = 'alpine:3.18' + + // Pull a test image + try { + await docker.pull(testImageName) + } catch (e) { + // If pull fails, skip test + this.skip() + } + + // Track the image with old timestamp (8 days ago) + const oldTimestamp = Math.floor(Date.now() / 1000) - 8 * 24 * 60 * 60 + await new Promise((resolve, reject) => { + const { db } = sqliteProvider as any + db.run( + 'INSERT OR REPLACE INTO docker_images (image, lastUsedTimestamp) VALUES (?, ?)', + [testImageName, oldTimestamp], + (err: Error | null) => { + if (err) reject(err) + else resolve(undefined) + } + ) + }) + + // Verify image exists before cleanup + const imagesBefore = await docker.listImages() + const imageExistsBefore = imagesBefore.some( + (img) => img.RepoTags && img.RepoTags.includes(testImageName) + ) + + if (imageExistsBefore) { + // Run cleanup + await (dockerEngine as any).cleanupOldImages() + + // Verify cleanup was attempted (may or may not succeed if image in use) + // We just verify the cleanup function ran without error + assert(true, 'cleanupOldImages should complete without error') + } + }) + + after(async function () { + // Clean up test image if it exists + try { + const dockerInfo = await docker.info() + assert(dockerInfo, 'Docker should be available for cleanup') + if (testImageName) { + try { + const image = docker.getImage(testImageName) + await image.remove({ force: true }) + } catch (e) { + // Ignore errors during cleanup + } + } + } catch (e) { + // Docker not available, skip cleanup + } + }) + }) +}) diff --git a/src/utils/config/schemas.ts b/src/utils/config/schemas.ts index 5c56eaaaf..1139b69c5 100644 --- a/src/utils/config/schemas.ts +++ b/src/utils/config/schemas.ts @@ -139,7 +139,9 @@ export const C2DDockerConfigSchema = z.array( }) .optional(), fees: z.record(z.string(), z.array(ComputeEnvFeesSchema)), - free: ComputeEnvironmentFreeOptionsSchema.optional() + free: ComputeEnvironmentFreeOptionsSchema.optional(), + imageRetentionDays: z.number().int().min(1).optional().default(7), + imageCleanupInterval: z.number().int().min(3600).optional().default(86400) // min 1 hour, default 24 hours }) .refine((data) => data.fees !== undefined && Object.keys(data.fees).length > 0, { message: 'There is no fees configuration!'