From ce04aaf71038bc516f531efd756566c85c13d519 Mon Sep 17 00:00:00 2001 From: simw Date: Tue, 23 Jan 2024 14:34:41 +0000 Subject: [PATCH 1/7] Move optypes into separate file --- src/pipedata/core/chain.py | 82 ++++---------------------------- src/pipedata/core/optypes.py | 92 ++++++++++++++++++++++++++++++++++++ src/pipedata/ops/storage.py | 14 ++++-- 3 files changed, 113 insertions(+), 75 deletions(-) create mode 100644 src/pipedata/core/optypes.py diff --git a/src/pipedata/core/chain.py b/src/pipedata/core/chain.py index a4568d1..cefc7cc 100644 --- a/src/pipedata/core/chain.py +++ b/src/pipedata/core/chain.py @@ -1,7 +1,5 @@ from __future__ import annotations -import functools -import itertools from typing import ( Any, Callable, @@ -17,66 +15,17 @@ overload, ) +from .optypes import ChainLink, batched_op, filter_op, one2one_op + TStart = TypeVar("TStart") TEnd = TypeVar("TEnd") TOther = TypeVar("TOther") -def batched(iterable: Iterator[TEnd], n: Optional[int]) -> Iterator[Tuple[TEnd, ...]]: - """Can be replaced by itertools.batched once using Python 3.12+.""" - while (elements := tuple(itertools.islice(iterable, n))) != (): - yield elements - - def _identity(input_iterator: Iterator[TEnd]) -> Iterator[TEnd]: yield from input_iterator -class CountingIterator(Iterator[TStart]): - def __init__(self, iterator: Iterator[TStart]) -> None: - self._iterator = iterator - self._count = 0 - - def __iter__(self) -> Iterator[TStart]: - return self - - def __next__(self) -> TStart: - self._count += 1 - try: - return next(self._iterator) - except StopIteration as err: - self._count -= 1 - raise StopIteration from err - - def get_count(self) -> int: - return self._count - - -class ChainLink(Generic[TStart, TEnd]): - def __init__( - self, - func: Callable[[Iterator[TStart]], Iterator[TEnd]], - ) -> None: - self._func = func - self._input: Optional[CountingIterator[TStart]] = None - self._output: Optional[CountingIterator[TEnd]] = None - - @property - def __name__(self) -> str: # noqa: A003 - return self._func.__name__ - - def __call__(self, input_iterator: Iterator[TStart]) -> Iterator[TEnd]: - self._input = CountingIterator(input_iterator) - self._output = CountingIterator(self._func(self._input)) - return self._output - - def get_counts(self) -> Tuple[int, int]: - return ( - 0 if self._input is None else self._input.get_count(), - 0 if self._output is None else self._output.get_count(), - ) - - class Chain(Generic[TStart, TEnd]): @overload def __init__( @@ -124,16 +73,16 @@ def flat_map( """ return Chain(self, func) + def then( + self, func: Callable[[Iterator[TEnd]], Iterator[TOther]] + ) -> Chain[TStart, TOther]: + return Chain(self, func) + def filter(self, func: Callable[[TEnd], bool]) -> Chain[TStart, TEnd]: # noqa: A003 """ Remove elements from the stream that do not pass the filter function. """ - - @functools.wraps(func) - def new_action(previous_step: Iterator[TEnd]) -> Iterator[TEnd]: - return filter(func, previous_step) - - return self.flat_map(new_action) + return self.then(filter_op(func)) def map( # noqa: A003 self, func: Callable[[TEnd], TOther] @@ -141,12 +90,7 @@ def map( # noqa: A003 """ Return a single transformed element from each input element. 
""" - - @functools.wraps(func) - def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]: - return map(func, previous_step) - - return self.flat_map(new_action) + return self.then(one2one_op(func)) def batched_map( self, func: Callable[[Tuple[TEnd, ...]], TOther], n: Optional[int] = None @@ -157,13 +101,7 @@ def batched_map( If n is None, then apply the function to all the elements, and return an iterator of 1 element. """ - - @functools.wraps(func) - def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]: - for elements in batched(previous_step, n): - yield func(elements) - - return self.flat_map(new_action) + return self.then(batched_op(func, n)) def get_counts(self) -> List[Dict[str, Any]]: step_counts = [] diff --git a/src/pipedata/core/optypes.py b/src/pipedata/core/optypes.py new file mode 100644 index 0000000..ed8e267 --- /dev/null +++ b/src/pipedata/core/optypes.py @@ -0,0 +1,92 @@ +import functools +import itertools +from typing import ( + Callable, + Generic, + Iterator, + Optional, + Tuple, + TypeVar, +) + +TStart = TypeVar("TStart") +TEnd = TypeVar("TEnd") +TOther = TypeVar("TOther") + + +def _batched(iterable: Iterator[TEnd], n: Optional[int]) -> Iterator[Tuple[TEnd, ...]]: + """Can be replaced by itertools.batched once using Python 3.12+.""" + while (elements := tuple(itertools.islice(iterable, n))) != (): + yield elements + + +class CountingIterator(Iterator[TStart]): + def __init__(self, iterator: Iterator[TStart]) -> None: + self._iterator = iterator + self._count = 0 + + def __iter__(self) -> Iterator[TStart]: + return self + + def __next__(self) -> TStart: + self._count += 1 + try: + return next(self._iterator) + except StopIteration as err: + self._count -= 1 + raise StopIteration from err + + def get_count(self) -> int: + return self._count + + +class ChainLink(Generic[TStart, TEnd]): + def __init__( + self, + func: Callable[[Iterator[TStart]], Iterator[TEnd]], + ) -> None: + self._func = func + self._input: Optional[CountingIterator[TStart]] = None + self._output: Optional[CountingIterator[TEnd]] = None + + @property + def __name__(self) -> str: # noqa: A003 + return self._func.__name__ + + def __call__(self, input_iterator: Iterator[TStart]) -> Iterator[TEnd]: + self._input = CountingIterator(input_iterator) + self._output = CountingIterator(self._func(self._input)) + return self._output + + def get_counts(self) -> Tuple[int, int]: + return ( + 0 if self._input is None else self._input.get_count(), + 0 if self._output is None else self._output.get_count(), + ) + + +class filter_op(ChainLink[TEnd, TEnd]): # noqa: N801 + def __init__(self, func: Callable[[TEnd], bool]): + @functools.wraps(func) + def new_action(previous_step: Iterator[TEnd]) -> Iterator[TEnd]: + return filter(func, previous_step) + + super().__init__(new_action) + + +class one2one_op(ChainLink[TEnd, TOther]): # noqa: N801 + def __init__(self, func: Callable[[TEnd], TOther]): + @functools.wraps(func) + def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]: + return map(func, previous_step) + + super().__init__(new_action) + + +class batched_op(ChainLink[TEnd, TOther]): # noqa: N801 + def __init__(self, func: Callable[[Tuple[TEnd, ...]], TOther], n: Optional[int]): + @functools.wraps(func) + def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]: + return (func(elements) for elements in _batched(previous_step, n)) + + super().__init__(new_action) diff --git a/src/pipedata/ops/storage.py b/src/pipedata/ops/storage.py index cb72304..0a76cfc 100644 --- 
a/src/pipedata/ops/storage.py +++ b/src/pipedata/ops/storage.py @@ -1,14 +1,22 @@ +import itertools import logging -from typing import Any, Callable, Dict, Iterator, Optional +from typing import Any, Callable, Dict, Iterator, Optional, Tuple, TypeVar import pyarrow as pa # type: ignore import pyarrow.parquet as pq # type: ignore -from pipedata.core.chain import batched +T = TypeVar("T") + logger = logging.getLogger(__name__) +def _batched(iterable: Iterator[T], n: Optional[int]) -> Iterator[Tuple[T, ...]]: + """Can be replaced by itertools.batched once using Python 3.12+.""" + while (elements := tuple(itertools.islice(iterable, n))) != (): + yield elements + + def parquet_writer( file_path: str, schema: Optional[pa.Schema] = None, @@ -32,7 +40,7 @@ def parquet_writer_func(records: Iterator[Dict[str, Any]]) -> Iterator[str]: writer = None file_number = 1 file_length = 0 - for batch in batched(records, row_group_length): + for batch in _batched(records, row_group_length): table = pa.Table.from_pylist(batch, schema=schema) if writer is None: formated_file_path = file_path From b4ea119cdc13c68e0ff7532a0db5ff15d55e6391 Mon Sep 17 00:00:00 2001 From: simw Date: Tue, 23 Jan 2024 14:41:42 +0000 Subject: [PATCH 2/7] Rename ChainStart to Chain and StreamStart to Stream --- README.md | 20 ++++++++-------- src/pipedata/core/__init__.py | 8 +++---- src/pipedata/core/chain.py | 24 ++++++++++--------- src/pipedata/core/stream.py | 26 ++++++++++----------- tests/core/test_chain.py | 20 ++++++++-------- tests/core/test_stream.py | 44 +++++++++++++++++------------------ tests/ops/test_files.py | 12 +++++----- tests/ops/test_pipeline.py | 4 ++-- tests/ops/test_records.py | 10 ++++---- tests/ops/test_storage.py | 8 +++---- 10 files changed, 88 insertions(+), 88 deletions(-) diff --git a/README.md b/README.md index 3181218..6a34ef2 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ import zipfile import pyarrow.parquet as pq -from pipedata.core import StreamStart +from pipedata.core import Stream from pipedata.ops import json_records, parquet_writer, zipped_files @@ -46,7 +46,7 @@ with zipfile.ZipFile("test_input.json.zip", "w") as zipped: zipped.writestr("file2.json", json.dumps(data2)) result = ( - StreamStart(["test_input.json.zip"]) + Stream(["test_input.json.zip"]) .flat_map(zipped_files) .flat_map(json_records()) .flat_map(parquet_writer("test_output.parquet")) @@ -63,17 +63,17 @@ Alternatively, you can construct the pipeline as a chain: ```py import pyarrow.parquet as pq -from pipedata.core import ChainStart, StreamStart +from pipedata.core import Chain, Stream from pipedata.ops import json_records, parquet_writer, zipped_files # Running this after input file created in above example chain = ( - ChainStart() + Chain() .flat_map(zipped_files) .flat_map(json_records()) .flat_map(parquet_writer("test_output_2.parquet")) ) -result = StreamStart(["test_input.json.zip"]).flat_map(chain).to_list() +result = Stream(["test_input.json.zip"]).flat_map(chain).to_list() table = pq.read_table("test_output_2.parquet") print(table.to_pydict()) #> {'col1': [1, 2, 3], 'col2': ['Hello', 'world', '!']} @@ -86,11 +86,11 @@ The core framework provides the building blocks for chaining operations. 
Running a stream: ```py -from pipedata.core import StreamStart +from pipedata.core import Stream result = ( - StreamStart(range(10)) + Stream(range(10)) .filter(lambda x: x % 2 == 0) .map(lambda x: x ^ 2) .batched_map(lambda x: x, 2) @@ -103,11 +103,11 @@ print(result) Creating a chain and then using it: ```py import json -from pipedata.core import ChainStart, Stream, StreamStart +from pipedata.core import Chain, Stream, Stream chain = ( - ChainStart() + Chain() .filter(lambda x: x % 2 == 0) .map(lambda x: x ^ 2) .batched_map(lambda x: sum(x), 2) @@ -137,7 +137,7 @@ print(json.dumps(chain.get_counts(), indent=4)) #> "outputs": 3 #> } #> ] -print(StreamStart(range(10)).flat_map(chain).to_list()) +print(Stream(range(10)).flat_map(chain).to_list()) #> [2, 10, 10] ``` diff --git a/src/pipedata/core/__init__.py b/src/pipedata/core/__init__.py index 7b7257f..547752d 100644 --- a/src/pipedata/core/__init__.py +++ b/src/pipedata/core/__init__.py @@ -1,9 +1,9 @@ -from .chain import Chain, ChainStart -from .stream import Stream, StreamStart +from .chain import Chain, ChainType +from .stream import Stream, StreamType __all__ = [ + "ChainType", "Chain", - "ChainStart", + "StreamType", "Stream", - "StreamStart", ] diff --git a/src/pipedata/core/chain.py b/src/pipedata/core/chain.py index cefc7cc..b8881da 100644 --- a/src/pipedata/core/chain.py +++ b/src/pipedata/core/chain.py @@ -26,11 +26,11 @@ def _identity(input_iterator: Iterator[TEnd]) -> Iterator[TEnd]: yield from input_iterator -class Chain(Generic[TStart, TEnd]): +class ChainType(Generic[TStart, TEnd]): @overload def __init__( self, - previous_steps: Chain[TStart, TOther], + previous_steps: ChainType[TStart, TOther], func: Callable[[Iterator[TOther]], Iterator[TEnd]], ): ... @@ -45,7 +45,7 @@ def __init__( def __init__( self, - previous_steps: Optional[Chain[TStart, TOther]], + previous_steps: Optional[ChainType[TStart, TOther]], func: Union[ Callable[[Iterator[TOther]], Iterator[TEnd]], Callable[[Iterator[TStart]], Iterator[TEnd]], @@ -63,7 +63,7 @@ def __call__(self, input_iterator: Iterator[TStart]) -> Iterator[TEnd]: def flat_map( self, func: Callable[[Iterator[TEnd]], Iterator[TOther]] - ) -> Chain[TStart, TOther]: + ) -> ChainType[TStart, TOther]: """ Output zero or more elements from one or more input elements. @@ -71,14 +71,16 @@ def flat_map( stream of elements. It is the most powerful operation, and all the other operations are implemented in terms of it. """ - return Chain(self, func) + return ChainType(self, func) def then( self, func: Callable[[Iterator[TEnd]], Iterator[TOther]] - ) -> Chain[TStart, TOther]: - return Chain(self, func) + ) -> ChainType[TStart, TOther]: + return ChainType(self, func) - def filter(self, func: Callable[[TEnd], bool]) -> Chain[TStart, TEnd]: # noqa: A003 + def filter( # noqa: A003 + self, func: Callable[[TEnd], bool] + ) -> ChainType[TStart, TEnd]: """ Remove elements from the stream that do not pass the filter function. """ @@ -86,7 +88,7 @@ def filter(self, func: Callable[[TEnd], bool]) -> Chain[TStart, TEnd]: # noqa: def map( # noqa: A003 self, func: Callable[[TEnd], TOther] - ) -> Chain[TStart, TOther]: + ) -> ChainType[TStart, TOther]: """ Return a single transformed element from each input element. """ @@ -94,7 +96,7 @@ def map( # noqa: A003 def batched_map( self, func: Callable[[Tuple[TEnd, ...]], TOther], n: Optional[int] = None - ) -> Chain[TStart, TOther]: + ) -> ChainType[TStart, TOther]: """ Return a single transformed element from (up to) n input elements. 
@@ -119,6 +121,6 @@ def get_counts(self) -> List[Dict[str, Any]]: return step_counts -class ChainStart(Chain[TOther, TOther]): +class Chain(ChainType[TOther, TOther]): def __init__(self) -> None: super().__init__(None, _identity) diff --git a/src/pipedata/core/stream.py b/src/pipedata/core/stream.py index dfdd749..2120928 100644 --- a/src/pipedata/core/stream.py +++ b/src/pipedata/core/stream.py @@ -15,15 +15,15 @@ overload, ) -from .chain import Chain, ChainStart +from .chain import Chain, ChainType TStart = TypeVar("TStart") TEnd = TypeVar("TEnd") TNewEnd = TypeVar("TNewEnd") -class Stream(Iterable[TEnd]): - def __init__(self, items: Iterable[TStart], chain: Chain[TStart, TEnd]) -> None: +class StreamType(Iterable[TEnd]): + def __init__(self, items: Iterable[TStart], chain: ChainType[TStart, TEnd]) -> None: self._items = iter(items) self._chain = chain self._iter = self._chain(self._items) @@ -36,19 +36,19 @@ def __next__(self) -> TEnd: def flat_map( self, func: Callable[[Iterator[TEnd]], Iterator[TNewEnd]] - ) -> Stream[TNewEnd]: - return Stream(self._items, self._chain.flat_map(func)) + ) -> StreamType[TNewEnd]: + return StreamType(self._items, self._chain.flat_map(func)) - def filter(self, func: Callable[[TEnd], bool]) -> Stream[TEnd]: # noqa: A003 - return Stream(self._items, self._chain.filter(func)) + def filter(self, func: Callable[[TEnd], bool]) -> StreamType[TEnd]: # noqa: A003 + return StreamType(self._items, self._chain.filter(func)) - def map(self, func: Callable[[TEnd], TNewEnd]) -> Stream[TNewEnd]: # noqa: A003 - return Stream(self._items, self._chain.map(func)) + def map(self, func: Callable[[TEnd], TNewEnd]) -> StreamType[TNewEnd]: # noqa: A003 + return StreamType(self._items, self._chain.map(func)) def batched_map( self, func: Callable[[Tuple[TEnd, ...]], TNewEnd], n: Optional[int] = None - ) -> Stream[TNewEnd]: - return Stream(self._items, self._chain.batched_map(func, n)) + ) -> StreamType[TNewEnd]: + return StreamType(self._items, self._chain.batched_map(func, n)) @overload def reduce(self, func: Callable[[TEnd, TEnd], TEnd]) -> TEnd: @@ -75,6 +75,6 @@ def get_counts(self) -> List[Dict[str, Any]]: return self._chain.get_counts() -class StreamStart(Stream[TEnd]): +class Stream(StreamType[TEnd]): def __init__(self, items: Iterable[TEnd]) -> None: - super().__init__(items, ChainStart[TEnd]()) + super().__init__(items, Chain[TEnd]()) diff --git a/tests/core/test_chain.py b/tests/core/test_chain.py index b87ff8b..71e8e47 100644 --- a/tests/core/test_chain.py +++ b/tests/core/test_chain.py @@ -1,6 +1,6 @@ from typing import Iterator, Tuple -from pipedata.core import Chain, ChainStart +from pipedata.core import Chain, ChainType def test_chain() -> None: @@ -9,7 +9,7 @@ def test_chain() -> None: to check that terminations are explicitly checking for None rather than a false value. 
""" - chain = ChainStart[int]() + chain = Chain[int]() result = list(chain(iter([0, 1, 2, 3]))) assert result == [0, 1, 2, 3] assert chain.get_counts() == [ @@ -32,7 +32,7 @@ def test_chain_with_wrong_types() -> None: def is_even(value: int) -> bool: return value % 2 == 0 - chain = ChainStart[str]().filter(is_even) # type: ignore + chain = Chain[str]().filter(is_even) # type: ignore result = list(chain(iter([1, 2, 3]))) # type: ignore assert result == [2] # type: ignore @@ -41,7 +41,7 @@ def test_chain_filter() -> None: def is_even(value: int) -> bool: return value % 2 == 0 - chain = ChainStart[int]().filter(is_even) + chain = Chain[int]().filter(is_even) result = list(chain(iter([0, 1, 2, 3]))) assert result == [0, 2] @@ -78,7 +78,7 @@ def test_chain_filter_with_none_passing() -> None: def is_even(value: int) -> bool: return value % 2 == 0 - chain = ChainStart[int]().filter(is_even) + chain = Chain[int]().filter(is_even) result = list(chain(iter([1, 3, 5]))) assert result == [] @@ -88,7 +88,7 @@ def add_one(input_iterator: Iterator[int]) -> Iterator[int]: for element in input_iterator: yield element + 1 - chain = ChainStart[int]().flat_map(add_one) + chain = Chain[int]().flat_map(add_one) result = list(chain(iter([0, 1, 2, 3]))) assert result == [1, 2, 3, 4] @@ -109,7 +109,7 @@ def multiply_two(input_iterator: Iterator[int]) -> Iterator[int]: def is_even(value: int) -> bool: return value % 2 == 0 - chain = ChainStart[int]().flat_map(add_one).filter(is_even).flat_map(multiply_two) + chain = Chain[int]().flat_map(add_one).filter(is_even).flat_map(multiply_two) result = list(chain(iter([0, 1, 2, 3]))) assert result == [4, 8] assert chain.get_counts() == [ @@ -140,13 +140,13 @@ def test_chain_map() -> None: def add_one(value: int) -> int: return value + 1 - chain = ChainStart[int]().map(add_one) + chain = Chain[int]().map(add_one) result = list(chain(iter([0, 1, 2, 3]))) assert result == [1, 2, 3, 4] def test_chain_map_changing_types() -> None: - chain: Chain[int, str] = ChainStart[int]().map(str) + chain: ChainType[int, str] = Chain[int]().map(str) result = list(chain(iter([0, 1, 2, 3]))) assert result == ["0", "1", "2", "3"] @@ -155,7 +155,7 @@ def test_chain_batched_map() -> None: def add_values(values: Tuple[int, ...]) -> int: return sum(values) - chain = ChainStart[int]().batched_map(add_values, 2) + chain = Chain[int]().batched_map(add_values, 2) result = list(chain(iter([0, 1, 2, 3, 4]))) assert result == [1, 5, 4] assert chain.get_counts() == [ diff --git a/tests/core/test_stream.py b/tests/core/test_stream.py index 3737cb6..9224c67 100644 --- a/tests/core/test_stream.py +++ b/tests/core/test_stream.py @@ -1,16 +1,16 @@ from itertools import islice from typing import Iterable, Iterator, List -from pipedata.core import ChainStart, StreamStart +from pipedata.core import Chain, Stream def test_stream_to_list() -> None: - result = StreamStart([1, 2, 3]).to_list() + result = Stream([1, 2, 3]).to_list() assert result == [1, 2, 3] def test_stream_on_range() -> None: - stream = StreamStart(range(3)) + stream = Stream(range(3)) result = stream.to_list() assert result == [0, 1, 2] assert stream.get_counts() == [ @@ -19,7 +19,7 @@ def test_stream_on_range() -> None: def test_repeated_stream() -> None: - stream = StreamStart([0, 1, 2, 3]) + stream = Stream([0, 1, 2, 3]) result1 = stream.to_list() assert result1 == [0, 1, 2, 3] @@ -29,11 +29,11 @@ def test_repeated_stream() -> None: def test_stream_is_iterable() -> None: - stream = StreamStart([0, 1, 2, 3]) + stream = Stream([0, 1, 2, 3]) result = 
list(stream) assert result == [0, 1, 2, 3] - stream2 = StreamStart([2, 3, 4]) + stream2 = Stream([2, 3, 4]) result2 = [] for element in stream2: result2.append(element) # noqa: PERF402 @@ -41,19 +41,19 @@ def test_stream_is_iterable() -> None: def test_stream_with_a_chain() -> None: - chain = ChainStart[int]().filter(lambda x: x % 2 == 0) + chain = Chain[int]().filter(lambda x: x % 2 == 0) - result = StreamStart([0, 1, 2, 3]).flat_map(chain).to_list() + result = Stream([0, 1, 2, 3]).flat_map(chain).to_list() assert result == [0, 2] def test_stream_to_list_smaller_length() -> None: - result = StreamStart([0, 1, 2, 3]).to_list(2) + result = Stream([0, 1, 2, 3]).to_list(2) assert result == [0, 1] def test_stream_to_list_longer_length() -> None: - result = StreamStart([0, 1, 2, 3]).to_list(5) + result = Stream([0, 1, 2, 3]).to_list(5) assert result == [0, 1, 2, 3] @@ -61,7 +61,7 @@ def test_stream_filter() -> None: def is_even(value: int) -> bool: return value % 2 == 0 - result = StreamStart([0, 1, 2, 3, 4, 5]).filter(is_even).to_list() + result = Stream([0, 1, 2, 3, 4, 5]).filter(is_even).to_list() assert result == [0, 2, 4] @@ -69,7 +69,7 @@ def test_stream_filter_with_range() -> None: def is_even(value: int) -> bool: return value % 2 == 0 - result = StreamStart(range(10)).filter(is_even).to_list() + result = Stream(range(10)).filter(is_even).to_list() assert result == [0, 2, 4, 6, 8] @@ -77,7 +77,7 @@ def test_stream_filter_with_none_passing() -> None: def is_even(value: int) -> bool: return value % 2 == 0 - result = StreamStart([1, 3, 5]).filter(is_even).to_list() + result = Stream([1, 3, 5]).filter(is_even).to_list() assert result == [] @@ -85,7 +85,7 @@ def test_stream_flat_map_identity() -> None: def identity(input_iterator: Iterator[int]) -> Iterator[int]: yield from input_iterator - result = StreamStart([0, 1, 2, 3]).flat_map(identity).to_list() + result = Stream([0, 1, 2, 3]).flat_map(identity).to_list() assert result == [0, 1, 2, 3] @@ -98,9 +98,7 @@ def multiply_two(input_iterator: Iterator[int]) -> Iterator[int]: for element in input_iterator: yield element * 2 - result = ( - StreamStart([0, 1, 2, 3]).flat_map(add_one).flat_map(multiply_two).to_list() - ) + result = Stream([0, 1, 2, 3]).flat_map(add_one).flat_map(multiply_two).to_list() assert result == [2, 4, 6, 8] @@ -110,7 +108,7 @@ def add_element(input_iterator: Iterator[int]) -> Iterator[int]: yield element yield element + 1 - result = StreamStart([0, 1, 2, 3]).flat_map(add_element).to_list() + result = Stream([0, 1, 2, 3]).flat_map(add_element).to_list() assert result == [0, 1, 1, 2, 2, 3, 3, 4] @@ -119,7 +117,7 @@ def add_two_values(input_iterator: Iterator[int]) -> Iterator[int]: while batch := tuple(islice(input_iterator, 2)): yield sum(batch) - result = StreamStart([0, 1, 2, 3, 4]).flat_map(add_two_values).to_list() + result = Stream([0, 1, 2, 3, 4]).flat_map(add_two_values).to_list() assert result == [1, 5, 4] @@ -127,7 +125,7 @@ def test_stream_map() -> None: def add_one(value: int) -> int: return value + 1 - result = StreamStart([0, 1, 2, 3]).map(add_one).to_list() + result = Stream([0, 1, 2, 3]).map(add_one).to_list() assert result == [1, 2, 3, 4] @@ -135,7 +133,7 @@ def test_stream_reduce_adding() -> None: def add_values(a: int, b: int) -> int: return a + b - result = StreamStart([0, 1, 2, 3]).reduce(add_values) + result = Stream([0, 1, 2, 3]).reduce(add_values) expected = 6 assert result == expected @@ -146,7 +144,7 @@ def append_values(a: List[int], b: int) -> List[int]: return a initializer: List[int] = [] - 
result = StreamStart([0, 1, 2, 3]).reduce(append_values, initializer) + result = Stream([0, 1, 2, 3]).reduce(append_values, initializer) assert result == [0, 1, 2, 3] @@ -154,5 +152,5 @@ def test_stream_batched_map() -> None: def add_values(values: Iterable[int]) -> int: return sum(values) - result = StreamStart([0, 1, 2, 3, 4]).batched_map(add_values, 2).to_list() + result = Stream([0, 1, 2, 3, 4]).batched_map(add_values, 2).to_list() assert result == [1, 5, 4] diff --git a/tests/ops/test_files.py b/tests/ops/test_files.py index c9acd20..8e7e87f 100644 --- a/tests/ops/test_files.py +++ b/tests/ops/test_files.py @@ -5,7 +5,7 @@ import pyarrow as pa # type: ignore import pytest -from pipedata.core import StreamStart +from pipedata.core import Stream from pipedata.ops.files import FilesReaderError, read_from_parquet, zipped_files @@ -18,7 +18,7 @@ def test_zipped_files() -> None: zip_file.writestr("test2.txt", "Hello, world 2!") zip_file.writestr("test3.txt", "Hello, world 3!") - result = StreamStart([str(zip_path)]).flat_map(zipped_files).to_list() + result = Stream([str(zip_path)]).flat_map(zipped_files).to_list() assert result[0].name == "test.txt" assert result[1].name == "test2.txt" @@ -35,7 +35,7 @@ def test_zipped_file_contents() -> None: zip_file.writestr("test3.txt", "Hello, world 3!") result = ( - StreamStart([str(zip_path)]) + Stream([str(zip_path)]) .flat_map(zipped_files) .map(lambda x: x.contents.read().decode("utf-8")) .to_list() @@ -62,7 +62,7 @@ def test_parquet_reading_simple() -> None: pa.parquet.write_table(table, parquet_path) parquet_reader = read_from_parquet() - result = StreamStart([str(parquet_path)]).flat_map(parquet_reader).to_list() + result = Stream([str(parquet_path)]).flat_map(parquet_reader).to_list() expected = [ {"a": 1, "b": 4}, @@ -85,7 +85,7 @@ def test_parquet_reading_with_columns() -> None: pa.parquet.write_table(table, parquet_path) parquet_reader = read_from_parquet(columns=["a"]) - result = StreamStart([str(parquet_path)]).flat_map(parquet_reader).to_list() + result = Stream([str(parquet_path)]).flat_map(parquet_reader).to_list() expected = [ {"a": 1}, @@ -108,7 +108,7 @@ def test_parquet_reading_record_batch() -> None: pa.parquet.write_table(table, parquet_path) parquet_reader = read_from_parquet(columns=["a"], return_as="recordbatch") - result = StreamStart([str(parquet_path)]).flat_map(parquet_reader).to_list() + result = Stream([str(parquet_path)]).flat_map(parquet_reader).to_list() schema = pa.schema([("a", pa.int64())]) a_array = pa.array([1, 2, 3]) diff --git a/tests/ops/test_pipeline.py b/tests/ops/test_pipeline.py index 031e607..ccb90b9 100644 --- a/tests/ops/test_pipeline.py +++ b/tests/ops/test_pipeline.py @@ -5,7 +5,7 @@ import pyarrow.parquet as pq # type: ignore -from pipedata.core import StreamStart +from pipedata.core import Stream from pipedata.ops import json_records, parquet_writer, zipped_files @@ -33,7 +33,7 @@ def test_zipped_files() -> None: zip_file.writestr("test3.txt", json.dumps(data3)) result = ( - StreamStart([str(zip_path)]) + Stream([str(zip_path)]) .flat_map(zipped_files) .map(lambda x: x.contents) .flat_map(json_records()) diff --git a/tests/ops/test_records.py b/tests/ops/test_records.py index ee392ca..4376bbd 100644 --- a/tests/ops/test_records.py +++ b/tests/ops/test_records.py @@ -1,7 +1,7 @@ import io import json -from pipedata.core import StreamStart +from pipedata.core import Stream from pipedata.ops.files import OpenedFileRef from pipedata.ops.records import csv_records, json_records @@ -13,7 +13,7 @@ def 
test_json_records() -> None: file1 = io.BytesIO(json.dumps(json1).encode("utf-8")) file2 = io.BytesIO(json.dumps(json2).encode("utf-8")) - result = StreamStart([file1, file2]).flat_map(json_records()).to_list() + result = Stream([file1, file2]).flat_map(json_records()).to_list() expected = json1 + json2 assert result == expected @@ -30,7 +30,7 @@ def test_json_records_from_file_ref() -> None: OpenedFileRef(name="test2.json", contents=file2), ] - result = StreamStart(file_refs).flat_map(json_records()).to_list() + result = Stream(file_refs).flat_map(json_records()).to_list() expected = json1 + json2 assert result == expected @@ -42,7 +42,7 @@ def test_csv_records() -> None: file1 = io.BytesIO(csv1.encode("utf-8")) file2 = io.BytesIO(csv2.encode("utf-8")) - result = StreamStart([file1, file2]).flat_map(csv_records()).to_list() + result = Stream([file1, file2]).flat_map(csv_records()).to_list() expected = [ {"a": "1", "b": "2"}, {"a": "3", "b": "4"}, @@ -64,7 +64,7 @@ def test_csv_records_from_file_ref() -> None: OpenedFileRef(name="test2.csv", contents=file2), ] - result = StreamStart(file_refs).flat_map(csv_records()).to_list() + result = Stream(file_refs).flat_map(csv_records()).to_list() expected = [ {"a": "1", "b": "2"}, {"a": "3", "b": "4"}, diff --git a/tests/ops/test_storage.py b/tests/ops/test_storage.py index 7d05752..08d4176 100644 --- a/tests/ops/test_storage.py +++ b/tests/ops/test_storage.py @@ -4,7 +4,7 @@ import pyarrow.parquet as pq # type: ignore import pytest -from pipedata.core import StreamStart +from pipedata.core import Stream from pipedata.ops.storage import parquet_writer @@ -20,7 +20,7 @@ def test_parquet_simple_storage() -> None: temp_path = Path(tmpdir) output_path = temp_path / "test.parquet" - result = StreamStart(items).flat_map(parquet_writer(str(output_path))).to_list() + result = Stream(items).flat_map(parquet_writer(str(output_path))).to_list() assert result == [str(output_path)] @@ -47,7 +47,7 @@ def test_parquet_batched_storage() -> None: output_path = temp_path / "test.parquet" result = ( - StreamStart(items) + Stream(items) .flat_map(parquet_writer(str(output_path), row_group_length=2)) .to_list() ) @@ -77,7 +77,7 @@ def test_parquet_multiple_files() -> None: output_path = temp_path / "test_{i:04d}.parquet" result = ( - StreamStart(items) + Stream(items) .flat_map(parquet_writer(str(output_path), max_file_length=2)) .to_list() ) From 8a7e89c90d663f8741be10dca63bff3eedb65bdd Mon Sep 17 00:00:00 2001 From: simw Date: Tue, 23 Jan 2024 15:04:06 +0000 Subject: [PATCH 3/7] Add .then and pipe operations, as aliases to flat_map --- src/pipedata/core/chain.py | 7 +++- src/pipedata/core/stream.py | 12 +++++- tests/core/test_chain.py | 73 ++++++++++++++++++++++++++++++++++++- tests/core/test_stream.py | 18 ++++++++- 4 files changed, 104 insertions(+), 6 deletions(-) diff --git a/src/pipedata/core/chain.py b/src/pipedata/core/chain.py index b8881da..9466232 100644 --- a/src/pipedata/core/chain.py +++ b/src/pipedata/core/chain.py @@ -71,13 +71,18 @@ def flat_map( stream of elements. It is the most powerful operation, and all the other operations are implemented in terms of it. 
""" - return ChainType(self, func) + return self.then(func) def then( self, func: Callable[[Iterator[TEnd]], Iterator[TOther]] ) -> ChainType[TStart, TOther]: return ChainType(self, func) + def __or__( + self, func: Callable[[Iterator[TEnd]], Iterator[TOther]] + ) -> ChainType[TStart, TOther]: + return self.then(func) + def filter( # noqa: A003 self, func: Callable[[TEnd], bool] ) -> ChainType[TStart, TEnd]: diff --git a/src/pipedata/core/stream.py b/src/pipedata/core/stream.py index 2120928..3710dbb 100644 --- a/src/pipedata/core/stream.py +++ b/src/pipedata/core/stream.py @@ -37,7 +37,17 @@ def __next__(self) -> TEnd: def flat_map( self, func: Callable[[Iterator[TEnd]], Iterator[TNewEnd]] ) -> StreamType[TNewEnd]: - return StreamType(self._items, self._chain.flat_map(func)) + return StreamType(self._items, self._chain.then(func)) + + def then( + self, func: Callable[[Iterator[TEnd]], Iterator[TNewEnd]] + ) -> StreamType[TNewEnd]: + return StreamType(self._items, self._chain.then(func)) + + def __or__( + self, func: Callable[[Iterator[TEnd]], Iterator[TNewEnd]] + ) -> StreamType[TNewEnd]: + return StreamType(self._items, self._chain.then(func)) def filter(self, func: Callable[[TEnd], bool]) -> StreamType[TEnd]: # noqa: A003 return StreamType(self._items, self._chain.filter(func)) diff --git a/tests/core/test_chain.py b/tests/core/test_chain.py index 71e8e47..60505e0 100644 --- a/tests/core/test_chain.py +++ b/tests/core/test_chain.py @@ -97,7 +97,76 @@ def add_one(input_iterator: Iterator[int]) -> Iterator[int]: assert result2 == [3, 4, 5] -def test_chain_multiple_operations() -> None: +def test_chain_then() -> None: + def add_one(input_iterator: Iterator[int]) -> Iterator[int]: + for element in input_iterator: + yield element + 1 + + chain = Chain[int]().then(add_one) + + result = list(chain(iter([0, 1, 2, 3]))) + assert result == [1, 2, 3, 4] + + result2 = list(chain(iter([2, 3, 4]))) + assert result2 == [3, 4, 5] + + +def test_chain_pipe() -> None: + def add_one(input_iterator: Iterator[int]) -> Iterator[int]: + for element in input_iterator: + yield element + 1 + + chain = Chain[int]() | add_one + + result = list(chain(iter([0, 1, 2, 3]))) + assert result == [1, 2, 3, 4] + + result2 = list(chain(iter([2, 3, 4]))) + assert result2 == [3, 4, 5] + + +def test_chain_piping_multiple_operations() -> None: # noqa: C901 + def add_one(input_iterator: Iterator[int]) -> Iterator[int]: + for element in input_iterator: + yield element + 1 + + def multiply_two(input_iterator: Iterator[int]) -> Iterator[int]: + for element in input_iterator: + yield element * 2 + + def is_even(input_iterator: Iterator[int]) -> Iterator[int]: + for element in input_iterator: + if element % 2 == 0: + yield element + + chain = Chain[int]() | add_one | is_even | multiply_two + result = list(chain(iter([0, 1, 2, 3]))) + assert result == [4, 8] + assert chain.get_counts() == [ + { + "name": "_identity", + "inputs": 4, + "outputs": 4, + }, + { + "name": "add_one", + "inputs": 4, + "outputs": 4, + }, + { + "name": "is_even", + "inputs": 4, + "outputs": 2, + }, + { + "name": "multiply_two", + "inputs": 2, + "outputs": 2, + }, + ] + + +def test_chain_multiple_operations() -> None: # noqa: C901 def add_one(input_iterator: Iterator[int]) -> Iterator[int]: for element in input_iterator: yield element + 1 @@ -109,7 +178,7 @@ def multiply_two(input_iterator: Iterator[int]) -> Iterator[int]: def is_even(value: int) -> bool: return value % 2 == 0 - chain = Chain[int]().flat_map(add_one).filter(is_even).flat_map(multiply_two) + chain 
= Chain[int]().then(add_one).filter(is_even).then(multiply_two) result = list(chain(iter([0, 1, 2, 3]))) assert result == [4, 8] assert chain.get_counts() == [ diff --git a/tests/core/test_stream.py b/tests/core/test_stream.py index 9224c67..d0d5544 100644 --- a/tests/core/test_stream.py +++ b/tests/core/test_stream.py @@ -89,7 +89,7 @@ def identity(input_iterator: Iterator[int]) -> Iterator[int]: assert result == [0, 1, 2, 3] -def test_stream_flat_map_chain() -> None: +def test_stream_then_chain() -> None: def add_one(input_iterator: Iterator[int]) -> Iterator[int]: for element in input_iterator: yield element + 1 @@ -98,7 +98,21 @@ def multiply_two(input_iterator: Iterator[int]) -> Iterator[int]: for element in input_iterator: yield element * 2 - result = Stream([0, 1, 2, 3]).flat_map(add_one).flat_map(multiply_two).to_list() + result = Stream([0, 1, 2, 3]).then(add_one).then(multiply_two).to_list() + assert result == [2, 4, 6, 8] + + +def test_stream_pipe_chain() -> None: + def add_one(input_iterator: Iterator[int]) -> Iterator[int]: + for element in input_iterator: + yield element + 1 + + def multiply_two(input_iterator: Iterator[int]) -> Iterator[int]: + for element in input_iterator: + yield element * 2 + + stream = Stream([0, 1, 2, 3]) | add_one | multiply_two + result = stream.to_list() assert result == [2, 4, 6, 8] From 8ee8f74bbb56aa9222085a8d4e77aea4cf624cb7 Mon Sep 17 00:00:00 2001 From: simw Date: Tue, 23 Jan 2024 15:04:42 +0000 Subject: [PATCH 4/7] Bump version to 0.3 --- pyproject.toml | 2 +- src/pipedata/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4b7b952..02be904 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pipedata" -version = "0.2.2" +version = "0.3" description = "Framework for building pipelines for data processing" authors = ["Simon Wicks "] readme = "README.md" diff --git a/src/pipedata/__init__.py b/src/pipedata/__init__.py index 0b8bbe2..d412d09 100644 --- a/src/pipedata/__init__.py +++ b/src/pipedata/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.2.2" +__version__ = "0.3" __all__ = [ "__version__", From be3b11ec0289eed0035ac7dc430a8360ad35a352 Mon Sep 17 00:00:00 2001 From: simw Date: Tue, 23 Jan 2024 15:41:00 +0000 Subject: [PATCH 5/7] Remove functions in chain, move to a more functional approach of modifying the functions --- src/pipedata/core/chain.py | 42 +------------------ src/pipedata/core/{optypes.py => links.py} | 35 ---------------- src/pipedata/core/ops.py | 48 +++++++++++++++++++++ src/pipedata/core/stream.py | 7 ++-- tests/core/test_chain.py | 49 ++++++++-------------- tests/core/test_stream.py | 6 ++- 6 files changed, 75 insertions(+), 112 deletions(-) rename src/pipedata/core/{optypes.py => links.py} (54%) create mode 100644 src/pipedata/core/ops.py diff --git a/src/pipedata/core/chain.py b/src/pipedata/core/chain.py index 9466232..3414b74 100644 --- a/src/pipedata/core/chain.py +++ b/src/pipedata/core/chain.py @@ -8,14 +8,13 @@ Iterator, List, Optional, - Tuple, TypeVar, Union, cast, overload, ) -from .optypes import ChainLink, batched_op, filter_op, one2one_op +from .links import ChainLink TStart = TypeVar("TStart") TEnd = TypeVar("TEnd") @@ -61,18 +60,6 @@ def __call__(self, input_iterator: Iterator[TStart]) -> Iterator[TEnd]: return self._func(self._previous_steps(input_iterator)) # type: ignore - def flat_map( - self, func: Callable[[Iterator[TEnd]], Iterator[TOther]] - ) -> 
ChainType[TStart, TOther]: - """ - Output zero or more elements from one or more input elements. - - This is a fully general operation, that can arbitrarily transform the - stream of elements. It is the most powerful operation, and all the - other operations are implemented in terms of it. - """ - return self.then(func) - def then( self, func: Callable[[Iterator[TEnd]], Iterator[TOther]] ) -> ChainType[TStart, TOther]: @@ -83,33 +70,6 @@ def __or__( ) -> ChainType[TStart, TOther]: return self.then(func) - def filter( # noqa: A003 - self, func: Callable[[TEnd], bool] - ) -> ChainType[TStart, TEnd]: - """ - Remove elements from the stream that do not pass the filter function. - """ - return self.then(filter_op(func)) - - def map( # noqa: A003 - self, func: Callable[[TEnd], TOther] - ) -> ChainType[TStart, TOther]: - """ - Return a single transformed element from each input element. - """ - return self.then(one2one_op(func)) - - def batched_map( - self, func: Callable[[Tuple[TEnd, ...]], TOther], n: Optional[int] = None - ) -> ChainType[TStart, TOther]: - """ - Return a single transformed element from (up to) n input elements. - - If n is None, then apply the function to all the elements, and return - an iterator of 1 element. - """ - return self.then(batched_op(func, n)) - def get_counts(self) -> List[Dict[str, Any]]: step_counts = [] if self._previous_steps is not None: diff --git a/src/pipedata/core/optypes.py b/src/pipedata/core/links.py similarity index 54% rename from src/pipedata/core/optypes.py rename to src/pipedata/core/links.py index ed8e267..2fdf886 100644 --- a/src/pipedata/core/optypes.py +++ b/src/pipedata/core/links.py @@ -1,5 +1,3 @@ -import functools -import itertools from typing import ( Callable, Generic, @@ -14,12 +12,6 @@ TOther = TypeVar("TOther") -def _batched(iterable: Iterator[TEnd], n: Optional[int]) -> Iterator[Tuple[TEnd, ...]]: - """Can be replaced by itertools.batched once using Python 3.12+.""" - while (elements := tuple(itertools.islice(iterable, n))) != (): - yield elements - - class CountingIterator(Iterator[TStart]): def __init__(self, iterator: Iterator[TStart]) -> None: self._iterator = iterator @@ -63,30 +55,3 @@ def get_counts(self) -> Tuple[int, int]: 0 if self._input is None else self._input.get_count(), 0 if self._output is None else self._output.get_count(), ) - - -class filter_op(ChainLink[TEnd, TEnd]): # noqa: N801 - def __init__(self, func: Callable[[TEnd], bool]): - @functools.wraps(func) - def new_action(previous_step: Iterator[TEnd]) -> Iterator[TEnd]: - return filter(func, previous_step) - - super().__init__(new_action) - - -class one2one_op(ChainLink[TEnd, TOther]): # noqa: N801 - def __init__(self, func: Callable[[TEnd], TOther]): - @functools.wraps(func) - def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]: - return map(func, previous_step) - - super().__init__(new_action) - - -class batched_op(ChainLink[TEnd, TOther]): # noqa: N801 - def __init__(self, func: Callable[[Tuple[TEnd, ...]], TOther], n: Optional[int]): - @functools.wraps(func) - def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]: - return (func(elements) for elements in _batched(previous_step, n)) - - super().__init__(new_action) diff --git a/src/pipedata/core/ops.py b/src/pipedata/core/ops.py new file mode 100644 index 0000000..5b23c44 --- /dev/null +++ b/src/pipedata/core/ops.py @@ -0,0 +1,48 @@ +import functools +import itertools +from typing import ( + Callable, + Iterator, + Optional, + Tuple, + TypeVar, +) + +from .links import ChainLink + 
+TStart = TypeVar("TStart") +TEnd = TypeVar("TEnd") +TOther = TypeVar("TOther") + + +def _batched(iterable: Iterator[TEnd], n: Optional[int]) -> Iterator[Tuple[TEnd, ...]]: + """Can be replaced by itertools.batched once using Python 3.12+.""" + while (elements := tuple(itertools.islice(iterable, n))) != (): + yield elements + + +class filtering(ChainLink[TEnd, TEnd]): # noqa: N801 + def __init__(self, func: Callable[[TEnd], bool]): + @functools.wraps(func) + def new_action(previous_step: Iterator[TEnd]) -> Iterator[TEnd]: + return filter(func, previous_step) + + super().__init__(new_action) + + +class mapping(ChainLink[TEnd, TOther]): # noqa: N801 + def __init__(self, func: Callable[[TEnd], TOther]): + @functools.wraps(func) + def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]: + return map(func, previous_step) + + super().__init__(new_action) + + +class batching(ChainLink[TEnd, TOther]): # noqa: N801 + def __init__(self, func: Callable[[Tuple[TEnd, ...]], TOther], n: Optional[int]): + @functools.wraps(func) + def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]: + return (func(elements) for elements in _batched(previous_step, n)) + + super().__init__(new_action) diff --git a/src/pipedata/core/stream.py b/src/pipedata/core/stream.py index 3710dbb..4031e7d 100644 --- a/src/pipedata/core/stream.py +++ b/src/pipedata/core/stream.py @@ -15,6 +15,7 @@ overload, ) +from . import ops from .chain import Chain, ChainType TStart = TypeVar("TStart") @@ -50,15 +51,15 @@ def __or__( return StreamType(self._items, self._chain.then(func)) def filter(self, func: Callable[[TEnd], bool]) -> StreamType[TEnd]: # noqa: A003 - return StreamType(self._items, self._chain.filter(func)) + return self.then(ops.filtering(func)) def map(self, func: Callable[[TEnd], TNewEnd]) -> StreamType[TNewEnd]: # noqa: A003 - return StreamType(self._items, self._chain.map(func)) + return self.then(ops.mapping(func)) def batched_map( self, func: Callable[[Tuple[TEnd, ...]], TNewEnd], n: Optional[int] = None ) -> StreamType[TNewEnd]: - return StreamType(self._items, self._chain.batched_map(func, n)) + return self.then(ops.batching(func, n)) @overload def reduce(self, func: Callable[[TEnd, TEnd], TEnd]) -> TEnd: diff --git a/tests/core/test_chain.py b/tests/core/test_chain.py index 60505e0..3d452d1 100644 --- a/tests/core/test_chain.py +++ b/tests/core/test_chain.py @@ -1,6 +1,6 @@ from typing import Iterator, Tuple -from pipedata.core import Chain, ChainType +from pipedata.core import Chain, ChainType, ops def test_chain() -> None: @@ -32,16 +32,17 @@ def test_chain_with_wrong_types() -> None: def is_even(value: int) -> bool: return value % 2 == 0 - chain = Chain[str]().filter(is_even) # type: ignore + chain = Chain[str]().then(ops.filtering(is_even)) # type: ignore result = list(chain(iter([1, 2, 3]))) # type: ignore - assert result == [2] # type: ignore + assert result == [2] def test_chain_filter() -> None: + @ops.filtering def is_even(value: int) -> bool: return value % 2 == 0 - chain = Chain[int]().filter(is_even) + chain = Chain[int]().then(is_even) result = list(chain(iter([0, 1, 2, 3]))) assert result == [0, 2] @@ -75,28 +76,15 @@ def is_even(value: int) -> bool: def test_chain_filter_with_none_passing() -> None: + @ops.filtering def is_even(value: int) -> bool: return value % 2 == 0 - chain = Chain[int]().filter(is_even) + chain = Chain[int]().then(is_even) result = list(chain(iter([1, 3, 5]))) assert result == [] -def test_chain_flat_map() -> None: - def add_one(input_iterator: Iterator[int]) -> 
Iterator[int]: - for element in input_iterator: - yield element + 1 - - chain = Chain[int]().flat_map(add_one) - - result = list(chain(iter([0, 1, 2, 3]))) - assert result == [1, 2, 3, 4] - - result2 = list(chain(iter([2, 3, 4]))) - assert result2 == [3, 4, 5] - - def test_chain_then() -> None: def add_one(input_iterator: Iterator[int]) -> Iterator[int]: for element in input_iterator: @@ -130,16 +118,14 @@ def add_one(input_iterator: Iterator[int]) -> Iterator[int]: for element in input_iterator: yield element + 1 - def multiply_two(input_iterator: Iterator[int]) -> Iterator[int]: - for element in input_iterator: - yield element * 2 + @ops.mapping + def multiply_two(value: int) -> int: + return value * 2 - def is_even(input_iterator: Iterator[int]) -> Iterator[int]: - for element in input_iterator: - if element % 2 == 0: - yield element + def is_even(value: int) -> bool: + return value % 2 == 0 - chain = Chain[int]() | add_one | is_even | multiply_two + chain = Chain[int]() | add_one | ops.filtering(is_even) | multiply_two result = list(chain(iter([0, 1, 2, 3]))) assert result == [4, 8] assert chain.get_counts() == [ @@ -175,10 +161,11 @@ def multiply_two(input_iterator: Iterator[int]) -> Iterator[int]: for element in input_iterator: yield element * 2 + @ops.filtering def is_even(value: int) -> bool: return value % 2 == 0 - chain = Chain[int]().then(add_one).filter(is_even).then(multiply_two) + chain = Chain[int]().then(add_one).then(is_even).then(multiply_two) result = list(chain(iter([0, 1, 2, 3]))) assert result == [4, 8] assert chain.get_counts() == [ @@ -209,13 +196,13 @@ def test_chain_map() -> None: def add_one(value: int) -> int: return value + 1 - chain = Chain[int]().map(add_one) + chain = Chain[int]().then(ops.mapping(add_one)) result = list(chain(iter([0, 1, 2, 3]))) assert result == [1, 2, 3, 4] def test_chain_map_changing_types() -> None: - chain: ChainType[int, str] = Chain[int]().map(str) + chain: ChainType[int, str] = Chain[int]().then(ops.mapping(str)) result = list(chain(iter([0, 1, 2, 3]))) assert result == ["0", "1", "2", "3"] @@ -224,7 +211,7 @@ def test_chain_batched_map() -> None: def add_values(values: Tuple[int, ...]) -> int: return sum(values) - chain = Chain[int]().batched_map(add_values, 2) + chain = Chain[int]().then(ops.batching(add_values, 2)) result = list(chain(iter([0, 1, 2, 3, 4]))) assert result == [1, 5, 4] assert chain.get_counts() == [ diff --git a/tests/core/test_stream.py b/tests/core/test_stream.py index d0d5544..66b8b44 100644 --- a/tests/core/test_stream.py +++ b/tests/core/test_stream.py @@ -1,7 +1,7 @@ from itertools import islice from typing import Iterable, Iterator, List -from pipedata.core import Chain, Stream +from pipedata.core import Chain, Stream, ops def test_stream_to_list() -> None: @@ -41,7 +41,9 @@ def test_stream_is_iterable() -> None: def test_stream_with_a_chain() -> None: - chain = Chain[int]().filter(lambda x: x % 2 == 0) + # mypy gives error on 'cannot infer type argument 1 of filtering + # TODO: what to do with mypy and lambda functions + chain = Chain[int]().then(ops.filtering(lambda x: x % 2 == 0)) # type: ignore result = Stream([0, 1, 2, 3]).flat_map(chain).to_list() assert result == [0, 2] From bccd01163a34accf7a1f883222e660abeb71ce08 Mon Sep 17 00:00:00 2001 From: simw Date: Tue, 23 Jan 2024 15:56:43 +0000 Subject: [PATCH 6/7] Remove .map, .filter etc functions in Stream, moving to a more functional approach --- README.md | 37 ++++++++++++++++++------------------- src/pipedata/core/stream.py | 18 ------------------ 
tests/core/test_stream.py | 32 ++++++++++++++++++-------------- tests/ops/test_files.py | 14 +++++++------- tests/ops/test_pipeline.py | 10 +++++----- tests/ops/test_records.py | 8 ++++---- tests/ops/test_storage.py | 6 +++--- 7 files changed, 55 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index 6a34ef2..58456df 100644 --- a/README.md +++ b/README.md @@ -47,9 +47,9 @@ with zipfile.ZipFile("test_input.json.zip", "w") as zipped: result = ( Stream(["test_input.json.zip"]) - .flat_map(zipped_files) - .flat_map(json_records()) - .flat_map(parquet_writer("test_output.parquet")) + .then(zipped_files) + .then(json_records()) + .then(parquet_writer("test_output.parquet")) .to_list() ) @@ -69,11 +69,11 @@ from pipedata.ops import json_records, parquet_writer, zipped_files # Running this after input file created in above example chain = ( Chain() - .flat_map(zipped_files) - .flat_map(json_records()) - .flat_map(parquet_writer("test_output_2.parquet")) + .then(zipped_files) + .then(json_records()) + .then(parquet_writer("test_output_2.parquet")) ) -result = Stream(["test_input.json.zip"]).flat_map(chain).to_list() +result = Stream(["test_input.json.zip"]).then(chain).to_list() table = pq.read_table("test_output_2.parquet") print(table.to_pydict()) #> {'col1': [1, 2, 3], 'col2': ['Hello', 'world', '!']} @@ -86,33 +86,34 @@ The core framework provides the building blocks for chaining operations. Running a stream: ```py -from pipedata.core import Stream +from pipedata.core import Stream, ops result = ( Stream(range(10)) - .filter(lambda x: x % 2 == 0) - .map(lambda x: x ^ 2) - .batched_map(lambda x: x, 2) + .then(ops.filtering(lambda x: x % 2 == 0)) + .then(ops.mapping(lambda x: x ^ 2)) + .then(ops.batching(lambda x: x, 2)) .to_list() ) print(result) #> [(2, 0), (6, 4), (10,)] ``` -Creating a chain and then using it: +Creating a chain and then using it, this time using the +pipe notation: ```py import json -from pipedata.core import Chain, Stream, Stream +from pipedata.core import Chain, Stream, ops chain = ( Chain() - .filter(lambda x: x % 2 == 0) - .map(lambda x: x ^ 2) - .batched_map(lambda x: sum(x), 2) + | ops.filtering(lambda x: x % 2 == 0) + | ops.mapping(lambda x: x ^ 2) + | ops.batching(lambda x: sum(x), 2) ) -print(Stream(range(10), chain).to_list()) +print(Stream(range(10)).then(chain).to_list()) #> [2, 10, 10] print(json.dumps(chain.get_counts(), indent=4)) #> [ @@ -137,8 +138,6 @@ print(json.dumps(chain.get_counts(), indent=4)) #> "outputs": 3 #> } #> ] -print(Stream(range(10)).flat_map(chain).to_list()) -#> [2, 10, 10] ``` ## Similar Functionality diff --git a/src/pipedata/core/stream.py b/src/pipedata/core/stream.py index 4031e7d..f4a140a 100644 --- a/src/pipedata/core/stream.py +++ b/src/pipedata/core/stream.py @@ -10,12 +10,10 @@ Iterator, List, Optional, - Tuple, TypeVar, overload, ) -from . 
import ops from .chain import Chain, ChainType TStart = TypeVar("TStart") @@ -35,11 +33,6 @@ def __iter__(self) -> Iterator[TEnd]: def __next__(self) -> TEnd: return next(self._iter) - def flat_map( - self, func: Callable[[Iterator[TEnd]], Iterator[TNewEnd]] - ) -> StreamType[TNewEnd]: - return StreamType(self._items, self._chain.then(func)) - def then( self, func: Callable[[Iterator[TEnd]], Iterator[TNewEnd]] ) -> StreamType[TNewEnd]: @@ -50,17 +43,6 @@ def __or__( ) -> StreamType[TNewEnd]: return StreamType(self._items, self._chain.then(func)) - def filter(self, func: Callable[[TEnd], bool]) -> StreamType[TEnd]: # noqa: A003 - return self.then(ops.filtering(func)) - - def map(self, func: Callable[[TEnd], TNewEnd]) -> StreamType[TNewEnd]: # noqa: A003 - return self.then(ops.mapping(func)) - - def batched_map( - self, func: Callable[[Tuple[TEnd, ...]], TNewEnd], n: Optional[int] = None - ) -> StreamType[TNewEnd]: - return self.then(ops.batching(func, n)) - @overload def reduce(self, func: Callable[[TEnd, TEnd], TEnd]) -> TEnd: ... diff --git a/tests/core/test_stream.py b/tests/core/test_stream.py index 66b8b44..627bb73 100644 --- a/tests/core/test_stream.py +++ b/tests/core/test_stream.py @@ -45,7 +45,7 @@ def test_stream_with_a_chain() -> None: # TODO: what to do with mypy and lambda functions chain = Chain[int]().then(ops.filtering(lambda x: x % 2 == 0)) # type: ignore - result = Stream([0, 1, 2, 3]).flat_map(chain).to_list() + result = Stream([0, 1, 2, 3]).then(chain).to_list() assert result == [0, 2] @@ -60,34 +60,37 @@ def test_stream_to_list_longer_length() -> None: def test_stream_filter() -> None: + @ops.filtering def is_even(value: int) -> bool: return value % 2 == 0 - result = Stream([0, 1, 2, 3, 4, 5]).filter(is_even).to_list() + result = Stream([0, 1, 2, 3, 4, 5]).then(is_even).to_list() assert result == [0, 2, 4] def test_stream_filter_with_range() -> None: + @ops.filtering def is_even(value: int) -> bool: return value % 2 == 0 - result = Stream(range(10)).filter(is_even).to_list() + result = Stream(range(10)).then(is_even).to_list() assert result == [0, 2, 4, 6, 8] def test_stream_filter_with_none_passing() -> None: + @ops.filtering def is_even(value: int) -> bool: return value % 2 == 0 - result = Stream([1, 3, 5]).filter(is_even).to_list() + result = Stream([1, 3, 5]).then(is_even).to_list() assert result == [] -def test_stream_flat_map_identity() -> None: +def test_stream_then_identity() -> None: def identity(input_iterator: Iterator[int]) -> Iterator[int]: yield from input_iterator - result = Stream([0, 1, 2, 3]).flat_map(identity).to_list() + result = Stream([0, 1, 2, 3]).then(identity).to_list() assert result == [0, 1, 2, 3] @@ -96,9 +99,9 @@ def add_one(input_iterator: Iterator[int]) -> Iterator[int]: for element in input_iterator: yield element + 1 - def multiply_two(input_iterator: Iterator[int]) -> Iterator[int]: - for element in input_iterator: - yield element * 2 + @ops.mapping + def multiply_two(value: int) -> int: + return value * 2 result = Stream([0, 1, 2, 3]).then(add_one).then(multiply_two).to_list() assert result == [2, 4, 6, 8] @@ -124,7 +127,7 @@ def add_element(input_iterator: Iterator[int]) -> Iterator[int]: yield element yield element + 1 - result = Stream([0, 1, 2, 3]).flat_map(add_element).to_list() + result = Stream([0, 1, 2, 3]).then(add_element).to_list() assert result == [0, 1, 1, 2, 2, 3, 3, 4] @@ -133,15 +136,16 @@ def add_two_values(input_iterator: Iterator[int]) -> Iterator[int]: while batch := tuple(islice(input_iterator, 2)): yield 
sum(batch) - result = Stream([0, 1, 2, 3, 4]).flat_map(add_two_values).to_list() + result = Stream([0, 1, 2, 3, 4]).then(add_two_values).to_list() assert result == [1, 5, 4] def test_stream_map() -> None: + @ops.mapping def add_one(value: int) -> int: return value + 1 - result = Stream([0, 1, 2, 3]).map(add_one).to_list() + result = Stream([0, 1, 2, 3]).then(add_one).to_list() assert result == [1, 2, 3, 4] @@ -164,9 +168,9 @@ def append_values(a: List[int], b: int) -> List[int]: assert result == [0, 1, 2, 3] -def test_stream_batched_map() -> None: +def test_stream_batching() -> None: def add_values(values: Iterable[int]) -> int: return sum(values) - result = Stream([0, 1, 2, 3, 4]).batched_map(add_values, 2).to_list() + result = Stream([0, 1, 2, 3, 4]).then(ops.batching(add_values, 2)).to_list() assert result == [1, 5, 4] diff --git a/tests/ops/test_files.py b/tests/ops/test_files.py index 8e7e87f..2ae64f1 100644 --- a/tests/ops/test_files.py +++ b/tests/ops/test_files.py @@ -5,7 +5,7 @@ import pyarrow as pa # type: ignore import pytest -from pipedata.core import Stream +from pipedata.core import Stream, ops from pipedata.ops.files import FilesReaderError, read_from_parquet, zipped_files @@ -18,7 +18,7 @@ def test_zipped_files() -> None: zip_file.writestr("test2.txt", "Hello, world 2!") zip_file.writestr("test3.txt", "Hello, world 3!") - result = Stream([str(zip_path)]).flat_map(zipped_files).to_list() + result = Stream([str(zip_path)]).then(zipped_files).to_list() assert result[0].name == "test.txt" assert result[1].name == "test2.txt" @@ -36,8 +36,8 @@ def test_zipped_file_contents() -> None: result = ( Stream([str(zip_path)]) - .flat_map(zipped_files) - .map(lambda x: x.contents.read().decode("utf-8")) + .then(zipped_files) + .then(ops.mapping(lambda x: x.contents.read().decode("utf-8"))) # type: ignore # TODO .to_list() ) @@ -62,7 +62,7 @@ def test_parquet_reading_simple() -> None: pa.parquet.write_table(table, parquet_path) parquet_reader = read_from_parquet() - result = Stream([str(parquet_path)]).flat_map(parquet_reader).to_list() + result = Stream([str(parquet_path)]).then(parquet_reader).to_list() expected = [ {"a": 1, "b": 4}, @@ -85,7 +85,7 @@ def test_parquet_reading_with_columns() -> None: pa.parquet.write_table(table, parquet_path) parquet_reader = read_from_parquet(columns=["a"]) - result = Stream([str(parquet_path)]).flat_map(parquet_reader).to_list() + result = Stream([str(parquet_path)]).then(parquet_reader).to_list() expected = [ {"a": 1}, @@ -108,7 +108,7 @@ def test_parquet_reading_record_batch() -> None: pa.parquet.write_table(table, parquet_path) parquet_reader = read_from_parquet(columns=["a"], return_as="recordbatch") - result = Stream([str(parquet_path)]).flat_map(parquet_reader).to_list() + result = Stream([str(parquet_path)]).then(parquet_reader).to_list() schema = pa.schema([("a", pa.int64())]) a_array = pa.array([1, 2, 3]) diff --git a/tests/ops/test_pipeline.py b/tests/ops/test_pipeline.py index ccb90b9..3c69a87 100644 --- a/tests/ops/test_pipeline.py +++ b/tests/ops/test_pipeline.py @@ -5,7 +5,7 @@ import pyarrow.parquet as pq # type: ignore -from pipedata.core import Stream +from pipedata.core import Stream, ops from pipedata.ops import json_records, parquet_writer, zipped_files @@ -34,10 +34,10 @@ def test_zipped_files() -> None: result = ( Stream([str(zip_path)]) - .flat_map(zipped_files) - .map(lambda x: x.contents) - .flat_map(json_records()) - .flat_map(parquet_writer(str(output_path))) + .then(zipped_files) + .then(ops.mapping(lambda x: x.contents)) # 
type: ignore # TODO + .then(json_records()) + .then(parquet_writer(str(output_path))) .to_list() ) diff --git a/tests/ops/test_records.py b/tests/ops/test_records.py index 4376bbd..779a318 100644 --- a/tests/ops/test_records.py +++ b/tests/ops/test_records.py @@ -13,7 +13,7 @@ def test_json_records() -> None: file1 = io.BytesIO(json.dumps(json1).encode("utf-8")) file2 = io.BytesIO(json.dumps(json2).encode("utf-8")) - result = Stream([file1, file2]).flat_map(json_records()).to_list() + result = Stream([file1, file2]).then(json_records()).to_list() expected = json1 + json2 assert result == expected @@ -30,7 +30,7 @@ def test_json_records_from_file_ref() -> None: OpenedFileRef(name="test2.json", contents=file2), ] - result = Stream(file_refs).flat_map(json_records()).to_list() + result = Stream(file_refs).then(json_records()).to_list() expected = json1 + json2 assert result == expected @@ -42,7 +42,7 @@ def test_csv_records() -> None: file1 = io.BytesIO(csv1.encode("utf-8")) file2 = io.BytesIO(csv2.encode("utf-8")) - result = Stream([file1, file2]).flat_map(csv_records()).to_list() + result = Stream([file1, file2]).then(csv_records()).to_list() expected = [ {"a": "1", "b": "2"}, {"a": "3", "b": "4"}, @@ -64,7 +64,7 @@ def test_csv_records_from_file_ref() -> None: OpenedFileRef(name="test2.csv", contents=file2), ] - result = Stream(file_refs).flat_map(csv_records()).to_list() + result = Stream(file_refs).then(csv_records()).to_list() expected = [ {"a": "1", "b": "2"}, {"a": "3", "b": "4"}, diff --git a/tests/ops/test_storage.py b/tests/ops/test_storage.py index 08d4176..8ea8bfa 100644 --- a/tests/ops/test_storage.py +++ b/tests/ops/test_storage.py @@ -20,7 +20,7 @@ def test_parquet_simple_storage() -> None: temp_path = Path(tmpdir) output_path = temp_path / "test.parquet" - result = Stream(items).flat_map(parquet_writer(str(output_path))).to_list() + result = Stream(items).then(parquet_writer(str(output_path))).to_list() assert result == [str(output_path)] @@ -48,7 +48,7 @@ def test_parquet_batched_storage() -> None: result = ( Stream(items) - .flat_map(parquet_writer(str(output_path), row_group_length=2)) + .then(parquet_writer(str(output_path), row_group_length=2)) .to_list() ) @@ -78,7 +78,7 @@ def test_parquet_multiple_files() -> None: result = ( Stream(items) - .flat_map(parquet_writer(str(output_path), max_file_length=2)) + .then(parquet_writer(str(output_path), max_file_length=2)) .to_list() ) From f96c4c3662a68ab869549fc73f4d4e8b94ac6eef Mon Sep 17 00:00:00 2001 From: simw Date: Tue, 23 Jan 2024 15:59:21 +0000 Subject: [PATCH 7/7] Rename batching to batched --- README.md | 4 ++-- src/pipedata/core/ops.py | 2 +- tests/core/test_chain.py | 2 +- tests/core/test_stream.py | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 58456df..a4e42b9 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ result = ( Stream(range(10)) .then(ops.filtering(lambda x: x % 2 == 0)) .then(ops.mapping(lambda x: x ^ 2)) - .then(ops.batching(lambda x: x, 2)) + .then(ops.batched(lambda x: x, 2)) .to_list() ) print(result) @@ -111,7 +111,7 @@ chain = ( Chain() | ops.filtering(lambda x: x % 2 == 0) | ops.mapping(lambda x: x ^ 2) - | ops.batching(lambda x: sum(x), 2) + | ops.batched(lambda x: sum(x), 2) ) print(Stream(range(10)).then(chain).to_list()) #> [2, 10, 10] diff --git a/src/pipedata/core/ops.py b/src/pipedata/core/ops.py index 5b23c44..acdbd7c 100644 --- a/src/pipedata/core/ops.py +++ b/src/pipedata/core/ops.py @@ -39,7 +39,7 @@ def new_action(previous_step: 
Iterator[TEnd]) -> Iterator[TOther]: super().__init__(new_action) -class batching(ChainLink[TEnd, TOther]): # noqa: N801 +class batched(ChainLink[TEnd, TOther]): # noqa: N801 def __init__(self, func: Callable[[Tuple[TEnd, ...]], TOther], n: Optional[int]): @functools.wraps(func) def new_action(previous_step: Iterator[TEnd]) -> Iterator[TOther]: diff --git a/tests/core/test_chain.py b/tests/core/test_chain.py index 3d452d1..052152d 100644 --- a/tests/core/test_chain.py +++ b/tests/core/test_chain.py @@ -211,7 +211,7 @@ def test_chain_batched_map() -> None: def add_values(values: Tuple[int, ...]) -> int: return sum(values) - chain = Chain[int]().then(ops.batching(add_values, 2)) + chain = Chain[int]().then(ops.batched(add_values, 2)) result = list(chain(iter([0, 1, 2, 3, 4]))) assert result == [1, 5, 4] assert chain.get_counts() == [ diff --git a/tests/core/test_stream.py b/tests/core/test_stream.py index 627bb73..0509e21 100644 --- a/tests/core/test_stream.py +++ b/tests/core/test_stream.py @@ -168,9 +168,9 @@ def append_values(a: List[int], b: int) -> List[int]: assert result == [0, 1, 2, 3] -def test_stream_batching() -> None: +def test_stream_batched() -> None: def add_values(values: Iterable[int]) -> int: return sum(values) - result = Stream([0, 1, 2, 3, 4]).then(ops.batching(add_values, 2)).to_list() + result = Stream([0, 1, 2, 3, 4]).then(ops.batched(add_values, 2)).to_list() assert result == [1, 5, 4]
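
A brief note on the end state of this series: the method-style `flat_map`, `map`, `filter`, and `batched_map` helpers on `Chain`/`Stream` are replaced by composable operator objects in `pipedata.core.ops` (`filtering`, `mapping`, `batched`), wired together with `.then()` or the `|` operator. A minimal usage sketch of the API as it stands after patch 7, based only on the README and test changes above (the printed list is the expected output under these lambda definitions, not taken from the repository):

```py
from pipedata.core import Chain, Stream, ops

# Build a reusable chain with the pipe notation introduced in patch 3.
chain = (
    Chain[int]()
    | ops.filtering(lambda x: x % 2 == 1)  # keep odd numbers
    | ops.mapping(lambda x: x * 10)        # transform each element
    | ops.batched(lambda xs: sum(xs), 2)   # sum elements two at a time
)

# Run the chain over a stream and collect the results.
print(Stream(range(6)).then(chain).to_list())
#> [40, 50]

# Per-step input/output counts remain available after a run.
print(chain.get_counts())
```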