Springboot开发Restful接口时,当流量超过服务极限能力时,系统可能会出现卡死、崩溃的情况,所以就有了降级和限流。在接口层如何做限流?
1. 限流
限流其实就是:当高并发或者瞬时高并发时,为了保证系统的稳定性、可用性,系统以牺牲部分请求为代价或者延迟处理请求为代价,保证系统整体服务可用
1.1 限流常见思路
- 从算法上看: 令牌桶(Token Bucket)、漏桶(leaky bucket) 和计数器法时最常用的三种限流算法
2. 实现思路
主要思路:AOP拦截自定义的RateLimit注解,在AOP中通过Guava RateLimiter(提供了令牌桶算法实现):
平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)实现。
2.1 引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>30.0-jre</version> </dependency>
|
2.2 定义RateLimit注解
1 2 3 4 5 6 7 8
|
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RateLimit { int limit() default 10; }
|
2.3 定义AOP
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
@Slf4j @Component @Aspect public class RateLimitAspect {
private final ConcurrentHashMap<String, RateLimiter> EXISTED_RATE_LIMIT = new ConcurrentHashMap<>();
@Pointcut("@annotation(org.example.annotations.RateLimit)") public void rateLimit() { }
@Around("rateLimit()") public Object around(ProceedingJoinPoint pjp) throws Throwable { MethodSignature signature = (MethodSignature) pjp.getSignature(); Method method = signature.getMethod(); final RateLimit annotation = AnnotationUtils.findAnnotation(method, RateLimit.class); RateLimiter rateLimiter = EXISTED_RATE_LIMIT.computeIfAbsent(method.getName(), k -> RateLimiter.create(annotation.limit())); if (rateLimiter.tryAcquire()){ return pjp.proceed(); }else{ throw new RuntimeException("too many requests,please try again later..."); } } }
|
2.4 定义异常类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Slf4j public class BusinessException extends RuntimeException {
public BusinessException() { super(); }
public BusinessException(final String message) { super(message); }
public BusinessException(final String message, final Throwable cause) { super(message, cause); }
public BusinessException(final Throwable cause) { super(cause); }
protected BusinessException(final String message, final Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } }
|
2.5 定义异常处理类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Slf4j @RestControllerAdvice public class GlobalExceptionHandler { @ResponseBody @ExceptionHandler(BusinessException.class) public ResponseResult<BusinessException> processBusinessException(BusinessException businessException) { log.error(businessException.getLocalizedMessage()); return ResponseResult.fail(null, businessException.getLocalizedMessage() == null ? ResponseStatus.HTTP_STATUS_500.getDescription() : businessException.getLocalizedMessage()); } @ResponseBody @ExceptionHandler(Exception.class) public ResponseResult<Exception> processException(Exception exception) { log.error(exception.getLocalizedMessage(), exception); return ResponseResult.fail(null, ResponseStatus.HTTP_STATUS_500.getDescription()); } }
|
2.6 Controller 接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
@Slf4j @RestController public class RateLimitController {
@RateLimit @GetMapping("/limit") public ResponseResult<String> limit(){ log.info("limit"); return ResponseResult.success(); }
@RateLimit(limit = 5) @GetMapping("/limit1") public ResponseResult<String> limit1() { log.info("limit1"); return ResponseResult.success(); } @GetMapping("/nolimit") public ResponseResult<String> noRateLimiter() { log.info("no limit"); return ResponseResult.success(); } }
|
2.7 定义测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @SpringBootTest(classes = InterfaceLimitApplication.class) @RunWith(SpringRunner.class) public class AppTest {
@Test public void test() throws InterruptedException { CountDownLatch latch = new CountDownLatch(10); ExecutorService executorService = Executors.newFixedThreadPool(10); IntStream.range(0, 10).forEach(i -> { executorService.submit(() -> { RestTemplate restTemplate = new RestTemplate(); restTemplate.getForObject("http://localhost:8080/limit1", ResponseResult.class); latch.countDown(); }); });
latch.await(); executorService.shutdown(); } }
|
2.8 上面方案的问题点
- 首先, EXISTED_RATE_LIMITERS.computeIfAbsent(method.getName(), k -> RateLimiter.create(annotation.limit())) 这行代码中 method.getName()表明是对方法名进行限流的,其实并不合适,应该需要至少加上类名;
- 其次, 如果首次运行时访问的请求是一次性涌入的,即EXISTED_RATE_LIMITERS还是空的时候并发请求@RateLimit接口,那么RateLimiter.create(annotation.limit())是会重复创建并加入到EXISTED_RATE_LIMITERS的,这是明显的bug;
- 再者, 上述实现方式按照方法名去限定请求量,对于很多情况下至少需要支持按照IP和方法名,或者其它自定义的方式进行限流。
- 其它一些场景支持的参数抽象和封装等#