/** * Tencent is pleased to support the open source community by making MSEC available. * * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. * * Licensed under the GNU General Public License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. You may * obtain a copy of the License at * * https://opensource.org/licenses/GPL-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, * either express or implied. See the License for the specific language governing permissions * and limitations under the License. */ /** * @file mt_connection.cpp * @time 20130924 **/ #include #include #include #include #include #include "micro_thread.h" #include "mt_msg.h" #include "mt_notify.h" #include "mt_connection.h" #include "mt_sys_hook.h" #include "ff_hook.h" using namespace std; using namespace NS_MICRO_THREAD; IMtConnection::IMtConnection() { _type = OBJ_CONN_UNDEF; _action = NULL; _ntfy_obj = NULL; _msg_buff = NULL; } IMtConnection::~IMtConnection() { if (_ntfy_obj) { NtfyObjMgr::Instance()->FreeNtfyObj(_ntfy_obj); _ntfy_obj = NULL; } if (_msg_buff) { MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); _msg_buff = NULL; } } void IMtConnection::Reset() { if (_ntfy_obj) { NtfyObjMgr::Instance()->FreeNtfyObj(_ntfy_obj); _ntfy_obj = NULL; } if (_msg_buff) { MsgBuffPool::Instance()->FreeMsgBuf(_msg_buff); _msg_buff = NULL; } _action = NULL; _ntfy_obj = NULL; _msg_buff = NULL; } int UdpShortConn::CreateSocket() { _osfd = socket(AF_INET, SOCK_DGRAM, 0); if (_osfd < 0) { MTLOG_ERROR("socket create failed, errno %d(%s)", errno, strerror(errno)); return -1; } int flags = 1; if (ioctl(_osfd, FIONBIO, &flags) < 0) { MTLOG_ERROR("socket unblock failed, errno %d(%s)", errno, strerror(errno)); close(_osfd); _osfd = -1; return -2; } if (_ntfy_obj) { _ntfy_obj->SetOsfd(_osfd); } return _osfd; } int UdpShortConn::CloseSocket() { if (_osfd < 0) { return 0; } close(_osfd); _osfd = -1; return 0; } int UdpShortConn::SendData() { if (!_action || !_msg_buff) { MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff); return -100; } mt_hook_syscall(sendto); int ret = ff_hook_sendto(_osfd, _msg_buff->GetMsgBuff(), _msg_buff->GetMsgLen(), 0, (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in)); if (ret == -1) { if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) { return 0; } else { MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _osfd, errno, strerror(errno)); return -2; } } else { _msg_buff->SetHaveSndLen(ret); return ret; } } int UdpShortConn::RecvData() { if (!_action || !_msg_buff) { MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff); return -100; } struct sockaddr_in from; socklen_t fromlen = sizeof(from); mt_hook_syscall(recvfrom); int ret = ff_hook_recvfrom(_osfd, _msg_buff->GetMsgBuff(), _msg_buff->GetMaxLen(), 0, (struct sockaddr*)&from, &fromlen); if (ret < 0) { if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) { return 0; } else { MTLOG_ERROR("socket recv failed, fd %d, errno %d(%s)", _osfd, errno, strerror(errno)); return -2; } } else if (ret == 0) { return -1; } else { _msg_buff->SetHaveRcvLen(ret); } ret = _action->DoInput(); if (ret > 0) { _msg_buff->SetMsgLen(ret); return ret; } else if (ret == 0) { return 0; } else if (ret == -65535) { _msg_buff->SetHaveRcvLen(0); return 0; } else { return -1; } } void UdpShortConn::Reset() { CloseSocket(); this->IMtConnection::Reset(); } int TcpKeepConn::OpenCnnect() { if (!_action || !_msg_buff) { MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff); return -100; } int err = 0; mt_hook_syscall(connect); int ret = ff_hook_connect(_osfd, (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in)); if (ret < 0) { err = errno; if (err == EISCONN) { return 0; } else { if ((err == EINPROGRESS) || (err == EALREADY) || (err == EINTR)) { MTLOG_DEBUG("Open connect not ok, maybe first try, sock %d, errno %d", _osfd, err); return -1; } else { MTLOG_ERROR("Open connect not ok, sock %d, errno %d", _osfd, err); return -2; } } } else { return 0; } } int TcpKeepConn::CreateSocket() { if (_osfd > 0) { if (_ntfy_obj) { _ntfy_obj->SetOsfd(_osfd); } return _osfd; } _osfd = socket(AF_INET, SOCK_STREAM, 0); if (_osfd < 0) { MTLOG_ERROR("create tcp socket failed, error: %d", errno); return -1; } int flags = 1; if (ioctl(_osfd, FIONBIO, &flags) < 0) { MTLOG_ERROR("set tcp socket unblock failed, error: %d", errno); close(_osfd); _osfd = -1; return -2; } _keep_ntfy.SetOsfd(_osfd); _keep_ntfy.DisableOutput(); _keep_ntfy.EnableInput(); if (_ntfy_obj) { _ntfy_obj->SetOsfd(_osfd); } return _osfd; } int TcpKeepConn::SendData() { if (!_action || !_msg_buff) { MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff); return -100; } char* msg_ptr = (char*)_msg_buff->GetMsgBuff(); int msg_len = _msg_buff->GetMsgLen(); int have_send_len = _msg_buff->GetHaveSndLen(); mt_hook_syscall(send); int ret = ff_hook_send(_osfd, msg_ptr + have_send_len, msg_len - have_send_len, 0); if (ret == -1) { if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) { return 0; } else { MTLOG_ERROR("send tcp socket failed, error: %d", errno); return -1; } } else { have_send_len += ret; _msg_buff->SetHaveSndLen(have_send_len); } if (have_send_len >= msg_len) { return msg_len; } else { return 0; } } int TcpKeepConn::RecvData() { if (!_action || !_msg_buff) { MTLOG_ERROR("conn not set action %p, or msg %p, error", _action, _msg_buff); return -100; } char* msg_ptr = (char*)_msg_buff->GetMsgBuff(); int max_len = _msg_buff->GetMaxLen(); int have_rcv_len = _msg_buff->GetHaveRcvLen(); mt_hook_syscall(recv); int ret = ff_hook_recv(_osfd, (char*)msg_ptr + have_rcv_len, max_len - have_rcv_len, 0); if (ret < 0) { if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) { return 0; } else { MTLOG_ERROR("recv tcp socket failed, error: %d", errno); return -2; } } else if (ret == 0) { MTLOG_ERROR("tcp remote close, address: %s[%d]", inet_ntoa(_dst_addr.sin_addr), ntohs(_dst_addr.sin_port)); return -1; } else { have_rcv_len += ret; _msg_buff->SetHaveRcvLen(have_rcv_len); } ret = _action->DoInput(); if (ret > 0) { _msg_buff->SetMsgLen(have_rcv_len); return ret; } else if (ret == 0) { return 0; } else { return -1; } } int TcpKeepConn::CloseSocket() { if (_osfd < 0) { return 0; } _keep_ntfy.SetOsfd(-1); close(_osfd); _osfd = -1; return 0; } void TcpKeepConn::Reset() { memset(&_dst_addr, 0 ,sizeof(_dst_addr)); CloseSocket(); this->IMtConnection::Reset(); } void TcpKeepConn::ConnReuseClean() { this->IMtConnection::Reset(); } bool TcpKeepConn::IdleAttach() { if (_osfd < 0) { MTLOG_ERROR("obj %p attach failed, fd %d error", this, _osfd); return false; } if (_keep_flag & TCP_KEEP_IN_KQUEUE) { MTLOG_ERROR("obj %p repeat attach, error", this); return true; } _keep_ntfy.DisableOutput(); _keep_ntfy.EnableInput(); CTimerMng* timer = MtFrame::Instance()->GetTimerMng(); if ((NULL == timer) || !timer->start_timer(this, _keep_time)) { MTLOG_ERROR("obj %p attach timer failed, error", this); return false; } if (MtFrame::Instance()->KqueueAddObj(&_keep_ntfy)) { _keep_flag |= TCP_KEEP_IN_KQUEUE; return true; } else { MTLOG_ERROR("obj %p attach failed, error", this); return false; } } bool TcpKeepConn::IdleDetach() { if (_osfd < 0) { MTLOG_ERROR("obj %p detach failed, fd %d error", this, _osfd); return false; } if (!(_keep_flag & TCP_KEEP_IN_KQUEUE)) { MTLOG_DEBUG("obj %p repeat detach, error", this); return true; } _keep_ntfy.DisableOutput(); _keep_ntfy.EnableInput(); CTimerMng* timer = MtFrame::Instance()->GetTimerMng(); if (NULL != timer) { timer->stop_timer(this); } if (MtFrame::Instance()->KqueueDelObj(&_keep_ntfy)) { _keep_flag &= ~TCP_KEEP_IN_KQUEUE; return true; } else { MTLOG_ERROR("obj %p detach failed, error", this); return false; } } void TcpKeepConn::timer_notify() { MTLOG_DEBUG("keep timeout[%u], fd %d, close connection", _keep_time, _osfd); ConnectionMgr::Instance()->CloseIdleTcpKeep(this); } TcpKeepMgr::TcpKeepMgr() { _keep_hash = new HashList(10000); } TcpKeepMgr::~TcpKeepMgr() { if (!_keep_hash) { return; } HashKey* hash_item = _keep_hash->HashGetFirst(); while (hash_item) { delete hash_item; hash_item = _keep_hash->HashGetFirst(); } delete _keep_hash; _keep_hash = NULL; } TcpKeepConn* TcpKeepMgr::GetTcpKeepConn(struct sockaddr_in* dst) { TcpKeepConn* conn = NULL; if (NULL == dst) { MTLOG_ERROR("input param dst null, error"); return NULL; } TcpKeepKey key(dst); TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key); if ((NULL == conn_list) || (NULL == conn_list->GetFirstConn())) { conn = _mem_queue.AllocPtr(); if (conn) { conn->SetDestAddr(dst); } } else { conn = conn_list->GetFirstConn(); conn_list->RemoveConn(conn); conn->IdleDetach(); } return conn; } bool TcpKeepMgr::RemoveTcpKeepConn(TcpKeepConn* conn) { struct sockaddr_in* dst = conn->GetDestAddr(); if ((dst->sin_addr.s_addr == 0) || (dst->sin_port == 0)) { MTLOG_ERROR("sock addr, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port); return false; } TcpKeepKey key(dst); TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key); if (!conn_list) { MTLOG_ERROR("no conn cache list, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port); return false; } conn->IdleDetach(); conn_list->RemoveConn(conn); return true; } bool TcpKeepMgr::CacheTcpKeepConn(TcpKeepConn* conn) { struct sockaddr_in* dst = conn->GetDestAddr(); if ((dst->sin_addr.s_addr == 0) || (dst->sin_port == 0)) { MTLOG_ERROR("sock addr, invalid, %x:%d", dst->sin_addr.s_addr, dst->sin_port); return false; } TcpKeepKey key(dst); TcpKeepKey* conn_list = (TcpKeepKey*)_keep_hash->HashFindData(&key); if (!conn_list) { conn_list = new TcpKeepKey(conn->GetDestAddr()); if (!conn_list) { MTLOG_ERROR("new conn list failed, error"); return false; } _keep_hash->HashInsert(conn_list); } if (!conn->IdleAttach()) { MTLOG_ERROR("conn IdleAttach failed, error"); return false; } conn->ConnReuseClean(); conn_list->InsertConn(conn); return true; } void TcpKeepMgr::FreeTcpKeepConn(TcpKeepConn* conn, bool force_free) { if (force_free) { conn->Reset(); _mem_queue.FreePtr(conn); return; } else { if (!CacheTcpKeepConn(conn)) { conn->Reset(); _mem_queue.FreePtr(conn); return; } } } int UdpSessionConn::CreateSocket() { if (!_action || !_ntfy_obj) { MTLOG_ERROR("conn not set action %p, or _ntfy_obj %p, error", _action, _ntfy_obj); return -100; } SessionProxy* proxy = dynamic_cast(_ntfy_obj); if (!proxy) { MTLOG_ERROR("ntfy obj not match, _ntfy_obj %p, error", _ntfy_obj); return -200; } ISessionNtfy* real_ntfy = proxy->GetRealNtfyObj(); if (!real_ntfy) { MTLOG_ERROR("real ntfy obj not match, _ntfy_obj %p, error", _ntfy_obj); return -300; } int osfd = real_ntfy->GetOsfd(); if (osfd <= 0) { osfd = real_ntfy->CreateSocket(); if (osfd <= 0) { MTLOG_ERROR("real ntfy obj create fd failed, _ntfy_obj %p, error", real_ntfy); return -400; } } _ntfy_obj->SetOsfd(osfd); return osfd; } int UdpSessionConn::CloseSocket() { return 0; } int UdpSessionConn::SendData() { if (!_action || !_msg_buff || !_ntfy_obj) { MTLOG_ERROR("conn not set action %p, or msg %p, ntfy %p error", _action, _msg_buff, _ntfy_obj); return -100; } mt_hook_syscall(sendto); int ret = ff_hook_sendto(_ntfy_obj->GetOsfd(), _msg_buff->GetMsgBuff(), _msg_buff->GetMsgLen(), 0, (struct sockaddr*)_action->GetMsgDstAddr(), sizeof(struct sockaddr_in)); if (ret == -1) { if ((errno == EINTR) || (errno == EAGAIN) || (errno == EINPROGRESS)) { return 0; } else { MTLOG_ERROR("socket send failed, fd %d, errno %d(%s)", _ntfy_obj->GetOsfd(), errno, strerror(errno)); return -2; } } else { _msg_buff->SetHaveSndLen(ret); return ret; } } int UdpSessionConn::RecvData() { if (!_ntfy_obj || !_msg_buff) { MTLOG_ERROR("conn not set _ntfy_obj %p, or msg %p, error", _ntfy_obj, _msg_buff); return -100; } if (_ntfy_obj->GetRcvEvents() <= 0) { MTLOG_DEBUG("conn _ntfy_obj %p, no recv event, retry it", _ntfy_obj); return 0; } int msg_len = _msg_buff->GetMsgLen(); if (BUFF_RECV == _msg_buff->GetBuffType()) { return msg_len; } else { MTLOG_DEBUG("conn msg buff %p, no recv comm", _msg_buff); return 0; } } ConnectionMgr* ConnectionMgr::_instance = NULL; ConnectionMgr* ConnectionMgr::Instance (void) { if (NULL == _instance) { _instance = new ConnectionMgr(); } return _instance; } void ConnectionMgr::Destroy() { if( _instance != NULL ) { delete _instance; _instance = NULL; } } ConnectionMgr::ConnectionMgr() { } ConnectionMgr::~ConnectionMgr() { } IMtConnection* ConnectionMgr::GetConnection(CONN_OBJ_TYPE type, struct sockaddr_in* dst) { switch (type) { case OBJ_SHORT_CONN: return _udp_short_queue.AllocPtr(); break; case OBJ_TCP_KEEP: return _tcp_keep_mgr.GetTcpKeepConn(dst); break; case OBJ_UDP_SESSION: return _udp_session_queue.AllocPtr(); break; default: return NULL; break; } } void ConnectionMgr::FreeConnection(IMtConnection* conn, bool force_free) { if (!conn) { return; } CONN_OBJ_TYPE type = conn->GetConnType(); switch (type) { case OBJ_SHORT_CONN: conn->Reset(); return _udp_short_queue.FreePtr(dynamic_cast(conn)); break; case OBJ_TCP_KEEP: return _tcp_keep_mgr.FreeTcpKeepConn(dynamic_cast(conn), force_free); break; case OBJ_UDP_SESSION: conn->Reset(); return _udp_session_queue.FreePtr(dynamic_cast(conn)); break; default: break; } delete conn; return; } void ConnectionMgr::CloseIdleTcpKeep(TcpKeepConn* conn) { _tcp_keep_mgr.RemoveTcpKeepConn(conn); _tcp_keep_mgr.FreeTcpKeepConn(conn, true); }