
Commit bca56a9

Limit parallelism to reduce memory pressure
1 parent fed83e8 commit bca56a9

File tree

2 files changed: +53 -52 lines


tests/conftest.py

Lines changed: 2 additions & 0 deletions
@@ -2433,6 +2433,8 @@ def spark() -> "SparkSession":
     spark = (
         SparkSession.builder.appName("PyIceberg integration test")
         .config("spark.sql.session.timeZone", "UTC")
+        .config("spark.sql.shuffle.partitions", "1")
+        .config("spark.default.parallelism", "1")
         .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .config("spark.sql.catalog.integration", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.integration.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")

tests/integration/test_deletes.py

Lines changed: 51 additions & 52 deletions
@@ -309,58 +309,57 @@ def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSes
     assert len(reader.read_all()) == 0
 
 
-# @pytest.mark.integration
-# @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
-# def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
-#     identifier = "default.test_read_multiple_batches_in_task_with_position_deletes"
-#     multiplier = 10
-#
-#     run_spark_commands(
-#         spark,
-#         [
-#             f"DROP TABLE IF EXISTS {identifier}",
-#             f"""
-#             CREATE TABLE {identifier} (
-#                 number int
-#             )
-#             USING iceberg
-#             TBLPROPERTIES(
-#                 'format-version' = 2,
-#                 'write.delete.mode'='merge-on-read',
-#                 'write.update.mode'='merge-on-read',
-#                 'write.merge.mode'='merge-on-read'
-#             )
-#             """,
-#         ],
-#     )
-#
-#     tbl = session_catalog.load_table(identifier)
-#
-#     arrow_table = pa.Table.from_arrays(
-#         [
-#             pa.array(list(range(1, 1001)) * multiplier),
-#         ],
-#         schema=pa.schema([pa.field("number", pa.int32())]),
-#     )
-#
-#     tbl.append(arrow_table)
-#
-#     run_spark_commands(
-#         spark,
-#         [
-#             f"""
-#             DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
-#             """,
-#         ],
-#     )
-#
-#     tbl.refresh()
-#
-#     reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
-#     assert isinstance(reader, pa.RecordBatchReader)
-#     pyiceberg_count = len(reader.read_all())
-#     expected_count = 46 * multiplier
-#     assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}"
+@pytest.mark.integration
+@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
+def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
+    identifier = "default.test_read_multiple_batches_in_task_with_position_deletes"
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number int
+            )
+            USING iceberg
+            TBLPROPERTIES(
+                'format-version' = 2,
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read'
+            )
+            """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+
+    arrow_table = pa.Table.from_arrays(
+        [
+            pa.array(list(range(1, 1001)) * 100),
+        ],
+        schema=pa.schema([pa.field("number", pa.int32())]),
+    )
+
+    tbl.append(arrow_table)
+
+    run_spark_commands(
+        spark,
+        [
+            f"""
+            DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
+            """,
+        ],
+    )
+
+    tbl.refresh()
+
+    reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
+    assert isinstance(reader, pa.RecordBatchReader)
+    pyiceberg_count = len(reader.read_all())
+    expected_count = 46 * 100
+    assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}"
 
 
 @pytest.mark.integration
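
The expected count in the re-enabled test follows from the data layout alone; a small sanity sketch of that arithmetic (not part of the diff):

# The appended table holds the values 1..1000, each repeated 100 times.
values = list(range(1, 1001)) * 100

# The Spark DELETE removes 1-4; the PyIceberg scan keeps number <= 50,
# leaving the 46 values 5..50, each still present 100 times.
remaining = [v for v in values if v not in (1, 2, 3, 4) and v <= 50]
assert len(remaining) == 46 * 100  # 4600 rows expected from reader.read_all()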
