Skip to content
Merged
54 changes: 54 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,60 @@ async for row in rows:
await rows.aclose()
```

### Converting Events to pandas DataFrame

For data analysis and exploration, you can convert event streams to pandas DataFrames. To use this feature, install the client SDK with pandas support:

```shell
pip install eventsourcingdb[pandas]
```

Import the `events_to_dataframe` function from the `eventsourcingdb.pandas` module and pass an event stream to it:

```python
from eventsourcingdb import Client, ReadEventsOptions
from eventsourcingdb.pandas import events_to_dataframe

events = client.read_events(
subject = '/books',
options = ReadEventsOptions(
recursive = True
),
)

df = await events_to_dataframe(events)
```

The resulting DataFrame includes all event fields as columns: `event_id`, `time`, `source`, `subject`, `type`, `data`, `spec_version`, `data_content_type`, `predecessor_hash`, `hash`, `trace_parent`, `trace_state`, and `signature`.

The `data` field remains as a dict column, which you can access directly:

```python
# Access the data field
for index, row in df.iterrows():
print(row['data'])
```

#### Flattening the Data Field

For analysis of specific event types, you may want to flatten the `data` field into separate columns. Use pandas' `json_normalize` function:

```python
import pandas as pd

# Filter for a specific event type first
book_acquired_df = df[df['type'] == 'io.eventsourcingdb.library.book-acquired']

# Flatten the data field
flattened_df = book_acquired_df.join(
pd.json_normalize(book_acquired_df['data'])
)

