From ed4475149a1f8bfaea0099d91b1173241178e294 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Fri, 24 Dec 2021 17:43:07 +0100 Subject: [PATCH] Added cleanup --- api/cmd/api/main.go | 1 + api/internal/app/router/router.go | 4 ++ .../core/ports/download_request/repository.go | 2 + .../ports/download_request/sql/repository.go | 54 ++++++++++++++++++- api/internal/core/services/cleanup/cleanup.go | 35 ++++++++++++ 5 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 api/internal/core/services/cleanup/cleanup.go diff --git a/api/cmd/api/main.go b/api/cmd/api/main.go index a707e9b..5a30280 100644 --- a/api/cmd/api/main.go +++ b/api/cmd/api/main.go @@ -11,4 +11,5 @@ func main() { router. NewRouter(). Run() + } diff --git a/api/internal/app/router/router.go b/api/internal/app/router/router.go index 7679990..28a855f 100644 --- a/api/internal/app/router/router.go +++ b/api/internal/app/router/router.go @@ -10,6 +10,7 @@ import ( "downloader/internal/core/ports/fileorchestrator" "downloader/internal/core/ports/fileorchestrator/destinationhandler" "downloader/internal/core/ports/fileorchestrator/sourcehandler" + "downloader/internal/core/services/cleanup" "downloader/internal/core/services/download/default" "downloader/internal/core/services/download/handlers" "downloader/internal/core/services/downloader" @@ -78,4 +79,7 @@ func setupDownloadRoute(router *router) { downloadApi := download.New(drService) downloadApi.SetupDownloadApi(router.internalRouter) + + cleanupJob := cleanup.New(drRepository, newLogger) + cleanupJob.RunOnSchedule() } diff --git a/api/internal/core/ports/download_request/repository.go b/api/internal/core/ports/download_request/repository.go index 7a53fec..c2293e9 100644 --- a/api/internal/core/ports/download_request/repository.go +++ b/api/internal/core/ports/download_request/repository.go @@ -10,4 +10,6 @@ type Repository interface { GetById(ctx context.Context, id string) (*entities.Download, error) Update(ctx context.Context, download *entities.Download) error Get(ctx context.Context, active bool) ([]*entities.Download, error) + GetOldOrStuck(ctx context.Context) ([]*entities.Download, error) + BatchDelete(ctx context.Context, requests []*entities.Download) error } diff --git a/api/internal/core/ports/download_request/sql/repository.go b/api/internal/core/ports/download_request/sql/repository.go index be40eb1..2f1e4fd 100644 --- a/api/internal/core/ports/download_request/sql/repository.go +++ b/api/internal/core/ports/download_request/sql/repository.go @@ -3,6 +3,7 @@ package sql import ( "context" "downloader/internal/core/entities" + "downloader/internal/core/ports/download_request" "github.com/uptrace/bun" "go.uber.org/zap" ) @@ -12,7 +13,7 @@ type repository struct { logger *zap.SugaredLogger } -func NewDownloadRequestSqlRepository(db *bun.DB, logger *zap.SugaredLogger) *repository { +func NewDownloadRequestSqlRepository(db *bun.DB, logger *zap.SugaredLogger) download_request.Repository { return &repository{ db: db, logger: logger, @@ -82,6 +83,7 @@ func (r repository) Get(ctx context.Context, active bool) ([]*entities.Download, err := r.db.NewSelect(). Model(&downloads). Column("id", "status", "link"). + Where("status LIKE ?", "in-progress%"). Limit(20). Order("created_at ASC"). Scan(ctx) @@ -100,3 +102,53 @@ func (r repository) Get(ctx context.Context, active bool) ([]*entities.Download, return responseDownloads, nil } + +func (r repository) GetOldOrStuck(ctx context.Context) ([]*entities.Download, error) { + + var downloads []Download + err := r.db.NewSelect(). + Model(&downloads). + Column("id", "status", "link"). + Where("status LIKE ?", "in-progress%"). + Where("updated_at < now() - interval '1 minutes'"). + Limit(20). + Order("created_at ASC"). + Scan(ctx) + if err != nil { + return nil, err + } + + var responseDownloads []*entities.Download + for _, download := range downloads { + responseDownloads = append(responseDownloads, &entities.Download{ + ID: download.ID, + Status: download.Status, + Link: download.Link, + }) + } + if len(responseDownloads) == 0 { + return []*entities.Download{}, nil + } + + return responseDownloads, nil +} + +func (r repository) BatchDelete(ctx context.Context, requests []*entities.Download) error { + var downloads []*Download + for _, request := range requests { + downloads = append(downloads, &Download{ID: request.ID}) + } + if len(downloads) == 0 { + return nil + } + + _, err := r.db.NewDelete(). + Model(&downloads). + WherePK(). + Exec(ctx) + if err != nil { + return err + } + + return nil +} diff --git a/api/internal/core/services/cleanup/cleanup.go b/api/internal/core/services/cleanup/cleanup.go new file mode 100644 index 0000000..03bd007 --- /dev/null +++ b/api/internal/core/services/cleanup/cleanup.go @@ -0,0 +1,35 @@ +package cleanup + +import ( + "context" + "downloader/internal/core/ports/download_request" + "go.uber.org/zap" + "time" +) + +type cleanUp struct { + repository download_request.Repository + logger *zap.SugaredLogger +} + +func New(repository download_request.Repository, logger *zap.SugaredLogger) *cleanUp { + return &cleanUp{repository: repository, logger: logger} +} + +func (c *cleanUp) RunOnSchedule() { + ctx := context.TODO() + go func() { + for true { + requests, err := c.repository.GetOldOrStuck(ctx) + if err == nil { + c.logger.Debugw("Cleaning up downloads", + "downloads", requests) + _ = c.repository.BatchDelete(ctx, requests) + } else { + c.logger.Warn("could not process old or stuck in-progress jobs") + } + + time.Sleep(time.Minute) + } + }() +}