1515 */
1616
1717import Heap from "heap"
18+ import ansiRegex from "ansi-regex"
1819import stripAnsi from "strip-ansi"
1920import type { TextProps } from "ink"
2021
@@ -28,6 +29,15 @@ import { rankFor, stateFor } from "./states.js"
2829
2930type Event = { line : string ; stateRank : number ; timestamp : number }
3031
32+ /**
33+ * Keep track of a local timestamp so we can prioritize and show the
34+ * most recent; this is a "local" timestamp in that it does not
35+ * indicate when the event was created on the server, but rather when
36+ * we received it. Perhaps suboptimal, but we cannot guarantee that
37+ * random log lines from applications are timestamped.
38+ */
39+ type LogLineRecord = { id : string ; logLine : string ; localMillis : number }
40+
3141/**
3242 * Maintain a model of live data from a given set of file streams
3343 * (`tails`), and pump it into the given `cb` callback.
@@ -41,7 +51,7 @@ export default class Live {
4151 private static readonly MAX_HEAP = 1000
4252
4353 /** Model of logLines. TODO circular buffer and obey options.lines */
44- // private logLine = ""
54+ private logLine : Record < string , LogLineRecord > = { }
4555
4656 /** Model of the lines of output */
4757 private readonly events = new Heap < Event > ( ( a , b ) => {
@@ -144,11 +154,15 @@ export default class Live {
144154 . sort ( ( a , b ) => a . timestamp - b . timestamp )
145155 }
146156
157+ /** Replace any timestamps with a placeholder, so that the UI can use a "5m ago" style */
158+ private timestamped ( line : string ) {
159+ return line . replace ( / \s * ( \d \d \d \d - \d \d - \d \d T \d \d : \d \d : \d \d ( \. \d + ) ? Z ) \s * / , "{timestamp}" )
160+ }
161+
147162 private readonly lookup : Record < string , Event > = { }
148163 /** Add `line` to our heap `this.events` */
149164 private pushEvent ( line : string , metric : WorkerState , timestamp : number ) {
150- const key = line
151- . replace ( / \s * ( \d \d \d \d - \d \d - \d \d T \d \d : \d \d : \d \d ( \. \d + ) ? Z ) \s * / , "{timestamp}" )
165+ const key = this . timestamped ( line )
152166 . replace ( / p o d \/ t o r c h x - \S + / , "" ) // worker name in torchx
153167 . replace ( / p o d \/ r a y - ( h e a d | w o r k e r ) - \S + / , "" ) // worker name in ray
154168 . replace ( / \* / , "" ) // wildcard worker name (codeflare)
@@ -181,12 +195,37 @@ export default class Live {
181195 }
182196 }
183197
198+ /** This helps us parse out a [W5] style prefix for loglines, so we can intuit the worker id of the log line */
199+ private readonly workerIdPattern = new RegExp ( "^(" + ansiRegex ( ) . source + ")?\\[([^\\]]+)\\]" )
200+
201+ private readonly timeSorter = ( a : LogLineRecord , b : LogLineRecord ) : number => {
202+ return a . localMillis - b . localMillis
203+ }
204+
205+ private readonly idSorter = ( a : LogLineRecord , b : LogLineRecord ) : number => {
206+ return a . id . localeCompare ( b . id )
207+ }
208+
184209 /** Add the given `line` to our logLines model and pass the updated model to `cb` */
185210 private pushLineAndPublish ( logLine : string , cb : OnData ) {
186211 if ( logLine ) {
187212 // here we avoid a flood of React renders by batching them up a
188213 // bit; i thought react 18 was supposed to help with this. hmm.
189- cb ( { logLine } )
214+ const match = logLine . match ( this . workerIdPattern )
215+ const id = match ? match [ 2 ] : "notsure"
216+ if ( id ) {
217+ this . logLine [ id ] = { id, logLine, localMillis : Date . now ( ) }
218+
219+ // display the k most recent logLines per worker, ordering the display by worker id
220+ const k = 4
221+ cb ( {
222+ logLine : Object . values ( this . logLine )
223+ . sort ( this . timeSorter ) // so we can pick off the most recent
224+ . slice ( 0 , k ) // now pick off the k most recent
225+ . sort ( this . idSorter ) // sort those k by worker id, so there is a consistent ordering in the UI
226+ . map ( ( _ ) => _ . logLine ) , // and display just the logLine
227+ } )
228+ }
190229 }
191230 }
192231
0 commit comments