diff --git a/docs/Development.md b/docs/Development.md
index b4819eeb..3fd5a30f 100644
--- a/docs/Development.md
+++ b/docs/Development.md
@@ -66,6 +66,23 @@ file_share_mounts:
 
 Instead of using the `file_share_mounts` setting, you can configure file share paths in the database. This is useful for production deployments where you want centralized management of file share paths. To use the paths in the database, set `file_share_mounts: []`. See [fileglancer-janelia](https://github.com/JaneliaSciComp/fileglancer-janelia) for an example of populating the file share paths in the database, using a private wiki source.
 
+### Making changes to the database
+If database schema changes are needed, first create a new Alembic revision:
+```
+pixi run migrate-create -m "describe what the migration changes"
+```
+Then apply the migration to the database:
+```
+pixi run migrate
+```
+There is no pixi task for downgrades yet, so if a downgrade is required, run Alembic directly inside a `pixi shell`.
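+For example, to step back one revision (the pixi migration tasks run Alembic from the `fileglancer` directory, so do the same here):
+```
+pixi shell
+cd fileglancer
+alembic downgrade -1
+```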
+
 ### Running with SSL/HTTPS (Secure Mode)
 
 By default, `pixi run dev-launch` runs the server in insecure HTTP mode on port 7878. This is suitable for most local development scenarios.
diff --git a/fileglancer/alembic/versions/c05fd0a62deb_create_task_data_table.py b/fileglancer/alembic/versions/c05fd0a62deb_create_task_data_table.py
new file mode 100644
index 00000000..5a75bb1e
--- /dev/null
+++ b/fileglancer/alembic/versions/c05fd0a62deb_create_task_data_table.py
@@ -0,0 +1,45 @@
+"""create task data table
+
+Revision ID: c05fd0a62deb
+Revises: 9812335c52b6
+Create Date: 2025-11-12 10:53:48.450263
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'c05fd0a62deb'
+down_revision = '9812335c52b6'
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('tasks',
+    sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
+    sa.Column('name', sa.String(), nullable=False),
+    sa.Column('owner', sa.String(), nullable=True),
+    sa.Column('proxy', sa.String(), nullable=True),
+    sa.Column('parameters', sa.String(), nullable=True),
+    sa.Column('compute_resources', sa.String(), nullable=True),
+    sa.Column('monitor_url', sa.String(), nullable=True),
+    sa.Column('output_log', sa.String(), nullable=True),
+    sa.Column('error_log', sa.String(), nullable=True),
+    sa.Column('status', sa.String(), nullable=False),
+    sa.Column('created_at', sa.DateTime(), nullable=False),
+    sa.Column('started_at', sa.DateTime(), nullable=True),
+    sa.Column('finished_at', sa.DateTime(), nullable=True),
+    sa.PrimaryKeyConstraint('id')
+    )
+    op.create_index(op.f('ix_tasks_name'), 'tasks', ['name'], unique=False)
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_index(op.f('ix_tasks_name'), table_name='tasks')
+    op.drop_table('tasks')
+    # ### end Alembic commands ###
\ No newline at end of file
diff --git a/fileglancer/app.py b/fileglancer/app.py
index dcff7276..5cf0d936 100644
--- a/fileglancer/app.py
+++ b/fileglancer/app.py
@@ -7,7 +7,7 @@
 from datetime import datetime, timedelta, timezone, UTC
 from functools import cache
 from pathlib import Path as PathLib
-from typing import List, Optional, Dict, Tuple, Generator
+from typing import List, Optional, Dict, Tuple, Any
 
 try:
     import tomllib
@@ -34,6 +34,7 @@
 from fileglancer.utils import format_timestamp, guess_content_type, parse_range_header
 from fileglancer.user_context import UserContext, EffectiveUserContext, CurrentUserContext
 from fileglancer.filestore import Filestore
+from fileglancer.fgtasks.fgtasks import create_taskdata, get_tasks_registry
 from fileglancer.log import AccessLogMiddleware
 
 from x2s3.utils import get_read_access_acl, get_nosuchbucket_response, get_error_response
@@ -1088,6 +1089,62 @@ async def simple_login_handler(request: Request, body: dict = Body(...)):
     return response
 
 
+@app.get("/api/tasks")
+async def list_tasks():
+    tasks_registry = get_tasks_registry(get_settings())
+    # list tasks
+    task_names = tasks_registry.list_tasks()
+    return JSONResponse(content=task_names, status_code=200)
+
+@app.get("/api/tasks/{task_name}/param-defs")
+async def get_task_params(
+    task_name: str,
+    request: Request
+):
+    tasks_registry = get_tasks_registry(get_settings())
+    logger.info(f'Lookup task {task_name}')
+    # lookup task
+    task_defn = tasks_registry.get_task(task_name)
+    if task_defn is None:
+        logger.error(f'No task found for {task_name}')
+        return Response(status_code=404)
+
+    # request.query_params is a MultiDict-like object
+    task_context = dict(request.query_params)
+
+    param_defs = [p.to_json() for p in task_defn.parameter_defns(task_context)]
+    return JSONResponse(content=param_defs, status_code=200)
+
+@app.post("/api/tasks/{task_name}")
+async def create_task(
+    task_name: str,
+    task_input: Dict[str, Any] = Body(...),
+    username: str = Depends(get_current_user)
+):
+    tasks_registry = get_tasks_registry(get_settings())
+    with _get_user_context(username):
+        # lookup task
+        task_defn = tasks_registry.get_task(task_name)
+
+        if task_defn is None:
+            logger.error(f'No task found for {task_name}')
+            return Response(status_code=404)
+
+        # create and run task
+        task_data = create_taskdata(
+            task_name,
+            task_input.get('parameters', []),
+            task_input.get('env', {}),
+            task_input.get('compute_resources', {}),
+        )
+        await task_defn.launch_task(task_data)
+        return Response(status_code=201)
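+
+# Example interaction (illustrative; the parameter names depend on the task):
+#   GET  /api/tasks                           -> ["nextflow_task", ...]
+#   GET  /api/tasks/nextflow_task/param-defs?pipeline=/path/to/pipeline
+#   POST /api/tasks/nextflow_task
+#        {"parameters": ["--pipeline", "/path/to/pipeline"], "env": {}, "compute_resources": {}}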
 
 # Home page - redirect to /fg
 @app.get("/", include_in_schema=False)
diff --git a/fileglancer/auth.py b/fileglancer/auth.py
index a10d058c..479cb536 100644
--- a/fileglancer/auth.py
+++ b/fileglancer/auth.py
@@ -1,9 +1,8 @@
 """
 Authentication module for OKTA OAuth/OIDC integration
 """
-import os
 import hashlib
-from datetime import datetime, timedelta, UTC
+from datetime import datetime, UTC
 from typing import Optional
 
 from authlib.integrations.starlette_client import OAuth
diff --git a/fileglancer/cli.py b/fileglancer/cli.py
index 359ef48f..191d0c29 100644
--- a/fileglancer/cli.py
+++ b/fileglancer/cli.py
@@ -7,14 +7,13 @@
 import secrets
 import signal
 import sys
-import asyncio
 import click
 import uvicorn
 import json
 import webbrowser
 import threading
 import time
-import socket
+
 from pathlib import Path
 from datetime import datetime, timedelta, UTC
 from loguru import logger
diff --git a/fileglancer/database.py b/fileglancer/database.py
index 50f79b5b..606d8f26 100644
--- a/fileglancer/database.py
+++ b/fileglancer/database.py
@@ -137,6 +137,26 @@ class SessionDB(Base):
 
     last_accessed_at = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC))
 
+
+class TaskDataDB(Base):
+    """TaskData DB model"""
+    __tablename__ = 'tasks'
+
+    id = Column(Integer, primary_key=True, autoincrement=True)
+    name = Column(String, nullable=False, index=True)
+    owner = Column(String, nullable=True)
+    proxy = Column(String, nullable=True)
+    parameters = Column(String, nullable=True)
+    compute_resources = Column(String, nullable=True)
+    monitor_url = Column(String, nullable=True)
+    output_log = Column(String, nullable=True)
+    error_log = Column(String, nullable=True)
+    status = Column(String, nullable=False)
+    created_at = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC))
+    started_at = Column(DateTime, nullable=True)
+    finished_at = Column(DateTime, nullable=True)
+
+
 def run_alembic_upgrade(db_url):
     """Run Alembic migrations to upgrade database to latest version"""
     global _migrations_run
diff --git a/fileglancer/fgtasks/__init__.py b/fileglancer/fgtasks/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/fileglancer/fgtasks/fgtasks.py b/fileglancer/fgtasks/fgtasks.py
new file mode 100644
index 00000000..2e3f2f42
--- /dev/null
+++ b/fileglancer/fgtasks/fgtasks.py
@@ -0,0 +1,278 @@
+from __future__ import annotations
+
+import argparse, asyncio, json
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from datetime import datetime
+from importlib.metadata import entry_points
+from typing import Dict, List, Any, Optional, Sequence
+from loguru import logger
+from pathlib import Path
+from fileglancer.settings import Settings
+
+@dataclass
+class TaskData:
+    """
+    Task data that can be used to get information about the task
+    """
+    id: str | None
+    name: str
+    # task owner
+    owner: str | None
+    # users with appropriate permissions can run tasks
+    # for other users
+    proxy: str | None
+    env: Dict[str, str]
+    parameters: List[str]
+    # task compute resources
+    compute_resources: Dict[str, Any]
+    monitor_url: str | None
+    output_log: str | None
+    error_log: str | None
+    status: str
+    created_date: datetime
+    start_time: datetime | None
+    end_time: datetime | None
+
+
+class TaskDefn(ABC):
+    """
+    Task definition
+    """
+    def __init__(self, name: str, settings: Settings):
+        self._name = name
+        self._settings = settings
+        self._executor = None
+
+    # parameter definitions
+    def parameter_defns(self, task_context: Dict[str, Any] = {}) -> List[TaskParameterDefn]:
+        """List of parameters accepted by this task"""
+        args_meta = []
+        argparser = self.define_args()
+        for action in argparser._actions:
+            if action.dest == 'help':
+                continue
+            args_meta.append(TaskParameterDefn(
+                action.dest,
+                action.option_strings,
+                action.required,
+                action.default,
+                action.help,
+                action.nargs,
+                action.choices,
+            ))
+        if task_context:
+            args_meta.extend(self.parameter_defns_for_context(task_context))
+        return args_meta
+
+    @abstractmethod
+    def parameter_defns_for_context(self, task_context: Dict[str, Any]) -> List[TaskParameterDefn]:
+        pass
+
+    async def launch_task(self, task_data: TaskData):
+        # the default launcher parses the task arguments and invokes the execute method
+        logger.info(f'Parse task args: {task_data.parameters}')
+        args, additional_args = self.define_args().parse_known_args(task_data.parameters)
+        task_args = {tp.name: getattr(args, tp.name) for tp in self.parameter_defns()}
+        task_args['extra_args'] = additional_args
+
+        logger.info(f'Task arguments: {task_args}')
+        # intentionally not awaiting the task result here;
+        # how results will be collected is still an open question
+
+        self._executor = TaskExecutor(task_data.name)
+
+        await self._executor.execute(self.create_task_cmd(task_data, **task_args))
+
+    @abstractmethod
+    def create_task_cmd(self, task_data: TaskData, **kwargs) -> List[str]:
+        pass
+
+    @abstractmethod
+    def define_args(self) -> argparse.ArgumentParser:
+        pass
+
+
+@dataclass
+class TaskParameterDefn:
+    name: str
+    flags: Sequence[str]
+    required: bool
+    default: Any
+    help: Optional[str]
+    nargs: Any
+    choices: Any
+
+    def to_json(self) -> str:
+        return json.dumps({
+            'name': self.name,
+            'flags': list(self.flags),  # convert Sequence to list
+            'required': self.required,
+            'default': _as_json_type(self.default),
+            'help': self.help,
+            'nargs': _as_json_type(self.nargs),
+            'choices': _as_json_type(self.choices),
+        })
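+
+# Illustrative example of the serialized form, using the --pipeline argument
+# defined in nftask.py:
+#   TaskParameterDefn('pipeline', ['--pipeline'], True, None, 'Nextflow pipeline', None, None).to_json()
+#   == '{"name": "pipeline", "flags": ["--pipeline"], "required": true, "default": null,
+#       "help": "Nextflow pipeline", "nargs": null, "choices": null}'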
+
+
+class TaskExecutor:
+    def __init__(self, task_name):
+        self._task_name = task_name
+        self._process = None
+        self._monitor = None
+        self._process_stdout_reader = None
+        self._process_stderr_reader = None
+
+    async def execute(self, cmd: List[str]):
+        logger.info(f'Run: {cmd}')
+        self._process = await asyncio.create_subprocess_exec(
+            *cmd,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+        )
+        logger.info(f'Task {self._task_name} PID: {self._process.pid}')
+        # Start the background monitor
+        self._monitor = asyncio.create_task(self._monitor_task_process(self._process))
+        # Start background tasks to read the output streams
+        self._process_stdout_reader = asyncio.create_task(self._read_stream(self._process.stdout, "STDOUT"))
+        self._process_stderr_reader = asyncio.create_task(self._read_stream(self._process.stderr, "STDERR"))
+
+    async def _read_stream(self, stream: asyncio.StreamReader | None, label: str, filename: str | None = None):
+        """Read and log process output lines as they arrive."""
+        if stream is None:
+            return  # nothing to do
+        file_handle = None
+        try:
+            if filename:
+                # Ensure all parent directories exist
+                Path(filename).parent.mkdir(parents=True, exist_ok=True)
+                # Open file for writing (append mode is often useful for logs)
+                file_handle = open(filename, 'a', encoding='utf-8')
+
+            async for line in stream:
+                text = line.decode(errors='replace').rstrip()
+                logger.debug(f"[{label}] {text}")
+
+                if file_handle:
+                    file_handle.write(text + '\n')
+                    file_handle.flush()
+
+        except Exception as e:
+            logger.error(f"Error reading {label}: {e}")
+        finally:
+            # Close the log file if one was opened
+            if file_handle:
+                file_handle.close()
+
+    async def _monitor_task_process(self, process: asyncio.subprocess.Process):
+        try:
+            while True:
+                if process.returncode is not None:
+                    logger.info(f'{self._task_name} process finished with {process.returncode} code')
+                    await asyncio.gather(
+                        *(t for t in [self._process_stdout_reader, self._process_stderr_reader] if t),
+                        return_exceptions=True
+                    )
+                    break
+                await asyncio.sleep(0.5)
+        except asyncio.CancelledError:
+            logger.info(f'{self._task_name} monitor cancelled')
+        except Exception as e:
+            logger.error(f'{self._task_name} monitor error: {e}')
+
+
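+# Task definitions are contributed via Python entry points in the
+# "fileglancer.tasks" group; this PR registers one in pyproject.toml:
+#
+#   [project.entry-points."fileglancer.tasks"]
+#   nextflow_task = "fileglancer.fgtasks.nftask:NextflowTaskDefn"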
+class TaskRegistry:
+
+    def __init__(self, settings: Settings, entry_point_group: str = 'fileglancer.tasks'):
+        self._settings = settings
+        self._entry_point_group = entry_point_group
+        self._tasks: Dict[str, TaskDefn] = {}
+
+    def discover_tasks(self) -> None:
+        self._tasks.clear()
+
+        # this only needs to work for python 3.10+
+        logger.info(f'Discover {self._entry_point_group}')
+        eps = entry_points()
+        group = eps.select(group=self._entry_point_group)
+
+        for ep in group:
+            try:
+                task_class = ep.load()
+                task = task_class(ep.name, self._settings)
+                if not isinstance(task, TaskDefn):
+                    logger.warning(f'{ep.name} is not a TaskDefn instance')
+                    continue
+                logger.info(f'Found registered task: {ep.name}')
+                self._tasks[ep.name] = task
+            except Exception as e:
+                logger.error(f'Error loading task {ep.name}: {e}')
+
+    def list_tasks(self) -> List[str]:
+        return list(self._tasks.keys())
+
+    def get_task(self, name: str | None) -> Optional[TaskDefn]:
+        logger.debug(f'Get {name}')
+        return self._tasks.get(name) if name else None
+
+
+def create_taskdata(task_name: str,
+                    parameters: List[str] = [],
+                    run_env: Dict[str, str] = {},
+                    task_resources: Dict[str, Any] = {}) -> TaskData:
+    """
+    Create task data - this is information that can be persisted
+    and later used to track and report on the task
+    """
+    # populate task_data both from the current parameters and
+    # the default parameter definitions
+    return TaskData(
+        None,
+        task_name,
+        None,
+        None,
+        run_env,
+        parameters,
+        task_resources,
+        None,  # monitor_url
+        None,  # output log
+        None,  # error log
+        'CREATED',
+        datetime.now(),
+        None,
+        None,
+    )
+
+tasks_registry: Optional[TaskRegistry] = None
+
+
+def get_tasks_registry(settings: Settings):
+    global tasks_registry
+    if tasks_registry is None:
+        logger.info('Initialize task registry')
+        tasks_registry = TaskRegistry(settings)
+        tasks_registry.discover_tasks()
+
+    return tasks_registry
+
+
+def _as_json_type(value):
+    if value is None:
+        return None
+    if isinstance(value, (str, int, float, bool, list, dict)):
+        return value
+    if isinstance(value, (set, tuple)):
+        return list(value)
+    return str(value)
\ No newline at end of file
diff --git a/fileglancer/fgtasks/nftask.py b/fileglancer/fgtasks/nftask.py
new file mode 100644
index 00000000..3519a9bf
--- /dev/null
+++ b/fileglancer/fgtasks/nftask.py
@@ -0,0 +1,98 @@
+import json
+
+from argparse import ArgumentParser
+from loguru import logger
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+from .fgtasks import TaskData, TaskDefn, TaskParameterDefn
+
+
+class NextflowTaskDefn(TaskDefn):
+    def __init__(self, name, settings):
+        super().__init__(name, settings)
+        self._process = None
+
+    def define_args(self) -> ArgumentParser:
+        parser = ArgumentParser()
+
+        parser.add_argument('--pipeline', type=str, required=True, help='Nextflow pipeline')
+        parser.add_argument('--params-file', '--params_file', type=str, help='Nextflow pipeline json params file')
+        parser.add_argument('--compute-profile', type=str, help='Nextflow compute profile')
+        parser.add_argument('--configs', nargs='*', help='Nextflow configuration files')
+        parser.add_argument('--workdir', type=str, help='Nextflow work directory')
+        return parser
+
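+    # For example (illustrative), parameters like
+    #   ['--pipeline', 'nf-core/demo', '--workdir', '/tmp/work']
+    # yield roughly: nextflow run nf-core/demo -w /tmp/work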
+    def create_task_cmd(self, task_data: TaskData,
+                        pipeline: str = '',
+                        configs: List[str] = [],
+                        params_file: Optional[str] = None,
+                        compute_profile: Optional[str] = None,
+                        workdir: Optional[str] = None,
+                        **kwargs) -> List[str]:
+        if not pipeline:
+            raise ValueError('Pipeline must be defined')
+        nextflow_configs_arg = [arg for c in (configs or []) if c for arg in ('-c', c)]
+        params_file_arg = ['-params-file', params_file] if params_file else []
+        profile_arg = ['-profile', compute_profile] if compute_profile else []
+        workdir_arg = ['-w', workdir] if workdir else []
+        extra_args = kwargs.get('extra_args', [])
+
+        cmdline = (['nextflow', 'run', pipeline]
+                   + nextflow_configs_arg
+                   + params_file_arg
+                   + profile_arg
+                   + workdir_arg
+                   + extra_args)
+        logger.debug(f'Nextflow cmd: {cmdline}')
+        return cmdline
+
+    def parameter_defns_for_context(self, task_context: Dict[str, Any]) -> List[TaskParameterDefn]:
+        """
+        task_context: dictionary containing the pipeline path and a flag for whether to include hidden parameters, e.g.,
+        {'pipeline': '/location/of/the/pipeline', 'include_hidden': True}
+        """
+        pipeline_path = task_context.get('pipeline')
+        if not pipeline_path:
+            return []
+        p = Path(pipeline_path)
+        if p.is_dir():
+            p = p / 'nextflow_schema.json'
+        if not p.exists():
+            raise ValueError(f'No schema found at {pipeline_path}')
+
+        with open(p, "r", encoding="utf-8") as nf_schema_file:
+            nf_schema = json.load(nf_schema_file)
+
+        include_hidden = task_context.get('include_hidden', False)
+        return self._extract_parameter_defns_from_section(nf_schema, include_hidden)
+
+    def _extract_parameter_defns_from_section(self, section: Dict[str, Any], include_hidden):
+        param_defs = []
+        if 'properties' in section:
+            props = section['properties']
+            required_fields = set(section.get("required", []))
+            for name, attr in props.items():
+                is_hidden = attr.get('hidden', False)
+                if not is_hidden or include_hidden:
+                    param = TaskParameterDefn(
+                        name=name,
+                        flags=[f'--{name}'],
+                        required=name in required_fields,
+                        default=attr.get('default'),
+                        help=attr.get('description'),
+                        nargs='+' if attr.get('type') == 'array' else None,
+                        choices=attr.get('enum')
+                    )
+                    param_defs.append(param)
+
+        # recurse into nested sections of the schema
+        for key, value in section.items():
+            if isinstance(value, dict):
+                logger.debug(f'Extract {key} parameters')
+                param_defs.extend(self._extract_parameter_defns_from_section(value, include_hidden))
+
+        return param_defs
\ No newline at end of file
diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx
index 9db3176e..c703a119 100644
--- a/frontend/src/App.tsx
+++ b/frontend/src/App.tsx
@@ -28,7 +28,8 @@ function RequireAuth({ children }: { readonly children: ReactNode }) {
 
   // If not authenticated, redirect to home page with the current URL as 'next' parameter
   if (!authStatus?.authenticated) {
-    const currentPath = window.location.pathname + window.location.search + window.location.hash;
+    const currentPath =
+      window.location.pathname + window.location.search + window.location.hash;
     const encodedNext = encodeURIComponent(currentPath);
     window.location.href = `/fg/?next=${encodedNext}`;
     return (
diff --git a/pixi.lock b/pixi.lock
index 6c451219..127437c5 100644
--- a/pixi.lock
+++ b/pixi.lock
@@ -3030,8 +3030,8 @@ packages:
   timestamp: 1760972937564
 - pypi: ./
   name: fileglancer
-  version: 2.2.0
-  sha256: 568a88fbeaa707ed25d850e71614cfb49c634032acfca92b456b180c6464a0f1
+  version: 2.1.2
+  sha256: d2e03ed9f609f89fc78d7ece14bf36c43f0af00e92af9eddaae332a31ca5161c
   requires_dist:
   - aiosqlite>=0.21.0
   - alembic>=1.17.0
@@ -3041,6 +3041,7 @@ packages:
   - click>=8.0
   - fastapi>=0.119.1
   - httpx>=0.28
+  - importlib-metadata>=8.7.0
   - itsdangerous>=2.2.0
   - loguru>=0.7.3
   - lxml>=5.3.1
diff --git a/pyproject.toml b/pyproject.toml
index e563947a..060f6acb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -32,6 +32,7 @@ dependencies = [
     "click >=8.0",
     "fastapi >=0.119.1",
     "httpx >=0.28",
+    "importlib-metadata >=8.7.0",
    "itsdangerous >=2.2.0",
     "loguru >=0.7.3",
     "lxml >=5.3.1",
@@ -49,6 +50,12 @@ dependencies = [
 
 [project.scripts]
 fileglancer = "fileglancer.cli:cli"
 
+[project.entry-points."fileglancer.tasks"]
+nextflow_task = "fileglancer.fgtasks.nftask:NextflowTaskDefn"
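+# Other packages can contribute tasks by declaring entry points in this group
+# in their own project metadata; the server discovers them at startup via
+# TaskRegistry.discover_tasks (see fileglancer/fgtasks/fgtasks.py).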
+
 [project.optional-dependencies]
 test = [
     "coverage",
@@ -93,7 +97,7 @@ path = "fileglancer/_version.py"
 
 [tool.check-wheel-contents]
 ignore = ["W002"]
 
-[tool.pixi.project]
+[tool.pixi.workspace]
 channels = ["conda-forge"]
 platforms = ["osx-arm64", "osx-64", "linux-64"]
@@ -143,8 +147,8 @@ dev-watch = { cmd = "cd frontend && NODE_ENV=development npm run watch" }
 dev-launch = "pixi run uvicorn fileglancer.app:app --no-access-log --port 7878 --reload"
 dev-launch-remote = "pixi run uvicorn fileglancer.app:app --no-access-log --host 0.0.0.0 --port 7878 --reload --ssl-keyfile /opt/certs/cert.key --ssl-certfile /opt/certs/cert.crt"
 dev-launch-secure = "python fileglancer/dev_launch.py"
-migrate = "alembic upgrade head"
-migrate-create = "alembic revision --autogenerate"
+migrate = { cmd = "alembic upgrade head", cwd = "fileglancer" }
+migrate-create = { cmd = "alembic revision --autogenerate", cwd = "fileglancer" }
 stamp-db = "python -m fileglancer.stamp_db"
 
 [tool.pixi.dependencies]