Rate Limiting Spring Boot Endpoints: Single-Instance Approach

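When you build RESTful endpoints with Spring Boot, traffic beyond what the service can handle can make the system hang or crash. This is why degradation (fallback) and rate limiting exist. How can rate limiting be applied at the endpoint layer?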

1. Rate limiting

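Rate limiting means that under high concurrency or sudden traffic spikes, the system deliberately rejects some requests, or delays processing them, so that the service as a whole stays stable and available.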

1.1 Common approaches

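  • Algorithms: the token bucket, the leaky bucket, and the counter method are the three most commonly used rate-limiting algorithms.
  • Single instance

    Application-level rate limiting only limits requests inside one application instance; it cannot enforce a global limit.

    1. Limit total resources
    2. Limit total concurrency / connections / requests
    3. Limit the total concurrency / requests of a specific endpoint
    4. Limit the number of requests to an endpoint within a time window
    5. Smoothly limit the request rate of an endpoint
    6. Guava RateLimiter
  • Distributed

    A global limit requires distributed rate limiting and access-layer rate limiting:

    1. Lua scripts run with Redis + Lua
    2. Lua scripts run with Nginx + Lua
    3. OpenResty's open-source rate-limiting solutions
    4. Rate-limiting frameworks such as Sentinel, which provide degradation, rate limiting, and circuit breaking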
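Items 3 and 4 of the single-instance list (the concurrency of an endpoint, and its requests per time window) can be sketched in plain Java as follows. `FixedWindowCounter` and `ConcurrencyLimiter` are hypothetical names used only for illustration. Time is passed in explicitly so the behavior is easy to follow, and neither class is part of the Guava-based implementation in section 2.

```java
import java.util.concurrent.Semaphore;

/**
 * Counter method (fixed window): admits at most `limit` requests per window.
 * A new window begins at the first request after the previous window expired.
 * Hypothetical illustration class.
 */
class FixedWindowCounter {
    private final int limit;
    private final long windowMillis;
    private long windowStart;
    private int count;

    FixedWindowCounter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    /** Returns true if the request arriving at nowMillis is admitted. */
    synchronized boolean tryAcquire(long nowMillis) {
        if (nowMillis - windowStart >= windowMillis) {
            // The previous window has expired: start a new one and reset the counter
            windowStart = nowMillis;
            count = 0;
        }
        if (count < limit) {
            count++;
            return true;
        }
        return false; // quota for this window exhausted: reject
    }
}

/** Concurrency limit: at most maxConcurrent requests in flight at once (hypothetical). */
class ConcurrencyLimiter {
    private final Semaphore permits;

    ConcurrencyLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Non-blocking: returns false immediately if all slots are busy. */
    boolean tryEnter() {
        return permits.tryAcquire();
    }

    /** Must be called when a request admitted by tryEnter() finishes. */
    void exit() {
        permits.release();
    }
}
```

A fixed window is the simplest counter, but requests at the end of one window plus those at the start of the next can let nearly twice the limit through in a short span; token buckets such as Guava RateLimiter smooth this out.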

2. Implementation

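Main idea: an AOP aspect intercepts methods annotated with a custom @RateLimit annotation and applies Guava's RateLimiter, which implements the token bucket algorithm in two variants: smooth bursty limiting (SmoothBursty) and smooth warm-up limiting (SmoothWarmingUp). The aspect below calls RateLimiter.create(permitsPerSecond), which produces the SmoothBursty variant.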
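To make the token bucket idea behind SmoothBursty concrete, here is a minimal plain-Java token bucket. It is a simplified sketch, not Guava's implementation: Guava's stored-permit accounting and the warm-up curve of SmoothWarmingUp work differently. `TokenBucket` and its parameters are hypothetical, and time is passed in explicitly.

```java
/**
 * Minimal token bucket (hypothetical sketch, not Guava's code).
 * Tokens refill continuously at permitsPerSecond, up to capacity;
 * each request consumes one token or is rejected without waiting.
 */
class TokenBucket {
    private final double capacity;
    private final double permitsPerMilli;
    private double tokens;
    private long lastRefillMillis;

    TokenBucket(double permitsPerSecond, double capacity, long nowMillis) {
        this.capacity = capacity;
        this.permitsPerMilli = permitsPerSecond / 1000.0;
        this.tokens = capacity; // this sketch starts with a full bucket
        this.lastRefillMillis = nowMillis;
    }

    /** Non-blocking, like RateLimiter.tryAcquire(): true if a token was taken. */
    synchronized boolean tryAcquire(long nowMillis) {
        // Refill for the time elapsed since the last call, capped at capacity
        long elapsed = Math.max(0, nowMillis - lastRefillMillis);
        tokens = Math.min(capacity, tokens + elapsed * permitsPerMilli);
        lastRefillMillis = nowMillis;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

Because the bucket can hold up to `capacity` tokens, an idle period lets a short burst through, while the long-run rate stays at `permitsPerSecond`.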

2.1 Add the dependencies

<!-- spring-boot-starter-aop already pulls in aspectjweaver transitively;
     versions of the Spring Boot managed artifacts come from the Boot parent or BOM -->
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>30.0-jre</version>
</dependency>

2.2 Define the RateLimit annotation

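package org.example.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author xiaoyuge
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
    /** Permits per second, passed to RateLimiter.create(...) */
    int limit() default 10;
}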

2.3 Define the aspect

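import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.example.annotations.RateLimit;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author xiaoyuge
 */
@Slf4j
@Component
@Aspect
public class RateLimitAspect {

    // One RateLimiter per key (here: the method name), created lazily
    private final ConcurrentHashMap<String, RateLimiter> EXISTED_RATE_LIMIT = new ConcurrentHashMap<>();

    @Pointcut("@annotation(org.example.annotations.RateLimit)")
    public void rateLimit() {
    }

    @Around("rateLimit()")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        final RateLimit annotation = AnnotationUtils.findAnnotation(method, RateLimit.class);
        // Get or create the Guava RateLimiter; create(permitsPerSecond) returns a SmoothBursty limiter
        RateLimiter rateLimiter = EXISTED_RATE_LIMIT.computeIfAbsent(method.getName(),
                k -> RateLimiter.create(annotation.limit()));
        // tryAcquire() never waits: it returns false at once if no permit is available
        if (rateLimiter.tryAcquire()) {
            return pjp.proceed();
        }
        // BusinessException is the class from section 2.4 (import it from your own package),
        // so GlobalExceptionHandler returns this message instead of a generic 500 description
        throw new BusinessException("too many requests, please try again later...");
    }
}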
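The aspect creates each RateLimiter lazily through `ConcurrentHashMap.computeIfAbsent`. Whether concurrent first requests can create more than one limiter for the same key can be checked with a small plain-Java experiment. In this sketch a plain `Object` stands in for `RateLimiter.create(...)` so it runs without Guava, and `ComputeIfAbsentCheck` is a hypothetical name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Counts how often the factory runs when many threads hit computeIfAbsent at once. */
class ComputeIfAbsentCheck {
    static int factoryCalls(int threads) {
        ConcurrentHashMap<String, Object> limiters = new ConcurrentHashMap<>();
        AtomicInteger created = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                try {
                    start.await(); // release all threads together
                    limiters.computeIfAbsent("limit1", k -> {
                        created.incrementAndGet(); // stands in for RateLimiter.create(...)
                        return new Object();
                    });
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        start.countDown();
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return created.get();
    }
}
```

The `ConcurrentHashMap` Javadoc states that `computeIfAbsent` is performed atomically and applies the mapping function at most once per key, so `factoryCalls` should return 1 regardless of the thread count.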

2.4 Define the exception class

@Slf4j
public class BusinessException extends RuntimeException {

    public BusinessException() {
        super();
    }

    public BusinessException(final String message) {
        super(message);
    }

    public BusinessException(final String message, final Throwable cause) {
        super(message, cause);
    }

    public BusinessException(final Throwable cause) {
        super(cause);
    }

    protected BusinessException(final String message, final Throwable cause, boolean enableSuppression,
                                boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}

2.5 Define the global exception handler

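import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestControllerAdvice;

// ResponseResult and ResponseStatus are the project's own response wrapper
// classes; they are not shown in this article.
@Slf4j
@RestControllerAdvice
public class GlobalExceptionHandler {

    // Handles BusinessException and returns its message to the client
    @ResponseBody
    @ExceptionHandler(BusinessException.class)
    public ResponseResult<BusinessException> processBusinessException(BusinessException businessException) {
        log.error(businessException.getLocalizedMessage());
        return ResponseResult.fail(null, businessException.getLocalizedMessage() == null
                ? ResponseStatus.HTTP_STATUS_500.getDescription()
                : businessException.getLocalizedMessage());
    }

    // Fallback for every other exception: log it and return a generic 500 description
    @ResponseBody
    @ExceptionHandler(Exception.class)
    public ResponseResult<Exception> processException(Exception exception) {
        log.error(exception.getLocalizedMessage(), exception);
        return ResponseResult.fail(null, ResponseStatus.HTTP_STATUS_500.getDescription());
    }
}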

2.6 Controller endpoints

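import lombok.extern.slf4j.Slf4j;
import org.example.annotations.RateLimit;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author xiaoyuge
 */
@Slf4j
@RestController
public class RateLimitController {

    // Default limit: 10 permits per second
    @RateLimit
    @GetMapping("/limit")
    public ResponseResult<String> limit() {
        log.info("limit");
        return ResponseResult.success();
    }

    // Custom limit: 5 permits per second
    @RateLimit(limit = 5)
    @GetMapping("/limit1")
    public ResponseResult<String> limit1() {
        log.info("limit1");
        return ResponseResult.success();
    }

    // Not annotated, so not rate limited
    @GetMapping("/nolimit")
    public ResponseResult<String> noRateLimiter() {
        log.info("no limit");
        return ResponseResult.success();
    }
}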

2.7 Test class

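import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

// DEFINED_PORT starts a real server on server.port (8080 by default);
// the default MOCK environment does not listen on localhost:8080
@SpringBootTest(classes = InterfaceLimitApplication.class,
        webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
@RunWith(SpringRunner.class)
public class AppTest {

    @Test
    public void test() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(10);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        RestTemplate restTemplate = new RestTemplate();
        // Fire 10 concurrent requests at /limit1 (limit = 5 permits per second)
        IntStream.range(0, 10).forEach(i -> executorService.submit(() -> {
            try {
                System.out.println(restTemplate.getForObject("http://localhost:8080/limit1", ResponseResult.class));
            } finally {
                latch.countDown(); // count down even if the request throws
            }
        }));

        latch.await();
        executorService.shutdown();
    }
}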

2.8 Problems with the approach above

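  1. First, in EXISTED_RATE_LIMIT.computeIfAbsent(method.getName(), k -> RateLimiter.create(annotation.limit())), the key is method.getName(), so limits are applied per method name. That is not appropriate: same-named annotated methods in different classes would share one limiter, so the key should at least include the class name.
  2. Second, the concern that concurrent first requests (while EXISTED_RATE_LIMIT is still empty) could create and register several RateLimiter instances for the same key does not apply here: ConcurrentHashMap.computeIfAbsent performs the whole call atomically, so RateLimiter.create(annotation.limit()) runs at most once per key. The race would be a real bug only with a non-atomic check-then-put (get, then put) on the map.
  3. Third, limiting by method name alone is often not enough; many cases need limits by client IP plus method name, or by other custom dimensions.
  4. Finally, the parameters needed for other scenarios still have to be abstracted and encapsulated.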
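For points 1 and 3, a limiter key that includes the class name, and optionally the client IP, could be built as below. `RateLimitKeys`, `methodKey` and `ipMethodKey` are hypothetical names, not part of the original code, and obtaining the client IP (for example via `HttpServletRequest.getRemoteAddr()` when there is no proxy in front) is left out.

```java
import java.lang.reflect.Method;

/** Hypothetical helper for building rate-limit keys. */
final class RateLimitKeys {
    private RateLimitKeys() {
    }

    /**
     * Class name + method name, e.g. "org.example.Foo#bar", so same-named methods
     * in different classes get separate limiters. Overloads in one class still
     * share a key; append the parameter types if that matters.
     */
    static String methodKey(Method method) {
        return method.getDeclaringClass().getName() + "#" + method.getName();
    }

    /** Per-client key: each IP gets its own limiter for the method. */
    static String ipMethodKey(String clientIp, Method method) {
        return clientIp + "@" + methodKey(method);
    }
}
```

Inside the aspect, the lookup would then become `EXISTED_RATE_LIMIT.computeIfAbsent(RateLimitKeys.methodKey(method), k -> RateLimiter.create(annotation.limit()))`. Note that per-IP keys grow the map with every new client, so a real implementation would need some eviction.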