From 5381d0bfe1e43ce797dde074973c07a74b27467f Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Tue, 6 Apr 2021 17:43:12 -0700 Subject: [PATCH] up: store outputs and merge in query Signed-off-by: Andrea Luzzardi --- cmd/dagger/cmd/common/common.go | 12 +- cmd/dagger/cmd/compute.go | 10 +- cmd/dagger/cmd/new.go | 2 +- cmd/dagger/cmd/query.go | 34 +++++- cmd/dagger/cmd/up.go | 6 +- dagger/client.go | 61 ++-------- dagger/deployment.go | 190 +++++++------------------------- dagger/result.go | 161 +++++++++++++++++++++++++++ dagger/state.go | 56 ++++++++++ 9 files changed, 314 insertions(+), 218 deletions(-) create mode 100644 dagger/result.go create mode 100644 dagger/state.go diff --git a/cmd/dagger/cmd/common/common.go b/cmd/dagger/cmd/common/common.go index 2014c224..fde69b17 100644 --- a/cmd/dagger/cmd/common/common.go +++ b/cmd/dagger/cmd/common/common.go @@ -2,7 +2,6 @@ package common import ( "context" - "fmt" "os" "dagger.io/go/dagger" @@ -61,22 +60,19 @@ func GetCurrentDeploymentState(ctx context.Context, store *dagger.Store) *dagger } // Re-compute a deployment (equivalent to `dagger up`). -// If printOutput is true, print the JSON-encoded computed state to standard output -func DeploymentUp(ctx context.Context, state *dagger.DeploymentState, printOutput bool) { +func DeploymentUp(ctx context.Context, state *dagger.DeploymentState) *dagger.DeploymentResult { lg := log.Ctx(ctx) c, err := dagger.NewClient(ctx, "") if err != nil { lg.Fatal().Err(err).Msg("unable to create client") } - output, err := c.Do(ctx, state, func(ctx context.Context, deployment *dagger.Deployment, s dagger.Solver) error { + result, err := c.Do(ctx, state, func(ctx context.Context, deployment *dagger.Deployment, s dagger.Solver) error { log.Ctx(ctx).Debug().Msg("bringing deployment up") - return deployment.Up(ctx, s, nil) + return deployment.Up(ctx, s) }) if err != nil { lg.Fatal().Err(err).Msg("failed to up deployment") } - if printOutput { - fmt.Println(output.JSON()) - } + return result } diff --git a/cmd/dagger/cmd/compute.go b/cmd/dagger/cmd/compute.go index 033e5f38..8a087c7e 100644 --- a/cmd/dagger/cmd/compute.go +++ b/cmd/dagger/cmd/compute.go @@ -3,6 +3,7 @@ package cmd import ( "encoding/json" "errors" + "fmt" "os" "strings" @@ -127,7 +128,14 @@ var computeCmd = &cobra.Command{ } } - common.DeploymentUp(ctx, st, true) + result := common.DeploymentUp(ctx, st) + + cueVal, err := result.Merge() + if err != nil { + lg.Fatal().Err(err).Msg("failed to merge result") + } + + fmt.Println(cueVal.JSON()) }, } diff --git a/cmd/dagger/cmd/new.go b/cmd/dagger/cmd/new.go index 9c3e3966..90d68468 100644 --- a/cmd/dagger/cmd/new.go +++ b/cmd/dagger/cmd/new.go @@ -63,7 +63,7 @@ var newCmd = &cobra.Command{ Msg("deployment created") if viper.GetBool("up") { - common.DeploymentUp(ctx, st, false) + common.DeploymentUp(ctx, st) } }, } diff --git a/cmd/dagger/cmd/query.go b/cmd/dagger/cmd/query.go index 2e4a153d..7dc71e10 100644 --- a/cmd/dagger/cmd/query.go +++ b/cmd/dagger/cmd/query.go @@ -51,12 +51,36 @@ var queryCmd = &cobra.Command{ if err != nil { lg.Fatal().Err(err).Msg("unable to create client") } - output, err := c.Do(ctx, state, nil) + result, err := c.Do(ctx, state, nil) if err != nil { lg.Fatal().Err(err).Msg("failed to query deployment") } - cueVal := output.LookupPath(cuePath) + cueVal := compiler.EmptyStruct() + + if !viper.GetBool("no-plan") { + if err := cueVal.FillPath(cue.MakePath(), result.Plan()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge plan") + } + } + + if !viper.GetBool("no-input") { + if err := cueVal.FillPath(cue.MakePath(), result.Input()); err != nil { + lg.Fatal().Err(err).Msg("failed to merge plan with output") + } + } + + if !viper.GetBool("no-computed") && state.Computed != "" { + computed, err := compiler.DecodeJSON("", []byte(state.Computed)) + if err != nil { + lg.Fatal().Err(err).Msg("failed to decode json") + } + if err := cueVal.FillPath(cue.MakePath(), computed); err != nil { + lg.Fatal().Err(err).Msg("failed to merge plan with computed") + } + } + + cueVal = cueVal.LookupPath(cuePath) if viper.GetBool("concrete") { if err := cueVal.IsConcreteR(); err != nil { @@ -116,9 +140,9 @@ func init() { // FIXME: implement the flags below // queryCmd.Flags().String("revision", "latest", "Query a specific version of the deployment") queryCmd.Flags().StringP("format", "f", "json", "Output format (json|yaml|cue|text|env)") - // queryCmd.Flags().BoolP("no-input", "I", false, "Exclude inputs from query") - // queryCmd.Flags().BoolP("no-output", "O", false, "Exclude outputs from query") - // queryCmd.Flags().BoolP("no-plan", "P", false, "Exclude outputs from query") + queryCmd.Flags().BoolP("no-plan", "P", false, "Exclude plan from query") + queryCmd.Flags().BoolP("no-input", "I", false, "Exclude inputs from query") + queryCmd.Flags().BoolP("no-computed", "C", false, "Exclude computed values from query") if err := viper.BindPFlags(queryCmd.Flags()); err != nil { panic(err) diff --git a/cmd/dagger/cmd/up.go b/cmd/dagger/cmd/up.go index 8d75cc9f..7d6175e8 100644 --- a/cmd/dagger/cmd/up.go +++ b/cmd/dagger/cmd/up.go @@ -31,7 +31,11 @@ var upCmd = &cobra.Command{ state := common.GetCurrentDeploymentState(ctx, store) // TODO: Implement options: --no-cache - common.DeploymentUp(ctx, state, true) + result := common.DeploymentUp(ctx, state) + state.Computed = result.Computed().JSON().String() + if err := store.UpdateDeployment(ctx, state, nil); err != nil { + lg.Fatal().Err(err).Msg("failed to update deployment") + } }, } diff --git a/dagger/client.go b/dagger/client.go index 68a44a1c..a073d9e5 100644 --- a/dagger/client.go +++ b/dagger/client.go @@ -1,16 +1,13 @@ package dagger import ( - "archive/tar" "context" - "errors" "fmt" "io" "os" "path/filepath" "strings" - "cuelang.org/go/cue" "golang.org/x/sync/errgroup" "github.com/opentracing/opentracing-go" @@ -21,7 +18,6 @@ import ( // buildkit bk "github.com/moby/buildkit/client" _ "github.com/moby/buildkit/client/connhelper/dockercontainer" // import the container connection driver - "github.com/moby/buildkit/client/llb" bkgw "github.com/moby/buildkit/frontend/gateway/client" // docker output @@ -64,7 +60,7 @@ func NewClient(ctx context.Context, host string) (*Client, error) { type ClientDoFunc func(context.Context, *Deployment, Solver) error // FIXME: return completed *Route, instead of *compiler.Value -func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc) (*compiler.Value, error) { +func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc) (*DeploymentResult, error) { lg := log.Ctx(ctx) eg, gctx := errgroup.WithContext(ctx) @@ -90,14 +86,15 @@ func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc }) // Spawn output retriever - var out *compiler.Value + var result *DeploymentResult eg.Go(func() error { defer outr.Close() - out, err = c.outputfn(gctx, outr) + + result, err = DeploymentResultFromTar(gctx, outr) return err }) - return out, eg.Wait() + return result, eg.Wait() } func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientDoFunc, ch chan *bk.SolveStatus, w io.WriteCloser) error { @@ -154,15 +151,12 @@ func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientD span, _ := opentracing.StartSpanFromContext(ctx, "Deployment.Export") defer span.Finish() - stateSource, err := deployment.State().Source() + result := deployment.Result() + st, err := result.ToLLB() if err != nil { - return nil, compiler.Err(err) + return nil, err } - st := llb.Scratch().File( - llb.Mkfile("state.cue", 0600, stateSource), - llb.WithCustomName("[internal] serializing state to CUE"), - ) ref, err := s.Solve(ctx, st) if err != nil { return nil, err @@ -185,45 +179,6 @@ func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientD return nil } -// Read tar export stream from buildkit Build(), and extract cue output -func (c *Client) outputfn(ctx context.Context, r io.Reader) (*compiler.Value, error) { - lg := log.Ctx(ctx) - - // FIXME: merge this into deployment output. - out := compiler.EmptyStruct() - - tr := tar.NewReader(r) - for { - h, err := tr.Next() - if errors.Is(err, io.EOF) { - break - } - if err != nil { - return nil, fmt.Errorf("read tar stream: %w", err) - } - - lg := lg. - With(). - Str("file", h.Name). - Logger() - - if !strings.HasSuffix(h.Name, ".cue") { - lg.Debug().Msg("skipping non-cue file from exporter tar stream") - continue - } - lg.Debug().Msg("outputfn: compiling & merging") - - v, err := compiler.Compile(h.Name, tr) - if err != nil { - return nil, err - } - if err := out.FillPath(cue.MakePath(), v); err != nil { - return nil, fmt.Errorf("%s: %w", h.Name, compiler.Err(err)) - } - } - return out, nil -} - func (c *Client) logSolveStatus(ctx context.Context, ch chan *bk.SolveStatus) error { parseName := func(v *bk.Vertex) (string, string) { // Pattern: `@name@ message`. Minimal length is len("@X@ ") diff --git a/dagger/deployment.go b/dagger/deployment.go index 5dfd5dbd..4a1a85cd 100644 --- a/dagger/deployment.go +++ b/dagger/deployment.go @@ -18,80 +18,15 @@ import ( "github.com/rs/zerolog/log" ) -// Contents of a deployment serialized to a file -type DeploymentState struct { - // Globally unique deployment ID - ID string `json:"id,omitempty"` - - // Human-friendly deployment name. - // A deployment may have more than one name. - // FIXME: store multiple names? - Name string `json:"name,omitempty"` - - // Cue module containing the deployment plan - // The input's top-level artifact is used as a module directory. - PlanSource Input `json:"plan,omitempty"` - - Inputs []inputKV `json:"inputs,omitempty"` -} - -type inputKV struct { - Key string `json:"key,omitempty"` - Value Input `json:"value,omitempty"` -} - -func (s *DeploymentState) SetInput(key string, value Input) error { - for i, inp := range s.Inputs { - if inp.Key != key { - continue - } - // Remove existing inputs with the same key - s.Inputs = append(s.Inputs[:i], s.Inputs[i+1:]...) - } - - s.Inputs = append(s.Inputs, inputKV{Key: key, Value: value}) - return nil -} - -// Remove all inputs at the given key, including sub-keys. -// For example RemoveInputs("foo.bar") will remove all inputs -// at foo.bar, foo.bar.baz, etc. -func (s *DeploymentState) RemoveInputs(key string) error { - newInputs := make([]inputKV, 0, len(s.Inputs)) - for _, i := range s.Inputs { - if i.Key == key { - continue - } - newInputs = append(newInputs, i) - } - s.Inputs = newInputs - - return nil -} - type Deployment struct { - st *DeploymentState - - // Layer 1: plan configuration - plan *compiler.Value - - // Layer 2: user inputs - input *compiler.Value - - // Layer 3: computed values - output *compiler.Value - - // All layers merged together: plan + input + output - state *compiler.Value + state *DeploymentState + result *DeploymentResult } func NewDeployment(st *DeploymentState) (*Deployment, error) { - empty := compiler.EmptyStruct() d := &Deployment{ - st: st, - plan: empty, - input: empty, - output: empty, + state: st, + result: NewDeploymentResult(), } // Prepare inputs @@ -101,47 +36,32 @@ func NewDeployment(st *DeploymentState) (*Deployment, error) { return nil, err } if input.Key == "" { - err = d.input.FillPath(cue.MakePath(), v) + err = d.result.input.FillPath(cue.MakePath(), v) } else { - err = d.input.FillPath(cue.ParsePath(input.Key), v) + err = d.result.input.FillPath(cue.ParsePath(input.Key), v) } if err != nil { return nil, err } } - if err := d.mergeState(); err != nil { - return nil, err - } return d, nil } func (d *Deployment) ID() string { - return d.st.ID + return d.state.ID } func (d *Deployment) Name() string { - return d.st.Name + return d.state.Name } func (d *Deployment) PlanSource() Input { - return d.st.PlanSource + return d.state.PlanSource } -func (d *Deployment) Plan() *compiler.Value { - return d.plan -} - -func (d *Deployment) Input() *compiler.Value { - return d.input -} - -func (d *Deployment) Output() *compiler.Value { - return d.output -} - -func (d *Deployment) State() *compiler.Value { - return d.state +func (d *Deployment) Result() *DeploymentResult { + return d.result } // LoadPlan loads the plan @@ -149,7 +69,7 @@ func (d *Deployment) LoadPlan(ctx context.Context, s Solver) error { span, ctx := opentracing.StartSpanFromContext(ctx, "deployment.LoadPlan") defer span.Finish() - planSource, err := d.st.PlanSource.Compile() + planSource, err := d.state.PlanSource.Compile() if err != nil { return err } @@ -169,10 +89,9 @@ func (d *Deployment) LoadPlan(ctx context.Context, s Solver) error { if err != nil { return fmt.Errorf("plan config: %w", err) } - d.plan = plan + d.result.plan = plan - // Commit - return d.mergeState() + return nil } // Scan all scripts in the deployment for references to local directories (do:"local"), @@ -203,15 +122,22 @@ func (d *Deployment) LocalDirs() map[string]string { } // 1. Scan the deployment state // FIXME: use a common `flow` instance to avoid rescanning the tree. - inst := d.state.CueInst() - flow := cueflow.New(&cueflow.Config{}, inst, newTaskFunc(inst, noOpRunner)) + src, err := d.result.Merge() + if err != nil { + panic(err) + } + flow := cueflow.New( + &cueflow.Config{}, + src.CueInst(), + newTaskFunc(src.CueInst(), noOpRunner), + ) for _, t := range flow.Tasks() { - v := compiler.Wrap(t.Value(), inst) + v := compiler.Wrap(t.Value(), src.CueInst()) localdirs(v.Lookup("#up")) } // 2. Scan the plan - plan, err := d.st.PlanSource.Compile() + plan, err := d.state.PlanSource.Compile() if err != nil { panic(err) } @@ -219,54 +145,21 @@ func (d *Deployment) LocalDirs() map[string]string { return dirs } -// FIXME: this is just a 3-way merge. Add var args to compiler.Value.Merge. -func (d *Deployment) mergeState() error { - // FIXME: make this cleaner in *compiler.Value by keeping intermediary instances - // FIXME: state.CueInst() must return an instance with the same - // contents as state.v, for the purposes of cueflow. - // That is not currently how *compiler.Value works, so we prepare the cue - // instance manually. - // --> refactor the compiler.Value API to do this for us. - var ( - state = compiler.EmptyStruct() - stateInst = state.CueInst() - err error - ) - - stateInst, err = stateInst.Fill(d.plan.Cue()) - if err != nil { - return fmt.Errorf("merge base & input: %w", err) - } - stateInst, err = stateInst.Fill(d.input.Cue()) - if err != nil { - return fmt.Errorf("merge base & input: %w", err) - } - stateInst, err = stateInst.Fill(d.output.Cue()) - if err != nil { - return fmt.Errorf("merge output with base & input: %w", err) - } - - state = compiler.Wrap(stateInst.Value(), stateInst) - - // commit - d.state = state - return nil -} - -type UpOpts struct{} - // Up missing values in deployment configuration, and write them to state. -func (d *Deployment) Up(ctx context.Context, s Solver, _ *UpOpts) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "r.Compute") +func (d *Deployment) Up(ctx context.Context, s Solver) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "deployment.Up") defer span.Finish() lg := log.Ctx(ctx) - // Cueflow cue instance - inst := d.state.CueInst() + // Reset the computed values + d.result.computed = compiler.EmptyStruct() - // Reset the output - d.output = compiler.EmptyStruct() + // Cueflow cue instance + src, err := d.result.Merge() + if err != nil { + return err + } // Cueflow config flowCfg := &cueflow.Config{ @@ -285,7 +178,7 @@ func (d *Deployment) Up(ctx context.Context, s Solver, _ *UpOpts) error { return nil } // Merge task value into output - err := d.output.FillPath(t.Path(), t.Value()) + err := d.result.computed.FillPath(t.Path(), t.Value()) if err != nil { lg. Error(). @@ -297,17 +190,16 @@ func (d *Deployment) Up(ctx context.Context, s Solver, _ *UpOpts) error { }, } // Orchestrate execution with cueflow - flow := cueflow.New(flowCfg, inst, newTaskFunc(inst, newPipelineRunner(inst, s))) + flow := cueflow.New( + flowCfg, + src.CueInst(), + newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), s)), + ) if err := flow.Run(ctx); err != nil { return err } - { - span, _ := opentracing.StartSpanFromContext(ctx, "merge state") - defer span.Finish() - - return d.mergeState() - } + return nil } type DownOpts struct{} diff --git a/dagger/result.go b/dagger/result.go new file mode 100644 index 00000000..905c3f52 --- /dev/null +++ b/dagger/result.go @@ -0,0 +1,161 @@ +package dagger + +import ( + "archive/tar" + "context" + "errors" + "fmt" + "io" + "strings" + + "dagger.io/go/dagger/compiler" + "github.com/moby/buildkit/client/llb" + "github.com/rs/zerolog/log" +) + +const ( + planFile = "plan.cue" + inputFile = "input.cue" + computedFile = "computed.cue" +) + +// DeploymentResult represents the layers of a deployment run +type DeploymentResult struct { + // Layer 1: plan configuration + plan *compiler.Value + + // Layer 2: user inputs + input *compiler.Value + + // Layer 3: computed values + computed *compiler.Value +} + +func NewDeploymentResult() *DeploymentResult { + return &DeploymentResult{ + plan: compiler.EmptyStruct(), + input: compiler.EmptyStruct(), + computed: compiler.EmptyStruct(), + } +} + +func (r *DeploymentResult) Plan() *compiler.Value { + return r.plan +} + +func (r *DeploymentResult) Input() *compiler.Value { + return r.input +} + +func (r *DeploymentResult) Computed() *compiler.Value { + return r.computed +} + +func (r *DeploymentResult) Merge() (*compiler.Value, error) { + // FIXME: v.CueInst() must return an instance with the same + // contents as v, for the purposes of cueflow. + // That is not currently how *compiler.Value works, so we prepare the cue + // instance manually. + // --> refactor the compiler.Value API to do this for us. + var ( + v = compiler.EmptyStruct() + inst = v.CueInst() + err error + ) + + inst, err = inst.Fill(r.plan.Cue()) + if err != nil { + return nil, fmt.Errorf("merge plan: %w", err) + } + inst, err = inst.Fill(r.input.Cue()) + if err != nil { + return nil, fmt.Errorf("merge input: %w", err) + } + inst, err = inst.Fill(r.computed.Cue()) + if err != nil { + return nil, fmt.Errorf("merge computed: %w", err) + } + + v = compiler.Wrap(inst.Value(), inst) + return v, nil +} + +func (r *DeploymentResult) ToLLB() (llb.State, error) { + st := llb.Scratch() + + planSource, err := r.plan.Source() + if err != nil { + return st, compiler.Err(err) + } + + inputSource, err := r.input.Source() + if err != nil { + return st, compiler.Err(err) + } + + outputSource, err := r.computed.Source() + if err != nil { + return st, compiler.Err(err) + } + + st = st. + File( + llb.Mkfile(planFile, 0600, planSource), + llb.WithCustomName("[internal] serializing plan"), + ). + File( + llb.Mkfile(inputFile, 0600, inputSource), + llb.WithCustomName("[internal] serializing input"), + ). + File( + llb.Mkfile(computedFile, 0600, outputSource), + llb.WithCustomName("[internal] serializing output"), + ) + + return st, nil +} + +func DeploymentResultFromTar(ctx context.Context, r io.Reader) (*DeploymentResult, error) { + lg := log.Ctx(ctx) + result := NewDeploymentResult() + tr := tar.NewReader(r) + + for { + h, err := tr.Next() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, fmt.Errorf("read tar stream: %w", err) + } + + lg := lg. + With(). + Str("file", h.Name). + Logger() + + if !strings.HasSuffix(h.Name, ".cue") { + lg.Debug().Msg("skipping non-cue file from exporter tar stream") + continue + } + + lg.Debug().Msg("outputfn: compiling") + + v, err := compiler.Compile(h.Name, tr) + if err != nil { + return nil, err + } + + switch h.Name { + case planFile: + result.plan = v + case inputFile: + result.input = v + case computedFile: + result.computed = v + default: + lg.Warn().Msg("unexpected file") + } + } + return result, nil +} diff --git a/dagger/state.go b/dagger/state.go new file mode 100644 index 00000000..debe95a9 --- /dev/null +++ b/dagger/state.go @@ -0,0 +1,56 @@ +package dagger + +// Contents of a deployment serialized to a file +type DeploymentState struct { + // Globally unique deployment ID + ID string `json:"id,omitempty"` + + // Human-friendly deployment name. + // A deployment may have more than one name. + // FIXME: store multiple names? + Name string `json:"name,omitempty"` + + // Cue module containing the deployment plan + // The input's top-level artifact is used as a module directory. + PlanSource Input `json:"plan,omitempty"` + + // User Inputs + Inputs []inputKV `json:"inputs,omitempty"` + + // Computed values + Computed string `json:"output,omitempty"` +} + +type inputKV struct { + Key string `json:"key,omitempty"` + Value Input `json:"value,omitempty"` +} + +func (s *DeploymentState) SetInput(key string, value Input) error { + for i, inp := range s.Inputs { + if inp.Key != key { + continue + } + // Remove existing inputs with the same key + s.Inputs = append(s.Inputs[:i], s.Inputs[i+1:]...) + } + + s.Inputs = append(s.Inputs, inputKV{Key: key, Value: value}) + return nil +} + +// Remove all inputs at the given key, including sub-keys. +// For example RemoveInputs("foo.bar") will remove all inputs +// at foo.bar, foo.bar.baz, etc. +func (s *DeploymentState) RemoveInputs(key string) error { + newInputs := make([]inputKV, 0, len(s.Inputs)) + for _, i := range s.Inputs { + if i.Key == key { + continue + } + newInputs = append(newInputs, i) + } + s.Inputs = newInputs + + return nil +}