KCloud-Platform-IoT KCloud-Platform-IoT
首页
  • 开发手册

    • 组件【ai】
    • 组件【algorithm】
    • 组件【banner】
    • 组件【clickhouse】
    • 组件【core】
    • 组件【bom】
    • 组件【cors】
    • 组件【domain】
    • 组件【crypto】
    • 组件【data-cache】
    • 组件【dubbo】
    • 组件【elasticsearch】
    • 组件【excel】
    • 组件【extension】
    • 组件【flink】
    • 组件【grpc】
    • 组件【i18n】
    • 组件【idempotent】
    • 组件【influxdb】
    • 组件【kafka】
    • 组件【log】
    • 组件【lock】
    • 组件【mail】
    • 组件【log4j2】
    • 组件【mqtt】
    • 组件【mongodb】
    • 组件【mybatis-plus】
    • 组件【nacos】
    • 组件【netty】
    • 组件【openapi-doc】
    • 组件【openfeign】
    • 组件【oss】
    • 组件【prometheus】
    • 组件【rabbitmq】
    • 组件【rate-limiter】
    • 组件【reactor】
    • 组件【redis】
    • 组件【rocketmq】
    • 组件【ruleengine】
    • 组件【secret】
    • 组件【security】
    • 组件【sensitive】
    • 组件【sentinel】
    • 组件【sms】
    • 组件【snail-job】
    • 组件【spark】
    • 组件【starrocks】
    • 组件【statemachine】
    • 组件【storage】
    • 组件【tdengine】
    • 组件【tenant】
    • 组件【test】
    • 组件【trace】
    • 组件【xss】
    • 组件【shardingsphere】
  • 环境搭建

    • Centos7安装Mysql 8.0.33
    • Centos7安装Redis 7.0.11
    • Centos7安装RocketMQ 5.1.1
    • Centos7安装Jdk 17.0.7
    • Centos7安装Docker 23.0.6
    • Centos7安装Elasticsearch 8.6.2
    • Docker安装RabbitMQ 3.12.2
    • Docker安装Postgresql 16.1
    • Ubuntu20.04安装Docker
    • Ubuntu20.04忘记密码或指纹错误
  • 常用命令

    • Centos7常用命令
    • Centos7常用命令
  • 快速上手

    • 项目启动【dev环境】
    • 项目启动【test环境】
    • 项目启动【prod环境】
    • 更新日志
  • 前端指南

    • 前端启动
  • 后端指南

    • COLA代码规范
    • SSL证书
    • 一键修改项目模块
    • 一键生成项目骨架
    • 一键修改项目版本号
    • 一键跳过测试用例
    • 一键生成后端COLA代码
    • 分布式链路跟踪之ELK日志
    • 一键检查代码规范
    • 动态路由
  • 项目部署

    • 项目部署之镜像打包与推送
  • 其他

    • Java如何快速转Go
    • Go快速开发API
    • Vue快速开发Api
    • React快速开发Api
  • 文章

    • 物联网之对接MQTT最佳实践
    • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
    • vue3+lime-echart各种图表使用【懒人专用,建议收藏】
    • 物联网之使用Vertx实现TCP最佳实践【响应式】
    • 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】
    • 物联网之使用Vertx实现UDP最佳实践【响应式】
  • 儒学

    • 儒学摘抄(一)
  • 禅语

    • 禅语摘抄(一)
  • 诗词

    • 诗词摘抄(一)
  • 道法

    • 道法摘抄(一)
  • 养生

    • 养生摘抄(一)
  • 读后感

    • 读《强者,都是含泪奔跑的人》读后感
  • 修行

    • 修身/养生/情感
  • 觉悟

    • 觉悟日记(一)
