brpc中的定时任务使用介绍

776次阅读  |  发布于2年以前

ok,欢迎来到brpc小课堂,今天我来讲点不是很hard core的东西,大家take it easy。

众所周知,一个RPC框架除了处理网络请求以外,还有一类任务就是定时任务。所以RPC框架一般都直接提供定时任务的功能。今天我就来聊一下brpc中的定时任务。

当然啦,因为RPC框架中定时任务其实也不是刚需,所以brpc中的定时任务接口设计的比较轻量化。之所以说轻量化,一是因为接口函数足够简单,二是因为它的定时任务不支持周期性定时,也就是说它的定时任务只能生效一次。

如果要实现周期性定时需要自己在定时器的回调函数执行完成,return之前再重新注册一次这一个任务。从而达到周期性定时的效果。可以阅读这个官方issue:

https://github.com/apache/incubator-brpc/issues/175

如图,戈君大神已经做出回复。

ok,说完介绍,接下来就直接说下怎么用吧。主要函数只有一个,那就是bthread_timer_add。通过它来设置定时任务的回调函数和执行时间。它的头文件比较特殊是:#include <bthread/unstable.h>

函数声明如下:

bthread_timer_add(bthread_timer_t* id, timespec abstime,
                  void (*on_timer)(void*), void* arg);

第一个参数id是一个出参,调用的时候无需事先初始化。

第二个参数abstime表示的是任务执行的时间,这个参数初学者很容易弄错。因为他要传入的并不是时间间隔,而是实际的时钟时间。比如你需要一个任务在五分钟后执行,那么这里传入的是当前时间加上5分钟,然后再对结果转成timespec结构。好在brpc也为你提供了工具函数butil::seconds_from_now()方便你做时间累加,和类型转换。

timespec是time.h中定义的数据结构。大家可以直接在Linux机器上man time.h查看说明。

第三个参数on_timer是定时任务的回调函数,也就是你写具体的处理逻辑的地方。当任务时间到,这个函数指针就会被回调执行。它接收一个void*类型的参数,值为void。

最后一个参数arg是第三个参数表示的函数指针的执行参数。不多解释了,总体来说和pthread_create或者bthread_start_background很像。

函数参数介绍完了,来个例子吧,这个是在echo_server代码基础上进行的修改。

#include <gflags/gflags.h>
#include <brpc/server.h>
#include <bthread/unstable.h>    // bthread_timer_add
#include <iostream>
#include "echo.pb.h"
using namespace std;

DEFINE_int32(port, 8000, "TCP Port of this server");

class EchoServiceImpl : public EchoService {
public:
    EchoServiceImpl() {};
    virtual ~EchoServiceImpl() {};
    virtual void Echo(google::protobuf::RpcController* cntl_base,
                      const EchoRequest* request,
                      EchoResponse* response,
                      google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);

        response->set_message(request->message());
        done->Run();
    }
};
// 定时任务的回调函数
static void timer_func(void* arg) {
    cout << "run timer fun:" << time(NULL) << endl;
    // 其他逻辑
    // ...
}

int main() {
    // 1分钟后执行一次
    bthread_timer_t timer;
    if (bthread_timer_add(&timer, butil::seconds_from_now(60),
                          timer_func, NULL) != 0) {
        cerr << "add timer task failed"<< endl;
    }

    brpc::Server server;
    EchoServiceImpl echo_service_impl;
    if (server.AddService(&echo_service_impl,
                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
        cerr << "Fail to add service";
        return -1;
    }
    brpc::ServerOptions options;
    if (server.Start(FLAGS_port, &options) != 0) {
        cerr << "Fail to start EchoServer";
        return -1;
    }

    server.RunUntilAskedToQuit();
    return 0;
}

这个例子比较简单,前面我说过了brpc的定时器是默认不能周期性执行的,所以如果要周期性执行只能在回调函数末尾重新注册,比如:

static void timer_func(void* arg) {
    cout << "run timer fun:" << time(NULL) << endl;
    // 其他逻辑
    // ...
    bthread_timer_t timer;
    if (bthread_timer_add(&timer, butil::seconds_from_now(60),
                          timer_func, NULL) != 0) {
        cerr << "add timer task failed"<< endl;
    }
}

但是这样有个问题,那就是在回调函数中,可能并不是“一镜到底”的。里面可能有一堆复杂的逻辑,会有很多中间return的地方。

    if (exp1) {
        return;
    }
    ...
    if (exp2) {
        return;
    }

如果你要做周期性任务岂不是每个return前面都要写一坨bthread_timer_add()? 那也太丑了吧,也难以维护,很容易遗漏。这时候就要上演RAII的拿手好戏了。RAII在无GC的C++语言中使用广泛,比如前面代码中的:

brpc::ClosureGuard done_guard(done);

C++标准库中还有std::lock_guard用以自动释放互斥锁:

std::mutex _m;  
...
std::lock_guard<std::mutex> lg(_m);

我们来写个TimerGuard:

class TimerGuard {
    typedef void (*on_timer_t)(void*);
public:
    TimerGuard(on_timer_t timer_func, int interval_s):_timer_func(timer_func), _interval_s(interval_s) {}
    ~TimerGuard() {
        bthread_timer_t timer;
        if (bthread_timer_add(&timer, butil::seconds_from_now(_interval_s),
                              _timer_func, NULL) != 0) {
            cerr << "add timer task failed"<< endl;
        }
    }

private:
    on_timer_t _timer_func;
    int _interval_s; // 定时任务执行间隔,单位秒
};

在定义定时任务回调函数的地方,这样即可:

static void timer_func(void* arg) {
    TimerGuard tg(timer_func, 60);
    cout << "run timer fun:" << time(NULL) << endl;
    // 其他逻辑
    // ...
}

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8