Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ jobs:

- name: check out repository
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: 3.11

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
Expand All @@ -31,6 +36,9 @@ jobs:
run: |
python tests/upload_server.py &

- name: Install dependencies
run: pip install .

- name: instal pytest
run: pip install pytest

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
target: development
image: tesp-api
environment:
- CONTAINER_TYPE=docker # Set to "docker", "singularity", or "both"
- CONTAINER_TYPE=singularity # Set to "docker", "singularity", or "both"
container_name: tesp-api
privileged: true
ports:
Expand Down
24 changes: 23 additions & 1 deletion docker/pulsar_rest/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,28 @@ RUN pip install 'pulsar-app[web]'
FROM python-base as development
COPY --from=builder $PYSETUP_PATH $PYSETUP_PATH

# Install dependencies required by Apptainer (Singularity)
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
curl \
gnupg-agent \
software-properties-common \
lsb-release \
wget \
libseccomp2 \
uidmap \
squashfs-tools \
squashfuse \
fuse2fs \
fuse-overlayfs \
fakeroot \
cryptsetup

# Download and install Apptainer
ARG APPTAINER_VERSION=1.3.6
RUN curl -LO https://github.com/apptainer/apptainer/releases/download/v${APPTAINER_VERSION}/apptainer_${APPTAINER_VERSION}_amd64.deb \
&& apt-get install -y ./apptainer_${APPTAINER_VERSION}_amd64.deb \
&& rm apptainer_${APPTAINER_VERSION}_amd64.deb

RUN apt-get update && apt-get install -y curl gnupg-agent software-properties-common lsb-release
RUN curl -fsSL https://download.docker.com/linux/debian/gpg | apt-key add -
RUN add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/debian $(lsb_release -cs) stable"
Expand All @@ -37,4 +59,4 @@ WORKDIR $PYSETUP_PATH
COPY startup.sh startup.sh
RUN pulsar-config --host 0.0.0.0
EXPOSE 8913
CMD ["/bin/bash", "./startup.sh"]
CMD ["/bin/bash", "./startup.sh"]
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repository = "https://github.com/ndopj/tesp-api"

[tool.poetry.dependencies]
python = "^3.10.0"
aio_pika = "^9.5.7"
fastapi = "^0.75.1"
orjson = "^3.6.8"
gunicorn = "^20.1.0"
Expand Down
3 changes: 2 additions & 1 deletion settings.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
db.mongodb_uri = "mongodb://localhost:27017"
pulsar.url = "http://localhost:8913"
pulsar.status.poll_interval = 4
pulsar.status.max_polls = 100
pulsar.status.max_polls = 400
pulsar.client_timeout = 30

logging.level = "DEBUG"
logging.output_json = false
Expand Down
13 changes: 11 additions & 2 deletions tesp_api/api/endpoints/endpoint_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,14 @@ def resource_not_found_response(message: Maybe[str] = Nothing):


def response_from_model(model: BaseModel, model_rules: dict = None) -> Response:
return Response(model.json(**(model_rules if model_rules else {}), by_alias=False),
status_code=200, media_type='application/json')
response = Response(
model.json(**(model_rules if model_rules else {}), by_alias=False),
status_code=200,
media_type='application/json'
)
# FORCE NO CACHING
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
response.headers["Pragma"] = "no-cache"
response.headers["Expires"] = "0"

return response
25 changes: 14 additions & 11 deletions tesp_api/repository/task_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,20 @@ def cancel_task(
p_author: Maybe[str],
task_id: ObjectId
) -> Promise:
full_search_query = dict()
full_search_query.update({'_id': task_id})
full_search_query.update(p_author.maybe({}, lambda a: {'author': a}))

return Promise(lambda resolve, reject: resolve(full_search_query)) \
.then(self._tasks.find_one) \
.then(lambda _task: self.update_task(
{'_id': task_id},
{'$set': {'state': TesTaskState.CANCELED}}
)).map(lambda updated_task: updated_task
.map(lambda _updated_task: _updated_task.id))\
search_query = {
'_id': task_id,
'state': {'$in': [
TesTaskState.QUEUED,
TesTaskState.INITIALIZING,
TesTaskState.RUNNING
]}
}
search_query.update(p_author.maybe({}, lambda a: {'author': a}))
update_query = {'$set': {'state': TesTaskState.CANCELED}}

return self.update_task(search_query, update_query)\
.map(lambda updated_task: updated_task
.map(lambda _updated_task: _updated_task.id))\
.catch(handle_data_layer_error)


Expand Down
135 changes: 82 additions & 53 deletions tesp_api/service/event_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from tesp_api.repository.task_repository import task_repository
from tesp_api.service.file_transfer_service import file_transfer_service
from tesp_api.service.error import pulsar_event_handle_error, TaskNotFoundError, TaskExecutorError
from tesp_api.service.pulsar_operations import PulsarRestOperations, PulsarAmpqOperations, DataType
from tesp_api.service.pulsar_operations import PulsarRestOperations, PulsarAmqpOperations, DataType
from tesp_api.repository.model.task import (
TesTaskState,
TesTaskExecutor,
Expand All @@ -29,6 +29,7 @@

CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "docker")


