fix pipeline caching when using cache mounts

Force the Pipeline to resolve the canonical CUE value before computing.
This ensures that `.Path()` returns the same result whether a value is
accessed directly or through a reference, so cache mount IDs derived from
it stay stable.
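
For illustration, a minimal standalone sketch (not part of this change; the
`original`/`alias` field names are invented) of the cuelang.org/go behavior
involved: `Path()` reports the path a value was reached through, while
`ReferencePath()` resolves a reference to its canonical target:

    package main

    import (
        "fmt"

        "cuelang.org/go/cue"
        "cuelang.org/go/cue/cuecontext"
    )

    func main() {
        v := cuecontext.New().CompileString(`
            original: mycache: {}
            alias: original
        `)

        // Reached through the reference, Path() reports the reference's path.
        fmt.Println(v.LookupPath(cue.ParsePath("alias.mycache")).Path()) // alias.mycache

        // ReferencePath() resolves the reference to its canonical target.
        _, ref := v.LookupPath(cue.ParsePath("alias")).ReferencePath()
        fmt.Println(ref) // original
    }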

Fixes #399

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
Andrea Luzzardi 2021-05-04 16:26:56 -07:00
parent e43532b6fc
commit d8a1f35f68
5 changed files with 90 additions and 54 deletions


@@ -43,6 +43,11 @@ func (v *Value) Lookup(path string) *Value {
     return v.LookupPath(cue.ParsePath(path))
 }
+func (v *Value) ReferencePath() (*Value, cue.Path) {
+    vv, p := v.val.ReferencePath()
+    return v.cc.Wrap(vv), p
+}
 // Proxy function to the underlying cue.Value
 func (v *Value) Len() cue.Value {
     return v.val.Len()


@@ -94,9 +94,9 @@ func (d *Environment) LoadPlan(ctx context.Context, s Solver) error {
         return err
     }
-    p := NewPipeline("[internal] source", s)
+    p := NewPipeline(planSource, s).WithCustomName("[internal] source")
     // execute updater script
-    if err := p.Do(ctx, planSource); err != nil {
+    if err := p.Run(ctx); err != nil {
         return err
     }
@@ -120,7 +120,7 @@ func (d *Environment) LoadPlan(ctx context.Context, s Solver) error {
 // by user-specified scripts.
 func (d *Environment) LocalDirs() map[string]string {
     dirs := map[string]string{}
-    localdirs := func(code ...*compiler.Value) {
+    localdirs := func(code *compiler.Value) {
         Analyze(
             func(op *compiler.Value) error {
                 do, err := op.Lookup("do").String()
@@ -137,7 +137,7 @@ func (d *Environment) LocalDirs() map[string]string {
                 dirs[dir] = dir
                 return nil
             },
-            code...,
+            code,
         )
     }
     // 1. Scan the environment state
@@ -245,8 +245,8 @@ func newPipelineRunner(computed *compiler.Value, s Solver) cueflow.RunnerFunc {
                 Msg("dependency detected")
         }
         v := compiler.Wrap(t.Value())
-        p := NewPipeline(t.Path().String(), s)
-        err := p.Do(ctx, v)
+        p := NewPipeline(v, s)
+        err := p.Run(ctx)
         if err != nil {
             span.LogFields(otlog.String("error", err.Error()))
             ext.Error.Set(span, true)


@@ -32,6 +32,7 @@ const (
 // An execution pipeline
 type Pipeline struct {
+    code *compiler.Value
     name string
     s Solver
     state llb.State
@@ -40,15 +41,21 @@ type Pipeline struct {
     computed *compiler.Value
 }
-func NewPipeline(name string, s Solver) *Pipeline {
+func NewPipeline(code *compiler.Value, s Solver) *Pipeline {
     return &Pipeline{
-        name: name,
+        code: code,
+        name: code.Path().String(),
         s: s,
         state: llb.Scratch(),
        computed: compiler.NewValue(),
     }
 }
+func (p *Pipeline) WithCustomName(name string) *Pipeline {
+    p.name = name
+    return p
+}
 func (p *Pipeline) State() llb.State {
     return p.state
 }
@@ -76,38 +83,35 @@ func isComponent(v *compiler.Value) bool {
     return v.Lookup("#up").Exists()
 }
-func ops(code ...*compiler.Value) ([]*compiler.Value, error) {
+func ops(code *compiler.Value) ([]*compiler.Value, error) {
     ops := []*compiler.Value{}
     // 1. Decode 'code' into a single flat array of operations.
-    for _, x := range code {
     // 1. attachment array
-    if isComponent(x) {
-        xops, err := x.Lookup("#up").List()
+    if isComponent(code) {
+        xops, err := code.Lookup("#up").List()
         if err != nil {
             return nil, err
         }
         // 'from' has an executable attached
         ops = append(ops, xops...)
         // 2. individual op
-    } else if _, err := x.Lookup("do").String(); err == nil {
-        ops = append(ops, x)
+    } else if _, err := code.Lookup("do").String(); err == nil {
+        ops = append(ops, code)
         // 3. op array
-    } else if xops, err := x.List(); err == nil {
+    } else if xops, err := code.List(); err == nil {
         ops = append(ops, xops...)
     } else {
         // 4. error
-        source, err := x.Source()
+        source, err := code.Source()
         if err != nil {
             panic(err)
         }
         return nil, fmt.Errorf("not executable: %s", source)
     }
-    }
     return ops, nil
 }
-func Analyze(fn func(*compiler.Value) error, code ...*compiler.Value) error {
-    ops, err := ops(code...)
+func Analyze(fn func(*compiler.Value) error, code *compiler.Value) error {
+    ops, err := ops(code)
     if err != nil {
         return err
     }
@@ -144,12 +148,8 @@ func analyzeOp(fn func(*compiler.Value) error, op *compiler.Value) error {
     return nil
 }
-// x may be:
-// 1) a single operation
-// 2) an array of operations
-// 3) a value with an attached array of operations
-func (p *Pipeline) Do(ctx context.Context, code ...*compiler.Value) error {
-    ops, err := ops(code...)
+func (p *Pipeline) Run(ctx context.Context) error {
+    ops, err := ops(p.code)
     if err != nil {
         return err
     }
@@ -260,8 +260,8 @@ func (p *Pipeline) Copy(ctx context.Context, op *compiler.Value, st llb.State) (
         return st, err
     }
     // Execute 'from' in a tmp pipeline, and use the resulting fs
-    from := NewPipeline(op.Lookup("from").Path().String(), p.s)
-    if err := from.Do(ctx, op.Lookup("from")); err != nil {
+    from := NewPipeline(op.Lookup("from"), p.s)
+    if err := from.Run(ctx); err != nil {
         return st, err
     }
     return st.File(
@@ -438,7 +438,7 @@ func (p *Pipeline) mount(ctx context.Context, dest string, mnt *compiler.Value)
             dest,
             llb.Scratch(),
             llb.AsPersistentCacheDir(
-                mnt.Path().String(),
+                p.canonicalPath(mnt),
                 llb.CacheMountShared,
             ),
         ), nil
@@ -453,8 +453,8 @@ func (p *Pipeline) mount(ctx context.Context, dest string, mnt *compiler.Value)
         }
     }
     // eg. mount: "/foo": { from: www.source }
-    from := NewPipeline(mnt.Lookup("from").Path().String(), p.s)
-    if err := from.Do(ctx, mnt.Lookup("from")); err != nil {
+    from := NewPipeline(mnt.Lookup("from"), p.s)
+    if err := from.Run(ctx); err != nil {
         return nil, err
     }
     // possibly construct mount options for LLB from
@@ -470,6 +470,33 @@ func (p *Pipeline) mount(ctx context.Context, dest string, mnt *compiler.Value)
     return llb.AddMount(dest, from.State(), mo...), nil
 }
+// canonicalPath returns the canonical path of `v`
+// If the pipeline is a reference to another pipeline, `canonicalPath()` will
+// return the path of the reference of `v`.
+// FIXME: this doesn't work with references of references.
+func (p *Pipeline) canonicalPath(v *compiler.Value) string {
+    // value path
+    vPath := v.Path().Selectors()
+    // pipeline path
+    pipelinePath := p.code.Path().Selectors()
+    // check if the pipeline is a reference
+    _, ref := p.code.ReferencePath()
+    if len(ref.Selectors()) == 0 {
+        return v.Path().String()
+    }
+    canonicalPipelinePath := ref.Selectors()
+    // replace the pipeline path with the canonical pipeline path
+    // 1. strip the pipeline path from the value path
+    vPath = vPath[len(pipelinePath):]
+    // 2. inject the canonical pipeline path
+    vPath = append(canonicalPipelinePath, vPath...)
+    return cue.MakePath(vPath...).String()
+}
 func (p *Pipeline) Export(ctx context.Context, op *compiler.Value, st llb.State) (llb.State, error) {
     source, err := op.Lookup("source").String()
     if err != nil {
@@ -553,8 +580,8 @@ func unmarshalAnything(data []byte, fn unmarshaller) (interface{}, error) {
 func (p *Pipeline) Load(ctx context.Context, op *compiler.Value, st llb.State) (llb.State, error) {
     // Execute 'from' in a tmp pipeline, and use the resulting fs
-    from := NewPipeline(op.Lookup("from").Path().String(), p.s)
-    if err := from.Do(ctx, op.Lookup("from")); err != nil {
+    from := NewPipeline(op.Lookup("from"), p.s)
+    if err := from.Run(ctx); err != nil {
         return st, err
     }
     p.image = from.ImageConfig()
@@ -735,8 +762,8 @@ func (p *Pipeline) DockerBuild(ctx context.Context, op *compiler.Value, st llb.State) (llb.State, error) {
     // docker build context. This can come from another component, so we need to
     // compute it first.
     if dockerContext.Exists() {
-        from := NewPipeline(op.Lookup("context").Path().String(), p.s)
-        if err := from.Do(ctx, dockerContext); err != nil {
+        from := NewPipeline(op.Lookup("context"), p.s)
+        if err := from.Run(ctx); err != nil {
             return st, err
         }
         contextDef, err = p.s.Marshal(ctx, from.State())
@@ -883,9 +910,9 @@ func (p *Pipeline) WriteFile(ctx context.Context, op *compiler.Value, st llb.State) (llb.State, error) {
             content = []byte(str)
         }
     case cue.BottomKind:
-        err = fmt.Errorf("%s: WriteFile content is not set", op.Path().String())
+        err = fmt.Errorf("%s: WriteFile content is not set", p.canonicalPath(op))
     default:
-        err = fmt.Errorf("%s: unhandled data type in WriteFile: %s", op.Path().String(), kind)
+        err = fmt.Errorf("%s: unhandled data type in WriteFile: %s", p.canonicalPath(op), kind)
     }
     if err != nil {
         return st, err
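
For reference, the following standalone sketch mirrors the path surgery done by
the `canonicalPath` helper added above (it assumes the value path lives under
the pipeline path; the `TestReference`/`TestMountCache` paths are purely
illustrative):

    package main

    import (
        "fmt"

        "cuelang.org/go/cue"
    )

    // canonicalize re-roots valuePath (which lives under pipelinePath) at the
    // pipeline's dereferenced, canonical path instead.
    func canonicalize(valuePath, pipelinePath, canonicalPipelinePath cue.Path) cue.Path {
        rel := valuePath.Selectors()[len(pipelinePath.Selectors()):]
        return cue.MakePath(append(canonicalPipelinePath.Selectors(), rel...)...)
    }

    func main() {
        fmt.Println(canonicalize(
            cue.ParsePath("TestReference.mount.something"),
            cue.ParsePath("TestReference"),
            cue.ParsePath("TestMountCache"),
        ))
        // TestMountCache.mount.something — the same cache ID no matter which
        // reference the pipeline was reached through.
    }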


@@ -24,7 +24,6 @@ setup() {
     # cache
     run "$DAGGER" compute "$TESTDIR"/ops/mounts/valid/cache
-    assert_line '{"TestMountCache":"NOT SURE WHAT TO TEST YET"}'
     assert_success
     # component


@@ -1,6 +1,8 @@
 package testing
-import "dagger.io/dagger/op"
+import (
+    "dagger.io/dagger/op"
+)
 TestMountCache: {
     string
@@ -11,7 +13,7 @@ TestMountCache: {
         },
         op.#Exec & {
             args: ["sh", "-c", """
-                echo -n "NOT SURE WHAT TO TEST YET" > /out
+                echo -n "$RANDOM" > /out
                """]
             dir: "/"
             mount: something: "cache"
@@ -22,3 +24,6 @@ TestMountCache: {
         },
     ]
 }
+// Make sure references to pipelines with cache mounts never get re-executed. #399
+TestReference: TestMountCache
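
The new test leans on BuildKit's cache-mount semantics: exec ops share a
persistent cache directory only when they use the same cache ID, which is why
the ID must not change when the pipeline is reached under another name. A rough
sketch of that LLB behavior (not from this diff; the mount path, command, and
package name are made up):

    package cachedemo

    import "github.com/moby/buildkit/client/llb"

    // withCache runs a command with /cache mounted as a shared persistent cache.
    // Exec ops built with the same id reuse one cache volume across builds; a
    // different id (e.g. one derived from an unstable CUE path) starts from an
    // empty cache instead.
    func withCache(base llb.State, id string) llb.State {
        return base.Run(
            llb.Shlex("sh -c 'date >> /cache/log'"),
            llb.AddMount("/cache", llb.Scratch(),
                llb.AsPersistentCacheDir(id, llb.CacheMountShared)),
        ).Root()
    }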