保障服务稳定的三大利器:熔断降级、服务限流和故障模拟。今天和大家谈谈限流算法的几种实现方式,本文所说的限流并非是Nginx层面的限流,而是业务代码中的逻辑限流。
限流的作用
由于 API 接口无法控制调用方的行为,因此当遇到瞬时请求量激增时,会导致接口占用过多服务器资源,使得其他请求响应速度降低或是超时,更有甚者可能导致服务器宕机。
限流 (Ratelimiting) 指对应用服务的请求进行限制,例如某一接口的请求限制为 100 个每秒, 对超过限制的请求则进行快速失败或丢弃。
限流可以应对:
- 热点业务带来的突发请求;
- 调用方 bug 导致的突发请求;
- 恶意攻击请求。
为什么要分布式限流
当应用为单点应用时,只要应用进行了限流,那么应用所依赖的各种服务也都得到了保护。
但线上业务出于各种原因考虑,多是分布式系统,单节点的限流仅能保护自身节点,但无法保护应用依赖的各种服务,并且在进行节点扩容、缩容时也无法准确控制整个服务的请求限制。
而如果实现了分布式限流,那么就可以方便地控制整个服务集群的请求限制,且由于整个集群的请求数量得到了限制,因此服务依赖的各种资源也得到了限流的保护。
代码实现
使用注解实现限流,在需要限流的方法上添加注解以及参数即可
新建一个SpringBoot工程
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.1-jre</version>
</dependency>
</dependencies>
限流的Lua脚本,在resources下
为什么会使用lua脚本,限流大多数发生在高并发场景下,redis执行lua脚本的时候是原子性。
-- 获取方法签名特征
local methodKey = KEYS[1]
-- 调用脚本的传入的限流大小
local limit = tonumber(ARGV[1])
local timeout = tonumber(ARGV[2])
-- 获取该方法的流量大小,默认0
local count = tonumber(redis.call('get', methodKey) or "0")
-- 是否超出限流阈值
if count + 1 > limit then
-- 超过阈值
return false
else
-- 累加阈值
redis.call("INCRBY", methodKey, 1)
redis.call("PEXPIRE", methodKey, timeout)
return true
end
Redis加载Lua脚本
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
/**
* @author fangxi
*/
@Configuration
public class RedisScriptConfig {
@Bean
public DefaultRedisScript<Boolean> redisScript() {
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
// 脚本位置
redisScript.setLocation(new ClassPathResource("ratelimiter.lua"));
// 脚本的返回值,这里返回 boolean
redisScript.setResultType(Boolean.class);
return redisScript;
}
}
限流具体实现
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import java.util.Collections;
/**
* @author fangxi
* 限流,拦截用户请求
*/
@Slf4j
@Component
public class AccessLimiterHandler {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* lua脚本
*/
@Autowired
private RedisScript<Boolean> rateLimitLua;
/**
* @param key 方法
* @param limit 限流个数,默认每秒的限流个数
*/
public void limitAccess(String key, Integer limit, Long pexpire) {
// 执行Lua脚本, Collections.singletonList(key) lua脚本中的key
boolean acquired = redisTemplate.execute(rateLimitLua, Collections.singletonList(key), limit.toString(), pexpire.toString());
if (!acquired) {
// 被拦截了
log.error("Your access is blocked, key: {}", key);
throw new RuntimeException("Your access is blocked");
}
}
}
新建一个注解
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;
/**
* @author fangxi
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AccessLimiter {
/**
* 限流方法,redis中的key。默认是方法签名
*/
String methodKey() default "";
/**
* 单位时间内允许的请求
*/
int limit() default 10;
/**
* 时间单位
*/
TimeUnit unit() default TimeUnit.SECONDS;
/**
* 时间
*/
long timeout() default 1;
}
使用aop对标注注解的方法拦截
import com.storyhasyou.example.ratelimiter.annotation.AccessLimiter;
import com.storyhasyou.example.ratelimiter.limiter.AccessLimiterHandler;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @author fangxi
*/
@Aspect
@Slf4j
@Component
public class AccessLimiterAspect {
@Autowired
private AccessLimiterHandler accessLimiterHandler;
public static final String PREFIX = "rate:limiter:";
@Before("@annotation(accessLimiter)")
public void before(JoinPoint joinPoint, AccessLimiter accessLimiter) {
String key = accessLimiter.methodKey();
int limit = accessLimiter.limit();
long timeout = accessLimiter.timeout();
TimeUnit timeUnit = accessLimiter.unit();
long pexpire = timeUnit.toMillis(timeout);
if (StringUtils.isEmpty(key)) {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
String name = signature.getName();
Class<?>[] parameterTypes = signature.getParameterTypes();
key = name + Stream.of(parameterTypes).map(Class::getName).collect(Collectors.joining(",", "(", ")"));
// log.info("key = {}", key);
}
accessLimiterHandler.limitAccess(PREFIX + key, limit, pexpire);
}
}
使用注解
@RestController
public class RateLimiterController {
// 一分钟限流10个请求
@GetMapping("/limit")
@AccessLimiter(limit = 10, timeout = 1, unit = TimeUnit.MINUTES)
public ResponseEntity<Void> limit(String name) {
return ResponseEntity.ok().build();
}
}