KCloud-Platform-IoT KCloud-Platform-IoT
首页
  • 开发手册

    • 组件【ai】
    • 组件【algorithm】
    • 组件【banner】
    • 组件【clickhouse】
    • 组件【core】
    • 组件【bom】
    • 组件【cors】
    • 组件【domain】
    • 组件【crypto】
    • 组件【data-cache】
    • 组件【dubbo】
    • 组件【elasticsearch】
    • 组件【excel】
    • 组件【extension】
    • 组件【flink】
    • 组件【grpc】
    • 组件【i18n】
    • 组件【idempotent】
    • 组件【influxdb】
    • 组件【kafka】
    • 组件【log】
    • 组件【lock】
    • 组件【mail】
    • 组件【log4j2】
    • 组件【mqtt】
    • 组件【mongodb】
    • 组件【mybatis-plus】
    • 组件【nacos】
    • 组件【netty】
    • 组件【openapi-doc】
    • 组件【openfeign】
    • 组件【oss】
    • 组件【prometheus】
    • 组件【rabbitmq】
    • 组件【rate-limiter】
    • 组件【reactor】
    • 组件【redis】
    • 组件【rocketmq】
    • 组件【ruleengine】
    • 组件【secret】
    • 组件【security】
    • 组件【sensitive】
    • 组件【sentinel】
    • 组件【sms】
    • 组件【snail-job】
    • 组件【spark】
    • 组件【starrocks】
    • 组件【statemachine】
    • 组件【storage】
    • 组件【tdengine】
    • 组件【tenant】
    • 组件【test】
    • 组件【trace】
    • 组件【xss】
    • 组件【shardingsphere】
  • 环境搭建

    • Centos7安装Mysql 8.0.33
    • Centos7安装Redis 7.0.11
    • Centos7安装RocketMQ 5.1.1
    • Centos7安装Jdk 17.0.7
    • Centos7安装Docker 23.0.6
    • Centos7安装Elasticsearch 8.6.2
    • Docker安装RabbitMQ 3.12.2
    • Docker安装Postgresql 16.1
    • Ubuntu20.04安装Docker
    • Ubuntu20.04忘记密码或指纹错误
  • 常用命令

    • Centos7常用命令
    • Centos7常用命令
  • 快速上手

    • 项目启动【dev环境】
    • 项目启动【test环境】
    • 项目启动【prod环境】
    • 更新日志
  • 前端指南

    • 前端启动
  • 后端指南

    • COLA代码规范
    • SSL证书
    • 一键修改项目模块
    • 一键生成项目骨架
    • 一键修改项目版本号
    • 一键跳过测试用例
    • 一键生成后端COLA代码
    • 分布式链路跟踪之ELK日志
    • 一键检查代码规范
    • 动态路由
  • 项目部署

    • 项目部署之镜像打包与推送
  • 其他

    • Java如何快速转Go
    • Go快速开发API
    • Vue快速开发Api
    • React快速开发Api
  • 文章

    • 物联网之对接MQTT最佳实践
    • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
    • vue3+lime-echart各种图表使用【懒人专用,建议收藏】
    • 物联网之使用Vertx实现TCP最佳实践【响应式】
    • 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】
    • 物联网之使用Vertx实现UDP最佳实践【响应式】
  • 儒学

    • 儒学摘抄(一)
  • 禅语

    • 禅语摘抄(一)
  • 诗词

    • 诗词摘抄(一)
  • 道法

    • 道法摘抄(一)
  • 养生

    • 养生摘抄(一)
  • 读后感

    • 读《强者,都是含泪奔跑的人》读后感
  • 修行

    • 修身/养生/情感
  • 觉悟

    • 觉悟日记(一)
