From 4275294e690901452153be27b33022292f5857f2 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Thu, 9 Jan 2025 22:48:17 +0100 Subject: [PATCH] feat: add executor --- internal/app/app.go | 7 ++++++- internal/executor/executor.go | 22 ++++++++++++++++++++++ internal/scheduler/scheduler.go | 25 ++++++++++++++----------- 3 files changed, 42 insertions(+), 12 deletions(-) create mode 100644 internal/executor/executor.go diff --git a/internal/app/app.go b/internal/app/app.go index b191252..1e0b8ce 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -3,6 +3,7 @@ package app import ( "log/slog" + "git.front.kjuulh.io/kjuulh/orbis/internal/executor" "git.front.kjuulh.io/kjuulh/orbis/internal/scheduler" ) @@ -21,5 +22,9 @@ func (a *App) Logger() *slog.Logger { } func (a *App) Scheduler() *scheduler.Scheduler { - return scheduler.NewScheduler(a.logger, Postgres()) + return scheduler.NewScheduler(a.logger.With("component", "scheduler"), Postgres(), a.Executor()) +} + +func (a *App) Executor() *executor.Executor { + return executor.NewExecutor(a.logger.With("component", "executor")) } diff --git a/internal/executor/executor.go b/internal/executor/executor.go new file mode 100644 index 0000000..968c943 --- /dev/null +++ b/internal/executor/executor.go @@ -0,0 +1,22 @@ +package executor + +import ( + "context" + "log/slog" +) + +type Executor struct { + logger *slog.Logger +} + +func NewExecutor(logger *slog.Logger) *Executor { + return &Executor{ + logger: logger, + } +} + +func (e *Executor) DispatchEvents(ctx context.Context) error { + e.logger.InfoContext(ctx, "dispatching events") + + return nil +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 2373ce6..49e3d0c 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -8,18 +8,21 @@ import ( "math/rand" "time" + "git.front.kjuulh.io/kjuulh/orbis/internal/executor" "github.com/jackc/pgx/v5" ) type Scheduler struct { - logger *slog.Logger - db *pgx.Conn + logger *slog.Logger + db *pgx.Conn + executor *executor.Executor } -func NewScheduler(logger *slog.Logger, db *pgx.Conn) *Scheduler { +func NewScheduler(logger *slog.Logger, db *pgx.Conn, executor *executor.Executor) *Scheduler { return &Scheduler{ - logger: logger, - db: db, + logger: logger, + db: db, + executor: executor, } } @@ -34,12 +37,14 @@ func (s *Scheduler) Execute(ctx context.Context) error { return nil } + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() for { select { case <-ctx.Done(): s.logger.Info("gracefully shutting down elected scheduler") return nil - default: + case <-ticker.C: if err := s.process(ctx); err != nil { return fmt.Errorf("scheduler failed: %w", err) } @@ -52,7 +57,6 @@ func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) { select { case <-ctx.Done(): return false, nil - default: var acquiredLock bool if err := s.db.QueryRow(ctx, "SELECT pg_try_advisory_lock(1234)").Scan(&acquiredLock); err != nil { @@ -77,10 +81,9 @@ func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) { } func (s *Scheduler) process(ctx context.Context) error { - s.logger.Debug("scheduler processing items") - - // FIXME: simulate work - time.Sleep(time.Second * 2) + if err := s.executor.DispatchEvents(ctx); err != nil { + return fmt.Errorf("failed to dispatch events: %w", err) + } return nil } -- 2.45.2