可靠的请求-应答模式

第三章中我们使用实例介绍了高级请求-应答模式,本章我们会讲述请求-应答模式的可靠性问题,并使用ZMQ提供的套接字类型组建起可靠的请求-应答消息系统。

本章将介绍的内容有:

  • 客户端请求-应答
  • 最近最少使用队列
  • 心跳机制
  • 面向服务的队列
  • 基于磁盘(脱机)队列
  • 主从备份服务
  • 无中间件的请求-应答

什么是可靠性?

要给可靠性下定义,我们可以先界定它的相反面——故障。如果我们可以处理某些类型的故障,那么我们的模型对于这些故障就是可靠的。下面我们就来列举分布式ZMQ应用程序中可能发生的问题,从可能性高的故障开始:

  • 应用程序代码是最大的故障来源。程序会崩溃或中止,停止对数据来源的响应,或是响应得太慢,耗尽内存等。
  • 系统代码,如使用ZMQ编写的中间件,也会意外中止。系统代码应该要比应用程序代码更为可靠,但毕竟也有可能崩溃。特别是当系统代码与速度过慢的客户端交互时,很容易耗尽内存。
  • 消息队列溢出,典型的情况是系统代码中没有对慢客户端做积极的处理,任由消息队列溢出。
  • 网络临时中断,造成消息丢失。这类错误ZMQ应用程序是无法及时发现的,因为ZMQ会自动进行重连。
  • 硬件系统崩溃,导致所有进程中止。
  • 网络会出现特殊情形的中断,如交换机的某个端口发生故障,导致部分网络无法访问。
  • 数据中心可能遭受雷击、地震、火灾、电压过载、冷却系统失效等。

想要让软件系统规避上述所有的风险,需要大量的人力物力,故不在本指南的讨论范围之内。

由于前五个故障类型涵盖了99.9%的情形(这一数据源自我近期进行的一项研究),所以我们会深入探讨。如果你的公司大到足以考虑最后两种情形,那请及时联系我,因为我正愁没钱将我家后院的大坑建成游泳池。

可靠性设计

简单地来说,可靠性就是当程序发生故障时也能顺利地运行下去,这要比搭建一个消息系统来得困难得多。我们会根据ZMQ提供的每一种核心消息模式,来看看如何保障代码的持续运行。

  • 请求-应答模式:当服务端在处理请求是中断,客户端能够得知这一信息,并停止接收消息,转而选择等待重试、请求另一服务端等操作。这里我们暂不讨论客户端发生问题的情形。

  • 发布-订阅模式:如果客户端收到一些消息后意外中止,服务端是不知道这一情况的。发布-订阅模式中的订阅者不会返回任何消息给发布者。但是,订阅者可以通过其他方式联系服务端,如请求-应答模式,要求服务端重发消息。这里我们暂不讨论服务端发生问题的情形。此外,订阅者可以通过某些方式检查自身是否运行得过慢,并采取相应措施(向操作者发出警告、中止等)。

  • 管道模式:如果worker意外终止,任务分发器将无从得知。管道模式和发布-订阅模式类似,只朝一个方向发送消息。但是,下游的结果收集器可以检测哪项任务没有完成,并告诉任务分发器重新分配该任务。如果任务分发器或结果收集器意外中止了,那客户端发出的请求只能另作处理。所以说,系统代码真的要减少出错的几率,因为这很难处理。

本章主要讲解请求-应答模式中的可靠性设计,其他模式将在后续章节中讲解。

最基本的请求应答模式是REQ客户端发送一个同步的请求至REP服务端,这种模式的可靠性很低。如果服务端在处理请求时中止,那客户端会永远处于等待状态。

相比TCP协议,ZMQ提供了自动重连机制、消息分发的负载均衡等。但是,在真实环境中这也是不够的。唯一可以完全信任基本请求-应答模式的应用场景是同一进程的两个线程之间进行通信,没有网络问题或服务器失效的情况。

但是,只要稍加修饰,这种基本的请求-应答模式就能很好地在现实环境中工作了。我喜欢将其称为“海盗”模式。

粗略地讲,客户端连接服务端有三种方式,每种方式都需要不同的可靠性设计:

  • 多个客户端直接和单个服务端进行通信。使用场景:只有一个单点服务器,所有客户端都需要和它通信。需处理的故障:服务器崩溃和重启;网络连接中断。

  • 多个客户端和单个队列装置通信,该装置将请求分发给多个服务端。使用场景:任务分发。需处理的故障:worker崩溃和重启,死循环,过载;队列装置崩溃和重启;网络中断。

  • 多个客户端直接和多个服务端通信,无中间件。使用场景:类似域名解析的分布式服务。需处理的故障:服务端崩溃和重启,死循环,过载;网络连接中断。

以上每种设计都必须有所取舍,很多时候会混合使用。下面我们详细说明。

客户端的可靠性设计(懒惰海盗模式)

我们可以通过在客户端进行简单的设置,来实现可靠的请求-应答模式。我暂且称之为“懒惰的海盗”(Lazy Pirate)模式。

在接收应答时,我们不进行同步等待,而是做以下操作:

  • 对REQ套接字进行轮询,当消息抵达时才进行接收;
  • 请求超时后重发消息,循环多次;
  • 若仍无消息,则结束当前事务。

1

使用REQ套接字时必须严格遵守发送-接收过程,因为它内部采用了一个有限状态机来限定状态,这一特性会让我们应用“海盗”模式时遇上一些麻烦。最简单的做法是将REQ套接字关闭重启,从而打破这一限定。

lpclient: Lazy Pirate client in C

//
//  Lazy Pirate client
//  使用zmq_poll轮询来实现安全的请求-应答
//  运行时可随机关闭或重启lpserver程序
//
#include "czmq.h"

#define REQUEST_TIMEOUT     2500    //  毫秒, (> 1000!)
#define REQUEST_RETRIES     3       //  尝试次数
#define SERVER_ENDPOINT     "tcp://localhost:5555"

