This patch is generated from the async-calls branch of HEAD in squid3
Tue Apr 22 00:24:12 2008 GMT
See http://devel.squid-cache.org/

Index: squid3/src/AsyncCall.h
diff -u squid3/src/AsyncCall.h:1.5 squid3/src/AsyncCall.h:1.3.22.24
--- squid3/src/AsyncCall.h:1.5	Tue Feb 26 13:50:51 2008
+++ squid3/src/AsyncCall.h	Mon Apr 21 15:32:38 2008
@@ -5,9 +5,9 @@
 #ifndef SQUID_ASYNCCALL_H
 #define SQUID_ASYNCCALL_H
 
-//#include "cbdata.h"
+#include "cbdata.h"
 #include "event.h"
-//#include "TextException.h"
+#include "TextException.h"
 
 /**
  \defgroup AsynCallsAPI Async-Calls API
@@ -29,6 +29,7 @@
  * debugging.
  */
 
+
 class CallDialer;
 class AsyncCallQueue;
 
@@ -90,6 +91,7 @@
     return os;
 }
 
+
 /**
  \ingroup AsyncCallAPI
  * Interface for all async call dialers
Index: squid3/src/BodyPipe.cc
diff -u squid3/src/BodyPipe.cc:1.9 squid3/src/BodyPipe.cc:1.7.4.13
--- squid3/src/BodyPipe.cc:1.9	Thu Apr 17 14:52:44 2008
+++ squid3/src/BodyPipe.cc	Mon Apr 21 15:32:39 2008
@@ -99,7 +99,7 @@
 {
 	debugs(91,7, HERE << this << " will not produce for " << pipe <<
 		"; atEof: " << atEof);
-	assert(pipe != NULL); // be strict: the caller state may depend on this
+	Must(pipe != NULL); // be strict: the caller state may depend on this
 	pipe->clearProducer(atEof);
 	pipe = NULL;
 }
@@ -112,7 +112,7 @@
 void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
 {
 	debugs(91,7, HERE << this << " will not consume from " << pipe);
-	assert(pipe != NULL); // be strict: the caller state may depend on this
+	Must(pipe != NULL); // be strict: the caller state may depend on this
 	pipe->clearConsumer();
 	pipe = NULL;
 }
@@ -141,14 +141,14 @@
 
 void BodyPipe::setBodySize(uint64_t aBodySize)
 {
-	assert(!bodySizeKnown());
-	assert(aBodySize >= 0);
-	assert(thePutSize <= aBodySize);
+	Must(!bodySizeKnown());
+	Must(aBodySize >= 0);
+	Must(thePutSize <= aBodySize);
 
 	// If this assert fails, we need to add code to check for eof and inform
 	// the consumer about the eof condition via scheduleBodyEndNotification,
 	// because just setting a body size limit may trigger the eof condition.
-	assert(!theConsumer); 
+	Must(!theConsumer); 
 
 	theBodySize = aBodySize;
 	debugs(91,7, HERE << "set body size" << status());
@@ -156,13 +156,13 @@
 
 uint64_t BodyPipe::bodySize() const
 {
-	assert(bodySizeKnown());
+	Must(bodySizeKnown());
 	return static_cast<uint64_t>(theBodySize);
 }
 
 bool BodyPipe::expectMoreAfter(uint64_t offset) const
 {
-	assert(theGetSize <= offset);
+	Must(theGetSize <= offset);
 	return offset < thePutSize || // buffer has more now or
 		(!productionEnded() && mayNeedMoreData()); // buffer will have more
 }
@@ -191,7 +191,7 @@
 				debugs(91,3, HERE << "aborting on premature eof" << status());
 		} else {
 			// asserta that we can detect the abort if the consumer joins later
-			assert(!bodySizeKnown() || bodySize() != thePutSize);
+			Must(!bodySizeKnown() || bodySize() != thePutSize);
 		}
 		scheduleBodyEndNotification();
 	}
@@ -215,8 +215,8 @@
 bool
 BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
 {
-	assert(!theConsumer);
-	assert(aConsumer);
+	Must(!theConsumer);
+	Must(aConsumer);
 
 	// TODO: convert this into an exception and remove IfNotLate suffix
 	// If there is something consumed already, we are in an auto-consuming mode
@@ -302,7 +302,7 @@
 
 MemBuf &
 BodyPipe::checkOut() {
-	assert(!isCheckedOut);
+	Must(!isCheckedOut);
 	isCheckedOut = true;
 	return theBuf;	
 }
@@ -310,7 +310,7 @@
 void
 BodyPipe::checkIn(Checkout &checkout)
 {
-	assert(isCheckedOut);
+	Must(isCheckedOut);
 	isCheckedOut = false;
 	const size_t currentSize = theBuf.contentSize();
 	if (checkout.checkedOutSize > currentSize)
@@ -323,14 +323,14 @@
 void
 BodyPipe::undoCheckOut(Checkout &checkout)
 {
-	assert(isCheckedOut);
+	Must(isCheckedOut);
 	const size_t currentSize = theBuf.contentSize();
 	// We can only undo if size did not change, and even that carries
 	// some risk. If this becomes a problem, the code checking out
 	// raw buffers should always check them in (possibly unchanged)
 	// instead of relying on the automated undo mechanism of Checkout.
 	// The code can always use a temporary buffer to accomplish that.
-	assert(checkout.checkedOutSize == currentSize);
+	Must(checkout.checkedOutSize == currentSize);
 }
 
 // TODO: Optimize: inform consumer/producer about more data/space only if
@@ -338,7 +338,7 @@
 
 void
 BodyPipe::postConsume(size_t size) {
-	assert(!isCheckedOut);
+	Must(!isCheckedOut);
 	theGetSize += size;
 	debugs(91,7, HERE << "consumed " << size << " bytes" << status());
 	if (mayNeedMoreData()){
@@ -352,7 +352,7 @@
 
 void
 BodyPipe::postAppend(size_t size) {
-	assert(!isCheckedOut);
+	Must(!isCheckedOut);
 	thePutSize += size;
 	debugs(91,7, HERE << "added " << size << " bytes" << status());
 
@@ -453,7 +453,7 @@
 void
 BodyPipeCheckout::checkIn()
 {
-	assert(!checkedIn);
+	Must(!checkedIn);
 	pipe.checkIn(*this);
 	checkedIn = true;
 }
Index: squid3/src/Debug.h
diff -u squid3/src/Debug.h:1.14 squid3/src/Debug.h:1.10.4.7
--- squid3/src/Debug.h:1.14	Thu Mar 20 18:22:17 2008
+++ squid3/src/Debug.h	Mon Apr 21 15:32:41 2008
@@ -51,6 +51,11 @@
 #define assert(EX)  ((EX)?((void)0):xassert("EX", __FILE__, __LINE__))
 #endif
 
+void WillCatchException(int debug_section, int debug_level, const char *who);
+void WontCatchException();
+extern int TheSalvagedAsserts;
+extern int TheAssertsPerStep;
+
 /* defined debug section limits */
 #define MAX_DEBUG_SECTIONS 100
 
Index: squid3/src/ESIInclude.cc
diff -u squid3/src/ESIInclude.cc:1.17 squid3/src/ESIInclude.cc:1.14.4.3
--- squid3/src/ESIInclude.cc:1.17	Tue Feb 12 16:50:41 2008
+++ squid3/src/ESIInclude.cc	Wed Apr  2 15:48:23 2008
@@ -110,7 +110,8 @@
                 rep = NULL;
                 esiStream->include->fail (esiStream);
                 esiStream->finished = 1;
-                httpRequestFree (http);
+                http->httpRequestFree ();
+		cbdataInternalUnlock(http); 
                 return;
             }
 
@@ -157,7 +158,8 @@
         debugs(86, 5, "Finished reading upstream data in subrequest");
         esiStream->include->subRequestDone (esiStream, true);
         esiStream->finished = 1;
-        httpRequestFree (http);
+        http->httpRequestFree ();
+	cbdataInternalUnlock(http);
         return;
     }
 
@@ -181,14 +183,16 @@
         debugs(86, 3, "ESI subrequest finished OK");
         esiStream->include->subRequestDone (esiStream, true);
         esiStream->finished = 1;
