Merge pull request #23 from blocklayerhq/fix-10-cherrypick
Fix race condition in cueflow. Fixes #10.
This commit is contained in:
commit
bbdd2b394a
@ -3,6 +3,7 @@ package dagger
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"os"
|
"os"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"cuelang.org/go/cue"
|
"cuelang.org/go/cue"
|
||||||
cueflow "cuelang.org/go/tools/flow"
|
cueflow "cuelang.org/go/tools/flow"
|
||||||
@ -104,6 +105,7 @@ type EnvWalkFunc func(*Component, Fillable) error
|
|||||||
func (env *Env) Walk(ctx context.Context, fn EnvWalkFunc) (*Value, error) {
|
func (env *Env) Walk(ctx context.Context, fn EnvWalkFunc) (*Value, error) {
|
||||||
debugf("Env.Walk")
|
debugf("Env.Walk")
|
||||||
defer debugf("COMPLETE: Env.Walk")
|
defer debugf("COMPLETE: Env.Walk")
|
||||||
|
l := sync.Mutex{}
|
||||||
// Cueflow cue instance
|
// Cueflow cue instance
|
||||||
// FIXME: make this cleaner in *Value by keeping intermediary instances
|
// FIXME: make this cleaner in *Value by keeping intermediary instances
|
||||||
flowInst, err := env.base.CueInst().Fill(env.input.CueInst().Value())
|
flowInst, err := env.base.CueInst().Fill(env.input.CueInst().Value())
|
||||||
@ -119,6 +121,8 @@ func (env *Env) Walk(ctx context.Context, fn EnvWalkFunc) (*Value, error) {
|
|||||||
// Cueflow config
|
// Cueflow config
|
||||||
flowCfg := &cueflow.Config{
|
flowCfg := &cueflow.Config{
|
||||||
UpdateFunc: func(c *cueflow.Controller, t *cueflow.Task) error {
|
UpdateFunc: func(c *cueflow.Controller, t *cueflow.Task) error {
|
||||||
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
debugf("compute step")
|
debugf("compute step")
|
||||||
if t == nil {
|
if t == nil {
|
||||||
return nil
|
return nil
|
||||||
@ -141,6 +145,8 @@ func (env *Env) Walk(ctx context.Context, fn EnvWalkFunc) (*Value, error) {
|
|||||||
}
|
}
|
||||||
// Cueflow match func
|
// Cueflow match func
|
||||||
flowMatchFn := func(v cue.Value) (cueflow.Runner, error) {
|
flowMatchFn := func(v cue.Value) (cueflow.Runner, error) {
|
||||||
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
debugf("Env.Walk: processing %s", v.Path().String())
|
debugf("Env.Walk: processing %s", v.Path().String())
|
||||||
val := env.cc.Wrap(v, flowInst)
|
val := env.cc.Wrap(v, flowInst)
|
||||||
c, err := val.Component()
|
c, err := val.Component()
|
||||||
@ -152,6 +158,8 @@ func (env *Env) Walk(ctx context.Context, fn EnvWalkFunc) (*Value, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
|
return cueflow.RunnerFunc(func(t *cueflow.Task) error {
|
||||||
|
l.Lock()
|
||||||
|
defer l.Unlock()
|
||||||
return fn(c, t)
|
return fn(c, t)
|
||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user