From b518f612112b2f47e2bf6a7320a312c30c4074ea Mon Sep 17 00:00:00 2001 From: Tim Li Date: Tue, 26 Aug 2025 12:09:31 -0700 Subject: [PATCH 1/8] Implement workflow & activity registry --- cadence/__init__.py | 14 + cadence/worker/__init__.py | 43 ++- cadence/worker/_registry.py | 515 ++++++++++++++++++++++++++ pyproject.toml | 1 + tests/cadence/worker/test_registry.py | 325 ++++++++++++++++ uv.lock | 2 + 6 files changed, 898 insertions(+), 2 deletions(-) create mode 100644 cadence/__init__.py create mode 100644 cadence/worker/_registry.py create mode 100644 tests/cadence/worker/test_registry.py diff --git a/cadence/__init__.py b/cadence/__init__.py new file mode 100644 index 0000000..175f01b --- /dev/null +++ b/cadence/__init__.py @@ -0,0 +1,14 @@ +""" +Cadence Python Client + +A Python framework for authoring workflows and activities for Cadence. +""" + +# Import main client functionality +from .client import Client + +__version__ = "0.1.0" + +__all__ = [ + "Client", +] diff --git a/cadence/worker/__init__.py b/cadence/worker/__init__.py index c2959b6..83e535e 100644 --- a/cadence/worker/__init__.py +++ b/cadence/worker/__init__.py @@ -5,7 +5,46 @@ WorkerOptions ) +from ._registry import ( + Registry, + RegistryError, + WorkflowNotFoundError, + ActivityNotFoundError, + DuplicateRegistrationError, + registry, + new_registry, + register_workflow, + register_activity, + get_workflow, + get_activity, + list_workflows, + list_activities, + has_workflow, + has_activity, + get_total_workflow_count, + get_total_activity_count, + clear_registry, +) + __all__ = [ "Worker", - "WorkerOptions" -] \ No newline at end of file + "WorkerOptions", + 'Registry', + 'RegistryError', + 'WorkflowNotFoundError', + 'ActivityNotFoundError', + 'DuplicateRegistrationError', + 'registry', + 'new_registry', + 'register_workflow', + 'register_activity', + 'get_workflow', + 'get_activity', + 'list_workflows', + 'list_activities', + 'has_workflow', + 'has_activity', + 'get_total_workflow_count', + 'get_total_activity_count', + 'clear_registry', +] diff --git a/cadence/worker/_registry.py b/cadence/worker/_registry.py new file mode 100644 index 0000000..8b34618 --- /dev/null +++ b/cadence/worker/_registry.py @@ -0,0 +1,515 @@ +#!/usr/bin/env python3 +""" +Workflow and Activity Registry for Cadence Python Client. + +This module provides a registry system for managing workflows and activities, +similar to the Go client's registry.go implementation. +""" + +import logging +from typing import Callable, Dict, List, Optional + + +logger = logging.getLogger(__name__) + + +class RegistryError(Exception): + """Base exception for registry operations.""" + pass + + +class WorkflowNotFoundError(RegistryError): + """Raised when a workflow is not found in the registry.""" + pass + + +class ActivityNotFoundError(RegistryError): + """Raised when an activity is not found in the registry.""" + pass + + +class DuplicateRegistrationError(RegistryError): + """Raised when attempting to register a duplicate workflow or activity.""" + pass + + +class Registry: + """ + Registry for managing workflows and activities. + + This class provides functionality to register, retrieve, and manage + workflows and activities in a Cadence application. + """ + + def __init__(self, next_registry: Optional['Registry'] = None): + """ + Initialize the registry. + + Args: + next_registry: Optional next registry in the chain for delegation + """ + self._workflows: Dict[str, Callable] = {} + self._activities: Dict[str, Callable] = {} + self._workflow_aliases: Dict[str, str] = {} # alias -> name mapping + self._activity_aliases: Dict[str, str] = {} # alias -> name mapping + self._next_registry = next_registry + + def register_workflow( + self, + func: Optional[Callable] = None, + *, + name: Optional[str] = None, + alias: Optional[str] = None + ) -> Callable: + """ + Register a workflow function. + + This method can be used as a decorator or called directly. + + Args: + func: The workflow function to register + name: Name of the workflow (defaults to function name) + alias: Alternative name for the workflow + + Returns: + The decorated function or the function itself + + Raises: + DuplicateRegistrationError: If workflow name already exists + """ + def decorator(f: Callable) -> Callable: + workflow_name = name or f.__name__ + + if workflow_name in self._workflows: + raise DuplicateRegistrationError( + f"Workflow '{workflow_name}' is already registered" + ) + + self._workflows[workflow_name] = f + + # Register alias if provided + if alias: + if alias in self._workflow_aliases: + raise DuplicateRegistrationError( + f"Workflow alias '{alias}' is already registered" + ) + self._workflow_aliases[alias] = workflow_name + + logger.info(f"Registered workflow '{workflow_name}'") + return f + + if func is None: + return decorator + return decorator(func) + + def register_activity( + self, + func: Optional[Callable] = None, + *, + name: Optional[str] = None, + alias: Optional[str] = None + ) -> Callable: + """ + Register an activity function. + + This method can be used as a decorator or called directly. + + Args: + func: The activity function to register + name: Name of the activity (defaults to function name) + alias: Alternative name for the activity + + Returns: + The decorated function or the function itself + + Raises: + DuplicateRegistrationError: If activity name already exists + """ + def decorator(f: Callable) -> Callable: + activity_name = name or f.__name__ + + if activity_name in self._activities: + raise DuplicateRegistrationError( + f"Activity '{activity_name}' is already registered" + ) + + self._activities[activity_name] = f + + # Register alias if provided + if alias: + if alias in self._activity_aliases: + raise DuplicateRegistrationError( + f"Activity alias '{alias}' is already registered" + ) + self._activity_aliases[alias] = activity_name + + logger.info(f"Registered activity '{activity_name}'") + return f + + if func is None: + return decorator + return decorator(func) + + def get_workflow(self, name: str) -> Callable: + """ + Get a registered workflow by name. + + Args: + name: Name or alias of the workflow + + Returns: + The workflow function + + Raises: + WorkflowNotFoundError: If workflow is not found + """ + # Check if it's an alias + actual_name = self._workflow_aliases.get(name, name) + + if actual_name in self._workflows: + return self._workflows[actual_name] + + # Try the next registry in the chain if available + if self._next_registry: + try: + return self._next_registry.get_workflow(name) + except WorkflowNotFoundError: + pass + + raise WorkflowNotFoundError(f"Workflow '{name}' not found in registry") + + def get_activity(self, name: str) -> Callable: + """ + Get a registered activity by name. + + Args: + name: Name or alias of the activity + + Returns: + The activity function + + Raises: + ActivityNotFoundError: If activity is not found + """ + # Check if it's an alias + actual_name = self._activity_aliases.get(name, name) + + if actual_name in self._activities: + return self._activities[actual_name] + + # Try the next registry in the chain if available + if self._next_registry: + try: + return self._next_registry.get_activity(name) + except ActivityNotFoundError: + pass + + raise ActivityNotFoundError(f"Activity '{name}' not found in registry") + + def list_workflows(self) -> List[str]: + """ + Get a list of all registered workflow names. + + Returns: + List of workflow names + """ + workflows = list(self._workflows.keys()) + if self._next_registry: + workflows.extend(self._next_registry.list_workflows()) + return workflows + + def list_activities(self) -> List[str]: + """ + Get a list of all registered activity names. + + Returns: + List of activity names + """ + activities = list(self._activities.keys()) + if self._next_registry: + activities.extend(self._next_registry.list_activities()) + return activities + + def list_workflow_aliases(self) -> Dict[str, str]: + """ + Get a mapping of workflow aliases to actual names. + + Returns: + Dictionary mapping aliases to workflow names + """ + return self._workflow_aliases.copy() + + def list_activity_aliases(self) -> Dict[str, str]: + """ + Get a mapping of activity aliases to actual names. + + Returns: + Dictionary mapping aliases to activity names + """ + return self._activity_aliases.copy() + + def unregister_workflow(self, name: str) -> bool: + """ + Unregister a workflow. + + Args: + name: Name or alias of the workflow to unregister + + Returns: + True if workflow was unregistered, False if not found + """ + # Check if it's an alias + actual_name = self._workflow_aliases.get(name, name) + + if actual_name in self._workflows: + del self._workflows[actual_name] + + # Remove any aliases pointing to this workflow + aliases_to_remove = [ + alias for alias, workflow_name in self._workflow_aliases.items() + if workflow_name == actual_name + ] + for alias in aliases_to_remove: + del self._workflow_aliases[alias] + + logger.info(f"Unregistered workflow '{actual_name}'") + return True + + return False + + def unregister_activity(self, name: str) -> bool: + """ + Unregister an activity. + + Args: + name: Name or alias of the activity to unregister + + Returns: + True if activity was unregistered, False if not found + """ + # Check if it's an alias + actual_name = self._activity_aliases.get(name, name) + + if actual_name in self._activities: + del self._activities[actual_name] + + # Remove any aliases pointing to this activity + aliases_to_remove = [ + alias for alias, activity_name in self._activity_aliases.items() + if activity_name == actual_name + ] + for alias in aliases_to_remove: + del self._activity_aliases[alias] + + logger.info(f"Unregistered activity '{actual_name}'") + return True + + return False + + def clear(self): + """Clear all registered workflows and activities.""" + self._workflows.clear() + self._activities.clear() + self._workflow_aliases.clear() + self._activity_aliases.clear() + logger.info("Registry cleared") + + def get_workflow_count(self) -> int: + """ + Get the total number of registered workflows in this registry. + + Note: This only counts workflows in this registry, not in chained registries. + Use get_total_workflow_count() to get the total count across all chained registries. + """ + return len(self._workflows) + + def get_activity_count(self) -> int: + """ + Get the total number of registered activities in this registry. + + Note: This only counts activities in this registry, not in chained registries. + Use get_total_workflow_count() to get the total count across all chained registries. + """ + return len(self._activities) + + def get_total_workflow_count(self) -> int: + """Get the total number of registered workflows across all chained registries.""" + count = len(self._workflows) + if self._next_registry: + count += self._next_registry.get_total_workflow_count() + return count + + def get_total_activity_count(self) -> int: + """Get the total number of registered activities across all chained registries.""" + count = len(self._activities) + if self._next_registry: + count += self._next_registry.get_total_activity_count() + return count + + def has_workflow(self, name: str) -> bool: + """ + Check if a workflow is registered in this registry or any chained registries. + + Args: + name: Name or alias of the workflow + + Returns: + True if workflow exists anywhere in the chain, False otherwise + """ + actual_name = self._workflow_aliases.get(name, name) + if actual_name in self._workflows: + return True + + # Check the next registry in the chain if available + if self._next_registry: + return self._next_registry.has_workflow(name) + + return False + + def has_activity(self, name: str) -> bool: + """ + Check if an activity is registered in this registry or any chained registries. + + Args: + name: Name or alias of the activity + + Returns: + True if activity exists anywhere in the chain, False otherwise + """ + actual_name = self._activity_aliases.get(name, name) + if actual_name in self._activities: + return True + + # Check the next registry in the chain if available + if self._next_registry: + return self._next_registry.has_activity(name) + + return False + + def set_next_registry(self, next_registry: 'Registry') -> None: + """ + Set the next registry in the chain. + + Args: + next_registry: The registry to chain to + """ + self._next_registry = next_registry + + def get_next_registry(self) -> Optional['Registry']: + """ + Get the next registry in the chain. + + Returns: + The next registry or None if no chaining + """ + return self._next_registry + + def has_next_registry(self) -> bool: + """ + Check if this registry has a next registry in the chain. + + Returns: + True if there is a next registry, False otherwise + """ + return self._next_registry is not None + + +# Global registry instance +registry = Registry() + +def new_registry() -> Registry: + """ + Create a new registry that automatically chains to the global registry. + + This follows the Go client pattern where new registries automatically + delegate to the global registry when items are not found locally. + + Returns: + A new Registry instance chained to the global registry + """ + return Registry(next_registry=registry) + + +# Convenience functions for using the global registry +def register_workflow( + func: Optional[Callable] = None, + **kwargs +) -> Callable: + """Register a workflow with the global registry.""" + return registry.register_workflow(func, **kwargs) + + +def register_activity( + func: Optional[Callable] = None, + **kwargs +) -> Callable: + """Register an activity with the global registry.""" + return registry.register_activity(func, **kwargs) + + +def get_workflow(name: str) -> Callable: + """Get a workflow from the global registry.""" + return registry.get_workflow(name) + + +def get_activity(name: str) -> Callable: + """Get an activity from the global registry.""" + return registry.get_activity(name) + + +def list_workflows() -> List[str]: + """List all workflows in the global registry.""" + return registry.list_workflows() + + +def list_activities() -> List[str]: + """List all activities in the global registry.""" + return registry.list_activities() + + +def has_workflow(name: str) -> bool: + """Check if a workflow exists in the global registry.""" + return registry.has_workflow(name) + + +def has_activity(name: str) -> bool: + """Check if an activity exists in the global registry.""" + return registry.has_activity(name) + + +def get_total_workflow_count() -> int: + """Get the total number of workflows across all chained registries.""" + return registry.get_total_workflow_count() + + +def get_total_activity_count() -> int: + """Get the total number of activities across all chained registries.""" + return registry.get_total_activity_count() + + +def clear_registry(): + """Clear the global registry.""" + registry.clear() + + +# Export the main classes and functions +__all__ = [ + 'Registry', + 'RegistryError', + 'WorkflowNotFoundError', + 'ActivityNotFoundError', + 'DuplicateRegistrationError', + 'registry', + 'new_registry', + 'register_workflow', + 'register_activity', + 'get_workflow', + 'get_activity', + 'list_workflows', + 'list_activities', + 'has_workflow', + 'has_activity', + 'get_total_workflow_count', + 'get_total_activity_count', + 'clear_registry', +] diff --git a/pyproject.toml b/pyproject.toml index b3a1dba..1ebc6ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "grpcio>=1.50.0", "grpcio-tools>=1.50.0", "protobuf==5.29.1", + "pytest>=8.4.1", "typing-extensions>=4.0.0", ] diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py new file mode 100644 index 0000000..567cbc2 --- /dev/null +++ b/tests/cadence/worker/test_registry.py @@ -0,0 +1,325 @@ +#!/usr/bin/env python3 +""" +Tests for the registry chaining functionality. +""" + +import pytest +import sys +import os + +# Add the project root to the path +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))) + +from cadence.worker._registry import ( + Registry, new_registry, registry, + RegistryError, WorkflowNotFoundError, ActivityNotFoundError, DuplicateRegistrationError +) + + +class TestRegistryChaining: + """Test registry chaining functionality.""" + + def setup_method(self): + """Set up test fixtures.""" + # Clear the global registry before each test + registry.clear() + + def test_basic_registry_creation(self): + """Test basic registry creation.""" + reg = Registry() + assert reg.get_workflow_count() == 0 + assert reg.get_activity_count() == 0 + assert not reg.has_next_registry() + + def test_new_registry_chains_to_global(self): + """Test that new_registry() automatically chains to global registry.""" + local_reg = new_registry() + assert local_reg.has_next_registry() + assert local_reg.get_next_registry() is registry + + def test_register_and_retrieve_workflow(self): + """Test registering and retrieving workflows.""" + reg = Registry() + + @reg.register_workflow + def test_workflow(): + return "test" + + assert reg.has_workflow("test_workflow") + wf = reg.get_workflow("test_workflow") + assert wf() == "test" + + def test_register_and_retrieve_activity(self): + """Test registering and retrieving activities.""" + reg = Registry() + + @reg.register_activity + def test_activity(): + return "test" + + assert reg.has_activity("test_activity") + act = reg.get_activity("test_activity") + assert act() == "test" + + def test_workflow_chaining(self): + """Test workflow lookup through registry chain.""" + # Register in global registry + @registry.register_workflow + def global_workflow(): + return "global" + + # Create local registry that chains to global + local_reg = new_registry() + + # Register in local registry + @local_reg.register_workflow + def local_workflow(): + return "local" + + # Test that local registry can access both + assert local_reg.has_workflow("local_workflow") + assert local_reg.has_workflow("global_workflow") + + local_wf = local_reg.get_workflow("local_workflow") + global_wf = local_reg.get_workflow("global_workflow") + + assert local_wf() == "local" + assert global_wf() == "global" + + def test_activity_chaining(self): + """Test activity lookup through registry chain.""" + # Register in global registry + @registry.register_activity + def global_activity(): + return "global" + + # Create local registry that chains to global + local_reg = new_registry() + + # Register in local registry + @local_reg.register_activity + def local_activity(): + return "local" + + # Test that local registry can access both + assert local_reg.has_activity("local_activity") + assert local_reg.has_activity("global_activity") + + local_act = local_reg.get_activity("local_activity") + global_act = local_reg.get_activity("global_activity") + + assert local_act() == "local" + assert global_act() == "global" + + def test_multi_level_chaining(self): + """Test multi-level registry chaining.""" + # Global registry + @registry.register_workflow + def global_workflow(): + return "global" + + # Local registry + local_reg = new_registry() + @local_reg.register_workflow + def local_workflow(): + return "local" + + # Sub registry + sub_reg = Registry() + @sub_reg.register_workflow + def sub_workflow(): + return "sub" + + # Chain: sub -> local -> global + sub_reg.set_next_registry(local_reg) + + # Test that sub registry can access all levels + assert sub_reg.has_workflow("sub_workflow") + assert sub_reg.has_workflow("local_workflow") + assert sub_reg.has_workflow("global_workflow") + + sub_wf = sub_reg.get_workflow("sub_workflow") + local_wf = sub_reg.get_workflow("local_workflow") + global_wf = sub_reg.get_workflow("global_workflow") + + assert sub_wf() == "sub" + assert local_wf() == "local" + assert global_wf() == "global" + + def test_list_aggregation(self): + """Test that list methods aggregate from all chained registries.""" + # Global registry + @registry.register_workflow + def global_workflow(): + return "global" + + # Local registry + local_reg = new_registry() + @local_reg.register_workflow + def local_workflow(): + return "local" + + # Sub registry + sub_reg = Registry() + @sub_reg.register_workflow + def sub_workflow(): + return "sub" + sub_reg.set_next_registry(local_reg) + + # Test list aggregation + global_workflows = registry.list_workflows() + local_workflows = local_reg.list_workflows() + sub_workflows = sub_reg.list_workflows() + + assert "global_workflow" in global_workflows + assert "local_workflow" in local_workflows + assert "global_workflow" in local_workflows + assert "sub_workflow" in sub_workflows + assert "local_workflow" in sub_workflows + assert "global_workflow" in sub_workflows + + def test_count_aggregation(self): + """Test that count methods aggregate from all chained registries.""" + # Global registry + @registry.register_workflow + def global_workflow(): + return "global" + + # Local registry + local_reg = new_registry() + @local_reg.register_workflow + def local_workflow(): + return "local" + + # Sub registry + sub_reg = Registry() + @sub_reg.register_workflow + def sub_workflow(): + return "sub" + sub_reg.set_next_registry(local_reg) + + # Test count aggregation + assert registry.get_workflow_count() == 1 + assert local_reg.get_workflow_count() == 1 + assert sub_reg.get_workflow_count() == 1 + + assert registry.get_total_workflow_count() == 1 + assert local_reg.get_total_workflow_count() == 2 + assert sub_reg.get_total_workflow_count() == 3 + + def test_chain_management(self): + """Test chain management methods.""" + reg1 = Registry() + reg2 = Registry() + + # Test setting next registry + reg1.set_next_registry(reg2) + assert reg1.has_next_registry() + assert reg1.get_next_registry() is reg2 + + # Test that reg2 has no next registry + assert not reg2.has_next_registry() + assert reg2.get_next_registry() is None + + def test_workflow_not_found_error(self): + """Test WorkflowNotFoundError is raised when workflow not found.""" + reg = Registry() + + with pytest.raises(WorkflowNotFoundError): + reg.get_workflow("nonexistent") + + def test_activity_not_found_error(self): + """Test ActivityNotFoundError is raised when activity not found.""" + reg = Registry() + + with pytest.raises(ActivityNotFoundError): + reg.get_activity("nonexistent") + + def test_duplicate_registration_error(self): + """Test DuplicateRegistrationError is raised for duplicate registrations.""" + reg = Registry() + + @reg.register_workflow + def test_workflow(): + return "test" + + with pytest.raises(DuplicateRegistrationError): + @reg.register_workflow + def test_workflow(): + return "duplicate" + + def test_workflow_alias(self): + """Test workflow alias functionality.""" + reg = Registry() + + @reg.register_workflow(name="custom_name") + def test_workflow(): + return "test" + + assert reg.has_workflow("custom_name") + wf = reg.get_workflow("custom_name") + assert wf() == "test" + + def test_activity_alias(self): + """Test activity alias functionality.""" + reg = Registry() + + @reg.register_activity(alias="custom_alias") + def test_activity(): + return "test" + + assert reg.has_activity("custom_alias") + act = reg.get_activity("custom_alias") + assert act() == "test" + + def test_unregister_workflow(self): + """Test unregistering workflows.""" + reg = Registry() + + @reg.register_workflow + def test_workflow(): + return "test" + + assert reg.has_workflow("test_workflow") + assert reg.unregister_workflow("test_workflow") + assert not reg.has_workflow("test_workflow") + assert not reg.unregister_workflow("test_workflow") # Already unregistered + + def test_unregister_activity(self): + """Test unregistering activities.""" + reg = Registry() + + @reg.register_activity + def test_activity(): + return "test" + + assert reg.has_activity("test_activity") + assert reg.unregister_activity("test_activity") + assert not reg.has_activity("test_activity") + assert not reg.unregister_activity("test_activity") # Already unregistered + + def test_clear_registry(self): + """Test clearing registry.""" + reg = Registry() + + @reg.register_workflow + def test_workflow(): + return "test" + + @reg.register_activity + def test_activity(): + return "test" + + assert reg.get_workflow_count() == 1 + assert reg.get_activity_count() == 1 + + reg.clear() + + assert reg.get_workflow_count() == 0 + assert reg.get_activity_count() == 0 + assert not reg.has_workflow("test_workflow") + assert not reg.has_activity("test_activity") + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/uv.lock b/uv.lock index 4fb2f1a..2b75b24 100644 --- a/uv.lock +++ b/uv.lock @@ -155,6 +155,7 @@ dependencies = [ { name = "grpcio" }, { name = "grpcio-tools" }, { name = "protobuf" }, + { name = "pytest" }, { name = "typing-extensions" }, ] @@ -191,6 +192,7 @@ requires-dist = [ { name = "myst-parser", marker = "extra == 'docs'", specifier = ">=1.0.0" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = ">=3.0.0" }, { name = "protobuf", specifier = "==5.29.1" }, + { name = "pytest", specifier = ">=8.4.1" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=7.0.0" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.21.0" }, { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=4.0.0" }, From 52af499c853653e59ed38a5e85fb1589dada2f40 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Tue, 26 Aug 2025 12:14:20 -0700 Subject: [PATCH 2/8] linter --- tests/cadence/worker/test_registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py index 567cbc2..83692df 100644 --- a/tests/cadence/worker/test_registry.py +++ b/tests/cadence/worker/test_registry.py @@ -12,7 +12,7 @@ from cadence.worker._registry import ( Registry, new_registry, registry, - RegistryError, WorkflowNotFoundError, ActivityNotFoundError, DuplicateRegistrationError + WorkflowNotFoundError, ActivityNotFoundError, DuplicateRegistrationError ) From c9e57209964f5ac0bf36651c8832b8026dff2ac6 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Tue, 26 Aug 2025 12:09:31 -0700 Subject: [PATCH 3/8] Implement workflow & activity registry --- cadence/__init__.py | 14 + cadence/worker/__init__.py | 43 ++- cadence/worker/_registry.py | 515 ++++++++++++++++++++++++++ pyproject.toml | 1 + tests/cadence/worker/test_registry.py | 325 ++++++++++++++++ uv.lock | 2 + 6 files changed, 898 insertions(+), 2 deletions(-) create mode 100644 cadence/__init__.py create mode 100644 cadence/worker/_registry.py create mode 100644 tests/cadence/worker/test_registry.py diff --git a/cadence/__init__.py b/cadence/__init__.py new file mode 100644 index 0000000..175f01b --- /dev/null +++ b/cadence/__init__.py @@ -0,0 +1,14 @@ +""" +Cadence Python Client + +A Python framework for authoring workflows and activities for Cadence. +""" + +# Import main client functionality +from .client import Client + +__version__ = "0.1.0" + +__all__ = [ + "Client", +] diff --git a/cadence/worker/__init__.py b/cadence/worker/__init__.py index c2959b6..83e535e 100644 --- a/cadence/worker/__init__.py +++ b/cadence/worker/__init__.py @@ -5,7 +5,46 @@ WorkerOptions ) +from ._registry import ( + Registry, + RegistryError, + WorkflowNotFoundError, + ActivityNotFoundError, + DuplicateRegistrationError, + registry, + new_registry, + register_workflow, + register_activity, + get_workflow, + get_activity, + list_workflows, + list_activities, + has_workflow, + has_activity, + get_total_workflow_count, + get_total_activity_count, + clear_registry, +) + __all__ = [ "Worker", - "WorkerOptions" -] \ No newline at end of file + "WorkerOptions", + 'Registry', + 'RegistryError', + 'WorkflowNotFoundError', + 'ActivityNotFoundError', + 'DuplicateRegistrationError', + 'registry', + 'new_registry', + 'register_workflow', + 'register_activity', + 'get_workflow', + 'get_activity', + 'list_workflows', + 'list_activities', + 'has_workflow', + 'has_activity', + 'get_total_workflow_count', + 'get_total_activity_count', + 'clear_registry', +] diff --git a/cadence/worker/_registry.py b/cadence/worker/_registry.py new file mode 100644 index 0000000..8b34618 --- /dev/null +++ b/cadence/worker/_registry.py @@ -0,0 +1,515 @@ +#!/usr/bin/env python3 +""" +Workflow and Activity Registry for Cadence Python Client. + +This module provides a registry system for managing workflows and activities, +similar to the Go client's registry.go implementation. +""" + +import logging +from typing import Callable, Dict, List, Optional + + +logger = logging.getLogger(__name__) + + +class RegistryError(Exception): + """Base exception for registry operations.""" + pass + + +class WorkflowNotFoundError(RegistryError): + """Raised when a workflow is not found in the registry.""" + pass + + +class ActivityNotFoundError(RegistryError): + """Raised when an activity is not found in the registry.""" + pass + + +class DuplicateRegistrationError(RegistryError): + """Raised when attempting to register a duplicate workflow or activity.""" + pass + + +class Registry: + """ + Registry for managing workflows and activities. + + This class provides functionality to register, retrieve, and manage + workflows and activities in a Cadence application. + """ + + def __init__(self, next_registry: Optional['Registry'] = None): + """ + Initialize the registry. + + Args: + next_registry: Optional next registry in the chain for delegation + """ + self._workflows: Dict[str, Callable] = {} + self._activities: Dict[str, Callable] = {} + self._workflow_aliases: Dict[str, str] = {} # alias -> name mapping + self._activity_aliases: Dict[str, str] = {} # alias -> name mapping + self._next_registry = next_registry + + def register_workflow( + self, + func: Optional[Callable] = None, + *, + name: Optional[str] = None, + alias: Optional[str] = None + ) -> Callable: + """ + Register a workflow function. + + This method can be used as a decorator or called directly. + + Args: + func: The workflow function to register + name: Name of the workflow (defaults to function name) + alias: Alternative name for the workflow + + Returns: + The decorated function or the function itself + + Raises: + DuplicateRegistrationError: If workflow name already exists + """ + def decorator(f: Callable) -> Callable: + workflow_name = name or f.__name__ + + if workflow_name in self._workflows: + raise DuplicateRegistrationError( + f"Workflow '{workflow_name}' is already registered" + ) + + self._workflows[workflow_name] = f + + # Register alias if provided + if alias: + if alias in self._workflow_aliases: + raise DuplicateRegistrationError( + f"Workflow alias '{alias}' is already registered" + ) + self._workflow_aliases[alias] = workflow_name + + logger.info(f"Registered workflow '{workflow_name}'") + return f + + if func is None: + return decorator + return decorator(func) + + def register_activity( + self, + func: Optional[Callable] = None, + *, + name: Optional[str] = None, + alias: Optional[str] = None + ) -> Callable: + """ + Register an activity function. + + This method can be used as a decorator or called directly. + + Args: + func: The activity function to register + name: Name of the activity (defaults to function name) + alias: Alternative name for the activity + + Returns: + The decorated function or the function itself + + Raises: + DuplicateRegistrationError: If activity name already exists + """ + def decorator(f: Callable) -> Callable: + activity_name = name or f.__name__ + + if activity_name in self._activities: + raise DuplicateRegistrationError( + f"Activity '{activity_name}' is already registered" + ) + + self._activities[activity_name] = f + + # Register alias if provided + if alias: + if alias in self._activity_aliases: + raise DuplicateRegistrationError( + f"Activity alias '{alias}' is already registered" + ) + self._activity_aliases[alias] = activity_name + + logger.info(f"Registered activity '{activity_name}'") + return f + + if func is None: + return decorator + return decorator(func) + + def get_workflow(self, name: str) -> Callable: + """ + Get a registered workflow by name. + + Args: + name: Name or alias of the workflow + + Returns: + The workflow function + + Raises: + WorkflowNotFoundError: If workflow is not found + """ + # Check if it's an alias + actual_name = self._workflow_aliases.get(name, name) + + if actual_name in self._workflows: + return self._workflows[actual_name] + + # Try the next registry in the chain if available + if self._next_registry: + try: + return self._next_registry.get_workflow(name) + except WorkflowNotFoundError: + pass + + raise WorkflowNotFoundError(f"Workflow '{name}' not found in registry") + + def get_activity(self, name: str) -> Callable: + """ + Get a registered activity by name. + + Args: + name: Name or alias of the activity + + Returns: + The activity function + + Raises: + ActivityNotFoundError: If activity is not found + """ + # Check if it's an alias + actual_name = self._activity_aliases.get(name, name) + + if actual_name in self._activities: + return self._activities[actual_name] + + # Try the next registry in the chain if available + if self._next_registry: + try: + return self._next_registry.get_activity(name) + except ActivityNotFoundError: + pass + + raise ActivityNotFoundError(f"Activity '{name}' not found in registry") + + def list_workflows(self) -> List[str]: + """ + Get a list of all registered workflow names. + + Returns: + List of workflow names + """ + workflows = list(self._workflows.keys()) + if self._next_registry: + workflows.extend(self._next_registry.list_workflows()) + return workflows + + def list_activities(self) -> List[str]: + """ + Get a list of all registered activity names. + + Returns: + List of activity names + """ + activities = list(self._activities.keys()) + if self._next_registry: + activities.extend(self._next_registry.list_activities()) + return activities + + def list_workflow_aliases(self) -> Dict[str, str]: + """ + Get a mapping of workflow aliases to actual names. + + Returns: + Dictionary mapping aliases to workflow names + """ + return self._workflow_aliases.copy() + + def list_activity_aliases(self) -> Dict[str, str]: + """ + Get a mapping of activity aliases to actual names. + + Returns: + Dictionary mapping aliases to activity names + """ + return self._activity_aliases.copy() + + def unregister_workflow(self, name: str) -> bool: + """ + Unregister a workflow. + + Args: + name: Name or alias of the workflow to unregister + + Returns: + True if workflow was unregistered, False if not found + """ + # Check if it's an alias + actual_name = self._workflow_aliases.get(name, name) + + if actual_name in self._workflows: + del self._workflows[actual_name] + + # Remove any aliases pointing to this workflow + aliases_to_remove = [ + alias for alias, workflow_name in self._workflow_aliases.items() + if workflow_name == actual_name + ] + for alias in aliases_to_remove: + del self._workflow_aliases[alias] + + logger.info(f"Unregistered workflow '{actual_name}'") + return True + + return False + + def unregister_activity(self, name: str) -> bool: + """ + Unregister an activity. + + Args: + name: Name or alias of the activity to unregister + + Returns: + True if activity was unregistered, False if not found + """ + # Check if it's an alias + actual_name = self._activity_aliases.get(name, name) + + if actual_name in self._activities: + del self._activities[actual_name] + + # Remove any aliases pointing to this activity + aliases_to_remove = [ + alias for alias, activity_name in self._activity_aliases.items() + if activity_name == actual_name + ] + for alias in aliases_to_remove: + del self._activity_aliases[alias] + + logger.info(f"Unregistered activity '{actual_name}'") + return True + + return False + + def clear(self): + """Clear all registered workflows and activities.""" + self._workflows.clear() + self._activities.clear() + self._workflow_aliases.clear() + self._activity_aliases.clear() + logger.info("Registry cleared") + + def get_workflow_count(self) -> int: + """ + Get the total number of registered workflows in this registry. + + Note: This only counts workflows in this registry, not in chained registries. + Use get_total_workflow_count() to get the total count across all chained registries. + """ + return len(self._workflows) + + def get_activity_count(self) -> int: + """ + Get the total number of registered activities in this registry. + + Note: This only counts activities in this registry, not in chained registries. + Use get_total_workflow_count() to get the total count across all chained registries. + """ + return len(self._activities) + + def get_total_workflow_count(self) -> int: + """Get the total number of registered workflows across all chained registries.""" + count = len(self._workflows) + if self._next_registry: + count += self._next_registry.get_total_workflow_count() + return count + + def get_total_activity_count(self) -> int: + """Get the total number of registered activities across all chained registries.""" + count = len(self._activities) + if self._next_registry: + count += self._next_registry.get_total_activity_count() + return count + + def has_workflow(self, name: str) -> bool: + """ + Check if a workflow is registered in this registry or any chained registries. + + Args: + name: Name or alias of the workflow + + Returns: + True if workflow exists anywhere in the chain, False otherwise + """ + actual_name = self._workflow_aliases.get(name, name) + if actual_name in self._workflows: + return True + + # Check the next registry in the chain if available + if self._next_registry: + return self._next_registry.has_workflow(name) + + return False + + def has_activity(self, name: str) -> bool: + """ + Check if an activity is registered in this registry or any chained registries. + + Args: + name: Name or alias of the activity + + Returns: + True if activity exists anywhere in the chain, False otherwise + """ + actual_name = self._activity_aliases.get(name, name) + if actual_name in self._activities: + return True + + # Check the next registry in the chain if available + if self._next_registry: + return self._next_registry.has_activity(name) + + return False + + def set_next_registry(self, next_registry: 'Registry') -> None: + """ + Set the next registry in the chain. + + Args: + next_registry: The registry to chain to + """ + self._next_registry = next_registry + + def get_next_registry(self) -> Optional['Registry']: + """ + Get the next registry in the chain. + + Returns: + The next registry or None if no chaining + """ + return self._next_registry + + def has_next_registry(self) -> bool: + """ + Check if this registry has a next registry in the chain. + + Returns: + True if there is a next registry, False otherwise + """ + return self._next_registry is not None + + +# Global registry instance +registry = Registry() + +def new_registry() -> Registry: + """ + Create a new registry that automatically chains to the global registry. + + This follows the Go client pattern where new registries automatically + delegate to the global registry when items are not found locally. + + Returns: + A new Registry instance chained to the global registry + """ + return Registry(next_registry=registry) + + +# Convenience functions for using the global registry +def register_workflow( + func: Optional[Callable] = None, + **kwargs +) -> Callable: + """Register a workflow with the global registry.""" + return registry.register_workflow(func, **kwargs) + + +def register_activity( + func: Optional[Callable] = None, + **kwargs +) -> Callable: + """Register an activity with the global registry.""" + return registry.register_activity(func, **kwargs) + + +def get_workflow(name: str) -> Callable: + """Get a workflow from the global registry.""" + return registry.get_workflow(name) + + +def get_activity(name: str) -> Callable: + """Get an activity from the global registry.""" + return registry.get_activity(name) + + +def list_workflows() -> List[str]: + """List all workflows in the global registry.""" + return registry.list_workflows() + + +def list_activities() -> List[str]: + """List all activities in the global registry.""" + return registry.list_activities() + + +def has_workflow(name: str) -> bool: + """Check if a workflow exists in the global registry.""" + return registry.has_workflow(name) + + +def has_activity(name: str) -> bool: + """Check if an activity exists in the global registry.""" + return registry.has_activity(name) + + +def get_total_workflow_count() -> int: + """Get the total number of workflows across all chained registries.""" + return registry.get_total_workflow_count() + + +def get_total_activity_count() -> int: + """Get the total number of activities across all chained registries.""" + return registry.get_total_activity_count() + + +def clear_registry(): + """Clear the global registry.""" + registry.clear() + + +# Export the main classes and functions +__all__ = [ + 'Registry', + 'RegistryError', + 'WorkflowNotFoundError', + 'ActivityNotFoundError', + 'DuplicateRegistrationError', + 'registry', + 'new_registry', + 'register_workflow', + 'register_activity', + 'get_workflow', + 'get_activity', + 'list_workflows', + 'list_activities', + 'has_workflow', + 'has_activity', + 'get_total_workflow_count', + 'get_total_activity_count', + 'clear_registry', +] diff --git a/pyproject.toml b/pyproject.toml index 8db2705..db87eb6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ "grpcio>=1.50.0", "grpcio-tools>=1.50.0", "protobuf==5.29.1", + "pytest>=8.4.1", "typing-extensions>=4.0.0", ] diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py new file mode 100644 index 0000000..567cbc2 --- /dev/null +++ b/tests/cadence/worker/test_registry.py @@ -0,0 +1,325 @@ +#!/usr/bin/env python3 +""" +Tests for the registry chaining functionality. +""" + +import pytest +import sys +import os + +# Add the project root to the path +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))) + +from cadence.worker._registry import ( + Registry, new_registry, registry, + RegistryError, WorkflowNotFoundError, ActivityNotFoundError, DuplicateRegistrationError +) + + +class TestRegistryChaining: + """Test registry chaining functionality.""" + + def setup_method(self): + """Set up test fixtures.""" + # Clear the global registry before each test + registry.clear() + + def test_basic_registry_creation(self): + """Test basic registry creation.""" + reg = Registry() + assert reg.get_workflow_count() == 0 + assert reg.get_activity_count() == 0 + assert not reg.has_next_registry() + + def test_new_registry_chains_to_global(self): + """Test that new_registry() automatically chains to global registry.""" + local_reg = new_registry() + assert local_reg.has_next_registry() + assert local_reg.get_next_registry() is registry + + def test_register_and_retrieve_workflow(self): + """Test registering and retrieving workflows.""" + reg = Registry() + + @reg.register_workflow + def test_workflow(): + return "test" + + assert reg.has_workflow("test_workflow") + wf = reg.get_workflow("test_workflow") + assert wf() == "test" + + def test_register_and_retrieve_activity(self): + """Test registering and retrieving activities.""" + reg = Registry() + + @reg.register_activity + def test_activity(): + return "test" + + assert reg.has_activity("test_activity") + act = reg.get_activity("test_activity") + assert act() == "test" + + def test_workflow_chaining(self): + """Test workflow lookup through registry chain.""" + # Register in global registry + @registry.register_workflow + def global_workflow(): + return "global" + + # Create local registry that chains to global + local_reg = new_registry() + + # Register in local registry + @local_reg.register_workflow + def local_workflow(): + return "local" + + # Test that local registry can access both + assert local_reg.has_workflow("local_workflow") + assert local_reg.has_workflow("global_workflow") + + local_wf = local_reg.get_workflow("local_workflow") + global_wf = local_reg.get_workflow("global_workflow") + + assert local_wf() == "local" + assert global_wf() == "global" + + def test_activity_chaining(self): + """Test activity lookup through registry chain.""" + # Register in global registry + @registry.register_activity + def global_activity(): + return "global" + + # Create local registry that chains to global + local_reg = new_registry() + + # Register in local registry + @local_reg.register_activity + def local_activity(): + return "local" + + # Test that local registry can access both + assert local_reg.has_activity("local_activity") + assert local_reg.has_activity("global_activity") + + local_act = local_reg.get_activity("local_activity") + global_act = local_reg.get_activity("global_activity") + + assert local_act() == "local" + assert global_act() == "global" + + def test_multi_level_chaining(self): + """Test multi-level registry chaining.""" + # Global registry + @registry.register_workflow + def global_workflow(): + return "global" + + # Local registry + local_reg = new_registry() + @local_reg.register_workflow + def local_workflow(): + return "local" + + # Sub registry + sub_reg = Registry() + @sub_reg.register_workflow + def sub_workflow(): + return "sub" + + # Chain: sub -> local -> global + sub_reg.set_next_registry(local_reg) + + # Test that sub registry can access all levels + assert sub_reg.has_workflow("sub_workflow") + assert sub_reg.has_workflow("local_workflow") + assert sub_reg.has_workflow("global_workflow") + + sub_wf = sub_reg.get_workflow("sub_workflow") + local_wf = sub_reg.get_workflow("local_workflow") + global_wf = sub_reg.get_workflow("global_workflow") + + assert sub_wf() == "sub" + assert local_wf() == "local" + assert global_wf() == "global" + + def test_list_aggregation(self): + """Test that list methods aggregate from all chained registries.""" + # Global registry + @registry.register_workflow + def global_workflow(): + return "global" + + # Local registry + local_reg = new_registry() + @local_reg.register_workflow + def local_workflow(): + return "local" + + # Sub registry + sub_reg = Registry() + @sub_reg.register_workflow + def sub_workflow(): + return "sub" + sub_reg.set_next_registry(local_reg) + + # Test list aggregation + global_workflows = registry.list_workflows() + local_workflows = local_reg.list_workflows() + sub_workflows = sub_reg.list_workflows() + + assert "global_workflow" in global_workflows + assert "local_workflow" in local_workflows + assert "global_workflow" in local_workflows + assert "sub_workflow" in sub_workflows + assert "local_workflow" in sub_workflows + assert "global_workflow" in sub_workflows + + def test_count_aggregation(self): + """Test that count methods aggregate from all chained registries.""" + # Global registry + @registry.register_workflow + def global_workflow(): + return "global" + + # Local registry + local_reg = new_registry() + @local_reg.register_workflow + def local_workflow(): + return "local" + + # Sub registry + sub_reg = Registry() + @sub_reg.register_workflow + def sub_workflow(): + return "sub" + sub_reg.set_next_registry(local_reg) + + # Test count aggregation + assert registry.get_workflow_count() == 1 + assert local_reg.get_workflow_count() == 1 + assert sub_reg.get_workflow_count() == 1 + + assert registry.get_total_workflow_count() == 1 + assert local_reg.get_total_workflow_count() == 2 + assert sub_reg.get_total_workflow_count() == 3 + + def test_chain_management(self): + """Test chain management methods.""" + reg1 = Registry() + reg2 = Registry() + + # Test setting next registry + reg1.set_next_registry(reg2) + assert reg1.has_next_registry() + assert reg1.get_next_registry() is reg2 + + # Test that reg2 has no next registry + assert not reg2.has_next_registry() + assert reg2.get_next_registry() is None + + def test_workflow_not_found_error(self): + """Test WorkflowNotFoundError is raised when workflow not found.""" + reg = Registry() + + with pytest.raises(WorkflowNotFoundError): + reg.get_workflow("nonexistent") + + def test_activity_not_found_error(self): + """Test ActivityNotFoundError is raised when activity not found.""" + reg = Registry() + + with pytest.raises(ActivityNotFoundError): + reg.get_activity("nonexistent") + + def test_duplicate_registration_error(self): + """Test DuplicateRegistrationError is raised for duplicate registrations.""" + reg = Registry() + + @reg.register_workflow + def test_workflow(): + return "test" + + with pytest.raises(DuplicateRegistrationError): + @reg.register_workflow + def test_workflow(): + return "duplicate" + + def test_workflow_alias(self): + """Test workflow alias functionality.""" + reg = Registry() + + @reg.register_workflow(name="custom_name") + def test_workflow(): + return "test" + + assert reg.has_workflow("custom_name") + wf = reg.get_workflow("custom_name") + assert wf() == "test" + + def test_activity_alias(self): + """Test activity alias functionality.""" + reg = Registry() + + @reg.register_activity(alias="custom_alias") + def test_activity(): + return "test" + + assert reg.has_activity("custom_alias") + act = reg.get_activity("custom_alias") + assert act() == "test" + + def test_unregister_workflow(self): + """Test unregistering workflows.""" + reg = Registry() + + @reg.register_workflow + def test_workflow(): + return "test" + + assert reg.has_workflow("test_workflow") + assert reg.unregister_workflow("test_workflow") + assert not reg.has_workflow("test_workflow") + assert not reg.unregister_workflow("test_workflow") # Already unregistered + + def test_unregister_activity(self): + """Test unregistering activities.""" + reg = Registry() + + @reg.register_activity + def test_activity(): + return "test" + + assert reg.has_activity("test_activity") + assert reg.unregister_activity("test_activity") + assert not reg.has_activity("test_activity") + assert not reg.unregister_activity("test_activity") # Already unregistered + + def test_clear_registry(self): + """Test clearing registry.""" + reg = Registry() + + @reg.register_workflow + def test_workflow(): + return "test" + + @reg.register_activity + def test_activity(): + return "test" + + assert reg.get_workflow_count() == 1 + assert reg.get_activity_count() == 1 + + reg.clear() + + assert reg.get_workflow_count() == 0 + assert reg.get_activity_count() == 0 + assert not reg.has_workflow("test_workflow") + assert not reg.has_activity("test_activity") + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/uv.lock b/uv.lock index 4fb2f1a..2b75b24 100644 --- a/uv.lock +++ b/uv.lock @@ -155,6 +155,7 @@ dependencies = [ { name = "grpcio" }, { name = "grpcio-tools" }, { name = "protobuf" }, + { name = "pytest" }, { name = "typing-extensions" }, ] @@ -191,6 +192,7 @@ requires-dist = [ { name = "myst-parser", marker = "extra == 'docs'", specifier = ">=1.0.0" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = ">=3.0.0" }, { name = "protobuf", specifier = "==5.29.1" }, + { name = "pytest", specifier = ">=8.4.1" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=7.0.0" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.21.0" }, { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=4.0.0" }, From 9f0daee4b3117d97c6912cbaf6a290ee3c050fff Mon Sep 17 00:00:00 2001 From: Tim Li Date: Tue, 26 Aug 2025 12:14:20 -0700 Subject: [PATCH 4/8] linter --- tests/cadence/worker/test_registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py index 567cbc2..83692df 100644 --- a/tests/cadence/worker/test_registry.py +++ b/tests/cadence/worker/test_registry.py @@ -12,7 +12,7 @@ from cadence.worker._registry import ( Registry, new_registry, registry, - RegistryError, WorkflowNotFoundError, ActivityNotFoundError, DuplicateRegistrationError + WorkflowNotFoundError, ActivityNotFoundError, DuplicateRegistrationError ) From 5e2d96330cdf6c7715100d07d5bf198e1581aed6 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 27 Aug 2025 11:28:34 -0700 Subject: [PATCH 5/8] Respond to comments --- cadence/worker/__init__.py | 38 +-- cadence/worker/_registry.py | 434 +++----------------------- tests/cadence/worker/test_registry.py | 277 ++++------------ 3 files changed, 106 insertions(+), 643 deletions(-) diff --git a/cadence/worker/__init__.py b/cadence/worker/__init__.py index 83e535e..6249d28 100644 --- a/cadence/worker/__init__.py +++ b/cadence/worker/__init__.py @@ -7,44 +7,14 @@ from ._registry import ( Registry, - RegistryError, - WorkflowNotFoundError, - ActivityNotFoundError, - DuplicateRegistrationError, - registry, - new_registry, - register_workflow, - register_activity, - get_workflow, - get_activity, - list_workflows, - list_activities, - has_workflow, - has_activity, - get_total_workflow_count, - get_total_activity_count, - clear_registry, + RegisterWorkflowOptions, + RegisterActivityOptions, ) __all__ = [ "Worker", "WorkerOptions", 'Registry', - 'RegistryError', - 'WorkflowNotFoundError', - 'ActivityNotFoundError', - 'DuplicateRegistrationError', - 'registry', - 'new_registry', - 'register_workflow', - 'register_activity', - 'get_workflow', - 'get_activity', - 'list_workflows', - 'list_activities', - 'has_workflow', - 'has_activity', - 'get_total_workflow_count', - 'get_total_activity_count', - 'clear_registry', + 'RegisterWorkflowOptions', + 'RegisterActivityOptions', ] diff --git a/cadence/worker/_registry.py b/cadence/worker/_registry.py index 8b34618..a29f49b 100644 --- a/cadence/worker/_registry.py +++ b/cadence/worker/_registry.py @@ -7,30 +7,25 @@ """ import logging -from typing import Callable, Dict, List, Optional +from typing import Callable, Dict, Optional +from dataclasses import dataclass logger = logging.getLogger(__name__) -class RegistryError(Exception): - """Base exception for registry operations.""" - pass +@dataclass +class RegisterWorkflowOptions: + """Options for registering a workflow.""" + name: Optional[str] = None + alias: Optional[str] = None -class WorkflowNotFoundError(RegistryError): - """Raised when a workflow is not found in the registry.""" - pass - - -class ActivityNotFoundError(RegistryError): - """Raised when an activity is not found in the registry.""" - pass - - -class DuplicateRegistrationError(RegistryError): - """Raised when attempting to register a duplicate workflow or activity.""" - pass +@dataclass +class RegisterActivityOptions: + """Options for registering an activity.""" + name: Optional[str] = None + alias: Optional[str] = None class Registry: @@ -41,25 +36,17 @@ class Registry: workflows and activities in a Cadence application. """ - def __init__(self, next_registry: Optional['Registry'] = None): - """ - Initialize the registry. - - Args: - next_registry: Optional next registry in the chain for delegation - """ + def __init__(self): + """Initialize the registry.""" self._workflows: Dict[str, Callable] = {} self._activities: Dict[str, Callable] = {} self._workflow_aliases: Dict[str, str] = {} # alias -> name mapping self._activity_aliases: Dict[str, str] = {} # alias -> name mapping - self._next_registry = next_registry - def register_workflow( + def workflow( self, func: Optional[Callable] = None, - *, - name: Optional[str] = None, - alias: Optional[str] = None + **kwargs ) -> Callable: """ Register a workflow function. @@ -68,32 +55,29 @@ def register_workflow( Args: func: The workflow function to register - name: Name of the workflow (defaults to function name) - alias: Alternative name for the workflow + **kwargs: Options for registration (name, alias) Returns: The decorated function or the function itself Raises: - DuplicateRegistrationError: If workflow name already exists + KeyError: If workflow name already exists """ + options = RegisterWorkflowOptions(**kwargs) + def decorator(f: Callable) -> Callable: - workflow_name = name or f.__name__ + workflow_name = options.name or f.__name__ if workflow_name in self._workflows: - raise DuplicateRegistrationError( - f"Workflow '{workflow_name}' is already registered" - ) + raise KeyError(f"Workflow '{workflow_name}' is already registered") self._workflows[workflow_name] = f # Register alias if provided - if alias: - if alias in self._workflow_aliases: - raise DuplicateRegistrationError( - f"Workflow alias '{alias}' is already registered" - ) - self._workflow_aliases[alias] = workflow_name + if options.alias: + if options.alias in self._workflow_aliases: + raise KeyError(f"Workflow alias '{options.alias}' is already registered") + self._workflow_aliases[options.alias] = workflow_name logger.info(f"Registered workflow '{workflow_name}'") return f @@ -102,12 +86,10 @@ def decorator(f: Callable) -> Callable: return decorator return decorator(func) - def register_activity( + def activity( self, func: Optional[Callable] = None, - *, - name: Optional[str] = None, - alias: Optional[str] = None + **kwargs ) -> Callable: """ Register an activity function. @@ -116,32 +98,29 @@ def register_activity( Args: func: The activity function to register - name: Name of the activity (defaults to function name) - alias: Alternative name for the activity + **kwargs: Options for registration (name, alias) Returns: The decorated function or the function itself Raises: - DuplicateRegistrationError: If activity name already exists + KeyError: If activity name already exists """ + options = RegisterActivityOptions(**kwargs) + def decorator(f: Callable) -> Callable: - activity_name = name or f.__name__ + activity_name = options.name or f.__name__ if activity_name in self._activities: - raise DuplicateRegistrationError( - f"Activity '{activity_name}' is already registered" - ) + raise KeyError(f"Activity '{activity_name}' is already registered") self._activities[activity_name] = f # Register alias if provided - if alias: - if alias in self._activity_aliases: - raise DuplicateRegistrationError( - f"Activity alias '{alias}' is already registered" - ) - self._activity_aliases[alias] = activity_name + if options.alias: + if options.alias in self._activity_aliases: + raise KeyError(f"Activity alias '{options.alias}' is already registered") + self._activity_aliases[options.alias] = activity_name logger.info(f"Registered activity '{activity_name}'") return f @@ -161,22 +140,15 @@ def get_workflow(self, name: str) -> Callable: The workflow function Raises: - WorkflowNotFoundError: If workflow is not found + KeyError: If workflow is not found """ # Check if it's an alias actual_name = self._workflow_aliases.get(name, name) - if actual_name in self._workflows: - return self._workflows[actual_name] - - # Try the next registry in the chain if available - if self._next_registry: - try: - return self._next_registry.get_workflow(name) - except WorkflowNotFoundError: - pass + if actual_name not in self._workflows: + raise KeyError(f"Workflow '{name}' not found in registry") - raise WorkflowNotFoundError(f"Workflow '{name}' not found in registry") + return self._workflows[actual_name] def get_activity(self, name: str) -> Callable: """ @@ -189,327 +161,15 @@ def get_activity(self, name: str) -> Callable: The activity function Raises: - ActivityNotFoundError: If activity is not found - """ - # Check if it's an alias - actual_name = self._activity_aliases.get(name, name) - - if actual_name in self._activities: - return self._activities[actual_name] - - # Try the next registry in the chain if available - if self._next_registry: - try: - return self._next_registry.get_activity(name) - except ActivityNotFoundError: - pass - - raise ActivityNotFoundError(f"Activity '{name}' not found in registry") - - def list_workflows(self) -> List[str]: - """ - Get a list of all registered workflow names. - - Returns: - List of workflow names - """ - workflows = list(self._workflows.keys()) - if self._next_registry: - workflows.extend(self._next_registry.list_workflows()) - return workflows - - def list_activities(self) -> List[str]: - """ - Get a list of all registered activity names. - - Returns: - List of activity names - """ - activities = list(self._activities.keys()) - if self._next_registry: - activities.extend(self._next_registry.list_activities()) - return activities - - def list_workflow_aliases(self) -> Dict[str, str]: - """ - Get a mapping of workflow aliases to actual names. - - Returns: - Dictionary mapping aliases to workflow names - """ - return self._workflow_aliases.copy() - - def list_activity_aliases(self) -> Dict[str, str]: - """ - Get a mapping of activity aliases to actual names. - - Returns: - Dictionary mapping aliases to activity names - """ - return self._activity_aliases.copy() - - def unregister_workflow(self, name: str) -> bool: - """ - Unregister a workflow. - - Args: - name: Name or alias of the workflow to unregister - - Returns: - True if workflow was unregistered, False if not found - """ - # Check if it's an alias - actual_name = self._workflow_aliases.get(name, name) - - if actual_name in self._workflows: - del self._workflows[actual_name] - - # Remove any aliases pointing to this workflow - aliases_to_remove = [ - alias for alias, workflow_name in self._workflow_aliases.items() - if workflow_name == actual_name - ] - for alias in aliases_to_remove: - del self._workflow_aliases[alias] - - logger.info(f"Unregistered workflow '{actual_name}'") - return True - - return False - - def unregister_activity(self, name: str) -> bool: - """ - Unregister an activity. - - Args: - name: Name or alias of the activity to unregister - - Returns: - True if activity was unregistered, False if not found + KeyError: If activity is not found """ # Check if it's an alias actual_name = self._activity_aliases.get(name, name) - if actual_name in self._activities: - del self._activities[actual_name] - - # Remove any aliases pointing to this activity - aliases_to_remove = [ - alias for alias, activity_name in self._activity_aliases.items() - if activity_name == actual_name - ] - for alias in aliases_to_remove: - del self._activity_aliases[alias] - - logger.info(f"Unregistered activity '{actual_name}'") - return True + if actual_name not in self._activities: + raise KeyError(f"Activity '{name}' not found in registry") - return False - - def clear(self): - """Clear all registered workflows and activities.""" - self._workflows.clear() - self._activities.clear() - self._workflow_aliases.clear() - self._activity_aliases.clear() - logger.info("Registry cleared") - - def get_workflow_count(self) -> int: - """ - Get the total number of registered workflows in this registry. - - Note: This only counts workflows in this registry, not in chained registries. - Use get_total_workflow_count() to get the total count across all chained registries. - """ - return len(self._workflows) - - def get_activity_count(self) -> int: - """ - Get the total number of registered activities in this registry. - - Note: This only counts activities in this registry, not in chained registries. - Use get_total_workflow_count() to get the total count across all chained registries. - """ - return len(self._activities) - - def get_total_workflow_count(self) -> int: - """Get the total number of registered workflows across all chained registries.""" - count = len(self._workflows) - if self._next_registry: - count += self._next_registry.get_total_workflow_count() - return count + return self._activities[actual_name] - def get_total_activity_count(self) -> int: - """Get the total number of registered activities across all chained registries.""" - count = len(self._activities) - if self._next_registry: - count += self._next_registry.get_total_activity_count() - return count - - def has_workflow(self, name: str) -> bool: - """ - Check if a workflow is registered in this registry or any chained registries. - - Args: - name: Name or alias of the workflow - - Returns: - True if workflow exists anywhere in the chain, False otherwise - """ - actual_name = self._workflow_aliases.get(name, name) - if actual_name in self._workflows: - return True - - # Check the next registry in the chain if available - if self._next_registry: - return self._next_registry.has_workflow(name) - - return False - - def has_activity(self, name: str) -> bool: - """ - Check if an activity is registered in this registry or any chained registries. - - Args: - name: Name or alias of the activity - - Returns: - True if activity exists anywhere in the chain, False otherwise - """ - actual_name = self._activity_aliases.get(name, name) - if actual_name in self._activities: - return True - - # Check the next registry in the chain if available - if self._next_registry: - return self._next_registry.has_activity(name) - - return False - - def set_next_registry(self, next_registry: 'Registry') -> None: - """ - Set the next registry in the chain. - - Args: - next_registry: The registry to chain to - """ - self._next_registry = next_registry - - def get_next_registry(self) -> Optional['Registry']: - """ - Get the next registry in the chain. - - Returns: - The next registry or None if no chaining - """ - return self._next_registry - - def has_next_registry(self) -> bool: - """ - Check if this registry has a next registry in the chain. - - Returns: - True if there is a next registry, False otherwise - """ - return self._next_registry is not None - - -# Global registry instance -registry = Registry() - -def new_registry() -> Registry: - """ - Create a new registry that automatically chains to the global registry. - - This follows the Go client pattern where new registries automatically - delegate to the global registry when items are not found locally. - - Returns: - A new Registry instance chained to the global registry - """ - return Registry(next_registry=registry) - - -# Convenience functions for using the global registry -def register_workflow( - func: Optional[Callable] = None, - **kwargs -) -> Callable: - """Register a workflow with the global registry.""" - return registry.register_workflow(func, **kwargs) - - -def register_activity( - func: Optional[Callable] = None, - **kwargs -) -> Callable: - """Register an activity with the global registry.""" - return registry.register_activity(func, **kwargs) - - -def get_workflow(name: str) -> Callable: - """Get a workflow from the global registry.""" - return registry.get_workflow(name) - - -def get_activity(name: str) -> Callable: - """Get an activity from the global registry.""" - return registry.get_activity(name) - - -def list_workflows() -> List[str]: - """List all workflows in the global registry.""" - return registry.list_workflows() - - -def list_activities() -> List[str]: - """List all activities in the global registry.""" - return registry.list_activities() - - -def has_workflow(name: str) -> bool: - """Check if a workflow exists in the global registry.""" - return registry.has_workflow(name) - - -def has_activity(name: str) -> bool: - """Check if an activity exists in the global registry.""" - return registry.has_activity(name) - - -def get_total_workflow_count() -> int: - """Get the total number of workflows across all chained registries.""" - return registry.get_total_workflow_count() - - -def get_total_activity_count() -> int: - """Get the total number of activities across all chained registries.""" - return registry.get_total_activity_count() - - -def clear_registry(): - """Clear the global registry.""" - registry.clear() - -# Export the main classes and functions -__all__ = [ - 'Registry', - 'RegistryError', - 'WorkflowNotFoundError', - 'ActivityNotFoundError', - 'DuplicateRegistrationError', - 'registry', - 'new_registry', - 'register_workflow', - 'register_activity', - 'get_workflow', - 'get_activity', - 'list_workflows', - 'list_activities', - 'has_workflow', - 'has_activity', - 'get_total_workflow_count', - 'get_total_activity_count', - 'clear_registry', -] + \ No newline at end of file diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py index 83692df..8093585 100644 --- a/tests/cadence/worker/test_registry.py +++ b/tests/cadence/worker/test_registry.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -Tests for the registry chaining functionality. +Tests for the registry functionality. """ import pytest @@ -10,42 +10,27 @@ # Add the project root to the path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))) -from cadence.worker._registry import ( - Registry, new_registry, registry, - WorkflowNotFoundError, ActivityNotFoundError, DuplicateRegistrationError -) +from cadence.worker import Registry, RegisterWorkflowOptions, RegisterActivityOptions -class TestRegistryChaining: - """Test registry chaining functionality.""" - - def setup_method(self): - """Set up test fixtures.""" - # Clear the global registry before each test - registry.clear() +class TestRegistry: + """Test registry functionality.""" def test_basic_registry_creation(self): """Test basic registry creation.""" reg = Registry() - assert reg.get_workflow_count() == 0 - assert reg.get_activity_count() == 0 - assert not reg.has_next_registry() - - def test_new_registry_chains_to_global(self): - """Test that new_registry() automatically chains to global registry.""" - local_reg = new_registry() - assert local_reg.has_next_registry() - assert local_reg.get_next_registry() is registry + assert len(reg._workflows) == 0 + assert len(reg._activities) == 0 def test_register_and_retrieve_workflow(self): """Test registering and retrieving workflows.""" reg = Registry() - @reg.register_workflow + @reg.workflow def test_workflow(): return "test" - assert reg.has_workflow("test_workflow") + assert "test_workflow" in reg._workflows wf = reg.get_workflow("test_workflow") assert wf() == "test" @@ -53,198 +38,64 @@ def test_register_and_retrieve_activity(self): """Test registering and retrieving activities.""" reg = Registry() - @reg.register_activity + @reg.activity def test_activity(): return "test" - assert reg.has_activity("test_activity") + assert "test_activity" in reg._activities act = reg.get_activity("test_activity") assert act() == "test" - def test_workflow_chaining(self): - """Test workflow lookup through registry chain.""" - # Register in global registry - @registry.register_workflow - def global_workflow(): - return "global" - - # Create local registry that chains to global - local_reg = new_registry() - - # Register in local registry - @local_reg.register_workflow - def local_workflow(): - return "local" - - # Test that local registry can access both - assert local_reg.has_workflow("local_workflow") - assert local_reg.has_workflow("global_workflow") - - local_wf = local_reg.get_workflow("local_workflow") - global_wf = local_reg.get_workflow("global_workflow") - - assert local_wf() == "local" - assert global_wf() == "global" - - def test_activity_chaining(self): - """Test activity lookup through registry chain.""" - # Register in global registry - @registry.register_activity - def global_activity(): - return "global" - - # Create local registry that chains to global - local_reg = new_registry() - - # Register in local registry - @local_reg.register_activity - def local_activity(): - return "local" - - # Test that local registry can access both - assert local_reg.has_activity("local_activity") - assert local_reg.has_activity("global_activity") - - local_act = local_reg.get_activity("local_activity") - global_act = local_reg.get_activity("global_activity") + def test_workflow_registration(self): + """Test workflow registration.""" + reg = Registry() - assert local_act() == "local" - assert global_act() == "global" - - def test_multi_level_chaining(self): - """Test multi-level registry chaining.""" - # Global registry - @registry.register_workflow + @reg.workflow def global_workflow(): return "global" - # Local registry - local_reg = new_registry() - @local_reg.register_workflow - def local_workflow(): - return "local" - - # Sub registry - sub_reg = Registry() - @sub_reg.register_workflow - def sub_workflow(): - return "sub" - - # Chain: sub -> local -> global - sub_reg.set_next_registry(local_reg) - - # Test that sub registry can access all levels - assert sub_reg.has_workflow("sub_workflow") - assert sub_reg.has_workflow("local_workflow") - assert sub_reg.has_workflow("global_workflow") - - sub_wf = sub_reg.get_workflow("sub_workflow") - local_wf = sub_reg.get_workflow("local_workflow") - global_wf = sub_reg.get_workflow("global_workflow") - - assert sub_wf() == "sub" - assert local_wf() == "local" - assert global_wf() == "global" + assert "global_workflow" in reg._workflows + wf = reg.get_workflow("global_workflow") + assert wf() == "global" - def test_list_aggregation(self): - """Test that list methods aggregate from all chained registries.""" - # Global registry - @registry.register_workflow - def global_workflow(): - return "global" - - # Local registry - local_reg = new_registry() - @local_reg.register_workflow - def local_workflow(): - return "local" - - # Sub registry - sub_reg = Registry() - @sub_reg.register_workflow - def sub_workflow(): - return "sub" - sub_reg.set_next_registry(local_reg) - - # Test list aggregation - global_workflows = registry.list_workflows() - local_workflows = local_reg.list_workflows() - sub_workflows = sub_reg.list_workflows() + def test_activity_registration(self): + """Test activity registration.""" + reg = Registry() - assert "global_workflow" in global_workflows - assert "local_workflow" in local_workflows - assert "global_workflow" in local_workflows - assert "sub_workflow" in sub_workflows - assert "local_workflow" in sub_workflows - assert "global_workflow" in sub_workflows - - def test_count_aggregation(self): - """Test that count methods aggregate from all chained registries.""" - # Global registry - @registry.register_workflow - def global_workflow(): + @reg.activity + def global_activity(): return "global" - # Local registry - local_reg = new_registry() - @local_reg.register_workflow - def local_workflow(): - return "local" - - # Sub registry - sub_reg = Registry() - @sub_reg.register_workflow - def sub_workflow(): - return "sub" - sub_reg.set_next_registry(local_reg) - - # Test count aggregation - assert registry.get_workflow_count() == 1 - assert local_reg.get_workflow_count() == 1 - assert sub_reg.get_workflow_count() == 1 - - assert registry.get_total_workflow_count() == 1 - assert local_reg.get_total_workflow_count() == 2 - assert sub_reg.get_total_workflow_count() == 3 + assert "global_activity" in reg._activities + act = reg.get_activity("global_activity") + assert act() == "global" - def test_chain_management(self): - """Test chain management methods.""" - reg1 = Registry() - reg2 = Registry() - - # Test setting next registry - reg1.set_next_registry(reg2) - assert reg1.has_next_registry() - assert reg1.get_next_registry() is reg2 - - # Test that reg2 has no next registry - assert not reg2.has_next_registry() - assert reg2.get_next_registry() is None + def test_workflow_not_found_error(self): - """Test WorkflowNotFoundError is raised when workflow not found.""" + """Test KeyError is raised when workflow not found.""" reg = Registry() - with pytest.raises(WorkflowNotFoundError): + with pytest.raises(KeyError): reg.get_workflow("nonexistent") def test_activity_not_found_error(self): - """Test ActivityNotFoundError is raised when activity not found.""" + """Test KeyError is raised when activity not found.""" reg = Registry() - with pytest.raises(ActivityNotFoundError): + with pytest.raises(KeyError): reg.get_activity("nonexistent") def test_duplicate_registration_error(self): - """Test DuplicateRegistrationError is raised for duplicate registrations.""" + """Test KeyError is raised for duplicate registrations.""" reg = Registry() - @reg.register_workflow + @reg.workflow def test_workflow(): return "test" - with pytest.raises(DuplicateRegistrationError): - @reg.register_workflow + with pytest.raises(KeyError): + @reg.workflow def test_workflow(): return "duplicate" @@ -252,11 +103,11 @@ def test_workflow_alias(self): """Test workflow alias functionality.""" reg = Registry() - @reg.register_workflow(name="custom_name") + @reg.workflow(name="custom_name") def test_workflow(): return "test" - assert reg.has_workflow("custom_name") + assert "custom_name" in reg._workflows wf = reg.get_workflow("custom_name") assert wf() == "test" @@ -264,61 +115,43 @@ def test_activity_alias(self): """Test activity alias functionality.""" reg = Registry() - @reg.register_activity(alias="custom_alias") + @reg.activity(alias="custom_alias") def test_activity(): return "test" - assert reg.has_activity("custom_alias") + assert "custom_alias" in reg._activity_aliases act = reg.get_activity("custom_alias") assert act() == "test" - def test_unregister_workflow(self): - """Test unregistering workflows.""" + def test_workflow_options_class(self): + """Test using RegisterWorkflowOptions class.""" reg = Registry() - @reg.register_workflow - def test_workflow(): - return "test" - - assert reg.has_workflow("test_workflow") - assert reg.unregister_workflow("test_workflow") - assert not reg.has_workflow("test_workflow") - assert not reg.unregister_workflow("test_workflow") # Already unregistered - - def test_unregister_activity(self): - """Test unregistering activities.""" - reg = Registry() + options = RegisterWorkflowOptions(name="custom_name", alias="custom_alias") - @reg.register_activity - def test_activity(): + @reg.workflow(**options.__dict__) + def test_workflow(): return "test" - assert reg.has_activity("test_activity") - assert reg.unregister_activity("test_activity") - assert not reg.has_activity("test_activity") - assert not reg.unregister_activity("test_activity") # Already unregistered + assert "custom_name" in reg._workflows + assert "custom_alias" in reg._workflow_aliases + wf = reg.get_workflow("custom_name") + assert wf() == "test" - def test_clear_registry(self): - """Test clearing registry.""" + def test_activity_options_class(self): + """Test using RegisterActivityOptions class.""" reg = Registry() - @reg.register_workflow - def test_workflow(): - return "test" + options = RegisterActivityOptions(name="custom_name", alias="custom_alias") - @reg.register_activity + @reg.activity(**options.__dict__) def test_activity(): return "test" - assert reg.get_workflow_count() == 1 - assert reg.get_activity_count() == 1 - - reg.clear() - - assert reg.get_workflow_count() == 0 - assert reg.get_activity_count() == 0 - assert not reg.has_workflow("test_workflow") - assert not reg.has_activity("test_activity") + assert "custom_name" in reg._activities + assert "custom_alias" in reg._activity_aliases + act = reg.get_activity("custom_name") + assert act() == "test" if __name__ == "__main__": From 6e22fb07a871c9d5aa85d63e46be2b0722145325 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 27 Aug 2025 12:23:43 -0700 Subject: [PATCH 6/8] modify test --- tests/cadence/worker/test_registry.py | 258 +++++++++++++++----------- 1 file changed, 153 insertions(+), 105 deletions(-) diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py index 8093585..a9ec05d 100644 --- a/tests/cadence/worker/test_registry.py +++ b/tests/cadence/worker/test_registry.py @@ -22,136 +22,184 @@ def test_basic_registry_creation(self): assert len(reg._workflows) == 0 assert len(reg._activities) == 0 - def test_register_and_retrieve_workflow(self): - """Test registering and retrieving workflows.""" + @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) + def test_basic_registration_and_retrieval(self, registration_type): + """Test basic registration and retrieval for both workflows and activities.""" reg = Registry() - @reg.workflow - def test_workflow(): - return "test" - - assert "test_workflow" in reg._workflows - wf = reg.get_workflow("test_workflow") - assert wf() == "test" - - def test_register_and_retrieve_activity(self): - """Test registering and retrieving activities.""" - reg = Registry() - - @reg.activity - def test_activity(): - return "test" - - assert "test_activity" in reg._activities - act = reg.get_activity("test_activity") - assert act() == "test" + if registration_type == "workflow": + @reg.workflow + def test_func(): + return "test" + + assert "test_func" in reg._workflows + func = reg.get_workflow("test_func") + else: + @reg.activity + def test_func(): + return "test" + + assert "test_func" in reg._activities + func = reg.get_activity("test_func") + + assert func() == "test" - def test_workflow_registration(self): - """Test workflow registration.""" + @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) + def test_direct_call_behavior(self, registration_type): + """Test direct function call behavior for both workflows and activities.""" reg = Registry() - @reg.workflow - def global_workflow(): - return "global" - - assert "global_workflow" in reg._workflows - wf = reg.get_workflow("global_workflow") - assert wf() == "global" + def test_func(): + return "direct_call" + + if registration_type == "workflow": + # Direct call behavior - should register and return the function + registered_func = reg.workflow(test_func) + assert "test_func" in reg._workflows + func = reg.get_workflow("test_func") + else: + # Direct call behavior - should register and return the function + registered_func = reg.activity(test_func) + assert "test_func" in reg._activities + func = reg.get_activity("test_func") + + # Should be the same function + assert registered_func == test_func + assert func() == "direct_call" - def test_activity_registration(self): - """Test activity registration.""" + @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) + def test_decorator_with_options(self, registration_type): + """Test decorator with options for both workflows and activities.""" reg = Registry() - @reg.activity - def global_activity(): - return "global" - - assert "global_activity" in reg._activities - act = reg.get_activity("global_activity") - assert act() == "global" - - + if registration_type == "workflow": + @reg.workflow(name="custom_name", alias="custom_alias") + def test_func(): + return "decorator_with_options" + + assert "custom_name" in reg._workflows + assert "custom_alias" in reg._workflow_aliases + func = reg.get_workflow("custom_name") + else: + @reg.activity(name="custom_name", alias="custom_alias") + def test_func(): + return "decorator_with_options" + + assert "custom_name" in reg._activities + assert "custom_alias" in reg._activity_aliases + func = reg.get_activity("custom_name") + + assert func() == "decorator_with_options" - def test_workflow_not_found_error(self): - """Test KeyError is raised when workflow not found.""" + @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) + def test_direct_call_with_options(self, registration_type): + """Test direct call with options for both workflows and activities.""" reg = Registry() - with pytest.raises(KeyError): - reg.get_workflow("nonexistent") + def test_func(): + return "direct_call_with_options" + + if registration_type == "workflow": + # Direct call with options + registered_func = reg.workflow(test_func, name="custom_name", alias="custom_alias") + assert "custom_name" in reg._workflows + assert "custom_alias" in reg._workflow_aliases + func = reg.get_workflow("custom_name") + else: + # Direct call with options + registered_func = reg.activity(test_func, name="custom_name", alias="custom_alias") + assert "custom_name" in reg._activities + assert "custom_alias" in reg._activity_aliases + func = reg.get_activity("custom_name") + + assert registered_func == test_func + assert func() == "direct_call_with_options" - def test_activity_not_found_error(self): - """Test KeyError is raised when activity not found.""" + @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) + def test_not_found_error(self, registration_type): + """Test KeyError is raised when function not found.""" reg = Registry() - with pytest.raises(KeyError): - reg.get_activity("nonexistent") + if registration_type == "workflow": + with pytest.raises(KeyError): + reg.get_workflow("nonexistent") + else: + with pytest.raises(KeyError): + reg.get_activity("nonexistent") - def test_duplicate_registration_error(self): + @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) + def test_duplicate_registration_error(self, registration_type): """Test KeyError is raised for duplicate registrations.""" reg = Registry() - @reg.workflow - def test_workflow(): - return "test" - - with pytest.raises(KeyError): + if registration_type == "workflow": @reg.workflow - def test_workflow(): - return "duplicate" + def test_func(): + return "test" + + with pytest.raises(KeyError): + @reg.workflow + def test_func(): + return "duplicate" + else: + @reg.activity + def test_func(): + return "test" + + with pytest.raises(KeyError): + @reg.activity + def test_func(): + return "duplicate" - def test_workflow_alias(self): - """Test workflow alias functionality.""" + @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) + def test_alias_functionality(self, registration_type): + """Test alias functionality for both workflows and activities.""" reg = Registry() - @reg.workflow(name="custom_name") - def test_workflow(): - return "test" - - assert "custom_name" in reg._workflows - wf = reg.get_workflow("custom_name") - assert wf() == "test" + if registration_type == "workflow": + @reg.workflow(name="custom_name") + def test_func(): + return "test" + + assert "custom_name" in reg._workflows + func = reg.get_workflow("custom_name") + else: + @reg.activity(alias="custom_alias") + def test_func(): + return "test" + + assert "custom_alias" in reg._activity_aliases + func = reg.get_activity("custom_alias") + + assert func() == "test" - def test_activity_alias(self): - """Test activity alias functionality.""" + @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) + def test_options_class(self, registration_type): + """Test using options classes for both workflows and activities.""" reg = Registry() - @reg.activity(alias="custom_alias") - def test_activity(): - return "test" - - assert "custom_alias" in reg._activity_aliases - act = reg.get_activity("custom_alias") - assert act() == "test" - - def test_workflow_options_class(self): - """Test using RegisterWorkflowOptions class.""" - reg = Registry() - - options = RegisterWorkflowOptions(name="custom_name", alias="custom_alias") - - @reg.workflow(**options.__dict__) - def test_workflow(): - return "test" - - assert "custom_name" in reg._workflows - assert "custom_alias" in reg._workflow_aliases - wf = reg.get_workflow("custom_name") - assert wf() == "test" - - def test_activity_options_class(self): - """Test using RegisterActivityOptions class.""" - reg = Registry() - - options = RegisterActivityOptions(name="custom_name", alias="custom_alias") - - @reg.activity(**options.__dict__) - def test_activity(): - return "test" - - assert "custom_name" in reg._activities - assert "custom_alias" in reg._activity_aliases - act = reg.get_activity("custom_name") - assert act() == "test" + if registration_type == "workflow": + options = RegisterWorkflowOptions(name="custom_name", alias="custom_alias") + + @reg.workflow(**options.__dict__) + def test_func(): + return "test" + + assert "custom_name" in reg._workflows + assert "custom_alias" in reg._workflow_aliases + func = reg.get_workflow("custom_name") + else: + options = RegisterActivityOptions(name="custom_name", alias="custom_alias") + + @reg.activity(**options.__dict__) + def test_func(): + return "test" + + assert "custom_name" in reg._activities + assert "custom_alias" in reg._activity_aliases + func = reg.get_activity("custom_name") + + assert func() == "test" if __name__ == "__main__": From 5018631abad187edbf02a59ef1db653cab4549f3 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 27 Aug 2025 13:00:43 -0700 Subject: [PATCH 7/8] comments --- cadence/worker/_registry.py | 4 +- pyproject.toml | 3 +- tests/cadence/worker/test_registry.py | 53 ++++++++++----------------- uv.lock | 4 +- 4 files changed, 23 insertions(+), 41 deletions(-) diff --git a/cadence/worker/_registry.py b/cadence/worker/_registry.py index a29f49b..b0e205b 100644 --- a/cadence/worker/_registry.py +++ b/cadence/worker/_registry.py @@ -46,7 +46,7 @@ def __init__(self): def workflow( self, func: Optional[Callable] = None, - **kwargs + **kwargs: RegisterWorkflowOptions ) -> Callable: """ Register a workflow function. @@ -89,7 +89,7 @@ def decorator(f: Callable) -> Callable: def activity( self, func: Optional[Callable] = None, - **kwargs + **kwargs: RegisterActivityOptions ) -> Callable: """ Register an activity function. diff --git a/pyproject.toml b/pyproject.toml index 694fa2f..82c7457 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,13 +30,12 @@ dependencies = [ "grpcio-tools>=1.50.0", "msgspec>=0.19.0", "protobuf==5.29.1", - "pytest>=8.4.1", "typing-extensions>=4.0.0", ] [project.optional-dependencies] dev = [ - "pytest>=7.0.0", + "pytest>=8.4.1", "pytest-cov>=4.0.0", "pytest-asyncio>=0.21.0", "black>=23.0.0", diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py index a9ec05d..e047c11 100644 --- a/tests/cadence/worker/test_registry.py +++ b/tests/cadence/worker/test_registry.py @@ -4,11 +4,6 @@ """ import pytest -import sys -import os - -# Add the project root to the path -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))) from cadence.worker import Registry, RegisterWorkflowOptions, RegisterActivityOptions @@ -19,8 +14,10 @@ class TestRegistry: def test_basic_registry_creation(self): """Test basic registry creation.""" reg = Registry() - assert len(reg._workflows) == 0 - assert len(reg._activities) == 0 + with pytest.raises(KeyError): + reg.get_workflow("nonexistent") + with pytest.raises(KeyError): + reg.get_activity("nonexistent") @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) def test_basic_registration_and_retrieval(self, registration_type): @@ -32,14 +29,12 @@ def test_basic_registration_and_retrieval(self, registration_type): def test_func(): return "test" - assert "test_func" in reg._workflows func = reg.get_workflow("test_func") else: @reg.activity def test_func(): return "test" - assert "test_func" in reg._activities func = reg.get_activity("test_func") assert func() == "test" @@ -53,17 +48,12 @@ def test_func(): return "direct_call" if registration_type == "workflow": - # Direct call behavior - should register and return the function registered_func = reg.workflow(test_func) - assert "test_func" in reg._workflows func = reg.get_workflow("test_func") else: - # Direct call behavior - should register and return the function registered_func = reg.activity(test_func) - assert "test_func" in reg._activities func = reg.get_activity("test_func") - # Should be the same function assert registered_func == test_func assert func() == "direct_call" @@ -77,19 +67,19 @@ def test_decorator_with_options(self, registration_type): def test_func(): return "decorator_with_options" - assert "custom_name" in reg._workflows - assert "custom_alias" in reg._workflow_aliases func = reg.get_workflow("custom_name") + func_by_alias = reg.get_workflow("custom_alias") else: @reg.activity(name="custom_name", alias="custom_alias") def test_func(): return "decorator_with_options" - assert "custom_name" in reg._activities - assert "custom_alias" in reg._activity_aliases func = reg.get_activity("custom_name") + func_by_alias = reg.get_activity("custom_alias") assert func() == "decorator_with_options" + assert func_by_alias() == "decorator_with_options" + assert func == func_by_alias @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) def test_direct_call_with_options(self, registration_type): @@ -100,20 +90,18 @@ def test_func(): return "direct_call_with_options" if registration_type == "workflow": - # Direct call with options registered_func = reg.workflow(test_func, name="custom_name", alias="custom_alias") - assert "custom_name" in reg._workflows - assert "custom_alias" in reg._workflow_aliases func = reg.get_workflow("custom_name") + func_by_alias = reg.get_workflow("custom_alias") else: - # Direct call with options registered_func = reg.activity(test_func, name="custom_name", alias="custom_alias") - assert "custom_name" in reg._activities - assert "custom_alias" in reg._activity_aliases func = reg.get_activity("custom_name") + func_by_alias = reg.get_activity("custom_name") assert registered_func == test_func assert func() == "direct_call_with_options" + assert func_by_alias() == "direct_call_with_options" + assert func == func_by_alias @pytest.mark.parametrize("registration_type", ["workflow", "activity"]) def test_not_found_error(self, registration_type): @@ -161,15 +149,16 @@ def test_alias_functionality(self, registration_type): def test_func(): return "test" - assert "custom_name" in reg._workflows func = reg.get_workflow("custom_name") else: @reg.activity(alias="custom_alias") def test_func(): return "test" - assert "custom_alias" in reg._activity_aliases func = reg.get_activity("custom_alias") + func_by_name = reg.get_activity("test_func") + assert func_by_name() == "test" + assert func == func_by_name assert func() == "test" @@ -185,9 +174,8 @@ def test_options_class(self, registration_type): def test_func(): return "test" - assert "custom_name" in reg._workflows - assert "custom_alias" in reg._workflow_aliases func = reg.get_workflow("custom_name") + func_by_alias = reg.get_workflow("custom_alias") else: options = RegisterActivityOptions(name="custom_name", alias="custom_alias") @@ -195,12 +183,9 @@ def test_func(): def test_func(): return "test" - assert "custom_name" in reg._activities - assert "custom_alias" in reg._activity_aliases func = reg.get_activity("custom_name") + func_by_alias = reg.get_activity("custom_alias") assert func() == "test" - - -if __name__ == "__main__": - pytest.main([__file__]) + assert func_by_alias() == "test" + assert func == func_by_alias diff --git a/uv.lock b/uv.lock index 897b248..fe3f9e0 100644 --- a/uv.lock +++ b/uv.lock @@ -156,7 +156,6 @@ dependencies = [ { name = "grpcio-tools" }, { name = "msgspec" }, { name = "protobuf" }, - { name = "pytest" }, { name = "typing-extensions" }, ] @@ -194,8 +193,7 @@ requires-dist = [ { name = "myst-parser", marker = "extra == 'docs'", specifier = ">=1.0.0" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = ">=3.0.0" }, { name = "protobuf", specifier = "==5.29.1" }, - { name = "pytest", specifier = ">=8.4.1" }, - { name = "pytest", marker = "extra == 'dev'", specifier = ">=7.0.0" }, + { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.4.1" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.21.0" }, { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=4.0.0" }, { name = "requests", marker = "extra == 'examples'", specifier = ">=2.28.0" }, From 026ad58a4ec4d4665130cb33e74d1497cd539eaa Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 27 Aug 2025 13:24:18 -0700 Subject: [PATCH 8/8] unpack --- cadence/worker/_registry.py | 43 +++++++++++++-------------- tests/cadence/worker/test_registry.py | 4 +-- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/cadence/worker/_registry.py b/cadence/worker/_registry.py index b0e205b..6822351 100644 --- a/cadence/worker/_registry.py +++ b/cadence/worker/_registry.py @@ -7,25 +7,22 @@ """ import logging -from typing import Callable, Dict, Optional -from dataclasses import dataclass +from typing import Callable, Dict, Optional, Unpack, TypedDict logger = logging.getLogger(__name__) -@dataclass -class RegisterWorkflowOptions: +class RegisterWorkflowOptions(TypedDict, total=False): """Options for registering a workflow.""" - name: Optional[str] = None - alias: Optional[str] = None + name: Optional[str] + alias: Optional[str] -@dataclass -class RegisterActivityOptions: +class RegisterActivityOptions(TypedDict, total=False): """Options for registering an activity.""" - name: Optional[str] = None - alias: Optional[str] = None + name: Optional[str] + alias: Optional[str] class Registry: @@ -46,7 +43,7 @@ def __init__(self): def workflow( self, func: Optional[Callable] = None, - **kwargs: RegisterWorkflowOptions + **kwargs: Unpack[RegisterWorkflowOptions] ) -> Callable: """ Register a workflow function. @@ -66,7 +63,7 @@ def workflow( options = RegisterWorkflowOptions(**kwargs) def decorator(f: Callable) -> Callable: - workflow_name = options.name or f.__name__ + workflow_name = options.get('name') or f.__name__ if workflow_name in self._workflows: raise KeyError(f"Workflow '{workflow_name}' is already registered") @@ -74,10 +71,11 @@ def decorator(f: Callable) -> Callable: self._workflows[workflow_name] = f # Register alias if provided - if options.alias: - if options.alias in self._workflow_aliases: - raise KeyError(f"Workflow alias '{options.alias}' is already registered") - self._workflow_aliases[options.alias] = workflow_name + alias = options.get('alias') + if alias: + if alias in self._workflow_aliases: + raise KeyError(f"Workflow alias '{alias}' is already registered") + self._workflow_aliases[alias] = workflow_name logger.info(f"Registered workflow '{workflow_name}'") return f @@ -89,7 +87,7 @@ def decorator(f: Callable) -> Callable: def activity( self, func: Optional[Callable] = None, - **kwargs: RegisterActivityOptions + **kwargs: Unpack[RegisterActivityOptions] ) -> Callable: """ Register an activity function. @@ -109,7 +107,7 @@ def activity( options = RegisterActivityOptions(**kwargs) def decorator(f: Callable) -> Callable: - activity_name = options.name or f.__name__ + activity_name = options.get('name') or f.__name__ if activity_name in self._activities: raise KeyError(f"Activity '{activity_name}' is already registered") @@ -117,10 +115,11 @@ def decorator(f: Callable) -> Callable: self._activities[activity_name] = f # Register alias if provided - if options.alias: - if options.alias in self._activity_aliases: - raise KeyError(f"Activity alias '{options.alias}' is already registered") - self._activity_aliases[options.alias] = activity_name + alias = options.get('alias') + if alias: + if alias in self._activity_aliases: + raise KeyError(f"Activity alias '{alias}' is already registered") + self._activity_aliases[alias] = activity_name logger.info(f"Registered activity '{activity_name}'") return f diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py index e047c11..57f345b 100644 --- a/tests/cadence/worker/test_registry.py +++ b/tests/cadence/worker/test_registry.py @@ -170,7 +170,7 @@ def test_options_class(self, registration_type): if registration_type == "workflow": options = RegisterWorkflowOptions(name="custom_name", alias="custom_alias") - @reg.workflow(**options.__dict__) + @reg.workflow(**options) def test_func(): return "test" @@ -179,7 +179,7 @@ def test_func(): else: options = RegisterActivityOptions(name="custom_name", alias="custom_alias") - @reg.activity(**options.__dict__) + @reg.activity(**options) def test_func(): return "test"