-        httpRequestFree (http);
+        http->httpRequestFree ();
+	cbdataInternalUnlock(http);
         return;
 
     case STREAM_FAILED:
         debugs(86, 1, "ESI subrequest failed transfer");
         esiStream->include->fail (esiStream);
         esiStream->finished = 1;
-        httpRequestFree (http);
+        http->httpRequestFree ();
+	cbdataInternalUnlock(http);
         return;
 
     case STREAM_NONE: {
Index: squid3/src/cf.data.pre
diff -u squid3/src/cf.data.pre:1.178 squid3/src/cf.data.pre:1.155.4.6
--- squid3/src/cf.data.pre:1.178	Sat Apr 12 17:13:46 2008
+++ squid3/src/cf.data.pre	Mon Apr 21 15:33:17 2008
@@ -5682,4 +5682,21 @@
 	rounded to 1000.
 DOC_END
 
+NAME: assert_burst_max
+TYPE: int
+LOC: Config.assert_burst_max
+DEFAULT: 0
+DOC_START
+	When this is set to a possitive number then Squid will try to
+	survive from assertions if possible and will die only if an
+	assertions burst exceeds this number. 
+	If set to zero (default), the first assertion aborts Squid, 
+	giving users the old behavior. If set to a negative number,
+	there is no limit.
+	An asssertions burst defined as the number of assertions per 
+	single Squid main loop iteration.
+	WARNING! This is an experimental feature and the definition 
+	of a "burst" can change
+DOC_END
+
 EOF
Index: squid3/src/client_side.cc
diff -u squid3/src/client_side.cc:1.154 squid3/src/client_side.cc:1.139.4.13
--- squid3/src/client_side.cc:1.154	Mon Apr 14 17:13:14 2008
+++ squid3/src/client_side.cc	Mon Apr 21 15:33:20 2008
@@ -251,8 +251,9 @@
 
     if (connRegistered_)
         deRegisterWithConn();
-
-    httpRequestFree(http);
+   
+    http->httpRequestFree();
+    cbdataReferenceDone(http);
 
     /* clean up connection links to us */
     assert(this != next.getRaw());
@@ -304,7 +305,7 @@
     ClientSocketContext *newContext;
     assert(http != NULL);
     newContext = new ClientSocketContext;
-    newContext->http = http;
+    newContext->http = cbdataReference(http);
     return newContext;
 }
 
@@ -557,13 +558,13 @@
         clientStreamAbort((clientStreamNode *)client_stream.tail->data, this);
 }
 
-void
+/*void
 httpRequestFree(void *data)
 {
     ClientHttpRequest *http = (ClientHttpRequest *)data;
     assert(http != NULL);
     delete http;
-}
+}*/
 
 bool
 ConnStateData::areAllContextsForThisConnection() const
@@ -587,17 +588,17 @@
     ClientSocketContext::Pointer context;
 
     while ((context = getCurrentContext()).getRaw() != NULL) {
-        assert(getCurrentContext() !=
+        Must(getCurrentContext() !=
                getCurrentContext()->next);
         context->connIsFinished();
-        assert (context != currentobject);
+        Must(context != currentobject);
     }
 }
 
 /* This is a handler normally called by comm_close() */
 void ConnStateData::connStateClosed(const CommCloseCbParams &io)
 {
-    assert (fd == io.fd);
+    Must (fd == io.fd);
     close();
 }
 
Index: squid3/src/client_side_request.cc
diff -u squid3/src/client_side_request.cc:1.93 squid3/src/client_side_request.cc:1.79.4.15
--- squid3/src/client_side_request.cc:1.93	Sat Apr 12 17:13:46 2008
+++ squid3/src/client_side_request.cc	Mon Apr 21 15:33:21 2008
@@ -142,7 +142,7 @@
 }
 
 ClientHttpRequest::ClientHttpRequest(ConnStateData * aConn) : 
-#if USE_ADAPTATION
+#if USE_ADAPTATION || USE_SSL
 AsyncJob("ClientHttpRequest"),
 #endif
 loggingEntry_(NULL)
@@ -162,7 +162,7 @@
 bool
 ClientHttpRequest::onlyIfCached()const
 {
-    assert(request);
+    Must(request);
     return request->cache_control &&
            EBIT_TEST(request->cache_control->mask, CC_ONLY_IF_CACHED);
 }
@@ -242,7 +242,7 @@
     /* the ICP check here was erroneous
      * - StoreEntry::releaseRequest was always called if entry was valid 
      */
-    assert(logType < LOG_TYPE_MAX);
+    Must(logType < LOG_TYPE_MAX);
 
     logRequest();
 
@@ -292,6 +292,8 @@
     http->req_sz = 0;
     tempBuffer.length = taillen;
     tempBuffer.data = tailbuf;
+    /*http should be unLocked/released  by the caller module (ESI module for example)*/
+    cbdataInternalLock(http); 
     /* client stream setup */
     clientStreamInit(&http->client_stream, clientGetMoreData, clientReplyDetach,
                      clientReplyStatus, new clientReplyContext(http), streamcallback,
@@ -904,7 +906,7 @@
     debugs(85, 4, "ClientHttpRequest::httpStart: " << log_tags[logType] << " for '" << uri << "'");
 
     /* no one should have touched this */
-    assert(out.offset == 0);
+    Must(out.offset == 0);
     /* Use the Stream Luke */
     clientStreamNode *node = (clientStreamNode *)client_stream.tail->data;
     clientStreamRead(node, this, node->readBuffer);
@@ -931,6 +933,7 @@
 }
 
 // called when comm_write has completed
+/*
 static void
 SslBumpEstablish(int, char *, size_t, comm_err_t errflag, int, void *data)
 {
@@ -940,15 +943,15 @@
     assert(r && cbdataReferenceValid(r));
     r->sslBumpEstablish(errflag);
 }
-
+*/
 void
