如何通过限流算法防止系统过载

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

限流算法顾名思义就是指对流量进行控制的算法因此也常被称为流控算法。

我们在日常生活中就有很多限流的例子比如地铁站在早高峰的时候会利用围栏让乘客们有序排队限制队伍行进的速度避免大家一拥而上再比如在疫情期间很多景点会按时段限制售卖的门票数量避免同一时间在景区的游客太多等等。

对于 Server 服务而言单位时间内能承载的请求也是存在容量上限的我们也需要通过一些策略控制请求数量多少实现对流量的控制虽然限流为了保证一部分的请求流量可以得到正常的响应一定会导致部分请求响应速度下降或者直接被拒绝但是相比于全部的请求都不能得到响应系统直接崩溃的情况限流还是要好得多。

本篇内容主要介绍业务中的限流场景、限流算法、限流值的确认、不仅仅限流


文章目录


一、业务中的限流场景

1、限流算法介绍

限流算法顾名思义就是指对流量进行控制的算法因此也常被称为流控算法。

我们在日常生活中就有很多限流的例子比如地铁站在早高峰的时候会利用围栏让乘客们有序排队限制队伍行进的速度避免大家一拥而上再比如在疫情期间很多景点会按时段限制售卖的门票数量避免同一时间在景区的游客太多等等。

对于 Server 服务而言单位时间内能承载的请求也是存在容量上限的我们也需要通过一些策略控制请求数量多少实现对流量的控制虽然限流为了保证一部分的请求流量可以得到正常的响应一定会导致部分请求响应速度下降或者直接被拒绝但是相比于全部的请求都不能得到响应系统直接崩溃的情况限流还是要好得多。

限流与熔断经常被人弄混博主认为它们最大的区别在于限流主要在 Server 实现而熔断主要在 Client 实现当然了一个服务既可以充当 Server 也可以充当 Client这也是让限流与熔断同时存在一个服务中这两个概念才容易被混淆。

业务中的典型的限流场景主要分为三种

  • 突发流量
  • 恶意流量
  • 业务本身需要

2、突发流量

突发流量是我们需要限流的主要场景之一。当我们后端服务处理能力有限面对业务流量突然激增即突发流量时很容易出现服务器被打垮的情况。

如我们常见的双十一京东 618 这些整点秒杀的业务12306 这些都会出现某段时间面临着大量的流量流入的情况。

在这些情况下除了提供更好的弹性伸缩的能力以及在已经能预测的前提下提前准备更多的资源我们还能做的一件事就是利用限流来保护服务即使拒绝了一部分请求至少也让剩下的请求可以正常被响应。

3、恶意流量

除了突发流量限流有的时候也是出于安全性的考虑。网络世界有其凶险的地方所有暴露出去的API都有可能面对非正常业务的请求。

比如我们常见的各种各样的网络爬虫或者恶意的流量攻击网站等等都会产生大量的恶意流量。面对我们服务对外暴露接口的大规模疯狂调用很有可能也会可能导致服务崩溃在很多时候也会导致我们需要的计算成本飙升比如云计算的场景下。

4、业务本身需要

还有一种业务本身需要的场景这种场景也十分常见比如云服务平台根据不同套餐等级需要对不同服务流量限制时也是需要采取限流算法的。


二、限流算法

终于到了正题我们这里将介绍 4 种限流算法分别是 固定窗口计数器、滑动窗口计数器、Leaky Bucket 漏桶、Token Bucket令牌桶

1、固定窗口计数器

规定我们单位时间处理的请求数量。比如我们规定我们的一个接口一分钟只能访问10次的话。使用固定窗口计数器算法的话可以这样实现给定一个变量counter来记录处理的请求数量当1分钟之内处理一个请求之后counter+11分钟之内的如果counter=100的话后续的请求就会被全部拒绝。等到 1分钟结束后将counter回归成0重新开始计数ps只要过了一个周期就讲counter回归成0。

image-20230117115505429

这种限流算法无法保证限流速率因而无法保证突然激增的流量。比如我们限制一个接口一分钟只能访问10次的话前半分钟一个请求没有接收后半分钟接收了10个请求。

# 具体实现

package com.lizhengi.limiter;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 固定窗口计数器
 * @date 2023-01-17 2:20 下午
 **/
public class FixedWindowCounterLimiter {

    /**
     * 限流阈值
     */
    private final int limit;

    /**
     * 计数器
     */
    private final AtomicInteger count;

