前言

游戏排行榜是一个常见需求,今天主要介绍go语言使用redis-sort-sets来实现排行榜常见功能。

Redis Sort Sets

官方介绍:

Redis排序集是按相关分数排序的惟一字符串(成员)的集合。当多个字符串具有相同的分数时,字符串将按字典顺序排列。排序集的一些用例包括:

游戏排行榜。例如,您可以使用排序集轻松地维护大型在线游戏中最高分数的有序列表。 速率限制器。特别是,您可以使用一个排序集来构建滑动窗口速率限制器,以防止过多的API请求。

常用于排行榜的命令:

  • ZRANGE 返回排序集的成员(升序)
  • ZREVANGE 返回排序集的成员(降序序)
  • ZADD 向排序集添加一个新成员和相关分数。如果成员已经存在,则更新评分。
  • ZREM 删除排序集一个成员
  • ZRANK 返回提供的成员的排名(升序)
  • ZREVRANK 返回提供的成员的排名(降序)

Go Redis

Go Redis为各种风格的Redis提供Go客户端,基础的使用例子如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379", // use default Addr
		Password: "",               // no password set
		DB:       0,                // use default DB
	})
	pong, err := rdb.Ping(ctx).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(pong, err)
	if err != nil {
		fmt.Println(err)
		return
	}
    defer rdb.Close()
}

Examples

这里我们用go-redis实现排行榜常见的功能,包括:

  • 获取排行榜成员和分数 (升序、降序)
  • 加入排行榜成员
  • 删除排行榜成员
  • 更新排行榜成员分数
  • 获取排行榜成员分数
  • 获取排行榜名次成员
  • 清空排行榜

