1.1.1. 07.限流


在高并发场景下有三把利器保护系统:缓存、降级、和限流。

  • 缓存的目的是提升系统的访问你速度和增大系统能处理的容量。
  • 降级是当服务出问题或影响到核心流程的性能则需要暂时屏蔽掉。
  • 限流一般作用于高并发下,除了控制流量外,还有一个应用目的是控制用户行为,避免垃圾请求。如论坛里,控制用户的发帖,回复和点赞等功能。还有商城里如秒杀、抢购、评论、恶意爬虫等。

使用 reids 来实现简单的限流策略

限流算法 常见的限流算法有:计数器,漏桶、令牌桶。

计数器

计数器:记录一段时间窗口内,用户行为总数,判断是否超过限制。 下面是 PHP 的实现方式

<?php

namespace App\Http\Controllers;

class PagesController extends Controller
{
    public function demo()
    {
        $redis = new \Predis\Client();
        for ($i = 0; $i < 20; $i++) {
            dump($this->isActionAllowed($redis, "110", "reply", 60 * 1000, 5)); //执行可以发现只有前5次是通过的
        }
    }

    /**
     * @param \Predis\Client $redis
     * @param String $userId (用户 id)
     * @param String $actionName (操作名)
     * @param Int $period 时间窗口(毫秒)
     * @param Int $maxCount (最大限制个数)
     * @return bool
     * @throws \Exception
     */
    public function isActionAllowed(\Predis\Client $redis, String $userId, String $actionName, Int $period, Int $maxCount)
    {
//        设置键名
        $actionKey = sprintf('current-limiting.%s.%s', $actionName, $userId);
        list($msec, $sec) = explode(' ', microtime());
//        毫秒时间戳
        $now = intval(($sec + $msec) * 1000);

//        管道
        $replies = $redis->pipeline()
//        value 和 score 都用毫秒时间戳
            ->zadd($actionKey, $now, $now)
//        移除时间窗口之前的行为记录,剩下的都是时间窗口内的
            ->zremrangebyscore($actionKey, 0, $now - $period)
//        统计现在个数
            ->zcard($actionKey)
//        多加一秒过期时间
            ->expire($actionKey, $period + 1)
//        执行
            ->execute();
        return $replies[2] <= $maxCount;
    }

}

可以发现,这几个 redis 操作都是针对同一个 key,所以使用管道 pipeline 会显著提升效率。但是这个方案也有一个缺陷,就是当量很大时,会占用很大空间,就不适合做限流了(如限定 60 秒,操作不超过 100 万次)。

漏斗限流

顾名思义,算法灵感来自于漏斗。 漏斗容量有限,上面有水龙头灌水,漏斗下面漏水。当灌水速率大于漏水,则漏斗会满,无法再如新水。反之则漏斗永远装不满水。

漏斗限流

PHP 实现漏斗法

<?php

namespace App\Services;

class FunnelService
{

    private $capacity; // 容量
    private $leakingRate; // 泄露速率
    private $leftCapacity; // 剩余容量
    private $lastLeakedTime; // 上一次漏水时间

    public function __construct(float $capacity, float $leakingRate)
    {
        $this->capacity = $capacity;
        $this->leakingRate = $leakingRate;
        $this->leftCapacity = $capacity;
        $this->lastLeakedTime = time();
    }

    public function makeSpace()
    {
        $now = time();
//        距离上一次漏水过去了多久
        $deltaTime = $now - $this->lastLeakedTime;
//        计算已经腾出了多少空间
        $deltaSpace = $deltaTime * $this->leakingRate;

//        腾出空间最小单位是 1,太小就忽略
        if ($deltaSpace < 1) {
            return;
        }
//        增加剩余空间
        $this->leftCapacity += $deltaSpace;
//        记录漏水时间
        $this->lastLeakedTime = time();

//        如果剩余容量大于了容器容量,则剩余容量为容器容量
        $this->leftCapacity = ($this->leftCapacity > $this->capacity) ? $this->capacity : $this->leftCapacity;
    }

    public function watering(float $quota)
    {
        //漏水操作
        $this->makeSpace();
//        当还有空间时,则减少容器剩余空间
        if ($this->leftCapacity >= $quota) {
            $this->leftCapacity -= $quota;
            return true;
        }
        return false;
    }
}

调用过程

<?php
public function isActionAllowed($userId, $action, $capacity, $leakingRate)
    {
        $key = sprintf("Funnel-%s:%s", $userId, $action);
        $funnel = $GLOBALS['funnel'][$key] ?? '';
        if (!$funnel) {
            $funnel = new FunnelService($capacity, $leakingRate);
            $GLOBALS['funnel'][$key] = $funnel;
        }
        return $funnel->watering(1);
    }


    public function demo()
    {
        for ($i=0; $i<20; $i++){
            dump($this->isActionAllowed("110", "reply", 15, 0.5)); //执行可以发现只有前15次是通过的
        }
    }

解析代码的关键是 makeSpace() 方法,每次调用前都会触发漏水,腾出多少空间。能腾出多少空间取决于 漏水时间*漏水速率。

