-
Notifications
You must be signed in to change notification settings - Fork 80
[JENKINS-54566] finalize vs. flush #83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
8ee0788
[JENKINS-54566] Reproducing warning sometimes from FileLogStorageTest.
jglick 5e845fe
Figured out how to make test reliably fail unless running with remoti…
jglick 8ab09d1
Capture remote log messages.
jglick d33b987
Use a phantom reference queue rather than overriding finalize.
jglick 18f1b53
Comments.
jglick 4bef56f
Rewrote DelayBufferedOutputStream to not be based on BufferedOutputSt…
jglick b944d25
Factored out GCFlushedOutputStream, and reverting 4bef56fe1ea425a8c47…
jglick File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,8 +41,10 @@ | |
| /** | ||
| * Buffered output stream which is guaranteed to deliver content after some time even if idle and the buffer does not fill up. | ||
| * The automatic “flushing” does <em>not</em> flush the underlying stream, for example via {@code ProxyOutputStream.Flush}. | ||
| * Also the stream will be flushed before garbage collection. | ||
| * Otherwise it is similar to {@link BufferedOutputStream}. | ||
| */ | ||
| final class DelayBufferedOutputStream extends BufferedOutputStream { | ||
| final class DelayBufferedOutputStream extends FilterOutputStream { | ||
|
|
||
| private static final Logger LOGGER = Logger.getLogger(DelayBufferedOutputStream.class.getName()); | ||
|
|
||
|
|
@@ -56,6 +58,45 @@ private Tuning() {} | |
| static final Tuning DEFAULT = new Tuning(); | ||
| } | ||
|
|
||
| /** | ||
| * The interesting state of the buffered stream, kept as a separate object so that {@link FlushRef} can hold on to it. | ||
| */ | ||
| private static final class Buffer { | ||
|
|
||
| final OutputStream out; | ||
| private final byte[] dat; | ||
| private int pos; | ||
jglick marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| Buffer(OutputStream out, int size) { | ||
| this.out = out; | ||
| dat = new byte[size]; | ||
| } | ||
|
|
||
| synchronized void drain() throws IOException { | ||
| if (pos == 0) { | ||
| return; | ||
| } | ||
| out.write(dat, 0, pos); | ||
| pos = 0; | ||
| } | ||
|
|
||
| void write(int b) throws IOException { | ||
| assert Thread.holdsLock(this); | ||
jglick marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (pos == dat.length) { | ||
| drain(); | ||
| } | ||
| dat[pos++] = (byte) b; | ||
| } | ||
|
|
||
| synchronized void write(byte[] b, int off, int len) throws IOException { | ||
| for (int i = 0; i < len; i++) { | ||
| write(b[off + i]); | ||
jglick marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| } | ||
|
|
||
| private final Buffer buf; | ||
| private final Tuning tuning; | ||
| private long recurrencePeriod; | ||
|
|
||
|
|
@@ -64,62 +105,65 @@ private Tuning() {} | |
| } | ||
|
|
||
| DelayBufferedOutputStream(OutputStream out, Tuning tuning) { | ||
| super(new FlushControlledOutputStream(out), tuning.bufferSize); | ||
| super(out); | ||
| buf = new Buffer(out, tuning.bufferSize); | ||
| this.tuning = tuning; | ||
| recurrencePeriod = tuning.minRecurrencePeriod; | ||
| FlushRef.register(this, out); | ||
| FlushRef.register(this); | ||
| reschedule(); | ||
| } | ||
|
|
||
| private void reschedule() { | ||
| Timer.get().schedule(new Flush(this), recurrencePeriod, TimeUnit.MILLISECONDS); | ||
| recurrencePeriod = Math.min((long) (recurrencePeriod * tuning.recurrencePeriodBackoff), tuning.maxRecurrencePeriod); | ||
| @Override public void write(int b) throws IOException { | ||
| synchronized (buf) { | ||
| buf.write(b); | ||
| } | ||
| } | ||
|
|
||
| /** We can only call {@link BufferedOutputStream#flushBuffer} via {@link #flush}, but we do not wish to flush the underlying stream, only write out the buffer. */ | ||
| private void flushBuffer() throws IOException { | ||
| ThreadLocal<Boolean> enableFlush = ((FlushControlledOutputStream) out).enableFlush; | ||
| boolean orig = enableFlush.get(); | ||
| enableFlush.set(false); | ||
| try { | ||
| flush(); | ||
| } finally { | ||
| enableFlush.set(orig); | ||
| } | ||
| @Override public void write(byte[] b, int off, int len) throws IOException { | ||
| buf.write(b, off, len); | ||
| } | ||
|
|
||
| @Override public void flush() throws IOException { | ||
| buf.drain(); | ||
| super.flush(); | ||
| } | ||
|
|
||
| void flushAndReschedule() { | ||
| private void reschedule() { | ||
| Timer.get().schedule(new Drain(this), recurrencePeriod, TimeUnit.MILLISECONDS); | ||
| recurrencePeriod = Math.min((long) (recurrencePeriod * tuning.recurrencePeriodBackoff), tuning.maxRecurrencePeriod); | ||
| } | ||
|
|
||
| void drainAndReschedule() { | ||
| // TODO as an optimization, avoid flushing the buffer if it was recently flushed anyway due to filling up | ||
| try { | ||
| flushBuffer(); | ||
| buf.drain(); | ||
| } catch (IOException x) { | ||
| LOGGER.log(Level.FINE, null, x); | ||
| } | ||
| reschedule(); | ||
| } | ||
|
|
||
| private static final class Flush implements Runnable { | ||
| private static final class Drain implements Runnable { | ||
|
|
||
| /** Since there is no explicit close event, just keep flushing periodically until the stream is collected. */ | ||
| private final Reference<DelayBufferedOutputStream> osr; | ||
|
|
||
| Flush(DelayBufferedOutputStream os) { | ||
| Drain(DelayBufferedOutputStream os) { | ||
| osr = new WeakReference<>(os); | ||
| } | ||
|
|
||
| @Override public void run() { | ||
| DelayBufferedOutputStream os = osr.get(); | ||
| if (os != null) { | ||
| os.flushAndReschedule(); | ||
| os.drainAndReschedule(); | ||
| } | ||
| } | ||
|
|
||
| } | ||
|
|
||
| /** | ||
| * Flushes streams prior to garbage collection. | ||
| * ({@link BufferedOutputStream} does not do this automatically.) | ||
| * TODO Java 9+ could use java.util.Cleaner | ||
| * In Java 9+ could use {@code java.util.Cleaner} instead. | ||
| */ | ||
| private static final class FlushRef extends PhantomReference<DelayBufferedOutputStream> { | ||
|
||
|
|
||
|
|
@@ -132,50 +176,26 @@ private static final class FlushRef extends PhantomReference<DelayBufferedOutput | |
| if (ref == null) { | ||
| break; | ||
| } | ||
| LOGGER.log(Level.FINE, "cleaning up phantom {0}", ref.out); | ||
| LOGGER.log(Level.FINE, "flushing {0} from a DelayBufferedOutputStream", ref.buf.out); | ||
| try { | ||
| ref.out.flush(); | ||
| ref.buf.drain(); | ||
| ref.buf.out.flush(); | ||
| } catch (IOException x) { | ||
| LOGGER.log(Level.WARNING, null, x); | ||
| } | ||
| } | ||
| }, 0, 10, TimeUnit.SECONDS); | ||
| } | ||
|
|
||
| static void register(DelayBufferedOutputStream dbos, OutputStream out) { | ||
| new FlushRef(dbos, out, rq).enqueue(); | ||
| static void register(DelayBufferedOutputStream dbos) { | ||
| new FlushRef(dbos, rq).enqueue(); | ||
| } | ||
|
|
||
| private final OutputStream out; | ||
| private final Buffer buf; | ||
|
|
||
| private FlushRef(DelayBufferedOutputStream dbos, OutputStream out, ReferenceQueue<DelayBufferedOutputStream> rq) { | ||
| private FlushRef(DelayBufferedOutputStream dbos, ReferenceQueue<DelayBufferedOutputStream> rq) { | ||
| super(dbos, rq); | ||
| this.out = out; | ||
| } | ||
|
|
||
| } | ||
|
|
||
| /** @see DelayBufferedOutputStream#flushBuffer */ | ||
| private static final class FlushControlledOutputStream extends FilterOutputStream { | ||
|
|
||
| private final ThreadLocal<Boolean> enableFlush = new ThreadLocal<Boolean>() { | ||
| @Override protected Boolean initialValue() { | ||
| return true; | ||
| } | ||
| }; | ||
|
|
||
| FlushControlledOutputStream(OutputStream out) { | ||
| super(out); | ||
| } | ||
|
|
||
| @Override public void write(byte[] b, int off, int len) throws IOException { | ||
| out.write(b, off, len); // super method writes one byte at a time! | ||
| } | ||
|
|
||
| @Override public void flush() throws IOException { | ||
| if (enableFlush.get()) { | ||
| super.flush(); | ||
| } | ||
| this.buf = dbos.buf; | ||
| } | ||
|
|
||
| } | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.