前后端数据常用传输格式有:json、xml 和 proto等,不管是mvc还是ddd,都会在表现层的对不同的格式进行转化下层依赖所需的类、Object、 aggregate等。

Kratos 表现层

在实际开发场景中,前后端传输采用json。 但kratos使用proto定义Api, 那么我们以一个Http请求为例,来看看kratos的表现层是如何处理的。

以下是一个Kratos-Example,由编写好的proto文件,proto-go 等自动生成表现层的代码。 大概步骤:

  1. 编写proto,包括Service,Req,Reply
  2. 生成表现层代码
  3. 实现下层逻辑等

Example完整代码如下

func _Greeter_SayHello0_HTTP_Handler(srv GreeterHTTPServer) func(ctx http.Context) error {
	return func(ctx http.Context) error {
		var in HelloRequest
		if err := ctx.BindQuery(&in); err != nil {
			return err
		}
		if err := ctx.BindVars(&in); err != nil {
			return err
		}
		http.SetOperation(ctx, "/helloworld.v1.Greeter/SayHello")
		h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.SayHello(ctx, req.(*HelloRequest))
		})
		out, err := h(ctx, &in)
		if err != nil {
			return err
		}
		reply := out.(*HelloReply)
		return ctx.Result(200, reply)
	}
}

可以看到:

  1. 这里对具体的格式无感,输入In还是输出Out都是一个proto生成的类。
  2. 具体的实现交给了ctx 上下文去实现 查看 ctx.Result的源码:
func (c *wrapper) Result(code int, v interface{}) error {
	c.w.WriteHeader(code)
	return c.router.srv.enc(&c.w, c.req, v)
}

这里的enc,没有特殊设置,使用默认的DefaultResponseEncoder,部分源码如下

func DefaultResponseEncoder(w http.ResponseWriter, r *http.Request, v interface{}) error {
	...
	codec, _ := CodecForRequest(r, "Accept")
	data, err := codec.Marshal(v)
	if err != nil {
		return err
	}
	w.Header().Set("Content-Type", httputil.ContentType(codec.Name()))
	_, err = w.Write(data)
    ...
}

通过codc接口隔离具体实现,调用接口的 codec.Marshal序列化。

codec包

Codec定义Transport用于编码和解码消息的接口。它的实现包括 json,xml,proto,yaml等

type Codec interface {
	// Marshal returns the wire format of v.
	Marshal(v interface{}) ([]byte, error)
	// Unmarshal parses the wire format into v.
	Unmarshal(data []byte, v interface{}) error
	// Name returns the name of the Codec implementation. The returned string
	// will be used as part of content type in transmission.  The result must be
	// static; the result cannot change between calls.
	Name() string
}

这里我们聚焦正在使用的Json

type codec struct{}
func (codec) Marshal(v interface{}) ([]byte, error) {
	switch m := v.(type) {
	case json.Marshaler:
		return m.MarshalJSON()
	case proto.Message:
		return MarshalOptions.Marshal(m)
	default:
		return json.Marshal(m)
	}
}

这里Marshal()使用第三方库““google.golang.org/protobuf/encoding/protojson”

实现proto转json。

protojson第三方包

protojson是Google提供的proto和json的转化

package api_test

import (
	"google.golang.org/protobuf/encoding/protojson"
	"testing"

	v1 "helloworld/api/helloworld/v1"
)

func TestToJson(t *testing.T) {
	reply := &v1.HelloReply{
		Message: "Jobs",
	}
	replyJson, err := MarshalOptions.Marshal(reply.ProtoReflect().Interface())
	if err != nil {
		t.Errorf("Marshal Error: %v", err)
	}
	t.Logf("replyJson:  %v", string(replyJson))
}

更多的接口请参考“https://google.golang.org/protobuf/encoding/protojson”

小结