int main (void)
{
    zctx_t *ctx = zctx_new ();
    printf ("I: 正在连接服务器...\n");
    void *client = zsocket_new (ctx, ZMQ_REQ);
    assert (client);
    zsocket_connect (client, SERVER_ENDPOINT);

    int sequence = 0;
    int retries_left = REQUEST_RETRIES;
    while (retries_left && !zctx_interrupted) {
        //  发送一个请求,并开始接收消息
        char request [10];
        sprintf (request, "%d", ++sequence);
        zstr_send (client, request);

        int expect_reply = 1;
        while (expect_reply) {
            //  对套接字进行轮询,并设置超时时间
            zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  中断

            //  如果接收到回复则进行处理
            if (items [0].revents & ZMQ_POLLIN) {
                //  收到服务器应答,必须和请求时的序号一致
                char *reply = zstr_recv (client);
                if (!reply)
                    break;      //  Interrupted
                if (atoi (reply) == sequence) {
                    printf ("I: 服务器返回正常 (%s)\n", reply);
                    retries_left = REQUEST_RETRIES;
                    expect_reply = 0;
                }
                else
                    printf ("E: 服务器返回异常: %s\n",
                        reply);

                free (reply);
            }
            else
            if (--retries_left == 0) {
                printf ("E: 服务器不可用,取消操作\n");
                break;
            }
            else {
                printf ("W: 服务器没有响应,正在重试...\n");
                //  关闭旧套接字,并建立新套接字
                zsocket_destroy (ctx, client);
                printf ("I: 服务器重连中...\n");
                client = zsocket_new (ctx, ZMQ_REQ);
                zsocket_connect (client, SERVER_ENDPOINT);
                //  使用新套接字再次发送请求
                zstr_send (client, request);
            }
        }
    }
    zctx_destroy (&ctx);
    return 0;
}

lpserver: Lazy Pirate server in C

//
//  Lazy Pirate server
//  将REQ套接字连接至 tcp://*:5555
//  和hwserver程序类似,除了以下两点:
//   - 直接输出请求内容
//   - 随机地降慢运行速度,或中止程序,模拟崩溃
//
#include "zhelpers.h"

int main (void)
{
    srandom ((unsigned) time (NULL));

    void *context = zmq_init (1);
    void *server = zmq_socket (context, ZMQ_REP);
    zmq_bind (server, "tcp://*:5555");

    int cycles = 0;
    while (1) {
        char *request = s_recv (server);
        cycles++;

        //  循环几次后开始模拟各种故障
        if (cycles > 3 && randof (3) == 0) {
            printf ("I: 模拟程序崩溃\n");
            break;
        }
        else
        if (cycles > 3 && randof (3) == 0) {
            printf ("I: 模拟CPU过载\n");
            sleep (2);
        }
        printf ("I: 正常请求 (%s)\n", request);
        sleep (1);              //  耗时的处理过程
        s_send (server, request);
        free (request);
    }
    zmq_close (server);
    zmq_term (context);
    return 0;
}

运行这个测试用例时,可以打开两个控制台,服务端会随机发生故障,你可以看看客户端的反应。服务端的典型输出如下:

I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash

客户端的输出是:

I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning

客户端为每次请求都加上了序列号,并检查收到的应答是否和序列号一致,以保证没有请求或应答丢失,同一个应答收到多次或乱序。多运行几次实例,看看是否真的能够解决问题。现实环境中你不需要使用到序列号,那只是为了证明这一方式是可行的。

客户端使用REQ套接字进行请求,并在发生问题时打开一个新的套接字来,绕过REQ强制的发送/接收过程。可能你会想用DEALER套接字,但这并不是一个好主意。首先,DEALER并不会像REQ那样处理信封(如果你不知道信封是什么,那更不能用DEALER了)。其次,你可能会获得你并不想得到的结果。

这一方案的优劣是:

  • 优点:简单明了,容易实施;
  • 优点:可以方便地应用到现有的客户端和服务端程序中;
  • 优点:ZMQ有自动重连机制;
  • 缺点:单点服务发生故障时不能定位到新的可用服务。

基本的可靠队列(简单海盗模式)

在第二种模式中,我们使用一个队列装置来扩展上述的“懒惰的海盗”模式,使客户端能够透明地和多个服务端通信。这里的服务端可以定义为worker。我们可以从最基础的模型开始,分阶段实施这个方案。

在所有的海盗模式中,worker是无状态的,或者说存在着一个我们所不知道的公共状态,如共享数据库。队列装置的存在意味着worker可以在client毫不知情的情况下随意进出。一个worker死亡后,会有另一个worker接替它的工作。这种拓扑结果非常简洁,但唯一的缺点是队列装置本身会难以维护,可能造成单点故障。

在第三章中,队列装置的基本算法是最近最少使用算法。那么,如果worker死亡或阻塞,我们需要做些什么?答案是很少很少。我们已经在client中加入了重试的机制,所以,使用基本的LRU队列就可以运作得很好了。这种做法也符合ZMQ的逻辑,所以我们可以通过在点对点交互中插入一个简单的队列装置来扩展它:

2

我们可以直接使用“懒惰的海盗”模式中的client,以下是队列装置的代码:

spqueue: Simple Pirate queue in C

//
//  简单海盗队列
//  
//  这个装置和LRU队列完全一致,不存在任何可靠性机制,依靠client的重试来保证装置的运行
//
#include "czmq.h"

#define LRU_READY   "\001"      //  消息:worker准备就绪

