Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
453198b
Issue #276 - Add a new argument to accept a URL for harvesting
Aidajafarbigloo Oct 10, 2024
3a7c9ad
Issue #276 - Harvest metadata from the provided URL
Aidajafarbigloo Oct 10, 2024
153e676
Issue #276 - Store harvested data from URL
Aidajafarbigloo Oct 26, 2024
16cba5d
Issue #276 - Harvest metadata from CFF via path
Aidajafarbigloo Jan 30, 2025
afb8189
Issue #276 - Harvest metadata from CodeMeta via path
Aidajafarbigloo Jan 30, 2025
09401ed
Issue #276 - Refactor functions for harvesting CFF/CodeMeta via path
Aidajafarbigloo Jan 30, 2025
f193cc9
Issue #276 - Revert to original base.py
Aidajafarbigloo Jan 30, 2025
1bab2c7
Issue #276 - Update base.py
Aidajafarbigloo Jan 30, 2025
25eec31
Add functionality to remove temp files
Aidajafarbigloo Feb 7, 2025
98814f4
Remove temp files
Aidajafarbigloo Feb 7, 2025
dd56827
Remove temp files
Aidajafarbigloo Feb 7, 2025
88ad304
Merge branch 'develop' into 'feature/276-harvesting-metadata-from-a-p…
Aidajafarbigloo Feb 12, 2025
5f75ad1
Issue #276 - Add SPDX headers
Aidajafarbigloo Feb 13, 2025
b9e5523
softwarepub#276 - Merge latest changes from develop into feature
Aidajafarbigloo Apr 11, 2025
4d901fc
Update base.py
Aidajafarbigloo Apr 14, 2025
3aa06a0
Fix issues: HERMES user agent and temporary files
Aidajafarbigloo May 14, 2025
1bd4d1f
Fix hermes clean command
Aidajafarbigloo May 14, 2025
3918954
Small fix
Aidajafarbigloo May 17, 2025
f170481
Load token from toml file
Aidajafarbigloo Jun 6, 2025
a894259
Use token
Aidajafarbigloo Jun 6, 2025
14fc040
Small fix
Aidajafarbigloo Jun 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/hermes/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def init_command_parser(self, command_parser: argparse.ArgumentParser) -> None:
def load_settings(self, args: argparse.Namespace):
"""Load settings from the configuration file (passed in from command line)."""
try:
toml_data = toml.load(args.path / args.config)
toml_data = toml.load("." / args.config)
self.root_settings = HermesCommand.settings_class.model_validate(toml_data)
self.settings = getattr(self.root_settings, self.command_name)
except FileNotFoundError as e:
Expand Down
2 changes: 2 additions & 0 deletions src/hermes/commands/clean/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import argparse
import shutil
import logging

from pydantic import BaseModel

Expand All @@ -27,6 +28,7 @@ def __call__(self, args: argparse.Namespace) -> None:
self.log.info("Removing HERMES caches...")

# Naive implementation for now... check errors, validate directory, don't construct the path ourselves, etc.
logging.shutdown()
shutil.rmtree(args.path / '.hermes')

def load_settings(self, args: argparse.Namespace):
Expand Down
54 changes: 37 additions & 17 deletions src/hermes/commands/harvest/cff.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from hermes.model.context import ContextPath
from hermes.model.errors import HermesValidationError
from hermes.commands.harvest.base import HermesHarvestPlugin, HermesHarvestCommand
from hermes.commands.harvest.util.remote_harvesting import normalize_url, fetch_metadata_from_repo
from hermes.commands.harvest.util.token import load_token_from_toml


# TODO: should this be configurable via a CLI option?
Expand All @@ -36,15 +38,24 @@ class CffHarvestPlugin(HermesHarvestPlugin):
settings_class = CffHarvestSettings

def __call__(self, command: HermesHarvestCommand) -> t.Tuple[t.Dict, t.Dict]:

self.token = load_token_from_toml('hermes.toml')

# Get source files
cff_file = self._get_single_cff(command.args.path)

cff_file, temp_dir_obj = self._get_single_cff(command.args.path)

if not cff_file:
raise HermesValidationError(f'{command.args.path} contains either no or more than 1 CITATION.cff file. '
'Aborting harvesting for this metadata source.')

# Read the content
cff_data = cff_file.read_text()
cff_data = cff_file.read_text(encoding='utf-8')

# clean up the temp
if temp_dir_obj:
temp_dir_obj.cleanup()

# Validate the content to be correct CFF
cff_dict = self._load_cff_from_file(cff_data)