赞助
项目课程 (opens new window)
GitHub (opens new window)
首页
  • 开发手册

    • 组件【ai】
    • 组件【algorithm】
    • 组件【banner】
    • 组件【clickhouse】
    • 组件【core】
    • 组件【bom】
    • 组件【cors】
    • 组件【domain】
    • 组件【crypto】
    • 组件【data-cache】
    • 组件【dubbo】
    • 组件【elasticsearch】
    • 组件【excel】
    • 组件【extension】
    • 组件【flink】
    • 组件【grpc】
    • 组件【i18n】
    • 组件【idempotent】
    • 组件【influxdb】
    • 组件【kafka】
    • 组件【log】
    • 组件【lock】
    • 组件【mail】
    • 组件【log4j2】
    • 组件【mqtt】
    • 组件【mongodb】
    • 组件【mybatis-plus】
    • 组件【nacos】
    • 组件【netty】
    • 组件【openapi-doc】
    • 组件【openfeign】
    • 组件【oss】
    • 组件【prometheus】
    • 组件【rabbitmq】
    • 组件【rate-limiter】
    • 组件【reactor】
    • 组件【redis】
    • 组件【rocketmq】
    • 组件【ruleengine】
    • 组件【secret】
    • 组件【security】
    • 组件【sensitive】
    • 组件【sentinel】
    • 组件【sms】
    • 组件【snail-job】
    • 组件【spark】
    • 组件【starrocks】
    • 组件【statemachine】
    • 组件【storage】
    • 组件【tdengine】
    • 组件【tenant】
    • 组件【test】
    • 组件【trace】
    • 组件【xss】
    • 组件【shardingsphere】
  • 环境搭建

    • Centos7安装Mysql 8.0.33
    • Centos7安装Redis 7.0.11
    • Centos7安装RocketMQ 5.1.1
    • Centos7安装Jdk 17.0.7
    • Centos7安装Docker 23.0.6
    • Centos7安装Elasticsearch 8.6.2
    • Docker安装RabbitMQ 3.12.2
    • Docker安装Postgresql 16.1
    • Ubuntu20.04安装Docker
    • Ubuntu20.04忘记密码或指纹错误
  • 常用命令

    • Centos7常用命令
    • Centos7常用命令
  • 快速上手

    • 项目启动【dev环境】
    • 项目启动【test环境】
    • 项目启动【prod环境】
    • 更新日志
  • 前端指南

    • 前端启动
  • 后端指南

    • COLA代码规范
    • SSL证书
    • 一键修改项目模块
    • 一键生成项目骨架
    • 一键修改项目版本号
    • 一键跳过测试用例
    • 一键生成后端COLA代码
    • 分布式链路跟踪之ELK日志
    • 一键检查代码规范
    • 动态路由
  • 项目部署

    • 项目部署之镜像打包与推送
  • 其他

    • Java如何快速转Go
    • Go快速开发API
    • Vue快速开发Api
    • React快速开发Api
  • 文章

    • 物联网之对接MQTT最佳实践
    • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
    • vue3+lime-echart各种图表使用【懒人专用,建议收藏】
    • 物联网之使用Vertx实现TCP最佳实践【响应式】
    • 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】
    • 物联网之使用Vertx实现UDP最佳实践【响应式】
  • 儒学

    • 儒学摘抄(一)
  • 禅语

    • 禅语摘抄(一)
  • 诗词

    • 诗词摘抄(一)
  • 道法

    • 道法摘抄(一)
  • 养生

    • 养生摘抄(一)
  • 读后感

    • 读《强者,都是含泪奔跑的人》读后感
  • 修行

    • 修身/养生/情感
  • 觉悟

    • 觉悟日记(一)
