diff --git a/src/hermes/commands/base.py b/src/hermes/commands/base.py
index 3ae9030b..5a242a76 100644
--- a/src/hermes/commands/base.py
+++ b/src/hermes/commands/base.py
@@ -132,7 +132,7 @@ def init_command_parser(self, command_parser: argparse.ArgumentParser) -> None:
     def load_settings(self, args: argparse.Namespace):
         """Load settings from the configuration file (passed in from command line)."""
         try:
-            toml_data = toml.load(args.path / args.config)
+            toml_data = toml.load("." / args.config)
             self.root_settings = HermesCommand.settings_class.model_validate(toml_data)
             self.settings = getattr(self.root_settings, self.command_name)
         except FileNotFoundError as e:
diff --git a/src/hermes/commands/clean/base.py b/src/hermes/commands/clean/base.py
index b588faf5..4e5e4ea2 100644
--- a/src/hermes/commands/clean/base.py
+++ b/src/hermes/commands/clean/base.py
@@ -6,6 +6,7 @@
 import argparse
 import shutil
+import logging
 
 from pydantic import BaseModel
 
@@ -27,6 +28,7 @@ def __call__(self, args: argparse.Namespace) -> None:
         self.log.info("Removing HERMES caches...")
 
         # Naive implementation for now... check errors, validate directory, don't construct the path ourselves, etc.
+        logging.shutdown()
         shutil.rmtree(args.path / '.hermes')
 
     def load_settings(self, args: argparse.Namespace):
diff --git a/src/hermes/commands/harvest/cff.py b/src/hermes/commands/harvest/cff.py
index e333b27c..57fcf19f 100644
--- a/src/hermes/commands/harvest/cff.py
+++ b/src/hermes/commands/harvest/cff.py
@@ -19,6 +19,8 @@
 from hermes.model.context import ContextPath
 from hermes.model.errors import HermesValidationError
 from hermes.commands.harvest.base import HermesHarvestPlugin, HermesHarvestCommand
+from hermes.commands.harvest.util.remote_harvesting import normalize_url, fetch_metadata_from_repo
+from hermes.commands.harvest.util.token import load_token_from_toml
 
 
 # TODO: should this be configurable via a CLI option?
@@ -36,15 +38,24 @@ class CffHarvestPlugin(HermesHarvestPlugin):
     settings_class = CffHarvestSettings
 
     def __call__(self, command: HermesHarvestCommand) -> t.Tuple[t.Dict, t.Dict]:
+        self.token = load_token_from_toml('hermes.toml')
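+        # The token is optional: fetch_metadata_from_repo() only sends
+        # authentication headers when one is configured, so public
+        # repositories can still be harvested without it.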
+
         # Get source files
-        cff_file = self._get_single_cff(command.args.path)
+        cff_file, temp_dir_obj = self._get_single_cff(command.args.path)
         if not cff_file:
             raise HermesValidationError(f'{command.args.path} contains either no or more than 1 CITATION.cff file. '
                                         'Aborting harvesting for this metadata source.')
 
         # Read the content
-        cff_data = cff_file.read_text()
+        cff_data = cff_file.read_text(encoding='utf-8')
+
+        # Clean up the temporary directory created for remote harvesting (if any)
+        if temp_dir_obj:
+            temp_dir_obj.cleanup()
 
         # Validate the content to be correct CFF
         cff_dict = self._load_cff_from_file(cff_data)
@@ -109,18 +120,27 @@ def _validate(self, cff_file: pathlib.Path, cff_dict: t.Dict) -> bool:
 
         return True
 
     def _get_single_cff(self, path: pathlib.Path) -> t.Optional[pathlib.Path]:
-        # Find CFF files in directories and subdirectories
-        cff_file = path / 'CITATION.cff'
-        if cff_file.exists():
-            return cff_file
-
-        # TODO: Do we really want to search recursive? CFF convention is the file should be at the topmost dir,
-        #       which is given via the --path arg. Maybe add another option to enable pointing to a single file?
-        #       (So this stays "convention over configuration")
-        files = list(path.rglob('**/CITATION.cff'))
-        if len(files) == 1:
-            return pathlib.Path(files[0])
-        # TODO: Shouldn't we log/echo the found CFF files so a user can debug/cleanup?
-        # TODO: Do we want to hand down a logging instance via Hermes context or just encourage
-        #       peeps to use the Click context?
-        return None
+        if str(path).startswith("http:") or str(path).startswith("https:"):
+            # Fetch the CITATION.cff file from the repository behind the given URL
+            normalized_url = normalize_url(str(path))
+            file_info = fetch_metadata_from_repo(normalized_url, "CITATION.cff", token=self.token)
+            if not file_info:
+                return None, None
+            return file_info
+        else:
+            # Find CFF files in directories and subdirectories
+            cff_file = path / 'CITATION.cff'
+            if cff_file.exists():
+                return cff_file, None
+
+            # TODO: Do we really want to search recursive? CFF convention is the file should be at the topmost dir,
+            #       which is given via the --path arg. Maybe add another option to enable pointing to a single file?
+            #       (So this stays "convention over configuration")
+            files = list(path.rglob('**/CITATION.cff'))
+            if len(files) == 1:
+                return pathlib.Path(files[0]), None
+            # TODO: Shouldn't we log/echo the found CFF files so a user can debug/cleanup?
+            # TODO: Do we want to hand down a logging instance via Hermes context or just encourage
+            #       peeps to use the Click context?
+            return None, None
\ No newline at end of file
diff --git a/src/hermes/commands/harvest/codemeta.py b/src/hermes/commands/harvest/codemeta.py
index b75bb002..56db9b55 100644
--- a/src/hermes/commands/harvest/codemeta.py
+++ b/src/hermes/commands/harvest/codemeta.py
@@ -13,10 +13,14 @@
 from hermes.commands.harvest.base import HermesHarvestCommand, HermesHarvestPlugin
 from hermes.commands.harvest.util.validate_codemeta import validate_codemeta
 from hermes.model.errors import HermesValidationError
-
+from hermes.commands.harvest.util.remote_harvesting import normalize_url, fetch_metadata_from_repo
+from hermes.commands.harvest.util.token import load_token_from_toml
 
 class CodeMetaHarvestPlugin(HermesHarvestPlugin):
     def __call__(self, command: HermesHarvestCommand) -> t.Tuple[t.Dict, t.Dict]:
         """
         Implementation of a harvester that provides data from a codemeta.json file format.
 
@@ -25,7 +29,7 @@ def __call__(self, command: HermesHarvestCommand) -> t.Tuple[t.Dict, t.Dict]:
         :param ctx: The harvesting context that should contain the provided metadata.
         """
+        self.token = load_token_from_toml('hermes.toml')
+
         # Get source files
-        codemeta_file = self._get_single_codemeta(command.args.path)
+        codemeta_file, temp_dir_obj = self._get_single_codemeta(command.args.path)
         if not codemeta_file:
             raise HermesValidationError(
                 f"{command.args.path} contains either no or more than 1 codemeta.json file. Aborting harvesting "
                 "for this file."
             )
 
         # Read the content
-        codemeta_str = codemeta_file.read_text()
+        codemeta_str = codemeta_file.read_text(encoding='utf-8')
 
         if not self._validate(codemeta_file):
             raise HermesValidationError(codemeta_file)
 
+        # Clean up the temporary directory created for remote harvesting (if any)
+        if temp_dir_obj:
+            temp_dir_obj.cleanup()
+
         codemeta = json.loads(codemeta_str)
 
         return codemeta, {'local_path': str(codemeta_file)}
@@ -56,13 +63,22 @@
         return True
 
     def _get_single_codemeta(self, path: pathlib.Path) -> t.Optional[pathlib.Path]:
-        # Find CodeMeta files in directories and subdirectories
-        # TODO: Do we really want to search recursive? Maybe add another option to enable pointing to a single file?
-        #       (So this stays "convention over configuration")
-        files = glob.glob(str(path / "**" / "codemeta.json"), recursive=True)
-        if len(files) == 1:
-            return pathlib.Path(files[0])
-        # TODO: Shouldn't we log/echo the found CFF files so a user can debug/cleanup?
-        # TODO: Do we want to hand down a logging instance via Hermes context or just encourage
-        #       peeps to use the Click context?
-        return None
+        if str(path).startswith("http:") or str(path).startswith("https:"):
+            # Fetch the codemeta.json file from the repository behind the given URL
+            normalized_url = normalize_url(str(path))
+            file_info = fetch_metadata_from_repo(normalized_url, "codemeta.json", token=self.token)
+            if not file_info:
+                return None, None
+            return file_info
+        else:
+            # Find CodeMeta files in directories and subdirectories
+            # TODO: Do we really want to search recursive? Maybe add another option to enable pointing to a single file?
+            #       (So this stays "convention over configuration")
+            files = glob.glob(str(path / "**" / "codemeta.json"), recursive=True)
+            if len(files) == 1:
+                return pathlib.Path(files[0]), None
+            # TODO: Shouldn't we log/echo the found codemeta.json files so a user can debug/cleanup?
+            # TODO: Do we want to hand down a logging instance via Hermes context or just encourage
+            #       peeps to use the Click context?
+            return None, None
\ No newline at end of file
diff --git a/src/hermes/commands/harvest/util/remote_harvesting.py b/src/hermes/commands/harvest/util/remote_harvesting.py
new file mode 100644
index 00000000..114088ed
--- /dev/null
+++ b/src/hermes/commands/harvest/util/remote_harvesting.py
@@ -0,0 +1,157 @@
+# SPDX-FileCopyrightText: 2025 OFFIS e.V.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+# SPDX-FileContributor: Stephan Ferenz
+# SPDX-FileContributor: Aida Jafarbigloo
+
+import pathlib
+import re
+import requests
+import tempfile
+import typing as t
+from urllib.parse import urlparse, quote
+
+from hermes.utils import hermes_user_agent
+
+
+def normalize_url(path: str) -> str:
+    """Normalize a given URL by replacing backslashes and repairing a malformed scheme."""
+    corrected_url = path.replace("\\", "/")
+    # Only repair a single-slash scheme ("https:/host"); a well-formed
+    # "https://host" must be left untouched.
+    return re.sub(r"^(https?:)/(?!/)", r"\1//", corrected_url)
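+
+
+# Illustration (hypothetical inputs) of what normalize_url() repairs:
+#   normalize_url("https:\github.com\owner\repo")  -> "https://github.com/owner/repo"
+#   normalize_url("https://github.com/owner/repo") -> unchanged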
+ """ + try: + session = requests.Session() + session.headers.update({"User-Agent": hermes_user_agent}) + if token: + if "github" in repo_url: + session.headers.update({"Authorization": f"token {token}"}) + elif "gitlab" in repo_url: + session.headers.update({"PRIVATE-TOKEN": token}) + + temp_dir_obj = tempfile.TemporaryDirectory() + temp_dir = pathlib.Path(temp_dir_obj.name) + + parsed_url = urlparse(repo_url) + + if "github.com" in repo_url: + # GitHub API: List repository contents + api_url = repo_url.replace("github.com", "api.github.com/repos").rstrip("/") + "/contents" + response = session.get(api_url) + if response.status_code == 200: + for file_info in response.json(): + if file_info["name"] == filename: + temp_file = _download_to_tempfile(file_info["download_url"], filename, temp_dir, session) + return temp_file, temp_dir_obj + elif "gitlab" in parsed_url.netloc: + # GitLab API + temp_file, temp_dir = _fetch_from_gitlab(parsed_url, filename, temp_dir, session) + if temp_file: + return temp_file, temp_dir_obj + else: + print(f"Unsupported repository URL: {repo_url}") + temp_dir_obj.cleanup() + return None + + except Exception as e: + print(f"Error fetching metadata from repository: {e}") + return None + + +def _fetch_from_gitlab(parsed_url, filename, temp_dir, session): + """ + Helper function to fetch a file from GitLab. + """ + base_domain = parsed_url.netloc + project_path = parsed_url.path.lstrip('/') + encoded_project_path = quote(project_path, safe='') + + # Step 1: Detect default branch + project_api_url = f"https://{base_domain}/api/v4/projects/{encoded_project_path}" + project_resp = session.get(project_api_url) + if project_resp.status_code != 200: + print(f"Failed to fetch project info: {project_resp.status_code}") + return None, None + + project_info = project_resp.json() + default_branch = project_info.get('default_branch', 'main') # fallback to 'main' if not found + + # Step 2: Search for the file recursively + page = 1 + per_page = 100 + found_file = None + + while True: + api_url = ( + f"https://{base_domain}/api/v4/projects/{encoded_project_path}/repository/tree" + f"?recursive=true&per_page={per_page}&page={page}" + ) + response = session.get(api_url) + if response.status_code != 200: + print(f"Failed to fetch repo tree: {response.status_code}") + break + + files_list = response.json() + if not files_list: + break + + for file_info in files_list: + if file_info.get("type") == "blob" and file_info.get("name", "").lower() == filename.lower(): + found_file = file_info + break + + if found_file: + break + + page += 1 + + # Step 3: Download the file + if found_file: + file_path_in_repo = found_file["path"] + file_url = ( + f"https://{base_domain}/api/v4/projects/" + f"{encoded_project_path}/repository/files/" + f"{quote(file_path_in_repo, safe='')}/raw?ref={default_branch}" + ) + temp_file = _download_to_tempfile(file_url, filename, temp_dir, session) + if temp_file: + print(f"Downloaded file: {temp_file}") + return temp_file, temp_dir + + print(f"{filename} not found in repository.") + return None, None + + + +def _download_to_tempfile(url: str, filename: str, temp_dir: pathlib.Path, session: requests.Session) -> pathlib.Path: + try: + response = session.get(url) + if response.status_code == 200: + file_path = temp_dir / filename + + try: + text = response.content.decode('utf-8') + with open(file_path, 'w', encoding='utf-8') as f: + f.write(text) + except UnicodeDecodeError: + with open(file_path, 'wb') as f: + f.write(response.content) + return 
+
+
+def _fetch_from_gitlab(parsed_url, filename, temp_dir, session):
+    """
+    Helper function to fetch a file from GitLab.
+    """
+    base_domain = parsed_url.netloc
+    project_path = parsed_url.path.lstrip('/')
+    encoded_project_path = quote(project_path, safe='')
+
+    # Step 1: Detect the default branch
+    project_api_url = f"https://{base_domain}/api/v4/projects/{encoded_project_path}"
+    project_resp = session.get(project_api_url)
+    if project_resp.status_code != 200:
+        print(f"Failed to fetch project info: {project_resp.status_code}")
+        return None, None
+
+    project_info = project_resp.json()
+    default_branch = project_info.get('default_branch', 'main')  # fall back to 'main' if not found
+
+    # Step 2: Search the repository tree recursively for the file
+    page = 1
+    per_page = 100
+    found_file = None
+
+    while True:
+        api_url = (
+            f"https://{base_domain}/api/v4/projects/{encoded_project_path}/repository/tree"
+            f"?recursive=true&per_page={per_page}&page={page}"
+        )
+        response = session.get(api_url)
+        if response.status_code != 200:
+            print(f"Failed to fetch repo tree: {response.status_code}")
+            break
+
+        files_list = response.json()
+        if not files_list:
+            break
+
+        for file_info in files_list:
+            if file_info.get("type") == "blob" and file_info.get("name", "").lower() == filename.lower():
+                found_file = file_info
+                break
+
+        if found_file:
+            break
+
+        page += 1
+
+    # Step 3: Download the file
+    if found_file:
+        file_path_in_repo = found_file["path"]
+        file_url = (
+            f"https://{base_domain}/api/v4/projects/"
+            f"{encoded_project_path}/repository/files/"
+            f"{quote(file_path_in_repo, safe='')}/raw?ref={default_branch}"
+        )
+        temp_file = _download_to_tempfile(file_url, filename, temp_dir, session)
+        if temp_file:
+            print(f"Downloaded file: {temp_file}")
+            return temp_file, temp_dir
+
+    print(f"{filename} not found in repository.")
+    return None, None
+
+
+def _download_to_tempfile(url: str, filename: str, temp_dir: pathlib.Path,
+                          session: requests.Session) -> t.Optional[pathlib.Path]:
+    try:
+        response = session.get(url)
+        if response.status_code == 200:
+            file_path = temp_dir / filename
+
+            # Store text content re-encoded as UTF-8; fall back to raw
+            # bytes if the payload is not valid UTF-8.
+            try:
+                text = response.content.decode('utf-8')
+                with open(file_path, 'w', encoding='utf-8') as f:
+                    f.write(text)
+            except UnicodeDecodeError:
+                with open(file_path, 'wb') as f:
+                    f.write(response.content)
+            return file_path
+        else:
+            print(f"Failed to download {filename}: {response.status_code}")
+            return None
+    except Exception as e:
+        print(f"Error downloading {filename}: {e}")
+        return None
diff --git a/src/hermes/commands/harvest/util/token.py b/src/hermes/commands/harvest/util/token.py
new file mode 100644
index 00000000..a3539d4f
--- /dev/null
+++ b/src/hermes/commands/harvest/util/token.py
@@ -0,0 +1,29 @@
+# SPDX-FileCopyrightText: 2025 OFFIS e.V.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+# SPDX-FileContributor: Stephan Ferenz
+# SPDX-FileContributor: Aida Jafarbigloo
+
+import base64
+import typing as t
+
+import toml
+
+
+def load_token_from_toml(config_path: str = "hermes.toml") -> t.Optional[str]:
+    """
+    Loads and decodes the token from the HERMES TOML configuration file.
+
+    Args:
+        config_path (str): Path to the TOML config file.
+
+    Returns:
+        str: The decoded token, or None if no token is configured.
+    """
+    with open(config_path, "r", encoding="utf-8") as f:
+        config = toml.load(f)
+
+    encoded_token = config.get('harvest', {}).get('token')
+    if encoded_token:
+        return base64.b64decode(encoded_token.encode()).decode()
+    return None
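+
+
+# Illustrative hermes.toml snippet this helper expects (hypothetical value;
+# "Z2hwX2V4YW1wbGU=" is the base64 encoding of the made-up token "ghp_example"):
+#
+#   [harvest]
+#   token = "Z2hwX2V4YW1wbGU="
+#
+# load_token_from_toml("hermes.toml") would then return "ghp_example".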