Skip to content

Conversation

@chengyouling
Copy link
Contributor

No description provided.

@chengyouling chengyouling self-assigned this Jul 9, 2025
Comment on lines 11 to 19
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.samples;


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test cases here will not run as integration tests. Can you add tests in some other module that run as integration tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, will add in spring-mvc demo

Comment on lines +52 to +54
default void setChunkedForEvent(boolean chunked) {
// not set header transfer-encoding=chunked in Rest Over Servlet, or will have Multiple in response.
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe change setChunked implementation to first check if header exists and then add it?

Copy link
Contributor Author

@chengyouling chengyouling Aug 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The chunked header is empty when constructing the response in Rest Over Servlet model, so it be always add chunked header information, and tomcat itself will actively construct the chunked response header in the streaming scenario.

public Object doDecodeResponse(InputStream input, JavaType type) throws Exception {
String buffer = new String(input.readAllBytes(), StandardCharsets.UTF_8);
SseEventResponseEntity<?> responseEntity = new SseEventResponseEntity<>();
for (String line : buffer.split("\n")) {
Copy link
Member

@yhs0092 yhs0092 Aug 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里可能要识别一下\n, \r, \r\n 三种场景, 对SSE协议而言, 这三种场景都是合法的行分隔方式. 参考RFC8895 3.4 节的描述:

The server continually sends messages. Each message has one or more lines, where a line is terminated by a carriage return immediately followed by a new line, a carriage return not immediately followed by a new line, or a new line not immediately preceded by a carriage return. A message is terminated by a blank line (two line terminators in a row).

补充文档链接: https://datatracker.ietf.org/doc/html/rfc8895

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

responseEntity.data(RestObjectMapperFactory.getRestObjectMapper()
.readValue(line.substring(6), type));
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里要不要加个消息合法性校验, 比如之前集成测试遇到过的消息粘包导致的解析问题. 如果字段重复了, 打一个告警日志会好分析很多.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}
}

default void refreshEventId(int index) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refreshEventId 是 SSE 场景特有的方法, 但我们把它加在 ProduceProcessor 里面去了, 这通常并不合适.
建议考虑把它放在 ProduceEventStreamProcessor 中.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}

private boolean checkServerSendEvents() {
return !CollectionUtils.isEmpty(produces) && produces.contains(EVENTS_MEDIA_TYPE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MIME type 取值为 text/event-stream;charset=utf-8 也是合法的. 同样是这个类里的 doCreateProduceProcessors 方法在处理 produces 属性值的时候就考虑到了 MIME type 带分号的情况. 这里建议也优化一下实现细节.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}

if (restOperationMeta != null && restOperationMeta.isServerSendEvents()) {
processFlowableResponseBody(FlowableHelper.toFlowable(httpClientResponse));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里可能需要再确认一下, 如果微服务客户端调用服务端的SSE接口方法, 但是服务端报错了, 比如返回 403 状态码的 application/json 响应, 那么这里是否就不应该以 SSE 的形式去处理 response body 了呢?

换句话说, 这里的 if 判断条件应该以服务端返回的实际 Content-Type 为准会更合适一些?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

appendData(bufferBuilder, responseEntity.getData());
bufferBuilder.append("\n");
output.write(bufferBuilder.toString().getBytes(StandardCharsets.UTF_8));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议保留 else 分支, 打印告警日志 ---- 如果 result 类型不是 SseEventResponseEntity, 我们不应该悄无声息地把错误放过去了.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if (eventId == null) {
return;
}
eventBuilder.append("eventId: ").append(eventId.intValue()).append("\n");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

协议规定的消息ID的名字是"id", 不是"eventId", 这里需要改一下

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

public class PublisherProducerResponseMapper implements ProducerResponseMapper {
private final boolean shouldConstructEntity;

private int writeIndex = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PublisherProducerResponseMapper 是全局共享的实例. 消息ID如果是通过这个对象的属性累加得到的, 就会存在不同SSE消息流之间 ID 串线的问题.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if (this.data != null) {
LOGGER.warn("origin data: [{}] is exists, overridden by the current value: [{}]", this.data, data);
}
this.data = (T) data;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同一条消息里, 可以有多行data, 所以这里还不能直接打告警日志再覆盖.

还是参考 https://datatracker.ietf.org/doc/html/rfc8895

If a message has more than one "data" line, the value of the data field is the concatenation of the values on those lines. There can be only one "event" and "id" line per message. The "data" field is required; the others are optional.

下图是RFC文档给出来的样例(注意红框部分):

image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里建议参考一下 Spring 的 org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilderImpl#data(java.lang.Object, org.springframework.http.MediaType) 方法. 它在处理 SSE 的消息的时候允许业务在同一条消息中添加多个 data 字段, 并且如果消息中包含换行符时, 自动将一个data字段拆分成多个data字段存放在消息中.
(当然这里是服务端的逻辑, 客户端应该也有类似的处理措施).

我觉得这里刚好顺带解答了另外一个问题, 就是如果\r\n, \r, \n都是 SSE 消息的本身的换行符, 那么消息内容中的换行符应该如何表示? 从 Spring 给的解决方案来看, 大概率就是一条消息中包含多个 data 字段, 就表示这条消息有多行.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@chengyouling chengyouling merged commit cacfb79 into apache:2.9.x Aug 20, 2025
6 checks passed
@chengyouling chengyouling deleted the 2.9.x-sse branch January 6, 2026 07:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants