简单生产消息和消费消息

wxvirus2023年1月7日
大约 2 分钟

简单生产消息和消费消息

生产者发送消息

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func main() {
	dsn := fmt.Sprintf("amqp://%s:%s@%s:%d", "wxviurs", "123", "127.0.0.1", 5672)
	conn, err := amqp.Dial(dsn)
	if err != nil {
		log.Fatalln(err)
	}
	defer conn.Close()
	// 创建 channel
	c, err := conn.Channel()
	if err != nil {
		log.Fatalln(err)
	}
	defer c.Close()

	// 创建队列
	queue, err := c.QueueDeclare("test", false, false, false, false, nil)
	if err != nil {
		log.Fatalln(err)
	}

	// 使用channel发布消息
	err = c.Publish("", queue.Name, false, false,
		amqp.Publishing{
			ContentType: "text/plain",       // 消息类型
			Body:        []byte("test0001"), // 消息内容
		})
	if err != nil {
		log.Fatalln(err)
	}
	log.Println("发生消息成功")
}

image-20230107001807167

我这里发送了 2 次,这里的idle表示空闲状态,Total有 2 个消息,队列名为test

消费者读取消息

将连接 MQ 的代码进行封装一下

package AppInit

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

var MQCone *amqp.Connection

func init() {
	dsn := fmt.Sprintf("amqp://%s:%s@%s:%d", "wxviurs", "123", "127.0.0.1", 5672)
	conn, err := amqp.Dial(dsn)
	if err != nil {
		log.Fatalln(err)
	}
	MQCone = conn
	log.Println(MQCone.Major)
}

func GetConn() *amqp.Connection {
	return MQCone
}
package main

import (
	"fmt"
	"log"
	"rmq/AppInit"
)

func main() {
	conn := AppInit.GetConn()
	defer conn.Close()

	c, err := conn.Channel()
	if err != nil {
		log.Fatalln(err)
	}
	defer c.Close()
	// 消费者
	messages, err := c.Consume("test", "c1", false, false, false, false, nil)
	if err != nil {
		log.Fatalln(err)
	}
	for msg := range messages {
		fmt.Println(msg.DeliveryTag, // 唯一标识
			string(msg.Body),// 内容
		)
	}
}

➜  rmq go run client.go
2023/01/07 14:41:13 0
1 test0001
2 test0002

image-20230107144254233

此时 MQ 中是Unacked,这个是确认机制,我们获取消息后,我们需要告诉 MQ 消息收到了,否则下次运行,Ready又会变成 2 个,还能继续收到这个消息。

https://www.rabbitmq.com/tutorials/tutorial-two-go.htmlopen in new window

for msg := range messages {
    msg.Ack(false)
    fmt.Println(msg.DeliveryTag, // 唯一标识
                string(msg.Body),// 内容
               )
}

简单封装 MQ 发送消息

package Lib

import (
	"github.com/streadway/amqp"
	"log"
	"rmq/AppInit"
)

const (
	QueueNewUser = "newuser" //用户注册 对应的队列名称
)

type MQ struct {
	Channel *amqp.Channel
}

func NewMQ() *MQ {
	// 创建channel
	c, err := AppInit.GetConn().Channel()
	if err != nil {
		log.Println(err)
		return nil
	}
	return &MQ{Channel: c}
}

func (mq *MQ) SendMessage(queueName string, message string) error {
	// 声明队列
	_, err := mq.Channel.QueueDeclare(queueName, false, false, false, false, nil)
	if err != nil {
		return err
	}
	// exchange 为空 会使用默认的交换机
	return mq.Channel.Publish("", queueName, false, false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

使用gin来模拟一个api来用作用户注册操作

package main

import (
	"github.com/gin-gonic/gin"
	"log"
	"rmq/Lib"
	"rmq/UserReg/Models"
	"strconv"
	"time"
)

func main() {
	router := gin.Default()
	router.Handle("POST", "/user", func(context *gin.Context) {
		userModel := Models.NewUserModel()
		err := context.BindJSON(&userModel)
		if err != nil {
			context.JSON(400, gin.H{"result": "param error"})
		} else {
			userModel.UserId = int(time.Now().Unix()) //假设就是入库过程
			if userModel.UserId > 0 {                 //假设入库成功
				mq := Lib.NewMQ()
				err := mq.SendMessage(Lib.QueueNewUser, strconv.Itoa(userModel.UserId))
				if err != nil {
					log.Println(err)
				}
			}
			context.JSON(200, gin.H{"result": userModel})
		}
	})
	router.Run(":8080")

}

image-20230107150914902

image-20230107150941704

对应也出现了这个newuser的队列名称。

Loading...