Skip to content

Commit 171a801

Browse files
committed
add scheduler middleware
1 parent db3f728 commit 171a801

File tree

7 files changed

+345
-203
lines changed

7 files changed

+345
-203
lines changed

app/main.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncpg
2-
from contextlib import asynccontextmanager
3-
2+
from apscheduler.eventbrokers.redis import RedisEventBroker
3+
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
44
from fastapi import FastAPI, Depends
55
from fastapi_cache import FastAPICache
66
from fastapi_cache.backends.redis import RedisBackend
@@ -9,11 +9,17 @@
99
from app.api.shakespeare import router as shakespeare_router
1010
from app.api.stuff import router as stuff_router
1111
from app.config import settings as global_settings
12+
from app.database import engine
1213
from app.utils.logging import AppLogger
1314
from app.api.user import router as user_router
1415
from app.api.health import router as health_router
1516
from app.redis import get_redis, get_cache
1617
from app.services.auth import AuthBearer
18+
from app.services.scheduler import SchedulerMiddleware
19+
20+
from contextlib import asynccontextmanager
21+
22+
from apscheduler import AsyncScheduler
1723

1824
logger = AppLogger().get_logger()
1925

@@ -60,3 +66,16 @@ async def lifespan(_app: FastAPI):
6066
tags=["Health, Bearer"],
6167
dependencies=[Depends(AuthBearer())],
6268
)
69+
70+
_scheduler_data_store = SQLAlchemyDataStore(engine)
71+
_scheduler_event_broker = RedisEventBroker(
72+
client_or_url=global_settings.redis_url.unicode_string()
73+
)
74+
_scheduler_himself = AsyncScheduler(_scheduler_data_store, _scheduler_event_broker)
75+
76+
app.add_middleware(SchedulerMiddleware, scheduler=_scheduler_himself)
77+
78+
79+
# TODO: every not GET meth should reset cache
80+
# TODO: every scheduler task which needs to act on database hsould have access to connection pool via request
81+
# TODO: https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru

app/models/stuff.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ def compile_sql_or_scalar(func):
1717
async def wrapper(cls, db_session, name, compile_sql=False, *args, **kwargs):
1818
stmt = await func(cls, db_session, name, *args, **kwargs)
1919
if compile_sql:
20-
return stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True})
20+
return stmt.compile(
21+
dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}
22+
)
2123
result = await db_session.execute(stmt)
2224
return result.scalars().first()
2325

app/models/user.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ def password(self):
2727
@password.setter
2828
def password(self, password: SecretStr):
2929
_password_string = password.get_secret_value().encode("utf-8")
30-
self._password = bcrypt.hashpw(
31-
_password_string, bcrypt.gensalt()
32-
)
30+
self._password = bcrypt.hashpw(_password_string, bcrypt.gensalt())
3331

3432
def check_password(self, password: SecretStr):
35-
return bcrypt.checkpw(password.get_secret_value().encode("utf-8"), self._password)
33+
return bcrypt.checkpw(
34+
password.get_secret_value().encode("utf-8"), self._password
35+
)
3636

3737
@classmethod
3838
async def find(cls, database_session: AsyncSession, where_conditions: list[Any]):

app/services/scheduler.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from datetime import datetime
2+
3+
from starlette.types import ASGIApp, Receive, Scope, Send
4+
5+
from apscheduler import AsyncScheduler
6+
from apscheduler.triggers.interval import IntervalTrigger
7+
8+
from app.utils.logging import AppLogger
9+
10+
logger = AppLogger().get_logger()
11+
12+
13+
def tick():
14+
15+
logger.info(f">>>> Be or not to be...{datetime.now()}")
16+
17+
18+
class SchedulerMiddleware:
19+
# TODO: need access to request to be able to get db conn pool
20+
21+
def __init__(
22+
self,
23+
app: ASGIApp,
24+
scheduler: AsyncScheduler,
25+
) -> None:
26+
self.app = app
27+
self.scheduler = scheduler
28+
29+
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
30+
if scope["type"] == "lifespan":
31+
async with self.scheduler:
32+
await self.scheduler.add_schedule(
33+
tick, IntervalTrigger(seconds=25), id="tick"
34+
)
35+
await self.scheduler.start_in_background()
36+
await self.app(scope, receive, send)
37+
else:
38+
await self.app(scope, receive, send)

app/utils/logging.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@ def get_logger(self):
2020
class RichConsoleHandler(RichHandler):
2121
def __init__(self, width=200, style=None, **kwargs):
2222
super().__init__(
23-
console=Console(color_system="256", width=width, style=style, stderr=True), **kwargs
23+
console=Console(color_system="256", width=width, style=style, stderr=True),
24+
**kwargs
2425
)

0 commit comments

Comments
 (0)