-ClientHttpRequest::sslBumpEstablish(comm_err_t errflag)
+ClientHttpRequest::sslBumpEstablish(const CommIoCbParams &io)
 {
     // Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up
-    if (errflag == COMM_ERR_CLOSING)
+    if (io.flag == COMM_ERR_CLOSING)
         return;
 
-    if (errflag) {
+    if (io.flag) {
         getConn()->startClosing("CONNECT response failure in SslBump");
         return;
     }
@@ -968,8 +971,11 @@
     // TODO: Unify with tunnel.cc and add a Server(?) header
     static const char *const conn_established =
         "HTTP/1.0 200 Connection established\r\n\r\n";
-    comm_write(fd, conn_established, strlen(conn_established),
-        &SslBumpEstablish, this, NULL);
+    
+    typedef CommCbMemFunT<ClientHttpRequest, CommIoCbParams> Dialer;
+    Dialer dialer(this, &ClientHttpRequest::sslBumpEstablish);
+    AsyncCall::Pointer call = asyncCall(85, 5, "ClientHttpRequest::sslBumpEstablish", dialer);
+    comm_write(fd, conn_established, strlen(conn_established), call);
 }
 
 #endif
@@ -980,7 +986,7 @@
     /** TODO: should be querying the stream. */
     int64_t contentLength =
         memObject()->getReply()->bodySize(request->method);
-    assert(contentLength >= 0);
+    Must(contentLength >= 0);
 
     if (out.offset < contentLength)
         return false;
@@ -1043,7 +1049,7 @@
 void
 ClientHttpRequest::doCallouts()
 {
-    assert(calloutContext);
+    Must(calloutContext);
 
     if (!calloutContext->http_access_done) {
         debugs(83, 3, HERE << "Doing calloutContext->clientAccessCheck()");
@@ -1064,7 +1070,7 @@
 
     if (!calloutContext->redirect_done) {
         calloutContext->redirect_done = true;
-        assert(calloutContext->redirect_state == REDIRECT_NONE);
+        Must(calloutContext->redirect_state == REDIRECT_NONE);
 
         if (Config.Program.redirect) {
             debugs(83, 3, HERE << "Doing calloutContext->clientRedirectStart()");
@@ -1137,8 +1143,8 @@
         return false;
     }
 
-    assert(!virginHeadSource);
-    assert(!adaptedBodySource);
+    Must(!virginHeadSource);
+    Must(!adaptedBodySource);
     virginHeadSource = initiateAdaptation(service->makeXactLauncher(
         this, request, NULL));
 
@@ -1148,8 +1154,8 @@
 void
 ClientHttpRequest::noteAdaptationAnswer(HttpMsg *msg)
 {
-    assert(cbdataReferenceValid(this));		// indicates bug
-    assert(msg);
+    Must(cbdataReferenceValid(this));		// indicates bug
+    Must(msg);
 
     if (HttpRequest *new_req = dynamic_cast<HttpRequest*>(msg)) {
         /*
@@ -1163,14 +1169,14 @@
         xfree(uri);
         uri = xstrdup(urlCanonical(request));
         setLogUri(this, urlCanonicalClean(request));
-        assert(request->method.id());
+        Must(request->method.id());
     } else if (HttpReply *new_rep = dynamic_cast<HttpReply*>(msg)) {
         debugs(85,3,HERE << "REQMOD reply is HTTP reply");
 
         // subscribe to receive reply body
         if (new_rep->body_pipe != NULL) {
             adaptedBodySource = new_rep->body_pipe;
-            assert(adaptedBodySource->setConsumerIfNotLate(this));
+            Must(adaptedBodySource->setConsumerIfNotLate(this));
         }
 
         clientStreamNode *node = (clientStreamNode *)client_stream.tail->prev->data;
@@ -1198,15 +1204,15 @@
 ClientHttpRequest::noteAdaptationQueryAbort(bool final)
 {
     clearAdaptation(virginHeadSource);
-    assert(!adaptedBodySource);
+    Must(!adaptedBodySource);
     handleAdaptationFailure(!final);
 }
 
 void
 ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe::Pointer)
 {
-    assert(request_satisfaction_mode);
-    assert(adaptedBodySource != NULL);
+    Must(request_satisfaction_mode);
+    Must(adaptedBodySource != NULL);
 
     if (const size_t contentSize = adaptedBodySource->buf().contentSize()) {
         BodyPipeCheckout bpc(*adaptedBodySource);
@@ -1226,12 +1232,12 @@
 void
 ClientHttpRequest::noteBodyProductionEnded(BodyPipe::Pointer)
 {
-    assert(!virginHeadSource);
+    Must(!virginHeadSource);
     if (adaptedBodySource != NULL) { // did not end request satisfaction yet
         // We do not expect more because noteMoreBodyDataAvailable always 
         // consumes everything. We do not even have a mechanism to consume
         // leftovers after noteMoreBodyDataAvailable notifications seize.
-        assert(adaptedBodySource->exhausted());
+        Must(adaptedBodySource->exhausted());
         endRequestSatisfaction();
     }
 }
@@ -1239,7 +1245,7 @@
 void
 ClientHttpRequest::endRequestSatisfaction() {
     debugs(85,4, HERE << this << " ends request satisfaction");
-    assert(request_satisfaction_mode);
+    Must(request_satisfaction_mode);
     stopConsumingFrom(adaptedBodySource);
 
     // TODO: anything else needed to end store entry formation correctly?
@@ -1249,7 +1255,7 @@
 void
 ClientHttpRequest::noteBodyProducerAborted(BodyPipe::Pointer)
 {
-    assert(!virginHeadSource);
+    Must(!virginHeadSource);
     stopConsumingFrom(adaptedBodySource);
     handleAdaptationFailure();
 }
@@ -1274,7 +1280,7 @@
 
     clientStreamNode *node = (clientStreamNode *)client_stream.tail->prev->data;
     clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw());
-    assert(repContext);
+    Must(repContext);
 
     // The original author of the code also wanted to pass an errno to 
     // setReplyToError, but it seems unlikely that the errno reflects the
Index: squid3/src/client_side_request.h
diff -u squid3/src/client_side_request.h:1.37 squid3/src/client_side_request.h:1.30.4.12
--- squid3/src/client_side_request.h:1.37	Sat Apr 12 17:13:46 2008
+++ squid3/src/client_side_request.h	Mon Apr 21 15:33:22 2008
@@ -63,13 +63,15 @@
 #if USE_ADAPTATION
     : public Adaptation::Initiator, // to start adaptation transactions
     public BodyConsumer     // to receive reply bodies in request satisf. mode
+#elif USE_SSL
+    : virtual public AsyncJob
 #endif
 {
 
 public:
     void *operator new (size_t);
     void operator delete (void *);
-#if USE_ADAPTATION
+#if USE_ADAPTATION || USE_SSL
     void *toCbdata() { return this; }
 #endif
     ClientHttpRequest(ConnStateData *);
@@ -136,10 +138,13 @@
     ClientRequestContext *calloutContext;
     void doCallouts();
 
-#if USE_ADAPTATION
+#if USE_ADAPTATION || USE_SSL
     // AsyncJob virtual methods
     virtual bool doneAll() const { return Initiator::doneAll() && 
 				       BodyConsumer::doneAll() && false;}
+    void httpRequestFree() { deleteThis("httpRequestFree"); }
+#else
+     void httpRequestFree() { delete this; }
 #endif
 
 private:
@@ -153,7 +158,7 @@
 public:
     bool sslBumpNeeded() const;
     void sslBumpStart();
-    void sslBumpEstablish(comm_err_t errflag);
+    void sslBumpEstablish(const CommIoCbParams &io);
 #endif
 
 #if USE_ADAPTATION
Index: squid3/src/debug.cc
diff -u squid3/src/debug.cc:1.28 squid3/src/debug.cc:1.18.4.8
--- squid3/src/debug.cc:1.28	Mon Apr 14 17:13:14 2008
+++ squid3/src/debug.cc	Mon Apr 21 15:33:23 2008
@@ -37,6 +37,7 @@
 
 #include "Debug.h"
 #include "SquidTime.h"
+#include "TextException.h"
 
 /* for Config */
 #include "structs.h"
@@ -44,6 +45,11 @@
 int Debug::Levels[MAX_DEBUG_SECTIONS];
 int Debug::level;
 
+static bool AsyncCall_Handling_Exceptions = 0;
+int TheCascadingAsserts = 0;
+int TheSalvagedAsserts = 0;
+int TheAssertsPerStep = 0;
+
 static char *debug_log_file = NULL;
 static int Ctx_Lock = 0;
 static const char *debugLogTime(void);
@@ -569,9 +575,48 @@
     return buf;
 }
 
+void WillCatchException(int debug_section, int debug_level, const char *who){
+    if(AsyncCall_Handling_Exceptions) {
+	debugs(0, 0, "AsyncCall handling exceptions already enabled! The last caller is:" << who );
+	abort();
+    }
+    debugs(debug_section, debug_level, "The " << who << " will handle exceptions");
+    AsyncCall_Handling_Exceptions = true;
+}
+
+void WontCatchException(){ 
+    AsyncCall_Handling_Exceptions = false;
+    TheCascadingAsserts = 0;
+}
+
+#define MAX_CASCADING_ASSERTS 10
 void
 xassert(const char *msg, const char *file, int line) {
-    debugs(0, 0, "assertion failed: " << file << ":" << line << ": \"" << msg << "\"");
+
+    debugs(0, 0, "assertion failed: " << file << ":" << line << ": \"" << msg << "\"");    
+
+    if (AsyncCall_Handling_Exceptions &&
+	TheCascadingAsserts < MAX_CASCADING_ASSERTS && 
+	( Config.assert_burst_max < 0 ||
+	  (Config.assert_burst_max > 0 && TheAssertsPerStep < Config.assert_burst_max)
+	    )
+	) {
+	TheCascadingAsserts++;
+	TheSalvagedAsserts++;
+	TheAssertsPerStep++;
+
+	debugs(0, 0, "salvaging assertion #" << TheSalvagedAsserts << " (" <<
+	       TheAssertsPerStep << "/" << Config.assert_burst_max << ")");
+
+	throw TextException(msg, file, line);
+    }
+    
+
+    if(TheCascadingAsserts >= MAX_CASCADING_ASSERTS)
+	debugs(0, 0, "dying after " << TheCascadingAsserts << "cascading assertions" );
+    
+    if(Config.assert_burst_max > 0 && TheAssertsPerStep >= Config.assert_burst_max)
+	debugs(0, 0, "dying after an " << TheAssertsPerStep << " assertions burst" );
 
     if (!shutting_down)
         abort();
Index: squid3/src/event.cc
diff -u squid3/src/event.cc:1.19 squid3/src/event.cc:1.16.4.5
--- squid3/src/event.cc:1.19	Tue Feb 12 22:51:18 2008
+++ squid3/src/event.cc	Mon Apr 21 15:33:24 2008
@@ -43,6 +43,7 @@
 static OBJH eventDump;
 static const char *last_event_ran = NULL;
 
+
 // This AsyncCall dialer can be configured to check that the event cbdata is 
 // valid before calling the event handler
 class EventDialer: public CallDialer
Index: squid3/src/http.cc
diff -u squid3/src/http.cc:1.133 squid3/src/http.cc:1.122.4.13
--- squid3/src/http.cc:1.133	Mon Apr 14 17:13:14 2008
+++ squid3/src/http.cc	Mon Apr 21 15:33:28 2008
@@ -162,14 +162,6 @@
 {
     return fd;
 }
-/*
-static void
-httpStateFree(int fd, void *data)
-{
-    HttpStateData *httpState = static_cast<HttpStateData *>(data);
-    debugs(11, 5, "httpStateFree: FD " << fd << ", httpState=" << data);
-    delete httpState;
-}*/
 
 void 
 HttpStateData::httpStateConnClosed(const CommCloseCbParams &params)
@@ -265,7 +257,7 @@
     if (!remove && !forbidden)
         return;
 
-    assert(e->mem_obj);
+    Must(e->mem_obj);
 
     if (e->mem_obj->request)
         pe = storeGetPublicByRequest(e->mem_obj->request);
@@ -273,7 +265,7 @@
         pe = storeGetPublic(e->mem_obj->url, e->mem_obj->method);
 
     if (pe != NULL) {
-        assert(e != pe);
+        Must(e != pe);
         pe->release();
     }
 
@@ -287,7 +279,7 @@
         pe = storeGetPublic(e->mem_obj->url, METHOD_HEAD);
 
     if (pe != NULL) {
-        assert(e != pe);
+        Must(e != pe);
         pe->release();
     }
 
@@ -322,7 +314,7 @@
             pe = storeGetPublic(e->mem_obj->url, METHOD_GET);
 
         if (pe != NULL) {
-            assert(e != pe);
+            Must(e != pe);
             pe->release();
         }
 
@@ -704,7 +696,7 @@
     Ctx ctx = ctx_enter(entry->mem_obj->url);
     debugs(11, 3, "processReplyHeader: key '" << entry->getMD5Text() << "'");
 
-    assert(!flags.headers_parsed);
+    Must(!flags.headers_parsed);
 
     http_status error = HTTP_STATUS_NONE;
 
@@ -735,8 +727,8 @@
 	 }
 	 
 	 if (!parsed) { // need more data
-	      assert(!error);
-	      assert(!eof);
+	      Must(!error);
+	      Must(!eof);
 	      delete newrep;
 	      ctx_exit(ctx);
 	      return;
@@ -952,18 +944,6 @@
 /*
  * This is the callback after some data has been read from the network
  */
-/*
-void
-HttpStateData::ReadReplyWrapper(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
-{
-    HttpStateData *httpState = static_cast<HttpStateData *>(data);
-    assert (fd == httpState->fd);
-    // assert(buf == readBuf->content());
-    PROF_start(HttpStateData_readReply);
-    httpState->readReply (len, flag, xerrno);
-    PROF_stop(HttpStateData_readReply);
-}
-*/
 /* XXX this function is too long! */
 void
 HttpStateData::readReply (const CommIoCbParams &io)
@@ -972,7 +952,9 @@
     int clen;
     int len = io.size;
 
-    assert(fd == io.fd);
+    PROF_start(HttpStateData_readReply);
+
+    Must(fd == io.fd);
 
     flags.do_next_read = 0;
    
@@ -981,11 +963,13 @@
     // Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us
     if (io.flag == COMM_ERR_CLOSING) {
         debugs(11, 3, "http socket closing");
+	PROF_stop(HttpStateData_readReply);
         return;
     }
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
         maybeReadVirginBody();
+	PROF_stop(HttpStateData_readReply);
         return;
     }
 
@@ -1003,7 +987,8 @@
             flags.do_next_read = 0;
             comm_close(fd);
         }
-
+	
+	PROF_stop(HttpStateData_readReply);
         return;
     }
 
@@ -1044,6 +1029,7 @@
             /* Timeout NOT increased. This whitespace was from previous reply */
             flags.do_next_read = 1;
             maybeReadVirginBody();
+	    PROF_stop(HttpStateData_readReply);
             return;
         }
     }
@@ -1060,8 +1046,10 @@
         processReplyHeader();
         PROF_stop(HttpStateData_processReplyHeader);
 
-        if (!continueAfterParsingHeader()) // parsing error or need more data
+        if (!continueAfterParsingHeader()) { // parsing error or need more data
+	    PROF_stop(HttpStateData_readReply);
             return; // TODO: send errors to ICAP
+	}
 
         adaptOrFinalizeReply();
     }
