本文主要记录kratos项目配置的定义、读取和源码的学习

at first

服务在启动的基本都会用到配置文件,那么如果是你来写config工具库,它所能提供的功能是什么?

  1. 读取不同场景的配置
  2. 读取不同格式的配置
  3. 支持热更
  4. 支持拓展,提供自定义实现

kratos/config 有那些功能

  1. kratos/config包支持多种配置源,包括
    • 本地文件
    • 本地环境
    • contrib/config支持配置中心
  2. kratos/config 支持多种配置格式
    • json
    • proto
    • xml
    • yaml
  3. 支持热更
  4. 支持其它格式的配置文件

定义配置

在项目目录下创建文件: workspace/configs/config.yaml

service:
  name: config
  version: v1.0.0
http:
  server:
    address: 0.0.0.0:8000
    timeout: 1s
grpc:
  server:
    address: 0.0.0.0:9000
    timeout: 1s

加载配置文件

WithSource

使用withsoure指定数据源为本地文件

package main

import (
	"flag"
	"log"

	"github.com/go-kratos/kratos/v2/config"
	"github.com/go-kratos/kratos/v2/config/file"
)

var flagconf string

func init() {
	flag.StringVar(&flagconf, "conf", "config.yaml", "config path, eg: -conf config.yaml")
}

func main() {
	flag.Parse()
	c := config.New(
		config.WithSource(
			file.NewSource(flagconf),
		),
	)
	if err := c.Load(); err != nil {
		panic(err)
	}

	//定义配置JSON字段
	var v struct {
		Service struct {
			Name    string `json:"name"`
			Version string `json:"version"`
		} `json:"service"`
	}

	//将配置Unmarshal 到struct  
	if err := c.Scan(&v); err != nil {
		panic(err)
	}
	log.Printf("config: %+v", v)

	//获取与该键关联的值
	name, err := c.Value("service.name").String()
	if err != nil {
		panic(err)
	}
	log.Printf("service: %s", name)

	// 热更执行钩子
	if err := c.Watch("service.name", func(key string, value config.Value) {
		log.Printf("config changed: %s = %v\n", key, value)
	}); err != nil {
		panic(err)
	}

	<-make(chan struct{})
}

output:

$  config: {Service:{Name:config Version:v1.0.0}}
$  service: config

WithDecoder

Decoder用于将配置文件内容用特定的反序列化方法解析出来

c := config.New(
		config.WithSource(
			file.NewSource(flagconf),
		),
		config.WithDecoder(func(src *config.KeyValue, target map[string]interface{}) error {
			if src.Format == "" {
				// expand key "aaa.bbb" into map[aaa]map[bbb]interface{}
				keys := strings.Split(src.Key, ".")
				for i, k := range keys {
					if i == len(keys)-1 {
						target[k] = src.Value
					} else {
						sub := make(map[string]interface{})
						target[k] = sub
						target = sub
					}
				}
				return nil
			}
			if codec := encoding.GetCodec(src.Format); codec != nil {
				return codec.Unmarshal(src.Value, &target)
			}
			return fmt.Errorf("unsupported key: %s format: %s", src.Key, src.Format)
		}),
	)

output 和上例一致

原理

// Config is a config interface.
type Config interface {
	Load() error
	Scan(v interface{}) error
	Value(key string) Value
	Watch(key string, o Observer) error
	Close() error
}
  1. 指定数据源的这样会将整个目录中的所有文件进行解析加载,合并到同一个map中
    c := config.New(
            config.WithSource(
                file.NewSource(flagconf),
            ),
        )
        if err := c.Load(); err != nil {
            panic(err)
         }
  1. 使用之前创建好的config实例,调用.Scan方法,读取配置文件的内容到结构体中,这种方式适用于完整获取整个配置文件的内容
    if err := c.Scan(&v); err != nil {
      panic(err)
    }
    fmt.Printf("config: %+v", v)
  1. New的时候会有一个默认的 decoder
