Compare commits

...

1 Commits

Author SHA1 Message Date
dd56145418
Add basic test queue
All checks were successful
continuous-integration/drone/push Build is passing
2022-02-23 22:39:04 +01:00
6 changed files with 253 additions and 2 deletions

View File

@ -52,6 +52,25 @@ services:
logging: *loki-logging logging: *loki-logging
depends_on: depends_on:
- db_migrator - db_migrator
- rabbitmq
# Messaging
rabbitmq:
image: docker.io/bitnami/rabbitmq:latest
ports:
- '4369:4369'
- '5551:5551'
- '5552:5552'
- '5672:5672'
- '25672:25672'
- '15672:15672'
networks:
- back-tier
environment:
- RABBITMQ_USERNAME=serverctl
- RABBITMQ_PASSWORD=serverctlsecret
volumes:
- 'rabbitmq_data:/bitnami/rabbitmq/mnesia'
# Logging # Logging
loki: loki:
@ -114,3 +133,4 @@ volumes:
db_data: {} db_data: {}
prometheus_data: {} prometheus_data: {}
grafana_data: {} grafana_data: {}
rabbitmq_data: {}

View File

@ -4,6 +4,7 @@ import (
"serverctl/pkg/api" "serverctl/pkg/api"
"serverctl/pkg/infrastructure" "serverctl/pkg/infrastructure"
"serverctl/pkg/infrastructure/dependencies" "serverctl/pkg/infrastructure/dependencies"
"serverctl/pkg/infrastructure/queue"
) )
// Run main app, will bootstrap dependencies and run all external ports (http servers, queues, etc). // Run main app, will bootstrap dependencies and run all external ports (http servers, queues, etc).
@ -11,6 +12,12 @@ func Run() {
d := dependencies.New() d := dependencies.New()
d.Logger.Info("Starting serverctl") d.Logger.Info("Starting serverctl")
queue.NewRabbitMQ(d.Logger, &queue.RabbitMqConfig{
Username: "serverctl",
Password: "serverctlsecret",
Host: "rabbitmq",
Port: 5672,
})
// if development add seed data // if development add seed data
infrastructure.AddSeedData(d.Database, d.Logger) infrastructure.AddSeedData(d.Database, d.Logger)

View File

@ -18,7 +18,6 @@ require (
github.com/docker/go-units v0.4.0 // indirect github.com/docker/go-units v0.4.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect
github.com/eko/gocache v1.2.0 // indirect github.com/eko/gocache v1.2.0 // indirect
github.com/georgysavva/scany v0.3.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect
github.com/gin-gonic/gin v1.7.7 // indirect github.com/gin-gonic/gin v1.7.7 // indirect
github.com/go-co-op/gocron v1.12.0 // indirect github.com/go-co-op/gocron v1.12.0 // indirect
@ -37,7 +36,6 @@ require (
github.com/jackc/pgproto3/v2 v2.2.0 // indirect github.com/jackc/pgproto3/v2 v2.2.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.10.0 // indirect github.com/jackc/pgtype v1.10.0 // indirect
github.com/jackc/pgx v3.6.2+incompatible // indirect
github.com/jackc/pgx/v4 v4.15.0 // indirect github.com/jackc/pgx/v4 v4.15.0 // indirect
github.com/jackc/puddle v1.2.1 // indirect github.com/jackc/puddle v1.2.1 // indirect
github.com/json-iterator/go v1.1.10 // indirect github.com/json-iterator/go v1.1.10 // indirect
@ -54,6 +52,7 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.18.0 // indirect github.com/prometheus/common v0.18.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect
github.com/rabbitmq/amqp091-go v1.3.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/cast v1.3.1 // indirect github.com/spf13/cast v1.3.1 // indirect

View File

@ -793,6 +793,8 @@ github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rabbitmq/amqp091-go v1.3.0 h1:A/QuHiNw7LMCJsxx9iZn5lrIz6OrhIn7Dfk5/1YatWM=
github.com/rabbitmq/amqp091-go v1.3.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
@ -846,6 +848,8 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8= github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

View File

@ -0,0 +1,4 @@
package queue
type Queue interface {
}

View File

@ -0,0 +1,217 @@
package queue
import (
"fmt"
"go.uber.org/zap"
"time"
)
import amqp "github.com/rabbitmq/amqp091-go"
type RabbitMQ struct {
logger *zap.Logger
config *RabbitMqConfig
conn *amqp.Connection
}
var _ Queue = RabbitMQ{}
type RabbitMqConfig struct {
Username string
Password string
Host string
Port int
}
func NewRabbitMQ(logger *zap.Logger, config *RabbitMqConfig) Queue {
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", config.Username, config.Password, config.Host, config.Port))
if err != nil {
logger.Panic("Could not dial rabbitmq", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port), zap.String("error", err.Error()))
}
//sendBasic(logger, config, conn)
//receiveBasic(logger, config, conn)
sendPublishingBasic(logger, config, conn)
for i := 0; i < 200; i++ {
time.Sleep(time.Millisecond * 200)
receivePublishingBasic(logger, config, conn)
}
//sendMany(logger, config, conn)
return &RabbitMQ{
logger: logger,
config: config,
conn: conn,
}
}
func sendBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
ch, err := conn.Channel()
if err != nil {
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
panic(err)
}
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
if err != nil {
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
panic(err)
}
body := "Hello world!"
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
if err != nil {
logger.Fatal("Could not send message", zap.String("queueName", "test"))
panic(err)
}
logger.Info("Sent message", zap.String("body", body))
go func() {
defer ch.Close()
counter := 0
for {
body := fmt.Sprintf("message nr: %d", counter)
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
if err != nil {
logger.Fatal("Could not send message", zap.String("queueName", "test"))
panic(err)
}
logger.Info("Sent message", zap.String("body", body))
counter += 1
time.Sleep(5 * time.Second)
}
}()
}
func receiveBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
ch, err := conn.Channel()
if err != nil {
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
panic(err)
}
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
if err != nil {
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
panic(err)
}
msgs, err := ch.Consume(q.Name,
"",
true,
false, false, false, nil)
if err != nil {
logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
}
go func() {
defer ch.Close()
for d := range msgs {
logger.Info("Received msg", zap.String("body", string(d.Body)))
}
}()
}
func sendMany(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
ch, err := conn.Channel()
if err != nil {
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
panic(err)
}
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
if err != nil {
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
panic(err)
}
body := "Hello world!"
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
if err != nil {
logger.Fatal("Could not send message", zap.String("queueName", "test"))
panic(err)
}
logger.Info("Sent message", zap.String("body", body))
go func() {
defer ch.Close()
counter := 0
for {
body := fmt.Sprintf("message nr: %d", counter)
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
if err != nil {
logger.Fatal("Could not send message", zap.String("queueName", "test"))
panic(err)
}
logger.Info("Sent message", zap.String("body", body))
counter += 1
}
}()
}
func sendPublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
ch, err := conn.Channel()
if err != nil {
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
panic(err)
}
q, err := ch.QueueDeclare("test", true, false, false, false, nil)
if err != nil {
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
panic(err)
}
go func() {
defer ch.Close()
counter := 0
for {
body := fmt.Sprintf("message nr: %d", counter)
err = ch.Publish("", q.Name, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain", Body: []byte(body)})
if err != nil {
logger.Fatal("Could not send message", zap.String("queueName", "test"))
panic(err)
}
logger.Info("Sent message", zap.String("body", body))
counter += 1
time.Sleep(time.Millisecond * 200)
}
}()
}
func receivePublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
ch, err := conn.Channel()
if err != nil {
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
panic(err)
}
ch.Qos(1, 0, false)
q, err := ch.QueueDeclare("test", true, false, false, false, nil)
if err != nil {
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
panic(err)
}
msgs, err := ch.Consume(q.Name,
"",
false,
false, false, false, nil)
if err != nil {
logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
}
go func() {
defer ch.Close()
for d := range msgs {
logger.Info("Received msg", zap.String("body", string(d.Body)), zap.Int("secondsToSleep", len(d.Body)))
t := time.Duration(len(d.Body))
time.Sleep(t * time.Second)
logger.Info("Received msg: Done")
d.Ack(false)
}
}()
}