// Review preamble for this patch. The text below is a unified git diff whose
// newlines were collapsed onto a few long lines; it is reproduced unchanged.
//
// What the patch does, file by file:
//   - cmd/orbis/main.go: drops the os/signal, syscall and time imports together
//     with the goroutine that cancelled the context on os.Interrupt/SIGTERM and
//     called os.Exit(1) after a 10 second sleep. main now passes a plain
//     context.Background() to ExecuteContext.
//   - cmd/orbis/root.go: RunE comments out cmd.Context(), builds its own
//     context.Background(), and chains WithCtrlC() before Execute(ctx).
//   - internal/processes/processes.go: App gains an unbuffered
//     `externalDone chan bool` (created in NewApp). WithSignal runs the supplied
//     stop func in a goroutine and then sends on externalDone; WithCtrlC does the
//     same after receiving os.Interrupt or syscall.SIGTERM. processStatus.wait now
//     selects between p.errs and externalDone (returning nil for the latter).
//     In closeProcesses the loop that calls the processHandles cancel funcs is
//     moved from before the select to after it.
//   - internal/worker: adds the DeregisterWorker query
//     (DELETE FROM worker_register WHERE worker_id = $1) and its Querier/Queries
//     method. Worker stores heartBeatCtx/heartBeatCancel; the heartbeat ticker
//     goroutine appears to move from Start into Setup; Start loops until
//     heartBeatCtx is done (its ctx.Done() case is commented out); Close cancels
//     the heartbeat and deregisters the worker under a 10 second timeout.
//   - internal/workprocessor/workprocessor.go: ProcessNext sleeps 10 seconds
//     before calling Archive.
//
// NOTE(review): externalDone is unbuffered and the sends in WithSignal/WithCtrlC
// are blocking; if wait() has already returned via p.errs nobody receives, so the
// sending goroutine would stay blocked — consider a buffered channel or close().
// NOTE(review): signal.Notify in WithCtrlC has no matching signal.Stop.
// NOTE(review): in closeProcesses the visible `case <-waitClose.Done(): return nil`
// returns before the moved cancel loop runs — confirm the cancel funcs are still
// invoked on every path (the other select cases are not visible in this hunk).
// NOTE(review): root.go no longer uses cmd.Context(), so the context handed to
// ExecuteContext in main is ignored — confirm this is intended.
// NOTE(review): the previous 10 second hard-stop fallback is removed from main.go
// and no replacement is visible in this patch — confirm shutdown cannot hang.
// NOTE(review): Worker.Start no longer observes its own ctx; it only stops once
// Close cancels heartBeatCtx — verify against the process runner's ordering.
// NOTE(review): the heartbeat ticker is never stopped (no ticker.Stop in view),
// and Close's `<-w.heartBeatCtx.Done()` returns right after cancel without
// waiting for the heartbeat goroutine to exit.
// NOTE(review): time.Sleep(10 * time.Second) in ProcessNext looks like a
// debugging leftover — confirm before merging.
// NOTE(review): the panic message spells "panicing"; left untouched here because
// it is runtime text.
diff --git a/cmd/orbis/main.go b/cmd/orbis/main.go index 48d4f69..25dca20 100644 --- a/cmd/orbis/main.go +++ b/cmd/orbis/main.go @@ -4,9 +4,6 @@ import ( "context" "fmt" "os" - "os/signal" - "syscall" - "time" "git.front.kjuulh.io/kjuulh/orbis/internal/app" "github.com/joho/godotenv" @@ -21,22 +18,7 @@ func main() { app := app.NewApp() - ctx, cancel := context.WithCancel(context.Background()) - stop := make(chan os.Signal, 1) - signal.Notify(stop, os.Interrupt, syscall.SIGTERM) - - go func() { - <-stop - - app.Logger().Info("stop signal received: shutting down orbis") - cancel() - - // Start timer for hard stop - time.Sleep(time.Second * 10) - fmt.Println("orbis failed to stop in time, forced to hard cancel") - os.Exit(1) - }() - + ctx := context.Background() if err := newRoot(app).ExecuteContext(ctx); err != nil { fmt.Printf("%s\n", err) os.Exit(1) diff --git a/cmd/orbis/root.go b/cmd/orbis/root.go index d517154..d9c3978 100644 --- a/cmd/orbis/root.go +++ b/cmd/orbis/root.go @@ -1,6 +1,8 @@ package main import ( + "context" + "git.front.kjuulh.io/kjuulh/orbis/internal/app" "git.front.kjuulh.io/kjuulh/orbis/internal/processes" "github.com/spf13/cobra" @@ -14,13 +16,16 @@ func newRoot(app *app.App) *cobra.Command { Short: "Orbis is a data workflow scheduler for all your batch and real-time needs", RunE: func(cmd *cobra.Command, args []string) error { - ctx := cmd.Context() + //ctx := cmd.Context() logger.Info("starting orbis") + ctx := context.Background() + return processes. NewApp(logger). Add(app.Scheduler()). Add(app.Worker()). + WithCtrlC(). 
Execute(ctx) }, } diff --git a/internal/processes/processes.go b/internal/processes/processes.go index 37512c6..37789dc 100644 --- a/internal/processes/processes.go +++ b/internal/processes/processes.go @@ -3,6 +3,9 @@ package processes import ( "context" "log/slog" + "os" + "os/signal" + "syscall" "time" "golang.org/x/sync/errgroup" @@ -23,12 +26,15 @@ type CloseProcesser interface { type App struct { logger *slog.Logger processes []Process + + externalDone chan bool } func NewApp(logger *slog.Logger) *App { return &App{ - logger: logger, - processes: make([]Process, 0), + logger: logger, + processes: make([]Process, 0), + externalDone: make(chan bool), } } @@ -38,6 +44,29 @@ func (a *App) Add(p Process) *App { return a } +func (a *App) WithSignal(stop func()) *App { + go func() { + stop() + + a.externalDone <- true + }() + + return a +} + +func (a *App) WithCtrlC() *App { + go func() { + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + + <-stop + + a.externalDone <- true + }() + + return a +} + func (a *App) Execute(ctx context.Context) error { a.logger.InfoContext(ctx, "starting processor") if err := a.setupProcesses(ctx); err != nil { @@ -49,7 +78,7 @@ func (a *App) Execute(ctx context.Context) error { return nil } - processErr := processes.wait(ctx) + processErr := processes.wait(ctx, a.externalDone) if err := a.closeProcesses(ctx, processes); err != nil { if processErr != nil { @@ -87,10 +116,6 @@ func (a *App) closeProcesses(ctx context.Context, processes *processStatus) erro closeErrs <- errgrp.Wait() }() - for _, closeHandle := range processes.processHandles { - closeHandle() - } - select { case <-waitClose.Done(): return nil @@ -102,6 +127,10 @@ func (a *App) closeProcesses(ctx context.Context, processes *processStatus) erro } } + for _, closeHandle := range processes.processHandles { + closeHandle() + } + return nil } @@ -110,8 +139,13 @@ type processStatus struct { processHandles []context.CancelFunc } -func (p 
*processStatus) wait(_ context.Context) error { - return <-p.errs +func (p *processStatus) wait(_ context.Context, externalDone chan bool) error { + select { + case err := <-p.errs: + return err + case <-externalDone: + return nil + } } func (a *App) startProcesses(ctx context.Context) (*processStatus, any) { diff --git a/internal/worker/queries.sql b/internal/worker/queries.sql index 2b15dfd..5430f3d 100644 --- a/internal/worker/queries.sql +++ b/internal/worker/queries.sql @@ -22,3 +22,7 @@ SET WHERE worker_id = $1; +-- name: DeregisterWorker :exec +DELETE FROM worker_register +WHERE + worker_id = $1; diff --git a/internal/worker/repositories/querier.go b/internal/worker/repositories/querier.go index 76a11a1..a980c46 100644 --- a/internal/worker/repositories/querier.go +++ b/internal/worker/repositories/querier.go @@ -11,6 +11,7 @@ import ( ) type Querier interface { + DeregisterWorker(ctx context.Context, workerID uuid.UUID) error GetWorkers(ctx context.Context) ([]*GetWorkersRow, error) Ping(ctx context.Context) (int32, error) RegisterWorker(ctx context.Context, arg *RegisterWorkerParams) error diff --git a/internal/worker/repositories/queries.sql.go b/internal/worker/repositories/queries.sql.go index 981aec4..a6aecab 100644 --- a/internal/worker/repositories/queries.sql.go +++ b/internal/worker/repositories/queries.sql.go @@ -11,6 +11,17 @@ import ( "github.com/google/uuid" ) +const deregisterWorker = `-- name: DeregisterWorker :exec +DELETE FROM worker_register +WHERE + worker_id = $1 +` + +func (q *Queries) DeregisterWorker(ctx context.Context, workerID uuid.UUID) error { + _, err := q.db.Exec(ctx, deregisterWorker, workerID) + return err +} + const getWorkers = `-- name: GetWorkers :many SELECT worker_id diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 77a1906..29908c0 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -24,6 +24,9 @@ type Worker struct { workProcessor workProcessor logger *slog.Logger + 
heartBeatCancel context.CancelFunc + heartBeatCtx context.Context + capacity uint } @@ -56,6 +59,67 @@ func (w *Worker) Setup(ctx context.Context) error { return nil } + w.heartBeatCtx, w.heartBeatCancel = context.WithCancel(context.Background()) + go func() { + ticker := time.NewTicker(time.Second * 5) + errorCount := 0 + + for { + select { + case <-w.heartBeatCtx.Done(): + return + case <-ticker.C: + if err := w.updateHeartBeat(w.heartBeatCtx); err != nil { + if errorCount >= 5 { + panic(fmt.Errorf("worker failed to register heartbeat for a long time, panicing..., err: %w", err)) + } + errorCount += 1 + } else { + errorCount = 0 + } + } + } + }() + + return nil +} + +func (w *Worker) Start(ctx context.Context) error { + for { + select { + case <-w.heartBeatCtx.Done(): + return nil + // case <-ctx.Done(): + // return nil + default: + if err := w.processWorkQueue(ctx); err != nil { + // FIXME: dead letter item, right now we just log and continue + + w.logger.WarnContext(ctx, "failed to handle work item", "error", err) + } + } + } +} + +func (w *Worker) Close(ctx context.Context) error { + if w.heartBeatCancel != nil { + w.heartBeatCancel() + } + + if w.heartBeatCtx != nil { + <-w.heartBeatCtx.Done() + + repo := repositories.New(w.db) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + w.logger.InfoContext(ctx, "deregistering worker", "worker_id", w.workerID) + if err := repo.DeregisterWorker(ctx, w.workerID); err != nil { + return fmt.Errorf("failed to deregister worker: %s, err: %w", w.workerID, err) + } + } + return nil } @@ -99,47 +163,6 @@ func (w *Worker) GetWorkers(ctx context.Context) (*Workers, error) { }, nil } -func (w *Worker) Start(ctx context.Context) error { - heartBeatCtx, heartBeatCancel := context.WithCancel(context.Background()) - go func() { - ticker := time.NewTicker(time.Second * 5) - errorCount := 0 - - for { - select { - case <-heartBeatCtx.Done(): - return - case <-ticker.C: - if err := 
w.updateHeartBeat(heartBeatCtx); err != nil { - if errorCount >= 5 { - panic(fmt.Errorf("worker failed to register heartbeat for a long time, panicing..., err: %w", err)) - } - errorCount += 1 - } else { - errorCount = 0 - } - } - } - }() - - defer func() { - heartBeatCancel() - }() - - for { - select { - case <-ctx.Done(): - return nil - default: - if err := w.processWorkQueue(ctx); err != nil { - // FIXME: dead letter item, right now we just log and continue - - w.logger.WarnContext(ctx, "failed to handle work item", "error", err) - } - } - } -} - func (w *Worker) updateHeartBeat(ctx context.Context) error { repo := repositories.New(w.db) diff --git a/internal/workprocessor/workprocessor.go b/internal/workprocessor/workprocessor.go index 55741ce..8aef83b 100644 --- a/internal/workprocessor/workprocessor.go +++ b/internal/workprocessor/workprocessor.go @@ -40,6 +40,8 @@ func (w *WorkProcessor) ProcessNext(ctx context.Context, workerID uuid.UUID) err return fmt.Errorf("failed to start processing items: %w", err) } + time.Sleep(10 * time.Second) + if err := w.workscheduler.Archive(ctx, *schedule); err != nil { return fmt.Errorf("failed to archive item: %w", err) }