/ 狼烟 / NGINX源码分析:事件模型

NGINX源码分析:事件模型

2016-10-10 posted in [tech]

前言

“nginx [engine x] is an HTTP and reverse proxy server, a mail proxy server, and a generic TCP/UDP proxy server”,这是NGINX官方对其的定义。

很显然的,NGINX更多地扮演着C/S模型中的Server角色,而TCP/IP协议栈中的服务端框架,主要包含两个基础结构:

前者负责服务端的Socket连接服务,后者负责服务端的数据收发服务,服务端收到来自用户的请求数据,经过特定的应用逻辑,最终将生成的响应数据发送回用户端。

NGINX服务端的基础结构,也就是我们所熟知的事件模型,正是为其提供高效端口侦听和数据收发特性而存在的。

事件模型相关的一系列核心概念:

在此不展开讨论,有兴趣的可以自己去探查select/poll/epoll/kqueue…的原理,本文分析的是NGINX事件模型的实现流程。

NGINX的端口侦听

我们从NGINX源码的main函数开始来分析其端口侦听流程:


/* 本文的NGINX源码版本为[branch stable-1.10](https://github.com/nginx/nginx/tree/branches/stable-1.10) */

/* nginx/src/core/nginx.c */

int ngx_cdecl
main(int argc, char *const *argv)
{
    ...

    /* NGINX平滑升级时继承侦听的listen fd */
    if (ngx_add_inherited_sockets(&init_cycle) != NGX_OK) {
        return 1;
    }

    ...

    /* ngx_cycle初始化 */
    cycle = ngx_init_cycle(&init_cycle);
    if (cycle == NULL) {
        if (ngx_test_config) {
            ngx_log_stderr(0, "configuration file %s test failed",
                           init_cycle.conf_file.data);
        }

        return 1;
    }

    ...
}

可以看到,NGINX主函数对于端口侦听相关的函数为”ngx_add_inherited_sockets()”。这个函数有点复杂,不过它的主要作用是NGINX平滑升级(USR2)时用来继承老的master进程侦听的”listen fd”,有兴趣的可以直接参考文章nginx多进程模型之热代码平滑升级,本文还是来关注核心的初始化流程吧。

“ngx_init_cycle()”是NGINX启动流程中的核心函数,NGINX源码中的核心变量”volatile ngx_cycle_t *ngx_cycle;”就是在该函数中完成初始化的。


/* nginx/src/core/nginx_core.h */

struct ngx_cycle_s {
    void                  ****conf_ctx;
    ngx_pool_t               *pool;

    ngx_log_t                *log;
    ngx_log_t                 new_log;

    ngx_uint_t                log_use_stderr;  /* unsigned  log_use_stderr:1; */

    ngx_connection_t        **files;
    ngx_connection_t         *free_connections; /* 空闲连接池 */
    ngx_uint_t                free_connection_n; /* 空闲连接池中连接的数量 */

    ngx_module_t            **modules;
    ngx_uint_t                modules_n;
    ngx_uint_t                modules_used;    /* unsigned  modules_used:1; */

    ngx_queue_t               reusable_connections_queue;

    ngx_array_t               listening; /* 侦听结构数组 */
    ngx_array_t               paths;
    ngx_array_t               config_dump;
    ngx_list_t                open_files;
    ngx_list_t                shared_memory;

    ngx_uint_t                connection_n; /* NGINX的连接总量 */
    ngx_uint_t                files_n;

    ngx_connection_t         *connections; /* NGINX的连接池 */
    ngx_event_t              *read_events;
    ngx_event_t              *write_events;

    ngx_cycle_t              *old_cycle;

    ngx_str_t                 conf_file;
    ngx_str_t                 conf_param;
    ngx_str_t                 conf_prefix;
    ngx_str_t                 prefix;
    ngx_str_t                 lock_file;
    ngx_str_t                 hostname;
};

我们可以从”ngx_cycle”的结构体声明”ngx_cycle_s”中看到,NGINX事件模型相关的核心内容 - 侦听结构的”listening”以及相关的”connections/free_connections”均在此处。

那么,我们继续从”ngx_init_cycle()”分析其源码实现。


/* nginx/src/core/nginx_cycle.c */

