Skip to content

Commit 03c0878

Browse files
authored
Merge pull request #83 from jglick/finalize-JENKINS-54566
[JENKINS-54566] finalize vs. flush
2 parents 57eb03e + b944d25 commit 03c0878

File tree

5 files changed

+155
-7
lines changed

5 files changed

+155
-7
lines changed

src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ private static final class Replacement implements SerializableOnlyOverRemoting {
7474
}
7575

7676
private Object readResolve() throws IOException {
77-
return new BufferedBuildListener(new DelayBufferedOutputStream(ros, tuning));
77+
return new BufferedBuildListener(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning)));
7878
}
7979

8080
}

src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,8 @@ void flushAndReschedule() {
9595
reschedule();
9696
}
9797

98-
@SuppressWarnings("FinalizeDeclaration") // not ideal, but PhantomReference is more of a hassle
99-
@Override protected void finalize() throws Throwable {
100-
super.finalize();
101-
// Odd that this is not the default behavior for BufferedOutputStream.
102-
flush();
98+
@Override public String toString() {
99+
return "DelayBufferedOutputStream[" + out + "]";
103100
}
104101

105102
private static final class Flush implements Runnable {
@@ -143,6 +140,10 @@ private static final class FlushControlledOutputStream extends FilterOutputStrea
143140
}
144141
}
145142

143+
@Override public String toString() {
144+
return "FlushControlledOutputStream[" + out + "]";
145+
}
146+
146147
}
147148

148149
}

