feat: add executor #3

Merged
kjuulh merged 1 commits from feat/add-executor into main 2025-01-09 22:49:43 +01:00
3 changed files with 42 additions and 12 deletions

View File

@ -3,6 +3,7 @@ package app
import (
"log/slog"
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
"git.front.kjuulh.io/kjuulh/orbis/internal/scheduler"
)
@ -21,5 +22,9 @@ func (a *App) Logger() *slog.Logger {
}
func (a *App) Scheduler() *scheduler.Scheduler {
return scheduler.NewScheduler(a.logger, Postgres())
return scheduler.NewScheduler(a.logger.With("component", "scheduler"), Postgres(), a.Executor())
}
func (a *App) Executor() *executor.Executor {
return executor.NewExecutor(a.logger.With("component", "executor"))
}

View File

@ -0,0 +1,22 @@
package executor
import (
"context"
"log/slog"
)
type Executor struct {
logger *slog.Logger
}
func NewExecutor(logger *slog.Logger) *Executor {
return &Executor{
logger: logger,
}
}
func (e *Executor) DispatchEvents(ctx context.Context) error {
e.logger.InfoContext(ctx, "dispatching events")
return nil
}

View File

@ -8,18 +8,21 @@ import (
"math/rand"
"time"
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
"github.com/jackc/pgx/v5"
)
type Scheduler struct {
logger *slog.Logger
db *pgx.Conn
logger *slog.Logger
db *pgx.Conn
executor *executor.Executor
}
func NewScheduler(logger *slog.Logger, db *pgx.Conn) *Scheduler {
func NewScheduler(logger *slog.Logger, db *pgx.Conn, executor *executor.Executor) *Scheduler {
return &Scheduler{
logger: logger,
db: db,
logger: logger,
db: db,
executor: executor,
}
}
@ -34,12 +37,14 @@ func (s *Scheduler) Execute(ctx context.Context) error {
return nil
}
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
s.logger.Info("gracefully shutting down elected scheduler")
return nil
default:
case <-ticker.C:
if err := s.process(ctx); err != nil {
return fmt.Errorf("scheduler failed: %w", err)
}
@ -52,7 +57,6 @@ func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return false, nil
default:
var acquiredLock bool
if err := s.db.QueryRow(ctx, "SELECT pg_try_advisory_lock(1234)").Scan(&acquiredLock); err != nil {
@ -77,10 +81,9 @@ func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) {
}
func (s *Scheduler) process(ctx context.Context) error {
s.logger.Debug("scheduler processing items")
// FIXME: simulate work
time.Sleep(time.Second * 2)
if err := s.executor.DispatchEvents(ctx); err != nil {
return fmt.Errorf("failed to dispatch events: %w", err)
}
return nil
}