# Now you can access data fields as columns
print(flattened_df['title'])
print(flattened_df['author'])
```

### Observing Events

To observe all events of a subject, call the `observe_events` function with the subject as the first argument and an options object as the second argument. Set the `recursive` option to `False`. This ensures that only events of the given subject are returned, not events of nested subjects.
Expand Down
62 changes: 62 additions & 0 deletions eventsourcingdb/pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from typing import AsyncGenerator

import pandas as pd

from .event.event import Event

__all__ = ["events_to_dataframe"]


async def events_to_dataframe(events: AsyncGenerator[Event, None]) -> pd.DataFrame:
    """
    Convert an async stream of events to a pandas DataFrame.

    All event fields are included as columns. The 'data' field remains
    as a dict column - use pd.json_normalize() for flattening if needed.

    Args:
        events: An async generator of Event objects

    Returns:
        A pandas DataFrame with all event fields as columns. An empty
        stream yields an empty DataFrame that still carries the full
        set of columns, so downstream code can rely on the schema.
    """
    # Single source of truth for the column names and their order. Each
    # name matches the corresponding Event attribute one-to-one, which
    # lets getattr() build the rows without repeating the list.
    columns = [
        "event_id",
        "time",
        "source",
        "subject",
        "type",
        "data",
        "spec_version",
        "data_content_type",
        "predecessor_hash",
        "hash",
        "trace_parent",
        "trace_state",
        "signature",
    ]

    rows = [
        {column: getattr(event, column) for column in columns}
        async for event in events
    ]

    # Passing columns= explicitly guarantees the schema (and column order)
    # even when rows is empty, so no separate empty-stream branch is needed.
    return pd.DataFrame(rows, columns=columns)
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ dev = [
"bandit==1.8.6",
"pyright==1.1.407",
"twine==6.2.0",
"pandas>=2.0.0",
]
pandas = [
"pandas>=2.0.0",
]

[build-system]
Expand Down
248 changes: 248 additions & 0 deletions tests/test_events_to_dataframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
from datetime import datetime

import pandas as pd
import pytest

from eventsourcingdb import EventCandidate, ReadEventsOptions
from eventsourcingdb.pandas import events_to_dataframe

from .conftest import TestData
from .shared.database import Database


class TestEventsToDataframe:
    """Integration tests for events_to_dataframe against a live database.

    Relies on the ``database`` / ``prepared_database`` and ``test_data``
    fixtures (defined in conftest and shared helpers, not in this file):
    each test writes events through the client and reads them back as a
    pandas DataFrame.
    """

    @staticmethod
    @pytest.mark.asyncio
    async def test_returns_empty_dataframe_for_empty_event_stream(
        database: Database,
    ) -> None:
        """An empty event stream yields an empty DataFrame that still has the full column schema."""
        client = database.get_client()

        events = client.read_events("/nonexistent", ReadEventsOptions(recursive=False))
        df = await events_to_dataframe(events)

        assert isinstance(df, pd.DataFrame)
        assert len(df) == 0
        assert list(df.columns) == [
            "event_id",
            "time",
            "source",
            "subject",
            "type",
            "data",
            "spec_version",
            "data_content_type",
            "predecessor_hash",
            "hash",
            "trace_parent",
            "trace_state",
            "signature",
        ]

    @staticmethod
    @pytest.mark.asyncio
    async def test_returns_dataframe_with_single_event(
        database: Database, test_data: TestData
    ) -> None:
        """A single written event round-trips into a one-row DataFrame with its field values intact."""
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                    trace_parent=test_data.TRACE_PARENT_1,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        assert isinstance(df, pd.DataFrame)
        assert len(df) == 1

        assert df.iloc[0]["source"] == test_data.TEST_SOURCE_STRING
        assert df.iloc[0]["subject"] == test_data.REGISTERED_SUBJECT
        assert df.iloc[0]["type"] == test_data.REGISTERED_TYPE
        assert df.iloc[0]["data"] == test_data.JANE_DATA
        assert df.iloc[0]["trace_parent"] == test_data.TRACE_PARENT_1

    @staticmethod
    @pytest.mark.asyncio
    async def test_returns_dataframe_with_multiple_events(
        prepared_database: Database, test_data: TestData
    ) -> None:
        """A recursive read over a pre-seeded subject produces one DataFrame row per event, in read order."""
        client = prepared_database.get_client()

        events = client.read_events("/users", ReadEventsOptions(recursive=True))
        df = await events_to_dataframe(events)

        assert isinstance(df, pd.DataFrame)
        # NOTE(review): presumably the prepared_database fixture seeds two
        # Jane events followed by two John events under /users — verify
        # against the shared fixture setup.
        expected_event_count = 4
        assert len(df) == expected_event_count

        assert df.iloc[0]["data"] == test_data.JANE_DATA
        assert df.iloc[1]["data"] == test_data.JANE_DATA
        assert df.iloc[2]["data"] == test_data.JOHN_DATA
        assert df.iloc[3]["data"] == test_data.JOHN_DATA

    @staticmethod
    @pytest.mark.asyncio
    async def test_dataframe_has_correct_column_names(
        database: Database, test_data: TestData
    ) -> None:
        """The DataFrame exposes exactly the documented event fields as columns, in the documented order."""
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        expected_columns = [
            "event_id",
            "time",
            "source",
            "subject",
            "type",
            "data",
            "spec_version",
            "data_content_type",
            "predecessor_hash",
            "hash",
            "trace_parent",
            "trace_state",
            "signature",
        ]
        assert list(df.columns) == expected_columns

    @staticmethod
    @pytest.mark.asyncio
    async def test_data_field_remains_as_dict(
        database: Database, test_data: TestData
    ) -> None:
        """The 'data' column keeps the raw dict payload rather than being flattened."""
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        assert isinstance(df.iloc[0]["data"], dict)
        assert df.iloc[0]["data"] == test_data.JANE_DATA

    @staticmethod
    @pytest.mark.asyncio
    async def test_time_field_is_datetime_object(
        database: Database, test_data: TestData
    ) -> None:
        """The 'time' column holds datetime objects, not raw timestamp strings."""
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        assert isinstance(df.iloc[0]["time"], datetime)

    @staticmethod
    @pytest.mark.asyncio
    async def test_optional_fields_can_be_none(
        database: Database, test_data: TestData
    ) -> None:
        """Optional event fields left unset come through as None (or pandas NA)."""
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        # pandas may coerce None to NaN/NaT depending on column dtype, so
        # accept either representation.
        assert df.iloc[0]["trace_parent"] is None or pd.isna(df.iloc[0]["trace_parent"])
        assert df.iloc[0]["trace_state"] is None or pd.isna(df.iloc[0]["trace_state"])
        assert df.iloc[0]["signature"] is None or pd.isna(df.iloc[0]["signature"])

    @staticmethod
    @pytest.mark.asyncio
    async def test_all_event_fields_are_present(
        database: Database, test_data: TestData
    ) -> None:
        """Every event field is populated in the row with the expected type or value."""
        client = database.get_client()

        await client.write_events(
            [
                EventCandidate(
                    source=test_data.TEST_SOURCE_STRING,
                    subject=test_data.REGISTERED_SUBJECT,
                    type=test_data.REGISTERED_TYPE,
                    data=test_data.JANE_DATA,
                    trace_parent=test_data.TRACE_PARENT_1,
                )
            ]
        )

        events = client.read_events(
            test_data.REGISTERED_SUBJECT, ReadEventsOptions(recursive=False)
        )
        df = await events_to_dataframe(events)

        row = df.iloc[0]

        assert isinstance(row["event_id"], str)
        assert isinstance(row["time"], datetime)
        assert row["source"] == test_data.TEST_SOURCE_STRING
        assert row["subject"] == test_data.REGISTERED_SUBJECT
        assert row["type"] == test_data.REGISTERED_TYPE
        assert row["data"] == test_data.JANE_DATA
        assert isinstance(row["spec_version"], str)
        assert isinstance(row["data_content_type"], str)
        assert isinstance(row["predecessor_hash"], str)
        assert isinstance(row["hash"], str)
        assert row["trace_parent"] == test_data.TRACE_PARENT_1