Redis——Redis扩展应用限流解决方案原理

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6


摘要

限流算法在分布式领域是一个经常被提起的话题,当系统的处理能力有限时,如何阻止计划外的请求继续对系统施压,这是一个需要重视的问题。在这里用 “断尾求生” 形容限流背后的思想,当然还有很多成语也表达了类似的意思,如弃卒保车、壮士断腕等等。除了控制流量,限流还有一个应用目的是用于控制用户行为,避免垃圾请求。如在UGC 社区,用户的发帖、回复、点赞等行为都要严格受控,一般要严格限定某行为在规定时间内允许的次数,超过次数那就是非法行为。对非法行为,业务必须规定适当的惩处策略。

一、限流算法原理

代码的实现:https://gitee.com/xjl2462612540/RedisPrinciple/tree/master/springboot-redis/src/main/java/com/zhuangxiaoyan/springbootredis/cell

1.1 计数器算法

计数器算法是使用计数器在周期内累加访问次数,当达到设定的限流值时,触发限流策略。下一个周期开始时,进行清零,重新计数。此算法在单机还是分布式环境下实现都非常简单,使用redis的incr原子自增性和线程安全即可轻松实现。

Redis——Redis扩展应用限流解决方案原理_限流

 这个算法通常用于QPS限流和统计总访问量,对于秒级以上的时间周期来说,会存在一个非常严重的问题,那就是临界问题,如下图:

Redis——Redis扩展应用限流解决方案原理_限流_02

1.2 滑动窗口的算法

滑动窗口算法是将时间周期分为N个小周期,分别记录每个小周期内访问次数,并且根据时间滑动删除过期的小周期。如下图,假设时间周期为1min,将1min再分为2个小周期,统计每个小周期的访问数量,则可以看到,第一个时间周期内,访问数量为75,第二个时间周期内,访问数量为100,超过100的访问则被限流掉了  

Redis——Redis扩展应用限流解决方案原理_redis_03

由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。此算法可以很好的解决固定窗口算法的临界问题。

1.3 漏桶算法

漏桶算法是访问请求到达时直接放入漏桶,如当前容量已达到上限(限流值),则进行丢弃(触发限流策略)。漏桶以固定的速率进行释放访问请求(即请求通过),直到漏桶为空。

1.4 令牌桶算法

令牌桶算法是程序以r(r=时间周期/限流值)的速度向令牌桶中增加令牌,直到令牌桶满,请求到达时向令牌桶请求令牌,如获取到令牌则通过请求,否则触发限流策略

Redis——Redis扩展应用限流解决方案原理_限流_04

二、限流算法实现与实战

2.1 Redis的zset滑动时间窗口实战

这个限流需求中存在一个滑动时间窗口,想想zset 数据结构的 score 值,是不是可以通过 score 来圈出这个时间窗口来。而且我们只需要保留这个时间窗口,窗口之外的数据都可以砍掉。那这个 zset 的 value 填什么比较合适呢?它只需要保证唯一性即可,用 uuid 会比较浪费空间,那就改用毫秒时间戳吧。如图所示,用一个 zset 结构记录用户的行为历史,每一个行为都会作为 zset 中的一个key 保存下来。同一个用户同一种行为用一个 zset 记录。为节省内存,我们只需要保留时间窗口内的行为记录,同时如果用户是冷用户,滑动时间窗口内的行为是空记录,那么这个 zset 就可以从内存中移除,不再占用空间。zset 集合中只有 score 值非常重要, value 值没有特别的意义,只需要保证它是唯一的就可以了。因为这几个连续的 Redis 操作都是针对同一个 key 的,使用 pipeline 可以显著提升Redis 存取效率。但这种方案也有缺点,因为它要记录时间窗口内所有的行为记录,如果这个量很大,比如限定 60s 内操作不得超过 100w 次这样的参数,它是不适合做这样的限流的,因为会消耗大量的存储空间。

Redis——Redis扩展应用限流解决方案原理_redis_05

public class SimpleRateLimiter {
    private Jedis jedis;
    public SimpleRateLimiter(Jedis jedis) {
        this.jedis = jedis;
    }
    public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
        String key = String.format("hist:%s:%s", userId, actionKey);
        long nowTs = System.currentTimeMillis();
        Pipeline pipe = jedis.pipelined();
        pipe.multi();
        pipe.zadd(key, nowTs, "" + nowTs);
        pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
        Response<Long> count = pipe.zcard(key);
        pipe.expire(key, period + 1);
        pipe.exec();
        pipe.close();
        return count.get() <= maxCount;
    }
    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
        for(int i=0;i<20;i++) {
            System.out.println(limiter.isActionAllowed("laoqian", "reply", 60, 5));
        }
    }
}

