Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
/**
* Buffered output stream which is guaranteed to deliver content after some time even if idle and the buffer does not fill up.
* The automatic “flushing” does <em>not</em> flush the underlying stream, for example via {@code ProxyOutputStream.Flush}.
* Also the stream will be flushed before garbage collection.
* Otherwise it is similar to {@link BufferedOutputStream}.
*/
final class DelayBufferedOutputStream extends BufferedOutputStream {
final class DelayBufferedOutputStream extends FilterOutputStream {

private static final Logger LOGGER = Logger.getLogger(DelayBufferedOutputStream.class.getName());

Expand All @@ -56,6 +58,45 @@ private Tuning() {}
static final Tuning DEFAULT = new Tuning();
}

/**
* The interesting state of the buffered stream, kept as a separate object so that {@link FlushRef} can hold on to it.
*/
private static final class Buffer {

final OutputStream out;
private final byte[] dat;
private int pos;

Buffer(OutputStream out, int size) {
this.out = out;
dat = new byte[size];
}

synchronized void drain() throws IOException {
if (pos == 0) {
return;
}
out.write(dat, 0, pos);
pos = 0;
}

void write(int b) throws IOException {
assert Thread.holdsLock(this);
if (pos == dat.length) {
drain();
}
dat[pos++] = (byte) b;
}

synchronized void write(byte[] b, int off, int len) throws IOException {
for (int i = 0; i < len; i++) {
write(b[off + i]);
}
}

}

private final Buffer buf;
private final Tuning tuning;
private long recurrencePeriod;

Expand All @@ -64,62 +105,65 @@ private Tuning() {}
}

DelayBufferedOutputStream(OutputStream out, Tuning tuning) {
super(new FlushControlledOutputStream(out), tuning.bufferSize);
super(out);
buf = new Buffer(out, tuning.bufferSize);
this.tuning = tuning;
recurrencePeriod = tuning.minRecurrencePeriod;
FlushRef.register(this, out);
FlushRef.register(this);
reschedule();
}

private void reschedule() {
Timer.get().schedule(new Flush(this), recurrencePeriod, TimeUnit.MILLISECONDS);
recurrencePeriod = Math.min((long) (recurrencePeriod * tuning.recurrencePeriodBackoff), tuning.maxRecurrencePeriod);
@Override public void write(int b) throws IOException {
synchronized (buf) {
buf.write(b);
}
}

/** We can only call {@link BufferedOutputStream#flushBuffer} via {@link #flush}, but we do not wish to flush the underlying stream, only write out the buffer. */
private void flushBuffer() throws IOException {
ThreadLocal<Boolean> enableFlush = ((FlushControlledOutputStream) out).enableFlush;
boolean orig = enableFlush.get();
enableFlush.set(false);
try {
flush();
} finally {
enableFlush.set(orig);
}
@Override public void write(byte[] b, int off, int len) throws IOException {
buf.write(b, off, len);
}

@Override public void flush() throws IOException {
buf.drain();
super.flush();
}

void flushAndReschedule() {
private void reschedule() {
Timer.get().schedule(new Drain(this), recurrencePeriod, TimeUnit.MILLISECONDS);
recurrencePeriod = Math.min((long) (recurrencePeriod * tuning.recurrencePeriodBackoff), tuning.maxRecurrencePeriod);
}

void drainAndReschedule() {
// TODO as an optimization, avoid flushing the buffer if it was recently flushed anyway due to filling up
try {
flushBuffer();
buf.drain();
} catch (IOException x) {
LOGGER.log(Level.FINE, null, x);
}
reschedule();
}

private static final class Flush implements Runnable {
private static final class Drain implements Runnable {

/** Since there is no explicit close event, just keep flushing periodically until the stream is collected. */
private final Reference<DelayBufferedOutputStream> osr;

Flush(DelayBufferedOutputStream os) {
Drain(DelayBufferedOutputStream os) {
osr = new WeakReference<>(os);
}

@Override public void run() {
DelayBufferedOutputStream os = osr.get();
if (os != null) {
os.flushAndReschedule();
os.drainAndReschedule();
}
}

}

/**
* Flushes streams prior to garbage collection.
* ({@link BufferedOutputStream} does not do this automatically.)
* TODO Java 9+ could use java.util.Cleaner
* In Java 9+ could use {@code java.util.Cleaner} instead.
*/
private static final class FlushRef extends PhantomReference<DelayBufferedOutputStream> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seriously hacky, but I think this will work IIUC what it's doing.

Makes me wonder why Remoting doesn't have a way to register a callback to execute before disposing the lease on an object.


Expand All @@ -132,50 +176,26 @@ private static final class FlushRef extends PhantomReference<DelayBufferedOutput
if (ref == null) {
break;
}
LOGGER.log(Level.FINE, "cleaning up phantom {0}", ref.out);
LOGGER.log(Level.FINE, "flushing {0} from a DelayBufferedOutputStream", ref.buf.out);
try {
ref.out.flush();
ref.buf.drain();
ref.buf.out.flush();
} catch (IOException x) {
LOGGER.log(Level.WARNING, null, x);
}
}
}, 0, 10, TimeUnit.SECONDS);
}

static void register(DelayBufferedOutputStream dbos, OutputStream out) {
new FlushRef(dbos, out, rq).enqueue();
static void register(DelayBufferedOutputStream dbos) {
new FlushRef(dbos, rq).enqueue();
}

private final OutputStream out;
private final Buffer buf;

private FlushRef(DelayBufferedOutputStream dbos, OutputStream out, ReferenceQueue<DelayBufferedOutputStream> rq) {
private FlushRef(DelayBufferedOutputStream dbos, ReferenceQueue<DelayBufferedOutputStream> rq) {
super(dbos, rq);
this.out = out;
}

}

/** @see DelayBufferedOutputStream#flushBuffer */
private static final class FlushControlledOutputStream extends FilterOutputStream {

private final ThreadLocal<Boolean> enableFlush = new ThreadLocal<Boolean>() {
@Override protected Boolean initialValue() {
return true;
}
};

FlushControlledOutputStream(OutputStream out) {
super(out);
}

@Override public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len); // super method writes one byte at a time!
}

@Override public void flush() throws IOException {
if (enableFlush.get()) {
super.flush();
}
this.buf = dbos.buf;
}

}
Expand Down