动态路由
你好呀,我的老朋友!我是老寇,欢迎来到老寇云平台!
话不多说,讲一讲动态路由!
# Spring Cloud Gateway
Spring Cloud Gateway官方地址 (opens new window)
提供了一个建立在 Spring 生态系统之上的 API 网关,包括:Spring 6、Spring Boot 3 和 Project Reactor。 Spring Cloud Gateway旨在提供一种简单而有效的方法来路由到API,并为它们提供跨领域关注点,例如:安全性,监控/指标、限流、路由等等。
注意
- 不启用网关,请设置
spring.cloud.gateway.enabled=false
- Spring Cloud Gateway需要运行在由Spring Webflux(响应式)提供的Netty容器,不适用于传统的Servlet容器或作为WAR构建
核心概念
- Route:网关的基本构成单元,它由ID,目标URI,Predicate集合和Filer集合组成,如果满足Predicate,则匹配路由
- Predicate:断言,这是jdk8 断言函数,输入类型是
Spring Framework ServerWebExchange
,可以匹配HTTP请求中的任何内容,例如请求头或参数 - Filter:是使用特定工厂构造的
GatewayFilter
实例,分为两种类型,分别是Gateway Filter(某个路由过滤器)和Global Filter(全局过滤器),您可以对下游服务请求之前或之后修改请求或响应
流程图
负载均衡配置
spring:
cloud:
# loadbalancer
loadbalancer:
cache:
caffeine:
# 初始容量 => 30
# 最大容量 => 4096
# 淘汰规则 => 最后一次写操作后经过30s过期
spec: initialCapacity=30,expireAfterWrite=30s,maximumSize=4096
# 开启缓存
enabled: true
nacos:
# 开启Nacos路由负载均衡
enabled: true
# gateway
gateway:
discovery:
locator:
# 关闭动态生成路由 => DiscoveryClientRouteDefinitionLocator
# 查看DiscoveryLocatorProperties
enabled: false
# 开启服务ID强制小写
lower-case-service-id: true
httpclient:
ssl:
# 信任所有下游证书
use-insecure-trust-manager: true
# 关闭netty日志
wiretap: false
pool:
# 连接池中连接的最大空闲时间
max-idle-time: 10m
# 最大连接数
max-connections: 10000
# 连接池中连接的最大存活时间
max-life-time: 5m
# elastic 无线扩展的线程池(弹性线程池,连接数不可控)
# fixed 固定数量线程池
# disabled 不使用线程池(只有一个线程)
type: fixed
# 必须是fixed,线程池获取连接最大等待时间(毫秒)
acquire-timeout: 60000
# 连接超时时间(毫秒),默认30s
connect-timeout: 60000
httpserver:
# 关闭netty日志
wiretap: false
router:
# 开启动态路由认证
auth:
enabled: true
username: laokou
password: laokou123
# 动态路由
# 引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
</dependency>
</dependencies>
# 使用
// @formatter:off
/**
* nacos动态路由缓存库.
* <a href="https://github.com/alibaba/spring-cloud-alibaba/blob/2.2.x/spring-cloud-alibaba-examples/nacos-example/nacos-config-example/src/main/java/com/alibaba/cloud/examples/example/ConfigListenerExample.java">nacos拉取配置</a>
*
* @author laokou
*/
// @formatter:on
@Component
@Slf4j
@NonNullApi
@Repository
public class NacosRouteDefinitionRepository implements RouteDefinitionRepository, ApplicationEventPublisherAware {
/**
* 动态路由配置.
*/
private static final String DATA_ID = "router.json";
private final ConfigUtil configUtil;
private final ReactiveHashOperations<String, String, RouteDefinition> reactiveHashOperations;
private ApplicationEventPublisher applicationEventPublisher;
public NacosRouteDefinitionRepository(ConfigUtil configUtil,
ReactiveRedisTemplate<String, Object> reactiveRedisTemplate) {
this.configUtil = configUtil;
this.reactiveHashOperations = reactiveRedisTemplate.opsForHash();
}
// @formatter:off
/**
* 路由基本原理总结:
* 1.从NacosRouteDefinitionRepository、DiscoveryClientRouteDefinitionLocator和PropertiesRouteDefinitionLocator加载定义的路由规则.
* 2.通过CompositeRouteDefinitionLocator合并定义的路由规则.
* 3.加载所有的定义的路由规则,使用配置的断言工厂和过滤器工厂来创建路由.
* 4.将路由缓存,提高路由查找性能.
* <p>
* 获取动态路由(避免集群中网关频繁调用Redis,需要本地缓存).
* {@link org.springframework.cloud.gateway.config.GatewayAutoConfiguration
* @return 定义的路由规则
*/
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return reactiveHashOperations.entries(RedisKeyUtil.getRouteDefinitionHashKey())
.mapNotNull(Map.Entry::getValue)
.onErrorContinue((throwable, routeDefinition) -> {
if (log.isErrorEnabled()) {
log.error("Get routes from redis error cause : {}", throwable.toString(), throwable);
}
});
}
// @formatter:on
// @formatter:off
@Override
public Mono<Void> save(Mono<RouteDefinition> route) {
return Mono.empty();
}
@Override
public Mono<Void> delete(Mono<String> routeId) {
return Mono.empty();
}
/**
* 保存路由【同步Nacos路由配置到Redis】.
* @return 保存结果
*/
public Flux<Boolean> saveRouters() {
return Flux.fromIterable(pullRouters())
.flatMap(router -> reactiveHashOperations.putIfAbsent(RedisKeyUtil.getRouteDefinitionHashKey(), router.getId(), router)).doFinally(c -> refreshEvent());
}
/**
* 删除路由.
* @return 删除结果
*/
public Mono<Boolean> removeRouters() {
return reactiveHashOperations.delete(RedisKeyUtil.getRouteDefinitionHashKey()).doFinally(c -> refreshEvent());
}
/**
* 拉取nacos动态路由配置.
* @return 拉取结果
*/
private Collection<RouteDefinition> pullRouters() {
try {
// pull nacos config info
String group = configUtil.getGroup();
ConfigService configService = configUtil.getConfigService();
String configInfo = configService.getConfig(DATA_ID, group, 5000);
return JacksonUtil.toList(configInfo, RouteDefinition.class);
}
catch (Exception e) {
log.error("错误信息:{},详情见日志", LogUtil.record(e.getMessage()), e);
throw new SystemException(ROUTER_NOT_EXIST);
}
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
/**
* 刷新事件.
*/
private void refreshEvent() {
// 刷新事件
applicationEventPublisher.publishEvent(new RefreshRoutesEvent(this));
}
// @formatter:on
}
创建router.json
[
{
"id": "laokou-auth",
"uri": "lb://laokou-auth",
"predicates": [
{
"name": "Path",
"args": {
"pattern": "/auth/**"
}
},
{
"name": "Weight",
"args": {
"_genkey_0": "auth",
"_genkey_1": "100"
}
}
],
"filters": [
{
"name": "StripPrefix",
"args": {
"parts": "1"
}
},
{
"name": "RewritePath",
"args": {
"_genkey_0": "/auth/(?<path>.*)",
"_genkey_1": "/$\\{path}"
}
}
],
"metadata": {
"version": "v3"
},
"order": 1
},
{
"id": "laokou-admin",
"uri": "lb://laokou-admin",
"predicates": [
{
"name": "Path",
"args": {
"pattern": "/admin/**"
}
},
{
"name": "Weight",
"args": {
"_genkey_0": "admin",
"_genkey_1": "100"
}
}
],
"filters": [
{
"name": "StripPrefix",
"args": {
"parts": "1"
}
},
{
"name": "RewritePath",
"args": {
"_genkey_0": "/admin/(?<path>.*)",
"_genkey_1": "/$\\{path}"
}
}
],
"metadata": {
"version": "v3"
},
"order": 1
},
{
"id": "laokou-im",
"uri": "lb:wss://laokou-im",
"predicates": [
{
"name": "Path",
"args": {
"pattern": "/im/**"
}
},
{
"name": "Weight",
"args": {
"_genkey_0": "im",
"_genkey_1": "100"
}
}
],
"filters": [
{
"name": "StripPrefix",
"args": {
"parts": "1"
}
},
{
"name": "RewritePath",
"args": {
"_genkey_0": "/im/(?<path>.*)",
"_genkey_1": "/$\\{path}"
}
}
],
"metadata": {
"version": "v3"
},
"order": 1
}
]
注意:api版本号可以加入动态路由的元数据中,即{ "metadata":{ "version": "v3" }
创建API接口
/**
* 路由管理.
*
* @author laokou
*/
@RestController
@RequiredArgsConstructor
@RequestMapping("v3/routers")
@Tag(name = "路由管理", description = "路由管理")
public class RoutersController {
private final NacosRouteDefinitionRepository nacosRouteDefinitionRepository;
@PostMapping
@Operation(summary = "保存路由", description = "保存路由")
public Flux<Boolean> saveRouter() {
return nacosRouteDefinitionRepository.saveRouters();
}
@DeleteMapping
@Operation(summary = "删除路由", description = "删除路由")
public Mono<Boolean> removeRouter() {
return nacosRouteDefinitionRepository.removeRouters();
}
}
注意:考虑到API安全性,账号和密码都需要进行RSA加密,因此,需要写拦截器
/**
* API过滤器.
*
* @author laokou
*/
@NonNullApi
@RequiredArgsConstructor
public class ApiFilter implements WebFilter {
private static final String API_PATTERN = "/v3/routers/**";
private final RequestMappingHandlerMapping requestMappingHandlerMapping;
private final GatewayExtProperties gatewayExtProperties;
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return requestMappingHandlerMapping.getHandler(exchange)
.switchIfEmpty(chain.filter(exchange))
.flatMap(handler -> {
ServerHttpRequest request = exchange.getRequest();
String requestURL = ReactiveRequestUtil.getRequestURL(request);
if (ReactiveRequestUtil.pathMatcher(requestURL, API_PATTERN)) {
return checkUsernamePassword(exchange, request, chain);
}
return chain.filter(exchange);
});
}
/**
* 校验用户名和密码.
* @param exchange 服务网络交换机
* @param request 请求对象
* @param chain 链式过滤器
* @return 响应结果
*/
private Mono<Void> checkUsernamePassword(ServerWebExchange exchange, ServerHttpRequest request,
WebFilterChain chain) {
String username = ReactiveRequestUtil.getParamValue(request, USERNAME);
String password = ReactiveRequestUtil.getParamValue(request, PASSWORD);
if (StringUtil.isEmpty(username)) {
// 用户名不能为空
return ReactiveResponseUtil.responseOk(exchange,
Result.fail(ValidatorUtil.getMessage(OAUTH2_USERNAME_REQUIRE)));
}
if (StringUtil.isEmpty(password)) {
// 密码不能为空
return ReactiveResponseUtil.responseOk(exchange,
Result.fail(ValidatorUtil.getMessage(OAUTH2_PASSWORD_REQUIRE)));
}
try {
username = RSAUtil.decryptByPrivateKey(username);
password = RSAUtil.decryptByPrivateKey(password);
} catch (Exception e) {
// 用户名或密码错误
return ReactiveResponseUtil.responseOk(exchange, Result.fail(OAUTH2_USERNAME_PASSWORD_ERROR));
}
if (gatewayExtProperties.isEnabled() && (!ObjectUtil.equals(gatewayExtProperties.getPassword(), password)
|| !ObjectUtil.equals(gatewayExtProperties.getUsername(), username))) {
// 用户名或密码错误
return ReactiveResponseUtil.responseOk(exchange, Result.fail(OAUTH2_USERNAME_PASSWORD_ERROR));
}
return chain.filter(exchange);
}
}
考虑到插拔式,所以通过注解形式装配
/**
* 认证开关注解.
*
* @author laokou
*/
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(ApiFilter.class)
public @interface EnableAuth {
}
开启API接口认证
@EnableAuth
@SpringBootApplication
public class GatewayApp {
public static void main(String[] args) {
new SpringApplicationBuilder(GatewayApp.class).web(WebApplicationType.REACTIVE).run(args);
}
}
# 负载均衡
因为需要多版本发布API接口,所以,需要对nacos负载均衡重写,采用的方式就是全路径覆盖源码
/**
* nacos路由负载均衡.
* {@link com.alibaba.cloud.nacos.loadbalancer.NacosLoadBalancerClientConfiguration}
* {@link org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer}
*
* @author XuDaojie
* @author laokou
* @since 2021.1
*/
@Slf4j
public class NacosLoadBalancer implements ReactorServiceInstanceLoadBalancer {
/**
* 优雅停机URL.
*/
public static final String GRACEFUL_SHUTDOWN_URL = "/graceful-shutdown";
/**
* Nacos集群配置.
*/
private static final String CLUSTER_CONFIG = "nacos.cluster";
/**
* 版本.
*/
private static final String VERSION = "version";
/**
* 版本号.
*/
private static final String DEFAULT_VERSION_VALUE = "v3";
/**
* IPV6常量.
*/
private static final String IPV6_KEY = "IPv6";
/**
* Storage local valid IPv6 address, it's a flag whether local machine support IPv6
* address stack.
*/
public static String ipv6;
/**
* 服务ID.
*/
private final String serviceId;
private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
private final NacosDiscoveryProperties nacosDiscoveryProperties;
private final InetIPv6Utils inetIPv6Utils;
public NacosLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
String serviceId, NacosDiscoveryProperties nacosDiscoveryProperties, InetIPv6Utils inetIPv6Utils) {
this.serviceId = serviceId;
this.inetIPv6Utils = inetIPv6Utils;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
}
/**
* 初始化.
*/
@PostConstruct
public void init() {
String ip = nacosDiscoveryProperties.getIp();
if (StringUtils.isNotEmpty(ip)) {
ipv6 = Pattern.matches(IPV4_REGEX, ip) ? nacosDiscoveryProperties.getMetadata().get(IPV6_KEY) : ip;
} else {
ipv6 = inetIPv6Utils.findIPv6Address();
}
}
/**
* 根据IP类型过滤服务实例.
* @param instances 服务实例
* @return 服务实例列表
*/
private List<ServiceInstance> filterInstanceByIpType(List<ServiceInstance> instances) {
if (StringUtils.isNotEmpty(ipv6)) {
List<ServiceInstance> ipv6InstanceList = new ArrayList<>();
for (ServiceInstance instance : instances) {
if (Pattern.matches(IPV4_REGEX, instance.getHost())) {
if (StringUtils.isNotEmpty(instance.getMetadata().get(IPV6_KEY))) {
ipv6InstanceList.add(instance);
}
} else {
ipv6InstanceList.add(instance);
}
}
// Provider has no IPv6, should use IPv4.
if (ipv6InstanceList.isEmpty()) {
return instances.stream()
.filter(instance -> Pattern.matches(IPV4_REGEX, instance.getHost()))
.collect(Collectors.toList());
} else {
return ipv6InstanceList;
}
}
return instances.stream()
.filter(instance -> Pattern.matches(IPV4_REGEX, instance.getHost()))
.collect(Collectors.toList());
}
/**
* 路由负载均衡.
* @param request 请求
* @return 服务实例(响应式)
*/
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
return serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new)
.get(request)
.next()
.map(instances -> getInstanceResponse(instances, request));
}
/**
* 路由负载均衡.
* @param serviceInstances 服务实例列表
* @param request 请求
* @return 服务实例响应体
*/
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances, Request<?> request) {
if (serviceInstances.isEmpty()) {
return new EmptyResponse();
}
if (request.getContext() instanceof RequestDataContext context) {
// IP优先(优雅停机)
String path = context.getClientRequest().getUrl().getPath();
HttpHeaders headers = context.getClientRequest().getHeaders();
if (ReactiveRequestUtil.pathMatcher(HttpMethod.GET.name(), path,
Map.of(HttpMethod.GET.name(), Collections.singleton(GRACEFUL_SHUTDOWN_URL)))) {
ServiceInstance serviceInstance = serviceInstances.stream()
.filter(instance -> match(instance, headers))
.findFirst()
.orElse(null);
if (ObjectUtil.isNotNull(serviceInstance)) {
return new DefaultResponse(serviceInstance);
}
}
// 服务灰度路由
if (isGrayRouter(headers)) {
String version = RegexUtil.getRegexValue(path, URL_VERSION_REGEX);
if (StringUtils.isNotEmpty(version)) {
serviceInstances = serviceInstances.stream()
.filter(item -> item.getMetadata().getOrDefault(VERSION, DEFAULT_VERSION_VALUE).equals(version))
.toList();
}
}
}
return getInstanceResponse(serviceInstances);
}
/**
* 服务实例响应.
* @param serviceInstances 服务实例
* @return 响应结果
*/
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
if (serviceInstances.isEmpty()) {
log.error("No servers available for service: {}", this.serviceId);
return new EmptyResponse();
}
try {
String clusterName = this.nacosDiscoveryProperties.getClusterName();
List<ServiceInstance> instancesToChoose = serviceInstances;
if (StringUtils.isNotBlank(clusterName)) {
List<ServiceInstance> sameClusterInstances = serviceInstances.stream().filter(serviceInstance -> {
String cluster = serviceInstance.getMetadata().get(CLUSTER_CONFIG);
return StringUtils.equals(cluster, clusterName);
}).collect(Collectors.toList());
if (!CollectionUtils.isEmpty(sameClusterInstances)) {
instancesToChoose = sameClusterInstances;
}
} else {
log.warn("A cross-cluster call occurs,name = {}, clusterName = {}, instance = {}", serviceId,
clusterName, serviceInstances);
}
instancesToChoose = this.filterInstanceByIpType(instancesToChoose);
// 路由权重
ServiceInstance instance = NacosBalancer.getHostByRandomWeight3(instancesToChoose);
return new DefaultResponse(instance);
} catch (Exception e) {
log.error("NacosLoadBalancer error", e);
return null;
}
}
/**
* 判断服务灰度路由.
* @param headers 请求头
* @return 判断结果
*/
private boolean isGrayRouter(HttpHeaders headers) {
String gray = headers.getFirst(SERVICE_GRAY);
return ObjectUtil.equals(TRUE, gray);
}
/**
* 根据IP和端口匹配服务节点.
* @param serviceInstance 服务实例
* @param headers 请求头
* @return 匹配结果
*/
private boolean match(ServiceInstance serviceInstance, HttpHeaders headers) {
String host = headers.getFirst(SERVICE_HOST);
String port = headers.getFirst(SERVICE_PORT);
Assert.isTrue(StringUtil.isNotEmpty(host), "service-host is empty");
Assert.isTrue(StringUtil.isNotEmpty(port), "service-port is empty");
return ObjectUtil.equals(host, serviceInstance.getHost())
&& Integer.parseInt(port) == serviceInstance.getPort();
}
}
/**
* {@link ServiceInstanceListSupplier} don't use cache.<br>
* <br>
* 1. LoadBalancerCache causes information such as the weight of the service instance to
* be changed without immediate effect.<br>
* 2. Nacos itself supports caching.
*
* @author XuDaojie
* @author laokou
* @since 2021.1
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class NacosLoadBalancerClientConfiguration {
private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 183827465;
@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> nacosLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory, NacosDiscoveryProperties nacosDiscoveryProperties,
InetIPv6Utils inetIPv6Utils) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new NacosLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class),
name, nacosDiscoveryProperties, inetIPv6Utils);
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnReactiveDiscoveryEnabled
@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
public static class ReactiveSupportConfiguration {
@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default",
matchIfMissing = true)
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder().withDiscoveryClient().build(context);
}
@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "zone-preference")
public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder().withDiscoveryClient().withZonePreference().build(context);
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnBlockingDiscoveryEnabled
@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER + 1)
public static class BlockingSupportConfiguration {
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default",
matchIfMissing = true)
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().build(context);
}
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "zone-preference")
public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder()
.withBlockingDiscoveryClient()
.withZonePreference()
.build(context);
}
}
}
我是老寇,我们下次再见啦!
上次更新: 11/22/2024, 4:10:36 AM