// File: curre/manager.go (198 lines, 3.3 KiB, Go)

package curre
import (
"context"
"os"
"sync"
"time"
)
// OK is the exit code a new Manager starts with; it stays in effect unless
// another code is received while waiting.
const OK = 0

// Internal is the exit code queued when a component's Start returns an error
// or panics.
const Internal = 1
// Manager owns a list of components and drives them through their lifecycle:
// init, start, wait for an exit code, and shutdown.
type Manager struct {
	// components holds the registered components in registration order.
	components []Component
	// logger receives the manager's informational and error messages.
	logger Logger
	// lifetime is called on a background goroutine; the value it returns is
	// forwarded to exitChan as the exit code.
	lifetime Lifetime

	// started is checked by Register and init, both of which panic when it is set.
	started bool
	// startStopLock is not referenced by the methods visible in this file section.
	startStopLock sync.Mutex

	// exitChan carries exit codes from the lifetime and from failing components.
	exitChan chan int
	// exitCode is the code picked up by wait and handed to os.Exit by Run.
	exitCode int
}
// NewManager returns a Manager wired with the default logger, the console
// lifetime, a one-slot exit channel, and an initial exit code of OK.
func NewManager() *Manager {
	m := new(Manager)
	m.logger = &DefaultLogger{}
	m.lifetime = ConsoleLifetime
	// One slot lets a single exit code be queued before anyone is waiting.
	m.exitChan = make(chan int, 1)
	m.exitCode = OK
	return m
}
// ComponentsAreReady is the signal value RunNonBlocking sends on its ready
// channel once every component's Start call has returned.
type ComponentsAreReady struct{}
// CleanupFunc is the function type RunNonBlocking hands back to its caller;
// the function it returns calls the manager's shutdown with the given context.
type CleanupFunc func(ctx context.Context) error
// RunNonBlocking runs the manager lifecycle on a background goroutine and
// returns immediately.
//
// The goroutine installs the lifetime watcher, initializes every component,
// starts the components one after another (each Start must return before the
// next one begins), sends a ComponentsAreReady value on readyChan, and then
// blocks until an exit code arrives or ctx is done.
//
// The returned CleanupFunc shuts the components down; invoking it is the
// caller's responsibility. The returned error is always nil: failures inside
// the goroutine cannot be reported through it, so they are logged through the
// manager's logger instead of being dropped. If initialization fails, nothing
// is ever sent on readyChan.
//
// The send on readyChan blocks until it is received or buffered, so callers
// must read from the channel or pass one with spare capacity.
func (m *Manager) RunNonBlocking(ctx context.Context, readyChan chan ComponentsAreReady) (CleanupFunc, error) {
	go func() {
		m.initLifetime()
		if err := m.init(ctx); err != nil {
			m.logger.Error("Manager: init failed: %v", err)
			return
		}
		if err := m.startBlocking(ctx); err != nil {
			m.logger.Error("Manager: start failed: %v", err)
			return
		}
		readyChan <- ComponentsAreReady{}
		if err := m.wait(ctx); err != nil {
			m.logger.Error("Manager: wait failed: %v", err)
		}
	}()
	return func(ctx context.Context) error {
		return m.shutdown(ctx)
	}, nil
}
// Run drives the whole lifecycle on the calling goroutine: it installs the
// lifetime watcher, then initializes the components, starts them, blocks
// until an exit code arrives or ctx is done, and shuts them down.
//
// The first step that fails aborts the sequence and its error is returned.
// When every step succeeds the process is terminated with
// os.Exit(m.exitCode), so Run never returns on the success path.
func (m *Manager) Run(ctx context.Context) error {
	m.initLifetime()

	steps := []func(context.Context) error{m.init, m.start, m.wait, m.shutdown}
	for _, step := range steps {
		if err := step(ctx); err != nil {
			return err
		}
	}

	os.Exit(m.exitCode)
	return nil
}
// Register appends the given components, in order, to the manager's component
// list and returns the manager so calls can be chained.
//
// It panics if the manager is marked as started.
func (m *Manager) Register(components ...Component) *Manager {
	if !m.started {
		m.components = append(m.components, components...)
		return m
	}
	panic("cannot register to a started manager")
}
// init calls Init on every registered component in registration order and
// logs each component once its Init has succeeded.
//
// It returns the first error a component reports, leaving the remaining
// components untouched, and nil when all succeed. It panics if the manager is
// marked as started.
func (m *Manager) init(ctx context.Context) error {
	if m.started {
		panic("cannot reinit a started manager")
	}
	registered := m.components
	for i := range registered {
		component := registered[i]
		if err := component.Init(ctx); err != nil {
			return err
		}
		m.logger.Info("Manager: Init: %T", component)
	}
	return nil
}
// start launches startComponent for every registered component, each on its
// own goroutine, and returns without waiting for any of them. It always
// returns nil.
func (m *Manager) start(ctx context.Context) error {
	registered := m.components
	for i := range registered {
		go m.startComponent(ctx, registered[i])
	}
	return nil
}
// startBlocking runs startComponent for every registered component on the
// calling goroutine, in registration order; each call must return before the
// next component is started. It always returns nil.
func (m *Manager) startBlocking(ctx context.Context) error {
	registered := m.components
	for i := range registered {
		m.startComponent(ctx, registered[i])
	}
	return nil
}
// startComponent runs component.Start and turns a failure into an exit
// request.
//
// If Start returns an error or panics, the failure is logged and the Internal
// exit code is queued on the manager's exit channel. If Start returns nil,
// only a completion message is logged. Panics raised by Start are recovered
// here and do not propagate to the caller.
func (m *Manager) startComponent(ctx context.Context, component Component) {
	// requestExit queues code without blocking. The exit channel is read at
	// most once per wait, so when a code is already pending there is nothing
	// to gain from waiting; a blocking send here would hang the caller (for
	// example a second failing component during a sequential start).
	requestExit := func(code int) {
		select {
		case m.exitChan <- code:
		default:
		}
	}

	defer func() {
		if r := recover(); r != nil {
			// %v because a panic value can be of any type, not just a string or error.
			m.logger.Error("Panic occurred in component: %T, error: %v", component, r)
			requestExit(Internal)
		}
	}()

	m.logger.Info("Starting %T", component)
	if err := component.Start(ctx); err != nil {
		m.logger.Error("Component: %T encountered an error: %v", component, err)
		requestExit(Internal)
		return
	}
	m.logger.Info("Component %T, done running", component)
}
// initLifetime starts a background goroutine that calls the manager's
// lifetime function, logs the exit code it returns, and forwards that code to
// the exit channel. The call itself returns immediately.
func (m *Manager) initLifetime() {
	relay := func() {
		code := m.lifetime()
		m.logger.Info("Exit signal received: %d", code)
		m.exitChan <- code
	}
	go relay()
}
// wait blocks until either an exit code arrives on the exit channel or ctx is
// done. A received code is stored in m.exitCode; when ctx ends first the
// stored exit code is left unchanged. It always returns nil.
func (m *Manager) wait(ctx context.Context) error {
	select {
	case <-ctx.Done():
		// Context ended first: keep whatever exit code is already stored.
	case code := <-m.exitChan:
		m.exitCode = code
	}
	return nil
}
// shutdown calls Stop on every component returned by getClosers, one after
// another in slice order, on a helper goroutine, and blocks until the last
// Stop call has returned. It then logs completion and returns nil.
//
// ctx is passed through to each Stop call; shutdown itself does not watch it
// for cancellation.
func (m *Manager) shutdown(ctx context.Context) error {
	closers := m.getClosers(ctx)
	finished := make(chan struct{})

	go func() {
		for i := range closers {
			closers[i].Stop(ctx)
		}
		// Closing the channel releases the receive below.
		close(finished)
	}()

	<-finished
	m.logger.Info("Shutting down of components complete")
	return nil
}
// getClosers returns the components that shutdown should stop. It currently
// returns the manager's registered component slice itself (not a copy), in
// registration order; ctx is accepted but not used.
func (m *Manager) getClosers(ctx context.Context) []Component {
return m.components
}