    /**
     * 固定窗口计数器
     *
     * @param windowSize 时间窗口大小, Unit: s
     * @param limit      限流阈值
     */
    public FixedWindowCounterLimiter(int windowSize, int limit) {
        this.limit = limit;
        count = new AtomicInteger(0);

        // 通过线程池启动一个线程, 定时清除计数器值
        ExecutorService threadPool = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        threadPool.execute(() -> {
            // noinspection InfiniteLoopStatement
            while (true) {
                try {
                    Thread.sleep(windowSize * 1000L);
                } catch (InterruptedException e) {
                    System.out.println("Happen Exception: " + e.getMessage());
                }
                count.set(0);
            }
        });

    }

    public boolean tryAcquire() {
        int num = count.incrementAndGet();
        // 以达到当前窗口的请求阈值
        return num <= limit;
    }
}

# 测试代码

package com.lizhengi.limiter;

import org.junit.Test;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 固定窗口计数器测试
 * @date 2023-01-17 2:22 下午
 **/
public class FixedWindowCounterLimiterTest {

    @Test
    public void test() throws InterruptedException {
        // 请求总数 通过数 被限流数
        int allNum, passNum = 0, blockNum = 0;
        // 限流配置 : 2s 内只允许通过 5个 !
        FixedWindowCounterLimiter rateLimiter = new FixedWindowCounterLimiter(2, 5);

        // 限流测试 1 - 请求总数 设置 3 次
        allNum = 3;
        //模拟连续请求
        for (int i = 0; i < allNum; i++) {
            if (rateLimiter.tryAcquire()) {
                passNum++;
            } else {
                blockNum++;
            }
        }
        System.out.println("请求总数: " + allNum + ", 通过数: " + passNum + ", 被限流数: " + blockNum);

        // 延时以准备下一次测试
        Thread.sleep(5000);
        // 限流测试 2 - 请求总数 设置 14 次
        allNum = 14;
        passNum = blockNum = 0;
        //模拟连续请求
        for (int i = 0; i < allNum; i++) {
            if (rateLimiter.tryAcquire()) {
                passNum++;
            } else {
                blockNum++;
            }
        }
        System.out.println("请求总数: " + allNum + ", 通过数: " + passNum + ", 被限流数: " + blockNum);
    }
}

# 测试结果

请求总数: 3, 通过数: 3, 被限流数: 0
请求总数: 14, 通过数: 5, 被限流数: 9

2、滑动窗口计数器

算的上是固定窗口计数器算法的升级版。滑动窗口计数器算法相比于固定窗口计数器算法的优化在于它把时间以一定比例分片。例如我们的借口限流每分钟处理60个请求我们可以把 1 分钟分为60个窗口。每隔1秒移动一次每个窗口一秒只能处理 不大于 60(请求数)/60窗口数 的请求 如果当前窗口的请求计数总和超过了限制的数量的话就不再处理其他请求。

image-20230117115613214

很显然当滑动窗口的格子划分的越多滑动窗口的滚动就越平滑限流的统计就会越精确。

# 具体实现

package com.lizhengi.limiter;

import java.util.Arrays;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 滑动时间窗口计数器
 * @date 2023-01-17 3:02 下午
 **/
public class SlidingWindowCounterLimiter {

    /**
     * 用于统计的子窗口数量默认为10
     */
    private final int slotNum;

    /**
     * 子窗口的时间长度, Unit: ms
     */
    private final int slotTime;

    /**
     * 限流阈值
     */
    private final int limit;

    /**
     * 存放子窗口统计结果的数组
     * note: counters[0]记为数组左边, counters[size-1]记为数组右边
     */
    private final int[] counters;

    private long lastTime;

    public SlidingWindowCounterLimiter(int windowSize, int limit) {
        this(windowSize, limit, 10);
    }

    public SlidingWindowCounterLimiter(int windowSize, int limit, int slotNum) {
        this.limit = limit;
        this.slotNum = slotNum;

        this.counters = new int[slotNum];
        // 计算子窗口的时间长度: 时间窗口 / 子窗口数量
        this.slotTime = windowSize * 1000 / slotNum;
        this.lastTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        long currentTime = System.currentTimeMillis();
        // 计算滑动数, 子窗口统计时所对应的时间范围为左闭右开区间, 即[a,b)
        int slideNum = (int) Math.floor((currentTime - lastTime) * 1.0 / slotTime);
        // 滑动窗口
        slideWindow(slideNum);
        // 统计滑动后的数组之和
        int sum = Arrays.stream(counters).sum();

        // 以达到当前时间窗口的请求阈值, 故被限流直接返回false
        if (sum > limit) {
            return false;
        } else {    // 未达到限流, 故返回true
            counters[slotNum - 1]++;
            return true;
        }
    }

