详解Python的Twisted框架中reactor事件管理器的用法

1023次阅读  |  发布于5年以前

铺垫
在大量的实践中,似乎我们总是通过类似的方式来使用异步编程:

因此我们将这样的异步模式称为Reactor模式,例如在iOS开发中的Run Loop概念,实际上非常类似于Reactor loop,主线程的Run Loop监听屏幕UI事件,一旦发生UI事件则执行对应的事件处理代码,还可以通过GCD等方式产生事件至主线程执行。

2016525114543434.png \(524×364\)

上图是boost对Reactor模式的描绘,Twisted的设计就是基于这样的Reactor模式,Twisted程序就是在等待事件、处理事件的过程中不断循环。


    from twisted.internet import reactor
    reactor.run()

reactor是Twisted程序中的单例对象。

reactor
reactor是事件管理器,用于注册、注销事件,运行事件循环,当事件发生时调用回调函数处理。关于reactor有下面几个结论:

最后一条需要解释清楚。在Twisted中,reactor是Singleton(也就是单例模式),即在一个程序中只能有一个reactor,并且只要你引入它就相应地创建一个。上面引入的方式这是twisted默认使用的方法,当然了,twisted还有其它可以引入reactor的方法。例如,可以使用twisted.internet.pollreactor中的系统调用来poll来代替select方法。

若使用其它的reactor,需要在引入twisted.internet.reactor前安装它。下面是安装pollreactor的方法:


    from twisted.internet import pollreactor
    pollreactor.install()

如果你没有安装其它特殊的reactor而引入了twisted.internet.reactor,那么Twisted会根据操作系统安装默认的reactor。正因为如此,习惯性做法不要在最顶层的模块内引入reactor以避免安装默认reactor,而是在你要使用reactor的区域内安装。
下面是使用 pollreactor重写上上面的程序:


    from twited.internet import pollreactor
    pollreactor.install()
    from twisted.internet import reactor
    reactor.run()

那么reactor是如何实现单例的?来看一下from twisted.internet import reactor做了哪些事情就并明白了。

下面是twisted/internet/reactor.py的部分代码:


    # twisted/internet/reactor.py
    import sys
    del sys.modules['twisted.internet.reactor']
    from twisted.internet import default
    default.install()

注:Python中所有加载到内存的模块都放在sys.modules,它是一个全局字典。当import一个模块时首先会在这个列表中查找是否已经加载了此模块,如果加载了则只是将模块的名字加入到正在调用import的模块的命名空间中。如果没有加载则从sys.path目录中按照模块名称查找模块文件,找到后将模块载入内存,并加入到sys.modules中,并将名称导入到当前的命名空间中。

假如我们是第一次运行from twisted.internet import reactor,因为sys.modules中还没有twisted.internet.reactor,所以会运行reactory.py中的代码,安装默认的reactor。之后,如果导入的话,因为sys.modules中已存在该模块,所以会直接将sys.modules中的twisted.internet.reactor导入到当前命名空间。

default中的install:


    # twisted/internet/default.py
    def _getInstallFunction(platform):
      """
      Return a function to install the reactor most suited for the given platform.

      @param platform: The platform for which to select a reactor.
      @type platform: L{twisted.python.runtime.Platform}

      @return: A zero-argument callable which will install the selected
        reactor.
      """
      try:
        if platform.isLinux():
          try:
            from twisted.internet.epollreactor import install
          except ImportError:
            from twisted.internet.pollreactor import install
        elif platform.getType() == 'posix' and not platform.isMacOSX():
          from twisted.internet.pollreactor import install
        else:
          from twisted.internet.selectreactor import install
      except ImportError:
        from twisted.internet.selectreactor import install
      return install


    install = _getInstallFunction(platform)

很明显,default中会根据平台获取相应的install。Linux下会首先使用epollreactor,如果内核还不支持,就只能使用pollreactor。Mac平台使用pollreactor,windows使用selectreactor。每种install的实现差不多,这里我们抽取selectreactor中的install来看看。


    # twisted/internet/selectreactor.py:
    def install():
      """Configure the twisted mainloop to be run using the select() reactor.
      """
      # 单例
      reactor = SelectReactor()
      from twisted.internet.main import installReactor
      installReactor(reactor)

    # twisted/internet/main.py:
    def installReactor(reactor):
      """
      Install reactor C{reactor}.

      @param reactor: An object that provides one or more IReactor* interfaces.
      """
      # this stuff should be common to all reactors.
      import twisted.internet
      import sys
      if 'twisted.internet.reactor' in sys.modules:
        raise error.ReactorAlreadyInstalledError("reactor already installed")
      twisted.internet.reactor = reactor
      sys.modules['twisted.internet.reactor'] = reactor

在installReactor中,向sys.modules添加twisted.internet.reactor键,值就是再install中创建的单例reactor。以后要使用reactor,就会导入这个单例了。


    SelectReactor
    # twisted/internet/selectreactor.py
    @implementer(IReactorFDSet)
    class SelectReactor(posixbase.PosixReactorBase, _extraBase)

