Compare commits

..

1 Commits

Author SHA1 Message Date
cuddle-please
3359326061 chore(release): 0.1.0
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
2025-01-06 22:21:14 +00:00
4 changed files with 13 additions and 44 deletions

View File

@ -6,10 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.1.0] - 2025-01-09 ## [0.1.0] - 2025-01-06
### Added ### Added
- add executor (#3)
- add basic main.go - add basic main.go
- add default - add default

View File

@ -3,7 +3,6 @@ package app
import ( import (
"log/slog" "log/slog"
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
"git.front.kjuulh.io/kjuulh/orbis/internal/scheduler" "git.front.kjuulh.io/kjuulh/orbis/internal/scheduler"
) )
@ -22,9 +21,5 @@ func (a *App) Logger() *slog.Logger {
} }
func (a *App) Scheduler() *scheduler.Scheduler { func (a *App) Scheduler() *scheduler.Scheduler {
return scheduler.NewScheduler(a.logger.With("component", "scheduler"), Postgres(), a.Executor()) return scheduler.NewScheduler(a.logger, Postgres())
}
func (a *App) Executor() *executor.Executor {
return executor.NewExecutor(a.logger.With("component", "executor"))
} }

View File

@ -1,22 +0,0 @@
package executor
import (
"context"
"log/slog"
)
type Executor struct {
logger *slog.Logger
}
func NewExecutor(logger *slog.Logger) *Executor {
return &Executor{
logger: logger,
}
}
func (e *Executor) DispatchEvents(ctx context.Context) error {
e.logger.InfoContext(ctx, "dispatching events")
return nil
}

View File

@ -8,21 +8,18 @@ import (
"math/rand" "math/rand"
"time" "time"
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
) )
type Scheduler struct { type Scheduler struct {
logger *slog.Logger logger *slog.Logger
db *pgx.Conn db *pgx.Conn
executor *executor.Executor
} }
func NewScheduler(logger *slog.Logger, db *pgx.Conn, executor *executor.Executor) *Scheduler { func NewScheduler(logger *slog.Logger, db *pgx.Conn) *Scheduler {
return &Scheduler{ return &Scheduler{
logger: logger, logger: logger,
db: db, db: db,
executor: executor,
} }
} }
@ -37,14 +34,12 @@ func (s *Scheduler) Execute(ctx context.Context) error {
return nil return nil
} }
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
s.logger.Info("gracefully shutting down elected scheduler") s.logger.Info("gracefully shutting down elected scheduler")
return nil return nil
case <-ticker.C: default:
if err := s.process(ctx); err != nil { if err := s.process(ctx); err != nil {
return fmt.Errorf("scheduler failed: %w", err) return fmt.Errorf("scheduler failed: %w", err)
} }
@ -57,6 +52,7 @@ func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return false, nil return false, nil
default: default:
var acquiredLock bool var acquiredLock bool
if err := s.db.QueryRow(ctx, "SELECT pg_try_advisory_lock(1234)").Scan(&acquiredLock); err != nil { if err := s.db.QueryRow(ctx, "SELECT pg_try_advisory_lock(1234)").Scan(&acquiredLock); err != nil {
@ -81,9 +77,10 @@ func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) {
} }
func (s *Scheduler) process(ctx context.Context) error { func (s *Scheduler) process(ctx context.Context) error {
if err := s.executor.DispatchEvents(ctx); err != nil { s.logger.Debug("scheduler processing items")
return fmt.Errorf("failed to dispatch events: %w", err)
} // FIXME: simulate work
time.Sleep(time.Second * 2)
return nil return nil
} }