ngx_cycle_t *
ngx_init_cycle(ngx_cycle_t *old_cycle)
{
    ...

    /* 创建和初始化侦听结构数组 */
    n = old_cycle->listening.nelts ? old_cycle->listening.nelts : 10;

    cycle->listening.elts = ngx_pcalloc(pool, n * sizeof(ngx_listening_t));
    if (cycle->listening.elts == NULL) {
        ngx_destroy_pool(pool);
        return NULL;
    }

    /* 此时cycle->listening中的元素个数为0 */
    cycle->listening.nelts = 0;
    cycle->listening.size = sizeof(ngx_listening_t);
    cycle->listening.nalloc = n;
    cycle->listening.pool = pool;


    ngx_queue_init(&cycle->reusable_connections_queue);

    ...

    /* NGINX的配置解析流程 */
    if (ngx_conf_parse(&conf, &cycle->conf_file) != NGX_CONF_OK) {
        environ = senv;
        ngx_destroy_cycle_pools(&conf);
        return NULL;
    }

    ...

    /* handle the listening sockets */

    if (old_cycle->listening.nelts) {

    	...

    } else {
        ls = cycle->listening.elts;
        for (i = 0; i < cycle->listening.nelts; i++) {
            ls[i].open = 1;
#if (NGX_HAVE_DEFERRED_ACCEPT && defined SO_ACCEPTFILTER)
            if (ls[i].accept_filter) {
                ls[i].add_deferred = 1;
            }
#endif
#if (NGX_HAVE_DEFERRED_ACCEPT && defined TCP_DEFER_ACCEPT)
            if (ls[i].deferred_accept) {
                ls[i].add_deferred = 1;
            }
#endif
        }
    }

    /* 在此处才真正创建NGINX的侦听套接字 */
    if (ngx_open_listening_sockets(cycle) != NGX_OK) {
        goto failed;
    }

    if (!ngx_test_config) {
        ngx_configure_listening_sockets(cycle);
    }

    ...
}

NGINX在”ngx_init_cycle()”中首先为侦听结构的”listening”数组完成初始化操作,其元素个数”n”在正常启动流程时为10。

我们暂时忽略上面代码段的中间部分”ngx_conf_parse()”,这部分内容是NGINX解析配置文件,从配置文件中读出侦听地址、侦听端口的逻辑,稍后再讲。

注释部分”handle the listening sockets”相关的代码设置了”listening”中的部分字段。

“ngx_open_listening_sockets()”和”ngx_configure_listening_sockets()”是真正的建立Socket的源码实现:


/* nginx/src/core/ngx_connections.c */

ngx_int_t
ngx_open_listening_sockets(ngx_cycle_t *cycle)
{
    ...

    /* 系统调用socket() => bind() => listen()完成套接字的建立 */

    /* for each listening socket */

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

    	...

    	s = ngx_socket(ls[i].sockaddr->sa_family, ls[i].type, 0);

        if (s == (ngx_socket_t) -1) {
            ngx_log_error(NGX_LOG_EMERG, log, ngx_socket_errno,
                          ngx_socket_n " %V failed", &ls[i].addr_text);
            return NGX_ERROR;
        }

        if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
                       (const void *) &reuseaddr, sizeof(int))
            == -1)
        {
            ngx_log_error(NGX_LOG_EMERG, log, ngx_socket_errno,
                          "setsockopt(SO_REUSEADDR) %V failed",
                          &ls[i].addr_text);

            if (ngx_close_socket(s) == -1) {
                ngx_log_error(NGX_LOG_EMERG, log, ngx_socket_errno,
                              ngx_close_socket_n " %V failed",
                              &ls[i].addr_text);
            }

            return NGX_ERROR;
        }

        ...

        if (bind(s, ls[i].sockaddr, ls[i].socklen) == -1) {
            err = ngx_socket_errno;

            if (err != NGX_EADDRINUSE || !ngx_test_config) {
                ngx_log_error(NGX_LOG_EMERG, log, err,
                              "bind() to %V failed", &ls[i].addr_text);
            }

            if (ngx_close_socket(s) == -1) {
                ngx_log_error(NGX_LOG_EMERG, log, ngx_socket_errno,
                              ngx_close_socket_n " %V failed",
                              &ls[i].addr_text);
            }

            if (err != NGX_EADDRINUSE) {
                return NGX_ERROR;
            }

            if (!ngx_test_config) {
                failed = 1;
            }

            continue;
        }

        ...

        if (listen(s, ls[i].backlog) == -1) {
            err = ngx_socket_errno;

            /*
             * on OpenVZ after suspend/resume EADDRINUSE
             * may be returned by listen() instead of bind(), see
             * https://bugzilla.openvz.org/show_bug.cgi?id=2470
             */

            if (err != NGX_EADDRINUSE || !ngx_test_config) {
                ngx_log_error(NGX_LOG_EMERG, log, err,
                              "listen() to %V, backlog %d failed",
                              &ls[i].addr_text, ls[i].backlog);
            }

            if (ngx_close_socket(s) == -1) {
                ngx_log_error(NGX_LOG_EMERG, log, ngx_socket_errno,
                              ngx_close_socket_n " %V failed",
                              &ls[i].addr_text);
            }

            if (err != NGX_EADDRINUSE) {
                return NGX_ERROR;
            }

            if (!ngx_test_config) {
                failed = 1;
            }

                continue;
        }

        ...

    }

    ...

    return NGX_OK;
}

