From 1cf9d234917afcacc226c11d569e75ac52241081 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sat, 18 Jan 2025 13:18:12 +0100 Subject: [PATCH] feat: move schedules to registered workers --- internal/executor/executor.go | 6 ++ internal/scheduler/scheduler.go | 2 +- internal/worker/queries.sql | 2 +- internal/worker/repositories/queries.sql.go | 2 +- internal/workscheduler/queries.sql | 20 ++++++ .../workscheduler/repositories/querier.go | 2 + .../workscheduler/repositories/queries.sql.go | 65 +++++++++++++++++++ internal/workscheduler/workscheduler.go | 44 +++++++++++++ 8 files changed, 140 insertions(+), 3 deletions(-) diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 9e8a9fe..724f0a2 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -39,6 +39,7 @@ func NewExecutor( } func (e *Executor) DispatchEvents(ctx context.Context) error { + e.logger.InfoContext(ctx, "dispatching events") start := time.Now().Add(-time.Second * 30) @@ -54,6 +55,11 @@ func (e *Executor) DispatchEvents(ctx context.Context) error { return fmt.Errorf("failed to find workers: %w", err) } + e.logger.InfoContext(ctx, "moving unattended events") + if err := e.workerscheduler.GetUnattended(ctx, registeredWorkers); err != nil { + return fmt.Errorf("failed to move unattended events: %w", err) + } + workers, err := e.workerscheduler.GetWorkers(ctx, registeredWorkers) if err != nil { return fmt.Errorf("failed to find workers: %w", err) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index b2afe41..d6cccd4 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -49,7 +49,7 @@ func (s *Scheduler) Execute(ctx context.Context) error { return nil } - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { diff --git a/internal/worker/queries.sql b/internal/worker/queries.sql index 1cf840f..da4abc3 100644 --- a/internal/worker/queries.sql +++ b/internal/worker/queries.sql @@ -30,4 +30,4 @@ WHERE -- name: PruneWorker :exec DELETE FROM worker_register WHERE - heart_beat <= now() - INTERVAL '10 minutes'; + heart_beat <= now() - INTERVAL '1 minutes'; diff --git a/internal/worker/repositories/queries.sql.go b/internal/worker/repositories/queries.sql.go index da00e53..9ad9bdc 100644 --- a/internal/worker/repositories/queries.sql.go +++ b/internal/worker/repositories/queries.sql.go @@ -69,7 +69,7 @@ func (q *Queries) Ping(ctx context.Context) (int32, error) { const pruneWorker = `-- name: PruneWorker :exec DELETE FROM worker_register WHERE - heart_beat <= now() - INTERVAL '10 minutes' + heart_beat <= now() - INTERVAL '1 minutes' ` func (q *Queries) PruneWorker(ctx context.Context) error { diff --git a/internal/workscheduler/queries.sql b/internal/workscheduler/queries.sql index fd5ec25..0b0a5d8 100644 --- a/internal/workscheduler/queries.sql +++ b/internal/workscheduler/queries.sql @@ -54,3 +54,23 @@ SET WHERE schedule_id = $1; +-- name: GetUnattended :many +SELECT + * +FROM + work_schedule +WHERE + worker_id NOT IN (SELECT unnest(@worker_ids::uuid[])) + AND state <> 'archived' + --AND updated_at <= now() - INTERVAL '10 minutes' +ORDER BY updated_at DESC +LIMIT @amount::integer; + +-- name: UpdateSchdule :exec +UPDATE work_schedule +SET + state = 'pending' + , worker_id = $1 + , updated_at = now() +WHERE + schedule_id = $2; diff --git a/internal/workscheduler/repositories/querier.go b/internal/workscheduler/repositories/querier.go index f5c000e..f582a24 100644 --- a/internal/workscheduler/repositories/querier.go +++ b/internal/workscheduler/repositories/querier.go @@ -14,9 +14,11 @@ type Querier interface { Archive(ctx context.Context, scheduleID uuid.UUID) error GetCurrentQueueSize(ctx context.Context, workerID uuid.UUID) (int64, error) GetNext(ctx context.Context, workerID uuid.UUID) (*WorkSchedule, error) + GetUnattended(ctx context.Context, arg *GetUnattendedParams) ([]*WorkSchedule, error) InsertQueueItem(ctx context.Context, arg *InsertQueueItemParams) error Ping(ctx context.Context) (int32, error) StartProcessing(ctx context.Context, scheduleID uuid.UUID) error + UpdateSchdule(ctx context.Context, arg *UpdateSchduleParams) error } var _ Querier = (*Queries)(nil) diff --git a/internal/workscheduler/repositories/queries.sql.go b/internal/workscheduler/repositories/queries.sql.go index 2114868..41b364a 100644 --- a/internal/workscheduler/repositories/queries.sql.go +++ b/internal/workscheduler/repositories/queries.sql.go @@ -68,6 +68,51 @@ func (q *Queries) GetNext(ctx context.Context, workerID uuid.UUID) (*WorkSchedul return &i, err } +const getUnattended = `-- name: GetUnattended :many +SELECT + schedule_id, worker_id, start_run, end_run, updated_at, state +FROM + work_schedule +WHERE + worker_id NOT IN (SELECT unnest($1::uuid[])) + AND state <> 'archived' + --AND updated_at <= now() - INTERVAL '10 minutes' +ORDER BY updated_at DESC +LIMIT $2::integer +` + +type GetUnattendedParams struct { + WorkerIds []uuid.UUID `json:"worker_ids"` + Amount int32 `json:"amount"` +} + +func (q *Queries) GetUnattended(ctx context.Context, arg *GetUnattendedParams) ([]*WorkSchedule, error) { + rows, err := q.db.Query(ctx, getUnattended, arg.WorkerIds, arg.Amount) + if err != nil { + return nil, err + } + defer rows.Close() + items := []*WorkSchedule{} + for rows.Next() { + var i WorkSchedule + if err := rows.Scan( + &i.ScheduleID, + &i.WorkerID, + &i.StartRun, + &i.EndRun, + &i.UpdatedAt, + &i.State, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const insertQueueItem = `-- name: InsertQueueItem :exec INSERT INTO work_schedule ( @@ -127,3 +172,23 @@ func (q *Queries) StartProcessing(ctx context.Context, scheduleID uuid.UUID) err _, err := q.db.Exec(ctx, startProcessing, scheduleID) return err } + +const updateSchdule = `-- name: UpdateSchdule :exec +UPDATE work_schedule +SET + state = 'pending' + , worker_id = $1 + , updated_at = now() +WHERE + schedule_id = $2 +` + +type UpdateSchduleParams struct { + WorkerID uuid.UUID `json:"worker_id"` + ScheduleID uuid.UUID `json:"schedule_id"` +} + +func (q *Queries) UpdateSchdule(ctx context.Context, arg *UpdateSchduleParams) error { + _, err := q.db.Exec(ctx, updateSchdule, arg.WorkerID, arg.ScheduleID) + return err +} diff --git a/internal/workscheduler/workscheduler.go b/internal/workscheduler/workscheduler.go index 83dbff3..545cbc2 100644 --- a/internal/workscheduler/workscheduler.go +++ b/internal/workscheduler/workscheduler.go @@ -177,3 +177,47 @@ func (w *WorkScheduler) Archive(ctx context.Context, scheduleID uuid.UUID) error return repo.Archive(ctx, scheduleID) } + +func (w *WorkScheduler) GetUnattended(ctx context.Context, registeredWorkers *worker.Workers) error { + if len(registeredWorkers.Instances) == 0 { + return nil + } + + repo := repositories.New(w.db) + + schedules, err := repo.GetUnattended(ctx, &repositories.GetUnattendedParams{ + WorkerIds: workerIDs(registeredWorkers), + Amount: 100, + }) + if err != nil { + return fmt.Errorf("failed to get unattended workers: %w", err) + } + + for i, schedule := range schedules { + worker := registeredWorkers.Instances[i%len(registeredWorkers.Instances)].WorkerID + + w.logger.InfoContext(ctx, "dispatching schedule for worker", "worker", worker, "schedule", schedule.ScheduleID) + + if err := repo.UpdateSchdule( + ctx, + &repositories.UpdateSchduleParams{ + WorkerID: worker, + ScheduleID: schedule.ScheduleID, + }, + ); err != nil { + return fmt.Errorf("failed to update schedule: %w", err) + } + } + + return nil +} + +func workerIDs(registeredWorkers *worker.Workers) []uuid.UUID { + uuids := make([]uuid.UUID, 0, len(registeredWorkers.Instances)) + + for _, registeredWorker := range registeredWorkers.Instances { + uuids = append(uuids, registeredWorker.WorkerID) + } + + return uuids +}