物联网之使用Vertx实现UDP最佳实践【响应式】
你好呀,我的老朋友!我是老寇,欢迎来到老寇云平台!
跟我一起学习使用Vertx实现UDP-Server
# 实现UDP【响应式】
Vertx-Core地址 (opens new window)
# 注意
UDP是无连接的传输,这意味着您与远程客户端没有建立持续的连接。
所以,您发送和接收的数据包都要包含有远程的地址。
除此之外,UDP不像TCP的使用那样安全, 这也就意味着不能保证发送的数据包一定会被对应的接收端(Endpoint)接收。【传输数据时不建立连接,因此可能导致数据包丢失】
UDP最适合一些允许丢弃数据包的应用(如监视应用程序,视频直播)。
# 实现过程
# 代码比较简单,懒得讲解啦
# 代码比较简单,懒得讲解啦
# 代码比较简单,懒得讲解啦
# 服务端
# 引入依赖
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>5.0.0</version>
</dependency>
UdpServerProperties
/**
* @author laokou
*/
@Data
@Component
@ConfigurationProperties(prefix = "spring.udp-server")
public class UdpServerProperties {
private String host = "0.0.0.0";
private Set<Integer> ports = new HashSet<>(0);
private boolean broadcast = false;
private boolean loopbackModeDisabled = true;
private String multicastNetworkInterface = null;
private boolean ipV6 = false;
}
VertxUdpServer
/**
* @author laokou
*/
@Slf4j
public final class VertxUdpServer extends AbstractVerticle {
private volatile Flux<DatagramSocket> datagramSocket;
private final UdpServerProperties udpServerProperties;
private boolean isClosed = false;
VertxUdpServer(Vertx vertx, UdpServerProperties udpServerProperties) {
this.udpServerProperties = udpServerProperties;
this.vertx = vertx;
}
@Override
public synchronized void start() {
datagramSocket = Flux.fromIterable(udpServerProperties.getPorts()).map(port -> {
DatagramSocket datagramSocket = vertx.createDatagramSocket(getDatagramSocketOption())
.handler(packet -> log.info("【Vertx-UDP-Server】 => 收到数据包:{}", packet.data()));
datagramSocket.listen(port, udpServerProperties.getHost()).onComplete(result -> {
if (isClosed) {
return;
}
if (result.succeeded()) {
log.info("【Vertx-UDP-Server】 => UDP服务启动成功,端口:{}", port);
}
else {
Throwable ex = result.cause();
log.error("【Vertx-UDP-Server】 => UDP服务启动失败,错误信息:{}", ex.getMessage(), ex);
}
});
return datagramSocket;
});
datagramSocket.subscribeOn(Schedulers.boundedElastic()).subscribe();
}
@Override
public synchronized void stop() {
isClosed = true;
datagramSocket.doOnNext(socket -> socket.close().onComplete(result -> {
if (result.succeeded()) {
log.info("【Vertx-UDP-Server】 => UDP服务停止成功");
}
else {
Throwable ex = result.cause();
log.error("【Vertx-UDP-Server】 => UDP服务停止失败,错误信息:{}", ex.getMessage(), ex);
}
})).subscribeOn(Schedulers.boundedElastic()).subscribe();
}
public void deploy() {
// 部署服务
vertx.deployVerticle(this);
// 停止服务
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
}
private DatagramSocketOptions getDatagramSocketOption() {
DatagramSocketOptions datagramSocketOptions = new DatagramSocketOptions();
datagramSocketOptions.setBroadcast(udpServerProperties.isBroadcast());
datagramSocketOptions.setLoopbackModeDisabled(udpServerProperties.isLoopbackModeDisabled());
datagramSocketOptions.setMulticastNetworkInterface(udpServerProperties.getMulticastNetworkInterface());
datagramSocketOptions.setIpV6(udpServerProperties.isIpV6());
return datagramSocketOptions;
}
}
VertxUdpServerManager
/**
* @author laokou
*/
public final class VertxUdpServerManager {
private VertxUdpServerManager() {
}
public static void deploy(final Vertx vertx, final UdpServerProperties properties) {
new VertxUdpServer(vertx, properties).deploy();
}
}
# 客户端【测试】
/**
* @author laokou
*/
@Slf4j
@SpringBootTest
@RequiredArgsConstructor
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class UdpTest {
private final Vertx vertx;
@Test
void test() throws InterruptedException {
for (int i = 4880; i < 5000; i++) {
DatagramSocket datagramSocket = vertx.createDatagramSocket();
int finalI = i;
datagramSocket.send("Hello Vertx", i, "127.0.0.1").onComplete(result -> {
if (result.succeeded()) {
log.info("【Vertx-UDP-Client】 => 发送成功,端口:{}", finalI);
}
else {
Throwable ex = result.cause();
log.error("【Vertx-UDP-Client】 => 发送失败,端口:{},错误信息:{}", finalI, ex.getMessage(), ex);
}
});
Thread.sleep(2000);
Assertions.assertDoesNotThrow(datagramSocket::close);
}
}
}
这可以满足基本的协议开发,自行修改即可!!!
我是老寇,我们下次再见啦!
上次更新: 5/21/2025, 5:17:33 PM