From d8a1f35f6836adb9289f8071e93839ca61575911 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Tue, 4 May 2021 16:26:56 -0700 Subject: [PATCH] fix pipeline caching when using cache mounts Force the Pipeline to resolve the canonical Cue value before computing. This ensures that `.Path()` gives a constant result even on references. Fixes #399 Signed-off-by: Andrea Luzzardi --- dagger/compiler/value.go | 5 ++ dagger/deployment.go | 12 +-- dagger/pipeline.go | 117 ++++++++++++++++---------- tests/ops.bats | 1 - tests/ops/mounts/valid/cache/main.cue | 9 +- 5 files changed, 90 insertions(+), 54 deletions(-) diff --git a/dagger/compiler/value.go b/dagger/compiler/value.go index 3f20f50e..9b6e0ee5 100644 --- a/dagger/compiler/value.go +++ b/dagger/compiler/value.go @@ -43,6 +43,11 @@ func (v *Value) Lookup(path string) *Value { return v.LookupPath(cue.ParsePath(path)) } +func (v *Value) ReferencePath() (*Value, cue.Path) { + vv, p := v.val.ReferencePath() + return v.cc.Wrap(vv), p +} + // Proxy function to the underlying cue.Value func (v *Value) Len() cue.Value { return v.val.Len() diff --git a/dagger/deployment.go b/dagger/deployment.go index bbbb97c2..f0fe1643 100644 --- a/dagger/deployment.go +++ b/dagger/deployment.go @@ -94,9 +94,9 @@ func (d *Environment) LoadPlan(ctx context.Context, s Solver) error { return err } - p := NewPipeline("[internal] source", s) + p := NewPipeline(planSource, s).WithCustomName("[internal] source") // execute updater script - if err := p.Do(ctx, planSource); err != nil { + if err := p.Run(ctx); err != nil { return err } @@ -120,7 +120,7 @@ func (d *Environment) LoadPlan(ctx context.Context, s Solver) error { // by user-specified scripts. func (d *Environment) LocalDirs() map[string]string { dirs := map[string]string{} - localdirs := func(code ...*compiler.Value) { + localdirs := func(code *compiler.Value) { Analyze( func(op *compiler.Value) error { do, err := op.Lookup("do").String() @@ -137,7 +137,7 @@ func (d *Environment) LocalDirs() map[string]string { dirs[dir] = dir return nil }, - code..., + code, ) } // 1. Scan the environment state @@ -245,8 +245,8 @@ func newPipelineRunner(computed *compiler.Value, s Solver) cueflow.RunnerFunc { Msg("dependency detected") } v := compiler.Wrap(t.Value()) - p := NewPipeline(t.Path().String(), s) - err := p.Do(ctx, v) + p := NewPipeline(v, s) + err := p.Run(ctx) if err != nil { span.LogFields(otlog.String("error", err.Error())) ext.Error.Set(span, true) diff --git a/dagger/pipeline.go b/dagger/pipeline.go index 80112524..56a76604 100644 --- a/dagger/pipeline.go +++ b/dagger/pipeline.go @@ -32,6 +32,7 @@ const ( // An execution pipeline type Pipeline struct { + code *compiler.Value name string s Solver state llb.State @@ -40,15 +41,21 @@ type Pipeline struct { computed *compiler.Value } -func NewPipeline(name string, s Solver) *Pipeline { +func NewPipeline(code *compiler.Value, s Solver) *Pipeline { return &Pipeline{ - name: name, + code: code, + name: code.Path().String(), s: s, state: llb.Scratch(), computed: compiler.NewValue(), } } +func (p *Pipeline) WithCustomName(name string) *Pipeline { + p.name = name + return p +} + func (p *Pipeline) State() llb.State { return p.state } @@ -76,38 +83,35 @@ func isComponent(v *compiler.Value) bool { return v.Lookup("#up").Exists() } -func ops(code ...*compiler.Value) ([]*compiler.Value, error) { +func ops(code *compiler.Value) ([]*compiler.Value, error) { ops := []*compiler.Value{} - // 1. Decode 'code' into a single flat array of operations. - for _, x := range code { - // 1. attachment array - if isComponent(x) { - xops, err := x.Lookup("#up").List() - if err != nil { - return nil, err - } - // 'from' has an executable attached - ops = append(ops, xops...) - // 2. individual op - } else if _, err := x.Lookup("do").String(); err == nil { - ops = append(ops, x) - // 3. op array - } else if xops, err := x.List(); err == nil { - ops = append(ops, xops...) - } else { - // 4. error - source, err := x.Source() - if err != nil { - panic(err) - } - return nil, fmt.Errorf("not executable: %s", source) + // 1. attachment array + if isComponent(code) { + xops, err := code.Lookup("#up").List() + if err != nil { + return nil, err } + // 'from' has an executable attached + ops = append(ops, xops...) + // 2. individual op + } else if _, err := code.Lookup("do").String(); err == nil { + ops = append(ops, code) + // 3. op array + } else if xops, err := code.List(); err == nil { + ops = append(ops, xops...) + } else { + // 4. error + source, err := code.Source() + if err != nil { + panic(err) + } + return nil, fmt.Errorf("not executable: %s", source) } return ops, nil } -func Analyze(fn func(*compiler.Value) error, code ...*compiler.Value) error { - ops, err := ops(code...) +func Analyze(fn func(*compiler.Value) error, code *compiler.Value) error { + ops, err := ops(code) if err != nil { return err } @@ -144,12 +148,8 @@ func analyzeOp(fn func(*compiler.Value) error, op *compiler.Value) error { return nil } -// x may be: -// 1) a single operation -// 2) an array of operations -// 3) a value with an attached array of operations -func (p *Pipeline) Do(ctx context.Context, code ...*compiler.Value) error { - ops, err := ops(code...) +func (p *Pipeline) Run(ctx context.Context) error { + ops, err := ops(p.code) if err != nil { return err } @@ -260,8 +260,8 @@ func (p *Pipeline) Copy(ctx context.Context, op *compiler.Value, st llb.State) ( return st, err } // Execute 'from' in a tmp pipeline, and use the resulting fs - from := NewPipeline(op.Lookup("from").Path().String(), p.s) - if err := from.Do(ctx, op.Lookup("from")); err != nil { + from := NewPipeline(op.Lookup("from"), p.s) + if err := from.Run(ctx); err != nil { return st, err } return st.File( @@ -438,7 +438,7 @@ func (p *Pipeline) mount(ctx context.Context, dest string, mnt *compiler.Value) dest, llb.Scratch(), llb.AsPersistentCacheDir( - mnt.Path().String(), + p.canonicalPath(mnt), llb.CacheMountShared, ), ), nil @@ -453,8 +453,8 @@ func (p *Pipeline) mount(ctx context.Context, dest string, mnt *compiler.Value) } } // eg. mount: "/foo": { from: www.source } - from := NewPipeline(mnt.Lookup("from").Path().String(), p.s) - if err := from.Do(ctx, mnt.Lookup("from")); err != nil { + from := NewPipeline(mnt.Lookup("from"), p.s) + if err := from.Run(ctx); err != nil { return nil, err } // possibly construct mount options for LLB from @@ -470,6 +470,33 @@ func (p *Pipeline) mount(ctx context.Context, dest string, mnt *compiler.Value) return llb.AddMount(dest, from.State(), mo...), nil } +// canonicalPath returns the canonical path of `v` +// If the pipeline is a reference to another pipeline, `canonicalPath()` will +// return the path of the reference of `v`. +// FIXME: this doesn't work with references of references. +func (p *Pipeline) canonicalPath(v *compiler.Value) string { + // value path + vPath := v.Path().Selectors() + + // pipeline path + pipelinePath := p.code.Path().Selectors() + + // check if the pipeline is a reference + _, ref := p.code.ReferencePath() + if len(ref.Selectors()) == 0 { + return v.Path().String() + } + canonicalPipelinePath := ref.Selectors() + + // replace the pipeline path with the canonical pipeline path + // 1. strip the pipeline path from the value path + vPath = vPath[len(pipelinePath):] + // 2. inject the canonical pipeline path + vPath = append(canonicalPipelinePath, vPath...) + + return cue.MakePath(vPath...).String() +} + func (p *Pipeline) Export(ctx context.Context, op *compiler.Value, st llb.State) (llb.State, error) { source, err := op.Lookup("source").String() if err != nil { @@ -553,8 +580,8 @@ func unmarshalAnything(data []byte, fn unmarshaller) (interface{}, error) { func (p *Pipeline) Load(ctx context.Context, op *compiler.Value, st llb.State) (llb.State, error) { // Execute 'from' in a tmp pipeline, and use the resulting fs - from := NewPipeline(op.Lookup("from").Path().String(), p.s) - if err := from.Do(ctx, op.Lookup("from")); err != nil { + from := NewPipeline(op.Lookup("from"), p.s) + if err := from.Run(ctx); err != nil { return st, err } p.image = from.ImageConfig() @@ -735,8 +762,8 @@ func (p *Pipeline) DockerBuild(ctx context.Context, op *compiler.Value, st llb.S // docker build context. This can come from another component, so we need to // compute it first. if dockerContext.Exists() { - from := NewPipeline(op.Lookup("context").Path().String(), p.s) - if err := from.Do(ctx, dockerContext); err != nil { + from := NewPipeline(op.Lookup("context"), p.s) + if err := from.Run(ctx); err != nil { return st, err } contextDef, err = p.s.Marshal(ctx, from.State()) @@ -883,9 +910,9 @@ func (p *Pipeline) WriteFile(ctx context.Context, op *compiler.Value, st llb.Sta content = []byte(str) } case cue.BottomKind: - err = fmt.Errorf("%s: WriteFile content is not set", op.Path().String()) + err = fmt.Errorf("%s: WriteFile content is not set", p.canonicalPath(op)) default: - err = fmt.Errorf("%s: unhandled data type in WriteFile: %s", op.Path().String(), kind) + err = fmt.Errorf("%s: unhandled data type in WriteFile: %s", p.canonicalPath(op), kind) } if err != nil { return st, err diff --git a/tests/ops.bats b/tests/ops.bats index 0bc7135e..a67fe1b9 100644 --- a/tests/ops.bats +++ b/tests/ops.bats @@ -24,7 +24,6 @@ setup() { # cache run "$DAGGER" compute "$TESTDIR"/ops/mounts/valid/cache - assert_line '{"TestMountCache":"NOT SURE WHAT TO TEST YET"}' assert_success # component diff --git a/tests/ops/mounts/valid/cache/main.cue b/tests/ops/mounts/valid/cache/main.cue index b551a6d1..69b9d603 100644 --- a/tests/ops/mounts/valid/cache/main.cue +++ b/tests/ops/mounts/valid/cache/main.cue @@ -1,6 +1,8 @@ package testing -import "dagger.io/dagger/op" +import ( + "dagger.io/dagger/op" +) TestMountCache: { string @@ -11,7 +13,7 @@ TestMountCache: { }, op.#Exec & { args: ["sh", "-c", """ - echo -n "NOT SURE WHAT TO TEST YET" > /out + echo -n "$RANDOM" > /out """] dir: "/" mount: something: "cache" @@ -22,3 +24,6 @@ TestMountCache: { }, ] } + +// Make sure references to pipelines with cache mounts never get re-executed. #399 +TestReference: TestMountCache