void
ngx_configure_listening_sockets(ngx_cycle_t *cycle)
{
    ...

    /* 系统调用setsockopt()完成套接字的配置 */

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

        ls[i].log = *ls[i].logp;

        if (ls[i].rcvbuf != -1) {
            if (setsockopt(ls[i].fd, SOL_SOCKET, SO_RCVBUF,
                           (const void *) &ls[i].rcvbuf, sizeof(int))
                == -1)
            {
                ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno,
                              "setsockopt(SO_RCVBUF, %d) %V failed, ignored",
                              ls[i].rcvbuf, &ls[i].addr_text);
            }
        }

        if (ls[i].sndbuf != -1) {
            if (setsockopt(ls[i].fd, SOL_SOCKET, SO_SNDBUF,
                           (const void *) &ls[i].sndbuf, sizeof(int))
                == -1)
            {
                ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno,
                              "setsockopt(SO_SNDBUF, %d) %V failed, ignored",
                              ls[i].sndbuf, &ls[i].addr_text);
            }
        }

        if (ls[i].keepalive) {
            value = (ls[i].keepalive == 1) ? 1 : 0;

            if (setsockopt(ls[i].fd, SOL_SOCKET, SO_KEEPALIVE,
                           (const void *) &value, sizeof(int))
                == -1)
            {
                ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno,
                              "setsockopt(SO_KEEPALIVE, %d) %V failed, ignored",
                              value, &ls[i].addr_text);
            }
        }

        ...
    }
    ...
}

可以看到,在该部分源码中,NGINX将遍历”cycle->listening”数组,逐个调用”socket()”, “bind()”和”listen()”完成侦听操作,同时,调用”setsockopt()”函数设置socket选项,比如接收缓冲区、发送缓冲区等。

所以,”ngx_open_listening_sockets()”和”ngx_configure_listening_sockets()”是NGINX侦听结构初始化的流程末尾,在这里,完成了NGINX服务所有套接字Socket的建立和配置,只需要静待新连接的到来,也就是等待”accept()”的循环调用。

那么,我们还缺少了一环,就是每个Socket的地址信息是从何处获取的呢?

我们来分析上文提到的”ngx_conf_parse()”,忽略配置解析的细节,只关注侦听相关部分。

NGINX配置文件中的HTTP BLOCK和SERVER BLOCK定义了HTTP相关的虚拟主机的配置项,在SERVER BLOCK中,配置项”listen”和”server_name”指明了NGINX作为服务端所监听的端口和域名地址。

OK,那么在解析HTTP BLOCK和SERVER BLOCK时NGINX做了些什么呢,我们来分析HTTP模块的核心函数”ngx_http_block()”。


/* nginx/src/http/ngx_http.c */

static char *
ngx_http_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ...

    /* 建立Server信息的哈希表,用于多虚拟主机的分发 */

    /* optimize the lists of ports, addresses and server names */

    if (ngx_http_optimize_servers(cf, cmcf, cmcf->ports) != NGX_OK) {
        return NGX_CONF_ERROR;
    }

    ...
}

具体的配置项解析流程请自行检索,我们默认分析到此处,配置项”listen”和”server_name”相关的端口和域名地址已经被解析和存储于”cmcf->ports”中。

于是,我们拿着已经解析完的域名和端口”cmcf->ports”作为入参,调用”ngx_http_optimize_servers()”。


/* nginx/src/http/ngx_http.c */

