--------------------- PatchSet 1542 Date: 2005/08/20 06:15:05 Author: rousskov Branch: squid3-icap Tag: (none) Log: - Guarded async note*() calls to catch exceptions, provide a single point for "are we done yet?" analysis, and provide call enter/exit information for debugging - Removed terminating new lines from debugging messages - Polished notification (notify*), but it should be simplified further. Members: src/ICAPXaction.cc:1.1.2.1->1.1.2.2 src/ICAPXaction.h:1.1.2.1->1.1.2.2 Index: squid3/src/ICAPXaction.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPXaction.cc,v retrieving revision 1.1.2.1 retrieving revision 1.1.2.2 diff -u -r1.1.2.1 -r1.1.2.2 --- squid3/src/ICAPXaction.cc 19 Aug 2005 20:11:55 -0000 1.1.2.1 +++ squid3/src/ICAPXaction.cc 20 Aug 2005 06:15:05 -0000 1.1.2.2 @@ -61,14 +61,24 @@ // call guards for all "asynchronous" note*() methods that are being called // from message pipes or the comm module + +// asynchronous call entry: +// - remember default notify address (currently hard-coded to notifyAll); +// - open the try clause; +// - call callStart(). #define ICAPXaction_Enter(method) \ + const Notify defaultNotify = notifyAll; \ try { \ callStart(#method); +// asynchronous call exit: +// - close the try clause; +// - catch exceptions; +// - let callEnd() handle transaction termination conditions #define ICAPXaction_Exit(method) \ } \ catch (const TextException &e) { \ - callException(#method, e); \ + callException(#method, e, defaultNotify); \ } \ callEnd(#method); @@ -78,7 +88,7 @@ } ICAPXaction::ICAPXaction(): self(NULL), virgin(NULL), adapted(NULL), - service(0), connection(-1), notify(notifyUnknown) { + service(NULL), connection(-1), notify(notifyUnknown) { } ICAPXaction::~ICAPXaction() { @@ -95,11 +105,11 @@ adapted = anAdapted; // receiving end - virgin->sink = this; + virgin->sink = this; // should be 'self' and refcounted // virgin pipe data is initiated by the source // sending end - adapted->sink = this; + adapted->source = this; // should be 'self' and refcounted adapted->data = new MsgPipeData; adapted->data->body = new MemBuf; // XXX: make body a non-pointer? memBufInit(adapted->data->body, ICAPMsgPipeBufSize, ICAPMsgPipeBufSize); @@ -119,10 +129,17 @@ // HTTP side starts sending virgin data void ICAPXaction::noteSourceStart(MsgPipe *p) { + ICAPXaction_Enter(noteSourceStart); + pickService(); openConnection(); // put nothing here as openConnection calls commConnectStart - // and that may call us back without waiting for next select loop + // and that may call us back without waiting for the next select loop + + // XXX: but this has to be here to catch other errors. Thus, if + // commConnectStart fails (see the comment above), we may get here + // _after_ the object got destroyed. Somebody please fix commConnectStart! + ICAPXaction_Exit(noteSourceStart); } // TODO: obey service-specific, OPTIONS-reported connection limit @@ -157,29 +174,30 @@ // connection with the ICAP service established void ICAPXaction::noteCommConnected(comm_err_t status) { - if (status != COMM_OK) { - mustStop(notifyHttp); - return; - } + ICAPXaction_Enter(noteCommConnected); - startReading(); + Must(status == COMM_OK); + + startReading(); // wait for early errors from the ICAP server makeRequestHeaders(); // write headers only; comm module will free the requestBuf comm_old_write_mbuf(connection, requestBuf, &ICAPXaction_noteCommWroteHeaders, this); + + ICAPXaction_Exit(noteCommConnected); } void ICAPXaction::noteCommWroteHeaders(comm_err_t status) { + ICAPXaction_Enter(noteCommWroteHeaders); + Must(state.isWriting); state.isWriting = false; - if (status != COMM_OK) { - mustStop(notifyHttp); - return; - } - + Must(status == COMM_OK); writeMoreBody(); + + ICAPXaction_Exit(noteCommWroteHeaders); } void ICAPXaction::writeMoreBody() { @@ -205,22 +223,31 @@ } void ICAPXaction::noteCommWroteBody(comm_err_t status, size_t sz) { + ICAPXaction_Enter(noteCommWroteBody); + state.isWriting = false; - if (status != COMM_OK) { - mustStop(notifyHttp); - return; - } + Must(status == COMM_OK); writeMoreBody(); + + ICAPXaction_Exit(noteCommTimeout); } // communication timeout with the ICAP service void ICAPXaction::noteCommTimeout() { + ICAPXaction_Enter(noteCommTimeout); + mustStop(notifyHttp); + + ICAPXaction_Exit(noteCommTimeout); } // unexpected connection close while talking to the ICAP service void ICAPXaction::noteCommClose() { + ICAPXaction_Enter(noteCommClose); + mustStop(notifyHttp); + + ICAPXaction_Exit(noteCommClose); } bool ICAPXaction::done() const { @@ -252,15 +279,13 @@ // comm module read a portion of the ICAP response for us void ICAPXaction::noteCommRead(comm_err_t status, size_t sz) { - Must(state.isReading); - Must(!state.doneParsing()); + ICAPXaction_Enter(noteCommRead); + Must(state.isReading); state.isReading = false; - if (status != COMM_OK) { - mustStop(notifyHttp); - return; - } + Must(!state.doneParsing()); + Must(status == COMM_OK); if (sz > 0) { readBuf.appended(sz); @@ -269,6 +294,8 @@ if (!state.doneReading()) readMore(); + + ICAPXaction_Exit(noteCommRead); } void ICAPXaction::parseMore() { @@ -291,31 +318,51 @@ // HTTP side added virgin body data void ICAPXaction::noteSourceProgress(MsgPipe *p) { + ICAPXaction_Enter(noteSourceProgress); + Must(!state.doneReceiving); writeMoreBody(); + + ICAPXaction_Exit(noteSourceProgress); } // HTTP side sent us all virgin info void ICAPXaction::noteSourceFinish(MsgPipe *p) { + ICAPXaction_Enter(noteSourceFinish); + Must(!state.doneReceiving); state.doneReceiving = true; + + ICAPXaction_Exit(noteSourceFinish); } // HTTP side is aborting void ICAPXaction::noteSourceAbort(MsgPipe *p) { + ICAPXaction_Enter(noteSourceAbort); + Must(!state.doneReceiving); state.doneReceiving = true; + + ICAPXaction_Exit(noteSourceAbort); } // HTTP side wants more adapted data and possibly freed some buffer space void ICAPXaction::noteSinkNeed(MsgPipe *p) { + ICAPXaction_Enter(noteSinkNeed); + Must(!state.doneSending); parseMore(); + + ICAPXaction_Exit(noteSinkNeed); } // HTTP side aborted void ICAPXaction::noteSinkAbort(MsgPipe *p) { + ICAPXaction_Enter(noteSinkAbort); + mustStop(notifyService); + + ICAPXaction_Exit(noteSinkAbort); } void ICAPXaction::pickService() { @@ -328,31 +375,32 @@ Must(state.inCall); // otherwise nobody will call doStop() Must(notify == notifyUnknown); notify = who; - debugs(93, 5, "ICAPXaction will stop and notify " << notify << "\n"); + debugs(93, 5, "ICAPXaction will stop and notify " << notify); } // internal cleanup void ICAPXaction::doStop() { - debugs(93, 5, "ICAPXaction::doStop\n"); + debugs(93, 5, "ICAPXaction::doStop"); memBufClean(&readBuf); memBufClean(&requestBuf); - closeConnection(); + + closeConnection(); // TODO: pconn support: close unless notifyService ... if (virgin) { - if (notify == notifyHttp) + if (notify == notifyHttp || notify == notifyAll) virgin->sendSinkAbort(); // this is the place to decrement refcount ptr virgin = NULL; } if (adapted) { - if (notify == notifyHttp) + if (notify == notifyHttp || notify == notifyAll) adapted->sendSourceAbort(); // this is the place to decrement refcount ptr adapted = NULL; } - if (self != NULL) { + if (self != NULL) { // even if notify is notifyNone Pointer s = self; self = NULL; ICAPNoteXactionDone(s); @@ -365,27 +413,27 @@ } void ICAPXaction::callStart(const char *method) { - debugs(93, 5, "ICAPXaction::" << method << " called\n"); + debugs(93, 5, "ICAPXaction::" << method << " called."); state.inCall = true; } void ICAPXaction::callException(const char *method, const TextException &e, Notify defaultWho) { debugs(93, 4, "ICAPXaction::" << method << " caught an exception: " << - e.message << "\n"); + e.message); if (!done()) mustStop(defaultWho); } void ICAPXaction::callEnd(const char *method) { - debugs(93, 5, "ICAPXaction::" << method << " ending\n"); + debugs(93, 5, "ICAPXaction::" << method << " ending"); if (done()) { doStop(); return; } - debugs(93, 5, "ICAPXaction continues after " << method << "\n"); + debugs(93, 5, "ICAPXaction continues after " << method); - debugs(93, 6, "ICAPXaction::" << method << " ended.\n"); + debugs(93, 6, "ICAPXaction::" << method << " ended"); state.inCall = false; } Index: squid3/src/ICAPXaction.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPXaction.h,v retrieving revision 1.1.2.1 retrieving revision 1.1.2.2 diff -u -r1.1.2.1 -r1.1.2.2 --- squid3/src/ICAPXaction.h 19 Aug 2005 20:11:55 -0000 1.1.2.1 +++ squid3/src/ICAPXaction.h 20 Aug 2005 06:15:05 -0000 1.1.2.2 @@ -1,6 +1,6 @@ /* - * $Id: ICAPXaction.h,v 1.1.2.1 2005/08/19 20:11:55 rousskov Exp $ + * $Id: ICAPXaction.h,v 1.1.2.2 2005/08/20 06:15:05 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -48,7 +48,6 @@ class ICAPXaction: public MsgPipeSource, public MsgPipeSink, public RefCountable { public: typedef RefCount Pointer; - CBDATA_CLASS2(ICAPXaction); public: ICAPXaction(); @@ -94,7 +93,8 @@ bool done() const; - typedef enum { notifyUnknown, notifyNone, notifyService, notifyHttp } Notify; + typedef enum { notifyUnknown, notifyNone, notifyService, notifyHttp, + notifyAll } Notify; void mustStop(Notify who); void doStop(); @@ -134,6 +134,8 @@ } state; Notify notify; + + CBDATA_CLASS2(ICAPXaction); }; // destroys (or pools) the transaction; implemented in ICAPClient.cc (ick?)