物联网之使用Vertx实现HTTP/WebSocket最佳实践【响应式】
你好呀,我的老朋友!我是老寇,欢迎来到老寇云平台!
跟我一起学习使用Vertx实现HTTP-Server和WebSocket-Server
# 实现Http/WebSocket【响应式】
Vertx-Web地址:https://vertx.io/docs/vertx-web/java/
# 实现过程
# 代码比较简单,懒得讲解啦
# 代码比较简单,懒得讲解啦
# 代码比较简单,懒得讲解啦
# http/websocket【响应式】
<!-- Vert.x Web, version 5.0.0. -->
<!-- NOTE(review): the Java classes below also use Spring Boot, Lombok and Reactor types; -->
<!-- those dependencies are not listed in this snippet and presumably come from the surrounding project - confirm. -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
<version>5.0.0</version>
</dependency>
HttpServerProperties
/**
 * Configuration properties for the Vert.x HTTP/WebSocket server, bound from the
 * {@code spring.http-server} prefix.
 *
 * <p>Each field initializer is the value used when the corresponding property is not
 * configured. {@code VertxHttpServer} copies most of these values onto a Vert.x
 * {@code HttpServerOptions} instance, one instance per configured port.
 *
 * @author laokou
 */
@Data
@Component
@ConfigurationProperties(prefix = "spring.http-server")
public class HttpServerProperties {
// NOTE(review): presumably toggles request authentication; no code visible in this
// article reads this flag - confirm where it is consumed.
private boolean auth = true;
// Host the servers bind to.
private String host = "0.0.0.0";
// Ports to listen on; empty by default, so no server is started unless ports are configured.
private Set<Integer> ports = new HashSet<>(0);
// --- HTTP compression ---
private boolean compressionSupported = false;
private int compressionLevel = 6;
// --- WebSocket size limits (65536 and 65536 * 4; presumably bytes - TODO confirm) ---
private int maxWebSocketFrameSize = 65536;
private int maxWebSocketMessageSize = 65536 * 4;
private boolean handle100ContinueAutomatically = false;
// --- HTTP parsing and form limits ---
private int maxChunkSize = 8192;
private int maxInitialLineLength = 4096;
private int maxHeaderSize = 8192;
private int maxFormAttributeSize = 8192;
private int maxFormFields = 512;
private int maxFormBufferedBytes = 2048;
// --- HTTP/2 ---
// Initial HTTP/2 settings; only the max-concurrent-streams value is set explicitly here.
private Http2Settings initialSettings = new Http2Settings()
.setMaxConcurrentStreams(DEFAULT_INITIAL_SETTINGS_MAX_CONCURRENT_STREAMS);
// Mutable copy of the default ALPN version list.
private List<HttpVersion> alpnVersions = new ArrayList<>(DEFAULT_ALPN_VERSIONS);
private boolean http2ClearTextEnabled = true;
// NOTE(review): -1 presumably means "use the Vert.x default window size" - confirm.
private int http2ConnectionWindowSize = -1;
private boolean decompressionSupported = false;
// NOTE(review): verify that this flag is actually forwarded to the server options.
private boolean acceptUnmaskedFrames = false;
private int decoderInitialBufferSize = 256;
// --- WebSocket compression and lifecycle ---
private boolean perFrameWebSocketCompressionSupported = true;
private boolean perMessageWebSocketCompressionSupported = true;
private int webSocketCompressionLevel = 6;
private boolean webSocketAllowServerNoContext = false;
private boolean webSocketPreferredClientNoContext = false;
// NOTE(review): unit is presumably seconds - confirm against the Vert.x documentation.
private int webSocketClosingTimeout = 30;
private TracingPolicy tracingPolicy = TracingPolicy.ALWAYS;
private boolean registerWebSocketWriteHandlers = false;
// --- HTTP/2 RST flood protection: 400 RST frames per window ---
private int http2RstFloodMaxRstFramePerWindow = 400;
// Window length; its unit is given by http2RstFloodWindowDurationTimeUnit (seconds by default).
private int http2RstFloodWindowDuration = 60;
private TimeUnit http2RstFloodWindowDurationTimeUnit = TimeUnit.SECONDS;
}
VertxHttpServer
/**
 * Verticle that starts one Vert.x HTTP server per configured port. Each server serves
 * plain HTTP requests through the router and accepts WebSocket connections.
 *
 * <p>The servers are produced by a Reactor pipeline. The pipeline is cached so that
 * {@link #stop()} receives the very instances that {@link #start()} created; without
 * the cache, subscribing again would create and bind a fresh set of servers and close
 * those instead of the running ones.
 *
 * @author laokou
 */