Expand Down Expand Up @@ -109,18 +120,27 @@ def _validate(self, cff_file: pathlib.Path, cff_dict: t.Dict) -> bool:
return True

def _get_single_cff(self, path: pathlib.Path) -> t.Optional[pathlib.Path]:
# Find CFF files in directories and subdirectories
cff_file = path / 'CITATION.cff'
if cff_file.exists():
return cff_file

# TODO: Do we really want to search recursive? CFF convention is the file should be at the topmost dir,
# which is given via the --path arg. Maybe add another option to enable pointing to a single file?
# (So this stays "convention over configuration")
files = list(path.rglob('**/CITATION.cff'))
if len(files) == 1:
return pathlib.Path(files[0])
# TODO: Shouldn't we log/echo the found CFF files so a user can debug/cleanup?
# TODO: Do we want to hand down a logging instance via Hermes context or just encourage
# peeps to use the Click context?
return None
if str(path).startswith("http:") or str(path).startswith("https:"):
# Find CFF files from the provided URL repository
normalized_url = normalize_url(str(path))
file_info = fetch_metadata_from_repo(normalized_url, "CITATION.cff", token=self.token)
if not file_info:
return {}
else:
return file_info
else:
# Find CFF files in directories and subdirectories
cff_file = path / 'CITATION.cff'
if cff_file.exists():
return cff_file, None

# TODO: Do we really want to search recursive? CFF convention is the file should be at the topmost dir,
# which is given via the --path arg. Maybe add another option to enable pointing to a single file?
# (So this stays "convention over configuration")
files = list(path.rglob('**/CITATION.cff'))
if len(files) == 1:
return pathlib.Path(files[0]), None
# TODO: Shouldn't we log/echo the found CFF files so a user can debug/cleanup?
# TODO: Do we want to hand down a logging instance via Hermes context or just encourage
# peeps to use the Click context?
return None, None
42 changes: 29 additions & 13 deletions src/hermes/commands/harvest/codemeta.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
from hermes.commands.harvest.base import HermesHarvestCommand, HermesHarvestPlugin
from hermes.commands.harvest.util.validate_codemeta import validate_codemeta
from hermes.model.errors import HermesValidationError

from hermes.commands.harvest.util.remote_harvesting import normalize_url, fetch_metadata_from_repo
from hermes.commands.harvest.util.token import load_token_from_toml

class CodeMetaHarvestPlugin(HermesHarvestPlugin):
def __call__(self, command: HermesHarvestCommand) -> t.Tuple[t.Dict, t.Dict]:

self.token = load_token_from_toml('hermes.toml')

"""
Implementation of a harvester that provides data from a codemeta.json file format.

Expand All @@ -25,19 +29,22 @@ def __call__(self, command: HermesHarvestCommand) -> t.Tuple[t.Dict, t.Dict]:
:param ctx: The harvesting context that should contain the provided metadata.
"""
# Get source files
codemeta_file = self._get_single_codemeta(command.args.path)
codemeta_file, temp_dir_obj = self._get_single_codemeta(command.args.path)
if not codemeta_file:
raise HermesValidationError(
f"{command.args.path} contains either no or more than 1 codemeta.json file. Aborting harvesting "
f"for this metadata source."
)

# Read the content
codemeta_str = codemeta_file.read_text()
codemeta_str = codemeta_file.read_text(encoding='utf-8')

if not self._validate(codemeta_file):
raise HermesValidationError(codemeta_file)

if temp_dir_obj:
temp_dir_obj.cleanup()

codemeta = json.loads(codemeta_str)
return codemeta, {'local_path': str(codemeta_file)}

Expand All @@ -56,13 +63,22 @@ def _validate(self, codemeta_file: pathlib.Path) -> bool:
return True

