support keep-alive in the webserver

The recent patch that brought asynchronous webserver response handling
made it impossible to have proper keep-alive support in the server.

We want the server to support keep-alive though, which is especially
useful when writing a PhantomJS script that allows one to "remote control"
PhantomJS, using the WebServer API, without flooding the TCP connections.
Also the performance might be improved.

Note: This patch reverts commit bbce8920d0,
and resets the Mongoose code to the vanilla 3.0 version. Instead we now
support the async handling of HTTP requests using some QWaitCondition
magic.

Note: keep-alive support is optional, and disabled by default. To enable
it, use something like:

server.listen(port, {"keep-alive": true}, function(request, response) {...});

Like before, calling response.close() is crucial. Furthermore though, a
server that has keep-alive enabled *must* set a proper "Content-Length: ..."
header in it's response, otherwise clients will not be able to know when
the response has finished.

fix memory leaks in webserver

ISSUE: 416 (http://code.google.com/p/phantomjs/issues/detail?id=416)
1.5
Milian Wolff 2012-03-06 13:33:56 +01:00 committed by Ariya Hidayat
parent a398b3fec0
commit ac906391d4
5 changed files with 267 additions and 230 deletions

View File

@ -112,11 +112,15 @@ exports.create = function (opts) {
defineSetter("onNewRequest", "newRequest");
server.listen = function (port, handler) {
if (arguments.length === 2 && typeof handler === 'function') {
this.onNewRequest = handler;
//TODO: settings?
return this.listenOnPort(port);
server.listen = function (port, arg1, arg2) {
if (arguments.length === 2 && typeof arg1 === 'function') {
this.onNewRequest = arg1;
return this.listenOnPort(port, {});
}
if (arguments.length === 3 && typeof arg2 === 'function') {
this.onNewRequest = arg2;
// arg1 == settings
return this.listenOnPort(port, arg1);
}
throw "Wrong use of WebServer#listen";
};

View File

@ -23,8 +23,15 @@
#else
#define _XOPEN_SOURCE 600 // For flockfile() on Linux
#define _LARGEFILE_SOURCE // Enable 64-bit file offsets
#define __STDC_FORMAT_MACROS // <inttypes.h> wants this for C++
#endif
#if defined(__SYMBIAN32__)
#define NO_SSL // SSL is not supported
#define NO_CGI // CGI is not supported
#define PATH_MAX FILENAME_MAX
#endif // __SYMBIAN32__
#ifndef _WIN32_WCE // Some ANSI #includes are not available on Windows CE
#include <sys/types.h>
#include <sys/stat.h>
@ -43,7 +50,7 @@
#include <stddef.h>
#include <stdio.h>
#if defined(_WIN32) // Windows specific #includes and #defines
#if defined(_WIN32) && !defined(__SYMBIAN32__) // Windows specific
#define _WIN32_WINNT 0x0400 // To make it link in VS2005
#include <windows.h>
@ -190,7 +197,9 @@ typedef struct DIR {
#endif
#define DIRSEP '/'
#define IS_DIRSEP_CHAR(c) ((c) == '/')
#ifndef O_BINARY
#define O_BINARY 0
#endif // O_BINARY
#define closesocket(a) close(a)
#define mg_fopen(x, y) fopen(x, y)
#define mg_mkdir(x, y) mkdir(x, y)
@ -283,7 +292,7 @@ struct ssl_func {
#define SSL_connect (* (int (*)(SSL *)) ssl_sw[2].ptr)
#define SSL_read (* (int (*)(SSL *, void *, int)) ssl_sw[3].ptr)
#define SSL_write (* (int (*)(SSL *, const void *,int)) ssl_sw[4].ptr)
#define SSL_get_error (* (int (*)(SSL *, int)) ssl_sw[5])
#define SSL_get_error (* (int (*)(SSL *, int)) ssl_sw[5].ptr)
#define SSL_set_fd (* (int (*)(SSL *, SOCKET)) ssl_sw[6].ptr)
#define SSL_new (* (SSL * (*)(SSL_CTX *)) ssl_sw[7].ptr)
#define SSL_CTX_new (* (SSL_CTX * (*)(SSL_METHOD *)) ssl_sw[8].ptr)
@ -305,8 +314,8 @@ struct ssl_func {
(* (void (*)(void (*)(int, int, const char *, int))) crypto_sw[1].ptr)
#define CRYPTO_set_id_callback \
(* (void (*)(unsigned long (*)(void))) crypto_sw[2].ptr)
#define ERR_get_error (* (unsigned long (*)(void)) ssl_sw[3].ptr)
#define ERR_error_string (* (char * (*)(unsigned long, char *)) ssl_sw[4].ptr)
#define ERR_get_error (* (unsigned long (*)(void)) crypto_sw[3].ptr)
#define ERR_error_string (* (char * (*)(unsigned long,char *)) crypto_sw[4].ptr)
// set_ssl_option() function updates this array.
// It loads SSL library dynamically and changes NULLs to the actual addresses
@ -422,7 +431,7 @@ static const char *config_options[] = {
#define ENTRIES_PER_CONFIG_OPTION 3
struct mg_context {
int stop_flag; // Should we stop event loop
volatile int stop_flag; // Should we stop event loop
SSL_CTX *ssl_ctx; // SSL context
char *config[NUM_OPTIONS]; // Mongoose configuration parameters
mg_callback_t user_callback; // User-defined callback function
@ -430,13 +439,13 @@ struct mg_context {
struct socket *listening_sockets;
int num_threads; // Number of threads
volatile int num_threads; // Number of threads
pthread_mutex_t mutex; // Protects (max|num)_threads
pthread_cond_t cond; // Condvar for tracking workers terminations
struct socket queue[20]; // Accepted sockets
int sq_head; // Head of the socket queue
int sq_tail; // Tail of the socket queue
volatile int sq_head; // Head of the socket queue
volatile int sq_tail; // Tail of the socket queue
pthread_cond_t sq_full; // Singaled when socket is produced
pthread_cond_t sq_empty; // Signaled when socket is consumed
};
@ -455,7 +464,6 @@ struct mg_connection {
int buf_size; // Buffer size
int request_len; // Size of the request + headers in a buffer
int data_len; // Total size of data in a buffer
int is_async; // Flag which allows a worker thread to keep working
};
const char **mg_get_valid_option_names(void) {
@ -684,7 +692,8 @@ static char *skip_quoted(char **buf, const char *delimiters, const char *whitesp
return begin_word;
}
// Simplified version of skip_quoted without quote char and whitespace == delimiters
// Simplified version of skip_quoted without quote char
// and whitespace == delimiters
static char *skip(char **buf, const char *delimiters) {
return skip_quoted(buf, delimiters, delimiters, 0);
}
@ -735,7 +744,7 @@ static const char *next_option(const char *list, struct vec *val,
* so that val points to "x", and eq_val points to "y".
*/
eq_val->len = 0;
eq_val->ptr = memchr(val->ptr, '=', val->len);
eq_val->ptr = (const char *) memchr(val->ptr, '=', val->len);
if (eq_val->ptr != NULL) {
eq_val->ptr++; /* Skip over '=' character */
eq_val->len = val->ptr + val->len - eq_val->ptr;
@ -811,7 +820,7 @@ static void send_http_error(struct mg_connection *conn, int status,
}
}
#ifdef _WIN32
#if defined(_WIN32) && !defined(__SYMBIAN32__)
static int pthread_mutex_init(pthread_mutex_t *mutex, void *unused) {
unused = NULL;
*mutex = CreateMutex(NULL, FALSE, NULL);
@ -841,7 +850,7 @@ static int pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mutex) {
HANDLE handles[] = {cv->signal, cv->broadcast};
ReleaseMutex(*mutex);
WaitForMultipleObjects(2, handles, FALSE, INFINITE);
return ReleaseMutex(*mutex) == 0 ? -1 : 0;
return WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0? 0 : -1;
}
static int pthread_cond_signal(pthread_cond_t *cv) {
@ -1359,7 +1368,8 @@ int mg_read(struct mg_connection *conn, void *buf, size_t len) {
int n, buffered_len, nread;
const char *buffered;
assert(conn->content_len >= conn->consumed_content);
assert((conn->content_len == -1 && conn->consumed_content == 0) ||
conn->consumed_content <= conn->content_len);
DEBUG_TRACE(("%p %zu %lld %lld", buf, len,
conn->content_len, conn->consumed_content));
nread = 0;
@ -1559,7 +1569,7 @@ static void convert_uri_to_file_name(struct mg_connection *conn,
match_len = get_document_root(conn, &vec);
mg_snprintf(conn, buf, buf_len, "%.*s%s", vec.len, vec.ptr, uri + match_len);
#ifdef _WIN32
#if defined(_WIN32) && !defined(__SYMBIAN32__)
change_slashes_to_backslashes(buf);
#endif /* _WIN32 */
@ -1593,7 +1603,8 @@ static struct mg_connection *mg_connect(struct mg_connection *conn,
cry(conn, "%s: connect(%s:%d): %s", __func__, host, port,
strerror(ERRNO));
closesocket(sock);
} else if ((newconn = calloc(1, sizeof(*newconn))) == NULL) {
} else if ((newconn = (struct mg_connection *)
calloc(1, sizeof(*newconn))) == NULL) {
cry(conn, "%s: calloc: %s", __func__, strerror(ERRNO));
closesocket(sock);
} else {
@ -1975,7 +1986,7 @@ static void MD5Final(unsigned char digest[16], MD5_CTX *ctx) {
MD5Transform(ctx->buf, (uint32_t *) ctx->in);
byteReverse((unsigned char *) ctx->buf, 4);
memcpy(digest, ctx->buf, 16);
memset((char *) ctx, 0, sizeof(ctx));
memset((char *) ctx, 0, sizeof(*ctx));
}
#endif // !HAVE_MD5
@ -2105,9 +2116,7 @@ static int parse_auth_header(struct mg_connection *conn, char *buf,
if (s[0] == ',') {
s++;
}
}
else
{
} else {
value = skip_quoted(&s, ", ", " ", 0); // IE uses commas, FF uses spaces
}
if (*name == '\0') {
@ -2227,16 +2236,14 @@ static int is_authorized_for_put(struct mg_connection *conn) {
return ret;
}
int mg_modify_passwords_file(struct mg_context *ctx, const char *fname,
int mg_modify_passwords_file(const char *fname, const char *domain,
const char *user, const char *pass) {
int found;
char line[512], u[512], d[512], ha1[33], tmp[PATH_MAX];
const char *domain;
FILE *fp, *fp2;
found = 0;
fp = fp2 = NULL;
domain = ctx->config[AUTHENTICATION_DOMAIN];
// Regard empty password as no password - remove user record.
if (pass[0] == '\0') {
@ -2252,10 +2259,9 @@ int mg_modify_passwords_file(struct mg_context *ctx, const char *fname,
// Open the given file and temporary file
if ((fp = mg_fopen(fname, "r")) == NULL) {
cry(fc(ctx), "Cannot open %s: %s", fname, strerror(errno));
return 0;
} else if ((fp2 = mg_fopen(tmp, "w+")) == NULL) {
cry(fc(ctx), "Cannot open %s: %s", tmp, strerror(errno));
fclose(fp);
return 0;
}
@ -2417,6 +2423,7 @@ static void handle_directory_request(struct mg_connection *conn,
}
if (entries == NULL) {
closedir(dirp);
send_http_error(conn, 500, "Cannot open directory",
"%s", "Error: cannot allocate memory");
return;
@ -3352,7 +3359,8 @@ static int set_ports_option(struct mg_context *ctx) {
cry(fc(ctx), "%s: cannot bind to %.*s: %s", __func__,
vec.len, vec.ptr, strerror(ERRNO));
success = 0;
} else if ((listener = calloc(1, sizeof(*listener))) == NULL) {
} else if ((listener = (struct socket *)
calloc(1, sizeof(*listener))) == NULL) {
closesocket(sock);
cry(fc(ctx), "%s: %s", __func__, strerror(ERRNO));
success = 0;
@ -3556,6 +3564,7 @@ static int load_dll(struct mg_context *ctx, const char *dll_name,
// Dynamically load SSL library. Set up ctx->ssl_ctx pointer.
static int set_ssl_option(struct mg_context *ctx) {
struct mg_request_info request_info;
SSL_CTX *CTX;
int i, size;
const char *pem = ctx->config[SSL_CERTIFICATE];
@ -3579,7 +3588,10 @@ static int set_ssl_option(struct mg_context *ctx) {
if ((CTX = SSL_CTX_new(SSLv23_server_method())) == NULL) {
cry(fc(ctx), "SSL_CTX_new error: %s", ssl_error());
} else if (ctx->user_callback != NULL) {
ctx->user_callback(MG_INIT_SSL, (struct mg_connection *) CTX, NULL);
memset(&request_info, 0, sizeof(request_info));
request_info.user_data = ctx->user_data;
ctx->user_callback(MG_INIT_SSL, (struct mg_connection *) CTX,
&request_info);
}
if (CTX != NULL && SSL_CTX_use_certificate_file(CTX, pem,
@ -3696,40 +3708,24 @@ static void discard_current_request_from_buffer(struct mg_connection *conn) {
}
conn->data_len -= conn->request_len + body_len;
memmove(conn->buf, conn->buf + conn->request_len + body_len, (size_t)conn->data_len);
}
void mg_detach(struct mg_connection *conn, int inIsAsync) {
conn->is_async = inIsAsync;
}
void mg_close_detached_connection(struct mg_connection *conn) {
assert (conn);
assert (conn->is_async);
discard_current_request_from_buffer(conn);
close_connection(conn);
free (conn);
memmove(conn->buf, conn->buf + conn->request_len + body_len,
(size_t) conn->data_len);
}
static int parse_url(const char *url, char *host, int *port) {
int len;
if (url == NULL) {
return 0;
};
if (!strncmp(url, "http://", 7)) {
url += 7;
}
if (sscanf(url, "%1024[^:]:%d/%n", host, port, &len) == 2) {
if (sscanf(url, "%*[htps]://%1024[^:]:%d%n", host, port, &len) == 2 ||
sscanf(url, "%1024[^:]:%d%n", host, port, &len) == 2) {
} else if (sscanf(url, "%*[htps]://%1024[^/]%n", host, &len) == 1) {
*port = 80;
} else {
sscanf(url, "%1024[^/]/%n", host, &len);
sscanf(url, "%1024[^/]%n", host, &len);
*port = 80;
}
DEBUG_TRACE(("Host:%s, port:%d", host, *port));
return len > 0 && url[len - 1] == '/' ? len - 1 : len;
return len;
}
static void handle_proxy_request(struct mg_connection *conn) {
@ -3739,7 +3735,7 @@ static void handle_proxy_request(struct mg_connection *conn) {
DEBUG_TRACE(("URL: %s", ri->uri));
if (conn->request_info.uri[0] == '/' ||
(len = parse_url(ri->uri, host, &port)) == 0) {
(ri->uri == NULL || (len = parse_url(ri->uri, host, &port))) == 0) {
return;
}
@ -3750,7 +3746,7 @@ static void handle_proxy_request(struct mg_connection *conn) {
}
conn->peer->client.is_ssl = is_ssl;
}
// Forward client's request to the target
mg_printf(conn->peer, "%s %s HTTP/%s\r\n", ri->request_method, ri->uri + len,
ri->http_version);
@ -3835,9 +3831,6 @@ static void process_new_connection(struct mg_connection *conn) {
handle_request(conn);
}
log_access(conn);
if( conn->is_async )
return;
discard_current_request_from_buffer(conn);
}
// conn->peer is not NULL only for SSL-ed proxy connections
@ -3879,20 +3872,15 @@ static int consume_socket(struct mg_context *ctx, struct socket *sp) {
}
static void worker_thread(struct mg_context *ctx) {
struct mg_connection *conn = NULL;
struct mg_connection *conn;
int buf_size = atoi(ctx->config[MAX_REQUEST_SIZE]);
while (ctx->stop_flag == 0) {
if( !conn ) {
conn = calloc(1, sizeof(*conn) + buf_size);
conn->buf_size = buf_size;
conn->buf = (char *) (conn + 1);
assert(conn != NULL);
}
if( ! consume_socket(ctx, &conn->client) )
break;
conn = (struct mg_connection *) calloc(1, sizeof(*conn) + buf_size);
conn->buf_size = buf_size;
conn->buf = (char *) (conn + 1);
assert(conn != NULL);
while (ctx->stop_flag == 0 && consume_socket(ctx, &conn->client)) {
conn->birth_time = time(NULL);
conn->ctx = ctx;
@ -3910,10 +3898,7 @@ static void worker_thread(struct mg_context *ctx) {
process_new_connection(conn);
}
if (conn->is_async)
conn = NULL;
else
close_connection(conn);
close_connection(conn);
}
free(conn);
@ -4062,7 +4047,7 @@ void mg_stop(struct mg_context *ctx) {
}
free_context(ctx);
#if defined(_WIN32)
#if defined(_WIN32) && !defined(__SYMBIAN32__)
(void) WSACleanup();
#endif // _WIN32
}
@ -4073,14 +4058,14 @@ struct mg_context *mg_start(mg_callback_t user_callback, void *user_data,
const char *name, *value, *default_value;
int i;
#if defined(_WIN32)
#if defined(_WIN32) && !defined(__SYMBIAN32__)
WSADATA data;
WSAStartup(MAKEWORD(2,2), &data);
#endif // _WIN32
// Allocate context and initialize reasonable general case defaults.
// TODO(lsm): do proper error handling here.
ctx = calloc(1, sizeof(*ctx));
ctx = (struct mg_context *) calloc(1, sizeof(*ctx));
ctx->user_callback = user_callback;
ctx->user_data = user_data;
@ -4124,7 +4109,7 @@ struct mg_context *mg_start(mg_callback_t user_callback, void *user_data,
return NULL;
}
#if !defined(_WIN32)
#if !defined(_WIN32) && !defined(__SYMBIAN32__)
// Ignore SIGPIPE signal, so if browser cancels the request, it
// won't kill the whole process.
(void) signal(SIGPIPE, SIG_IGN);

View File

@ -54,7 +54,7 @@ enum mg_event {
MG_NEW_REQUEST, // New HTTP request has arrived from the client
MG_HTTP_ERROR, // HTTP error must be returned to the client
MG_EVENT_LOG, // Mongoose logs an event, request_info.log_message
MG_INIT_SSL, // Mongoose initializes SSL. Instead of mg_connection *,
MG_INIT_SSL // Mongoose initializes SSL. Instead of mg_connection *,
// SSL context is passed to the callback function.
};
@ -92,7 +92,7 @@ typedef void * (*mg_callback_t)(enum mg_event event,
// "listening_ports", "80,443s",
// NULL
// };
// struct mg_context *ctx = mg_start(&my_func, options);
// struct mg_context *ctx = mg_start(&my_func, NULL, options);
//
// Please refer to http://code.google.com/p/mongoose/wiki/MongooseManual
// for the list of valid option and their possible values.
@ -138,8 +138,10 @@ const char **mg_get_valid_option_names(void);
//
// Return:
// 1 on success, 0 on error.
int mg_modify_passwords_file(struct mg_context *ctx,
const char *passwords_file_name, const char *user, const char *password);
int mg_modify_passwords_file(const char *passwords_file_name,
const char *domain,
const char *user,
const char *password);
// Send data to the client.
int mg_write(struct mg_connection *, const void *buf, size_t len);
@ -158,16 +160,6 @@ int mg_printf(struct mg_connection *, const char *fmt, ...);
int mg_read(struct mg_connection *, void *buf, size_t len);
// Allow a user_callback to handle a request on its own thread, in its
// own time. Useful, for example, for doing long pull requests without
// needing a dedicated thread for each.
void mg_detach(struct mg_connection *, int);
// Release the resources associated with this connection.
void mg_close_detached_connection(struct mg_connection *conn);
// Get the value of particular HTTP header.
//
// This is a helper function. It traverses request_info->http_headers array,

View File

@ -37,80 +37,7 @@
#include <QMetaType>
#include <QThread>
#include <QUrl>
static void *callback(mg_event event,
mg_connection *conn,
const mg_request_info *request_info)
{
WebServer* server = static_cast<WebServer*>(request_info->user_data);
// note: we use a blocking queued connection to always handle the request in the main thread
// TODO: check whether direct call works as well
bool handled = false;
Qt::ConnectionType connectionType = Qt::DirectConnection;
if (QThread::currentThread() != server->thread()) {
connectionType = Qt::BlockingQueuedConnection;
}
QMetaObject::invokeMethod(server, "handleRequest", connectionType,
Q_ARG(mg_event, event), Q_ARG(mg_connection*, conn),
Q_ARG(const mg_request_info*, request_info),
Q_ARG(bool*, &handled));
if (handled) {
// anything non-null... pretty ugly, why not simply a bool??
return server;
} else {
return 0;
}
}
WebServer::WebServer(QObject *parent, Config *config)
: REPLCompletable(parent)
, m_config(config)
, m_ctx(0)
{
setObjectName("WebServer");
qRegisterMetaType<mg_event>("mg_event");
qRegisterMetaType<mg_connection*>("mg_connection*");
qRegisterMetaType<const mg_request_info*>("const mg_request_info*");
qRegisterMetaType<bool*>("bool*");
}
WebServer::~WebServer()
{
close();
}
bool WebServer::listenOnPort(const QString& port)
{
///TODO: listen on multiple ports?
close();
const char *options[] = {
"listening_ports", qstrdup(qPrintable(port)),
"enable_directory_listing", "no",
NULL};
///TODO: more options from m_config?
m_ctx = mg_start(&callback, this, options);
if (!m_ctx) {
return false;
}
m_port = port;
return true;
}
QString WebServer::port() const
{
return m_port;
}
void WebServer::close()
{
if (m_ctx) {
mg_stop(m_ctx);
m_ctx = 0;
m_port.clear();
}
}
#include <QVector>
namespace UrlEncodedParser {
@ -149,71 +76,192 @@ QVariantMap parse(const QByteArray &data) {
}
void WebServer::handleRequest(mg_event event, mg_connection *conn, const mg_request_info *request,
bool *handled)
static void *callback(mg_event event,
mg_connection *conn,
const mg_request_info *request)
{
Q_ASSERT(QThread::currentThread() == thread());
if (event == MG_NEW_REQUEST) {
WebServerResponse* responseObj = new WebServerResponse(conn);
WebServer* server = static_cast<WebServer*>(request->user_data);
if (server->handleRequest(event, conn, request)) {
// anything non-null... pretty ugly, why not simply a bool??
return server;
} else {
return 0;
}
}
// Modelled after http://nodejs.org/docs/latest/api/http.html#http.ServerRequest
QVariantMap requestObject;
WebServer::WebServer(QObject *parent, Config *config)
: REPLCompletable(parent)
, m_config(config)
, m_ctx(0)
{
setObjectName("WebServer");
qRegisterMetaType<WebServerResponse*>("WebServerResponse*");
}
///TODO: encoding?!
WebServer::~WebServer()
{
close();
}
if (request->request_method)
requestObject["method"] = QString::fromLocal8Bit(request->request_method);
if (request->http_version)
requestObject["httpVersion"] = QString::fromLocal8Bit(request->http_version);
if (request->status_code >=0)
requestObject["statusCode"] = request->status_code;
bool WebServer::listenOnPort(const QString& port, const QVariantMap& opts)
{
///TODO: listen on multiple ports?
close();
QByteArray uri(request->uri);
if (uri.startsWith('/'))
uri = '/' + QUrl::toPercentEncoding(QString::fromLatin1(request->uri + 1));
if (request->query_string)
uri.append('?').append(QByteArray(request->query_string));
requestObject["url"] = uri.data();
QVector<const char*> options;
options << "listening_ports" << qstrdup(qPrintable(port));
options << "enable_directory_listing" << "no";
if (opts.value("keep-alive", false).toBool()) {
options << "enable_keep_alive" << "yes";
}
options << NULL;
///TODO: more options from m_config?
m_ctx = mg_start(&callback, this, options.data());
if (!m_ctx) {
return false;
}
#if 0
// Non-standard and thus disable for the time being.
requestObject["isSSL"] = request->is_ssl;
requestObject["remoteIP"] = QHostAddress(request->remote_ip).toString();;
requestObject["remotePort"] = request->remote_port;
if (request->remote_user)
requestObject["remoteUser"] = QString::fromLocal8Bit(request->remote_user);
#endif
m_port = port;
return true;
}
QVariantMap headersObject;
for (int i = 0; i < request->num_headers; ++i) {
QString key = QString::fromLocal8Bit(request->http_headers[i].name);
QString value = QString::fromLocal8Bit(request->http_headers[i].value);
headersObject[key] = value;
}
requestObject["headers"] = headersObject;
QString WebServer::port() const
{
return m_port;
}
if ((requestObject["method"] == "POST" || requestObject["method"] == "PUT")
&& headersObject.contains("Content-Length"))
void WebServer::close()
{
if (m_ctx) {
m_closing = 1;
{
bool ok = false;
uint contentLength = headersObject["Content-Length"].toUInt(&ok);
if (ok) {
contentLength += 1; // allow \0 at end
char * data = new char[contentLength];
int read = mg_read(conn, data, contentLength);
QByteArray rawData(data, read);
requestObject["rawData"] = rawData;
if (headersObject["Content-Type"] == "application/x-www-form-urlencoded") {
requestObject["post"] = UrlEncodedParser::parse(rawData);
}
// make sure we wake up all pending responses, such that mg_stop()
// can be called without deadlocking
QMutexLocker lock(&m_mutex);
QHash< WebServerResponse*, QWaitCondition* >::iterator it = m_pendingResponses.begin();
while(it != m_pendingResponses.end()) {
it.value()->wakeAll();
it = m_pendingResponses.erase(it);
}
}
mg_stop(m_ctx);
m_ctx = 0;
m_port.clear();
}
}
emit newRequest(requestObject, responseObj);
*handled = true;
bool WebServer::handleRequest(mg_event event, mg_connection *conn, const mg_request_info *request)
{
if (event != MG_NEW_REQUEST) {
return false;
}
if (m_closing) {
return false;
}
// Modelled after http://nodejs.org/docs/latest/api/http.html#http.ServerRequest
QVariantMap requestObject;
///TODO: encoding?!
if (request->request_method)
requestObject["method"] = QString::fromLocal8Bit(request->request_method);
if (request->http_version)
requestObject["httpVersion"] = QString::fromLocal8Bit(request->http_version);
if (request->status_code >=0)
requestObject["statusCode"] = request->status_code;
QByteArray uri(request->uri);
if (uri.startsWith('/'))
uri = '/' + QUrl::toPercentEncoding(QString::fromLatin1(request->uri + 1));
if (request->query_string)
uri.append('?').append(QByteArray(request->query_string));
requestObject["url"] = uri.data();
#if 0
// Non-standard and thus disable for the time being.
requestObject["isSSL"] = request->is_ssl;
requestObject["remoteIP"] = QHostAddress(request->remote_ip).toString();;
requestObject["remotePort"] = request->remote_port;
if (request->remote_user)
requestObject["remoteUser"] = QString::fromLocal8Bit(request->remote_user);
#endif
QVariantMap headersObject;
for (int i = 0; i < request->num_headers; ++i) {
QString key = QString::fromLocal8Bit(request->http_headers[i].name);
QString value = QString::fromLocal8Bit(request->http_headers[i].value);
headersObject[key] = value;
}
requestObject["headers"] = headersObject;
if ((requestObject["method"] == "POST" || requestObject["method"] == "PUT")
&& headersObject.contains("Content-Length"))
{
bool ok = false;
uint contentLength = headersObject["Content-Length"].toUInt(&ok);
if (ok) {
contentLength += 1; // allow \0 at end
char * data = new char[contentLength];
int read = mg_read(conn, data, contentLength);
QByteArray rawData(data, read);
requestObject["rawData"] = rawData;
if (headersObject["Content-Type"] == "application/x-www-form-urlencoded") {
requestObject["post"] = UrlEncodedParser::parse(rawData);
}
delete[] data;
}
}
WebServerResponse responseObject(conn);
responseObject.moveToThread(thread());
connect(&responseObject, SIGNAL(closing(WebServerResponse*)),
this, SLOT(responseClosed(WebServerResponse*)),
Qt::DirectConnection);
// emit signal that is catched by the PhantomJS callback
// then we wait until response.close() was called from
// the PhantomJS script. This is achieved by waiting
// on a condition that is woken up in WebServer::close()
// and WebServer::aboutToClose(WebServerResponse*)
QMutex m;
m.lock();
QWaitCondition wait;
{
if (m_closing) {
return false;
}
QMutexLocker lock(&m_mutex);
if (m_closing) {
return false;
}
m_pendingResponses[&responseObject] = &wait;
}
emit newRequest(requestObject, &responseObject);
wait.wait(&m);
return true;
}
void WebServer::responseClosed(WebServerResponse *response)
{
// wake up the condition that waits on this response
// if we are shutting down though, we can ignore this
// event and return early
if (m_closing) {
return;
}
*handled = false;
QMutexLocker lock(&m_mutex);
if (m_closing) {
return;
}
QHash< WebServerResponse*, QWaitCondition* >::iterator it = m_pendingResponses.find(response);
if (it == m_pendingResponses.end()) {
return;
}
it.value()->wakeAll();
m_pendingResponses.erase(it);
}
void WebServer::initCompletions()
@ -237,7 +285,6 @@ WebServerResponse::WebServerResponse(mg_connection *conn)
, m_statusCode(200)
, m_headersSent(false)
{
mg_detach(m_conn, 1);
}
@ -357,14 +404,11 @@ void WebServerResponse::write(const QString &body)
mg_write(m_conn, data.constData(), data.size());
}
void WebServerResponse::close()
{
mg_close_detached_connection(m_conn);
deleteLater();
emit closing(this);
}
int WebServerResponse::statusCode() const
{
return m_statusCode;

View File

@ -32,6 +32,8 @@
#define WEBSERVER_H
#include <QVariantMap>
#include <QWaitCondition>
#include <QMutex>
///TODO: is this ok, or should it be put into .cpp
/// can be done by introducing a WebServerPrivate *d;
@ -67,7 +69,7 @@ public slots:
*
* WARNING: must not be the same name as in the javascript api...
*/
bool listenOnPort(const QString &port);
bool listenOnPort(const QString &port, const QVariantMap& options);
/**
* @return the port this server is listening on
* or an empty string if the server is closed.
@ -81,17 +83,22 @@ signals:
/// @p request is a WebServerRequest, @p response is a WebServerResponse
void newRequest(QVariant request, QObject *response);
private slots:
void handleRequest(mg_event event, mg_connection* conn, const mg_request_info* request,
bool* handled);
public:
bool handleRequest(mg_event event, mg_connection *conn, const mg_request_info *request);
private:
virtual void initCompletions();
private slots:
void responseClosed(WebServerResponse*);
private:
Config *m_config;
mg_context *m_ctx;
QString m_port;
QMutex m_mutex;
QHash<WebServerResponse*, QWaitCondition*> m_pendingResponses;
QAtomicInt m_closing;
};
@ -114,8 +121,10 @@ public slots:
void write(const QString &data);
/**
* Closes the request once all writing if finished
* This MUST be called when done with a request!
* Closes the request once all data has been written to the client.
*
* NOTE: This MUST be called, otherwise the server will
* not allow new connections anymore.
*
* NOTE: After calling close(), this request object
* is no longer valid. Any further calls are
@ -138,6 +147,9 @@ public slots:
/// set all headers
void setHeaders(const QVariantMap &headers);
signals:
void closing(WebServerResponse *response);
private:
virtual void initCompletions();