Skip to content

Commit 13a3533

Browse files
committed
feat: update event grid to support live streaming of data
1 parent c3dbf5b commit 13a3533

File tree

8 files changed

+192
-21
lines changed

8 files changed

+192
-21
lines changed

plugins/plugin-client-default/notebooks/dashboard.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,5 +80,5 @@ layout:
8080
execute: now
8181
outputOnly: true
8282
---
83-
chart progress "${LOGDIR}"
83+
chart events "${LOGDIR}"
8484
```

plugins/plugin-codeflare/src/components/Grid.tsx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,18 @@
1717
import React from "react"
1818
import { Tooltip } from "@kui-shell/plugin-client-common"
1919

20-
import Event from "../controller/events/Event"
20+
import { GenericEvent } from "../controller/events/Event"
2121

2222
import "@kui-shell/plugin-client-common/web/scss/components/Table/_index.scss"
2323
import "@kui-shell/plugin-client-common/web/scss/components/Table/Grid/_index.scss"
2424
import "../../web/scss/components/Dashboard/Grid.scss"
2525

2626
interface Props {
27-
events: Event<string, unknown>[]
27+
events: GenericEvent[]
2828
}
2929

3030
export default class Grid extends React.PureComponent<Props> {
31-
private tooltipContent(event: Event<string, unknown>) {
31+
private tooltipContent(event: GenericEvent) {
3232
const title = event.name
3333
const subtitle = event.subtitle || event.type
3434
const status = event.state
@@ -42,7 +42,7 @@ ${status ? "Status: " + status : ""}
4242
\`${showMoreDetail}\``
4343
}
4444

