client: keep resource initialization and cleanup together
Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
parent
642e8c5a2f
commit
10224682f7
308
dagger/client.go
308
dagger/client.go
@ -7,7 +7,6 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
@ -112,112 +111,128 @@ func (c *Client) Compute(ctx context.Context) (*Value, error) {
|
|||||||
eg, ctx := errgroup.WithContext(ctx)
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
events := make(chan *bk.SolveStatus)
|
events := make(chan *bk.SolveStatus)
|
||||||
outr, outw := io.Pipe()
|
outr, outw := io.Pipe()
|
||||||
|
|
||||||
// Spawn build function
|
// Spawn build function
|
||||||
eg.Go(c.buildfn(ctx, events, outw))
|
eg.Go(func() error {
|
||||||
|
defer outw.Close()
|
||||||
|
return c.buildfn(ctx, events, outw)
|
||||||
|
})
|
||||||
|
|
||||||
// Spawn print function(s)
|
// Spawn print function(s)
|
||||||
dispCtx := context.TODO()
|
dispCtx := context.TODO()
|
||||||
var eventsdup chan *bk.SolveStatus
|
|
||||||
if os.Getenv("DOCKER_OUTPUT") != "" {
|
if os.Getenv("DOCKER_OUTPUT") != "" {
|
||||||
eventsdup = make(chan *bk.SolveStatus)
|
// Multiplex events
|
||||||
eg.Go(c.dockerprintfn(dispCtx, eventsdup, os.Stderr))
|
eventsPrint := make(chan *bk.SolveStatus)
|
||||||
|
eventsDockerPrint := make(chan *bk.SolveStatus)
|
||||||
|
eg.Go(func() error {
|
||||||
|
defer close(eventsPrint)
|
||||||
|
defer close(eventsDockerPrint)
|
||||||
|
|
||||||
|
for e := range events {
|
||||||
|
eventsPrint <- e
|
||||||
|
eventsDockerPrint <- e
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
return c.printfn(dispCtx, eventsPrint)
|
||||||
|
})
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
return c.dockerprintfn(dispCtx, eventsDockerPrint, os.Stderr)
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
eg.Go(func() error {
|
||||||
|
return c.printfn(dispCtx, events)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
eg.Go(c.printfn(dispCtx, events, eventsdup))
|
|
||||||
// Retrieve output
|
// Retrieve output
|
||||||
eg.Go(c.outputfn(ctx, outr, out))
|
eg.Go(func() error {
|
||||||
|
defer outr.Close()
|
||||||
|
return c.outputfn(ctx, outr, out)
|
||||||
|
})
|
||||||
return out, eg.Wait()
|
return out, eg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) buildfn(ctx context.Context, ch chan *bk.SolveStatus, w io.WriteCloser) func() error {
|
func (c *Client) buildfn(ctx context.Context, ch chan *bk.SolveStatus, w io.WriteCloser) error {
|
||||||
return func() (err error) {
|
boot, err := c.BootScript()
|
||||||
defer func() {
|
if err != nil {
|
||||||
debugf("buildfn complete, err=%q", err)
|
return errors.Wrap(err, "assemble boot script")
|
||||||
if err != nil {
|
}
|
||||||
// Close exporter pipe so that export processor can return
|
bootSource, err := boot.Value().Source()
|
||||||
w.Close()
|
if err != nil {
|
||||||
}
|
return errors.Wrap(err, "serialize boot script")
|
||||||
}()
|
}
|
||||||
boot, err := c.BootScript()
|
debugf("client: assembled boot script: %s\n", bootSource)
|
||||||
if err != nil {
|
// Setup solve options
|
||||||
close(ch)
|
opts := bk.SolveOpt{
|
||||||
return errors.Wrap(err, "assemble boot script")
|
FrontendAttrs: map[string]string{
|
||||||
}
|
bkInputKey: c.input,
|
||||||
bootSource, err := boot.Value().Source()
|
bkBootKey: string(bootSource),
|
||||||
if err != nil {
|
},
|
||||||
close(ch)
|
LocalDirs: map[string]string{},
|
||||||
return errors.Wrap(err, "serialize boot script")
|
// FIXME: catch output & return as cue value
|
||||||
}
|
Exports: []bk.ExportEntry{
|
||||||
debugf("client: assembled boot script: %s\n", bootSource)
|
{
|
||||||
// Setup solve options
|
Type: bk.ExporterTar,
|
||||||
opts := bk.SolveOpt{
|
Output: func(m map[string]string) (io.WriteCloser, error) {
|
||||||
FrontendAttrs: map[string]string{
|
return w, nil
|
||||||
bkInputKey: c.input,
|
|
||||||
bkBootKey: string(bootSource),
|
|
||||||
},
|
|
||||||
LocalDirs: map[string]string{},
|
|
||||||
// FIXME: catch output & return as cue value
|
|
||||||
Exports: []bk.ExportEntry{
|
|
||||||
{
|
|
||||||
Type: bk.ExporterTar,
|
|
||||||
Output: func(m map[string]string) (io.WriteCloser, error) {
|
|
||||||
return w, nil
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
},
|
||||||
// Connect local dirs
|
|
||||||
localdirs, err := c.LocalDirs()
|
|
||||||
if err != nil {
|
|
||||||
close(ch)
|
|
||||||
return errors.Wrap(err, "connect local dirs")
|
|
||||||
}
|
|
||||||
for _, dir := range localdirs {
|
|
||||||
opts.LocalDirs[dir] = dir
|
|
||||||
}
|
|
||||||
// Call buildkit solver
|
|
||||||
resp, err := c.c.Build(ctx, opts, "", Compute, ch)
|
|
||||||
if err != nil {
|
|
||||||
err = errors.New(bkCleanError(err.Error()))
|
|
||||||
return errors.Wrap(err, "buildkit solve")
|
|
||||||
}
|
|
||||||
for k, v := range resp.ExporterResponse {
|
|
||||||
// FIXME consume exporter response
|
|
||||||
fmt.Printf("exporter response: %s=%s\n", k, v)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
// Connect local dirs
|
||||||
|
localdirs, err := c.LocalDirs()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "connect local dirs")
|
||||||
|
}
|
||||||
|
for _, dir := range localdirs {
|
||||||
|
opts.LocalDirs[dir] = dir
|
||||||
|
}
|
||||||
|
// Call buildkit solver
|
||||||
|
resp, err := c.c.Build(ctx, opts, "", Compute, ch)
|
||||||
|
if err != nil {
|
||||||
|
err = errors.New(bkCleanError(err.Error()))
|
||||||
|
return errors.Wrap(err, "buildkit solve")
|
||||||
|
}
|
||||||
|
for k, v := range resp.ExporterResponse {
|
||||||
|
// FIXME consume exporter response
|
||||||
|
fmt.Printf("exporter response: %s=%s\n", k, v)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read tar export stream from buildkit Build(), and extract cue output
|
// Read tar export stream from buildkit Build(), and extract cue output
|
||||||
func (c *Client) outputfn(_ context.Context, r io.Reader, out *Value) func() error {
|
func (c *Client) outputfn(_ context.Context, r io.Reader, out *Value) error {
|
||||||
return func() error {
|
defer debugf("outputfn complete")
|
||||||
defer debugf("outputfn complete")
|
tr := tar.NewReader(r)
|
||||||
tr := tar.NewReader(r)
|
for {
|
||||||
for {
|
h, err := tr.Next()
|
||||||
h, err := tr.Next()
|
if err == io.EOF {
|
||||||
if err == io.EOF {
|
break
|
||||||
break
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrap(err, "read tar stream")
|
|
||||||
}
|
|
||||||
if !strings.HasSuffix(h.Name, ".cue") {
|
|
||||||
debugf("skipping non-cue file from exporter tar stream: %s", h.Name)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
debugf("outputfn: compiling & merging %q", h.Name)
|
|
||||||
|
|
||||||
cc := out.Compiler()
|
|
||||||
v, err := cc.Compile(h.Name, tr)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := out.Fill(v); err != nil {
|
|
||||||
return errors.Wrap(err, h.Name)
|
|
||||||
}
|
|
||||||
debugf("outputfn: DONE: compiling & merging %q", h.Name)
|
|
||||||
}
|
}
|
||||||
return nil
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "read tar stream")
|
||||||
|
}
|
||||||
|
if !strings.HasSuffix(h.Name, ".cue") {
|
||||||
|
debugf("skipping non-cue file from exporter tar stream: %s", h.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
debugf("outputfn: compiling & merging %q", h.Name)
|
||||||
|
|
||||||
|
cc := out.Compiler()
|
||||||
|
v, err := cc.Compile(h.Name, tr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := out.Fill(v); err != nil {
|
||||||
|
return errors.Wrap(err, h.Name)
|
||||||
|
}
|
||||||
|
debugf("outputfn: DONE: compiling & merging %q", h.Name)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Status of a node in the config tree being computed
|
// Status of a node in the config tree being computed
|
||||||
@ -269,64 +284,53 @@ func (n Node) LogError(errmsg string) {
|
|||||||
n.Logf("ERROR: %s", bkCleanError(errmsg))
|
n.Logf("ERROR: %s", bkCleanError(errmsg))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) printfn(ctx context.Context, ch, ch2 chan *bk.SolveStatus) func() error {
|
func (c *Client) printfn(ctx context.Context, ch chan *bk.SolveStatus) error {
|
||||||
return func() error {
|
// Node status mapped to buildkit vertex digest
|
||||||
// Node status mapped to buildkit vertex digest
|
nodesByDigest := map[string]*Node{}
|
||||||
nodesByDigest := map[string]*Node{}
|
// Node status mapped to cue path
|
||||||
// Node status mapped to cue path
|
nodesByPath := map[string]*Node{}
|
||||||
nodesByPath := map[string]*Node{}
|
|
||||||
|
|
||||||
defer debugf("printfn complete")
|
defer debugf("printfn complete")
|
||||||
if ch2 != nil {
|
for {
|
||||||
defer close(ch2)
|
select {
|
||||||
}
|
case <-ctx.Done():
|
||||||
ticker := time.NewTicker(150 * time.Millisecond)
|
return ctx.Err()
|
||||||
defer ticker.Stop()
|
case status, ok := <-ch:
|
||||||
for {
|
if !ok {
|
||||||
select {
|
return nil
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case <-ticker.C:
|
|
||||||
case status, ok := <-ch:
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if ch2 != nil {
|
|
||||||
ch2 <- status
|
|
||||||
}
|
|
||||||
debugf("status event: vertexes:%d statuses:%d logs:%d\n",
|
|
||||||
len(status.Vertexes),
|
|
||||||
len(status.Statuses),
|
|
||||||
len(status.Logs),
|
|
||||||
)
|
|
||||||
for _, v := range status.Vertexes {
|
|
||||||
// FIXME: insert raw buildkit telemetry here (ie for debugging, etc.)
|
|
||||||
|
|
||||||
// IF a buildkit vertex has a valid cue path as name, extract additional info:
|
|
||||||
p := cue.ParsePath(v.Name)
|
|
||||||
if err := p.Err(); err != nil {
|
|
||||||
// Not a valid cue path: skip.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
n := &Node{
|
|
||||||
Path: p,
|
|
||||||
Vertex: v,
|
|
||||||
}
|
|
||||||
nodesByPath[n.Path.String()] = n
|
|
||||||
nodesByDigest[n.Digest.String()] = n
|
|
||||||
if n.Error != "" {
|
|
||||||
n.LogError(n.Error)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, log := range status.Logs {
|
|
||||||
if n, ok := nodesByDigest[log.Vertex.String()]; ok {
|
|
||||||
n.LogStream(log.Stream, log.Data)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// debugJSON(status)
|
|
||||||
// FIXME: callbacks for extracting stream/result
|
|
||||||
// see proto 67
|
|
||||||
}
|
}
|
||||||
|
debugf("status event: vertexes:%d statuses:%d logs:%d\n",
|
||||||
|
len(status.Vertexes),
|
||||||
|
len(status.Statuses),
|
||||||
|
len(status.Logs),
|
||||||
|
)
|
||||||
|
for _, v := range status.Vertexes {
|
||||||
|
// FIXME: insert raw buildkit telemetry here (ie for debugging, etc.)
|
||||||
|
|
||||||
|
// IF a buildkit vertex has a valid cue path as name, extract additional info:
|
||||||
|
p := cue.ParsePath(v.Name)
|
||||||
|
if err := p.Err(); err != nil {
|
||||||
|
// Not a valid cue path: skip.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
n := &Node{
|
||||||
|
Path: p,
|
||||||
|
Vertex: v,
|
||||||
|
}
|
||||||
|
nodesByPath[n.Path.String()] = n
|
||||||
|
nodesByDigest[n.Digest.String()] = n
|
||||||
|
if n.Error != "" {
|
||||||
|
n.LogError(n.Error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, log := range status.Logs {
|
||||||
|
if n, ok := nodesByDigest[log.Vertex.String()]; ok {
|
||||||
|
n.LogStream(log.Stream, log.Data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// debugJSON(status)
|
||||||
|
// FIXME: callbacks for extracting stream/result
|
||||||
|
// see proto 67
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -346,11 +350,9 @@ func bkCleanError(msg string) string {
|
|||||||
return msg
|
return msg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) dockerprintfn(ctx context.Context, ch chan *bk.SolveStatus, out io.Writer) func() error {
|
func (c *Client) dockerprintfn(ctx context.Context, ch chan *bk.SolveStatus, out io.Writer) error {
|
||||||
return func() error {
|
defer debugf("dockerprintfn complete")
|
||||||
defer debugf("dockerprintfn complete")
|
var cons console.Console
|
||||||
var cons console.Console
|
// FIXME: use smarter writer from blr
|
||||||
// FIXME: use smarter writer from blr
|
return progressui.DisplaySolveStatus(ctx, "", cons, out, ch)
|
||||||
return progressui.DisplaySolveStatus(ctx, "", cons, out, ch)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user