From dd56145418c897d365ffd3e0b28f164e980098a0 Mon Sep 17 00:00:00 2001 From: kjuulh Date: Wed, 23 Feb 2022 22:39:04 +0100 Subject: [PATCH] Add basic test queue --- docker-compose.yml | 20 ++ services/entry/cmd/app/main.go | 7 + services/entry/go.mod | 3 +- services/entry/go.sum | 4 + .../entry/pkg/infrastructure/queue/queue.go | 4 + .../pkg/infrastructure/queue/rabbitmq.go | 217 ++++++++++++++++++ 6 files changed, 253 insertions(+), 2 deletions(-) create mode 100644 services/entry/pkg/infrastructure/queue/queue.go create mode 100644 services/entry/pkg/infrastructure/queue/rabbitmq.go diff --git a/docker-compose.yml b/docker-compose.yml index d7c2b78..81fb37d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -52,6 +52,25 @@ services: logging: *loki-logging depends_on: - db_migrator + - rabbitmq + +# Messaging + rabbitmq: + image: docker.io/bitnami/rabbitmq:latest + ports: + - '4369:4369' + - '5551:5551' + - '5552:5552' + - '5672:5672' + - '25672:25672' + - '15672:15672' + networks: + - back-tier + environment: + - RABBITMQ_USERNAME=serverctl + - RABBITMQ_PASSWORD=serverctlsecret + volumes: + - 'rabbitmq_data:/bitnami/rabbitmq/mnesia' # Logging loki: @@ -114,3 +133,4 @@ volumes: db_data: {} prometheus_data: {} grafana_data: {} + rabbitmq_data: {} diff --git a/services/entry/cmd/app/main.go b/services/entry/cmd/app/main.go index 41f0ba5..9643e05 100644 --- a/services/entry/cmd/app/main.go +++ b/services/entry/cmd/app/main.go @@ -4,6 +4,7 @@ import ( "serverctl/pkg/api" "serverctl/pkg/infrastructure" "serverctl/pkg/infrastructure/dependencies" + "serverctl/pkg/infrastructure/queue" ) // Run main app, will bootstrap dependencies and run all external ports (http servers, queues, etc). @@ -11,6 +12,12 @@ func Run() { d := dependencies.New() d.Logger.Info("Starting serverctl") + queue.NewRabbitMQ(d.Logger, &queue.RabbitMqConfig{ + Username: "serverctl", + Password: "serverctlsecret", + Host: "rabbitmq", + Port: 5672, + }) // if development add seed data infrastructure.AddSeedData(d.Database, d.Logger) diff --git a/services/entry/go.mod b/services/entry/go.mod index 4dd1610..3703f37 100644 --- a/services/entry/go.mod +++ b/services/entry/go.mod @@ -18,7 +18,6 @@ require ( github.com/docker/go-units v0.4.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/eko/gocache v1.2.0 // indirect - github.com/georgysavva/scany v0.3.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/gin-gonic/gin v1.7.7 // indirect github.com/go-co-op/gocron v1.12.0 // indirect @@ -37,7 +36,6 @@ require ( github.com/jackc/pgproto3/v2 v2.2.0 // indirect github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect github.com/jackc/pgtype v1.10.0 // indirect - github.com/jackc/pgx v3.6.2+incompatible // indirect github.com/jackc/pgx/v4 v4.15.0 // indirect github.com/jackc/puddle v1.2.1 // indirect github.com/json-iterator/go v1.1.10 // indirect @@ -54,6 +52,7 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.18.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect + github.com/rabbitmq/amqp091-go v1.3.0 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/spf13/cast v1.3.1 // indirect diff --git a/services/entry/go.sum b/services/entry/go.sum index e94e2cf..d41fd6c 100644 --- a/services/entry/go.sum +++ b/services/entry/go.sum @@ -793,6 +793,8 @@ github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rabbitmq/amqp091-go v1.3.0 h1:A/QuHiNw7LMCJsxx9iZn5lrIz6OrhIn7Dfk5/1YatWM= +github.com/rabbitmq/amqp091-go v1.3.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= @@ -846,6 +848,8 @@ github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/y github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/services/entry/pkg/infrastructure/queue/queue.go b/services/entry/pkg/infrastructure/queue/queue.go new file mode 100644 index 0000000..df566c7 --- /dev/null +++ b/services/entry/pkg/infrastructure/queue/queue.go @@ -0,0 +1,4 @@ +package queue + +type Queue interface { +} diff --git a/services/entry/pkg/infrastructure/queue/rabbitmq.go b/services/entry/pkg/infrastructure/queue/rabbitmq.go new file mode 100644 index 0000000..1a996fc --- /dev/null +++ b/services/entry/pkg/infrastructure/queue/rabbitmq.go @@ -0,0 +1,217 @@ +package queue + +import ( + "fmt" + "go.uber.org/zap" + "time" +) +import amqp "github.com/rabbitmq/amqp091-go" + +type RabbitMQ struct { + logger *zap.Logger + config *RabbitMqConfig + conn *amqp.Connection +} + +var _ Queue = RabbitMQ{} + +type RabbitMqConfig struct { + Username string + Password string + Host string + Port int +} + +func NewRabbitMQ(logger *zap.Logger, config *RabbitMqConfig) Queue { + + conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", config.Username, config.Password, config.Host, config.Port)) + if err != nil { + logger.Panic("Could not dial rabbitmq", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port), zap.String("error", err.Error())) + } + + //sendBasic(logger, config, conn) + //receiveBasic(logger, config, conn) + sendPublishingBasic(logger, config, conn) + for i := 0; i < 200; i++ { + time.Sleep(time.Millisecond * 200) + receivePublishingBasic(logger, config, conn) + } + //sendMany(logger, config, conn) + + return &RabbitMQ{ + logger: logger, + config: config, + conn: conn, + } +} + +func sendBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) { + ch, err := conn.Channel() + if err != nil { + logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port)) + panic(err) + } + + q, err := ch.QueueDeclare("test", false, false, false, false, nil) + if err != nil { + logger.Fatal("Could not open queue", zap.String("queueName", "test")) + panic(err) + } + + body := "Hello world!" + err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)}) + if err != nil { + logger.Fatal("Could not send message", zap.String("queueName", "test")) + panic(err) + } + logger.Info("Sent message", zap.String("body", body)) + + go func() { + defer ch.Close() + counter := 0 + for { + body := fmt.Sprintf("message nr: %d", counter) + err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)}) + if err != nil { + logger.Fatal("Could not send message", zap.String("queueName", "test")) + panic(err) + } + logger.Info("Sent message", zap.String("body", body)) + counter += 1 + + time.Sleep(5 * time.Second) + } + }() +} +func receiveBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) { + ch, err := conn.Channel() + if err != nil { + logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port)) + panic(err) + } + + q, err := ch.QueueDeclare("test", false, false, false, false, nil) + if err != nil { + logger.Fatal("Could not open queue", zap.String("queueName", "test")) + panic(err) + } + + msgs, err := ch.Consume(q.Name, + "", + true, + false, false, false, nil) + if err != nil { + logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err)) + } + + go func() { + defer ch.Close() + + for d := range msgs { + logger.Info("Received msg", zap.String("body", string(d.Body))) + } + }() +} +func sendMany(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) { + ch, err := conn.Channel() + if err != nil { + logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port)) + panic(err) + } + + q, err := ch.QueueDeclare("test", false, false, false, false, nil) + if err != nil { + logger.Fatal("Could not open queue", zap.String("queueName", "test")) + panic(err) + } + + body := "Hello world!" + err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)}) + if err != nil { + logger.Fatal("Could not send message", zap.String("queueName", "test")) + panic(err) + } + logger.Info("Sent message", zap.String("body", body)) + + go func() { + defer ch.Close() + counter := 0 + for { + body := fmt.Sprintf("message nr: %d", counter) + err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)}) + if err != nil { + logger.Fatal("Could not send message", zap.String("queueName", "test")) + panic(err) + } + logger.Info("Sent message", zap.String("body", body)) + counter += 1 + } + }() +} +func sendPublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) { + ch, err := conn.Channel() + if err != nil { + logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port)) + panic(err) + } + + q, err := ch.QueueDeclare("test", true, false, false, false, nil) + if err != nil { + logger.Fatal("Could not open queue", zap.String("queueName", "test")) + panic(err) + } + + go func() { + defer ch.Close() + counter := 0 + for { + body := fmt.Sprintf("message nr: %d", counter) + err = ch.Publish("", q.Name, false, false, amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", Body: []byte(body)}) + if err != nil { + logger.Fatal("Could not send message", zap.String("queueName", "test")) + panic(err) + } + logger.Info("Sent message", zap.String("body", body)) + counter += 1 + + time.Sleep(time.Millisecond * 200) + } + }() +} +func receivePublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) { + ch, err := conn.Channel() + if err != nil { + logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port)) + panic(err) + } + + ch.Qos(1, 0, false) + + q, err := ch.QueueDeclare("test", true, false, false, false, nil) + if err != nil { + logger.Fatal("Could not open queue", zap.String("queueName", "test")) + panic(err) + } + + msgs, err := ch.Consume(q.Name, + "", + false, + false, false, false, nil) + if err != nil { + logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err)) + } + + go func() { + defer ch.Close() + + for d := range msgs { + logger.Info("Received msg", zap.String("body", string(d.Body)), zap.Int("secondsToSleep", len(d.Body))) + t := time.Duration(len(d.Body)) + time.Sleep(t * time.Second) + logger.Info("Received msg: Done") + d.Ack(false) + } + }() +}