Kratos Middleware是Kratos的核心一个中间件层,它是Kratos架构的核心部分,负责处理所有微服务间的通信和数据流。本文将深入了解Kratos Middleware的工作原理,更好地理解和使用Kratos。

使用示例

Kratos 内置了一系列的 middleware(中间件)用于处理 logging、 metrics 等通用场景, 您也可以通过实现 Middleware 接口,开发自定义 middleware,进行通用的业务处理,比如用户登录鉴权等。

// http
// 定义opts
var opts = []http.ServerOption{
    http.Middleware(
        recovery.Recovery(), // 把middleware按照需要的顺序加入
        tracing.Server(),
        logging.Server(),
    ),
}
// 创建server
http.NewServer(opts...)

自定义中间件的例子:

func Middleware1() middleware.Middleware {
    return func(handler middleware.Handler) middleware.Handler {
        return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
            if tr, ok := transport.FromServerContext(ctx); ok {
                // Do something on entering
                defer func() {
                // Do something on exiting
                 }()
            }
            return handler(ctx, req)
        }
    }
}

链式调用

Kratos Middleware是通过链式结构实现的,其中每一个中间件都代表一个独立的处理单元,可以在请求的生命周期中的不同阶段执行不同的操作,如请求前的预处理、请求后的后处理等。

每一个中间件都是一个函数,具有相同的签名:

// Handler定义中间件调用的处理程序
type Handler func(ctx context.Context, req interface{}) (interface{}, error)

再看看/go-kratos/kratos/blob/main/middleware/middleware包中Middleware和Chain的实现,Middleware 是HTTP/gRPC传输中间件。 Chain返回一个中间件,它指定endpoint.的链式处理程序。

type Middleware func(Handler) Handler

func Chain(m ...Middleware) Middleware {
	return func(next Handler) Handler {
		for i := len(m) - 1; i >= 0; i-- {
			next = m[i](next)
		}
		return next
	}
}

Example,以https://github.com/go-kratos/kratos/blob/main/middleware/middleware_test.go为例:

func TestChain(t *testing.T) {
	next := func(ctx context.Context, req interface{}) (interface{}, error) {
		t.Log(req)
		i += 10
		return "reply", nil
	}

	got, err := Chain(test1Middleware, test2Middleware, test3Middleware)(next)(context.Background(), "hello kratos!")
	if err != nil {
		t.Errorf("expect %v, got %v", nil, err)
        }
  }
  
func test1Middleware(handler Handler) Handler {
	return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
		fmt.Println("test1 before")
		i++
		reply, err = handler(ctx, req)
		fmt.Println("test1 after")
		return
	}
}  
... 篇幅限制省略部分代码
OutPut
test1 before
test2 before
test3 before
    middleware_test.go:14: hello kratos!
test3 after
test2 after
test1 after

调用过程

对链式调用的使用有所了解后,接着来看Middleware 调用组织过程,大致归纳为:

  1. option方式配置中间件
  2. url路由匹配中间件
  3. 组装中间件添加到调用链
  4. 注册到gorilla,其中gorilla实现net.hander,设置为服务路由中间件

以Transport.Server Http服务为例,重点关注middleware和router 。

// Server is an HTTP server wrapper.
type Server struct {
	*http.Server
	middleware  matcher.Matcher
	router      *mux.Router
	...
}
// Matcher is a middleware matcher.
type Matcher interface {
	Use(ms ...middleware.Middleware)
	Add(selector string, ms ...middleware.Middleware)
	Match(operation string) []middleware.Middleware
}
  • Matcher是一个中间件匹配器,operation 为path,实现路由查找。
  • router 为gorilla.Mux, Http路由器和Url匹配器

一个完成的调用api-service:

func _Greeter_SayHello0_HTTP_Handler(srv GreeterHTTPServer) func(ctx http.Context) error {
	return func(ctx http.Context) error {
		var in HelloRequest
		if err := ctx.BindQuery(&in); err != nil {
			return err
		}
		if err := ctx.BindVars(&in); err != nil {
			return err
		}
		http.SetOperation(ctx, "/helloworld.v1.Greeter/SayHello")
		h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.SayHello(ctx, req.(*HelloRequest))
		})
		out, err := h(ctx, &in)
		if err != nil {
			return err
		}
		reply := out.(*HelloReply)
		return ctx.Result(200, reply)
	}
}

