背景: 公司项目中,需要暴露对内网的Websocket服务。

Websocket 它是一种基于 TCP 的一种独立实现,APISIX 可以对 TCP/UDP 协议进行代理并实现动态负载均衡, 在 nginx 世界,称 TCP/UDP 代理为 stream 代理,在 APISIX 这里我们也遵循了这个声明,下文统称Stream。

SNI(Server Name Indication)是用来改善 SSL 和 TLS 的一项特性,它允许客户端在服务器端向其发送证书之前向服务器端发送请求的域名,服务器端根据客户端请求的域名选择合适的 SSL 证书发送给客户端。

本文主要介绍ApiSix如何基于SNI代理TCP/UDP。

启用Stream 代理

在 conf/config.yaml 配置文件设置 stream_proxy 选项, 指定一组需要进行动态代理的 IP 地址。默认情况不开启 stream 代理。

apisix:
  stream_proxy: # TCP/UDP proxy
    tcp: # TCP proxy address list
      - 9100
      - "127.0.0.1:9101"
    udp: # UDP proxy address list
      - 9200
      - "127.0.0.1:9211"

如果 apisix.enable_admin 为 true,上面的配置会同时启用 HTTP 和 stream 代理。如果你设置 enable_admin 为 false,且需要同时启用 HTTP 和 stream 代理,设置 only 为 false:

apisix:
  enable_admin: false
  stream_proxy: # TCP/UDP proxy
    only: false
    tcp: # TCP proxy address list
      - 9100

启用 TLS:

首先,我们需要给对应的 TCP 地址启用 TLS

  stream_proxy: # TCP/UDP proxy
    only: false
    tcp: # TCP proxy address list
      - 9100
      - addr: 9333
        tls: true

配置路由

当连接为 TLS over TCP 时,我们可以通过 SNI 来匹配路由,比如:

curl http://127.0.0.1:9180/apisix/admin/stream_routes/1 -H 'X-API-KEY: Your-Api-Key' -X PUT -d '
  {
      "sni": "mysite.com",
      "server_port": 9333,
      "upstream": {
          "scheme": "tls",
          "nodes": {
              "192.168.1.33:8080": 1
          },
          "type": "roundrobin"
      }
  }'

查看是否创建成功:

curl http://127.0.0.1:9180/apisix/admin/stream_routes/1 -H 'X-API-KEY: Your-Api-Key'

如创建错误,可以删除

curl http://127.0.0.1:9180/apisix/admin/stream_routes/1 -H 'X-API-KEY: Your-Api-Key' -X DELETE 

查看更多stream路由:

curl http://127.0.0.1:9180//apisix/admin/stream_routes -H 'X-API-KEY: Your-Api-Key' 

更多Api参考: https://apisix.apache.org/zh/docs/apisix/2.12/admin-api/#stream-route

上传SNI证书

上传站点(mysite.com)的证书到ApiSix

测试