@Slf4j
final class VertxHttpServer extends AbstractVerticle {

    private final HttpServerProperties properties;

    private final Vertx vertx;

    private final Router router;

    /** Cached pipeline of the servers created by {@link #start()}; {@code null} until then. */
    private volatile Flux<HttpServer> httpServer;

    /**
     * Set by {@link #stop()}. Volatile because the listen callback reads it without
     * holding this object's monitor.
     */
    private volatile boolean isClosed = false;

    /**
     * Creates the verticle and builds its router.
     * @param vertx the Vert.x instance used to create the router and the servers
     * @param properties the server configuration
     */
    VertxHttpServer(Vertx vertx, HttpServerProperties properties) {
        this.vertx = vertx;
        this.properties = properties;
        this.router = getRouter();
    }

    /**
     * Creates a server for every configured port, wires the handlers and starts
     * listening. The pipeline is subscribed on the bounded-elastic scheduler.
     */
    @Override
    public synchronized void start() {
        httpServer = getHttpServerOptions().map(vertx::createHttpServer)
            .doOnNext(this::bindAndListen)
            // Replay the created servers to later subscribers (stop()) instead of
            // re-running server creation and listen() for each subscription.
            .cache();
        httpServer.subscribeOn(Schedulers.boundedElastic()).subscribe();
    }

    /**
     * Closes every server created by {@link #start()}. Does nothing beyond marking the
     * verticle as closed when {@link #start()} has not run yet.
     */
    @Override
    public synchronized void stop() {
        isClosed = true;
        Flux<HttpServer> servers = httpServer;
        if (servers == null) {
            // stop() can be reached (e.g. from the shutdown hook) before the
            // asynchronous deployment has invoked start(); there is nothing to close.
            return;
        }
        servers.doOnNext(server -> server.close().onComplete(result -> {
            if (result.succeeded()) {
                log.info("【Vertx-HTTP-Server】 => HTTP服务停止成功,端口:{}", server.actualPort());
            }
            else {
                Throwable ex = result.cause();
                log.error("【Vertx-HTTP-Server】 => HTTP服务停止失败,错误信息:{}", ex.getMessage(), ex);
            }
        })).subscribeOn(Schedulers.boundedElastic()).subscribe();
    }

    /**
     * Deploys this verticle and registers a JVM shutdown hook that calls {@link #stop()}.
     */
    public void deploy() {
        // Deploy the verticle (Vert.x invokes start() asynchronously).
        vertx.deployVerticle(this);
        // Stop the servers when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
    }

    /**
     * Installs the WebSocket and HTTP handlers on the server and starts listening.
     * @param server the freshly created server
     */
    private void bindAndListen(HttpServer server) {
        server.webSocketHandler(serverWebSocket -> {
            // Only the property-report path is accepted; close any other connection.
            if (!RegexUtils.matches(WebsocketMessageEnum.UP_PROPERTY_REPORT.getPath(), serverWebSocket.path())) {
                serverWebSocket.close();
                return;
            }
            serverWebSocket.textMessageHandler(message -> log.info("【Vertx-WebSocket-Server】 => 收到消息:{}", message))
                .closeHandler(v -> log.error("【Vertx-WebSocket-Server】 => 断开连接"))
                .exceptionHandler(err -> log.error("【Vertx-WebSocket-Server】 => 错误信息:{}", err.getMessage(), err))
                .endHandler(v -> log.error("【Vertx-WebSocket-Server】 => 结束"));
        }).requestHandler(router).listen().onComplete(result -> {
            // Skip start-up logging once stop() has been requested.
            if (isClosed) {
                return;
            }
            if (result.succeeded()) {
                log.info("【Vertx-HTTP-Server】 => HTTP服务启动成功,端口:{}", server.actualPort());
            }
            else {
                Throwable ex = result.cause();
                log.error("【Vertx-HTTP-Server】 => HTTP服务启动失败,错误信息:{}", ex.getMessage(), ex);
            }
        });
    }