implementer表示SelectReactor实现了IReactorFDSet接口的方法,这里用到了zope.interface,它是python中的接口实现,有兴趣的同学可以去看下。

IReactorFDSet接口主要对描述符的获取、添加、删除等操作的方法。这些方法看名字就能知道意思,所以我就没有加注释。


    # twisted/internet/interfaces.py
    class IReactorFDSet(Interface):

      def addReader(reader):

      def addWriter(writer):

      def removeReader(reader):

      def removeWriter(writer):

      def removeAll():

      def getReaders():

      def getWriters():
    reactor.listenTCP()

示例中的reactor.listenTCP()注册了一个监听事件,它是父类PosixReactorBase中方法。


    # twisted/internet/posixbase.py
    @implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
    class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
                ReactorBase):

      def listenTCP(self, port, factory, backlog=50, interface=''):
        p = tcp.Port(port, factory, backlog, interface, self)
        p.startListening()
        return p

    # twisted/internet/tcp.py
    @implementer(interfaces.IListeningPort)
    class Port(base.BasePort, _SocketCloser):
      def __init__(self, port, factory, backlog=50, interface='', reactor=None):
        """Initialize with a numeric port to listen on.
        """
        base.BasePort.__init__(self, reactor=reactor)
        self.port = port
        self.factory = factory
        self.backlog = backlog
        if abstract.isIPv6Address(interface):
          self.addressFamily = socket.AF_INET6
          self._addressType = address.IPv6Address
        self.interface = interface
      ...

      def startListening(self):
        """Create and bind my socket, and begin listening on it.
         创建并绑定套接字,开始监听。

        This is called on unserialization, and must be called after creating a
        server to begin listening on the specified port.
        """
        if self._preexistingSocket is None:
          # Create a new socket and make it listen
          try:
            # 创建套接字
            skt = self.createInternetSocket()
            if self.addressFamily == socket.AF_INET6:
              addr = _resolveIPv6(self.interface, self.port)
            else:
              addr = (self.interface, self.port)
            # 绑定
            skt.bind(addr)
          except socket.error as le:
            raise CannotListenError(self.interface, self.port, le)
          # 监听
          skt.listen(self.backlog)
        else:
          # Re-use the externally specified socket
          skt = self._preexistingSocket
          self._preexistingSocket = None
          # Avoid shutting it down at the end.
          self._shouldShutdown = False

        # Make sure that if we listened on port 0, we update that to
        # reflect what the OS actually assigned us.
        self._realPortNumber = skt.getsockname()[1]

        log.msg("%s starting on %s" % (
            self._getLogPrefix(self.factory), self._realPortNumber))

        # The order of the next 5 lines is kind of bizarre. If no one
        # can explain it, perhaps we should re-arrange them.
        self.factory.doStart()
        self.connected = True
        self.socket = skt
        self.fileno = self.socket.fileno
        self.numberAccepts = 100

        # startReading调用reactor的addReader方法将Port加入读集合
        self.startReading()

整个逻辑很简单,和正常的server端一样,创建套接字、绑定、监听。不同的是将套接字的描述符添加到了reactor的读集合。那么假如有了client连接过来的话,reactor会监控到,然后触发事件处理程序。

reacotr.run()事件主循环


    # twisted/internet/posixbase.py
    @implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
    class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
                ReactorBase)

    # twisted/internet/base.py
    class _SignalReactorMixin(object):

      def startRunning(self, installSignalHandlers=True):
        """
        PosixReactorBase的父类_SignalReactorMixin和ReactorBase都有该函数,但是
        _SignalReactorMixin在前,安装mro顺序的话,会先调用_SignalReactorMixin中的。
        """
        self._installSignalHandlers = installSignalHandlers
        ReactorBase.startRunning(self)

      def run(self, installSignalHandlers=True):
        self.startRunning(installSignalHandlers=installSignalHandlers)
        self.mainLoop()

      def mainLoop(self):
        while self._started:
          try:
            while self._started:
              # Advance simulation time in delayed event
              # processors.
              self.runUntilCurrent()
              t2 = self.timeout()
              t = self.running and t2
              # doIteration是关键,select,poll,epool实现各有不同
              self.doIteration(t)
          except:
            log.msg("Unexpected error in main loop.")
            log.err()
          else:
            log.msg('Main loop terminated.')

