From 4576cf31e2e812923ef8ff1c0eb9c541b0995f6d Mon Sep 17 00:00:00 2001
From: shekharrajak
Date: Fri, 20 Feb 2026 23:34:47 +0530
Subject: [PATCH] Add ReflectionCache for Iceberg serialization optimization
 (#3456)

---
 .../comet/iceberg/IcebergReflection.scala     | 159 +++++++++
 .../operator/CometIcebergNativeScan.scala     | 105 +++---
 .../CometIcebergSerializationBenchmark.scala  | 302 ++++++++++++++++++
 3 files changed, 503 insertions(+), 63 deletions(-)
 create mode 100644 spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergSerializationBenchmark.scala

diff --git a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
index d778212392..fb28cced9d 100644
--- a/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
+++ b/spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala
@@ -19,8 +19,167 @@
 
 package org.apache.comet.iceberg
 
+import java.lang.reflect.Method
+
 import org.apache.spark.internal.Logging
 
+/**
+ * Cache for Iceberg reflection metadata to avoid repeated class loading and method lookups.
+ *
+ * This cache is created once per serializePartitions() call and passed to helper methods. It
+ * provides ~50% serialization speedup by eliminating redundant reflection operations that would
+ * otherwise be performed per-task (tens of thousands of times for large tables).
+ *
+ * @param contentScanTaskClass
+ *   org.apache.iceberg.ContentScanTask
+ * @param fileScanTaskClass
+ *   org.apache.iceberg.FileScanTask
+ * @param contentFileClass
+ *   org.apache.iceberg.ContentFile
+ * @param deleteFileClass
+ *   org.apache.iceberg.DeleteFile
+ * @param schemaParserClass
+ *   org.apache.iceberg.SchemaParser
+ * @param schemaClass
+ *   org.apache.iceberg.Schema
+ * @param partitionSpecParserClass
+ *   org.apache.iceberg.PartitionSpecParser
+ * @param partitionSpecClass
+ *   org.apache.iceberg.PartitionSpec
+ * @param structLikeClass
+ *   org.apache.iceberg.StructLike
+ * @param fileMethod
+ *   ContentScanTask.file()
+ * @param startMethod
+ *   ContentScanTask.start()
+ * @param lengthMethod
+ *   ContentScanTask.length()
+ * @param partitionMethod
+ *   ContentScanTask.partition()
+ * @param residualMethod
+ *   ContentScanTask.residual()
+ * @param taskSchemaMethod
+ *   FileScanTask.schema()
+ * @param deletesMethod
+ *   FileScanTask.deletes()
+ * @param specMethod
+ *   FileScanTask.spec()
+ * @param schemaToJsonMethod
+ *   SchemaParser.toJson(Schema)
+ * @param specToJsonMethod
+ *   PartitionSpecParser.toJson(PartitionSpec)
+ * @param deleteContentMethod
+ *   DeleteFile.content()
+ * @param deleteSpecIdMethod
+ *   DeleteFile.specId()
+ * @param deleteEqualityIdsMethod
+ *   DeleteFile.equalityFieldIds()
+ */
+case class ReflectionCache(
+    // Iceberg classes
+    contentScanTaskClass: Class[_],
+    fileScanTaskClass: Class[_],
+    contentFileClass: Class[_],
+    deleteFileClass: Class[_],
+    schemaParserClass: Class[_],
+    schemaClass: Class[_],
+    partitionSpecParserClass: Class[_],
+    partitionSpecClass: Class[_],
+    structLikeClass: Class[_],
+    // ContentScanTask methods
+    fileMethod: Method,
+    startMethod: Method,
+    lengthMethod: Method,
+    partitionMethod: Method,
+    residualMethod: Method,
+    // FileScanTask methods
+    taskSchemaMethod: Method,
+    deletesMethod: Method,
+    specMethod: Method,
+    // Schema methods
+    schemaToJsonMethod: Method,
+    // PartitionSpec methods
+    specToJsonMethod: Method,
+    // DeleteFile methods
+    deleteContentMethod: Method,
+    deleteSpecIdMethod: Method,
+    deleteEqualityIdsMethod: Method)
+
+object ReflectionCache extends Logging {
+
+  /**
+   * Creates a ReflectionCache by loading all Iceberg classes and methods once.
+   *
+   * This should be called once at the start of serializePartitions() and the cache passed to all
+   * helper methods.
+   *
+   * @return
+   *   ReflectionCache with all classes and methods pre-loaded
+   */
+  def create(): ReflectionCache = {
+    // scalastyle:off classforname
+    val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK)
+    val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK)
+    val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE)
+    val deleteFileClass = Class.forName(IcebergReflection.ClassNames.DELETE_FILE)
+    val schemaParserClass = Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER)
+    val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA)
+    val partitionSpecParserClass =
+      Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC_PARSER)
+    val partitionSpecClass = Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC)
+    val structLikeClass = Class.forName(IcebergReflection.ClassNames.STRUCT_LIKE)
+    // scalastyle:on classforname
+
+    // ContentScanTask methods
+    val fileMethod = contentScanTaskClass.getMethod("file")
+    val startMethod = contentScanTaskClass.getMethod("start")
+    val lengthMethod = contentScanTaskClass.getMethod("length")
+    val partitionMethod = contentScanTaskClass.getMethod("partition")
+    val residualMethod = contentScanTaskClass.getMethod("residual")
+
+    // FileScanTask methods
+    val taskSchemaMethod = fileScanTaskClass.getMethod("schema")
+    val deletesMethod = fileScanTaskClass.getMethod("deletes")
+    val specMethod = fileScanTaskClass.getMethod("spec")
+
+    // Schema methods
+    val schemaToJsonMethod = schemaParserClass.getMethod("toJson", schemaClass)
+    schemaToJsonMethod.setAccessible(true)
+
+    // PartitionSpec methods
+    val specToJsonMethod = partitionSpecParserClass.getMethod("toJson", partitionSpecClass)
+
+    // DeleteFile methods
+    val deleteContentMethod = deleteFileClass.getMethod("content")
+    val deleteSpecIdMethod = deleteFileClass.getMethod("specId")
+    val deleteEqualityIdsMethod = deleteFileClass.getMethod("equalityFieldIds")
+
+    ReflectionCache(
+      contentScanTaskClass = contentScanTaskClass,
+      fileScanTaskClass = fileScanTaskClass,
+      contentFileClass = contentFileClass,
+      deleteFileClass = deleteFileClass,
+      schemaParserClass = schemaParserClass,
+      schemaClass = schemaClass,
+      partitionSpecParserClass = partitionSpecParserClass,
+      partitionSpecClass = partitionSpecClass,
+      structLikeClass = structLikeClass,
+      fileMethod = fileMethod,
+      startMethod = startMethod,
+      lengthMethod = lengthMethod,
+      partitionMethod = partitionMethod,
+      residualMethod = residualMethod,
+      taskSchemaMethod = taskSchemaMethod,
+      deletesMethod = deletesMethod,
+      specMethod = specMethod,
+      schemaToJsonMethod = schemaToJsonMethod,
+      specToJsonMethod = specToJsonMethod,
+      deleteContentMethod = deleteContentMethod,
+      deleteSpecIdMethod = deleteSpecIdMethod,
+      deleteEqualityIdsMethod = deleteEqualityIdsMethod)
+  }
+}
+
 /**
  * Shared reflection utilities for Iceberg operations.
  *
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
index c86b2a51bb..1fa4949272 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceR
 import org.apache.spark.sql.types._
 
 import org.apache.comet.ConfigEntry
-import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
+import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection, ReflectionCache}
 import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
 import org.apache.comet.serde.ExprOuterClass.Expr
 import org.apache.comet.serde.OperatorOuterClass.{Operator, SparkStructField}
@@ -220,22 +220,20 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
 
   /**
    * Extracts delete files from an Iceberg FileScanTask as a list (for deduplication).
+   *
+   * Uses ReflectionCache to avoid repeated class loading and method lookups.
    */
   private def extractDeleteFilesList(
       task: Any,
-      contentFileClass: Class[_],
-      fileScanTaskClass: Class[_]): Seq[OperatorOuterClass.IcebergDeleteFile] = {
+      cache: ReflectionCache): Seq[OperatorOuterClass.IcebergDeleteFile] = {
     try {
-      // scalastyle:off classforname
-      val deleteFileClass = Class.forName(IcebergReflection.ClassNames.DELETE_FILE)
-      // scalastyle:on classforname
-
-      val deletes = IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass)
+      val deletes = cache.deletesMethod.invoke(task).asInstanceOf[java.util.List[_]]
+      val deletesList = if (deletes == null) new java.util.ArrayList[Any]() else deletes
 
-      deletes.asScala.flatMap { deleteFile =>
+      deletesList.asScala.flatMap { deleteFile =>
         try {
           IcebergReflection
-            .extractFileLocation(contentFileClass, deleteFile)
+            .extractFileLocation(cache.contentFileClass, deleteFile)
             .map { deletePath =>
               val deleteBuilder = OperatorOuterClass.IcebergDeleteFile.newBuilder()
@@ -243,8 +241,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
 
               val contentType =
                 try {
-                  val contentMethod = deleteFileClass.getMethod("content")
-                  val content = contentMethod.invoke(deleteFile)
+                  val content = cache.deleteContentMethod.invoke(deleteFile)
                   content.toString match {
                     case IcebergReflection.ContentTypes.POSITION_DELETES =>
                       IcebergReflection.ContentTypes.POSITION_DELETES
@@ -260,8 +257,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
 
               val specId =
                 try {
-                  val specIdMethod = deleteFileClass.getMethod("specId")
-                  specIdMethod.invoke(deleteFile).asInstanceOf[Int]
+                  cache.deleteSpecIdMethod.invoke(deleteFile).asInstanceOf[Int]
                 } catch {
                   case _: Exception =>
                     0
@@ -269,12 +265,12 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
               deleteBuilder.setPartitionSpecId(specId)
 
               try {
-                val equalityIdsMethod =
-                  deleteFileClass.getMethod("equalityFieldIds")
-                val equalityIds = equalityIdsMethod
+                val equalityIds = cache.deleteEqualityIdsMethod
                   .invoke(deleteFile)
                   .asInstanceOf[java.util.List[Integer]]
-                equalityIds.forEach(id => deleteBuilder.addEqualityIds(id))
+                if (equalityIds != null) {
+                  equalityIds.forEach(id => deleteBuilder.addEqualityIds(id))
+                }
               } catch {
                 case _: Exception =>
               }
@@ -304,31 +300,24 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
    * (actual values) from the task. This information is used by the native execution engine to
    * build a constants_map for identity-transformed partition columns and to handle
    * partition-level filtering.
+   *
+   * Uses ReflectionCache to avoid repeated class loading and method lookups.
    */
   private def serializePartitionData(
       task: Any,
-      contentScanTaskClass: Class[_],
-      fileScanTaskClass: Class[_],
+      cache: ReflectionCache,
       taskBuilder: OperatorOuterClass.IcebergFileScanTask.Builder,
       commonBuilder: OperatorOuterClass.IcebergScanCommon.Builder,
       partitionTypeToPoolIndex: mutable.HashMap[String, Int],
       partitionSpecToPoolIndex: mutable.HashMap[String, Int],
      partitionDataToPoolIndex: mutable.HashMap[String, Int]): Unit = {
     try {
-      val specMethod = fileScanTaskClass.getMethod("spec")
-      val spec = specMethod.invoke(task)
+      val spec = cache.specMethod.invoke(task)
 
       if (spec != null) {
         // Deduplicate partition spec
         try {
-          // scalastyle:off classforname
-          val partitionSpecParserClass =
-            Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC_PARSER)
-          val toJsonMethod = partitionSpecParserClass.getMethod(
-            "toJson",
-            Class.forName(IcebergReflection.ClassNames.PARTITION_SPEC))
-          // scalastyle:on classforname
-          val partitionSpecJson = toJsonMethod
+          val partitionSpecJson = cache.specToJsonMethod
             .invoke(null, spec)
             .asInstanceOf[String]
@@ -345,8 +334,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
       }
 
       // Get partition data from the task (via file().partition())
-      val partitionMethod = contentScanTaskClass.getMethod("partition")
-      val partitionData = partitionMethod.invoke(task)
+      val partitionData = cache.partitionMethod.invoke(task)
 
       if (partitionData != null) {
         // Get the partition type/schema from the spec
@@ -770,23 +758,12 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
       commonBuilder.addRequiredSchema(field.build())
     }
 
-    // Load Iceberg classes once (avoid repeated class loading in loop)
-    // scalastyle:off classforname
-    val contentScanTaskClass = Class.forName(IcebergReflection.ClassNames.CONTENT_SCAN_TASK)
-    val fileScanTaskClass = Class.forName(IcebergReflection.ClassNames.FILE_SCAN_TASK)
-    val contentFileClass = Class.forName(IcebergReflection.ClassNames.CONTENT_FILE)
-    val schemaParserClass = Class.forName(IcebergReflection.ClassNames.SCHEMA_PARSER)
-    val schemaClass = Class.forName(IcebergReflection.ClassNames.SCHEMA)
-    // scalastyle:on classforname
+    // Create reflection cache once - avoids repeated class loading and method lookups
+    // This provides ~50% serialization speedup for large tables (see issue #3456)
+    val cache = ReflectionCache.create()
 
-    // Cache method lookups (avoid repeated getMethod in loop)
-    val fileMethod = contentScanTaskClass.getMethod("file")
-    val startMethod = contentScanTaskClass.getMethod("start")
-    val lengthMethod = contentScanTaskClass.getMethod("length")
-    val residualMethod = contentScanTaskClass.getMethod("residual")
-    val taskSchemaMethod = fileScanTaskClass.getMethod("schema")
-    val toJsonMethod = schemaParserClass.getMethod("toJson", schemaClass)
-    toJsonMethod.setAccessible(true)
+    // Field ID mapping cache - avoid rebuilding per-task
+    val fieldIdMappingCache = mutable.HashMap[AnyRef, Map[String, Int]]()
 
     // Access inputRDD - safe now, DPP is resolved
     scanExec.inputRDD match {
@@ -817,10 +794,10 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
             val taskBuilder =
              OperatorOuterClass.IcebergFileScanTask.newBuilder()
 
-            val dataFile = fileMethod.invoke(task)
+            val dataFile = cache.fileMethod.invoke(task)
 
             val filePathOpt =
-              IcebergReflection.extractFileLocation(contentFileClass, dataFile)
+              IcebergReflection.extractFileLocation(cache.contentFileClass, dataFile)
 
             filePathOpt match {
               case Some(filePath) =>
@@ -832,17 +809,17 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
                 throw new RuntimeException(msg)
             }
 
-            val start = startMethod.invoke(task).asInstanceOf[Long]
+            val start = cache.startMethod.invoke(task).asInstanceOf[Long]
             taskBuilder.setStart(start)
 
-            val length = lengthMethod.invoke(task).asInstanceOf[Long]
+            val length = cache.lengthMethod.invoke(task).asInstanceOf[Long]
             taskBuilder.setLength(length)
 
-            val taskSchema = taskSchemaMethod.invoke(task)
+            val taskSchema = cache.taskSchemaMethod.invoke(task)
 
-            val deletes =
-              IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass)
-            val hasDeletes = !deletes.isEmpty
+            val deletes = cache.deletesMethod.invoke(task).asInstanceOf[java.util.List[_]]
+            val deletesList = if (deletes == null) new java.util.ArrayList[Any]() else deletes
+            val hasDeletes = !deletesList.isEmpty
 
             val schema: AnyRef =
               if (hasDeletes) {
@@ -869,13 +846,17 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
             val schemaIdx = schemaToPoolIndex.getOrElseUpdate(
               schema, {
                 val idx = schemaToPoolIndex.size
-                val schemaJson = toJsonMethod.invoke(null, schema).asInstanceOf[String]
+                val schemaJson =
+                  cache.schemaToJsonMethod.invoke(null, schema).asInstanceOf[String]
                 commonBuilder.addSchemaPool(schemaJson)
                 idx
               })
             taskBuilder.setSchemaIdx(schemaIdx)
 
-            val nameToFieldId = IcebergReflection.buildFieldIdMapping(schema)
+            // Use cached field ID mapping to avoid repeated reflection per-task
+            val nameToFieldId = fieldIdMappingCache.getOrElseUpdate(
+              schema,
+              IcebergReflection.buildFieldIdMapping(schema))
 
             val projectFieldIds = output.flatMap { attr =>
               nameToFieldId
@@ -898,8 +879,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
               })
             taskBuilder.setProjectFieldIdsIdx(projectFieldIdsIdx)
 
-            val deleteFilesList =
-              extractDeleteFilesList(task, contentFileClass, fileScanTaskClass)
+            val deleteFilesList = extractDeleteFilesList(task, cache)
             if (deleteFilesList.nonEmpty) {
               val deleteFilesIdx = deleteFilesToPoolIndex.getOrElseUpdate(
                 deleteFilesList, {
@@ -914,7 +894,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
             val residualExprOpt =
               try {
-                val residualExpr = residualMethod.invoke(task)
+                val residualExpr = cache.residualMethod.invoke(task)
                 val catalystExpr = convertIcebergExpression(residualExpr, output)
                 catalystExpr.flatMap { expr =>
                   exprToProto(expr, output, binding = false)
@@ -939,8 +919,7 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit
             serializePartitionData(
               task,
-              contentScanTaskClass,
-              fileScanTaskClass,
+              cache,
               taskBuilder,
               commonBuilder,
               partitionTypeToPoolIndex,
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergSerializationBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergSerializationBenchmark.scala
new file mode 100644
index 0000000000..b9b24b66e4
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometIcebergSerializationBenchmark.scala
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.benchmark
+
+import java.io.File
+import java.nio.file.Files
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.comet.CometIcebergNativeScanExec
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+
+import org.apache.comet.CometConf
+import org.apache.comet.serde.operator.CometIcebergNativeScan
+
+/**
+ * Benchmark for Iceberg FileScanTask serialization performance.
+ *
+ * This benchmark specifically measures the serializePartitions() method which performs the heavy
+ * reflection work of converting Iceberg Java objects to protobuf.
+ *
+ * Use this to validate performance improvements from reflection caching optimizations (see GitHub
+ * issue #3456).
+ *
+ * To run this benchmark:
+ * {{{
+ *   SPARK_GENERATE_BENCHMARK_FILES=1 make \
+ *     benchmark-org.apache.spark.sql.benchmark.CometIcebergSerializationBenchmark
+ * }}}
+ *
+ * Results will be written to "spark/benchmarks/CometIcebergSerializationBenchmark-*results.txt".
+ */
+object CometIcebergSerializationBenchmark extends CometBenchmarkBase {
+
+  private def icebergAvailable: Boolean = {
+    try {
+      Class.forName("org.apache.iceberg.catalog.Catalog")
+      true
+    } catch {
+      case _: ClassNotFoundException => false
+    }
+  }
+
+  private def withTempIcebergDir(f: File => Unit): Unit = {
+    val dir = Files.createTempDirectory("comet-iceberg-serde-bench").toFile
+    try {
+      f(dir)
+    } finally {
+      def deleteRecursively(file: File): Unit = {
+        if (file.isDirectory) {
+          Option(file.listFiles()).foreach(_.foreach(deleteRecursively))
+        }
+        file.delete()
+      }
+      deleteRecursively(dir)
+    }
+  }
+
+  private def extractIcebergNativeScanExec(
+      plan: SparkPlan): Option[CometIcebergNativeScanExec] = {
+    val unwrapped = plan match {
+      case aqe: AdaptiveSparkPlanExec => aqe.executedPlan
+      case other => other
+    }
+
+    def find(p: SparkPlan): Option[CometIcebergNativeScanExec] = {
+      p match {
+        case scan: CometIcebergNativeScanExec => Some(scan)
+        case _ => p.children.flatMap(find).headOption
+      }
+    }
+    find(unwrapped)
+  }
+
+  private def createPartitionedIcebergTable(
+      warehouseDir: File,
+      numPartitions: Int,
+      tableName: String = "serde_bench_table"): Unit = {
+    spark.conf.set("spark.sql.catalog.bench_cat", "org.apache.iceberg.spark.SparkCatalog")
+    spark.conf.set("spark.sql.catalog.bench_cat.type", "hadoop")
+    spark.conf.set("spark.sql.catalog.bench_cat.warehouse", warehouseDir.getAbsolutePath)
+
+    val fullTableName = s"bench_cat.db.$tableName"
+
+    spark.sql(s"DROP TABLE IF EXISTS $fullTableName")
+    spark.sql("CREATE NAMESPACE IF NOT EXISTS bench_cat.db")
+
+    spark.sql(s"""
+      CREATE TABLE $fullTableName (
+        id BIGINT,
+        name STRING,
+        value DOUBLE,
+        partition_col INT
+      ) USING iceberg
+      PARTITIONED BY (partition_col)
+      TBLPROPERTIES (
+        'format-version'='2',
+        'write.parquet.compression-codec' = 'snappy'
+      )
+    """)
+
+    // scalastyle:off println
+    println(s"Creating Iceberg table with $numPartitions partitions...")
+    // scalastyle:on println
+
+    val batchSize = 1000
+    var partitionsCreated = 0
+
+    while (partitionsCreated < numPartitions) {
+      val batchEnd = math.min(partitionsCreated + batchSize, numPartitions)
+      val partitionRange = partitionsCreated until batchEnd
+
+      import spark.implicits._
+      val df = partitionRange
+        .map { p =>
+          (p.toLong, s"name_$p", p * 1.5, p)
+        }
+        .toDF("id", "name", "value", "partition_col")
+
+      df.writeTo(fullTableName).append()
+      partitionsCreated = batchEnd
+
+      if (partitionsCreated % 5000 == 0 || partitionsCreated == numPartitions) {
+        // scalastyle:off println
+        println(s"  Created $partitionsCreated / $numPartitions partitions")
+        // scalastyle:on println
+      }
+    }
+  }
+
+  /**
+   * Benchmarks the serializePartitions() method which does the heavy reflection work.
+   *
+   * This is the core method that converts Iceberg FileScanTask Java objects to protobuf. The
+   * optimizations from PR #3298 target this code path.
+   */
+  def serializePartitionsBenchmark(numPartitions: Int): Unit = {
+    if (!icebergAvailable) {
+      // scalastyle:off println
+      println("Iceberg not available in classpath, skipping benchmark")
+      // scalastyle:on println
+      return
+    }
+
+    withTempIcebergDir { warehouseDir =>
+      withSQLConf(
+        "spark.sql.catalog.bench_cat" -> "org.apache.iceberg.spark.SparkCatalog",
+        "spark.sql.catalog.bench_cat.type" -> "hadoop",
+        "spark.sql.catalog.bench_cat.warehouse" -> warehouseDir.getAbsolutePath,
+        CometConf.COMET_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
+
+        createPartitionedIcebergTable(warehouseDir, numPartitions)
+        val fullTableName = "bench_cat.db.serde_bench_table"
+
+        val df = spark.sql(s"SELECT * FROM $fullTableName")
+        val plan = df.queryExecution.executedPlan
+
+        val nativeScanOpt = extractIcebergNativeScanExec(plan)
+
+        nativeScanOpt match {
+          case Some(nativeScan) =>
+            val metadata = nativeScan.nativeIcebergScanMetadata
+            val originalPlan = nativeScan.originalPlan
+            val output = nativeScan.output
+
+            // scalastyle:off println
+            println(s"Found ${metadata.tasks.size()} FileScanTasks")
+            println(s"Output columns: ${output.map(_.name).mkString(", ")}")
+            // scalastyle:on println
+
+            val iterations = 100
+            val benchmark = new Benchmark(
+              s"serializePartitions ($numPartitions partitions, ${metadata.tasks.size()} tasks)",
+              iterations,
+              output = this.output)
+
+            // Warmup
+            CometIcebergNativeScan.serializePartitions(originalPlan, output, metadata)
+
+            // Benchmark: serializePartitions() - the heavy reflection path
+            benchmark.addCase("serializePartitions()") { _ =>
+              var i = 0
+              while (i < iterations) {
+                CometIcebergNativeScan.serializePartitions(originalPlan, output, metadata)
+                i += 1
+              }
+            }
+
+            // Measure serialized size
+            val (commonBytes, perPartitionBytes) =
+              CometIcebergNativeScan.serializePartitions(originalPlan, output, metadata)
+
+            val totalBytes = commonBytes.length + perPartitionBytes.map(_.length).sum
+            val commonKB = commonBytes.length / 1024.0
+            val perPartKB = perPartitionBytes.map(_.length).sum / 1024.0
+            val totalKB = totalBytes / 1024.0
+
+            // scalastyle:off println
+            println(
+              f"Serialized size: common=$commonKB%.1f KB, " +
+                f"per-partition=$perPartKB%.1f KB, total=$totalKB%.1f KB")
+            println(f"Average per partition: ${perPartKB / numPartitions * 1024}%.1f bytes")
+            // scalastyle:on println
+
+            benchmark.run()
+
+          case None =>
+            // scalastyle:off println
+            println("WARNING: Could not find CometIcebergNativeScanExec in query plan")
+            println(s"Plan:\n$plan")
+            // scalastyle:on println
+        }
+
+        spark.sql(s"DROP TABLE IF EXISTS $fullTableName")
+      }
+    }
+  }
+
+  /**
+   * Micro-benchmark for reflection operations to isolate their cost.
+   */
+  def reflectionMicroBenchmark(): Unit = {
+    val iterations = 100000
+
+    val benchmark = new Benchmark("Reflection micro-benchmark", iterations, output = output)
+
+    // Benchmark: Class.forName() cost
+    benchmark.addCase("Class.forName() - uncached") { _ =>
+      var i = 0
+      while (i < iterations) {
+        Class.forName("org.apache.iceberg.ContentScanTask")
+        i += 1
+      }
+    }
+
+    // Benchmark: Cached class lookup
+    val cachedClass = Class.forName("org.apache.iceberg.ContentScanTask")
+    benchmark.addCase("Class lookup - cached") { _ =>
+      var i = 0
+      var c: Class[_] = null
+      while (i < iterations) {
+        c = cachedClass
+        i += 1
+      }
+    }
+
+    // Benchmark: getMethod() cost
+    benchmark.addCase("getMethod() - uncached") { _ =>
+      var i = 0
+      while (i < iterations) {
+        cachedClass.getMethod("file")
+        i += 1
+      }
+    }
+
+    // Benchmark: Cached method lookup
+    val cachedMethod = cachedClass.getMethod("file")
+    benchmark.addCase("Method lookup - cached") { _ =>
+      var i = 0
+      var m: java.lang.reflect.Method = null
+      while (i < iterations) {
+        m = cachedMethod
+        i += 1
+      }
+    }
+
+    benchmark.run()
+  }
+
+  override def runCometBenchmark(args: Array[String]): Unit = {
+    val numPartitions = if (args.nonEmpty) args(0).toInt else 10000
+
+    // First show the cost of reflection operations
+    runBenchmark("Reflection Micro-benchmark") {
+      reflectionMicroBenchmark()
+    }
+
+    // Then benchmark the full serialization path
+    runBenchmark("Iceberg serializePartitions Benchmark") {
+      serializePartitionsBenchmark(numPartitions)
+    }
+  }
+}
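
For context, the sketch below illustrates the before/after shape of the per-task loop that this patch optimizes. It is an editor's illustration only and not part of the patch: java.lang.String and its length() method stand in for the Iceberg classes and methods, and the task list is a hypothetical stand-in for the FileScanTask collection.

import java.lang.reflect.Method

object ReflectionCachingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical stand-ins for FileScanTask instances.
    val tasks: Seq[AnyRef] = Seq.fill(10000)("a-task-stand-in")

    // Before: every task pays for a class load and a method lookup.
    def uncached(task: AnyRef): AnyRef = {
      val clazz = Class.forName("java.lang.String") // stand-in for ContentScanTask
      val m: Method = clazz.getMethod("length") // stand-in for file()
      m.invoke(task)
    }

    // After: the Class and Method are resolved once; only invoke() stays in the loop,
    // which is the pattern ReflectionCache.create() applies to the real Iceberg types.
    val cachedClass = Class.forName("java.lang.String")
    val cachedMethod: Method = cachedClass.getMethod("length")
    def cached(task: AnyRef): AnyRef = cachedMethod.invoke(task)

    tasks.foreach(uncached) // one class load and method lookup per task
    tasks.foreach(cached) // one-time lookup, only invoke() per task
  }
}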