From 0b042780c3fd2f1cd6ceaff60d59b9c4b70fb009 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Sun, 19 Jan 2025 11:33:38 +0100 Subject: [PATCH] feat: add dead letter queue --- internal/app/app.go | 7 +++- internal/deadletter/deadletter.go | 35 +++++++++++++++++ internal/deadletter/queries.sql | 12 ++++++ internal/deadletter/repositories/db.go | 32 +++++++++++++++ internal/deadletter/repositories/models.go | 35 +++++++++++++++++ internal/deadletter/repositories/querier.go | 18 +++++++++ .../deadletter/repositories/queries.sql.go | 39 +++++++++++++++++++ internal/deadletter/sqlc.yaml | 21 ++++++++++ internal/modelschedule/repositories/models.go | 5 +++ .../migrations/001_create_initial.up.sql | 5 +++ internal/worker/repositories/models.go | 5 +++ internal/workprocessor/workprocessor.go | 17 +++++++- internal/workscheduler/repositories/models.go | 5 +++ 13 files changed, 234 insertions(+), 2 deletions(-) create mode 100644 internal/deadletter/deadletter.go create mode 100644 internal/deadletter/queries.sql create mode 100644 internal/deadletter/repositories/db.go create mode 100644 internal/deadletter/repositories/models.go create mode 100644 internal/deadletter/repositories/querier.go create mode 100644 internal/deadletter/repositories/queries.sql.go create mode 100644 internal/deadletter/sqlc.yaml diff --git a/internal/app/app.go b/internal/app/app.go index 9f35c01..84f4e98 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -3,6 +3,7 @@ package app import ( "log/slog" + "git.front.kjuulh.io/kjuulh/orbis/internal/deadletter" "git.front.kjuulh.io/kjuulh/orbis/internal/executor" "git.front.kjuulh.io/kjuulh/orbis/internal/modelschedule" "git.front.kjuulh.io/kjuulh/orbis/internal/scheduler" @@ -48,7 +49,11 @@ func (a *App) WorkScheduler() *workscheduler.WorkScheduler { } func (a *App) WorkProcessor() *workprocessor.WorkProcessor { - return workprocessor.NewWorkProcessor(a.WorkScheduler(), a.logger) + return workprocessor.NewWorkProcessor(a.WorkScheduler(), a.logger, a.DeadLetter()) +} + +func (a *App) DeadLetter() *deadletter.DeadLetter { + return deadletter.NewDeadLetter(Postgres(), a.logger) } func (a *App) ModelSchedule() *modelschedule.ModelSchedule { diff --git a/internal/deadletter/deadletter.go b/internal/deadletter/deadletter.go new file mode 100644 index 0000000..0072a12 --- /dev/null +++ b/internal/deadletter/deadletter.go @@ -0,0 +1,35 @@ +package deadletter + +import ( + "context" + "fmt" + "log/slog" + + "git.front.kjuulh.io/kjuulh/orbis/internal/deadletter/repositories" + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" +) + +type DeadLetter struct { + db *pgxpool.Pool + logger *slog.Logger +} + +func NewDeadLetter(db *pgxpool.Pool, logger *slog.Logger) *DeadLetter { + return &DeadLetter{ + db: db, + logger: logger, + } +} + +func (d *DeadLetter) InsertDeadLetter(ctx context.Context, schedule uuid.UUID) error { + repo := repositories.New(d.db) + + d.logger.WarnContext(ctx, "deadlettering schedule", "schedule", schedule) + if err := repo.InsertDeadLetter(ctx, schedule); err != nil { + return fmt.Errorf("failed to insert item into dead letter: %w", err) + } + + return nil + +} diff --git a/internal/deadletter/queries.sql b/internal/deadletter/queries.sql new file mode 100644 index 0000000..d318d93 --- /dev/null +++ b/internal/deadletter/queries.sql @@ -0,0 +1,12 @@ +-- name: Ping :one +SELECT 1; + +-- name: InsertDeadLetter :exec +INSERT INTO dead_letter + ( + schedule_id + ) +VALUES + ( + $1 + ); diff --git a/internal/deadletter/repositories/db.go b/internal/deadletter/repositories/db.go new file mode 100644 index 0000000..e79f6b4 --- /dev/null +++ b/internal/deadletter/repositories/db.go @@ -0,0 +1,32 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.23.0 + +package repositories + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type DBTX interface { + Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) + Query(context.Context, string, ...interface{}) (pgx.Rows, error) + QueryRow(context.Context, string, ...interface{}) pgx.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx pgx.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/internal/deadletter/repositories/models.go b/internal/deadletter/repositories/models.go new file mode 100644 index 0000000..fba97eb --- /dev/null +++ b/internal/deadletter/repositories/models.go @@ -0,0 +1,35 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.23.0 + +package repositories + +import ( + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgtype" +) + +type DeadLetter struct { + ScheduleID uuid.UUID `json:"schedule_id"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + +type ModelSchedule struct { + ModelName string `json:"model_name"` + LastRun pgtype.Timestamptz `json:"last_run"` +} + +type WorkSchedule struct { + ScheduleID uuid.UUID `json:"schedule_id"` + WorkerID uuid.UUID `json:"worker_id"` + StartRun pgtype.Timestamptz `json:"start_run"` + EndRun pgtype.Timestamptz `json:"end_run"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` + State string `json:"state"` +} + +type WorkerRegister struct { + WorkerID uuid.UUID `json:"worker_id"` + Capacity int32 `json:"capacity"` + HeartBeat pgtype.Timestamptz `json:"heart_beat"` +} diff --git a/internal/deadletter/repositories/querier.go b/internal/deadletter/repositories/querier.go new file mode 100644 index 0000000..7cf0a68 --- /dev/null +++ b/internal/deadletter/repositories/querier.go @@ -0,0 +1,18 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.23.0 + +package repositories + +import ( + "context" + + "github.com/google/uuid" +) + +type Querier interface { + InsertDeadLetter(ctx context.Context, scheduleID uuid.UUID) error + Ping(ctx context.Context) (int32, error) +} + +var _ Querier = (*Queries)(nil) diff --git a/internal/deadletter/repositories/queries.sql.go b/internal/deadletter/repositories/queries.sql.go new file mode 100644 index 0000000..8817f77 --- /dev/null +++ b/internal/deadletter/repositories/queries.sql.go @@ -0,0 +1,39 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.23.0 +// source: queries.sql + +package repositories + +import ( + "context" + + "github.com/google/uuid" +) + +const insertDeadLetter = `-- name: InsertDeadLetter :exec +INSERT INTO dead_letter + ( + schedule_id + ) +VALUES + ( + $1 + ) +` + +func (q *Queries) InsertDeadLetter(ctx context.Context, scheduleID uuid.UUID) error { + _, err := q.db.Exec(ctx, insertDeadLetter, scheduleID) + return err +} + +const ping = `-- name: Ping :one +SELECT 1 +` + +func (q *Queries) Ping(ctx context.Context) (int32, error) { + row := q.db.QueryRow(ctx, ping) + var column_1 int32 + err := row.Scan(&column_1) + return column_1, err +} diff --git a/internal/deadletter/sqlc.yaml b/internal/deadletter/sqlc.yaml new file mode 100644 index 0000000..30fdb94 --- /dev/null +++ b/internal/deadletter/sqlc.yaml @@ -0,0 +1,21 @@ +version: "2" +sql: + - queries: queries.sql + schema: ../persistence/migrations/ + engine: "postgresql" + gen: + go: + out: "repositories" + package: "repositories" + sql_package: "pgx/v5" + emit_json_tags: true + emit_prepared_queries: true + emit_interface: true + emit_empty_slices: true + emit_result_struct_pointers: true + emit_params_struct_pointers: true + overrides: + - db_type: "uuid" + go_type: + import: "github.com/google/uuid" + type: "UUID" diff --git a/internal/modelschedule/repositories/models.go b/internal/modelschedule/repositories/models.go index fcc9201..fba97eb 100644 --- a/internal/modelschedule/repositories/models.go +++ b/internal/modelschedule/repositories/models.go @@ -9,6 +9,11 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +type DeadLetter struct { + ScheduleID uuid.UUID `json:"schedule_id"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + type ModelSchedule struct { ModelName string `json:"model_name"` LastRun pgtype.Timestamptz `json:"last_run"` diff --git a/internal/persistence/migrations/001_create_initial.up.sql b/internal/persistence/migrations/001_create_initial.up.sql index 028773e..10dd3f4 100644 --- a/internal/persistence/migrations/001_create_initial.up.sql +++ b/internal/persistence/migrations/001_create_initial.up.sql @@ -20,3 +20,8 @@ CREATE TABLE work_schedule ( CREATE INDEX idx_work_schedule_worker ON work_schedule (worker_id); CREATE INDEX idx_work_schedule_worker_updated ON work_schedule (worker_id, updated_at DESC); + +CREATE TABLE dead_letter ( + schedule_id UUID PRIMARY KEY NOT NULL + , updated_at TIMESTAMPTZ NOT NULL default now() +); diff --git a/internal/worker/repositories/models.go b/internal/worker/repositories/models.go index fcc9201..fba97eb 100644 --- a/internal/worker/repositories/models.go +++ b/internal/worker/repositories/models.go @@ -9,6 +9,11 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +type DeadLetter struct { + ScheduleID uuid.UUID `json:"schedule_id"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + type ModelSchedule struct { ModelName string `json:"model_name"` LastRun pgtype.Timestamptz `json:"last_run"` diff --git a/internal/workprocessor/workprocessor.go b/internal/workprocessor/workprocessor.go index f71388c..cf9896a 100644 --- a/internal/workprocessor/workprocessor.go +++ b/internal/workprocessor/workprocessor.go @@ -4,8 +4,10 @@ import ( "context" "fmt" "log/slog" + "math/rand/v2" "time" + "git.front.kjuulh.io/kjuulh/orbis/internal/deadletter" "git.front.kjuulh.io/kjuulh/orbis/internal/workscheduler" "github.com/google/uuid" ) @@ -13,12 +15,18 @@ import ( type WorkProcessor struct { workscheduler *workscheduler.WorkScheduler logger *slog.Logger + deadletter *deadletter.DeadLetter } -func NewWorkProcessor(workscheduler *workscheduler.WorkScheduler, logger *slog.Logger) *WorkProcessor { +func NewWorkProcessor( + workscheduler *workscheduler.WorkScheduler, + logger *slog.Logger, + deadletter *deadletter.DeadLetter, +) *WorkProcessor { return &WorkProcessor{ workscheduler: workscheduler, logger: logger, + deadletter: deadletter, } } @@ -41,10 +49,17 @@ func (w *WorkProcessor) ProcessNext(ctx context.Context, workerID uuid.UUID) err } time.Sleep(time.Millisecond * 10) + // Process or achive if err := w.workscheduler.Archive(ctx, *schedule); err != nil { return fmt.Errorf("failed to archive item: %w", err) } + if rand.Float64() > 0.9 { + if err := w.deadletter.InsertDeadLetter(ctx, *schedule); err != nil { + return fmt.Errorf("failed to deadletter message: %w", err) + } + } + return nil } diff --git a/internal/workscheduler/repositories/models.go b/internal/workscheduler/repositories/models.go index fcc9201..fba97eb 100644 --- a/internal/workscheduler/repositories/models.go +++ b/internal/workscheduler/repositories/models.go @@ -9,6 +9,11 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +type DeadLetter struct { + ScheduleID uuid.UUID `json:"schedule_id"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + type ModelSchedule struct { ModelName string `json:"model_name"` LastRun pgtype.Timestamptz `json:"last_run"`