SpringCloud 大型系列课程正在制作中,欢迎大家关注与提意见。
程序员每天的CV 与 板砖,也要知其所以然,本系列课程可以帮助初学者学习 SpringBooot 项目开发 与 SpringCloud 微服务系列项目开发
本文章是系列文章中的一篇
在高并发的系统中,往往需要在系统中做限流,常见的限流方式:
- Hystrix适用线程池隔离,超过线程池的负载,走熔断的逻辑
- tomcat容器 通过限制它的线程数
- 通过时间窗口的平均速度来控制流量
常见的限流算法
- 计数器算法,一般会限制一秒钟的能够通过的请求数
- 漏桶算法, 算法内部有一个容器,类似生活用到的漏斗,当请求进来时,相当于水倒入漏斗,然后从下端小口慢慢匀速的流出。不管上面流量多大,下面流出的速度始终保持不变
- 令牌桶算法,令牌桶算法是对漏桶算法的一种改进,桶算法能够限制请求调用的速率,而令牌桶算法能够在限制调用的平均速率的同时还允许一定程度的突发调用
本文章实现的是 redis 的限流,无熔断功能
1 在网关项目配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
实现KeyResolver接口,UriKeyResolver是对URI进行限流。
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@Component
public class UriKeyResolver implements KeyResolver {
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
final String path = exchange.getRequest().getURI().getPath();
System.out.println(path);
return Mono.just(path);
}
}
2 配置指定接口的限制
比如我这里配置 order-service 的接口访问限流
server:
port: 10001
spring:
application:
name: '@project.name@'
redis:
database: 0 # Redis数据库索引(默认为0)
host: localhost # Redis服务器地址
port: 6379 # Redis服务器连接端口
password: 12345678 # Redis服务器连接密码(默认为空)
cloud:
nacos:
server-addr: localhost:8848 # nacos地址
gateway:
routes: # 网关路由配置
- id: order-service # 路由id,自定义,只要唯一即可
# uri: http://127.0.0.1:8081 # 路由的目标地址 http就是固定地址
uri: lb://order-service
predicates:
- Path=/order/**
filters:
- name: RequestRateLimiter
args:
# 每秒处理多少个平均请求数
redis-rate-limiter.replenishRate: 1
# 令允许在一秒钟内完成的最大请求数
redis-rate-limiter.burstCapacity: 1
# 获取 Bean 对象,@Bean的标识,默认bean名称与方法名一样。
key-resolver: "#{@uriKeyResolver}"
-
key-resolver 字段中 “#{@uriKeyResolver}” 对应上述配置的UriKeyResolver。
然后我这里配置的限流规则是 1秒处理1个平均请求数,然后我使用postman 访问order 接口来查询订单详情 -
name 字段必须为RequestRateLimiter
连续调用两次,第一次可以正常访问,第二次就会出现 429 错误
3 自定义 429 返回错误
限流过滤器编写在RequestRateLimiterGatewayFilterFactory中,Gateway中配置的RequestRateLimiter正是此过滤器去掉后缀后的结果,所以只需重写此过滤器即可。
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@Slf4j
@Component
public class GatewayRequestRateLimiterGatewayFilterFactory extends RequestRateLimiterGatewayFilterFactory {
private final RateLimiter defaultRateLimiter;
private final KeyResolver defaultKeyResolver;
public GatewayRequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) {
super(defaultRateLimiter, defaultKeyResolver);
this.defaultRateLimiter = defaultRateLimiter;
this.defaultKeyResolver = defaultKeyResolver;
}
@Override
public GatewayFilter apply(Config config) {
KeyResolver resolver = getOrDefault(config.getKeyResolver(), defaultKeyResolver);
RateLimiter<Object> limiter = getOrDefault(config.getRateLimiter(), defaultRateLimiter);
return (exchange, chain) -> resolver.resolve(exchange).flatMap(key -> {
String routeId = config.getRouteId();
if (routeId == null) {
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
routeId = route.getId();
}
String finalRouteId = routeId;
return limiter.isAllowed(routeId, key).flatMap(response -> {
for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
}
if (response.isAllowed()) {
return chain.filter(exchange);
}
log.warn("已限流: {}", finalRouteId);
ServerHttpResponse httpResponse = exchange.getResponse();
//修改code为500
httpResponse.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
if (!httpResponse.getHeaders().containsKey("Content-Type")) {
httpResponse.getHeaders().add("Content-Type", "application/json");
}
//此处无法触发全局异常处理,手动返回
DataBuffer buffer = httpResponse.bufferFactory().wrap(("{\n"
+ " \"code\": \"1414\","
+ " \"message\": \"服务器限流\","
+ " \"data\": \"Server throttling\","
+ " \"success\": false"
+ "}").getBytes(StandardCharsets.UTF_8));
return httpResponse.writeWith(Mono.just(buffer));
});
});
}
private <T> T getOrDefault(T configValue, T defaultValue) {
return (configValue != null) ? configValue : defaultValue;
}
}
然后修改限流配置,修改 name: GatewayRequestRateLimiter
server:
port: 10001
spring:
application:
name: '@project.name@'
redis:
database: 0 # Redis数据库索引(默认为0)
host: localhost # Redis服务器地址
port: 6379 # Redis服务器连接端口
password: 12345678 # Redis服务器连接密码(默认为空)
cloud:
nacos:
server-addr: localhost:8848 # nacos地址
gateway:
routes: # 网关路由配置
- id: order-service # 路由id,自定义,只要唯一即可
# uri: http://127.0.0.1:8081 # 路由的目标地址 http就是固定地址
uri: lb://order-service
predicates:
- Path=/order/**
filters:
- name: GatewayRequestRateLimiter
args:
# 每秒处理多少个平均请求数
redis-rate-limiter.replenishRate: 1
# 令允许在一秒钟内完成的最大请求数
redis-rate-limiter.burstCapacity: 1
# 获取 Bean 对象,@Bean的标识,默认bean名称与方法名一样。
key-resolver: "#{@uriKeyResolver}"
然后再次测试限流