@local_handler.register(event_name="queued_task")
def handle_queued_task(event: Event) -> None:
"""
Expand All @@ -39,8 +40,9 @@ def handle_queued_task(event: Event) -> None:
match pulsar_service.get_operations():
case PulsarRestOperations() as pulsar_rest_operations:
dispatch_event('queued_task_rest', {**payload, 'pulsar_operations': pulsar_rest_operations})
case PulsarAmpqOperations() as pulsar_ampq_operations:
dispatch_event('queued_task_ampq', {**payload, 'pulsar_operations': pulsar_ampq_operations})
case PulsarAmqpOperations() as pulsar_amqp_operations:
dispatch_event('queued_task_amqp', {**payload, 'pulsar_operations': pulsar_amqp_operations})


@local_handler.register(event_name="queued_task_rest")
async def handle_queued_task_rest(event: Event):
Expand All @@ -53,12 +55,37 @@ async def handle_queued_task_rest(event: Event):

print(f"Queued task rest: {task_id}")

await Promise(lambda resolve, reject: resolve(None))\
.then(lambda nothing: pulsar_operations.setup_job(task_id))\
.map(lambda setup_job_result: dispatch_event('initialize_task', {**payload, 'task_config': setup_job_result}))\
.catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\
await Promise(lambda resolve, reject: resolve(None)) \
.then(lambda nothing: pulsar_operations.setup_job(task_id)) \
.map(lambda setup_job_result: dispatch_event('initialize_task', {**payload, 'task_config': setup_job_result})) \
.catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \
.then(lambda x: x) # Invokes promise, potentially from error handler


@local_handler.register(event_name="queued_task_amqp")
async def handle_queued_task_amqp(event: Event):
"""
Sets up the job in Pulsar via AMQP operations and dispatches an 'initialize_task' event.
"""
event_name, payload = event
task_id: ObjectId = payload['task_id']
pulsar_operations: PulsarAmqpOperations = payload['pulsar_operations']

print(f"Queued task AMQP: {task_id}")

try:
# Setup job via AMQP
setup_job_result = await pulsar_operations.setup_job(task_id)

# Dispatch initialize event
await dispatch_event('initialize_task', {
**payload,
'task_config': setup_job_result
})
except Exception as error:
await pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)


@local_handler.register(event_name="initialize_task")
async def handle_initializing_task(event: Event) -> None:
"""
Expand All @@ -69,12 +96,11 @@ async def handle_initializing_task(event: Event) -> None:
task_id: ObjectId = payload['task_id']
pulsar_operations: PulsarRestOperations = payload['pulsar_operations']

# Merged Logic: Using the feature-complete setup_data from the new version
async def setup_data(job_id: ObjectId,
resources: TesTaskResources,
volumes: List[str],
inputs: List[TesTaskInput],
outputs: List[TesTaskOutput]):
resources: TesTaskResources,
volumes: List[str],
inputs: List[TesTaskInput],
outputs: List[TesTaskOutput]):
resource_conf: dict
volume_confs: List[dict] = []
input_confs: List[dict] = []
Expand Down Expand Up @@ -109,28 +135,29 @@ async def setup_data(job_id: ObjectId,
return resource_conf, volume_confs, input_confs, output_confs

print(f"Initializing task: {task_id}")
await Promise(lambda resolve, reject: resolve(None))\
await Promise(lambda resolve, reject: resolve(None)) \
.then(lambda nothing: task_repository.update_task_state(
task_id,
TesTaskState.QUEUED,
TesTaskState.INITIALIZING
)).map(lambda updated_task: get_else_throw(
updated_task, TaskNotFoundError(task_id, Just(TesTaskState.QUEUED))
)).then(lambda updated_task: setup_data(
task_id,
maybe_of(updated_task.resources).maybe(None, lambda x: x),
maybe_of(updated_task.volumes).maybe([], lambda x: x),
maybe_of(updated_task.inputs).maybe([], lambda x: x),
maybe_of(updated_task.outputs).maybe([], lambda x: x)
)).map(lambda res_input_output_confs: dispatch_event('run_task', {
**payload,
'resource_conf': res_input_output_confs[0],
'volume_confs': res_input_output_confs[1],
'input_confs': res_input_output_confs[2],
'output_confs': res_input_output_confs[3]
})).catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\
task_id,
TesTaskState.QUEUED,
TesTaskState.INITIALIZING
)).map(lambda updated_task: get_else_throw(
updated_task, TaskNotFoundError(task_id, Just(TesTaskState.QUEUED))
)).then(lambda updated_task: setup_data(
task_id,
maybe_of(updated_task.resources).maybe(None, lambda x: x),
maybe_of(updated_task.volumes).maybe([], lambda x: x),
maybe_of(updated_task.inputs).maybe([], lambda x: x),
maybe_of(updated_task.outputs).maybe([], lambda x: x)
)).map(lambda res_input_output_confs: dispatch_event('run_task', {
**payload,
'resource_conf': res_input_output_confs[0],
'volume_confs': res_input_output_confs[1],
'input_confs': res_input_output_confs[2],
'output_confs': res_input_output_confs[3]
})).catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \
.then(lambda x: x)


