Go微服务 KratosTransporter传输层
本文主要介绍Kratos Transport和源码的分析.
kratos 框架对传输层进行了抽象,用户可以通过实现接口来接入实现,框架默认实现了gRPC和HTTP两种通信协议传输层。
抽象接口
Server Interface
// 服务的启动和停止,用于管理服务生命周期。
type Server interface {
Start(context.Context) error
Stop(context.Context) error
}
Transporter
type Transporter interface {
// 代表实现的通讯协议的种类,如内置的 http grpc,也可以实现其他的类型如 mqtt,websocket
Kind() Kind
// 提供的服务终端地址
Endpoint() string
// 用于标识服务的完整方法路径
// 示例: /helloworld.Greeter/SayHello
Operation() string
// http 的请求头或者 grpc 的元数据
Header() Header
}
Endpointer
type Endpointer interface {
// 用于实现注册到注册中心的终端地址,如果不实现这个方法则不会注册到注册中心
Endpoint() (*url.URL, error)
}
Example For Http
使用kratos-transport 创建一个http server 和 client ,在 client和server之间通讯
server
server的完整代码 server.go 如下:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
transporthttp "github.com/go-kratos/kratos/v2/transport/http"
)
func NewServer() *transporthttp.Server {
var opts = []transporthttp.ServerOption{
transporthttp.Address("0.0.0.0:8000"),
transporthttp.Timeout(time.Second * 10),
}
svr := transporthttp.NewServer(opts...)
svr.Handle("/", &helloHandler{})
return svr
}
func main() {
ctx := context.Background()
srv := NewServer()
err := srv.Start(ctx)
if err != nil {
if err == http.ErrServerClosed {
log.Print("Server closed under request")
} else {
log.Fatal("Server closed unexpected")
}
}
done := make(chan os.Signal)
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-done
if err := srv.Stop(ctx); err != nil {
log.Fatal("Shutdown NewServer:", err)
}
}()
}
type Foo struct {
Bar string
}
type helloHandler struct{}
func (*helloHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
return
}
formData := make(map[string]interface{})
json.NewDecoder(r.Body).Decode(&formData)
fmt.Println(formData)
for k, v := range formData {
fmt.Println("key", k, "value", v)
}
reply := &Foo{
Bar: "NiceToMeetYouToo",
}
replyBytes, err := json.Marshal(reply)
if err != nil {
return
}
fmt.Fprintf(w, string(replyBytes))
}
启动服务
go run main.go
输出
API server listening at: [::]:60870
INFO msg=[HTTP] server listening on: [::]:8000
client
client 的完整代码 client.go 如下:
package client
import (
"context"
"github.com/go-kratos/kratos/v2/transport/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
type Foo struct {
Bar string
}
func TestNewClient(t *testing.T) {
client, err := http.NewClient(
context.Background(),
http.WithEndpoint("0.0.0.0:8000"),
http.WithMiddleware(),
http.WithTimeout(time.Second*10),
)
assert.Nil(t, err)
values := map[string]string{
"foo": "bar",
"hello": "world",
}
assert.Nil(t, err)
reply := &Foo{}
err = client.Invoke(context.Background(), "POST", "/", values, reply)
assert.Nil(t, err)
t.Log("reply", reply)
}
运行测试用例,
server 输出:
key foo value bar
key hello value world
client 输出:
=== RUN TestNewClient
client_test.go:42: reply &{NiceToMeetYouToo}
源码分析
Transport Server
// 服务的启动和停止,用于管理服务生命周期。
type Server interface {
Start(context.Context) error
Stop(context.Context) error
}
Start:启动服务、监听端口、处理请求 Stop:停止服务
如上例 server.go 中,
svr := transporthttp.NewServer(opts...)
这个用创建一个 Server Interface的Http实例
// Server is an HTTP server wrapper.
type Server struct {
*http.Server
lis net.Listener
tlsConf *tls.Config
endpoint *url.URL
err error
network string
address string
timeout time.Duration
filters []FilterFunc
ms []middleware.Middleware
dec DecodeRequestFunc
enc EncodeResponseFunc
ene EncodeErrorFunc
strictSlash bool
router *mux.Router
log *log.Helper
}
可以看到,Server是对标准库 net/http 中http.Server 的一个封装。
// ServerOption is an HTTP server option.
type ServerOption func(*Server)
可通过 option 设置服务参数,比如例子中的
var opts = []transporthttp.ServerOption{
transporthttp.Address("0.0.0.0:8000"),
transporthttp.Timeout(time.Second * 10),
}
Start 启动一个net/http.Server
// Start start the HTTP server.
func (s *Server) Start(ctx context.Context) error {
...
s.log.Infof("[HTTP] server listening on: %s", s.lis.Addr().String())
var err error
if s.tlsConf != nil {
err = s.ServeTLS(s.lis, "", "")
} else {
err = s.Serve(s.lis)
}
...
}
启动流程
func (a *App) Run() error {
...
for _, srv := range a.opts.servers {
srv := srv
eg.Go(func() error {
<-ctx.Done() // wait for stop signal
return srv.Stop(ctx)
})
wg.Add(1)
eg.Go(func() error {
wg.Done()
return srv.Start(ctx)
})
}
wg.Wait()
...
}
kratos 的App 层无需关心底层服务协议的实现, 只需管理好应用配置、服务生命周期、加载顺序即可。
Transport Client
Transport Client 是会 http CLient封装
// Client is an HTTP client.
type Client struct {
opts clientOptions
target *Target
r *resolver
cc *http.Client
insecure bool
}
如client.go 中的一个Http Post
values := map[string]string{
}
reply := &Foo{}
err = client.Invoke(context.Background(), "POST", "/", values, reply)
Invoke封装了post,使用更加简洁。
总结
kratos 的App 层无需关心底层服务协议的实现, 只需管理好应用配置、服务生命周期、加载顺序即可。抽象的服务协议可以专注应用配置、服务生命周期等。层次清晰,职责清楚。