- Introducing the long-awaited support for multiple listeners

in the same mTCP context.
	- Removed (the currently unused) STATIC_TABLE (hashtable)
	- Dynamic hash table is the default

- Listeners are stored in hash table, but the hash function is simply {nbo_port_no & (NUM_BINS - 1)}
master
Asim Jamshed 2016-11-11 00:27:34 +09:00
parent c362aa6851
commit f0be7628a8
13 changed files with 2744 additions and 1423 deletions

View File

@ -1,5 +1,5 @@
<!-- Creator : groff version 1.22.2 -->
<!-- CreationDate: Wed May 18 01:43:08 2016 -->
<!-- CreationDate: Fri Nov 4 00:43:16 2016 -->
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
"http://www.w3.org/TR/html4/loose.dtd">
<html>
@ -94,24 +94,35 @@ set appropriately.</p>
cellspacing="0" cellpadding="0">
<tr valign="top" align="left">
<td width="11%"></td>
<td width="12%">
<td width="15%">
<p style="margin-top: 1em"><b>EBADF</b></p></td>
<td width="3%"></td>
<td width="74%">
<p style="margin-top: 1em"><b>EADDRINUSE</b></p></td>
<td width="8%"></td>
<td width="66%">
<p style="margin-top: 1em"><i>sockid</i> is not a valid
socket descriptor.</p></td></tr>
<p style="margin-top: 1em">Another socket is already
listening on the same port.</p></td></tr>
<tr valign="top" align="left">
<td width="11%"></td>
<td width="12%">
<td width="15%">
<p><b>EBADF</b></p></td>
<td width="8%"></td>
<td width="66%">
<p><i>sockid</i> is not a valid socket descriptor.</p></td></tr>
<tr valign="top" align="left">
<td width="11%"></td>
<td width="15%">
<p><b>EINVAL</b></p></td>
<td width="3%"></td>
<td width="74%">
<td width="8%"></td>
<td width="66%">
<p>The <i>backlog</i> argument is either &lt;= 0 or exceeds
@ -120,24 +131,24 @@ startup configuration file (see <b>mtcp_init()</b> for
details).</p> </td></tr>
<tr valign="top" align="left">
<td width="11%"></td>
<td width="12%">
<td width="15%">
<p><b>ENOMEM</b></p></td>
<td width="3%"></td>
<td width="74%">
<td width="8%"></td>
<td width="66%">
<p>There is not enough available memory to create the
listening <i>backlog</i> queue.</p></td></tr>
<tr valign="top" align="left">
<td width="11%"></td>
<td width="12%">
<td width="15%">
<p><b>ENOTSOCK</b></p></td>
<td width="3%"></td>
<td width="74%">
<td width="8%"></td>
<td width="66%">
<p>The socket referred to by <i>sockid</i> is not

View File

@ -38,12 +38,17 @@ of failure,
is set appropriately.
.\"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
.SH ERRORS
.TP 10
.TP 15
.B "EADDRINUSE"
Another socket is already listening on the same port.
.TP 15
.B "EBADF"
.I "sockid"
is not a valid socket descriptor.
.TP 10
.TP 15
.B "EINVAL"
The
.I "backlog"
@ -52,13 +57,13 @@ limit as set by the mTCP startup configuration file (see
.BR mtcp_init()
for details).
.TP 10
.TP 15
.B "ENOMEM"
There is not enough available memory to create the listening
.I "backlog"
queue.
.TP 10
.TP 15
.B "ENOTSOCK"
The socket referred to by
.I "sockid"

View File