@@ -1070,6 +1058,7 @@
     PROF_start(HttpStateData_processReplyBody);
     processReplyBody(); // may call serverComplete()
     PROF_stop(HttpStateData_processReplyBody);
+    PROF_stop(HttpStateData_readReply);
 }
 
 // Checks whether we can continue with processing the body or doing ICAP.
@@ -1107,12 +1096,12 @@
             error = ERR_INVALID_RESP;
         }
     } else {
-        assert(eof);
+        Must(eof);
         error = readBuf->hasContent() ?
             ERR_INVALID_RESP : ERR_ZERO_SIZE_OBJECT;
     }
 
-    assert(error != ERR_NONE);
+    Must(error != ERR_NONE);
     entry->reset();
     fwd->fail(errorCon(error, HTTP_BAD_GATEWAY, fwd->request));
     flags.do_next_read = 0;
@@ -1139,8 +1128,8 @@
     const char *data = NULL;
     int len;
     bool status = false;
-    assert(flags.chunked);
-    assert(httpChunkDecoder);
+    Must(flags.chunked);
+    Must(httpChunkDecoder);
     SQUID_ENTER_THROWING_CODE();
     MemBuf decodedData;
     decodedData.init();
@@ -1379,7 +1368,7 @@
     const HttpHeaderEntry *e;
     String strFwd;
     HttpHeaderPos pos = HttpHeaderInitPos;
-    assert (hdr_out->owner == hoRequest);
+    Must (hdr_out->owner == hoRequest);
     /* append our IMS header */
 
     if (request->lastmod > -1)
@@ -1547,7 +1536,7 @@
             httpHdrCcSetMaxAge(cc, getMaxAge(url));
 
             if (request->urlpath.size())
-                assert(strstr(url, request->urlpath.buf()));
+                Must(strstr(url, request->urlpath.buf()));
         }
 
         /* Set no-cache if determined needed but not found */
@@ -1789,7 +1778,7 @@
         Dialer dialer(this, &HttpStateData::sentRequestBody);
 	requestSender = asyncCall(11,5, "HttpStateData::sentRequestBody", dialer);
     } else {
-        assert(!requestBodySource);
+        Must(!requestBodySource);
 	typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
         Dialer dialer(this, &HttpStateData::sendComplete);
 	requestSender = asyncCall(11,5, "HttpStateData::SendComplete", dialer);
@@ -1904,7 +1893,7 @@
         return;
     }
 
-    assert(requestBodySource != NULL);
+    Must(requestBodySource != NULL);
 
     if (requestBodySource->buf().hasContent()) {
         // XXX: why does not this trigger a debug message on every request?
Index: squid3/src/http.h
diff -u squid3/src/http.h:1.30 squid3/src/http.h:1.28.4.5
--- squid3/src/http.h:1.30	Tue Feb 12 16:50:41 2008
+++ squid3/src/http.h	Thu Feb 14 13:55:08 2008
@@ -97,6 +97,8 @@
     virtual void closeServer(); // end communication with the server
     virtual bool doneWithServer() const; // did we end communication?
     virtual void abortTransaction(const char *reason); // abnormal termination
+    //AsyncJob virtual methods
+    virtual bool abortOnException() {return true;};
 
     // consuming request body
     virtual void handleMoreRequestBodyAvailable();
Index: squid3/src/main.cc
diff -u squid3/src/main.cc:1.100 squid3/src/main.cc:1.89.4.9
--- squid3/src/main.cc:1.100	Sat Apr 12 17:13:46 2008
+++ squid3/src/main.cc	Mon Apr 21 15:33:30 2008
@@ -150,6 +150,17 @@
     };
 };
 
