From 3628dbda84b61c33bbc2663fef0b1fd4c08f34e7 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Wed, 10 Mar 2021 15:01:37 -0800 Subject: [PATCH] flow cleanup: separate TaskFunc from RunnerFunc Extracted from #140 This will help be more flexible in what gets executed and how (e.g. for running tests for instance) Signed-off-by: Andrea Luzzardi --- dagger/env.go | 108 ++++++++++++++++++++++++-------------------------- 1 file changed, 51 insertions(+), 57 deletions(-) diff --git a/dagger/env.go b/dagger/env.go index 0d27f362..35464326 100644 --- a/dagger/env.go +++ b/dagger/env.go @@ -146,7 +146,7 @@ func (env *Env) LocalDirs() map[string]string { // 1. Scan the environment state // FIXME: use a common `flow` instance to avoid rescanning the tree. inst := env.state.CueInst() - flow := cueflow.New(&cueflow.Config{}, inst, newDummyTaskFunc(inst)) + flow := cueflow.New(&cueflow.Config{}, inst, newTaskFunc(inst, noOpRunner)) for _, t := range flow.Tasks() { v := compiler.Wrap(t.Value(), inst) localdirs(v.Get("#compute")) @@ -225,7 +225,7 @@ func (env *Env) Compute(ctx context.Context, s Solver) error { lg := log.Ctx(ctx) // Cueflow cue instance - flowInst := env.state.CueInst() + inst := env.state.CueInst() // Reset the output env.output = compiler.EmptyStruct() @@ -260,7 +260,7 @@ func (env *Env) Compute(ctx context.Context, s Solver) error { }, } // Orchestrate execution with cueflow - flow := cueflow.New(flowCfg, flowInst, newPipelineTaskFunc(flowInst, s)) + flow := cueflow.New(flowCfg, inst, newTaskFunc(inst, newPipelineRunner(inst, s))) if err := flow.Run(ctx); err != nil { return err } @@ -273,77 +273,71 @@ func (env *Env) Compute(ctx context.Context, s Solver) error { } } -func newDummyTaskFunc(inst *cue.Instance) cueflow.TaskFunc { +func newTaskFunc(inst *cue.Instance, runner cueflow.RunnerFunc) cueflow.TaskFunc { return func(flowVal cue.Value) (cueflow.Runner, error) { v := compiler.Wrap(flowVal, inst) if !isComponent(v) { // No compute script return nil, nil } - return cueflow.RunnerFunc(func(t *cueflow.Task) error { - return nil - }), nil + return runner, nil } } -func newPipelineTaskFunc(inst *cue.Instance, s Solver) cueflow.TaskFunc { - return func(flowVal cue.Value) (cueflow.Runner, error) { - v := compiler.Wrap(flowVal, inst) - if !isComponent(v) { - // No compute script - return nil, nil - } - // Cueflow run func: - return cueflow.RunnerFunc(func(t *cueflow.Task) error { - ctx := t.Context() - lg := log. - Ctx(ctx). - With(). - Str("component", t.Path().String()). - Logger() - ctx = lg.WithContext(ctx) - span, ctx := opentracing.StartSpanFromContext(ctx, - fmt.Sprintf("compute: %s", t.Path().String()), - ) - defer span.Finish() +func noOpRunner(t *cueflow.Task) error { + return nil +} - start := time.Now() +func newPipelineRunner(inst *cue.Instance, s Solver) cueflow.RunnerFunc { + return cueflow.RunnerFunc(func(t *cueflow.Task) error { + ctx := t.Context() + lg := log. + Ctx(ctx). + With(). + Str("component", t.Path().String()). + Logger() + ctx = lg.WithContext(ctx) + span, ctx := opentracing.StartSpanFromContext(ctx, + fmt.Sprintf("compute: %s", t.Path().String()), + ) + defer span.Finish() + + start := time.Now() + lg. + Info(). + Msg("computing") + for _, dep := range t.Dependencies() { lg. - Info(). - Msg("computing") - for _, dep := range t.Dependencies() { - lg. - Debug(). - Str("dependency", dep.Path().String()). - Msg("dependency detected") - } - v := compiler.Wrap(t.Value(), inst) - p := NewPipeline(t.Path().String(), s, NewFillable(t)) - err := p.Do(ctx, v) - if err != nil { - span.LogFields(otlog.String("error", err.Error())) - ext.Error.Set(span, true) + Debug(). + Str("dependency", dep.Path().String()). + Msg("dependency detected") + } + v := compiler.Wrap(t.Value(), inst) + p := NewPipeline(t.Path().String(), s, NewFillable(t)) + err := p.Do(ctx, v) + if err != nil { + span.LogFields(otlog.String("error", err.Error())) + ext.Error.Set(span, true) - // FIXME: this should use errdefs.IsCanceled(err) - if strings.Contains(err.Error(), "context canceled") { - lg. - Error(). - Dur("duration", time.Since(start)). - Msg("canceled") - return err - } + // FIXME: this should use errdefs.IsCanceled(err) + if strings.Contains(err.Error(), "context canceled") { lg. Error(). Dur("duration", time.Since(start)). - Err(err). - Msg("failed") + Msg("canceled") return err } lg. - Info(). + Error(). Dur("duration", time.Since(start)). - Msg("completed") - return nil - }), nil - } + Err(err). + Msg("failed") + return err + } + lg. + Info(). + Dur("duration", time.Since(start)). + Msg("completed") + return nil + }) }