func New(opts ...Option) Config {
	o := options{
		logger:   log.DefaultLogger,
		decoder:  defaultDecoder,
		resolver: defaultResolver,
	}
	for _, opt := range opts {
		opt(&o)
	}
	return &config{
		opts:   o,
		reader: newReader(o),
		log:    log.NewHelper(o.logger),
	}
}

通过option可以拓展自定义 decoder

func WithDecoder(d Decoder) Option {
	return func(o *options) {
		o.decoder = d
	}
}

参考

  1. https://github.com/go-kratos/kratos/v2/config
  2. https://github.com/go-kratos/kratos/tree/main/contrib/config
  3. https://go-kratos.dev/docs/component/config

本文主要介绍 在go开发中 errors 的处理和第三方库 github.com/pkg/errors 的使用。

error interface

官方定义:

// The error built-in interface type is the conventional interface for
// representing an error condition, with the nil value representing no error.
type error interface {
	Error() string
}

常用的声明方式

//方式一
err1 := fmt.Errorf("io.EOF")
//方式二
err2 := errors.New("io.EOF")
//方式三: 实现interface
type err3 struct{
}
func (e err3) Error() string {
	return "err3"
}

go的错误常见是把错误层层传递,这样可能带来一些不友好的地方:

  • 错误判断,经过层次包裹的错误,使用层不好判断真实错误
  • 定位错误,error库过于简单,没有提供粗错误堆栈,不好定位问题

错误判断

常见错误的判断方式是:


type base struct{}

func (e base) Error() string {
	return "base error"
}

func wrapBase() error {
	return fmt.Errorf("wrapBase: %w", base{})
}

func TestErrors(t *testing.T) {
	t.Run("==", func(t *testing.T) {
		foo := &base{}
		bar := &base{}
		t.Log(foo)
		t.Log(bar)
		t.Log(foo == bar)
		assert.True(t, foo == bar)
	})
	t.Run("断言", func(t *testing.T) {
		var err error
		_, ok := err.(*base)
		assert.False(t, ok)
	})
	t.Run("Is", func(t *testing.T) {
		foo := base{}
		wrapFoo := wrapBase()
		assert.False(t, foo == wrapFoo)
		assert.True(t, errors.Is(wrapFoo, foo))
		assert.False(t, errors.Is(wrapFoo, &base{}))
	})
	t.Run("As", func(t *testing.T) {
		foo := base{}
		wrapFoo := wrapBase()
		assert.False(t, foo == wrapFoo)
		assert.True(t, errors.As(wrapFoo, &base{}))
	})
}

OutPut:

--- PASS: TestErrors (0.00s)
--- PASS: TestErrors/== (0.00s)
--- PASS: TestErrors/断言 (0.00s)
--- PASS: TestErrors/Is (0.00s)
 --- PASS: TestErrors/As (0.00s)
  • 当没有嵌套错误,可以使用 ==来判断;
  • 当有多层嵌套,可以使用 Is() :
    • //一个错误被认为匹配一个目标,如果它等于那个目标或如果它实现了一个方法Is(error) bool,使Is(target)返回true。
    • // As在err’s chain中找到第一个与target匹配的错误,如果找到,则设置指向错误值并返回true。否则,返回false

定位错误

常用的第三方库

github.com/pkg/errors/errors.go 提供了以下功能:

  • WithStack 包装堆栈
  • WithMessagef 包装异常
func TestWithStackCompare(t *testing.T) {
	t.Run("fmt.Errorf,无堆栈,不好定位", func(t *testing.T) {
		err1 := fmt.Errorf("io.EOF")
		fmt.Printf("err1: %+v", err1)
	})
	t.Run("errors ,无堆栈,不好定位", func(t *testing.T) {
		err2 := errors.New("io.EOF")
		fmt.Printf("err2: %+v", err2)
	})
	t.Run("pkgerrors,有堆栈,方便定位", func(t *testing.T) {
		err3 := pkgerrors.WithStack(io.EOF)
		fmt.Printf("err3: %+v", err3)
	})

}