+class XAssertsEngine : public AsyncEngine
+{
+
+public:
+    int checkEvents(int timeout)
+    {
+	TheAssertsPerStep = 0;
+        return EVENT_IDLE;
+    };
+};
+
 class SignalEngine: public AsyncEngine
 {
 
@@ -1371,6 +1382,9 @@
 
     mainLoop.registerEngine(&signalEngine);
 
+    XAssertsEngine xassertsEngine;
+    mainLoop.registerEngine(&xassertsEngine);
+
     /* TODO: stop requiring the singleton here */
     mainLoop.registerEngine(EventScheduler::GetInstance());
 
Index: squid3/src/stat.cc
diff -u squid3/src/stat.cc:1.49 squid3/src/stat.cc:1.44.4.6
--- squid3/src/stat.cc:1.49	Fri Mar 21 19:52:24 2008
+++ squid3/src/stat.cc	Mon Apr 21 15:33:37 2008
@@ -759,6 +759,9 @@
     storeAppendPrintf(sentry, "Allocation Size\t Alloc Count\t Alloc Delta\t Allocs/sec \n");
     malloc_statistics(info_get_mallstat, sentry);
 #endif
+    storeAppendPrintf(sentry, "Internal squid errors:\n");
+    storeAppendPrintf(sentry, "\tSalvaged assertions:   %6d\n", TheSalvagedAsserts);
+
 }
 
 static void
Index: squid3/src/structs.h
diff -u squid3/src/structs.h:1.132 squid3/src/structs.h:1.116.4.5
--- squid3/src/structs.h:1.132	Mon Apr 14 17:13:14 2008
+++ squid3/src/structs.h	Mon Apr 21 15:33:37 2008
@@ -629,6 +629,7 @@
 #endif
 
     char *accept_filter;
+    int assert_burst_max;
 
 #if USE_LOADABLE_MODULES
     wordlist *loadable_module_names;
Index: squid3/src/tunnel.cc
diff -u squid3/src/tunnel.cc:1.36 squid3/src/tunnel.cc:1.33.4.4
--- squid3/src/tunnel.cc:1.36	Sun Jan 20 01:50:58 2008
+++ squid3/src/tunnel.cc	Tue Mar 25 01:55:17 2008
@@ -46,8 +46,10 @@
 #include "client_side.h"
 #include "MemBuf.h"
 #include "http.h"
+#include "AsyncCall.h"
+#include "ICAP/AsyncJob.h"
 
-class TunnelStateData
+class TunnelStateData:  virtual public AsyncJob
 {
 
 public:
@@ -55,10 +57,12 @@
     class Connection;
     void *operator new(size_t);
     void operator delete (void *);
-    static void ReadClient(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data);
-    static void ReadServer(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data);
-    static void WriteClientDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data);
-    static void WriteServerDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data);
+
+    void *toCbdata() { return this; }
+    virtual bool doneAll() const {return false && AsyncJob::doneAll();}
+
+    TunnelStateData() : AsyncJob("TunnelStateData") {}
+    void close();
 
     bool noConnections() const;
     char *url;
@@ -103,78 +107,78 @@
 
     Connection client, server;
     int *status_ptr;		/* pointer to status for logging */
-    void copyRead(Connection &from, IOCB *completion);
-
+    void copyRead(Connection &from, AsyncCall::Pointer call);
 private:
     CBDATA_CLASS(TunnelStateData);
-    void copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *);
-    void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno);
-    void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno);
-    void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno);
-    void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno);
+    void copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, AsyncCall::Pointer call);
+
+public:
+
+    void tunnelConnected(int fd);
+    void tunnelProxyConnected(int fd);
+
+// CommCalls
+    void readServer(const CommIoCbParams &io);
+    void readClient(const CommIoCbParams &io);
+    void writeClientDone(const CommIoCbParams &io);
+    void writeServerDone(const CommIoCbParams &io);    
+    void tunnelConnectedWriteDone(const CommIoCbParams &io);
+    void tunnelProxyConnectedWriteDone(const CommIoCbParams &io);
+    void tunnelServerClosed(const CommCloseCbParams &io);
+    void tunnelClientClosed(const CommCloseCbParams &io);
+    void tunnelTimeout(const CommTimeoutCbParams &params);
+    void tunnelConnectTimeout(const CommTimeoutCbParams &params);
+    void tunnelConnectDone(const CommConnectCbParams &params);
 };
 
 static const char *const conn_established = "HTTP/1.0 200 Connection established\r\n\r\n";
 
-static CNCB tunnelConnectDone;
 static ERCB tunnelErrorComplete;
-static PF tunnelServerClosed;
-static PF tunnelClientClosed;
-static PF tunnelTimeout;
 static PSC tunnelPeerSelectComplete;
-static void tunnelStateFree(TunnelStateData * tunnelState);
-static void tunnelConnected(int fd, void *);
-static void tunnelProxyConnected(int fd, void *);
 
