From 2113bbd630b9b9569aea511a601489d82af2809c Mon Sep 17 00:00:00 2001 From: Maxime RIO Date: Wed, 10 Sep 2025 13:22:40 +1000 Subject: [PATCH 01/16] add name to pipeline and allow retrieval of sub-pipelines using their names --- .../src/pyearthtools/pipeline/controller.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/packages/pipeline/src/pyearthtools/pipeline/controller.py b/packages/pipeline/src/pyearthtools/pipeline/controller.py index c3805ecd..ed84c008 100644 --- a/packages/pipeline/src/pyearthtools/pipeline/controller.py +++ b/packages/pipeline/src/pyearthtools/pipeline/controller.py @@ -229,6 +229,7 @@ def __init__( iterator: Optional[Union[iterators.Iterator, tuple[iterators.Iterator, ...]]] = None, sampler: Optional[Union[samplers.Sampler, tuple[samplers.Sampler, ...]]] = None, exceptions_to_ignore: Optional[tuple[Union[str, Type[Exception]], ...]] = None, + name: str | None = None, **kwargs, ): """ @@ -311,13 +312,15 @@ def __init__( Can be used to randomly sample, drop out and more exceptions_to_ignore: Which exceptions to ignore when iterating. Defaults to None. 
+ + name: Name of the pipeline, used in nested pipelines """ self.iterator = iterator self.sampler = sampler - + self.name = name + self._named = {} super().__init__(*steps, **kwargs) self.record_initialisation() - self.exceptions_to_ignore = exceptions_to_ignore @property @@ -394,10 +397,20 @@ def steps( # steps_list = [v] elif isinstance(v, Pipeline): steps_list.extend(v.steps) + if v.name in self._named: + assert v.name is not None + raise KeyError(f"Named pipeline '{v.name}' already exists.") + elif v.name is not None: + self._named[v.name] = v else: steps_list.append(v) self._steps = tuple(steps_list) # type: ignore + @property + def named(self) -> dict[str, Pipeline]: + """Named sub-pipelines""" + return self._named.copy() + @property def iterator(self): """Iterator of `Pipeline`""" From 4ab75e5cf3cc526a9ea3c59c0ea68494fc15da82 Mon Sep 17 00:00:00 2001 From: Maxime RIO Date: Wed, 10 Sep 2025 13:47:20 +1000 Subject: [PATCH 02/16] add a property to reverse the effects of a pipeline --- .../src/pyearthtools/pipeline/controller.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/packages/pipeline/src/pyearthtools/pipeline/controller.py b/packages/pipeline/src/pyearthtools/pipeline/controller.py index ed84c008..703c7ef5 100644 --- a/packages/pipeline/src/pyearthtools/pipeline/controller.py +++ b/packages/pipeline/src/pyearthtools/pipeline/controller.py @@ -856,3 +856,23 @@ def sample( iterator=iterator, sampler=sampler, ) + + @property + def reversed(self) -> "ReversedPipeline": + # TODO keep same indexed? + # TODO keep same sampler? + return ReversedPipeline(*self.steps) + + +class ReversedPipeline(Pipeline): + """Support class to reverse the effect of pipeline + + Given a set of pipeline steps, this pipeline will undo the effect of the steps + when applied, and vice-versa. However it keeps the original order of the steps. 
+ """ + + def undo(self, sample): + return super().apply(sample) + + def apply(self, sample): + return super().undo(sample) From d075868afd2eaed9d90948c42caa6f9efc603f45 Mon Sep 17 00:00:00 2001 From: Maxime RIO Date: Wed, 10 Sep 2025 14:47:25 +1000 Subject: [PATCH 03/16] reverse each step of the pipeline instead of using a class for that --- .../src/pyearthtools/pipeline/controller.py | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/packages/pipeline/src/pyearthtools/pipeline/controller.py b/packages/pipeline/src/pyearthtools/pipeline/controller.py index 703c7ef5..29f48fd2 100644 --- a/packages/pipeline/src/pyearthtools/pipeline/controller.py +++ b/packages/pipeline/src/pyearthtools/pipeline/controller.py @@ -182,6 +182,10 @@ def _get_tree( return graph, prior_step +class NotReversiblePipeline(RuntimeError): + pass + + class Pipeline(_Pipeline, Index): """ Core of `pyearthtools.pipeline`, @@ -858,21 +862,21 @@ def sample( ) @property - def reversed(self) -> "ReversedPipeline": - # TODO keep same indexed? - # TODO keep same sampler? - return ReversedPipeline(*self.steps) - + def reversed(self) -> "Pipeline": + """Reversed pipeline, inverting each step and the order of the steps -class ReversedPipeline(Pipeline): - """Support class to reverse the effect of pipeline - - Given a set of pipeline steps, this pipeline will undo the effect of the steps - when applied, and vice-versa. However it keeps the original order of the steps. - """ + This method only works if each step can be transposed, i.e. implement the + transposed operation `.T`. - def undo(self, sample): - return super().apply(sample) - - def apply(self, sample): - return super().undo(sample) + Note the iterator and sampler of the pipeline are dropped. 
+ """ + steps = [] + for step in reversed(self.steps): + if not hasattr(step, "T"): + raise NotReversiblePipeline( + "This pipeline cannot be reversed, the following step is " + f"missing the transposed property '.T':\n{step}" + ) + steps.append(step.T) + name = name if self.name is None else f"reversed_{self.name}" + return Pipeline(*steps, name=name) From 09373cffc339c27188f22bcd3a869c1c64f35dfe Mon Sep 17 00:00:00 2001 From: Maxime RIO Date: Wed, 10 Sep 2025 15:08:58 +1000 Subject: [PATCH 04/16] allow using | operator to combine pipelines --- .../src/pyearthtools/pipeline/controller.py | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/packages/pipeline/src/pyearthtools/pipeline/controller.py b/packages/pipeline/src/pyearthtools/pipeline/controller.py index 29f48fd2..83e16f18 100644 --- a/packages/pipeline/src/pyearthtools/pipeline/controller.py +++ b/packages/pipeline/src/pyearthtools/pipeline/controller.py @@ -789,8 +789,7 @@ def __contains__(self, id: Union[str, Type[Any]]) -> bool: return False def __add__(self, other: Union[_Pipeline, PipelineIndex, PipelineStep]) -> Pipeline: - """ - Combine pipelines + """Combine pipelines Will set `self` steps first then `other`. @@ -813,6 +812,27 @@ def __add__(self, other: Union[_Pipeline, PipelineIndex, PipelineStep]) -> Pipel args = (*init.pop("__args", []), other) return Pipeline(*args, **init) + def __or__(self, other: Union[_Pipeline, PipelineIndex, PipelineStep]) -> Pipeline: + """Combine pipelines + + Same as + operator, alternative syntax. 
+ """ + return self + other + + def __ror__( + self, + other: Union[ + VALID_PIPELINE_TYPES, + _Pipeline, + PipelineIndex, + tuple[Union[VALID_PIPELINE_TYPES, Literal["map", "map_copy"]], ...], + ], + ) -> Pipeline: + """Append a step in front of a pipeline""" + init = dict(self.initialisation) + args = (other, *init.pop("__args", [])) + return Pipeline(*args, **init) + def save(self, path: Optional[Union[str, Path]] = None, only_steps: bool = False) -> Union[str, None]: """ Save `Pipeline` @@ -878,5 +898,5 @@ def reversed(self) -> "Pipeline": f"missing the transposed property '.T':\n{step}" ) steps.append(step.T) - name = name if self.name is None else f"reversed_{self.name}" + name = self.name if self.name is None else f"reversed_{self.name}" return Pipeline(*steps, name=name) From d8ca63fe45c6c8d943595064ea678355c3c1a47b Mon Sep 17 00:00:00 2001 From: Maxime RIO Date: Wed, 10 Sep 2025 16:56:37 +1000 Subject: [PATCH 05/16] store pipeline name in steps Sub-pipelines are now virtually stored inside the pipeline step names. This ensures that merging pipelines using the | operator works and preserves the names. 
--- .../src/pyearthtools/pipeline/controller.py | 35 +++++++++++++------ .../src/pyearthtools/pipeline/step.py | 4 +++ 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/packages/pipeline/src/pyearthtools/pipeline/controller.py b/packages/pipeline/src/pyearthtools/pipeline/controller.py index 83e16f18..274303ec 100644 --- a/packages/pipeline/src/pyearthtools/pipeline/controller.py +++ b/packages/pipeline/src/pyearthtools/pipeline/controller.py @@ -322,7 +322,6 @@ def __init__( self.iterator = iterator self.sampler = sampler self.name = name - self._named = {} super().__init__(*steps, **kwargs) self.record_initialisation() self.exceptions_to_ignore = exceptions_to_ignore @@ -401,19 +400,29 @@ def steps( # steps_list = [v] elif isinstance(v, Pipeline): steps_list.extend(v.steps) - if v.name in self._named: - assert v.name is not None - raise KeyError(f"Named pipeline '{v.name}' already exists.") - elif v.name is not None: - self._named[v.name] = v else: steps_list.append(v) + if self.name is not None: + for step in steps_list: + step.name = self.name self._steps = tuple(steps_list) # type: ignore @property def named(self) -> dict[str, Pipeline]: - """Named sub-pipelines""" - return self._named.copy() + """Retrieve sub-pipelines by name + + The iterator and sampler of the sub-pipelines are not preserved. 
+ """ + named_steps = {} + for step in self.steps: + # skip unnamed steps, because not a PipelineStep or unset name + if (step_name := getattr(step, "name", None)) is None: + continue + named_steps.setdefault(step_name, []).append(step) + + named_pipelines = {name: Pipeline(*steps) for name, steps in named_steps.items()} + + return named_pipelines @property def iterator(self): @@ -805,10 +814,15 @@ def __add__(self, other: Union[_Pipeline, PipelineIndex, PipelineStep]) -> Pipel new_init = dict(init) new_init.update({key: val for key, val in other_init.items() if val is not None}) + # ensure steps are not renamed after the merge + if "name" in new_init: + del new_init["name"] + return Pipeline(*args, **new_init) assert isinstance(other, (PipelineIndex, PipelineStep)) - init = dict(self.initialisation) + # ensure steps are not renamed after the merge + init = {**self.initialisation, "name": None} args = (*init.pop("__args", []), other) return Pipeline(*args, **init) @@ -829,7 +843,8 @@ def __ror__( ], ) -> Pipeline: """Append a step in front of a pipeline""" - init = dict(self.initialisation) + # ensure steps are not renamed after the merge + init = {**self.initialisation, "name": None} args = (other, *init.pop("__args", [])) return Pipeline(*args, **init) diff --git a/packages/pipeline/src/pyearthtools/pipeline/step.py b/packages/pipeline/src/pyearthtools/pipeline/step.py index 64c0f5a8..bdd7b663 100644 --- a/packages/pipeline/src/pyearthtools/pipeline/step.py +++ b/packages/pipeline/src/pyearthtools/pipeline/step.py @@ -53,6 +53,7 @@ def __init__( ] ] = None, response_on_type: Literal["warn", "exception", "ignore", "filter"] = "exception", + name: str | None = None, ): """ Base `PipelineStep` @@ -69,12 +70,15 @@ def __init__( Types recognised, can be dictionary to reference different types per function Defaults to None. response_on_type (Literal['warn', 'exception', 'ignore', 'filter'], optional): Response when invalid type found. Defaults to "exception". 
+ name (str, optional): + Name of the pipeline this step belongs too """ self.split_tuples = split_tuples self.recursively_split_tuples = recursively_split_tuples self.recognised_types = recognised_types or {} # type: ignore self.response_on_type = response_on_type + self.name = name @abstractmethod def run(self, sample): From 4a848e3eaf59a9171b1b3a602f98569e555d287b Mon Sep 17 00:00:00 2001 From: Tennessee Leeuwenburg Date: Mon, 15 Sep 2025 13:34:11 +1000 Subject: [PATCH 06/16] Add named pipe demo --- notebooks/NamedPipe.ipynb | 112 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 notebooks/NamedPipe.ipynb diff --git a/notebooks/NamedPipe.ipynb b/notebooks/NamedPipe.ipynb new file mode 100644 index 00000000..308d0f1c --- /dev/null +++ b/notebooks/NamedPipe.ipynb @@ -0,0 +1,112 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "8837d904-57cb-4f2c-a33d-0e227fb800bd", + "metadata": {}, + "outputs": [], + "source": [ + "import pyearthtools.pipeline\n", + "import pyearthtools.data\n", + "import pyearthtools.tutorial\n", + "\n", + "import os\n", + "os.environ['PETPROJECT'] = os.path.expanduser(\"~\") + '/dev/proj/petcache'\n", + "workdir = os.environ['PETPROJECT']\n", + "file_location = workdir + '/mini.nc'\n", + "\n", + "p1 = pyearthtools.pipeline.Pipeline(\n", + " pyearthtools.tutorial.ERA5DataClass.ERA5LowResDemoIndex([\n", + " '10m_u_component_of_wind', \n", + " '10m_v_component_of_wind', \n", + " 'mean_sea_level_pressure',\n", + " '2m_temperature' \n", + " ], filename_override=file_location),\n", + " pyearthtools.pipeline.operations.xarray.Sort(\n", + " [\"2m_temperature\", \"u_component_of_wind\", \"v_component_of_wind\", \"vorticity\", \"geopotential\"]\n", + " ),\n", + " pyearthtools.data.transforms.coordinates.StandardLongitude(type=\"0-360\"),\n", + " name=\"make_flat_xarray\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": 
"92fe751b-2374-413d-a16b-7a9769d2c3fa", + "metadata": {}, + "outputs": [], + "source": [ + "p2 = pyearthtools.pipeline.Pipeline(\n", + " pyearthtools.pipeline.modifications.TemporalRetrieval(\n", + " concat=True, samples=((0, 1), (6, 1, 6)) # Input = 1 sample from time T=0 hours. Output = T+6,+12,+18,+24\n", + " ), \n", + " pyearthtools.pipeline.operations.xarray.normalisation.MagicNorm(cache_dir=workdir), # Incremental normalisation calculator \n", + " pyearthtools.pipeline.operations.xarray.conversion.ToNumpy(),\n", + " pyearthtools.pipeline.operations.numpy.reshape.Rearrange('c t h w -> t h w c'), # channel batch height width -> batch height width channel\n", + " name='to_netcdf'\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d5212f76-7b2f-4181-842d-e627eea1beaa", + "metadata": {}, + "outputs": [], + "source": [ + "p3 = p1 | p2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "50b3e1be-b7a9-4056-a942-f3f5da2acdd5", + "metadata": {}, + "outputs": [], + "source": [ + "p3.named['make_flat_xarray']['20220202T00']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7699bc23-2bc7-4f45-a0a8-11729546a64f", + "metadata": {}, + "outputs": [], + "source": [ + "p3['20220202T00']" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e439990e-0c71-4473-98a6-9bb83fbd85a8", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 317d76b02e6dc319a6db76fae735f9cf772b972d Mon Sep 17 00:00:00 2001 From: Maxime RIO Date: Mon, 15 Sep 2025 16:52:58 +1200 
Subject: [PATCH 07/16] add example notebook for the new pipeline features (WIP) --- notebooks/pipeline/Extras.ipynb | 4332 +++++++++++++++++++++++++++++++ 1 file changed, 4332 insertions(+) create mode 100644 notebooks/pipeline/Extras.ipynb diff --git a/notebooks/pipeline/Extras.ipynb b/notebooks/pipeline/Extras.ipynb new file mode 100644 index 00000000..fdce7527 --- /dev/null +++ b/notebooks/pipeline/Extras.ipynb @@ -0,0 +1,4332 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "63287ab4-74ec-4af2-9d70-5ed866ff1798", + "metadata": {}, + "source": [ + "# Additional Pipeline Syntaxes\n", + "\n", + "This notebooks introduces some new syntaxes to ease creation and manipulation of `Pipeline`objects:\n", + "\n", + "- named pipelines,\n", + "- combination using `|` operation,\n", + "- reversing pipelines." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "cbe5ebfb-f820-4b77-ad0f-91d507a5195f", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "\n", + "def repr_ndarray(arr):\n", + " return f\"array(..., shape={arr.shape}, dtype={arr.dtype})\"\n", + "\n", + "np.set_printoptions(override_repr=repr_ndarray)" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "160c063a-de2e-4a28-a22d-d156d4bc06f3", + "metadata": {}, + "outputs": [], + "source": [ + "import pyearthtools.data\n", + "import pyearthtools.pipeline" + ] + }, + { + "cell_type": "markdown", + "id": "964e9683-2999-4a8b-8a1f-407dc24d4ffa", + "metadata": {}, + "source": [ + "To illustrate these features, we'll reuse the same pipeline as the one used in the [End-to-end CNN Training Example](../tutorial/CNN-Model-Training.ipynb).\n", + "Here is the original definition of the pipeline." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "0b273573-99d1-4c8e-834d-7c0183674774", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Pipeline\n",
+       "\tDescription                    `pyearthtools.pipeline` Data Pipeline\n",
+       "\n",
+       "\n",
+       "\tInitialisation                 \n",
+       "\t\t exceptions_to_ignore           None\n",
+       "\t\t iterator                       None\n",
+       "\t\t name                           None\n",
+       "\t\t sampler                        None\n",
+       "\tSteps                          \n",
+       "\t\t weatherbench.WB2ERA5           {'WB2ERA5': {'level': '[850]', 'license_ok': 'True', 'resolution': "'64x32'", 'variables': "['2m_temperature', 'u', 'v', 'geopotential', 'vorticity']"}}\n",
+       "\t\t sort.Sort                      {'Sort': {'order': "['2m_temperature', 'u_component_of_wind', 'v_component_of_wind', 'vorticity', 'geopotential']", 'strict': 'False'}}\n",
+       "\t\t coordinates.StandardLongitude  {'StandardLongitude': {'longitude_name': "'longitude'", 'type': "'0-360'"}}\n",
+       "\t\t reshape.CoordinateFlatten      {'CoordinateFlatten': {'__args': '()', 'coordinate': "'level'", 'skip_missing': 'False'}}\n",
+       "\t\t idx_modification.TemporalRetrieval {'TemporalRetrieval': {'concat': 'True', 'delta_unit': 'None', 'merge_function': 'None', 'merge_kwargs': 'None', 'samples': '((0, 1), (6, 1))'}}\n",
+       "\t\t conversion.ToNumpy             {'ToNumpy': {'reference_dataset': 'None', 'run_parallel': 'False', 'saved_records': 'None', 'warn': 'True'}}\n",
+       "\t\t reshape.Rearrange              {'Rearrange': {'rearrange': "'c t h w -> t c h w'", 'rearrange_kwargs': 'None', 'reverse_rearrange': 'None', 'skip': 'False'}}\n",
+       "\t\t reshape.Squeeze                {'Squeeze': {'axis': '0'}}
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "

Graph

" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "WB2ERA5_1ad8d626-eae0-4784-bac9-1a894917b79f\n", + "\n", + "weatherbench.WB2ERA5\n", + "\n", + "\n", + "\n", + "Sort_f77d5219-61cd-4ff5-be96-35b2c59ebb55\n", + "\n", + "sort.Sort\n", + "\n", + "\n", + "\n", + "WB2ERA5_1ad8d626-eae0-4784-bac9-1a894917b79f->Sort_f77d5219-61cd-4ff5-be96-35b2c59ebb55\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "StandardLongitude_ba7533b2-5052-46b3-a688-d8c8652f5b80\n", + "\n", + "coordinates.StandardLongitude\n", + "\n", + "\n", + "\n", + "Sort_f77d5219-61cd-4ff5-be96-35b2c59ebb55->StandardLongitude_ba7533b2-5052-46b3-a688-d8c8652f5b80\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "CoordinateFlatten_6226876f-4298-4131-b854-26721df3fd95\n", + "\n", + "reshape.CoordinateFlatten\n", + "\n", + "\n", + "\n", + "StandardLongitude_ba7533b2-5052-46b3-a688-d8c8652f5b80->CoordinateFlatten_6226876f-4298-4131-b854-26721df3fd95\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "TemporalRetrieval_5436c12e-0a24-42d0-a3f7-804670686978\n", + "\n", + "idx_modification.TemporalRetrieval\n", + "\n", + "\n", + "\n", + "CoordinateFlatten_6226876f-4298-4131-b854-26721df3fd95->TemporalRetrieval_5436c12e-0a24-42d0-a3f7-804670686978\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "ToNumpy_96533c63-4543-4056-a3bb-49cc0e1d65e4\n", + "\n", + "conversion.ToNumpy\n", + "\n", + "\n", + "\n", + "TemporalRetrieval_5436c12e-0a24-42d0-a3f7-804670686978->ToNumpy_96533c63-4543-4056-a3bb-49cc0e1d65e4\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "Rearrange_be57010e-b32d-4397-97bf-72fbafcfa9b8\n", + "\n", + "reshape.Rearrange\n", + "\n", + "\n", + "\n", + "ToNumpy_96533c63-4543-4056-a3bb-49cc0e1d65e4->Rearrange_be57010e-b32d-4397-97bf-72fbafcfa9b8\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "Squeeze_7546bef8-44a2-4946-8c49-17e10681afd0\n", + "\n", + "reshape.Squeeze\n", + 
"\n", + "\n", + "\n", + "Rearrange_be57010e-b32d-4397-97bf-72fbafcfa9b8->Squeeze_7546bef8-44a2-4946-8c49-17e10681afd0\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "pyearthtools.pipeline.Pipeline(\n", + " pyearthtools.data.download.weatherbench.WB2ERA5(\n", + " variables=[\"2m_temperature\", \"u\", \"v\", \"geopotential\", \"vorticity\"],\n", + " level=[850],\n", + " license_ok=True,\n", + " ),\n", + " pyearthtools.pipeline.operations.xarray.Sort(\n", + " [\"2m_temperature\", \"u_component_of_wind\", \"v_component_of_wind\", \"vorticity\", \"geopotential\"]\n", + " ),\n", + " pyearthtools.data.transforms.coordinates.StandardLongitude(type=\"0-360\"),\n", + " pyearthtools.pipeline.operations.xarray.reshape.CoordinateFlatten(\"level\"),\n", + " pyearthtools.pipeline.modifications.TemporalRetrieval(\n", + " concat=True, samples=((0, 1), (6, 1))\n", + " ),\n", + " pyearthtools.pipeline.operations.xarray.conversion.ToNumpy(),\n", + " pyearthtools.pipeline.operations.numpy.reshape.Rearrange(\"c t h w -> t c h w\"),\n", + " pyearthtools.pipeline.operations.numpy.reshape.Squeeze(axis=0),\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "b60a86ed-d610-410f-8b03-aa535c9ba99a", + "metadata": {}, + "source": [ + "## Named pipelines\n", + "\n", + "When developing a new pipeline, it can be convenient to separate the main stages of a long pipeline into these sub-pipelines, and assemble them into one big pipeline afterwards.\n", + "However, once the pipeline has been assembled, we loose access to the sub-pipelines.\n", + "To solve this, we can add a **name** to each of the sub-pipelines.\n", + "Then, in the final pipeline, we can recover them via the `.named` attribute, which is a dictionary of all the named sub-pipelines contained in a pipeline.\n", + "\n", + "In the following example, we build the same pipeline but split into 3 stages:\n", + "- a named pipeline 
\"prepare\", to fetch the data and apply few transformation on it,\n", + "- a temporal retrieval step, to generate the tuple of (features, target) samples,\n", + "- a named pipeline \"reshape\", to do the final convertion to numpy and reshaping." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "9882f56e-cf55-4938-9bd8-0c6b3240541e", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Pipeline\n",
+       "\tDescription                    `pyearthtools.pipeline` Data Pipeline\n",
+       "\n",
+       "\n",
+       "\tInitialisation                 \n",
+       "\t\t exceptions_to_ignore           None\n",
+       "\t\t iterator                       None\n",
+       "\t\t name                           None\n",
+       "\t\t sampler                        None\n",
+       "\tSteps                          \n",
+       "\t\t weatherbench.WB2ERA5           {'WB2ERA5': {'level': '[850]', 'license_ok': 'True', 'resolution': "'64x32'", 'variables': "['2m_temperature', 'u', 'v', 'geopotential', 'vorticity']"}}\n",
+       "\t\t sort.Sort                      {'Sort': {'order': "['2m_temperature', 'u_component_of_wind', 'v_component_of_wind', 'vorticity', 'geopotential']", 'strict': 'False'}}\n",
+       "\t\t coordinates.StandardLongitude  {'StandardLongitude': {'longitude_name': "'longitude'", 'type': "'0-360'"}}\n",
+       "\t\t reshape.CoordinateFlatten      {'CoordinateFlatten': {'__args': '()', 'coordinate': "'level'", 'skip_missing': 'False'}}\n",
+       "\t\t idx_modification.TemporalRetrieval {'TemporalRetrieval': {'concat': 'True', 'delta_unit': 'None', 'merge_function': 'None', 'merge_kwargs': 'None', 'samples': '((0, 1), (6, 1))'}}\n",
+       "\t\t conversion.ToNumpy             {'ToNumpy': {'reference_dataset': 'None', 'run_parallel': 'False', 'saved_records': 'None', 'warn': 'True'}}\n",
+       "\t\t reshape.Rearrange              {'Rearrange': {'rearrange': "'c t h w -> t c h w'", 'rearrange_kwargs': 'None', 'reverse_rearrange': 'None', 'skip': 'False'}}\n",
+       "\t\t reshape.Squeeze                {'Squeeze': {'axis': '0'}}
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "

Graph

" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "WB2ERA5_12058809-2d16-40b4-991a-939db79c0285\n", + "\n", + "weatherbench.WB2ERA5\n", + "\n", + "\n", + "\n", + "Sort_9ab339c0-d2e0-4626-ad0b-bdb57aa07c29\n", + "\n", + "sort.Sort\n", + "\n", + "\n", + "\n", + "WB2ERA5_12058809-2d16-40b4-991a-939db79c0285->Sort_9ab339c0-d2e0-4626-ad0b-bdb57aa07c29\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "StandardLongitude_1159f9e4-f9da-4801-b103-51ded230ae11\n", + "\n", + "coordinates.StandardLongitude\n", + "\n", + "\n", + "\n", + "Sort_9ab339c0-d2e0-4626-ad0b-bdb57aa07c29->StandardLongitude_1159f9e4-f9da-4801-b103-51ded230ae11\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "CoordinateFlatten_32a0b446-d668-43c1-93e7-18c1b4e23978\n", + "\n", + "reshape.CoordinateFlatten\n", + "\n", + "\n", + "\n", + "StandardLongitude_1159f9e4-f9da-4801-b103-51ded230ae11->CoordinateFlatten_32a0b446-d668-43c1-93e7-18c1b4e23978\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "TemporalRetrieval_e35bf2b1-16b6-4cac-bcbf-11e3feaab0b5\n", + "\n", + "idx_modification.TemporalRetrieval\n", + "\n", + "\n", + "\n", + "CoordinateFlatten_32a0b446-d668-43c1-93e7-18c1b4e23978->TemporalRetrieval_e35bf2b1-16b6-4cac-bcbf-11e3feaab0b5\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "ToNumpy_f1a52042-2e34-4799-9cf3-48186c6dbe09\n", + "\n", + "conversion.ToNumpy\n", + "\n", + "\n", + "\n", + "TemporalRetrieval_e35bf2b1-16b6-4cac-bcbf-11e3feaab0b5->ToNumpy_f1a52042-2e34-4799-9cf3-48186c6dbe09\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "Rearrange_78c02cd7-3cad-4548-8c37-5e0c4ba0131e\n", + "\n", + "reshape.Rearrange\n", + "\n", + "\n", + "\n", + "ToNumpy_f1a52042-2e34-4799-9cf3-48186c6dbe09->Rearrange_78c02cd7-3cad-4548-8c37-5e0c4ba0131e\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "Squeeze_99f80f77-14a6-462a-bfce-8aed73af8f44\n", + "\n", + "reshape.Squeeze\n", + 
"\n", + "\n", + "\n", + "Rearrange_78c02cd7-3cad-4548-8c37-5e0c4ba0131e->Squeeze_99f80f77-14a6-462a-bfce-8aed73af8f44\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "pipeline = pyearthtools.pipeline.Pipeline(\n", + " pyearthtools.pipeline.Pipeline(\n", + " pyearthtools.data.download.weatherbench.WB2ERA5(\n", + " variables=[\"2m_temperature\", \"u\", \"v\", \"geopotential\", \"vorticity\"],\n", + " level=[850],\n", + " license_ok=True,\n", + " ),\n", + " pyearthtools.pipeline.operations.xarray.Sort(\n", + " [\"2m_temperature\", \"u_component_of_wind\", \"v_component_of_wind\", \"vorticity\", \"geopotential\"]\n", + " ),\n", + " pyearthtools.data.transforms.coordinates.StandardLongitude(type=\"0-360\"),\n", + " pyearthtools.pipeline.operations.xarray.reshape.CoordinateFlatten(\"level\"),\n", + " name=\"prepare\"\n", + " ),\n", + " pyearthtools.pipeline.modifications.TemporalRetrieval(concat=True, samples=((0, 1), (6, 1))),\n", + " pyearthtools.pipeline.Pipeline(\n", + " pyearthtools.pipeline.operations.xarray.conversion.ToNumpy(),\n", + " pyearthtools.pipeline.operations.numpy.reshape.Rearrange(\"c t h w -> t c h w\"),\n", + " pyearthtools.pipeline.operations.numpy.reshape.Squeeze(axis=0),\n", + " name=\"reshape\"\n", + " ),\n", + ")\n", + "pipeline" + ] + }, + { + "cell_type": "markdown", + "id": "a341cbf7-65a6-4ee1-a263-4b3b2b7f703f", + "metadata": {}, + "source": [ + "We can inspect the `.named` attribute to see which named pipelines are accessible within a pipeline." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "496c427e-37cf-45a9-8424-4bc0eb594997", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "dict_keys(['prepare', 'reshape'])" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pipeline.named.keys()" + ] + }, + { + "cell_type": "markdown", + "id": "a0598e50-0f1a-409d-9bab-a131fcaaa9f9", + "metadata": {}, + "source": [ + "Then we can access the named pipeline \"prepare\" as follows." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "fcb727e8-274f-49e7-b743-386fe665af49", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Pipeline\n",
+       "\tDescription                    `pyearthtools.pipeline` Data Pipeline\n",
+       "\n",
+       "\n",
+       "\tInitialisation                 \n",
+       "\t\t exceptions_to_ignore           None\n",
+       "\t\t iterator                       None\n",
+       "\t\t name                           None\n",
+       "\t\t sampler                        None\n",
+       "\tSteps                          \n",
+       "\t\t weatherbench.WB2ERA5           {'WB2ERA5': {'level': '[850]', 'license_ok': 'True', 'resolution': "'64x32'", 'variables': "['2m_temperature', 'u', 'v', 'geopotential', 'vorticity']"}}\n",
+       "\t\t sort.Sort                      {'Sort': {'order': "['2m_temperature', 'u_component_of_wind', 'v_component_of_wind', 'vorticity', 'geopotential']", 'strict': 'False'}}\n",
+       "\t\t coordinates.StandardLongitude  {'StandardLongitude': {'longitude_name': "'longitude'", 'type': "'0-360'"}}\n",
+       "\t\t reshape.CoordinateFlatten      {'CoordinateFlatten': {'__args': '()', 'coordinate': "'level'", 'skip_missing': 'False'}}
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "

Graph

" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "WB2ERA5_6afe4e3a-07bb-48c3-9b86-b69a478a49c7\n", + "\n", + "weatherbench.WB2ERA5\n", + "\n", + "\n", + "\n", + "Sort_8400dbbd-07c8-4faf-bd17-3adcc91be05f\n", + "\n", + "sort.Sort\n", + "\n", + "\n", + "\n", + "WB2ERA5_6afe4e3a-07bb-48c3-9b86-b69a478a49c7->Sort_8400dbbd-07c8-4faf-bd17-3adcc91be05f\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "StandardLongitude_7e740e0c-9095-44b6-9829-198e8b899653\n", + "\n", + "coordinates.StandardLongitude\n", + "\n", + "\n", + "\n", + "Sort_8400dbbd-07c8-4faf-bd17-3adcc91be05f->StandardLongitude_7e740e0c-9095-44b6-9829-198e8b899653\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "CoordinateFlatten_51e780b3-3b56-490d-984c-9076de407742\n", + "\n", + "reshape.CoordinateFlatten\n", + "\n", + "\n", + "\n", + "StandardLongitude_7e740e0c-9095-44b6-9829-198e8b899653->CoordinateFlatten_51e780b3-3b56-490d-984c-9076de407742\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "pipeline.named[\"prepare\"]" + ] + }, + { + "cell_type": "markdown", + "id": "bf6e89bb-683d-4790-8e29-b7b1c69532e0", + "metadata": {}, + "source": [ + "And even use it without the rest of the pipeline, as it includes a data source." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "221cd83e-3e38-4763-bccd-54a0e185a378", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
<xarray.Dataset> Size: 42kB\n",
+       "Dimensions:                 (latitude: 32, longitude: 64, time: 1)\n",
+       "Coordinates:\n",
+       "  * latitude                (latitude) float64 256B -87.19 -81.56 ... 87.19\n",
+       "  * longitude               (longitude) float64 512B 0.0 5.625 ... 348.8 354.4\n",
+       "  * time                    (time) datetime64[ns] 8B 2021-01-01\n",
+       "Data variables:\n",
+       "    2m_temperature          (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>\n",
+       "    u_component_of_wind850  (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>\n",
+       "    v_component_of_wind850  (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>\n",
+       "    vorticity850            (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>\n",
+       "    geopotential850         (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>\n",
+       "Attributes:\n",
+       "    level-dtype:  int64
" + ], + "text/plain": [ + " Size: 42kB\n", + "Dimensions: (latitude: 32, longitude: 64, time: 1)\n", + "Coordinates:\n", + " * latitude (latitude) float64 256B -87.19 -81.56 ... 87.19\n", + " * longitude (longitude) float64 512B 0.0 5.625 ... 348.8 354.4\n", + " * time (time) datetime64[ns] 8B 2021-01-01\n", + "Data variables:\n", + " 2m_temperature (time, longitude, latitude) float32 8kB dask.array\n", + " u_component_of_wind850 (time, longitude, latitude) float32 8kB dask.array\n", + " v_component_of_wind850 (time, longitude, latitude) float32 8kB dask.array\n", + " vorticity850 (time, longitude, latitude) float32 8kB dask.array\n", + " geopotential850 (time, longitude, latitude) float32 8kB dask.array\n", + "Attributes:\n", + " level-dtype: int64" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pipeline.named[\"prepare\"][\"20210101T00\"]" + ] + }, + { + "cell_type": "markdown", + "id": "1d8275f2-252a-411f-93eb-17541050f726", + "metadata": {}, + "source": [ + "## Pipe operator\n", + "\n", + "The `Pipeline` object also support the `|` operator (logical or) as a way to combine multiple pipelines together.\n", + "This has the same effect as creating a new `Pipeline` object as a combination of 2 pipelines (or a pipeline and a step).\n", + "This has not additional effect and can be used to increase the readability when building a long pipeline.\n", + "\n", + "In the following example, we now create the sub-pipelines and the temporal retrieval step separately, as distinct objects, then combine them using the `|` operator." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "19bcdc11-aeea-4a3d-b702-cad8073ba3f7", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Pipeline\n",
+       "\tDescription                    `pyearthtools.pipeline` Data Pipeline\n",
+       "\n",
+       "\n",
+       "\tInitialisation                 \n",
+       "\t\t exceptions_to_ignore           None\n",
+       "\t\t iterator                       None\n",
+       "\t\t name                           None\n",
+       "\t\t sampler                        None\n",
+       "\tSteps                          \n",
+       "\t\t weatherbench.WB2ERA5           {'WB2ERA5': {'level': '[850]', 'license_ok': 'True', 'resolution': "'64x32'", 'variables': "['2m_temperature', 'u', 'v', 'geopotential', 'vorticity']"}}\n",
+       "\t\t sort.Sort                      {'Sort': {'order': "['2m_temperature', 'u_component_of_wind', 'v_component_of_wind', 'vorticity', 'geopotential']", 'strict': 'False'}}\n",
+       "\t\t coordinates.StandardLongitude  {'StandardLongitude': {'longitude_name': "'longitude'", 'type': "'0-360'"}}\n",
+       "\t\t reshape.CoordinateFlatten      {'CoordinateFlatten': {'__args': '()', 'coordinate': "'level'", 'skip_missing': 'False'}}\n",
+       "\t\t idx_modification.TemporalRetrieval {'TemporalRetrieval': {'concat': 'True', 'delta_unit': 'None', 'merge_function': 'None', 'merge_kwargs': 'None', 'samples': '((0, 1), (6, 1))'}}\n",
+       "\t\t conversion.ToNumpy             {'ToNumpy': {'reference_dataset': 'None', 'run_parallel': 'False', 'saved_records': 'None', 'warn': 'True'}}\n",
+       "\t\t reshape.Rearrange              {'Rearrange': {'rearrange': "'c t h w -> t c h w'", 'rearrange_kwargs': 'None', 'reverse_rearrange': 'None', 'skip': 'False'}}\n",
+       "\t\t reshape.Squeeze                {'Squeeze': {'axis': '0'}}
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "

Graph

" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "WB2ERA5_4c4cb9ed-946c-4fab-919d-4516d83de95b\n", + "\n", + "weatherbench.WB2ERA5\n", + "\n", + "\n", + "\n", + "Sort_9ca1875c-5b54-4492-a7d0-a08e6e722eb1\n", + "\n", + "sort.Sort\n", + "\n", + "\n", + "\n", + "WB2ERA5_4c4cb9ed-946c-4fab-919d-4516d83de95b->Sort_9ca1875c-5b54-4492-a7d0-a08e6e722eb1\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "StandardLongitude_3ad1b188-802f-479f-a601-85eeed77197f\n", + "\n", + "coordinates.StandardLongitude\n", + "\n", + "\n", + "\n", + "Sort_9ca1875c-5b54-4492-a7d0-a08e6e722eb1->StandardLongitude_3ad1b188-802f-479f-a601-85eeed77197f\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "CoordinateFlatten_1cdf46f2-1987-480d-a634-7b2b55c4c458\n", + "\n", + "reshape.CoordinateFlatten\n", + "\n", + "\n", + "\n", + "StandardLongitude_3ad1b188-802f-479f-a601-85eeed77197f->CoordinateFlatten_1cdf46f2-1987-480d-a634-7b2b55c4c458\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "TemporalRetrieval_2ec556f8-4ed5-484d-95e2-7b3e6ed688d6\n", + "\n", + "idx_modification.TemporalRetrieval\n", + "\n", + "\n", + "\n", + "CoordinateFlatten_1cdf46f2-1987-480d-a634-7b2b55c4c458->TemporalRetrieval_2ec556f8-4ed5-484d-95e2-7b3e6ed688d6\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "ToNumpy_7257b0cd-dc9c-4941-aa43-5d02da9409be\n", + "\n", + "conversion.ToNumpy\n", + "\n", + "\n", + "\n", + "TemporalRetrieval_2ec556f8-4ed5-484d-95e2-7b3e6ed688d6->ToNumpy_7257b0cd-dc9c-4941-aa43-5d02da9409be\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "Rearrange_ebc53972-5594-40a9-bb91-02450b403209\n", + "\n", + "reshape.Rearrange\n", + "\n", + "\n", + "\n", + "ToNumpy_7257b0cd-dc9c-4941-aa43-5d02da9409be->Rearrange_ebc53972-5594-40a9-bb91-02450b403209\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "Squeeze_5147cc33-5ac6-4913-82ef-24e818e85f10\n", + "\n", + "reshape.Squeeze\n", + 
"\n", + "\n", + "\n", + "Rearrange_ebc53972-5594-40a9-bb91-02450b403209->Squeeze_5147cc33-5ac6-4913-82ef-24e818e85f10\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "prepare = pyearthtools.pipeline.Pipeline(\n", + " pyearthtools.data.download.weatherbench.WB2ERA5(\n", + " variables=[\"2m_temperature\", \"u\", \"v\", \"geopotential\", \"vorticity\"],\n", + " level=[850],\n", + " license_ok=True,\n", + " ),\n", + " pyearthtools.pipeline.operations.xarray.Sort(\n", + " [\"2m_temperature\", \"u_component_of_wind\", \"v_component_of_wind\", \"vorticity\", \"geopotential\"]\n", + " ),\n", + " pyearthtools.data.transforms.coordinates.StandardLongitude(type=\"0-360\"),\n", + " pyearthtools.pipeline.operations.xarray.reshape.CoordinateFlatten(\"level\"),\n", + " name=\"prepare\"\n", + ")\n", + "\n", + "retrieve = pyearthtools.pipeline.modifications.TemporalRetrieval(concat=True, samples=((0, 1), (6, 1)))\n", + "\n", + "reshape = pyearthtools.pipeline.Pipeline(\n", + " pyearthtools.pipeline.operations.xarray.conversion.ToNumpy(),\n", + " pyearthtools.pipeline.operations.numpy.reshape.Rearrange(\"c t h w -> t c h w\"),\n", + " pyearthtools.pipeline.operations.numpy.reshape.Squeeze(axis=0),\n", + " name=\"reshape\"\n", + ")\n", + "\n", + "pipeline = prepare | retrieve | reshape\n", + "pipeline" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "15781661-20ac-4235-aa75-e05184e9d1b4", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "(array(..., shape=(5, 64, 32), dtype=float32),\n", + " array(..., shape=(5, 64, 32), dtype=float32))" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pipeline[\"20210101T00\"]" + ] + }, + { + "cell_type": "markdown", + "id": "f97ac27d-1e23-404d-b0ff-a265c6a04530", + "metadata": {}, + "source": [ + "## Reversed pipeline\n", + "\n", + "Some 
pipelines can be reversed, i.e. their effect can be undone.\n", + "This is only possible if all steps of a pipeline can be *transposed*, via their `.T` attribute.\n", + "The reverse of a pipeline, if feasible, can be obtained via the `.reversed` attribute.\n", + "\n", + "In the following example, we'll reverse the \"reshape\" sub-pipeline, which is only made of reversible steps." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "0397753a-3fbd-453f-8402-7ff1db67745b", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Pipeline\n",
+       "\tDescription                    `pyearthtools.pipeline` Data Pipeline\n",
+       "\n",
+       "\n",
+       "\tInitialisation                 \n",
+       "\t\t exceptions_to_ignore           None\n",
+       "\t\t iterator                       None\n",
+       "\t\t name                           None\n",
+       "\t\t sampler                        None\n",
+       "\tSteps                          \n",
+       "\t\t conversion.ToNumpy             {'ToNumpy': {'reference_dataset': 'None', 'run_parallel': 'False', 'saved_records': 'None', 'warn': 'True'}}\n",
+       "\t\t reshape.Rearrange              {'Rearrange': {'rearrange': "'c t h w -> t c h w'", 'rearrange_kwargs': 'None', 'reverse_rearrange': 'None', 'skip': 'False'}}\n",
+       "\t\t reshape.Squeeze                {'Squeeze': {'axis': '0'}}
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "

Graph

" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "ToNumpy_347c19cc-3674-464e-b435-4261946bc1ca\n", + "\n", + "conversion.ToNumpy\n", + "\n", + "\n", + "\n", + "Rearrange_b0ef60f9-a425-4d54-ad22-3128393ddf82\n", + "\n", + "reshape.Rearrange\n", + "\n", + "\n", + "\n", + "ToNumpy_347c19cc-3674-464e-b435-4261946bc1ca->Rearrange_b0ef60f9-a425-4d54-ad22-3128393ddf82\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "Squeeze_2c2bf1ee-a439-4215-8ee3-2b71e7809ebe\n", + "\n", + "reshape.Squeeze\n", + "\n", + "\n", + "\n", + "Rearrange_b0ef60f9-a425-4d54-ad22-3128393ddf82->Squeeze_2c2bf1ee-a439-4215-8ee3-2b71e7809ebe\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "pipeline.named[\"reshape\"]" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "1b202180-6456-4989-99a4-fded7115a6c7", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Pipeline\n",
+       "\tDescription                    `pyearthtools.pipeline` Data Pipeline\n",
+       "\n",
+       "\n",
+       "\tInitialisation                 \n",
+       "\t\t exceptions_to_ignore           None\n",
+       "\t\t iterator                       None\n",
+       "\t\t name                           None\n",
+       "\t\t sampler                        None\n",
+       "\tSteps                          \n",
+       "\t\t reshape.Squeeze                {'Squeeze': {'axis': '0'}}\n",
+       "\t\t reshape.Rearrange              {'Rearrange': {'rearrange': "'c t h w -> t c h w'", 'rearrange_kwargs': 'None', 'reverse_rearrange': 'None', 'skip': 'False'}}\n",
+       "\t\t conversion.ToNumpy             {'ToNumpy': {'reference_dataset': 'None', 'run_parallel': 'False', 'saved_records': 'None', 'warn': 'True'}}
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "

Graph

" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "Squeeze_83b70e0f-955f-4fdd-a8f4-5e186332f89c\n", + "\n", + "reshape.Squeeze\n", + "\n", + "\n", + "\n", + "Rearrange_cd3626c6-eeff-4f8e-865f-f650eddf68c3\n", + "\n", + "reshape.Rearrange\n", + "\n", + "\n", + "\n", + "Squeeze_83b70e0f-955f-4fdd-a8f4-5e186332f89c->Rearrange_cd3626c6-eeff-4f8e-865f-f650eddf68c3\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "ToNumpy_71ad1674-b46f-4e91-a4f5-c2be3910ff5b\n", + "\n", + "conversion.ToNumpy\n", + "\n", + "\n", + "\n", + "Rearrange_cd3626c6-eeff-4f8e-865f-f650eddf68c3->ToNumpy_71ad1674-b46f-4e91-a4f5-c2be3910ff5b\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "pipeline.named[\"reshape\"].reversed" + ] + }, + { + "cell_type": "markdown", + "id": "ed456cfb-fcff-43a8-ab82-88310af44741", + "metadata": {}, + "source": [ + "To test it, we'll fetch a sample from the \"prepare\" pipeline, reshape it and apply the reverse." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "0fbda3f4-8f44-4032-a218-115fe21fd664", + "metadata": {}, + "outputs": [ + { + "ename": "RuntimeError", + "evalue": "Data hasn't been converted to arrays with this. 
So data cannot be undone", + "output_type": "error", + "traceback": [ + "\u001b[31m---------------------------------------------------------------------------\u001b[39m", + "\u001b[31mRuntimeError\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[12]\u001b[39m\u001b[32m, line 2\u001b[39m\n\u001b[32m 1\u001b[39m \u001b[38;5;66;03m# FIXME\u001b[39;00m\n\u001b[32m----> \u001b[39m\u001b[32m2\u001b[39m \u001b[43m(\u001b[49m\u001b[43mprepare\u001b[49m\u001b[43m \u001b[49m\u001b[43m|\u001b[49m\u001b[43m \u001b[49m\u001b[43mreshape\u001b[49m\u001b[43m \u001b[49m\u001b[43m|\u001b[49m\u001b[43m \u001b[49m\u001b[43mreshape\u001b[49m\u001b[43m.\u001b[49m\u001b[43mreversed\u001b[49m\u001b[43m)\u001b[49m\u001b[43m[\u001b[49m\u001b[33;43m\"\u001b[39;49m\u001b[33;43m20210101T00\u001b[39;49m\u001b[33;43m\"\u001b[39;49m\u001b[43m]\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/projects/pyearthtools/named_pipelines/packages/pipeline/src/pyearthtools/pipeline/controller.py:572\u001b[39m, in \u001b[36mPipeline.__getitem__\u001b[39m\u001b[34m(self, idx)\u001b[39m\n\u001b[32m 570\u001b[39m sample = step.apply(sample)\n\u001b[32m 571\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m572\u001b[39m sample = \u001b[43mstep\u001b[49m\u001b[43m(\u001b[49m\u001b[43msample\u001b[49m\u001b[43m)\u001b[49m \u001b[38;5;66;03m# type: ignore\u001b[39;00m\n\u001b[32m 574\u001b[39m \u001b[38;5;66;03m# We've done all the pipeline steps, return the value\u001b[39;00m\n\u001b[32m 575\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m sample\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/projects/pyearthtools/named_pipelines/packages/pipeline/src/pyearthtools/pipeline/step.py:168\u001b[39m, in \u001b[36mPipelineStep.__call__\u001b[39m\u001b[34m(self, sample)\u001b[39m\n\u001b[32m 166\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34m__call__\u001b[39m(\u001b[38;5;28mself\u001b[39m, 
sample):\n\u001b[32m 167\u001b[39m \u001b[38;5;28mself\u001b[39m.check_type(sample, func_name=\u001b[33m\"\u001b[39m\u001b[33mrun\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m--> \u001b[39m\u001b[32m168\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_split_tuples_call\u001b[49m\u001b[43m(\u001b[49m\u001b[43msample\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m_function\u001b[49m\u001b[43m=\u001b[49m\u001b[33;43m\"\u001b[39;49m\u001b[33;43mrun\u001b[39;49m\u001b[33;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mallow_parallel\u001b[49m\u001b[43m=\u001b[49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/projects/pyearthtools/named_pipelines/packages/pipeline/src/pyearthtools/pipeline/step.py:122\u001b[39m, in \u001b[36mPipelineStep._split_tuples_call\u001b[39m\u001b[34m(self, sample, _function, override_for_split, allow_parallel, **kwargs)\u001b[39m\n\u001b[32m 117\u001b[39m func = partial(\n\u001b[32m 118\u001b[39m \u001b[38;5;28mself\u001b[39m._split_tuples_call, _function=_function, override_for_split=\u001b[38;5;28mself\u001b[39m.recursively_split_tuples, **kwargs\n\u001b[32m 119\u001b[39m )\n\u001b[32m 120\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mtuple\u001b[39m(parallel_interface.collect(parallel_interface.map(func, sample)))\n\u001b[32m--> \u001b[39m\u001b[32m122\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m parallel_interface.collect(\u001b[43mparallel_interface\u001b[49m\u001b[43m.\u001b[49m\u001b[43msubmit\u001b[49m\u001b[43m(\u001b[49m\u001b[43mfunc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43msample\u001b[49m\u001b[43m)\u001b[49m)\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/projects/pyearthtools/named_pipelines/packages/pipeline/src/pyearthtools/pipeline/parallel.py:128\u001b[39m, in \u001b[36mSerialInterface.submit\u001b[39m\u001b[34m(self, func, *args, 
**kwargs)\u001b[39m\n\u001b[32m 127\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34msubmit\u001b[39m(\u001b[38;5;28mself\u001b[39m, func, *args, **kwargs):\n\u001b[32m--> \u001b[39m\u001b[32m128\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m FutureFaker(\u001b[43mfunc\u001b[49m\u001b[43m(\u001b[49m\u001b[43m*\u001b[49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m*\u001b[49m\u001b[43m*\u001b[49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m)\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/projects/pyearthtools/named_pipelines/packages/pipeline/src/pyearthtools/pipeline/operation.py:100\u001b[39m, in \u001b[36mOperation.run\u001b[39m\u001b[34m(self, sample)\u001b[39m\n\u001b[32m 99\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34mrun\u001b[39m(\u001b[38;5;28mself\u001b[39m, sample):\n\u001b[32m--> \u001b[39m\u001b[32m100\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mapply\u001b[49m\u001b[43m(\u001b[49m\u001b[43msample\u001b[49m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/projects/pyearthtools/named_pipelines/packages/pipeline/src/pyearthtools/pipeline/operation.py:123\u001b[39m, in \u001b[36mOperation.undo\u001b[39m\u001b[34m(self, sample)\u001b[39m\n\u001b[32m 119\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(sample, np.ndarray) \u001b[38;5;129;01mor\u001b[39;00m (\n\u001b[32m 120\u001b[39m \u001b[38;5;28misinstance\u001b[39m(sample, \u001b[38;5;28mtuple\u001b[39m) \u001b[38;5;129;01mand\u001b[39;00m \u001b[38;5;28many\u001b[39m(\u001b[38;5;28mmap\u001b[39m(\u001b[38;5;28;01mlambda\u001b[39;00m x: \u001b[38;5;28misinstance\u001b[39m(x, np.ndarray), sample))\n\u001b[32m 121\u001b[39m ):\n\u001b[32m 122\u001b[39m \u001b[38;5;28;01mwith\u001b[39;00m parallel.disable:\n\u001b[32m--> \u001b[39m\u001b[32m123\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m 
\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_split_tuples_call\u001b[49m\u001b[43m(\u001b[49m\u001b[43msample\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m_function\u001b[49m\u001b[43m=\u001b[49m\u001b[33;43m\"\u001b[39;49m\u001b[33;43mundo_func\u001b[39;49m\u001b[33;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[32m 124\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mself\u001b[39m._split_tuples_call(sample, _function=\u001b[33m\"\u001b[39m\u001b[33mundo_func\u001b[39m\u001b[33m\"\u001b[39m)\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/projects/pyearthtools/named_pipelines/packages/pipeline/src/pyearthtools/pipeline/step.py:122\u001b[39m, in \u001b[36mPipelineStep._split_tuples_call\u001b[39m\u001b[34m(self, sample, _function, override_for_split, allow_parallel, **kwargs)\u001b[39m\n\u001b[32m 117\u001b[39m func = partial(\n\u001b[32m 118\u001b[39m \u001b[38;5;28mself\u001b[39m._split_tuples_call, _function=_function, override_for_split=\u001b[38;5;28mself\u001b[39m.recursively_split_tuples, **kwargs\n\u001b[32m 119\u001b[39m )\n\u001b[32m 120\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mtuple\u001b[39m(parallel_interface.collect(parallel_interface.map(func, sample)))\n\u001b[32m--> \u001b[39m\u001b[32m122\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m parallel_interface.collect(\u001b[43mparallel_interface\u001b[49m\u001b[43m.\u001b[49m\u001b[43msubmit\u001b[49m\u001b[43m(\u001b[49m\u001b[43mfunc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43msample\u001b[49m\u001b[43m)\u001b[49m)\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/projects/pyearthtools/named_pipelines/packages/pipeline/src/pyearthtools/pipeline/parallel.py:128\u001b[39m, in \u001b[36mSerialInterface.submit\u001b[39m\u001b[34m(self, func, *args, **kwargs)\u001b[39m\n\u001b[32m 127\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34msubmit\u001b[39m(\u001b[38;5;28mself\u001b[39m, func, *args, 
**kwargs):\n\u001b[32m--> \u001b[39m\u001b[32m128\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m FutureFaker(\u001b[43mfunc\u001b[49m\u001b[43m(\u001b[49m\u001b[43m*\u001b[49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m*\u001b[49m\u001b[43m*\u001b[49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m)\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/projects/pyearthtools/named_pipelines/packages/pipeline/src/pyearthtools/pipeline/operations/xarray/conversion.py:136\u001b[39m, in \u001b[36mToNumpy.undo_func\u001b[39m\u001b[34m(self, sample)\u001b[39m\n\u001b[32m 135\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34mundo_func\u001b[39m(\u001b[38;5;28mself\u001b[39m, sample: Union[\u001b[38;5;28mtuple\u001b[39m[np.ndarray, ...], np.ndarray]):\n\u001b[32m--> \u001b[39m\u001b[32m136\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_get_converters\u001b[49m\u001b[43m(\u001b[49m\u001b[32;43m1\u001b[39;49m\u001b[43m)\u001b[49m\u001b[43m[\u001b[49m\u001b[32;43m0\u001b[39;49m\u001b[43m]\u001b[49m\u001b[43m.\u001b[49m\u001b[43mconvert_to_xarray\u001b[49m\u001b[43m(\u001b[49m\u001b[43msample\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mpop\u001b[49m\u001b[43m=\u001b[49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m~/projects/pyearthtools/named_pipelines/packages/utils/src/pyearthtools/utils/data/converter.py:398\u001b[39m, in \u001b[36mNumpyConverter.convert_to_xarray\u001b[39m\u001b[34m(self, data, pop)\u001b[39m\n\u001b[32m 378\u001b[39m \u001b[38;5;250m\u001b[39m\u001b[33;03m\"\"\"\u001b[39;00m\n\u001b[32m 379\u001b[39m \u001b[33;03mConvert [array/s][numpy.ndarray] into [Dataset/s][xarray.Dataset] inferring metadata from saved records.\u001b[39;00m\n\u001b[32m 380\u001b[39m \n\u001b[32m (...)\u001b[39m\u001b[32m 395\u001b[39m \u001b[33;03m Rebuilt 
[Dataset/s][xarray.Dataset]\u001b[39;00m\n\u001b[32m 396\u001b[39m \u001b[33;03m\"\"\"\u001b[39;00m\n\u001b[32m 397\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28mself\u001b[39m._records:\n\u001b[32m--> \u001b[39m\u001b[32m398\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mRuntimeError\u001b[39;00m(\u001b[33m\"\u001b[39m\u001b[33mData hasn\u001b[39m\u001b[33m'\u001b[39m\u001b[33mt been converted to arrays with this. So data cannot be undone\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m 400\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(data, (\u001b[38;5;28mtuple\u001b[39m, \u001b[38;5;28mlist\u001b[39m)):\n\u001b[32m 401\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m pop:\n", + "\u001b[31mRuntimeError\u001b[39m: Data hasn't been converted to arrays with this. So data cannot be undone" + ] + } + ], + "source": [ + "# FIXME\n", + "(prepare | reshape | reshape.reversed)[\"20210101T00\"]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d7b8ac7d-6dcd-430c-bb25-2514bfdc54d1", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 4220a434f19ea58799425e3ce9f20aed0fab54f2 Mon Sep 17 00:00:00 2001 From: Maxime RIO Date: Sun, 21 Sep 2025 23:19:21 +1200 Subject: [PATCH 08/16] Revert "store pipeline name in steps" This reverts commit d8ca63fe45c6c8d943595064ea678355c3c1a47b. Save nested pipelines in a dictionary. 
--- notebooks/pipeline/Extras.ipynb | 604 +++++++++--------- .../src/pyearthtools/pipeline/controller.py | 47 +- .../src/pyearthtools/pipeline/step.py | 4 - 3 files changed, 324 insertions(+), 331 deletions(-) diff --git a/notebooks/pipeline/Extras.ipynb b/notebooks/pipeline/Extras.ipynb index fdce7527..ef461fe2 100644 --- a/notebooks/pipeline/Extras.ipynb +++ b/notebooks/pipeline/Extras.ipynb @@ -458,7 +458,7 @@ "\t\t idx_modification.TemporalRetrieval {'TemporalRetrieval': {'concat': 'True', 'delta_unit': 'None', 'merge_function': 'None', 'merge_kwargs': 'None', 'samples': '((0, 1), (6, 1))'}}\n", "\t\t conversion.ToNumpy {'ToNumpy': {'reference_dataset': 'None', 'run_parallel': 'False', 'saved_records': 'None', 'warn': 'True'}}\n", "\t\t reshape.Rearrange {'Rearrange': {'rearrange': "'c t h w -> t c h w'", 'rearrange_kwargs': 'None', 'reverse_rearrange': 'None', 'skip': 'False'}}\n", - "\t\t reshape.Squeeze {'Squeeze': {'axis': '0'}}" + "\t\t reshape.Squeeze {'Squeeze': {'axis': '0'}}" ], "text/plain": [ "" @@ -488,105 +488,105 @@ "\n", "\n", - "\n", + "\n", "\n", - "\n", - "\n", + "\n", + "\n", "\n", - "WB2ERA5_1ad8d626-eae0-4784-bac9-1a894917b79f\n", - "\n", - "weatherbench.WB2ERA5\n", + "WB2ERA5_c8586c2b-310d-408d-b532-53e263be5112\n", + "\n", + "weatherbench.WB2ERA5\n", "\n", - "\n", + "\n", "\n", - "Sort_f77d5219-61cd-4ff5-be96-35b2c59ebb55\n", - "\n", - "sort.Sort\n", + "Sort_b97b8195-eb3d-424d-95d9-934e3b64a223\n", + "\n", + "sort.Sort\n", "\n", - "\n", + "\n", "\n", - "WB2ERA5_1ad8d626-eae0-4784-bac9-1a894917b79f->Sort_f77d5219-61cd-4ff5-be96-35b2c59ebb55\n", - "\n", - "\n", + "WB2ERA5_c8586c2b-310d-408d-b532-53e263be5112->Sort_b97b8195-eb3d-424d-95d9-934e3b64a223\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "StandardLongitude_ba7533b2-5052-46b3-a688-d8c8652f5b80\n", - "\n", - "coordinates.StandardLongitude\n", + "StandardLongitude_e78f1c81-e60f-4d48-a7c9-96b98df95da1\n", + "\n", + "coordinates.StandardLongitude\n", "\n", - "\n", + "\n", "\n", - 
"Sort_f77d5219-61cd-4ff5-be96-35b2c59ebb55->StandardLongitude_ba7533b2-5052-46b3-a688-d8c8652f5b80\n", - "\n", - "\n", + "Sort_b97b8195-eb3d-424d-95d9-934e3b64a223->StandardLongitude_e78f1c81-e60f-4d48-a7c9-96b98df95da1\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "CoordinateFlatten_6226876f-4298-4131-b854-26721df3fd95\n", - "\n", - "reshape.CoordinateFlatten\n", + "CoordinateFlatten_8b97194d-3898-44bf-92f9-fec123abb01f\n", + "\n", + "reshape.CoordinateFlatten\n", "\n", - "\n", + "\n", "\n", - "StandardLongitude_ba7533b2-5052-46b3-a688-d8c8652f5b80->CoordinateFlatten_6226876f-4298-4131-b854-26721df3fd95\n", - "\n", - "\n", + "StandardLongitude_e78f1c81-e60f-4d48-a7c9-96b98df95da1->CoordinateFlatten_8b97194d-3898-44bf-92f9-fec123abb01f\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "TemporalRetrieval_5436c12e-0a24-42d0-a3f7-804670686978\n", - "\n", - "idx_modification.TemporalRetrieval\n", + "TemporalRetrieval_febc1f4b-36f6-464e-9510-53727ea63844\n", + "\n", + "idx_modification.TemporalRetrieval\n", "\n", - "\n", + "\n", "\n", - "CoordinateFlatten_6226876f-4298-4131-b854-26721df3fd95->TemporalRetrieval_5436c12e-0a24-42d0-a3f7-804670686978\n", - "\n", - "\n", + "CoordinateFlatten_8b97194d-3898-44bf-92f9-fec123abb01f->TemporalRetrieval_febc1f4b-36f6-464e-9510-53727ea63844\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "ToNumpy_96533c63-4543-4056-a3bb-49cc0e1d65e4\n", - "\n", - "conversion.ToNumpy\n", + "ToNumpy_008a7250-4bb6-441f-b39b-39c90be0e9d4\n", + "\n", + "conversion.ToNumpy\n", "\n", - "\n", + "\n", "\n", - "TemporalRetrieval_5436c12e-0a24-42d0-a3f7-804670686978->ToNumpy_96533c63-4543-4056-a3bb-49cc0e1d65e4\n", - "\n", - "\n", + "TemporalRetrieval_febc1f4b-36f6-464e-9510-53727ea63844->ToNumpy_008a7250-4bb6-441f-b39b-39c90be0e9d4\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "Rearrange_be57010e-b32d-4397-97bf-72fbafcfa9b8\n", - "\n", - "reshape.Rearrange\n", + "Rearrange_e5bb8792-3916-4133-a5be-b66c2c1153d7\n", + "\n", + "reshape.Rearrange\n", 
"\n", - "\n", + "\n", "\n", - "ToNumpy_96533c63-4543-4056-a3bb-49cc0e1d65e4->Rearrange_be57010e-b32d-4397-97bf-72fbafcfa9b8\n", - "\n", - "\n", + "ToNumpy_008a7250-4bb6-441f-b39b-39c90be0e9d4->Rearrange_e5bb8792-3916-4133-a5be-b66c2c1153d7\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "Squeeze_7546bef8-44a2-4946-8c49-17e10681afd0\n", - "\n", - "reshape.Squeeze\n", + "Squeeze_a26a582c-5abd-4a83-aa9c-9cc280feb47d\n", + "\n", + "reshape.Squeeze\n", "\n", - "\n", + "\n", "\n", - "Rearrange_be57010e-b32d-4397-97bf-72fbafcfa9b8->Squeeze_7546bef8-44a2-4946-8c49-17e10681afd0\n", - "\n", - "\n", + "Rearrange_e5bb8792-3916-4133-a5be-b66c2c1153d7->Squeeze_a26a582c-5abd-4a83-aa9c-9cc280feb47d\n", + "\n", + "\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, "metadata": {}, @@ -1041,7 +1041,7 @@ "\t\t idx_modification.TemporalRetrieval {'TemporalRetrieval': {'concat': 'True', 'delta_unit': 'None', 'merge_function': 'None', 'merge_kwargs': 'None', 'samples': '((0, 1), (6, 1))'}}\n", "\t\t conversion.ToNumpy {'ToNumpy': {'reference_dataset': 'None', 'run_parallel': 'False', 'saved_records': 'None', 'warn': 'True'}}\n", "\t\t reshape.Rearrange {'Rearrange': {'rearrange': "'c t h w -> t c h w'", 'rearrange_kwargs': 'None', 'reverse_rearrange': 'None', 'skip': 'False'}}\n", - "\t\t reshape.Squeeze {'Squeeze': {'axis': '0'}}" + "\t\t reshape.Squeeze {'Squeeze': {'axis': '0'}}" ], "text/plain": [ "" @@ -1071,105 +1071,105 @@ "\n", "\n", - "\n", + "\n", "\n", - "\n", - "\n", + "\n", + "\n", "\n", - "WB2ERA5_12058809-2d16-40b4-991a-939db79c0285\n", - "\n", - "weatherbench.WB2ERA5\n", + "WB2ERA5_337a1927-bb4a-464b-8a0b-07decb24e5cd\n", + "\n", + "weatherbench.WB2ERA5\n", "\n", - "\n", + "\n", "\n", - "Sort_9ab339c0-d2e0-4626-ad0b-bdb57aa07c29\n", - "\n", - "sort.Sort\n", + "Sort_307ab5f0-7519-4fd9-8d54-9b61a7e43230\n", + "\n", + "sort.Sort\n", "\n", - "\n", + "\n", "\n", - "WB2ERA5_12058809-2d16-40b4-991a-939db79c0285->Sort_9ab339c0-d2e0-4626-ad0b-bdb57aa07c29\n", - "\n", - 
"\n", + "WB2ERA5_337a1927-bb4a-464b-8a0b-07decb24e5cd->Sort_307ab5f0-7519-4fd9-8d54-9b61a7e43230\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "StandardLongitude_1159f9e4-f9da-4801-b103-51ded230ae11\n", - "\n", - "coordinates.StandardLongitude\n", + "StandardLongitude_5e11610e-4b3e-46e9-97e0-42918a60f610\n", + "\n", + "coordinates.StandardLongitude\n", "\n", - "\n", + "\n", "\n", - "Sort_9ab339c0-d2e0-4626-ad0b-bdb57aa07c29->StandardLongitude_1159f9e4-f9da-4801-b103-51ded230ae11\n", - "\n", - "\n", + "Sort_307ab5f0-7519-4fd9-8d54-9b61a7e43230->StandardLongitude_5e11610e-4b3e-46e9-97e0-42918a60f610\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "CoordinateFlatten_32a0b446-d668-43c1-93e7-18c1b4e23978\n", - "\n", - "reshape.CoordinateFlatten\n", + "CoordinateFlatten_f8548acd-f863-40a0-a6b1-c47c32d4a436\n", + "\n", + "reshape.CoordinateFlatten\n", "\n", - "\n", + "\n", "\n", - "StandardLongitude_1159f9e4-f9da-4801-b103-51ded230ae11->CoordinateFlatten_32a0b446-d668-43c1-93e7-18c1b4e23978\n", - "\n", - "\n", + "StandardLongitude_5e11610e-4b3e-46e9-97e0-42918a60f610->CoordinateFlatten_f8548acd-f863-40a0-a6b1-c47c32d4a436\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "TemporalRetrieval_e35bf2b1-16b6-4cac-bcbf-11e3feaab0b5\n", - "\n", - "idx_modification.TemporalRetrieval\n", + "TemporalRetrieval_5337416e-49d9-41c7-a3f8-2146b4ed1aa5\n", + "\n", + "idx_modification.TemporalRetrieval\n", "\n", - "\n", + "\n", "\n", - "CoordinateFlatten_32a0b446-d668-43c1-93e7-18c1b4e23978->TemporalRetrieval_e35bf2b1-16b6-4cac-bcbf-11e3feaab0b5\n", - "\n", - "\n", + "CoordinateFlatten_f8548acd-f863-40a0-a6b1-c47c32d4a436->TemporalRetrieval_5337416e-49d9-41c7-a3f8-2146b4ed1aa5\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "ToNumpy_f1a52042-2e34-4799-9cf3-48186c6dbe09\n", - "\n", - "conversion.ToNumpy\n", + "ToNumpy_d1fad22e-fb4f-4492-a124-a23c3664c7fe\n", + "\n", + "conversion.ToNumpy\n", "\n", - "\n", + "\n", "\n", - 
"TemporalRetrieval_e35bf2b1-16b6-4cac-bcbf-11e3feaab0b5->ToNumpy_f1a52042-2e34-4799-9cf3-48186c6dbe09\n", - "\n", - "\n", + "TemporalRetrieval_5337416e-49d9-41c7-a3f8-2146b4ed1aa5->ToNumpy_d1fad22e-fb4f-4492-a124-a23c3664c7fe\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "Rearrange_78c02cd7-3cad-4548-8c37-5e0c4ba0131e\n", - "\n", - "reshape.Rearrange\n", + "Rearrange_b7a1e109-a2aa-461b-a2c4-c063ed7b323c\n", + "\n", + "reshape.Rearrange\n", "\n", - "\n", + "\n", "\n", - "ToNumpy_f1a52042-2e34-4799-9cf3-48186c6dbe09->Rearrange_78c02cd7-3cad-4548-8c37-5e0c4ba0131e\n", - "\n", - "\n", + "ToNumpy_d1fad22e-fb4f-4492-a124-a23c3664c7fe->Rearrange_b7a1e109-a2aa-461b-a2c4-c063ed7b323c\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "Squeeze_99f80f77-14a6-462a-bfce-8aed73af8f44\n", - "\n", - "reshape.Squeeze\n", + "Squeeze_48621fd2-d820-40d7-81ee-5385da537e81\n", + "\n", + "reshape.Squeeze\n", "\n", - "\n", + "\n", "\n", - "Rearrange_78c02cd7-3cad-4548-8c37-5e0c4ba0131e->Squeeze_99f80f77-14a6-462a-bfce-8aed73af8f44\n", - "\n", - "\n", + "Rearrange_b7a1e109-a2aa-461b-a2c4-c063ed7b323c->Squeeze_48621fd2-d820-40d7-81ee-5385da537e81\n", + "\n", + "\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, "metadata": {}, @@ -1638,13 +1638,13 @@ "\tInitialisation \n", "\t\t exceptions_to_ignore None\n", "\t\t iterator None\n", - "\t\t name None\n", + "\t\t name 'prepare'\n", "\t\t sampler None\n", "\tSteps \n", "\t\t weatherbench.WB2ERA5 {'WB2ERA5': {'level': '[850]', 'license_ok': 'True', 'resolution': "'64x32'", 'variables': "['2m_temperature', 'u', 'v', 'geopotential', 'vorticity']"}}\n", "\t\t sort.Sort {'Sort': {'order': "['2m_temperature', 'u_component_of_wind', 'v_component_of_wind', 'vorticity', 'geopotential']", 'strict': 'False'}}\n", "\t\t coordinates.StandardLongitude {'StandardLongitude': {'longitude_name': "'longitude'", 'type': "'0-360'"}}\n", - "\t\t reshape.CoordinateFlatten {'CoordinateFlatten': {'__args': '()', 'coordinate': "'level'", 'skip_missing': 
'False'}}" + "\t\t reshape.CoordinateFlatten {'CoordinateFlatten': {'__args': '()', 'coordinate': "'level'", 'skip_missing': 'False'}}" ], "text/plain": [ "" @@ -1674,57 +1674,57 @@ "\n", "\n", - "\n", + "\n", "\n", - "\n", - "\n", + "\n", + "\n", "\n", - "WB2ERA5_6afe4e3a-07bb-48c3-9b86-b69a478a49c7\n", - "\n", - "weatherbench.WB2ERA5\n", + "WB2ERA5_7741aacd-d885-422f-b093-1546398f1337\n", + "\n", + "weatherbench.WB2ERA5\n", "\n", - "\n", + "\n", "\n", - "Sort_8400dbbd-07c8-4faf-bd17-3adcc91be05f\n", - "\n", - "sort.Sort\n", + "Sort_858cf3cf-fda6-44f4-8b34-37fc52aafb47\n", + "\n", + "sort.Sort\n", "\n", - "\n", + "\n", "\n", - "WB2ERA5_6afe4e3a-07bb-48c3-9b86-b69a478a49c7->Sort_8400dbbd-07c8-4faf-bd17-3adcc91be05f\n", - "\n", - "\n", + "WB2ERA5_7741aacd-d885-422f-b093-1546398f1337->Sort_858cf3cf-fda6-44f4-8b34-37fc52aafb47\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "StandardLongitude_7e740e0c-9095-44b6-9829-198e8b899653\n", - "\n", - "coordinates.StandardLongitude\n", + "StandardLongitude_b6474de2-95ed-4a50-8e1c-3d5d011a9e9c\n", + "\n", + "coordinates.StandardLongitude\n", "\n", - "\n", + "\n", "\n", - "Sort_8400dbbd-07c8-4faf-bd17-3adcc91be05f->StandardLongitude_7e740e0c-9095-44b6-9829-198e8b899653\n", - "\n", - "\n", + "Sort_858cf3cf-fda6-44f4-8b34-37fc52aafb47->StandardLongitude_b6474de2-95ed-4a50-8e1c-3d5d011a9e9c\n", + "\n", + "\n", "\n", - "\n", + "\n", "\n", - "CoordinateFlatten_51e780b3-3b56-490d-984c-9076de407742\n", - "\n", - "reshape.CoordinateFlatten\n", + "CoordinateFlatten_71561a57-ad5d-48f4-926e-8a7146fb936f\n", + "\n", + "reshape.CoordinateFlatten\n", "\n", - "\n", + "\n", "\n", - "StandardLongitude_7e740e0c-9095-44b6-9829-198e8b899653->CoordinateFlatten_51e780b3-3b56-490d-984c-9076de407742\n", - "\n", - "\n", + "StandardLongitude_b6474de2-95ed-4a50-8e1c-3d5d011a9e9c->CoordinateFlatten_71561a57-ad5d-48f4-926e-8a7146fb936f\n", + "\n", + "\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, "metadata": {}, @@ -2208,18 +2208,18 @@ " 
vorticity850 (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>\n", " geopotential850 (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>\n", "Attributes:\n", - " level-dtype: int64