大纲 Gateway 的流量控制 Gateway 的限流概述 在开发高并发系统时可以用三把利器来保护系统:缓存、降级和限流。缓存的目的是提升系统访问速度和增大系统处理的容量,是抗高并发流量的 “银弹”;而降级是当服务出现问题或者影响到核心流程时,需要暂时将其屏蔽掉,待高峰过去之后或者问题解决后再打开;而有些场景并不能用缓存和降级来解决,比如稀缺资源(秒杀、抢购)、写服务(如评论、下单)、频繁的复杂查询等,因此需要有一种手段来限制这些场景的并发 / 请求量,即限流。限流的目的是通过对并发访问 / 请求进行限速或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务(定向到错误页或友好的展示页)、排队或等待(比如秒杀、评论、下单等场景)、降级(返回兜底数据或默认数据)。主流的中间件都会有单机限流框架,一般支持两种限流模式:控制速率和控制并发。Spring Cloud Zuul 通过第三方扩展 spring-cloud-zuul-ratelimit 也可以支持限流。Spring Cloud Gateway 是一个 API 网关中间件,网关是所有请求流量的入口;特别是像天猫双十一、双十二等高并发场景下,当流量迅速剧增,网关除了要保护自身之外,还要限流保护后端应用。常见的限流算法有漏桶和令牌桶,计数器也可以进行粗暴限流实现。对于限流算法,可以参考 Guava 中的 RateLimiter、Bucket4j 、RateLimitJ 等项目的具体实现。下面将介绍如何基于 Bucket4j、Gateway 内置的限流过滤器工厂(RequestRateLimiterGatewayFilterFactory
)、CPU 使用率实现限流,点击下载 完整的案例代码。
Gateway 的限流方案 基于 Bucket4j 实现限流 在 Spring Cloud Gateway 中实现限流比较简单,只需要编写一个过滤器就可以。下面介绍在 Spring Cloud Gateway 中使用 Bucket4j 实现限流,由于篇幅有限,只给出 Gateway Server 工程的核心代码和配置。
1 2 3 4 5 6 7 8 9 <dependency > <groupId > com.github.vladimir-bukhtoyarov</groupId > <artifactId > bucket4j-core</artifactId > <version > 4.10.0</version > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-gateway</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public class GatewayRateLimitFilterByIp implements GatewayFilter , Ordered { private final Logger log = LoggerFactory.getLogger(GatewayRateLimitFilterByIp.class); private static final Map<String, Bucket> LOCAL_CACHE = new ConcurrentHashMap<>(); int capacity; int refillTokens; Duration refillDuration; public GatewayRateLimitFilterByIp () { } public GatewayRateLimitFilterByIp (int capacity, int refillTokens, Duration refillDuration) { this .capacity = capacity; this .refillTokens = refillTokens; this .refillDuration = refillDuration; } private Bucket createNewBucket () { Refill refill = Refill.greedy(refillTokens, refillDuration); Bandwidth limit = Bandwidth.classic(capacity, refill); return Bucket4j.builder().addLimit(limit).build(); } @Override public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { String ip = exchange.getRequest().getRemoteAddress().getAddress().getHostAddress(); Bucket bucket = LOCAL_CACHE.computeIfAbsent(ip, k -> createNewBucket()); log.info("IP:{} ,令牌桶可用的令牌数量:{} " , ip, bucket.getAvailableTokens()); if (bucket.tryConsume(1 )) { return chain.filter(exchange); } else { exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); return exchange.getResponse().setComplete(); } } @Override public int getOrder () { return -1000 ; } }
通过 Java 流式 API 的方式配置路由规则,其中 http://127.0.0.1:9091/sayHello/peter/
对应的是后端的服务,这里不再累述 1 2 3 4 5 6 7 8 9 10 11 12 13 @Configuration public class CommonConfiguration { @Bean public RouteLocator rateLimitFilterByIp (RouteLocatorBuilder builder) { return builder.routes() .route(r -> r.path("/rateLimit" ) .filters(f -> f.filter(new GatewayRateLimitFilterByIp(10 , 1 , Duration.ofSeconds(1 )))) .uri("http://127.0.0.1:9091/sayHello/peter/" ) .id("ratelimit_route" )) .build(); } }
1 2 3 4 5 6 server: port: 9090 spring: application: name: gateway-server
分别启动各个应用后,多次访问 http://127.0.0.1:9090/rateLimit
,可以看到控制台输出如下日志信息。当可用的令牌数量为 0 时,Spring Cloud Gateway 中自定义的限流过滤器开始拒绝处理请求,直接返回 429 状态码(因为请求太多,限流返回 429 状态码)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:10 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:9 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:8 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:7 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:7 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:6 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:5 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:4 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:3 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:2 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:2 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:1 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:0 c.s.s.filter.GatewayRateLimitFilterByIp : IP:127.0.0.1 ,令牌桶可用的令牌数量:0
基于 Sentinel 实现限流 基于 CPU 的使用率实现限流 在实际项目应用中对网关进行限流时,需要参考的因素比较多,可能会根据网络请求连接数、请求流量、CPU 使用率、内存使用率等进行流控。可以通过 Spring Boot Actuator 提供的 Metrics 获取当前 CPU 的使用情况,当 CPU 使用率高于某个阈值就开启限流,否则不开启限流。值得一提的是,在 Actuator 1.x 里可以通过 SystemPublicMetrics 来获取 CPU 的使用情况,但是在 Actuator 2.x 里只能通过 MetricsEndpoint 来获取。由于篇幅有限,下面只给出 Gateway Server 工程的核心代码和配置。
1 2 3 4 5 6 7 8 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-gateway</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-actuator</artifactId > </dependency >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Component public class GatewayRateLimitFilterByCpu implements GatewayFilter , Ordered { @Autowired private MetricsEndpoint metricsEndpoint; private static final double MAX_USAGE = 0.50D ; private static final String METRIC_NAME = "system.cpu.usage" ; private final Logger log = LoggerFactory.getLogger(GatewayRateLimitFilterByCpu.class); @Override public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { Double systemCpuUsage = metricsEndpoint.metric(METRIC_NAME, null ) .getMeasurements() .stream() .filter(Objects::nonNull) .findFirst() .map(MetricsEndpoint.Sample::getValue) .filter(Double::isFinite) .orElse(0.0D ); boolean isOpenRateLimit = systemCpuUsage > MAX_USAGE; log.info("system.cpu.usage: {}, isOpenRateLimit:{} " , systemCpuUsage, isOpenRateLimit); if (isOpenRateLimit) { exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); return exchange.getResponse().setComplete(); } else { return chain.filter(exchange); } } @Override public int getOrder () { return 0 ; } }
通过 Java 流式 API 的方式配置路由规则,其中 http://127.0.0.1:9091/sayHello/peter/
对应的是后端的服务,这里不再累述 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Configuration public class CommonConfiguration { @Autowired private GatewayRateLimitFilterByCpu gatewayRateLimitFilterByCpu; @Bean public RouteLocator customerRouteLocator (RouteLocatorBuilder builder) { return builder.routes() .route(r -> r.path("/rateLimit" ) .filters(f -> f.filter(gatewayRateLimitFilterByCpu)) .uri("http://127.0.0.1:9091/sayHello/peter/" ) .id("rateLimit_route" ) ).build(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 server: port: 9093 spring: application: name: gateway-server management: endpoints: web: exposure: include: '*' security: enabled: false
(1) Linux 系统下执行压测命令 sysbench cpu --cpu-max-prime=20000 --threads=8 --time=60 run
来模拟 CPU 高负载,其中 --threads
是指 CPU 核数,--time
是指运行时间(秒) (2) 访问 http://localhost:9093/actuator/metrics/system.cpu.usage
,查看网关服务所在机器的 CPU 使用情况 (3) 分别启动各个应用后,多次访问 http://127.0.0.1:9090/rateLimit
,当 CPU 使用率超过 50% 后,Spring Cloud Gateway 中自定义的限流过滤器开始拒绝处理请求,直接返回 429 状态码(因为请求太多,限流返回 429 状态码),控制台输出的日志信息如下:
1 2 3 4 5 c.s.s.f.GatewayRateLimitFilterByCpu : system.cpu.usage: 0.846045400926432, isOpenRateLimit:true c.s.s.f.GatewayRateLimitFilterByCpu : system.cpu.usage: 0.8458261370178468, isOpenRateLimit:true c.s.s.f.GatewayRateLimitFilterByCpu : system.cpu.usage: 0.844951044863364, isOpenRateLimit:true c.s.s.f.GatewayRateLimitFilterByCpu : system.cpu.usage: 0.8547458051590282, isOpenRateLimit:true c.s.s.f.GatewayRateLimitFilterByCpu : system.cpu.usage: 0.8486913849509269, isOpenRateLimit:true
基于 Gateway 内置的限流过滤器工厂 Spring Cloud Gateway 内置了一个名为 RequestRateLimiterGatewayFilterFactory 的过滤器工厂,可以直接用来限流;其底层的实现依赖于 Redis,使用的算法是令牌桶算法。由于篇幅有限,下面只给出 Gateway Server 工程的核心代码和配置。
1 2 3 4 5 6 7 8 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-gateway</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis-reactive</artifactId > </dependency >
编写 RemoteAddrKeyResolver 类 1 2 3 4 5 6 7 8 9 public class RemoteAddrKeyResolver implements KeyResolver { public static final String BEAN_NAME = "remoteAddrKeyResolver" ; @Override public Mono<String> resolve (ServerWebExchange exchange) { return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress()); } }
1 2 3 4 5 6 7 8 @Configuration public class CommonConfiguration { @Bean(RemoteAddrKeyResolver.BEAN_NAME) public RemoteAddrKeyResolver remoteAddrKeyResolver () { return new RemoteAddrKeyResolver(); } }
编写 application.yml
配置文件,添加 Gateway 限流相关的配置内容 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 server: port: 9092 spring: application: name: gateway-server redis: host: 172.175 .0 .3 port: 6379 cloud: gateway: routes: - id: rateLimit_route uri: http://127.0.0.1:9091/sayHello/peter/ order: 0 predicates: - Path=/rateLimit filters: - name: RequestRateLimiter args: key-resolver: "#{@remoteAddrKeyResolver}" redis-rate-limiter.replenishRate: 1 redis-rate-limiter.burstCapacity: 5
分别启动各个应用后,多次访问 http://127.0.0.1:9092/rateLimit
,可以发现当请求太过频繁的时候,Spring Cloud Gateway 会直接返回 429 状态码。
Gateway 的动态路由 网关中有两个重要的概念,那就是路由配置和路由规则。路由配置是指配置某请求路径路由到指定的目的地址,而路由规则是指匹配到路由配置之后,再根据路由规则进行转发处理。 Spring Cloud Gateway 作为所有请求流量的入口,在实际生产环境中为了保证高可靠和高可用,以及尽量避免重启,需要实现 Spring Cloud Gateway 动态路由配置。Spring Cloud Gateway 提供了两种方法来配置路由规则(Java 流式 API、YML 配置文件),但都是在 Spring Cloud Gateway 启动时将路由配置和规则加载到内存里,无法做到不重启网关应用就可以动态地对路由的配置和规则进行增加、修改和删除操作。Spring Cloud Gateway 的官方文档并没有讲如何进行动态配置,査看 Spring Cloud Gateway 的源码,发现在 org.springframework.cloud.gateway.actuate.GatewayControllerEndpoint
类中提供了动态配置的 Rest 接口,但是需要开启 Gateway 的端点,而且其提供的功能不是很强大。通过参考与 GatewayControllerEndpoint 相关的代码,可以自己编码实现动态路由配置。
基于 Rest API 的动态路由实现(内存版) 下面将介绍 Gateway 基于 Rest API 的动态路由实现,为了方便演示,下述示例的路由配置信息默认存储在内存;若需要持久化路由配置信息(如 MySQL 持久化),可以扩展实现 RouteDefinitionRepository 接口,点击下载 完整的案例代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-gateway</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-webflux</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-actuator</artifactId > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.47</version > </dependency >
定义数据传输模型,分别编写 GatewayRouteDefinition、GatewayPredicateDefinition、GatewayFilterDefinition 类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class GatewayRouteDefinition { private String id; private List<GatewayPredicateDefinition> predicates = new ArrayList<>(); private List<GatewayFilterDefinition> filters = new ArrayList<>(); private String uri; private int order = 0 ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class GatewayPredicateDefinition { private String name; private Map<String, String> args = new LinkedHashMap<>(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class GatewayFilterDefinition { private String name; private Map<String, String> args = new LinkedHashMap<>(); }
编写动态路由的实现类 DynamicRouteServicelmpl,需要实现 ApplicationEventPublisherAware 接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 @Service public class DynamicRouteServiceImpl implements ApplicationEventPublisherAware { private ApplicationEventPublisher publisher; @Autowired private RouteDefinitionWriter routeDefinitionWriter; private static final Logger logger = LoggerFactory.getLogger(DynamicRouteServiceImpl.class); @Override public void setApplicationEventPublisher (ApplicationEventPublisher applicationEventPublisher) { this .publisher = applicationEventPublisher; } private void notifyChanged () { this .publisher.publishEvent(new RefreshRoutesEvent(this )); } public boolean add (RouteDefinition definition) { try { routeDefinitionWriter.save(Mono.just(definition)).subscribe(); notifyChanged(); } catch (Exception e) { logger.error("add route fail: " + e.getMessage()); return false ; } return true ; } public boolean update (RouteDefinition definition) { try { this .routeDefinitionWriter.delete(Mono.just(definition.getId())); } catch (Exception e) { logger.error("update route fail: " + e.getMessage()); return false ; } try { routeDefinitionWriter.save(Mono.just(definition)).subscribe(); notifyChanged(); return true ; } catch (Exception e) { logger.error("update route fail: " + e.getMessage()); return false ; } } public boolean delete (String id) { try { this .routeDefinitionWriter.delete(Mono.just(id)).subscribe(); notifyChanged(); return true ; } catch (Exception e) { logger.error("delete route fail: " + e.getMessage()); return false ; } } public RouteDefinition assembleRouteDefinition (GatewayRouteDefinition gwdefinition) { RouteDefinition definition = new RouteDefinition(); definition.setId(gwdefinition.getId()); List<PredicateDefinition> pdList = new ArrayList<>(); for (GatewayPredicateDefinition gpDefinition : gwdefinition.getPredicates()) { PredicateDefinition predicate = new PredicateDefinition(); predicate.setArgs(gpDefinition.getArgs()); predicate.setName(gpDefinition.getName()); pdList.add(predicate); } definition.setPredicates(pdList); List<FilterDefinition> fdList = new ArrayList<>(); for (GatewayFilterDefinition gfDefinition : gwdefinition.getFilters()) { FilterDefinition filter = new FilterDefinition(); filter.setArgs(gfDefinition.getArgs()); filter.setName(gfDefinition.getName()); fdList.add(filter); } definition.setFilters(fdList); URI uri = UriComponentsBuilder.fromUriString(gwdefinition.getUri()).build().toUri(); definition.setUri(uri); return definition; } }
编写 Rest 控制器,对外暴露 Rest API 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @RestController @RequestMapping("/route") public class RouteController { @Autowired private DynamicRouteServiceImpl dynamicRouteService; @PostMapping("/add") public String add (@RequestBody GatewayRouteDefinition gwdefinition) { RouteDefinition definition = dynamicRouteService.assembleRouteDefinition(gwdefinition); return this .dynamicRouteService.add(definition) ? "success" : "fail" ; } @GetMapping("/delete/{id}") public String delete (@PathVariable String id) { return this .dynamicRouteService.delete(id) ? "success" : "fail" ; } @PostMapping("/update") public String update (@RequestBody GatewayRouteDefinition gwdefinition) { RouteDefinition definition = dynamicRouteService.assembleRouteDefinition(gwdefinition); return this .dynamicRouteService.update(definition) ? "success" : "fail" ; } }
1 2 3 4 5 6 7 @SpringBootApplication public class GatewayServerApplication { public static void main (String[] args) { SpringApplication.run(GatewayServerApplication.class, args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 server: port: 9090 spring: application: name: gateway-server management: endpoints: web: exposure: include: '*' security: enabled: false
(1) 启动 gateway 应用 (2) 访问 http://127.0.0.1:9090/actuator/gateway/routes
,此时返回的路由信息应该为空 []
(3) 通过 Postman 访问 http://127.0.0.1:9090/route/add
,发起 Post 请求添加路由配置信息,其中需要提交的 JSON 数据如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 { "filters" : [], "id" : "jd_route" , "order" : 0 , "predicates" : [ { "args" : { "pattern" : "/jd" }, "name" : "Path" } ], "uri" : "http://www.jd.com" }
(4) 再次访问 http://127.0.0.1:9090/actuator/gateway/routes
,此时应该可以返回上面添加的路由配置信息 (5) 访问 http://127.0.0.1:9090/jd
,发现可以正常跳转到京东商城的首页,说明上面添加的路由配置生效了 (6) 通过 Postman 访问 http://127.0.0.1:9090/route/update
,发起 Post 请求更改路由配置信息,其中需要提交的 JSON 数据如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 { "filters" : [], "id" : "jd_route" , "order" : 0 , "predicates" : [ { "args" : { "pattern" : "/jd" }, "name" : "Path" } ], "uri" : "http://www.taobao.com" }
(7) 访问 http://127.0.0.1:9090/actuator/gateway/routes
,可以发现返回的路由配置信息已经被修改了 (8) 访问 http://127.0.0.1:9090/jd
,发现可以成功跳转到淘宝网 (9) 通过 Postman 访问 http://127.0.0.1:9090/route/delete/jd_route
,发起 Get 请求删除路由配置信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 { "filters" : [ { "args" : { "name" : "hystrix" , "fallbackUri" : "forward:/fallback" }, "name" : "Hystrix" }, { "args" : {}, "name" : "RateLimit" } ], "id" : "jd_route" , "order" : 0 , "predicates" : [ { "args" : { "pattern" : "/jd" }, "name" : "Path" } ], "uri" : "http://www.jd.com" }
Gateway 集群下的动态路由实现 上面的示例简单地实现了单机 Gateway 的动态路由,单机 Gateway 中的路由配置信息保存在当前实例的内存中,实例重启后会丢失路由配置信息,同时无法做到整个 Gateway 集群的动态路由控制。通过分析 Spring Cloud Gateway 源码可以发现,默认的 RouteDefinitionWriter 实现类是 InMemoryRouteDefinitionRepository。而 RouteDefinitionRepository 继承了 RouteDefinitionWriter,是 Spring Cloud Gateway 官方预留的接口,因此可以通过下面两种方式来实现集群下的动态路由控制:RouteDefinitionWriter 接口和 RouteDefinitionRepository 接口。在这里推荐实现 RouteDefinitionRepository 这个接口,从数据库或者从配置中心获取路由进行动态配置;具体可以参考上面单机版的动态路由实现,在这里不再累述。
参考资料