serverctl/services/entry/pkg/infrastructure/queue/rabbitmq.go

218 lines
6.0 KiB
Go
Raw Permalink Normal View History

2022-02-23 22:39:04 +01:00
package queue
import (
"fmt"
"go.uber.org/zap"
"time"
)
import amqp "github.com/rabbitmq/amqp091-go"
type RabbitMQ struct {
logger *zap.Logger
config *RabbitMqConfig
conn *amqp.Connection
}
var _ Queue = RabbitMQ{}
type RabbitMqConfig struct {
Username string
Password string
Host string
Port int
}
func NewRabbitMQ(logger *zap.Logger, config *RabbitMqConfig) Queue {
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", config.Username, config.Password, config.Host, config.Port))
if err != nil {
logger.Panic("Could not dial rabbitmq", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port), zap.String("error", err.Error()))
}
//sendBasic(logger, config, conn)
//receiveBasic(logger, config, conn)
sendPublishingBasic(logger, config, conn)
for i := 0; i < 200; i++ {
time.Sleep(time.Millisecond * 200)
receivePublishingBasic(logger, config, conn)
}
//sendMany(logger, config, conn)
return &RabbitMQ{
logger: logger,
config: config,
conn: conn,
}
}
func sendBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
ch, err := conn.Channel()
if err != nil {
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
panic(err)
}
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
if err != nil {
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
panic(err)
}
body := "Hello world!"
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
if err != nil {
logger.Fatal("Could not send message", zap.String("queueName", "test"))
panic(err)
}
logger.Info("Sent message", zap.String("body", body))
go func() {
defer ch.Close()
counter := 0
for {
body := fmt.Sprintf("message nr: %d", counter)
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
if err != nil {
logger.Fatal("Could not send message", zap.String("queueName", "test"))
panic(err)
}
logger.Info("Sent message", zap.String("body", body))
counter += 1
time.Sleep(5 * time.Second)
}
}()
}
func receiveBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
ch, err := conn.Channel()
if err != nil {
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
panic(err)
}
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
if err != nil {
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
panic(err)
}
msgs, err := ch.Consume(q.Name,
"",
true,
false, false, false, nil)
if err != nil {
logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
}
go func() {
defer ch.Close()
for d := range msgs {
logger.Info("Received msg", zap.String("body", string(d.Body)))
}
}()
}
func sendMany(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
ch, err := conn.Channel()
if err != nil {
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
panic(err)
}
q, err := ch.QueueDeclare("test", false, false, false, false, nil)
if err != nil {
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
panic(err)
}
body := "Hello world!"
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
if err != nil {
logger.Fatal("Could not send message", zap.String("queueName", "test"))
panic(err)
}
logger.Info("Sent message", zap.String("body", body))
go func() {
defer ch.Close()
counter := 0
for {
body := fmt.Sprintf("message nr: %d", counter)
err = ch.Publish("", q.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(body)})
if err != nil {
logger.Fatal("Could not send message", zap.String("queueName", "test"))
panic(err)
}
logger.Info("Sent message", zap.String("body", body))
counter += 1
}
}()
}
func sendPublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
ch, err := conn.Channel()
if err != nil {
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
panic(err)
}
q, err := ch.QueueDeclare("test", true, false, false, false, nil)
if err != nil {
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
panic(err)
}
go func() {
defer ch.Close()
counter := 0
for {
body := fmt.Sprintf("message nr: %d", counter)
err = ch.Publish("", q.Name, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain", Body: []byte(body)})
if err != nil {
logger.Fatal("Could not send message", zap.String("queueName", "test"))
panic(err)
}
logger.Info("Sent message", zap.String("body", body))
counter += 1
time.Sleep(time.Millisecond * 200)
}
}()
}
func receivePublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
ch, err := conn.Channel()
if err != nil {
logger.Fatal("Could not open channel", zap.String("username", config.Username), zap.String("Host", config.Host), zap.Int("port", config.Port))
panic(err)
}
ch.Qos(1, 0, false)
q, err := ch.QueueDeclare("test", true, false, false, false, nil)
if err != nil {
logger.Fatal("Could not open queue", zap.String("queueName", "test"))
panic(err)
}
msgs, err := ch.Consume(q.Name,
"",
false,
false, false, false, nil)
if err != nil {
logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
}
go func() {
defer ch.Close()
for d := range msgs {
logger.Info("Received msg", zap.String("body", string(d.Body)), zap.Int("secondsToSleep", len(d.Body)))
t := time.Duration(len(d.Body))
time.Sleep(t * time.Second)
logger.Info("Received msg: Done")
d.Ack(false)
}
}()
}