Redis + Lua脚本实现服务限流


保障服务稳定的三大利器:熔断降级、服务限流和故障模拟。今天和大家谈谈限流算法的几种实现方式,本文所说的限流并非是Nginx层面的限流,而是业务代码中的逻辑限流。

限流的作用

由于 API 接口无法控制调用方的行为,因此当遇到瞬时请求量激增时,会导致接口占用过多服务器资源,使得其他请求响应速度降低或是超时,更有甚者可能导致服务器宕机。
限流 (Ratelimiting) 指对应用服务的请求进行限制,例如某一接口的请求限制为 100 个每秒, 对超过限制的请求则进行快速失败或丢弃。
限流可以应对:

  • 热点业务带来的突发请求;
  • 调用方 bug 导致的突发请求;
  • 恶意攻击请求。

为什么要分布式限流

当应用为单点应用时,只要应用进行了限流,那么应用所依赖的各种服务也都得到了保护。

但线上业务出于各种原因考虑,多是分布式系统,单节点的限流仅能保护自身节点,但无法保护应用依赖的各种服务,并且在进行节点扩容、缩容时也无法准确控制整个服务的请求限制。

而如果实现了分布式限流,那么就可以方便地控制整个服务集群的请求限制,且由于整个集群的请求数量得到了限制,因此服务依赖的各种资源也得到了限流的保护。

代码实现

使用注解实现限流,在需要限流的方法上添加注解以及参数即可

新建一个SpringBoot工程

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>28.1-jre</version>
    </dependency>
</dependencies>

限流的Lua脚本,在resources下
为什么会使用lua脚本,限流大多数发生在高并发场景下,redis执行lua脚本的时候是原子性。

-- 获取方法签名特征
local methodKey = KEYS[1]

-- 调用脚本的传入的限流大小
local limit = tonumber(ARGV[1])
local timeout = tonumber(ARGV[2])

-- 获取该方法的流量大小,默认0
local count = tonumber(redis.call('get', methodKey) or "0")

-- 是否超出限流阈值
if count + 1 > limit then
    -- 超过阈值
    return false
else
    -- 累加阈值
    redis.call("INCRBY", methodKey, 1)
    redis.call("PEXPIRE", methodKey, timeout)
    return true
end

Redis加载Lua脚本

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;

/**
 * @author fangxi
 */
@Configuration
public class RedisScriptConfig {

    @Bean
    public DefaultRedisScript<Boolean> redisScript() {
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        // 脚本位置
        redisScript.setLocation(new ClassPathResource("ratelimiter.lua"));
        // 脚本的返回值,这里返回 boolean
        redisScript.setResultType(Boolean.class);
        return redisScript;
    }

}

限流具体实现

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.util.Collections;

/**
 * @author fangxi
 *  限流,拦截用户请求
 */
@Slf4j
@Component
public class AccessLimiterHandler {

    @Autowired
    private StringRedisTemplate redisTemplate;
    /**
     * lua脚本
     */
    @Autowired
    private RedisScript<Boolean> rateLimitLua;

    /**
     * @param key 方法
     * @param limit 限流个数,默认每秒的限流个数
     */
    public void limitAccess(String key, Integer limit, Long pexpire) {
        // 执行Lua脚本, Collections.singletonList(key) lua脚本中的key
        boolean acquired = redisTemplate.execute(rateLimitLua, Collections.singletonList(key), limit.toString(), pexpire.toString());
        if (!acquired) {
            // 被拦截了
            log.error("Your access is blocked, key: {}", key);
            throw new RuntimeException("Your access is blocked");
        }
    }

}

新建一个注解

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;

/**
 * @author fangxi
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AccessLimiter {

    /**
     * 限流方法,redis中的key。默认是方法签名
     */
    String methodKey() default "";

    /**
     * 单位时间内允许的请求
     */
    int limit() default 10;

    /**
     * 时间单位
     */
    TimeUnit unit() default TimeUnit.SECONDS;

    /**
     * 时间
     */
    long timeout() default 1;

}

使用aop对标注注解的方法拦截

import com.storyhasyou.example.ratelimiter.annotation.AccessLimiter;
import com.storyhasyou.example.ratelimiter.limiter.AccessLimiterHandler;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * @author fangxi
 */
@Aspect
@Slf4j
@Component
public class AccessLimiterAspect {

    @Autowired
    private AccessLimiterHandler accessLimiterHandler;
    public static final String PREFIX = "rate:limiter:";

    @Before("@annotation(accessLimiter)")
    public void before(JoinPoint joinPoint, AccessLimiter accessLimiter) {
        String key = accessLimiter.methodKey();
        int limit = accessLimiter.limit();
        long timeout = accessLimiter.timeout();
        TimeUnit timeUnit = accessLimiter.unit();
        long pexpire = timeUnit.toMillis(timeout);
        if (StringUtils.isEmpty(key)) {
            MethodSignature signature = (MethodSignature) joinPoint.getSignature();
            String name = signature.getName();
            Class<?>[] parameterTypes = signature.getParameterTypes();
            key = name + Stream.of(parameterTypes).map(Class::getName).collect(Collectors.joining(",", "(", ")"));
            // log.info("key = {}", key);
        }
        accessLimiterHandler.limitAccess(PREFIX + key, limit, pexpire);
    }
}

使用注解

@RestController
public class RateLimiterController {

	// 一分钟限流10个请求
    @GetMapping("/limit")
    @AccessLimiter(limit = 10, timeout = 1, unit = TimeUnit.MINUTES)
    public ResponseEntity<Void> limit(String name) {
        return ResponseEntity.ok().build();
    }

}

Author: Re:0
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint policy. If reproduced, please indicate source Re:0 !
 Previous
组合模式 组合模式
定义这节我们将介绍一种全新的设计模式——组合模式。想起“组合”二字,自然联想到了很多,比如:文件和文件夹、容器和组件、火车和车厢、大树的枝干和叶子等等,大自然中组合的例子数不胜数。
2022-03-09
Next 
过滤器模式 过滤器模式
定义模式引入首先,什么是过滤器模式?这种模式生活中比较常见,比如移动推出某项优惠套餐,但是套餐可使用的用户群体有限,必须满足入网 5 年以上这种条件,我们可以将 “入网五年” 作为客户群体的过滤条件,这种就是简单的过滤器模式应用。
2022-03-08
  TOC