From af77e360a50f565a88c6cad560d81d4cd672285a Mon Sep 17 00:00:00 2001 From: chengyouling Date: Mon, 9 Jun 2025 15:54:06 +0800 Subject: [PATCH 1/9] [#4835] Fixed the fileUpload stream not closed problem --- .../foundation/vertx/http/FileUploadPart.java | 5 +- .../vertx/http/InputStreamWrapper.java | 79 ++++++++ .../vertx/http/OpenInputStreamRecorder.java | 174 ++++++++++++++++++ 3 files changed, 257 insertions(+), 1 deletion(-) create mode 100644 foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java create mode 100644 foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/OpenInputStreamRecorder.java diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java index 9e72d64d289..dfed13739b8 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java @@ -36,7 +36,10 @@ public FileUploadPart(FileUpload fileUpload) { @Override public InputStream getInputStream() throws IOException { - return Files.newInputStream(new File(fileUpload.uploadedFileName()).toPath()); + final InputStream inputStream = Files.newInputStream(new File(fileUpload.uploadedFileName()).toPath()); + final InputStreamWrapper inputStreamWrapper = new InputStreamWrapper(inputStream); + OpenInputStreamRecorder.getInstance().recordOpenStream(inputStreamWrapper); + return inputStreamWrapper; } @Override diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java new file mode 100644 index 00000000000..58762833642 --- /dev/null +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.foundation.vertx.http; + +import java.io.IOException; +import java.io.InputStream; + +public class InputStreamWrapper extends InputStream { + private final InputStream inputStream; + + public InputStreamWrapper(InputStream inputStream) { + this.inputStream = inputStream; + } + + public InputStream getInputStream() { + return inputStream; + } + + @Override + public int read() throws IOException { + return inputStream.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return inputStream.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return inputStream.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + return inputStream.skip(n); + } + + @Override + public int available() throws IOException { + return inputStream.available(); + } + + @Override + public boolean markSupported() { + return inputStream.markSupported(); + } + + @Override + public synchronized void mark(int readlimit) { + inputStream.mark(readlimit); + } + + @Override + public void close() throws IOException { + OpenInputStreamRecorder.getInstance().clearRecorder(this); + inputStream.close(); + } + + @Override + public synchronized void reset() throws IOException { + inputStream.reset(); + } +} diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/OpenInputStreamRecorder.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/OpenInputStreamRecorder.java new file mode 100644 index 00000000000..e4cd634bdf2 --- /dev/null +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/OpenInputStreamRecorder.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.foundation.vertx.http; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.servicecomb.foundation.common.event.EventManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.eventbus.EventBus; +import com.netflix.config.DynamicPropertyFactory; + +public class OpenInputStreamRecorder { + private static final Logger LOGGER = LoggerFactory.getLogger(OpenInputStreamRecorder.class); + + private static final OpenInputStreamRecorder RECORDER = new OpenInputStreamRecorder(); + + private static final String STREAM_OPEN_UPPER_LIMIT = "file.upload.stream.operate.upper-limit"; + + private static final String STREAM_STACKTRACE_ENABLED = "file.upload.stream.operate.stack-trace-enabled"; + + private static final String STREAM_CHECK_INTERVAL = "file.upload.stream.operate.check-interval"; + + private static final int DEFAULT_STREAM_OPEN_UPPER_LIMIT = 1000; + + private static final long DEFAULT_STREAM_CHECK_INTERVAL = 30000L; + + private final Map wrapperRecorder = new ConcurrentHashMap<>(); + + private final EventBus eventBus; + + private final ScheduledExecutorService streamCheckExecutor; + + private OpenInputStreamRecorder() { + eventBus = EventManager.getEventBus(); + streamCheckExecutor = Executors.newScheduledThreadPool(1, (t) -> new Thread(t, "stream-operate-check")); + startCheckOpenStream(); + } + + private void startCheckOpenStream() { + streamCheckExecutor.scheduleWithFixedDelay(this::checkOpenInputStream, DEFAULT_STREAM_CHECK_INTERVAL, + getStreamCheckInterval(), TimeUnit.MILLISECONDS); + } + + public static OpenInputStreamRecorder getInstance() { + return RECORDER; + } + + public void recordOpenStream(final InputStreamWrapper wrapper) { + checkAndRemoveOldestStream(); + wrapperRecorder.put(wrapper, new StreamOperateEvent(wrapper)); + } + + private void checkAndRemoveOldestStream() { + int upperLimit = getStreamOpenUpperLimit(); + if (wrapperRecorder.size() < upperLimit) { + return; + } + List operateEvents = new ArrayList<>(wrapperRecorder.values()); + List sortEvents = operateEvents.stream() + .sorted(Comparator.comparingLong(StreamOperateEvent::getOpenStreamTimestamp)).collect(Collectors.toList()); + StreamOperateEvent deleteEvent = sortEvents.get(0); + LOGGER.warn("reached upper limit [{}] of open stream, delete oldest stream, operate time [{}], stackTrace {}", + upperLimit, deleteEvent.getOpenStreamTimestamp(), deleteEvent.getInvokeStackTrace()); + closeStreamWrapper(deleteEvent.getInputStreamWrapper()); + } + + public void clearRecorder(InputStreamWrapper inputStreamWrapper) { + wrapperRecorder.remove(inputStreamWrapper); + } + + private void checkOpenInputStream() { + if (wrapperRecorder.isEmpty()) { + return; + } + List closeStreamWrapper = new ArrayList<>(); + for (Map.Entry entry : wrapperRecorder.entrySet()) { + StreamOperateEvent event = entry.getValue(); + long streamOperateTime = event.getOpenStreamTimestamp(); + long notifyTime = getStreamCheckInterval(); + + // If the check time exceeds three times, close the open stream. + if (System.currentTimeMillis() - streamOperateTime >= 3 * notifyTime) { + closeStreamWrapper.add(entry.getKey()); + continue; + } + if (System.currentTimeMillis() - streamOperateTime >= notifyTime) { + LOGGER.warn("there have input stream not closed, operate time [{}], operate stackTrace {}", + event.getOpenStreamTimestamp(), event.getInvokeStackTrace()); + eventBus.post(event); + } + } + for (InputStreamWrapper wrapper : closeStreamWrapper) { + closeStreamWrapper(wrapper); + LOGGER.warn("closed notify three times stream, operate time [{}], operate stackTrace {}", + wrapperRecorder.get(wrapper).getOpenStreamTimestamp(), wrapperRecorder.get(wrapper).getInvokeStackTrace()); + } + } + + private void closeStreamWrapper(InputStreamWrapper wrapper) { + try { + wrapper.close(); + } catch (IOException e) { + LOGGER.error("closed input stream failed!", e); + } + } + + private int getStreamOpenUpperLimit() { + return DynamicPropertyFactory.getInstance() + .getIntProperty(STREAM_OPEN_UPPER_LIMIT, DEFAULT_STREAM_OPEN_UPPER_LIMIT).get(); + } + + private boolean getStackTraceEnabled() { + return DynamicPropertyFactory.getInstance().getBooleanProperty(STREAM_STACKTRACE_ENABLED, true).get(); + } + + private long getStreamCheckInterval() { + return DynamicPropertyFactory.getInstance() + .getLongProperty(STREAM_CHECK_INTERVAL, DEFAULT_STREAM_CHECK_INTERVAL).get(); + } + + private class StreamOperateEvent { + private final InputStreamWrapper inputStreamWrapper; + + private final long openStreamTimestamp; + + private Exception invokeStackTrace; + + public StreamOperateEvent(InputStreamWrapper inputStreamWrapper) { + this.inputStreamWrapper = inputStreamWrapper; + if (getStackTraceEnabled()) { + this.invokeStackTrace = new Exception(); + } + this.openStreamTimestamp = System.currentTimeMillis(); + } + + public InputStreamWrapper getInputStreamWrapper() { + return inputStreamWrapper; + } + + public Exception getInvokeStackTrace() { + return invokeStackTrace; + } + + public long getOpenStreamTimestamp() { + return openStreamTimestamp; + } + } +} From 5c931b311c1be28e497c58b7a9540b9510acd5f9 Mon Sep 17 00:00:00 2001 From: chengyouling Date: Mon, 9 Jun 2025 16:20:26 +0800 Subject: [PATCH 2/9] change fileName --- .../foundation/vertx/http/FileUploadPart.java | 2 +- ...reamRecorder.java => FileUploadStreamRecorder.java} | 10 +++++----- .../foundation/vertx/http/InputStreamWrapper.java | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) rename foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/{OpenInputStreamRecorder.java => FileUploadStreamRecorder.java} (95%) diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java index dfed13739b8..c599048f881 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java @@ -38,7 +38,7 @@ public FileUploadPart(FileUpload fileUpload) { public InputStream getInputStream() throws IOException { final InputStream inputStream = Files.newInputStream(new File(fileUpload.uploadedFileName()).toPath()); final InputStreamWrapper inputStreamWrapper = new InputStreamWrapper(inputStream); - OpenInputStreamRecorder.getInstance().recordOpenStream(inputStreamWrapper); + FileUploadStreamRecorder.getInstance().recordOpenStream(inputStreamWrapper); return inputStreamWrapper; } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/OpenInputStreamRecorder.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java similarity index 95% rename from foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/OpenInputStreamRecorder.java rename to foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java index e4cd634bdf2..4aff76460ee 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/OpenInputStreamRecorder.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java @@ -35,10 +35,10 @@ import com.google.common.eventbus.EventBus; import com.netflix.config.DynamicPropertyFactory; -public class OpenInputStreamRecorder { - private static final Logger LOGGER = LoggerFactory.getLogger(OpenInputStreamRecorder.class); +public class FileUploadStreamRecorder { + private static final Logger LOGGER = LoggerFactory.getLogger(FileUploadStreamRecorder.class); - private static final OpenInputStreamRecorder RECORDER = new OpenInputStreamRecorder(); + private static final FileUploadStreamRecorder RECORDER = new FileUploadStreamRecorder(); private static final String STREAM_OPEN_UPPER_LIMIT = "file.upload.stream.operate.upper-limit"; @@ -56,7 +56,7 @@ public class OpenInputStreamRecorder { private final ScheduledExecutorService streamCheckExecutor; - private OpenInputStreamRecorder() { + private FileUploadStreamRecorder() { eventBus = EventManager.getEventBus(); streamCheckExecutor = Executors.newScheduledThreadPool(1, (t) -> new Thread(t, "stream-operate-check")); startCheckOpenStream(); @@ -67,7 +67,7 @@ private void startCheckOpenStream() { getStreamCheckInterval(), TimeUnit.MILLISECONDS); } - public static OpenInputStreamRecorder getInstance() { + public static FileUploadStreamRecorder getInstance() { return RECORDER; } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java index 58762833642..6bbb095487a 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java @@ -68,7 +68,7 @@ public synchronized void mark(int readlimit) { @Override public void close() throws IOException { - OpenInputStreamRecorder.getInstance().clearRecorder(this); + FileUploadStreamRecorder.getInstance().clearRecorder(this); inputStream.close(); } From eba320a229d74abde5b4cf64eedc97ea22300dea Mon Sep 17 00:00:00 2001 From: chengyouling Date: Tue, 10 Jun 2025 10:14:09 +0800 Subject: [PATCH 3/9] changed parameter name --- .../foundation/vertx/http/FileUploadStreamRecorder.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java index 4aff76460ee..0cf23c5a12e 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java @@ -98,7 +98,7 @@ private void checkOpenInputStream() { if (wrapperRecorder.isEmpty()) { return; } - List closeStreamWrapper = new ArrayList<>(); + List overdueStreams = new ArrayList<>(); for (Map.Entry entry : wrapperRecorder.entrySet()) { StreamOperateEvent event = entry.getValue(); long streamOperateTime = event.getOpenStreamTimestamp(); @@ -106,7 +106,7 @@ private void checkOpenInputStream() { // If the check time exceeds three times, close the open stream. if (System.currentTimeMillis() - streamOperateTime >= 3 * notifyTime) { - closeStreamWrapper.add(entry.getKey()); + overdueStreams.add(entry.getKey()); continue; } if (System.currentTimeMillis() - streamOperateTime >= notifyTime) { @@ -115,7 +115,7 @@ private void checkOpenInputStream() { eventBus.post(event); } } - for (InputStreamWrapper wrapper : closeStreamWrapper) { + for (InputStreamWrapper wrapper : overdueStreams) { closeStreamWrapper(wrapper); LOGGER.warn("closed notify three times stream, operate time [{}], operate stackTrace {}", wrapperRecorder.get(wrapper).getOpenStreamTimestamp(), wrapperRecorder.get(wrapper).getInvokeStackTrace()); From 274d4c454045d4d0935e13d893e3c3418aad1b4b Mon Sep 17 00:00:00 2001 From: chengyouling Date: Tue, 10 Jun 2025 16:53:53 +0800 Subject: [PATCH 4/9] fixed review options --- .../foundation/vertx/http/FileUploadPart.java | 4 +- .../vertx/http/FileUploadStreamRecorder.java | 53 ++++++++++--------- .../vertx/http/InputStreamWrapper.java | 5 +- 3 files changed, 32 insertions(+), 30 deletions(-) diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java index c599048f881..ba7acb2efc2 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadPart.java @@ -37,9 +37,7 @@ public FileUploadPart(FileUpload fileUpload) { @Override public InputStream getInputStream() throws IOException { final InputStream inputStream = Files.newInputStream(new File(fileUpload.uploadedFileName()).toPath()); - final InputStreamWrapper inputStreamWrapper = new InputStreamWrapper(inputStream); - FileUploadStreamRecorder.getInstance().recordOpenStream(inputStreamWrapper); - return inputStreamWrapper; + return new InputStreamWrapper(inputStream); } @Override diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java index 0cf23c5a12e..12e291f98b7 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java @@ -40,17 +40,18 @@ public class FileUploadStreamRecorder { private static final FileUploadStreamRecorder RECORDER = new FileUploadStreamRecorder(); - private static final String STREAM_OPEN_UPPER_LIMIT = "file.upload.stream.operate.upper-limit"; + private static final String STREAM_RECORDER_MAX_SIZE = "servicecomb.uploads.file.stream-recorder.max-size"; - private static final String STREAM_STACKTRACE_ENABLED = "file.upload.stream.operate.stack-trace-enabled"; + private static final String STREAM_STACKTRACE_ENABLED + = "servicecomb.uploads.file.stream-recorder.stack-trace-enabled"; - private static final String STREAM_CHECK_INTERVAL = "file.upload.stream.operate.check-interval"; + private static final String STREAM_CHECK_INTERVAL = "servicecomb.uploads.file.stream-recorder.check-interval"; private static final int DEFAULT_STREAM_OPEN_UPPER_LIMIT = 1000; private static final long DEFAULT_STREAM_CHECK_INTERVAL = 30000L; - private final Map wrapperRecorder = new ConcurrentHashMap<>(); + private final Map streamWrapperRecorder = new ConcurrentHashMap<>(); private final EventBus eventBus; @@ -58,12 +59,13 @@ public class FileUploadStreamRecorder { private FileUploadStreamRecorder() { eventBus = EventManager.getEventBus(); - streamCheckExecutor = Executors.newScheduledThreadPool(1, (t) -> new Thread(t, "stream-operate-check")); - startCheckOpenStream(); + streamCheckExecutor = Executors.newScheduledThreadPool(1, + (t) -> new Thread(t, "upload-file-stream-check")); + startCheckRecordFileStream(); } - private void startCheckOpenStream() { - streamCheckExecutor.scheduleWithFixedDelay(this::checkOpenInputStream, DEFAULT_STREAM_CHECK_INTERVAL, + private void startCheckRecordFileStream() { + streamCheckExecutor.scheduleWithFixedDelay(this::checkRecordFileStream, DEFAULT_STREAM_CHECK_INTERVAL, getStreamCheckInterval(), TimeUnit.MILLISECONDS); } @@ -73,33 +75,33 @@ public static FileUploadStreamRecorder getInstance() { public void recordOpenStream(final InputStreamWrapper wrapper) { checkAndRemoveOldestStream(); - wrapperRecorder.put(wrapper, new StreamOperateEvent(wrapper)); + streamWrapperRecorder.put(wrapper, new StreamOperateEvent(wrapper)); } private void checkAndRemoveOldestStream() { - int upperLimit = getStreamOpenUpperLimit(); - if (wrapperRecorder.size() < upperLimit) { + int maxSize = getStreamRecorderMaxSize(); + if (streamWrapperRecorder.size() < maxSize) { return; } - List operateEvents = new ArrayList<>(wrapperRecorder.values()); + List operateEvents = new ArrayList<>(streamWrapperRecorder.values()); List sortEvents = operateEvents.stream() .sorted(Comparator.comparingLong(StreamOperateEvent::getOpenStreamTimestamp)).collect(Collectors.toList()); StreamOperateEvent deleteEvent = sortEvents.get(0); - LOGGER.warn("reached upper limit [{}] of open stream, delete oldest stream, operate time [{}], stackTrace {}", - upperLimit, deleteEvent.getOpenStreamTimestamp(), deleteEvent.getInvokeStackTrace()); + LOGGER.warn("reached recorder maxSize [{}] of file stream, delete oldest stream, operate time [{}], stackTrace {}", + maxSize, deleteEvent.getOpenStreamTimestamp(), deleteEvent.getInvokeStackTrace()); closeStreamWrapper(deleteEvent.getInputStreamWrapper()); } public void clearRecorder(InputStreamWrapper inputStreamWrapper) { - wrapperRecorder.remove(inputStreamWrapper); + streamWrapperRecorder.remove(inputStreamWrapper); } - private void checkOpenInputStream() { - if (wrapperRecorder.isEmpty()) { + private void checkRecordFileStream() { + if (streamWrapperRecorder.isEmpty()) { return; } List overdueStreams = new ArrayList<>(); - for (Map.Entry entry : wrapperRecorder.entrySet()) { + for (Map.Entry entry : streamWrapperRecorder.entrySet()) { StreamOperateEvent event = entry.getValue(); long streamOperateTime = event.getOpenStreamTimestamp(); long notifyTime = getStreamCheckInterval(); @@ -110,7 +112,7 @@ private void checkOpenInputStream() { continue; } if (System.currentTimeMillis() - streamOperateTime >= notifyTime) { - LOGGER.warn("there have input stream not closed, operate time [{}], operate stackTrace {}", + LOGGER.warn("there have file stream not closed, operate time [{}], operate stackTrace {}", event.getOpenStreamTimestamp(), event.getInvokeStackTrace()); eventBus.post(event); } @@ -118,7 +120,8 @@ private void checkOpenInputStream() { for (InputStreamWrapper wrapper : overdueStreams) { closeStreamWrapper(wrapper); LOGGER.warn("closed notify three times stream, operate time [{}], operate stackTrace {}", - wrapperRecorder.get(wrapper).getOpenStreamTimestamp(), wrapperRecorder.get(wrapper).getInvokeStackTrace()); + streamWrapperRecorder.get(wrapper).getOpenStreamTimestamp(), + streamWrapperRecorder.get(wrapper).getInvokeStackTrace()); } } @@ -130,13 +133,13 @@ private void closeStreamWrapper(InputStreamWrapper wrapper) { } } - private int getStreamOpenUpperLimit() { + private int getStreamRecorderMaxSize() { return DynamicPropertyFactory.getInstance() - .getIntProperty(STREAM_OPEN_UPPER_LIMIT, DEFAULT_STREAM_OPEN_UPPER_LIMIT).get(); + .getIntProperty(STREAM_RECORDER_MAX_SIZE, DEFAULT_STREAM_OPEN_UPPER_LIMIT).get(); } - private boolean getStackTraceEnabled() { - return DynamicPropertyFactory.getInstance().getBooleanProperty(STREAM_STACKTRACE_ENABLED, true).get(); + private boolean getStreamStackTraceEnabled() { + return DynamicPropertyFactory.getInstance().getBooleanProperty(STREAM_STACKTRACE_ENABLED, false).get(); } private long getStreamCheckInterval() { @@ -153,7 +156,7 @@ private class StreamOperateEvent { public StreamOperateEvent(InputStreamWrapper inputStreamWrapper) { this.inputStreamWrapper = inputStreamWrapper; - if (getStackTraceEnabled()) { + if (getStreamStackTraceEnabled()) { this.invokeStackTrace = new Exception(); } this.openStreamTimestamp = System.currentTimeMillis(); diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java index 6bbb095487a..3bf5889bcfb 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/InputStreamWrapper.java @@ -25,6 +25,7 @@ public class InputStreamWrapper extends InputStream { public InputStreamWrapper(InputStream inputStream) { this.inputStream = inputStream; + FileUploadStreamRecorder.getInstance().recordOpenStream(this); } public InputStream getInputStream() { @@ -62,7 +63,7 @@ public boolean markSupported() { } @Override - public synchronized void mark(int readlimit) { + public void mark(int readlimit) { inputStream.mark(readlimit); } @@ -73,7 +74,7 @@ public void close() throws IOException { } @Override - public synchronized void reset() throws IOException { + public void reset() throws IOException { inputStream.reset(); } } From d138dbd00d47529a84423ec4bf8531b081c6c65e Mon Sep 17 00:00:00 2001 From: chengyouling Date: Tue, 10 Jun 2025 20:28:54 +0800 Subject: [PATCH 5/9] add synchronized --- .../vertx/http/FileUploadStreamRecorder.java | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java index 12e291f98b7..9677fe2fe76 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java @@ -19,14 +19,13 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Comparator; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.servicecomb.foundation.common.event.EventManager; import org.slf4j.Logger; @@ -47,7 +46,7 @@ public class FileUploadStreamRecorder { private static final String STREAM_CHECK_INTERVAL = "servicecomb.uploads.file.stream-recorder.check-interval"; - private static final int DEFAULT_STREAM_OPEN_UPPER_LIMIT = 1000; + private static final int DEFAULT_STREAM_RECORDER_MAX_SIZE = 5000; private static final long DEFAULT_STREAM_CHECK_INTERVAL = 30000L; @@ -57,6 +56,8 @@ public class FileUploadStreamRecorder { private final ScheduledExecutorService streamCheckExecutor; + private final Object lock = new Object(); + private FileUploadStreamRecorder() { eventBus = EventManager.getEventBus(); streamCheckExecutor = Executors.newScheduledThreadPool(1, @@ -83,13 +84,26 @@ private void checkAndRemoveOldestStream() { if (streamWrapperRecorder.size() < maxSize) { return; } - List operateEvents = new ArrayList<>(streamWrapperRecorder.values()); - List sortEvents = operateEvents.stream() - .sorted(Comparator.comparingLong(StreamOperateEvent::getOpenStreamTimestamp)).collect(Collectors.toList()); - StreamOperateEvent deleteEvent = sortEvents.get(0); + StreamOperateEvent oldestEvent = getOldestOperateEvent(streamWrapperRecorder.values()); LOGGER.warn("reached recorder maxSize [{}] of file stream, delete oldest stream, operate time [{}], stackTrace {}", - maxSize, deleteEvent.getOpenStreamTimestamp(), deleteEvent.getInvokeStackTrace()); - closeStreamWrapper(deleteEvent.getInputStreamWrapper()); + maxSize, oldestEvent.getOpenStreamTimestamp(), oldestEvent.getInvokeStackTrace()); + closeStreamWrapper(oldestEvent.getInputStreamWrapper()); + } + + private StreamOperateEvent getOldestOperateEvent(Collection values) { + synchronized (lock) { + StreamOperateEvent oldestEvent = null; + for (StreamOperateEvent event : values) { + if (oldestEvent == null) { + oldestEvent = event; + continue; + } + if (oldestEvent.getOpenStreamTimestamp() > event.getOpenStreamTimestamp()) { + oldestEvent = event; + } + } + return oldestEvent; + } } public void clearRecorder(InputStreamWrapper inputStreamWrapper) { @@ -135,7 +149,7 @@ private void closeStreamWrapper(InputStreamWrapper wrapper) { private int getStreamRecorderMaxSize() { return DynamicPropertyFactory.getInstance() - .getIntProperty(STREAM_RECORDER_MAX_SIZE, DEFAULT_STREAM_OPEN_UPPER_LIMIT).get(); + .getIntProperty(STREAM_RECORDER_MAX_SIZE, DEFAULT_STREAM_RECORDER_MAX_SIZE).get(); } private boolean getStreamStackTraceEnabled() { From 9643768d4c0831cb1672d538a2c3517c40e9003a Mon Sep 17 00:00:00 2001 From: chengyouling Date: Tue, 10 Jun 2025 20:31:52 +0800 Subject: [PATCH 6/9] adjust synchronized --- .../vertx/http/FileUploadStreamRecorder.java | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java index 9677fe2fe76..071309f8576 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java @@ -84,26 +84,26 @@ private void checkAndRemoveOldestStream() { if (streamWrapperRecorder.size() < maxSize) { return; } - StreamOperateEvent oldestEvent = getOldestOperateEvent(streamWrapperRecorder.values()); - LOGGER.warn("reached recorder maxSize [{}] of file stream, delete oldest stream, operate time [{}], stackTrace {}", - maxSize, oldestEvent.getOpenStreamTimestamp(), oldestEvent.getInvokeStackTrace()); - closeStreamWrapper(oldestEvent.getInputStreamWrapper()); + synchronized (lock) { + StreamOperateEvent oldestEvent = getOldestOperateEvent(streamWrapperRecorder.values()); + LOGGER.warn("reached recorder maxSize [{}] of file stream, delete oldest stream, operate time [{}], stackTrace {}", + maxSize, oldestEvent.getOpenStreamTimestamp(), oldestEvent.getInvokeStackTrace()); + closeStreamWrapper(oldestEvent.getInputStreamWrapper()); + } } private StreamOperateEvent getOldestOperateEvent(Collection values) { - synchronized (lock) { - StreamOperateEvent oldestEvent = null; - for (StreamOperateEvent event : values) { - if (oldestEvent == null) { - oldestEvent = event; - continue; - } - if (oldestEvent.getOpenStreamTimestamp() > event.getOpenStreamTimestamp()) { - oldestEvent = event; - } + StreamOperateEvent oldestEvent = null; + for (StreamOperateEvent event : values) { + if (oldestEvent == null) { + oldestEvent = event; + continue; + } + if (oldestEvent.getOpenStreamTimestamp() > event.getOpenStreamTimestamp()) { + oldestEvent = event; } - return oldestEvent; } + return oldestEvent; } public void clearRecorder(InputStreamWrapper inputStreamWrapper) { From 9add4c759ef8f2ced96e75a1d4968aced6dd29cc Mon Sep 17 00:00:00 2001 From: chengyouling Date: Wed, 11 Jun 2025 11:27:04 +0800 Subject: [PATCH 7/9] add eventType --- .../vertx/http/FileUploadStreamRecorder.java | 62 ++++++++++++------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java index 071309f8576..c089c26ae6a 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java @@ -46,10 +46,14 @@ public class FileUploadStreamRecorder { private static final String STREAM_CHECK_INTERVAL = "servicecomb.uploads.file.stream-recorder.check-interval"; + private static final String STREAM_MAX_OPEN_TIME = "servicecomb.uploads.file.stream-recorder.stream-max-open-time"; + private static final int DEFAULT_STREAM_RECORDER_MAX_SIZE = 5000; private static final long DEFAULT_STREAM_CHECK_INTERVAL = 30000L; + private static final long DEFAULT_STREAM_MAX_OPEN_TIME = 90000L; + private final Map streamWrapperRecorder = new ConcurrentHashMap<>(); private final EventBus eventBus; @@ -86,8 +90,10 @@ private void checkAndRemoveOldestStream() { } synchronized (lock) { StreamOperateEvent oldestEvent = getOldestOperateEvent(streamWrapperRecorder.values()); - LOGGER.warn("reached recorder maxSize [{}] of file stream, delete oldest stream, operate time [{}], stackTrace {}", + LOGGER.warn("reached record maxSize [{}] of file stream, delete oldest stream, operate time [{}], stackTrace {}", maxSize, oldestEvent.getOpenStreamTimestamp(), oldestEvent.getInvokeStackTrace()); + oldestEvent.setEventType(EventType.OVER_SIZE); + eventBus.post(oldestEvent); closeStreamWrapper(oldestEvent.getInputStreamWrapper()); } } @@ -114,28 +120,20 @@ private void checkRecordFileStream() { if (streamWrapperRecorder.isEmpty()) { return; } - List overdueStreams = new ArrayList<>(); - for (Map.Entry entry : streamWrapperRecorder.entrySet()) { - StreamOperateEvent event = entry.getValue(); + List overdueStreamEvents = new ArrayList<>(); + long currentMillis = System.currentTimeMillis(); + for (StreamOperateEvent event : streamWrapperRecorder.values()) { long streamOperateTime = event.getOpenStreamTimestamp(); - long notifyTime = getStreamCheckInterval(); - - // If the check time exceeds three times, close the open stream. - if (System.currentTimeMillis() - streamOperateTime >= 3 * notifyTime) { - overdueStreams.add(entry.getKey()); - continue; - } - if (System.currentTimeMillis() - streamOperateTime >= notifyTime) { - LOGGER.warn("there have file stream not closed, operate time [{}], operate stackTrace {}", - event.getOpenStreamTimestamp(), event.getInvokeStackTrace()); - eventBus.post(event); + if (currentMillis - streamOperateTime >= getStreamMaxOpenTime()) { + overdueStreamEvents.add(event); } } - for (InputStreamWrapper wrapper : overdueStreams) { - closeStreamWrapper(wrapper); - LOGGER.warn("closed notify three times stream, operate time [{}], operate stackTrace {}", - streamWrapperRecorder.get(wrapper).getOpenStreamTimestamp(), - streamWrapperRecorder.get(wrapper).getInvokeStackTrace()); + for (StreamOperateEvent overdueEvent : overdueStreamEvents) { + overdueEvent.setEventType(EventType.TIMEOUT); + eventBus.post(overdueEvent); + closeStreamWrapper(overdueEvent.getInputStreamWrapper()); + LOGGER.warn("closed timeout stream, operate time [{}], operate stackTrace {}", + overdueEvent.getOpenStreamTimestamp(), overdueEvent.getInvokeStackTrace()); } } @@ -152,7 +150,7 @@ private int getStreamRecorderMaxSize() { .getIntProperty(STREAM_RECORDER_MAX_SIZE, DEFAULT_STREAM_RECORDER_MAX_SIZE).get(); } - private boolean getStreamStackTraceEnabled() { + private static boolean getStreamStackTraceEnabled() { return DynamicPropertyFactory.getInstance().getBooleanProperty(STREAM_STACKTRACE_ENABLED, false).get(); } @@ -161,13 +159,20 @@ private long getStreamCheckInterval() { .getLongProperty(STREAM_CHECK_INTERVAL, DEFAULT_STREAM_CHECK_INTERVAL).get(); } - private class StreamOperateEvent { + private long getStreamMaxOpenTime() { + return DynamicPropertyFactory.getInstance() + .getLongProperty(STREAM_MAX_OPEN_TIME, DEFAULT_STREAM_MAX_OPEN_TIME).get(); + } + + public static class StreamOperateEvent { private final InputStreamWrapper inputStreamWrapper; private final long openStreamTimestamp; private Exception invokeStackTrace; + private EventType eventType; + public StreamOperateEvent(InputStreamWrapper inputStreamWrapper) { this.inputStreamWrapper = inputStreamWrapper; if (getStreamStackTraceEnabled()) { @@ -187,5 +192,18 @@ public Exception getInvokeStackTrace() { public long getOpenStreamTimestamp() { return openStreamTimestamp; } + + public EventType getEventType() { + return eventType; + } + + public void setEventType(EventType eventType) { + this.eventType = eventType; + } + } + + public enum EventType { + OVER_SIZE, + TIMEOUT } } From 08b1f93bb7daa73caaae43edba1dc420b40323e4 Mon Sep 17 00:00:00 2001 From: chengyouling Date: Thu, 12 Jun 2025 09:25:21 +0800 Subject: [PATCH 8/9] Configuration items are named in CamelCase format. --- .../vertx/http/FileUploadStreamRecorder.java | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java index c089c26ae6a..6ea3c6aa93b 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java @@ -39,14 +39,14 @@ public class FileUploadStreamRecorder { private static final FileUploadStreamRecorder RECORDER = new FileUploadStreamRecorder(); - private static final String STREAM_RECORDER_MAX_SIZE = "servicecomb.uploads.file.stream-recorder.max-size"; + private static final String STREAM_RECORDER_MAX_SIZE = "servicecomb.uploads.file.streamRecorder.maxSize"; private static final String STREAM_STACKTRACE_ENABLED - = "servicecomb.uploads.file.stream-recorder.stack-trace-enabled"; + = "servicecomb.uploads.file.streamRecorder.stackTraceEnabled"; - private static final String STREAM_CHECK_INTERVAL = "servicecomb.uploads.file.stream-recorder.check-interval"; + private static final String STREAM_CHECK_INTERVAL = "servicecomb.uploads.file.streamRecorder.checkInterval"; - private static final String STREAM_MAX_OPEN_TIME = "servicecomb.uploads.file.stream-recorder.stream-max-open-time"; + private static final String STREAM_MAX_OPEN_TIME = "servicecomb.uploads.file.streamRecorder.streamMaxOpenTime"; private static final int DEFAULT_STREAM_RECORDER_MAX_SIZE = 5000; @@ -90,7 +90,7 @@ private void checkAndRemoveOldestStream() { } synchronized (lock) { StreamOperateEvent oldestEvent = getOldestOperateEvent(streamWrapperRecorder.values()); - LOGGER.warn("reached record maxSize [{}] of file stream, delete oldest stream, operate time [{}], stackTrace {}", + LOGGER.warn("reached record maxSize [{}] of file stream, delete oldest stream, operate time [{}], stackTrace: ", maxSize, oldestEvent.getOpenStreamTimestamp(), oldestEvent.getInvokeStackTrace()); oldestEvent.setEventType(EventType.OVER_SIZE); eventBus.post(oldestEvent); @@ -117,23 +117,27 @@ public void clearRecorder(InputStreamWrapper inputStreamWrapper) { } private void checkRecordFileStream() { - if (streamWrapperRecorder.isEmpty()) { - return; - } - List overdueStreamEvents = new ArrayList<>(); - long currentMillis = System.currentTimeMillis(); - for (StreamOperateEvent event : streamWrapperRecorder.values()) { - long streamOperateTime = event.getOpenStreamTimestamp(); - if (currentMillis - streamOperateTime >= getStreamMaxOpenTime()) { - overdueStreamEvents.add(event); + try { + if (streamWrapperRecorder.isEmpty()) { + return; } - } - for (StreamOperateEvent overdueEvent : overdueStreamEvents) { - overdueEvent.setEventType(EventType.TIMEOUT); - eventBus.post(overdueEvent); - closeStreamWrapper(overdueEvent.getInputStreamWrapper()); - LOGGER.warn("closed timeout stream, operate time [{}], operate stackTrace {}", - overdueEvent.getOpenStreamTimestamp(), overdueEvent.getInvokeStackTrace()); + List overdueStreamEvents = new ArrayList<>(); + long currentMillis = System.currentTimeMillis(); + for (StreamOperateEvent event : streamWrapperRecorder.values()) { + long streamOperateTime = event.getOpenStreamTimestamp(); + if (currentMillis - streamOperateTime >= getStreamMaxOpenTime()) { + overdueStreamEvents.add(event); + } + } + for (StreamOperateEvent overdueEvent : overdueStreamEvents) { + overdueEvent.setEventType(EventType.TIMEOUT); + eventBus.post(overdueEvent); + closeStreamWrapper(overdueEvent.getInputStreamWrapper()); + LOGGER.warn("closed timeout stream, operate time [{}], operate stackTrace: ", + overdueEvent.getOpenStreamTimestamp(), overdueEvent.getInvokeStackTrace()); + } + } catch (Exception e) { + LOGGER.error("checkRecordFileStream failed, next interval will try again.", e); } } From 622f399a8bfb640235907ba44c6e34f2dd6b034f Mon Sep 17 00:00:00 2001 From: chengyouling Date: Fri, 13 Jun 2025 15:05:49 +0800 Subject: [PATCH 9/9] change config modifiers private to public --- .../vertx/http/FileUploadStreamRecorder.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java index 6ea3c6aa93b..289914f986b 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/http/FileUploadStreamRecorder.java @@ -39,20 +39,20 @@ public class FileUploadStreamRecorder { private static final FileUploadStreamRecorder RECORDER = new FileUploadStreamRecorder(); - private static final String STREAM_RECORDER_MAX_SIZE = "servicecomb.uploads.file.streamRecorder.maxSize"; + public static final String STREAM_RECORDER_MAX_SIZE = "servicecomb.uploads.file.streamRecorder.maxSize"; - private static final String STREAM_STACKTRACE_ENABLED + public static final String STREAM_STACKTRACE_ENABLED = "servicecomb.uploads.file.streamRecorder.stackTraceEnabled"; - private static final String STREAM_CHECK_INTERVAL = "servicecomb.uploads.file.streamRecorder.checkInterval"; + public static final String STREAM_CHECK_INTERVAL = "servicecomb.uploads.file.streamRecorder.checkInterval"; - private static final String STREAM_MAX_OPEN_TIME = "servicecomb.uploads.file.streamRecorder.streamMaxOpenTime"; + public static final String STREAM_MAX_OPEN_TIME = "servicecomb.uploads.file.streamRecorder.streamMaxOpenTime"; - private static final int DEFAULT_STREAM_RECORDER_MAX_SIZE = 5000; + public static final int DEFAULT_STREAM_RECORDER_MAX_SIZE = 5000; - private static final long DEFAULT_STREAM_CHECK_INTERVAL = 30000L; + public static final long DEFAULT_STREAM_CHECK_INTERVAL = 30000L; - private static final long DEFAULT_STREAM_MAX_OPEN_TIME = 90000L; + public static final long DEFAULT_STREAM_MAX_OPEN_TIME = 90000L; private final Map streamWrapperRecorder = new ConcurrentHashMap<>();