diff --git a/cmd/dagger/logger/plain.go b/cmd/dagger/logger/plain.go index b52b4b5e..bf97e8bc 100644 --- a/cmd/dagger/logger/plain.go +++ b/cmd/dagger/logger/plain.go @@ -23,6 +23,8 @@ type PlainOutput struct { Out io.Writer } +const systemGroup = "system" + func (c *PlainOutput) Write(p []byte) (int, error) { event := map[string]interface{}{} d := json.NewDecoder(bytes.NewReader(p)) @@ -117,7 +119,7 @@ func formatMessage(event map[string]interface{}) string { } func parseSource(event map[string]interface{}) string { - source := "system" + source := systemGroup if task, ok := event["task"].(string); ok && task != "" { source = task } diff --git a/cmd/dagger/logger/tty.go b/cmd/dagger/logger/tty.go index a1bb958a..05da3f55 100644 --- a/cmd/dagger/logger/tty.go +++ b/cmd/dagger/logger/tty.go @@ -43,8 +43,14 @@ func (l *Logs) Add(event Event) error { l.l.Lock() defer l.l.Unlock() + // same thing as in plain.go group all the non-identified tasks + // into a general group called system + source := systemGroup taskPath, ok := event["task"].(string) - if !ok { + + if ok && len(taskPath) > 0 { + source = taskPath + } else if !ok { l.Messages = append(l.Messages, Message{ Event: event, }) @@ -53,7 +59,7 @@ func (l *Logs) Add(event Event) error { } // Hide hidden fields (e.g. `._*`) from log group names - groupKey := strings.Split(taskPath, "._")[0] + groupKey := strings.Split(source, "._")[0] group := l.groups[groupKey] @@ -78,12 +84,20 @@ func (l *Logs) Add(event Event) error { // For each task in a group, the status will transition from computing to complete, then back to computing and so on. // The transition is fast enough not to cause a problem. if st, ok := event["state"].(string); ok { - group.State = task.State(st) - if group.State == task.StateComputing { - group.Completed = nil - } else { - now := time.Now() - group.Completed = &now + if t, err := task.ParseState(st); err != nil { + return err + // concurrent "system" tasks are the only exception to transition + // from another state to failed since we need to show the error to + // the user + } else if group.State.CanTransition(t) || + (group.Name == systemGroup && t == task.StateFailed) { + group.State = t + if group.State == task.StateComputing { + group.Completed = nil + } else { + now := time.Now() + group.Completed = &now + } } return nil @@ -262,52 +276,57 @@ func (c *TTYOutput) printLine(w io.Writer, event Event, width int) int { func (c *TTYOutput) printGroup(group *Group, width, maxLines int) int { lineCount := 0 - prefix := "" - switch group.State { - case task.StateComputing: - prefix = "[+]" - case task.StateCanceled: - prefix = "[✗]" - case task.StateFailed: - prefix = "[✗]" - case task.StateCompleted: - prefix = "[✔]" + var out string + // treat the "system" group as a special case as we don't + // want it to be displayed as an action in the output + if group.Name != systemGroup { + prefix := "" + switch group.State { + case task.StateComputing: + prefix = "[+]" + case task.StateCanceled: + prefix = "[✗]" + case task.StateFailed: + prefix = "[✗]" + case task.StateCompleted: + prefix = "[✔]" + } + + out = prefix + " " + group.Name + + endTime := time.Now() + if group.Completed != nil { + endTime = *group.Completed + } + + dt := endTime.Sub(*group.Started).Seconds() + if dt < 0.05 { + dt = 0 + } + timer := fmt.Sprintf("%3.1fs", dt) + + // align + out += strings.Repeat(" ", width-utf8.RuneCountInString(out)-len(timer)) + out += timer + out += "\n" + + // color + switch group.State { + case task.StateComputing: + out = aec.Apply(out, aec.LightBlueF) + case task.StateCanceled: + out = aec.Apply(out, aec.LightYellowF) + case task.StateFailed: + out = aec.Apply(out, aec.LightRedF) + case task.StateCompleted: + out = aec.Apply(out, aec.LightGreenF) + } + + // Print + fmt.Fprint(c.cons, out) + lineCount++ } - out := prefix + " " + group.Name - - endTime := time.Now() - if group.Completed != nil { - endTime = *group.Completed - } - - dt := endTime.Sub(*group.Started).Seconds() - if dt < 0.05 { - dt = 0 - } - timer := fmt.Sprintf("%3.1fs", dt) - - // align - out += strings.Repeat(" ", width-utf8.RuneCountInString(out)-len(timer)) - out += timer - out += "\n" - - // color - switch group.State { - case task.StateComputing: - out = aec.Apply(out, aec.LightBlueF) - case task.StateCanceled: - out = aec.Apply(out, aec.LightYellowF) - case task.StateFailed: - out = aec.Apply(out, aec.LightRedF) - case task.StateCompleted: - out = aec.Apply(out, aec.LightGreenF) - } - - // Print - fmt.Fprint(c.cons, out) - lineCount++ - printEvents := []Event{} switch group.State { case task.StateComputing: diff --git a/plan/runner.go b/plan/runner.go index a59e80ac..68382975 100644 --- a/plan/runner.go +++ b/plan/runner.go @@ -8,6 +8,7 @@ import ( "time" "go.dagger.io/dagger/compiler" + "go.dagger.io/dagger/plan/task" "go.dagger.io/dagger/plancontext" "go.dagger.io/dagger/solver" @@ -15,6 +16,7 @@ import ( "cuelang.org/go/cue" cueflow "cuelang.org/go/tools/flow" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "go.opentelemetry.io/otel" ) @@ -124,6 +126,15 @@ func (r *Runner) shouldRun(p cue.Path) bool { return ok } +func taskLog(tp string, log *zerolog.Logger, t task.Task, fn func(lg zerolog.Logger)) { + fn(log.With().Str("task", tp).Logger()) + // setup logger here + _, isDockerfileTask := t.(*task.DockerfileTask) + if isDockerfileTask { + fn(log.With().Str("task", "system").Logger()) + } +} + func (r *Runner) taskFunc(flowVal cue.Value) (cueflow.Runner, error) { v := compiler.Wrap(flowVal) handler, err := task.Lookup(v) @@ -142,31 +153,45 @@ func (r *Runner) taskFunc(flowVal cue.Value) (cueflow.Runner, error) { // Wrapper around `task.Run` that handles logging, tracing, etc. return cueflow.RunnerFunc(func(t *cueflow.Task) error { ctx := t.Context() - lg := log.Ctx(ctx).With().Str("task", t.Path().String()).Logger() + taskPath := t.Path().String() + lg := log.Ctx(ctx).With().Logger() ctx = lg.WithContext(ctx) ctx, span := otel.Tracer("dagger").Start(ctx, fmt.Sprintf("up: %s", t.Path().String())) defer span.End() - lg.Info().Str("state", string(task.StateComputing)).Msg(string(task.StateComputing)) + taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { + lg.Info().Str("state", task.StateComputing.String()).Msg(task.StateComputing.String()) + }) // Debug: dump dependencies for _, dep := range t.Dependencies() { - lg.Debug().Str("dependency", dep.Path().String()).Msg("dependency detected") + taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { + lg.Debug().Str("dependency", dep.Path().String()).Msg("dependency detected") + }) } start := time.Now() result, err := handler.Run(ctx, r.pctx, r.s, compiler.Wrap(t.Value())) if err != nil { // FIXME: this should use errdefs.IsCanceled(err) + + // we don't wrap taskLog here since in some cases, actions could still be + // running in goroutines which will scramble outputs. if strings.Contains(err.Error(), "context canceled") { - lg.Error().Dur("duration", time.Since(start)).Str("state", string(task.StateCanceled)).Msg(string(task.StateCanceled)) + taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { + lg.Error().Dur("duration", time.Since(start)).Str("state", task.StateCanceled.String()).Msg(task.StateCanceled.String()) + }) } else { - lg.Error().Dur("duration", time.Since(start)).Err(compiler.Err(err)).Str("state", string(task.StateFailed)).Msg(string(task.StateFailed)) + taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { + lg.Error().Dur("duration", time.Since(start)).Err(compiler.Err(err)).Str("state", task.StateFailed.String()).Msg(task.StateFailed.String()) + }) } return fmt.Errorf("%s: %w", t.Path().String(), compiler.Err(err)) } - lg.Info().Dur("duration", time.Since(start)).Str("state", string(task.StateCompleted)).Msg(string(task.StateCompleted)) + taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { + lg.Info().Dur("duration", time.Since(start)).Str("state", task.StateCompleted.String()).Msg(task.StateCompleted.String()) + }) // If the result is not concrete (e.g. empty value), there's nothing to merge. if !result.IsConcrete() { @@ -174,7 +199,9 @@ func (r *Runner) taskFunc(flowVal cue.Value) (cueflow.Runner, error) { } if src, err := result.Source(); err == nil { - lg.Debug().Str("result", string(src)).Msg("merging task result") + taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { + lg.Debug().Str("result", string(src)).Msg("merging task result") + }) } // Mirror task result and re-scan tasks that should run. @@ -184,7 +211,9 @@ func (r *Runner) taskFunc(flowVal cue.Value) (cueflow.Runner, error) { // } if err := t.Fill(result.Cue()); err != nil { - lg.Error().Err(err).Msg("failed to fill task") + taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { + lg.Error().Err(err).Msg("failed to fill task") + }) return err } diff --git a/plan/task/dockerfile.go b/plan/task/dockerfile.go index 4a8f1712..107ac0cc 100644 --- a/plan/task/dockerfile.go +++ b/plan/task/dockerfile.go @@ -22,13 +22,12 @@ import ( ) func init() { - Register("Dockerfile", func() Task { return &dockerfileTask{} }) + Register("Dockerfile", func() Task { return &DockerfileTask{} }) } -type dockerfileTask struct { -} +type DockerfileTask struct{} -func (t *dockerfileTask) Run(ctx context.Context, pctx *plancontext.Context, s *solver.Solver, v *compiler.Value) (*compiler.Value, error) { +func (t *DockerfileTask) Run(ctx context.Context, pctx *plancontext.Context, s *solver.Solver, v *compiler.Value) (*compiler.Value, error) { lg := log.Ctx(ctx) auths, err := v.Lookup("auth").Fields() if err != nil { @@ -76,7 +75,7 @@ func (t *dockerfileTask) Run(ctx context.Context, pctx *plancontext.Context, s * } dockerfileDef, err = s.Marshal(ctx, llb.Scratch().File( - llb.Mkfile("/Dockerfile", 0644, []byte(contents)), + llb.Mkfile("/Dockerfile", 0o644, []byte(contents)), ), ) if err != nil { @@ -139,7 +138,7 @@ func (t *dockerfileTask) Run(ctx context.Context, pctx *plancontext.Context, s * }) } -func (t *dockerfileTask) dockerBuildOpts(v *compiler.Value, pctx *plancontext.Context) (map[string]string, error) { +func (t *DockerfileTask) dockerBuildOpts(v *compiler.Value, pctx *plancontext.Context) (map[string]string, error) { opts := map[string]string{} if dockerfilePath := v.Lookup("dockerfile.path"); dockerfilePath.Exists() { diff --git a/plan/task/task.go b/plan/task/task.go index 1682144f..49b2b062 100644 --- a/plan/task/task.go +++ b/plan/task/task.go @@ -28,13 +28,36 @@ var ( ) // State is the state of the task. -type State string +type State int8 + +func (s State) String() string { + return [...]string{"computing", "cancelled", "failed", "completed"}[s] +} + +func ParseState(s string) (State, error) { + switch s { + case "computing": + return StateComputing, nil + case "cancelled": + return StateCanceled, nil + case "failed": + return StateFailed, nil + case "completed": + return StateCompleted, nil + } + + return -1, fmt.Errorf("invalid state [%s]", s) +} + +func (s State) CanTransition(t State) bool { + return s == StateComputing && s <= t +} const ( - StateComputing = State("computing") - StateCanceled = State("canceled") - StateFailed = State("failed") - StateCompleted = State("completed") + StateComputing State = iota + StateCanceled + StateFailed + StateCompleted ) type NewFunc func() Task diff --git a/plancontext/fs.go b/plancontext/fs.go index 6740c73b..86092e80 100644 --- a/plancontext/fs.go +++ b/plancontext/fs.go @@ -12,12 +12,10 @@ import ( "go.dagger.io/dagger/pkg" ) -var ( - fsIDPath = cue.MakePath( - cue.Str("$dagger"), - cue.Str("fs"), - cue.Hid("_id", pkg.DaggerPackage), - ) +var fsIDPath = cue.MakePath( + cue.Str("$dagger"), + cue.Str("fs"), + cue.Hid("_id", pkg.DaggerPackage), ) func IsFSValue(v *compiler.Value) bool { diff --git a/util/progressui/display.go b/util/progressui/display.go index e68f1be9..c893876b 100644 --- a/util/progressui/display.go +++ b/util/progressui/display.go @@ -25,9 +25,11 @@ const ( defaultDisplayTimeout = 100 * time.Millisecond ) -type VertexPrintFunc func(v *client.Vertex, index int) -type StatusPrintFunc func(v *client.Vertex, format string, a ...interface{}) -type LogPrintFunc func(v *client.Vertex, stream int, partial bool, format string, a ...interface{}) +type ( + VertexPrintFunc func(v *client.Vertex, index int) + StatusPrintFunc func(v *client.Vertex, format string, a ...interface{}) + LogPrintFunc func(v *client.Vertex, stream int, partial bool, format string, a ...interface{}) +) func PrintSolveStatus(ctx context.Context, ch chan *client.SolveStatus, vertexPrintCb VertexPrintFunc, statusPrintCb StatusPrintFunc, logPrintCb LogPrintFunc) error { printer := &textMux{ @@ -148,8 +150,10 @@ func DisplaySolveStatus(ctx context.Context, phase string, c console.Console, w } } -const termHeight = 6 -const termPad = 10 +const ( + termHeight = 6 + termPad = 10 +) type displayInfo struct { startTime time.Time diff --git a/util/progressui/printer.go b/util/progressui/printer.go index fe8cd9fd..02fc9611 100644 --- a/util/progressui/printer.go +++ b/util/progressui/printer.go @@ -11,10 +11,12 @@ import ( "github.com/tonistiigi/units" ) -const antiFlicker = 5 * time.Second -const maxDelay = 10 * time.Second -const minTimeDelta = 5 * time.Second -const minProgressDelta = 0.05 // % +const ( + antiFlicker = 5 * time.Second + maxDelay = 10 * time.Second + minTimeDelta = 5 * time.Second + minProgressDelta = 0.05 // % +) type lastStatus struct { Current int64