-static void
-tunnelServerClosed(int fd, void *data)
+void
+TunnelStateData::tunnelServerClosed(const CommCloseCbParams &io)
 {
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
-    debugs(26, 3, "tunnelServerClosed: FD " << fd);
-    assert(fd == tunnelState->server.fd());
-    tunnelState->server.fd(-1);
+    debugs(26, 3, "tunnelServerClosed: FD " << io.fd);
+    Must(io.fd == server.fd());
+    server.fd(-1);
 
-    if (tunnelState->noConnections()) {
-        tunnelStateFree(tunnelState);
+    if (noConnections()) {
+	close();
 	return;
     }
     
-    if (!tunnelState->server.len) {
-	comm_close(tunnelState->client.fd());
+    if (!server.len) {
+	comm_close(client.fd());
 	return;
     }
 }
 
-static void
-tunnelClientClosed(int fd, void *data)
+void
+TunnelStateData::tunnelClientClosed(const CommCloseCbParams &io)
 {
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
-    debugs(26, 3, "tunnelClientClosed: FD " << fd);
-    assert(fd == tunnelState->client.fd());
-    tunnelState->client.fd(-1);
+    debugs(26, 3, "tunnelClientClosed: FD " << io.fd);
+    Must(io.fd == client.fd());
+    client.fd(-1);
 
-    if (tunnelState->noConnections()) {
-        tunnelStateFree(tunnelState);
+    if (noConnections()) {
+	close();
 	return;
     }
     
-    if (!tunnelState->client.len) {
-	comm_close(tunnelState->server.fd());
+    if (!client.len) {
+	comm_close(server.fd());
 	return;
     }
 }
 
-static void
-tunnelStateFree(TunnelStateData * tunnelState)
-{
-    debugs(26, 3, "tunnelStateFree: tunnelState=" << tunnelState);
-    assert(tunnelState != NULL);
-    assert(tunnelState->noConnections());
-    safe_free(tunnelState->url);
-    FwdState::serversFree(&tunnelState->servers);
-    tunnelState->host = NULL;
-    HTTPMSGUNLOCK(tunnelState->request);
-    delete tunnelState;
+void TunnelStateData::close() {
+    Must(noConnections());
+    safe_free(url);
+    FwdState::serversFree(&servers);
+    host = NULL;
+    HTTPMSGUNLOCK(request);
+    deleteThis("TunnelStateData::tunnelState");
 }
 
 TunnelStateData::Connection::~Connection()
@@ -221,35 +225,28 @@
 
 /* Read from server side and queue it for writing to the client */
 void
-TunnelStateData::ReadServer(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
-{
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
-    assert (cbdataReferenceValid (tunnelState));
-
-    assert(fd == tunnelState->server.fd());
-    tunnelState->readServer(buf, len, errcode, xerrno);
-}
-
-void
-TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int xerrno)
+TunnelStateData::readServer(const CommIoCbParams &io)
 {
     /*
      * Bail out early on COMM_ERR_CLOSING
      * - close handlers will tidy up for us 
      */
 
-    if (errcode == COMM_ERR_CLOSING)
+    if (io.flag == COMM_ERR_CLOSING)
         return;
 
-    debugs(26, 3, "tunnelReadServer: FD " << server.fd() << ", read   " << len << " bytes");
+    debugs(26, 3, "tunnelReadServer: FD " << server.fd() << ", read   " << io.size << " bytes");
 
-    if (len > 0) {
-        server.bytesIn(len);
-        kb_incr(&statCounter.server.all.kbytes_in, len);
-        kb_incr(&statCounter.server.other.kbytes_in, len);
+    if (io.size > 0) {
+        server.bytesIn(io.size);
+        kb_incr(&statCounter.server.all.kbytes_in, io.size);
+        kb_incr(&statCounter.server.other.kbytes_in, io.size);
     }
 
-    copy (len, errcode, xerrno, server, client, WriteClientDone);
+    typedef CommCbMemFunT<TunnelStateData, CommIoCbParams> Dialer;
+    Dialer dialer(this, &TunnelStateData::writeClientDone);
+    AsyncCall::Pointer call = asyncCall(26, 5, "TunnelStateData::writeClientDone", dialer);    
+    copy (io.size, io.flag, io.xerrno, server, client, call);
 }
 
 void
@@ -270,38 +267,34 @@
 
 /* Read from client side and queue it for writing to the server */
 void
-TunnelStateData::ReadClient(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
+TunnelStateData::readClient(const CommIoCbParams &io)
 {
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
-    assert (cbdataReferenceValid (tunnelState));
 
-    assert(fd == tunnelState->client.fd());
-    tunnelState->readClient(buf, len, errcode, xerrno);
-}
+    Must(io.fd == client.fd());
 
-void
-TunnelStateData::readClient(char *buf, size_t len, comm_err_t errcode, int xerrno)
-{
     /*
      * Bail out early on COMM_ERR_CLOSING
      * - close handlers will tidy up for us 
      */
 
-    if (errcode == COMM_ERR_CLOSING)
+    if (io.flag == COMM_ERR_CLOSING)
         return;
 
-    debugs(26, 3, "tunnelReadClient: FD " << client.fd() << ", read " << len << " bytes");
+    debugs(26, 3, "tunnelReadClient: FD " << client.fd() << ", read " << io.size << " bytes");
 
-    if (len > 0) {
-        client.bytesIn(len);
-        kb_incr(&statCounter.client_http.kbytes_in, len);
+    if (io.size > 0) {
+        client.bytesIn(io.size);
+        kb_incr(&statCounter.client_http.kbytes_in, io.size);
     }
 
-    copy (len, errcode, xerrno, client, server, WriteServerDone);
+    typedef CommCbMemFunT<TunnelStateData, CommIoCbParams> Dialer;
+    Dialer dialer(this, &TunnelStateData::writeServerDone);
+    AsyncCall::Pointer call = asyncCall(26, 5, "TunnelStateData::writeServerDone", dialer);    
+    copy (io.size, io.flag, io.xerrno, client, server, call);
 }
 
 void
-TunnelStateData::copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *completion)
+TunnelStateData::copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, AsyncCall::Pointer completion)
 {
     /* I think this is to prevent free-while-in-a-callback behaviour
      * - RBC 20030229 
@@ -309,8 +302,12 @@
     cbdataInternalLock(this);	/* ??? should be locked by the caller... */
 
     /* Bump the server connection timeout on any activity */
-    if (server.fd() != -1)
-	commSetTimeout(server.fd(), Config.Timeout.read, tunnelTimeout, this);
+    if (server.fd() != -1) {
+	typedef CommCbMemFunT<TunnelStateData, CommTimeoutCbParams> TimeoutDialer;
+	AsyncCall::Pointer timeoutCall =  asyncCall(26, 5, "TunnelStateData::tunnelTimeout",
+                        TimeoutDialer(this, &TunnelStateData::tunnelTimeout));
+	commSetTimeout(server.fd(), Config.Timeout.read, timeoutCall);
+    }
 
     if (len < 0 || errcode)
         from.error (xerrno);
@@ -322,43 +319,35 @@
             comm_close(to.fd());
         }
     } else if (cbdataReferenceValid(this))
-        comm_write(to.fd(), from.buf, len, completion, this, NULL);
+        comm_write(to.fd(), from.buf, len, completion);
 
     cbdataInternalUnlock(this);	/* ??? */
 }
 
 /* Writes data from the client buffer to the server side */
 void
-TunnelStateData::WriteServerDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+TunnelStateData::writeServerDone(const CommIoCbParams &io)
 {
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
-    assert (cbdataReferenceValid (tunnelState));
-
-    assert(fd == tunnelState->server.fd());
-    tunnelState->writeServerDone(buf, len, flag, xerrno);
-}
+    Must(io.fd == server.fd());
 
-void
-TunnelStateData::writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno)
-{
-    debugs(26, 3, "tunnelWriteServer: FD " << server.fd() << ", " << len << " bytes written");
+    debugs(26, 3, "tunnelWriteServer: FD " << server.fd() << ", " << io.size << " bytes written");
 
     /* Error? */
-    if (len < 0 || flag != COMM_OK) {
-        server.error(xerrno); // may call comm_close
+    if (io.size < 0 || io.flag != COMM_OK) {
+        server.error(io.xerrno); // may call comm_close
         return;
     }
 
     /* EOF? */
-    if (len == 0) {
+    if (io.size == 0) {
         comm_close(server.fd());
         return;
     }
 
     /* Valid data */
-    kb_incr(&statCounter.server.all.kbytes_out, len);
-    kb_incr(&statCounter.server.other.kbytes_out, len);
-    client.dataSent(len);
+    kb_incr(&statCounter.server.all.kbytes_out, io.size);
+    kb_incr(&statCounter.server.other.kbytes_out, io.size);
+    client.dataSent(io.size);
 
     /* If the other end has closed, so should we */
     if (client.fd() == -1) {
@@ -368,27 +357,21 @@
 
     cbdataInternalLock(this);	/* ??? should be locked by the caller... */
 
-    if (cbdataReferenceValid(this))
-        copyRead(client, ReadClient);
+    if (cbdataReferenceValid(this)) {
+	typedef CommCbMemFunT<TunnelStateData, CommIoCbParams> Dialer;
+	Dialer dialer(this, &TunnelStateData::readClient);
+	AsyncCall::Pointer call = asyncCall(26, 5, "TunnelStateData::readClient", dialer);
+	copyRead(client, call);
+    }
 
     cbdataInternalUnlock(this);	/* ??? */
 }
 
 /* Writes data from the server buffer to the client side */
 void
-TunnelStateData::WriteClientDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
-{
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
-    assert (cbdataReferenceValid (tunnelState));
-
-    assert(fd == tunnelState->client.fd());
-    tunnelState->writeClientDone(buf, len, flag, xerrno);
-}
-
-void
 TunnelStateData::Connection::dataSent (size_t amount)
 {
-    assert(amount == (size_t)len);
+    Must(amount == (size_t)len);
     len =0;
     /* increment total object size */
 
@@ -397,25 +380,27 @@
 }
 
 void
-TunnelStateData::writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno)
+TunnelStateData::writeClientDone(const CommIoCbParams &io)
 {
-    debugs(26, 3, "tunnelWriteClient: FD " << client.fd() << ", " << len << " bytes written");
+    Must(io.fd == client.fd());
+
+    debugs(26, 3, "tunnelWriteClient: FD " << client.fd() << ", " << io.size << " bytes written");
 
     /* Error? */
-    if (len < 0 || flag != COMM_OK) {
-        client.error(xerrno); // may call comm_close
+    if (io.size < 0 || io.flag != COMM_OK) {
+        client.error(io.xerrno); // may call comm_close
         return;
     }
 
     /* EOF? */
-    if (len == 0) {
+    if (io.size == 0) {
         comm_close(client.fd());
         return;
     }
 
     /* Valid data */
-    kb_incr(&statCounter.client_http.kbytes_out, len);
-    server.dataSent(len);
+    kb_incr(&statCounter.client_http.kbytes_out, io.size);
+    server.dataSent(io.size);
 
     /* If the other end has closed, so should we */
     if (server.fd() == -1) {
@@ -423,25 +408,28 @@
         return;
     }
 
-    cbdataInternalLock(this);	/* ??? should be locked by the caller... */
+//    cbdataInternalLock(this);	/* ??? should be locked by the caller... */
 
-    if (cbdataReferenceValid(this))
-        copyRead(server, ReadServer);
+//    if (cbdataReferenceValid(this)){
+	typedef CommCbMemFunT<TunnelStateData, CommIoCbParams> Dialer;
+	Dialer dialer(this, &TunnelStateData::readServer);
+	AsyncCall::Pointer call = asyncCall(26, 5, "TunnelStateData::readServer", dialer);
+	copyRead(server, call);
+//    }
 
-    cbdataInternalUnlock(this);	/* ??? */
+//    cbdataInternalUnlock(this);	/* ??? */
 }
 
-static void
-tunnelTimeout(int fd, void *data)
+void
+TunnelStateData::tunnelTimeout(const CommTimeoutCbParams &params)
 {
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
-    debugs(26, 3, "tunnelTimeout: FD " << fd);
+    debugs(26, 3, "tunnelTimeout: FD " << params.fd);
     /* Temporary lock to protect our own feets (comm_close -> tunnelClientClosed -> Free) */
-    cbdataInternalLock(tunnelState);
+//    cbdataInternalLock(tunnelState);
 
-    tunnelState->client.closeIfOpen();
-    tunnelState->server.closeIfOpen();
-    cbdataInternalUnlock(tunnelState);
+    client.closeIfOpen();
+    server.closeIfOpen();
+//    cbdataInternalUnlock(tunnelState);
 }
 
 void
@@ -452,79 +440,81 @@
 }
 
 void
-TunnelStateData::copyRead(Connection &from, IOCB *completion)
+TunnelStateData::copyRead(Connection &from, AsyncCall::Pointer call)
 {
-    assert(from.len == 0);
-    comm_read(from.fd(), from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), completion, this);
+    Must(from.len == 0);
+    comm_read(from.fd(), from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), call);
 }
 
-static void
-tunnelConnectTimeout(int fd, void *data)
+void
+TunnelStateData::tunnelConnectTimeout(const CommTimeoutCbParams &params)
 {
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
-    HttpRequest *request = tunnelState->request;
     ErrorState *err = NULL;
 
-    if (tunnelState->servers->_peer)
-        hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code,
-                      tunnelState->servers->_peer->host);
+    if (servers->_peer)
+        hierarchyNote(&request->hier, servers->code,
+                      servers->_peer->host);
     else if (Config.onoff.log_ip_on_direct)
-        hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code,
-                      fd_table[tunnelState->server.fd()].ipaddr);
+        hierarchyNote(&request->hier, servers->code, fd_table[server.fd()].ipaddr);
     else
-        hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code,
-                      tunnelState->host);
+        hierarchyNote(&request->hier, servers->code, host);
 
 
     err = errorCon(ERR_CONNECT_FAIL, HTTP_SERVICE_UNAVAILABLE, request);
 
-    *tunnelState->status_ptr = HTTP_SERVICE_UNAVAILABLE;
+    *status_ptr = HTTP_SERVICE_UNAVAILABLE;
 
     err->xerrno = ETIMEDOUT;
 
-    err->port = tunnelState->port;
+    err->port = port;
 
     err->callback = tunnelErrorComplete;
 
-    err->callback_data = tunnelState;
+    err->callback_data = this;
 
-    errorSend(tunnelState->client.fd(), err);
-    comm_close(fd);
+    errorSend(client.fd(), err);
+    comm_close(params.fd);
 }
 
-static void
-tunnelConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
+void
+TunnelStateData::tunnelConnectedWriteDone(const CommIoCbParams &io)
 {
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
 
-    if (flag != COMM_OK) {
-        tunnelErrorComplete(fd, data, 0);
+    if (io.flag != COMM_OK) {
+        tunnelErrorComplete(io.fd, this, 0);
         return;
     }
 
-    if (cbdataReferenceValid(tunnelState)) {
-        tunnelState->copyRead(tunnelState->server, TunnelStateData::ReadServer);
-        tunnelState->copyRead(tunnelState->client, TunnelStateData::ReadClient);
-    }
+    typedef CommCbMemFunT<TunnelStateData, CommIoCbParams> Dialer;
+    Dialer readServerDialer(this, &TunnelStateData::readServer);
+    AsyncCall::Pointer readServerCall = asyncCall(26, 5, "TunnelStateData::readServer", readServerDialer);
+    copyRead(server, readServerCall);
+    
+    typedef CommCbMemFunT<TunnelStateData, CommIoCbParams> Dialer;
+    Dialer readClientDialer(this, &TunnelStateData::readClient);
+    AsyncCall::Pointer readClientCall = asyncCall(26, 5, "TunnelStateData::readClient", readClientDialer);
+    copyRead(client, readClientCall);
 }
 
 /*
  * handle the write completion from a proxy request to an upstream proxy
  */
-static void
-tunnelProxyConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
+void
+TunnelStateData::tunnelProxyConnectedWriteDone(const CommIoCbParams &io)
 {
-    tunnelConnectedWriteDone(fd, buf, size, flag, xerrno, data);
+    tunnelConnectedWriteDone(io);
 }
 
-static void
-tunnelConnected(int fd, void *data)
+void
+TunnelStateData::tunnelConnected(int fd)
 {
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
-    debugs(26, 3, "tunnelConnected: FD " << fd << " tunnelState=" << tunnelState);
-    *tunnelState->status_ptr = HTTP_OK;
-    comm_write(tunnelState->client.fd(), conn_established, strlen(conn_established),
-               tunnelConnectedWriteDone, tunnelState, NULL);
+    debugs(26, 3, "tunnelConnected: FD " << fd << " tunnelState=" << this);
+    *status_ptr = HTTP_OK;
+    
+    typedef CommCbMemFunT<TunnelStateData, CommIoCbParams> Dialer;
+    AsyncCall::Pointer call = asyncCall(26, 5, "TunnelStateData::tunnelConnectedWriteDone", 
+					Dialer(this, &TunnelStateData::tunnelConnectedWriteDone));
+    comm_write(client.fd(), conn_established, strlen(conn_established), call);
 }
 
 static void
@@ -545,50 +535,50 @@
 }
 
 
-static void
-tunnelConnectDone(int fdnotused, comm_err_t status, int xerrno, void *data)
+void
+TunnelStateData::tunnelConnectDone(const CommConnectCbParams &params)
 {
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
-    HttpRequest *request = tunnelState->request;
     ErrorState *err = NULL;
 
-    if (tunnelState->servers->_peer)
-        hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code,
-                      tunnelState->servers->_peer->host);
+    if (servers->_peer)
+        hierarchyNote(&request->hier, servers->code,
+                      servers->_peer->host);
     else if (Config.onoff.log_ip_on_direct)
-        hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code,
-                      fd_table[tunnelState->server.fd()].ipaddr);
+        hierarchyNote(&request->hier, servers->code,
+                      fd_table[server.fd()].ipaddr);
     else
