diff --git a/docs/api/pipeline/pipeline_api.md b/docs/api/pipeline/pipeline_api.md index 6de27751..f1ef7d34 100644 --- a/docs/api/pipeline/pipeline_api.md +++ b/docs/api/pipeline/pipeline_api.md @@ -10,6 +10,8 @@ :members: .. autoclass:: pyearthtools.pipeline.Operation :members: +.. autoclass:: pyearthtools.pipeline.ReversedPipeline + :members: .. autoclass:: pyearthtools.pipeline.PipelineException :members: .. autoclass:: pyearthtools.pipeline.PipelineFilterException diff --git a/docs/api/pipeline/pipeline_index.md b/docs/api/pipeline/pipeline_index.md index 5dc215d0..f053df43 100644 --- a/docs/api/pipeline/pipeline_index.md +++ b/docs/api/pipeline/pipeline_index.md @@ -6,9 +6,10 @@ The rest of this page contains reference information for the components of the P | Module | Purpose | API Docs | |----------------------|--------------------------------------|--------------------------------------------------------------------------------------------------------------------| -| `pipeline` | | - [Sampler](pipeline_api.md#pyearthtools.pipeline.Sampler) | +| `pipeline` | | - [Sampler](pipeline_api.md#pyearthtools.pipeline.Sampler) | | | | - [Pipeline](pipeline_api.md#pyearthtools.pipeline.Pipeline) | | | | - [Operation](pipeline_api.md#pyearthtools.pipeline.Operation) | +| | | - [ReversedPipeline](pipeline_api.md#pyearthtools.pipeline.ReversedPipeline) | | | | - [PipelineException](pipeline_api.md#pyearthtools.pipeline.PipelineException) | | | | - [PipelineFilterException](pipeline_api.md#pyearthtools.pipeline.PipelineFilterException) | | | | - [PipelineRuntimeError](pipeline_api.md#pyearthtools.pipeline.PipelineRuntimeError) | diff --git a/notebooks/Gallery.ipynb b/notebooks/Gallery.ipynb index e6b3495d..62b27fed 100644 --- a/notebooks/Gallery.ipynb +++ b/notebooks/Gallery.ipynb @@ -115,7 +115,8 @@ "| Basics | Introduction to what a pipeline is (essential reading) | [Pipeline Basics](./pipeline/Basics.ipynb) | 18 Aug 2025 |\n", "| Operations | Introduction to pipeline 
operations | [Pipeline Operations](./pipeline/Operations.ipynb) | 18 Aug 2025 |\n", "| Modifications | Introduction to pipeline modifications | [Pipeline Modifications](./pipeline/Modifications.ipynb) | 22 Aug 2025 |\n", - "| Branching | -- | [Pipeline Branching](./pipeline/Branching.ipynb) | 18 Aug 2025 |\n" + "| Branching | -- | [Pipeline Branching](./pipeline/Branching.ipynb) | 18 Aug 2025 |\n", + "| Patterns | Recommended design patterns for pipelines | [Additional Pipeline Syntaxes](./pipeline/Patterns.ipynb) | 21 Oct 2025 |\n" ] } ], @@ -135,7 +136,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.13.5" + "version": "3.13.7" } }, "nbformat": 4, diff --git a/notebooks/pipeline/Patterns.ipynb b/notebooks/pipeline/Patterns.ipynb new file mode 100644 index 00000000..c543132b --- /dev/null +++ b/notebooks/pipeline/Patterns.ipynb @@ -0,0 +1,6054 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "63287ab4-74ec-4af2-9d70-5ed866ff1798", + "metadata": {}, + "source": [ + "# Additional Pipeline Syntaxes\n", + "\n", + "This notebook introduces syntaxes to ease the creation and manipulation of `Pipeline` objects:\n", + "\n", + "- named pipelines,\n", + "- combination using the `|` operator,\n", + "- reversing pipelines." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "cbe5ebfb-f820-4b77-ad0f-91d507a5195f", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "\n", + "def repr_ndarray(arr):\n", + " return f\"array(..., shape={arr.shape}, dtype={arr.dtype})\"\n", + "\n", + "np.set_printoptions(override_repr=repr_ndarray)" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "160c063a-de2e-4a28-a22d-d156d4bc06f3", + "metadata": {}, + "outputs": [], + "source": [ + "import pyearthtools.data\n", + "import pyearthtools.pipeline" + ] + }, + { + "cell_type": "markdown", + "id": "964e9683-2999-4a8b-8a1f-407dc24d4ffa", + "metadata": {}, + "source": [ + "To illustrate these features, we'll reuse the same pipeline as the one used in the [End-to-end CNN Training Example](../tutorial/CNN-Model-Training.ipynb).\n", + "Here is the original definition of the pipeline." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "0b273573-99d1-4c8e-834d-7c0183674774", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
Pipeline\n",
+ "\tDescription `pyearthtools.pipeline` Data Pipeline\n",
+ "\n",
+ "\n",
+ "\tInitialisation \n",
+ "\t\t exceptions_to_ignore None\n",
+ "\t\t iterator None\n",
+ "\t\t name None\n",
+ "\t\t sampler None\n",
+ "\tSteps \n",
+ "\t\t weatherbench.WB2ERA5 {'WB2ERA5': {'level': '[850]', 'license_ok': 'True', 'resolution': "'64x32'", 'variables': "['2m_temperature', 'u', 'v', 'geopotential', 'vorticity']"}}\n",
+ "\t\t sort.Sort {'Sort': {'order': "['2m_temperature', 'u_component_of_wind', 'v_component_of_wind', 'vorticity', 'geopotential']", 'strict': 'False'}}\n",
+ "\t\t coordinates.StandardLongitude {'StandardLongitude': {'longitude_name': "'longitude'", 'type': "'0-360'"}}\n",
+ "\t\t reshape.CoordinateFlatten {'CoordinateFlatten': {'coordinate': "['level']", 'skip_missing': 'False'}}\n",
+ "\t\t idx_modification.TemporalRetrieval {'TemporalRetrieval': {'concat': 'True', 'delta_unit': 'None', 'merge_function': 'None', 'merge_kwargs': 'None', 'samples': '((0, 1), (6, 1))'}}\n",
+ "\t\t conversion.ToNumpy {'ToNumpy': {'reference_dataset': 'None', 'run_parallel': 'False', 'saved_records': 'None', 'warn': 'True'}}\n",
+ "\t\t reshape.Rearrange {'Rearrange': {'rearrange': "'c t h w -> t c h w'", 'rearrange_kwargs': 'None', 'reverse_rearrange': 'None', 'skip': 'False'}}\n",
+ "\t\t reshape.Squeeze {'Squeeze': {'axis': '0'}}Pipeline\n",
+ "\tDescription `pyearthtools.pipeline` Data Pipeline\n",
+ "\n",
+ "\n",
+ "\tInitialisation \n",
+ "\t\t exceptions_to_ignore None\n",
+ "\t\t iterator None\n",
+ "\t\t name None\n",
+ "\t\t sampler None\n",
+ "\tSteps \n",
+ "\t\t weatherbench.WB2ERA5 {'WB2ERA5': {'level': '[850]', 'license_ok': 'True', 'resolution': "'64x32'", 'variables': "['2m_temperature', 'u', 'v', 'geopotential', 'vorticity']"}}\n",
+ "\t\t sort.Sort {'Sort': {'order': "['2m_temperature', 'u_component_of_wind', 'v_component_of_wind', 'vorticity', 'geopotential']", 'strict': 'False'}}\n",
+ "\t\t coordinates.StandardLongitude {'StandardLongitude': {'longitude_name': "'longitude'", 'type': "'0-360'"}}\n",
+ "\t\t reshape.CoordinateFlatten {'CoordinateFlatten': {'coordinate': "['level']", 'skip_missing': 'False'}}\n",
+ "\t\t idx_modification.TemporalRetrieval {'TemporalRetrieval': {'concat': 'True', 'delta_unit': 'None', 'merge_function': 'None', 'merge_kwargs': 'None', 'samples': '((0, 1), (6, 1))'}}\n",
+ "\t\t conversion.ToNumpy {'ToNumpy': {'reference_dataset': 'None', 'run_parallel': 'False', 'saved_records': 'None', 'warn': 'True'}}\n",
+ "\t\t reshape.Rearrange {'Rearrange': {'rearrange': "'c t h w -> t c h w'", 'rearrange_kwargs': 'None', 'reverse_rearrange': 'None', 'skip': 'False'}}\n",
+ "\t\t reshape.Squeeze {'Squeeze': {'axis': '0'}}Pipeline\n",
+ "\tDescription `pyearthtools.pipeline` Data Pipeline\n",
+ "\n",
+ "\n",
+ "\tInitialisation \n",
+ "\t\t exceptions_to_ignore None\n",
+ "\t\t iterator None\n",
+ "\t\t name 'prepare'\n",
+ "\t\t sampler None\n",
+ "\tSteps \n",
+ "\t\t weatherbench.WB2ERA5 {'WB2ERA5': {'level': '[850]', 'license_ok': 'True', 'resolution': "'64x32'", 'variables': "['2m_temperature', 'u', 'v', 'geopotential', 'vorticity']"}}\n",
+ "\t\t sort.Sort {'Sort': {'order': "['2m_temperature', 'u_component_of_wind', 'v_component_of_wind', 'vorticity', 'geopotential']", 'strict': 'False'}}\n",
+ "\t\t coordinates.StandardLongitude {'StandardLongitude': {'longitude_name': "'longitude'", 'type': "'0-360'"}}\n",
+ "\t\t reshape.CoordinateFlatten {'CoordinateFlatten': {'coordinate': "['level']", 'skip_missing': 'False'}}<xarray.Dataset> Size: 42kB\n", + "Dimensions: (latitude: 32, longitude: 64, time: 1)\n", + "Coordinates:\n", + " * latitude (latitude) float64 256B -87.19 -81.56 ... 87.19\n", + " * longitude (longitude) float64 512B 0.0 5.625 ... 348.8 354.4\n", + " * time (time) datetime64[ns] 8B 2021-01-01\n", + "Data variables:\n", + " 2m_temperature (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>\n", + " u_component_of_wind850 (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>\n", + " v_component_of_wind850 (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>\n", + " vorticity850 (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>\n", + " geopotential850 (time, longitude, latitude) float32 8kB dask.array<chunksize=(1, 64, 32), meta=np.ndarray>\n", + "Attributes:\n", + " level-dtype: int64
Pipeline\n",
+ "\tDescription `pyearthtools.pipeline` Data Pipeline\n",
+ "\n",
+ "\n",
+ "\tInitialisation \n",
+ "\t\t exceptions_to_ignore None\n",
+ "\t\t iterator None\n",
+ "\t\t name None\n",
+ "\t\t sampler None\n",
+ "\tSteps \n",
+ "\t\t weatherbench.WB2ERA5 {'WB2ERA5': {'level': '[850]', 'license_ok': 'True', 'resolution': "'64x32'", 'variables': "['2m_temperature', 'u', 'v', 'geopotential', 'vorticity']"}}\n",
+ "\t\t sort.Sort {'Sort': {'order': "['2m_temperature', 'u_component_of_wind', 'v_component_of_wind', 'vorticity', 'geopotential']", 'strict': 'False'}}\n",
+ "\t\t coordinates.StandardLongitude {'StandardLongitude': {'longitude_name': "'longitude'", 'type': "'0-360'"}}\n",
+ "\t\t reshape.CoordinateFlatten {'CoordinateFlatten': {'coordinate': "['level']", 'skip_missing': 'False'}}\n",
+ "\t\t idx_modification.TemporalRetrieval {'TemporalRetrieval': {'concat': 'True', 'delta_unit': 'None', 'merge_function': 'None', 'merge_kwargs': 'None', 'samples': '((0, 1), (6, 1))'}}\n",
+ "\t\t conversion.ToNumpy {'ToNumpy': {'reference_dataset': 'None', 'run_parallel': 'False', 'saved_records': 'None', 'warn': 'True'}}\n",
+ "\t\t reshape.Rearrange {'Rearrange': {'rearrange': "'c t h w -> t c h w'", 'rearrange_kwargs': 'None', 'reverse_rearrange': 'None', 'skip': 'False'}}\n",
+ "\t\t reshape.Squeeze {'Squeeze': {'axis': '0'}}Pipeline\n",
+ "\tDescription `pyearthtools.pipeline` Data Pipeline\n",
+ "\n",
+ "\n",
+ "\tInitialisation \n",
+ "\t\t exceptions_to_ignore None\n",
+ "\t\t iterator None\n",
+ "\t\t name 'reshape'\n",
+ "\t\t sampler None\n",
+ "\tSteps \n",
+ "\t\t conversion.ToNumpy {'ToNumpy': {'reference_dataset': 'None', 'run_parallel': 'False', 'saved_records': 'None', 'warn': 'True'}}\n",
+ "\t\t reshape.Rearrange {'Rearrange': {'rearrange': "'c t h w -> t c h w'", 'rearrange_kwargs': 'None', 'reverse_rearrange': 'None', 'skip': 'False'}}\n",
+ "\t\t reshape.Squeeze {'Squeeze': {'axis': '0'}}ReversedPipeline\n",
+ "\tInitialisation Operation reversing the effect of pipeline\n",
+ "\t\t forward_pipeline {'Pipeline': {'__args': "(ToNumpy\\n\\tInitialisation Convert xarray objects to np.ndarray's\\n\\t\\t reference_dataset None\\n\\t\\t run_parallel False\\n\\t\\t saved_records None\\n\\t\\t warn True, Rearrange\\n\\tInitialisation Operation to rearrange data using einops\\n\\t\\t rearrange 'c t h w -> t c h w'\\n\\t\\t rearrange_kwargs None\\n\\t\\t reverse_rearrange None\\n\\t\\t skip False, Squeeze\\n\\tInitialisation Operation to Squeeze one-Dimensional axes at 'axis' location\\n\\t\\t axis 0)", 'exceptions_to_ignore': 'None', 'iterator': 'None', 'name': "'reshape'", 'sampler': 'None'}}<xarray.Dataset> Size: 42kB\n", + "Dimensions: (time: 1, longitude: 64, latitude: 32)\n", + "Coordinates:\n", + " * time (time) datetime64[ns] 8B 2021-01-01\n", + " * longitude (longitude) float64 512B 0.0 5.625 ... 348.8 354.4\n", + " * latitude (latitude) float64 256B -87.19 -81.56 ... 87.19\n", + "Data variables:\n", + " 2m_temperature (time, longitude, latitude) float32 8kB 241.9 ......\n", + " u_component_of_wind850 (time, longitude, latitude) float32 8kB -3.03 ......\n", + " v_component_of_wind850 (time, longitude, latitude) float32 8kB -0.7963 ....\n", + " vorticity850 (time, longitude, latitude) float32 8kB -1.147e-0...\n", + " geopotential850 (time, longitude, latitude) float32 8kB 1.117e+04...\n", + "Attributes:\n", + " level-dtype: int64
Pipeline\n",
+ "\tDescription `pyearthtools.pipeline` Data Pipeline\n",
+ "\n",
+ "\n",
+ "\tInitialisation \n",
+ "\t\t exceptions_to_ignore None\n",
+ "\t\t iterator None\n",
+ "\t\t name None\n",
+ "\t\t sampler None\n",
+ "\tSteps \n",
+ "\t\t weatherbench.WB2ERA5 {'WB2ERA5': {'level': '[850]', 'license_ok': 'True', 'resolution': "'64x32'", 'variables': "['2m_temperature', 'u', 'v', 'geopotential', 'vorticity']"}}\n",
+ "\t\t sort.Sort {'Sort': {'order': "['2m_temperature', 'u_component_of_wind', 'v_component_of_wind', 'vorticity', 'geopotential']", 'strict': 'False'}}\n",
+ "\t\t coordinates.StandardLongitude {'StandardLongitude': {'longitude_name': "'longitude'", 'type': "'0-360'"}}\n",
+ "\t\t reshape.CoordinateFlatten {'CoordinateFlatten': {'coordinate': "['level']", 'skip_missing': 'False'}}\n",
+ "\t\t idx_modification.TemporalRetrieval {'TemporalRetrieval': {'concat': 'True', 'delta_unit': 'None', 'merge_function': 'None', 'merge_kwargs': 'None', 'samples': '((0, 1), (6, 1))'}}\n",
+ "\t\t conversion.ToNumpy {'ToNumpy': {'reference_dataset': 'None', 'run_parallel': 'False', 'saved_records': 'None', 'warn': 'True'}}\n",
+ "\t\t reshape.Rearrange {'Rearrange': {'rearrange': "'c t h w -> t c h w'", 'rearrange_kwargs': 'None', 'reverse_rearrange': 'None', 'skip': 'False'}}\n",
+ "\t\t reshape.Squeeze {'Squeeze': {'axis': '0'}}\n",
+ "\t\t __main__.Persistence {'Persistence': {}}\n",
+ "\t\t controller.ReversedPipeline {'ReversedPipeline': {'forward_pipeline': {'Pipeline': {'__args': '(WB2ERA5\\n\\tDescription WeatherBench2 cloud-optimized ground truth ERA5 dataset\\n\\t\\t link \\'https://github.com/google-research/weatherbench2\\'\\n\\n\\n\\tInitialisation \\n\\t\\t level [850]\\n\\t\\t license_ok True\\n\\t\\t resolution \\'64x32\\'\\n\\t\\t variables [\\'2m_temperature\\', \\'u\\', \\'v\\', \\'geopotential\\', \\'vorticity\\']\\n\\tTransforms \\n\\t\\t StandardCoordinateNames {\\'latitude\\': "[\\'lat\\', \\'Latitude\\', \\'yt_ocean\\', \\'yt\\']", \\'longitude\\': "[\\'lon\\', \\'Longitude\\', \\'xt_ocean\\', \\'xt\\']", \\'replacement_dictionary\\': \\'None\\', \\'time\\': "[\\'Time\\']"}, Sort\\n\\tInitialisation Sort Variables of an `xarray` object\\n\\t\\t order [\\'2m_temperature\\', \\'u_component_of_wind\\', \\'v_component_of_wind\\', \\'vorticity\\', \\'geopotential\\']\\n\\t\\t strict False, StandardLongitude\\n\\tInitialisation Standardise format of longitude.\\n\\t\\t longitude_name \\'longitude\\'\\n\\t\\t type \\'0-360\\', CoordinateFlatten\\n\\tInitialisation Flatten a coordinate in a dataset into separate variables.\\n\\t\\t coordinate [\\'level\\']\\n\\t\\t skip_missing False, TemporalRetrieval\\n\\tInitialisation Retrieve a sequence of samples from `SequenceRetrieval`,\\n\\t\\t concat True\\n\\t\\t delta_unit None\\n\\t\\t merge_function None\\n\\t\\t merge_kwargs None\\n\\t\\t samples ((0, 1), (6, 1)), ToNumpy\\n\\tInitialisation Convert xarray objects to np.ndarray\\'s\\n\\t\\t reference_dataset None\\n\\t\\t run_parallel False\\n\\t\\t saved_records None\\n\\t\\t warn True, Rearrange\\n\\tInitialisation Operation to rearrange data using einops\\n\\t\\t rearrange \\'c t h w -> t c h w\\'\\n\\t\\t rearrange_kwargs None\\n\\t\\t reverse_rearrange None\\n\\t\\t skip False, Squeeze\\n\\tInitialisation Operation to Squeeze one-Dimensional axes at \\'axis\\' location\\n\\t\\t axis 0)', 'exceptions_to_ignore': 'None', 
'iterator': 'None', 'name': 'None', 'sampler': 'None'}}}}<xarray.Dataset> Size: 42kB\n", + "Dimensions: (time: 1, longitude: 64, latitude: 32, level: 1)\n", + "Coordinates:\n", + " * time (time) datetime64[ns] 8B 2021-01-01\n", + " * longitude (longitude) float64 512B 0.0 5.625 ... 348.8 354.4\n", + " * latitude (latitude) float64 256B -87.19 -81.56 ... 81.56 87.19\n", + " * level (level) float64 8B 850.0\n", + "Data variables:\n", + " 2m_temperature (time, longitude, latitude) float32 8kB 241.8 ... 260.0\n", + " u_component_of_wind (time, longitude, latitude) float32 8kB -2.618 ... -...\n", + " v_component_of_wind (time, longitude, latitude) float32 8kB -2.405 ... 1...\n", + " vorticity (time, longitude, latitude) float32 8kB -1.714e-05 ....\n", + " geopotential (time, longitude, latitude) float32 8kB 1.12e+04 ......