Gunicorn 源码阅读

295次阅读  |  发布于3年以前

gunicorn “Green Unicorn”,脱胎于ruby社区的Unicorn,是一个 WSGI HTTP Server。学习gunicorn后,我们可以把之前的 Bottle 程序正式部署起来。老规矩,本文分下面几个部分:

gunicorn 项目结构简介

gunicorn 源码选择的版本是 20.0.0,主要的文件及包如下:

文件 描述
app包 guincorn 的 Application (不是wsgi定义的applicaton)
http包 gunicorn 对 http协议的一些处理
workers包 gunicorn 的工作类实现 ,包括同步sync实现,线程池版本实现gthread,以及异步版本实现 geventlet,gevent等
arbiter.py guicorn 的master实现

根据gunicorn的设计特点:

Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.

gunicorn使用pre-fork 工作模型,也就是master提前fork出预定数量的work,管理worker集合。所有的request和response都由worker进程处理。

我们重点放在:gunicorn的服务实现,master-worker如何实现和协作上。

gunicorn 使用

编写测试app,可以看到这是一个符合wsgi规范的application:

# myapp.py

def app(environ, start_response):  # env 和 http 状态及头设定回调
    data = b"Hello, World!\n"
    start_response("200 OK", [
            ("Content-Type", "text/plain"),
            ("Content-Length", str(len(data)))
    ])
    return iter([data])  # 返回数据

使用4个work节点,日志级别debug的方式启动服务,加载 myapp:app

# gunicorn -w 4 --log-level debug  myapp:app
[2021-02-23 17:58:57 +0800] [50258] [DEBUG] Current configuration:  # 准备配置
...
[2021-02-23 18:01:12 +0800] [50462] [INFO] Starting gunicorn 20.0.0  # 启动gunicorn
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] Arbiter booted  # 启动master
[2021-02-23 18:01:12 +0800] [50462] [INFO] Listening at: http://127.0.0.1:8000 (50462)  # 监听端口
[2021-02-23 18:01:12 +0800] [50462] [INFO] Using worker: sync
[2021-02-23 18:01:12 +0800] [50464] [INFO] Booting worker with pid: 50464 # 启动worker
[2021-02-23 18:01:12 +0800] [50465] [INFO] Booting worker with pid: 50465
[2021-02-23 18:01:12 +0800] [50466] [INFO] Booting worker with pid: 50466
[2021-02-23 18:01:12 +0800] [50467] [INFO] Booting worker with pid: 50467
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] 4 workers

使用 curl 测试服务

# curl http://127.0.0.1:8000
Hello, World!

同时gunicorn中可以看到 worker=50465 处理了这个http请求

[2021-02-24 16:09:39 +0800] [50465] [DEBUG] GET /

运行时候,还可以通过发送信号,手动扩充work节点数

# kill -TTIN 50462

观察服务日志,会发现 master=50462 进程处理了 ttin 信号,并且扩展worker节点数到5

...
[2021-02-24 18:02:56 +0800] [50462] [INFO] Handling signal: ttin
[2021-02-24 18:02:56 +0800] [75918] [INFO] Booting worker with pid: 75918
[2021-02-24 18:02:56 +0800] [50462] [DEBUG] 5 workers

使用 Ctrl+C 关闭服务,可以看到也是 master=50462 进程处理了 int 信号,并且在关闭worker节点后关闭自己

^C[2021-02-25 15:06:54 +0800] [50462] [INFO] Handling signal: int
[2021-02-25 15:06:54 +0800] [50464] [INFO] Worker exiting (pid: 50464)
[2021-02-25 15:06:54 +0800] [50465] [INFO] Worker exiting (pid: 50465)
[2021-02-25 15:06:54 +0800] [50466] [INFO] Worker exiting (pid: 50466)
[2021-02-25 15:06:54 +0800] [50467] [INFO] Worker exiting (pid: 50467)
[2021-02-25 15:06:54 +0800] [75918] [INFO] Worker exiting (pid: 75918)
[2021-02-25 15:06:54 +0800] [50462] [INFO] Shutting down: Master

如果对gunicon的参数不了解,可以使用下面命令查看帮助

# gunicorn -h
usage: gunicorn [OPTIONS] [APP_MODULE]

optional arguments:
  -h, --help            show this help message and exit
  ...
  -w INT, --workers INT
                        The number of worker processes for handling requests. [1]

帮助使用我们熟悉的 argparse 实现。

class Setting(object):

 def add_option(self, parser):
        args = tuple(self.cli)

        help_txt = "%s [%s]" % (self.short, self.default)
        help_txt = help_txt.replace("%", "%%")

        kwargs = {
            "dest": self.name,
            "action": self.action or "store",
            "type": self.type or str,
            "default": None,
            "help": help_txt
        }
        ...
        parser.add_argument(*args, **kwargs)  # 添加选项

