--------------------- PatchSet 1604 Date: 2005/08/25 20:51:26 Author: dwsquid Branch: squid3-icap Tag: (none) Log: Added methods and code so that HTTP-side can stop reading from the network when ICAP-side buffers become full. This only works, however, if ICAP buffers are at least as large as the HTTP read buffer. Members: src/ICAPAnchor.cc:1.1.2.17->1.1.2.18 src/ICAPAnchor.h:1.1.2.5->1.1.2.6 src/ICAPClient.cc:1.1.2.4->1.1.2.5 src/ICAPClient.h:1.1.2.3->1.1.2.4 src/MsgPipe.cc:1.1.2.6->1.1.2.7 src/http.cc:1.49.2.15->1.49.2.16 src/http.h:1.11.4.6->1.11.4.7 Index: squid3/src/ICAPAnchor.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPAnchor.cc,v retrieving revision 1.1.2.17 retrieving revision 1.1.2.18 diff -u -r1.1.2.17 -r1.1.2.18 --- squid3/src/ICAPAnchor.cc 25 Aug 2005 16:39:02 -0000 1.1.2.17 +++ squid3/src/ICAPAnchor.cc 25 Aug 2005 20:51:26 -0000 1.1.2.18 @@ -48,11 +48,21 @@ void ICAPAnchor::sendMoreData(StoreIOBuffer buf) { debug(93,5)("ICAPAnchor::sendMoreData() called\n"); //buf.dump(); - virgin->data->body->append(buf.data, - XMIN(buf.length, (size_t)virgin->data->body->potentialSpaceSize())); + /* + * The caller is responsible for not giving us more data + * than will fit in body MemBuf. Caller should use + * potentialSpaceSize() to find out how much we can hold. + */ + virgin->data->body->append(buf.data, buf.length); virgin->sendSourceProgress(); } +int +ICAPAnchor::potentialSpaceSize() +{ + return (int) virgin->data->body->potentialSpaceSize(); +} + // HttpStateData says we have the entire HTTP message void ICAPAnchor::doneSending() { debug(93,5)("ICAPAnchor::doneSending() called\n"); @@ -77,8 +87,8 @@ // ICAP client needs more virgin response data void ICAPAnchor::noteSinkNeed(MsgPipe *p) { debug(93,5)("ICAPAnchor::noteSinkNeed() called\n"); - //1) tell HttpStateData to resume reading (in case it has stopped due to full buffers, etc.) - //2) check that HttpStateData is reading (or we will get stuck w/o progress) + if (virgin->data->body->potentialSpaceSize()) + httpState->icapSpaceAvailable(); } // ICAP client aborting Index: squid3/src/ICAPAnchor.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPAnchor.h,v retrieving revision 1.1.2.5 retrieving revision 1.1.2.6 diff -u -r1.1.2.5 -r1.1.2.6 --- squid3/src/ICAPAnchor.h 19 Aug 2005 21:58:38 -0000 1.1.2.5 +++ squid3/src/ICAPAnchor.h 25 Aug 2005 20:51:26 -0000 1.1.2.6 @@ -1,6 +1,6 @@ /* - * $Id: ICAPAnchor.h,v 1.1.2.5 2005/08/19 21:58:38 dwsquid Exp $ + * $Id: ICAPAnchor.h,v 1.1.2.6 2005/08/25 20:51:26 dwsquid Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -58,6 +58,7 @@ void sendMoreData(StoreIOBuffer buf); void doneSending(); void ownerAbort(); + int potentialSpaceSize(); /* how much data can we accept? */ // pipe source methods; called by ICAP while receiving the virgin message virtual void noteSinkNeed(MsgPipe *p); Index: squid3/src/ICAPClient.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPClient.cc,v retrieving revision 1.1.2.4 retrieving revision 1.1.2.5 diff -u -r1.1.2.4 -r1.1.2.5 --- squid3/src/ICAPClient.cc 24 Aug 2005 15:13:16 -0000 1.1.2.4 +++ squid3/src/ICAPClient.cc 25 Aug 2005 20:51:26 -0000 1.1.2.5 @@ -1,8 +1,16 @@ #include "squid.h" #include "ICAPXaction.h" #include "ICAPClient.h" +#include "http.h" void ICAPInitModule() { + /* + * ICAP's MsgPipe buffer needs to be at least as large + * as the HTTP read buffer. Otherwise HTTP may take + * data from the network that won't fit into the MsgPipe, + * which leads to a runtime assertion. + */ + assert(ICAPMsgPipeBufSizeMax >= SQUID_TCP_SO_RCVBUF); } void ICAPCleanModule() { Index: squid3/src/ICAPClient.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPClient.h,v retrieving revision 1.1.2.3 retrieving revision 1.1.2.4 diff -u -r1.1.2.3 -r1.1.2.4 --- squid3/src/ICAPClient.h 24 Aug 2005 18:59:05 -0000 1.1.2.3 +++ squid3/src/ICAPClient.h 25 Aug 2005 20:51:26 -0000 1.1.2.4 @@ -1,6 +1,6 @@ /* - * $Id: ICAPClient.h,v 1.1.2.3 2005/08/24 18:59:05 rousskov Exp $ + * $Id: ICAPClient.h,v 1.1.2.4 2005/08/25 20:51:26 dwsquid Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -47,6 +47,6 @@ extern void ICAPInitXaction(MsgPipe *virgin, MsgPipe *adapted); // recommended initial size and max capacity for MsgPipe buffer -enum { ICAPMsgPipeBufSizeMin = (4*1024), ICAPMsgPipeBufSizeMax = (8*1024) }; +enum { ICAPMsgPipeBufSizeMin = (4*1024), ICAPMsgPipeBufSizeMax = SQUID_TCP_SO_RCVBUF }; #endif /* SQUID_ICAPCLIENT_H */ Index: squid3/src/MsgPipe.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/MsgPipe.cc,v retrieving revision 1.1.2.6 retrieving revision 1.1.2.7 diff -u -r1.1.2.6 -r1.1.2.7 --- squid3/src/MsgPipe.cc 23 Aug 2005 23:17:53 -0000 1.1.2.6 +++ squid3/src/MsgPipe.cc 25 Aug 2005 20:51:26 -0000 1.1.2.7 @@ -78,7 +78,7 @@ const char *verb = future ? (res ? "will send " : "wont send ") : (res ? "sends " : "ignores "); - debugs(0,0, "MsgPipe " << name << "(" << this << ") " << + debugs(99,5, "MsgPipe " << name << "(" << this << ") " << verb << callName << " to the " << (destination ? destination->kind() : "destination") << "(" << destination << "); " << Index: squid3/src/http.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/http.cc,v retrieving revision 1.49.2.15 retrieving revision 1.49.2.16 diff -u -r1.49.2.15 -r1.49.2.16 --- squid3/src/http.cc 24 Aug 2005 23:34:23 -0000 1.49.2.15 +++ squid3/src/http.cc 25 Aug 2005 20:51:26 -0000 1.49.2.16 @@ -1,6 +1,6 @@ /* - * $Id: http.cc,v 1.49.2.15 2005/08/24 23:34:23 rousskov Exp $ + * $Id: http.cc,v 1.49.2.16 2005/08/25 20:51:26 dwsquid Exp $ * * DEBUG: section 11 Hypertext Transfer Protocol (HTTP) * AUTHOR: Harvest Derived @@ -747,7 +747,23 @@ */ #if ICAP_CLIENT - + /* + * One reason to start ICAP here is so that we know the size + * of the ICAP pipe buffer for the first HTTP reply read + * from the network. + */ + if (doIcap() < 0) { + /* + * XXX Maybe instead of an error page we should + * handle the reply normally (without ICAP). + */ + ErrorState *err = errorCon(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR); + err->xerrno = errno; + err->request = requestLink(orig_request); + errorAppendEntry(entry, err); + comm_close(fd); + return; + } icap->startRespMod(this, request, reply); #else @@ -1228,14 +1244,40 @@ void HttpStateData::maybeReadData() { + int read_sz = SQUID_TCP_SO_RCVBUF; +#if ICAP_CLIENT + if (icap) { + /* + * Our ICAP message pipes have a finite size limit. We + * should not read more data from the network than will fit + * into the pipe buffer. If totally full, don't register + * the read handler at all. The ICAP side will call our + * icapSpaceAvailable() method when it has free space again. + */ + int icap_space = icap->potentialSpaceSize(); + if (icap_space < read_sz) + read_sz = icap_space; + /* + * why <2? Because delayAwareRead() won't actually read if + * you ask it to read 1 byte. The delayed read request + * just gets re-queued until the client side drains, then + * the I/O thread hangs. Better to not register any read + * handler until we get a notification from ICAP that + * space is available. + */ + if (read_sz < 2) + return; + } +#endif if (flags.do_next_read) { flags.do_next_read = 0; - entry->delayAwareRead(fd, buf, SQUID_TCP_SO_RCVBUF, httpReadReply, this); + entry->delayAwareRead(fd, buf, read_sz, httpReadReply, this); } } -/* This will be called when request write is complete. Schedule read of - * reply. */ +/* + * This will be called when request write is complete. + */ void HttpStateData::SendComplete(int fd, char *bufnotused, size_t size, comm_err_t errflag, void *data) { @@ -1267,18 +1309,6 @@ return; } -#if ICAP_CLIENT - if (httpState->doIcap() < 0) { - err = errorCon(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR); - err->xerrno = errno; - err->request = requestLink(httpState->orig_request); - errorAppendEntry(entry, err); - comm_close(fd); - return; - } - -#endif - /* * Set the read timeout here because it hasn't been set yet. * We only set the read timeout after the request has been @@ -1741,9 +1771,9 @@ debug(11, 5) ("httpSendRequest: FD %d: httpState %p.\n", fd, httpState); - /* Schedule read reply. */ commSetTimeout(fd, Config.Timeout.lifetime, httpTimeout, httpState); - entry->delayAwareRead(fd, httpState->buf, SQUID_TCP_SO_RCVBUF, httpReadReply, httpState); + httpState->flags.do_next_read = 1; + httpState->maybeReadData(); if (httpState->orig_request->body_connection.getRaw() != NULL) sendHeaderDone = httpSendRequestEntity; @@ -1994,11 +2024,20 @@ return 0; } +/* + * Called by ICAPAnchor when it has space available for us. + */ +void +HttpStateData::icapSpaceAvailable() +{ + debug(11,5)("HttpStateData::icapSpaceAvailable() called\n"); + maybeReadData(); +} + void HttpStateData::takeAdaptedHeaders(HttpReply *rep) { debug(11,5)("HttpStateData::takeAdaptedHeaders() called\n"); - assert(entry->store_status == STORE_PENDING); if (!entry->isAccepting()) { debug(11,5)("\toops, entry is not Accepting!\n"); icap->ownerAbort(); @@ -2020,7 +2059,6 @@ debug(11,5)("HttpStateData::takeAdaptedBody() called\n"); debug(11,5)("\t%d bytes\n", buf->contentSize()); debug(11,5)("\t%d is current offset\n", (int)currentOffset); - assert(entry->store_status == STORE_PENDING); if (!entry->isAccepting()) { debug(11,5)("\toops, entry is not Accepting!\n"); Index: squid3/src/http.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/http.h,v retrieving revision 1.11.4.6 retrieving revision 1.11.4.7 diff -u -r1.11.4.6 -r1.11.4.7 --- squid3/src/http.h 24 Aug 2005 21:27:22 -0000 1.11.4.6 +++ squid3/src/http.h 25 Aug 2005 20:51:27 -0000 1.11.4.7 @@ -1,6 +1,6 @@ /* - * $Id: http.h,v 1.11.4.6 2005/08/24 21:27:22 dwsquid Exp $ + * $Id: http.h,v 1.11.4.7 2005/08/25 20:51:27 dwsquid Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -54,7 +54,8 @@ void takeAdaptedHeaders(HttpReply *); void takeAdaptedBody(MemBuf *); void doneAdapting(); - void HttpStateData::abortAdapting(); + void abortAdapting(); + void icapSpaceAvailable(); #endif StoreEntry *entry;