curl -v https://mysite.com:9333/
*   Trying x.x.y.y...
* TCP_NODELAY set
* Connected to mysite.com:19333 (x.x.y.y) port 9333 (#0)
* ALPN, offering h2
* ALPN, offering http/1.1
* successfully set certificate verify locations:
*   CAfile: /etc/ssl/cert.pem
  CApath: none
* TLSv1.2 (OUT), TLS handshake, Client hello (1):
* TLSv1.2 (IN), TLS handshake, Server hello (2):
* TLSv1.2 (IN), TLS handshake, Certificate (11):
* TLSv1.2 (IN), TLS handshake, Server key exchange (12):
* TLSv1.2 (IN), TLS handshake, Server finished (14):
* TLSv1.2 (OUT), TLS handshake, Client key exchange (16):
* TLSv1.2 (OUT), TLS change cipher, Change cipher spec (1):
* TLSv1.2 (OUT), TLS handshake, Finished (20):
* TLSv1.2 (IN), TLS change cipher, Change cipher spec (1):
* TLSv1.2 (IN), TLS handshake, Finished (20):
* SSL connection using TLSv1.2 / ECDHE-RSA-CHACHA20-POLY1305
* ALPN, server did not agree to a protocol
* Server certificate:
*  subject: CN=*.mysite.com
*  start date: Mar 29 00:00:00 2023 GMT
*  expire date: Jun 27 23:59:59 2023 GMT
*  subjectAltName: host "mysite.com" matched cert's "*.mysite.com"
*  issuer: C=AT; O=ZeroSSL; CN=ZeroSSL RSA Domain Secure Site CA
*  SSL certificate verify ok.
> GET / HTTP/1.1
> Host: mysite.com:9333
> User-Agent: curl/7.64.1
> Accept: */*
>
< HTTP/1.1 400 Bad Request
< Content-Type: text/plain; charset=utf-8
< Sec-Websocket-Version: 13
< X-Content-Type-Options: nosniff
< Date: Sun, 23 Apr 2023 05:50:07 GMT
< Content-Length: 12
<
Bad Request
* Connection #0 to host mysite.com left intact
* Closing connection 0

Success。

过程中遇见的问题

  1. 连接不上
 curl -v http://mysite.com
*   Trying x.x.y.y...
* TCP_NODELAY set
* Connected to mysite.com (x.x.y.y.) port 19333
> GET / HTTP/1.1
> Host: mysite.com:19333
> User-Agent: curl/7.64.1
> Accept: */*
>
* Recv failure: Connection reset by peer
* Closing connection 0
curl: (56) Recv failure: Connection reset by peer

需要排查路由器端口是否正常开放,ApiSix的端口是否正常,经排查原因是:配置路由时"server_port"缺失

{
      "sni": "mysite.com",
      "upstream": {
          "scheme": "tls",
          "nodes": {
              "192.168.1.33:8080": 1
          },
          "type": "roundrobin"
      }
  }
  1. handshark失败,证书错误
* TLSv1.2 (OUT), TLS handshake, Client hello (1):
* LibreSSL SSL_connect: SSL_ERROR_SYSCALL in connection to mysite.com:19333
* Closing connection 0
curl: (35) LibreSSL SSL_connect: SSL_ERROR_SYSCALL in connection to mysite.com:19333

排位原因是:这里作者使用的ApiSix版本是2.12,缺失前缀 【addr】

  stream_proxy: # TCP/UDP proxy
    only: false
    tcp: # TCP proxy address list
      - 9100
      - 9333
        tls: true

参考

上篇我们介绍了RedisStream实现消息队列和发布订阅模式 ,今天我们将介绍Go语言的实现方法。

消息队列

接上篇的例子,我们通过go-redis库的接口XAdd、XReadGroup、XGroupCreateMkStream来实现,详细如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
)
func main() {
	var (
		ctx      = context.Background()
		consumer = "consumer1"
		stream   = "mystream"
		group    = "group1"
		start    = ">"
		count    = 10
	)

	// 创建Redis客户端
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	})
	// 创建消费者组
	err := client.XGroupCreateMkStream(ctx, stream, group, "0-0").Err()
	if err != nil {
		panic(err)
	}
	//添加消息
	_, err = client.XAdd(ctx, &redis.XAddArgs{
		Stream: stream,
		ID:     "*",
		Values: map[string]interface{}{
			"name": "foo",
		},
	}).Result()
	if err != nil {
		panic(err)
	}
	//读取消息
	messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    group,
		Consumer: consumer,
		Streams:  []string{stream, start},
		Count:    int64(count),
		Block:    0,
		NoAck:    false,
	}).Result()
	if err != nil {
		panic(err)
	} 
	for _, message := range messages[0].Messages {
		fmt.Printf("Received message: %v\n", message.Values)
	}
}

上例实现了一个简单的消息队列,可以实现消息的发送和接收,并保证每条消息只被处理一次 。

阻塞超时

通过修改Block(单位为ms)参数来实现阻塞,详细修改如下

for {
		fmt.Println("Start Receiving", time.Now().String())
		//读取消息
		messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, start},
			Count:    int64(count),
			Block:    2000,
			NoAck:    false,
		}).Result()
		if err != nil {
			panic(err)
		}
		fmt.Println("Received Length=", len(messages), time.Now().String()) 
    ...
	}

输出如下:

Start Receiving 2023-04-08 21:39:23.9669498 +0800 CST m=+0.529310801
Received Length= 1 2023-04-08 21:39:24.0419963 +0800 CST m=+0.604357301
Received message: map[name:foo]
Start Receiving 2023-04-08 21:39:24.0419963 +0800 CST m=+0.604357301
panic: read tcp 127.0.0.1:63777->127.0.0.1:6379: i/o timeout

可见,XReadGroup一直阻塞等待新的消息,直到 Redis 连接超时或者命令被中断。 ps: 如若不需要阻塞等待可修改Block为<0 的值实现。

发布订阅模式

