octopush/internal/commands/process_repos.go

108 lines
2.6 KiB
Go
Raw Normal View History

2022-09-11 22:56:54 +02:00
package commands
import (
"context"
"sync"
"time"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/actions"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/providers"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/storage"
"go.uber.org/zap"
)
type (
ProcessRepos struct {
logger *zap.Logger
storage *storage.Service
git *providers.Git
2022-09-12 09:52:44 +02:00
action *actions.Action
2022-09-11 22:56:54 +02:00
}
ProcessReposDeps interface {
GetStorageService() *storage.Service
GetGitProvider() *providers.Git
2022-09-12 09:52:44 +02:00
GetAction() *actions.Action
2022-09-11 22:56:54 +02:00
}
)
func NewProcessRepos(logger *zap.Logger, deps ProcessReposDeps) *ProcessRepos {
return &ProcessRepos{
logger: logger,
storage: deps.GetStorageService(),
git: deps.GetGitProvider(),
2022-09-12 09:52:44 +02:00
action: deps.GetAction(),
2022-09-11 22:56:54 +02:00
}
}
2022-09-12 09:52:44 +02:00
func (pr *ProcessRepos) Process(ctx context.Context, repositoryUrls []string) error {
2022-09-11 22:56:54 +02:00
// Clone repos
wg := sync.WaitGroup{}
wg.Add(len(repositoryUrls))
errChan := make(chan error, 1)
for _, repoUrl := range repositoryUrls {
go func(ctx context.Context, repoUrl string) {
defer func() {
wg.Done()
}()
pr.logger.Debug("Creating area", zap.String("repoUrl", repoUrl))
area, err := pr.storage.CreateArea(ctx)
if err != nil {
pr.logger.Error("failed to allocate area", zap.Error(err))
errChan <- err
return
}
defer func(ctx context.Context) {
pr.logger.Debug("Removing area", zap.String("path", area.Path), zap.String("repoUrl", repoUrl))
err = pr.storage.RemoveArea(ctx, area)
if err != nil {
errChan <- err
return
}
}(ctx)
pr.logger.Debug("Cloning repo", zap.String("path", area.Path), zap.String("repoUrl", repoUrl))
2022-09-12 09:52:44 +02:00
cloneCtx, _ := context.WithTimeout(ctx, time.Second*5)
_, err = pr.git.Clone(cloneCtx, area, repoUrl)
2022-09-11 22:56:54 +02:00
if err != nil {
pr.logger.Error("could not clone repo", zap.Error(err))
errChan <- err
return
}
2022-09-12 09:52:44 +02:00
err = pr.action.Run(
ctx,
area,
func(ctx context.Context, area *storage.Area) (bool, error) {
pr.logger.Debug("checking predicate", zap.String("area", area.Path))
return true, nil
},
func(ctx context.Context, area *storage.Area) error {
pr.logger.Debug("running action", zap.String("area", area.Path))
return nil
}, false)
if err != nil {
pr.logger.Error("could not run action", zap.Error(err))
errChan <- err
return
}
2022-09-11 22:56:54 +02:00
pr.logger.Debug("processing done", zap.String("path", area.Path), zap.String("repoUrl", repoUrl))
}(ctx, repoUrl)
}
wg.Wait()
close(errChan)
pr.logger.Debug("finished processing all repos")
for err := range errChan {
return err
}
return nil
}