Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,16 @@
private static final long serialVersionUID = 1;

private final RemoteOutputStream ros;
private final DelayBufferedOutputStream.Tuning tuning = DelayBufferedOutputStream.Tuning.DEFAULT; // load defaults on controller
// load defaults on controller
private final DelayBufferedOutputStream.Tuning tuning = DelayBufferedOutputStream.Tuning.DEFAULT;
private final boolean disableFlushOnGc = GCFlushedOutputStream.DISABLED;

Replacement(BufferedBuildListener cbl) {
this.ros = new RemoteOutputStream(new CloseProofOutputStream(cbl.out));
}

private Object readResolve() {
return new BufferedBuildListener(new CloseableOutputStream(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning))));
return new BufferedBuildListener(new CloseableOutputStream(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning), disableFlushOnGc)));

Check warning on line 119 in src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java

View check run for this annotation

ci.jenkins.io / Code Coverage

Not covered line

Line 119 is not covered by tests
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,17 @@
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.concurrent.TimeUnit;
import java.lang.ref.Cleaner;
import java.util.logging.Level;
import java.util.logging.Logger;

import edu.umd.cs.findbugs.annotations.NonNull;
import hudson.remoting.ChannelClosedException;
import hudson.remoting.DaemonThreadFactory;
import hudson.remoting.NamingThreadFactory;
import java.io.EOFException;
import java.nio.channels.ClosedChannelException;
import java.util.stream.Stream;
import jenkins.util.Timer;