本文主要以kratos的表现层的组织方式,来介绍常用的框架表现层处理方式。 其中:

  • kratos通过proto来生成表现层代码的类,包括http和grapc,内部对具体实现无感。
  • kratos通过解码器codec实现不同传输格式的解耦。
  • 第三方包protojson实现proto和json转换。

框架的主要功能之一就是标准化处理一下公共的问题场景,从源码中可以学习:通过接口隔离具体的实现,而使框架与具体实现解耦,且具备更好的拓展性。

参考

本文主要记录kratos项目配置的定义、读取和源码的学习

at first

服务在启动的基本都会用到配置文件,那么如果是你来写config工具库,它所能提供的功能是什么?

  1. 读取不同场景的配置
  2. 读取不同格式的配置
  3. 支持热更
  4. 支持拓展,提供自定义实现

kratos/config 有那些功能

  1. kratos/config包支持多种配置源,包括
    • 本地文件
    • 本地环境
    • contrib/config支持配置中心
  2. kratos/config 支持多种配置格式
    • json
    • proto
    • xml
    • yaml
  3. 支持热更
  4. 支持其它格式的配置文件

定义配置

在项目目录下创建文件: workspace/configs/config.yaml

service:
  name: config
  version: v1.0.0
http:
  server:
    address: 0.0.0.0:8000
    timeout: 1s
grpc:
  server:
    address: 0.0.0.0:9000
    timeout: 1s

加载配置文件

WithSource

使用withsoure指定数据源为本地文件

package main

import (
	"flag"
	"log"

	"github.com/go-kratos/kratos/v2/config"
	"github.com/go-kratos/kratos/v2/config/file"
)

var flagconf string

func init() {
	flag.StringVar(&flagconf, "conf", "config.yaml", "config path, eg: -conf config.yaml")
}

func main() {
	flag.Parse()
	c := config.New(
		config.WithSource(
			file.NewSource(flagconf),
		),
	)
	if err := c.Load(); err != nil {
		panic(err)
	}

	//定义配置JSON字段
	var v struct {
		Service struct {
			Name    string `json:"name"`
			Version string `json:"version"`
		} `json:"service"`
	}

	//将配置Unmarshal 到struct  
	if err := c.Scan(&v); err != nil {
		panic(err)
	}
	log.Printf("config: %+v", v)

	//获取与该键关联的值
	name, err := c.Value("service.name").String()
	if err != nil {
		panic(err)
	}
	log.Printf("service: %s", name)

	// 热更执行钩子
	if err := c.Watch("service.name", func(key string, value config.Value) {
		log.Printf("config changed: %s = %v\n", key, value)
	}); err != nil {
		panic(err)
	}

	<-make(chan struct{})
}

output:

$  config: {Service:{Name:config Version:v1.0.0}}
$  service: config

WithDecoder

Decoder用于将配置文件内容用特定的反序列化方法解析出来

c := config.New(
		config.WithSource(
			file.NewSource(flagconf),
		),
		config.WithDecoder(func(src *config.KeyValue, target map[string]interface{}) error {
			if src.Format == "" {
				// expand key "aaa.bbb" into map[aaa]map[bbb]interface{}
				keys := strings.Split(src.Key, ".")
				for i, k := range keys {
					if i == len(keys)-1 {
						target[k] = src.Value
					} else {
						sub := make(map[string]interface{})
						target[k] = sub
						target = sub
					}
				}
				return nil
			}
			if codec := encoding.GetCodec(src.Format); codec != nil {
				return codec.Unmarshal(src.Value, &target)
			}
			return fmt.Errorf("unsupported key: %s format: %s", src.Key, src.Format)
		}),
	)

output 和上例一致

原理