@local_handler.register(event_name="run_task")
async def handle_run_task(event: Event) -> None:
"""
Expand All @@ -146,8 +173,8 @@ async def handle_run_task(event: Event) -> None:
input_confs: List[dict] = payload['input_confs']
output_confs: List[dict] = payload['output_confs']
pulsar_operations: PulsarRestOperations = payload['pulsar_operations']
run_command_str = None

run_command_str = None
command_start_time = datetime.datetime.now(datetime.timezone.utc)

try:
Expand Down Expand Up @@ -175,7 +202,7 @@ async def handle_run_task(event: Event) -> None:
)

stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest", command=[], workdir=Path("/downloads"))

# Stage-in command
stage_in_cmd = ""
stage_in_mount = ""
Expand Down Expand Up @@ -211,7 +238,6 @@ async def handle_run_task(event: Event) -> None:
non_empty_parts = [p.strip() for p in parts if p and p.strip()]
run_command_str = " && ".join(non_empty_parts) if non_empty_parts else None

# Resume with the polished version's logic for execution and state management
command_start_time = datetime.datetime.now(datetime.timezone.utc)
command_status: dict

Expand All @@ -231,27 +257,28 @@ async def handle_run_task(event: Event) -> None:
command_status.get('returncode', -1)
)

current_task_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
current_task_obj = get_else_throw(current_task_monad, TaskNotFoundError(task_id))
current_task_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
current_task_obj = get_else_throw(current_task_monad, TaskNotFoundError(task_id))

if current_task_obj.state == TesTaskState.CANCELED:
print(f"Task {task_id} found CANCELED after job completion polling. Aborting state changes.")
return
return

if command_status.get('returncode', -1) != 0:
print(f"Task {task_id} executor error (return code: {command_status.get('returncode', -1)}). Setting state to EXECUTOR_ERROR.")
print(
f"Task {task_id} executor error (return code: {command_status.get('returncode', -1)}). Setting state to EXECUTOR_ERROR.")
await task_repository.update_task_state(task_id, TesTaskState.RUNNING, TesTaskState.EXECUTOR_ERROR)
await pulsar_operations.erase_job(task_id)
return
return

print(f"Task {task_id} completed successfully. Setting state to COMPLETE.")
await Promise(lambda resolve, reject: resolve(None)) \
.then(lambda ignored: task_repository.update_task_state(
task_id, TesTaskState.RUNNING, TesTaskState.COMPLETE
)) \
task_id, TesTaskState.RUNNING, TesTaskState.COMPLETE
)) \
.map(lambda task_after_complete_update: get_else_throw(
task_after_complete_update, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING))
)) \
task_after_complete_update, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING))
)) \
.then(lambda ignored: pulsar_operations.erase_job(task_id)) \
.catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \
.then(lambda x: x)
Expand All @@ -262,22 +289,24 @@ async def handle_run_task(event: Event) -> None:
await pulsar_operations.kill_job(task_id)
await pulsar_operations.erase_job(task_id)
print(f"Task {task_id} Pulsar job cleanup attempted after asyncio cancellation.")

except Exception as error:
print(f"Exception in handle_run_task for task {task_id}: {type(error).__name__} - {error}")

task_state_after_error_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
if task_state_after_error_monad.is_just() and task_state_after_error_monad.value.state == TesTaskState.CANCELED:
print(f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to this. No further error processing by handler.")
return
print(
f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to this. No further error processing by handler.")
return

print(f"Task {task_id} not CANCELED; proceeding with pulsar_event_handle_error for '{type(error).__name__}'.")
error_handler_result = pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)
if asyncio.iscoroutine(error_handler_result) or isinstance(error_handler_result, _Promise):
await error_handler_result

try:
print(f"Ensuring Pulsar job for task {task_id} is erased after general error handling in run_task.")
await pulsar_operations.erase_job(task_id)
except Exception as final_erase_error:
print(f"Error during final Pulsar erase attempt for task {task_id} after general error: {final_erase_error}")

# try:
# print(f"Ensuring Pulsar job for task {task_id} is erased after general error handling in run_task.")
# await pulsar_operations.erase_job(task_id)
# except Exception as final_erase_error:
# print(
# f"Error during final Pulsar erase attempt for task {task_id} after general error: {final_erase_error}")
Loading
Loading