上篇我们介绍了RedisStream实现消息队列和发布订阅模式 ,今天我们将介绍Go语言的实现方法。

消息队列

接上篇的例子,我们通过go-redis库的接口XAdd、XReadGroup、XGroupCreateMkStream来实现,详细如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
)
func main() {
	var (
		ctx      = context.Background()
		consumer = "consumer1"
		stream   = "mystream"
		group    = "group1"
		start    = ">"
		count    = 10
	)

	// 创建Redis客户端
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})
	// 创建消费者组
	err := client.XGroupCreateMkStream(ctx, stream, group, "0-0").Err()
	if err != nil {
		panic(err)
	}
	//添加消息
	_, err = client.XAdd(ctx, &redis.XAddArgs{
		Stream: stream,
		ID:     "*",
		Values: map[string]interface{}{
			"name": "foo",
		},
	}).Result()
	if err != nil {
		panic(err)
	}
	//读取消息
	messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    group,
		Consumer: consumer,
		Streams:  []string{stream, start},
		Count:    int64(count),
		Block:    0,
		NoAck:    false,
	}).Result()
	if err != nil {
		panic(err)
	} 
	for _, message := range messages[0].Messages {
		fmt.Printf("Received message: %v\n", message.Values)
	}
}

上例实现了一个简单的消息队列,可以实现消息的发送和接收,并保证每条消息只被处理一次 。

阻塞超时

通过修改Block(单位为ms)参数来实现阻塞,详细修改如下

for {
		fmt.Println("Start Receiving", time.Now().String())
		//读取消息
		messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, start},
			Count:    int64(count),
			Block:    2000,
			NoAck:    false,
		}).Result()
		if err != nil {
			panic(err)
		}
		fmt.Println("Received Length=", len(messages), time.Now().String()) 
    ...
	}

输出如下:

Start Receiving 2023-04-08 21:39:23.9669498 +0800 CST m=+0.529310801
Received Length= 1 2023-04-08 21:39:24.0419963 +0800 CST m=+0.604357301
Received message: map[name:foo]
Start Receiving 2023-04-08 21:39:24.0419963 +0800 CST m=+0.604357301
panic: read tcp 127.0.0.1:63777->127.0.0.1:6379: i/o timeout

可见,XReadGroup一直阻塞等待新的消息,直到 Redis 连接超时或者命令被中断。 ps: 如若不需要阻塞等待可修改Block为<0 的值实现。

发布订阅模式

接着,我们实现上篇提到的发布订阅模式,XGroupCreateMkStream创建另外一个订阅组group2,两个协程XReadGroup接收消息,往stream中XAdd消息,两个订阅者将都收到消息,详细如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
	"time"
)
func main() {
	var (
		ctx       = context.Background()
		consumer  = "consumer1"
		consumer2 = "consumer2"
		stream    = "mystream"
		group     = "group1"
		group2    = "group2"
		start     = ">"
		count     = 10
	)
	// 创建Redis客户端
	client := redis.NewClient(&redis.Options{
		Addr:        "localhost:6379",
		Password:    "",
		DB:          0,
		ReadTimeout: time.Second * 10,
	})
	// 创建消费者组
	err := client.XGroupCreateMkStream(ctx, stream, group, "0-0").Err()
	if err != nil {
		//panic(err)
	}
	// 创建消费者组2
	err = client.XGroupCreateMkStream(ctx, stream, group2, "0-0").Err()
	if err != nil {
		//panic(err)
	}
	//添加消息
	_, err = client.XAdd(ctx, &redis.XAddArgs{
		Stream: stream,
		ID:     "*",
		Values: map[string]interface{}{
			"name": "foo",
		},
	}).Result()
	if err != nil {
		panic(err)
	}
	go func() {
		for {
			fmt.Println("Start Receiving", time.Now().String())
			//读取消息
			messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
				Group:    group,
				Consumer: consumer,
				Streams:  []string{stream, start},
				Count:    int64(count),
				Block:    2000,
				NoAck:    false,
			}).Result()
			if err != nil {
				panic(err)
			}
			fmt.Println("Received Length=", len(messages), time.Now().String())
			for _, message := range messages[0].Messages {
				fmt.Printf("Received message: %v\n", message.Values)
			}

		}
	}()
	//读取消息2
	go func() {
		for {
			fmt.Println("2 Start Receiving ", time.Now().String())
			messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
				Group:    group2,
				Consumer: consumer2,
				Streams:  []string{stream, start},
				Count:    int64(count),
				Block:    2000,
				NoAck:    false,
			}).Result()
			if err != nil {
				panic(err)
			}
			fmt.Println("2 Received Length=", len(messages), time.Now().String())
			for _, message := range messages[0].Messages {
				fmt.Printf("2Received message: %v\n", message.Values)
			}
		}
	}()
	for {
	}
}

输出如下:

Start Receiving 2023-04-08 22:19:41.9981051 +0800 CST m=+0.347161101
2 Start Receiving  2023-04-08 22:19:41.9981051 +0800 CST m=+0.347161101
Received Length= 1 2023-04-08 22:19:42.0571709 +0800 CST m=+0.406226901
Received message: map[name:foo]
Start Receiving 2023-04-08 22:19:42.0571709 +0800 CST m=+0.406226901
2 Received Length= 1 2023-04-08 22:19:42.3628412 +0800 CST m=+0.711897201
2Received message: map[name:foo]

参考

Redis Stream是在 Redis 5.0中引入的数据类型,可以实现高性能、高可靠性的消息队列。本文主要介绍 Redis Stream 的概念、使用方法和一些适用场景如发布订阅模式、消息队列。 简介 Redis Stream是Redis为消息队列设计的数据类型,它将消息队列的数据结构抽象为一个有序的消息流(stream),每个消息都有一个唯一的ID和一个关联的键值对(key-value pairs)它支持以下功能: 添加消息-将消息添加到队列的末尾 读取消息-从队列的开头读取消息 删除消息-删除指定id的消息 创建消费者组-创建多个消费者组,每个消费者组都可以独立地读取消息。同时可实现负载均衡和消息分发。 确认消息- XACK 命令来确认已经成功处理的消息,避免重复消费 XADD-添加消息 语法: XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...] MAXLEN:可选参数,用于限制消息流的长度。如果指定了 MAXLEN,那么当消息流的长度超过指定的长度时,最早的消息将被自动删除 ID:可选参数,用于指定消息的 ID。如果不指定,Redis 会自动生成一个唯一的 ID。 field value [field value … :消息的键值对,可以是任何字符串。 例如: XADD mystream * name Foo age 10 此命令将向名为 mystream 的消息流中添加一条消息,输出如下: >"1680355760868-0" XREAD-读取消息 语法:

jefffff

Stay hungry. Stay Foolish COOL

Go backend developer

China Amoy