From a870b02d2208dc4251dc8ae19ee51f32e025f177 Mon Sep 17 00:00:00 2001 From: xqhu Date: Mon, 6 Mar 2023 22:19:28 -0500 Subject: [PATCH 01/13] Raise the Runtime error when DoFn.process uses both yield and return --- sdks/python/apache_beam/transforms/core.py | 11 ++++ .../apache_beam/transforms/core_test.py | 59 +++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 sdks/python/apache_beam/transforms/core_test.py diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index cdc96b52b378..c21c2b22083f 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1387,6 +1387,11 @@ def partition_for(self, element, num_partitions, *args, **kwargs): return self._fn(element, num_partitions, *args, **kwargs) +def _check_fn_use_yield_and_return(fn): + source_code = inspect.getsource(fn) + return " yield " in source_code and " return " in source_code + + class ParDo(PTransformWithSideInputs): """A :class:`ParDo` transform. @@ -1427,6 +1432,12 @@ def __init__(self, fn, *args, **kwargs): if not isinstance(self.fn, DoFn): raise TypeError('ParDo must be called with a DoFn instance.') + # DoFn.process cannot allow both return and yield + if _check_fn_use_yield_and_return(self.fn.process): + raise RuntimeError( + 'The yield and return statements in the process method ' + f'of {self.fn.__class__ } can not be mixed.') + # Validate the DoFn by creating a DoFnSignature from apache_beam.runners.common import DoFnSignature self._signature = DoFnSignature(self.fn) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py new file mode 100644 index 000000000000..3cca3bfc7de7 --- /dev/null +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for the core python file.""" +# pytype: skip-file + +import logging +import unittest + +import apache_beam as beam + + +class TestDoFn1(beam.DoFn): + def process(self, element): + yield element + + +class TestDoFn2(beam.DoFn): + def process(self, element): + return element + + +class TestDoFn3(beam.DoFn): + """mixing return and yield is not allowed""" + def process(self, element): + if not element: + return -1 + yield element + + +class CreateTest(unittest.TestCase): + def test_dofn_with_yield_and_return(self): + assert beam.ParDo(TestDoFn1()) + assert beam.ParDo(TestDoFn2()) + with self.assertRaises(RuntimeError) as e: + beam.ParDo(TestDoFn3()) + self.assertEqual( + str(e.exception), + 'The yield and return statements in the process method ' + f'of {TestDoFn3().__class__} can not be mixed.') + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() From e42a63d1791fc0c3a46ee71f9b811d7ac46e381a Mon Sep 17 00:00:00 2001 From: xqhu Date: Tue, 7 Mar 2023 17:48:04 -0500 Subject: [PATCH 02/13] skip the inner functions when checking return and yield --- sdks/python/apache_beam/transforms/core.py | 37 ++++++++++++++++++- .../apache_beam/transforms/core_test.py | 5 ++- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git 
a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index c21c2b22083f..33d7442ea325 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -71,6 +71,8 @@ from apache_beam.utils import urns from apache_beam.utils.timestamp import Duration +from itertools import dropwhile + if typing.TYPE_CHECKING: from google.protobuf import message # pylint: disable=ungrouped-imports from apache_beam.io import iobase @@ -1387,9 +1389,40 @@ def partition_for(self, element, num_partitions, *args, **kwargs): return self._fn(element, num_partitions, *args, **kwargs) +def _get_function_body_without_inners(func): + source_lines = inspect.getsourcelines(func)[0] + source_lines = dropwhile(lambda x: x.startswith("@"), source_lines) + def_line = next(source_lines).strip() + if def_line.startswith("def ") and def_line.endswith(":"): + first_line = next(source_lines) + indentation = len(first_line) - len(first_line.lstrip()) + final_lines = [first_line[indentation:]] + + skip_inner_def = False + if first_line[indentation:].startswith("def "): + skip_inner_def = True + for line in source_lines: + line_indentation = len(line) - len(line.lstrip()) + + if line[indentation:].startswith("def "): + skip_inner_def = True + continue + + if skip_inner_def and line_indentation == indentation: + skip_inner_def = False + + if skip_inner_def and line_indentation > indentation: + continue + final_lines.append(line[indentation:]) + + return "".join(final_lines) + else: + return def_line.rsplit(":")[-1].strip() + + def _check_fn_use_yield_and_return(fn): - source_code = inspect.getsource(fn) - return " yield " in source_code and " return " in source_code + source_code = _get_function_body_without_inners(fn) + return "yield " in source_code and "return " in source_code class ParDo(PTransformWithSideInputs): diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py 
index 3cca3bfc7de7..72fb8b3d46d7 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -31,7 +31,10 @@ def process(self, element): class TestDoFn2(beam.DoFn): def process(self, element): - return element + def inner_func(x): + yield x + + return inner_func(element) class TestDoFn3(beam.DoFn): From 997a8e6a0822517143942bdb20ab2b71ca2e7773 Mon Sep 17 00:00:00 2001 From: xqhu Date: Tue, 7 Mar 2023 20:29:07 -0500 Subject: [PATCH 03/13] skip builtin function types --- sdks/python/apache_beam/transforms/core.py | 2 ++ sdks/python/apache_beam/transforms/core_test.py | 1 + 2 files changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 33d7442ea325..f4c4929627f2 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1421,6 +1421,8 @@ def _get_function_body_without_inners(func): def _check_fn_use_yield_and_return(fn): + if isinstance(fn, types.BuiltinFunctionType): + return False source_code = _get_function_body_without_inners(fn) return "yield " in source_code and "return " in source_code diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index 72fb8b3d46d7..bf994a5d532a 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -47,6 +47,7 @@ def process(self, element): class CreateTest(unittest.TestCase): def test_dofn_with_yield_and_return(self): + assert beam.ParDo(sum) assert beam.ParDo(TestDoFn1()) assert beam.ParDo(TestDoFn2()) with self.assertRaises(RuntimeError) as e: From 67124406adf01603ea22197718be05c5efc102fb Mon Sep 17 00:00:00 2001 From: xqhu Date: Wed, 8 Mar 2023 10:49:53 -0500 Subject: [PATCH 04/13] Capture TypeError --- sdks/python/apache_beam/transforms/core.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git 
a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index f4c4929627f2..8b0cc2e60dea 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1423,8 +1423,11 @@ def _get_function_body_without_inners(func): def _check_fn_use_yield_and_return(fn): if isinstance(fn, types.BuiltinFunctionType): return False - source_code = _get_function_body_without_inners(fn) - return "yield " in source_code and "return " in source_code + try: + source_code = _get_function_body_without_inners(fn) + return "yield " in source_code and "return " in source_code + except TypeError: + return False class ParDo(PTransformWithSideInputs): From e9b9a5d86e13f343c814e24ca6fc9d317fa5841b Mon Sep 17 00:00:00 2001 From: xqhu Date: Wed, 8 Mar 2023 14:59:46 -0500 Subject: [PATCH 05/13] fix the isort error --- sdks/python/apache_beam/transforms/core.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 8b0cc2e60dea..7d3ce698e2fc 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -28,6 +28,7 @@ import traceback import types import typing +from itertools import dropwhile from apache_beam import coders from apache_beam import pvalue @@ -71,8 +72,6 @@ from apache_beam.utils import urns from apache_beam.utils.timestamp import Duration -from itertools import dropwhile - if typing.TYPE_CHECKING: from google.protobuf import message # pylint: disable=ungrouped-imports from apache_beam.io import iobase From 9652a80d9fc4b5b79c20b201cb061c15658fbce3 Mon Sep 17 00:00:00 2001 From: xqhu Date: Thu, 9 Mar 2023 10:25:06 -0500 Subject: [PATCH 06/13] update CHANGES.md for this potential breaking changes --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index d36fb2ffaa76..3532a3c5a597 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ 
-68,6 +68,7 @@ ## Breaking Changes +* Python SDK now does not allow mixing the yield and return statements in `DoFn.process()` ([#22969](https://github.com/apache/beam/issues/22969)). `yield` is recommended for emitting elements and `yield from` for iterators. * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). ## Deprecations From d32e368017ec25652d1093ad1fb1008cc788ebc5 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Fri, 10 Mar 2023 21:34:14 +0000 Subject: [PATCH 07/13] use the loggin warning rather than raise RuntimeError --- CHANGES.md | 1 - sdks/python/apache_beam/transforms/core.py | 99 +++++++++++++++---- .../apache_beam/transforms/core_test.py | 36 +++++-- 3 files changed, 111 insertions(+), 25 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 3532a3c5a597..d36fb2ffaa76 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,7 +68,6 @@ ## Breaking Changes -* Python SDK now does not allow mixing the yield and return statements in `DoFn.process()` ([#22969](https://github.com/apache/beam/issues/22969)). `yield` is recommended for emitting elements and `yield from` for iterators. * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). ## Deprecations diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 7d3ce698e2fc..c0194b30245e 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -141,6 +141,7 @@ class DoFnProcessContext(DoFnContext): for this element. Not used by the pipeline code. """ + def __init__(self, label, element=None, state=None): """Initialize a processing context object with an element and state. @@ -182,6 +183,7 @@ class ProcessContinuation(object): If produced, indicates that there is more work to be done for the current input element. """ + def __init__(self, resume_delay=0): """Initializes a ProcessContinuation object. 
@@ -254,6 +256,7 @@ class RestrictionProvider(object): be invoked with a single parameter of type ``Timestamp`` or as an integer that gives the watermark in number of seconds. """ + def create_tracker(self, restriction): # type: (...) -> iobase.RestrictionTracker @@ -419,6 +422,7 @@ class WatermarkEstimatorProvider(object): or, if no WatermarkEstimatorProvider is provided, the DoFn itself must be a WatermarkEstimatorProvider. """ + def initial_estimator_state(self, element, restriction): """Returns the initial state of the WatermarkEstimator with given element and restriction. @@ -438,6 +442,7 @@ def estimator_state_coder(self): class _DoFnParam(object): """DoFn parameter.""" + def __init__(self, param_id): self.param_id = param_id @@ -455,6 +460,7 @@ def __repr__(self): class _RestrictionDoFnParam(_DoFnParam): """Restriction Provider DoFn parameter.""" + def __init__(self, restriction_provider=None): # type: (typing.Optional[RestrictionProvider]) -> None if (restriction_provider is not None and @@ -468,6 +474,7 @@ def __init__(self, restriction_provider=None): class _StateDoFnParam(_DoFnParam): """State DoFn parameter.""" + def __init__(self, state_spec): # type: (StateSpec) -> None if not isinstance(state_spec, StateSpec): @@ -478,6 +485,7 @@ def __init__(self, state_spec): class _TimerDoFnParam(_DoFnParam): """Timer DoFn parameter.""" + def __init__(self, timer_spec): # type: (TimerSpec) -> None if not isinstance(timer_spec, TimerSpec): @@ -488,6 +496,7 @@ def __init__(self, timer_spec): class _BundleFinalizerParam(_DoFnParam): """Bundle Finalization DoFn parameter.""" + def __init__(self): self._callbacks = [] self.param_id = "FinalizeBundle" @@ -515,6 +524,7 @@ def reset(self): class _WatermarkEstimatorParam(_DoFnParam): """WatermarkEstimator DoFn parameter.""" + def __init__( self, watermark_estimator_provider: typing. 
@@ -581,6 +591,7 @@ def from_callable(fn): def unbounded_per_element(): """A decorator on process fn specifying that the fn performs an unbounded amount of work per input element.""" + def wrapper(process_fn): process_fn.unbounded_per_element = True return process_fn @@ -914,6 +925,7 @@ class CallableWrapperDoFn(DoFn): The purpose of this class is to conveniently wrap simple functions and use them in transforms. """ + def __init__(self, fn, fullargspec=None): """Initializes a CallableWrapperDoFn object wrapping a callable. @@ -1008,6 +1020,7 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn): **apply** will be called with an empty list at expansion time to get the default value. """ + def default_label(self): return self.__class__.__name__ @@ -1174,6 +1187,7 @@ def get_accumulator_coder(self): class _ReiterableChain(object): """Like itertools.chain, but allowing re-iteration.""" + def __init__(self, iterables): self.iterables = iterables @@ -1306,6 +1320,7 @@ class NoSideInputsCallableWrapperCombineFn(CallableWrapperCombineFn): This is identical to its parent, but avoids accepting and passing *args and **kwargs for efficiency as they are known to be empty. """ + def create_accumulator(self): return [] @@ -1340,6 +1355,7 @@ class PartitionFn(WithTypeHints): A PartitionFn specifies how individual values in a PCollection will be placed into separate partitions, indexed by an integer. """ + def default_label(self): return self.__class__.__name__ @@ -1367,6 +1383,7 @@ class CallableWrapperPartitionFn(PartitionFn): Instances of this class wrap simple functions for use in Partition operations. """ + def __init__(self, fn): """Initializes a PartitionFn object wrapping a callable. 
@@ -1424,7 +1441,16 @@ def _check_fn_use_yield_and_return(fn): return False try: source_code = _get_function_body_without_inners(fn) - return "yield " in source_code and "return " in source_code + has_yield = False + has_return = False + for line in source_code.split("\n"): + if line.lstrip().startswith("yield"): + has_yield = True + if line.lstrip().startswith("return"): + has_return = True + if has_yield and has_return: + return True + return False except TypeError: return False @@ -1460,6 +1486,7 @@ class ParDo(PTransformWithSideInputs): replaced by values from the :class:`~apache_beam.pvalue.PCollection` in the exact positions where they appear in the argument lists. """ + def __init__(self, fn, *args, **kwargs): super().__init__(fn, *args, **kwargs) # TODO(robertwb): Change all uses of the dofn attribute to use fn instead. @@ -1471,9 +1498,12 @@ def __init__(self, fn, *args, **kwargs): # DoFn.process cannot allow both return and yield if _check_fn_use_yield_and_return(self.fn.process): - raise RuntimeError( + _LOGGER.warning( 'The yield and return statements in the process method ' - f'of {self.fn.__class__ } can not be mixed.') + f'of {self.fn.__class__ } can not be mixed.' + 'We recommend to use `yield` for emitting individual ' + ' elements and `yield from` for emitting the content ' + 'of entire iterables.') # Validate the DoFn by creating a DoFnSignature from apache_beam.runners.common import DoFnSignature @@ -1800,6 +1830,7 @@ def _add_type_constraint_from_consumer(self, full_label, input_type_hints): class _MultiParDo(PTransform): + def __init__(self, do_transform, tags, main_tag, allow_unknown_tags=None): super().__init__(do_transform.label) self._do_transform = do_transform @@ -1821,8 +1852,10 @@ class DoFnInfo(object): """This class represents the state in the ParDoPayload's function spec, which is the actual DoFn together with some data required for invoking it. 
""" + @staticmethod def register_stateless_dofn(urn): + def wrapper(cls): StatelessDoFnInfo.REGISTERED_DOFNS[urn] = cls cls._stateless_dofn_urn = urn @@ -1857,6 +1890,7 @@ def serialized_dofn_data(self): class PickledDoFnInfo(DoFnInfo): + def __init__(self, serialized_data): self._serialized_data = serialized_data @@ -2127,6 +2161,7 @@ def FlatMapTuple(fn, *args, **kwargs): # pylint: disable=invalid-name class _ExceptionHandlingWrapper(ptransform.PTransform): """Implementation of ParDo.with_exception_handling.""" + def __init__( self, fn, @@ -2166,6 +2201,7 @@ def expand(self, pcoll): if self._threshold < 1.0: class MaybeWindow(ptransform.PTransform): + @staticmethod def expand(pcoll): if self._threshold_windowing: @@ -2194,6 +2230,7 @@ def check_threshold(bad, total, threshold, window=DoFn.WindowParam): class _ExceptionHandlingWrapperDoFn(DoFn): + def __init__(self, fn, dead_letter_tag, exc_class, partial): self._fn = fn self._dead_letter_tag = dead_letter_tag @@ -2227,6 +2264,7 @@ def process(self, *args, **kwargs): class _SubprocessDoFn(DoFn): """Process method run in a subprocess, turning hard crashes into exceptions. """ + def __init__(self, fn): self._fn = fn self._serialized_fn = pickler.dumps(fn) @@ -2467,6 +2505,7 @@ def as_singleton_view(self): return self._clone(as_view=True) def expand(self, pcoll): + def add_input_types(transform): type_hints = self.get_type_hints() if type_hints.input_types: @@ -2551,6 +2590,7 @@ def from_runner_api_parameter(unused_ptransform, combine_payload, context): @DoFnInfo.register_stateless_dofn(python_urns.KEY_WITH_NONE_DOFN) class _KeyWithNone(DoFn): + def process(self, v): yield None, v @@ -2574,6 +2614,7 @@ class CombinePerKey(PTransformWithSideInputs): Returns: A PObject holding the result of the combine operation. """ + def with_hot_key_fanout(self, fanout): """A per-key combine operation like self but with two levels of aggregation. 
@@ -2671,6 +2712,7 @@ def runner_api_requires_keyed_input(self): # TODO(robertwb): Rename to CombineGroupedValues? class CombineValues(PTransformWithSideInputs): + def make_fn(self, fn, has_side_inputs): return CombineFn.maybe_from_callable(fn, has_side_inputs) @@ -2711,6 +2753,7 @@ def from_runner_api_parameter(unused_ptransform, combine_payload, context): class CombineValuesDoFn(DoFn): """DoFn for performing per-key Combine transforms.""" + def __init__( self, input_pcoll_type, @@ -2773,6 +2816,7 @@ def default_type_hints(self): class _CombinePerKeyWithHotKeyFanout(PTransform): + def __init__( self, combine_fn, # type: CombineFn @@ -2801,6 +2845,7 @@ def expand(self, pcoll): 'SlidingWindows. See: https://github.com/apache/beam/issues/20528') class SplitHotCold(DoFn): + def start_bundle(self): # Spreading a hot key across all possible sub-keys for all bundles # would defeat the goal of not overwhelming downstream reducers @@ -2819,6 +2864,7 @@ def process(self, element): yield pvalue.TaggedOutput('hot', ((self._nonce % fanout, key), value)) class PreCombineFn(CombineFn): + @staticmethod def extract_output(accumulator): # Boolean indicates this is an accumulator. @@ -2832,6 +2878,7 @@ def extract_output(accumulator): teardown = combine_fn.teardown class PostCombineFn(CombineFn): + @staticmethod def add_input(accumulator, element): is_accumulator, value = element @@ -2878,7 +2925,9 @@ class GroupByKey(PTransform): The implementation here is used only when run on the local direct runner. """ + class ReifyWindows(DoFn): + def process( self, element, window=DoFn.WindowParam, timestamp=DoFn.TimestampParam): try: @@ -2987,11 +3036,12 @@ class GroupBy(PTransform): The GroupBy operation can be made into an aggregating operation by invoking its `aggregate_field` method. 
""" + def __init__( self, *fields, # type: typing.Union[str, typing.Callable] **kwargs # type: typing.Union[str, typing.Callable] - ): + ): if len(fields) == 1 and not kwargs: self._force_tuple_keys = False name = fields[0] if isinstance(fields[0], str) else 'key' @@ -3014,7 +3064,7 @@ def aggregate_field( field, # type: typing.Union[str, typing.Callable] combine_fn, # type: typing.Union[typing.Callable, CombineFn] dest, # type: str - ): + ): """Returns a grouping operation that also aggregates grouped values. Args: @@ -3093,6 +3143,7 @@ def _unpickle_dynamic_named_tuple(type_name, field_names, values): class _GroupAndAggregate(PTransform): + def __init__(self, grouping, aggregations): self._grouping = grouping self._aggregations = aggregations @@ -3102,7 +3153,7 @@ def aggregate_field( field, # type: typing.Union[str, typing.Callable] combine_fn, # type: typing.Union[typing.Callable, CombineFn] dest, # type: str - ): + ): field = _expr_to_callable(field, 0) return _GroupAndAggregate( self._grouping, list(self._aggregations) + [(field, combine_fn, dest)]) @@ -3144,10 +3195,12 @@ class Select(PTransform): pcoll | beam.Map(lambda x: beam.Row(a=x.a, b=foo(x))) """ - def __init__(self, - *args, # type: typing.Union[str, typing.Callable] - **kwargs # type: typing.Union[str, typing.Callable] - ): + + def __init__( + self, + *args, # type: typing.Union[str, typing.Callable] + **kwargs # type: typing.Union[str, typing.Callable] + ): self._fields = [( expr if isinstance(expr, str) else 'arg%02d' % ix, _expr_to_callable(expr, ix)) for (ix, expr) in enumerate(args) @@ -3185,8 +3238,10 @@ class Partition(PTransformWithSideInputs): The result of this PTransform is a simple list of the output PCollections representing each of n partitions, in order. 
""" + class ApplyPartitionFnFn(DoFn): """A DoFn that applies a PartitionFn.""" + def process(self, element, partitionfn, n, *args, **kwargs): partition = partitionfn.partition_for(element, n, *args, **kwargs) if not 0 <= partition < n: @@ -3209,14 +3264,16 @@ def expand(self, pcoll): class Windowing(object): - def __init__(self, - windowfn, # type: WindowFn - triggerfn=None, # type: typing.Optional[TriggerFn] - accumulation_mode=None, # type: typing.Optional[beam_runner_api_pb2.AccumulationMode.Enum] - timestamp_combiner=None, # type: typing.Optional[beam_runner_api_pb2.OutputTime.Enum] - allowed_lateness=0, # type: typing.Union[int, float] - environment_id=None, # type: typing.Optional[str] - ): + + def __init__( + self, + windowfn, # type: WindowFn + triggerfn=None, # type: typing.Optional[TriggerFn] + accumulation_mode=None, # type: typing.Optional[beam_runner_api_pb2.AccumulationMode.Enum] + timestamp_combiner=None, # type: typing.Optional[beam_runner_api_pb2.OutputTime.Enum] + allowed_lateness=0, # type: typing.Union[int, float] + environment_id=None, # type: typing.Optional[str] + ): """Class representing the window strategy. Args: @@ -3338,8 +3395,10 @@ class WindowInto(ParDo): element with the same input value and timestamp, with its new set of windows determined by the windowing function. """ + class WindowIntoFn(DoFn): """A DoFn that applies a WindowInto operation.""" + def __init__(self, windowing): # type: (Windowing) -> None self.windowing = windowing @@ -3447,6 +3506,7 @@ class Flatten(PTransform): if there's a chance there may be none), this argument is the only way to provide pipeline information and should be considered mandatory. """ + def __init__(self, **kwargs): super().__init__() self.pipeline = kwargs.pop( @@ -3488,6 +3548,7 @@ def from_runner_api_parameter( class Create(PTransform): """A transform that creates a PCollection from an iterable.""" + def __init__(self, values, reshuffle=True): """Initializes a Create transform. 
@@ -3542,6 +3603,7 @@ def expand(self, pbegin): # transforms (e.g. Write). class MaybeReshuffle(PTransform): + def expand(self, pcoll): if len(serialized_values) > 1 and reshuffle: from apache_beam.transforms.util import Reshuffle @@ -3580,6 +3642,7 @@ def _create_source(serialized_values, coder): @typehints.with_output_types(bytes) class Impulse(PTransform): """Impulse primitive.""" + def expand(self, pbegin): if not isinstance(pbegin, pvalue.PBegin): raise TypeError( diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index bf994a5d532a..1596b2228e69 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -19,18 +19,22 @@ # pytype: skip-file import logging +import pytest import unittest import apache_beam as beam class TestDoFn1(beam.DoFn): + def process(self, element): yield element class TestDoFn2(beam.DoFn): + def process(self, element): + def inner_func(x): yield x @@ -39,24 +43,44 @@ def inner_func(x): class TestDoFn3(beam.DoFn): """mixing return and yield is not allowed""" + def process(self, element): if not element: return -1 yield element +class TestDoFn4(beam.DoFn): + """test the variable name containing return""" + + def process(self, element): + my_return = element + yield my_return + + +class TestDoFn5(beam.DoFn): + """test the variable name containing yield""" + + def process(self, element): + my_yield = element + return my_yield + + class CreateTest(unittest.TestCase): + + @pytest.fixture(autouse=True) + def inject_fixtures(self, caplog): + self._caplog = caplog + def test_dofn_with_yield_and_return(self): assert beam.ParDo(sum) assert beam.ParDo(TestDoFn1()) assert beam.ParDo(TestDoFn2()) - with self.assertRaises(RuntimeError) as e: + assert beam.ParDo(TestDoFn4()) + assert beam.ParDo(TestDoFn5()) + with self._caplog.at_level(logging.WARNING): beam.ParDo(TestDoFn3()) - self.assertEqual( - str(e.exception), - 'The yield and 
return statements in the process method ' - f'of {TestDoFn3().__class__} can not be mixed.') - + assert 'The yield and return statements in' in self._caplog.text if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From fe5535fb83217fbd738c58bdc3d2061291480164 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Fri, 10 Mar 2023 21:50:54 +0000 Subject: [PATCH 08/13] use the yapf 0.29.0 --- sdks/python/apache_beam/transforms/core.py | 44 ------------------- .../apache_beam/transforms/core_test.py | 10 +---- 2 files changed, 2 insertions(+), 52 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index c0194b30245e..ce4278b08cc4 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -141,7 +141,6 @@ class DoFnProcessContext(DoFnContext): for this element. Not used by the pipeline code. """ - def __init__(self, label, element=None, state=None): """Initialize a processing context object with an element and state. @@ -183,7 +182,6 @@ class ProcessContinuation(object): If produced, indicates that there is more work to be done for the current input element. """ - def __init__(self, resume_delay=0): """Initializes a ProcessContinuation object. @@ -256,7 +254,6 @@ class RestrictionProvider(object): be invoked with a single parameter of type ``Timestamp`` or as an integer that gives the watermark in number of seconds. """ - def create_tracker(self, restriction): # type: (...) -> iobase.RestrictionTracker @@ -422,7 +419,6 @@ class WatermarkEstimatorProvider(object): or, if no WatermarkEstimatorProvider is provided, the DoFn itself must be a WatermarkEstimatorProvider. """ - def initial_estimator_state(self, element, restriction): """Returns the initial state of the WatermarkEstimator with given element and restriction. 
@@ -442,7 +438,6 @@ def estimator_state_coder(self): class _DoFnParam(object): """DoFn parameter.""" - def __init__(self, param_id): self.param_id = param_id @@ -460,7 +455,6 @@ def __repr__(self): class _RestrictionDoFnParam(_DoFnParam): """Restriction Provider DoFn parameter.""" - def __init__(self, restriction_provider=None): # type: (typing.Optional[RestrictionProvider]) -> None if (restriction_provider is not None and @@ -474,7 +468,6 @@ def __init__(self, restriction_provider=None): class _StateDoFnParam(_DoFnParam): """State DoFn parameter.""" - def __init__(self, state_spec): # type: (StateSpec) -> None if not isinstance(state_spec, StateSpec): @@ -485,7 +478,6 @@ def __init__(self, state_spec): class _TimerDoFnParam(_DoFnParam): """Timer DoFn parameter.""" - def __init__(self, timer_spec): # type: (TimerSpec) -> None if not isinstance(timer_spec, TimerSpec): @@ -496,7 +488,6 @@ def __init__(self, timer_spec): class _BundleFinalizerParam(_DoFnParam): """Bundle Finalization DoFn parameter.""" - def __init__(self): self._callbacks = [] self.param_id = "FinalizeBundle" @@ -524,7 +515,6 @@ def reset(self): class _WatermarkEstimatorParam(_DoFnParam): """WatermarkEstimator DoFn parameter.""" - def __init__( self, watermark_estimator_provider: typing. @@ -591,7 +581,6 @@ def from_callable(fn): def unbounded_per_element(): """A decorator on process fn specifying that the fn performs an unbounded amount of work per input element.""" - def wrapper(process_fn): process_fn.unbounded_per_element = True return process_fn @@ -925,7 +914,6 @@ class CallableWrapperDoFn(DoFn): The purpose of this class is to conveniently wrap simple functions and use them in transforms. """ - def __init__(self, fn, fullargspec=None): """Initializes a CallableWrapperDoFn object wrapping a callable. @@ -1020,7 +1008,6 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn): **apply** will be called with an empty list at expansion time to get the default value. 
""" - def default_label(self): return self.__class__.__name__ @@ -1187,7 +1174,6 @@ def get_accumulator_coder(self): class _ReiterableChain(object): """Like itertools.chain, but allowing re-iteration.""" - def __init__(self, iterables): self.iterables = iterables @@ -1320,7 +1306,6 @@ class NoSideInputsCallableWrapperCombineFn(CallableWrapperCombineFn): This is identical to its parent, but avoids accepting and passing *args and **kwargs for efficiency as they are known to be empty. """ - def create_accumulator(self): return [] @@ -1355,7 +1340,6 @@ class PartitionFn(WithTypeHints): A PartitionFn specifies how individual values in a PCollection will be placed into separate partitions, indexed by an integer. """ - def default_label(self): return self.__class__.__name__ @@ -1383,7 +1367,6 @@ class CallableWrapperPartitionFn(PartitionFn): Instances of this class wrap simple functions for use in Partition operations. """ - def __init__(self, fn): """Initializes a PartitionFn object wrapping a callable. @@ -1486,7 +1469,6 @@ class ParDo(PTransformWithSideInputs): replaced by values from the :class:`~apache_beam.pvalue.PCollection` in the exact positions where they appear in the argument lists. """ - def __init__(self, fn, *args, **kwargs): super().__init__(fn, *args, **kwargs) # TODO(robertwb): Change all uses of the dofn attribute to use fn instead. @@ -1830,7 +1812,6 @@ def _add_type_constraint_from_consumer(self, full_label, input_type_hints): class _MultiParDo(PTransform): - def __init__(self, do_transform, tags, main_tag, allow_unknown_tags=None): super().__init__(do_transform.label) self._do_transform = do_transform @@ -1852,10 +1833,8 @@ class DoFnInfo(object): """This class represents the state in the ParDoPayload's function spec, which is the actual DoFn together with some data required for invoking it. 
""" - @staticmethod def register_stateless_dofn(urn): - def wrapper(cls): StatelessDoFnInfo.REGISTERED_DOFNS[urn] = cls cls._stateless_dofn_urn = urn @@ -1890,7 +1869,6 @@ def serialized_dofn_data(self): class PickledDoFnInfo(DoFnInfo): - def __init__(self, serialized_data): self._serialized_data = serialized_data @@ -2161,7 +2139,6 @@ def FlatMapTuple(fn, *args, **kwargs): # pylint: disable=invalid-name class _ExceptionHandlingWrapper(ptransform.PTransform): """Implementation of ParDo.with_exception_handling.""" - def __init__( self, fn, @@ -2201,7 +2178,6 @@ def expand(self, pcoll): if self._threshold < 1.0: class MaybeWindow(ptransform.PTransform): - @staticmethod def expand(pcoll): if self._threshold_windowing: @@ -2230,7 +2206,6 @@ def check_threshold(bad, total, threshold, window=DoFn.WindowParam): class _ExceptionHandlingWrapperDoFn(DoFn): - def __init__(self, fn, dead_letter_tag, exc_class, partial): self._fn = fn self._dead_letter_tag = dead_letter_tag @@ -2264,7 +2239,6 @@ def process(self, *args, **kwargs): class _SubprocessDoFn(DoFn): """Process method run in a subprocess, turning hard crashes into exceptions. """ - def __init__(self, fn): self._fn = fn self._serialized_fn = pickler.dumps(fn) @@ -2505,7 +2479,6 @@ def as_singleton_view(self): return self._clone(as_view=True) def expand(self, pcoll): - def add_input_types(transform): type_hints = self.get_type_hints() if type_hints.input_types: @@ -2590,7 +2563,6 @@ def from_runner_api_parameter(unused_ptransform, combine_payload, context): @DoFnInfo.register_stateless_dofn(python_urns.KEY_WITH_NONE_DOFN) class _KeyWithNone(DoFn): - def process(self, v): yield None, v @@ -2614,7 +2586,6 @@ class CombinePerKey(PTransformWithSideInputs): Returns: A PObject holding the result of the combine operation. """ - def with_hot_key_fanout(self, fanout): """A per-key combine operation like self but with two levels of aggregation. 
@@ -2712,7 +2683,6 @@ def runner_api_requires_keyed_input(self): # TODO(robertwb): Rename to CombineGroupedValues? class CombineValues(PTransformWithSideInputs): - def make_fn(self, fn, has_side_inputs): return CombineFn.maybe_from_callable(fn, has_side_inputs) @@ -2845,7 +2815,6 @@ def expand(self, pcoll): 'SlidingWindows. See: https://github.com/apache/beam/issues/20528') class SplitHotCold(DoFn): - def start_bundle(self): # Spreading a hot key across all possible sub-keys for all bundles # would defeat the goal of not overwhelming downstream reducers @@ -2864,7 +2833,6 @@ def process(self, element): yield pvalue.TaggedOutput('hot', ((self._nonce % fanout, key), value)) class PreCombineFn(CombineFn): - @staticmethod def extract_output(accumulator): # Boolean indicates this is an accumulator. @@ -2878,7 +2846,6 @@ def extract_output(accumulator): teardown = combine_fn.teardown class PostCombineFn(CombineFn): - @staticmethod def add_input(accumulator, element): is_accumulator, value = element @@ -2925,9 +2892,7 @@ class GroupByKey(PTransform): The implementation here is used only when run on the local direct runner. """ - class ReifyWindows(DoFn): - def process( self, element, window=DoFn.WindowParam, timestamp=DoFn.TimestampParam): try: @@ -3143,7 +3108,6 @@ def _unpickle_dynamic_named_tuple(type_name, field_names, values): class _GroupAndAggregate(PTransform): - def __init__(self, grouping, aggregations): self._grouping = grouping self._aggregations = aggregations @@ -3238,10 +3202,8 @@ class Partition(PTransformWithSideInputs): The result of this PTransform is a simple list of the output PCollections representing each of n partitions, in order. 
""" - class ApplyPartitionFnFn(DoFn): """A DoFn that applies a PartitionFn.""" - def process(self, element, partitionfn, n, *args, **kwargs): partition = partitionfn.partition_for(element, n, *args, **kwargs) if not 0 <= partition < n: @@ -3395,10 +3357,8 @@ class WindowInto(ParDo): element with the same input value and timestamp, with its new set of windows determined by the windowing function. """ - class WindowIntoFn(DoFn): """A DoFn that applies a WindowInto operation.""" - def __init__(self, windowing): # type: (Windowing) -> None self.windowing = windowing @@ -3506,7 +3466,6 @@ class Flatten(PTransform): if there's a chance there may be none), this argument is the only way to provide pipeline information and should be considered mandatory. """ - def __init__(self, **kwargs): super().__init__() self.pipeline = kwargs.pop( @@ -3548,7 +3507,6 @@ def from_runner_api_parameter( class Create(PTransform): """A transform that creates a PCollection from an iterable.""" - def __init__(self, values, reshuffle=True): """Initializes a Create transform. @@ -3603,7 +3561,6 @@ def expand(self, pbegin): # transforms (e.g. Write). 
class MaybeReshuffle(PTransform): - def expand(self, pcoll): if len(serialized_values) > 1 and reshuffle: from apache_beam.transforms.util import Reshuffle @@ -3642,7 +3599,6 @@ def _create_source(serialized_values, coder): @typehints.with_output_types(bytes) class Impulse(PTransform): """Impulse primitive.""" - def expand(self, pbegin): if not isinstance(pbegin, pvalue.PBegin): raise TypeError( diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index 1596b2228e69..0b7f5dc8b765 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -26,15 +26,12 @@ class TestDoFn1(beam.DoFn): - def process(self, element): yield element class TestDoFn2(beam.DoFn): - def process(self, element): - def inner_func(x): yield x @@ -43,7 +40,6 @@ def inner_func(x): class TestDoFn3(beam.DoFn): """mixing return and yield is not allowed""" - def process(self, element): if not element: return -1 @@ -52,7 +48,6 @@ def process(self, element): class TestDoFn4(beam.DoFn): """test the variable name containing return""" - def process(self, element): my_return = element yield my_return @@ -60,17 +55,15 @@ def process(self, element): class TestDoFn5(beam.DoFn): """test the variable name containing yield""" - def process(self, element): my_yield = element return my_yield class CreateTest(unittest.TestCase): - @pytest.fixture(autouse=True) def inject_fixtures(self, caplog): - self._caplog = caplog + self._caplog = caplog def test_dofn_with_yield_and_return(self): assert beam.ParDo(sum) @@ -82,6 +75,7 @@ def test_dofn_with_yield_and_return(self): beam.ParDo(TestDoFn3()) assert 'The yield and return statements in' in self._caplog.text + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() From 6dcb7a812c231f7074374f098251759cd3a45039 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sat, 11 Mar 2023 00:57:25 +0000 Subject: [PATCH 09/13] format fix --- 
sdks/python/apache_beam/transforms/core.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index ce4278b08cc4..bfdae2739d06 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1482,10 +1482,11 @@ def __init__(self, fn, *args, **kwargs): if _check_fn_use_yield_and_return(self.fn.process): _LOGGER.warning( 'The yield and return statements in the process method ' - f'of {self.fn.__class__ } can not be mixed.' + 'of %s can not be mixed.' 'We recommend to use `yield` for emitting individual ' ' elements and `yield from` for emitting the content ' - 'of entire iterables.') + 'of entire iterables.', + self.fn.__class__) # Validate the DoFn by creating a DoFnSignature from apache_beam.runners.common import DoFnSignature From 8ae2c5d0bbfa2b5cef2915b480b8fec5185c60b5 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sat, 11 Mar 2023 01:22:40 +0000 Subject: [PATCH 10/13] fix isort --- sdks/python/apache_beam/transforms/core_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index 0b7f5dc8b765..c06aea17612f 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -19,9 +19,10 @@ # pytype: skip-file import logging -import pytest import unittest +import pytest + import apache_beam as beam From 53ff0da9c9473733ee5058329be67bc7e9c4ee8c Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Tue, 14 Mar 2023 18:55:05 +0000 Subject: [PATCH 11/13] Fix the false positive cases --- sdks/python/apache_beam/transforms/core.py | 14 ++++---- .../apache_beam/transforms/core_test.py | 33 +++++++++++++++---- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index
bfdae2739d06..d3d93e16574a 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1427,9 +1427,11 @@ def _check_fn_use_yield_and_return(fn): has_yield = False has_return = False for line in source_code.split("\n"): - if line.lstrip().startswith("yield"): + if line.lstrip().startswith("yield ") or line.lstrip().startswith( + "yield("): has_yield = True - if line.lstrip().startswith("return"): + if line.lstrip().startswith("return ") or line.lstrip().startswith( + "return("): has_return = True if has_yield and has_return: return True @@ -1481,11 +1483,9 @@ def __init__(self, fn, *args, **kwargs): # DoFn.process cannot allow both return and yield if _check_fn_use_yield_and_return(self.fn.process): _LOGGER.warning( - 'The yield and return statements in the process method ' - 'of %s can not be mixed.' - 'We recommend to use `yield` for emitting individual ' - ' elements and `yield from` for emitting the content ' - 'of entire iterables.', + 'Using yield and return in the process method ' + 'of %s can lead to unexpected behavior, see:' + 'https://github.com/apache/beam/issues/22969.', self.fn.__class__) # Validate the DoFn by creating a DoFnSignature diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index c06aea17612f..1c08bf8137eb 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -61,20 +61,41 @@ def process(self, element): return my_yield +class TestDoFn6(beam.DoFn): + """test the variable name containing return""" + def process(self, element): + return_test = element + yield return_test + + +class TestDoFn7(beam.DoFn): + """test the variable name containing yield""" + def process(self, element): + yield_test = element + return yield_test + + class CreateTest(unittest.TestCase): @pytest.fixture(autouse=True) def inject_fixtures(self, caplog): self._caplog = caplog def 
test_dofn_with_yield_and_return(self): - assert beam.ParDo(sum) - assert beam.ParDo(TestDoFn1()) - assert beam.ParDo(TestDoFn2()) - assert beam.ParDo(TestDoFn4()) - assert beam.ParDo(TestDoFn5()) + warning_text = 'Using yield and return' + + with self._caplog.at_level(logging.WARNING): + assert beam.ParDo(sum) + assert beam.ParDo(TestDoFn1()) + assert beam.ParDo(TestDoFn2()) + assert beam.ParDo(TestDoFn4()) + assert beam.ParDo(TestDoFn5()) + assert beam.ParDo(TestDoFn6()) + assert beam.ParDo(TestDoFn7()) + assert warning_text not in self._caplog.text + with self._caplog.at_level(logging.WARNING): beam.ParDo(TestDoFn3()) - assert 'The yield and return statements in' in self._caplog.text + assert warning_text in self._caplog.text if __name__ == '__main__': From 3c84ca081deca358a1e4e0254ddb267c205676ea Mon Sep 17 00:00:00 2001 From: xqhu Date: Sat, 18 Mar 2023 12:21:36 -0400 Subject: [PATCH 12/13] capture all exceptions --- sdks/python/apache_beam/transforms/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 305838a239f1..1149a9487c9c 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -1436,7 +1436,8 @@ def _check_fn_use_yield_and_return(fn): if has_yield and has_return: return True return False - except TypeError: + except Exception as e: + _LOGGER.info(str(e)) return False From b979176b1bc1885186cd423d8130d9a091b7785d Mon Sep 17 00:00:00 2001 From: xqhu Date: Mon, 20 Mar 2023 14:33:43 -0400 Subject: [PATCH 13/13] Use debug for exceptions --- sdks/python/apache_beam/transforms/core.py | 2 +- sdks/python/apache_beam/transforms/core_test.py | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 1149a9487c9c..6260975b32c9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ 
b/sdks/python/apache_beam/transforms/core.py @@ -1437,7 +1437,7 @@ def _check_fn_use_yield_and_return(fn): return True return False except Exception as e: - _LOGGER.info(str(e)) + _LOGGER.debug(str(e)) return False diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index 1c08bf8137eb..0fba28266138 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -75,6 +75,15 @@ def process(self, element): return yield_test +class TestDoFn8(beam.DoFn): + """test the code containing yield and yield from""" + def process(self, element): + if not element: + yield from [1, 2, 3] + else: + yield element + + class CreateTest(unittest.TestCase): @pytest.fixture(autouse=True) def inject_fixtures(self, caplog): @@ -91,6 +100,7 @@ def test_dofn_with_yield_and_return(self): assert beam.ParDo(TestDoFn5()) assert beam.ParDo(TestDoFn6()) assert beam.ParDo(TestDoFn7()) + assert beam.ParDo(TestDoFn8()) assert warning_text not in self._caplog.text with self._caplog.at_level(logging.WARNING):