完整代码如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v9"
	"math/rand"
	"strconv"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379", // use default Addr
		Password: "",               // no password set
		DB:       0,                // use default DB
	}) 
	var err error
	defer rdb.Close()
	//增加玩家分数的变化
	for i := 1; i <= 5; i++ {
		err = rdb.ZAdd(ctx, "leaderboard", redis.Z{
			Score:  float64(rand.Intn(100)),
			Member: "user:" + strconv.Itoa(i),
		}).Err()
		if err != nil {
			fmt.Println(err)
			return
		}
	}
	if err != nil {
		fmt.Println(err)
		return
	}
	//查询排行榜成员-升序
	members, err := rdb.ZRange(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜成员-升序", members)
	//查询排行榜成员和分数-升序
	membersWithScore, err := rdb.ZRevRangeWithScores(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜成员和分数-升序", membersWithScore)

	//查询排行榜成员-降序
	membersRev, err := rdb.ZRevRange(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜成员-降序", membersRev)
	//查询排行榜成员和分数-升序
	membersRevWithScore, err := rdb.ZRangeWithScores(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜成员和分数-升序", membersRevWithScore)

	//获取某玩家的分数
	user2Score, err := rdb.ZScore(ctx, "leaderboard", "user:2").Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("获取某玩家 user:2 的分数", user2Score)
	//更新某玩家的分数
	err = rdb.ZAdd(ctx, "leaderboard", redis.Z{
		Score:  user2Score,
		Member: "user:1",
	}).Err()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("更新某玩家 user:1 的分数", user2Score)
	//查询排行榜成员和分数-升序
	membersRevWithScore, err = rdb.ZRangeWithScores(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜变化后的排行榜成员和分数-升序", membersRevWithScore)

	//查询排行榜某玩家的分数-升序
	memberRank, err := rdb.ZRank(ctx, "leaderboard", "user:2").Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜某玩家  user:2 的排名", memberRank)

	//查询排行榜某玩家的分数-降序
	memberRevRank, err := rdb.ZRevRank(ctx, "leaderboard", "user:2").Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("查询排行榜某玩家 user:2 的排名-降序", memberRevRank)

	//删除排序集一个成员
	zrem, err := rdb.ZRem(ctx, "leaderboard", "user:1").Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("删除排序集一个成员", zrem)

	//删除后查询排行榜成员-升序
	members, err = rdb.ZRange(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("删除后查询排行榜成员-升序", members)

	//清空排行榜
	clear, err := rdb.ZRemRangeByRank(ctx, "leaderboard", 0, 4).Result()
	if err != nil {
		fmt.Println("err zrem", err)
		return
	}
	fmt.Println("清空排行榜", clear)
	//清空后查询排行榜成员-升序
	members, err = rdb.ZRange(ctx, "leaderboard", 0, 5).Result()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("清空后查询排行榜成员-升序", members) 

OutPut:

查询排行榜成员-升序 [user:3 user:4 user:1 user:5 user:2]
查询排行榜成员和分数-升序 [{87 user:2} {81 user:5} {81 user:1} {59 user:4} {47 user:3}]
查询排行榜成员-降序 [user:2 user:5 user:1 user:4 user:3]
查询排行榜成员和分数-升序 [{47 user:3} {59 user:4} {81 user:1} {81 user:5} {87 user:2}]
获取某玩家 user:2 的分数 87
更新某玩家 user:1 的分数 87
查询排行榜变化后的排行榜成员和分数-升序 [{47 user:3} {59 user:4} {81 user:5} {87 user:1} {87 user:2}]
查询排行榜某玩家  user:2 的排名 4
查询排行榜某玩家 user:2 的排名-降序 0
删除排序集一个成员 1
删除后查询排行榜成员-升序 [user:3 user:4 user:5 user:2]
清空排行榜 4
清空后查询排行榜成员-升序 []

参考

go语言提供非常方便的创建轻量级的协程goroutine来并发处理任务,但是 协程过多影响程序性能,所以,这时goroutine池就登场了。本文简要介绍ants goroutine 池的基本的使用方法。

简介

ants是一个高性能的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。 ants就是一个很多大厂广泛使用的goroute池。

官网地址

https://github.com/panjf2000/ants

功能

  • 自动调度海量的 goroutines,复用 goroutines
  • 定期清理过期的 goroutines,进一步节省资源
  • 提供了大量有用的接口:任务提交、获取运行中的 goroutine 数量、动态调整 Pool 大小、释放 Pool、重启 Pool
  • 优雅处理 panic,防止程序崩溃
  • 资源复用,极大节省内存使用量;在大规模批量并发任务场景下比原生 goroutine 并发具有更高的性能
  • 非阻塞机制

quick start

使用 ants v1 版本:

go get -u github.com/panjf2000/ants

使用 ants v2 版本 (开启 GO111MODULE=on):

go get -u github.com/panjf2000/ants/v2

接下来看一下官方demo:实现一个计算大量整数和的程序

NewPool

NewPool生成ants池实例。

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Hello World!")
}

func main() {
	defer ants.Release()

	runTimes := 1000

	//使用公共池。
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", ants.Running())
    fmt.Printf("finish all tasks.\n")
 }

其中:

生成一个具有特定函数的ants池实例

  • NewPool(size int, options …Option) (*PoolWithFunc, error)

args.size 即池容量,即池中最多有 10 个 goroutine。 arg.Option 定制化 goroutine pool.

  • p.Submit向此池提交任务。
  • ants.Release关闭此池并释放工作队列。
  • defaultAntsPool 导入ants时初始化实例池。

NewPoolWithFunc

package main

import (
	"fmt"
	"github.com/panjf2000/ants/v2"
	"sync"
	"sync/atomic"
)

var sum int32

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("run with %d\n", n)
}

func main() {
	defer ants.Release()
	runTimes := 1000
	var wg sync.WaitGroup
	//使用带有函数的池
//设置goroutine池的容量为10,过期时间为1秒。
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	//逐个提交任务。
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", p.Running())
	fmt.Printf("finish all tasks, result is %d\n", sum)
}

OutPut:

run with 1
run with 5
run with 11
run with 12
run with 13
run with 14
run with 7
...
run with 789
run with 809
running goroutines: 10
finish all tasks, result is 499500

其中:

生成一个具有特定函数的ants池实例

  • NewPoolWithFunc(size int, pf func(interface{}), options …Option) (*PoolWithFunc, error)

args.pf 即为执行任务的函数

优雅处理 panic

测试一些当任务触发panic的情况

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	if n%2 == 0 {
		panic(any(fmt.Sprintf("panic from task:%d", n)))
	}
	fmt.Printf("run with %d\n", n)
}

output:

run with 3
run with 13
...
run with 999
2022/10/21 21:41:05 worker with func exits from a panic: panic from task:6
2022/10/21 21:41:05 worker with func exits from a panic: panic from task:0
2022/10/21 21:41:07 worker with func exits from panic: goroutine 14 [running]:

可以看到,main routine 没有因此受影响。

Options

// Options包含实例化ants池时将应用的所有选项。
type Options struct {
	// ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers,
	// the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been
	// used for more than `ExpiryDuration`.
	ExpiryDuration time.Duration

	// PreAlloc indicates whether to make memory pre-allocation when initializing Pool.
	PreAlloc bool

	// Max number of goroutine blocking on pool.Submit.
	// 0 (default value) means no such limit.
	MaxBlockingTasks int

	// When Nonblocking is true, Pool.Submit will never be blocked.
	// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
	// When Nonblocking is true, MaxBlockingTasks is inoperative.
	Nonblocking bool

	// PanicHandler is used to handle panics from each worker goroutine.
	// if nil, panics will be thrown out again from worker goroutines.
	PanicHandler func(interface{})

	// Logger is the customized logger for logging info, if it is not set,
	// default standard logger from log package is used.
	Logger Logger
}

比如 PanicHandler 遇到 panic会调用这里设置的处理函数,以上例中,我们遇到偶数会触发panic,修改NewPool函数

p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	}, ants.WithPanicHandler(func(i interface{}) {
		fmt.Printf("panic recover %v", i)
	}))

out:

panic recover panic from i:992run with 880
panic recover panic from i:880run with 972
panic recover panic from i:972run with 994
run with 991

还有更多关于 Benchmarks 性能报告,可参考https://github.com/panjf2000/ants

参考:

前后端数据常用传输格式有:json、xml 和 proto等,不管是mvc还是ddd,都会在表现层的对不同的格式进行转化下层依赖所需的类、Object、 aggregate等。

Kratos 表现层

在实际开发场景中,前后端传输采用json。 但kratos使用proto定义Api, 那么我们以一个Http请求为例,来看看kratos的表现层是如何处理的。

以下是一个Kratos-Example,由编写好的proto文件,proto-go 等自动生成表现层的代码。 大概步骤:

  1. 编写proto,包括Service,Req,Reply
  2. 生成表现层代码
  3. 实现下层逻辑等

Example完整代码如下

func _Greeter_SayHello0_HTTP_Handler(srv GreeterHTTPServer) func(ctx http.Context) error {
	return func(ctx http.Context) error {
		var in HelloRequest
		if err := ctx.BindQuery(&in); err != nil {
			return err
		}
		if err := ctx.BindVars(&in); err != nil {
			return err
		}
		http.SetOperation(ctx, "/helloworld.v1.Greeter/SayHello")
		h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.SayHello(ctx, req.(*HelloRequest))
		})
		out, err := h(ctx, &in)
		if err != nil {
			return err
		}
		reply := out.(*HelloReply)
		return ctx.Result(200, reply)
	}
}

