
[SPARK-55304][SS][PYTHON] Introduce support of Admission Control and Trigger.AvailableNow in Python data source - streaming reader #54085

Open
HeartSaVioR wants to merge 32 commits into apache:master from HeartSaVioR:SPARK-55304

Conversation

@HeartSaVioR
Contributor

What changes were proposed in this pull request?

This PR proposes to introduce the support of Admission Control and Trigger.AvailableNow in Python data source - streaming reader.

To support Admission Control, we propose to change the DataSourceStreamReader interface as follows (shown as a Before/After comparison):

Before:

class DataSourceStreamReader(ABC):
    def initialOffset(self) -> dict
    def latestOffset() -> dict
    def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]
    @abstractmethod
    def read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator["RecordBatch"]]
    def commit(self, end: dict) -> None
    def stop(self) -> None

After:

class DataSourceStreamReader(ABC):
    def initialOffset(self) -> dict
    def latestOffset(self, start: dict, limit: ReadLimit) -> dict
    # NOTE: Optional to implement, default = ReadAllAvailable()
    def getDefaultReadLimit(self) -> ReadLimit
    # NOTE: Optional to implement, default = None
    def reportLatestOffset(self) -> Optional[dict]
    def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]
    @abstractmethod
    def read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator["RecordBatch"]]
    def commit(self, end: dict) -> None
    def stop(self) -> None

The main changes are as follows:

  • The method signature for latestOffset has changed; the method remains mandatory.
  • The method getDefaultReadLimit is added as an optional method.
  • The method reportLatestOffset is added as an optional method.

This way, new implementations will support Admission Control by default. We ensure the engine can handle the old method signature via Python's built-in inspect module (similar to Java's reflection): if latestOffset is implemented without parameters, we fall back to treating the source as one that does not enable admission control. For all new sources, implementing latestOffset with parameters is strongly recommended.
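
For illustration, a minimal sketch (not the actual engine code) of how the old signature could be detected with inspect:

import inspect

def latest_offset_takes_params(reader) -> bool:
    # Bound methods drop 'self', so the old no-arg form reports zero
    # parameters while the new (start, limit) form reports two.
    return len(inspect.signature(reader.latestOffset).parameters) >= 2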

The ReadLimit interface and its built-in implementations will be available for source implementations to leverage. The built-in implementations are: ReadAllAvailable, ReadMinRows, ReadMaxRows, ReadMaxFiles, and ReadMaxBytes. We won't support custom implementations of the ReadLimit interface at this point, since that requires significant effort and we don't see demand, but we can plan for it if strong demand arises. A sketch of how a source could honor these limits follows.
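
As an illustration only (the source class below is hypothetical, and the import path and the ReadMaxRows attribute name are assumptions, not confirmed by this PR), a reader could honor the read limit roughly like this:

from pyspark.sql.datasource import DataSourceStreamReader
# Assumed import path for the new classes introduced by this PR:
# from pyspark.sql.datasource import ReadLimit, ReadAllAvailable, ReadMaxRows

class CounterStreamReader(DataSourceStreamReader):
    """Hypothetical source that emits consecutive integers 0, 1, 2, ..."""

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def getDefaultReadLimit(self):
        # Cap every micro-batch at 100 rows unless the engine overrides it.
        return ReadMaxRows(100)

    def latestOffset(self, start: dict, limit) -> dict:
        available = start["offset"] + 1000   # pretend 1000 new rows arrived
        if isinstance(limit, ReadMaxRows):
            end = min(available, start["offset"] + limit.rows)   # 'rows' attribute is assumed
        else:
            end = available                  # e.g. ReadAllAvailable
        return {"offset": end}

    # partitions(), read(), commit(), stop() omitted for brevity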

We do not make any change to SimpleDataSourceStreamReader for Admission Control, since it is designed for small data fetches and can be considered as already limiting the data. We could still add ReadLimit support later if we see strong demand for limiting the fetch size via a source option.

To support Trigger.AvailableNow, we propose to introduce a new interface as follows:

class SupportsTriggerAvailableNow(ABC):
  @abstractmethod
  def prepareForTriggerAvailableNow(self) -> None

The above interface can be mixed in with both DataSourceStreamReader and SimpleDataSourceStreamReader. It won't work with DataSourceStreamReader implementations that still use the old latestOffset() signature, as mentioned above.
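
A minimal sketch of mixing the new interface into a reader, building on the hypothetical CounterStreamReader sketched above (the way the prepared offset is cached is illustrative; only the interface name comes from this PR):

class CounterAvailableNowReader(CounterStreamReader, SupportsTriggerAvailableNow):
    """Extends the hypothetical counter source with Trigger.AvailableNow support."""

    def __init__(self):
        self._prepared_end = None    # offset snapshot taken at query start

    def prepareForTriggerAvailableNow(self) -> None:
        # Snapshot the point the AvailableNow run must stop at.
        self._prepared_end = self.initialOffset()["offset"] + 5000   # pretend current tail

    def latestOffset(self, start: dict, limit) -> dict:
        end = super().latestOffset(start, limit)["offset"]
        if self._prepared_end is not None:
            # Never report beyond the snapshot, so the query can terminate.
            end = min(end, self._prepared_end)
        return {"offset": end}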

Why are the changes needed?

This is to catch up with features already supported in the Scala DSv2 API; we have received reports from developers that the missing features block them from implementing some data sources.

Does this PR introduce any user-facing change?

Yes, users implementing a streaming reader via the Python data source API will be able to add support for Admission Control and Trigger.AvailableNow, which had been major missing features.

How was this patch tested?

New UTs.

Was this patch authored or co-authored using generative AI tooling?

Co-authored using claude-4.5-sonnet

@github-actions

github-actions bot commented Feb 2, 2026

JIRA Issue Information

=== New Feature SPARK-55304 ===
Summary: Introduce Admission Control and Trigger.AvailableNow into Python Data Source - reader
Assignee: None
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

Comment on lines +219 to +220
same offset as given start parameter, to indicate that there is no more data to read. This
includes the case where the query is restarted and the source is asked to read from the
Contributor

Sorry, why do we need to handle the query restart case here? Shouldn't Trigger.AvailableNow get the new end offset after a query restart?

Contributor Author

4575b1d

So here is the scenario: the query reads from a Kafka topic. In the first run, the topic has 3 partitions. During the downtime of the query, users repartition the Kafka topic, so it now has 5 partitions. If there was an uncommitted batch, the second run of the query will get the start offset from that uncommitted batch, which covered only 3 partitions. Meanwhile, prepareForTriggerAvailableNow() will identify that there are 5 partitions and store the offset for 5 partitions. The source is responsible for reading further from the 3 partitions, discovering the new partitions, and eventually reaching the offset stored by prepareForTriggerAvailableNow().

The scenario is admittedly complicated and I might not be able to describe it in an easy-to-understand way. If there is a proposal for better wording, I'd appreciate the suggestion!

Contributor

thanks for the explanation, the example sounds good to me

Comment on lines 724 to 725
the very first micro-batch, and the offset continues from the last micro-batch for the
following. The source can return the same offset as start offset if there is no data to
Contributor

maybe "and for subsequent micro-batches, the start offset is the ending offset from the previous micro-batch." is better iirc?

Contributor Author

@HeartSaVioR HeartSaVioR Feb 3, 2026

I'm slightly not in favor of coupling the contract with the engine's behavior, since it could limit us. (I found myself doing this and I'm even OK with omitting it.) But the explanation from your comment is unlikely to change, so it makes sense to me. Thanks for the suggestion.

Comment on lines 724 to 725
the very first micro-batch, and the offset continues from the last micro-batch for the
following. The source can return the same offset as start offset if there is no data to
Contributor

"The source can return the same offset as start offset if there is no data to process"

I feel this is also a bit confusing - which "start" offset are you referring to here?

Contributor Author

I meant the parameter.

"The source can return the start parameter as it is, if there is no data to process"

^ Would it be clearer?

Comment on lines +736 to +737
engine; e.g. if the readLimit is :class:`ReadAllAvailable`, the source must ignore the read
limit configured through options.
Contributor

nit: maybe it will be more clear if we can provide an example in which the engine provides a different read limit than the configured one

Contributor Author

It's for two cases: 1) Trigger.Once (deprecated), and 2) the fallback of Trigger.AvailableNow (when some stream does not support Trigger.AvailableNow but Trigger.AvailableNow is requested). I'm not sure we want to document these cases; once documented, they are considered a "contract" rather than an implementation detail.

        self._registry[type_name] = read_limit_type

    def get(self, type_name: str, params: dict) -> ReadLimit:
        read_limit_type = self._registry[type_name]
Contributor

I am not quite familiar with Python, but will this throw an exception if type_name doesn't exist?

Contributor

+1 here, it will throw a KeyError I think. Either way this should probably use self._registry.get(type_name) or have an appropriate check.

Contributor Author

That's not expected to happen, but a KeyError is definitely not preferable. I'll probably throw a better exception - if we have an internal exception class in PySpark, then I'll use that.

Contributor Author

lol, I already have a None check on the next line. I'll just use .get().
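
For reference, the lookup along those lines might look like the following sketch (the error handling and construction details here are illustrative, not the code in this PR):

def get(self, type_name: str, params: dict) -> ReadLimit:
    read_limit_type = self._registry.get(type_name)
    if read_limit_type is None:
        # Avoid surfacing a bare KeyError; raise a descriptive error instead
        # (a PySpark-internal error class could be used here).
        raise ValueError(f"Unknown ReadLimit type: {type_name}")
    return read_limit_type(**params)  # construction detail is illustrative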

Contributor

is it possible to add a test case where latestOffset returns the same offset?

Contributor Author

@HeartSaVioR HeartSaVioR Feb 4, 2026

It happens with Trigger.AvailableNow; I'll check whether we already have one for the processing-time trigger. Maybe that needs a union or a stream-stream join, since we still need to trigger the micro-batch to verify the behavior. (If latestOffset returns the same offset and it's the only stream, we don't trigger the micro-batch.)

Contributor

@huanliwang-db huanliwang-db left a comment

LGTM

@yyoli-db yyoli-db left a comment

Thanks for doing this!

Contributor

@allisonwang-db allisonwang-db left a comment

Thanks for working on this! Left some comments.

Comment on lines +23 to +24
Specifies limits on how much data to read from a streaming source when
determining the latest offset.
Contributor

Can we add more comments and examples on how to use this class?

Contributor Author

The documentation should be covered in latestOffset() and getDefaultReadLimit() in DataSourceStreamReader.

Since devs have to use the built-in read limit implementations at this point, I'm going to enumerate the built-in implementations here so that they can understand which classes are available.

Parameter
---------
params : dict
The parameters to create the :class:`ReadLimit`. type name isn't included.
Contributor

ditto can we add more examples?

Contributor Author

The class methods in ReadLimit aren't user-facing; they are Spark-internal. Do we still need examples?

Comment on lines +98 to +100
assert isinstance(
    limit, ReadAllAvailable
), "simple stream reader does not support read limit"
Contributor

Does that mean we can't use availableNow with simple streaming reader?

Contributor Author

No, admission control is not available for the simple streaming reader. Trigger.AvailableNow is still available for the simple stream reader.

Comment on lines +744 to +748
----------
start : dict
The start offset of the microbatch to continue reading from.
limit : :class:`ReadLimit`
The limit on the amount of data to be returned by this call.
Contributor

Let's add supported version here

Comment on lines 739 to 741
NOTE: Previous Spark versions didn't have start offset and read limit parameters for this
method. While Spark will ensure the backward compatibility for existing data sources, the
new data sources are strongly encouraged to implement this new method signature.
Contributor

Let's add this NOTE below as a docstring section?

Contributor Author

You mean moving it out of the doc comment? I'm OK with it.

@allisonwang-db
Contributor

cc @shujingyang-db @gaogaotiantian

from abc import ABC, abstractmethod


class ReadLimit(ABC):
Contributor

I think this ABC is over-designed, especially considering that we do not even support a custom ReadLimit class at this point.

type_name does not really do anything besides returning an identifier for this class. Since it's only used internally, I think you can directly use self.__class__.__name__.

cls.load(param) -> cls is not a super common pattern in Python or PySpark. This is just __init__, I think. You just need the subclass to have an __init__ function that takes a parameter; saving that parameter and using it in dump would be fine.

Bottom line: you just need a serializable enum that takes some arguments. I think you can totally do it with just a dataclass.

from dataclasses import dataclass

class ReadLimit:
    ...

@dataclass
class ReadAllAvailable(ReadLimit):
    type: str = "ReadAllAvailable"

@dataclass
class ReadMinRows(ReadLimit):
    min_rows: int
    type: str = "ReadMinRows"

You can do dataclasses.asdict(obj) to dump it and registry[type](**params) to create the class. What are the concerns with using this simple pattern?
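
As a concrete round-trip sketch of that suggestion, using the dataclasses sketched above (the registry construction is illustrative; the actual names in the PR may differ):

from dataclasses import asdict

# Dump a limit to a plain dict, then rebuild it via a registry keyed by type.
limit = ReadMinRows(min_rows=100)
dumped = asdict(limit)    # {'min_rows': 100, 'type': 'ReadMinRows'}

registry = {"ReadAllAvailable": ReadAllAvailable, "ReadMinRows": ReadMinRows}
params = {k: v for k, v in dumped.items() if k != "type"}
restored = registry[dumped["type"]](**params)
assert restored == limit  # dataclasses compare field-by-field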

Contributor Author

It might be the case, but when we decide to extend this, would it be a one-way door where we would never be able to support a user-defined ReadLimit? If not, I'm happy to incorporate the feedback.

Contributor Author

OK, it looks like this still retains the registry mechanism and just removes the classmethods. Maybe we could just ask the custom impl to provide the type and be done. OK for me.

Contributor

Yeah there could still be custom RateLimit in the future. The difference is that the user only needs to define a dataclass like

@dataclass
class MyOwnRateLimiter(RateLimit):
    name = "MyOwnRateLimiter"
    param1: int
    param2: str

And that's it - they don't need to do anything else.

I changed type to name here because I think that's probably a bit more intuitive. Also, type is a builtin, so even though it's okay to use it here, I try to avoid it.

Contributor

Also if you think it's easier for them to do

@dataclass
class MyOwnRateLimiter(RateLimit):
    param1: int
    param2: str

All you need to do is a bit of extra work when you dump it (not even load). When you dump it, do something like

asdict(r) | {"_type": r.__class__.__name__} - that saves another thing you need to specify in the dataclass.
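
For completeness, a tiny sketch of that dump/load variant (the helper names are illustrative, not from the PR):

from dataclasses import asdict

def dump_limit(limit) -> dict:
    # Tag the payload with the class name so it can be rebuilt later.
    return asdict(limit) | {"_type": limit.__class__.__name__}

def load_limit(payload: dict, registry: dict):
    params = dict(payload)
    type_name = params.pop("_type")
    return registry[type_name](**params)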

Contributor Author

I'll take the class name. Basically I wanted flexibility in the name, but that sounds like I'd need to do it with a class variable, which is modifiable from the outside and could mess things up. I realized Final is just a type hint.

NON_EMPTY_PYARROW_RECORD_BATCHES = 1
EMPTY_PYARROW_RECORD_BATCHES = 2

SUPPORTS_ADMISSION_CONTROL = 1
Contributor

SUPPORTS_ADMISSION_CONTROL = 1 << 0 is better I think.


def check_support_func(reader: DataSourceStreamReader, outfile: IO) -> None:
    support_flags = 0
    if isinstance(reader, _SimpleStreamReaderWrapper):
Contributor

I'm surprised that we wrote _SimpleStreamReaderWrapper, which is a subclass of DataSourceStreamReader, yet we still need a separate if statement for it - that, to me, is against the rule of inheritance...

However, in this case, do we really need to?

The same logic for inspect.signature applies to _SimpleStreamReaderWrapper because it does have the correct signature for reader.latestOffset. SupportsTriggerAvailableNow is not the important thing; the important thing should be prepareForTriggerAvailableNow. We should have read-through logic for _SimpleStreamReaderWrapper so hasattr(reader, "prepareForTriggerAvailableNow") returns the underlying simple_reader's attributes directly.

Maybe we don't need to do this in this PR, but we should not claim that _SimpleStreamReaderWrapper is a DataSourceStreamReader and still need a separate case to access simple_reader all the time.
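
To illustrate the read-through idea (a sketch of the suggestion above, not code from this PR; the simple_reader attribute name is taken from the comment):

class _SimpleStreamReaderWrapper(DataSourceStreamReader):
    def __init__(self, simple_reader):
        self.simple_reader = simple_reader

    def __getattr__(self, name):
        # Called only when normal lookup fails, so wrapper-defined methods win;
        # anything else (e.g. prepareForTriggerAvailableNow) reads through to
        # the wrapped simple reader, making hasattr() checks work naturally.
        return getattr(self.simple_reader, name)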

Contributor Author

@HeartSaVioR HeartSaVioR Feb 5, 2026

While I get how your proposal works, I'd argue that it is just another workaround for doing something we can't do with inheritance.

For example, you said SupportsTriggerAvailableNow isn't the important thing. In a non-Python language with strong typing, explicitly checking the type is mostly required, short of hacks like reflection in Java, which is not a best practice. We do inspect latestOffset, but that is a last-resort approach, because Python does not support method overloading, so we can't do the same API evolution we did in Scala/Java.
(It's still possible to introduce a separate interface to achieve a similar thing, but that is not about overloading - the latest definition simply overrides the prior definition for the same method name.)

Though I understand that Python is duck-typed, the type system is very loose, and what matters is simply the availability of a function/method.

Maybe you were trying to suggest a more pythonic approach. It's just that both break inheritance, and the proposal seems to break more than what I do. I agree if you want to say my approach isn't pythonic, but then I could say this is one of the patterns we heavily leverage in Scala - we call it pattern matching. It's just that there is no language-level support, so I have to do it manually.

Contributor Author

@HeartSaVioR HeartSaVioR Feb 5, 2026

we should not claim that _SimpleStreamReaderWrapper is a DataSourceStreamReader

This breaks the structure of the interface we designed. If _SimpleStreamReaderWrapper isn't a DataSourceStreamReader, what would it be?

Contributor

@gaogaotiantian gaogaotiantian left a comment

I understand that there's a time pressure. I think as long as we can agree on the public interface, we can always polish the implementation later.

I have no knowledge on scala side so I just have some comments on python code.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 5, 2026

I think the proposal on the interface is more "arguable" than something I totally agree with. While it might be more pythonic, at least we shouldn't claim it is better for inheritance; from the proposal I don't see that we respect the interface at all. I don't think this should hold the PR from merging anyway, since it's more about "preference" at this point - it's not related to time pressure or anything like that.

Also, _SimpleStreamReaderWrapper is an internal implementation of DataSourceStreamReader, so modifying that class has nothing to do with the "public interface".
