KCloud-Platform-IoT KCloud-Platform-IoT
首页
  • 开发手册

    • 组件【ai】
    • 组件【algorithm】
    • 组件【banner】
    • 组件【core】
    • 组件【bom】
    • 组件【cors】
    • 组件【domain】
    • 组件【crypto】
    • 组件【data-cache】
    • 组件【elasticsearch】
    • 组件【excel】
    • 组件【i18n】
    • 组件【idempotent】
    • 组件【influxdb】
    • 组件【kafka】
    • 组件【log】
    • 组件【lock】
    • 组件【mail】
    • 组件【log4j2】
    • 组件【mqtt】
    • 组件【mybatis-plus】
    • 组件【nacos】
    • 组件【netty】
    • 组件【openapi-doc】
    • 组件【oss】
    • 组件【prometheus】
    • 组件【rate-limiter】
    • 组件【reactor】
    • 组件【redis】
    • 组件【secret】
    • 组件【security】
    • 组件【sensitive】
    • 组件【sms】
    • 组件【snail-job】
    • 组件【storage】
    • 组件【tdengine】
    • 组件【tenant】
    • 组件【test】
    • 组件【trace】
    • 组件【xss】
  • 环境搭建

    • Centos7安装Mysql 8.0.33
    • Centos7安装Redis 7.0.11
    • Centos7安装RocketMQ 5.1.1
    • Centos7安装Jdk 17.0.7
    • Centos7安装Docker 23.0.6
    • Centos7安装Elasticsearch 8.6.2
    • Docker安装RabbitMQ 3.12.2
    • Docker安装Postgresql 16.1
    • Ubuntu20.04安装Docker
    • Ubuntu20.04忘记密码或指纹错误
  • 常用命令

    • Centos7常用命令
    • Centos7常用命令
  • 快速上手

    • 项目启动【dev环境】
    • 项目启动【test环境】
    • 项目启动【prod环境】
    • 更新日志
  • 前端指南

    • 前端启动
  • 后端指南

    • COLA代码规范
    • SSL证书
    • 一键修改项目模块
    • 一键生成项目骨架
    • 一键修改项目版本号
    • 一键跳过测试用例
    • 一键生成后端COLA代码
    • 分布式链路跟踪之ELK日志
    • 一键检查代码规范
    • Spring Cloud Gateway+Redis+Nacos之动态路由和负载均衡
  • 项目部署

    • 项目部署之镜像打包与推送
  • 其他

    • Java如何快速转Go
    • Go快速开发API
    • Vue快速开发Api
    • React快速开发Api
  • 文章

    • 物联网之对接MQTT最佳实践
    • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
    • vue3+lime-echart各种图表使用【懒人专用,建议收藏】
    • 物联网之使用Vertx实现TCP最佳实践【响应式】
    • 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】
    • 物联网之使用Vertx实现UDP最佳实践【响应式】
    • Spring Cloud Gateway实现分布式限流和熔断降级
    • 物联网之常见网络配置
    • Go之封装Http请求和日志
    • 物联网之小白调试网关设备
    • 微服务之注册中心与ShardingSphere关于分库分表的那些事
    • Fory序列化与反序列化
  • 文章

    • IntelliJ IDEA插件推荐
  • 活动

    • KCloud-Platform-IoT 开源三周年快乐&父亲节快乐
  • 儒学

    • 儒学摘抄(一)
  • 禅语

    • 禅语摘抄(一)
  • 诗词

    • 诗词摘抄(一)
  • 道法

    • 道法摘抄(一)
  • 养生

    • 养生摘抄(一)
  • 读后感

    • 读《强者,都是含泪奔跑的人》读后感
  • 修行

    • 修身/养生/情感
  • 觉悟

    • 觉悟日记(一)