-        hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code,
-                      tunnelState->host);
+        hierarchyNote(&request->hier, servers->code,
+                      host);
 
-    if (status == COMM_ERR_DNS) {
-        debugs(26, 4, "tunnelConnect: Unknown host: " << tunnelState->host);
+    if (params.flag == COMM_ERR_DNS) {
+        debugs(26, 4, "tunnelConnect: Unknown host: " << host);
         err = errorCon(ERR_DNS_FAIL, HTTP_NOT_FOUND, request);
-        *tunnelState->status_ptr = HTTP_NOT_FOUND;
+        *status_ptr = HTTP_NOT_FOUND;
         err->dnsserver_msg = xstrdup(dns_error_message);
         err->callback = tunnelErrorComplete;
-        err->callback_data = tunnelState;
-        errorSend(tunnelState->client.fd(), err);
-    } else if (status != COMM_OK) {
+        err->callback_data = this;
+        errorSend(client.fd(), err);
+    } else if (params.flag != COMM_OK) {
         err = errorCon(ERR_CONNECT_FAIL, HTTP_SERVICE_UNAVAILABLE, request);
-        *tunnelState->status_ptr = HTTP_SERVICE_UNAVAILABLE;
-        err->xerrno = xerrno;
-        err->port = tunnelState->port;
+        *status_ptr = HTTP_SERVICE_UNAVAILABLE;
+        err->xerrno = params.xerrno;
+        err->port = port;
         err->callback = tunnelErrorComplete;
-        err->callback_data = tunnelState;
-        errorSend(tunnelState->client.fd(), err);
+        err->callback_data = this;
+        errorSend(client.fd(), err);
     } else {
-        if (tunnelState->servers->_peer)
-            tunnelProxyConnected(tunnelState->server.fd(), tunnelState);
+        if (servers->_peer)
+            tunnelProxyConnected(server.fd());
         else {
-            tunnelConnected(tunnelState->server.fd(), tunnelState);
+            tunnelConnected(server.fd());
         }
 
-        commSetTimeout(tunnelState->server.fd(),
+	typedef CommCbMemFunT<TunnelStateData, CommTimeoutCbParams> TimeoutDialer;
+	AsyncCall::Pointer timeoutCall =  asyncCall(26, 5, "TunnelStateData::tunnelTimeout",
+						    TimeoutDialer(this, &TunnelStateData::tunnelTimeout));
+        commSetTimeout(server.fd(),
                        Config.Timeout.read,
-                       tunnelTimeout,
-                       tunnelState);
+                       timeoutCall);
     }
 }
 
