前言:SpringBoot整合mqtt,实现消息接收,发送。

Maven 依赖(pom.xml)

<!-- MQTT support: Spring Integration MQTT adapters. No version is declared here;
     presumably it is supplied by the project's dependency management (confirm). -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Eclipse Paho MQTT client, pinned to 1.2.5 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!-- End of MQTT dependencies -->

yml 配置(application.yml)

# MQTT settings. Every key must be nested under `mqtt` so that placeholders
# such as ${mqtt.host} and ${mqtt.topic} resolve.
mqtt:
  open: true
  # MQTT broker URI(s); separate multiple URIs with commas
  host: tcp://服务器地址:1883
  # Default client ID used when connecting to the broker
  clientId: mqtt_id
  # MQTT username
  username: abcde
  # MQTT password
  password: 123456
  # Default topic; the publish topic can be given explicitly per call
  topic: sensor/#
  # Connection timeout (handed to Paho's MqttConnectOptions.setConnectionTimeout, which takes seconds)
  timeout: 1000
  # Session keep-alive interval (handed to Paho's MqttConnectOptions.setKeepAliveInterval, which takes seconds)
  keepalive: 100

MQTT 配置类(MqttConfig.java)

package com.changruyi.app.config;

import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Configuration
public class MqttConfig {

@Value("${mqtt.host}")
private String host;

@Value("${mqtt.clientId}")
private String clientId;

@Value("${mqtt.username:}")
private String username;

@Value("${mqtt.password:}")
private String password;

@Value("${mqtt.topic}")
private String topic;

@Value("${mqtt.timeout}")
private int timeout;

@Value("${mqtt.keepalive}")
private int keepalive;

/**
* 配置 MQTT 连接工厂
*/
@Bean
public MqttPahoClientFactory mqttClientFactory(MqttCallback mqttCallback) {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setCallback(mqttCallback); // 注册回调
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{host});
options.setCleanSession(true);
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
options.setUserName(username);
options.setPassword(password.toCharArray());
}
options.setAutomaticReconnect(true); // 开启自动重连
factory.setConnectionOptions(options);
return factory;
}

/**
* 配置 MQTT 连接状态回调
*/
@Bean
public MqttCallback mqttCallback() {
return new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT 连接丢失: {}", cause.getMessage());
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("接收到来自主题 {} 的消息: {}", topic, new String(message.getPayload()));
}

@Override
public void deliveryComplete(org.eclipse.paho.client.mqttv3.IMqttDeliveryToken token) {
log.info("消息发送完成: {}", token.getMessage());
}
};
}

/**
* 配置消息订阅通道
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}

/**
* 配置 MQTT 入站适配器(接收消息)
*/
@Bean
public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory mqttClientFactory) {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttClientFactory, topic);
adapter.setCompletionTimeout(5000);
adapter.setQos(1); // 设置 QoS 为 1,保证至少接收到一次消息
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}

/**
* 配置消息处理器
*/
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler mqttMessageHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) {
try {
String receivedTopic = message.getHeaders().get("mqtt_receivedTopic").toString();
String payload = message.getPayload().toString();
log.info("接收到消息,主题: {}, 内容: {}", receivedTopic, payload);
// 在这里可以对消息进行额外的处理
} catch (Exception e) {
log.error("处理接收消息时出错: {}", e.getMessage(), e);
}
}
};
}

/**
* 配置 MQTT 出站适配器(发送消息)
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutboundHandler(MqttPahoClientFactory mqttClientFactory) {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "_outbound", mqttClientFactory);
handler.setAsync(true); // 异步发送消息
handler.setDefaultTopic(topic); // 设置默认主题
return handler;
}

/**
* 配置消息发送通道
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}

MQTT 工具类(MqttUtil.java)

package com.changruyi.app.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * Helper for publishing string payloads through the {@code mqttOutboundChannel} bean.
 */
@Component
public class MqttUtil {

    private static final Logger logger = LoggerFactory.getLogger(MqttUtil.class);

    /** Topic used by {@link #send(String)} when the caller does not name one. */
    private static final String DEFAULT_TOPIC = "default";

    @Autowired
    private MessageChannel mqttOutboundChannel;

    /**
     * Publishes a payload to the default topic.
     *
     * @param payload message content
     */
    public void send(String payload) {
        send(DEFAULT_TOPIC, payload);
    }

    /**
     * Publishes a payload to the given topic by sending a message carrying the
     * {@code mqtt_topic} header to the outbound channel, then logs whether the
     * channel reported the send as successful.
     *
     * @param topic   target topic
     * @param payload message content
     */
    public void send(String topic, String payload) {
        if (mqttOutboundChannel == null) {
            logger.error("MQTT 输出通道为空,无法发送消息");
            return;
        }

        boolean accepted = mqttOutboundChannel.send(
                MessageBuilder.withPayload(payload).setHeader("mqtt_topic", topic).build());

        if (!accepted) {
            logger.error("发送消息到主题: {} 失败,内容: {}", topic, payload);
            return;
        }
        logger.info("成功发送消息到主题: {},内容: {}", topic, payload);
    }
}

定时任务(MqttScheduledTask.java)

package com.changruyi.app.task;

import com.alibaba.fastjson.JSON;
import com.changruyi.app.utils.MqttUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Scheduled task that publishes a sample sensor reading over MQTT.
 */
@Component
public class MqttScheduledTask {

    /** Topic the sample reading is published to. */
    private static final String TEMPERATURE_TOPIC = "sensor/device1/temperature";

    /**
     * Sample reading, already a JSON object. It is sent as-is: running a JSON string
     * through a JSON serializer again would wrap it in quotes and escape it, so
     * subscribers would receive a string literal instead of an object.
     */
    private static final String SAMPLE_READING = "{\"temperature\": 22, \"humidity\": 60}";

    @Autowired
    private MqttUtil mqttUtil;

    /**
     * Publishes the sample reading at a fixed rate of 10000 ms.
     */
    @Scheduled(fixedRate = 10000)
    public void sendPeriodicMessage() {
        mqttUtil.send(TEMPERATURE_TOPIC, SAMPLE_READING);
    }
}