赞助
项目课程 (opens new window)
GitHub (opens new window)
首页
  • 开发手册

    • 组件【ai】
    • 组件【algorithm】
    • 组件【banner】
    • 组件【core】
    • 组件【bom】
    • 组件【cors】
    • 组件【domain】
    • 组件【crypto】
    • 组件【data-cache】
    • 组件【elasticsearch】
    • 组件【excel】
    • 组件【i18n】
    • 组件【idempotent】
    • 组件【influxdb】
    • 组件【kafka】
    • 组件【log】
    • 组件【lock】
    • 组件【mail】
    • 组件【log4j2】
    • 组件【mqtt】
    • 组件【mybatis-plus】
    • 组件【nacos】
    • 组件【netty】
    • 组件【openapi-doc】
    • 组件【oss】
    • 组件【prometheus】
    • 组件【rate-limiter】
    • 组件【reactor】
    • 组件【redis】
    • 组件【secret】
    • 组件【security】
    • 组件【sensitive】
    • 组件【sms】
    • 组件【snail-job】
    • 组件【storage】
    • 组件【tdengine】
    • 组件【tenant】
    • 组件【test】
    • 组件【trace】
    • 组件【xss】
  • 环境搭建

    • Centos7安装Mysql 8.0.33
    • Centos7安装Redis 7.0.11
    • Centos7安装RocketMQ 5.1.1
    • Centos7安装Jdk 17.0.7
    • Centos7安装Docker 23.0.6
    • Centos7安装Elasticsearch 8.6.2
    • Docker安装RabbitMQ 3.12.2
    • Docker安装Postgresql 16.1
    • Ubuntu20.04安装Docker
    • Ubuntu20.04忘记密码或指纹错误
  • 常用命令

    • Centos7常用命令
    • Centos7常用命令
  • 快速上手

    • 项目启动【dev环境】
    • 项目启动【test环境】
    • 项目启动【prod环境】
    • 更新日志
  • 前端指南

    • 前端启动
  • 后端指南

    • COLA代码规范
    • SSL证书
    • 一键修改项目模块
    • 一键生成项目骨架
    • 一键修改项目版本号
    • 一键跳过测试用例
    • 一键生成后端COLA代码
    • 分布式链路跟踪之ELK日志
    • 一键检查代码规范
    • Spring Cloud Gateway+Redis+Nacos之动态路由和负载均衡
  • 项目部署

    • 项目部署之镜像打包与推送
  • 其他

    • Java如何快速转Go
    • Go快速开发API
    • Vue快速开发Api
    • React快速开发Api
  • 文章

    • 物联网之对接MQTT最佳实践
    • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
    • vue3+lime-echart各种图表使用【懒人专用,建议收藏】
    • 物联网之使用Vertx实现TCP最佳实践【响应式】
    • 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】
    • 物联网之使用Vertx实现UDP最佳实践【响应式】
    • Spring Cloud Gateway实现分布式限流和熔断降级
    • 物联网之常见网络配置
    • Go之封装Http请求和日志
    • 物联网之小白调试网关设备
    • 微服务之注册中心与ShardingSphere关于分库分表的那些事
    • Fory序列化与反序列化
  • 文章

    • IntelliJ IDEA插件推荐
  • 活动

    • KCloud-Platform-IoT 开源三周年快乐&父亲节快乐
  • 儒学

    • 儒学摘抄(一)
  • 禅语

    • 禅语摘抄(一)
  • 诗词

    • 诗词摘抄(一)
  • 道法

    • 道法摘抄(一)
  • 养生

    • 养生摘抄(一)
  • 读后感

    • 读《强者,都是含泪奔跑的人》读后感
  • 修行

    • 修身/养生/情感
  • 觉悟

    • 觉悟日记(一)
