背景: 公司项目中,需要暴露对内网的Websocket服务。

Websocket 它是一种基于 TCP 的一种独立实现,APISIX 可以对 TCP/UDP 协议进行代理并实现动态负载均衡, 在 nginx 世界,称 TCP/UDP 代理为 stream 代理,在 APISIX 这里我们也遵循了这个声明,下文统称Stream。

SNI(Server Name Indication)是用来改善 SSL 和 TLS 的一项特性,它允许客户端在服务器端向其发送证书之前向服务器端发送请求的域名,服务器端根据客户端请求的域名选择合适的 SSL 证书发送给客户端。

本文主要介绍ApiSix如何基于SNI代理TCP/UDP。

启用Stream 代理

在 conf/config.yaml 配置文件设置 stream_proxy 选项, 指定一组需要进行动态代理的 IP 地址。默认情况不开启 stream 代理。

apisix:
  stream_proxy: # TCP/UDP proxy
    tcp: # TCP proxy address list
      - 9100
      - "127.0.0.1:9101"
    udp: # UDP proxy address list
      - 9200
      - "127.0.0.1:9211"

如果 apisix.enable_admin 为 true,上面的配置会同时启用 HTTP 和 stream 代理。如果你设置 enable_admin 为 false,且需要同时启用 HTTP 和 stream 代理,设置 only 为 false:

apisix:
  enable_admin: false
  stream_proxy: # TCP/UDP proxy
    only: false
    tcp: # TCP proxy address list
      - 9100

启用 TLS:

首先,我们需要给对应的 TCP 地址启用 TLS

  stream_proxy: # TCP/UDP proxy
    only: false
    tcp: # TCP proxy address list
      - 9100
      - addr: 9333
        tls: true

配置路由

当连接为 TLS over TCP 时,我们可以通过 SNI 来匹配路由,比如:

curl http://127.0.0.1:9180/apisix/admin/stream_routes/1 -H 'X-API-KEY: Your-Api-Key' -X PUT -d '
  {
      "sni": "mysite.com",
      "server_port": 9333,
      "upstream": {
          "scheme": "tls",
          "nodes": {
              "192.168.1.33:8080": 1
          },
          "type": "roundrobin"
      }
  }'

查看是否创建成功:

curl http://127.0.0.1:9180/apisix/admin/stream_routes/1 -H 'X-API-KEY: Your-Api-Key'

如创建错误,可以删除

curl http://127.0.0.1:9180/apisix/admin/stream_routes/1 -H 'X-API-KEY: Your-Api-Key' -X DELETE 

查看更多stream路由:

curl http://127.0.0.1:9180//apisix/admin/stream_routes -H 'X-API-KEY: Your-Api-Key' 

更多Api参考: https://apisix.apache.org/zh/docs/apisix/2.12/admin-api/#stream-route

上传SNI证书

上传站点(mysite.com)的证书到ApiSix

测试

curl -v https://mysite.com:9333/
*   Trying x.x.y.y...
* TCP_NODELAY set
* Connected to mysite.com:19333 (x.x.y.y) port 9333 (#0)
* ALPN, offering h2
* ALPN, offering http/1.1
* successfully set certificate verify locations:
*   CAfile: /etc/ssl/cert.pem
  CApath: none
* TLSv1.2 (OUT), TLS handshake, Client hello (1):
* TLSv1.2 (IN), TLS handshake, Server hello (2):
* TLSv1.2 (IN), TLS handshake, Certificate (11):
* TLSv1.2 (IN), TLS handshake, Server key exchange (12):
* TLSv1.2 (IN), TLS handshake, Server finished (14):
* TLSv1.2 (OUT), TLS handshake, Client key exchange (16):
* TLSv1.2 (OUT), TLS change cipher, Change cipher spec (1):
* TLSv1.2 (OUT), TLS handshake, Finished (20):
* TLSv1.2 (IN), TLS change cipher, Change cipher spec (1):
* TLSv1.2 (IN), TLS handshake, Finished (20):
* SSL connection using TLSv1.2 / ECDHE-RSA-CHACHA20-POLY1305
* ALPN, server did not agree to a protocol
* Server certificate:
*  subject: CN=*.mysite.com
*  start date: Mar 29 00:00:00 2023 GMT
*  expire date: Jun 27 23:59:59 2023 GMT
*  subjectAltName: host "mysite.com" matched cert's "*.mysite.com"
*  issuer: C=AT; O=ZeroSSL; CN=ZeroSSL RSA Domain Secure Site CA
*  SSL certificate verify ok.
> GET / HTTP/1.1
> Host: mysite.com:9333
> User-Agent: curl/7.64.1
> Accept: */*
>
< HTTP/1.1 400 Bad Request
< Content-Type: text/plain; charset=utf-8
< Sec-Websocket-Version: 13
< X-Content-Type-Options: nosniff
< Date: Sun, 23 Apr 2023 05:50:07 GMT
< Content-Length: 12
<
Bad Request
* Connection #0 to host mysite.com left intact
* Closing connection 0

Success。

过程中遇见的问题

  1. 连接不上
 curl -v http://mysite.com
*   Trying x.x.y.y...
* TCP_NODELAY set
* Connected to mysite.com (x.x.y.y.) port 19333
> GET / HTTP/1.1
> Host: mysite.com:19333
> User-Agent: curl/7.64.1
> Accept: */*
>
* Recv failure: Connection reset by peer
* Closing connection 0
curl: (56) Recv failure: Connection reset by peer

需要排查路由器端口是否正常开放,ApiSix的端口是否正常,经排查原因是:配置路由时"server_port"缺失

{
      "sni": "mysite.com",
      "upstream": {
          "scheme": "tls",
          "nodes": {
              "192.168.1.33:8080": 1
          },
          "type": "roundrobin"
      }
  }
  1. handshark失败,证书错误
* TLSv1.2 (OUT), TLS handshake, Client hello (1):
* LibreSSL SSL_connect: SSL_ERROR_SYSCALL in connection to mysite.com:19333
* Closing connection 0
curl: (35) LibreSSL SSL_connect: SSL_ERROR_SYSCALL in connection to mysite.com:19333

排位原因是:这里作者使用的ApiSix版本是2.12,缺失前缀 【addr】

  stream_proxy: # TCP/UDP proxy
    only: false
    tcp: # TCP proxy address list
      - 9100
      - 9333
        tls: true

参考

上篇我们介绍了RedisStream实现消息队列和发布订阅模式 ,今天我们将介绍Go语言的实现方法。

消息队列

接上篇的例子,我们通过go-redis库的接口XAdd、XReadGroup、XGroupCreateMkStream来实现,详细如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
)
func main() {
	var (
		ctx      = context.Background()
		consumer = "consumer1"
		stream   = "mystream"
		group    = "group1"
		start    = ">"
		count    = 10
	)

	// 创建Redis客户端
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})
	// 创建消费者组
	err := client.XGroupCreateMkStream(ctx, stream, group, "0-0").Err()
	if err != nil {
		panic(err)
	}
	//添加消息
	_, err = client.XAdd(ctx, &redis.XAddArgs{
		Stream: stream,
		ID:     "*",
		Values: map[string]interface{}{
			"name": "foo",
		},
	}).Result()
	if err != nil {
		panic(err)
	}
	//读取消息
	messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    group,
		Consumer: consumer,
		Streams:  []string{stream, start},
		Count:    int64(count),
		Block:    0,
		NoAck:    false,
	}).Result()
	if err != nil {
		panic(err)
	} 
	for _, message := range messages[0].Messages {
		fmt.Printf("Received message: %v\n", message.Values)
	}
}