// Config is a config interface.
type Config interface {
	Load() error
	Scan(v interface{}) error
	Value(key string) Value
	Watch(key string, o Observer) error
	Close() error
}
  1. 指定数据源的这样会将整个目录中的所有文件进行解析加载,合并到同一个map中
    c := config.New(
            config.WithSource(
                file.NewSource(flagconf),
            ),
        )
        if err := c.Load(); err != nil {
            panic(err)
         }
  1. 使用之前创建好的config实例,调用.Scan方法,读取配置文件的内容到结构体中,这种方式适用于完整获取整个配置文件的内容
    if err := c.Scan(&v); err != nil {
      panic(err)
    }
    fmt.Printf("config: %+v", v)
  1. New的时候会有一个默认的 decoder
func New(opts ...Option) Config {
	o := options{
		logger:   log.DefaultLogger,
		decoder:  defaultDecoder,
		resolver: defaultResolver,
	}
	for _, opt := range opts {
		opt(&o)
	}
	return &config{
		opts:   o,
		reader: newReader(o),
		log:    log.NewHelper(o.logger),
	}
}

通过option可以拓展自定义 decoder

func WithDecoder(d Decoder) Option {
	return func(o *options) {
		o.decoder = d
	}
}

参考

  1. https://github.com/go-kratos/kratos/v2/config
  2. https://github.com/go-kratos/kratos/tree/main/contrib/config
  3. https://go-kratos.dev/docs/component/config

本文主要介绍Kratos Transport和源码的分析.

kratos 框架对传输层进行了抽象,用户可以通过实现接口来接入实现,框架默认实现了gRPC和HTTP两种通信协议传输层。

抽象接口

Server Interface

// 服务的启动和停止,用于管理服务生命周期。
type Server interface {
    Start(context.Context) error
    Stop(context.Context) error
}

Transporter

type Transporter interface {
    // 代表实现的通讯协议的种类,如内置的 http grpc,也可以实现其他的类型如 mqtt,websocket
    Kind() Kind
    // 提供的服务终端地址
    Endpoint() string
    // 用于标识服务的完整方法路径
    // 示例: /helloworld.Greeter/SayHello
    Operation() string
    // http 的请求头或者 grpc 的元数据
    Header() Header
}

Endpointer

type Endpointer interface {
    // 用于实现注册到注册中心的终端地址,如果不实现这个方法则不会注册到注册中心
    Endpoint() (*url.URL, error)
}

Example For Http

使用kratos-transport 创建一个http server 和 client ,在 client和server之间通讯

server

server的完整代码 server.go 如下:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	transporthttp "github.com/go-kratos/kratos/v2/transport/http"
)

func NewServer() *transporthttp.Server {
	var opts = []transporthttp.ServerOption{
		transporthttp.Address("0.0.0.0:8000"),
		transporthttp.Timeout(time.Second * 10),
	}
	svr := transporthttp.NewServer(opts...)
	svr.Handle("/", &helloHandler{})
	return svr
}

func main() {
	ctx := context.Background()
	srv := NewServer()

	err := srv.Start(ctx)
	if err != nil {
		if err == http.ErrServerClosed {
			log.Print("Server closed under request")
		} else {
			log.Fatal("Server closed unexpected")
		}
	}

	done := make(chan os.Signal)
	signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-done
		if err := srv.Stop(ctx); err != nil {
			log.Fatal("Shutdown NewServer:", err)
		}
	}()
}

type Foo struct {
	Bar string
}

type helloHandler struct{}

func (*helloHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	err := r.ParseForm()
	if err != nil {
		return
	}
	formData := make(map[string]interface{})
	json.NewDecoder(r.Body).Decode(&formData)
	fmt.Println(formData)
	for k, v := range formData {
		fmt.Println("key", k, "value", v)
	}

	reply := &Foo{
		Bar: "NiceToMeetYouToo",
	}
	replyBytes, err := json.Marshal(reply)
	if err != nil {
		return
	}
	fmt.Fprintf(w, string(replyBytes))
}

启动服务

 go run main.go

输出

API server listening at: [::]:60870
INFO msg=[HTTP] server listening on: [::]:8000

client