整体思路就是:每一个行为到来时,都维护一次时间窗口。将时间窗口外的记录全部清理掉,只保留窗口内的记录。zset 集合中只有 score 值非常重要,value 值没有特别的意义,只需要保证它是唯一的就可以了。因为这几个连续的 Redis 操作都是针对同一个 key 的,使用 pipeline 可以显著提升 Redis 存取效率。但这种方案也有缺点,因为它要记录时间窗口内所有的行为记录,如果这个量很大,比如限定 60s 内操作不得超过 100w 次这样的参数,它是不适合做这样的限流的,因为会消耗大量的存储空间。

2.2 Redis令牌桶限流实现与实战

# coding: utf8
import time


class Funnel(object):
    def __init__(self, capacity, leaking_rate):
        self.capacity = capacity  # 漏斗容量
        self.leaking_rate = leaking_rate  # 漏嘴流水速率
        self.left_quota = capacity  # 漏斗剩余空间
        self.leaking_ts = time.time()  # 上一次漏水时间

    def make_space(self):
        now_ts = time.time()
        delta_ts = now_ts - self.leaking_ts  # 距离上一次漏水过去了多久
        delta_quota = delta_ts * self.leaking_rate  # 又可以腾出不少空间了
        if delta_quota < 1:  # 腾的空间太少,那就等下次吧
            return
        self.left_quota += delta_quota  # 增加剩余空间
        self.leaking_ts = now_ts  # 记录漏水时间
        if self.left_quota > self.capacity:  # 剩余空间不得高于容量
            self.left_quota = self.capacity

    def watering(self, quota):
        self.make_space()
        if self.left_quota >= quota:  # 判断剩余空间是否足够
            self.left_quota -= quota
        return True
        return False


funnels = {}  # 所有的漏斗


# capacity 漏斗容量
# leaking_rate 漏嘴流水速率 quota/s
def is_action_allowed(user_id, action_key, capacity, leaking_rate):
    key = '%s:%s' % (user_id, action_key)
    funnel = funnels.get(key)
    if not funnel:
        funnel = Funnel(capacity, leaking_rate)
        funnels[key] = funnel
        return funnel.watering(1)


for i in range(20):
    print(is_action_allowed('laoqian', 'reply', 15, 0.5))
class FunnelRateLimiter {
    static class Funnel {
        int capacity;
        float leakingRate;
        int leftQuota;
        long leakingTs;

        public Funnel(int capacity, float leakingRate) {
            this.capacity = capacity;
            this.leakingRate = leakingRate;
            this.leftQuota = capacity;
            this.leakingTs = System.currentTimeMillis();
        }

        void makeSpace() {
            long nowTs = System.currentTimeMillis();
            long deltaTs = nowTs - leakingTs;
            int deltaQuota = (int) (deltaTs * leakingRate);
            if (deltaQuota < 0) { // 间隔时间太长,整数数字过大溢出
                this.leftQuota = capacity;
                this.leakingTs = nowTs;
                return;
            }
            if (deltaQuota < 1) { // 腾出空间太小,最小单位是1
                return;
            }
            this.leftQuota += deltaQuota;
            this.leakingTs = nowTs;
            if (this.leftQuota > this.capacity) {
                this.leftQuota = this.capacity;
            }
        }

        boolean watering(int quota) {
            makeSpace();
            if (this.leftQuota >= quota) {
                this.leftQuota -= quota;
                return true;
            }
            return false;
        }
    }

    private Map<String, Funnel> funnels = new HashMap<>();

    public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
        String key = String.format("%s:%s", userId, actionKey);
        Funnel funnel = funnels.get(key);
        if (funnel == null) {
            funnel = new Funnel(capacity, leakingRate);
            funnels.put(key, funnel);
        }
        return funnel.watering(1); // 需要1个quota
    }
}

Funnel 对象的 make_space 方法是漏斗算法的核心,其在每次灌水前都会被调用以触发漏水,给漏斗腾出空间来。能腾出多少空间取决于过去了多久以及流水的速率。Funnel 对象占据的空间大小不再和行为的频率成正比,它的空间占用是一个常量。