static ngx_int_t
ngx_http_optimize_servers(ngx_conf_t *cf, ngx_http_core_main_conf_t *cmcf,
    ngx_array_t *ports)
{
    ngx_uint_t             p, a;
    ngx_http_conf_port_t  *port;
    ngx_http_conf_addr_t  *addr;

    if (ports == NULL) {
        return NGX_OK;
    }

    port = ports->elts;
    for (p = 0; p < ports->nelts; p++) {

        /* 地址排序 */

        ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts,
                 sizeof(ngx_http_conf_addr_t), ngx_http_cmp_conf_addrs);

        /*
         * check whether all name-based servers have the same
         * configuration as a default server for given address:port
         */

        addr = port[p].addrs.elts;
        for (a = 0; a < port[p].addrs.nelts; a++) {

            if (addr[a].servers.nelts > 1
#if (NGX_PCRE)
                || addr[a].default_server->captures
#endif
               )
            {
            	/* 建立哈希表 */
                if (ngx_http_server_names(cf, cmcf, &addr[a]) != NGX_OK) {
                    return NGX_ERROR;
                }
            }
        }

        /* 将地址信息添加到cycle->listening中,同时设置回调函数 */
        if (ngx_http_init_listening(cf, &port[p]) != NGX_OK) {
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}

“ngx_http_optimize_servers()”的核心工作有三点:


/* nginx/src/http/ngx_http.c */

static ngx_int_t
ngx_http_init_listening(ngx_conf_t *cf, ngx_http_conf_port_t *port)
{
    ...

    addr = port->addrs.elts;
    last = port->addrs.nelts;

    ...

    /* 填充cycle->listening中的元素,实际个数与port列表相关 */

    while (i < last) {

        ...

        ls = ngx_http_add_listening(cf, &addr[i]);

        ...

        addr++;
        last--;
    }

    ...
}

static ngx_listening_t *
ngx_http_add_listening(ngx_conf_t *cf, ngx_http_conf_addr_t *addr)
{
    ...

    /* 创建"ngx_listening_t"结构体实例 */
    ls = ngx_create_listening(cf, &addr->opt.u.sockaddr, addr->opt.socklen);
    if (ls == NULL) {
        return NULL;
    }

    ls->addr_ntop = 1;

    /* 为cycle->listening中的每个元素设置回调函数 */
    ls->handler = ngx_http_init_connection;

    ...
}

如上,”ngx_http_add_listening()”将”listening”的回调函数设置为”ngx_http_init_connection()”。

此时,端口已经处于侦听状态,NGINX只需要等待用户请求的来临,事件模型将继续负责高效地驱动数据的收发循环。

通过源码分析可知,NGINX的端口侦听主要包含如下步骤:

NGINX其实是支持多HTTP BLOCK的配置的,经过测试,多HTTP BLOCK的侦听循环都是正常运行的,它的实现原理其实也蕴含在上面的源码分析中,有兴趣的可以自己思考一下

NGINX的事件模型

NGINX的端口侦听源码,最后将侦听结构的回调函数设置为”ngx_http_init_connection()”,它是NGINX数据收发循环的入口,和事件模型有紧密的关系。

那么,我们将继续分析NGINX事件模型是如何驱动”accept()”循环,以及如何调用”ngx_http_init_connection()”进入数据收发循环。

上文提到过,”ngx_cycle”中除了”listening”成员外,还存在关联的”connections/free_connections”,它们是NGINX中的”连接”,每个连接均对应着TCP/IP协议中的一次”accept()”,也对应着事件模型中的一个读事件和写事件。而读写事件也是”ngx_cycle”的成员,对应的字段为”read_events”和”write_events”。

那么,”ngx_cycle”的这几个成员 - “listening”、”connections/free_connections”和”read_events/write_events”是如何关联起来的呢,我们继续通过源码分析其实现。

NGINX的main函数在启动流程的末尾阶段,将逐个调用每个模块的”init process”回调函数,在NGINX的事件模块”ngx_event_core_module”源码中,”init process”回调函数被设置为”ngx_event_process_init()”,我们来分析其实现源码。


/* nginx/src/event/ngx_event.c */

static ngx_int_t
ngx_event_process_init(ngx_cycle_t *cycle)
{
    ...

    for (m = 0; cycle->modules[m]; m++) {
        if (cycle->modules[m]->type != NGX_EVENT_MODULE) {
            continue;
        }

        if (cycle->modules[m]->ctx_index != ecf->use) {
            continue;
        }

        module = cycle->modules[m]->ctx;

        /* 调用NGX_EVENT_MODULE模块的事件初始化函数 */

        if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
            /* fatal */
            exit(2);
        }

        break;
    }

    ...
}