def _get_single_codemeta(self, path: pathlib.Path) -> t.Optional[pathlib.Path]:
# Find CodeMeta files in directories and subdirectories
# TODO: Do we really want to search recursive? Maybe add another option to enable pointing to a single file?
# (So this stays "convention over configuration")
files = glob.glob(str(path / "**" / "codemeta.json"), recursive=True)
if len(files) == 1:
return pathlib.Path(files[0])
# TODO: Shouldn't we log/echo the found CFF files so a user can debug/cleanup?
# TODO: Do we want to hand down a logging instance via Hermes context or just encourage
# peeps to use the Click context?
return None
if str(path).startswith("http:") or str(path).startswith("https:"):
# Find CodeMeta files from the provided URL repository
normalized_url = normalize_url(str(path))
file_info = fetch_metadata_from_repo(normalized_url, "codemeta.json", token=self.token)
if not file_info:
return None, None
else:
return file_info
else:
# Find CodeMeta files in directories and subdirectories
# TODO: Do we really want to search recursive? Maybe add another option to enable pointing to a single file?
# (So this stays "convention over configuration")
files = glob.glob(str(path / "**" / "codemeta.json"), recursive=True)
if len(files) == 1:
return pathlib.Path(files[0]), None
# TODO: Shouldn't we log/echo the found CFF files so a user can debug/cleanup?
# TODO: Do we want to hand down a logging instance via Hermes context or just encourage
# peeps to use the Click context?
return None, None
157 changes: 157 additions & 0 deletions src/hermes/commands/harvest/util/remote_harvesting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# SPDX-FileCopyrightText: 2025 OFFIS e.V.
#
# SPDX-License-Identifier: Apache-2.0

# SPDX-FileContributor: Stephan Ferenz
# SPDX-FileContributor: Aida Jafarbigloo

import pathlib
import re
import tempfile
import typing as t
from urllib.parse import quote, urlparse

import requests

from hermes.utils import hermes_user_agent


def normalize_url(path: str) -> str:
    """Normalize a repository URL.

    Converts Windows-style backslashes to forward slashes and repairs a
    scheme whose double slash was collapsed (``https:/host`` ->
    ``https://host``), without mangling already-correct URLs.

    :param path: The (possibly malformed) URL string.
    :return: The normalized URL.
    """
    corrected_url = path.replace("\\", "/")
    # Collapse any run of slashes after the scheme to exactly two.
    # The previous naive `replace("https:/", "https://")` turned a valid
    # "https://host" prefix into "https:///host" and ignored plain http.
    return re.sub(r"^(https?):/+", r"\1://", corrected_url)


def fetch_metadata_from_repo(repo_url: str, filename: str, token: str = None) -> t.Optional[t.Tuple[pathlib.Path, tempfile.TemporaryDirectory]]:
    """
    Fetch a metadata file (e.g., CITATION.cff or codemeta.json) from a GitHub or GitLab repository.

    :param repo_url: The repository URL.
    :param filename: The name of the metadata file to fetch.
    :param token: (Optional) Access token for authentication (GitHub token or GitLab private token).
    :return: A tuple containing:
        - Path to the downloaded metadata file.
        - TemporaryDirectory object (caller is responsible for cleanup).
        Returns None if the file could not be fetched.
    """
    temp_dir_obj = None
    try:
        session = requests.Session()
        session.headers.update({"User-Agent": hermes_user_agent})
        if token:
            # GitHub and GitLab use different authentication headers.
            if "github" in repo_url:
                session.headers.update({"Authorization": f"token {token}"})
            elif "gitlab" in repo_url:
                session.headers.update({"PRIVATE-TOKEN": token})

        temp_dir_obj = tempfile.TemporaryDirectory()
        temp_dir = pathlib.Path(temp_dir_obj.name)

        parsed_url = urlparse(repo_url)

        if "github.com" in repo_url:
            # GitHub API: list the repository's top-level contents and look
            # for the requested file by exact name.
            api_url = repo_url.replace("github.com", "api.github.com/repos").rstrip("/") + "/contents"
            response = session.get(api_url)
            if response.status_code == 200:
                for file_info in response.json():
                    if file_info["name"] == filename:
                        temp_file = _download_to_tempfile(file_info["download_url"], filename, temp_dir, session)
                        if temp_file:
                            return temp_file, temp_dir_obj
                        break
        elif "gitlab" in parsed_url.netloc:
            # GitLab API (handles self-hosted instances via the parsed netloc).
            temp_file, _ = _fetch_from_gitlab(parsed_url, filename, temp_dir, session)
            if temp_file:
                return temp_file, temp_dir_obj
        else:
            print(f"Unsupported repository URL: {repo_url}")

        # Bug fix: nothing was fetched on this path, so release the temporary
        # directory instead of leaking it (previously only the "unsupported"
        # branch cleaned up).
        temp_dir_obj.cleanup()
        return None

    except Exception as e:
        print(f"Error fetching metadata from repository: {e}")
        if temp_dir_obj is not None:
            temp_dir_obj.cleanup()
        return None


