Skip to content

Commit fe6d7c9

Browse files
authored
chore: optimization to reduce number of open TCP connections while running zonal system tests (#1691)
chore: optimization to reduce number of open TCP connections while running zonal system tests 1. Increase `ulimit -n 10000` before ssh'ing into the VM where system tests for zonal buckets are running. 2. Delete `mrd` and `writer` instances and trigger `gc.collect()` (this alone should suffice, but the ulimit increase in item 1 is also applied to avoid future issues).
1 parent bee6089 commit fe6d7c9

File tree

3 files changed

+36
-38
lines changed

3 files changed

+36
-38
lines changed

cloudbuild/run_zonal_tests.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@ pip install -e .
2222
echo '--- Setting up environment variables on VM ---'
2323
export ZONAL_BUCKET=${_ZONAL_BUCKET}
2424
export RUN_ZONAL_SYSTEM_TESTS=True
25-
echo '--- Running Zonal tests on VM ---'
25+
CURRENT_ULIMIT=$(ulimit -n)
26+
echo '--- Running Zonal tests on VM with ulimit set to ---' $CURRENT_ULIMIT
2627
pytest -vv -s --log-format='%(asctime)s %(levelname)s %(message)s' --log-date-format='%H:%M:%S' tests/system/test_zonal.py

cloudbuild/zb-system-tests-cloudbuild.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ substitutions:
33
_ZONE: "us-central1-a"
44
_SHORT_BUILD_ID: ${BUILD_ID:0:8}
55
_VM_NAME: "py-sdk-sys-test-${_SHORT_BUILD_ID}"
6+
_ULIMIT: "10000" # 10k, for gRPC bidi streams
67

78

89

@@ -67,7 +68,7 @@ steps:
6768
# Execute the script on the VM via SSH.
6869
# Capture the exit code to ensure cleanup happens before the build fails.
6970
set +e
70-
gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} bash run_zonal_tests.sh"
71+
gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n ${_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} bash run_zonal_tests.sh"
7172
EXIT_CODE=$?
7273
set -e
7374

tests/system/test_zonal.py

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
# py standard imports
2-
import asyncio
32
import os
43
import uuid
54
from io import BytesIO
@@ -8,6 +7,7 @@
87
import google_crc32c
98

109
import pytest
10+
import gc
1111

1212
# current library imports
1313
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
@@ -19,6 +19,7 @@
1919
AsyncMultiRangeDownloader,
2020
)
2121

22+
2223
pytestmark = pytest.mark.skipif(
2324
os.getenv("RUN_ZONAL_SYSTEM_TESTS") != "True",
2425
reason="Zonal system tests need to be explicitly enabled. This helps scheduling tests in Kokoro and Cloud Build.",
@@ -36,36 +37,6 @@ def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
3637
return a + step, a + 2 * step
3738

3839

39-
async def write_one_appendable_object(
40-
bucket_name: str,
41-
object_name: str,
42-
data: bytes,
43-
) -> None:
44-
"""Helper to write an appendable object."""
45-
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
46-
writer = AsyncAppendableObjectWriter(grpc_client, bucket_name, object_name)
47-
await writer.open()
48-
await writer.append(data)
49-
await writer.close()
50-
51-
52-
@pytest.fixture(scope="function")
53-
def appendable_object(storage_client, blobs_to_delete):
54-
"""Fixture to create and cleanup an appendable object."""
55-
object_name = f"appendable_obj_for_mrd-{str(uuid.uuid4())[:4]}"
56-
asyncio.run(
57-
write_one_appendable_object(
58-
_ZONAL_BUCKET,
59-
object_name,
60-
_BYTES_TO_UPLOAD,
61-
)
62-
)
63-
yield object_name
64-
65-
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
66-
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
67-
68-
6940
@pytest.mark.asyncio
7041
@pytest.mark.parametrize(
7142
"object_size",
@@ -114,6 +85,9 @@ async def test_basic_wrd(
11485

11586
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
11687
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
88+
del writer
89+
del mrd
90+
gc.collect()
11791

11892

11993
@pytest.mark.asyncio
@@ -161,12 +135,20 @@ async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size)
161135

162136
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
163137
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
138+
del writer
139+
del mrd
140+
gc.collect()
164141

165142

166143
@pytest.mark.asyncio
167144
@pytest.mark.parametrize(
168145
"flush_interval",
169-
[2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024, _DEFAULT_FLUSH_INTERVAL_BYTES],
146+
[
147+
2 * 1024 * 1024,
148+
4 * 1024 * 1024,
149+
8 * 1024 * 1024,
150+
_DEFAULT_FLUSH_INTERVAL_BYTES,
151+
],
170152
)
171153
async def test_wrd_with_non_default_flush_interval(
172154
storage_client,
@@ -214,6 +196,9 @@ async def test_wrd_with_non_default_flush_interval(
214196

215197
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
216198
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
199+
del writer
200+
del mrd
201+
gc.collect()
217202

218203

219204
@pytest.mark.asyncio
@@ -237,20 +222,28 @@ async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delet
237222

238223
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
239224
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
225+
del writer
226+
del mrd
227+
gc.collect()
240228

241229

242230
@pytest.mark.asyncio
243-
async def test_mrd_open_with_read_handle(appendable_object):
244-
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
231+
async def test_mrd_open_with_read_handle():
232+
grpc_client = AsyncGrpcClient().grpc_client
233+
object_name = f"test_read_handl-{str(uuid.uuid4())[:4]}"
234+
writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
235+
await writer.open()
236+
await writer.append(_BYTES_TO_UPLOAD)
237+
await writer.close()
245238

246-
mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, appendable_object)
239+
mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
247240
await mrd.open()
248241
read_handle = mrd.read_handle
249242
await mrd.close()
250243

251244
# Open a new MRD using the `read_handle` obtained above
252245
new_mrd = AsyncMultiRangeDownloader(
253-
grpc_client, _ZONAL_BUCKET, appendable_object, read_handle=read_handle
246+
grpc_client, _ZONAL_BUCKET, object_name, read_handle=read_handle
254247
)
255248
await new_mrd.open()
256249
# persisted_size not set when opened with read_handle
@@ -259,3 +252,6 @@ async def test_mrd_open_with_read_handle(appendable_object):
259252
await new_mrd.download_ranges([(0, 0, buffer)])
260253
await new_mrd.close()
261254
assert buffer.getvalue() == _BYTES_TO_UPLOAD
255+
del mrd
256+
del new_mrd
257+
gc.collect()

0 commit comments

Comments
 (0)