可以看到:

  1. 这里对具体的格式无感,输入In还是输出Out都是一个proto生成的类。
  2. 具体的实现交给了ctx 上下文去实现 查看 ctx.Result的源码:
func (c *wrapper) Result(code int, v interface{}) error {
	c.w.WriteHeader(code)
	return c.router.srv.enc(&c.w, c.req, v)
}

这里的enc,没有特殊设置,使用默认的DefaultResponseEncoder,部分源码如下

func DefaultResponseEncoder(w http.ResponseWriter, r *http.Request, v interface{}) error {
	...
	codec, _ := CodecForRequest(r, "Accept")
	data, err := codec.Marshal(v)
	if err != nil {
		return err
	}
	w.Header().Set("Content-Type", httputil.ContentType(codec.Name()))
	_, err = w.Write(data)
    ...
}

通过codc接口隔离具体实现,调用接口的 codec.Marshal序列化。

codec包

Codec定义Transport用于编码和解码消息的接口。它的实现包括 json,xml,proto,yaml等

type Codec interface {
	// Marshal returns the wire format of v.
	Marshal(v interface{}) ([]byte, error)
	// Unmarshal parses the wire format into v.
	Unmarshal(data []byte, v interface{}) error
	// Name returns the name of the Codec implementation. The returned string
	// will be used as part of content type in transmission.  The result must be
	// static; the result cannot change between calls.
	Name() string
}

这里我们聚焦正在使用的Json

type codec struct{}
func (codec) Marshal(v interface{}) ([]byte, error) {
	switch m := v.(type) {
	case json.Marshaler:
		return m.MarshalJSON()
	case proto.Message:
		return MarshalOptions.Marshal(m)
	default:
		return json.Marshal(m)
	}
}

这里Marshal()使用第三方库““google.golang.org/protobuf/encoding/protojson”

实现proto转json。

protojson第三方包

protojson是Google提供的proto和json的转化

package api_test

import (
	"google.golang.org/protobuf/encoding/protojson"
	"testing"

	v1 "helloworld/api/helloworld/v1"
)

func TestToJson(t *testing.T) {
	reply := &v1.HelloReply{
		Message: "Jobs",
	}
	replyJson, err := MarshalOptions.Marshal(reply.ProtoReflect().Interface())
	if err != nil {
		t.Errorf("Marshal Error: %v", err)
	}
	t.Logf("replyJson:  %v", string(replyJson))
}

