Merge pull request #166 from dagger/flow-cleanup
flow cleanup: separate TaskFunc from RunnerFunc
This commit is contained in:
commit
b039bfd334
108
dagger/env.go
108
dagger/env.go
@ -146,7 +146,7 @@ func (env *Env) LocalDirs() map[string]string {
|
|||||||
// 1. Scan the environment state
|
// 1. Scan the environment state
|
||||||
// FIXME: use a common `flow` instance to avoid rescanning the tree.
|
// FIXME: use a common `flow` instance to avoid rescanning the tree.
|
||||||
inst := env.state.CueInst()
|
inst := env.state.CueInst()
|
||||||
flow := cueflow.New(&cueflow.Config{}, inst, newDummyTaskFunc(inst))
|
flow := cueflow.New(&cueflow.Config{}, inst, newTaskFunc(inst, noOpRunner))
|
||||||
for _, t := range flow.Tasks() {
|
for _, t := range flow.Tasks() {
|
||||||
v := compiler.Wrap(t.Value(), inst)
|
v := compiler.Wrap(t.Value(), inst)
|
||||||
localdirs(v.Get("#compute"))
|
localdirs(v.Get("#compute"))
|
||||||
@ -225,7 +225,7 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
|
|||||||
lg := log.Ctx(ctx)
|
lg := log.Ctx(ctx)
|
||||||
|
|
||||||
// Cueflow cue instance
|
// Cueflow cue instance
|
||||||
flowInst := env.state.CueInst()
|
inst := env.state.CueInst()
|
||||||
|
|
||||||
// Reset the output
|
// Reset the output
|
||||||
env.output = compiler.EmptyStruct()
|
env.output = compiler.EmptyStruct()
|
||||||
@ -260,7 +260,7 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
// Orchestrate execution with cueflow
|
// Orchestrate execution with cueflow
|
||||||
flow := cueflow.New(flowCfg, flowInst, newPipelineTaskFunc(flowInst, s))
|
flow := cueflow.New(flowCfg, inst, newTaskFunc(inst, newPipelineRunner(inst, s)))
|
||||||
if err := flow.Run(ctx); err != nil {
|
if err := flow.Run(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -273,77 +273,71 @@ func (env *Env) Compute(ctx context.Context, s Solver) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDummyTaskFunc(inst *cue.Instance) cueflow.TaskFunc {
|
func newTaskFunc(inst *cue.Instance, runner cueflow.RunnerFunc) cueflow.TaskFunc {
|
||||||
return func(flowVal cue.Value) (cueflow.Runner, error) {
|
return func(flowVal cue.Value) (cueflow.Runner, error) {
|
||||||
v := compiler.Wrap(flowVal, inst)
|
v := compiler.Wrap(flowVal, inst)
|
||||||
if !isComponent(v) {
|
if !isComponent(v) {
|
||||||
// No compute script
|
// No compute script
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
|
return runner, nil
|
||||||
return nil
|
|
||||||
}), nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPipelineTaskFunc(inst *cue.Instance, s Solver) cueflow.TaskFunc {
|
func noOpRunner(t *cueflow.Task) error {
|
||||||
return func(flowVal cue.Value) (cueflow.Runner, error) {
|
return nil
|
||||||
v := compiler.Wrap(flowVal, inst)
|
}
|
||||||
if !isComponent(v) {
|
|
||||||
// No compute script
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
// Cueflow run func:
|
|
||||||
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
|
|
||||||
ctx := t.Context()
|
|
||||||
lg := log.
|
|
||||||
Ctx(ctx).
|
|
||||||
With().
|
|
||||||
Str("component", t.Path().String()).
|
|
||||||
Logger()
|
|
||||||
ctx = lg.WithContext(ctx)
|
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx,
|
|
||||||
fmt.Sprintf("compute: %s", t.Path().String()),
|
|
||||||
)
|
|
||||||
defer span.Finish()
|
|
||||||
|
|
||||||
start := time.Now()
|
func newPipelineRunner(inst *cue.Instance, s Solver) cueflow.RunnerFunc {
|
||||||
|
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
|
||||||
|
ctx := t.Context()
|
||||||
|
lg := log.
|
||||||
|
Ctx(ctx).
|
||||||
|
With().
|
||||||
|
Str("component", t.Path().String()).
|
||||||
|
Logger()
|
||||||
|
ctx = lg.WithContext(ctx)
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx,
|
||||||
|
fmt.Sprintf("compute: %s", t.Path().String()),
|
||||||
|
)
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
lg.
|
||||||
|
Info().
|
||||||
|
Msg("computing")
|
||||||
|
for _, dep := range t.Dependencies() {
|
||||||
lg.
|
lg.
|
||||||
Info().
|
Debug().
|
||||||
Msg("computing")
|
Str("dependency", dep.Path().String()).
|
||||||
for _, dep := range t.Dependencies() {
|
Msg("dependency detected")
|
||||||
lg.
|
}
|
||||||
Debug().
|
v := compiler.Wrap(t.Value(), inst)
|
||||||
Str("dependency", dep.Path().String()).
|
p := NewPipeline(t.Path().String(), s, NewFillable(t))
|
||||||
Msg("dependency detected")
|
err := p.Do(ctx, v)
|
||||||
}
|
if err != nil {
|
||||||
v := compiler.Wrap(t.Value(), inst)
|
span.LogFields(otlog.String("error", err.Error()))
|
||||||
p := NewPipeline(t.Path().String(), s, NewFillable(t))
|
ext.Error.Set(span, true)
|
||||||
err := p.Do(ctx, v)
|
|
||||||
if err != nil {
|
|
||||||
span.LogFields(otlog.String("error", err.Error()))
|
|
||||||
ext.Error.Set(span, true)
|
|
||||||
|
|
||||||
// FIXME: this should use errdefs.IsCanceled(err)
|
// FIXME: this should use errdefs.IsCanceled(err)
|
||||||
if strings.Contains(err.Error(), "context canceled") {
|
if strings.Contains(err.Error(), "context canceled") {
|
||||||
lg.
|
|
||||||
Error().
|
|
||||||
Dur("duration", time.Since(start)).
|
|
||||||
Msg("canceled")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
lg.
|
lg.
|
||||||
Error().
|
Error().
|
||||||
Dur("duration", time.Since(start)).
|
Dur("duration", time.Since(start)).
|
||||||
Err(err).
|
Msg("canceled")
|
||||||
Msg("failed")
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
lg.
|
lg.
|
||||||
Info().
|
Error().
|
||||||
Dur("duration", time.Since(start)).
|
Dur("duration", time.Since(start)).
|
||||||
Msg("completed")
|
Err(err).
|
||||||
return nil
|
Msg("failed")
|
||||||
}), nil
|
return err
|
||||||
}
|
}
|
||||||
|
lg.
|
||||||
|
Info().
|
||||||
|
Dur("duration", time.Since(start)).
|
||||||
|
Msg("completed")
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user