1.1.1. 15.Stream

简介

Redis 5.0 开始支持 Stream,它狠狠的借鉴了 Kafka 的设计。

Stream 结构

Stream 的结构如图所示,它是一个消息链表。

  1. 每个消息都有一个消息 ID 与之对应。
  2. 消息持久化,重启后,消息仍在。
  3. 每个 Stream 都有一个名字,即 Redis 的 key。
  4. 每个 Stream 都可以挂多个消费组。
    • 每个消费组上都会有一个 last_delivered_id 来记录消费到哪了。
    • 每个消费组的状态都相互独立,互不影响。同一份 stream 可以被每个消费组都消费到。
  5. 每个消费组(Consumer Group)都可以挂多个消费者(Consumer),这些消费者是竞争关系。任何一个消费者消费 Stream 都会使 last_delivered_id 向前移动。每个消费者在一个组里都有唯一名称。
  6. 消费者(Consumer)内部有个变量 pending_ids ,它记录了当前已经被客户端读取的消息,但是还没有 ack(确认返回)。
    • 只有 ack 后,pending_ids 才会开始减少。
    • 这个 pending_ids 被 Redis 官方成为 PEL(Pending Entries List)。
    • 它是被用来确保客户端至少消费了一次,而不是因为网络问题导致没处理。

消息 ID

消息 ID 的形式是 timestampInMillis-sequence,例如1527846880572-5,表示在 1527846880572 时产生的第 5 条消息。

  1. 消息 id 是系统自动生成。
  2. 也可以手动生成,但形式必须是 「整数-整数」,并且后面加入的消息 id 必须大于前面加入的消息 id。

消息内容

消息内容就是键值对,形如 hash 结构的键值对。

指令-增删改查

  1. 追加消息 - xadd,如果 id 为 * 则代表不指定 id。返回 id,格式:
    xadd [stream_name] [id] [key-value]...
    
127.0.0.1:6383> xadd demo * name jk age 27
"1570511860225-0"
127.0.0.1:6383> xadd demo * name jk4905 age 28
"1570511869366-0"
127.0.0.1:6383> xadd demo * name kagami age 29
"1570511875999-0"
  1. 消息长度 - xlen

    127.0.0.1:6383> xlen demo
    (integer) 3
    
  2. 获取消息列表,自动过滤已删除的消息。

    xrange [key] [start] [end]
    
# - 为最小值,+ 为最大值
127.0.0.1:6383> xrange demo - +
1) 1) "1570511860225-0"
   2) 1) "name"
      2) "jk"
      3) "age"
      4) "27"
2) 1) "1570511869366-0"
   2) 1) "name"
      2) "jk4905"
      3) "age"
      4) "28"
3) 1) "1570511875999-0"
   2) 1) "name"
      2) "kagami"
      3) "age"
      4) "29"

# 指定最小值
127.0.0.1:6383> xrange demo 1570511869366-0 +
1) 1) "1570511869366-0"
   2) 1) "name"
      2) "jk4905"
      3) "age"
      4) "28"
2) 1) "1570511875999-0"
   2) 1) "name"
      2) "kagami"
      3) "age"
      4) "29"

# 指定最大值
127.0.0.1:6383> xrange demo - 1570511869366-0
1) 1) "1570511860225-0"
   2) 1) "name"
      2) "jk"
      3) "age"
      4) "27"
2) 1) "1570511869366-0"
   2) 1) "name"
      2) "jk4905"
      3) "age"
      4) "28"
  1. 删除消息,逻辑删除,设置删除标记位,不影响消息总长度

    127.0.0.1:6383> xdel name 1570511869366-0
    (integer) 1
    # 长度不受影响
    127.0.0.1:6383> xlen demo
    (integer) 3
    # 被删除的消息没了
    127.0.0.1:6383> xrange demo - +
    1) 1) "1570511860225-0"
    2) 1) "name"
       2) "jk"
       3) "age"
       4) "27"
    2) 1) "1570511875999-0"
    2) 1) "name"
       2) "kagami"
       3) "age"
       4) "29"
    
  2. 删除 Stream

    127.0.0.1:6383> del name
    (integer) 1s
    

独立消费