接着,我们实现上篇提到的发布订阅模式,XGroupCreateMkStream创建另外一个订阅组group2,两个协程XReadGroup接收消息,往stream中XAdd消息,两个订阅者将都收到消息,详细如下:

package main

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
	"time"
)
func main() {
	var (
		ctx       = context.Background()
		consumer  = "consumer1"
		consumer2 = "consumer2"
		stream    = "mystream"
		group     = "group1"
		group2    = "group2"
		start     = ">"
		count     = 10
	)
	// 创建Redis客户端
	client := redis.NewClient(&redis.Options{
		Addr:        "localhost:6379",
		Password:    "",
		DB:          0,
		ReadTimeout: time.Second * 10,
	})
	// 创建消费者组
	err := client.XGroupCreateMkStream(ctx, stream, group, "0-0").Err()
	if err != nil {
		//panic(err)
	}
	// 创建消费者组2
	err = client.XGroupCreateMkStream(ctx, stream, group2, "0-0").Err()
	if err != nil {
		//panic(err)
	}
	//添加消息
	_, err = client.XAdd(ctx, &redis.XAddArgs{
		Stream: stream,
		ID:     "*",
		Values: map[string]interface{}{
			"name": "foo",
		},
	}).Result()
	if err != nil {
		panic(err)
	}
	go func() {
		for {
			fmt.Println("Start Receiving", time.Now().String())
			//读取消息
			messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
				Group:    group,
				Consumer: consumer,
				Streams:  []string{stream, start},
				Count:    int64(count),
				Block:    2000,
				NoAck:    false,
			}).Result()
			if err != nil {
				panic(err)
			}
			fmt.Println("Received Length=", len(messages), time.Now().String())
			for _, message := range messages[0].Messages {
				fmt.Printf("Received message: %v\n", message.Values)
			}

		}
	}()
	//读取消息2
	go func() {
		for {
			fmt.Println("2 Start Receiving ", time.Now().String())
			messages, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
				Group:    group2,
				Consumer: consumer2,
				Streams:  []string{stream, start},
				Count:    int64(count),
				Block:    2000,
				NoAck:    false,
			}).Result()
			if err != nil {
				panic(err)
			}
			fmt.Println("2 Received Length=", len(messages), time.Now().String())
			for _, message := range messages[0].Messages {
				fmt.Printf("2Received message: %v\n", message.Values)
			}
		}
	}()
	for {
	}
}

输出如下:

Start Receiving 2023-04-08 22:19:41.9981051 +0800 CST m=+0.347161101
2 Start Receiving  2023-04-08 22:19:41.9981051 +0800 CST m=+0.347161101
Received Length= 1 2023-04-08 22:19:42.0571709 +0800 CST m=+0.406226901
Received message: map[name:foo]
Start Receiving 2023-04-08 22:19:42.0571709 +0800 CST m=+0.406226901
2 Received Length= 1 2023-04-08 22:19:42.3628412 +0800 CST m=+0.711897201
2Received message: map[name:foo]

参考

Redis Stream是在 Redis 5.0中引入的数据类型,可以实现高性能、高可靠性的消息队列。本文主要介绍 Redis Stream 的概念、使用方法和一些适用场景如发布订阅模式、消息队列。 简介 Redis Stream是Redis为消息队列设计的数据类型,它将消息队列的数据结构抽象为一个有序的消息流(stream),每个消息都有一个唯一的ID和一个关联的键值对(key-value pairs)它支持以下功能: 添加消息-将消息添加到队列的末尾 读取消息-从队列的开头读取消息 删除消息-删除指定id的消息 创建消费者组-创建多个消费者组,每个消费者组都可以独立地读取消息。同时可实现负载均衡和消息分发。 确认消息- XACK 命令来确认已经成功处理的消息,避免重复消费 XADD-添加消息 语法: XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...] MAXLEN:可选参数,用于限制消息流的长度。如果指定了 MAXLEN,那么当消息流的长度超过指定的长度时,最早的消息将被自动删除 ID:可选参数,用于指定消息的 ID。如果不指定,Redis 会自动生成一个唯一的 ID。 field value [field value … :消息的键值对,可以是任何字符串。 例如: XADD mystream * name Foo age 10 此命令将向名为 mystream 的消息流中添加一条消息,输出如下: >"1680355760868-0" XREAD-读取消息 语法:

jefffff

Stay hungry. Stay Foolish COOL

Go backend developer

China Amoy