Added cleanup

This commit is contained in:
Kasper Juul Hermansen 2021-12-24 17:43:07 +01:00
parent a8bd48e09f
commit ed4475149a
Signed by: kjuulh
GPG Key ID: 0F95C140730F2F23
5 changed files with 95 additions and 1 deletions

View File

@ -11,4 +11,5 @@ func main() {
router. router.
NewRouter(). NewRouter().
Run() Run()
} }

View File

@ -10,6 +10,7 @@ import (
"downloader/internal/core/ports/fileorchestrator" "downloader/internal/core/ports/fileorchestrator"
"downloader/internal/core/ports/fileorchestrator/destinationhandler" "downloader/internal/core/ports/fileorchestrator/destinationhandler"
"downloader/internal/core/ports/fileorchestrator/sourcehandler" "downloader/internal/core/ports/fileorchestrator/sourcehandler"
"downloader/internal/core/services/cleanup"
"downloader/internal/core/services/download/default" "downloader/internal/core/services/download/default"
"downloader/internal/core/services/download/handlers" "downloader/internal/core/services/download/handlers"
"downloader/internal/core/services/downloader" "downloader/internal/core/services/downloader"
@ -78,4 +79,7 @@ func setupDownloadRoute(router *router) {
downloadApi := download.New(drService) downloadApi := download.New(drService)
downloadApi.SetupDownloadApi(router.internalRouter) downloadApi.SetupDownloadApi(router.internalRouter)
cleanupJob := cleanup.New(drRepository, newLogger)
cleanupJob.RunOnSchedule()
} }

View File

@ -10,4 +10,6 @@ type Repository interface {
GetById(ctx context.Context, id string) (*entities.Download, error) GetById(ctx context.Context, id string) (*entities.Download, error)
Update(ctx context.Context, download *entities.Download) error Update(ctx context.Context, download *entities.Download) error
Get(ctx context.Context, active bool) ([]*entities.Download, error) Get(ctx context.Context, active bool) ([]*entities.Download, error)
GetOldOrStuck(ctx context.Context) ([]*entities.Download, error)
BatchDelete(ctx context.Context, requests []*entities.Download) error
} }

View File

@ -3,6 +3,7 @@ package sql
import ( import (
"context" "context"
"downloader/internal/core/entities" "downloader/internal/core/entities"
"downloader/internal/core/ports/download_request"
"github.com/uptrace/bun" "github.com/uptrace/bun"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -12,7 +13,7 @@ type repository struct {
logger *zap.SugaredLogger logger *zap.SugaredLogger
} }
func NewDownloadRequestSqlRepository(db *bun.DB, logger *zap.SugaredLogger) *repository { func NewDownloadRequestSqlRepository(db *bun.DB, logger *zap.SugaredLogger) download_request.Repository {
return &repository{ return &repository{
db: db, db: db,
logger: logger, logger: logger,
@ -82,6 +83,7 @@ func (r repository) Get(ctx context.Context, active bool) ([]*entities.Download,
err := r.db.NewSelect(). err := r.db.NewSelect().
Model(&downloads). Model(&downloads).
Column("id", "status", "link"). Column("id", "status", "link").
Where("status LIKE ?", "in-progress%").
Limit(20). Limit(20).
Order("created_at ASC"). Order("created_at ASC").
Scan(ctx) Scan(ctx)
@ -100,3 +102,53 @@ func (r repository) Get(ctx context.Context, active bool) ([]*entities.Download,
return responseDownloads, nil return responseDownloads, nil
} }
func (r repository) GetOldOrStuck(ctx context.Context) ([]*entities.Download, error) {
var downloads []Download
err := r.db.NewSelect().
Model(&downloads).
Column("id", "status", "link").
Where("status LIKE ?", "in-progress%").
Where("updated_at < now() - interval '1 minutes'").
Limit(20).
Order("created_at ASC").
Scan(ctx)
if err != nil {
return nil, err
}
var responseDownloads []*entities.Download
for _, download := range downloads {
responseDownloads = append(responseDownloads, &entities.Download{
ID: download.ID,
Status: download.Status,
Link: download.Link,
})
}
if len(responseDownloads) == 0 {
return []*entities.Download{}, nil
}
return responseDownloads, nil
}
func (r repository) BatchDelete(ctx context.Context, requests []*entities.Download) error {
var downloads []*Download
for _, request := range requests {
downloads = append(downloads, &Download{ID: request.ID})
}
if len(downloads) == 0 {
return nil
}
_, err := r.db.NewDelete().
Model(&downloads).
WherePK().
Exec(ctx)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,35 @@
package cleanup
import (
"context"
"downloader/internal/core/ports/download_request"
"go.uber.org/zap"
"time"
)
type cleanUp struct {
repository download_request.Repository
logger *zap.SugaredLogger
}
func New(repository download_request.Repository, logger *zap.SugaredLogger) *cleanUp {
return &cleanUp{repository: repository, logger: logger}
}
func (c *cleanUp) RunOnSchedule() {
ctx := context.TODO()
go func() {
for true {
requests, err := c.repository.GetOldOrStuck(ctx)
if err == nil {
c.logger.Debugw("Cleaning up downloads",
"downloads", requests)
_ = c.repository.BatchDelete(ctx, requests)
} else {
c.logger.Warn("could not process old or stuck in-progress jobs")
}
time.Sleep(time.Minute)
}
}()
}