简单生产消息和消费消息
2023年1月7日
简单生产消息和消费消息
生产者发送消息
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func main() {
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d", "wxviurs", "123", "127.0.0.1", 5672)
conn, err := amqp.Dial(dsn)
if err != nil {
log.Fatalln(err)
}
defer conn.Close()
// 创建 channel
c, err := conn.Channel()
if err != nil {
log.Fatalln(err)
}
defer c.Close()
// 创建队列
queue, err := c.QueueDeclare("test", false, false, false, false, nil)
if err != nil {
log.Fatalln(err)
}
// 使用channel发布消息
err = c.Publish("", queue.Name, false, false,
amqp.Publishing{
ContentType: "text/plain", // 消息类型
Body: []byte("test0001"), // 消息内容
})
if err != nil {
log.Fatalln(err)
}
log.Println("发生消息成功")
}
我这里发送了 2 次,这里的idle
表示空闲状态,Total
有 2 个消息,队列名为test
消费者读取消息
将连接 MQ 的代码进行封装一下
package AppInit
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
var MQCone *amqp.Connection
func init() {
dsn := fmt.Sprintf("amqp://%s:%s@%s:%d", "wxviurs", "123", "127.0.0.1", 5672)
conn, err := amqp.Dial(dsn)
if err != nil {
log.Fatalln(err)
}
MQCone = conn
log.Println(MQCone.Major)
}
func GetConn() *amqp.Connection {
return MQCone
}
package main
import (
"fmt"
"log"
"rmq/AppInit"
)
func main() {
conn := AppInit.GetConn()
defer conn.Close()
c, err := conn.Channel()
if err != nil {
log.Fatalln(err)
}
defer c.Close()
// 消费者
messages, err := c.Consume("test", "c1", false, false, false, false, nil)
if err != nil {
log.Fatalln(err)
}
for msg := range messages {
fmt.Println(msg.DeliveryTag, // 唯一标识
string(msg.Body),// 内容
)
}
}
➜ rmq go run client.go
2023/01/07 14:41:13 0
1 test0001
2 test0002
此时 MQ 中是Unacked
,这个是确认机制,我们获取消息后,我们需要告诉 MQ 消息收到了,否则下次运行,Ready
又会变成 2 个,还能继续收到这个消息。
https://www.rabbitmq.com/tutorials/tutorial-two-go.html
for msg := range messages {
msg.Ack(false)
fmt.Println(msg.DeliveryTag, // 唯一标识
string(msg.Body),// 内容
)
}
简单封装 MQ 发送消息
package Lib
import (
"github.com/streadway/amqp"
"log"
"rmq/AppInit"
)
const (
QueueNewUser = "newuser" //用户注册 对应的队列名称
)
type MQ struct {
Channel *amqp.Channel
}
func NewMQ() *MQ {
// 创建channel
c, err := AppInit.GetConn().Channel()
if err != nil {
log.Println(err)
return nil
}
return &MQ{Channel: c}
}
func (mq *MQ) SendMessage(queueName string, message string) error {
// 声明队列
_, err := mq.Channel.QueueDeclare(queueName, false, false, false, false, nil)
if err != nil {
return err
}
// exchange 为空 会使用默认的交换机
return mq.Channel.Publish("", queueName, false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
}
使用gin
来模拟一个api
来用作用户注册操作
package main
import (
"github.com/gin-gonic/gin"
"log"
"rmq/Lib"
"rmq/UserReg/Models"
"strconv"
"time"
)
func main() {
router := gin.Default()
router.Handle("POST", "/user", func(context *gin.Context) {
userModel := Models.NewUserModel()
err := context.BindJSON(&userModel)
if err != nil {
context.JSON(400, gin.H{"result": "param error"})
} else {
userModel.UserId = int(time.Now().Unix()) //假设就是入库过程
if userModel.UserId > 0 { //假设入库成功
mq := Lib.NewMQ()
err := mq.SendMessage(Lib.QueueNewUser, strconv.Itoa(userModel.UserId))
if err != nil {
log.Println(err)
}
}
context.JSON(200, gin.H{"result": userModel})
}
})
router.Run(":8080")
}
对应也出现了这个newuser
的队列名称。
Loading...