上篇我们介绍了RedisStream实现消息队列和发布订阅模式 ,今天我们将介绍Go语言的实现方法。

消息队列

接上篇的例子,我们通过go-redis库的接口XAdd、XReadGroup、XGroupCreateMkStream来实现,详细如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
)
func main() {
	var (
		ctx      = context.Background()
		consumer = "consumer1"
		stream   = "mystream"
		group    = "group1"
		start    = ">"
		count    = 10
	)

	// 创建Redis客户端
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})
	// 创建消费者组
	err := client.XGroupCreateMkStream(ctx, stream, group, "0-0").Err()
	if err != nil {
		panic(err)
	}
	//添加消息
	_, err = client.XAdd(ctx, &redis.XAddArgs{
		Stream: stream,
		ID:     "*",
		Values: map[string]interface{}{
			"name": "foo",
		},
	}).Result()
	if err != nil {
		panic(err)
	}
	//读取消息
	messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    group,
		Consumer: consumer,
		Streams:  []string{stream, start},
		Count:    int64(count),
		Block:    0,
		NoAck:    false,
	}).Result()
	if err != nil {
		panic(err)
	} 
	for _, message := range messages[0].Messages {
		fmt.Printf("Received message: %v\n", message.Values)
	}
}

上例实现了一个简单的消息队列,可以实现消息的发送和接收,并保证每条消息只被处理一次 。

阻塞超时

通过修改Block(单位为ms)参数来实现阻塞,详细修改如下

for {
		fmt.Println("Start Receiving", time.Now().String())
		//读取消息
		messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, start},
			Count:    int64(count),
			Block:    2000,
			NoAck:    false,
		}).Result()
		if err != nil {
			panic(err)
		}
		fmt.Println("Received Length=", len(messages), time.Now().String()) 
    ...
	}

输出如下:

Start Receiving 2023-04-08 21:39:23.9669498 +0800 CST m=+0.529310801
Received Length= 1 2023-04-08 21:39:24.0419963 +0800 CST m=+0.604357301
Received message: map[name:foo]
Start Receiving 2023-04-08 21:39:24.0419963 +0800 CST m=+0.604357301
panic: read tcp 127.0.0.1:63777->127.0.0.1:6379: i/o timeout

可见,XReadGroup一直阻塞等待新的消息,直到 Redis 连接超时或者命令被中断。 ps: 如若不需要阻塞等待可修改Block为<0 的值实现。

发布订阅模式

接着,我们实现上篇提到的发布订阅模式,XGroupCreateMkStream创建另外一个订阅组group2,两个协程XReadGroup接收消息,往stream中XAdd消息,两个订阅者将都收到消息,详细如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
	"time"
)
func main() {
	var (
		ctx       = context.Background()
		consumer  = "consumer1"
		consumer2 = "consumer2"
		stream    = "mystream"
		group     = "group1"
		group2    = "group2"
		start     = ">"
		count     = 10
	)
	// 创建Redis客户端
	client := redis.NewClient(&redis.Options{
		Addr:        "localhost:6379",
		Password:    "",
		DB:          0,
		ReadTimeout: time.Second * 10,
	})
	// 创建消费者组
	err := client.XGroupCreateMkStream(ctx, stream, group, "0-0").Err()
	if err != nil {
		//panic(err)
	}
	// 创建消费者组2
	err = client.XGroupCreateMkStream(ctx, stream, group2, "0-0").Err()
	if err != nil {
		//panic(err)
	}
	//添加消息
	_, err = client.XAdd(ctx, &redis.XAddArgs{
		Stream: stream,
		ID:     "*",
		Values: map[string]interface{}{
			"name": "foo",
		},
	}).Result()
	if err != nil {
		panic(err)
	}
	go func() {
		for {
			fmt.Println("Start Receiving", time.Now().String())
			//读取消息
			messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
				Group:    group,
				Consumer: consumer,
				Streams:  []string{stream, start},
				Count:    int64(count),
				Block:    2000,
				NoAck:    false,
			}).Result()
			if err != nil {
				panic(err)
			}
			fmt.Println("Received Length=", len(messages), time.Now().String())
			for _, message := range messages[0].Messages {
				fmt.Printf("Received message: %v\n", message.Values)
			}

		}
	}()
	//读取消息2
	go func() {
		for {
			fmt.Println("2 Start Receiving ", time.Now().String())
			messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
				Group:    group2,
				Consumer: consumer2,
				Streams:  []string{stream, start},
				Count:    int64(count),
				Block:    2000,
				NoAck:    false,
			}).Result()
			if err != nil {
				panic(err)
			}
			fmt.Println("2 Received Length=", len(messages), time.Now().String())
			for _, message := range messages[0].Messages {
				fmt.Printf("2Received message: %v\n", message.Values)
			}
		}
	}()
	for {
	}
}

