Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
Expand All @@ -39,8 +41,10 @@
/**
* Buffered output stream which is guaranteed to deliver content after some time even if idle and the buffer does not fill up.
* The automatic “flushing” does <em>not</em> flush the underlying stream, for example via {@code ProxyOutputStream.Flush}.
* Also the stream will be flushed before garbage collection.
* Otherwise it is similar to {@link BufferedOutputStream}.
*/
final class DelayBufferedOutputStream extends BufferedOutputStream {
final class DelayBufferedOutputStream extends FilterOutputStream {

private static final Logger LOGGER = Logger.getLogger(DelayBufferedOutputStream.class.getName());

Expand All @@ -54,6 +58,45 @@ private Tuning() {}
static final Tuning DEFAULT = new Tuning();
}

/**
* The interesting state of the buffered stream, kept as a separate object so that {@link FlushRef} can hold on to it.
*/
private static final class Buffer {

final OutputStream out;
private final byte[] dat;
private int pos;

Buffer(OutputStream out, int size) {
this.out = out;
dat = new byte[size];
}

synchronized void drain() throws IOException {
if (pos == 0) {
return;
}
out.write(dat, 0, pos);
pos = 0;
}

void write(int b) throws IOException {
assert Thread.holdsLock(this);
if (pos == dat.length) {
drain();
}
dat[pos++] = (byte) b;
}

synchronized void write(byte[] b, int off, int len) throws IOException {
for (int i = 0; i < len; i++) {
write(b[off + i]);
}
}

}

private final Buffer buf;
private final Tuning tuning;
private long recurrencePeriod;

Expand All @@ -62,85 +105,97 @@ private Tuning() {}
}

DelayBufferedOutputStream(OutputStream out, Tuning tuning) {
super(new FlushControlledOutputStream(out), tuning.bufferSize);
super(out);
buf = new Buffer(out, tuning.bufferSize);
this.tuning = tuning;
recurrencePeriod = tuning.minRecurrencePeriod;
FlushRef.register(this);
reschedule();
}

