package plan import ( "context" "fmt" "strings" "sync" "time" "go.dagger.io/dagger/compiler" "go.dagger.io/dagger/plan/task" "go.dagger.io/dagger/plancontext" "go.dagger.io/dagger/solver" "cuelang.org/go/cue" cueflow "cuelang.org/go/tools/flow" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "go.opentelemetry.io/otel" ) type Runner struct { pctx *plancontext.Context target cue.Path s *solver.Solver tasks sync.Map mirror *compiler.Value l sync.Mutex } func NewRunner(pctx *plancontext.Context, target cue.Path, s *solver.Solver) *Runner { return &Runner{ pctx: pctx, target: target, s: s, mirror: compiler.NewValue(), } } func (r *Runner) Run(ctx context.Context, src *compiler.Value) error { if !src.LookupPath(r.target).Exists() { return fmt.Errorf("%s not found", r.target.String()) } if err := r.update(cue.MakePath(), src); err != nil { return err } flow := cueflow.New( &cueflow.Config{ FindHiddenTasks: true, }, src.Cue(), r.taskFunc, ) if err := flow.Run(ctx); err != nil { return err } select { case <-ctx.Done(): return ctx.Err() default: return nil } } func (r *Runner) update(p cue.Path, v *compiler.Value) error { r.l.Lock() defer r.l.Unlock() if err := r.mirror.FillPath(p, v); err != nil { return err } r.initTasks() return nil } func (r *Runner) initTasks() { flow := cueflow.New( &cueflow.Config{ FindHiddenTasks: true, }, r.mirror.Cue(), noOpRunner, ) // Allow tasks under the target for _, t := range flow.Tasks() { if cuePathHasPrefix(t.Path(), r.target) { r.addTask(t) } } // If a `client` task is targeting an allowed task, allow the output task as well for _, t := range flow.Tasks() { if t.Path().Selectors()[0] != ClientSelector { continue } for _, dep := range t.Dependencies() { if r.shouldRun(dep.Path()) { r.addTask(t) } } } } func (r *Runner) addTask(t *cueflow.Task) { // avoid circular dependencies if _, ok := r.tasks.Load(t.Path().String()); ok { return } r.tasks.Store(t.Path().String(), struct{}{}) for _, dep := range t.Dependencies() { r.addTask(dep) } } func (r *Runner) shouldRun(p cue.Path) bool { _, ok := r.tasks.Load(p.String()) return ok } func taskLog(tp string, log *zerolog.Logger, t task.Task, fn func(lg zerolog.Logger)) { fn(log.With().Str("task", tp).Logger()) // setup logger here _, isDockerfileTask := t.(*task.DockerfileTask) if isDockerfileTask { fn(log.With().Str("task", "system").Logger()) } } func (r *Runner) taskFunc(flowVal cue.Value) (cueflow.Runner, error) { v := compiler.Wrap(flowVal) handler, err := task.Lookup(v) if err != nil { // Not a task if err == task.ErrNotTask { return nil, nil } return nil, err } if !r.shouldRun(v.Path()) { return nil, nil } // Wrapper around `task.Run` that handles logging, tracing, etc. return cueflow.RunnerFunc(func(t *cueflow.Task) error { ctx := t.Context() taskPath := t.Path().String() lg := log.Ctx(ctx).With().Logger() ctx = lg.WithContext(ctx) ctx, span := otel.Tracer("dagger").Start(ctx, fmt.Sprintf("up: %s", t.Path().String())) defer span.End() taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { lg.Info().Str("state", task.StateComputing.String()).Msg(task.StateComputing.String()) }) // Debug: dump dependencies for _, dep := range t.Dependencies() { taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { lg.Debug().Str("dependency", dep.Path().String()).Msg("dependency detected") }) } start := time.Now() result, err := handler.Run(ctx, r.pctx, r.s, compiler.Wrap(t.Value())) if err != nil { // FIXME: this should use errdefs.IsCanceled(err) // we don't wrap taskLog here since in some cases, actions could still be // running in goroutines which will scramble outputs. if strings.Contains(err.Error(), "context canceled") { taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { lg.Error().Dur("duration", time.Since(start)).Str("state", task.StateCanceled.String()).Msg(task.StateCanceled.String()) }) } else { taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { lg.Error().Dur("duration", time.Since(start)).Err(compiler.Err(err)).Str("state", task.StateFailed.String()).Msg(task.StateFailed.String()) }) } return fmt.Errorf("%s: %w", t.Path().String(), compiler.Err(err)) } taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { lg.Info().Dur("duration", time.Since(start)).Str("state", task.StateCompleted.String()).Msg(task.StateCompleted.String()) }) // If the result is not concrete (e.g. empty value), there's nothing to merge. if !result.IsConcrete() { return nil } if src, err := result.Source(); err == nil { taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { lg.Debug().Str("result", string(src)).Msg("merging task result") }) } // Mirror task result and re-scan tasks that should run. // FIXME: This yields some structural cycle errors. // if err := r.update(t.Path(), result); err != nil { // return err // } if err := t.Fill(result.Cue()); err != nil { taskLog(taskPath, &lg, handler, func(lg zerolog.Logger) { lg.Error().Err(err).Msg("failed to fill task") }) return err } return nil }), nil } func cuePathHasPrefix(p cue.Path, prefix cue.Path) bool { pathSelectors := p.Selectors() prefixSelectors := prefix.Selectors() if len(pathSelectors) < len(prefixSelectors) { return false } for i, sel := range prefixSelectors { if pathSelectors[i] != sel { return false } } return true } func noOpRunner(flowVal cue.Value) (cueflow.Runner, error) { v := compiler.Wrap(flowVal) _, err := task.Lookup(v) if err != nil { // Not a task if err == task.ErrNotTask { return nil, nil } return nil, err } // Return a no op runner return cueflow.RunnerFunc(func(t *cueflow.Task) error { return nil }), nil }