1414 * limitations under the License.
1515 */
1616
17+ import Heap from "heap"
1718import stripAnsi from "strip-ansi"
1819import type { TextProps } from "ink"
1920
2021import type { Tail } from "../tailf.js"
2122import type { OnData , Worker } from "../../../components/Dashboard/types.js"
2223
23- import { WorkerState , stateFor } from "./states.js"
24+ import { WorkerState , rankFor , stateFor } from "./states.js"
25+
26+ type Line = { line : string ; stateRank : number ; timestamp : number }
2427
2528/**
2629 * Maintain a model of live data from a given set of file streams
@@ -32,16 +35,21 @@ export default class Live {
3235 private readonly workers : Record < string , Worker > = { }
3336
3437 /** Number of lines of output to retain. TODO this depends on height of terminal? */
35- private static readonly nLines = 18
36-
37- /** Model of the last `Live.nLines` lines of output */
38- private readonly lines : string [ ] = Array ( Live . nLines ) . fill ( "" )
38+ private static readonly MAX_HEAP = 1000
3939
40- /** Current insertion index into `this.lines` */
41- private linesInsertionIdx = 0
40+ /** Model of the lines of output */
41+ private readonly lines = new Heap < Line > ( ( a , b ) => {
42+ if ( a . line === b . line ) {
43+ return 0
44+ }
4245
43- /** Current number of valid lines in `this.lines` */
44- private linesCount = 0
46+ const stateDiff = a . stateRank - b . stateRank
47+ if ( stateDiff !== 0 ) {
48+ return stateDiff
49+ } else {
50+ return a . timestamp - b . timestamp
51+ }
52+ } )
4553
4654 public constructor ( private readonly tails : Promise < Tail > [ ] , cb : OnData , styleOf : Record < WorkerState , TextProps > ) {
4755 tails . map ( ( tailf ) => {
@@ -60,7 +68,7 @@ export default class Live {
6068
6169 if ( ! name || ! timestamp ) {
6270 // console.error("Bad status record", line)
63- this . pushLineAndPublish ( data , cb )
71+ // this.pushLineAndPublish(data, metric, timestamp , cb)
6472 return
6573 } else if ( ! metric ) {
6674 // ignoring this line
@@ -93,7 +101,7 @@ export default class Live {
93101
94102 // inform the UI that we have updates
95103 cb ( {
96- lines : this . pushLine ( data ) ,
104+ lines : this . pushLine ( data , metric , timestamp ) ,
97105 workers : Object . values ( this . workers ) ,
98106 } )
99107 }
@@ -112,40 +120,45 @@ export default class Live {
112120 } )
113121 }
114122
123+ private readonly lookup : Record < string , Line > = { }
115124 /** Add `line` to our circular buffer `this.lines` */
116- private pushLine ( line : string ) {
117- if ( this . lines . includes ( line ) ) {
118- // duplicate line
119- // the oldest is the one we are about to overwrite
120- let oldestIdx = this . linesInsertionIdx
121- if ( this . lines [ oldestIdx ] . length === 0 ) {
122- do {
123- oldestIdx = ( oldestIdx + 1 ) % Live . nLines
124- } while ( this . lines [ oldestIdx ] . length === 0 )
125- }
126- return { lines : this . lines , idx : oldestIdx , N : this . linesCount }
125+ private pushLine ( line : string , metric : WorkerState , timestamp : number ) {
126+ const key = line
127+ . replace ( / \s * ( \d \d \d \d - \d \d - \d \d T \d \d : \d \d : \d \d Z ) \s * / , "{timestamp}" )
128+ . replace ( / p o d \/ t o r c h x - \S + / , "" ) // worker name in torchx
129+ . replace ( / p o d \/ r a y - ( h e a d | w o r k e r ) - \S + / , "" ) // worker name in ray
130+ . replace ( / \* / , "" ) // wildcard worker name (codeflare)
131+ . replace ( / \x1b \x5B \[ 2 J / g, "" ) // eslint-disable-line no-control-regex
132+ // ^^^ [2J is part of clear screen; we don't want those to flow through
133+
134+ const rec = {
135+ timestamp,
136+ stateRank : rankFor [ metric ] ,
137+ line : key ,
138+ }
139+
140+ const already = this . lookup [ rec . line ]
141+ if ( already ) {
142+ already . timestamp = timestamp
143+ this . lines . updateItem ( already )
127144 } else {
128- const idx = this . linesInsertionIdx
129- this . linesInsertionIdx = ( this . linesInsertionIdx + 1 ) % Live . nLines
130-
131- this . lines [ idx ] = line
132- this . linesCount = Math . min ( this . lines . length , this . linesCount + 1 )
133-
134- // the oldest is the one we are about to overwrite
135- let oldestIdx = this . linesInsertionIdx
136- if ( this . lines [ oldestIdx ] . length === 0 ) {
137- do {
138- oldestIdx = ( oldestIdx + 1 ) % Live . nLines
139- } while ( this . lines [ oldestIdx ] . length === 0 )
145+ this . lookup [ rec . line ] = rec
146+ if ( this . lines . size ( ) >= Live . MAX_HEAP ) {
147+ this . lines . replace ( rec )
148+ } else {
149+ this . lines . push ( rec )
140150 }
141-
142- return { lines : this . lines , idx : oldestIdx , N : this . linesCount }
143151 }
152+
153+ return this . lines
154+ . toArray ( )
155+ . slice ( 0 , 18 )
156+ . sort ( ( a , b ) => a . timestamp - b . timestamp )
144157 }
145158
146159 /** `pushLine` and then pass the updated model to `cb` */
147- private pushLineAndPublish ( line : string , cb : OnData ) {
148- cb ( { lines : this . pushLine ( line ) , workers : Object . values ( this . workers ) } )
160+ private pushLineAndPublish ( line : string , metric : WorkerState , timestamp : number , cb : OnData ) {
161+ cb ( { lines : this . pushLine ( line , metric , timestamp ) , workers : Object . values ( this . workers ) } )
149162 }
150163
151164 private asMillisSinceEpoch ( timestamp : string ) {
0 commit comments