diff --git a/README.md b/README.md index 2b4d49f..14485ae 100644 --- a/README.md +++ b/README.md @@ -314,6 +314,60 @@ async for row in rows: await rows.aclose() ``` +### Converting Events to pandas DataFrame + +For data analysis and exploration, you can convert event streams to pandas DataFrames. To use this feature, install the client SDK with pandas support: + +```shell +pip install eventsourcingdb[pandas] +``` + +Import the `events_to_dataframe` function from the `eventsourcingdb.pandas` module and pass an event stream to it: + +```python +from eventsourcingdb import Client, ReadEventsOptions +from eventsourcingdb.pandas import events_to_dataframe + +events = client.read_events( + subject = '/books', + options = ReadEventsOptions( + recursive = True + ), +) + +df = await events_to_dataframe(events) +``` + +The resulting DataFrame includes all event fields as columns: `event_id`, `time`, `source`, `subject`, `type`, `data`, `spec_version`, `data_content_type`, `predecessor_hash`, `hash`, `trace_parent`, `trace_state`, and `signature`. + +The `data` field remains as a dict column, which you can access directly: + +```python +# Access the data field +for index, row in df.iterrows(): + print(row['data']) +``` + +#### Flattening the Data Field + +For analysis of specific event types, you may want to flatten the `data` field into separate columns. Use pandas' `json_normalize` function: + +```python +import pandas as pd + +# Filter for a specific event type first +book_acquired_df = df[df['type'] == 'io.eventsourcingdb.library.book-acquired'] + +# Flatten the data field (re-index so the joined columns align with the filtered rows) +flattened_df = book_acquired_df.join( + pd.json_normalize(book_acquired_df['data']).set_index(book_acquired_df.index) +) + +# Now you can access data fields as columns +print(flattened_df['title']) +print(flattened_df['author']) +``` + ### Observing Events To observe all events of a subject, call the `observe_events` function with the subject as the first argument and an options object as the second argument. 
Set the `recursive` option to `False`. This ensures that only events of the given subject are returned, not events of nested subjects. diff --git a/eventsourcingdb/pandas.py b/eventsourcingdb/pandas.py new file mode 100644 index 0000000..37a6348 --- /dev/null +++ b/eventsourcingdb/pandas.py @@ -0,0 +1,62 @@ +from typing import AsyncGenerator + +import pandas as pd + +from .event.event import Event + +__all__ = ["events_to_dataframe"] + + +async def events_to_dataframe(events: AsyncGenerator[Event, None]) -> pd.DataFrame: + """ + Convert an async stream of events to a pandas DataFrame. + + All event fields are included as columns. The 'data' field remains + as a dict column - use pd.json_normalize() for flattening if needed. + + Args: + events: An async generator of Event objects + + Returns: + A pandas DataFrame with all event fields as columns + """ + event_list = [] + + async for event in events: + event_dict = { + "event_id": event.event_id, + "time": event.time, + "source": event.source, + "subject": event.subject, + "type": event.type, + "data": event.data, + "spec_version": event.spec_version, + "data_content_type": event.data_content_type, + "predecessor_hash": event.predecessor_hash, + "hash": event.hash, + "trace_parent": event.trace_parent, + "trace_state": event.trace_state, + "signature": event.signature, + } + event_list.append(event_dict) + + if len(event_list) == 0: + return pd.DataFrame( + columns=[ # pyright: ignore[reportArgumentType] + "event_id", + "time", + "source", + "subject", + "type", + "data", + "spec_version", + "data_content_type", + "predecessor_hash", + "hash", + "trace_parent", + "trace_state", + "signature", + ] + ) + + return pd.DataFrame(event_list) diff --git a/pyproject.toml b/pyproject.toml index 1a613ac..6bf402d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,10 @@ dev = [ "bandit==1.8.6", "pyright==1.1.407", "twine==6.2.0", + "pandas>=2.0.0", +] +pandas = [ + "pandas>=2.0.0", ] [build-system] diff --git 
a/tests/test_events_to_dataframe.py b/tests/test_events_to_dataframe.py new file mode 100644 index 0000000..815c0c3 --- /dev/null +++ b/tests/test_events_to_dataframe.py @@ -0,0 +1,248 @@ +from datetime import datetime + +import pandas as pd +import pytest + +from eventsourcingdb import EventCandidate, ReadEventsOptions +from eventsourcingdb.pandas import events_to_dataframe + +from .conftest import TestData +from .shared.database import Database + + +class TestEventsToDataframe: + @staticmethod + @pytest.mark.asyncio + async def test_returns_empty_dataframe_for_empty_event_stream( + database: Database, + ) -> None: + client = database.get_client() + + events = client.read_events("/nonexistent", ReadEventsOptions(recursive=False)) + df = await events_to_dataframe(events) + + assert isinstance(df, pd.DataFrame) + assert len(df) == 0 + assert list(df.columns) == [ + "event_id", + "time", + "source", + "subject", + "type", + "data", + "spec_version", + "data_content_type", + "predecessor_hash", + "hash", + "trace_parent", + "trace_state", + "signature", + ] + + @staticmethod + @pytest.mark.asyncio + async def test_returns_dataframe_with_single_event( + database: Database, test_data: TestData + ) -> None: + client = database.get_client() + + await client.write_events( + [ + EventCandidate( + source=test_data.TEST_SOURCE_STRING, + subject=test_data.REGISTERED_SUBJECT, + type=test_data.REGISTERED_TYPE, + data=test_data.JANE_DATA, + trace_parent=test_data.TRACE_PARENT_1, + ) + ] + ) + + events = client.read_events( + test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False) + ) + df = await events_to_dataframe(events) + + assert isinstance(df, pd.DataFrame) + assert len(df) == 1 + + assert df.iloc[0]["source"] == test_data.TEST_SOURCE_STRING + assert df.iloc[0]["subject"] == test_data.REGISTERED_SUBJECT + assert df.iloc[0]["type"] == test_data.REGISTERED_TYPE + assert df.iloc[0]["data"] == test_data.JANE_DATA + assert df.iloc[0]["trace_parent"] == 
test_data.TRACE_PARENT_1 + + @staticmethod + @pytest.mark.asyncio + async def test_returns_dataframe_with_multiple_events( + prepared_database: Database, test_data: TestData + ) -> None: + client = prepared_database.get_client() + + events = client.read_events("/users", ReadEventsOptions(recursive=True)) + df = await events_to_dataframe(events) + + assert isinstance(df, pd.DataFrame) + expected_event_count = 4 + assert len(df) == expected_event_count + + assert df.iloc[0]["data"] == test_data.JANE_DATA + assert df.iloc[1]["data"] == test_data.JANE_DATA + assert df.iloc[2]["data"] == test_data.JOHN_DATA + assert df.iloc[3]["data"] == test_data.JOHN_DATA + + @staticmethod + @pytest.mark.asyncio + async def test_dataframe_has_correct_column_names( + database: Database, test_data: TestData + ) -> None: + client = database.get_client() + + await client.write_events( + [ + EventCandidate( + source=test_data.TEST_SOURCE_STRING, + subject=test_data.REGISTERED_SUBJECT, + type=test_data.REGISTERED_TYPE, + data=test_data.JANE_DATA, + ) + ] + ) + + events = client.read_events( + test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False) + ) + df = await events_to_dataframe(events) + + expected_columns = [ + "event_id", + "time", + "source", + "subject", + "type", + "data", + "spec_version", + "data_content_type", + "predecessor_hash", + "hash", + "trace_parent", + "trace_state", + "signature", + ] + assert list(df.columns) == expected_columns + + @staticmethod + @pytest.mark.asyncio + async def test_data_field_remains_as_dict( + database: Database, test_data: TestData + ) -> None: + client = database.get_client() + + await client.write_events( + [ + EventCandidate( + source=test_data.TEST_SOURCE_STRING, + subject=test_data.REGISTERED_SUBJECT, + type=test_data.REGISTERED_TYPE, + data=test_data.JANE_DATA, + ) + ] + ) + + events = client.read_events( + test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False) + ) + df = await events_to_dataframe(events) + + assert 
isinstance(df.iloc[0]["data"], dict) + assert df.iloc[0]["data"] == test_data.JANE_DATA + + @staticmethod + @pytest.mark.asyncio + async def test_time_field_is_datetime_object( + database: Database, test_data: TestData + ) -> None: + client = database.get_client() + + await client.write_events( + [ + EventCandidate( + source=test_data.TEST_SOURCE_STRING, + subject=test_data.REGISTERED_SUBJECT, + type=test_data.REGISTERED_TYPE, + data=test_data.JANE_DATA, + ) + ] + ) + + events = client.read_events( + test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False) + ) + df = await events_to_dataframe(events) + + assert isinstance(df.iloc[0]["time"], datetime) + + @staticmethod + @pytest.mark.asyncio + async def test_optional_fields_can_be_none( + database: Database, test_data: TestData + ) -> None: + client = database.get_client() + + await client.write_events( + [ + EventCandidate( + source=test_data.TEST_SOURCE_STRING, + subject=test_data.REGISTERED_SUBJECT, + type=test_data.REGISTERED_TYPE, + data=test_data.JANE_DATA, + ) + ] + ) + + events = client.read_events( + test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False) + ) + df = await events_to_dataframe(events) + + assert df.iloc[0]["trace_parent"] is None or pd.isna(df.iloc[0]["trace_parent"]) + assert df.iloc[0]["trace_state"] is None or pd.isna(df.iloc[0]["trace_state"]) + assert df.iloc[0]["signature"] is None or pd.isna(df.iloc[0]["signature"]) + + @staticmethod + @pytest.mark.asyncio + async def test_all_event_fields_are_present( + database: Database, test_data: TestData + ) -> None: + client = database.get_client() + + await client.write_events( + [ + EventCandidate( + source=test_data.TEST_SOURCE_STRING, + subject=test_data.REGISTERED_SUBJECT, + type=test_data.REGISTERED_TYPE, + data=test_data.JANE_DATA, + trace_parent=test_data.TRACE_PARENT_1, + ) + ] + ) + + events = client.read_events( + test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False) + ) + df = await 
events_to_dataframe(events) + + row = df.iloc[0] + + assert isinstance(row["event_id"], str) + assert isinstance(row["time"], datetime) + assert row["source"] == test_data.TEST_SOURCE_STRING + assert row["subject"] == test_data.REGISTERED_SUBJECT + assert row["type"] == test_data.REGISTERED_TYPE + assert row["data"] == test_data.JANE_DATA + assert isinstance(row["spec_version"], str) + assert isinstance(row["data_content_type"], str) + assert isinstance(row["predecessor_hash"], str) + assert isinstance(row["hash"], str) + assert row["trace_parent"] == test_data.TRACE_PARENT_1