runtime: new execution engine

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2021-11-24 16:22:33 -08:00
parent cdcb09126b
commit 2a4db167e4
9 changed files with 356 additions and 3 deletions

View File

@ -36,6 +36,8 @@ func init() {
rootCmd.PersistentFlags().StringP("environment", "e", "", "Select an environment") rootCmd.PersistentFlags().StringP("environment", "e", "", "Select an environment")
rootCmd.PersistentFlags().String("project", "", "Specify a project directory (defaults to current)") rootCmd.PersistentFlags().String("project", "", "Specify a project directory (defaults to current)")
rootCmd.PersistentFlags().Bool("europa", false, "Enable experiemental Europa UX")
rootCmd.PersistentPreRun = func(cmd *cobra.Command, _ []string) { rootCmd.PersistentPreRun = func(cmd *cobra.Command, _ []string) {
lg := logger.New() lg := logger.New()
ctx := lg.WithContext(cmd.Context()) ctx := lg.WithContext(cmd.Context())

View File

@ -6,11 +6,13 @@ import (
"os" "os"
"cuelang.org/go/cue" "cuelang.org/go/cue"
"go.dagger.io/dagger/client"
"go.dagger.io/dagger/cmd/dagger/cmd/common" "go.dagger.io/dagger/cmd/dagger/cmd/common"
"go.dagger.io/dagger/cmd/dagger/cmd/output" "go.dagger.io/dagger/cmd/dagger/cmd/output"
"go.dagger.io/dagger/cmd/dagger/logger" "go.dagger.io/dagger/cmd/dagger/logger"
"go.dagger.io/dagger/compiler" "go.dagger.io/dagger/compiler"
"go.dagger.io/dagger/environment" "go.dagger.io/dagger/environment"
"go.dagger.io/dagger/plan"
"go.dagger.io/dagger/solver" "go.dagger.io/dagger/solver"
"golang.org/x/term" "golang.org/x/term"
@ -61,6 +63,18 @@ var upCmd = &cobra.Command{
cl := common.NewClient(ctx) cl := common.NewClient(ctx)
if viper.GetBool("europa") {
err = europaUp(ctx, cl, project.Path)
<-doneCh
if err != nil {
lg.Fatal().Err(err).Msg("failed to up environment")
}
return
}
env, err := environment.New(st) env, err := environment.New(st)
if err != nil { if err != nil {
lg.Fatal().Msg("unable to create environment") lg.Fatal().Msg("unable to create environment")
@ -97,6 +111,27 @@ var upCmd = &cobra.Command{
}, },
} }
func europaUp(ctx context.Context, cl *client.Client, path string) error {
lg := log.Ctx(ctx)
p, err := plan.Load(ctx, path, "")
if err != nil {
lg.Fatal().Err(err).Msg("failed to load plan")
}
localdirs, err := p.LocalDirectories()
if err != nil {
return err
}
return cl.Do(ctx, p.Context(), localdirs, func(ctx context.Context, s solver.Solver) error {
if err := p.Up(ctx, s); err != nil {
return err
}
return nil
})
}
func checkInputs(ctx context.Context, env *environment.Environment) error { func checkInputs(ctx context.Context, env *environment.Environment) error {
lg := log.Ctx(ctx) lg := log.Ctx(ctx)
warnOnly := viper.GetBool("force") warnOnly := viper.GetBool("force")

View File

@ -10,6 +10,28 @@ Dagger core types
import "alpha.dagger.io/dagger" import "alpha.dagger.io/dagger"
``` ```
## dagger.#Context
### dagger.#Context Inputs
_No input._
### dagger.#Context Outputs
_No output._
## dagger.#Plan
A deployment plan executed by `dagger up`
### dagger.#Plan Inputs
_No input._
### dagger.#Plan Outputs
_No output._
## dagger.#Secret ## dagger.#Secret
Secret value Secret value

173
plan/plan.go Normal file
View File

@ -0,0 +1,173 @@
package plan
import (
"context"
"fmt"
"path/filepath"
"strings"
"time"
"cuelang.org/go/cue"
cueflow "cuelang.org/go/tools/flow"
"github.com/rs/zerolog/log"
"go.dagger.io/dagger/compiler"
"go.dagger.io/dagger/environment"
"go.dagger.io/dagger/plan/task"
"go.dagger.io/dagger/plancontext"
"go.dagger.io/dagger/solver"
"go.dagger.io/dagger/state"
"go.opentelemetry.io/otel"
)
type Plan struct {
context *plancontext.Context
source *compiler.Value
}
func Load(ctx context.Context, path, pkg string) (*Plan, error) {
// FIXME: universe vendoring
if err := state.VendorUniverse(ctx, path); err != nil {
return nil, err
}
v, err := compiler.Build(path, nil, pkg)
if err != nil {
return nil, err
}
return &Plan{
context: plancontext.New(),
source: v,
}, nil
}
func (p *Plan) Context() *plancontext.Context {
return p.context
}
func (p *Plan) Source() *compiler.Value {
return p.source
}
// LocalDirectories scans the context for local imports.
// BuildKit requires to known the list of directories ahead of time.
func (p *Plan) LocalDirectories() (map[string]string, error) {
dirs := map[string]string{}
imports, err := p.source.Lookup("context.imports").Fields()
if err != nil {
return nil, err
}
for _, v := range imports {
dir, err := v.Value.Lookup("path").String()
if err != nil {
return nil, err
}
abs, err := filepath.Abs(dir)
if err != nil {
return nil, err
}
dirs[dir] = abs
}
return dirs, nil
}
// Up executes the plan
func (p *Plan) Up(ctx context.Context, s solver.Solver) error {
ctx, span := otel.Tracer("dagger").Start(ctx, "plan.Up")
defer span.End()
computed := compiler.NewValue()
flow := cueflow.New(
&cueflow.Config{},
p.source.Cue(),
newRunner(p.context, s, computed),
)
if err := flow.Run(ctx); err != nil {
return err
}
if src, err := computed.Source(); err == nil {
log.Ctx(ctx).Debug().Str("computed", string(src)).Msg("computed values")
}
// FIXME: canceling the context makes flow return `nil`
// Check explicitly if the context is canceled.
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
func newRunner(pctx *plancontext.Context, s solver.Solver, computed *compiler.Value) cueflow.TaskFunc {
return func(flowVal cue.Value) (cueflow.Runner, error) {
v := compiler.Wrap(flowVal)
r, err := task.Lookup(v)
if err != nil {
// Not a task
if err == task.ErrNotTask {
return nil, nil
}
return nil, err
}
// Wrapper around `task.Run` that handles logging, tracing, etc.
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
ctx := t.Context()
lg := log.Ctx(ctx).With().Str("component", t.Path().String()).Logger()
ctx = lg.WithContext(ctx)
ctx, span := otel.Tracer("dagger").Start(ctx, fmt.Sprintf("compute: %s", t.Path().String()))
defer span.End()
lg.Info().Str("state", string(environment.StateComputing)).Msg(string(environment.StateComputing))
// Debug: dump dependencies
for _, dep := range t.Dependencies() {
lg.Debug().Str("dependency", dep.Path().String()).Msg("dependency detected")
}
start := time.Now()
result, err := r.Run(ctx, pctx, s, compiler.Wrap(t.Value()))
if err != nil {
// FIXME: this should use errdefs.IsCanceled(err)
if strings.Contains(err.Error(), "context canceled") {
lg.Error().Dur("duration", time.Since(start)).Str("state", string(environment.StateCanceled)).Msg(string(environment.StateCanceled))
} else {
lg.Error().Dur("duration", time.Since(start)).Err(err).Str("state", string(environment.StateFailed)).Msg(string(environment.StateFailed))
}
return err
}
lg.Info().Dur("duration", time.Since(start)).Str("state", string(environment.StateCompleted)).Msg(string(environment.StateCompleted))
// If the result is not concrete, there's nothing to merge.
if !result.IsConcrete() {
return nil
}
if src, err := result.Source(); err == nil {
lg.Debug().Str("result", string(src)).Msg("merging task result")
}
// Mirror task result in both `flow.Task` and `computed`
if err := t.Fill(result.Cue()); err != nil {
lg.Error().Err(err).Msg("failed to fill task")
return err
}
// Merge task value into computed
if err := computed.FillPath(t.Path(), result); err != nil {
lg.Error().Err(err).Msg("failed to fill plan")
return err
}
return nil
}), nil
}
}

68
plan/task/task.go Normal file
View File

@ -0,0 +1,68 @@
package task
import (
"context"
"errors"
"fmt"
"sync"
"cuelang.org/go/cue"
"go.dagger.io/dagger/compiler"
"go.dagger.io/dagger/environment"
"go.dagger.io/dagger/plancontext"
"go.dagger.io/dagger/solver"
)
var (
ErrNotTask = errors.New("not a task")
tasks sync.Map
typePath = cue.MakePath(cue.Hid("_type", "alpha.dagger.io/dagger"))
)
type NewFunc func() Task
type Task interface {
Run(ctx context.Context, pctx *plancontext.Context, s solver.Solver, v *compiler.Value) (*compiler.Value, error)
}
// Register a task type and initializer
func Register(typ string, f NewFunc) {
tasks.Store(typ, f)
}
// New creates a new Task of the given type.
func New(typ string) Task {
v, ok := tasks.Load(typ)
if !ok {
return nil
}
fn := v.(NewFunc)
return fn()
}
func Lookup(v *compiler.Value) (Task, error) {
// FIXME: legacy pipelines
if environment.IsComponent(v) {
return New("#up"), nil
}
if v.Kind() != cue.StructKind {
return nil, ErrNotTask
}
typ := v.LookupPath(typePath)
if !typ.Exists() {
return nil, ErrNotTask
}
typeString, err := typ.String()
if err != nil {
return nil, err
}
t := New(typeString)
if t == nil {
return nil, fmt.Errorf("unknown type %q", typeString)
}
return t, nil
}

View File

@ -40,3 +40,15 @@ func (c *directoryContext) List() []*Directory {
return directories return directories
} }
func (c *directoryContext) Paths() map[string]string {
c.l.RLock()
defer c.l.RUnlock()
directories := make(map[string]string)
for _, d := range c.store {
directories[d.Path] = d.Path
}
return directories
}

View File

@ -55,7 +55,7 @@ func Init(ctx context.Context, dir string) (*Project, error) {
return nil, err return nil, err
} }
if err := vendorUniverse(ctx, root); err != nil { if err := VendorUniverse(ctx, root); err != nil {
return nil, err return nil, err
} }
@ -390,7 +390,7 @@ func cueModInit(ctx context.Context, parentDir string) error {
return nil return nil
} }
func vendorUniverse(ctx context.Context, p string) error { func VendorUniverse(ctx context.Context, p string) error {
// ensure cue module is initialized // ensure cue module is initialized
if err := cueModInit(ctx, p); err != nil { if err := cueModInit(ctx, p); err != nil {
return err return err

View File

@ -55,7 +55,7 @@ func (s *State) CompilePlan(ctx context.Context) (*compiler.Value, error) {
// 2) For backward compatibility: if the project was `dagger // 2) For backward compatibility: if the project was `dagger
// init`-ed before we added support for vendoring universe, it might not // init`-ed before we added support for vendoring universe, it might not
// contain a `cue.mod`. // contain a `cue.mod`.
if err := vendorUniverse(ctx, w); err != nil { if err := VendorUniverse(ctx, w); err != nil {
return nil, err return nil, err
} }

41
stdlib/dagger/plan.cue Normal file
View File

@ -0,0 +1,41 @@
package dagger
// A deployment plan executed by `dagger up`
#Plan: {
context: #Context
actions: [string]: _
}
// FIXME: Platform spec here
#Platform: string
#Context: {
// Platform to target
platform?: #Platform
// Import directories
imports: [string]: {
_type: "Import"
path: string
include?: [...string]
exclude?: [...string]
fs: #Artifact
}
// Securely load external secrets
secrets: [string]: {
// Secrets can be securely mounted into action containers as a file
contents: #Secret
{
_type: "SecretFile"
// Read secret from a file
path: string
} | {
_type: "SecretEnv"
// Read secret from an environment variable ON THE CLIENT MACHINE
envvar: string
}
}
}