Skip to content

Commit cd31afd

Browse files
committed
pytestadapter: Refactor test runner to make platform specific named pipe code encapuslated.
1 parent 5bf7b98 commit cd31afd

File tree

1 file changed

+81
-119
lines changed

1 file changed

+81
-119
lines changed

python_files/tests/pytestadapter/helpers.py

Lines changed: 81 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@
1414
import uuid
1515
from typing import Any, Dict, List, Optional, Tuple
1616

17-
if sys.platform == "win32":
18-
from namedpipe import NPopen
19-
20-
2117
script_dir = pathlib.Path(__file__).parent.parent.parent
2218
script_dir_child = pathlib.Path(__file__).parent.parent
2319
sys.path.append(os.fspath(script_dir))
@@ -128,52 +124,84 @@ def parse_rpc_message(data: str) -> Tuple[Dict[str, str], str]:
128124
print("json decode error")
129125

130126

131-
def _listen_on_fifo(pipe_name: str, result: List[str], completed: threading.Event):
132-
# Open the FIFO for reading
133-
fifo_path = pathlib.Path(pipe_name)
134-
with fifo_path.open() as fifo:
135-
print("Waiting for data...")
136-
while True:
137-
if completed.is_set():
138-
break # Exit loop if completed event is set
139-
data = fifo.read() # This will block until data is available
140-
if len(data) == 0:
141-
# If data is empty, assume EOF
142-
break
143-
print(f"Received: {data}")
144-
result.append(data)
145-
146-
147-
def _listen_win_named_pipe(listener, result: List[str], completed: threading.Event):
148-
all_data: list = []
149-
stream = listener.wait()
150-
while True:
151-
# Read data from collection
152-
close = stream.closed
153-
if close:
154-
break
155-
data = stream.readlines()
156-
if not data:
157-
if completed.is_set():
158-
break # Exit loop if completed event is set
159-
else:
127+
if sys.platform == "win32":
128+
from namedpipe import NPopen
129+
130+
@contextlib.contextmanager
131+
def pipe_setup_an_listen(pipe_name: str, result: List[str]):
132+
# For Windows, named pipes have a specific naming convention.
133+
pipe_path = f"\\\\.\\pipe\\{pipe_name}"
134+
135+
with NPopen("r+t", name=pipe_name, bufsize=0) as pipe:
136+
completed = threading.Event()
137+
138+
def listen():
139+
all_data: list = []
140+
stream = pipe.wait()
141+
while True:
142+
# Read data from collection
143+
close = stream.closed
144+
if close:
145+
break
146+
data = stream.readlines()
147+
if not data:
148+
if completed.is_set():
149+
break # Exit loop if completed event is set
150+
else:
151+
try:
152+
# Attempt to accept another connection if the current one closes unexpectedly
153+
print("attempt another connection")
154+
except socket.timeout:
155+
# On timeout, append all collected data to result and return
156+
# result.append("".join(all_data))
157+
return
158+
data_decoded = "".join(data)
159+
all_data.append(data_decoded)
160+
# Append all collected data to result array
161+
result.append("".join(all_data))
162+
163+
thread = threading.Thread(target=listen)
164+
thread.start()
160165
try:
161-
# Attempt to accept another connection if the current one closes unexpectedly
162-
print("attempt another connection")
163-
except socket.timeout:
164-
# On timeout, append all collected data to result and return
165-
# result.append("".join(all_data))
166-
return
167-
data_decoded = "".join(data)
168-
all_data.append(data_decoded)
169-
# Append all collected data to result array
170-
result.append("".join(all_data))
166+
yield pipe_path
167+
finally:
168+
completed.set()
169+
thread.join()
170+
else:
171+
172+
@contextlib.contextmanager
173+
def pipe_setup_an_listen(pipe_name: str, result: List[str]):
174+
# For Unix-like systems, use either the XDG_RUNTIME_DIR or a temporary directory.
175+
xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR")
176+
pipe_path = pathlib.Path(
177+
xdg_runtime_dir if xdg_runtime_dir else tempfile.gettempdir(),
178+
pipe_name,
179+
)
180+
os.mkfifo(pipe_path)
171181

182+
completed = threading.Event()
172183

