flow cleanup: separate TaskFunc from RunnerFunc

Extracted from #140

This will help be more flexible in what gets executed and how (e.g. for
running tests for instance)

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2021-03-10 15:01:37 -08:00
parent ef1bb4b873
commit 3628dbda84

View File

@ -146,7 +146,7 @@ func (env *Env) LocalDirs() map[string]string {
// 1. Scan the environment state // 1. Scan the environment state
// FIXME: use a common `flow` instance to avoid rescanning the tree. // FIXME: use a common `flow` instance to avoid rescanning the tree.
inst := env.state.CueInst() inst := env.state.CueInst()
flow := cueflow.New(&cueflow.Config{}, inst, newDummyTaskFunc(inst)) flow := cueflow.New(&cueflow.Config{}, inst, newTaskFunc(inst, noOpRunner))
for _, t := range flow.Tasks() { for _, t := range flow.Tasks() {
v := compiler.Wrap(t.Value(), inst) v := compiler.Wrap(t.Value(), inst)
localdirs(v.Get("#compute")) localdirs(v.Get("#compute"))
@ -225,7 +225,7 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
lg := log.Ctx(ctx) lg := log.Ctx(ctx)
// Cueflow cue instance // Cueflow cue instance
flowInst := env.state.CueInst() inst := env.state.CueInst()
// Reset the output // Reset the output
env.output = compiler.EmptyStruct() env.output = compiler.EmptyStruct()
@ -260,7 +260,7 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
}, },
} }
// Orchestrate execution with cueflow // Orchestrate execution with cueflow
flow := cueflow.New(flowCfg, flowInst, newPipelineTaskFunc(flowInst, s)) flow := cueflow.New(flowCfg, inst, newTaskFunc(inst, newPipelineRunner(inst, s)))
if err := flow.Run(ctx); err != nil { if err := flow.Run(ctx); err != nil {
return err return err
} }
@ -273,27 +273,22 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
} }
} }
func newDummyTaskFunc(inst *cue.Instance) cueflow.TaskFunc { func newTaskFunc(inst *cue.Instance, runner cueflow.RunnerFunc) cueflow.TaskFunc {
return func(flowVal cue.Value) (cueflow.Runner, error) { return func(flowVal cue.Value) (cueflow.Runner, error) {
v := compiler.Wrap(flowVal, inst) v := compiler.Wrap(flowVal, inst)
if !isComponent(v) { if !isComponent(v) {
// No compute script // No compute script
return nil, nil return nil, nil
} }
return cueflow.RunnerFunc(func(t *cueflow.Task) error { return runner, nil
return nil
}), nil
} }
} }
func newPipelineTaskFunc(inst *cue.Instance, s Solver) cueflow.TaskFunc { func noOpRunner(t *cueflow.Task) error {
return func(flowVal cue.Value) (cueflow.Runner, error) { return nil
v := compiler.Wrap(flowVal, inst)
if !isComponent(v) {
// No compute script
return nil, nil
} }
// Cueflow run func:
func newPipelineRunner(inst *cue.Instance, s Solver) cueflow.RunnerFunc {
return cueflow.RunnerFunc(func(t *cueflow.Task) error { return cueflow.RunnerFunc(func(t *cueflow.Task) error {
ctx := t.Context() ctx := t.Context()
lg := log. lg := log.
@ -344,6 +339,5 @@ func newPipelineTaskFunc(inst *cue.Instance, s Solver) cueflow.TaskFunc {
Dur("duration", time.Since(start)). Dur("duration", time.Since(start)).
Msg("completed") Msg("completed")
return nil return nil
}), nil })
}
} }