Make Mongoose webserver asynchronous.

This allows for asynchronous reply handling, which is much more logical when dealing with the async
nature of PhantomJS like page.open(). This uses a async patch from Andy Rahn's Mongoose close,
available here: http://code.google.com/r/andyrahn-async/

This is from https://github.com/ariya/phantomjs/pull/202.

http://code.google.com/p/phantomjs/issues/detail?id=326
1.5
Leo Franchi 2012-02-10 13:42:38 -05:00 committed by Ariya Hidayat
parent 7fa13832ef
commit bbce8920d0
7 changed files with 4409 additions and 4383 deletions

View File

@ -34,6 +34,7 @@ if (phantom.args.length !== 1) {
response.write("<html><head><title>YES!</title></head>");
// note: writeBody can be called multiple times
response.write("<body><p>pretty cool :)</body></html>");
response.close();
});
if (!listening) {
console.log("could not create web server listening on port " + port);

View File

@ -27,6 +27,7 @@ else
response.write "</pre>"
response.write "</body>"
response.write "</html>"
response.close
)
if service
console.log "Web server running on port " + port

View File

@ -29,6 +29,7 @@ if (phantom.args.length !== 1) {
response.write('</pre>');
response.write('</body>');
response.write('</html>');
response.close();
});
if (service) {

View File

@ -23,15 +23,8 @@
#else
#define _XOPEN_SOURCE 600 // For flockfile() on Linux
#define _LARGEFILE_SOURCE // Enable 64-bit file offsets
#define __STDC_FORMAT_MACROS // <inttypes.h> wants this for C++
#endif
#if defined(__SYMBIAN32__)
#define NO_SSL // SSL is not supported
#define NO_CGI // CGI is not supported
#define PATH_MAX FILENAME_MAX
#endif // __SYMBIAN32__
#ifndef _WIN32_WCE // Some ANSI #includes are not available on Windows CE
#include <sys/types.h>
#include <sys/stat.h>
@ -50,7 +43,7 @@
#include <stddef.h>
#include <stdio.h>
#if defined(_WIN32) && !defined(__SYMBIAN32__) // Windows specific
#if defined(_WIN32) // Windows specific #includes and #defines
#define _WIN32_WINNT 0x0400 // To make it link in VS2005
#include <windows.h>
@ -197,9 +190,7 @@ typedef struct DIR {
#endif
#define DIRSEP '/'
#define IS_DIRSEP_CHAR(c) ((c) == '/')
#ifndef O_BINARY
#define O_BINARY 0
#endif // O_BINARY
#define closesocket(a) close(a)
#define mg_fopen(x, y) fopen(x, y)
#define mg_mkdir(x, y) mkdir(x, y)
@ -215,7 +206,7 @@ typedef int SOCKET;
#include "mongoose.h"
#define MONGOOSE_VERSION "3.1"
#define MONGOOSE_VERSION "3.0"
#define PASSWORDS_FILE_NAME ".htpasswd"
#define CGI_ENVIRONMENT_SIZE 4096
#define MAX_CGI_ENVIR_VARS 64
@ -292,7 +283,7 @@ struct ssl_func {
#define SSL_connect (* (int (*)(SSL *)) ssl_sw[2].ptr)
#define SSL_read (* (int (*)(SSL *, void *, int)) ssl_sw[3].ptr)
#define SSL_write (* (int (*)(SSL *, const void *,int)) ssl_sw[4].ptr)
#define SSL_get_error (* (int (*)(SSL *, int)) ssl_sw[5].ptr)
#define SSL_get_error (* (int (*)(SSL *, int)) ssl_sw[5])
#define SSL_set_fd (* (int (*)(SSL *, SOCKET)) ssl_sw[6].ptr)
#define SSL_new (* (SSL * (*)(SSL_CTX *)) ssl_sw[7].ptr)
#define SSL_CTX_new (* (SSL_CTX * (*)(SSL_METHOD *)) ssl_sw[8].ptr)
@ -314,8 +305,8 @@ struct ssl_func {
(* (void (*)(void (*)(int, int, const char *, int))) crypto_sw[1].ptr)
#define CRYPTO_set_id_callback \
(* (void (*)(unsigned long (*)(void))) crypto_sw[2].ptr)
#define ERR_get_error (* (unsigned long (*)(void)) crypto_sw[3].ptr)
#define ERR_error_string (* (char * (*)(unsigned long,char *)) crypto_sw[4].ptr)
#define ERR_get_error (* (unsigned long (*)(void)) ssl_sw[3].ptr)
#define ERR_error_string (* (char * (*)(unsigned long, char *)) ssl_sw[4].ptr)
// set_ssl_option() function updates this array.
// It loads SSL library dynamically and changes NULLs to the actual addresses
@ -431,7 +422,7 @@ static const char *config_options[] = {
#define ENTRIES_PER_CONFIG_OPTION 3
struct mg_context {
volatile int stop_flag; // Should we stop event loop
int stop_flag; // Should we stop event loop
SSL_CTX *ssl_ctx; // SSL context
char *config[NUM_OPTIONS]; // Mongoose configuration parameters
mg_callback_t user_callback; // User-defined callback function
@ -439,13 +430,13 @@ struct mg_context {
struct socket *listening_sockets;
volatile int num_threads; // Number of threads
int num_threads; // Number of threads
pthread_mutex_t mutex; // Protects (max|num)_threads
pthread_cond_t cond; // Condvar for tracking workers terminations
struct socket queue[20]; // Accepted sockets
volatile int sq_head; // Head of the socket queue
volatile int sq_tail; // Tail of the socket queue
int sq_head; // Head of the socket queue
int sq_tail; // Tail of the socket queue
pthread_cond_t sq_full; // Singaled when socket is produced
pthread_cond_t sq_empty; // Signaled when socket is consumed
};
@ -464,6 +455,7 @@ struct mg_connection {
int buf_size; // Buffer size
int request_len; // Size of the request + headers in a buffer
int data_len; // Total size of data in a buffer
int is_async; // Flag which allows a worker thread to keep working
};
const char **mg_get_valid_option_names(void) {
@ -692,8 +684,7 @@ static char *skip_quoted(char **buf, const char *delimiters, const char *whitesp
return begin_word;
}
// Simplified version of skip_quoted without quote char
// and whitespace == delimiters
// Simplified version of skip_quoted without quote char and whitespace == delimiters
static char *skip(char **buf, const char *delimiters) {
return skip_quoted(buf, delimiters, delimiters, 0);
}
@ -744,7 +735,7 @@ static const char *next_option(const char *list, struct vec *val,
* so that val points to "x", and eq_val points to "y".
*/
eq_val->len = 0;
eq_val->ptr = (const char *) memchr(val->ptr, '=', val->len);
eq_val->ptr = memchr(val->ptr, '=', val->len);
if (eq_val->ptr != NULL) {
eq_val->ptr++; /* Skip over '=' character */
eq_val->len = val->ptr + val->len - eq_val->ptr;
@ -820,7 +811,7 @@ static void send_http_error(struct mg_connection *conn, int status,
}
}
#if defined(_WIN32) && !defined(__SYMBIAN32__)
#ifdef _WIN32
static int pthread_mutex_init(pthread_mutex_t *mutex, void *unused) {
unused = NULL;
*mutex = CreateMutex(NULL, FALSE, NULL);
@ -850,7 +841,7 @@ static int pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mutex) {
HANDLE handles[] = {cv->signal, cv->broadcast};
ReleaseMutex(*mutex);
WaitForMultipleObjects(2, handles, FALSE, INFINITE);
return WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0? 0 : -1;
return ReleaseMutex(*mutex) == 0 ? -1 : 0;
}
static int pthread_cond_signal(pthread_cond_t *cv) {
@ -860,16 +851,7 @@ static int pthread_cond_signal(pthread_cond_t *cv) {
static int pthread_cond_broadcast(pthread_cond_t *cv) {
// Implementation with PulseEvent() has race condition, see
// http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
// return PulseEvent(cv->broadcast) == 0 ? -1 : 0;
// [KirillJacobson] PulseEvent causes ms_stop() function to hang when the
// process runs in the VisualStudio debugger (observed on Windows XP SP3,
// VS 2008)
//
// MSDN states that PulseEvent() "function is unreliable and should
// not be used. It exists mainly for backward compatibility"
// http://msdn.microsoft.com/en-us/library/ms684914%28v=vs.85%29.aspx
return SetEvent(cv->broadcast) == 0 ? -1 : 0;
return PulseEvent(cv->broadcast) == 0 ? -1 : 0;
}
static int pthread_cond_destroy(pthread_cond_t *cv) {
@ -1377,8 +1359,7 @@ int mg_read(struct mg_connection *conn, void *buf, size_t len) {
int n, buffered_len, nread;
const char *buffered;
assert((conn->content_len == -1 && conn->consumed_content == 0) ||
conn->consumed_content <= conn->content_len);
assert(conn->content_len >= conn->consumed_content);
DEBUG_TRACE(("%p %zu %lld %lld", buf, len,
conn->content_len, conn->consumed_content));
nread = 0;
@ -1578,7 +1559,7 @@ static void convert_uri_to_file_name(struct mg_connection *conn,
match_len = get_document_root(conn, &vec);
mg_snprintf(conn, buf, buf_len, "%.*s%s", vec.len, vec.ptr, uri + match_len);
#if defined(_WIN32) && !defined(__SYMBIAN32__)
#ifdef _WIN32
change_slashes_to_backslashes(buf);
#endif /* _WIN32 */
@ -1612,8 +1593,7 @@ static struct mg_connection *mg_connect(struct mg_connection *conn,
cry(conn, "%s: connect(%s:%d): %s", __func__, host, port,
strerror(ERRNO));
closesocket(sock);
} else if ((newconn = (struct mg_connection *)
calloc(1, sizeof(*newconn))) == NULL) {
} else if ((newconn = calloc(1, sizeof(*newconn))) == NULL) {
cry(conn, "%s: calloc: %s", __func__, strerror(ERRNO));
closesocket(sock);
} else {
@ -1995,7 +1975,7 @@ static void MD5Final(unsigned char digest[16], MD5_CTX *ctx) {
MD5Transform(ctx->buf, (uint32_t *) ctx->in);
byteReverse((unsigned char *) ctx->buf, 4);
memcpy(digest, ctx->buf, 16);
memset((char *) ctx, 0, sizeof(*ctx));
memset((char *) ctx, 0, sizeof(ctx));
}
#endif // !HAVE_MD5
@ -2125,7 +2105,9 @@ static int parse_auth_header(struct mg_connection *conn, char *buf,
if (s[0] == ',') {
s++;
}
} else {
}
else
{
value = skip_quoted(&s, ", ", " ", 0); // IE uses commas, FF uses spaces
}
if (*name == '\0') {
@ -2245,14 +2227,16 @@ static int is_authorized_for_put(struct mg_connection *conn) {
return ret;
}
int mg_modify_passwords_file(const char *fname, const char *domain,
int mg_modify_passwords_file(struct mg_context *ctx, const char *fname,
const char *user, const char *pass) {
int found;
char line[512], u[512], d[512], ha1[33], tmp[PATH_MAX];
const char *domain;
FILE *fp, *fp2;
found = 0;
fp = fp2 = NULL;
domain = ctx->config[AUTHENTICATION_DOMAIN];
// Regard empty password as no password - remove user record.
if (pass[0] == '\0') {
@ -2268,9 +2252,10 @@ int mg_modify_passwords_file(const char *fname, const char *domain,
// Open the given file and temporary file
if ((fp = mg_fopen(fname, "r")) == NULL) {
cry(fc(ctx), "Cannot open %s: %s", fname, strerror(errno));
return 0;
} else if ((fp2 = mg_fopen(tmp, "w+")) == NULL) {
fclose(fp);
cry(fc(ctx), "Cannot open %s: %s", tmp, strerror(errno));
return 0;
}
@ -2432,7 +2417,6 @@ static void handle_directory_request(struct mg_connection *conn,
}
if (entries == NULL) {
closedir(dirp);
send_http_error(conn, 500, "Cannot open directory",
"%s", "Error: cannot allocate memory");
return;
@ -3342,7 +3326,7 @@ static int parse_port_string(const struct vec *vec, struct socket *so) {
static int set_ports_option(struct mg_context *ctx) {
const char *list = ctx->config[LISTENING_PORTS];
int on = 1, success = 1;
int reuseaddr = 1, success = 1;
SOCKET sock;
struct vec vec;
struct socket so, *listener;
@ -3359,26 +3343,16 @@ static int set_ports_option(struct mg_context *ctx) {
#if !defined(_WIN32)
// On Windows, SO_REUSEADDR is recommended only for
// broadcast UDP sockets
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on,
sizeof(on)) != 0 ||
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr,
sizeof(reuseaddr)) != 0 ||
#endif // !_WIN32
// Set TCP keep-alive. This is needed because if HTTP-level
// keep-alive is enabled, and client resets the connection,
// server won't get TCP FIN or RST and will keep the connection
// open forever. With TCP keep-alive, next keep-alive
// handshake will figure out that the client is down and
// will close the server end.
// Thanks to Igor Klopov who suggested the patch.
setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &on,
sizeof(on)) != 0 ||
bind(sock, &so.lsa.u.sa, so.lsa.len) != 0 ||
listen(sock, 20) != 0) {
closesocket(sock);
cry(fc(ctx), "%s: cannot bind to %.*s: %s", __func__,
vec.len, vec.ptr, strerror(ERRNO));
success = 0;
} else if ((listener = (struct socket *)
calloc(1, sizeof(*listener))) == NULL) {
} else if ((listener = calloc(1, sizeof(*listener))) == NULL) {
closesocket(sock);
cry(fc(ctx), "%s: %s", __func__, strerror(ERRNO));
success = 0;
@ -3582,7 +3556,6 @@ static int load_dll(struct mg_context *ctx, const char *dll_name,
// Dynamically load SSL library. Set up ctx->ssl_ctx pointer.
static int set_ssl_option(struct mg_context *ctx) {
struct mg_request_info request_info;
SSL_CTX *CTX;
int i, size;
const char *pem = ctx->config[SSL_CERTIFICATE];
@ -3606,10 +3579,7 @@ static int set_ssl_option(struct mg_context *ctx) {
if ((CTX = SSL_CTX_new(SSLv23_server_method())) == NULL) {
cry(fc(ctx), "SSL_CTX_new error: %s", ssl_error());
} else if (ctx->user_callback != NULL) {
memset(&request_info, 0, sizeof(request_info));
request_info.user_data = ctx->user_data;
ctx->user_callback(MG_INIT_SSL, (struct mg_connection *) CTX,
&request_info);
ctx->user_callback(MG_INIT_SSL, (struct mg_connection *) CTX, NULL);
}
if (CTX != NULL && SSL_CTX_use_certificate_file(CTX, pem,
@ -3726,24 +3696,40 @@ static void discard_current_request_from_buffer(struct mg_connection *conn) {
}
conn->data_len -= conn->request_len + body_len;
memmove(conn->buf, conn->buf + conn->request_len + body_len,
(size_t) conn->data_len);
memmove(conn->buf, conn->buf + conn->request_len + body_len, (size_t)conn->data_len);
}
void mg_detach(struct mg_connection *conn, int inIsAsync) {
conn->is_async = inIsAsync;
}
void mg_close_detached_connection(struct mg_connection *conn) {
assert (conn);
assert (conn->is_async);
discard_current_request_from_buffer(conn);
close_connection(conn);
free (conn);
}
static int parse_url(const char *url, char *host, int *port) {
int len;
if (sscanf(url, "%*[htps]://%1024[^:]:%d%n", host, port, &len) == 2 ||
sscanf(url, "%1024[^:]:%d%n", host, port, &len) == 2) {
} else if (sscanf(url, "%*[htps]://%1024[^/]%n", host, &len) == 1) {
*port = 80;
if (url == NULL) {
return 0;
};
if (!strncmp(url, "http://", 7)) {
url += 7;
}
if (sscanf(url, "%1024[^:]:%d/%n", host, port, &len) == 2) {
} else {
sscanf(url, "%1024[^/]%n", host, &len);
sscanf(url, "%1024[^/]/%n", host, &len);
*port = 80;
}
DEBUG_TRACE(("Host:%s, port:%d", host, *port));
return len;
return len > 0 && url[len - 1] == '/' ? len - 1 : len;
}
static void handle_proxy_request(struct mg_connection *conn) {
@ -3753,7 +3739,7 @@ static void handle_proxy_request(struct mg_connection *conn) {
DEBUG_TRACE(("URL: %s", ri->uri));
if (conn->request_info.uri[0] == '/' ||
(ri->uri == NULL || (len = parse_url(ri->uri, host, &port))) == 0) {
(len = parse_url(ri->uri, host, &port)) == 0) {
return;
}
@ -3849,6 +3835,9 @@ static void process_new_connection(struct mg_connection *conn) {
handle_request(conn);
}
log_access(conn);
if( conn->is_async )
return;
discard_current_request_from_buffer(conn);
}
// conn->peer is not NULL only for SSL-ed proxy connections
@ -3890,15 +3879,20 @@ static int consume_socket(struct mg_context *ctx, struct socket *sp) {
}
static void worker_thread(struct mg_context *ctx) {
struct mg_connection *conn;
struct mg_connection *conn = NULL;
int buf_size = atoi(ctx->config[MAX_REQUEST_SIZE]);
conn = (struct mg_connection *) calloc(1, sizeof(*conn) + buf_size);
while (ctx->stop_flag == 0) {
if( !conn ) {
conn = calloc(1, sizeof(*conn) + buf_size);
conn->buf_size = buf_size;
conn->buf = (char *) (conn + 1);
assert(conn != NULL);
}
if( ! consume_socket(ctx, &conn->client) )
break;
while (ctx->stop_flag == 0 && consume_socket(ctx, &conn->client)) {
conn->birth_time = time(NULL);
conn->ctx = ctx;
@ -3916,6 +3910,9 @@ static void worker_thread(struct mg_context *ctx) {
process_new_connection(conn);
}
if (conn->is_async)
conn = NULL;
else
close_connection(conn);
}
free(conn);
@ -4065,7 +4062,7 @@ void mg_stop(struct mg_context *ctx) {
}
free_context(ctx);
#if defined(_WIN32) && !defined(__SYMBIAN32__)
#if defined(_WIN32)
(void) WSACleanup();
#endif // _WIN32
}
@ -4076,14 +4073,14 @@ struct mg_context *mg_start(mg_callback_t user_callback, void *user_data,
const char *name, *value, *default_value;
int i;
#if defined(_WIN32) && !defined(__SYMBIAN32__)
#if defined(_WIN32)
WSADATA data;
WSAStartup(MAKEWORD(2,2), &data);
#endif // _WIN32
// Allocate context and initialize reasonable general case defaults.
// TODO(lsm): do proper error handling here.
ctx = (struct mg_context *) calloc(1, sizeof(*ctx));
ctx = calloc(1, sizeof(*ctx));
ctx->user_callback = user_callback;
ctx->user_data = user_data;
@ -4127,7 +4124,7 @@ struct mg_context *mg_start(mg_callback_t user_callback, void *user_data,
return NULL;
}
#if !defined(_WIN32) && !defined(__SYMBIAN32__)
#if !defined(_WIN32)
// Ignore SIGPIPE signal, so if browser cancels the request, it
// won't kill the whole process.
(void) signal(SIGPIPE, SIG_IGN);

View File

@ -21,8 +21,6 @@
#ifndef MONGOOSE_HEADER_INCLUDED
#define MONGOOSE_HEADER_INCLUDED
#include <stddef.h>
#ifdef __cplusplus
extern "C" {
#endif // __cplusplus
@ -56,7 +54,7 @@ enum mg_event {
MG_NEW_REQUEST, // New HTTP request has arrived from the client
MG_HTTP_ERROR, // HTTP error must be returned to the client
MG_EVENT_LOG, // Mongoose logs an event, request_info.log_message
MG_INIT_SSL // Mongoose initializes SSL. Instead of mg_connection *,
MG_INIT_SSL, // Mongoose initializes SSL. Instead of mg_connection *,
// SSL context is passed to the callback function.
};
@ -94,7 +92,7 @@ typedef void * (*mg_callback_t)(enum mg_event event,
// "listening_ports", "80,443s",
// NULL
// };
// struct mg_context *ctx = mg_start(&my_func, NULL, options);
// struct mg_context *ctx = mg_start(&my_func, options);
//
// Please refer to http://code.google.com/p/mongoose/wiki/MongooseManual
// for the list of valid option and their possible values.
@ -140,10 +138,8 @@ const char **mg_get_valid_option_names(void);
//
// Return:
// 1 on success, 0 on error.
int mg_modify_passwords_file(const char *passwords_file_name,
const char *domain,
const char *user,
const char *password);
int mg_modify_passwords_file(struct mg_context *ctx,
const char *passwords_file_name, const char *user, const char *password);
// Send data to the client.
int mg_write(struct mg_connection *, const void *buf, size_t len);
@ -162,6 +158,16 @@ int mg_printf(struct mg_connection *, const char *fmt, ...);
int mg_read(struct mg_connection *, void *buf, size_t len);
// Allow a user_callback to handle a request on its own thread, in its
// own time. Useful, for example, for doing long pull requests without
// needing a dedicated thread for each.
void mg_detach(struct mg_connection *, int);
// Release the resources associated with this connection.
void mg_close_detached_connection(struct mg_connection *conn);
// Get the value of particular HTTP header.
//
// This is a helper function. It traverses request_info->http_headers array,

View File

@ -30,7 +30,7 @@
#include "webserver.h"
#include "mongoose.h"
#include "mongoose/mongoose.h"
#include <QByteArray>
#include <QHostAddress>
@ -117,7 +117,7 @@ void WebServer::handleRequest(mg_event event, mg_connection *conn, const mg_requ
{
Q_ASSERT(QThread::currentThread() == thread());
if (event == MG_NEW_REQUEST) {
WebServerResponse responseObj(conn);
WebServerResponse* responseObj = new WebServerResponse(conn);
// Modelled after http://nodejs.org/docs/latest/api/http.html#http.ServerRequest
QVariantMap requestObject;
@ -155,7 +155,7 @@ void WebServer::handleRequest(mg_event event, mg_connection *conn, const mg_requ
}
requestObject["headers"] = headersObject;
emit newRequest(requestObject, &responseObj);
emit newRequest(requestObject, responseObj);
*handled = true;
return;
}
@ -166,13 +166,15 @@ void WebServer::handleRequest(mg_event event, mg_connection *conn, const mg_requ
//BEGIN WebServerResponse
WebServerResponse::WebServerResponse(mg_connection *conn)
: m_conn(conn)
: QObject()
, m_conn(conn)
, m_statusCode(200)
, m_headersSent(false)
{
mg_detach(m_conn, 1);
}
const char* responseCodeString(int code)
{
// see: http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
@ -289,6 +291,14 @@ void WebServerResponse::write(const QString &body)
mg_write(m_conn, data.constData(), data.size());
}
void WebServerResponse::close()
{
mg_close_detached_connection(m_conn);
deleteLater();
}
int WebServerResponse::statusCode() const
{
return m_statusCode;

View File

@ -110,6 +110,16 @@ public slots:
/// sends @p data to client and makes sure the headers are send beforehand
void write(const QString &data);
/**
* Closes the request once all writing if finished
* This MUST be called when done with a request!
*
* NOTE: After calling close(), this request object
* is no longer valid. Any further calls are
* undefined and may crash.
*/
void close();
/// get the currently set status code, 200 is the default
int statusCode() const;
/// set the status code to @p code