Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/env.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of
[
{
"socketPath": "/var/run/docker.sock",
"imageRetentionDays": 7,
"imageCleanupInterval": 86400,
"resources": [
{
"id": "disk",
Expand Down Expand Up @@ -193,6 +195,8 @@ The `DOCKER_COMPUTE_ENVIRONMENTS` environment variable should be a JSON array of
#### Configuration Options

- **socketPath**: Path to the Docker socket (e.g., docker.sock).
- **imageRetentionDays** - how long docker images are kept, in days. Default: 7
- **imageCleanupInterval** - how often to run cleanup for docker images, in seconds. Min: 3600 (1hour), Default: 86400 (24 hours)
- **storageExpiry**: Amount of seconds for storage expiry.(Mandatory)
- **maxJobDuration**: Maximum duration in seconds for a job.(Mandatory)
- **minJobDuration**: Minimum duration in seconds for a job.(Mandatory)
Expand Down
2 changes: 2 additions & 0 deletions src/@types/C2D/C2D.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ export interface C2DDockerConfig {
resources?: ComputeResource[] // optional, owner can overwrite
free?: ComputeEnvironmentFreeOptions
access: ComputeAccessList
imageRetentionDays?: number // Default: 7 days
imageCleanupInterval?: number // Default: 86400 seconds (24 hours)
}

export type ComputeResultType =
Expand Down
138 changes: 126 additions & 12 deletions src/components/c2d/compute_engine_docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ export class C2DEngineDocker extends C2DEngine {
private cronTimer: any
private cronTime: number = 2000
private jobImageSizes: Map<string, number> = new Map()
private isInternalLoopRunning: boolean = false
private imageCleanupTimer: NodeJS.Timeout | null = null
private static DEFAULT_DOCKER_REGISTRY = 'https://registry-1.docker.io'
private retentionDays: number
private cleanupInterval: number

public constructor(
clusterConfig: C2DClusterInfo,
Expand All @@ -77,6 +81,8 @@ export class C2DEngineDocker extends C2DEngine {
CORE_LOGGER.error('Could not create Docker container: ' + e.message)
}
}
this.retentionDays = clusterConfig.connection.imageRetentionDays || 7
this.cleanupInterval = clusterConfig.connection.imageCleanupInterval || 86400 // 24 hours
if (
clusterConfig.connection.protocol &&
clusterConfig.connection.host &&
Expand Down Expand Up @@ -278,6 +284,96 @@ export class C2DEngineDocker extends C2DEngine {
if (!this.cronTimer) {
this.setNewTimer()
}
// Start image cleanup timer
this.startImageCleanupTimer()
}

public override stop(): Promise<void> {
// Clear the timer and reset the flag
if (this.cronTimer) {
clearTimeout(this.cronTimer)
this.cronTimer = null
}
this.isInternalLoopRunning = false
// Stop image cleanup timer
if (this.imageCleanupTimer) {
clearInterval(this.imageCleanupTimer)
this.imageCleanupTimer = null
CORE_LOGGER.debug('Image cleanup timer stopped')
}
return Promise.resolve()
}

public async updateImageUsage(image: string): Promise<void> {
try {
await this.db.updateImage(image)
} catch (e) {
CORE_LOGGER.error(`Failed to update image usage for ${image}: ${e.message}`)
}
}

private async cleanupOldImages(): Promise<void> {
if (!this.docker) return

try {
const oldImages = await this.db.getOldImages(this.retentionDays)
if (oldImages.length === 0) {
CORE_LOGGER.debug('No old images to clean up')
return
}

CORE_LOGGER.info(`Starting cleanup of ${oldImages.length} old Docker images`)
let cleaned = 0
let failed = 0

for (const image of oldImages) {
try {
const dockerImage = this.docker.getImage(image)
await dockerImage.remove({ force: true })
cleaned++
CORE_LOGGER.info(`Successfully removed old image: ${image}`)
} catch (e) {
failed++
// Image might be in use or already deleted - log but don't throw
CORE_LOGGER.debug(`Could not remove image ${image}: ${e.message}`)
}
}
Comment on lines +329 to +340
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would there be any advantages to do this in parallel with promise.all?


CORE_LOGGER.info(
`Image cleanup completed: ${cleaned} removed, ${failed} failed (may be in use)`
)
} catch (e) {
CORE_LOGGER.error(`Error during image cleanup: ${e.message}`)
}
}

private startImageCleanupTimer(): void {
if (this.imageCleanupTimer) {
return // Already running
}

if (!this.docker) {
CORE_LOGGER.debug('Docker not available, skipping image cleanup timer')
return
}

// Run initial cleanup after a short delay
setTimeout(() => {
this.cleanupOldImages().catch((e) => {
CORE_LOGGER.error(`Initial image cleanup failed: ${e.message}`)
})
}, 60000) // Wait 1 minute after start

// Set up periodic cleanup
this.imageCleanupTimer = setInterval(() => {
this.cleanupOldImages().catch((e) => {
CORE_LOGGER.error(`Periodic image cleanup failed: ${e.message}`)
})
}, this.cleanupInterval * 1000)

CORE_LOGGER.info(
`Image cleanup timer started (interval: ${this.cleanupInterval / 60} minutes)`
)
}

// eslint-disable-next-line require-await
Expand Down Expand Up @@ -817,6 +913,17 @@ export class C2DEngineDocker extends C2DEngine {
private async InternalLoop() {
// this is the internal loop of docker engine
// gets list of all running jobs and process them one by one

// Prevent concurrent execution
if (this.isInternalLoopRunning) {
CORE_LOGGER.debug(
`InternalLoop already running for engine ${this.getC2DConfig().hash}, skipping this execution`
)
return
}

this.isInternalLoopRunning = true

if (this.cronTimer) {
clearTimeout(this.cronTimer)
this.cronTimer = null
Expand All @@ -826,22 +933,26 @@ export class C2DEngineDocker extends C2DEngine {
const jobs = await this.db.getRunningJobs(this.getC2DConfig().hash)

if (jobs.length === 0) {
CORE_LOGGER.info('No C2D jobs found for engine ' + this.getC2DConfig().hash)
this.setNewTimer()
return
CORE_LOGGER.debug('No C2D jobs found for engine ' + this.getC2DConfig().hash)
} else {
CORE_LOGGER.info(`Got ${jobs.length} jobs for engine ${this.getC2DConfig().hash}`)
CORE_LOGGER.debug(JSON.stringify(jobs))
CORE_LOGGER.debug(
`Got ${jobs.length} jobs for engine ${this.getC2DConfig().hash}`
)
}
const promises: any = []
for (const job of jobs) {
promises.push(this.processJob(job))

if (jobs.length > 0) {
const promises: any = []
for (const job of jobs) {
promises.push(this.processJob(job))
}
// wait for all promises, there is no return
await Promise.all(promises)
}
// wait for all promises, there is no return
await Promise.all(promises)
} catch (e) {
CORE_LOGGER.error(`Error in C2D InternalLoop: ${e.message}`)
} finally {
// Reset the flag before setting the timer
this.isInternalLoopRunning = false
// set the cron again
this.setNewTimer()
}
Expand Down Expand Up @@ -1356,7 +1467,7 @@ export class C2DEngineDocker extends C2DEngine {
}
}
} catch (e) {
// console.error('Container volume not found! ' + e.message)
CORE_LOGGER.error('Container volume not found! ' + e.message)
}
if (job.algorithm?.meta.container && job.algorithm?.meta.container.dockerfile) {
const image = getAlgorithmImage(job.algorithm, job.jobId)
Expand Down Expand Up @@ -1547,6 +1658,10 @@ export class C2DEngineDocker extends C2DEngine {
const logText = `Successfully pulled image: ${job.containerImage}`
CORE_LOGGER.debug(logText)
appendFileSync(imageLogFile, logText + '\n')
// Track image usage
this.updateImageUsage(job.containerImage).catch((e) => {
CORE_LOGGER.debug(`Failed to track image usage: ${e.message}`)
})
resolve(res)
},
(progress: any) => {
Expand Down Expand Up @@ -1621,7 +1736,6 @@ export class C2DEngineDocker extends C2DEngine {
await new Promise<void>((resolve, reject) => {
buildStream.on('end', () => {
CORE_LOGGER.debug(`Image '${job.containerImage}' built successfully.`)

resolve()
})
buildStream.on('error', (err) => {
Expand Down
9 changes: 9 additions & 0 deletions src/components/database/C2DDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export class C2DDatabase extends AbstractDatabase {
}
this.provider = new SQLiteCompute('databases/c2dDatabase.sqlite')
await this.provider.createTable()
await this.provider.createImageTable()

return this
})() as unknown as C2DDatabase
Expand Down Expand Up @@ -83,6 +84,14 @@ export class C2DDatabase extends AbstractDatabase {
return await this.provider.getJobs(environments, fromTimestamp, consumerAddrs)
}

async updateImage(image: string): Promise<void> {
return await this.provider.updateImage(image)
}

async getOldImages(retentionDays: number): Promise<string[]> {
return await this.provider.getOldImages(retentionDays)
}

/**
*
* @param environment compute environment to check for
Expand Down
64 changes: 63 additions & 1 deletion src/components/database/sqliteCompute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ interface ComputeDatabaseProvider {
fromTimestamp?: string,
consumerAddrs?: string[]
): Promise<DBComputeJob[]>
updateImage(image: string): Promise<void>
getOldImages(retentionDays: number): Promise<string[]>
}

function getInternalStructure(job: DBComputeJob): any {
Expand Down Expand Up @@ -123,6 +125,66 @@ export class SQLiteCompute implements ComputeDatabaseProvider {
})
}

createImageTable(): Promise<void> {
const createTableSQL = `
CREATE TABLE IF NOT EXISTS docker_images (
image TEXT PRIMARY KEY,
lastUsedTimestamp INTEGER NOT NULL
);
`
return new Promise<void>((resolve, reject) => {
this.db.run(createTableSQL, (err) => {
if (err) {
DATABASE_LOGGER.error('Could not create docker_images table: ' + err.message)
reject(err)
} else {
resolve()
}
})
})
}

updateImage(image: string): Promise<void> {
const timestamp = Math.floor(Date.now() / 1000) // Unix timestamp in seconds
const insertSQL = `
INSERT OR REPLACE INTO docker_images (image, lastUsedTimestamp)
VALUES (?, ?);
`
return new Promise<void>((resolve, reject) => {
this.db.run(insertSQL, [image, timestamp], (err) => {
if (err) {
DATABASE_LOGGER.error(
`Could not update image usage for ${image}: ${err.message}`
)
reject(err)
} else {
DATABASE_LOGGER.debug(`Updated image usage timestamp for ${image}`)
resolve()
}
})
})
}

getOldImages(retentionDays: number = 7): Promise<string[]> {
const cutoffTimestamp = Math.floor(Date.now() / 1000) - retentionDays * 24 * 60 * 60
const selectSQL = `
SELECT image FROM docker_images
WHERE lastUsedTimestamp < ?
ORDER BY lastUsedTimestamp ASC;
`
return new Promise<string[]>((resolve, reject) => {
this.db.all(selectSQL, [cutoffTimestamp], (err, rows: any[] | undefined) => {
if (err) {
DATABASE_LOGGER.error(`Could not get old images: ${err.message}`)
reject(err)
} else {
const images = rows ? rows.map((row) => row.image) : []
resolve(images)
}
})
})
}

newJob(job: DBComputeJob): Promise<string> {
// TO DO C2D
const insertSQL = `
Expand Down Expand Up @@ -307,7 +369,7 @@ export class SQLiteCompute implements ComputeDatabaseProvider {
})
resolve(filtered)
} else {
DATABASE_LOGGER.info('Could not find any running C2D jobs!')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this log?

// DATABASE_LOGGER.info('Could not find any running C2D jobs!')
resolve([])
}
}
Expand Down
Loading
Loading