前言:Spring Boot 整合 MQTT,实现消息的接收与发送。
maven
1 2 3 4 5 6 7 8 9 10 11 12
| <!-- MQTT -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Eclipse Paho MQTT 客户端 -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
<!-- MQTT -->
|
yml配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| mqtt:
  open: true
  host: tcp://服务器地址:1883
  clientId: mqtt_id
  username: abcde
  password: 123456
  topic: sensor/#
  timeout: 1000
  keepalive: 100
|
mqtt配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
| package com.changruyi.app.config;
import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import lombok.extern.slf4j.Slf4j;
@Slf4j @Configuration public class MqttConfig {
@Value("${mqtt.host}") private String host;
@Value("${mqtt.clientId}") private String clientId;
@Value("${mqtt.username:}") private String username;
@Value("${mqtt.password:}") private String password;
@Value("${mqtt.topic}") private String topic;
@Value("${mqtt.timeout}") private int timeout;
@Value("${mqtt.keepalive}") private int keepalive;
@Bean public MqttPahoClientFactory mqttClientFactory(MqttCallback mqttCallback) { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setCallback(mqttCallback); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[]{host}); options.setCleanSession(true); options.setConnectionTimeout(timeout); options.setKeepAliveInterval(keepalive); if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) { options.setUserName(username); options.setPassword(password.toCharArray()); } options.setAutomaticReconnect(true); factory.setConnectionOptions(options); return factory; }
@Bean public MqttCallback mqttCallback() { return new MqttCallback() { @Override public void connectionLost(Throwable cause) { log.error("MQTT 连接丢失: {}", cause.getMessage()); }
@Override public void messageArrived(String topic, MqttMessage message) throws Exception { log.info("接收到来自主题 {} 的消息: {}", topic, new String(message.getPayload())); }
@Override public void deliveryComplete(org.eclipse.paho.client.mqttv3.IMqttDeliveryToken token) { log.info("消息发送完成: {}", token.getMessage()); } }; }
@Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); }
@Bean public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttClientFactory, topic); adapter.setCompletionTimeout(5000); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; }
@Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler mqttMessageHandler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) { try { String receivedTopic = message.getHeaders().get("mqtt_receivedTopic").toString(); String payload = message.getPayload().toString(); log.info("接收到消息,主题: {}, 内容: {}", receivedTopic, payload); } catch (Exception e) { log.error("处理接收消息时出错: {}", e.getMessage(), e); } } }; }
@Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutboundHandler(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId + "_outbound", mqttClientFactory); handler.setAsync(true); handler.setDefaultTopic(topic); return handler; }
@Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
|
mqtt工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package com.changruyi.app.utils;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;
/**
 * Helper component for publishing text payloads through the injected
 * {@code mqttOutboundChannel}.
 */
@Component
public class MqttUtil {

    private static final Logger logger = LoggerFactory.getLogger(MqttUtil.class);

    /** Topic used by {@link #send(String)} when the caller does not name one. */
    private static final String DEFAULT_TOPIC = "default";

    @Autowired
    private MessageChannel mqttOutboundChannel;

    /**
     * Sends {@code payload} to the outbound channel with {@code topic} in the
     * {@code mqtt_topic} header, and logs whether the channel accepted it.
     *
     * @param topic   destination topic placed in the message header
     * @param payload message body
     */
    public void send(String topic, String payload) {
        // Nothing can be sent without a channel; report and bail out early.
        if (mqttOutboundChannel == null) {
            logger.error("MQTT 输出通道为空,无法发送消息");
            return;
        }

        boolean accepted = mqttOutboundChannel.send(
                MessageBuilder.withPayload(payload).setHeader("mqtt_topic", topic).build());

        if (!accepted) {
            logger.error("发送消息到主题: {} 失败,内容: {}", topic, payload);
            return;
        }
        logger.info("成功发送消息到主题: {},内容: {}", topic, payload);
    }

    /**
     * Sends {@code payload} to the default topic.
     *
     * @param payload message body
     */
    public void send(String payload) {
        send(DEFAULT_TOPIC, payload);
    }
}
|
定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.changruyi.app.task;
import com.alibaba.fastjson.JSON;
import com.changruyi.app.utils.MqttUtil;
import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
 * Scheduled task that publishes a sample sensor reading over MQTT.
 */
@Component
public class MqttScheduledTask {

    @Autowired
    private MqttUtil mqttUtil;

    /**
     * Publishes a fixed temperature/humidity reading to
     * {@code sensor/device1/temperature}; scheduled at a fixed rate of 10000 ms.
     */
    @Scheduled(fixedRate = 10000)
    public void sendPeriodicMessage() {
        // Serialise a map rather than an already-encoded JSON string: passing a String to
        // JSON.toJSONString yields a quoted, escaped string literal instead of a JSON object.
        // LinkedHashMap keeps the keys in insertion order.
        Map<String, Object> reading = new LinkedHashMap<>();
        reading.put("temperature", 22);
        reading.put("humidity", 60);

        mqttUtil.send("sensor/device1/temperature", JSON.toJSONString(reading));
    }
}
|