From 608f2544491bec60cba865cf60ce74257096d9fa Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Wed, 24 Nov 2021 16:23:42 -0800 Subject: [PATCH] runtime: support legacy Pipelines in new execution engine Signed-off-by: Andrea Luzzardi --- environment/environment.go | 14 ++++++-------- environment/pipeline.go | 4 ++-- plan/task/pipeline.go | 27 +++++++++++++++++++++++++++ 3 files changed, 35 insertions(+), 10 deletions(-) create mode 100644 plan/task/pipeline.go diff --git a/environment/environment.go b/environment/environment.go index d74d30e4..4915e30e 100644 --- a/environment/environment.go +++ b/environment/environment.go @@ -78,15 +78,14 @@ func (e *Environment) Context() *plancontext.Context { // Up missing values in environment configuration, and write them to state. func (e *Environment) Up(ctx context.Context, s solver.Solver) error { - tr := otel.Tracer("environment") - ctx, span := tr.Start(ctx, "environment.Up") + ctx, span := otel.Tracer("dagger").Start(ctx, "environment.Up") defer span.End() // Orchestrate execution with cueflow flow := cueflow.New( &cueflow.Config{}, e.src.Cue(), - NewTaskFunc(NewPipelineRunner(e.computed, s, e.state.Context)), + newTaskFunc(newPipelineRunner(e.computed, s, e.state.Context)), ) if err := flow.Run(ctx); err != nil { return err @@ -110,10 +109,10 @@ func (e *Environment) Down(ctx context.Context, _ *DownOpts) error { type QueryOpts struct{} -func NewTaskFunc(runner cueflow.RunnerFunc) cueflow.TaskFunc { +func newTaskFunc(runner cueflow.RunnerFunc) cueflow.TaskFunc { return func(flowVal cue.Value) (cueflow.Runner, error) { v := compiler.Wrap(flowVal) - if !isComponent(v) { + if !IsComponent(v) { // No compute script return nil, nil } @@ -121,7 +120,7 @@ func NewTaskFunc(runner cueflow.RunnerFunc) cueflow.TaskFunc { } } -func NewPipelineRunner(computed *compiler.Value, s solver.Solver, pctx *plancontext.Context) cueflow.RunnerFunc { +func newPipelineRunner(computed *compiler.Value, s solver.Solver, pctx *plancontext.Context) cueflow.RunnerFunc { return cueflow.RunnerFunc(func(t *cueflow.Task) error { ctx := t.Context() lg := log. @@ -131,8 +130,7 @@ func NewPipelineRunner(computed *compiler.Value, s solver.Solver, pctx *plancont Logger() ctx = lg.WithContext(ctx) - tr := otel.Tracer("environment") - ctx, span := tr.Start(ctx, fmt.Sprintf("compute: %s", t.Path().String())) + ctx, span := otel.Tracer("dagger").Start(ctx, fmt.Sprintf("compute: %s", t.Path().String())) defer span.End() for _, dep := range t.Dependencies() { diff --git a/environment/pipeline.go b/environment/pipeline.go index 8812444f..1e01ab28 100644 --- a/environment/pipeline.go +++ b/environment/pipeline.go @@ -91,14 +91,14 @@ func (p *Pipeline) Computed() *compiler.Value { return p.computed } -func isComponent(v *compiler.Value) bool { +func IsComponent(v *compiler.Value) bool { return v.Lookup("#up").Exists() } func ops(code *compiler.Value) ([]*compiler.Value, error) { ops := []*compiler.Value{} // 1. attachment array - if isComponent(code) { + if IsComponent(code) { xops, err := code.Lookup("#up").List() if err != nil { return nil, err diff --git a/plan/task/pipeline.go b/plan/task/pipeline.go new file mode 100644 index 00000000..5cf02519 --- /dev/null +++ b/plan/task/pipeline.go @@ -0,0 +1,27 @@ +package task + +import ( + "context" + + "go.dagger.io/dagger/compiler" + "go.dagger.io/dagger/environment" + "go.dagger.io/dagger/plancontext" + "go.dagger.io/dagger/solver" +) + +func init() { + Register("#up", func() Task { return &pipelineTask{} }) +} + +// pipelineTask is an adapter for legacy pipelines (`#up`). +// FIXME: remove once fully migrated to Europa. +type pipelineTask struct { +} + +func (c pipelineTask) Run(ctx context.Context, pctx *plancontext.Context, s solver.Solver, v *compiler.Value) (*compiler.Value, error) { + p := environment.NewPipeline(v, s, pctx) + if err := p.Run(ctx); err != nil { + return nil, err + } + return p.Computed(), nil +}