with pull into storage

This commit is contained in:
2022-09-11 22:56:54 +02:00
parent 3643d4a467
commit 226e47e5f4
15 changed files with 485 additions and 53 deletions

View File

@@ -0,0 +1,86 @@
package commands
import (
"context"
"sync"
"time"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/actions"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/providers"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/storage"
"go.uber.org/zap"
)
type (
ProcessRepos struct {
logger *zap.Logger
storage *storage.Service
git *providers.Git
}
ProcessReposDeps interface {
GetStorageService() *storage.Service
GetGitProvider() *providers.Git
}
)
func NewProcessRepos(logger *zap.Logger, deps ProcessReposDeps) *ProcessRepos {
return &ProcessRepos{
logger: logger,
storage: deps.GetStorageService(),
git: deps.GetGitProvider(),
}
}
func (pr *ProcessRepos) Process(ctx context.Context, repositoryUrls []string, action *actions.Action) error {
// Clone repos
wg := sync.WaitGroup{}
wg.Add(len(repositoryUrls))
errChan := make(chan error, 1)
for _, repoUrl := range repositoryUrls {
go func(ctx context.Context, repoUrl string) {
defer func() {
wg.Done()
}()
pr.logger.Debug("Creating area", zap.String("repoUrl", repoUrl))
area, err := pr.storage.CreateArea(ctx)
if err != nil {
pr.logger.Error("failed to allocate area", zap.Error(err))
errChan <- err
return
}
defer func(ctx context.Context) {
pr.logger.Debug("Removing area", zap.String("path", area.Path), zap.String("repoUrl", repoUrl))
err = pr.storage.RemoveArea(ctx, area)
if err != nil {
errChan <- err
return
}
}(ctx)
pr.logger.Debug("Cloning repo", zap.String("path", area.Path), zap.String("repoUrl", repoUrl))
ctx, _ = context.WithTimeout(ctx, time.Second*5)
_, err = pr.git.Clone(ctx, area, repoUrl)
if err != nil {
pr.logger.Error("could not clone repo", zap.Error(err))
errChan <- err
return
}
pr.logger.Debug("processing done", zap.String("path", area.Path), zap.String("repoUrl", repoUrl))
}(ctx, repoUrl)
}
wg.Wait()
close(errChan)
pr.logger.Debug("finished processing all repos")
for err := range errChan {
return err
}
return nil
}

View File

