Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 57 additions & 24 deletions src/backend/coder.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import requests
import random
from config import CODER_API_KEY, CODER_URL, CODER_TEMPLATE_ID, CODER_DEFAULT_ORGANIZATION, CODER_WORKSPACE_NAME

class CoderAPI:
Expand Down Expand Up @@ -32,31 +33,59 @@ def _get_all_templates(self):
response.raise_for_status()
return response.json()


def get_users(self):
def get_users(self, query=None, limit=None, offset=None) -> list[dict] | list:
"""
Get all users from the Coder API
Get users from the Coder API

Args:
query (str, optional): Search query to filter users (q parameter)
limit (int, optional): Page limit
offset (int, optional): Page offset

Returns:
list: List of user objects
"""
endpoint = f"{self.coder_url}/api/v2/users"
response = requests.get(endpoint, headers=self.headers)
params = {}

if query:
params['q'] = query
if limit:
params['limit'] = limit
if offset:
params['offset'] = offset

response = requests.get(endpoint, headers=self.headers, params=params)
response.raise_for_status() # Raise exception for 4XX/5XX responses

return response.json()['users']

def get_user_by_email(self, email):
def get_user_by_email(self, email) -> dict | None:
"""
Get a user by email
Get a user by email using the query parameter

Args:
email (str): email to search for

Returns:
dict: User data if found, None otherwise
"""
users = self.get_users()
for user in users:
if user['email'] == email:
return user
return None
users = self.get_users(query=email, limit=1)
return users[0] if users else None

def check_username_exists(self, username):
"""
Check if a username already exists

Args:
username (str): Username to check

Returns:
bool: True if username exists, False otherwise
"""
users = self.get_users(query=username)
return bool(users)

def create_user(self, username, email, name):
"""
Expand Down Expand Up @@ -122,19 +151,23 @@ def ensure_user_exists(self, user_info):
if not base_username:
base_username = 'user'

# Ensure username is unique
# Ensure username is unique using random numbers between 1-100000
username = base_username
for i in range(10):
# Check if username exists
users = self.get_users()
username_exists = any(user['username'] == username for user in users)
if not username_exists:
break
# If username exists, append a number
username = f"{base_username}{i}"
else:
raise Exception("Failed to create unique username")


# First check if the base username itself is available
if self.check_username_exists(username):
# Try up to 10 times to find a unique username with random numbers
max_attempts = 1000
for _ in range(max_attempts):
random_suffix = random.randint(1, 100000)
username = f"{base_username}{random_suffix}"

if not self.check_username_exists(username):
break
else:
# If we exhausted all attempts, raise an exception
raise Exception(f"uhh... failed to create unique username for {email} after {max_attempts} attempts")

new_user = self.create_user(username, email, name)
return new_user, True

Expand Down Expand Up @@ -310,4 +343,4 @@ def set_workspace_dormancy(self, workspace_id, dormant: bool):
headers['Content-Type'] = 'application/json'
response = requests.put(endpoint, headers=headers, json=data)
response.raise_for_status()
return response.json()
return response.json()