cleanup: move packages to top level, change vanity URL

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi
2021-05-25 16:53:26 -07:00
parent e13153a284
commit af776b8abe
45 changed files with 74 additions and 74 deletions

208
util/buildkitd/buildkitd.go Normal file
View File

@@ -0,0 +1,208 @@
package buildkitd
import (
"context"
"errors"
"fmt"
"os/exec"
"strings"
"time"
"github.com/docker/distribution/reference"
bk "github.com/moby/buildkit/client"
_ "github.com/moby/buildkit/client/connhelper/dockercontainer" // import the container connection driver
"github.com/rs/zerolog/log"
)
const (
image = "moby/buildkit"
version = "v0.8.3"
imageVersion = image + ":" + version
containerName = "dagger-buildkitd"
volumeName = "dagger-buildkitd"
)
func Start(ctx context.Context) (string, error) {
lg := log.Ctx(ctx)
// Attempt to detect the current buildkit version
currentVersion, err := getBuildkitVersion(ctx)
if err != nil {
// If that failed, it might either be because buildkitd is not running
// or because the docker CLI is out of service.
if err := checkDocker(ctx); err != nil {
return "", err
}
currentVersion = ""
lg.Debug().Msg("no buildkit daemon detected")
} else {
lg.Debug().Str("version", currentVersion).Msg("detected buildkit version")
}
if currentVersion != version {
if currentVersion != "" {
lg.
Info().
Str("version", version).
Msg("upgrading buildkit")
if err := remvoveBuildkit(ctx); err != nil {
return "", err
}
} else {
lg.
Info().
Str("version", version).
Msg("starting buildkit")
}
if err := startBuildkit(ctx); err != nil {
return "", err
}
}
return fmt.Sprintf("docker-container://%s", containerName), nil
}
// ensure the docker CLI is available and properly set up (e.g. permissions to
// communicate with the daemon, etc)
func checkDocker(ctx context.Context) error {
cmd := exec.CommandContext(ctx, "docker", "info")
output, err := cmd.CombinedOutput()
if err != nil {
log.
Ctx(ctx).
Error().
Err(err).
Bytes("output", output).
Msg("failed to run docker")
return err
}
return nil
}
func startBuildkit(ctx context.Context) error {
lg := log.
Ctx(ctx).
With().
Str("version", version).
Logger()
lg.Debug().Msg("pulling buildkit image")
cmd := exec.CommandContext(ctx,
"docker",
"pull",
imageVersion,
)
output, err := cmd.CombinedOutput()
if err != nil {
lg.
Error().
Err(err).
Bytes("output", output).
Msg("failed to pull buildkit image")
return err
}
// FIXME: buildkitd currently runs without network isolation (--net=host)
// in order for containers to be able to reach localhost.
// This is required for things such as kubectl being able to
// reach a KinD/minikube cluster locally
cmd = exec.CommandContext(ctx,
"docker",
"run",
"--net=host",
"-d",
"--restart", "always",
"-v", volumeName+":/var/lib/buildkit",
"--name", containerName,
"--privileged",
imageVersion,
)
output, err = cmd.CombinedOutput()
if err != nil {
// If the daemon failed to start because it's already running,
// chances are another dagger instance started it. We can just ignore
// the error.
if !strings.Contains(string(output), "Error response from daemon: Conflict.") {
log.
Ctx(ctx).
Error().
Err(err).
Bytes("output", output).
Msg("unable to start buildkitd")
return err
}
}
return waitBuildkit(ctx)
}
// waitBuildkit waits for the buildkit daemon to be responsive.
func waitBuildkit(ctx context.Context) error {
c, err := bk.New(ctx, "docker-container://"+containerName)
if err != nil {
return err
}
defer c.Close()
// Try to connect every 100ms up to 50 times (5 seconds total)
const (
retryPeriod = 100 * time.Millisecond
retryAttempts = 50
)
for retry := 0; retry < retryAttempts; retry++ {
_, err = c.ListWorkers(ctx)
if err == nil {
return nil
}
time.Sleep(retryPeriod)
}
return errors.New("buildkit failed to respond")
}
func remvoveBuildkit(ctx context.Context) error {
lg := log.
Ctx(ctx)
cmd := exec.CommandContext(ctx,
"docker",
"rm",
"-fv",
containerName,
)
output, err := cmd.CombinedOutput()
if err != nil {
lg.
Error().
Err(err).
Bytes("output", output).
Msg("failed to stop buildkit")
return err
}
return nil
}
func getBuildkitVersion(ctx context.Context) (string, error) {
cmd := exec.CommandContext(ctx,
"docker",
"inspect",
"--format",
"{{.Config.Image}}",
containerName,
)
output, err := cmd.CombinedOutput()
if err != nil {
return "", err
}
ref, err := reference.ParseNormalizedNamed(strings.TrimSpace(string(output)))
if err != nil {
return "", err
}
tag, ok := ref.(reference.Tagged)
if !ok {
return "", fmt.Errorf("failed to parse image: %s", output)
}
return tag.Tag(), nil
}

