feat: enable worker process
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
a68eae8519
commit
e94271d5e2
5
.sqlfluff
Normal file
5
.sqlfluff
Normal file
@ -0,0 +1,5 @@
|
||||
[sqlfluff]
|
||||
dialect = postgres ; or whatever SQL dialect you use
|
||||
|
||||
[sqlfluff:layout:type:comma]
|
||||
line_position = leading
|
@ -1,9 +1,8 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"git.front.kjuulh.io/kjuulh/orbis/internal/app"
|
||||
"git.front.kjuulh.io/kjuulh/orbis/internal/processes"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
@ -18,11 +17,11 @@ func newRoot(app *app.App) *cobra.Command {
|
||||
ctx := cmd.Context()
|
||||
logger.Info("starting orbis")
|
||||
|
||||
if err := app.Scheduler().Execute(ctx); err != nil {
|
||||
return fmt.Errorf("scheduler failed with error: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return processes.
|
||||
NewApp(logger).
|
||||
Add(app.Scheduler()).
|
||||
Add(app.Worker()).
|
||||
Execute(ctx)
|
||||
},
|
||||
}
|
||||
|
||||
|
2
go.mod
2
go.mod
@ -9,6 +9,7 @@ require (
|
||||
github.com/joho/godotenv v1.5.1
|
||||
github.com/spf13/cobra v1.8.1
|
||||
gitlab.com/greyxor/slogor v1.6.1
|
||||
golang.org/x/sync v0.10.0
|
||||
)
|
||||
|
||||
require (
|
||||
@ -22,7 +23,6 @@ require (
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
go.uber.org/atomic v1.7.0 // indirect
|
||||
golang.org/x/crypto v0.31.0 // indirect
|
||||
golang.org/x/sync v0.10.0 // indirect
|
||||
golang.org/x/sys v0.29.0 // indirect
|
||||
golang.org/x/text v0.21.0 // indirect
|
||||
)
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
|
||||
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
|
||||
"git.front.kjuulh.io/kjuulh/orbis/internal/scheduler"
|
||||
"git.front.kjuulh.io/kjuulh/orbis/internal/worker"
|
||||
)
|
||||
|
||||
type App struct {
|
||||
@ -28,3 +29,7 @@ func (a *App) Scheduler() *scheduler.Scheduler {
|
||||
func (a *App) Executor() *executor.Executor {
|
||||
return executor.NewExecutor(a.logger.With("component", "executor"))
|
||||
}
|
||||
|
||||
func (a *App) Worker() *worker.Worker {
|
||||
return worker.NewWorker(Postgres(), a.logger)
|
||||
}
|
||||
|
@ -5,10 +5,10 @@ import (
|
||||
|
||||
"git.front.kjuulh.io/kjuulh/orbis/internal/persistence"
|
||||
"git.front.kjuulh.io/kjuulh/orbis/internal/utilities"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
var Postgres = utilities.Singleton(func() (*pgx.Conn, error) {
|
||||
var Postgres = utilities.Singleton(func() (*pgxpool.Pool, error) {
|
||||
if err := persistence.Migrate(); err != nil {
|
||||
return nil, fmt.Errorf("failed to migrate database: %w", err)
|
||||
}
|
||||
|
@ -6,14 +6,14 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
func NewConnection() (*pgx.Conn, error) {
|
||||
func NewConnection() (*pgxpool.Pool, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
conn, err := pgx.Connect(ctx, os.Getenv("ORBIS_POSTGRES_DB"))
|
||||
conn, err := pgxpool.New(ctx, os.Getenv("ORBIS_POSTGRES_DB"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to orbis postgres database: %w", err)
|
||||
}
|
||||
|
@ -1,3 +1,4 @@
|
||||
CREATE TABLE test_table (
|
||||
id UUID primary key not null
|
||||
CREATE TABLE worker_register (
|
||||
worker_id UUID PRIMARY KEY NOT NULL
|
||||
, heart_beat TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
163
internal/processes/processes.go
Normal file
163
internal/processes/processes.go
Normal file
@ -0,0 +1,163 @@
|
||||
package processes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type Process interface {
|
||||
Start(ctx context.Context) error
|
||||
}
|
||||
|
||||
type SetupProcesser interface {
|
||||
Setup(ctx context.Context) error
|
||||
}
|
||||
|
||||
type CloseProcesser interface {
|
||||
Close(ctx context.Context) error
|
||||
}
|
||||
|
||||
type App struct {
|
||||
logger *slog.Logger
|
||||
processes []Process
|
||||
}
|
||||
|
||||
func NewApp(logger *slog.Logger) *App {
|
||||
return &App{
|
||||
logger: logger,
|
||||
processes: make([]Process, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) Add(p Process) *App {
|
||||
a.processes = append(a.processes, p)
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
func (a *App) Execute(ctx context.Context) error {
|
||||
a.logger.InfoContext(ctx, "starting processor")
|
||||
if err := a.setupProcesses(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
processes, err := a.startProcesses(ctx)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
processErr := processes.wait(ctx)
|
||||
|
||||
if err := a.closeProcesses(ctx, processes); err != nil {
|
||||
if processErr != nil {
|
||||
return processErr
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if processErr != nil {
|
||||
return processErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *App) closeProcesses(ctx context.Context, processes *processStatus) error {
|
||||
waitClose, cancel := context.WithTimeout(ctx, time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
closeErrs := make(chan error)
|
||||
|
||||
go func() {
|
||||
errgrp, ctx := errgroup.WithContext(waitClose)
|
||||
for _, closeProcessor := range a.processes {
|
||||
if close, ok := closeProcessor.(CloseProcesser); ok {
|
||||
errgrp.Go(func() error {
|
||||
a.logger.InfoContext(ctx, "closing processor")
|
||||
return close.Close(ctx)
|
||||
})
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
closeErrs <- errgrp.Wait()
|
||||
}()
|
||||
|
||||
for _, closeHandle := range processes.processHandles {
|
||||
closeHandle()
|
||||
}
|
||||
|
||||
select {
|
||||
case <-waitClose.Done():
|
||||
return nil
|
||||
case <-closeErrs:
|
||||
return nil
|
||||
case _, closed := <-processes.errs:
|
||||
if closed {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type processStatus struct {
|
||||
errs chan error
|
||||
processHandles []context.CancelFunc
|
||||
}
|
||||
|
||||
func (p *processStatus) wait(_ context.Context) error {
|
||||
return <-p.errs
|
||||
}
|
||||
|
||||
func (a *App) startProcesses(ctx context.Context) (*processStatus, any) {
|
||||
status := &processStatus{
|
||||
errs: make(chan error, len(a.processes)),
|
||||
processHandles: make([]context.CancelFunc, 0),
|
||||
}
|
||||
|
||||
for _, process := range a.processes {
|
||||
processCtx, cancelFunc := context.WithCancel(ctx)
|
||||
|
||||
status.processHandles = append(status.processHandles, cancelFunc)
|
||||
|
||||
go func(ctx context.Context, process Process) {
|
||||
a.logger.DebugContext(ctx, "starting process")
|
||||
|
||||
err := process.Start(ctx)
|
||||
|
||||
if err != nil {
|
||||
a.logger.WarnContext(ctx, "process finished with error", "error", err)
|
||||
|
||||
} else {
|
||||
a.logger.DebugContext(ctx, "process finished gracefully")
|
||||
|
||||
}
|
||||
|
||||
status.errs <- err
|
||||
}(processCtx, process)
|
||||
}
|
||||
|
||||
return status, nil
|
||||
}
|
||||
|
||||
func (a *App) setupProcesses(ctx context.Context) error {
|
||||
ctxWithDeadline, cancel := context.WithTimeout(ctx, time.Minute)
|
||||
defer cancel()
|
||||
|
||||
errgrp, ctx := errgroup.WithContext(ctxWithDeadline)
|
||||
for _, setupProcessor := range a.processes {
|
||||
if setup, ok := setupProcessor.(SetupProcesser); ok {
|
||||
errgrp.Go(func() error {
|
||||
a.logger.InfoContext(ctx, "setting up processor")
|
||||
return setup.Setup(ctx)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return errgrp.Wait()
|
||||
}
|
@ -10,15 +10,16 @@ import (
|
||||
|
||||
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
type Scheduler struct {
|
||||
logger *slog.Logger
|
||||
db *pgx.Conn
|
||||
db *pgxpool.Pool
|
||||
executor *executor.Executor
|
||||
}
|
||||
|
||||
func NewScheduler(logger *slog.Logger, db *pgx.Conn, executor *executor.Executor) *Scheduler {
|
||||
func NewScheduler(logger *slog.Logger, db *pgxpool.Pool, executor *executor.Executor) *Scheduler {
|
||||
return &Scheduler{
|
||||
logger: logger,
|
||||
db: db,
|
||||
@ -26,6 +27,14 @@ func NewScheduler(logger *slog.Logger, db *pgx.Conn, executor *executor.Executor
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) Start(ctx context.Context) error {
|
||||
if err := s.Execute(ctx); err != nil {
|
||||
return fmt.Errorf("execution of scheduler failed: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Scheduler) Execute(ctx context.Context) error {
|
||||
acquiredLeader, err := s.acquireLeader(ctx)
|
||||
if err != nil {
|
||||
|
15
internal/worker/queries.sql
Normal file
15
internal/worker/queries.sql
Normal file
@ -0,0 +1,15 @@
|
||||
-- name: Ping :one
|
||||
SELECT 1;
|
||||
|
||||
-- name: RegisterWorker :exec
|
||||
INSERT INTO worker_register (worker_id)
|
||||
VALUES (
|
||||
$1
|
||||
);
|
||||
|
||||
-- name: UpdateWorkerHeartbeat :exec
|
||||
UPDATE worker_register
|
||||
SET
|
||||
heart_beat = now()
|
||||
WHERE
|
||||
worker_id = $1;
|
32
internal/worker/repositories/db.go
Normal file
32
internal/worker/repositories/db.go
Normal file
@ -0,0 +1,32 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.23.0
|
||||
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
)
|
||||
|
||||
type DBTX interface {
|
||||
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
|
||||
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
|
||||
QueryRow(context.Context, string, ...interface{}) pgx.Row
|
||||
}
|
||||
|
||||
func New(db DBTX) *Queries {
|
||||
return &Queries{db: db}
|
||||
}
|
||||
|
||||
type Queries struct {
|
||||
db DBTX
|
||||
}
|
||||
|
||||
func (q *Queries) WithTx(tx pgx.Tx) *Queries {
|
||||
return &Queries{
|
||||
db: tx,
|
||||
}
|
||||
}
|
15
internal/worker/repositories/models.go
Normal file
15
internal/worker/repositories/models.go
Normal file
@ -0,0 +1,15 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.23.0
|
||||
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
type WorkerRegister struct {
|
||||
WorkerID uuid.UUID `json:"worker_id"`
|
||||
HeartBeat pgtype.Timestamptz `json:"heart_beat"`
|
||||
}
|
19
internal/worker/repositories/querier.go
Normal file
19
internal/worker/repositories/querier.go
Normal file
@ -0,0 +1,19 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.23.0
|
||||
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type Querier interface {
|
||||
Ping(ctx context.Context) (int32, error)
|
||||
RegisterWorker(ctx context.Context, workerID uuid.UUID) error
|
||||
UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error
|
||||
}
|
||||
|
||||
var _ Querier = (*Queries)(nil)
|
48
internal/worker/repositories/queries.sql.go
Normal file
48
internal/worker/repositories/queries.sql.go
Normal file
@ -0,0 +1,48 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.23.0
|
||||
// source: queries.sql
|
||||
|
||||
package repositories
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const ping = `-- name: Ping :one
|
||||
SELECT 1
|
||||
`
|
||||
|
||||
func (q *Queries) Ping(ctx context.Context) (int32, error) {
|
||||
row := q.db.QueryRow(ctx, ping)
|
||||
var column_1 int32
|
||||
err := row.Scan(&column_1)
|
||||
return column_1, err
|
||||
}
|
||||
|
||||
const registerWorker = `-- name: RegisterWorker :exec
|
||||
INSERT INTO worker_register (worker_id)
|
||||
VALUES (
|
||||
$1
|
||||
)
|
||||
`
|
||||
|
||||
func (q *Queries) RegisterWorker(ctx context.Context, workerID uuid.UUID) error {
|
||||
_, err := q.db.Exec(ctx, registerWorker, workerID)
|
||||
return err
|
||||
}
|
||||
|
||||
const updateWorkerHeartbeat = `-- name: UpdateWorkerHeartbeat :exec
|
||||
UPDATE worker_register
|
||||
SET
|
||||
heart_beat = now()
|
||||
WHERE
|
||||
worker_id = $1
|
||||
`
|
||||
|
||||
func (q *Queries) UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error {
|
||||
_, err := q.db.Exec(ctx, updateWorkerHeartbeat, workerID)
|
||||
return err
|
||||
}
|
@ -7,10 +7,15 @@ sql:
|
||||
go:
|
||||
out: "repositories"
|
||||
package: "repositories"
|
||||
sql_package: "sql/db"
|
||||
sql_package: "pgx/v5"
|
||||
emit_json_tags: true
|
||||
emit_prepared_queries: true
|
||||
emit_interface: true
|
||||
emit_empty_slices: true
|
||||
emit_result_struct_pointers: true
|
||||
emit_params_struct_pointers: true
|
||||
overrides:
|
||||
- db_type: "uuid"
|
||||
go_type:
|
||||
import: "github.com/google/uuid"
|
||||
type: "UUID"
|
||||
|
@ -2,21 +2,90 @@ package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"git.front.kjuulh.io/kjuulh/orbis/internal/worker/repositories"
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
//go:generate sqlc generate
|
||||
|
||||
type Worker struct {
|
||||
workerID uuid.UUID
|
||||
|
||||
db *pgxpool.Pool
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
func NewWorker() *Worker {
|
||||
func NewWorker(
|
||||
db *pgxpool.Pool,
|
||||
logger *slog.Logger,
|
||||
) *Worker {
|
||||
return &Worker{
|
||||
workerID: uuid.New(),
|
||||
db: db,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) Setup(ctx context.Context) error {
|
||||
repo := repositories.New(w.db)
|
||||
|
||||
if err := repo.RegisterWorker(ctx, w.workerID); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Worker) Start(ctx context.Context) error {
|
||||
heartBeatCtx, heartBeatCancel := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Second * 5)
|
||||
errorCount := 0
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-heartBeatCtx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := w.updateHeartBeat(heartBeatCtx); err != nil {
|
||||
if errorCount >= 5 {
|
||||
panic(fmt.Errorf("worker failed to register heartbeat for a long time, panicing..., err: %w", err))
|
||||
}
|
||||
errorCount += 1
|
||||
} else {
|
||||
errorCount = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
defer func() {
|
||||
heartBeatCancel()
|
||||
}()
|
||||
|
||||
for {
|
||||
if err := w.processWorkQueue(ctx); err != nil {
|
||||
// FIXME: dead letter item, right now we just log and continue
|
||||
|
||||
w.logger.WarnContext(ctx, "failed to handle work item", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) updateHeartBeat(ctx context.Context) error {
|
||||
repo := repositories.New(w.db)
|
||||
|
||||
w.logger.DebugContext(ctx, "updating heartbeat", "time", time.Now())
|
||||
return repo.UpdateWorkerHeartbeat(ctx, w.workerID)
|
||||
}
|
||||
|
||||
func (w *Worker) processWorkQueue(_ context.Context) error {
|
||||
time.Sleep(time.Second)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user