performance: reduce the number of fills
- Remove unnecessary Fill() in Export() - Change `set()` and the way we store outputs so we don't fill intermediaries as much - WIP: Scan the tree only once. Changed LocalDirs to use cueflow rather than doing our own Walk. In a follow up we should use the same flow instance. Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
parent
d5830fbaca
commit
890fdb4176
@ -162,10 +162,7 @@ func (c *Client) outputfn(ctx context.Context, r io.Reader) (*compiler.Value, er
|
||||
lg := log.Ctx(ctx)
|
||||
|
||||
// FIXME: merge this into env output.
|
||||
out, err := compiler.EmptyStruct()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out := compiler.EmptyStruct()
|
||||
|
||||
tr := tar.NewReader(r)
|
||||
for {
|
||||
|
@ -17,7 +17,7 @@ func Compile(name string, src interface{}) (*Value, error) {
|
||||
return DefaultCompiler.Compile(name, src)
|
||||
}
|
||||
|
||||
func EmptyStruct() (*Value, error) {
|
||||
func EmptyStruct() *Value {
|
||||
return DefaultCompiler.EmptyStruct()
|
||||
}
|
||||
|
||||
@ -63,8 +63,12 @@ func (c *Compiler) Cue() *cue.Runtime {
|
||||
}
|
||||
|
||||
// Compile an empty struct
|
||||
func (c *Compiler) EmptyStruct() (*Value, error) {
|
||||
return c.Compile("", "")
|
||||
func (c *Compiler) EmptyStruct() *Value {
|
||||
empty, err := c.Compile("", "")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return empty
|
||||
}
|
||||
|
||||
func (c *Compiler) Compile(name string, src interface{}) (*Value, error) {
|
||||
|
@ -38,14 +38,6 @@ func TestDefNotExist(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSimple(t *testing.T) {
|
||||
c := &Compiler{}
|
||||
_, err := c.EmptyStruct()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSON(t *testing.T) {
|
||||
c := &Compiler{}
|
||||
v, err := c.Compile("", `foo: hello: "world"`)
|
||||
|
@ -67,6 +67,11 @@ func (v *Value) Len() cue.Value {
|
||||
return v.val.Len()
|
||||
}
|
||||
|
||||
// Proxy function to the underlying cue.Value
|
||||
func (v *Value) Kind() cue.Kind {
|
||||
return v.val.Kind()
|
||||
}
|
||||
|
||||
// Proxy function to the underlying cue.Value
|
||||
func (v *Value) Fields() (*cue.Iterator, error) {
|
||||
return v.val.Fields()
|
||||
|
133
dagger/env.go
133
dagger/env.go
@ -51,15 +51,14 @@ func (env *Env) SetUpdater(v *compiler.Value) error {
|
||||
}
|
||||
|
||||
func NewEnv() (*Env, error) {
|
||||
empty, err := compiler.EmptyStruct()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
empty := compiler.EmptyStruct()
|
||||
env := &Env{
|
||||
base: empty,
|
||||
input: empty,
|
||||
output: empty,
|
||||
state: empty,
|
||||
}
|
||||
if err := env.mergeState(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := env.SetUpdater(nil); err != nil {
|
||||
return nil, err
|
||||
@ -77,17 +76,10 @@ func (env *Env) Input() *compiler.Value {
|
||||
|
||||
func (env *Env) SetInput(i *compiler.Value) error {
|
||||
if i == nil {
|
||||
var err error
|
||||
i, err = compiler.EmptyStruct()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
i = compiler.EmptyStruct()
|
||||
}
|
||||
return env.set(
|
||||
env.base,
|
||||
i,
|
||||
env.output,
|
||||
)
|
||||
env.input = i
|
||||
return env.mergeState()
|
||||
}
|
||||
|
||||
// Update the base configuration
|
||||
@ -104,12 +96,9 @@ func (env *Env) Update(ctx context.Context, s Solver) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("base config: %w", err)
|
||||
}
|
||||
env.base = base
|
||||
// Commit
|
||||
return env.set(
|
||||
base,
|
||||
env.input,
|
||||
env.output,
|
||||
)
|
||||
return env.mergeState()
|
||||
}
|
||||
|
||||
func (env *Env) Base() *compiler.Value {
|
||||
@ -147,52 +136,48 @@ func (env *Env) LocalDirs() map[string]string {
|
||||
)
|
||||
}
|
||||
// 1. Scan the environment state
|
||||
env.State().Walk(
|
||||
func(v *compiler.Value) bool {
|
||||
compute := v.Get("#dagger.compute")
|
||||
if !compute.Exists() {
|
||||
// No compute script
|
||||
return true
|
||||
}
|
||||
localdirs(compute)
|
||||
return false // no nested executables
|
||||
},
|
||||
nil,
|
||||
)
|
||||
// FIXME: use a common `flow` instance to avoid rescanning the tree.
|
||||
inst := env.state.CueInst()
|
||||
flow := cueflow.New(&cueflow.Config{}, inst, newDummyTaskFunc(inst))
|
||||
for _, t := range flow.Tasks() {
|
||||
v := compiler.Wrap(t.Value(), inst)
|
||||
localdirs(v.Get("#dagger.compute"))
|
||||
}
|
||||
// 2. Scan the environment updater
|
||||
localdirs(env.Updater())
|
||||
return dirs
|
||||
}
|
||||
|
||||
// FIXME: this is just a 3-way merge. Add var args to compiler.Value.Merge.
|
||||
func (env *Env) set(base, input, output *compiler.Value) (err error) {
|
||||
func (env *Env) mergeState() error {
|
||||
// FIXME: make this cleaner in *compiler.Value by keeping intermediary instances
|
||||
// FIXME: state.CueInst() must return an instance with the same
|
||||
// contents as state.v, for the purposes of cueflow.
|
||||
// That is not currently how *compiler.Value works, so we prepare the cue
|
||||
// instance manually.
|
||||
// --> refactor the compiler.Value API to do this for us.
|
||||
stateInst := env.state.CueInst()
|
||||
var (
|
||||
state = compiler.EmptyStruct()
|
||||
stateInst = state.CueInst()
|
||||
err error
|
||||
)
|
||||
|
||||
stateInst, err = stateInst.Fill(base.Cue())
|
||||
stateInst, err = stateInst.Fill(env.base.Cue())
|
||||
if err != nil {
|
||||
return fmt.Errorf("merge base & input: %w", err)
|
||||
}
|
||||
stateInst, err = stateInst.Fill(input.Cue())
|
||||
stateInst, err = stateInst.Fill(env.input.Cue())
|
||||
if err != nil {
|
||||
return fmt.Errorf("merge base & input: %w", err)
|
||||
}
|
||||
stateInst, err = stateInst.Fill(output.Cue())
|
||||
stateInst, err = stateInst.Fill(env.output.Cue())
|
||||
if err != nil {
|
||||
return fmt.Errorf("merge output with base & input: %w", err)
|
||||
}
|
||||
|
||||
state := compiler.Wrap(stateInst.Value(), stateInst)
|
||||
state = compiler.Wrap(stateInst.Value(), stateInst)
|
||||
|
||||
// commit
|
||||
env.base = base
|
||||
env.input = input
|
||||
env.output = output
|
||||
env.state = state
|
||||
return nil
|
||||
}
|
||||
@ -217,15 +202,7 @@ func (env *Env) Export(fs FS) (FS, error) {
|
||||
// fs = env.output.SaveJSON(fs, "output.cue")
|
||||
// }
|
||||
// For now, export a single `state.cue` containing the combined output.
|
||||
var err error
|
||||
state := env.state
|
||||
if env.output != nil {
|
||||
state, err = state.Merge(env.output)
|
||||
if err != nil {
|
||||
return fs, err
|
||||
}
|
||||
}
|
||||
fs = fs.WriteValueJSON("state.cue", state)
|
||||
fs = fs.WriteValueJSON("state.cue", env.state)
|
||||
return fs, nil
|
||||
}
|
||||
|
||||
@ -240,11 +217,8 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
|
||||
Str("value", compiler.Wrap(flowInst.Value(), flowInst).JSON().String()).
|
||||
Msg("walking")
|
||||
|
||||
// Initialize empty output
|
||||
output, err := compiler.EmptyStruct()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Reset the output
|
||||
env.output = compiler.EmptyStruct()
|
||||
|
||||
// Cueflow config
|
||||
flowCfg := &cueflow.Config{
|
||||
@ -266,7 +240,7 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
|
||||
lg.Debug().Msg("cueflow task: filling result")
|
||||
// Merge task value into output
|
||||
var err error
|
||||
output, err = output.MergePath(t.Value(), t.Path())
|
||||
env.output, err = env.output.MergePath(t.Value(), t.Path())
|
||||
if err != nil {
|
||||
lg.
|
||||
Error().
|
||||
@ -277,21 +251,38 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
// Cueflow match func
|
||||
flowMatchFn := func(flowVal cue.Value) (cueflow.Runner, error) {
|
||||
v := compiler.Wrap(flowVal, flowInst)
|
||||
compute := v.Get("#dagger.compute")
|
||||
if !compute.Exists() {
|
||||
// Orchestrate execution with cueflow
|
||||
flow := cueflow.New(flowCfg, flowInst, newPipelineTaskFunc(ctx, flowInst, s))
|
||||
if err := flow.Run(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return env.mergeState()
|
||||
}
|
||||
|
||||
func newDummyTaskFunc(inst *cue.Instance) cueflow.TaskFunc {
|
||||
return func(flowVal cue.Value) (cueflow.Runner, error) {
|
||||
v := compiler.Wrap(flowVal, inst)
|
||||
if !isComponent(v) {
|
||||
// No compute script
|
||||
return nil, nil
|
||||
}
|
||||
if _, err := compute.List(); err != nil {
|
||||
// invalid compute script
|
||||
return nil, err
|
||||
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
|
||||
return nil
|
||||
}), nil
|
||||
}
|
||||
}
|
||||
|
||||
func newPipelineTaskFunc(ctx context.Context, inst *cue.Instance, s Solver) cueflow.TaskFunc {
|
||||
return func(flowVal cue.Value) (cueflow.Runner, error) {
|
||||
v := compiler.Wrap(flowVal, inst)
|
||||
if !isComponent(v) {
|
||||
// No compute script
|
||||
return nil, nil
|
||||
}
|
||||
// Cueflow run func:
|
||||
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
|
||||
lg := lg.
|
||||
lg := log.
|
||||
Ctx(ctx).
|
||||
With().
|
||||
Str("path", t.Path().String()).
|
||||
Logger()
|
||||
@ -303,19 +294,9 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
|
||||
Str("dependency", dep.Path().String()).
|
||||
Msg("dependency detected")
|
||||
}
|
||||
v := compiler.Wrap(t.Value(), flowInst)
|
||||
v := compiler.Wrap(t.Value(), inst)
|
||||
p := NewPipeline(s, NewFillable(t))
|
||||
return p.Do(ctx, v)
|
||||
}), nil
|
||||
}
|
||||
// Orchestrate execution with cueflow
|
||||
flow := cueflow.New(flowCfg, flowInst, flowMatchFn)
|
||||
if err := flow.Run(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return env.set(
|
||||
env.base,
|
||||
env.input,
|
||||
output,
|
||||
)
|
||||
}
|
||||
|
@ -31,28 +31,32 @@ func (p *Pipeline) FS() FS {
|
||||
return p.fs
|
||||
}
|
||||
|
||||
func isComponent(v *compiler.Value) bool {
|
||||
return v.Get("#dagger.compute").Exists()
|
||||
}
|
||||
|
||||
func ops(code ...*compiler.Value) ([]*compiler.Value, error) {
|
||||
ops := []*compiler.Value{}
|
||||
// 1. Decode 'code' into a single flat array of operations.
|
||||
for _, x := range code {
|
||||
// 1. attachment array
|
||||
if xops, err := x.Get("#dagger.compute").List(); err == nil {
|
||||
if isComponent(x) {
|
||||
xops, err := x.Get("#dagger.compute").List()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// 'from' has an executable attached
|
||||
ops = append(ops, xops...)
|
||||
continue
|
||||
}
|
||||
// 2. individual op
|
||||
if _, err := x.Get("do").String(); err == nil {
|
||||
// 2. individual op
|
||||
} else if _, err := x.Get("do").String(); err == nil {
|
||||
ops = append(ops, x)
|
||||
continue
|
||||
}
|
||||
// 3. op array
|
||||
if xops, err := x.List(); err == nil {
|
||||
// 3. op array
|
||||
} else if xops, err := x.List(); err == nil {
|
||||
ops = append(ops, xops...)
|
||||
continue
|
||||
} else {
|
||||
// 4. error
|
||||
return nil, fmt.Errorf("not executable: %s", x.SourceUnsafe())
|
||||
}
|
||||
// 4. error
|
||||
return nil, fmt.Errorf("not executable: %s", x.SourceUnsafe())
|
||||
}
|
||||
return ops, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user