diff --git a/.golangci.yml b/.golangci.yml index 796dff50..2a3a2233 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,6 +1,10 @@ +run: + skip-dirs: + # progressui is a modified 3rd party library from buildkit + - pkg/progressui + linters: disable-all: true - timeout: 30m enable: - bodyclose - deadcode diff --git a/cmd/dagger/cmd/compute.go b/cmd/dagger/cmd/compute.go index a60b40a4..927483a2 100644 --- a/cmd/dagger/cmd/compute.go +++ b/cmd/dagger/cmd/compute.go @@ -43,21 +43,18 @@ var computeCmd = &cobra.Command{ if err := env.SetUpdater(updater.Value()); err != nil { lg.Fatal().Err(err).Msg("invalid updater script") } - lg.Debug().Str("input", input.Value().SourceUnsafe()).Msg("Setting input") + lg.Debug().Str("input", input.Value().SourceUnsafe()).Msg("setting input") if err := env.SetInput(input.Value()); err != nil { lg.Fatal().Err(err).Msg("invalid input") } - lg.Debug().Str("env state", env.State().SourceUnsafe()).Msg("creating client") c, err := dagger.NewClient(ctx, "") if err != nil { lg.Fatal().Err(err).Msg("unable to create client") } - lg.Info().Msg("running") output, err := c.Compute(ctx, env) if err != nil { lg.Fatal().Err(err).Msg("failed to compute") } - lg.Info().Msg("processing output") fmt.Println(output.JSON()) }, } diff --git a/cmd/dagger/logger/console.go b/cmd/dagger/logger/console.go new file mode 100644 index 00000000..99dbfc6b --- /dev/null +++ b/cmd/dagger/logger/console.go @@ -0,0 +1,192 @@ +package logger + +import ( + "bytes" + "encoding/json" + "fmt" + "hash/adler32" + "io" + "strings" + "time" + + "github.com/mitchellh/colorstring" + "github.com/rs/zerolog" +) + +var colorize = colorstring.Colorize{ + Colors: colorstring.DefaultColors, + Reset: true, +} + +type Console struct { + Out io.Writer + maxLength int +} + +func (c *Console) Write(p []byte) (n int, err error) { + event := map[string]interface{}{} + d := json.NewDecoder(bytes.NewReader(p)) + if err := d.Decode(&event); err != nil { + return n, fmt.Errorf("cannot decode event: %s", err) + } + + source := c.parseSource(event) + if len(source) > c.maxLength { + c.maxLength = len(source) + } + + return fmt.Fprintln(c.Out, + colorize.Color(fmt.Sprintf("%s %s %s%s%s", + c.formatTimestamp(event), + c.formatLevel(event), + c.formatSource(source), + c.formatMessage(event), + c.formatFields(event), + ))) +} + +func (c *Console) formatLevel(event map[string]interface{}) string { + level := zerolog.DebugLevel + if l, ok := event[zerolog.LevelFieldName].(string); ok { + level, _ = zerolog.ParseLevel(l) + } + + switch level { + case zerolog.TraceLevel: + return "[magenta]TRC[reset]" + case zerolog.DebugLevel: + return "[yellow]DBG[reset]" + case zerolog.InfoLevel: + return "[green]INF[reset]" + case zerolog.WarnLevel: + return "[red]WRN[reset]" + case zerolog.ErrorLevel: + return "[red]ERR[reset]" + case zerolog.FatalLevel: + return "[red]FTL[reset]" + case zerolog.PanicLevel: + return "[red]PNC[reset]" + default: + return "[bold]???[reset]" + } +} + +func (c *Console) formatTimestamp(event map[string]interface{}) string { + ts, ok := event[zerolog.TimestampFieldName].(string) + if !ok { + return "???" + } + + t, err := time.Parse(zerolog.TimeFieldFormat, ts) + if err != nil { + panic(err) + } + return fmt.Sprintf("[dark_gray]%s[reset]", t.Format(time.Kitchen)) +} + +func (c *Console) formatMessage(event map[string]interface{}) string { + message, ok := event[zerolog.MessageFieldName].(string) + if !ok { + return "" + } + message = strings.TrimSpace(message) + + if err, ok := event[zerolog.ErrorFieldName].(string); ok && err != "" { + message = message + ": " + err + } + + level := zerolog.DebugLevel + if l, ok := event[zerolog.LevelFieldName].(string); ok { + level, _ = zerolog.ParseLevel(l) + } + + switch level { + case zerolog.TraceLevel: + return fmt.Sprintf("[dim]%s[reset]", message) + case zerolog.DebugLevel: + return fmt.Sprintf("[dim]%s[reset]", message) + case zerolog.InfoLevel: + return message + case zerolog.WarnLevel: + return fmt.Sprintf("[yellow]%s[reset]", message) + case zerolog.ErrorLevel: + return fmt.Sprintf("[red]%s[reset]", message) + case zerolog.FatalLevel: + return fmt.Sprintf("[red]%s[reset]", message) + case zerolog.PanicLevel: + return fmt.Sprintf("[red]%s[reset]", message) + default: + return message + } +} + +func (c *Console) parseSource(event map[string]interface{}) string { + source := "system" + if task, ok := event["component"].(string); ok && task != "" { + source = task + } + return source +} + +func (c *Console) formatSource(source string) string { + return fmt.Sprintf("[%s]%s | [reset]", + hashColor(source), + source, + ) +} + +func (c *Console) formatFields(entry map[string]interface{}) string { + // these are the fields we don't want to expose, either because they're + // already part of the Log structure or because they're internal + fieldSkipList := map[string]struct{}{ + zerolog.MessageFieldName: {}, + zerolog.LevelFieldName: {}, + zerolog.TimestampFieldName: {}, + zerolog.ErrorFieldName: {}, + zerolog.CallerFieldName: {}, + "component": {}, + } + + fields := []string{} + for key, value := range entry { + if _, ok := fieldSkipList[key]; ok { + continue + } + switch v := value.(type) { + case string: + fields = append(fields, fmt.Sprintf("%s=%s", key, v)) + case int: + fields = append(fields, fmt.Sprintf("%s=%v", key, v)) + case float64: + dur := time.Duration(v) * time.Millisecond + s := dur.Round(100 * time.Millisecond).String() + fields = append(fields, fmt.Sprintf("%s=%s", key, s)) + case nil: + fields = append(fields, fmt.Sprintf("%s=null", key)) + } + } + + if len(fields) == 0 { + return "" + } + return fmt.Sprintf(" [dim]%s[reset]", strings.Join(fields, " ")) +} + +// hashColor returns a consistent color for a given string +func hashColor(text string) string { + colors := []string{ + "green", + "light_green", + "light_blue", + "blue", + "magenta", + "light_magenta", + "light_yellow", + "cyan", + "light_cyan", + "red", + "light_red", + } + h := adler32.Checksum([]byte(text)) + return colors[int(h)%len(colors)] +} diff --git a/cmd/dagger/logger/logger.go b/cmd/dagger/logger/logger.go index a0c2a55f..56da3d6a 100644 --- a/cmd/dagger/logger/logger.go +++ b/cmd/dagger/logger/logger.go @@ -22,7 +22,7 @@ func New() zerolog.Logger { Logger() if prettyLogs() { - logger = logger.Output(zerolog.ConsoleWriter{Out: os.Stderr}) + logger = logger.Output(&Console{Out: os.Stderr}) } else { logger = logger.With().Timestamp().Caller().Logger() } diff --git a/dagger/build.go b/dagger/build.go index 497dcaf5..8ee112f3 100644 --- a/dagger/build.go +++ b/dagger/build.go @@ -37,7 +37,7 @@ func CueBuild(ctx context.Context, fs FS, args ...string) (*compiler.Value, erro // Add the config files on top of the overlay err = fs.Walk(ctx, func(p string, f Stat) error { - lg.Debug().Str("path", p).Msg("Compiler.Build: processing") + lg.Debug().Str("path", p).Msg("load") if f.IsDir() { return nil } diff --git a/dagger/client.go b/dagger/client.go index 3f808bc8..28006019 100644 --- a/dagger/client.go +++ b/dagger/client.go @@ -22,8 +22,7 @@ import ( bkgw "github.com/moby/buildkit/frontend/gateway/client" // docker output - "github.com/containerd/console" - "github.com/moby/buildkit/util/progress/progressui" + "dagger.io/go/pkg/progressui" "dagger.io/go/dagger/compiler" ) @@ -56,18 +55,16 @@ func NewClient(ctx context.Context, host string) (*Client, error) { // FIXME: return completed *Env, instead of *compiler.Value func (c *Client) Compute(ctx context.Context, env *Env) (*compiler.Value, error) { lg := log.Ctx(ctx) - eg, gctx := errgroup.WithContext(ctx) // Spawn print function - var events chan *bk.SolveStatus - if os.Getenv("DOCKER_OUTPUT") != "" { - events = make(chan *bk.SolveStatus) - eg.Go(func() error { - dispCtx := context.TODO() - return c.dockerprintfn(dispCtx, events, lg) - }) - } + events := make(chan *bk.SolveStatus) + eg.Go(func() error { + // Create a background context so that logging will not be cancelled + // with the main context. + dispCtx := lg.WithContext(context.Background()) + return c.logSolveStatus(dispCtx, events) + }) // Spawn build function outr, outw := io.Pipe() @@ -126,20 +123,24 @@ func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus, resp, err := c.c.Build(ctx, opts, "", func(ctx context.Context, c bkgw.Client) (*bkgw.Result, error) { s := NewSolver(c) + lg.Debug().Msg("loading configuration") if err := env.Update(ctx, s); err != nil { return nil, err } - lg.Debug().Msg("computing env") + // Compute output overlay + lg.Debug().Msg("computing env") if err := env.Compute(ctx, s); err != nil { return nil, err } - lg.Debug().Msg("exporting env") + // Export env to a cue directory + lg.Debug().Msg("exporting env") outdir, err := env.Export(s.Scratch()) if err != nil { return nil, err } + // Wrap cue directory in buildkit result return outdir.Result(ctx) }, ch) @@ -196,8 +197,68 @@ func (c *Client) outputfn(ctx context.Context, r io.Reader) (*compiler.Value, er return out, nil } -func (c *Client) dockerprintfn(ctx context.Context, ch chan *bk.SolveStatus, out io.Writer) error { - var cons console.Console - // FIXME: use smarter writer from blr - return progressui.DisplaySolveStatus(ctx, "", cons, out, ch) +func (c *Client) logSolveStatus(ctx context.Context, ch chan *bk.SolveStatus) error { + parseName := func(v *bk.Vertex) (string, string) { + // Pattern: `@name@ message`. Minimal length is len("@X@ ") + if len(v.Name) < 2 || !strings.HasPrefix(v.Name, "@") { + return "", v.Name + } + + prefixEndPos := strings.Index(v.Name[1:], "@") + if prefixEndPos == -1 { + return "", v.Name + } + + component := v.Name[1 : prefixEndPos+1] + return component, v.Name[prefixEndPos+3 : len(v.Name)] + } + + return progressui.PrintSolveStatus(ctx, ch, + func(v *bk.Vertex, index int) { + component, name := parseName(v) + lg := log. + Ctx(ctx). + With(). + Str("component", component). + Logger() + + lg. + Debug(). + Msg(fmt.Sprintf("#%d %s\n", index, name)) + lg. + Debug(). + Msg(fmt.Sprintf("#%d %s\n", index, v.Digest)) + }, + func(v *bk.Vertex, format string, a ...interface{}) { + component, _ := parseName(v) + lg := log. + Ctx(ctx). + With(). + Str("component", component). + Logger() + + lg. + Debug(). + Msg(fmt.Sprintf(format, a...)) + }, + func(v *bk.Vertex, stream int, partial bool, format string, a ...interface{}) { + component, _ := parseName(v) + lg := log. + Ctx(ctx). + With(). + Str("component", component). + Logger() + + switch stream { + case 1: + lg. + Info(). + Msg(fmt.Sprintf(format, a...)) + case 2: + lg. + Error(). + Msg(fmt.Sprintf(format, a...)) + } + }, + ) } diff --git a/dagger/env.go b/dagger/env.go index a7a12fab..6c3cd022 100644 --- a/dagger/env.go +++ b/dagger/env.go @@ -3,6 +3,7 @@ package dagger import ( "context" "fmt" + "time" "cuelang.org/go/cue" cueflow "cuelang.org/go/tools/flow" @@ -84,7 +85,7 @@ func (env *Env) SetInput(i *compiler.Value) error { // Update the base configuration func (env *Env) Update(ctx context.Context, s Solver) error { - p := NewPipeline(s, nil) + p := NewPipeline("[internal] source", s, nil) // execute updater script if err := p.Do(ctx, env.updater); err != nil { return err @@ -212,10 +213,6 @@ func (env *Env) Compute(ctx context.Context, s Solver) error { // Cueflow cue instance flowInst := env.state.CueInst() - lg. - Debug(). - Str("value", compiler.Wrap(flowInst.Value(), flowInst).JSON().String()). - Msg("walking") // Reset the output env.output = compiler.EmptyStruct() @@ -229,15 +226,13 @@ func (env *Env) Compute(ctx context.Context, s Solver) error { lg := lg. With(). - Str("path", t.Path().String()). + Str("component", t.Path().String()). Str("state", t.State().String()). Logger() - lg.Debug().Msg("cueflow task") if t.State() != cueflow.Terminated { return nil } - lg.Debug().Msg("cueflow task: filling result") // Merge task value into output var err error env.output, err = env.output.MergePath(t.Value(), t.Path()) @@ -245,7 +240,7 @@ func (env *Env) Compute(ctx context.Context, s Solver) error { lg. Error(). Err(err). - Msg("failed to fill script result") + Msg("failed to fill task result") return err } return nil @@ -284,10 +279,14 @@ func newPipelineTaskFunc(ctx context.Context, inst *cue.Instance, s Solver) cuef lg := log. Ctx(ctx). With(). - Str("path", t.Path().String()). + Str("component", t.Path().String()). Logger() ctx := lg.WithContext(ctx) + start := time.Now() + lg. + Info(). + Msg("computing") for _, dep := range t.Dependencies() { lg. Debug(). @@ -295,8 +294,21 @@ func newPipelineTaskFunc(ctx context.Context, inst *cue.Instance, s Solver) cuef Msg("dependency detected") } v := compiler.Wrap(t.Value(), inst) - p := NewPipeline(s, NewFillable(t)) - return p.Do(ctx, v) + p := NewPipeline(t.Path().String(), s, NewFillable(t)) + err := p.Do(ctx, v) + if err != nil { + lg. + Error(). + Dur("duration", time.Since(start)). + Err(err). + Msg("failed") + } else { + lg. + Info(). + Dur("duration", time.Since(start)). + Msg("completed") + } + return err }), nil } } diff --git a/dagger/fs.go b/dagger/fs.go index e2e75a63..44dc0a95 100644 --- a/dagger/fs.go +++ b/dagger/fs.go @@ -32,6 +32,7 @@ func (fs FS) WriteValueJSON(filename string, v *compiler.Value) FS { return fs.Change(func(st llb.State) llb.State { return st.File( llb.Mkfile(filename, 0600, v.JSON()), + llb.WithCustomName("[internal] serializing state to JSON"), ) }) } @@ -44,6 +45,7 @@ func (fs FS) WriteValueCUE(filename string, v *compiler.Value) (FS, error) { return fs.Change(func(st llb.State) llb.State { return st.File( llb.Mkfile(filename, 0600, src), + llb.WithCustomName("[internal] serializing state to CUE"), ) }), nil } diff --git a/dagger/pipeline.go b/dagger/pipeline.go index 799ee34b..84d31681 100644 --- a/dagger/pipeline.go +++ b/dagger/pipeline.go @@ -19,16 +19,18 @@ import ( // An execution pipeline type Pipeline struct { - s Solver - fs FS - out *Fillable + name string + s Solver + fs FS + out *Fillable } -func NewPipeline(s Solver, out *Fillable) *Pipeline { +func NewPipeline(name string, s Solver, out *Fillable) *Pipeline { return &Pipeline{ - s: s, - fs: s.Scratch(), - out: out, + name: name, + s: s, + fs: s.Scratch(), + out: out, } } @@ -169,10 +171,16 @@ func (p *Pipeline) doOp(ctx context.Context, op *compiler.Value) error { } } +func (p *Pipeline) vertexNamef(format string, a ...interface{}) string { + prefix := fmt.Sprintf("@%s@", p.name) + name := fmt.Sprintf(format, a...) + return prefix + " " + name +} + // Spawn a temporary pipeline with the same solver. // Output values are discarded: the parent pipeline's values are not modified. -func (p *Pipeline) Tmp() *Pipeline { - return NewPipeline(p.s, nil) +func (p *Pipeline) Tmp(name string) *Pipeline { + return NewPipeline(name, p.s, nil) } func (p *Pipeline) Subdir(ctx context.Context, op *compiler.Value) error { @@ -184,14 +192,17 @@ func (p *Pipeline) Subdir(ctx context.Context, op *compiler.Value) error { return err } p.fs = p.fs.Change(func(st llb.State) llb.State { - return st.File(llb.Copy( - p.fs.LLB(), - dir, - "/", - &llb.CopyInfo{ - CopyDirContentsOnly: true, - }, - )) + return st.File( + llb.Copy( + p.fs.LLB(), + dir, + "/", + &llb.CopyInfo{ + CopyDirContentsOnly: true, + }, + ), + llb.WithCustomName(p.vertexNamef("Subdir %s", dir)), + ) }) return nil } @@ -207,23 +218,26 @@ func (p *Pipeline) Copy(ctx context.Context, op *compiler.Value) error { return err } // Execute 'from' in a tmp pipeline, and use the resulting fs - from := p.Tmp() + from := p.Tmp(op.Get("from").Path().String()) if err := from.Do(ctx, op.Get("from")); err != nil { return err } p.fs = p.fs.Change(func(st llb.State) llb.State { - return st.File(llb.Copy( - from.FS().LLB(), - src, - dest, - // FIXME: allow more configurable llb options - // For now we define the following convenience presets: - &llb.CopyInfo{ - CopyDirContentsOnly: true, - CreateDestPath: true, - AllowWildcard: true, - }, - )) + return st.File( + llb.Copy( + from.FS().LLB(), + src, + dest, + // FIXME: allow more configurable llb options + // For now we define the following convenience presets: + &llb.CopyInfo{ + CopyDirContentsOnly: true, + CreateDestPath: true, + AllowWildcard: true, + }, + ), + llb.WithCustomName(p.vertexNamef("Copy %s %s", src, dest)), + ) }) return nil } @@ -240,13 +254,13 @@ func (p *Pipeline) Local(ctx context.Context, op *compiler.Value) error { } } - p.fs = p.fs.Change(func(st llb.State) llb.State { - return st.File(llb.Copy( - llb.Local(dir, llb.FollowPaths(include)), - "/", - "/", - )) - }) + p.fs = p.fs.Set( + llb.Local( + dir, + llb.FollowPaths(include), + llb.WithCustomName(p.vertexNamef("Local %s", dir)), + ), + ) return nil } @@ -262,9 +276,6 @@ func (p *Pipeline) Exec(ctx context.Context, op *compiler.Value) error { if err := op.Decode(&cmd); err != nil { return err } - // marker for status events - // FIXME - opts = append(opts, llb.WithCustomName(op.Path().String())) // args opts = append(opts, llb.Args(cmd.Args)) // dir @@ -290,6 +301,15 @@ func (p *Pipeline) Exec(ctx context.Context, op *compiler.Value) error { } opts = append(opts, mntOpts...) } + + // marker for status events + // FIXME + args := make([]string, 0, len(cmd.Args)) + for _, a := range cmd.Args { + args = append(args, fmt.Sprintf("%q", a)) + } + opts = append(opts, llb.WithCustomName(p.vertexNamef("Exec [%s]", strings.Join(args, ", ")))) + // --> Execute p.fs = p.fs.Change(func(st llb.State) llb.State { return st.Run(opts...).Root() @@ -334,7 +354,7 @@ func (p *Pipeline) mount(ctx context.Context, dest string, mnt *compiler.Value) } } // eg. mount: "/foo": { from: www.source } - from := p.Tmp() + from := p.Tmp(mnt.Get("from").Path().String()) if err := from.Do(ctx, mnt.Get("from")); err != nil { return nil, err } @@ -434,7 +454,7 @@ func unmarshalAnything(data []byte, fn unmarshaller) (interface{}, error) { func (p *Pipeline) Load(ctx context.Context, op *compiler.Value) error { // Execute 'from' in a tmp pipeline, and use the resulting fs - from := p.Tmp() + from := p.Tmp(op.Get("from").Path().String()) if err := from.Do(ctx, op.Get("from")); err != nil { return err } @@ -449,7 +469,9 @@ func (p *Pipeline) FetchContainer(ctx context.Context, op *compiler.Value) error return err } // FIXME: preserve docker image metadata - p.fs = p.fs.Set(llb.Image(ref)) + p.fs = p.fs.Set( + llb.Image(ref, llb.WithCustomName(p.vertexNamef("FetchContainer %s", ref))), + ) return nil } @@ -462,7 +484,9 @@ func (p *Pipeline) FetchGit(ctx context.Context, op *compiler.Value) error { if err != nil { return err } - p.fs = p.fs.Set(llb.Git(remote, ref)) + p.fs = p.fs.Set( + llb.Git(remote, ref, llb.WithCustomName(p.vertexNamef("FetchGit %s@%s", remote, ref))), + ) return nil } @@ -484,7 +508,7 @@ func (p *Pipeline) DockerBuild(ctx context.Context, op *compiler.Value) error { // docker build context. This can come from another component, so we need to // compute it first. if context.Exists() { - from := p.Tmp() + from := p.Tmp(op.Lookup("context").Path().String()) if err := from.Do(ctx, context); err != nil { return err } diff --git a/go.mod b/go.mod index c5130136..a236722c 100644 --- a/go.mod +++ b/go.mod @@ -7,16 +7,21 @@ require ( github.com/KromDaniel/jonson v0.0.0-20180630143114-d2f9c3c389db github.com/containerd/console v1.0.1 github.com/emicklei/proto v1.9.0 // indirect + github.com/jaguilar/vt100 v0.0.0-20150826170717-2703a27b14ea + github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db github.com/moby/buildkit v0.8.1 + github.com/morikuni/aec v1.0.0 github.com/opencontainers/go-digest v1.0.0 github.com/rs/zerolog v1.20.0 github.com/spf13/cobra v1.1.3 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.7.1 github.com/tonistiigi/fsutil v0.0.0-20201103201449-0834f99b7b85 + github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect golang.org/x/term v0.0.0-20201117132131-f5c789dd3221 + golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 golang.org/x/tools v0.1.0 // indirect gopkg.in/yaml.v3 v3.0.0-20200506231410-2ff61e1afc86 ) diff --git a/go.sum b/go.sum index 34e79e67..44d242a9 100644 --- a/go.sum +++ b/go.sum @@ -609,6 +609,8 @@ github.com/maxbrunsfeld/counterfeiter/v6 v6.2.2/go.mod h1:eD9eIE7cdwcMi9rYluz88J github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b/go.mod h1:r1VsdOzOPt1ZSrGZWFoNhsAedKnEd6r9Np1+5blZCWk= @@ -743,6 +745,7 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.0-20190522114515-bc1a522cf7b1/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8= github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/quasilyte/go-consistent v0.0.0-20190521200055-c6f3937de18c/go.mod h1:5STLWrekHfjyYwxBRVRXNOSewLJ3PWfDJd1VyTS21fI= diff --git a/pkg/progressui/display.go b/pkg/progressui/display.go new file mode 100644 index 00000000..55f269aa --- /dev/null +++ b/pkg/progressui/display.go @@ -0,0 +1,655 @@ +package progressui + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "sort" + "strconv" + "strings" + "time" + + "github.com/containerd/console" + "github.com/jaguilar/vt100" + "github.com/moby/buildkit/client" + "github.com/morikuni/aec" + digest "github.com/opencontainers/go-digest" + "github.com/tonistiigi/units" + "golang.org/x/time/rate" +) + +const ( + defaultTickerTimeout = 150 * time.Millisecond + defaultDisplayTimeout = 100 * time.Millisecond +) + +type VertexPrintFunc func(v *client.Vertex, index int) +type StatusPrintFunc func(v *client.Vertex, format string, a ...interface{}) +type LogPrintFunc func(v *client.Vertex, stream int, partial bool, format string, a ...interface{}) + +func PrintSolveStatus(ctx context.Context, ch chan *client.SolveStatus, vertexPrintCb VertexPrintFunc, statusPrintCb StatusPrintFunc, logPrintCb LogPrintFunc) error { + printer := &textMux{ + vertexPrintCb: vertexPrintCb, + statusPrintCb: statusPrintCb, + logPrintCb: logPrintCb, + } + + t := newTrace(false) + + var done bool + ticker := time.NewTicker(defaultTickerTimeout) + defer ticker.Stop() + + displayLimiter := rate.NewLimiter(rate.Every(defaultDisplayTimeout), 1) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + case ss, ok := <-ch: + if ok { + t.update(ss, 80) + } else { + done = true + } + } + + if done || displayLimiter.Allow() { + printer.print(t) + if done { + t.printErrorLogs(statusPrintCb) + return nil + } + ticker.Stop() + ticker = time.NewTicker(defaultTickerTimeout) + } + } +} + +func DisplaySolveStatus(ctx context.Context, phase string, c console.Console, w io.Writer, ch chan *client.SolveStatus) error { + modeConsole := c != nil + + if !modeConsole { + vertexPrintCb := func(v *client.Vertex, index int) { + if os.Getenv("PROGRESS_NO_TRUNC") == "0" { + fmt.Fprintf(w, "#%d %s\n", index, limitString(v.Name, 72)) + } else { + fmt.Fprintf(w, "#%d %s\n", index, v.Name) + fmt.Fprintf(w, "#%d %s\n", index, v.Digest) + } + } + statusPrintCb := func(v *client.Vertex, format string, a ...interface{}) { + fmt.Fprintf(w, fmt.Sprintf("%s\n", format), a...) + } + logPrintCb := func(v *client.Vertex, stream int, partial bool, format string, a ...interface{}) { + if partial { + fmt.Fprintf(w, format, a...) + } else { + fmt.Fprintf(w, fmt.Sprintf("%s\n", format), a...) + } + } + + return PrintSolveStatus(ctx, ch, vertexPrintCb, statusPrintCb, logPrintCb) + } + + disp := &display{c: c, phase: phase} + if disp.phase == "" { + disp.phase = "Building" + } + + t := newTrace(true) + + tickerTimeout := defaultTickerTimeout + displayTimeout := defaultDisplayTimeout + + if v := os.Getenv("TTY_DISPLAY_RATE"); v != "" { + if r, err := strconv.ParseInt(v, 10, 64); err == nil { + tickerTimeout = time.Duration(r) * time.Millisecond + displayTimeout = time.Duration(r) * time.Millisecond + } + } + + var done bool + ticker := time.NewTicker(tickerTimeout) + defer ticker.Stop() + + displayLimiter := rate.NewLimiter(rate.Every(displayTimeout), 1) + + var height int + width, _ := disp.getSize() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + case ss, ok := <-ch: + if ok { + t.update(ss, width) + } else { + done = true + } + } + + width, height = disp.getSize() + if done { + disp.print(t.displayInfo(), width, height, true) + t.printErrorLogs(func(v *client.Vertex, format string, a ...interface{}) { + fmt.Fprintf(w, format, a...) + }) + return nil + } else if displayLimiter.Allow() { + ticker.Stop() + ticker = time.NewTicker(tickerTimeout) + disp.print(t.displayInfo(), width, height, false) + } + } +} + +const termHeight = 6 +const termPad = 10 + +type displayInfo struct { + startTime time.Time + jobs []*job + countTotal int + countCompleted int +} + +type job struct { + startTime *time.Time + completedTime *time.Time + name string + status string + hasError bool + isCanceled bool + vertex *vertex + showTerm bool +} + +type trace struct { + localTimeDiff time.Duration + vertexes []*vertex + byDigest map[digest.Digest]*vertex + nextIndex int + updates map[digest.Digest]struct{} + modeConsole bool +} + +type log struct { + index int + line []byte + stream int +} + +type vertex struct { + *client.Vertex + statuses []*status + byID map[string]*status + indent string + index int + + logs []log + logsPartial bool + logsOffset int + prev *client.Vertex + events []string + lastBlockTime *time.Time + count int + statusUpdates map[string]struct{} + + jobs []*job + jobCached bool + + term *vt100.VT100 + termBytes int + termCount int +} + +func (v *vertex) update(c int) { + if v.count == 0 { + now := time.Now() + v.lastBlockTime = &now + } + v.count += c +} + +type status struct { + *client.VertexStatus +} + +func newTrace(modeConsole bool) *trace { + return &trace{ + byDigest: make(map[digest.Digest]*vertex), + updates: make(map[digest.Digest]struct{}), + modeConsole: modeConsole, + } +} + +func (t *trace) triggerVertexEvent(v *client.Vertex) { + if v.Started == nil { + return + } + + var old client.Vertex + vtx := t.byDigest[v.Digest] + if v := vtx.prev; v != nil { + old = *v + } + + changed := false + if v.Digest != old.Digest { + changed = true + } + if v.Name != old.Name { + changed = true + } + if v.Started != old.Started { + if v.Started != nil && old.Started == nil || !v.Started.Equal(*old.Started) { + changed = true + } + } + if v.Completed != old.Completed && v.Completed != nil { + changed = true + } + if v.Cached != old.Cached { + changed = true + } + if v.Error != old.Error { + changed = true + } + + if changed { + vtx.update(1) + t.updates[v.Digest] = struct{}{} + } + + t.byDigest[v.Digest].prev = v +} + +func (t *trace) update(s *client.SolveStatus, termWidth int) { + for _, v := range s.Vertexes { + prev, ok := t.byDigest[v.Digest] + if !ok { + t.nextIndex++ + t.byDigest[v.Digest] = &vertex{ + byID: make(map[string]*status), + statusUpdates: make(map[string]struct{}), + index: t.nextIndex, + } + if t.modeConsole { + t.byDigest[v.Digest].term = vt100.NewVT100(termHeight, termWidth-termPad) + } + } + t.triggerVertexEvent(v) + if v.Started != nil && (prev == nil || prev.Started == nil) { + if t.localTimeDiff == 0 { + t.localTimeDiff = time.Since(*v.Started) + } + t.vertexes = append(t.vertexes, t.byDigest[v.Digest]) + } + // allow a duplicate initial vertex that shouldn't reset state + if !(prev != nil && prev.Started != nil && v.Started == nil) { + t.byDigest[v.Digest].Vertex = v + } + t.byDigest[v.Digest].jobCached = false + } + for _, s := range s.Statuses { + v, ok := t.byDigest[s.Vertex] + if !ok { + continue // shouldn't happen + } + v.jobCached = false + prev, ok := v.byID[s.ID] + if !ok { + v.byID[s.ID] = &status{VertexStatus: s} + } + if s.Started != nil && (prev == nil || prev.Started == nil) { + v.statuses = append(v.statuses, v.byID[s.ID]) + } + v.byID[s.ID].VertexStatus = s + v.statusUpdates[s.ID] = struct{}{} + t.updates[v.Digest] = struct{}{} + v.update(1) + } + for _, l := range s.Logs { + l := l + v, ok := t.byDigest[l.Vertex] + if !ok { + continue // shouldn't happen + } + v.jobCached = false + if v.term != nil { + if v.term.Width != termWidth { + v.term.Resize(termHeight, termWidth-termPad) + } + v.termBytes += len(l.Data) + v.term.Write(l.Data) // error unhandled on purpose. don't trust vt100 + } + i := 0 + complete := split(l.Data, byte('\n'), func(dt []byte) { + if v.logsPartial && len(v.logs) != 0 && i == 0 && v.logs[len(v.logs)-1].stream == l.Stream { + v.logs[len(v.logs)-1].line = append(v.logs[len(v.logs)-1].line, dt...) + } else { + ts := time.Duration(0) + if v.Started != nil { + ts = l.Timestamp.Sub(*v.Started) + } + prec := 1 + sec := ts.Seconds() + if sec < 10 { + prec = 3 + } else if sec < 100 { + prec = 2 + } + v.logs = append(v.logs, log{ + line: []byte(fmt.Sprintf("#%d %s %s", v.index, fmt.Sprintf("%.[2]*[1]f", sec, prec), dt)), + stream: l.Stream, + index: v.index, + }) + } + i++ + }) + v.logsPartial = !complete + t.updates[v.Digest] = struct{}{} + v.update(1) + } +} + +func (t *trace) printErrorLogs(printCb StatusPrintFunc) { + for _, v := range t.vertexes { + if v.Error != "" && !strings.HasSuffix(v.Error, context.Canceled.Error()) { + printCb(v.Vertex, "------") + printCb(v.Vertex, " > %s:", v.Name) + for _, l := range v.logs { + printCb(v.Vertex, "%s", l.line) + } + printCb(v.Vertex, "------") + } + } +} + +func (t *trace) displayInfo() (d displayInfo) { + d.startTime = time.Now() + if t.localTimeDiff != 0 { + d.startTime = t.vertexes[0].Started.Add(t.localTimeDiff) + } + d.countTotal = len(t.byDigest) + for _, v := range t.byDigest { + if v.Completed != nil { + d.countCompleted++ + } + } + + for _, v := range t.vertexes { + if v.jobCached { + d.jobs = append(d.jobs, v.jobs...) + continue + } + var jobs []*job + j := &job{ + startTime: addTime(v.Started, t.localTimeDiff), + completedTime: addTime(v.Completed, t.localTimeDiff), + name: strings.ReplaceAll(v.Name, "\t", " "), + vertex: v, + } + if v.Error != "" { + if strings.HasSuffix(v.Error, context.Canceled.Error()) { + j.isCanceled = true + j.name = "CANCELED " + j.name + } else { + j.hasError = true + j.name = "ERROR " + j.name + } + } + if v.Cached { + j.name = "CACHED " + j.name + } + j.name = v.indent + j.name + jobs = append(jobs, j) + for _, s := range v.statuses { + j := &job{ + startTime: addTime(s.Started, t.localTimeDiff), + completedTime: addTime(s.Completed, t.localTimeDiff), + name: v.indent + "=> " + s.ID, + } + if s.Total != 0 { + j.status = fmt.Sprintf("%.2f / %.2f", units.Bytes(s.Current), units.Bytes(s.Total)) + } else if s.Current != 0 { + j.status = fmt.Sprintf("%.2f", units.Bytes(s.Current)) + } + jobs = append(jobs, j) + } + d.jobs = append(d.jobs, jobs...) + v.jobs = jobs + v.jobCached = true + } + + return d +} + +func split(dt []byte, sep byte, fn func([]byte)) bool { + if len(dt) == 0 { + return false + } + for { + if len(dt) == 0 { + return true + } + idx := bytes.IndexByte(dt, sep) + if idx == -1 { + fn(dt) + return false + } + fn(dt[:idx]) + dt = dt[idx+1:] + } +} + +func addTime(tm *time.Time, d time.Duration) *time.Time { + if tm == nil { + return nil + } + t := tm.Add(d) + return &t +} + +type display struct { + c console.Console + phase string + lineCount int + repeated bool +} + +func (disp *display) getSize() (int, int) { + width := 80 + height := 10 + if disp.c != nil { + size, err := disp.c.Size() + if err == nil && size.Width > 0 && size.Height > 0 { + width = int(size.Width) + height = int(size.Height) + } + } + return width, height +} + +func setupTerminals(jobs []*job, height int, all bool) []*job { + var candidates []*job + numInUse := 0 + for _, j := range jobs { + if j.vertex != nil && j.vertex.termBytes > 0 && j.completedTime == nil { + candidates = append(candidates, j) + } + if j.completedTime == nil { + numInUse++ + } + } + sort.Slice(candidates, func(i, j int) bool { + idxI := candidates[i].vertex.termBytes + candidates[i].vertex.termCount*50 + idxJ := candidates[j].vertex.termBytes + candidates[j].vertex.termCount*50 + return idxI > idxJ + }) + + numFree := height - 2 - numInUse + numToHide := 0 + termLimit := termHeight + 3 + + for i := 0; numFree > termLimit && i < len(candidates); i++ { + candidates[i].showTerm = true + numToHide += candidates[i].vertex.term.UsedHeight() + numFree -= termLimit + } + + if !all { + jobs = wrapHeight(jobs, height-2-numToHide) + } + + return jobs +} + +func (disp *display) print(d displayInfo, width, height int, all bool) { + // this output is inspired by Buck + d.jobs = setupTerminals(d.jobs, height, all) + b := aec.EmptyBuilder + for i := 0; i <= disp.lineCount; i++ { + b = b.Up(1) + } + if !disp.repeated { + b = b.Down(1) + } + disp.repeated = true + fmt.Fprint(disp.c, b.Column(0).ANSI) + + statusStr := "" + if d.countCompleted > 0 && d.countCompleted == d.countTotal && all { + statusStr = "FINISHED" + } + + fmt.Fprint(disp.c, aec.Hide) + defer fmt.Fprint(disp.c, aec.Show) + + out := fmt.Sprintf("[+] %s %.1fs (%d/%d) %s", disp.phase, time.Since(d.startTime).Seconds(), d.countCompleted, d.countTotal, statusStr) + out = align(out, "", width) + fmt.Fprintln(disp.c, out) + lineCount := 0 + for _, j := range d.jobs { + endTime := time.Now() + if j.completedTime != nil { + endTime = *j.completedTime + } + if j.startTime == nil { + continue + } + dt := endTime.Sub(*j.startTime).Seconds() + if dt < 0.05 { + dt = 0 + } + pfx := " => " + timer := fmt.Sprintf(" %3.1fs\n", dt) + status := j.status + showStatus := false + + left := width - len(pfx) - len(timer) - 1 + if status != "" { + if left+len(status) > 20 { + showStatus = true + left -= len(status) + 1 + } + } + if left < 12 { // too small screen to show progress + continue + } + name := j.name + if len(name) > left { + name = name[:left] + } + + out := pfx + name + if showStatus { + out += " " + status + } + + out = align(out, timer, width) + if j.completedTime != nil { + color := aec.BlueF + if j.isCanceled { + color = aec.YellowF + } else if j.hasError { + color = aec.RedF + } + out = aec.Apply(out, color) + } + fmt.Fprint(disp.c, out) + lineCount++ + if j.showTerm { + term := j.vertex.term + term.Resize(termHeight, width-termPad) + for _, l := range term.Content { + if !isEmpty(l) { + out := aec.Apply(fmt.Sprintf(" => => # %s\n", string(l)), aec.Faint) + fmt.Fprint(disp.c, out) + lineCount++ + } + } + j.vertex.termCount++ + j.showTerm = false + } + } + // override previous content + if diff := disp.lineCount - lineCount; diff > 0 { + for i := 0; i < diff; i++ { + fmt.Fprintln(disp.c, strings.Repeat(" ", width)) + } + fmt.Fprint(disp.c, aec.EmptyBuilder.Up(uint(diff)).Column(0).ANSI) + } + disp.lineCount = lineCount +} + +func isEmpty(l []rune) bool { + for _, r := range l { + if r != ' ' { + return false + } + } + return true +} + +func align(l, r string, w int) string { + return fmt.Sprintf("%-[2]*[1]s %[3]s", l, w-len(r)-1, r) +} + +func wrapHeight(j []*job, limit int) []*job { + if limit < 0 { + return nil + } + var wrapped []*job + wrapped = append(wrapped, j...) + if len(j) > limit { + wrapped = wrapped[len(j)-limit:] + + // wrap things around if incomplete jobs were cut + var invisible []*job + for _, j := range j[:len(j)-limit] { + if j.completedTime == nil { + invisible = append(invisible, j) + } + } + + if l := len(invisible); l > 0 { + rewrapped := make([]*job, 0, len(wrapped)) + for _, j := range wrapped { + if j.completedTime == nil || l <= 0 { + rewrapped = append(rewrapped, j) + } + l-- + } + freespace := len(wrapped) - len(rewrapped) + invisible = append(invisible[len(invisible)-freespace:], rewrapped...) + wrapped = invisible + } + } + return wrapped +} diff --git a/pkg/progressui/printer.go b/pkg/progressui/printer.go new file mode 100644 index 00000000..fe8cd9fd --- /dev/null +++ b/pkg/progressui/printer.go @@ -0,0 +1,286 @@ +package progressui + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + digest "github.com/opencontainers/go-digest" + "github.com/tonistiigi/units" +) + +const antiFlicker = 5 * time.Second +const maxDelay = 10 * time.Second +const minTimeDelta = 5 * time.Second +const minProgressDelta = 0.05 // % + +type lastStatus struct { + Current int64 + Timestamp time.Time +} + +type textMux struct { + vertexPrintCb VertexPrintFunc + statusPrintCb StatusPrintFunc + logPrintCb LogPrintFunc + + current digest.Digest + last map[string]lastStatus + notFirst bool +} + +func (p *textMux) printVtx(t *trace, dgst digest.Digest) { + if p.last == nil { + p.last = make(map[string]lastStatus) + } + + v, ok := t.byDigest[dgst] + if !ok { + return + } + + if dgst != p.current { + if p.current != "" { + old := t.byDigest[p.current] + if old.logsPartial { + p.statusPrintCb(v.Vertex, "") + } + old.logsOffset = 0 + old.count = 0 + p.statusPrintCb(v.Vertex, "#%d ...", old.index) + } + + if p.notFirst { + p.statusPrintCb(v.Vertex, "") + } else { + p.notFirst = true + } + + p.vertexPrintCb(v.Vertex, v.index) + } + + if len(v.events) != 0 { + v.logsOffset = 0 + } + for _, ev := range v.events { + p.statusPrintCb(v.Vertex, "#%d %s", v.index, ev) + } + v.events = v.events[:0] + + for _, s := range v.statuses { + if _, ok := v.statusUpdates[s.ID]; ok { + doPrint := true + + if last, ok := p.last[s.ID]; ok && s.Completed == nil { + var progressDelta float64 + if s.Total > 0 { + progressDelta = float64(s.Current-last.Current) / float64(s.Total) + } + timeDelta := s.Timestamp.Sub(last.Timestamp) + if progressDelta < minProgressDelta && timeDelta < minTimeDelta { + doPrint = false + } + } + + if !doPrint { + continue + } + + p.last[s.ID] = lastStatus{ + Timestamp: s.Timestamp, + Current: s.Current, + } + + var bytes string + if s.Total != 0 { + bytes = fmt.Sprintf(" %.2f / %.2f", units.Bytes(s.Current), units.Bytes(s.Total)) + } else if s.Current != 0 { + bytes = fmt.Sprintf(" %.2f", units.Bytes(s.Current)) + } + var tm string + endTime := s.Timestamp + if s.Completed != nil { + endTime = *s.Completed + } + if s.Started != nil { + diff := endTime.Sub(*s.Started).Seconds() + if diff > 0.01 { + tm = fmt.Sprintf(" %.1fs", diff) + } + } + if s.Completed != nil { + tm += " done" + } + p.statusPrintCb(v.Vertex, "#%d %s%s%s", v.index, s.ID, bytes, tm) + } + } + v.statusUpdates = map[string]struct{}{} + + for i, l := range v.logs { + line := l.line + if i == 0 { + line = line[v.logsOffset:] + } + complete := i != len(v.logs)-1 || !v.logsPartial + p.logPrintCb(v.Vertex, l.stream, !complete, "%s", line) + } + + if len(v.logs) > 0 { + if v.logsPartial { + v.logs = v.logs[len(v.logs)-1:] + v.logsOffset = len(v.logs[0].line) + } else { + v.logs = nil + v.logsOffset = 0 + } + } + + p.current = dgst + if v.Completed != nil { + p.current = "" + v.count = 0 + + if v.Error != "" { + if v.logsPartial { + p.statusPrintCb(v.Vertex, "") + } + if strings.HasSuffix(v.Error, context.Canceled.Error()) { + p.statusPrintCb(v.Vertex, "#%d CANCELED", v.index) + } else { + p.statusPrintCb(v.Vertex, "#%d ERROR: %s", v.index, v.Error) + } + } else if v.Cached { + p.statusPrintCb(v.Vertex, "#%d CACHED", v.index) + } else { + tm := "" + if v.Started != nil { + tm = fmt.Sprintf(" %.1fs", v.Completed.Sub(*v.Started).Seconds()) + } + p.statusPrintCb(v.Vertex, "#%d DONE%s", v.index, tm) + } + } + + delete(t.updates, dgst) +} + +func sortCompleted(t *trace, m map[digest.Digest]struct{}) []digest.Digest { + out := make([]digest.Digest, 0, len(m)) + for k := range m { + out = append(out, k) + } + sort.Slice(out, func(i, j int) bool { + return t.byDigest[out[i]].Completed.Before(*t.byDigest[out[j]].Completed) + }) + return out +} + +func (p *textMux) print(t *trace) { + completed := map[digest.Digest]struct{}{} + rest := map[digest.Digest]struct{}{} + + for dgst := range t.updates { + v, ok := t.byDigest[dgst] + if !ok { + continue + } + if v.Vertex.Completed != nil { + completed[dgst] = struct{}{} + } else { + rest[dgst] = struct{}{} + } + } + + current := p.current + + // items that have completed need to be printed first + if _, ok := completed[current]; ok { + p.printVtx(t, current) + } + + for _, dgst := range sortCompleted(t, completed) { + if dgst != current { + p.printVtx(t, dgst) + } + } + + if len(rest) == 0 { + if current != "" { + if v := t.byDigest[current]; v.Started != nil && v.Completed == nil { + return + } + } + // make any open vertex active + for dgst, v := range t.byDigest { + if v.Started != nil && v.Completed == nil { + p.printVtx(t, dgst) + return + } + } + return + } + + // now print the active one + if _, ok := rest[current]; ok { + p.printVtx(t, current) + } + + stats := map[digest.Digest]*vtxStat{} + now := time.Now() + sum := 0.0 + var max digest.Digest + if current != "" { + rest[current] = struct{}{} + } + for dgst := range rest { + v, ok := t.byDigest[dgst] + if !ok { + continue + } + tm := now.Sub(*v.lastBlockTime) + speed := float64(v.count) / tm.Seconds() + overLimit := tm > maxDelay && dgst != current + stats[dgst] = &vtxStat{blockTime: tm, speed: speed, overLimit: overLimit} + sum += speed + if overLimit || max == "" || stats[max].speed < speed { + max = dgst + } + } + for dgst := range stats { + stats[dgst].share = stats[dgst].speed / sum + } + + if _, ok := completed[current]; ok || current == "" { + p.printVtx(t, max) + return + } + + // show items that were hidden + for dgst := range rest { + if stats[dgst].overLimit { + p.printVtx(t, dgst) + return + } + } + + // fair split between vertexes + if 1.0/(1.0-stats[current].share)*antiFlicker.Seconds() < stats[current].blockTime.Seconds() { + p.printVtx(t, max) + return + } +} + +type vtxStat struct { + blockTime time.Duration + speed float64 + share float64 + overLimit bool +} + +func limitString(s string, l int) string { + if len(s) > l { + return s[:l] + "..." + } + return s +}