上例实现了一个简单的消息队列,可以实现消息的发送和接收,并保证每条消息只被处理一次 。

阻塞超时

通过修改Block(单位为ms)参数来实现阻塞,详细修改如下

for {
		fmt.Println("Start Receiving", time.Now().String())
		//读取消息
		messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, start},
			Count:    int64(count),
			Block:    2000,
			NoAck:    false,
		}).Result()
		if err != nil {
			panic(err)
		}
		fmt.Println("Received Length=", len(messages), time.Now().String()) 
    ...
	}

输出如下:

Start Receiving 2023-04-08 21:39:23.9669498 +0800 CST m=+0.529310801
Received Length= 1 2023-04-08 21:39:24.0419963 +0800 CST m=+0.604357301
Received message: map[name:foo]
Start Receiving 2023-04-08 21:39:24.0419963 +0800 CST m=+0.604357301
panic: read tcp 127.0.0.1:63777->127.0.0.1:6379: i/o timeout

可见,XReadGroup一直阻塞等待新的消息,直到 Redis 连接超时或者命令被中断。 ps: 如若不需要阻塞等待可修改Block为<0 的值实现。

发布订阅模式

接着,我们实现上篇提到的发布订阅模式,XGroupCreateMkStream创建另外一个订阅组group2,两个协程XReadGroup接收消息,往stream中XAdd消息,两个订阅者将都收到消息,详细如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
	"time"
)
func main() {
	var (
		ctx       = context.Background()
		consumer  = "consumer1"
		consumer2 = "consumer2"
		stream    = "mystream"
		group     = "group1"
		group2    = "group2"
		start     = ">"
		count     = 10
	)
	// 创建Redis客户端
	client := redis.NewClient(&redis.Options{
		Addr:        "localhost:6379",
		Password:    "",
		DB:          0,
		ReadTimeout: time.Second * 10,
	})
	// 创建消费者组
	err := client.XGroupCreateMkStream(ctx, stream, group, "0-0").Err()
	if err != nil {
		//panic(err)
	}
	// 创建消费者组2
	err = client.XGroupCreateMkStream(ctx, stream, group2, "0-0").Err()
	if err != nil {
		//panic(err)
	}
	//添加消息
	_, err = client.XAdd(ctx, &redis.XAddArgs{
		Stream: stream,
		ID:     "*",
		Values: map[string]interface{}{
			"name": "foo",
		},
	}).Result()
	if err != nil {
		panic(err)
	}
	go func() {
		for {
			fmt.Println("Start Receiving", time.Now().String())
			//读取消息
			messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
				Group:    group,
				Consumer: consumer,
				Streams:  []string{stream, start},
				Count:    int64(count),
				Block:    2000,
				NoAck:    false,
			}).Result()
			if err != nil {
				panic(err)
			}
			fmt.Println("Received Length=", len(messages), time.Now().String())
			for _, message := range messages[0].Messages {
				fmt.Printf("Received message: %v\n", message.Values)
			}

		}
	}()
	//读取消息2
	go func() {
		for {
			fmt.Println("2 Start Receiving ", time.Now().String())
			messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
				Group:    group2,
				Consumer: consumer2,
				Streams:  []string{stream, start},
				Count:    int64(count),
				Block:    2000,
				NoAck:    false,
			}).Result()
			if err != nil {
				panic(err)
			}
			fmt.Println("2 Received Length=", len(messages), time.Now().String())
			for _, message := range messages[0].Messages {
				fmt.Printf("2Received message: %v\n", message.Values)
			}
		}
	}()
	for {
	}
}

输出如下:

Start Receiving 2023-04-08 22:19:41.9981051 +0800 CST m=+0.347161101
2 Start Receiving  2023-04-08 22:19:41.9981051 +0800 CST m=+0.347161101
Received Length= 1 2023-04-08 22:19:42.0571709 +0800 CST m=+0.406226901
Received message: map[name:foo]
Start Receiving 2023-04-08 22:19:42.0571709 +0800 CST m=+0.406226901
2 Received Length= 1 2023-04-08 22:19:42.3628412 +0800 CST m=+0.711897201
2Received message: map[name:foo]

参考

