ok,欢迎来到brpc小课堂,今天我来讲点不是很hard core的东西,大家take it easy。
众所周知,一个RPC框架除了处理网络请求以外,还有一类任务就是定时任务。所以RPC框架一般都直接提供定时任务的功能。今天我就来聊一下brpc中的定时任务。
当然啦,因为RPC框架中定时任务其实也不是刚需,所以brpc中的定时任务接口设计的比较轻量化。之所以说轻量化,一是因为接口函数足够简单,二是因为它的定时任务不支持周期性定时,也就是说它的定时任务只能生效一次。
如果要实现周期性定时需要自己在定时器的回调函数执行完成,return之前再重新注册一次这一个任务。从而达到周期性定时的效果。可以阅读这个官方issue:
https://github.com/apache/incubator-brpc/issues/175
如图,戈君大神已经做出回复。
ok,说完介绍,接下来就直接说下怎么用吧。主要函数只有一个,那就是bthread_timer_add。通过它来设置定时任务的回调函数和执行时间。它的头文件比较特殊是:#include <bthread/unstable.h>
函数声明如下:
bthread_timer_add(bthread_timer_t* id, timespec abstime,
void (*on_timer)(void*), void* arg);
第一个参数id是一个出参,调用的时候无需事先初始化。
第二个参数abstime表示的是任务执行的时间,这个参数初学者很容易弄错。因为他要传入的并不是时间间隔,而是实际的时钟时间。比如你需要一个任务在五分钟后执行,那么这里传入的是当前时间加上5分钟,然后再对结果转成timespec结构。好在brpc也为你提供了工具函数butil::seconds_from_now()
方便你做时间累加,和类型转换。
timespec是time.h中定义的数据结构。大家可以直接在Linux机器上
man time.h
查看说明。
第三个参数on_timer是定时任务的回调函数,也就是你写具体的处理逻辑的地方。当任务时间到,这个函数指针就会被回调执行。它接收一个void*
类型的参数,值为void。
最后一个参数arg是第三个参数表示的函数指针的执行参数。不多解释了,总体来说和pthread_create
或者bthread_start_background
很像。
函数参数介绍完了,来个例子吧,这个是在echo_server代码基础上进行的修改。
#include <gflags/gflags.h>
#include <brpc/server.h>
#include <bthread/unstable.h> // bthread_timer_add
#include <iostream>
#include "echo.pb.h"
using namespace std;
DEFINE_int32(port, 8000, "TCP Port of this server");
class EchoServiceImpl : public EchoService {
public:
EchoServiceImpl() {};
virtual ~EchoServiceImpl() {};
virtual void Echo(google::protobuf::RpcController* cntl_base,
const EchoRequest* request,
EchoResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
response->set_message(request->message());
done->Run();
}
};
// 定时任务的回调函数
static void timer_func(void* arg) {
cout << "run timer fun:" << time(NULL) << endl;
// 其他逻辑
// ...
}
int main() {
// 1分钟后执行一次
bthread_timer_t timer;
if (bthread_timer_add(&timer, butil::seconds_from_now(60),
timer_func, NULL) != 0) {
cerr << "add timer task failed"<< endl;
}
brpc::Server server;
EchoServiceImpl echo_service_impl;
if (server.AddService(&echo_service_impl,
brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
cerr << "Fail to add service";
return -1;
}
brpc::ServerOptions options;
if (server.Start(FLAGS_port, &options) != 0) {
cerr << "Fail to start EchoServer";
return -1;
}
server.RunUntilAskedToQuit();
return 0;
}
这个例子比较简单,前面我说过了brpc的定时器是默认不能周期性执行的,所以如果要周期性执行只能在回调函数末尾重新注册,比如:
static void timer_func(void* arg) {
cout << "run timer fun:" << time(NULL) << endl;
// 其他逻辑
// ...
bthread_timer_t timer;
if (bthread_timer_add(&timer, butil::seconds_from_now(60),
timer_func, NULL) != 0) {
cerr << "add timer task failed"<< endl;
}
}
但是这样有个问题,那就是在回调函数中,可能并不是“一镜到底”的。里面可能有一堆复杂的逻辑,会有很多中间return的地方。
if (exp1) {
return;
}
...
if (exp2) {
return;
}
如果你要做周期性任务岂不是每个return前面都要写一坨bthread_timer_add()? 那也太丑了吧,也难以维护,很容易遗漏。这时候就要上演RAII的
拿手好戏了。RAII在无GC的C++语言中使用广泛,比如前面代码中的:
brpc::ClosureGuard done_guard(done);
C++标准库中还有std::lock_guard
用以自动释放互斥锁:
std::mutex _m;
...
std::lock_guard<std::mutex> lg(_m);
我们来写个TimerGuard:
class TimerGuard {
typedef void (*on_timer_t)(void*);
public:
TimerGuard(on_timer_t timer_func, int interval_s):_timer_func(timer_func), _interval_s(interval_s) {}
~TimerGuard() {
bthread_timer_t timer;
if (bthread_timer_add(&timer, butil::seconds_from_now(_interval_s),
_timer_func, NULL) != 0) {
cerr << "add timer task failed"<< endl;
}
}
private:
on_timer_t _timer_func;
int _interval_s; // 定时任务执行间隔,单位秒
};
在定义定时任务回调函数的地方,这样即可:
static void timer_func(void* arg) {
TimerGuard tg(timer_func, 60);
cout << "run timer fun:" << time(NULL) << endl;
// 其他逻辑
// ...
}
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8