OutPut:

=== RUN   TestWithStackCompare
=== RUN   TestWithStackCompare/fmt.Errorf,无堆栈,不好定位
err1: io.EOF=== RUN   TestWithStackCompare/errors_,无堆栈,不好定位
err2: io.EOF=== RUN   TestWithStackCompare/pkgerrors,有堆栈,便以定位
err3: EOF
tkingo.vip/egs/goerrors-demo.TestWithStackCompare.func3
	$ workspace/goerrors-demo/errors_test.go:113
testing.tRunner
	$ workspace/Go/src/testing/testing.go:1439
runtime.goexit

func TestWithMessagef(t *testing.T) {
	tests := []struct {
		err     error
		message string
		want    string
	}{
		{io.EOF, "read error", "read error: EOF"},
		{pkgerrors.WithMessagef(io.EOF, "read error without format specifier"), "client error", "client error: read error without format specifier: EOF"},
		{pkgerrors.WithMessagef(io.EOF, "read error with %d format specifier", 1), "client error", "client error: read error with 1 format specifier: EOF"},
	}

	for _, tt := range tests {
		got := pkgerrors.WithMessagef(tt.err, tt.message).Error()
		if got != tt.want {
			t.Errorf("WithMessage(%v, %q): got: %q, want %q", tt.err, tt.message, got, tt.want)
		}
	}
}

总结

pkg errors 应该能够支持错误堆栈、不同的打印格式很好的补充了go errors 的一些短板

参考:

  • github.com/pkg/errors

本文主要介绍一个好用的时间工具库,主要功能:

  • 当前时间
  • 修改地区
  • 解析字符串 等

Usage


import "github.com/jinzhu/now"

time.Now() // 2013-11-18 17:51:49.123456789 Mon

now.BeginningOfMinute()        // 2013-11-18 17:51:00 Mon
now.BeginningOfHour()          // 2013-11-18 17:00:00 Mon
now.BeginningOfDay()           // 2013-11-18 00:00:00 Mon
now.BeginningOfWeek()          // 2013-11-17 00:00:00 Sun
now.BeginningOfMonth()         // 2013-11-01 00:00:00 Fri
now.BeginningOfQuarter()       // 2013-10-01 00:00:00 Tue
now.BeginningOfYear()          // 2013-01-01 00:00:00 Tue

now.EndOfMinute()              // 2013-11-18 17:51:59.999999999 Mon
now.EndOfHour()                // 2013-11-18 17:59:59.999999999 Mon
now.EndOfDay()                 // 2013-11-18 23:59:59.999999999 Mon
now.EndOfWeek()                // 2013-11-23 23:59:59.999999999 Sat
now.EndOfMonth()               // 2013-11-30 23:59:59.999999999 Sat
now.EndOfQuarter()             // 2013-12-31 23:59:59.999999999 Tue
now.EndOfYear()                // 2013-12-31 23:59:59.999999999 Tue

now.WeekStartDay = time.Monday // Set Monday as first day, default is Sunday
now.EndOfWeek()                // 2013-11-24 23:59:59.999999999 Sun

修改地区

location, err := time.LoadLocation("Asia/Shanghai")

myConfig := &now.Config{
	WeekStartDay: time.Monday,
	TimeLocation: location,
	TimeFormats: []string{"2006-01-02 15:04:05"},
}

t := time.Date(2013, 11, 18, 17, 51, 49, 123456789, time.Now().Location()) // // 2013-11-18 17:51:49.123456789 Mon
myConfig.With(t).BeginningOfWeek()         // 2013-11-18 00:00:00 Mon

myConfig.Parse("2002-10-12 22:14:01")     // 2002-10-12 22:14:01
myConfig.Parse("2002-10-12 22:14")        // returns error 'can't parse string as time: 2002-10-12 22:14'