“ngx_event_process_init()”的核心代码很多,我们逐段分析。

可以看到,如上代码段将循环遍历类型为NGX_EVENT_MODULE的模块。

“ngx_select_module”、”ngx_poll_module”、”ngx_epoll_module”、”ngx_kqueue_module”等等,都是NGX_EVENT_MODULE类型的模块。

在不同的平台上,select/poll/epoll/kqueue只有一种会生效,本文只讨论epoll。

也就是说,”ngx_event_process_init()”在循环遍历后,”ngx_epoll_module”的”actions.init”函数将被调用。

“ngx_epoll_module”的”actions.init”在模块定义时被设置为”ngx_epoll_init()”。


/* nginx/src/event/modules/ngx_epoll_module.c */

static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ...

    /* 创建epoll对象 */

    ep = epoll_create(cycle->connection_n / 2);

    if (ep == -1) {
        ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                      "epoll_create() failed");
        return NGX_ERROR;
    }

    ...
}

有空再分析epoll的源码实现,这里,我们只关心epoll的系统调用”epoll_create()”、”epoll_ctl()”和”epoll_wait()”:

所以,”ngx_epoll_init()”创建了一个epoll对象,它的参数”cycle->connection_n / 2”暂时也没什么意义,因为现版本的epoll实现其实也没用上该参数。

我们继续分析”ngx_event_process_init()”的实现:


/* nginx/src/event/modules/ngx_epoll_module.c */

static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)

{
    ...

    /* 创建连接池、读事件、写事件 */

    cycle->connections =
        ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
    if (cycle->connections == NULL) {
        return NGX_ERROR;
    }

    c = cycle->connections;

    cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
                                   cycle->log);
    if (cycle->read_events == NULL) {
        return NGX_ERROR;
    }

    rev = cycle->read_events;
    for (i = 0; i < cycle->connection_n; i++) {
        rev[i].closed = 1;
        rev[i].instance = 1;
    }

    cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
                                    cycle->log);
    if (cycle->write_events == NULL) {
        return NGX_ERROR;
    }

    wev = cycle->write_events;
    for (i = 0; i < cycle->connection_n; i++) {
        wev[i].closed = 1;
    }

    i = cycle->connection_n;
    next = NULL;

    /* 每个连接都实际对应一个读写事件,连接池的本质是链表 */

    do {
        i--;

        c[i].data = next;
        c[i].read = &cycle->read_events[i];
        c[i].write = &cycle->write_events[i];
        c[i].fd = (ngx_socket_t) -1;

        next = &c[i];
    } while (i);

    cycle->free_connections = next;
    cycle->free_connection_n = cycle->connection_n;

    /* 遍历cycle->listening,为每一个侦听结构分配一个空闲连接,所以,实际用来等待accpet()的连接其实就那么几个,剩下的都用来收发循环,后面还会提到 */

    /* for each listening socket */

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

#if (NGX_HAVE_REUSEPORT)
        if (ls[i].reuseport && ls[i].worker != ngx_worker) {
            continue;
        }
#endif

        c = ngx_get_connection(ls[i].fd, cycle->log);

        if (c == NULL) {
            return NGX_ERROR;
        }

        c->type = ls[i].type;
        c->log = &ls[i].log;

        c->listening = &ls[i];
        ls[i].connection = c;

        rev = c->read;

        rev->log = c->log;
        rev->accept = 1;

#if (NGX_HAVE_DEFERRED_ACCEPT)
        rev->deferred_accept = ls[i].deferred_accept;
#endif

        if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
            if (ls[i].previous) {

                /*
                 * delete the old accept events that were bound to
                 * the old cycle read events array
                 */

                old = ls[i].previous->connection;

                if (ngx_del_event(old->read, NGX_READ_EVENT, NGX_CLOSE_EVENT)
                    == NGX_ERROR)
                {
                    return NGX_ERROR;
                }

                old->fd = (ngx_socket_t) -1;
            }
        }

        ...

        /* 设置读事件的回调为ngx_event_accept */

        rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
                                                : ngx_event_recvmsg;

        if (ngx_use_accept_mutex
#if (NGX_HAVE_REUSEPORT)
            && !ls[i].reuseport
#endif
           )
        {
            continue;
        }

        if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }

#endif

    }
    ...
}

这段代码很清晰地将”listening”、”connections”和”read_events/write_events”关联起来。

首先,NGINX创建了”connection_n”个连接,”connection_n”可以利用配置文件的”worker_connections”参数来配置,默认为1024。

其次,NGINX创建了”connection_n”个读写事件,在do循环里将读写事件添加为”ngx_connection_t”的成员,同时,利用”data”字段将所有的连接链接成一个链表,cycle->free_connections指向第一个空闲的连接。这种双链表的小对象内存池方案很常见,Linux内核和CPython的实现都在大量使用。

再次,NGINX将遍历数组”cycle->listening”,并取出对应个数的空闲连接,将侦听套接字关联到这些连接中。这样,NGINX的事件模型终于将套接字、连接和事件全部关联在一个结构中。

最后,NGINX调用”ngx_add_event()” => “ngx_event_actions.add” => “ngx_epoll_add_event()” => “epoll_ctl()”将读事件添加到epoll中,并且设置读事件的回调函数为”ngx_event_accept()”。

至此,NGINX的事件模型已经完成了整体的初始化流程,后续只需要等待epoll唤醒读事件来接收来自客户端的连接请求,我们将在”NGINX的事件驱动”中继续分析请求/响应流程。

NGINX的事件驱动循环

已经分析完毕NGINX的端口侦听和事件模型的初始化流程,现在我们继续分析NGINX的事件是如何驱动侦听循环和收发循环的。

NGINX的worker进程在启动完毕后,核心流程就是一个for循环,如下源码所示:


/* nginx/src/os/unix/ngx_process_cycle.c */

static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
    ngx_int_t worker = (intptr_t) data;

    ngx_process = NGX_PROCESS_WORKER;
    ngx_worker = worker;

    ngx_worker_process_init(cycle, worker);

    ngx_setproctitle("worker process");

    /* ngx_exiting、ngx_terminate、ngx_quit、ngx_reopen,均为启停信号 */

    for ( ;; ) {

        if (ngx_exiting) {
            ngx_event_cancel_timers();

            if (ngx_event_timer_rbtree.root == ngx_event_timer_rbtree.sentinel)
            {
                ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");

                ngx_worker_process_exit(cycle);
            }
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

        /* work进程的工作循环核心函数 */

        ngx_process_events_and_timers(cycle);

        if (ngx_terminate) {
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");

            ngx_worker_process_exit(cycle);
        }

        if (ngx_quit) {
            ngx_quit = 0;
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
                          "gracefully shutting down");
            ngx_setproctitle("worker process is shutting down");

            if (!ngx_exiting) {
                ngx_exiting = 1;
                ngx_close_listening_sockets(cycle);
                ngx_close_idle_connections(cycle);
            }
        }

        if (ngx_reopen) {
            ngx_reopen = 0;
            ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
            ngx_reopen_files(cycle, -1);
        }
    }
}

可以看到,worker进程的核心工作内容除了等待启停信号,只剩下”ngx_process_events_and_timers()”函数,它就是我们关注的事件驱动循环。

我们继续分析:


/* nginx/src/event/ngx_event.c */

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ...

    /* NGINX的惊群问题 */

    if (ngx_use_accept_mutex) {
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            if (ngx_accept_mutex_held) {
                flags |= NGX_POST_EVENTS;

            } else {
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    delta = ngx_current_msec;

    /* 调用epoll等事件模块的事件处理函数 */

    (void) ngx_process_events(cycle, timer, flags);

    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    /* accept队列仍有唤醒的事件,继续接收 */

    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    if (delta) {
        ngx_event_expire_timers();
    }

    /* 仍有延迟的其它事件,继续处理 */
    ngx_event_process_posted(cycle, &ngx_posted_events);
}

NGINX的惊群问题请参考下面的文章,本文不展开讨论:

“ngx_process_events()”是NGINX worker进程的核心工作,它负责驱动侦听结构相关的事件循环,是侦听循环的入口。

我们从源码定位它的真正调用位置:


/* nginx/src/event/ngx_event.h */

#define ngx_process_events   ngx_event_actions.process_events

/* nginx/src/modules/ngx_epoll_module.c */

static ngx_int_t
ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer)
{
    ...

    ngx_event_actions = ngx_epoll_module_ctx.actions;

    ...
}

