Merge pull request #95 from blocklayerhq/simplify
simplify: remove compute.go
This commit is contained in:
commit
bc94452c83
262
dagger/client.go
262
dagger/client.go
@ -3,7 +3,6 @@ package dagger
|
|||||||
import (
|
import (
|
||||||
"archive/tar"
|
"archive/tar"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -15,11 +14,11 @@ import (
|
|||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
|
|
||||||
// Cue
|
// Cue
|
||||||
"cuelang.org/go/cue"
|
|
||||||
|
|
||||||
// buildkit
|
// buildkit
|
||||||
bk "github.com/moby/buildkit/client"
|
bk "github.com/moby/buildkit/client"
|
||||||
_ "github.com/moby/buildkit/client/connhelper/dockercontainer" // import the container connection driver
|
_ "github.com/moby/buildkit/client/connhelper/dockercontainer" // import the container connection driver
|
||||||
|
bkgw "github.com/moby/buildkit/frontend/gateway/client"
|
||||||
|
|
||||||
// docker output
|
// docker output
|
||||||
"github.com/containerd/console"
|
"github.com/containerd/console"
|
||||||
@ -28,15 +27,11 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
defaultBuildkitHost = "docker-container://buildkitd"
|
defaultBuildkitHost = "docker-container://buildkitd"
|
||||||
bkUpdaterKey = "updater"
|
|
||||||
bkInputKey = "input"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// A dagger client
|
// A dagger client
|
||||||
type Client struct {
|
type Client struct {
|
||||||
c *bk.Client
|
c *bk.Client
|
||||||
|
|
||||||
localdirs map[string]string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(ctx context.Context, host string) (*Client, error) {
|
func NewClient(ctx context.Context, host string) (*Client, error) {
|
||||||
@ -56,102 +51,61 @@ func NewClient(ctx context.Context, host string) (*Client, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: return completed *Env, instead of *Value
|
// FIXME: return completed *Env, instead of *Value
|
||||||
func (c *Client) Compute(ctx context.Context, env *Env) (o *Value, err error) {
|
func (c *Client) Compute(ctx context.Context, env *Env) (*Value, error) {
|
||||||
lg := log.Ctx(ctx)
|
lg := log.Ctx(ctx)
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
// Expand cue errors to get full details
|
|
||||||
err = cueErr(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
// Scan local dirs to grant access
|
|
||||||
localdirs, err := env.LocalDirs(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "scan local dirs")
|
|
||||||
}
|
|
||||||
for label, dir := range localdirs {
|
|
||||||
abs, err := filepath.Abs(dir)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
localdirs[label] = abs
|
|
||||||
}
|
|
||||||
c.localdirs = localdirs
|
|
||||||
|
|
||||||
// FIXME: merge this into env output.
|
eg, gctx := errgroup.WithContext(ctx)
|
||||||
out, err := env.Compiler().EmptyStruct()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Spawn Build() goroutine
|
// Spawn print function
|
||||||
eg, ctx := errgroup.WithContext(ctx)
|
var events chan *bk.SolveStatus
|
||||||
events := make(chan *bk.SolveStatus)
|
if os.Getenv("DOCKER_OUTPUT") != "" {
|
||||||
outr, outw := io.Pipe()
|
events = make(chan *bk.SolveStatus)
|
||||||
|
eg.Go(func() error {
|
||||||
|
dispCtx := context.TODO()
|
||||||
|
return c.dockerprintfn(dispCtx, events, lg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Spawn build function
|
// Spawn build function
|
||||||
|
outr, outw := io.Pipe()
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
defer outw.Close()
|
defer outw.Close()
|
||||||
return c.buildfn(ctx, env, events, outw)
|
return c.buildfn(gctx, env, events, outw)
|
||||||
})
|
})
|
||||||
|
|
||||||
// Spawn print function(s)
|
// Spawn output retriever
|
||||||
dispCtx := context.TODO()
|
var (
|
||||||
if os.Getenv("DOCKER_OUTPUT") != "" {
|
out *Value
|
||||||
// Multiplex events
|
err error
|
||||||
eventsPrint := make(chan *bk.SolveStatus)
|
)
|
||||||
eventsDockerPrint := make(chan *bk.SolveStatus)
|
|
||||||
eg.Go(func() error {
|
|
||||||
defer close(eventsPrint)
|
|
||||||
defer close(eventsDockerPrint)
|
|
||||||
|
|
||||||
for e := range events {
|
|
||||||
eventsPrint <- e
|
|
||||||
eventsDockerPrint <- e
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
eg.Go(func() error {
|
|
||||||
return c.printfn(dispCtx, eventsPrint)
|
|
||||||
})
|
|
||||||
|
|
||||||
eg.Go(func() error {
|
|
||||||
return c.dockerprintfn(dispCtx, eventsDockerPrint, lg)
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
eg.Go(func() error {
|
|
||||||
return c.printfn(dispCtx, events)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Retrieve output
|
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
defer outr.Close()
|
defer outr.Close()
|
||||||
return c.outputfn(ctx, outr, out, env.cc)
|
out, err = c.outputfn(gctx, outr, env.cc)
|
||||||
|
return err
|
||||||
})
|
})
|
||||||
return out, eg.Wait()
|
|
||||||
|
return out, cueErr(eg.Wait())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus, w io.WriteCloser) error {
|
func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus, w io.WriteCloser) error {
|
||||||
lg := log.Ctx(ctx)
|
lg := log.Ctx(ctx)
|
||||||
|
|
||||||
// Serialize input and updater
|
// Scan local dirs to grant access
|
||||||
input, err := env.Input().SourceString()
|
localdirs, err := env.LocalDirs(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "serialize env input")
|
return errors.Wrap(err, "scan local dirs")
|
||||||
}
|
}
|
||||||
updater, err := env.Updater().Value().SourceString()
|
for label, dir := range localdirs {
|
||||||
if err != nil {
|
abs, err := filepath.Abs(dir)
|
||||||
return errors.Wrap(err, "serialize updater script")
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
localdirs[label] = abs
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup solve options
|
// Setup solve options
|
||||||
opts := bk.SolveOpt{
|
opts := bk.SolveOpt{
|
||||||
FrontendAttrs: map[string]string{
|
LocalDirs: localdirs,
|
||||||
bkInputKey: input,
|
|
||||||
bkUpdaterKey: updater,
|
|
||||||
},
|
|
||||||
LocalDirs: c.localdirs,
|
|
||||||
// FIXME: catch output & return as cue value
|
// FIXME: catch output & return as cue value
|
||||||
Exports: []bk.ExportEntry{
|
Exports: []bk.ExportEntry{
|
||||||
{
|
{
|
||||||
@ -162,12 +116,33 @@ func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus,
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call buildkit solver
|
// Call buildkit solver
|
||||||
lg.Debug().
|
lg.Debug().
|
||||||
Interface("localdirs", opts.LocalDirs).
|
Interface("localdirs", opts.LocalDirs).
|
||||||
Interface("attrs", opts.FrontendAttrs).
|
Interface("attrs", opts.FrontendAttrs).
|
||||||
Msg("spawning buildkit job")
|
Msg("spawning buildkit job")
|
||||||
resp, err := c.c.Build(ctx, opts, "", Compute, ch)
|
|
||||||
|
resp, err := c.c.Build(ctx, opts, "", func(ctx context.Context, c bkgw.Client) (*bkgw.Result, error) {
|
||||||
|
s := NewSolver(c)
|
||||||
|
|
||||||
|
if err := env.Update(ctx, s); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
lg.Debug().Msg("computing env")
|
||||||
|
// Compute output overlay
|
||||||
|
if err := env.Compute(ctx, s); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
lg.Debug().Msg("exporting env")
|
||||||
|
// Export env to a cue directory
|
||||||
|
outdir, err := env.Export(s.Scratch())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Wrap cue directory in buildkit result
|
||||||
|
return outdir.Result(ctx)
|
||||||
|
}, ch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(bkCleanError(err), "buildkit solve")
|
return errors.Wrap(bkCleanError(err), "buildkit solve")
|
||||||
}
|
}
|
||||||
@ -183,9 +158,15 @@ func (c *Client) buildfn(ctx context.Context, env *Env, ch chan *bk.SolveStatus,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Read tar export stream from buildkit Build(), and extract cue output
|
// Read tar export stream from buildkit Build(), and extract cue output
|
||||||
func (c *Client) outputfn(ctx context.Context, r io.Reader, out *Value, cc *Compiler) error {
|
func (c *Client) outputfn(ctx context.Context, r io.Reader, cc *Compiler) (*Value, error) {
|
||||||
lg := log.Ctx(ctx)
|
lg := log.Ctx(ctx)
|
||||||
|
|
||||||
|
// FIXME: merge this into env output.
|
||||||
|
out, err := cc.EmptyStruct()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
tr := tar.NewReader(r)
|
tr := tar.NewReader(r)
|
||||||
for {
|
for {
|
||||||
h, err := tr.Next()
|
h, err := tr.Next()
|
||||||
@ -193,7 +174,7 @@ func (c *Client) outputfn(ctx context.Context, r io.Reader, out *Value, cc *Comp
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "read tar stream")
|
return nil, errors.Wrap(err, "read tar stream")
|
||||||
}
|
}
|
||||||
|
|
||||||
lg := lg.
|
lg := lg.
|
||||||
@ -209,122 +190,13 @@ func (c *Client) outputfn(ctx context.Context, r io.Reader, out *Value, cc *Comp
|
|||||||
|
|
||||||
v, err := cc.Compile(h.Name, tr)
|
v, err := cc.Compile(h.Name, tr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err := out.Fill(v); err != nil {
|
if err := out.Fill(v); err != nil {
|
||||||
return errors.Wrap(err, h.Name)
|
return nil, errors.Wrap(err, h.Name)
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Status of a node in the config tree being computed
|
|
||||||
// Node may be a component, or a value within a component
|
|
||||||
// (eg. a script or individual operation in a script)
|
|
||||||
type Node struct {
|
|
||||||
Path cue.Path
|
|
||||||
*bk.Vertex
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n Node) ComponentPath() cue.Path {
|
|
||||||
parts := []cue.Selector{}
|
|
||||||
for _, sel := range n.Path.Selectors() {
|
|
||||||
if strings.HasPrefix(sel.String(), "#") {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
parts = append(parts, sel)
|
|
||||||
}
|
|
||||||
return cue.MakePath(parts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n Node) Logf(ctx context.Context, msg string, args ...interface{}) {
|
|
||||||
componentPath := n.ComponentPath().String()
|
|
||||||
args = append([]interface{}{componentPath}, args...)
|
|
||||||
if msg != "" && !strings.HasSuffix(msg, "\n") {
|
|
||||||
msg += "\n"
|
|
||||||
}
|
|
||||||
fmt.Fprintf(os.Stderr, "[%s] "+msg, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n Node) LogStream(ctx context.Context, nStream int, data []byte) {
|
|
||||||
lg := log.
|
|
||||||
Ctx(ctx).
|
|
||||||
With().
|
|
||||||
Str("path", n.ComponentPath().String()).
|
|
||||||
Logger()
|
|
||||||
|
|
||||||
switch nStream {
|
|
||||||
case 1:
|
|
||||||
lg = lg.With().Str("stream", "stdout").Logger()
|
|
||||||
case 2:
|
|
||||||
lg = lg.With().Str("stream", "stderr").Logger()
|
|
||||||
default:
|
|
||||||
lg = lg.With().Str("stream", fmt.Sprintf("%d", nStream)).Logger()
|
|
||||||
}
|
|
||||||
|
|
||||||
lg.Debug().Msg(string(data))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n Node) LogError(ctx context.Context, errmsg string) {
|
|
||||||
log.
|
|
||||||
Ctx(ctx).
|
|
||||||
Error().
|
|
||||||
Str("path", n.ComponentPath().String()).
|
|
||||||
Msg(errmsg)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) printfn(ctx context.Context, ch chan *bk.SolveStatus) error {
|
|
||||||
lg := log.Ctx(ctx)
|
|
||||||
|
|
||||||
// Node status mapped to buildkit vertex digest
|
|
||||||
nodesByDigest := map[string]*Node{}
|
|
||||||
// Node status mapped to cue path
|
|
||||||
nodesByPath := map[string]*Node{}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case status, ok := <-ch:
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
lg.
|
|
||||||
Debug().
|
|
||||||
Int("vertexes", len(status.Vertexes)).
|
|
||||||
Int("statuses", len(status.Statuses)).
|
|
||||||
Int("logs", len(status.Logs)).
|
|
||||||
Msg("status event")
|
|
||||||
|
|
||||||
for _, v := range status.Vertexes {
|
|
||||||
// FIXME: insert raw buildkit telemetry here (ie for debugging, etc.)
|
|
||||||
|
|
||||||
// IF a buildkit vertex has a valid cue path as name, extract additional info:
|
|
||||||
p := cue.ParsePath(v.Name)
|
|
||||||
if err := p.Err(); err != nil {
|
|
||||||
// Not a valid cue path: skip.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
n := &Node{
|
|
||||||
Path: p,
|
|
||||||
Vertex: v,
|
|
||||||
}
|
|
||||||
nodesByPath[n.Path.String()] = n
|
|
||||||
nodesByDigest[n.Digest.String()] = n
|
|
||||||
if n.Error != "" {
|
|
||||||
n.LogError(ctx, n.Error)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, log := range status.Logs {
|
|
||||||
if n, ok := nodesByDigest[log.Vertex.String()]; ok {
|
|
||||||
n.LogStream(ctx, log.Stream, log.Data)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// debugJSON(status)
|
|
||||||
// FIXME: callbacks for extracting stream/result
|
|
||||||
// see proto 67
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) dockerprintfn(ctx context.Context, ch chan *bk.SolveStatus, out io.Writer) error {
|
func (c *Client) dockerprintfn(ctx context.Context, ch chan *bk.SolveStatus, out io.Writer) error {
|
||||||
|
@ -48,6 +48,9 @@ func (cc *Compiler) EmptyStruct() (*Value, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cc *Compiler) Compile(name string, src interface{}) (*Value, error) {
|
func (cc *Compiler) Compile(name string, src interface{}) (*Value, error) {
|
||||||
|
cc.Lock()
|
||||||
|
defer cc.Unlock()
|
||||||
|
|
||||||
inst, err := cc.Cue().Compile(name, src)
|
inst, err := cc.Cue().Compile(name, src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// FIXME: cleaner way to unwrap cue error details?
|
// FIXME: cleaner way to unwrap cue error details?
|
||||||
|
@ -1,58 +0,0 @@
|
|||||||
package dagger
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
cueerrors "cuelang.org/go/cue/errors"
|
|
||||||
bkgw "github.com/moby/buildkit/frontend/gateway/client"
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Buildkit compute entrypoint (BK calls if "solve" or "build")
|
|
||||||
// Use by wrapping in a buildkit client Build call, or buildkit frontend.
|
|
||||||
func Compute(ctx context.Context, c bkgw.Client) (r *bkgw.Result, err error) {
|
|
||||||
lg := log.Ctx(ctx)
|
|
||||||
// FIXME: wrap errors to avoid crashing buildkit Build()
|
|
||||||
// with cue error types (why??)
|
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("%s", cueerrors.Details(err, nil))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
s := NewSolver(c)
|
|
||||||
// Retrieve updater script form client
|
|
||||||
var updater interface{}
|
|
||||||
if o, exists := c.BuildOpts().Opts[bkUpdaterKey]; exists {
|
|
||||||
updater = o
|
|
||||||
}
|
|
||||||
env, err := NewEnv()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := env.SetUpdater(updater); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := env.Update(ctx, s); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if input, exists := c.BuildOpts().Opts["input"]; exists {
|
|
||||||
if err := env.SetInput(input); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
lg.Debug().Msg("computing env")
|
|
||||||
// Compute output overlay
|
|
||||||
if err := env.Compute(ctx, s); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
lg.Debug().Msg("exporting env")
|
|
||||||
// Export env to a cue directory
|
|
||||||
outdir, err := env.Export(s.Scratch())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// Wrap cue directory in buildkit result
|
|
||||||
return outdir.Result(ctx)
|
|
||||||
}
|
|
@ -10,6 +10,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func cueErr(err error) error {
|
func cueErr(err error) error {
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return errors.New(cueerrors.Details(err, &cueerrors.Config{}))
|
return errors.New(cueerrors.Details(err, &cueerrors.Config{}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -255,6 +255,9 @@ func (v *Value) Validate(defs ...string) error {
|
|||||||
|
|
||||||
// Return cue source for this value
|
// Return cue source for this value
|
||||||
func (v *Value) Source() ([]byte, error) {
|
func (v *Value) Source() ([]byte, error) {
|
||||||
|
v.cc.RLock()
|
||||||
|
defer v.cc.RUnlock()
|
||||||
|
|
||||||
return cueformat.Node(v.val.Eval().Syntax())
|
return cueformat.Node(v.val.Eval().Syntax())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user