前言:Spring Boot 整合 MQTT,实现消息的接收与发送。
maven
1 2 3 4 5 6 7 8 9 10 11 12
| <!-- MQTT --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <!-- Eclipse Paho MQTT 客户端 --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency> <!-- MQTT -->
|
yml配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
mqtt:
  open: true
  #MQTT-服务器连接地址,如果有多个,用逗号隔开
  host: tcp://服务器地址:1883
  #MQTT-连接服务器默认客户端ID
  clientId: mqtt_id
  #MQTT-用户名
  username: abcde
  #MQTT-密码
  password: 123456
  #MQTT-默认的消息推送主题,实际可在调用接口时指定
  topic: sensor/#
  #连接超时
  timeout: 1000
  #设置会话心跳时间
  keepalive: 100
|
mqtt配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
| package com.changruyi.app.config;
import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import lombok.extern.slf4j.Slf4j;
@Slf4j @Configuration public class MqttConfig {
@Value("${mqtt.host}") private String host;
@Value("${mqtt.clientId}") private String clientId;
@Value("${mqtt.username:}") private String username;
@Value("${mqtt.password:}") private String password;
@Value("${mqtt.topic}") private String topic;
@Value("${mqtt.timeout}") private int timeout;
@Value("${mqtt.keepalive}") private int keepalive;
/** * 配置 MQTT 连接工厂 */ @Bean public MqttPahoClientFactory mqttClientFactory(MqttCallback mqttCallback) { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setCallback(mqttCallback); // 注册回调 MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[]{host}); options.setCleanSession(true); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { options.setUserName(username); options.setPassword(password.toCharArray()); } options.setAutomaticReconnect(true); // 开启自动重连 factory.setConnectionOptions(options); return factory; }
/** * 配置 MQTT 连接状态回调 */ @Bean public MqttCallback mqttCallback() { return new MqttCallback() { @Override public void connectionLost(Throwable cause) { log.error("MQTT 连接丢失: {}", cause.getMessage()); }
@Override public void messageArrived(String topic, MqttMessage message) throws Exception { log.info("接收到来自主题 {} 的消息: {}", topic, new String(message.getPayload())); }
@Override public void deliveryComplete(org.eclipse.paho.client.mqttv3.IMqttDeliveryToken token) { log.info("消息发送完成: {}", token.getMessage()); } }; }
/** * 配置消息订阅通道 */ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); }
/** * 配置 MQTT 入站适配器(接收消息) */ @Bean public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttClientFactory, topic); adapter.setCompletionTimeout(5000); adapter.setQos(1); // 设置 QoS 为 1,保证至少接收到一次消息 adapter.setOutputChannel(mqttInputChannel()); return adapter; }
/** * 配置消息处理器 */ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler mqttMessageHandler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) { try { String receivedTopic = message.getHeaders().get("mqtt_receivedTopic").toString(); String payload = message.getPayload().toString(); log.info("接收到消息,主题: {}, 内容: {}", receivedTopic, payload); // 在这里可以对消息进行额外的处理 } catch (Exception e) { log.error("处理接收消息时出错: {}", e.getMessage(), e); } } }; }
/** * 配置 MQTT 出站适配器(发送消息) */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutboundHandler(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "_outbound", mqttClientFactory); handler.setAsync(true); // 异步发送消息 handler.setDefaultTopic(topic); // 设置默认主题 return handler; }
/** * 配置消息发送通道 */ @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
|
mqtt工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package com.changruyi.app.utils;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;
/**
 * Small helper for publishing text messages through the MQTT outbound channel.
 */
@Component
public class MqttUtil {

    private static final Logger logger = LoggerFactory.getLogger(MqttUtil.class);

    /** Topic used by {@link #send(String)} when the caller does not name one. */
    private static final String DEFAULT_TOPIC = "default";

    // NOTE(review): injection presumably resolves by field name when several MessageChannel
    // beans exist — confirm before renaming this field.
    @Autowired
    private MessageChannel mqttOutboundChannel;

    /**
     * Publishes a message to the default topic.
     *
     * @param payload message content
     */
    public void send(String payload) {
        send(DEFAULT_TOPIC, payload);
    }

    /**
     * Publishes a message to the given topic and logs whether the channel accepted it.
     *
     * @param topic   destination topic, passed in the {@code mqtt_topic} header
     * @param payload message content
     */
    public void send(String topic, String payload) {
        // Guard: without a channel there is nothing to send to.
        if (mqttOutboundChannel == null) {
            logger.error("MQTT 输出通道为空,无法发送消息");
            return;
        }

        MessageBuilder<String> outgoing = MessageBuilder.withPayload(payload);
        outgoing.setHeader("mqtt_topic", topic);

        boolean delivered = mqttOutboundChannel.send(outgoing.build());
        if (!delivered) {
            logger.error("发送消息到主题: {} 失败,内容: {}", topic, payload);
            return;
        }
        logger.info("成功发送消息到主题: {},内容: {}", topic, payload);
    }
}
|
定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.changruyi.app.task;
import com.alibaba.fastjson.JSON; import com.changruyi.app.utils.MqttUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;
@Component public class MqttScheduledTask {
@Autowired private MqttUtil mqttUtil;
// 每隔 10 秒发送一次消息 @Scheduled(fixedRate = 10000) public void sendPeriodicMessage() { mqttUtil.send("sensor/device1/temperature", JSON.toJSONString("{\"temperature\": 22, \"humidity\": 60}")); } }
|