    KCloud-Platform-IoT
    2025-05-05

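    IoT: Best Practices for Integrating with MQTT

    Hello, old friend! I'm Laokou, and welcome to the Laokou Cloud Platform!

    Let's learn how to integrate with MQTT together.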

    # Installing EMQX

    Start EMQX with a single docker-compose command.

    If you have not installed Docker yet, see the following two articles:

    # Installing Docker on Ubuntu 20.04

    # Installing Docker 23.0.6 on CentOS 7

    # Uses emqx 5.4.1. Follow this tutorial as written and do not change the version number!
    services:
        emqx:
          image: emqx/emqx:5.4.1
          container_name: emqx
          # Keep the container running without a daemon process
          tty: true
          restart: always
          privileged: true
          ports:
            - "1883:1883"
            - "8083:8083"
            - "8883:8883"
            - "18083:18083"
          environment:
            - TZ=Asia/Shanghai
          volumes:
            # Mount the data directory
            - ./emqx/data:/opt/emqx/data
            # Mount the log directory
            - ./emqx/log:/opt/emqx/log
          networks:
            - iot_network
    networks:
      iot_network:
        driver: bridge
    

    Open http://127.0.0.1:18083 and set the password.
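
    Before wiring up a client, it can help to confirm that the ports published in the compose file accept connections. The following is a minimal sketch under stated assumptions: `PortProbe` and `isReachable` are hypothetical names invented for this example and are not part of the sample project, and a successful TCP connect only shows that something is listening, not that it speaks MQTT.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of the sample project): checks whether a TCP port accepts connections.
final class PortProbe {

    private PortProbe() {
    }

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        }
        catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports published by the docker-compose file above.
        int[] ports = { 1883, 8083, 8883, 18083 };
        for (int port : ports) {
            System.out.println(port + " reachable: " + isReachable("127.0.0.1", port, 1000));
        }
    }

}
```

    If every port reports `false`, the container is probably not running yet or the port mappings were changed.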

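    # EMQX MQTT [excerpted from the official documentation]

    EMQX official documentation

    MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. MQTT is now widely used across industries such as automotive, manufacturing, telecommunications, and oil and gas.

    EMQX is fully compatible with MQTT 5.0 and 3.x. This section introduces the basic configuration items for MQTT-related features, including basic MQTT settings, subscription settings, session settings, forced shutdown settings, and forced garbage collection settings.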

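    # Client Integration

    This article integrates with EMQX using three client libraries.

    | Dimension | Paho | Hivemq-MQTT-Client | Vert.x MQTT Client |
    |---|---|---|---|
    | Protocol support | MQTT 3.1.1 (5.0 experimental) | Full MQTT 5.0 support | MQTT 5.0 (newer versions) |
    | Performance | Medium (synchronous mode) | High (asynchronous, non-blocking) | Very high (reactive architecture) |
    | Dependency complexity | Low | Medium (Netty only) | High (requires the Vert.x ecosystem) |
    | Community resources | Rich | Fewer | Moderate |
    | Typical use cases | Traditional IoT, cross-language projects | Enterprise MQTT 5.0, high throughput | Reactive systems, high-concurrency microservices |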

    # Paho [not recommended: connections are unstable]

    Paho source code

    # Adding dependencies
    <dependencies>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
            <version>1.2.5</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
    </dependencies>
    
    # Project integration

    PahoProperties

    /**
     * @author laokou
     */
    @Data
    public class PahoProperties {
    
        private boolean auth = true;
    
        private String username = "emqx";
    
        private String password = "laokou123";
    
        private String host = "127.0.0.1";
    
        private int port = 1883;
    
        private String clientId;
    
        private int subscribeQos = 1;
    
        private int publishQos = 0;
    
        private int willQos = 1;
    
        private int connectionTimeout = 60;
    
        private boolean manualAcks = false;
    
        // @formatter:off
        /**
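         * Controls whether a new session is created (true = new session, false = reuse the previous session).
         * clearStart=true => the broker discards all session state as soon as the connection is closed.
         * clearStart=false => the broker keeps the session state after the connection is closed and reuses it on reconnect.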
         * <a href="https://github.com/hivemq/hivemq-mqtt-client/issues/627">...</a>
         */
        // @formatter:on
        private boolean clearStart = false;
    
        private int receiveMaximum = 10000;
    
        private int maximumPacketSize = 10000;
    
        // @formatter:off
        /**
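         * The session is kept for one day by default.
         * The maximum value, 4294967295L, means the session never expires (unit: seconds).
         * Defines how long the session is kept after the client disconnects (only effective when Clean Session = false).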
         */
        private long sessionExpiryInterval = 86400L;
        // @formatter:on
    
        /**
         * A keep-alive (heartbeat) packet is sent every 60 seconds.
         */
        private int keepAliveInterval = 60;
    
        private boolean automaticReconnect = true;
    
        private Set<String> topics = new HashSet<>(0);
    
    }
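
    The session expiry comment in `PahoProperties` can be made concrete with a small sketch. It assumes the MQTT 5.0 rule that the Session Expiry Interval is an unsigned 32-bit number of seconds, with the maximum value meaning the session never expires. `SessionExpiry` and its methods are hypothetical names used only for illustration; they are not part of Paho or the sample project.

```java
import java.time.Duration;

// Hypothetical helper: converts a Duration into a valid MQTT 5.0 session expiry value.
final class SessionExpiry {

    // Largest unsigned 32-bit value (4294967295); MQTT 5.0 treats it as "never expires".
    static final long NEVER_EXPIRES = 0xFFFF_FFFFL;

    private SessionExpiry() {
    }

    // Returns the duration in whole seconds, rejecting values outside the unsigned 32-bit range.
    static long toSeconds(Duration duration) {
        long seconds = duration.getSeconds();
        if (seconds < 0 || seconds > NEVER_EXPIRES) {
            throw new IllegalArgumentException("Session expiry must be between 0 and " + NEVER_EXPIRES + " seconds");
        }
        return seconds;
    }

    static boolean neverExpires(long seconds) {
        return seconds == NEVER_EXPIRES;
    }

    public static void main(String[] args) {
        // One day, the default used by PahoProperties.sessionExpiryInterval.
        System.out.println(toSeconds(Duration.ofDays(1))); // prints 86400
    }

}
```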
    

    PahoMqttClientMessageCallbackV5

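    import java.util.List;
    
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.mqttv5.client.IMqttToken;
    import org.eclipse.paho.mqttv5.client.MqttCallback;
    import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
    import org.eclipse.paho.mqttv5.common.MqttException;
    import org.eclipse.paho.mqttv5.common.MqttMessage;
    import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
    // MessageHandler comes from the sample project; its package is not shown in this article.
    
    /**
     * Callback for the Paho MQTT 5 client: logs connection events and dispatches messages to handlers.
     *
     * @author laokou
     */
    @Slf4j
    @RequiredArgsConstructor
    public class PahoMqttClientMessageCallbackV5 implements MqttCallback {
    
        private final List<MessageHandler> messageHandlers;
    
        @Override
        public void disconnected(MqttDisconnectResponse disconnectResponse) {
            log.error("【Paho-V5】 => MQTT connection closed");
        }
    
        @Override
        public void mqttErrorOccurred(MqttException ex) {
            log.error("【Paho-V5】 => MQTT error, message: {}", ex.getMessage());
        }
    
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            // Hand the payload to every handler that subscribes to this topic
            for (MessageHandler messageHandler : messageHandlers) {
                if (messageHandler.isSubscribe(topic)) {
                    log.info("【Paho-V5】 => MQTT message received, topic: {}", topic);
                    messageHandler.handle(new org.laokou.sample.mqtt.handler.MqttMessage(message.getPayload(), topic));
                }
            }
        }
    
        @Override
        public void deliveryComplete(IMqttToken token) {
            log.info("【Paho-V5】 => MQTT message delivered, message ID: {}", token.getMessageId());
        }
    
        @Override
        public void connectComplete(boolean reconnect, String uri) {
            if (reconnect) {
                log.info("【Paho-V5】 => MQTT reconnected, URI: {}", uri);
            }
            else {
                log.info("【Paho-V5】 => MQTT connected, URI: {}", uri);
            }
        }
    
        @Override
        public void authPacketArrived(int reasonCode, MqttProperties properties) {
            log.info("【Paho-V5】 => Auth packet received, reason code: {}", reasonCode);
        }
    
    }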
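
    The callback relies on `messageHandler.isSubscribe(topic)`, whose implementation is not shown in this article. As a rough illustration of what such a check could involve, the sketch below matches a topic against a filter using the standard MQTT wildcard rules (`+` for exactly one level, `#` for the remaining levels). `TopicFilterMatcher` is a hypothetical name, and this is not the project's actual implementation.

```java
// Hypothetical helper: matches an MQTT topic name against a topic filter with '+' and '#' wildcards.
final class TopicFilterMatcher {

    private TopicFilterMatcher() {
    }

    static boolean matches(String filter, String topic) {
        // Keep empty levels (limit -1) so that leading and trailing '/' are significant.
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        // A filter that starts with a wildcard must not match topics beginning with '$'.
        if (topic.startsWith("$") && (filterLevels[0].equals("#") || filterLevels[0].equals("+"))) {
            return false;
        }
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // '#' is only valid as the last level; it matches the parent and everything below it.
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false;
            }
            // '+' matches any single level; otherwise the levels must be identical.
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("/test-topic-3/#", "/test-topic-3/789")); // prints true
        System.out.println(matches("/test-topic-3/#", "/test-topic-4/000")); // prints false
    }

}
```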
    

    PahoV5MqttClientTest

    /**
     * @author laokou
     */
    @SpringBootTest
    @RequiredArgsConstructor
    @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
    class PahoV5MqttClientTest {
    
        private final List<MessageHandler> messageHandlers;
    
        @Test
        void testMqttClient() throws InterruptedException {
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);
    
            PahoProperties pahoProperties = new PahoProperties();
            pahoProperties.setClientId("test-client-3");
            pahoProperties.setTopics(Set.of("/test-topic-3/#"));
            PahoMqttClientV5 pahoMqttClientV5 = new PahoMqttClientV5(pahoProperties, messageHandlers, scheduledExecutorService);
            pahoMqttClientV5.open();
            Thread.sleep(1000);
            pahoMqttClientV5.publish("/test-topic-3/789", "Hello World789".getBytes());
        }
    
    }
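
    The test above waits a fixed second with `Thread.sleep(1000)` before publishing. One alternative, sketched here under assumptions, is to block until a connect callback such as `connectComplete` has fired. `ConnectionGate` is a hypothetical helper that is not part of the sample project, and how it would be wired into `PahoMqttClientV5` is left open because that class is not shown in this article.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: lets a test wait for a connect callback instead of sleeping for a fixed time.
final class ConnectionGate {

    private final CountDownLatch connected = new CountDownLatch(1);

    // Intended to be called from a connect callback, for example connectComplete(...).
    void markConnected() {
        connected.countDown();
    }

    // Blocks until markConnected() has been called or the timeout elapses; returns false on timeout.
    boolean awaitConnected(long timeout, TimeUnit unit) throws InterruptedException {
        return connected.await(timeout, unit);
    }

}
```

    A test would call `awaitConnected(5, TimeUnit.SECONDS)` after `open()` and publish only if it returns `true`.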
    

    PahoMqttClientMessageCallbackV3

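    import java.util.List;
    
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    // MessageHandler comes from the sample project; its package is not shown in this article.
    
    /**
     * Callback for the Paho MQTT 3 client: logs connection events and dispatches messages to handlers.
     *
     * @author laokou
     */
    @Slf4j
    @RequiredArgsConstructor
    public class PahoMqttClientMessageCallbackV3 implements MqttCallback {
    
        private final List<MessageHandler> messageHandlers;
    
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            log.info("【Paho-V3】 => MQTT message delivered, message ID: {}", iMqttDeliveryToken.getMessageId());
        }
    
        @Override
        public void connectionLost(Throwable throwable) {
            log.error("【Paho-V3】 => MQTT connection closed");
        }
    
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // Hand the payload to every handler that subscribes to this topic
            for (MessageHandler messageHandler : messageHandlers) {
                if (messageHandler.isSubscribe(topic)) {
                    log.info("【Paho-V3】 => MQTT message received, topic: {}", topic);
                    messageHandler.handle(new org.laokou.sample.mqtt.handler.MqttMessage(message.getPayload(), topic));
                }
            }
        }
    
    }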
    

    PahoV3MqttClientTest

    /**
     * @author laokou
     */
    @SpringBootTest
    @RequiredArgsConstructor
    @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
    class PahoV3MqttClientTest {
    
        private final List<MessageHandler> messageHandlers;
    
        @Test
        void testMqttClient() throws InterruptedException {
            ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);
    
            PahoProperties pahoProperties2 = new PahoProperties();
            pahoProperties2.setClientId("test-client-4");
            pahoProperties2.setTopics(Set.of("/test-topic-4/#"));
            PahoMqttClientV3 pahoMqttClientV3 = new PahoMqttClientV3(pahoProperties2, messageHandlers, scheduledExecutorService);
            pahoMqttClientV3.open();
            Thread.sleep(1000);
            pahoMqttClientV3.publish("/test-topic-4/000", "Hello World000".getBytes());
        }
    
    }
    

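    # Hivemq-MQTT-Client [not recommended]

    Note: after subscribing for a while, the client stops receiving data. It implements the standard MQTT 5.0 protocol but is not compatible with MQTT 5.0 on the EMQX broker.

    Hivemq source code

    # Adding dependencies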
    <dependencies>
        <dependency>
            <groupId>com.hivemq</groupId>
            <artifactId>hivemq-mqtt-client-reactor</artifactId>
            <version>1.3.5</version>
        </dependency>
        <dependency>
            <groupId>com.hivemq</groupId>
            <artifactId>hivemq-mqtt-client-epoll</artifactId>
            <version>1.3.5</version>
            <type>pom</type>
        </dependency>
    </dependencies>
    
    # Project integration

    HivemqProperties

    /**
     * @author laokou
     */
    @Data
    public class HivemqProperties {
    
        private boolean auth = true;
    
        private String username = "emqx";
    
        private String password = "laokou123";
    
        private String host = "127.0.0.1";
    
        private int port = 1883;
    
        private String clientId;
    
        private int subscribeQos = 1;
    
        private int publishQos = 0;
    
        private int willQos = 1;
    
        // @formatter:off
        /**
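         * Controls whether a new session is created (true = new session, false = reuse the previous session).
         * clearStart=true => the broker discards all session state as soon as the connection is closed.
         * clearStart=false => the broker keeps the session state after the connection is closed and reuses it on reconnect.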
         * <a href="https://github.com/hivemq/hivemq-mqtt-client/issues/627">...</a>
         */
        // @formatter:on
        private boolean clearStart = false;
    
        private int receiveMaximum = 10000;
    
        private int sendMaximum = 10000;
    
        private int maximumPacketSize = 10000;
    
        private int sendMaximumPacketSize = 10000;
    
        private int topicAliasMaximum = 1024;
    
        private int sendTopicAliasMaximum = 2048;
    
        private long messageExpiryInterval = 86400L;
    
        private boolean requestProblemInformation = true;
    
        private boolean requestResponseInformation = true;
    
        // @formatter:off
        /**
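         * The session is kept for one day by default.
         * The maximum value, 4294967295L, means the session never expires (unit: seconds).
         * Defines how long the session is kept after the client disconnects (only effective when Clean Session = false).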
         */
        private long sessionExpiryInterval = 86400L;
        // @formatter:on
    
        /**
         * A keep-alive (heartbeat) packet is sent every 60 seconds.
         */
        private int keepAliveInterval = 60;
    
        private boolean automaticReconnect = true;
    
        private long automaticReconnectMaxDelay = 5;
    
        private long automaticReconnectInitialDelay = 1;
    
        private Set<String> topics = new HashSet<>(0);
    
        private int nettyThreads = 32;
    
        private boolean retain = false;
    
        private boolean noLocal = false;
    
    }
    

    HivemqClientV5

    /**
     * @author laokou
     */
    @Slf4j
    public class HivemqClientV5 {
    
        /**
         * Response topic.
         */
        private static final String RESPONSE_TOPIC = "response/topic";
    
        /**
         * Will payload published when the service goes offline.
         */
        private static final byte[] WILL_PAYLOAD = "offline".getBytes(UTF_8);
    
        /**
         * Correlation data.
         */
        private static final byte[] CORRELATION_DATA = "correlationData".getBytes(UTF_8);
    
        private final HivemqProperties hivemqProperties;
    
        private final List<MessageHandler> messageHandlers;
    
        private volatile Mqtt5RxClient client;
    
        private final Object lock = new Object();
    
        private volatile Disposable connectDisposable;
    
        private volatile Disposable subscribeDisposable;
    
        private volatile Disposable unSubscribeDisposable;
    
        private volatile Disposable publishDisposable;
    
        private volatile Disposable disconnectDisposable;
    
        private volatile Disposable consumeDisposable;
    
        public HivemqClientV5(HivemqProperties hivemqProperties, List<MessageHandler> messageHandlers) {
            this.hivemqProperties = hivemqProperties;
            this.messageHandlers = messageHandlers;
        }
    
        public void open() {
            if (Objects.isNull(client)) {
                synchronized (lock) {
                    if (Objects.isNull(client)) {
                        client = getMqtt5ClientBuilder().buildRx();
                    }
                }
            }
            connect();
            consume();
        }
    
        public void close() {
            if (!Objects.isNull(client)) {
                disconnectDisposable = client.disconnectWith()
                        .sessionExpiryInterval(hivemqProperties.getSessionExpiryInterval())
                        .applyDisconnect()
                        .subscribeOn(Schedulers.io())
                        .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)
                                .takeWhile(retryCount -> retryCount != -1)
                                .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))
                        .subscribe(() -> log.info("【Hivemq-V5】 => MQTT disconnected successfully, client ID: {}", hivemqProperties.getClientId()),
                                e -> log.error("【Hivemq-V5】 => MQTT disconnect failed, error: {}", e.getMessage(), e));
            }
        }
    
        public void subscribe() {
            String[] topics = getTopics();
            subscribe(topics, getQosArray(topics));
        }
    
        public String[] getTopics() {
            return hivemqProperties.getTopics().toArray(String[]::new);
        }
    
        public int[] getQosArray(String[] topics) {
            return Stream.of(topics).mapToInt(item -> hivemqProperties.getSubscribeQos()).toArray();
        }
    
        public void subscribe(String[] topics, int[] qosArray) {
            checkTopicAndQos(topics, qosArray);
            if (!Objects.isNull(client)) {
                List<Mqtt5Subscription> subscriptions = new ArrayList<>(topics.length);
                for (int i = 0; i < topics.length; i++) {
                    subscriptions.add(Mqtt5Subscription.builder()
                            .topicFilter(topics[i])
                            .qos(getMqttQos(qosArray[i]))
                            .retainAsPublished(hivemqProperties.isRetain())
                            .noLocal(hivemqProperties.isNoLocal())
                            .build());
                }
                subscribeDisposable = client.subscribeWith()
                        .addSubscriptions(subscriptions)
                        .applySubscribe()
                        .subscribeOn(Schedulers.io())
                        .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)
                                .takeWhile(retryCount -> retryCount != -1)
                                .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))
                        .subscribe(ack -> log.info("【Hivemq-V5】 => MQTT subscribed, topics: {}", String.join(", ", topics)), e -> log
                                .error("【Hivemq-V5】 => MQTT subscribe failed, topics: {}, error: {}", String.join(", ", topics), e.getMessage(), e));
            }
        }
    
        public void unSubscribe() {
            String[] topics = hivemqProperties.getTopics().toArray(String[]::new);
            unSubscribe(topics);
        }
    
        public void unSubscribe(String[] topics) {
            checkTopic(topics);
            if (!Objects.isNull(client)) {
                List<MqttTopicFilter> matchedTopics = new ArrayList<>(topics.length);
                for (String topic : topics) {
                    matchedTopics.add(MqttTopicFilter.of(topic));
                }
                unSubscribeDisposable = client.unsubscribeWith()
                        .addTopicFilters(matchedTopics)
                        .applyUnsubscribe()
                        .subscribeOn(Schedulers.io())
                        .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)
                                .takeWhile(retryCount -> retryCount != -1)
                                .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))
                        .subscribe(ack -> log.info("【Hivemq-V5】 => MQTT unsubscribed, topics: {}", String.join(", ", topics)), e -> log
                                .error("【Hivemq-V5】 => MQTT unsubscribe failed, topics: {}, error: {}", String.join(", ", topics), e.getMessage(), e));
            }
        }
    
        public void publish(String topic, byte[] payload, int qos) {
            if (!Objects.isNull(client)) {
                publishDisposable = client
                        .publish(Flowable.just(Mqtt5Publish.builder()
                                .topic(topic)
                                .qos(getMqttQos(qos))
                                .payload(payload)
                                .retain(hivemqProperties.isRetain())
                                .messageExpiryInterval(hivemqProperties.getMessageExpiryInterval())
                                .correlationData(CORRELATION_DATA)
                                .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
                                .contentType("text/plain")
                                .responseTopic(RESPONSE_TOPIC)
                                .build()))
                        .subscribeOn(Schedulers.io())
                        .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)
                                .takeWhile(retryCount -> retryCount != -1)
                                .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))
                        .subscribe(ack -> log.info("【Hivemq-V5】 => MQTT message published, topic: {}", topic),
                                e -> log.error("【Hivemq-V5】 => MQTT publish failed, topic: {}, error: {}", topic, e.getMessage(), e));
            }
        }
    
        public void publish(String topic, byte[] payload) {
            publish(topic, payload, hivemqProperties.getPublishQos());
        }
    
        public void dispose(Disposable disposable) {
            if (!Objects.isNull(disposable) && !disposable.isDisposed()) {
                // Explicitly dispose of the subscription
                disposable.dispose();
            }
        }
    
        public void dispose() {
            dispose(connectDisposable);
            dispose(subscribeDisposable);
            dispose(unSubscribeDisposable);
            dispose(publishDisposable);
            dispose(consumeDisposable);
            dispose(disconnectDisposable);
        }
    
        public void reSubscribe() {
            log.info("【Hivemq-V5】 => MQTT resubscribe started");
            dispose(subscribeDisposable);
            subscribe();
            log.info("【Hivemq-V5】 => MQTT resubscribe finished");
        }
    
        private MqttQos getMqttQos(int qos) {
            return MqttQos.fromCode(qos);
        }
    
        private void connect() {
            connectDisposable = client.connectWith()
                    .keepAlive(hivemqProperties.getKeepAliveInterval())
                    .cleanStart(hivemqProperties.isClearStart())
                    .sessionExpiryInterval(hivemqProperties.getSessionExpiryInterval())
                    .willPublish()
                    .topic("will/topic")
                    .payload(WILL_PAYLOAD)
                    .qos(getMqttQos(hivemqProperties.getWillQos()))
                    .retain(true)
                    .messageExpiryInterval(100)
                    .delayInterval(10)
                    .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8)
                    .contentType("text/plain")
                    .responseTopic(RESPONSE_TOPIC)
                    .correlationData(CORRELATION_DATA)
                    .applyWillPublish()
                    .restrictions()
                    .receiveMaximum(hivemqProperties.getReceiveMaximum())
                    .sendMaximum(hivemqProperties.getSendMaximum())
                    .maximumPacketSize(hivemqProperties.getMaximumPacketSize())
                    .sendMaximumPacketSize(hivemqProperties.getSendMaximumPacketSize())
                    .topicAliasMaximum(hivemqProperties.getTopicAliasMaximum())
                    .sendTopicAliasMaximum(hivemqProperties.getSendTopicAliasMaximum())
                    .requestProblemInformation(hivemqProperties.isRequestProblemInformation())
                    .requestResponseInformation(hivemqProperties.isRequestResponseInformation())
                    .applyRestrictions()
                    .applyConnect()
                    .toFlowable()
                    .firstElement()
                    .subscribeOn(Schedulers.io())
                    .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)
                            .takeWhile(retryCount -> retryCount != -1)
                            .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))
                    .subscribe(
                            ack -> log.info("【Hivemq-V5】 => MQTT connected, host: {}, port: {}, client ID: {}", hivemqProperties.getHost(),
                                    hivemqProperties.getPort(), hivemqProperties.getClientId()),
                            e -> log.error("【Hivemq-V5】 => MQTT connection failed, error: {}", e.getMessage(), e));
        }
    
        private void consume() {
            if (!Objects.isNull(client)) {
                consumeDisposable = client.publishes(MqttGlobalPublishFilter.ALL)
                        .onBackpressureBuffer(8192)
                        .observeOn(Schedulers.computation(), false, 8192)
                        .doOnSubscribe(subscribe -> {
                            log.info("【Hivemq-V5】 => MQTT starting to consume messages, please wait...");
                            reSubscribe();
                        })
                        .subscribeOn(Schedulers.io())
                        .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)
                                .takeWhile(retryCount -> retryCount != -1)
                                .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))
                        .subscribe(publish -> {
                                    for (MessageHandler messageHandler : messageHandlers) {
                                        if (messageHandler.isSubscribe(publish.getTopic().toString())) {
                                            log.info("【Hivemq-V5】 => MQTT message received, topic: {}", publish.getTopic());
                                            messageHandler
                                                    .handle(new MqttMessage(publish.getPayloadAsBytes(), publish.getTopic().toString()));
                                        }
                                    }
                                }, e -> log.error("【Hivemq-V5】 => MQTT message handling failed, error: {}", e.getMessage(), e),
                                () -> log.info("【Hivemq-V5】 => MQTT message consumption finished"));
            }
        }
    
        private Mqtt5ClientBuilder getMqtt5ClientBuilder() {
            Mqtt5ClientBuilder builder = Mqtt5Client.builder().addConnectedListener(listener -> {
                        // getConnectionConfig() already returns an Optional, so no extra wrapping is needed
                        Optional<? extends MqttClientConnectionConfig> config = listener.getClientConfig()
                                .getConnectionConfig();
                        // getKeepAlive() reports the keep-alive interval in seconds
                        config.ifPresent(mqttClientConnectionConfig -> log.info("【Hivemq-V5】 => MQTT keep-alive interval: {}s",
                                mqttClientConnectionConfig.getKeepAlive()));
                        log.info("【Hivemq-V5】 => MQTT connected, client ID: {}", hivemqProperties.getClientId());
                    })
                    .addDisconnectedListener(
                            listener -> log.error("【Hivemq-V5】 => MQTT disconnected, client ID: {}", hivemqProperties.getClientId()))
                    .identifier(hivemqProperties.getClientId())
                    .serverHost(hivemqProperties.getHost())
                    .serverPort(hivemqProperties.getPort())
                    .executorConfig(MqttClientExecutorConfig.builder()
                            .nettyExecutor(ThreadUtils.newVirtualTaskExecutor())
                            .nettyThreads(hivemqProperties.getNettyThreads())
                            .applicationScheduler(Schedulers.from(ThreadUtils.newVirtualTaskExecutor()))
                            .build());
            // Enable automatic reconnect
            if (hivemqProperties.isAutomaticReconnect()) {
                builder.automaticReconnect()
                        .initialDelay(hivemqProperties.getAutomaticReconnectInitialDelay(), TimeUnit.SECONDS)
                        .maxDelay(hivemqProperties.getAutomaticReconnectMaxDelay(), TimeUnit.SECONDS)
                        .applyAutomaticReconnect();
            }
            if (hivemqProperties.isAuth()) {
                builder.simpleAuth()
                        .username(hivemqProperties.getUsername())
                        .password(hivemqProperties.getPassword().getBytes(UTF_8))
                        .applySimpleAuth();
            }
            return builder;
        }
    
        private void checkTopicAndQos(String[] topics, int[] qosArray) {
            if (topics == null || qosArray == null) {
                throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics and QoS arrays cannot be null");
            }
            if (topics.length != qosArray.length) {
                throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics and QoS arrays must have the same length");
            }
            if (topics.length == 0) {
                throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics array cannot be empty");
            }
        }
    
        private void checkTopic(String[] topics) {
            if (topics == null || topics.length == 0) {
                throw new IllegalArgumentException("【" + "Hivemq-V5" + "】 => Topics array cannot be null or empty");
            }
        }
    
    }
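
checkTopicAndQos checks that the arrays are present and of equal length, but not the QoS values themselves. MQTT defines only QoS 0, 1 and 2, so a range check before conversion can give a clearer error. The following is a minimal, library-free sketch; `QosValidator` and its method names are hypothetical and belong neither to the project nor to the HiveMQ client.

```java
/** Hypothetical helper: range-checks MQTT QoS codes before they are converted. */
final class QosValidator {

    private QosValidator() {
    }

    /** MQTT defines exactly three QoS levels: 0, 1 and 2. */
    static boolean isValidQos(int qos) {
        return qos >= 0 && qos <= 2;
    }

    /** Throws IllegalArgumentException naming the first invalid entry, if any. */
    static void requireValidQos(int[] qosArray) {
        if (qosArray == null) {
            throw new IllegalArgumentException("QoS array cannot be null");
        }
        for (int i = 0; i < qosArray.length; i++) {
            if (!isValidQos(qosArray[i])) {
                throw new IllegalArgumentException("Invalid QoS " + qosArray[i] + " at index " + i);
            }
        }
    }
}
```

One possible use would be to call `QosValidator.requireValidQos(qosArray)` at the end of checkTopicAndQos, so that a bad value is rejected before any subscription is built.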
    

    HivemqV5MqttClientTest

    /**
     * @author laokou
     */
    @SpringBootTest
    @RequiredArgsConstructor
    @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
    class HivemqV5MqttClientTest {
    
        private final List<MessageHandler> messageHandlers;
    
        @Test
        void testMqttClient() throws InterruptedException {
            HivemqProperties hivemqProperties = new HivemqProperties();
            hivemqProperties.setClientId("test-client-1");
            hivemqProperties.setTopics(Set.of("/test-topic-1/#"));
            HivemqClientV5 hivemqClientV5 = new HivemqClientV5(hivemqProperties, messageHandlers);
            hivemqClientV5.open();
            hivemqClientV5.publish("/test-topic-1/123", "Hello World123".getBytes());
        }
    
    }
    

    HivemqClientV3

    /**
     * @author laokou
     */
    @Slf4j
    public class HivemqClientV3 {
    
        /**
         * Last-will payload, published when the service goes offline.
         */
        private static final byte[] WILL_PAYLOAD = "offline".getBytes(UTF_8);
    
        private final HivemqProperties hivemqProperties;
    
        private final List<MessageHandler> messageHandlers;
    
        private volatile Mqtt3RxClient client;
    
        private final Object lock = new Object();
    
        private volatile Disposable connectDisposable;
    
        private volatile Disposable subscribeDisposable;
    
        private volatile Disposable unSubscribeDisposable;
    
        private volatile Disposable publishDisposable;
    
        private volatile Disposable disconnectDisposable;
    
        private volatile Disposable consumeDisposable;
    
        public HivemqClientV3(HivemqProperties hivemqProperties, List<MessageHandler> messageHandlers) {
            this.hivemqProperties = hivemqProperties;
            this.messageHandlers = messageHandlers;
        }
    
        public void open() {
            if (Objects.isNull(client)) {
                synchronized (lock) {
                    if (Objects.isNull(client)) {
                        client = getMqtt3ClientBuilder().buildRx();
                    }
                }
            }
            connect();
            consume();
        }
    
        public void close() {
            if (!Objects.isNull(client)) {
                disconnectDisposable = client.disconnect()
                        .subscribeOn(Schedulers.io())
                        .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)
                                .takeWhile(retryCount -> retryCount != -1)
                                .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))
                        .subscribe(() -> log.info("【Hivemq-V3】 => MQTT disconnected successfully, client ID: {}", hivemqProperties.getClientId()),
                                e -> log.error("【Hivemq-V3】 => MQTT disconnect failed, error: {}", e.getMessage(), e));
            }
        }
    
        public void subscribe() {
            String[] topics = getTopics();
            subscribe(topics, getQosArray(topics));
        }
    
        public String[] getTopics() {
            return hivemqProperties.getTopics().toArray(String[]::new);
        }
    
        public int[] getQosArray(String[] topics) {
            return Stream.of(topics).mapToInt(item -> hivemqProperties.getSubscribeQos()).toArray();
        }
    
        public void subscribe(String[] topics, int[] qosArray) {
            checkTopicAndQos(topics, qosArray);
            if (!Objects.isNull(client)) {
                List<Mqtt3Subscription> subscriptions = new ArrayList<>(topics.length);
                for (int i = 0; i < topics.length; i++) {
                    subscriptions.add(Mqtt3Subscription.builder()
                            .topicFilter(topics[i])
                            .qos(getMqttQos(qosArray[i]))
                            .build());
                }
                subscribeDisposable = client.subscribeWith()
                        .addSubscriptions(subscriptions)
                        .applySubscribe()
                        .subscribeOn(Schedulers.io())
                        .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)
                                .takeWhile(retryCount -> retryCount != -1)
                                .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))
                        .subscribe(ack -> log.info("【Hivemq-V3】 => MQTT subscription succeeded, topics: {}", String.join(", ", topics)), e -> log
                                .error("【Hivemq-V3】 => MQTT subscription failed, topics: {}, error: {}", String.join(", ", topics), e.getMessage(), e));
            }
        }
    
        public void unSubscribe() {
            String[] topics = hivemqProperties.getTopics().toArray(String[]::new);
            unSubscribe(topics);
        }
    
        public void unSubscribe(String[] topics) {
            checkTopic(topics);
            if (!Objects.isNull(client)) {
                List<MqttTopicFilter> matchedTopics = new ArrayList<>(topics.length);
                for (String topic : topics) {
                    matchedTopics.add(MqttTopicFilter.of(topic));
                }
                unSubscribeDisposable = client.unsubscribeWith()
                        .addTopicFilters(matchedTopics)
                        .applyUnsubscribe()
                        .subscribeOn(Schedulers.io())
                        .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)
                                .takeWhile(retryCount -> retryCount != -1)
                                .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))
                        .subscribe(() -> log.info("【Hivemq-V3】 => MQTT unsubscribe succeeded, topics: {}", String.join(", ", topics)), e -> log
                                .error("【Hivemq-V3】 => MQTT unsubscribe failed, topics: {}, error: {}", String.join(", ", topics), e.getMessage(), e));
            }
        }
    
        public void publish(String topic, byte[] payload, int qos) {
            if (!Objects.isNull(client)) {
                publishDisposable = client
                        .publish(Flowable.just(Mqtt3Publish.builder()
                                .topic(topic)
                                .qos(getMqttQos(qos))
                                .payload(payload)
                                .retain(hivemqProperties.isRetain())
                                .build()))
                        .subscribeOn(Schedulers.io())
                        .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)
                                .takeWhile(retryCount -> retryCount != -1)
                                .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))
                        .subscribe(ack -> log.info("【Hivemq-V3】 => MQTT message published, topic: {}", topic),
                                e -> log.error("【Hivemq-V3】 => MQTT publish failed, topic: {}, error: {}", topic, e.getMessage(), e));
            }
        }
    
        public void publish(String topic, byte[] payload) {
            publish(topic, payload, hivemqProperties.getPublishQos());
        }
    
        public void dispose(Disposable disposable) {
            if (!Objects.isNull(disposable) && !disposable.isDisposed()) {
                // Explicitly cancel the subscription
                disposable.dispose();
            }
        }
    
        public void dispose() {
            dispose(connectDisposable);
            dispose(subscribeDisposable);
            dispose(unSubscribeDisposable);
            dispose(publishDisposable);
            dispose(consumeDisposable);
            dispose(disconnectDisposable);
        }
    
        public void reSubscribe() {
            log.info("【Hivemq-V3】 => MQTT resubscribe started");
            dispose(subscribeDisposable);
            subscribe();
            log.info("【Hivemq-V3】 => MQTT resubscribe finished");
        }
    
        private MqttQos getMqttQos(int qos) {
            return MqttQos.fromCode(qos);
        }
    
        private void connect() {
            connectDisposable = client.connectWith()
                    .keepAlive(hivemqProperties.getKeepAliveInterval())
                    .willPublish()
                    .topic("will/topic")
                    .payload(WILL_PAYLOAD)
                    .qos(getMqttQos(hivemqProperties.getWillQos()))
                    .retain(true)
                    .applyWillPublish()
                    .restrictions()
                    .sendMaximum(hivemqProperties.getSendMaximum())
                    .sendMaximumPacketSize(hivemqProperties.getSendMaximumPacketSize())
                    .applyRestrictions()
                    .applyConnect()
                    .toFlowable()
                    .firstElement()
                    .subscribeOn(Schedulers.io())
                    .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)
                            .takeWhile(retryCount -> retryCount != -1)
                            .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))
                    .subscribe(
                            ack -> log.info("【Hivemq-V3】 => MQTT connected, host: {}, port: {}, client ID: {}", hivemqProperties.getHost(),
                                    hivemqProperties.getPort(), hivemqProperties.getClientId()),
                            e -> log.error("【Hivemq-V3】 => MQTT connection failed, error: {}", e.getMessage(), e));
        }
    
        private void consume() {
            if (!Objects.isNull(client)) {
                consumeDisposable = client.publishes(MqttGlobalPublishFilter.ALL)
                        .onBackpressureBuffer(8192)
                        .observeOn(Schedulers.computation(), false, 8192)
                        .doOnSubscribe(subscribe -> {
                            log.info("【Hivemq-V3】 => MQTT starting to consume messages, please wait...");
                            reSubscribe();
                        })
                        .subscribeOn(Schedulers.io())
                        .retryWhen(errors -> errors.scan(1, (retryCount, error) -> retryCount > 5 ? -1 : retryCount + 1)
                                .takeWhile(retryCount -> retryCount != -1)
                                .flatMap(retryCount -> Flowable.timer((long) Math.pow(2, retryCount) * 100, TimeUnit.MILLISECONDS)))
                        .subscribe(publish -> {
                                    for (MessageHandler messageHandler : messageHandlers) {
                                        if (messageHandler.isSubscribe(publish.getTopic().toString())) {
                                            log.info("【Hivemq-V3】 => MQTT message received, topic: {}", publish.getTopic());
                                            messageHandler
                                                    .handle(new MqttMessage(publish.getPayloadAsBytes(), publish.getTopic().toString()));
                                        }
                                    }
                                }, e -> log.error("【Hivemq-V3】 => MQTT message handling failed, error: {}", e.getMessage(), e),
                                () -> log.info("【Hivemq-V3】 => MQTT message consumption ended"));
            }
        }
    
        private Mqtt3ClientBuilder getMqtt3ClientBuilder() {
            Mqtt3ClientBuilder builder = Mqtt3Client.builder().addConnectedListener(listener -> {
                        // getConnectionConfig() already returns an Optional, so no extra wrapping is needed;
                        // the keep-alive value is expressed in seconds
                        listener.getClientConfig()
                                .getConnectionConfig()
                                .ifPresent(connectionConfig -> log.info("【Hivemq-V3】 => MQTT keep-alive: {}s",
                                        connectionConfig.getKeepAlive()));
                        log.info("【Hivemq-V3】 => MQTT connected, client ID: {}", hivemqProperties.getClientId());
                    })
                    .addDisconnectedListener(
                            listener -> log.error("【Hivemq-V3】 => MQTT disconnected, client ID: {}", hivemqProperties.getClientId()))
                    .identifier(hivemqProperties.getClientId())
                    .serverHost(hivemqProperties.getHost())
                    .serverPort(hivemqProperties.getPort())
                    .executorConfig(MqttClientExecutorConfig.builder()
                            .nettyExecutor(ThreadUtils.newVirtualTaskExecutor())
                            .nettyThreads(hivemqProperties.getNettyThreads())
                            .applicationScheduler(Schedulers.from(ThreadUtils.newVirtualTaskExecutor()))
                            .build());
            // Enable automatic reconnect
            if (hivemqProperties.isAutomaticReconnect()) {
                builder.automaticReconnect()
                        .initialDelay(hivemqProperties.getAutomaticReconnectInitialDelay(), TimeUnit.SECONDS)
                        .maxDelay(hivemqProperties.getAutomaticReconnectMaxDelay(), TimeUnit.SECONDS)
                        .applyAutomaticReconnect();
            }
            if (hivemqProperties.isAuth()) {
                builder.simpleAuth()
                        .username(hivemqProperties.getUsername())
                        .password(hivemqProperties.getPassword().getBytes(UTF_8))
                        .applySimpleAuth();
            }
            return builder;
        }
    
        private void checkTopicAndQos(String[] topics, int[] qosArray) {
            if (topics == null || qosArray == null) {
                throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics and QoS arrays cannot be null");
            }
            if (topics.length != qosArray.length) {
                throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics and QoS arrays must have the same length");
            }
            if (topics.length == 0) {
                throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics array cannot be empty");
            }
        }
    
        private void checkTopic(String[] topics) {
            if (topics == null || topics.length == 0) {
                throw new IllegalArgumentException("【" + "Hivemq-V3" + "】 => Topics array cannot be null or empty");
            }
        }
    
    }
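
Every operation in HivemqClientV3 reuses the same retryWhen pipeline (scan, takeWhile, Flowable.timer). The sketch below reproduces only its counter and delay arithmetic in plain Java, so that the resulting schedule is easy to inspect. `BackoffSchedule` is a hypothetical name, no RxJava types are involved, and how retryWhen reacts to each emitted value is not modelled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that mirrors the counter and delay arithmetic of the retryWhen pipeline. */
final class BackoffSchedule {

    private BackoffSchedule() {
    }

    /** Same step as the scan accumulator: count up, then yield -1 once the counter exceeds 5. */
    static int next(int retryCount) {
        return retryCount > 5 ? -1 : retryCount + 1;
    }

    /** Same expression as the Flowable.timer argument: 2^retryCount * 100 milliseconds. */
    static long delayMillis(int retryCount) {
        return (long) Math.pow(2, retryCount) * 100;
    }

    /** Delays computed for the seed value 1 and then for each error, until -1 ends the sequence. */
    static List<Long> delays(int errors) {
        List<Long> result = new ArrayList<>();
        int count = 1; // scan(1, ...) emits its seed before any error arrives
        result.add(delayMillis(count));
        for (int i = 0; i < errors; i++) {
            count = next(count);
            if (count == -1) {
                break; // takeWhile(retryCount -> retryCount != -1) stops here
            }
            result.add(delayMillis(count));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(10)); // [200, 400, 800, 1600, 3200, 6400]
    }
}
```

With the constants used in the source, the longest computed delay is 6.4 seconds, and the sixth error yields -1, which ends the sequence.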
    

    HivemqV3MqttClientTest

    /**
     * @author laokou
     */
    @SpringBootTest
    @RequiredArgsConstructor
    @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
    class HivemqV3MqttClientTest {
    
        private final List<MessageHandler> messageHandlers;
    
        @Test
        void testMqttClient() throws InterruptedException {
            HivemqProperties hivemqProperties2 = new HivemqProperties();
            hivemqProperties2.setClientId("test-client-2");
            hivemqProperties2.setTopics(Set.of("/test-topic-2/#"));
            HivemqClientV3 hivemqClientV3 = new HivemqClientV3(hivemqProperties2, messageHandlers);
            hivemqClientV3.open();
            hivemqClientV3.publish("/test-topic-2/456", "Hello World456".getBytes());
        }
    
    }
    

    # Vert.x MQTT Client [recommended; MQTT 3.1.1 only]

    # Vert.x MQTT documentation

    # Add the dependencies
    <dependencies>
        <dependency>
          <groupId>io.vertx</groupId>
          <artifactId>vertx-mqtt</artifactId>
          <version>4.5.14</version>
        </dependency>
        <dependency>
          <groupId>io.projectreactor</groupId>
          <artifactId>reactor-core</artifactId>
          <version>3.7.5</version>
        </dependency>
    </dependencies>
    
    # Project integration

    MqttClientProperties

    /**
     * @author laokou
     */
    @Data
    public class MqttClientProperties {
    
        private boolean auth = true;
    
        private String username = "emqx";
    
        private String password = "laokou123";
    
        private String host = "127.0.0.1";
    
        private int port = 1883;
    
        private String clientId = UUIDGenerator.generateUUID();
    
        // @formatter:off
        /**
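         * Controls whether a new session is created (true = new session, false = reuse the previous session).
         * clearSession=true => the broker discards all session state as soon as the connection is closed.
         * clearSession=false => the broker keeps the session state after a disconnect and reuses it on reconnect.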
         */
        // @formatter:on
        private boolean clearSession = false;
    
        private int receiveBufferSize = Integer.MAX_VALUE;
    
        private int maxMessageSize = -1;
    
        /**
         * Keep-alive interval: a heartbeat packet is sent every 60 seconds.
         */
        private int keepAliveInterval = 60;
    
        private boolean autoKeepAlive = true;
    
        private long reconnectInterval = 1000;
    
        private int reconnectAttempts = Integer.MAX_VALUE;
    
        private Map<String, Integer> topics = new HashMap<>(0);
    
        private int willQos = 1;
    
        private boolean willRetain = false;
    
        private int ackTimeout = -1;
    
        private boolean autoAck = true;
    
        /**
         * Last-will topic, used when the service goes offline.
         */
        private String willTopic = "/will";
    
        /**
         * Last-will payload, published when the service goes offline.
         */
        private String willPayload = "offline";
    
    }
    

    VertxConfig

    /**
     * @author laokou
     */
    @Configuration
    public class VertxConfig {
    
        @Bean
        public Vertx vertx() {
           VertxOptions vertxOptions = new VertxOptions();
           vertxOptions.setMaxEventLoopExecuteTime(60);
           vertxOptions.setMaxWorkerExecuteTime(60);
           vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS);
           vertxOptions.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS);
           vertxOptions.setPreferNativeTransport(true);
           return Vertx.vertx(vertxOptions);
        }
    
    }
    

    VertxMqttClient

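    Note: vertx-mqtt does not reconnect the client automatically. When the network is unreliable or the connection is closed, you have to call connect again yourself, so the reconnect logic must be implemented by hand.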

    /**
     * @author laokou
     */
    @Slf4j
    public class VertxMqttClient {
    
        private final Sinks.Many<MqttPublishMessage> messageSink = Sinks.many()
           .multicast()
           .onBackpressureBuffer(Integer.MAX_VALUE, false);
    
        private final MqttClient mqttClient;
    
        private final Vertx vertx;
    
        private final MqttClientProperties mqttClientProperties;
    
        private final List<MessageHandler> messageHandlers;
    
        private final List<Disposable> disposables;
    
        private final AtomicBoolean isConnected = new AtomicBoolean(false);
    
        private final AtomicBoolean isLoaded = new AtomicBoolean(false);
    
        private final AtomicBoolean isReconnected = new AtomicBoolean(true);
    
        public VertxMqttClient(final Vertx vertx, final MqttClientProperties mqttClientProperties,
              final List<MessageHandler> messageHandlers) {
           this.vertx = vertx;
           this.mqttClientProperties = mqttClientProperties;
           this.mqttClient = MqttClient.create(vertx, getOptions());
           this.messageHandlers = messageHandlers;
           this.disposables = Collections.synchronizedList(new ArrayList<>());
        }
    
        public void open() {
           mqttClient.closeHandler(v -> {
              isConnected.set(false);
              log.error("【Vertx-MQTT-Client】 => MQTT connection closed, client ID: {}", mqttClientProperties.getClientId());
              reconnect();
           })
              .publishHandler(messageSink::tryEmitNext)
              // Invoked only for QoS 1 and QoS 2 messages
              .publishCompletionHandler(id -> {
                 // log.info("【Vertx-MQTT-Client】 => Received MQTT PUBACK or PUBCOMP packet, packet ID: {}", id);
              })
              .subscribeCompletionHandler(ack -> {
                 // log.info("【Vertx-MQTT-Client】 => Received MQTT SUBACK packet, packet ID: {}", ack.messageId());
              })
              .unsubscribeCompletionHandler(id -> {
                 // log.info("【Vertx-MQTT-Client】 => Received MQTT UNSUBACK packet, packet ID: {}", id);
              })
              .pingResponseHandler(s -> {
                 // log.info("【Vertx-MQTT-Client】 => Received MQTT PINGRESP packet");
              })
              .connect(mqttClientProperties.getPort(), mqttClientProperties.getHost(), connectResult -> {
                 if (connectResult.succeeded()) {
                    isConnected.set(true);
                    log.info("【Vertx-MQTT-Client】 => MQTT connected, host: {}, port: {}, client ID: {}", mqttClientProperties.getHost(),
                          mqttClientProperties.getPort(), mqttClientProperties.getClientId());
                    resubscribe();
                 }
                 else {
                    isConnected.set(false);
                    Throwable ex = connectResult.cause();
                    log.error("【Vertx-MQTT-Client】 => MQTT connection failed, reason: {}, client ID: {}", ex.getMessage(),
                          mqttClientProperties.getClientId(), ex);
                    reconnect();
                 }
              });
        }
    
        public void close() {
           disconnect();
        }
    
        /**
         * Sends the PUBLISH message to the remote MQTT server.
         * @param topic topic on which the message is published
         * @param payload message payload
         * @param qos QoS level
         * @param isDup if the message is a duplicate
         * @param isRetain if the message needs to be retained
         */
        public void publish(String topic, int qos, String payload, boolean isDup, boolean isRetain) {
           mqttClient.publish(topic, Buffer.buffer(payload), convertQos(qos), isDup, isRetain);
        }
    
        private void reconnect() {
           if (isReconnected.get()) {
              log.info("【Vertx-MQTT-Client】 => MQTT attempting to reconnect");
              vertx.setTimer(mqttClientProperties.getReconnectInterval(),
                    handler -> ThreadUtils.newVirtualTaskExecutor().execute(this::open));
           }
        }
    
        private void subscribe() {
           Map<String, Integer> topics = mqttClientProperties.getTopics();
           checkTopicAndQos(topics);
           mqttClient.subscribe(topics, subscribeResult -> {
              if (subscribeResult.succeeded()) {
                 log.info("【Vertx-MQTT-Client】 => MQTT subscription succeeded, topics: {}", String.join(", ", topics.keySet()));
              }
              else {
                 Throwable ex = subscribeResult.cause();
                 log.error("【Vertx-MQTT-Client】 => MQTT subscription failed, topics: {}, error: {}", String.join(", ", topics.keySet()), ex.getMessage(),
                       ex);
              }
           });
        }
    
        private void resubscribe() {
           if (isConnected.get() || mqttClient.isConnected()) {
              ThreadUtils.newVirtualTaskExecutor().execute(this::subscribe);
           }
           if (isLoaded.compareAndSet(false, true)) {
              ThreadUtils.newVirtualTaskExecutor().execute(this::consume);
           }
        }
    
        private void consume() {
           Disposable disposable = messageSink.asFlux().doOnNext(mqttPublishMessage -> {
              String topic = mqttPublishMessage.topicName();
              log.info("【Vertx-MQTT-Client】 => MQTT message received, topic: {}", topic);
              for (MessageHandler messageHandler : messageHandlers) {
                 if (messageHandler.isSubscribe(topic)) {
                    messageHandler.handle(new MqttMessage(mqttPublishMessage.payload(), topic));
                 }
              }
           }).subscribeOn(Schedulers.boundedElastic()).subscribe();
           disposables.add(disposable);
        }
    
        private void disposable() {
           for (Disposable disposable : disposables) {
              if (ObjectUtils.isNotNull(disposable) && !disposable.isDisposed()) {
                 disposable.dispose();
              }
           }
        }
    
        private void disconnect() {
           isReconnected.set(false);
           mqttClient.disconnect(disconnectResult -> {
              if (disconnectResult.succeeded()) {
                 disposable();
                 log.info("【Vertx-MQTT-Client】 => MQTT disconnected successfully");
                 disposables.clear();
              }
              else {
                 Throwable ex = disconnectResult.cause();
                 log.error("【Vertx-MQTT-Client】 => MQTT disconnect failed, error: {}", ex.getMessage(), ex);
              }
           });
        }
    
        private void unsubscribe(List<String> topics) {
           checkTopic(topics);
           mqttClient.unsubscribe(topics, unsubscribeResult -> {
              if (unsubscribeResult.succeeded()) {
                 log.info("【Vertx-MQTT-Client】 => MQTT unsubscribe succeeded, topics: {}", String.join(", ", topics));
              }
              else {
                 Throwable ex = unsubscribeResult.cause();
                 log.error("【Vertx-MQTT-Client】 => MQTT unsubscribe failed, topics: {}, error: {}", String.join(", ", topics), ex.getMessage(), ex);
              }
           });
        }
    
        private MqttClientOptions getOptions() {
           MqttClientOptions options = new MqttClientOptions();
           options.setClientId(mqttClientProperties.getClientId());
           options.setCleanSession(mqttClientProperties.isClearSession());
           options.setAutoKeepAlive(mqttClientProperties.isAutoKeepAlive());
           options.setKeepAliveInterval(mqttClientProperties.getKeepAliveInterval());
           options.setReconnectAttempts(mqttClientProperties.getReconnectAttempts());
           options.setReconnectInterval(mqttClientProperties.getReconnectInterval());
           options.setWillQoS(mqttClientProperties.getWillQos());
           options.setWillTopic(mqttClientProperties.getWillTopic());
           options.setAutoAck(mqttClientProperties.isAutoAck());
           options.setAckTimeout(mqttClientProperties.getAckTimeout());
           options.setWillRetain(mqttClientProperties.isWillRetain());
           options.setWillMessageBytes(Buffer.buffer(mqttClientProperties.getWillPayload()));
           options.setReceiveBufferSize(mqttClientProperties.getReceiveBufferSize());
           options.setMaxMessageSize(mqttClientProperties.getMaxMessageSize());
           if (mqttClientProperties.isAuth()) {
              options.setPassword(mqttClientProperties.getPassword());
              options.setUsername(mqttClientProperties.getUsername());
           }
           return options;
        }
    
        private void checkTopicAndQos(Map<String, Integer> topics) {
           topics.forEach((topic, qos) -> {
              if (StringUtils.isEmpty(topic) || ObjectUtils.isNull(qos)) {
                 throw new IllegalArgumentException("【Vertx-MQTT-Client】 => Topic and QoS cannot be null");
              }
           });
        }
    
        private void checkTopic(List<String> topics) {
           if (CollectionUtils.isEmpty(topics)) {
              throw new IllegalArgumentException("【Vertx-MQTT-Client】 => Topics list cannot be empty");
           }
        }
    
        private MqttQoS convertQos(int qos) {
           return MqttQoS.valueOf(qos);
        }
    
    }
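
The reconnect() method above retries at a fixed reconnectInterval for as long as isReconnected is true. If a growing, capped delay and a limit on attempts are preferred, that decision can be isolated in a small policy object. This is a sketch under assumptions: `ReconnectPolicy`, `maxDelayMillis` and `maxAttempts` are hypothetical names and are not part of vertx-mqtt or the project.

```java
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical policy: decides whether and when the next reconnect attempt should run. */
final class ReconnectPolicy {

    private final long baseDelayMillis;

    private final long maxDelayMillis;

    private final int maxAttempts;

    private final AtomicInteger attempts = new AtomicInteger();

    ReconnectPolicy(long baseDelayMillis, long maxDelayMillis, int maxAttempts) {
        if (baseDelayMillis <= 0 || maxDelayMillis < baseDelayMillis || maxAttempts < 0) {
            throw new IllegalArgumentException("Invalid reconnect settings");
        }
        this.baseDelayMillis = baseDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
        this.maxAttempts = maxAttempts;
    }

    /** Returns the delay before the next attempt, or empty once maxAttempts is exhausted. */
    OptionalLong nextDelayMillis() {
        int attempt = attempts.incrementAndGet();
        if (attempt > maxAttempts) {
            return OptionalLong.empty();
        }
        long delay = baseDelayMillis;
        // Double once per previous attempt; the division-based check avoids long overflow.
        for (int i = 1; i < attempt && delay < maxDelayMillis; i++) {
            delay = delay > maxDelayMillis / 2 ? maxDelayMillis : delay * 2;
        }
        return OptionalLong.of(delay);
    }

    /** Call after a successful connect so the next outage starts from the base delay again. */
    void reset() {
        attempts.set(0);
    }
}
```

In VertxMqttClient, reconnect() could pass the returned delay to vertx.setTimer instead of the fixed interval, and the success branch of the connect callback could call reset(). Whether a cap on attempts is desirable depends on the deployment; the source retries indefinitely.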
    

    VertxMqttClientTest

    /**
     * @author laokou
     */
    @SpringBootTest
    @RequiredArgsConstructor
    @ContextConfiguration(classes = { DefaultMessageHandler.class, VertxConfig.class })
    @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
    class VertxMqttClientTest {
    
        private final List<MessageHandler> messageHandlers;
    
        private final Vertx vertx;
    
        @Test
        void testMqttClient() throws InterruptedException {
           MqttClientProperties properties = new MqttClientProperties();
           properties.setHost("127.0.0.1");
           properties.setPort(1883);
           properties.setUsername("emqx");
           properties.setPassword("laokou123");
           properties.setClientId("test-client-1");
           properties.setTopics(Map.of("/test-topic-1/#", 1));
           VertxMqttClient vertxMqttClient = new VertxMqttClient(vertx, properties, messageHandlers);
           Assertions.assertDoesNotThrow(vertxMqttClient::open);
           Thread.sleep(500);
           Assertions.assertDoesNotThrow(() -> vertxMqttClient.publish("/test-topic-1/test", 1, "test", false, false));
           Thread.sleep(500);
           Assertions.assertDoesNotThrow(vertxMqttClient::close);
           Thread.sleep(500);
        }
    
    }
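
The test paces itself with fixed Thread.sleep(500) calls, which can be too short against a slow broker and wasteful against a fast one. One alternative is to block on a CountDownLatch that is counted down when a message arrives. Below is a sketch using only JDK types; `MessageAwaiter` is a hypothetical helper and not part of the project, and wiring it into a MessageHandler is left open.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper: waits until the expected number of messages has been seen. */
final class MessageAwaiter {

    private final CountDownLatch latch;

    MessageAwaiter(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    /** To be called from the message callback for every received message. */
    void onMessage() {
        latch.countDown();
    }

    /** Returns true if all expected messages arrived before the timeout elapsed. */
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt flag and report the wait as unsuccessful.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A test could then assert on `awaiter.await(5, TimeUnit.SECONDS)` after publishing, so it finishes as soon as the message is delivered and fails with a clear timeout otherwise.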
    

    Click here for the full source code

    vertx-mqtt is highly recommended: it is pleasant to use and the project has been running stably with it.

    Note, however, that when the project is deployed on Linux, the JVM needs at least -Xmx2100m -Xms2100m of heap; otherwise the connection gets closed.
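
Because the note above ties connection stability to heap size, it can help to log the effective limit at startup. The following is a small sketch based on Runtime.maxMemory(); `HeapCheck` is a hypothetical name, the 2100 MiB threshold is taken from the note, and the value the JVM reports may be somewhat lower than -Xmx depending on the garbage collector.

```java
/** Hypothetical startup check: reports the JVM's maximum heap in MiB. */
final class HeapCheck {

    private static final long MIB = 1024L * 1024L;

    private HeapCheck() {
    }

    /** Converts a byte count to whole mebibytes (rounded down). */
    static long toMiB(long bytes) {
        return bytes / MIB;
    }

    /** Returns true if the given heap limit is at least requiredMiB. */
    static boolean meetsMinimum(long maxHeapBytes, long requiredMiB) {
        return toMiB(maxHeapBytes) >= requiredMiB;
    }

    public static void main(String[] args) {
        long max = Runtime.getRuntime().maxMemory();
        System.out.println("Max heap: " + toMiB(max) + " MiB, meets 2100 MiB minimum: " + meetsMinimum(max, 2100));
    }
}
```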

    I'm Laokou. See you next time!

    Last updated: 6/2/2025, 11:46:37 PM

    Theme by Vdoing | Copyright © 2022-2025 laokou | Apache 2.0 License