    /**
     * 将数组元素全部向左移动num个位置
     *
     * @param num 移动位置数目
     */
    private void slideWindow(int num) {
        if (num == 0) {
            return;
        }

        // 数组中所有元素都会被移出, 故直接全部清零
        if (num >= slotNum) {
            Arrays.fill(counters, 0);
        } else {
            // 对于a[0]~a[num-1]而言, 向左移动num个位置后, 则直接被移出了
            // 故从a[num]开始移动即可
            for (int index = num; index < slotNum; index++) {
                // 计算a[index]元素向左移动num个位置后的新位置索引
                int newIndex = index - num;
                counters[newIndex] = counters[index];
                counters[index] = 0;
            }
        }
        // 更新时间
        lastTime = lastTime + (long) num * slotTime;
    }

}

# 测试代码

package com.lizhengi.limiter;

import org.junit.Test;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 滑动时间窗口计数器测试
 * @date 2023-01-17 3:04 下午
 **/
public class SlidingWindowCounterLimiterTest {

    @Test
    public void test() throws InterruptedException {
        // 请求总数 通过数 被限流数
        int allNum, passNum = 0, blockNum = 0;
        // 限流配置 : 2s 内只允许通过 5个 !
        SlidingWindowCounterLimiter rateLimiter = new SlidingWindowCounterLimiter(2, 5);

        // 限流测试 1 - 请求总数 设置 3 次
        allNum = 3;
        //模拟连续请求
        for (int i = 0; i < allNum; i++) {
            if (rateLimiter.tryAcquire()) {
                passNum++;
            } else {
                blockNum++;
            }
        }
        System.out.println("请求总数: " + allNum + ", 通过数: " + passNum + ", 被限流数: " + blockNum);

        // 延时以准备下一次测试
        Thread.sleep(5000);
        // 限流测试 2 - 请求总数 设置 14 次
        allNum = 14;
        passNum = blockNum = 0;
        //模拟连续请求
        for (int i = 0; i < allNum; i++) {
            if (rateLimiter.tryAcquire()) {
                passNum++;
            } else {
                blockNum++;
            }
        }
        System.out.println("请求总数: " + allNum + ", 通过数: " + passNum + ", 被限流数: " + blockNum);
    }
}

# 测试结果

请求总数: 3, 通过数: 3, 被限流数: 0
请求总数: 14, 通过数: 6, 被限流数: 8

3、Leaky Bucket 漏桶 - As a Meter Version

我们可以把发请求的动作比作成注水到桶中我们处理请求的过程可以比喻为漏桶漏水。我们往桶中以任意速率流入水以一定速率流出水。当水超过桶流量则丢弃因为桶容量是不变的保证了整体的速率。如果想要实现这个算法的话也很简单准备一个队列用来保存请求然后我们定期从队列中拿请求来执行就好了。

image-20230117115939370

# 具体实现

package com.lizhengi.limiter;

/**
 * @author liziheng
 * @version 1.0.0
 * @description
 * @date 2023-01-17 3:33 下午
 **/
public class LeakyBucketLimiter1 {

    /**
     * 桶容量, Unit: 个
     */
    private final long capacity;

    /**
     * 出水速率, Unit: 个/秒
     */
    private final long rate;

    /**
     * 桶的当前水量
     */
    private long water;

    /**
     * 上次时间
     */
    private long lastTime;

    public LeakyBucketLimiter1(long capacity, long rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.water = 0;
        this.lastTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        // 获取当前时间
        long currentTime = System.currentTimeMillis();
        // 计算流出的水量: (当前时间-上次时间) * 出水速率
        long outWater = (currentTime - lastTime) / 1000 * rate;
        // 计算水量: 桶的当前水量 - 流出的水量
        water = Math.max(0, water - outWater);
        // 更新时间
        lastTime = currentTime;

        // 当前水量 小于 桶容量, 则请求放行, 返回true
        if (water < capacity) {
            water++;
            return true;
        } else {
            // 当前水量 不小于 桶容量, 则进行限流, 返回false
            return false;
        }
    }

}

# 测试代码

package com.lizhengi.limiter;

import org.junit.Test;

/**
 * @author liziheng
 * @version 1.0.0
 * @description
 * @date 2023-01-17 3:34 下午
 **/
public class LeakyBucketLimiter1Test {