问题来了,分布式的漏斗算法该如何实现?能不能使用 Redis 的基础数据结构来搞定?我们观察 Funnel 对象的几个字段,我们发现可以将 Funnel 对象的内容按字段存储到一个 hash 结构中,灌水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。但是有个问题,我们无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里运算,再回填到 hash 结构,这三个过程无法原子化,意味着需要进行适当的加锁控制。而一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。如果重试的话,就会导致性能下降。如果放弃的话,就会影响用户体验。同时,代码的复杂度也跟着升高很多。这真是个艰难的选择。

cl.throttle laoqian:reply 15 30 60
1) (integer) 0 # 0 表示允许,1表示拒绝
2) (integer) 15 # 漏斗容量capacity
3) (integer) 14 # 漏斗剩余空间left_quota
4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)

在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的非常周到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想阻塞线程,也可以异步定时任务来重试。

package com.zhuangxiaoyan.springbootredis.cell;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.Arrays;
import java.util.List;

/**
 * @Classname RedisCell
 * @Description TODO
 * @Date 2022/4/13 22:37
 * @Created by xjl
 */
public class RedisCell {

    /**
     * redis 客户端
     */
    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 请求是否被允许
     * @param key       限流的key
     * @param maxBurst  最大请求
     * @param tokens    令牌数量   每过seconds时间就往桶里放入tokens的令牌数量
     * @param seconds   时间      每过seconds时间就往桶里放入tokens的令牌数量
     * @param apply     本次获取几个令牌
     * @return
     */
    public boolean isActionAllowed(String key, int maxBurst, int tokens, int seconds, int apply){

        DefaultRedisScript<List> script = new DefaultRedisScript<>("return redis.call('cl.throttle',KEYS[1],ARGV[1],ARGV[2],ARGV[3],ARGV[4])", List.class);

        List<Long> rst = redisTemplate.execute(script, Arrays.asList(key), maxBurst, tokens, seconds, apply);
        //响应结果
        /**
         * 1) (integer) 0       # 当前请求是否被允许,0表示允许,1表示不允许
         * 2) (integer) 11      # 令牌桶的最大容量,令牌桶中令牌数的最大值
         * 3) (integer) 10      # 令牌桶中当前的令牌数
         * 4) (integer) -1      # 如果被拒绝,需要多长时间后在重试,如果当前被允许则为-1
         * 5) (integer) 12      # 多长时间后令牌桶中的令牌会满
         */
        return rst.get(0) == 0;
    }
}

三、 分布式限流解决方案

在高并发系统中常见的限流方式有很多种,但是总的来说限流的分类如下所示:

  1. 合法性验证限流:比如验证码、IP 黑名单等,这些手段可以有效的防止恶意攻击和爬虫采集;
  2. 容器限流:比如 Tomcat、Nginx 等限流手段,其中 Tomcat 可以设置最大线程数(maxThreads),当并发超过最大线程数会排队等待执行;而 Nginx 提供了两种限流手段:一是控制速率,二是控制并发连接数;
  3. 服务端限流:比如我们在服务器端通过限流算法实现限流,此项也是我们本文介绍的重点。

3.1 合法性验证限流

合法性验证限流为最常规的业务代码,就是普通的验证码和 IP 黑名单系统。可以利用的redis中布隆过滤器来实现的IP黑白名单。

Redis——Redis扩展应用限流解决方案原理_ci_06

3.2 容器限流设计

3.2.1 Tomcat 限流设置

<Connector port="8080" protocol="HTTP/1.1"  connectionTimeout="20000"    maxThreads="150"  redirectPort="8443" />

其中 maxThreads 就是 Tomcat 的最大线程数,当请求的并发大于此值(maxThreads)时,请求就会排队执行,这样就完成了限流的目的。 maxThreads 的值可以适当的调大一些,此值默认为 150(Tomcat 版本 8.5.42),但这个值也不是越大越好,要看具体的硬件配置,需要注意的是每开启一个线程需要耗用 1MB 的 JVM 内存空间用于作为线程栈之用,并且线程越多 GC 的负担也越重。需要注意一下,操作系统对于进程中的线程数有一定的限制,Windows 每个进程中的线程数不允许超过 2000,Linux 每个进程中的线程数不允许超过 1000。

3.2.2 Nginx 限流

Nginx 提供了两种限流手段:一是控制速率,二是控制并发连接数。

我们需要使用 limit_req_zone 用来限制单位时间内的请求数,即速率限制。

limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
server { 
    location / { 
        limit_req zone=mylimit;
    }
}

以上配置表示,限制每个 IP 访问的速度为 2r/s,因为 Nginx 的限流统计是基于毫秒的,我们设置的速度是 2r/s,转换一下就是 500ms 内单个 IP 只允许通过 1 个请求,从 501ms 开始才允许通过第 2 个请求。
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s;
server { 
    location / { 
        limit_req zone=mylimit burst=4;
    }
}

burst=4 表示每个 IP 最多允许4个突发请求,

 利用 limit_conn_zonelimit_conn 两个指令即可控制并发数

limit_conn_zone $binary_remote_addr zone=perip:10m;
limit_conn_zone $server_name zone=perserver:10m;
server {
    ...
    limit_conn perip 10;
    limit_conn perserver 100;
}


其中 limit_conn perip 10 表示限制单个 IP 同时最多能持有 10 个连接;limit_conn perserver 100 表示 server 同时能处理并发连接的总数为 100 个。

3.3 服务端限流

3.3.1 连接池、线程限流方案

可以使用池化技术来限制总资源数:连接池、线程池。比如分配给每个应用的数据库连接是 100,那么本应用最多可以使用 100 个资源,超出了可以 等待 或者 抛异常。

//获取当前机器的核数
public static final int cpuNum = Runtime.getRuntime().availableProcessors();
@Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(cpuNum);//核心线程大小
        taskExecutor.setMaxPoolSize(cpuNum * 2);//最大线程大小
        taskExecutor.setQueueCapacity(500);//队列最大容量
        //当提交的任务个数大于QueueCapacity,就需要设置该参数,但spring提供的都不太满足业务场景,可以自定义一个,也可以注意不要超过QueueCapacity即可
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        taskExecutor.setThreadNamePrefix("BCarLogo-Thread-");
        taskExecutor.initialize();
        return taskExecutor;
    }

corePoolSize:核心线程数;maximunPoolSize:最大线程数

每当有新的任务到线程池时,

  • 第一步:先判断线程池中当前线程数量是否达到了corePoolSize,若未达到,则新建线程运行此任务,且任务结束后将该线程保留在线程池中,不做销毁处理,若当前线程数量已达到corePoolSize,则进入下一步;
  • 第二步:判断工作队列(workQueue)是否已满,未满则将新的任务提交到工作队列中,满了则进入下一步;
  • 第三步:判断线程池中的线程数量是否达到了maxumunPoolSize,如果未达到,则新建一个工作线程来执行这个任务,如果达到了则使用饱和策略来处理这个任务。注意: 在线程池中的线程数量超过corePoolSize时,每当有线程的空闲时间超过了keepAliveTime,这个线程就会被终止。直到线程池中线程的数量不大于corePoolSize为止。

当工作队列满且线程个数达到maximunPoolSize后所采取的策略

  • AbortPolicy:默认策略;新任务提交时直接抛出未检查的异常RejectedExecutionException,该异常可由调用者捕获。
  • CallerRunsPolicy:既不抛弃任务也不抛出异常,使用调用者所在线程运行新的任务。
  • DiscardPolicy:丢弃新的任务,且不抛出异常。
  • DiscardOldestPolicy:调用poll方法丢弃工作队列队头的任务,然后尝试提交新任务
  • 自定义策略:根据用户需要定制。

3.3.2 限流某个接口的总并发/请求数

使用 Java 中的 AtomicLong,示意代码:

try{
    if(atomic.incrementAndGet() > 限流数) {
        //拒绝请求
    } else {
        //处理请求
    }
} finally {
    atomic.decrementAndGet();
}
LoadingCache counter = CacheBuilder.newBuilder()
    .expireAfterWrite(2, TimeUnit.SECONDS)
    .build(newCacheLoader() {
        @Override
        public AtomicLong load(Long seconds) throws Exception {
            return newAtomicLong(0);
        }
    });

longlimit =1000;
while(true) {
    // 得到当前秒
    long currentSeconds = System.currentTimeMillis() /1000;
    if(counter.get(currentSeconds).incrementAndGet() > limit) {
        System.out.println("限流了: " + currentSeconds);
        continue;
    }
    // 业务处理
}

3.3.3 利用的MQ来实现限流

Redis——Redis扩展应用限流解决方案原理_限流_07

 

博文参考

限流的一些解决方案_夜风_BLOG的博客

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: redis