springboot-整合mqtt
pom和config<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><gr
pom和config
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
MQTTConfig
package com.example.config;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Objects;
/**
* 首先mqtt 要连接的客户端 这里的客户端可以公用 客户端有连接参数
* 然后有连接通道,这里两个topic通道 给发送和接受用
* 发送和接受都有处理,俗话说就是消息拦截;
* 发送的有MessagingGateway这个注解表示对外暴露发送的方法
*/
@Configuration
@IntegrationComponentScan //@MessagingGateway 注解搜索指定的集成注解. 在我们的示例中,它将会扫描到使用@MessagingGateway注解的词条网关.
@Slf4j
@Getter
@Setter
public class MqttConfig {
public static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";
public static final String INPUT_CHANNEL = "mqttInputChannel";
public static final String SUB_TOPICS = "PSimulation,Pressure,PSimulationPump,PSimulationPressure," +
"PSimulationValve,PSimulationFlow,FSimulation,FSimulationPump,FSimulationPressure," +
"FSimulationValve,FSimulationFlow,leak,blast";
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.server}")
private String hostUrl;
@Value("${mqtt.client.id}")
private String clientId;
@Value("${mqtt.topic}")
private String defaultTopic;
@PostConstruct // 执行顺序 Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
public void init() {
log.error("username:{} password:{} hostUrl:{} clientId :{} ===={} ",
this.username, this.password, this.hostUrl, this.clientId, this.defaultTopic);
}
/**
* 创建一个可连接客户端
*
* @return
*/
@Bean
public MqttPahoClientFactory clientFactory() {
// 可以理解为mqtt 的连接队列
final MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{hostUrl});
options.setUserName(username);
options.setPassword(password.toCharArray());
// 客户端
final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(options);
return factory;
}
// 以下是发布消息
/**
* 返回一个连接通道
*
* @return
*/
@Bean(value = OUTBOUND_CHANNEL)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
//ServiceActivator注解表明当前方法用于处理MQTT消息,OUTBOUND_CHANNEL参数指定了用于接收消息信息的channel。
public MessageHandler mqttOutbound() {
// 创建工厂 客户端id 和 工厂
final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory());
// 业务质量
handler.setDefaultQos(1);
// 消息保留
handler.setDefaultRetained(false);
// 发送的默认主题
handler.setDefaultTopic(defaultTopic);
// 阻塞
handler.setAsync(false);
//当async和async-events都为true时,发出MqttMessageSentEvent事件,包含消息、主题以及由客户端库生成的消息id,客户端id和客户端实例(每次客户端连接增加)。当传送由客户端库确认,发出MqttMessageDeliveredEvent,包含消息号、客户端号和客户端实例,使传送与发送相关联。这些事件可以由任意ApplicationListener接收,或者通过事件inbound通道适配器。注意:在MqttMessageSentEvent之前可能会接收到MqttMessageDeliveredEvent。默认值为false。
handler.setAsyncEvents(false);
return handler;
}
@Service
@MessagingGateway(defaultRequestChannel = MqttConfig.OUTBOUND_CHANNEL)
//@MessagingGateway是一个用于提供消息网关代理整合的注解,参数defaultRequestChannel指定发送消息绑定的channel。 表示对外暴露发送的方法
public interface MqttGateway {
void sendToMqtt(String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
// 以下是订阅消息
/**
* MQTT消息接收处理
*/
//接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
//配置client,监听的topic
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
// 三个参数, 客户端id 工厂 监听主题列表
new MqttPahoMessageDrivenChannelAdapter(
clientId + "_inbound", clientFactory(), SUB_TOPICS.split(","));
// 发送超时 如果通道可能会阻塞,才会运用
adapter.setCompletionTimeout(3000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 通道
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public MessageHandler handler() {
return message -> {
String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
log.info("topic: {}", topic);
String[] topics = SUB_TOPICS.split(",");
for (String t : topics) {
if (t.equals(topic)) {
log.info("payload: {}", message.getPayload().toString());
}
}
};
}
}
bean
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="foo"
url="tcp://localhost:1883"
topics="bar,baz"
qos="1,2"
converter="myConverter"
client-factory="clientFactory"
send-timeout="123"
error-channel="errors"
recovery-interval="10000"
channel="out" />
1)客户端id
2)代理URL
3)适配器会接受到消息的一组以逗号分隔的主题
4)以逗号分隔的一组QoS值,可以是所有主题运用单一值,或者每一个主题一个值(列表必须同样长度)
5)MqttMessageConverter(可选项),默认DefaultPahoMessageConverter生成消息带字符串载荷(默认),携带头部包括:
mqtt_topic 接收消息主题
mqtt_duplicate 如果消息重复,值为true
mqtt_qos 业务质量
DefaultPahoMessageConverter可配置为返回载荷原始byte[]类型,通过将其声明为一个实体类,并且设定payloadAsBytes属性
6)客户端工厂
7)发送超时-如果通道可能会阻塞,才会运用(例如当前已满的边界QueueChannel)
8)错误通道–如果使用的话,ErrorMessage消息下行异常会发送至该通道,载荷为MessagingException,包含错误消息与原因
9)恢复间隔–控制在故障之后适配器会尝试重新连接的时间间隔,默认为10000ms(10s)
从4.1版本开始,编程方式改变适配器订阅的主题可以省略url,DefaultMqttPahoClientFactory属性serverURIs可以提供服务端URI,例如,这将使能连接至HA高可用簇。
从4.2.2版本开始,当适配器成功订阅至主题后,发布MqttSubscribedEvent,当连接/订阅失败时,发布MqttConnectionFailedEvent。这些事件可以由实现ApplicationListener接口的实体类获取。
新的属性recoveryInterval控制在故障之后适配器会尝试重新连接的时间间隔,默认为10000ms(10s)
在4.2.3版本之前,当适配器停止后,客户端总是会解除订阅。这是不正确的,因为如果客户端QoS大于0,我们需要保持订阅以便适配器停止时到达的消息在下一次开始时会传送。这也需要设置客户端工厂cleanSession属性为false,默认值为true。
从4.2.3版本开始,适配器不会解除订阅(默认),如果cleanSession值为false。可以重写该行为,通过设置工厂属性consumerCloseAction,可以有以下值:UNSUBSCRIBE_ALWAYS, UNSUBSCRIBE_NEVER以及UNSUBSCRIBE_CLEAN,后者(默认)会解除订阅仅当cleanSession属性值为true。
回退至4.2.3之前的行为,使用UNSUBSCRIBE_ALWAYS。
<int-mqtt:outbound-channel-adapter id="withConverter"
client-id="foo"
url="tcp://localhost:1883"
converter="myConverter"
client-factory="clientFactory"
default-qos="1"
default-retained="true"
default-topic="bar"
async="false"
async-events="false"
channel="target" />
1)客户端id
2) 代理URL
3)MqttMessageConverter(可选),默认DefaultPahoMessageConverter识别以下头部:
mqtt_topic 消息发送主题
mqtt_retained 如果消息保留的话,值为true
mqtt_qos 业务质量
4)客户端工厂
5)默认业务质量(用于未发现mqtt_qos头部的情况),如果自定义converter提供的话,不允许采用
6)保留标记符默认值(用于未发现mqtt_retained头部的情况),如果自定义converter提供的话,不允许采用
7)消息发送默认主题(用于未发现mqtt_topic头部的情况)
8)当为true,当发送消息时,调用者不会阻塞等待传送确认,默认值:false(发送阻塞直到传送确认)
9)当async和async-events都为true时,发出MqttMessageSentEvent事件,包含消息、主题以及由客户端库生成的消息id,客户端id和客户端实例(每次客户端连接增加)。当传送由客户端库确认,发出MqttMessageDeliveredEvent,包含消息号、客户端号和客户端实例,使传送与发送相关联。这些事件可以由任意ApplicationListener接收,或者通过事件inbound通道适配器。注意:在MqttMessageSentEvent之前可能会接收到MqttMessageDeliveredEvent。默认值为false。
从版本4.1开始,可以省略url,DefaultMqttPahoClientFactory属性serverURIs可以提供服务器URI。例如,这将使能连接至HA高可用簇。
Java
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)