[SPARK-55314][CONNECT] Propagate observed metrics errors to client #54094

Open

heyihong wants to merge 2 commits into apache:master from heyihong:SPARK-55314
Conversation

heyihong (Contributor) commented Feb 2, 2026

What changes were proposed in this pull request?

Propagate observation metric collection errors to the client in Spark Connect instead of silently returning empty metrics.

  • Proto: Add optional root_error_idx and repeated errors to ExecutePlanResponse.ObservedMetrics so the server can send observation failures.
  • Python: Add convert_observation_errors() and refactor exception conversion to support it; in the client, when observed metrics have root_error_idx set, convert and store the error on the Observation; in Observation.get, raise the stored error if present (see the sketch after this list).
  • Scala/server: Use Try[Row] / Try[Seq[...]] for observed metrics end-to-end; on failure, serialize the throwable via ErrorUtils.throwableToProtoErrors and set root_error_idx/errors on ObservedMetrics; in Observation, rethrow the cause from getRow so the original failure is exposed.
  • Tests: New Python test and updated Scala Connect E2E and DatasetSuite tests for the new behavior.
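A minimal sketch of the failure handling described above. convert_observation_errors is the helper named in this PR, but its signature here, and the _Observation scaffolding, are illustrative assumptions rather than the actual pyspark internals:

from typing import Any, Optional

def convert_observation_errors(metric: Any) -> Optional[Exception]:
    # Illustrative conversion: map the serialized error chain onto a client
    # exception. The real helper builds a PySparkException; the signature
    # used here is an assumption.
    if not metric.HasField("root_error_idx"):
        return None
    return RuntimeError(metric.errors[metric.root_error_idx].message)

class _Observation:
    # Illustrative stand-in for pyspark's Observation.
    def __init__(self) -> None:
        self._result: dict = {}
        self._error: Optional[Exception] = None

    def _on_observed_metrics(self, metric: Any) -> None:
        # Called when the client receives ObservedMetrics for this observation.
        self._error = convert_observation_errors(metric)

    @property
    def get(self) -> dict:
        if self._error is not None:
            raise self._error  # surface the stored server-side failure
        return self._result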

Why are the changes needed?

Previously, when an error occurred during observation metric collection (SPARK-55150), the error was silently ignored and an empty result was returned. This was confusing for users since they would get empty metrics without knowing an error occurred. With this change, the actual error is propagated to the client so users can understand why their observation failed.

Does this PR introduce any user-facing change?

Yes. When an observation fails during metric collection, observation.get now raises the underlying exception (e.g. PySparkException in Python, SparkRuntimeException in Scala) instead of returning an empty map.
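For example (a sketch; failing_metric_expr is a hypothetical placeholder for an aggregate whose metric collection fails on the server, and the connect URL is illustrative):

from pyspark.sql import SparkSession, Observation
from pyspark.errors import PySparkException

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

observation = Observation("checks")
# failing_metric_expr() stands in for an aggregate whose observed-metric
# collection fails server-side (hypothetical helper).
df = spark.range(10).observe(observation, failing_metric_expr().alias("m"))
df.collect()

try:
    print(observation.get)
except PySparkException as e:  # previously: observation.get returned {}
    print(f"observation failed: {e}")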

How was this patch tested?

New unit test in Python (test_observation_errors_propagated_to_client); updated the Scala Connect E2E test and DatasetSuite test to expect an exception whose message contains "test error" instead of empty metrics.

Was this patch authored or co-authored using generative AI tooling?

Yes

heyihong changed the title from "[SPARK-55314][CONNECT] Propagate observed metrics errors to client" to "[WIP][SPARK-55314][CONNECT] Propagate observed metrics errors to client" on Feb 2, 2026
heyihong changed the title from "[WIP][SPARK-55314][CONNECT] Propagate observed metrics errors to client" to "[SPARK-55314][CONNECT] Propagate observed metrics errors to client" on Feb 2, 2026
heyihong force-pushed the SPARK-55314 branch 3 times, most recently from cc950ed to e127c3c on February 5, 2026 at 17:16

github-actions bot commented Feb 5, 2026

JIRA Issue Information

=== Sub-task SPARK-55314 ===
Summary: Propagate observed metrics errors to client
Assignee: None
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

heyihong force-pushed the SPARK-55314 branch 2 times, most recently from d64e517 to 8074f21 on February 6, 2026 at 22:56
cloud-fan (Contributor) commented:

I need a bit more context. What's the current protobuf protocol for observed metrics, and how does this PR change it?


heyihong commented Feb 12, 2026

I need a bit more context. What's the current protobuf protocol for observed metrics, and how does this PR change it?

@cloud-fan Sure, here is a summary of the current success path:

  1. The Spark Connect server executes the plan; observations produce Rows.
  2. The server converts those into the protobuf message ObservedMetrics, which includes: name, keys, values (each value as Expression.Literal), and plan_id.
  3. The message is sent as ExecutePlanResponse.observed_metrics (repeated).
  4. The client treats each item as a success (before this change, a failure simply produced an empty result): keys and values have the same length, so it decodes each Literal and builds either a Row (Scala) or PlanObservedMetrics / key-value pairs (Python).

With this PR, the success path is unchanged; it only adds reporting of observation collection failures via root_error_idx and errors, and surfaces those failures on both the Scala and Python clients.
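Concretely, step 4 of the success path amounts to something like the following (a sketch; decode_literal stands in for the client's Literal-to-Python conversion):

from typing import Any, Dict

def decode_literal(lit: Any) -> Any:
    ...  # converts an Expression.Literal proto into a Python value

def decode_success(metric: Any) -> Dict[str, Any]:
    # keys and values are parallel lists; each value is an Expression.Literal.
    assert len(metric.keys) == len(metric.values)
    return {k: decode_literal(v) for k, v in zip(metric.keys, metric.values)}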

int64 plan_id = 4;
// (Optional) The index of the root error in errors.
// The field will not be set if there are no errors.
optional int32 root_error_idx = 5;
A reviewer (Contributor) commented:

What is the root error? How is this different from normal query errors?

heyihong (Contributor, Author) replied Feb 13, 2026:

The root error is the top-level error in the error cause chain. The definition is the same as in https://github.com/apache/spark/blob/master/sql/connect/common/src/main/protobuf/spark/connect/base.proto#L1050-L1054.
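For illustration, a consumer could resolve the chain like this (a sketch; it assumes the message and cause_idx fields of FetchErrorDetailsResponse.Error, and the formatting is arbitrary):

from typing import Any, List

def format_error_chain(errors: List[Any], root_error_idx: int) -> str:
    # Walk from the root error down its cause chain; each Error's cause_idx,
    # when set, indexes another entry in `errors`.
    messages = []
    idx = root_error_idx
    while True:
        err = errors[idx]
        messages.append(err.message)
        if not err.HasField("cause_idx"):
            break
        idx = err.cause_idx
    return "\nCaused by: ".join(messages)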

optional int32 root_error_idx = 5;
// A list of errors that occurred while collecting the observed metrics.
// If the length is 0, it means no errors occurred.
repeated FetchErrorDetailsResponse.Error errors = 6;
A reviewer (Contributor) commented:

Why is it a list? Because a query can have many observations?

heyihong (Contributor, Author) replied:

It is for future extension, in case we want to support returning the cause exceptions as well.

heyihong requested a review from cloud-fan on February 13, 2026 at 15:39
