From 1d4a72fd5f98364e8b73132adf01bb36e899c372 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sat, 18 Jan 2025 12:30:13 +0100 Subject: [PATCH] feat: prune old workers --- internal/app/app.go | 2 +- internal/scheduler/scheduler.go | 9 ++++++++- internal/worker/queries.sql | 5 +++++ internal/worker/repositories/querier.go | 1 + internal/worker/repositories/queries.sql.go | 11 +++++++++++ internal/worker/worker.go | 6 ++++++ internal/workprocessor/workprocessor.go | 2 +- 7 files changed, 33 insertions(+), 3 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index d46cd42..9f35c01 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -26,7 +26,7 @@ func (a *App) Logger() *slog.Logger { } func (a *App) Scheduler() *scheduler.Scheduler { - return scheduler.NewScheduler(a.logger.With("component", "scheduler"), Postgres(), a.Executor()) + return scheduler.NewScheduler(a.logger.With("component", "scheduler"), Postgres(), a.Executor(), a.Worker()) } func (a *App) Executor() *executor.Executor { diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 8e06f1c..b2afe41 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -9,6 +9,7 @@ import ( "time" "git.front.kjuulh.io/kjuulh/orbis/internal/executor" + "git.front.kjuulh.io/kjuulh/orbis/internal/worker" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) @@ -17,13 +18,15 @@ type Scheduler struct { logger *slog.Logger db *pgxpool.Pool executor *executor.Executor + worker *worker.Worker } -func NewScheduler(logger *slog.Logger, db *pgxpool.Pool, executor *executor.Executor) *Scheduler { +func NewScheduler(logger *slog.Logger, db *pgxpool.Pool, executor *executor.Executor, worker *worker.Worker) *Scheduler { return &Scheduler{ logger: logger, db: db, executor: executor, + worker: worker, } } @@ -96,6 +99,10 @@ func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) { } func (s *Scheduler) process(ctx context.Context) error { + if err := s.worker.Prune(ctx); err != nil { + return fmt.Errorf("failed to prune error: %w", err) + } + if err := s.executor.DispatchEvents(ctx); err != nil { return fmt.Errorf("failed to dispatch events: %w", err) } diff --git a/internal/worker/queries.sql b/internal/worker/queries.sql index 5430f3d..1cf840f 100644 --- a/internal/worker/queries.sql +++ b/internal/worker/queries.sql @@ -26,3 +26,8 @@ WHERE DELETE FROM worker_register WHERE worker_id = $1; + +-- name: PruneWorker :exec +DELETE FROM worker_register +WHERE + heart_beat <= now() - INTERVAL '10 minutes'; diff --git a/internal/worker/repositories/querier.go b/internal/worker/repositories/querier.go index a980c46..a4b3a62 100644 --- a/internal/worker/repositories/querier.go +++ b/internal/worker/repositories/querier.go @@ -14,6 +14,7 @@ type Querier interface { DeregisterWorker(ctx context.Context, workerID uuid.UUID) error GetWorkers(ctx context.Context) ([]*GetWorkersRow, error) Ping(ctx context.Context) (int32, error) + PruneWorker(ctx context.Context) error RegisterWorker(ctx context.Context, arg *RegisterWorkerParams) error UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error } diff --git a/internal/worker/repositories/queries.sql.go b/internal/worker/repositories/queries.sql.go index a6aecab..da00e53 100644 --- a/internal/worker/repositories/queries.sql.go +++ b/internal/worker/repositories/queries.sql.go @@ -66,6 +66,17 @@ func (q *Queries) Ping(ctx context.Context) (int32, error) { return column_1, err } +const pruneWorker = `-- name: PruneWorker :exec +DELETE FROM worker_register +WHERE + heart_beat <= now() - INTERVAL '10 minutes' +` + +func (q *Queries) PruneWorker(ctx context.Context) error { + _, err := q.db.Exec(ctx, pruneWorker) + return err +} + const registerWorker = `-- name: RegisterWorker :exec INSERT INTO worker_register (worker_id, capacity) VALUES ( diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 29908c0..68ca8b5 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -173,3 +173,9 @@ func (w *Worker) updateHeartBeat(ctx context.Context) error { func (w *Worker) processWorkQueue(ctx context.Context) error { return w.workProcessor.ProcessNext(ctx, w.workerID) } + +func (w *Worker) Prune(ctx context.Context) error { + repo := repositories.New(w.db) + + return repo.PruneWorker(ctx) +} diff --git a/internal/workprocessor/workprocessor.go b/internal/workprocessor/workprocessor.go index 8aef83b..f71388c 100644 --- a/internal/workprocessor/workprocessor.go +++ b/internal/workprocessor/workprocessor.go @@ -40,7 +40,7 @@ func (w *WorkProcessor) ProcessNext(ctx context.Context, workerID uuid.UUID) err return fmt.Errorf("failed to start processing items: %w", err) } - time.Sleep(10 * time.Second) + time.Sleep(time.Millisecond * 10) if err := w.workscheduler.Archive(ctx, *schedule); err != nil { return fmt.Errorf("failed to archive item: %w", err)