up: store outputs and merge in query

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2021-04-06 17:43:12 -07:00
parent e82ef9dfce
commit 5381d0bfe1
9 changed files with 314 additions and 218 deletions

View File

@ -2,7 +2,6 @@ package common
import (
"context"
"fmt"
"os"
"dagger.io/go/dagger"
@ -61,22 +60,19 @@ func GetCurrentDeploymentState(ctx context.Context, store *dagger.Store) *dagger
}
// Re-compute a deployment (equivalent to `dagger up`).
// If printOutput is true, print the JSON-encoded computed state to standard output
func DeploymentUp(ctx context.Context, state *dagger.DeploymentState, printOutput bool) {
func DeploymentUp(ctx context.Context, state *dagger.DeploymentState) *dagger.DeploymentResult {
lg := log.Ctx(ctx)
c, err := dagger.NewClient(ctx, "")
if err != nil {
lg.Fatal().Err(err).Msg("unable to create client")
}
output, err := c.Do(ctx, state, func(ctx context.Context, deployment *dagger.Deployment, s dagger.Solver) error {
result, err := c.Do(ctx, state, func(ctx context.Context, deployment *dagger.Deployment, s dagger.Solver) error {
log.Ctx(ctx).Debug().Msg("bringing deployment up")
return deployment.Up(ctx, s, nil)
return deployment.Up(ctx, s)
})
if err != nil {
lg.Fatal().Err(err).Msg("failed to up deployment")
}
if printOutput {
fmt.Println(output.JSON())
}
return result
}

View File

@ -3,6 +3,7 @@ package cmd
import (
"encoding/json"
"errors"
"fmt"
"os"
"strings"
@ -127,7 +128,14 @@ var computeCmd = &cobra.Command{
}
}
common.DeploymentUp(ctx, st, true)
result := common.DeploymentUp(ctx, st)
cueVal, err := result.Merge()
if err != nil {
lg.Fatal().Err(err).Msg("failed to merge result")
}
fmt.Println(cueVal.JSON())
},
}

View File

@ -63,7 +63,7 @@ var newCmd = &cobra.Command{
Msg("deployment created")
if viper.GetBool("up") {
common.DeploymentUp(ctx, st, false)
common.DeploymentUp(ctx, st)
}
},
}

View File