private void reschedule() {
Timer.get().schedule(new Flush(this), recurrencePeriod, TimeUnit.MILLISECONDS);
recurrencePeriod = Math.min((long) (recurrencePeriod * tuning.recurrencePeriodBackoff), tuning.maxRecurrencePeriod);
@Override public void write(int b) throws IOException {
synchronized (buf) {
buf.write(b);
}
}

/** We can only call {@link BufferedOutputStream#flushBuffer} via {@link #flush}, but we do not wish to flush the underlying stream, only write out the buffer. */
private void flushBuffer() throws IOException {
ThreadLocal<Boolean> enableFlush = ((FlushControlledOutputStream) out).enableFlush;
boolean orig = enableFlush.get();
enableFlush.set(false);
try {
flush();
} finally {
enableFlush.set(orig);
}
@Override public void write(byte[] b, int off, int len) throws IOException {
buf.write(b, off, len);
}

@Override public void flush() throws IOException {
buf.drain();
super.flush();
}

private void reschedule() {
Timer.get().schedule(new Drain(this), recurrencePeriod, TimeUnit.MILLISECONDS);
recurrencePeriod = Math.min((long) (recurrencePeriod * tuning.recurrencePeriodBackoff), tuning.maxRecurrencePeriod);
}

void flushAndReschedule() {
void drainAndReschedule() {
// TODO as an optimization, avoid flushing the buffer if it was recently flushed anyway due to filling up
try {
flushBuffer();
buf.drain();
} catch (IOException x) {
LOGGER.log(Level.FINE, null, x);
}
reschedule();
}

@SuppressWarnings("FinalizeDeclaration") // not ideal, but PhantomReference is more of a hassle
@Override protected void finalize() throws Throwable {
super.finalize();
// Odd that this is not the default behavior for BufferedOutputStream.
flush();
}

private static final class Flush implements Runnable {
private static final class Drain implements Runnable {

/** Since there is no explicit close event, just keep flushing periodically until the stream is collected. */
private final Reference<DelayBufferedOutputStream> osr;

Flush(DelayBufferedOutputStream os) {
Drain(DelayBufferedOutputStream os) {
osr = new WeakReference<>(os);
}

@Override public void run() {
DelayBufferedOutputStream os = osr.get();
if (os != null) {
os.flushAndReschedule();
os.drainAndReschedule();
}
}

}

/** @see DelayBufferedOutputStream#flushBuffer */
private static final class FlushControlledOutputStream extends FilterOutputStream {

private final ThreadLocal<Boolean> enableFlush = new ThreadLocal<Boolean>() {
@Override protected Boolean initialValue() {
return true;
}
};

FlushControlledOutputStream(OutputStream out) {
super(out);
/**
* Flushes streams prior to garbage collection.
* In Java 9+ could use {@code java.util.Cleaner} instead.
*/
private static final class FlushRef extends PhantomReference<DelayBufferedOutputStream> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seriously hacky, but I think this will work IIUC what it's doing.

Makes me wonder why Remoting doesn't have a way to register a callback to execute before disposing the lease on an object.


private static final ReferenceQueue<DelayBufferedOutputStream> rq = new ReferenceQueue<>();

static {
Timer.get().scheduleWithFixedDelay(() -> {
while (true) {
FlushRef ref = (FlushRef) rq.poll();
if (ref == null) {
break;
}
LOGGER.log(Level.FINE, "flushing {0} from a DelayBufferedOutputStream", ref.buf.out);
try {
ref.buf.drain();
ref.buf.out.flush();
} catch (IOException x) {
LOGGER.log(Level.WARNING, null, x);
}
}
}, 0, 10, TimeUnit.SECONDS);
}

@Override public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len); // super method writes one byte at a time!
static void register(DelayBufferedOutputStream dbos) {
new FlushRef(dbos, rq).enqueue();
}

@Override public void flush() throws IOException {
if (enableFlush.get()) {
super.flush();
}
private final Buffer buf;

private FlushRef(DelayBufferedOutputStream dbos, ReferenceQueue<DelayBufferedOutputStream> rq) {
super(dbos, rq);
this.buf = dbos.buf;
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import hudson.console.AnnotatedLargeText;
import hudson.console.HyperlinkNote;
import hudson.model.TaskListener;
import hudson.remoting.Channel;
import hudson.remoting.VirtualChannel;
import hudson.util.StreamTaskListener;
import java.io.EOFException;
import java.io.PrintWriter;
import java.io.StringWriter;
Expand All @@ -37,11 +39,18 @@
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;
import java.util.logging.Formatter;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import jenkins.security.MasterToSlaveCallable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.commons.io.output.NullWriter;
import org.apache.commons.io.output.WriterOutputStream;
import static org.hamcrest.Matchers.*;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
Expand All @@ -51,6 +60,7 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.jvnet.hudson.test.JenkinsRule;
import org.jvnet.hudson.test.LoggerRule;

/**
* Foundation for compliance tests of {@link LogStorage} implementations.
Expand All @@ -63,6 +73,8 @@ public abstract class LogStorageTestBase {

@ClassRule public static JenkinsRule r = new JenkinsRule();

@ClassRule public static LoggerRule logging = new LoggerRule();

/** Create a new storage implementation, but potentially reusing any data initialized in the last {@link Before} setup. */
protected abstract LogStorage createStorage() throws Exception;

Expand Down Expand Up @@ -142,6 +154,7 @@ protected static void close(TaskListener listener) throws Exception {
}

@Test public void remoting() throws Exception {
logging.capture(100).record(Channel.class, Level.WARNING);
LogStorage ls = createStorage();
TaskListener overall = ls.overallListener();
overall.getLogger().println("overall from master");
Expand All @@ -150,12 +163,15 @@ protected static void close(TaskListener listener) throws Exception {
long overallPos = assertOverallLog(0, "overall from master\n<span class=\"pipeline-node-1\">step from master\n</span>", true);
long stepPos = assertStepLog("1", 0, "step from master\n", true);
VirtualChannel channel = r.createOnlineSlave().getChannel();
channel.call(new RemoteLogDumper("agent"));
channel.call(new RemotePrint("overall from agent", overall));
channel.call(new RemotePrint("step from agent", step));
channel.call(new GC());
overallPos = assertOverallLog(overallPos, "overall from agent\n<span class=\"pipeline-node-1\">step from agent\n</span>", true);
stepPos = assertStepLog("1", stepPos, "step from agent\n", true);
assertEquals(overallPos, assertOverallLog(overallPos, "", true));
assertEquals(stepPos, assertStepLog("1", stepPos, "", true));
assertThat(logging.getMessages(), empty());
}
private static final class RemotePrint extends MasterToSlaveCallable<Void, Exception> {
static {
Expand All @@ -173,6 +189,39 @@ private static final class RemotePrint extends MasterToSlaveCallable<Void, Excep
return null;
}
}
/** Checking behavior of {@link DelayBufferedOutputStream} garbage collection. */
private static final class GC extends MasterToSlaveCallable<Void, Exception> {
@Override public Void call() throws Exception {
System.gc();
System.runFinalization();
return null;
}
}
// TODO copied from pipeline-log-cloudwatch; consider whether this should be moved into LoggerRule
private static final class RemoteLogDumper extends MasterToSlaveCallable<Void, RuntimeException> {
private final String name;
private final TaskListener stderr = StreamTaskListener.fromStderr();
RemoteLogDumper(String name) {
this.name = name;
}
@Override public Void call() throws RuntimeException {
Handler handler = new Handler() {
final Formatter formatter = new SimpleFormatter();
@Override public void publish(LogRecord record) {
if (isLoggable(record)) {
stderr.getLogger().print(formatter.format(record).replaceAll("(?m)^", "[" + name + "] "));
}
}
@Override public void flush() {}
@Override public void close() throws SecurityException {}
};
handler.setLevel(Level.ALL);
Logger logger = Logger.getLogger(LogStorageTestBase.class.getPackage().getName());
logger.setLevel(Level.FINER);
logger.addHandler(handler);
return null;
}
}

/**
* Checks what happens when code using {@link TaskListener#getLogger} prints a line with inadequate synchronization.
Expand Down