class Workers(Setting):  # --workers 的选项类
    name = "workers"
    section = "Worker Processes"
    cli = ["-w", "--workers"]
    meta = "INT"
    validator = validate_pos_int
    type = int
    default = int(os.environ.get("WEB_CONCURRENCY", 1))
    desc = """\
        The number of worker processes for handling requests.

        A positive integer generally in the ``2-4 x $(NUM_CORES)`` range.
        You'll want to vary this a bit to find the best for your particular
        application's work load.

        By default, the value of the ``WEB_CONCURRENCY`` environment variable.
        If it is not defined, the default is ``1``.
        """

def parser(self):
    kwargs = {
        "usage": self.usage,
        "prog": self.prog
    }
    parser = argparse.ArgumentParser(**kwargs)
    parser.add_argument("-v", "--version",
            action="version", default=argparse.SUPPRESS,
            version="%(prog)s (version " + __version__ + ")\n",
            help="show program's version number and exit")
    parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)

    keys = sorted(self.settings, key=self.settings.__getitem__)  # 动态添加参数选项
    for k in keys:
        self.settings[k].add_option(parser)

    return parser

gunicorn-application 实现

gunicorn的application主要是下面三个类实现。需要注意的是这里的application可以理解为web-server的application;bottle/flask/django等实现的是web-framework的applicaiton。前者动态加载后者,前者处理http服务,后者处理单次的http请求。

3个Application梳理后,大概的代码模版如下:

class WSGIApplication(Application)

 def __init__(self, usage=None, prog=None):
  self.do_load_config()  # 加载配置

 def do_load_config():
  ...
  cfg = self.init(parser, args, args.args)  # 初始化配置
  ...

 def init(...):
     ...
     self.app_uri = args[0]  # 获取wsgi-application参数

  def load(...):
    util.import_app(self.app_uri)  # 动态加载wsgi-application
    ...

 def run(...):
  self.load()
  Arbiter(self).run()  # 启动master,也就是Arbiter

def run():  # 运行服务
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()

if __name__ == '__main__':
    run()

application部分的实现,相对比较简单,就不再赘述。

arbiter实现

Arbiter 仲裁者,事实上的master进程核心,整理后代码模版如下:

class Arbiter(object):
 def __init__(self, app):
  self.worker_class = self.cfg.worker_class  # worker类
  self.num_workers = self.cfg.worker  # worker数量
        ...

    def start():
  self.init_signals()  # 初始化信号监听
  ...
  sock.create_socket(...) # 创建socket服务

    def run(self):  
  self.start()
  try:
            self.manage_workers() # 启动节点

            while True: #  无限循环
                ...
                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    self.sleep()  # 持续休眠
                    self.murder_workers()
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue
           # 处理信号
                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                ...
                handler()
                self.wakeup()  # 唤醒
        except (StopIteration, KeyboardInterrupt):
           ...

在了解Arbiter工作前先了解一下信号, linux 系统可以使用下面命令查看信号清单

# kill -l
 1) SIGHUP  2) SIGINT  3) SIGQUIT  4) SIGILL  5) SIGTRAP
 6) SIGABRT  7) SIGBUS  8) SIGFPE  9) SIGKILL 10) SIGUSR1
 11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
...

信号是操作系统提供的事件,可以用来进行跨进程的通信。Arbiter.init_signals 做的工作如下:

SIGNALS = [getattr(signal, "SIG%s" % x)
               for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]

def init_signals(self):
    ...

    # initialize all signals
    for s in self.SIGNALS:
        signal.signal(s, self.signal)
    signal.signal(signal.SIGCHLD, self.handle_chld)  # 添加信号监听器

def signal(self, sig, frame):
 if len(self.SIG_QUEUE) < 5:
   self.SIG_QUEUE.append(sig)
   self.wakeup()

之前演示的扩容信号 TTIN 是这样处理的 :

def handle_ttin(self):
    """\
    SIGTTIN handling.
    Increases the number of workers by one.
    """
    self.num_workers += 1  # 扩容 
    self.manage_workers()  # 管理worker 

Arbiter的sleep和warkeup是这样实现的:

self.PIPE = pair = os.pipe()  # 创建管道

def sleep(self):
 """\
    Sleep until PIPE is readable or we timeout.
    A readable PIPE means a signal occurred.
    """
    try:
        ready = select.select([self.PIPE[0]], [], [], 1.0)  # 使用select监听管道的数据变化
        if not ready[0]:
            return
        while os.read(self.PIPE[0], 1):  # 读取管道数据
            pass
    except (select.error, OSError) as e:
        ...