赞助
项目课程 (opens new window)
GitHub (opens new window)
首页
  • 开发手册

    • 组件【ai】
    • 组件【algorithm】
    • 组件【banner】
    • 组件【clickhouse】
    • 组件【core】
    • 组件【bom】
    • 组件【cors】
    • 组件【domain】
    • 组件【crypto】
    • 组件【data-cache】
    • 组件【dubbo】
    • 组件【elasticsearch】
    • 组件【excel】
    • 组件【extension】
    • 组件【flink】
    • 组件【grpc】
    • 组件【i18n】
    • 组件【idempotent】
    • 组件【influxdb】
    • 组件【kafka】
    • 组件【log】
    • 组件【lock】
    • 组件【mail】
    • 组件【log4j2】
    • 组件【mqtt】
    • 组件【mongodb】
    • 组件【mybatis-plus】
    • 组件【nacos】
    • 组件【netty】
    • 组件【openapi-doc】
    • 组件【openfeign】
    • 组件【oss】
    • 组件【prometheus】
    • 组件【rabbitmq】
    • 组件【rate-limiter】
    • 组件【reactor】
    • 组件【redis】
    • 组件【rocketmq】
    • 组件【ruleengine】
    • 组件【secret】
    • 组件【security】
    • 组件【sensitive】
    • 组件【sentinel】
    • 组件【sms】
    • 组件【snail-job】
    • 组件【spark】
    • 组件【starrocks】
    • 组件【statemachine】
    • 组件【storage】
    • 组件【tdengine】
    • 组件【tenant】
    • 组件【test】
    • 组件【trace】
    • 组件【xss】
    • 组件【shardingsphere】
  • 环境搭建

    • Centos7安装Mysql 8.0.33
    • Centos7安装Redis 7.0.11
    • Centos7安装RocketMQ 5.1.1
    • Centos7安装Jdk 17.0.7
    • Centos7安装Docker 23.0.6
    • Centos7安装Elasticsearch 8.6.2
    • Docker安装RabbitMQ 3.12.2
    • Docker安装Postgresql 16.1
    • Ubuntu20.04安装Docker
    • Ubuntu20.04忘记密码或指纹错误
  • 常用命令

    • Centos7常用命令
    • Centos7常用命令
  • 快速上手

    • 项目启动【dev环境】
    • 项目启动【test环境】
    • 项目启动【prod环境】
    • 更新日志
  • 前端指南

    • 前端启动
  • 后端指南

    • COLA代码规范
    • SSL证书
    • 一键修改项目模块
    • 一键生成项目骨架
    • 一键修改项目版本号
    • 一键跳过测试用例
    • 一键生成后端COLA代码
    • 分布式链路跟踪之ELK日志
    • 一键检查代码规范
    • 动态路由
  • 项目部署

    • 项目部署之镜像打包与推送
  • 其他

    • Java如何快速转Go
    • Go快速开发API
    • Vue快速开发Api
    • React快速开发Api
  • 文章

    • 物联网之对接MQTT最佳实践
    • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
    • vue3+lime-echart各种图表使用【懒人专用,建议收藏】
    • 物联网之使用Vertx实现TCP最佳实践【响应式】
    • 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】
    • 物联网之使用Vertx实现UDP最佳实践【响应式】
  • 儒学

    • 儒学摘抄(一)
  • 禅语

    • 禅语摘抄(一)
  • 诗词

    • 诗词摘抄(一)
  • 道法

    • 道法摘抄(一)
  • 养生

    • 养生摘抄(一)
  • 读后感

    • 读《强者,都是含泪奔跑的人》读后感
  • 修行

    • 修身/养生/情感
  • 觉悟

    • 觉悟日记(一)
