2021-03-23 08:22:50 +01:00
|
|
|
package dagger
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2021-03-25 01:55:21 +01:00
|
|
|
"fmt"
|
|
|
|
"io/fs"
|
|
|
|
"strings"
|
|
|
|
"time"
|
2021-03-23 08:22:50 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
"cuelang.org/go/cue"
|
|
|
|
cueflow "cuelang.org/go/tools/flow"
|
2021-03-23 08:22:50 +01:00
|
|
|
"dagger.io/go/dagger/compiler"
|
2021-03-25 01:55:21 +01:00
|
|
|
"dagger.io/go/stdlib"
|
2021-03-24 23:03:05 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
"github.com/opentracing/opentracing-go"
|
|
|
|
"github.com/opentracing/opentracing-go/ext"
|
|
|
|
otlog "github.com/opentracing/opentracing-go/log"
|
|
|
|
"github.com/rs/zerolog/log"
|
2021-03-24 23:03:05 +01:00
|
|
|
)
|
|
|
|
|
2021-03-25 02:07:52 +01:00
|
|
|
// Contents of a route serialized to a file
|
|
|
|
type RouteState struct {
|
|
|
|
// Globally unique route ID
|
|
|
|
ID string `json:"id,omitempty"`
|
|
|
|
|
|
|
|
// Human-friendly route name.
|
|
|
|
// A route may have more than one name.
|
|
|
|
// FIXME: store multiple names?
|
|
|
|
Name string `json:"name,omitempty"`
|
|
|
|
|
|
|
|
// Cue module containing the route layout
|
|
|
|
// The input's top-level artifact is used as a module directory.
|
|
|
|
LayoutSource Input `json:"layout,omitempty"`
|
|
|
|
|
|
|
|
Inputs []inputKV `json:"inputs,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type inputKV struct {
|
|
|
|
Key string `json:"key,omitempty"`
|
|
|
|
Value Input `json:"value,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *RouteState) AddInput(key string, value Input) error {
|
|
|
|
r.Inputs = append(r.Inputs, inputKV{Key: key, Value: value})
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Remove all inputs at the given key, including sub-keys.
|
|
|
|
// For example RemoveInputs("foo.bar") will remove all inputs
|
|
|
|
// at foo.bar, foo.bar.baz, etc.
|
|
|
|
func (r *RouteState) RemoveInputs(key string) error {
|
2021-03-26 03:08:52 +01:00
|
|
|
newInputs := make([]inputKV, 0, len(r.Inputs))
|
|
|
|
for _, i := range r.Inputs {
|
|
|
|
if i.Key == key {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
newInputs = append(newInputs, i)
|
|
|
|
}
|
|
|
|
r.Inputs = newInputs
|
|
|
|
|
|
|
|
return nil
|
2021-03-25 02:07:52 +01:00
|
|
|
}
|
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
type Route struct {
|
|
|
|
st *RouteState
|
2021-03-23 08:22:50 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
// Env boot script, eg. `[{do:"local",dir:"."}]`
|
|
|
|
// FIXME: rename to 'update' (script to update the env config)
|
|
|
|
// FIXME: embed update script in base as '#update' ?
|
|
|
|
// FIXME: simplify Env by making it single layer? Each layer is one r.
|
2021-03-24 23:03:05 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
// Layer 1: layout configuration
|
|
|
|
layout *compiler.Value
|
|
|
|
|
|
|
|
// Layer 2: user inputs
|
|
|
|
input *compiler.Value
|
|
|
|
|
|
|
|
// Layer 3: computed values
|
|
|
|
output *compiler.Value
|
|
|
|
|
|
|
|
// All layers merged together: layout + input + output
|
|
|
|
state *compiler.Value
|
2021-03-24 18:37:50 +01:00
|
|
|
}
|
|
|
|
|
2021-03-24 23:03:05 +01:00
|
|
|
func (r *Route) ID() string {
|
2021-03-24 18:37:50 +01:00
|
|
|
return r.st.ID
|
|
|
|
}
|
|
|
|
|
2021-03-24 23:03:05 +01:00
|
|
|
func (r *Route) Name() string {
|
2021-03-24 18:37:50 +01:00
|
|
|
return r.st.Name
|
|
|
|
}
|
|
|
|
|
2021-03-24 23:03:05 +01:00
|
|
|
func (r *Route) LayoutSource() Input {
|
2021-03-24 18:37:50 +01:00
|
|
|
return r.st.LayoutSource
|
|
|
|
}
|
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
func NewRoute(st *RouteState) (*Route, error) {
|
|
|
|
empty := compiler.EmptyStruct()
|
|
|
|
r := &Route{
|
|
|
|
st: st,
|
|
|
|
layout: empty,
|
|
|
|
input: empty,
|
|
|
|
output: empty,
|
|
|
|
}
|
2021-03-24 18:37:50 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
// Prepare inputs
|
|
|
|
for _, input := range st.Inputs {
|
|
|
|
v, err := input.Value.Compile()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if input.Key == "" {
|
|
|
|
r.input, err = r.input.Merge(v)
|
|
|
|
} else {
|
|
|
|
r.input, err = r.input.MergeTarget(v, input.Key)
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if err := r.mergeState(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return r, nil
|
2021-03-24 18:37:50 +01:00
|
|
|
}
|
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
func (r *Route) State() *compiler.Value {
|
|
|
|
return r.state
|
2021-03-24 18:37:50 +01:00
|
|
|
}
|
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
// Update the base configuration
|
|
|
|
func (r *Route) Update(ctx context.Context, s Solver) error {
|
|
|
|
span, ctx := opentracing.StartSpanFromContext(ctx, "r.Update")
|
|
|
|
defer span.Finish()
|
2021-03-23 23:41:26 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
layout, err := r.st.LayoutSource.Compile()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-03-24 18:37:50 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
p := NewPipeline("[internal] source", s, nil)
|
|
|
|
// execute updater script
|
|
|
|
if err := p.Do(ctx, layout); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-03-24 18:37:50 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
// Build a Cue config by overlaying the source with the stdlib
|
|
|
|
sources := map[string]fs.FS{
|
|
|
|
stdlib.Path: stdlib.FS,
|
|
|
|
"/": p.FS(),
|
|
|
|
}
|
|
|
|
base, err := compiler.Build(sources)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("base config: %w", err)
|
|
|
|
}
|
|
|
|
r.layout = base
|
2021-03-24 18:37:50 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
// Commit
|
|
|
|
return r.mergeState()
|
2021-03-23 23:41:26 +01:00
|
|
|
}
|
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
func (r *Route) Base() *compiler.Value {
|
|
|
|
return r.layout
|
2021-03-23 08:22:50 +01:00
|
|
|
}
|
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
func (r *Route) Output() *compiler.Value {
|
|
|
|
return r.output
|
2021-03-23 08:22:50 +01:00
|
|
|
}
|
2021-03-23 23:27:16 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
// Scan all scripts in the environment for references to local directories (do:"local"),
|
|
|
|
// and return all referenced directory names.
|
|
|
|
// This is used by clients to grant access to local directories when they are referenced
|
|
|
|
// by user-specified scripts.
|
|
|
|
func (r *Route) LocalDirs() map[string]string {
|
|
|
|
dirs := map[string]string{}
|
|
|
|
localdirs := func(code ...*compiler.Value) {
|
|
|
|
Analyze(
|
|
|
|
func(op *compiler.Value) error {
|
|
|
|
do, err := op.Get("do").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
// nolint:goconst
|
|
|
|
// FIXME: merge Env into Route, or fix the linter error
|
|
|
|
if do != "local" {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
dir, err := op.Get("dir").String()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
dirs[dir] = dir
|
|
|
|
return nil
|
|
|
|
},
|
|
|
|
code...,
|
|
|
|
)
|
|
|
|
}
|
|
|
|
// 1. Scan the environment state
|
|
|
|
// FIXME: use a common `flow` instance to avoid rescanning the tree.
|
|
|
|
inst := r.state.CueInst()
|
|
|
|
flow := cueflow.New(&cueflow.Config{}, inst, newTaskFunc(inst, noOpRunner))
|
|
|
|
for _, t := range flow.Tasks() {
|
|
|
|
v := compiler.Wrap(t.Value(), inst)
|
|
|
|
localdirs(v.Get("#compute"))
|
|
|
|
}
|
2021-03-23 08:22:50 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
// 2. Scan the layout
|
2021-03-26 22:11:54 +01:00
|
|
|
layout, err := r.st.LayoutSource.Compile()
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
2021-03-25 01:55:21 +01:00
|
|
|
}
|
2021-03-26 22:11:54 +01:00
|
|
|
localdirs(layout)
|
2021-03-25 01:55:21 +01:00
|
|
|
return dirs
|
2021-03-23 08:22:50 +01:00
|
|
|
}
|
2021-03-23 23:27:16 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
// FIXME: this is just a 3-way merge. Add var args to compiler.Value.Merge.
|
|
|
|
func (r *Route) mergeState() error {
|
|
|
|
// FIXME: make this cleaner in *compiler.Value by keeping intermediary instances
|
|
|
|
// FIXME: state.CueInst() must return an instance with the same
|
|
|
|
// contents as state.v, for the purposes of cueflow.
|
|
|
|
// That is not currently how *compiler.Value works, so we prepare the cue
|
|
|
|
// instance manually.
|
|
|
|
// --> refactor the compiler.Value API to do this for us.
|
|
|
|
var (
|
|
|
|
state = compiler.EmptyStruct()
|
|
|
|
stateInst = state.CueInst()
|
|
|
|
err error
|
|
|
|
)
|
2021-03-23 08:22:50 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
stateInst, err = stateInst.Fill(r.layout.Cue())
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("merge base & input: %w", err)
|
|
|
|
}
|
|
|
|
stateInst, err = stateInst.Fill(r.input.Cue())
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("merge base & input: %w", err)
|
|
|
|
}
|
|
|
|
stateInst, err = stateInst.Fill(r.output.Cue())
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("merge output with base & input: %w", err)
|
|
|
|
}
|
2021-03-23 08:22:50 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
state = compiler.Wrap(stateInst.Value(), stateInst)
|
2021-03-23 08:22:50 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
// commit
|
|
|
|
r.state = state
|
|
|
|
return nil
|
2021-03-23 08:22:50 +01:00
|
|
|
}
|
|
|
|
|
2021-03-23 23:41:26 +01:00
|
|
|
// UpOpts is the options type accepted by Route.Up.
// It currently has no fields.
type UpOpts struct{}
|
2021-03-23 08:22:50 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
// Up missing values in env configuration, and write them to state.
|
|
|
|
func (r *Route) Up(ctx context.Context, s Solver, _ *UpOpts) error {
|
|
|
|
span, ctx := opentracing.StartSpanFromContext(ctx, "r.Compute")
|
|
|
|
defer span.Finish()
|
|
|
|
|
|
|
|
lg := log.Ctx(ctx)
|
|
|
|
|
|
|
|
// Cueflow cue instance
|
|
|
|
inst := r.state.CueInst()
|
|
|
|
|
|
|
|
// Reset the output
|
|
|
|
r.output = compiler.EmptyStruct()
|
|
|
|
|
|
|
|
// Cueflow config
|
|
|
|
flowCfg := &cueflow.Config{
|
|
|
|
UpdateFunc: func(c *cueflow.Controller, t *cueflow.Task) error {
|
|
|
|
if t == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
lg := lg.
|
|
|
|
With().
|
|
|
|
Str("component", t.Path().String()).
|
|
|
|
Str("state", t.State().String()).
|
|
|
|
Logger()
|
|
|
|
|
|
|
|
if t.State() != cueflow.Terminated {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
// Merge task value into output
|
|
|
|
var err error
|
|
|
|
r.output, err = r.output.MergePath(t.Value(), t.Path())
|
|
|
|
if err != nil {
|
|
|
|
lg.
|
|
|
|
Error().
|
|
|
|
Err(err).
|
|
|
|
Msg("failed to fill task result")
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
},
|
|
|
|
}
|
|
|
|
// Orchestrate execution with cueflow
|
|
|
|
flow := cueflow.New(flowCfg, inst, newTaskFunc(inst, newPipelineRunner(inst, s)))
|
|
|
|
if err := flow.Run(ctx); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
{
|
|
|
|
span, _ := opentracing.StartSpanFromContext(ctx, "r.Compute: merge state")
|
|
|
|
defer span.Finish()
|
|
|
|
|
|
|
|
return r.mergeState()
|
|
|
|
}
|
2021-03-23 08:22:50 +01:00
|
|
|
}
|
2021-03-23 23:27:16 +01:00
|
|
|
|
2021-03-23 23:41:26 +01:00
|
|
|
// DownOpts is the options type accepted by Route.Down.
// It currently has no fields.
type DownOpts struct{}
|
2021-03-23 08:22:50 +01:00
|
|
|
|
2021-03-25 01:55:21 +01:00
|
|
|
func (r *Route) Down(ctx context.Context, _ *DownOpts) error {
|
|
|
|
panic("NOT IMPLEMENTED")
|
|
|
|
}
|
|
|
|
|
2021-03-23 23:41:26 +01:00
|
|
|
func (r *Route) Query(ctx context.Context, expr interface{}, o *QueryOpts) (*compiler.Value, error) {
|
2021-03-23 08:22:50 +01:00
|
|
|
panic("NOT IMPLEMENTED")
|
|
|
|
}
|
2021-03-23 23:27:16 +01:00
|
|
|
|
2021-03-23 23:41:26 +01:00
|
|
|
// QueryOpts is the options type accepted by Route.Query.
// It currently has no fields.
type QueryOpts struct{}
|
2021-03-25 01:55:21 +01:00
|
|
|
|
|
|
|
func newTaskFunc(inst *cue.Instance, runner cueflow.RunnerFunc) cueflow.TaskFunc {
|
|
|
|
return func(flowVal cue.Value) (cueflow.Runner, error) {
|
|
|
|
v := compiler.Wrap(flowVal, inst)
|
|
|
|
if !isComponent(v) {
|
|
|
|
// No compute script
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
return runner, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func noOpRunner(t *cueflow.Task) error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func newPipelineRunner(inst *cue.Instance, s Solver) cueflow.RunnerFunc {
|
|
|
|
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
|
|
|
|
ctx := t.Context()
|
|
|
|
lg := log.
|
|
|
|
Ctx(ctx).
|
|
|
|
With().
|
|
|
|
Str("component", t.Path().String()).
|
|
|
|
Logger()
|
|
|
|
ctx = lg.WithContext(ctx)
|
|
|
|
span, ctx := opentracing.StartSpanFromContext(ctx,
|
|
|
|
fmt.Sprintf("compute: %s", t.Path().String()),
|
|
|
|
)
|
|
|
|
defer span.Finish()
|
|
|
|
|
|
|
|
start := time.Now()
|
|
|
|
lg.
|
|
|
|
Info().
|
|
|
|
Msg("computing")
|
|
|
|
for _, dep := range t.Dependencies() {
|
|
|
|
lg.
|
|
|
|
Debug().
|
|
|
|
Str("dependency", dep.Path().String()).
|
|
|
|
Msg("dependency detected")
|
|
|
|
}
|
|
|
|
v := compiler.Wrap(t.Value(), inst)
|
|
|
|
p := NewPipeline(t.Path().String(), s, NewFillable(t))
|
|
|
|
err := p.Do(ctx, v)
|
|
|
|
if err != nil {
|
|
|
|
span.LogFields(otlog.String("error", err.Error()))
|
|
|
|
ext.Error.Set(span, true)
|
|
|
|
|
|
|
|
// FIXME: this should use errdefs.IsCanceled(err)
|
|
|
|
if strings.Contains(err.Error(), "context canceled") {
|
|
|
|
lg.
|
|
|
|
Error().
|
|
|
|
Dur("duration", time.Since(start)).
|
|
|
|
Msg("canceled")
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
lg.
|
|
|
|
Error().
|
|
|
|
Dur("duration", time.Since(start)).
|
|
|
|
Err(err).
|
|
|
|
Msg("failed")
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
lg.
|
|
|
|
Info().
|
|
|
|
Dur("duration", time.Since(start)).
|
|
|
|
Msg("completed")
|
|
|
|
return nil
|
|
|
|
})
|
|
|
|
}
|