“ngx_process_events()”在NGINX中被定义为”ngx_epoll_module_ctx.actions”,和上文的”actions.init”相似的,我们可以定位到”ngx_epoll_process_events()”函数:


/* nginx/src/modules/ngx_epoll_module.c */

static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    ...

    /* epoll_wait返回唤醒事件个数,event_list保存唤醒的事件 */

    events = epoll_wait(ep, event_list, (int) nevents, timer);

    ...

    for (i = 0; i < events; i++) {
        c = event_list[i].data.ptr;

        instance = (uintptr_t) c & 1;
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

        /* 取得唤醒事件对应的连接和读写事件 */

        rev = c->read;

        ...

        if ((revents & EPOLLIN) && rev->active) {

#if (NGX_HAVE_EPOLLRDHUP)
            if (revents & EPOLLRDHUP) {
                rev->pending_eof = 1;
            }
#endif

            rev->ready = 1;

            if (flags & NGX_POST_EVENTS) {
                queue = rev->accept ? &ngx_posted_accept_events
                                    : &ngx_posted_events;

                ngx_post_event(rev, queue);

            } else {

                /* 读事件的handler为ngx_event_accept() */
                rev->handler(rev);
            }
        }

        ...

        wev = c->write;

        if ((revents & EPOLLOUT) && wev->active) {

            if (c->fd == -1 || wev->instance != instance) {

                /*
                 * the stale event from a file descriptor
                 * that was just closed in this iteration
                 */

                ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                               "epoll: stale event %p", c);
                continue;
            }

            wev->ready = 1;
#if (NGX_THREADS)
            wev->complete = 1;
#endif

            if (flags & NGX_POST_EVENTS) {
                ngx_post_event(wev, &ngx_posted_events);

            } else {

            	/* 如果可写,则回调写事件的handler */
                wev->handler(wev);
            }
        }

        ...

    }

    return NGX_OK;
}

NGINX的事件驱动机制循环遍历epoll中的唤醒事件,如果有新的连接到来,则epoll_wait将返回唤醒事件的列表,每一个事件关联的连接对象中均可以得到”read/write”指代的读写事件。

我们从上面的分析中已经知道,读事件的回调函数是”ngx_event_accept()”:


/* nginx/src/event/ngx_event_accept.c */

void
ngx_event_accept(ngx_event_t *ev)
{
    ...

    ngx_event_t       *rev, *wev;
    ngx_listening_t   *ls;

    ...

    /* NGINX的事件、连接和侦听结构互相关联 */

    lc = ev->data;
    ls = lc->listening;

    ... 

    /* 系统调用accept()接受新的连接 */

    do {
        socklen = NGX_SOCKADDRLEN;

#if (NGX_HAVE_ACCEPT4)
        if (use_accept4) {
            s = accept4(lc->fd, (struct sockaddr *) sa, &socklen,
                        SOCK_NONBLOCK);
        } else {
            s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
        }
#else
        s = accept(lc->fd, (struct sockaddr *) sa, &socklen);
#endif

        ...

        /* NGINX的worker进程负载均衡策略 */
    
        ngx_accept_disabled = ngx_cycle->connection_n / 8
                                  - ngx_cycle->free_connection_n;
    
        /* 从连接池中取出一个空闲的连接来驱动数据循环,并不是直接使用入参ev关联的连接 */
    
        c = ngx_get_connection(s, ev->log);
    
        if (c == NULL) {
            if (ngx_close_socket(s) == -1) {
                ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                              ngx_close_socket_n " failed");
            }
    
            return;
        }
    
        c->type = SOCK_STREAM;
    
#if (NGX_STAT_STUB)
        (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
#endif

        c->pool = ngx_create_pool(ls->pool_size, ev->log);
        if (c->pool == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }
    
        c->sockaddr = ngx_palloc(c->pool, socklen);
        if (c->sockaddr == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }
    
        ngx_memcpy(c->sockaddr, sa, socklen);
    
        log = ngx_palloc(c->pool, sizeof(ngx_log_t));
        if (log == NULL) {
            ngx_close_accepted_connection(c);
            return;
        }

        ...

        *log = ls->log;
    
        c->recv = ngx_recv;
        c->send = ngx_send;
        c->recv_chain = ngx_recv_chain;
        c->send_chain = ngx_send_chain;
    
        c->log = log;
        c->pool->log = log;
    
        c->socklen = socklen;
        c->listening = ls;
        c->local_sockaddr = ls->sockaddr;
        c->local_socklen = ls->socklen;
    
        c->unexpected_eof = 1;

#if (NGX_HAVE_UNIX_DOMAIN)
        if (c->sockaddr->sa_family == AF_UNIX) {
            c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
            c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
#if (NGX_SOLARIS)
        /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
        c->sendfile = 0;
#endif
        }
#endif

        rev = c->read;
        wev = c->write;
    
        wev->ready = 1;
    
        if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
            rev->ready = 1;
        }
    
        if (ev->deferred_accept) {
            rev->ready = 1;
#if (NGX_HAVE_KQUEUE)
            rev->available = 1;
#endif
        }

            rev->log = log;
            wev->log = log;

        ...

        }

        ...

        /* 侦听结构的handler为ngx_http_init_connection() */

        ls->handler(c);

    } while (ev->available)
}