@ -4,6 +4,7 @@
#include "addr_pool.h"
#include "rss.h"
#include "debug.h"
#include "config.h"
#define MIN_PORT (1025)
#define MAX_PORT (65535 + 1)
@ -115,8 +116,6 @@ CreateAddressPoolPerCore(int core, int num_queues,
uint32_t saddr_h, daddr_h;
uint16_t sport_h, dport_h;
int rss_core;
uint8_t endian_check = (current_iomodule_func == &dpdk_module_func) ?
0 : 1;
ap = (addr_pool_t)calloc(1, sizeof(struct addr_pool));
if (!ap)
@ -164,7 +163,7 @@ CreateAddressPoolPerCore(int core, int num_queues,
break;
sport_h = j;
rss_core = GetRSSCPUCore(daddr_h, saddr_h, dport_h, sport_h, num_queues, endian_check);
rss_core = GetRSSCPUCore(daddr_h, saddr_h, dport_h, sport_h, num_queues);
if (rss_core != core)
continue;
@ -180,10 +179,10 @@ CreateAddressPoolPerCore(int core, int num_queues,
ap->num_free = cnt;
ap->num_used = 0;
//fprintf(stderr, "CPU %d: Created %d address entries.\n", core, cnt);
if (ap->num_entry < CONFIG.max_concurrency) {
if (ap->num_entry < g_config.mos->max_concurrency) {
fprintf(stderr, "[WARINING] Available # addresses (%d) is smaller than"
" the max concurrency (%d).\n",
ap->num_entry, CONFIG.max_concurrency);
ap->num_entry, g_config.mos->max_concurrency);
}
pthread_mutex_unlock(&ap->lock);
@ -219,8 +218,6 @@ FetchAddress(addr_pool_t ap, int core, int num_queues,
struct addr_entry *walk, *next;
int rss_core;
int ret = -1;
uint8_t endian_check = (current_iomodule_func == &dpdk_module_func) ?
0 : 1;
if (!ap || !daddr || !saddr)
return -1;
@ -244,8 +241,8 @@ FetchAddress(addr_pool_t ap, int core, int num_queues,
}
rss_core = GetRSSCPUCore(ntohl(walk->addr.sin_addr.s_addr),
ntohl(daddr->sin_addr.s_addr), ntohs(walk->addr.sin_port),
ntohs(daddr->sin_port), num_queues, endian_check);
ntohl(daddr->sin_addr.s_addr), ntohs(walk->addr.sin_port),
ntohs(daddr->sin_port), num_queues);
if (core == rss_core)
break;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -10,11 +10,93 @@
#include "debug.h"
#include "fhash.h"
//#include "stdint.h" /* Replace with <stdint.h> if appropriate */
#undef get16bits
#if (defined(__GNUC__) && defined(__i386__)) || defined(__WATCOMC__) \
|| defined(_MSC_VER) || defined (__BORLANDC__) || defined (__TURBOC__)
#define get16bits(d) (*((const uint16_t *) (d)))
#endif
#if !defined (get16bits)
#define get16bits(d) ((((uint32_t)(((const uint8_t *)(d))[1])) << 8)\
+(uint32_t)(((const uint8_t *)(d))[0]) )
#endif
inline uint32_t
SuperFastHash (const char * data, int len) {
register uint32_t hash = len, tmp;
int rem;
if (len <= 0 || data == NULL) return 0;
rem = len & 3;
len >>= 2;
/* Main loop */
for (;len > 0; len--) {
hash += get16bits (data);
tmp = (get16bits (data+2) << 11) ^ hash;
hash = (hash << 16) ^ tmp;
data += 2*sizeof (uint16_t);
hash += hash >> 11;
}
/* Handle end cases */
switch (rem) {
case 3: hash += get16bits (data);
hash ^= hash << 16;
hash ^= ((signed char)data[sizeof (uint16_t)]) << 18;
hash += hash >> 11;
break;
case 2: hash += get16bits (data);
hash ^= hash << 11;
hash += hash >> 17;
break;
case 1: hash += (signed char)*data;
hash ^= hash << 10;
hash += hash >> 1;
}
/* Force "avalanching" of final 127 bits */
hash ^= hash << 3;
hash += hash >> 5;
hash ^= hash << 4;
hash += hash >> 17;
hash ^= hash << 25;
hash += hash >> 6;
return hash;
}
/*----------------------------------------------------------------------------*/
inline unsigned int
HashFlow(const tcp_stream *flow)
{
#if 0
register unsigned int hash, i;
char *key = (char *)&flow->saddr;
for (hash = i = 0; i < 12; ++i) {
hash += key[i];
hash += (hash << 10);
hash ^= (hash >> 6);
}
hash += (hash << 3);
hash ^= (hash >> 11);
hash += (hash << 15);
return hash & (NUM_BINS - 1);
#else
return SuperFastHash((const char *)&flow->saddr, 12) & (NUM_BINS - 1);
#endif
}
/*---------------------------------------------------------------------------*/
#define EQUAL_FLOW(flow1, flow2) \
(flow1->saddr == flow2->saddr && flow1->sport == flow2->sport && \
flow1->daddr == flow2->daddr && flow1->dport == flow2->dport)
/*---------------------------------------------------------------------------*/
struct hashtable *
CreateHashtable(unsigned int (*hashfn) (const tcp_stream *), // key function
int (*eqfn) (const tcp_stream*,
const tcp_stream *)) // equality
CreateHashtable(void) // equality
{
int i;
struct hashtable* ht = calloc(1, sizeof(struct hashtable));
@ -23,9 +105,6 @@ CreateHashtable(unsigned int (*hashfn) (const tcp_stream *), // key function
return 0;
}
ht->hashfn = hashfn;
ht->eqfn = eqfn;
/* init the tables */
for (i = 0; i < NUM_BINS; i++)
TAILQ_INIT(&ht->ht_table[i]);
@ -39,7 +118,7 @@ DestroyHashtable(struct hashtable *ht)
}
/*----------------------------------------------------------------------------*/
int
HTInsert(struct hashtable *ht, tcp_stream *item)
HTInsert(struct hashtable *ht, tcp_stream *item, unsigned int *hash)
{
/* create an entry*/
int idx;
@ -47,10 +126,15 @@ HTInsert(struct hashtable *ht, tcp_stream *item)
assert(ht);
assert(ht->ht_count <= 65535); // uint16_t ht_count
idx = ht->hashfn(item);
if (hash)
idx = (int)*hash;
else
idx = HashFlow(item);
assert(idx >=0 && idx < NUM_BINS);
#if STATIC_TABLE
int i;
for (i = 0; i < TCP_AR_CNT; i++) {
// insert into empty array slot
if (!ht->ht_array[idx][i]) {
@ -65,6 +149,7 @@ HTInsert(struct hashtable *ht, tcp_stream *item)
#endif
TAILQ_INSERT_TAIL(&ht->ht_table[idx], item, rcvvar->he_link);
item->rcvvar->he_mybucket = &ht->ht_table[idx];
item->ht_idx = TCP_AR_CNT;
ht->ht_count++;
@ -75,7 +160,7 @@ void*
HTRemove(struct hashtable *ht, tcp_stream *item)
{
hash_bucket_head *head;
int idx = ht->hashfn(item);
//int idx = HashFlow(item);
#if STATIC_TABLE
if (item->ht_idx < TCP_AR_CNT) {
@ -83,7 +168,9 @@ HTRemove(struct hashtable *ht, tcp_stream *item)
ht->ht_array[idx][item->ht_idx] = NULL;
} else {
#endif
head = &ht->ht_table[idx];
//head = &ht->ht_table[idx];
head = item->rcvvar->he_mybucket;
assert(head);
TAILQ_REMOVE(head, item, rcvvar->he_link);
#if STATIC_TABLE
}
@ -94,30 +181,35 @@ HTRemove(struct hashtable *ht, tcp_stream *item)
}
/*----------------------------------------------------------------------------*/
tcp_stream*
HTSearch(struct hashtable *ht, const tcp_stream *item)
HTSearch(struct hashtable *ht, const tcp_stream *item, unsigned int *hash)
{
int idx;
tcp_stream *walk;
hash_bucket_head *head;
idx = ht->hashfn(item);
int idx;
#if STATIC_TABLE
int i;
idx = HashFlow(item);
for (i = 0; i < TCP_AR_CNT; i++) {
if (ht->ht_array[idx][i]) {
if (ht->eqfn(ht->ht_array[idx][i], item))
if (EQUAL_FLOW(ht->ht_array[idx][i], item))
return ht->ht_array[idx][i];
}
}
#endif
head = &ht->ht_table[ht->hashfn(item)];
idx = HashFlow(item);
*hash = idx;
head = &ht->ht_table[idx];
TAILQ_FOREACH(walk, head, rcvvar->he_link) {
if (ht->eqfn(walk, item))
if (EQUAL_FLOW(walk, item))
return walk;
}
UNUSED(idx);
return NULL;
}
/*----------------------------------------------------------------------------*/

View File

@ -4,6 +4,8 @@
#include <netinet/in.h>
#include <sys/queue.h>
#define MIN_PORT (1025)
#define MAX_PORT (65535 + 1)
/*----------------------------------------------------------------------------*/
typedef struct addr_pool *addr_pool_t;
/*----------------------------------------------------------------------------*/

View File

@ -4,39 +4,42 @@
#include <sys/queue.h>
#include "tcp_stream.h"
#define NUM_BINS (131072) /* 132 K entries per thread*/
#define TCP_AR_CNT (3)
#define STATIC_TABLE FALSE
#define NUM_BINS_FLOWS (131072) /* 132 K entries per thread*/
#define NUM_BINS_LISTENERS (1024) /* assuming that chaining won't happen excessively */
#define TCP_AR_CNT (3)
typedef struct hash_bucket_head {
tcp_stream *tqh_first;
tcp_stream **tqh_last;
void *tqh_first;
void **tqh_last;
} hash_bucket_head;
/* hashtable structure */
struct hashtable {
uint8_t ht_count ; // count for # entry
uint32_t bins;
#if STATIC_TABLE
tcp_stream* ht_array[NUM_BINS][TCP_AR_CNT];
#endif
hash_bucket_head ht_table[NUM_BINS];
hash_bucket_head *ht_table;
// functions
unsigned int (*hashfn) (const tcp_stream *);
int (*eqfn) (const tcp_stream *, const tcp_stream *);
unsigned int (*hashfn) (const void *);
int (*eqfn) (const void *, const void *);
};
/*functions for hashtable*/
struct hashtable *CreateHashtable(unsigned int (*hashfn) (const tcp_stream*),
int (*eqfn) (const tcp_stream*,
const tcp_stream *));
struct hashtable *CreateHashtable(unsigned int (*hashfn) (const void *),
int (*eqfn) (const void *,
const void *),
int bins);
void DestroyHashtable(struct hashtable *ht);
int HTInsert(struct hashtable *ht, tcp_stream *);
void* HTRemove(struct hashtable *ht, tcp_stream *);
tcp_stream* HTSearch(struct hashtable *ht, const tcp_stream *);
int StreamHTInsert(struct hashtable *ht, void *);
void* StreamHTRemove(struct hashtable *ht, void *);
void *StreamHTSearch(struct hashtable *ht, const void *);
unsigned int HashListener(const void *hbo_port_ptr);
int EqualListener(const void *hbo_port_ptr1, const void *hbo_port_ptr2);
int ListenerHTInsert(struct hashtable *ht, void *);
void *ListenerHTRemove(struct hashtable *ht, void *);
void *ListenerHTSearch(struct hashtable *ht, const void *);
#endif /* __FHASH_H_ */

View File

@ -204,7 +204,7 @@ struct mtcp_manager
struct mtcp_epoll *ep;
uint32_t ts_last_event;
struct tcp_listener *listener;
struct hashtable *listeners;
stream_queue_t connectq; /* streams need to connect */
stream_queue_t sendq; /* streams need to send data */

View File

@ -55,6 +55,8 @@ struct tcp_listener
pthread_mutex_t accept_lock;
pthread_cond_t accept_cond;
TAILQ_ENTRY(tcp_listener) he_link; /* hash table entry link */
};
/*----------------------------------------------------------------------------*/

View File

@ -197,10 +197,10 @@ extern inline char *
TCPStateToString(const tcp_stream *cur_stream);
unsigned int
HashFlow(const tcp_stream *flow);
HashFlow(const void *flow);
int
EqualFlow(const tcp_stream *flow1, const tcp_stream *flow2);
EqualFlow(const void *flow1, const void *flow2);
extern inline int
AddEpollEvent(struct mtcp_epoll *ep,

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff