gunicorn “Green Unicorn”,脱胎于ruby社区的Unicorn,是一个 WSGI HTTP Server。学习gunicorn后,我们可以把之前的 Bottle 程序正式部署起来。老规矩,本文分下面几个部分:
gunicorn 源码选择的版本是 20.0.0
,主要的文件及包如下:
文件 | 描述 |
---|---|
app包 | guincorn 的 Application (不是wsgi定义的applicaton) |
http包 | gunicorn 对 http协议的一些处理 |
workers包 | gunicorn 的工作类实现 ,包括同步sync实现,线程池版本实现gthread,以及异步版本实现 geventlet,gevent等 |
arbiter.py | guicorn 的master实现 |
根据gunicorn的设计特点:
Gunicorn is based on the pre-fork worker model. This means that there is a central master process that manages a set of worker processes. The master never knows anything about individual clients. All requests and responses are handled completely by worker processes.
gunicorn使用pre-fork 工作模型,也就是master提前fork出预定数量的work,管理worker集合。所有的request和response都由worker进程处理。
我们重点放在:gunicorn的服务实现,master-worker如何实现和协作上。
编写测试app,可以看到这是一个符合wsgi规范的application:
# myapp.py
def app(environ, start_response): # env 和 http 状态及头设定回调
data = b"Hello, World!\n"
start_response("200 OK", [
("Content-Type", "text/plain"),
("Content-Length", str(len(data)))
])
return iter([data]) # 返回数据
使用4个work节点,日志级别debug的方式启动服务,加载 myapp:app
# gunicorn -w 4 --log-level debug myapp:app
[2021-02-23 17:58:57 +0800] [50258] [DEBUG] Current configuration: # 准备配置
...
[2021-02-23 18:01:12 +0800] [50462] [INFO] Starting gunicorn 20.0.0 # 启动gunicorn
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] Arbiter booted # 启动master
[2021-02-23 18:01:12 +0800] [50462] [INFO] Listening at: http://127.0.0.1:8000 (50462) # 监听端口
[2021-02-23 18:01:12 +0800] [50462] [INFO] Using worker: sync
[2021-02-23 18:01:12 +0800] [50464] [INFO] Booting worker with pid: 50464 # 启动worker
[2021-02-23 18:01:12 +0800] [50465] [INFO] Booting worker with pid: 50465
[2021-02-23 18:01:12 +0800] [50466] [INFO] Booting worker with pid: 50466
[2021-02-23 18:01:12 +0800] [50467] [INFO] Booting worker with pid: 50467
[2021-02-23 18:01:12 +0800] [50462] [DEBUG] 4 workers
使用 curl
测试服务
# curl http://127.0.0.1:8000
Hello, World!
同时gunicorn中可以看到 worker=50465 处理了这个http请求
[2021-02-24 16:09:39 +0800] [50465] [DEBUG] GET /
运行时候,还可以通过发送信号,手动扩充work节点数
# kill -TTIN 50462
观察服务日志,会发现 master=50462 进程处理了 ttin
信号,并且扩展worker节点数到5
...
[2021-02-24 18:02:56 +0800] [50462] [INFO] Handling signal: ttin
[2021-02-24 18:02:56 +0800] [75918] [INFO] Booting worker with pid: 75918
[2021-02-24 18:02:56 +0800] [50462] [DEBUG] 5 workers
使用 Ctrl+C
关闭服务,可以看到也是 master=50462 进程处理了 int
信号,并且在关闭worker节点后关闭自己
^C[2021-02-25 15:06:54 +0800] [50462] [INFO] Handling signal: int
[2021-02-25 15:06:54 +0800] [50464] [INFO] Worker exiting (pid: 50464)
[2021-02-25 15:06:54 +0800] [50465] [INFO] Worker exiting (pid: 50465)
[2021-02-25 15:06:54 +0800] [50466] [INFO] Worker exiting (pid: 50466)
[2021-02-25 15:06:54 +0800] [50467] [INFO] Worker exiting (pid: 50467)
[2021-02-25 15:06:54 +0800] [75918] [INFO] Worker exiting (pid: 75918)
[2021-02-25 15:06:54 +0800] [50462] [INFO] Shutting down: Master
如果对gunicon的参数不了解,可以使用下面命令查看帮助
# gunicorn -h
usage: gunicorn [OPTIONS] [APP_MODULE]
optional arguments:
-h, --help show this help message and exit
...
-w INT, --workers INT
The number of worker processes for handling requests. [1]
帮助使用我们熟悉的 argparse 实现。
class Setting(object):
def add_option(self, parser):
args = tuple(self.cli)
help_txt = "%s [%s]" % (self.short, self.default)
help_txt = help_txt.replace("%", "%%")
kwargs = {
"dest": self.name,
"action": self.action or "store",
"type": self.type or str,
"default": None,
"help": help_txt
}
...
parser.add_argument(*args, **kwargs) # 添加选项
class Workers(Setting): # --workers 的选项类
name = "workers"
section = "Worker Processes"
cli = ["-w", "--workers"]
meta = "INT"
validator = validate_pos_int
type = int
default = int(os.environ.get("WEB_CONCURRENCY", 1))
desc = """\
The number of worker processes for handling requests.
A positive integer generally in the ``2-4 x $(NUM_CORES)`` range.
You'll want to vary this a bit to find the best for your particular
application's work load.
By default, the value of the ``WEB_CONCURRENCY`` environment variable.
If it is not defined, the default is ``1``.
"""
def parser(self):
kwargs = {
"usage": self.usage,
"prog": self.prog
}
parser = argparse.ArgumentParser(**kwargs)
parser.add_argument("-v", "--version",
action="version", default=argparse.SUPPRESS,
version="%(prog)s (version " + __version__ + ")\n",
help="show program's version number and exit")
parser.add_argument("args", nargs="*", help=argparse.SUPPRESS)
keys = sorted(self.settings, key=self.settings.__getitem__) # 动态添加参数选项
for k in keys:
self.settings[k].add_option(parser)
return parser
gunicorn的application主要是下面三个类实现。需要注意的是这里的application可以理解为web-server的application;bottle/flask/django等实现的是web-framework的applicaiton。前者动态加载后者,前者处理http服务,后者处理单次的http请求。
3个Application梳理后,大概的代码模版如下:
class WSGIApplication(Application)
def __init__(self, usage=None, prog=None):
self.do_load_config() # 加载配置
def do_load_config():
...
cfg = self.init(parser, args, args.args) # 初始化配置
...
def init(...):
...
self.app_uri = args[0] # 获取wsgi-application参数
def load(...):
util.import_app(self.app_uri) # 动态加载wsgi-application
...
def run(...):
self.load()
Arbiter(self).run() # 启动master,也就是Arbiter
def run(): # 运行服务
"""\
The ``gunicorn`` command line runner for launching Gunicorn with
generic WSGI applications.
"""
from gunicorn.app.wsgiapp import WSGIApplication
WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()
if __name__ == '__main__':
run()
application部分的实现,相对比较简单,就不再赘述。
Arbiter 仲裁者,事实上的master进程核心,整理后代码模版如下:
class Arbiter(object):
def __init__(self, app):
self.worker_class = self.cfg.worker_class # worker类
self.num_workers = self.cfg.worker # worker数量
...
def start():
self.init_signals() # 初始化信号监听
...
sock.create_socket(...) # 创建socket服务
def run(self):
self.start()
try:
self.manage_workers() # 启动节点
while True: # 无限循环
...
sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
if sig is None:
self.sleep() # 持续休眠
self.murder_workers()
self.manage_workers()
continue
if sig not in self.SIG_NAMES:
self.log.info("Ignoring unknown signal: %s", sig)
continue
# 处理信号
signame = self.SIG_NAMES.get(sig)
handler = getattr(self, "handle_%s" % signame, None)
...
handler()
self.wakeup() # 唤醒
except (StopIteration, KeyboardInterrupt):
...
在了解Arbiter工作前先了解一下信号, linux 系统可以使用下面命令查看信号清单
# kill -l
1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP
6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR1
11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
...
信号是操作系统提供的事件,可以用来进行跨进程的通信。Arbiter.init_signals 做的工作如下:
SIGNALS = [getattr(signal, "SIG%s" % x)
for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
def init_signals(self):
...
# initialize all signals
for s in self.SIGNALS:
signal.signal(s, self.signal)
signal.signal(signal.SIGCHLD, self.handle_chld) # 添加信号监听器
def signal(self, sig, frame):
if len(self.SIG_QUEUE) < 5:
self.SIG_QUEUE.append(sig)
self.wakeup()
之前演示的扩容信号 TTIN
是这样处理的 :
def handle_ttin(self):
"""\
SIGTTIN handling.
Increases the number of workers by one.
"""
self.num_workers += 1 # 扩容
self.manage_workers() # 管理worker
Arbiter的sleep和warkeup是这样实现的:
self.PIPE = pair = os.pipe() # 创建管道
def sleep(self):
"""\
Sleep until PIPE is readable or we timeout.
A readable PIPE means a signal occurred.
"""
try:
ready = select.select([self.PIPE[0]], [], [], 1.0) # 使用select监听管道的数据变化
if not ready[0]:
return
while os.read(self.PIPE[0], 1): # 读取管道数据
pass
except (select.error, OSError) as e:
...
def wakeup(self):
"""\
Wake up the arbiter by writing to the PIPE
"""
try:
os.write(self.PIPE[1], b'.') # 管道写入
except IOError as e:
...
需要说明的是Arbiter通过 sock.create_sockets
创建了socket,并绑定端口和监听,然后在fork-worker的时候,将socket传递给了子进程。
worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
self.app, self.timeout / 2.0,
self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
worker.pid = pid # 记录worker的pid
self.WORKERS[pid] = worker # 添加到worker集合
return pid
销毁worker是使用信号:
def kill_workers(self, sig):
"""\
Kill all workers with the signal `sig`
:attr sig: `signal.SIG*` value
"""
worker_pids = list(self.WORKERS.keys())
for pid in worker_pids:
os.kill(pid, sig)
接下来,我们看看worker,主要是sync-worker的实现。worker的关系主要如下:
接之前Arbiter中fork-worker的代码,创建完成的work进入 init_process
# Process Child
worker.pid = os.getpid()
try:
util._setproctitle("worker [%s]" % self.proc_name)
self.log.info("Booting worker with pid: %s", worker.pid)
self.cfg.post_fork(self, worker)
worker.init_process()
sys.exit(0)
work的init_process模版如下:
def init_process(self):
"""\
If you override this method in a subclass, the last statement
in the function should be to call this method with
super().init_process() so that the ``run()`` loop is initiated.
"""
# For waking ourselves up
self.PIPE = os.pipe() # 创建管道
...
self.wait_fds = self.sockets + [self.PIPE[0]] # 监听管道和socket
...
self.init_signals() # 初始化信号监听
...
self.load_wsgi() # 加载wsgi的应用
...
# Enter main run loop
self.booted = True
self.run() # 工作循环
work一样的进行信号监听:
SIGNALS = [getattr(signal, "SIG%s" % x)
for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]
def init_signals(self):
# reset signaling
for s in self.SIGNALS:
signal.signal(s, signal.SIG_DFL)
# init new signaling
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_quit)
...
if hasattr(signal, 'set_wakeup_fd'):
signal.set_wakeup_fd(self.PIPE[1]) # 等待select唤醒
work最重要的run循环:
def run(self, timeout):
listener = self.sockets[0]
while self.alive:
...
# Accept a connection. If we get an error telling us
# that no connection is waiting we fall down to the
# select which is where we'll wait for a bit for new
# workers to come give us some love.
try:
self.accept(listener) # 接受客户端链接
# Keep processing clients until no one is waiting. This
# prevents the need to select() for every client that we
# process.
continue
except EnvironmentError as e:
...
try:
self.wait(timeout) # 休眠等待
except StopWaiting:
return
处理客户端连接,这一部分和之前介绍http比较类似,也不再赘述。
def accept(self, listener):
client, addr = listener.accept()
client.setblocking(1)
util.close_on_exec(client)
self.handle(listener, client, addr)
work处理完成请求后进入等待
def wait(self, timeout):
try:
ret = select.select(self.wait_fds, [], [], timeout)
if ret[0]:
if self.PIPE[0] in ret[0]:
os.read(self.PIPE[0], 1)
return ret[0]
except select.error as e:
if e.args[0] == errno.EINTR:
return self.sockets
if e.args[0] == errno.EBADF:
if self.nr < 0:
return self.sockets
else:
raise StopWaiting
raise
可以用下面一张图展示gunicorn的工作流程,作为我们的小结论
可以使用thread,实现一个定时器
# reloader.py
class Reloader(threading.Thread):
def __init__(self, extra_files=None, interval=1, callback=None):
super().__init__()
self.setDaemon(True)
self._interval = interval
self._callback = callback
def run(self):
mtimes = {}
while True:
for filename in self.get_files():
try:
mtime = os.stat(filename).st_mtime
except OSError:
continue
old_time = mtimes.get(filename)
if old_time is None:
mtimes[filename] = mtime
continue
elif mtime > old_time:
if self._callback:
self._callback(filename)
time.sleep(self._interval)
在使用 gunicorn myapp:app
命令的时候, myapp:app 没有静态的 import ,而是这样动态加载的:
# util.py
klass = components.pop(-1)
mod = importlib.import_module('.'.join(components))
return getattr(mod, klass)
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8