655
util/progressui/display.go Normal file
View File

@@ -0,0 +1,655 @@
package progressui
import (
"bytes"
"context"
"fmt"
"io"
"os"
"sort"
"strconv"
"strings"
"time"
"github.com/containerd/console"
"github.com/jaguilar/vt100"
"github.com/moby/buildkit/client"
"github.com/morikuni/aec"
digest "github.com/opencontainers/go-digest"
"github.com/tonistiigi/units"
"golang.org/x/time/rate"
)
const (
defaultTickerTimeout = 150 * time.Millisecond
defaultDisplayTimeout = 100 * time.Millisecond
)
type VertexPrintFunc func(v *client.Vertex, index int)
type StatusPrintFunc func(v *client.Vertex, format string, a ...interface{})
type LogPrintFunc func(v *client.Vertex, stream int, partial bool, format string, a ...interface{})
func PrintSolveStatus(ctx context.Context, ch chan *client.SolveStatus, vertexPrintCb VertexPrintFunc, statusPrintCb StatusPrintFunc, logPrintCb LogPrintFunc) error {
printer := &textMux{
vertexPrintCb: vertexPrintCb,
statusPrintCb: statusPrintCb,
logPrintCb: logPrintCb,
}
t := newTrace(false)
var done bool
ticker := time.NewTicker(defaultTickerTimeout)
defer ticker.Stop()
displayLimiter := rate.NewLimiter(rate.Every(defaultDisplayTimeout), 1)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
case ss, ok := <-ch:
if ok {
t.update(ss, 80)
} else {
done = true
}
}
if done || displayLimiter.Allow() {
printer.print(t)
if done {
t.printErrorLogs(statusPrintCb)
return nil
}
ticker.Stop()
ticker = time.NewTicker(defaultTickerTimeout)
}
}
}
func DisplaySolveStatus(ctx context.Context, phase string, c console.Console, w io.Writer, ch chan *client.SolveStatus) error {
modeConsole := c != nil
if !modeConsole {
vertexPrintCb := func(v *client.Vertex, index int) {
if os.Getenv("PROGRESS_NO_TRUNC") == "0" {
fmt.Fprintf(w, "#%d %s\n", index, limitString(v.Name, 72))
} else {
fmt.Fprintf(w, "#%d %s\n", index, v.Name)
fmt.Fprintf(w, "#%d %s\n", index, v.Digest)
}
}
statusPrintCb := func(v *client.Vertex, format string, a ...interface{}) {
fmt.Fprintf(w, fmt.Sprintf("%s\n", format), a...)
}
logPrintCb := func(v *client.Vertex, stream int, partial bool, format string, a ...interface{}) {
if partial {
fmt.Fprintf(w, format, a...)
} else {
fmt.Fprintf(w, fmt.Sprintf("%s\n", format), a...)
}
}
return PrintSolveStatus(ctx, ch, vertexPrintCb, statusPrintCb, logPrintCb)
}
disp := &display{c: c, phase: phase}
if disp.phase == "" {
disp.phase = "Building"
}
t := newTrace(true)
tickerTimeout := defaultTickerTimeout
displayTimeout := defaultDisplayTimeout
if v := os.Getenv("TTY_DISPLAY_RATE"); v != "" {
if r, err := strconv.ParseInt(v, 10, 64); err == nil {
tickerTimeout = time.Duration(r) * time.Millisecond
displayTimeout = time.Duration(r) * time.Millisecond
}
}
var done bool
ticker := time.NewTicker(tickerTimeout)
defer ticker.Stop()
displayLimiter := rate.NewLimiter(rate.Every(displayTimeout), 1)
var height int
width, _ := disp.getSize()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
case ss, ok := <-ch:
if ok {
t.update(ss, width)
} else {
done = true
}
}
width, height = disp.getSize()
if done {
disp.print(t.displayInfo(), width, height, true)
t.printErrorLogs(func(v *client.Vertex, format string, a ...interface{}) {
fmt.Fprintf(w, format, a...)
})
return nil
} else if displayLimiter.Allow() {
ticker.Stop()
ticker = time.NewTicker(tickerTimeout)
disp.print(t.displayInfo(), width, height, false)
}
}
}
const termHeight = 6
const termPad = 10
type displayInfo struct {
startTime time.Time
jobs []*job
countTotal int
countCompleted int
}
type job struct {
startTime *time.Time
completedTime *time.Time
name string
status string
hasError bool
isCanceled bool
vertex *vertex
showTerm bool
}
type trace struct {
localTimeDiff time.Duration
vertexes []*vertex
byDigest map[digest.Digest]*vertex
nextIndex int
updates map[digest.Digest]struct{}
modeConsole bool
}
type log struct {
index int
line []byte
stream int
}
type vertex struct {
*client.Vertex
statuses []*status
byID map[string]*status
indent string
index int
logs []log
logsPartial bool
logsOffset int
prev *client.Vertex
events []string
lastBlockTime *time.Time
count int
statusUpdates map[string]struct{}
jobs []*job
jobCached bool
term *vt100.VT100
termBytes int
termCount int
}
func (v *vertex) update(c int) {
if v.count == 0 {
now := time.Now()
v.lastBlockTime = &now
}
v.count += c
}
type status struct {
*client.VertexStatus
}
func newTrace(modeConsole bool) *trace {
return &trace{
byDigest: make(map[digest.Digest]*vertex),
updates: make(map[digest.Digest]struct{}),
modeConsole: modeConsole,
}
}
func (t *trace) triggerVertexEvent(v *client.Vertex) {
if v.Started == nil {
return
}
var old client.Vertex
vtx := t.byDigest[v.Digest]
if v := vtx.prev; v != nil {
old = *v
}
changed := false
if v.Digest != old.Digest {
changed = true
}
if v.Name != old.Name {
changed = true
}
if v.Started != old.Started {
if v.Started != nil && old.Started == nil || !v.Started.Equal(*old.Started) {
changed = true
}
}
if v.Completed != old.Completed && v.Completed != nil {
changed = true
}
if v.Cached != old.Cached {
changed = true
}
if v.Error != old.Error {
changed = true
}
if changed {
vtx.update(1)
t.updates[v.Digest] = struct{}{}
}
t.byDigest[v.Digest].prev = v
}
func (t *trace) update(s *client.SolveStatus, termWidth int) {
for _, v := range s.Vertexes {
prev, ok := t.byDigest[v.Digest]
if !ok {
t.nextIndex++
t.byDigest[v.Digest] = &vertex{
byID: make(map[string]*status),
statusUpdates: make(map[string]struct{}),
index: t.nextIndex,
}
if t.modeConsole {
t.byDigest[v.Digest].term = vt100.NewVT100(termHeight, termWidth-termPad)
}
}
t.triggerVertexEvent(v)
if v.Started != nil && (prev == nil || prev.Started == nil) {
if t.localTimeDiff == 0 {
t.localTimeDiff = time.Since(*v.Started)
}
t.vertexes = append(t.vertexes, t.byDigest[v.Digest])
}
// allow a duplicate initial vertex that shouldn't reset state
if !(prev != nil && prev.Started != nil && v.Started == nil) {
t.byDigest[v.Digest].Vertex = v
}
t.byDigest[v.Digest].jobCached = false
}
for _, s := range s.Statuses {
v, ok := t.byDigest[s.Vertex]
if !ok {
continue // shouldn't happen
}
v.jobCached = false
prev, ok := v.byID[s.ID]
if !ok {
v.byID[s.ID] = &status{VertexStatus: s}
}
if s.Started != nil && (prev == nil || prev.Started == nil) {
v.statuses = append(v.statuses, v.byID[s.ID])
}
v.byID[s.ID].VertexStatus = s
v.statusUpdates[s.ID] = struct{}{}
t.updates[v.Digest] = struct{}{}
v.update(1)
}
for _, l := range s.Logs {
l := l
v, ok := t.byDigest[l.Vertex]
if !ok {
continue // shouldn't happen
}
v.jobCached = false
if v.term != nil {
if v.term.Width != termWidth {
v.term.Resize(termHeight, termWidth-termPad)
}
v.termBytes += len(l.Data)
v.term.Write(l.Data) // error unhandled on purpose. don't trust vt100
}
i := 0
complete := split(l.Data, byte('\n'), func(dt []byte) {
if v.logsPartial && len(v.logs) != 0 && i == 0 && v.logs[len(v.logs)-1].stream == l.Stream {
v.logs[len(v.logs)-1].line = append(v.logs[len(v.logs)-1].line, dt...)
} else {
ts := time.Duration(0)
if v.Started != nil {
ts = l.Timestamp.Sub(*v.Started)
}
prec := 1
sec := ts.Seconds()
if sec < 10 {
prec = 3
} else if sec < 100 {
prec = 2
}
v.logs = append(v.logs, log{
line: []byte(fmt.Sprintf("#%d %s %s", v.index, fmt.Sprintf("%.[2]*[1]f", sec, prec), dt)),
stream: l.Stream,
index: v.index,
})
}
i++
})
v.logsPartial = !complete
t.updates[v.Digest] = struct{}{}
v.update(1)
}
}
func (t *trace) printErrorLogs(printCb StatusPrintFunc) {
for _, v := range t.vertexes {
if v.Error != "" && !strings.HasSuffix(v.Error, context.Canceled.Error()) {
printCb(v.Vertex, "------")
printCb(v.Vertex, " > %s:", v.Name)
for _, l := range v.logs {
printCb(v.Vertex, "%s", l.line)
}
printCb(v.Vertex, "------")
}
}
}
func (t *trace) displayInfo() (d displayInfo) {
d.startTime = time.Now()
if t.localTimeDiff != 0 {
d.startTime = t.vertexes[0].Started.Add(t.localTimeDiff)
}
d.countTotal = len(t.byDigest)
for _, v := range t.byDigest {
if v.Completed != nil {
d.countCompleted++
}
}
for _, v := range t.vertexes {
if v.jobCached {
d.jobs = append(d.jobs, v.jobs...)
continue
}
var jobs []*job
j := &job{
startTime: addTime(v.Started, t.localTimeDiff),
completedTime: addTime(v.Completed, t.localTimeDiff),
name: strings.ReplaceAll(v.Name, "\t", " "),
vertex: v,
}
if v.Error != "" {
if strings.HasSuffix(v.Error, context.Canceled.Error()) {
j.isCanceled = true
j.name = "CANCELED " + j.name
} else {
j.hasError = true
j.name = "ERROR " + j.name
}
}
if v.Cached {
j.name = "CACHED " + j.name
}
j.name = v.indent + j.name
jobs = append(jobs, j)
for _, s := range v.statuses {
j := &job{
startTime: addTime(s.Started, t.localTimeDiff),
completedTime: addTime(s.Completed, t.localTimeDiff),
name: v.indent + "=> " + s.ID,
}
if s.Total != 0 {
j.status = fmt.Sprintf("%.2f / %.2f", units.Bytes(s.Current), units.Bytes(s.Total))
} else if s.Current != 0 {
j.status = fmt.Sprintf("%.2f", units.Bytes(s.Current))
}
jobs = append(jobs, j)
}
d.jobs = append(d.jobs, jobs...)
v.jobs = jobs
v.jobCached = true
}
return d
}
func split(dt []byte, sep byte, fn func([]byte)) bool {
if len(dt) == 0 {
return false
}
for {
if len(dt) == 0 {
return true
}
idx := bytes.IndexByte(dt, sep)
if idx == -1 {
fn(dt)
return false
}
fn(dt[:idx])
dt = dt[idx+1:]
}
}
func addTime(tm *time.Time, d time.Duration) *time.Time {
if tm == nil {
return nil
}
t := tm.Add(d)
return &t
}
type display struct {
c console.Console
phase string
lineCount int
repeated bool
}
func (disp *display) getSize() (int, int) {
width := 80
height := 10
if disp.c != nil {
size, err := disp.c.Size()
if err == nil && size.Width > 0 && size.Height > 0 {
width = int(size.Width)
height = int(size.Height)
}
}
return width, height
}
func setupTerminals(jobs []*job, height int, all bool) []*job {
var candidates []*job
numInUse := 0
for _, j := range jobs {
if j.vertex != nil && j.vertex.termBytes > 0 && j.completedTime == nil {
candidates = append(candidates, j)
}
if j.completedTime == nil {
numInUse++
}
}
sort.Slice(candidates, func(i, j int) bool {
idxI := candidates[i].vertex.termBytes + candidates[i].vertex.termCount*50
idxJ := candidates[j].vertex.termBytes + candidates[j].vertex.termCount*50
return idxI > idxJ
})
numFree := height - 2 - numInUse
numToHide := 0
termLimit := termHeight + 3
for i := 0; numFree > termLimit && i < len(candidates); i++ {
candidates[i].showTerm = true
numToHide += candidates[i].vertex.term.UsedHeight()
numFree -= termLimit
}
if !all {
jobs = wrapHeight(jobs, height-2-numToHide)
}
return jobs
}
func (disp *display) print(d displayInfo, width, height int, all bool) {
// this output is inspired by Buck
d.jobs = setupTerminals(d.jobs, height, all)
b := aec.EmptyBuilder
for i := 0; i <= disp.lineCount; i++ {
b = b.Up(1)
}
if !disp.repeated {
b = b.Down(1)
}
disp.repeated = true
fmt.Fprint(disp.c, b.Column(0).ANSI)
statusStr := ""
if d.countCompleted > 0 && d.countCompleted == d.countTotal && all {
statusStr = "FINISHED"
}
fmt.Fprint(disp.c, aec.Hide)
defer fmt.Fprint(disp.c, aec.Show)
out := fmt.Sprintf("[+] %s %.1fs (%d/%d) %s", disp.phase, time.Since(d.startTime).Seconds(), d.countCompleted, d.countTotal, statusStr)
out = align(out, "", width)
fmt.Fprintln(disp.c, out)
lineCount := 0
for _, j := range d.jobs {
endTime := time.Now()
if j.completedTime != nil {
endTime = *j.completedTime
}
if j.startTime == nil {
continue
}
dt := endTime.Sub(*j.startTime).Seconds()
if dt < 0.05 {
dt = 0
}
pfx := " => "
timer := fmt.Sprintf(" %3.1fs\n", dt)
status := j.status
showStatus := false
left := width - len(pfx) - len(timer) - 1
if status != "" {
if left+len(status) > 20 {
showStatus = true
left -= len(status) + 1
}
}
if left < 12 { // too small screen to show progress
continue
}
name := j.name
if len(name) > left {
name = name[:left]
}
out := pfx + name
if showStatus {
out += " " + status
}
out = align(out, timer, width)
if j.completedTime != nil {
color := aec.BlueF
if j.isCanceled {
color = aec.YellowF
} else if j.hasError {
color = aec.RedF
}
out = aec.Apply(out, color)
}
fmt.Fprint(disp.c, out)
lineCount++
if j.showTerm {
term := j.vertex.term
term.Resize(termHeight, width-termPad)
for _, l := range term.Content {
if !isEmpty(l) {
out := aec.Apply(fmt.Sprintf(" => => # %s\n", string(l)), aec.Faint)
fmt.Fprint(disp.c, out)
lineCount++
}
}
j.vertex.termCount++
j.showTerm = false
}
}
// override previous content
if diff := disp.lineCount - lineCount; diff > 0 {
for i := 0; i < diff; i++ {
fmt.Fprintln(disp.c, strings.Repeat(" ", width))
}
fmt.Fprint(disp.c, aec.EmptyBuilder.Up(uint(diff)).Column(0).ANSI)
}
disp.lineCount = lineCount
}
func isEmpty(l []rune) bool {
for _, r := range l {
if r != ' ' {
return false
}
}
return true
}
func align(l, r string, w int) string {
return fmt.Sprintf("%-[2]*[1]s %[3]s", l, w-len(r)-1, r)
}
func wrapHeight(j []*job, limit int) []*job {
if limit < 0 {
return nil
}
var wrapped []*job
wrapped = append(wrapped, j...)
if len(j) > limit {
wrapped = wrapped[len(j)-limit:]
// wrap things around if incomplete jobs were cut
var invisible []*job
for _, j := range j[:len(j)-limit] {
if j.completedTime == nil {
invisible = append(invisible, j)
}
}
if l := len(invisible); l > 0 {
rewrapped := make([]*job, 0, len(wrapped))
for _, j := range wrapped {
if j.completedTime == nil || l <= 0 {
rewrapped = append(rewrapped, j)
}
l--
}
freespace := len(wrapped) - len(rewrapped)
invisible = append(invisible[len(invisible)-freespace:], rewrapped...)
wrapped = invisible
}
}
return wrapped
}

