Skip to content

Commit 90cdfee

Browse files
committed
Added --fail-on-err scheduler arg.
1 parent 27356e3 commit 90cdfee

File tree

3 files changed

+48
-4
lines changed

3 files changed

+48
-4
lines changed

taskiq/cli/scheduler/args.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class SchedulerArgs:
2020
skip_first_run: bool = False
2121
update_interval: int | None = None
2222
loop_interval: int | None = None
23+
fail_on_err: bool = False
2324

2425
@classmethod
2526
def from_cli(cls, args: Sequence[str] | None = None) -> "SchedulerArgs":
@@ -111,6 +112,12 @@ def from_cli(cls, args: Sequence[str] | None = None) -> "SchedulerArgs":
111112
"If not specified, scheduler will run once a second."
112113
),
113114
)
115+
parser.add_argument(
116+
"--fail-on-err",
117+
action="store_false",
118+
dest="fail_on_err",
119+
help="Fail scheduler on any error during schedules collection",
120+
)
114121

115122
namespace = parser.parse_args(args)
116123
# If there are any patterns specified, remove default.

taskiq/cli/scheduler/run.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ def to_tz_aware(time: datetime) -> datetime:
3636
return time
3737

3838

39-
async def get_schedules(source: ScheduleSource) -> list[ScheduledTask]:
39+
async def get_schedules(
40+
source: ScheduleSource,
41+
fail_on_err: bool,
42+
) -> list[ScheduledTask]:
4043
"""
4144
Get schedules from source.
4245
@@ -48,6 +51,13 @@ async def get_schedules(source: ScheduleSource) -> list[ScheduledTask]:
4851
try:
4952
return await source.get_schedules()
5053
except Exception as exc:
54+
if fail_on_err:
55+
logger.warning(
56+
"Cannot update schedules with source: %s",
57+
source,
58+
exc_info=True,
59+
)
60+
raise exc
5161
logger.warning(
5262
"Cannot update schedules with source: %s",
5363
source,
@@ -58,6 +68,7 @@ async def get_schedules(source: ScheduleSource) -> list[ScheduledTask]:
5868

5969
async def get_all_schedules(
6070
scheduler: TaskiqScheduler,
71+
fail_on_err: bool = False,
6172
) -> list[tuple[ScheduleSource, list[ScheduledTask]]]:
6273
"""
6374
Task to update all schedules.
@@ -71,7 +82,7 @@ async def get_all_schedules(
7182
"""
7283
logger.debug("Started schedule update.")
7384
schedules: list[list[ScheduledTask]] = await asyncio.gather(
74-
*[get_schedules(source) for source in scheduler.sources],
85+
*[get_schedules(source, fail_on_err) for source in scheduler.sources],
7586
)
7687
return list(zip(scheduler.sources, schedules, strict=True))
7788

@@ -186,8 +197,10 @@ def __init__(
186197
scheduler: TaskiqScheduler,
187198
*,
188199
event_loop: asyncio.AbstractEventLoop | None = None,
200+
fail_on_err: bool = False,
189201
) -> None:
190202
self.scheduler = scheduler
203+
self.fail_on_err = fail_on_err
191204
self._event_loop = event_loop or asyncio.get_event_loop()
192205

193206
# Variables for control the last run of schedules.
@@ -310,7 +323,7 @@ async def run(
310323

311324
running_schedules: dict[ScheduleId, asyncio.Task[Any]] = {}
312325

313-
self.scheduled_tasks = await get_all_schedules(self.scheduler)
326+
self.scheduled_tasks = await get_all_schedules(self.scheduler, self.fail_on_err)
314327
self.scheduled_tasks_updated_at = datetime.now(tz=timezone.utc)
315328

316329
if skip_first_run:
@@ -405,7 +418,7 @@ async def run_scheduler(args: SchedulerArgs) -> None:
405418
await scheduler.startup()
406419
logger.info("Startup completed.")
407420

408-
scheduler_loop = SchedulerLoop(scheduler)
421+
scheduler_loop = SchedulerLoop(scheduler, fail_on_err=args.fail_on_err)
409422
try:
410423
await scheduler_loop.run(
411424
update_interval=update_interval,

tests/cli/scheduler/test_updater.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from datetime import datetime
22

3+
import pytest
4+
35
from taskiq import InMemoryBroker, ScheduleSource
46
from taskiq.cli.scheduler.run import get_all_schedules
57
from taskiq.scheduler.scheduled_task import ScheduledTask
@@ -80,3 +82,25 @@ async def test_get_schedules_error() -> None:
8082
(source1, source1.schedules),
8183
(source2, []),
8284
]
85+
86+
87+
async def test_get_schedules_fail_on_err() -> None:
88+
"""Tests that if source returned an error, empty list will be returned."""
89+
source1 = DummySource(
90+
[
91+
ScheduledTask(
92+
task_name="a",
93+
labels={},
94+
args=[],
95+
kwargs={},
96+
time=datetime.now(),
97+
),
98+
],
99+
)
100+
source2 = DummySource(Exception("test"))
101+
102+
with pytest.raises(Exception, match="test"):
103+
await get_all_schedules(
104+
TaskiqScheduler(InMemoryBroker(), [source1, source2]),
105+
fail_on_err=True,
106+
)

0 commit comments

Comments
 (0)