    /**
     * Builds the router: a body handler for all routes plus the POST property-report
     * route, which logs the ids and body and answers with an empty response.
     * @return the configured router
     */
    private Router getRouter() {
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.post(HttpMessageEnum.UP_PROPERTY_REPORT.getRouter()).handler(ctx -> {
            String body = ctx.body().asString();
            Long deviceId;
            Long productId;
            try {
                deviceId = Long.valueOf(ctx.pathParam("deviceId"));
                productId = Long.valueOf(ctx.pathParam("productId"));
            }
            catch (NumberFormatException ex) {
                // Path parameters come from the client: a missing or non-numeric id is
                // a bad request, not a server error.
                log.warn("【Vertx-HTTP-Server】 => 路径参数不合法,deviceId:{},productId:{}", ctx.pathParam("deviceId"),
                        ctx.pathParam("productId"));
                ctx.response().setStatusCode(400).end();
                return;
            }
            log.info("productId:{},deviceId:{},body:{}", productId, deviceId, body);
            ctx.response().end();
        });
        return router;
    }

    /**
     * Emits one options object per configured port.
     * @return the server options, one element per port
     */
    private Flux<HttpServerOptions> getHttpServerOptions() {
        return Flux.fromIterable(properties.getPorts()).map(this::getHttpServerOption);
    }

    /**
     * Copies the configured properties onto a new options object for the given port.
     * @param port the port the server should listen on
     * @return the populated server options
     */
    private HttpServerOptions getHttpServerOption(int port) {
        HttpServerOptions options = new HttpServerOptions();
        options.setHost(properties.getHost());
        options.setPort(port);
        options.setCompressionSupported(properties.isCompressionSupported());
        options.setDecompressionSupported(properties.isDecompressionSupported());
        options.setCompressionLevel(properties.getCompressionLevel());
        options.setMaxWebSocketFrameSize(properties.getMaxWebSocketFrameSize());
        options.setMaxWebSocketMessageSize(properties.getMaxWebSocketMessageSize());
        options.setHandle100ContinueAutomatically(properties.isHandle100ContinueAutomatically());
        options.setMaxChunkSize(properties.getMaxChunkSize());
        options.setMaxInitialLineLength(properties.getMaxInitialLineLength());
        options.setMaxHeaderSize(properties.getMaxHeaderSize());
        options.setMaxFormAttributeSize(properties.getMaxFormAttributeSize());
        options.setMaxFormFields(properties.getMaxFormFields());
        options.setMaxFormBufferedBytes(properties.getMaxFormBufferedBytes());
        options.setInitialSettings(properties.getInitialSettings());
        options.setAlpnVersions(properties.getAlpnVersions());
        options.setHttp2ClearTextEnabled(properties.isHttp2ClearTextEnabled());
        options.setHttp2ConnectionWindowSize(properties.getHttp2ConnectionWindowSize());
        // Previously declared in the properties but never applied to the options.
        options.setAcceptUnmaskedFrames(properties.isAcceptUnmaskedFrames());
        options.setDecoderInitialBufferSize(properties.getDecoderInitialBufferSize());
        options.setPerFrameWebSocketCompressionSupported(properties.isPerFrameWebSocketCompressionSupported());
        options.setPerMessageWebSocketCompressionSupported(properties.isPerMessageWebSocketCompressionSupported());
        options.setWebSocketCompressionLevel(properties.getWebSocketCompressionLevel());
        options.setWebSocketAllowServerNoContext(properties.isWebSocketAllowServerNoContext());
        options.setWebSocketPreferredClientNoContext(properties.isWebSocketPreferredClientNoContext());
        options.setWebSocketClosingTimeout(properties.getWebSocketClosingTimeout());
        options.setTracingPolicy(properties.getTracingPolicy());
        options.setRegisterWebSocketWriteHandlers(properties.isRegisterWebSocketWriteHandlers());
        options.setHttp2RstFloodMaxRstFramePerWindow(properties.getHttp2RstFloodMaxRstFramePerWindow());
        options.setHttp2RstFloodWindowDuration(properties.getHttp2RstFloodWindowDuration());
        options.setHttp2RstFloodWindowDurationTimeUnit(properties.getHttp2RstFloodWindowDurationTimeUnit());
        return options;
    }

}
这只是一个 demo;在实际项目中,需要对 HTTP 请求进行鉴权,推荐使用 OAuth2。
我是老寇,我们下次再见啦!
上次更新: 5/21/2025, 5:17:33 PM