Compare commits
2 Commits
1819b214a6
...
072e908cf3
Author | SHA1 | Date | |
---|---|---|---|
|
072e908cf3 | ||
1d4a72fd5f |
40
CHANGELOG.md
Normal file
40
CHANGELOG.md
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
# Changelog
|
||||||
|
All notable changes to this project will be documented in this file.
|
||||||
|
|
||||||
|
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||||
|
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.1.0] - 2025-01-18
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- prune old workers
|
||||||
|
- deregister worker on close
|
||||||
|
- add worker distributor and model registry
|
||||||
|
- enable worker process
|
||||||
|
- add migration
|
||||||
|
- add executor (#3)
|
||||||
|
Adds an executor which can process and dispatch events to a set of workers.
|
||||||
|
Co-authored-by: kjuulh <contact@kjuulh.io>
|
||||||
|
Co-committed-by: kjuulh <contact@kjuulh.io>
|
||||||
|
- add basic main.go
|
||||||
|
- add default
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- use dedicated connection for scheduler process
|
||||||
|
- *(deps)* update module gitlab.com/greyxor/slogor to v1.6.1
|
||||||
|
- orbis demo
|
||||||
|
|
||||||
|
### Other
|
||||||
|
- add more specific log for when leader is acquired
|
||||||
|
- add basic leader election on top of postgres
|
||||||
|
|
||||||
|
- add orbis demo
|
||||||
|
|
||||||
|
- add basic scheduler
|
||||||
|
|
||||||
|
- add utility scripts
|
||||||
|
- add utility scripts
|
||||||
|
- add logger
|
||||||
|
- add cmd
|
@ -26,7 +26,7 @@ func (a *App) Logger() *slog.Logger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) Scheduler() *scheduler.Scheduler {
|
func (a *App) Scheduler() *scheduler.Scheduler {
|
||||||
return scheduler.NewScheduler(a.logger.With("component", "scheduler"), Postgres(), a.Executor())
|
return scheduler.NewScheduler(a.logger.With("component", "scheduler"), Postgres(), a.Executor(), a.Worker())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) Executor() *executor.Executor {
|
func (a *App) Executor() *executor.Executor {
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
|
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
|
||||||
|
"git.front.kjuulh.io/kjuulh/orbis/internal/worker"
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
)
|
)
|
||||||
@ -17,13 +18,15 @@ type Scheduler struct {
|
|||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
db *pgxpool.Pool
|
db *pgxpool.Pool
|
||||||
executor *executor.Executor
|
executor *executor.Executor
|
||||||
|
worker *worker.Worker
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewScheduler(logger *slog.Logger, db *pgxpool.Pool, executor *executor.Executor) *Scheduler {
|
func NewScheduler(logger *slog.Logger, db *pgxpool.Pool, executor *executor.Executor, worker *worker.Worker) *Scheduler {
|
||||||
return &Scheduler{
|
return &Scheduler{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
db: db,
|
db: db,
|
||||||
executor: executor,
|
executor: executor,
|
||||||
|
worker: worker,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,6 +99,10 @@ func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scheduler) process(ctx context.Context) error {
|
func (s *Scheduler) process(ctx context.Context) error {
|
||||||
|
if err := s.worker.Prune(ctx); err != nil {
|
||||||
|
return fmt.Errorf("failed to prune error: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := s.executor.DispatchEvents(ctx); err != nil {
|
if err := s.executor.DispatchEvents(ctx); err != nil {
|
||||||
return fmt.Errorf("failed to dispatch events: %w", err)
|
return fmt.Errorf("failed to dispatch events: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -26,3 +26,8 @@ WHERE
|
|||||||
DELETE FROM worker_register
|
DELETE FROM worker_register
|
||||||
WHERE
|
WHERE
|
||||||
worker_id = $1;
|
worker_id = $1;
|
||||||
|
|
||||||
|
-- name: PruneWorker :exec
|
||||||
|
DELETE FROM worker_register
|
||||||
|
WHERE
|
||||||
|
heart_beat <= now() - INTERVAL '10 minutes';
|
||||||
|
@ -14,6 +14,7 @@ type Querier interface {
|
|||||||
DeregisterWorker(ctx context.Context, workerID uuid.UUID) error
|
DeregisterWorker(ctx context.Context, workerID uuid.UUID) error
|
||||||
GetWorkers(ctx context.Context) ([]*GetWorkersRow, error)
|
GetWorkers(ctx context.Context) ([]*GetWorkersRow, error)
|
||||||
Ping(ctx context.Context) (int32, error)
|
Ping(ctx context.Context) (int32, error)
|
||||||
|
PruneWorker(ctx context.Context) error
|
||||||
RegisterWorker(ctx context.Context, arg *RegisterWorkerParams) error
|
RegisterWorker(ctx context.Context, arg *RegisterWorkerParams) error
|
||||||
UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error
|
UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error
|
||||||
}
|
}
|
||||||
|
@ -66,6 +66,17 @@ func (q *Queries) Ping(ctx context.Context) (int32, error) {
|
|||||||
return column_1, err
|
return column_1, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const pruneWorker = `-- name: PruneWorker :exec
|
||||||
|
DELETE FROM worker_register
|
||||||
|
WHERE
|
||||||
|
heart_beat <= now() - INTERVAL '10 minutes'
|
||||||
|
`
|
||||||
|
|
||||||
|
func (q *Queries) PruneWorker(ctx context.Context) error {
|
||||||
|
_, err := q.db.Exec(ctx, pruneWorker)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
const registerWorker = `-- name: RegisterWorker :exec
|
const registerWorker = `-- name: RegisterWorker :exec
|
||||||
INSERT INTO worker_register (worker_id, capacity)
|
INSERT INTO worker_register (worker_id, capacity)
|
||||||
VALUES (
|
VALUES (
|
||||||
|
@ -173,3 +173,9 @@ func (w *Worker) updateHeartBeat(ctx context.Context) error {
|
|||||||
func (w *Worker) processWorkQueue(ctx context.Context) error {
|
func (w *Worker) processWorkQueue(ctx context.Context) error {
|
||||||
return w.workProcessor.ProcessNext(ctx, w.workerID)
|
return w.workProcessor.ProcessNext(ctx, w.workerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *Worker) Prune(ctx context.Context) error {
|
||||||
|
repo := repositories.New(w.db)
|
||||||
|
|
||||||
|
return repo.PruneWorker(ctx)
|
||||||
|
}
|
||||||
|
@ -40,7 +40,7 @@ func (w *WorkProcessor) ProcessNext(ctx context.Context, workerID uuid.UUID) err
|
|||||||
return fmt.Errorf("failed to start processing items: %w", err)
|
return fmt.Errorf("failed to start processing items: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(time.Millisecond * 10)
|
||||||
|
|
||||||
if err := w.workscheduler.Archive(ctx, *schedule); err != nil {
|
if err := w.workscheduler.Archive(ctx, *schedule); err != nil {
|
||||||
return fmt.Errorf("failed to archive item: %w", err)
|
return fmt.Errorf("failed to archive item: %w", err)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user