其中ctx.Middleware的实现是:

func (c *wrapper) Middleware(h middleware.Handler) middleware.Handler {
	if tr, ok := transport.FromServerContext(c.req.Context()); ok {
		return middleware.Chain(c.router.srv.middleware.Match(tr.Operation())...)(h)
	}
	return middleware.Chain(c.router.srv.middleware.Match(c.req.URL.Path)...)(h)
} 

func (m *matcher) Match(operation string) []middleware.Middleware {
	ms := make([]middleware.Middleware, 0, len(m.defaults))
	if len(m.defaults) > 0 {
		ms = append(ms, m.defaults...)
	}
	if next, ok := m.matchs[operation]; ok {
		return append(ms, next...)
	}
	for _, prefix := range m.prefix {
		if strings.HasPrefix(operation, prefix) {
			return append(ms, m.matchs[prefix]...)
		}
	}
	return ms
}

以optionnal方式定义的中间件存储m.defaults,通过matcher.Match给不同的路由匹配不同的中间件。最终加入到 middleware.Chain的调用链实现链式调用。

小结

文本记录深入理解Kratos框架和Kratos Middleware,核心使用middleware.Chain的链式调用具有相同的签名的中间件函数。框架通过option的方式提供设置中间件, Matcher中间件匹配器匹配不同路由规则的中间件,来实现链式调用。

参考

本文主要记录kratos项目配置的定义、读取和源码的学习

at first

服务在启动的基本都会用到配置文件,那么如果是你来写config工具库,它所能提供的功能是什么?

  1. 读取不同场景的配置
  2. 读取不同格式的配置
  3. 支持热更
  4. 支持拓展,提供自定义实现

kratos/config 有那些功能

  1. kratos/config包支持多种配置源,包括
    • 本地文件
    • 本地环境
    • contrib/config支持配置中心
  2. kratos/config 支持多种配置格式
    • json
    • proto
    • xml
    • yaml
  3. 支持热更
  4. 支持其它格式的配置文件

定义配置

在项目目录下创建文件: workspace/configs/config.yaml

service:
  name: config
  version: v1.0.0
http:
  server:
    address: 0.0.0.0:8000
    timeout: 1s
grpc:
  server:
    address: 0.0.0.0:9000
    timeout: 1s

加载配置文件

WithSource

使用withsoure指定数据源为本地文件

package main

import (
	"flag"
	"log"

	"github.com/go-kratos/kratos/v2/config"
	"github.com/go-kratos/kratos/v2/config/file"
)

var flagconf string

func init() {
	flag.StringVar(&flagconf, "conf", "config.yaml", "config path, eg: -conf config.yaml")
}

func main() {
	flag.Parse()
	c := config.New(
		config.WithSource(
			file.NewSource(flagconf),
		),
	)
	if err := c.Load(); err != nil {
		panic(err)
	}

	//定义配置JSON字段
	var v struct {
		Service struct {
			Name    string `json:"name"`
			Version string `json:"version"`
		} `json:"service"`
	}

	//将配置Unmarshal 到struct  
	if err := c.Scan(&v); err != nil {
		panic(err)
	}
	log.Printf("config: %+v", v)

	//获取与该键关联的值
	name, err := c.Value("service.name").String()
	if err != nil {
		panic(err)
	}
	log.Printf("service: %s", name)

	// 热更执行钩子
	if err := c.Watch("service.name", func(key string, value config.Value) {
		log.Printf("config changed: %s = %v\n", key, value)
	}); err != nil {
		panic(err)
	}

	<-make(chan struct{})
}

output:

$  config: {Service:{Name:config Version:v1.0.0}}
$  service: config

WithDecoder

Decoder用于将配置文件内容用特定的反序列化方法解析出来

