Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 5 additions & 16 deletions src/cloudevents/core/bindings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.

from cloudevents.core.bindings.http import (
HTTPMessage,
from_binary,
from_http,
from_structured,
to_binary,
to_structured,
)
"""
CloudEvents protocol bindings.

__all__ = [
"HTTPMessage",
"to_binary",
"from_binary",
"to_structured",
"from_structured",
"from_http",
]
This package provides protocol-specific bindings for CloudEvents, including HTTP and Kafka.
Each binding module provides functions to convert CloudEvents to/from protocol-specific messages.
"""
68 changes: 68 additions & 0 deletions src/cloudevents/core/bindings/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Common utilities for CloudEvents protocol bindings.

This module provides shared functionality for protocol bindings (HTTP, Kafka, etc.)
to handle CloudEvent attribute encoding and decoding per the CloudEvents specification.
"""

from datetime import datetime
from typing import Any, Final
from urllib.parse import quote, unquote

from dateutil.parser import isoparse

TIME_ATTR: Final[str] = "time"
CONTENT_TYPE_HEADER: Final[str] = "content-type"
DATACONTENTTYPE_ATTR: Final[str] = "datacontenttype"


def encode_header_value(value: Any) -> str:
"""
Encode a CloudEvent attribute value for use in a protocol header.

Handles special encoding for datetime objects (ISO 8601 with 'Z' suffix for UTC)
and applies percent-encoding for non-ASCII and special characters per RFC 3986.

:param value: The attribute value to encode
:return: Percent-encoded string suitable for protocol headers
"""
if isinstance(value, datetime):
str_value = value.isoformat()
if str_value.endswith("+00:00"):
str_value = str_value[:-6] + "Z"
return quote(str_value, safe="")

return quote(str(value), safe="")


def decode_header_value(attr_name: str, value: str) -> Any:
"""
Decode a CloudEvent attribute value from a protocol header.

Applies percent-decoding and special parsing for the 'time' attribute
(converts to datetime object using RFC 3339 parsing).

:param attr_name: The name of the CloudEvent attribute
:param value: The percent-encoded header value
:return: Decoded value (datetime for 'time' attribute, string otherwise)
"""
decoded = unquote(value)

if attr_name == TIME_ATTR:
return isoparse(decoded)

return decoded
61 changes: 12 additions & 49 deletions src/cloudevents/core/bindings/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
# under the License.

from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Final
from urllib.parse import quote, unquote

from dateutil.parser import isoparse

from cloudevents.core.base import BaseCloudEvent
from cloudevents.core.bindings.common import (
CONTENT_TYPE_HEADER,
DATACONTENTTYPE_ATTR,
decode_header_value,
encode_header_value,
)
from cloudevents.core.formats.base import Format

CE_PREFIX: Final[str] = "ce-"
CONTENT_TYPE_HEADER: Final[str] = "content-type"


@dataclass(frozen=True)
Expand All @@ -44,44 +45,6 @@ class HTTPMessage:
body: bytes


def _encode_header_value(value: Any) -> str:
"""
Encode a CloudEvent attribute value for use in an HTTP header.

Handles special encoding for datetime objects (ISO 8601 with 'Z' suffix for UTC)
and applies percent-encoding for non-ASCII and special characters per RFC 3986.

:param value: The attribute value to encode
:return: Percent-encoded string suitable for HTTP headers
"""
if isinstance(value, datetime):
str_value = value.isoformat()
if str_value.endswith("+00:00"):
str_value = str_value[:-6] + "Z"
return quote(str_value, safe="")

return quote(str(value), safe="")


def _decode_header_value(attr_name: str, value: str) -> Any:
"""
Decode a CloudEvent attribute value from an HTTP header.

Applies percent-decoding and special parsing for the 'time' attribute
(converts to datetime object using RFC 3339 parsing).

:param attr_name: The name of the CloudEvent attribute
:param value: The percent-encoded header value
:return: Decoded value (datetime for 'time' attribute, string otherwise)
"""
decoded = unquote(value)

if attr_name == "time":
return isoparse(decoded)

return decoded


def to_binary(event: BaseCloudEvent, event_format: Format) -> HTTPMessage:
"""
Convert a CloudEvent to HTTP binary content mode.
Expand Down Expand Up @@ -113,14 +76,14 @@ def to_binary(event: BaseCloudEvent, event_format: Format) -> HTTPMessage:
if attr_value is None:
continue

if attr_name == "datacontenttype":
if attr_name == DATACONTENTTYPE_ATTR:
headers[CONTENT_TYPE_HEADER] = str(attr_value)
else:
header_name = f"{CE_PREFIX}{attr_name}"
headers[header_name] = _encode_header_value(attr_value)
headers[header_name] = encode_header_value(attr_value)

data = event.get_data()
datacontenttype = attributes.get("datacontenttype")
datacontenttype = attributes.get(DATACONTENTTYPE_ATTR)
body = event_format.write_data(data, datacontenttype)

return HTTPMessage(headers=headers, body=body)
Expand Down Expand Up @@ -163,11 +126,11 @@ def from_binary(

if normalized_name.startswith(CE_PREFIX):
attr_name = normalized_name[len(CE_PREFIX) :]
attributes[attr_name] = _decode_header_value(attr_name, header_value)
attributes[attr_name] = decode_header_value(attr_name, header_value)
elif normalized_name == CONTENT_TYPE_HEADER:
attributes["datacontenttype"] = header_value
attributes[DATACONTENTTYPE_ATTR] = header_value

datacontenttype = attributes.get("datacontenttype")
datacontenttype = attributes.get(DATACONTENTTYPE_ATTR)
data = event_format.read_data(message.body, datacontenttype)

return event_factory(attributes, data)
Expand Down
Loading
Loading