Skip to content

Commit 813791b

Browse files
Async SDK Clients for Orkes Conductor and Conductor OSS (#320)
1 parent b7ebaf0 commit 813791b

File tree

624 files changed

+112108
-364
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

624 files changed

+112108
-364
lines changed

docs/authorization/README.md

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,10 @@ authorization_client.remove_user_from_group(group_id, user_id)
225225
Grants a set of accesses to the specified Subject for a given Target.
226226

227227
```python
228-
from conductor.client.http.models.target_ref import TargetRef, TargetType
229-
from conductor.client.http.models.subject_ref import SubjectRef, SubjectType
228+
from conductor.client.http.models.target_ref import TargetRef
229+
from conductor.shared.http.enums.target_type import TargetType
230+
from conductor.client.http.models.subject_ref import SubjectRef
231+
from conductor.shared.http.enums.subject_type import SubjectType
230232
from conductor.client.orkes.models.access_type import AccessType
231233

232234
target = TargetRef(TargetType.WORKFLOW_DEF, "TEST_WORKFLOW")
@@ -245,7 +247,8 @@ Given the target, returns all permissions associated with it as a Dict[str, List
245247
In the returned dictionary, key is AccessType and value is a list of subjects.
246248

247249
```python
248-
from conductor.client.http.models.target_ref import TargetRef, TargetType
250+
from conductor.client.http.models.target_ref import TargetRef
251+
from conductor.shared.http.enums.target_type import TargetType
249252

250253
target = TargetRef(TargetType.WORKFLOW_DEF, WORKFLOW_NAME)
251254
target_permissions = authorization_client.get_permissions(target)
@@ -273,8 +276,10 @@ user_permissions = authorization_client.get_granted_permissions_for_user(user_id
273276
Removes a set of accesses from a specified Subject for a given Target.
274277

275278
```python
276-
from conductor.client.http.models.target_ref import TargetRef, TargetType
277-
from conductor.client.http.models.subject_ref import SubjectRef, SubjectType
279+
from conductor.client.http.models.target_ref import TargetRef
280+
from conductor.shared.http.enums.target_type import TargetType
281+
from conductor.client.http.models.subject_ref import SubjectRef
282+
from conductor.shared.http.enums.subject_type import SubjectType
278283
from conductor.client.orkes.models.access_type import AccessType
279284

280285
target = TargetRef(TargetType.WORKFLOW_DEF, "TEST_WORKFLOW")

docs/metadata/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ In order to define a workflow, you must provide a `MetadataClient` and a `Workfl
88

99
```python
1010
from conductor.client.configuration.configuration import Configuration
11-
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
11+
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
1212
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClie
1313
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
1414
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor

docs/schedule/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
## Scheduler Client
44

55
### Initialization
6+
67
```python
78
from conductor.client.configuration.configuration import Configuration
8-
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
9+
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
910
from conductor.client.orkes.orkes_scheduler_client import OrkesSchedulerClient
1011

1112
configuration = Configuration(

docs/secret/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
## Secret Client
44

55
### Initialization
6+
67
```python
78
from conductor.client.configuration.configuration import Configuration
8-
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
9+
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
910
from conductor.client.orkes.orkes_secret_client import OrkesSecretClient
1011

1112
configuration = Configuration(

docs/task/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
## Task Client
44

55
### Initialization
6+
67
```python
78
from conductor.client.configuration.configuration import Configuration
8-
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
9+
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
910
from conductor.client.orkes.orkes_task_client import OrkesTaskClient
1011

1112
configuration = Configuration(

docs/testing/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ A sample unit test code snippet is provided below.
1414

1515
```python
1616
import json
17-
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
17+
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
1818
from conductor.client.configuration.configuration import Configuration
1919
from conductor.client.http.models.workflow_test_request import WorkflowTestRequest
2020
from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient

docs/worker/README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ Quick example below:
3838

3939
```python
4040
from conductor.client.http.models import Task, TaskResult
41-
from conductor.client.http.models.task_result_status import TaskResultStatus
41+
from conductor.shared.http.enums import TaskResultStatus
42+
4243

4344
def execute(task: Task) -> TaskResult:
4445
task_result = TaskResult(
@@ -59,7 +60,7 @@ The class must implement `WorkerInterface` class, which requires an `execute` me
5960

6061
```python
6162
from conductor.client.http.models import Task, TaskResult
62-
from conductor.client.http.models.task_result_status import TaskResultStatus
63+
from conductor.shared.http.enums import TaskResultStatus
6364
from conductor.client.worker.worker_interface import WorkerInterface
6465

6566
class SimplePythonWorker(WorkerInterface):
@@ -99,13 +100,14 @@ def python_annotated_task(input) -> object:
99100
Now you can run your workers by calling a `TaskHandler`, example:
100101

101102
```python
102-
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
103+
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
103104
from conductor.client.configuration.configuration import Configuration
104105
from conductor.client.automator.task_handler import TaskHandler
105106
from conductor.client.worker.worker import Worker
106107

107108
#### Add these lines if running on a mac####
108109
from multiprocessing import set_start_method
110+
109111
set_start_method('fork')
110112
############################################
111113

@@ -347,7 +349,7 @@ and [simple_cpp_worker.py](src/example/worker/cpp/simple_cpp_worker.py) for comp
347349
```python
348350
from conductor.client.http.models.task import Task
349351
from conductor.client.http.models.task_result import TaskResult
350-
from conductor.client.http.models.task_result_status import TaskResultStatus
352+
from conductor.shared.http.enums import TaskResultStatus
351353
from conductor.client.worker.worker_interface import WorkerInterface
352354
from ctypes import cdll
353355

docs/workflow/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
## Workflow Client
44

55
### Initialization
6+
67
```python
78
from conductor.client.configuration.configuration import Configuration
8-
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
9+
from conductor.shared.configuration.settings.authentication_settings import AuthenticationSettings
910
from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient
1011

1112
configuration = Configuration(

examples/async/dynamic_workflow.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""
2+
This is a dynamic workflow that can be created and executed at run time.
3+
dynamic_workflow will run worker tasks get_user_email and send_email in the same order.
4+
For use cases in which the workflow cannot be defined statically, dynamic workflows is a useful approach.
5+
For detailed explanation, https://github.com/conductor-sdk/conductor-python/blob/main/workflows.md
6+
"""
7+
8+
import asyncio
9+
10+
from conductor.asyncio_client.automator.task_handler import TaskHandler
11+
from conductor.asyncio_client.configuration.configuration import Configuration
12+
from conductor.asyncio_client.adapters import ApiClient
13+
from conductor.asyncio_client.orkes.orkes_clients import OrkesClients
14+
from conductor.asyncio_client.worker.worker_task import worker_task
15+
from conductor.asyncio_client.workflow.conductor_workflow import AsyncConductorWorkflow
16+
17+
18+
@worker_task(task_definition_name="get_user_email")
19+
def get_user_email(userid: str) -> str:
20+
return f"{userid}@example.com"
21+
22+
23+
@worker_task(task_definition_name="send_email")
24+
def send_email(email: str, subject: str, body: str):
25+
print(f"sending email to {email} with subject {subject} and body {body}")
26+
27+
28+
async def main():
29+
# defaults to reading the configuration using following env variables
30+
# CONDUCTOR_SERVER_URL : conductor server e.g. https://play.orkes.io/api
31+
# CONDUCTOR_AUTH_KEY : API Authentication Key
32+
# CONDUCTOR_AUTH_SECRET: API Auth Secret
33+
api_config = Configuration()
34+
task_handler = TaskHandler(configuration=api_config)
35+
task_handler.start_processes()
36+
37+
async with ApiClient(api_config) as api_client:
38+
clients = OrkesClients(api_client=api_client, configuration=api_config)
39+
workflow_executor = clients.get_workflow_executor()
40+
workflow = AsyncConductorWorkflow(
41+
name="dynamic_workflow", version=1, executor=workflow_executor
42+
)
43+
get_email = get_user_email(
44+
task_ref_name="get_user_email_ref", userid=workflow.input("userid")
45+
)
46+
sendmail = send_email(
47+
task_ref_name="send_email_ref",
48+
email=get_email.output("result"),
49+
subject="Hello from Orkes",
50+
body="Test Email",
51+
)
52+
53+
workflow >> get_email >> sendmail
54+
55+
# Configure the output of the workflow
56+
workflow.output_parameters(
57+
output_parameters={"email": get_email.output("result")}
58+
)
59+
60+
workflow_run = await workflow.execute(workflow_input={"userid": "user_a"})
61+
print(f"\nworkflow output: {workflow_run.output}\n")
62+
print(
63+
f"check the workflow execution here: {api_config.ui_host}/execution/{workflow_run.workflow_id}"
64+
)
65+
66+
task_handler.stop_processes()
67+
68+
69+
if __name__ == "__main__":
70+
asyncio.run(main())

0 commit comments

Comments
 (0)