本文主要介绍Kratos Transport和源码的分析.

kratos 框架对传输层进行了抽象,用户可以通过实现接口来接入实现,框架默认实现了gRPC和HTTP两种通信协议传输层。

抽象接口

Server Interface

// 服务的启动和停止,用于管理服务生命周期。
type Server interface {
    Start(context.Context) error
    Stop(context.Context) error
}

Transporter

type Transporter interface {
    // 代表实现的通讯协议的种类,如内置的 http grpc,也可以实现其他的类型如 mqtt,websocket
    Kind() Kind
    // 提供的服务终端地址
    Endpoint() string
    // 用于标识服务的完整方法路径
    // 示例: /helloworld.Greeter/SayHello
    Operation() string
    // http 的请求头或者 grpc 的元数据
    Header() Header
}

Endpointer

type Endpointer interface {
    // 用于实现注册到注册中心的终端地址,如果不实现这个方法则不会注册到注册中心
    Endpoint() (*url.URL, error)
}

Example For Http

使用kratos-transport 创建一个http server 和 client ,在 client和server之间通讯

server

server的完整代码 server.go 如下:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	transporthttp "github.com/go-kratos/kratos/v2/transport/http"
)

func NewServer() *transporthttp.Server {
	var opts = []transporthttp.ServerOption{
		transporthttp.Address("0.0.0.0:8000"),
		transporthttp.Timeout(time.Second * 10),
	}
	svr := transporthttp.NewServer(opts...)
	svr.Handle("/", &helloHandler{})
	return svr
}

func main() {
	ctx := context.Background()
	srv := NewServer()

	err := srv.Start(ctx)
	if err != nil {
		if err == http.ErrServerClosed {
			log.Print("Server closed under request")
		} else {
			log.Fatal("Server closed unexpected")
		}
	}

	done := make(chan os.Signal)
	signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-done
		if err := srv.Stop(ctx); err != nil {
			log.Fatal("Shutdown NewServer:", err)
		}
	}()
}

type Foo struct {
	Bar string
}

type helloHandler struct{}

func (*helloHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	err := r.ParseForm()
	if err != nil {
		return
	}
	formData := make(map[string]interface{})
	json.NewDecoder(r.Body).Decode(&formData)
	fmt.Println(formData)
	for k, v := range formData {
		fmt.Println("key", k, "value", v)
	}

	reply := &Foo{
		Bar: "NiceToMeetYouToo",
	}
	replyBytes, err := json.Marshal(reply)
	if err != nil {
		return
	}
	fmt.Fprintf(w, string(replyBytes))
}

启动服务

 go run main.go

输出

API server listening at: [::]:60870
INFO msg=[HTTP] server listening on: [::]:8000

client

client 的完整代码 client.go 如下:


package client

import (
   "context"
   "github.com/go-kratos/kratos/v2/transport/http"
   "testing"
   "time"

   "github.com/stretchr/testify/assert"
)

type Foo struct {
   Bar string
}

func TestNewClient(t *testing.T) {
   client, err := http.NewClient(
   	context.Background(),
   	http.WithEndpoint("0.0.0.0:8000"),
   	http.WithMiddleware(),
   	http.WithTimeout(time.Second*10),
   )
   assert.Nil(t, err)

   values := map[string]string{
   	"foo":   "bar",
   	"hello": "world",
   } 
   assert.Nil(t, err)
   reply := &Foo{}
   err = client.Invoke(context.Background(), "POST", "/", values, reply)
   assert.Nil(t, err)

   t.Log("reply", reply)
}

运行测试用例,

server 输出:

key foo value bar
key hello value world

client 输出:

=== RUN   TestNewClient
    client_test.go:42: reply &{NiceToMeetYouToo}

源码分析

Transport Server

// 服务的启动和停止,用于管理服务生命周期。
type Server interface {
   Start(context.Context) error
   Stop(context.Context) error
}

Start:启动服务、监听端口、处理请求 Stop:停止服务

如上例 server.go 中,

svr := transporthttp.NewServer(opts...)

这个用创建一个 Server Interface的Http实例

// Server is an HTTP server wrapper.
type Server struct {
	*http.Server
	lis         net.Listener
	tlsConf     *tls.Config
	endpoint    *url.URL
	err         error
	network     string
	address     string
	timeout     time.Duration
	filters     []FilterFunc
	ms          []middleware.Middleware
	dec         DecodeRequestFunc
	enc         EncodeResponseFunc
	ene         EncodeErrorFunc
	strictSlash bool
	router      *mux.Router
	log         *log.Helper
}

可以看到,Server是对标准库 net/http 中http.Server 的一个封装。

// ServerOption is an HTTP server option.
type ServerOption func(*Server)

可通过 option 设置服务参数,比如例子中的

var opts = []transporthttp.ServerOption{
		transporthttp.Address("0.0.0.0:8000"),
		transporthttp.Timeout(time.Second * 10),
	}

Start 启动一个net/http.Server

// Start start the HTTP server.
func (s *Server) Start(ctx context.Context) error { 
    ...
	s.log.Infof("[HTTP] server listening on: %s", s.lis.Addr().String())
	var err error
	if s.tlsConf != nil {
		err = s.ServeTLS(s.lis, "", "")
	} else {
		err = s.Serve(s.lis)
	}
    ...
} 

启动流程

func (a *App) Run() error {
	...
	for _, srv := range a.opts.servers {
		srv := srv
		eg.Go(func() error {
			<-ctx.Done() // wait for stop signal
			return srv.Stop(ctx)
		})
		wg.Add(1)
		eg.Go(func() error {
			wg.Done()
			return srv.Start(ctx)
		})
	}
	wg.Wait()
    ...
}

kratos 的App 层无需关心底层服务协议的实现, 只需管理好应用配置、服务生命周期、加载顺序即可。

Transport Client

Transport Client 是会 http CLient封装

// Client is an HTTP client.
type Client struct {
	opts     clientOptions
	target   *Target
	r        *resolver
	cc       *http.Client
	insecure bool
}

如client.go 中的一个Http Post

    values := map[string]string{
	}  
	reply := &Foo{}
	err = client.Invoke(context.Background(), "POST", "/", values, reply) 

Invoke封装了post,使用更加简洁。

总结

kratos 的App 层无需关心底层服务协议的实现, 只需管理好应用配置、服务生命周期、加载顺序即可。抽象的服务协议可以专注应用配置、服务生命周期等。层次清晰,职责清楚。

参考