--------------------- PatchSet 6209 Date: 2007/11/29 21:00:27 Author: rousskov Branch: async-calls Tag: (none) Log: Added initial implelentation of AsyncCall-based wrappers for comm callbacks. The comm layer no longer calls callbacks from the select loop. Instead, the select loop schedules async calls. Scheduled calls are then called from the main loop (like all other async calls), after the select loop completes. There are a few corner cases and unknowns marked by XXXs that will need to be revisited and cleaned up. A few XXXs mark outdated assumptions that callback calls are synchronous. I suspect that most of the internal types holding callback information can also be eliminated if they start using generic AsyncCall pointers. The intermediate collection of ready-to-fire callbacks can probably be removed as well: It seems to duplicate part of the async call scheduling functionality. Currently, only comm callbacks based on good old function pointers are supported. Support for calling job methods will be added next. Added comm_read() that accepts AsyncCall. It is used by delayed reads now. Eventually, all CNCB-, IOCB-, and IOACB-accepting comm_ functions will be replaced by functions accepting AsyncCalls. Members: src/CommCalls.cc:1.1->1.1.2.1 src/CommCalls.h:1.1->1.1.2.1 src/CommRead.h:1.8->1.8.20.1 src/Makefile.am:1.131->1.131.4.1 src/comm.cc:1.81->1.81.4.1 --- /dev/null Fri Nov 30 01:21:00 2007 +++ squid3/src/CommCalls.cc Fri Nov 30 01:21:00 2007 @@ -0,0 +1,118 @@ +#include "squid.h" +#include "CommCalls.h" + +/* CommCommonCbParams */ + +#if 0 +CommCommonCbParams::CommCommonCbParams(int aFd, comm_err_t aFlag, + int anXerrno, void *aData): data(cbdataReference(aData)), fd(aFd), + xerrno(anXerrno), flag(aFlag) +{ +} +#endif + +CommCommonCbParams::CommCommonCbParams(void *aData): + data(cbdataReference(aData)), fd(-1), xerrno(0), flag(COMM_OK) +{ +} + +CommCommonCbParams::CommCommonCbParams(const CommCommonCbParams &p): + data(cbdataReference(p.data)), fd(p.fd), xerrno(p.xerrno), flag(p.flag) +{ +} + +CommCommonCbParams::~CommCommonCbParams() +{ + cbdataReferenceDone(data); +} + + +/* CommAcceptCbParams */ + +#if 0 +CommAcceptCbParams::CommAcceptCbParams(int aFd, int aNewFd, + const ConnectionDetail &aDetails, comm_err_t aFlag, int anXerrno, + void *aData): CommCommonCbParams(aFd, aFlag, anXerrno, aData), + details(aDetails), nfd(aNewFd) +{ +} +#endif + +CommAcceptCbParams::CommAcceptCbParams(void *aData): CommCommonCbParams(aData), + nfd(-1) +{ +} + + +/* CommConnectCbParams */ + +#if 0 +CommConnectCbParams::CommConnectCbParams(int aFd, comm_err_t aFlag, + int anXerrno, void *aData): CommCommonCbParams(aFd, aFlag, anXerrno, aData) +{ +} +#endif + +CommConnectCbParams::CommConnectCbParams(void *aData): + CommCommonCbParams(aData) +{ +} + + +/* CommIoCbParams */ + +#if 0 +CommIoCbParams::CommIoCbParams(int aFd, char *aBuf, size_t aSize, + comm_err_t aFlag, int anXerrno, void *aData): + CommCommonCbParams(aFd, aFlag, anXerrno, aData), buf(aBuf), size(aSize) +{ +} +#endif + +CommIoCbParams::CommIoCbParams(void *aData): CommCommonCbParams(aData) +{ +} + + +/* CommAcceptCbPtrFun */ + +CommAcceptCbPtrFun::CommAcceptCbPtrFun(IOACB *aHandler, + const CommAcceptCbParams &aParams): + handler(aHandler), params(aParams) +{ +} + +void +CommAcceptCbPtrFun::operator ()() +{ + handler(params.fd, params.nfd, ¶ms.details, params.flag, params.xerrno, params.data); +} + + +/* CommConnectCbPtrFun */ + +CommConnectCbPtrFun::CommConnectCbPtrFun(CNCB *aHandler, + const CommConnectCbParams &aParams): + handler(aHandler), params(aParams) +{ +} + +void +CommConnectCbPtrFun::operator ()() +{ + handler(params.fd, params.flag, params.xerrno, params.data); +} + + +/* CommIoCbPtrFun */ + +CommIoCbPtrFun::CommIoCbPtrFun(IOCB *aHandler, const CommIoCbParams &aParams): + handler(aHandler), params(aParams) +{ +} + +void +CommIoCbPtrFun::operator ()() +{ + handler(params.fd, params.buf, params.size, params.flag, params.xerrno, params.data); +} --- /dev/null Fri Nov 30 01:21:00 2007 +++ squid3/src/CommCalls.h Fri Nov 30 01:21:00 2007 @@ -0,0 +1,256 @@ + +/* + * $Id: CommCalls.h,v 1.1.2.1 2007/11/29 21:00:30 rousskov Exp $ + */ + +#ifndef SQUID_COMMCALLS_H +#define SQUID_COMMCALLS_H + +#include "comm.h" +#include "ConnectionDetail.h" +#include "AsyncCall.h" + +/* CommCalls implement AsyncCall interface for comm_* callbacks. + * The classes cover two cases: + * - A C-style call using a function pointer (depricated); + * - A C++-style call to an AsyncJob child. + * There are three comm_* callback kinds (connect, accept, and I/O), + * resulting in six classes. + */ + +// Maintains parameters common to all comm callbacks +class CommCommonCbParams { +public: +// CommCommonCbParams(int aFd, comm_err_t aFlag, int anXerrno, void *aData); + CommCommonCbParams(void *aData); + CommCommonCbParams(const CommCommonCbParams &p); + ~CommCommonCbParams(); + +public: + void *data; + int fd; + int xerrno; + comm_err_t flag; + +private: + // not needed and not yet implemented + CommCommonCbParams &operator =(const CommCommonCbParams &p); +}; + +class CommAcceptCbParams: public CommCommonCbParams { +public: +// CommAcceptCbParams(int aFd, int aNewFd, const ConnectionDetail &detail, +// comm_err_t aFlag, int anXerrno, void *aData); + CommAcceptCbParams(void *aData); + +public: + ConnectionDetail details; + int nfd; // TODO: rename to fdNew or somesuch +}; + +class CommConnectCbParams: public CommCommonCbParams { +public: + //CommConnectCbParams(int aFd, comm_err_t aFlag, int anXerrno, void *aData); + CommConnectCbParams(void *aData); +}; + +class CommIoCbParams: public CommCommonCbParams { +public: + //CommIoCbParams(int aFd, char *aBuf, size_t aSize, comm_err_t aFlag, + // int anXerrno, void *aData); + CommIoCbParams(void *aData); + +public: + char *buf; + size_t size; +}; + + +// IOACB wrappers + +class CommAcceptCbPtrFun { +public: + typedef CommAcceptCbParams Params; + + CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams); + void operator ()(); // { handler(params.fd, ...); } + +public: + IOACB *handler; + Params params; +}; + +template +class CommAcceptCbMemFunT { +public: + typedef void (C::*HandlerMethod)(int fd, comm_err_t flag, int xerrno, + void *data); + typedef CommAcceptCbParams Params; + + void operator ()(); // { (handlerObj->*handlerMeth)(params.fd, ...); } + +public: + C *hanlderObj; + HandlerMethod handlerMeth; + Params params; +}; + + +// CNCB wrappers + +class CommConnectCbPtrFun { +public: + typedef CommConnectCbParams Params; + + CommConnectCbPtrFun(CNCB *aHandler, const Params &aParams); + void operator ()(); // { handler(params.fd, ...); } + +public: + CNCB *handler; + Params params; +}; + +template +class CommConnectCbMemFunT { +public: + typedef void (C::*HandlerMethod)(int fd, int nfd, + ConnectionDetail *details, comm_err_t flag, int xerrno, void *data); + typedef CommConnectCbParams Params; + + void operator ()(); // { (handlerObj->*handlerMeth)(params.fd, ...); } + +public: + C *hanlderObj; + HandlerMethod handlerMeth; + Params params; +}; + + +// IOCB wrappers + +class CommIoCbPtrFun { +public: + typedef CommIoCbParams Params; + + CommIoCbPtrFun(IOCB *aHandler, const Params &aParams); + void operator ()(); // { handler(params.fd, ...); } + +public: + IOCB *handler; + Params params; +}; + +template +class CommIoCbMemFunT { +public: + typedef void (C::*HandlerMethod)(int fd, + char *, size_t size, comm_err_t flag, int xerrno, void *data); + typedef CommIoCbParams Params; + + void operator ()(); // { (handlerObj->*handlerMeth)(params.fd, ...); } + +public: + C *hanlderObj; + HandlerMethod handlerMeth; + Params params; +}; + + + +#include "AsyncCall.h" + +// AsyncCall that exposes the right comm parameters. +// The kind of call dialer is unknown at this point. +template +class CommCbCallT: public AsyncCall { +public: + typedef Params_ Params; + inline CommCbCallT(int debugSection, int debugLevel, const char *callName); + + virtual Params ¶ms() = 0; +}; + +// AsyncCall to comm handlers implemented as global functions. +// The dialer is one of the Comm*CbMemFunT above +template +class CommCbFunPtrCallT: public CommCbCallT { +public: + typedef typename Dialer::Params Params; + + inline CommCbFunPtrCallT(int debugSection, int debugLevel, + const char *callName, const Dialer &aDialer); + + inline virtual bool fire(); + inline virtual void handleException(const TextException &e); + inline virtual void end(); + + virtual Params ¶ms() { return dialer.params; } + + Dialer dialer; +}; + +// Conveninece wrapper: It is often easier to call a templated function than +// to create a templated class. +template +inline +CommCbFunPtrCallT *commCbCall(int debugSection, int debugLevel, + const char *callName, const Dialer &dialer) +{ + return new CommCbFunPtrCallT(debugSection, debugLevel, callName, + dialer); +} + + +#if 0 +4 typedef void CNCB(int fd, comm_err_t status, int xerrno, void *data); +6 typedef void IOCB(int fd, char *, size_t size, comm_err_t flag, int xerrno, void *data); +6 typedef void IOACB(int fd, int nfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data); +#endif + +template +CommCbCallT::CommCbCallT(int debugSection, int debugLevel, + const char *callName): + AsyncCall(debugSection, debugLevel, callName) +{ +} + +template +CommCbFunPtrCallT::CommCbFunPtrCallT(int debugSection, int debugLevel, + const char *callName, const Dialer &aDialer): + CommCbCallT(debugSection, debugLevel, + callName), + dialer(aDialer) +{ +} + +template +bool +CommCbFunPtrCallT::fire() +{ + const CommCommonCbParams &p = dialer.params; + if (!cbdataReferenceValid(p.data)) + return false; + + debugs(this->debugSection, this->debugLevel, + this->name << "(FD " << p.fd << ") {"); + dialer(); + return true; +} + +template +void +CommCbFunPtrCallT::handleException(const TextException &e) +{ +assert(!"XXX: implement me"); +} + +template +void +CommCbFunPtrCallT::end() +{ + const CommCommonCbParams &p = dialer.params; + debugs(this->debugSection, this->debugLevel, + "} " << this->name << "(FD " << p.fd << ")"); +} + +#endif /* SQUID_COMMCALLS_H */ Index: squid3/src/CommRead.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/CommRead.h,v retrieving revision 1.8 retrieving revision 1.8.20.1 diff -u -r1.8 -r1.8.20.1 --- squid3/src/CommRead.h 25 Apr 2007 11:51:02 -0000 1.8 +++ squid3/src/CommRead.h 29 Nov 2007 21:00:30 -0000 1.8.20.1 @@ -1,6 +1,6 @@ /* - * $Id: CommRead.h,v 1.8 2007/04/25 11:51:02 squidadm Exp $ + * $Id: CommRead.h,v 1.8.20.1 2007/11/29 21:00:30 rousskov Exp $ * * DEBUG: section 5 Comms * AUTHOR: Robert Collins @@ -42,92 +42,15 @@ #include "squid.h" #include "comm.h" +#include "CommCalls.h" #include "List.h" -template - -class CallBack -{ - -public: - CallBack() : handler(NULL), data(NULL){} - - CallBack(C *aHandler, void *someData) : handler(aHandler), data (NULL) - { - if (someData) - data = cbdataReference(someData); - } - - CallBack(CallBack const &old) : handler(old.handler) - { - if (old.data) - data = cbdataReference (old.data); - else - data = NULL; - } - - ~CallBack() - { - replaceData (NULL); - } - - CallBack &operator = (CallBack const & rhs) - { - handler = rhs.handler; - - replaceData (rhs.data); - - return *this; - } - - bool dataValid() - { - return cbdataReferenceValid(data); - } - - bool operator == (CallBack const &rhs) { return handler==rhs.handler && data==rhs.data;} - -#if 0 - // twould be nice - RBC 20030307 - C callback; -#endif - - C *handler; - void *data; - -private: - void replaceData(void *someData) - { - void *temp = NULL; - - if (someData) - temp = cbdataReference(someData); - - if (data) - cbdataReferenceDone(data); - - data = temp; - } -}; - -#if 0 -// twould be nice - RBC 20030307 -void -CallBack::callback(int fd, char *buf, size_t size , comm_err_t errcode, int xerrno, void *tempData) -{ - assert (tempData == data); - handler (fd, buf, size , errcode, xerrno, data); - *this = CallBack(); -} - -#endif - class CommRead { public: CommRead (); - CommRead (int fd, char *buf, int len, IOCB *handler, void *data); + CommRead (int fd, char *buf, int len, AsyncCall *callback); void queueCallback(size_t retval, comm_err_t errcode, int xerrno); bool hasCallback() const; void hasCallbackInvariant() const; @@ -140,7 +63,7 @@ int fd; char *buf; int len; - CallBack callback; + AsyncCall *callback; static void ReadTry(int fd, void *data); }; Index: squid3/src/Makefile.am =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Makefile.am,v retrieving revision 1.131 retrieving revision 1.131.4.1 diff -u -r1.131 -r1.131.4.1 --- squid3/src/Makefile.am 19 Sep 2007 09:50:44 -0000 1.131 +++ squid3/src/Makefile.am 29 Nov 2007 21:00:27 -0000 1.131.4.1 @@ -1,7 +1,7 @@ # # Makefile for the Squid Object Cache server # -# $Id: Makefile.am,v 1.131 2007/09/19 09:50:44 squidadm Exp $ +# $Id: Makefile.am,v 1.131.4.1 2007/11/29 21:00:27 rousskov Exp $ # # Uncomment and customize the following to suit your needs: # @@ -389,6 +389,8 @@ libsquid_la_SOURCES = \ comm.cc \ comm.h \ + CommCalls.cc \ + CommCalls.h \ IPInterception.cc \ IPInterception.h Index: squid3/src/comm.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/comm.cc,v retrieving revision 1.81 retrieving revision 1.81.4.1 diff -u -r1.81 -r1.81.4.1 --- squid3/src/comm.cc 25 Sep 2007 13:52:18 -0000 1.81 +++ squid3/src/comm.cc 29 Nov 2007 21:00:27 -0000 1.81.4.1 @@ -1,6 +1,6 @@ /* - * $Id: comm.cc,v 1.81 2007/09/25 13:52:18 squidadm Exp $ + * $Id: comm.cc,v 1.81.4.1 2007/11/29 21:00:27 rousskov Exp $ * * DEBUG: section 5 Socket Functions * AUTHOR: Harvest Derived @@ -46,6 +46,7 @@ #include "MemBuf.h" #include "pconn.h" #include "SquidTime.h" +#include "CommCalls.h" #if defined(_SQUID_CYGWIN_) #include @@ -67,8 +68,7 @@ struct _comm_io_callback { iocb_type type; int fd; - IOCB *callback; - void *callback_data; + AsyncCall *callback; char *buf; FREE *freefunc; int size; @@ -111,13 +111,13 @@ * @param size buffer size */ void -commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb, IOCB *cb, void *cbdata, char *buf, FREE *freefunc, int size) +commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb, + AsyncCall *cb, char *buf, FREE *freefunc, int size) { assert(ccb->active == false); assert(ccb->type == type); ccb->fd = fd; - ccb->callback = cb; - ccb->callback_data = cbdataReference(cbdata); + ccb->callback = cb; // XXX: or clone? Used to cbdataReference(cbdata) here! ccb->buf = buf; ccb->freefunc = freefunc; ccb->size = size; @@ -165,14 +165,12 @@ if (ccb->completed == true) { dlinkDelete(&ccb->node, &commfd_completed_events); } - if (ccb->callback_data) - cbdataReferenceDone(ccb->callback_data); ccb->xerrno = 0; ccb->active = false; ccb->completed = false; + delete ccb->callback; ccb->callback = NULL; - ccb->callback_data = NULL; } /* @@ -184,7 +182,6 @@ commio_call_callback(comm_io_callback_t *ccb) { comm_io_callback_t cb = *ccb; - void *cbdata; assert(cb.active == true); assert(cb.completed == true); debugs(5, 3, "commio_call_callback: called for " << ccb->fd); @@ -195,17 +192,25 @@ ccb->xerrno = 0; ccb->active = false; ccb->completed = false; - ccb->callback = NULL; - ccb->callback_data = NULL; + ccb->callback = NULL; // cb has it /* free data */ if (cb.freefunc) { cb.freefunc(cb.buf); cb.buf = NULL; } - if (cb.callback && cbdataReferenceValidDone(cb.callback_data, &cbdata)) { - /* XXX truely ugly for now! */ - cb.callback(cb.fd, cb.buf, cb.offset, cb.errcode, cb.xerrno, cbdata); + if (cb.callback) { + typedef CommCbCallT Call; + Call *call = dynamic_cast(cb.callback); + assert(call); + + Call::Params ¶ms = call->params(); + params.fd = cb.fd; + params.buf = cb.buf; + params.size = cb.offset; + params.flag = cb.errcode; + params.xerrno = cb.xerrno; + ScheduleCallHere(call); } } @@ -234,7 +239,7 @@ u_short port; struct sockaddr_in S; - CallBack callback; + AsyncCall *callback; struct IN_ADDR in_addr; int fd; @@ -268,7 +273,7 @@ { public: - AcceptFD() : count(0), finished_(false){} + AcceptFD() : callback(0), count(0), finished_(false){} void doCallback(int fd, int newfd, comm_err_t errcode, int xerrno, ConnectionDetail *); void nullCallback(); @@ -277,7 +282,7 @@ size_t acceptCount() const { return count;} bool finishedAccepting() const; - CallBack callback; + AsyncCall *callback; bool finished() const; void finished(bool); @@ -380,31 +385,17 @@ public: MEMPROXY_CLASS(CommAcceptCallbackData); - CommAcceptCallbackData(int const anFd, CallBack, comm_err_t, int, int, ConnectionDetail const &); + CommAcceptCallbackData(int const anFd, AsyncCall*, comm_err_t, int, int, ConnectionDetail const &); virtual void callCallback(); private: - CallBack callback; + AsyncCall *callback; int newfd; ConnectionDetail details; }; MEMPROXY_CLASS_INLINE(CommAcceptCallbackData) -class CommFillCallbackData : public CommCallbackData -{ - -public: - MEMPROXY_CLASS(CommFillCallbackData); - CommFillCallbackData(int const anFd, CallBack aCallback, comm_err_t, int); - virtual void callCallback(); - -private: - CallBack callback; -}; - -MEMPROXY_CLASS_INLINE(CommFillCallbackData) - struct _fd_debug_t { char const *close_file; @@ -427,7 +418,7 @@ registerSelf(); } -CommAcceptCallbackData::CommAcceptCallbackData(int const anFd, CallBack aCallback, comm_err_t anErrcode, int anErrno, int aNewFD, ConnectionDetail const &newDetails) :CommCallbackData(CommCommonCallback(anFd, anErrcode, anErrno)), callback (aCallback), newfd(aNewFD), details(newDetails) +CommAcceptCallbackData::CommAcceptCallbackData(int const anFd, AsyncCall *aCallback, comm_err_t anErrcode, int anErrno, int aNewFD, ConnectionDetail const &newDetails) :CommCallbackData(CommCommonCallback(anFd, anErrcode, anErrno)), callback (aCallback), newfd(aNewFD), details(newDetails) {} void @@ -458,7 +449,19 @@ CommAcceptCallbackData::callCallback() { PROF_start(CommAcceptCallbackData_callCallback); - callback.handler(result.fd, newfd, &details, result.errcode, result.xerrno, callback.data); + + typedef CommCbCallT Call; + Call *call = dynamic_cast(callback); + assert(call); + + Call::Params ¶ms = call->params(); + params.fd = result.fd; + params.nfd = newfd; + params.details = details; + params.flag = result.errcode; + params.xerrno = result.xerrno; + ScheduleCallHere(call); + PROF_stop(CommAcceptCallbackData_callCallback); } @@ -569,6 +572,14 @@ void comm_read(int fd, char *buf, int size, IOCB *handler, void *handler_data) { + comm_read(fd, buf, size, + commCbCall(5,4, "SomeCommIoHandler", + CommIoCbPtrFun(handler, handler_data))); +} + +void +comm_read(int fd, char *buf, int size, AsyncCall *callback) +{ /* Make sure we're not reading anything and we're not closing */ assert(fdc_table[fd].active == 1); assert(!fd_table[fd].flags.closing); @@ -577,7 +588,8 @@ /* Queue the read */ /* XXX ugly */ - commio_set_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd), handler, handler_data, (char *)buf, NULL, size); + commio_set_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd), + callback, (char *)buf, NULL, size); commSetSelect(fd, COMM_SELECT_READ, commHandleRead, COMMIO_FD_READCB(fd), 0); } @@ -676,6 +688,12 @@ * Cancel a pending read. Assert that we have the right parameters, * and that there are no pending read events! * + * XXX: We do not assert that there are no pending read events and + * with async calls it becomes even more difficult. + * The whole interface should be reworked to do callback->cancel() + * instead of searching for places where the callback may be stored and + * updating the state of those places. + * * AHC Don't call the comm handlers? */ void @@ -683,12 +701,17 @@ { requireOpenAndActive(fd); + comm_io_callback_t *cb = COMMIO_FD_READCB(fd); + typedef CommCbFunPtrCallT Call; + Call *call = dynamic_cast(cb->callback); + assert(call); + /* Ok, we can be reasonably sure we won't lose any data here! */ - assert(COMMIO_FD_READCB(fd)->callback == callback); - assert(COMMIO_FD_READCB(fd)->callback_data == data); + assert(call->dialer.handler == callback); + assert(call->params().data == data); /* Delete the callback */ - commio_cancel_callback(fd, COMMIO_FD_READCB(fd)); + commio_cancel_callback(fd, cb); /* And the IO event */ commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); @@ -986,7 +1009,10 @@ cs->fd = fd; cs->host = xstrdup(host); cs->port = port; - cs->callback = CallBack(callback, data); + + cs->callback = commCbCall(5,5, "CommConnectReadyXXX", + CommConnectCbPtrFun(callback, data)); + comm_add_close_handler(fd, commConnectFree, cs); ipcache_nbgethostbyname(host, commConnectDnsHandle, cs); } @@ -1025,15 +1051,21 @@ void ConnectStateData::callCallback(comm_err_t status, int xerrno) { - debugs(5, 3, "commConnectCallback: FD " << fd << ", data " << callback.data); + debugs(5, 3, "commConnectCallback: FD " << fd); + + typedef CommCbCallT Call; + Call *call = dynamic_cast(callback); + assert(call); comm_remove_close_handler(fd, commConnectFree, this); - CallBack aCallback = callback; - callback = CallBack(); + callback = NULL; commSetTimeout(fd, -1, NULL, NULL); - if (aCallback.dataValid()) - aCallback.handler(fd, status, xerrno, aCallback.data); + Call::Params ¶ms = call->params(); + params.fd = fd; + params.flag = status; + params.xerrno = xerrno; + ScheduleCallHere(call); commConnectFree(fd, this); } @@ -1043,7 +1075,8 @@ { ConnectStateData *cs = (ConnectStateData *)data; debugs(5, 3, "commConnectFree: FD " << fd); - cs->callback = CallBack(); + delete cs->callback; + cs->callback = NULL; safe_free(cs->host); delete cs; } @@ -1072,8 +1105,10 @@ int ConnectStateData::commResetFD() { - if (!cbdataReferenceValid(callback.data)) - return 0; +// XXX: do we have to check this? +// +// if (!cbdataReferenceValid(callback.data)) +// return 0; statCounter.syscalls.sock.sockets++; @@ -1453,31 +1488,55 @@ void CommRead::nullCallback() { - callback = CallBack(); + delete callback; + callback = NULL; } +// TODO: remove duplicate method, merge with CommRead::nullCallback void AcceptFD::nullCallback() { - callback = CallBack(); + delete callback; + callback = NULL; } void CommRead::doCallback(comm_err_t errcode, int xerrno) { - if (callback.handler) - callback.handler(fd, buf, 0, errcode, xerrno, callback.data); + if (callback) { + typedef CommCbCallT Call; + Call *call = dynamic_cast(callback); + assert(call); + + Call::Params ¶ms = call->params(); + params.fd = fd; + params.size = 0; + params.flag = errcode; + params.xerrno = xerrno; + ScheduleCallHere(call); - nullCallback(); + callback = NULL; + } } void AcceptFD::doCallback(int fd, int newfd, comm_err_t errcode, int xerrno, ConnectionDetail *connDetails) { - if (callback.handler) { - CallBack aCallback = callback; - nullCallback(); - aCallback.handler(fd, newfd, connDetails, errcode, xerrno, aCallback.data); + if (callback) { + typedef CommCbCallT Call; + Call *call = dynamic_cast(callback); + assert(call); + + Call::Params ¶ms = call->params(); + params.fd = fd; + params.nfd = newfd; + if (connDetails) + params.details = *connDetails; + params.flag = errcode; + params.xerrno = xerrno; + ScheduleCallHere(call); + + callback = NULL; } } @@ -1935,7 +1994,10 @@ fatalf ("comm_write: fd %d: pending callback!\n", fd); } /* XXX ugly */ - commio_set_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd), handler, handler_data, (char *)buf, free_func, size); + commio_set_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd), + commCbCall(5,5, "SomeWriteHander", + CommIoCbPtrFun(handler, handler_data)), + (char *)buf, free_func, size); commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, COMMIO_FD_WRITECB(fd), 0); } @@ -2106,7 +2168,7 @@ fdc_t::acceptOne(int fd) { // If there is no callback and we accept, we will leak the accepted FD. // When we are running out of FDs, there is often no callback. - if (!accept.accept.callback.handler) { + if (!accept.accept.callback) { debugs(5, 5, "fdc_t::acceptOne orphaned: FD " << fd); // XXX: can we remove this and similar "just in case" calls and // either listen always or listen only when there is a callback? @@ -2130,7 +2192,7 @@ if (newfd < 0) { if (newfd == COMM_NOMESSAGE) { /* register interest again */ - debugs(5, 5, "fdc_t::acceptOne eof: FD " << fd << " handler: " << (void*)accept.accept.callback.handler); + debugs(5, 5, "fdc_t::acceptOne eof: FD " << fd << " handler: " << (void*)accept.accept.callback); commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0); accept.accept.finished(true); return; @@ -2138,22 +2200,22 @@ /* A non-recoverable error - register an error callback */ new CommAcceptCallbackData(fd, accept.accept.callback, COMM_ERROR, errno, -1, accept.connDetails); - - accept.accept.callback = CallBack(); + accept.accept.callback = NULL; // CommAcceptCallbackData got ours accept.accept.finished(true); return; } - debugs(5, 5, "fdc_t::acceptOne accepted: FD " << fd << " handler: " << (void*)accept.accept.callback.handler << " newfd: " << newfd); + debugs(5, 5, "fdc_t::acceptOne accepted: FD " << fd << " handler: " << (void*)accept.accept.callback << " newfd: " << newfd); - assert(accept.accept.callback.handler); + assert(accept.accept.callback); accept.accept.doCallback(fd, newfd, COMM_OK, 0, &accept.connDetails); +// XXX: the code below assumes that the accept callback is synchronous /* If we weren't re-registed, don't bother trying again! */ - if (accept.accept.callback.handler == NULL) + if (!accept.accept.callback) accept.accept.finished(true); } @@ -2197,10 +2259,12 @@ requireOpenAndActive(fd); /* make sure we're not pending! */ - assert(fdc_table[fd].accept.accept.callback.handler == NULL); + assert(fdc_table[fd].accept.accept.callback == NULL); /* Record our details */ - fdc_table[fd].accept.accept.callback = CallBack (handler, handler_data); + AsyncCall *call = commCbCall(5,5, "CommAcceptReadyXXX", + CommAcceptCbPtrFun(handler, handler_data)); + fdc_table[fd].accept.accept.callback = call; /* Kick off the accept */ #if OPTIMISTIC_IO @@ -2416,10 +2480,10 @@ */ } -CommRead::CommRead() : fd(-1), buf(NULL), len(0) {} +CommRead::CommRead() : fd(-1), buf(NULL), len(0), callback(NULL) {} -CommRead::CommRead(int fd_, char *buf_, int len_, IOCB *handler_, void *data_) - : fd(fd_), buf(buf_), len(len_), callback(handler_, data_) {} +CommRead::CommRead(int fd_, char *buf_, int len_, AsyncCall *callback_) + : fd(fd_), buf(buf_), len(len_), callback(callback_) {} DeferredRead::DeferredRead () : theReader(NULL), theContext(NULL), theRead(), cancelled(false) {}