src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ private FileLogStorage(File log) {
8585
private synchronized void open() throws IOException {
8686
if (os == null) {
8787
os = new FileOutputStream(log, true);
88-
bos = new DelayBufferedOutputStream(os);
88+
bos = new GCFlushedOutputStream(new DelayBufferedOutputStream(os));
8989
if (index.isFile()) {
9090
try (BufferedReader r = Files.newBufferedReader(index.toPath(), StandardCharsets.UTF_8)) {
9191
// TODO would be faster to scan the file backwards for the penultimate \n, then convert the byte sequence from there to EOF to UTF-8 and set lastId accordingly
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* The MIT License
3+
*
4+
* Copyright 2018 CloudBees, Inc.
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in
14+
* all copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22+
* THE SOFTWARE.
23+
*/
24+
25+
package org.jenkinsci.plugins.workflow.log;
26+
27+
import java.io.BufferedOutputStream;
28+
import java.io.FilterOutputStream;
29+
import java.io.IOException;
30+
import java.io.OutputStream;
31+
import java.lang.ref.PhantomReference;
32+
import java.lang.ref.ReferenceQueue;
33+
import java.util.concurrent.TimeUnit;
34+
import java.util.logging.Level;
35+
import java.util.logging.Logger;
36+
import jenkins.util.Timer;
37+
38+
/**
39+
* A stream which will be flushed before garbage collection.
40+
* {@link BufferedOutputStream} does not do this automatically.
41+
*/
42+
final class GCFlushedOutputStream extends FilterOutputStream {
43+
44+
private static final Logger LOGGER = Logger.getLogger(GCFlushedOutputStream.class.getName());
45+
46+
GCFlushedOutputStream(OutputStream out) {
47+
super(out);
48+
FlushRef.register(this, out);
49+
}
50+
51+
@Override public void write(byte[] b, int off, int len) throws IOException {
52+
out.write(b, off, len); // super method is surprising
53+
}
54+
55+
@Override public String toString() {
56+
return "GCFlushedOutputStream[" + out + "]";
57+
}
58+
59+
/**
60+
* Flushes streams prior to garbage collection.
61+
* ({@link BufferedOutputStream} does not do this automatically.)
62+
* TODO Java 9+ could use java.util.Cleaner
63+
*/
64+
private static final class FlushRef extends PhantomReference<GCFlushedOutputStream> {
65+
66+
private static final ReferenceQueue<GCFlushedOutputStream> rq = new ReferenceQueue<>();
67+
68+
static {
69+
Timer.get().scheduleWithFixedDelay(() -> {
70+
while (true) {
71+
FlushRef ref = (FlushRef) rq.poll();
72+
if (ref == null) {
73+
break;
74+
}
75+
LOGGER.log(Level.FINE, "flushing {0}", ref.out);
76+
try {
77+
ref.out.flush();
78+
} catch (IOException x) {
79+
LOGGER.log(Level.WARNING, null, x);
80+
}
81+
}
82+
}, 0, 10, TimeUnit.SECONDS);
83+
}
84+
85+
static void register(GCFlushedOutputStream fos, OutputStream out) {
86+
new FlushRef(fos, out, rq).enqueue();
87+
}
88+
89+
private final OutputStream out;
90+
91+
private FlushRef(GCFlushedOutputStream fos, OutputStream out, ReferenceQueue<GCFlushedOutputStream> rq) {
92+
super(fos, rq);
93+
this.out = out;
94+
}
95+
96+
}
97+
98+
}

src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import hudson.console.AnnotatedLargeText;
2828
import hudson.console.HyperlinkNote;
2929
import hudson.model.TaskListener;
30+
import hudson.remoting.Channel;
3031
import hudson.remoting.VirtualChannel;
32+
import hudson.util.StreamTaskListener;
3133
import java.io.EOFException;
3234
import java.io.PrintWriter;
3335
import java.io.StringWriter;
@@ -37,11 +39,18 @@
3739
import java.util.Random;
3840
import java.util.concurrent.Callable;
3941
import java.util.function.BiFunction;
42+
import java.util.logging.Formatter;
43+
import java.util.logging.Handler;
44+
import java.util.logging.Level;
45+
import java.util.logging.LogRecord;
46+
import java.util.logging.Logger;
47+
import java.util.logging.SimpleFormatter;
4048
import jenkins.security.MasterToSlaveCallable;
4149
import org.apache.commons.io.FileUtils;
4250
import org.apache.commons.io.output.NullOutputStream;
4351
import org.apache.commons.io.output.NullWriter;
4452
import org.apache.commons.io.output.WriterOutputStream;
53+
import static org.hamcrest.Matchers.*;
4554
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
4655
import org.jenkinsci.plugins.workflow.graph.FlowNode;
4756
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
@@ -51,6 +60,7 @@
5160
import org.junit.ClassRule;
5261
import org.junit.Test;
5362
import org.jvnet.hudson.test.JenkinsRule;
63+
import org.jvnet.hudson.test.LoggerRule;
5464

5565
/**
5666
* Foundation for compliance tests of {@link LogStorage} implementations.
@@ -63,6 +73,8 @@ public abstract class LogStorageTestBase {
6373

6474
@ClassRule public static JenkinsRule r = new JenkinsRule();
6575

76+
@ClassRule public static LoggerRule logging = new LoggerRule();
77+
6678
/** Create a new storage implementation, but potentially reusing any data initialized in the last {@link Before} setup. */
6779
protected abstract LogStorage createStorage() throws Exception;
6880

@@ -142,6 +154,7 @@ protected static void close(TaskListener listener) throws Exception {
142154
}
143155

144156
@Test public void remoting() throws Exception {
157+
logging.capture(100).record(Channel.class, Level.WARNING);
145158
LogStorage ls = createStorage();
146159
TaskListener overall = ls.overallListener();
147160
overall.getLogger().println("overall from master");
@@ -150,12 +163,15 @@ protected static void close(TaskListener listener) throws Exception {
150163
long overallPos = assertOverallLog(0, "overall from master\n<span class=\"pipeline-node-1\">step from master\n</span>", true);
151164
long stepPos = assertStepLog("1", 0, "step from master\n", true);
152165
VirtualChannel channel = r.createOnlineSlave().getChannel();
166+
channel.call(new RemoteLogDumper("agent"));
153167
channel.call(new RemotePrint("overall from agent", overall));
154168
channel.call(new RemotePrint("step from agent", step));
169+
channel.call(new GC());
155170
overallPos = assertOverallLog(overallPos, "overall from agent\n<span class=\"pipeline-node-1\">step from agent\n</span>", true);
156171
stepPos = assertStepLog("1", stepPos, "step from agent\n", true);
157172
assertEquals(overallPos, assertOverallLog(overallPos, "", true));
158173
assertEquals(stepPos, assertStepLog("1", stepPos, "", true));
174+
assertThat(logging.getMessages(), empty());
159175
}
160176
private static final class RemotePrint extends MasterToSlaveCallable<Void, Exception> {
161177
static {
@@ -173,6 +189,39 @@ private static final class RemotePrint extends MasterToSlaveCallable<Void, Excep
173189
return null;
174190
}
175191
}
192+
/** Checking behavior of {@link DelayBufferedOutputStream} garbage collection. */
193+
private static final class GC extends MasterToSlaveCallable<Void, Exception> {
194+
@Override public Void call() throws Exception {
195+
System.gc();
196+
System.runFinalization();
197+
return null;
198+
}
199+
}
200+
// TODO copied from pipeline-log-cloudwatch; consider whether this should be moved into LoggerRule
201+
private static final class RemoteLogDumper extends MasterToSlaveCallable<Void, RuntimeException> {
202+
private final String name;
203+
private final TaskListener stderr = StreamTaskListener.fromStderr();
204+
RemoteLogDumper(String name) {
205+
this.name = name;
206+
}
207+
@Override public Void call() throws RuntimeException {
208+
Handler handler = new Handler() {
209+
final Formatter formatter = new SimpleFormatter();
210+
@Override public void publish(LogRecord record) {
211+
if (isLoggable(record)) {
212+
stderr.getLogger().print(formatter.format(record).replaceAll("(?m)^", "[" + name + "] "));
213+
}
214+
}
215+
@Override public void flush() {}
216+
@Override public void close() throws SecurityException {}
217+
};
218+
handler.setLevel(Level.ALL);
219+
Logger logger = Logger.getLogger(LogStorageTestBase.class.getPackage().getName());
220+
logger.setLevel(Level.FINER);
221+
logger.addHandler(handler);
222+
return null;
223+
}
224+
}
176225

177226
/**
178227
* Checks what happens when code using {@link TaskListener#getLogger} prints a line with inadequate synchronization.

0 commit comments

Comments
 (0)