[SPARK-55304][SS][PYTHON] Introduce support of Admission Control and Trigger.AvailableNow in Python data source - streaming reader #54085
Conversation
| same offset as given start parameter, to indicate that there is no more data to read. This
| includes the case where the query is restarted and the source is asked to read from the
Sorry, why do we need to handle the query restart case here? Shouldn't Trigger.AvailableNow get the new end offset after a query restart?
So here is the scenario: the query reads from a Kafka topic. In the first run, the topic has 3 partitions. During the downtime of the query, users repartition the Kafka topic so it now has 5 partitions. If there was an uncommitted batch, the second run of the query will get the start offset from the uncommitted batch, which had only 3 partitions. In the meanwhile, prepareForTriggerAvailableNow() will identify that there are 5 partitions and store the offset for 5 partitions. The source is responsible for reading further from the 3 partitions, figuring out the new partitions, and eventually reaching the offset stored by prepareForTriggerAvailableNow().
The scenario is actually complicated and I might not be able to describe the case in an easy-to-understand way. If there is a proposal for better wording, I appreciate the suggestion!
thanks for the explanation, the example sounds good to me
python/pyspark/sql/datasource.py
| the very first micro-batch, and the offset continues from the last micro-batch for the
| following. The source can return the same offset as start offset if there is no data to
maybe "and for subsequent micro-batches, the start offset is the ending offset from the previous micro-batch." is better iirc?
I'm slightly not in favor of coupling the contract with the engine's behavior, since it could limit us. (I found myself doing this and I'm even OK with omitting it.) But the explanation in your comment is unlikely to change, so it makes sense to me. Thanks for the suggestion.
python/pyspark/sql/datasource.py
| the very first micro-batch, and the offset continues from the last micro-batch for the
| following. The source can return the same offset as start offset if there is no data to
"The source can return the same offset as start offset if there is no data to process"
I feel this is also a bit confusing - which "start" offset are you referring to here?
I meant the parameter.
"The source can return the start parameter as it is, if there is no data to process"
^ Would it be clearer?
| engine; e.g. if the readLimit is :class:`ReadAllAvailable`, the source must ignore the read
| limit configured through options.
nit: maybe it will be clearer if we can provide an example in which the engine provides a different read limit than the configured one
It's for two cases: 1) Trigger.Once (deprecated), and 2) the fallback of Trigger.AvailableNow (when any stream in the query does not support Trigger.AvailableNow but Trigger.AvailableNow is requested). I'm not very sure we would like to document these cases, as once we document them they are considered a "contract" rather than an implementation detail.
|         self._registry[type_name] = read_limit_type
|
|     def get(self, type_name: str, params: dict) -> ReadLimit:
|         read_limit_type = self._registry[type_name]
I am not quite familiar with Python, but will this throw an exception if type_name doesn't exist?
+1 here, it will throw a KeyError I think. Either way, this should probably use self._registry.get(type_name) or have an appropriate check.
That's not expected to happen, but KeyError is definitely not preferable. I'll probably throw a better exception - if we have an internal exception in PySpark then I'll use that.
lol, I have a check against None on the next line. I'll just do .get().
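A minimal sketch of that lookup with an explicit error instead of a bare `KeyError`; the registry class name, error class, and message here are illustrative, not the PR's actual code:

```python
from pyspark.errors import PySparkValueError  # assuming an internal PySpark error is preferred


class ReadLimitRegistry:  # hypothetical container for the type-name -> class mapping
    def __init__(self):
        self._registry: dict = {}

    def get(self, type_name: str, params: dict) -> "ReadLimit":
        read_limit_type = self._registry.get(type_name)
        if read_limit_type is None:
            # Clearer failure than a bare KeyError if the type name is ever missing.
            raise PySparkValueError(f"Unknown ReadLimit type name: {type_name}")
        return read_limit_type(**params)
```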
is it possible to add a test case where latestOffset returns the same offset?
It happens with Trigger.AvailableNow; I'll check whether we have one for the processing time trigger. Maybe that needs some union or stream-stream join, since we still need to trigger the microbatch to verify the behavior. (If latestOffset returns the same offset and it's the only stream, we don't trigger the microbatch.)
allisonwang-db
left a comment
Thanks for working on this! Left some comments.
| Specifies limits on how much data to read from a streaming source when
| determining the latest offset.
Can we add more comments and examples on how to use this class?
The documentation should be covered in latestOffset() and getDefaultReadLimit() in DataSourceStreamReader.
Since devs have to use the built-in read limit implementations at this point, I'm going to enumerate the built-in implementations here so that they can understand which classes are available.
| Parameter
| ---------
| params : dict
|     The parameters to create the :class:`ReadLimit`. type name isn't included.
ditto can we add more examples?
Class methods in ReadLimit aren't user-facing; they are Spark-internal. Do we still need examples?
| assert isinstance(
|     limit, ReadAllAvailable
| ), "simple stream reader does not support read limit"
Does that mean we can't use availableNow with simple streaming reader?
No, admission control is not available for the simple stream reader. Trigger.AvailableNow is still available for the simple stream reader.
| ----------
| start : dict
|     The start offset of the microbatch to continue reading from.
| limit : :class:`ReadLimit`
|     The limit on the amount of data to be returned by this call.
Let's add supported version here
python/pyspark/sql/datasource.py
| NOTE: Previous Spark versions didn't have start offset and read limit parameters for this
| method. While Spark will ensure the backward compatibility for existing data sources, the
| new data sources are strongly encouraged to implement this new method signature.
Let's add this NOTE below as a docstring section?
You mean moving it out of the doc comment? I'm OK with it.
| from abc import ABC, abstractmethod
|
|
| class ReadLimit(ABC):
I think this ABC is over-designed, especially considering that we do not even support a custom ReadLimit class at this point.
type_name does not really do anything besides returning an identifier for this class. It's only used internally; I think you can directly use self.__class__.__name__.
cls.load(param) -> cls is not a super common pattern in Python or PySpark. This is just __init__, I think. You just need the subclass to have an __init__ function that takes a parameter; save that parameter and use it in dump, and that would be fine.
Bottom line is, you just need a serializable enum that takes some arguments. I think you can totally do it with just a dataclass.
```python
from dataclasses import dataclass

class ReadLimit:
    ...

@dataclass
class ReadAllAvailable(ReadLimit):
    type: str = "ReadAllAvailable"

@dataclass
class ReadMinRows(ReadLimit):
    min_rows: int
    type: str = "ReadMinRows"
```

You can do dataclasses.asdict(obj) to dump it and registry[type](**params) to create the class. What are the concerns with using this simple pattern?
It might be the case, but when we decide to extend this, would it be a one-way door where we would never be able to support a user-defined ReadLimit? If not, I'm happy to incorporate the feedback.
OK, it looks like this still retains the registry mechanism and just removes the classmethods. Maybe we could just ask the custom impl to provide the type and be done. OK for me.
Yeah, there could still be a custom RateLimit in the future. The difference is that the user only needs to define a dataclass like

```python
@dataclass
class MyOwnRateLimiter(RateLimit):
    name = "MyOwnRateLimiter"
    param1: int
    param2: str
```

And that's it - they don't need to do anything else.
I changed type to name here because I think that's probably a bit more intuitive. Also, type is a built-in name, so even though it's okay to use it here, I try to avoid it.
Also, if you think it's easier for them to do

```python
@dataclass
class MyOwnRateLimiter(RateLimit):
    param1: int
    param2: str
```

all you need to do is a bit of extra work when you dump it (not even load). When you dump it, do something like
asdict(r) | {"_type": r.__class__.__name__} - that saves another thing you need to specify in the dataclass.
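For example, a self-contained round-trip of that idea; the registry and dataclass here are placeholders, not the PR's code:

```python
from dataclasses import asdict, dataclass


@dataclass
class ReadMinRows:  # stand-in for the real ReadLimit subclass
    min_rows: int


registry = {"ReadMinRows": ReadMinRows}  # hypothetical name -> class registry

r = ReadMinRows(min_rows=100)
payload = asdict(r) | {"_type": r.__class__.__name__}  # dump: inject the class name
cls = registry[payload.pop("_type")]                   # load: pop it and look up the class
assert cls(**payload) == r
```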
I'll take the class name - basically I wanted flexibility in the name, but it sounds like I'd need to do that with a class variable, which is modifiable from outside and could mess things up. I realized Final is just a type hint.
| NON_EMPTY_PYARROW_RECORD_BATCHES = 1
| EMPTY_PYARROW_RECORD_BATCHES = 2
|
| SUPPORTS_ADMISSION_CONTROL = 1
SUPPORTS_ADMISSION_CONTROL = 1 << 0 is better I think.
|
| def check_support_func(reader: DataSourceStreamReader, outfile: IO) -> None:
|     support_flags = 0
|     if isinstance(reader, _SimpleStreamReaderWrapper):
I'm surprised that we wrote _SimpleStreamReaderWrapper, which is a subclass of DataSourceStreamReader, yet we still need a separate if statement for it - that, to me, is against the rule of inheritance...
However, in this case, do we really need to?
The same logic for inspect.signature applies to _SimpleStreamReaderWrapper because it does have the correct signature for reader.latestOffset. SupportsTriggerAvailableNow is not the important thing; the important thing should be prepareForTriggerAvailableNow. We should have read-through logic for _SimpleStreamReaderWrapper so that hasattr(reader, "prepareForTriggerAvailableNow") returns the underlying simple_reader's attributes directly.
Maybe we don't need to do this in this PR, but we should not claim that _SimpleStreamReaderWrapper is a DataSourceStreamReader while still needing a separate case to access simple_reader all the time.
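A rough sketch of that read-through idea, assuming the wrapper keeps the underlying reader in self.simple_reader as mentioned above; only the delegation method is shown:

```python
class _SimpleStreamReaderWrapper(DataSourceStreamReader):
    ...  # existing wrapper implementation

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails, so this falls through to the
        # wrapped SimpleDataSourceStreamReader; hasattr(wrapper, "prepareForTriggerAvailableNow")
        # then reflects whether the inner reader defines it.
        return getattr(self.simple_reader, name)
```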
While I get how your proposal works, I'd argue that this is just another workaround for doing something which we can't do through inheritance.
For example, you said SupportsTriggerAvailableNow isn't the important thing - in a non-Python language with strong typing, explicitly checking the type is mostly required, short of hacky approaches like reflection in Java, which is not a best practice. We do inspect latestOffset, but that is a last-resort approach, because Python does not support method overloading, so we can't do the same API evolution we did in Scala/Java.
(It's still possible to introduce a separate interface to achieve a similar thing, but it is not about overloading - it's just that the latest definition overrides the prior definition for the same method name.)
Though I understand that Python is duck-typed, the type system is very loose, and what matters is simply whether a function/method is available.
Maybe you were trying to give an approach that is better from a Pythonic point of view. It's just that both break inheritance, and the proposal seems to break it more than what I do. I agree if you want to say this isn't Pythonic, but then I could say this is one of the patterns we heavily leverage in Scala; we call it pattern matching. It's just that there is no language-level support here, so I have to do it manually.
we should not claim that _SimpleStreamReaderWrapper is a DataSourceStreamReader
This breaks the structure of the interface we designed. If _SimpleStreamReaderWrapper isn't a DataSourceStreamReader, what would it be?
gaogaotiantian
left a comment
I understand that there's a time pressure. I think as long as we can agree on the public interface, we can always polish the implementation later.
I have no knowledge of the Scala side, so I just have some comments on the Python code.
I think the proposal on the interface is closer to "arguable" than something I totally agree with. While it might be more Pythonic, at least we shouldn't claim that it is better for inheritance; from the proposal I don't see that we respect the interface at all. I don't think this can hold the PR from merging anyway, since it's more about "preference" at this point - that's not related to time pressure. Also, _SimpleStreamReaderWrapper is an internal implementation of DataSourceStreamReader, so modifying that class has nothing to do with the "public interface".
What changes were proposed in this pull request?
This PR proposes to introduce the support of Admission Control and Trigger.AvailableNow in Python data source - streaming reader.
To support Admission Control, we propose to change the `DataSourceStreamReader` interface as follows (side-by-side comparison of the current and proposed interface):

| Before | After |
| --- | --- |
| `class DataSourceStreamReader(ABC):` | `class DataSourceStreamReader(ABC):` |
| `def initialOffset(self) -> dict` | `def initialOffset(self) -> dict` |
| `def latestOffset() -> dict` | `def latestOffset(self, start: dict, limit: ReadLimit) -> dict` |
|  | `# NOTE: Optional to implement, default = ReadAllAvailable()`<br>`def getDefaultReadLimit(self) -> ReadLimit` |
|  | `# NOTE: Optional to implement, default = None`<br>`def reportLatestOffset(self) -> Optional[dict]` |
| `def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]` | `def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]` |
| `@abstractmethod def read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator["RecordBatch"]]` | `@abstractmethod def read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator["RecordBatch"]]` |
| `def commit(self, end: dict) -> None` | `def commit(self, end: dict) -> None` |
| `def stop(self) -> None` | `def stop(self) -> None` |

The main changes are the following:
- `latestOffset` is changed. The method is mandatory.
- `getDefaultReadLimit` is added, as optional.
- `reportLatestOffset` is added, as optional.

This way, new implementations support Admission Control by default. We ensure the engine can handle the case of the old method signature via Python's built-in inspect module (similar to Java's reflection): if `latestOffset` is implemented without parameters, we fall back to treating the source as one that does not enable admission control. For all new sources, implementing `latestOffset` with parameters is strongly recommended.
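For illustration, a minimal sketch of how such an inspect-based check could look; the helper name and exact check are assumptions, not the PR's actual implementation:

```python
import inspect

from pyspark.sql.datasource import DataSourceStreamReader


def _supports_admission_control(reader: DataSourceStreamReader) -> bool:
    # Hypothetical helper: the new signature is latestOffset(self, start, limit), so a bound
    # method exposing fewer than two parameters is treated as the old, parameterless form.
    params = inspect.signature(reader.latestOffset).parameters
    return len(params) >= 2
```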
The `ReadLimit` interface and built-in implementations will be available for source implementations to leverage. The built-in implementations are: `ReadAllAvailable`, `ReadMinRows`, `ReadMaxRows`, `ReadMaxFiles`, `ReadMaxBytes`. We won't support custom implementations of the `ReadLimit` interface at this point, since that requires major effort and we don't see a demand, but we can plan for it if strong demand appears.
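For illustration, a hedged sketch of how a source could use the built-in limits; the import path, the `max_rows` field name, and the offset helper are assumptions rather than the PR's exact API:

```python
from pyspark.sql.datasource import DataSourceStreamReader, ReadLimit, ReadMaxRows


class MyStreamReader(DataSourceStreamReader):
    def initialOffset(self) -> dict:
        return {"offset": 0}

    def getDefaultReadLimit(self) -> ReadLimit:
        # Ask the engine to cap each micro-batch at roughly 1000 rows by default.
        return ReadMaxRows(max_rows=1000)  # field name is an assumption

    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
        available = self._highest_available_offset()
        if isinstance(limit, ReadMaxRows):
            # The engine-provided limit takes precedence over any source-configured option.
            end = min(available, start["offset"] + limit.max_rows)
        else:
            # e.g. ReadAllAvailable: read everything currently available,
            # ignoring any read limit configured through options.
            end = available
        # If end equals start["offset"], the returned offset matches the start parameter,
        # which signals that there is no more data to read.
        return {"offset": end}

    def _highest_available_offset(self) -> int:
        # Hypothetical: a real source would query the external system here.
        return 10_000

    def read(self, partition):
        return iter([])  # stub so the abstract method is implemented
```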
We do not make any change to `SimpleDataSourceStreamReader` for Admission Control, since it is designed for small data fetches and could be considered as already limiting the data. We could still add `ReadLimit` later if we see strong demand for limiting the fetch size via a source option.

To support Trigger.AvailableNow, we propose to introduce a new interface (a rough sketch follows).
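A sketch of what such a mix-in could look like, based on the `SupportsTriggerAvailableNow` / `prepareForTriggerAvailableNow` names used in the review discussion above; the exact definition in the PR may differ:

```python
from pyspark.sql.datasource import DataSourceStreamReader


class SupportsTriggerAvailableNow:
    def prepareForTriggerAvailableNow(self) -> None:
        """Called before the first micro-batch of a Trigger.AvailableNow run, so the
        source can snapshot the offsets it should read up to and never go beyond them."""
        ...


# Mixed in alongside the reader base class; the same would apply to SimpleDataSourceStreamReader.
class MyAvailableNowReader(DataSourceStreamReader, SupportsTriggerAvailableNow):
    def prepareForTriggerAvailableNow(self) -> None:
        self._end_offsets = {"offset": 100}  # hypothetical: snapshot the current latest offset

    def read(self, partition):
        return iter([])  # stub so the abstract method is implemented
```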
This interface can be mixed in with both `DataSourceStreamReader` and `SimpleDataSourceStreamReader`. It won't work with `DataSourceStreamReader` implementations having the old method signature of `latestOffset()`, as mentioned above.

Why are the changes needed?
This is to catch up with the features supported in the Scala DSv2 API; we have received reports from developers that the missing features block them from implementing some data sources.
Does this PR introduce any user-facing change?
Yes, users implementing a streaming reader via the Python data source API will be able to add support for Admission Control and Trigger.AvailableNow, which had been major missing features.
How was this patch tested?
New UTs.
Was this patch authored or co-authored using generative AI tooling?
Co-authored using claude-4.5-sonnet