@ -51,12 +51,36 @@ var queryCmd = &cobra.Command{
if err != nil {
lg.Fatal().Err(err).Msg("unable to create client")
}
output, err := c.Do(ctx, state, nil)
result, err := c.Do(ctx, state, nil)
if err != nil {
lg.Fatal().Err(err).Msg("failed to query deployment")
}
cueVal := output.LookupPath(cuePath)
cueVal := compiler.EmptyStruct()
if !viper.GetBool("no-plan") {
if err := cueVal.FillPath(cue.MakePath(), result.Plan()); err != nil {
lg.Fatal().Err(err).Msg("failed to merge plan")
}
}
if !viper.GetBool("no-input") {
if err := cueVal.FillPath(cue.MakePath(), result.Input()); err != nil {
lg.Fatal().Err(err).Msg("failed to merge plan with output")
}
}
if !viper.GetBool("no-computed") && state.Computed != "" {
computed, err := compiler.DecodeJSON("", []byte(state.Computed))
if err != nil {
lg.Fatal().Err(err).Msg("failed to decode json")
}
if err := cueVal.FillPath(cue.MakePath(), computed); err != nil {
lg.Fatal().Err(err).Msg("failed to merge plan with computed")
}
}
cueVal = cueVal.LookupPath(cuePath)
if viper.GetBool("concrete") {
if err := cueVal.IsConcreteR(); err != nil {
@ -116,9 +140,9 @@ func init() {
// FIXME: implement the flags below
// queryCmd.Flags().String("revision", "latest", "Query a specific version of the deployment")
queryCmd.Flags().StringP("format", "f", "json", "Output format (json|yaml|cue|text|env)")
// queryCmd.Flags().BoolP("no-input", "I", false, "Exclude inputs from query")
// queryCmd.Flags().BoolP("no-output", "O", false, "Exclude outputs from query")
// queryCmd.Flags().BoolP("no-plan", "P", false, "Exclude outputs from query")
queryCmd.Flags().BoolP("no-plan", "P", false, "Exclude plan from query")
queryCmd.Flags().BoolP("no-input", "I", false, "Exclude inputs from query")
queryCmd.Flags().BoolP("no-computed", "C", false, "Exclude computed values from query")
if err := viper.BindPFlags(queryCmd.Flags()); err != nil {
panic(err)

View File

@ -31,7 +31,11 @@ var upCmd = &cobra.Command{
state := common.GetCurrentDeploymentState(ctx, store)
// TODO: Implement options: --no-cache
common.DeploymentUp(ctx, state, true)
result := common.DeploymentUp(ctx, state)
state.Computed = result.Computed().JSON().String()
if err := store.UpdateDeployment(ctx, state, nil); err != nil {
lg.Fatal().Err(err).Msg("failed to update deployment")
}
},
}

View File

@ -1,16 +1,13 @@
package dagger
import (
"archive/tar"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"cuelang.org/go/cue"
"golang.org/x/sync/errgroup"
"github.com/opentracing/opentracing-go"
@ -21,7 +18,6 @@ import (
// buildkit
bk "github.com/moby/buildkit/client"
_ "github.com/moby/buildkit/client/connhelper/dockercontainer" // import the container connection driver
"github.com/moby/buildkit/client/llb"
bkgw "github.com/moby/buildkit/frontend/gateway/client"
// docker output
@ -64,7 +60,7 @@ func NewClient(ctx context.Context, host string) (*Client, error) {
type ClientDoFunc func(context.Context, *Deployment, Solver) error
// FIXME: return completed *Route, instead of *compiler.Value
func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc) (*compiler.Value, error) {
func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc) (*DeploymentResult, error) {
lg := log.Ctx(ctx)
eg, gctx := errgroup.WithContext(ctx)
@ -90,14 +86,15 @@ func (c *Client) Do(ctx context.Context, state *DeploymentState, fn ClientDoFunc
})
// Spawn output retriever
var out *compiler.Value
var result *DeploymentResult
eg.Go(func() error {
defer outr.Close()
out, err = c.outputfn(gctx, outr)
result, err = DeploymentResultFromTar(gctx, outr)
return err
})
return out, eg.Wait()
return result, eg.Wait()
}
func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientDoFunc, ch chan *bk.SolveStatus, w io.WriteCloser) error {
@ -154,15 +151,12 @@ func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientD
span, _ := opentracing.StartSpanFromContext(ctx, "Deployment.Export")
defer span.Finish()
stateSource, err := deployment.State().Source()
result := deployment.Result()
st, err := result.ToLLB()
if err != nil {
return nil, compiler.Err(err)
return nil, err
}
st := llb.Scratch().File(
llb.Mkfile("state.cue", 0600, stateSource),
llb.WithCustomName("[internal] serializing state to CUE"),
)
ref, err := s.Solve(ctx, st)
if err != nil {
return nil, err
@ -185,45 +179,6 @@ func (c *Client) buildfn(ctx context.Context, deployment *Deployment, fn ClientD
return nil
}
// Read tar export stream from buildkit Build(), and extract cue output
func (c *Client) outputfn(ctx context.Context, r io.Reader) (*compiler.Value, error) {
lg := log.Ctx(ctx)
// FIXME: merge this into deployment output.
out := compiler.EmptyStruct()
tr := tar.NewReader(r)
for {
h, err := tr.Next()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, fmt.Errorf("read tar stream: %w", err)
}
lg := lg.
With().
Str("file", h.Name).
Logger()
if !strings.HasSuffix(h.Name, ".cue") {
lg.Debug().Msg("skipping non-cue file from exporter tar stream")
continue
}
lg.Debug().Msg("outputfn: compiling & merging")
v, err := compiler.Compile(h.Name, tr)
if err != nil {
return nil, err
}
if err := out.FillPath(cue.MakePath(), v); err != nil {
return nil, fmt.Errorf("%s: %w", h.Name, compiler.Err(err))
}
}
return out, nil
}
func (c *Client) logSolveStatus(ctx context.Context, ch chan *bk.SolveStatus) error {
parseName := func(v *bk.Vertex) (string, string) {
// Pattern: `@name@ message`. Minimal length is len("@X@ ")

View File

@ -18,80 +18,15 @@ import (
"github.com/rs/zerolog/log"
)
// Contents of a deployment serialized to a file
type DeploymentState struct {
// Globally unique deployment ID
ID string `json:"id,omitempty"`
// Human-friendly deployment name.
// A deployment may have more than one name.
// FIXME: store multiple names?
Name string `json:"name,omitempty"`
// Cue module containing the deployment plan
// The input's top-level artifact is used as a module directory.
PlanSource Input `json:"plan,omitempty"`
Inputs []inputKV `json:"inputs,omitempty"`
}
type inputKV struct {
Key string `json:"key,omitempty"`
Value Input `json:"value,omitempty"`
}
func (s *DeploymentState) SetInput(key string, value Input) error {
for i, inp := range s.Inputs {
if inp.Key != key {
continue
}
// Remove existing inputs with the same key
s.Inputs = append(s.Inputs[:i], s.Inputs[i+1:]...)
}
s.Inputs = append(s.Inputs, inputKV{Key: key, Value: value})
return nil
}
// Remove all inputs at the given key, including sub-keys.
// For example RemoveInputs("foo.bar") will remove all inputs
// at foo.bar, foo.bar.baz, etc.
func (s *DeploymentState) RemoveInputs(key string) error {
newInputs := make([]inputKV, 0, len(s.Inputs))
for _, i := range s.Inputs {
if i.Key == key {
continue
}
newInputs = append(newInputs, i)
}
s.Inputs = newInputs
return nil
}
type Deployment struct {
st *DeploymentState
// Layer 1: plan configuration
plan *compiler.Value
// Layer 2: user inputs
input *compiler.Value
// Layer 3: computed values
output *compiler.Value
// All layers merged together: plan + input + output
state *compiler.Value
state *DeploymentState
result *DeploymentResult
}
func NewDeployment(st *DeploymentState) (*Deployment, error) {
empty := compiler.EmptyStruct()
d := &Deployment{
st: st,
plan: empty,
input: empty,
output: empty,
state: st,
result: NewDeploymentResult(),
}
// Prepare inputs
@ -101,47 +36,32 @@ func NewDeployment(st *DeploymentState) (*Deployment, error) {
return nil, err
}
if input.Key == "" {
err = d.input.FillPath(cue.MakePath(), v)
err = d.result.input.FillPath(cue.MakePath(), v)
} else {
err = d.input.FillPath(cue.ParsePath(input.Key), v)
err = d.result.input.FillPath(cue.ParsePath(input.Key), v)
}
if err != nil {
return nil, err
}
}
if err := d.mergeState(); err != nil {
return nil, err
}
return d, nil
}
func (d *Deployment) ID() string {
return d.st.ID
return d.state.ID
}
func (d *Deployment) Name() string {
return d.st.Name
return d.state.Name
}
func (d *Deployment) PlanSource() Input {
return d.st.PlanSource
return d.state.PlanSource
}
func (d *Deployment) Plan() *compiler.Value {
return d.plan
}
func (d *Deployment) Input() *compiler.Value {
return d.input
}
func (d *Deployment) Output() *compiler.Value {
return d.output
}
func (d *Deployment) State() *compiler.Value {
return d.state
func (d *Deployment) Result() *DeploymentResult {
return d.result
}
// LoadPlan loads the plan
@ -149,7 +69,7 @@ func (d *Deployment) LoadPlan(ctx context.Context, s Solver) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "deployment.LoadPlan")
defer span.Finish()
planSource, err := d.st.PlanSource.Compile()
planSource, err := d.state.PlanSource.Compile()
if err != nil {
return err
}
@ -169,10 +89,9 @@ func (d *Deployment) LoadPlan(ctx context.Context, s Solver) error {
if err != nil {
return fmt.Errorf("plan config: %w", err)
}
d.plan = plan
d.result.plan = plan
// Commit
return d.mergeState()
return nil
}
// Scan all scripts in the deployment for references to local directories (do:"local"),
@ -203,15 +122,22 @@ func (d *Deployment) LocalDirs() map[string]string {
}
// 1. Scan the deployment state
// FIXME: use a common `flow` instance to avoid rescanning the tree.
inst := d.state.CueInst()
flow := cueflow.New(&cueflow.Config{}, inst, newTaskFunc(inst, noOpRunner))
src, err := d.result.Merge()
if err != nil {
panic(err)
}
flow := cueflow.New(
&cueflow.Config{},
src.CueInst(),
newTaskFunc(src.CueInst(), noOpRunner),
)
for _, t := range flow.Tasks() {
v := compiler.Wrap(t.Value(), inst)
v := compiler.Wrap(t.Value(), src.CueInst())
localdirs(v.Lookup("#up"))
}
// 2. Scan the plan
plan, err := d.st.PlanSource.Compile()
plan, err := d.state.PlanSource.Compile()
if err != nil {
panic(err)
}
@ -219,54 +145,21 @@ func (d *Deployment) LocalDirs() map[string]string {
return dirs
}
// FIXME: this is just a 3-way merge. Add var args to compiler.Value.Merge.
func (d *Deployment) mergeState() error {
// FIXME: make this cleaner in *compiler.Value by keeping intermediary instances
// FIXME: state.CueInst() must return an instance with the same
// contents as state.v, for the purposes of cueflow.
// That is not currently how *compiler.Value works, so we prepare the cue
// instance manually.
// --> refactor the compiler.Value API to do this for us.
var (
state = compiler.EmptyStruct()
stateInst = state.CueInst()
err error
)
stateInst, err = stateInst.Fill(d.plan.Cue())
if err != nil {
return fmt.Errorf("merge base & input: %w", err)
}
stateInst, err = stateInst.Fill(d.input.Cue())
if err != nil {
return fmt.Errorf("merge base & input: %w", err)
}
stateInst, err = stateInst.Fill(d.output.Cue())
if err != nil {
return fmt.Errorf("merge output with base & input: %w", err)
}
state = compiler.Wrap(stateInst.Value(), stateInst)
// commit
d.state = state
return nil
}
type UpOpts struct{}
// Up missing values in deployment configuration, and write them to state.
func (d *Deployment) Up(ctx context.Context, s Solver, _ *UpOpts) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "r.Compute")
func (d *Deployment) Up(ctx context.Context, s Solver) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "deployment.Up")
defer span.Finish()
lg := log.Ctx(ctx)
// Cueflow cue instance
inst := d.state.CueInst()
// Reset the computed values
d.result.computed = compiler.EmptyStruct()
// Reset the output
d.output = compiler.EmptyStruct()
// Cueflow cue instance
src, err := d.result.Merge()
if err != nil {
return err
}
// Cueflow config
flowCfg := &cueflow.Config{
@ -285,7 +178,7 @@ func (d *Deployment) Up(ctx context.Context, s Solver, _ *UpOpts) error {
return nil
}
// Merge task value into output
err := d.output.FillPath(t.Path(), t.Value())
err := d.result.computed.FillPath(t.Path(), t.Value())
if err != nil {
lg.
Error().
@ -297,17 +190,16 @@ func (d *Deployment) Up(ctx context.Context, s Solver, _ *UpOpts) error {
},
}
// Orchestrate execution with cueflow
flow := cueflow.New(flowCfg, inst, newTaskFunc(inst, newPipelineRunner(inst, s)))
flow := cueflow.New(
flowCfg,
src.CueInst(),
newTaskFunc(src.CueInst(), newPipelineRunner(src.CueInst(), s)),
)
if err := flow.Run(ctx); err != nil {
return err
}
{
span, _ := opentracing.StartSpanFromContext(ctx, "merge state")
defer span.Finish()
return d.mergeState()
}
return nil
}
type DownOpts struct{}

161
dagger/result.go Normal file
View File

@ -0,0 +1,161 @@
package dagger
import (
"archive/tar"
"context"
"errors"
"fmt"
"io"
"strings"
"dagger.io/go/dagger/compiler"
"github.com/moby/buildkit/client/llb"
"github.com/rs/zerolog/log"
)
const (
planFile = "plan.cue"
inputFile = "input.cue"
computedFile = "computed.cue"
)
// DeploymentResult represents the layers of a deployment run
type DeploymentResult struct {
// Layer 1: plan configuration
plan *compiler.Value
// Layer 2: user inputs
input *compiler.Value
// Layer 3: computed values
computed *compiler.Value
}
func NewDeploymentResult() *DeploymentResult {
return &DeploymentResult{
plan: compiler.EmptyStruct(),
input: compiler.EmptyStruct(),
computed: compiler.EmptyStruct(),
}
}
func (r *DeploymentResult) Plan() *compiler.Value {
return r.plan
}
func (r *DeploymentResult) Input() *compiler.Value {
return r.input
}
func (r *DeploymentResult) Computed() *compiler.Value {
return r.computed
}
func (r *DeploymentResult) Merge() (*compiler.Value, error) {
// FIXME: v.CueInst() must return an instance with the same
// contents as v, for the purposes of cueflow.
// That is not currently how *compiler.Value works, so we prepare the cue
// instance manually.
// --> refactor the compiler.Value API to do this for us.
var (
v = compiler.EmptyStruct()
inst = v.CueInst()
err error
)
inst, err = inst.Fill(r.plan.Cue())
if err != nil {
return nil, fmt.Errorf("merge plan: %w", err)
}
inst, err = inst.Fill(r.input.Cue())
if err != nil {
return nil, fmt.Errorf("merge input: %w", err)
}
inst, err = inst.Fill(r.computed.Cue())
if err != nil {
return nil, fmt.Errorf("merge computed: %w", err)
}
v = compiler.Wrap(inst.Value(), inst)
return v, nil
}
func (r *DeploymentResult) ToLLB() (llb.State, error) {
st := llb.Scratch()
planSource, err := r.plan.Source()
if err != nil {
return st, compiler.Err(err)
}
inputSource, err := r.input.Source()
if err != nil {
return st, compiler.Err(err)
}
outputSource, err := r.computed.Source()
if err != nil {
return st, compiler.Err(err)
}
st = st.
File(
llb.Mkfile(planFile, 0600, planSource),
llb.WithCustomName("[internal] serializing plan"),
).
File(
llb.Mkfile(inputFile, 0600, inputSource),
llb.WithCustomName("[internal] serializing input"),
).
File(
llb.Mkfile(computedFile, 0600, outputSource),
llb.WithCustomName("[internal] serializing output"),
)
return st, nil
}
func DeploymentResultFromTar(ctx context.Context, r io.Reader) (*DeploymentResult, error) {
lg := log.Ctx(ctx)
result := NewDeploymentResult()
tr := tar.NewReader(r)
for {
h, err := tr.Next()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, fmt.Errorf("read tar stream: %w", err)
}
lg := lg.
With().
Str("file", h.Name).
Logger()
if !strings.HasSuffix(h.Name, ".cue") {
lg.Debug().Msg("skipping non-cue file from exporter tar stream")
continue
}
lg.Debug().Msg("outputfn: compiling")
v, err := compiler.Compile(h.Name, tr)
if err != nil {
return nil, err
}
switch h.Name {
case planFile:
result.plan = v
case inputFile:
result.input = v
case computedFile:
result.computed = v
default:
lg.Warn().Msg("unexpected file")
}
}
return result, nil
}

56
dagger/state.go Normal file
View File

@ -0,0 +1,56 @@
package dagger
// Contents of a deployment serialized to a file
type DeploymentState struct {
// Globally unique deployment ID
ID string `json:"id,omitempty"`
// Human-friendly deployment name.
// A deployment may have more than one name.
// FIXME: store multiple names?
Name string `json:"name,omitempty"`
// Cue module containing the deployment plan
// The input's top-level artifact is used as a module directory.
PlanSource Input `json:"plan,omitempty"`
// User Inputs
Inputs []inputKV `json:"inputs,omitempty"`
// Computed values
Computed string `json:"output,omitempty"`
}
type inputKV struct {
Key string `json:"key,omitempty"`
Value Input `json:"value,omitempty"`
}
func (s *DeploymentState) SetInput(key string, value Input) error {
for i, inp := range s.Inputs {
if inp.Key != key {
continue
}
// Remove existing inputs with the same key
s.Inputs = append(s.Inputs[:i], s.Inputs[i+1:]...)
}
s.Inputs = append(s.Inputs, inputKV{Key: key, Value: value})
return nil
}
// Remove all inputs at the given key, including sub-keys.
// For example RemoveInputs("foo.bar") will remove all inputs
// at foo.bar, foo.bar.baz, etc.
func (s *DeploymentState) RemoveInputs(key string) error {
newInputs := make([]inputKV, 0, len(s.Inputs))
for _, i := range s.Inputs {
if i.Key == key {
continue
}
newInputs = append(newInputs, i)
}
s.Inputs = newInputs
return nil
}