Skip to content

Commit 6703075

Browse files
authored
Merge pull request #438 from das7pad/stream-step-log-interface
Implement non-buffered streaming for step logs
2 parents 0f6c0cf + 01d7de0 commit 6703075

File tree

2 files changed

+209
-48
lines changed

2 files changed

+209
-48
lines changed

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@
6464
<properties>
6565
<changelist>999999-SNAPSHOT</changelist>
6666
<!-- https://www.jenkins.io/doc/developer/plugin-development/choosing-jenkins-baseline/ -->
67-
<jenkins.baseline>2.504</jenkins.baseline>
68-
<jenkins.version>${jenkins.baseline}.1</jenkins.version>
67+
<jenkins.baseline>2.528</jenkins.baseline>
68+
<jenkins.version>2.532</jenkins.version>
6969
<no-test-jar>false</no-test-jar>
7070
<gitHubRepo>jenkinsci/${project.artifactId}-plugin</gitHubRepo>
7171
<hpi.strictBundledArtifacts>true</hpi.strictBundledArtifacts>
@@ -75,7 +75,7 @@
7575
<dependency>
7676
<groupId>io.jenkins.tools.bom</groupId>
7777
<artifactId>bom-${jenkins.baseline}.x</artifactId>
78-
<version>5015.vb_52d36583443</version>
78+
<version>5577.vea_979d35b_b_ff</version>
7979
<scope>import</scope>
8080
<type>pom</type>
8181
</dependency>

src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java

Lines changed: 206 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@
3131
import hudson.model.BuildListener;
3232
import hudson.model.TaskListener;
3333
import java.io.BufferedReader;
34+
import java.io.EOFException;
3435
import java.io.File;
3536
import java.io.FileOutputStream;
3637
import java.io.FilterOutputStream;
38+
import java.io.InputStream;
3739
import java.io.IOException;
3840
import java.io.OutputStream;
3941
import java.io.OutputStreamWriter;
@@ -54,6 +56,7 @@
5456
import org.kohsuke.accmod.Restricted;
5557
import org.kohsuke.accmod.restrictions.Beta;
5658
import org.kohsuke.stapler.framework.io.ByteBuffer;
59+
import org.kohsuke.stapler.framework.io.LargeText;
5760