@@ -4,32 +4,18 @@ import (
"context"
"errors"
"net/http"
"time"
"git.front.kjuulh.io/kjuulh/curre"
"git.front.kjuulh.io/kjuulh/kraken/internal/commands"
"git.front.kjuulh.io/kjuulh/kraken/internal/serverdeps"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/jobs"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"go.uber.org/zap"
)
func NewHttpServer(deps *serverdeps.ServerDeps) curre.Component {
return curre.NewFunctionalComponent(&curre.FunctionalComponent{
StartFunc: func(_ *curre.FunctionalComponent, _ context.Context) error {
handler := http.NewServeMux()
handler.HandleFunc(
"/health/ready",
func(w http.ResponseWriter, _ *http.Request) {
w.Write([]byte("ready"))
w.WriteHeader(http.StatusOK)
})
http.ListenAndServe("127.0.0.1:3000", handler)
return nil
},
},
)
}
func NewGinHttpServer(_ *serverdeps.ServerDeps) curre.Component {
func NewGinHttpServer(logger *zap.Logger, deps *serverdeps.ServerDeps) curre.Component {
var app *gin.Engine
var server *http.Server
@@ -45,6 +31,30 @@ func NewGinHttpServer(_ *serverdeps.ServerDeps) curre.Component {
})
})
commandRoute := app.Group("commands")
commandRoute.POST("processRepos", func(c *gin.Context) {
type processReposRequest struct {
RepositoryUrls []string `json:"repositoryUrls"`
}
var request processReposRequest
err := c.BindJSON(&request)
if err != nil {
logger.Info("could not bind request", zap.String("request", "processRepo"), zap.Error(err))
c.AbortWithStatus(http.StatusBadRequest)
return
}
jobId := uuid.New().String()
go func(repositoryUrls []string, jobId string) {
ctx := context.WithValue(context.Background(), jobs.JobId{}, jobId)
processRepos := commands.NewProcessRepos(logger, deps)
err = processRepos.Process(ctx, repositoryUrls, nil)
}(request.RepositoryUrls, jobId)
c.Status(http.StatusAccepted)
})
server = &http.Server{
Addr: "127.0.0.1:3000",
Handler: app,
@@ -62,6 +72,7 @@ func NewGinHttpServer(_ *serverdeps.ServerDeps) curre.Component {
return nil
},
StopFunc: func(_ *curre.FunctionalComponent, ctx context.Context) error {
ctx, _ = context.WithTimeout(ctx, time.Second*10)
if server != nil {
server.Shutdown(ctx)
}

View File

@@ -14,7 +14,7 @@ func Start(logger *zap.Logger) error {
deps := serverdeps.NewServerDeps(logger)
return curre.NewManager().
Register(NewGinHttpServer(deps)).
Register(NewGinHttpServer(logger.With(zap.String("app", "ginHttpServer")), deps)).
Register(NewStorageServer(logger.With(zap.String("app", "storageServer")), deps)).
Run(ctx)
}

View File

@@ -2,6 +2,7 @@ package server
import (
"context"
"time"
"git.front.kjuulh.io/kjuulh/curre"
"git.front.kjuulh.io/kjuulh/kraken/internal/serverdeps"
@@ -11,15 +12,16 @@ import (
func NewStorageServer(logger *zap.Logger, deps *serverdeps.ServerDeps) curre.Component {
storage := deps.GetStorageService()
return curre.NewFunctionalComponent(&curre.FunctionalComponent{
InitFunc: func(fc *curre.FunctionalComponent, ctx context.Context) error {
InitFunc: func(_ *curre.FunctionalComponent, ctx context.Context) error {
logger.Debug("Initializing storage")
return storage.InitializeStorage(ctx)
},
StartFunc: func(fc *curre.FunctionalComponent, ctx context.Context) error {
return nil
},
StopFunc: func(fc *curre.FunctionalComponent, ctx context.Context) error {
StopFunc: func(_ *curre.FunctionalComponent, ctx context.Context) error {
logger.Debug("Cleaning up storage")
ctx, _ = context.WithTimeout(ctx, time.Second*10)
return storage.CleanupStorage(ctx)
},
})

View File

@@ -1,6 +1,7 @@
package serverdeps
import (
"git.front.kjuulh.io/kjuulh/kraken/internal/services/providers"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/storage"
"go.uber.org/zap"
)
@@ -8,6 +9,7 @@ import (
type ServerDeps struct {
logger *zap.Logger
storageConfig *storage.StorageConfig
gitCfg *providers.GitConfig
}
func NewServerDeps(logger *zap.Logger) *ServerDeps {
@@ -21,9 +23,22 @@ func NewServerDeps(logger *zap.Logger) *ServerDeps {
deps.storageConfig = storageCfg
}
deps.gitCfg = &providers.GitConfig{
AuthOption: providers.GIT_AUTH_SSH,
User: "git",
Password: "",
AccessToken: "",
SshPublicKeyFilePath: "/Users/kah/.ssh/id_ed25519",
SshPrivateKeyPassword: "",
}
return deps
}
func (deps *ServerDeps) GetStorageService() *storage.Service {
return storage.NewService(deps.storageConfig)
return storage.NewService(deps.logger.With(zap.String("app", "storage")), deps.storageConfig)
}
func (deps *ServerDeps) GetGitProvider() *providers.Git {
return providers.NewGit(deps.logger.With(zap.String("app", "gitProvider")), deps.gitCfg)
}

View File

@@ -1,2 +1,4 @@
package action
package actions
type Action struct {
}

View File

@@ -0,0 +1,4 @@
package jobs
type JobId struct {
}

View File

@@ -0,0 +1,118 @@
package providers
import (
"context"
"git.front.kjuulh.io/kjuulh/kraken/internal/services/storage"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing/transport"
"github.com/go-git/go-git/v5/plumbing/transport/http"
"github.com/go-git/go-git/v5/plumbing/transport/ssh"
"go.uber.org/zap"
"go.uber.org/zap/zapio"
)
// Git is a native git provider, it can clone, pull
// , push and as in abstraction on native git operations
type Git struct {
logger *zap.Logger
gitConfig *GitConfig
}
type GitRepo struct {
repo *git.Repository
}
type GitAuth string
const (
GIT_AUTH_SSH GitAuth = "ssh"
GIT_AUTH_USERNAME_PASSWORD GitAuth = "username_password"
GIT_AUTH_ACCESS_TOKEN GitAuth = "access_token"
GIT_AUTH_ANONYMOUS GitAuth = "anonymous"
GIT_AUTH_SSH_AGENT GitAuth = "ssh_agent"
)
type GitConfig struct {
AuthOption GitAuth
User string
Password string
AccessToken string
SshPublicKeyFilePath string
SshPrivateKeyPassword string
}
func NewGit(logger *zap.Logger, gitConfig *GitConfig) *Git {
return &Git{logger: logger, gitConfig: gitConfig}
}
func (g *Git) Clone(ctx context.Context, storageArea *storage.Area, repoUrl string) (*GitRepo, error) {
g.logger.Debug(
"cloning repository",
zap.String("repoUrl", repoUrl),
zap.String("path", storageArea.Path),
)
auth, err := g.GetAuth()
if err != nil {
return nil, err
}
cloneOptions := git.CloneOptions{
URL: repoUrl,
Auth: auth,
RemoteName: "origin",
ReferenceName: "refs/heads/main",
SingleBranch: true,
NoCheckout: false,
Depth: 1,
RecurseSubmodules: 1,
Progress: &zapio.Writer{
Log: g.logger.With(zap.String("process", "go-git")),
Level: zap.DebugLevel,
},
Tags: 0,
InsecureSkipTLS: false,
CABundle: []byte{},
}
repo, err := git.PlainCloneContext(ctx, storageArea.Path, false, &cloneOptions)
if err != nil {
return nil, err
}
g.logger.Debug("done cloning repo")
return &GitRepo{repo: repo}, nil
}
func (g *Git) GetAuth() (transport.AuthMethod, error) {
switch g.gitConfig.AuthOption {
case GIT_AUTH_SSH:
sshKey, err := ssh.NewPublicKeysFromFile(
g.gitConfig.User,
g.gitConfig.SshPublicKeyFilePath,
g.gitConfig.SshPrivateKeyPassword,
)
if err != nil {
return nil, err
}
return sshKey, nil
case GIT_AUTH_USERNAME_PASSWORD:
return &http.BasicAuth{
Username: g.gitConfig.User,
Password: g.gitConfig.Password,
}, nil
case GIT_AUTH_ACCESS_TOKEN:
return &http.BasicAuth{
Username: "required-username",
Password: g.gitConfig.AccessToken,
}, nil
case GIT_AUTH_ANONYMOUS:
return nil, nil
case GIT_AUTH_SSH_AGENT:
return ssh.NewSSHAgentAuth(g.gitConfig.User)
default:
return nil, nil
}
}

View File

@@ -1,9 +1,11 @@
package storage
import (
"errors"
"os"
"path"
"go.uber.org/zap"
"golang.org/x/net/context"
)
@@ -25,11 +27,12 @@ func NewDefaultStorageConfig() (*StorageConfig, error) {
}
type Service struct {
cfg *StorageConfig
logger *zap.Logger
cfg *StorageConfig
}
func NewService(cfg *StorageConfig) *Service {
return &Service{cfg: cfg}
func NewService(logger *zap.Logger, cfg *StorageConfig) *Service {
return &Service{logger: logger, cfg: cfg}
}
func (s *Service) getStoragePath(ctx context.Context) string {
@@ -41,7 +44,21 @@ func (s *Service) InitializeStorage(ctx context.Context) error {
}
func (s *Service) CleanupStorage(ctx context.Context) error {
return os.RemoveAll(s.getStoragePath(ctx))
doneRemovingChan := make(chan struct{}, 1)
go func(ctx context.Context) {
s.logger.Debug("Removing all temp storage")
os.RemoveAll(s.getStoragePath(ctx))
doneRemovingChan <- struct{}{}
}(ctx)
select {
case <-ctx.Done():
return errors.New("could not cleanup storage aborting")
case <-doneRemovingChan:
return nil
}
return nil
}
func (s *Service) CreateArea(ctx context.Context) (*Area, error) {