From 10224682f766bfee75bc20260815921e88ca5838 Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Mon, 11 Jan 2021 11:51:15 -0800 Subject: [PATCH] client: keep resource initialization and cleanup together Signed-off-by: Andrea Luzzardi --- dagger/client.go | 308 ++++++++++++++++++++++++----------------------- 1 file changed, 155 insertions(+), 153 deletions(-) diff --git a/dagger/client.go b/dagger/client.go index 0dc4f1b0..ee2c782b 100644 --- a/dagger/client.go +++ b/dagger/client.go @@ -7,7 +7,6 @@ import ( "io" "os" "strings" - "time" "github.com/pkg/errors" "golang.org/x/sync/errgroup" @@ -112,112 +111,128 @@ func (c *Client) Compute(ctx context.Context) (*Value, error) { eg, ctx := errgroup.WithContext(ctx) events := make(chan *bk.SolveStatus) outr, outw := io.Pipe() + // Spawn build function - eg.Go(c.buildfn(ctx, events, outw)) + eg.Go(func() error { + defer outw.Close() + return c.buildfn(ctx, events, outw) + }) + // Spawn print function(s) dispCtx := context.TODO() - var eventsdup chan *bk.SolveStatus if os.Getenv("DOCKER_OUTPUT") != "" { - eventsdup = make(chan *bk.SolveStatus) - eg.Go(c.dockerprintfn(dispCtx, eventsdup, os.Stderr)) + // Multiplex events + eventsPrint := make(chan *bk.SolveStatus) + eventsDockerPrint := make(chan *bk.SolveStatus) + eg.Go(func() error { + defer close(eventsPrint) + defer close(eventsDockerPrint) + + for e := range events { + eventsPrint <- e + eventsDockerPrint <- e + } + return nil + }) + + eg.Go(func() error { + return c.printfn(dispCtx, eventsPrint) + }) + + eg.Go(func() error { + return c.dockerprintfn(dispCtx, eventsDockerPrint, os.Stderr) + }) + } else { + eg.Go(func() error { + return c.printfn(dispCtx, events) + }) } - eg.Go(c.printfn(dispCtx, events, eventsdup)) + // Retrieve output - eg.Go(c.outputfn(ctx, outr, out)) + eg.Go(func() error { + defer outr.Close() + return c.outputfn(ctx, outr, out) + }) return out, eg.Wait() } -func (c *Client) buildfn(ctx context.Context, ch chan *bk.SolveStatus, w io.WriteCloser) func() error { - return func() (err error) { - defer func() { - debugf("buildfn complete, err=%q", err) - if err != nil { - // Close exporter pipe so that export processor can return - w.Close() - } - }() - boot, err := c.BootScript() - if err != nil { - close(ch) - return errors.Wrap(err, "assemble boot script") - } - bootSource, err := boot.Value().Source() - if err != nil { - close(ch) - return errors.Wrap(err, "serialize boot script") - } - debugf("client: assembled boot script: %s\n", bootSource) - // Setup solve options - opts := bk.SolveOpt{ - FrontendAttrs: map[string]string{ - bkInputKey: c.input, - bkBootKey: string(bootSource), - }, - LocalDirs: map[string]string{}, - // FIXME: catch output & return as cue value - Exports: []bk.ExportEntry{ - { - Type: bk.ExporterTar, - Output: func(m map[string]string) (io.WriteCloser, error) { - return w, nil - }, +func (c *Client) buildfn(ctx context.Context, ch chan *bk.SolveStatus, w io.WriteCloser) error { + boot, err := c.BootScript() + if err != nil { + return errors.Wrap(err, "assemble boot script") + } + bootSource, err := boot.Value().Source() + if err != nil { + return errors.Wrap(err, "serialize boot script") + } + debugf("client: assembled boot script: %s\n", bootSource) + // Setup solve options + opts := bk.SolveOpt{ + FrontendAttrs: map[string]string{ + bkInputKey: c.input, + bkBootKey: string(bootSource), + }, + LocalDirs: map[string]string{}, + // FIXME: catch output & return as cue value + Exports: []bk.ExportEntry{ + { + Type: bk.ExporterTar, + Output: func(m map[string]string) (io.WriteCloser, error) { + return w, nil }, }, - } - // Connect local dirs - localdirs, err := c.LocalDirs() - if err != nil { - close(ch) - return errors.Wrap(err, "connect local dirs") - } - for _, dir := range localdirs { - opts.LocalDirs[dir] = dir - } - // Call buildkit solver - resp, err := c.c.Build(ctx, opts, "", Compute, ch) - if err != nil { - err = errors.New(bkCleanError(err.Error())) - return errors.Wrap(err, "buildkit solve") - } - for k, v := range resp.ExporterResponse { - // FIXME consume exporter response - fmt.Printf("exporter response: %s=%s\n", k, v) - } - return nil + }, } + // Connect local dirs + localdirs, err := c.LocalDirs() + if err != nil { + return errors.Wrap(err, "connect local dirs") + } + for _, dir := range localdirs { + opts.LocalDirs[dir] = dir + } + // Call buildkit solver + resp, err := c.c.Build(ctx, opts, "", Compute, ch) + if err != nil { + err = errors.New(bkCleanError(err.Error())) + return errors.Wrap(err, "buildkit solve") + } + for k, v := range resp.ExporterResponse { + // FIXME consume exporter response + fmt.Printf("exporter response: %s=%s\n", k, v) + } + return nil } // Read tar export stream from buildkit Build(), and extract cue output -func (c *Client) outputfn(_ context.Context, r io.Reader, out *Value) func() error { - return func() error { - defer debugf("outputfn complete") - tr := tar.NewReader(r) - for { - h, err := tr.Next() - if err == io.EOF { - break - } - if err != nil { - return errors.Wrap(err, "read tar stream") - } - if !strings.HasSuffix(h.Name, ".cue") { - debugf("skipping non-cue file from exporter tar stream: %s", h.Name) - continue - } - debugf("outputfn: compiling & merging %q", h.Name) - - cc := out.Compiler() - v, err := cc.Compile(h.Name, tr) - if err != nil { - return err - } - if err := out.Fill(v); err != nil { - return errors.Wrap(err, h.Name) - } - debugf("outputfn: DONE: compiling & merging %q", h.Name) +func (c *Client) outputfn(_ context.Context, r io.Reader, out *Value) error { + defer debugf("outputfn complete") + tr := tar.NewReader(r) + for { + h, err := tr.Next() + if err == io.EOF { + break } - return nil + if err != nil { + return errors.Wrap(err, "read tar stream") + } + if !strings.HasSuffix(h.Name, ".cue") { + debugf("skipping non-cue file from exporter tar stream: %s", h.Name) + continue + } + debugf("outputfn: compiling & merging %q", h.Name) + + cc := out.Compiler() + v, err := cc.Compile(h.Name, tr) + if err != nil { + return err + } + if err := out.Fill(v); err != nil { + return errors.Wrap(err, h.Name) + } + debugf("outputfn: DONE: compiling & merging %q", h.Name) } + return nil } // Status of a node in the config tree being computed @@ -269,64 +284,53 @@ func (n Node) LogError(errmsg string) { n.Logf("ERROR: %s", bkCleanError(errmsg)) } -func (c *Client) printfn(ctx context.Context, ch, ch2 chan *bk.SolveStatus) func() error { - return func() error { - // Node status mapped to buildkit vertex digest - nodesByDigest := map[string]*Node{} - // Node status mapped to cue path - nodesByPath := map[string]*Node{} +func (c *Client) printfn(ctx context.Context, ch chan *bk.SolveStatus) error { + // Node status mapped to buildkit vertex digest + nodesByDigest := map[string]*Node{} + // Node status mapped to cue path + nodesByPath := map[string]*Node{} - defer debugf("printfn complete") - if ch2 != nil { - defer close(ch2) - } - ticker := time.NewTicker(150 * time.Millisecond) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-ticker.C: - case status, ok := <-ch: - if !ok { - return nil - } - if ch2 != nil { - ch2 <- status - } - debugf("status event: vertexes:%d statuses:%d logs:%d\n", - len(status.Vertexes), - len(status.Statuses), - len(status.Logs), - ) - for _, v := range status.Vertexes { - // FIXME: insert raw buildkit telemetry here (ie for debugging, etc.) - - // IF a buildkit vertex has a valid cue path as name, extract additional info: - p := cue.ParsePath(v.Name) - if err := p.Err(); err != nil { - // Not a valid cue path: skip. - continue - } - n := &Node{ - Path: p, - Vertex: v, - } - nodesByPath[n.Path.String()] = n - nodesByDigest[n.Digest.String()] = n - if n.Error != "" { - n.LogError(n.Error) - } - } - for _, log := range status.Logs { - if n, ok := nodesByDigest[log.Vertex.String()]; ok { - n.LogStream(log.Stream, log.Data) - } - } - // debugJSON(status) - // FIXME: callbacks for extracting stream/result - // see proto 67 + defer debugf("printfn complete") + for { + select { + case <-ctx.Done(): + return ctx.Err() + case status, ok := <-ch: + if !ok { + return nil } + debugf("status event: vertexes:%d statuses:%d logs:%d\n", + len(status.Vertexes), + len(status.Statuses), + len(status.Logs), + ) + for _, v := range status.Vertexes { + // FIXME: insert raw buildkit telemetry here (ie for debugging, etc.) + + // IF a buildkit vertex has a valid cue path as name, extract additional info: + p := cue.ParsePath(v.Name) + if err := p.Err(); err != nil { + // Not a valid cue path: skip. + continue + } + n := &Node{ + Path: p, + Vertex: v, + } + nodesByPath[n.Path.String()] = n + nodesByDigest[n.Digest.String()] = n + if n.Error != "" { + n.LogError(n.Error) + } + } + for _, log := range status.Logs { + if n, ok := nodesByDigest[log.Vertex.String()]; ok { + n.LogStream(log.Stream, log.Data) + } + } + // debugJSON(status) + // FIXME: callbacks for extracting stream/result + // see proto 67 } } } @@ -346,11 +350,9 @@ func bkCleanError(msg string) string { return msg } -func (c *Client) dockerprintfn(ctx context.Context, ch chan *bk.SolveStatus, out io.Writer) func() error { - return func() error { - defer debugf("dockerprintfn complete") - var cons console.Console - // FIXME: use smarter writer from blr - return progressui.DisplaySolveStatus(ctx, "", cons, out, ch) - } +func (c *Client) dockerprintfn(ctx context.Context, ch chan *bk.SolveStatus, out io.Writer) error { + defer debugf("dockerprintfn complete") + var cons console.Console + // FIXME: use smarter writer from blr + return progressui.DisplaySolveStatus(ctx, "", cons, out, ch) }