Skip to content

Commit bd9d7ac

Browse files
authored
Merge pull request #175 from grillazz/174-implement-scheduler
174 implement scheduler
2 parents db3f728 + 7634531 commit bd9d7ac

File tree

8 files changed

+349
-203
lines changed

8 files changed

+349
-203
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ I've included a few of my favorites to kick things off!
176176
- **[MAR 15 2024]** add polars and calamine to project :heart_eyes_cat:
177177
- **[JUN 8 2024]** implement asyncpg connection pool :fast_forward:
178178
- **[AUG 17 2024]** granian use case implemented with docker compose and rich logger :fast_forward:
179+
- **[OCT 16 2024]** apscheduler added to project :fast_forward:
179180

180181
<p align="right">(<a href="#readme-top">back to top</a>)</p>
181182

app/main.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncpg
2-
from contextlib import asynccontextmanager
3-
2+
from apscheduler.eventbrokers.redis import RedisEventBroker
3+
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
44
from fastapi import FastAPI, Depends
55
from fastapi_cache import FastAPICache
66
from fastapi_cache.backends.redis import RedisBackend
@@ -9,11 +9,17 @@
99
from app.api.shakespeare import router as shakespeare_router
1010
from app.api.stuff import router as stuff_router
1111
from app.config import settings as global_settings
12+
from app.database import engine
1213
from app.utils.logging import AppLogger
1314
from app.api.user import router as user_router
1415
from app.api.health import router as health_router
1516
from app.redis import get_redis, get_cache
1617
from app.services.auth import AuthBearer
18+
from app.services.scheduler import SchedulerMiddleware
19+
20+
from contextlib import asynccontextmanager
21+
22+
from apscheduler import AsyncScheduler
1723

1824
logger = AppLogger().get_logger()
1925

@@ -60,3 +66,16 @@ async def lifespan(_app: FastAPI):
6066
tags=["Health, Bearer"],
6167
dependencies=[Depends(AuthBearer())],
6268
)
69+
70+
_scheduler_data_store = SQLAlchemyDataStore(engine)
71+
_scheduler_event_broker = RedisEventBroker(
72+
client_or_url=global_settings.redis_url.unicode_string()
73+
)
74+
_scheduler_himself = AsyncScheduler(_scheduler_data_store, _scheduler_event_broker)
75+
76+
app.add_middleware(SchedulerMiddleware, scheduler=_scheduler_himself)
77+
78+
79+
# TODO: every not GET meth should reset cache
80+
# TODO: every scheduler task which needs to act on database should have access to connection pool via request - maybe ?
81+
# TODO: https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru

app/models/stuff.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ def compile_sql_or_scalar(func):
1717
async def wrapper(cls, db_session, name, compile_sql=False, *args, **kwargs):
1818
stmt = await func(cls, db_session, name, *args, **kwargs)
1919
if compile_sql:
20-
return stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True})
20+
return stmt.compile(
21+
dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}
22+
)
2123
result = await db_session.execute(stmt)
2224
return result.scalars().first()
2325

app/models/user.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ def password(self):
2727
@password.setter
2828
def password(self, password: SecretStr):
2929
_password_string = password.get_secret_value().encode("utf-8")
30-
self._password = bcrypt.hashpw(
31-
_password_string, bcrypt.gensalt()
32-
)
30+
self._password = bcrypt.hashpw(_password_string, bcrypt.gensalt())
3331

3432
def check_password(self, password: SecretStr):
35-
return bcrypt.checkpw(password.get_secret_value().encode("utf-8"), self._password)
33+
return bcrypt.checkpw(
34+
password.get_secret_value().encode("utf-8"), self._password
35+
)
3636

3737
@classmethod
3838
async def find(cls, database_session: AsyncSession, where_conditions: list[Any]):

app/services/scheduler.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from datetime import datetime
2+
3+
from sqlalchemy import text
4+
from starlette.types import ASGIApp, Receive, Scope, Send
5+
from apscheduler import AsyncScheduler
6+
from apscheduler.triggers.interval import IntervalTrigger
7+
8+
from app.database import AsyncSessionFactory
9+
from app.utils.logging import AppLogger
10+
11+
logger = AppLogger().get_logger()
12+
13+
14+
async def tick():
15+
async with AsyncSessionFactory() as session:
16+
stmt = text("select 1;")
17+
logger.info(f">>>> Be or not to be...{datetime.now()}")
18+
result = await session.execute(stmt)
19+
logger.info(f">>>> Result: {result.scalar()}")
20+
return True
21+
22+
23+
class SchedulerMiddleware:
24+
def __init__(
25+
self,
26+
app: ASGIApp,
27+
scheduler: AsyncScheduler,
28+
) -> None:
29+
self.app = app
30+
self.scheduler = scheduler
31+
32+
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
33+
if scope["type"] == "lifespan":
34+
async with self.scheduler:
35+
await self.scheduler.add_schedule(
36+
tick, IntervalTrigger(seconds=25), id="tick-sql-25"
37+
)
38+
await self.scheduler.start_in_background()
39+
await self.app(scope, receive, send)
40+
else:
41+
await self.app(scope, receive, send)

app/utils/logging.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@ def get_logger(self):
2020
class RichConsoleHandler(RichHandler):
2121
def __init__(self, width=200, style=None, **kwargs):
2222
super().__init__(
23-
console=Console(color_system="256", width=width, style=style, stderr=True), **kwargs
23+
console=Console(color_system="256", width=width, style=style, stderr=True),
24+
**kwargs
2425
)

0 commit comments

Comments
 (0)