c := config.New(
		config.WithSource(
			file.NewSource(flagconf),
		),
		config.WithDecoder(func(src *config.KeyValue, target map[string]interface{}) error {
			if src.Format == "" {
				// expand key "aaa.bbb" into map[aaa]map[bbb]interface{}
				keys := strings.Split(src.Key, ".")
				for i, k := range keys {
					if i == len(keys)-1 {
						target[k] = src.Value
					} else {
						sub := make(map[string]interface{})
						target[k] = sub
						target = sub
					}
				}
				return nil
			}
			if codec := encoding.GetCodec(src.Format); codec != nil {
				return codec.Unmarshal(src.Value, &target)
			}
			return fmt.Errorf("unsupported key: %s format: %s", src.Key, src.Format)
		}),
	)

output 和上例一致

原理

// Config is a config interface.
type Config interface {
	Load() error
	Scan(v interface{}) error
	Value(key string) Value
	Watch(key string, o Observer) error
	Close() error
}
  1. 指定数据源的这样会将整个目录中的所有文件进行解析加载,合并到同一个map中
    c := config.New(
            config.WithSource(
                file.NewSource(flagconf),
            ),
        )
        if err := c.Load(); err != nil {
            panic(err)
         }
  1. 使用之前创建好的config实例,调用.Scan方法,读取配置文件的内容到结构体中,这种方式适用于完整获取整个配置文件的内容
    if err := c.Scan(&v); err != nil {
      panic(err)
    }
    fmt.Printf("config: %+v", v)
  1. New的时候会有一个默认的 decoder
func New(opts ...Option) Config {
	o := options{
		logger:   log.DefaultLogger,
		decoder:  defaultDecoder,
		resolver: defaultResolver,
	}
	for _, opt := range opts {
		opt(&o)
	}
	return &config{
		opts:   o,
		reader: newReader(o),
		log:    log.NewHelper(o.logger),
	}
}

通过option可以拓展自定义 decoder

func WithDecoder(d Decoder) Option {
	return func(o *options) {
		o.decoder = d
	}
}

参考

  1. https://github.com/go-kratos/kratos/v2/config
  2. https://github.com/go-kratos/kratos/tree/main/contrib/config
  3. https://go-kratos.dev/docs/component/config

本文主要介绍Kratos Transport和源码的分析.

kratos 框架对传输层进行了抽象,用户可以通过实现接口来接入实现,框架默认实现了gRPC和HTTP两种通信协议传输层。

抽象接口

Server Interface

// 服务的启动和停止,用于管理服务生命周期。
type Server interface {
    Start(context.Context) error
    Stop(context.Context) error
}

Transporter

type Transporter interface {
    // 代表实现的通讯协议的种类,如内置的 http grpc,也可以实现其他的类型如 mqtt,websocket
    Kind() Kind
    // 提供的服务终端地址
    Endpoint() string
    // 用于标识服务的完整方法路径
    // 示例: /helloworld.Greeter/SayHello
    Operation() string
    // http 的请求头或者 grpc 的元数据
    Header() Header
}

Endpointer

type Endpointer interface {
    // 用于实现注册到注册中心的终端地址,如果不实现这个方法则不会注册到注册中心
    Endpoint() (*url.URL, error)
}

Example For Http

使用kratos-transport 创建一个http server 和 client ,在 client和server之间通讯

server

server的完整代码 server.go 如下:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	transporthttp "github.com/go-kratos/kratos/v2/transport/http"
)

func NewServer() *transporthttp.Server {
	var opts = []transporthttp.ServerOption{
		transporthttp.Address("0.0.0.0:8000"),
		transporthttp.Timeout(time.Second * 10),
	}
	svr := transporthttp.NewServer(opts...)
	svr.Handle("/", &helloHandler{})
	return svr
}

func main() {
	ctx := context.Background()
	srv := NewServer()

	err := srv.Start(ctx)
	if err != nil {
		if err == http.ErrServerClosed {
			log.Print("Server closed under request")
		} else {
			log.Fatal("Server closed unexpected")
		}
	}

	done := make(chan os.Signal)
	signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-done
		if err := srv.Stop(ctx); err != nil {
			log.Fatal("Shutdown NewServer:", err)
		}
	}()
}

type Foo struct {
	Bar string
}

type helloHandler struct{}

func (*helloHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	err := r.ParseForm()
	if err != nil {
		return
	}
	formData := make(map[string]interface{})
	json.NewDecoder(r.Body).Decode(&formData)
	fmt.Println(formData)
	for k, v := range formData {
		fmt.Println("key", k, "value", v)
	}

	reply := &Foo{
		Bar: "NiceToMeetYouToo",
	}
	replyBytes, err := json.Marshal(reply)
	if err != nil {
		return
	}
	fmt.Fprintf(w, string(replyBytes))
}

