feat: deregister worker on close
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
2cdab4a1ab
commit
b6e9882855
@ -4,9 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.front.kjuulh.io/kjuulh/orbis/internal/app"
|
"git.front.kjuulh.io/kjuulh/orbis/internal/app"
|
||||||
"github.com/joho/godotenv"
|
"github.com/joho/godotenv"
|
||||||
@ -21,22 +18,7 @@ func main() {
|
|||||||
|
|
||||||
app := app.NewApp()
|
app := app.NewApp()
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx := context.Background()
|
||||||
stop := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
<-stop
|
|
||||||
|
|
||||||
app.Logger().Info("stop signal received: shutting down orbis")
|
|
||||||
cancel()
|
|
||||||
|
|
||||||
// Start timer for hard stop
|
|
||||||
time.Sleep(time.Second * 10)
|
|
||||||
fmt.Println("orbis failed to stop in time, forced to hard cancel")
|
|
||||||
os.Exit(1)
|
|
||||||
}()
|
|
||||||
|
|
||||||
if err := newRoot(app).ExecuteContext(ctx); err != nil {
|
if err := newRoot(app).ExecuteContext(ctx); err != nil {
|
||||||
fmt.Printf("%s\n", err)
|
fmt.Printf("%s\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"git.front.kjuulh.io/kjuulh/orbis/internal/app"
|
"git.front.kjuulh.io/kjuulh/orbis/internal/app"
|
||||||
"git.front.kjuulh.io/kjuulh/orbis/internal/processes"
|
"git.front.kjuulh.io/kjuulh/orbis/internal/processes"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
@ -14,13 +16,16 @@ func newRoot(app *app.App) *cobra.Command {
|
|||||||
Short: "Orbis is a data workflow scheduler for all your batch and real-time needs",
|
Short: "Orbis is a data workflow scheduler for all your batch and real-time needs",
|
||||||
|
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
ctx := cmd.Context()
|
//ctx := cmd.Context()
|
||||||
logger.Info("starting orbis")
|
logger.Info("starting orbis")
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
return processes.
|
return processes.
|
||||||
NewApp(logger).
|
NewApp(logger).
|
||||||
Add(app.Scheduler()).
|
Add(app.Scheduler()).
|
||||||
Add(app.Worker()).
|
Add(app.Worker()).
|
||||||
|
WithCtrlC().
|
||||||
Execute(ctx)
|
Execute(ctx)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,9 @@ package processes
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
@ -23,12 +26,15 @@ type CloseProcesser interface {
|
|||||||
type App struct {
|
type App struct {
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
processes []Process
|
processes []Process
|
||||||
|
|
||||||
|
externalDone chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewApp(logger *slog.Logger) *App {
|
func NewApp(logger *slog.Logger) *App {
|
||||||
return &App{
|
return &App{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
processes: make([]Process, 0),
|
processes: make([]Process, 0),
|
||||||
|
externalDone: make(chan bool),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -38,6 +44,29 @@ func (a *App) Add(p Process) *App {
|
|||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *App) WithSignal(stop func()) *App {
|
||||||
|
go func() {
|
||||||
|
stop()
|
||||||
|
|
||||||
|
a.externalDone <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *App) WithCtrlC() *App {
|
||||||
|
go func() {
|
||||||
|
stop := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
|
||||||
|
|
||||||
|
<-stop
|
||||||
|
|
||||||
|
a.externalDone <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
|
||||||
func (a *App) Execute(ctx context.Context) error {
|
func (a *App) Execute(ctx context.Context) error {
|
||||||
a.logger.InfoContext(ctx, "starting processor")
|
a.logger.InfoContext(ctx, "starting processor")
|
||||||
if err := a.setupProcesses(ctx); err != nil {
|
if err := a.setupProcesses(ctx); err != nil {
|
||||||
@ -49,7 +78,7 @@ func (a *App) Execute(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
processErr := processes.wait(ctx)
|
processErr := processes.wait(ctx, a.externalDone)
|
||||||
|
|
||||||
if err := a.closeProcesses(ctx, processes); err != nil {
|
if err := a.closeProcesses(ctx, processes); err != nil {
|
||||||
if processErr != nil {
|
if processErr != nil {
|
||||||
@ -87,10 +116,6 @@ func (a *App) closeProcesses(ctx context.Context, processes *processStatus) erro
|
|||||||
closeErrs <- errgrp.Wait()
|
closeErrs <- errgrp.Wait()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for _, closeHandle := range processes.processHandles {
|
|
||||||
closeHandle()
|
|
||||||
}
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-waitClose.Done():
|
case <-waitClose.Done():
|
||||||
return nil
|
return nil
|
||||||
@ -102,6 +127,10 @@ func (a *App) closeProcesses(ctx context.Context, processes *processStatus) erro
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, closeHandle := range processes.processHandles {
|
||||||
|
closeHandle()
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -110,8 +139,13 @@ type processStatus struct {
|
|||||||
processHandles []context.CancelFunc
|
processHandles []context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *processStatus) wait(_ context.Context) error {
|
func (p *processStatus) wait(_ context.Context, externalDone chan bool) error {
|
||||||
return <-p.errs
|
select {
|
||||||
|
case err := <-p.errs:
|
||||||
|
return err
|
||||||
|
case <-externalDone:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) startProcesses(ctx context.Context) (*processStatus, any) {
|
func (a *App) startProcesses(ctx context.Context) (*processStatus, any) {
|
||||||
|
@ -22,3 +22,7 @@ SET
|
|||||||
WHERE
|
WHERE
|
||||||
worker_id = $1;
|
worker_id = $1;
|
||||||
|
|
||||||
|
-- name: DeregisterWorker :exec
|
||||||
|
DELETE FROM worker_register
|
||||||
|
WHERE
|
||||||
|
worker_id = $1;
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Querier interface {
|
type Querier interface {
|
||||||
|
DeregisterWorker(ctx context.Context, workerID uuid.UUID) error
|
||||||
GetWorkers(ctx context.Context) ([]*GetWorkersRow, error)
|
GetWorkers(ctx context.Context) ([]*GetWorkersRow, error)
|
||||||
Ping(ctx context.Context) (int32, error)
|
Ping(ctx context.Context) (int32, error)
|
||||||
RegisterWorker(ctx context.Context, arg *RegisterWorkerParams) error
|
RegisterWorker(ctx context.Context, arg *RegisterWorkerParams) error
|
||||||
|
@ -11,6 +11,17 @@ import (
|
|||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const deregisterWorker = `-- name: DeregisterWorker :exec
|
||||||
|
DELETE FROM worker_register
|
||||||
|
WHERE
|
||||||
|
worker_id = $1
|
||||||
|
`
|
||||||
|
|
||||||
|
func (q *Queries) DeregisterWorker(ctx context.Context, workerID uuid.UUID) error {
|
||||||
|
_, err := q.db.Exec(ctx, deregisterWorker, workerID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
const getWorkers = `-- name: GetWorkers :many
|
const getWorkers = `-- name: GetWorkers :many
|
||||||
SELECT
|
SELECT
|
||||||
worker_id
|
worker_id
|
||||||
|
@ -24,6 +24,9 @@ type Worker struct {
|
|||||||
workProcessor workProcessor
|
workProcessor workProcessor
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
|
|
||||||
|
heartBeatCancel context.CancelFunc
|
||||||
|
heartBeatCtx context.Context
|
||||||
|
|
||||||
capacity uint
|
capacity uint
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,6 +59,67 @@ func (w *Worker) Setup(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
w.heartBeatCtx, w.heartBeatCancel = context.WithCancel(context.Background())
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(time.Second * 5)
|
||||||
|
errorCount := 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-w.heartBeatCtx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
if err := w.updateHeartBeat(w.heartBeatCtx); err != nil {
|
||||||
|
if errorCount >= 5 {
|
||||||
|
panic(fmt.Errorf("worker failed to register heartbeat for a long time, panicing..., err: %w", err))
|
||||||
|
}
|
||||||
|
errorCount += 1
|
||||||
|
} else {
|
||||||
|
errorCount = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) Start(ctx context.Context) error {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-w.heartBeatCtx.Done():
|
||||||
|
return nil
|
||||||
|
// case <-ctx.Done():
|
||||||
|
// return nil
|
||||||
|
default:
|
||||||
|
if err := w.processWorkQueue(ctx); err != nil {
|
||||||
|
// FIXME: dead letter item, right now we just log and continue
|
||||||
|
|
||||||
|
w.logger.WarnContext(ctx, "failed to handle work item", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) Close(ctx context.Context) error {
|
||||||
|
if w.heartBeatCancel != nil {
|
||||||
|
w.heartBeatCancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
if w.heartBeatCtx != nil {
|
||||||
|
<-w.heartBeatCtx.Done()
|
||||||
|
|
||||||
|
repo := repositories.New(w.db)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
w.logger.InfoContext(ctx, "deregistering worker", "worker_id", w.workerID)
|
||||||
|
if err := repo.DeregisterWorker(ctx, w.workerID); err != nil {
|
||||||
|
return fmt.Errorf("failed to deregister worker: %s, err: %w", w.workerID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,47 +163,6 @@ func (w *Worker) GetWorkers(ctx context.Context) (*Workers, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Worker) Start(ctx context.Context) error {
|
|
||||||
heartBeatCtx, heartBeatCancel := context.WithCancel(context.Background())
|
|
||||||
go func() {
|
|
||||||
ticker := time.NewTicker(time.Second * 5)
|
|
||||||
errorCount := 0
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-heartBeatCtx.Done():
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
if err := w.updateHeartBeat(heartBeatCtx); err != nil {
|
|
||||||
if errorCount >= 5 {
|
|
||||||
panic(fmt.Errorf("worker failed to register heartbeat for a long time, panicing..., err: %w", err))
|
|
||||||
}
|
|
||||||
errorCount += 1
|
|
||||||
} else {
|
|
||||||
errorCount = 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
heartBeatCancel()
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
if err := w.processWorkQueue(ctx); err != nil {
|
|
||||||
// FIXME: dead letter item, right now we just log and continue
|
|
||||||
|
|
||||||
w.logger.WarnContext(ctx, "failed to handle work item", "error", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) updateHeartBeat(ctx context.Context) error {
|
func (w *Worker) updateHeartBeat(ctx context.Context) error {
|
||||||
repo := repositories.New(w.db)
|
repo := repositories.New(w.db)
|
||||||
|
|
||||||
|
@ -40,6 +40,8 @@ func (w *WorkProcessor) ProcessNext(ctx context.Context, workerID uuid.UUID) err
|
|||||||
return fmt.Errorf("failed to start processing items: %w", err)
|
return fmt.Errorf("failed to start processing items: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
|
||||||
if err := w.workscheduler.Archive(ctx, *schedule); err != nil {
|
if err := w.workscheduler.Archive(ctx, *schedule); err != nil {
|
||||||
return fmt.Errorf("failed to archive item: %w", err)
|
return fmt.Errorf("failed to archive item: %w", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user