@@ -662,20 +652,30 @@
     tunnelState->status_ptr = status_ptr;
     tunnelState->client.fd(fd);
     tunnelState->server.fd(sock);
-    comm_add_close_handler(tunnelState->server.fd(),
-                           tunnelServerClosed,
-                           tunnelState);
-    comm_add_close_handler(tunnelState->client.fd(),
-                           tunnelClientClosed,
-                           tunnelState);
-    commSetTimeout(tunnelState->client.fd(),
-                   Config.Timeout.lifetime,
-                   tunnelTimeout,
-                   tunnelState);
+
+    typedef CommCbMemFunT<TunnelStateData, CommCloseCbParams> closeDialer;
+    
+    AsyncCall::Pointer serverCloseCall = asyncCall(26, 5, "TunnelStateData::tunnelServerClosed",
+						   closeDialer(tunnelState, &TunnelStateData::tunnelServerClosed));
+    comm_add_close_handler(tunnelState->server.fd(), serverCloseCall);
+
+    AsyncCall::Pointer clientCloseCall = asyncCall(26, 5, "TunnelStateData::tunnelClientClosed",
+						   closeDialer(tunnelState, &TunnelStateData::tunnelClientClosed));
+    comm_add_close_handler(tunnelState->client.fd(), clientCloseCall);
+
+    typedef CommCbMemFunT<TunnelStateData, CommTimeoutCbParams> TimeoutDialer;
+    AsyncCall::Pointer timeoutCall =  asyncCall(26, 5, "TunnelStateData::tunnelTimeout",
+						TimeoutDialer(tunnelState, &TunnelStateData::tunnelTimeout));
+    commSetTimeout(tunnelState->client.fd(), 
+		   Config.Timeout.lifetime, 
+		   timeoutCall);
+
+    AsyncCall::Pointer connectTimeoutCall =  asyncCall(26, 5, "TunnelStateData::tunnelConnectTimeout",
+						       TimeoutDialer(tunnelState, &TunnelStateData::tunnelConnectTimeout));
     commSetTimeout(tunnelState->server.fd(),
-                   Config.Timeout.connect,
-                   tunnelConnectTimeout,
-                   tunnelState);
+		   Config.Timeout.connect,
+                   connectTimeoutCall);
+
     peerSelect(request,
                NULL,
                tunnelPeerSelectComplete,
@@ -687,21 +687,20 @@
     commSetSelect(tunnelState->client.fd(), COMM_SELECT_READ, NULL, NULL, 0);
 }
 
-static void
-tunnelProxyConnected(int fd, void *data)
+void
+TunnelStateData::tunnelProxyConnected(int fd)
 {
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
     HttpHeader hdr_out(hoRequest);
     Packer p;
     http_state_flags flags;
-    debugs(26, 3, "tunnelProxyConnected: FD " << fd << " tunnelState=" << tunnelState);
+    debugs(26, 3, "tunnelProxyConnected: FD " << fd << " tunnelState=" << this);
     memset(&flags, '\0', sizeof(flags));
-    flags.proxying = tunnelState->request->flags.proxying;
+    flags.proxying = request->flags.proxying;
     MemBuf mb;
     mb.init();
-    mb.Printf("CONNECT %s HTTP/1.0\r\n", tunnelState->url);
-    HttpStateData::httpBuildRequestHeader(tunnelState->request,
-                                          tunnelState->request,
+    mb.Printf("CONNECT %s HTTP/1.0\r\n", url);
+    HttpStateData::httpBuildRequestHeader(request,
+                                          request,
                                           NULL,			/* StoreEntry */
                                           &hdr_out,
                                           flags);			/* flags */
@@ -711,8 +710,16 @@
     packerClean(&p);
     mb.append("\r\n", 2);
 
-    comm_write_mbuf(tunnelState->server.fd(), &mb, tunnelProxyConnectedWriteDone, tunnelState);
-    commSetTimeout(tunnelState->server.fd(), Config.Timeout.read, tunnelTimeout, tunnelState);
+    typedef CommCbMemFunT<TunnelStateData, CommIoCbParams> WriteDialer;
+    AsyncCall::Pointer writeCall = asyncCall(26, 5, "TunnelStateData::tunnelProxyConnectedWriteDone", 
+					     WriteDialer(this, &TunnelStateData::tunnelProxyConnectedWriteDone));
+
+    comm_write_mbuf(server.fd(), &mb, writeCall);
+
+    typedef CommCbMemFunT<TunnelStateData, CommTimeoutCbParams> TimeoutDialer;
+    AsyncCall::Pointer timeoutCall =  asyncCall(26, 5, "TunnelStateData::tunnelTimeout",
+						TimeoutDialer(this, &TunnelStateData::tunnelTimeout));
+    commSetTimeout(server.fd(), Config.Timeout.read, timeoutCall);
 }
 
 static void
@@ -760,11 +767,14 @@
 
 #endif
 
+    typedef CommCbMemFunT<TunnelStateData, CommConnectCbParams> Dialer;
+    AsyncCall::Pointer call =  asyncCall(26, 5, "TunnelStateData::tunnelConnectDone",
+					 Dialer(tunnelState, &TunnelStateData::tunnelConnectDone));
+
     commConnectStart(tunnelState->server.fd(),
                      tunnelState->host,
                      tunnelState->port,
-                     tunnelConnectDone,
-                     tunnelState);
+                     call);
 }
 
 CBDATA_CLASS_INIT(TunnelStateData);
Index: squid3/src/ICAP/AsyncJob.cc
diff -u squid3/src/ICAP/AsyncJob.cc:1.6 squid3/src/ICAP/AsyncJob.cc:1.3.4.15
--- squid3/src/ICAP/AsyncJob.cc:1.6	Thu Apr 17 14:52:44 2008
+++ squid3/src/ICAP/AsyncJob.cc	Mon Apr 21 15:33:43 2008
@@ -119,6 +119,9 @@
 {
     // we must be called asynchronously and hence, the caller must lock us
     Must(cbdataReferenceValid(toCbdata()));
+    
+    if(abortOnException())
+	abort();
 
     mustStop("exception");
 }
@@ -210,12 +213,15 @@
     job->callStart(call);
 
     try {
+        WillCatchException(call.debugSection, 3, call.name);
         doDial();
+	WontCatchException();
 	}
     catch (const TextException &e) {
         debugs(call.debugSection, 3,
             HERE << call.name << " threw exception: " << e.message);
         job->callException(e);
+	WontCatchException();
     }
 
     job->callEnd(); // may delete job
Index: squid3/src/ICAP/AsyncJob.h
diff -u squid3/src/ICAP/AsyncJob.h:1.5 squid3/src/ICAP/AsyncJob.h:1.3.14.12
--- squid3/src/ICAP/AsyncJob.h:1.5	Tue Feb 26 13:50:57 2008
+++ squid3/src/ICAP/AsyncJob.h	Mon Apr 21 15:33:43 2008
@@ -65,6 +65,7 @@
     void callStart(AsyncCall &call);
     virtual void callException(const TextException &e);
     virtual void callEnd();
+    virtual bool abortOnException() {return false;};
 
 protected:
     const char *stopReason; // reason for forcing done() to be true
