From 0a62bd6970d130927129484deeb4774975cd5c27 Mon Sep 17 00:00:00 2001 From: Cristian Goina Date: Fri, 7 Nov 2025 11:11:33 -0500 Subject: [PATCH 1/8] started to define classes for task plugins --- fileglancer/fg_plugins.py | 86 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 fileglancer/fg_plugins.py diff --git a/fileglancer/fg_plugins.py b/fileglancer/fg_plugins.py new file mode 100644 index 00000000..c725e3b2 --- /dev/null +++ b/fileglancer/fg_plugins.py @@ -0,0 +1,86 @@ +import datetime + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from importlib.metadata import entry_points +from typing import Dict, List, Any, Optional + + +@dataclass +class TaskDefn: + name: str + task_env: Dict[str, Optional[str]] + parameters: List[TaskParameterDefn] + + +@dataclass +class TaskParameterDefn: + name: str + description: str + type: str + required: bool + arity: int + valid_values: List[Any] + default_value: Any + + +class TaskParameter: + name: str + value: Any + + +class Task(ABC): + instance_id: str + task_name: str + # task owner + task_owner: str + # users with appropriate permissions can run tasks + # for other users + task_run_proxy: str + task_env: Dict[str, str] + parameters: List[TaskParameter] + created_date: datetime.date + + @abstractmethod + def error_log(self) -> str: + pass + + @abstractmethod + def output_log(self) -> str: + pass + + @abstractmethod + def execute(self, **kwargs): + pass + + +class TaskRegistry: + + def __init__(self, entry_point_group: str = 'fileglancer.tasks'): + self._entry_point_group = entry_point_group + self._tasks : Dict[str, TaskDefn] = {} + + def discover_tasks(self) -> None: + self._tasks.clear() + + # this only needs to work for python 3.10+ + eps = entry_points() + group = eps.select(group=self._entry_point_group) + + for ep in group: + try: + task_class = ep.load() + task = task_class() + if not isinstance(task, TaskDefn): + print(f'Warning: {ep.name} is not 
a Task definition instance') + continue + self._tasks[task.name] = task + print(f'Registered task: {task.name}') + except Exception as e: + print(f'Error loading task {ep.name}: {e}') + + def list_tasks(self) -> List[str]: + return list(self._tasks.keys()) + + def get_task(self, name:str) -> Optional[TaskDefn]: + return self._tasks.get(name) From 7abc729527f20c94acf3537d5ea392e02a345515 Mon Sep 17 00:00:00 2001 From: Cristian Goina Date: Tue, 11 Nov 2025 18:50:32 -0500 Subject: [PATCH 2/8] a first take of implementing a task execution framework --- fileglancer/app.py | 50 ++++++- fileglancer/cli.py | 3 +- fileglancer/database.py | 20 +++ fileglancer/fg_plugins.py | 86 ------------ fileglancer/fgtasks/__init__.py | 0 fileglancer/fgtasks/fgtasks.py | 223 ++++++++++++++++++++++++++++++++ fileglancer/fgtasks/nftask.py | 47 +++++++ pixi.lock | 5 +- pyproject.toml | 6 +- 9 files changed, 348 insertions(+), 92 deletions(-) delete mode 100644 fileglancer/fg_plugins.py create mode 100644 fileglancer/fgtasks/__init__.py create mode 100644 fileglancer/fgtasks/fgtasks.py create mode 100644 fileglancer/fgtasks/nftask.py diff --git a/fileglancer/app.py b/fileglancer/app.py index 030cf9e1..08d543fa 100644 --- a/fileglancer/app.py +++ b/fileglancer/app.py @@ -7,7 +7,7 @@ from datetime import datetime, timedelta, timezone, UTC from functools import cache from pathlib import Path as PathLib -from typing import List, Optional, Dict, Tuple, Generator +from typing import List, Optional, Dict, Tuple, Any try: import tomllib @@ -34,6 +34,7 @@ from fileglancer.utils import format_timestamp, guess_content_type, parse_range_header from fileglancer.user_context import UserContext, EffectiveUserContext, CurrentUserContext from fileglancer.filestore import Filestore +from fileglancer.fgtasks.fgtasks import create_taskdata, get_tasks_registry from fileglancer.log import AccessLogMiddleware from x2s3.utils import get_read_access_acl, get_nosuchbucket_response, get_error_response @@ -1071,6 
+1072,53 @@ async def simple_login_handler(request: Request, body: dict = Body(...)): return response + @app.get("/api/tasks") + async def list_tasks(): + tasks_registry = get_tasks_registry() + # list tasks + task_names = tasks_registry.list_tasks() + return JSONResponse(content = task_names, status_code=200) + + @app.get("/api/tasks/{task_name}/param-defs") + async def get_task_params( + task_name: str, + ): + tasks_registry = get_tasks_registry() + logger.info(f'Lookup task {task_name}') + # lookup task + task_defn = tasks_registry.get_task(task_name) + if task_defn is None: + logger.error(f'No task found for {task_name}') + return Response(status_code = 404) + + param_defs = [p.to_json() for p in task_defn.parameter_defns] + return JSONResponse(content = param_defs, status_code=200) + + @app.post("/api/tasks/{task_name}") + async def create_task( + task_name: str, + task_input: Dict[str, Any] = Body(...), + # username: str = Depends(get_current_user) + ): + tasks_registry = get_tasks_registry() + username = 'goinac' # !!!!!!! 
+ with _get_user_context(username): + # lookup task + task_defn = tasks_registry.get_task(task_name) + + if task_defn is None: + logger.error(f'No task found for {task_name}') + return Response(status_code = 404) + + # create and run task + task_data = create_taskdata( + task_name, + task_input.get('parameters',[]), + task_input.get('env', {}), + task_input.get('compute_resources', {}), + ) + await task_defn.launch_task(task_data) + return Response(status_code=201) # Home page - redirect to /fg @app.get("/", include_in_schema=False) diff --git a/fileglancer/cli.py b/fileglancer/cli.py index 359ef48f..191d0c29 100644 --- a/fileglancer/cli.py +++ b/fileglancer/cli.py @@ -7,14 +7,13 @@ import secrets import signal import sys -import asyncio import click import uvicorn import json import webbrowser import threading import time -import socket + from pathlib import Path from datetime import datetime, timedelta, UTC from loguru import logger diff --git a/fileglancer/database.py b/fileglancer/database.py index 50f79b5b..606d8f26 100644 --- a/fileglancer/database.py +++ b/fileglancer/database.py @@ -137,6 +137,26 @@ class SessionDB(Base): last_accessed_at = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC)) + +class TaskDataDB(Base): + """TaskData DB model""" + __tablename__ = 'tasks' + + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(String, nullable=False, index=True) + owner = Column(String, nullable=True) + proxy = Column(String, nullable=True) + parameters = Column(String, nullable=True) + compute_resources = Column(String, nullable=True) + monitor_url = Column(String, nullable=True) + output_log = Column(String, nullable=True) + error_log = Column(String, nullable=True) + status = Column(String, nullable=False) + created_at = Column(DateTime, nullable=False, default=lambda: datetime.now(UTC)) + started_at = Column(DateTime, nullable=True) + finished_at = Column(DateTime, nullable=True) + + def run_alembic_upgrade(db_url): 
"""Run Alembic migrations to upgrade database to latest version""" global _migrations_run diff --git a/fileglancer/fg_plugins.py b/fileglancer/fg_plugins.py deleted file mode 100644 index c725e3b2..00000000 --- a/fileglancer/fg_plugins.py +++ /dev/null @@ -1,86 +0,0 @@ -import datetime - -from abc import ABC, abstractmethod -from dataclasses import dataclass -from importlib.metadata import entry_points -from typing import Dict, List, Any, Optional - - -@dataclass -class TaskDefn: - name: str - task_env: Dict[str, Optional[str]] - parameters: List[TaskParameterDefn] - - -@dataclass -class TaskParameterDefn: - name: str - description: str - type: str - required: bool - arity: int - valid_values: List[Any] - default_value: Any - - -class TaskParameter: - name: str - value: Any - - -class Task(ABC): - instance_id: str - task_name: str - # task owner - task_owner: str - # users with appropriate permissions can run tasks - # for other users - task_run_proxy: str - task_env: Dict[str, str] - parameters: List[TaskParameter] - created_date: datetime.date - - @abstractmethod - def error_log(self) -> str: - pass - - @abstractmethod - def output_log(self) -> str: - pass - - @abstractmethod - def execute(self, **kwargs): - pass - - -class TaskRegistry: - - def __init__(self, entry_point_group: str = 'fileglancer.tasks'): - self._entry_point_group = entry_point_group - self._tasks : Dict[str, TaskDefn] = {} - - def discover_tasks(self) -> None: - self._tasks.clear() - - # this only needs to work for python 3.10+ - eps = entry_points() - group = eps.select(group=self._entry_point_group) - - for ep in group: - try: - task_class = ep.load() - task = task_class() - if not isinstance(task, TaskDefn): - print(f'Warning: {ep.name} is not a Task definition instance') - continue - self._tasks[task.name] = task - print(f'Registered task: {task.name}') - except Exception as e: - print(f'Error loading task {ep.name}: {e}') - - def list_tasks(self) -> List[str]: - return 
list(self._tasks.keys()) - - def get_task(self, name:str) -> Optional[TaskDefn]: - return self._tasks.get(name) diff --git a/fileglancer/fgtasks/__init__.py b/fileglancer/fgtasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fileglancer/fgtasks/fgtasks.py b/fileglancer/fgtasks/fgtasks.py new file mode 100644 index 00000000..1252ce9d --- /dev/null +++ b/fileglancer/fgtasks/fgtasks.py @@ -0,0 +1,223 @@ +import argparse, asyncio, json + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import datetime +from importlib.metadata import entry_points +from typing import Dict, List, Any, Optional, Sequence +from loguru import logger + + +@dataclass +class TaskData: + """ + This task data that can be used to get information about the task + """ + id: str | None + name: str + # task owner + owner: str | None + # users with appropriate permissions can run tasks + # for other users + proxy: str | None + env: Dict[str, str] + parameters: List[str] + # task compute resources + compute_resources: Dict[str, Any] + monitor_url: str | None + output_log: str | None + error_log: str | None + status: str + created_date: datetime + start_time: datetime | None + end_time: datetime | None + + +class TaskDefn(ABC): + """ + Task definition + """ + def __init__(self, name:str): + self._name = name + self._executor = None + + # parameter definition + @property + def parameter_defns(self) -> List[TaskParameterDefn]: + """List of parameters accepted by this task""" + args_meta = [] + argparser = self.define_args() + for action in argparser._actions: + if action.dest == 'help': + continue + args_meta.append(TaskParameterDefn( + action.dest, + action.option_strings, + action.required, + action.default, + action.help, + action.nargs, + action.choices, + )) + return args_meta + + async def launch_task(self, task_data:TaskData): + # the default launcher parses the task arguments and invokes the execute method + logger.info(f'Parse task 
args: {task_data.parameters}') + args, additional_args = self.define_args().parse_known_args(task_data.parameters) + task_args = {tp.name: getattr(args, tp.name) for tp in self.parameter_defns} + task_args['extra_args'] = additional_args + + logger.info(f'Task arguments: {task_args}') + # not awaiting for the result here is intentional + # I don't know yet how this is going to work + + self._executor = TaskExecutor(task_data.name) + + await self._executor.execute(self.create_task_cmd(task_data, **task_args)) + + @abstractmethod + def create_task_cmd(self, task_data: TaskData, **kwargs) -> List[str]: + pass + + @abstractmethod + def define_args(self) -> argparse.ArgumentParser: + pass + + +@dataclass +class TaskParameterDefn: + name: str + flags: Sequence[str] + required: bool + default: Any + help: Optional[str] + nargs: Any + choices: Any + + def to_json(self) -> str: + return json.dumps({ + "name": self.name, + "flags": list(self.flags), # convert Sequence to list + "required": self.required, + "default": _as_json_type(self.default), + "help": self.help, + "nargs": _as_json_type(self.nargs), + "choices": _as_json_type(self.choices), + }) + + + +class TaskExecutor: + def __init__(self, task_name): + self._task_name = task_name + self._process = None + self._process_stdout_reader = None + self._process_stderr_reader = None + + async def execute(self, cmd:List[str]): + logger.info(f'Run: {cmd}') + self._process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + logger.info(f'Task {self._task_name} PID: {self._process.pid}') + # Start background tasks to read the output output + self._process_stdout_reader = asyncio.create_task(self._read_stream(self._process.stdout, "STDOUT")) + self._process_stderr_reader = asyncio.create_task(self._read_stream(self._process.stderr, "STDERR")) + + async def _read_stream(self, stream: asyncio.StreamReader | None, label: str): + """Read and log process output 
lines as they arrive.""" + try: + if stream is not None: + async for line in stream: + text = line.decode().rstrip() + logger.info(f"[{label}] {text}") + except Exception as e: + logger.error(f"Error reading {label}: {e}") + + + +class TaskRegistry: + + def __init__(self, entry_point_group: str = 'fileglancer.tasks'): + self._entry_point_group = entry_point_group + self._tasks : Dict[str, TaskDefn] = {} + + def discover_tasks(self) -> None: + self._tasks.clear() + + # this only needs to work for python 3.10+ + logger.info(f'Discover {self._entry_point_group}') + eps = entry_points() + group = eps.select(group=self._entry_point_group) + + for ep in group: + try: + task_class = ep.load() + task = task_class(ep.name) + if not isinstance(task, TaskDefn): + logger.warning(f'Warning: {ep.name} is not a Task definition instance') + continue + logger.info(f'Found registered task: {ep.name}') + self._tasks[ep.name] = task + except Exception as e: + print(f'Error loading task {ep.name}: {e}') + + def list_tasks(self) -> List[str]: + return list(self._tasks.keys()) + + def get_task(self, name:str | None) -> Optional[TaskDefn]: + logger.debug(f'Get {name}') + return self._tasks.get(name) if name else None + + +def create_taskdata(task_name: str, + parameters: List[str] = [], + run_env: Dict[str, str] = {}, + task_resources: Dict[str, Any] = {}) -> TaskData: + """ + Create task data - this is information that can be persisted + and then later used to + """ + # populate task_data both from current paramenters and + # the default parameter definitions + return TaskData( + None, + task_name, + None, + None, + run_env, + parameters, + task_resources, + None, # monitor_url + None, # output log + None, # error log + 'CREATED', + datetime.now(), + None, + None, + ) + +tasks_registry : Optional[TaskRegistry] = None + + +def get_tasks_registry(): + global tasks_registry + if tasks_registry is None: + logger.info('Initialize task registry') + tasks_registry = TaskRegistry() + 
tasks_registry.discover_tasks() + + return tasks_registry + + +def _as_json_type(value): + if value is None: + return None + if isinstance(value, (str, int, float, bool, list, dict)): + return value + if isinstance(value, (set, tuple)): + return list(value) + return str(value) \ No newline at end of file diff --git a/fileglancer/fgtasks/nftask.py b/fileglancer/fgtasks/nftask.py new file mode 100644 index 00000000..8d122317 --- /dev/null +++ b/fileglancer/fgtasks/nftask.py @@ -0,0 +1,47 @@ +import asyncio + +from argparse import ArgumentParser +from loguru import logger +from typing import Optional, List +from .fgtasks import TaskData, TaskDefn + + +class NextflowTaskDefn(TaskDefn): + def __init__(self, name): + super().__init__(name) + self._process = None + + def define_args(self) -> ArgumentParser: + parser = ArgumentParser() + + parser.add_argument('--pipeline', type=str, required=True, help='Nextflow pipeline') + parser.add_argument('--params-file', '--params_file', type=str, help='Nextflow pipeline json params file') + parser.add_argument('--compute-profile', type=str, help='Nextflow compute profile') + parser.add_argument('--configs', nargs='*', help='Nextflow configuration files') + parser.add_argument('--workdir', type=str, help='Nextflow compute profile') + return parser + + def create_task_cmd(self, task_data: TaskData, + pipeline:str='', + configs:List[str]=[], + params_file:Optional[str]=None, + compute_profile:Optional[str]=None, + workdir:Optional[str]=None, + **kwargs) -> List[str]: + if not pipeline: + raise ValueError('Pipeline must be defined') + nextflow_configs_arg = [config_arg for c in configs + for config_arg in ('-c', c) if c ] if configs is not None else [] + params_file_arg = ['-params-file', params_file] if params_file else [] + profile_arg = ['-profile', compute_profile] if compute_profile else [] + workdir_arg = ['-w', workdir] if workdir else [] + extra_args = kwargs.get('extra_args', []) + + cmdline = ([ 'nextflow', 'run', pipeline ] 
+ + nextflow_configs_arg + + params_file_arg + + profile_arg + + workdir_arg + + extra_args) + logger.debug('Nextflow cmd', cmdline) + return cmdline diff --git a/pixi.lock b/pixi.lock index 6c451219..5934b706 100644 --- a/pixi.lock +++ b/pixi.lock @@ -3030,8 +3030,8 @@ packages: timestamp: 1760972937564 - pypi: ./ name: fileglancer - version: 2.2.0 - sha256: 568a88fbeaa707ed25d850e71614cfb49c634032acfca92b456b180c6464a0f1 + version: 2.1.2 + sha256: 34cc9d943a0f28f313fe0d8da68798e8d9e62db25c8c22c2917b2e06cf848306 requires_dist: - aiosqlite>=0.21.0 - alembic>=1.17.0 @@ -3041,6 +3041,7 @@ packages: - click>=8.0 - fastapi>=0.119.1 - httpx>=0.28 + - importlib-metadata>=8.7.0 - itsdangerous>=2.2.0 - loguru>=0.7.3 - lxml>=5.3.1 diff --git a/pyproject.toml b/pyproject.toml index e563947a..6a520ae0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "click >=8.0", "fastapi >=0.119.1", "httpx >=0.28", + "importlib-metadata >=8.7.0", "itsdangerous >=2.2.0", "loguru >=0.7.3", "lxml >=5.3.1", @@ -49,6 +50,9 @@ dependencies = [ [project.scripts] fileglancer = "fileglancer.cli:cli" +[project.entry-points."fileglancer.tasks"] +nextflow_task = "fileglancer.fgtasks.nftask:NextflowTaskDefn" + [project.optional-dependencies] test = [ "coverage", @@ -93,7 +97,7 @@ path = "fileglancer/_version.py" [tool.check-wheel-contents] ignore = ["W002"] -[tool.pixi.project] +[tool.pixi.workspace] channels = ["conda-forge"] platforms = ["osx-arm64", "osx-64", "linux-64"] From 008a3785788ab45c359753a9829807c60dd339e6 Mon Sep 17 00:00:00 2001 From: Cristian Goina Date: Wed, 12 Nov 2025 10:02:51 -0500 Subject: [PATCH 3/8] uncommented username injection --- fileglancer/app.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fileglancer/app.py b/fileglancer/app.py index 08d543fa..815f63b7 100644 --- a/fileglancer/app.py +++ b/fileglancer/app.py @@ -1098,10 +1098,9 @@ async def get_task_params( async def create_task( task_name: str, task_input: 
Dict[str, Any] = Body(...), - # username: str = Depends(get_current_user) + username: str = Depends(get_current_user) ): tasks_registry = get_tasks_registry() - username = 'goinac' # !!!!!!! with _get_user_context(username): # lookup task task_defn = tasks_registry.get_task(task_name) From 59cf061186921682743737f370ff5df377cc6a0d Mon Sep 17 00:00:00 2001 From: Cristian Goina Date: Wed, 12 Nov 2025 10:31:47 -0500 Subject: [PATCH 4/8] use single quotes for strings --- fileglancer/fgtasks/fgtasks.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/fileglancer/fgtasks/fgtasks.py b/fileglancer/fgtasks/fgtasks.py index 1252ce9d..019d83e2 100644 --- a/fileglancer/fgtasks/fgtasks.py +++ b/fileglancer/fgtasks/fgtasks.py @@ -97,13 +97,13 @@ class TaskParameterDefn: def to_json(self) -> str: return json.dumps({ - "name": self.name, - "flags": list(self.flags), # convert Sequence to list - "required": self.required, - "default": _as_json_type(self.default), - "help": self.help, - "nargs": _as_json_type(self.nargs), - "choices": _as_json_type(self.choices), + 'name': self.name, + 'flags': list(self.flags), # convert Sequence to list + 'required': self.required, + 'default': _as_json_type(self.default), + 'help': self.help, + 'nargs': _as_json_type(self.nargs), + 'choices': _as_json_type(self.choices), }) From 0b5928b8c4ef2756ec50ca03af802c36f3bbeae6 Mon Sep 17 00:00:00 2001 From: Cristian Goina Date: Wed, 12 Nov 2025 11:00:23 -0500 Subject: [PATCH 5/8] alembic migration --- docs/Development.md | 11 +++++ .../c05fd0a62deb_create_task_data_table.py | 45 +++++++++++++++++++ pixi.lock | 2 +- pyproject.toml | 4 +- 4 files changed, 59 insertions(+), 3 deletions(-) create mode 100644 fileglancer/alembic/versions/c05fd0a62deb_create_task_data_table.py diff --git a/docs/Development.md b/docs/Development.md index b4819eeb..3fd5a30f 100644 --- a/docs/Development.md +++ b/docs/Development.md @@ -66,6 +66,17 @@ file_share_mounts: Instead of using the 
`file_share_mounts` setting, you can configure file share paths in the database. This is useful for production deployments where you want centralized management of file share paths. To use the paths in the database, set `file_share_mounts: []`. See [fileglancer-janelia](https://github.com/JaneliaSciComp/fileglancer-janelia) for an example of populating the file share paths in the database, using a private wiki source. +### Making changes to the database +If database changes are needed you first need to create a new alembic revision using: +``` +pixi run migrate-create -m "enter what the migration is about" +``` +Then run the actual up-migration +``` +pixi run migrate +``` +If database downgrade is required, so far there's no pixi task defined for it so that would have to be done in a `pixi shell` + ### Running with SSL/HTTPS (Secure Mode) By default, `pixi run dev-launch` runs the server in insecure HTTP mode on port 7878. This is suitable for most local development scenarios. diff --git a/fileglancer/alembic/versions/c05fd0a62deb_create_task_data_table.py b/fileglancer/alembic/versions/c05fd0a62deb_create_task_data_table.py new file mode 100644 index 00000000..5a75bb1e --- /dev/null +++ b/fileglancer/alembic/versions/c05fd0a62deb_create_task_data_table.py @@ -0,0 +1,45 @@ +"""create task data table + +Revision ID: c05fd0a62deb +Revises: 9812335c52b6 +Create Date: 2025-11-12 10:53:48.450263 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'c05fd0a62deb' +down_revision = '9812335c52b6' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table('tasks', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('name', sa.String(), nullable=False), + sa.Column('owner', sa.String(), nullable=True), + sa.Column('proxy', sa.String(), nullable=True), + sa.Column('parameters', sa.String(), nullable=True), + sa.Column('compute_resources', sa.String(), nullable=True), + sa.Column('monitor_url', sa.String(), nullable=True), + sa.Column('output_log', sa.String(), nullable=True), + sa.Column('error_log', sa.String(), nullable=True), + sa.Column('status', sa.String(), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('started_at', sa.DateTime(), nullable=True), + sa.Column('finished_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_tasks_name'), 'tasks', ['name'], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_tasks_name'), table_name='tasks') + op.drop_table('tasks') + # ### end Alembic commands ### \ No newline at end of file diff --git a/pixi.lock b/pixi.lock index 5934b706..127437c5 100644 --- a/pixi.lock +++ b/pixi.lock @@ -3031,7 +3031,7 @@ packages: - pypi: ./ name: fileglancer version: 2.1.2 - sha256: 34cc9d943a0f28f313fe0d8da68798e8d9e62db25c8c22c2917b2e06cf848306 + sha256: d2e03ed9f609f89fc78d7ece14bf36c43f0af00e92af9eddaae332a31ca5161c requires_dist: - aiosqlite>=0.21.0 - alembic>=1.17.0 diff --git a/pyproject.toml b/pyproject.toml index 6a520ae0..060f6acb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -147,8 +147,8 @@ dev-watch = { cmd = "cd frontend && NODE_ENV=development npm run watch" } dev-launch = "pixi run uvicorn fileglancer.app:app --no-access-log --port 7878 --reload" dev-launch-remote = "pixi run uvicorn fileglancer.app:app --no-access-log --host 0.0.0.0 --port 7878 --reload --ssl-keyfile /opt/certs/cert.key --ssl-certfile /opt/certs/cert.crt" 
dev-launch-secure = "python fileglancer/dev_launch.py" -migrate = "alembic upgrade head" -migrate-create = "alembic revision --autogenerate" +migrate = { cmd = "alembic upgrade head", cwd = "fileglancer" } +migrate-create = { cmd = "alembic revision --autogenerate", cwd = "fileglancer" } stamp-db = "python -m fileglancer.stamp_db" [tool.pixi.dependencies] From 70c0dc79e8ca793515be3acc2a1ecb93bb599db0 Mon Sep 17 00:00:00 2001 From: Cristian Goina Date: Wed, 12 Nov 2025 11:01:00 -0500 Subject: [PATCH 6/8] changed made by pre-push --- frontend/src/App.tsx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 9db3176e..c703a119 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -28,7 +28,8 @@ function RequireAuth({ children }: { readonly children: ReactNode }) { // If not authenticated, redirect to home page with the current URL as 'next' parameter if (!authStatus?.authenticated) { - const currentPath = window.location.pathname + window.location.search + window.location.hash; + const currentPath = + window.location.pathname + window.location.search + window.location.hash; const encodedNext = encodeURIComponent(currentPath); window.location.href = `/fg/?next=${encodedNext}`; return ( From 7e5e85dc656b0a77cbc1941cf15dfc72d996dee1 Mon Sep 17 00:00:00 2001 From: Cristian Goina Date: Wed, 12 Nov 2025 13:56:29 -0500 Subject: [PATCH 7/8] pass settings to a task in case I need config settings --- fileglancer/app.py | 6 ++-- fileglancer/auth.py | 3 +- fileglancer/fgtasks/fgtasks.py | 58 +++++++++++++++++++++++++++------- fileglancer/fgtasks/nftask.py | 4 +-- 4 files changed, 52 insertions(+), 19 deletions(-) diff --git a/fileglancer/app.py b/fileglancer/app.py index e1af32d3..1f41809e 100644 --- a/fileglancer/app.py +++ b/fileglancer/app.py @@ -1091,7 +1091,7 @@ async def simple_login_handler(request: Request, body: dict = Body(...)): @app.get("/api/tasks") async def list_tasks(): - tasks_registry = 
get_tasks_registry() + tasks_registry = get_tasks_registry(get_settings()) # list tasks task_names = tasks_registry.list_tasks() return JSONResponse(content = task_names, status_code=200) @@ -1100,7 +1100,7 @@ async def list_tasks(): async def get_task_params( task_name: str, ): - tasks_registry = get_tasks_registry() + tasks_registry = get_tasks_registry(get_settings()) logger.info(f'Lookup task {task_name}') # lookup task task_defn = tasks_registry.get_task(task_name) @@ -1117,7 +1117,7 @@ async def create_task( task_input: Dict[str, Any] = Body(...), username: str = Depends(get_current_user) ): - tasks_registry = get_tasks_registry() + tasks_registry = get_tasks_registry(get_settings()) with _get_user_context(username): # lookup task task_defn = tasks_registry.get_task(task_name) diff --git a/fileglancer/auth.py b/fileglancer/auth.py index a10d058c..479cb536 100644 --- a/fileglancer/auth.py +++ b/fileglancer/auth.py @@ -1,9 +1,8 @@ """ Authentication module for OKTA OAuth/OIDC integration """ -import os import hashlib -from datetime import datetime, timedelta, UTC +from datetime import datetime, UTC from typing import Optional from authlib.integrations.starlette_client import OAuth diff --git a/fileglancer/fgtasks/fgtasks.py b/fileglancer/fgtasks/fgtasks.py index 019d83e2..68b34f91 100644 --- a/fileglancer/fgtasks/fgtasks.py +++ b/fileglancer/fgtasks/fgtasks.py @@ -6,7 +6,8 @@ from importlib.metadata import entry_points from typing import Dict, List, Any, Optional, Sequence from loguru import logger - +from pathlib import Path +from fileglancer.settings import Settings @dataclass class TaskData: @@ -37,8 +38,9 @@ class TaskDefn(ABC): """ Task definition """ - def __init__(self, name:str): + def __init__(self, name:str, settings:Settings): self._name = name + self._settings = settings self._executor = None # parameter definition @@ -107,11 +109,11 @@ def to_json(self) -> str: }) - class TaskExecutor: def __init__(self, task_name): self._task_name = task_name 
self._process = None + self._monitor = None self._process_stdout_reader = None self._process_stderr_reader = None @@ -123,25 +125,57 @@ async def execute(self, cmd:List[str]): stderr=asyncio.subprocess.PIPE, ) logger.info(f'Task {self._task_name} PID: {self._process.pid}') + # Start the background monitor + self._monitor = asyncio.create_task(self._monitor_task_process(self._process)) # Start background tasks to read the output output self._process_stdout_reader = asyncio.create_task(self._read_stream(self._process.stdout, "STDOUT")) self._process_stderr_reader = asyncio.create_task(self._read_stream(self._process.stderr, "STDERR")) - async def _read_stream(self, stream: asyncio.StreamReader | None, label: str): + async def _read_stream(self, stream: asyncio.StreamReader | None, label: str, filename: str|None=None): """Read and log process output lines as they arrive.""" + if stream is None: + return # nothing to do try: - if stream is not None: - async for line in stream: - text = line.decode().rstrip() - logger.info(f"[{label}] {text}") + if filename: + # Ensure all parent directories exist + Path(filename).parent.mkdir(parents=True, exist_ok=True) + # Open file for writing (append mode is often useful for logs) + file_handle = open(filename, 'a', encoding='utf-8') + else: + file_handle = None + + async for line in stream: + text = line.decode(errors='replace').rstrip() + logger.debug(f"[{label}] {text}") + + if file_handle: + file_handle.write(text + '\n') + file_handle.flush() + except Exception as e: logger.error(f"Error reading {label}: {e}") + async def _monitor_task_process(self, process:asyncio.subprocess.Process): + try: + while True: + if process.returncode is not None: + logger.info(f'{self._task_name} process finished with {process.returncode} code') + await asyncio.gather( + *(t for t in [self._process_stdout_reader, self._process_stderr_reader] if t), + return_exceptions=True + ) + break + await asyncio.sleep(0.5) + except asyncio.CancelledError: + 
logger.info(f'{self._task_name} monitor cancelled') + except Exception as e: + logger.error(f'{self._task_name} monitor error: {e}') class TaskRegistry: - def __init__(self, entry_point_group: str = 'fileglancer.tasks'): + def __init__(self, settings: Settings, entry_point_group: str = 'fileglancer.tasks'): + self._settings = settings self._entry_point_group = entry_point_group self._tasks : Dict[str, TaskDefn] = {} @@ -156,7 +190,7 @@ def discover_tasks(self) -> None: for ep in group: try: task_class = ep.load() - task = task_class(ep.name) + task = task_class(ep.name, self._settings) if not isinstance(task, TaskDefn): logger.warning(f'Warning: {ep.name} is not a Task definition instance') continue @@ -203,11 +237,11 @@ def create_taskdata(task_name: str, tasks_registry : Optional[TaskRegistry] = None -def get_tasks_registry(): +def get_tasks_registry(settings:Settings): global tasks_registry if tasks_registry is None: logger.info('Initialize task registry') - tasks_registry = TaskRegistry() + tasks_registry = TaskRegistry(settings) tasks_registry.discover_tasks() return tasks_registry diff --git a/fileglancer/fgtasks/nftask.py b/fileglancer/fgtasks/nftask.py index 8d122317..1844a26a 100644 --- a/fileglancer/fgtasks/nftask.py +++ b/fileglancer/fgtasks/nftask.py @@ -7,8 +7,8 @@ class NextflowTaskDefn(TaskDefn): - def __init__(self, name): - super().__init__(name) + def __init__(self, name, settings): + super().__init__(name, settings) self._process = None def define_args(self) -> ArgumentParser: From 7c5316e4d05200bac232392888095146ca6c0334 Mon Sep 17 00:00:00 2001 From: Cristian Goina Date: Thu, 13 Nov 2025 19:01:05 -0500 Subject: [PATCH 8/8] options to get the actual pipeline parameters --- fileglancer/app.py | 6 +++- fileglancer/fgtasks/fgtasks.py | 11 +++++-- fileglancer/fgtasks/nftask.py | 57 ++++++++++++++++++++++++++++++++-- 3 files changed, 67 insertions(+), 7 deletions(-) diff --git a/fileglancer/app.py b/fileglancer/app.py index 1f41809e..5cf0d936 100644 
--- a/fileglancer/app.py +++ b/fileglancer/app.py @@ -1099,6 +1099,7 @@ async def list_tasks(): @app.get("/api/tasks/{task_name}/param-defs") async def get_task_params( task_name: str, + request: Request ): tasks_registry = get_tasks_registry(get_settings()) logger.info(f'Lookup task {task_name}') @@ -1108,7 +1109,10 @@ async def get_task_params( logger.error(f'No task found for {task_name}') return Response(status_code = 404) - param_defs = [p.to_json() for p in task_defn.parameter_defns] + # request.query_params is a MultiDict-like object + task_context = dict(request.query_params) + + param_defs = [p.to_json() for p in task_defn.parameter_defns(task_context)] return JSONResponse(content = param_defs, status_code=200) @app.post("/api/tasks/{task_name}") diff --git a/fileglancer/fgtasks/fgtasks.py b/fileglancer/fgtasks/fgtasks.py index 68b34f91..2e3f2f42 100644 --- a/fileglancer/fgtasks/fgtasks.py +++ b/fileglancer/fgtasks/fgtasks.py @@ -44,8 +44,7 @@ def __init__(self, name:str, settings:Settings): self._executor = None # parameter definition - @property - def parameter_defns(self) -> List[TaskParameterDefn]: + def parameter_defns(self, task_context:Dict[str,Any]={}) -> List[TaskParameterDefn]: """List of parameters accepted by this task""" args_meta = [] argparser = self.define_args() @@ -61,13 +60,19 @@ def parameter_defns(self) -> List[TaskParameterDefn]: action.nargs, action.choices, )) + if task_context: + args_meta.extend(self.parameter_defns_for_context(task_context)) return args_meta + @abstractmethod + def parameter_defns_for_context(self, task_context:Dict[str, Any])-> List[TaskParameterDefn]: + pass + async def launch_task(self, task_data:TaskData): # the default launcher parses the task arguments and invokes the execute method logger.info(f'Parse task args: {task_data.parameters}') args, additional_args = self.define_args().parse_known_args(task_data.parameters) - task_args = {tp.name: getattr(args, tp.name) for tp in self.parameter_defns} + task_args 
= {tp.name: getattr(args, tp.name) for tp in self.parameter_defns()} task_args['extra_args'] = additional_args logger.info(f'Task arguments: {task_args}') diff --git a/fileglancer/fgtasks/nftask.py b/fileglancer/fgtasks/nftask.py index 1844a26a..3519a9bf 100644 --- a/fileglancer/fgtasks/nftask.py +++ b/fileglancer/fgtasks/nftask.py @@ -1,9 +1,10 @@ -import asyncio +import json from argparse import ArgumentParser from loguru import logger -from typing import Optional, List -from .fgtasks import TaskData, TaskDefn +from pathlib import Path +from typing import Any, Dict, List, Optional +from .fgtasks import TaskData, TaskDefn, TaskParameterDefn class NextflowTaskDefn(TaskDefn): @@ -45,3 +46,53 @@ def create_task_cmd(self, task_data: TaskData, + extra_args) logger.debug('Nextflow cmd', cmdline) return cmdline + + def parameter_defns_for_context(self, task_context:Dict[str, Any])-> List[TaskParameterDefn]: + """ + task_context: dictionary containing pipeline path and a flag whether to include hidden parameters, e.g., + {'pipeline': '/location/of/the/pipeline', 'include_hidden': True} + """ + pipeline_path = task_context.get('pipeline') + if not pipeline_path: + return [] + p = Path(pipeline_path) + if p.is_dir(): + p = p / 'nextflow_schema.json' + if not p.exists(): + raise ValueError(f'No schema found at {pipeline_path}') + + with open(p, "r", encoding="utf-8") as nf_schema_file: + nf_schema = json.load(nf_schema_file) + + include_hidden = task_context.get('include_hidden', False) + return self._extract_parameter_defns_from_section(nf_schema, include_hidden) + + def _extract_parameter_defns_from_section(self, section: Dict[str, Any], include_hidden): + param_defs = [] + if 'properties' in section: + props = section['properties'] + required_fields = set(section.get("required", [])) + for name, attr in props.items(): + is_hidden = attr.get('hidden', False) + if not is_hidden or include_hidden: + param = TaskParameterDefn( + name=name, + flags=[f'--{name}'], + 
required=name in required_fields, + default=attr.get('default'), + help=attr.get('description'), + nargs='+' if attr.get('type') == 'array' else None, + choices=attr.get('enum') + ) + param_defs.append(param) + + for key, value in section.items(): + if isinstance(value, dict): + logger.debug(f'Extract {key} parameters') + param_defs.extend(self._extract_parameter_defns_from_section(value, include_hidden)) + + + return param_defs + + + return [] # !!!!! FIXME \ No newline at end of file