Skip to content

Commit b7b5d71

Browse files
Merge pull request #930 from gooddata/snapshot-master-484fc20c-to-rel/dev
[bot] Merge master/484fc20c into rel/dev
2 parents 7221df4 + 484fc20 commit b7b5d71

File tree

8 files changed

+126
-18
lines changed

8 files changed

+126
-18
lines changed

gooddata-flexconnect/gooddata_flexconnect/function/flight_methods.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
from gooddata_flexconnect.function.function_task import FlexConnectFunctionTask
2222

2323
_LOGGER = structlog.get_logger("gooddata_flexconnect.rpc")
24-
_DEFAULT_TASK_WAIT = 60.0
2524

2625

2726
class _FlexConnectServerMethods(FlightServerMethods):
28-
def __init__(self, ctx: ServerContext, registry: FlexConnectFunctionRegistry) -> None:
27+
def __init__(self, ctx: ServerContext, registry: FlexConnectFunctionRegistry, call_deadline_ms: float) -> None:
2928
self._ctx = ctx
3029
self._registry = registry
30+
self._call_deadline = call_deadline_ms / 1000
3131

3232
@staticmethod
3333
def _create_descriptor(fun_name: str, metadata: Optional[dict]) -> pyarrow.flight.FlightDescriptor:
@@ -148,8 +148,13 @@ def get_flight_info(
148148

149149
try:
150150
# XXX: this should be enhanced to implement polling
151-
task_result = self._ctx.task_executor.wait_for_result(task.task_id, _DEFAULT_TASK_WAIT)
151+
task_result = self._ctx.task_executor.wait_for_result(task.task_id, self._call_deadline)
152152
except TaskWaitTimeoutError:
153+
cancelled = self._ctx.task_executor.cancel(task.task_id)
154+
_LOGGER.warning(
155+
"flexconnect_fun_call_timeout", task_id=task.task_id, fun=task.fun_name, cancelled=cancelled
156+
)
157+
153158
raise ErrorInfo.for_reason(
154159
ErrorCode.TIMEOUT, f"GetFlightInfo timed out while waiting for task {task.task_id}."
155160
).to_timeout_error()
@@ -195,6 +200,27 @@ def do_get(
195200

196201
_FLEX_CONNECT_CONFIG_SECTION = "flexconnect"
197202
_FLEX_CONNECT_FUNCTION_LIST = "functions"
203+
_FLEX_CONNECT_CALL_DEADLINE_MS = "call_deadline_ms"
204+
_DEFAULT_FLEX_CONNECT_CALL_DEADLINE_MS = 180_000
205+
206+
207+
def _read_call_deadline_ms(ctx: ServerContext) -> int:
208+
call_deadline = ctx.settings.get(f"{_FLEX_CONNECT_CONFIG_SECTION}.{_FLEX_CONNECT_CALL_DEADLINE_MS}")
209+
if call_deadline is None:
210+
return _DEFAULT_FLEX_CONNECT_CALL_DEADLINE_MS
211+
212+
try:
213+
call_deadline_ms = int(call_deadline)
214+
if call_deadline_ms <= 0:
215+
raise ValueError()
216+
217+
return call_deadline_ms
218+
except ValueError:
219+
raise ValueError(
220+
f"Value of {_FLEX_CONNECT_CONFIG_SECTION}.{_FLEX_CONNECT_CALL_DEADLINE_MS} must "
221+
f"be a positive number - duration, in milliseconds, that FlexConnect function "
222+
f"calls are expected to run."
223+
)
198224

199225

200226
@flight_server_methods
@@ -209,7 +235,9 @@ def create_flexconnect_flight_methods(ctx: ServerContext) -> FlightServerMethods
209235
:return: new instance of Flight RPC server methods to integrate into the server
210236
"""
211237
modules = list(ctx.settings.get(f"{_FLEX_CONNECT_CONFIG_SECTION}.{_FLEX_CONNECT_FUNCTION_LIST}") or [])
238+
call_deadline_ms = _read_call_deadline_ms(ctx)
239+
212240
_LOGGER.info("flexconnect_init", modules=modules)
213241
registry = FlexConnectFunctionRegistry().load(ctx, modules)
214242

215-
return _FlexConnectServerMethods(ctx, registry)
243+
return _FlexConnectServerMethods(ctx, registry, call_deadline_ms)

gooddata-flexconnect/gooddata_flexconnect/function/function_task.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,14 @@ def run(self) -> Union[TaskResult, TaskError]:
4545
headers=self._headers,
4646
)
4747

48+
# switch task to non-cancellable state; once the code creates
49+
# and returns the result, the task has executed successfully and there
50+
# is nothing to cancel.
51+
#
52+
# NOTE: if the switch finds that the task was already cancelled, it
53+
# bails and raises an error.
54+
self.switch_non_cancellable()
55+
4856
return FlightDataTaskResult.for_data(result)
4957

5058
def on_task_cancel(self) -> None:

gooddata-flexconnect/tests/server/conftest.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,11 @@ def flexconnect_server(
7676
tls: bool = False,
7777
mtls: bool = False,
7878
) -> GoodDataFlightServer:
79-
envvar = ", ".join([f'"{module}"' for module in modules])
80-
envvar = f"[{envvar}]"
79+
funs = ", ".join([f'"{module}"' for module in modules])
80+
funs = f"[{funs}]"
8181

82-
os.environ["GOODDATA_FLIGHT_FLEXCONNECT__FUNCTIONS"] = envvar
82+
os.environ["GOODDATA_FLIGHT_FLEXCONNECT__FUNCTIONS"] = funs
83+
os.environ["GOODDATA_FLIGHT_FLEXCONNECT__CALL_DEADLINE_MS"] = "500"
8384

8485
with server(create_flexconnect_flight_methods, tls, mtls) as s:
8586
yield s

gooddata-flexconnect/tests/server/funs/fun1.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
from gooddata_flight_server import ArrowData
66

77

8-
class _SimpleFun(FlexConnectFunction):
9-
Name = "SimpleFun"
8+
class _SimpleFun1(FlexConnectFunction):
9+
Name = "SimpleFun1"
1010
Schema = pyarrow.schema(
1111
fields=[
1212
pyarrow.field("col1", pyarrow.int64()),

gooddata-flexconnect/tests/server/funs/fun2.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
_DATA: Optional[pyarrow.Table] = None
99

1010

11-
class _SimpleFun(FlexConnectFunction):
12-
Name = "SimpleFun"
11+
class _SimpleFun2(FlexConnectFunction):
12+
Name = "SimpleFun2"
1313
Schema = pyarrow.schema(
1414
fields=[
1515
pyarrow.field("col1", pyarrow.int64()),
@@ -37,5 +37,5 @@ def on_load(ctx: ServerContext) -> None:
3737
"col2": ["a", "b", "c"],
3838
"col3": [True, False, True],
3939
},
40-
schema=_SimpleFun.Schema,
40+
schema=_SimpleFun2.Schema,
4141
)
gooddata-flexconnect/tests/server/funs/fun3.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# (C) 2024 GoodData Corporation
2+
import time
3+
from typing import Optional
4+
5+
import pyarrow
6+
from gooddata_flexconnect.function.function import FlexConnectFunction
7+
from gooddata_flight_server import ArrowData
8+
9+
_DATA: Optional[pyarrow.Table] = None
10+
11+
12+
class _LongRunningFun(FlexConnectFunction):
13+
Name = "LongRunningFun"
14+
Schema = pyarrow.schema(
15+
fields=[
16+
pyarrow.field("col1", pyarrow.int64()),
17+
pyarrow.field("col2", pyarrow.string()),
18+
pyarrow.field("col3", pyarrow.bool_()),
19+
]
20+
)
21+
22+
def call(
23+
self,
24+
parameters: dict,
25+
columns: tuple[str, ...],
26+
headers: dict[str, list[str]],
27+
) -> ArrowData:
28+
# sleep is intentionally setup to be longer than the deadline for
29+
# the function invocation (see conftest.py // flexconnect_server fixture)
30+
time.sleep(1)
31+
32+
return pyarrow.table(
33+
data={
34+
"col1": [1, 2, 3],
35+
"col2": ["a", "b", "c"],
36+
"col3": [True, False, True],
37+
},
38+
schema=self.Schema,
39+
)

gooddata-flexconnect/tests/server/test_flexconnect_server.py

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
# (C) 2024 GoodData Corporation
22
import orjson
33
import pyarrow.flight
4+
import pytest
5+
from gooddata_flight_server import ErrorCode
46

7+
from tests.assert_error_info import assert_error_code
58
from tests.server.conftest import flexconnect_server
69

710

@@ -16,12 +19,12 @@ def test_basic_function():
1619
assert fun_info.descriptor.command is not None
1720
assert len(fun_info.descriptor.command)
1821
cmd = orjson.loads(fun_info.descriptor.command)
19-
assert cmd["functionName"] == "SimpleFun"
22+
assert cmd["functionName"] == "SimpleFun1"
2023

2124
descriptor = pyarrow.flight.FlightDescriptor.for_command(
2225
orjson.dumps(
2326
{
24-
"functionName": "SimpleFun",
27+
"functionName": "SimpleFun1",
2528
"parameters": {"test1": 1, "test2": 2, "test3": 3},
2629
}
2730
)
@@ -45,7 +48,7 @@ def test_function_with_on_load():
4548
descriptor = pyarrow.flight.FlightDescriptor.for_command(
4649
orjson.dumps(
4750
{
48-
"functionName": "SimpleFun",
51+
"functionName": "SimpleFun2",
4952
"parameters": {"test1": 1, "test2": 2, "test3": 3},
5053
}
5154
)
@@ -69,12 +72,12 @@ def test_basic_function_tls(tls_ca_cert):
6972
assert fun_info.descriptor.command is not None
7073
assert len(fun_info.descriptor.command)
7174
cmd = orjson.loads(fun_info.descriptor.command)
72-
assert cmd["functionName"] == "SimpleFun"
75+
assert cmd["functionName"] == "SimpleFun1"
7376

7477
descriptor = pyarrow.flight.FlightDescriptor.for_command(
7578
orjson.dumps(
7679
{
77-
"functionName": "SimpleFun",
80+
"functionName": "SimpleFun1",
7881
"parameters": {"test1": 1, "test2": 2, "test3": 3},
7982
}
8083
)
@@ -84,3 +87,32 @@ def test_basic_function_tls(tls_ca_cert):
8487

8588
assert len(data) == 3
8689
assert data.column_names == ["col1", "col2", "col3"]
90+
91+
92+
def test_function_with_call_deadline():
93+
"""
94+
The Flight RPC implementation that invokes FlexConnect can be set up with a
95+
deadline for the invocation duration (done by GetFlightInfo).
96+
97+
If the function invocation (or wait for the invocation) exceeds the
98+
deadline, the GetFlightInfo will fail with timeout and the underlying
99+
task will be cancelled (if possible).
100+
101+
In these cases, the GetFlightInfo raises FlightTimedOutError with
102+
appropriate error code.
103+
"""
104+
with flexconnect_server(["tests.server.funs.fun3"]) as s:
105+
c = pyarrow.flight.FlightClient(s.location)
106+
descriptor = pyarrow.flight.FlightDescriptor.for_command(
107+
orjson.dumps(
108+
{
109+
"functionName": "LongRunningFun",
110+
"parameters": {"test1": 1, "test2": 2, "test3": 3},
111+
}
112+
)
113+
)
114+
115+
with pytest.raises(pyarrow.flight.FlightTimedOutError) as e:
116+
c.get_flight_info(descriptor)
117+
118+
assert_error_code(ErrorCode.TIMEOUT, e.value)

gooddata-flight-server/gooddata_flight_server/tasks/thread_task_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ def cancel(self) -> bool:
270270
"""
271271
Cancels the execution.
272272
273-
IMPORTANT: task executor most not hold any locks at the time of cancellation.
273+
IMPORTANT: task executor must not hold any locks at the time of cancellation.
274274
275275
:return: True if cancel was successful, false if it was not possible
276276
"""

0 commit comments

Comments
 (0)