Build the metrics evaluator per data file instead of sharing one bound method.

plan_files previously created a single _InclusiveMetricsEvaluator and handed
its bound .eval method to every manifest task. This patch adds
_build_metrics_evaluator, which returns a lambda that constructs a new
_InclusiveMetricsEvaluator on each call, and passes the result of that builder
to the manifest tasks instead. As the in-code comment states, the lambda is run
in multiple threads, so no evaluator instance is shared between them.

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 93edf70f46..dca5a06228 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1597,6 +1597,20 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]
         # shared instance across multiple threads.
         return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
 
+    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
+        schema = self.table_metadata.schema()
+        include_empty_files = strtobool(self.options.get("include_empty_files", "false"))
+
+        # The lambda created here is run in multiple threads.
+        # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
+        # shared instance across multiple threads.
+        return lambda data_file: _InclusiveMetricsEvaluator(
+            schema,
+            self.row_filter,
+            self.case_sensitive,
+            include_empty_files,
+        ).eval(data_file)
+
     def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
         spec = self.table_metadata.specs()[spec_id]
 
@@ -1660,13 +1674,6 @@ def plan_files(self) -> Iterable[FileScanTask]:
 
         partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
 
-        metrics_evaluator = _InclusiveMetricsEvaluator(
-            self.table_metadata.schema(),
-            self.row_filter,
-            self.case_sensitive,
-            strtobool(self.options.get("include_empty_files", "false")),
-        ).eval
-
         min_sequence_number = _min_sequence_number(manifests)
 
         data_entries: List[ManifestEntry] = []
@@ -1682,7 +1689,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
                         manifest,
                         partition_evaluators[manifest.partition_spec_id],
                         residual_evaluators[manifest.partition_spec_id],
-                        metrics_evaluator,
+                        self._build_metrics_evaluator(),
                     )
                     for manifest in manifests
                     if self._check_sequence_number(min_sequence_number, manifest)