173-
def _run_test_code(proc_args: List[str], proc_env, proc_cwd: str, completed: threading.Event):
174-
result = subprocess.run(proc_args, env=proc_env, cwd=proc_cwd)
175-
completed.set()
176-
return result
184+
def listen():
185+
# Open the FIFO for reading
186+
with pipe_path.open() as fifo:
187+
print("Waiting for data...")
188+
while True:
189+
if completed.is_set():
190+
break # Exit loop if completed event is set
191+
data = fifo.read() # This will block until data is available
192+
if len(data) == 0:
193+
# If data is empty, assume EOF
194+
break
195+
print(f"Received: {data}")
196+
result.append(data)
197+
198+
thread = threading.Thread(target=listen)
199+
thread.start()
200+
try:
201+
yield pipe_path
202+
finally:
203+
completed.set()
204+
thread.join()
177205

178206

179207
def runner(args: List[str]) -> Optional[List[Dict[str, Any]]]:
@@ -266,77 +294,20 @@ def runner_with_cwd_env(
266294
*args,
267295
]
268296

269-
# Generate pipe name, pipe name specific per OS type.
270-
271-
# Windows design
272-
if sys.platform == "win32":
273-
with NPopen("r+t", name=pipe_name, bufsize=0) as pipe:
274-
# Update the environment with the pipe name and PYTHONPATH.
275-
env = os.environ.copy()
276-
env.update(
277-
{
278-
"TEST_RUN_PIPE": pipe.path,
279-
"PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent),
280-
}
281-
)
282-
# if additional environment variables are passed, add them to the environment
283-
if env_add:
284-
env.update(env_add)
285-
286-
completed = threading.Event()
287-
288-
result = [] # result is a string array to store the data during threading
289-
t1: threading.Thread = threading.Thread(
290-
target=_listen_win_named_pipe, args=(pipe, result, completed)
291-
)
292-
t1.start()
293-
294-
t2 = threading.Thread(
295-
target=_run_test_code,
296-
args=(process_args, env, path, completed),
297-
)
298-
t2.start()
299-
300-
t1.join()
301-
t2.join()
302-
303-
return process_data_received(result[0]) if result else None
304-
else: # Unix design
305-
# Update the environment with the pipe name and PYTHONPATH.
297+
result = [] # result is a string array to store the data during threading
298+
with pipe_setup_an_listen(pipe_name, result) as pipe_path:
306299
env = os.environ.copy()
307300
env.update(
308301
{
309-
"TEST_RUN_PIPE": pipe_name,
302+
"TEST_RUN_PIPE": pipe_path,
310303
"PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent),
311304
}
312305
)
313306
# if additional environment variables are passed, add them to the environment
314307
if env_add:
315308
env.update(env_add)
316-
317-
# Create the FIFO (named pipe) if it doesn't exist
318-
# if not pathlib.Path.exists(pipe_name):
319-
os.mkfifo(pipe_name)
320-
321-
completed = threading.Event()
322-
323-
result = [] # result is a string array to store the data during threading
324-
t1: threading.Thread = threading.Thread(
325-
target=_listen_on_fifo, args=(pipe_name, result, completed)
326-
)
327-
t1.start()
328-
329-
t2: threading.Thread = threading.Thread(
330-
target=_run_test_code,
331-
args=(process_args, env, path, completed),
332-
)
333-
334-
t2.start()
335-
336-
t1.join()
337-
t2.join()
338-
339-
return process_data_received(result[0]) if result else None
309+
subprocess.run(process_args, env=env, cwd=path)
310+
return process_data_received(result[0]) if result else None
340311

341312

342313
def find_test_line_number(test_name: str, test_file_path) -> str:
@@ -392,13 +363,4 @@ def generate_random_pipe_name(prefix=""):
392363
if not prefix:
393364
prefix = "python-ext-rpc"
394365

395-
# For Windows, named pipes have a specific naming convention.
396-
if sys.platform == "win32":
397-
return f"\\\\.\\pipe\\{prefix}-{random_suffix}"
398-
399-
# For Unix-like systems, use either the XDG_RUNTIME_DIR or a temporary directory.
400-
xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR")
401-
if xdg_runtime_dir:
402-
return os.path.join(xdg_runtime_dir, f"{prefix}-{random_suffix}") # noqa: PTH118
403-
else:
404-
return os.path.join(tempfile.gettempdir(), f"{prefix}-{random_suffix}") # noqa: PTH118
366+
return f"{prefix}-{random_suffix}"

0 commit comments

Comments
 (0)