Fix data race issue by synchronizing events channel through waiting group

Previously, there was a data race condition error when `op
.#PushContainer` was called. It was because of the `Export` func
in `solver.go` that create a sub build pipeline and forward his
events to the main events channel (that catch all events that we log).
To fix it, channel are now split between build pipeline and
centralize into the old output channel (`ch`).

Signed-off-by: Tom Chauveau <tom.chauveau@epitech.eu>
This commit is contained in:
Tom Chauveau 2021-08-20 14:07:04 +02:00
parent 22a9445455
commit 9adeecd005
No known key found for this signature in database
GPG Key ID: 3C9847D981AAC1BF

View File

@ -6,6 +6,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -103,6 +104,9 @@ func (c *Client) Do(ctx context.Context, state *state.State, fn DoFunc) error {
} }
func (c *Client) buildfn(ctx context.Context, st *state.State, env *environment.Environment, fn DoFunc, ch chan *bk.SolveStatus) error { func (c *Client) buildfn(ctx context.Context, st *state.State, env *environment.Environment, fn DoFunc, ch chan *bk.SolveStatus) error {
// Close output channel
defer close(ch)
lg := log.Ctx(ctx) lg := log.Ctx(ctx)
// Scan local dirs to grant access // Scan local dirs to grant access
@ -139,20 +143,35 @@ func (c *Client) buildfn(ctx context.Context, st *state.State, env *environment.
Interface("attrs", opts.FrontendAttrs). Interface("attrs", opts.FrontendAttrs).
Msg("spawning buildkit job") Msg("spawning buildkit job")
solverCh := make(chan *bk.SolveStatus) wg := sync.WaitGroup{}
defer close(solverCh) // Catch output from events
go func() { catchOutput := func(inCh chan *bk.SolveStatus) {
for e := range solverCh { for e := range inCh {
fmt.Println("J'envoie un event depuis pushContainer")
ch <- e ch <- e
} }
}() wg.Done()
}
// Catch solver's events
// Closed manually
eventsCh := make(chan *bk.SolveStatus)
wg.Add(1)
go catchOutput(eventsCh)
// Catch build events
// Closed by buildkit
buildCh := make(chan *bk.SolveStatus)
wg.Add(1)
go catchOutput(buildCh)
resp, err := c.c.Build(ctx, opts, "", func(ctx context.Context, gw bkgw.Client) (*bkgw.Result, error) { resp, err := c.c.Build(ctx, opts, "", func(ctx context.Context, gw bkgw.Client) (*bkgw.Result, error) {
// Close events channel
defer close(eventsCh)
s := solver.New(solver.Opts{ s := solver.New(solver.Opts{
Control: c.c, Control: c.c,
Gateway: gw, Gateway: gw,
Events: solverCh, Events: eventsCh,
Auth: auth, Auth: auth,
Secrets: secrets, Secrets: secrets,
NoCache: c.cfg.NoCache, NoCache: c.cfg.NoCache,
@ -193,7 +212,7 @@ func (c *Client) buildfn(ctx context.Context, st *state.State, env *environment.
res := bkgw.NewResult() res := bkgw.NewResult()
res.SetRef(ref) res.SetRef(ref)
return res, nil return res, nil
}, ch) }, buildCh)
if err != nil { if err != nil {
return solver.CleanError(err) return solver.CleanError(err)
} }
@ -205,6 +224,9 @@ func (c *Client) buildfn(ctx context.Context, st *state.State, env *environment.
Str("value", v). Str("value", v).
Msg("exporter response") Msg("exporter response")
} }
// Wait until all the events are caught
wg.Wait()
return nil return nil
} }