赞助
项目课程 (opens new window)
GitHub (opens new window)
  • 开发手册

    • 组件【ai】
    • 组件【algorithm】
    • 组件【banner】
    • 组件【core】
    • 组件【bom】
    • 组件【cors】
    • 组件【domain】
    • 组件【crypto】
    • 组件【data-cache】
    • 组件【elasticsearch】
    • 组件【excel】
    • 组件【i18n】
    • 组件【idempotent】
    • 组件【influxdb】
    • 组件【kafka】
    • 组件【log】
    • 组件【lock】
    • 组件【mail】
    • 组件【log4j2】
    • 组件【mqtt】
    • 组件【mybatis-plus】
    • 组件【nacos】
    • 组件【netty】
    • 组件【openapi-doc】
    • 组件【oss】
    • 组件【prometheus】
    • 组件【rate-limiter】
    • 组件【reactor】
    • 组件【redis】
    • 组件【secret】
    • 组件【security】
    • 组件【sensitive】
    • 组件【sms】
    • 组件【snail-job】
    • 组件【storage】
    • 组件【tdengine】
    • 组件【tenant】
    • 组件【test】
    • 组件【trace】
    • 组件【xss】
  • 环境搭建

    • Centos7安装Mysql 8.0.33
    • Centos7安装Redis 7.0.11
    • Centos7安装RocketMQ 5.1.1
    • Centos7安装Jdk 17.0.7
    • Centos7安装Docker 23.0.6
    • Docker安装RabbitMQ 3.12.2
    • Centos7安装Elasticsearch 8.6.2
    • Docker安装Postgresql 16.1
    • Ubuntu20.04安装Docker
    • Ubuntu20.04忘记密码或指纹错误
  • 常用命令

    • Centos7常用命令
    • Docker常用命令
  • 快速上手

    • 项目启动【dev环境】
    • 项目启动【test环境】
    • 项目启动【prod环境】
    • 更新日志
  • 前端指南

    • 前端启动
  • 后端指南

    • COLA代码规范
    • SSL证书
    • 一键修改项目模块
    • 一键生成项目骨架
    • 一键修改项目版本号
    • 一键跳过测试用例
    • 一键生成后端COLA代码
    • 分布式链路跟踪之ELK日志
    • 一键检查代码规范
    • Spring Cloud Gateway+Redis+Nacos之动态路由和负载均衡
      • 介绍
      • 动态路由
        • 引入依赖
        • yaml配置
        • Nacos路由配置
        • 路由配置【router.json】
        • 启动任务
        • 原理
      • 负载均衡
        • 引入依赖
        • ymal配置
        • 代码配置【全路径覆盖spring cloud alibaba】
    • OAuth2.1流程
  • 项目部署

    • 项目部署之镜像打包与推送
  • 其他

    • Java如何快速转Go
    • Go快速开发API
    • Vue快速开发Api
    • React快速开发Api
  • 文章

    • 物联网之对接MQTT最佳实践
    • 物联网之使用Vertx实现MQTT-Server最佳实践【响应式】
    • vue3+lime-echart各种图表使用【懒人专用,建议收藏】
    • 物联网之使用Vertx实现TCP最佳实践【响应式】
    • 物联网之使用Vertx实现HTTPWebSocket最佳实践【响应式】
    • 物联网之使用Vertx实现UDP最佳实践【响应式】
    • Spring Cloud Gateway实现分布式限流和熔断降级
    • 物联网之常见网络配置
    • Go之封装Http请求和日志
    • 物联网之小白调试网关设备
    • 微服务之注册中心与ShardingSphere关于分库分表的那些事
    • Fory序列化与反序列化
  • 推荐

    • IntelliJ IDEA插件推荐
  • 活动

    • KCloud-Platform-IoT 开源三周年快乐&父亲节快乐
  • 指南
  • 后端指南
KCloud-Platform-IoT
2024-09-03
目录

Spring Cloud Gateway+Redis+Nacos之动态路由和负载均衡

你好呀,我的老朋友!我是老寇,欢迎来到老寇云平台!

跟我一起学习动态路由和负载均衡!

# Spring Cloud Gateway

# 介绍

Spring Cloud Gateway官方地址 (opens new window)

提供了一个建立在 Spring 生态系统之上的 API 网关,包括:Spring 6、Spring Boot 3 和 Project Reactor。 Spring Cloud Gateway旨在提供一种简单而有效的方法来路由到API,并为它们提供跨领域关注点,例如:安全性,监控/指标、限流、路由等等。

注意

  • 不启用网关,请设置 spring.cloud.gateway.enabled=false
  • Spring Cloud Gateway需要运行在由Spring Webflux(响应式)提供的Netty容器,不适用于传统的Servlet容器或作为WAR构建

核心概念

  • Route:网关的基本构成单元,它由ID,目标URI,Predicate集合和Filer集合组成,如果满足Predicate,则匹配路由
  • Predicate:断言,这是jdk8 断言函数,输入类型是 Spring Framework ServerWebExchange,可以匹配HTTP请求中的任何内容,例如请求头或参数
  • Filter:是使用特定工厂构造的 GatewayFilter 实例,分为两种类型,分别是Gateway Filter(某个路由过滤器)和Global Filter(全局过滤器),您可以对下游服务请求之前或之后修改请求或响应