上面我们提过,NGINX的端口侦听已经完成了Socket的”socket()” => “bind()” => “listen()”,现在,我们终于等到了系统调用”accept()”,也就是说,此时,来自用户的连接经过TCP的三次握手后,已经可以在NGINX中以连接的方式存在。但是,这种连接是NGINX重新从空闲连接池中取到的,经过一系列填充后,我们终于又回到了最初的侦听结构的回调函数”ngx_http_init_connection()”。


/* nginx/src/http/ngx_http_request.c */

void
ngx_http_init_connection(ngx_connection_t *c)
{
    ...

    /* 将读事件的回调修改为ngx_http_wait_request_handler,如果是HTTP 2.0或者HTTPS,则修改为ngx_http_v2_init或者ngx_http_ssl_handshake */

    rev = c->read;
    rev->handler = ngx_http_wait_request_handler;
    c->write->handler = ngx_http_empty_handler;

#if (NGX_HTTP_V2)
    if (hc->addr_conf->http2) {
        rev->handler = ngx_http_v2_init;
    }
#endif

#if (NGX_HTTP_SSL)
    {
    ngx_http_ssl_srv_conf_t  *sscf;

    sscf = ngx_http_get_module_srv_conf(hc->conf_ctx, ngx_http_ssl_module);

    if (sscf->enable || hc->addr_conf->ssl) {

        c->log->action = "SSL handshaking";

        if (hc->addr_conf->ssl && sscf->ssl.ctx == NULL) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "no \"ssl_certificate\" is defined "
                          "in server listening on SSL port");
            ngx_http_close_connection(c);
            return;
        }

        hc->ssl = 1;

        rev->handler = ngx_http_ssl_handshake;
    }
    }
#endif

    ...

    if (rev->ready) {
        /* the deferred accept(), iocp */

        if (ngx_use_accept_mutex) {
            ngx_post_event(rev, &ngx_posted_events);
            return;
        }

        /* 此处将调用ngx_http_wait_request_handler、ngx_http_v2_init或者ngx_http_ssl_handshake */

        rev->handler(rev);
        return;
    }

    ngx_add_timer(rev, c->listening->post_accept_timeout);
    ngx_reusable_connection(c, 1);

    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_http_close_connection(c);
        return;
    }
}

此刻,我们可以清晰地认识到,从”ngx_http_init_connection()”函数开始,NGINX已经进入了真正的应用循环,NGINX作为HTTP SERVER,将调用”ngx_http_wait_request_handler”、”ngx_http_v2_init”或者”ngx_http_ssl_handshake”来处理HTTP、HTTP 2.0或者HTTPS请求。

依然阶段性总结一下:

我们在分析上述两个入口的过程中,其实也已经提到过相应的写事件,当响应生成时,NGINX将逆向处理,由相关的写事件完成发送操作。

NGINX在完成响应发送操作或者异常处理操作之后,会将使用过的连接释放回空闲连接池,重新回到worker进程的工作循环,继续侦听。

总结

NGINX的事件模型非常精妙,一环扣一环地完成侦听结构、连接池和事件的关联。

同时,通过事件模块的抽象,可以跨平台地使用select/poll/epoll/kqueue…来驱动事件。

而事件在NGINX worker进程的工作循环中又不断驱动着侦听循环和收发循环。

表示对NGINX的事件模型叹为观止,受益匪浅~~