输出如下:

Start Receiving 2023-04-08 22:19:41.9981051 +0800 CST m=+0.347161101
2 Start Receiving  2023-04-08 22:19:41.9981051 +0800 CST m=+0.347161101
Received Length= 1 2023-04-08 22:19:42.0571709 +0800 CST m=+0.406226901
Received message: map[name:foo]
Start Receiving 2023-04-08 22:19:42.0571709 +0800 CST m=+0.406226901
2 Received Length= 1 2023-04-08 22:19:42.3628412 +0800 CST m=+0.711897201
2Received message: map[name:foo]

参考

前言

游戏排行榜是一个常见需求,今天主要介绍go语言使用redis-sort-sets来实现排行榜常见功能。

Redis Sort Sets

官方介绍:

Redis排序集是按相关分数排序的惟一字符串(成员)的集合。当多个字符串具有相同的分数时,字符串将按字典顺序排列。排序集的一些用例包括:

游戏排行榜。例如,您可以使用排序集轻松地维护大型在线游戏中最高分数的有序列表。 速率限制器。特别是,您可以使用一个排序集来构建滑动窗口速率限制器,以防止过多的API请求。

常用于排行榜的命令:

  • ZRANGE 返回排序集的成员(升序)
  • ZREVANGE 返回排序集的成员(降序序)
  • ZADD 向排序集添加一个新成员和相关分数。如果成员已经存在,则更新评分。
  • ZREM 删除排序集一个成员
  • ZRANK 返回提供的成员的排名(升序)
  • ZREVRANK 返回提供的成员的排名(降序)

Go Redis

Go Redis为各种风格的Redis提供Go客户端,基础的使用例子如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379", // use default Addr
		Password: "",               // no password set
		DB:       0,                // use default DB
	})
	pong, err := rdb.Ping(ctx).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(pong, err)
	if err != nil {
		fmt.Println(err)
		return
	}
    defer rdb.Close()
}

Examples

这里我们用go-redis实现排行榜常见的功能,包括:

  • 获取排行榜成员和分数 (升序、降序)
  • 加入排行榜成员
  • 删除排行榜成员
  • 更新排行榜成员分数
  • 获取排行榜成员分数
  • 获取排行榜名次成员
  • 清空排行榜