流程图

# 动态路由

路由规则不是写在配置文件中,而是存储在外部系统(如 Nacos、Consul、Apollo、Redis 或数据库)中。Spring Cloud Gateway 能够监听这些外部系统的变化,在不重启网关的情况下,实时地获取最新的路由配置并使其生效

注意

静态路由是路由规则以配置文件(如 application.yml)的方式硬编码在项目中

# 引入依赖

<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-gateway-server-webflux</artifactId>
    </dependency>
    <dependency>
       <groupId>com.github.ben-manes.caffeine</groupId>
       <artifactId>caffeine</artifactId>
    </dependency>
    <dependency>
       <groupId>com.alibaba.cloud</groupId>
       <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <dependency>
       <groupId>com.alibaba.cloud</groupId>
       <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>
    <dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-starter-bootstrap</artifactId>
    </dependency>
</dependencies>

# yaml配置

spring:
  config:
    import:
      # 临时配置文件【解决拉取nacos配置文件时,group默认为DEFAULT_GROUP问题】
      - optional:nacos:router.json?refreshEnabled=true&group=DEFAULT
  cloud:
    gateway:
      server:
        webflux:
          enabled: true
          discovery:
            locator:
              # 关闭动态生成路由 => DiscoveryClientRouteDefinitionLocator
              # 查看DiscoveryLocatorProperties
              enabled: false
              # 开启服务ID强制小写
              lower-case-service-id: true
    nacos:
      discovery:
        # 开启服务注册&发现
        enabled: true
        # 服务注册&发现-地址
        server-addr: nacos:8848
        # 服务注册&发现-命名空间
        namespace: public
        # 服务注册&发现-用户名
        username: nacos
        # 服务注册&发现-密码
        password: nacos
        # 服务注册&发现-分组
        group: DEFAULT
        # true支持https,false不支持https
        secure: false
        # true 临时 false 持久
        ephemeral: true
        # 服务注册&发现-集群名称
        cluster-name: nacos-cluster
        heart-beat:
          # 开启心跳检测
          enabled: true
        # 每10秒发送一次心跳【单位毫秒】
        heart-beat-interval: 10000
        # 超过30秒,则标记为不健康
        heart-beat-timeout: 30000
      config:
        # 开启配置中心
        enabled: true
        # 配置中心-地址
        server-addr: nacos:8848
        # 配置中心-命名空间
        namespace: public
        # 配置中心-用户名
        username: nacos
        # 配置中心-密码
        password: nacos
        # 配置中心-分组
        group: DEFAULT
        # 配置中心-集群名称
        cluster-name: nacos-cluster
        # 配置中心-开启自动刷新
        refresh-enabled: true
        # 配置中心-配置文件格式
        file-extension: yaml

# Nacos路由配置

// @formatter:off
/**
 * nacos动态路由缓存库.
 * <a href="https://github.com/alibaba/spring-cloud-alibaba/blob/2.2.x/spring-cloud-alibaba-examples/nacos-example/nacos-config-example/src/main/java/com/alibaba/cloud/examples/example/ConfigListenerExample.java">nacos拉取配置</a>
 *
 * @author laokou
 */
// @formatter:on
@Slf4j
@NonNullApi
@Repository
public class NacosRouteDefinitionRepository implements RouteDefinitionRepository {

    static {
       ForyFactory.INSTANCE.register(org.springframework.cloud.gateway.route.RouteDefinition.class);
       ForyFactory.INSTANCE.register(org.springframework.cloud.gateway.filter.FilterDefinition.class);
       ForyFactory.INSTANCE.register(org.springframework.cloud.gateway.handler.predicate.PredicateDefinition.class);
    }

    private final String dataId = "router.json";

    private final ConfigUtils configUtils;

    private final ReactiveHashOperations<String, String, RouteDefinition> reactiveHashOperations;

    private final ExecutorService virtualThreadExecutor;

    public NacosRouteDefinitionRepository(ConfigUtils configUtils,
                                 ReactiveRedisTemplate<String, Object> reactiveRedisTemplate,
                                 ExecutorService virtualThreadExecutor) {
       this.configUtils = configUtils;
       this.reactiveHashOperations = reactiveRedisTemplate.opsForHash();
       this.virtualThreadExecutor = virtualThreadExecutor;
    }

