Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion app/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ def update_election(
"date_end",
"hide_results",
"force_close",
"auth_for_result",
]:
if getattr(election, key) is None:
continue
Expand Down Expand Up @@ -513,11 +514,20 @@ def get_ballot(db: Session, token: str) -> schemas.BallotGet:
return schemas.BallotGet(token=token, votes=votes_get, election=election)


def get_results(db: Session, election_ref: str) -> schemas.ResultsGet:
def get_results(db: Session, election_ref: str, token: t.Optional[str]) -> schemas.ResultsGet:
db_election = get_election(db, election_ref)
if db_election is None:
raise errors.NotFoundError("elections")

if db_election.auth_for_result:
if token is None:
raise errors.UnauthorizedError("Election require auth for result, you need to set Authentication header")

payload = jws_verify(token)

if payload["election"] != election_ref:
raise errors.UnauthorizedError("Wrong authentication for this election")

if (
db_election.hide_results
and (db_election.date_end is not None and db_election.date_end > datetime.now())
Expand Down
5 changes: 3 additions & 2 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,6 @@ def get_ballot(authorization: str = Header(), db: Session = Depends(get_db)):


@app.get("/results/{election_ref}", response_model=schemas.ResultsGet)
def get_results(election_ref: str, db: Session = Depends(get_db)):
return crud.get_results(db=db, election_ref=election_ref)
def get_results(election_ref: str, authorization: t.Optional[str] = Header(default=None), db: Session = Depends(get_db)):
token = authorization.split("Bearer ")[1] if authorization else None
return crud.get_results(db=db, token=token, election_ref=election_ref)
1 change: 1 addition & 0 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Election(Base):
hide_results = Column(Boolean, default=False)
restricted = Column(Boolean, default=False)
force_close = Column(Boolean, default=False)
auth_for_result = Column(Boolean, default=False)

grades = relationship("Grade", back_populates="election")
candidates = relationship("Candidate", back_populates="election")
Expand Down
2 changes: 2 additions & 0 deletions app/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class ElectionBase(BaseModel):
date_end: datetime | int | str | None = Field(default_factory=_in_a_long_time)
hide_results: bool = True
restricted: bool = False
auth_for_result: bool = False

@field_validator("date_end", "date_start", mode="before")
@classmethod
Expand Down Expand Up @@ -221,6 +222,7 @@ class ElectionUpdate(BaseModel):
num_voters: int | None = None
force_close: bool | None = None
candidates: list[CandidateUpdate] | None = None
auth_for_result: bool | None = None

@field_validator("date_end", "date_start", mode="before")
@classmethod
Expand Down
66 changes: 59 additions & 7 deletions app/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class RandomElection(t.TypedDict):
hide_results: bool
num_voters: int
date_end: t.Optional[str]
auth_for_result: bool


def _random_election(num_candidates: int, num_grades: int) -> RandomElection:
Expand All @@ -78,6 +79,7 @@ def _random_election(num_candidates: int, num_grades: int) -> RandomElection:
"hide_results": False,
"num_voters": 0,
"date_end": None,
"auth_for_result": False,
}


Expand Down Expand Up @@ -537,6 +539,47 @@ def test_get_results_with_hide_results():
assert response.status_code == 200, response.text


def test_get_results_with_auth_for_result():
# Create a random election
body = _random_election(10, 5)
body["auth_for_result"] = True
body["date_end"] = (datetime.now() + timedelta(days=1)).isoformat()
response = client.post("/elections", json=body)
assert response.status_code == 200, response.content
data = response.json()
election_ref = data["ref"]
admin_token = data["admin"]

# We create votes using the ID
votes = _generate_votes_from_response("id", data)
response = client.post(
f"/ballots", json={"votes": votes, "election_ref": election_ref}
)
assert response.status_code == 200, data

response = client.put(
f"/elections", json=data, headers={"Authorization": f"Bearer {admin_token}"}
)

assert response.status_code == 200, response.text

# But, we can't get the results
response = client.get(f"/results/{election_ref}")
assert response.status_code == 401, data

# Now, we can access to the results
response = client.get(f"/results/{election_ref}", headers={"Authorization": f"Bearer {admin_token}"})
assert response.status_code == 200, response.text

# Ensure other admin tokens can't access the results
response = client.post("/elections", json=body)
assert response.status_code == 200, response.content
data2 = response.json()
admin_token2 = data2["admin"]

response = client.get(f"/results/{election_ref}", headers={"Authorization": f"Bearer {admin_token2}"})
assert response.status_code == 401, data

def test_update_election():
# Create a random election
body = _random_election(10, 5)
Expand All @@ -545,21 +588,30 @@ def test_update_election():
data = response.json()
new_name = f'{data["name"]}_MODIFIED'
data["name"] = new_name
ballot_token = data["admin"]
admin_token = data["admin"]

# Check we can not update without the ballot_token
response = client.put("/elections", json=data)
assert response.status_code == 422, response.content

# Check that the request fails with a wrong ballot_token
response = client.put(
f"/elections", json=data, headers={"Authorization": f"Bearer {ballot_token}WRONG"}
f"/elections", json=data, headers={"Authorization": f"Bearer {admin_token}WRONG"}
)
assert response.status_code == 401, response.text
assert response.status_code == 401, response.text

# Check that the request fails with a admnin token of other election
response2 = client.post("/elections", json=body)
data2 = response2.json()
admin_token2 = data2["admin"]
response = client.put(
f"/elections", json=data, headers={"Authorization": f"Bearer {admin_token2}"}
)
assert response.status_code == 403, response.text

# But it works with the right ballot_token
response = client.put(
f"/elections", json=data, headers={"Authorization": f"Bearer {ballot_token}"}
f"/elections", json=data, headers={"Authorization": f"Bearer {admin_token}"}
)
assert response.status_code == 200, response.text
response2 = client.get(f"/elections/{data['ref']}")
Expand All @@ -575,7 +627,7 @@ def test_update_election():
data["grades"][0]["description"] += "MODIFIED"
data["grades"][0]["value"] += 10
response = client.put(
f"/elections", json=data, headers={"Authorization": f"Bearer {ballot_token}"}
f"/elections", json=data, headers={"Authorization": f"Bearer {admin_token}"}
)
assert response.status_code == 200, response.text
data = response.json()
Expand All @@ -586,14 +638,14 @@ def test_update_election():
data2 = copy.deepcopy(data)
del data2["candidates"][-1]
response = client.put(
f"/elections", json=data2, headers={"Authorization": f"Bearer {ballot_token}"}
f"/elections", json=data2, headers={"Authorization": f"Bearer {admin_token}"}
)
assert response.status_code == 403, response.text

data2 = copy.deepcopy(data)
data2["grades"][0]["id"] += 100
response = client.put(
f"/elections", json=data2, headers={"Authorization": f"Bearer {ballot_token}"}
f"/elections", json=data2, headers={"Authorization": f"Bearer {admin_token}"}
)
assert response.status_code == 403, response.text

Expand Down