上篇我们介绍了RedisStream实现消息队列和发布订阅模式 ,今天我们将介绍Go语言的实现方法。
消息队列
接上篇的例子,我们通过go-redis库的接口XAdd、XReadGroup、XGroupCreateMkStream来实现,详细如下:
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
func main() {
var (
ctx = context.Background()
consumer = "consumer1"
stream = "mystream"
group = "group1"
start = ">"
count = 10
)
// 创建Redis客户端
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
// 创建消费者组
err := client.XGroupCreateMkStream(ctx, stream, group, "0-0").Err()
if err != nil {
panic(err)
}
//添加消息
_, err = client.XAdd(ctx, &redis.XAddArgs{
Stream: stream,
ID: "*",
Values: map[string]interface{}{
"name": "foo",
},
}).Result()
if err != nil {
panic(err)
}
//读取消息
messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group,
Consumer: consumer,
Streams: []string{stream, start},
Count: int64(count),
Block: 0,
NoAck: false,
}).Result()
if err != nil {
panic(err)
}
for _, message := range messages[0].Messages {
fmt.Printf("Received message: %v\n", message.Values)
}
}
上例实现了一个简单的消息队列,可以实现消息的发送和接收,并保证每条消息只被处理一次 。
阻塞超时
通过修改Block(单位为ms)参数来实现阻塞,详细修改如下
for {
fmt.Println("Start Receiving", time.Now().String())
//读取消息
messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group,
Consumer: consumer,
Streams: []string{stream, start},
Count: int64(count),
Block: 2000,
NoAck: false,
}).Result()
if err != nil {
panic(err)
}
fmt.Println("Received Length=", len(messages), time.Now().String())
...
}
输出如下:
Start Receiving 2023-04-08 21:39:23.9669498 +0800 CST m=+0.529310801
Received Length= 1 2023-04-08 21:39:24.0419963 +0800 CST m=+0.604357301
Received message: map[name:foo]
Start Receiving 2023-04-08 21:39:24.0419963 +0800 CST m=+0.604357301
panic: read tcp 127.0.0.1:63777->127.0.0.1:6379: i/o timeout
可见,XReadGroup一直阻塞等待新的消息,直到 Redis 连接超时或者命令被中断。 ps: 如若不需要阻塞等待可修改Block为<0 的值实现。
发布订阅模式
接着,我们实现上篇提到的发布订阅模式,XGroupCreateMkStream创建另外一个订阅组group2,两个协程XReadGroup接收消息,往stream中XAdd消息,两个订阅者将都收到消息,详细如下:
package main
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"time"
)
func main() {
var (
ctx = context.Background()
consumer = "consumer1"
consumer2 = "consumer2"
stream = "mystream"
group = "group1"
group2 = "group2"
start = ">"
count = 10
)
// 创建Redis客户端
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
ReadTimeout: time.Second * 10,
})
// 创建消费者组
err := client.XGroupCreateMkStream(ctx, stream, group, "0-0").Err()
if err != nil {
//panic(err)
}
// 创建消费者组2
err = client.XGroupCreateMkStream(ctx, stream, group2, "0-0").Err()
if err != nil {
//panic(err)
}
//添加消息
_, err = client.XAdd(ctx, &redis.XAddArgs{
Stream: stream,
ID: "*",
Values: map[string]interface{}{
"name": "foo",
},
}).Result()
if err != nil {
panic(err)
}
go func() {
for {
fmt.Println("Start Receiving", time.Now().String())
//读取消息
messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group,
Consumer: consumer,
Streams: []string{stream, start},
Count: int64(count),
Block: 2000,
NoAck: false,
}).Result()
if err != nil {
panic(err)
}
fmt.Println("Received Length=", len(messages), time.Now().String())
for _, message := range messages[0].Messages {
fmt.Printf("Received message: %v\n", message.Values)
}
}
}()
//读取消息2
go func() {
for {
fmt.Println("2 Start Receiving ", time.Now().String())
messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group2,
Consumer: consumer2,
Streams: []string{stream, start},
Count: int64(count),
Block: 2000,
NoAck: false,
}).Result()
if err != nil {
panic(err)
}
fmt.Println("2 Received Length=", len(messages), time.Now().String())
for _, message := range messages[0].Messages {
fmt.Printf("2Received message: %v\n", message.Values)
}
}
}()
for {
}
}
输出如下:
Start Receiving 2023-04-08 22:19:41.9981051 +0800 CST m=+0.347161101
2 Start Receiving 2023-04-08 22:19:41.9981051 +0800 CST m=+0.347161101
Received Length= 1 2023-04-08 22:19:42.0571709 +0800 CST m=+0.406226901
Received message: map[name:foo]
Start Receiving 2023-04-08 22:19:42.0571709 +0800 CST m=+0.406226901
2 Received Length= 1 2023-04-08 22:19:42.3628412 +0800 CST m=+0.711897201
2Received message: map[name:foo]