    @PostConstruct
    public void listenRouter() throws NacosException {
       log.info("开始监听路由配置信息");
       configUtils.addListener(dataId, configUtils.getGroup(), new Listener() {
          @Override
          public Executor getExecutor() {
             return Executors.newSingleThreadExecutor();
          }

          @Override
          public void receiveConfigInfo(String routes) {
             log.info("监听路由配置信息,开始同步路由配置:{}", routes);
             virtualThreadExecutor.execute(() -> syncRouter(getRoutes(routes))
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe());
          }
       });
    }

    // @formatter:off
    /**
     * 路由基本原理总结:
     * 1.从NacosRouteDefinitionRepository、DiscoveryClientRouteDefinitionLocator和PropertiesRouteDefinitionLocator加载定义的路由规则.
     * 2.通过CompositeRouteDefinitionLocator合并定义的路由规则.
     * 3.加载所有的定义的路由规则,使用配置的断言工厂和过滤器工厂来创建路由.
     * 4.将路由缓存,提高路由查找性能.
     * <p>
     * 获取动态路由(避免集群中网关频繁调用Redis,需要本地缓存).
     * {@link org.springframework.cloud.gateway.config.GatewayAutoConfiguration
     * @return 定义的路由规则
     */
    @Override
    public Flux<RouteDefinition> getRouteDefinitions() {
       return reactiveHashOperations.entries(RedisKeyUtils.getRouteDefinitionHashKey())
          .mapNotNull(Map.Entry::getValue)
          .onErrorContinue((throwable, routeDefinition) -> {
             if (log.isErrorEnabled()) {
                log.error("从Redis获取路由失败,错误信息:{}", throwable.getMessage(), throwable);
             }
          });
    }
    // @formatter:on

    @Override
    public Mono<Void> save(Mono<RouteDefinition> route) {
       return Mono.empty();
    }

    @Override
    public Mono<Void> delete(Mono<String> routeId) {
       return Mono.empty();
    }

    /**
     * 同步路由【同步Nacos动态路由配置到Redis,并且刷新本地缓存】.
     * @return 同步结果
     */
    public Mono<Void> syncRouter() {
       return syncRouter(getRoutes());
    }

    /**
     * 同步路由【同步Nacos动态路由配置到Redis,并且刷新本地缓存】.
     * @param routes 路由
     * @return 同步结果
     */
    private Mono<Void> syncRouter(Collection<RouteDefinition> routes) {
       return reactiveHashOperations.delete(RedisKeyUtils.getRouteDefinitionHashKey())
          .doOnError(throwable -> log.error("删除路由失败,错误信息:{}", throwable.getMessage(), throwable))
          .doOnSuccess(removeFlag -> publishRefreshRoutesEvent())
          .thenMany(Flux.fromIterable(routes))
          .flatMap(router -> reactiveHashOperations.putIfAbsent(RedisKeyUtils.getRouteDefinitionHashKey(), router.getId(), router)
             .doOnError(throwable -> log.error("保存路由失败,错误信息:{}", throwable.getMessage(), throwable)))
          .then()
          .doOnSuccess(saveFlag -> publishRefreshRoutesEvent());
    }

    // @formatter:off
    /**
     * 获取nacos动态路由配置.
     * @return 拉取结果
     */
    private Collection<RouteDefinition> getRoutes() {
       return getRoutes(EMPTY);
    }

    /**
     * 获取nacos动态路由配置.
     * @param str 路由配置
     * @return 拉取结果
     */
    private Collection<RouteDefinition> getRoutes(String str) {
       try {
          String routes = StringUtils.isEmpty(str) ? configUtils.getConfig(dataId, configUtils.getGroup(), 5000) : str;
          return JacksonUtils.toList(routes, RouteDefinition.class);
       }
       catch (Exception e) {
          log.error("动态路由【API网关】不存在,错误信息:{}", e.getMessage(), e);
          throw new SystemException(ROUTER_NOT_EXIST);
       }
    }

    /**
     * 刷新事件.
     */
    private void publishRefreshRoutesEvent() {
       // 刷新事件
       SpringContextUtils.publishEvent(new RefreshRoutesEvent(this));
    }
    // @formatter:on

}

# 路由配置【router.json】

[
  {
    "id": "laokou-auth",
    "uri": "lb://laokou-auth",
    "predicates": [
      {
        "name": "Path",
        "args": {
          "pattern": "/auth/**"
        }
      },
      {
        "name": "Weight",
        "args": {
          "_genkey_0": "auth",
          "_genkey_1": "100"
        }
      }
    ],
    "filters": [
      {
        "name": "StripPrefix",
        "args": {
          "parts": "1"
        }
      },
      {
        "name": "RewritePath",
        "args": {
          "_genkey_0": "/auth/(?<path>.*)",
          "_genkey_1": "/$\{path}"
        }
      }
    ],
    "metadata": {
      "version": "v3"
    },
    "order": 1
  }
]

注意:api版本号可以加入动态路由的元数据中,即{ "metadata":{ "version": "v3" }

# 启动任务

@EnableDiscoveryClient
@SpringBootApplication
public class GatewayApp implements CommandLineRunner {

    private final NacosRouteDefinitionRepository nacosRouteDefinitionRepository;

    private final ExecutorService virtualThreadExecutor;

    // @formatter:off
    public static void main(String[] args) throws UnknownHostException, NoSuchAlgorithmException, KeyManagementException {
       // 配置关闭nacos日志,因为nacos的log4j2导致本项目的日志不输出的问题
       System.setProperty("nacos.logging.default.config.enabled", "false");
       // 启用虚拟线程支持
       System.setProperty("reactor.schedulers.defaultBoundedElasticOnVirtualThreads", "true");
       new SpringApplicationBuilder(GatewayApp.class).web(WebApplicationType.REACTIVE).run(args);
    }

    @Override
    public void run(String... args) {
       // 执行同步路由任务
       virtualThreadExecutor.execute(() -> nacosRouteDefinitionRepository.syncRouter()
          .subscribeOn(Schedulers.boundedElastic())
          .subscribe());
    }
    // @formatter:on

}

# 原理

路由基本原理总结:
1.从NacosRouteDefinitionRepository、DiscoveryClientRouteDefinitionLocator和PropertiesRouteDefinitionLocator加载定义的路由规则.
2.通过CompositeRouteDefinitionLocator合并定义的路由规则.
3.加载所有的定义的路由规则,使用配置的断言工厂和过滤器工厂来创建路由.
4.将路由缓存,提高路由查找性能.

# 负载均衡

# 引入依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-loadbalancer</artifactId>
</dependency>

# ymal配置

spring:
    cloud:
      # loadbalancer
      loadbalancer:
        cache:
          caffeine:
            # 初始容量 => 30
            # 最大容量 => 4096
            # 淘汰规则 => 最后一次写操作后经过30s过期
            spec: initialCapacity=30,expireAfterWrite=30s,maximumSize=4096
          # 开启缓存
          enabled: true
        nacos:
          # 开启Nacos路由负载均衡
          enabled: true

# 代码配置【全路径覆盖spring cloud alibaba】

@ConditionalOnDiscoveryEnabled
@ConditionalOnLoadBalancerNacos
@Configuration(proxyBeanMethods = false)
public class NacosLoadBalancerClientConfiguration {

    private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 183827465;

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnBean({ LoadBalancerClientFactory.class, NacosDiscoveryProperties.class, InetIPv6Utils.class })
    public ReactorLoadBalancer<ServiceInstance> nacosLoadBalancer(Environment environment,
          LoadBalancerClientFactory loadBalancerClientFactory, NacosDiscoveryProperties nacosDiscoveryProperties,
          InetIPv6Utils inetIPv6Utils, List<ServiceInstanceFilter> serviceInstanceFilters,
          List<LoadBalancerAlgorithm> loadBalancerAlgorithms) {
       String serviceId = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
       Map<String, LoadBalancerAlgorithm> loadBalancerAlgorithmMap = new HashMap<>();
       loadBalancerAlgorithms.forEach(loadBalancerAlgorithm -> {
          if (!loadBalancerAlgorithmMap.containsKey(loadBalancerAlgorithm.getServiceId())) {
             loadBalancerAlgorithmMap.put(loadBalancerAlgorithm.getServiceId(), loadBalancerAlgorithm);
          }
       });
       return new NacosLoadBalancer(
             loadBalancerClientFactory.getLazyProvider(serviceId, ServiceInstanceListSupplier.class), serviceId,
             nacosDiscoveryProperties, inetIPv6Utils, serviceInstanceFilters, loadBalancerAlgorithmMap);
    }

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnReactiveDiscoveryEnabled
    @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
    public static class ReactiveSupportConfiguration {

       @Bean
       @ConditionalOnBean(ReactiveDiscoveryClient.class)
       @ConditionalOnMissingBean
       @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default",
             matchIfMissing = true)
       public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
             ConfigurableApplicationContext context) {
          return ServiceInstanceListSupplier.builder().withDiscoveryClient().build(context);
       }

       @Bean
       @ConditionalOnBean(ReactiveDiscoveryClient.class)
       @ConditionalOnMissingBean
       @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "zone-preference")
       public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(
             ConfigurableApplicationContext context) {
          return ServiceInstanceListSupplier.builder().withDiscoveryClient().withZonePreference().build(context);
       }

    }

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnBlockingDiscoveryEnabled
    @Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER + 1)
    public static class BlockingSupportConfiguration {

       @Bean
       @ConditionalOnBean(DiscoveryClient.class)
       @ConditionalOnMissingBean
       @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default",
             matchIfMissing = true)
       public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
             ConfigurableApplicationContext context) {
          return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().build(context);
       }

       @Bean
       @ConditionalOnBean(DiscoveryClient.class)
       @ConditionalOnMissingBean
       @ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "zone-preference")
       public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(
             ConfigurableApplicationContext context) {
          return ServiceInstanceListSupplier.builder()
             .withBlockingDiscoveryClient()
             .withZonePreference()
             .build(context);
       }

    }

}
@Slf4j
public class NacosLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    /**
     * Storage local valid IPv6 address, it's a flag whether local machine support IPv6
     * address stack.
     */
    public static String ipv6;

    private final String serviceId;

    private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

    private final NacosDiscoveryProperties nacosDiscoveryProperties;

    private final InetIPv6Utils inetIPv6Utils;

    private final List<ServiceInstanceFilter> serviceInstanceFilters;

    private final Map<String, LoadBalancerAlgorithm> loadBalancerAlgorithmMap;

    public NacosLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
          String serviceId, NacosDiscoveryProperties nacosDiscoveryProperties, InetIPv6Utils inetIPv6Utils,
          List<ServiceInstanceFilter> serviceInstanceFilters,
          Map<String, LoadBalancerAlgorithm> loadBalancerAlgorithmMap) {
       this.serviceId = serviceId;
       this.inetIPv6Utils = inetIPv6Utils;
       this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
       this.nacosDiscoveryProperties = nacosDiscoveryProperties;
       this.serviceInstanceFilters = serviceInstanceFilters;
       this.loadBalancerAlgorithmMap = loadBalancerAlgorithmMap;
    }

    /**
     * 初始化.
     */
    @PostConstruct
    public void init() {
       String ip = nacosDiscoveryProperties.getIp();
       if (com.alibaba.cloud.commons.lang.StringUtils.isNotEmpty(ip)) {
          ipv6 = RegexUtils.ipv4Regex(ip) ? nacosDiscoveryProperties.getMetadata().get("IPv6") : ip;
       }
       else {
          ipv6 = inetIPv6Utils.findIPv6Address();
       }
    }

    /**
     * 根据IP类型过滤服务实例.
     * @param instances 服务实例
     * @return 服务实例列表
     */
    private List<ServiceInstance> filterInstanceByIpType(List<ServiceInstance> instances) {
       if (com.alibaba.cloud.commons.lang.StringUtils.isNotEmpty(ipv6)) {
          List<ServiceInstance> ipv6InstanceList = new ArrayList<>();
          for (ServiceInstance instance : instances) {
             if (RegexUtils.ipv4Regex(instance.getHost())) {
                if (com.alibaba.cloud.commons.lang.StringUtils.isNotEmpty(instance.getMetadata().get("IPv6"))) {
                   ipv6InstanceList.add(instance);
                }
             }
             else {
                ipv6InstanceList.add(instance);
             }
          }
          // Provider has no IPv6, should use IPv4.
          if (ipv6InstanceList.isEmpty()) {
             return instances.stream().filter(instance -> RegexUtils.ipv4Regex(instance.getHost())).toList();
          }
          else {
             return ipv6InstanceList;
          }
       }
       return instances.stream().filter(instance -> RegexUtils.ipv4Regex(instance.getHost())).toList();
    }

    /**
     * 路由负载均衡.
     * @param request 请求
     * @return 服务实例(响应式)
     */
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
       return serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new)
          .get(request)
          .next()
          .map(instances -> getInstanceResponse(instances, request));
    }

    /**
     * 路由负载均衡.
     * @param serviceInstances 服务实例列表
     * @param request 请求
     * @return 服务实例响应体
     */
    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances, Request<?> request) {
       if (serviceInstances.isEmpty()) {
          log.warn("No servers available for service: {}", this.serviceId);
          return new EmptyResponse();
       }
       if (request.getContext() instanceof RequestDataContext context) {
          String path = context.getClientRequest().getUrl().getPath();
          HttpHeaders headers = context.getClientRequest().getHeaders();
          // 服务灰度路由
          if (isGrayRouter(headers)) {
             String version = RegexUtils.getRegexValue(path, "/(v\d+)/");
             if (StringUtils.isNotEmpty(version)) {
                serviceInstances = serviceInstances.stream()
                   .filter(item -> item.getMetadata().getOrDefault("version", "v3").equals(version))
                   .toList();
             }
          }
       }
       return getInstanceResponse(request, serviceInstances);
    }

    /**
     * 服务实例响应.
     * @param serviceInstances 服务实例
     * @return 响应结果
     */
    private Response<ServiceInstance> getInstanceResponse(Request<?> request, List<ServiceInstance> serviceInstances) {
       if (serviceInstances.isEmpty()) {
          log.error("No servers available for service: {}", this.serviceId);
          return new EmptyResponse();
       }
       try {
          String clusterName = this.nacosDiscoveryProperties.getClusterName();
          List<ServiceInstance> instancesToChoose = serviceInstances;
          if (com.alibaba.cloud.commons.lang.StringUtils.isNotBlank(clusterName)) {
             List<ServiceInstance> sameClusterInstances = serviceInstances.stream().filter(serviceInstance -> {
                String cluster = serviceInstance.getMetadata().get("nacos.cluster");
                return com.alibaba.cloud.commons.lang.StringUtils.equals(cluster, clusterName);
             }).toList();
             if (!CollectionUtils.isEmpty(sameClusterInstances)) {
                instancesToChoose = sameClusterInstances;
             }
          }
          else {
             log.warn("A cross-cluster call occurs,name = {}, clusterName = {}, instance = {}", serviceId,
                   clusterName, serviceInstances);
          }
          instancesToChoose = this.filterInstanceByIpType(instancesToChoose);

          // Filter the service list sequentially based on the order number
          for (ServiceInstanceFilter filter : serviceInstanceFilters) {
             instancesToChoose = filter.filterInstance(request, instancesToChoose);
          }

          ServiceInstance instance;
          // Find the corresponding load balancing algorithm through the service ID and
          // select the final service instance
          if (loadBalancerAlgorithmMap.containsKey(serviceId)) {
             instance = loadBalancerAlgorithmMap.get(serviceId).getInstance(request, instancesToChoose);
          }
          else {
             instance = loadBalancerAlgorithmMap.get(LoadBalancerAlgorithm.DEFAULT_SERVICE_ID)
                .getInstance(request, instancesToChoose);
          }

          return new DefaultResponse(instance);
       }
       catch (Exception e) {
          log.error("NacosLoadBalancer error", e);
          return null;
       }
    }

    /**
     * 判断服务灰度路由.
     * @param headers 请求头
     * @return 判断结果
     */
    private boolean isGrayRouter(HttpHeaders headers) {
       String gray = headers.getFirst("service-gray");
       return ObjectUtils.equals(TRUE, gray);
    }

}

我是老寇,我们下次再见啦!

上次更新: 9/15/2025, 4:45:51 AM
一键检查代码规范
OAuth2.1流程

← 一键检查代码规范 OAuth2.1流程→

Theme by Vdoing | Copyright © 2022-2025 laokou | Apache 2.0 License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式