Chromium除了远近闻名的多进程架构之外,它的多线程模型也相当引人注目的。Chromium的多进程架构是为了解决网页的稳定性问题,而多线程模型则是为了解决网页的卡顿问题。为了达到这个目的,Chromium的多线程模型是基于异步通信的。也就是说,一个线程请求另外一个线程执行一个任务的时候,不需要等待该任务完成就可以去做其它事情,从而避免了卡顿。本文就分析Chromium的多线程模型的设计和实现。
有同学看到这里可能会有疑问,如果一个线程请求另外一个线程执行一个任务的时候,就是要等该任务完成之后才能做其它事情,那么使用异步通信有什么用呢?的确如此,但是Chromium提供这种基于异步通信的多线程模型,就是希望作为开发者的你在实现一个模块的时候,尽最大努力地设计好各个子模块及其对应的数据结构,使得它们在协作时可以最大程度地进行异步通信。因此,Chromium基于异步通信的多线程模型更多的是体现一种设计哲学。
一个典型的异步通信过程如图1所示:
图1 线程异步通信过程
Task-1被分解成三个子任务Task-1(1)、Task-1(2)和Task-1(3)。其中,Task-1(1)由Thread-1执行。Task-1(1)执行完成后,Thread-1通过我们在前面Chromium多线程通信的Closure机制分析一文分析的Closure请求Thread-2执行Task-1(2)。Task-1(2)执行完成后,Thread-2又通过一个Closure请求Thread-1执行Task-1(3)。至此,Task-1就执行完成。我们可以将第一个Closure看作是一个Request操作,而第二个Closure是一个Reply操作。这是一个典型的异步通信过程。当然,如果不需要知道异步通信结果,那么第二个Closure和Task-1(3)就是不需要的。
假设Thread-1需要知道异步通信的结果,那么在图1中我们可以看到一个非常关键的点:Thread-1并不是什么也不干就只是等着Thread-2执行完成Task-1(2),它趁着这个等待的空隙,干了另外一件事情----Task-2。如果我们将Thread-1看作是一个UI线程,那么就意味着这种异步通信模式是可以提高它的响应性的。
为了能够完成上述的异步通信过程,一个线程的生命周期如图2所示:
图2 线程生命周期
线程经过短暂的启动之后(Start),就围绕着一个任务队列(TaskQueue)不断地进行循环,直到被通知停止为止(Stop)。在围绕任务队列循环期间,它会不断地检查任务队列是否为空。如果不为空,那么就会将里面的任务(Task)取出来,并且进行处理。这样,一个线程如果要请求另外一个线程执行某一个操作,那么只需要将该操作封装成一个任务,并且发送到目标线程的任务队列去即可。
为了更好地理解这种基于任务队列的线程运行模式,我们脑补一下另外一种常用的基于锁的线程运行模式。一个线程要执行某一个操作的时候,就直接调用一个代表该操作的一个函数。如果该函数需要访问全局数据或者共享数据,那么就需要进行加锁,避免其它线程也正在访问这些全局数据或者共享数据。这样做的一个好处是我们只需要关心问题的建模,而不需要关心问题是由谁来执行的,只要保证逻辑正确并且数据完整即可。当然坏处也是显然的。首先是为了保持数据完整性,也就是避免访问数据时出现竞争条件,代码里面充斥着各种锁。其次,如果多个线程同时获取同一个锁,那么就会产生竞争。这种锁竞争会带来额外的开销,从而降低线程的响应性。
基于任务队列的线程运行模式,要求在对问题进行建模时,要提前知道谁是执行者。也就是说,在对问题进行建模时,需要指派好每一个子问题的执行者。这样我们为子问题设计数据结构时,就规定这些数据结构仅仅会被子问题的执行者访问。这样执行者在解决指派给它的问题时,就不需要进行加锁操作,因为在解决问题过程中需要访问的数据不会同时被其它执行者访问。这就是通过任务队列来实现异步通信的多线程模型的设计哲学。
当然,这并不是说,基于任务队列的线程运行模式可以完全避免使用锁,因为任务队列本身就是一个线程间的共享资源。想象一下,一个线程要往里面添加任务,另一个线程要从里面将任务提取出来处理。因此,所有涉及到任务队列访问的地方都是需要加锁的。但是如果我们再仔细想想,那么就会发现,任务队列只是一个基础设施,它与具体的问题是无关的。因此,只要我们遵循上述设计哲学,就可以将代码里面需要加锁的地方仅限于访问任务队列的地方,从而就可以减少锁竞争带来的额外的开销。
这样说来,似乎基于任务队列的线程运行模式很好,但是实际上它对问题建模提出了更高的要求,也就是进行子问题划分时,要求划分出来的子问题是正交的,这样我们才有可能为这些子问题设计出不会同时被访问的数据结构。看到"正交"两个字,是不是想起高数里面的向量空间的正交基了?或者傅里叶变换用到的一组三角函数了?其实道理就是一样一样的。
好了,说了这么多,我们就步入到正题,分析Chromium多线程模型的设计和实现,也就是基于任务队列的线程运行模式涉及到核心类图,如图3所示:
图3 基于任务队列的线程运行模式核心类关系图
Thread是一个用来创建带消息循环的类。当我们创建一个Thread对象后,调用它的成员函数Start或者StartWithOptions就可以启动一个带消息循环的线程。其中,成员函数StartWithOptions可以指定线程创建参数。当我们不需要这个线程时,就可以调用之前创建的Thread对象的成员函数Stop。
Thread类继承了PlatformThread::Delegate类,并且重写了它的成员函数ThreadMain。我们知道,Chromium是跨平台的,这样各个平台创建线程使用的API有可能是不一样的。不过,我们可以通过PlatformThread::Delegate类为各个平台创建的线程提供一个入口点。这个入口点就是PlatformThread::Delegate类的成员函数ThreadMain。由于Thread类重写了父类PlatformThread::Delegate的成员函数ThreadMain,因此无论是哪一个平台,当它创建完成一个线程后,都会以Thread类的成员函数ThreadMain作为线程的入口点。
Thread类有一个重要的成员变量messageloop,它指向的是一个MessageLoop对象。这个MessageLoop对象就是用来描述线程的消息循环的。MessageLoop类内部通过成员变量run_loop_指向的一个RunLoop对象和成员变量pump_指向的一个MessagePump对象来描述一个线程的消息循环。
一个线程在运行的过程中,可以有若干个消息循环,也就是一个消息循环可以运行在另外一个消息循环里面。除了最外层的消息循环,其余的消息的消息循环称为嵌套消息循环。我们为什么需要嵌套消息循环呢?这主要是跟模式对话框有关。
考虑一个情景,我们在一个窗口弹出一个文件选择对话框。窗口必须要等到用户在文件选择对话框选择了文件之后,才能去做其它事情。窗口是在消息循环过程中打开文件对话框的,它要等待用户在文件选择对话框中选择文件 ,就意味着消息循环被中止了。由于文件选择对话框也是通过消息循环来响应用户输入的,因此如果打开的它窗口中止了消息循环,就会导致它无法响应用户输入。为了解决这个问题,就要求打开文件选择的窗口不能中止消息循环。方法就是该窗口创建一个子消息循环,该子消息循环负责处理文件选择对应框的输入事件,直到用户选择了一个文件为止。
MessageLoop类的成员变量run_loop_指向的一个RunLoop对象就是用来记录线程当使用的消息循环的。RunLoop类有三个重要的成员变量:
1. messageloop,记录一个RunLoop对象关联的MessageLoop对象。
2. previousloop,记录前一个消息循环,当就是包含当前消息循环的消息循环。
3. rundepth,记录消息循环的嵌套深度。
MessageLoop类的成员变量pump_指向的一个MessagePump对象是用来进行消息循环的,也就是说,Thread类描述的线程通过MessagePump类进入到消息循环中去。
Thread类将消息划分为三类,分别通过以下三个成员变量来描述:
1. workqueue,指向一个TaskQueue对象,用来保存那些需要马上处理的消息。
2. delayed_workqueue,指向一个DelayedTaskQueue,用来保存那些需要延迟一段时间再处理的消息。
3. deferred_non_nestable_workqueue,指向一个TaskQueue对象,用来保存那些不能够在嵌套消息循环中处理的消息。
一个MessagePump对象在进行消息循环时,如果发现消息队列中有消息,那么就需要通知关联的MessageLoop对象进行处理。通知使用的接口就通过MessagePump::Delegate类来描述。
MessagePump::Delegate类定义了四个成员函数,如下所示:
1. DoWork,用来通知MessageLoop类处理其成员变量work_queue_保存的消息。
2. DoDelayedWork,用来通知MessageLoop类处理其成员变量delayed_work_queue_保存的消息。
3. DoIdleWork,用来通知MessageLoop类当前无消息需要处理,MessageLoop类可以利用该间隙做一些Idle Work。
4. GetQueueingInformation,用来获取MessageLoop类内部维护的消息队列的信息,例如消息队列的大小,以及下一个延迟消息的处理时间。
有了前面的基础知识,接下来我们就可以大概描述Thread类描述的线程的执行过程。
首先是线程的启动过程:
1. 调用Thread类的成员函数Start或者StartWithOptions启动一个线程,并且以Thread类的成员函数ThreadMain作为入口点。
Thread类的成员函数ThreadMain负责创建消息循环,也就是通过MessageLoop类创建消息循环。
MessageLoop类在创建消息循环的过程中,会通过成员函数Init创建用来一个用来消息循环的MessagePump对象。
消息循环创建完成之后,调用MessageLoop类的成员函数Run进入消息循环。
MessageLoop类的成员函数Run创建一个RunLoop对象,并且调用它的成员函数Run进入消息循环。注意,该RunLoop对象在创建的过程,会关联上当前线程使用的消息循环,也就是创建它的MessageLoop对象。
RunLoop类的成员函数Run负责建立好消息循环的嵌套关系,也就是设置好它的成员变量previous_loop_和run_depth_等,然后就会调用其关联的MessageLoop对象的成员函数RunHandler进入消息循环。
MessageLoop类的成员函数RunHandler调用成员变量pump_描述的一个MessagePump对象的成员函数Run进入消息循环。
接下来是向线程的消息队列发送消息的过程。这是通过MessageLoop类的以下四个成员函数向消息队列发送消息的:
1. PostTask,发送需要马上进行处理的并且可以在嵌套消息循环中处理的消息。
2. PostDelayedTask,发送需要延迟处理的并且可以在嵌套消息循环中处理的消息。
3. PostNonNestableTask,发送需要马上进行处理的并且不可以在嵌套消息循环中处理的消息。
向线程的消息队列发送了新的消息之后,需要唤醒线程,这是通过调用MessagePump类的成员函数Schedule进行的。线程被唤醒之后 ,就会分别调用MessageLoop类重写父类MessagePump::Delegate的两个成员函数DoWork和DoDelayedWork对消息队列的消息进行处理。如果没有消息可以处理,就调用MessageLoop类重写父类MessagePump::Delegate的成员函数DoIdleWork通知线程进入Idle状态,这时候线程就可以做一些Idle Work。
MessageLoop类的成员函数DoWork在处理消息的过程中,按照以下三个类别进行处理:
1. 对于可以马上处理的消息,即保存在成员变量work_queue_描述的消息队列的消息,执行它们的成员函数Run。
2. 对于需要延迟处理的消息,将它们保存在成员变量delayed_work_queue_描述的消息队列中,并且调用成员变量pump_指向的一个MessagePump对象的成员函数ScheduleDelayedWork设置最早一个需要处理的延迟消息的处理时间,以便该MessagePump对象可以优化消息循环逻辑。
3. 对于可以马上处理但是不可以在嵌套消息循环中处理的消息,如果线程是处理嵌套消息循环中,那么将它们保存在成员变量deferred_non_nestable_work_queue_描述的消息队列中,这些消息将会在线程进入Idle状态时,并且是处理最外层消息循环时,得到处理。
以上就是Thread类描述的线程的大概执行过程,接下来我们通过源码分析详细描述这些过程。
我们首先看线程的启动过程,即Thread类的成员函数Start的实现,如下所示:
bool Thread::Start() {
Options options;
......
return StartWithOptions(options);
}
这个函数定义在文件external/chromium_org/base/threading/thread.cc中。
Thread类的成员函数Start调用另外一个成员函数StartWithOptions来启动一个线程,后者可以通过一个类型为Options的参数指定线程的启动参数,这里没有指定,意味着采用默认参数启动一个线程。
Thread类的成员函数StartWithOptions的实现如下所示:
bool Thread::StartWithOptions(const Options& options) {
......
StartupData startup_data(options);
startup_data_ = &startup_data;
if (!PlatformThread::Create(options.stack_size, this, &thread_)) {
......
return false;
}
// Wait for the thread to start and initialize message_loop_
base::ThreadRestrictions::ScopedAllowWait allow_wait;
startup_data.event.Wait();
// set it to NULL so we don't keep a pointer to some object on the stack.
startup_data_ = NULL;
started_ = true;
......
return true;
}
这个函数定义在文件external/chromium_org/base/threading/thread.cc中。
Thread类的成员函数StartWithOptions首先是将线程启动参数封装一个在栈上分配的StartupData对象中,并且这个StartupData对象的地址会保存在Thread类的成员变量startup_data_中。接下来再调用由平台实现的PlatformThread类的静态成员函数Create创建一个线程。最后通过上面封装的StartupData对象的成员变量event描述的一个WaitableEvent对象等待上述创建的线程启动完成。
一般情况下,线程是不可以进入等待状态的,因为这样会降低线程的响应性。但是有时候线程不得不进入等待状态,例如现在这个情况,当前线程必须要等新创建的线程启动完成之后才能返回,否则的话有可能新创建的线程还没有启动完成,前面在栈上分配的StartupData对象就已经被释放,这样会导致新创建的线程无法访问它的启动参数。
当新创建的线程启动完成之后,就会通过上述的WaitableEvent对象唤醒当前线程,当前线程将Thread类的成员变量startup_data_置为NULL,避免它引用一个即将无效的在栈上分配的StartupData对象,并且将Thread类的成员变量started_的值设置为true,表示新创建的线程已经启动完毕。
接下来我们继续分析PlatformThread类的静态成员函数Create的实现。以Android平台为例,它的实现如下所示:
bool PlatformThread::Create(size_t stack_size, Delegate* delegate,
PlatformThreadHandle* thread_handle) {
base::ThreadRestrictions::ScopedAllowWait allow_wait;
return CreateThread(stack_size, true /* joinable thread */,
delegate, thread_handle, kThreadPriority_Normal);
}
这个函数定义在文件external/chromium_org/base/threading/platform_thread_posix.cc中。
PlatformThread类的静态成员函数Create调用了另外一个函数CreateThread来创建一个线程,后者的实现如下所示:
bool CreateThread(size_t stack_size, bool joinable,
PlatformThread::Delegate* delegate,
PlatformThreadHandle* thread_handle,
ThreadPriority priority) {
......
bool success = false;
pthread_attr_t attributes;
pthread_attr_init(&attributes);
// Pthreads are joinable by default, so only specify the detached
// attribute if the thread should be non-joinable.
if (!joinable) {
pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED);
}
// Get a better default if available.
if (stack_size == 0)
stack_size = base::GetDefaultThreadStackSize(attributes);
if (stack_size > 0)
pthread_attr_setstacksize(&attributes, stack_size);
ThreadParams params;
params.delegate = delegate;
params.joinable = joinable;
params.priority = priority;
params.handle = thread_handle;
pthread_t handle;
int err = pthread_create(&handle,
&attributes,
ThreadFunc,
&ms);
success = !err;
......
pthread_attr_destroy(&attributes);
.....
return success;
}
这个函数定义在文件external/chromium_org/base/threading/platform_thread_posix.cc中。
从这里就可以看到,Android平台调用POSIX线程库中的函数pthread_create创建了一个线程,并且指定新创建的线程的入口点函数为ThreadFunc,同时传递给该入口点函数的参数为一个ThreadParams对象,该ThreadParams对象封装了线程启动过程中需要使用到的一系列参数。
新创建线程的入口点函数ThreadFunc的实现如下所示:
void* ThreadFunc(void* params) {
......
ThreadParams* thread_params = static_cast<ThreadParams*>(params);
PlatformThread::Delegate* delegate = thread_params->delegate;
......
delegate->ThreadMain();
......
return NULL;
}
这个函数定义在文件external/chromium_org/base/threading/platform_thread_posix.cc中。
函数ThreadFunc首先将参数params转换为一个ThreadParams对象。有了这个ThreadParams对象之后,就可以通过它的成员变量delegate获得一个PlatformThread::Delegate对象。从前面的调用过程可以知道,这个PlatformThread::Delegate对象实际上是一个Thread对象,用来描述新创建的线程。得到了用来描述新创建线程的Thread对象之后,就可以调用它的成员函数ThreadMain继续启动线程了。
Thread类的成员函数ThreadMain的实现如下所示:
void Thread::ThreadMain() {
{
......
scoped_ptr<MessageLoop> message_loop;
if (!startup_data_->options.message_pump_factory.is_null()) {
message_loop.reset(
new MessageLoop(startup_data_->options.message_pump_factory.Run()));
} else {
message_loop.reset(
new MessageLoop(startup_data_->options.message_loop_type));
}
......
message_loop_ = message_loop.get();
Init();
running_ = true;
startup_data_->event.Signal();
......
Run(message_loop_);
running_ = false;
......
message_loop_ = NULL;
}
}
这个函数定义在文件external/chromium_org/base/threading/thread.cc中。
回忆前面分析的Thread类的成员函数StartWithOptions,它已经将用来描述线程启动参数的一个Options对象保存在成员变量startup_data_描述的一个StartupData对象中,因此我们就可以重新获取这个Options对象。
当Options类的成员变量message_pump_factory不等于NULL时,就表示新创建线程使用的Message Pump通过该成员变量描述的一个Callback对象来创建,也就是调用该Callback对象的成员函数Run来创建。关于Chromium的Callback机制,可以参考前面Chromium多线程通信的Closure机制分析一文。有了Message Pump之后,就可以创建一个Message Loop了。该Message Loop最终会保存在Thread类的成员变量message_loop_中。
一般我们不通过Options类的成员变量message_pump_factory来创建Message Pump,而是通过另外一个成员变量message_loop_type来创建指定Message Loop的类型 ,从而确定要创建的Message Pump,这些逻辑都封装在MessageLoop类的构造函数中。
创建好Message Loop之后,线程的启动工作就完成了,接下来新创建的线程就需要进入到初始化状态,这是通过调用Thread类的成员函数Init实现的。Thread类的成员函数Init一般由子类重写,这样子类就有机会执行一些线程初始化工作。
再接下来,新创建的线程就需要进入运行状态,这是通过调用Thread类的成员函数Run实现的。不过在新创建线程进入运行状态之前,还会做两件事情。第一件事情是将Thread类的成员变量running_设置为true,表示新创建的线程正在运行。第二件事情是通过Thread类的成员变量startup_data_指向的一个StartupData对象的成员变量event描述的一个WaitableEvent唤醒请求创建新线程的线程。
最后,当Thread类的成员函数Run执行完成返回后,需要将Thread类的成员变量running_和message_loop_分别重置为false和NULL,表示新创建的线程已经运行结束了,因此就不再需要Message Loop了。
接下来我们首先分析线程的Message Loop的创建过程,也就是MessageLoop类的构造函数的实现,以完成线程的启动过程,然后再分析线程的运行过程,也就是Thread类的成员函数Run的实现。
我们假设线程的Message Loop是通过Message Loop Type来创建的,对应的MessageLoop类构造函数的实现如下所示:
MessageLoop::MessageLoop(Type type)
: type_(type),
nestable_tasks_allowed_(true),
......
run_loop_(NULL) {
Init();
pump_ = CreateMessagePumpForType(type).Pass();
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc。
MessageLoop类的成员变量type_描述的是消息循环的类型,nestable_tasks_allowed_描述当前是否允许处理嵌套消息,runn_loop_描述的是当前使用的消息循环。
MessageLoop类构造函数首先是调用成员函数Init执行初始化工作,接着再调用成员函数CreateMessagePumpForType根据消息循环的类型创建一个Message Pump。接下来我们就分别分析这两个成员函数的实现。
MessageLoop类的成员函数Init的实现如下所示:
LazyInstance<base::ThreadLocalPointer<MessageLoop> >::Leaky lazy_tls_ptr =
LAZY_INSTANCE_INITIALIZER;
......
void MessageLoop::Init() {
......
lazy_tls_ptr.Pointer()->Set(this);
incoming_task_queue_ = new internal::IncomingTaskQueue(this);
message_loop_proxy_ =
new internal::MessageLoopProxyImpl(incoming_task_queue_);
thread_task_runner_handle_.reset(
new ThreadTaskRunnerHandle(message_loop_proxy_));
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
MessageLoop类的成员函数Init首先将当前创建的MessageLoop对象保存在全局变量lazy_tls_ptr指向一块线程局部存储中,这样我们就可以通过MessageLoop类的静态成员函数current获得当前线程的消息循环,如下所示:
MessageLoop* MessageLoop::current() {
......
return lazy_tls_ptr.Pointer()->Get();
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
回到MessageLoop类的成员函数Init中,接下来它创建了一个任务队列,并且保存在成员变量incoming_queue_中。这个任务队列通过IncomingQueue类来描述,它的定义如下所示:
class BASE_EXPORT IncomingTaskQueue
: public RefCountedThreadSafe<IncomingTaskQueue> {
public:
......
bool AddToIncomingQueue(const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay,
bool nestable);
......
void ReloadWorkQueue(TaskQueue* work_queue);
......
void WillDestroyCurrentMessageLoop();
......
private:
......
TaskQueue incoming_queue_;
......
MessageLoop* message_loop_;
......
};
这个类定义在external/chromium_org/base/message_loop/incoming_task_queue.h中。
IncomingQueue类有两个重要的成员变量:
1. incomingqueue,它描述的是一个TaskQueue,代表的是线程的消息队列,也就是所有发送给线程的消息都保存在这里。
2. messageloop,它指向一个MessageLoop对象,描述的是线程的消息循环。
IncomingQueue类有三个重要的成员函数:
AddToIncomingQueue,用来向成员变量incoming_queue_描述的消息队列发送一个消息,并且唤醒线程进行处理。
ReloadWorkQueue,用来提取成员变量incoming_queue_描述的消息队列中的消息,并且保存在参数work_queue中。
WillDestroyCurrentMessageLoop,当该函数被调用时,会将成员变量message_loop_的值设置为NULL,使得我们不能够再向线程发送消息,也就是请求线程执行某一个操作。
IncomingQueue类的上述成员变量和成员函数我们后面分析消息的发送和处理再详细分析。现在返回到MessageLoop类的成员函数Init中,它接下来创建了一个MessageLoopProxyImpl对象和一个ThreadTaskRunnerHandle对象,分别保存在成员变量message_loop_proxy_和thread_task_runner_handle中,前者封装了当前线程的消息队列,后者又封装了前者。它们与MessageLoop类一样,都是可以用来向线程的消息队列发送消息,这意味着我们有三种方式向线程的消息队列发送消息,后面分析消息的发送过程时我们再详细分析。
MessageLoop类的成员函数Init执行完成后,回到MessageLoop类的构造函数中,接下来它调用另外一个成员函数CreateMessagePumpForType根据消息循环的类型创建一个消息泵(Message Pump),并且保存在成员变量pump_中。
MessageLoop类的成员函数CreateMessagePumpForType的实现如下所示:
#if defined(OS_IOS)
typedef MessagePumpIOSForIO MessagePumpForIO;
#elif defined(OS_NACL)
typedef MessagePumpDefault MessagePumpForIO;
#elif defined(OS_POSIX)
typedef MessagePumpLibevent MessagePumpForIO;
#endif
......
scoped_ptr<MessagePump> MessageLoop::CreateMessagePumpForType(Type type) {
......
#if defined(OS_IOS) || defined(OS_MACOSX)
#define MESSAGE_PUMP_UI scoped_ptr<MessagePump>(MessagePumpMac::Create())
#elif defined(OS_NACL)
// Currently NaCl doesn't have a UI MessageLoop.
// TODO(abarth): Figure out if we need this.
#define MESSAGE_PUMP_UI scoped_ptr<MessagePump>()
#else
#define MESSAGE_PUMP_UI scoped_ptr<MessagePump>(new MessagePumpForUI())
#endif
if (type == MessageLoop::TYPE_UI) {
if (message_pump_for_ui_factory_)
return message_pump_for_ui_factory_();
return MESSAGE_PUMP_UI;
}
if (type == MessageLoop::TYPE_IO)
return scoped_ptr<MessagePump>(new MessagePumpForIO());
#if defined(OS_ANDROID)
if (type == MessageLoop::TYPE_JAVA)
return scoped_ptr<MessagePump>(new MessagePumpForUI());
#endif
......
return scoped_ptr<MessagePump>(new MessagePumpDefault());
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
上面的代码通过一系列宏来适配不同的平台,这里我们只考虑Android平台,这意味着MessagePumpForIO定义为MessagePumpLibevent,MESSAGE_PUMP_UI定义为scoped_ptr
从MessageLoop类的成员函数CreateMessagePumpForType的实现可以知道:
1. 如果消息循环的类型为MessageLoop::TYPE_UI,那么对应的消息泵为MessagePumpForUI,或者由函数指针message_pump_for_ui_factory_指向的函数创建。但是一般不设置函数指针message_pump_for_uifactory,因此,类型为MessageLoop::TYPE_UI的消息循环对应的消息泵为MessagePumpForUI。在Chromium中,消息循环类型为MessageLoop::TYPE_UI的线程称为UI线程,也就是应用程序的主线程。
3. 如果消息循环的类型为MessageLoop::TYPE_JAVA,那么对应的消息泵为MessagePumpForUI。在Chromium中,消息循环类型为MessageLoop::TYPE_JAVA的线程称为JAVA线程,它们与UI线程一样,在JAVA层具有自己的消息循环。
4. 其余类型的消息循环,对应的消息泵为MessagePumpDefault。
总结来说,就是在Android平台上,涉及到的消息泵有MessagePumpForUI、MessagePumpForIO和MessagePumpDefault三种,各自有不同的用途,其中MessagePumpForUI适用于在Java层具有自己的消息循环的UI线程和Java线程,MessagePumpLibevent适用于用来负责执行IPC的IO线程,MessagePumpDefault适用于其它的一般线程。我们先从一般性出发,分析MessagePumpDefault的实现,后面再分析MessagePumpForUI和MessagePumpForIO的实现。
MessagePumpDefault类继承于MessagePump类,它的定义如下所示:
class MessagePumpDefault : public MessagePump {
public:
MessagePumpDefault();
virtual ~MessagePumpDefault();
// MessagePump methods:
virtual void Run(Delegate* delegate) OVERRIDE;
virtual void Quit() OVERRIDE;
virtual void ScheduleWork() OVERRIDE;
virtual void ScheduleDelayedWork(const TimeTicks& delayed_work_time) OVERRIDE;
private:
// This flag is set to false when Run should return.
bool keep_running_;
// Used to sleep until there is more work to do.
WaitableEvent event_;
// The time at which we should call DoDelayedWork.
TimeTicks delayed_work_time_;
DISALLOW_COPY_AND_ASSIGN(MessagePumpDefault);
};
这个类定义在文件external/chromium_org/base/message_loop/message_pump_default.h 。
MessagePumpDefault类重写了父类MessagePump的成员函数Run、Quit、ScheduleWork和ScheduleDelayedWork,后面我们分析消息循环的执行过程和消息的发送过程时,会看到它们的具体实现。
MessagePumpDefault类具有三个成员变量:
1. keeprunning,类型为bool,表示消息循环是否需要继续执行。只要线程不退出,消息循环就要持续执行。
2. event_,类型为WaitableEvent,表示一个可以进行Wait/Wake的事件变量。当线程的消息队列为空时,线程就通过它进入到Wait状态,而当向线程的消息队列发送了一个消息时,就通过它唤醒线程。
3. delayed_worktime,类型为TimeTicks,表示线程进入Wait状态的超时时间。达到超时时间之后,线程就会自动唤醒,然后处理那些延迟消息。
这样,一个消息循环及其对应的消息泵就创建完毕,回到Thread类的成员函数ThreadMain中,接下来它调用成员函数Run使得线程进入到运行状态,也就是围绕消息队列进行不断的循环,直到线程退出为止。
Thread类的成员函数Run的实现如下所示:
void Thread::Run(MessageLoop* message_loop) {
message_loop->Run();
}
这个函数定义在文件external/chromium_org/base/threading/thread.cc中。
Thread类的成员函数Run调用参数message_loop指向的一个MessageLoop对象的成员函数Run使得线程进入运行状态。
MessageLoop类的成员函数Run的实现如下所示:
void MessageLoop::Run() {
RunLoop run_loop;
run_loop.Run();
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
MessageLoop类的成员函数Run在栈上创建了一个RunLoop对象,然后通过调用该RunLoop对象的成员函数Run使得线程进入运行状态。
前面提到,RunLoop的作用是用来建立消息循环的层次关系的,主要是通过它的两个成员变量previous_run_loop_和run_depth来实现,此外,它还有一个成员变量loop,用来关联它所对应的消息循环。
RunLoop类的上述三个成员变量的定义如下所示:
class BASE_EXPORT RunLoop {
public:
......
private:
......
MessageLoop* loop_;
// Parent RunLoop or NULL if this is the top-most RunLoop.
RunLoop* previous_run_loop_;
......
// Used to count how many nested Run() invocations are on the stack.
int run_depth_;
......
};
这三个成员变量定义在文件external/chromium_org/base/run_loop.h中。
它们在RunLoop类的构造函数被初始化,如下所示:
RunLoop::RunLoop()
: loop_(MessageLoop::current()),
previous_run_loop_(NULL),
run_depth_(0),
...... {
......
}
这个函数定义在文件external/chromium_org/base/run_loop.cc中。
从这里我们就可以看到,一个RunLoop关联的消息循环就是当前线程使用的消息循环。这个消息循环可以通过调用前面提到的MessageLoop类的静态成员函数current获得。
RunLoop类的成员变量previous_run_loop_和run_depth_分别被初始化为NULL和0,表示还没有建立好层次关系,但是当RunLoop类的成员函数Run被调用时,它们就会被设置,从而形成层次关系。
从前面的调用过程可以知道,RunLoop类的成员函数Run在MessageLoop类的成员函数Run中调用,它的实现如下所示:
void RunLoop::Run() {
if (!BeforeRun())
return;
loop_->RunHandler();
AfterRun();
}
这个函数定义在文件external/chromium_org/base/run_loop.cc中。
在调用成员变量loop_指向的一个MessageLoop对象的成员函数RunHandler进入消息循环前后,RunLoop类的成员函数Run分别调用了BeforeRun和AfterRun两个成员函数,目的就是为了建立好消息循环的层次关系,它们的实现如下所示:
bool RunLoop::BeforeRun() {
DCHECK(!run_called_);
run_called_ = true;
// Allow Quit to be called before Run.
if (quit_called_)
return false;
// Push RunLoop stack:
previous_run_loop_ = loop_->run_loop_;
run_depth_ = previous_run_loop_? previous_run_loop_->run_depth_ + 1 : 1;
loop_->run_loop_ = this;
running_ = true;
return true;
}
void RunLoop::AfterRun() {
running_ = false;
// Pop RunLoop stack:
loop_->run_loop_ = previous_run_loop_;
// Execute deferred QuitNow, if any:
if (previous_run_loop_ && previous_run_loop_->quit_called_)
loop_->QuitNow();
}
这两个函数定义在文件external/chromium_org/base/run_loop.cc中。
MessageLoop类的成员变量run_loop_记录的是消息循环当前使用的Run Loop,因此,RunLoop类的成员函数BeforeRun会将当前正在处理的RunLoop对象记录在其成员变量loop_指向的一个MessageLoop对象的成员变量run_loop_中,而该MessageLoop对象的成员变量run_loop_原来指向的RunLoop对象则记录在当前正在处理的RunLoop对象的成员变量previous_run_loop_中,从而就形成一个Run Loop调用栈。此外,第一个Run Loop的Run Depth被设置为1,后面的Run Loop的Run Depth依次增加1。
从上面的分析就可以看出,RunLoop类的成员函数BeforeRun执行的是一个Run Loop入栈操作,相应地,RunLoop类的成员函数AfterRun执行的是一个Run Loop出栈操作,它将消息循环当前使用的Run Loop恢复为前一个Run Loop。
RunLoop类的成员变量running_描述的是一个Run Loop当前是否正在被消息循环使用,因此,在RunLoop类的成员函数BeforeRun和AfterRun中,它的值分别被设置为true和false。
RunLoop类的成员变量quit_called_描述的是一个Run Loop是否收到退出请求。如果一个Run Loop当前正在消息循环使用,并且又收到了退出请求,那么就将会导致消息循环退出。这样就会导致以下两种情况:
1. 一个Run Loop在即将被消息循环使用之前,就已经收到了退出请求,那么就不会被消息循环使用,表现就为在RunLoop类的成员函数BeforeRun中,如果当前正在处理的RunLoop对象的成员变量quit_called_的值等于true,那么就返回一个false值给调用者,表示当前正在处理的RunLoop对象不能够进入消息循环。
回到RunLoop类的成员函数Run中,在调用成员函数BeforeRun成功建立好消息循环的层次关系之后,就通过当前正在处理的RunLoop对象进入到下一层消息循环中,这是通过调用当前正在处理的RunLoop对象的成员变量loop_指向的一个MessageLoop对象的成员函数RunHandler实现的。从前面的分析可以知道,该MessageLoop对象描述的是就是当前线程使用的消息循环。
MessageLoop类的成员函数RunHandler的实现如下所示:
void MessageLoop::RunHandler() {
......
pump_->Run(this);
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
MessageLoop类的成员函数RunHandler通过调用成员变量pump_指向的一个MessagePump对象的成员函数Run进入消息循环。前面我们假设该MessagePump对象是一个MessagePumpDefault对象,因此接下来我们继续分析MessagePumpDefault类的成员函数Run的实现,如下所示:
void MessagePumpDefault::Run(Delegate* delegate) {
......
for (;;) {
......
bool did_work = delegate->DoWork();
if (!keep_running_)
break;
did_work |= delegate->DoDelayedWork(&delayed_work_time_);
if (!keep_running_)
break;
if (did_work)
continue;
did_work = delegate->DoIdleWork();
if (!keep_running_)
break;
if (did_work)
continue;
ThreadRestrictions::ScopedAllowWait allow_wait;
if (delayed_work_time_.is_null()) {
event_.Wait();
} else {
TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
if (delay > TimeDelta()) {
event_.TimedWait(delay);
} else {
// It looks like delayed_work_time_ indicates a time in the past, so we
// need to call DoDelayedWork now.
delayed_work_time_ = TimeTicks();
}
}
// Since event_ is auto-reset, we don't need to do anything special here
// other than service each delegate method.
}
keep_running_ = true;
}
这个函数定义在文件external/chromium_org/base/message_loop/message_pump_default.cc中。
参数delegate是一个Delegate类型的指针,但是从上面的调用过程可以知道,它指向的是一个MessageLoop对象。
MessagePumpDefault类的成员函数Run在一个for循环中不断地通过调用参数delegate指向的一个MessageLoop对象的成员函数DoWork和DoDelayedWork检查线程的消息队列是否有任务需要处理。如果没有,再调用该MessageLoop对象的成员函数DoIdleWork处理一些适用在线程空闲时进行的任务。
MessageLoop类的成员函数DoWork、DoDelayedWork和DoIdleWork的返回值均为一个布尔值。当这个布尔值等于true的时候,就表示线程处理了一些任务。在这种情况下,就需要重新执行一遍for循环,这是因为上述三个函数在处理任务的过程中,可能又往线程的消息队列发送了新的任务,因此需要for循环检查进行检查,以及新发送的任务能够得到及时处理。
另一方面,如果MessageLoop类的成员函数DoWork、DoDelayedWork和DoIdleWork的返回值均为false,那就表示线程当前实在是无事可做。这时候就不适合重新执行一遍for循环,因此这会使得线程在空转。在这种情况下,最好的方式就是让线程进入睡眠状态,以便将CPU释放出来。那么线程什么时候需要唤醒呢?
在两种情况下,线程需要从睡眠状态唤醒过来。第一种情况是线程的消息队列有新的消息加入的时候,这时候由发送消息的线程进行唤醒。第二种情况是,线程有一个延时消息需要处理,那么当系统达到该消息的处理时间时,线程就需要自动唤醒过来。
如果线程有一个延时消息需要处理,那么MessagePumpDefault类的成员变量delayed_work_time_就表示该消息在将来执行的时间点。注意,如果线程具有多个延时消息,那么MessagePumpDefault类的成员变量delayed_work_time_描述的是最早的延时点,这时候线程最多就只能睡眠到该时间点,然后自动唤醒过来。还有一点需要注意的是,如果最早的延时点小于系统的当前时间,那么线程就不可以睡眠,而要马上重新执行for循环,以便可以对已经超过了时间点处理的消息进行处理。如果线程没有延时消息需要处理,那么线程就不会设置自动唤醒时间,而是一直处理睡眠状态,直到被其它线程唤醒为止。
无论线程是通过哪一种情况下进行睡眠状态,都是通过MessagePumpDefault类的成员变量event_描述的一个WaitableEvent对象进行,即通过调用它的成员函数Wait和TimedWait进行的。
WaitableEvent是有效地实现线程消息循环的一个重要类。通过WaitableEvent类,线程可以在无消息处理时进入睡眠状态,并且在有消息处理时从睡眠状态唤醒过来,从而避免了不断地轮循消息队列是否有消息处理的操作。因为消息队列可能在大多数情况下都是空的,对它进行不断轮循将会浪费CPU周期。
因为WaitableEvent类是如此重要,因此接下来我们先分析它的实现,然后再继续分析线程处理消息的过程,也就是MessageLoop类的成员函数DoWork、DoDelayedWork和DoIdleWork的实现。
WaitableEvent类的定义如下所示:
class BASE_EXPORT WaitableEvent {
public:
......
WaitableEvent(bool manual_reset, bool initially_signaled);
......
void Signal();
......
void Wait();
......
bool TimedWait(const TimeDelta& max_time);
......
class Waiter {
public:
......
virtual bool Fire(WaitableEvent* signaling_event) = 0;
......
virtual bool Compare(void* tag) = 0;
......
};
......
struct WaitableEventKernel :
public RefCountedThreadSafe<WaitableEventKernel> {
public:
WaitableEventKernel(bool manual_reset, bool initially_signaled);
bool Dequeue(Waiter* waiter, void* tag);
base::Lock lock_;
const bool manual_reset_;
bool signaled_;
std::list<Waiter*> waiters_;
......
};
......
bool SignalAll();
bool SignalOne();
void Enqueue(Waiter* waiter);
scoped_refptr<WaitableEventKernel> kernel_;
......
};
这个类定义在文件external/chromium_org/base/synchronization/waitable_event.h中。
这里我们只讨论Android平台相关的实现。WaitableEvent类提供两个最基本的功能:Wait和Signal。Wait操作使得线程进入睡眠状态,而Signal操作使得线程从睡眠状态唤醒过来。
在WaitableEvent类中,Wait操作对应的两个成员函数为Wait和TimedWait。前者使得线程一直处理唤醒状态,直到被其它线程唤醒为止,而后者使得线程进入到睡眠状态的时间为有限时间,并且在超过该时间后,线程自动唤醒。
在WaitableEvent类中,Signal操作对应的成员函数为Signal,内部通成员函数SignalAll和SignalOne实现。前者唤醒所有的等待者,而后者只唤醒其中一个等待者。
等待者通过内部类Waiter描述,它有Fire和Compare两个成员函数。一个Waiter需要唤醒时,它的成员函数Fire就会被调用。Waiter类的成员函数Compare用来比较一个Waiter与另外一个Waiter是否相同。
一个WaitableEvent可以有若干个Waiter,这些Waiter通过WaitableEvent类的成员函数Enqueue加入到成员变量kernel_指向的一个WaitableEventKernel对象的成员变量waiters_描述的一个列表中。
WaitableEventKernel类除了上述的成员变量waiters_之外,还具有以下三个成员变量:
lock_,一个互斥锁,用来保护成员变量waiters_的并发访问。
manualreset,一个布尔变量,用来表示一个WaitableEvent被唤醒的时候,是否需要手动设置才变为Signaled状态。
signaled_,一个布尔变量,用来表示一个WaitableEvent是否处于Signaled状态。
上述三个成员变量以及成员变量waiters_都是用来描述一个WaitableEvent的状态的。为什么不将这些成员变量直接作为WaitableEvent类的成员变量呢?这是为了模拟Windows系统的HANDLE语意的。在Windows平台,一个描述WaitableEvent对象的HANDLE处理等待状态时,是可以关闭的,即可以被Close。Windows平台认为这种情况会出现未定义行为,但是不会导致程序Crash。Android平台的WaitableEvent为具有这样的语意,就将描述WaitableEvent状态的成员变量保存一个WaitableEventKernel对象中,然后通过一个scoped_refptr智能指针kernel_引用它。这样,当一个WaitableEvent被Close时,它本身是被销毁了,但是它的成员变量kernel_指向的WaitableEventKernel对象却未必会被销毁,这取决于其宿主WaitableEvent的使用情况。例如,如果这个WaitableEventKernel对象同时也被另外一个scoped_refptr智能指针引用时,由于它的引用计数大于1,那么它就不会被销毁。这意味着在我们可以有一种方式,使得一个WaitableEvent被销毁时,我们仍然可以通过其成员变量kernel_描述的WaitableEventKernel对象操作该WaitableEvent,而且可以避免程序Crash。
我们先看WaitableEvent类的构造函数的实现,如下所示:
WaitableEvent::WaitableEvent(bool manual_reset, bool initially_signaled)
: kernel_(new WaitableEventKernel(manual_reset, initially_signaled)) {
}
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_posix.cc中。
WaitableEvent的构造函数主要就是创建了一个WaitableEventKernel对象,并且保存在成员变量kernel_中。
我们接下来继续分析WaitableEvent类的成员函数Wait的实现,如下所示:
void WaitableEvent::Wait() {
bool result = TimedWait(TimeDelta::FromSeconds(-1));
......
}
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_posix.cc中。
WaitableEvent类的成员函数Wait通过调用另外一个成员函数TimedWait使得线程进入睡眠状态,并且指定进入睡眠状态的时间为-1,即无限地进入睡眠状态,直到被其它线程唤醒为止。
WaitableEvent类的成员函数TimedWait的实现如下所示:所示:
bool WaitableEvent::TimedWait(const TimeDelta& max_time) {
......
const TimeTicks end_time(TimeTicks::Now() + max_time);
const bool finite_time = max_time.ToInternalValue() >= 0;
kernel_->lock_.Acquire();
if (kernel_->signaled_) {
if (!kernel_->manual_reset_) {
// In this case we were signaled when we had no waiters. Now that
// someone has waited upon us, we can automatically reset.
kernel_->signaled_ = false;
}
kernel_->lock_.Release();
return true;
}
SyncWaiter sw;
sw.lock()->Acquire();
Enqueue(&sw);
kernel_->lock_.Release();
for (;;) {
const TimeTicks current_time(TimeTicks::Now());
if (sw.fired() || (finite_time && current_time >= end_time)) {
const bool return_value = sw.fired();
......
sw.lock()->Release();
kernel_->lock_.Acquire();
kernel_->Dequeue(&sw, &sw);
kernel_->lock_.Release();
return return_value;
}
if (finite_time) {
const TimeDelta max_wait(end_time - current_time);
sw.cv()->TimedWait(max_wait);
} else {
sw.cv()->Wait();
}
}
}
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_posix.cc中。
我们分段来阅读WaitableEvent类的成员函数TimedWait的代码。
第一段代码如下所示:
const TimeTicks end_time(TimeTicks::Now() + max_time);
const bool finite_time = max_time.ToInternalValue() >= 0;
第一行代码计算线程进入睡眠状态的结束时间,保存在变量end_time中。第二行代码判断参数max_time是否大于等于0。如果大于等于0,就意味着参数max_time描述的是一个有限的时间,即线程不能无限进入睡眠状态。在这种情况下,变量finite_time的值等于true。否则的话,变量finite_time的值等于false。
第二段代码如下所示:
kernel_->lock_.Acquire();
if (kernel_->signaled_) {
if (!kernel_->manual_reset_) {
// In this case we were signaled when we had no waiters. Now that
// someone has waited upon us, we can automatically reset.
kernel_->signaled_ = false;
}
kernel_->lock_.Release();
return true;
}
这段代码判断当前处理的WaitableEvent是否已经处于Signaled状态。如果是的话,当前线程就不需要进入睡眠状态了,因为当前线程本来就是要等待当前处理的WaitableEvent处于Signaled状态的。在这种情况下,WaitableEvent类的成员函数TimedWait就直接返回一个true值给调用者,表示已经成功地等待当前处理的WaitableEvent处于Signaled状态。
不过在返回之前,会判断当前处理的WaitableEvent在创建时是否指定了当它处于Signaled状态时,可以自动Reset为非Signaled状态。从这里我们就可以看出,一个WaitableEvent的状态可以从Signaled自动Reset为非Signaled,指的就是一个当其在Signaled状态时被执行Wait操作时,会自动变为非Signaled状态。这样在下一次执行Wait操作时,就要等到该WaitableEvent的状态变为Signaled之后,WaitableEvent类的成员函数TimedWait才会返回。
从这里我们就可以看到,WaitableEvent类的成员函数TimedWait是通过成员变量kernel_指向的一个WaitableEventKernel对象来获得Signaled状态的,从而可以避免一个WaitableEvent被销毁的时候,我们仍然可以访问它的状态,而不会引发程序Crash。
从这里我们还可以看到,访问WaitableEvent的状态需要在加锁的情况下进行,该锁由其成员变量kernel_指向的一个WaitableEventKernel对象的成员变量lock_描述。同时,WaitableEvent类的成员函数TimedWait在返回之前,需要释放该锁。
第三段代码如下所示:
SyncWaiter sw;
sw.lock()->Acquire();
Enqueue(&sw);
kernel_->lock_.Release();
这段代码在栈上创建一个SyncWaiter对象,并且通过调用成员函数Enqueue将其加入到当前正在处理的WaitableEvent的Waiter列表中,如下所示:
void WaitableEvent::Enqueue(Waiter* waiter) {
kernel_->waiters_.push_back(waiter);
}
由于要操作当前正在处理的WaitableEvent的Waiter列表,因此WaitableEvent类的成员函数Enqueue需要在加锁的情况下进行操作。
SyncWaiter是一个用来描述同步Waiter的类,所谓同步Waiter,就是说在线程进入睡眠状态这段时间,它是不会被销毁的。因此,在线程进入睡眠状态这段时间里,我们可以安全地对它进行操作。
SyncWaiter类的实现如下所示:
class SyncWaiter : public WaitableEvent::Waiter {
public:
SyncWaiter()
: fired_(false),
signaling_event_(NULL),
lock_(),
cv_(&lock_) {
}
virtual bool Fire(WaitableEvent* signaling_event) OVERRIDE {
base::AutoLock locked(lock_);
if (fired_)
return false;
fired_ = true;
signaling_event_ = signaling_event;
cv_.Broadcast();
return true;
}
WaitableEvent* signaling_event() const {
return signaling_event_;
}
virtual bool Compare(void* tag) OVERRIDE {
return this == tag;
}
bool fired() const {
return fired_;
}
void Disable() {
fired_ = true;
}
base::Lock* lock() {
return &lock_;
}
base::ConditionVariable* cv() {
return &cv_;
}
private:
bool fired_;
WaitableEvent* signaling_event_; // The WaitableEvent which woke us
base::Lock lock_;
base::ConditionVariable cv_;
};
这个类定义在文件external/chromium_org/base/synchronization/waitable_event_posix.cc中。
SyncWaiter类的核心是定义了一个互斥锁lock和一个条件变量cv,它们分别可以通过成员函数lock和cv来获得。
SyncWaiter类有一个重要的成员函数Fire,它的作用唤醒睡眠在条件变量cv_的线程,并且将成员变量fired_设置为true,用来表示成员变量signaling_event_描述的WaitableEvent已经处于Signaled状态。
我们继续分析WaitableEvent类的成员函数TimedWait的最后一段代码,如下所示:
for (;;) {
const TimeTicks current_time(TimeTicks::Now());
if (sw.fired() || (finite_time && current_time >= end_time)) {
const bool return_value = sw.fired();
......
sw.lock()->Release();
kernel_->lock_.Acquire();
kernel_->Dequeue(&sw, &sw);
kernel_->lock_.Release();
return return_value;
}
if (finite_time) {
const TimeDelta max_wait(end_time - current_time);
sw.cv()->TimedWait(max_wait);
} else {
sw.cv()->Wait();
}
}
这个for循环不断检查刚才已经加入到当前正在处理的WaitableEvent的Waiter列表的等待者sw是否已经被Fired。如果已经被Fired,那么就说明当前正在处理的WaitableEvent已经处理Signaled状态,因此就可以结束检查,并且返回了。不过在返回之前,会调用WaitableEventKernel类的成员函数Dequeue将等待者sw从当前正在处理的WaitableEvent的Waiter列表删除。
WaitableEventKernel类的成员函数Dequeue的实现如下所示:
bool WaitableEvent::WaitableEventKernel::Dequeue(Waiter* waiter, void* tag) {
for (std::list<Waiter*>::iterator
i = waiters_.begin(); i != waiters_.end(); ++i) {
if (*i == waiter && (*i)->Compare(tag)) {
waiters_.erase(i);
return true;
}
}
return false;
}
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_posix.cc中。
从这里我们可以看到,WaitableEventKernel类的成员函数Dequeue首先是在当前正在处理的WaitableEvent的Waiter列表找到参数waiter描述的Waiter,然后再将其从列表中删除。
在当前正在处理的WaitableEvent的Waiter列表中查找参数waiter描述的Waiter,不仅要对比列表的Waiter与参数waiter描述的Waiter的地址是否相等,还要进一步以参数tag为参数,调用前者的成员函数Compare,只有当该成员函数返回值等于true时,才会将参数waiter描述的Waiter从列表中删除。之所以要这样做,是与后面我们分析的异步Waiter有关的。这一点我们后面再详细分析。
回到WaitableEvent类的成员函数TimedWait的最后一段代码中。虽然等待者sw没有被Fired,但是WaitableEvent类的成员函数TimedWait的参数max_time指定了当前线程只可以等待有限的时候,并且这个有限时间已经过去。在这种情况下,即使等待者sw没有被Fired,那么WaitableEvent类的成员函数TimedWait也要返回了,不过这时候它的返回值为false。
最后,WaitableEvent类的成员函数TimedWait判断线程是否只是有限地进入睡眠状态,即判断变量finite_time的值是否等于true。如果等于true,那么就会通过调用等待者sw内部的条件变量cv_的成员函数TimedWait使得当前线程进入睡眠状态,并且指定最长的睡眠时间为max_wait。
另一方面,如果WaitableEvent类的成员函数TimedWait判断线程需要无限地进入睡眠状态,那么就会通过调用等待者sw内部的条件变量cv_的成员函数Timed使得当前线程进入无限睡眠状态,直到被其它线程唤醒为止。
我们最后继续分析WaitableEvent类的成员函数Signal的实现,如下所示:
void WaitableEvent::Signal() {
base::AutoLock locked(kernel_->lock_);
if (kernel_->signaled_)
return;
if (kernel_->manual_reset_) {
SignalAll();
kernel_->signaled_ = true;
} else {
// In the case of auto reset, if no waiters were woken, we remain
// signaled.
if (!SignalOne())
kernel_->signaled_ = true;
}
}
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_posix.cc中。
如果当前正在处理的WaitableEvent已经处于Signaled状态,那么WaitableEvent类的成员函数Signal就不需要再次将其修改为Signaled状态并且唤醒等待者了。否则的话,就继续往前执行。
如果当前正在处理的WaitableEvent的Signaled状态不可自动Reset为非Signaled状态,那么就调用成员函数SignalAll唤醒所有的等待者,并且保存当前正在处理的WaitableEvent的状态为Signaled状态。
如果当前正在处理的WaitableEvent的Signaled状态可以自动Reset为非Signaled状态,那么就调用成员函数SignalOne唤醒其中的一个等待者。但是如果一个等待者都没有被唤醒,那么就会继续保持当前正在处理的WaitableEvent的状态为Signaled状态。
WaitableEvent类的成员函数SignalAll的实现如下所示:
bool WaitableEvent::SignalAll() {
bool signaled_at_least_one = false;
for (std::list<Waiter*>::iterator
i = kernel_->waiters_.begin(); i != kernel_->waiters_.end(); ++i) {
if ((*i)->Fire(this))
signaled_at_least_one = true;
}
kernel_->waiters_.clear();
return signaled_at_least_one;
}
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_posix.cc中。
WaitableEvent类的成员函数SignalAll对当前正在处理的WaitableEvent的Waiter列表中的每一个Waiter,都调用其成员函数Fire,使得它们可以唤醒相应的线程。
最后,WaitableEvent类的成员函数SignalAll会清空当前正在处理的WaitableEvent的Waiter列表,并且在至少唤醒一个Waiter的情况下,返回一个true值给调用者,否则就返回false。
WaitableEvent类的成员函数SignalOne的实现如下所示:
bool WaitableEvent::SignalOne() {
for (;;) {
if (kernel_->waiters_.empty())
return false;
const bool r = (*kernel_->waiters_.begin())->Fire(this);
kernel_->waiters_.pop_front();
if (r)
return true;
}
}
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_posix.cc中。
WaitableEvent类的成员函数SignalOne从当前正在处理的WaitableEvent的Waiter列表中的第一个Waiter开始,依次调用它们的成员函数Fire。只要其中的某一个Waiter的成员函数Fire的返回值为true,那么就会停止遍历Waiter列表,并且返回一个true值给调用者。如果没有一个Waiter的成员函数Fire返回值为true,那么WaitableEvent类的成员函数SignalOne的返回值就为false。注意,每一个被遍历过的Waiter,不管它的成员函数Fire的返回值是什么,它都会从Waiter列表删除。
以上就是WaitableEvent通过同步等待者(SyncWaiter)实现的Wait和Signal操作。WaitableEvent还可以通过异步等待者实现异步的Wait和Signal操作。为了理解这种异步Wait和Signal操作,我们先看一个例子,如下所示:
class MyClass {
public:
void DoStuffWhenSignaled(WaitableEvent *waitable_event) {
watcher_.StartWatching(waitable_event,
base::Bind(&MyClass::OnWaitableEventSignaled, this);
}
private:
void OnWaitableEventSignaled(WaitableEvent* waitable_event) {
// OK, time to do stuff!
}
base::WaitableEventWatcher watcher_;
};
当我们调用MyClass类的成员函数DoStuffWhenSignaled的时候,表示希望在参数waitable_event描述的一个WaitableEvent处于Signaled状态时,可以调用MyClass类的另外一个成员函数OnWaitableEventSignaled干点其它事情,这是通过成员变量watcher_描述的一个WaitableEventWatcher对象的成员函数StartWatching实现的。
也就是说,通过WaitableEventWatcher类,我们可以监控一个WaitableEvent,使得它处于Signaled状态时,获得通知。
WaitableEventWatcher的定义如下所示:
class BASE_EXPORT WaitableEventWatcher
: public MessageLoop::DestructionObserver {
public:
typedef Callback<void(WaitableEvent*)> EventCallback;
WaitableEventWatcher();
virtual ~WaitableEventWatcher();
bool StartWatching(WaitableEvent* event, const EventCallback& callback);
void StopWatching();
......
private:
......
MessageLoop* message_loop_;
scoped_refptr<Flag> cancel_flag_;
AsyncWaiter* waiter_;
base::Closure internal_callback_;
scoped_refptr<WaitableEvent::WaitableEventKernel> kernel_;
WaitableEvent* event_;
EventCallback callback_;
};
这个类定义在文件external/chromium_org/base/synchronization/waitable_event_watcher.h。
WaitableEventWatcher类有两个重要的成员函数StartWatching和StopWatching。前者用来监控一个WaitableEvent,并且当该WaitableEvent状态变成Signaled时,调用一个EventCallback,它的实现如下所示:
bool WaitableEventWatcher::StartWatching(
WaitableEvent* event,
const EventCallback& callback) {
MessageLoop *const current_ml = MessageLoop::current();
......
cancel_flag_ = new Flag;
callback_ = callback;
internal_callback_ =
base::Bind(&AsyncCallbackHelper, cancel_flag_, callback_, event);
WaitableEvent::WaitableEventKernel* kernel = event->kernel_.get();
AutoLock locked(kernel->lock_);
event_ = event;
if (kernel->signaled_) {
if (!kernel->manual_reset_)
kernel->signaled_ = false;
// No hairpinning - we can't call the delegate directly here. We have to
// enqueue a task on the MessageLoop as normal.
current_ml->PostTask(FROM_HERE, internal_callback_);
return true;
}
message_loop_ = current_ml;
......
kernel_ = kernel;
waiter_ = new AsyncWaiter(current_ml, internal_callback_, cancel_flag_.get());
event->Enqueue(waiter_);
return true;
}
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。
WaitableEventWatcher类的成员函数StartWatching首先是获得当前线程的消息循环,最终会保存在成员变量messageloop,这样当参数event描述的WaitableEvent状态变为Signaled时,将参数callback描述的一个EventCallback发送到该消息循环去,然后在当前线程中执行。
接下来,WaitableEventWatcher类的成员函数StartWatching创建了一个Flag对象,并且保存在成员变量cancel_flag_中。这个Flag对象的作用是用来处理在异步等待WaitableEvent状态变为Signaled的过程中出现的异常情况的。由于是异步等待,因此就有可能要监控的WaitableEvent的状态还没有变为Signaled,进行监控的WaitableEventWatcher就已经被销毁。
例如,在前面举的例子中,我们创建一个MyClass对象,然后调用它的成员函数DoStuffWhenSignaled对一个WaitableEvent进行监控。但是有可能该WaitableEvent的状态还没有变为Signaled,前面创建的MyClass对象就被销毁。这意味着它内部通过成员变量watcher_描述的WaitableEventWatcher对象也会被销毁。在这种情况下,如果要监控的WaitableEvent状态变为Signaled,我们必须要保证已经被销毁的MyClass对象的成员函数OnWaitableEventSignaled不会被调用,否则的话就会出错了。
为了能够正确处理上述的异常情况,就必须要给一个WaitableEventWatcher关联一个生命周期更长的Flag对象,该Flag对象在要监控的WaitableEvent状态变为Signaled之前,不会被销毁。
Flag类的实现如下所示:
class Flag : public RefCountedThreadSafe<Flag> {
public:
Flag() { flag_ = false; }
void Set() {
AutoLock locked(lock_);
flag_ = true;
}
bool value() const {
AutoLock locked(lock_);
return flag_;
}
private:
friend class RefCountedThreadSafe<Flag>;
~Flag() {}
mutable Lock lock_;
bool flag_;
DISALLOW_COPY_AND_ASSIGN(Flag);
};
这个类定义在文件external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。
当一个WaitableEventWatcher被销毁时,与它关联的Flag对象的成员变量flag_的值就会被设置true,这意味着当监控的WatiableEvent状态变为Signaled时,不需要执行之前指定的一个EventCallback。
回到WaitableEventWatcher类的成员函数StartWatching中,接下来它创建一个Closure对象,并且保存在成员变量internal_callback_中,该Closure对象绑定的函数为AsyncCallbackHelper,并且当它被调用时,传递给它的参数前面创建的Flag对象,以及参数callback和event描述的EventCallback对象和WaitableEvent对象。这样我们就可以推断出,当参数event描述的WaitableEvent对象状态变为Signaled时,会通过函数AsyncCallbackHelper来间接地执行参数event描述的EventCallback对象。
WaitableEventWatcher类的成员函数StartWatching接下来获得要监控的WaitableEvent内部的一个WaitableEventKernel对象,然后通过该WaitableEventKernel对象判断要监控的WaitableEvent的状态是否已经是Signaled。如果是的话,那么就需要等待了,直接将前面创建的Closure发送到当前线程的消息循环去等待执行即可。
最后,如果要监控的WaitableEvent的状态还没有变为Signaled,那么就需要进行等待了。这是通过创建一个类型为AsyncWaiter的异步等待者,并且将它加入到要监控的WaitableEvent的Waiter列表中去实现的。这里我们就可以看到前面分析的同步等待和异步等待的区别。同步等待将一个SyncWaiter加入到一个WaitableEvent的Waiter列表去后,不能够马上返回,而是要通过一个for循环不断等待指定的WaitableEvent状态变为Signaled为止,或者直到等待的时候超出指定的时间为止。
AsyncWaiter类的实现如下所示:
class AsyncWaiter : public WaitableEvent::Waiter {
public:
AsyncWaiter(MessageLoop* message_loop,
const base::Closure& callback,
Flag* flag)
: message_loop_(message_loop),
callback_(callback),
flag_(flag) { }
virtual bool Fire(WaitableEvent* event) OVERRIDE {
// Post the callback if we haven't been cancelled.
if (!flag_->value()) {
message_loop_->PostTask(FROM_HERE, callback_);
}
// We are removed from the wait-list by the WaitableEvent itself. It only
// remains to delete ourselves.
delete this;
// We can always return true because an AsyncWaiter is never in two
// different wait-lists at the same time.
return true;
}
// See StopWatching for discussion
virtual bool Compare(void* tag) OVERRIDE {
return tag == flag_.get();
}
private:
MessageLoop *const message_loop_;
base::Closure callback_;
scoped_refptr<Flag> flag_;
};
这个类定义在文件external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。
从前面的分析可以知道,当要监控的WaitableEvent的状态变为Signaled时,前面已经已经加入到它的Waiter列表的AsyncWaiter对象的成员函数Fire就会被调用。
AsyncWaiter类的成员变量flag_指向的是一个Flag对象。前面提到,如果该Flag对象关联的WaitableEventWatcher在要监控的WaitableEvent状态还没有变为Signaled就已经被销毁,那么该Flag对象的成员变量flag_的值就会被设置为true。在这种情况下,AsyncWaiter类的成员函数Fire就不需要执行成员变量callback_描述的一个Closure。
另一方面,如果AsyncWaiter类的成员变量flag_指向的Flag对象的成员变量flag_的值保持为false,那么就需要将成员变量callback_描述的一个Closure发送到成员变量message_loop_描述的一个消息循环去执行。
从前面的分析可以知道,AsyncWaiter类的成员变量message_loop_描述的消息循环即为调用WaitableEventWatcher类的成员函数StartWatching的那个线程的消息循环,并且AsyncWaiter类的成员变量callback_描述的Closure绑定的函数为AsyncCallbackHelper,它的实现如下所示:
void AsyncCallbackHelper(Flag* flag,
const WaitableEventWatcher::EventCallback& callback,
WaitableEvent* event) {
// Runs in MessageLoop thread.
if (!flag->value()) {
// This is to let the WaitableEventWatcher know that the event has occured
// because it needs to be able to return NULL from GetWatchedObject
flag->Set();
callback.Run(event);
}
}
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。
参数flag描述的Flag对象就是前面在WaitableEventWatcher类的成员函数StartWatching中创建的Flag对象,参数evnet描述的就是要监控的WaitableEvent,而参数callback就是当参数evnet描述的WaitableEvent状态变为Signaled时要执行的一个EventCallback。
只有在参数flag描述的Flag对象的成员变量flag_的值等于false的情况下,函数AsyncCallbackHelper才会执行参数callback描述的EventCallback,并且在执行该EventCallback之前,会将参数flag描述的Flag对象的成员变量flag_的值设置为true,用来表示参数callback描述的EventCallback已经执行过了。
以上就是通过一个WaitableEventWatcher来监控一个WaitableEvent的状态变为Signaled并且获得通知的过程。前面我们还提及到一种情况,就是要监控的WaitableEvent的状态尚未变为Signaled,执行监控的WaitableEventWatcher就已经被销毁。接下来我们就继续分析这种情况是如何处理的。
当一个WaitableEventWatcher被销毁时,它的析构函数就会被调用,如下所示:
WaitableEventWatcher::~WaitableEventWatcher() {
StopWatching();
}
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。
WaitableEventWatcher类的析构函数调用另外一个成员函数StopWatching停止监控之前在成员函数StartWatching指定的WaitableEvent。
WaitableEventWatcher类的成员函数StopWatching的实现如下所示:
void WaitableEventWatcher::StopWatching() {
......
AutoLock locked(kernel_->lock_);
......
if (kernel_->Dequeue(waiter_, cancel_flag_.get())) {
....
delete waiter_;
internal_callback_.Reset();
cancel_flag_ = NULL;
return;
}
......
cancel_flag_->Set();
cancel_flag_ = NULL;
.....
}
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。
WaitableEventWatcher类的成员函数StopWatching要处理的边界情况很多,这里我们只关心其中两个最主要的情况。
第一种情况是一个WaitableEventWatcher被销毁时,它要监控的WaitableEvent的状态依然是没有变为Signaled。这时候该WaitableEventWatcher的成员变量waiter_指向的一个AsyncWaiter对象是在要监控的WaitableEvent的Waiter列表中。因此这时候调用与该WaitableEvent关联的一个WaitableEventKernel对象的成员函数Dequeue可以将它从Waiter列表中删除,即调用上述WaitableEventKernel对象的成员函数Dequeue的返回值为true。这样就可以保证以后要监控的WaitableEvent的状态变为Signaled时,当前被销毁的WaitableEventWatcher不会获得任务通知。
第二种情况是一个WaitableEventWatcher被销毁时,它要监控的WaitableEvent的状态已经变为Signaled。从前面的分析可以知道,这时候该WaitableEventWatcher的成员变量waiter_指向的一个AsyncWaiter对象已经不在要监控的WaitableEvent的Waiter列表了,并且它的成员函数Fire已经被调用。但是该AsyncWaiter对象的成员变量callback_描述的一个Closure可能还没有被调度执行,这时候就需要将与它关联的一个Flag对象的成员变量flag_的值设置为true,保证该Closure不会被执行。这个Flag对象就是WaitableEventWatcher类的成员变量cancel_flag指向的Flag对象,调用它的成员函数Set即可将它的成员变量flag_的值设置为true。
这里有一点需要注意的是,WaitableEventWatcher类的成员函数StopWatching在调用WaitableEventKernel类的成员函数Dequeue从Waiter列表中删除成员变量waiter_描述的一个AsyncWaiter时,传递的参数除了要删除的AsyncWaiter的地址外,还包括与它关联的一个Flag对象的地址。为什么不可以只传递要删除的AsyncWaiter的地址给WaitableEventKernel类的成员函数Dequeue呢?这是为了处理一种称为ABA的问题的。
当一个AsyncWaiter被Fired时,它的成员函数Fire会被调用,并且这个成员函数在执行自我销毁的操作。如果这时候恰好其它地方又创建了一个AsyncWaiter,并且这个新创建的AsyncWaiter被添加到了同样的WaitableEvent的Waiter列表中,更神奇的是这个新创建的AsyncWaiter占用的内存与前面被销毁的AsyncWaiter占用的内存是完全一样的。这样就会导致前面调用WaitableEventKernel类的成员函数Dequeue删除了一个不该删除的AsyncWaiter!
注意,这种情况只会出现在异步等待WaitableEvent状态变为Signaled的情况,对于同步等待WaitableEvent状态变为Signaled的情况是没有这样的问题的。分析为了避免这种情况,WaitableEventKernel类的成员函数Dequeue要求删除一个Waiter时,提供另外一个额外的参数tag,该参数会传递给即将被删除的Waiter的成员函数Compare。只有被删除的Waiter存在Waiter列表中,并且它的成员函数Compare的返回值也为true的情况下,WaitableEventKernel类的成员函数Dequeue才会将它从Waiter列表中删除,如下所示:
bool WaitableEvent::WaitableEventKernel::Dequeue(Waiter* waiter, void* tag) {
for (std::list<Waiter*>::iterator
i = waiters_.begin(); i != waiters_.end(); ++i) {
if (*i == waiter && (*i)->Compare(tag)) {
waiters_.erase(i);
return true;
}
}
return false;
}
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_posix.cc。
AsyncWaiter类的成员函数Compare的实现如下所示:
class AsyncWaiter : public WaitableEvent::Waiter {
public:
......
// See StopWatching for discussion
virtual bool Compare(void* tag) OVERRIDE {
return tag == flag_.get();
}
private:
......
scoped_refptr<Flag> flag_;
};
这个函数定义在文件external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。
从这里我们可以看到,只有当参数tag的值等于AsyncWaiter类的成员变量flag_引用的一个Flag对象的地址值时,AsyncWaiter类的成员函数Compare的返回值才为true。
由于AsyncWaiter类的成员变量flag_是一个scoped_refptr智能指针,它引用的Flag对象同时也被关联的WatiableEventWatcher对象通过scoped_refptr智能指针引用,这意味着当一个AsyncWaiter被销毁时,它的成员变量flag_引用的Flag对象是仍然存在的,这样通过比较它的地址值与参数tag的值,就可以区别出两个前后分配在同一块内存的AsyncWaiter对象来。
至此,我们就分析完成了WaitableEvent类是如何实现线程在无消息处理时进入睡眠状态,并且在有消息处理时从睡眠状态唤醒过来的。总结来说,就是通过条件变量来实现的。同时,我们还知道,WaitableEvent类不仅可以实现同步等待,还可以实现异步等待,前者通过配合SyncWaiter类实现,而后者通过配合AsyncWaiter类实现。异步等待,或者说异步操作,是Chromium的一大特色。正是由于使用了大量的异步操作,Chromium才能做到快速地响应用户输入,从而使得用户感觉到Chromium在打开网页的时候非常快。
回到MessagePumpDefault类的成员函数Run中,我们前面提到,每次它通过成员变量event_描述的一个WaitableEvent从睡眠状态唤醒过来之后,会依次调用参数delegate描述的一个MessageLoop对象的成员函数DoWork和DoDelayedWork处理消息队列的消息以及成员函数DoIdleWork处理一些线程空闲时任务。
在分析MessageLoop类的成员函数DoWork、DoDelayedWork和DoIdleWork之前,我们首先分析向一个线程的消息队列发送消息的过程。这是通过我们在前面提到的MessageLoop类的成员函数PostTask、PostDelayedTask、 PostNonNestableTask和PostNonNestableDelayedTask实现的。它们的定义如下所示:
void MessageLoop::PostTask(
const tracked_objects::Location& from_here,
const Closure& task) {
......
incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), true);
}
void MessageLoop::PostDelayedTask(
const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay) {
......
incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, true);
}
void MessageLoop::PostNonNestableTask(
const tracked_objects::Location& from_here,
const Closure& task) {
......
incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), false);
}
void MessageLoop::PostNonNestableDelayedTask(
const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay) {
......
incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, false);
}
这四个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
MessageLoop类的上述四个成员函数都是通过调用成员变量incoming_task_queue_描述的一个IncomingTaskQueue对象的成员函数AddToIncomingQueue来发送参数task描述的一个Closure对象到线程的消息队列去的。
IncomingTaskQueue类的成员函数AddToIncomingQueue的实现如下所示:
bool IncomingTaskQueue::AddToIncomingQueue(
const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay,
bool nestable) {
AutoLock locked(incoming_queue_lock_);
PendingTask pending_task(
from_here, task, CalculateDelayedRuntime(delay), nestable);
return PostPendingTask(&pending_task);
}
这个函数定义在文件external/chromium_org/base/message_loop/incoming_task_queue.cc中。
IncomingTaskQueue类的成员函数AddToIncomingQueue首先将参数task描述的Closure封装在一个PendingTask中,然后调用另外一个成员函数PostPendingTask将它发送到线程的消息队列中去。由于消息队列既会被发送消息的线程操作,也会被处理消息的线程操作,因此它需要在加锁的前提下进行操作。这个锁通过IncomingTaskQueue类的成员变量incoming_queue_lock_描述。
IncomingTaskQueue类的成员函数PostPendingTask的实现如下所示:
bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
......
// This should only be called while the lock is taken.
incoming_queue_lock_.AssertAcquired();
......
bool was_empty = incoming_queue_.empty();
incoming_queue_.push(*pending_task);
pending_task->task.Reset();
// Wake up the pump.
message_loop_->ScheduleWork(was_empty);
return true;
}
这个函数定义在文件external/chromium_org/base/message_loop/incoming_task_queue.cc中。
IncomingTaskQueue类的成员函数PostPendingTask首先是调用成员变量incoming_queue_lock_描述的一个锁的成员函数AssertAcquired确保该锁已经被调用者获取,因为它接下来要操作线程的消息队列。
IncomingTaskQueue类的成员函数PostPendingTask接下来要做的事情很简单,一是将参数pending_task描述的一个PendingTask保存在成员变量incoming_queue_描述的一个TaskQueue中,二是调用成员变量message_loop_描述的一个MessageLoop的成员函数ScheduleWork唤醒线程对刚才添加在TaskQueue的PendingTask进行处理。
注意,在调用MessageLoop类的成员函数ScheduleWork的时候,传递有一个参数was_empty,该参数用来描述在添加参数pending_task描述的一个PendingTask到线程的消息队列之前,线程的消息队列是否为空。如果为空,意味着线程当前处于无限睡眠状态中,因此需要主动唤醒它。如果不为空,则说明线程当前要么正在运行,要么是处于一个会自动唤醒过来的睡眠状态中。后面这种情况不需要唤醒线程的。
MessageLoop类的成员函数ScheduleWork的实现如下所示:
bool AlwaysNotifyPump(MessageLoop::Type type) {
#if defined(OS_ANDROID)
return type == MessageLoop::TYPE_UI || type == MessageLoop::TYPE_JAVA;
#else
return false;
#endif
}
......
void MessageLoop::ScheduleWork(bool was_empty) {
// The Android UI message loop needs to get notified each time
// a task is added to the incoming queue.
if (was_empty || AlwaysNotifyPump(type_))
pump_->ScheduleWork();
}
这两个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
如前所述,当参数was_empty的值等于true的时候,MessageLoop类的成员函数ScheduleWork就会调用成员变量pump_描述的一个MessagePump对象的成员函数ScheduleWork来唤醒线程。
但是对于Android平台来说,如果当前正在处理的MessageLoop关联的是一个UI线程或者Java线程,不管参数was_empty的值是否为true,都需要唤醒它们。这是因为对于Android平台来说,UI线程和Java线程在Java层有着自己的消息循环,Native层的消息循环是借助于Java层的消息循环来实现的。这意味着线程的消息循环不是由Native来管理的,也就是Native层不知道Java的消息循环的管理逻辑,它就只有每当有新的消息加入,都通知一下Java层对该消息进行处理。后面我们再详细分析Android平台的UI线程和Java线程在Native层的消息循环的实现。
前面我们假设MessageLoop类的成员变量pump_指向的是一个MessagePumpDefault对象,因此接下来MessageLoop类的成员函数ScheduleWork调用的是MessagePumpDefault类的成员函数ScheduleWork,它的实现如下所示:
void MessagePumpDefault::ScheduleWork() {
// Since this can be called on any thread, we need to ensure that our Run
// loop wakes up.
event_.Signal();
}
这个函数定义在文件external/chromium_org/base/message_loop/message_pump_default.cc中。
前面分析MessagePumpDefault类的成员函数Run的时候提到,如果一个线程当前是处于睡眠状态的,那么它就是通过调用成员变量event_描述的一个WaitableEvent的成员函数Wait或者TimedWait进入到睡眠状态的,因此现在就可以通过调用该WaitableEvent的成员函数Signal来唤醒它。这个唤醒过程可以参考前面分析的WaitableEvent类的成员函数Signal的实现。
线程被唤醒之后,如前所述,就会依次调用MessageLoop类的成员函数DoWork、DoDelayedWork和DoIdleWork。
MessageLoop类的成员函数DoWork的实现如下所示:
bool MessageLoop::DoWork() {
if (!nestable_tasks_allowed_) {
// Task can't be executed right now.
return false;
}
for (;;) {
ReloadWorkQueue();
if (work_queue_.empty())
break;
// Execute oldest task.
do {
PendingTask pending_task = work_queue_.front();
work_queue_.pop();
if (!pending_task.delayed_run_time.is_null()) {
AddToDelayedWorkQueue(pending_task);
// If we changed the topmost task, then it is time to reschedule.
if (delayed_work_queue_.top().task.Equals(pending_task.task))
pump_->ScheduleDelayedWork(pending_task.delayed_run_time);
} else {
if (DeferOrRunPendingTask(pending_task))
return true;
}
} while (!work_queue_.empty());
}
// Nothing happened.
return false;
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
在默认情况下,消息是禁止嵌套处理的,也就是说,线程在处理一个消息的过程时,不能够处理其它消息,这时候MessageLoop类的成员变量nestable_tasks_allowed_的值会被设置为false。因此,MessageLoop类的成员函数DoWork首先是判断成员变量nestable_tasks_allowed_的值是否等于false。如果等于的话,就什么也不做就返回了。
如果我们确实嵌套处理消息,那么需要通过ScopedNestableTaskAllower类临时设置线程的MessageLoop允许执行嵌套消息,即将MessageLoop类的成员变量nestable_tasks_allowed_设置为true。
ScopedNestableTaskAllower类的实现如下所示:
class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
public:
......
// Enables nestable tasks on |loop| while in scope.
class ScopedNestableTaskAllower {
public:
explicit ScopedNestableTaskAllower(MessageLoop* loop)
: loop_(loop),
old_state_(loop_->NestableTasksAllowed()) {
loop_->SetNestableTasksAllowed(true);
}
~ScopedNestableTaskAllower() {
loop_->SetNestableTasksAllowed(old_state_);
}
private:
MessageLoop* loop_;
bool old_state_;
};
......
};
这个类定义在文件external/chromium_org/base/message_loop/message_loop.h中。
ScopedNestableTaskAllower类的构造函数调用了MessageLoop类的成员函数SetNestableTasksAllowed将线程的消息循环设置为可嵌套执行消息,并且在析构函数中也是调用MessageLoop类的成员函数SetNestableTasksAllowed将线程的消息循环设置为不可嵌套执行消息。
回到MessageLoop类的成员函数DoWork中,它接下来是通过两个循环不断地处理线程的消息队列的消息,直到该消息队列为空为止。
在外层的循环中,MessageLoop类的成员函数DoWork首先是调用另外一个成员函数ReloadWorkQueue将保存在成员变量incoming_task_queue_描述的一个IncomingTaskQueue中的消息提取出来,保存在成员变量work_queue_描述的一个TaskQueue中,然后再通过内层的循环对保存在该TaskQueue的每一个消息进行处理。
MessageLoop类的成员函数ReloadWorkQueue的实现如下所示:
void MessageLoop::ReloadWorkQueue() {
......
if (work_queue_.empty())
incoming_task_queue_->ReloadWorkQueue(&work_queue_);
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
在成员变员work_queue_描述的TaskQueue为空的情况下, MessageLoop类的成员函数ReloadWorkQueue调用成员变量incoming_task_queue_描述的一个IncomingTaskQueue对象的成员函数ReloadWorkQueue将它里面消息都提取到成员变员work_queue_描述的一个TaskQueue中。
IncomingTaskQueue类的成员函数ReloadWorkQueue的实现如下所示:
void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) {
......
// Acquire all we can from the inter-thread queue with one lock acquisition.
AutoLock lock(incoming_queue_lock_);
if (!incoming_queue_.empty())
incoming_queue_.Swap(work_queue); // Constant time
......
}
这个函数定义在文件external/chromium_org/base/message_loop/incoming_task_queue.cc中。
在成员变量incoming_queue_描述的TaskQueue不为空的情况下, IncomingTaskQueue类的成员函数ReloadWorkQueue通过交换它和参数work_queue描述的TaskQueue即可将它里面的消息都提取给参数work_queue描述的TaskQueue。
回到MessageLoop类的成员函数DoWork中,它接下来就通过内层循环对已经提取到成员变量work_queue_描述的TaskQueue的消息进行处理,为了方便描述,我们将相应的代码再列出来,如下所示:
// Execute oldest task.
do {
PendingTask pending_task = work_queue_.front();
work_queue_.pop();
if (!pending_task.delayed_run_time.is_null()) {
AddToDelayedWorkQueue(pending_task);
// If we changed the topmost task, then it is time to reschedule.
if (delayed_work_queue_.top().task.Equals(pending_task.task))
pump_->ScheduleDelayedWork(pending_task.delayed_run_time);
} else {
if (DeferOrRunPendingTask(pending_task))
return true;
}
} while (!work_queue_.empty());
这段代码依次地将保存在成员变量work_queue_描述的TaskQueue中的消息提取出来。每一个消息都是通过一个PendingTask描述的。
如果提取出来的消息是一个延迟处理的消息,即对应的PendingTask对象的成员变量delayed_run_time设置的时间不为空,那么就会调用MessageLoop类的成员函数AddToDelayedWorkQueue将它添加到另外一个延迟处理的消息队列中。如果该延迟消息被添加到了延迟处理消息队列的头部,那么就意味着要修改线程的下一次进入睡眠状态的时间长度,这是因为保存在处迟处理消息队列的消息是按照延迟处理时间从小到大的顺序排序的。
MessageLoop类的成员函数AddToDelayedWorkQueue的实现如下所示:
void MessageLoop::AddToDelayedWorkQueue(const PendingTask& pending_task) {
// Move to the delayed work queue.
delayed_work_queue_.push(pending_task);
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
MessageLoop类使用的延迟处理消息队列由成员变量delayed_work_queue_描述的一个DelayedTaskQueue对象表示,当调用它的成员函数push新添加一个PendingTask时,就会根据该PendingTask延迟执行时间点将放在队列的合适位置,使得队列始终是按照延迟执行时间点从小到大的顺序排列它里面的PendingTask。
修改线程的下一次进入睡眠状态的时间长度是通过调用MessageLoop类的成员变量pump_指向的一个MessagePumpDefault对象的成员函数ScheduleDelayedWork实现的,它的实现如下所示:
void MessagePumpDefault::ScheduleDelayedWork(
const TimeTicks& delayed_work_time) {
// We know that we can't be blocked on Wait right now since this method can
// only be called on the same thread as Run, so we only need to update our
// record of how long to sleep when we do sleep.
delayed_work_time_ = delayed_work_time;
}
这个函数定义在文件external/chromium_org/base/message_loop/message_pump_default.cc中。
MessagePumpDefault类的成员函数ScheduleDelayedWork只是简单地将参数delayed_work_time描述的时间保存在成员变量delayed_work_time_中。等到MessageLoop类的成员函数DoWork执行完毕回到MessagePumpDefault类的成员函数Run时,该时间就会用来计算线程下一次要进入睡眠状态的时间长度。
最后,如果一个消息需要马上处理,那么MessageLoop类的成员函数DoWork的内层循环就会调用另外一个成员函数DeferOrRunPendingTask来对它进行处理。
MessageLoop类的成员函数DeferOrRunPendingTask的实现如下所示:
bool MessageLoop::DeferOrRunPendingTask(const PendingTask& pending_task) {
if (pending_task.nestable || run_loop_->run_depth_ == 1) {
RunTask(pending_task);
// Show that we ran a task (Note: a new one might arrive as a
// consequence!).
return true;
}
// We couldn't run the task now because we're in a nested message loop
// and the task isn't nestable.
deferred_non_nestable_work_queue_.push(pending_task);
return false;
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
参数pending_task描述的消息能够马上执行需要满足以下两个条件之一:
1. 参数pending_task描述的消息是一个可嵌套处理的消息,即对应的PendingTask对象的成员变量nestable的值等于true。
2. 参数pending_task描述的消息不是一个可嵌套处理的消息,但是线程当前运行在最外层的消息循环中,即MessageLoop类的成员变量run_loop_描述的一个RunLoop对象的成员变量run_depth_的值等于1。
如果以上两个条件都不能满足,那么就将参数pending_task描述的消息添加到成员变量deferred_non_nestable_work_queue_描述的一个TaskQueue中等待在合适的时候再处理。
如果能满足以上两个条件之一,那么就将参数pending_task描述的消息就会被MessageLoop类的成员函数RunTask进行处理,如下所示:
void MessageLoop::RunTask(const PendingTask& pending_task) {
......
// Execute the task and assume the worst: It is probably not reentrant.
nestable_tasks_allowed_ = false;
......
pending_task.task.Run();
......
nestable_tasks_allowed_ = true;
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
从这里就可以看到,MessageLoop类的成员函数RunTask在执行参数pending_task描述的消息之前,会先将成员变量nestable_tasks_allowed_的值设置为false,用来禁止线程嵌套执行其它消息,并且在执行完成参数pending_task描述的消息的之后,将成员变量nestable_tasks_allowed_的值重新设置为true。
从前面分析的MessageLoop类的成员函数PostTask、PostDelayedTask、 PostNonNestableTask和PostNonNestableDelayedTask的实现可以知道,参数pending_task描述的消息实际上是一个Closure对象,该Closure对象保存在参数pending_task指向的一个PendingTask对象的成员变量task中。从前面Chromium多线程通信的Closure机制分析一文可以知道,调用该Closure对象的成员函数Run即可执行它描述的任务。
接下来我们继续分析MessageLoop类的成员函数DoDelayedWork的实现,如下所示:
bool MessageLoop::DoDelayedWork(TimeTicks* next_delayed_work_time) {
if (!nestable_tasks_allowed_ || delayed_work_queue_.empty()) {
recent_time_ = *next_delayed_work_time = TimeTicks();
return false;
}
......
TimeTicks next_run_time = delayed_work_queue_.top().delayed_run_time;
if (next_run_time > recent_time_) {
recent_time_ = TimeTicks::Now(); // Get a better view of Now();
if (next_run_time > recent_time_) {
*next_delayed_work_time = next_run_time;
return false;
}
}
PendingTask pending_task = delayed_work_queue_.top();
delayed_work_queue_.pop();
if (!delayed_work_queue_.empty())
*next_delayed_work_time = delayed_work_queue_.top().delayed_run_time;
return DeferOrRunPendingTask(pending_task);
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
参数DoDelayedWork描述的是下一个延迟处理消息的执行时间点。
如果线程的延迟处理消息队列空,那么MessageLoop类的成员函数DoDelayedWork就只是简单地置空成员变量recent_time_和参数next_delayed_work_time描述的时间变返回了。
另一方面,与MessageLoop类的成员函数DoWork类似,MessageLoop类的成员函数DoDelayedWork也禁止处理嵌套消息,因此,当MessageLoop类的成员变量nestable_tasks_allowed_的值等于false的时候,MessageLoop类的成员函数DoDelayedWork就直接返回。
如果MessageLoop类的成员函数DoDelayedWork可以继续往下执行,那么它就检查位于延迟处理消息队列头部的消息,并且判断它的执行时间是否大于当前时间。如果是的话的,那么就说明该消息还未到时间执行,因此MessageLoop类的成员函数DoDelayedWork不会执行它,而是直接返回。
最后,如果位于延迟处理消息队列头部的消息的执行时间小于等于当前时间,那么就是时间将它从队列中取出,并且执行了。在执行之前,MessageLoop类的成员函数DoDelayedWork会获得下一个延迟处理消息的执行时间点,并且保存在参数next_delayed_work_time描述的一个TimeTicks对象,以便返回到MessagePumpDefault类的成员函数Run的时候,后者可以计算出下一次进入睡眠状态的时间长度。
延迟处理消息同样是通过我们在前面分析的MessageLoop类的成员函数DeferOrRunPendingTask来执行的,因此这里不再复述。
接下来我们继续分析MessageLoop类的成员函数DoIdleWork的实现,如下所示:
bool MessageLoop::DoIdleWork() {
if (ProcessNextDelayedNonNestableTask())
return true;
if (run_loop_->quit_when_idle_received_)
pump_->Quit();
return false;
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
MessageLoop类的成员函数DoIdleWork调用另外一个成员函数ProcessNextDelayedNonNestableTask处理那些被延迟的不能嵌套处理的消息。
如果线程没有被延迟的不能嵌套处理的消息,并且当前消息循环使用的RunLoop的成员变量quit_when_idle_received_的值被设置为true,即线程被设置在空闲时无事可做时,就会退出线程,这是通过调用成员变量pump_指向的一个MessagePumpDefault对象的成员函数Quit来实现的。
接下来我们主要分析MessageLoop类的成员函数ProcessNextDelayedNonNestableTask的实现,如下所示:
bool MessageLoop::ProcessNextDelayedNonNestableTask() {
if (run_loop_->run_depth_ != 1)
return false;
if (deferred_non_nestable_work_queue_.empty())
return false;
PendingTask pending_task = deferred_non_nestable_work_queue_.front();
deferred_non_nestable_work_queue_.pop();
RunTask(pending_task);
return true;
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
被延迟的不能嵌套处理的消息保存在MessageLoop类的成员变量deferred_non_nestable_work_queue_描述的一个TaskQueue中,这些消息只能够在最外层的消息循环中执行。因此,MessageLoop类的成员函数ProcessNextDelayedNonNestableTask首先判断线程当前是否运行在最外层的消息循环中,即判断成员变量run_loop_指向的一个RunLoop对象的成员变量run_depth_的值是否等于1。如果不等于1,那么就直接返回不往下执行了。
如果线程当前是运行在最外层的消息循环中,那么接下就从成员变量deferred_non_nestable_work_queue_描述的一个TaskQueue的头部取出一个消息,并且调用前面分析过的成员函数RunTask对它进行处理。
至此,我们就分析完成了线程的启动、围绕消息队列运行、发送消息和处理消息的过程了。其中,围绕消息队列运行这一过程是针对普通的线程的。对于Android平台的UI线程和Java线程,由于它们在Java层使用了Android系统提供的消息循环机制,因此如果我们需要在Native层使用Chromium提供的消息循环机制,就要进行特殊处理。接下来我们就继续分析如何在Android平台的UI线程和Java线程中使用Chromium提供的消息循环机制。关于Android系统提供的消息循环机制,可以参考Android应用程序消息处理机制(Looper、Handler)分析一文。
对于Android平台的UI线程和Java线程来说,它们使用的消息循环和消息泵分别是通过MessageLoopForUI类和MessagePumpForUI类描述的,这就区别于一般线程使用MessageLoop类和MesagePumpDefault来描述消息循环和消息泵。接下来我们以UI线程为例来说明它是如何使用Chromium提供的消息循环机制的。对于Java线程,原理是一样的。
当我们在Android应用程序中使用WebView的时候,会在UI线程中调用BrowserMainRunnerImpl类的成员函数Initialize执行一些初始化工作,其中就包括在Native层中创建一个类型为MessageLoopForUI的消息循环。在后面的文章中,我们分析WebView的启动过程时,就会看到这一过程。现在我们直接分析BrowserMainRunnerImpl类的成员函数Initialize的实现,如下所示:
class BrowserMainRunnerImpl : public BrowserMainRunner {
public:
......
virtual int Initialize(const MainFunctionParams& parameters) OVERRIDE {
......
if (!initialization_started_) {
initialization_started_ = true;
main_loop_.reset(new BrowserMainLoop(parameters));
......
main_loop_->EarlyInitialization();
......
main_loop_->MainMessageLoopStart();
......
}
......
int result_code = main_loop_->GetResultCode();
if (result_code > 0)
return result_code;
// Return -1 to indicate no early termination.
return -1;
}
......
};
这个函数定义在文件external/chromium_org/content/browser/browser_main_runner.cc。
BrowserMainRunnerImpl类的成员函数Initialize首先是创建了一个BrowserMainLoop对象,并且保存在成员变量main_loop_中。接下来再调用该BrowserMainLoop对象的成员函数EarlyInitialization执行早期初始化工作。这个早期初始化工作就包括了创建一个类型为MessageLoopForUI的消息循环,如下所示:
void BrowserMainLoop::EarlyInitialization() {
......
if (parts_)
parts_->PreEarlyInitialization();
......
}
这个函数定义在文件external/chromium_org/content/browser/browser_main_loop.cc中。
BrowserMainLoop类的成员变量parts_指向的是一个AwBrowserMainParts对象,这里调用它的成员函数PreEarlyInitialization创建一个类型为MessageLoopForUI的消息循环,如下所示:
void AwBrowserMainParts::PreEarlyInitialization() {
......
// Android WebView does not use default MessageLoop. It has its own
// Android specific MessageLoop. Also see MainMessageLoopRun.
DCHECK(!main_message_loop_.get());
main_message_loop_.reset(new base::MessageLoopForUI);
base::MessageLoopForUI::current()->Start();
}
这个函数定义在文件external/chromium_org/android_webview/browser/aw_browser_main_parts.cc中。
AwBrowserMainParts类的成员函数PreEarlyInitialization创建了一个MessageLoopForUI对象,接着再调用它的成员函数Start执行启动工作。
接下来我们先分析MessageLoopForUI对象的创建过程,即MessageLoopForUI类的构造函数的实现,如下所示:
class BASE_EXPORT MessageLoopForUI : public MessageLoop {
public:
MessageLoopForUI() : MessageLoop(TYPE_UI) {
}
......
};
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.h中。
MessageLoopForUI类继承了MessageLoop类。MessageLoopForUI类的构造函数在调用父类MessageLoop的构造函数时,传递进去的参数为TYPE_UI,表示要创建一个类型为TYPE_UI的消息循环。从前面的分析可以知道,MessageLoop类的构造函数会根据传递给它的参数TYPE_UI创建一个类型为MessagePumpForUI的消息泵,并且保存在成员变量pump_中。
回到AwBrowserMainParts类的成员函数PreEarlyInitialization中,当它创建了一个MessageLoopForUI对象之后,接下来就会调用它的成员函数Start执行启动工作,如下所示:
void MessageLoopForUI::Start() {
// No Histogram support for UI message loop as it is managed by Java side
static_cast<MessagePumpForUI*>(pump_.get())->Start(this);
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
MessageLoopForUI类的成员变量pump_是从父类MessageLoop继承下来的,从前面的分析可以知道,它指向的是一个MessagePumpForUI对象,这里调用它的成员函数Start执行启动工作,如下所示:
void MessagePumpForUI::Start(Delegate* delegate) {
run_loop_ = new RunLoop();
// Since the RunLoop was just created above, BeforeRun should be guaranteed to
// return true (it only returns false if the RunLoop has been Quit already).
if (!run_loop_->BeforeRun())
NOTREACHED();
DCHECK(system_message_handler_obj_.is_null());
JNIEnv* env = base::android::AttachCurrentThread();
DCHECK(env);
system_message_handler_obj_.Reset(
Java_SystemMessageHandler_create(
env, reinterpret_cast<intptr_t>(delegate)));
}
这个函数定义在文件external/chromium_org/base/message_loop/message_pump_android.cc中。
MessagePumpForUI类的成员函数Start首先是创建了一个RunLoop对象,并且调用它的成员函数BeforeRun创建最外层的消息循环,接下来调用函数Java_SystemMessageHandler_create创建了一个Java层的SystemMessageHandler对象,并且保存在成员变量system_message_handler_obj_对象。这个SystemMessageHandler对象就是Java层和Native层的消息循环之间进行通信的桥梁。
这样,Android应用程序的UI线程在Chromium中使用的消息循环就启动起来了。注意,Android应用程序的UI线程是在Java层围绕Android的消息队列运行的,因此它不能像普通的线程一样,也在Native层围绕Chromium的消息队列运行,也就是Android应用程序的UI线程不会调用MessageLoopForUI类的成员函数Run进入运行状态。
这一步执行完成之后,一直返回到BrowserMainRunnerImpl类的成员函数Initialize,它接下来调用前面创建的BrowserMainLoop对象的成员函数MainMessageLoopStart,继续执行其它的初始化工作,如下所示:
void BrowserMainLoop::MainMessageLoopStart() {
......
InitializeMainThread();
......
}
这个函数定义在文件external/chromium_org/content/browser/browser_main_loop.cc中。
BrowserMainLoop对象的成员函数MainMessageLoopStart执行的其中一个工作是调用另外一个成员函数InitializeMainThread初始化主线程,如下所示:
void BrowserMainLoop::InitializeMainThread() {
......
// Register the main thread by instantiating it, but don't call any methods.
main_thread_.reset(
new BrowserThreadImpl(BrowserThread::UI, base::MessageLoop::current()));
}
这个函数定义在文件external/chromium_org/content/browser/browser_main_loop.cc中。
BrowserMainLoop类的成员函数InitializeMainThread为主线程创建了一个BrowserThreadImpl对象,并且保存在成员变量main_thread_中。
BrowserThreadImpl对象的创建过程如下所示:
BrowserThreadImpl::BrowserThreadImpl(ID identifier,
base::MessageLoop* message_loop)
: Thread(message_loop->thread_name()), identifier_(identifier) {
set_message_loop(message_loop);
Initialize();
}
这个函数定义在文件external/chromium_org/content/browser/browser_thread_impl.cc。
从前面的调用过程可以知道,参数identifier的值为BrowserThread::UI,而参数message_loop指向的是一个MessageLoopForUI对象。
BrowserThreadImpl类是从我们前面分析过的base::Thread类继承下来的,BrowserThreadImpl类调用的成员函数set_message_loop也是从父类base::Thread继承下来的,这里调用它来为UI线程设置一个消息循环,如下所示:
class BASE_EXPORT Thread : PlatformThread::Delegate {
public:
......
protected:
......
void set_message_loop(MessageLoop* message_loop) {
message_loop_ = message_loop;
}
......
private:
......
MessageLoop* message_loop_;
......
};
这个函数定义在文件external/chromium_org/base/threading/thread.h中。
这意味Android应用程序的UI线程虽然不像其它线程一样,不能在Native层围绕Chromium的消息队列运行,但是它也像其线程一样,使用一个Thead对象来描述,并且这个Thread对象具有一个类型为MessageLoopForUI的消息循环。
返回到BrowserThreadImpl类的构造函数中,设置了Android应用程序的UI线程的消息循环之外,接下来调用另外一个成员函数Initialize执行其它的初始化工作,如下所示:
base::LazyInstance<BrowserThreadGlobals>::Leaky
g_globals = LAZY_INSTANCE_INITIALIZER;
......
void BrowserThreadImpl::Initialize() {
BrowserThreadGlobals& globals = g_globals.Get();
base::AutoLock lock(globals.lock);
......
globals.threads[identifier_] = this;
}
这个函数定义在文件external/chromium_org/content/browser/browser_thread_impl.cc中。
BrowserThreadImpl类的成员函数Initialize做的工作实际上就是将用来描述UI线程的一个BrowserThreadImpl对象注册在全局变量g_globals指向的一个BrowserThreadGlobals对象的成员变量threads描述的一个BrowserThreadImpl数组中。
有了这个BrowserThreadImpl数组之后,以后就可以通过BrowserThread类的静态成员函数PostTask、PostDelayedTask、PostNonNestableTask和PostNonNestableDelayedTask等向UI线程发送消息。以BrowserThread类的静态成员函数PostTask为例,调用它向UI线程发送消息的代码如下所示:
BrowserThread::PostTask(BrowserThread::UI, FROM_HERE, task);
BrowserThread类的静态成员函数PostTask的实现如下所示:
bool BrowserThread::PostTask(ID identifier,
const tracked_objects::Location& from_here,
const base::Closure& task) {
return BrowserThreadImpl::PostTaskHelper(
identifier, from_here, task, base::TimeDelta(), true);
}
这个函数定义在文件external/chromium_org/content/browser/browser_thread_impl.cc中。
BrowserThread类的静态成员函数PostTask调用了BrowserThreadImpl类的静态成员函数PostTaskHelper向参数identifier描述的线程的消息队列发送一个Closure。
BrowserThreadImpl类的静态成员函数PostTaskHelper的实现如下所示:
bool BrowserThreadImpl::PostTaskHelper(
BrowserThread::ID identifier,
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay,
bool nestable) {
......
base::MessageLoop* message_loop =
globals.threads[identifier] ? globals.threads[identifier]->message_loop()
: NULL;
if (message_loop) {
if (nestable) {
message_loop->PostDelayedTask(from_here, task, delay);
} else {
message_loop->PostNonNestableDelayedTask(from_here, task, delay);
}
}
.....
return !!message_loop;
}
这个函数定义在文件external/chromium_org/content/browser/browser_thread_impl.cc中。
BrowserThreadImpl类的静态成员函数PostTaskHelper首先是根据参数identifier在前面描述的全局变量g_globals指向的一个BrowserThreadGlobals对象的成员变量threads描述的一个BrowserThreadImpl数组中获得一个对应的BrowserThreadImpl对象,接着再通过调用BrowserThreadImpl对象的成员函数message_loop获得一个MessageLoop对象。有了这个MessageLoop对象之后,就可以调用它的成员函数PostDelayedTask或者PostNonNestableDelayedTask向指定的线程的消息队列发送消息了。
前面我们已经分析过MessageLoop类的成员函数PostDelayedTask和PostNonNestableDelayedTask了。从前面的分析可以知道,当它们向线程的消息队列发送了一个消息之后,最后会调用它的成员变量pump_描述的一个MessagePump对象的成员函数ScheduleWork唤醒线程,以便它可以处理新发送到消息队列的消息。
从前面的分析可以知道,对于UI线程来说,它使用的消息循环通过类MessageLoopForUI来描述,而MessageLoopForUI类的成员变量pump_指向的是一个MessagePumpForUI对象。MessagePumpForUI类是从MessagePump类继承下来的,并且重写了成员函数ScheduleWork,如下所示:
void MessagePumpForUI::ScheduleWork() {
DCHECK(!system_message_handler_obj_.is_null());
JNIEnv* env = base::android::AttachCurrentThread();
DCHECK(env);
Java_SystemMessageHandler_scheduleWork(env,
system_message_handler_obj_.obj());
}
这个函数定义在文件/external/chromium_org/base/message_loop/message_pump_android.cc中。
前面提到,MessagePumpForUI类的成员变量system_message_handler_obj_描述的是一个Java层的SystemMessageHandler对象,这里通过函数Java_SystemMessageHandler_scheduleWork调用它的成员函数scheduleWork。
Java层的SystemMessageHandler类的成员函数scheduleWork的实现如下所示:
class SystemMessageHandler extends Handler {
......
@SuppressWarnings("unused")
@CalledByNative
private void scheduleWork() {
sendEmptyMessage(SCHEDULED_WORK);
}
......
}
这个函数定义在文件external/chromium_org/base/android/java/src/org/chromium/base/SystemMessageHandler.java中。
SystemMessageHandler类的成员函数scheduleWork调用从父类Handler继承下来的成员函数sendEmptyMessage向Java层的消息队列发送一个类型为SCHEDULED_WORK的消息,该消息最终地在SystemMessageHandler类的成员函数handleMessage中得到处理,如下所示:
class SystemMessageHandler extends Handler {
......
// Native class pointer set by the constructor of the SharedClient native class.
private long mMessagePumpDelegateNative = 0;
......
private SystemMessageHandler(long messagePumpDelegateNative) {
mMessagePumpDelegateNative = messagePumpDelegateNative;
}
@Override
public void handleMessage(Message msg) {
......
nativeDoRunLoopOnce(mMessagePumpDelegateNative, mDelayedScheduledTimeTicks);
}
......
}
这个函数定义在文件external/chromium_org/base/android/java/src/org/chromium/base/SystemMessageHandler.java中。
SystemMessageHandler类的成员变量mMessagePumpDelegateNative是在构造函数中初始化的,从前面分析的MessagePumpForUI的成员函数Start可以知道,它指向的是一个Native层的MessageLoopForUI对象。
SystemMessageHandler类的成员函数handleMessage调用JNI函数nativeDoRunLoopOnce通知成员变量mMessagePumpDelegateNative描述的一个Native层的MessageLoopForUI对象,它的消息队列有新的消息需要处理。
SystemMessageHandler类的JNI函数nativeDoRunLoopOnce由Naitve层的函数Java_com_android_org_chromium_base_SystemMessageHandler_nativeDoRunLoopOnce实现,如下所示:
static void DoRunLoopOnce(JNIEnv* env, jobject jcaller,
jlong messagePumpDelegateNative,
jlong delayedScheduledTimeTicks);
__attribute__((visibility("default")))
void
Java_com_android_org_chromium_base_SystemMessageHandler_nativeDoRunLoopOnce(JNIEnv*
env, jobject jcaller,
jlong messagePumpDelegateNative,
jlong delayedScheduledTimeTicks) {
return DoRunLoopOnce(env, jcaller, messagePumpDelegateNative,
delayedScheduledTimeTicks);
}
这个函数定义在编译时自动生成的文件out/target/product/generic/obj/GYP/shared_intermediates/base/jni/SystemMessageHandler_jni.h中。
函数Java_com_android_org_chromium_base_SystemMessageHandler_nativeDoRunLoopOnce调用了另外一个函数DoRunLoopOnce通知UI线程在Native的消息队列有新的消息需要处理。
函数DoRunLoopOnce的实现如下所示:
static void DoRunLoopOnce(JNIEnv* env, jobject obj, jlong native_delegate,
jlong delayed_scheduled_time_ticks) {
base::MessagePump::Delegate* delegate =
reinterpret_cast<base::MessagePump::Delegate*>(native_delegate);
......
bool did_work = delegate->DoWork();
......
base::TimeTicks next_delayed_work_time;
did_work |= delegate->DoDelayedWork(&next_delayed_work_time);
if (!next_delayed_work_time.is_null()) {
......
if (delayed_scheduled_time_ticks == 0 ||
next_delayed_work_time < base::TimeTicks::FromInternalValue(
delayed_scheduled_time_ticks)) {
Java_SystemMessageHandler_scheduleDelayedWork(env, obj,
next_delayed_work_time.ToInternalValue(),
(next_delayed_work_time -
base::TimeTicks::Now()).InMillisecondsRoundedUp());
}
}
......
if (did_work)
return;
delegate->DoIdleWork();
}
这个函数定义在文件external/chromium_org/base/message_loop/message_pump_android.cc中。
函数DoRunLoopOnce首先是将参数native_delegate转化为一个Native层的base::MessagePump::Delegate对象。在我们这个情景中,这个base::MessagePump::Delegate对象实际上是一个MessageLoopForUI对象。
接下来函数DoRunLoopOnce执行的工作与前面分析的MessagePumpDefault类的成员函数Run是类似的,区别主要在于:
1. 函数DoRunLoopOnce只执行一次循环,而MessagePumpDefault类的成员函数Run执行的是无限循环。
2. 函数DoRunLoopOnce不能够进入睡眠状态,如果它有一个延迟处理的消息,那么需要通过函数Java_SystemMessageHandler_scheduleDelayedWork调用Java层的SystemMessageHandler类的成员函数scheduleDelayedWork来进行调度,而MessagePumpDefault类的成员函数Run可以自行进入睡眠状态来调度延迟处理消息的执行。
至此,我们就分析完成Android应用程序的UI线程是如何实现Chromium的消息循环了。接下来我们继续分析另外一个特殊的消息循环,那就是负责执行IPC的IO线程的消息循环。
在前面分析的BrowserMainRunnerImpl类的成员函数Initialize中,除了我们提到它会调用BrowserMainLoop类的成员函数EarlyInitialization和MainMessageLoopStart来执行一些与UI线程相关的工作之外,还会调用BrowserMainLoop类的成员函数CreateStartupTasks来执行其它的启动任务,如下所示:
class BrowserMainRunnerImpl : public BrowserMainRunner {
public:
......
virtual int Initialize(const MainFunctionParams& parameters) OVERRIDE {
......
if (!initialization_started_) {
......
}
main_loop_->CreateStartupTasks();
int result_code = main_loop_->GetResultCode();
if (result_code > 0)
return result_code;
// Return -1 to indicate no early termination.
return -1;
}
......
};
这个函数定义在文件external/chromium_org/content/browser/browser_main_runner.cc中。
BrowserMainLoop类的成员函数CreateStartupTasks执行的启动任务很多,这里我们只关心与IO线程相关的任务,如下所示:
void BrowserMainLoop::CreateStartupTasks() {
......
if (!startup_task_runner_.get()) {
startup_task_runner_ = make_scoped_ptr(new StartupTaskRunner(
base::Bind(&BrowserStartupComplete),
base::MessageLoop::current()->message_loop_proxy()));
......
StartupTask create_threads =
base::Bind(&BrowserMainLoop::CreateThreads, base::Unretained(this));
startup_task_runner_->AddTask(create_threads);
......
if (BrowserMayStartAsynchronously()) {
startup_task_runner_->StartRunningTasksAsync();
}
}
if (!BrowserMayStartAsynchronously()) {
......
startup_task_runner_->RunAllTasksNow();
}
}
这个函数定义在文件external/chromium_org/content/browser/browser_main_loop.cc中。
BrowserMainLoop类的成员函数CreateStartupTasks首先是会创建一个StartupTaskRunner对象,并且保存在成员变量startup_task_runner_中。这个StartupTaskRunner对象封装了当前线程的一个消息循环,因此通过它可以向当前线程的消息队列发送消息。当前线程即为Android应用程序的UI线程,因此有了这个StartupTaskRunner对象之后,接下来可以向UI线程的消息队列发送消息。
BrowserMainLoop类的成员函数CreateStartupTasks创建了一个用来创建线程的StartupTask,这个StartupTask绑定的函数为BrowserMainLoop类的成员函数CreateThreads,并且会保存在前面创建的一个StartupTaskRunner的内部等待执行。
最后,取决于Android应用程序的UI线程是使用同步还是异步方式来启动WebView,BrowserMainLoop类的成员函数CreateStartupTasks使用不同的方式来执行保存在成员变量startup_task_runner_指向的一个StartupTaskRunner对象中的StartupTask:
1. 如果是使用同步方式启动WebView,那么就调用上述StartupTaskRunner对象的成员函数RunAllTasksNow来执行保存在它里面的各个StartupTask对象的成员函数Run来执行它们。
2. 如果是使用异步方式启动WebView,那么就调用上述StartupTaskRunner对象的成员函数StartRunningTasksAsync向UI线程的消息队列发送一个消息,当该消息被处理时,再执行保存在上述StartupTaskRunner对象里面的各个StartupTask对象的成员函数Run。
无论是同步方式,还是异步方式,最终都会在UI线程调用BrowserMainLoop类的成员函数CreateThreads来创建一系列线程,如下所示:
int BrowserMainLoop::CreateThreads() {
......
base::Thread::Options default_options;
base::Thread::Options io_message_loop_options;
io_message_loop_options.message_loop_type = base::MessageLoop::TYPE_IO;
......
for (size_t thread_id = BrowserThread::UI + 1;
thread_id < BrowserThread::ID_COUNT;
++thread_id) {
scoped_ptr<BrowserProcessSubThread>* thread_to_start = NULL;
base::Thread::Options* options = &default_options;
switch (thread_id) {
......
case BrowserThread::IO:
......
thread_to_start = &io_thread_;
options = &io_message_loop_options;
break;
......
default:
NOTREACHED();
break;
}
BrowserThread::ID id = static_cast<BrowserThread::ID>(thread_id);
if (thread_to_start) {
(*thread_to_start).reset(new BrowserProcessSubThread(id));
(*thread_to_start)->StartWithOptions(*options);
}
......
}
......
return result_code_;
}
这个函数定义在文件external/chromium_org/content/browser/browser_main_loop.cc中。
BrowserMainLoop类的成员函数CreateThreads创建了很多线程,每一个线程都有专门的作用。这些线程的作用可以参考以下的枚举类型ID的定义:
class CONTENT_EXPORT BrowserThread {
public:
// An enumeration of the well-known threads.
// NOTE: threads must be listed in the order of their life-time, with each
// thread outliving every other thread below it.
enum ID {
// The main thread in the browser.
UI,
// This is the thread that interacts with the database.
DB,
// This is the thread that interacts with the file system.
FILE,
// Used for file system operations that block user interactions.
// Responsiveness of this thread affect users.
FILE_USER_BLOCKING,
// Used to launch and terminate Chrome processes.
PROCESS_LAUNCHER,
// This is the thread to handle slow HTTP cache operations.
CACHE,
// NOTE: do not add new threads here that are only used by a small number of
// files. Instead you should just use a Thread class and pass its
// MessageLoopProxy around. Named threads there are only for threads that
// are used in many places.
// This identifier does not represent a thread. Instead it counts the
// number of well-known threads. Insert new well-known threads before this
// identifier.
ID_COUNT
};
......
};
这个枚举类型定义在文件external/chromium_org/content/public/browser/browser_thread.h中。
回到BrowserMainLoop类的成员函数CreateThreads中,我们只关注IO线程的创建过程。这个IO线程使用一个BrowserProcessSubThread对象来描述,并且通过调用该BrowserProcessSubThread对象的成员函数StartWithOptions来启动。
BrowserProcessSubThread类是从BrowserThreadImpl类继承下来的。从前面的分析又可以知道,BrowserThreadImpl类又是从base::Thread类继承下来的。因此,Android应用程序中用来负责执行IPC的IO线程实际上是通过Thread类的成员函数StartWithOptions来创建,并且在创建的时候,指定创建的消息循环的类型为base::MessageLoop::TYPE_IO。
从前面分析的MessageLoop类的成员函数CreateMessagePumpForType的实现可以知道,类型为base::MessageLoop::TYPE_IO的消息循环使用的消息泵的通过类MessagePumpLibevent来描述。
MessagePumpLibevent类是从MessagePump类继承下来的,它的定义如下所示:
class BASE_EXPORT MessagePumpLibevent : public MessagePump {
public:
......
MessagePumpLibevent();
......
// MessagePump methods:
virtual void Run(Delegate* delegate) OVERRIDE;
virtual void Quit() OVERRIDE;
virtual void ScheduleWork() OVERRIDE;
virtual void ScheduleDelayedWork(const TimeTicks& delayed_work_time) OVERRIDE;
private:
......
// Libevent dispatcher. Watches all sockets registered with it, and sends
// readiness callbacks when a socket is ready for I/O.
event_base* event_base_;
// ... write end; ScheduleWork() writes a single byte to it
int wakeup_pipe_in_;
// ... read end; OnWakeup reads it and then breaks Run() out of its sleep
int wakeup_pipe_out_;
// ... libevent wrapper for read end
event* wakeup_event_;
......
};
这个类定义在文件external/chromium_org/base/message_loop/message_pump_libevent.h中。
MessagePumpLibevent类与前面分析的MessagePumpDefault类实现消息循环的最大区别是,前者通过Libevent实现线程睡眠与唤醒,而后者是通过条件变量实现线程睡眠与唤醒的。
Libevent在Android平台上实际上就是封装了由Linux内核提供的epoll机制。如果读过Android应用程序消息处理机制(Looper、Handler)分析这篇文章,Android应用程序使用的的消息循环机是基于epoll机制实现的。因此,Chromium里面的IO线程的消息循环机制与Android应用程序的消息循环机制的实现是很相似的。接下来我们就简单分析Chromium里面的IO线程的消息循环的实现,主要就是分析MessagePumpLibevent类的构造函数、成员函数Run和ScheduleWork的实现。
MessagePumpLibevent类的构造函数的实现如下所示:
MessagePumpLibevent::MessagePumpLibevent()
: .....
event_base_(event_base_new()),
wakeup_pipe_in_(-1),
wakeup_pipe_out_(-1) {
if (!Init())
NOTREACHED();
}
这个函数定义在文件external/chromium_org/base/message_loop/message_pump_libevent.cc中。
MessagePumpLibevent类的构造函数主要就是调用成员函数Init初始化Libevent,如下所示:
bool MessagePumpLibevent::Init() {
int fds[2];
if (pipe(fds)) {
......
return false;
}
......
wakeup_pipe_out_ = fds[0];
wakeup_pipe_in_ = fds[1];
wakeup_event_ = new event;
event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
OnWakeup, this);
event_base_set(event_base_, wakeup_event_);
if (event_add(wakeup_event_, 0))
return false;
return true;
}
这个函数定义在文件external/chromium_org/base/message_loop/message_pump_libevent.cc中。
MessagePumpLibevent类的成员函数Init首先是创创建一个管道,并且分别将该管道的读端和写端文件描述符保存在成员变量wakeup_pipe_out_和wakeup_pipe_in_中。
接下来,MessagePumpLibevent类的成员函数Init创建了一个Libevent里面的event,保存在成员变量wakeup_event_中,并且通过Libevent提供的函数event_set指定该event是用来监控文件描述wakeup_pipe_out_的EV_READ和EV_PERSIST事件的,同时指定上述事件发生时,就调用MessagePumpLibevent类的静态成员函数OnWakeup。这相当于是创建了一个epoll里面的epoll_event。
再接下来,MessagePumpLibevent类的成员函数Init通过Libevent提供的函数event_base_set创建了一个event_base。这相当于是通过epoll提供的函数epoll_create创建了一个epoll文件描述符。
最后,MessagePumpLibevent类的成员函数Init通过调用Libevent提供函数event_add将前面创建的event加入到前面创建的event_base里面去,以便可以对指定的IO事件进行监控。这相当于是调用了epoll提供的函数epoll_ctl。
接下来,我们继续分析MessagePumpLibevent类的成员函数Run的实现,如下所示:
void MessagePumpLibevent::Run(Delegate* delegate) {
......
scoped_ptr<event> timer_event(new event);
for (;;) {
......
bool did_work = delegate->DoWork();
if (!keep_running_)
break;
event_base_loop(event_base_, EVLOOP_NONBLOCK);
did_work |= processed_io_events_;
processed_io_events_ = false;
if (!keep_running_)
break;
did_work |= delegate->DoDelayedWork(&delayed_work_time_);
if (!keep_running_)
break;
if (did_work)
continue;
did_work = delegate->DoIdleWork();
if (!keep_running_)
break;
if (did_work)
continue;
// EVLOOP_ONCE tells libevent to only block once,
// but to service all pending events when it wakes up.
if (delayed_work_time_.is_null()) {
event_base_loop(event_base_, EVLOOP_ONCE);
} else {
TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
if (delay > TimeDelta()) {
struct timeval poll_tv;
poll_tv.tv_sec = delay.InSeconds();
poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
event_base_set(event_base_, timer_event.get());
event_add(timer_event.get(), &poll_tv);
event_base_loop(event_base_, EVLOOP_ONCE);
event_del(timer_event.get());
} else {
// It looks like delayed_work_time_ indicates a time in the past, so we
// need to call DoDelayedWork now.
delayed_work_time_ = TimeTicks();
}
}
}
}
这个函数定义在文件external/chromium_org/base/message_loop/message_pump_libevent.cc中。
与MessagePumpDefault类的成员函数Run相比,MessagePumpLibevent类的成员函数Run的执行流程是相似的,主要区别在于:
1. MessagePumpLibevent类通过调用Libevent提供的函数event_base_loop使得线程进入睡眠状态,而MessagePumpDefault类通过条件变量使得程进入睡眠状态。
2. MessagePumpLibevent除了用来监控消息队列有无新增消息之后,还用来监控指定的文件描述符的IO事件,如以下代码所示:
event_base_loop(event_base_, EVLOOP_NONBLOCK);
did_work |= processed_io_events_;
processed_io_events_ = false;
if (!keep_running_)
break;
这段代码调用Libevent提供的函数event_base_loop检查其它指定监控的文件描述是有IO事件发生。如果有发生的话,就调用它们指定的回调函数进行处理。注意,这里调用函数event_base_loop时,第二个参数指定为EVLOOP_NONBLOCK,表示在没有IO事件发生的情况下,不会阻塞当前线程的执行。
在调用函数event_base_loop时,如果第二个参数指定为EVLOOP_ONCE,则表示在没有IO事件发生的情况下,会阻塞当前线程的执行,直到有IO事件发生,或者指定的阻塞时间超时为止。这相当于是调用了epoll提供的函数epoll_wait。
接下来我们继续分析MessagePumpLibevent类的成员函数ScheduleWork的实现,如下所示:
void MessagePumpLibevent::ScheduleWork() {
// Tell libevent (in a threadsafe way) that it should break out of its loop.
char buf = 0;
int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
DCHECK(nwrite == 1 || errno == EAGAIN)
<< "[nwrite:" << nwrite << "] [errno:" << errno << "]";
}
这个函数定义在文件external/chromium_org/base/message_loop/message_pump_libevent.cc中。
MessagePumpLibevent类的成员函数ScheduleWork向成员变量wakeup_pipe_in_描述的管道写入一个字符,从前面分析的MessagePumpLibevent类的成员函数Init可以知道,这将会导致MessagePumpLibevent类的静态成员函数OnWakeup被调用,如下所示:
void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context);
DCHECK(that->wakeup_pipe_out_ == socket);
// Remove and discard the wakeup byte.
char buf;
int nread = HANDLE_EINTR(read(socket, &buf, 1));
DCHECK_EQ(nread, 1);
that->processed_io_events_ = true;
// Tell libevent to break out of inner loop.
event_base_loopbreak(that->event_base_);
}
这个函数定义在文件external/chromium_org/base/message_loop/message_pump_libevent.cc中。
MessagePumpLibevent类的静态成员函数OnWakeup首先是调用函数read将前面写入到管道的字符读取出来,并且调用Libevent提供的函数event_base_loopbreak使得MessagePumpLibevent类的成员函数Run可以从函数event_base_loop返回,以便可以去处理消息队列的消息。
前面提到,MessagePumpLibevent除了用来监控消息队列之外,还可以用来监控指定文件描述符的IO事件。这是通过调用MessagePumpLibevent类的成员函数WatchFileDescriptor实现的,如下所示:
bool MessagePumpLibevent::WatchFileDescriptor(int fd,
bool persistent,
int mode,
FileDescriptorWatcher *controller,
Watcher *delegate) {
......
int event_mask = persistent ? EV_PERSIST : 0;
if (mode & WATCH_READ) {
event_mask |= EV_READ;
}
if (mode & WATCH_WRITE) {
event_mask |= EV_WRITE;
}
scoped_ptr<event> evt(controller->ReleaseEvent());
if (evt.get() == NULL) {
// Ownership is transferred to the controller.
evt.reset(new event);
} else {
// Make sure we don't pick up any funky internal libevent masks.
int old_interest_mask = evt.get()->ev_events &
(EV_READ | EV_WRITE | EV_PERSIST);
// Combine old/new event masks.
event_mask |= old_interest_mask;
// Must disarm the event before we can reuse it.
event_del(evt.get());
if (EVENT_FD(evt.get()) != fd) {
......
return false;
}
}
// Set current interest mask and message pump for this event.
event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
// Tell libevent which message pump this socket will belong to when we add it.
if (event_base_set(event_base_, evt.get())) {
return false;
}
// Add this socket to the list of monitored sockets.
if (event_add(evt.get(), NULL)) {
return false;
}
// Transfer ownership of evt to controller.
controller->Init(evt.release());
controller->set_watcher(delegate);
controller->set_pump(this);
return true;
}
这个函数定义在文件external/chromium_org/base/message_loop/message_pump_libevent.cc中。
各个参数的含义如下所示:
1. fd:要监控其IO事件的文件描述符。
2. persistent:是否要持续监控参数mode描述的IO事件。
3. mode:具体的IO事件,例如读写事件等。
4. controller:指向一个负责接收IO事件通知的FileDescriptorWatcher对象。
5. delegate:指向一个Watcher对象,参数controller将接收到的IO事件转发给它处理。
MessagePumpLibevent类的成员函数WatchFileDescriptor首先是根据参数persistent和mode初始化好一个event_mask,接下来调用event_set设置一个代表IO监控事件的event时要用到。
MessagePumpLibevent类的成员函数WatchFileDescriptor接下来调用参数controller描述的一个FileDescriptorWatcher对象的成员函数ReleaseEvent检查其内部是否提供了一个event。如果没有提供,那么创建一个新的event,以便用来监控文件描述符fd的IO事件。如果有提供,则复用它。复用不仅仅是event对象本身,还包括该event对象原来设置的event_mask。同时,能够复用有一个前提,就是被复用的event关联的文件描述符必须要与参数fd描述的文件描述符一致。
MessagePumpLibevent类的成员函数WatchFileDescriptor接下来调用Libevent提供的函数event_set重新设置前面获得的event的属性,包括它要监控的文件描述符、要监控的具体IO事件、以及监控的IO事件发生时的回调函数等。从这里就可以看到,当文件描述符fd指定的IO事件发生时,MessagePumpLibevent类的静态成员函数OnLibeventNotification就会被调用,并且会获得参数controller指向的一个FileDescriptorWatcher对象。
MessagePumpLibevent类的成员函数WatchFileDescriptor接下来调用Libevent提供的函数event_base_set和event_add将前面已经设置好属性的event增加到成员变量event_base_描述的一个事件监控对象中去。
MessagePumpLibevent类的成员函数WatchFileDescriptor最后将用来描述IO事件监控的event、负责处理IO事件的Watcher和以及当前正在处理的一个MessagePumpLibevent对象设置到参数controller描述的一个FileDescriptorWatcher对象的内部去,以便该FileDescriptorWatcher对象在接收到IO事件通知时可以进行相应的处理。
接下来我们再看被监控的文件描述符发生指定的IO事件时的处理流程,即MessagePumpLibevent类的静态成员函数OnLibeventNotification的实现,如下所示:
void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
void* context) {
WeakPtr<FileDescriptorWatcher> controller =
static_cast<FileDescriptorWatcher*>(context)->weak_factory_.GetWeakPtr();
DCHECK(controller.get());
MessagePumpLibevent* pump = controller->pump();
pump->processed_io_events_ = true;
if (flags & EV_WRITE) {
controller->OnFileCanWriteWithoutBlocking(fd, pump);
}
// Check |controller| in case it's been deleted in
// controller->OnFileCanWriteWithoutBlocking().
if (controller.get() && flags & EV_READ) {
controller->OnFileCanReadWithoutBlocking(fd, pump);
}
}
这个函数定义在文件external/chromium_org/base/message_loop/message_pump_libevent.cc中。
从前面的分析可以知道,参数context指向的一个FileDescriptorWatcher对象,因此MessagePumpLibevent类的静态成员函数OnLibeventNotification首先是将它强制转化为一个FileDescriptorWatcher对象,并且获得一个引用了它的WeakPtr弱智能指针。关于WeakPtr弱智能指针的实现和使用方式,可以参考前面Chromium和WebKit的智能指针实现原理分析一文。
这里为什么要获得参数context指向的一个FileDescriptorWatcher对象的一个弱智能指针呢?这是因为后面调用它的成员函数OnFileCanWriteWithoutBlocking时,然后该成员函数OnFileCanWriteWithoutBlocking将IO事件分发给它内部的一个Watcher处理时,该FileDescriptorWatcher对象可能会被销毁。为了不阻止该FileDescriptorWatcher对象销毁,于是就使用WeakPtr弱智能指针引用它了。
MessagePumpLibevent类的静态成员函数OnLibeventNotification接下来调用前面获得的FileDescriptorWatcher对象的成员函数pump获得一个与它关联的MessagePumpLibevent对象,并且将它的成员变量processed_io_events_的值设置为true,表示关联的MessagePumpLibevent对象有新的IO事件需要处理。这个设置将会影响到前面分析的MessagePumpLibevent类的成员函数Run的运行,因为在这种情况下,MessagePumpLibevent类的成员函数Run不能够去处理延迟消息、也不能执行Idle Work以及进入睡眠等待状态,而是要马上重新执行一次for循环,以及检查有没有更多的需要马上处理IO事件需要处理。
MessagePumpLibevent类的静态成员函数OnLibeventNotification最后就通过参数flags检查具体发生的IO事件,并且执行相应的处理:
1. 如果发生的是写事件,那么就调用参数context指向的一个FileDescriptorWatcher对象的成员函数OnFileCanWriteWithoutBlocking进行处理。
2. 如果发生的是读事件,那么就调用参数context指向的一个FileDescriptorWatcher对象的成员函数OnFileCanReadWithoutBlocking进行处理。如上所述,前面在处理写事件的过程中,有可能参数context指向的一个FileDescriptorWatcher对象已经被销毁,因此,这里要先调用一下WeakPtr弱智能指针controller的成员函数get判断它是否真的已经被销毁。如果已经被销毁,那么就不需要调用它的成员函数OnFileCanReadWithoutBlocking了。
FileDescriptorWatcher类的成员函数OnFileCanWriteWithoutBlocking和OnFileCanReadWithoutBlocking的实现如下所示:
void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
int fd, MessagePumpLibevent* pump) {
// Since OnFileCanWriteWithoutBlocking() gets called first, it can stop
// watching the file descriptor.
if (!watcher_)
return;
pump->WillProcessIOEvent();
watcher_->OnFileCanReadWithoutBlocking(fd);
pump->DidProcessIOEvent();
}
void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
int fd, MessagePumpLibevent* pump) {
DCHECK(watcher_);
pump->WillProcessIOEvent();
watcher_->OnFileCanWriteWithoutBlocking(fd);
pump->DidProcessIOEvent();
}
这两个函数定义在文件external/chromium_org/base/message_loop/message_pump_libevent.cc中。
从这里就可以看到,FileDescriptorWatcher类的成员函数OnFileCanWriteWithoutBlocking和OnFileCanReadWithoutBlocking只是简单地将接收到的IO事件通知转发给成员变量watcher_描述的一个Watcher对象处理。在转发前后,它们也会分别调用MessagePumpLibevent类的成员函数WillProcessIOEvent和DidProcessIOEvent通知关联的MessagePumpLibevent对象将有IO事件被处理以及IO事件已处理完毕。
这里需要注意的一点是,如果一个文件描述符同时发生了读事件和写事件,那么如前所述,先处理写事件,再处理读事件。这样就有可能在处理写事件的时候,关联的FileDescriptorWatcher对象的成员变量watcher_指向的Watcher对象被销毁了,因此在处理读事件的时候,需要先判断成员变量watcher_的值是否为NULL。如果为NULL,那么就意味着它之前指向的Watcher对象被销毁了,于是就不用往下处理了。
以上就是通过MessagePumpLibevent类实现消息循环的原理,它与Android应用程序使用的消息循环的实现原理是一样的,因此这里我们并没有很深入地对它进行分析,例如没有深入到Libevent内部去分析,有兴趣的同学可以参考前面Android应用程序消息处理机制(Looper、Handler)分析一文。
IO线程的消息循环之所以要通过MessagePumpLibevent类来实现消息循环,是因为它的消息循环主要是用来监控一个负责执行IPC的UNIX Socket的,也就是说,Chromium的IPC是通过UNIX Socket进行的。这样当一个进程向另外一个进程发送消息时,就会触发使用的UNIX Socket发生IO事件,然后就会被IO线程的消息循环监控到,最后就可以得到处理。
至此,关于Chromium的线程消息循环我们就分析完毕,但是关于消息发送,还有一些特性值得进一步分析,主要是关于消息的发送接口的。前面我们分析消息发送接口都是通过MessageLoop提供的。也就是说,在往一个线程的消息队列发送消息之前,我们首先要获得这个线程的消息循环,这是通过调用Thread类的成员函数message_loop获得的,如下所示:
class BASE_EXPORT Thread : PlatformThread::Delegate {
public:
......
MessageLoop* message_loop() const { return message_loop_; }
......
};
这个函数定义在文件external/chromium_org/base/threading/thread.h中。
通过调用Thread类的成员函数message_loop直接获取线程的关联的MessageLoop对象会有一个问题,我们以后通过该MessageLoop对象发送消息时,不能保证该MessageLoop对象是有效的,因为线程有可能退出了,这会导致其关联的MessageLoop对象被销毁了。
因此,我们需要有一种机制,即使是线程退出了,我们也可以继续持有一个消息发送接口。该消息发送接口能够保证,如果线程还没有退出,那么就能正常地向它发送消息。另一方面,如果线程已经退出,那么最多就是执行一空操作,但是不会造成非法内存访问。
学习过Chromium和WebKit的智能指针实现原理分析这篇文章之后,我们很容易想到,可以通过scoped_refptr智能指针来实现这种机制。Thread类提供了一个成员函数message_loop_proxy,可以获得线程的一个消息发送代理接口,即一个MessageLoopProxy接口,如下所示:
class BASE_EXPORT Thread : PlatformThread::Delegate {
public:
......
scoped_refptr<MessageLoopProxy> message_loop_proxy() const {
return message_loop_ ? message_loop_->message_loop_proxy() : NULL;
}
......
};
这个函数定义在文件external/chromium_org/base/threading/thread.h中。
这个MessageLoopProxy接口通过scoped_refptr智能指针引用,因此就能保证它还在使用的时候,不会被销毁,即使线程已经退出,这样就能够避免非法内存访问。
接下来我们就继续分析MessageLoopProxy接口是如何实现的。从Thread类的成员函数message_loop_proxy可以知道,它返回给调用者的MessageLoopProxy接口是通过成员变量message_loop_指向的一个MessageLoop对象的成员函数message_loop_proxy获得的。
MessageLoop类的成员函数message_loop_proxy的实现如下所示:
class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
public:
......
scoped_refptr<MessageLoopProxy> message_loop_proxy() {
return message_loop_proxy_;
}
......
private:
......
scoped_refptr<internal::MessageLoopProxyImpl> message_loop_proxy_;
......
};
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.h中。
MessageLoop类的成员函数message_loop_proxy返回的是成员变量message_loop_proxy_指向的一个MessageLoopProxyImpl对象。从前面的分析可以知道,这个成员变量是在MessageLoop类的成员函数Init中初始化的,如下所示:
void MessageLoop::Init() {
......
incoming_task_queue_ = new internal::IncomingTaskQueue(this);
message_loop_proxy_ =
new internal::MessageLoopProxyImpl(incoming_task_queue_);
......
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
MessageLoop类的成员函数Init首先创建了一个用来描述线程消息队列的一个IncomingTaskQueue,然后再根据这个IncomingTaskQueue创建了一个MessageLoopProxyImpl对象,并且保存在成员变量message_loop_proxy_中。
MessageLoopProxyImpl对象的创建过程如下所示:
MessageLoopProxyImpl::MessageLoopProxyImpl(
scoped_refptr<IncomingTaskQueue> incoming_queue)
: incoming_queue_(incoming_queue),
valid_thread_id_(PlatformThread::CurrentId()) {
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop_proxy_impl.cc中。
MessageLoopProxyImpl类的构造函数主要就是将参数incoming_queue指向的一个IncomingTaskQueue对象保存在成员变量incoming_queue_中。注意,MessageLoopProxyImpl类的成员变量incoming_queue_是一个scoped_refptr智能指针,因此即使它所属的线程退出了,它所引用的IncomingTaskQueue对象仍然是存在的。
MessageLoopProxyImpl类的继承关系如图4所示:
图4 MessageLoopProxyImpl类继承图
从图4可以看到,MessageLoopProxyImpl类从TaskRunner类一路继承下来。TaskRunner类定义了PostTask和PostDelayedTask两个接口。此外,SequencedTaskRunner类又定义了PostNonNestableTask和PostNonNestableDelayedTask两个接口。MessageLoopProxyImpl类本身重写了父类TaskRunner的PostDelayedTask接口以及SequencedTaskRunner类的PostNonNestableDelayedTask接口。这样就使得MessageLoopProxyImpl类像MessageLoop类一样,具有PostTask、PostDelayedTask、PostNonNestableTask和PostNonNestableDelayedTask四个消息发送接口。
TaskRunner类的成员函数PostTask的实现如下所示:
bool TaskRunner::PostTask(const tracked_objects::Location& from_here,
const Closure& task) {
return PostDelayedTask(from_here, task, base::TimeDelta());
}
这个函数定义在文件external/chromium_org/base/task_runner.cc中。
从这里就可以看到,TaskRunner类的成员函数PostTask最终通过调用由子类MessageLoopProxyImpl重写的接口PostDelayedTask来向线程的消息队列发送消息。
SequencedTaskRunner类的成员函数PostNonNestableTask的实现如下所示:
bool SequencedTaskRunner::PostNonNestableTask(
const tracked_objects::Location& from_here,
const Closure& task) {
return PostNonNestableDelayedTask(from_here, task, base::TimeDelta());
}
这个函数定义在文件external/chromium_org/base/sequenced_task_runner.cc中。
从这里也可以看到,SequencedTaskRunner类的成员函数PostNonNestableTask最终通过调用由子类MessageLoopProxyImpl重写的接口PostNonNestableDelayedTask来向线程的消息队列发送消息。
因此,无论我们调用MessageLoopProxyImpl类的哪一个消息发送接口,最终都归结为调用PostDelayedTask和PostNonNestableDelayedTask这两个接口,它们的实现如下所示:
bool MessageLoopProxyImpl::PostDelayedTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) {
DCHECK(!task.is_null()) << from_here.ToString();
return incoming_queue_->AddToIncomingQueue(from_here, task, delay, true);
}
bool MessageLoopProxyImpl::PostNonNestableDelayedTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) {
DCHECK(!task.is_null()) << from_here.ToString();
return incoming_queue_->AddToIncomingQueue(from_here, task, delay, false);
}
这两个函数定义在文件external/chromium_org/base/message_loop/message_loop_proxy_impl.cc中。
MessageLoopProxyImpl类的成员函数PostDelayedTask和PostNonNestableDelayedTask都是通过调用成员变量imcoming_queue_指向的一个IncomingTaskQueue对象的成员函数AddToIncomingQueue向线程的消息队列发送消息。
从前面的分析可以知道,IncomingTaskQueue类的成员函数AddToIncomingQueue最终调用了另外一个成员函数PostPendingTask向线程的消息队列发送消息,如下所示:
bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
......
// This should only be called while the lock is taken.
incoming_queue_lock_.AssertAcquired();
if (!message_loop_) {
pending_task->task.Reset();
return false;
}
......
bool was_empty = incoming_queue_.empty();
incoming_queue_.push(*pending_task);
pending_task->task.Reset();
// Wake up the pump.
message_loop_->ScheduleWork(was_empty);
return true;
}
这个函数定义在文件external/chromium_org/base/message_loop/incoming_task_queue.cc中。
我们注意到,IncomingTaskQueue类的成员函数PostPendingTask在将消息添加到线程的消息队列之前,首先会判断线程的消息循环是否还存在,即判断成员变量message_loop_的值是否等于NULL。如果等于NULL,那么就说明线程已经退出了,这时候就什么也不做就返回了。
IncomingTaskQueue类的成员变量message_loop_是在构造函数中初始化的,如下所示:
IncomingTaskQueue::IncomingTaskQueue(MessageLoop* message_loop)
: message_loop_(message_loop),
...... {
}
这个函数定义在文件external/chromium_org/base/message_loop/incoming_task_queue.cc中。
现在的重点问题就是,IncomingTaskQueue类的成员变量message_loop_是什么时候被设置为NULL的呢?也就是它是怎么知道线程退出的呢?
当线程退出时,MessageLoop对象会被销毁,这时候它的析构函数会被调用,如下所示:
MessageLoop::~MessageLoop() {
......
// Tell the incoming queue that we are dying.
incoming_task_queue_->WillDestroyCurrentMessageLoop();
......
}
这个函数定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
MessageLoop类的析构函数会调用成员变量incoming_task_queue_指向的一个IncomingTaskQueue对象的成员函数WillDestroyCurrentMessageLoop通知它线程的消息循环要被销毁了。
IncomingTaskQueue类的成员函数WillDestroyCurrentMessageLoop的实现如下所示:
void IncomingTaskQueue::WillDestroyCurrentMessageLoop() {
......
AutoLock lock(incoming_queue_lock_);
message_loop_ = NULL;
}
这个函数定义在文件external/chromium_org/base/message_loop/incoming_task_queue.cc中。
这时候IncomingTaskQueue类的成员变量message_loop_就会被设置为NULL,这样以后再调用IncomingTaskQueue类的成员函数AddToIncomingQueue就再无法向线程的消息队列发送消息了。
从图4还可以看到,我们除了可以使用MessageLoopProxy接口向线程的消息队列发送消息之外,还可以通过SingleThreadTaskRunner、SequencedTaskRunner和TaskRunner接口向线程的消息队列发送消息,这一类接口统称为TaskRunner接口。
TaskRunner是一个用来执行异步任务接口,我们通过它的成员函数PostTask和PostDelayedTask等可以将一个Closure发送给一个线程或者一个线程池执行。由于TaskRunner可能会将不同的Closure交给不同的线程执行,因此不能保证交给它的Closure的执行顺序。TaskRunner唯一能保证的是它不会同步执行交给它的Closure,也就是不会直接调用Closure的成员函数Run。
SequencedTaskRunner是从TaskRunner继承下来的,但是它比TaskRunner多出一个额外的保证,就是交给它的Closure是按照一定顺序执行的,不会出现两个Closure同时执行的情况。例如,给出两个Closure1和Closure2,如果满足以下三个条件,则能保证Closure2在Closure1之后执行:
1. Closure1比Closure2先Post给SequencedTaskRunner;
2. Closure1指定的执行时间点小于等于Closure2指定的执行时间点;
3. Closure1可嵌套消息循环中执行或者Closure2不可嵌套消息循环中执行。
保证Closure2在Closure1之后执行,是说Closure1执行完成之后,才执行Closure2,而不只是说Closure1的开始执行时间点比Closure2的开始执行时间点早。
SingleThreadTaskRunner是从SequencedTaskRunner继承下来的,但是它比SequencedTaskRunner又多出一个额外的保证,就是交给它的Closure都是由同一个线程执行的,不会出现一个Closure是在一个线程执行,另一个Closure是在另外一个线程执行的情况。
我们发现一个带消息循环的线程完全能够满足SingleThreadTaskRunner接口的要求,那么我们如何获得一个线程的SingleThreadTaskRunner接口呢?
MessageLoop类有一个成员变量thread_task_runnerhandle,如下所示:
class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
......
private:
......
scoped_ptr<ThreadTaskRunnerHandle> thread_task_runner_handle_;
......
};
这个成员变量定义在文件external/chromium_org/base/message_loop/message_loop.h中。
MessageLoop类的成员变量thread_task_runner_handle_是在前面我们分析过的MessageLoop类的成员函数Init初始化的,如下所示:
void MessageLoop::Init() {
......
incoming_task_queue_ = new internal::IncomingTaskQueue(this);
message_loop_proxy_ =
new internal::MessageLoopProxyImpl(incoming_task_queue_);
thread_task_runner_handle_.reset(
new ThreadTaskRunnerHandle(message_loop_proxy_));
}
这个成员变量定义在文件external/chromium_org/base/message_loop/message_loop.cc中。
MessageLoop类的成员函数Init根据前面创建出来的MessageLoopProxyImpl对象创建了一个ThreadTaskRunnerHandle对象。
一个ThreadTaskRunnerHandle对象的创建过程如下所示:
base::LazyInstance<base::ThreadLocalPointer<ThreadTaskRunnerHandle> >
lazy_tls_ptr = LAZY_INSTANCE_INITIALIZER;
......
ThreadTaskRunnerHandle::ThreadTaskRunnerHandle(
const scoped_refptr<SingleThreadTaskRunner>& task_runner)
: task_runner_(task_runner) {
......
lazy_tls_ptr.Pointer()->Set(this);
}
这个函数定义在文件external/chromium_org/base/thread_task_runner_handle.cc中。
从图4可以知道,MessageLoopProxyImpl类是从SingleThreadTaskRunner类继承下来的,因此ThreadTaskRunnerHandle类的构造函数可以接受一个MessageLoopProxyImpl对象作为参数。
ThreadTaskRunnerHandle类的构造函数做了两件事情。第一件事情就是将参数task_runner描述的一个SingleThreadTaskRunner对象保存在成员变量task_runner_中。第二件事情就是将正在创建的ThreadTaskRunnerHandle对象保存线程局部存储变量lazy_tls_ptr中。
ThreadTaskRunnerHandle类还提供了一个静态成员函数Get,用来获得保存在线程局部存储变量lazy_tls_ptr的一个ThreadTaskRunnerHandle对象的成员变量task_runner_描述的一个SingleThreadTaskRunner对象,如下所示:
scoped_refptr<SingleThreadTaskRunner> ThreadTaskRunnerHandle::Get() {
ThreadTaskRunnerHandle* current = lazy_tls_ptr.Pointer()->Get();
......
return current->task_runner_;
}
这个函数定义在文件external/chromium_org/base/thread_task_runner_handle.cc中。
这样我们就可以获得一个带消息循环的线程的SingleThreadTaskRunner接口了,这个接口指向的实际上是一个MessageLoopProxyImpl对象,因此最终实际上是通过前面分析的MessageLoopProxyImpl接口来往线程发送消息。
从图4还可以知道,TaskRunner接口提供了一个成员函数PostTaskAndReply,如下所示:
class BASE_EXPORT TaskRunner
: public RefCountedThreadSafe<TaskRunner, TaskRunnerTraits> {
public:
......
bool PostTaskAndReply(const tracked_objects::Location& from_here,
const Closure& task,
const Closure& reply);
......
};
这个函数声明在文件external/chromium_org/base/task_runner.h中。
从TaskRunner类的成员函数PostTaskAndReply的声明可以推断出,它用来向一个目标线程请求异步执行一个任务task,并且当该任务执行完成时,向发出请求的线程发送一个reply。它实现的功能正好就是我们在前面图1所描述的线程双向异步通信机制。接下来我们就分析它是如何实现的。
TaskRunner类的成员函数PostTaskAndReply的实现如下所示:
bool TaskRunner::PostTaskAndReply(
const tracked_objects::Location& from_here,
const Closure& task,
const Closure& reply) {
return PostTaskAndReplyTaskRunner(this).PostTaskAndReply(
from_here, task, reply);
}
这个函数定义在文件external/chromium_org/base/task_runner.cc中。
TaskRunner类的成员函数PostTaskAndReply首先创建了一个PostTaskAndReplyTaskRunner对象,接着调用这个PostTaskAndReplyTaskRunner对象的成员函数 PostTaskAndReply来实现双向异步通信机制。
PostTaskAndReplyTaskRunner对象的创建过程如下所示:
PostTaskAndReplyTaskRunner::PostTaskAndReplyTaskRunner(
TaskRunner* destination) : destination_(destination) {
DCHECK(destination_);
这个函数定义在文件external/chromium_org/base/task_runner.cc中。
PostTaskAndReplyTaskRunner类的构造函数将参数destination描述的一个TaskRunner对象保存在成员变量destination_中。
PostTaskAndReplyTaskRunner类是从PostTaskAndReplyImpl类继承下来的,并且它的成员函数PostTaskAndReply也是从PostTaskAndReplyImpl类继承下来的,因此前面调用PostTaskAndReplyTaskRunner类的成员函数PostTaskAndReply实际上调用的是PostTaskAndReplyImpl类的成员函数PostTaskAndReply。
PostTaskAndReplyImpl类的成员函数PostTaskAndReply的实现如下所示:
bool PostTaskAndReplyImpl::PostTaskAndReply(
const tracked_objects::Location& from_here,
const Closure& task,
const Closure& reply) {
PostTaskAndReplyRelay* relay =
new PostTaskAndReplyRelay(from_here, task, reply);
if (!PostTask(from_here, Bind(&PostTaskAndReplyRelay::Run,
Unretained(relay)))) {
delete relay;
return false;
}
return true;
}
这个函数定义在文件external/chromium_org/base/threading/post_task_and_reply_impl.cc中。
PostTaskAndReplyImpl类的成员函数PostTaskAndReply先将参数from_here、task和reply封装在一个PostTaskAndReplyRelay对象中,然后再将调用函数Bind创建一个Closure,并且通过调用由子类PostTaskAndReplyTaskRunner实现的成员函数PostTask执行该Closure,注意,这个Closure绑定的是PostTaskAndReplyRelay类的成员函数Run。
PostTaskAndReplyRelay对象的创建过程如下所示:
class PostTaskAndReplyRelay {
public:
PostTaskAndReplyRelay(const tracked_objects::Location& from_here,
const Closure& task, const Closure& reply)
: from_here_(from_here),
origin_loop_(ThreadTaskRunnerHandle::Get()) {
task_ = task;
reply_ = reply;
}
......
private:
tracked_objects::Location from_here_;
scoped_refptr<SingleThreadTaskRunner> origin_loop_;
Closure reply_;
Closure task_;
};
这个函数定义在文件external/chromium_org/base/threading/post_task_and_reply_impl.cc中。
PostTaskAndReplyRelay类的构造函数主要就是将参数from_here、task和reply描述的对象分别保存在成员变量fromhere、task_和reply_中。另外,它还会通过我们前面分析过的ThreadTaskRunnerHandle类的静态成员函数Get获得一个SingleThreadTaskRunner对象,并且保存在成员变量origin_loop_中。注意,这个SingleThreadTaskRunner对象是从当前线程获得的,也就是调用了TaskRunner类的成员函数PostTaskAndReply的线程。
PostTaskAndReplyTaskRunner类的成员函数PostTask的实现如下所示:
bool PostTaskAndReplyTaskRunner::PostTask(
const tracked_objects::Location& from_here,
const Closure& task) {
return destination_->PostTask(from_here, task);
}
这个函数定义在文件external/chromium_org/base/task_runner.cc中。
从前面的分析可以知道,PostTaskAndReplyTaskRunner类的成员变量destination_指向的一个TaskRunner接口。假设这个TaskRunner接口描述的是一个从带消息循环的线程的获得的MessageLoopProxyImpl对象,那么PostTaskAndReplyTaskRunner类的成员函数PostTask就是通过该MessageLoopProxyImpl对象向线程的消息队列发送参数task描述的一个消息。
由于参数task指向的是一个Closure对象,并且它绑定的是PostTaskAndReplyRelay类的成员函数Run,因此当上述消息被处理时,PostTaskAndReplyRelay类的成员函数Run就会被调用。
PostTaskAndReplyRelay类的成员函数Run的实现如下所示:
class PostTaskAndReplyRelay {
public:
......
void Run() {
task_.Run();
origin_loop_->PostTask(
from_here_,
Bind(&PostTaskAndReplyRelay::RunReplyAndSelfDestruct,
base::Unretained(this)));
}
......
}
这个函数定义在文件external/chromium_org/base/threading/post_task_and_reply_impl.cc中。
PostTaskAndReplyRelay类的成员函数Run调用了成员变量task_指向的一个Closure对象的成员函数Run。从前面的分析可以知道,该Closure对象就是最初调用TaskRunner类的成员函数PostTaskAndReply所要执行的Task。
执行完成成员变量task_指向的一个Closure之后,接下来PostTaskAndReplyRelay类的成员函数Run接下来向最初调用了TaskRunner类的成员函数PostTaskAndReply的线程发送一个Closure,该Closure绑定的是当前正在处理的PostTaskAndReplyRelay对象的成员函数RunReplyAndSelfDestruct。
PostTaskAndReplyRelay类的成员函数RunReplyAndSelfDestruct的实现如下所示:
class PostTaskAndReplyRelay {
......
private:
void RunReplyAndSelfDestruct() {
DCHECK(origin_loop_->BelongsToCurrentThread());
// Force |task_| to be released before |reply_| is to ensure that no one
// accidentally depends on |task_| keeping one of its arguments alive while
// |reply_| is executing.
task_.Reset();
reply_.Run();
// Cue mission impossible theme.
delete this;
}
......
}
这个函数定义在文件external/chromium_org/base/threading/post_task_and_reply_impl.cc中。
PostTaskAndReplyRelay类的成员函数RunReplyAndSelfDestruct所做的事情就是在最初调用了TaskRunner类的成员函数PostTaskAndReply的线程中执行成员变量reply_描述的一个Closure对象。从前面的分析可以知道,该Closure对象就是最初用TaskRunner类的成员函数PostTaskAndReply时指定的第三个参数reply所描述的Closure对象。
最后,PostTaskAndReplyRelay类的成员函数RunReplyAndSelfDestruct将当前正在处理的PostTaskAndReplyRelay对象销毁掉。这样,一个双向的异步通信就执行完成了。
至此,我们就分析完成Chromium的线程消息循环和消息发送机制了。Chromium的多线程模型正是基于这种线程消息循环和消息发送机制设计和实现的,其最大的特点是一切皆异步通信,从而提高各个线程,特别是UI线程的响应性,从而让用户觉得Chromium加载网页的速度很快。
最后,我们分别对Chromium在Android平台实现的线程消息循环和消息发送作一个简要的总结。
Chromium的线程消息循环根据不同的线程具有不同的实现,具体来说,就是:
1. UI线程和Java线程的消息循环是通过Java层的消息循环实现的,也就是通过Android应用程序使用的消息循环实现的。
2. IO线程的消息循环是基于Libevent实现的,也就是通过epoll实现的,这是因为IO线程主要是用来执行IPC,而这种IPC是通过UNIX Socket实现的,这意味IO线程的消息循环主要用来监控UNIX Socket文件描述符的,因此就适合使用epoll来实现。
3. 其它类型的线程的消息循环是基于条件变量实现的。
Chromium的线程消息发送可以通过以下三种接口实现:
1. SingleThreadTaskRunner、SequencedTaskRunner和TaskRunner,这三个接口是比MessageLoopProxy和MessageLoop更一般的接口,因为它们不关心负责处理消息的线程是如何实现的。
2. MessageLoopProxy,这个接口比MessageLoop更好用,因为消息的发送者可以一直持有该接口,而不用关心该接口所关联的线程是否已经退出。
3. MessageLoop,这个接口要求使用者确保它所关联的线程是否已经退出,如果已经退出,那么是不可以使用的。
理解Chromium的线程消息循环和消息发送机制对理解Chromium的多线程模型非常重要,而在Chromium的源码里大量地使用了这些消息循环和消息处理机制。例如,我们接下来的文章中分析的Chromium多进程通信机制就是通过上面提到的IO线程实现的。敬请关注!更多的信息也可以关注老罗的新浪微博:http://weibo.com/shengyangluo。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8