Redis Stream是在 Redis 5.0中引入的数据类型,可以实现高性能、高可靠性的消息队列。本文主要介绍 Redis Stream 的概念、使用方法和一些适用场景如发布订阅模式、消息队列。 简介 Redis Stream是Redis为消息队列设计的数据类型,它将消息队列的数据结构抽象为一个有序的消息流(stream),每个消息都有一个唯一的ID和一个关联的键值对(key-value pairs)它支持以下功能: 添加消息-将消息添加到队列的末尾 读取消息-从队列的开头读取消息 删除消息-删除指定id的消息 创建消费者组-创建多个消费者组,每个消费者组都可以独立地读取消息。同时可实现负载均衡和消息分发。 确认消息- XACK 命令来确认已经成功处理的消息,避免重复消费 XADD-添加消息 语法: XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...] MAXLEN:可选参数,用于限制消息流的长度。如果指定了 MAXLEN,那么当消息流的长度超过指定的长度时,最早的消息将被自动删除 ID:可选参数,用于指定消息的 ID。如果不指定,Redis 会自动生成一个唯一的 ID。 field value [field value … :消息的键值对,可以是任何字符串。 例如: XADD mystream * name Foo age 10 此命令将向名为 mystream 的消息流中添加一条消息,输出如下: >"1680355760868-0" XREAD-读取消息 语法:

前言

游戏排行榜是一个常见需求,今天主要介绍go语言使用redis-sort-sets来实现排行榜常见功能。

Redis Sort Sets

官方介绍:

Redis排序集是按相关分数排序的惟一字符串(成员)的集合。当多个字符串具有相同的分数时,字符串将按字典顺序排列。排序集的一些用例包括:

游戏排行榜。例如,您可以使用排序集轻松地维护大型在线游戏中最高分数的有序列表。 速率限制器。特别是,您可以使用一个排序集来构建滑动窗口速率限制器,以防止过多的API请求。

常用于排行榜的命令:

  • ZRANGE 返回排序集的成员(升序)
  • ZREVANGE 返回排序集的成员(降序序)
  • ZADD 向排序集添加一个新成员和相关分数。如果成员已经存在,则更新评分。
  • ZREM 删除排序集一个成员
  • ZRANK 返回提供的成员的排名(升序)
  • ZREVRANK 返回提供的成员的排名(降序)

Go Redis

Go Redis为各种风格的Redis提供Go客户端,基础的使用例子如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379", // use default Addr
		Password: "",               // no password set
		DB:       0,                // use default DB
	})
	pong, err := rdb.Ping(ctx).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(pong, err)
	if err != nil {
		fmt.Println(err)
		return
	}
    defer rdb.Close()
}

Examples

这里我们用go-redis实现排行榜常见的功能,包括:

  • 获取排行榜成员和分数 (升序、降序)
  • 加入排行榜成员
  • 删除排行榜成员
  • 更新排行榜成员分数
  • 获取排行榜成员分数
  • 获取排行榜名次成员
  • 清空排行榜

