Merge pull request #1691 from aluzzardi/task-wl-poc

Fix task detection in dagger do
This commit is contained in:
Andrea Luzzardi 2022-03-07 16:58:03 -08:00 committed by GitHub
commit 2a6962ddc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 294 additions and 170 deletions

View File

@ -55,29 +55,24 @@ var doCmd = &cobra.Command{
ctx := lg.WithContext(cmd.Context()) ctx := lg.WithContext(cmd.Context())
cl := common.NewClient(ctx) cl := common.NewClient(ctx)
p, err := loadPlan(getTargetPath(args).String()) p, err := loadPlan()
if err != nil { if err != nil {
lg.Fatal().Err(err).Msg("failed to load plan") lg.Fatal().Err(err).Msg("failed to load plan")
} }
err = cl.Do(ctx, p.Context(), func(ctx context.Context, s solver.Solver) error { err = cl.Do(ctx, p.Context(), func(ctx context.Context, s solver.Solver) error {
_, err := p.Up(ctx, s) return p.Do(ctx, getTargetPath(args), s)
if err != nil {
return err
}
return nil
}) })
// FIXME: rework telemetry // FIXME: rework telemetry
if err != nil { if err != nil {
lg.Fatal().Err(err).Msg("failed to up environment") lg.Fatal().Err(err).Msg("failed to execute plan")
} }
}, },
} }
func loadPlan(target string) (*plan.Plan, error) { func loadPlan() (*plan.Plan, error) {
planPath := viper.GetString("plan") planPath := viper.GetString("plan")
// support only local filesystem paths // support only local filesystem paths
@ -95,26 +90,23 @@ func loadPlan(target string) (*plan.Plan, error) {
return plan.Load(context.Background(), plan.Config{ return plan.Load(context.Background(), plan.Config{
Args: []string{planPath}, Args: []string{planPath},
With: viper.GetStringSlice("with"), With: viper.GetStringSlice("with"),
Target: target,
Vendor: !viper.GetBool("no-vendor"), Vendor: !viper.GetBool("no-vendor"),
}) })
} }
func getTargetPath(args []string) cue.Path { func getTargetPath(args []string) cue.Path {
actionLookupArgs := []string{plan.ActionsPath} selectors := []cue.Selector{plan.ActionSelector}
actionLookupArgs = append(actionLookupArgs, args...) for _, arg := range args {
actionLookupSelectors := []cue.Selector{} selectors = append(selectors, cue.Str(arg))
for _, a := range actionLookupArgs {
actionLookupSelectors = append(actionLookupSelectors, cue.Str(a))
} }
return cue.MakePath(actionLookupSelectors...) return cue.MakePath(selectors...)
} }
func doHelp(cmd *cobra.Command, _ []string) { func doHelp(cmd *cobra.Command, _ []string) {
w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', tabwriter.StripEscape) w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', tabwriter.StripEscape)
defer w.Flush() defer w.Flush()
p, err := loadPlan("") p, err := loadPlan()
if err != nil { if err != nil {
fmt.Printf("%s", err) fmt.Printf("%s", err)
fmt.Fprintln(w, "failed to load plan") fmt.Fprintln(w, "failed to load plan")

View File

@ -2,9 +2,9 @@ package cmd
import ( import (
"context" "context"
"fmt"
"os" "os"
"cuelang.org/go/cue"
"go.dagger.io/dagger/client" "go.dagger.io/dagger/client"
"go.dagger.io/dagger/cmd/dagger/cmd/common" "go.dagger.io/dagger/cmd/dagger/cmd/common"
"go.dagger.io/dagger/cmd/dagger/logger" "go.dagger.io/dagger/cmd/dagger/logger"
@ -89,24 +89,7 @@ func europaUp(ctx context.Context, cl *client.Client, args ...string) error {
} }
return cl.Do(ctx, p.Context(), func(ctx context.Context, s solver.Solver) error { return cl.Do(ctx, p.Context(), func(ctx context.Context, s solver.Solver) error {
computed, err := p.Up(ctx, s) return p.Do(ctx, cue.ParsePath(viper.GetString("target")), s)
if err != nil {
return err
}
if output := viper.GetString("output"); output != "" {
data := computed.JSON().PrettyString()
if output == "-" {
fmt.Println(data)
return nil
}
err := os.WriteFile(output, []byte(data), 0600)
if err != nil {
lg.Fatal().Err(err).Str("path", output).Msg("failed to write output")
}
}
return nil
}) })
} }

View File

@ -3,8 +3,6 @@ package plan
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time"
"cuelang.org/go/cue" "cuelang.org/go/cue"
cueflow "cuelang.org/go/tools/flow" cueflow "cuelang.org/go/tools/flow"
@ -17,8 +15,9 @@ import (
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
) )
const ( var (
ActionsPath = "actions" ActionSelector = cue.Str("actions")
ClientSelector = cue.Str("client")
) )
type Plan struct { type Plan struct {
@ -159,49 +158,19 @@ func (p *Plan) prepare(ctx context.Context) error {
return flow.Run(ctx) return flow.Run(ctx)
} }
// Up executes the plan // Do executes an action in the plan
func (p *Plan) Up(ctx context.Context, s solver.Solver) (*compiler.Value, error) { func (p *Plan) Do(ctx context.Context, path cue.Path, s solver.Solver) error {
ctx, span := otel.Tracer("dagger").Start(ctx, "plan.Up") ctx, span := otel.Tracer("dagger").Start(ctx, "plan.Up")
defer span.End() defer span.End()
computed := compiler.NewValue() r := NewRunner(p.context, path, s)
return r.Run(ctx, p.source)
cfg := &cueflow.Config{
FindHiddenTasks: true,
}
if p.config.Target != "" {
cfg.Root = cue.ParsePath(p.config.Target)
// The target may reference dependencies outside of the target path.
// InferTasks will include them in the workflow.
cfg.InferTasks = true
}
flow := cueflow.New(
cfg,
p.source.Cue(),
newRunner(p.context, s, computed),
)
if err := flow.Run(ctx); err != nil {
return nil, err
}
if src, err := computed.Source(); err == nil {
log.Ctx(ctx).Debug().Str("computed", string(src)).Msg("computed values")
}
// FIXME: canceling the context makes flow return `nil`
// Check explicitly if the context is canceled.
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return computed, nil
}
} }
func (p *Plan) fillAction() { func (p *Plan) fillAction() {
cfg := &cueflow.Config{ cfg := &cueflow.Config{
FindHiddenTasks: true, FindHiddenTasks: true,
Root: cue.ParsePath(ActionsPath), Root: cue.MakePath(ActionSelector),
} }
flow := cueflow.New( flow := cueflow.New(
@ -210,13 +179,13 @@ func (p *Plan) fillAction() {
noOpRunner, noOpRunner,
) )
actions := p.source.Lookup(ActionsPath) actions := p.source.LookupPath(cue.MakePath(ActionSelector))
actionsComment := "" actionsComment := ""
for _, cg := range actions.Doc() { for _, cg := range actions.Doc() {
actionsComment += cg.Text() actionsComment += cg.Text()
} }
p.action = &Action{ p.action = &Action{
ActionsPath, ActionSelector.String(),
false, false,
actions.Path(), actions.Path(),
actionsComment, actionsComment,
@ -252,87 +221,3 @@ func (p *Plan) fillAction() {
} }
} }
} }
func noOpRunner(flowVal cue.Value) (cueflow.Runner, error) {
v := compiler.Wrap(flowVal)
_, err := task.Lookup(v)
if err != nil {
// Not a task
if err == task.ErrNotTask {
return nil, nil
}
return nil, err
}
// Wrapper around `task.Run` that handles logging, tracing, etc.
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
return nil
}), nil
}
func newRunner(pctx *plancontext.Context, s solver.Solver, computed *compiler.Value) cueflow.TaskFunc {
return func(flowVal cue.Value) (cueflow.Runner, error) {
v := compiler.Wrap(flowVal)
r, err := task.Lookup(v)
if err != nil {
// Not a task
if err == task.ErrNotTask {
return nil, nil
}
return nil, err
}
// Wrapper around `task.Run` that handles logging, tracing, etc.
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
ctx := t.Context()
lg := log.Ctx(ctx).With().Str("task", t.Path().String()).Logger()
ctx = lg.WithContext(ctx)
ctx, span := otel.Tracer("dagger").Start(ctx, fmt.Sprintf("up: %s", t.Path().String()))
defer span.End()
lg.Info().Str("state", string(task.StateComputing)).Msg(string(task.StateComputing))
// Debug: dump dependencies
for _, dep := range t.Dependencies() {
lg.Debug().Str("dependency", dep.Path().String()).Msg("dependency detected")
}
start := time.Now()
result, err := r.Run(ctx, pctx, s, compiler.Wrap(t.Value()))
if err != nil {
// FIXME: this should use errdefs.IsCanceled(err)
if strings.Contains(err.Error(), "context canceled") {
lg.Error().Dur("duration", time.Since(start)).Str("state", string(task.StateCanceled)).Msg(string(task.StateCanceled))
} else {
lg.Error().Dur("duration", time.Since(start)).Err(err).Str("state", string(task.StateFailed)).Msg(string(task.StateFailed))
}
return fmt.Errorf("%s: %w", t.Path().String(), err)
}
lg.Info().Dur("duration", time.Since(start)).Str("state", string(task.StateCompleted)).Msg(string(task.StateCompleted))
// If the result is not concrete (e.g. empty value), there's nothing to merge.
if !result.IsConcrete() {
return nil
}
if src, err := result.Source(); err == nil {
lg.Debug().Str("result", string(src)).Msg("merging task result")
}
// Mirror task result in both `flow.Task` and `computed`
if err := t.Fill(result.Cue()); err != nil {
lg.Error().Err(err).Msg("failed to fill task")
return err
}
// Merge task value into computed
if err := computed.FillPath(t.Path(), result); err != nil {
lg.Error().Err(err).Msg("failed to fill plan")
return err
}
return nil
}), nil
}
}

218
plan/runner.go Normal file
View File

@ -0,0 +1,218 @@
package plan
import (
"context"
"fmt"
"strings"
"sync"
"time"
"go.dagger.io/dagger/compiler"
"go.dagger.io/dagger/plan/task"
"go.dagger.io/dagger/plancontext"
"go.dagger.io/dagger/solver"
"cuelang.org/go/cue"
cueflow "cuelang.org/go/tools/flow"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
)
type Runner struct {
pctx *plancontext.Context
target cue.Path
s solver.Solver
tasks sync.Map
mirror *compiler.Value
l sync.Mutex
}
func NewRunner(pctx *plancontext.Context, target cue.Path, s solver.Solver) *Runner {
return &Runner{
pctx: pctx,
target: target,
s: s,
mirror: compiler.NewValue(),
}
}
func (r *Runner) Run(ctx context.Context, src *compiler.Value) error {
if err := r.update(cue.MakePath(), src); err != nil {
return err
}
flow := cueflow.New(
&cueflow.Config{
FindHiddenTasks: true,
},
src.Cue(),
r.taskFunc,
)
if err := flow.Run(ctx); err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
func (r *Runner) update(p cue.Path, v *compiler.Value) error {
r.l.Lock()
defer r.l.Unlock()
if err := r.mirror.FillPath(p, v); err != nil {
return err
}
r.initTasks()
return nil
}
func (r *Runner) initTasks() {
flow := cueflow.New(
&cueflow.Config{
FindHiddenTasks: true,
},
r.mirror.Cue(),
noOpRunner,
)
// Allow tasks under the target
for _, t := range flow.Tasks() {
if cuePathHasPrefix(t.Path(), r.target) {
r.addTask(t)
}
}
// If a `client` task is targeting an allowed task, allow the output task as well
for _, t := range flow.Tasks() {
if t.Path().Selectors()[0] != ClientSelector {
continue
}
for _, dep := range t.Dependencies() {
if r.shouldRun(dep.Path()) {
r.addTask(t)
}
}
}
}
func (r *Runner) addTask(t *cueflow.Task) {
r.tasks.Store(t.Path().String(), struct{}{})
for _, dep := range t.Dependencies() {
r.addTask(dep)
}
}
func (r *Runner) shouldRun(p cue.Path) bool {
_, ok := r.tasks.Load(p.String())
return ok
}
func (r *Runner) taskFunc(flowVal cue.Value) (cueflow.Runner, error) {
v := compiler.Wrap(flowVal)
handler, err := task.Lookup(v)
if err != nil {
// Not a task
if err == task.ErrNotTask {
return nil, nil
}
return nil, err
}
if !r.shouldRun(v.Path()) {
return nil, nil
}
// Wrapper around `task.Run` that handles logging, tracing, etc.
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
ctx := t.Context()
lg := log.Ctx(ctx).With().Str("task", t.Path().String()).Logger()
ctx = lg.WithContext(ctx)
ctx, span := otel.Tracer("dagger").Start(ctx, fmt.Sprintf("up: %s", t.Path().String()))
defer span.End()
lg.Info().Str("state", string(task.StateComputing)).Msg(string(task.StateComputing))
// Debug: dump dependencies
for _, dep := range t.Dependencies() {
lg.Debug().Str("dependency", dep.Path().String()).Msg("dependency detected")
}
start := time.Now()
result, err := handler.Run(ctx, r.pctx, r.s, compiler.Wrap(t.Value()))
if err != nil {
// FIXME: this should use errdefs.IsCanceled(err)
if strings.Contains(err.Error(), "context canceled") {
lg.Error().Dur("duration", time.Since(start)).Str("state", string(task.StateCanceled)).Msg(string(task.StateCanceled))
} else {
lg.Error().Dur("duration", time.Since(start)).Err(err).Str("state", string(task.StateFailed)).Msg(string(task.StateFailed))
}
return fmt.Errorf("%s: %w", t.Path().String(), err)
}
lg.Info().Dur("duration", time.Since(start)).Str("state", string(task.StateCompleted)).Msg(string(task.StateCompleted))
// If the result is not concrete (e.g. empty value), there's nothing to merge.
if !result.IsConcrete() {
return nil
}
if src, err := result.Source(); err == nil {
lg.Debug().Str("result", string(src)).Msg("merging task result")
}
// Mirror task result and re-scan tasks that should run.
// FIXME: This yields some structural cycle errors.
// if err := r.update(t.Path(), result); err != nil {
// return err
// }
if err := t.Fill(result.Cue()); err != nil {
lg.Error().Err(err).Msg("failed to fill task")
return err
}
return nil
}), nil
}
func cuePathHasPrefix(p cue.Path, prefix cue.Path) bool {
pathSelectors := p.Selectors()
prefixSelectors := prefix.Selectors()
if len(pathSelectors) < len(prefixSelectors) {
return false
}
for i, sel := range prefixSelectors {
if pathSelectors[i] != sel {
return false
}
}
return true
}
func noOpRunner(flowVal cue.Value) (cueflow.Runner, error) {
v := compiler.Wrap(flowVal)
_, err := task.Lookup(v)
if err != nil {
// Not a task
if err == task.ErrNotTask {
return nil, nil
}
return nil, err
}
// Return a no op runner
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
return nil
}), nil
}

View File

@ -8,23 +8,24 @@ dagger.#Plan & {
inputs: directories: test: path: string inputs: directories: test: path: string
actions: { actions: {
_readFile: dagger.#ReadFile & { // Test that file exists and contains correct content
exists: dagger.#ReadFile & {
input: inputs.directories.test.contents input: inputs.directories.test.contents
path: "test.txt" path: "test.txt"
}
// Test that file exists and contains correct content
exists: _readFile & {
contents: "local directory" contents: "local directory"
} }
// Test that file does NOT exist // Test that file does NOT exist
notExists: _readFile & { notExists: dagger.#ReadFile & {
input: inputs.directories.test.contents
path: "test.txt"
contents: "local directory" contents: "local directory"
} }
// Test that file exists and contains conflicting content // Test that file exists and contains conflicting content
conflictingValues: _readFile & { conflictingValues: dagger.#ReadFile & {
input: inputs.directories.test.contents
path: "test.txt"
contents: "local dfsadf" contents: "local dfsadf"
} }
} }

View File

@ -0,0 +1,45 @@
package main
import (
"dagger.io/dagger"
"universe.dagger.io/alpine"
"universe.dagger.io/bash"
)
dagger.#Plan & {
client: filesystem: "./test_do": write: contents: actions.test.one.export.files["/output.txt"]
actions: {
image: alpine.#Build & {
packages: bash: {}
}
test: {
one: bash.#Run & {
input: image.output
script: contents: "echo Hello World! > /output.txt"
export: files: "/output.txt": string
}
two: bash.#Run & {
input: image.output
script: contents: "true"
}
three: bash.#Run & {
input: image.output
script: contents: "cat /one/output.txt"
mounts: output: {
contents: one.export.rootfs
dest: "/one"
}
}
}
notMe: bash.#Run & {
input: image.output
script: contents: "false"
}
}
}