赞助
项目课程 (opens new window)
GitHub (opens new window)
  • 开发手册

    • 组件【ai】
    • 组件【algorithm】
    • 组件【banner】
    • 组件【clickhouse】
    • 组件【core】
    • 组件【bom】
    • 组件【cors】
    • 组件【domain】
    • 组件【crypto】
    • 组件【data-cache】
    • 组件【dubbo】
    • 组件【elasticsearch】
    • 组件【excel】
    • 组件【extension】
    • 组件【flink】
    • 组件【grpc】
    • 组件【i18n】
    • 组件【idempotent】
    • 组件【influxdb】
    • 组件【kafka】
    • 组件【log】
    • 组件【lock】
    • 组件【mail】
    • 组件【log4j2】
    • 组件【mqtt】
    • 组件【mongodb】
    • 组件【mybatis-plus】
    • 组件【nacos】
    • 组件【netty】
    • 组件【openapi-doc】
    • 组件【openfeign】
    • 组件【oss】
    • 组件【prometheus】
    • 组件【rabbitmq】
    • 组件【rate-limiter】
    • 组件【reactor】
    • 组件【redis】
    • 组件【rocketmq】
    • 组件【ruleengine】
    • 组件【secret】
    • 组件【security】
    • 组件【sensitive】
    • 组件【sentinel】
    • 组件【sms】
    • 组件【snail-job】
    • 组件【spark】
    • 组件【starrocks】
    • 组件【statemachine】
    • 组件【storage】
    • 组件【tdengine】
    • 组件【tenant】
    • 组件【test】
    • 组件【trace】
    • 组件【xss】
    • 组件【shardingsphere】
  • 环境搭建

    • Centos7安装Mysql 8.0.33
    • Centos7安装Redis 7.0.11
    • Centos7安装RocketMQ 5.1.1
    • Centos7安装Jdk 17.0.7
    • Centos7安装Docker 23.0.6
    • Docker安装RabbitMQ 3.12.2
    • Centos7安装Elasticsearch 8.6.2
    • Docker安装Postgresql 16.1
    • Ubuntu20.04安装Docker
    • Ubuntu20.04忘记密码或指纹错误
  • 常用命令

    • Centos7常用命令
    • Docker常用命令
  • 快速上手

    • 项目启动【dev环境】
    • 项目启动【test环境】
    • 项目启动【prod环境】
    • 更新日志
  • 前端指南

    • 前端启动
  • 后端指南

    • COLA代码规范
    • SSL证书
    • 一键修改项目模块
    • 一键生成项目骨架
    • 一键修改项目版本号
    • 一键跳过测试用例
    • 一键生成后端COLA代码
    • 分布式链路跟踪之ELK日志
    • 一键检查代码规范
    • 动态路由
    • OAuth2.1流程
  • 项目部署

    • 项目部署之镜像打包与推送
  • 其他

    • Java如何快速转Go
    • Go快速开发API
    • Vue快速开发Api
    • React快速开发Api
  • 文章

    • 物联网之对接MQTT最佳实践
    • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
      • vue3+lime-echart各种图表使用【懒人专用,建议收藏】
      • 物联网之使用Vertx实现TCP最佳实践【响应式】
      • 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】
      • 物联网之使用Vertx实现UDP最佳实践【响应式】
    • 指南
    • 文章
    KCloud-Platform-IoT
    2025-05-10
    目录

    物联网之使用Vertx实现MQTT-Server最佳实践【响应式】

    你好呀,我的老朋友!我是老寇,欢迎来到老寇云平台!

    跟我一起学习使用Vertx实现MQTT-Server

    # 实现MQTT-Server【响应式】

    vertx-mqtt地址 (opens new window)

    # 实现思路

    1.启动MQTT Server并绑定很多端口记录到缓存,服务注册到Nacos,通过接口的方式获取IP和端口【负载均衡】 2.MQTT Client连接MQTT Server并上报数据 3.MQTT Server接收到数据并通过MQ转发出去

    # 代码比较简单,懒得讲解啦
    # 代码比较简单,懒得讲解啦
    # 代码比较简单,懒得讲解啦
    # 实现过程

    查看源码 (opens new window)

    # kafka安装

    采用docker-compose一键式启动!!!

    还没有安装docker朋友,参考文章下面两篇文章

    # Ubuntu20.04安装Docker (opens new window)

    # Centos7安装Docker 23.0.6 (opens new window)

    services:
        kafka:
          image: bitnami/kafka:4.0.0
          container_name: kafka
          tty: true
          ports:
            - '9092:9092'
            - '9093:9093'
          environment:
            # 节点ID
            - KAFKA_BROKER_ID=1
            # 允许使用kraft,即Kafka替代Zookeeper
            - KAFKA_ENABLE_KRAFT=yes
            # kafka角色,做broker,也要做controller
            - KAFKA_CFG_PROCESS_ROLES=broker,controller
            # 指定供外部使用的控制类请求信息
            - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
            # 定义安全协议
            - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
            # 定义kafka服务端socket监听端口
            - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
            # 外网访问地址
            - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
            # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
            - ALLOW_PLAINTEXT_LISTENER=yes
            # 设置broker最大内存,和初始内存
            - KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
            # 集群地址
            - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
            # 节点ID
            - KAFKA_CFG_NODE_ID=1
          restart: always
          privileged: true
          networks:
            - laokou_network
    networks:
        laokou_network:
            driver: bridge
    
    # 创建topic【进入bin目录执行】 => 每个topic 3个分区和一个副本
    kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic laokou_mqtt_property_report --partitions 3 --replication-factor 1
    
    kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic laokou_mqtt_property_reply --partitions 3 --replication-factor 1
    
    # kafka【响应式】

    1.依赖

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>3.3.5</version>
    </dependency>
    <dependency>
      <groupId>io.projectreactor.kafka</groupId>
      <artifactId>reactor-kafka</artifactId>
      <version>1.3.23</version>
    </dependency>
    

    2.代码

    KafkaAutoConfig

    /**
     * @author laokou
     */
    @Configuration
    public class KafkaAutoConfig {
    
        @Bean("defaultKafkaTemplate")
        @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
        public DefaultKafkaTemplate defaultKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
           return new DefaultKafkaTemplate(kafkaTemplate);
        }
    
        @Bean(value = "reactiveKafkaSender")
        @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
        public KafkaSender reactiveKafkaSender(SenderOptions<String, String> senderOptions) {
           return new ReactiveKafkaSender(
                 new reactor.kafka.sender.internals.DefaultKafkaSender<>(ProducerFactory.INSTANCE, senderOptions));
        }
    
        @Bean
        @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
        public SenderOptions<String, String> senderOptions(KafkaProperties kafkaProperties) {
           Map<String, Object> props = new HashMap<>();
           KafkaProperties.Producer producer = kafkaProperties.getProducer();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
           props.put(ProducerConfig.ACKS_CONFIG, producer.getAcks());
           props.put(ProducerConfig.RETRIES_CONFIG, producer.getRetries());
           props.put(ProducerConfig.BATCH_SIZE_CONFIG, (int) producer.getBatchSize().toBytes());
           props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (int) producer.getBufferMemory().toBytes());
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           return SenderOptions.create(props);
        }
    
        @Bean
        @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
        public ReceiverOptions<String, String> receiverOptions(KafkaProperties kafkaProperties) {
           Map<String, Object> props = new HashMap<>();
           KafkaProperties.Consumer consumer = kafkaProperties.getConsumer();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
           props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.getGroupId());
           props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.getMaxPollRecords());
           props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.getEnableAutoCommit());
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           return ReceiverOptions.create(props);
        }
    
    }
    

    KafkaSender

    /**
     * @author laokou
     */
    public interface KafkaSender {
    
        Flux<Boolean> send(String topic, String payload);
    
    }
    

    ReactiveKafkaSender

    /**
     * @author laokou
     */
    @Slf4j
    @RequiredArgsConstructor
    public class ReactiveKafkaSender implements KafkaSender {
    
        private final DefaultKafkaSender<String, String> defaultKafkaSender;
    
        @Override
        public Flux<Boolean> send(String topic, String payload) {
           return defaultKafkaSender.send(Mono.just(SenderRecord.create(topic, null, null, null, payload, null)))
              .map(result -> {
                 Exception exception = result.exception();
                 if (ObjectUtils.isNotNull(exception)) {
                    log.error("【Kafka】 => 发送消息失败,错误信息:{}", exception.getMessage(), exception);
                    return false;
                 }
                 else {
                    return true;
                 }
              });
        }
    
    }
    

    3.yaml配置【自动批量提交】

    spring:
      kafka:
        bootstrap-servers: kafka:9092
        consumer:
          group-id: laokou-mqtt
          # 禁用自动提交(按周期)已消费offset
          enable-auto-commit: true
          # 单次poll()调用返回的记录数
          max-poll-records: 50
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          # 发生错误后,消息重发的次数。
          retries: 5
          # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
          batch-size: 16384
          # 设置生产者内存缓冲区的大小。
          buffer-memory: 33554432
          # 键的序列化方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          # 值的序列化方式
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
          # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
          # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
          acks: 0
        listener:
          # 在侦听器容器中运行的线程数。
          concurrency: 5
          # 批量提交模式
          ack-mode: batch
          # 批量batch类型
          type: batch
          # topic不存在报错
          missing-topics-fatal: false
        admin:
          auto-create: false
    
    # mqtt-server【响应式】

    依赖

    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-mqtt</artifactId>
      <version>4.5.14</version>
    </dependency>
    

    VertxConfig

    /**
     * @author laokou
     */
    @Configuration
    public class VertxConfig {
    
        @Bean(destroyMethod = "close")
        public Vertx vertx() {
           VertxOptions vertxOptions = new VertxOptions();
           vertxOptions.setMaxEventLoopExecuteTime(30);
           vertxOptions.setWorkerPoolSize(40);
           vertxOptions.setMaxWorkerExecuteTime(30);
           vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS);
           vertxOptions.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS);
           vertxOptions.setPreferNativeTransport(true);
           vertxOptions.setInternalBlockingPoolSize(40);
           vertxOptions.setEventLoopPoolSize(Math.max(32, 2 * CpuCoreSensor.availableProcessors()));
           return Vertx.vertx(vertxOptions);
        }
    
    }
    

    MqttServerProperties【配置了账号和密码】

    /**
     * @author laokou
     */
    @Data
    @Component
    @ConfigurationProperties(prefix = "spring.mqtt-server")
    public class MqttServerProperties {
    
        private boolean auth = true;
    
        private String username = "vertx";
    
        private String password = "laokou123";
    
        private String host = "0.0.0.0";
    
        private int port = 0;
    
        private int threadSize = 32;
    
        private int maxMessageSize = 8192;
    
        private boolean isAutoClientId = true;
    
        private int maxClientIdLength = 30;
    
        private int timeoutOnConnect = 90;
    
        private boolean useWebSocket = false;
    
        private int webSocketMaxFrameSize = 65536;
    
        private boolean perFrameWebSocketCompressionSupported = true;
    
        private boolean perMessageWebSocketCompressionSupported = true;
    
        private int webSocketCompressionLevel = 6;
    
        private boolean webSocketAllowServerNoContext = false;
    
        private boolean webSocketPreferredClientNoContext = false;
    
        private boolean tcpNoDelay = true;
    
        private boolean tcpKeepAlive = false;
    
        private int tcpKeepAliveIdleSeconds = -1;
    
        private int tcpKeepAliveCount = -1;
    
        private int tcpKeepAliveIntervalSeconds = -1;
    
        private int soLinger = -1;
    
        private int idleTimeout = 0;
    
        private int readIdleTimeout = 0;
    
        private int writeIdleTimeout = 0;
    
        private TimeUnit idleTimeoutUnit = TimeUnit.SECONDS;
    
        private boolean ssl = false;
    
        private boolean tcpFastOpen = false;
    
        private boolean tcpCork = false;
    
        private boolean tcpQuickAck = false;
    
        private int tcpUserTimeout = 0;
    
    }
    

    VertxMqttServer

    /**
     * @author laokou
     */
    @Slf4j
    public final class VertxMqttServer {
    
        private final Sinks.Many<MqttPublishMessage> messageSink = Sinks.many()
           .multicast()
           .onBackpressureBuffer(Integer.MAX_VALUE, false);
    
        private volatile Flux<MqttServer> mqttServer;
    
        private final Vertx vertx;
    
        private final MqttServerProperties properties;
    
        private final List<ReactiveMessageHandler> reactiveMessageHandlers;
    
        private volatile boolean isClosed = false;
    
        public VertxMqttServer(final Vertx vertx, final MqttServerProperties properties,
              List<ReactiveMessageHandler> reactiveMessageHandlers) {
           this.properties = properties;
           this.vertx = vertx;
           this.reactiveMessageHandlers = reactiveMessageHandlers;
        }
    
        public Flux<MqttServer> start() {
           return mqttServer = getMqttServerOptions().map(mqttServerOption -> MqttServer.create(vertx, mqttServerOption)
              .exceptionHandler(
                    error -> log.error("【Vertx-MQTT-Server】 => MQTT服务启动失败,错误信息:{}", error.getMessage(), error))
              .endpointHandler(endpoint -> Optional.ofNullable(authHandler(endpoint))
                 .ifPresent(e -> e.closeHandler(close -> log.info("【Vertx-MQTT-Server】 => MQTT客户端断开连接"))
                    .subscribeHandler(subscribe -> {
                       for (MqttTopicSubscription topicSubscription : subscribe.topicSubscriptions()) {
                          log.info("【Vertx-MQTT-Server】 => MQTT客户端订阅主题:{}", topicSubscription.topicName());
                       }
                    })
                    .disconnectHandler(disconnect -> log.info("【Vertx-MQTT-Server】 => MQTT客户端主动断开连接"))
                    .pingHandler(ping -> log.info("【Vertx-MQTT-Server】 => MQTT客户端发送心跳"))
                    .publishHandler(messageSink::tryEmitNext)
                    // 不保留会话
                    .accept(false)))
              .listen(mqttServerOption.getPort(), mqttServerOption.getHost(), asyncResult -> {
                 if (isClosed) {
                    return;
                 }
                 if (asyncResult.succeeded()) {
                    log.info("【Vertx-MQTT-Server】 => MQTT服务启动成功,主机:{},端口:{}", mqttServerOption.getHost(),
                          mqttServerOption.getPort());
                    // 写入缓存
                    PortCache.add(mqttServerOption.getPort());
                 }
                 else {
                    log.error("【Vertx-MQTT-Server】 => MQTT服务启动失败,主机:{},端口:{},错误信息:{}", mqttServerOption.getHost(),
                          mqttServerOption.getPort(), asyncResult.cause().getMessage(), asyncResult.cause());
                 }
              }));
        }
    
        public Flux<MqttServer> stop() {
           isClosed = true;
           return mqttServer.doOnNext(server -> server.close(completionHandler -> {
              if (completionHandler.succeeded()) {
                 log.info("【Vertx-MQTT-Server】 => MQTT服务停止成功");
              }
              else {
                 log.error("【Vertx-MQTT-Server】 => MQTT服务停止失败,错误信息:{}", completionHandler.cause().getMessage(),
                       completionHandler.cause());
              }
           }));
        }
    
        public Flux<Boolean> publish() {
           return messageSink.asFlux().flatMap(message -> {
              // @formatter:off
                 // log.info("【Vertx-MQTT-Server】 => MQTT服务接收到消息,主题:{},内容:{}", message.topicName(), message.payload().toString());
                 // @formatter:on
              return Flux
                 .fromStream(reactiveMessageHandlers.stream()
                    .filter(reactiveMessageHandler -> reactiveMessageHandler.isSubscribe(message.topicName())))
                 .flatMap(reactiveMessageHandler -> reactiveMessageHandler
                    .handle(new MqttMessage(message.payload(), message.topicName())));
           });
        }
    
        private int detectAvailablePort(String host) {
           try (ServerSocket socket = SSLServerSocketFactory.getDefault().createServerSocket()) {
              socket.bind(new InetSocketAddress(host, properties.getPort()));
              return socket.getLocalPort();
           }
           catch (IOException e) {
              throw new RuntimeException("Port auto-detection failed", e);
           }
        }
    
        private Flux<MqttServerOptions> getMqttServerOptions() {
           return Flux.range(1, Math.max(properties.getThreadSize(), CpuCoreSensor.availableProcessors()))
              .map(item -> getMqttServerOption());
        }
    
        /**
         * 认证.
         */
        private MqttEndpoint authHandler(MqttEndpoint endpoint) {
           MqttAuth mqttAuth = endpoint.auth();
           if (properties.isAuth()) {
              if (ObjectUtils.isNull(mqttAuth)) {
                 endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
                 return null;
              }
              if (!ObjectUtils.equals(mqttAuth.getUsername(), properties.getUsername())
                    || !ObjectUtils.equals(mqttAuth.getPassword(), properties.getPassword())) {
                 endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
                 return null;
              }
           }
           return endpoint;
        }
    
        // @formatter:off
        private MqttServerOptions getMqttServerOption() {
           MqttServerOptions mqttServerOptions = new MqttServerOptions();
           mqttServerOptions.setHost(properties.getHost());
           mqttServerOptions.setPort(detectAvailablePort(properties.getHost()));
           mqttServerOptions.setMaxMessageSize(properties.getMaxMessageSize());
           mqttServerOptions.setAutoClientId(properties.isAutoClientId());
           mqttServerOptions.setMaxClientIdLength(properties.getMaxClientIdLength());
           mqttServerOptions.setTimeoutOnConnect(properties.getTimeoutOnConnect());
           mqttServerOptions.setUseWebSocket(properties.isUseWebSocket());
           mqttServerOptions.setWebSocketMaxFrameSize(properties.getWebSocketMaxFrameSize());
           mqttServerOptions.setPerFrameWebSocketCompressionSupported(properties.isPerFrameWebSocketCompressionSupported());
           mqttServerOptions.setPerMessageWebSocketCompressionSupported(properties.isPerMessageWebSocketCompressionSupported());
           mqttServerOptions.setWebSocketCompressionLevel(properties.getWebSocketCompressionLevel());
           mqttServerOptions.setWebSocketAllowServerNoContext(properties.isWebSocketAllowServerNoContext());
           mqttServerOptions.setWebSocketPreferredClientNoContext(properties.isWebSocketPreferredClientNoContext());
           mqttServerOptions.setTcpNoDelay(properties.isTcpNoDelay());
           mqttServerOptions.setTcpKeepAlive(properties.isTcpKeepAlive());
           mqttServerOptions.setTcpKeepAliveIdleSeconds(properties.getTcpKeepAliveIdleSeconds());
           mqttServerOptions.setTcpKeepAliveIntervalSeconds(properties.getTcpKeepAliveIntervalSeconds());
           mqttServerOptions.setTcpKeepAliveCount(properties.getTcpKeepAliveCount());
           mqttServerOptions.setSoLinger(properties.getSoLinger());
           mqttServerOptions.setIdleTimeout(properties.getIdleTimeout());
           mqttServerOptions.setReadIdleTimeout(properties.getReadIdleTimeout());
           mqttServerOptions.setWriteIdleTimeout(properties.getWriteIdleTimeout());
           mqttServerOptions.setIdleTimeoutUnit(properties.getIdleTimeoutUnit());
           mqttServerOptions.setSsl(properties.isSsl());
           mqttServerOptions.setTcpFastOpen(properties.isTcpFastOpen());
           mqttServerOptions.setTcpCork(properties.isTcpCork());
           mqttServerOptions.setTcpQuickAck(properties.isTcpQuickAck());
           mqttServerOptions.setTcpUserTimeout(properties.getTcpUserTimeout());
           return mqttServerOptions;
        }
        // @formatter:on
    
    }
    

    PortCache【缓存端口】

    /**
     * @author laokou
     */
    public final class PortCache {
    
        private PortCache() {
        }
    
        public static final List<Integer> PORTS = new CopyOnWriteArrayList<>();
    
        public static void add(int port) {
           PORTS.add(port);
        }
    
        public static List<Integer> get() {
           return PORTS;
        }
    
        public static void clear() {
           PORTS.clear();
        }
    
    }
    

    ReactiveMessageHandler【消息处理,没啥好说的,就是用来转发消息到MQ】

    /**
     * @author laokou
     */
    public interface ReactiveMessageHandler {
    
        boolean isSubscribe(String topic);
    
        Flux<Boolean> handle(MqttMessage mqttMessage);
    
    }
    
    /**
     * 属性回复消息处理器.
     *
     * @author laokou
     */
    @Component
    @RequiredArgsConstructor
    public class ReactivePropertyReplyMessageHandler implements ReactiveMessageHandler {
    
        private final KafkaSender kafkaSender;
    
        @Override
        public boolean isSubscribe(String topic) {
           return TopicUtils.match("/+/+/property/reply", topic);
        }
    
        @Override
        public Flux<Boolean> handle(MqttMessage mqttMessage) {
           return kafkaSender.send(LAOKOU_MQTT_PROPERTY_REPLY, mqttMessage.getPayload().toString());
        }
    
    }
    
    /**
     * 属性上报消息处理.
     *
     * @author laokou
     */
    @Component
    @RequiredArgsConstructor
    public class ReactivePropertyReportMessageHandler implements ReactiveMessageHandler {
    
        private final KafkaSender kafkaSender;
    
        @Override
        public boolean isSubscribe(String topic) {
           return TopicUtils.match("/+/+/property/report", topic);
        }
    
        @Override
        public Flux<Boolean> handle(MqttMessage mqttMessage) {
           return kafkaSender.send(LAOKOU_MQTT_PROPERTY_REPORT, mqttMessage.getPayload().toString());
        }
    
    }
    

    配置yaml

    spring:
      application:
        name: ${SERVICE_ID:laokou-mqtt}
      threads:
        virtual:
          enabled: true
      mqtt-server:
        auth: true
        username: vertx
        password: laokou123
        # 开启8192个端口
        thread-size: 8192
    

    启动MQTT-Server

    /**
     * @author laokou
     */
    @Slf4j
    @EnableDiscoveryClient
    @RequiredArgsConstructor
    @EnableConfigurationProperties
    @SpringBootApplication(scanBasePackages = "org.laokou")
    public class MqttServerApp implements CommandLineRunner {
    
        private final Vertx vertx;
    
        private final MqttServerProperties properties;
    
        private final List<ReactiveMessageHandler> reactiveMessageHandlers;
    
        private final ExecutorService virtualThreadExecutor;
    
        @Override
        public void run(String... args) {
           virtualThreadExecutor.execute(this::listenMessage);
        }
    
        private void listenMessage() {
           VertxMqttServer vertxMqttServer = new VertxMqttServer(vertx, properties, reactiveMessageHandlers);
           // 启动服务
           vertxMqttServer.start().subscribeOn(Schedulers.boundedElastic()).subscribe();
           // 发布数据
           vertxMqttServer.publish().subscribeOn(Schedulers.boundedElastic()).subscribe();
           Runtime.getRuntime().addShutdownHook(new Thread(() -> {
              // 清除缓存
              PortCache.clear();
              // 停止服务
              vertxMqttServer.stop().subscribeOn(Schedulers.boundedElastic()).subscribe();
           }));
        }
    
    }
    

    启动好之后,请自行测试,这个东西没啥好说,vertx帮我们都实现了,就是简单调用API,自己玩吧~

    我是老寇,我们下次再见啦~

    上次更新: 5/21/2025, 5:17:33 PM
    物联网之对接MQTT最佳实践
    vue3+lime-echart各种图表使用【懒人专用,建议收藏】

    ← 物联网之对接MQTT最佳实践 vue3+lime-echart各种图表使用【懒人专用,建议收藏】→

    Theme by Vdoing | Copyright © 2022-2025 laokou | Apache 2.0 License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式