orbis/internal/modelschedule/schedule.go
kjuulh 2cdab4a1ab
All checks were successful
continuous-integration/drone/push Build is passing
feat: add worker distributor and model registry
2025-01-18 01:46:53 +01:00

106 lines
2.0 KiB
Go

package modelschedule
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"git.front.kjuulh.io/kjuulh/orbis/internal/modelregistry"
"git.front.kjuulh.io/kjuulh/orbis/internal/modelschedule/repositories"
"github.com/adhocore/gronx"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
)
//go:generate sqlc generate
type ModelRunSchedule struct {
Model *modelregistry.Model
Start time.Time
End time.Time
}
type ModelSchedule struct {
logger *slog.Logger
db *pgxpool.Pool
}
func NewModelSchedule(logger *slog.Logger, db *pgxpool.Pool) *ModelSchedule {
return &ModelSchedule{
logger: logger,
db: db,
}
}
func (m *ModelSchedule) GetNext(
ctx context.Context,
model modelregistry.Model,
start time.Time,
end time.Time,
amount uint,
) (models []ModelRunSchedule, lastExecuted *time.Time, err error) {
repo := repositories.New(m.db)
var startRun time.Time
lastRun, err := repo.GetLast(ctx, model.Name)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
return nil, nil, fmt.Errorf("failed to get last run for mode: %s: %w", model.Name, err)
}
startRun = start
} else {
startRun = lastRun.Time
}
times := make([]ModelRunSchedule, 0, amount)
for {
next, err := gronx.NextTickAfter(model.Schedule, startRun, false)
if err != nil {
return nil, nil, fmt.Errorf("failed to find next model schedule: %w", err)
}
if next.Equal(time.Time{}) {
break
}
if next.After(end) {
break
}
times = append(times, ModelRunSchedule{
Model: &model,
Start: startRun,
End: next,
})
startRun = next
if len(times) >= int(amount) {
break
}
}
if len(times) == 0 {
return nil, nil, nil
}
return times, &startRun, nil
}
func (m *ModelSchedule) UpdateModelRun(ctx context.Context, model modelregistry.Model, lastRun *time.Time) error {
repo := repositories.New(m.db)
return repo.UpsertModel(ctx, &repositories.UpsertModelParams{
ModelName: model.Name,
LastRun: pgtype.Timestamptz{
Time: *lastRun,
Valid: true,
},
})
}