From 8879481339501ef41ec23657c04c52a479143b54 Mon Sep 17 00:00:00 2001
From: Golo Roden
Date: Thu, 13 Nov 2025 01:52:45 +0100
Subject: [PATCH 1/6] feat: add pandas DataFrame integration
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add support for converting event streams to pandas DataFrames for data
analysis and exploration. The events_to_dataframe() function accepts an
AsyncGenerator of events and returns a DataFrame with all event fields
as columns.

- Add eventsourcingdb.pandas module with events_to_dataframe() function
- Add comprehensive test suite following TDD approach
- Add pandas as optional dependency group
- Update README with usage examples and flattening guide

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 README.md                         |  54 +++++++
 eventsourcingdb/pandas.py         |  43 ++++++
 pyproject.toml                    |   4 +
 tests/test_events_to_dataframe.py | 248 ++++++++++++++++++++++++++++++
 4 files changed, 349 insertions(+)
 create mode 100644 eventsourcingdb/pandas.py
 create mode 100644 tests/test_events_to_dataframe.py

diff --git a/README.md b/README.md
index 2b4d49f..14485ae 100644
--- a/README.md
+++ b/README.md
@@ -314,6 +314,60 @@ async for row in rows:
 await rows.aclose()
 ```
 
+### Converting Events to pandas DataFrame
+
+For data analysis and exploration, you can convert event streams to pandas DataFrames. To use this feature, install the client SDK with pandas support:
+
+```shell
+pip install eventsourcingdb[pandas]
+```
+
+Import the `events_to_dataframe` function from the `eventsourcingdb.pandas` module and pass an event stream to it:
+
+```python
+from eventsourcingdb import Client, ReadEventsOptions
+from eventsourcingdb.pandas import events_to_dataframe
+
+events = client.read_events(
+    subject = '/books',
+    options = ReadEventsOptions(
+        recursive = True
+    ),
+)
+
+df = await events_to_dataframe(events)
+```
+
+The resulting DataFrame includes all event fields as columns: `event_id`, `time`, `source`, `subject`, `type`, `data`, `spec_version`, `data_content_type`, `predecessor_hash`, `hash`, `trace_parent`, `trace_state`, and `signature`.
+
+The `data` field remains a dict column, which you can access directly:
+
+```python
+# Access the data field
+for index, row in df.iterrows():
+    print(row['data'])
+```
+
+#### Flattening the Data Field
+
+For analysis of specific event types, you may want to flatten the `data` field into separate columns. Use pandas' `json_normalize` function:
+
+```python
+import pandas as pd
+
+# Filter for a specific event type first
+book_acquired_df = df[df['type'] == 'io.eventsourcingdb.library.book-acquired']
+
+# Flatten the data field (align indices so the join matches rows correctly)
+flattened_df = book_acquired_df.join(
+    pd.json_normalize(book_acquired_df['data']).set_index(book_acquired_df.index)
+)
+
+# Now you can access data fields as columns
+print(flattened_df['title'])
+print(flattened_df['author'])
+```
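+
+From here, the usual pandas idioms apply. As a small example (a minimal sketch, reusing the `flattened_df` from above), count how many acquisition events exist per title:
+
+```python
+# Count book-acquired events per title
+print(flattened_df['title'].value_counts())
+```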
+
 ### Observing Events
 
 To observe all events of a subject, call the `observe_events` function with the subject as the first argument and an options object as the second argument. Set the `recursive` option to `False`. This ensures that only events of the given subject are returned, not events of nested subjects.
diff --git a/eventsourcingdb/pandas.py b/eventsourcingdb/pandas.py
new file mode 100644
index 0000000..376f8d2
--- /dev/null
+++ b/eventsourcingdb/pandas.py
@@ -0,0 +1,43 @@
+from typing import AsyncGenerator
+
+import pandas as pd
+
+from .event.event import Event
+
+__all__ = ["events_to_dataframe"]
+
+
+async def events_to_dataframe(events: AsyncGenerator[Event, None]) -> pd.DataFrame:
+    """
+    Convert an async stream of events to a pandas DataFrame.
+
+    All event fields are included as columns. The 'data' field remains
+    as a dict column - use pd.json_normalize() for flattening if needed.
+
+    Args:
+        events: An async generator of Event objects
+
+    Returns:
+        A pandas DataFrame with all event fields as columns
+    """
+    event_list = []
+
+    async for event in events:
+        event_dict = {
+            "event_id": event.event_id,
+            "time": event.time,
+            "source": event.source,
+            "subject": event.subject,
+            "type": event.type,
+            "data": event.data,
+            "spec_version": event.spec_version,
+            "data_content_type": event.data_content_type,
+            "predecessor_hash": event.predecessor_hash,
+            "hash": event.hash,
+            "trace_parent": event.trace_parent,
+            "trace_state": event.trace_state,
+            "signature": event.signature,
+        }
+        event_list.append(event_dict)
+
+    return pd.DataFrame(event_list)
diff --git a/pyproject.toml b/pyproject.toml
index c87c868..f7018d8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -22,6 +22,10 @@ dev = [
     "bandit==1.8.6",
     "pyright==1.1.407",
     "twine==6.2.0",
+    "pandas>=2.0.0",
+]
+pandas = [
+    "pandas>=2.0.0",
 ]
 
 [build-system]
diff --git a/tests/test_events_to_dataframe.py b/tests/test_events_to_dataframe.py
new file mode 100644
index 0000000..815c0c3
--- /dev/null
+++ b/tests/test_events_to_dataframe.py
@@ -0,0 +1,248 @@
+from datetime import datetime
+
+import pandas as pd
+import pytest
+
+from eventsourcingdb import EventCandidate, ReadEventsOptions
+from eventsourcingdb.pandas import events_to_dataframe
+
+from .conftest import TestData
+from .shared.database import Database
+
+
+class TestEventsToDataframe:
+    @staticmethod
+    @pytest.mark.asyncio
+    async def test_returns_empty_dataframe_for_empty_event_stream(
+        database: Database,
+    ) -> None:
+        client = database.get_client()
+
+        events = client.read_events("/nonexistent", ReadEventsOptions(recursive=False))
+        df = await events_to_dataframe(events)
+
+        assert isinstance(df, pd.DataFrame)
+        assert len(df) == 0
+        assert list(df.columns) == [
+            "event_id",
+            "time",
+            "source",
+            "subject",
+            "type",
+            "data",
+            "spec_version",
+            "data_content_type",
+            "predecessor_hash",
+            "hash",
+            "trace_parent",
+            "trace_state",
+            "signature",
+        ]
+
+    @staticmethod
+    @pytest.mark.asyncio
+    async def test_returns_dataframe_with_single_event(
+        database: Database, test_data: TestData
+    ) -> None:
+        client = database.get_client()
+
+        await client.write_events(
+            [
+                EventCandidate(
+                    source=test_data.TEST_SOURCE_STRING,
+                    subject=test_data.REGISTERED_SUBJECT,
+                    type=test_data.REGISTERED_TYPE,
+                    data=test_data.JANE_DATA,
+                    trace_parent=test_data.TRACE_PARENT_1,
+                )
+            ]
+        )
+
+        events = client.read_events(
+            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
+        )
+        df = await events_to_dataframe(events)
+
+        assert isinstance(df, pd.DataFrame)
+        assert len(df) == 1
+
+        assert df.iloc[0]["source"] == test_data.TEST_SOURCE_STRING
+        assert df.iloc[0]["subject"] == test_data.REGISTERED_SUBJECT
+        assert df.iloc[0]["type"] == test_data.REGISTERED_TYPE
+        assert df.iloc[0]["data"] == test_data.JANE_DATA
+        assert df.iloc[0]["trace_parent"] == test_data.TRACE_PARENT_1
+
+    @staticmethod
+    @pytest.mark.asyncio
+    async def test_returns_dataframe_with_multiple_events(
+        prepared_database: Database, test_data: TestData
+    ) -> None:
+        client = prepared_database.get_client()
+
+        events = client.read_events("/users", ReadEventsOptions(recursive=True))
+        df = await events_to_dataframe(events)
+
+        assert isinstance(df, pd.DataFrame)
+        expected_event_count = 4
+        assert len(df) == expected_event_count
+
+        assert df.iloc[0]["data"] == test_data.JANE_DATA
+        assert df.iloc[1]["data"] == test_data.JANE_DATA
+        assert df.iloc[2]["data"] == test_data.JOHN_DATA
+        assert df.iloc[3]["data"] == test_data.JOHN_DATA
+
+    @staticmethod
+    @pytest.mark.asyncio
+    async def test_dataframe_has_correct_column_names(
+        database: Database, test_data: TestData
+    ) -> None:
+        client = database.get_client()
+
+        await client.write_events(
+            [
+                EventCandidate(
+                    source=test_data.TEST_SOURCE_STRING,
+                    subject=test_data.REGISTERED_SUBJECT,
+                    type=test_data.REGISTERED_TYPE,
+                    data=test_data.JANE_DATA,
+                )
+            ]
+        )
+
+        events = client.read_events(
+            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
+        )
+        df = await events_to_dataframe(events)
+
+        expected_columns = [
+            "event_id",
+            "time",
+            "source",
+            "subject",
+            "type",
+            "data",
+            "spec_version",
+            "data_content_type",
+            "predecessor_hash",
+            "hash",
+            "trace_parent",
+            "trace_state",
+            "signature",
+        ]
+        assert list(df.columns) == expected_columns
+
+    @staticmethod
+    @pytest.mark.asyncio
+    async def test_data_field_remains_as_dict(
+        database: Database, test_data: TestData
+    ) -> None:
+        client = database.get_client()
+
+        await client.write_events(
+            [
+                EventCandidate(
+                    source=test_data.TEST_SOURCE_STRING,
+                    subject=test_data.REGISTERED_SUBJECT,
+                    type=test_data.REGISTERED_TYPE,
+                    data=test_data.JANE_DATA,
+                )
+            ]
+        )
+
+        events = client.read_events(
+            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
+        )
+        df = await events_to_dataframe(events)
+
+        assert isinstance(df.iloc[0]["data"], dict)
+        assert df.iloc[0]["data"] == test_data.JANE_DATA
+
+    @staticmethod
+    @pytest.mark.asyncio
+    async def test_time_field_is_datetime_object(
+        database: Database, test_data: TestData
+    ) -> None:
+        client = database.get_client()
+
+        await client.write_events(
+            [
+                EventCandidate(
+                    source=test_data.TEST_SOURCE_STRING,
+                    subject=test_data.REGISTERED_SUBJECT,
+                    type=test_data.REGISTERED_TYPE,
+                    data=test_data.JANE_DATA,
+                )
+            ]
+        )
+
+        events = client.read_events(
+            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
+        )
+        df = await events_to_dataframe(events)
+
+        assert isinstance(df.iloc[0]["time"], datetime)
+
+    @staticmethod
+    @pytest.mark.asyncio
+    async def test_optional_fields_can_be_none(
+        database: Database, test_data: TestData
+    ) -> None:
+        client = database.get_client()
+
+        await client.write_events(
+            [
+                EventCandidate(
+                    source=test_data.TEST_SOURCE_STRING,
+                    subject=test_data.REGISTERED_SUBJECT,
+                    type=test_data.REGISTERED_TYPE,
+                    data=test_data.JANE_DATA,
+                )
+            ]
+        )
+
+        events = client.read_events(
+            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
+        )
+        df = await events_to_dataframe(events)
+
+        assert df.iloc[0]["trace_parent"] is None or pd.isna(df.iloc[0]["trace_parent"])
+        assert df.iloc[0]["trace_state"] is None or pd.isna(df.iloc[0]["trace_state"])
+        assert df.iloc[0]["signature"] is None or pd.isna(df.iloc[0]["signature"])
+
+    @staticmethod
+    @pytest.mark.asyncio
+    async def test_all_event_fields_are_present(
+        database: Database, test_data: TestData
+    ) -> None:
+        client = database.get_client()
+
+        await client.write_events(
+            [
+                EventCandidate(
+                    source=test_data.TEST_SOURCE_STRING,
+                    subject=test_data.REGISTERED_SUBJECT,
+                    type=test_data.REGISTERED_TYPE,
+                    data=test_data.JANE_DATA,
+                    trace_parent=test_data.TRACE_PARENT_1,
+                )
+            ]
+        )
+
+        events = client.read_events(
+            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
+        )
+        df = await events_to_dataframe(events)
+
+        row = df.iloc[0]
+
+        assert isinstance(row["event_id"], str)
+        assert isinstance(row["time"], datetime)
+        assert row["source"] == test_data.TEST_SOURCE_STRING
+        assert row["subject"] == test_data.REGISTERED_SUBJECT
+        assert row["type"] == test_data.REGISTERED_TYPE
+        assert row["data"] == test_data.JANE_DATA
+        assert isinstance(row["spec_version"], str)
+        assert isinstance(row["data_content_type"], str)
+        assert isinstance(row["predecessor_hash"], str)
+        assert isinstance(row["hash"], str)
+        assert row["trace_parent"] == test_data.TRACE_PARENT_1
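
Note: since events_to_dataframe() only consumes an async generator and reads
plain attributes from each event, it can also be exercised without a running
database. A minimal sketch — the SimpleNamespace stand-in and all field values
below are illustrative, not part of the SDK:

```python
import asyncio
from types import SimpleNamespace

from eventsourcingdb.pandas import events_to_dataframe


async def main() -> None:
    # Stand-in event: the function only reads these attributes, so any
    # object with the same shape works for a quick local experiment.
    event = SimpleNamespace(
        event_id="0",
        time=None,
        source="https://example.invalid",
        subject="/books/42",
        type="io.eventsourcingdb.library.book-acquired",
        data={"title": "Example Title"},
        spec_version="1.0",
        data_content_type="application/json",
        predecessor_hash="",
        hash="",
        trace_parent=None,
        trace_state=None,
        signature=None,
    )

    async def stream():
        yield event

    # The stand-in is not a real Event, hence the ignore for type checkers.
    df = await events_to_dataframe(stream())  # type: ignore[arg-type]
    print(df[["subject", "type"]])


asyncio.run(main())
```
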
From e3e593fe0455ddf93def04ee7eb37d641f78ce40 Mon Sep 17 00:00:00 2001
From: Golo Roden
Date: Thu, 13 Nov 2025 01:55:03 +0100
Subject: [PATCH 2/6] fix: ensure DataFrame has correct columns for empty event streams
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When converting an empty event stream, the DataFrame now includes all
expected columns instead of being completely empty. This is achieved by
explicitly defining the columns parameter when creating the DataFrame.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 eventsourcingdb/pandas.py | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/eventsourcingdb/pandas.py b/eventsourcingdb/pandas.py
index 376f8d2..7f233dd 100644
--- a/eventsourcingdb/pandas.py
+++ b/eventsourcingdb/pandas.py
@@ -20,6 +20,22 @@ async def events_to_dataframe(events: AsyncGenerator[Event, None]) -> pd.DataFra
     Returns:
         A pandas DataFrame with all event fields as columns
     """
+    columns = [
+        "event_id",
+        "time",
+        "source",
+        "subject",
+        "type",
+        "data",
+        "spec_version",
+        "data_content_type",
+        "predecessor_hash",
+        "hash",
+        "trace_parent",
+        "trace_state",
+        "signature",
+    ]
+
     event_list = []
 
     async for event in events:
@@ -40,4 +56,4 @@ async def events_to_dataframe(events: AsyncGenerator[Event, None]) -> pd.DataFra
         }
         event_list.append(event_dict)
 
-    return pd.DataFrame(event_list)
+    return pd.DataFrame(event_list, columns=columns)
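
For reference, the pandas behavior this fix relies on is easy to see in
isolation: a DataFrame built from an empty list has no columns at all unless
they are passed explicitly. A minimal sketch, independent of the SDK:

```python
import pandas as pd

# Without explicit columns, an empty list yields a frame with no columns.
print(list(pd.DataFrame([]).columns))  # []

# With explicit columns, the empty frame keeps the expected schema.
print(list(pd.DataFrame([], columns=["event_id", "time"]).columns))  # ['event_id', 'time']
```
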
From 92ede4758a545aa2b22cc63d6f87f9c302eaf9ce Mon Sep 17 00:00:00 2001
From: Golo Roden
Date: Thu, 13 Nov 2025 01:56:31 +0100
Subject: [PATCH 3/6] fix: resolve type checking error for DataFrame columns
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Use conditional logic to create the DataFrame with the columns parameter
only when the event list is empty, avoiding type checking issues with
pyright while maintaining correct behavior.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 eventsourcingdb/pandas.py | 37 ++++++++++++++++++++-----------------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/eventsourcingdb/pandas.py b/eventsourcingdb/pandas.py
index 7f233dd..5981a8f 100644
--- a/eventsourcingdb/pandas.py
+++ b/eventsourcingdb/pandas.py
@@ -20,22 +20,6 @@ async def events_to_dataframe(events: AsyncGenerator[Event, None]) -> pd.DataFra
     Returns:
         A pandas DataFrame with all event fields as columns
     """
-    columns = [
-        "event_id",
-        "time",
-        "source",
-        "subject",
-        "type",
-        "data",
-        "spec_version",
-        "data_content_type",
-        "predecessor_hash",
-        "hash",
-        "trace_parent",
-        "trace_state",
-        "signature",
-    ]
-
     event_list = []
 
     async for event in events:
@@ -56,4 +40,23 @@ async def events_to_dataframe(events: AsyncGenerator[Event, None]) -> pd.DataFra
         }
         event_list.append(event_dict)
 
-    return pd.DataFrame(event_list, columns=columns)
+    if len(event_list) == 0:
+        return pd.DataFrame(
+            columns=[
+                "event_id",
+                "time",
+                "source",
+                "subject",
+                "type",
+                "data",
+                "spec_version",
+                "data_content_type",
+                "predecessor_hash",
+                "hash",
+                "trace_parent",
+                "trace_state",
+                "signature",
+            ]
+        )
+
+    return pd.DataFrame(event_list)

From 52650ce1cc58cae579dbae65a228a20b929862fa Mon Sep 17 00:00:00 2001
From: Golo Roden
Date: Thu, 13 Nov 2025 01:57:44 +0100
Subject: [PATCH 4/6] fix: add type ignore for pandas DataFrame columns parameter
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add a type: ignore annotation for the columns parameter when creating an
empty DataFrame, as pyright's pandas stubs don't recognize list[str] as
a valid type despite it working correctly at runtime.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 eventsourcingdb/pandas.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/eventsourcingdb/pandas.py b/eventsourcingdb/pandas.py
index 5981a8f..f6b3160 100644
--- a/eventsourcingdb/pandas.py
+++ b/eventsourcingdb/pandas.py
@@ -41,7 +41,7 @@ async def events_to_dataframe(events: AsyncGenerator[Event, None]) -> pd.DataFra
         event_list.append(event_dict)
 
     if len(event_list) == 0:
-        return pd.DataFrame(
+        return pd.DataFrame(  # type: ignore[call-overload]
             columns=[
                 "event_id",
                 "time",

From 4bddd458e51ad5819e1447f9491ab936fc8fe053 Mon Sep 17 00:00:00 2001
From: Golo Roden
Date: Thu, 13 Nov 2025 01:59:04 +0100
Subject: [PATCH 5/6] fix: use pd.Index for DataFrame columns to satisfy type checker
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Set the DataFrame columns using pd.Index after creation instead of
passing the columns parameter to the constructor, resolving pyright type
checking errors while maintaining correct functionality.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 eventsourcingdb/pandas.py | 38 +++++++++++++++++++-------------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/eventsourcingdb/pandas.py b/eventsourcingdb/pandas.py
index f6b3160..962124a 100644
--- a/eventsourcingdb/pandas.py
+++ b/eventsourcingdb/pandas.py
@@ -40,23 +40,23 @@ async def events_to_dataframe(events: AsyncGenerator[Event, None]) -> pd.DataFra
         }
         event_list.append(event_dict)
 
+    df = pd.DataFrame(event_list)
+
     if len(event_list) == 0:
-        return pd.DataFrame(  # type: ignore[call-overload]
-            columns=[
-                "event_id",
-                "time",
-                "source",
-                "subject",
-                "type",
-                "data",
-                "spec_version",
-                "data_content_type",
-                "predecessor_hash",
-                "hash",
-                "trace_parent",
-                "trace_state",
-                "signature",
-            ]
-        )
-
-    return pd.DataFrame(event_list)
+        df.columns = pd.Index([
+            "event_id",
+            "time",
+            "source",
+            "subject",
+            "type",
+            "data",
+            "spec_version",
+            "data_content_type",
+            "predecessor_hash",
+            "hash",
+            "trace_parent",
+            "trace_state",
+            "signature",
+        ])
+
+    return df

From 744d98c39ac0b2aba5df906e25980715106321cc Mon Sep 17 00:00:00 2001
From: Golo Roden
Date: Thu, 13 Nov 2025 02:00:24 +0100
Subject: [PATCH 6/6] fix: use pyright-specific ignore for DataFrame columns parameter
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Revert to using the columns parameter with the correct pyright ignore
syntax (reportArgumentType) instead of setting the columns property
after creation, which fails for empty DataFrames.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 eventsourcingdb/pandas.py | 38 +++++++++++++++++++-------------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/eventsourcingdb/pandas.py b/eventsourcingdb/pandas.py
index 962124a..37a6348 100644
--- a/eventsourcingdb/pandas.py
+++ b/eventsourcingdb/pandas.py
@@ -40,23 +40,23 @@ async def events_to_dataframe(events: AsyncGenerator[Event, None]) -> pd.DataFra
         }
         event_list.append(event_dict)
 
-    df = pd.DataFrame(event_list)
-
     if len(event_list) == 0:
-        df.columns = pd.Index([
-            "event_id",
-            "time",
-            "source",
-            "subject",
-            "type",
-            "data",
-            "spec_version",
-            "data_content_type",
-            "predecessor_hash",
-            "hash",
-            "trace_parent",
-            "trace_state",
-            "signature",
-        ])
-
-    return df
+        return pd.DataFrame(
+            columns=[  # pyright: ignore[reportArgumentType]
+                "event_id",
+                "time",
+                "source",
+                "subject",
+                "type",
+                "data",
+                "spec_version",
+                "data_content_type",
+                "predecessor_hash",
+                "hash",
+                "trace_parent",
+                "trace_state",
+                "signature",
+            ]
+        )
+
+    return pd.DataFrame(event_list)
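
For context on this final revert: the pd.Index approach from the previous
patch fails precisely in the empty case, because a DataFrame built from an
empty list has zero columns, and pandas refuses to assign a column index of a
different length. A minimal reproduction, independent of the SDK:

```python
import pandas as pd

df = pd.DataFrame([])  # zero rows and, crucially, zero columns

try:
    # What the previous patch effectively did for empty event streams:
    df.columns = pd.Index(["event_id", "time", "source"])
except ValueError as error:
    # pandas refuses: the frame has 0 columns, the new index has 3.
    print(error)  # Length mismatch: Expected axis has 0 elements, new values have 3 elements
```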