feat: prune old workers
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-01-18 12:30:13 +01:00
parent b6e9882855
commit 1d4a72fd5f
7 changed files with 33 additions and 3 deletions

View File

@@ -26,3 +26,8 @@ WHERE
DELETE FROM worker_register
WHERE
worker_id = $1;
-- name: PruneWorker :exec
DELETE FROM worker_register
WHERE
heart_beat <= now() - INTERVAL '10 minutes';

View File

@@ -14,6 +14,7 @@ type Querier interface {
DeregisterWorker(ctx context.Context, workerID uuid.UUID) error
GetWorkers(ctx context.Context) ([]*GetWorkersRow, error)
Ping(ctx context.Context) (int32, error)
PruneWorker(ctx context.Context) error
RegisterWorker(ctx context.Context, arg *RegisterWorkerParams) error
UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error
}

View File

@@ -66,6 +66,17 @@ func (q *Queries) Ping(ctx context.Context) (int32, error) {
return column_1, err
}
const pruneWorker = `-- name: PruneWorker :exec
DELETE FROM worker_register
WHERE
heart_beat <= now() - INTERVAL '10 minutes'
`
func (q *Queries) PruneWorker(ctx context.Context) error {
_, err := q.db.Exec(ctx, pruneWorker)
return err
}
const registerWorker = `-- name: RegisterWorker :exec
INSERT INTO worker_register (worker_id, capacity)
VALUES (

View File

@@ -173,3 +173,9 @@ func (w *Worker) updateHeartBeat(ctx context.Context) error {
func (w *Worker) processWorkQueue(ctx context.Context) error {
return w.workProcessor.ProcessNext(ctx, w.workerID)
}
func (w *Worker) Prune(ctx context.Context) error {
repo := repositories.New(w.db)
return repo.PruneWorker(ctx)
}