From e40983b7b7affba335abfb5022e42bdf740a1ec4 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Tue, 23 Feb 2021 15:33:13 -0800 Subject: [PATCH] import a copy of buildkit progressui Signed-off-by: Andrea Luzzardi --- pkg/progressui/display.go | 587 ++++++++++++++++++++++++++++++++++++++ pkg/progressui/printer.go | 293 +++++++++++++++++++ 2 files changed, 880 insertions(+) create mode 100644 pkg/progressui/display.go create mode 100644 pkg/progressui/printer.go diff --git a/pkg/progressui/display.go b/pkg/progressui/display.go new file mode 100644 index 00000000..885bcc41 --- /dev/null +++ b/pkg/progressui/display.go @@ -0,0 +1,587 @@ +package progressui + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "sort" + "strconv" + "strings" + "time" + + "github.com/containerd/console" + "github.com/jaguilar/vt100" + "github.com/moby/buildkit/client" + "github.com/morikuni/aec" + digest "github.com/opencontainers/go-digest" + "github.com/tonistiigi/units" + "golang.org/x/time/rate" +) + +func DisplaySolveStatus(ctx context.Context, phase string, c console.Console, w io.Writer, ch chan *client.SolveStatus) error { + + modeConsole := c != nil + + disp := &display{c: c, phase: phase} + printer := &textMux{w: w} + + if disp.phase == "" { + disp.phase = "Building" + } + + t := newTrace(w, modeConsole) + + tickerTimeout := 150 * time.Millisecond + displayTimeout := 100 * time.Millisecond + + if v := os.Getenv("TTY_DISPLAY_RATE"); v != "" { + if r, err := strconv.ParseInt(v, 10, 64); err == nil { + tickerTimeout = time.Duration(r) * time.Millisecond + displayTimeout = time.Duration(r) * time.Millisecond + } + } + + var done bool + ticker := time.NewTicker(tickerTimeout) + defer ticker.Stop() + + displayLimiter := rate.NewLimiter(rate.Every(displayTimeout), 1) + + var height int + width, _ := disp.getSize() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + case ss, ok := <-ch: + if ok { + t.update(ss, width) + } else { + done = true + } + } + + if modeConsole { + width, height = disp.getSize() + if done { + disp.print(t.displayInfo(), width, height, true) + t.printErrorLogs(c) + return nil + } else if displayLimiter.Allow() { + ticker.Stop() + ticker = time.NewTicker(tickerTimeout) + disp.print(t.displayInfo(), width, height, false) + } + } else { + if done || displayLimiter.Allow() { + printer.print(t) + if done { + t.printErrorLogs(w) + return nil + } + ticker.Stop() + ticker = time.NewTicker(tickerTimeout) + } + } + } +} + +const termHeight = 6 +const termPad = 10 + +type displayInfo struct { + startTime time.Time + jobs []*job + countTotal int + countCompleted int +} + +type job struct { + startTime *time.Time + completedTime *time.Time + name string + status string + hasError bool + isCanceled bool + vertex *vertex + showTerm bool +} + +type trace struct { + w io.Writer + localTimeDiff time.Duration + vertexes []*vertex + byDigest map[digest.Digest]*vertex + nextIndex int + updates map[digest.Digest]struct{} + modeConsole bool +} + +type vertex struct { + *client.Vertex + statuses []*status + byID map[string]*status + indent string + index int + + logs [][]byte + logsPartial bool + logsOffset int + prev *client.Vertex + events []string + lastBlockTime *time.Time + count int + statusUpdates map[string]struct{} + + jobs []*job + jobCached bool + + term *vt100.VT100 + termBytes int + termCount int +} + +func (v *vertex) update(c int) { + if v.count == 0 { + now := time.Now() + v.lastBlockTime = &now + } + v.count += c +} + +type status struct { + *client.VertexStatus +} + +func newTrace(w io.Writer, modeConsole bool) *trace { + return &trace{ + byDigest: make(map[digest.Digest]*vertex), + updates: make(map[digest.Digest]struct{}), + w: w, + modeConsole: modeConsole, + } +} + +func (t *trace) triggerVertexEvent(v *client.Vertex) { + if v.Started == nil { + return + } + + var old client.Vertex + vtx := t.byDigest[v.Digest] + if v := vtx.prev; v != nil { + old = *v + } + + changed := false + if v.Digest != old.Digest { + changed = true + } + if v.Name != old.Name { + changed = true + } + if v.Started != old.Started { + if v.Started != nil && old.Started == nil || !v.Started.Equal(*old.Started) { + changed = true + } + } + if v.Completed != old.Completed && v.Completed != nil { + changed = true + } + if v.Cached != old.Cached { + changed = true + } + if v.Error != old.Error { + changed = true + } + + if changed { + vtx.update(1) + t.updates[v.Digest] = struct{}{} + } + + t.byDigest[v.Digest].prev = v +} + +func (t *trace) update(s *client.SolveStatus, termWidth int) { + for _, v := range s.Vertexes { + prev, ok := t.byDigest[v.Digest] + if !ok { + t.nextIndex++ + t.byDigest[v.Digest] = &vertex{ + byID: make(map[string]*status), + statusUpdates: make(map[string]struct{}), + index: t.nextIndex, + } + if t.modeConsole { + t.byDigest[v.Digest].term = vt100.NewVT100(termHeight, termWidth-termPad) + } + } + t.triggerVertexEvent(v) + if v.Started != nil && (prev == nil || prev.Started == nil) { + if t.localTimeDiff == 0 { + t.localTimeDiff = time.Since(*v.Started) + } + t.vertexes = append(t.vertexes, t.byDigest[v.Digest]) + } + // allow a duplicate initial vertex that shouldn't reset state + if !(prev != nil && prev.Started != nil && v.Started == nil) { + t.byDigest[v.Digest].Vertex = v + } + t.byDigest[v.Digest].jobCached = false + } + for _, s := range s.Statuses { + v, ok := t.byDigest[s.Vertex] + if !ok { + continue // shouldn't happen + } + v.jobCached = false + prev, ok := v.byID[s.ID] + if !ok { + v.byID[s.ID] = &status{VertexStatus: s} + } + if s.Started != nil && (prev == nil || prev.Started == nil) { + v.statuses = append(v.statuses, v.byID[s.ID]) + } + v.byID[s.ID].VertexStatus = s + v.statusUpdates[s.ID] = struct{}{} + t.updates[v.Digest] = struct{}{} + v.update(1) + } + for _, l := range s.Logs { + v, ok := t.byDigest[l.Vertex] + if !ok { + continue // shouldn't happen + } + v.jobCached = false + if v.term != nil { + if v.term.Width != termWidth { + v.term.Resize(termHeight, termWidth-termPad) + } + v.termBytes += len(l.Data) + v.term.Write(l.Data) // error unhandled on purpose. don't trust vt100 + } + i := 0 + complete := split(l.Data, byte('\n'), func(dt []byte) { + if v.logsPartial && len(v.logs) != 0 && i == 0 { + v.logs[len(v.logs)-1] = append(v.logs[len(v.logs)-1], dt...) + } else { + ts := time.Duration(0) + if v.Started != nil { + ts = l.Timestamp.Sub(*v.Started) + } + prec := 1 + sec := ts.Seconds() + if sec < 10 { + prec = 3 + } else if sec < 100 { + prec = 2 + } + v.logs = append(v.logs, []byte(fmt.Sprintf("#%d %s %s", v.index, fmt.Sprintf("%.[2]*[1]f", sec, prec), dt))) + } + i++ + }) + v.logsPartial = !complete + t.updates[v.Digest] = struct{}{} + v.update(1) + } +} + +func (t *trace) printErrorLogs(f io.Writer) { + for _, v := range t.vertexes { + if v.Error != "" && !strings.HasSuffix(v.Error, context.Canceled.Error()) { + fmt.Fprintln(f, "------") + fmt.Fprintf(f, " > %s:\n", v.Name) + for _, l := range v.logs { + f.Write(l) + fmt.Fprintln(f) + } + fmt.Fprintln(f, "------") + } + } +} + +func (t *trace) displayInfo() (d displayInfo) { + d.startTime = time.Now() + if t.localTimeDiff != 0 { + d.startTime = (*t.vertexes[0].Started).Add(t.localTimeDiff) + } + d.countTotal = len(t.byDigest) + for _, v := range t.byDigest { + if v.Completed != nil { + d.countCompleted++ + } + } + + for _, v := range t.vertexes { + if v.jobCached { + d.jobs = append(d.jobs, v.jobs...) + continue + } + var jobs []*job + j := &job{ + startTime: addTime(v.Started, t.localTimeDiff), + completedTime: addTime(v.Completed, t.localTimeDiff), + name: strings.Replace(v.Name, "\t", " ", -1), + vertex: v, + } + if v.Error != "" { + if strings.HasSuffix(v.Error, context.Canceled.Error()) { + j.isCanceled = true + j.name = "CANCELED " + j.name + } else { + j.hasError = true + j.name = "ERROR " + j.name + } + } + if v.Cached { + j.name = "CACHED " + j.name + } + j.name = v.indent + j.name + jobs = append(jobs, j) + for _, s := range v.statuses { + j := &job{ + startTime: addTime(s.Started, t.localTimeDiff), + completedTime: addTime(s.Completed, t.localTimeDiff), + name: v.indent + "=> " + s.ID, + } + if s.Total != 0 { + j.status = fmt.Sprintf("%.2f / %.2f", units.Bytes(s.Current), units.Bytes(s.Total)) + } else if s.Current != 0 { + j.status = fmt.Sprintf("%.2f", units.Bytes(s.Current)) + } + jobs = append(jobs, j) + } + d.jobs = append(d.jobs, jobs...) + v.jobs = jobs + v.jobCached = true + } + + return d +} + +func split(dt []byte, sep byte, fn func([]byte)) bool { + if len(dt) == 0 { + return false + } + for { + if len(dt) == 0 { + return true + } + idx := bytes.IndexByte(dt, sep) + if idx == -1 { + fn(dt) + return false + } + fn(dt[:idx]) + dt = dt[idx+1:] + } +} + +func addTime(tm *time.Time, d time.Duration) *time.Time { + if tm == nil { + return nil + } + t := (*tm).Add(d) + return &t +} + +type display struct { + c console.Console + phase string + lineCount int + repeated bool +} + +func (disp *display) getSize() (int, int) { + width := 80 + height := 10 + if disp.c != nil { + size, err := disp.c.Size() + if err == nil && size.Width > 0 && size.Height > 0 { + width = int(size.Width) + height = int(size.Height) + } + } + return width, height +} + +func setupTerminals(jobs []*job, height int, all bool) []*job { + var candidates []*job + numInUse := 0 + for _, j := range jobs { + if j.vertex != nil && j.vertex.termBytes > 0 && j.completedTime == nil { + candidates = append(candidates, j) + } + if j.completedTime == nil { + numInUse++ + } + } + sort.Slice(candidates, func(i, j int) bool { + idxI := candidates[i].vertex.termBytes + candidates[i].vertex.termCount*50 + idxJ := candidates[j].vertex.termBytes + candidates[j].vertex.termCount*50 + return idxI > idxJ + }) + + numFree := height - 2 - numInUse + numToHide := 0 + termLimit := termHeight + 3 + + for i := 0; numFree > termLimit && i < len(candidates); i++ { + candidates[i].showTerm = true + numToHide += candidates[i].vertex.term.UsedHeight() + numFree -= termLimit + } + + if !all { + jobs = wrapHeight(jobs, height-2-numToHide) + } + + return jobs +} + +func (disp *display) print(d displayInfo, width, height int, all bool) { + // this output is inspired by Buck + d.jobs = setupTerminals(d.jobs, height, all) + b := aec.EmptyBuilder + for i := 0; i <= disp.lineCount; i++ { + b = b.Up(1) + } + if !disp.repeated { + b = b.Down(1) + } + disp.repeated = true + fmt.Fprint(disp.c, b.Column(0).ANSI) + + statusStr := "" + if d.countCompleted > 0 && d.countCompleted == d.countTotal && all { + statusStr = "FINISHED" + } + + fmt.Fprint(disp.c, aec.Hide) + defer fmt.Fprint(disp.c, aec.Show) + + out := fmt.Sprintf("[+] %s %.1fs (%d/%d) %s", disp.phase, time.Since(d.startTime).Seconds(), d.countCompleted, d.countTotal, statusStr) + out = align(out, "", width) + fmt.Fprintln(disp.c, out) + lineCount := 0 + for _, j := range d.jobs { + endTime := time.Now() + if j.completedTime != nil { + endTime = *j.completedTime + } + if j.startTime == nil { + continue + } + dt := endTime.Sub(*j.startTime).Seconds() + if dt < 0.05 { + dt = 0 + } + pfx := " => " + timer := fmt.Sprintf(" %3.1fs\n", dt) + status := j.status + showStatus := false + + left := width - len(pfx) - len(timer) - 1 + if status != "" { + if left+len(status) > 20 { + showStatus = true + left -= len(status) + 1 + } + } + if left < 12 { // too small screen to show progress + continue + } + name := j.name + if len(name) > left { + name = name[:left] + } + + out := pfx + name + if showStatus { + out += " " + status + } + + out = align(out, timer, width) + if j.completedTime != nil { + color := aec.BlueF + if j.isCanceled { + color = aec.YellowF + } else if j.hasError { + color = aec.RedF + } + out = aec.Apply(out, color) + } + fmt.Fprint(disp.c, out) + lineCount++ + if j.showTerm { + term := j.vertex.term + term.Resize(termHeight, width-termPad) + for _, l := range term.Content { + if !isEmpty(l) { + out := aec.Apply(fmt.Sprintf(" => => # %s\n", string(l)), aec.Faint) + fmt.Fprint(disp.c, out) + lineCount++ + } + } + j.vertex.termCount++ + j.showTerm = false + } + } + // override previous content + if diff := disp.lineCount - lineCount; diff > 0 { + for i := 0; i < diff; i++ { + fmt.Fprintln(disp.c, strings.Repeat(" ", width)) + } + fmt.Fprint(disp.c, aec.EmptyBuilder.Up(uint(diff)).Column(0).ANSI) + } + disp.lineCount = lineCount +} + +func isEmpty(l []rune) bool { + for _, r := range l { + if r != ' ' { + return false + } + } + return true +} + +func align(l, r string, w int) string { + return fmt.Sprintf("%-[2]*[1]s %[3]s", l, w-len(r)-1, r) +} + +func wrapHeight(j []*job, limit int) []*job { + if limit < 0 { + return nil + } + var wrapped []*job + wrapped = append(wrapped, j...) + if len(j) > limit { + wrapped = wrapped[len(j)-limit:] + + // wrap things around if incomplete jobs were cut + var invisible []*job + for _, j := range j[:len(j)-limit] { + if j.completedTime == nil { + invisible = append(invisible, j) + } + } + + if l := len(invisible); l > 0 { + rewrapped := make([]*job, 0, len(wrapped)) + for _, j := range wrapped { + if j.completedTime == nil || l <= 0 { + rewrapped = append(rewrapped, j) + } + l-- + } + freespace := len(wrapped) - len(rewrapped) + wrapped = append(invisible[len(invisible)-freespace:], rewrapped...) + } + } + return wrapped +} diff --git a/pkg/progressui/printer.go b/pkg/progressui/printer.go new file mode 100644 index 00000000..78c0f61d --- /dev/null +++ b/pkg/progressui/printer.go @@ -0,0 +1,293 @@ +package progressui + +import ( + "context" + "fmt" + "io" + "os" + "sort" + "strings" + "time" + + digest "github.com/opencontainers/go-digest" + "github.com/tonistiigi/units" +) + +const antiFlicker = 5 * time.Second +const maxDelay = 10 * time.Second +const minTimeDelta = 5 * time.Second +const minProgressDelta = 0.05 // % + +type lastStatus struct { + Current int64 + Timestamp time.Time +} + +type textMux struct { + w io.Writer + current digest.Digest + last map[string]lastStatus + notFirst bool +} + +func (p *textMux) printVtx(t *trace, dgst digest.Digest) { + if p.last == nil { + p.last = make(map[string]lastStatus) + } + + v, ok := t.byDigest[dgst] + if !ok { + return + } + + if dgst != p.current { + if p.current != "" { + old := t.byDigest[p.current] + if old.logsPartial { + fmt.Fprintln(p.w, "") + } + old.logsOffset = 0 + old.count = 0 + fmt.Fprintf(p.w, "#%d ...\n", old.index) + } + + if p.notFirst { + fmt.Fprintln(p.w, "") + } else { + p.notFirst = true + } + + if os.Getenv("PROGRESS_NO_TRUNC") == "0" { + fmt.Fprintf(p.w, "#%d %s\n", v.index, limitString(v.Name, 72)) + } else { + fmt.Fprintf(p.w, "#%d %s\n", v.index, v.Name) + fmt.Fprintf(p.w, "#%d %s\n", v.index, v.Digest) + } + + } + + if len(v.events) != 0 { + v.logsOffset = 0 + } + for _, ev := range v.events { + fmt.Fprintf(p.w, "#%d %s\n", v.index, ev) + } + v.events = v.events[:0] + + for _, s := range v.statuses { + if _, ok := v.statusUpdates[s.ID]; ok { + doPrint := true + + if last, ok := p.last[s.ID]; ok && s.Completed == nil { + var progressDelta float64 + if s.Total > 0 { + progressDelta = float64(s.Current-last.Current) / float64(s.Total) + } + timeDelta := s.Timestamp.Sub(last.Timestamp) + if progressDelta < minProgressDelta && timeDelta < minTimeDelta { + doPrint = false + } + } + + if !doPrint { + continue + } + + p.last[s.ID] = lastStatus{ + Timestamp: s.Timestamp, + Current: s.Current, + } + + var bytes string + if s.Total != 0 { + bytes = fmt.Sprintf(" %.2f / %.2f", units.Bytes(s.Current), units.Bytes(s.Total)) + } else if s.Current != 0 { + bytes = fmt.Sprintf(" %.2f", units.Bytes(s.Current)) + } + var tm string + endTime := s.Timestamp + if s.Completed != nil { + endTime = *s.Completed + } + if s.Started != nil { + diff := endTime.Sub(*s.Started).Seconds() + if diff > 0.01 { + tm = fmt.Sprintf(" %.1fs", diff) + } + } + if s.Completed != nil { + tm += " done" + } + fmt.Fprintf(p.w, "#%d %s%s%s\n", v.index, s.ID, bytes, tm) + } + } + v.statusUpdates = map[string]struct{}{} + + for i, l := range v.logs { + if i == 0 { + l = l[v.logsOffset:] + } + fmt.Fprintf(p.w, "%s", []byte(l)) + if i != len(v.logs)-1 || !v.logsPartial { + fmt.Fprintln(p.w, "") + } + } + + if len(v.logs) > 0 { + if v.logsPartial { + v.logs = v.logs[len(v.logs)-1:] + v.logsOffset = len(v.logs[0]) + } else { + v.logs = nil + v.logsOffset = 0 + } + } + + p.current = dgst + if v.Completed != nil { + p.current = "" + v.count = 0 + + if v.Error != "" { + if v.logsPartial { + fmt.Fprintln(p.w, "") + } + if strings.HasSuffix(v.Error, context.Canceled.Error()) { + fmt.Fprintf(p.w, "#%d CANCELED\n", v.index) + } else { + fmt.Fprintf(p.w, "#%d ERROR: %s\n", v.index, v.Error) + } + } else if v.Cached { + fmt.Fprintf(p.w, "#%d CACHED\n", v.index) + } else { + tm := "" + if v.Started != nil { + tm = fmt.Sprintf(" %.1fs", v.Completed.Sub(*v.Started).Seconds()) + } + fmt.Fprintf(p.w, "#%d DONE%s\n", v.index, tm) + } + + } + + delete(t.updates, dgst) +} + +func sortCompleted(t *trace, m map[digest.Digest]struct{}) []digest.Digest { + out := make([]digest.Digest, 0, len(m)) + for k := range m { + out = append(out, k) + } + sort.Slice(out, func(i, j int) bool { + return t.byDigest[out[i]].Completed.Before(*t.byDigest[out[j]].Completed) + }) + return out +} + +func (p *textMux) print(t *trace) { + completed := map[digest.Digest]struct{}{} + rest := map[digest.Digest]struct{}{} + + for dgst := range t.updates { + v, ok := t.byDigest[dgst] + if !ok { + continue + } + if v.Vertex.Completed != nil { + completed[dgst] = struct{}{} + } else { + rest[dgst] = struct{}{} + } + } + + current := p.current + + // items that have completed need to be printed first + if _, ok := completed[current]; ok { + p.printVtx(t, current) + } + + for _, dgst := range sortCompleted(t, completed) { + if dgst != current { + p.printVtx(t, dgst) + } + } + + if len(rest) == 0 { + if current != "" { + if v := t.byDigest[current]; v.Started != nil && v.Completed == nil { + return + } + } + // make any open vertex active + for dgst, v := range t.byDigest { + if v.Started != nil && v.Completed == nil { + p.printVtx(t, dgst) + return + } + } + return + } + + // now print the active one + if _, ok := rest[current]; ok { + p.printVtx(t, current) + } + + stats := map[digest.Digest]*vtxStat{} + now := time.Now() + sum := 0.0 + var max digest.Digest + if current != "" { + rest[current] = struct{}{} + } + for dgst := range rest { + v, ok := t.byDigest[dgst] + if !ok { + continue + } + tm := now.Sub(*v.lastBlockTime) + speed := float64(v.count) / tm.Seconds() + overLimit := tm > maxDelay && dgst != current + stats[dgst] = &vtxStat{blockTime: tm, speed: speed, overLimit: overLimit} + sum += speed + if overLimit || max == "" || stats[max].speed < speed { + max = dgst + } + } + for dgst := range stats { + stats[dgst].share = stats[dgst].speed / sum + } + + if _, ok := completed[current]; ok || current == "" { + p.printVtx(t, max) + return + } + + // show items that were hidden + for dgst := range rest { + if stats[dgst].overLimit { + p.printVtx(t, dgst) + return + } + } + + // fair split between vertexes + if 1.0/(1.0-stats[current].share)*antiFlicker.Seconds() < stats[current].blockTime.Seconds() { + p.printVtx(t, max) + return + } +} + +type vtxStat struct { + blockTime time.Duration + speed float64 + share float64 + overLimit bool +} + +func limitString(s string, l int) string { + if len(s) > l { + return s[:l] + "..." + } + return s +}