-
Notifications
You must be signed in to change notification settings - Fork 828
[#4873] Support for SSE interface RPC calls #4875
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… add SseEventResponseEntity<T> adapt setting retry/event attributes
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.servicecomb.samples; | ||
|
|
||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test cases here will not run as integration tests. Can you add tests in some other module that run as integration tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, will add in spring-mvc demo
.../main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java
Outdated
Show resolved
Hide resolved
...rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java
Outdated
Show resolved
Hide resolved
.../main/java/org/apache/servicecomb/common/rest/codec/produce/ProduceEventStreamProcessor.java
Outdated
Show resolved
Hide resolved
| default void setChunkedForEvent(boolean chunked) { | ||
| // not set header transfer-encoding=chunked in Rest Over Servlet, or will have Multiple in response. | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe change setChunked implementation to first check if header exists and then add it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The chunked header is empty when constructing the response in Rest Over Servlet model, so it be always add chunked header information, and tomcat itself will actively construct the chunked response header in the streaming scenario.
| public Object doDecodeResponse(InputStream input, JavaType type) throws Exception { | ||
| String buffer = new String(input.readAllBytes(), StandardCharsets.UTF_8); | ||
| SseEventResponseEntity<?> responseEntity = new SseEventResponseEntity<>(); | ||
| for (String line : buffer.split("\n")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里可能要识别一下\n, \r, \r\n 三种场景, 对SSE协议而言, 这三种场景都是合法的行分隔方式. 参考RFC8895 3.4 节的描述:
The server continually sends messages. Each message has one or more lines, where a line is terminated by a carriage return immediately followed by a new line, a carriage return not immediately followed by a new line, or a new line not immediately preceded by a carriage return. A message is terminated by a blank line (two line terminators in a row).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| responseEntity.data(RestObjectMapperFactory.getRestObjectMapper() | ||
| .readValue(line.substring(6), type)); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里要不要加个消息合法性校验, 比如之前集成测试遇到过的消息粘包导致的解析问题. 如果字段重复了, 打一个告警日志会好分析很多.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| } | ||
| } | ||
|
|
||
| default void refreshEventId(int index) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refreshEventId 是 SSE 场景特有的方法, 但我们把它加在 ProduceProcessor 里面去了, 这通常并不合适.
建议考虑把它放在 ProduceEventStreamProcessor 中.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| } | ||
|
|
||
| private boolean checkServerSendEvents() { | ||
| return !CollectionUtils.isEmpty(produces) && produces.contains(EVENTS_MEDIA_TYPE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MIME type 取值为 text/event-stream;charset=utf-8 也是合法的. 同样是这个类里的 doCreateProduceProcessors 方法在处理 produces 属性值的时候就考虑到了 MIME type 带分号的情况. 这里建议也优化一下实现细节.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| } | ||
|
|
||
| if (restOperationMeta != null && restOperationMeta.isServerSendEvents()) { | ||
| processFlowableResponseBody(FlowableHelper.toFlowable(httpClientResponse)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里可能需要再确认一下, 如果微服务客户端调用服务端的SSE接口方法, 但是服务端报错了, 比如返回 403 状态码的 application/json 响应, 那么这里是否就不应该以 SSE 的形式去处理 response body 了呢?
换句话说, 这里的 if 判断条件应该以服务端返回的实际 Content-Type 为准会更合适一些?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| appendData(bufferBuilder, responseEntity.getData()); | ||
| bufferBuilder.append("\n"); | ||
| output.write(bufferBuilder.toString().getBytes(StandardCharsets.UTF_8)); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
建议保留 else 分支, 打印告警日志 ---- 如果 result 类型不是 SseEventResponseEntity, 我们不应该悄无声息地把错误放过去了.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| if (eventId == null) { | ||
| return; | ||
| } | ||
| eventBuilder.append("eventId: ").append(eventId.intValue()).append("\n"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
协议规定的消息ID的名字是"id", 不是"eventId", 这里需要改一下
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| public class PublisherProducerResponseMapper implements ProducerResponseMapper { | ||
| private final boolean shouldConstructEntity; | ||
|
|
||
| private int writeIndex = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PublisherProducerResponseMapper 是全局共享的实例. 消息ID如果是通过这个对象的属性累加得到的, 就会存在不同SSE消息流之间 ID 串线的问题.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| if (this.data != null) { | ||
| LOGGER.warn("origin data: [{}] is exists, overridden by the current value: [{}]", this.data, data); | ||
| } | ||
| this.data = (T) data; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
同一条消息里, 可以有多行data, 所以这里还不能直接打告警日志再覆盖.
还是参考 https://datatracker.ietf.org/doc/html/rfc8895
If a message has more than one "data" line, the value of the data field is the concatenation of the values on those lines. There can be only one "event" and "id" line per message. The "data" field is required; the others are optional.
下图是RFC文档给出来的样例(注意红框部分):
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里建议参考一下 Spring 的 org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilderImpl#data(java.lang.Object, org.springframework.http.MediaType) 方法. 它在处理 SSE 的消息的时候允许业务在同一条消息中添加多个 data 字段, 并且如果消息中包含换行符时, 自动将一个data字段拆分成多个data字段存放在消息中.
(当然这里是服务端的逻辑, 客户端应该也有类似的处理措施).
我觉得这里刚好顺带解答了另外一个问题, 就是如果\r\n, \r, \n都是 SSE 消息的本身的换行符, 那么消息内容中的换行符应该如何表示? 从 Spring 给的解决方案来看, 大概率就是一条消息中包含多个 data 字段, 就表示这条消息有多行.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
No description provided.