Api: add packet dispatch callback function registration.

In some cases, for example when packets are forwarded to your server through
an IP tunnel, they will all be received on fixed queues, since RSS doesn't support tunnels. So we need to dispatch them again.

With this commit, we can implement a dispatcher callback function and register
it; packets retrieved from the rx queue will then be dispatched again according
to the dispatcher's result.
dev
logwang 2017-11-09 21:22:13 +08:00
parent c855fce65e
commit eb5902d97d
3 changed files with 97 additions and 36 deletions

View File

@ -50,6 +50,8 @@ int ff_init(int argc, char * const argv[]);
void ff_run(loop_func_t loop, void *arg);
/* POSIX-LIKE api begin */
int ff_fcntl(int fd, int cmd, ...);
int ff_sysctl(const int *name, u_int namelen, void *oldp, size_t *oldlenp,
@ -109,8 +111,13 @@ int ff_kevent_do_each(int kq, const struct kevent *changelist, int nchanges,
int ff_gettimeofday(struct timeval *tv, struct timezone *tz);
/* POSIX-LIKE api end */
/* Tests if fd is used by F-Stack */
extern int ff_fdisused(int fd);
/* route api begin */
enum FF_ROUTE_CTL {
FF_ROUTE_ADD,
@ -133,6 +140,34 @@ int ff_route_ctl(enum FF_ROUTE_CTL req, enum FF_ROUTE_FLAG flag,
/* route api end */
/* dispatch api begin */
/*
* Packet dispatch callback function.
* Implemented by user.
*
* Called for each packet taken from an rx queue (packets that were already
* pulled from a dispatch ring are not passed to the callback again), so the
* user can choose which queue should process the packet.
*
* @param data
* The data pointer of the packet.
* @param len
* The length of the packet data (the caller passes the mbuf's
* rte_pktmbuf_data_len()).
* @param nb_queues
* Number of queues to be dispatched.
*
* @return 0 to (nb_queues - 1)
* The queue id that the packet will be dispatched to. If it equals the
* queue the packet was received on, the packet is processed in place;
* otherwise it is enqueued to that queue's dispatch ring.
* @return -1
* Error occurs or packet is handled by user, packet will be freed.
* Any other value outside 0 .. (nb_queues - 1) is treated the same way.
*
*/
typedef int (*dispatch_func_t)(void *data, uint16_t len, uint16_t nb_queues);
/*
* Register a packet dispatch function.
*
* @param func
* The callback to install. Dispatching is skipped while no callback
* (NULL) is registered.
*/
void ff_regist_packet_dispatcher(dispatch_func_t func);
/* dispatch api end */
/* internal api begin */
/*

View File

@ -61,7 +61,7 @@
#define MEMPOOL_CACHE_SIZE 256
#define ARP_RING_SIZE 2048
#define DISPATCH_RING_SIZE 2048
#define MSG_RING_SIZE 32
@ -162,7 +162,8 @@ static struct lcore_conf lcore_conf;
static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];
static struct rte_ring **arp_ring[RTE_MAX_ETHPORTS];
static struct rte_ring **dispatch_ring[RTE_MAX_ETHPORTS];
static dispatch_func_t packet_dispatcher;
static uint16_t rss_reta_size[RTE_MAX_ETHPORTS];
@ -337,6 +338,10 @@ init_lcore_conf(void)
lcore_conf.nb_queue_list[port_id] = pconf->nb_lcores;
}
if (lcore_conf.nb_rx_queue == 0) {
rte_exit(EXIT_FAILURE, "lcore %u has nothing to do\n", lcore_id);
}
return 0;
}
@ -355,7 +360,7 @@ init_mem_pool(void)
nb_lcores*MEMPOOL_CACHE_SIZE +
nb_ports*KNI_MBUF_MAX +
nb_ports*KNI_QUEUE_SIZE +
nb_lcores*nb_ports*ARP_RING_SIZE),
nb_lcores*nb_ports*DISPATCH_RING_SIZE),
(unsigned)8192);
unsigned socketid = 0;
@ -418,7 +423,7 @@ create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
}
static int
init_arp_ring(void)
init_dispatch_ring(void)
{
int j;
char name_buf[RTE_RING_NAMESIZE];
@ -432,28 +437,29 @@ init_arp_ring(void)
uint16_t portid = ff_global_cfg.dpdk.portid_list[j];
struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[portid];
int nb_queues = pconf->nb_lcores;
if (arp_ring[portid] == NULL) {
if (dispatch_ring[portid] == NULL) {
snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_p%d", portid);
arp_ring[portid] = rte_zmalloc(name_buf,
sizeof(struct rte_ring *) * nb_queues,
RTE_CACHE_LINE_SIZE);
if (arp_ring[portid] == NULL) {
dispatch_ring[portid] = rte_zmalloc(name_buf,
sizeof(struct rte_ring *) * nb_queues,
RTE_CACHE_LINE_SIZE);
if (dispatch_ring[portid] == NULL) {
rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
"failed\n", name_buf);
"failed\n", name_buf);
}
}
for(queueid = 0; queueid < nb_queues; ++queueid) {
snprintf(name_buf, RTE_RING_NAMESIZE, "arp_ring_p%d_q%d", portid, queueid);
arp_ring[portid][queueid] = create_ring(name_buf, ARP_RING_SIZE,
socketid, RING_F_SC_DEQ);
snprintf(name_buf, RTE_RING_NAMESIZE, "dispatch_ring_p%d_q%d",
portid, queueid);
dispatch_ring[portid][queueid] = create_ring(name_buf,
DISPATCH_RING_SIZE, socketid, RING_F_SC_DEQ);
if (arp_ring[portid][queueid] == NULL)
if (dispatch_ring[portid][queueid] == NULL)
rte_panic("create ring:%s failed!\n", name_buf);
printf("create ring:%s success, %u ring entries are now free!\n",
name_buf, rte_ring_free_count(arp_ring[portid][queueid]));
name_buf, rte_ring_free_count(dispatch_ring[portid][queueid]));
}
}
@ -807,7 +813,7 @@ ff_dpdk_init(int argc, char **argv)
init_mem_pool();
init_arp_ring();
init_dispatch_ring();
init_msg_ring();
@ -897,38 +903,56 @@ process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
{
struct lcore_conf *qconf = &lcore_conf;
uint16_t nb_queues = qconf->nb_queue_list[port_id];
uint16_t i;
for (i = 0; i < count; i++) {
struct rte_mbuf *rtem = bufs[i];
if (unlikely(qconf->pcap[port_id] != NULL)) {
ff_dump_packets(qconf->pcap[port_id], rtem);
if (!pkts_from_ring) {
ff_dump_packets(qconf->pcap[port_id], rtem);
}
}
void *data = rte_pktmbuf_mtod(rtem, void*);
uint16_t len = rte_pktmbuf_data_len(rtem);
if (!pkts_from_ring && packet_dispatcher) {
int ret = (*packet_dispatcher)(data, len, nb_queues);
if (ret < 0 || ret >= nb_queues) {
rte_pktmbuf_free(rtem);
continue;
}
if (ret != queue_id) {
ret = rte_ring_enqueue(dispatch_ring[port_id][ret], rtem);
if (ret < 0)
rte_pktmbuf_free(rtem);
continue;
}
}
enum FilterReturn filter = protocol_filter(data, len);
if (filter == FILTER_ARP) {
struct rte_mempool *mbuf_pool;
struct rte_mbuf *mbuf_clone;
if (pkts_from_ring == 0) {
uint16_t i;
uint16_t nb_queues = qconf->nb_queue_list[port_id];
for(i = 0; i < nb_queues; ++i) {
if(i == queue_id)
if (!pkts_from_ring) {
uint16_t j;
for(j = 0; j < nb_queues; ++j) {
if(j == queue_id)
continue;
unsigned socket_id = 0;
if (numa_on) {
uint16_t lcore_id = qconf->port_cfgs[port_id].lcore_list[i];
uint16_t lcore_id = qconf->port_cfgs[port_id].lcore_list[j];
socket_id = rte_lcore_to_socket_id(lcore_id);
}
mbuf_pool = pktmbuf_pool[socket_id];
mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
if(mbuf_clone) {
int ret = rte_ring_enqueue(arp_ring[port_id][i], mbuf_clone);
int ret = rte_ring_enqueue(dispatch_ring[port_id][j], mbuf_clone);
if (ret < 0)
rte_pktmbuf_free(mbuf_clone);
}
@ -954,12 +978,12 @@ process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
}
static inline int
process_arp_ring(uint8_t port_id, uint16_t queue_id,
process_dispatch_ring(uint8_t port_id, uint16_t queue_id,
struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
{
/* read packet from ring buf and to process */
uint16_t nb_rb;
nb_rb = rte_ring_dequeue_burst(arp_ring[port_id][queue_id],
nb_rb = rte_ring_dequeue_burst(dispatch_ring[port_id][queue_id],
(void **)pkts_burst, MAX_PKT_BURST);
if(nb_rb > 0) {
@ -1233,7 +1257,6 @@ main_loop(void *arg)
struct loop_routine *lr = (struct loop_routine *)arg;
struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
unsigned lcore_id;
uint64_t prev_tsc, diff_tsc, cur_tsc, usch_tsc, div_tsc, usr_tsc, sys_tsc, end_tsc;
int i, j, nb_rx, idle;
uint8_t port_id, queue_id;
@ -1245,14 +1268,8 @@ main_loop(void *arg)
prev_tsc = 0;
usch_tsc = 0;
lcore_id = rte_lcore_id();
qconf = &lcore_conf;
if (qconf->nb_rx_queue == 0) {
printf("lcore %u has nothing to do\n", lcore_id);
return 0;
}
while (1) {
cur_tsc = rte_rdtsc();
if (unlikely(freebsd_clock.expire < cur_tsc)) {
@ -1296,7 +1313,7 @@ main_loop(void *arg)
ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
}
process_arp_ring(port_id, queue_id, pkts_burst, ctx);
process_dispatch_ring(port_id, queue_id, pkts_burst, ctx);
nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
MAX_PKT_BURST);
@ -1350,6 +1367,8 @@ main_loop(void *arg)
ff_status.loops++;
}
return 0;
}
int
@ -1446,3 +1465,9 @@ ff_rss_check(void *softc, uint32_t saddr, uint32_t daddr,
return ((hash & (reta_size - 1)) % nb_queues) == queueid;
}
/*
 * Register the user-supplied packet dispatch callback.
 *
 * Stores `func` in the file-level `packet_dispatcher` pointer, which
 * process_packets() consults for packets that did not come from a dispatch
 * ring. Passing NULL leaves dispatching disabled, since the pointer is
 * checked before it is called.
 *
 * NOTE(review): this is a plain pointer store with no synchronization visible
 * here; presumably it is meant to be called before the main loop starts.
 * Confirm against callers.
 */
void
ff_regist_packet_dispatcher(dispatch_func_t func)
{
packet_dispatcher = func;
}

View File

@ -257,8 +257,9 @@ protocol_filter_ip(const void *data, uint16_t len)
const struct ipv4_hdr *hdr;
hdr = (const struct ipv4_hdr *)data;
void *next = (void *)data + sizeof(struct ipv4_hdr);
uint16_t next_len = len - sizeof(struct ipv4_hdr);
int hdr_len = (hdr->version_ihl & 0x0f) << 2;
void *next = (void *)data + hdr_len;
uint16_t next_len = len - hdr_len;
switch (hdr->next_proto_id) {
case IPPROTO_TCP: