Support nginx reload.

close #12.
For more details, see doc/F-Stack_Nginx_APP_Guide.md.
dev
logwang 2017-08-23 16:54:32 +08:00
parent 6adce16393
commit 406002113b
25 changed files with 1310 additions and 622 deletions

View File

@ -75,7 +75,9 @@ Currently, besides authorized DNS server of DNSPod, there are various products i
make
make install
cd ../..
./start.sh -b /usr/local/nginx_fstack/sbin/nginx -c config.ini
/usr/local/nginx_fstack/sbin/nginx
for more details, see [nginx guide](https://github.com/F-Stack/f-stack/blob/master/doc/F-Stack_Nginx_APP_Guide.md).
#### Redis

View File

@ -148,6 +148,9 @@ install: build $NGX_INSTALL_PERL_MODULES
|| cp conf/nginx.conf '\$(DESTDIR)$NGX_CONF_PATH'
cp conf/nginx.conf '\$(DESTDIR)$NGX_CONF_PREFIX/nginx.conf.default'
test -f '\$(DESTDIR)$NGX_CONF_PREFIX/f-stack.conf' \\
|| cp conf/f-stack.conf '\$(DESTDIR)$NGX_CONF_PREFIX'
test -d '\$(DESTDIR)`dirname "$NGX_PID_PATH"`' \\
|| mkdir -p '\$(DESTDIR)`dirname "$NGX_PID_PATH"`'

View File

@ -100,14 +100,14 @@ EVENT_SRCS="src/event/ngx_event.c \
SELECT_MODULE=ngx_select_module
SELECT_SRCS="src/event/modules/ngx_select_module.c src/event/modules/ngx_ff_module.c"
SELECT_SRCS=src/event/modules/ngx_select_module.c
WIN32_SELECT_SRCS=src/event/modules/ngx_win32_select_module.c
POLL_MODULE=ngx_poll_module
POLL_SRCS=src/event/modules/ngx_poll_module.c
KQUEUE_MODULE=ngx_kqueue_module
KQUEUE_SRCS="src/event/modules/ngx_kqueue_module.c src/event/modules/ngx_ff_module.c"
KQUEUE_MODULE="ngx_kqueue_module ngx_ff_channel_module"
KQUEUE_SRCS="src/event/modules/ngx_kqueue_module.c src/event/modules/ngx_ff_module.c src/event/modules/ngx_ff_channel.c"
DEVPOLL_MODULE=ngx_devpoll_module
DEVPOLL_SRCS=src/event/modules/ngx_devpoll_module.c

View File

@ -0,0 +1,76 @@
[dpdk]
## Hexadecimal bitmask of cores to run on.
lcore_mask=1
## Port mask, enable and disable ports.
## Default: all ports are enabled.
#port_mask=1
channel=4
## Number of ports.
nb_ports=1
promiscuous=1
numa_on=1
## TCP segment offload, default: disabled.
tso=0
## HW vlan strip, default: enabled.
vlan_strip=1
## Port config section
## According to dpdk.nb_ports: port0, port1...
[port0]
addr=192.168.1.2
netmask=255.255.255.0
broadcast=192.168.1.255
gateway=192.168.1.1
## Packet capture path, this will hurt performance
#pcap=./a.pcap
## Kni config: if enabled and method=reject,
## all packets that do not belong to the following tcp_port and udp_port
## will transmit to kernel; if method=accept, all packets that belong to
## the following tcp_port and udp_port will transmit to kernel.
#[kni]
#enable=1
#method=reject
#tcp_port=80,443
#udp_port=53
## FreeBSD network performance tuning configurations.
## Most native FreeBSD configurations are supported.
[freebsd.boot]
hz=100
## Block out a range of descriptors to avoid overlap
## with the kernel's descriptor space.
## You can increase this value according to your app.
fd_reserve=1024
kern.ipc.maxsockets=262144
net.inet.tcp.syncache.hashsize=4096
net.inet.tcp.syncache.bucketlimit=100
net.inet.tcp.tcbhashsize=65536
[freebsd.sysctl]
kern.ipc.somaxconn=32768
kern.ipc.maxsockbuf=16777216
net.inet.tcp.fast_finwait2_recycle=1
net.inet.tcp.sendspace=16384
net.inet.tcp.recvspace=8192
net.inet.tcp.nolocaltimewait=1
net.inet.tcp.cc.algorithm=htcp
net.inet.tcp.sendbuf_max=16777216
net.inet.tcp.recvbuf_max=16777216
net.inet.tcp.sendbuf_auto=1
net.inet.tcp.recvbuf_auto=1
net.inet.tcp.sendbuf_inc=16384
net.inet.tcp.recvbuf_inc=524288
net.inet.tcp.inflight.enable=0
net.inet.tcp.sack=1
net.inet.tcp.blackhole=1
net.inet.tcp.msl=2000
net.inet.tcp.delayed_ack=0
net.inet.udp.blackhole=1
net.inet.ip.redirect=0

View File

@ -1,18 +0,0 @@
[dpdk]
core=1
channel=4
process_num=1
[net]
mac=08:19:a6:25:c5:50
addr=112.90.143.29
netmask=255.255.255.128
broadcast=112.90.143.127
gateway=112.90.143.1
[log]
level=1
dir="/var/log"
[freebsd]
hz=100

View File

@ -1,15 +1,17 @@
#user nobody;
# root account is necessary.
user root;
# should be equal to the lcore count of `dpdk.lcore_mask` in f-stack.conf.
worker_processes 1;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
daemon off;
#pid logs/nginx.pid;
# path of f-stack configuration file, default: $NGX_PREFIX/conf/f-stack.conf.
fstack_conf f-stack.conf;
events {
worker_connections 102400;

View File

@ -31,7 +31,8 @@ static void ngx_unload_module(void *data);
#endif
#if (NGX_HAVE_FSTACK)
void ff_mod_init(int argc, char * const *argv);
static char *ngx_set_fstack_conf(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
#endif
static ngx_conf_enum_t ngx_debug_points[] = {
@ -148,6 +149,15 @@ static ngx_command_t ngx_core_commands[] = {
0,
NULL },
#if (NGX_HAVE_FSTACK)
{ ngx_string("fstack_conf"),
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
ngx_set_fstack_conf,
0,
offsetof(ngx_core_conf_t, fstack_conf),
NULL },
#endif
ngx_null_command
};
@ -197,22 +207,13 @@ main(int argc, char *const *argv)
ngx_conf_dump_t *cd;
ngx_core_conf_t *ccf;
#if (NGX_HAVE_FSTACK)
int ac = 1;
char *p = "nginx";
ff_mod_init(argc, argv);
#endif
ngx_debug_init();
if (ngx_strerror_init() != NGX_OK) {
return 1;
}
#if (NGX_HAVE_FSTACK)
if (ngx_get_options(ac, &p) != NGX_OK) {
#else
if (ngx_get_options(argc, argv) != NGX_OK) {
#endif
return 1;
}
@ -373,16 +374,12 @@ main(int argc, char *const *argv)
ngx_use_stderr = 0;
#if (NGX_HAVE_FSTACK)
ngx_single_process_cycle(cycle);
#else
if (ngx_process == NGX_PROCESS_SINGLE) {
ngx_single_process_cycle(cycle);
} else {
ngx_master_process_cycle(cycle);
}
#endif
return 0;
}
@ -1588,3 +1585,33 @@ ngx_unload_module(void *data)
}
#endif
#if (NGX_HAVE_FSTACK)
static
char *ngx_set_fstack_conf(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf)
{
char *p = conf;
ngx_str_t *field, *value;
ngx_str_t full;
field = (ngx_str_t *)(p + cmd->offset);
if (field->data) {
return "is duplicate";
}
value = cf->args->elts;
full = value[1];
if (ngx_conf_full_name(cf->cycle, &full, 1) != NGX_OK) {
return NGX_CONF_ERROR;
}
*field = full;
return NGX_CONF_OK;
}
#endif

View File

@ -609,9 +609,11 @@ ngx_init_cycle(ngx_cycle_t *old_cycle)
}
}
#if (!NGX_HAVE_FSTACK)
if (ngx_open_listening_sockets(cycle) != NGX_OK) {
goto failed;
}
#endif
if (!ngx_test_config) {
ngx_configure_listening_sockets(cycle);

View File

@ -113,6 +113,10 @@ typedef struct {
ngx_array_t env;
char **environment;
#if (NGX_HAVE_FSTACK)
ngx_str_t fstack_conf;
#endif
} ngx_core_conf_t;

View File

@ -0,0 +1,793 @@
/*
* Copyright (C) 2017 THL A29 Limited, a Tencent company.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_event.h>
#include <ngx_channel.h>
#include <ngx_cycle.h>
#include <pthread.h>
#if (NGX_HAVE_FSTACK)
static void * ngx_ff_channel_create_conf(ngx_cycle_t *cycle);
static char * ngx_ff_channel_init_conf(ngx_cycle_t *cycle,
void *conf);
static ngx_int_t ngx_ff_epoll_init(ngx_cycle_t *cycle);
static void ngx_ff_epoll_done(ngx_cycle_t *cycle);
static ngx_int_t ngx_ff_epoll_add_event(ngx_event_t *ev,
ngx_int_t event, ngx_uint_t flags);
static ngx_int_t ngx_ff_epoll_del_event(ngx_event_t *ev,
ngx_int_t event, ngx_uint_t flags);
static ngx_int_t ngx_ff_epoll_process_events(ngx_cycle_t *cycle,
ngx_msec_t timer, ngx_uint_t flags);
static ngx_int_t ngx_ff_create_connection(ngx_cycle_t *cycle);
static void ngx_ff_delete_connection();
static void ngx_ff_primary_channel_handler(ngx_event_t *ev);
static void ngx_ff_worker_channel_handler(ngx_event_t *ev);
static void *ngx_ff_channel_thread_main(void *args);
static ngx_int_t ngx_ff_add_channel_event(ngx_cycle_t *cycle,
ngx_fd_t fd, ngx_int_t event, ngx_event_handler_pt handler);
ngx_int_t ngx_ff_start_worker_channel(ngx_cycle_t *cycle,
ngx_fd_t fd, ngx_int_t event);
ngx_int_t ngx_ff_start_primary_channel(ngx_cycle_t *cycle,
ngx_fd_t fd, ngx_int_t event);
ngx_int_t ngx_ff_process_channel_events(ngx_cycle_t *cycle);
struct channel_thread_args {
ngx_cycle_t *cycle;
ngx_fd_t fd;
ngx_int_t event;
ngx_event_handler_pt handler;
};
static pthread_t channel_thread;
static int thread_quit;
static int ep = -1;
static struct epoll_event *event_list;
static ngx_uint_t nevents;
static ngx_connection_t *channel_connection;
typedef struct {
ngx_uint_t events;
} ngx_ff_channel_conf_t;
static ngx_command_t ngx_ff_channel_commands[] = {
ngx_null_command
};
ngx_core_module_t ngx_ff_channel_module_ctx = {
ngx_string("ff_channel"),
ngx_ff_channel_create_conf, /* create configuration */
ngx_ff_channel_init_conf, /* init configuration */
};
ngx_module_t ngx_ff_channel_module = {
NGX_MODULE_V1,
&ngx_ff_channel_module_ctx, /* module context */
ngx_ff_channel_commands, /* module directives */
NGX_CORE_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
ngx_ff_epoll_init, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
ngx_ff_epoll_done, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static void *
ngx_ff_channel_create_conf(ngx_cycle_t *cycle)
{
ngx_ff_channel_conf_t *cf;
cf = ngx_palloc(cycle->pool, sizeof(ngx_ff_channel_conf_t));
if (cf == NULL) {
return NULL;
}
cf->events = NGX_CONF_UNSET;
return cf;
}
static char *
ngx_ff_channel_init_conf(ngx_cycle_t *cycle, void *conf)
{
ngx_ff_channel_conf_t *cf = conf;
cf->events = 1;
return NGX_CONF_OK;
}
static ngx_int_t
ngx_ff_epoll_init(ngx_cycle_t *cycle)
{
if (ep == -1) {
ep = epoll_create(1);
if (ep == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"epoll_create() failed");
return NGX_ERROR;
}
}
if (event_list) {
ngx_free(event_list);
}
event_list = ngx_alloc(sizeof(struct epoll_event), cycle->log);
if (event_list == NULL) {
return NGX_ERROR;
}
nevents = 1;
channel_connection = NULL;
return NGX_OK;
}
static void
ngx_ff_epoll_done(ngx_cycle_t *cycle)
{
if (ep != -1) {
if (close(ep) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"epoll close() failed");
}
ep = -1;
}
if (event_list) {
ngx_free(event_list);
event_list = NULL;
}
nevents = 0;
if (channel_connection) {
ngx_ff_delete_connection();
}
}
static ngx_int_t
ngx_ff_epoll_add_event(ngx_event_t *ev, ngx_int_t event,
ngx_uint_t flags)
{
int op;
uint32_t events, prev;
ngx_event_t *e;
ngx_connection_t *c;
struct epoll_event ee;
c = ev->data;
events = (uint32_t) event;
if (event == NGX_READ_EVENT) {
e = c->write;
prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN|EPOLLRDHUP)
events = EPOLLIN|EPOLLRDHUP;
#endif
} else {
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
events = EPOLLOUT;
#endif
}
if (e->active) {
op = EPOLL_CTL_MOD;
events |= prev;
} else {
op = EPOLL_CTL_ADD;
}
ee.events = events | (uint32_t) flags;
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"epoll add event: fd:%d op:%d ev:%08XD",
c->fd, op, ee.events);
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}
ev->active = 1;
#if 0
ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif
return NGX_OK;
}
static ngx_int_t
ngx_ff_epoll_del_event(ngx_event_t *ev, ngx_int_t event,
ngx_uint_t flags)
{
int op;
uint32_t prev;
ngx_event_t *e;
ngx_connection_t *c;
struct epoll_event ee;
/*
* when the file descriptor is closed, the epoll automatically deletes
* it from its queue, so we do not need to delete explicitly the event
* before the closing the file descriptor
*/
if (flags & NGX_CLOSE_EVENT) {
ev->active = 0;
return NGX_OK;
}
c = ev->data;
if (event == NGX_READ_EVENT) {
e = c->write;
prev = EPOLLOUT;
} else {
e = c->read;
prev = EPOLLIN|EPOLLRDHUP;
}
if (e->active) {
op = EPOLL_CTL_MOD;
ee.events = prev | (uint32_t) flags;
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);
} else {
op = EPOLL_CTL_DEL;
ee.events = 0;
ee.data.ptr = NULL;
}
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"epoll del event: fd:%d op:%d ev:%08XD",
c->fd, op, ee.events);
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}
ev->active = 0;
return NGX_OK;
}
#if 0
static ngx_int_t
ngx_ff_epoll_add_connection(ngx_connection_t *c)
{
struct epoll_event ee;
ee.events = EPOLLIN|EPOLLOUT|EPOLLET|EPOLLRDHUP;
ee.data.ptr = (void *) ((uintptr_t) c | c->read->instance);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"epoll add connection: fd:%d ev:%08XD", c->fd, ee.events);
if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
"epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd);
return NGX_ERROR;
}
c->read->active = 1;
c->write->active = 1;
return NGX_OK;
}
static ngx_int_t
ngx_ff_epoll_del_connection(ngx_connection_t *c, ngx_uint_t flags)
{
int op;
struct epoll_event ee;
/*
* when the file descriptor is closed the epoll automatically deletes
* it from its queue so we do not need to delete explicitly the event
* before the closing the file descriptor
*/
if (flags & NGX_CLOSE_EVENT) {
c->read->active = 0;
c->write->active = 0;
return NGX_OK;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
"epoll del connection: fd:%d", c->fd);
op = EPOLL_CTL_DEL;
ee.events = 0;
ee.data.ptr = NULL;
if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}
c->read->active = 0;
c->write->active = 0;
return NGX_OK;
}
#endif
static ngx_int_t
ngx_ff_epoll_process_events(ngx_cycle_t *cycle,
ngx_msec_t timer, ngx_uint_t flags)
{
int events;
uint32_t revents;
ngx_int_t instance, i;
ngx_uint_t level;
ngx_err_t err;
ngx_event_t *rev, *wev;
ngx_connection_t *c;
/* NGX_TIMER_INFINITE == INFTIM */
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0,
"epoll timer: %M", timer);
events = epoll_wait(ep, event_list, (int) nevents, timer);
err = (events == -1) ? ngx_errno : 0;
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
ngx_time_update();
}
if (err) {
if (err == NGX_EINTR) {
level = NGX_LOG_INFO;
} else {
level = NGX_LOG_ALERT;
}
ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
return NGX_ERROR;
}
if (events == 0) {
if (timer != NGX_TIMER_INFINITE) {
return NGX_OK;
}
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"epoll_wait() returned no events without timeout");
return NGX_ERROR;
}
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;
instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
rev = c->read;
if (c->fd == -1 || rev->instance != instance) {
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}
revents = event_list[i].events;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: fd:%d ev:%04XD d:%p",
c->fd, revents, event_list[i].data.ptr);
if (revents & (EPOLLERR|EPOLLHUP)) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll_wait() error on fd:%d ev:%04XD",
c->fd, revents);
/*
* if the error events were returned, add EPOLLIN and EPOLLOUT
* to handle the events at least in one active handler
*/
revents |= EPOLLIN|EPOLLOUT;
}
if ((revents & EPOLLIN) && rev->active) {
rev->ready = 1;
rev->handler(rev);
}
wev = c->write;
if ((revents & EPOLLOUT) && wev->active) {
if (c->fd == -1 || wev->instance != instance) {
/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0,
"epoll: stale event %p", c);
continue;
}
wev->ready = 1;
#if (NGX_THREADS)
wev->complete = 1;
#endif
wev->handler(wev);
}
}
return NGX_OK;
}
static ngx_int_t
ngx_ff_create_connection(ngx_cycle_t *cycle)
{
ngx_event_t *rev, *wev;
ngx_connection_t *c;
c = ngx_calloc(sizeof(ngx_connection_t), cycle->log);
if (c == NULL) {
return NGX_ERROR;
}
rev = ngx_calloc(sizeof(ngx_event_t), cycle->log);
if (rev == NULL) {
ngx_free(c);
return NGX_ERROR;
}
rev->index = NGX_INVALID_INDEX;
rev->data = c;
rev->log = cycle->log;
wev = ngx_calloc(sizeof(ngx_event_t), cycle->log);
if (wev == NULL) {
ngx_free(c);
ngx_free(rev);
return NGX_ERROR;
}
wev->index = NGX_INVALID_INDEX;
wev->write = 1;
wev->data = c;
wev->log = cycle->log;
c->pool = cycle->pool;
c->data = NULL;
c->read = rev;
c->write = wev;
c->fd = (ngx_socket_t) -1;
c->log = cycle->log;
channel_connection = c;
return NGX_OK;
}
static void
ngx_ff_delete_connection()
{
ngx_connection_t *c = channel_connection;
if (c == NULL) {
return;
}
if (c->read) {
ngx_free(c->read);
}
if (c->write) {
ngx_free(c->write);
}
ngx_free(c);
channel_connection = NULL;
}
static ngx_int_t
ngx_ff_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd,
ngx_int_t event, ngx_event_handler_pt handler)
{
ngx_connection_t *c;
ngx_event_t *ev, *rev, *wev;
if (channel_connection != NULL) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"add channel event failed: already added");
return NGX_ERROR;
}
if (ngx_ff_create_connection(cycle) != NGX_OK) {
return NGX_ERROR;
}
c = channel_connection;
c->fd = fd;
rev = c->read;
wev = c->write;
rev->channel = 1;
wev->channel = 1;
ev = (event == NGX_READ_EVENT) ? rev : wev;
ev->handler = handler;
if (ngx_ff_epoll_add_event(ev, event, 0) == NGX_ERROR) {
return NGX_ERROR;
}
return NGX_OK;
}
static void
ngx_ff_worker_channel_handler(ngx_event_t *ev)
{
ngx_int_t n;
ngx_channel_t ch;
ngx_connection_t *c;
if (ev->timedout) {
ev->timedout = 0;
return;
}
c = ev->data;
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "worker channel handler");
for ( ;; ) {
n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "channel: %i", n);
if (n == NGX_ERROR) {
ngx_ff_epoll_del_event(c->read, NGX_READ_EVENT, NGX_CLOSE_EVENT);
close(c->fd);
ngx_ff_delete_connection();
thread_quit = 1;
return;
}
if (n == NGX_AGAIN) {
return;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
"channel command: %ui", ch.command);
switch (ch.command) {
case NGX_CMD_QUIT:
ngx_quit = 1;
thread_quit = 1;
break;
case NGX_CMD_TERMINATE:
ngx_terminate = 1;
thread_quit = 1;
break;
case NGX_CMD_REOPEN:
ngx_reopen = 1;
break;
case NGX_CMD_OPEN_CHANNEL:
ngx_log_debug3(NGX_LOG_DEBUG_CORE, ev->log, 0,
"get channel s:%i pid:%P fd:%d",
ch.slot, ch.pid, ch.fd);
ngx_processes[ch.slot].pid = ch.pid;
ngx_processes[ch.slot].channel[0] = ch.fd;
break;
case NGX_CMD_CLOSE_CHANNEL:
ngx_log_debug4(NGX_LOG_DEBUG_CORE, ev->log, 0,
"close channel s:%i pid:%P our:%P fd:%d",
ch.slot, ch.pid, ngx_processes[ch.slot].pid,
ngx_processes[ch.slot].channel[0]);
if (close(ngx_processes[ch.slot].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"close() channel failed");
}
ngx_processes[ch.slot].channel[0] = -1;
break;
}
}
}
static void
ngx_ff_primary_channel_handler(ngx_event_t *ev)
{
ngx_int_t n;
ngx_channel_t ch;
ngx_connection_t *c;
if (ev->timedout) {
ev->timedout = 0;
return;
}
c = ev->data;
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "primary channel handler");
for ( ;; ) {
n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "channel: %i", n);
if (n == NGX_ERROR) {
ngx_ff_epoll_del_event(c->read, NGX_READ_EVENT, NGX_CLOSE_EVENT);
close(c->fd);
ngx_ff_delete_connection();
return;
}
if (n == NGX_AGAIN) {
return;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
"channel command: %ui", ch.command);
switch (ch.command) {
case NGX_CMD_TERMINATE:
ngx_terminate = 1;
break;
case NGX_CMD_REOPEN:
ngx_reopen = 1;
break;
case NGX_CMD_OPEN_CHANNEL:
ngx_log_debug3(NGX_LOG_DEBUG_CORE, ev->log, 0,
"get channel s:%i pid:%P fd:%d",
ch.slot, ch.pid, ch.fd);
ngx_processes[ch.slot].pid = ch.pid;
ngx_processes[ch.slot].channel[0] = ch.fd;
break;
case NGX_CMD_CLOSE_CHANNEL:
ngx_log_debug4(NGX_LOG_DEBUG_CORE, ev->log, 0,
"close channel s:%i pid:%P our:%P fd:%d",
ch.slot, ch.pid, ngx_processes[ch.slot].pid,
ngx_processes[ch.slot].channel[0]);
if (close(ngx_processes[ch.slot].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"close() channel failed");
}
ngx_processes[ch.slot].channel[0] = -1;
break;
}
}
}
static void *
ngx_ff_channel_thread_main(void *args)
{
struct channel_thread_args *cta = args;
ngx_cycle_t *cycle = cta->cycle;
if (ngx_ff_add_channel_event(cycle, cta->fd, cta->event,
cta->handler) != NGX_OK) {
return NULL;
}
for (;;) {
ngx_ff_process_channel_events(cycle);
if (thread_quit) {
break;
}
}
ngx_free(cta);
return NULL;
}
ngx_int_t
ngx_ff_process_channel_events(ngx_cycle_t *cycle)
{
return ngx_ff_epoll_process_events(cycle, 500, NGX_UPDATE_TIME);
}
ngx_int_t
ngx_ff_start_worker_channel(ngx_cycle_t *cycle, ngx_fd_t fd,
ngx_int_t event)
{
int ret;
struct channel_thread_args *cta;
cta = ngx_alloc(sizeof(struct channel_thread_args), cycle->log);
if (cta == NULL) {
return NGX_ERROR;
}
cta->cycle = cycle;
cta->fd = fd;
cta->event = event;
cta->handler = ngx_ff_worker_channel_handler;
ret = pthread_create(&channel_thread, NULL,
ngx_ff_channel_thread_main, (void *)cta);
if (ret != 0) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"pthread_create() failed");
return NGX_ERROR;
}
pthread_detach(channel_thread);
return NGX_OK;
}
ngx_int_t
ngx_ff_start_primary_channel(ngx_cycle_t *cycle,
ngx_fd_t fd, ngx_int_t event)
{
return ngx_ff_add_channel_event(cycle, fd, event,
ngx_ff_primary_channel_handler);
}
#endif

View File

@ -71,10 +71,8 @@
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <sys/select.h>
#include "ff_api.h"
#include "ff_config.h"
#define _GNU_SOURCE
#define __USE_GNU
@ -85,6 +83,7 @@
#include <fcntl.h>
#include <sys/syscall.h>
#include <dlfcn.h>
#include <limits.h>
#ifndef likely
#define likely(x) __builtin_expect((x),1)
@ -99,6 +98,8 @@ static int (*real_socket)(int, int, int);
static int (*real_bind)(int, const struct sockaddr*, socklen_t);
static int (*real_connect)(int, const struct sockaddr*, socklen_t);
static int (*real_listen)(int, int);
static int (*real_getsockopt)(int, int, int, void *, socklen_t*);
static int (*real_setsockopt)(int, int, int, const void *, socklen_t);
static int (*real_accept)(int, struct sockaddr *, socklen_t *);
@ -126,14 +127,37 @@ static int inited;
})
void
ff_mod_init(int argc, char * const *argv) {
int rc;
// proc_type, 1: primary, 0: secondary.
int
ff_mod_init(const char *conf, int proc_id, int proc_type) {
int rc, i;
int ff_argc = 4;
rc = ff_init(argc, argv);
assert(0 == rc);
char **ff_argv = malloc(sizeof(char *)*ff_argc);
for (i = 0; i < ff_argc; i++) {
ff_argv[i] = malloc(sizeof(char)*PATH_MAX);
}
inited = 1;
sprintf(ff_argv[0], "nginx");
sprintf(ff_argv[1], "--conf=%s", conf);
sprintf(ff_argv[2], "--proc-id=%d", proc_id);
if (proc_type == 1) {
sprintf(ff_argv[3], "--proc-type=primary");
} else {
sprintf(ff_argv[3], "--proc-type=secondary");
}
rc = ff_init(ff_argc, ff_argv);
if (rc == 0)
inited = 1;
for (i = 0; i < ff_argc; i++) {
free(ff_argv[i]);
}
free(ff_argv);
return rc;
}
int
@ -250,6 +274,21 @@ listen(int sockfd, int backlog)
}
}
int
getsockopt(int sockfd, int level, int optname,
void *optval, socklen_t *optlen)
{
if (unlikely(inited == 0)) {
return SYSCALL(getsockopt)(sockfd, level, optname, optval, optlen);
}
if (ff_fdisused(sockfd)) {
return ff_getsockopt(sockfd, level, optname, optval, optlen);
} else {
return SYSCALL(getsockopt)(sockfd, level, optname, optval, optlen);
}
}
int
setsockopt (int sockfd, int level, int optname,
const void *optval, socklen_t optlen)

