store only computed values in compute layer

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2021-04-07 18:41:44 -07:00
parent 5381d0bfe1
commit e54f1b0c3a
7 changed files with 80 additions and 94 deletions

View File

@ -56,7 +56,7 @@ var queryCmd = &cobra.Command{
lg.Fatal().Err(err).Msg("failed to query deployment") lg.Fatal().Err(err).Msg("failed to query deployment")
} }
cueVal := compiler.EmptyStruct() cueVal := compiler.NewValue()
if !viper.GetBool("no-plan") { if !viper.GetBool("no-plan") {
if err := cueVal.FillPath(cue.MakePath(), result.Plan()); err != nil { if err := cueVal.FillPath(cue.MakePath(), result.Plan()); err != nil {

View File

@ -90,7 +90,7 @@ func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc
eg.Go(func() error { eg.Go(func() error {
defer outr.Close() defer outr.Close()
result, err = DeploymentResultFromTar(gctx, outr) result, err = ReadDeploymentResult(gctx, outr)
return err return err
}) })

View File

@ -19,8 +19,8 @@ func Compile(name string, src interface{}) (*Value, error) {
return DefaultCompiler.Compile(name, src) return DefaultCompiler.Compile(name, src)
} }
func EmptyStruct() *Value { func NewValue() *Value {
return DefaultCompiler.EmptyStruct() return DefaultCompiler.NewValue()
} }
// FIXME can be refactored away now? // FIXME can be refactored away now?
@ -72,9 +72,14 @@ func (c *Compiler) Cue() *cue.Runtime {
return &(c.Runtime) return &(c.Runtime)
} }
// Compile an empty struct // Compile an empty value
func (c *Compiler) EmptyStruct() *Value { func (c *Compiler) NewValue() *Value {
empty, err := c.Compile("", "") empty, err := c.Compile("", `
{
...
_
}
`)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -74,7 +74,7 @@ func (d *Deployment) LoadPlan(ctx context.Context, s Solver) error {
return err return err
} }
p := NewPipeline("[internal] source", s, nil) p := NewPipeline("[internal] source", s)
// execute updater script // execute updater script
if err := p.Do(ctx, planSource); err != nil { if err := p.Do(ctx, planSource); err != nil {
return err return err
@ -150,10 +150,8 @@ func (d *Deployment) Up(ctx context.Context, s Solver) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "deployment.Up") span, ctx := opentracing.StartSpanFromContext(ctx, "deployment.Up")
defer span.Finish() defer span.Finish()
lg := log.Ctx(ctx)
// Reset the computed values // Reset the computed values
d.result.computed = compiler.EmptyStruct() d.result.computed = compiler.NewValue()
// Cueflow cue instance // Cueflow cue instance
src, err := d.result.Merge() src, err := d.result.Merge()
@ -161,39 +159,11 @@ func (d *Deployment) Up(ctx context.Context, s Solver) error {
return err return err
} }
// Cueflow config
flowCfg := &cueflow.Config{
UpdateFunc: func(c *cueflow.Controller, t *cueflow.Task) error {
if t == nil {
return nil
}
lg := lg.
With().
Str("component", t.Path().String()).
Str("state", t.State().String()).
Logger()
if t.State() != cueflow.Terminated {
return nil
}
// Merge task value into output
err := d.result.computed.FillPath(t.Path(), t.Value())
if err != nil {
lg.
Error().
Err(err).
Msg("failed to fill task result")
return err
}
return nil
},
}
// Orchestrate execution with cueflow // Orchestrate execution with cueflow
flow := cueflow.New( flow := cueflow.New(
flowCfg, &cueflow.Config{},
src.CueInst(), src.CueInst(),
newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), s)), newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), d.result, s)),
) )
if err := flow.Run(ctx); err != nil { if err := flow.Run(ctx); err != nil {
return err return err
@ -225,7 +195,7 @@ func noOpRunner(t *cueflow.Task) error {
return nil return nil
} }
func newPipelineRunner(inst *cue.Instance, s Solver) cueflow.RunnerFunc { func newPipelineRunner(inst *cue.Instance, result *DeploymentResult, s Solver) cueflow.RunnerFunc {
return cueflow.RunnerFunc(func(t *cueflow.Task) error { return cueflow.RunnerFunc(func(t *cueflow.Task) error {
ctx := t.Context() ctx := t.Context()
lg := log. lg := log.
@ -250,7 +220,7 @@ func newPipelineRunner(inst *cue.Instance, s Solver) cueflow.RunnerFunc {
Msg("dependency detected") Msg("dependency detected")
} }
v := compiler.Wrap(t.Value(), inst) v := compiler.Wrap(t.Value(), inst)
p := NewPipeline(t.Path().String(), s, NewFillable(t)) p := NewPipeline(t.Path().String(), s)
err := p.Do(ctx, v) err := p.Do(ctx, v)
if err != nil { if err != nil {
span.LogFields(otlog.String("error", err.Error())) span.LogFields(otlog.String("error", err.Error()))
@ -271,6 +241,30 @@ func newPipelineRunner(inst *cue.Instance, s Solver) cueflow.RunnerFunc {
Msg("failed") Msg("failed")
return err return err
} }
// Mirror the computed values in both `Task` and `Result`
computed := p.Computed()
if computed.IsEmptyStruct() {
return nil
}
if err := t.Fill(computed.Cue()); err != nil {
lg.
Error().
Err(err).
Msg("failed to fill task")
return err
}
// Merge task value into output
if err := result.computed.FillPath(t.Path(), computed); err != nil {
lg.
Error().
Err(err).
Msg("failed to fill task result")
return err
}
lg. lg.
Info(). Info().
Dur("duration", time.Since(start)). Dur("duration", time.Since(start)).

View File

@ -10,6 +10,7 @@ import (
"path" "path"
"strings" "strings"
"cuelang.org/go/cue"
"github.com/docker/distribution/reference" "github.com/docker/distribution/reference"
bk "github.com/moby/buildkit/client" bk "github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb" "github.com/moby/buildkit/client/llb"
@ -29,19 +30,19 @@ const (
// An execution pipeline // An execution pipeline
type Pipeline struct { type Pipeline struct {
name string name string
s Solver s Solver
state llb.State state llb.State
result bkgw.Reference result bkgw.Reference
out *Fillable computed *compiler.Value
} }
func NewPipeline(name string, s Solver, out *Fillable) *Pipeline { func NewPipeline(name string, s Solver) *Pipeline {
return &Pipeline{ return &Pipeline{
name: name, name: name,
s: s, s: s,
state: llb.Scratch(), state: llb.Scratch(),
out: out, computed: compiler.NewValue(),
} }
} }
@ -60,6 +61,10 @@ func (p *Pipeline) FS() fs.FS {
return NewBuildkitFS(p.result) return NewBuildkitFS(p.result)
} }
func (p *Pipeline) Computed() *compiler.Value {
return p.computed
}
func isComponent(v *compiler.Value) bool { func isComponent(v *compiler.Value) bool {
return v.Lookup("#up").Exists() return v.Lookup("#up").Exists()
} }
@ -246,7 +251,7 @@ func (p *Pipeline) Copy(ctx context.Context, op *compiler.Value, st llb.State) (
return st, err return st, err
} }
// Execute 'from' in a tmp pipeline, and use the resulting fs // Execute 'from' in a tmp pipeline, and use the resulting fs
from := NewPipeline(op.Lookup("from").Path().String(), p.s, nil) from := NewPipeline(op.Lookup("from").Path().String(), p.s)
if err := from.Do(ctx, op.Lookup("from")); err != nil { if err := from.Do(ctx, op.Lookup("from")); err != nil {
return st, err return st, err
} }
@ -446,7 +451,7 @@ func (p *Pipeline) mount(ctx context.Context, dest string, mnt *compiler.Value)
} }
} }
// eg. mount: "/foo": { from: www.source } // eg. mount: "/foo": { from: www.source }
from := NewPipeline(mnt.Lookup("from").Path().String(), p.s, nil) from := NewPipeline(mnt.Lookup("from").Path().String(), p.s)
if err := from.Do(ctx, mnt.Lookup("from")); err != nil { if err := from.Do(ctx, mnt.Lookup("from")); err != nil {
return nil, err return nil, err
} }
@ -488,7 +493,7 @@ func (p *Pipeline) Export(ctx context.Context, op *compiler.Value, st llb.State)
Bytes("contents", contents). Bytes("contents", contents).
Msg("exporting string") Msg("exporting string")
if err := p.out.Fill(string(contents)); err != nil { if err := p.computed.FillPath(cue.MakePath(), string(contents)); err != nil {
return st, err return st, err
} }
case "json": case "json":
@ -504,7 +509,7 @@ func (p *Pipeline) Export(ctx context.Context, op *compiler.Value, st llb.State)
Interface("contents", o). Interface("contents", o).
Msg("exporting json") Msg("exporting json")
if err := p.out.Fill(o); err != nil { if err := p.computed.FillPath(cue.MakePath(), o); err != nil {
return st, err return st, err
} }
case "yaml": case "yaml":
@ -520,7 +525,7 @@ func (p *Pipeline) Export(ctx context.Context, op *compiler.Value, st llb.State)
Interface("contents", o). Interface("contents", o).
Msg("exporting yaml") Msg("exporting yaml")
if err := p.out.Fill(o); err != nil { if err := p.computed.FillPath(cue.MakePath(), o); err != nil {
return st, err return st, err
} }
default: default:
@ -550,7 +555,7 @@ func unmarshalAnything(data []byte, fn unmarshaller) (interface{}, error) {
func (p *Pipeline) Load(ctx context.Context, op *compiler.Value, st llb.State) (llb.State, error) { func (p *Pipeline) Load(ctx context.Context, op *compiler.Value, st llb.State) (llb.State, error) {
// Execute 'from' in a tmp pipeline, and use the resulting fs // Execute 'from' in a tmp pipeline, and use the resulting fs
from := NewPipeline(op.Lookup("from").Path().String(), p.s, nil) from := NewPipeline(op.Lookup("from").Path().String(), p.s)
if err := from.Do(ctx, op.Lookup("from")); err != nil { if err := from.Do(ctx, op.Lookup("from")); err != nil {
return st, err return st, err
} }
@ -683,7 +688,7 @@ func (p *Pipeline) DockerBuild(ctx context.Context, op *compiler.Value, st llb.S
// docker build context. This can come from another component, so we need to // docker build context. This can come from another component, so we need to
// compute it first. // compute it first.
if context.Exists() { if context.Exists() {
from := NewPipeline(op.Lookup("context").Path().String(), p.s, nil) from := NewPipeline(op.Lookup("context").Path().String(), p.s)
if err := from.Do(ctx, context); err != nil { if err := from.Do(ctx, context); err != nil {
return st, err return st, err
} }

View File

@ -33,9 +33,9 @@ type DeploymentResult struct {
func NewDeploymentResult() *DeploymentResult { func NewDeploymentResult() *DeploymentResult {
return &DeploymentResult{ return &DeploymentResult{
plan: compiler.EmptyStruct(), plan: compiler.NewValue(),
input: compiler.EmptyStruct(), input: compiler.NewValue(),
computed: compiler.EmptyStruct(), computed: compiler.NewValue(),
} }
} }
@ -58,7 +58,7 @@ func (r *DeploymentResult) Merge() (*compiler.Value, error) {
// instance manually. // instance manually.
// --> refactor the compiler.Value API to do this for us. // --> refactor the compiler.Value API to do this for us.
var ( var (
v = compiler.EmptyStruct() v = compiler.NewValue()
inst = v.CueInst() inst = v.CueInst()
err error err error
) )
@ -115,7 +115,7 @@ func (r *DeploymentResult) ToLLB() (llb.State, error) {
return st, nil return st, nil
} }
func DeploymentResultFromTar(ctx context.Context, r io.Reader) (*DeploymentResult, error) { func ReadDeploymentResult(ctx context.Context, r io.Reader) (*DeploymentResult, error) {
lg := log.Ctx(ctx) lg := log.Ctx(ctx)
result := NewDeploymentResult() result := NewDeploymentResult()
tr := tar.NewReader(r) tr := tar.NewReader(r)
@ -141,11 +141,21 @@ func DeploymentResultFromTar(ctx context.Context, r io.Reader) (*DeploymentResul
lg.Debug().Msg("outputfn: compiling") lg.Debug().Msg("outputfn: compiling")
v, err := compiler.Compile(h.Name, tr) src, err := io.ReadAll(tr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
v, err := compiler.Compile(h.Name, src)
if err != nil {
lg.
Debug().
Err(compiler.Err(err)).
Bytes("src", src).
Msg("invalid result file")
return nil, fmt.Errorf("failed to compile result: %w", compiler.Err(err))
}
switch h.Name { switch h.Name {
case planFile: case planFile:
result.plan = v result.plan = v

View File

@ -1,28 +0,0 @@
package dagger
import (
"os"
cueflow "cuelang.org/go/tools/flow"
)
var ErrNotExist = os.ErrNotExist
// Something which can be filled in-place with a cue value
type Fillable struct {
t *cueflow.Task
}
func NewFillable(t *cueflow.Task) *Fillable {
return &Fillable{
t: t,
}
}
func (f *Fillable) Fill(x interface{}) error {
// Use a nil pointer receiver to discard all values
if f == nil {
return nil
}
return f.t.Fill(x)
}