Compare commits
3 Commits
903978b933
...
18b6ca3249
Author | SHA1 | Date | |
---|---|---|---|
|
18b6ca3249 | ||
e94271d5e2 | |||
a68eae8519 |
5
.sqlfluff
Normal file
5
.sqlfluff
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
[sqlfluff]
|
||||||
|
dialect = postgres ; or whatever SQL dialect you use
|
||||||
|
|
||||||
|
[sqlfluff:layout:type:comma]
|
||||||
|
line_position = leading
|
@ -6,9 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
## [0.1.0] - 2025-01-13
|
## [0.1.0] - 2025-01-17
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
- enable worker process
|
||||||
|
- add migration
|
||||||
- add executor (#3)
|
- add executor (#3)
|
||||||
Adds an executor which can process and dispatch events to a set of workers.
|
Adds an executor which can process and dispatch events to a set of workers.
|
||||||
Co-authored-by: kjuulh <contact@kjuulh.io>
|
Co-authored-by: kjuulh <contact@kjuulh.io>
|
||||||
|
@ -1,9 +1,8 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"git.front.kjuulh.io/kjuulh/orbis/internal/app"
|
"git.front.kjuulh.io/kjuulh/orbis/internal/app"
|
||||||
|
"git.front.kjuulh.io/kjuulh/orbis/internal/processes"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -18,11 +17,11 @@ func newRoot(app *app.App) *cobra.Command {
|
|||||||
ctx := cmd.Context()
|
ctx := cmd.Context()
|
||||||
logger.Info("starting orbis")
|
logger.Info("starting orbis")
|
||||||
|
|
||||||
if err := app.Scheduler().Execute(ctx); err != nil {
|
return processes.
|
||||||
return fmt.Errorf("scheduler failed with error: %w", err)
|
NewApp(logger).
|
||||||
}
|
Add(app.Scheduler()).
|
||||||
|
Add(app.Worker()).
|
||||||
return nil
|
Execute(ctx)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
8
go.mod
8
go.mod
@ -3,17 +3,25 @@ module git.front.kjuulh.io/kjuulh/orbis
|
|||||||
go 1.23.4
|
go 1.23.4
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/golang-migrate/migrate/v4 v4.18.1
|
||||||
|
github.com/google/uuid v1.6.0
|
||||||
github.com/jackc/pgx/v5 v5.7.2
|
github.com/jackc/pgx/v5 v5.7.2
|
||||||
github.com/joho/godotenv v1.5.1
|
github.com/joho/godotenv v1.5.1
|
||||||
github.com/spf13/cobra v1.8.1
|
github.com/spf13/cobra v1.8.1
|
||||||
gitlab.com/greyxor/slogor v1.6.1
|
gitlab.com/greyxor/slogor v1.6.1
|
||||||
|
golang.org/x/sync v0.10.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||||
|
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||||
|
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa // indirect
|
||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||||
|
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||||
github.com/spf13/pflag v1.0.5 // indirect
|
github.com/spf13/pflag v1.0.5 // indirect
|
||||||
|
go.uber.org/atomic v1.7.0 // indirect
|
||||||
golang.org/x/crypto v0.31.0 // indirect
|
golang.org/x/crypto v0.31.0 // indirect
|
||||||
golang.org/x/sys v0.29.0 // indirect
|
golang.org/x/sys v0.29.0 // indirect
|
||||||
golang.org/x/text v0.21.0 // indirect
|
golang.org/x/text v0.21.0 // indirect
|
||||||
|
65
go.sum
65
go.sum
@ -1,9 +1,42 @@
|
|||||||
|
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
|
||||||
|
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
|
||||||
|
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
|
||||||
|
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
|
||||||
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/dhui/dktest v0.4.3 h1:wquqUxAFdcUgabAVLvSCOKOlag5cIZuaOjYIBOWdsR0=
|
||||||
|
github.com/dhui/dktest v0.4.3/go.mod h1:zNK8IwktWzQRm6I/l2Wjp7MakiyaFWv4G1hjmodmMTs=
|
||||||
|
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
|
||||||
|
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
|
||||||
|
github.com/docker/docker v27.2.0+incompatible h1:Rk9nIVdfH3+Vz4cyI/uhbINhEZ/oLmc+CBXmH6fbNk4=
|
||||||
|
github.com/docker/docker v27.2.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
|
||||||
|
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
|
||||||
|
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
|
||||||
|
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||||
|
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
|
||||||
|
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
|
||||||
|
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
|
||||||
|
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
||||||
|
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||||
|
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||||
|
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||||
|
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||||
|
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||||
|
github.com/golang-migrate/migrate/v4 v4.18.1 h1:JML/k+t4tpHCpQTCAD62Nu43NUFzHY4CV3uAuvHGC+Y=
|
||||||
|
github.com/golang-migrate/migrate/v4 v4.18.1/go.mod h1:HAX6m3sQgcdO81tdjn5exv20+3Kb13cmGli1hrD6hks=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||||
|
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
|
||||||
|
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||||
|
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
|
||||||
|
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
|
||||||
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
|
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
|
||||||
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
|
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
|
||||||
|
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6hFJUX53drDT4UsSW3DEhKn0ifuHw=
|
||||||
|
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
|
||||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||||
@ -14,6 +47,20 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo
|
|||||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||||
|
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
|
||||||
|
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||||
|
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
|
||||||
|
github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
|
||||||
|
github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0=
|
||||||
|
github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y=
|
||||||
|
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
|
||||||
|
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
|
||||||
|
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
|
||||||
|
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
|
||||||
|
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
|
||||||
|
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
|
||||||
|
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||||
|
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||||
@ -24,18 +71,24 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
|
|||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
gitlab.com/greyxor/slogor v1.6.0 h1:K9QsAoa4leFQfO2RF2MkZ8BYkk2HQpYWykmd4G5R5+Y=
|
|
||||||
gitlab.com/greyxor/slogor v1.6.0/go.mod h1:6UWQsLLkeNL4o911soP9jvCMzXWgokLqzZP+eekAyyU=
|
|
||||||
gitlab.com/greyxor/slogor v1.6.1 h1:ZcvrFuxJMI2YzewC3lFgY+kdxTZj0buX+Q/NPEL4I+g=
|
gitlab.com/greyxor/slogor v1.6.1 h1:ZcvrFuxJMI2YzewC3lFgY+kdxTZj0buX+Q/NPEL4I+g=
|
||||||
gitlab.com/greyxor/slogor v1.6.1/go.mod h1:Nyx8tMQt+RuOmWOYhtXHVK+bd47DwZRpWd/7KZIll+4=
|
gitlab.com/greyxor/slogor v1.6.1/go.mod h1:Nyx8tMQt+RuOmWOYhtXHVK+bd47DwZRpWd/7KZIll+4=
|
||||||
|
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk=
|
||||||
|
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8=
|
||||||
|
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
|
||||||
|
go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8=
|
||||||
|
go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc=
|
||||||
|
go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8=
|
||||||
|
go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4=
|
||||||
|
go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ=
|
||||||
|
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
|
||||||
|
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||||
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
|
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
|
||||||
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
|
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
|
||||||
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||||
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||||
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
|
|
||||||
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
|
||||||
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
|
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
|
||||||
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||||
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
|
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
|
|
||||||
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
|
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
|
||||||
"git.front.kjuulh.io/kjuulh/orbis/internal/scheduler"
|
"git.front.kjuulh.io/kjuulh/orbis/internal/scheduler"
|
||||||
|
"git.front.kjuulh.io/kjuulh/orbis/internal/worker"
|
||||||
)
|
)
|
||||||
|
|
||||||
type App struct {
|
type App struct {
|
||||||
@ -28,3 +29,7 @@ func (a *App) Scheduler() *scheduler.Scheduler {
|
|||||||
func (a *App) Executor() *executor.Executor {
|
func (a *App) Executor() *executor.Executor {
|
||||||
return executor.NewExecutor(a.logger.With("component", "executor"))
|
return executor.NewExecutor(a.logger.With("component", "executor"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *App) Worker() *worker.Worker {
|
||||||
|
return worker.NewWorker(Postgres(), a.logger)
|
||||||
|
}
|
||||||
|
@ -1,23 +1,17 @@
|
|||||||
package app
|
package app
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
|
"git.front.kjuulh.io/kjuulh/orbis/internal/persistence"
|
||||||
"git.front.kjuulh.io/kjuulh/orbis/internal/utilities"
|
"git.front.kjuulh.io/kjuulh/orbis/internal/utilities"
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
var Postgres = utilities.Singleton(func() (*pgx.Conn, error) {
|
var Postgres = utilities.Singleton(func() (*pgxpool.Pool, error) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
if err := persistence.Migrate(); err != nil {
|
||||||
defer cancel()
|
return nil, fmt.Errorf("failed to migrate database: %w", err)
|
||||||
|
|
||||||
conn, err := pgx.Connect(ctx, os.Getenv("ORBIS_POSTGRES_DB"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to connect to orbis postgres database: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return conn, nil
|
return persistence.NewConnection()
|
||||||
})
|
})
|
||||||
|
@ -18,5 +18,10 @@ func NewExecutor(logger *slog.Logger) *Executor {
|
|||||||
func (e *Executor) DispatchEvents(ctx context.Context) error {
|
func (e *Executor) DispatchEvents(ctx context.Context) error {
|
||||||
e.logger.InfoContext(ctx, "dispatching events")
|
e.logger.InfoContext(ctx, "dispatching events")
|
||||||
|
|
||||||
|
// TODO: Process updates to models
|
||||||
|
// TODO: Insert new cron for runtime
|
||||||
|
// TODO: Calculate time since last run
|
||||||
|
// TODO: Send events for workers to pick up
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
22
internal/persistence/connection.go
Normal file
22
internal/persistence/connection.go
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
package persistence
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewConnection() (*pgxpool.Pool, error) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
conn, err := pgxpool.New(ctx, os.Getenv("ORBIS_POSTGRES_DB"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect to orbis postgres database: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return conn, nil
|
||||||
|
}
|
59
internal/persistence/migrations.go
Normal file
59
internal/persistence/migrations.go
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
package persistence
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"embed"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/golang-migrate/migrate/v4"
|
||||||
|
migratepgx "github.com/golang-migrate/migrate/v4/database/pgx/v5"
|
||||||
|
"github.com/golang-migrate/migrate/v4/source"
|
||||||
|
_ "github.com/golang-migrate/migrate/v4/source/file"
|
||||||
|
"github.com/golang-migrate/migrate/v4/source/iofs"
|
||||||
|
)
|
||||||
|
|
||||||
|
const migrationSource = "migrations"
|
||||||
|
|
||||||
|
//go:embed migrations/*.sql
|
||||||
|
var migrations embed.FS
|
||||||
|
|
||||||
|
func Migrate() error {
|
||||||
|
db, err := sql.Open("pgx", os.Getenv("ORBIS_POSTGRES_DB"))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to establish connection to database: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
driver, err := migratepgx.WithInstance(db, &migratepgx.Config{})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to install postgres driver for migrations: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
migrationSource, err := NewEmbedDriver(migrationSource, migrations)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to setup embedded driver for migrations: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
migration, err := migrate.NewWithInstance("iofs", migrationSource, "postgres", driver)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := migration.Up(); err != nil {
|
||||||
|
if !errors.Is(err, migrate.ErrNoChange) {
|
||||||
|
return fmt.Errorf("failed to migrate database: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEmbedDriver(path string, files embed.FS) (source.Driver, error) {
|
||||||
|
driver, err := iofs.New(files, migrationSource)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return driver, nil
|
||||||
|
}
|
@ -0,0 +1,4 @@
|
|||||||
|
CREATE TABLE worker_register (
|
||||||
|
worker_id UUID PRIMARY KEY NOT NULL
|
||||||
|
, heart_beat TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||||
|
);
|
163
internal/processes/processes.go
Normal file
163
internal/processes/processes.go
Normal file
@ -0,0 +1,163 @@
|
|||||||
|
package processes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Process interface {
|
||||||
|
Start(ctx context.Context) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type SetupProcesser interface {
|
||||||
|
Setup(ctx context.Context) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloseProcesser interface {
|
||||||
|
Close(ctx context.Context) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type App struct {
|
||||||
|
logger *slog.Logger
|
||||||
|
processes []Process
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewApp(logger *slog.Logger) *App {
|
||||||
|
return &App{
|
||||||
|
logger: logger,
|
||||||
|
processes: make([]Process, 0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *App) Add(p Process) *App {
|
||||||
|
a.processes = append(a.processes, p)
|
||||||
|
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *App) Execute(ctx context.Context) error {
|
||||||
|
a.logger.InfoContext(ctx, "starting processor")
|
||||||
|
if err := a.setupProcesses(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
processes, err := a.startProcesses(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
processErr := processes.wait(ctx)
|
||||||
|
|
||||||
|
if err := a.closeProcesses(ctx, processes); err != nil {
|
||||||
|
if processErr != nil {
|
||||||
|
return processErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if processErr != nil {
|
||||||
|
return processErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *App) closeProcesses(ctx context.Context, processes *processStatus) error {
|
||||||
|
waitClose, cancel := context.WithTimeout(ctx, time.Second*10)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
closeErrs := make(chan error)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
errgrp, ctx := errgroup.WithContext(waitClose)
|
||||||
|
for _, closeProcessor := range a.processes {
|
||||||
|
if close, ok := closeProcessor.(CloseProcesser); ok {
|
||||||
|
errgrp.Go(func() error {
|
||||||
|
a.logger.InfoContext(ctx, "closing processor")
|
||||||
|
return close.Close(ctx)
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
closeErrs <- errgrp.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
for _, closeHandle := range processes.processHandles {
|
||||||
|
closeHandle()
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-waitClose.Done():
|
||||||
|
return nil
|
||||||
|
case <-closeErrs:
|
||||||
|
return nil
|
||||||
|
case _, closed := <-processes.errs:
|
||||||
|
if closed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type processStatus struct {
|
||||||
|
errs chan error
|
||||||
|
processHandles []context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *processStatus) wait(_ context.Context) error {
|
||||||
|
return <-p.errs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *App) startProcesses(ctx context.Context) (*processStatus, any) {
|
||||||
|
status := &processStatus{
|
||||||
|
errs: make(chan error, len(a.processes)),
|
||||||
|
processHandles: make([]context.CancelFunc, 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, process := range a.processes {
|
||||||
|
processCtx, cancelFunc := context.WithCancel(ctx)
|
||||||
|
|
||||||
|
status.processHandles = append(status.processHandles, cancelFunc)
|
||||||
|
|
||||||
|
go func(ctx context.Context, process Process) {
|
||||||
|
a.logger.DebugContext(ctx, "starting process")
|
||||||
|
|
||||||
|
err := process.Start(ctx)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
a.logger.WarnContext(ctx, "process finished with error", "error", err)
|
||||||
|
|
||||||
|
} else {
|
||||||
|
a.logger.DebugContext(ctx, "process finished gracefully")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
status.errs <- err
|
||||||
|
}(processCtx, process)
|
||||||
|
}
|
||||||
|
|
||||||
|
return status, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *App) setupProcesses(ctx context.Context) error {
|
||||||
|
ctxWithDeadline, cancel := context.WithTimeout(ctx, time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
errgrp, ctx := errgroup.WithContext(ctxWithDeadline)
|
||||||
|
for _, setupProcessor := range a.processes {
|
||||||
|
if setup, ok := setupProcessor.(SetupProcesser); ok {
|
||||||
|
errgrp.Go(func() error {
|
||||||
|
a.logger.InfoContext(ctx, "setting up processor")
|
||||||
|
return setup.Setup(ctx)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return errgrp.Wait()
|
||||||
|
}
|
@ -10,15 +10,16 @@ import (
|
|||||||
|
|
||||||
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
|
"git.front.kjuulh.io/kjuulh/orbis/internal/executor"
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Scheduler struct {
|
type Scheduler struct {
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
db *pgx.Conn
|
db *pgxpool.Pool
|
||||||
executor *executor.Executor
|
executor *executor.Executor
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewScheduler(logger *slog.Logger, db *pgx.Conn, executor *executor.Executor) *Scheduler {
|
func NewScheduler(logger *slog.Logger, db *pgxpool.Pool, executor *executor.Executor) *Scheduler {
|
||||||
return &Scheduler{
|
return &Scheduler{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
db: db,
|
db: db,
|
||||||
@ -26,6 +27,14 @@ func NewScheduler(logger *slog.Logger, db *pgx.Conn, executor *executor.Executor
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) Start(ctx context.Context) error {
|
||||||
|
if err := s.Execute(ctx); err != nil {
|
||||||
|
return fmt.Errorf("execution of scheduler failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Scheduler) Execute(ctx context.Context) error {
|
func (s *Scheduler) Execute(ctx context.Context) error {
|
||||||
acquiredLeader, err := s.acquireLeader(ctx)
|
acquiredLeader, err := s.acquireLeader(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
15
internal/worker/queries.sql
Normal file
15
internal/worker/queries.sql
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
-- name: Ping :one
|
||||||
|
SELECT 1;
|
||||||
|
|
||||||
|
-- name: RegisterWorker :exec
|
||||||
|
INSERT INTO worker_register (worker_id)
|
||||||
|
VALUES (
|
||||||
|
$1
|
||||||
|
);
|
||||||
|
|
||||||
|
-- name: UpdateWorkerHeartbeat :exec
|
||||||
|
UPDATE worker_register
|
||||||
|
SET
|
||||||
|
heart_beat = now()
|
||||||
|
WHERE
|
||||||
|
worker_id = $1;
|
32
internal/worker/repositories/db.go
Normal file
32
internal/worker/repositories/db.go
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
// Code generated by sqlc. DO NOT EDIT.
|
||||||
|
// versions:
|
||||||
|
// sqlc v1.23.0
|
||||||
|
|
||||||
|
package repositories
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
"github.com/jackc/pgx/v5/pgconn"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DBTX interface {
|
||||||
|
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
|
||||||
|
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
|
||||||
|
QueryRow(context.Context, string, ...interface{}) pgx.Row
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(db DBTX) *Queries {
|
||||||
|
return &Queries{db: db}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Queries struct {
|
||||||
|
db DBTX
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queries) WithTx(tx pgx.Tx) *Queries {
|
||||||
|
return &Queries{
|
||||||
|
db: tx,
|
||||||
|
}
|
||||||
|
}
|
15
internal/worker/repositories/models.go
Normal file
15
internal/worker/repositories/models.go
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
// Code generated by sqlc. DO NOT EDIT.
|
||||||
|
// versions:
|
||||||
|
// sqlc v1.23.0
|
||||||
|
|
||||||
|
package repositories
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/jackc/pgx/v5/pgtype"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WorkerRegister struct {
|
||||||
|
WorkerID uuid.UUID `json:"worker_id"`
|
||||||
|
HeartBeat pgtype.Timestamptz `json:"heart_beat"`
|
||||||
|
}
|
19
internal/worker/repositories/querier.go
Normal file
19
internal/worker/repositories/querier.go
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
// Code generated by sqlc. DO NOT EDIT.
|
||||||
|
// versions:
|
||||||
|
// sqlc v1.23.0
|
||||||
|
|
||||||
|
package repositories
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Querier interface {
|
||||||
|
Ping(ctx context.Context) (int32, error)
|
||||||
|
RegisterWorker(ctx context.Context, workerID uuid.UUID) error
|
||||||
|
UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Querier = (*Queries)(nil)
|
48
internal/worker/repositories/queries.sql.go
Normal file
48
internal/worker/repositories/queries.sql.go
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
// Code generated by sqlc. DO NOT EDIT.
|
||||||
|
// versions:
|
||||||
|
// sqlc v1.23.0
|
||||||
|
// source: queries.sql
|
||||||
|
|
||||||
|
package repositories
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
const ping = `-- name: Ping :one
|
||||||
|
SELECT 1
|
||||||
|
`
|
||||||
|
|
||||||
|
func (q *Queries) Ping(ctx context.Context) (int32, error) {
|
||||||
|
row := q.db.QueryRow(ctx, ping)
|
||||||
|
var column_1 int32
|
||||||
|
err := row.Scan(&column_1)
|
||||||
|
return column_1, err
|
||||||
|
}
|
||||||
|
|
||||||
|
const registerWorker = `-- name: RegisterWorker :exec
|
||||||
|
INSERT INTO worker_register (worker_id)
|
||||||
|
VALUES (
|
||||||
|
$1
|
||||||
|
)
|
||||||
|
`
|
||||||
|
|
||||||
|
func (q *Queries) RegisterWorker(ctx context.Context, workerID uuid.UUID) error {
|
||||||
|
_, err := q.db.Exec(ctx, registerWorker, workerID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
const updateWorkerHeartbeat = `-- name: UpdateWorkerHeartbeat :exec
|
||||||
|
UPDATE worker_register
|
||||||
|
SET
|
||||||
|
heart_beat = now()
|
||||||
|
WHERE
|
||||||
|
worker_id = $1
|
||||||
|
`
|
||||||
|
|
||||||
|
func (q *Queries) UpdateWorkerHeartbeat(ctx context.Context, workerID uuid.UUID) error {
|
||||||
|
_, err := q.db.Exec(ctx, updateWorkerHeartbeat, workerID)
|
||||||
|
return err
|
||||||
|
}
|
21
internal/worker/sqlc.yaml
Normal file
21
internal/worker/sqlc.yaml
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
version: "2"
|
||||||
|
sql:
|
||||||
|
- queries: queries.sql
|
||||||
|
schema: ../persistence/migrations/
|
||||||
|
engine: "postgresql"
|
||||||
|
gen:
|
||||||
|
go:
|
||||||
|
out: "repositories"
|
||||||
|
package: "repositories"
|
||||||
|
sql_package: "pgx/v5"
|
||||||
|
emit_json_tags: true
|
||||||
|
emit_prepared_queries: true
|
||||||
|
emit_interface: true
|
||||||
|
emit_empty_slices: true
|
||||||
|
emit_result_struct_pointers: true
|
||||||
|
emit_params_struct_pointers: true
|
||||||
|
overrides:
|
||||||
|
- db_type: "uuid"
|
||||||
|
go_type:
|
||||||
|
import: "github.com/google/uuid"
|
||||||
|
type: "UUID"
|
91
internal/worker/worker.go
Normal file
91
internal/worker/worker.go
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
package worker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.front.kjuulh.io/kjuulh/orbis/internal/worker/repositories"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:generate sqlc generate
|
||||||
|
|
||||||
|
type Worker struct {
|
||||||
|
workerID uuid.UUID
|
||||||
|
|
||||||
|
db *pgxpool.Pool
|
||||||
|
logger *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWorker(
|
||||||
|
db *pgxpool.Pool,
|
||||||
|
logger *slog.Logger,
|
||||||
|
) *Worker {
|
||||||
|
return &Worker{
|
||||||
|
workerID: uuid.New(),
|
||||||
|
db: db,
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) Setup(ctx context.Context) error {
|
||||||
|
repo := repositories.New(w.db)
|
||||||
|
|
||||||
|
if err := repo.RegisterWorker(ctx, w.workerID); err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) Start(ctx context.Context) error {
|
||||||
|
heartBeatCtx, heartBeatCancel := context.WithCancel(context.Background())
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(time.Second * 5)
|
||||||
|
errorCount := 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-heartBeatCtx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
if err := w.updateHeartBeat(heartBeatCtx); err != nil {
|
||||||
|
if errorCount >= 5 {
|
||||||
|
panic(fmt.Errorf("worker failed to register heartbeat for a long time, panicing..., err: %w", err))
|
||||||
|
}
|
||||||
|
errorCount += 1
|
||||||
|
} else {
|
||||||
|
errorCount = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
heartBeatCancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
if err := w.processWorkQueue(ctx); err != nil {
|
||||||
|
// FIXME: dead letter item, right now we just log and continue
|
||||||
|
|
||||||
|
w.logger.WarnContext(ctx, "failed to handle work item", "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) updateHeartBeat(ctx context.Context) error {
|
||||||
|
repo := repositories.New(w.db)
|
||||||
|
|
||||||
|
w.logger.DebugContext(ctx, "updating heartbeat", "time", time.Now())
|
||||||
|
return repo.UpdateWorkerHeartbeat(ctx, w.workerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) processWorkQueue(_ context.Context) error {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user