KCloud-Platform-IoT KCloud-Platform-IoT
首页
  • 开发手册

    • 组件【ai】
    • 组件【algorithm】
    • 组件【banner】
    • 组件【clickhouse】
    • 组件【core】
    • 组件【bom】
    • 组件【cors】
    • 组件【domain】
    • 组件【crypto】
    • 组件【data-cache】
    • 组件【dubbo】
    • 组件【elasticsearch】
    • 组件【excel】
    • 组件【extension】
    • 组件【flink】
    • 组件【grpc】
    • 组件【i18n】
    • 组件【idempotent】
    • 组件【influxdb】
    • 组件【kafka】
    • 组件【log】
    • 组件【lock】
    • 组件【mail】
    • 组件【log4j2】
    • 组件【mqtt】
    • 组件【mongodb】
    • 组件【mybatis-plus】
    • 组件【nacos】
    • 组件【netty】
    • 组件【openapi-doc】
    • 组件【openfeign】
    • 组件【oss】
    • 组件【prometheus】
    • 组件【rabbitmq】
    • 组件【rate-limiter】
    • 组件【reactor】
    • 组件【redis】
    • 组件【rocketmq】
    • 组件【ruleengine】
    • 组件【secret】
    • 组件【security】
    • 组件【sensitive】
    • 组件【sentinel】
    • 组件【sms】
    • 组件【snail-job】
    • 组件【spark】
    • 组件【starrocks】
    • 组件【statemachine】
    • 组件【storage】
    • 组件【tdengine】
    • 组件【tenant】
    • 组件【test】
    • 组件【trace】
    • 组件【xss】
    • 组件【shardingsphere】
  • 环境搭建

    • Centos7安装Mysql 8.0.33
    • Centos7安装Redis 7.0.11
    • Centos7安装RocketMQ 5.1.1
    • Centos7安装Jdk 17.0.7
    • Centos7安装Docker 23.0.6
    • Centos7安装Elasticsearch 8.6.2
    • Docker安装RabbitMQ 3.12.2
    • Docker安装Postgresql 16.1
    • Ubuntu20.04安装Docker
    • Ubuntu20.04忘记密码或指纹错误
  • 常用命令

    • Centos7常用命令
    • Centos7常用命令
  • 快速上手

    • 项目启动【dev环境】
    • 项目启动【test环境】
    • 项目启动【prod环境】
    • 更新日志
  • 前端指南

    • 前端启动
  • 后端指南

    • COLA代码规范
    • SSL证书
    • 一键修改项目模块
    • 一键生成项目骨架
    • 一键修改项目版本号
    • 一键跳过测试用例
    • 一键生成后端COLA代码
    • 分布式链路跟踪之ELK日志
    • 一键检查代码规范
    • 动态路由
  • 项目部署

    • 项目部署之镜像打包与推送
  • 其他

    • Java如何快速转Go
    • Go快速开发API
    • Vue快速开发Api
    • React快速开发Api
  • 文章

    • 物联网之对接MQTT最佳实践
    • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
    • vue3+lime-echart各种图表使用【懒人专用,建议收藏】
    • 物联网之使用Vertx实现TCP最佳实践【响应式】
    • 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】
    • 物联网之使用Vertx实现UDP最佳实践【响应式】
  • 儒学

    • 儒学摘抄(一)
  • 禅语

    • 禅语摘抄(一)
  • 诗词

    • 诗词摘抄(一)
  • 道法

    • 道法摘抄(一)
  • 养生

    • 养生摘抄(一)
  • 读后感

    • 读《强者,都是含泪奔跑的人》读后感
  • 修行

    • 修身/养生/情感
  • 觉悟

    • 觉悟日记(一)
