[SPARK-55314][CONNECT] Propagate observed metrics errors to client #54094
heyihong wants to merge 2 commits into apache:master from
Conversation
JIRA Issue Information: === Sub-task SPARK-55314 === This comment was automatically generated by GitHub Actions
I need a bit more context. What's the current protobuf protocol for observed metrics, and how does this PR change it?
@cloud-fan The success path is unchanged in this PR. It only adds reporting of observation collection failures via root_error_idx and errors, and surfaces those failures on both the Scala and Python clients.
```proto
int64 plan_id = 4;

// (Optional) The index of the root error in `errors`.
// The field will not be set if there are no errors.
optional int32 root_error_idx = 5;
```
what is root error? how is this different from normal query errors?
The root error is the top-level error in the error cause chain. The definition is the same as in https://github.com/apache/spark/blob/master/sql/connect/common/src/main/protobuf/spark/connect/base.proto#L1050-L1054.
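To make the `root_error_idx` semantics concrete, here is a small hedged sketch of how a client might resolve the root error from the repeated `errors` list. The `Error` class and its `cause_idx` field are illustrative stand-ins, not the actual generated protobuf classes.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Error:
    """Illustrative stand-in for FetchErrorDetailsResponse.Error."""
    message: str
    cause_idx: Optional[int] = None  # index of this error's cause within the same list


def root_error_message(errors: List[Error], root_error_idx: Optional[int]) -> Optional[str]:
    """Return the message of the top-level (root) error, or None if no errors occurred."""
    if root_error_idx is None or not errors:
        return None
    return errors[root_error_idx].message


errors = [
    Error("java.lang.ArithmeticException: / by zero"),                         # cause
    Error("org.apache.spark.SparkRuntimeException: test error", cause_idx=0),  # root
]
print(root_error_message(errors, root_error_idx=1))
# prints "org.apache.spark.SparkRuntimeException: test error"
```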
```proto
optional int32 root_error_idx = 5;

// A list of errors that occurred while collecting the observed metrics.
// If the length is 0, it means no errors occurred.
repeated FetchErrorDetailsResponse.Error errors = 6;
```
why it's a list? because a query can have many observations?
It is for future extensions if we want to support returning cause exceptions.
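A hedged sketch of that future extension: an exception cause chain can be flattened into one repeated list, with each entry pointing at its cause by index, which is why a single `errors` field suffices. The helper below is illustrative only; it is not part of this PR.

```python
from typing import List, Tuple


def flatten_cause_chain(exc: BaseException) -> Tuple[List[str], int]:
    """Flatten an exception and its __cause__ chain into a flat errors list.

    Returns (errors, root_error_idx), where the root (outermost) error is
    the first entry appended, mirroring how a repeated proto field could
    carry a whole cause chain.
    """
    errors: List[str] = []
    current: BaseException = exc
    while current is not None:
        errors.append(f"{type(current).__name__}: {current}")
        current = current.__cause__
    return errors, 0


try:
    try:
        raise ValueError("division by zero in metric expression")
    except ValueError as inner:
        raise RuntimeError("observation collection failed") from inner
except RuntimeError as outer:
    errors, root_idx = flatten_cause_chain(outer)

print(errors[root_idx])  # prints "RuntimeError: observation collection failed"
```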
What changes were proposed in this pull request?
Propagate observation metric collection errors to the client in Spark Connect instead of silently returning empty metrics.
- Protocol: add `root_error_idx` and repeated `errors` to `ExecutePlanResponse.ObservedMetrics` so the server can send observation failures.
- Python client: add `convert_observation_errors()` and refactor exception conversion to support it; when observed metrics have `root_error_idx` set, convert and store the error on the Observation; in `Observation.get`, raise the stored error if present.
- Scala: use `Try[Row]`/`Try[Seq[...]]` for observed metrics end-to-end; on failure, serialize the throwable via `ErrorUtils.throwableToProtoErrors` and set `root_error_idx`/`errors` on ObservedMetrics; in Observation, rethrow the cause from `getRow` so the original failure is exposed.

Why are the changes needed?
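The client-side flow in the summary above can be sketched as follows. This is a hedged, simplified model: `_on_observed_metrics` and the plain `RuntimeError` are illustrative stand-ins for the real client plumbing, which uses `convert_observation_errors()` and Spark's exception classes.

```python
from typing import Dict, List, Optional


class Observation:
    """Simplified model of a Spark Connect Observation on the client."""

    def __init__(self) -> None:
        self._result: Optional[Dict[str, int]] = None
        self._error: Optional[Exception] = None

    def _on_observed_metrics(
        self,
        metrics: Optional[Dict[str, int]],
        root_error_idx: Optional[int],
        errors: List[str],
    ) -> None:
        # If the server set root_error_idx, convert the proto error into a
        # client exception and store it instead of a result.
        if root_error_idx is not None:
            self._error = RuntimeError(errors[root_error_idx])
        else:
            self._result = metrics

    @property
    def get(self) -> Optional[Dict[str, int]]:
        # Rethrow the stored error so the user sees why collection failed,
        # rather than silently receiving empty metrics.
        if self._error is not None:
            raise self._error
        return self._result


ok = Observation()
ok._on_observed_metrics({"cnt": 10}, None, [])
print(ok.get)  # prints "{'cnt': 10}"

failed = Observation()
failed._on_observed_metrics(None, 0, ["test error"])
try:
    failed.get
except RuntimeError as e:
    print(e)  # prints "test error"
```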
Previously, when an error occurred during observation metric collection (SPARK-55150), the error was silently ignored and an empty result was returned. This was confusing for users since they would get empty metrics without knowing an error occurred. With this change, the actual error is propagated to the client so users can understand why their observation failed.
Does this PR introduce any user-facing change?
Yes. When an observation fails during metric collection, `observation.get` now raises the underlying exception (e.g. `PySparkException` in Python, `SparkRuntimeException` in Scala) instead of returning an empty map.

How was this patch tested?
New unit test in Python (`test_observation_errors_propagated_to_client`); updated the Scala Connect E2E test and DatasetSuite test to expect an exception with a message containing `"test error"` instead of empty metrics.

Was this patch authored or co-authored using generative AI tooling?
Yes