Redis Stream是在 Redis 5.0中引入的数据类型,可以实现高性能、高可靠性的消息队列。本文主要介绍 Redis Stream 的概念、使用方法和一些适用场景如发布订阅模式、消息队列。 简介 Redis Stream是Redis为消息队列设计的数据类型,它将消息队列的数据结构抽象为一个有序的消息流(stream),每个消息都有一个唯一的ID和一个关联的键值对(key-value pairs)它支持以下功能: 添加消息-将消息添加到队列的末尾 读取消息-从队列的开头读取消息 删除消息-删除指定id的消息 创建消费者组-创建多个消费者组,每个消费者组都可以独立地读取消息。同时可实现负载均衡和消息分发。 确认消息- XACK 命令来确认已经成功处理的消息,避免重复消费 XADD-添加消息 语法: XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...] MAXLEN:可选参数,用于限制消息流的长度。如果指定了 MAXLEN,那么当消息流的长度超过指定的长度时,最早的消息将被自动删除 ID:可选参数,用于指定消息的 ID。如果不指定,Redis 会自动生成一个唯一的 ID。 field value [field value … :消息的键值对,可以是任何字符串。 例如: XADD mystream * name Foo age 10 此命令将向名为 mystream 的消息流中添加一条消息,输出如下: >"1680355760868-0" XREAD-读取消息 语法:

在 Postgres 中基于 MVCC(多版本并发控制)技术 ,使用一种称为“版本链”的技术来维护对数据库中行的并发访问, 来实现事务隔离级别的。

MVCC-多版本并发控制

在 Postgres 中,每当一行被修改时,Postgres 就会创建一个新版本的该行,并将其添加到版本链的末尾。版本链包含了所有当前和以前的行版本,这样可以避免并发读写时的数据竞争和数据不一致问题。

MVCC避免了传统的数据库系统的锁定方法,将锁争夺最小化来允许多用户环境中的合理性能。

常见并发问题

  1. 脏读: 一个事务读取了另一个并行未提交事务写入的数据
  2. 不可重复读 一个事务重新读取之前读取过的数据,发现该数据已经被另一个事务(在初始读之后提交)修改。
  3. 幻读 一个事务重新执行一个返回符合一个搜索条件的行集合的查询, 发现满足条件的行集合因为另一个最近提交的事务而发生了改变。
  4. 序列化异常 成功提交一组事务的结果与这些事务所有可能的串行执行结果都不一致。

事务隔离级(解决方案)

事务隔离级别指的是多个并发事务之间的可见性和可操作性,SQL标准定义了四种隔离级别,分别是:

  1. Read Uncommitted(读未提交) 这是最低的事务隔离级别,它允许一个事务读取其他事务未提交的数据。这可能会导致脏读、不可重复读和幻读问题。
  2. Read Committed(读提交) 在该隔离级别下,事务只能看到已经提交的数据。它解决了脏读问题,但仍然存在不可重复读和幻读问题。
  3. Repeatable Read(可重复读) 在该隔离级别下,事务在执行期间看到的所有数据都是一致的。它解决了不可重复读问题,但仍然存在幻读问题。
  4. Serializable(可串行化) 在该隔离级别下,事务看到的所有数据都是一致的,并且完全避免了脏读、不可重复读和幻读问题。这是最高的事务隔离级别,但也是最慢的,因为它会导致大量的锁竞争。

在PostgreSQL中,你可以请求四种标准事务隔离级别中的任意一种,但是内部只实现了三种不同的隔离级别,即 PostgreSQL 的读未提交模式的行为和读已提交相同。

golang lib/pq设置隔离等级

在golang中,使用lib/pq包,通过执行“SET TRANSACTION ISOLATION LEVEL …”来设置隔离等级。以下测试脏读的例子:

package main

import (
	"database/sql"
	"fmt"
	_ "github.com/lib/pq"
	"time"
)

func main() {
	// 连接数据库
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// 设置隔离级别为读未提交
	_, err = db.Exec("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
	//_, err = db.Exec("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
	if err != nil {
		panic(err)
	}
	// 开始第一个事务,修改用户 "Bob" 的年龄
	tx1, err := db.Begin()
	if err != nil {
		panic(err)
	}
	go func() {
		_, err := tx1.Exec("UPDATE users SET age = 6 WHERE name = 'Bob'")
		if err != nil {
			panic(err)
		}
		rows, err := tx1.Query("SELECT age FROM users WHERE name = 'Bob'")
		if err != nil {
			panic(err)
		}
		var age int
		if rows.Next() {
			err = rows.Scan(&age)
			if err != nil {
				panic(err)
			}
		}
		fmt.Printf("tx1 Bob's age is %d\n", age)
	}()

	// 暂停 1 秒钟,以便让第二个事务读取到未提交的数据
	time.Sleep(time.Second)
	// 开始第二个事务,读取用户 "Bob" 的年龄
	tx2, err := db.Begin()
	if err != nil {
		panic(err)
	}
	rows, err := tx2.Query("SELECT age FROM users WHERE name = 'Bob'")
	if err != nil {
		panic(err)
	}
	var age int
	if rows.Next() {
		err = rows.Scan(&age)
		if err != nil {
			panic(err)
		}
	}
	fmt.Printf("tx2 Bob's age is %d\n", age)
	// 提交第一个事务
	err = tx1.Commit()
	if err != nil {
		panic(err)
	}
	// 提交第二个事务
	err = tx2.Commit()
	if err != nil {
		panic(err)
	}
}

执行输出如下:

tx1 Bob's age is 6
tx2 Bob's age is 5

上例中,分别开启两个隔离级别为“读未提交”事务tx1、tx2,其中tx1对一行数据(Bob)的字段(Age)进行了修改,接着两个事务分别读取,即使在tx1提交之前,tx2也不会读取到未提交的数据,有效避免脏读等并发问题。

小结

PostgreSQL 的事务隔离级别和 MVCC 技术为开发人员提供了强大的并发性能和数据一致性保证。同时MVCC并发控制模型,降低了锁竞争和死锁的风险,允许并发事务访问相同的数据而不会相互干扰。对查询(读)数据的锁请求与写数据的锁请求不冲突,所以读不会阻塞写,而写也从不阻塞读。但是,在选择事务隔离级别时,需要考虑不同事务隔离级别的性能以及应用程序的需求和性能特征,并根据实际情况进行权衡,充分利用 MVCC 技术的优点。

参考

在上一篇 Go网络抓包、引流工具GoReplay 介绍了Gor的一些基础和实际开发中对 HTTP 流量进行复制,以用于测试、性能监测等用途。而Apache APISIX 是 Apache 软件基金会下的云原生 API 网关,它兼具动态、实时、高性能等特点,提供了负载均衡、动态上游、灰度发布(金丝雀发布)、服务熔断、身份认证、可观测性等丰富的流量管理功能。结合使用 Gor 和 Apisix 可以轻松地实现流量复制,并将其应用于各种场景中。

本篇文章将介绍如何使用 Gor 和 Apisix 实现流量复制,具体包括以下几个部分:

  1. 测试环境介绍
  2. 安装和配置 Gor
  3. 安装和配置 Apisix
  4. 配置 Apisix 的 proxy-mirror 插件
  5. 测试流量复制

测试环境介绍

  1. OS: 两台 Ubuntu 虚拟机(Gateway、Node)

    a. Gateway 安装 ApiSix

    b. Node 安装节点服务

  2. Docker & DockerCompose

安装和配置 Gor

首先,需要在Node上安装 Gor 工具。可以从 Gor 的官网下载相应的二进制文件,然后将其解压到服务器上即可。

# tar xvf gor_1.3.3_x64.tar.gz
# cp gor /usr/local/bin/gor
# sudo chmod +x /usr/local/bin/gor

安装完成后,可以使用以下命令启动 Gor:

# gor version

输出

# Version:1.3.0 

安装和配置 Apisix

ApiSix: 这里使用 DockerCompose安装

git clone https://github.com/apache/apisix-docker.git

cd apisix-docker/example

docker-compose up -d

这个命令会启动 Apisix 并监听在本地的 80 端口。

配置 Apisix 的 proxy-mirror 插件

Apisix 的 proxy-mirror 插件可以用于流量复制。 增加一个路由hello-proxy-mirror, Upstream为Node主机的服务A(端口9696),流量复制到Node主机的服务B(端口9797),其中

  1. host 为 指定镜像服务地址
  2. sample_ratio 镜像请求采样率,本例中为1,表示全量复制
{
  "uri": "/hello",
  "name": "hello-proxy-mirror",
  "methods": [
    "GET",
    "POST",
    "PUT",
    "DELETE",
    "PATCH",
    "HEAD",
    "OPTIONS",
    "CONNECT",
    "TRACE"
  ],
  "plugins": {
    "proxy-mirror": {
      "disable": false,
      "host": "http://192.168.1.101:9797",
      "sample_ratio": 1
    }
  },
  "upstream": {
    "nodes": [
      {
        "host": "192.168.1.101",
        "port": 9696,
        "weight": 1
      }
    ],
    "timeout": {
      "connect": 6,
      "send": 6,
      "read": 6
    },
    "type": "roundrobin",
    "scheme": "http",
    "pass_host": "pass",
    "keepalive_pool": {
      "idle_timeout": 60,
      "requests": 1000,
      "size": 320
    }
  },
  "status": 1
}

这个配置会开启 proxy-mirror 插件,并将其配置为将流量发送到本地的 8080 端口,即 Gor 监听的地址。

测试流量复制

接着可开始测试流量复制。

启动 HTTP 服务A监听9696端口:

python3 -m http.server 9696

正常启动输出

Serving HTTP on 0.0.0.0 port 9696 (http://0.0.0.0:9696/) ...

启动 镜像服务B监听9797端口:

python3 -m http.server 9797

正常启动输出

Serving HTTP on 0.0.0.0 port 9797 (http://0.0.0.0:9797/) ...

启动 Gor 监听在本地的 9797 端口

sudo gor --input-raw :9797 --output-stdout  

此命令将监听镜像服务B端口9797, 将 HTTP 流量输出到控制台

接着,curl测试

curl http://192.168.1.100:80/hello -i

输出

HTTP/1.1 404 File not found
Content-Type: text/html;charset=utf-8
Content-Length: 469
Connection: keep-alive
Date: Sun, 19 Mar 2023 06:55:42 GMT
Server: APISIX/3.2.0

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN"
        "http://www.w3.org/TR/html4/strict.dtd">
<html>
    <head>
        <meta http-equiv="Content-Type" content="text/html;charset=utf-8">
        <title>Error response</title>
    </head>
    <body>
        <h1>Error response</h1>
        <p>Error code: 404</p>
        <p>Message: File not found.</p>
        <p>Error code explanation: HTTPStatus.NOT_FOUND - Nothing matches the given URI.</p>
    </body>
</html>

测试会发送一个 HTTP 请求到Apisix开放的Http端口80,可以到达下游服务(Node主机的ServerA 9696端口),输出如下:

192.168.1.100 - - [19/Mar/2023 06:47:44] code 404, message File not found
192.168.1.100 - - [19/Mar/2023 06:47:44] "GET /hello HTTP/1.1" 404 -

而且,ApiSix Proxy Mirror复制Http请求到端口 9797的镜像服务, 输出如下:

192.168.1.100 - - [19/Mar/2023 06:49:07] code 404, message File not found
192.168.1.100 - - [19/Mar/2023 06:49:07] "GET /hello HTTP/1.1" 404 -

可见和目标服务的的完全一致。 接着,观察Gor是否正常记录http请求,正常输出如下:

1 af0c2645c0a801c7399d6efb 1679212462567906772 0
GET /hello HTTP/1.1
Host: 192.168.1.100
Connection: close
User-Agent: curl/7.64.1
Accept: */*

说明Gor已经成功从镜像服务捕获流量。 再结合使用Gor的流量回放功能可以应用于例如测试、性能监测等场景。 以上。

参考

在 Kubernetes 中,Kubelet 是在每个节点上运行的重要组件之一,它负责管理容器的生命周期。而 CRI(Container Runtime Interface)则是 Kubelet 用于与容器运行时进行通信的接口(如下图)。

CRI 采用了 ProtoBuffer 和 gPRC,规定 kubelet 该如何调用容器运行时去管理容器和镜像,Kubernetes 通过CRI可支持多种类型的OCI容器运行时,例如 docker、contained、CRI-O、runC、fraki和Kata Containers 等)。

为了方便用户进行容器运行时的调试工作,社区提供了 crictl 工具,用于与 CRI 接口进行交互,本文简要介绍如何使用 crictl 对 Kubernetes节点进行调试 。

kubelet-layout:

安装

你可以从 cri-tools 发布页面 下载一个压缩的 crictl 归档文件,用于几种不同的架构。 下载与你的 kubernetes 版本相对应的版本。 提取它并将其移动到系统路径上的某个位置,例如 /usr/local/bin/。

  • 查看版本,验证安装
crictl --version

输出例如如下,说明安装成功:

crictl version v1.23.0 

查看或编辑配置

要查看或编辑当前配置,请查看或编辑 /etc/crictl.yaml 的内容。

cat /etc/crictl.yaml
image-endpoint: unix:///var/run/image-cri-shim.sock
runtime-endpoint: unix:///run/containerd/containerd.sock

调试节点

  • 列出运行中的容器:
crictl ps

例如我们列出k8s集群的所有容器,例如输出:

CONTAINER           IMAGE               CREATED             STATE               NAME                      ATTEMPT             POD ID
508e30da66ce7       7a71aca7b60fc       3 days ago          Running             calico-node               0                   e0ec650992997
9daa288a68426       f822f80398b9a       3 days ago          Running             calico-typha              0                   f5c4bd6471941
300d948e75019       f6bc1b780606f       3 days ago          Running             kube-controller-manager   1                   d5d681744a377
1cfdc1a6726ae       0198979b7707e       3 days ago          Running             kube-scheduler            1                   eb6ff07ees98c
3699c312c56f9       9e6a540eeeb62       3 days ago          Running             kube-proxy                0                   e8707140d12941
4159d7ec37b29       5bc0062e9555c       3 days ago          Running             kube-apiserver            0                   22d043569737f
8f56a047e8627      25f8c7f3da61c       3 days ago          Restart             etcd                      0                   458e540c798c8

本例中,etcd容器一直启动,可以使用以下命令获取容器的日志:

crictl logs container-id

如此,通过日志帮助定位问题。

更多命令

  • 列出所有的pods
crictl pods
  • 创建容器
crictl run --runtime=remote \
  docker.io/library/nginx:latest \
  nginx-container

ps:使用远程容器CRI来使用最新的 nginx 镜像启动nginx-container的容器。

  • 删除容器:
crictl rm nginx-container
  • 列出所有镜像:
crictl images
  • 帮助
 crictl -h 
NAME:
   crictl - client for CRI

USAGE:
   crictl [global options] command [command options] [arguments...]

VERSION:
   v1.23.0

COMMANDS:
   attach              Attach to a running container
   create              Create a new container
   exec                Run a command in a running container
   version             Display runtime version information
   images, image, img  List images
   inspect             Display the status of one or more containers
   inspecti            Return the status of one or more images
   imagefsinfo         Return image filesystem info
   inspectp            Display the status of one or more pods
   logs                Fetch the logs of a container
   port-forward        Forward local port to a pod
   ps                  List containers
   pull                Pull an image from a registry
   run                 Run a new container inside a sandbox
   runp                Run a new pod
   rm                  Remove one or more containers
   rmi                 Remove one or more images
   rmp                 Remove one or more pods
   pods                List pods
   start               Start one or more created containers
   info                Display information of the container runtime
   stop                Stop one or more running containers
   stopp               Stop one or more running pods
   update              Update one or more running containers
   config              Get and set crictl client configuration options
   stats               List container(s) resource usage statistics
   completion          Output shell completion code
   help, h             Shows a list of commands or help for one command

GLOBAL OPTIONS:
   --config value, -c value            Location of the client config file. If not specified and the default does not exist, the program's directory is searched as well (default: "/etc/crictl.yaml") [$CRI_CONFIG_FILE]
   --debug, -D                         Enable debug mode (default: false)
   --image-endpoint value, -i value    Endpoint of CRI image manager service (default: uses 'runtime-endpoint' setting) [$IMAGE_SERVICE_ENDPOINT]
   --runtime-endpoint value, -r value  Endpoint of CRI container runtime service (default: uses in order the first successful one of [unix:///var/run/dockershim.sock unix:///run/containerd/containerd.sock unix:///run/crio/crio.sock unix:///var/run/cri-dockerd.sock]). Default is now deprecated and the endpoint should be set instead. [$CONTAINER_RUNTIME_ENDPOINT]
   --timeout value, -t value           Timeout of connecting to the server in seconds (e.g. 2s, 20s.). 0 or less is set to default (default: 2s)
   --help, -h                          show help (default: false)
   --version, -v                       print the version (default: false)

以上。

参考

github.com/robfig/cron/v3 是一个功能强大且易于使用的定时任务管理库。本文进一步介绍robfig/cron在定时任务一些主要功能、如何使用它以及一些实际应用场景的例子。主要包括

  1. 添加任务方法AddJob
  2. 指定执行时间
  3. 动态添加和删除任务
  4. Option选项
  5. JobWrapper与DefaultWrapper

AddJob添加任务

Cron实例可以通过调用AddJob() 方法用于添加一个实现了 Job 接口的对象作为任务,Example:

package main

import (
	"fmt"
	"github.com/robfig/cron/v3"
	"time"
)

type GreetingJob struct {
	Name string
}

func (g GreetingJob) Run() {
	fmt.Println("Hi: ", g.Name, "now:", time.Now().String())
}

func main() {
	c := cron.New()
	entityID, err := c.AddJob("@every 2s", GreetingJob{"Greeter"})
	if err != nil {
		fmt.Errorf("error : %v", err)
		return
	}
	fmt.Println("entityID:", entityID)
	c.Start()
	defer c.Stop()
	select {}
}

//output:
//entityID: 1
//Hi:  Greeter now: 2023-03-04 17:50:07.0169185 +0800 CST m=+1.716253301
//Hi:  Greeter now: 2023-03-04 17:50:09.0068064 +0800 CST m=+3.706141201

此例中,GreetingJob实现了cron.Job 接口,cron实例调用AddJob,加入一个每2秒执行的任务,务并传递参数。

指定执行时间

可以通过修改 cron 表达式来指定任务的执行时间。Example:

c := cron.New(cron.WithSeconds()) 
entityID, err := c.AddFunc("0 05 18 * * *", func() {
	fmt.Println("hi now:", time.Now().String())
})

此例中,将在每天的傍晚18点5分执行指定的函数。

动态添加和删除任务

cron/v3 允许开发人员在运行时动态添加和删除任务。Example:

package main

import (
	"fmt"
	"github.com/robfig/cron/v3"
	"time"
)

func main() {
	c := cron.New(cron.WithSeconds())
	c.Start()
	defer c.Stop()
	count := 0
     // 添加第一个任务
	entityID1, err := c.AddFunc("*/2 * * * * *", func() {
		count++
		fmt.Println("Job1: ", count, "now:", time.Now().String())
	})
	must(err)
	fmt.Println("entityID1:", entityID1)
     // 等待 6 秒钟,让第一个任务执行3次
	time.Sleep(time.Second * 6)
    // 添加第二个任务
	entityID2, err := c.AddFunc("*/2 * * * * *", func() {
		count++
		fmt.Println("Job2: ", count, "now:", time.Now().String())
	})
	must(err)
     // 等待 10秒钟,让两个任务交替执行几次
	time.Sleep(time.Second * 10)
     // 删除第2个任务
	c.Remove(entityID2)
     // 等待 10秒钟,发现只有任务1在执行了
	time.Sleep(time.Second * 10)
     // 删除第2个任务
	c.Remove(cron.EntryID(1))
	fmt.Println("entityID2", entityID2)
    //发现只有没有任务在执行了 
	select {}
}

func must(err error) {
	if err != nil {
		panic(any(err))
	}
}

Option选项

Option 是一种用于配置 Cron 实例的结构体类型。Option 类型有多个可选字段,可用于配置定时任务的行为,上例中有使用到的WithSeconds。

以下是 Option 类型的一些字段及其说明:

  • WithSeconds():在 cron 表达式中包含秒(0-59),默认为不包含秒。
  • WithLocation():设置时区,可以使用标准时区名称或时区偏移量。
  • WithChain():将多个函数连接成单个函数。
  • WithParser():指定 cron 表达式的解析器,默认为 StandardParser。
  • WithLogger():指定日志记录器,默认为 DefaultLogger。

下面 WithLogger Option Cron Example:

package main

import (
	"fmt"
	"github.com/robfig/cron/v3"
	"log"
	"os"
	"time"
)

func main() {
	c := cron.New(
		cron.WithLogger(
			cron.VerbosePrintfLogger(log.New(os.Stdout, "cron: ", log.LstdFlags))))
	c.AddFunc("@every 1s", func() {
		fmt.Println("hello world")
	})
	c.Start()
	defer c.Stop()

	time.Sleep(3 * time.Second)
}
//output:
//cron: 2023/03/04 21:31:17 start
//cron: 2023/03/04 21:31:17 schedule, now=2023-03-04T21:31:17+08:00, entry=1, next=2023-03-04T21:31:18+08:00
//cron: 2023/03/04 21:31:18 wake, now=2023-03-04T21:31:18+08:00
//cron: 2023/03/04 21:31:18 run, now=2023-03-04T21:31:18+08:00, entry=1, next=2023-03-04T21:31:19+08:00
//hello world

上例中,调用cron.VerbosPrintfLogger()包装log.Logger,这个logger会详细记录cron内部的调度过程

JobWrapper与DefaultWrapper

//NewChain返回一个由给定JobWrappers组成的Chain,类似中间件。

type JobWrapper func(Job) Job

type Chain struct {
	wrappers []JobWrapper
}

func NewChain(c ...JobWrapper) Chain {
	return Chain{c}
}

cron内置了 3 个用得比较多的JobWrapper

  • Recover 捕获内部Job产生的 panic
  • DelayIfStillRunning 序列化作业,延迟后续的运行直到前一个是完整的。
  • SkipIfStillRunning 跳过Job的调用,如果之前的调用是仍在运行。 以Reover为例:
package main

import (
	"fmt"
	"github.com/robfig/cron/v3"
)

func main() {
	c := cron.New(
		cron.WithSeconds(),
		cron.WithChain(
			cron.Recover(cron.DefaultLogger),
		),
	)

	c.AddFunc("@every 2s", func() {
		fmt.Println("Start...")
		panic(any("ohno...."))
		fmt.Println("End...")
	})

	c.Start()
	defer c.Stop()

	select {}
}

output:

Start...
cron: 2023/03/04 21:59:32 panic, error=ohno...., stack=...
goroutine 7 [running]:
github.com/robfig/cron/v3.Recover.func1.1.1()
	E:/gopath/pkg/mod/github.com/robfig/cron/v3@v3.0.1/chain.go:45 +0xa5
panic({0x7370c0, 0x75b930})
	...

在上例中,使用 cron.Recover() 方法来捕获任务执行过程中的 panic,并记录日志。如果不进行 panic 捕获的话,程序将会因为 panic 而退出。需要注意的是,当使用 Recover() 方法时,不应该让任务函数返回一个 error,否则它将不会被正确地捕获。如果任务函数可能会返回 error,建议使用 Try() 方法进行捕获。

以上。

参考

本文主要介绍Docker、Go、PostgreSQL如何修改它们的时区。

首先需要知道一些基础概念:

  1. Unix 时间戳 -是从1970年1月1日(UTC/GMT的午夜)开始所经过的秒数,不考虑闰秒。
  2. UTC –协调世界时,又称世界统一时间、世界标准时间、国际协调时间。
  3. CST–可视为中国、古巴的标准时间或美国、澳大利亚的中部时间。北京时间,也就是东八区时间。

Docker

Docker 作为部署和运行应用程序的环境,默认使用 UTC 作为其容器的时区,但我们可以通过设置环境变量来修改时区。

修改的方法

  1. 在 Dockerfile 中添加以下行:
ENV TZ=Asia/Shanghai
  1. 在 Kubernetes 中的 Pod 配置文件中,添加 env 字段,设置环境变量。Example:
spec:
  containers:
  - name: my-container
    image: my-image
    env:
      - name: TZ
        value: Asia/Shanghai
  1. 验证在容器中使用 env 命令查看环境变量,例如
env

输出会有key为TZ,value为Asia/Shanghai表示成功

TZ=Asia/Shanghai

Go

在 Go 中修改时区需要使用标准库中的 time 包。我们可以通过FixedZone来修改时区。 Example:

package main

import (
	"fmt"
	"time"
)

func main() {
	var utcZone = time.FixedZone("UTC", 0*3600) // UTC
	time.Local = utcZone
	utcNow := time.Now()
	utcDate := time.Date(utcNow.Year(), utcNow.Month(), utcNow.Day(), 0, 0, 0, 0, utcNow.Location())
	fmt.Printf("UTC time: %s\n", utcDate.String())
	fmt.Printf("UTC timestamp: %d\n", utcDate.Unix())

	var cstZone = time.FixedZone("CST", 8*3600) // 东八
	time.Local = cstZone
	cstNow := time.Now()
	cstDate := time.Date(cstNow.Year(), cstNow.Month(), cstNow.Day(), 0, 0, 0, 0, cstNow.Location())
	fmt.Printf("CST time: %s\n", cstDate.String())
	fmt.Printf("CST timestamp: %d\n", cstDate.Unix())
}
//output
//UTC time: 2023-02-25 00:00:00 +0000 UTC
//UTC timestamp: 1677283200
//CST time: 2023-02-25 00:00:00 +0800 CST
//CST timestamp: 1677254400

在上面的代码中,我们使用time.FixedZone分别设置UTC、上海时区,并获取当天零点的时间戳 _date。

PostgreSQL

在PostgreSQL系统内部,所有日期和时间都用全球统一时间UTC格式存储, 时间在发给客户前端前由数据库服务器根据TimeZone 配置参数声明的时区转换成本地时间

在 PostgreSQL 中,我们可以通过修改 postgresql.conf 文件来修改时区。 以下是如何在 Docker运行 PostgreSQL 中修改时区的步骤:

  1. 拷贝dockers中的 postgresql.conf到宿主主机
sudo docker cp [your_docker_contariner_id]:/var/lib/postgresql/data/postgresql.conf /[your_work_space]/
  1. 修改配置
sudo vi /[your_work_space]/postgresql.conf

查找替换timezone为上海时区

timezone = 'Asia/Shanghai'

可以通过sql查找支持的时区:

select * from pg_timezone_names;
  1. 保存并覆盖dockers中配置
sudo docker cp /[your_work_space]/postgresql.conf [your_docker_contariner_id]:/var/lib/postgresql/data/
  1. 重新容器
sudo docker restart [your_docker_contariner_id

现在,你的 PostgreSQL 数据库就使用了正确的时区。

  1. 检查是否设置成功 通过sql获取设置:
select * from pg_db_role_setting;

也可以查看数据库中表字段格式为TimestampTZ,Example 修改前

2023-02-25 00:00:00 +0000 +00

修改后台

2023-02-25 08:00:00 +0000 +08

以上。

参考

分布式任务任务调度与管理在微服务开发中是很有必要的。例如,当需要执行一些计算密集型或网络I/O密集型操作时,为了不影响主线程的性能,我们可以将这些任务放到后台异步执行。此外,异步任务处理还可以改善应用程序的可伸缩性和可靠性,因为它可以将任务分布到多个处理器上并允许任务的重试。 Asynq 介绍 Asynq Simple, reliable, and efficient distributed task queue in Go Asynq 是一个 Go 库,用于排队任务并与 worker 异步处理它们。它由Redis提供支持,旨在实现可扩展且易于上手。 特性: 保证至少执行一次任务 任务调度 失败任务的重试 工作人员崩溃时自动恢复任务 加权优先级队列 严格的优先队列 添加任务的延迟低,因为 Redis 中的写入速度很快 使用唯一选项对任务进行重复数据删除 允许每个任务超时和截止日期 允许聚合任务组以批处理多个连续操作 支持中间件的灵活处理程序接口 能够暂停队列以停止处理队列中的任务 定期任务 支持 Redis Cluster实现自动分片和高可用 支持 Redis Sentinels以实现高可用性 与Prometheus集成以收集和可视化队列指标 用于检查和远程控制队列和任务的Web UI CLI检查和远程控制队列和任务 总的来说: 分布式任务队列:Asynq提供了一个任务队列,可以分布式地处理异步任务,使得任务可以在多个处理器之间分配。 可靠性:Asynq具有高可靠性,可以确保任务不会丢失或重复执行。 异常处理:Asynq提供了对任务异常的处理机制,以便在任务执行失败时进行重试或处理。 优先级和延迟任务:Asynq允许您为任务设置优先级和延迟执行,以便您可以控制任务的执行顺序。 Web UI和CLI:Asynq提供了一个易于使用的Web UI和CLI工具,可以方便地监控和管理异步任务。 本文主要记录Asynq的入门、基本使用和工作原理。 安装 使用go get命令安装Asynq库 go get -u github.com/hibiken/asynq 工作原理 高级概述: Client客户端将任务放入队列 Server服务器从队列中拉取任务并为每个任务启动一个工作协程 任务由多个worker同时处理 Client 创建异步任务 asynq.

Kratos Middleware是Kratos的核心一个中间件层,它是Kratos架构的核心部分,负责处理所有微服务间的通信和数据流。本文将深入了解Kratos Middleware的工作原理,更好地理解和使用Kratos。

使用示例

Kratos 内置了一系列的 middleware(中间件)用于处理 logging、 metrics 等通用场景, 您也可以通过实现 Middleware 接口,开发自定义 middleware,进行通用的业务处理,比如用户登录鉴权等。

// http
// 定义opts
var opts = []http.ServerOption{
    http.Middleware(
        recovery.Recovery(), // 把middleware按照需要的顺序加入
        tracing.Server(),
        logging.Server(),
    ),
}
// 创建server
http.NewServer(opts...)

自定义中间件的例子:

func Middleware1() middleware.Middleware {
    return func(handler middleware.Handler) middleware.Handler {
        return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
            if tr, ok := transport.FromServerContext(ctx); ok {
                // Do something on entering
                defer func() {
                // Do something on exiting
                 }()
            }
            return handler(ctx, req)
        }
    }
}

链式调用

Kratos Middleware是通过链式结构实现的,其中每一个中间件都代表一个独立的处理单元,可以在请求的生命周期中的不同阶段执行不同的操作,如请求前的预处理、请求后的后处理等。

每一个中间件都是一个函数,具有相同的签名:

// Handler定义中间件调用的处理程序
type Handler func(ctx context.Context, req interface{}) (interface{}, error)

再看看/go-kratos/kratos/blob/main/middleware/middleware包中Middleware和Chain的实现,Middleware 是HTTP/gRPC传输中间件。 Chain返回一个中间件,它指定endpoint.的链式处理程序。

type Middleware func(Handler) Handler

func Chain(m ...Middleware) Middleware {
	return func(next Handler) Handler {
		for i := len(m) - 1; i >= 0; i-- {
			next = m[i](next)
		}
		return next
	}
}

Example,以https://github.com/go-kratos/kratos/blob/main/middleware/middleware_test.go为例:

func TestChain(t *testing.T) {
	next := func(ctx context.Context, req interface{}) (interface{}, error) {
		t.Log(req)
		i += 10
		return "reply", nil
	}

	got, err := Chain(test1Middleware, test2Middleware, test3Middleware)(next)(context.Background(), "hello kratos!")
	if err != nil {
		t.Errorf("expect %v, got %v", nil, err)
        }
  }
  
func test1Middleware(handler Handler) Handler {
	return func(ctx context.Context, req interface{}) (reply interface{}, err error) {
		fmt.Println("test1 before")
		i++
		reply, err = handler(ctx, req)
		fmt.Println("test1 after")
		return
	}
}  
... 篇幅限制省略部分代码
OutPut
test1 before
test2 before
test3 before
    middleware_test.go:14: hello kratos!
test3 after
test2 after
test1 after

调用过程

对链式调用的使用有所了解后,接着来看Middleware 调用组织过程,大致归纳为:

  1. option方式配置中间件
  2. url路由匹配中间件
  3. 组装中间件添加到调用链
  4. 注册到gorilla,其中gorilla实现net.hander,设置为服务路由中间件

以Transport.Server Http服务为例,重点关注middleware和router 。

// Server is an HTTP server wrapper.
type Server struct {
	*http.Server
	middleware  matcher.Matcher
	router      *mux.Router
	...
}
// Matcher is a middleware matcher.
type Matcher interface {
	Use(ms ...middleware.Middleware)
	Add(selector string, ms ...middleware.Middleware)
	Match(operation string) []middleware.Middleware
}
  • Matcher是一个中间件匹配器,operation 为path,实现路由查找。
  • router 为gorilla.Mux, Http路由器和Url匹配器

一个完成的调用api-service:

func _Greeter_SayHello0_HTTP_Handler(srv GreeterHTTPServer) func(ctx http.Context) error {
	return func(ctx http.Context) error {
		var in HelloRequest
		if err := ctx.BindQuery(&in); err != nil {
			return err
		}
		if err := ctx.BindVars(&in); err != nil {
			return err
		}
		http.SetOperation(ctx, "/helloworld.v1.Greeter/SayHello")
		h := ctx.Middleware(func(ctx context.Context, req interface{}) (interface{}, error) {
			return srv.SayHello(ctx, req.(*HelloRequest))
		})
		out, err := h(ctx, &in)
		if err != nil {
			return err
		}
		reply := out.(*HelloReply)
		return ctx.Result(200, reply)
	}
}

其中ctx.Middleware的实现是:

func (c *wrapper) Middleware(h middleware.Handler) middleware.Handler {
	if tr, ok := transport.FromServerContext(c.req.Context()); ok {
		return middleware.Chain(c.router.srv.middleware.Match(tr.Operation())...)(h)
	}
	return middleware.Chain(c.router.srv.middleware.Match(c.req.URL.Path)...)(h)
} 

func (m *matcher) Match(operation string) []middleware.Middleware {
	ms := make([]middleware.Middleware, 0, len(m.defaults))
	if len(m.defaults) > 0 {
		ms = append(ms, m.defaults...)
	}
	if next, ok := m.matchs[operation]; ok {
		return append(ms, next...)
	}
	for _, prefix := range m.prefix {
		if strings.HasPrefix(operation, prefix) {
			return append(ms, m.matchs[prefix]...)
		}
	}
	return ms
}

以optionnal方式定义的中间件存储m.defaults,通过matcher.Match给不同的路由匹配不同的中间件。最终加入到 middleware.Chain的调用链实现链式调用。

小结

文本记录深入理解Kratos框架和Kratos Middleware,核心使用middleware.Chain的链式调用具有相同的签名的中间件函数。框架通过option的方式提供设置中间件, Matcher中间件匹配器匹配不同路由规则的中间件,来实现链式调用。

参考

jefffff

Stay hungry. Stay Foolish COOL

Go backend developer

China Amoy