diff --git a/go.mod b/go.mod index 1c5301a..39fbff3 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module git.front.kjuulh.io/kjuulh/orbis go 1.23.4 require ( + github.com/adhocore/gronx v1.19.5 github.com/golang-migrate/migrate/v4 v4.18.1 github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.2 diff --git a/go.sum b/go.sum index 96636fa..faabd9e 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25 github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= +github.com/adhocore/gronx v1.19.5 h1:cwIG4nT1v9DvadxtHBe6MzE+FZ1JDvAUC45U2fl4eSQ= +github.com/adhocore/gronx v1.19.5/go.mod h1:7oUY1WAU8rEJWmAxXR2DN0JaO4gi9khSgKjiRypqteg= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/internal/app/app.go b/internal/app/app.go index 09cf844..d46cd42 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -4,8 +4,11 @@ import ( "log/slog" "git.front.kjuulh.io/kjuulh/orbis/internal/executor" + "git.front.kjuulh.io/kjuulh/orbis/internal/modelschedule" "git.front.kjuulh.io/kjuulh/orbis/internal/scheduler" "git.front.kjuulh.io/kjuulh/orbis/internal/worker" + "git.front.kjuulh.io/kjuulh/orbis/internal/workprocessor" + "git.front.kjuulh.io/kjuulh/orbis/internal/workscheduler" ) type App struct { @@ -27,9 +30,27 @@ func (a *App) Scheduler() *scheduler.Scheduler { } func (a *App) Executor() *executor.Executor { - return executor.NewExecutor(a.logger.With("component", "executor")) + return executor.NewExecutor( + a.logger.With("component", "executor"), + ModelRegistry(), + a.ModelSchedule(), + a.Worker(), + a.WorkScheduler(), + ) } func (a *App) Worker() *worker.Worker { - return worker.NewWorker(Postgres(), a.logger) + return worker.NewWorker(Postgres(), a.logger, a.WorkProcessor()) +} + +func (a *App) WorkScheduler() *workscheduler.WorkScheduler { + return workscheduler.NewWorkScheduler(Postgres(), a.logger) +} + +func (a *App) WorkProcessor() *workprocessor.WorkProcessor { + return workprocessor.NewWorkProcessor(a.WorkScheduler(), a.logger) +} + +func (a *App) ModelSchedule() *modelschedule.ModelSchedule { + return modelschedule.NewModelSchedule(a.logger, Postgres()) } diff --git a/internal/app/model_registry.go b/internal/app/model_registry.go new file mode 100644 index 0000000..d700437 --- /dev/null +++ b/internal/app/model_registry.go @@ -0,0 +1,10 @@ +package app + +import ( + "git.front.kjuulh.io/kjuulh/orbis/internal/modelregistry" + "git.front.kjuulh.io/kjuulh/orbis/internal/utilities" +) + +var ModelRegistry = utilities.Singleton(func() (*modelregistry.ModelRegistry, error) { + return modelregistry.NewModelRegistry(), nil +}) diff --git a/internal/executor/executor.go b/internal/executor/executor.go index e61a5b3..9e8a9fe 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -2,26 +2,84 @@ package executor import ( "context" + "fmt" "log/slog" + "time" + + "git.front.kjuulh.io/kjuulh/orbis/internal/modelregistry" + "git.front.kjuulh.io/kjuulh/orbis/internal/modelschedule" + "git.front.kjuulh.io/kjuulh/orbis/internal/worker" + "git.front.kjuulh.io/kjuulh/orbis/internal/workscheduler" ) type Executor struct { logger *slog.Logger + + modelRegistry *modelregistry.ModelRegistry + modelSchedule *modelschedule.ModelSchedule + worker *worker.Worker + workerscheduler *workscheduler.WorkScheduler } -func NewExecutor(logger *slog.Logger) *Executor { +func NewExecutor( + logger *slog.Logger, + modelRegistry *modelregistry.ModelRegistry, + modelSchedule *modelschedule.ModelSchedule, + worker *worker.Worker, + workerscheduler *workscheduler.WorkScheduler, +) *Executor { return &Executor{ logger: logger, + + modelRegistry: modelRegistry, + modelSchedule: modelSchedule, + worker: worker, + workerscheduler: workerscheduler, } } func (e *Executor) DispatchEvents(ctx context.Context) error { e.logger.InfoContext(ctx, "dispatching events") - // TODO: Process updates to models - // TODO: Insert new cron for runtime - // TODO: Calculate time since last run - // TODO: Send events for workers to pick up + start := time.Now().Add(-time.Second * 30) + end := time.Now() + + models, err := e.modelRegistry.GetModels() + if err != nil { + return fmt.Errorf("failed to get models from registry: %w", err) + } + + registeredWorkers, err := e.worker.GetWorkers(ctx) + if err != nil { + return fmt.Errorf("failed to find workers: %w", err) + } + + workers, err := e.workerscheduler.GetWorkers(ctx, registeredWorkers) + if err != nil { + return fmt.Errorf("failed to find workers: %w", err) + } + + for workers := range workers.IterateSlice(2000) { + for _, model := range models { + modelRuns, lastRun, err := e.modelSchedule.GetNext(ctx, model, start, end, uint(len(workers))) + if err != nil { + return err + } + + for i, modelRun := range modelRuns { + worker := workers[i] + e.logger.DebugContext(ctx, "dispatching model run", "modelRun", modelRun.Model.Name, "start", modelRun.Start, "end", modelRun.End) + + if err := e.workerscheduler.InsertModelRun(ctx, worker, &modelRun); err != nil { + return fmt.Errorf("failed to register model run: %w", err) + } + } + + if err := e.modelSchedule.UpdateModelRun(ctx, model, lastRun); err != nil { + return fmt.Errorf("failed to update checkpoint for model: %w", err) + } + } + } return nil } diff --git a/internal/modelregistry/registry.go b/internal/modelregistry/registry.go new file mode 100644 index 0000000..eefa589 --- /dev/null +++ b/internal/modelregistry/registry.go @@ -0,0 +1,105 @@ +package modelregistry + +import "sync" + +type Model struct { + Name string `json:"name"` + Schedule string `json:"schedule"` + Lookup string `json:"lookup"` +} + +type ModelRegistry struct { + models []Model + modelsLock sync.RWMutex +} + +func NewModelRegistry() *ModelRegistry { + return &ModelRegistry{ + models: make([]Model, 0, 128), // We start off with a capacity of 128 models. Makes memory more deterministic + } +} + +func (m *ModelRegistry) GetModels() ([]Model, error) { + m.modelsLock.RLock() + defer m.modelsLock.RUnlock() + + return []Model{ + { + Name: "69C42481-650D-46E0-9C96-3D61B96565FB", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "5B0F96E5-BC37-427E-B615-E635156386F0", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "5A511180-6613-4F8E-9125-2E8FE272E03C", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "4EFE77E8-082B-4828-8527-635E5B6253D9", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "E53DCA1E-641B-421A-9DB6-1A6F09F3D96D", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "63BDC98C-ECBA-44FD-BFAE-056B1C004077", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "C18A1948-0045-4099-AC58-5B7C587AC0F0", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "8B87FDB5-A119-43C0-8D15-9B517577A8AE", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "4E121E78-1CBD-4BC1-8A10-14354B76E553", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "14AF7CDF-783F-4DFE-8B3D-E4C23C12AEDC", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "60FE99E9-4EF7-40A5-A19D-9A439BA12B24", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "622C03C2-CAF7-4708-B26D-D536E3C3F4DD", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "E7BC4A8D-FFF6-4A8D-A48B-6569340746E4", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "7CB66BA1-CF1E-4FA6-8C32-F048D88FCE54", + Schedule: "* * * * * *", + Lookup: "something", + }, + { + Name: "82518E22-EFED-4AA8-AC19-FF3D81ECE609", + Schedule: "* * * * * *", + Lookup: "something", + }, + }, nil + + //return m.models, nil +} diff --git a/internal/modelschedule/queries.sql b/internal/modelschedule/queries.sql new file mode 100644 index 0000000..d1faf58 --- /dev/null +++ b/internal/modelschedule/queries.sql @@ -0,0 +1,18 @@ +-- name: Ping :one +SELECT 1; + +-- name: GetLast :one +SELECT last_run +FROM + model_schedules +WHERE + model_name = $1 +LIMIT 1; + +-- name: UpsertModel :exec +INSERT INTO model_schedules (model_name, last_run) +VALUES ($1, $2) +ON CONFLICT (model_name) +DO UPDATE SET +last_run = excluded.last_run; + diff --git a/internal/modelschedule/repositories/db.go b/internal/modelschedule/repositories/db.go new file mode 100644 index 0000000..e79f6b4 --- /dev/null +++ b/internal/modelschedule/repositories/db.go @@ -0,0 +1,32 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.23.0 + +package repositories + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type DBTX interface { + Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) + Query(context.Context, string, ...interface{}) (pgx.Rows, error) + QueryRow(context.Context, string, ...interface{}) pgx.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx pgx.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/internal/modelschedule/repositories/models.go b/internal/modelschedule/repositories/models.go new file mode 100644 index 0000000..fcc9201 --- /dev/null +++ b/internal/modelschedule/repositories/models.go @@ -0,0 +1,30 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.23.0 + +package repositories + +import ( + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgtype" +) + +type ModelSchedule struct { + ModelName string `json:"model_name"` + LastRun pgtype.Timestamptz `json:"last_run"` +} + +type WorkSchedule struct { + ScheduleID uuid.UUID `json:"schedule_id"` + WorkerID uuid.UUID `json:"worker_id"` + StartRun pgtype.Timestamptz `json:"start_run"` + EndRun pgtype.Timestamptz `json:"end_run"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` + State string `json:"state"` +} + +type WorkerRegister struct { + WorkerID uuid.UUID `json:"worker_id"` + Capacity int32 `json:"capacity"` + HeartBeat pgtype.Timestamptz `json:"heart_beat"` +} diff --git a/internal/modelschedule/repositories/querier.go b/internal/modelschedule/repositories/querier.go new file mode 100644 index 0000000..90a1d19 --- /dev/null +++ b/internal/modelschedule/repositories/querier.go @@ -0,0 +1,19 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.23.0 + +package repositories + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +type Querier interface { + GetLast(ctx context.Context, modelName string) (pgtype.Timestamptz, error) + Ping(ctx context.Context) (int32, error) + UpsertModel(ctx context.Context, arg *UpsertModelParams) error +} + +var _ Querier = (*Queries)(nil) diff --git a/internal/modelschedule/repositories/queries.sql.go b/internal/modelschedule/repositories/queries.sql.go new file mode 100644 index 0000000..460ce2b --- /dev/null +++ b/internal/modelschedule/repositories/queries.sql.go @@ -0,0 +1,57 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.23.0 +// source: queries.sql + +package repositories + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const getLast = `-- name: GetLast :one +SELECT last_run +FROM + model_schedules +WHERE + model_name = $1 +LIMIT 1 +` + +func (q *Queries) GetLast(ctx context.Context, modelName string) (pgtype.Timestamptz, error) { + row := q.db.QueryRow(ctx, getLast, modelName) + var last_run pgtype.Timestamptz + err := row.Scan(&last_run) + return last_run, err +} + +const ping = `-- name: Ping :one +SELECT 1 +` + +func (q *Queries) Ping(ctx context.Context) (int32, error) { + row := q.db.QueryRow(ctx, ping) + var column_1 int32 + err := row.Scan(&column_1) + return column_1, err +} + +const upsertModel = `-- name: UpsertModel :exec +INSERT INTO model_schedules (model_name, last_run) +VALUES ($1, $2) +ON CONFLICT (model_name) +DO UPDATE SET +last_run = excluded.last_run +` + +type UpsertModelParams struct { + ModelName string `json:"model_name"` + LastRun pgtype.Timestamptz `json:"last_run"` +} + +func (q *Queries) UpsertModel(ctx context.Context, arg *UpsertModelParams) error { + _, err := q.db.Exec(ctx, upsertModel, arg.ModelName, arg.LastRun) + return err +} diff --git a/internal/modelschedule/schedule.go b/internal/modelschedule/schedule.go new file mode 100644 index 0000000..f0c3703 --- /dev/null +++ b/internal/modelschedule/schedule.go @@ -0,0 +1,105 @@ +package modelschedule + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "git.front.kjuulh.io/kjuulh/orbis/internal/modelregistry" + "git.front.kjuulh.io/kjuulh/orbis/internal/modelschedule/repositories" + "github.com/adhocore/gronx" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" +) + +//go:generate sqlc generate + +type ModelRunSchedule struct { + Model *modelregistry.Model + Start time.Time + End time.Time +} + +type ModelSchedule struct { + logger *slog.Logger + + db *pgxpool.Pool +} + +func NewModelSchedule(logger *slog.Logger, db *pgxpool.Pool) *ModelSchedule { + return &ModelSchedule{ + logger: logger, + + db: db, + } +} + +func (m *ModelSchedule) GetNext( + ctx context.Context, + model modelregistry.Model, + start time.Time, + end time.Time, + amount uint, +) (models []ModelRunSchedule, lastExecuted *time.Time, err error) { + repo := repositories.New(m.db) + + var startRun time.Time + lastRun, err := repo.GetLast(ctx, model.Name) + if err != nil { + if !errors.Is(err, pgx.ErrNoRows) { + return nil, nil, fmt.Errorf("failed to get last run for mode: %s: %w", model.Name, err) + } + + startRun = start + } else { + startRun = lastRun.Time + } + + times := make([]ModelRunSchedule, 0, amount) + for { + next, err := gronx.NextTickAfter(model.Schedule, startRun, false) + if err != nil { + return nil, nil, fmt.Errorf("failed to find next model schedule: %w", err) + } + + if next.Equal(time.Time{}) { + break + } + + if next.After(end) { + break + } + + times = append(times, ModelRunSchedule{ + Model: &model, + Start: startRun, + End: next, + }) + startRun = next + + if len(times) >= int(amount) { + break + } + } + + if len(times) == 0 { + return nil, nil, nil + } + + return times, &startRun, nil +} + +func (m *ModelSchedule) UpdateModelRun(ctx context.Context, model modelregistry.Model, lastRun *time.Time) error { + repo := repositories.New(m.db) + + return repo.UpsertModel(ctx, &repositories.UpsertModelParams{ + ModelName: model.Name, + LastRun: pgtype.Timestamptz{ + Time: *lastRun, + Valid: true, + }, + }) +} diff --git a/internal/modelschedule/sqlc.yaml b/internal/modelschedule/sqlc.yaml new file mode 100644 index 0000000..30fdb94 --- /dev/null +++ b/internal/modelschedule/sqlc.yaml @@ -0,0 +1,21 @@ +version: "2" +sql: + - queries: queries.sql + schema: ../persistence/migrations/ + engine: "postgresql" + gen: + go: + out: "repositories" + package: "repositories" + sql_package: "pgx/v5" + emit_json_tags: true + emit_prepared_queries: true + emit_interface: true + emit_empty_slices: true + emit_result_struct_pointers: true + emit_params_struct_pointers: true + overrides: + - db_type: "uuid" + go_type: + import: "github.com/google/uuid" + type: "UUID" diff --git a/internal/persistence/migrations/001_create_initial.up.sql b/internal/persistence/migrations/001_create_initial.up.sql index 37faf84..028773e 100644 --- a/internal/persistence/migrations/001_create_initial.up.sql +++ b/internal/persistence/migrations/001_create_initial.up.sql @@ -1,4 +1,22 @@ CREATE TABLE worker_register ( - worker_id UUID PRIMARY KEY NOT NULL + worker_id UUID PRIMARY KEY NOT NULL + , capacity INTEGER NOT NULL , heart_beat TIMESTAMPTZ NOT NULL DEFAULT now() ); + +CREATE TABLE model_schedules ( + model_name TEXT PRIMARY KEY NOT NULL + , last_run TIMESTAMPTZ +); + +CREATE TABLE work_schedule ( + schedule_id UUID PRIMARY KEY NOT NULL + , worker_id UUID NOT NULL + , start_run TIMESTAMPTZ NOT NULL + , end_run TIMESTAMPTZ NOT NULL + , updated_at TIMESTAMPTZ NOT NULL DEFAULT now() + , state TEXT NOT NULL +); + +CREATE INDEX idx_work_schedule_worker ON work_schedule (worker_id); +CREATE INDEX idx_work_schedule_worker_updated ON work_schedule (worker_id, updated_at DESC); diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index e8ab133..8e06f1c 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -62,12 +62,13 @@ func (s *Scheduler) Execute(ctx context.Context) error { } func (s *Scheduler) acquireLeader(ctx context.Context) (bool, error) { + db, err := s.db.Acquire(ctx) + for { select { case <-ctx.Done(): return false, nil default: - db, err := s.db.Acquire(ctx) if err != nil { return false, fmt.Errorf("failed to acquire db connection: %w", err) } diff --git a/internal/worker/queries.sql b/internal/worker/queries.sql index 87a0791..2b15dfd 100644 --- a/internal/worker/queries.sql +++ b/internal/worker/queries.sql @@ -2,14 +2,23 @@ SELECT 1; -- name: RegisterWorker :exec -INSERT INTO worker_register (worker_id) +INSERT INTO worker_register (worker_id, capacity) VALUES ( - $1 + $1 + , $2 ); +-- name: GetWorkers :many +SELECT + worker_id + , capacity +FROM + worker_register; + -- name: UpdateWorkerHeartbeat :exec UPDATE worker_register SET heart_beat = now() WHERE worker_id = $1; + diff --git a/internal/worker/repositories/models.go b/internal/worker/repositories/models.go index 5529db0..fcc9201 100644 --- a/internal/worker/repositories/models.go +++ b/internal/worker/repositories/models.go @@ -9,7 +9,22 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +type ModelSchedule struct { + ModelName string `json:"model_name"` + LastRun pgtype.Timestamptz `json:"last_run"` +} + +type WorkSchedule struct { + ScheduleID uuid.UUID `json:"schedule_id"` + WorkerID uuid.UUID `json:"worker_id"` + StartRun pgtype.Timestamptz `json:"start_run"` + EndRun pgtype.Timestamptz `json:"end_run"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` + State string `json:"state"` +} + type WorkerRegister struct { WorkerID uuid.UUID `json:"worker_id"` + Capacity int32 `json:"capacity"` HeartBeat pgtype.Timestamptz `json:"heart_beat"` } diff --git a/internal/worker/repositories/querier.go b/internal/worker/repositories/querier.go index 4978f72..76a11a1 100644 --- a/internal/worker/repositories/querier.go +++ b/internal/worker/repositories/querier.go @@ -11,8 +11,9 @@ import ( ) type Querier interface { + GetWorkers(ctx context.Context) ([]*GetWorkersRow, error) Ping(ctx context.Context) (int32, error) - RegisterWorker(ctx context.Context, workerID uuid.UUID) error + RegisterWorker(ctx context.Context, arg *RegisterWorkerParams) error UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error } diff --git a/internal/worker/repositories/queries.sql.go b/internal/worker/repositories/queries.sql.go index 5ab0fdc..981aec4 100644 --- a/internal/worker/repositories/queries.sql.go +++ b/internal/worker/repositories/queries.sql.go @@ -11,6 +11,39 @@ import ( "github.com/google/uuid" ) +const getWorkers = `-- name: GetWorkers :many +SELECT + worker_id + , capacity +FROM + worker_register +` + +type GetWorkersRow struct { + WorkerID uuid.UUID `json:"worker_id"` + Capacity int32 `json:"capacity"` +} + +func (q *Queries) GetWorkers(ctx context.Context) ([]*GetWorkersRow, error) { + rows, err := q.db.Query(ctx, getWorkers) + if err != nil { + return nil, err + } + defer rows.Close() + items := []*GetWorkersRow{} + for rows.Next() { + var i GetWorkersRow + if err := rows.Scan(&i.WorkerID, &i.Capacity); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const ping = `-- name: Ping :one SELECT 1 ` @@ -23,14 +56,20 @@ func (q *Queries) Ping(ctx context.Context) (int32, error) { } const registerWorker = `-- name: RegisterWorker :exec -INSERT INTO worker_register (worker_id) +INSERT INTO worker_register (worker_id, capacity) VALUES ( - $1 + $1 + , $2 ) ` -func (q *Queries) RegisterWorker(ctx context.Context, workerID uuid.UUID) error { - _, err := q.db.Exec(ctx, registerWorker, workerID) +type RegisterWorkerParams struct { + WorkerID uuid.UUID `json:"worker_id"` + Capacity int32 `json:"capacity"` +} + +func (q *Queries) RegisterWorker(ctx context.Context, arg *RegisterWorkerParams) error { + _, err := q.db.Exec(ctx, registerWorker, arg.WorkerID, arg.Capacity) return err } diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 3b57370..77a1906 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -11,36 +11,94 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) +type workProcessor interface { + ProcessNext(ctx context.Context, worker_id uuid.UUID) error +} + //go:generate sqlc generate type Worker struct { workerID uuid.UUID - db *pgxpool.Pool - logger *slog.Logger + db *pgxpool.Pool + workProcessor workProcessor + logger *slog.Logger + + capacity uint } func NewWorker( db *pgxpool.Pool, logger *slog.Logger, + workProcessor workProcessor, ) *Worker { return &Worker{ - workerID: uuid.New(), - db: db, - logger: logger, + workerID: uuid.New(), + db: db, + workProcessor: workProcessor, + logger: logger, + + capacity: 50, } } func (w *Worker) Setup(ctx context.Context) error { repo := repositories.New(w.db) - if err := repo.RegisterWorker(ctx, w.workerID); err != nil { + w.logger.InfoContext(ctx, "setting up worker", "worker_id", w.workerID) + if err := repo.RegisterWorker( + ctx, + &repositories.RegisterWorkerParams{ + WorkerID: w.workerID, + Capacity: int32(w.capacity), + }, + ); err != nil { return nil } return nil } +type Workers struct { + Instances []WorkerInstance +} + +func (w *Workers) Capacity() uint { + capacity := uint(0) + + for _, worker := range w.Instances { + capacity += worker.Capacity + } + + return capacity +} + +type WorkerInstance struct { + WorkerID uuid.UUID + Capacity uint +} + +func (w *Worker) GetWorkers(ctx context.Context) (*Workers, error) { + repo := repositories.New(w.db) + + dbInstances, err := repo.GetWorkers(ctx) + if err != nil { + return nil, fmt.Errorf("failed to find workers: %w", err) + } + + instances := make([]WorkerInstance, 0, len(dbInstances)) + for _, dbInstance := range dbInstances { + instances = append(instances, WorkerInstance{ + WorkerID: dbInstance.WorkerID, + Capacity: uint(dbInstance.Capacity), + }) + } + + return &Workers{ + Instances: instances, + }, nil +} + func (w *Worker) Start(ctx context.Context) error { heartBeatCtx, heartBeatCancel := context.WithCancel(context.Background()) go func() { @@ -69,10 +127,15 @@ func (w *Worker) Start(ctx context.Context) error { }() for { - if err := w.processWorkQueue(ctx); err != nil { - // FIXME: dead letter item, right now we just log and continue + select { + case <-ctx.Done(): + return nil + default: + if err := w.processWorkQueue(ctx); err != nil { + // FIXME: dead letter item, right now we just log and continue - w.logger.WarnContext(ctx, "failed to handle work item", "error", err) + w.logger.WarnContext(ctx, "failed to handle work item", "error", err) + } } } } @@ -84,8 +147,6 @@ func (w *Worker) updateHeartBeat(ctx context.Context) error { return repo.UpdateWorkerHeartbeat(ctx, w.workerID) } -func (w *Worker) processWorkQueue(_ context.Context) error { - time.Sleep(time.Second) - - return nil +func (w *Worker) processWorkQueue(ctx context.Context) error { + return w.workProcessor.ProcessNext(ctx, w.workerID) } diff --git a/internal/workprocessor/workprocessor.go b/internal/workprocessor/workprocessor.go new file mode 100644 index 0000000..55741ce --- /dev/null +++ b/internal/workprocessor/workprocessor.go @@ -0,0 +1,48 @@ +package workprocessor + +import ( + "context" + "fmt" + "log/slog" + "time" + + "git.front.kjuulh.io/kjuulh/orbis/internal/workscheduler" + "github.com/google/uuid" +) + +type WorkProcessor struct { + workscheduler *workscheduler.WorkScheduler + logger *slog.Logger +} + +func NewWorkProcessor(workscheduler *workscheduler.WorkScheduler, logger *slog.Logger) *WorkProcessor { + return &WorkProcessor{ + workscheduler: workscheduler, + logger: logger, + } +} + +func (w *WorkProcessor) ProcessNext(ctx context.Context, workerID uuid.UUID) error { + schedule, err := w.workscheduler.GetNext(ctx, workerID) + if err != nil { + return fmt.Errorf("failed to get next work item: %w", err) + } + + if schedule == nil { + // TODO: defer somewhere else + time.Sleep(time.Second) + return nil + } + + w.logger.DebugContext(ctx, "handling item", "schedule", schedule) + + if err := w.workscheduler.StartProcessing(ctx, *schedule); err != nil { + return fmt.Errorf("failed to start processing items: %w", err) + } + + if err := w.workscheduler.Archive(ctx, *schedule); err != nil { + return fmt.Errorf("failed to archive item: %w", err) + } + + return nil +} diff --git a/internal/workscheduler/queries.sql b/internal/workscheduler/queries.sql new file mode 100644 index 0000000..fd5ec25 --- /dev/null +++ b/internal/workscheduler/queries.sql @@ -0,0 +1,56 @@ +-- name: Ping :one +SELECT 1; + +-- name: GetCurrentQueueSize :one +SELECT + COUNT(*) current_queue_size +FROM + work_schedule +WHERE + worker_id = $1 + AND state <> 'archived'; + +-- name: InsertQueueItem :exec +INSERT INTO work_schedule + ( + schedule_id + , worker_id + , start_run + , end_run + , state + ) +VALUES + ( + $1 + , $2 + , $3 + , $4 + , 'pending' + ); + + +-- name: GetNext :one +SELECT + * +FROM + work_schedule +WHERE + worker_id = $1 + AND state = 'pending' +ORDER BY updated_at DESC +LIMIT 1; + +-- name: StartProcessing :exec +UPDATE work_schedule +SET + state = 'processing' +WHERE + schedule_id = $1; + +-- name: Archive :exec +UPDATE work_schedule +SET + state = 'archived' +WHERE + schedule_id = $1; + diff --git a/internal/workscheduler/repositories/db.go b/internal/workscheduler/repositories/db.go new file mode 100644 index 0000000..e79f6b4 --- /dev/null +++ b/internal/workscheduler/repositories/db.go @@ -0,0 +1,32 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.23.0 + +package repositories + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type DBTX interface { + Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) + Query(context.Context, string, ...interface{}) (pgx.Rows, error) + QueryRow(context.Context, string, ...interface{}) pgx.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx pgx.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/internal/workscheduler/repositories/models.go b/internal/workscheduler/repositories/models.go new file mode 100644 index 0000000..fcc9201 --- /dev/null +++ b/internal/workscheduler/repositories/models.go @@ -0,0 +1,30 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.23.0 + +package repositories + +import ( + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgtype" +) + +type ModelSchedule struct { + ModelName string `json:"model_name"` + LastRun pgtype.Timestamptz `json:"last_run"` +} + +type WorkSchedule struct { + ScheduleID uuid.UUID `json:"schedule_id"` + WorkerID uuid.UUID `json:"worker_id"` + StartRun pgtype.Timestamptz `json:"start_run"` + EndRun pgtype.Timestamptz `json:"end_run"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` + State string `json:"state"` +} + +type WorkerRegister struct { + WorkerID uuid.UUID `json:"worker_id"` + Capacity int32 `json:"capacity"` + HeartBeat pgtype.Timestamptz `json:"heart_beat"` +} diff --git a/internal/workscheduler/repositories/querier.go b/internal/workscheduler/repositories/querier.go new file mode 100644 index 0000000..f5c000e --- /dev/null +++ b/internal/workscheduler/repositories/querier.go @@ -0,0 +1,22 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.23.0 + +package repositories + +import ( + "context" + + "github.com/google/uuid" +) + +type Querier interface { + Archive(ctx context.Context, scheduleID uuid.UUID) error + GetCurrentQueueSize(ctx context.Context, workerID uuid.UUID) (int64, error) + GetNext(ctx context.Context, workerID uuid.UUID) (*WorkSchedule, error) + InsertQueueItem(ctx context.Context, arg *InsertQueueItemParams) error + Ping(ctx context.Context) (int32, error) + StartProcessing(ctx context.Context, scheduleID uuid.UUID) error +} + +var _ Querier = (*Queries)(nil) diff --git a/internal/workscheduler/repositories/queries.sql.go b/internal/workscheduler/repositories/queries.sql.go new file mode 100644 index 0000000..2114868 --- /dev/null +++ b/internal/workscheduler/repositories/queries.sql.go @@ -0,0 +1,129 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.23.0 +// source: queries.sql + +package repositories + +import ( + "context" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgtype" +) + +const archive = `-- name: Archive :exec +UPDATE work_schedule +SET + state = 'archived' +WHERE + schedule_id = $1 +` + +func (q *Queries) Archive(ctx context.Context, scheduleID uuid.UUID) error { + _, err := q.db.Exec(ctx, archive, scheduleID) + return err +} + +const getCurrentQueueSize = `-- name: GetCurrentQueueSize :one +SELECT + COUNT(*) current_queue_size +FROM + work_schedule +WHERE + worker_id = $1 + AND state <> 'archived' +` + +func (q *Queries) GetCurrentQueueSize(ctx context.Context, workerID uuid.UUID) (int64, error) { + row := q.db.QueryRow(ctx, getCurrentQueueSize, workerID) + var current_queue_size int64 + err := row.Scan(¤t_queue_size) + return current_queue_size, err +} + +const getNext = `-- name: GetNext :one +SELECT + schedule_id, worker_id, start_run, end_run, updated_at, state +FROM + work_schedule +WHERE + worker_id = $1 + AND state = 'pending' +ORDER BY updated_at DESC +LIMIT 1 +` + +func (q *Queries) GetNext(ctx context.Context, workerID uuid.UUID) (*WorkSchedule, error) { + row := q.db.QueryRow(ctx, getNext, workerID) + var i WorkSchedule + err := row.Scan( + &i.ScheduleID, + &i.WorkerID, + &i.StartRun, + &i.EndRun, + &i.UpdatedAt, + &i.State, + ) + return &i, err +} + +const insertQueueItem = `-- name: InsertQueueItem :exec +INSERT INTO work_schedule + ( + schedule_id + , worker_id + , start_run + , end_run + , state + ) +VALUES + ( + $1 + , $2 + , $3 + , $4 + , 'pending' + ) +` + +type InsertQueueItemParams struct { + ScheduleID uuid.UUID `json:"schedule_id"` + WorkerID uuid.UUID `json:"worker_id"` + StartRun pgtype.Timestamptz `json:"start_run"` + EndRun pgtype.Timestamptz `json:"end_run"` +} + +func (q *Queries) InsertQueueItem(ctx context.Context, arg *InsertQueueItemParams) error { + _, err := q.db.Exec(ctx, insertQueueItem, + arg.ScheduleID, + arg.WorkerID, + arg.StartRun, + arg.EndRun, + ) + return err +} + +const ping = `-- name: Ping :one +SELECT 1 +` + +func (q *Queries) Ping(ctx context.Context) (int32, error) { + row := q.db.QueryRow(ctx, ping) + var column_1 int32 + err := row.Scan(&column_1) + return column_1, err +} + +const startProcessing = `-- name: StartProcessing :exec +UPDATE work_schedule +SET + state = 'processing' +WHERE + schedule_id = $1 +` + +func (q *Queries) StartProcessing(ctx context.Context, scheduleID uuid.UUID) error { + _, err := q.db.Exec(ctx, startProcessing, scheduleID) + return err +} diff --git a/internal/workscheduler/sqlc.yaml b/internal/workscheduler/sqlc.yaml new file mode 100644 index 0000000..30fdb94 --- /dev/null +++ b/internal/workscheduler/sqlc.yaml @@ -0,0 +1,21 @@ +version: "2" +sql: + - queries: queries.sql + schema: ../persistence/migrations/ + engine: "postgresql" + gen: + go: + out: "repositories" + package: "repositories" + sql_package: "pgx/v5" + emit_json_tags: true + emit_prepared_queries: true + emit_interface: true + emit_empty_slices: true + emit_result_struct_pointers: true + emit_params_struct_pointers: true + overrides: + - db_type: "uuid" + go_type: + import: "github.com/google/uuid" + type: "UUID" diff --git a/internal/workscheduler/workscheduler.go b/internal/workscheduler/workscheduler.go new file mode 100644 index 0000000..83dbff3 --- /dev/null +++ b/internal/workscheduler/workscheduler.go @@ -0,0 +1,179 @@ +package workscheduler + +import ( + "context" + "errors" + "fmt" + "log/slog" + + "git.front.kjuulh.io/kjuulh/orbis/internal/modelschedule" + "git.front.kjuulh.io/kjuulh/orbis/internal/worker" + "git.front.kjuulh.io/kjuulh/orbis/internal/workscheduler/repositories" + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/jackc/pgx/v5/pgxpool" +) + +//go:generate sqlc generate + +type WorkScheduler struct { + db *pgxpool.Pool + logger *slog.Logger +} + +func NewWorkScheduler( + db *pgxpool.Pool, + logger *slog.Logger, +) *WorkScheduler { + return &WorkScheduler{ + db: db, + logger: logger, + } +} + +type Worker struct { + Instance worker.WorkerInstance + RemainingCapacity uint +} + +type Workers struct { + Workers []*Worker +} + +func (w *Workers) IterateSlice(size uint) func(yield func([]Worker) bool) { + return func(yield func([]Worker) bool) { + if len(w.Workers) == 0 { + return + } + + workers := make([]Worker, 0) + acc := uint(0) + + for { + exit := true + + for _, worker := range w.Workers { + if acc == size { + if !yield(workers) { + return + } + workers = make([]Worker, 0) + acc = uint(0) + + } + + if worker.RemainingCapacity <= 0 { + continue + } + + worker.RemainingCapacity-- + workers = append(workers, *worker) + acc++ + + exit = false + } + + if exit { + if len(workers) > 0 { + if !yield(workers) { + return + } + } + + return + } + } + } +} + +func (w *WorkScheduler) GetWorkers(ctx context.Context, registeredWorkers *worker.Workers) (*Workers, error) { + + w.logger.DebugContext(ctx, "found workers", "workers", len(registeredWorkers.Instances)) + + workers := make([]*Worker, 0, len(registeredWorkers.Instances)) + for _, registeredWorker := range registeredWorkers.Instances { + remainingCapacity, err := w.GetWorker(ctx, ®isteredWorker) + if err != nil { + return nil, fmt.Errorf("failed to find capacity for worker: %w", err) + } + + if remainingCapacity == 0 { + w.logger.DebugContext(ctx, "skipping worker as no remaining capacity") + continue + } + + workers = append(workers, &Worker{ + Instance: registeredWorker, + RemainingCapacity: remainingCapacity, + }) + } + + return &Workers{Workers: workers}, nil +} + +func (w *WorkScheduler) GetWorker( + ctx context.Context, + worker *worker.WorkerInstance, +) (uint, error) { + repo := repositories.New(w.db) + + current_size, err := repo.GetCurrentQueueSize(ctx, worker.WorkerID) + if err != nil { + return 0, fmt.Errorf("failed to get current queue size: %s: %w", worker.WorkerID, err) + } + + if int64(worker.Capacity)-current_size <= 0 { + return 0, nil + } + + return worker.Capacity - uint(current_size), nil +} + +func (w *WorkScheduler) InsertModelRun( + ctx context.Context, + worker Worker, + modelRun *modelschedule.ModelRunSchedule, +) error { + repo := repositories.New(w.db) + + return repo.InsertQueueItem(ctx, &repositories.InsertQueueItemParams{ + ScheduleID: uuid.New(), + WorkerID: worker.Instance.WorkerID, + StartRun: pgtype.Timestamptz{ + Time: modelRun.Start, + Valid: true, + }, + EndRun: pgtype.Timestamptz{ + Time: modelRun.End, + Valid: true, + }, + }) +} + +func (w *WorkScheduler) GetNext(ctx context.Context, workerID uuid.UUID) (*uuid.UUID, error) { + repo := repositories.New(w.db) + + schedule, err := repo.GetNext(ctx, workerID) + if err != nil { + if !errors.Is(err, pgx.ErrNoRows) { + return nil, fmt.Errorf("failed to get next worker item: %w", err) + } + + return nil, nil + } + + return &schedule.ScheduleID, nil +} + +func (w *WorkScheduler) StartProcessing(ctx context.Context, scheduleID uuid.UUID) error { + repo := repositories.New(w.db) + + return repo.StartProcessing(ctx, scheduleID) +} + +func (w *WorkScheduler) Archive(ctx context.Context, scheduleID uuid.UUID) error { + repo := repositories.New(w.db) + + return repo.Archive(ctx, scheduleID) +}