client: simplify Compute logic

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2021-02-02 11:32:35 -08:00
parent c9e0d0854d
commit cd0f21dbd2
2 changed files with 34 additions and 31 deletions

View File

@ -51,46 +51,40 @@ func NewClient(ctx context.Context, host string) (*Client, error) {
} }
// FIXME: return completed *Env, instead of *Value // FIXME: return completed *Env, instead of *Value
func (c *Client) Compute(ctx context.Context, env *Env) (o *Value, err error) { func (c *Client) Compute(ctx context.Context, env *Env) (*Value, error) {
lg := log.Ctx(ctx) lg := log.Ctx(ctx)
defer func() {
if err != nil {
// Expand cue errors to get full details
err = cueErr(err)
}
}()
// FIXME: merge this into env output. eg, gctx := errgroup.WithContext(ctx)
out, err := env.Compiler().EmptyStruct()
if err != nil {
return nil, err
}
// Spawn Build() goroutine
eg, ctx := errgroup.WithContext(ctx)
events := make(chan *bk.SolveStatus)
outr, outw := io.Pipe()
// Spawn build function
eg.Go(func() error {
defer outw.Close()
return c.buildfn(ctx, env, events, outw)
})
// Spawn print function // Spawn print function
var events chan *bk.SolveStatus
if os.Getenv("DOCKER_OUTPUT") != "" { if os.Getenv("DOCKER_OUTPUT") != "" {
events = make(chan *bk.SolveStatus)
eg.Go(func() error { eg.Go(func() error {
dispCtx := context.TODO() dispCtx := context.TODO()
return c.dockerprintfn(dispCtx, events, lg) return c.dockerprintfn(dispCtx, events, lg)
}) })
} }
// Retrieve output // Spawn build function
outr, outw := io.Pipe()
eg.Go(func() error {
defer outw.Close()
return c.buildfn(gctx, env, events, outw)
})
// Spawn output retriever
var (
out *Value
err error
)
eg.Go(func() error { eg.Go(func() error {
defer outr.Close() defer outr.Close()
return c.outputfn(ctx, outr, out, env.cc) out, err = c.outputfn(gctx, outr, env.cc)
return err
}) })
return out, eg.Wait()
return out, cueErr(eg.Wait())
} }
func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus, w io.WriteCloser) error { func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus, w io.WriteCloser) error {
@ -164,9 +158,15 @@ func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus,
} }
// Read tar export stream from buildkit Build(), and extract cue output // Read tar export stream from buildkit Build(), and extract cue output
func (c *Client) outputfn(ctx context.Context, r io.Reader, out *Value, cc *Compiler) error { func (c *Client) outputfn(ctx context.Context, r io.Reader, cc *Compiler) (*Value, error) {
lg := log.Ctx(ctx) lg := log.Ctx(ctx)
// FIXME: merge this into env output.
out, err := cc.EmptyStruct()
if err != nil {
return nil, err
}
tr := tar.NewReader(r) tr := tar.NewReader(r)
for { for {
h, err := tr.Next() h, err := tr.Next()
@ -174,7 +174,7 @@ func (c *Client) outputfn(ctx context.Context, r io.Reader, out *Value, cc *Comp
break break
} }
if err != nil { if err != nil {
return errors.Wrap(err, "read tar stream") return nil, errors.Wrap(err, "read tar stream")
} }
lg := lg. lg := lg.
@ -190,13 +190,13 @@ func (c *Client) outputfn(ctx context.Context, r io.Reader, out *Value, cc *Comp
v, err := cc.Compile(h.Name, tr) v, err := cc.Compile(h.Name, tr)
if err != nil { if err != nil {
return err return nil, err
} }
if err := out.Fill(v); err != nil { if err := out.Fill(v); err != nil {
return errors.Wrap(err, h.Name) return nil, errors.Wrap(err, h.Name)
} }
} }
return nil return out, nil
} }
func (c *Client) dockerprintfn(ctx context.Context, ch chan *bk.SolveStatus, out io.Writer) error { func (c *Client) dockerprintfn(ctx context.Context, ch chan *bk.SolveStatus, out io.Writer) error {

View File

@ -10,6 +10,9 @@ import (
) )
func cueErr(err error) error { func cueErr(err error) error {
if err == nil {
return nil
}
return errors.New(cueerrors.Details(err, &cueerrors.Config{})) return errors.New(cueerrors.Details(err, &cueerrors.Config{}))
} }