5861
/**
5962
* Simple implementation of log storage in a single file that maintains a side file with an index indicating where node transitions occur.
@@ -268,23 +271,77 @@ private void maybeFlush() {
268271
@NonNull
269272
@Override public AnnotatedLargeText<FlowNode> stepLog(@NonNull FlowNode node, boolean complete) {
270273
maybeFlush();
271-
String id = node.getId();
272-
try (ByteBuffer buf = new ByteBuffer();
273-
RandomAccessFile raf = new RandomAccessFile(log, "r");
274-
BufferedReader indexBR = index.isFile() ? Files.newBufferedReader(index.toPath(), StandardCharsets.UTF_8) : new BufferedReader(new NullReader(0))) {
275-
// Check this _before_ reading index-log to reduce the chance of a race condition resulting in recent content being associated with the wrong step:
276-
long end = raf.length();
277-
// To produce just the output for a single step (again we do not need to pay attention to ConsoleNote here since AnnotatedLargeText handles it),
278-
// index-log is read looking for transitions that pertain to this step: beginning or ending its content, including at EOF if applicable.
279-
// (Other transitions, such as to or from unrelated steps, are irrelevant).
280-
// Once a start and end position have been identified, that block is copied to a memory buffer.
281-
String line;
282-
long pos = -1; // -1 if not currently in this node, start position if we are
283-
while ((line = indexBR.readLine()) != null) {
274+
long rawLogSize;
275+
long stepLogSize = 0;
276+
String nodeId = node.getId();
277+
try (RandomAccessFile raf = new RandomAccessFile(log, "r")) {
278+
// Check this _before_ reading index-log to reduce the chance of a race condition resulting in recent content being associated with the wrong step.
279+
rawLogSize = raf.length();
280+
if (index.isFile()) {
281+
try (IndexReader idr = new IndexReader(rawLogSize, nodeId)) {
282+
stepLogSize = idr.getStepLogSize();
283+
}
284+
}
285+
} catch (IOException x) {
286+
return new BrokenLogStorage(x).stepLog(node, complete);
287+
}
288+
if (stepLogSize == 0) {
289+
return new AnnotatedLargeText<>(new ByteBuffer(), StandardCharsets.UTF_8, complete, node);
290+
}
291+
return new AnnotatedLargeText<>(new StreamingStepLog(rawLogSize, stepLogSize, nodeId), StandardCharsets.UTF_8, complete, node);
292+
}
293+
294+
private class IndexReader implements AutoCloseable {
295+
static class Next {
296+
public long start = -1;
297+
public long end = -1;
298+
}
299+
private final String nodeId;
300+
private final long rawLogSize;
301+
private boolean done;
302+
private BufferedReader indexBR = null;
303+
private long pos = -1; // -1 if not currently in this node, start position if we are
304+
305+
public IndexReader(long rawLogSize, String nodeId) {
306+
this.rawLogSize = rawLogSize;
307+
this.nodeId = nodeId;
308+
}
309+
310+
public void close() throws IOException {
311+
if (indexBR != null) {
312+
indexBR.close();
313+
indexBR = null;
314+
}
315+
}
316+
317+
private void ensureOpen() throws IOException {
318+
if (indexBR == null) {
319+
indexBR = Files.newBufferedReader(index.toPath(), StandardCharsets.UTF_8);
320+
}
321+
}
322+
323+
public long getStepLogSize() throws IOException {
324+
long stepLogSize = 0;
325+
Next next = new Next();
326+
while (readNext(next)) {
327+
stepLogSize += (next.end - next.start);
328+
}
329+
return stepLogSize;
330+
}
331+
332+
public boolean readNext(Next next) throws IOException {
333+
if (done) return false;
334+
ensureOpen();
335+
while (!done) {
336+
String line = indexBR.readLine();
337+
if (line == null) {
338+
done = true;
339+
break;
340+
}
284341
int space = line.indexOf(' ');
285-
long lastTransition = -1;
342+
long nextTransition;
286343
try {
287-
lastTransition = Long.parseLong(space == -1 ? line : line.substring(0, space));
344+
nextTransition = Long.parseLong(space == -1 ? line : line.substring(0, space));
288345
} catch (NumberFormatException x) {
289346
LOGGER.warning("Ignoring corrupt index file " + index);
290347
// If index-log is corrupt for whatever reason, we given up on this step in this build;
@@ -295,48 +352,152 @@ private void maybeFlush() {
295352
pos = -1;
296353
continue;
297354
}
355+
if (nextTransition >= rawLogSize) {
356+
// Do not emit positions past the previously determined logSize.
357+
nextTransition = rawLogSize;
358+
done = true;
359+
}
298360
if (pos == -1) {
299-
if (space != -1 && line.substring(space + 1).equals(id)) {
300-
pos = lastTransition;
301-
}
302-
} else if (lastTransition > pos) {
303-
raf.seek(pos);
304-
if (lastTransition > pos + Integer.MAX_VALUE) {
305-
throw new IOException("Cannot read more than 2Gib at a time"); // ByteBuffer does not support it anyway
361+
if (space != -1 && line.substring(space + 1).equals(nodeId)) {
362+
pos = nextTransition;
306363
}
307-
// Could perhaps be done a bit more efficiently with FileChannel methods,
308-
// at least if org.kohsuke.stapler.framework.io.ByteBuffer were replaced by java.nio.[Heap]ByteBuffer.
309-
// The overall bottleneck here is however the need to use a memory buffer to begin with:
310-
// LargeText.Source/Session are not public so, pending improvements to Stapler,
311-
// we cannot lazily stream per-step content the way we do for the overall log.
312-
// (Except perhaps by extending ByteBuffer and then overriding every public method!)
313-
// LargeText also needs to be improved to support opaque (non-long) cursors
314-
// (and callers such as progressiveText.jelly and Blue Ocean updated accordingly),
315-
// which is a hard requirement for efficient rendering of cloud-backed logs,
316-
// though for this implementation we do not need it since we can work with byte offsets.
317-
byte[] data = new byte[(int) (lastTransition - pos)];
318-
raf.readFully(data);
319-
buf.write(data);
364+
} else if (nextTransition > pos) {
365+
next.start = pos;
366+
next.end = nextTransition;
320367
pos = -1;
368+
return true;
321369
} else {
322370
// Some sort of mismatch. Do not emit this section.
323371
pos = -1;
324372
}
325373
}
326-
if (pos != -1 && /* otherwise race condition? */ end > pos) {
374+
if (pos != -1 && rawLogSize > pos) {
327375
// In case the build is ongoing and we are still actively writing content for this step,
328376
// we will hit EOF before any other transition. Otherwise identical to normal case above.
329-
raf.seek(pos);
330-
if (end > pos + Integer.MAX_VALUE) {
331-
throw new IOException("Cannot read more than 2Gib at a time");
377+
next.start = pos;
378+
next.end = rawLogSize;
379+
return true;
380+
}
381+
return false;
382+
}
383+
}
384+
385+
private class StreamingStepLog implements LargeText.Source {
386+
private final String nodeId;
387+
private final long rawLogSize;
388+
private final long stepLogSize;
389+
390+
StreamingStepLog(long rawLogSize, long stepLogSize, String nodeId ) {
391+
super();
392+
this.rawLogSize = rawLogSize;
393+
this.stepLogSize = stepLogSize;
394+
this.nodeId = nodeId;
395+
}
396+
397+
public boolean exists() {
398+
return true;
399+
}
400+
401+
public long length() {
402+
return stepLogSize;
403+
}
404+
405+
public LargeText.Session open() {
406+
return new StreamingStepLogSession();
407+
}
408+
409+
class StreamingStepLogSession extends InputStream implements LargeText.Session {
410+
private RandomAccessFile rawLog;
411+
private final IndexReader.Next next = new IndexReader.Next();
412+
private IndexReader indexReader;
413+
private long rawLogPos = next.end;
414+
private long stepLogPos = 0;
415+
416+
@Override
417+
public void close() throws IOException {
418+
try {
419+
if (rawLog != null) {
420+
rawLog.close();
421+
rawLog = null;
422+
}
423+
} finally {
424+
if (indexReader != null) {
425+
indexReader.close();
426+
indexReader = null;
427+
}
428+
}
429+
}
430+
431+
@Override
432+
public long skip(long n) throws IOException {
433+
if (stepLogPos + n > stepLogSize) {
434+
return 0;
435+
}
436+
if (n == 0) return 0;
437+
438+
ensureOpen();
439+
long skipped = 0;
440+
while (skipped < n) {
441+
advanceNextIfNeeded(false);
442+
long remainingInNext = next.end - rawLogPos;
443+
long remainingToSkip = n - skipped;
444+
long skip = Long.min(remainingInNext, remainingToSkip);
445+
rawLogPos += skip;
446+
stepLogPos += skip;
447+
skipped += skip;
448+
}
449+
rawLog.seek(rawLogPos);
450+
return skipped;
451+
}
452+
453+
@Override
454+
public int read() throws IOException {
455+
byte[] b = new byte[1];
456+
int n = read(b, 0, 1);
457+
if (n != 1) return -1;
458+
return (int) b[0];
459+
}
460+
461+
@Override
462+
public int read(@NonNull byte[] b) throws IOException {
463+
return read(b, 0, b.length);
464+
}
465+
466+
@Override
467+
public int read(@NonNull byte[] b, int off, int len) throws IOException {
468+
if (stepLogPos >= stepLogSize) {
469+
return -1;
470+
}
471+
ensureOpen();
472+
advanceNextIfNeeded(true);
473+
long remaining = next.end - rawLogPos;
474+
if (len > remaining) {
475+
// len is an int and remaining is smaller, so no overflow is possible.
476+
len = (int) remaining;
477+
}
478+
int n = rawLog.read(b, off, len);
479+
rawLogPos += n;
480+
stepLogPos += n;
481+
return n;
482+
}
483+
484+
private void advanceNextIfNeeded(boolean seek) throws IOException {
485+
if (rawLogPos < next.end) return;
486+
if (!indexReader.readNext(next)) {
487+
throw new EOFException("index truncated; did not reach previously discovered end of step log");
488+
}
489+
if (seek) rawLog.seek(next.start);
490+
rawLogPos = next.start;
491+
}
492+
493+
private void ensureOpen() throws IOException {
494+
if (rawLog == null) {
495+
rawLog = new RandomAccessFile(log, "r");
496+
}
497+
if (indexReader == null) {
498+
indexReader = new IndexReader(rawLogSize, nodeId);
332499
}
333-
byte[] data = new byte[(int) (end - pos)];
334-
raf.readFully(data);
335-
buf.write(data);
336500
}
337-
return new AnnotatedLargeText<>(buf, StandardCharsets.UTF_8, complete, node);
338-
} catch (IOException x) {
339-
return new BrokenLogStorage(x).stepLog(node, complete);
340501
}
341502
}
342503

0 commit comments

Comments
 (0)