/**
* A stream which will be flushed before garbage collection.
Expand All @@ -48,10 +47,20 @@
final class GCFlushedOutputStream extends FilterOutputStream {

private static final Logger LOGGER = Logger.getLogger(GCFlushedOutputStream.class.getName());
private static final Cleaner CLEANER = Cleaner.create(
new NamingThreadFactory(new DaemonThreadFactory(), GCFlushedOutputStream.class.getName() + ".CLEANER"));

static boolean DISABLED = Boolean.getBoolean(GCFlushedOutputStream.class.getName() + ".DISABLED");
Copy link
Member

@jglick jglick Jul 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use SystemProperties generally. Also prefer positive to negative sense, e.g.

static final boolean ENABLED = SystemProperties.getBoolean(XXX + ".enabled", true);


GCFlushedOutputStream(OutputStream out) {
this(out, DISABLED);
}

GCFlushedOutputStream(OutputStream out, boolean disabled) {
super(out);
FlushRef.register(this, out);
if (!disabled) {
CLEANER.register(this, new CleanerTask(out));
}
}

@Override public void write(@NonNull byte[] b, int off, int len) throws IOException {
Expand Down Expand Up @@ -80,40 +89,23 @@ private static boolean isClosedChannelException(Throwable t) {
/**
* Flushes streams prior to garbage collection.
* ({@link BufferedOutputStream} does not do this automatically.)
* TODO Java 9+ could use java.util.Cleaner
*/
private static final class FlushRef extends PhantomReference<GCFlushedOutputStream> {

private static final ReferenceQueue<GCFlushedOutputStream> rq = new ReferenceQueue<>();

static {
Timer.get().scheduleWithFixedDelay(() -> {
while (true) {
FlushRef ref = (FlushRef) rq.poll();
if (ref == null) {
break;
}
LOGGER.log(Level.FINE, "flushing {0}", ref.out);
try {
ref.out.flush();
} catch (IOException x) {
LOGGER.log(isClosedChannelException(x) ? Level.FINE : Level.WARNING, null, x);
}
}
}, 0, 10, TimeUnit.SECONDS);
}

static void register(GCFlushedOutputStream fos, OutputStream out) {
new FlushRef(fos, out, rq).enqueue();
}

private static final class CleanerTask implements Runnable {
private final OutputStream out;

private FlushRef(GCFlushedOutputStream fos, OutputStream out, ReferenceQueue<GCFlushedOutputStream> rq) {
super(fos, rq);
public CleanerTask(OutputStream out) {
this.out = out;
}

@Override
public void run() {
LOGGER.log(Level.FINE, "flushing {0}", out);
try {
out.flush();
} catch (IOException x) {
LOGGER.log(isClosedChannelException(x) ? Level.FINE : Level.WARNING, null, x);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeThat;

import edu.umd.cs.findbugs.annotations.NonNull;
import hudson.console.AnnotatedLargeText;
Expand All @@ -47,6 +49,7 @@
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.logging.Level;
import jenkins.model.CauseOfInterruption;
Expand All @@ -64,7 +67,9 @@
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.jvnet.hudson.test.FlagRule;
import org.jvnet.hudson.test.JenkinsRule;
import org.jvnet.hudson.test.LoggerRule;
import org.springframework.security.core.Authentication;
Expand All @@ -78,6 +83,16 @@

@ClassRule public static LoggerRule logging = new LoggerRule();

@Rule public FlagRule<Long> resetTuningMinRecurrence = new FlagRule<>(
() -> DelayBufferedOutputStream.Tuning.DEFAULT.minRecurrencePeriod,
x -> DelayBufferedOutputStream.Tuning.DEFAULT.minRecurrencePeriod = x);
@Rule public FlagRule<Long> resetTuningMaxRecurrence = new FlagRule<>(
() -> DelayBufferedOutputStream.Tuning.DEFAULT.maxRecurrencePeriod,
x -> DelayBufferedOutputStream.Tuning.DEFAULT.maxRecurrencePeriod = x);
@Rule public FlagRule<Boolean> resetGcFlushedOutputStream = new FlagRule<>(
() -> GCFlushedOutputStream.DISABLED,
x -> GCFlushedOutputStream.DISABLED = x);

/** Create a new storage implementation, but potentially reusing any data initialized in the last {@link Before} setup. */
protected abstract LogStorage createStorage();

Expand Down Expand Up @@ -171,9 +186,66 @@
}

@Test public void remoting() throws Exception {
remotingParameterized(RemotingTestVariant.Regular);
}

/**
* Verifies that flushing explicitly makes {@link BufferedBuildListener} and {@link GCFlushedOutputStream} unnecessary.
* Only relevant for {@link LogStorage} implementations that use {@link BufferedBuildListener}.
*/
@Test public void remotingNormalFlushOnly() throws Exception {
remotingParameterized(RemotingTestVariant.ManualFlushOnly);
}

/**
* Verifies the behavior of {@link BufferedBuildListener}.
* Only relevant for {@link LogStorage} implementations that use {@link BufferedBuildListener}.
*/
@Test public void remotingDelayedAutoFlushOnly() throws Exception {
// To make this test fail, set `DelayBufferedOutputStream.Tuning.DEFAULT.*RecurrencePeriod` to very large values.
remotingParameterized(RemotingTestVariant.DelayedAutoFlushOnly);
}

/**
* Verifies the behavior of {@link GCFlushedOutputStream}.
* Only relevant for {@link LogStorage} implementations that use {@link BufferedBuildListener}.
*/
@Test public void remotingGcFlushOnly() throws Exception {
// To make this test fail, set `GCFlushedOutputStream.DISABLED` to true.
remotingParameterized(RemotingTestVariant.GcFlushOnly);
}

private enum RemotingTestVariant {
Regular(false, false, false),
ManualFlushOnly(true, true, false),
DelayedAutoFlushOnly(true, false, true),
GcFlushOnly(false, true, true);

boolean disableGcFlush;
boolean disableDelayedAutoFlush;
boolean disableManualFlush;

RemotingTestVariant(boolean disableGcFlush, boolean disableDelayedAutoFlush, boolean disableManualFlush) {
this.disableGcFlush = disableGcFlush;
this.disableDelayedAutoFlush = disableDelayedAutoFlush;
this.disableManualFlush = disableManualFlush;
}
}
private void remotingParameterized(RemotingTestVariant variant) throws Exception {
logging.capture(100).record(Channel.class, Level.WARNING);
if (variant.disableDelayedAutoFlush) {
DelayBufferedOutputStream.Tuning.DEFAULT.minRecurrencePeriod = TimeUnit.HOURS.toMillis(1);
DelayBufferedOutputStream.Tuning.DEFAULT.maxRecurrencePeriod = TimeUnit.HOURS.toMillis(1);
}
if (variant.disableGcFlush) {
GCFlushedOutputStream.DISABLED = true;
}
LogStorage ls = createStorage();
TaskListener overall = ls.overallListener();
if (variant != RemotingTestVariant.Regular) {
assumeThat("Skipping BufferedBuildListener-specific tests because listener is " + overall,
overall, instanceOf(BufferedBuildListener.class));
Comment on lines +246 to +247
Copy link
Member Author

@dwnusbaum dwnusbaum Jul 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unnecessary, i.e. the new tests pass against pipeline-cloudwatch-logs, but it seemed somewhat pointless to run tests that configure implementation details of BufferedBuildListener against unrelated listener implementations.

If desired, we could instead move the new tests down into FileLogStorageTest add make the tests here more generic. For example, we'd just have one test that checks that if you write a line and do not flush, the line gets written if you wait for a few seconds. pipeline-cloudwatch-logs would still pass this test without waiting because it doesn't buffer more than a single line. FileLogStorage would only pass after a few seconds due to DelayBufferedOutputStream.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you write a line and do not flush, the line gets written if you wait for a few seconds

Fine, I guess, but no one should rely on that; remote code printing to a TaskListener is expected to flush explicitly.

}
overall.getLogger().println("overall from controller");
TaskListener step = ls.nodeListener(new MockNode("1"));
step.getLogger().println("step from controller");
Expand All @@ -185,9 +257,8 @@
DumbSlave s = r.createOnlineSlave();
r.showAgentLogs(s, agentLoggers());
VirtualChannel channel = s.getChannel();
channel.call(new RemotePrint("overall from agent", overall));
channel.call(new RemotePrint("step from agent", step));
channel.call(new GC());
channel.call(new RemotePrint("overall from agent", overall, variant));
channel.call(new RemotePrint("step from agent", step, variant));
overallPos = assertOverallLog(overallPos, lines(
"overall from agent",
"<span class=\"pipeline-node-1\">step from agent",
Expand All @@ -202,22 +273,32 @@
}
private static final class RemotePrint extends MasterToSlaveCallable<Void, Exception> {
private final String message;
private final TaskListener listener;
RemotePrint(String message, TaskListener listener) {
private TaskListener listener;
private final RemotingTestVariant variant;
RemotePrint(String message, TaskListener listener, RemotingTestVariant variant) {
this.message = message;
this.listener = listener;
this.variant = variant;
}
@Override public Void call() {
@Override public Void call() throws Exception {
listener.getLogger().println(message);
listener.getLogger().flush();
return null;
}
}
/** Checking behavior of {@link DelayBufferedOutputStream} garbage collection. */
private static final class GC extends MasterToSlaveCallable<Void, Exception> {
@Override public Void call() {
System.gc();
System.runFinalization();
if (!variant.disableManualFlush) {
listener.getLogger().flush();
}
switch (variant) {
case DelayedAutoFlushOnly -> {
// TODO: It would be better to wait for `DelayBufferedOutputStream.flushBuffer` to run exactly once.

Check warning on line 290 in src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java

View check run for this annotation

ci.jenkins.io / Open Tasks Scanner

TODO

NORMAL: It would be better to wait for `DelayBufferedOutputStream.flushBuffer` to run exactly once.
Thread.sleep(DelayBufferedOutputStream.Tuning.DEFAULT.minRecurrencePeriod * 3);
}
case GcFlushOnly -> {
listener = null;
// Sleeping and calling `System.gc` more than once seem to be necessary for Cleaner to run reliably.
for (int i = 0; i < 3; i++) {
System.gc();
Thread.sleep(1000);
}
}
}
Comment on lines +288 to +301
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that in real-world scenarios, where we do not nicely wait after writing each individual line, there is no guarantee of ordering with GCFlushedOutputStream or DelayBufferedOutputStream, and so you can end up with lines in unexpected orders.

return null;
}
}
Expand Down
Loading