解析字符串

time.Now() // 2013-11-18 17:51:49.123456789 Mon

// Parse(string) (time.Time, error)
t, err := now.Parse("2017")                // 2017-01-01 00:00:00, nil
t, err := now.Parse("2017-10")             // 2017-10-01 00:00:00, nil
t, err := now.Parse("2017-10-13")          // 2017-10-13 00:00:00, nil
t, err := now.Parse("1999-12-12 12")       // 1999-12-12 12:00:00, nil
t, err := now.Parse("1999-12-12 12:20")    // 1999-12-12 12:20:00, nil
t, err := now.Parse("1999-12-12 12:20:21") // 1999-12-12 12:20:21, nil
t, err := now.Parse("10-13")               // 2013-10-13 00:00:00, nil
t, err := now.Parse("12:20")               // 2013-11-18 12:20:00, nil
t, err := now.Parse("12:20:13")            // 2013-11-18 12:20:13, nil
t, err := now.Parse("14")                  // 2013-11-18 14:00:00, nil
t, err := now.Parse("99:99")               // 2013-11-18 12:20:00, Can't parse string as time: 99:99`

参考

本文主要介绍Kratos Transport和源码的分析.

kratos 框架对传输层进行了抽象,用户可以通过实现接口来接入实现,框架默认实现了gRPC和HTTP两种通信协议传输层。

抽象接口

Server Interface

// 服务的启动和停止,用于管理服务生命周期。
type Server interface {
    Start(context.Context) error
    Stop(context.Context) error
}

Transporter

type Transporter interface {
    // 代表实现的通讯协议的种类,如内置的 http grpc,也可以实现其他的类型如 mqtt,websocket
    Kind() Kind
    // 提供的服务终端地址
    Endpoint() string
    // 用于标识服务的完整方法路径
    // 示例: /helloworld.Greeter/SayHello
    Operation() string
    // http 的请求头或者 grpc 的元数据
    Header() Header
}

Endpointer

type Endpointer interface {
    // 用于实现注册到注册中心的终端地址,如果不实现这个方法则不会注册到注册中心
    Endpoint() (*url.URL, error)
}

Example For Http

使用kratos-transport 创建一个http server 和 client ,在 client和server之间通讯

server

server的完整代码 server.go 如下:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	transporthttp "github.com/go-kratos/kratos/v2/transport/http"
)

func NewServer() *transporthttp.Server {
	var opts = []transporthttp.ServerOption{
		transporthttp.Address("0.0.0.0:8000"),
		transporthttp.Timeout(time.Second * 10),
	}
	svr := transporthttp.NewServer(opts...)
	svr.Handle("/", &helloHandler{})
	return svr
}

func main() {
	ctx := context.Background()
	srv := NewServer()

	err := srv.Start(ctx)
	if err != nil {
		if err == http.ErrServerClosed {
			log.Print("Server closed under request")
		} else {
			log.Fatal("Server closed unexpected")
		}
	}

	done := make(chan os.Signal)
	signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-done
		if err := srv.Stop(ctx); err != nil {
			log.Fatal("Shutdown NewServer:", err)
		}
	}()
}

type Foo struct {
	Bar string
}

type helloHandler struct{}

func (*helloHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	err := r.ParseForm()
	if err != nil {
		return
	}
	formData := make(map[string]interface{})
	json.NewDecoder(r.Body).Decode(&formData)
	fmt.Println(formData)
	for k, v := range formData {
		fmt.Println("key", k, "value", v)
	}

	reply := &Foo{
		Bar: "NiceToMeetYouToo",
	}
	replyBytes, err := json.Marshal(reply)
	if err != nil {
		return
	}
	fmt.Fprintf(w, string(replyBytes))
}

启动服务

 go run main.go

输出

API server listening at: [::]:60870
INFO msg=[HTTP] server listening on: [::]:8000

