Annotated source code of the trafficserver Net module


The Net module's startup flow:

main.cc
main()--->UnixNetProcessor::start()
Startup of the Net module

int
UnixNetProcessor::start(int, size_t)
{
 EventType etype = ET_NET;
//allocate space for the NetHandler instance
 netHandler_offset = eventProcessor.allocate(sizeof(NetHandler));
//allocate space for the PollCont instance
 pollCont_offset = eventProcessor.allocate(sizeof(PollCont));
//UnixNetProcessor's event type is ET_NET; for sslNetProcessor it is ET_SSL
 upgradeEtype(etype);
//get the number of net threads from eventProcessor; the threads themselves were created when the event module was initialized
 n_netthreads = eventProcessor.n_threads_for_type[etype];
//get the net threads from eventProcessor
 netthreads = eventProcessor.eventthread[etype];
//initialize every net thread
 for (int i = 0; i < n_netthreads; ++i) {
   initialize_thread_for_net(netthreads[i]);
#ifndef STANDALONE_IOCORE
   extern void initialize_thread_for_http_sessions(EThread *thread, int thread_index);
   initialize_thread_for_http_sessions(netthreads[i], i);
#endif
 }
 RecData d;
 d.rec_int = 0;
//set the threshold on the number of network connections
 change_net_connections_throttle(NULL, RECD_INT, d, NULL);
//SOCKS-related setup; rarely used, so it is not covered here
 if (!netProcessor.socks_conf_stuff) {
   socks_conf_stuff = NEW(new socks_conf_struct);
   loadSocksConfiguration(socks_conf_stuff);
   if (!socks_conf_stuff->socks_needed && socks_conf_stuff->accept_enabled) {
     Warning("We can not have accept_enabled and socks_needed turned off" " disabling Socks accept\n");
     socks_conf_stuff->accept_enabled = 0;
   }
 } else {
   socks_conf_stuff = netProcessor.socks_conf_stuff;
 }
//register the "net" statistics page so the Net statistics can be viewed in the UI
#ifdef NON_MODULAR
 extern Action *register_ShowNet(Continuation * c, HTTPHdr * h);
 if (etype == ET_NET)
   statPagesManager.register_http("net", register_ShowNet);
#endif
 return 1;
}
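
Before moving on, note the allocate()/offset trick above: eventProcessor.allocate() reserves space inside each thread's private arena, and get_NetHandler(thread) later resolves that offset to a per-thread instance with plain pointer arithmetic. A minimal standalone sketch of the pattern, with hypothetical names (FakeThread, NetHandlerLike); this is not ATS code:

#include <cstddef>
#include <new>

// Hypothetical sketch of the eventProcessor.allocate() idea: every thread owns
// a flat byte arena; a module reserves an offset once at startup, and each
// thread then finds its private instance at arena_base + offset.
struct FakeThread {
  char arena[4096];                       // stand-in for EThread's scratch space
};

static size_t next_offset = 0;

static size_t allocate(size_t size) {     // analogous to eventProcessor.allocate()
  size_t off = next_offset;
  next_offset += size;
  return off;
}

struct NetHandlerLike { int read_ready_count = 0; };

static size_t netHandler_offset;

static NetHandlerLike *get_NetHandler(FakeThread *t) {  // analogous to get_NetHandler(thread)
  return reinterpret_cast<NetHandlerLike *>(t->arena + netHandler_offset);
}

int main() {
  netHandler_offset = allocate(sizeof(NetHandlerLike));
  FakeThread t;
  new (get_NetHandler(&t)) NetHandlerLike();  // placement new, as in initialize_thread_for_net()
  return get_NetHandler(&t)->read_ready_count;
}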

main()--->UnixNetProcessor::start()--->initialize_thread_for_net()

As its name suggests, this function initializes one thread for the network.

void
initialize_thread_for_net(EThread *thread)
{
//create the NetHandler and PollCont instances
//NetHandler: handles all Net-related events
//PollCont: a poll continuation (in the ATS design sense) holding pointers to the NetHandler and the PollDescriptor
//PollDescriptor: a wrapper structure describing the poll state
 new((ink_dummy_for_new *) get_NetHandler(thread)) NetHandler();
 new((ink_dummy_for_new *) get_PollCont(thread)) PollCont(thread->mutex, get_NetHandler(thread));
 get_NetHandler(thread)->mutex = new_ProxyMutex();
 PollCont *pc = get_PollCont(thread);
 PollDescriptor *pd = pc->pollDescriptor;
//start the NetHandler instance; this ultimately gets NetHandler::mainNetEvent() invoked periodically
 thread->schedule_imm(get_NetHandler(thread));
#ifndef INACTIVITY_TIMEOUT
//create the InactivityCop instance; once per second it checks every connection (vc) and closes those that are due to be closed
 InactivityCop *inactivityCop = NEW(new InactivityCop(get_NetHandler(thread)->mutex));
//schedule inactivityCop's check_inactivity() to run periodically
 thread->schedule_every(inactivityCop, HRTIME_SECONDS(1));
#endif
//register the signal hook function
 thread->signal_hook = net_signal_hook_function;
//create and initialize the EventIO instance
 thread->ep = (EventIO*)ats_malloc(sizeof(EventIO));
 thread->ep->type = EVENTIO_ASYNC_SIGNAL;
#if HAVE_EVENTFD
//start the EventIO instance: register a read event with epoll (read up on epoll first if you are not familiar with it!)
 thread->ep->start(pd, thread->evfd, 0, EVENTIO_READ);
#else
 thread->ep->start(pd, thread->evpipe[0], 0, EVENTIO_READ);
#endif
}
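
The EventIO registered on thread->evfd at the end is what lets other threads wake a net thread that is sleeping in epoll_wait(). A self-contained sketch of that eventfd-plus-epoll wakeup mechanism, assuming Linux (illustrative only, not ATS code):

#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>

int main() {
  int efd  = eventfd(0, 0);              // like thread->evfd
  int epfd = epoll_create(256);          // like pollDescriptor->epoll_fd

  epoll_event ev{};
  ev.events  = EPOLLIN;                  // the EVENTIO_READ side of ep->start()
  ev.data.fd = efd;
  epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev);

  uint64_t one = 1;
  write(efd, &one, sizeof(one));         // another thread "signals" the net thread

  epoll_event out[8];
  int n = epoll_wait(epfd, out, 8, 1000); // the poll loop wakes up immediately
  printf("woken with %d event(s)\n", n);

  uint64_t val;
  read(efd, &val, sizeof(val));          // drain, as net_signal_hook_callback() would
  close(efd);
  close(epfd);
  return 0;
}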

Initialization of NetHandler
main()--->UnixNetProcessor::start()--->NetHandler::NetHandler()
Sets the NetHandler's handler to NetHandler::startNetEvent.
NetHandler::NetHandler():Continuation(NULL), trigger_event(0)
{
 SET_HANDLER((NetContHandler) & NetHandler::startNetEvent);
}

Sets the NetHandler's handler to NetHandler::mainNetEvent and schedules that function to run periodically.
int
NetHandler::startNetEvent(int event, Event *e)
{
 (void) event;
 SET_HANDLER((NetContHandler) & NetHandler::mainNetEvent);
 e->schedule_every(NET_PERIOD);
 trigger_event = e;
 return EVENT_CONT;
}
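
This one-time switch from startNetEvent to mainNetEvent is the standard ATS continuation idiom: the continuation's current state is just a member-function pointer, and SET_HANDLER() is the state transition. A tiny sketch of the pattern outside ATS (all names hypothetical):

#include <cstdio>

// The continuation's state is a pointer-to-member-function,
// swapped at runtime the way SET_HANDLER() does.
struct MiniCont {
  int (MiniCont::*handler)(int event);

  int start(int event) {                 // runs once, like startNetEvent
    printf("start: event=%d, switching handler\n", event);
    handler = &MiniCont::main;           // like SET_HANDLER(... mainNetEvent)
    return 0;
  }
  int main(int event) {                  // runs on every later dispatch
    printf("main: event=%d\n", event);
    return 0;
  }
  int handleEvent(int event) { return (this->*handler)(event); }
};

int main() {
  MiniCont c{&MiniCont::start};
  c.handleEvent(1);                      // first dispatch lands in start()
  c.handleEvent(2);                      // later dispatches land in main()
  return 0;
}
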
Initialization of PollCont
main()--->UnixNetProcessor::start()--->PollCont::PollCont()
PollCont::PollCont(ProxyMutex *m, NetHandler *nh, int pt):Continuation(m), net_handler(nh), poll_timeout(pt)
{
//create the PollDescriptor instance
 pollDescriptor = NEW(new PollDescriptor);
//initialize the PollDescriptor instance
 pollDescriptor->init();
//set PollCont's handler to PollCont::pollEvent
 SET_HANDLER(&PollCont::pollEvent);
}

Initialization of PollDescriptor
main()--->UnixNetProcessor::start()--->PollCont::PollCont()--->init()
 PollDescriptor *init()
 {
   result = 0;
#if TS_USE_EPOLL
   nfds = 0;
//create the file descriptor used by epoll
   epoll_fd = epoll_create(POLL_DESCRIPTOR_SIZE);
   memset(ePoll_Triggered_Events, 0, sizeof(ePoll_Triggered_Events));
   memset(pfd, 0, sizeof(pfd));
#endif
......
   return this;
 }


main()--->UnixNetProcessor::start()--->initialize_thread_for_net()--->NetHandler::mainNetEvent()
This function looks a little long, so here is what it does up front. First it calls epoll_wait() to wait for events; then it handles each event according to its type. Events fall into EVENTIO_READWRITE_VC (read/write events), EVENTIO_DNS_CONNECTION (DNS CONNECT events), and EVENTIO_ASYNC_SIGNAL (async signal events). Receiving a normal HTTP request and sending its response belong to EVENTIO_READWRITE_VC. As described in the DNS request-sending flow, calling connect() to send a DNS request registers the corresponding event via epoll_ctl(); that is EVENTIO_DNS_CONNECTION. We leave EVENTIO_ASYNC_SIGNAL aside for now.
Finally it walks the NetHandler's read-ready and write-ready queues, calls read and write to do the actual I/O, and then notifies the upper layer.
int
NetHandler::mainNetEvent(int event, Event *e)
{
 ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
 (void) event;
 (void) e;
 EventIO *epd = NULL;
 int poll_timeout = net_config_poll_timeout;
//bump the run counter
 NET_INCREMENT_DYN_STAT(net_handler_run_stat);
//process the UnixNetVConnection events on the NetHandler's read/write enable lists; for now you can treat this as a no-op
 process_enabled_list(this);
 if (likely(!read_ready_list.empty() || !write_ready_list.empty() || !read_enable_list.empty() || !write_enable_list.empty()))
   poll_timeout = 0;
 else
   poll_timeout = net_config_poll_timeout;
 PollDescriptor *pd = get_PollDescriptor(trigger_event->ethread);
 UnixNetVConnection *vc = NULL;
#if TS_USE_EPOLL
//wait for epoll events
 pd->result = epoll_wait(pd->epoll_fd, pd->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout);
 NetDebug("iocore_net_main_poll", "[NetHandler::mainNetEvent] epoll_wait(%d,%d), result=%d", pd->epoll_fd,poll_timeout,pd->result);
   ......
//process all returned events
 vc = NULL;
 for (int x = 0; x < pd->result; x++) {
   epd = (EventIO*) get_ev_data(pd,x);
// handling of EVENTIO_READWRITE_VC events: a read event enqueues the vc on the NetHandler's read_ready_list; a write event enqueues it on the write_ready_list
   if (epd->type == EVENTIO_READWRITE_VC) {
     vc = epd->data.vc;
     if (get_ev_events(pd,x) & (EVENTIO_READ|EVENTIO_ERROR)) {
       vc->read.triggered = 1;
       if (!read_ready_list.in(vc))
         read_ready_list.enqueue(vc);
       else if (get_ev_events(pd,x) & EVENTIO_ERROR) {
         // check for unhandled epoll events that should be handled
         Debug("iocore_net_main", "Unhandled epoll event on read: 0x%04x read.enabled=%d closed=%d read.netready_queue=%d",
               get_ev_events(pd,x), vc->read.enabled, vc->closed, read_ready_list.in(vc));
       }
     }
     vc = epd->data.vc;
     if (get_ev_events(pd,x) & (EVENTIO_WRITE|EVENTIO_ERROR)) {
       vc->write.triggered = 1;
       if (!write_ready_list.in(vc))
         write_ready_list.enqueue(vc);
       else if (get_ev_events(pd,x) & EVENTIO_ERROR) {
         Debug("iocore_net_main",
               "Unhandled epoll event on write: 0x%04x write.enabled=%d closed=%d write.netready_queue=%d",
               get_ev_events(pd,x), vc->write.enabled, vc->closed, write_ready_list.in(vc));
       }
     } else if (!get_ev_events(pd,x) & EVENTIO_ERROR) {
       Debug("iocore_net_main", "Unhandled epoll event: 0x%04x", get_ev_events(pd,x));
     }
//handling of EVENTIO_DNS_CONNECTION events: enqueue on the DNSHandler's triggered queue
   } else if (epd->type == EVENTIO_DNS_CONNECTION) {
     if (epd->data.dnscon != NULL) {
       epd->data.dnscon->trigger();
#if defined(USE_EDGE_TRIGGER)
       epd->refresh(EVENTIO_READ);
#endif
     }
   } else if (epd->type == EVENTIO_ASYNC_SIGNAL)
     net_signal_hook_callback(trigger_event->ethread);
   ev_next_event(pd,x);
 }
 pd->result = 0;
#if defined(USE_EDGE_TRIGGER)
//walk the vcs on the NetHandler's read-ready queue, handling each with net_read_io, which calls read to receive data and then notifies the upper layer (HttpSM)
 while ((vc = read_ready_list.dequeue())) {
   if (vc->closed)
     close_UnixNetVConnection(vc, trigger_event->ethread);
   else if (vc->read.enabled && vc->read.triggered)
     vc->net_read_io(this, trigger_event->ethread);
   else if (!vc->read.enabled) {
     read_ready_list.remove(vc);
   }
 }
//walk the vcs on the NetHandler's write-ready queue, handling each with write_to_net, which calls write to send data and then notifies the upper layer (HttpSM)
 while ((vc = write_ready_list.dequeue())) {
   if (vc->closed)
     close_UnixNetVConnection(vc, trigger_event->ethread);
   else if (vc->write.enabled && vc->write.triggered)
     write_to_net(this, vc, trigger_event->ethread);
   else if (!vc->write.enabled) {
     write_ready_list.remove(vc);
   }
 }
 return EVENT_CONT;
}
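
One detail worth isolating: with USE_EDGE_TRIGGER, epoll reports a readiness transition only once, so net_read_io must keep reading until EAGAIN, or data could sit in the kernel buffer with no further notification. A hedged sketch of that drain discipline (not the actual net_read_io):

#include <unistd.h>
#include <cerrno>

// Drain a non-blocking socket completely, as edge-triggered readiness requires.
// Returns total bytes read, 0 on orderly EOF, -1 on a fatal error.
ssize_t drain_socket(int fd, char *buf, size_t buflen) {
  ssize_t total = 0;
  for (;;) {
    ssize_t n = read(fd, buf, buflen);
    if (n > 0) {
      total += n;                  // hand the data to the upper layer here
      continue;
    }
    if (n == 0)
      return 0;                    // peer closed the connection
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return total;                // kernel buffer drained; safe to wait again
    if (errno == EINTR)
      continue;                    // interrupted; retry
    return -1;                     // real error; the caller closes the vc
  }
}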

Do not forget the InactivityCop structure.
main()--->UnixNetProcessor::start()--->initialize_thread_for_net()--->InactivityCop()

Sets the handler to InactivityCop::check_inactivity, which is then called once per second.

struct InactivityCop : public Continuation {
 InactivityCop(ProxyMutex *m):Continuation(m) {
   SET_HANDLER(&InactivityCop::check_inactivity);
 }

main()--->UnixNetProcessor::start()--->initialize_thread_for_net()--->InactivityCop()---> InactivityCop::check_inactivity()

 int check_inactivity(int event, Event *e) {
   (void) event;
   ink_hrtime now = ink_get_hrtime();
   NetHandler *nh = get_NetHandler(this_ethread());
//walk the NetHandler's open connection list; every vc that belongs to this thread is pushed onto the NetHandler's cop_list
   forl_LL(UnixNetVConnection, vc, nh->open_list) {
     if (vc->thread == this_ethread())
       nh->cop_list.push(vc);
   }
   while (UnixNetVConnection *vc = nh->cop_list.pop()) {
     // If we cannot get the lock, don't stop; just keep cleaning
     MUTEX_TRY_LOCK(lock, vc->mutex, this_ethread());
     if (!lock.lock_acquired) {
      NET_INCREMENT_DYN_STAT(inactivity_cop_lock_acquire_failure_stat);
      continue;
     }
//if the vc is already marked closed, call close_UnixNetVConnection() to close it
     if (vc->closed) {
       close_UnixNetVConnection(vc, e->ethread);
       continue;
     }
     if (vc->next_inactivity_timeout_at && vc->next_inactivity_timeout_at < now)
//on inactivity timeout, let the vc's handler (UnixNetVConnection::mainEvent) handle it
       vc->handleEvent(EVENT_IMMEDIATE, e);
   }
   return 0;
 }
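
Stripped of ATS types, the cop's job is a once-per-second sweep over timestamps. An illustrative sketch, assuming each connection records its last activity (the Conn type is hypothetical, not ATS code):

#include <chrono>
#include <list>

using Clock = std::chrono::steady_clock;

struct Conn {
  Clock::time_point last_activity;
  Clock::duration   inactivity_timeout;
  bool              closed = false;
};

// Called once per second, like InactivityCop::check_inactivity().
void sweep(std::list<Conn *> &open_list) {
  const auto now = Clock::now();
  for (auto it = open_list.begin(); it != open_list.end();) {
    Conn *c = *it;
    if (c->closed || now - c->last_activity > c->inactivity_timeout) {
      c->closed = true;            // the real code fires EVENT_IMMEDIATE at the vc
      it = open_list.erase(it);
    } else {
      ++it;
    }
  }
}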


   Good: at this point the NetProcessor startup flow has been fully analyzed. To summarize briefly, starting NetProcessor initializes several threads that periodically call epoll_wait() to wait for read/write events. When a read event arrives, read is called and the received data is passed up to the upper layer; when a write event arrives, write is called to send data, and the upper layer is notified of the result. So where do these read/write events come from? Network programming experience says a server generally has to accept before it can read or write, and that is exactly where these events originate. Next we analyze NetProcessor's accept, which is invoked when the HttpProxyServer starts, or, to be precise, from the main_accept() function.

main()--->start_HttpProxyServer()
void
start_HttpProxyServer()
{
//according to the configuration, create one acceptor per port (by default there is just one: 8080)
 for ( int i = 0 , n = proxy_ports.length() ; i < n ; ++i ) {
   HttpProxyAcceptor& acceptor = HttpProxyAcceptors[i];
   HttpProxyPort& port = proxy_ports[i];
       ......
     if (NULL == netProcessor.main_accept(acceptor._accept, port.m_fd, acceptor._net_opt))
       return;
     ......
 }
}

main()--->start_HttpProxyServer()--->NetProcessor::main_accept()
Action *
NetProcessor::main_accept(Continuation *cont, SOCKET fd, AcceptOptions const& opt)
{
 UnixNetProcessor* this_unp = static_cast<UnixNetProcessor*>(this);
 Debug("iocore_net_processor", "NetProcessor::main_accept - port %d,recv_bufsize %d, send_bufsize %d, sockopt 0x%0x",
       opt.local_port, opt.recv_bufsize, opt.send_bufsize, opt.sockopt_flags);
//delegates directly to UnixNetProcessor::accept_internal()
 return this_unp->accept_internal(cont, fd, opt);
}

main()--->start_HttpProxyServer()--->NetProcessor::main_accept()--->UnixNetProcessor::accept_internal()

Action *
UnixNetProcessor::accept_internal(Continuation *cont, int fd, AcceptOptions const& opt)
{
 EventType et = opt.etype;
//create the NetAccept instance
 NetAccept *na = createNetAccept();
 EThread *thread = this_ethread();
 ProxyMutex *mutex = thread->mutex;
 int accept_threads = opt.accept_threads;
 IpEndpoint accept_ip;
 upgradeEtype(et);
//opt carries the network-related settings read from the configuration, used as the Net options; see the ATS configuration documentation for details
 if (opt.accept_threads < 0) {
   REC_ReadConfigInteger(accept_threads, "proxy.config.accept_threads");
 }
 NET_INCREMENT_DYN_STAT(net_accepts_currently_open_stat);
//set the server address according to the configured mode
 if (opt.localhost_only) {
   accept_ip.setToLoopback(opt.ip_family);
 } else if (opt.local_ip.isValid()) {
   accept_ip.assign(opt.local_ip);
 } else {
   accept_ip.setToAnyAddr(opt.ip_family);
 }
 ink_assert(0 < opt.local_port && opt.local_port < 65536);
 accept_ip.port() = htons(opt.local_port);
 na->accept_fn = net_accept;
 na->server.fd = fd;
 ats_ip_copy(&na->server.accept_addr, &accept_ip);
 na->server.f_inbound_transparent = opt.f_inbound_transparent;
//inbound transparent proxy
 if (opt.f_inbound_transparent) {
   Debug( "http_tproxy", "Marking accept server %p on port %d as inbound transparent", na, opt.local_port);
 }
 int should_filter_int = 0;
 na->server.http_accept_filter = false;
//see this option's documentation: accept the connection only once data has arrived, giving up after 45 seconds by default; it is implemented further down by setting a socket option via setsockopt()
 REC_ReadConfigInteger(should_filter_int, "proxy.config.net.defer_accept");
 if (should_filter_int > 0 && opt.etype == ET_NET)
   na->server.http_accept_filter = true;
 na->action_ = NEW(new NetAcceptAction());
 *na->action_ = cont;//points to the upper-layer continuation, e.g. HttpAccept
//the following initializes the receive buffer size and other network parameters
 na->action_->server = &na->server;
 na->callback_on_open = opt.f_callback_on_open;
 na->recv_bufsize = opt.recv_bufsize;
 na->send_bufsize = opt.send_bufsize;
 na->sockopt_flags = opt.sockopt_flags;
 na->packet_mark = opt.packet_mark;
 na->packet_tos = opt.packet_tos;
 na->etype = opt.etype;
 na->backdoor = opt.backdoor;
 if (na->callback_on_open)
   na->mutex = cont->mutex;
//accept connections as they arrive (frequent_accept)
 if (opt.frequent_accept) {
   //the configured number of accept threads
   if (accept_threads > 0)  {
       //listen and set the socket options
     if (0 == na->do_listen(BLOCKING, opt.f_inbound_transparent)) {
       NetAccept *a;
       //for each extra thread, create a NetAccept instance, copy na into it, and call init_accept_loop() to enter the blocking accept loop
       for (int i=1; i < accept_threads; ++i) {
         a = createNetAccept();
         *a = *na;
         a->init_accept_loop();
         Debug("iocore_net_accept", "Created accept thread #%d for port %d", i, ats_ip_port_host_order(&accept_ip));
       }
       Debug("iocore_net_accept", "Created accept thread #%d for port %d", accept_threads, ats_ip_port_host_order(&accept_ip));
       na->init_accept_loop();
     }
   } else {
     na->init_accept_per_thread();
   }
 } else
   na->init_accept();
//as noted above for defer_accept: accept only once data has arrived (45-second default), implemented via this socket option
#ifdef TCP_DEFER_ACCEPT
 if (should_filter_int > 0) {
   setsockopt(na->server.fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &should_filter_int, sizeof(int));
 }
#endif
 return na->action_;
}
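
The defer_accept knob maps directly onto the TCP_DEFER_ACCEPT option set at the end of accept_internal(). A standalone illustration of that call, assuming Linux (the helper name is hypothetical):

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

// Ask the kernel not to wake accept() until the client has sent data,
// giving up after `timeout_sec` seconds (proxy.config.net.defer_accept).
int set_defer_accept(int listen_fd, int timeout_sec) {
#ifdef TCP_DEFER_ACCEPT
  return setsockopt(listen_fd, IPPROTO_TCP, TCP_DEFER_ACCEPT,
                    &timeout_sec, sizeof(timeout_sec));
#else
  (void)listen_fd; (void)timeout_sec;
  return 0;   // option unavailable on this platform
#endif
}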

main()--->start_HttpProxyServer()--->NetProcessor::main_accept()--->UnixNetProcessor::accept_internal()--->NetAccept::init_accept_loop()

This function's job is to create a thread and set the thread's entry function to NetAccept::acceptLoopEvent.

void
NetAccept::init_accept_loop()
{
 size_t stacksize;
//thread stack size
 REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
 SET_CONTINUATION_HANDLER(this, &NetAccept::acceptLoopEvent);
 eventProcessor.spawn_thread(this, "[ACCEPT]", stacksize);
}
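
Stripped of the continuation machinery, init_accept_loop() amounts to "spawn a dedicated thread that blocks in accept() forever". A minimal sketch with std::thread (hypothetical names, not ATS code):

#include <sys/socket.h>
#include <unistd.h>
#include <thread>

// One dedicated thread blocking in accept(), like NetAccept::acceptLoopEvent().
void accept_loop(int listen_fd) {
  for (;;) {
    int fd = accept(listen_fd, nullptr, nullptr);
    if (fd >= 0) {
      // hand the fd to a net thread here; ATS builds a UnixNetVConnection
      // and schedules its acceptEvent instead of closing it.
      close(fd);                   // placeholder to keep the sketch self-contained
    }
  }
}

void spawn_accept_thread(int listen_fd) {
  std::thread(accept_loop, listen_fd).detach();  // like eventProcessor.spawn_thread()
}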

main()--->start_HttpProxyServer()--->NetProcessor::main_accept()--->UnixNetProcessor::accept_internal()--->NetAccept::init_accept_loop()--->NetAccept::acceptLoopEvent()

int
NetAccept::acceptLoopEvent(int event, Event * e)
{
 (void) event;
 (void) e;
 EThread *t = this_ethread();
//at last, the endless accept loop (listen(), of course, was already done earlier)
 while (1)
   do_blocking_accept(t);
 NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
 delete this;
 return EVENT_DONE;
}


main()--->start_HttpProxyServer()--->NetProcessor::main_accept()--->UnixNetProcessor::accept_internal()--->NetAccept::init_accept_loop()--->NetAccept::acceptLoopEvent()--->NetAccept::do_blocking_accept()

This function's job is to call accept in a loop to take new connections, and to let the event system schedule the handling of each connection's UnixNetVConnection.

int
NetAccept::do_blocking_accept(EThread * t)
{
 int res = 0;
 int loop = accept_till_done;
 UnixNetVConnection *vc = NULL;
 do {
   //allocate the UnixNetVConnection instance that represents one connection
   vc = (UnixNetVConnection *)alloc_cache;
   if (likely(!vc)) {
     vc = allocateGlobal();
     vc->from_accept_thread = true;
     vc->id = net_next_connection_number();
     alloc_cache = vc;
   }
   //throttling
   ink_hrtime now = ink_get_hrtime();
   while (!backdoor && check_net_throttle(ACCEPT, now)) {
     check_throttle_warning();
     if (!unix_netProcessor.throttle_error_message) {
       safe_delay(NET_THROTTLE_DELAY);
     } else if (send_throttle_message(this) < 0) {
       goto Lerror;
     }
     now = ink_get_hrtime();
   }
   //call accept to take a new connection
   if ((res = server.accept(&vc->con)) < 0) {
  //error handling
   Lerror:
     int seriousness = accept_error_seriousness(res);
     if (seriousness >= 0) {  
       if (!seriousness)      
         check_transient_accept_error(res);
       safe_delay(NET_THROTTLE_DELAY);
       return 0;
     }
     if (!action_->cancelled) {
       MUTEX_LOCK(lock, action_->mutex, t);
       action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
       MUTEX_UNTAKE_LOCK(action_->mutex, t);
       Warning("accept thread received fatal error: errno = %d", errno);
     }
     return -1;
   }
   //throttling
   check_emergency_throttle(vc->con);
   alloc_cache = NULL;
   NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
   //record the vc's accept time and copy the connection's address into vc->server_addr
   vc->submit_time = now;
   ats_ip_copy(&vc->server_addr, &vc->con.addr);
   //transparent proxy flag
   vc->set_is_transparent(server.f_inbound_transparent);
   vc->mutex = new_ProxyMutex();
   vc->action_ = *action_;
   //set the UnixNetVConnection's handler to UnixNetVConnection::acceptEvent
   SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::acceptEvent);
   //let the event system schedule the vc's handler to run
   eventProcessor.schedule_imm_signal(vc, getEtype());
 } while (loop);

 return 1;
}
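
The Lerror branch above separates transient failures from fatal ones before deciding whether to retry. A hedged sketch of that classification; the exact grouping inside accept_error_seriousness() differs, this only conveys the idea:

#include <cerrno>

// Classify an accept() failure roughly the way accept_error_seriousness() does:
//  0 = transient (retry after a short delay), 1 = resource pressure (back off),
// -1 = fatal (report upward and stop).
int classify_accept_error(int err) {
  switch (err) {
  case EAGAIN:
  case EINTR:
  case ECONNABORTED:
    return 0;        // transient: just try again
  case EMFILE:
  case ENFILE:
  case ENOBUFS:
  case ENOMEM:
    return 1;        // out of fds/memory: delay, let connections drain
  default:
    return -1;       // EBADF, EINVAL, ...: programming or fatal error
  }
}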

main()--->start_HttpProxyServer()--->NetProcessor::main_accept()--->UnixNetProcessor::accept_internal()--->NetAccept::init_accept_loop()--->NetAccept::acceptLoopEvent()--->NetAccept::do_blocking_accept()--->UnixNetVConnection::acceptEvent()

This function's job is to take over a newly accepted connection: it registers read/write events with the NetHandler so that the NetHandler will receive the connection's data and send the response to the request (that is, after all, what the NetHandler is for!), and finally it calls the upper layer's handler (e.g. HttpAccept::mainEvent) to receive the connection.

int
UnixNetVConnection::acceptEvent(int event, Event *e)
{
 thread = e->ethread;
 MUTEX_TRY_LOCK(lock, get_NetHandler(thread)->mutex, e->ethread);
 if (!lock) {
   if (event == EVENT_NONE) {
     thread->schedule_in(this, NET_RETRY_DELAY);
     return EVENT_DONE;
   } else {
     e->schedule_in(NET_RETRY_DELAY);
     return EVENT_CONT;
   }
 }
 if (action_.cancelled) {
   free(thread);
   return EVENT_DONE;
 }
//set the UnixNetVConnection's handler to UnixNetVConnection::mainEvent
 SET_HANDLER((NetVConnHandler) & UnixNetVConnection::mainEvent);
//get the pointer to the NetHandler, introduced earlier
 nh = get_NetHandler(thread);
//get the pointer to the PollDescriptor, introduced earlier
 PollDescriptor *pd = get_PollDescriptor(thread);
//register the epoll read and write events; this connects back to the poll flow described above
 if (ep.start(pd, this, EVENTIO_READ|EVENTIO_WRITE) < 0) {
   Debug("iocore_net", "acceptEvent : failed EventIO::start\n");
   close_UnixNetVConnection(this, e->ethread);
   return EVENT_DONE;
 }
//add the vc to the NetHandler's open-connection queue open_list
 nh->open_list.enqueue(this);
//set the corresponding timeouts, used later to close the connection
 if (inactivity_timeout_in)
   UnixNetVConnection::set_inactivity_timeout(inactivity_timeout_in);
 if (active_timeout_in)
   UnixNetVConnection::set_active_timeout(active_timeout_in);
//call the upper layer's handler (e.g. HttpAccept::mainEvent) to process this connection; how it is handled is left to the HTTP flow analysis
 action_.continuation->handleEvent(NET_EVENT_ACCEPT, this);
 return EVENT_DONE;
}
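
In plain epoll terms, the ep.start() handoff in acceptEvent looks roughly like this: make the new fd non-blocking and register it edge-triggered for both read and write, with the vc pointer as the epoll user data (illustrative, not the EventIO implementation):

#include <fcntl.h>
#include <sys/epoll.h>

// Register a freshly accepted connection for edge-triggered read/write
// readiness, as EventIO::start(pd, this, EVENTIO_READ|EVENTIO_WRITE) does.
int register_connection(int epfd, int conn_fd, void *vc) {
  fcntl(conn_fd, F_SETFL, fcntl(conn_fd, F_GETFL) | O_NONBLOCK);
  epoll_event ev{};
  ev.events   = EPOLLIN | EPOLLOUT | EPOLLET;  // read + write, edge-triggered
  ev.data.ptr = vc;                            // epoll hands the vc back on wakeup
  return epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &ev);
}
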
HttpSM <----------------------HttpAccept
   ^                              ^
   |                              |
   |                              |  
    |     register READ/WRITE events   |
NetHandler <------------------NetAccept
   ^                              ^
   |                              |
   |                              |
   |                              |
   |                              |  
read() write()                 accept()

   In addition, two more NetProcessor functions deserve mention: NetProcessor::connect_s() and NetProcessor::connect_re(). Both are connect interfaces offered to the upper layer; the difference is that one is synchronous and the other asynchronous: connect_s is the synchronous one, connect_re the asynchronous one.
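
The difference shows up clearly in plain sockets code: a blocking connect() returns only when the handshake completes, while a non-blocking connect() returns EINPROGRESS at once and completion is later reported as write readiness. This conveys the idea behind connect_s versus connect_re, not their actual implementation:

#include <netinet/in.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <unistd.h>
#include <cerrno>

// Synchronous flavor (like connect_s): the caller blocks until connected or failed.
int connect_blocking(const sockaddr_in &addr) {
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  if (connect(fd, (const sockaddr *)&addr, sizeof(addr)) < 0) {
    close(fd);
    return -1;
  }
  return fd;
}

// Asynchronous flavor (like connect_re): start the handshake and return at once;
// the caller waits for EPOLLOUT, then checks SO_ERROR to learn the outcome.
int connect_nonblocking(const sockaddr_in &addr) {
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
  if (connect(fd, (const sockaddr *)&addr, sizeof(addr)) < 0 && errno != EINPROGRESS) {
    close(fd);
    return -1;
  }
  return fd;                        // register with epoll for EPOLLOUT to finish
}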
