From 2a4db167e4a99c977c3836b2e3d161e417f5dae7 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Wed, 24 Nov 2021 16:22:33 -0800 Subject: [PATCH] runtime: new execution engine Signed-off-by: Andrea Luzzardi --- cmd/dagger/cmd/root.go | 2 + cmd/dagger/cmd/up.go | 35 +++++++ docs/reference/dagger/README.md | 22 ++++ plan/plan.go | 173 ++++++++++++++++++++++++++++++++ plan/task/task.go | 68 +++++++++++++ plancontext/directory.go | 12 +++ state/project.go | 4 +- state/state.go | 2 +- stdlib/dagger/plan.cue | 41 ++++++++ 9 files changed, 356 insertions(+), 3 deletions(-) create mode 100644 plan/plan.go create mode 100644 plan/task/task.go create mode 100644 stdlib/dagger/plan.cue diff --git a/cmd/dagger/cmd/root.go b/cmd/dagger/cmd/root.go index ee024878..e784927e 100644 --- a/cmd/dagger/cmd/root.go +++ b/cmd/dagger/cmd/root.go @@ -36,6 +36,8 @@ func init() { rootCmd.PersistentFlags().StringP("environment", "e", "", "Select an environment") rootCmd.PersistentFlags().String("project", "", "Specify a project directory (defaults to current)") + rootCmd.PersistentFlags().Bool("europa", false, "Enable experiemental Europa UX") + rootCmd.PersistentPreRun = func(cmd *cobra.Command, _ []string) { lg := logger.New() ctx := lg.WithContext(cmd.Context()) diff --git a/cmd/dagger/cmd/up.go b/cmd/dagger/cmd/up.go index aa24f3e6..debd0378 100644 --- a/cmd/dagger/cmd/up.go +++ b/cmd/dagger/cmd/up.go @@ -6,11 +6,13 @@ import ( "os" "cuelang.org/go/cue" + "go.dagger.io/dagger/client" "go.dagger.io/dagger/cmd/dagger/cmd/common" "go.dagger.io/dagger/cmd/dagger/cmd/output" "go.dagger.io/dagger/cmd/dagger/logger" "go.dagger.io/dagger/compiler" "go.dagger.io/dagger/environment" + "go.dagger.io/dagger/plan" "go.dagger.io/dagger/solver" "golang.org/x/term" @@ -61,6 +63,18 @@ var upCmd = &cobra.Command{ cl := common.NewClient(ctx) + if viper.GetBool("europa") { + err = europaUp(ctx, cl, project.Path) + + <-doneCh + + if err != nil { + lg.Fatal().Err(err).Msg("failed to up environment") + } + + return + } + env, err := environment.New(st) if err != nil { lg.Fatal().Msg("unable to create environment") @@ -97,6 +111,27 @@ var upCmd = &cobra.Command{ }, } +func europaUp(ctx context.Context, cl *client.Client, path string) error { + lg := log.Ctx(ctx) + + p, err := plan.Load(ctx, path, "") + if err != nil { + lg.Fatal().Err(err).Msg("failed to load plan") + } + + localdirs, err := p.LocalDirectories() + if err != nil { + return err + } + return cl.Do(ctx, p.Context(), localdirs, func(ctx context.Context, s solver.Solver) error { + if err := p.Up(ctx, s); err != nil { + return err + } + + return nil + }) +} + func checkInputs(ctx context.Context, env *environment.Environment) error { lg := log.Ctx(ctx) warnOnly := viper.GetBool("force") diff --git a/docs/reference/dagger/README.md b/docs/reference/dagger/README.md index 78a571c5..779f95f0 100644 --- a/docs/reference/dagger/README.md +++ b/docs/reference/dagger/README.md @@ -10,6 +10,28 @@ Dagger core types import "alpha.dagger.io/dagger" ``` +## dagger.#Context + +### dagger.#Context Inputs + +_No input._ + +### dagger.#Context Outputs + +_No output._ + +## dagger.#Plan + +A deployment plan executed by `dagger up` + +### dagger.#Plan Inputs + +_No input._ + +### dagger.#Plan Outputs + +_No output._ + ## dagger.#Secret Secret value diff --git a/plan/plan.go b/plan/plan.go new file mode 100644 index 00000000..b079228b --- /dev/null +++ b/plan/plan.go @@ -0,0 +1,173 @@ +package plan + +import ( + "context" + "fmt" + "path/filepath" + "strings" + "time" + + "cuelang.org/go/cue" + cueflow "cuelang.org/go/tools/flow" + "github.com/rs/zerolog/log" + "go.dagger.io/dagger/compiler" + "go.dagger.io/dagger/environment" + "go.dagger.io/dagger/plan/task" + "go.dagger.io/dagger/plancontext" + "go.dagger.io/dagger/solver" + "go.dagger.io/dagger/state" + "go.opentelemetry.io/otel" +) + +type Plan struct { + context *plancontext.Context + source *compiler.Value +} + +func Load(ctx context.Context, path, pkg string) (*Plan, error) { + // FIXME: universe vendoring + if err := state.VendorUniverse(ctx, path); err != nil { + return nil, err + } + + v, err := compiler.Build(path, nil, pkg) + if err != nil { + return nil, err + } + + return &Plan{ + context: plancontext.New(), + source: v, + }, nil +} + +func (p *Plan) Context() *plancontext.Context { + return p.context +} + +func (p *Plan) Source() *compiler.Value { + return p.source +} + +// LocalDirectories scans the context for local imports. +// BuildKit requires to known the list of directories ahead of time. +func (p *Plan) LocalDirectories() (map[string]string, error) { + dirs := map[string]string{} + + imports, err := p.source.Lookup("context.imports").Fields() + if err != nil { + return nil, err + } + + for _, v := range imports { + dir, err := v.Value.Lookup("path").String() + if err != nil { + return nil, err + } + abs, err := filepath.Abs(dir) + if err != nil { + return nil, err + } + + dirs[dir] = abs + } + + return dirs, nil +} + +// Up executes the plan +func (p *Plan) Up(ctx context.Context, s solver.Solver) error { + ctx, span := otel.Tracer("dagger").Start(ctx, "plan.Up") + defer span.End() + + computed := compiler.NewValue() + + flow := cueflow.New( + &cueflow.Config{}, + p.source.Cue(), + newRunner(p.context, s, computed), + ) + if err := flow.Run(ctx); err != nil { + return err + } + + if src, err := computed.Source(); err == nil { + log.Ctx(ctx).Debug().Str("computed", string(src)).Msg("computed values") + } + + // FIXME: canceling the context makes flow return `nil` + // Check explicitly if the context is canceled. + select { + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } +} + +func newRunner(pctx *plancontext.Context, s solver.Solver, computed *compiler.Value) cueflow.TaskFunc { + return func(flowVal cue.Value) (cueflow.Runner, error) { + v := compiler.Wrap(flowVal) + r, err := task.Lookup(v) + if err != nil { + // Not a task + if err == task.ErrNotTask { + return nil, nil + } + return nil, err + } + + // Wrapper around `task.Run` that handles logging, tracing, etc. + return cueflow.RunnerFunc(func(t *cueflow.Task) error { + ctx := t.Context() + lg := log.Ctx(ctx).With().Str("component", t.Path().String()).Logger() + ctx = lg.WithContext(ctx) + ctx, span := otel.Tracer("dagger").Start(ctx, fmt.Sprintf("compute: %s", t.Path().String())) + defer span.End() + + lg.Info().Str("state", string(environment.StateComputing)).Msg(string(environment.StateComputing)) + + // Debug: dump dependencies + for _, dep := range t.Dependencies() { + lg.Debug().Str("dependency", dep.Path().String()).Msg("dependency detected") + } + + start := time.Now() + result, err := r.Run(ctx, pctx, s, compiler.Wrap(t.Value())) + if err != nil { + // FIXME: this should use errdefs.IsCanceled(err) + if strings.Contains(err.Error(), "context canceled") { + lg.Error().Dur("duration", time.Since(start)).Str("state", string(environment.StateCanceled)).Msg(string(environment.StateCanceled)) + } else { + lg.Error().Dur("duration", time.Since(start)).Err(err).Str("state", string(environment.StateFailed)).Msg(string(environment.StateFailed)) + } + return err + } + + lg.Info().Dur("duration", time.Since(start)).Str("state", string(environment.StateCompleted)).Msg(string(environment.StateCompleted)) + + // If the result is not concrete, there's nothing to merge. + if !result.IsConcrete() { + return nil + } + + if src, err := result.Source(); err == nil { + lg.Debug().Str("result", string(src)).Msg("merging task result") + } + + // Mirror task result in both `flow.Task` and `computed` + if err := t.Fill(result.Cue()); err != nil { + lg.Error().Err(err).Msg("failed to fill task") + return err + } + + // Merge task value into computed + if err := computed.FillPath(t.Path(), result); err != nil { + lg.Error().Err(err).Msg("failed to fill plan") + return err + } + + return nil + }), nil + } +} diff --git a/plan/task/task.go b/plan/task/task.go new file mode 100644 index 00000000..784c5b9a --- /dev/null +++ b/plan/task/task.go @@ -0,0 +1,68 @@ +package task + +import ( + "context" + "errors" + "fmt" + "sync" + + "cuelang.org/go/cue" + "go.dagger.io/dagger/compiler" + "go.dagger.io/dagger/environment" + "go.dagger.io/dagger/plancontext" + "go.dagger.io/dagger/solver" +) + +var ( + ErrNotTask = errors.New("not a task") + tasks sync.Map + typePath = cue.MakePath(cue.Hid("_type", "alpha.dagger.io/dagger")) +) + +type NewFunc func() Task + +type Task interface { + Run(ctx context.Context, pctx *plancontext.Context, s solver.Solver, v *compiler.Value) (*compiler.Value, error) +} + +// Register a task type and initializer +func Register(typ string, f NewFunc) { + tasks.Store(typ, f) +} + +// New creates a new Task of the given type. +func New(typ string) Task { + v, ok := tasks.Load(typ) + if !ok { + return nil + } + fn := v.(NewFunc) + return fn() +} + +func Lookup(v *compiler.Value) (Task, error) { + // FIXME: legacy pipelines + if environment.IsComponent(v) { + return New("#up"), nil + } + + if v.Kind() != cue.StructKind { + return nil, ErrNotTask + } + + typ := v.LookupPath(typePath) + if !typ.Exists() { + return nil, ErrNotTask + } + + typeString, err := typ.String() + if err != nil { + return nil, err + } + + t := New(typeString) + if t == nil { + return nil, fmt.Errorf("unknown type %q", typeString) + } + return t, nil +} diff --git a/plancontext/directory.go b/plancontext/directory.go index 1bed1770..3011f682 100644 --- a/plancontext/directory.go +++ b/plancontext/directory.go @@ -40,3 +40,15 @@ func (c *directoryContext) List() []*Directory { return directories } + +func (c *directoryContext) Paths() map[string]string { + c.l.RLock() + defer c.l.RUnlock() + + directories := make(map[string]string) + for _, d := range c.store { + directories[d.Path] = d.Path + } + + return directories +} diff --git a/state/project.go b/state/project.go index 6c955fd2..db99b71d 100644 --- a/state/project.go +++ b/state/project.go @@ -55,7 +55,7 @@ func Init(ctx context.Context, dir string) (*Project, error) { return nil, err } - if err := vendorUniverse(ctx, root); err != nil { + if err := VendorUniverse(ctx, root); err != nil { return nil, err } @@ -390,7 +390,7 @@ func cueModInit(ctx context.Context, parentDir string) error { return nil } -func vendorUniverse(ctx context.Context, p string) error { +func VendorUniverse(ctx context.Context, p string) error { // ensure cue module is initialized if err := cueModInit(ctx, p); err != nil { return err diff --git a/state/state.go b/state/state.go index b68a878e..e68126e0 100644 --- a/state/state.go +++ b/state/state.go @@ -55,7 +55,7 @@ func (s *State) CompilePlan(ctx context.Context) (*compiler.Value, error) { // 2) For backward compatibility: if the project was `dagger // init`-ed before we added support for vendoring universe, it might not // contain a `cue.mod`. - if err := vendorUniverse(ctx, w); err != nil { + if err := VendorUniverse(ctx, w); err != nil { return nil, err } diff --git a/stdlib/dagger/plan.cue b/stdlib/dagger/plan.cue new file mode 100644 index 00000000..ec71aa9b --- /dev/null +++ b/stdlib/dagger/plan.cue @@ -0,0 +1,41 @@ +package dagger + +// A deployment plan executed by `dagger up` +#Plan: { + context: #Context + actions: [string]: _ +} + +// FIXME: Platform spec here +#Platform: string + +#Context: { + // Platform to target + platform?: #Platform + + // Import directories + imports: [string]: { + _type: "Import" + + path: string + include?: [...string] + exclude?: [...string] + fs: #Artifact + } + + // Securely load external secrets + secrets: [string]: { + // Secrets can be securely mounted into action containers as a file + contents: #Secret + + { + _type: "SecretFile" + // Read secret from a file + path: string + } | { + _type: "SecretEnv" + // Read secret from an environment variable ON THE CLIENT MACHINE + envvar: string + } + } +}