runtime: support legacy Pipelines in new execution engine

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2021-11-24 16:23:42 -08:00
parent 2a4db167e4
commit 608f254449
3 changed files with 35 additions and 10 deletions

View File

@ -78,15 +78,14 @@ func (e *Environment) Context() *plancontext.Context {
// Up missing values in environment configuration, and write them to state.
func (e *Environment) Up(ctx context.Context, s solver.Solver) error {
tr := otel.Tracer("environment")
ctx, span := tr.Start(ctx, "environment.Up")
ctx, span := otel.Tracer("dagger").Start(ctx, "environment.Up")
defer span.End()
// Orchestrate execution with cueflow
flow := cueflow.New(
&cueflow.Config{},
e.src.Cue(),
NewTaskFunc(NewPipelineRunner(e.computed, s, e.state.Context)),
newTaskFunc(newPipelineRunner(e.computed, s, e.state.Context)),
)
if err := flow.Run(ctx); err != nil {
return err
@ -110,10 +109,10 @@ func (e *Environment) Down(ctx context.Context, _ *DownOpts) error {
type QueryOpts struct{}
func NewTaskFunc(runner cueflow.RunnerFunc) cueflow.TaskFunc {
func newTaskFunc(runner cueflow.RunnerFunc) cueflow.TaskFunc {
return func(flowVal cue.Value) (cueflow.Runner, error) {
v := compiler.Wrap(flowVal)
if !isComponent(v) {
if !IsComponent(v) {
// No compute script
return nil, nil
}
@ -121,7 +120,7 @@ func NewTaskFunc(runner cueflow.RunnerFunc) cueflow.TaskFunc {
}
}
func NewPipelineRunner(computed *compiler.Value, s solver.Solver, pctx *plancontext.Context) cueflow.RunnerFunc {
func newPipelineRunner(computed *compiler.Value, s solver.Solver, pctx *plancontext.Context) cueflow.RunnerFunc {
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
ctx := t.Context()
lg := log.
@ -131,8 +130,7 @@ func NewPipelineRunner(computed *compiler.Value, s solver.Solver, pctx *plancont
Logger()
ctx = lg.WithContext(ctx)
tr := otel.Tracer("environment")
ctx, span := tr.Start(ctx, fmt.Sprintf("compute: %s", t.Path().String()))
ctx, span := otel.Tracer("dagger").Start(ctx, fmt.Sprintf("compute: %s", t.Path().String()))
defer span.End()
for _, dep := range t.Dependencies() {

View File

@ -91,14 +91,14 @@ func (p *Pipeline) Computed() *compiler.Value {
return p.computed
}
func isComponent(v *compiler.Value) bool {
func IsComponent(v *compiler.Value) bool {
return v.Lookup("#up").Exists()
}
func ops(code *compiler.Value) ([]*compiler.Value, error) {
ops := []*compiler.Value{}
// 1. attachment array
if isComponent(code) {
if IsComponent(code) {
xops, err := code.Lookup("#up").List()
if err != nil {
return nil, err

27
plan/task/pipeline.go Normal file
View File

@ -0,0 +1,27 @@
package task
import (
"context"
"go.dagger.io/dagger/compiler"
"go.dagger.io/dagger/environment"
"go.dagger.io/dagger/plancontext"
"go.dagger.io/dagger/solver"
)
func init() {
Register("#up", func() Task { return &pipelineTask{} })
}
// pipelineTask is an adapter for legacy pipelines (`#up`).
// FIXME: remove once fully migrated to Europa.
type pipelineTask struct {
}
func (c pipelineTask) Run(ctx context.Context, pctx *plancontext.Context, s solver.Solver, v *compiler.Value) (*compiler.Value, error) {
p := environment.NewPipeline(v, s, pctx)
if err := p.Run(ctx); err != nil {
return nil, err
}
return p.Computed(), nil
}