286
util/progressui/printer.go Normal file
View File

@@ -0,0 +1,286 @@
package progressui
import (
"context"
"fmt"
"sort"
"strings"
"time"
digest "github.com/opencontainers/go-digest"
"github.com/tonistiigi/units"
)
const antiFlicker = 5 * time.Second
const maxDelay = 10 * time.Second
const minTimeDelta = 5 * time.Second
const minProgressDelta = 0.05 // %
type lastStatus struct {
Current int64
Timestamp time.Time
}
type textMux struct {
vertexPrintCb VertexPrintFunc
statusPrintCb StatusPrintFunc
logPrintCb LogPrintFunc
current digest.Digest
last map[string]lastStatus
notFirst bool
}
func (p *textMux) printVtx(t *trace, dgst digest.Digest) {
if p.last == nil {
p.last = make(map[string]lastStatus)
}
v, ok := t.byDigest[dgst]
if !ok {
return
}
if dgst != p.current {
if p.current != "" {
old := t.byDigest[p.current]
if old.logsPartial {
p.statusPrintCb(v.Vertex, "")
}
old.logsOffset = 0
old.count = 0
p.statusPrintCb(v.Vertex, "#%d ...", old.index)
}
if p.notFirst {
p.statusPrintCb(v.Vertex, "")
} else {
p.notFirst = true
}
p.vertexPrintCb(v.Vertex, v.index)
}
if len(v.events) != 0 {
v.logsOffset = 0
}
for _, ev := range v.events {
p.statusPrintCb(v.Vertex, "#%d %s", v.index, ev)
}
v.events = v.events[:0]
for _, s := range v.statuses {
if _, ok := v.statusUpdates[s.ID]; ok {
doPrint := true
if last, ok := p.last[s.ID]; ok && s.Completed == nil {
var progressDelta float64
if s.Total > 0 {
progressDelta = float64(s.Current-last.Current) / float64(s.Total)
}
timeDelta := s.Timestamp.Sub(last.Timestamp)
if progressDelta < minProgressDelta && timeDelta < minTimeDelta {
doPrint = false
}
}
if !doPrint {
continue
}
p.last[s.ID] = lastStatus{
Timestamp: s.Timestamp,
Current: s.Current,
}
var bytes string
if s.Total != 0 {
bytes = fmt.Sprintf(" %.2f / %.2f", units.Bytes(s.Current), units.Bytes(s.Total))
} else if s.Current != 0 {
bytes = fmt.Sprintf(" %.2f", units.Bytes(s.Current))
}
var tm string
endTime := s.Timestamp
if s.Completed != nil {
endTime = *s.Completed
}
if s.Started != nil {
diff := endTime.Sub(*s.Started).Seconds()
if diff > 0.01 {
tm = fmt.Sprintf(" %.1fs", diff)
}
}
if s.Completed != nil {
tm += " done"
}
p.statusPrintCb(v.Vertex, "#%d %s%s%s", v.index, s.ID, bytes, tm)
}
}
v.statusUpdates = map[string]struct{}{}
for i, l := range v.logs {
line := l.line
if i == 0 {
line = line[v.logsOffset:]
}
complete := i != len(v.logs)-1 || !v.logsPartial
p.logPrintCb(v.Vertex, l.stream, !complete, "%s", line)
}
if len(v.logs) > 0 {
if v.logsPartial {
v.logs = v.logs[len(v.logs)-1:]
v.logsOffset = len(v.logs[0].line)
} else {
v.logs = nil
v.logsOffset = 0
}
}
p.current = dgst
if v.Completed != nil {
p.current = ""
v.count = 0
if v.Error != "" {
if v.logsPartial {
p.statusPrintCb(v.Vertex, "")
}
if strings.HasSuffix(v.Error, context.Canceled.Error()) {
p.statusPrintCb(v.Vertex, "#%d CANCELED", v.index)
} else {
p.statusPrintCb(v.Vertex, "#%d ERROR: %s", v.index, v.Error)
}
} else if v.Cached {
p.statusPrintCb(v.Vertex, "#%d CACHED", v.index)
} else {
tm := ""
if v.Started != nil {
tm = fmt.Sprintf(" %.1fs", v.Completed.Sub(*v.Started).Seconds())
}
p.statusPrintCb(v.Vertex, "#%d DONE%s", v.index, tm)
}
}
delete(t.updates, dgst)
}
func sortCompleted(t *trace, m map[digest.Digest]struct{}) []digest.Digest {
out := make([]digest.Digest, 0, len(m))
for k := range m {
out = append(out, k)
}
sort.Slice(out, func(i, j int) bool {
return t.byDigest[out[i]].Completed.Before(*t.byDigest[out[j]].Completed)
})
return out
}
func (p *textMux) print(t *trace) {
completed := map[digest.Digest]struct{}{}
rest := map[digest.Digest]struct{}{}
for dgst := range t.updates {
v, ok := t.byDigest[dgst]
if !ok {
continue
}
if v.Vertex.Completed != nil {
completed[dgst] = struct{}{}
} else {
rest[dgst] = struct{}{}
}
}
current := p.current
// items that have completed need to be printed first
if _, ok := completed[current]; ok {
p.printVtx(t, current)
}
for _, dgst := range sortCompleted(t, completed) {
if dgst != current {
p.printVtx(t, dgst)
}
}
if len(rest) == 0 {
if current != "" {
if v := t.byDigest[current]; v.Started != nil && v.Completed == nil {
return
}
}
// make any open vertex active
for dgst, v := range t.byDigest {
if v.Started != nil && v.Completed == nil {
p.printVtx(t, dgst)
return
}
}
return
}
// now print the active one
if _, ok := rest[current]; ok {
p.printVtx(t, current)
}
stats := map[digest.Digest]*vtxStat{}
now := time.Now()
sum := 0.0
var max digest.Digest
if current != "" {
rest[current] = struct{}{}
}
for dgst := range rest {
v, ok := t.byDigest[dgst]
if !ok {
continue
}
tm := now.Sub(*v.lastBlockTime)
speed := float64(v.count) / tm.Seconds()
overLimit := tm > maxDelay && dgst != current
stats[dgst] = &vtxStat{blockTime: tm, speed: speed, overLimit: overLimit}
sum += speed
if overLimit || max == "" || stats[max].speed < speed {
max = dgst
}
}
for dgst := range stats {
stats[dgst].share = stats[dgst].speed / sum
}
if _, ok := completed[current]; ok || current == "" {
p.printVtx(t, max)
return
}
// show items that were hidden
for dgst := range rest {
if stats[dgst].overLimit {
p.printVtx(t, dgst)
return
}
}
// fair split between vertexes
if 1.0/(1.0-stats[current].share)*antiFlicker.Seconds() < stats[current].blockTime.Seconds() {
p.printVtx(t, max)
return
}
}
type vtxStat struct {
blockTime time.Duration
speed float64
share float64
overLimit bool
}
func limitString(s string, l int) string {
if len(s) > l {
return s[:l] + "..."
}
return s
}