交换机
2023年1月7日
交换机
定义交换机:向 2 个队列同时发生消息
前面我们发送消息的时候,我们传的exchange
都是空,所以它是直接使用的默认的AMQP default
默认的交换机。
Direct Exchange
:直接模式交换机。交换机和一个队列绑定起来,并指定一个路由键,交换机会寻找匹配的路由键的绑定,并将消息路由给对应的队列。
我们把前面的发送消息的修改一下代码
func (mq *MQ) SendMessage(queueName string, message string) error {
// 声明队列
q1, err := mq.Channel.QueueDeclare(queueName, false, false, false, false, nil)
if err != nil {
return err
}
// 假设合作网站用的队列
q2, err := mq.Channel.QueueDeclare(queueName+"union", false, false, false, false, nil)
if err != nil {
return err
}
// 声明交换机
err = mq.Channel.ExchangeDeclare("UserExchange", "direct", false, false, false, false, nil)
if err != nil {
return err
}
err = mq.Channel.QueueBind(q1.Name, "userReg", "UserExchange", false, nil)
if err != nil {
return err
}
// userReg 是路由键
err = mq.Channel.QueueBind(q2.Name, "userReg", "UserExchange", false, nil)
if err != nil {
return err
}
// exchange 为空 会使用默认的交换机
return mq.Channel.Publish("UserExchange", "userReg", false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
}
然后接着运行前面的gin
接口代码
对应创建的交换机和队列都有了。而且 2 个队列发送的消息都是一样的。
可以看到UserExchange
和两个路由键绑定了。
代码优化
我们可以将写了很长一部分的声明队列和交换机的进行封装,封装之前先把上面的生成的队列和交换机都删掉。
package Lib
import (
"github.com/streadway/amqp"
"log"
"rmq/AppInit"
"strings"
)
const (
QueueNewUser = "newuser" //用户注册 对应的队列名称
QueueNewUserUnion = "new_user_union" // 合作单位用户注册 对应的队列名称
ExchangeUser = "UserExchange" // 用户模块相关的交换机
RouteKeyUserReg = "userReg" // 注册用户的路由key
)
type MQ struct {
Channel *amqp.Channel
}
func NewMQ() *MQ {
// 创建channel
c, err := AppInit.GetConn().Channel()
if err != nil {
log.Println(err)
return nil
}
return &MQ{Channel: c}
}
// DecQueueAndBind 声明队列以及病毒路由key 多个队列可以用逗号隔开
func (mq *MQ) DecQueueAndBind(queues string, key string, exchange string) error {
qList := strings.Split(queues, ",")
for _, queue := range qList {
q, err := mq.Channel.QueueDeclare(queue, false, false, false, false, nil)
if err != nil {
return err
}
err = mq.Channel.QueueBind(q.Name, key, exchange, false, nil)
if err != nil {
return err
}
}
return nil
}
func (mq *MQ) SendMessage(key string, exchange string, message string) error {
// exchange 为空 会使用默认的交换机
return mq.Channel.Publish(exchange, key, false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
}
QueueInit.go
package Lib
import "fmt"
// UserInit 初始化用户相关的队列
func UserInit() error {
mq := NewMQ()
if mq == nil {
return fmt.Errorf("mq init error")
}
defer mq.Channel.Close()
// 声明交换机
err := mq.Channel.ExchangeDeclare(ExchangeUser, "direct", false, false, false, false, nil)
if err != nil {
return fmt.Errorf("ExChange error: %s", err.Error())
}
qs := fmt.Sprintf("%s,%s", QueueNewUser, QueueNewUserUnion)
err = mq.DecQueueAndBind(qs, RouteKeyUserReg, ExchangeUser)
if err != nil {
return fmt.Errorf("queue bind error: %s", err.Error())
}
return nil
}
改写gin
服务接口
package main
import (
"github.com/gin-gonic/gin"
"log"
"rmq/Lib"
"rmq/UserReg/Models"
"strconv"
"time"
)
func main() {
router := gin.Default()
router.Handle("POST", "/user", func(context *gin.Context) {
userModel := Models.NewUserModel()
err := context.BindJSON(&userModel)
if err != nil {
context.JSON(400, gin.H{"result": "param error"})
} else {
userModel.UserId = int(time.Now().Unix()) //假设就是入库过程
if userModel.UserId > 0 { //假设入库成功
mq := Lib.NewMQ()
err := mq.SendMessage(Lib.RouteKeyUserReg, Lib.ExchangeUser, strconv.Itoa(userModel.UserId))
defer mq.Channel.Close()
if err != nil {
log.Println(err)
}
}
context.JSON(200, gin.H{"result": userModel})
}
})
c := make(chan error)
go func() {
err := router.Run(":8083")
if err != nil {
c <- err
}
}()
go func() {
// 执行mq 初始化用户队列
err := Lib.UserInit()
if err != nil {
c <- err
}
}()
err := <-c
log.Fatalln(err)
}
我们程序启动就会产生上面一样的UserExchange
交换机以及new_user_union
和newuser
队列。现在队列还暂时没有消息,我们调用一下 API 接口,无明显报错,只要稍微等 1 秒就会出现消息。
Loading...