当我们不定义消费组(Consutmer Group)时,进行 Stream 的独立消费。此时把 Stream 当做一个普通列表(list)。当 Stream 没有消息时,甚至可以阻塞。

读取 - xread

xread [COUNT count] [BLOCK milliseconds] STREAMS [key] [id]..
# 从 Stream 头部读取两条消息
127.0.0.1:6383> xread count 2 streams demo 0-0
1) 1) "demo"
   2) 1) 1) "1570512984508-0"
         2) 1) "name"
            2) "jk"
            3) "age"
            4) "27"
      2) 1) "1570512989666-0"
         2) 1) "name"
            2) "jk4905"
            3) "age"
            4) "28"


# 从 Stream 尾部读取一条消息,毫无疑问,这里不会返回任何消息
127.0.0.1:6379> xread count 1 Streams demo $
(nil)

# 从尾部阻塞等待新消息到来,下面的指令会堵住,直到新消息到来
127.0.0.1:6379> xread count 1 block 0 streams demo $

# 从尾部阻塞等待新消息到来,下面的指令会堵住,直到新消息到来
127.0.0.1:6379> xadd demo * name kagam1 age 30
"1570514266119-0"

# 再切换到前面的窗口,我们可以看到阻塞解除了,返回了新的消息内容
# 而且还显示了一个等待时间,这里我们等待了 36s
127.0.0.1:6379> xread count 1 block 0 streams demo $
1) 1) "demo"
   2) 1) 1) "1570514266119-0"
         2) 1) "name"
            2) "kagam1"
            3) "age"
            4) "30"
(36.18s)

注意:如果要使用 xread 消费,一定要记录消费到了哪个位置。

block 0 表示永远阻塞,直到消息到来。如果 block 1000 表示阻塞 1s,也就是说 1s 之内如果没有消息来就返回 nil。

127.0.0.1:6379> xread count 1 block 1000 streams demo $
(nil)
(1.01s)

创建消费组

消费组

# 从开头开始消费
127.0.0.1:6379> xgroup create demo cg1 0-0
OK

# 从尾部开始消费,只接受新消息,当前的 Stream 会全忽略
127.0.0.1:6379> xgroup create demo cg2 $
OK

# 获取当前 stream 的信息
127.0.0.1:6379> xinfo stream demo
 1) "length"
 2) (integer) 4 # 共 4 个消息
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 2 # 两个消费组
 9) "last-generated-id"
10) "1570514266119-0"
11) "first-entry" # 第一个消息
12) 1) "1570513760437-0"
    2) 1) "name"
       2) "jk"
       3) "age"
       4) "27"
13) "last-entry" # 最后一个消息
14) 1) "1570514266119-0"
    2) 1) "name"
       2) "kagam1"
       3) "age"
       4) "30"

# 查询当前消费组信息
127.0.0.1:6379> xinfo groups demo
1) 1) "name"
   2) "cg1"
   3) "consumers"
   4) (integer) 0 # 该消费组还没有消费者
   5) "pending"
   6) (integer) 0 # 该消费组没有正在处理的消息
   7) "last-delivered-id"
   8) "0-0"
2) 1) "name"
   2) "cg2"
   3) "consumers"
   4) (integer) 0 # 该消费组还没有消费者
   5) "pending"
   6) (integer) 0 # 该消费组没有正在处理的消息
   7) "last-delivered-id"
   8) "1570514266119-0"

消费

指令:

xreadgroup GROUP [group_name] [consumer] [COUNT count] [BLOCK milliseconds] STREAMS [key] ID

基本用法与 「xread」 一样。也可以阻塞消息。当读到新消息后,会将对应的 id 加入到PEL(正在处理消息)结构里,然后客户端处理完后通过 xack 指令通知服务器,消息处理完毕,则消息 ID 会从 PEL 中删除。

# > 表示从当前 last_delivered_id 后面开始读取
# 每当消费者读取一条信息,last_delivered_id 就会往前移动
127.0.0.1:6379> xreadgroup group cg1 c1 count 1 streams demo >
1) 1) "demo"
   2) 1) 1) "1570513760437-0"
         2) 1) "name"
            2) "jk"
            3) "age"
            4) "27"
127.0.0.1:6379> xreadgroup group cg1 c1 count 1 streams demo >
1) 1) "demo"
   2) 1) 1) "1570513764246-0"
         2) 1) "name"
            2) "jk4905"
            3) "age"
            4) "28"
