java对接 mqtt

wxvirus2023年6月12日
大约 1 分钟

依赖

<dependency>
	<groupId>org.eclipse.paho</groupId>
	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
	<version>1.2.2</version>
</dependency>

配置

emq:
	mqttServerUrl: tcp://xxx.xxx.xxx:1883

config包下定义配置emq配置类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties("emq")
@Data
public class EmqConfig {
	private String mqttServerUrl;
}

编写客户端连接类和发送消息

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Autowired;
import org.eclipse.paho.client.mqttv3.MqttClient;

import java.util.UUID;

@Component
@Slf4j
public class EmqClient {
	@Autowired
	private EmqConfig emqConfig;

	private MqttClient mqttClient;

	/**
	 * 链接 emq
	 */
	public void connect() {
		// 服务器 url
		// clientid
		try {
			mqttClient = new MqttClient(emqConfig.getMqttServerUrl(), "monitor." + UUID.randomUUID());
			mqttClient.connect();
		} catch(MqttException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 发送消息
	 * @param topic 主题
	 * @param msg 消息
	 */
	public void publish(String topic, String msg) {
		MqttMessage mqttMessage = new MqttMessage(msg.getBytes());
		try {
			mqttClient.getTopic(topic).publish(mqttMessage);
		} catch (MqttException e) {
			e.printStackTrace();
			log.error('发送消息异常')
		}
	}
}

订阅消息

/**
 * 订阅消息
 */
public void subscribe(String topic) throws MqttException {
	mqttClient.subscribe(topic);
}

回调类,来处理接收数据的消息

新建一个类:EmqMsgProcess

@Component
@Slf4j
public class EmqMsgProcess implements MqttCallback {

	/**
	 * 链接丢失的时候会触发
	 */
	@Override
	public void connectionLost(Throwable throwable) {}

	/**
	 * 接收到消息的时候触发
	 */
	@Override
	public void messageArrived(String topic, MqttMessage msg) throws Exception {
		// 接收到消息
		String payload = new String(msg.getPayload());
		log.info("接收到消息: {}", payload);
	}

	// 传送完成的时候触发
	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {}
}

我们需要在连接之前,把回调函数注册进来


@Autowired
private EmqMsgProcess emqMsgProcess;
public void connect() {
		// 服务器 url
		// clientid
		try {
			mqttClient = new MqttClient(emqConfig.getMqttServerUrl(), "monitor." + UUID.randomUUID());
			mqttClient.setCallback(emqMsgProcess);
			mqttClient.connect();
		} catch(MqttException e) {
			e.printStackTrace();
		}
	}

编写监听类

新建一个包core,新建一个类Monitor

@Component
@Slf4j
public class Monitor {

	@Autowired
	private EmqClient client;

	// 保证程序启动后必然会调用
	@PostConstruct
	public void init() {
		log.info("初始化方法,订阅主题")
		// 调用连接和订阅方法
		client.connect()
		try {
			client.subscribe("测试 topic")
		} catch(MqttException e) {
			e.printStackTrace();
		}
	}
}

测试应用程序启动是否会触发,启动后就处于一个监听的状态


调整为共享订阅

client.subscribe("$queue/" + "测试 topic")
Loading...