Skip to content

Commit 8393a07

Browse files
committed
Implement non-buffered streaming for step logs
Now that the log-index is read incrementally while streaming, the length of the step log needs to be determined before returning. The only option for determining the step logs length is by reading the full index file. A new IndexReader class abstracts the parsing of the index. It is used by both the full read in advance and the incremental read while streaming the log content.
1 parent 0f6c0cf commit 8393a07

File tree

1 file changed

+206
-45
lines changed

1 file changed

+206
-45
lines changed

src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java

Lines changed: 206 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@
3131
import hudson.model.BuildListener;
3232
import hudson.model.TaskListener;
3333
import java.io.BufferedReader;
34+
import java.io.EOFException;
3435
import java.io.File;
3536
import java.io.FileOutputStream;
3637
import java.io.FilterOutputStream;
38+
import java.io.InputStream;
3739
import java.io.IOException;
3840
import java.io.OutputStream;
3941
import java.io.OutputStreamWriter;
@@ -54,6 +56,7 @@
5456
import org.kohsuke.accmod.Restricted;
5557
import org.kohsuke.accmod.restrictions.Beta;
5658
import org.kohsuke.stapler.framework.io.ByteBuffer;
59+
import org.kohsuke.stapler.framework.io.LargeText;
5760

5861
/**
5962
* Simple implementation of log storage in a single file that maintains a side file with an index indicating where node transitions occur.
@@ -268,23 +271,77 @@ private void maybeFlush() {
268271
@NonNull
269272
@Override public AnnotatedLargeText<FlowNode> stepLog(@NonNull FlowNode node, boolean complete) {
270273
maybeFlush();
271-
String id = node.getId();
272-
try (ByteBuffer buf = new ByteBuffer();
273-
RandomAccessFile raf = new RandomAccessFile(log, "r");
274-
BufferedReader indexBR = index.isFile() ? Files.newBufferedReader(index.toPath(), StandardCharsets.UTF_8) : new BufferedReader(new NullReader(0))) {
275-
// Check this _before_ reading index-log to reduce the chance of a race condition resulting in recent content being associated with the wrong step:
276-
long end = raf.length();
277-
// To produce just the output for a single step (again we do not need to pay attention to ConsoleNote here since AnnotatedLargeText handles it),
278-
// index-log is read looking for transitions that pertain to this step: beginning or ending its content, including at EOF if applicable.
279-
// (Other transitions, such as to or from unrelated steps, are irrelevant).
280-
// Once a start and end position have been identified, that block is copied to a memory buffer.
281-
String line;
282-
long pos = -1; // -1 if not currently in this node, start position if we are
283-
while ((line = indexBR.readLine()) != null) {
274+
long rawLogSize;
275+
long stepLogSize = 0;
276+
String nodeId = node.getId();
277+
try (RandomAccessFile raf = new RandomAccessFile(log, "r")) {
278+
// Check this _before_ reading index-log to reduce the chance of a race condition resulting in recent content being associated with the wrong step.
279+
rawLogSize = raf.length();
280+
if (index.isFile()) {
281+
try (IndexReader idr = new IndexReader(rawLogSize, nodeId)) {
282+
stepLogSize = idr.getStepLogSize();
283+
}
284+
}
285+
} catch (IOException x) {
286+
return new BrokenLogStorage(x).stepLog(node, complete);
287+
}
288+
if (stepLogSize == 0) {
289+
return new AnnotatedLargeText<>(new ByteBuffer(), StandardCharsets.UTF_8, complete, node);
290+
}
291+
return new AnnotatedLargeText<>(new StreamingStepLog(rawLogSize, stepLogSize, nodeId), StandardCharsets.UTF_8, complete, node);
292+
}
293+
294+
private class IndexReader implements AutoCloseable {
295+
static class Next {
296+
public long start = -1;
297+
public long end = -1;
298+
}
299+
private final String nodeId;
300+
private final long rawLogSize;
301+
private boolean done;
302+
private BufferedReader indexBR = null;
303+
private long pos = -1; // -1 if not currently in this node, start position if we are
304+
305+
public IndexReader(long rawLogSize, String nodeId) {
306+
this.rawLogSize = rawLogSize;
307+
this.nodeId = nodeId;
308+
}
309+
310+
public void close() throws IOException {
311+
if (indexBR != null) {
312+
indexBR.close();
313+
indexBR = null;
314+
}
315+
}
316+
317+
private void ensureOpen() throws IOException {
318+
if (indexBR == null) {
319+
indexBR = Files.newBufferedReader(index.toPath(), StandardCharsets.UTF_8);
320+
}
321+
}
322+
323+
public long getStepLogSize() throws IOException {
324+
long stepLogSize = 0;
325+
Next next = new Next();
326+
while (readNext(next)) {
327+
stepLogSize += (next.end - next.start);
328+
}
329+
return stepLogSize;
330+
}
331+
332+
public boolean readNext(Next next) throws IOException {
333+
if (done) return false;
334+
ensureOpen();
335+
while (!done) {
336+
String line = indexBR.readLine();
337+
if (line == null) {
338+
done = true;
339+
break;
340+
}
284341
int space = line.indexOf(' ');
285-
long lastTransition = -1;
342+
long nextTransition;
286343
try {
287-
lastTransition = Long.parseLong(space == -1 ? line : line.substring(0, space));
344+
nextTransition = Long.parseLong(space == -1 ? line : line.substring(0, space));
288345
} catch (NumberFormatException x) {
289346
LOGGER.warning("Ignoring corrupt index file " + index);
290347
// If index-log is corrupt for whatever reason, we given up on this step in this build;
@@ -295,48 +352,152 @@ private void maybeFlush() {
295352
pos = -1;
296353
continue;
297354
}
355+
if (nextTransition >= rawLogSize) {
356+
// Do not emit positions past the previously determined logSize.
357+
nextTransition = rawLogSize;
358+
done = true;
359+
}
298360
if (pos == -1) {
299-
if (space != -1 && line.substring(space + 1).equals(id)) {
300-
pos = lastTransition;
301-
}
302-
} else if (lastTransition > pos) {
303-
raf.seek(pos);
304-
if (lastTransition > pos + Integer.MAX_VALUE) {
305-
throw new IOException("Cannot read more than 2Gib at a time"); // ByteBuffer does not support it anyway
361+
if (space != -1 && line.substring(space + 1).equals(nodeId)) {
362+
pos = nextTransition;
306363
}
307-
// Could perhaps be done a bit more efficiently with FileChannel methods,
308-
// at least if org.kohsuke.stapler.framework.io.ByteBuffer were replaced by java.nio.[Heap]ByteBuffer.
309-
// The overall bottleneck here is however the need to use a memory buffer to begin with:
310-
// LargeText.Source/Session are not public so, pending improvements to Stapler,
311-
// we cannot lazily stream per-step content the way we do for the overall log.
312-
// (Except perhaps by extending ByteBuffer and then overriding every public method!)
313-
// LargeText also needs to be improved to support opaque (non-long) cursors
314-
// (and callers such as progressiveText.jelly and Blue Ocean updated accordingly),
315-
// which is a hard requirement for efficient rendering of cloud-backed logs,
316-
// though for this implementation we do not need it since we can work with byte offsets.
317-
byte[] data = new byte[(int) (lastTransition - pos)];
318-
raf.readFully(data);
319-
buf.write(data);
364+
} else if (nextTransition > pos) {
365+
next.start = pos;
366+
next.end = nextTransition;
320367
pos = -1;
368+
return true;
321369
} else {
322370
// Some sort of mismatch. Do not emit this section.
323371
pos = -1;
324372
}
325373
}
326-
if (pos != -1 && /* otherwise race condition? */ end > pos) {
374+
if (pos != -1 && rawLogSize > pos) {
327375
// In case the build is ongoing and we are still actively writing content for this step,
328376
// we will hit EOF before any other transition. Otherwise identical to normal case above.
329-
raf.seek(pos);
330-
if (end > pos + Integer.MAX_VALUE) {
331-
throw new IOException("Cannot read more than 2Gib at a time");
377+
next.start = pos;
378+
next.end = rawLogSize;
379+
return true;
380+
}
381+
return false;
382+
}
383+
}
384+
385+
private class StreamingStepLog implements LargeText.Source {
386+
private final String nodeId;
387+
private final long rawLogSize;
388+
private final long stepLogSize;
389+
390+
StreamingStepLog(long rawLogSize, long stepLogSize, String nodeId ) {
391+
super();
392+
this.rawLogSize = rawLogSize;
393+
this.stepLogSize = stepLogSize;
394+
this.nodeId = nodeId;
395+
}
396+
397+
public boolean exists() {
398+
return true;
399+
}
400+
401+
public long length() {
402+
return stepLogSize;
403+
}
404+
405+
public LargeText.Session open() {
406+
return new StreamingStepLogSession();
407+
}
408+
409+
class StreamingStepLogSession extends InputStream implements LargeText.Session {
410+
private RandomAccessFile rawLog;
411+
private final IndexReader.Next next = new IndexReader.Next();
412+
private IndexReader indexReader;
413+
private long rawLogPos = next.end;
414+
private long stepLogPos = 0;
415+
416+
@Override
417+
public void close() throws IOException {
418+
try {
419+
if (rawLog != null) {
420+
rawLog.close();
421+
rawLog = null;
422+
}
423+
} finally {
424+
if (indexReader != null) {
425+
indexReader.close();
426+
indexReader = null;
427+
}
428+
}
429+
}
430+
431+
@Override
432+
public long skip(long n) throws IOException {
433+
if (stepLogPos + n > stepLogSize) {
434+
return 0;
435+
}
436+
if (n == 0) return 0;
437+
438+
ensureOpen();
439+
long skipped = 0;
440+
while (skipped < n) {
441+
advanceNextIfNeeded(false);
442+
long remainingInNext = next.end - rawLogPos;
443+
long remainingToSkip = n - skipped;
444+
long skip = Long.min(remainingInNext, remainingToSkip);
445+
rawLogPos += skip;
446+
stepLogPos += skip;
447+
skipped += skip;
448+
}
449+
rawLog.seek(rawLogPos);
450+
return skipped;
451+
}
452+
453+
@Override
454+
public int read() throws IOException {
455+
byte[] b = new byte[1];
456+
int n = read(b, 0, 1);
457+
if (n != 1) return -1;
458+
return (int) b[0];
459+
}
460+
461+
@Override
462+
public int read(@NonNull byte[] b) throws IOException {
463+
return read(b, 0, b.length);
464+
}
465+
466+
@Override
467+
public int read(@NonNull byte[] b, int off, int len) throws IOException {
468+
if (stepLogPos == stepLogSize) {
469+
return -1;
470+
}
471+
ensureOpen();
472+
advanceNextIfNeeded(true);
473+
long remaining = next.end - rawLogPos;
474+
if (len > remaining) {
475+
// len is an int and remaining is smaller, so no overflow is possible.
476+
len = (int) remaining;
477+
}
478+
int n = rawLog.read(b, off, len);
479+
rawLogPos += n;
480+
stepLogPos += n;
481+
return n;
482+
}
483+
484+
private void advanceNextIfNeeded(boolean seek) throws IOException {
485+
if (rawLogPos < next.end) return;
486+
if (!indexReader.readNext(next)) {
487+
throw new EOFException("index truncated; did not reach previously discovered end of step log");
488+
}
489+
if (seek) rawLog.seek(next.start);
490+
rawLogPos = next.start;
491+
}
492+
493+
private void ensureOpen() throws IOException {
494+
if (rawLog == null) {
495+
rawLog = new RandomAccessFile(log, "r");
496+
}
497+
if (indexReader == null) {
498+
indexReader = new IndexReader(rawLogSize, nodeId);
332499
}
333-
byte[] data = new byte[(int) (end - pos)];
334-
raf.readFully(data);
335-
buf.write(data);
336500
}
337-
return new AnnotatedLargeText<>(buf, StandardCharsets.UTF_8, complete, node);
338-
} catch (IOException x) {
339-
return new BrokenLogStorage(x).stepLog(node, complete);
340501
}
341502
}
342503

0 commit comments

Comments
 (0)