From e83f4369100d0c1f036587d08888bf0a861cf7de Mon Sep 17 00:00:00 2001 From: Alex TYRODE Date: Tue, 6 May 2025 23:36:44 +0000 Subject: [PATCH] feat: enhance user retrieval and username uniqueness checks in CoderAPI - Updated get_users method to support optional query, limit, and offset parameters for improved user filtering. - Refactored get_user_by_email to utilize the updated get_users method for better efficiency. - Added check_username_exists method to verify if a username is already taken. - Improved username creation logic to ensure uniqueness by appending random numbers if necessary, with a maximum of 1000 attempts. - Enhanced documentation for clarity on method functionalities and parameters. --- src/backend/coder.py | 81 +++++++++++++++++++++++++++++++------------- 1 file changed, 57 insertions(+), 24 deletions(-) diff --git a/src/backend/coder.py b/src/backend/coder.py index 6ae5a7b..5bd6895 100644 --- a/src/backend/coder.py +++ b/src/backend/coder.py @@ -1,4 +1,5 @@ import requests +import random from config import CODER_API_KEY, CODER_URL, CODER_TEMPLATE_ID, CODER_DEFAULT_ORGANIZATION, CODER_WORKSPACE_NAME class CoderAPI: @@ -32,19 +33,37 @@ def _get_all_templates(self): response.raise_for_status() return response.json() - - def get_users(self): + + def get_users(self, query=None, limit=None, offset=None) -> list[dict] | list: """ - Get all users from the Coder API + Get users from the Coder API + + Args: + query (str, optional): Search query to filter users (q parameter) + limit (int, optional): Page limit + offset (int, optional): Page offset + + Returns: + list: List of user objects """ endpoint = f"{self.coder_url}/api/v2/users" - response = requests.get(endpoint, headers=self.headers) + params = {} + + if query: + params['q'] = query + if limit: + params['limit'] = limit + if offset: + params['offset'] = offset + + response = requests.get(endpoint, headers=self.headers, params=params) response.raise_for_status() # Raise exception for 4XX/5XX responses + return response.json()['users'] - def get_user_by_email(self, email): + def get_user_by_email(self, email) -> dict | None: """ - Get a user by email + Get a user by email using the query parameter Args: email (str): email to search for @@ -52,11 +71,21 @@ def get_user_by_email(self, email): Returns: dict: User data if found, None otherwise """ - users = self.get_users() - for user in users: - if user['email'] == email: - return user - return None + users = self.get_users(query=email, limit=1) + return users[0] if users else None + + def check_username_exists(self, username): + """ + Check if a username already exists + + Args: + username (str): Username to check + + Returns: + bool: True if username exists, False otherwise + """ + users = self.get_users(query=username) + return bool(users) def create_user(self, username, email, name): """ @@ -122,19 +151,23 @@ def ensure_user_exists(self, user_info): if not base_username: base_username = 'user' - # Ensure username is unique + # Ensure username is unique using random numbers between 1-100000 username = base_username - for i in range(10): - # Check if username exists - users = self.get_users() - username_exists = any(user['username'] == username for user in users) - if not username_exists: - break - # If username exists, append a number - username = f"{base_username}{i}" - else: - raise Exception("Failed to create unique username") - + + # First check if the base username itself is available + if self.check_username_exists(username): + # Try up to 10 times to find a unique username with random numbers + max_attempts = 1000 + for _ in range(max_attempts): + random_suffix = random.randint(1, 100000) + username = f"{base_username}{random_suffix}" + + if not self.check_username_exists(username): + break + else: + # If we exhausted all attempts, raise an exception + raise Exception(f"uhh... failed to create unique username for {email} after {max_attempts} attempts") + new_user = self.create_user(username, email, name) return new_user, True @@ -310,4 +343,4 @@ def set_workspace_dormancy(self, workspace_id, dormant: bool): headers['Content-Type'] = 'application/json' response = requests.put(endpoint, headers=headers, json=data) response.raise_for_status() - return response.json() + return response.json() \ No newline at end of file