diff --git a/README.md b/README.md index 919eecaa..0ecd9a8b 100644 --- a/README.md +++ b/README.md @@ -34,12 +34,12 @@ Currently, besides authorized DNS server of DNSPod, there are various products i mkdir -p /data/f-stack git clone https://github.com/F-Stack/f-stack.git /data/f-stack - # install libnuma-dev + # Install libnuma-dev yum install numactl-devel # on Centos #sudo apt-get install libnuma-dev # on Ubuntu cd f-stack - # compile DPDK + # Compile DPDK cd dpdk/usertools ./dpdk-setup.sh # compile with x86_64-native-linuxapp-gcc @@ -55,10 +55,13 @@ Currently, besides authorized DNS server of DNSPod, there are various products i mkdir /mnt/huge mount -t hugetlbfs nodev /mnt/huge - # close ASLR; it is necessary in multiple process + # Close ASLR; it is necessary in multiple process echo 0 > /proc/sys/kernel/randomize_va_space - # offload NIC + # Install python for running DPDK python scripts + sudo apt install python # On ubuntu + + # Offload NIC modprobe uio insmod /data/f-stack/dpdk/x86_64-native-linuxapp-gcc/kmod/igb_uio.ko insmod /data/f-stack/dpdk/x86_64-native-linuxapp-gcc/kmod/rte_kni.ko carrier=on # carrier=on is necessary, otherwise need to be up `veth0` via `echo 1 > /sys/class/net/veth0/carrier` @@ -66,20 +69,23 @@ Currently, besides authorized DNS server of DNSPod, there are various products i ifconfig eth0 down python dpdk-devbind.py --bind=igb_uio eth0 # assuming that use 10GE NIC and eth0 - # install DPDK + # Install DPDK cd ../x86_64-native-linuxapp-gcc make install # On Ubuntu, use gawk instead of the default mawk. #sudo apt-get install gawk # or execute `sudo update-alternatives --config awk` to choose gawk. 
+ # Install dependencies for F-Stack + sudo apt install gcc make libssl-dev # On ubuntu + # Compile F-Stack export FF_PATH=/data/f-stack export FF_DPDK=/data/f-stack/dpdk/x86_64-native-linuxapp-gcc cd ../../lib/ make - # install F-STACK + # Install F-STACK # libfstack.a will be installed to /usr/local/lib # ff_*.h will be installed to /usr/local/include # start.sh will be installed to /usr/local/bin/ff_start diff --git a/app/micro_thread/micro_thread.cpp b/app/micro_thread/micro_thread.cpp index 55f494ee..452b5537 100644 --- a/app/micro_thread/micro_thread.cpp +++ b/app/micro_thread/micro_thread.cpp @@ -49,7 +49,7 @@ Thread::Thread(int stack_size) memset(&_jmpbuf, 0, sizeof(_jmpbuf)); } - +static DefaultLogAdapter def_log_adapt; /** * @brief LINUX x86/x86_64's allocated stacks. */ @@ -75,7 +75,7 @@ bool Thread::InitStack() void* vaddr = mmap(NULL, memsize, PROT_READ | PROT_WRITE, mmap_flags, zero_fd, 0); if (vaddr == (void *)MAP_FAILED) { - MTLOG_ERROR("mmap stack failed, size %d", memsize); + MTLOG_ERROR("mmap stack failed, size %d,errmsg: %s.", memsize,strerror(errno)); free(_stack); _stack = NULL; return false; @@ -130,6 +130,12 @@ void Thread::SwitchContext() } } + +int Thread::SaveContext() +{ + return save_context(_jmpbuf); +} + void Thread::RestoreContext() { restore_context(_jmpbuf, 1); @@ -375,6 +381,7 @@ void ScheduleObj::ScheduleStartRun() unsigned int ThreadPool::default_thread_num = DEFAULT_THREAD_NUM; ///< 2000 micro threads. +unsigned int ThreadPool::last_default_thread_num = DEFAULT_THREAD_NUM; ///< 2000 micro threads. unsigned int ThreadPool::default_stack_size = DEFAULT_STACK_SIZE; ///< 128k stack. 
bool ThreadPool::InitialPool(int max_num) @@ -442,6 +449,7 @@ MicroThread* ThreadPool::AllocThread() if (_total_num >= _max_num) { MT_ATTR_API(361140, 1); // no more quota + MTLOG_ERROR("total %d is outof max: %d", _total_num,_max_num); return NULL; } @@ -455,6 +463,12 @@ MicroThread* ThreadPool::AllocThread() } _total_num++; _use_num++; + if(_use_num >(int) default_thread_num){ + if(((int) default_thread_num * 2 )< _max_num){ + last_default_thread_num = default_thread_num; + default_thread_num = default_thread_num * 2; + } + } return thread; } @@ -475,6 +489,10 @@ void ThreadPool::FreeThread(MicroThread* thread) thread->Destroy(); delete thread; _total_num--; + if(default_thread_num / 2 >= DEFAULT_THREAD_NUM){ + last_default_thread_num = default_thread_num; + default_thread_num = default_thread_num / 2; + } } } @@ -500,7 +518,11 @@ void MtFrame::SetHookFlag() { bool MtFrame::InitFrame(LogAdapter* logadpt, int max_thread_num) { - _log_adpt = logadpt; + if(logadpt == NULL){ + _log_adpt = &def_log_adapt; + }else{ + _log_adpt = logadpt; + } if ((this->InitKqueue(max_thread_num) < 0) || !this->InitialPool(max_thread_num)) { @@ -851,6 +873,42 @@ void MtFrame::WaitNotify(utime64_t timeout) thread->SwitchContext(); } +void MtFrame::NotifyThread(MicroThread* thread) +{ + if(thread == NULL){ + return; + } + MicroThread* cur_thread = GetActiveThread(); + if (thread->HasFlag(MicroThread::IO_LIST)) + { + this->RemoveIoWait(thread); + if(cur_thread == this->DaemonThread()){ + // 这里不直接切的话,还是不及时,会导致目标线程等待到超时 + if(cur_thread->SaveContext() == 0){ + this->SetActiveThread(thread); + thread->SetState(MicroThread::RUNNING); + thread->RestoreContext(); + } + }else{ + this->InsertRunable(thread); + } + } +} + +void MtFrame::SwapDaemonThread() +{ + MicroThread* thread = GetActiveThread(); + MicroThread* daemon_thread = this->DaemonThread(); + if(thread != daemon_thread){ + if(thread->SaveContext() == 0){ + this->InsertRunable(thread); + this->SetActiveThread(daemon_thread); + 
daemon_thread->SetState(MicroThread::RUNNING); + daemon_thread->RestoreContext(); + } + } +} + bool MtFrame::KqueueSchedule(KqObjList* fdlist, KqueuerObj* fd, int timeout) { MicroThread* thread = GetActiveThread(); diff --git a/app/micro_thread/micro_thread.h b/app/micro_thread/micro_thread.h index e91bdb11..9aa97231 100644 --- a/app/micro_thread/micro_thread.h +++ b/app/micro_thread/micro_thread.h @@ -40,6 +40,7 @@ #include #include #include +#include #include #include @@ -56,8 +57,9 @@ namespace NS_MICRO_THREAD { #define STACK_PAD_SIZE 128 #define MEM_PAGE_SIZE 4096 -#define DEFAULT_STACK_SIZE 128*1024 -#define DEFAULT_THREAD_NUM 2000 +#define DEFAULT_STACK_SIZE (STACK_PAD_SIZE * 1024) /* parenthesized so the macro expands safely inside larger expressions */ +#define DEFAULT_THREAD_NUM 5000 +#define MAX_THREAD_NUM 800000 typedef unsigned long long utime64_t; typedef void (*ThreadStart)(void*); @@ -119,6 +121,8 @@ public: void SwitchContext(void); + int SaveContext(void); + void RestoreContext(void); utime64_t GetWakeupTime(void) { @@ -317,12 +321,51 @@ public: }; +class DefaultLogAdapter :public LogAdapter +{ +public: + + + bool CheckDebug(){ return false;}; + bool CheckTrace(){ return false;}; + bool CheckError(){ return false;}; + + inline void LogDebug(char* fmt, ...){ + va_list args; + char szBuff[1024]; + va_start(args, fmt); + memset(szBuff, 0, sizeof(szBuff)); + vsnprintf(szBuff, sizeof(szBuff), fmt, args); /* bounded: vsprintf could overrun szBuff on a long message */ + va_end(args); + printf("%s\n",szBuff); + }; + inline void LogTrace(char* fmt, ...){ + va_list args; + char szBuff[1024]; + va_start(args, fmt); + memset(szBuff, 0, sizeof(szBuff)); + vsnprintf(szBuff, sizeof(szBuff), fmt, args); /* bounded: vsprintf could overrun szBuff on a long message */ + va_end(args); + printf("%s\n",szBuff); + }; + inline void LogError(char* fmt, ...){ + va_list args; + char szBuff[1024]; + va_start(args, fmt); + memset(szBuff, 0, sizeof(szBuff)); + vsnprintf(szBuff, sizeof(szBuff), fmt, args); /* bounded: vsprintf could overrun szBuff on a long message */ + va_end(args); + printf("%s\n",szBuff); + }; + +}; class ThreadPool { public: static unsigned int default_thread_num; + static unsigned int last_default_thread_num; static unsigned int default_stack_size; static void 
SetDefaultThreadNum(unsigned int num) { @@ -403,7 +446,7 @@ public: MicroThread *GetRootThread(); - bool InitFrame(LogAdapter* logadpt = NULL, int max_thread_num = 50000); + bool InitFrame(LogAdapter* logadpt = NULL, int max_thread_num = MAX_THREAD_NUM); void SetHookFlag(); @@ -441,6 +484,10 @@ public: void WaitNotify(utime64_t timeout); + void NotifyThread(MicroThread* thread); + + void SwapDaemonThread(); + void RemoveIoWait(MicroThread* thread); void InsertRunable(MicroThread* thread); diff --git a/app/micro_thread/mt_api.cpp b/app/micro_thread/mt_api.cpp index 474685fd..586f9a4a 100644 --- a/app/micro_thread/mt_api.cpp +++ b/app/micro_thread/mt_api.cpp @@ -645,6 +645,26 @@ void* mt_start_thread(void* entry, void* args) return MtFrame::Instance()->CreateThread((ThreadStart)entry, args, true); } +void* mt_active_thread() +{ + return MtFrame::Instance()->GetActiveThread(); +} + +void mt_thread_wait(int ms) +{ + MtFrame::Instance()->WaitNotify(ms); +} + +void mt_thread_wakeup_wait(void * thread_p) +{ + MtFrame::Instance()->NotifyThread((MicroThread *) thread_p); +} + +void mt_swap_thread() +{ + return MtFrame::Instance()->SwapDaemonThread(); +} + #define BUF_ALIGNMENT_SIZE 4096 #define BUF_ALIGN_SIZE(x) (((x)+BUF_ALIGNMENT_SIZE-1)&~(BUF_ALIGNMENT_SIZE-1)) #define BUF_DEFAULT_SIZE 4096 diff --git a/app/micro_thread/mt_api.h b/app/micro_thread/mt_api.h index fe1e14a5..dfc3e64c 100644 --- a/app/micro_thread/mt_api.h +++ b/app/micro_thread/mt_api.h @@ -134,6 +134,14 @@ int mt_wait_events(int fd, int events, int timeout); void* mt_start_thread(void* entry, void* args); +void* mt_active_thread(); + +void mt_thread_wait(int ms); + +void mt_thread_wakeup_wait(void * thread_p); + +void mt_swap_thread(); + } #endif diff --git a/config.ini b/config.ini index 5440f8ff..3de82e43 100644 --- a/config.ini +++ b/config.ini @@ -61,6 +61,14 @@ nb_vdev=0 # Number of bond. nb_bond=0 +# Each core write into own pcap file, which is open one time, close one time if enough. 
+# Only the first snaplen bytes of each packet are dumped. +# If the pcap file grows larger than savelen bytes, it is closed and dumping continues in the next file. +[pcap] +enable = 0 +snaplen= 96 +savelen= 16777216 + # Port config section # Correspond to dpdk.port_list's index: port0, port1... [port0] @@ -99,7 +107,7 @@ gateway=192.168.1.1 # bond config section # See http://doc.dpdk.org/guides/prog_guide/link_bonding_poll_mode_drv_lib.html -[bond0] +#[bond0] #mode=4 #slave=0000:0a:00.0,slave=0000:0a:00.1 #primary=0000:0a:00.0 @@ -171,3 +179,4 @@ net.inet.tcp.delayed_ack=0 net.inet.udp.blackhole=1 net.inet.ip.redirect=0 +net.inet.ip.forwarding=0 diff --git a/lib/ff_config.c b/lib/ff_config.c index 1741caa7..de7cd739 100644 --- a/lib/ff_config.c +++ b/lib/ff_config.c @@ -617,6 +617,16 @@ ini_parse_handler(void* user, const char* section, const char* name, return vdev_cfg_handler(pconfig, section, name, value); } else if (strncmp(section, "bond", 4) == 0) { return bond_cfg_handler(pconfig, section, name, value); + } else if (strcmp(section, "pcap") == 0) { + if (strcmp(name, "snaplen") == 0) { + pconfig->pcap.snap_len = (uint16_t)atoi(value); + } else if (strcmp(name, "savelen") == 0) { + pconfig->pcap.save_len = (uint32_t)atoi(value); + } else if (strcmp(name, "enable") == 0) { + pconfig->pcap.enable = (uint16_t)atoi(value); + } else if (strcmp(name, "savepath") == 0) { + pconfig->pcap.save_path = strdup(value); + } } return 1; @@ -816,6 +826,13 @@ ff_check_config(struct ff_config *cfg) } } + if (cfg->pcap.save_len < PCAP_SAVE_MINLEN) + cfg->pcap.save_len = PCAP_SAVE_MINLEN; + if (cfg->pcap.snap_len < PCAP_SNAP_MINLEN) + cfg->pcap.snap_len = PCAP_SNAP_MINLEN; + if (cfg->pcap.save_path==NULL || strlen(cfg->pcap.save_path) ==0) + cfg->pcap.save_path = strdup("."); + #define CHECK_VALID(n) \ do { \ if (!pc->n) { \ diff --git a/lib/ff_config.h b/lib/ff_config.h index f89654db..0ff2803c 100644 --- a/lib/ff_config.h +++ b/lib/ff_config.h @@ -35,6 +35,8 @@ extern "C" { #define 
DPDK_CONFIG_NUM 16 #define DPDK_CONFIG_MAXLEN 256 #define DPDK_MAX_LCORE 128 +#define PCAP_SNAP_MINLEN 94 +#define PCAP_SAVE_MINLEN (2<<22) extern int dpdk_argc; extern char *dpdk_argv[DPDK_CONFIG_NUM + 1]; @@ -60,6 +62,8 @@ struct ff_port_cfg { char *broadcast; char *gateway; char *pcap; + uint16_t snaplen; + uint32_t savelen; int nb_lcores; int nb_slaves; @@ -164,6 +168,13 @@ struct ff_config { int fd_reserve; int mem_size; } freebsd; + + struct { + uint16_t enable; + uint16_t snap_len; + uint32_t save_len; + char* save_path; + } pcap; }; extern struct ff_config ff_global_cfg; diff --git a/lib/ff_dpdk_if.c b/lib/ff_dpdk_if.c index 1c788319..a2ddcac9 100644 --- a/lib/ff_dpdk_if.c +++ b/lib/ff_dpdk_if.c @@ -275,7 +275,11 @@ init_lcore_conf(void) lcore_conf.tx_port_id[lcore_conf.nb_tx_port] = port_id; lcore_conf.nb_tx_port++; - lcore_conf.pcap[port_id] = pconf->pcap; + /* Enable pcap dump */ + if (ff_global_cfg.pcap.enable) { + ff_enable_pcap(ff_global_cfg.pcap.save_path, ff_global_cfg.pcap.snap_len); + } + lcore_conf.nb_queue_list[port_id] = pconf->nb_lcores; } @@ -791,11 +795,6 @@ init_port_start(void) printf("set port %u to promiscuous mode error\n", port_id); } } - - /* Enable pcap dump */ - if (pconf->pcap) { - ff_enable_pcap(pconf->pcap); - } } } @@ -1035,9 +1034,9 @@ process_packets(uint16_t port_id, uint16_t queue_id, struct rte_mbuf **bufs, for (i = 0; i < count; i++) { struct rte_mbuf *rtem = bufs[i]; - if (unlikely(qconf->pcap[port_id] != NULL)) { + if (unlikely( ff_global_cfg.pcap.enable)) { if (!pkts_from_ring) { - ff_dump_packets(qconf->pcap[port_id], rtem); + ff_dump_packets( ff_global_cfg.pcap.save_path, rtem, ff_global_cfg.pcap.snap_len, ff_global_cfg.pcap.save_len); } } @@ -1381,10 +1380,11 @@ send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port) queueid = qconf->tx_queue_id[port]; m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table; - if (unlikely(qconf->pcap[port] != NULL)) { + if (unlikely(ff_global_cfg.pcap.enable)) { uint16_t 
i; for (i = 0; i < n; i++) { - ff_dump_packets(qconf->pcap[port], m_table[i]); + ff_dump_packets( ff_global_cfg.pcap.save_path, m_table[i], + ff_global_cfg.pcap.snap_len, ff_global_cfg.pcap.save_len); } } diff --git a/lib/ff_dpdk_pcap.c b/lib/ff_dpdk_pcap.c index 897b4260..c3f0ac10 100644 --- a/lib/ff_dpdk_pcap.c +++ b/lib/ff_dpdk_pcap.c @@ -26,8 +26,11 @@ #include #include +#include #include "ff_dpdk_pcap.h" +#define FILE_PATH_LEN 64 +#define PCAP_FILE_NUM 10 struct pcap_file_header { uint32_t magic; @@ -46,14 +49,21 @@ struct pcap_pkthdr { uint32_t len; /* length this packet (off wire) */ }; -int -ff_enable_pcap(const char* dump_path) +static __thread FILE* g_pcap_fp = NULL; +static __thread uint32_t seq = 0; +static __thread uint32_t g_flen = 0; + +int ff_enable_pcap(const char* dump_path, uint16_t snap_len) { - FILE* fp = fopen(dump_path, "w"); - if (fp == NULL) { - rte_exit(EXIT_FAILURE, "Cannot open pcap dump path: %s\n", dump_path); + char pcap_f_path[FILE_PATH_LEN] = {0}; + + snprintf(pcap_f_path, FILE_PATH_LEN, "%s/cpu%d_%d.pcap", dump_path==NULL?".":dump_path, rte_lcore_id(), seq); + g_pcap_fp = fopen(pcap_f_path, "w+"); + if (g_pcap_fp == NULL) { + rte_exit(EXIT_FAILURE, "Cannot open pcap dump path: %s, errno %d.\n", pcap_f_path, errno); return -1; } + g_flen = 0; struct pcap_file_header pcap_file_hdr; void* file_hdr = &pcap_file_hdr; @@ -63,40 +73,52 @@ ff_enable_pcap(const char* dump_path) pcap_file_hdr.version_minor = 0x0004; pcap_file_hdr.thiszone = 0x00000000; pcap_file_hdr.sigfigs = 0x00000000; - pcap_file_hdr.snaplen = 0x0000FFFF; //65535 + pcap_file_hdr.snaplen = snap_len; //0x0000FFFF; //65535 pcap_file_hdr.linktype = 0x00000001; //DLT_EN10MB, Ethernet (10Mb) - fwrite(file_hdr, sizeof(struct pcap_file_header), 1, fp); - fclose(fp); + fwrite(file_hdr, sizeof(struct pcap_file_header), 1, g_pcap_fp); + g_flen += sizeof(struct pcap_file_header); return 0; } int -ff_dump_packets(const char* dump_path, struct rte_mbuf* pkt) +ff_dump_packets(const 
char* dump_path, struct rte_mbuf* pkt, uint16_t snap_len, uint32_t f_maxlen) { - FILE* fp = fopen(dump_path, "a"); - if (fp == NULL) { - return -1; - } - + unsigned int out_len = 0, wr_len = 0, cap_len = 0; struct pcap_pkthdr pcap_hdr; void* hdr = &pcap_hdr; - struct timeval ts; + char pcap_f_path[FILE_PATH_LEN] = {0}; + + if (g_pcap_fp == NULL) { + return -1; + } + cap_len = pkt->pkt_len < snap_len ? pkt->pkt_len : snap_len; /* per-packet capture length; snap_len stays intact because it is written into the next file header on rotation */ gettimeofday(&ts, NULL); pcap_hdr.sec = ts.tv_sec; pcap_hdr.usec = ts.tv_usec; - pcap_hdr.caplen = pkt->pkt_len; + pcap_hdr.caplen = cap_len; pcap_hdr.len = pkt->pkt_len; - fwrite(hdr, sizeof(struct pcap_pkthdr), 1, fp); + fwrite(hdr, sizeof(struct pcap_pkthdr), 1, g_pcap_fp); + g_flen += sizeof(struct pcap_pkthdr); - while(pkt != NULL) { - fwrite(rte_pktmbuf_mtod(pkt, char*), pkt->data_len, 1, fp); + while(pkt != NULL && out_len < cap_len) { /* stop as soon as cap_len bytes are written instead of issuing zero-length writes */ + wr_len = cap_len - out_len; + wr_len = wr_len > pkt->data_len ? pkt->data_len : wr_len ; + fwrite(rte_pktmbuf_mtod(pkt, char*), wr_len, 1, g_pcap_fp); + out_len += wr_len; pkt = pkt->next; } + g_flen += out_len; - fclose(fp); + if ( g_flen >= f_maxlen ){ + fclose(g_pcap_fp); + if ( ++seq >= PCAP_FILE_NUM ) + seq = 0; + + ff_enable_pcap(dump_path, snap_len); + } return 0; } diff --git a/lib/ff_dpdk_pcap.h b/lib/ff_dpdk_pcap.h index 05a423fa..ee3f9bb7 100644 --- a/lib/ff_dpdk_pcap.h +++ b/lib/ff_dpdk_pcap.h @@ -30,8 +30,8 @@ #include #include -int ff_enable_pcap(const char* dump_path); -int ff_dump_packets(const char* dump_path, struct rte_mbuf *pkt); +int ff_enable_pcap(const char* dump_path, uint16_t snap_len); +int ff_dump_packets(const char* dump_path, struct rte_mbuf *pkt, uint16_t snap_len, uint32_t f_maxlen); #endif /* ifndef _FSTACK_DPDK_PCAP_H */ diff --git a/lib/ff_memory.h b/lib/ff_memory.h index bf3f94f9..8d8d2d4c 100644 --- a/lib/ff_memory.h +++ b/lib/ff_memory.h @@ -89,7 +89,7 @@ struct lcore_conf { uint16_t tx_port_id[RTE_MAX_ETHPORTS]; uint16_t tx_queue_id[RTE_MAX_ETHPORTS]; struct mbuf_table 
tx_mbufs[RTE_MAX_ETHPORTS]; - char *pcap[RTE_MAX_ETHPORTS]; + //char *pcap[RTE_MAX_ETHPORTS]; } __rte_cache_aligned; #ifdef FF_USE_PAGE_ARRAY diff --git a/lib/ff_syscall_wrapper.c b/lib/ff_syscall_wrapper.c index e97eae20..48519966 100644 --- a/lib/ff_syscall_wrapper.c +++ b/lib/ff_syscall_wrapper.c @@ -354,7 +354,7 @@ ip_opt_convert(int optname) case LINUX_IP_DROP_MEMBERSHIP: return IP_DROP_MEMBERSHIP; default: - return -1; + return optname; } } diff --git a/tools/ifconfig/ifbridge.c b/tools/ifconfig/ifbridge.c index 4fb33282..9d5442a7 100644 --- a/tools/ifconfig/ifbridge.c +++ b/tools/ifconfig/ifbridge.c @@ -58,6 +58,7 @@ static const char rcsid[] = #include #include #include +#include #include "ifconfig.h" @@ -166,7 +167,11 @@ bridge_interfaces(int s, const char *prefix) } for (;;) { +#ifndef FSTACK ninbuf = realloc(inbuf, len); +#else + ninbuf = rte_realloc(inbuf, len, 0); /* resize the previous buffer as realloc() does; rte_malloc() here abandoned it on every retry */ +#endif if (ninbuf == NULL) err(1, "unable to allocate interface buffer"); bifc.ifbic_len = len; @@ -212,7 +217,11 @@ bridge_interfaces(int s, const char *prefix) printf("\n"); } +#ifndef FSTACK free(inbuf); +#else + rte_free(inbuf); +#endif } static void