This repository has been archived on 2024-04-08. You can view files and clone it, but cannot push or open issues or pull requests.
dagger/client/client.go
Tom Chauveau 9adeecd005
Fix data race issue by synchronizing events channel through waiting group
Previously, there was a data race condition error when `op
.#PushContainer` was called. It was because of the `Export` func
in `solver.go` that create a sub build pipeline and forward his
events to the main events channel (that catch all events that we log).
To fix it, channel are now split between build pipeline and
centralize into the old output channel (`ch`).

Signed-off-by: Tom Chauveau <tom.chauveau@epitech.eu>
2021-08-20 15:33:15 +02:00

305 lines
6.6 KiB
Go

package client
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"go.opentelemetry.io/otel"
"golang.org/x/sync/errgroup"
"github.com/rs/zerolog/log"
// Cue
// buildkit
bk "github.com/moby/buildkit/client"
_ "github.com/moby/buildkit/client/connhelper/dockercontainer" // import the container connection driver
"github.com/moby/buildkit/client/llb"
bkgw "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/session"
// docker output
"go.dagger.io/dagger/util/buildkitd"
"go.dagger.io/dagger/util/progressui"
"go.dagger.io/dagger/compiler"
"go.dagger.io/dagger/environment"
"go.dagger.io/dagger/solver"
"go.dagger.io/dagger/state"
)
// A dagger client
type Client struct {
c *bk.Client
cfg Config
}
type Config struct {
NoCache bool
CacheExports []bk.CacheOptionsEntry
CacheImports []bk.CacheOptionsEntry
}
func New(ctx context.Context, host string, cfg Config) (*Client, error) {
if host == "" {
host = os.Getenv("BUILDKIT_HOST")
}
if host == "" {
h, err := buildkitd.Start(ctx)
if err != nil {
return nil, err
}
host = h
}
opts := []bk.ClientOpt{}
// FIXME: uncomment when next version of buildkit will be released
// if span := trace.SpanFromContext(ctx); span != nil {
// opts = append(opts, bk.WithTracerProvider(span.TracerProvider()))
// }
c, err := bk.New(ctx, host, opts...)
if err != nil {
return nil, fmt.Errorf("buildkit client: %w", err)
}
return &Client{
c: c,
cfg: cfg,
}, nil
}
type DoFunc func(context.Context, *environment.Environment, solver.Solver) error
// FIXME: return completed *Route, instead of *compiler.Value
func (c *Client) Do(ctx context.Context, state *state.State, fn DoFunc) error {
lg := log.Ctx(ctx)
eg, gctx := errgroup.WithContext(ctx)
environment, err := environment.New(state)
if err != nil {
return err
}
// Spawn print function
events := make(chan *bk.SolveStatus)
eg.Go(func() error {
// Create a background context so that logging will not be cancelled
// with the main context.
dispCtx := lg.WithContext(context.Background())
return c.logSolveStatus(dispCtx, state, events)
})
// Spawn build function
eg.Go(func() error {
return c.buildfn(gctx, state, environment, fn, events)
})
return eg.Wait()
}
func (c *Client) buildfn(ctx context.Context, st *state.State, env *environment.Environment, fn DoFunc, ch chan *bk.SolveStatus) error {
// Close output channel
defer close(ch)
lg := log.Ctx(ctx)
// Scan local dirs to grant access
localdirs := env.LocalDirs()
for label, dir := range localdirs {
abs, err := filepath.Abs(dir)
if err != nil {
return err
}
localdirs[label] = abs
}
// buildkit auth provider (registry)
auth := solver.NewRegistryAuthProvider()
// secrets
secrets := solver.NewSecretsProvider(st)
// Setup solve options
opts := bk.SolveOpt{
LocalDirs: localdirs,
Session: []session.Attachable{
auth,
secrets,
solver.NewDockerSocketProvider(),
},
CacheExports: c.cfg.CacheExports,
CacheImports: c.cfg.CacheImports,
}
// Call buildkit solver
lg.Debug().
Interface("localdirs", opts.LocalDirs).
Interface("attrs", opts.FrontendAttrs).
Msg("spawning buildkit job")
wg := sync.WaitGroup{}
// Catch output from events
catchOutput := func(inCh chan *bk.SolveStatus) {
for e := range inCh {
ch <- e
}
wg.Done()
}
// Catch solver's events
// Closed manually
eventsCh := make(chan *bk.SolveStatus)
wg.Add(1)
go catchOutput(eventsCh)
// Catch build events
// Closed by buildkit
buildCh := make(chan *bk.SolveStatus)
wg.Add(1)
go catchOutput(buildCh)
resp, err := c.c.Build(ctx, opts, "", func(ctx context.Context, gw bkgw.Client) (*bkgw.Result, error) {
// Close events channel
defer close(eventsCh)
s := solver.New(solver.Opts{
Control: c.c,
Gateway: gw,
Events: eventsCh,
Auth: auth,
Secrets: secrets,
NoCache: c.cfg.NoCache,
})
lg.Debug().Msg("loading configuration")
if err := env.LoadPlan(ctx, s); err != nil {
return nil, err
}
// Compute output overlay
if fn != nil {
if err := fn(ctx, env, s); err != nil {
return nil, compiler.Err(err)
}
}
// Export environment to a cue directory
// FIXME: this should be elsewhere
lg.Debug().Msg("exporting environment")
tr := otel.Tracer("client")
_, span := tr.Start(ctx, "environment.Export")
defer span.End()
computed := env.Computed().JSON().PrettyString()
st := llb.
Scratch().
File(
llb.Mkfile("computed.json", 0600, []byte(computed)),
llb.WithCustomName("[internal] serializing computed values"),
)
ref, err := s.Solve(ctx, st)
if err != nil {
return nil, err
}
res := bkgw.NewResult()
res.SetRef(ref)
return res, nil
}, buildCh)
if err != nil {
return solver.CleanError(err)
}
for k, v := range resp.ExporterResponse {
// FIXME consume exporter response
lg.
Debug().
Str("key", k).
Str("value", v).
Msg("exporter response")
}
// Wait until all the events are caught
wg.Wait()
return nil
}
func (c *Client) logSolveStatus(ctx context.Context, st *state.State, ch chan *bk.SolveStatus) error {
parseName := func(v *bk.Vertex) (string, string) {
// Pattern: `@name@ message`. Minimal length is len("@X@ ")
if len(v.Name) < 2 || !strings.HasPrefix(v.Name, "@") {
return "", v.Name
}
prefixEndPos := strings.Index(v.Name[1:], "@")
if prefixEndPos == -1 {
return "", v.Name
}
component := v.Name[1 : prefixEndPos+1]
return component, v.Name[prefixEndPos+3 : len(v.Name)]
}
// Just like sprintf, but redacts secrets automatically
secureSprintf := func(format string, a ...interface{}) string {
s := fmt.Sprintf(format, a...)
for _, i := range st.Inputs {
if i.Secret == nil {
continue
}
s = strings.ReplaceAll(s, i.Secret.PlainText(), "***")
}
return s
}
return progressui.PrintSolveStatus(ctx, ch,
func(v *bk.Vertex, index int) {
component, name := parseName(v)
lg := log.
Ctx(ctx).
With().
Str("component", component).
Logger()
lg.
Debug().
Msg(secureSprintf("#%d %s\n", index, name))
lg.
Debug().
Msg(secureSprintf("#%d %s\n", index, v.Digest))
},
func(v *bk.Vertex, format string, a ...interface{}) {
component, _ := parseName(v)
lg := log.
Ctx(ctx).
With().
Str("component", component).
Logger()
msg := secureSprintf(format, a...)
lg.
Debug().
Msg(msg)
},
func(v *bk.Vertex, stream int, partial bool, format string, a ...interface{}) {
component, _ := parseName(v)
lg := log.
Ctx(ctx).
With().
Str("component", component).
Logger()
msg := secureSprintf(format, a...)
lg.
Info().
Msg(msg)
},
)
}