int main (void)
{
    //  准备上下文和套接字
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    void *backend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5555");    //  client端点
    zsocket_bind (backend,  "tcp://*:5556");    //  worker端点

    //  存放可用worker的队列
    zlist_t *workers = zlist_new ();

    while (1) {
        zmq_pollitem_t items [] = {
            { backend,  0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        //  当有可用的woker时,轮询前端端点
        int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
        if (rc == -1)
            break;              //  中断

        //  处理后端端点的worker消息
        if (items [0].revents & ZMQ_POLLIN) {
            //  使用worker的地址进行LRU排队
            zmsg_t *msg = zmsg_recv (backend);
            if (!msg)
                break;          //  中断
            zframe_t *address = zmsg_unwrap (msg);
            zlist_append (workers, address);

            //  如果消息不是READY,则转发给client
            zframe_t *frame = zmsg_first (msg);
            if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
                zmsg_destroy (&msg);
            else
                zmsg_send (&msg, frontend);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  获取client请求,转发给第一个可用的worker
            zmsg_t *msg = zmsg_recv (frontend);
            if (msg) {
                zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
                zmsg_send (&msg, backend);
            }
        }
    }
    //  程序运行结束,进行清理
    while (zlist_size (workers)) {
        zframe_t *frame = (zframe_t *) zlist_pop (workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return 0;
}

以下是worker的代码,用到了“懒惰的海盗”服务,并将其调整为LRU模式(使用REQ套接字传递“已就绪”信号):

spworker: Simple Pirate worker in C

//
//  简单海盗模式worker
//  
//  使用REQ套接字连接tcp://*:5556,使用LRU算法实现worker
//
#include "czmq.h"
#define LRU_READY   "\001"      //  消息:worker已就绪

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);

    //  使用随机符号来指定套接字标识,方便追踪
    srandom ((unsigned) time (NULL));
    char identity [10];
    sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
    zmq_setsockopt (worker, ZMQ_IDENTITY, identity, strlen (identity));
    zsocket_connect (worker, "tcp://localhost:5556");

    //  告诉代理worker已就绪
    printf ("I: (%s) worker准备就绪\n", identity);
    zframe_t *frame = zframe_new (LRU_READY, 1);
    zframe_send (&frame, worker, 0);

    int cycles = 0;
    while (1) {
        zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break;              //  中断

        //  经过几轮循环后,模拟各种问题
        cycles++;
        if (cycles > 3 && randof (5) == 0) {
            printf ("I: (%s) 模拟崩溃\n", identity);
            zmsg_destroy (&msg);
            break;
        }
        else
        if (cycles > 3 && randof (5) == 0) {
            printf ("I: (%s) 模拟CPU过载\n", identity);
            sleep (3);
            if (zctx_interrupted)
                break;
        }
        printf ("I: (%s) 正常应答\n", identity);
        sleep (1);              //  进行某些处理
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return 0;
}

运行上述事例,启动多个worker,一个client,以及一个队列装置,顺序随意。你可以看到worker最终都会崩溃或死亡,client则多次重试并最终放弃。装置从来不会停止,你可以任意重启worker和client,这个模型可以和任意个worker、client交互。

健壮的可靠队列(偏执海盗模式)

“简单海盗队列”模式工作得非常好,主要是因为它只是两个现有模式的结合体。不过,它也有一些缺点:

  • 该模式无法处理队列的崩溃或重启。client会进行重试,但worker不会重启。虽然ZMQ会自动重连worker的套接字,但对于新启动的队列装置来说,由于worker并没有发送“已就绪”的消息,所以它相当于是不存在的。为了解决这一问题,我们需要从队列发送心跳给worker,这样worker就能知道队列是否已经死亡。

  • 队列没有检测worker是否已经死亡,所以当worker在处于空闲状态时死亡,队列装置只有在发送了某个请求之后才会将该worker从队列中移除。这时,client什么都不能做,只能等待。这不是一个致命的问题,但是依然是不够好的。所以,我们需要从worker发送心跳给队列装置,从而让队列得知worker什么时候消亡。

我们使用一个名为“偏执的海盗模式”来解决上述两个问题。

之前我们使用REQ套接字作为worker的套接字类型,但在偏执海盗模式中我们会改用DEALER套接字,从而使我们能够任意地发送和接受消息,而不是像REQ套接字那样必须完成发送-接受循环。而DEALER的缺点是我们必须自己管理消息信封。如果你不知道信封是什么,那请阅读第三章。

3

我们仍会使用懒惰海盗模式的client,以下是偏执海盗的队列装置代码:

ppqueue: Paranoid Pirate queue in C

//
//  偏执海盗队列
//
#include "czmq.h"

#define HEARTBEAT_LIVENESS  3       //  心跳健康度,3-5是合理的
#define HEARTBEAT_INTERVAL  1000    //  单位:毫秒

//  偏执海盗协议的消息代码
#define PPP_READY       "\001"      //  worker已就绪
#define PPP_HEARTBEAT   "\002"      //  worker心跳


//  使用以下结构表示worker队列中的一个有效的worker

typedef struct {
    zframe_t *address;          //  worker的地址
    char *identity;             //  可打印的套接字标识
    int64_t expiry;             //  过期时间
} worker_t;

//  创建新的worker
static worker_t *
s_worker_new (zframe_t *address)
{
    worker_t *self = (worker_t *) zmalloc (sizeof (worker_t));
    self->address = address;
    self->identity = zframe_strdup (address);
    self->expiry = zclock_time () + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
    return self;
}

//  销毁worker结构,包括标识
static void
s_worker_destroy (worker_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        worker_t *self = *self_p;
        zframe_destroy (&self->address);
        free (self->identity);
        free (self);
        *self_p = NULL;
    }
}

//  worker已就绪,将其移至列表末尾
static void
s_worker_ready (worker_t *self, zlist_t *workers)
{
    worker_t *worker = (worker_t *) zlist_first (workers);
    while (worker) {
        if (streq (self->identity, worker->identity)) {
            zlist_remove (workers, worker);
            s_worker_destroy (&worker);
            break;
        }
        worker = (worker_t *) zlist_next (workers);
    }
    zlist_append (workers, self);
}

//  返回下一个可用的worker地址
static zframe_t *
s_workers_next (zlist_t *workers)
{
    worker_t *worker = zlist_pop (workers);
    assert (worker);
    zframe_t *frame = worker->address;
    worker->address = NULL;
    s_worker_destroy (&worker);
    return frame;
}

//  寻找并销毁已过期的worker。
//  由于列表中最旧的worker排在最前,所以当找到第一个未过期的worker时就停止。
static void
s_workers_purge (zlist_t *workers)
{
    worker_t *worker = (worker_t *) zlist_first (workers);
    while (worker) {
        if (zclock_time () < worker->expiry)
            break;              //  worker未过期,停止扫描

        zlist_remove (workers, worker);
        s_worker_destroy (&worker);
        worker = (worker_t *) zlist_first (workers);
    }
}


int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    void *backend  = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5555");    //  client端点
    zsocket_bind (backend,  "tcp://*:5556");    //  worker端点
    //  List of available workers
    zlist_t *workers = zlist_new ();

    //  规律地发送心跳
    uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

    while (1) {
        zmq_pollitem_t items [] = {
            { backend,  0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        //  当存在可用worker时轮询前端端点
        int rc = zmq_poll (items, zlist_size (workers)? 2: 1,
            HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断

        //  处理后端worker请求
        if (items [0].revents & ZMQ_POLLIN) {
            //  使用worker地址进行LRU路由
            zmsg_t *msg = zmsg_recv (backend);
            if (!msg)
                break;          //  中断

            //  worker的任何信号均表示其仍然存活
            zframe_t *address = zmsg_unwrap (msg);
            worker_t *worker = s_worker_new (address);
            s_worker_ready (worker, workers);

            //  处理控制消息,或者将应答转发给client
            if (zmsg_size (msg) == 1) {
                zframe_t *frame = zmsg_first (msg);
                if (memcmp (zframe_data (frame), PPP_READY, 1)
                &&  memcmp (zframe_data (frame), PPP_HEARTBEAT, 1)) {
                    printf ("E: invalid message from worker");
                    zmsg_dump (msg);
                }
                zmsg_destroy (&msg);
            }
            else
                zmsg_send (&msg, frontend);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  获取下一个client请求,交给下一个可用的worker
            zmsg_t *msg = zmsg_recv (frontend);
            if (!msg)
                break;          //  中断
            zmsg_push (msg, s_workers_next (workers));
            zmsg_send (&msg, backend);
        }

        //  发送心跳给空闲的worker
        if (zclock_time () >= heartbeat_at) {
            worker_t *worker = (worker_t *) zlist_first (workers);
            while (worker) {
                zframe_send (&worker->address, backend,
                             ZFRAME_REUSE + ZFRAME_MORE);
                zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
                zframe_send (&frame, backend, 0);
                worker = (worker_t *) zlist_next (workers);
            }
            heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        }
        s_workers_purge (workers);
    }

    //  程序结束后进行清理
    while (zlist_size (workers)) {
        worker_t *worker = (worker_t *) zlist_pop (workers);
        s_worker_destroy (&worker);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return 0;
}

该队列装置使用心跳机制扩展了LRU模式,看起来很简单,但要想出这个主意还挺难的。下文会更多地介绍心跳机制。

以下是偏执海盗的worker代码:

ppworker: Paranoid Pirate worker in C

//
//  偏执海盗worker
//
#include "czmq.h"

#define HEARTBEAT_LIVENESS  3       //  合理值:3-5
#define HEARTBEAT_INTERVAL  1000    //  单位:毫秒
#define INTERVAL_INIT       1000    //  重试间隔
#define INTERVAL_MAX       32000    //  回退算法最大值

//  偏执海盗规范的常量定义
#define PPP_READY       "\001"      //  消息:worker已就绪
#define PPP_HEARTBEAT   "\002"      //  消息:worker心跳

//  返回一个连接至偏执海盗队列装置的套接字

static void *
s_worker_socket (zctx_t *ctx) {
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "tcp://localhost:5556");

    //  告知队列worker已准备就绪
    printf ("I: worker已就绪\n");
    zframe_t *frame = zframe_new (PPP_READY, 1);
    zframe_send (&frame, worker, 0);

    return worker;
}

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *worker = s_worker_socket (ctx);

    //  如果心跳健康度为零,则表示队列装置已死亡
    size_t liveness = HEARTBEAT_LIVENESS;
    size_t interval = INTERVAL_INIT;

    //  规律地发送心跳
    uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

    srandom ((unsigned) time (NULL));
    int cycles = 0;
    while (1) {
        zmq_pollitem_t items [] = { { worker,  0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断

        if (items [0].revents & ZMQ_POLLIN) {
            //  获取消息
            //  - 3段消息,信封+内容,表示一个请求
            //  - 1段消息,表示心跳
            zmsg_t *msg = zmsg_recv (worker);
            if (!msg)
                break;          //  中断

            if (zmsg_size (msg) == 3) {
                //  若干词循环后模拟各种问题
                cycles++;
                if (cycles > 3 && randof (5) == 0) {
                    printf ("I: 模拟崩溃\n");
                    zmsg_destroy (&msg);
                    break;
                }
                else
                if (cycles > 3 && randof (5) == 0) {
                    printf ("I: 模拟CPU过载\n");
                    sleep (3);
                    if (zctx_interrupted)
                        break;
                }
                printf ("I: 正常应答\n");
                zmsg_send (&msg, worker);
                liveness = HEARTBEAT_LIVENESS;
                sleep (1);              //  做一些处理工作
                if (zctx_interrupted)
                    break;
            }
            else
            if (zmsg_size (msg) == 1) {
                zframe_t *frame = zmsg_first (msg);
                if (memcmp (zframe_data (frame), PPP_HEARTBEAT, 1) == 0)
                    liveness = HEARTBEAT_LIVENESS;
                else {
                    printf ("E: 非法消息\n");
                    zmsg_dump (msg);
                }
                zmsg_destroy (&msg);
            }
            else {
                printf ("E: 非法消息\n");
                zmsg_dump (msg);
            }
            interval = INTERVAL_INIT;
        }
        else
        if (--liveness == 0) {
            printf ("W: 心跳失败,无法连接队列装置\n");
            printf ("W: %zd 毫秒后进行重连...\n", interval);
            zclock_sleep (interval);

            if (interval < INTERVAL_MAX)
                interval *= 2;
            zsocket_destroy (ctx, worker);
            worker = s_worker_socket (ctx);
            liveness = HEARTBEAT_LIVENESS;
        }

        //  适时发送心跳给队列
        if (zclock_time () > heartbeat_at) {
            heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
            printf ("I: worker心跳\n");
            zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
            zframe_send (&frame, worker, 0);
        }
    }
    zctx_destroy (&ctx);
    return 0;
}

几点说明:

  • 代码中包含了几处失败模拟,和先前一样。这会让代码极难维护,所以当投入使用时,应当移除这些模拟代码。
  • 偏执海盗模式中队列的心跳有时会不正常,下文会讲述这一点。
  • worker使用了一种类似于懒惰海盗client的重试机制,但有两点不同:1、回退算法设置;2、永不言弃。

尝试运行以下代码,跑通流程:

ppqueue &
for i in 1 2 3 4; do
    ppworker &
    sleep 1
done
lpclient &

你会看到worker逐个崩溃,client在多次尝试后放弃。你可以停止并重启队列装置,client和worker会相继重连,并正确地发送、处理和接收请求,顺序不会混乱。所以说,整个通信过程只有两种情形:交互成功,或client最终放弃。

心跳

当我在写偏执海盗模式的示例时,大约花了五个小时的时间来协调队列至worker的心跳,剩下的请求-应答链路只花了约10分钟的时间。心跳机制在可靠性上带来的益处有时还不及它所引发的问题。使用过程中很有可能会产生“虚假故障”的情况,即节点误认为他们已失去连接,因为心跳没有正确地发送。

在理解和实施心跳时,需要考虑以下几点:

  • 心跳不是一种请求-应答,它们异步地在节点之间传递,任一节点都可以通过它来判断对方已经死亡,并中止通信。

  • 如果某个节点使用持久套接字(即设定了套接字标识),意味着发送给它的心跳可能会堆砌,并在重连后一起收到。所以说,worker不应该使用持久套接字。示例代码使用持久套接字是为了便于调试,而且代码中使用了随机的套接字标识,避免重用之前的标识。

  • 使用过程中,应先让心跳工作起来,再进行后面的消息处理。你需要保证启动任一节点后,心跳都能正确地执行。停止并重启他们,模拟冻结、崩溃等情况来进行测试。

  • 当你的主循环使用了zmq_poll(),则应该使用另一个计时器来触发心跳。不要使用主循环来控制心跳的发送,这回导致过量地发送心跳(阻塞网络),或是发送得太少(导致节点断开)。zhelpers包提供了s_clock()函数返回当前系统时间戳,单位是毫秒,可以用它来控制心跳的发送间隔。C代码如下:

// 规律地发送心跳
uint64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
while (1) {
    …
    zmq_poll (items, 1, HEARTBEAT_INTERVAL * 1000);
    …
    // 无论zmq_poll的行为是什么,都使用以下逻辑判断是否发送心跳
    if (s_clock () > heartbeat_at) {
        … 发送心跳给所有节点
        // 设置下一次心跳的时间
        heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
    }
}
  • 主循环应该使用心跳间隔作为超时时间。显然不能使用无超时时间的设置,而短于心跳间隔也只是浪费循环次数而已。

  • 使用简单的追踪方式来进行追踪,如直接输出至控制台。这里有一些追踪的窍门:使用zmsg()函数打印套接字内容;对消息进行编号,判断是否会有间隔。

  • 在真实的应用程序中,心跳必须是可以配置的,并能和节点共同商定。有些节点需要高频心跳,如10毫秒,另一些节点则可能只需要30秒发送一次心跳即可。

  • 如果你要对不同的节点发送不同频率的心跳,那么poll的超时时间应设置为最短的心跳间隔。

  • 也许你会想要用一个单独的套接字来处理心跳,这看起来很棒,可以将同步的请求-应答和异步的心跳隔离开来。但是,这个主意并不好,原因有几点:首先、发送数据时其实是不需要发送心跳的;其次、套接字可能会因为网络问题而阻塞,你需要设法知道用于发送数据的套接字停止响应的原因是死亡了还是过于繁忙而已,这样你就需要对这个套接字进行心跳。最后,处理两个套接字要比处理一个复杂得多。

  • 我们没有设置client至队列的心跳,因为这太过复杂了,而且没有太大价值。

约定和协议

也许你已经注意到,由于心跳机制,偏执海盗模式和简单海盗模式是不兼容的。

其实,这里我们需要写一个协议。也许在试验阶段是不需要协议的,但这在真实的应用程序中是有必要。如果我们想用其他语言来写worker怎么办?我们是否需要通过源代码来查看通信过程?如果我们想改变协议怎么办?规范可能很简单,但并不显然。越是成功的协议,就会越为复杂。

一个缺乏约定的应用程序一定是不可复用的,所以让我们来为这个协议写一个规范,怎么做呢?

  • 位于rfc.zeromq.org的wiki页上,我们特地设置了一个用于存放ZMQ协议的页面。
  • 要创建一个新的协议,你需要注册并按照指导进行。过程很直接,但并不一定所有人都能撰写技术性文档。

我大约花了15分钟的时间草拟海盗模式规范(PPP),麻雀虽小,但五脏俱全。

要用PPP协议进行真实环境下的编程,你还需要:

  • 在READY命令中加入版本号,这样就能再日后安全地新增PPP版本号。
  • 目前,READY和HEARTBEAT信号并没有指定其来源于请求还是应答。要区分他们,需要新建一个消息结构,其中包含“消息类型”这一信息。

面向服务的可靠队列(管家模式)

世上的事物往往瞬息万变,正当我们期待有更好的协议来解决上一节的问题时,已经有人制定好了:

  • http://rfc.zeromq.org/spec:7

这份协议只有一页,它将PPP协议变得更为坚固。我们在设计复杂架构时应该这样做:首先写下约定,再用软件去实现它。

管家模式协议(MDP)在扩展PPP协议时引入了一个有趣的特性:client发送的每一个请求都有一个“服务名称”,而worker在像队列装置注册时需要告知自己的服务类型。MDP的优势在于它来源于现实编程,协议简单,且容易提升。

引入“服务名称”的机制,是对偏执海盗队列的一个简单补充,而结果是让其成为一个面向服务的代理。

4

在实施管家模式之前,我们需要为client和worker编写一个框架。如果程序员可以通过简单的API来实现这种模式,那就没有必要让他们去了解管家模式的协议内容和实现方法了。 所以,我们第一个协议(即管家模式协议)定义了分布式架构中节点是如何互相交互的,第二个协议则要定义应用程序应该如何通过框架来使用这一协议。 管家模式有两个端点,客户端和服务端。因为我们要为client和worker都撰写框架,所以就需要提供两套API。以下是用简单的面向对象方法设计的client端API雏形,使用的是C语言的ZFL library

mdcli_t *mdcli_new     (char *broker);
void     mdcli_destroy (mdcli_t **self_p);
zmsg_t  *mdcli_send    (mdcli_t *self, char *service, zmsg_t **request_p);

就这么简单。我们创建了一个会话来和代理通信,发送并接收一个请求,最后关闭连接。以下是worker端API的雏形。

mdwrk_t *mdwrk_new     (char *broker,char *service);
void     mdwrk_destroy (mdwrk_t **self_p);
zmsg_t  *mdwrk_recv    (mdwrk_t *self, zmsg_t *reply);

上面两段代码看起来差不多,但是worker端API略有不同。worker第一次执行recv()后会传递一个空的应答,之后才传递当前的应答,并获得新的请求。

两段的API都很容易开发,只需在偏执海盗模式代码的基础上修改即可。以下是client API:

mdcliapi: Majordomo client API in C

/*  =====================================================================
    mdcliapi.c

    Majordomo Protocol Client API
    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.

    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.

    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org

    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.

    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/

#include "mdcliapi.h"

//  类结构
//  我们会通过成员方法来访问这些属性

struct _mdcli_t {
    zctx_t *ctx;                //  上下文
    char *broker;
    void *client;               //  连接至代理的套接字
    int verbose;                //  使用标准输出打印当前活动
    int timeout;                //  请求超时时间
    int retries;                //  请求重试次数
};


//  ---------------------------------------------------------------------
//  连接或重连代理

void s_mdcli_connect_to_broker (mdcli_t *self)
{
    if (self->client)
        zsocket_destroy (self->ctx, self->client);
    self->client = zsocket_new (self->ctx, ZMQ_REQ);
    zmq_connect (self->client, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接至代理 %s...", self->broker);
}


//  ---------------------------------------------------------------------
//  构造函数

mdcli_t *
mdcli_new (char *broker, int verbose)
{
    assert (broker);

    mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->verbose = verbose;
    self->timeout = 2500;           //  毫秒
    self->retries = 3;              //  尝试次数

    s_mdcli_connect_to_broker (self);
    return self;
}


//  ---------------------------------------------------------------------
//  析构函数

void
mdcli_destroy (mdcli_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdcli_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self);
        *self_p = NULL;
    }
}


//  ---------------------------------------------------------------------
//  设定请求超时时间

void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
    assert (self);
    self->timeout = timeout;
}


//  ---------------------------------------------------------------------
//  设定请求重试次数

void
mdcli_set_retries (mdcli_t *self, int retries)
{
    assert (self);
    self->retries = retries;
}


//  ---------------------------------------------------------------------
//  向代理发送请求,并尝试获取应答;
//  对消息保持所有权,发送后销毁;
//  返回应答消息,或NULL。

zmsg_t *
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
    assert (self);
    assert (request_p);
    zmsg_t *request = *request_p;

    //  用协议前缀包装消息
    //  Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    //  Frame 2: 服务名称 (可打印字符串)
    zmsg_pushstr (request, service);
    zmsg_pushstr (request, MDPC_CLIENT);
    if (self->verbose) {
        zclock_log ("I: 发送请求给 '%s' 服务:", service);
        zmsg_dump (request);
    }

    int retries_left = self->retries;
    while (retries_left && !zctx_interrupted) {
        zmsg_t *msg = zmsg_dup (request);
        zmsg_send (&msg, self->client);

        while (TRUE) {
            //  轮询套接字以接收应答,有超时时间
            zmq_pollitem_t items [] = {
                { self->client, 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  中断

            //  收到应答后进行处理
            if (items [0].revents & ZMQ_POLLIN) {
                zmsg_t *msg = zmsg_recv (self->client);
                if (self->verbose) {
                    zclock_log ("I: received reply:");
                    zmsg_dump (msg);
                }
                //  不要尝试处理错误,直接报错即可
                assert (zmsg_size (msg) >= 3);

                zframe_t *header = zmsg_pop (msg);
                assert (zframe_streq (header, MDPC_CLIENT));
                zframe_destroy (&header);

                zframe_t *reply_service = zmsg_pop (msg);
                assert (zframe_streq (reply_service, service));
                zframe_destroy (&reply_service);

                zmsg_destroy (&request);
                return msg;     //  成功
            }
            else
            if (--retries_left) {
                if (self->verbose)
                    zclock_log ("W: no reply, reconnecting...");
                //  重连并重发消息
                s_mdcli_connect_to_broker (self);
                zmsg_t *msg = zmsg_dup (request);
                zmsg_send (&msg, self->client);
            }
            else {
                if (self->verbose)
                    zclock_log ("W: 发生严重错误,放弃重试。");
                break;          //  放弃
            }
        }
    }
    if (zctx_interrupted)
        printf ("W: 收到中断消息,结束client进程...\n");
    zmsg_destroy (&request);
    return NULL;
}

以下测试程序会执行10万次请求应答:

mdclient: Majordomo client application in C

//
//  管家模式协议 - 客户端示例
//  使用mdcli API隐藏管家模式协议的内部实现
//

//  让我们直接编译这段代码,不生成类库
#include "mdcliapi.c"

int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);

    int count;
    for (count = 0; count < 100000; count++) {
        zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        zmsg_t *reply = mdcli_send (session, "echo", &request);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              //  中断或停止
    }
    printf ("已处理 %d 次请求-应答\n", count);
    mdcli_destroy (&session);
    return 0;
}

下面是worker的API:

mdwrkapi: Majordomo worker API in C

/*  =====================================================================
    mdwrkapi.c

    Majordomo Protocol Worker API
    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.

    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.

    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org

    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.

    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/

#include "mdwrkapi.h"

//  可靠性参数
#define HEARTBEAT_LIVENESS  3       //  合理值:3-5

//  类结构
//  使用成员函数访问属性

struct _mdwrk_t {
    zctx_t *ctx;                //  上下文
    char *broker;
    char *service;
    void *worker;               //  连接至代理的套接字
    int verbose;                //  使用标准输出打印活动

    //  心跳设置
    uint64_t heartbeat_at;      //  发送心跳的时间
    size_t liveness;            //  尝试次数
    int heartbeat;              //  心跳延时,单位:毫秒
    int reconnect;              //  重连延时,单位:毫秒

    //  内部状态
    int expect_reply;           //  初始值为0

    //  应答地址,如果存在的话
    zframe_t *reply_to;
};


//  ---------------------------------------------------------------------
//  发送消息给代理
//  如果没有提供消息,则内部创建一个

static void
s_mdwrk_send_to_broker (mdwrk_t *self, char *command, char *option,
                        zmsg_t *msg)
{
    msg = msg? zmsg_dup (msg): zmsg_new ();

    //  将协议信封压入消息顶部
    if (option)
        zmsg_pushstr (msg, option);
    zmsg_pushstr (msg, command);
    zmsg_pushstr (msg, MDPW_WORKER);
    zmsg_pushstr (msg, "");

    if (self->verbose) {
        zclock_log ("I: sending %s to broker",
            mdps_commands [(int) *command]);
        zmsg_dump (msg);
    }
    zmsg_send (&msg, self->worker);
}


//  ---------------------------------------------------------------------
//  连接或重连代理

void s_mdwrk_connect_to_broker (mdwrk_t *self)
{
    if (self->worker)
        zsocket_destroy (self->ctx, self->worker);
    self->worker = zsocket_new (self->ctx, ZMQ_DEALER);
    zmq_connect (self->worker, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接代理 %s...", self->broker);

    //  向代理注册服务类型
    s_mdwrk_send_to_broker (self, MDPW_READY, self->service, NULL);

    //  当心跳健康度为零,表示代理已断开连接
    self->liveness = HEARTBEAT_LIVENESS;
    self->heartbeat_at = zclock_time () + self->heartbeat;
}


//  ---------------------------------------------------------------------
//  构造函数

mdwrk_t *
mdwrk_new (char *broker,char *service, int verbose)
{
    assert (broker);
    assert (service);

    mdwrk_t *self = (mdwrk_t *) zmalloc (sizeof (mdwrk_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->service = strdup (service);
    self->verbose = verbose;
    self->heartbeat = 2500;     //  毫秒
    self->reconnect = 2500;     //  毫秒

    s_mdwrk_connect_to_broker (self);
    return self;
}


//  ---------------------------------------------------------------------
//  析构函数

void
mdwrk_destroy (mdwrk_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdwrk_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self->service);
        free (self);
        *self_p = NULL;
    }
}


//  ---------------------------------------------------------------------
//  设置心跳延迟

void
mdwrk_set_heartbeat (mdwrk_t *self, int heartbeat)
{
    self->heartbeat = heartbeat;
}


//  ---------------------------------------------------------------------
//  设置重连延迟

void
mdwrk_set_reconnect (mdwrk_t *self, int reconnect)
{
    self->reconnect = reconnect;
}


//  ---------------------------------------------------------------------
//  若有应答则发送给代理,并等待新的请求

zmsg_t *
mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p)
{
    //  格式化并发送请求传入的应答
    assert (reply_p);
    zmsg_t *reply = *reply_p;
    assert (reply || !self->expect_reply);
    if (reply) {
        assert (self->reply_to);
        zmsg_wrap (reply, self->reply_to);
        s_mdwrk_send_to_broker (self, MDPW_REPLY, NULL, reply);
        zmsg_destroy (reply_p);
    }
    self->expect_reply = 1;

    while (TRUE) {
        zmq_pollitem_t items [] = {
            { self->worker,  0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, self->heartbeat * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断

        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (self->worker);
            if (!msg)
                break;          //  中断
            if (self->verbose) {
                zclock_log ("I: 从代理处获得消息:");
                zmsg_dump (msg);
            }
            self->liveness = HEARTBEAT_LIVENESS;

            //  不要处理错误,直接报错即可
            assert (zmsg_size (msg) >= 3);

            zframe_t *empty = zmsg_pop (msg);
            assert (zframe_streq (empty, ""));
            zframe_destroy (&empty);

            zframe_t *header = zmsg_pop (msg);
            assert (zframe_streq (header, MDPW_WORKER));
            zframe_destroy (&header);

            zframe_t *command = zmsg_pop (msg);
            if (zframe_streq (command, MDPW_REQUEST)) {
                //  这里需要将消息中空帧之前的所有地址都保存起来,
                //  但在这里我们暂时只保存一个
                self->reply_to = zmsg_unwrap (msg);
                zframe_destroy (&command);
                return msg;     //  处理请求
            }
            else
            if (zframe_streq (command, MDPW_HEARTBEAT))
                ;               //  不对心跳做任何处理
            else
            if (zframe_streq (command, MDPW_DISCONNECT))
                s_mdwrk_connect_to_broker (self);
            else {
                zclock_log ("E: 消息不合法");
                zmsg_dump (msg);
            }
            zframe_destroy (&command);
            zmsg_destroy (&msg);
        }
        else
        if (--self->liveness == 0) {
            if (self->verbose)
                zclock_log ("W: 失去与代理的连接 - 正在重试...");
            zclock_sleep (self->reconnect);
            s_mdwrk_connect_to_broker (self);
        }
        //  适时地发送消息
        if (zclock_time () > self->heartbeat_at) {
            s_mdwrk_send_to_broker (self, MDPW_HEARTBEAT, NULL, NULL);
            self->heartbeat_at = zclock_time () + self->heartbeat;
        }
    }
    if (zctx_interrupted)
        printf ("W: 收到中断消息,中止worker...\n");
    return NULL;
}

以下测试程序实现了名为echo的服务:

mdworker: Majordomo worker application in C

//
//  管家模式协议 - worker示例
//  使用mdwrk API隐藏MDP协议的内部实现
//

//  让我们直接编译代码,而不创建类库
#include "mdwrkapi.c"

int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdwrk_t *session = mdwrk_new (
        "tcp://localhost:5555", "echo", verbose);

    zmsg_t *reply = NULL;
    while (1) {
        zmsg_t *request = mdwrk_recv (session, &reply);
        if (request == NULL)
            break;              //  worker被中止
        reply = request;        //  echo服务……其实很复杂:)
    }
    mdwrk_destroy (&session);
    return 0;
}

几点说明:

  • API是单线程的,所以说worker不会再后台发送心跳,而这也是我们所期望的:如果worker应用程序停止了,心跳就会跟着中止,代理便会停止向该worker发送新的请求。

  • wroker API没有做回退算法的设置,因为这里不值得使用这一复杂的机制。

  • API没有提供任何报错机制,如果出现问题,它会直接报断言(或异常,依语言而定)。这一做法对实验性的编程是有用的,这样可以立刻看到执行结果。但在真实编程环境中,API应该足够健壮,合适地处理非法消息。

也许你会问,worker API为什么要关闭它的套接字并新开一个呢?特别是ZMQ是有重连机制的,能够在节点归来后进行重连。我们可以回顾一下简单海盗模式中的worker,以及偏执海盗模式中的worker来加以理解。ZMQ确实会进行自动重连,但如果代理死亡并重连,worker并不会重新进行注册。这个问题有两种解决方案:一是我们这里用到的较为简便的方案,即当worker判断代理已经死亡时,关闭它的套接字并重头来过;另一个方案是当代理收到未知worker的心跳时要求该worker对其提供的服务类型进行注册,这样一来就需要在协议中说明这一规则。

下面让我们设计管家模式的代理,它的核心代码是一组队列,每种服务对应一个队列。我们会在worker出现时创建相应的队列(worker消失时应该销毁对应的队列,不过我们这里暂时不考虑)。额外的,我们会为每种服务维护一个worker的队列。

为了让C语言代码更为易读易写,我使用了ZFL项目提供的哈希和链表容器,并命名为[zhash](https://github.com/imatix/zguide/blob/master/examples/C/zhash.h zhash)和zlist。如果使用现代语言编写,那自然可以使用其内置的容器。

mdbroker: Majordomo broker in C


//
//  管家模式协议 - 代理
//  协议 http://rfc.zeromq.org/spec:7 和 spec:8 的最简实现
//
#include "czmq.h"
#include "mdp.h"

//  一般我们会从配置文件中获取以下值

#define HEARTBEAT_LIVENESS  3       //  合理值:3-5
#define HEARTBEAT_INTERVAL  2500    //  单位:毫秒
#define HEARTBEAT_EXPIRY    HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

//  定义一个代理
typedef struct {
    zctx_t *ctx;                //  上下文
    void *socket;               //  用于连接client和worker的套接字
    int verbose;                //  使用标准输出打印活动信息
    char *endpoint;             //  代理绑定到的端点
    zhash_t *services;          //  已知服务的哈希表
    zhash_t *workers;           //  已知worker的哈希表
    zlist_t *waiting;           //  正在等待的worker队列
    uint64_t heartbeat_at;      //  发送心跳的时间
} broker_t;

//  定义一个服务
typedef struct {
    char *name;                 //  服务名称
    zlist_t *requests;          //  客户端请求队列
    zlist_t *waiting;           //  正在等待的worker队列
    size_t workers;             //  可用worker数
} service_t;

//  定义一个worker,状态为空闲或占用
typedef struct {
    char *identity;             //  worker的标识
    zframe_t *address;          //  地址帧
    service_t *service;         //  所属服务
    int64_t expiry;             //  过期时间,从未收到心跳起计时
} worker_t;


//  ---------------------------------------------------------------------
//  代理使用的函数
static broker_t *
    s_broker_new (int verbose);
static void
    s_broker_destroy (broker_t **self_p);
static void
    s_broker_bind (broker_t *self, char *endpoint);
static void
    s_broker_purge_workers (broker_t *self);

//  服务使用的函数
static service_t *
    s_service_require (broker_t *self, zframe_t *service_frame);
static void
    s_service_destroy (void *argument);
static void
    s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg);
static void
    s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg);

//  worker使用的函数
static worker_t *
    s_worker_require (broker_t *self, zframe_t *address);
static void
    s_worker_delete (broker_t *self, worker_t *worker, int disconnect);
static void
    s_worker_destroy (void *argument);
static void
    s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
    s_worker_send (broker_t *self, worker_t *worker, char *command,
                   char *option, zmsg_t *msg);
static void
    s_worker_waiting (broker_t *self, worker_t *worker);

//  客户端使用的函数
static void
    s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg);


//  ---------------------------------------------------------------------
//  主程序

int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));

    broker_t *self = s_broker_new (verbose);
    s_broker_bind (self, "tcp://*:5555");

    //  接受并处理消息,直至程序被中止
    while (TRUE) {
        zmq_pollitem_t items [] = {
            { self->socket,  0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断

        //  Process next input message, if any
        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (self->socket);
            if (!msg)
                break;          //  中断
            if (self->verbose) {
                zclock_log ("I: 收到消息:");
                zmsg_dump (msg);
            }
            zframe_t *sender = zmsg_pop (msg);
            zframe_t *empty  = zmsg_pop (msg);
            zframe_t *header = zmsg_pop (msg);

            if (zframe_streq (header, MDPC_CLIENT))
                s_client_process (self, sender, msg);
            else
            if (zframe_streq (header, MDPW_WORKER))
                s_worker_process (self, sender, msg);
            else {
                zclock_log ("E: 非法消息:");
                zmsg_dump (msg);
                zmsg_destroy (&msg);
            }
            zframe_destroy (&sender);
            zframe_destroy (&empty);
            zframe_destroy (&header);
        }
        //  断开并删除过期的worker
        //  适时地发送心跳给worker
        if (zclock_time () > self->heartbeat_at) {
            s_broker_purge_workers (self);
            worker_t *worker = (worker_t *) zlist_first (self->waiting);
            while (worker) {
                s_worker_send (self, worker, MDPW_HEARTBEAT, NULL, NULL);
                worker = (worker_t *) zlist_next (self->waiting);
            }
            self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        }
    }
    if (zctx_interrupted)
        printf ("W: 收到中断消息,关闭中...\n");

    s_broker_destroy (&self);
    return 0;
}


//  ---------------------------------------------------------------------
//  代理对象的构造函数

static broker_t *
s_broker_new (int verbose)
{
    broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));

    //  初始化代理状态
    self->ctx = zctx_new ();
    self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
    self->verbose = verbose;
    self->services = zhash_new ();
    self->workers = zhash_new ();
    self->waiting = zlist_new ();
    self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    return self;
}

//  ---------------------------------------------------------------------
//  代理对象的析构函数

static void
s_broker_destroy (broker_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        broker_t *self = *self_p;
        zctx_destroy (&self->ctx);
        zhash_destroy (&self->services);
        zhash_destroy (&self->workers);
        zlist_destroy (&self->waiting);
        free (self);
        *self_p = NULL;
    }
}

//  ---------------------------------------------------------------------
//  将代理套接字绑定至端点,可以重复调用该函数
//  我们使用一个套接字来同时处理client和worker

void
s_broker_bind (broker_t *self, char *endpoint)
{
    zsocket_bind (self->socket, endpoint);
    zclock_log ("I: MDP broker/0.1.1 is active at %s", endpoint);
}

//  ---------------------------------------------------------------------
//  删除空闲状态中过期的worker

static void
s_broker_purge_workers (broker_t *self)
{
    worker_t *worker = (worker_t *) zlist_first (self->waiting);
    while (worker) {
        if (zclock_time () < worker->expiry)
            continue;              //  该worker未过期,停止搜索
        if (self->verbose)
            zclock_log ("I: 正在删除过期的worker: %s",
                worker->identity);

        s_worker_delete (self, worker, 0);
        worker = (worker_t *) zlist_first (self->waiting);
    }
}

//  ---------------------------------------------------------------------
//  定位或创建新的服务项

static service_t *
s_service_require (broker_t *self, zframe_t *service_frame)
{
    assert (service_frame);
    char *name = zframe_strdup (service_frame);

    service_t *service =
        (service_t *) zhash_lookup (self->services, name);
    if (service == NULL) {
        service = (service_t *) zmalloc (sizeof (service_t));
        service->name = name;
        service->requests = zlist_new ();
        service->waiting = zlist_new ();
        zhash_insert (self->services, name, service);
        zhash_freefn (self->services, name, s_service_destroy);
        if (self->verbose)
            zclock_log ("I: 收到消息:");
    }
    else
        free (name);

    return service;
}

//  ---------------------------------------------------------------------
//  当服务从broker->services中移除时销毁该服务对象

static void
s_service_destroy (void *argument)
{
    service_t *service = (service_t *) argument;
    //  销毁请求队列中的所有项目
    while (zlist_size (service->requests)) {
        zmsg_t *msg = zlist_pop (service->requests);
        zmsg_destroy (&msg);
    }
    zlist_destroy (&service->requests);
    zlist_destroy (&service->waiting);
    free (service->name);
    free (service);
}

//  ---------------------------------------------------------------------
//  可能时,分发请求给等待中的worker

static void
s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg)
{
    assert (service);
    if (msg)                    //  将消息加入队列
        zlist_append (service->requests, msg);

    s_broker_purge_workers (self);
    while (zlist_size (service->waiting)
        && zlist_size (service->requests))
    {
        worker_t *worker = zlist_pop (service->waiting);
        zlist_remove (self->waiting, worker);
        zmsg_t *msg = zlist_pop (service->requests);
        s_worker_send (self, worker, MDPW_REQUEST, NULL, msg);
        zmsg_destroy (&msg);
    }
}

//  ---------------------------------------------------------------------
//  使用8/MMI协定处理内部服务

static void
s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg)
{
    char *return_code;
    if (zframe_streq (service_frame, "mmi.service")) {
        char *name = zframe_strdup (zmsg_last (msg));
        service_t *service =
            (service_t *) zhash_lookup (self->services, name);
        return_code = service && service->workers? "200": "404";
        free (name);
    }
    else
        return_code = "501";

    zframe_reset (zmsg_last (msg), return_code, strlen (return_code));

    //  移除并保存返回给client的信封,插入协议头信息和服务名称,并重新包装信封
    zframe_t *client = zmsg_unwrap (msg);
    zmsg_push (msg, zframe_dup (service_frame));
    zmsg_pushstr (msg, MDPC_CLIENT);
    zmsg_wrap (msg, client);
    zmsg_send (&msg, self->socket);
}

//  ---------------------------------------------------------------------
//  按需创建worker

static worker_t *
s_worker_require (broker_t *self, zframe_t *address)
{
    assert (address);

    //  self->workers使用wroker的标识为键
    char *identity = zframe_strhex (address);
    worker_t *worker =
        (worker_t *) zhash_lookup (self->workers, identity);

    if (worker == NULL) {
        worker = (worker_t *) zmalloc (sizeof (worker_t));
        worker->identity = identity;
        worker->address = zframe_dup (address);
        zhash_insert (self->workers, identity, worker);
        zhash_freefn (self->workers, identity, s_worker_destroy);
        if (self->verbose)
            zclock_log ("I: 正在注册新的worker: %s", identity);
    }
    else
        free (identity);
    return worker;
}

//  ---------------------------------------------------------------------
//  从所有数据结构中删除wroker,并销毁worker对象

static void
s_worker_delete (broker_t *self, worker_t *worker, int disconnect)
{
    assert (worker);
    if (disconnect)
        s_worker_send (self, worker, MDPW_DISCONNECT, NULL, NULL);

    if (worker->service) {
        zlist_remove (worker->service->waiting, worker);
        worker->service->workers--;
    }
    zlist_remove (self->waiting, worker);
    //  以下方法间接调用了s_worker_destroy()方法
    zhash_delete (self->

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8