Merge pull request #132 from dagger/perf-fill

performance: reduce the number of fills
This commit is contained in:
Andrea Luzzardi 2021-02-19 14:34:36 -08:00 committed by GitHub
commit 3748bbc7db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 86 additions and 103 deletions

View File

@ -162,10 +162,7 @@ func (c *Client) outputfn(ctx context.Context, r io.Reader) (*compiler.Value, er
lg := log.Ctx(ctx) lg := log.Ctx(ctx)
// FIXME: merge this into env output. // FIXME: merge this into env output.
out, err := compiler.EmptyStruct() out := compiler.EmptyStruct()
if err != nil {
return nil, err
}
tr := tar.NewReader(r) tr := tar.NewReader(r)
for { for {

View File

@ -17,7 +17,7 @@ func Compile(name string, src interface{}) (*Value, error) {
return DefaultCompiler.Compile(name, src) return DefaultCompiler.Compile(name, src)
} }
func EmptyStruct() (*Value, error) { func EmptyStruct() *Value {
return DefaultCompiler.EmptyStruct() return DefaultCompiler.EmptyStruct()
} }
@ -63,8 +63,12 @@ func (c *Compiler) Cue() *cue.Runtime {
} }
// Compile an empty struct // Compile an empty struct
func (c *Compiler) EmptyStruct() (*Value, error) { func (c *Compiler) EmptyStruct() *Value {
return c.Compile("", "") empty, err := c.Compile("", "")
if err != nil {
panic(err)
}
return empty
} }
func (c *Compiler) Compile(name string, src interface{}) (*Value, error) { func (c *Compiler) Compile(name string, src interface{}) (*Value, error) {

View File

@ -38,14 +38,6 @@ func TestDefNotExist(t *testing.T) {
} }
} }
func TestSimple(t *testing.T) {
c := &Compiler{}
_, err := c.EmptyStruct()
if err != nil {
t.Fatal(err)
}
}
func TestJSON(t *testing.T) { func TestJSON(t *testing.T) {
c := &Compiler{} c := &Compiler{}
v, err := c.Compile("", `foo: hello: "world"`) v, err := c.Compile("", `foo: hello: "world"`)

View File

@ -67,6 +67,11 @@ func (v *Value) Len() cue.Value {
return v.val.Len() return v.val.Len()
} }
// Proxy function to the underlying cue.Value
func (v *Value) Kind() cue.Kind {
return v.val.Kind()
}
// Proxy function to the underlying cue.Value // Proxy function to the underlying cue.Value
func (v *Value) Fields() (*cue.Iterator, error) { func (v *Value) Fields() (*cue.Iterator, error) {
return v.val.Fields() return v.val.Fields()

View File

@ -51,15 +51,14 @@ func (env *Env) SetUpdater(v *compiler.Value) error {
} }
func NewEnv() (*Env, error) { func NewEnv() (*Env, error) {
empty, err := compiler.EmptyStruct() empty := compiler.EmptyStruct()
if err != nil {
return nil, err
}
env := &Env{ env := &Env{
base: empty, base: empty,
input: empty, input: empty,
output: empty, output: empty,
state: empty, }
if err := env.mergeState(); err != nil {
return nil, err
} }
if err := env.SetUpdater(nil); err != nil { if err := env.SetUpdater(nil); err != nil {
return nil, err return nil, err
@ -77,17 +76,10 @@ func (env *Env) Input() *compiler.Value {
func (env *Env) SetInput(i *compiler.Value) error { func (env *Env) SetInput(i *compiler.Value) error {
if i == nil { if i == nil {
var err error i = compiler.EmptyStruct()
i, err = compiler.EmptyStruct()
if err != nil {
return err
} }
} env.input = i
return env.set( return env.mergeState()
env.base,
i,
env.output,
)
} }
// Update the base configuration // Update the base configuration
@ -104,12 +96,9 @@ func (env *Env) Update(ctx context.Context, s Solver) error {
if err != nil { if err != nil {
return fmt.Errorf("base config: %w", err) return fmt.Errorf("base config: %w", err)
} }
env.base = base
// Commit // Commit
return env.set( return env.mergeState()
base,
env.input,
env.output,
)
} }
func (env *Env) Base() *compiler.Value { func (env *Env) Base() *compiler.Value {
@ -147,52 +136,48 @@ func (env *Env) LocalDirs() map[string]string {
) )
} }
// 1. Scan the environment state // 1. Scan the environment state
env.State().Walk( // FIXME: use a common `flow` instance to avoid rescanning the tree.
func(v *compiler.Value) bool { inst := env.state.CueInst()
compute := v.Get("#dagger.compute") flow := cueflow.New(&cueflow.Config{}, inst, newDummyTaskFunc(inst))
if !compute.Exists() { for _, t := range flow.Tasks() {
// No compute script v := compiler.Wrap(t.Value(), inst)
return true localdirs(v.Get("#dagger.compute"))
} }
localdirs(compute)
return false // no nested executables
},
nil,
)
// 2. Scan the environment updater // 2. Scan the environment updater
localdirs(env.Updater()) localdirs(env.Updater())
return dirs return dirs
} }
// FIXME: this is just a 3-way merge. Add var args to compiler.Value.Merge. // FIXME: this is just a 3-way merge. Add var args to compiler.Value.Merge.
func (env *Env) set(base, input, output *compiler.Value) (err error) { func (env *Env) mergeState() error {
// FIXME: make this cleaner in *compiler.Value by keeping intermediary instances // FIXME: make this cleaner in *compiler.Value by keeping intermediary instances
// FIXME: state.CueInst() must return an instance with the same // FIXME: state.CueInst() must return an instance with the same
// contents as state.v, for the purposes of cueflow. // contents as state.v, for the purposes of cueflow.
// That is not currently how *compiler.Value works, so we prepare the cue // That is not currently how *compiler.Value works, so we prepare the cue
// instance manually. // instance manually.
// --> refactor the compiler.Value API to do this for us. // --> refactor the compiler.Value API to do this for us.
stateInst := env.state.CueInst() var (
state = compiler.EmptyStruct()
stateInst = state.CueInst()
err error
)
stateInst, err = stateInst.Fill(base.Cue()) stateInst, err = stateInst.Fill(env.base.Cue())
if err != nil { if err != nil {
return fmt.Errorf("merge base & input: %w", err) return fmt.Errorf("merge base & input: %w", err)
} }
stateInst, err = stateInst.Fill(input.Cue()) stateInst, err = stateInst.Fill(env.input.Cue())
if err != nil { if err != nil {
return fmt.Errorf("merge base & input: %w", err) return fmt.Errorf("merge base & input: %w", err)
} }
stateInst, err = stateInst.Fill(output.Cue()) stateInst, err = stateInst.Fill(env.output.Cue())
if err != nil { if err != nil {
return fmt.Errorf("merge output with base & input: %w", err) return fmt.Errorf("merge output with base & input: %w", err)
} }
state := compiler.Wrap(stateInst.Value(), stateInst) state = compiler.Wrap(stateInst.Value(), stateInst)
// commit // commit
env.base = base
env.input = input
env.output = output
env.state = state env.state = state
return nil return nil
} }
@ -217,15 +202,7 @@ func (env *Env) Export(fs FS) (FS, error) {
// fs = env.output.SaveJSON(fs, "output.cue") // fs = env.output.SaveJSON(fs, "output.cue")
// } // }
// For now, export a single `state.cue` containing the combined output. // For now, export a single `state.cue` containing the combined output.
var err error fs = fs.WriteValueJSON("state.cue", env.state)
state := env.state
if env.output != nil {
state, err = state.Merge(env.output)
if err != nil {
return fs, err
}
}
fs = fs.WriteValueJSON("state.cue", state)
return fs, nil return fs, nil
} }
@ -240,11 +217,8 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
Str("value", compiler.Wrap(flowInst.Value(), flowInst).JSON().String()). Str("value", compiler.Wrap(flowInst.Value(), flowInst).JSON().String()).
Msg("walking") Msg("walking")
// Initialize empty output // Reset the output
output, err := compiler.EmptyStruct() env.output = compiler.EmptyStruct()
if err != nil {
return err
}
// Cueflow config // Cueflow config
flowCfg := &cueflow.Config{ flowCfg := &cueflow.Config{
@ -266,7 +240,7 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
lg.Debug().Msg("cueflow task: filling result") lg.Debug().Msg("cueflow task: filling result")
// Merge task value into output // Merge task value into output
var err error var err error
output, err = output.MergePath(t.Value(), t.Path()) env.output, err = env.output.MergePath(t.Value(), t.Path())
if err != nil { if err != nil {
lg. lg.
Error(). Error().
@ -277,21 +251,38 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
return nil return nil
}, },
} }
// Cueflow match func // Orchestrate execution with cueflow
flowMatchFn := func(flowVal cue.Value) (cueflow.Runner, error) { flow := cueflow.New(flowCfg, flowInst, newPipelineTaskFunc(ctx, flowInst, s))
v := compiler.Wrap(flowVal, flowInst) if err := flow.Run(ctx); err != nil {
compute := v.Get("#dagger.compute") return err
if !compute.Exists() { }
return env.mergeState()
}
func newDummyTaskFunc(inst *cue.Instance) cueflow.TaskFunc {
return func(flowVal cue.Value) (cueflow.Runner, error) {
v := compiler.Wrap(flowVal, inst)
if !isComponent(v) {
// No compute script // No compute script
return nil, nil return nil, nil
} }
if _, err := compute.List(); err != nil { return cueflow.RunnerFunc(func(t *cueflow.Task) error {
// invalid compute script return nil
return nil, err }), nil
}
}
func newPipelineTaskFunc(ctx context.Context, inst *cue.Instance, s Solver) cueflow.TaskFunc {
return func(flowVal cue.Value) (cueflow.Runner, error) {
v := compiler.Wrap(flowVal, inst)
if !isComponent(v) {
// No compute script
return nil, nil
} }
// Cueflow run func: // Cueflow run func:
return cueflow.RunnerFunc(func(t *cueflow.Task) error { return cueflow.RunnerFunc(func(t *cueflow.Task) error {
lg := lg. lg := log.
Ctx(ctx).
With(). With().
Str("path", t.Path().String()). Str("path", t.Path().String()).
Logger() Logger()
@ -303,19 +294,9 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
Str("dependency", dep.Path().String()). Str("dependency", dep.Path().String()).
Msg("dependency detected") Msg("dependency detected")
} }
v := compiler.Wrap(t.Value(), flowInst) v := compiler.Wrap(t.Value(), inst)
p := NewPipeline(s, NewFillable(t)) p := NewPipeline(s, NewFillable(t))
return p.Do(ctx, v) return p.Do(ctx, v)
}), nil }), nil
} }
// Orchestrate execution with cueflow
flow := cueflow.New(flowCfg, flowInst, flowMatchFn)
if err := flow.Run(ctx); err != nil {
return err
}
return env.set(
env.base,
env.input,
output,
)
} }

