From 95468ce2b333e87902b00f62f03f9da7bcc97af9 Mon Sep 17 00:00:00 2001 From: Tom Chauveau Date: Mon, 23 Aug 2021 15:15:05 +0200 Subject: [PATCH 1/2] Solve sub build miss synchronous behavior Signed-off-by: Tom Chauveau --- client/client.go | 23 ++++++++++++++++------- solver/solver.go | 32 ++++++++++++++++++++++++-------- 2 files changed, 40 insertions(+), 15 deletions(-) diff --git a/client/client.go b/client/client.go index 4709fa98..b0af9f22 100644 --- a/client/client.go +++ b/client/client.go @@ -170,16 +170,25 @@ func (c *Client) buildfn(ctx context.Context, st *state.State, env *environment. go catchOutput(buildCh) resp, err := c.c.Build(ctx, opts, "", func(ctx context.Context, gw bkgw.Client) (*bkgw.Result, error) { + eventsWg := sync.WaitGroup{} + closeCh := make(chan *bk.SolveStatus) + // Close events channel - defer close(eventsCh) + defer func() { + close(closeCh) + eventsWg.Wait() + close(eventsCh) + }() s := solver.New(solver.Opts{ - Control: c.c, - Gateway: gw, - Events: eventsCh, - Auth: auth, - Secrets: secrets, - NoCache: c.cfg.NoCache, + Control: c.c, + Gateway: gw, + Events: eventsCh, + EventsWg: &eventsWg, + CloseEvent: closeCh, + Auth: auth, + Secrets: secrets, + NoCache: c.cfg.NoCache, }) // Compute output overlay diff --git a/solver/solver.go b/solver/solver.go index 1df62ac5..4afff25e 100644 --- a/solver/solver.go +++ b/solver/solver.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "sync" bk "github.com/moby/buildkit/client" "github.com/moby/buildkit/client/llb" @@ -23,12 +24,14 @@ type Solver struct { } type Opts struct { - Control *bk.Client - Gateway bkgw.Client - Events chan *bk.SolveStatus - Auth *RegistryAuthProvider - Secrets session.Attachable - NoCache bool + Control *bk.Client + Gateway bkgw.Client + Events chan *bk.SolveStatus + EventsWg *sync.WaitGroup + CloseEvent chan *bk.SolveStatus + Auth *RegistryAuthProvider + Secrets session.Attachable + NoCache bool } func New(opts Opts) Solver { @@ -169,11 +172,24 @@ func (s Solver) Export(ctx context.Context, st llb.State, img *dockerfile2llb.Im // Forward this build session events to the main events channel, for logging // purposes. go func() { - for event := range ch { - s.opts.Events <- event + select { + case <-s.opts.CloseEvent: + return + default: + for event := range ch { + s.opts.Events <- event + } } }() + // Add task to events + s.opts.EventsWg.Add(1) + + // Resolve event + defer func() { + s.opts.EventsWg.Done() + }() + return s.opts.Control.Build(ctx, opts, "", func(ctx context.Context, c bkgw.Client) (*bkgw.Result, error) { res, err := c.Solve(ctx, bkgw.SolveRequest{ Definition: def, From 3f0350359eff34bac29ed16fb5e46708a890408f Mon Sep 17 00:00:00 2001 From: Tom Chauveau Date: Mon, 23 Aug 2021 16:57:19 +0200 Subject: [PATCH 2/2] Improve solver channel management according to @aluzzardi comments Signed-off-by: Tom Chauveau --- client/client.go | 27 +++++++------------- solver/solver.go | 66 ++++++++++++++++++++++++++++-------------------- 2 files changed, 47 insertions(+), 46 deletions(-) diff --git a/client/client.go b/client/client.go index b0af9f22..47f95478 100644 --- a/client/client.go +++ b/client/client.go @@ -170,26 +170,17 @@ func (c *Client) buildfn(ctx context.Context, st *state.State, env *environment. go catchOutput(buildCh) resp, err := c.c.Build(ctx, opts, "", func(ctx context.Context, gw bkgw.Client) (*bkgw.Result, error) { - eventsWg := sync.WaitGroup{} - closeCh := make(chan *bk.SolveStatus) + s := solver.New(solver.Opts{ + Control: c.c, + Gateway: gw, + Events: eventsCh, + Auth: auth, + Secrets: secrets, + NoCache: c.cfg.NoCache, + }) // Close events channel - defer func() { - close(closeCh) - eventsWg.Wait() - close(eventsCh) - }() - - s := solver.New(solver.Opts{ - Control: c.c, - Gateway: gw, - Events: eventsCh, - EventsWg: &eventsWg, - CloseEvent: closeCh, - Auth: auth, - Secrets: secrets, - NoCache: c.cfg.NoCache, - }) + defer s.Stop() // Compute output overlay if fn != nil { diff --git a/solver/solver.go b/solver/solver.go index 4afff25e..d167675a 100644 --- a/solver/solver.go +++ b/solver/solver.go @@ -20,23 +20,25 @@ import ( ) type Solver struct { - opts Opts + opts Opts + eventsWg *sync.WaitGroup + closeCh chan *bk.SolveStatus } type Opts struct { - Control *bk.Client - Gateway bkgw.Client - Events chan *bk.SolveStatus - EventsWg *sync.WaitGroup - CloseEvent chan *bk.SolveStatus - Auth *RegistryAuthProvider - Secrets session.Attachable - NoCache bool + Control *bk.Client + Gateway bkgw.Client + Events chan *bk.SolveStatus + Auth *RegistryAuthProvider + Secrets session.Attachable + NoCache bool } func New(opts Opts) Solver { return Solver{ - opts: opts, + eventsWg: &sync.WaitGroup{}, + closeCh: make(chan *bk.SolveStatus), + opts: opts, } } @@ -63,6 +65,12 @@ func (s Solver) NoCache() bool { return s.opts.NoCache } +func (s Solver) Stop() { + close(s.closeCh) + s.eventsWg.Wait() + close(s.opts.Events) +} + func (s Solver) AddCredentials(target, username, secret string) { s.opts.Auth.AddCredentials(target, username, secret) } @@ -148,11 +156,30 @@ func (s Solver) Solve(ctx context.Context, st llb.State) (bkgw.Reference, error) return res.SingleRef() } +// Forward events from solver to the main events channel +// It creates a task in the solver waiting group to be +// sure that everything will be forward to the main channel +func (s Solver) forwardEvents(ch chan *bk.SolveStatus) { + s.eventsWg.Add(1) + defer s.eventsWg.Done() + + for event := range ch { + s.opts.Events <- event + } +} + // Export will export `st` to `output` // FIXME: this is currently impleneted as a hack, starting a new Build session // within buildkit from the Control API. Ideally the Gateway API should allow to // Export directly. func (s Solver) Export(ctx context.Context, st llb.State, img *dockerfile2llb.Image, output bk.ExportEntry) (*bk.SolveResponse, error) { + // Check close event channel and return if we're already done with the main pipeline + select { + case <-s.closeCh: + return nil, context.Canceled + default: + } + def, err := s.Marshal(ctx, st) if err != nil { return nil, err @@ -171,24 +198,7 @@ func (s Solver) Export(ctx context.Context, st llb.State, img *dockerfile2llb.Im // Forward this build session events to the main events channel, for logging // purposes. - go func() { - select { - case <-s.opts.CloseEvent: - return - default: - for event := range ch { - s.opts.Events <- event - } - } - }() - - // Add task to events - s.opts.EventsWg.Add(1) - - // Resolve event - defer func() { - s.opts.EventsWg.Done() - }() + go s.forwardEvents(ch) return s.opts.Control.Build(ctx, opts, "", func(ctx context.Context, c bkgw.Client) (*bkgw.Result, error) { res, err := c.Solve(ctx, bkgw.SolveRequest{