mianLoop就是最终的主循环了,在循环中,调用doIteration方法监控读写描述符的集合,一旦发现有描述符准备好读写,就会调用相应的事件处理程序。


    # twisted/internet/selectreactor.py
    @implementer(IReactorFDSet)
    class SelectReactor(posixbase.PosixReactorBase, _extraBase):

      def __init__(self):
        """
        Initialize file descriptor tracking dictionaries and the base class.
        """
        self._reads = set()
        self._writes = set()
        posixbase.PosixReactorBase.__init__(self)

      def doSelect(self, timeout):
        """
        Run one iteration of the I/O monitor loop.

        This will run all selectables who had input or output readiness
        waiting for them.
        """
        try:
          # 调用select方法监控读写集合,返回准备好读写的描述符
          r, w, ignored = _select(self._reads,
                      self._writes,
                      [], timeout)
        except ValueError:
          # Possibly a file descriptor has gone negative?
          self._preenDescriptors()
          return
        except TypeError:
          # Something *totally* invalid (object w/o fileno, non-integral
          # result) was passed
          log.err()
          self._preenDescriptors()
          return
        except (select.error, socket.error, IOError) as se:
          # select(2) encountered an error, perhaps while calling the fileno()
          # method of a socket. (Python 2.6 socket.error is an IOError
          # subclass, but on Python 2.5 and earlier it is not.)
          if se.args[0] in (0, 2):
            # windows does this if it got an empty list
            if (not self._reads) and (not self._writes):
              return
            else:
              raise
          elif se.args[0] == EINTR:
            return
          elif se.args[0] == EBADF:
            self._preenDescriptors()
            return
          else:
            # OK, I really don't know what's going on. Blow up.
            raise

        _drdw = self._doReadOrWrite
        _logrun = log.callWithLogger
        for selectables, method, fdset in ((r, "doRead", self._reads),
                          (w,"doWrite", self._writes)):
          for selectable in selectables:
            # if this was disconnected in another thread, kill it.
            # ^^^^ --- what the !@#*? serious! -exarkun
            if selectable not in fdset:
              continue
            # This for pausing input when we're not ready for more.

            # 调用_doReadOrWrite方法
            _logrun(selectable, _drdw, selectable, method)

      doIteration = doSelect

      def _doReadOrWrite(self, selectable, method):
        try:
          # 调用method,doRead或者是doWrite,
          # 这里的selectable可能是我们监听的tcp.Port
          why = getattr(selectable, method)()
        except:
          why = sys.exc_info()[1]
          log.err()
        if why:
          self._disconnectSelectable(selectable, why, method=="doRead")

那么假如客户端有连接请求了,就会调用读集合中tcp.Port的doRead方法。


    # twisted/internet/tcp.py

    @implementer(interfaces.IListeningPort)
    class Port(base.BasePort, _SocketCloser):

      def doRead(self):
        """Called when my socket is ready for reading.
        当套接字准备好读的时候调用

        This accepts a connection and calls self.protocol() to handle the
        wire-level protocol.
        """
        try:
          if platformType == "posix":
            numAccepts = self.numberAccepts
          else:
            numAccepts = 1
          for i in range(numAccepts):
            if self.disconnecting:
              return
            try:
              # 调用accept
              skt, addr = self.socket.accept()
            except socket.error as e:
              if e.args[0] in (EWOULDBLOCK, EAGAIN):
                self.numberAccepts = i
                break
              elif e.args[0] == EPERM:
                continue
              elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
                log.msg("Could not accept new connection (%s)" % (
                  errorcode[e.args[0]],))
                break
              raise

            fdesc._setCloseOnExec(skt.fileno())
            protocol = self.factory.buildProtocol(self._buildAddr(addr))
            if protocol is None:
              skt.close()
              continue
            s = self.sessionno
            self.sessionno = s+1
            # transport初始化的过程中,会将自身假如到reactor的读集合中,那么当它准备
            # 好读的时候,就可以调用它的doRead方法读取客户端发过来的数据了
            transport = self.transport(skt, protocol, addr, self, s, self.reactor)
            protocol.makeConnection(transport)
          else:
            self.numberAccepts = self.numberAccepts+20
        except:
          log.deferr()

doRead方法中,调用accept产生了用于接收客户端数据的套接字,将套接字与transport绑定,然后把transport加入到reactor的读集合。当客户端有数据到来时,就会调用transport的doRead方法进行数据读取了。

Connection是Server(transport实例的类)的父类,它实现了doRead方法。


    # twisted/internet/tcp.py
    @implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
    class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
             _AbortingMixin):

      def doRead(self):
        try:
          # 接收数据
          data = self.socket.recv(self.bufferSize)
        except socket.error as se:
          if se.args[0] == EWOULDBLOCK:
            return
          else:
            return main.CONNECTION_LOST

        return self._dataReceived(data)

      def _dataReceived(self, data):
        if not data:
          return main.CONNECTION_DONE
        # 调用我们自定义protocol的dataReceived方法处理数据
        rval = self.protocol.dataReceived(data)
        if rval is not None:
          offender = self.protocol.dataReceived
          warningFormat = (
            'Returning a value other than None from %(fqpn)s is '
            'deprecated since %(version)s.')
          warningString = deprecate.getDeprecationWarningString(
            offender, versions.Version('Twisted', 11, 0, 0),
            format=warningFormat)
          deprecate.warnAboutFunction(offender, warningString)
        return rval

_dataReceived中调用了示例中我们自定义的EchoProtocol的dataReceived方法处理数据。

至此,一个简单的流程,从创建监听事件,到接收客户端数据就此结束了。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8