交换机

wxvirus2023年1月7日
大约 3 分钟

交换机

定义交换机:向 2 个队列同时发生消息

前面我们发送消息的时候,我们传的exchange都是空,所以它是直接使用的默认的AMQP default默认的交换机。

image-20230107162507874

Direct Exchange:直接模式交换机。交换机和一个队列绑定起来,并指定一个路由键,交换机会寻找匹配的路由键的绑定,并将消息路由给对应的队列。

我们把前面的发送消息的修改一下代码

func (mq *MQ) SendMessage(queueName string, message string) error {
	// 声明队列
	q1, err := mq.Channel.QueueDeclare(queueName, false, false, false, false, nil)
	if err != nil {
		return err
	}
	// 假设合作网站用的队列
	q2, err := mq.Channel.QueueDeclare(queueName+"union", false, false, false, false, nil)
	if err != nil {
		return err
	}
	// 声明交换机
	err = mq.Channel.ExchangeDeclare("UserExchange", "direct", false, false, false, false, nil)
	if err != nil {
		return err
	}
	err = mq.Channel.QueueBind(q1.Name, "userReg", "UserExchange", false, nil)
	if err != nil {
		return err
	}
    // userReg 是路由键
	err = mq.Channel.QueueBind(q2.Name, "userReg", "UserExchange", false, nil)
	if err != nil {
		return err
	}
	// exchange 为空 会使用默认的交换机
	return mq.Channel.Publish("UserExchange", "userReg", false, false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

然后接着运行前面的gin接口代码

image-20230107163524334

image-20230107163535550

对应创建的交换机和队列都有了。而且 2 个队列发送的消息都是一样的。

image-20230107163827355

可以看到UserExchange和两个路由键绑定了。

代码优化

我们可以将写了很长一部分的声明队列和交换机的进行封装,封装之前先把上面的生成的队列和交换机都删掉。

package Lib

import (
	"github.com/streadway/amqp"
	"log"
	"rmq/AppInit"
	"strings"
)

const (
	QueueNewUser = "newuser" //用户注册 对应的队列名称
	QueueNewUserUnion = "new_user_union" // 合作单位用户注册 对应的队列名称
	ExchangeUser = "UserExchange" // 用户模块相关的交换机
	RouteKeyUserReg = "userReg" // 注册用户的路由key
)

type MQ struct {
	Channel *amqp.Channel
}

func NewMQ() *MQ {
	// 创建channel
	c, err := AppInit.GetConn().Channel()
	if err != nil {
		log.Println(err)
		return nil
	}
	return &MQ{Channel: c}
}

// DecQueueAndBind 声明队列以及病毒路由key 多个队列可以用逗号隔开
func (mq *MQ) DecQueueAndBind(queues string, key string, exchange string) error {
	qList := strings.Split(queues, ",")
	for _, queue := range qList {
		q, err := mq.Channel.QueueDeclare(queue, false, false, false, false, nil)
		if err != nil {
			return err
		}
		err = mq.Channel.QueueBind(q.Name, key, exchange, false, nil)
		if err != nil {
			return err
		}
	}
	return nil
}

func (mq *MQ) SendMessage(key string, exchange string, message string) error {

	// exchange 为空 会使用默认的交换机
	return mq.Channel.Publish(exchange, key, false, false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

QueueInit.go

package Lib

import "fmt"

// UserInit 初始化用户相关的队列
func UserInit() error {
	mq := NewMQ()
	if mq == nil {
		return fmt.Errorf("mq init error")
	}
	defer mq.Channel.Close()

	// 声明交换机
	err := mq.Channel.ExchangeDeclare(ExchangeUser, "direct", false, false, false, false, nil)
	if err != nil {
		return fmt.Errorf("ExChange error: %s", err.Error())
	}
	qs := fmt.Sprintf("%s,%s", QueueNewUser, QueueNewUserUnion)
	err = mq.DecQueueAndBind(qs, RouteKeyUserReg, ExchangeUser)
	if err != nil {
		return fmt.Errorf("queue bind error: %s", err.Error())
	}
	return nil
}

改写gin服务接口

package main

import (
	"github.com/gin-gonic/gin"
	"log"
	"rmq/Lib"
	"rmq/UserReg/Models"
	"strconv"
	"time"
)

func main() {
	router := gin.Default()
	router.Handle("POST", "/user", func(context *gin.Context) {
		userModel := Models.NewUserModel()
		err := context.BindJSON(&userModel)
		if err != nil {
			context.JSON(400, gin.H{"result": "param error"})
		} else {
			userModel.UserId = int(time.Now().Unix()) //假设就是入库过程
			if userModel.UserId > 0 {                 //假设入库成功
				mq := Lib.NewMQ()
				err := mq.SendMessage(Lib.RouteKeyUserReg, Lib.ExchangeUser, strconv.Itoa(userModel.UserId))
				defer mq.Channel.Close()
				if err != nil {
					log.Println(err)
				}
			}
			context.JSON(200, gin.H{"result": userModel})
		}
	})
	c := make(chan error)
	go func() {
		err := router.Run(":8083")
		if err != nil {
			c <- err
		}
	}()

	go func() {
		// 执行mq 初始化用户队列
		err := Lib.UserInit()
		if err != nil {
			c <- err
		}
	}()

	err := <-c
	log.Fatalln(err)
}

我们程序启动就会产生上面一样的UserExchange交换机以及new_user_unionnewuser队列。现在队列还暂时没有消息,我们调用一下 API 接口,无明显报错,只要稍微等 1 秒就会出现消息。

Loading...