Midway Task(分布式任务&本地任务)

315次阅读  |  发布于3年以前

@midwayjs/task 是为了解决任务系列的模块,例如分布式定时任务、延迟任务调度。例如每日定时报表邮件发送、订单2小时后失效等工作。

说明:由于底层是依赖 bull,其通过 redis 进行实现,所以配置中,需要加一个 redis 的配置。

安装组件

首先安装 Midway 提供的任务组件:

$ npm install @midwayjs/task -S

configuration.ts 中,引入这个组件:


// src/configuration.ts
import { Configuration } from '@midwayjs/decorator';
import * as task from '@midwayjs/task';   // 导入模块
import { join } from 'path';

@Configuration({
  imports: [task],
  importConfigs: [join(__dirname, 'config')]
})
export class AutoConfiguration{
}

配置

在 config.default.ts 文件中配置对应的模块信息:

export const taskConfig = {
  redis: `redis://127.0.0.1:32768`, //任务依赖redis,所以此处需要加一个redis
  prefix: 'midway-task',            // 这些任务存储的key,都是midway-task开头,以便区分用户原有redis里面的配置。
  defaultJobOptions: {
    repeat: {
      tz: "Asia/Shanghai"            // Task等参数里面设置的比如(0 0 0 * * *)本来是为了0点执行,但是由于时区不对,所以国内用户时区设置一下。
    }
  }
}

业务代码编写方式

分布式定时任务

import { Provide, Inject, Task } from '@midwayjs/decorator';

@Provide()
export class UserService {
  @Inject()
  helloService: HelloService;

  // 例如下面是每分钟执行一次,并且是分布式任务
  @Task({
    repeat: { cron: '* * * * *'}    
  })
  async test(){
    console.log(this.helloService.getName())
  }
}

本地定时任务


import { Provide, Inject, TaskLocal } from '@midwayjs/decorator';

@Provide()
export class UserService {
  @Inject()
  helloService: HelloService;

  // 例如下面是每秒钟执行一次
  @TaskLocal('* * * * * *')    
  async test(){
    console.log(this.helloService.getName())
  }
}

手动触发任务

任务的定义,通过 @Queue 装饰器,定义一个任务类,内必须含有 execute 方法,并且是 async 的。为什么需要是 async 的因为,这个代码,是为了分布式,相当于有个内部的任务调度过程。


import { Provide, Inject, Queue } from '@midwayjs/decorator';

@Queue()
@Provide()
export class HelloTask{

  @Inject()
  service;

  async execute(params){
    console.log(params);
  }
}

触发:

import { QueueService } from '@midwayjs/task';
import { Provide, Inject } from '@midwayjs/decorator';

@Provide()
export class UserTask{

  @Inject()
  service;

  @Inject()
  queueService: QueueService;

  async execute(params){
    // 3秒后触发分布式任务调度。
    const xxx = await this.queueService.execute(HelloTask, params, {delay: 3000});
  }
}

这样,就相当于是 3 秒后,触发 HelloTask 这个任务。

设置进度

例如我们在做音视频或者发布这种比较耗时的任务的时候,我们希望能设置进度。

相当于第二个参数,将bull的job传递给了用户。用户可以通过job.progress来设置进度。

然后查询进度:

import { QueueService } from '@midwayjs/task';
import { Provide, Controller, Get } from '@midwayjs/decorator';

@Provide()
@Controller()
export class HelloController{
  @Inject()
  queueService: QueueService;

  @Get("/get-queue")
  async getQueue(@Query() id: string){
    return await this.queueService.getClassQueue(TestJob).getJob(id);
  }
}

运维

日志

在Midway Task Component上面,增加了两个日志:

  1. midway-task.log
  2. midway-task-error.log

分别在task、localTask、queue触发开始和结束的时候会打印对应的日志。

分布式的Task触发日志:


logger.info(`task start.`)

// 异常情况:
logger.error(`${e.stack}`)

logger.info(`task end.`)

非分布式的LocalTask触发日志:

logger.info(`local task start.`)

// 异常情况:
// logger.error(`${e.stack}`)

logger.info(`local task end.`)

任务队列的触发日志:

logger.info(`queue process start.`)

// 异常情况:
// logger.error(`${e.stack}`)

logger.info(`queue process end.`)

排查问题链路:

用户可以搜索这个相同的id,找到同一次请求的日志。 为了方便用户在自己的业务代码中串联对应的日志,我在ctx上面挂了traceId变量。

例如异常情况: 当异常的时候,

本地 可以在console栏内看到这个错误相关的情况:

日志:可以在midway-task.log文件中查看完整日志:

如果调用情况比较多的时候,会出现A还没执行完成,B又进来,导致日志区分比较麻烦,所以用户可以搜索调用的traceId,也就是下图红色圈起来的地方:

相当于ctrl + f搜索相同的traceId即可。

traceId

localTask则是自己生成了一个uuid的id作为traceId。

task和queue则采用job的id作为traceId。

业务内部的代码

在service内可以通过inject注入logger,或者注入ctx拿logger变量

import { App, Inject, Provide, Queue } from "@midwayjs/decorator";
import { Application } from "@midwayjs/koa";

@Queue()
@Provide()
export class QueueTask{

  @App()
  app: Application;

  @Inject()
  logger;

  async execute(params){
    this.logger.info(`====>QueueTask execute`)
    this.app.getApplicationContext().registerObject(`queueConfig`, JSON.stringify(params));
  }

或者

import { App, Inject, Provide, Queue } from "@midwayjs/decorator";
import { Application } from "@midwayjs/koa";

@Queue()
@Provide()
export class QueueTask{

  @App()
  app: Application;

  @Inject()
  ctx;

  async execute(params){
    this.ctx.logger.info(`====>QueueTask execute`)
    this.app.getApplicationContext().registerObject(`queueConfig`, JSON.stringify(params));
  }
}

打印的日志

2021-07-30 13:00:13,101 INFO 5577 [Queue][12][QueueTask] queue process start.
2021-07-30 13:00:13,102 INFO 5577 [Queue][12][QueueTask] ====>QueueTask execute
2021-07-30 13:00:13,102 INFO 5577 [Queue][12][QueueTask] queue process end.

其他

关于 Task 任务的配置:


*    *    *    *    *    *
┬    ┬    ┬    ┬    ┬    ┬
│    │    │    │    │    |
│    │    │    │    │    └ day of week (0 - 7) (0 or 7 is Sun)
│    │    │    │    └───── month (1 - 12)
│    │    │    └────────── day of month (1 - 31)
│    │    └─────────────── hour (0 - 23)
│    └──────────────────── minute (0 - 59)
└───────────────────────── second (0 - 59, optional)

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8