赞助
项目课程 (opens new window)
GitHub (opens new window)
首页
  • 开发手册

    • 组件【ai】
    • 组件【algorithm】
    • 组件【banner】
    • 组件【clickhouse】
    • 组件【core】
    • 组件【bom】
    • 组件【cors】
    • 组件【domain】
    • 组件【crypto】
    • 组件【data-cache】
    • 组件【dubbo】
    • 组件【elasticsearch】
    • 组件【excel】
    • 组件【extension】
    • 组件【flink】
    • 组件【grpc】
    • 组件【i18n】
    • 组件【idempotent】
    • 组件【influxdb】
    • 组件【kafka】
    • 组件【log】
    • 组件【lock】
    • 组件【mail】
    • 组件【log4j2】
    • 组件【mqtt】
    • 组件【mongodb】
    • 组件【mybatis-plus】
    • 组件【nacos】
    • 组件【netty】
    • 组件【openapi-doc】
    • 组件【openfeign】
    • 组件【oss】
    • 组件【prometheus】
    • 组件【rabbitmq】
    • 组件【rate-limiter】
    • 组件【reactor】
    • 组件【redis】
    • 组件【rocketmq】
    • 组件【ruleengine】
    • 组件【secret】
    • 组件【security】
    • 组件【sensitive】
    • 组件【sentinel】
    • 组件【sms】
    • 组件【snail-job】
    • 组件【spark】
    • 组件【starrocks】
    • 组件【statemachine】
    • 组件【storage】
    • 组件【tdengine】
    • 组件【tenant】
    • 组件【test】
    • 组件【trace】
    • 组件【xss】
    • 组件【shardingsphere】
  • 环境搭建

    • Centos7安装Mysql 8.0.33
    • Centos7安装Redis 7.0.11
    • Centos7安装RocketMQ 5.1.1
    • Centos7安装Jdk 17.0.7
    • Centos7安装Docker 23.0.6
    • Centos7安装Elasticsearch 8.6.2
    • Docker安装RabbitMQ 3.12.2
    • Docker安装Postgresql 16.1
    • Ubuntu20.04安装Docker
    • Ubuntu20.04忘记密码或指纹错误
  • 常用命令

    • Centos7常用命令
    • Centos7常用命令
  • 快速上手

    • 项目启动【dev环境】
    • 项目启动【test环境】
    • 项目启动【prod环境】
    • 更新日志
  • 前端指南

    • 前端启动
  • 后端指南

    • COLA代码规范
    • SSL证书
    • 一键修改项目模块
    • 一键生成项目骨架
    • 一键修改项目版本号
    • 一键跳过测试用例
    • 一键生成后端COLA代码
    • 分布式链路跟踪之ELK日志
    • 一键检查代码规范
    • 动态路由
  • 项目部署

    • 项目部署之镜像打包与推送
  • 其他

    • Java如何快速转Go
    • Go快速开发API
    • Vue快速开发Api
    • React快速开发Api
  • 文章

    • 物联网之对接MQTT最佳实践
    • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
    • vue3+lime-echart各种图表使用【懒人专用,建议收藏】
    • 物联网之使用Vertx实现TCP最佳实践【响应式】
    • 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】
    • 物联网之使用Vertx实现UDP最佳实践【响应式】
  • 儒学

    • 儒学摘抄(一)
  • 禅语

    • 禅语摘抄(一)
  • 诗词

    • 诗词摘抄(一)
  • 道法

    • 道法摘抄(一)
  • 养生

    • 养生摘抄(一)
  • 读后感

    • 读《强者,都是含泪奔跑的人》读后感
  • 修行

    • 修身/养生/情感
  • 觉悟

    • 觉悟日记(一)
