Go分布式任务队列Asynq入门
分布式任务任务调度与管理在微服务开发中是很有必要的。例如,当需要执行一些计算密集型或网络I/O密集型操作时,为了不影响主线程的性能,我们可以将这些任务放到后台异步执行。此外,异步任务处理还可以改善应用程序的可伸缩性和可靠性,因为它可以将任务分布到多个处理器上并允许任务的重试。
Asynq 介绍
Asynq Simple, reliable, and efficient distributed task queue in Go
Asynq 是一个 Go 库,用于排队任务并与 worker 异步处理它们。它由Redis提供支持,旨在实现可扩展且易于上手。
特性:
- 保证至少执行一次任务
- 任务调度
- 失败任务的重试
- 工作人员崩溃时自动恢复任务
- 加权优先级队列
- 严格的优先队列
- 添加任务的延迟低,因为 Redis 中的写入速度很快
- 使用唯一选项对任务进行重复数据删除
- 允许每个任务超时和截止日期
- 允许聚合任务组以批处理多个连续操作
- 支持中间件的灵活处理程序接口
- 能够暂停队列以停止处理队列中的任务
- 定期任务
- 支持 Redis Cluster实现自动分片和高可用
- 支持 Redis Sentinels以实现高可用性
- 与Prometheus集成以收集和可视化队列指标
- 用于检查和远程控制队列和任务的Web UI
- CLI检查和远程控制队列和任务
总的来说:
- 分布式任务队列:Asynq提供了一个任务队列,可以分布式地处理异步任务,使得任务可以在多个处理器之间分配。
- 可靠性:Asynq具有高可靠性,可以确保任务不会丢失或重复执行。
- 异常处理:Asynq提供了对任务异常的处理机制,以便在任务执行失败时进行重试或处理。
- 优先级和延迟任务:Asynq允许您为任务设置优先级和延迟执行,以便您可以控制任务的执行顺序。
- Web UI和CLI:Asynq提供了一个易于使用的Web UI和CLI工具,可以方便地监控和管理异步任务。
本文主要记录Asynq的入门、基本使用和工作原理。
安装
使用go get命令安装Asynq库
go get -u github.com/hibiken/asynq
工作原理
高级概述:
- Client客户端将任务放入队列
- Server服务器从队列中拉取任务并为每个任务启动一个工作协程
- 任务由多个worker同时处理
Client 创建异步任务
asynq.NewClient是一个用于创建异步任务客户端的函数,它接收一个asynq.ClientConfig配置参数,并返回一个asynq.Client对象,用于在应用程序中创建和发送异步任务。
示例代码如下(发送email)
package main
import (
"encoding/json"
"github.com/hibiken/asynq"
"log"
)
const redisAddr = "127.0.0.1:6379"
func main() {
//创建asynq.Client对象
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
//发送电子邮件的任务,包括发送邮件所需的参数和其他相关信息
type EmailDeliveryPayload struct {
UserID int
TemplateID string
}
payload, err := json.Marshal(EmailDeliveryPayload{
UserID: 123,
TemplateID: "some:template:id",
})
if err != nil {
return
}
// 定义任务类型
task := asynq.NewTask("email", payload)
//添加任务
info, err := client.Enqueue(task)
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
//打印任务ID
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}
OutPut:
$ enqueued task: id=c19bb1cd-321e-48dc-83bc-2eeef4ec4dd1 queue=default
查看redis仓储的数据
127.0.0.1:6379> keys *
1) "asynq:queues"
2) "asynq:{default}:pending"
3) "asynq:{default}:t:c19bb1cd-321e-48dc-83bc-2eeef4ec4dd1"
具体说来, 调用asynq.NewClient创建一个asynq.Client对象,并调用asynq.Client.Enqueue方法向队列中添加任务,即向Redis中指定的队列中添加任务并设置任务参数,然后等待异步任务处理器执行该任务。如果任务添加成功,将打印任务ID。
Server 创建异步任务处理服务器
asynq.NewServer是一个用于创建异步任务处理服务器的函数,它接收一个asynq.ServerConfig配置参数,并返回一个asynq.Server对象,用于启动异步任务处理的主循环并等待关闭信号。
示例代码如下:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
//asynq.NewServer函数会创建一个asynq.Server对象,
//该对象会监听Redis中队列的任务并执行相应的处理函数
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{
//指定使用多少并发wroks
Concurrency: 10,
// 可以指定多个不同优先级的队列。
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
//查看godoc中的其他配置选项
},
)
// mux将类型映射到处理程序
mux := asynq.NewServeMux()
mux.HandleFunc("email", HandleEmailDeliveryTask)
//异步任务处理的主循
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
}
//接收邮件
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
type EmailDeliveryPayload struct {
UserID int
TemplateID string
}
var p EmailDeliveryPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
return nil
}
OutPut:
$ asynq: pid=44348 2023/02/18 13:13:26.363649 INFO: Starting processing
$ asynq: pid=44348 2023/02/18 13:13:26.363649 INFO: Send signal TERM or INT to terminate the process
$ Sending Email to User: user_id=123, template_id=some:template:id
查看redis仓储的数据
127.0.0.1:6379> keys *
1) "asynq:{default}:processed"
2) "asynq:servers"
3) "asynq:workers"
4) "asynq:queues"
5) "asynq:{default}:processed:2023-02-18"
6) "asynq:servers:{LAPTOP-H3HBMV8A:44348:4739417b-c837-4710-a6b4-8b759cd4dc0d}"
具体来说,在创建asynq.Server对象后,通过调用asynq.Server.Run方法启动异步任务处理的主循环,并等待关闭信号。该方法会创建多个goroutine处理Redis中的任务,并通过多个worker消费队列中的任务。当收到终止信号时,asynq.Server对象会优雅地关闭所有worker和消费goroutine,并退出主循环。