package queue

import (
	"fmt"
	"net/url"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
	"go.uber.org/zap"
)

// RabbitMQ is a Queue implementation that holds an open AMQP connection
// together with the logger and configuration it was created with.
type RabbitMQ struct {
	logger *zap.Logger
	config *RabbitMqConfig
	conn   *amqp.Connection
}

var _ Queue = RabbitMQ{}

// RabbitMqConfig carries the credentials and address used to dial the broker.
type RabbitMqConfig struct {
	Username string
	Password string
	Host     string
	Port     int
}

// NewRabbitMQ dials the broker described by config and returns a Queue that
// wraps the resulting connection. A dial failure is reported through
// logger.Panic.
//
// NOTE(review): before returning, this constructor starts the persistent demo
// publisher and then 200 demo consumers, sleeping 200ms ahead of each one, so
// the call blocks for roughly 40 seconds. This looks like leftover experiment
// code; confirm whether it should live in the constructor at all.
func NewRabbitMQ(logger *zap.Logger, config *RabbitMqConfig) Queue {
	const (
		consumerCount = 200
		consumerDelay = 200 * time.Millisecond
	)

	conn, err := amqp.Dial(rabbitMQURL(config))
	if err != nil {
		logger.Panic("Could not dial rabbitmq",
			zap.String("username", config.Username),
			zap.String("Host", config.Host),
			zap.Int("port", config.Port),
			zap.Error(err))
	}

	//sendBasic(logger, config, conn)
	//receiveBasic(logger, config, conn)
	sendPublishingBasic(logger, config, conn)
	for i := 0; i < consumerCount; i++ {
		time.Sleep(consumerDelay)
		receivePublishingBasic(logger, config, conn)
	}
	//sendMany(logger, config, conn)

	return &RabbitMQ{
		logger: logger,
		config: config,
		conn:   conn,
	}
}

// rabbitMQURL builds the amqp:// connection URL for config. The credentials go
// through net/url so that reserved characters in the username or password
// (for example '@', '/' or ':') are percent-encoded instead of corrupting the
// URL.
func rabbitMQURL(config *RabbitMqConfig) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(config.Username, config.Password),
		Host:   fmt.Sprintf("%s:%d", config.Host, config.Port),
		Path:   "/",
	}
	return u.String()
}

// mustOpenChannel opens a channel on conn and reports a failure through
// logger.Fatal, which does not return.
func mustOpenChannel(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) *amqp.Channel {
	ch, err := conn.Channel()
	if err != nil {
		logger.Fatal("Could not open channel",
			zap.String("username", config.Username),
			zap.String("Host", config.Host),
			zap.Int("port", config.Port),
			zap.Error(err))
	}
	return ch
}

// mustDeclareQueue declares the queue called name on ch (non-auto-delete,
// non-exclusive, waiting for the broker's reply) and reports a failure through
// logger.Fatal, which does not return.
func mustDeclareQueue(logger *zap.Logger, ch *amqp.Channel, name string, durable bool) amqp.Queue {
	q, err := ch.QueueDeclare(name, durable, false, false, false, nil)
	if err != nil {
		logger.Fatal("Could not open queue", zap.String("queueName", name), zap.Error(err))
	}
	return q
}

// publishText publishes body as a text/plain message to queueName via the
// default exchange and logs the outcome. A deliveryMode of 0 leaves the
// delivery mode unset, as the non-persistent demos do. A publish failure is
// reported through logger.Fatal.
func publishText(logger *zap.Logger, ch *amqp.Channel, queueName, body string, deliveryMode uint8) {
	msg := amqp.Publishing{
		DeliveryMode: deliveryMode,
		ContentType:  "text/plain",
		Body:         []byte(body),
	}
	// The error stays local to this call so that concurrently running
	// publishers never share an error variable.
	if err := ch.Publish("", queueName, false, false, msg); err != nil {
		logger.Fatal("Could not send message", zap.String("queueName", queueName), zap.Error(err))
	}
	logger.Info("Sent message", zap.String("body", body))
}

// publishCounterLoop publishes "message nr: N" for N = 0, 1, 2, ... to
// queueName, pausing for interval between messages when interval is positive.
// The loop has no exit condition; ch is closed only if the goroutine unwinds.
func publishCounterLoop(logger *zap.Logger, ch *amqp.Channel, queueName string, deliveryMode uint8, interval time.Duration) {
	defer ch.Close()
	for counter := 0; ; counter++ {
		publishText(logger, ch, queueName, fmt.Sprintf("message nr: %d", counter), deliveryMode)
		if interval > 0 {
			time.Sleep(interval)
		}
	}
}

// sendBasic declares the non-durable "test" queue, publishes one
// "Hello world!" message synchronously and then starts a goroutine that
// publishes a numbered message every five seconds.
func sendBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
	ch := mustOpenChannel(logger, config, conn)
	q := mustDeclareQueue(logger, ch, "test", false)

	publishText(logger, ch, q.Name, "Hello world!", 0)

	go publishCounterLoop(logger, ch, q.Name, 0, 5*time.Second)
}

// receiveBasic declares the non-durable "test" queue and starts a goroutine
// that logs every delivery. The consumer is registered with auto-ack, so
// deliveries are never acknowledged explicitly.
func receiveBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
	ch := mustOpenChannel(logger, config, conn)
	q := mustDeclareQueue(logger, ch, "test", false)

	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
	}

	go func() {
		defer ch.Close()
		for d := range msgs {
			logger.Info("Received msg", zap.String("body", string(d.Body)))
		}
	}()
}

// sendMany behaves like sendBasic but its background goroutine publishes
// numbered messages back to back, with no pause between them.
func sendMany(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
	ch := mustOpenChannel(logger, config, conn)
	q := mustDeclareQueue(logger, ch, "test", false)

	publishText(logger, ch, q.Name, "Hello world!", 0)

	go publishCounterLoop(logger, ch, q.Name, 0, 0)
}

// sendPublishingBasic declares the durable "test" queue and starts a goroutine
// that publishes a persistent numbered message every 200ms.
func sendPublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
	ch := mustOpenChannel(logger, config, conn)
	q := mustDeclareQueue(logger, ch, "test", true)

	go publishCounterLoop(logger, ch, q.Name, amqp.Persistent, 200*time.Millisecond)
}

// receivePublishingBasic declares the durable "test" queue and starts a
// manual-ack consumer with a prefetch count of one. Each delivery is "worked
// on" by sleeping one second per body byte and is acknowledged afterwards.
func receivePublishingBasic(logger *zap.Logger, config *RabbitMqConfig, conn *amqp.Connection) {
	ch := mustOpenChannel(logger, config, conn)

	// Prefetch of one: the broker should hand this consumer a single
	// unacknowledged message at a time. A failure is logged but does not stop
	// the consumer from being set up.
	if err := ch.Qos(1, 0, false); err != nil {
		logger.Error("Could not set channel QoS", zap.Error(err))
	}

	q := mustDeclareQueue(logger, ch, "test", true)

	msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
	if err != nil {
		logger.Panic("failed to register consumer", zap.String("queueName", q.Name), zap.Error(err))
	}

	go func() {
		defer ch.Close()
		for d := range msgs {
			logger.Info("Received msg",
				zap.String("body", string(d.Body)),
				zap.Int("secondsToSleep", len(d.Body)))
			// Simulated work: one second per byte of the message body.
			time.Sleep(time.Duration(len(d.Body)) * time.Second)
			logger.Info("Received msg: Done")
			if err := d.Ack(false); err != nil {
				logger.Error("Could not ack message", zap.String("queueName", q.Name), zap.Error(err))
			}
		}
	}()
}