27d878456f
* Improve tty error logging when buildkit vertex is unknown. Creates a generic "system" group in the tty output which captures buildkit events that report a non-dagger vertex name. This currently happens when using core.#Dockerfile actions, since Dagger delegates the LLB generation to buildkit through its frontend and we don't get meaningful events that we can correlate from Dagger's side. Signed-off-by: Marcos Lilljedahl <marcosnils@gmail.com>
257 lines
5.8 KiB
Go
257 lines
5.8 KiB
Go
package plan
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.dagger.io/dagger/compiler"
|
|
|
|
"go.dagger.io/dagger/plan/task"
|
|
"go.dagger.io/dagger/plancontext"
|
|
"go.dagger.io/dagger/solver"
|
|
|
|
"cuelang.org/go/cue"
|
|
cueflow "cuelang.org/go/tools/flow"
|
|
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
"go.opentelemetry.io/otel"
|
|
)
|
|
|
|
type Runner struct {
|
|
pctx *plancontext.Context
|
|
target cue.Path
|
|
s *solver.Solver
|
|
tasks sync.Map
|
|
mirror *compiler.Value
|
|
l sync.Mutex
|
|
}
|
|
|
|
func NewRunner(pctx *plancontext.Context, target cue.Path, s *solver.Solver) *Runner {
|
|
return &Runner{
|
|
pctx: pctx,
|
|
target: target,
|
|
s: s,
|
|
mirror: compiler.NewValue(),
|
|
}
|
|
}
|
|
|
|
func (r *Runner) Run(ctx context.Context, src *compiler.Value) error {
|
|
if !src.LookupPath(r.target).Exists() {
|
|
return fmt.Errorf("%s not found", r.target.String())
|
|
}
|
|
|
|
if err := r.update(cue.MakePath(), src); err != nil {
|
|
return err
|
|
}
|
|
|
|
flow := cueflow.New(
|
|
&cueflow.Config{
|
|
FindHiddenTasks: true,
|
|
},
|
|
src.Cue(),
|
|
r.taskFunc,
|
|
)
|
|
|
|
if err := flow.Run(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (r *Runner) update(p cue.Path, v *compiler.Value) error {
|
|
r.l.Lock()
|
|
defer r.l.Unlock()
|
|
|
|
if err := r.mirror.FillPath(p, v); err != nil {
|
|
return err
|
|
}
|
|
r.initTasks()
|
|
return nil
|
|
}
|
|
|
|
func (r *Runner) initTasks() {
|
|
flow := cueflow.New(
|
|
&cueflow.Config{
|
|
FindHiddenTasks: true,
|
|
},
|
|
r.mirror.Cue(),
|
|
noOpRunner,
|
|
)
|
|
|
|
// Allow tasks under the target
|
|
for _, t := range flow.Tasks() {
|
|
if cuePathHasPrefix(t.Path(), r.target) {
|
|
r.addTask(t)
|
|
}
|
|
}
|
|
|
|
// If a `client` task is targeting an allowed task, allow the output task as well
|
|
for _, t := range flow.Tasks() {
|
|
if t.Path().Selectors()[0] != ClientSelector {
|
|
continue
|
|
}
|
|
for _, dep := range t.Dependencies() {
|
|
if r.shouldRun(dep.Path()) {
|
|
r.addTask(t)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Runner) addTask(t *cueflow.Task) {
|
|
// avoid circular dependencies
|
|
if _, ok := r.tasks.Load(t.Path().String()); ok {
|
|
return
|
|
}
|
|
|
|
r.tasks.Store(t.Path().String(), struct{}{})
|
|
|
|
for _, dep := range t.Dependencies() {
|
|
r.addTask(dep)
|
|
}
|
|
}
|
|
|
|
func (r *Runner) shouldRun(p cue.Path) bool {
|
|
_, ok := r.tasks.Load(p.String())
|
|
return ok
|
|
}
|
|
|
|
func taskLog(tp string, log *zerolog.Logger, t task.Task, fn func(lg zerolog.Logger)) {
|
|
fn(log.With().Str("task", tp).Logger())
|
|
// setup logger here
|
|
_, isDockerfileTask := t.(*task.DockerfileTask)
|
|
if isDockerfileTask {
|
|
fn(log.With().Str("task", "system").Logger())
|
|
}
|
|
}
|
|
|
|
func (r *Runner) taskFunc(flowVal cue.Value) (cueflow.Runner, error) {
|
|
v := compiler.Wrap(flowVal)
|
|
handler, err := task.Lookup(v)
|
|
if err != nil {
|
|
// Not a task
|
|
if err == task.ErrNotTask {
|
|
return nil, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
if !r.shouldRun(v.Path()) {
|
|
return nil, nil
|
|
}
|
|
|
|
// Wrapper around `task.Run` that handles logging, tracing, etc.
|
|
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
|
|
ctx := t.Context()
|
|
taskPath := t.Path().String()
|
|
lg := log.Ctx(ctx).With().Logger()
|
|
ctx = lg.WithContext(ctx)
|
|
ctx, span := otel.Tracer("dagger").Start(ctx, fmt.Sprintf("up: %s", t.Path().String()))
|
|
defer span.End()
|
|
|
|
taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) {
|
|
lg.Info().Str("state", task.StateComputing.String()).Msg(task.StateComputing.String())
|
|
})
|
|
|
|
// Debug: dump dependencies
|
|
for _, dep := range t.Dependencies() {
|
|
taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) {
|
|
lg.Debug().Str("dependency", dep.Path().String()).Msg("dependency detected")
|
|
})
|
|
}
|
|
|
|
start := time.Now()
|
|
result, err := handler.Run(ctx, r.pctx, r.s, compiler.Wrap(t.Value()))
|
|
if err != nil {
|
|
// FIXME: this should use errdefs.IsCanceled(err)
|
|
|
|
// we don't wrap taskLog here since in some cases, actions could still be
|
|
// running in goroutines which will scramble outputs.
|
|
if strings.Contains(err.Error(), "context canceled") {
|
|
taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) {
|
|
lg.Error().Dur("duration", time.Since(start)).Str("state", task.StateCanceled.String()).Msg(task.StateCanceled.String())
|
|
})
|
|
} else {
|
|
taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) {
|
|
lg.Error().Dur("duration", time.Since(start)).Err(compiler.Err(err)).Str("state", task.StateFailed.String()).Msg(task.StateFailed.String())
|
|
})
|
|
}
|
|
return fmt.Errorf("%s: %w", t.Path().String(), compiler.Err(err))
|
|
}
|
|
|
|
taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) {
|
|
lg.Info().Dur("duration", time.Since(start)).Str("state", task.StateCompleted.String()).Msg(task.StateCompleted.String())
|
|
})
|
|
|
|
// If the result is not concrete (e.g. empty value), there's nothing to merge.
|
|
if !result.IsConcrete() {
|
|
return nil
|
|
}
|
|
|
|
if src, err := result.Source(); err == nil {
|
|
taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) {
|
|
lg.Debug().Str("result", string(src)).Msg("merging task result")
|
|
})
|
|
}
|
|
|
|
// Mirror task result and re-scan tasks that should run.
|
|
// FIXME: This yields some structural cycle errors.
|
|
// if err := r.update(t.Path(), result); err != nil {
|
|
// return err
|
|
// }
|
|
|
|
if err := t.Fill(result.Cue()); err != nil {
|
|
taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) {
|
|
lg.Error().Err(err).Msg("failed to fill task")
|
|
})
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}), nil
|
|
}
|
|
|
|
func cuePathHasPrefix(p cue.Path, prefix cue.Path) bool {
|
|
pathSelectors := p.Selectors()
|
|
prefixSelectors := prefix.Selectors()
|
|
|
|
if len(pathSelectors) < len(prefixSelectors) {
|
|
return false
|
|
}
|
|
|
|
for i, sel := range prefixSelectors {
|
|
if pathSelectors[i] != sel {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func noOpRunner(flowVal cue.Value) (cueflow.Runner, error) {
|
|
v := compiler.Wrap(flowVal)
|
|
_, err := task.Lookup(v)
|
|
if err != nil {
|
|
// Not a task
|
|
if err == task.ErrNotTask {
|
|
return nil, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// Return a no op runner
|
|
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
|
|
return nil
|
|
}), nil
|
|
}
|