--------------------- PatchSet 5023 Date: 2007/07/18 10:21:18 Author: hno Branch: bug1921 Tag: (none) Log: First stabs at getting some kind of common buffering logics in the server state Adds two new protected ServerState methods, hiding most of the magics void setReply(HttpReply *); void addReplyBody(const char *buf, ssize_t len); and a support method to query how much data may be stuffed efficiently size_t replyBodySpace(size_t space = 4096 * 10); it's allowable to stuff more data into the reply body, up to some reasonable limit. Members: src/Server.cc:1.12->1.12.2.1 src/Server.h:1.5->1.5.2.1 src/ftp.cc:1.78->1.78.2.1 src/http.cc:1.115->1.115.2.1 src/http.h:1.25->1.25.2.1 Index: squid3/src/Server.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Server.cc,v retrieving revision 1.12 retrieving revision 1.12.2.1 diff -u -r1.12 -r1.12.2.1 --- squid3/src/Server.cc 29 Jun 2007 08:21:21 -0000 1.12 +++ squid3/src/Server.cc 18 Jul 2007 10:21:18 -0000 1.12.2.1 @@ -1,5 +1,5 @@ /* - * $Id: Server.cc,v 1.12 2007/06/29 08:21:21 amosjeffries Exp $ + * $Id: Server.cc,v 1.12.2.1 2007/07/18 10:21:18 hno Exp $ * * DEBUG: * AUTHOR: Duane Wessels @@ -41,6 +41,8 @@ #if ICAP_CLIENT #include "ICAP/ICAPModXact.h" +#include "ICAP/ICAPConfig.h" +extern ICAPConfig TheICAPConfig; #endif ServerStateData::ServerStateData(FwdState *theFwdState): requestSender(NULL) @@ -296,6 +298,11 @@ // default does nothing } +HttpRequest * +ServerStateData::originalRequest() +{ + return request; +} #if ICAP_CLIENT /* @@ -358,6 +365,9 @@ void ServerStateData::noteMoreBodySpaceAvailable(BodyPipe &) { + if (responseBodyBuffer) { + addReplyBody(NULL, 0); // Hack to kick the buffered fragment alive again + } maybeReadVirginBody(); } @@ -484,12 +494,6 @@ abortTransaction("ICAP failure"); } -HttpRequest * -ServerStateData::originalRequest() -{ - return request; -} - void ServerStateData::icapAclCheckDone(ICAPServiceRep::Pointer service) { @@ -524,4 +528,115 @@ processReplyBody(); } +void +ServerStateData::icapAclCheckDoneWrapper(ICAPServiceRep::Pointer service, void *data) +{ + ServerStateData *state = (ServerStateData *)data; + state->icapAclCheckDone(service); +} +#endif + +void +ServerStateData::setReply(HttpReply *reply) +{ + this->reply = reply; + +#if ICAP_CLIENT + + if (TheICAPConfig.onoff) { + ICAPAccessCheck *icap_access_check = + new ICAPAccessCheck(ICAP::methodRespmod, ICAP::pointPreCache, request, reply, icapAclCheckDoneWrapper, this); + + icapAccessCheckPending = true; + icap_access_check->check(); // will eventually delete self + return; + } + +#endif + + entry->replaceHttpReply(reply); + + haveParsedReplyHeaders(); +} + +void +ServerStateData::addReplyBody(const char *data, ssize_t len) +{ + +#if ICAP_CLIENT + + if (virginBodyDestination != NULL) { + if (responseBodyBuffer) { + responseBodyBuffer->append(data, len); + data = responseBodyBuffer->content(); + len = responseBodyBuffer->contentSize(); + } + + const size_t putSize = virginBodyDestination->putMoreData(data, len); + data += putSize; + len -= putSize; + if (responseBodyBuffer) { + responseBodyBuffer->consume(putSize); + if (responseBodyBuffer->contentSize() == 0) { + delete responseBodyBuffer; + responseBodyBuffer = NULL; + } + } else if (len > 0) { + if (!responseBodyBuffer) { + responseBodyBuffer = new MemBuf; + responseBodyBuffer->init(4096, SQUID_TCP_SO_RCVBUF * 10); + } + responseBodyBuffer->append(data, len); + } + return; + } + + // Even if we are done with sending the virgin body to ICAP, we may still + // be waiting for adapted headers. We need them before writing to store. + if (adaptedHeadSource != NULL) { + debugs(11,5, HERE << "need adapted head from " << adaptedHeadSource); + return; + } + #endif + + if (!len) + return; + + entry->write (StoreIOBuffer(len, currentOffset, (char*)data)); + + currentOffset += len; +} + +size_t ServerStateData::replyBodySpace(size_t space) +{ +#if ICAP_CLIENT + if (responseBodyBuffer) { + return 0; // Stop reading if already overflowed waiting for ICAP to catch up + } + + if (virginBodyDestination != NULL) { + /* + * BodyPipe buffer has a finite size limit. We + * should not read more data from the network than will fit + * into the pipe buffer or we _lose_ what did not fit if + * the response ends sooner that BodyPipe frees up space: + * There is no code to keep pumping data into the pipe once + * response ends and serverComplete() is called. + * + * If the pipe is totally full, don't register the read handler. + * The BodyPipe will call our noteMoreBodySpaceAvailable() method + * when it has free space again. + */ + size_t icap_space = virginBodyDestination->buf().potentialSpaceSize(); + + debugs(11,9, "ServerStateData may read up to min(" << icap_space << + ", " << space << ") bytes"); + + if (icap_space < space) + space = icap_space; + } +#endif + + return space; +} Index: squid3/src/Server.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Server.h,v retrieving revision 1.5 retrieving revision 1.5.2.1 diff -u -r1.5 -r1.5.2.1 --- squid3/src/Server.h 25 Jun 2007 22:52:03 -0000 1.5 +++ squid3/src/Server.h 18 Jul 2007 10:21:18 -0000 1.5.2.1 @@ -1,6 +1,6 @@ /* - * $Id: Server.h,v 1.5 2007/06/25 22:52:03 squidadm Exp $ + * $Id: Server.h,v 1.5.2.1 2007/07/18 10:21:18 hno Exp $ * * AUTHOR: Duane Wessels * @@ -85,11 +85,13 @@ // abnormal transaction termination; reason is for debugging only virtual void abortTransaction(const char *reason) = 0; -#if ICAP_CLIENT - void icapAclCheckDone(ICAPServiceRep::Pointer); // a hack to reach HttpStateData::orignal_request virtual HttpRequest *originalRequest(); +#if ICAP_CLIENT + void icapAclCheckDone(ICAPServiceRep::Pointer); + static void icapAclCheckDoneWrapper(ICAPServiceRep::Pointer service, void *data); + // ICAPInitiator: start an ICAP transaction and receive adapted headers. virtual void noteIcapAnswer(HttpMsg *message); virtual void noteIcapQueryAbort(bool final); @@ -140,6 +142,20 @@ void handleIcapAborted(bool bypassable = false); #endif +protected: + // Kids use these to stuff data into the response instead of messing with the entry directly + void setReply(HttpReply *); + void addReplyBody(const char *buf, ssize_t len); + size_t replyBodySpace(size_t space = 4096 * 10); + + // And kids must customize these for flow control + virtual void pauseReceiver() = 0; + virtual void resumeReceiver() = 0; + + // These should be private + off_t currentOffset; // Our current offset in the StoreEntry + MemBuf *responseBodyBuffer; // Data temporarily buffered for ICAP + public: // should not be StoreEntry *entry; FwdState::Pointer fwd; Index: squid3/src/ftp.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/ftp.cc,v retrieving revision 1.78 retrieving revision 1.78.2.1 diff -u -r1.78 -r1.78.2.1 --- squid3/src/ftp.cc 28 Jun 2007 14:52:11 -0000 1.78 +++ squid3/src/ftp.cc 18 Jul 2007 10:21:19 -0000 1.78.2.1 @@ -1,6 +1,6 @@ /* - * $Id: ftp.cc,v 1.78 2007/06/28 14:52:11 squidadm Exp $ + * $Id: ftp.cc,v 1.78.2.1 2007/07/18 10:21:19 hno Exp $ * * DEBUG: section 9 File Transfer Protocol (FTP) * AUTHOR: Harvest Derived @@ -59,7 +59,6 @@ #include "ICAP/ICAPConfig.h" #include "ICAP/ICAPModXact.h" extern ICAPConfig TheICAPConfig; -static void icapAclCheckDoneWrapper(ICAPServiceRep::Pointer service, void *data); #endif static const char *const crlf = "\r\n"; @@ -231,6 +230,10 @@ private: // BodyConsumer for HTTP: consume request body. virtual void handleRequestBodyProducerAborted(); + +protected: + virtual void pauseReceiver(); + virtual void resumeReceiver(); }; CBDATA_CLASS_INIT(FtpStateData); @@ -3232,25 +3235,7 @@ if (mime_enc) reply->header.putStr(HDR_CONTENT_ENCODING, mime_enc); -#if ICAP_CLIENT - - if (TheICAPConfig.onoff) { - ICAPAccessCheck *icap_access_check = new ICAPAccessCheck(ICAP::methodRespmod, - ICAP::pointPreCache, - request, - reply, - icapAclCheckDoneWrapper, - this); - - icapAccessCheckPending = true; - icap_access_check->check(); // will eventually delete self - return; - } - -#endif - - e->replaceHttpReply(reply); - haveParsedReplyHeaders(); + setReply(reply); } void @@ -3341,25 +3326,8 @@ void FtpStateData::writeReplyBody(const char *data, int len) { -#if ICAP_CLIENT - if (virginBodyDestination != NULL) { - debugs(9,5,HERE << "writing " << len << " bytes to ICAP"); - const size_t putSize = virginBodyDestination->putMoreData(data, len); - if (putSize != (size_t)len) { - // XXX: FTP writing should be rewritten to avoid temporary buffers - // because temporary buffers cannot handle overflows. - debugs(0,0,HERE << "ICAP cannot keep up with FTP; lost " << - (len - putSize) << '/' << len << " bytes."); - } - return; - } -#endif - - debugs(9,5,HERE << "writing " << len << " bytes to StoreEntry"); - - //debugs(9,5,HERE << data); - - entry->append(data, len); + debugs(9,5,HERE << "writing " << len << " bytes to the reply"); + addReplyBody(data, len); } // called after we wrote the last byte of the request body @@ -3430,13 +3398,13 @@ delete this; } -#if ICAP_CLIENT +void +FtpStateData::pauseReceiver() +{ +} -static void -icapAclCheckDoneWrapper(ICAPServiceRep::Pointer service, void *data) +void +FtpStateData::resumeReceiver() { - FtpStateData *ftpState = (FtpStateData *)data; - ftpState->icapAclCheckDone(service); } -#endif Index: squid3/src/http.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/http.cc,v retrieving revision 1.115 retrieving revision 1.115.2.1 diff -u -r1.115 -r1.115.2.1 --- squid3/src/http.cc 26 Jun 2007 00:51:06 -0000 1.115 +++ squid3/src/http.cc 18 Jul 2007 10:21:19 -0000 1.115.2.1 @@ -1,6 +1,6 @@ /* - * $Id: http.cc,v 1.115 2007/06/26 00:51:06 squidadm Exp $ + * $Id: http.cc,v 1.115.2.1 2007/07/18 10:21:19 hno Exp $ * * DEBUG: section 11 Hypertext Transfer Protocol (HTTP) * AUTHOR: Harvest Derived @@ -55,10 +55,6 @@ #if DELAY_POOLS #include "DelayPools.h" #endif -#if ICAP_CLIENT -#include "ICAP/ICAPConfig.h" -extern ICAPConfig TheICAPConfig; -#endif #include "SquidTime.h" CBDATA_CLASS_INIT(HttpStateData); @@ -70,9 +66,6 @@ static void httpMaybeRemovePublic(StoreEntry *, http_status); static void copyOneHeaderFromClientsideRequestToUpstreamRequest(const HttpHeaderEntry *e, String strConnection, HttpRequest * request, HttpRequest * orig_request, HttpHeader * hdr_out, int we_do_ranges, http_state_flags); -#if ICAP_CLIENT -static void icapAclCheckDoneWrapper(ICAPServiceRep::Pointer service, void *data); -#endif HttpStateData::HttpStateData(FwdState *theFwdState) : ServerStateData(theFwdState), header_bytes_read(0), reply_bytes_read(0) @@ -761,25 +754,10 @@ * Parse the header and remove all referenced headers */ -#if ICAP_CLIENT - - if (TheICAPConfig.onoff) { - ICAPAccessCheck *icap_access_check = - new ICAPAccessCheck(ICAP::methodRespmod, ICAP::pointPreCache, request, reply, icapAclCheckDoneWrapper, this); - - icapAccessCheckPending = true; - icap_access_check->check(); // will eventually delete self - ctx_exit(ctx); - return; - } - -#endif - - entry->replaceHttpReply(reply); - - haveParsedReplyHeaders(); + setReply(reply); ctx_exit(ctx); + } // Called when we parsed (and possibly adapted) the headers but @@ -1118,28 +1096,9 @@ const char *data = readBuf->content(); int len = readBuf->contentSize(); -#if ICAP_CLIENT - - if (virginBodyDestination != NULL) { - const size_t putSize = virginBodyDestination->putMoreData(data, len); - readBuf->consume(putSize); - return; - } - - // Even if we are done with sending the virgin body to ICAP, we may still - // be waiting for adapted headers. We need them before writing to store. - if (adaptedHeadSource != NULL) { - debugs(11,5, HERE << "need adapted head from " << adaptedHeadSource); - return; - } - -#endif - - entry->write (StoreIOBuffer(len, currentOffset, (char*)data)); - + addReplyBody(data, len); readBuf->consume(len); - currentOffset += len; } /* @@ -1238,31 +1197,7 @@ void HttpStateData::maybeReadVirginBody() { - int read_sz = readBuf->spaceSize(); - -#if ICAP_CLIENT - if (virginBodyDestination != NULL) { - /* - * BodyPipe buffer has a finite size limit. We - * should not read more data from the network than will fit - * into the pipe buffer or we _lose_ what did not fit if - * the response ends sooner that BodyPipe frees up space: - * There is no code to keep pumping data into the pipe once - * response ends and serverComplete() is called. - * - * If the pipe is totally full, don't register the read handler. - * The BodyPipe will call our noteMoreBodySpaceAvailable() method - * when it has free space again. - */ - int icap_space = virginBodyDestination->buf().potentialSpaceSize(); - - debugs(11,9, "HttpStateData may read up to min(" << icap_space << - ", " << read_sz << ") bytes"); - - if (icap_space < read_sz) - read_sz = icap_space; - } -#endif + int read_sz = replyBodySpace(readBuf->spaceSize()); debugs(11,9, HERE << (flags.do_next_read ? "may" : "wont") << " read up to " << read_sz << " bytes from FD " << fd); @@ -1943,20 +1878,19 @@ version->minor = minor; } -#if ICAP_CLIENT +HttpRequest * +HttpStateData::originalRequest() +{ + return orig_request; +} -static void -icapAclCheckDoneWrapper(ICAPServiceRep::Pointer service, void *data) +void +HttpStateData::pauseReceiver() { - HttpStateData *http = (HttpStateData *)data; - http->icapAclCheckDone(service); } -// TODO: why does FtpStateData not need orig_request? -HttpRequest * -HttpStateData::originalRequest() +void +HttpStateData::resumeReceiver() { - return orig_request; } -#endif Index: squid3/src/http.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/http.h,v retrieving revision 1.25 retrieving revision 1.25.2.1 diff -u -r1.25 -r1.25.2.1 --- squid3/src/http.h 25 Jun 2007 22:52:04 -0000 1.25 +++ squid3/src/http.h 18 Jul 2007 10:21:20 -0000 1.25.2.1 @@ -1,6 +1,6 @@ /* - * $Id: http.h,v 1.25 2007/06/25 22:52:04 squidadm Exp $ + * $Id: http.h,v 1.25.2.1 2007/07/18 10:21:20 hno Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -74,7 +74,6 @@ HttpRequest *orig_request; int fd; http_state_flags flags; - off_t currentOffset; size_t read_sz; int header_bytes_read; // to find end of response, int reply_bytes_read; // without relying on StoreEntry @@ -94,6 +93,8 @@ #if ICAP_CLIENT virtual HttpRequest *originalRequest(); #endif + virtual void pauseReceiver(); + virtual void resumeReceiver(); private: enum ConnectionStatus {