View File

@ -31,29 +31,33 @@ func (p *Pipeline) FS() FS {
return p.fs return p.fs
} }
func isComponent(v *compiler.Value) bool {
return v.Get("#dagger.compute").Exists()
}
func ops(code ...*compiler.Value) ([]*compiler.Value, error) { func ops(code ...*compiler.Value) ([]*compiler.Value, error) {
ops := []*compiler.Value{} ops := []*compiler.Value{}
// 1. Decode 'code' into a single flat array of operations. // 1. Decode 'code' into a single flat array of operations.
for _, x := range code { for _, x := range code {
// 1. attachment array // 1. attachment array
if xops, err := x.Get("#dagger.compute").List(); err == nil { if isComponent(x) {
xops, err := x.Get("#dagger.compute").List()
if err != nil {
return nil, err
}
// 'from' has an executable attached // 'from' has an executable attached
ops = append(ops, xops...) ops = append(ops, xops...)
continue
}
// 2. individual op // 2. individual op
if _, err := x.Get("do").String(); err == nil { } else if _, err := x.Get("do").String(); err == nil {
ops = append(ops, x) ops = append(ops, x)
continue
}
// 3. op array // 3. op array
if xops, err := x.List(); err == nil { } else if xops, err := x.List(); err == nil {
ops = append(ops, xops...) ops = append(ops, xops...)
continue } else {
}
// 4. error // 4. error
return nil, fmt.Errorf("not executable: %s", x.SourceUnsafe()) return nil, fmt.Errorf("not executable: %s", x.SourceUnsafe())
} }
}
return ops, nil return ops, nil
} }