Compare commits
1 Commits
main
...
feature/me
Author | SHA1 | Date | |
---|---|---|---|
dd56145418 |
@ -52,6 +52,25 @@ services:
|
|||||||
logging: *loki-logging
|
logging: *loki-logging
|
||||||
depends_on:
|
depends_on:
|
||||||
- db_migrator
|
- db_migrator
|
||||||
|
- rabbitmq
|
||||||
|
|
||||||
|
# Messaging
|
||||||
|
rabbitmq:
|
||||||
|
image: docker.io/bitnami/rabbitmq:latest
|
||||||
|
ports:
|
||||||
|
- '4369:4369'
|
||||||
|
- '5551:5551'
|
||||||
|
- '5552:5552'
|
||||||
|
- '5672:5672'
|
||||||
|
- '25672:25672'
|
||||||
|
- '15672:15672'
|
||||||
|
networks:
|
||||||
|
- back-tier
|
||||||
|
environment:
|
||||||
|
- RABBITMQ_USERNAME=serverctl
|
||||||
|
- RABBITMQ_PASSWORD=serverctlsecret
|
||||||
|
volumes:
|
||||||
|
- 'rabbitmq_data:/bitnami/rabbitmq/mnesia'
|
||||||
|
|
||||||
# Logging
|
# Logging
|
||||||
loki:
|
loki:
|
||||||
@ -114,3 +133,4 @@ volumes:
|
|||||||
db_data: {}
|
db_data: {}
|
||||||
prometheus_data: {}
|
prometheus_data: {}
|
||||||
grafana_data: {}
|
grafana_data: {}
|
||||||
|
rabbitmq_data: {}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"serverctl/pkg/api"
|
"serverctl/pkg/api"
|
||||||
"serverctl/pkg/infrastructure"
|
"serverctl/pkg/infrastructure"
|
||||||
"serverctl/pkg/infrastructure/dependencies"
|
"serverctl/pkg/infrastructure/dependencies"
|
||||||
|
"serverctl/pkg/infrastructure/queue"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Run main app, will bootstrap dependencies and run all external ports (http servers, queues, etc).
|
// Run main app, will bootstrap dependencies and run all external ports (http servers, queues, etc).
|
||||||
@ -11,6 +12,12 @@ func Run() {
|
|||||||
d := dependencies.New()
|
d := dependencies.New()
|
||||||
d.Logger.Info("Starting serverctl")
|
d.Logger.Info("Starting serverctl")
|
||||||
|
|
||||||
|
queue.NewRabbitMQ(d.Logger, &queue.RabbitMqConfig{
|
||||||
|
Username: "serverctl",
|
||||||
|
Password: "serverctlsecret",
|
||||||
|
Host: "rabbitmq",
|
||||||
|
Port: 5672,
|
||||||
|
})
|
||||||
// if development add seed data
|
// if development add seed data
|
||||||
infrastructure.AddSeedData(d.Database, d.Logger)
|
infrastructure.AddSeedData(d.Database, d.Logger)
|
||||||
|
|
||||||
|
@ -18,7 +18,6 @@ require (
|
|||||||
github.com/docker/go-units v0.4.0 // indirect
|
github.com/docker/go-units v0.4.0 // indirect
|
||||||
github.com/dustin/go-humanize v1.0.0 // indirect
|
github.com/dustin/go-humanize v1.0.0 // indirect
|
||||||
github.com/eko/gocache v1.2.0 // indirect
|
github.com/eko/gocache v1.2.0 // indirect
|
||||||
github.com/georgysavva/scany v0.3.0 // indirect
|
|
||||||
github.com/gin-contrib/sse v0.1.0 // indirect
|
github.com/gin-contrib/sse v0.1.0 // indirect
|
||||||
github.com/gin-gonic/gin v1.7.7 // indirect
|
github.com/gin-gonic/gin v1.7.7 // indirect
|
||||||
github.com/go-co-op/gocron v1.12.0 // indirect
|
github.com/go-co-op/gocron v1.12.0 // indirect
|
||||||
@ -37,7 +36,6 @@ require (
|
|||||||
github.com/jackc/pgproto3/v2 v2.2.0 // indirect
|
github.com/jackc/pgproto3/v2 v2.2.0 // indirect
|
||||||
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
|
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
|
||||||
github.com/jackc/pgtype v1.10.0 // indirect
|
github.com/jackc/pgtype v1.10.0 // indirect
|
||||||
github.com/jackc/pgx v3.6.2+incompatible // indirect
|
|
||||||
github.com/jackc/pgx/v4 v4.15.0 // indirect
|
github.com/jackc/pgx/v4 v4.15.0 // indirect
|
||||||
github.com/jackc/puddle v1.2.1 // indirect
|
github.com/jackc/puddle v1.2.1 // indirect
|
||||||
github.com/json-iterator/go v1.1.10 // indirect
|
github.com/json-iterator/go v1.1.10 // indirect
|
||||||
@ -54,6 +52,7 @@ require (
|
|||||||
github.com/prometheus/client_model v0.2.0 // indirect
|
github.com/prometheus/client_model v0.2.0 // indirect
|
||||||
github.com/prometheus/common v0.18.0 // indirect
|
github.com/prometheus/common v0.18.0 // indirect
|
||||||
github.com/prometheus/procfs v0.6.0 // indirect
|
github.com/prometheus/procfs v0.6.0 // indirect
|
||||||
|
github.com/rabbitmq/amqp091-go v1.3.0 // indirect
|
||||||
github.com/robfig/cron/v3 v3.0.1 // indirect
|
github.com/robfig/cron/v3 v3.0.1 // indirect
|
||||||
github.com/sirupsen/logrus v1.8.1 // indirect
|
github.com/sirupsen/logrus v1.8.1 // indirect
|
||||||
github.com/spf13/cast v1.3.1 // indirect
|
github.com/spf13/cast v1.3.1 // indirect
|
||||||
|
@ -793,6 +793,8 @@ github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
|
|||||||
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
|
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
|
||||||
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
|
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
|
||||||
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.3.0 h1:A/QuHiNw7LMCJsxx9iZn5lrIz6OrhIn7Dfk5/1YatWM=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.3.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
|
||||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||||
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
||||||
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
||||||
@ -846,6 +848,8 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y
|
|||||||
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8=
|
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8=
|
||||||
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||||
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||||
|
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
|
||||||
|
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||||
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
|
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
|
||||||
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
4
services/entry/pkg/infrastructure/queue/queue.go
Normal file
4
services/entry/pkg/infrastructure/queue/queue.go
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
package queue
|
||||||
|
|
||||||
|
type Queue interface {
|
||||||
|
}
|
217
services/entry/pkg/infrastructure/queue/rabbitmq.go
Normal file
217
services/entry/pkg/infrastructure/queue/rabbitmq.go
Normal file
@ -0,0 +1,217 @@
|
|||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
import amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
|
||||||
|
type RabbitMQ struct {
|
||||||
|
logger *zap.Logger
|
||||||
|
config *RabbitMqConfig
|
||||||
|
conn *amqp.Connection
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Queue = RabbitMQ{}
|
||||||
|
|
||||||
|
type RabbitMqConfig struct {
|
||||||
|
Username string
|
||||||
|
Password string
|
||||||
|
Host string
|
||||||
|
Port int
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRabbitMQ(logger *zap.Logger, config *RabbitMqConfig) Queue {
|
||||||
|
|
||||||
|
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", config.Username, config.Password, config.Host, config.Port))
|
||||||
|
if err != nil {
|
||||||
|
logger.Panic("Could not dial rabbitmq", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port), zap.String("error", err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
|
//sendBasic(logger, config, conn)
|
||||||
|
//receiveBasic(logger, config, conn)
|
||||||
|
sendPublishingBasic(logger, config, conn)
|
||||||
|
for i := 0; i < 200; i++ {
|
||||||
|
time.Sleep(time.Millisecond * 200)
|
||||||
|
receivePublishingBasic(logger, config, conn)
|
||||||
|
}
|
||||||
|
//sendMany(logger, config, conn)
|
||||||
|
|
||||||
|
return &RabbitMQ{
|
||||||
|
logger: logger,
|
||||||
|
config: config,
|
||||||
|
conn: conn,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
body := "Hello world!"
|
||||||
|
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logger.Info("Sent message", zap.String("body", body))
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer ch.Close()
|
||||||
|
counter := 0
|
||||||
|
for {
|
||||||
|
body := fmt.Sprintf("message nr: %d", counter)
|
||||||
|
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logger.Info("Sent message", zap.String("body", body))
|
||||||
|
counter += 1
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
func receiveBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := ch.Consume(q.Name,
|
||||||
|
"",
|
||||||
|
true,
|
||||||
|
false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer ch.Close()
|
||||||
|
|
||||||
|
for d := range msgs {
|
||||||
|
logger.Info("Received msg", zap.String("body", string(d.Body)))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
func sendMany(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
body := "Hello world!"
|
||||||
|
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logger.Info("Sent message", zap.String("body", body))
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer ch.Close()
|
||||||
|
counter := 0
|
||||||
|
for {
|
||||||
|
body := fmt.Sprintf("message nr: %d", counter)
|
||||||
|
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logger.Info("Sent message", zap.String("body", body))
|
||||||
|
counter += 1
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
func sendPublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare("test", true, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer ch.Close()
|
||||||
|
counter := 0
|
||||||
|
for {
|
||||||
|
body := fmt.Sprintf("message nr: %d", counter)
|
||||||
|
err = ch.Publish("", q.Name, false, false, amqp.Publishing{
|
||||||
|
DeliveryMode: amqp.Persistent,
|
||||||
|
ContentType: "text/plain", Body: []byte(body)})
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not send message", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logger.Info("Sent message", zap.String("body", body))
|
||||||
|
counter += 1
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 200)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
func receivePublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ch.Qos(1, 0, false)
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare("test", true, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs, err := ch.Consume(q.Name,
|
||||||
|
"",
|
||||||
|
false,
|
||||||
|
false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer ch.Close()
|
||||||
|
|
||||||
|
for d := range msgs {
|
||||||
|
logger.Info("Received msg", zap.String("body", string(d.Body)), zap.Int("secondsToSleep", len(d.Body)))
|
||||||
|
t := time.Duration(len(d.Body))
|
||||||
|
time.Sleep(t * time.Second)
|
||||||
|
logger.Info("Received msg: Done")
|
||||||
|
d.Ack(false)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user