127.0.0.1:6379> xreadgroup group cg1 c1 count 1 streams demo >
1) 1) "demo"
   2) 1) 1) "1570513767145-0"
         2) 1) "name"
            2) "kagami"
            3) "age"
            4) "29"
127.0.0.1:6379> xreadgroup group cg1 c1 count 1 streams demo >
1) 1) "demo"
   2) 1) 1) "1570514266119-0"
         2) 1) "name"
            2) "kagam1"
            3) "age"
            4) "30"

# 再继续读取,就没有新消息了
127.0.0.1:6379> xreadgroup group cg1 c1 count 1 streams demo >
(nil)

# 那就阻塞等待吧
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams demo >
# 开启另一个窗口,往里塞消息
127.0.0.1:6379> xadd demo * name lanying age 61
1527854062442-0
# 回到前一个窗口,发现阻塞解除,收到新消息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams demo >
1) 1) "codehole"
   2) 1) 1) 1527854062442-0
         2) 1) "name"
            2) "lanying"
            3) "age"
            4) "61"
(36.54s)
# 查看消费组的信息
127.0.0.1:6379> xinfo groups demo
1) 1) "name"
   2) "cg1"
   3) "consumers"
   4) (integer) 1 # 一个消费者
   5) "pending"
   6) (integer) 4 # 共 4 条信息已经被读取,但是没返回 ack
   7) "last-delivered-id"
   8) "1570514266119-0"
2) 1) "name"
   2) "cg2"
   3) "consumers"
   4) (integer) 0 # 消费组 cg2 没有任何变化,因为前面我们一直在操纵 cg1
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1570514266119-0"

# 查看同一个消费组中的消费者信息
127.0.0.1:6379> xinfo consumers demo cg1
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 4 # 共 4 条信息待处理
   5) "idle"
   6) (integer) 289904 # 空闲了多长时间 ms 没有读取消息了

# ack 一条信息回去
127.0.0.1:6379> xack demo cg1 1570513760437-0
(integer) 1

# 再次查看消费者
127.0.0.1:6379> xinfo consumers demo cg1
1) 1) "name"
   2) "c1"
   3) "pending" 
   4) (integer) 3 # 已经变成 3 条了,说明返回了一条。
   5) "idle"
   6) (integer) 886880

# 处理完所有信息
127.0.0.1:6379> xack demo cg1 1570513764246-0 1570513767145-0 1570514266119-0
(integer) 3
127.0.0.1:6379> xinfo consumers demo cg1
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 0 # 现在已经没有未处理的信息了
   5) "idle"
   6) (integer) 1089756

注意点

Stream 消息太多

Redis 提供了一个定长功能。指定 Stream 的最大长度。

# 当前长度为 4
127.0.0.1:6379> xlen demo
(integer) 4
# 添加一个新的消息,并且将最大长度设置为 3
127.0.0.1:6379> xadd demo maxlen 3 * name xiaorui age 1
"1570517209621-0"
# 再次查看长度变为了 3,老消息被砍掉
127.0.0.1:6379> xlen demo
(integer) 3
消息如果忘记 ACK

Stream 保存了正在处理中的消息 id 列表 PEL,如果忘记处理,则保存 PEL 会不断增加。

PEL 如何避免消息丢失?

当客户端读取 Stream 消息时,Redis 服务器将消息回复给客户端的过程中,网络断开,消息就丢失了。但是 PEL 保存了已经发出去的消息 id。待重新连接后,可以再次接受 PEL 中的消息 ID 列表。不过此时 xreadgroup 的起始消息 id 不能用 >,而是任意有效的消息 id,一般为 0-0,表示读取所有的 PEL 消息以及自 last_delivered_id 之后的新消息。

Stream 的高可用

Stream 的高可用是建立在主从复制基础上的。支持 sentinel 和 cluster 集群环境下高可用方案。 但是由于 Redis 的主从复制是异步的,所以在 failover 发生时,Redis 可能失去极小部分数据,这点和 Redis 其他数据结构一样。

使用 PHP 操纵 Stream

laravel 需要使用 phpredis,predis 不支持。

安装 phpredis
# 下载源码
git clone https://github.com/phpredis/phpredis.git

# 执行,如果失败通过 使用 sudo apt-get install php7.2-dev
phpize 

./configure

sudo make 

sudo make install

# 编译好的文件存在 ./module 下 redis.sos

将 redis.so 放入 /usr/lib/php/20170718/ 下

在 /etc/php/7.2/fpm/conf.d/ 创建 20-redis.ini 文件。 并在文件中添加

extension=redis.so

保存后重启 php-fpm 和 nginx

service php7.2-fpm restart
service nginx restart

# 查看是否安装成功
php -m|grep redis
使用

修改 laravel 中的 config/database.php

<?php 
···
···
'redis'         => [
//        'client'   => 'predis',
        'client'   => 'phpredis',
        'cluster'  => true,
        'options'  => [
            'cluster' => 'redis'
        ],
        'default'  => [
            'host'         => env('REDIS_HOST_FIRST', '127.0.0.1'),
            'password'     => env('REDIS_PASSWORD', null),
            'port'         => env('REDIS_PORT', 6379),
            'database'     => 1,
            'read_timeout' => env('REDIS_TIMEOUT', 5),
        ],
        'clusters' => [
            'mycluster1' => [
                [
                    'host'     => '127.0.0.1',
                    'password' => null,
                    'port'     => 6381,
                    'database' => 0,
                ],
                [
                    'host'     => '127.0.0.1',
                    'password' => null,
                    'port'     => 6382,
                    'database' => 0,
                ],
                [
                    'host'     => '127.0.0.1',
                    'password' => null,
                    'port'     => 6383,
                    'database' => 0,
                ],
                [
                    'host'     => '127.0.0.1',
                    'password' => null,
                    'port'     => 6384,
                    'database' => 0,
                ],
                [
                    'host'     => '127.0.0.1',
                    'password' => null,
                    'port'     => 6385,
                    'database' => 0,
                ],
                [
                    'host'     => '127.0.0.1',
                    'password' => null,
                    'port'     => 6386,
                    'database' => 0,
                ],
            ],
        ],
    ],

操作

<?php
        $redis = Redis::connection('mycluster1');
        $redis->del('codehole');
        $redis->xGroup('DESTROY', 'codehole', 'cg1');

        $redis->xAdd('codehole','*',['name'=>'laoqian','age'=>'30']);
        $redis->xAdd('codehole','*',['name'=>'xiaoyu','age'=>'29']);
        $redis->xAdd('codehole','*',['name'=>'xiaoqian','age'=>'1']);

        $xrange = $redis->xrange('codehole','-','+');

        $xgroup1 = $redis->xGroup('Create','codehole','cg1','0',true);

        $xlen = $redis->xLen('codehole');

        $xinfoGroups = $redis->xInfo('GROUPS','codehole');
        $xinfoStreams = $redis->xInfo('STREAM','codehole');
        dump($xgroup1);
        dump($xlen);
        dump($xrange);
        dump($xinfoGroups);
        dump($xinfoStreams);

        $xread1 = $redis->xReadGroup('cg1','c1',['codehole'=>'>'],1,1);
        $xread2 = $redis->xReadGroup('cg1','c1',['codehole'=>'>'],1,1);
        $xread3 = $redis->xReadGroup('cg1','c1',['codehole'=>'>'],1,1);
        dump($xread1);
        dump($xread2);
        dump($xread3);

        dump($redis->xInfo('GROUPS','codehole'));

        $xreadkey1 = array_keys($xread1['codehole'])[0];
        $xreadkey2 = array_keys($xread2['codehole'])[0];
        $xreadkey3 = array_keys($xread3['codehole'])[0];

        $xack1 = $redis->xAck('codehole','cg1',[$xreadkey1]);
        $xack2 = $redis->xAck('codehole','cg1',[$xreadkey2]);
        $xack3 = $redis->xAck('codehole','cg1',[$xreadkey3]);

        dump($xack1);
        dump($xack2);
        dump($xack3);
        dump($redis->xInfo('GROUPS','codehole'));

        exit;
Copyright © Kagami丶 2019 all right reserved,powered by Gitbook该文件修订时间: 2019-12-10 20:25:25

results matching ""

    No results matching ""