From ce4a70a5e1e959c4ad85c0dac6104c5c36b9f6f4 Mon Sep 17 00:00:00 2001 From: born_in_adversity <564361679@qq.com> Date: Sat, 18 Oct 2025 17:21:23 +0800 Subject: [PATCH] [waterflow] fix close flow --- .../modelengine/fit/waterflow/domain/states/State.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java index 54b18447..dcc1184b 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java @@ -8,6 +8,7 @@ import modelengine.fit.waterflow.domain.context.FlowContext; import modelengine.fit.waterflow.domain.context.FlowSession; +import modelengine.fit.waterflow.domain.context.Window; import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo; import modelengine.fit.waterflow.domain.context.repo.flowsession.FlowSessionRepo; import modelengine.fit.waterflow.domain.emitters.Emitter; @@ -224,11 +225,13 @@ public F close(Operators.Just>> callback, Operators.Just FlowDebug.log(input.get().getSession(), "[close] " + this.getFlow().end().getStreamId() + ":" + "end. data:" + input.get().getData()); callback.process(input); - input.get().getWindow().peekAndConsume().finishConsume(); - input.get().getWindow().onDone(this.getFlow().end().getId(), () -> { + Window window = input.get().getWindow(); + window.peekAndConsume().finishConsume(); + window.onDone(this.getFlow().end().getId(), () -> { FlowSessionRepo.release(this.processor.getStreamId(), input.get().getSession()); this.getFlow().completeSession(input.get().getSession().getId()); }); + window.tryFinish(); }); if (sessionComplete != null) { getFlow().end().onSessionComplete(session -> {