client 的完整代码 client.go 如下:


package client

import (
   "context"
   "github.com/go-kratos/kratos/v2/transport/http"
   "testing"
   "time"

   "github.com/stretchr/testify/assert"
)

type Foo struct {
   Bar string
}

func TestNewClient(t *testing.T) {
   client, err := http.NewClient(
   	context.Background(),
   	http.WithEndpoint("0.0.0.0:8000"),
   	http.WithMiddleware(),
   	http.WithTimeout(time.Second*10),
   )
   assert.Nil(t, err)

   values := map[string]string{
   	"foo":   "bar",
   	"hello": "world",
   } 
   assert.Nil(t, err)
   reply := &Foo{}
   err = client.Invoke(context.Background(), "POST", "/", values, reply)
   assert.Nil(t, err)

   t.Log("reply", reply)
}

运行测试用例,

server 输出:

key foo value bar
key hello value world

client 输出:

=== RUN   TestNewClient
    client_test.go:42: reply &{NiceToMeetYouToo}

源码分析

Transport Server

// 服务的启动和停止,用于管理服务生命周期。
type Server interface {
   Start(context.Context) error
   Stop(context.Context) error
}

Start:启动服务、监听端口、处理请求 Stop:停止服务

如上例 server.go 中,

svr := transporthttp.NewServer(opts...)

这个用创建一个 Server Interface的Http实例

// Server is an HTTP server wrapper.
type Server struct {
	*http.Server
	lis         net.Listener
	tlsConf     *tls.Config
	endpoint    *url.URL
	err         error
	network     string
	address     string
	timeout     time.Duration
	filters     []FilterFunc
	ms          []middleware.Middleware
	dec         DecodeRequestFunc
	enc         EncodeResponseFunc
	ene         EncodeErrorFunc
	strictSlash bool
	router      *mux.Router
	log         *log.Helper
}

可以看到,Server是对标准库 net/http 中http.Server 的一个封装。

// ServerOption is an HTTP server option.
type ServerOption func(*Server)

可通过 option 设置服务参数,比如例子中的

var opts = []transporthttp.ServerOption{
		transporthttp.Address("0.0.0.0:8000"),
		transporthttp.Timeout(time.Second * 10),
	}

Start 启动一个net/http.Server

// Start start the HTTP server.
func (s *Server) Start(ctx context.Context) error { 
    ...
	s.log.Infof("[HTTP] server listening on: %s", s.lis.Addr().String())
	var err error
	if s.tlsConf != nil {
		err = s.ServeTLS(s.lis, "", "")
	} else {
		err = s.Serve(s.lis)
	}
    ...
} 

启动流程

func (a *App) Run() error {
	...
	for _, srv := range a.opts.servers {
		srv := srv
		eg.Go(func() error {
			<-ctx.Done() // wait for stop signal
			return srv.Stop(ctx)
		})
		wg.Add(1)
		eg.Go(func() error {
			wg.Done()
			return srv.Start(ctx)
		})
	}
	wg.Wait()
    ...
}

kratos 的App 层无需关心底层服务协议的实现, 只需管理好应用配置、服务生命周期、加载顺序即可。

Transport Client

Transport Client 是会 http CLient封装

// Client is an HTTP client.
type Client struct {
	opts     clientOptions
	target   *Target
	r        *resolver
	cc       *http.Client
	insecure bool
}

如client.go 中的一个Http Post

    values := map[string]string{
	}  
	reply := &Foo{}
	err = client.Invoke(context.Background(), "POST", "/", values, reply) 

Invoke封装了post,使用更加简洁。

总结

kratos 的App 层无需关心底层服务协议的实现, 只需管理好应用配置、服务生命周期、加载顺序即可。抽象的服务协议可以专注应用配置、服务生命周期等。层次清晰,职责清楚。

参考

jefffff

Stay hungry. Stay Foolish COOL

Go backend developer

China Amoy