From 28686062762628da156f527ad14cfbc48fe817a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9o=20Sabatti=C3=A9?= Date: Wed, 4 Jun 2025 19:00:36 +0200 Subject: [PATCH] Added: auth_for_result property to make result private/only accessible by admin --- app/crud.py | 12 +++++++- app/main.py | 5 ++-- app/models.py | 1 + app/schemas.py | 2 ++ app/tests/test_api.py | 66 ++++++++++++++++++++++++++++++++++++++----- 5 files changed, 76 insertions(+), 10 deletions(-) diff --git a/app/crud.py b/app/crud.py index 9aaf44a..09d182d 100644 --- a/app/crud.py +++ b/app/crud.py @@ -330,6 +330,7 @@ def update_election( "date_end", "hide_results", "force_close", + "auth_for_result", ]: if getattr(election, key) is None: continue @@ -513,11 +514,20 @@ def get_ballot(db: Session, token: str) -> schemas.BallotGet: return schemas.BallotGet(token=token, votes=votes_get, election=election) -def get_results(db: Session, election_ref: str) -> schemas.ResultsGet: +def get_results(db: Session, election_ref: str, token: t.Optional[str]) -> schemas.ResultsGet: db_election = get_election(db, election_ref) if db_election is None: raise errors.NotFoundError("elections") + if db_election.auth_for_result: + if token is None: + raise errors.UnauthorizedError("Election require auth for result, you need to set Authentication header") + + payload = jws_verify(token) + + if payload["election"] != election_ref: + raise errors.UnauthorizedError("Wrong authentication for this election") + if ( db_election.hide_results and (db_election.date_end is not None and db_election.date_end > datetime.now()) diff --git a/app/main.py b/app/main.py index 61b62df..790dddb 100644 --- a/app/main.py +++ b/app/main.py @@ -142,5 +142,6 @@ def get_ballot(authorization: str = Header(), db: Session = Depends(get_db)): @app.get("/results/{election_ref}", response_model=schemas.ResultsGet) -def get_results(election_ref: str, db: Session = Depends(get_db)): - return crud.get_results(db=db, election_ref=election_ref) +def get_results(election_ref: str, authorization: t.Optional[str] = Header(default=None), db: Session = Depends(get_db)): + token = authorization.split("Bearer ")[1] if authorization else None + return crud.get_results(db=db, token=token, election_ref=election_ref) diff --git a/app/models.py b/app/models.py index 48c5f80..2525050 100644 --- a/app/models.py +++ b/app/models.py @@ -19,6 +19,7 @@ class Election(Base): hide_results = Column(Boolean, default=False) restricted = Column(Boolean, default=False) force_close = Column(Boolean, default=False) + auth_for_result = Column(Boolean, default=False) grades = relationship("Grade", back_populates="election") candidates = relationship("Candidate", back_populates="election") diff --git a/app/schemas.py b/app/schemas.py index 78c1853..8fdab59 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -114,6 +114,7 @@ class ElectionBase(BaseModel): date_end: datetime | int | str | None = Field(default_factory=_in_a_long_time) hide_results: bool = True restricted: bool = False + auth_for_result: bool = False @field_validator("date_end", "date_start", mode="before") @classmethod @@ -221,6 +222,7 @@ class ElectionUpdate(BaseModel): num_voters: int | None = None force_close: bool | None = None candidates: list[CandidateUpdate] | None = None + auth_for_result: bool | None = None @field_validator("date_end", "date_start", mode="before") @classmethod diff --git a/app/tests/test_api.py b/app/tests/test_api.py index 4ad6d17..69c4835 100644 --- a/app/tests/test_api.py +++ b/app/tests/test_api.py @@ -59,6 +59,7 @@ class RandomElection(t.TypedDict): hide_results: bool num_voters: int date_end: t.Optional[str] + auth_for_result: bool def _random_election(num_candidates: int, num_grades: int) -> RandomElection: @@ -78,6 +79,7 @@ def _random_election(num_candidates: int, num_grades: int) -> RandomElection: "hide_results": False, "num_voters": 0, "date_end": None, + "auth_for_result": False, } @@ -537,6 +539,47 @@ def test_get_results_with_hide_results(): assert response.status_code == 200, response.text +def test_get_results_with_auth_for_result(): + # Create a random election + body = _random_election(10, 5) + body["auth_for_result"] = True + body["date_end"] = (datetime.now() + timedelta(days=1)).isoformat() + response = client.post("/elections", json=body) + assert response.status_code == 200, response.content + data = response.json() + election_ref = data["ref"] + admin_token = data["admin"] + + # We create votes using the ID + votes = _generate_votes_from_response("id", data) + response = client.post( + f"/ballots", json={"votes": votes, "election_ref": election_ref} + ) + assert response.status_code == 200, data + + response = client.put( + f"/elections", json=data, headers={"Authorization": f"Bearer {admin_token}"} + ) + + assert response.status_code == 200, response.text + + # But, we can't get the results + response = client.get(f"/results/{election_ref}") + assert response.status_code == 401, data + + # Now, we can access to the results + response = client.get(f"/results/{election_ref}", headers={"Authorization": f"Bearer {admin_token}"}) + assert response.status_code == 200, response.text + + # Ensure other admin tokens can't access the results + response = client.post("/elections", json=body) + assert response.status_code == 200, response.content + data2 = response.json() + admin_token2 = data2["admin"] + + response = client.get(f"/results/{election_ref}", headers={"Authorization": f"Bearer {admin_token2}"}) + assert response.status_code == 401, data + def test_update_election(): # Create a random election body = _random_election(10, 5) @@ -545,7 +588,7 @@ def test_update_election(): data = response.json() new_name = f'{data["name"]}_MODIFIED' data["name"] = new_name - ballot_token = data["admin"] + admin_token = data["admin"] # Check we can not update without the ballot_token response = client.put("/elections", json=data) @@ -553,13 +596,22 @@ def test_update_election(): # Check that the request fails with a wrong ballot_token response = client.put( - f"/elections", json=data, headers={"Authorization": f"Bearer {ballot_token}WRONG"} + f"/elections", json=data, headers={"Authorization": f"Bearer {admin_token}WRONG"} ) - assert response.status_code == 401, response.text + assert response.status_code == 401, response.text + + # Check that the request fails with a admnin token of other election + response2 = client.post("/elections", json=body) + data2 = response2.json() + admin_token2 = data2["admin"] + response = client.put( + f"/elections", json=data, headers={"Authorization": f"Bearer {admin_token2}"} + ) + assert response.status_code == 403, response.text # But it works with the right ballot_token response = client.put( - f"/elections", json=data, headers={"Authorization": f"Bearer {ballot_token}"} + f"/elections", json=data, headers={"Authorization": f"Bearer {admin_token}"} ) assert response.status_code == 200, response.text response2 = client.get(f"/elections/{data['ref']}") @@ -575,7 +627,7 @@ def test_update_election(): data["grades"][0]["description"] += "MODIFIED" data["grades"][0]["value"] += 10 response = client.put( - f"/elections", json=data, headers={"Authorization": f"Bearer {ballot_token}"} + f"/elections", json=data, headers={"Authorization": f"Bearer {admin_token}"} ) assert response.status_code == 200, response.text data = response.json() @@ -586,14 +638,14 @@ def test_update_election(): data2 = copy.deepcopy(data) del data2["candidates"][-1] response = client.put( - f"/elections", json=data2, headers={"Authorization": f"Bearer {ballot_token}"} + f"/elections", json=data2, headers={"Authorization": f"Bearer {admin_token}"} ) assert response.status_code == 403, response.text data2 = copy.deepcopy(data) data2["grades"][0]["id"] += 100 response = client.put( - f"/elections", json=data2, headers={"Authorization": f"Bearer {ballot_token}"} + f"/elections", json=data2, headers={"Authorization": f"Bearer {admin_token}"} ) assert response.status_code == 403, response.text