diff --git a/src/pluggy/_callers.py b/src/pluggy/_callers.py index 6c54ce61..f6bdd32f 100644 --- a/src/pluggy/_callers.py +++ b/src/pluggy/_callers.py @@ -4,81 +4,18 @@ from __future__ import annotations -from collections.abc import Generator from collections.abc import Mapping from collections.abc import Sequence -from typing import cast -from typing import NoReturn -from typing import TypeAlias -import warnings -from ._hooks import HookImpl -from ._result import HookCallError -from ._result import Result -from ._warnings import PluggyTeardownRaisedWarning - - -# Need to distinguish between old- and new-style hook wrappers. -# Wrapping with a tuple is the fastest type-safe way I found to do it. -Teardown: TypeAlias = Generator[None, object, object] - - -def run_old_style_hookwrapper( - hook_impl: HookImpl, hook_name: str, args: Sequence[object] -) -> Teardown: - """ - backward compatibility wrapper to run a old style hookwrapper as a wrapper - """ - - teardown: Teardown = cast(Teardown, hook_impl.function(*args)) - try: - next(teardown) - except StopIteration: - _raise_wrapfail(teardown, "did not yield") - try: - res = yield - result = Result(res, None) - except BaseException as exc: - result = Result(None, exc) - try: - teardown.send(result) - except StopIteration: - pass - except BaseException as e: - _warn_teardown_exception(hook_name, hook_impl, e) - raise - else: - _raise_wrapfail(teardown, "has second yield") - finally: - teardown.close() - return result.get_result() - - -def _raise_wrapfail( - wrap_controller: Generator[None, object, object], - msg: str, -) -> NoReturn: - co = wrap_controller.gi_code # type: ignore[attr-defined] - raise RuntimeError( - f"wrap_controller at {co.co_name!r} {co.co_filename}:{co.co_firstlineno} {msg}" - ) - - -def _warn_teardown_exception( - hook_name: str, hook_impl: HookImpl, e: BaseException -) -> None: - msg = ( - f"A plugin raised an exception during an old-style hookwrapper teardown.\n" - f"Plugin: {hook_impl.plugin_name}, Hook: {hook_name}\n" - f"{type(e).__name__}: {e}\n" - f"For more information see https://pluggy.readthedocs.io/en/stable/api_reference.html#pluggy.PluggyTeardownRaisedWarning" # noqa: E501 - ) - warnings.warn(PluggyTeardownRaisedWarning(msg), stacklevel=6) +from ._hooks import _NormalHookImplementation +from ._hooks import _WrapperHookImplementation +from ._hooks import Teardown def _multicall( hook_name: str, - hook_impls: Sequence[HookImpl], + normal_impls: Sequence[_NormalHookImplementation], + wrapper_impls: Sequence[_WrapperHookImplementation], caller_kwargs: Mapping[str, object], firstresult: bool, ) -> object | list[object]: @@ -90,83 +27,34 @@ def _multicall( __tracebackhide__ = True results: list[object] = [] exception = None - try: # run impl and wrapper setup functions in a loop - teardowns: list[Teardown] = [] - try: - for hook_impl in reversed(hook_impls): - try: - args = [caller_kwargs[argname] for argname in hook_impl.argnames] - except KeyError as e: - # coverage bug - this is tested - for argname in hook_impl.argnames: # pragma: no cover - if argname not in caller_kwargs: - raise HookCallError( - f"hook call must provide argument {argname!r}" - ) from e + teardowns: list[Teardown] = [] - if hook_impl.hookwrapper: - function_gen = run_old_style_hookwrapper(hook_impl, hook_name, args) + try: + # Phase 1: Run wrapper setup (in reverse - tryfirst wrappers run first) + for wrapper in reversed(wrapper_impls): + teardowns.append(wrapper.setup_teardown(caller_kwargs)) + + # Phase 2: Run normal impls (in reverse - tryfirst impls run first) + for impl in reversed(normal_impls): + res = impl.call(caller_kwargs) + if res is not None: + results.append(res) + if firstresult: # halt further impl calls + break - next(function_gen) # first yield - teardowns.append(function_gen) + except BaseException as exc: + exception = exc - elif hook_impl.wrapper: - try: - # If this cast is not valid, a type error is raised below, - # which is the desired response. - res = hook_impl.function(*args) - function_gen = cast(Generator[None, object, object], res) - next(function_gen) # first yield - teardowns.append(function_gen) - except StopIteration: - _raise_wrapfail(function_gen, "did not yield") - else: - res = hook_impl.function(*args) - if res is not None: - results.append(res) - if firstresult: # halt further impl calls - break - except BaseException as exc: - exception = exc - finally: - if firstresult: # first result hooks return a single value - result = results[0] if results else None - else: - result = results + # Compute result before teardowns + if firstresult: + outcome: object = results[0] if results else None + else: + outcome = results - # run all wrapper post-yield blocks - for teardown in reversed(teardowns): - try: - if exception is not None: - try: - teardown.throw(exception) - except RuntimeError as re: - # StopIteration from generator causes RuntimeError - # even for coroutine usage - see #544 - if ( - isinstance(exception, StopIteration) - and re.__cause__ is exception - ): - teardown.close() - continue - else: - raise - else: - teardown.send(result) - # Following is unreachable for a well behaved hook wrapper. - # Try to force finalizers otherwise postponed till GC action. - # Note: close() may raise if generator handles GeneratorExit. - teardown.close() - except StopIteration as si: - result = si.value - exception = None - continue - except BaseException as e: - exception = e - continue - _raise_wrapfail(teardown, "has second yield") + # Run all wrapper teardowns in reverse order + for teardown in reversed(teardowns): + outcome, exception = teardown(outcome, exception) if exception is not None: raise exception - else: - return result + return outcome diff --git a/src/pluggy/_hooks.py b/src/pluggy/_hooks.py index 7fde78c9..7794f52f 100644 --- a/src/pluggy/_hooks.py +++ b/src/pluggy/_hooks.py @@ -4,6 +4,8 @@ from __future__ import annotations +from abc import ABC +from abc import abstractmethod from collections.abc import Callable from collections.abc import Generator from collections.abc import Mapping @@ -16,6 +18,8 @@ from typing import Final from typing import final from typing import overload +from typing import Protocol +from typing import runtime_checkable from typing import TYPE_CHECKING from typing import TypeAlias from typing import TypedDict @@ -30,8 +34,20 @@ _Namespace: TypeAlias = ModuleType | type _Plugin: TypeAlias = object +# Teardown function: takes (result, exception), returns (result, exception) +Teardown: TypeAlias = Callable[ + [object, BaseException | None], tuple[object, BaseException | None] +] +# Hook execution function signature: +# (name, normal_impls, wrapper_impls, kwargs, firstresult) _HookExec: TypeAlias = Callable[ - [str, Sequence["HookImpl"], Mapping[str, object], bool], + [ + str, + Sequence["_NormalHookImplementation"], + Sequence["_WrapperHookImplementation"], + Mapping[str, object], + bool, + ], object | list[object], ] _HookImplFunction: TypeAlias = Callable[..., _T | Generator[None, Result[_T], None]] @@ -366,8 +382,8 @@ def __init__(self) -> None: """:meta private:""" if TYPE_CHECKING: - - def __getattr__(self, name: str) -> HookCaller: ... + # Return Any since the actual hook type varies (normal, firstresult, historic) + def __getattr__(self, name: str) -> Any: ... # Historical name (pluggy<=1.2), kept for backward compatibility. @@ -379,139 +395,338 @@ def __getattr__(self, name: str) -> HookCaller: ... ] -class HookCaller: - """A caller of all registered implementations of a hook specification.""" +@runtime_checkable +class HookCallerProtocol(Protocol): + """Public protocol for hook callers. - __slots__ = ( - "name", - "spec", - "_hookexec", - "_hookimpls", - "_call_history", - ) + This is the stable public interface that API users should depend on + for type hints. The concrete implementation classes are internal. + """ - def __init__( + @property + def name(self) -> str: ... + + @property + def spec(self) -> HookSpec | None: ... + + def has_spec(self) -> bool: ... + + def is_historic(self) -> bool: ... + + def __call__(self, **kwargs: object) -> Any: ... + + def call_extra( + self, methods: Sequence[Callable[..., object]], kwargs: Mapping[str, object] + ) -> Any: ... + + def call_historic( self, - name: str, - hook_execute: _HookExec, - specmodule_or_class: _Namespace | None = None, - spec_opts: HookspecOpts | None = None, - ) -> None: - """:meta private:""" - #: Name of the hook getting called. + result_callback: Callable[[Any], None] | None = ..., + kwargs: Mapping[str, object] | None = ..., + ) -> None: ... + + +class _HookCallerBase(ABC): + """Base class for all hook callers (internal). + + Use :class:`HookCallerProtocol` for type hints in public APIs. + """ + + __slots__ = ("name", "_hookexec", "_normal_impls", "_wrapper_impls") + + def __init__(self, name: str, hook_execute: _HookExec) -> None: self.name: Final = name self._hookexec: Final = hook_execute - # The hookimpls list. The caller iterates it *in reverse*. Format: - # 1. trylast nonwrappers - # 2. nonwrappers - # 3. tryfirst nonwrappers - # 4. trylast wrappers - # 5. wrappers - # 6. tryfirst wrappers - self._hookimpls: Final[list[HookImpl]] = [] - self._call_history: _CallHistory | None = None - # TODO: Document, or make private. - self.spec: HookSpec | None = None - if specmodule_or_class is not None: - assert spec_opts is not None - self.set_specification(specmodule_or_class, spec_opts) - - # TODO: Document, or make private. - def has_spec(self) -> bool: - return self.spec is not None + # Separate lists for normal and wrapper implementations. + # Each list ordered: [trylast..., normal..., tryfirst...] + self._normal_impls: list[_NormalHookImplementation] = [] + self._wrapper_impls: list[_WrapperHookImplementation] = [] + + def get_hookimpls(self) -> list[_HookImplementation]: + """Get all registered hook implementations for this hook. + + .. deprecated:: + Access ``_normal_impls`` and ``_wrapper_impls`` directly instead. + """ + return list(self._normal_impls) + list(self._wrapper_impls) - # TODO: Document, or make private. - def set_specification( + def _insert_by_priority( self, - specmodule_or_class: _Namespace, - spec_opts: HookspecOpts, + impl_list: list[_HookImplementation], + hookimpl: _HookImplementation, ) -> None: - if self.spec is not None: - raise ValueError( - f"Hook {self.spec.name!r} is already registered " - f"within namespace {self.spec.namespace}" + """Insert hookimpl into list maintaining priority order.""" + if hookimpl.trylast: + impl_list.insert(0, hookimpl) + elif hookimpl.tryfirst: + impl_list.append(hookimpl) + else: + # Find last non-tryfirst impl + i = len(impl_list) + while i > 0 and impl_list[i - 1].tryfirst: + i -= 1 + impl_list.insert(i, hookimpl) + + def _add_hookimpl(self, hookimpl: _HookImplementation) -> None: + """Add to appropriate list based on wrapper type.""" + if hookimpl.is_wrapper: + self._insert_by_priority( + self._wrapper_impls, # type: ignore[arg-type] + hookimpl, + ) + else: + self._insert_by_priority( + self._normal_impls, # type: ignore[arg-type] + hookimpl, ) - self.spec = HookSpec(specmodule_or_class, self.name, spec_opts) - if spec_opts.get("historic"): - self._call_history = [] - - def is_historic(self) -> bool: - """Whether this caller is :ref:`historic `.""" - return self._call_history is not None def _remove_plugin(self, plugin: _Plugin) -> None: - for i, method in enumerate(self._hookimpls): - if method.plugin == plugin: - del self._hookimpls[i] + """Remove all hookimpls for a plugin.""" + for i, impl in enumerate(self._normal_impls): + if impl.plugin == plugin: + del self._normal_impls[i] + return + for i, wrapper in enumerate(self._wrapper_impls): + if wrapper.plugin == plugin: + del self._wrapper_impls[i] return raise ValueError(f"plugin {plugin!r} not found") - def get_hookimpls(self) -> list[HookImpl]: - """Get all registered hook implementations for this hook.""" - return self._hookimpls.copy() + @abstractmethod + def has_spec(self) -> bool: ... - def _add_hookimpl(self, hookimpl: HookImpl) -> None: - """Add an implementation to the callback chain.""" - for i, method in enumerate(self._hookimpls): - if method.hookwrapper or method.wrapper: - splitpoint = i - break - else: - splitpoint = len(self._hookimpls) - if hookimpl.hookwrapper or hookimpl.wrapper: - start, end = splitpoint, len(self._hookimpls) - else: - start, end = 0, splitpoint + @abstractmethod + def is_historic(self) -> bool: ... - if hookimpl.trylast: - self._hookimpls.insert(start, hookimpl) - elif hookimpl.tryfirst: - self._hookimpls.insert(end, hookimpl) - else: - # find last non-tryfirst method - i = end - 1 - while i >= start and self._hookimpls[i].tryfirst: - i -= 1 - self._hookimpls.insert(i + 1, hookimpl) + @property + @abstractmethod + def spec(self) -> HookSpec | None: ... + + def call_extra( + self, methods: Sequence[Callable[..., object]], kwargs: Mapping[str, object] + ) -> Any: + """Call with additional temporary methods. + + Override in subclasses that support this operation. + """ + raise TypeError(f"{type(self).__name__!r} does not support call_extra") + + def call_historic( + self, + result_callback: Callable[[Any], None] | None = None, + kwargs: Mapping[str, object] | None = None, + ) -> None: + """Call with historic registration. + + Override in subclasses that support this operation. + """ + raise TypeError(f"{type(self).__name__!r} does not support call_historic") def __repr__(self) -> str: - return f"" + return f"<{type(self).__name__} {self.name!r}>" + + +@final +class _UnspeccedHookCaller(_HookCallerBase): + """Hook without specification - replaced when spec is added.""" + + __slots__ = () + + @property + def spec(self) -> None: + return None + + def has_spec(self) -> bool: + return False + + def is_historic(self) -> bool: + return False + + def _verify_all_args_are_provided(self, _kwargs: Mapping[str, object]) -> None: + # No spec, no verification needed + pass + + def __call__(self, **kwargs: object) -> list[object]: + # Copy because plugins may register other plugins during iteration (#438). + return self._hookexec( # type: ignore[return-value] + self.name, + self._normal_impls.copy(), + self._wrapper_impls.copy(), + kwargs, + False, + ) + + +class _SpecifiedHookCaller(_HookCallerBase, ABC): + """Base for hooks with a specification.""" + + __slots__ = ("_spec",) + + def __init__( + self, + name: str, + hook_execute: _HookExec, + specmodule_or_class: _Namespace, + spec_opts: HookspecOpts, + ) -> None: + super().__init__(name, hook_execute) + self._spec: Final = HookSpec(specmodule_or_class, name, spec_opts) + + @property + def spec(self) -> HookSpec: + return self._spec + + def has_spec(self) -> bool: + return True + + @abstractmethod + def __call__(self, **kwargs: object) -> Any: ... def _verify_all_args_are_provided(self, kwargs: Mapping[str, object]) -> None: # This is written to avoid expensive operations when not needed. - if self.spec: - for argname in self.spec.argnames: - if argname not in kwargs: - notincall = ", ".join( - repr(argname) - for argname in self.spec.argnames - # Avoid self.spec.argnames - kwargs.keys() - # it doesn't preserve order. - if argname not in kwargs.keys() - ) - warnings.warn( - f"Argument(s) {notincall} which are declared in the hookspec " - "cannot be found in this hook call", - stacklevel=2, - ) - break + for argname in self._spec.argnames: + if argname not in kwargs: + notincall = ", ".join( + repr(argname) + for argname in self._spec.argnames + if argname not in kwargs.keys() + ) + warnings.warn( + f"Argument(s) {notincall} which are declared in the hookspec " + "cannot be found in this hook call", + stacklevel=3, + ) + break - def __call__(self, **kwargs: object) -> Any: - """Call the hook. - Only accepts keyword arguments, which should match the hook - specification. +@final +class _NormalHookCaller(_SpecifiedHookCaller): + """Returns list of all non-None results.""" - Returns the result(s) of calling all registered plugins, see - :ref:`calling`. - """ - assert not self.is_historic(), ( - "Cannot directly call a historic hook - use call_historic instead." + __slots__ = () + + def is_historic(self) -> bool: + return False + + def __call__(self, **kwargs: object) -> list[object]: + self._verify_all_args_are_provided(kwargs) + # Copy because plugins may register other plugins during iteration (#438). + return self._hookexec( # type: ignore[return-value] + self.name, + self._normal_impls.copy(), + self._wrapper_impls.copy(), + kwargs, + False, ) + + def call_extra( + self, methods: Sequence[Callable[..., object]], kwargs: Mapping[str, object] + ) -> list[object]: + """Call with additional temporary methods.""" + self._verify_all_args_are_provided(kwargs) + opts: HookimplOpts = { + "wrapper": False, + "hookwrapper": False, + "optionalhook": False, + "trylast": False, + "tryfirst": False, + "specname": None, + } + # Build list of normal impls with extras inserted + normal_impls: list[_NormalHookImplementation] = list(self._normal_impls) + for method in methods: + hookimpl = _NormalHookImplementation(None, "", method, opts) + # Find last non-tryfirst method. + i = len(normal_impls) - 1 + while i >= 0 and normal_impls[i].tryfirst: + i -= 1 + normal_impls.insert(i + 1, hookimpl) + return self._hookexec( # type: ignore[return-value] + self.name, normal_impls, list(self._wrapper_impls), kwargs, False + ) + + +@final +class _FirstResultHookCaller(_SpecifiedHookCaller): + """Returns first non-None result.""" + + __slots__ = () + + def is_historic(self) -> bool: + return False + + def __call__(self, **kwargs: object) -> object | None: self._verify_all_args_are_provided(kwargs) - firstresult = self.spec.opts.get("firstresult", False) if self.spec else False # Copy because plugins may register other plugins during iteration (#438). - return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult) + return self._hookexec( + self.name, + self._normal_impls.copy(), + self._wrapper_impls.copy(), + kwargs, + True, + ) + + def call_extra( + self, methods: Sequence[Callable[..., object]], kwargs: Mapping[str, object] + ) -> object | None: + """Call with additional temporary methods.""" + self._verify_all_args_are_provided(kwargs) + opts: HookimplOpts = { + "wrapper": False, + "hookwrapper": False, + "optionalhook": False, + "trylast": False, + "tryfirst": False, + "specname": None, + } + # Build list of normal impls with extras inserted + normal_impls: list[_NormalHookImplementation] = list(self._normal_impls) + for method in methods: + hookimpl = _NormalHookImplementation(None, "", method, opts) + # Find last non-tryfirst method. + i = len(normal_impls) - 1 + while i >= 0 and normal_impls[i].tryfirst: + i -= 1 + normal_impls.insert(i + 1, hookimpl) + return self._hookexec( + self.name, normal_impls, list(self._wrapper_impls), kwargs, True + ) + + +@final +class _HistoricHookCaller(_SpecifiedHookCaller): + """Memorizes calls and replays to new registrations.""" + + __slots__ = ("_call_history",) + + def __init__( + self, + name: str, + hook_execute: _HookExec, + specmodule_or_class: _Namespace, + spec_opts: HookspecOpts, + ) -> None: + super().__init__(name, hook_execute, specmodule_or_class, spec_opts) + self._call_history: _CallHistory = [] + + def is_historic(self) -> bool: + return True + + def _add_hookimpl(self, hookimpl: _HookImplementation) -> None: + if hookimpl.is_wrapper: + from ._manager import PluginValidationError + + raise PluginValidationError( + hookimpl.plugin, + f"Plugin {hookimpl.plugin_name!r}\nhook {self.name!r}\n" + "historic hooks cannot have wrappers", + ) + super()._add_hookimpl(hookimpl) + + def __call__(self, **_kwargs: object) -> Any: + raise TypeError( + "Cannot directly call a historic hook - use call_historic instead." + ) def call_historic( self, @@ -519,125 +734,445 @@ def call_historic( kwargs: Mapping[str, object] | None = None, ) -> None: """Call the hook with given ``kwargs`` for all registered plugins and - for all plugins which will be registered afterwards, see - :ref:`historic`. + for all plugins which will be registered afterwards. :param result_callback: If provided, will be called for each non-``None`` result obtained from a hook implementation. """ - assert self._call_history is not None kwargs = kwargs or {} self._verify_all_args_are_provided(kwargs) self._call_history.append((kwargs, result_callback)) # Historizing hooks don't return results. - # Remember firstresult isn't compatible with historic. + # Historic hooks don't have wrappers (enforced in _add_hookimpl). # Copy because plugins may register other plugins during iteration (#438). - res = self._hookexec(self.name, self._hookimpls.copy(), kwargs, False) + res = self._hookexec(self.name, self._normal_impls.copy(), [], kwargs, False) if result_callback is None: return if isinstance(res, list): for x in res: result_callback(x) - def call_extra( - self, methods: Sequence[Callable[..., object]], kwargs: Mapping[str, object] - ) -> Any: - """Call the hook with some additional temporarily participating - methods using the specified ``kwargs`` as call parameters, see - :ref:`call_extra`.""" - assert not self.is_historic(), ( - "Cannot directly call a historic hook - use call_historic instead." - ) - self._verify_all_args_are_provided(kwargs) - opts: HookimplOpts = { - "wrapper": False, - "hookwrapper": False, - "optionalhook": False, - "trylast": False, - "tryfirst": False, - "specname": None, - } - hookimpls = self._hookimpls.copy() - for method in methods: - hookimpl = HookImpl(None, "", method, opts) - # Find last non-tryfirst nonwrapper method. - i = len(hookimpls) - 1 - while i >= 0 and ( - # Skip wrappers. - (hookimpls[i].hookwrapper or hookimpls[i].wrapper) - # Skip tryfirst nonwrappers. - or hookimpls[i].tryfirst - ): - i -= 1 - hookimpls.insert(i + 1, hookimpl) - firstresult = self.spec.opts.get("firstresult", False) if self.spec else False - return self._hookexec(self.name, hookimpls, kwargs, firstresult) - - def _maybe_apply_history(self, method: HookImpl) -> None: + def _maybe_apply_history(self, method: _NormalHookImplementation) -> None: """Apply call history to a new hookimpl if it is marked as historic.""" - if self.is_historic(): - assert self._call_history is not None - for kwargs, result_callback in self._call_history: - res = self._hookexec(self.name, [method], kwargs, False) - if res and result_callback is not None: - # XXX: remember firstresult isn't compat with historic - assert isinstance(res, list) - result_callback(res[0]) - - -# Historical name (pluggy<=1.2), kept for backward compatibility. -_HookCaller = HookCaller + for kwargs, result_callback in self._call_history: + res = self._hookexec(self.name, [method], [], kwargs, False) + if res and result_callback is not None: + assert isinstance(res, list) + result_callback(res[0]) + + +def _create_hook_caller( + name: str, + hook_execute: _HookExec, + specmodule_or_class: _Namespace | None = None, + spec_opts: HookspecOpts | None = None, +) -> _HookCallerBase: + """Factory returning appropriate HookCaller type (internal).""" + if specmodule_or_class is None: + return _UnspeccedHookCaller(name, hook_execute) + + assert spec_opts is not None + if spec_opts.get("historic"): + return _HistoricHookCaller(name, hook_execute, specmodule_or_class, spec_opts) + elif spec_opts.get("firstresult"): + return _FirstResultHookCaller( + name, hook_execute, specmodule_or_class, spec_opts + ) + else: + return _NormalHookCaller(name, hook_execute, specmodule_or_class, spec_opts) -class _SubsetHookCaller(HookCaller): +class _SubsetHookCaller: """A proxy to another HookCaller which manages calls to all registered plugins except the ones from remove_plugins.""" - # This class is unusual: in inhertits from `HookCaller` so all of - # the *code* runs in the class, but it delegates all underlying *data* - # to the original HookCaller. - # `subset_hook_caller` used to be implemented by creating a full-fledged - # HookCaller, copying all hookimpls from the original. This had problems - # with memory leaks (#346) and historic calls (#347), which make a proxy - # approach better. - # An alternative implementation is to use a `_getattr__`/`__getattribute__` - # proxy, however that adds more overhead and is more tricky to implement. - - __slots__ = ( - "_orig", - "_remove_plugins", - ) + __slots__ = ("_orig", "_remove_plugins") - def __init__(self, orig: HookCaller, remove_plugins: Set[_Plugin]) -> None: + def __init__( + self, orig: _SpecifiedHookCaller, remove_plugins: Set[_Plugin] + ) -> None: self._orig = orig self._remove_plugins = remove_plugins - self.name = orig.name # type: ignore[misc] - self._hookexec = orig._hookexec # type: ignore[misc] - @property # type: ignore[misc] - def _hookimpls(self) -> list[HookImpl]: + @property + def name(self) -> str: + return self._orig.name + + @property + def spec(self) -> HookSpec: + return self._orig.spec + + @property + def _normal_impls(self) -> list[_NormalHookImplementation]: return [ impl - for impl in self._orig._hookimpls + for impl in self._orig._normal_impls if impl.plugin not in self._remove_plugins ] @property - def spec(self) -> HookSpec | None: # type: ignore[override] - return self._orig.spec + def _wrapper_impls(self) -> list[_WrapperHookImplementation]: + return [ + impl + for impl in self._orig._wrapper_impls + if impl.plugin not in self._remove_plugins + ] + + def get_hookimpls(self) -> list[_HookImplementation]: + """Get all registered hook implementations for this hook.""" + return list(self._normal_impls) + list(self._wrapper_impls) + + def has_spec(self) -> bool: + return True + + def is_historic(self) -> bool: + return self._orig.is_historic() @property - def _call_history(self) -> _CallHistory | None: # type: ignore[override] - return self._orig._call_history + def _call_history(self) -> _CallHistory | None: + if isinstance(self._orig, _HistoricHookCaller): + return self._orig._call_history + return None + + def __call__(self, **kwargs: object) -> Any: + """Call the hook with filtered implementations.""" + self._orig._verify_all_args_are_provided(kwargs) + firstresult = isinstance(self._orig, _FirstResultHookCaller) + return self._orig._hookexec( + self.name, + self._normal_impls.copy(), + self._wrapper_impls.copy(), + kwargs, + firstresult, + ) + + def call_historic( + self, + result_callback: Callable[[Any], None] | None = None, + kwargs: Mapping[str, object] | None = None, + ) -> None: + """Call a historic hook with given kwargs for all filtered plugins. + + Also registers the call in the original hook's history so new plugins + will receive the call (subject to the original's filtering, not this subset's). + """ + if not isinstance(self._orig, _HistoricHookCaller): + raise TypeError("call_historic is only valid for historic hooks") + kwargs = kwargs or {} + self._orig._verify_all_args_are_provided(kwargs) + # Store in original's history - new plugins get the call via the original + self._orig._call_history.append((kwargs, result_callback)) + # Call with filtered implementations + res = self._orig._hookexec( + self.name, self._normal_impls.copy(), [], kwargs, False + ) + if result_callback is None: + return + if isinstance(res, list): + for x in res: + result_callback(x) def __repr__(self) -> str: return f"<_SubsetHookCaller {self.name!r}>" +# Backward compatibility alias +HookCaller = _NormalHookCaller +_HookCaller = _NormalHookCaller + + +@runtime_checkable +class HookImplementationProtocol(Protocol): + """Public protocol for hook implementations. + + This is the stable public interface that API users should depend on + for type hints. The concrete implementation classes are internal. + """ + + @property + def function(self) -> Callable[..., object]: ... + + @property + def argnames(self) -> tuple[str, ...]: ... + + @property + def kwargnames(self) -> tuple[str, ...]: ... + + @property + def plugin(self) -> _Plugin: ... + + @property + def plugin_name(self) -> str: ... + + @property + def optionalhook(self) -> bool: ... + + @property + def tryfirst(self) -> bool: ... + + @property + def trylast(self) -> bool: ... + + +class _HookImplementation(ABC): + """Base class for all hook implementations (internal). + + Use :class:`HookImplementationProtocol` for type hints in public APIs. + """ + + __slots__ = ( + "function", + "argnames", + "kwargnames", + "plugin", + "plugin_name", + "optionalhook", + "tryfirst", + "trylast", + ) + + def __init__( + self, + plugin: _Plugin, + plugin_name: str, + function: _HookImplFunction[object], + hook_impl_opts: HookimplOpts, + ) -> None: + self.function: Final = function + argnames, kwargnames = varnames(self.function) + self.argnames: Final = argnames + self.kwargnames: Final = kwargnames + self.plugin: Final = plugin + self.plugin_name: Final = plugin_name + self.optionalhook: Final[bool] = hook_impl_opts["optionalhook"] + self.tryfirst: Final[bool] = hook_impl_opts["tryfirst"] + self.trylast: Final[bool] = hook_impl_opts["trylast"] + + @property + @abstractmethod + def is_wrapper(self) -> bool: + """Whether this is a wrapper implementation.""" + ... + + def _get_args(self, caller_kwargs: Mapping[str, object]) -> list[object]: + """Extract positional args from caller kwargs. + + :raises HookCallError: If a required argument is missing. + """ + from ._result import HookCallError + + try: + return [caller_kwargs[argname] for argname in self.argnames] + except KeyError: + for argname in self.argnames: + if argname not in caller_kwargs: + raise HookCallError(f"hook call must provide argument {argname!r}") + raise # pragma: no cover + + # Backward compatibility properties - compute from type + @property + def wrapper(self) -> bool: + """Whether this is a new-style wrapper.""" + return False + + @property + def hookwrapper(self) -> bool: + """Whether this is an old-style hookwrapper.""" + return False + + def __repr__(self) -> str: + return ( + f"<{type(self).__name__} " + f"plugin_name={self.plugin_name!r}, plugin={self.plugin!r}>" + ) + + +@final +class _NormalHookImplementation(_HookImplementation): + """A normal (non-wrapper) hook implementation.""" + + __slots__ = () + + @property + def is_wrapper(self) -> bool: + return False + + def call(self, caller_kwargs: Mapping[str, object]) -> object: + """Call this hook implementation with the given kwargs.""" + args = self._get_args(caller_kwargs) + return self.function(*args) + + +class _WrapperHookImplementation(_HookImplementation, ABC): + """Base class for wrapper hook implementations.""" + + __slots__ = () + + @property + def is_wrapper(self) -> bool: + return True + + @abstractmethod + def setup_teardown(self, caller_kwargs: Mapping[str, object]) -> Teardown: + """Run the wrapper setup phase and return a teardown function. + + :param caller_kwargs: The keyword arguments from the hook call. + + The teardown function takes (result, exception) and returns + (result, exception). + """ + ... + + +def _gen_code_location(gen: Generator[None, object, object]) -> str: + """Get code location string for a generator.""" + co = gen.gi_code # type: ignore[attr-defined] + return f"{co.co_name!r} {co.co_filename}:{co.co_firstlineno}" + + +@final +class _NewStyleWrapper(_WrapperHookImplementation): + """New-style wrapper (wrapper=True).""" + + __slots__ = () + + @property + def wrapper(self) -> bool: + return True + + def setup_teardown(self, caller_kwargs: Mapping[str, object]) -> Teardown: + """Run the wrapper setup phase and return a teardown function.""" + args = self._get_args(caller_kwargs) + gen: Generator[None, object, object] = self.function(*args) # type: ignore[assignment] + try: + next(gen) + except StopIteration: + raise RuntimeError( + f"wrap_controller at {_gen_code_location(gen)} did not yield" + ) + + def teardown( + result: object, exception: BaseException | None + ) -> tuple[object, BaseException | None]: + __tracebackhide__ = True + try: + if exception is not None: + # Throw exception into generator + gen.throw(exception) + else: + gen.send(result) + except StopIteration as si: + return si.value, None + except RuntimeError as re: + # StopIteration from generator causes RuntimeError in Python 3.7+ + # even for coroutine usage - see #544 + if isinstance(exception, StopIteration) and re.__cause__ is exception: + gen.close() + return None, exception + return None, re + except BaseException as e: + return None, e + # If we get here, the wrapper yielded again (bad) + gen.close() + return None, RuntimeError( + f"wrap_controller at {_gen_code_location(gen)} has second yield" + ) + + return teardown + + +@final +class _OldStyleWrapper(_WrapperHookImplementation): + """Legacy hookwrapper (hookwrapper=True).""" + + __slots__ = () + + @property + def hookwrapper(self) -> bool: + return True + + def setup_teardown(self, caller_kwargs: Mapping[str, object]) -> Teardown: + """Run the wrapper setup phase and return a teardown function.""" + import warnings + + from ._result import Result + from ._warnings import PluggyTeardownRaisedWarning + + args = self._get_args(caller_kwargs) + gen: Generator[None, Result[object], None] + gen = self.function(*args) # type: ignore[assignment] + try: + next(gen) + except StopIteration: + loc = _gen_code_location(gen) # type: ignore[arg-type] + raise RuntimeError(f"wrap_controller at {loc} did not yield") + + def teardown( + result: object, exception: BaseException | None + ) -> tuple[object, BaseException | None]: + __tracebackhide__ = True + # Old-style wrappers receive Result object + result_obj = Result(result, exception) + try: + gen.send(result_obj) + except StopIteration: + # Old-style wrapper completed normally + return result_obj._result, result_obj._exception + except BaseException as e: + # Warn about teardown exception in old-style wrapper + loc = _gen_code_location(gen) # type: ignore[arg-type] + msg = ( + "A plugin raised an exception during an " + "old-style hookwrapper teardown.\n" + f"Plugin: {self.plugin_name}, Hook: {loc}\n" + f"{type(e).__name__}: {e}\n" + "For more information see " + "https://pluggy.readthedocs.io/en/stable/api_reference.html" + "#pluggy.PluggyTeardownRaisedWarning" + ) + warnings.warn(PluggyTeardownRaisedWarning(msg), stacklevel=5) + return None, e + finally: + gen.close() + # Unreachable - either StopIteration or exception + loc = _gen_code_location(gen) # type: ignore[arg-type] + return None, RuntimeError(f"wrap_controller at {loc} has second yield") + + return teardown + + +def _create_hook_implementation( + plugin: _Plugin, + plugin_name: str, + function: _HookImplFunction[object], + hook_impl_opts: HookimplOpts, +) -> _HookImplementation: + """Factory returning appropriate implementation type (internal).""" + if hook_impl_opts.get("wrapper") and hook_impl_opts.get("hookwrapper"): + from ._manager import PluginValidationError + + raise PluginValidationError( + plugin, + f"Plugin {plugin_name!r}\n" + "The wrapper=True and hookwrapper=True options are mutually exclusive", + ) + if hook_impl_opts.get("wrapper"): + return _NewStyleWrapper(plugin, plugin_name, function, hook_impl_opts) + elif hook_impl_opts.get("hookwrapper"): + return _OldStyleWrapper(plugin, plugin_name, function, hook_impl_opts) + else: + return _NormalHookImplementation(plugin, plugin_name, function, hook_impl_opts) + + @final class HookImpl: - """A hook implementation in a :class:`HookCaller`.""" + """A hook implementation in a :class:`HookCaller`. + + .. deprecated:: + This class is deprecated. Use :class:`HookImplementationProtocol` + for type hints instead. + """ __slots__ = ( "function", @@ -689,6 +1224,11 @@ def __init__( #: `. self.trylast: Final = hook_impl_opts["trylast"] + @property + def is_wrapper(self) -> bool: + """Whether this is a wrapper implementation.""" + return self.wrapper or self.hookwrapper + def __repr__(self) -> str: return f"" diff --git a/src/pluggy/_manager.py b/src/pluggy/_manager.py index 1b994f25..b2875e11 100644 --- a/src/pluggy/_manager.py +++ b/src/pluggy/_manager.py @@ -15,11 +15,19 @@ from . import _tracing from ._callers import _multicall +from ._hooks import _create_hook_caller +from ._hooks import _create_hook_implementation +from ._hooks import _HistoricHookCaller +from ._hooks import _HookCallerBase +from ._hooks import _HookImplementation from ._hooks import _HookImplFunction from ._hooks import _Namespace +from ._hooks import _NormalHookImplementation from ._hooks import _Plugin +from ._hooks import _SpecifiedHookCaller from ._hooks import _SubsetHookCaller -from ._hooks import HookCaller +from ._hooks import _UnspeccedHookCaller +from ._hooks import _WrapperHookImplementation from ._hooks import HookImpl from ._hooks import HookimplOpts from ._hooks import HookRelay @@ -33,9 +41,13 @@ import importlib.metadata -_BeforeTrace: TypeAlias = Callable[[str, Sequence[HookImpl], Mapping[str, Any]], None] +_BeforeTrace: TypeAlias = Callable[ + [str, Sequence[HookImpl], Mapping[str, Any]], + None, +] _AfterTrace: TypeAlias = Callable[ - [Result[Any], str, Sequence[HookImpl], Mapping[str, Any]], None + [Result[Any], str, Sequence[HookImpl], Mapping[str, Any]], + None, ] @@ -114,13 +126,16 @@ def __init__(self, project_name: str) -> None: def _hookexec( self, hook_name: str, - methods: Sequence[HookImpl], + normal_impls: Sequence[_NormalHookImplementation], + wrapper_impls: Sequence[_WrapperHookImplementation], kwargs: Mapping[str, object], firstresult: bool, ) -> object | list[object]: # called from all hookcaller instances. # enable_tracing will set its own wrapping function at self._inner_hookexec - return self._inner_hookexec(hook_name, methods, kwargs, firstresult) + return self._inner_hookexec( + hook_name, normal_impls, wrapper_impls, kwargs, firstresult + ) def register(self, plugin: _Plugin, name: str | None = None) -> str | None: """Register a plugin and return its name. @@ -161,15 +176,20 @@ def register(self, plugin: _Plugin, name: str | None = None) -> str | None: if hookimpl_opts is not None: normalize_hookimpl_opts(hookimpl_opts) method: _HookImplFunction[object] = getattr(plugin, name) - hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts) + hookimpl = _create_hook_implementation( + plugin, plugin_name, method, hookimpl_opts + ) name = hookimpl_opts.get("specname") or name - hook: HookCaller | None = getattr(self.hook, name, None) + hook: _HookCallerBase | None = getattr(self.hook, name, None) if hook is None: - hook = HookCaller(name, self._hookexec) + hook = _create_hook_caller(name, self._hookexec) setattr(self.hook, name, hook) elif hook.has_spec(): self._verify_hook(hook, hookimpl) - hook._maybe_apply_history(hookimpl) + # Apply history for historic hooks + if isinstance(hook, _HistoricHookCaller): + assert isinstance(hookimpl, _NormalHookImplementation) + hook._maybe_apply_history(hookimpl) hook._add_hookimpl(hookimpl) return plugin_name @@ -259,15 +279,30 @@ def add_hookspecs(self, module_or_class: _Namespace) -> None: for name in dir(module_or_class): spec_opts = self.parse_hookspec_opts(module_or_class, name) if spec_opts is not None: - hc: HookCaller | None = getattr(self.hook, name, None) + hc: _HookCallerBase | None = getattr(self.hook, name, None) if hc is None: - hc = HookCaller(name, self._hookexec, module_or_class, spec_opts) + hc = _create_hook_caller( + name, self._hookexec, module_or_class, spec_opts + ) setattr(self.hook, name, hc) - else: + elif isinstance(hc, _UnspeccedHookCaller): # Plugins registered this hook without knowing the spec. - hc.set_specification(module_or_class, spec_opts) - for hookfunction in hc.get_hookimpls(): - self._verify_hook(hc, hookfunction) + # Replace unspecced caller with specified one, migrating impls. + new_hc = _create_hook_caller( + name, self._hookexec, module_or_class, spec_opts + ) + for impl in hc.get_hookimpls(): + self._verify_hook(new_hc, impl) + new_hc._add_hookimpl(impl) + hc = new_hc + setattr(self.hook, name, hc) + else: + # Already specified - hc.spec is not None here + assert hc.spec is not None + raise ValueError( + f"Hook {name!r} is already registered " + f"within namespace {hc.spec.namespace!r}" + ) names.append(name) if not names: @@ -328,8 +363,10 @@ def get_name(self, plugin: _Plugin) -> str | None: return name return None - def _verify_hook(self, hook: HookCaller, hookimpl: HookImpl) -> None: - if hook.is_historic() and (hookimpl.hookwrapper or hookimpl.wrapper): + def _verify_hook( + self, hook: _HookCallerBase, hookimpl: _HookImplementation + ) -> None: + if hook.is_historic() and hookimpl.is_wrapper: raise PluginValidationError( hookimpl.plugin, f"Plugin {hookimpl.plugin_name!r}\nhook {hook.name!r}\n" @@ -357,9 +394,7 @@ def _verify_hook(self, hook: HookCaller, hookimpl: HookImpl) -> None: if argname_warning is not None: _warn_for_function(argname_warning, hookimpl.function) - if ( - hookimpl.wrapper or hookimpl.hookwrapper - ) and not inspect.isgeneratorfunction(hookimpl.function): + if hookimpl.is_wrapper and not inspect.isgeneratorfunction(hookimpl.function): raise PluginValidationError( hookimpl.plugin, f"Plugin {hookimpl.plugin_name!r} for hook {hook.name!r}\n" @@ -368,14 +403,6 @@ def _verify_hook(self, hook: HookCaller, hookimpl: HookImpl) -> None: "but function is not a generator function", ) - if hookimpl.wrapper and hookimpl.hookwrapper: - raise PluginValidationError( - hookimpl.plugin, - f"Plugin {hookimpl.plugin_name!r} for hook {hook.name!r}\n" - f"hookimpl definition: {_formatdef(hookimpl.function)}\n" - "The wrapper=True and hookwrapper=True options are mutually exclusive", - ) - def check_pending(self) -> None: """Verify that all hooks which have not been verified against a hook specification are optional, otherwise raise @@ -383,7 +410,7 @@ def check_pending(self) -> None: for name in self.hook.__dict__: if name[0] == "_": continue - hook: HookCaller = getattr(self.hook, name) + hook: _HookCallerBase = getattr(self.hook, name) if not hook.has_spec(): for hookimpl in hook.get_hookimpls(): if not hookimpl.optionalhook: @@ -431,7 +458,7 @@ def list_name_plugin(self) -> list[tuple[str, _Plugin]]: """Return a list of (name, plugin) pairs for all registered plugins.""" return list(self._name2plugin.items()) - def get_hookcallers(self, plugin: _Plugin) -> list[HookCaller] | None: + def get_hookcallers(self, plugin: _Plugin) -> list[_HookCallerBase] | None: """Get all hook callers for the specified plugin. :returns: @@ -466,13 +493,20 @@ def add_hookcall_monitoring( def traced_hookexec( hook_name: str, - hook_impls: Sequence[HookImpl], + normal_impls: Sequence[_NormalHookImplementation], + wrapper_impls: Sequence[_WrapperHookImplementation], caller_kwargs: Mapping[str, object], firstresult: bool, ) -> object | list[object]: + # Combine lists for public API backward compatibility + hook_impls: Sequence[HookImpl] = cast( + Sequence[HookImpl], [*normal_impls, *wrapper_impls] + ) before(hook_name, hook_impls, caller_kwargs) outcome = Result.from_call( - lambda: oldcall(hook_name, hook_impls, caller_kwargs, firstresult) + lambda: oldcall( + hook_name, normal_impls, wrapper_impls, caller_kwargs, firstresult + ) ) after(outcome, hook_name, hook_impls, caller_kwargs) return outcome.get_result() @@ -492,7 +526,9 @@ def enable_tracing(self) -> Callable[[], None]: hooktrace = self.trace.root.get("hook") def before( - hook_name: str, methods: Sequence[HookImpl], kwargs: Mapping[str, object] + hook_name: str, + hook_impls: Sequence[HookImpl], + kwargs: Mapping[str, object], ) -> None: hooktrace.root.indent += 1 hooktrace(hook_name, kwargs) @@ -500,7 +536,7 @@ def before( def after( outcome: Result[object], hook_name: str, - methods: Sequence[HookImpl], + hook_impls: Sequence[HookImpl], kwargs: Mapping[str, object], ) -> None: if outcome.exception is None: @@ -511,11 +547,11 @@ def after( def subset_hook_caller( self, name: str, remove_plugins: Iterable[_Plugin] - ) -> HookCaller: + ) -> _SpecifiedHookCaller | _SubsetHookCaller: """Return a proxy :class:`~pluggy.HookCaller` instance for the named method which manages calls to all registered plugins except the ones from remove_plugins.""" - orig: HookCaller = getattr(self.hook, name) + orig: _SpecifiedHookCaller = getattr(self.hook, name) plugins_to_remove = {plug for plug in remove_plugins if hasattr(plug, name)} if plugins_to_remove: return _SubsetHookCaller(orig, plugins_to_remove) diff --git a/testing/benchmark.py b/testing/benchmark.py index cc3be4eb..d91b03de 100644 --- a/testing/benchmark.py +++ b/testing/benchmark.py @@ -10,7 +10,9 @@ from pluggy import HookspecMarker from pluggy import PluginManager from pluggy._callers import _multicall -from pluggy._hooks import HookImpl +from pluggy._hooks import _create_hook_implementation +from pluggy._hooks import _NormalHookImplementation +from pluggy._hooks import _WrapperHookImplementation hookspec = HookspecMarker("example") @@ -40,13 +42,19 @@ def wrappers(request: Any) -> list[object]: def test_hook_and_wrappers_speed(benchmark, hooks, wrappers) -> None: def setup(): hook_name = "foo" - hook_impls = [] - for method in hooks + wrappers: - f = HookImpl(None, "", method, method.example_impl) - hook_impls.append(f) + normal_impls: list[_NormalHookImplementation] = [] + wrapper_impls: list[_WrapperHookImplementation] = [] + for method in hooks: + opts = method.example_impl + f = _create_hook_implementation(None, "", method, opts) + normal_impls.append(f) # type: ignore[arg-type] + for method in wrappers: + opts = method.example_impl + f = _create_hook_implementation(None, "", method, opts) + wrapper_impls.append(f) # type: ignore[arg-type] caller_kwargs = {"arg1": 1, "arg2": 2, "arg3": 3} firstresult = False - return (hook_name, hook_impls, caller_kwargs, firstresult), {} + return (hook_name, normal_impls, wrapper_impls, caller_kwargs, firstresult), {} benchmark.pedantic(_multicall, setup=setup, rounds=10) diff --git a/testing/test_details.py b/testing/test_details.py index de79d536..44914e5d 100644 --- a/testing/test_details.py +++ b/testing/test_details.py @@ -192,7 +192,7 @@ def myhook(self): plugin = Plugin() pname = pm.register(plugin) assert repr(pm.hook.myhook.get_hookimpls()[0]) == ( - f"" + f"<_NormalHookImplementation plugin_name={pname!r}, plugin={plugin!r}>" ) diff --git a/testing/test_hookcaller.py b/testing/test_hookcaller.py index 5c42f4b6..e567ab00 100644 --- a/testing/test_hookcaller.py +++ b/testing/test_hookcaller.py @@ -1,6 +1,7 @@ from collections.abc import Callable from collections.abc import Generator from collections.abc import Sequence +from typing import cast from typing import TypeVar import pytest @@ -9,8 +10,9 @@ from pluggy import HookspecMarker from pluggy import PluginManager from pluggy import PluginValidationError -from pluggy._hooks import HookCaller -from pluggy._hooks import HookImpl +from pluggy._hooks import _create_hook_implementation +from pluggy._hooks import _HookCallerBase +from pluggy._hooks import _HookImplementation hookspec = HookspecMarker("example") @@ -18,21 +20,21 @@ @pytest.fixture -def hc(pm: PluginManager) -> HookCaller: +def hc(pm: PluginManager) -> _HookCallerBase: class Hooks: @hookspec def he_method1(self, arg: object) -> None: pass pm.add_hookspecs(Hooks) - return pm.hook.he_method1 + return cast(_HookCallerBase, pm.hook.he_method1) FuncT = TypeVar("FuncT", bound=Callable[..., object]) class AddMeth: - def __init__(self, hc: HookCaller) -> None: + def __init__(self, hc: _HookCallerBase) -> None: self.hc = hc def __call__( @@ -50,7 +52,12 @@ def wrap(func: FuncT) -> FuncT: wrapper=wrapper, )(func) self.hc._add_hookimpl( - HookImpl(None, "", func, func.example_impl), # type: ignore[attr-defined] + _create_hook_implementation( + None, + "", + func, + func.example_impl, # type: ignore[attr-defined] + ), ) return func @@ -58,15 +65,15 @@ def wrap(func: FuncT) -> FuncT: @pytest.fixture -def addmeth(hc: HookCaller) -> AddMeth: +def addmeth(hc: _HookCallerBase) -> AddMeth: return AddMeth(hc) -def funcs(hookmethods: Sequence[HookImpl]) -> list[Callable[..., object]]: +def funcs(hookmethods: Sequence[_HookImplementation]) -> list[Callable[..., object]]: return [hookmethod.function for hookmethod in hookmethods] -def test_adding_nonwrappers(hc: HookCaller, addmeth: AddMeth) -> None: +def test_adding_nonwrappers(hc: _HookCallerBase, addmeth: AddMeth) -> None: @addmeth() def he_method1() -> None: pass @@ -82,7 +89,7 @@ def he_method3() -> None: assert funcs(hc.get_hookimpls()) == [he_method1, he_method2, he_method3] -def test_adding_nonwrappers_trylast(hc: HookCaller, addmeth: AddMeth) -> None: +def test_adding_nonwrappers_trylast(hc: _HookCallerBase, addmeth: AddMeth) -> None: @addmeth() def he_method1_middle() -> None: pass @@ -98,7 +105,7 @@ def he_method1_b() -> None: assert funcs(hc.get_hookimpls()) == [he_method1, he_method1_middle, he_method1_b] -def test_adding_nonwrappers_trylast3(hc: HookCaller, addmeth: AddMeth) -> None: +def test_adding_nonwrappers_trylast3(hc: _HookCallerBase, addmeth: AddMeth) -> None: @addmeth() def he_method1_a() -> None: pass @@ -123,7 +130,7 @@ def he_method1_d() -> None: ] -def test_adding_nonwrappers_trylast2(hc: HookCaller, addmeth: AddMeth) -> None: +def test_adding_nonwrappers_trylast2(hc: _HookCallerBase, addmeth: AddMeth) -> None: @addmeth() def he_method1_middle() -> None: pass @@ -139,7 +146,7 @@ def he_method1() -> None: assert funcs(hc.get_hookimpls()) == [he_method1, he_method1_middle, he_method1_b] -def test_adding_nonwrappers_tryfirst(hc: HookCaller, addmeth: AddMeth) -> None: +def test_adding_nonwrappers_tryfirst(hc: _HookCallerBase, addmeth: AddMeth) -> None: @addmeth(tryfirst=True) def he_method1() -> None: pass @@ -155,7 +162,7 @@ def he_method1_b() -> None: assert funcs(hc.get_hookimpls()) == [he_method1_middle, he_method1_b, he_method1] -def test_adding_wrappers_ordering(hc: HookCaller, addmeth: AddMeth) -> None: +def test_adding_wrappers_ordering(hc: _HookCallerBase, addmeth: AddMeth) -> None: @addmeth(hookwrapper=True) def he_method1(): yield # pragma: no cover @@ -185,7 +192,9 @@ def he_method3(): ] -def test_adding_wrappers_ordering_tryfirst(hc: HookCaller, addmeth: AddMeth) -> None: +def test_adding_wrappers_ordering_tryfirst( + hc: _HookCallerBase, addmeth: AddMeth +) -> None: @addmeth(hookwrapper=True, tryfirst=True) def he_method1(): yield # pragma: no cover @@ -201,7 +210,7 @@ def he_method3(): assert funcs(hc.get_hookimpls()) == [he_method2, he_method1, he_method3] -def test_adding_wrappers_complex(hc: HookCaller, addmeth: AddMeth) -> None: +def test_adding_wrappers_complex(hc: _HookCallerBase, addmeth: AddMeth) -> None: assert funcs(hc.get_hookimpls()) == [] @addmeth(hookwrapper=True, trylast=True) @@ -442,7 +451,7 @@ def conflict(self) -> None: ) -def test_call_extra_hook_order(hc: HookCaller, addmeth: AddMeth) -> None: +def test_call_extra_hook_order(hc: _HookCallerBase, addmeth: AddMeth) -> None: """Ensure that call_extra is calling hooks in the right order.""" order = [] diff --git a/testing/test_invocations.py b/testing/test_invocations.py index e24af750..7456fd95 100644 --- a/testing/test_invocations.py +++ b/testing/test_invocations.py @@ -40,7 +40,7 @@ def hello(self, arg): pm.add_hookspecs(Api) with pytest.raises(TypeError) as exc: - pm.hook.hello(3) # type: ignore[call-arg] + pm.hook.hello(3) message = "__call__() takes 1 positional argument but 2 were given" assert message in str(exc.value) diff --git a/testing/test_multicall.py b/testing/test_multicall.py index 93f394c8..9bfa8dcc 100644 --- a/testing/test_multicall.py +++ b/testing/test_multicall.py @@ -8,7 +8,9 @@ from pluggy import HookimplMarker from pluggy import HookspecMarker from pluggy._callers import _multicall -from pluggy._hooks import HookImpl +from pluggy._hooks import _create_hook_implementation +from pluggy._hooks import _NormalHookImplementation +from pluggy._hooks import _WrapperHookImplementation hookspec = HookspecMarker("example") @@ -21,11 +23,16 @@ def MC( firstresult: bool = False, ) -> object | list[object]: caller = _multicall - hookfuncs = [] + normal_funcs: list[_NormalHookImplementation] = [] + wrapper_funcs: list[_WrapperHookImplementation] = [] for method in methods: - f = HookImpl(None, "", method, method.example_impl) # type: ignore[attr-defined] - hookfuncs.append(f) - return caller("foo", hookfuncs, kwargs, firstresult) + opts = method.example_impl # type: ignore[attr-defined] + f = _create_hook_implementation(None, "", method, opts) + if f.is_wrapper: + wrapper_funcs.append(f) # type: ignore[arg-type] + else: + normal_funcs.append(f) # type: ignore[arg-type] + return caller("foo", normal_funcs, wrapper_funcs, kwargs, firstresult) def test_keyword_args() -> None: diff --git a/testing/test_pluginmanager.py b/testing/test_pluginmanager.py index f80b1b55..3acf256d 100644 --- a/testing/test_pluginmanager.py +++ b/testing/test_pluginmanager.py @@ -650,7 +650,7 @@ def after(outcome, hook_name, hook_impls, kwargs): assert isinstance(out[0][2], dict) assert out[1] == "he_method1-api2" assert out[2] == "he_method1-api1" - assert len(out[3]) == 4 + assert len(out[3]) == 4 # (outcome, hook_name, combined_impls, kwargs) assert out[3][1] == out[0][0] undo()