diff --git a/CHANGELOG.rst b/CHANGELOG.rst index eeb62779..94c1d735 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -30,6 +30,9 @@ Changed - ``parse_optionals_as_positionals`` and ``applied_instantiation_links`` no longer marked as experimental (`#788 `__). +- Moved location of ``Path`` class from ``jsonargparse`` to + ``jsonargparse.typing`` (`#792 + `__). v4.42.0 (2025-10-14) diff --git a/jsonargparse/__init__.py b/jsonargparse/__init__.py index cab69aea..53ad4f05 100644 --- a/jsonargparse/__init__.py +++ b/jsonargparse/__init__.py @@ -20,6 +20,7 @@ from ._loaders_dumpers import * # noqa: F403 from ._namespace import * # noqa: F403 from ._optionals import * # noqa: F403 +from ._paths import Path # noqa: F401 from ._signatures import * # noqa: F403 from ._typehints import * # noqa: F403 from ._util import * # noqa: F403 diff --git a/jsonargparse/_actions.py b/jsonargparse/_actions.py index 3b1a975b..84e8ed63 100644 --- a/jsonargparse/_actions.py +++ b/jsonargparse/_actions.py @@ -13,11 +13,11 @@ from ._loaders_dumpers import get_loader_exceptions, load_value from ._namespace import Namespace, NSKeyError, split_key, split_key_root from ._optionals import _get_config_read_mode, ruamel_support +from ._paths import change_to_path_dir from ._type_checking import ActionsContainer, ArgumentParser from ._util import ( Path, argument_error, - change_to_path_dir, default_config_option_help, get_import_path, import_object, diff --git a/jsonargparse/_core.py b/jsonargparse/_core.py index 68182f8a..41d9d73c 100644 --- a/jsonargparse/_core.py +++ b/jsonargparse/_core.py @@ -85,13 +85,13 @@ pyyaml_available, ) from ._parameter_resolvers import UnknownDefault +from ._paths import change_to_path_dir from ._signatures import SignatureArguments from ._typehints import ActionTypeHint, is_subclass_spec from ._util import ( ClassType, Path, argument_error, - change_to_path_dir, get_argument_group_class, get_private_kwargs, identity, diff --git a/jsonargparse/_paths.py b/jsonargparse/_paths.py new file mode 100644 index 00000000..77e37c22 --- /dev/null +++ b/jsonargparse/_paths.py @@ -0,0 +1,385 @@ +import os +import re +import stat +import sys +from collections import Counter +from contextlib import contextmanager +from contextvars import ContextVar +from dataclasses import dataclass +from io import StringIO +from typing import IO, Any, Iterator, Optional, Union + +from ._deprecated import PathDeprecations +from ._optionals import ( + fsspec_support, + import_fsspec, + import_requests, + url_support, +) + +_current_path_dir: ContextVar[Optional[str]] = ContextVar("_current_path_dir", default=None) + + +class _CachedStdin(StringIO): + """Used to allow reading sys.stdin multiple times.""" + + +def _get_cached_stdin() -> _CachedStdin: + if not isinstance(sys.stdin, _CachedStdin): + sys.stdin = _CachedStdin(sys.stdin.read()) + return sys.stdin + + +def _read_cached_stdin() -> str: + stdin = _get_cached_stdin() + value = stdin.read() + stdin.seek(0) + return value + + +@dataclass +class _UrlData: + scheme: str + url_path: str + + +def _parse_url(url: str) -> Optional[_UrlData]: + index = url.rfind("://") + if index <= 0: + return None + return _UrlData( + scheme=url[: index + 3], + url_path=url[index + 3 :], + ) + + +def _is_absolute_path(path: str) -> bool: + if path.find("://") > 0: + return True + return os.path.isabs(path) + + +def _resolve_relative_path(path: str) -> str: + parts = path.split("/") + resolved: list[str] = [] + for part in parts: + if part == "..": + resolved.pop() + elif part != ".": + resolved.append(part) + return "/".join(resolved) + + +def _known_to_fsspec(path: str) -> bool: + import_fsspec("_known_to_fsspec") + from fsspec.registry import known_implementations + + for protocol in known_implementations: + if path.startswith(protocol + "://") or path.startswith(protocol + "::"): + return True + return False + + +class PathError(TypeError): + """Exception raised for errors in the Path class.""" + + +class Path(PathDeprecations): + """Stores a (possibly relative) path and the corresponding absolute path. + + The absolute path can be obtained without having to remember the working + directory (or parent remote path) from when the object was created. + + When a Path instance is created, it is checked that: the path exists, + whether it is a file or directory and whether it has the required access + permissions (f=file, d=directory, r=readable, w=writeable, x=executable, + c=creatable, u=url, s=fsspec or in uppercase meaning not, i.e., F=not-file, + D=not-directory, R=not-readable, W=not-writeable and X=not-executable). + + The creatable flag "c" can be given one or two times. If give once, the + parent directory must exist and be writeable. If given twice, the parent + directory does not have to exist, but should be allowed to create. + + An instance of Path class can also refer to the standard input or output. + To do that, path must be set with the value "-"; it is a common practice. + Then, getting the content or opening it will automatically be done on + standard input or output. + """ + + _url_data: Optional[_UrlData] + _file_scheme = re.compile("^file:///?") + + def __init__( + self, + path: Union[str, os.PathLike, "Path"], + mode: str = "fr", + cwd: Optional[Union[str, os.PathLike]] = None, + **kwargs, + ): + """Initializer for Path instance. + + Args: + path: The path to check and store. + mode: The required type and access permissions among [fdrwxcuFDRWX]. + cwd: Working directory for relative paths. If None, os.getcwd() is used. + + Raises: + ValueError: If the provided mode is invalid. + PathError: If the path does not exist or does not agree with the mode. + """ + self._deprecated_kwargs(kwargs) + self._check_mode(mode) + self._std_io = False + + is_url = False + is_fsspec = False + if isinstance(path, Path): + self._std_io = path._std_io + is_url = path.is_url + is_fsspec = path.is_fsspec + url_data = path._url_data + cwd = path.cwd + abs_path = path.absolute + path = path.relative + elif isinstance(path, (str, os.PathLike)): + if path == "-": + self._std_io = True + path = os.fspath(path) + cwd = os.fspath(cwd) if cwd else None + abs_path = os.path.expanduser(path) + if self._file_scheme.match(abs_path): + abs_path = self._file_scheme.sub("" if os.name == "nt" else "/", abs_path) + is_absolute = _is_absolute_path(abs_path) + url_data = _parse_url(abs_path) + cwd_url_data = _parse_url(cwd or _current_path_dir.get() or os.getcwd()) + if ("u" in mode or "s" in mode) and (url_data or (cwd_url_data and not is_absolute)): + if cwd_url_data and not is_absolute: + abs_path = _resolve_relative_path(cwd_url_data.url_path + "/" + path) + abs_path = cwd_url_data.scheme + abs_path + url_data = _parse_url(abs_path) + if cwd is None: + cwd = _current_path_dir.get() or os.getcwd() + if "u" in mode and url_support: + is_url = True + elif "s" in mode and fsspec_support and _known_to_fsspec(abs_path): + is_fsspec = True + else: + if cwd is None: + cwd = os.getcwd() + abs_path = abs_path if is_absolute else os.path.join(cwd, abs_path) + url_data = None + else: + raise PathError("Expected path to be a string, os.PathLike or a Path object.") + + if not self._skip_check and is_url: + if "r" in mode: + requests = import_requests("Path with URL support") + try: + requests.head(abs_path).raise_for_status() + except requests.HTTPError as ex: + raise PathError(f"{abs_path} HEAD not accessible :: {ex}") from ex + elif not self._skip_check and is_fsspec: + fsspec_mode = "".join(c for c in mode if c in {"r", "w"}) + if fsspec_mode: + fsspec = import_fsspec("Path") + try: + handle = fsspec.open(abs_path, fsspec_mode) + handle.open() + handle.close() + except (FileNotFoundError, KeyError) as ex: + raise PathError(f"Path does not exist: {abs_path!r}") from ex + except PermissionError as ex: + raise PathError(f"Path exists but no permission to access: {abs_path!r}") from ex + elif not self._skip_check and not self._std_io: + ptype = "Directory" if "d" in mode else "File" + if "c" in mode: + pdir = os.path.realpath(os.path.join(abs_path, "..")) + if not os.path.isdir(pdir) and mode.count("c") == 2: + ppdir = None + while not os.path.isdir(pdir) and pdir != ppdir: + ppdir = pdir + pdir = os.path.realpath(os.path.join(pdir, "..")) + if not os.path.isdir(pdir): + raise PathError(f"{ptype} is not creatable since parent directory does not exist: {abs_path!r}") + if not os.access(pdir, os.W_OK): + raise PathError(f"{ptype} is not creatable since parent directory not writeable: {abs_path!r}") + if "d" in mode and os.access(abs_path, os.F_OK) and not os.path.isdir(abs_path): + raise PathError(f"{ptype} is not creatable since path already exists: {abs_path!r}") + if "f" in mode and os.access(abs_path, os.F_OK) and not os.path.isfile(abs_path): + raise PathError(f"{ptype} is not creatable since path already exists: {abs_path!r}") + elif "d" in mode or "f" in mode: + if not os.access(abs_path, os.F_OK): + raise PathError(f"{ptype} does not exist: {abs_path!r}") + if "d" in mode and not os.path.isdir(abs_path): + raise PathError(f"Path is not a directory: {abs_path!r}") + if "f" in mode and not (os.path.isfile(abs_path) or stat.S_ISFIFO(os.stat(abs_path).st_mode)): + raise PathError(f"Path is not a file: {abs_path!r}") + + if "r" in mode and not os.access(abs_path, os.R_OK): + raise PathError(f"{ptype} is not readable: {abs_path!r}") + if "w" in mode and not os.access(abs_path, os.W_OK): + raise PathError(f"{ptype} is not writeable: {abs_path!r}") + if "x" in mode and not os.access(abs_path, os.X_OK): + raise PathError(f"{ptype} is not executable: {abs_path!r}") + if "D" in mode and os.path.isdir(abs_path): + raise PathError(f"Path is a directory: {abs_path!r}") + if "F" in mode and (os.path.isfile(abs_path) or stat.S_ISFIFO(os.stat(abs_path).st_mode)): + raise PathError(f"Path is a file: {abs_path!r}") + if "R" in mode and os.access(abs_path, os.R_OK): + raise PathError(f"{ptype} is readable: {abs_path!r}") + if "W" in mode and os.access(abs_path, os.W_OK): + raise PathError(f"{ptype} is writeable: {abs_path!r}") + if "X" in mode and os.access(abs_path, os.X_OK): + raise PathError(f"{ptype} is executable: {abs_path!r}") + + self._relative = path + self._absolute = abs_path + self._cwd = cwd + self._mode = mode + self._is_url = is_url + self._is_fsspec = is_fsspec + self._url_data = url_data + + @property + def relative(self) -> str: + """Returns the relative representation of the path (how the path was given on instance creation).""" + return self._relative + + @property + def absolute(self) -> str: + """Returns the absolute representation of the path.""" + return self._absolute + + @property + def mode(self) -> str: + return self._mode + + @property + def is_url(self) -> bool: + return self._is_url + + @property + def is_fsspec(self) -> bool: + return self._is_fsspec + + def __str__(self): + return self._relative + + def __repr__(self): + name = "Path_" + self._mode + name = self._repr_skip_check(name) + cwd = "" + if self._relative != self._absolute: + cwd = ", cwd=" + self._cwd + return f"{name}({self._relative}{cwd})" + + def __fspath__(self) -> str: + return self._absolute + + def __eq__(self, other: Any) -> bool: + if isinstance(other, Path): + return self._absolute == other._absolute + elif isinstance(other, str): + return str(self) == other + return False + + def __call__(self, absolute: bool = True) -> str: + """Returns the path as a string. + + Args: + absolute: If false returns the original path given, otherwise the corresponding absolute path. + """ + return self._absolute if absolute else self._relative + + def get_content(self, mode: str = "r") -> str: + """Returns the contents of the file or the remote path.""" + if self._std_io: + return _read_cached_stdin() + elif self._is_url: + assert mode == "r" + requests = import_requests("Path.get_content") + response = requests.get(self._absolute) + response.raise_for_status() + return response.text + elif self._is_fsspec: + fsspec = import_fsspec("Path.get_content") + with fsspec.open(self._absolute, mode) as handle: + with handle as input_file: + return input_file.read() + else: + with open(self._absolute, mode) as input_file: + return input_file.read() + + @contextmanager + def open(self, mode: str = "r") -> Iterator[IO]: + """Return an opened file object for the path.""" + if self._std_io: + if "r" in mode: + yield _get_cached_stdin() + elif "w" in mode: + yield sys.stdout + elif self._is_url: + yield StringIO(self.get_content()) + elif self._is_fsspec: + fsspec = import_fsspec("Path.open") + with fsspec.open(self._absolute, mode) as handle: + yield handle + else: + with open(self._absolute, mode) as handle: + yield handle + + @contextmanager + def relative_path_context(self) -> Iterator[str]: + """Context manager to use this path's parent (directory or URL) for relative paths defined within.""" + with change_to_path_dir(self) as path_dir: + assert isinstance(path_dir, str) + yield path_dir + + @staticmethod + def _check_mode(mode: str): + if not isinstance(mode, str): + raise ValueError("Expected mode to be a string.") + if len(set(mode) - set("fdrwxcusFDRWX")) > 0: + raise ValueError("Expected mode to only include [fdrwxcusFDRWX] flags.") + for flag, count in Counter(mode).items(): + if count > (2 if flag == "c" else 1): + raise ValueError(f'Too many occurrences ({count}) for flag "{flag}".') + if "f" in mode and "d" in mode: + raise ValueError('Both modes "f" and "d" not possible.') + if "u" in mode and "d" in mode: + raise ValueError('Both modes "d" and "u" not possible.') + if "s" in mode and "d" in mode: + raise ValueError('Both modes "d" and "s" not possible.') + + +@contextmanager +def change_to_path_dir(path: Optional[Union[Path, str]]) -> Iterator[Optional[str]]: + """A context manager for running code in the directory of a path.""" + path_dir = _current_path_dir.get() + chdir: Union[bool, str] = False + if path is not None: + if isinstance(path, str): + path = Path(path, mode="d") + if path._url_data and (path.is_url or path.is_fsspec): + scheme = path._url_data.scheme + path_dir = path._url_data.url_path + else: + scheme = "" + path_dir = path.absolute + chdir = True + if "d" not in path.mode: + path_dir = os.path.dirname(path_dir) + path_dir = scheme + path_dir + + token = _current_path_dir.set(path_dir) + if chdir and path_dir: + chdir = os.getcwd() + path_dir = os.path.abspath(path_dir) + os.chdir(path_dir) + + try: + yield path_dir + finally: + _current_path_dir.reset(token) + if chdir: + os.chdir(chdir) diff --git a/jsonargparse/_typehints.py b/jsonargparse/_typehints.py index fa3b5ae7..54e087eb 100644 --- a/jsonargparse/_typehints.py +++ b/jsonargparse/_typehints.py @@ -77,13 +77,11 @@ typing_extensions_import, validate_annotated, ) +from ._paths import Path, PathError, change_to_path_dir from ._util import ( ClassType, NestedArg, NoneType, - Path, - PathError, - change_to_path_dir, get_import_path, get_typehint_origin, import_object, diff --git a/jsonargparse/_util.py b/jsonargparse/_util.py index 2e4d3405..9fc6800b 100644 --- a/jsonargparse/_util.py +++ b/jsonargparse/_util.py @@ -2,26 +2,16 @@ import inspect import os -import re -import stat -import sys import textwrap import warnings from argparse import ArgumentError -from collections import Counter, namedtuple -from contextlib import contextmanager -from contextvars import ContextVar -from dataclasses import dataclass +from collections import namedtuple from functools import wraps from importlib import import_module -from io import StringIO from types import BuiltinFunctionType, FunctionType, ModuleType from typing import ( - IO, Any, Callable, - Iterator, - List, Optional, Tuple, Type, @@ -36,21 +26,14 @@ parser_capture, parser_context, ) -from ._deprecated import PathDeprecations from ._loaders_dumpers import json_compact_dump, load_value -from ._optionals import ( - _get_config_read_mode, - fsspec_support, - import_fsspec, - import_requests, - url_support, -) +from ._optionals import _get_config_read_mode +from ._paths import Path from ._type_checking import ArgumentParser __all__ = [ "capture_parser", "class_from_function", - "Path", "register_unresolvable_import_paths", ] @@ -61,12 +44,6 @@ default_config_option_help = "Path to a configuration file." -@dataclass -class UrlData: - scheme: str - url_path: str - - def argument_error(message: str) -> ArgumentError: return ArgumentError(None, message) @@ -126,7 +103,7 @@ def identity(value): def parse_value_or_config( value: Any, enable_path: bool = True, simple_types: bool = False -) -> Tuple[Any, Optional["Path"]]: +) -> Tuple[Any, Optional[Path]]: """Parses yaml/json config in a string or a path""" nested_arg: Union[bool, NestedArg] = False if isinstance(value, NestedArg): @@ -152,23 +129,6 @@ def parse_value_or_config( return value, cfg_path -class CachedStdin(StringIO): - """Used to allow reading sys.stdin multiple times.""" - - -def get_cached_stdin() -> CachedStdin: - if not isinstance(sys.stdin, CachedStdin): - sys.stdin = CachedStdin(sys.stdin.read()) - return sys.stdin - - -def read_cached_stdin() -> str: - stdin = get_cached_stdin() - value = stdin.read() - stdin.seek(0) - return value - - def import_object(name: str): """Returns an object in a module given its dot import path.""" if not isinstance(name, str) or "." not in name: @@ -278,42 +238,6 @@ def get_typehint_origin(typehint): return getattr(typehint, "__origin__", None) -current_path_dir: ContextVar[Optional[str]] = ContextVar("current_path_dir", default=None) - - -@contextmanager -def change_to_path_dir(path: Optional[Union["Path", str]]) -> Iterator[Optional[str]]: - """A context manager for running code in the directory of a path.""" - path_dir = current_path_dir.get() - chdir: Union[bool, str] = False - if path is not None: - if isinstance(path, str): - path = Path(path, mode="d") - if path._url_data and (path.is_url or path.is_fsspec): - scheme = path._url_data.scheme - path_dir = path._url_data.url_path - else: - scheme = "" - path_dir = path.absolute - chdir = True - if "d" not in path.mode: - path_dir = os.path.dirname(path_dir) - path_dir = scheme + path_dir - - token = current_path_dir.set(path_dir) - if chdir and path_dir: - chdir = os.getcwd() - path_dir = os.path.abspath(path_dir) - os.chdir(path_dir) - - try: - yield path_dir - finally: - current_path_dir.reset(token) - if chdir: - os.chdir(chdir) - - def hash_item(item): try: if isinstance(item, (dict, list)): @@ -359,16 +283,6 @@ def get_private_kwargs(data, **kwargs): return extracted[0] if len(extracted) == 1 else extracted -def known_to_fsspec(path: str) -> bool: - import_fsspec("known_to_fsspec") - from fsspec.registry import known_implementations - - for protocol in known_implementations: - if path.startswith(protocol + "://") or path.startswith(protocol + "::"): - return True - return False - - class ClassFromFunctionBase: wrapped_function: Callable @@ -431,33 +345,6 @@ class ClassFromFunction(func_return, ClassFromFunctionBase): # type: ignore[val return ClassFromFunction -def parse_url(url: str) -> Optional[UrlData]: - index = url.rfind("://") - if index <= 0: - return None - return UrlData( - scheme=url[: index + 3], - url_path=url[index + 3 :], - ) - - -def is_absolute_path(path: str) -> bool: - if path.find("://") > 0: - return True - return os.path.isabs(path) - - -def resolve_relative_path(path: str) -> str: - parts = path.split("/") - resolved: List[str] = [] - for part in parts: - if part == "..": - resolved.pop() - elif part != ".": - resolved.append(part) - return "/".join(resolved) - - def get_argument_group_class(parser): import ast @@ -481,275 +368,3 @@ def get_argument_group_class(parser): f"Failed to create ArgumentGroup subclass based on {parser.__class__.__name__}: {ex}", exc_info=ex ) return ArgumentGroup - - -class PathError(TypeError): - """Exception raised for errors in the Path class.""" - - -class Path(PathDeprecations): - """Stores a (possibly relative) path and the corresponding absolute path. - - The absolute path can be obtained without having to remember the working - directory (or parent remote path) from when the object was created. - - When a Path instance is created, it is checked that: the path exists, - whether it is a file or directory and whether it has the required access - permissions (f=file, d=directory, r=readable, w=writeable, x=executable, - c=creatable, u=url, s=fsspec or in uppercase meaning not, i.e., F=not-file, - D=not-directory, R=not-readable, W=not-writeable and X=not-executable). - - The creatable flag "c" can be given one or two times. If give once, the - parent directory must exist and be writeable. If given twice, the parent - directory does not have to exist, but should be allowed to create. - - An instance of Path class can also refer to the standard input or output. - To do that, path must be set with the value "-"; it is a common practice. - Then, getting the content or opening it will automatically be done on - standard input or output. - """ - - _url_data: Optional[UrlData] - _file_scheme = re.compile("^file:///?") - - def __init__( - self, - path: Union[str, os.PathLike, "Path"], - mode: str = "fr", - cwd: Optional[Union[str, os.PathLike]] = None, - **kwargs, - ): - """Initializer for Path instance. - - Args: - path: The path to check and store. - mode: The required type and access permissions among [fdrwxcuFDRWX]. - cwd: Working directory for relative paths. If None, os.getcwd() is used. - - Raises: - ValueError: If the provided mode is invalid. - PathError: If the path does not exist or does not agree with the mode. - """ - self._deprecated_kwargs(kwargs) - self._check_mode(mode) - self._std_io = False - - is_url = False - is_fsspec = False - if isinstance(path, Path): - self._std_io = path._std_io - is_url = path.is_url - is_fsspec = path.is_fsspec - url_data = path._url_data - cwd = path.cwd - abs_path = path.absolute - path = path.relative - elif isinstance(path, (str, os.PathLike)): - if path == "-": - self._std_io = True - path = os.fspath(path) - cwd = os.fspath(cwd) if cwd else None - abs_path = os.path.expanduser(path) - if self._file_scheme.match(abs_path): - abs_path = self._file_scheme.sub("" if os.name == "nt" else "/", abs_path) - is_absolute = is_absolute_path(abs_path) - url_data = parse_url(abs_path) - cwd_url_data = parse_url(cwd or current_path_dir.get() or os.getcwd()) - if ("u" in mode or "s" in mode) and (url_data or (cwd_url_data and not is_absolute)): - if cwd_url_data and not is_absolute: - abs_path = resolve_relative_path(cwd_url_data.url_path + "/" + path) - abs_path = cwd_url_data.scheme + abs_path - url_data = parse_url(abs_path) - if cwd is None: - cwd = current_path_dir.get() or os.getcwd() - if "u" in mode and url_support: - is_url = True - elif "s" in mode and fsspec_support and known_to_fsspec(abs_path): - is_fsspec = True - else: - if cwd is None: - cwd = os.getcwd() - abs_path = abs_path if is_absolute else os.path.join(cwd, abs_path) - url_data = None - else: - raise PathError("Expected path to be a string, os.PathLike or a Path object.") - - if not self._skip_check and is_url: - if "r" in mode: - requests = import_requests("Path with URL support") - try: - requests.head(abs_path).raise_for_status() - except requests.HTTPError as ex: - raise PathError(f"{abs_path} HEAD not accessible :: {ex}") from ex - elif not self._skip_check and is_fsspec: - fsspec_mode = "".join(c for c in mode if c in {"r", "w"}) - if fsspec_mode: - fsspec = import_fsspec("Path") - try: - handle = fsspec.open(abs_path, fsspec_mode) - handle.open() - handle.close() - except (FileNotFoundError, KeyError) as ex: - raise PathError(f"Path does not exist: {abs_path!r}") from ex - except PermissionError as ex: - raise PathError(f"Path exists but no permission to access: {abs_path!r}") from ex - elif not self._skip_check and not self._std_io: - ptype = "Directory" if "d" in mode else "File" - if "c" in mode: - pdir = os.path.realpath(os.path.join(abs_path, "..")) - if not os.path.isdir(pdir) and mode.count("c") == 2: - ppdir = None - while not os.path.isdir(pdir) and pdir != ppdir: - ppdir = pdir - pdir = os.path.realpath(os.path.join(pdir, "..")) - if not os.path.isdir(pdir): - raise PathError(f"{ptype} is not creatable since parent directory does not exist: {abs_path!r}") - if not os.access(pdir, os.W_OK): - raise PathError(f"{ptype} is not creatable since parent directory not writeable: {abs_path!r}") - if "d" in mode and os.access(abs_path, os.F_OK) and not os.path.isdir(abs_path): - raise PathError(f"{ptype} is not creatable since path already exists: {abs_path!r}") - if "f" in mode and os.access(abs_path, os.F_OK) and not os.path.isfile(abs_path): - raise PathError(f"{ptype} is not creatable since path already exists: {abs_path!r}") - elif "d" in mode or "f" in mode: - if not os.access(abs_path, os.F_OK): - raise PathError(f"{ptype} does not exist: {abs_path!r}") - if "d" in mode and not os.path.isdir(abs_path): - raise PathError(f"Path is not a directory: {abs_path!r}") - if "f" in mode and not (os.path.isfile(abs_path) or stat.S_ISFIFO(os.stat(abs_path).st_mode)): - raise PathError(f"Path is not a file: {abs_path!r}") - - if "r" in mode and not os.access(abs_path, os.R_OK): - raise PathError(f"{ptype} is not readable: {abs_path!r}") - if "w" in mode and not os.access(abs_path, os.W_OK): - raise PathError(f"{ptype} is not writeable: {abs_path!r}") - if "x" in mode and not os.access(abs_path, os.X_OK): - raise PathError(f"{ptype} is not executable: {abs_path!r}") - if "D" in mode and os.path.isdir(abs_path): - raise PathError(f"Path is a directory: {abs_path!r}") - if "F" in mode and (os.path.isfile(abs_path) or stat.S_ISFIFO(os.stat(abs_path).st_mode)): - raise PathError(f"Path is a file: {abs_path!r}") - if "R" in mode and os.access(abs_path, os.R_OK): - raise PathError(f"{ptype} is readable: {abs_path!r}") - if "W" in mode and os.access(abs_path, os.W_OK): - raise PathError(f"{ptype} is writeable: {abs_path!r}") - if "X" in mode and os.access(abs_path, os.X_OK): - raise PathError(f"{ptype} is executable: {abs_path!r}") - - self._relative = path - self._absolute = abs_path - self._cwd = cwd - self._mode = mode - self._is_url = is_url - self._is_fsspec = is_fsspec - self._url_data = url_data - - @property - def relative(self) -> str: - """Returns the relative representation of the path (how the path was given on instance creation).""" - return self._relative - - @property - def absolute(self) -> str: - """Returns the absolute representation of the path.""" - return self._absolute - - @property - def mode(self) -> str: - return self._mode - - @property - def is_url(self) -> bool: - return self._is_url - - @property - def is_fsspec(self) -> bool: - return self._is_fsspec - - def __str__(self): - return self._relative - - def __repr__(self): - name = "Path_" + self._mode - name = self._repr_skip_check(name) - cwd = "" - if self._relative != self._absolute: - cwd = ", cwd=" + self._cwd - return f"{name}({self._relative}{cwd})" - - def __fspath__(self) -> str: - return self._absolute - - def __eq__(self, other: Any) -> bool: - if isinstance(other, Path): - return self._absolute == other._absolute - elif isinstance(other, str): - return str(self) == other - return False - - def __call__(self, absolute: bool = True) -> str: - """Returns the path as a string. - - Args: - absolute: If false returns the original path given, otherwise the corresponding absolute path. - """ - return self._absolute if absolute else self._relative - - def get_content(self, mode: str = "r") -> str: - """Returns the contents of the file or the remote path.""" - if self._std_io: - return read_cached_stdin() - elif self._is_url: - assert mode == "r" - requests = import_requests("Path.get_content") - response = requests.get(self._absolute) - response.raise_for_status() - return response.text - elif self._is_fsspec: - fsspec = import_fsspec("Path.get_content") - with fsspec.open(self._absolute, mode) as handle: - with handle as input_file: - return input_file.read() - else: - with open(self._absolute, mode) as input_file: - return input_file.read() - - @contextmanager - def open(self, mode: str = "r") -> Iterator[IO]: - """Return an opened file object for the path.""" - if self._std_io: - if "r" in mode: - yield get_cached_stdin() - elif "w" in mode: - yield sys.stdout - elif self._is_url: - yield StringIO(self.get_content()) - elif self._is_fsspec: - fsspec = import_fsspec("Path.open") - with fsspec.open(self._absolute, mode) as handle: - yield handle - else: - with open(self._absolute, mode) as handle: - yield handle - - @contextmanager - def relative_path_context(self) -> Iterator[str]: - """Context manager to use this path's parent (directory or URL) for relative paths defined within.""" - with change_to_path_dir(self) as path_dir: - assert isinstance(path_dir, str) - yield path_dir - - @staticmethod - def _check_mode(mode: str): - if not isinstance(mode, str): - raise ValueError("Expected mode to be a string.") - if len(set(mode) - set("fdrwxcusFDRWX")) > 0: - raise ValueError("Expected mode to only include [fdrwxcusFDRWX] flags.") - for flag, count in Counter(mode).items(): - if count > (2 if flag == "c" else 1): - raise ValueError(f'Too many occurrences ({count}) for flag "{flag}".') - if "f" in mode and "d" in mode: - raise ValueError('Both modes "f" and "d" not possible.') - if "u" in mode and "d" in mode: - raise ValueError('Both modes "d" and "u" not possible.') - if "s" in mode and "d" in mode: - raise ValueError('Both modes "d" and "s" not possible.') diff --git a/jsonargparse/typing.py b/jsonargparse/typing.py index 05704d14..c46efccd 100644 --- a/jsonargparse/typing.py +++ b/jsonargparse/typing.py @@ -15,7 +15,8 @@ from ._common import is_final_class, path_dump_preserve_relative from ._optionals import final, pydantic_support -from ._util import Path, change_to_path_dir, get_import_path, get_private_kwargs, import_object +from ._paths import Path, change_to_path_dir +from ._util import get_import_path, get_private_kwargs, import_object __all__ = [ "final", @@ -33,6 +34,7 @@ "OpenUnitInterval", "NotEmptyStr", "Email", + "Path", "Path_fr", "Path_fc", "Path_dw", diff --git a/jsonargparse_tests/test_deprecated.py b/jsonargparse_tests/test_deprecated.py index 87e9b223..d2ec1cfd 100644 --- a/jsonargparse_tests/test_deprecated.py +++ b/jsonargparse_tests/test_deprecated.py @@ -18,7 +18,6 @@ ArgumentError, ArgumentParser, Namespace, - Path, get_config_read_mode, set_config_read_mode, set_docstring_parse_options, @@ -48,6 +47,7 @@ url_support, ) from jsonargparse._util import argument_error +from jsonargparse.typing import Path from jsonargparse_tests.conftest import ( get_parser_help, is_posix, diff --git a/jsonargparse_tests/test_paths.py b/jsonargparse_tests/test_paths.py index e191552f..29e1f235 100644 --- a/jsonargparse_tests/test_paths.py +++ b/jsonargparse_tests/test_paths.py @@ -2,23 +2,46 @@ import json import os +import pathlib +import stat +import zipfile from calendar import Calendar -from pathlib import Path +from io import StringIO from typing import Any, Dict, List, Optional, Union +from unittest.mock import patch import pytest from jsonargparse import ArgumentError, Namespace -from jsonargparse.typing import Path_drw, Path_fc, Path_fr, path_type -from jsonargparse_tests.conftest import get_parser_help, json_or_yaml_dump, json_or_yaml_load +from jsonargparse._optionals import fsspec_support, url_support +from jsonargparse._paths import _current_path_dir, _parse_url +from jsonargparse.typing import Path, Path_drw, Path_fc, Path_fr, path_type +from jsonargparse_tests.conftest import ( + get_parser_help, + is_posix, + json_or_yaml_dump, + json_or_yaml_load, + responses_activate, + responses_available, + skip_if_fsspec_unavailable, + skip_if_requests_unavailable, + skip_if_responses_unavailable, + skip_if_running_as_root, +) + +if responses_available: + import responses +if fsspec_support: + import fsspec + # stdlib path types tests def test_pathlib_path(parser, file_r): - parser.add_argument("--path", type=Path) + parser.add_argument("--path", type=pathlib.Path) cfg = parser.parse_args([f"--path={file_r}"]) - assert isinstance(cfg.path, Path) + assert isinstance(cfg.path, pathlib.Path) assert str(cfg.path) == file_r assert json_or_yaml_load(parser.dump(cfg)) == {"path": "file_r"} @@ -28,7 +51,394 @@ def test_os_pathlike(parser, file_r): assert file_r == parser.parse_args([f"--path={file_r}"]).path -# jsonargparse path types tests +# base path tests + + +@pytest.fixture(scope="module") +def paths(tmp_path_factory): + cwd = os.getcwd() + tmp_path = tmp_path_factory.mktemp("paths_fixture") + os.chdir(tmp_path) + + try: + paths = Namespace() + paths.tmp_path = tmp_path + paths.file_rw = file_rw = pathlib.Path("file_rw") + paths.file_r = file_r = pathlib.Path("file_r") + paths.file_ = file_ = pathlib.Path("file_") + paths.dir_rwx = dir_rwx = pathlib.Path("dir_rwx") + paths.dir_rx = dir_rx = pathlib.Path("dir_rx") + paths.dir_x = dir_x = pathlib.Path("dir_x") + paths.dir_file_rx = dir_file_rx = dir_x / "file_rx" + + file_r.write_text("file contents") + file_rw.touch() + file_.touch() + dir_rwx.mkdir() + dir_rx.mkdir() + dir_x.mkdir() + dir_file_rx.touch() + + file_rw.chmod(stat.S_IREAD | stat.S_IWRITE) + file_r.chmod(stat.S_IREAD) + file_.chmod(0) + dir_file_rx.chmod(stat.S_IREAD | stat.S_IEXEC) + dir_rwx.chmod(stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) + dir_rx.chmod(stat.S_IREAD | stat.S_IEXEC) + dir_x.chmod(stat.S_IEXEC) + + yield paths + finally: + dir_x.chmod(stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) + os.chdir(cwd) + + +def test_path_init(paths): + path1 = Path(paths.file_rw, "frw") + path2 = Path(path1) + assert path1.cwd == path2.cwd + assert path1.absolute == path2.absolute + assert path1.relative == path2.relative + assert path1.is_url == path2.is_url + + +def test_path_init_failures(paths): + pytest.raises(TypeError, lambda: Path(True)) + pytest.raises(ValueError, lambda: Path(paths.file_rw, "-")) + pytest.raises(ValueError, lambda: Path(paths.file_rw, "frr")) + + +def test_path_cwd(paths): + path = Path("file_rx", mode="fr", cwd=(paths.tmp_path / paths.dir_x)) + assert path.cwd == Path("file_rx", mode="fr", cwd=path.cwd).cwd + + +def test_path_empty_mode(paths): + path = Path("does_not_exist", "") + assert path() == str(paths.tmp_path / "does_not_exist") + + +def test_path_pathlike(paths): + path = Path(paths.file_rw) + assert isinstance(path, os.PathLike) + assert os.fspath(path) == str(paths.tmp_path / paths.file_rw) + assert os.path.dirname(path) == str(paths.tmp_path) + + +def test_path_equality_operator(paths): + path1 = Path(paths.file_rw) + path2 = Path(paths.tmp_path / paths.file_rw) + assert path1 == path2 + assert Path("123", "fc") != 123 + + +@skip_if_running_as_root +def test_path_file_access_mode(paths): + Path(paths.file_rw, "frw") + Path(paths.file_r, "fr") + Path(paths.file_, "f") + Path(paths.dir_file_rx, "fr") + if is_posix: + pytest.raises(TypeError, lambda: Path(paths.file_rw, "fx")) + pytest.raises(TypeError, lambda: Path(paths.file_, "fr")) + pytest.raises(TypeError, lambda: Path(paths.file_r, "fw")) + pytest.raises(TypeError, lambda: Path(paths.dir_file_rx, "fw")) + pytest.raises(TypeError, lambda: Path(paths.dir_rx, "fr")) + pytest.raises(TypeError, lambda: Path("file_ne", "fr")) + + +@skip_if_running_as_root +def test_path_dir_access_mode(paths): + Path(paths.dir_rwx, "drwx") + Path(paths.dir_rx, "drx") + Path(paths.dir_x, "dx") + if is_posix: + pytest.raises(TypeError, lambda: Path(paths.dir_rx, "dw")) + pytest.raises(TypeError, lambda: Path(paths.dir_x, "dr")) + pytest.raises(TypeError, lambda: Path(paths.file_r, "dr")) + + +def test_path_get_content(paths): + assert "file contents" == Path(paths.file_r, "fr").get_content() + assert "file contents" == Path(f"file://{paths.tmp_path}/{paths.file_r}", "fr").get_content() + assert "file contents" == Path(f"file://{paths.tmp_path}/{paths.file_r}", "ur").get_content() + + +@skip_if_running_as_root +def test_path_create_mode(paths): + Path(paths.file_rw, "fcrw") + Path(paths.tmp_path / "file_c", "fc") + Path(paths.tmp_path / "not_existing_dir" / "file_c", "fcc") + Path(paths.dir_rwx, "dcrwx") + Path(paths.tmp_path / "dir_c", "dc") + if is_posix: + pytest.raises(TypeError, lambda: Path(paths.dir_rx / "file_c", "fc")) + pytest.raises(TypeError, lambda: Path(paths.dir_rx / "dir_c", "dc")) + pytest.raises(TypeError, lambda: Path(paths.dir_rx / "not_existing_dir" / "file_c", "fcc")) + pytest.raises(TypeError, lambda: Path(paths.file_rw, "dc")) + pytest.raises(TypeError, lambda: Path(paths.dir_rwx, "fc")) + pytest.raises(TypeError, lambda: Path(paths.dir_rwx / "ne" / "file_c", "fc")) + + +def test_path_complement_modes(paths): + pytest.raises(TypeError, lambda: Path(paths.file_rw, "fW")) + pytest.raises(TypeError, lambda: Path(paths.file_rw, "fR")) + pytest.raises(TypeError, lambda: Path(paths.dir_rwx, "dX")) + pytest.raises(TypeError, lambda: Path(paths.file_rw, "F")) + pytest.raises(TypeError, lambda: Path(paths.dir_rwx, "D")) + + +def test_path_invalid_modes(paths): + pytest.raises(ValueError, lambda: Path(paths.file_rw, True)) + pytest.raises(ValueError, lambda: Path(paths.file_rw, "≠")) + pytest.raises(ValueError, lambda: Path(paths.file_rw, "fd")) + if url_support: + pytest.raises(ValueError, lambda: Path(paths.file_rw, "du")) + + +def test_path_class_hidden_methods(paths): + path = Path(paths.file_rw, "frw") + assert path(False) == str(paths.file_rw) + assert path(True) == str(paths.tmp_path / paths.file_rw) + assert path() == str(paths.tmp_path / paths.file_rw) + assert str(path) == str(paths.file_rw) + assert path.__repr__().startswith("Path_frw(") + + +def test_path_tilde_home(paths): + home_env = "USERPROFILE" if os.name == "nt" else "HOME" + with patch.dict(os.environ, {home_env: str(paths.tmp_path)}): + home = Path("~", "dr") + path = Path(os.path.join("~", paths.file_rw), "frw") + assert str(home) == "~" + assert str(path) == os.path.join("~", paths.file_rw) + assert home() == str(paths.tmp_path) + assert path() == os.path.join(paths.tmp_path, paths.file_rw) + + +def test_std_input_path(): + input_text_to_test = "a text here\n" + + with patch("sys.stdin", StringIO(input_text_to_test)): + path = Path("-", mode="fr") + assert path == "-" + assert input_text_to_test == path.get_content("r") + + with patch("sys.stdin", StringIO(input_text_to_test)): + path = Path("-", mode="fr") + with path.open("r") as std_input: + assert input_text_to_test == "".join([line for line in std_input]) + + +def test_std_output_path(): + path = Path("-", mode="fw") + assert path == "-" + output = StringIO("") + with patch("sys.stdout", output): + with path.open("w") as std_output: + std_output.write("test\n") + assert output.getvalue() == "test\n" + + +# url tests + + +@pytest.mark.parametrize( + ["url", "scheme", "path"], + [ + ("https://eg.com:8080/eg", "https://", "eg.com:8080/eg"), + ("dask::s3://bucket/key", "dask::s3://", "bucket/key"), + ("filecache::s3://bucket/key", "filecache::s3://", "bucket/key"), + ( + "zip://*.csv::simplecache::gcs://bucket/file.zip", + "zip://*.csv::simplecache::gcs://", + "bucket/file.zip", + ), + ( + "simplecache::zip://*.csv::gcs://bucket/file.zip", + "simplecache::zip://*.csv::gcs://", + "bucket/file.zip", + ), + ( + "zip://existing.txt::file://file1.zip", + "zip://existing.txt::file://", + "file1.zip", + ), + ("file.txt", None, None), + ("../../file.txt", None, None), + ("/tmp/file.txt", None, None), + ], +) +def test_parse_url(url, scheme, path): + url_data = _parse_url(url) + if scheme is None: + assert url_data is None + else: + assert url_data.scheme == scheme + assert url_data.url_path == path + + +@skip_if_responses_unavailable +@responses_activate +def test_path_url_200(): + existing = "http://example.com/existing-url" + existing_body = "url contents" + responses.add(responses.GET, existing, status=200, body=existing_body) + responses.add(responses.HEAD, existing, status=200) + path = Path(existing, mode="ur") + assert existing_body == path.get_content() + + +@skip_if_responses_unavailable +@responses_activate +def test_path_url_404(): + nonexisting = "http://example.com/non-existing-url" + responses.add(responses.HEAD, nonexisting, status=404) + with pytest.raises(TypeError) as ctx: + Path(nonexisting, mode="ur") + ctx.match("404") + + +# fsspec tests + + +def create_zip(zip_path, file_path): + ziph = zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) + ziph.write(file_path) + ziph.close() + + +@skip_if_fsspec_unavailable +def test_path_fsspec_zipfile(tmp_cwd): + existing = pathlib.Path("existing.txt") + existing_body = "existing content" + existing.write_text(existing_body) + nonexisting = "non-existing.txt" + zip1_path = "file1.zip" + zip2_path = pathlib.Path("file2.zip") + create_zip(zip1_path, existing) + create_zip(zip2_path, existing) + zip2_path.chmod(0) + + path = Path(f"zip://{existing}::file://{zip1_path}", mode="sr") + assert existing_body == path.get_content() + + with pytest.raises(TypeError) as ctx: + Path(f"zip://{nonexisting}::file://{zip1_path}", mode="sr") + ctx.match("does not exist") + + if is_posix: + with pytest.raises(TypeError) as ctx: + Path(f"zip://{existing}::file://{zip2_path}", mode="sr") + ctx.match("exists but no permission to access") + + +@skip_if_fsspec_unavailable +def test_path_fsspec_memory(): + file_content = "content in memory" + memfile = "memfile.txt" + path = Path(f"memory://{memfile}", mode="sw") + with fsspec.open(path, "w") as f: + f.write(file_content) + assert file_content == path.get_content() + + +def test_path_fsspec_invalid_mode(): + with pytest.raises(ValueError) as ctx: + Path("memory://file.txt", mode="ds") + ctx.match('Both modes "d" and "s" not possible') + + +@skip_if_fsspec_unavailable +def test_path_fsspec_invalid_scheme(): + with pytest.raises(TypeError) as ctx: + Path("unsupported://file.txt", mode="sr") + ctx.match("not readable") + + +# path open tests + + +def test_path_open_local(tmp_cwd): + pathlib.Path("file.txt").write_text("content") + path = Path("file.txt", mode="fr") + with path.open() as f: + assert "content" == f.read() + + +@skip_if_responses_unavailable +@responses_activate +def test_path_open_url(): + url = "http://example.com/file.txt" + responses.add(responses.GET, url, status=200, body="content") + responses.add(responses.HEAD, url, status=200) + path = Path(url, mode="ur") + with path.open() as f: + assert "content" == f.read() + + +@skip_if_fsspec_unavailable +def test_path_open_fsspec(): + path = Path("memory://nested/file.txt", mode="sc") + with fsspec.open(path, "w") as f: + f.write("content") + path = Path("memory://nested/file.txt", mode="sr") + with path.open() as f: + assert "content" == f.read() + + +# path relative path context tests + + +@skip_if_requests_unavailable +def test_path_relative_path_context_url(): + path1 = Path("http://example.com/nested/path/file1.txt", mode="u") + with path1.relative_path_context() as dir: + assert "http://example.com/nested/path" == dir + path2 = Path("../file2.txt", mode="u") + assert path2() == "http://example.com/nested/file2.txt" + + +@skip_if_fsspec_unavailable +def test_relative_path_context_fsspec(tmp_cwd, subtests): + local_path = tmp_cwd / "file0.txt" + local_path.write_text("zero") + + mem_path = Path("memory://one/two/file1.txt", mode="sc") + with fsspec.open(mem_path, "w") as f: + f.write("one") + + with Path("memory://one/two/three/file1.txt", mode="sc").relative_path_context() as dir1: + with subtests.test("get current path dir"): + assert "memory://one/two/three" == dir1 + assert "memory://one/two/three" == _current_path_dir.get() + + with subtests.test("absolute local path"): + path0 = Path(local_path, mode="fr") + assert "zero" == path0.get_content() + + with subtests.test("relative fsspec path"): + path1 = Path("../file1.txt", mode="fsr") + assert "one" == path1.get_content() + assert str(path1) == "../file1.txt" + assert path1() == "memory://one/two/file1.txt" + assert path1._url_data is not None + + with subtests.test("nested fsspec dir"): + with path1.relative_path_context() as dir2: + assert "memory://one/two" == dir2 + path2 = Path("four/five/six/../file2.txt", mode="fsc") + assert path2() == "memory://one/two/four/five/file2.txt" + + with subtests.test("non-fsspec path"): + path3 = Path("file3.txt", mode="fc") + assert path3() == str(tmp_cwd / "file3.txt") + + with subtests.test("current path dir unset"): + assert _current_path_dir.get() is None + + +# path types tests def test_path_fr(file_r): @@ -53,7 +463,7 @@ def test_paths_config_relative_absolute(parser, tmp_cwd): parser.add_argument("--dir", type=Path_drw) (tmp_cwd / "example").mkdir() - rel_yaml_file = Path("..", "example", "example.yaml") + rel_yaml_file = pathlib.Path("..", "example", "example.yaml") abs_yaml_file = (tmp_cwd / "example" / rel_yaml_file).resolve() abs_yaml_file.write_text(json_or_yaml_dump({"file": str(rel_yaml_file), "dir": str(tmp_cwd)})) @@ -122,7 +532,7 @@ def test_paths_dump(parser, tmp_cwd): def test_enable_path_dict(parser, tmp_cwd): data = {"a": 1, "b": 2, "c": [3, 4]} - Path("data.yaml").write_text(json.dumps(data)) + pathlib.Path("data.yaml").write_text(json.dumps(data)) parser.add_argument("--data", type=Dict[str, Any], enable_path=True) cfg = parser.parse_args(["--data=data.yaml"]) @@ -135,7 +545,7 @@ def test_enable_path_dict(parser, tmp_cwd): def test_enable_path_subclass(parser, tmp_cwd): cal = {"class_path": "calendar.Calendar"} - Path("cal.yaml").write_text(json.dumps(cal)) + pathlib.Path("cal.yaml").write_text(json.dumps(cal)) parser.add_argument("--cal", type=Calendar, enable_path=True) cfg = parser.parse_args(["--cal=cal.yaml"]) @@ -243,7 +653,7 @@ def __init__(self, path: Optional[os.PathLike] = None): def test_enable_path_optional_pathlike_subclass_parameter(parser, tmp_cwd): - data_path = Path("data.json") + data_path = pathlib.Path("data.json") data_path.write_text('{"a": 1}') parser.add_argument("--data", type=DataOptionalPath, enable_path=True) @@ -263,14 +673,14 @@ def __init__(self, path: Union[Base, os.PathLike, str] = ""): def test_sub_configs_union_subclass_and_pathlike(parser, tmp_cwd): - data_path = Path("data.csv") + data_path = pathlib.Path("data.csv") data_path.write_text("x\ny\n") config = { "data": { "path": "data.csv", } } - config_path = Path("config.json") + config_path = pathlib.Path("config.json") config_path.write_text(json.dumps(config)) parser.add_class_arguments(DataUnionPath, "data", sub_configs=True) @@ -278,3 +688,7 @@ def test_sub_configs_union_subclass_and_pathlike(parser, tmp_cwd): cfg = parser.parse_args([f"--cfg={config_path}"]) assert cfg.data.path == str(data_path) + + +def test_import_old_path_location(): + from jsonargparse import Path # noqa: F401 diff --git a/jsonargparse_tests/test_util.py b/jsonargparse_tests/test_util.py index bd08e0c8..8f66bc5b 100644 --- a/jsonargparse_tests/test_util.py +++ b/jsonargparse_tests/test_util.py @@ -2,12 +2,8 @@ import logging import os -import pathlib -import stat -import zipfile from calendar import Calendar from importlib import import_module -from io import StringIO from random import Random from unittest.mock import patch @@ -16,434 +12,20 @@ from jsonargparse import ( ArgumentParser, Namespace, - Path, capture_parser, class_from_function, ) -from jsonargparse._common import ( - LoggerProperty, - null_logger, -) -from jsonargparse._optionals import ( - docstring_parser_support, - fsspec_support, - reconplogger_support, - url_support, -) +from jsonargparse._common import LoggerProperty, null_logger +from jsonargparse._optionals import docstring_parser_support, reconplogger_support from jsonargparse._util import ( CaptureParserException, - current_path_dir, get_import_path, import_object, object_path_serializer, - parse_url, register_unresolvable_import_paths, unique, ) -from jsonargparse_tests.conftest import ( - capture_logs, - get_parser_help, - is_posix, - responses_activate, - responses_available, - skip_if_fsspec_unavailable, - skip_if_requests_unavailable, - skip_if_responses_unavailable, - skip_if_running_as_root, -) - -if responses_available: - import responses -if fsspec_support: - import fsspec - - -# path tests - - -@pytest.fixture(scope="module") -def paths(tmp_path_factory): - cwd = os.getcwd() - tmp_path = tmp_path_factory.mktemp("paths_fixture") - os.chdir(tmp_path) - - try: - paths = Namespace() - paths.tmp_path = tmp_path - paths.file_rw = file_rw = pathlib.Path("file_rw") - paths.file_r = file_r = pathlib.Path("file_r") - paths.file_ = file_ = pathlib.Path("file_") - paths.dir_rwx = dir_rwx = pathlib.Path("dir_rwx") - paths.dir_rx = dir_rx = pathlib.Path("dir_rx") - paths.dir_x = dir_x = pathlib.Path("dir_x") - paths.dir_file_rx = dir_file_rx = dir_x / "file_rx" - - file_r.write_text("file contents") - file_rw.touch() - file_.touch() - dir_rwx.mkdir() - dir_rx.mkdir() - dir_x.mkdir() - dir_file_rx.touch() - - file_rw.chmod(stat.S_IREAD | stat.S_IWRITE) - file_r.chmod(stat.S_IREAD) - file_.chmod(0) - dir_file_rx.chmod(stat.S_IREAD | stat.S_IEXEC) - dir_rwx.chmod(stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) - dir_rx.chmod(stat.S_IREAD | stat.S_IEXEC) - dir_x.chmod(stat.S_IEXEC) - - yield paths - finally: - dir_x.chmod(stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC) - os.chdir(cwd) - - -def test_path_init(paths): - path1 = Path(paths.file_rw, "frw") - path2 = Path(path1) - assert path1.cwd == path2.cwd - assert path1.absolute == path2.absolute - assert path1.relative == path2.relative - assert path1.is_url == path2.is_url - - -def test_path_init_failures(paths): - pytest.raises(TypeError, lambda: Path(True)) - pytest.raises(ValueError, lambda: Path(paths.file_rw, "-")) - pytest.raises(ValueError, lambda: Path(paths.file_rw, "frr")) - - -def test_path_cwd(paths): - path = Path("file_rx", mode="fr", cwd=(paths.tmp_path / paths.dir_x)) - assert path.cwd == Path("file_rx", mode="fr", cwd=path.cwd).cwd - - -def test_path_empty_mode(paths): - path = Path("does_not_exist", "") - assert path() == str(paths.tmp_path / "does_not_exist") - - -def test_path_pathlike(paths): - path = Path(paths.file_rw) - assert isinstance(path, os.PathLike) - assert os.fspath(path) == str(paths.tmp_path / paths.file_rw) - assert os.path.dirname(path) == str(paths.tmp_path) - - -def test_path_equality_operator(paths): - path1 = Path(paths.file_rw) - path2 = Path(paths.tmp_path / paths.file_rw) - assert path1 == path2 - assert Path("123", "fc") != 123 - - -@skip_if_running_as_root -def test_path_file_access_mode(paths): - Path(paths.file_rw, "frw") - Path(paths.file_r, "fr") - Path(paths.file_, "f") - Path(paths.dir_file_rx, "fr") - if is_posix: - pytest.raises(TypeError, lambda: Path(paths.file_rw, "fx")) - pytest.raises(TypeError, lambda: Path(paths.file_, "fr")) - pytest.raises(TypeError, lambda: Path(paths.file_r, "fw")) - pytest.raises(TypeError, lambda: Path(paths.dir_file_rx, "fw")) - pytest.raises(TypeError, lambda: Path(paths.dir_rx, "fr")) - pytest.raises(TypeError, lambda: Path("file_ne", "fr")) - - -@skip_if_running_as_root -def test_path_dir_access_mode(paths): - Path(paths.dir_rwx, "drwx") - Path(paths.dir_rx, "drx") - Path(paths.dir_x, "dx") - if is_posix: - pytest.raises(TypeError, lambda: Path(paths.dir_rx, "dw")) - pytest.raises(TypeError, lambda: Path(paths.dir_x, "dr")) - pytest.raises(TypeError, lambda: Path(paths.file_r, "dr")) - - -def test_path_get_content(paths): - assert "file contents" == Path(paths.file_r, "fr").get_content() - assert "file contents" == Path(f"file://{paths.tmp_path}/{paths.file_r}", "fr").get_content() - assert "file contents" == Path(f"file://{paths.tmp_path}/{paths.file_r}", "ur").get_content() - - -@skip_if_running_as_root -def test_path_create_mode(paths): - Path(paths.file_rw, "fcrw") - Path(paths.tmp_path / "file_c", "fc") - Path(paths.tmp_path / "not_existing_dir" / "file_c", "fcc") - Path(paths.dir_rwx, "dcrwx") - Path(paths.tmp_path / "dir_c", "dc") - if is_posix: - pytest.raises(TypeError, lambda: Path(paths.dir_rx / "file_c", "fc")) - pytest.raises(TypeError, lambda: Path(paths.dir_rx / "dir_c", "dc")) - pytest.raises(TypeError, lambda: Path(paths.dir_rx / "not_existing_dir" / "file_c", "fcc")) - pytest.raises(TypeError, lambda: Path(paths.file_rw, "dc")) - pytest.raises(TypeError, lambda: Path(paths.dir_rwx, "fc")) - pytest.raises(TypeError, lambda: Path(paths.dir_rwx / "ne" / "file_c", "fc")) - - -def test_path_complement_modes(paths): - pytest.raises(TypeError, lambda: Path(paths.file_rw, "fW")) - pytest.raises(TypeError, lambda: Path(paths.file_rw, "fR")) - pytest.raises(TypeError, lambda: Path(paths.dir_rwx, "dX")) - pytest.raises(TypeError, lambda: Path(paths.file_rw, "F")) - pytest.raises(TypeError, lambda: Path(paths.dir_rwx, "D")) - - -def test_path_invalid_modes(paths): - pytest.raises(ValueError, lambda: Path(paths.file_rw, True)) - pytest.raises(ValueError, lambda: Path(paths.file_rw, "≠")) - pytest.raises(ValueError, lambda: Path(paths.file_rw, "fd")) - if url_support: - pytest.raises(ValueError, lambda: Path(paths.file_rw, "du")) - - -def test_path_class_hidden_methods(paths): - path = Path(paths.file_rw, "frw") - assert path(False) == str(paths.file_rw) - assert path(True) == str(paths.tmp_path / paths.file_rw) - assert path() == str(paths.tmp_path / paths.file_rw) - assert str(path) == str(paths.file_rw) - assert path.__repr__().startswith("Path_frw(") - - -def test_path_tilde_home(paths): - home_env = "USERPROFILE" if os.name == "nt" else "HOME" - with patch.dict(os.environ, {home_env: str(paths.tmp_path)}): - home = Path("~", "dr") - path = Path(os.path.join("~", paths.file_rw), "frw") - assert str(home) == "~" - assert str(path) == os.path.join("~", paths.file_rw) - assert home() == str(paths.tmp_path) - assert path() == os.path.join(paths.tmp_path, paths.file_rw) - - -def test_std_input_path(): - input_text_to_test = "a text here\n" - - with patch("sys.stdin", StringIO(input_text_to_test)): - path = Path("-", mode="fr") - assert path == "-" - assert input_text_to_test == path.get_content("r") - - with patch("sys.stdin", StringIO(input_text_to_test)): - path = Path("-", mode="fr") - with path.open("r") as std_input: - assert input_text_to_test == "".join([line for line in std_input]) - - -def test_std_output_path(): - path = Path("-", mode="fw") - assert path == "-" - output = StringIO("") - with patch("sys.stdout", output): - with path.open("w") as std_output: - std_output.write("test\n") - assert output.getvalue() == "test\n" - - -# url tests - - -@pytest.mark.parametrize( - ["url", "scheme", "path"], - [ - ("https://eg.com:8080/eg", "https://", "eg.com:8080/eg"), - ("dask::s3://bucket/key", "dask::s3://", "bucket/key"), - ("filecache::s3://bucket/key", "filecache::s3://", "bucket/key"), - ( - "zip://*.csv::simplecache::gcs://bucket/file.zip", - "zip://*.csv::simplecache::gcs://", - "bucket/file.zip", - ), - ( - "simplecache::zip://*.csv::gcs://bucket/file.zip", - "simplecache::zip://*.csv::gcs://", - "bucket/file.zip", - ), - ( - "zip://existing.txt::file://file1.zip", - "zip://existing.txt::file://", - "file1.zip", - ), - ("file.txt", None, None), - ("../../file.txt", None, None), - ("/tmp/file.txt", None, None), - ], -) -def test_parse_url(url, scheme, path): - url_data = parse_url(url) - if scheme is None: - assert url_data is None - else: - assert url_data.scheme == scheme - assert url_data.url_path == path - - -@skip_if_responses_unavailable -@responses_activate -def test_path_url_200(): - existing = "http://example.com/existing-url" - existing_body = "url contents" - responses.add(responses.GET, existing, status=200, body=existing_body) - responses.add(responses.HEAD, existing, status=200) - path = Path(existing, mode="ur") - assert existing_body == path.get_content() - - -@skip_if_responses_unavailable -@responses_activate -def test_path_url_404(): - nonexisting = "http://example.com/non-existing-url" - responses.add(responses.HEAD, nonexisting, status=404) - with pytest.raises(TypeError) as ctx: - Path(nonexisting, mode="ur") - ctx.match("404") - - -# fsspec tests - - -def create_zip(zip_path, file_path): - ziph = zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) - ziph.write(file_path) - ziph.close() - - -@skip_if_fsspec_unavailable -def test_path_fsspec_zipfile(tmp_cwd): - existing = pathlib.Path("existing.txt") - existing_body = "existing content" - existing.write_text(existing_body) - nonexisting = "non-existing.txt" - zip1_path = "file1.zip" - zip2_path = pathlib.Path("file2.zip") - create_zip(zip1_path, existing) - create_zip(zip2_path, existing) - zip2_path.chmod(0) - - path = Path(f"zip://{existing}::file://{zip1_path}", mode="sr") - assert existing_body == path.get_content() - - with pytest.raises(TypeError) as ctx: - Path(f"zip://{nonexisting}::file://{zip1_path}", mode="sr") - ctx.match("does not exist") - - if is_posix: - with pytest.raises(TypeError) as ctx: - Path(f"zip://{existing}::file://{zip2_path}", mode="sr") - ctx.match("exists but no permission to access") - - -@skip_if_fsspec_unavailable -def test_path_fsspec_memory(): - file_content = "content in memory" - memfile = "memfile.txt" - path = Path(f"memory://{memfile}", mode="sw") - with fsspec.open(path, "w") as f: - f.write(file_content) - assert file_content == path.get_content() - - -def test_path_fsspec_invalid_mode(): - with pytest.raises(ValueError) as ctx: - Path("memory://file.txt", mode="ds") - ctx.match('Both modes "d" and "s" not possible') - - -@skip_if_fsspec_unavailable -def test_path_fsspec_invalid_scheme(): - with pytest.raises(TypeError) as ctx: - Path("unsupported://file.txt", mode="sr") - ctx.match("not readable") - - -# path open tests - - -def test_path_open_local(tmp_cwd): - pathlib.Path("file.txt").write_text("content") - path = Path("file.txt", mode="fr") - with path.open() as f: - assert "content" == f.read() - - -@skip_if_responses_unavailable -@responses_activate -def test_path_open_url(): - url = "http://example.com/file.txt" - responses.add(responses.GET, url, status=200, body="content") - responses.add(responses.HEAD, url, status=200) - path = Path(url, mode="ur") - with path.open() as f: - assert "content" == f.read() - - -@skip_if_fsspec_unavailable -def test_path_open_fsspec(): - path = Path("memory://nested/file.txt", mode="sc") - with fsspec.open(path, "w") as f: - f.write("content") - path = Path("memory://nested/file.txt", mode="sr") - with path.open() as f: - assert "content" == f.read() - - -# path relative path context tests - - -@skip_if_requests_unavailable -def test_path_relative_path_context_url(): - path1 = Path("http://example.com/nested/path/file1.txt", mode="u") - with path1.relative_path_context() as dir: - assert "http://example.com/nested/path" == dir - path2 = Path("../file2.txt", mode="u") - assert path2() == "http://example.com/nested/file2.txt" - - -@skip_if_fsspec_unavailable -def test_relative_path_context_fsspec(tmp_cwd, subtests): - local_path = tmp_cwd / "file0.txt" - local_path.write_text("zero") - - mem_path = Path("memory://one/two/file1.txt", mode="sc") - with fsspec.open(mem_path, "w") as f: - f.write("one") - - with Path("memory://one/two/three/file1.txt", mode="sc").relative_path_context() as dir1: - with subtests.test("get current path dir"): - assert "memory://one/two/three" == dir1 - assert "memory://one/two/three" == current_path_dir.get() - - with subtests.test("absolute local path"): - path0 = Path(local_path, mode="fr") - assert "zero" == path0.get_content() - - with subtests.test("relative fsspec path"): - path1 = Path("../file1.txt", mode="fsr") - assert "one" == path1.get_content() - assert str(path1) == "../file1.txt" - assert path1() == "memory://one/two/file1.txt" - assert path1._url_data is not None - - with subtests.test("nested fsspec dir"): - with path1.relative_path_context() as dir2: - assert "memory://one/two" == dir2 - path2 = Path("four/five/six/../file2.txt", mode="fsc") - assert path2() == "memory://one/two/four/five/file2.txt" - - with subtests.test("non-fsspec path"): - path3 = Path("file3.txt", mode="fc") - assert path3() == str(tmp_cwd / "file3.txt") - - with subtests.test("current path dir unset"): - assert current_path_dir.get() is None - +from jsonargparse_tests.conftest import capture_logs, get_parser_help # logger property tests diff --git a/sphinx/index.rst b/sphinx/index.rst index be394285..e3c379b2 100644 --- a/sphinx/index.rst +++ b/sphinx/index.rst @@ -24,7 +24,7 @@ jsonargparse jsonargparse.typing ------------------- .. automodule:: jsonargparse.typing - :exclude-members: get_import_path, import_object, Path + :exclude-members: get_import_path, import_object Index