From e94271d5e25dbb19eb948e3e81d46b436b7f258b Mon Sep 17 00:00:00 2001
From: kjuulh
Date: Fri, 17 Jan 2025 20:51:50 +0100
Subject: [PATCH] feat: enable worker process

---
 .sqlfluff                                     |   5 +
 cmd/orbis/root.go                             |  13 +-
 go.mod                                        |   2 +-
 internal/app/app.go                           |   6 +
 internal/app/postgres.go                      |   4 +-
 internal/persistence/connection.go            |   6 +-
 .../migrations/001_create_initial.up.sql      |   5 +-
 internal/processes/processes.go               | 163 ++++++++++++++++++
 internal/scheduler/scheduler.go               |  13 +-
 internal/worker/queries.sql                   |  15 ++
 internal/worker/repositories/db.go            |  32 ++++
 internal/worker/repositories/models.go        |  15 ++
 internal/worker/repositories/querier.go       |  19 ++
 internal/worker/repositories/queries.sql.go   |  48 ++++++
 internal/worker/sqlc.yaml                     |   7 +-
 internal/worker/worker.go                     |  71 +++++++-
 16 files changed, 405 insertions(+), 19 deletions(-)
 create mode 100644 .sqlfluff
 create mode 100644 internal/processes/processes.go
 create mode 100644 internal/worker/queries.sql
 create mode 100644 internal/worker/repositories/db.go
 create mode 100644 internal/worker/repositories/models.go
 create mode 100644 internal/worker/repositories/querier.go
 create mode 100644 internal/worker/repositories/queries.sql.go

diff --git a/.sqlfluff b/.sqlfluff
new file mode 100644
index 0000000..aa8fc0c
--- /dev/null
+++ b/.sqlfluff
@@ -0,0 +1,5 @@
+[sqlfluff]
+dialect = postgres
+
+[sqlfluff:layout:type:comma]
+line_position = leading
diff --git a/cmd/orbis/root.go b/cmd/orbis/root.go
index 4fa72ee..d517154 100644
--- a/cmd/orbis/root.go
+++ b/cmd/orbis/root.go
@@ -1,9 +1,8 @@
 package main
 
 import (
-	"fmt"
-
 	"git.front.kjuulh.io/kjuulh/orbis/internal/app"
+	"git.front.kjuulh.io/kjuulh/orbis/internal/processes"
 	"github.com/spf13/cobra"
 )
 
@@ -18,11 +17,11 @@ func newRoot(app *app.App) *cobra.Command {
 			ctx := cmd.Context()
 			logger.Info("starting orbis")
 
-			if err := app.Scheduler().Execute(ctx); err != nil {
-				return fmt.Errorf("scheduler failed with error: %w", err)
-			}
-
-			return nil
+			return processes.
+				NewApp(logger).
+				Add(app.Scheduler()).
+				Add(app.Worker()).
+				Execute(ctx)
 		},
 	}
 
diff --git a/go.mod b/go.mod
index 8a59ef3..1c5301a 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
 	github.com/joho/godotenv v1.5.1
 	github.com/spf13/cobra v1.8.1
 	gitlab.com/greyxor/slogor v1.6.1
