Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
Expand Down Expand Up @@ -65,6 +67,7 @@ private Tuning() {}
super(new FlushControlledOutputStream(out), tuning.bufferSize);
this.tuning = tuning;
recurrencePeriod = tuning.minRecurrencePeriod;
FlushRef.register(this, out);
reschedule();
}

Expand Down Expand Up @@ -95,13 +98,6 @@ void flushAndReschedule() {
reschedule();
}

@SuppressWarnings("FinalizeDeclaration") // not ideal, but PhantomReference is more of a hassle
@Override protected void finalize() throws Throwable {
super.finalize();
// Odd that this is not the default behavior for BufferedOutputStream.
flush();
}

private static final class Flush implements Runnable {

/** Since there is no explicit close event, just keep flushing periodically until the stream is collected. */
Expand All @@ -120,6 +116,45 @@ private static final class Flush implements Runnable {

}

/**
* Flushes streams prior to garbage collection.
* ({@link BufferedOutputStream} does not do this automatically.)
* TODO Java 9+ could use java.util.Cleaner
*/
private static final class FlushRef extends PhantomReference<DelayBufferedOutputStream> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seriously hacky, but I think this will work IIUC what it's doing.

Makes me wonder why Remoting doesn't have a way to register a callback to execute before disposing the lease on an object.


private static final ReferenceQueue<DelayBufferedOutputStream> rq = new ReferenceQueue<>();

static {
Timer.get().scheduleWithFixedDelay(() -> {
while (true) {
FlushRef ref = (FlushRef) rq.poll();
if (ref == null) {
break;
}
LOGGER.log(Level.FINE, "cleaning up phantom {0}", ref.out);
try {
ref.out.flush();
} catch (IOException x) {
LOGGER.log(Level.WARNING, null, x);
}
}
}, 0, 10, TimeUnit.SECONDS);
}

static void register(DelayBufferedOutputStream dbos, OutputStream out) {
new FlushRef(dbos, out, rq).enqueue();
}

private final OutputStream out;

private FlushRef(DelayBufferedOutputStream dbos, OutputStream out, ReferenceQueue<DelayBufferedOutputStream> rq) {
super(dbos, rq);
this.out = out;
}

}

/** @see DelayBufferedOutputStream#flushBuffer */
private static final class FlushControlledOutputStream extends FilterOutputStream {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import hudson.console.AnnotatedLargeText;
import hudson.console.HyperlinkNote;
import hudson.model.TaskListener;
import hudson.remoting.Channel;
import hudson.remoting.VirtualChannel;
import hudson.util.StreamTaskListener;
import java.io.EOFException;
import java.io.PrintWriter;
import java.io.StringWriter;
Expand All @@ -37,11 +39,18 @@
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;
import java.util.logging.Formatter;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import jenkins.security.MasterToSlaveCallable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.commons.io.output.NullWriter;
import org.apache.commons.io.output.WriterOutputStream;
import static org.hamcrest.Matchers.*;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
Expand All @@ -51,6 +60,7 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.jvnet.hudson.test.JenkinsRule;
import org.jvnet.hudson.test.LoggerRule;

/**
* Foundation for compliance tests of {@link LogStorage} implementations.
Expand All @@ -63,6 +73,8 @@ public abstract class LogStorageTestBase {

@ClassRule public static JenkinsRule r = new JenkinsRule();

@ClassRule public static LoggerRule logging = new LoggerRule();

/** Create a new storage implementation, but potentially reusing any data initialized in the last {@link Before} setup. */
protected abstract LogStorage createStorage() throws Exception;

Expand Down Expand Up @@ -142,6 +154,7 @@ protected static void close(TaskListener listener) throws Exception {
}

@Test public void remoting() throws Exception {
logging.capture(100).record(Channel.class, Level.WARNING);
LogStorage ls = createStorage();
TaskListener overall = ls.overallListener();
overall.getLogger().println("overall from master");
Expand All @@ -150,12 +163,15 @@ protected static void close(TaskListener listener) throws Exception {
long overallPos = assertOverallLog(0, "overall from master\n<span class=\"pipeline-node-1\">step from master\n</span>", true);
long stepPos = assertStepLog("1", 0, "step from master\n", true);
VirtualChannel channel = r.createOnlineSlave().getChannel();
channel.call(new RemoteLogDumper("agent"));
channel.call(new RemotePrint("overall from agent", overall));
channel.call(new RemotePrint("step from agent", step));
channel.call(new GC());
overallPos = assertOverallLog(overallPos, "overall from agent\n<span class=\"pipeline-node-1\">step from agent\n</span>", true);
stepPos = assertStepLog("1", stepPos, "step from agent\n", true);
assertEquals(overallPos, assertOverallLog(overallPos, "", true));
assertEquals(stepPos, assertStepLog("1", stepPos, "", true));
assertThat(logging.getMessages(), empty());
}
private static final class RemotePrint extends MasterToSlaveCallable<Void, Exception> {
static {
Expand All @@ -173,6 +189,39 @@ private static final class RemotePrint extends MasterToSlaveCallable<Void, Excep
return null;
}
}
/** Checking behavior of {@link DelayBufferedOutputStream} garbage collection. */
private static final class GC extends MasterToSlaveCallable<Void, Exception> {
@Override public Void call() throws Exception {
System.gc();
System.runFinalization();
return null;
}
}
// TODO copied from pipeline-log-cloudwatch; consider whether this should be moved into LoggerRule
private static final class RemoteLogDumper extends MasterToSlaveCallable<Void, RuntimeException> {
private final String name;
private final TaskListener stderr = StreamTaskListener.fromStderr();
RemoteLogDumper(String name) {
this.name = name;
}
@Override public Void call() throws RuntimeException {
Handler handler = new Handler() {
final Formatter formatter = new SimpleFormatter();
@Override public void publish(LogRecord record) {
if (isLoggable(record)) {
stderr.getLogger().print(formatter.format(record).replaceAll("(?m)^", "[" + name + "] "));
}
}
@Override public void flush() {}
@Override public void close() throws SecurityException {}
};
handler.setLevel(Level.ALL);
Logger logger = Logger.getLogger(LogStorageTestBase.class.getPackage().getName());
logger.setLevel(Level.FINER);
logger.addHandler(handler);
return null;
}
}

/**
* Checks what happens when code using {@link TaskListener#getLogger} prints a line with inadequate synchronization.
Expand Down