赞助
项目课程 (opens new window)
GitHub (opens new window)
  • 开发手册

    • 组件【ai】
    • 组件【algorithm】
    • 组件【banner】
    • 组件【clickhouse】
    • 组件【core】
    • 组件【bom】
    • 组件【cors】
    • 组件【domain】
    • 组件【crypto】
    • 组件【data-cache】
    • 组件【dubbo】
    • 组件【elasticsearch】
    • 组件【excel】
    • 组件【extension】
    • 组件【flink】
    • 组件【grpc】
    • 组件【i18n】
    • 组件【idempotent】
    • 组件【influxdb】
    • 组件【kafka】
    • 组件【log】
    • 组件【lock】
    • 组件【mail】
    • 组件【log4j2】
    • 组件【mqtt】
    • 组件【mongodb】
    • 组件【mybatis-plus】
    • 组件【nacos】
    • 组件【netty】
    • 组件【openapi-doc】
    • 组件【openfeign】
    • 组件【oss】
    • 组件【prometheus】
    • 组件【rabbitmq】
    • 组件【rate-limiter】
    • 组件【reactor】
    • 组件【redis】
    • 组件【rocketmq】
    • 组件【ruleengine】
    • 组件【secret】
    • 组件【security】
    • 组件【sensitive】
    • 组件【sentinel】
    • 组件【sms】
    • 组件【snail-job】
    • 组件【spark】
    • 组件【starrocks】
    • 组件【statemachine】
    • 组件【storage】
    • 组件【tdengine】
    • 组件【tenant】
    • 组件【test】
    • 组件【trace】
    • 组件【xss】
    • 组件【shardingsphere】
  • 环境搭建

    • Centos7安装Mysql 8.0.33
    • Centos7安装Redis 7.0.11
    • Centos7安装RocketMQ 5.1.1
    • Centos7安装Jdk 17.0.7
    • Centos7安装Docker 23.0.6
    • Docker安装RabbitMQ 3.12.2
    • Centos7安装Elasticsearch 8.6.2
    • Docker安装Postgresql 16.1
    • Ubuntu20.04安装Docker
    • Ubuntu20.04忘记密码或指纹错误
  • 常用命令

    • Centos7常用命令
    • Docker常用命令
  • 快速上手

    • 项目启动【dev环境】
    • 项目启动【test环境】
    • 项目启动【prod环境】
    • 更新日志
  • 前端指南

    • 前端启动
  • 后端指南

    • COLA代码规范
    • SSL证书
    • 一键修改项目模块
    • 一键生成项目骨架
    • 一键修改项目版本号
    • 一键跳过测试用例
    • 一键生成后端COLA代码
    • 分布式链路跟踪之ELK日志
    • 一键检查代码规范
    • 动态路由
    • OAuth2.1流程
  • 项目部署

    • 项目部署之镜像打包与推送
  • 其他

    • Java如何快速转Go
    • Go快速开发API
    • Vue快速开发Api
    • React快速开发Api
  • 文章

    • 物联网之对接MQTT最佳实践
    • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
    • vue3+lime-echart各种图表使用【懒人专用,建议收藏】
    • 物联网之使用Vertx实现TCP最佳实践【响应式】
    • 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】
    • 物联网之使用Vertx实现UDP最佳实践【响应式】
    • 指南
    • 文章
    KCloud-Platform-IoT
    2025-05-21
    目录

    物联网之使用Vertx实现UDP最佳实践【响应式】

    你好呀,我的老朋友!我是老寇,欢迎来到老寇云平台!

    跟我一起学习使用Vertx实现UDP-Server

    # 实现UDP【响应式】

    Vertx-Core地址 (opens new window)

    # 注意

    UDP是无连接的传输,这意味着您与远程客户端没有建立持续的连接。

    所以,您发送和接收的数据包都要包含有远程的地址。

    除此之外,UDP不像TCP的使用那样安全, 这也就意味着不能保证发送的数据包一定会被对应的接收端(Endpoint)接收。【传输数据时不建立连接,因此可能导致数据包丢失】

    UDP最适合一些允许丢弃数据包的应用(如监视应用程序,视频直播)。

    # 实现过程

    查看源码 (opens new window)

    # 代码比较简单,懒得讲解啦
    # 代码比较简单,懒得讲解啦
    # 代码比较简单,懒得讲解啦
    # 服务端
    # 引入依赖
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-core</artifactId>
      <version>5.0.0</version>
    </dependency>
    

    UdpServerProperties

    /**
     * @author laokou
     */
    @Data
    @Component
    @ConfigurationProperties(prefix = "spring.udp-server")
    public class UdpServerProperties {
    
        private String host = "0.0.0.0";
    
        private Set<Integer> ports = new HashSet<>(0);
    
        private boolean broadcast = false;
    
        private boolean loopbackModeDisabled = true;
    
        private String multicastNetworkInterface = null;
    
        private boolean ipV6 = false;
    
    }
    

    VertxUdpServer

    /**
     * @author laokou
     */
    @Slf4j
    public final class VertxUdpServer extends AbstractVerticle {
    
        private volatile Flux<DatagramSocket> datagramSocket;
    
        private final UdpServerProperties udpServerProperties;
    
        private boolean isClosed = false;
    
        VertxUdpServer(Vertx vertx, UdpServerProperties udpServerProperties) {
           this.udpServerProperties = udpServerProperties;
           this.vertx = vertx;
        }
    
        @Override
        public synchronized void start() {
           datagramSocket = Flux.fromIterable(udpServerProperties.getPorts()).map(port -> {
              DatagramSocket datagramSocket = vertx.createDatagramSocket(getDatagramSocketOption())
                 .handler(packet -> log.info("【Vertx-UDP-Server】 => 收到数据包:{}", packet.data()));
              datagramSocket.listen(port, udpServerProperties.getHost()).onComplete(result -> {
                 if (isClosed) {
                    return;
                 }
                 if (result.succeeded()) {
                    log.info("【Vertx-UDP-Server】 => UDP服务启动成功,端口:{}", port);
                 }
                 else {
                    Throwable ex = result.cause();
                    log.error("【Vertx-UDP-Server】 => UDP服务启动失败,错误信息:{}", ex.getMessage(), ex);
                 }
              });
              return datagramSocket;
           });
           datagramSocket.subscribeOn(Schedulers.boundedElastic()).subscribe();
        }
    
        @Override
        public synchronized void stop() {
           isClosed = true;
           datagramSocket.doOnNext(socket -> socket.close().onComplete(result -> {
              if (result.succeeded()) {
                 log.info("【Vertx-UDP-Server】 => UDP服务停止成功");
              }
              else {
                 Throwable ex = result.cause();
                 log.error("【Vertx-UDP-Server】 => UDP服务停止失败,错误信息:{}", ex.getMessage(), ex);
              }
           })).subscribeOn(Schedulers.boundedElastic()).subscribe();
        }
    
        public void deploy() {
           // 部署服务
           vertx.deployVerticle(this);
           // 停止服务
           Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
        }
    
        private DatagramSocketOptions getDatagramSocketOption() {
           DatagramSocketOptions datagramSocketOptions = new DatagramSocketOptions();
           datagramSocketOptions.setBroadcast(udpServerProperties.isBroadcast());
           datagramSocketOptions.setLoopbackModeDisabled(udpServerProperties.isLoopbackModeDisabled());
           datagramSocketOptions.setMulticastNetworkInterface(udpServerProperties.getMulticastNetworkInterface());
           datagramSocketOptions.setIpV6(udpServerProperties.isIpV6());
           return datagramSocketOptions;
        }
    
    }
    

    VertxUdpServerManager

    /**
     * @author laokou
     */
    public final class VertxUdpServerManager {
    
        private VertxUdpServerManager() {
        }
    
        public static void deploy(final Vertx vertx, final UdpServerProperties properties) {
           new VertxUdpServer(vertx, properties).deploy();
        }
    
    }
    
    # 客户端【测试】
    /**
     * @author laokou
     */
    @Slf4j
    @SpringBootTest
    @RequiredArgsConstructor
    @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
    class UdpTest {
    
        private final Vertx vertx;
    
        @Test
        void test() throws InterruptedException {
           for (int i = 4880; i < 5000; i++) {
              DatagramSocket datagramSocket = vertx.createDatagramSocket();
              int finalI = i;
              datagramSocket.send("Hello Vertx", i, "127.0.0.1").onComplete(result -> {
                 if (result.succeeded()) {
                    log.info("【Vertx-UDP-Client】 => 发送成功,端口:{}", finalI);
                 }
                 else {
                    Throwable ex = result.cause();
                    log.error("【Vertx-UDP-Client】 => 发送失败,端口:{},错误信息:{}", finalI, ex.getMessage(), ex);
                 }
              });
              Thread.sleep(2000);
              Assertions.assertDoesNotThrow(datagramSocket::close);
           }
        }
    
    }
    

    这可以满足基本的协议开发,自行修改即可!!!

    我是老寇,我们下次再见啦!

    上次更新: 5/21/2025, 5:17:33 PM
    物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】

    ← 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】

    Theme by Vdoing | Copyright © 2022-2025 laokou | Apache 2.0 License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式