This repository has been archived on 2024-04-08. You can view files and clone it, but cannot push or open issues or pull requests.
dagger/environment/environment.go

345 lines
7.3 KiB
Go
Raw Normal View History

package environment
import (
"context"
"fmt"
"io/fs"
"strings"
"time"
"cuelang.org/go/cue"
cueflow "cuelang.org/go/tools/flow"
"go.dagger.io/dagger/compiler"
"go.dagger.io/dagger/solver"
"go.dagger.io/dagger/state"
"go.dagger.io/dagger/stdlib"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/rs/zerolog/log"
)
type Environment struct {
state *state.State
// Layer 1: plan configuration
plan *compiler.Value
// Layer 2: user inputs
input *compiler.Value
// Layer 3: computed values
computed *compiler.Value
}
func New(st *state.State) (*Environment, error) {
e := &Environment{
state: st,
plan: compiler.NewValue(),
input: compiler.NewValue(),
computed: compiler.NewValue(),
}
// Prepare inputs
for key, input := range st.Inputs {
v, err := input.Compile(key, st)
if err != nil {
return nil, err
}
if key == "" {
err = e.input.FillPath(cue.MakePath(), v)
} else {
err = e.input.FillPath(cue.ParsePath(key), v)
}
if err != nil {
return nil, err
}
}
return e, nil
}
func (e *Environment) Name() string {
return e.state.Name
}
func (e *Environment) Plan() *compiler.Value {
return e.plan
}
func (e *Environment) Input() *compiler.Value {
return e.input
}
func (e *Environment) Computed() *compiler.Value {
return e.computed
}
// LoadPlan loads the plan
func (e *Environment) LoadPlan(ctx context.Context, s solver.Solver) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "environment.LoadPlan")
defer span.Finish()
planSource, err := e.state.Plan.Source().Compile("", e.state)
if err != nil {
return err
}
p := NewPipeline(planSource, s).WithCustomName("[internal] source")
// execute updater script
if err := p.Run(ctx); err != nil {
return err
}
// Build a Cue config by overlaying the source with the stdlib
sources := map[string]fs.FS{
stdlib.Path: stdlib.FS,
"/": p.FS(),
}
args := []string{}
if pkg := e.state.Plan.Package; pkg != "" {
args = append(args, pkg)
}
plan, err := compiler.Build(sources, args...)
if err != nil {
return fmt.Errorf("plan config: %w", compiler.Err(err))
}
e.plan = plan
return nil
}
// Scan all scripts in the environment for references to local directories (do:"local"),
// and return all referenced directory names.
// This is used by clients to grant access to local directories when they are referenced
// by user-specified scripts.
func (e *Environment) LocalDirs() map[string]string {
dirs := map[string]string{}
localdirs := func(code *compiler.Value) {
Analyze(
func(op *compiler.Value) error {
do, err := op.Lookup("do").String()
if err != nil {
return err
}
if do != "local" {
return nil
}
dir, err := op.Lookup("dir").String()
if err != nil {
return err
}
dirs[dir] = dir
return nil
},
code,
)
}
// 1. Scan the environment state
// FIXME: use a common `flow` instance to avoid rescanning the tree.
src := compiler.NewValue()
if err := src.FillPath(cue.MakePath(), e.plan); err != nil {
return nil
}
if err := src.FillPath(cue.MakePath(), e.input); err != nil {
return nil
}
flow := cueflow.New(
&cueflow.Config{},
src.Cue(),
newTaskFunc(noOpRunner),
)
for _, t := range flow.Tasks() {
v := compiler.Wrap(t.Value())
localdirs(v.Lookup("#up"))
}
// 2. Scan the plan
plan, err := e.state.Plan.Source().Compile("", e.state)
if err != nil {
panic(err)
}
localdirs(plan)
return dirs
}
// prepare initializes the Environment with inputs and plan code
func (e *Environment) prepare(ctx context.Context) (*compiler.Value, error) {
span, _ := opentracing.StartSpanFromContext(ctx, "environment.Prepare")
defer span.Finish()
// Reset the computed values
e.computed = compiler.NewValue()
src := compiler.NewValue()
if err := src.FillPath(cue.MakePath(), e.plan); err != nil {
return nil, err
}
if err := src.FillPath(cue.MakePath(), e.input); err != nil {
return nil, err
}
return src, nil
}
// Up missing values in environment configuration, and write them to state.
func (e *Environment) Up(ctx context.Context, s solver.Solver) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "environment.Up")
defer span.Finish()
// Set user inputs and plan code
src, err := e.prepare(ctx)
if err != nil {
return err
}
// Orchestrate execution with cueflow
flow := cueflow.New(
&cueflow.Config{},
src.Cue(),
newTaskFunc(newPipelineRunner(e.computed, s)),
)
if err := flow.Run(ctx); err != nil {
return err
}
return nil
}
type DownOpts struct{}
func (e *Environment) Down(ctx context.Context, _ *DownOpts) error {
panic("NOT IMPLEMENTED")
}
type QueryOpts struct{}
func newTaskFunc(runner cueflow.RunnerFunc) cueflow.TaskFunc {
return func(flowVal cue.Value) (cueflow.Runner, error) {
v := compiler.Wrap(flowVal)
if !isComponent(v) {
// No compute script
return nil, nil
}
return runner, nil
}
}
func noOpRunner(t *cueflow.Task) error {
return nil
}
func newPipelineRunner(computed *compiler.Value, s solver.Solver) cueflow.RunnerFunc {
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
ctx := t.Context()
lg := log.
Ctx(ctx).
With().
Str("component", t.Path().String()).
Logger()
ctx = lg.WithContext(ctx)
span, ctx := opentracing.StartSpanFromContext(ctx,
fmt.Sprintf("compute: %s", t.Path().String()),
)
defer span.Finish()
start := time.Now()
lg.
Info().
Msg("computing")
for _, dep := range t.Dependencies() {
lg.
Debug().
Str("dependency", dep.Path().String()).
Msg("dependency detected")
}
v := compiler.Wrap(t.Value())
p := NewPipeline(v, s)
err := p.Run(ctx)
if err != nil {
span.LogFields(otlog.String("error", err.Error()))
ext.Error.Set(span, true)
// FIXME: this should use errdefs.IsCanceled(err)
if strings.Contains(err.Error(), "context canceled") {
lg.
Error().
Dur("duration", time.Since(start)).
Msg("canceled")
return err
}
lg.
Error().
Dur("duration", time.Since(start)).
Err(err).
Msg("failed")
return err
}
// Mirror the computed values in both `Task` and `Result`
if p.Computed().IsEmptyStruct() {
return nil
}
if err := t.Fill(p.Computed().Cue()); err != nil {
lg.
Error().
Err(err).
Msg("failed to fill task")
return err
}
// Merge task value into output
if err := computed.FillPath(t.Path(), p.Computed()); err != nil {
lg.
Error().
Err(err).
Msg("failed to fill task result")
return err
}
lg.
Info().
Dur("duration", time.Since(start)).
Msg("completed")
return nil
})
}
func (e *Environment) ScanInputs(ctx context.Context, mergeUserInputs bool) ([]*compiler.Value, error) {
src := e.plan
if mergeUserInputs {
// Set user inputs and plan code
var err error
src, err = e.prepare(ctx)
if err != nil {
return nil, err
}
}
return ScanInputs(ctx, src), nil
}
func (e *Environment) ScanOutputs(ctx context.Context) ([]*compiler.Value, error) {
src, err := e.prepare(ctx)
if err != nil {
return nil, err
}
if e.state.Computed != "" {
computed, err := compiler.DecodeJSON("", []byte(e.state.Computed))
if err != nil {
return nil, err
}
if err := src.FillPath(cue.MakePath(), computed); err != nil {
return nil, err
}
}
return ScanOutputs(ctx, src), nil
}