Improve solver channel management according to @aluzzardi comments

Signed-off-by: Tom Chauveau <tom.chauveau@epitech.eu>
This commit is contained in:
Tom Chauveau 2021-08-23 16:57:19 +02:00
parent 95468ce2b3
commit 3f0350359e
No known key found for this signature in database
GPG Key ID: 3C9847D981AAC1BF
2 changed files with 47 additions and 46 deletions

View File

@ -170,27 +170,18 @@ func (c *Client) buildfn(ctx context.Context, st *state.State, env *environment.
go catchOutput(buildCh) go catchOutput(buildCh)
resp, err := c.c.Build(ctx, opts, "", func(ctx context.Context, gw bkgw.Client) (*bkgw.Result, error) { resp, err := c.c.Build(ctx, opts, "", func(ctx context.Context, gw bkgw.Client) (*bkgw.Result, error) {
eventsWg := sync.WaitGroup{}
closeCh := make(chan *bk.SolveStatus)
// Close events channel
defer func() {
close(closeCh)
eventsWg.Wait()
close(eventsCh)
}()
s := solver.New(solver.Opts{ s := solver.New(solver.Opts{
Control: c.c, Control: c.c,
Gateway: gw, Gateway: gw,
Events: eventsCh, Events: eventsCh,
EventsWg: &eventsWg,
CloseEvent: closeCh,
Auth: auth, Auth: auth,
Secrets: secrets, Secrets: secrets,
NoCache: c.cfg.NoCache, NoCache: c.cfg.NoCache,
}) })
// Close events channel
defer s.Stop()
// Compute output overlay // Compute output overlay
if fn != nil { if fn != nil {
if err := fn(ctx, env, s); err != nil { if err := fn(ctx, env, s); err != nil {

View File

@ -21,14 +21,14 @@ import (
type Solver struct { type Solver struct {
opts Opts opts Opts
eventsWg *sync.WaitGroup
closeCh chan *bk.SolveStatus
} }
type Opts struct { type Opts struct {
Control *bk.Client Control *bk.Client
Gateway bkgw.Client Gateway bkgw.Client
Events chan *bk.SolveStatus Events chan *bk.SolveStatus
EventsWg *sync.WaitGroup
CloseEvent chan *bk.SolveStatus
Auth *RegistryAuthProvider Auth *RegistryAuthProvider
Secrets session.Attachable Secrets session.Attachable
NoCache bool NoCache bool
@ -36,6 +36,8 @@ type Opts struct {
func New(opts Opts) Solver { func New(opts Opts) Solver {
return Solver{ return Solver{
eventsWg: &sync.WaitGroup{},
closeCh: make(chan *bk.SolveStatus),
opts: opts, opts: opts,
} }
} }
@ -63,6 +65,12 @@ func (s Solver) NoCache() bool {
return s.opts.NoCache return s.opts.NoCache
} }
func (s Solver) Stop() {
close(s.closeCh)
s.eventsWg.Wait()
close(s.opts.Events)
}
func (s Solver) AddCredentials(target, username, secret string) { func (s Solver) AddCredentials(target, username, secret string) {
s.opts.Auth.AddCredentials(target, username, secret) s.opts.Auth.AddCredentials(target, username, secret)
} }
@ -148,11 +156,30 @@ func (s Solver) Solve(ctx context.Context, st llb.State) (bkgw.Reference, error)
return res.SingleRef() return res.SingleRef()
} }
// Forward events from solver to the main events channel
// It creates a task in the solver waiting group to be
// sure that everything will be forward to the main channel
func (s Solver) forwardEvents(ch chan *bk.SolveStatus) {
s.eventsWg.Add(1)
defer s.eventsWg.Done()
for event := range ch {
s.opts.Events <- event
}
}
// Export will export `st` to `output` // Export will export `st` to `output`
// FIXME: this is currently impleneted as a hack, starting a new Build session // FIXME: this is currently impleneted as a hack, starting a new Build session
// within buildkit from the Control API. Ideally the Gateway API should allow to // within buildkit from the Control API. Ideally the Gateway API should allow to
// Export directly. // Export directly.
func (s Solver) Export(ctx context.Context, st llb.State, img *dockerfile2llb.Image, output bk.ExportEntry) (*bk.SolveResponse, error) { func (s Solver) Export(ctx context.Context, st llb.State, img *dockerfile2llb.Image, output bk.ExportEntry) (*bk.SolveResponse, error) {
// Check close event channel and return if we're already done with the main pipeline
select {
case <-s.closeCh:
return nil, context.Canceled
default:
}
def, err := s.Marshal(ctx, st) def, err := s.Marshal(ctx, st)
if err != nil { if err != nil {
return nil, err return nil, err
@ -171,24 +198,7 @@ func (s Solver) Export(ctx context.Context, st llb.State, img *dockerfile2llb.Im
// Forward this build session events to the main events channel, for logging // Forward this build session events to the main events channel, for logging
// purposes. // purposes.
go func() { go s.forwardEvents(ch)
select {
case <-s.opts.CloseEvent:
return
default:
for event := range ch {
s.opts.Events <- event
}
}
}()
// Add task to events
s.opts.EventsWg.Add(1)
// Resolve event
defer func() {
s.opts.EventsWg.Done()
}()
return s.opts.Control.Build(ctx, opts, "", func(ctx context.Context, c bkgw.Client) (*bkgw.Result, error) { return s.opts.Control.Build(ctx, opts, "", func(ctx context.Context, c bkgw.Client) (*bkgw.Result, error) {
res, err := c.Solve(ctx, bkgw.SolveRequest{ res, err := c.Solve(ctx, bkgw.SolveRequest{