+	golang.org/x/sync v0.10.0
 )
 
 require (
@@ -22,7 +23,6 @@ require (
 	github.com/spf13/pflag v1.0.5 // indirect
 	go.uber.org/atomic v1.7.0 // indirect
 	golang.org/x/crypto v0.31.0 // indirect
-	golang.org/x/sync v0.10.0 // indirect
 	golang.org/x/sys v0.29.0 // indirect
 	golang.org/x/text v0.21.0 // indirect
 )
diff --git a/internal/app/app.go b/internal/app/app.go
index 1e0b8ce..09cf844 100644
--- a/internal/app/app.go
+++ b/internal/app/app.go
@@ -5,6 +5,7 @@ import (
 
 	"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
 	"git.front.kjuulh.io/kjuulh/orbis/internal/scheduler"
+	"git.front.kjuulh.io/kjuulh/orbis/internal/worker"
 )
 
 type App struct {
@@ -28,3 +29,8 @@ func (a *App) Scheduler() *scheduler.Scheduler {
 func (a *App) Executor() *executor.Executor {
 	return executor.NewExecutor(a.logger.With("component", "executor"))
 }
+
+// Worker returns a new worker.Worker using the Postgres pool and a logger tagged with its component.
+func (a *App) Worker() *worker.Worker {
+	return worker.NewWorker(Postgres(), a.logger.With("component", "worker"))
+}
diff --git a/internal/app/postgres.go b/internal/app/postgres.go
index 05d7a5d..ae9b2c4 100644
--- a/internal/app/postgres.go
+++ b/internal/app/postgres.go
@@ -5,10 +5,10 @@ import (
 
 	"git.front.kjuulh.io/kjuulh/orbis/internal/persistence"
 	"git.front.kjuulh.io/kjuulh/orbis/internal/utilities"
-	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
 )
 
-var Postgres = utilities.Singleton(func() (*pgx.Conn, error) {
+var Postgres = utilities.Singleton(func() (*pgxpool.Pool, error) {
 	if err := persistence.Migrate(); err != nil {
 		return nil, fmt.Errorf("failed to migrate database: %w", err)
 	}
diff --git a/internal/persistence/connection.go b/internal/persistence/connection.go
index 0c60be6..237942b 100644
--- a/internal/persistence/connection.go
+++ 
b/internal/persistence/connection.go
@@ -6,14 +6,14 @@ import (
 	"os"
 	"time"
 
-	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
 )
 
-func NewConnection() (*pgx.Conn, error) {
+func NewConnection() (*pgxpool.Pool, error) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	defer cancel()
 
-	conn, err := pgx.Connect(ctx, os.Getenv("ORBIS_POSTGRES_DB"))
+	conn, err := pgxpool.New(ctx, os.Getenv("ORBIS_POSTGRES_DB"))
 	if err != nil {
 		return nil, fmt.Errorf("failed to connect to orbis postgres database: %w", err)
 	}
diff --git a/internal/persistence/migrations/001_create_initial.up.sql b/internal/persistence/migrations/001_create_initial.up.sql
index e88a9ca..37faf84 100644
--- a/internal/persistence/migrations/001_create_initial.up.sql
+++ b/internal/persistence/migrations/001_create_initial.up.sql
@@ -1,3 +1,4 @@
-CREATE TABLE test_table (
-    id UUID primary key not null
+CREATE TABLE worker_register (
+    worker_id UUID PRIMARY KEY NOT NULL
+    , heart_beat TIMESTAMPTZ NOT NULL DEFAULT now()
 );
diff --git a/internal/processes/processes.go b/internal/processes/processes.go
new file mode 100644
index 0000000..37512c6
--- /dev/null
+++ b/internal/processes/processes.go
@@ -0,0 +1,179 @@
+package processes
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"time"
+
+	"golang.org/x/sync/errgroup"
+)
+
+// Process is a long-running unit of work managed by an App.
+type Process interface {
+	Start(ctx context.Context) error
+}
+
+// SetupProcesser is an optional hook; Setup runs before any process is started.
+type SetupProcesser interface {
+	Setup(ctx context.Context) error
+}
+
+// CloseProcesser is an optional hook; Close runs while the App shuts down.
+type CloseProcesser interface {
+	Close(ctx context.Context) error
+}
+
+// App runs a set of processes and shuts all of them down once the first one
+// finishes.
+type App struct {
+	logger    *slog.Logger
+	processes []Process
+}
+
+// NewApp returns an App without any registered processes.
+func NewApp(logger *slog.Logger) *App {
+	return &App{
+		logger:    logger,
+		processes: make([]Process, 0),
+	}
+}
+
+// Add registers p and returns the App so calls can be chained.
+func (a *App) Add(p Process) *App {
+	a.processes = append(a.processes, p)
+
+	return a
+}
+
+// Execute sets up and starts every registered process, blocks until the first
+// one finishes and then shuts the others down. The error of the process that
+// finished first takes precedence over an error from shutting down.
+func (a *App) Execute(ctx context.Context) error {
+	a.logger.InfoContext(ctx, "starting processor")
+	if err := a.setupProcesses(ctx); err != nil {
+		return err
+	}
+
+	// Without a process nothing ever reports a result, so waiting would block
+	// forever.
+	if len(a.processes) == 0 {
+		return nil
+	}
+
+	processes := a.startProcesses(ctx)
+
+	processErr := processes.wait(ctx)
+	closeErr := a.closeProcesses(ctx, processes)
+
+	if processErr != nil {
+		if closeErr != nil {
+			a.logger.WarnContext(ctx, "closing processes failed", "error", closeErr)
+		}
+
+		return processErr
+	}
+
+	return closeErr
+}
+
+// closeProcesses cancels every process and runs the Close hooks concurrently.
+// It returns the first Close error, or an error if the hooks do not finish
+// within ten seconds.
+func (a *App) closeProcesses(ctx context.Context, processes *processStatus) error {
+	// ctx is usually already cancelled when shutting down; detach from that
+	// cancellation so the Close hooks get a usable context.
+	waitClose, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*10)
+	defer cancel()
+
+	// Buffered so the goroutine below can deliver its result and exit even
+	// after the timeout has made this function return.
+	closeErrs := make(chan error, 1)
+
+	go func() {
+		errgrp, closeCtx := errgroup.WithContext(waitClose)
+		for _, process := range a.processes {
+			if closer, ok := process.(CloseProcesser); ok {
+				errgrp.Go(func() error {
+					a.logger.InfoContext(closeCtx, "closing processor")
+					return closer.Close(closeCtx)
+				})
+			}
+		}
+
+		closeErrs <- errgrp.Wait()
+	}()
+
+	for _, cancelProcess := range processes.processHandles {
+		cancelProcess()
+	}
+
+	select {
+	case <-waitClose.Done():
+		return fmt.Errorf("closing processes: %w", waitClose.Err())
+	case err := <-closeErrs:
+		return err
+	}
+}
+
+// processStatus tracks the running processes: their results and the functions
+// cancelling their contexts.
+type processStatus struct {
+	errs           chan error
+	processHandles []context.CancelFunc
+}
+
+// wait blocks until the first process reports its result and returns it.
+func (p *processStatus) wait(_ context.Context) error {
+	return <-p.errs
+}
+
+// startProcesses launches every process in its own goroutine with its own
+// cancellable context. The result channel has room for every process, so
+// reporting never blocks a goroutine.
+func (a *App) startProcesses(ctx context.Context) *processStatus {
+	status := &processStatus{
+		errs:           make(chan error, len(a.processes)),
+		processHandles: make([]context.CancelFunc, 0, len(a.processes)),
+	}
+
+	for _, process := range a.processes {
+		processCtx, cancelFunc := context.WithCancel(ctx)
+
+		status.processHandles = append(status.processHandles, cancelFunc)
+
+		go func(ctx context.Context, process Process) {
+			a.logger.DebugContext(ctx, "starting process")
+
+			err := process.Start(ctx)
+			if err != nil {
+				a.logger.WarnContext(ctx, "process finished with error", "error", err)
+			} else {
+				a.logger.DebugContext(ctx, "process finished gracefully")
+			}
+
+			status.errs <- err
+		}(processCtx, process)
+	}
+
+	return status
+}
+
+// setupProcesses runs the Setup hooks concurrently and returns the first
+// error. The context handed to the hooks expires after one minute.
+func (a *App) setupProcesses(ctx context.Context) error {
+	ctxWithDeadline, cancel := context.WithTimeout(ctx, time.Minute)
+	defer cancel()
+
+	errgrp, ctx := errgroup.WithContext(ctxWithDeadline)
+	for _, setupProcessor := range a.processes {
+		if setup, ok := setupProcessor.(SetupProcesser); ok {
+			errgrp.Go(func() error {
+				a.logger.InfoContext(ctx, "setting up processor")
+				return setup.Setup(ctx)
+			})
+		}
+	}
+
+	return errgrp.Wait()
+}
diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go
index 49e3d0c..6f820f0 100644
--- a/internal/scheduler/scheduler.go
+++ b/internal/scheduler/scheduler.go
@@ -10,15 +10,16 @@ import (
 
 	"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
 	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
 )
 
 type Scheduler struct {
 	logger   *slog.Logger
-	db       *pgx.Conn
+	db       *pgxpool.Pool
 	executor *executor.Executor
 }
 
-func NewScheduler(logger *slog.Logger, db *pgx.Conn, executor *executor.Executor) *Scheduler {
+func NewScheduler(logger *slog.Logger, db *pgxpool.Pool, executor *executor.Executor) *Scheduler {
 	return &Scheduler{
 		logger:   logger,
 		db:       db,
@@ -26,6 +27,15 @@ func NewScheduler(logger *slog.Logger, db *pgx.Conn, executor *executor.Executor
 	}
 }
 
+// Start runs Execute so the Scheduler can be registered as a process.
+func (s *Scheduler) Start(ctx context.Context) error {
+	if err := s.Execute(ctx); err != nil {
+		return fmt.Errorf("execution of scheduler failed: %w", err)
+	}
+
+	return nil
+}
+
 func (s *Scheduler) Execute(ctx context.Context) error {
 	acquiredLeader, err := s.acquireLeader(ctx)
 	if err != nil {
diff --git a/internal/worker/queries.sql b/internal/worker/queries.sql
new file mode 100644
index 0000000..87a0791
--- /dev/null
+++ b/internal/worker/queries.sql
@@ -0,0 +1,15 @@
+-- name: Ping :one
+SELECT 1;
+
+-- name: RegisterWorker :exec
+INSERT INTO worker_register (worker_id)
+VALUES (
+    $1
+);
+
+-- name: UpdateWorkerHeartbeat :exec
+UPDATE worker_register
+SET
+    heart_beat = now()
+WHERE
+    worker_id = $1;
diff --git a/internal/worker/repositories/db.go 
b/internal/worker/repositories/db.go
new file mode 100644
index 0000000..e79f6b4
--- /dev/null
+++ b/internal/worker/repositories/db.go
@@ -0,0 +1,32 @@
+// Code generated by sqlc. DO NOT EDIT.
+// versions:
+//   sqlc v1.23.0
+
+package repositories
+
+import (
+	"context"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgconn"
+)
+
+type DBTX interface {
+	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
+	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
+	QueryRow(context.Context, string, ...interface{}) pgx.Row
+}
+
+func New(db DBTX) *Queries {
+	return &Queries{db: db}
+}
+
+type Queries struct {
+	db DBTX
+}
+
+func (q *Queries) WithTx(tx pgx.Tx) *Queries {
+	return &Queries{
+		db: tx,
+	}
+}
diff --git a/internal/worker/repositories/models.go b/internal/worker/repositories/models.go
new file mode 100644
index 0000000..5529db0
--- /dev/null
+++ b/internal/worker/repositories/models.go
@@ -0,0 +1,15 @@
+// Code generated by sqlc. DO NOT EDIT.
+// versions:
+//   sqlc v1.23.0
+
+package repositories
+
+import (
+	"github.com/google/uuid"
+	"github.com/jackc/pgx/v5/pgtype"
+)
+
+type WorkerRegister struct {
+	WorkerID  uuid.UUID          `json:"worker_id"`
+	HeartBeat pgtype.Timestamptz `json:"heart_beat"`
+}
diff --git a/internal/worker/repositories/querier.go b/internal/worker/repositories/querier.go
new file mode 100644
index 0000000..4978f72
--- /dev/null
+++ b/internal/worker/repositories/querier.go
@@ -0,0 +1,19 @@
+// Code generated by sqlc. DO NOT EDIT.
+// versions:
+//   sqlc v1.23.0
+
+package repositories
+
+import (
+	"context"
+
+	"github.com/google/uuid"
+)
+
+type Querier interface {
+	Ping(ctx context.Context) (int32, error)
+	RegisterWorker(ctx context.Context, workerID uuid.UUID) error
+	UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error
+}
+
+var _ Querier = (*Queries)(nil)
diff --git a/internal/worker/repositories/queries.sql.go b/internal/worker/repositories/queries.sql.go
new file mode 100644
index 0000000..5ab0fdc
--- /dev/null
+++ b/internal/worker/repositories/queries.sql.go
@@ -0,0 +1,48 @@
+// Code generated by sqlc. DO NOT EDIT.
+// versions:
+//   sqlc v1.23.0
+// source: queries.sql
+
+package repositories
+
+import (
+	"context"
+
+	"github.com/google/uuid"
+)
+
+const ping = `-- name: Ping :one
+SELECT 1
+`
+
+func (q *Queries) Ping(ctx context.Context) (int32, error) {
+	row := q.db.QueryRow(ctx, ping)
+	var column_1 int32
+	err := row.Scan(&column_1)
+	return column_1, err
+}
+
+const registerWorker = `-- name: RegisterWorker :exec
+INSERT INTO worker_register (worker_id)
+VALUES (
+    $1
+)
+`
+
+func (q *Queries) RegisterWorker(ctx context.Context, workerID uuid.UUID) error {
+	_, err := q.db.Exec(ctx, registerWorker, workerID)
+	return err
+}
+
+const updateWorkerHeartbeat = `-- name: UpdateWorkerHeartbeat :exec
+UPDATE worker_register
+SET
+    heart_beat = now()
+WHERE
+    worker_id = $1
+`
+
+func (q *Queries) UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error {
+	_, err := q.db.Exec(ctx, updateWorkerHeartbeat, workerID)
+	return err
+}
diff --git a/internal/worker/sqlc.yaml b/internal/worker/sqlc.yaml
index e4c3629..30fdb94 100644
--- a/internal/worker/sqlc.yaml
+++ b/internal/worker/sqlc.yaml
@@ -7,10 +7,16 @@ sql:
       go:
         out: "repositories"
         package: "repositories"
-        sql_package: "sql/db"
+        # Generate code against pgx v5; the generated repositories import github.com/jackc/pgx/v5.
+        sql_package: "pgx/v5"
         emit_json_tags: true
         emit_prepared_queries: true
         emit_interface: true
         emit_empty_slices: true
         emit_result_struct_pointers: true
emit_params_struct_pointers: true + overrides: + - db_type: "uuid" + go_type: + import: "github.com/google/uuid" + type: "UUID" diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 07363b9..3b57370 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -2,21 +2,90 @@ package worker import ( "context" + "fmt" + "log/slog" + "time" + "git.front.kjuulh.io/kjuulh/orbis/internal/worker/repositories" "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" ) +//go:generate sqlc generate + type Worker struct { workerID uuid.UUID + + db *pgxpool.Pool + logger *slog.Logger } -func NewWorker() *Worker { +func NewWorker( + db *pgxpool.Pool, + logger *slog.Logger, +) *Worker { return &Worker{ workerID: uuid.New(), + db: db, + logger: logger, } } func (w *Worker) Setup(ctx context.Context) error { + repo := repositories.New(w.db) + + if err := repo.RegisterWorker(ctx, w.workerID); err != nil { + return nil + } + + return nil +} + +func (w *Worker) Start(ctx context.Context) error { + heartBeatCtx, heartBeatCancel := context.WithCancel(context.Background()) + go func() { + ticker := time.NewTicker(time.Second * 5) + errorCount := 0 + + for { + select { + case <-heartBeatCtx.Done(): + return + case <-ticker.C: + if err := w.updateHeartBeat(heartBeatCtx); err != nil { + if errorCount >= 5 { + panic(fmt.Errorf("worker failed to register heartbeat for a long time, panicing..., err: %w", err)) + } + errorCount += 1 + } else { + errorCount = 0 + } + } + } + }() + + defer func() { + heartBeatCancel() + }() + + for { + if err := w.processWorkQueue(ctx); err != nil { + // FIXME: dead letter item, right now we just log and continue + + w.logger.WarnContext(ctx, "failed to handle work item", "error", err) + } + } +} + +func (w *Worker) updateHeartBeat(ctx context.Context) error { + repo := repositories.New(w.db) + + w.logger.DebugContext(ctx, "updating heartbeat", "time", time.Now()) + return repo.UpdateWorkerHeartbeat(ctx, w.workerID) +} + +func 
(w *Worker) processWorkQueue(_ context.Context) error { + time.Sleep(time.Second) return nil }