def wakeup(self):
    """\
    Wake up the arbiter by writing to the PIPE
    """
    try:
        os.write(self.PIPE[1], b'.')  # 管道写入
    except IOError as e:
        ...

需要说明的是Arbiter通过 sock.create_sockets 创建了socket,并绑定端口和监听,然后在fork-worker的时候,将socket传递给了子进程。

worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
  worker.pid = pid  # 记录worker的pid
    self.WORKERS[pid] = worker # 添加到worker集合
    return pid

销毁worker是使用信号:

def kill_workers(self, sig):
    """\
    Kill all workers with the signal `sig`
    :attr sig: `signal.SIG*` value
    """
    worker_pids = list(self.WORKERS.keys())
    for pid in worker_pids:
        os.kill(pid, sig)

sync-worker实现

接下来,我们看看worker,主要是sync-worker的实现。worker的关系主要如下:

接之前Arbiter中fork-worker的代码,创建完成的work进入 init_process

# Process Child
worker.pid = os.getpid()
try:
    util._setproctitle("worker [%s]" % self.proc_name)
    self.log.info("Booting worker with pid: %s", worker.pid)
    self.cfg.post_fork(self, worker)
    worker.init_process()
    sys.exit(0)

work的init_process模版如下:

def init_process(self):
    """\
    If you override this method in a subclass, the last statement
    in the function should be to call this method with
    super().init_process() so that the ``run()`` loop is initiated.
    """
    # For waking ourselves up
    self.PIPE = os.pipe()  # 创建管道
    ...
    self.wait_fds = self.sockets + [self.PIPE[0]]  # 监听管道和socket
   ...
    self.init_signals()  # 初始化信号监听
   ...
    self.load_wsgi()  # 加载wsgi的应用
    ...
    # Enter main run loop
    self.booted = True
    self.run()  # 工作循环

work一样的进行信号监听:

SIGNALS = [getattr(signal, "SIG%s" % x)
            for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]
def init_signals(self):
    # reset signaling
    for s in self.SIGNALS:
      signal.signal(s, signal.SIG_DFL)
    # init new signaling
    signal.signal(signal.SIGQUIT, self.handle_quit)
    signal.signal(signal.SIGTERM, self.handle_exit)
    signal.signal(signal.SIGINT, self.handle_quit)
    ...

    if hasattr(signal, 'set_wakeup_fd'):
      signal.set_wakeup_fd(self.PIPE[1])  # 等待select唤醒

work最重要的run循环:

 def run(self, timeout):
    listener = self.sockets[0]
    while self.alive:
     ...
        # Accept a connection. If we get an error telling us
        # that no connection is waiting we fall down to the
        # select which is where we'll wait for a bit for new
        # workers to come give us some love.
        try:
            self.accept(listener)  # 接受客户端链接
            # Keep processing clients until no one is waiting. This
            # prevents the need to select() for every client that we
            # process.
            continue

        except EnvironmentError as e:
              ...

        try:
            self.wait(timeout)  # 休眠等待 
        except StopWaiting:
            return

处理客户端连接,这一部分和之前介绍http比较类似,也不再赘述。

def accept(self, listener):
    client, addr = listener.accept()
    client.setblocking(1)
    util.close_on_exec(client)
    self.handle(listener, client, addr)

work处理完成请求后进入等待

def wait(self, timeout):
    try:
        ret = select.select(self.wait_fds, [], [], timeout)
        if ret[0]:
            if self.PIPE[0] in ret[0]:
                os.read(self.PIPE[0], 1)
            return ret[0]

    except select.error as e:
        if e.args[0] == errno.EINTR:
            return self.sockets
        if e.args[0] == errno.EBADF:
            if self.nr < 0:
                return self.sockets
            else:
                raise StopWaiting
        raise

小结

可以用下面一张图展示gunicorn的工作流程,作为我们的小结论

小技巧

可以使用thread,实现一个定时器

# reloader.py

class Reloader(threading.Thread):
    def __init__(self, extra_files=None, interval=1, callback=None):
        super().__init__()
        self.setDaemon(True)
        self._interval = interval
        self._callback = callback

    def run(self):
        mtimes = {}
        while True:
            for filename in self.get_files():
                try:
                    mtime = os.stat(filename).st_mtime
                except OSError:
                    continue
                old_time = mtimes.get(filename)
                if old_time is None:
                    mtimes[filename] = mtime
                    continue
                elif mtime > old_time:
                    if self._callback:
                        self._callback(filename)
            time.sleep(self._interval)

在使用 gunicorn myapp:app 命令的时候, myapp:app 没有静态的 import ,而是这样动态加载的:

# util.py

klass = components.pop(-1)

mod = importlib.import_module('.'.join(components))

return getattr(mod, klass)

参考链接

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8