This repository has been archived on 2024-04-08. You can view files and clone it, but cannot push or open issues or pull requests.
dagger/plan/plan.go
Andrea Luzzardi 81ec746d2c core: component -> task
Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
2021-11-24 16:58:24 -08:00

174 lines
4.3 KiB
Go

package plan
import (
"context"
"fmt"
"path/filepath"
"strings"
"time"
"cuelang.org/go/cue"
cueflow "cuelang.org/go/tools/flow"
"github.com/rs/zerolog/log"
"go.dagger.io/dagger/compiler"
"go.dagger.io/dagger/environment"
"go.dagger.io/dagger/plan/task"
"go.dagger.io/dagger/plancontext"
"go.dagger.io/dagger/solver"
"go.dagger.io/dagger/state"
"go.opentelemetry.io/otel"
)
type Plan struct {
context *plancontext.Context
source *compiler.Value
}
func Load(ctx context.Context, path, pkg string) (*Plan, error) {
// FIXME: universe vendoring
if err := state.VendorUniverse(ctx, path); err != nil {
return nil, err
}
v, err := compiler.Build(path, nil, pkg)
if err != nil {
return nil, err
}
return &Plan{
context: plancontext.New(),
source: v,
}, nil
}
func (p *Plan) Context() *plancontext.Context {
return p.context
}
func (p *Plan) Source() *compiler.Value {
return p.source
}
// LocalDirectories scans the context for local imports.
// BuildKit requires to known the list of directories ahead of time.
func (p *Plan) LocalDirectories() (map[string]string, error) {
dirs := map[string]string{}
imports, err := p.source.Lookup("context.imports").Fields()
if err != nil {
return nil, err
}
for _, v := range imports {
dir, err := v.Value.Lookup("path").String()
if err != nil {
return nil, err
}
abs, err := filepath.Abs(dir)
if err != nil {
return nil, err
}
dirs[dir] = abs
}
return dirs, nil
}
// Up executes the plan
func (p *Plan) Up(ctx context.Context, s solver.Solver) error {
ctx, span := otel.Tracer("dagger").Start(ctx, "plan.Up")
defer span.End()
computed := compiler.NewValue()
flow := cueflow.New(
&cueflow.Config{},
p.source.Cue(),
newRunner(p.context, s, computed),
)
if err := flow.Run(ctx); err != nil {
return err
}
if src, err := computed.Source(); err == nil {
log.Ctx(ctx).Debug().Str("computed", string(src)).Msg("computed values")
}
// FIXME: canceling the context makes flow return `nil`
// Check explicitly if the context is canceled.
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
func newRunner(pctx *plancontext.Context, s solver.Solver, computed *compiler.Value) cueflow.TaskFunc {
return func(flowVal cue.Value) (cueflow.Runner, error) {
v := compiler.Wrap(flowVal)
r, err := task.Lookup(v)
if err != nil {
// Not a task
if err == task.ErrNotTask {
return nil, nil
}
return nil, err
}
// Wrapper around `task.Run` that handles logging, tracing, etc.
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
ctx := t.Context()
lg := log.Ctx(ctx).With().Str("task", t.Path().String()).Logger()
ctx = lg.WithContext(ctx)
ctx, span := otel.Tracer("dagger").Start(ctx, fmt.Sprintf("compute: %s", t.Path().String()))
defer span.End()
lg.Info().Str("state", string(environment.StateComputing)).Msg(string(environment.StateComputing))
// Debug: dump dependencies
for _, dep := range t.Dependencies() {
lg.Debug().Str("dependency", dep.Path().String()).Msg("dependency detected")
}
start := time.Now()
result, err := r.Run(ctx, pctx, s, compiler.Wrap(t.Value()))
if err != nil {
// FIXME: this should use errdefs.IsCanceled(err)
if strings.Contains(err.Error(), "context canceled") {
lg.Error().Dur("duration", time.Since(start)).Str("state", string(environment.StateCanceled)).Msg(string(environment.StateCanceled))
} else {
lg.Error().Dur("duration", time.Since(start)).Err(err).Str("state", string(environment.StateFailed)).Msg(string(environment.StateFailed))
}
return err
}
lg.Info().Dur("duration", time.Since(start)).Str("state", string(environment.StateCompleted)).Msg(string(environment.StateCompleted))
// If the result is not concrete, there's nothing to merge.
if !result.IsConcrete() {
return nil
}
if src, err := result.Source(); err == nil {
lg.Debug().Str("result", string(src)).Msg("merging task result")
}
// Mirror task result in both `flow.Task` and `computed`
if err := t.Fill(result.Cue()); err != nil {
lg.Error().Err(err).Msg("failed to fill task")
return err
}
// Merge task value into computed
if err := computed.FillPath(t.Path(), result); err != nil {
lg.Error().Err(err).Msg("failed to fill plan")
return err
}
return nil
}), nil
}
}