View File

@ -52,7 +52,7 @@ static struct kevent notify_kev;
#if (NGX_HAVE_FSTACK)
extern int kqueue(void);
extern int kevent(int kq, const struct kevent *changelist, int nchanges,
extern int kevent(int kq, const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents, const struct timespec *timeout);
#endif

View File

@ -22,11 +22,22 @@ static void ngx_master_process_exit(ngx_cycle_t *cycle);
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data);
static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker);
static void ngx_worker_process_exit(ngx_cycle_t *cycle);
#if (NGX_HAVE_FSTACK)
extern ngx_int_t ngx_ff_start_worker_channel(ngx_cycle_t *cycle,
ngx_fd_t fd, ngx_int_t event);
extern ngx_int_t ngx_ff_start_primary_channel(ngx_cycle_t *cycle,
ngx_fd_t fd, ngx_int_t event);
extern ngx_int_t ngx_ff_process_channel_events(ngx_cycle_t *cycle);
#else
static void ngx_channel_handler(ngx_event_t *ev);
#endif
static void ngx_cache_manager_process_cycle(ngx_cycle_t *cycle, void *data);
static void ngx_cache_manager_process_handler(ngx_event_t *ev);
static void ngx_cache_loader_process_handler(ngx_event_t *ev);
#if (NGX_HAVE_FSTACK)
extern int ff_mod_init(const char *conf, int proc_id, int proc_type);
#endif
ngx_uint_t ngx_process;
ngx_uint_t ngx_worker;
@ -68,6 +79,143 @@ static ngx_cycle_t ngx_exit_cycle;
static ngx_log_t ngx_exit_log;
static ngx_open_file_t ngx_exit_log_file;
#if (NGX_HAVE_FSTACK)
static void
ngx_ff_primary_process_exit(ngx_cycle_t *cycle)
{
ngx_uint_t i;
for (i = 0; cycle->modules[i]; i++) {
if (cycle->modules[i]->exit_process) {
cycle->modules[i]->exit_process(cycle);
}
}
/*
* Copy ngx_cycle->log related data to the special static exit cycle,
* log, and log file structures enough to allow a signal handler to log.
* The handler may be called when standard ngx_cycle->log allocated from
* ngx_cycle->pool is already destroyed.
*/
ngx_exit_log = *ngx_log_get_file_log(ngx_cycle->log);
ngx_exit_log_file.fd = ngx_exit_log.file->fd;
ngx_exit_log.file = &ngx_exit_log_file;
ngx_exit_log.next = NULL;
ngx_exit_log.writer = NULL;
ngx_exit_cycle.log = &ngx_exit_log;
ngx_exit_cycle.files = ngx_cycle->files;
ngx_exit_cycle.files_n = ngx_cycle->files_n;
ngx_cycle = &ngx_exit_cycle;
ngx_destroy_pool(cycle->pool);
// wait worker process exited.
ngx_msleep(500);
ngx_log_error(NGX_LOG_NOTICE, ngx_cycle->log, 0, "exit");
exit(0);
}
static void
ngx_ff_primary_process_cycle(ngx_cycle_t *cycle, void *data)
{
ngx_core_conf_t *ccf;
ngx_uint_t i;
ngx_int_t n;
ngx_process = NGX_PROCESS_WORKER;
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
ngx_core_module);
ngx_setproctitle("ff primary process");
if (ff_mod_init((const char *)ccf->fstack_conf.data, 0, 1)) {
exit(2);
}
for (i = 0; cycle->modules[i]; i++) {
if (cycle->modules[i]->init_process) {
if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) {
/* fatal */
exit(2);
}
}
}
for (n = 0; n < ngx_last_process; n++) {
if (ngx_processes[n].pid == -1) {
continue;
}
if (n == ngx_process_slot) {
continue;
}
if (ngx_processes[n].channel[1] == -1) {
continue;
}
if (close(ngx_processes[n].channel[1]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"close() channel failed");
}
}
if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"close() channel failed");
}
if (ngx_ff_start_primary_channel(cycle, ngx_channel, NGX_READ_EVENT)) {
exit(2);
}
for ( ;; ) {
if (ngx_terminate) {
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
ngx_ff_primary_process_exit(cycle);
}
if (ngx_reopen) {
ngx_reopen = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
ngx_reopen_files(cycle, -1);
}
(void) ngx_ff_process_channel_events(cycle);
}
}
static void
ngx_start_ff_primary_process(ngx_cycle_t *cycle)
{
ngx_channel_t ch;
ngx_memzero(&ch, sizeof(ngx_channel_t));
ch.command = NGX_CMD_OPEN_CHANNEL;
ngx_spawn_process(cycle, ngx_ff_primary_process_cycle,
NULL, "ff primary process",
NGX_PROCESS_RESPAWN);
ch.pid = ngx_processes[ngx_process_slot].pid;
ch.slot = ngx_process_slot;
ch.fd = ngx_processes[ngx_process_slot].channel[0];
ngx_pass_open_channel(cycle, &ch);
// wait for primary process startup.
ngx_sleep(1);
}
#endif
void
ngx_master_process_cycle(ngx_cycle_t *cycle)
@ -76,12 +224,18 @@ ngx_master_process_cycle(ngx_cycle_t *cycle)
u_char *p;
size_t size;
ngx_int_t i;
#if (NGX_HAVE_FSTACK)
ngx_uint_t sigio;
#else
ngx_uint_t n, sigio;
#endif
sigset_t set;
struct itimerval itv;
ngx_uint_t live;
ngx_msec_t delay;
#if (!NGX_HAVE_FSTACK)
ngx_listening_t *ls;
#endif
ngx_core_conf_t *ccf;
sigemptyset(&set);
@ -127,6 +281,16 @@ ngx_master_process_cycle(ngx_cycle_t *cycle)
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
#if (NGX_HAVE_FSTACK)
if (ccf->fstack_conf.len == 0) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"fstack_conf null");
exit(2);
}
ngx_start_ff_primary_process(cycle);
#endif
ngx_start_worker_processes(cycle, ccf->worker_processes,
NGX_PROCESS_RESPAWN);
ngx_start_cache_manager_processes(cycle, 0);
@ -204,6 +368,7 @@ ngx_master_process_cycle(ngx_cycle_t *cycle)
ngx_signal_worker_processes(cycle,
ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
#if (!NGX_HAVE_FSTACK)
ls = cycle->listening.elts;
for (n = 0; n < cycle->listening.nelts; n++) {
if (ngx_close_socket(ls[n].fd) == -1) {
@ -213,7 +378,7 @@ ngx_master_process_cycle(ngx_cycle_t *cycle)
}
}
cycle->listening.nelts = 0;
#endif
continue;
}
@ -283,50 +448,6 @@ ngx_master_process_cycle(ngx_cycle_t *cycle)
}
}
#if (NGX_HAVE_FSTACK)
#include "ff_api.h"
static int
ngx_single_process_cycle_loop(void *arg)
{
ngx_cycle_t *cycle = (ngx_cycle_t *)arg;
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");
ngx_process_events_and_timers(cycle);
if (ngx_terminate || ngx_quit) {
ngx_uint_t i;
for (i = 0; cycle->modules[i]; i++) {
if (cycle->modules[i]->exit_process) {
cycle->modules[i]->exit_process(cycle);
}
}
ngx_master_process_exit(cycle);
}
if (ngx_reconfigure) {
ngx_reconfigure = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring");
cycle = ngx_init_cycle(cycle);
if (cycle == NULL) {
cycle = (ngx_cycle_t *) ngx_cycle;
return 0;
}
ngx_cycle = cycle;
}
if (ngx_reopen) {
ngx_reopen = 0;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
ngx_reopen_files(cycle, (ngx_uid_t) -1);
}
return 0;
}
#endif
void
ngx_single_process_cycle(ngx_cycle_t *cycle)
{
@ -346,9 +467,6 @@ ngx_single_process_cycle(ngx_cycle_t *cycle)
}
}
#if (NGX_HAVE_FSTACK)
ff_run(ngx_single_process_cycle_loop, (void *)cycle);
#else
for ( ;; ) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");
@ -359,11 +477,11 @@ ngx_single_process_cycle(ngx_cycle_t *cycle)
for (i = 0; cycle->modules[i]; i++) {
if (cycle->modules[i]->exit_process) {
cycle->modules[i]->exit_process(cycle);
}
}
}
}
ngx_master_process_exit(cycle);
}
}
if (ngx_reconfigure) {
ngx_reconfigure = 0;