更多的接口请参考“https://google.golang.org/protobuf/encoding/protojson”

小结

本文主要以kratos的表现层的组织方式,来介绍常用的框架表现层处理方式。 其中:

  • kratos通过proto来生成表现层代码的类,包括http和grapc,内部对具体实现无感。
  • kratos通过解码器codec实现不同传输格式的解耦。
  • 第三方包protojson实现proto和json转换。

框架的主要功能之一就是标准化处理一下公共的问题场景,从源码中可以学习:通过接口隔离具体的实现,而使框架与具体实现解耦,且具备更好的拓展性。

参考

本文主要介绍go如何读取和写入csv文件,以及使用第三方库gocsv转换为Struct。

读取csv

go 标准库 “encoding/csv” 用于读写csv文件。

Reader从csv编码的文件中读取记录

  • type Reader
    • func NewReader(r io.Reader) *Reader
    • func (r *Reader) Read() (record []string, err error) 。
    • func (r *Reader) ReadAll() (records [][]string, err error)

创建一个csv文件servers.csv,具体如下

world-svc,1/1,Running,0,44m
battle-svc,1/1,Running,0,7d

ReadAll从r中读取所有剩余的记录,每个记录都是字段的切片,成功的调用返回值err为nil而不是EOF。因为ReadAll方法定义为读取直到文件结尾,因此它不会将文件结尾视为应该报告的错误。

package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"os"
)
func main() {
	readeCsvAll() 
}
func readerCsvReadAll() {
	file, err := os.Open("servers.csv")
	defer file.Close()
	if err != nil {
		fmt.Println(err)
	}
	reader := csv.NewReader(file)
	servers, _ := reader.ReadAll()
	fmt.Println("ReadAll:", servers)
}

OutPut:

ReadAll:  [world-svc 1/1 Running 0 44m] [battle-svc 1/1 Running 0 7d]]

Read从r读取一条记录,返回值record是字符串的切片,每个字符串代表一个字段。

reader := csv.NewReader(file)
for {
    servers, err := reader.Read()
    if err == io.EOF {
        break
    } else if err != nil {
        fmt.Println("Error:", err)
        return
    }
    fmt.Println(servers)
}

输出和ReadAll是一样的。

csv读写逗号分隔值(csv)的文件,亦或可以定义数据的分隔符

reader.Comma = '|' 

亦或可以定义忽略的的行

reader.Comment = '#'

写入csv

Writer类型的值将记录写入一个csv编码的文件

  • type Writer
    • func NewWriter(w io.Writer) *Writer
    • func (w *Writer) Write(record []string) (err error)
    • func (w *Writer) WriteAll(records [][]string) (err error)
    • func (w *Writer) Flush()
    • func (w *Writer) Error() error

向w中写入一条记录,会自行添加必需的引号。记录是字符串切片,每个字符串代表一个字段。

func writerOne() {
	servers := []Server{
		{"world-svc", "1/1", "Running", 0, "44m"},
		{"battle-svc", "1/1", "Running", 0, "7d"},
	}
	file, err := os.Create("serversA.csv")
	defer file.Close()
	if err != nil {
		log.Fatalln("failed to open file", err)
	}
	w := csv.NewWriter(file)
   //将缓存中的数据写入底层的io.Writer。
	defer w.Flush()
	// 使用 Write
	for _, server := range servers {
		row := []string{server.Name, server.Ready, server.State, strconv.Itoa(server.Restarts), server.Age}
		if err := w.Write(row); err != nil {
			log.Fatalln("error writing server to file", err)
		}
	}

执行,在当前目录生成 serversA.csv,内容如下:

world-svc,1/1,Running,0,44m
battle-svc,1/1,Running,0,7d

WriteAll方法使用Write方法向w写入多条记录,并在最后调用Flush方法清空缓存

	var data [][]string
	for _, server := range servers {
		row := []string{server.Name, server.Ready, server.State, strconv.Itoa(server.Restarts), server.Age}
		data = append(data, row)
	}
	w.WriteAll(data)

To Struct

GoCSV包旨在提供 CSV 和 Go (golang) 值之间的快速和惯用的映射。 已有servers.csv 内容如下

Name,Ready,Status,Restart,Age
world-svc,1/1,Running,0,44m
battle-svc,1/1,Running,0,7d

UnmarshalFile(in *os.File, out interface{}) error UnmarshalFile从接口中的文件解析CSV。

func ToStruct() {
	clientsFile, err := os.OpenFile("servers.csv", os.O_RDWR|os.O_CREATE, os.ModePerm)
	if err != nil {
		panic(err)
	}
	defer clientsFile.Close()

	var servers []*Server
    //转换成 Server Object
	if err := gocsv.UnmarshalFile(clientsFile, &servers); err != nil {  
		panic(err)
	}
	for _, server := range servers {
		fmt.Println("Server", server.Name)
	}
}

OuPut:

Server world-svc
Server battle-svc

更多api使用方式可参考:https ://github.com/gocarina/gocsv

参考

原子操作

原子操作即执行过程不能被中断的操作。在针对某个值的原子操作执行过程当值,cpu绝不会再去执行其他针对该值的操作,无论这些其他操作是否为原子操作。

go-atomic

查看Mutex、RWMutex的源码,底层是通过atomic包中的一些原子操作来实现。

Go标准库 sync/atomic 提供了对基础类型 int32、int64、uint32、uint64、uintptr、Pointer(Add 方法不支持) 的原子级内存操作。其中包括:

  • Add (给第一个参数地址中的值增加一个 delta 值)
  • CompareAndSwap(判断相等即替换))
  • Swap(不需要比较旧值,直接替换,返回旧值)
  • Load(方法会取出 addr 地址中的值)
  • Store( 方法会把一个值存入到指定的 addr 地址中)

我们通过一个例子可以快速了解atomic封装的这些api

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var x int32 = 0

	//func AddInt32(addr *int32, delta int32) (new int32)
	y := atomic.AddInt32(&x, int32(1))
	fmt.Printf("x=%d,y=%d \n", x, y)
	//OutPut: x=1,y=1

	// func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
	// addr 地址里的值是不是 old,如果不等于 old,就返回 false;如果等于 old,就把此地址的值替换成 new 值,返回 true
	isCompare := atomic.CompareAndSwapInt32(&x, int32(0), int32(2))
	fmt.Printf("isCompare=%v,x=%d  \n", isCompare, x)
	//OutPut: isCompare=false,x=1
	//不相等,故x还是1
	isCompare2 := atomic.CompareAndSwapInt32(&x, int32(1), int32(2))
	fmt.Printf("isCompare=%v,x=%d  \n", isCompare2, x)
	//OutPut: isCompare=true,x=2
	//相等,故x还是2

	//func SwapInt32(addr *int32, new int32) (old int32)
	xOld := atomic.SwapInt32(&x, int32(3))
	fmt.Printf("xOld=%d,x=%d  \n", xOld, x)
	//OutPut: xOld=2,x=3
	//不比较,故x替换3

	//func LoadInt32(addr *int32) (val int32)
	vValue := atomic.LoadInt32(&x)
	fmt.Printf("vValue=%d \n", vValue)
	//OutPut: xOld=2,x=3
	//获取x的值3

	//func StoreInt32(addr *int32, val int32)
	atomic.StoreInt32(&x, 8)
	vValue2 := atomic.LoadInt32(&x)
	fmt.Printf("vValue2=%d \n", vValue2)
	//OutPut:vValue2=8 
}

小试牛刀

atomic 比较常见的类型 还提供了一个特殊的类型:Value,但是只支持 load、store。 这里模拟一个场景:当配置变更后,期待其他goroutine可以收到通知和变更。

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Config struct {
	Network      string
	Addr         string
	ReadTimeout  int32
	WriteTimeout int32
}

func loadConfig() Config {
	return Config{
		Network:      "redis",
		Addr:         "127.0.0.1:6379",
		ReadTimeout:  60,
		WriteTimeout: 60,
	}
}

var (
	done   bool
	config atomic.Value
)

func main() {
	config.Store(loadConfig())
	var cond = sync.NewCond(&sync.Mutex{})
	go waitForLoad(1, cond)
	go waitForLoad(2, cond)
	go beginLoad(cond)
	select {}
}

func beginLoad(cond *sync.Cond) {
	for {
		time.Sleep(3 * time.Second)
		config.Store(loadConfig())
		cond.Broadcast()
	}
}

func waitForLoad(node int, cond *sync.Cond) {
	cond.L.Lock()
	for {
		if !done {
			cond.Wait()
		}
		c := config.Load().(Config)
		fmt.Printf("node: %d - redis config: %+v\n", node, c)

	}
	cond.L.Unlock()
}

OutPut:

node: 2 - redis config: {Network:redis Addr:127.0.0.1:6379 ReadTimeout:60 WriteTimeout:60}
node: 1 - redis config: {Network:redis Addr:127.0.0.1:6379 ReadTimeout:60 WriteTimeout:60}

uber-go/atomic

