Compare commits
1 Commits
f2b4ee1d4f
...
3359326061
Author | SHA1 | Date | |
---|---|---|---|
|
3359326061 |
@ -6,10 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
## [0.1.0] - 2025-01-09
|
## [0.1.0] - 2025-01-06
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- add executor (#3)
|
|
||||||
- add basic main.go
|
- add basic main.go
|
||||||
- add default
|
- add default
|
||||||
|
|
||||||
|
@ -3,7 +3,6 @@ package app
|
|||||||
import (
|
import (
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
|
||||||
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
|
|
||||||
"git.front.kjuulh.io/kjuulh/orbis/internal/scheduler"
|
"git.front.kjuulh.io/kjuulh/orbis/internal/scheduler"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -22,9 +21,5 @@ func (a *App) Logger() *slog.Logger {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) Scheduler() *scheduler.Scheduler {
|
func (a *App) Scheduler() *scheduler.Scheduler {
|
||||||
return scheduler.NewScheduler(a.logger.With("component", "scheduler"), Postgres(), a.Executor())
|
return scheduler.NewScheduler(a.logger, Postgres())
|
||||||
}
|
|
||||||
|
|
||||||
func (a *App) Executor() *executor.Executor {
|
|
||||||
return executor.NewExecutor(a.logger.With("component", "executor"))
|
|
||||||
}
|
}
|
||||||
|
@ -1,22 +0,0 @@
|
|||||||
package executor
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"log/slog"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Executor struct {
|
|
||||||
logger *slog.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewExecutor(logger *slog.Logger) *Executor {
|
|
||||||
return &Executor{
|
|
||||||
logger: logger,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Executor) DispatchEvents(ctx context.Context) error {
|
|
||||||
e.logger.InfoContext(ctx, "dispatching events")
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -8,21 +8,18 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
|
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Scheduler struct {
|
type Scheduler struct {
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
db *pgx.Conn
|
db *pgx.Conn
|
||||||
executor *executor.Executor
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewScheduler(logger *slog.Logger, db *pgx.Conn, executor *executor.Executor) *Scheduler {
|
func NewScheduler(logger *slog.Logger, db *pgx.Conn) *Scheduler {
|
||||||
return &Scheduler{
|
return &Scheduler{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
db: db,
|
db: db,
|
||||||
executor: executor,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -37,14 +34,12 @@ func (s *Scheduler) Execute(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ticker := time.NewTicker(5 * time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
s.logger.Info("gracefully shutting down elected scheduler")
|
s.logger.Info("gracefully shutting down elected scheduler")
|
||||||
return nil
|
return nil
|
||||||
case <-ticker.C:
|
default:
|
||||||
if err := s.process(ctx); err != nil {
|
if err := s.process(ctx); err != nil {
|
||||||
return fmt.Errorf("scheduler failed: %w", err)
|
return fmt.Errorf("scheduler failed: %w", err)
|
||||||
}
|
}
|
||||||
@ -57,6 +52,7 @@ func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) {
|
|||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return false, nil
|
return false, nil
|
||||||
|
|
||||||
default:
|
default:
|
||||||
var acquiredLock bool
|
var acquiredLock bool
|
||||||
if err := s.db.QueryRow(ctx, "SELECT pg_try_advisory_lock(1234)").Scan(&acquiredLock); err != nil {
|
if err := s.db.QueryRow(ctx, "SELECT pg_try_advisory_lock(1234)").Scan(&acquiredLock); err != nil {
|
||||||
@ -81,9 +77,10 @@ func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scheduler) process(ctx context.Context) error {
|
func (s *Scheduler) process(ctx context.Context) error {
|
||||||
if err := s.executor.DispatchEvents(ctx); err != nil {
|
s.logger.Debug("scheduler processing items")
|
||||||
return fmt.Errorf("failed to dispatch events: %w", err)
|
|
||||||
}
|
// FIXME: simulate work
|
||||||
|
time.Sleep(time.Second * 2)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user