Skip to content

Commit a96f775

Browse files
committed
Use scheduler to explicitly queue pipeline execution jobs
Signed-off-by: Keshav Priyadarshi <git@keshav.space>
1 parent ae1a260 commit a96f775

File tree

3 files changed

+35
-17
lines changed

3 files changed

+35
-17
lines changed

vulnerabilities/management/commands/run_scheduler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
def init_pipeline_scheduled():
2020
"""Initialize schedule jobs for active PipelineSchedule."""
21-
active_pipeline_qs = models.PipelineSchedule.objects.filter(is_active=True)
21+
active_pipeline_qs = models.PipelineSchedule.objects.filter(is_active=True).order_by(
22+
"created_date"
23+
)
2224
for pipeline_schedule in active_pipeline_qs:
2325
if scheduled_job_exists(pipeline_schedule.schedule_work_id):
2426
continue

vulnerabilities/schedules.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,37 @@
77
# See https://aboutcode.org for more information about nexB OSS projects.
88
#
99

10+
import datetime
1011
import logging
1112

1213
import django_rq
1314
from redis.exceptions import ConnectionError
1415

15-
from vulnerabilities.tasks import execute_pipeline
16+
from vulnerabilities.tasks import enqueue_pipeline
1617
from vulnerablecode.settings import VULNERABLECODE_PIPELINE_TIMEOUT
1718

1819
log = logging.getLogger(__name__)
1920
scheduler = django_rq.get_scheduler()
2021

2122

22-
def schedule_execution(pipeline_schedule):
23+
def schedule_execution(pipeline_schedule, execute_now=False):
2324
"""
2425
Takes a `PackageSchedule` object as input and schedule a
2526
recurring job using `rq_scheduler` to execute the pipeline.
2627
"""
27-
first_execution = pipeline_schedule.next_run_date
28+
first_execution = datetime.datetime.now(tz=datetime.timezone.utc)
29+
if not execute_now:
30+
first_execution = pipeline_schedule.next_run_date
31+
2832
interval_in_seconds = pipeline_schedule.run_interval * 24 * 60 * 60
2933

3034
job = scheduler.schedule(
3135
scheduled_time=first_execution,
32-
func=execute_pipeline,
36+
func=enqueue_pipeline,
3337
args=[pipeline_schedule.pipeline_id],
3438
interval=interval_in_seconds,
35-
result_ttl=interval_in_seconds, # Remove job results after next run
3639
timeout=VULNERABLECODE_PIPELINE_TIMEOUT,
37-
repeat=None, # None for repeat forever
40+
repeat=None,
3841
)
3942
return job._id
4043

@@ -89,6 +92,6 @@ def update_pipeline_schedule():
8992
from vulnerabilities.models import PipelineSchedule
9093

9194
pipeline_ids = [*IMPORTERS_REGISTRY.keys(), *IMPROVERS_REGISTRY.keys()]
92-
# pipeline_ids = ["nvd_importer", "vulnerabilities.importers.curl.CurlImporter"]
95+
9396
PipelineSchedule.objects.exclude(pipeline_id__in=pipeline_ids).delete()
9497
[PipelineSchedule.objects.get_or_create(pipeline_id=id) for id in pipeline_ids]

vulnerabilities/tasks.py

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,26 @@
1212
from io import StringIO
1313
from traceback import format_exc as traceback_format_exc
1414

15-
from rq import get_current_job
15+
import django_rq
1616

1717
from vulnerabilities import models
1818
from vulnerabilities.importer import Importer
1919
from vulnerabilities.improver import Improver
20+
from vulnerablecode.settings import VULNERABLECODE_PIPELINE_TIMEOUT
2021

2122
logger = logging.getLogger(__name__)
2223

24+
queue = django_rq.get_queue("default")
2325

24-
def execute_pipeline(pipeline_id):
26+
27+
def execute_pipeline(pipeline_id, run_id):
2528
from vulnerabilities.pipelines import VulnerableCodePipeline
2629

2730
logger.info(f"Enter `execute_pipeline` {pipeline_id}")
2831

29-
pipeline_schedule = models.PipelineSchedule.objects.get(pipeline_id=pipeline_id)
30-
job = get_current_job()
31-
32-
run = models.PipelineRun.objects.create(
33-
pipeline=pipeline_schedule,
34-
run_id=job.id,
32+
run = models.PipelineRun.objects.get(
33+
run_id=run_id,
3534
)
36-
3735
run.set_vulnerablecode_version_and_commit()
3836
run.set_run_started()
3937

@@ -104,3 +102,18 @@ def set_run_failure(job, connection, type, value, traceback):
104102
return
105103

106104
run.set_run_ended(exitcode=1, output=f"value={value} trace={traceback}")
105+
106+
107+
def enqueue_pipeline(pipeline_id):
108+
pipeline_schedule = models.PipelineSchedule.objects.get(pipeline_id=pipeline_id)
109+
run = models.PipelineRun.objects.create(
110+
pipeline=pipeline_schedule,
111+
)
112+
job = queue.enqueue(
113+
execute_pipeline,
114+
pipeline_id,
115+
run.run_id,
116+
job_id=str(run.run_id),
117+
on_failure=set_run_failure,
118+
job_timeout=VULNERABLECODE_PIPELINE_TIMEOUT,
119+
)

0 commit comments

Comments
 (0)