Redis Stream是在 Redis 5.0中引入的数据类型,可以实现高性能、高可靠性的消息队列。本文主要介绍 Redis Stream 的概念、使用方法和一些适用场景如发布订阅模式、消息队列。

简介

Redis Stream是Redis为消息队列设计的数据类型,它将消息队列的数据结构抽象为一个有序的消息流(stream),每个消息都有一个唯一的ID和一个关联的键值对(key-value pairs)它支持以下功能:

  1. 添加消息-将消息添加到队列的末尾
  2. 读取消息-从队列的开头读取消息
  3. 删除消息-删除指定id的消息
  4. 创建消费者组-创建多个消费者组,每个消费者组都可以独立地读取消息。同时可实现负载均衡和消息分发。
  5. 确认消息- XACK 命令来确认已经成功处理的消息,避免重复消费

XADD-添加消息

语法:

XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold
  [LIMIT count]] <* | id> field value [field value ...]
  • MAXLEN:可选参数,用于限制消息流的长度。如果指定了 MAXLEN,那么当消息流的长度超过指定的长度时,最早的消息将被自动删除
  • ID:可选参数,用于指定消息的 ID。如果不指定,Redis 会自动生成一个唯一的 ID。
  • field value [field value … :消息的键值对,可以是任何字符串。

例如:

XADD mystream * name Foo age 10

此命令将向名为 mystream 的消息流中添加一条消息,输出如下:

>"1680355760868-0"

XREAD-读取消息

语法:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id
  [id ...]
  • COUNT:可选参数,用于指定要读取的消息数。
  • BLOCK:可选参数,用于指定阻塞时间,如果没有新的消息可以读取,那么会等待指定的时间后返回。
  • milliseconds:阻塞时间,以毫秒为单位。
  • key:消息流的名称。
  • id:可选参数,用于指定从哪个消息 ID 开始读取消息。如果不指定 id,那么从最新的消息开始读取。

例如:

XREAD STREAMS mystream 0

此命令将返回 mystream 中最新的消息。

输出如下

>1) "mystream"
>   1) 1) "1680355760868-0"
>         2) 1) "name"
>            2) "Foo"
>            3) "age"
>            4) "10"

XDEL 删除消息

语法:

XDEL key id [id ...]
  • id 将要删除的ID

例如:

XDEL mystream 1680355760868-0

此命令将从名为 mystream 的消息流中删除一条消息,输出如下

>127.0.0.1:6379> XDEL mystream 1680355760868-0
>(integer) 1
>127.0.0.1:6379> XREAD STREAMS mystream 0
>(nil)

XGROUP CREATE-创建消费者组

语法:

XGROUP CREATE key group <id | $> [MKSTREAM]
  [ENTRIESREAD entries-read]
  • key:消息流的名称。
  • group:消费者组的名称。
  • id:可选参数,用于指定消费者组的起始 ID。如果不指定,那么起始 ID 将是最新消息的 ID。
  • MKSTREAM:可选参数,用于指定如果消息流不存在时是否自动创建。如果指定了 MKSTREAM,则会自动创建消息流。

例如:

XGROUP CREATE mystream group1 0-0

创建一个名为 group1 的消费者组,并绑定到 mystream 的消息流上, 起始ID为最新的ID。输出如下:

>127.0.0.1:6379> XADD mystream * name Bar age 11
>"1680356821501-0"

>127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS mystream >
>1) 1) "mystream"
>   2) 1) 1) "1680356821501-0"
>         2) 1) "name"
>            2) "Bar"
>            3) "age"
>            4) "11"

XACK-确认消息

语法:

XACK key group id [id ...]
  • key:消息流的名称。
  • group:消费者组的名称。
  • id:要确认的消息的 ID。 例如:
127.0.0.1:6379> XACK mystream group1 1680356821501-0
(integer) 1

此命令将确认 ID 为 1680356821501-0 的消息已经被处理:

XREAD BLOCK 阻塞数据

语法:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id
  [id ...]

例如:

127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1680269417682-0

此READ BLOCK 命令可以一直阻塞等待新的消息,直到 Redis 连接超时或者命令被中断。可以通过在 XREAD BLOCK 命令中指定一个非零的超时时间来避免无限阻塞。

实践篇

发布 / 订阅模式

Redis Stream 支持创建多个消费者组,利用这个特性我们可以实现发布 / 订阅模式。 例如上例中,创建一个名为 group2 的消费者组,并绑定到 mystream 的消息流上, 起始ID为最新的ID。

XGROUP CREATE mystream group2 0-0

输出如下:

>127.0.0.1:6379> XADD mystream * name Jeff age 20
>"1680414977243-0"
>
>127.0.0.1:6379>  XREADGROUP GROUP group1 consumer COUNT 5 STREAMS mystream >
>1) 1) "mystream"
>   2) 1) 1) "1680414977243-0"
>         2) 1) "name"
>            2) "Jeff"
>            3) "age"
>            4) "20"

>127.0.0.1:6379>  XREADGROUP GROUP group2 consumer COUNT 5 STREAMS mystream >
>1) 1) "mystream"
>   2) 1) 1) "1680414977243-0"
>         2) 1) "name"
>            2) "Jeff"
>            3) "age"
>            4) "20"

例中,我们增加了信息到 mystream,在多的GROUP消费者可以分别收到消息。

消息队列

利用以下几个特性,可以方便地实现各种消息队列的场景。

  1. 原子性: 既是每个消息有且只有一次被读取

    a. 当多个消费者在同一个消费者组中读取消息时,它们会竞争相同的消息。当一个消息被多个消费者读取时,只有其中一个消费者会成功处理该消息。

    b. 可以使用 XACK 命令来确认已经成功处理的消息,避免重复消费。

  2. 高性能可拓展

    a. 消费者组的动态扩展和收缩,可以使用消费者组来实现负载均衡和消息分发。

参考