diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java index 54b18447..dcc1184b 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java @@ -8,6 +8,7 @@ import modelengine.fit.waterflow.domain.context.FlowContext; import modelengine.fit.waterflow.domain.context.FlowSession; +import modelengine.fit.waterflow.domain.context.Window; import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo; import modelengine.fit.waterflow.domain.context.repo.flowsession.FlowSessionRepo; import modelengine.fit.waterflow.domain.emitters.Emitter; @@ -224,11 +225,13 @@ public F close(Operators.Just>> callback, Operators.Just FlowDebug.log(input.get().getSession(), "[close] " + this.getFlow().end().getStreamId() + ":" + "end. data:" + input.get().getData()); callback.process(input); - input.get().getWindow().peekAndConsume().finishConsume(); - input.get().getWindow().onDone(this.getFlow().end().getId(), () -> { + Window window = input.get().getWindow(); + window.peekAndConsume().finishConsume(); + window.onDone(this.getFlow().end().getId(), () -> { FlowSessionRepo.release(this.processor.getStreamId(), input.get().getSession()); this.getFlow().completeSession(input.get().getSession().getId()); }); + window.tryFinish(); }); if (sessionComplete != null) { getFlow().end().onSessionComplete(session -> {