From 0642e34388a3bec576a77a244962a8dc1941cc73 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Mon, 7 Mar 2022 11:43:01 -0800 Subject: [PATCH 1/3] tests: fix inputs directories test Signed-off-by: Andrea Luzzardi --- tests/plan/inputs/directories/main.cue | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/plan/inputs/directories/main.cue b/tests/plan/inputs/directories/main.cue index ce485da4..f6e22e76 100644 --- a/tests/plan/inputs/directories/main.cue +++ b/tests/plan/inputs/directories/main.cue @@ -8,23 +8,24 @@ dagger.#Plan & { inputs: directories: test: path: string actions: { - _readFile: dagger.#ReadFile & { - input: inputs.directories.test.contents - path: "test.txt" - } - // Test that file exists and contains correct content - exists: _readFile & { + exists: dagger.#ReadFile & { + input: inputs.directories.test.contents + path: "test.txt" contents: "local directory" } // Test that file does NOT exist - notExists: _readFile & { + notExists: dagger.#ReadFile & { + input: inputs.directories.test.contents + path: "test.txt" contents: "local directory" } // Test that file exists and contains conflicting content - conflictingValues: _readFile & { + conflictingValues: dagger.#ReadFile & { + input: inputs.directories.test.contents + path: "test.txt" contents: "local dfsadf" } } From 0786410bbd27850e668c46f58d97f31871e5ce09 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Wed, 2 Mar 2022 18:33:15 -0800 Subject: [PATCH 2/3] Fix partial CUE tree run for dagger do Signed-off-by: Andrea Luzzardi --- cmd/dagger/cmd/do.go | 28 ++-- cmd/dagger/cmd/up.go | 21 +-- plan/plan.go | 135 ++------------------ plan/runner.go | 218 ++++++++++++++++++++++++++++++++ tests/plan/outputs/files/do.cue | 48 +++++++ 5 files changed, 288 insertions(+), 162 deletions(-) create mode 100644 plan/runner.go create mode 100644 tests/plan/outputs/files/do.cue diff --git a/cmd/dagger/cmd/do.go b/cmd/dagger/cmd/do.go index 37282d80..1f370366 100644 --- a/cmd/dagger/cmd/do.go +++ b/cmd/dagger/cmd/do.go @@ -55,29 +55,24 @@ var doCmd = &cobra.Command{ ctx := lg.WithContext(cmd.Context()) cl := common.NewClient(ctx) - p, err := loadPlan(getTargetPath(args).String()) + p, err := loadPlan() if err != nil { lg.Fatal().Err(err).Msg("failed to load plan") } err = cl.Do(ctx, p.Context(), func(ctx context.Context, s solver.Solver) error { - _, err := p.Up(ctx, s) - if err != nil { - return err - } - - return nil + return p.Do(ctx, getTargetPath(args), s) }) // FIXME: rework telemetry if err != nil { - lg.Fatal().Err(err).Msg("failed to up environment") + lg.Fatal().Err(err).Msg("failed to execute plan") } }, } -func loadPlan(target string) (*plan.Plan, error) { +func loadPlan() (*plan.Plan, error) { planPath := viper.GetString("plan") // support only local filesystem paths @@ -95,26 +90,23 @@ func loadPlan(target string) (*plan.Plan, error) { return plan.Load(context.Background(), plan.Config{ Args: []string{planPath}, With: viper.GetStringSlice("with"), - Target: target, Vendor: !viper.GetBool("no-vendor"), }) } func getTargetPath(args []string) cue.Path { - actionLookupArgs := []string{plan.ActionsPath} - actionLookupArgs = append(actionLookupArgs, args...) - actionLookupSelectors := []cue.Selector{} - for _, a := range actionLookupArgs { - actionLookupSelectors = append(actionLookupSelectors, cue.Str(a)) + selectors := []cue.Selector{plan.ActionSelector} + for _, arg := range args { + selectors = append(selectors, cue.Str(arg)) } - return cue.MakePath(actionLookupSelectors...) + return cue.MakePath(selectors...) } func doHelp(cmd *cobra.Command, _ []string) { w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', tabwriter.StripEscape) defer w.Flush() - p, err := loadPlan("") + p, err := loadPlan() if err != nil { fmt.Printf("%s", err) fmt.Fprintln(w, "failed to load plan") @@ -125,7 +117,7 @@ func doHelp(cmd *cobra.Command, _ []string) { actions := p.Action().FindByPath(actionLookupPath).Children fmt.Printf(`Execute a dagger action. - + %s Plan loaded from %s: diff --git a/cmd/dagger/cmd/up.go b/cmd/dagger/cmd/up.go index 8d8187a8..82b3c2b9 100644 --- a/cmd/dagger/cmd/up.go +++ b/cmd/dagger/cmd/up.go @@ -2,9 +2,9 @@ package cmd import ( "context" - "fmt" "os" + "cuelang.org/go/cue" "go.dagger.io/dagger/client" "go.dagger.io/dagger/cmd/dagger/cmd/common" "go.dagger.io/dagger/cmd/dagger/logger" @@ -89,24 +89,7 @@ func europaUp(ctx context.Context, cl *client.Client, args ...string) error { } return cl.Do(ctx, p.Context(), func(ctx context.Context, s solver.Solver) error { - computed, err := p.Up(ctx, s) - if err != nil { - return err - } - - if output := viper.GetString("output"); output != "" { - data := computed.JSON().PrettyString() - if output == "-" { - fmt.Println(data) - return nil - } - err := os.WriteFile(output, []byte(data), 0600) - if err != nil { - lg.Fatal().Err(err).Str("path", output).Msg("failed to write output") - } - } - - return nil + return p.Do(ctx, cue.ParsePath(viper.GetString("target")), s) }) } diff --git a/plan/plan.go b/plan/plan.go index c8b0ee82..e116c91c 100644 --- a/plan/plan.go +++ b/plan/plan.go @@ -3,8 +3,6 @@ package plan import ( "context" "fmt" - "strings" - "time" "cuelang.org/go/cue" cueflow "cuelang.org/go/tools/flow" @@ -17,8 +15,9 @@ import ( "go.opentelemetry.io/otel" ) -const ( - ActionsPath = "actions" +var ( + ActionSelector = cue.Str("actions") + OutputSelector = cue.Str("outputs") ) type Plan struct { @@ -159,49 +158,19 @@ func (p *Plan) prepare(ctx context.Context) error { return flow.Run(ctx) } -// Up executes the plan -func (p *Plan) Up(ctx context.Context, s solver.Solver) (*compiler.Value, error) { +// Do executes an action in the plan +func (p *Plan) Do(ctx context.Context, path cue.Path, s solver.Solver) error { ctx, span := otel.Tracer("dagger").Start(ctx, "plan.Up") defer span.End() - computed := compiler.NewValue() - - cfg := &cueflow.Config{ - FindHiddenTasks: true, - } - if p.config.Target != "" { - cfg.Root = cue.ParsePath(p.config.Target) - // The target may reference dependencies outside of the target path. - // InferTasks will include them in the workflow. - cfg.InferTasks = true - } - flow := cueflow.New( - cfg, - p.source.Cue(), - newRunner(p.context, s, computed), - ) - if err := flow.Run(ctx); err != nil { - return nil, err - } - - if src, err := computed.Source(); err == nil { - log.Ctx(ctx).Debug().Str("computed", string(src)).Msg("computed values") - } - - // FIXME: canceling the context makes flow return `nil` - // Check explicitly if the context is canceled. - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - return computed, nil - } + r := NewRunner(p.context, path, s) + return r.Run(ctx, p.source) } func (p *Plan) fillAction() { cfg := &cueflow.Config{ FindHiddenTasks: true, - Root: cue.ParsePath(ActionsPath), + Root: cue.MakePath(ActionSelector), } flow := cueflow.New( @@ -210,13 +179,13 @@ func (p *Plan) fillAction() { noOpRunner, ) - actions := p.source.Lookup(ActionsPath) + actions := p.source.LookupPath(cue.MakePath(ActionSelector)) actionsComment := "" for _, cg := range actions.Doc() { actionsComment += cg.Text() } p.action = &Action{ - ActionsPath, + ActionSelector.String(), false, actions.Path(), actionsComment, @@ -252,87 +221,3 @@ func (p *Plan) fillAction() { } } } - -func noOpRunner(flowVal cue.Value) (cueflow.Runner, error) { - v := compiler.Wrap(flowVal) - _, err := task.Lookup(v) - if err != nil { - // Not a task - if err == task.ErrNotTask { - return nil, nil - } - return nil, err - } - - // Wrapper around `task.Run` that handles logging, tracing, etc. - return cueflow.RunnerFunc(func(t *cueflow.Task) error { - return nil - }), nil -} - -func newRunner(pctx *plancontext.Context, s solver.Solver, computed *compiler.Value) cueflow.TaskFunc { - return func(flowVal cue.Value) (cueflow.Runner, error) { - v := compiler.Wrap(flowVal) - r, err := task.Lookup(v) - if err != nil { - // Not a task - if err == task.ErrNotTask { - return nil, nil - } - return nil, err - } - - // Wrapper around `task.Run` that handles logging, tracing, etc. - return cueflow.RunnerFunc(func(t *cueflow.Task) error { - ctx := t.Context() - lg := log.Ctx(ctx).With().Str("task", t.Path().String()).Logger() - ctx = lg.WithContext(ctx) - ctx, span := otel.Tracer("dagger").Start(ctx, fmt.Sprintf("up: %s", t.Path().String())) - defer span.End() - - lg.Info().Str("state", string(task.StateComputing)).Msg(string(task.StateComputing)) - - // Debug: dump dependencies - for _, dep := range t.Dependencies() { - lg.Debug().Str("dependency", dep.Path().String()).Msg("dependency detected") - } - - start := time.Now() - result, err := r.Run(ctx, pctx, s, compiler.Wrap(t.Value())) - if err != nil { - // FIXME: this should use errdefs.IsCanceled(err) - if strings.Contains(err.Error(), "context canceled") { - lg.Error().Dur("duration", time.Since(start)).Str("state", string(task.StateCanceled)).Msg(string(task.StateCanceled)) - } else { - lg.Error().Dur("duration", time.Since(start)).Err(err).Str("state", string(task.StateFailed)).Msg(string(task.StateFailed)) - } - return fmt.Errorf("%s: %w", t.Path().String(), err) - } - - lg.Info().Dur("duration", time.Since(start)).Str("state", string(task.StateCompleted)).Msg(string(task.StateCompleted)) - - // If the result is not concrete (e.g. empty value), there's nothing to merge. - if !result.IsConcrete() { - return nil - } - - if src, err := result.Source(); err == nil { - lg.Debug().Str("result", string(src)).Msg("merging task result") - } - - // Mirror task result in both `flow.Task` and `computed` - if err := t.Fill(result.Cue()); err != nil { - lg.Error().Err(err).Msg("failed to fill task") - return err - } - - // Merge task value into computed - if err := computed.FillPath(t.Path(), result); err != nil { - lg.Error().Err(err).Msg("failed to fill plan") - return err - } - - return nil - }), nil - } -} diff --git a/plan/runner.go b/plan/runner.go new file mode 100644 index 00000000..c3ec0d5b --- /dev/null +++ b/plan/runner.go @@ -0,0 +1,218 @@ +package plan + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "go.dagger.io/dagger/compiler" + "go.dagger.io/dagger/plan/task" + "go.dagger.io/dagger/plancontext" + "go.dagger.io/dagger/solver" + + "cuelang.org/go/cue" + cueflow "cuelang.org/go/tools/flow" + + "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel" +) + +type Runner struct { + pctx *plancontext.Context + target cue.Path + s solver.Solver + tasks sync.Map + mirror *compiler.Value + l sync.Mutex +} + +func NewRunner(pctx *plancontext.Context, target cue.Path, s solver.Solver) *Runner { + return &Runner{ + pctx: pctx, + target: target, + s: s, + mirror: compiler.NewValue(), + } +} + +func (r *Runner) Run(ctx context.Context, src *compiler.Value) error { + if err := r.update(cue.MakePath(), src); err != nil { + return err + } + + flow := cueflow.New( + &cueflow.Config{ + FindHiddenTasks: true, + }, + src.Cue(), + r.taskFunc, + ) + + if err := flow.Run(ctx); err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } +} + +func (r *Runner) update(p cue.Path, v *compiler.Value) error { + r.l.Lock() + defer r.l.Unlock() + + if err := r.mirror.FillPath(p, v); err != nil { + return err + } + r.initTasks() + return nil +} + +func (r *Runner) initTasks() { + flow := cueflow.New( + &cueflow.Config{ + FindHiddenTasks: true, + }, + r.mirror.Cue(), + noOpRunner, + ) + + // Allow tasks under the target + for _, t := range flow.Tasks() { + if cuePathHasPrefix(t.Path(), r.target) { + r.addTask(t) + } + } + + // If an `output` task is targeting an allowed task, allow the output task as well + for _, t := range flow.Tasks() { + if t.Path().Selectors()[0] != OutputSelector { + continue + } + for _, dep := range t.Dependencies() { + if r.shouldRun(dep.Path()) { + r.addTask(t) + } + } + } +} + +func (r *Runner) addTask(t *cueflow.Task) { + r.tasks.Store(t.Path().String(), struct{}{}) + + for _, dep := range t.Dependencies() { + r.addTask(dep) + } +} + +func (r *Runner) shouldRun(p cue.Path) bool { + _, ok := r.tasks.Load(p.String()) + return ok +} + +func (r *Runner) taskFunc(flowVal cue.Value) (cueflow.Runner, error) { + v := compiler.Wrap(flowVal) + handler, err := task.Lookup(v) + if err != nil { + // Not a task + if err == task.ErrNotTask { + return nil, nil + } + return nil, err + } + + if !r.shouldRun(v.Path()) { + return nil, nil + } + + // Wrapper around `task.Run` that handles logging, tracing, etc. + return cueflow.RunnerFunc(func(t *cueflow.Task) error { + ctx := t.Context() + lg := log.Ctx(ctx).With().Str("task", t.Path().String()).Logger() + ctx = lg.WithContext(ctx) + ctx, span := otel.Tracer("dagger").Start(ctx, fmt.Sprintf("up: %s", t.Path().String())) + defer span.End() + + lg.Info().Str("state", string(task.StateComputing)).Msg(string(task.StateComputing)) + + // Debug: dump dependencies + for _, dep := range t.Dependencies() { + lg.Debug().Str("dependency", dep.Path().String()).Msg("dependency detected") + } + + start := time.Now() + result, err := handler.Run(ctx, r.pctx, r.s, compiler.Wrap(t.Value())) + if err != nil { + // FIXME: this should use errdefs.IsCanceled(err) + if strings.Contains(err.Error(), "context canceled") { + lg.Error().Dur("duration", time.Since(start)).Str("state", string(task.StateCanceled)).Msg(string(task.StateCanceled)) + } else { + lg.Error().Dur("duration", time.Since(start)).Err(err).Str("state", string(task.StateFailed)).Msg(string(task.StateFailed)) + } + return fmt.Errorf("%s: %w", t.Path().String(), err) + } + + lg.Info().Dur("duration", time.Since(start)).Str("state", string(task.StateCompleted)).Msg(string(task.StateCompleted)) + + // If the result is not concrete (e.g. empty value), there's nothing to merge. + if !result.IsConcrete() { + return nil + } + + if src, err := result.Source(); err == nil { + lg.Debug().Str("result", string(src)).Msg("merging task result") + } + + // Mirror task result and re-scan tasks that should run. + // FIXME: This yields some structural cycle errors. + // if err := r.update(t.Path(), result); err != nil { + // return err + // } + + if err := t.Fill(result.Cue()); err != nil { + lg.Error().Err(err).Msg("failed to fill task") + return err + } + + return nil + }), nil +} + +func cuePathHasPrefix(p cue.Path, prefix cue.Path) bool { + pathSelectors := p.Selectors() + prefixSelectors := prefix.Selectors() + + if len(pathSelectors) < len(prefixSelectors) { + return false + } + + for i, sel := range prefixSelectors { + if pathSelectors[i] != sel { + return false + } + } + + return true +} + +func noOpRunner(flowVal cue.Value) (cueflow.Runner, error) { + v := compiler.Wrap(flowVal) + _, err := task.Lookup(v) + if err != nil { + // Not a task + if err == task.ErrNotTask { + return nil, nil + } + return nil, err + } + + // Return a no op runner + return cueflow.RunnerFunc(func(t *cueflow.Task) error { + return nil + }), nil +} diff --git a/tests/plan/outputs/files/do.cue b/tests/plan/outputs/files/do.cue new file mode 100644 index 00000000..56b5ebc6 --- /dev/null +++ b/tests/plan/outputs/files/do.cue @@ -0,0 +1,48 @@ +package main + +import ( + "dagger.io/dagger" + + "universe.dagger.io/alpine" + "universe.dagger.io/bash" +) + +dagger.#Plan & { + outputs: files: test: { + dest: "./test_do" + contents: actions.test.one.export.files["/output.txt"] + } + + actions: { + image: alpine.#Build & { + packages: bash: {} + } + + test: { + one: bash.#Run & { + input: image.output + script: contents: "echo Hello World! > /output.txt" + export: files: "/output.txt": string + } + + two: bash.#Run & { + input: image.output + script: contents: "true" + } + + three: bash.#Run & { + input: image.output + script: contents: "cat /one/output.txt" + mounts: output: { + contents: one.export.rootfs + dest: "/one" + } + } + } + + notMe: bash.#Run & { + input: image.output + script: contents: "false" + } + } +} From b33b75a570d11d361d682f45828314823f79537d Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Mon, 7 Mar 2022 16:02:42 -0800 Subject: [PATCH 3/3] runner: switch to client API Signed-off-by: Andrea Luzzardi --- plan/plan.go | 2 +- plan/runner.go | 4 ++-- tests/plan/outputs/files/do.cue | 5 +---- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/plan/plan.go b/plan/plan.go index e116c91c..2b5c7702 100644 --- a/plan/plan.go +++ b/plan/plan.go @@ -17,7 +17,7 @@ import ( var ( ActionSelector = cue.Str("actions") - OutputSelector = cue.Str("outputs") + ClientSelector = cue.Str("client") ) type Plan struct { diff --git a/plan/runner.go b/plan/runner.go index c3ec0d5b..928a0b2d 100644 --- a/plan/runner.go +++ b/plan/runner.go @@ -89,9 +89,9 @@ func (r *Runner) initTasks() { } } - // If an `output` task is targeting an allowed task, allow the output task as well + // If a `client` task is targeting an allowed task, allow the output task as well for _, t := range flow.Tasks() { - if t.Path().Selectors()[0] != OutputSelector { + if t.Path().Selectors()[0] != ClientSelector { continue } for _, dep := range t.Dependencies() { diff --git a/tests/plan/outputs/files/do.cue b/tests/plan/outputs/files/do.cue index 56b5ebc6..e71c4f63 100644 --- a/tests/plan/outputs/files/do.cue +++ b/tests/plan/outputs/files/do.cue @@ -8,10 +8,7 @@ import ( ) dagger.#Plan & { - outputs: files: test: { - dest: "./test_do" - contents: actions.test.one.export.files["/output.txt"] - } + client: filesystem: "./test_do": write: contents: actions.test.one.export.files["/output.txt"] actions: { image: alpine.#Build & {