RedisStream实现消息队列
Redis Stream是在 Redis 5.0中引入的数据类型,可以实现高性能、高可靠性的消息队列。本文主要介绍 Redis Stream 的概念、使用方法和一些适用场景如发布订阅模式、消息队列。
简介
Redis Stream是Redis为消息队列设计的数据类型,它将消息队列的数据结构抽象为一个有序的消息流(stream),每个消息都有一个唯一的ID和一个关联的键值对(key-value pairs)它支持以下功能:
- 添加消息-将消息添加到队列的末尾
- 读取消息-从队列的开头读取消息
- 删除消息-删除指定id的消息
- 创建消费者组-创建多个消费者组,每个消费者组都可以独立地读取消息。同时可实现负载均衡和消息分发。
- 确认消息- XACK 命令来确认已经成功处理的消息,避免重复消费
XADD-添加消息
语法:
XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold
[LIMIT count]] <* | id> field value [field value ...]
- MAXLEN:可选参数,用于限制消息流的长度。如果指定了 MAXLEN,那么当消息流的长度超过指定的长度时,最早的消息将被自动删除
- ID:可选参数,用于指定消息的 ID。如果不指定,Redis 会自动生成一个唯一的 ID。
- field value [field value … :消息的键值对,可以是任何字符串。
例如:
XADD mystream * name Foo age 10
此命令将向名为 mystream 的消息流中添加一条消息,输出如下:
>"1680355760868-0"
XREAD-读取消息
语法:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id
[id ...]
- COUNT:可选参数,用于指定要读取的消息数。
- BLOCK:可选参数,用于指定阻塞时间,如果没有新的消息可以读取,那么会等待指定的时间后返回。
- milliseconds:阻塞时间,以毫秒为单位。
- key:消息流的名称。
- id:可选参数,用于指定从哪个消息 ID 开始读取消息。如果不指定 id,那么从最新的消息开始读取。
例如:
XREAD STREAMS mystream 0
此命令将返回 mystream 中最新的消息。
输出如下
>1) "mystream"
> 1) 1) "1680355760868-0"
> 2) 1) "name"
> 2) "Foo"
> 3) "age"
> 4) "10"
XDEL 删除消息
语法:
XDEL key id [id ...]
- id 将要删除的ID
例如:
XDEL mystream 1680355760868-0
此命令将从名为 mystream 的消息流中删除一条消息,输出如下
>127.0.0.1:6379> XDEL mystream 1680355760868-0
>(integer) 1
>127.0.0.1:6379> XREAD STREAMS mystream 0
>(nil)
XGROUP CREATE-创建消费者组
语法:
XGROUP CREATE key group <id | $> [MKSTREAM]
[ENTRIESREAD entries-read]
- key:消息流的名称。
- group:消费者组的名称。
- id:可选参数,用于指定消费者组的起始 ID。如果不指定,那么起始 ID 将是最新消息的 ID。
- MKSTREAM:可选参数,用于指定如果消息流不存在时是否自动创建。如果指定了 MKSTREAM,则会自动创建消息流。
例如:
XGROUP CREATE mystream group1 0-0
创建一个名为 group1 的消费者组,并绑定到 mystream 的消息流上, 起始ID为最新的ID。输出如下:
>127.0.0.1:6379> XADD mystream * name Bar age 11
>"1680356821501-0"
>127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS mystream >
>1) 1) "mystream"
> 2) 1) 1) "1680356821501-0"
> 2) 1) "name"
> 2) "Bar"
> 3) "age"
> 4) "11"
XACK-确认消息
语法:
XACK key group id [id ...]
- key:消息流的名称。
- group:消费者组的名称。
- id:要确认的消息的 ID。 例如:
127.0.0.1:6379> XACK mystream group1 1680356821501-0
(integer) 1
此命令将确认 ID 为 1680356821501-0 的消息已经被处理:
XREAD BLOCK 阻塞数据
语法:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id
[id ...]
例如:
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1680269417682-0
此READ BLOCK 命令可以一直阻塞等待新的消息,直到 Redis 连接超时或者命令被中断。可以通过在 XREAD BLOCK 命令中指定一个非零的超时时间来避免无限阻塞。
实践篇
发布 / 订阅模式
Redis Stream 支持创建多个消费者组,利用这个特性我们可以实现发布 / 订阅模式。 例如上例中,创建一个名为 group2 的消费者组,并绑定到 mystream 的消息流上, 起始ID为最新的ID。
XGROUP CREATE mystream group2 0-0
输出如下:
>127.0.0.1:6379> XADD mystream * name Jeff age 20
>"1680414977243-0"
>
>127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS mystream >
>1) 1) "mystream"
> 2) 1) 1) "1680414977243-0"
> 2) 1) "name"
> 2) "Jeff"
> 3) "age"
> 4) "20"
>127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS mystream >
>1) 1) "mystream"
> 2) 1) 1) "1680414977243-0"
> 2) 1) "name"
> 2) "Jeff"
> 3) "age"
> 4) "20"
例中,我们增加了信息到 mystream,在多的GROUP消费者可以分别收到消息。
消息队列
利用以下几个特性,可以方便地实现各种消息队列的场景。
-
原子性: 既是每个消息有且只有一次被读取
a. 当多个消费者在同一个消费者组中读取消息时,它们会竞争相同的消息。当一个消息被多个消费者读取时,只有其中一个消费者会成功处理该消息。
b. 可以使用 XACK 命令来确认已经成功处理的消息,避免重复消费。
-
高性能可拓展
a. 消费者组的动态扩展和收缩,可以使用消费者组来实现负载均衡和消息分发。