分布式任务任务调度与管理在微服务开发中是很有必要的。例如,当需要执行一些计算密集型或网络I/O密集型操作时,为了不影响主线程的性能,我们可以将这些任务放到后台异步执行。此外,异步任务处理还可以改善应用程序的可伸缩性和可靠性,因为它可以将任务分布到多个处理器上并允许任务的重试。

Asynq 介绍

Asynq Simple, reliable, and efficient distributed task queue in Go

Asynq 是一个 Go 库,用于排队任务并与 worker 异步处理它们。它由Redis提供支持,旨在实现可扩展且易于上手。

特性

  • 保证至少执行一次任务
  • 任务调度
  • 失败任务的重试
  • 工作人员崩溃时自动恢复任务
  • 加权优先级队列
  • 严格的优先队列
  • 添加任务的延迟低,因为 Redis 中的写入速度很快
  • 使用唯一选项对任务进行重复数据删除
  • 允许每个任务超时和截止日期
  • 允许聚合任务组以批处理多个连续操作
  • 支持中间件的灵活处理程序接口
  • 能够暂停队列以停止处理队列中的任务
  • 定期任务
  • 支持 Redis Cluster实现自动分片和高可用
  • 支持 Redis Sentinels以实现高可用性
  • 与Prometheus集成以收集和可视化队列指标
  • 用于检查和远程控制队列和任务的Web UI
  • CLI检查和远程控制队列和任务

总的来说:

  • 分布式任务队列:Asynq提供了一个任务队列,可以分布式地处理异步任务,使得任务可以在多个处理器之间分配。
  • 可靠性:Asynq具有高可靠性,可以确保任务不会丢失或重复执行。
  • 异常处理:Asynq提供了对任务异常的处理机制,以便在任务执行失败时进行重试或处理。
  • 优先级和延迟任务:Asynq允许您为任务设置优先级和延迟执行,以便您可以控制任务的执行顺序。
  • Web UI和CLI:Asynq提供了一个易于使用的Web UI和CLI工具,可以方便地监控和管理异步任务。

本文主要记录Asynq的入门、基本使用和工作原理。

安装

使用go get命令安装Asynq库

go get -u github.com/hibiken/asynq

工作原理

高级概述:

  1. Client客户端将任务放入队列
  2. Server服务器从队列中拉取任务并为每个任务启动一个工作协程
  3. 任务由多个worker同时处理

Client 创建异步任务

asynq.NewClient是一个用于创建异步任务客户端的函数,它接收一个asynq.ClientConfig配置参数,并返回一个asynq.Client对象,用于在应用程序中创建和发送异步任务。

示例代码如下(发送email)

package main

import (
	"encoding/json"
	"github.com/hibiken/asynq"
	"log"
)

const redisAddr = "127.0.0.1:6379"

func main() {
       //创建asynq.Client对象
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
	defer client.Close()
    //发送电子邮件的任务,包括发送邮件所需的参数和其他相关信息
	type EmailDeliveryPayload struct {
		UserID     int
		TemplateID string
	}
	payload, err := json.Marshal(EmailDeliveryPayload{
		UserID:     123,
		TemplateID: "some:template:id",
	})
	if err != nil {
		return
	}
    // 定义任务类型
	task := asynq.NewTask("email", payload)
   //添加任务
	info, err := client.Enqueue(task)
	if err != nil {
		log.Fatalf("could not enqueue task: %v", err)
	}
    //打印任务ID
	log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

OutPut:

$   enqueued task: id=c19bb1cd-321e-48dc-83bc-2eeef4ec4dd1 queue=default

查看redis仓储的数据

127.0.0.1:6379> keys *
1) "asynq:queues"
2) "asynq:{default}:pending"
3) "asynq:{default}:t:c19bb1cd-321e-48dc-83bc-2eeef4ec4dd1"

具体说来, 调用asynq.NewClient创建一个asynq.Client对象,并调用asynq.Client.Enqueue方法向队列中添加任务,即向Redis中指定的队列中添加任务并设置任务参数,然后等待异步任务处理器执行该任务。如果任务添加成功,将打印任务ID。

Server 创建异步任务处理服务器

asynq.NewServer是一个用于创建异步任务处理服务器的函数,它接收一个asynq.ServerConfig配置参数,并返回一个asynq.Server对象,用于启动异步任务处理的主循环并等待关闭信号。

示例代码如下:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    //asynq.NewServer函数会创建一个asynq.Server对象,
   //该对象会监听Redis中队列的任务并执行相应的处理函数
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: redisAddr},
		asynq.Config{
			//指定使用多少并发wroks
			Concurrency: 10,
			// 可以指定多个不同优先级的队列。
			Queues: map[string]int{
				"critical": 6,
				"default":  3,
				"low":      1,
			},
		//查看godoc中的其他配置选项
		},
	)

	// mux将类型映射到处理程序
	mux := asynq.NewServeMux()
	mux.HandleFunc("email", HandleEmailDeliveryTask)
    //异步任务处理的主循
	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}

//接收邮件
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
	type EmailDeliveryPayload struct {
		UserID     int
		TemplateID string
	}
	var p EmailDeliveryPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
	}
	log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID) 
	return nil
}

OutPut:

$   asynq: pid=44348 2023/02/18 13:13:26.363649 INFO: Starting processing
$   asynq: pid=44348 2023/02/18 13:13:26.363649 INFO: Send signal TERM or INT to terminate the process
$    Sending Email to User: user_id=123, template_id=some:template:id

查看redis仓储的数据

127.0.0.1:6379> keys *
1) "asynq:{default}:processed"
2) "asynq:servers"
3) "asynq:workers"
4) "asynq:queues"
5) "asynq:{default}:processed:2023-02-18"
6) "asynq:servers:{LAPTOP-H3HBMV8A:44348:4739417b-c837-4710-a6b4-8b759cd4dc0d}"

具体来说,在创建asynq.Server对象后,通过调用asynq.Server.Run方法启动异步任务处理的主循环,并等待关闭信号。该方法会创建多个goroutine处理Redis中的任务,并通过多个worker消费队列中的任务。当收到终止信号时,asynq.Server对象会优雅地关闭所有worker和消费goroutine,并退出主循环。

参考