    @Test
    public void test() throws InterruptedException {
        // 请求总数 通过数 被限流数
        int allNum, passNum = 0, blockNum = 0;
        // 漏桶配置, 桶容量:5个, 出水率: 1个/秒
        LeakyBucketLimiter1 rateLimiter = new LeakyBucketLimiter1(5, 1);

        // 限流测试 1 - 请求总数 设置 3 次
        allNum = 3;
        //模拟连续请求
        for (int i = 0; i < allNum; i++) {
            if (rateLimiter.tryAcquire()) {
                passNum++;
            } else {
                blockNum++;
            }
        }
        System.out.println("请求总数: " + allNum + ", 通过数: " + passNum + ", 被限流数: " + blockNum);

        // 延时以准备下一次测试
        Thread.sleep(5000);
        // 限流测试 2 - 请求总数 设置 14 次
        allNum = 14;
        passNum = blockNum = 0;
        //模拟连续请求
        for (int i = 0; i < allNum; i++) {
            if (rateLimiter.tryAcquire()) {
                passNum++;
            } else {
                blockNum++;
            }
        }
        System.out.println("请求总数: " + allNum + ", 通过数: " + passNum + ", 被限流数: " + blockNum);
    }
}

# 测试结果

请求总数: 3, 通过数: 3, 被限流数: 0
请求总数: 14, 通过数: 5, 被限流数: 9

4、Leaky Bucket 漏桶 - As a Queue Version

在 As a Meter Version 版本的漏桶中当桶中水未满请求即会直接被放行。而在漏桶的另外一个版本 As a Queue Version 中如果桶中水未满则该请求将会被暂时存储在桶中。然后以漏桶固定的出水速率对桶中存储的请求依次放行。对比两个版本的漏桶算法不难看出As a Meter Version 版本的漏桶算法可以应对、处理突发流量只要桶中尚有足够空余即可立即放行请求而对于 As a Queue Version 版本的漏桶其只会以固定速率放行请求无法充分利用后续系统的处理能力。

# 具体实现

package com.lizhengi.limiter;

import lombok.AllArgsConstructor;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.*;

/**
 * @author liziheng
 * @version 1.0.0
 * @description
 * @date 2023-01-17 3:38 下午
 **/
public class LeakyBucketLimiter2 {

    /**
     * 阻塞队列, 用于存储用户请求
     */
    private final ArrayBlockingQueue<UserRequest> queue;

    /**
     * @param capacity 桶容量, Unit: 个
     * @param rate     出水速率, Unit: 个/秒
     */
    public LeakyBucketLimiter2(int capacity, long rate) {
        // 根据桶容量构建有界队列
        queue = new ArrayBlockingQueue<>(capacity);

        // 定时任务线程池, 用于以指定速率rate从阻塞队列中获取用户请求进行放行、处理
        ScheduledExecutorService threadPool = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.
                Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
        // 根据出水速率rate计算从阻塞队列获取用户请求的周期 Unit: ms
        long period = 1000 / rate;
        threadPool.scheduleAtFixedRate(getTask(), 0, period, TimeUnit.MILLISECONDS);
    }

    public boolean tryAcquire(UserRequest userRequest) {
        // 添加失败表示用户请求被限流, 则返回false
        return queue.offer(userRequest);
    }

    private Runnable getTask() {
        return () -> {
            // 从阻塞队列获取用户请求
            UserRequest userRequest = queue.poll();
            if (userRequest != null) {
                userRequest.handle();
            }
        };
    }

    /**
     * 用户请求
     */
    @AllArgsConstructor
    public static class UserRequest {

        private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

        private String name;

        public void handle() {
            String timeStr = FORMATTER.format(LocalTime.now());
            String msg = "<" + timeStr + "> " + name + " 开始处理";
            System.out.println(msg);
        }
    }

}

# 代码测试

package com.lizhengi.limiter;

import org.junit.Test;

/**
 * @author liziheng
 * @version 1.0.0
 * @description
 * @date 2023-01-17 3:46 下午
 **/
public class LeakyBucketLimiter2Test {

    @Test
    public void test() throws InterruptedException {
        // 请求总数 通过数 被限流数
        int allNum, passNum = 0, blockNum = 0;
        // 漏桶配置, 桶容量:5个, 出水率: 2个/秒
        LeakyBucketLimiter2 rateLimiter = new LeakyBucketLimiter2(5, 2);

        // 限流测试 1 - 请求总数 设置 7 次
        allNum = 7;
        // 模拟连续请求
        for(int i=1; i<=allNum; i++) {
            // 构建用户请求
            String name = "用户请求:" + i;
            LeakyBucketLimiter2.UserRequest userRequest = new LeakyBucketLimiter2.UserRequest(name);

            if( rateLimiter.tryAcquire( userRequest ) ) {
                passNum++;
            }else{
                blockNum++;
            }
        }
        System.out.println("请求总数: "+allNum+", 通过数: "+passNum+", 被限流数: "+blockNum);

        // 延时等待
        Thread.sleep(120*1000);
    }
}