uber-go/atomic对标准库进行进一步封装,采用面向对象的使用方式。 这些类型包括 Bool、Duration、Error、Float64、Int32、Int64、String、Uint32、Uint64 等 举个例子uint32的减法你可能是这么写的

atomic.AddUint32(&x, ^(delta - 1))

利用计算机补码的规则,把减法变成加法 。uber-go对它进行了封装

var atom atomic.Uint32
atom.Store(10)
atom.Sub(2)
atom.CAS(20, 1)

小结

本文简要介绍原子操作和go-atomic,使用场景,以及第三库uber-go/atomic。

参考

本文简要介绍golang 使用crontab实现定时任务。

linux crontab

Linux crontab 是用来定期执行程序的命令。当安装完成操作系统之后,默认便会启动此任务调度命令

命令语法:

crontab [ -u user ] file

crontab [ -u user ] { -l | -r | -e }

*    *    *    *    *
-    -    -    -    -
|    |    |    |    |
|    |    |    |    +----- 星期中星期几 (0 - 6) (星期天 为0)
|    |    |    +---------- 月份 (1 - 12) 
|    |    +--------------- 一个月中的第几天 (1 - 31)
|    +-------------------- 小时 (0 - 23)
+------------------------- 分钟 (0 - 59)

go cron

第三方包“github.com/robfig/cron”来创建 crontab,以实现定时任务

package main

import (
	"fmt"
	"github.com/robfig/cron"
)

func main() {
	var (
		cronS = cron.New()
		spec  = "*/2 * * * * "
		count = 0
	)

	entityID, err := cronS.AddFunc(spec, func() {
		count++
		fmt.Println("count: ", count,"now:", time.Now().Unix())
	})
	if err != nil {
		fmt.Errorf("error : %v", err)
		return
	}
	cronS.Start()

	fmt.Println(entityID)
	defer cronS.Stop()
	select {}
}

go run main.go

输出:

count:  1 now: 1658053860
count:  2 now: 1658053920
count:  3 now: 1658053980
count:  4 now: 1658054040
count:  5 now: 1658054100

可以看到每隔1分钟,执行一次Func,count++

默认情况下标准 cron 规范解析(第一个字段是“分钟”) 可以轻松选择进入秒字段。

cronS = cron.New(cron.WithSeconds())
//注意这里多了一个参数
spec  = "*/2 * * * * * "

执行输出

count:  1 now: 1658053640
count:  2 now: 1658053642
count:  3 now: 1658053644
count:  4 now: 1658053646
count:  5 now: 1658053648 

可以看到每隔两秒执行一次

上一篇(传送门)介绍了测试平台 locust + boomer 的环境搭建,以及运行http压测用例,观测性能指数、图表。这篇接上篇,继续讲go boomer如何实现。

setup

Install the master branch

$ go get github.com/myzhan/boomer

Install a tagged version that works with locust 1.6.0

$ go get github.com/myzhan/boomer@v1.6.0

install gomq

$ go get -u github.com/zeromq/gomq

quick start

run master

创建python文件 workspace/dummy.py

from locust import Locust, TaskSet, task
class MyTaskSet(TaskSet):
    @task(20)
    def hello(self):
        pass
class Dummy(Locust):
    task_set = MyTaskSet

运行:

$ locust –master -f dummy.py output:

$locust.main: Starting web interface at http://0.0.0.0:8089 (accepting connections from all network interfaces)
$locust.main: Starting Locust 2.9.1.dev23
run slave

创建go文件 workspace/main.go

package main

import(
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
	"github.com/myzhan/boomer"
)

func helloTask() {
	start := time.Now()
	err := HttpGet("hello")
	elapsed := time.Since(start)
	if err != nil {
		boomer.RecordFailure("http", "world", elapsed.Nanoseconds()/int64(time.Millisecond), err.Error())
		return
	}
    
/*    Report your test result as a success, if you write it in locust, it will looks like this    events.request_success.fire(request_type="http", name="world", response_time=100, response_length=10)    */
	boomer.RecordSuccess("http", "world", elapsed.Nanoseconds()/int64(time.Millisecond), int64(10))
}

func worldTask() {
	start := time.Now()
	err := HttpGet("world")
	elapsed := time.Since(start)
	if err != nil {
		boomer.RecordFailure("udp", "world", elapsed.Nanoseconds()/int64(time.Millisecond), err.Error())
		return
	} 
/*  Report your test result as a failure, if you write it in locust, it will looks like this    events.request_failure.fire(request_type="udp", name="hello", response_time=100, exception=Exception("udp error"))    */
	boomer.RecordSuccess("udp", "world", elapsed.Nanoseconds()/int64(time.Millisecond), int64(10))
}

