diff --git a/dagger/client.go b/dagger/client.go index 191086bf..3f808bc8 100644 --- a/dagger/client.go +++ b/dagger/client.go @@ -162,10 +162,7 @@ func (c *Client) outputfn(ctx context.Context, r io.Reader) (*compiler.Value, er lg := log.Ctx(ctx) // FIXME: merge this into env output. - out, err := compiler.EmptyStruct() - if err != nil { - return nil, err - } + out := compiler.EmptyStruct() tr := tar.NewReader(r) for { diff --git a/dagger/compiler/compiler.go b/dagger/compiler/compiler.go index cfa39434..a3f6194a 100644 --- a/dagger/compiler/compiler.go +++ b/dagger/compiler/compiler.go @@ -17,7 +17,7 @@ func Compile(name string, src interface{}) (*Value, error) { return DefaultCompiler.Compile(name, src) } -func EmptyStruct() (*Value, error) { +func EmptyStruct() *Value { return DefaultCompiler.EmptyStruct() } @@ -63,8 +63,12 @@ func (c *Compiler) Cue() *cue.Runtime { } // Compile an empty struct -func (c *Compiler) EmptyStruct() (*Value, error) { - return c.Compile("", "") +func (c *Compiler) EmptyStruct() *Value { + empty, err := c.Compile("", "") + if err != nil { + panic(err) + } + return empty } func (c *Compiler) Compile(name string, src interface{}) (*Value, error) { diff --git a/dagger/compiler/compiler_test.go b/dagger/compiler/compiler_test.go index b60fcec9..37c298ee 100644 --- a/dagger/compiler/compiler_test.go +++ b/dagger/compiler/compiler_test.go @@ -38,14 +38,6 @@ func TestDefNotExist(t *testing.T) { } } -func TestSimple(t *testing.T) { - c := &Compiler{} - _, err := c.EmptyStruct() - if err != nil { - t.Fatal(err) - } -} - func TestJSON(t *testing.T) { c := &Compiler{} v, err := c.Compile("", `foo: hello: "world"`) diff --git a/dagger/compiler/value.go b/dagger/compiler/value.go index 28984554..f5c8ee85 100644 --- a/dagger/compiler/value.go +++ b/dagger/compiler/value.go @@ -67,6 +67,11 @@ func (v *Value) Len() cue.Value { return v.val.Len() } +// Proxy function to the underlying cue.Value +func (v *Value) Kind() cue.Kind { + return v.val.Kind() +} + // Proxy function to the underlying cue.Value func (v *Value) Fields() (*cue.Iterator, error) { return v.val.Fields() diff --git a/dagger/env.go b/dagger/env.go index ecb89082..a7a12fab 100644 --- a/dagger/env.go +++ b/dagger/env.go @@ -51,15 +51,14 @@ func (env *Env) SetUpdater(v *compiler.Value) error { } func NewEnv() (*Env, error) { - empty, err := compiler.EmptyStruct() - if err != nil { - return nil, err - } + empty := compiler.EmptyStruct() env := &Env{ base: empty, input: empty, output: empty, - state: empty, + } + if err := env.mergeState(); err != nil { + return nil, err } if err := env.SetUpdater(nil); err != nil { return nil, err @@ -77,17 +76,10 @@ func (env *Env) Input() *compiler.Value { func (env *Env) SetInput(i *compiler.Value) error { if i == nil { - var err error - i, err = compiler.EmptyStruct() - if err != nil { - return err - } + i = compiler.EmptyStruct() } - return env.set( - env.base, - i, - env.output, - ) + env.input = i + return env.mergeState() } // Update the base configuration @@ -104,12 +96,9 @@ func (env *Env) Update(ctx context.Context, s Solver) error { if err != nil { return fmt.Errorf("base config: %w", err) } + env.base = base // Commit - return env.set( - base, - env.input, - env.output, - ) + return env.mergeState() } func (env *Env) Base() *compiler.Value { @@ -147,52 +136,48 @@ func (env *Env) LocalDirs() map[string]string { ) } // 1. Scan the environment state - env.State().Walk( - func(v *compiler.Value) bool { - compute := v.Get("#dagger.compute") - if !compute.Exists() { - // No compute script - return true - } - localdirs(compute) - return false // no nested executables - }, - nil, - ) + // FIXME: use a common `flow` instance to avoid rescanning the tree. + inst := env.state.CueInst() + flow := cueflow.New(&cueflow.Config{}, inst, newDummyTaskFunc(inst)) + for _, t := range flow.Tasks() { + v := compiler.Wrap(t.Value(), inst) + localdirs(v.Get("#dagger.compute")) + } // 2. Scan the environment updater localdirs(env.Updater()) return dirs } // FIXME: this is just a 3-way merge. Add var args to compiler.Value.Merge. -func (env *Env) set(base, input, output *compiler.Value) (err error) { +func (env *Env) mergeState() error { // FIXME: make this cleaner in *compiler.Value by keeping intermediary instances // FIXME: state.CueInst() must return an instance with the same // contents as state.v, for the purposes of cueflow. // That is not currently how *compiler.Value works, so we prepare the cue // instance manually. // --> refactor the compiler.Value API to do this for us. - stateInst := env.state.CueInst() + var ( + state = compiler.EmptyStruct() + stateInst = state.CueInst() + err error + ) - stateInst, err = stateInst.Fill(base.Cue()) + stateInst, err = stateInst.Fill(env.base.Cue()) if err != nil { return fmt.Errorf("merge base & input: %w", err) } - stateInst, err = stateInst.Fill(input.Cue()) + stateInst, err = stateInst.Fill(env.input.Cue()) if err != nil { return fmt.Errorf("merge base & input: %w", err) } - stateInst, err = stateInst.Fill(output.Cue()) + stateInst, err = stateInst.Fill(env.output.Cue()) if err != nil { return fmt.Errorf("merge output with base & input: %w", err) } - state := compiler.Wrap(stateInst.Value(), stateInst) + state = compiler.Wrap(stateInst.Value(), stateInst) // commit - env.base = base - env.input = input - env.output = output env.state = state return nil } @@ -217,15 +202,7 @@ func (env *Env) Export(fs FS) (FS, error) { // fs = env.output.SaveJSON(fs, "output.cue") // } // For now, export a single `state.cue` containing the combined output. - var err error - state := env.state - if env.output != nil { - state, err = state.Merge(env.output) - if err != nil { - return fs, err - } - } - fs = fs.WriteValueJSON("state.cue", state) + fs = fs.WriteValueJSON("state.cue", env.state) return fs, nil } @@ -240,11 +217,8 @@ func (env *Env) Compute(ctx context.Context, s Solver) error { Str("value", compiler.Wrap(flowInst.Value(), flowInst).JSON().String()). Msg("walking") - // Initialize empty output - output, err := compiler.EmptyStruct() - if err != nil { - return err - } + // Reset the output + env.output = compiler.EmptyStruct() // Cueflow config flowCfg := &cueflow.Config{ @@ -266,7 +240,7 @@ func (env *Env) Compute(ctx context.Context, s Solver) error { lg.Debug().Msg("cueflow task: filling result") // Merge task value into output var err error - output, err = output.MergePath(t.Value(), t.Path()) + env.output, err = env.output.MergePath(t.Value(), t.Path()) if err != nil { lg. Error(). @@ -277,21 +251,38 @@ func (env *Env) Compute(ctx context.Context, s Solver) error { return nil }, } - // Cueflow match func - flowMatchFn := func(flowVal cue.Value) (cueflow.Runner, error) { - v := compiler.Wrap(flowVal, flowInst) - compute := v.Get("#dagger.compute") - if !compute.Exists() { + // Orchestrate execution with cueflow + flow := cueflow.New(flowCfg, flowInst, newPipelineTaskFunc(ctx, flowInst, s)) + if err := flow.Run(ctx); err != nil { + return err + } + return env.mergeState() +} + +func newDummyTaskFunc(inst *cue.Instance) cueflow.TaskFunc { + return func(flowVal cue.Value) (cueflow.Runner, error) { + v := compiler.Wrap(flowVal, inst) + if !isComponent(v) { // No compute script return nil, nil } - if _, err := compute.List(); err != nil { - // invalid compute script - return nil, err + return cueflow.RunnerFunc(func(t *cueflow.Task) error { + return nil + }), nil + } +} + +func newPipelineTaskFunc(ctx context.Context, inst *cue.Instance, s Solver) cueflow.TaskFunc { + return func(flowVal cue.Value) (cueflow.Runner, error) { + v := compiler.Wrap(flowVal, inst) + if !isComponent(v) { + // No compute script + return nil, nil } // Cueflow run func: return cueflow.RunnerFunc(func(t *cueflow.Task) error { - lg := lg. + lg := log. + Ctx(ctx). With(). Str("path", t.Path().String()). Logger() @@ -303,19 +294,9 @@ func (env *Env) Compute(ctx context.Context, s Solver) error { Str("dependency", dep.Path().String()). Msg("dependency detected") } - v := compiler.Wrap(t.Value(), flowInst) + v := compiler.Wrap(t.Value(), inst) p := NewPipeline(s, NewFillable(t)) return p.Do(ctx, v) }), nil } - // Orchestrate execution with cueflow - flow := cueflow.New(flowCfg, flowInst, flowMatchFn) - if err := flow.Run(ctx); err != nil { - return err - } - return env.set( - env.base, - env.input, - output, - ) } diff --git a/dagger/pipeline.go b/dagger/pipeline.go index 8e5c4e73..a55d614b 100644 --- a/dagger/pipeline.go +++ b/dagger/pipeline.go @@ -31,28 +31,32 @@ func (p *Pipeline) FS() FS { return p.fs } +func isComponent(v *compiler.Value) bool { + return v.Get("#dagger.compute").Exists() +} + func ops(code ...*compiler.Value) ([]*compiler.Value, error) { ops := []*compiler.Value{} // 1. Decode 'code' into a single flat array of operations. for _, x := range code { // 1. attachment array - if xops, err := x.Get("#dagger.compute").List(); err == nil { + if isComponent(x) { + xops, err := x.Get("#dagger.compute").List() + if err != nil { + return nil, err + } // 'from' has an executable attached ops = append(ops, xops...) - continue - } - // 2. individual op - if _, err := x.Get("do").String(); err == nil { + // 2. individual op + } else if _, err := x.Get("do").String(); err == nil { ops = append(ops, x) - continue - } - // 3. op array - if xops, err := x.List(); err == nil { + // 3. op array + } else if xops, err := x.List(); err == nil { ops = append(ops, xops...) - continue + } else { + // 4. error + return nil, fmt.Errorf("not executable: %s", x.SourceUnsafe()) } - // 4. error - return nil, fmt.Errorf("not executable: %s", x.SourceUnsafe()) } return ops, nil }