分布式的漏斗算法

我们可以把 Funnel 对象存放到 Redis 的 hash 数据结构中。灌水的时候取出进行逻辑计算,再将新值存回到 hash 中。

但是这里有一个问题,无法保证整个过程的原子性。从 hash 中取出,逻辑计算后再存回,这几个过程都无法原子化。意味着需要进行适当的加锁,而一旦加锁,就有失败的可能,加锁失败就需要选择重试或者放弃。重试导致性能下降,放弃就降低用户体验。

于是,Redis-Cell 来了。

Redis 在 4.0 之后提供一个 Redis-Cell 模块,该模块基于漏斗算法,并提供了原子的限流指令。

安装 Redis-Cell

官网:GitHub - brandur/redis-cell: A Redis module that provides rate limiting in Redis as a single command.

在这里下载对应的版本。Releases · brandur/redis-cell · GitHub

我电脑是 mac,下载好后,解压得到 .dylib 文件,如果是 linux 会得到 .so 文件。然后,找到 redis 的配置文件 /usr/local/etc/redis.conf,添加下面一段代码。

 loadmodule /usr/local/etc/libredis_cell.dylib

重启 redis,安装就完成了

    brew services restart redis
Redis-Cell 指令

该模块只有一个指令 cl.throttle。其意思是允许"某个行为"的频率是每 60 秒最多 30 次(漏水速率)。漏斗的最大容量为 15。

CL.THROTTLE user123 15 30 60 1
               ▲     ▲  ▲  ▲ ▲
               |     |  |  | └───── apply 1 token (default if omitted) 可选参数,默认值为 1
               |     |  └──┴─────── 30 tokens / 60 seconds 速率
               |     └───────────── 15 最大容量
               └─────────────────── key "user123"

其返回值为:

127.0.0.1:6379> CL.THROTTLE user123 15 30 60
1) (integer) 0 # 0 表示允许,1 表示拒绝
2) (integer) 15 # 漏斗容量 capacity
3) (integer) 14 # 当前容量 leftCapacity
4) (integer) -1 # 如果被拒绝了,需要多少时间后再试(漏斗有空间了,单位秒)
5) (integer) 2 # 多长时间后,漏斗完全空出来(leftCapacity=capacity,单位秒)

在执行指令时,如果被拒绝了,则需要丢弃或重试。重试的话,指令已经算好了时间,直接 sleep 就行,不过会阻塞线程,否则异步定时任务来执行。

漏斗算法的弊端

无法应对短时间的突发流量。

令牌桶算法

令牌桶算法算是漏斗算法的改进,漏斗算法能够限制流出速率,而令牌桶算法能 在限制调用平均速率的同时还允许一定程度上的突发调用

令牌桶算法

算法描述:

  • 有一个固定容量的桶,按照固定的速率往里面添加令牌。
  • 如果桶满了,新添加的令牌将丢弃。
  • 当请求来时,必须从桶中拿出一个令牌才能继续处理,否则拒绝请求,或者暂存到某个缓冲区等待先从桶中获取令牌再执行请求。

php 实现令牌桶算法

<?php

namespace App\Services;

use Auth;
use App\Models\CartItem;

class TrafficShaperService
{
    public $_redis;
    public $_key;
    public $_maxCount;

    public function __construct(\Predis\Client $redis, String $key, Int $maxCount)
    {
        $this->_redis = $redis;
        $this->_key = $key;
        $this->_maxCount = $maxCount;
    }

    /**
     * 添加令牌
     * @param Int $num
     * @return Int
     */
    public function add(Int $num = 0)
    {
//        当前剩余令牌数
        $leftTokenCount = $this->_redis->llen($this->_key);

//        计算最大可加入的令牌数量,不能超过最大令牌数
        $num = ($leftTokenCount + $num > $this->_maxCount) ? 0 : $num;

        if ($num > 0) {
            // 填充 token,为了简化步骤,这里的 token 值为 1。
            $token = array_fill(0, $num, 1);
            $this->_redis->lpush($this->_key, $token);
            return $num;
        }
        return 0;
    }

    /**
     * 重置令牌桶
     */
    public function reset()
    {
        $this->_redis->del([$this->_key]);
        $this->add($this->_maxCount);
    }

    /**
     * 获取令牌
     * @return string
     */
    public function getToken()
    {
        return $this->_redis->rpop($this->_key);
    }

}

执行代码

<?php
        $redis = new \Predis\Client();
        $service = new TrafficShaperService($redis, 'TrafficShaper', 5);

        $service->reset();

        for ($i = 0; $i < 8; $i++) {
            dump($service->getToken());
        }

        dump($service->add(3));

        for ($i = 0; $i < 5; $i++) {
            dump($service->getToken());
        }

结果

"1"
"1"
"1"
"1"
"1"
null
null
null
3
"1"
"1"
"1"
null
null
Copyright © Kagami丶 2019 all right reserved,powered by Gitbook该文件修订时间: 2019-12-10 21:46:19

results matching ""

    No results matching ""