client

client 的完整代码 client.go 如下:


package client

import (
   "context"
   "github.com/go-kratos/kratos/v2/transport/http"
   "testing"
   "time"

   "github.com/stretchr/testify/assert"
)

type Foo struct {
   Bar string
}

func TestNewClient(t *testing.T) {
   client, err := http.NewClient(
   	context.Background(),
   	http.WithEndpoint("0.0.0.0:8000"),
   	http.WithMiddleware(),
   	http.WithTimeout(time.Second*10),
   )
   assert.Nil(t, err)

   values := map[string]string{
   	"foo":   "bar",
   	"hello": "world",
   } 
   assert.Nil(t, err)
   reply := &Foo{}
   err = client.Invoke(context.Background(), "POST", "/", values, reply)
   assert.Nil(t, err)

   t.Log("reply", reply)
}

运行测试用例,

server 输出:

key foo value bar
key hello value world

client 输出:

=== RUN   TestNewClient
    client_test.go:42: reply &{NiceToMeetYouToo}

源码分析

Transport Server

// 服务的启动和停止,用于管理服务生命周期。
type Server interface {
   Start(context.Context) error
   Stop(context.Context) error
}

Start:启动服务、监听端口、处理请求 Stop:停止服务

如上例 server.go 中,

svr := transporthttp.NewServer(opts...)

这个用创建一个 Server Interface的Http实例

// Server is an HTTP server wrapper.
type Server struct {
	*http.Server
	lis         net.Listener
	tlsConf     *tls.Config
	endpoint    *url.URL
	err         error
	network     string
	address     string
	timeout     time.Duration
	filters     []FilterFunc
	ms          []middleware.Middleware
	dec         DecodeRequestFunc
	enc         EncodeResponseFunc
	ene         EncodeErrorFunc
	strictSlash bool
	router      *mux.Router
	log         *log.Helper
}

可以看到,Server是对标准库 net/http 中http.Server 的一个封装。

// ServerOption is an HTTP server option.
type ServerOption func(*Server)

可通过 option 设置服务参数,比如例子中的

var opts = []transporthttp.ServerOption{
		transporthttp.Address("0.0.0.0:8000"),
		transporthttp.Timeout(time.Second * 10),
	}

Start 启动一个net/http.Server

// Start start the HTTP server.
func (s *Server) Start(ctx context.Context) error { 
    ...
	s.log.Infof("[HTTP] server listening on: %s", s.lis.Addr().String())
	var err error
	if s.tlsConf != nil {
		err = s.ServeTLS(s.lis, "", "")
	} else {
		err = s.Serve(s.lis)
	}
    ...
} 

启动流程

func (a *App) Run() error {
	...
	for _, srv := range a.opts.servers {
		srv := srv
		eg.Go(func() error {
			<-ctx.Done() // wait for stop signal
			return srv.Stop(ctx)
		})
		wg.Add(1)
		eg.Go(func() error {
			wg.Done()
			return srv.Start(ctx)
		})
	}
	wg.Wait()
    ...
}

kratos 的App 层无需关心底层服务协议的实现, 只需管理好应用配置、服务生命周期、加载顺序即可。

Transport Client

Transport Client 是会 http CLient封装

// Client is an HTTP client.
type Client struct {
	opts     clientOptions
	target   *Target
	r        *resolver
	cc       *http.Client
	insecure bool
}

如client.go 中的一个Http Post

    values := map[string]string{
	}  
	reply := &Foo{}
	err = client.Invoke(context.Background(), "POST", "/", values, reply) 

Invoke封装了post,使用更加简洁。

总结

kratos 的App 层无需关心底层服务协议的实现, 只需管理好应用配置、服务生命周期、加载顺序即可。抽象的服务协议可以专注应用配置、服务生命周期等。层次清晰,职责清楚。

参考

jefffff

Stay hungry. Stay Foolish COOL

Go backend developer

China Amoy