--------------------- PatchSet 6267 Date: 2007/12/13 18:13:32 Author: rousskov Branch: async-calls Tag: (none) Log: Removed accept loop. We cannot loop because async calls do not get fired during the loop and, hence, cannot register new callbacks for new ready FDs. The loop is implicit now. When the next callback is registered, we check whether the last accept(2) call was successful or OPTIMISTIC_IO is defined and call acceptNext() again if yes. AcceptNext() may schedule another async call (using the being-submitted callback) if the socket was still ready. Since callbacks are fired until there are no callabacks left, we still have an accept loop. Polished AcceptFD class to expose fewer internals and remove a few XXXs. Removed unused nullCallback() methods. Members: src/comm.cc:1.81.4.4->1.81.4.5 Index: squid3/src/comm.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/comm.cc,v retrieving revision 1.81.4.4 retrieving revision 1.81.4.5 diff -u -r1.81.4.4 -r1.81.4.5 --- squid3/src/comm.cc 13 Dec 2007 05:40:30 -0000 1.81.4.4 +++ squid3/src/comm.cc 13 Dec 2007 18:13:32 -0000 1.81.4.5 @@ -1,6 +1,6 @@ /* - * $Id: comm.cc,v 1.81.4.4 2007/12/13 05:40:30 rousskov Exp $ + * $Id: comm.cc,v 1.81.4.5 2007/12/13 18:13:32 rousskov Exp $ * * DEBUG: section 5 Socket Functions * AUTHOR: Harvest Derived @@ -270,18 +270,19 @@ { public: - AcceptFD(int aFd = -1): callback(0), fd(aFd) {} + AcceptFD(int aFd = -1): fd(aFd), theCallback(0), mayAcceptMore(false) {} - void doCallback(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &); - void nullCallback(); + void subscribe(AsyncCall *call); + void acceptNext(); + void notify(int newfd, comm_err_t, int xerrno, const ConnectionDetail &); - void acceptSome(); - - AsyncCall *callback; int fd; private: bool acceptOne(); + + AsyncCall *theCallback; + bool mayAcceptMore; }; typedef enum { @@ -1247,21 +1248,6 @@ } void -CommRead::nullCallback() -{ - delete callback; - callback = NULL; -} - -// TODO: remove duplicate method, merge with CommRead::nullCallback -void -AcceptFD::nullCallback() -{ - delete callback; - callback = NULL; -} - -void CommRead::doCallback(comm_err_t errcode, int xerrno) { if (callback) { @@ -1277,18 +1263,18 @@ } void -AcceptFD::doCallback(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails) +AcceptFD::notify(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails) { - if (callback) { + if (theCallback) { typedef CommAcceptCbParams Params; - Params ¶ms = GetCommParams(callback); + Params ¶ms = GetCommParams(theCallback); params.fd = fd; params.nfd = newfd; params.details = connDetails; params.flag = errcode; params.xerrno = xerrno; - ScheduleCallHere(callback); - callback = NULL; + ScheduleCallHere(theCallback); + theCallback = NULL; } } @@ -1351,7 +1337,7 @@ } /* Do callbacks for read/accept routines, if any */ - fdc_table[fd].doCallback(-1, COMM_ERR_CLOSING, 0, ConnectionDetail()); + fdc_table[fd].notify(-1, COMM_ERR_CLOSING, 0, ConnectionDetail()); commCallCloseHandlers(fd); @@ -1889,11 +1875,40 @@ return sock; } +// AcceptFD::callback() wrapper +void +comm_accept(int fd, IOACB *handler, void *handler_data) { + debugs(5, 5, "comm_accept: FD " << fd << " handler: " << (void*)handler); + assert(isOpen(fd)); + + AsyncCall *call = commCbCall(5,5, "SomeCommAcceptHandler", + CommAcceptCbPtrFun(handler, handler_data)); + fdc_table[fd].subscribe(call); +} + +// Called when somebody wants to be notified when our socket accepts new +// connection. We do not probe the FD until there is such interest. +void +AcceptFD::subscribe(AsyncCall *call) { + /* make sure we're not pending! */ + assert(!theCallback); + theCallback = call; + +#if OPTIMISTIC_IO + mayAcceptMore = true; // even if we failed to accept last time +#endif + + if (mayAcceptMore) + acceptNext(); + else + commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0); +} + bool AcceptFD::acceptOne() { // If there is no callback and we accept, we will leak the accepted FD. // When we are running out of FDs, there is often no callback. - if (!callback) { + if (!theCallback) { debugs(5, 5, "AcceptFD::acceptOne orphaned: FD " << fd); // XXX: can we remove this and similar "just in case" calls and // either listen always or listen only when there is a callback? @@ -1915,43 +1930,31 @@ /* Check for errors */ if (newfd < 0) { + assert(theCallback); + if (newfd == COMM_NOMESSAGE) { /* register interest again */ - debugs(5, 5, "AcceptFD::acceptOne eof: FD " << fd << " handler: " << (void*)callback); + debugs(5, 5, "AcceptFD::acceptOne eof: FD " << fd << + " handler: " << *theCallback); commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0); return false; } - /* A non-recoverable error - register an error callback */ - // TODO: replace with doCallback(...)? - typedef CommAcceptCbParams Params; - Params ¶ms = GetCommParams(callback); - params.fd = fd; - params.details = connDetails; - params.flag = COMM_ERROR; - params.xerrno = errno; - ScheduleCallHere(callback); - - callback = NULL; + // A non-recoverable error; notify the caller */ + notify(-1, COMM_ERROR, errno, connDetails); return false; } - debugs(5, 5, "AcceptFD::acceptOne accepted: FD " << fd << " handler: " << (void*)callback << " newfd: " << newfd); - - assert(callback); - doCallback(newfd, COMM_OK, 0, connDetails); - -// XXX: the code below assumes that the accept callback is synchronous -// because doCallback always sets callback to NULL - /* If we weren't re-registed, don't bother trying again! */ - - return callback != NULL; + assert(theCallback); + debugs(5, 5, "AcceptFD::acceptOne accepted: FD " << fd << + " newfd: " << newfd << " handler: " << *theCallback); + notify(newfd, COMM_OK, 0, connDetails); + return true; } void -AcceptFD::acceptSome() { - const int limit = 10; - for (int count = 0; count < limit && acceptOne(); ++count); +AcceptFD::acceptNext() { + mayAcceptMore = acceptOne(); } /* @@ -1961,35 +1964,7 @@ static void comm_accept_try(int fd, void *) { assert(isOpen(fd)); - fdc_table[fd].acceptSome(); -} - -/* - * Notes: - * + the current interface will queue _one_ accept per io loop. - * this isn't very optimal and should be revisited at a later date. - */ -void -comm_accept(int fd, IOACB *handler, void *handler_data) { - debugs(5, 5, "comm_accept: FD " << fd << " handler: " << (void*)handler); - requireOpenAndActive(fd); - - /* make sure we're not pending! */ - assert(fdc_table[fd].callback == NULL); - - /* Record our details */ - AsyncCall *call = commCbCall(5,5, "SomeCommAcceptHandler", - CommAcceptCbPtrFun(handler, handler_data)); - fdc_table[fd].callback = call; - - /* Kick off the accept */ -#if OPTIMISTIC_IO - - comm_accept_try(fd, NULL); -#else - - commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0); -#endif + fdc_table[fd].acceptNext(); } void CommIO::Initialise() {