启动服务

 go run main.go

输出

API server listening at: [::]:60870
INFO msg=[HTTP] server listening on: [::]:8000

client

client 的完整代码 client.go 如下:


package client

import (
   "context"
   "github.com/go-kratos/kratos/v2/transport/http"
   "testing"
   "time"

   "github.com/stretchr/testify/assert"
)

type Foo struct {
   Bar string
}

func TestNewClient(t *testing.T) {
   client, err := http.NewClient(
   	context.Background(),
   	http.WithEndpoint("0.0.0.0:8000"),
   	http.WithMiddleware(),
   	http.WithTimeout(time.Second*10),
   )
   assert.Nil(t, err)

   values := map[string]string{
   	"foo":   "bar",
   	"hello": "world",
   } 
   assert.Nil(t, err)
   reply := &Foo{}
   err = client.Invoke(context.Background(), "POST", "/", values, reply)
   assert.Nil(t, err)

   t.Log("reply", reply)
}

运行测试用例,

server 输出:

key foo value bar
key hello value world

client 输出:

=== RUN   TestNewClient
    client_test.go:42: reply &{NiceToMeetYouToo}

源码分析

Transport Server

// 服务的启动和停止,用于管理服务生命周期。
type Server interface {
   Start(context.Context) error
   Stop(context.Context) error
}

Start:启动服务、监听端口、处理请求 Stop:停止服务

如上例 server.go 中,

svr := transporthttp.NewServer(opts...)

这个用创建一个 Server Interface的Http实例

// Server is an HTTP server wrapper.
type Server struct {
	*http.Server
	lis         net.Listener
	tlsConf     *tls.Config
	endpoint    *url.URL
	err         error
	network     string
	address     string
	timeout     time.Duration
	filters     []FilterFunc
	ms          []middleware.Middleware
	dec         DecodeRequestFunc
	enc         EncodeResponseFunc
	ene         EncodeErrorFunc
	strictSlash bool
	router      *mux.Router
	log         *log.Helper
}

可以看到,Server是对标准库 net/http 中http.Server 的一个封装。

// ServerOption is an HTTP server option.
type ServerOption func(*Server)

可通过 option 设置服务参数,比如例子中的

var opts = []transporthttp.ServerOption{
		transporthttp.Address("0.0.0.0:8000"),
		transporthttp.Timeout(time.Second * 10),
	}

Start 启动一个net/http.Server

// Start start the HTTP server.
func (s *Server) Start(ctx context.Context) error { 
    ...
	s.log.Infof("[HTTP] server listening on: %s", s.lis.Addr().String())
	var err error
	if s.tlsConf != nil {
		err = s.ServeTLS(s.lis, "", "")
	} else {
		err = s.Serve(s.lis)
	}
    ...
} 

启动流程

func (a *App) Run() error {
	...
	for _, srv := range a.opts.servers {
		srv := srv
		eg.Go(func() error {
			<-ctx.Done() // wait for stop signal
			return srv.Stop(ctx)
		})
		wg.Add(1)
		eg.Go(func() error {
			wg.Done()
			return srv.Start(ctx)
		})
	}
	wg.Wait()
    ...
}

kratos 的App 层无需关心底层服务协议的实现, 只需管理好应用配置、服务生命周期、加载顺序即可。

Transport Client

Transport Client 是会 http CLient封装

// Client is an HTTP client.
type Client struct {
	opts     clientOptions
	target   *Target
	r        *resolver
	cc       *http.Client
	insecure bool
}

如client.go 中的一个Http Post

    values := map[string]string{
	}  
	reply := &Foo{}
	err = client.Invoke(context.Background(), "POST", "/", values, reply) 

Invoke封装了post,使用更加简洁。

总结

kratos 的App 层无需关心底层服务协议的实现, 只需管理好应用配置、服务生命周期、加载顺序即可。抽象的服务协议可以专注应用配置、服务生命周期等。层次清晰,职责清楚。

参考

jefffff

Stay hungry. Stay Foolish COOL

Go backend developer

China Amoy