9adeecd005
Previously, there was a data race condition error when `op .#PushContainer` was called. It was because of the `Export` func in `solver.go` that create a sub build pipeline and forward his events to the main events channel (that catch all events that we log). To fix it, channel are now split between build pipeline and centralize into the old output channel (`ch`). Signed-off-by: Tom Chauveau <tom.chauveau@epitech.eu>
305 lines
6.6 KiB
Go
305 lines
6.6 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
|
|
"go.opentelemetry.io/otel"
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
|
|
// Cue
|
|
|
|
// buildkit
|
|
bk "github.com/moby/buildkit/client"
|
|
_ "github.com/moby/buildkit/client/connhelper/dockercontainer" // import the container connection driver
|
|
"github.com/moby/buildkit/client/llb"
|
|
bkgw "github.com/moby/buildkit/frontend/gateway/client"
|
|
"github.com/moby/buildkit/session"
|
|
|
|
// docker output
|
|
"go.dagger.io/dagger/util/buildkitd"
|
|
"go.dagger.io/dagger/util/progressui"
|
|
|
|
"go.dagger.io/dagger/compiler"
|
|
"go.dagger.io/dagger/environment"
|
|
"go.dagger.io/dagger/solver"
|
|
"go.dagger.io/dagger/state"
|
|
)
|
|
|
|
// A dagger client
|
|
type Client struct {
|
|
c *bk.Client
|
|
cfg Config
|
|
}
|
|
|
|
type Config struct {
|
|
NoCache bool
|
|
|
|
CacheExports []bk.CacheOptionsEntry
|
|
CacheImports []bk.CacheOptionsEntry
|
|
}
|
|
|
|
func New(ctx context.Context, host string, cfg Config) (*Client, error) {
|
|
if host == "" {
|
|
host = os.Getenv("BUILDKIT_HOST")
|
|
}
|
|
if host == "" {
|
|
h, err := buildkitd.Start(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
host = h
|
|
}
|
|
opts := []bk.ClientOpt{}
|
|
|
|
// FIXME: uncomment when next version of buildkit will be released
|
|
// if span := trace.SpanFromContext(ctx); span != nil {
|
|
// opts = append(opts, bk.WithTracerProvider(span.TracerProvider()))
|
|
// }
|
|
|
|
c, err := bk.New(ctx, host, opts...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("buildkit client: %w", err)
|
|
}
|
|
return &Client{
|
|
c: c,
|
|
cfg: cfg,
|
|
}, nil
|
|
}
|
|
|
|
type DoFunc func(context.Context, *environment.Environment, solver.Solver) error
|
|
|
|
// FIXME: return completed *Route, instead of *compiler.Value
|
|
func (c *Client) Do(ctx context.Context, state *state.State, fn DoFunc) error {
|
|
lg := log.Ctx(ctx)
|
|
eg, gctx := errgroup.WithContext(ctx)
|
|
|
|
environment, err := environment.New(state)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Spawn print function
|
|
events := make(chan *bk.SolveStatus)
|
|
eg.Go(func() error {
|
|
// Create a background context so that logging will not be cancelled
|
|
// with the main context.
|
|
dispCtx := lg.WithContext(context.Background())
|
|
return c.logSolveStatus(dispCtx, state, events)
|
|
})
|
|
|
|
// Spawn build function
|
|
eg.Go(func() error {
|
|
return c.buildfn(gctx, state, environment, fn, events)
|
|
})
|
|
|
|
return eg.Wait()
|
|
}
|
|
|
|
func (c *Client) buildfn(ctx context.Context, st *state.State, env *environment.Environment, fn DoFunc, ch chan *bk.SolveStatus) error {
|
|
// Close output channel
|
|
defer close(ch)
|
|
|
|
lg := log.Ctx(ctx)
|
|
|
|
// Scan local dirs to grant access
|
|
localdirs := env.LocalDirs()
|
|
for label, dir := range localdirs {
|
|
abs, err := filepath.Abs(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
localdirs[label] = abs
|
|
}
|
|
|
|
// buildkit auth provider (registry)
|
|
auth := solver.NewRegistryAuthProvider()
|
|
|
|
// secrets
|
|
secrets := solver.NewSecretsProvider(st)
|
|
|
|
// Setup solve options
|
|
opts := bk.SolveOpt{
|
|
LocalDirs: localdirs,
|
|
Session: []session.Attachable{
|
|
auth,
|
|
secrets,
|
|
solver.NewDockerSocketProvider(),
|
|
},
|
|
CacheExports: c.cfg.CacheExports,
|
|
CacheImports: c.cfg.CacheImports,
|
|
}
|
|
|
|
// Call buildkit solver
|
|
lg.Debug().
|
|
Interface("localdirs", opts.LocalDirs).
|
|
Interface("attrs", opts.FrontendAttrs).
|
|
Msg("spawning buildkit job")
|
|
|
|
wg := sync.WaitGroup{}
|
|
// Catch output from events
|
|
catchOutput := func(inCh chan *bk.SolveStatus) {
|
|
for e := range inCh {
|
|
ch <- e
|
|
}
|
|
wg.Done()
|
|
}
|
|
|
|
// Catch solver's events
|
|
// Closed manually
|
|
eventsCh := make(chan *bk.SolveStatus)
|
|
wg.Add(1)
|
|
go catchOutput(eventsCh)
|
|
|
|
// Catch build events
|
|
// Closed by buildkit
|
|
buildCh := make(chan *bk.SolveStatus)
|
|
wg.Add(1)
|
|
go catchOutput(buildCh)
|
|
|
|
resp, err := c.c.Build(ctx, opts, "", func(ctx context.Context, gw bkgw.Client) (*bkgw.Result, error) {
|
|
// Close events channel
|
|
defer close(eventsCh)
|
|
|
|
s := solver.New(solver.Opts{
|
|
Control: c.c,
|
|
Gateway: gw,
|
|
Events: eventsCh,
|
|
Auth: auth,
|
|
Secrets: secrets,
|
|
NoCache: c.cfg.NoCache,
|
|
})
|
|
|
|
lg.Debug().Msg("loading configuration")
|
|
if err := env.LoadPlan(ctx, s); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Compute output overlay
|
|
if fn != nil {
|
|
if err := fn(ctx, env, s); err != nil {
|
|
return nil, compiler.Err(err)
|
|
}
|
|
}
|
|
|
|
// Export environment to a cue directory
|
|
// FIXME: this should be elsewhere
|
|
lg.Debug().Msg("exporting environment")
|
|
|
|
tr := otel.Tracer("client")
|
|
_, span := tr.Start(ctx, "environment.Export")
|
|
defer span.End()
|
|
|
|
computed := env.Computed().JSON().PrettyString()
|
|
st := llb.
|
|
Scratch().
|
|
File(
|
|
llb.Mkfile("computed.json", 0600, []byte(computed)),
|
|
llb.WithCustomName("[internal] serializing computed values"),
|
|
)
|
|
|
|
ref, err := s.Solve(ctx, st)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
res := bkgw.NewResult()
|
|
res.SetRef(ref)
|
|
return res, nil
|
|
}, buildCh)
|
|
if err != nil {
|
|
return solver.CleanError(err)
|
|
}
|
|
for k, v := range resp.ExporterResponse {
|
|
// FIXME consume exporter response
|
|
lg.
|
|
Debug().
|
|
Str("key", k).
|
|
Str("value", v).
|
|
Msg("exporter response")
|
|
}
|
|
|
|
// Wait until all the events are caught
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) logSolveStatus(ctx context.Context, st *state.State, ch chan *bk.SolveStatus) error {
|
|
parseName := func(v *bk.Vertex) (string, string) {
|
|
// Pattern: `@name@ message`. Minimal length is len("@X@ ")
|
|
if len(v.Name) < 2 || !strings.HasPrefix(v.Name, "@") {
|
|
return "", v.Name
|
|
}
|
|
|
|
prefixEndPos := strings.Index(v.Name[1:], "@")
|
|
if prefixEndPos == -1 {
|
|
return "", v.Name
|
|
}
|
|
|
|
component := v.Name[1 : prefixEndPos+1]
|
|
return component, v.Name[prefixEndPos+3 : len(v.Name)]
|
|
}
|
|
|
|
// Just like sprintf, but redacts secrets automatically
|
|
secureSprintf := func(format string, a ...interface{}) string {
|
|
s := fmt.Sprintf(format, a...)
|
|
for _, i := range st.Inputs {
|
|
if i.Secret == nil {
|
|
continue
|
|
}
|
|
s = strings.ReplaceAll(s, i.Secret.PlainText(), "***")
|
|
}
|
|
return s
|
|
}
|
|
|
|
return progressui.PrintSolveStatus(ctx, ch,
|
|
func(v *bk.Vertex, index int) {
|
|
component, name := parseName(v)
|
|
lg := log.
|
|
Ctx(ctx).
|
|
With().
|
|
Str("component", component).
|
|
Logger()
|
|
|
|
lg.
|
|
Debug().
|
|
Msg(secureSprintf("#%d %s\n", index, name))
|
|
lg.
|
|
Debug().
|
|
Msg(secureSprintf("#%d %s\n", index, v.Digest))
|
|
},
|
|
func(v *bk.Vertex, format string, a ...interface{}) {
|
|
component, _ := parseName(v)
|
|
lg := log.
|
|
Ctx(ctx).
|
|
With().
|
|
Str("component", component).
|
|
Logger()
|
|
|
|
msg := secureSprintf(format, a...)
|
|
lg.
|
|
Debug().
|
|
Msg(msg)
|
|
},
|
|
func(v *bk.Vertex, stream int, partial bool, format string, a ...interface{}) {
|
|
component, _ := parseName(v)
|
|
lg := log.
|
|
Ctx(ctx).
|
|
With().
|
|
Str("component", component).
|
|
Logger()
|
|
|
|
msg := secureSprintf(format, a...)
|
|
lg.
|
|
Info().
|
|
Msg(msg)
|
|
},
|
|
)
|
|
}
|