# 测试结果

请求总数: 7, 通过数: 5, 被限流数: 2
<15:48:21.542> 用户请求:1 开始处理
<15:48:22.032> 用户请求:2 开始处理
<15:48:22.532> 用户请求:3 开始处理
<15:48:23.032> 用户请求:4 开始处理
<15:48:23.533> 用户请求:5 开始处理

5、Token Bucket 令牌桶

令牌桶算法也比较简单。和漏桶算法算法一样我们的主角还是桶这限流算法和桶过不去啊。不过现在桶里装的是令牌了请求在被处理之前需要拿到一个令牌请求处理完毕之后将这个令牌丢弃删除。我们根据限流大小按照一定的速率往桶里添加令牌。

image-20230117120058362# 具体实现

package com.lizhengi.limiter;

/**
 * @author liziheng
 * @version 1.0.0
 * @description
 * @date 2023-01-17 3:52 下午
 **/
public class TokenBucketLimiter {

    /**
     * 桶容量, Unit: 个
     */
    private final long capacity;

    /**
     * 令牌生成速率, Unit: 个/秒
     */
    private final long rate;

    /**
     * 桶当前的令牌数量
     */
    private long tokens;

    /**
     * 上次时间
     */
    private long lastTime;

    public TokenBucketLimiter(long capacity, long rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.tokens = capacity;
        this.lastTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        // 获取当前时间
        long currentTime = System.currentTimeMillis();
        // 计算生成的令牌数量: (当前时间-上次时间) * 令牌生成速率
        long newTokenNum = (currentTime - lastTime) / 1000 * rate;
        // 计算令牌数量: 桶当前的令牌数量 + 生成的令牌数量
        tokens = Math.min(capacity, tokens + newTokenNum);
        // 更新时间
        lastTime = currentTime;

        // 桶中仍有令牌, 则请求放行, 返回true
        if (tokens > 0) {
            tokens--;
            return true;
        } else {
            // 桶中没有令牌, 则进行限流, 返回false
            return false;
        }
    }
}

# 测试代码

package com.lizhengi.limiter;

import org.junit.Test;

/**
 * @author liziheng
 * @version 1.0.0
 * @description
 * @date 2023-01-17 4:01 下午
 **/
public class TokenBucketLimiterTest {
    @Test
    public void test() throws InterruptedException {
        // 请求总数 通过数 被限流数
        int allNum, passNum = 0, blockNum = 0;
        // 令牌桶配置, 桶容量:5个, 令牌生成速率: 1个/秒
        TokenBucketLimiter rateLimiter = new TokenBucketLimiter(5, 1);

        // 限流测试 1 - 请求总数 设置 3 次
        allNum = 3;
        //模拟连续请求
        for (int i = 0; i < allNum; i++) {
            if (rateLimiter.tryAcquire()) {
                passNum++;
            } else {
                blockNum++;
            }
        }
        System.out.println("请求总数: " + allNum + ", 通过数: " + passNum + ", 被限流数: " + blockNum);

        // 延时以准备下一次测试
        Thread.sleep(5000);
        // 限流测试 2 - 请求总数 设置 14 次
        allNum = 14;
        passNum = blockNum = 0;
        //模拟连续请求
        for (int i = 0; i < allNum; i++) {
            if (rateLimiter.tryAcquire()) {
                passNum++;
            } else {
                blockNum++;
            }
        }
        System.out.println("请求总数: " + allNum + ", 通过数: " + passNum + ", 被限流数: " + blockNum);
    }
}

# 测试结果

请求总数: 3, 通过数: 3, 被限流数: 0
请求总数: 14, 通过数: 5, 被限流数: 9


三、限流相关问题

1、限流值的确认

正确的值才能起到效果限流多了等于没限少了则会影响服务利用效率

对于核心服务限流的值可以通过以下方法来设置合理的值

  • 观察评估通过CAT大盘可以观察到服务的平时调用量QPS和各个调用方。
  • 压测摸底通过quake平台可以压测核心服务的支持的最大QPS。
  • 场景分析通过分析各业务调用场景评估一个合理的值。

2、不仅仅限流

限流作为系统稳定性保障的有效措施之一常常与重试、降级、熔断等作为组合方法一起使用。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6