45-
private readonly cell = (event: Event<string, unknown>, idx: number) => {
45+
private readonly cell = (event: GenericEvent, idx: number) => {
4646
return (
4747
<Tooltip key={idx} markdown={this.tooltipContent(event)}>
4848
<span className="kui--grid-cell" data-tag="badge" data-type={event.type} data-state={event.state}>

plugins/plugin-codeflare/src/controller/events/Event.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
export type State = "InProgress" | "Done" | "Error"
18+
1819
type Event<T extends string, Detail> = Detail & {
1920
name: string
2021
subtitle?: string
@@ -25,4 +26,6 @@ type Event<T extends string, Detail> = Detail & {
2526
hidden?: boolean
2627
}
2728

29+
export type GenericEvent = Event<string, unknown>
30+
2831
export default Event

plugins/plugin-codeflare/src/controller/events/Events.tsx

Lines changed: 173 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,184 @@ import { join } from "path"
1919
import stripAnsi from "strip-ansi"
2020
import { Arguments } from "@kui-shell/core"
2121

22-
import kubeEvents from "./kube"
23-
import torchEvents from "./torch"
22+
import parseKubeEvents, { collateEvent as collateKubeEvent, KubeEvent } from "./kube"
23+
import parseTorchEvents, { collateEvent as collateTorchEvent, TorchEvent } from "./torch"
2424

2525
import { expand } from "../../lib/util"
2626
import Grid from "../../components/Grid"
2727

28+
interface EventState {
29+
kubeEvents: KubeEvent[]
30+
torchEvents: TorchEvent[]
31+
}
32+
33+
type State = EventState & {
34+
nKubeEvents: number
35+
nTorchEvents: number
36+
catastrophicError?: Error
37+
}
38+
39+
type Props = EventState & {
40+
/** Follow kube events? */
41+
onKube?(eventType: "data", cb: (data: any) => void): void
42+
43+
/** Follow torch events? */
44+
onTorch?(eventType: "data", cb: (data: any) => void): void
45+
46+
/** Stop watching? */
47+
unwatch?(): void
48+
}
49+
50+
class Events extends React.PureComponent<Props, State> {
51+
public constructor(props: Props) {
52+
super(props)
53+
54+
const kubeEvents = props.kubeEvents || []
55+
const torchEvents = props.torchEvents || []
56+
this.state = {
57+
kubeEvents,
58+
torchEvents,
59+
nKubeEvents: kubeEvents.length,
60+
nTorchEvents: torchEvents.length,
61+
}
62+
63+
// reduce any initial flood of events
64+
let queueFlushHysteresis = 300
65+
setTimeout(() => (queueFlushHysteresis = 0), 5000)
66+
67+
if (props.onKube) {
68+
let queue: string[] = []
69+
let flushTO: ReturnType<typeof setTimeout>
70+
71+
props.onKube("data", (line) => {
72+
if (typeof line === "string") {
73+
queue.push(stripAnsi(line))
74+
75+
if (flushTO) {
76+
clearTimeout(flushTO)
77+
}
78+
79+
flushTO = setTimeout(() => {
80+
const toBeProcessed = queue
81+
queue = []
82+
this.setState((curState) => {
83+
toBeProcessed.forEach((line) => collateKubeEvent(curState.kubeEvents, line))
84+
return {
85+
nKubeEvents: curState.kubeEvents.length,
86+
}
87+
})
88+
}, queueFlushHysteresis)
89+
}
90+
})
91+
}
92+
if (props.onTorch) {
93+
let queue: string[] = []
94+
let flushTO: ReturnType<typeof setTimeout>
95+
96+
props.onTorch("data", (line) => {
97+
if (typeof line === "string") {
98+
queue.push(stripAnsi(line))
99+
100+
if (flushTO) {
101+
clearTimeout(flushTO)
102+
}
103+
104+
flushTO = setTimeout(() => {
105+
const toBeProcessed = queue
106+
queue = []
107+
this.setState((curState) => {
108+
toBeProcessed.forEach((line) => collateTorchEvent(curState.torchEvents, line))
109+
return {
110+
nTorchEvents: curState.torchEvents.length,
111+
}
112+
})
113+
}, queueFlushHysteresis)
114+
}
115+
})
116+
}
117+
}
118+
119+
public static getDerivedStateFromError(error: Error) {
120+
return { catastrophicError: error }
121+
}
122+
123+
public componentDidCatch(error: Error, errorInfo: React.ErrorInfo) {
124+
console.error("catastrophic error in Scalar", error, errorInfo)
125+
}
126+
127+
public componentWillUnmount() {
128+
if (this.props.unwatch) {
129+
this.props.unwatch()
130+
}
131+
}
132+
133+
private get events() {
134+
return [...this.state.kubeEvents, ...this.state.torchEvents]
135+
.filter((_) => !_.hidden)
136+
.sort((a, b) => a.timestamp - b.timestamp)
137+
}
138+
139+
public render() {
140+
if (this.state.catastrophicError) {
141+
return "Internal Error"
142+
} else {
143+
return <Grid events={this.events} />
144+
}
145+
}
146+
}
147+
28148
async function eventsUI(filepath: string, REPL: Arguments["REPL"]) {
29-
const [kube, logs] = await Promise.all([
30-
REPL.qexec<string>(`vfs fslice ${join(expand(filepath), "events/kubernetes.txt")} 0`).then(stripAnsi),
31-
REPL.qexec<string>(`vfs fslice ${join(expand(filepath), "logs/job.txt")} 0`).then(stripAnsi),
32-
])
33-
34-
const events = [...kubeEvents(kube), ...torchEvents(logs)]
35-
.filter((_) => !_.hidden)
36-
.sort((a, b) => a.timestamp - b.timestamp)
37-
return <Grid events={events} />
149+
const jobFilepath = join(expand(filepath), "logs/job.txt")
150+
const kubeFilepath = join(expand(filepath), "events/kubernetes.txt")
151+
152+
if (process.env.FOLLOW) {
153+
const [TailFile, split2] = await Promise.all([
154+
import("@logdna/tail-file").then((_) => _.default),
155+
import("split2").then((_) => _.default),
156+
])
157+
158+
const kubeTail = new TailFile(kubeFilepath, {
159+
startPos: 0,
160+
pollFileIntervalMs: 500,
161+
})
162+
kubeTail.on("tail_error", (err) => {
163+
console.error(err)
164+
})
165+
166+
const jobTail = new TailFile(jobFilepath, {
167+
startPos: 0,
168+
pollFileIntervalMs: 500,
169+
})
170+
jobTail.on("tail_error", (err) => {
171+
console.error(err)
172+
})
173+
174+
kubeTail.start()
175+
jobTail.start()
176+
177+
const kubeSplitter = kubeTail.pipe(split2())
178+
const torchSplitter = jobTail.pipe(split2())
179+
180+
return (
181+
<Events
182+
kubeEvents={[]}
183+
torchEvents={[]}
184+
onKube={kubeSplitter.on.bind(kubeSplitter)}
185+
onTorch={torchSplitter.on.bind(torchSplitter)}
186+
unwatch={() => {
187+
kubeTail.quit()
188+
jobTail.quit()
189+
}}
190+
/>
191+
)
192+
} else {
193+
const [kube, torch] = await Promise.all([
194+
REPL.qexec<string>(`vfs fslice ${kubeFilepath} 0`).then(stripAnsi).then(parseKubeEvents),
195+
REPL.qexec<string>(`vfs fslice ${jobFilepath} 0`).then(stripAnsi).then(parseTorchEvents),
196+
])
197+
198+
return <Events kubeEvents={kube} torchEvents={torch} />
199+
}
38200
}
39201

40202
export default async function eventsCmd(args: Arguments) {

plugins/plugin-codeflare/src/controller/events/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@ import { Registrar } from "@kui-shell/core"
1818

1919
/** Register Kui Commands for rendering dashboard event UIs */
2020
export default function registerEventCommands(registrar: Registrar) {
21-
registrar.listen("/chart/progress", (args) => import("./Events").then((_) => _.default(args)), { needsUI: true })
21+
registrar.listen("/chart/events", (args) => import("./Events").then((_) => _.default(args)), { needsUI: true })
2222
}

plugins/plugin-codeflare/src/controller/events/kube.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import Event from "./Event"
1818

1919
type EventType = "Pulling" | "Pulled"
20-
type KubeEvent = Event<EventType, { node: string }>
20+
export type KubeEvent = Event<EventType, { node: string }>
2121

2222
function findPrevious(M: KubeEvent[], node: KubeEvent["node"], type: EventType) {
2323
for (let idx = M.length - 1; idx >= 0; idx--) {
@@ -28,7 +28,7 @@ function findPrevious(M: KubeEvent[], node: KubeEvent["node"], type: EventType)
2828
}
2929
}
3030

31-
function collateEvent(M: KubeEvent[], line: string) {
31+
export function collateEvent(M: KubeEvent[], line: string): KubeEvent[] {
3232
const pullMatch = line.match(/(Pulling|Pulled)\s+(\S+)\s+(.+)$/)
3333
if (pullMatch) {
3434
const type = pullMatch[1] as EventType

plugins/plugin-codeflare/src/controller/events/torch.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import Event from "./Event"
1818

1919
type EventType = "Epoch" | "Iteration" | "Marker"
2020
type Detail = { epoch: number; step: number; nSteps: number; ip: string }
21-
type TorchEvent = Event<EventType, Detail>
21+
export type TorchEvent = Event<EventType, Detail>
2222

2323
function findPrevious(M: TorchEvent[], ip: TorchEvent["ip"], type: EventType) {
2424
for (let idx = M.length - 1; idx >= 0; idx--) {
@@ -34,7 +34,7 @@ function findEpoch(M: TorchEvent[], ip: TorchEvent["ip"]) {
3434
return evt ? evt.step : -1
3535
}
3636

37-
function collateEvent(M: TorchEvent[], line: string) {
37+
export function collateEvent(M: TorchEvent[], line: string) {
3838
const startMatch = line.match(/ip=([\d.]+)\)\s+(\d+\/\d+\/\d+\s+\d+:\d+:\d+)\s+.+\*\*\*\*\* Running training/)
3939
if (startMatch) {
4040
const ip = startMatch[1]

plugins/plugin-codeflare/web/scss/components/Dashboard/Grid.scss

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ $fullWidth: 1em; /* $large * ($unit + $rgap) - $rgap */
7777
grid-template-columns: repeat(auto-fill, $unit) !important;
7878
grid-auto-rows: $fullWidth;
7979

80+
/**
81+
* Cosmetic: to keep the cursor from switching back and forth from
82+
* pointer to default as you sweep your mouse along the events.
83+
*/
84+
cursor: pointer;
85+
8086
@include CFCell {
8187
padding-left: 1px;
8288
grid-column: span $large;

0 commit comments

Comments
 (0)