func main() {
	task1 := &boomer.Task{
		// 同时跑多个 tasks 的时候,Weight 字段用于分配 goroutines
		Weight: 10,
		Fn:     helloTask,
	}

	task2 := &boomer.Task{
		Weight: 10,
		Fn:     worldTask,
	}

	// 连接到 master,等待页面上下发指令,支持多个 Task
	boomer.Run(task1, task2)
}


func HttpGet(path string) error {
	url := fmt.Sprintf("http://localhost:8090/%s", path)
	method := "GET"

	client := &http.Client{}
	req, err := http.NewRequest(method, url, nil)

	if err != nil {
		fmt.Println(err)
		return err
	}
	res, err := client.Do(req)
	if err != nil {
		fmt.Println(err)
		return err
	}
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		fmt.Println(err)
		return err
	}
	fmt.Println(string(body))
	return nil
}


go run main.go

output

$ Boomer is built with gomq support.
$ Boomer is connected to master(tcp://127.0.0.1:5557) press Ctrl+c to quit.

说明启动slave成功,查看是否连接上master

$ locust.runners: Client 'crazyMac.local_axxbyy123456' reported as ready. Currently 1 clients ready to swarm.

说明已经连接上master 。

testing

启动测试,output

succeed

小结

本文主要介绍了如何利用go boomer 实现locust的通讯协议,以及使用boomer实现一个上一篇的http压测例子。

reference

最近公司打算对后端服务进行压力测试,考虑后端的主要使用golang实现,因此作者准备使用 locust + boomer 实现一个性能测试平台,mark一下实现过程。

what is locust

Locust 是一种易于使用、可编写脚本且可扩展的性能测试工具。 您可以在常规 Python 代码中定义用户的行为,而不是停留在 UI 或限制性特定领域的语言中。

what is boomer

boomer 完整地实现了 locust 的通讯协议,运行在 slave 模式下,用 goroutine 来执行用户提供的测试函数,然后将测试结果上报给运行在 master 模式下的 locust。

与 locust 原生的实现相比,解决了两个问题。 一是单台施压机上,能充分利用多个 CPU 核心来施压, 二是再也不用提防阻塞 IO 操作导致 gevent 阻塞。

环境

  • 服务器
    • Ubuntu (2核4G300G)
  • 压测机
    • Mac
    • Python 版本 Python 3.10.2
    • Go 版本 go version go1.17.1 darwin/arm64

压测机

安装 locust

  1. 安装python3.7或者版本大于3.7 (mac 自带python2.X版本)
brew install python

查看安装版本

# python3 -V
Python 3.10.2
  1. Install Locust
# pip3 install locust
  1. 检查安装是否成功
# locust -V
locust 2.9.1.dev23

运行 locust: hello-world

要把大象放冰箱一共分三步:第一步打开冰箱–,不不不,第一步:先试试把小象(hello-world)看看能不能放的进去

在当前目录 workspace/ 底下创建 locustfile.py

from locust import HttpUser, task
class HelloWorldUser(HttpUser):
    @task
    def hello_world(self):
        self.client.get("/hello")
        self.client.get("/world")
启动 locust
 # locust
 locust 
$: Starting web interface at http://0.0.0.0:8089 (accepting connections from all network interfaces)
$: Starting Locust 2.9.1.dev23

访问 http://localhost:8089/ 可以看到

接着,这边使用golang启动一个http服务 localhost:80(path:/hello & /world)

locust - HelloWorld

进行一个简单测试 50 个并发用户,加速速度为 1个用户/秒,将其指向响应/hello和的服务器/world

点击 “start swarming”

切换标签页 “Charts” 可以查看:显示每秒请求数 (RPS)

查看:响应时间(以毫秒为单位)

查看: 用户数量

小结

本文主要介绍性能测试平台 locust + boomer 的环境搭建,以及运行http 测试用例helloworld,使用locust观测性能指数、图表等。

参考

场景

在项目开发中,需要用到缓存和对一个列表数据分页查询,但由于redis是key-value的存储方式,我们期望的使用类似postgresql的offset和limit,不至于需要一个个key遍历过去。

设计

分析一下我们需求,那么需求需要实现的接口大概是:

FindByPage(ctx context.Context, page, size int) ([]Object, error)

大致我们需要解决两个问题:

  • 存储对象
  • 列表分页快速查找

对象存储

Redis hash 是一个 string 类型的 field(字段) 和 value(值) 的映射表,hash 特别适合用于存储对象。

  • HDEL key field1 [field2]删除一个或多个哈希表字段
  • HEXISTS key field查看哈希表 key 中,指定的字段是否存在。
  • HGET key field获取存储在哈希表中指定字段的值。
  • HGETALL key获取在哈希表中指定 key 的所有字段和值

如此,我们可以 redis hash 和 json.Encoding 来存储。 获得一个Object

//【ObjectID】= "{"foo": "123", "bar":"456"}"

import  "github.com/go-redis/redis/v8"

result, err := rdb.HGet(ctx, "key", id.String()).Result()
if err != nil {
     return
}
var obj = &YourObject{}
err = json.Unmarshal([]byte(result), obj)
if err != nil {
     return
}

增加一个Object:

ob, err := json.Marshal(Object{}) 
	err = repo.data.rdb.HSet(ctx, "key", ob.ID, ob).Err()
	if err != nil {
		return err
	}

分页

我们的需求中,ObjectID是一个唯一的int,那么可以使用 zset

ZXyy redis 有序集合的基本命令。

  • ZADD key score1 member1 [score2 member2]向有序集合添加一个或多个成员,或者更新已存在成员的分数
  • ZCARD key获取有序集合的成员数
  • ZRANGE key start stop [WITHSCORES]通过索引区间返回有序集合指定区间内的成员

那么可以把 zset 的key 和 value 都设置为 ObjectID 增加一个Object的代码:

if err := rdb.ZAdd(ctx, "key", &redis.Z{
     Score:  float64(Object.ID),
     Member: strconv.Itoa(int(Object.ID)),
}).Err(); err != nil {
     return err
}

获取X分页的代码:

var (
     start = int64((page - 1) * size)
     end   = start + int64(size)
)
result, err := rdb.ZRange(ctx, "key", start, end).Result()
if err != nil {
     return
}

测试

我们增加N个Object:

[1 a]
[2 b]
[3 c]
[11 aa]
[12 bb]
[13 cc]
[21 aaaa]

每页的长度为4,获取第一页

[1 2 3 11]

第二页:

[12 13 21]

第三页:

[ ]

参考

本文主要记录 Android app 第三方(google和meta/facebook)登入的后端(go)验证。

android sdk 接入流程

可参考 https://juejin.cn/post/7094889100389384228

google

官网链接: (https://developers.google.com/identity/sign-in/web/backend-auth) 官网推荐两中验证方式 :

  1. 使用谷歌API客户端库,包括Java、Node.js、PHP、Python,是在生产环境中验证谷歌ID令牌的推荐方法。go客户端库:https://github.com/googleapis/google-api-go-client)
  2. 调用谷歌API(https://oauth2.googleapis.com/tokeninfo?id_token=XYZ123)

这里采用第一种,参考代码:

package main

import (
	"context"
	"fmt" 
	"google.golang.org/api/oauth2/v2"
	"google.golang.org/api/option"
	"net/http"
)

func main() {
   // 从客户端获取的谷歌token
	googleToken := `user token from client`
	oatuService, err := oauth2.NewService(context.Background(), option.WithHTTPClient(http.DefaultClient))
	if err != nil { 
		fmt.Println(err)
	}
	tokenInfoCall := oatuService.Tokeninfo()
	tokenInfoCall.IdToken(googleToken)
	tokenInfo, err := tokenInfoCall.Do() 
	if err != nil { 
		fmt.Println(err)
	}
	fmt.Println(tokenInfo)
	fmt.Printf("%v", tokenInfo)
    //可以拿客户端发送的id,和 tokenInfo.Id 做校验
}

meta(facebook) 登入

meta 的验证和 google 类型,调用验证api

官网链接:

  1. Android版本Facebook快速入门:https://developers.facebook.com/docs/facebook-login/android
  2. 后端验证 : (https://developers.facebook.com/docs/facebook-login/guides/%20access-tokens/get-session-info)

请求格式:

GET /debug_token?
  input_token={session-info-token}&
  access_token={your-access-token}

go-api : https://developers.facebook.com/docs/facebook-login/guides/%20access-tokens/get-session-info

package main

import (
	"fmt"
    "net/http"
	fb "github.com/huandu/facebook/v2"
)

func main() {
   //客户端传递过来的 token
	inputToken := "token from client"
	globalApp := fb.New("developer-app-client-id", "develop-app-client-secret")
    //生成 access_token
	token := globalApp.AppAccessToken()
	session := globalApp.Session(token)
	resp, err := session.Get("debug_token", fb.Params{
		"input_token": inputToken,
	})
	if err != nil {
		return
	}
}

参考

jefffff

Stay hungry. Stay Foolish COOL

Go backend developer

China Amoy