def _fetch_from_gitlab(parsed_url, filename, temp_dir, session):
    """
    Helper function to fetch a file from GitLab.

    :param parsed_url: ``urllib.parse.ParseResult`` of the repository URL.
    :param filename: Name of the metadata file to locate (matched case-insensitively).
    :param temp_dir: Directory the file is downloaded into.
    :param session: Pre-configured ``requests.Session`` (user agent / token headers).
    :return: Tuple of (downloaded file path, temp_dir), or (None, None) on failure.
    """
    base_domain = parsed_url.netloc
    project_path = parsed_url.path.lstrip('/')
    # The project path must be URL-encoded as a single path segment for the API.
    encoded_project_path = quote(project_path, safe='')

    # Step 1: Detect default branch
    project_api_url = f"https://{base_domain}/api/v4/projects/{encoded_project_path}"
    project_resp = session.get(project_api_url)
    if project_resp.status_code != 200:
        print(f"Failed to fetch project info: {project_resp.status_code}")
        return None, None

    project_info = project_resp.json()
    default_branch = project_info.get('default_branch', 'main')  # fallback to 'main' if not found

    # Step 2: Search for the file recursively (the tree endpoint is paginated)
    page = 1
    per_page = 100
    found_file = None

    while True:
        api_url = (
            f"https://{base_domain}/api/v4/projects/{encoded_project_path}/repository/tree"
            f"?recursive=true&per_page={per_page}&page={page}"
        )
        response = session.get(api_url)
        if response.status_code != 200:
            print(f"Failed to fetch repo tree: {response.status_code}")
            break

        files_list = response.json()
        if not files_list:
            # An empty page means we have walked the whole tree.
            break

        for file_info in files_list:
            if file_info.get("type") == "blob" and file_info.get("name", "").lower() == filename.lower():
                found_file = file_info
                break

        if found_file:
            break

        page += 1

    # Step 3: Download the file
    if found_file:
        file_path_in_repo = found_file["path"]
        file_url = (
            f"https://{base_domain}/api/v4/projects/"
            f"{encoded_project_path}/repository/files/"
            f"{quote(file_path_in_repo, safe='')}/raw?ref={default_branch}"
        )
        temp_file = _download_to_tempfile(file_url, filename, temp_dir, session)
        if temp_file:
            print(f"Downloaded file: {temp_file}")
            return temp_file, temp_dir

    # Bug fix: this message previously printed the literal "(unknown)"
    # instead of the file name that was searched for.
    print(f"{filename} not found in repository.")
    return None, None



def _download_to_tempfile(url: str, filename: str, temp_dir: pathlib.Path, session: requests.Session) -> pathlib.Path:
    """
    Download *url* into ``temp_dir / filename``.

    UTF-8 decodable content is written as text; anything else is written
    as raw bytes so the payload is preserved unmodified.

    :param url: Direct download URL of the file.
    :param filename: Name to store the file under inside *temp_dir*.
    :param temp_dir: Directory to write the file into.
    :param session: Pre-configured ``requests.Session``.
    :return: Path to the written file, or None if the download failed.
    """
    try:
        response = session.get(url)
        if response.status_code == 200:
            file_path = temp_dir / filename

            try:
                text = response.content.decode('utf-8')
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(text)
            except UnicodeDecodeError:
                # Not valid UTF-8 -- fall back to a binary copy.
                with open(file_path, 'wb') as f:
                    f.write(response.content)
            return pathlib.Path(file_path)
        else:
            # Bug fix: these messages previously printed the literal
            # "(unknown)" instead of identifying what failed to download.
            print(f"Failed to download {filename}: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error downloading {filename} from {url}: {e}")
        return None
29 changes: 29 additions & 0 deletions src/hermes/commands/harvest/util/token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# SPDX-FileCopyrightText: 2025 OFFIS e.V.
#
# SPDX-License-Identifier: Apache-2.0

# SPDX-FileContributor: Stephan Ferenz
# SPDX-FileContributor: Aida Jafarbigloo

import toml
import base64


def load_token_from_toml(config_path: str = "hermes.toml") -> str:
    """
    Loads and decodes the base64-encoded token from the HERMES TOML configuration file.

    Args:
        config_path (str): Path to the TOML config file.

    Returns:
        str: The decoded token, or None if the config file does not exist
        or contains no ``harvest.token`` entry.
    """
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = toml.load(f)
    except FileNotFoundError:
        # Bug fix: a missing config file simply means no token is configured;
        # harvesting plugins call this unconditionally, so don't crash here.
        return None

    encoded_token = config.get('harvest', {}).get('token')
    if encoded_token:
        # Tokens are stored base64-encoded in the config (light obfuscation,
        # not encryption).
        return base64.b64decode(encoded_token.encode()).decode()
    else:
        return None