完整代码如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v9"
	"math/rand"
	"strconv"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379", // use default Addr
		Password: "",               // no password set
		DB:       0,                // use default DB
	}) 
	var err error
	defer rdb.Close()
	//增加玩家分数的变化
	for i := 1; i <= 5; i++ {
		err = rdb.ZAdd(ctx, "leaderboard", redis.Z{
			Score:  float64(rand.Intn(100)),
			Member: "user:" + strconv.Itoa(i),
		}).Err()
		if err != nil {
			fmt.Println(err)
			return
		}
	}
	if err != nil {
		fmt.Println(err)
		return
	}
	//查询排行榜成员-升序
	members, err := rdb.ZRange(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜成员-升序", members)
	//查询排行榜成员和分数-升序
	membersWithScore, err := rdb.ZRevRangeWithScores(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜成员和分数-升序", membersWithScore)

	//查询排行榜成员-降序
	membersRev, err := rdb.ZRevRange(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜成员-降序", membersRev)
	//查询排行榜成员和分数-升序
	membersRevWithScore, err := rdb.ZRangeWithScores(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜成员和分数-升序", membersRevWithScore)

	//获取某玩家的分数
	user2Score, err := rdb.ZScore(ctx, "leaderboard", "user:2").Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("获取某玩家 user:2 的分数", user2Score)
	//更新某玩家的分数
	err = rdb.ZAdd(ctx, "leaderboard", redis.Z{
		Score:  user2Score,
		Member: "user:1",
	}).Err()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("更新某玩家 user:1 的分数", user2Score)
	//查询排行榜成员和分数-升序
	membersRevWithScore, err = rdb.ZRangeWithScores(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜变化后的排行榜成员和分数-升序", membersRevWithScore)

	//查询排行榜某玩家的分数-升序
	memberRank, err := rdb.ZRank(ctx, "leaderboard", "user:2").Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜某玩家  user:2 的排名", memberRank)

	//查询排行榜某玩家的分数-降序
	memberRevRank, err := rdb.ZRevRank(ctx, "leaderboard", "user:2").Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜某玩家 user:2 的排名-降序", memberRevRank)

	//删除排序集一个成员
	zrem, err := rdb.ZRem(ctx, "leaderboard", "user:1").Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("删除排序集一个成员", zrem)

	//删除后查询排行榜成员-升序
	members, err = rdb.ZRange(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("删除后查询排行榜成员-升序", members)

	//清空排行榜
	clear, err := rdb.ZRemRangeByRank(ctx, "leaderboard", 0, 4).Result()
	if err != nil {
		fmt.Println("err zrem", err)
		return
	}
	fmt.Println("清空排行榜", clear)
	//清空后查询排行榜成员-升序
	members, err = rdb.ZRange(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("清空后查询排行榜成员-升序", members) 

OutPut:

查询排行榜成员-升序 [user:3 user:4 user:1 user:5 user:2]
查询排行榜成员和分数-升序 [{87 user:2} {81 user:5} {81 user:1} {59 user:4} {47 user:3}]
查询排行榜成员-降序 [user:2 user:5 user:1 user:4 user:3]
查询排行榜成员和分数-升序 [{47 user:3} {59 user:4} {81 user:1} {81 user:5} {87 user:2}]
获取某玩家 user:2 的分数 87
更新某玩家 user:1 的分数 87
查询排行榜变化后的排行榜成员和分数-升序 [{47 user:3} {59 user:4} {81 user:5} {87 user:1} {87 user:2}]
查询排行榜某玩家  user:2 的排名 4
查询排行榜某玩家 user:2 的排名-降序 0
删除排序集一个成员 1
删除后查询排行榜成员-升序 [user:3 user:4 user:5 user:2]
清空排行榜 4
清空后查询排行榜成员-升序 []

参考

场景

在项目开发中,需要用到缓存和对一个列表数据分页查询,但由于redis是key-value的存储方式,我们期望的使用类似postgresql的offset和limit,不至于需要一个个key遍历过去。

设计

分析一下我们需求,那么需求需要实现的接口大概是:

FindByPage(ctx context.Context, page, size int) ([]Object, error)

大致我们需要解决两个问题:

  • 存储对象
  • 列表分页快速查找

对象存储

Redis hash 是一个 string 类型的 field(字段) 和 value(值) 的映射表,hash 特别适合用于存储对象。

  • HDEL key field1 [field2]删除一个或多个哈希表字段
  • HEXISTS key field查看哈希表 key 中,指定的字段是否存在。
  • HGET key field获取存储在哈希表中指定字段的值。
  • HGETALL key获取在哈希表中指定 key 的所有字段和值

如此,我们可以 redis hash 和 json.Encoding 来存储。 获得一个Object

//【ObjectID】= "{"foo": "123", "bar":"456"}"

import  "github.com/go-redis/redis/v8"

result, err := rdb.HGet(ctx, "key", id.String()).Result()
if err != nil {
     return
}
var obj = &YourObject{}
err = json.Unmarshal([]byte(result), obj)
if err != nil {
     return
}

增加一个Object:

ob, err := json.Marshal(Object{}) 
	err = repo.data.rdb.HSet(ctx, "key", ob.ID, ob).Err()
	if err != nil {
		return err
	}

分页

我们的需求中,ObjectID是一个唯一的int,那么可以使用 zset

ZXyy redis 有序集合的基本命令。

  • ZADD key score1 member1 [score2 member2]向有序集合添加一个或多个成员,或者更新已存在成员的分数
  • ZCARD key获取有序集合的成员数
  • ZRANGE key start stop [WITHSCORES]通过索引区间返回有序集合指定区间内的成员

那么可以把 zset 的key 和 value 都设置为 ObjectID 增加一个Object的代码:

if err := rdb.ZAdd(ctx, "key", &redis.Z{
     Score:  float64(Object.ID),
     Member: strconv.Itoa(int(Object.ID)),
}).Err(); err != nil {
     return err
}

获取X分页的代码:

var (
     start = int64((page - 1) * size)
     end   = start + int64(size)
)
result, err := rdb.ZRange(ctx, "key", start, end).Result()
if err != nil {
     return
}

测试

我们增加N个Object:

[1 a]
[2 b]
[3 c]
[11 aa]
[12 bb]
[13 cc]
[21 aaaa]

每页的长度为4,获取第一页

[1 2 3 11]

第二页:

[12 13 21]

第三页:

[ ]

参考

jefffff

Stay hungry. Stay Foolish COOL

Go backend developer

China Amoy