完整代码如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v9"
	"math/rand"
	"strconv"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379", // use default Addr
		Password: "",               // no password set
		DB:       0,                // use default DB
	}) 
	var err error
	defer rdb.Close()
	//增加玩家分数的变化
	for i := 1; i <= 5; i++ {
		err = rdb.ZAdd(ctx, "leaderboard", redis.Z{
			Score:  float64(rand.Intn(100)),
			Member: "user:" + strconv.Itoa(i),
		}).Err()
		if err != nil {
			fmt.Println(err)
			return
		}
	}
	if err != nil {
		fmt.Println(err)
		return
	}
	//查询排行榜成员-升序
	members, err := rdb.ZRange(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜成员-升序", members)
	//查询排行榜成员和分数-升序
	membersWithScore, err := rdb.ZRevRangeWithScores(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜成员和分数-升序", membersWithScore)

	//查询排行榜成员-降序
	membersRev, err := rdb.ZRevRange(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜成员-降序", membersRev)
	//查询排行榜成员和分数-升序
	membersRevWithScore, err := rdb.ZRangeWithScores(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜成员和分数-升序", membersRevWithScore)

	//获取某玩家的分数
	user2Score, err := rdb.ZScore(ctx, "leaderboard", "user:2").Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("获取某玩家 user:2 的分数", user2Score)
	//更新某玩家的分数
	err = rdb.ZAdd(ctx, "leaderboard", redis.Z{
		Score:  user2Score,
		Member: "user:1",
	}).Err()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("更新某玩家 user:1 的分数", user2Score)
	//查询排行榜成员和分数-升序
	membersRevWithScore, err = rdb.ZRangeWithScores(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜变化后的排行榜成员和分数-升序", membersRevWithScore)

	//查询排行榜某玩家的分数-升序
	memberRank, err := rdb.ZRank(ctx, "leaderboard", "user:2").Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜某玩家  user:2 的排名", memberRank)

	//查询排行榜某玩家的分数-降序
	memberRevRank, err := rdb.ZRevRank(ctx, "leaderboard", "user:2").Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜某玩家 user:2 的排名-降序", memberRevRank)

	//删除排序集一个成员
	zrem, err := rdb.ZRem(ctx, "leaderboard", "user:1").Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("删除排序集一个成员", zrem)

	//删除后查询排行榜成员-升序
	members, err = rdb.ZRange(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("删除后查询排行榜成员-升序", members)

	//清空排行榜
	clear, err := rdb.ZRemRangeByRank(ctx, "leaderboard", 0, 4).Result()
	if err != nil {
		fmt.Println("err zrem", err)
		return
	}
	fmt.Println("清空排行榜", clear)
	//清空后查询排行榜成员-升序
	members, err = rdb.ZRange(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("清空后查询排行榜成员-升序", members) 

OutPut:

查询排行榜成员-升序 [user:3 user:4 user:1 user:5 user:2]
查询排行榜成员和分数-升序 [{87 user:2} {81 user:5} {81 user:1} {59 user:4} {47 user:3}]
查询排行榜成员-降序 [user:2 user:5 user:1 user:4 user:3]
查询排行榜成员和分数-升序 [{47 user:3} {59 user:4} {81 user:1} {81 user:5} {87 user:2}]
获取某玩家 user:2 的分数 87
更新某玩家 user:1 的分数 87
查询排行榜变化后的排行榜成员和分数-升序 [{47 user:3} {59 user:4} {81 user:5} {87 user:1} {87 user:2}]
查询排行榜某玩家  user:2 的排名 4
查询排行榜某玩家 user:2 的排名-降序 0
删除排序集一个成员 1
删除后查询排行榜成员-升序 [user:3 user:4 user:5 user:2]
清空排行榜 4
清空后查询排行榜成员-升序 []

参考

前两篇(1.volume2.pv&pvc)通过部署redis学习实战了k8s的来Volume、PV和PVC。但是,应⽤程序存在“有状态”和“⽆状态”两种类别,显然redis属于读写磁盘需求的有状态应⽤程序,如⽀持事务功能的RDBMS存储系统,所以,本文学习实战k8s有状态应用的部署。

Stateful基础

StatefulSet是Pod资源控制器的⼀种实现,⽤于部署和扩展有状态应⽤的Pod资源,确保它们的运⾏顺序及每个Pod资源的唯⼀性。适用以下需求的应用:

  • 稳定且唯⼀的⽹络标识符。
  • 稳定且持久的存储。
  • 有序、优雅地部署和扩展。
  • 有序、优雅地删除和终⽌。
  • 有序⽽⾃动地滚动更新。

部署

接着,这里把之前的redis存储修改为stateful的方式,修改后的步骤:

  1. 创建 ConfigMap (参考第一篇)
  2. 修改 Deployment 为StatefulSets

修改部署StatefulSets

mkdir my-redis-statefulsets.yaml

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: redis
  namespace: my-ns
spec:
  replicas: 1
  serviceName: redis
  selector:
    matchLabels:
      name: redis
  template:
    metadata:
      labels:
        name: redis
    spec:
      containers:
        - name: redis
          image: redis
          resources:
            requests:
              cpu: 100m
              memory: 100Mi
#          command: ["redis-server","/etc/redis/redis.conf"]
          command:
            - redis-server
            - /etc/redis/redis.conf
          ports:
            - containerPort: 6379
          volumeMounts:
            - name: my-redis-config
              mountPath: /etc/redis/redis.conf
              subPath: redis.conf
            - name: my-redis-storage
              mountPath: /data
      volumes:
        - name: my-redis-storage
          emptyDir: {}
        - name: my-redis-config
          configMap:
            name: my-redis-config
            items:
              - key: redis.conf
                path: redis.conf
 
---
kind: Service
apiVersion: v1
metadata:
  labels:
    name: redis
  name: redis
  namespace: my-ns
spec:
  type: NodePort
  ports:
  - name: redis
    port: 6379
    targetPort: 6379
    nodePort: 30379
  selector:
    name: redis          

其中:

  1. Headless Service:⽤于为Pod资源标识符⽣成可解析的DNS资源记录
  2. StatefulSet ⽤于管控Pod资源
  3. volumeClaimTemplate则基于静态或动态的PV供给⽅式为Pod资源提供 专有且固定的存储(这里我们直接使用了第二篇创建的pv)

测试

redis-client 连接 NodeId:NodePort

# redis-cli -h YourNodeIp-p 30379 -a 123456
YourNodeIp:30379> info
# Serverredis_version:7.0.4

连接成功!

参考

之前学习实践使用熟悉卷(Volume)来存储利用k8s储存卷来部署redis,本文接着学习实践k8s的存储,主要通过redis存储例子学习实战PV和PVC。

PV & PVC

Kubernetes为例⽤户和开发隐藏底层架构的⽬标,在用户和存储服务之间添加了一个中间层,也就是PersistentVolume和PersistentVolumeClaim。

RedisVolume修改为PV&PVC

接着,这里把之前的redis存储修改为pv-pvc的方式,修改后的步骤:

  1. 创建 ConfigMap (参考第一篇)
  2. 增加声明PV和PVC (新增)
  3. 增加 Deployment (把Volume修改为 步骤2 的PVC)
  4. 暴露 Service (参考第一篇)

步骤2,增加声明PV和PVC

mkdir my-redis-pv-pvc.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: my-redis-pv
  labels:
     app: my-redis
spec:
  capacity:
    storage: 1Gi
  accessModes:
  - ReadWriteOnce
  hostPath:
    path: "/mnt/data/my-redis"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: redis-pvc
  namespace: my-ns
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi

执行创建

master# kubectl apply -f my-redis-pv-pvc.yaml

查看pv状态

master# kubectl get pv 
NAME    CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM     STORAGECLASS    REASON   AGE
my-redis-pv     1Gi        RWO            Retain           Bound    my-ns/redis-pvc                                                 50m

查看pvc状态

master# kubectl get pvc -n my-ns
NAME               STATUS   VOLUME         CAPACITY   ACCESS MODES   STORAGECLASS   AGE
redis-pvc          Bound    my-redis-pv    1Gi             RWO                                          54m

pv和pvc已经Bound成功。

步骤3,增加 Deployment (把Volume修改为 步骤2 的PVC)

mkdir my-redis-deployment-pvc.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-redis # Unique name for the deployment
  namespace: myns
  labels:
    app: my-redis       # Labels to be applied to this deployment
spec:
  selector:
    matchLabels:     # This deployment applies to the Pods matching these labels
      app: my-redis
      role: master
      tier: backend
  replicas: 1        # Run a single pod in the deployment
  template:          # Template for the pods that will be created by this deployment
    metadata:
      labels:        # Labels to be applied to the Pods in this deployment
        app: my-redis
        role: master
        tier: backend
    spec:            # Spec for the container which will be run inside the Pod.
      containers:
        - name: my-redis
          image: redis
          resources:
            requests:
              cpu: 100m
              memory: 100Mi
#          command: ["redis-server","/etc/redis/redis.conf"]
          command:
            - redis-server
            - /etc/redis/redis.conf
          ports:
            - containerPort: 6379
          volumeMounts:
            - name: my-redis-config
              mountPath: /etc/redis/redis.conf
              subPath: redis.conf
            - name: my-redis-storage
              mountPath: /data
      volumes:
        - name: my-redis-persistent-storage 
          persistentVolumeClaim:
          claimName: redis-pvc # 这里修改为步骤2声明的pvc
        - name: my-redis-config
          configMap:
            name: my-redis-config
            items:
              - key: redis.conf
                path: redis.conf

执行创建deployment

master# kubectl apply -f my-redis-deployment-pvc.yaml

检查状态

master# kubectl get pod -n my-ns
NAME                                   READY   STATUS    RESTARTS   AGE 
my-redis-6565459689-mbptf              1/1     Running   0          53m

测试

redis-client 连接 NodeId:NodePort

# redis-cli -h YourNodeIp-p 30379 -a 123456
YourNodeIp:30379> info
# Server
redis_version:7.0.4

连接成功。

参考

本文主要通过利用k8s如何部署Redis,来学习使用k8s的存储卷Volume。

Pod本⾝具有⽣命周期,故其内部运⾏的容器及其相关数据⾃⾝均⽆法持久存在。Kubernetes也⽀持类似Docker的存储卷功能,不过,其存储卷Volume是与Pod资源绑定⽽⾮容器。

Pod Volume

如何要在一个Pod里声明 Volume

  1. ⼀是通过.spec.volumes字段定义在Pod之上的存储卷列表,其⽀持使⽤多种不同类型的存储卷且配置参数差别很⼤;
spec:

volumes:
* name: logdata
emptyDir: {}
* name: example
gitRepo:
repository: https://github.com/iKubernetes/k8s_book.git
revision: master
directory: .
  1. 另⼀个是通过.spec.containers.volumeMounts字段在容器上定义的存储卷挂载列表,它只能挂载当前Pod资源中定义的具体存储卷,当然,也可以不挂载任何存储卷
spec:

containers:
* name: <String>

volumeMounts:
* name <string> -required-
mountPath <string> -required-

在之前(声明式对象配置)有介绍过nginx的部署,接着来部署Redis,和Nginx有所不同的是,这里多了一个 ConfigMap 和Volume ,用来配置管理redis和储存。

环境

  • 一个k8s 集群(mater* 1,node* 1)
  • 一个正常连接k8smater的主机的终端

创建 Config-Map

使用 ConfigMap 来配置 Redis ,包含了Redis配置文件里需要的配置项,在创建Pod时会作为配置文件挂载到应用所在的容器中。 my-config-map.yaml 具体如下:

apiVersion: v1
kind: ConfigMap
metadata:
  name: my-redis-config
  namespace: my-ns
data:
  redis.conf: |
    requirepass 123456
    protected-mode no
    port 6379
    tcp-backlog 511
    timeout 0
    tcp-keepalive 300
    daemonize no
    supervised no
    pidfile /var/run/redis_6379.pid
    loglevel notice
    logfile ""
    databases 16
    always-show-logo yes
    save 900 1
    save 300 10
    save 60 10000
    stop-writes-on-bgsave-error yes
    rdbcompression yes
    rdbchecksum yes
    dbfilename dump.rdb
    dir /data
    slave-serve-stale-data yes
    slave-read-only yes
    repl-diskless-sync no
    repl-diskless-sync-delay 5
    repl-disable-tcp-nodelay no
    slave-priority 100
    lazyfree-lazy-eviction no
    lazyfree-lazy-expire no
    lazyfree-lazy-server-del no
    slave-lazy-flush no
    appendonly no
    appendfilename "appendonly.aof"
    appendfsync everysec
    no-appendfsync-on-rewrite no
    auto-aof-rewrite-percentage 100
    auto-aof-rewrite-min-size 64mb
    aof-load-truncated yes
    aof-use-rdb-preamble no
    lua-time-limit 5000
    slowlog-log-slower-than 10000
    slowlog-max-len 128
    latency-monitor-threshold 0
    notify-keyspace-events Ex
    hash-max-ziplist-entries 512
    hash-max-ziplist-value 64
    list-max-ziplist-size -2
    list-compress-depth 0
    set-max-intset-entries 512
    zset-max-ziplist-entries 128
    zset-max-ziplist-value 64
    hll-sparse-max-bytes 3000
    activerehashing yes
    client-output-buffer-limit normal 0 0 0
    client-output-buffer-limit slave 256mb 64mb 60
    client-output-buffer-limit pubsub 32mb 8mb 60
    hz 10
    aof-rewrite-incremental-fsync yes    

执行命令

# kubectl apply -f my-redis-config.yaml

创建 Deployment

创建Deployment 作为调度Pod运行 Redis 的载体。my-redis-deployment.yaml具体如下

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-redis # Unique name for the deployment
  namespace: my-ns
  labels:
    app: my-redis       # Labels to be applied to this deployment
spec:
  selector:
    matchLabels:     # This deployment applies to the Pods matching these labels
      app: my-redis
      role: master
      tier: backend
  replicas: 1        # Run a single pod in the deployment
  template:          # Template for the pods that will be created by this deployment
    metadata:
      labels:        # Labels to be applied to the Pods in this deployment
        app: my-redis
        role: master
        tier: backend
    spec:            # Spec for the container which will be run inside the Pod.
      containers:
        - name: my-redis
          image: redis
          resources:
            requests:
              cpu: 100m
              memory: 100Mi
#          command: ["redis-server","/etc/redis/redis.conf"]
          command:
            - redis-server
            - /etc/redis/redis.conf
          ports:
            - containerPort: 6379
          volumeMounts:
            - name: my-redis-config
              mountPath: /etc/redis/redis.conf
              subPath: redis.conf
            - name: my-redis-storage
              mountPath: /data
      volumes:
        - name: my-redis-storage
          emptyDir: {}
        - name: my-redis-config
          configMap:
            name: my-redis-config
            items:
              - key: redis.conf
                path: redis.conf

执行

# kubectl apply -f my-redis-deployment.yaml

创建 service

NodePort 方式向外暴露服务。my-redis-service.yaml 具体如下

apiVersion: v1
kind: Service        # Type of Kubernetes resource
metadata:
  name: my-redis-svc # Name of the Kubernetes resource
  namespace: my-ns
  labels:            # Labels that will be applied to this resource
    app: my-redis
    role: master
    tier: backend
spec:
  type: NodePort
  ports:
    - port: 6379       # Map incoming connections on port 6379 to the target port 6379 of the Pod
      targetPort: 6379
      nodePort: 30379
  selector:          # Map any Pod with the specified labels to this service
    app: my-redis
    role: master
    tier: backend

执行

# kubectl apply -f my-redis-service.yaml

测试

redis-client 测试NodeId:NodePort

redis-cli -h YourNodeIp -p 30379 -a 123456
YourNodeIp:30379> info
# Server
redis_version:7.0.4
...

连接成功。

参考

场景

在项目开发中,需要用到缓存和对一个列表数据分页查询,但由于redis是key-value的存储方式,我们期望的使用类似postgresql的offset和limit,不至于需要一个个key遍历过去。

设计

分析一下我们需求,那么需求需要实现的接口大概是:

FindByPage(ctx context.Context, page, size int) ([]Object, error)

大致我们需要解决两个问题:

  • 存储对象
  • 列表分页快速查找

对象存储

Redis hash 是一个 string 类型的 field(字段) 和 value(值) 的映射表,hash 特别适合用于存储对象。

  • HDEL key field1 [field2]删除一个或多个哈希表字段
  • HEXISTS key field查看哈希表 key 中,指定的字段是否存在。
  • HGET key field获取存储在哈希表中指定字段的值。
  • HGETALL key获取在哈希表中指定 key 的所有字段和值

如此,我们可以 redis hash 和 json.Encoding 来存储。 获得一个Object

//【ObjectID】= "{"foo": "123", "bar":"456"}"

import  "github.com/go-redis/redis/v8"

result, err := rdb.HGet(ctx, "key", id.String()).Result()
if err != nil {
     return
}
var obj = &YourObject{}
err = json.Unmarshal([]byte(result), obj)
if err != nil {
     return
}

增加一个Object:

ob, err := json.Marshal(Object{}) 
	err = repo.data.rdb.HSet(ctx, "key", ob.ID, ob).Err()
	if err != nil {
		return err
	}

分页

我们的需求中,ObjectID是一个唯一的int,那么可以使用 zset

ZXyy redis 有序集合的基本命令。

  • ZADD key score1 member1 [score2 member2]向有序集合添加一个或多个成员,或者更新已存在成员的分数
  • ZCARD key获取有序集合的成员数
  • ZRANGE key start stop [WITHSCORES]通过索引区间返回有序集合指定区间内的成员

那么可以把 zset 的key 和 value 都设置为 ObjectID 增加一个Object的代码:

if err := rdb.ZAdd(ctx, "key", &redis.Z{
     Score:  float64(Object.ID),
     Member: strconv.Itoa(int(Object.ID)),
}).Err(); err != nil {
     return err
}

获取X分页的代码:

var (
     start = int64((page - 1) * size)
     end   = start + int64(size)
)
result, err := rdb.ZRange(ctx, "key", start, end).Result()
if err != nil {
     return
}

测试

我们增加N个Object:

[1 a]
[2 b]
[3 c]
[11 aa]
[12 bb]
[13 cc]
[21 aaaa]

每页的长度为4,获取第一页

[1 2 3 11]

第二页:

[12 13 21]

第三页:

[ ]

参考

jefffff

Stay hungry. Stay Foolish COOL

Go backend developer

China Amoy