Compare commits

...

2 Commits

Author SHA1 Message Date
cuddle-please
072e908cf3 chore(release): 0.1.0
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2025-01-18 11:30:36 +00:00
1d4a72fd5f
feat: prune old workers
All checks were successful
continuous-integration/drone/push Build is passing
2025-01-18 12:30:15 +01:00
8 changed files with 73 additions and 3 deletions

40
CHANGELOG.md Normal file
View File

@ -0,0 +1,40 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.1.0] - 2025-01-18
### Added
- prune old workers
- deregister worker on close
- add worker distributor and model registry
- enable worker process
- add migration
- add executor (#3)
Adds an executor which can process and dispatch events to a set of workers.
Co-authored-by: kjuulh <contact@kjuulh.io>
Co-committed-by: kjuulh <contact@kjuulh.io>
- add basic main.go
- add default
### Fixed
- use dedicated connection for scheduler process
- *(deps)* update module gitlab.com/greyxor/slogor to v1.6.1
- orbis demo
### Other
- add more specific log for when leader is acquired
- add basic leader election on top of postgres
- add orbis demo
- add basic scheduler
- add utility scripts
- add utility scripts
- add logger
- add cmd

View File

@ -26,7 +26,7 @@ func (a *App) Logger() *slog.Logger {
} }
func (a *App) Scheduler() *scheduler.Scheduler { func (a *App) Scheduler() *scheduler.Scheduler {
return scheduler.NewScheduler(a.logger.With("component", "scheduler"), Postgres(), a.Executor()) return scheduler.NewScheduler(a.logger.With("component", "scheduler"), Postgres(), a.Executor(), a.Worker())
} }
func (a *App) Executor() *executor.Executor { func (a *App) Executor() *executor.Executor {

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"git.front.kjuulh.io/kjuulh/orbis/internal/executor" "git.front.kjuulh.io/kjuulh/orbis/internal/executor"
"git.front.kjuulh.io/kjuulh/orbis/internal/worker"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
) )
@ -17,13 +18,15 @@ type Scheduler struct {
logger *slog.Logger logger *slog.Logger
db *pgxpool.Pool db *pgxpool.Pool
executor *executor.Executor executor *executor.Executor
worker *worker.Worker
} }
func NewScheduler(logger *slog.Logger, db *pgxpool.Pool, executor *executor.Executor) *Scheduler { func NewScheduler(logger *slog.Logger, db *pgxpool.Pool, executor *executor.Executor, worker *worker.Worker) *Scheduler {
return &Scheduler{ return &Scheduler{
logger: logger, logger: logger,
db: db, db: db,
executor: executor, executor: executor,
worker: worker,
} }
} }
@ -96,6 +99,10 @@ func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) {
} }
func (s *Scheduler) process(ctx context.Context) error { func (s *Scheduler) process(ctx context.Context) error {
if err := s.worker.Prune(ctx); err != nil {
return fmt.Errorf("failed to prune error: %w", err)
}
if err := s.executor.DispatchEvents(ctx); err != nil { if err := s.executor.DispatchEvents(ctx); err != nil {
return fmt.Errorf("failed to dispatch events: %w", err) return fmt.Errorf("failed to dispatch events: %w", err)
} }

View File

@ -26,3 +26,8 @@ WHERE
DELETE FROM worker_register DELETE FROM worker_register
WHERE WHERE
worker_id = $1; worker_id = $1;
-- name: PruneWorker :exec
DELETE FROM worker_register
WHERE
heart_beat <= now() - INTERVAL '10 minutes';

View File

@ -14,6 +14,7 @@ type Querier interface {
DeregisterWorker(ctx context.Context, workerID uuid.UUID) error DeregisterWorker(ctx context.Context, workerID uuid.UUID) error
GetWorkers(ctx context.Context) ([]*GetWorkersRow, error) GetWorkers(ctx context.Context) ([]*GetWorkersRow, error)
Ping(ctx context.Context) (int32, error) Ping(ctx context.Context) (int32, error)
PruneWorker(ctx context.Context) error
RegisterWorker(ctx context.Context, arg *RegisterWorkerParams) error RegisterWorker(ctx context.Context, arg *RegisterWorkerParams) error
UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error
} }

View File

@ -66,6 +66,17 @@ func (q *Queries) Ping(ctx context.Context) (int32, error) {
return column_1, err return column_1, err
} }
const pruneWorker = `-- name: PruneWorker :exec
DELETE FROM worker_register
WHERE
heart_beat <= now() - INTERVAL '10 minutes'
`
func (q *Queries) PruneWorker(ctx context.Context) error {
_, err := q.db.Exec(ctx, pruneWorker)
return err
}
const registerWorker = `-- name: RegisterWorker :exec const registerWorker = `-- name: RegisterWorker :exec
INSERT INTO worker_register (worker_id, capacity) INSERT INTO worker_register (worker_id, capacity)
VALUES ( VALUES (

View File

@ -173,3 +173,9 @@ func (w *Worker) updateHeartBeat(ctx context.Context) error {
func (w *Worker) processWorkQueue(ctx context.Context) error { func (w *Worker) processWorkQueue(ctx context.Context) error {
return w.workProcessor.ProcessNext(ctx, w.workerID) return w.workProcessor.ProcessNext(ctx, w.workerID)
} }
func (w *Worker) Prune(ctx context.Context) error {
repo := repositories.New(w.db)
return repo.PruneWorker(ctx)
}

View File

@ -40,7 +40,7 @@ func (w *WorkProcessor) ProcessNext(ctx context.Context, workerID uuid.UUID) err
return fmt.Errorf("failed to start processing items: %w", err) return fmt.Errorf("failed to start processing items: %w", err)
} }
time.Sleep(10 * time.Second) time.Sleep(time.Millisecond * 10)
if err := w.workscheduler.Archive(ctx, *schedule); err != nil { if err := w.workscheduler.Archive(ctx, *schedule); err != nil {
return fmt.Errorf("failed to archive item: %w", err) return fmt.Errorf("failed to archive item: %w", err)