diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java b/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java index 9d0dd81f..4c158dc6 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java @@ -107,14 +107,16 @@ private static final class Replacement implements SerializableOnlyOverRemoting { private static final long serialVersionUID = 1; private final RemoteOutputStream ros; - private final DelayBufferedOutputStream.Tuning tuning = DelayBufferedOutputStream.Tuning.DEFAULT; // load defaults on controller + // load defaults on controller + private final DelayBufferedOutputStream.Tuning tuning = DelayBufferedOutputStream.Tuning.DEFAULT; + private final boolean disableFlushOnGc = GCFlushedOutputStream.DISABLED; Replacement(BufferedBuildListener cbl) { this.ros = new RemoteOutputStream(new CloseProofOutputStream(cbl.out)); } private Object readResolve() { - return new BufferedBuildListener(new CloseableOutputStream(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning)))); + return new BufferedBuildListener(new CloseableOutputStream(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning), disableFlushOnGc))); } } diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java b/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java index 0123213c..641921c3 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java @@ -28,18 +28,17 @@ import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.lang.ref.PhantomReference; -import java.lang.ref.ReferenceQueue; -import java.util.concurrent.TimeUnit; +import java.lang.ref.Cleaner; import java.util.logging.Level; import java.util.logging.Logger; import edu.umd.cs.findbugs.annotations.NonNull; import hudson.remoting.ChannelClosedException; +import hudson.remoting.DaemonThreadFactory; +import hudson.remoting.NamingThreadFactory; import java.io.EOFException; import java.nio.channels.ClosedChannelException; import java.util.stream.Stream; -import jenkins.util.Timer; /** * A stream which will be flushed before garbage collection. @@ -48,10 +47,20 @@ final class GCFlushedOutputStream extends FilterOutputStream { private static final Logger LOGGER = Logger.getLogger(GCFlushedOutputStream.class.getName()); + private static final Cleaner CLEANER = Cleaner.create( + new NamingThreadFactory(new DaemonThreadFactory(), GCFlushedOutputStream.class.getName() + ".CLEANER")); + + static boolean DISABLED = Boolean.getBoolean(GCFlushedOutputStream.class.getName() + ".DISABLED"); GCFlushedOutputStream(OutputStream out) { + this(out, DISABLED); + } + + GCFlushedOutputStream(OutputStream out, boolean disabled) { super(out); - FlushRef.register(this, out); + if (!disabled) { + CLEANER.register(this, new CleanerTask(out)); + } } @Override public void write(@NonNull byte[] b, int off, int len) throws IOException { @@ -80,40 +89,23 @@ private static boolean isClosedChannelException(Throwable t) { /** * Flushes streams prior to garbage collection. * ({@link BufferedOutputStream} does not do this automatically.) - * TODO Java 9+ could use java.util.Cleaner */ - private static final class FlushRef extends PhantomReference { - - private static final ReferenceQueue rq = new ReferenceQueue<>(); - - static { - Timer.get().scheduleWithFixedDelay(() -> { - while (true) { - FlushRef ref = (FlushRef) rq.poll(); - if (ref == null) { - break; - } - LOGGER.log(Level.FINE, "flushing {0}", ref.out); - try { - ref.out.flush(); - } catch (IOException x) { - LOGGER.log(isClosedChannelException(x) ? Level.FINE : Level.WARNING, null, x); - } - } - }, 0, 10, TimeUnit.SECONDS); - } - - static void register(GCFlushedOutputStream fos, OutputStream out) { - new FlushRef(fos, out, rq).enqueue(); - } - + private static final class CleanerTask implements Runnable { private final OutputStream out; - private FlushRef(GCFlushedOutputStream fos, OutputStream out, ReferenceQueue rq) { - super(fos, rq); + public CleanerTask(OutputStream out) { this.out = out; } + @Override + public void run() { + LOGGER.log(Level.FINE, "flushing {0}", out); + try { + out.flush(); + } catch (IOException x) { + LOGGER.log(isClosedChannelException(x) ? Level.FINE : Level.WARNING, null, x); + } + } } } diff --git a/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java b/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java index a8cabbb8..3a4f0551 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java @@ -26,7 +26,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeThat; import edu.umd.cs.findbugs.annotations.NonNull; import hudson.console.AnnotatedLargeText; @@ -47,6 +49,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.logging.Level; import jenkins.model.CauseOfInterruption; @@ -64,7 +67,9 @@ import org.jenkinsci.plugins.workflow.job.WorkflowRun; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; +import org.jvnet.hudson.test.FlagRule; import org.jvnet.hudson.test.JenkinsRule; import org.jvnet.hudson.test.LoggerRule; import org.springframework.security.core.Authentication; @@ -78,6 +83,16 @@ public abstract class LogStorageTestBase { @ClassRule public static LoggerRule logging = new LoggerRule(); + @Rule public FlagRule resetTuningMinRecurrence = new FlagRule<>( + () -> DelayBufferedOutputStream.Tuning.DEFAULT.minRecurrencePeriod, + x -> DelayBufferedOutputStream.Tuning.DEFAULT.minRecurrencePeriod = x); + @Rule public FlagRule resetTuningMaxRecurrence = new FlagRule<>( + () -> DelayBufferedOutputStream.Tuning.DEFAULT.maxRecurrencePeriod, + x -> DelayBufferedOutputStream.Tuning.DEFAULT.maxRecurrencePeriod = x); + @Rule public FlagRule resetGcFlushedOutputStream = new FlagRule<>( + () -> GCFlushedOutputStream.DISABLED, + x -> GCFlushedOutputStream.DISABLED = x); + /** Create a new storage implementation, but potentially reusing any data initialized in the last {@link Before} setup. */ protected abstract LogStorage createStorage(); @@ -171,9 +186,66 @@ protected static void close(TaskListener listener) throws Exception { } @Test public void remoting() throws Exception { + remotingParameterized(RemotingTestVariant.Regular); + } + + /** + * Verifies that flushing explicitly makes {@link BufferedBuildListener} and {@link GCFlushedOutputStream} unnecessary. + * Only relevant for {@link LogStorage} implementations that use {@link BufferedBuildListener}. + */ + @Test public void remotingNormalFlushOnly() throws Exception { + remotingParameterized(RemotingTestVariant.ManualFlushOnly); + } + + /** + * Verifies the behavior of {@link BufferedBuildListener}. + * Only relevant for {@link LogStorage} implementations that use {@link BufferedBuildListener}. + */ + @Test public void remotingDelayedAutoFlushOnly() throws Exception { + // To make this test fail, set `DelayBufferedOutputStream.Tuning.DEFAULT.*RecurrencePeriod` to very large values. + remotingParameterized(RemotingTestVariant.DelayedAutoFlushOnly); + } + + /** + * Verifies the behavior of {@link GCFlushedOutputStream}. + * Only relevant for {@link LogStorage} implementations that use {@link BufferedBuildListener}. + */ + @Test public void remotingGcFlushOnly() throws Exception { + // To make this test fail, set `GCFlushedOutputStream.DISABLED` to true. + remotingParameterized(RemotingTestVariant.GcFlushOnly); + } + + private enum RemotingTestVariant { + Regular(false, false, false), + ManualFlushOnly(true, true, false), + DelayedAutoFlushOnly(true, false, true), + GcFlushOnly(false, true, true); + + boolean disableGcFlush; + boolean disableDelayedAutoFlush; + boolean disableManualFlush; + + RemotingTestVariant(boolean disableGcFlush, boolean disableDelayedAutoFlush, boolean disableManualFlush) { + this.disableGcFlush = disableGcFlush; + this.disableDelayedAutoFlush = disableDelayedAutoFlush; + this.disableManualFlush = disableManualFlush; + } + } + private void remotingParameterized(RemotingTestVariant variant) throws Exception { logging.capture(100).record(Channel.class, Level.WARNING); + if (variant.disableDelayedAutoFlush) { + DelayBufferedOutputStream.Tuning.DEFAULT.minRecurrencePeriod = TimeUnit.HOURS.toMillis(1); + DelayBufferedOutputStream.Tuning.DEFAULT.maxRecurrencePeriod = TimeUnit.HOURS.toMillis(1); + } + if (variant.disableGcFlush) { + GCFlushedOutputStream.DISABLED = true; + } LogStorage ls = createStorage(); TaskListener overall = ls.overallListener(); + if (variant != RemotingTestVariant.Regular) { + assumeThat("Skipping BufferedBuildListener-specific tests because listener is " + overall, + overall, instanceOf(BufferedBuildListener.class)); + } overall.getLogger().println("overall from controller"); TaskListener step = ls.nodeListener(new MockNode("1")); step.getLogger().println("step from controller"); @@ -185,9 +257,8 @@ protected static void close(TaskListener listener) throws Exception { DumbSlave s = r.createOnlineSlave(); r.showAgentLogs(s, agentLoggers()); VirtualChannel channel = s.getChannel(); - channel.call(new RemotePrint("overall from agent", overall)); - channel.call(new RemotePrint("step from agent", step)); - channel.call(new GC()); + channel.call(new RemotePrint("overall from agent", overall, variant)); + channel.call(new RemotePrint("step from agent", step, variant)); overallPos = assertOverallLog(overallPos, lines( "overall from agent", "step from agent", @@ -202,22 +273,32 @@ protected Map agentLoggers() { } private static final class RemotePrint extends MasterToSlaveCallable { private final String message; - private final TaskListener listener; - RemotePrint(String message, TaskListener listener) { + private TaskListener listener; + private final RemotingTestVariant variant; + RemotePrint(String message, TaskListener listener, RemotingTestVariant variant) { this.message = message; this.listener = listener; + this.variant = variant; } - @Override public Void call() { + @Override public Void call() throws Exception { listener.getLogger().println(message); - listener.getLogger().flush(); - return null; - } - } - /** Checking behavior of {@link DelayBufferedOutputStream} garbage collection. */ - private static final class GC extends MasterToSlaveCallable { - @Override public Void call() { - System.gc(); - System.runFinalization(); + if (!variant.disableManualFlush) { + listener.getLogger().flush(); + } + switch (variant) { + case DelayedAutoFlushOnly -> { + // TODO: It would be better to wait for `DelayBufferedOutputStream.flushBuffer` to run exactly once. + Thread.sleep(DelayBufferedOutputStream.Tuning.DEFAULT.minRecurrencePeriod * 3); + } + case GcFlushOnly -> { + listener = null; + // Sleeping and calling `System.gc` more than once seem to be necessary for Cleaner to run reliably. + for (int i = 0; i < 3; i++) { + System.gc(); + Thread.sleep(1000); + } + } + } return null; } }