赞助
项目课程 (opens new window)
GitHub (opens new window)
  • 开发手册

    • 组件【ai】
    • 组件【algorithm】
    • 组件【banner】
    • 组件【clickhouse】
    • 组件【core】
    • 组件【bom】
    • 组件【cors】
    • 组件【domain】
    • 组件【crypto】
    • 组件【data-cache】
    • 组件【dubbo】
    • 组件【elasticsearch】
    • 组件【excel】
    • 组件【extension】
    • 组件【flink】
    • 组件【grpc】
    • 组件【i18n】
    • 组件【idempotent】
    • 组件【influxdb】
    • 组件【kafka】
    • 组件【log】
    • 组件【lock】
    • 组件【mail】
    • 组件【log4j2】
    • 组件【mqtt】
    • 组件【mongodb】
    • 组件【mybatis-plus】
    • 组件【nacos】
    • 组件【netty】
    • 组件【openapi-doc】
    • 组件【openfeign】
    • 组件【oss】
    • 组件【prometheus】
    • 组件【rabbitmq】
    • 组件【rate-limiter】
    • 组件【reactor】
    • 组件【redis】
    • 组件【rocketmq】
    • 组件【ruleengine】
    • 组件【secret】
    • 组件【security】
    • 组件【sensitive】
    • 组件【sentinel】
    • 组件【sms】
    • 组件【snail-job】
    • 组件【spark】
    • 组件【starrocks】
    • 组件【statemachine】
    • 组件【storage】
    • 组件【tdengine】
    • 组件【tenant】
    • 组件【test】
    • 组件【trace】
    • 组件【xss】
    • 组件【shardingsphere】
  • 环境搭建

    • Centos7安装Mysql 8.0.33
    • Centos7安装Redis 7.0.11
    • Centos7安装RocketMQ 5.1.1
    • Centos7安装Jdk 17.0.7
    • Centos7安装Docker 23.0.6
    • Docker安装RabbitMQ 3.12.2
    • Centos7安装Elasticsearch 8.6.2
    • Docker安装Postgresql 16.1
    • Ubuntu20.04安装Docker
    • Ubuntu20.04忘记密码或指纹错误
  • 常用命令

    • Centos7常用命令
    • Docker常用命令
  • 快速上手

    • 项目启动【dev环境】
    • 项目启动【test环境】
    • 项目启动【prod环境】
    • 更新日志
  • 前端指南

    • 前端启动
  • 后端指南

    • COLA代码规范
    • SSL证书
    • 一键修改项目模块
    • 一键生成项目骨架
    • 一键修改项目版本号
    • 一键跳过测试用例
    • 一键生成后端COLA代码
    • 分布式链路跟踪之ELK日志
    • 一键检查代码规范
    • 动态路由
    • OAuth2.1流程
  • 项目部署

    • 项目部署之镜像打包与推送
  • 其他

    • Java如何快速转Go
    • Go快速开发API
    • Vue快速开发Api
    • React快速开发Api
  • 文章

    • 物联网之对接MQTT最佳实践
    • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
    • vue3+lime-echart各种图表使用【懒人专用,建议收藏】
    • 物联网之使用Vertx实现TCP最佳实践【响应式】
    • 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】
      • 物联网之使用Vertx实现UDP最佳实践【响应式】
    • 指南
    • 文章
    KCloud-Platform-IoT
    2025-05-19
    目录

    物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】

    你好呀,我的老朋友!我是老寇,欢迎来到老寇云平台!

    跟我一起学习使用Vertx实现HTTP-Server和WebSocket-Server

    # 实现Http/WebSocket【响应式】

    Vertx-Web地址 (opens new window)

    # 实现过程

    查看源码 (opens new window)

    # 代码比较简单,懒得讲解啦
    # 代码比较简单,懒得讲解啦
    # 代码比较简单,懒得讲解啦
    # http/websocket【响应式】
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-web</artifactId>
      <version>5.0.0</version>
    </dependency>
    

    HttpServerProperties

    /**
     * @author laokou
     */
    @Data
    @Component
    @ConfigurationProperties(prefix = "spring.http-server")
    public class HttpServerProperties {
    
        private boolean auth = true;
    
        private String host = "0.0.0.0";
    
        private Set<Integer> ports = new HashSet<>(0);
    
        private boolean compressionSupported = false;
    
        private int compressionLevel = 6;
    
        private int maxWebSocketFrameSize = 65536;
    
        private int maxWebSocketMessageSize = 65536 * 4;
    
        private boolean handle100ContinueAutomatically = false;
    
        private int maxChunkSize = 8192;
    
        private int maxInitialLineLength = 4096;
    
        private int maxHeaderSize = 8192;
    
        private int maxFormAttributeSize = 8192;
    
        private int maxFormFields = 512;
    
        private int maxFormBufferedBytes = 2048;
    
        private Http2Settings initialSettings = new Http2Settings()
           .setMaxConcurrentStreams(DEFAULT_INITIAL_SETTINGS_MAX_CONCURRENT_STREAMS);
    
        private List<HttpVersion> alpnVersions = new ArrayList<>(DEFAULT_ALPN_VERSIONS);
    
        private boolean http2ClearTextEnabled = true;
    
        private int http2ConnectionWindowSize = -1;
    
        private boolean decompressionSupported = false;
    
        private boolean acceptUnmaskedFrames = false;
    
        private int decoderInitialBufferSize = 256;
    
        private boolean perFrameWebSocketCompressionSupported = true;
    
        private boolean perMessageWebSocketCompressionSupported = true;
    
        private int webSocketCompressionLevel = 6;
    
        private boolean webSocketAllowServerNoContext = false;
    
        private boolean webSocketPreferredClientNoContext = false;
    
        private int webSocketClosingTimeout = 30;
    
        private TracingPolicy tracingPolicy = TracingPolicy.ALWAYS;
    
        private boolean registerWebSocketWriteHandlers = false;
    
        private int http2RstFloodMaxRstFramePerWindow = 400;
    
        private int http2RstFloodWindowDuration = 60;
    
        private TimeUnit http2RstFloodWindowDurationTimeUnit = TimeUnit.SECONDS;
    
    }
    

    VertxHttpServer

    /**
     * @author laokou
     */
    @Slf4j
    final class VertxHttpServer extends AbstractVerticle {
    
        private final HttpServerProperties properties;
    
        private final Vertx vertx;
    
        private final Router router;
    
        private volatile Flux<HttpServer> httpServer;
    
        private boolean isClosed = false;
    
        VertxHttpServer(Vertx vertx, HttpServerProperties properties) {
           this.vertx = vertx;
           this.properties = properties;
           this.router = getRouter();
        }
    
        @Override
        public synchronized void start() {
           httpServer = getHttpServerOptions().map(vertx::createHttpServer)
              .doOnNext(server -> server.webSocketHandler(serverWebSocket -> {
                 if (!RegexUtils.matches(WebsocketMessageEnum.UP_PROPERTY_REPORT.getPath(), serverWebSocket.path())) {
                    serverWebSocket.close();
                    return;
                 }
                 serverWebSocket.textMessageHandler(message -> log.info("【Vertx-WebSocket-Server】 => 收到消息:{}", message))
                    .closeHandler(v -> log.error("【Vertx-WebSocket-Server】 => 断开连接"))
                    .exceptionHandler(err -> log.error("【Vertx-WebSocket-Server】 => 错误信息:{}", err.getMessage(), err))
                    .endHandler(v -> log.error("【Vertx-WebSocket-Server】 => 结束"));
              }).requestHandler(router).listen().onComplete(completionHandler -> {
                 if (isClosed) {
                    return;
                 }
                 if (completionHandler.succeeded()) {
                    log.info("【Vertx-HTTP-Server】 => HTTP服务启动成功,端口:{}", server.actualPort());
                 }
                 else {
                    Throwable ex = completionHandler.cause();
                    log.error("【Vertx-HTTP-Server】 => HTTP服务启动失败,错误信息:{}", ex.getMessage(), ex);
                 }
              }));
           httpServer.subscribeOn(Schedulers.boundedElastic()).subscribe();
        }
    
        @Override
        public synchronized void stop() {
           isClosed = true;
           httpServer.doOnNext(server -> server.close().onComplete(result -> {
              if (result.succeeded()) {
                 log.info("【Vertx-HTTP-Server】 => HTTP服务停止成功,端口:{}", server.actualPort());
              }
              else {
                 Throwable ex = result.cause();
                 log.error("【Vertx-HTTP-Server】 => HTTP服务停止失败,错误信息:{}", ex.getMessage(), ex);
              }
           })).subscribeOn(Schedulers.boundedElastic()).subscribe();
        }
    
        public void deploy() {
           // 部署服务
           vertx.deployVerticle(this);
           // 停止服务
           Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
        }
    
        private Router getRouter() {
           Router router = Router.router(vertx);
           router.route().handler(BodyHandler.create());
           router.post(HttpMessageEnum.UP_PROPERTY_REPORT.getRouter()).handler(ctx -> {
              String body = ctx.body().asString();
              Long deviceId = Long.valueOf(ctx.pathParam("deviceId"));
              Long productId = Long.valueOf(ctx.pathParam("productId"));
              log.info("productId:{},deviceId:{},body:{}", productId, deviceId, body);
              ctx.response().end();
           });
           return router;
        }
    
        private Flux<HttpServerOptions> getHttpServerOptions() {
           return Flux.fromIterable(properties.getPorts()).map(this::getHttpServerOption);
        }
    
        private HttpServerOptions getHttpServerOption(int port) {
           HttpServerOptions options = new HttpServerOptions();
           options.setHost(properties.getHost());
           options.setPort(port);
           options.setCompressionSupported(properties.isCompressionSupported());
           options.setDecompressionSupported(properties.isDecompressionSupported());
           options.setCompressionLevel(properties.getCompressionLevel());
           options.setMaxWebSocketFrameSize(properties.getMaxWebSocketFrameSize());
           options.setMaxWebSocketMessageSize(properties.getMaxWebSocketMessageSize());
           options.setHandle100ContinueAutomatically(properties.isHandle100ContinueAutomatically());
           options.setMaxChunkSize(properties.getMaxChunkSize());
           options.setMaxInitialLineLength(properties.getMaxInitialLineLength());
           options.setMaxHeaderSize(properties.getMaxHeaderSize());
           options.setMaxFormAttributeSize(properties.getMaxFormAttributeSize());
           options.setMaxFormFields(properties.getMaxFormFields());
           options.setMaxFormBufferedBytes(properties.getMaxFormBufferedBytes());
           options.setInitialSettings(properties.getInitialSettings());
           options.setAlpnVersions(properties.getAlpnVersions());
           options.setHttp2ClearTextEnabled(properties.isHttp2ClearTextEnabled());
           options.setHttp2ConnectionWindowSize(properties.getHttp2ConnectionWindowSize());
           options.setDecoderInitialBufferSize(properties.getDecoderInitialBufferSize());
           options.setPerFrameWebSocketCompressionSupported(properties.isPerFrameWebSocketCompressionSupported());
           options.setPerMessageWebSocketCompressionSupported(properties.isPerMessageWebSocketCompressionSupported());
           options.setWebSocketCompressionLevel(properties.getWebSocketCompressionLevel());
           options.setWebSocketAllowServerNoContext(properties.isWebSocketAllowServerNoContext());
           options.setWebSocketPreferredClientNoContext(properties.isWebSocketPreferredClientNoContext());
           options.setWebSocketClosingTimeout(properties.getWebSocketClosingTimeout());
           options.setTracingPolicy(properties.getTracingPolicy());
           options.setRegisterWebSocketWriteHandlers(properties.isRegisterWebSocketWriteHandlers());
           options.setHttp2RstFloodMaxRstFramePerWindow(properties.getHttp2RstFloodMaxRstFramePerWindow());
           options.setHttp2RstFloodWindowDuration(properties.getHttp2RstFloodWindowDuration());
           options.setHttp2RstFloodWindowDurationTimeUnit(properties.getHttp2RstFloodWindowDurationTimeUnit());
           return options;
        }
    
    }
    

    这只是一个demo,实际情况,需要对http请求进行鉴权,推荐使用OAuth2

    我是老寇,我们下次再见啦!

    上次更新: 5/21/2025, 5:17:33 PM
    物联网之使用Vertx实现TCP最佳实践【响应式】
    物联网之使用Vertx实现UDP最佳实践【响应式】

    ← 物联网之使用Vertx实现TCP最佳实践【响应式】 物联网之使用Vertx实现UDP最佳实践【响应式】→

    Theme by Vdoing | Copyright © 2022-2025 laokou | Apache 2.0 License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式