--------------------- PatchSet 4045 Date: 2007/02/14 06:33:45 Author: rousskov Branch: squid3-icap Tag: (none) Log: - WARNING: The changes below are significant and probably introduce new bugs. The code passed some Web Polygraph and manual surfing tests, with and without ICAP. Ftp-specific changes are untested, but the old code probably did not work well with ICAP either. More testing is needed. - Moved more common server-side code from http.* and ftp.* into Server.*. Most ICAP-related code is in the Server class now. The code move to the Server class and migration to BodyPipe exposed several FTP/HTTP inconsistencies and bugs. I marked those I could not fix with XXXs. - Support the new ICAPInitiator API and talk to ICAPModXact directly instead of using ICAPClient* classes, which are now gone. See the last ICAPClient* CVS log message for the rationale. - Use BodyPipe for delivering virgin reply bodies to ICAP and receiving adapted reply bodies from ICAP. Implement the BodyProducer interface. Use BodyPipe instead of BodyReader when receiving request bodies (from client side or ICAP). Implement the BodyConsumer interface. See the first BodyPipe CVS log message for the rationale. - Distinguish the end of communication with the origin server from the end of communication with ICAP. Clean them up separately when possible. Terminate when both are completed (or aborted). - Polished persistentConnStatus() to avoid calling statusIfComplete() until really necessary (and appropriate). This makes debugging easier to understand for some. Members: src/Server.cc:1.4.2.4->1.4.2.5 src/Server.h:1.1.12.4->1.1.12.5 src/ftp.cc:1.29.2.11->1.29.2.12 src/http.cc:1.49.2.60->1.49.2.61 src/http.h:1.11.4.19->1.11.4.20 Index: squid3/src/Server.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Server.cc,v retrieving revision 1.4.2.4 retrieving revision 1.4.2.5 diff -u -r1.4.2.4 -r1.4.2.5 --- squid3/src/Server.cc 26 Oct 2006 05:38:52 -0000 1.4.2.4 +++ squid3/src/Server.cc 14 Feb 2007 06:33:46 -0000 1.4.2.5 @@ -1,5 +1,5 @@ /* - * $Id: Server.cc,v 1.4.2.4 2006/10/26 05:38:52 rousskov Exp $ + * $Id: Server.cc,v 1.4.2.5 2007/02/14 06:33:46 rousskov Exp $ * * DEBUG: * AUTHOR: Duane Wessels @@ -37,18 +37,16 @@ #include "Store.h" #include "HttpRequest.h" #include "HttpReply.h" -#if ICAP_CLIENT -#include "ICAP/ICAPClientRespmodPrecache.h" -#endif +#include "errorpage.h" + -ServerStateData::ServerStateData(FwdState *theFwdState) +ServerStateData::ServerStateData(FwdState *theFwdState): requestSender(NULL) { fwd = theFwdState; entry = fwd->entry; - entry->lock() + entry->lock(); - ; request = HTTPMSGLOCK(fwd->request); } @@ -61,15 +59,238 @@ fwd = NULL; // refcounted + if (requestBodySource != NULL) + requestBodySource->clearConsumer(); + +#if ICAP_CLIENT + cleanIcap(); +#endif +} + +// called when no more server communication is expected; may quit +void +ServerStateData::serverComplete() +{ + debugs(11,5,HERE << "serverComplete " << this); + + if (!doneWithServer()) { + closeServer(); + assert(doneWithServer()); + } + + if (requestBodySource != NULL) + stopConsumingFrom(requestBodySource); + +#if ICAP_CLIENT + if (virginBodyDestination != NULL) + stopProducingFor(virginBodyDestination, true); + + if (!doneWithIcap()) + return; +#endif + + completeForwarding(); + quitIfAllDone(); +} + +// When we are done talking to the primary server, we may be still talking +// to the ICAP service. And vice versa. Here, we quit only if we are done +// talking to both. +void ServerStateData::quitIfAllDone() { +#if ICAP_CLIENT + if (!doneWithIcap()) { + debugs(11,5, HERE << "transaction not done: still talking to ICAP"); + return; + } +#endif + + if (!doneWithServer()) { + debugs(11,5, HERE << "transaction not done: still talking to server"); + return; + } + + debugs(11,3, HERE << "transaction done"); + delete this; +} + +// FTP side overloads this to work around multiple calls to fwd->complete +void +ServerStateData::completeForwarding() { + debugs(11,5, HERE << "completing forwarding for " << fwd); + assert(fwd != NULL); + fwd->complete(); +} + +// Entry-dependent callbacks use this check to quit if the entry went bad +bool +ServerStateData::abortOnBadEntry(const char *abortReason) +{ + if (entry->isAccepting()) + return false; + + debugs(11,5, HERE << "entry is not Accepting!"); + abortTransaction(abortReason); + return true; +} + +// more request or adapted response body is available +void +ServerStateData::noteMoreBodyDataAvailable(BodyPipe &bp) +{ +#if ICAP_CLIENT + if (adaptedBodySource == &bp) { + handleMoreAdaptedBodyAvailable(); + return; + } +#endif + handleMoreRequestBodyAvailable(); +} + +// the entire request or adapted response body was provided, successfully +void +ServerStateData::noteBodyProductionEnded(BodyPipe &bp) +{ #if ICAP_CLIENT - if (icap) { - debug(11,5)("ServerStateData destroying icap=%p\n", icap); - icap->ownerAbort(); - delete icap; + if (adaptedBodySource == &bp) { + handleAdaptedBodyProductionEnded(); + return; } #endif + handleRequestBodyProductionEnded(); +} + +// premature end of the request or adapted response body production +void +ServerStateData::noteBodyProducerAborted(BodyPipe &bp) +{ +#if ICAP_CLIENT + if (adaptedBodySource == &bp) { + handleAdaptedBodyProducerAborted(); + return; + } +#endif + handleRequestBodyProducerAborted(); +} + + +// more origin request body data is available +void +ServerStateData::handleMoreRequestBodyAvailable() +{ + if (!requestSender) + sendMoreRequestBody(); + else + debugs(9,3, HERE << "waiting for request body write to complete"); +} + +// there will be no more handleMoreRequestBodyAvailable calls +void +ServerStateData::handleRequestBodyProductionEnded() +{ + if (!requestSender) + doneSendingRequestBody(); + else + debugs(9,3, HERE << "waiting for request body write to complete"); +} + +// called when we are done sending request body; kids extend this +void +ServerStateData::doneSendingRequestBody() { + debugs(9,3, HERE << "done sending request body"); + assert(requestBodySource != NULL); + stopConsumingFrom(requestBodySource); + + // kids extend this +} + +// called when body producers aborts; kids extend this +void +ServerStateData::handleRequestBodyProducerAborted() +{ + if (requestSender != NULL) + debugs(9,3, HERE << "fyi: request body aborted while we were sending"); + + stopConsumingFrom(requestBodySource); // requestSender, if any, will notice + + // kids extend this +} + +void +ServerStateData::sentRequestBodyWrapper(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data) +{ + ServerStateData *server = static_cast(data); + server->sentRequestBody(fd, size, errflag); +} + +// called when we wrote request headers(!) or a part of the body +void +ServerStateData::sentRequestBody(int fd, size_t size, comm_err_t errflag) +{ + debug(11, 5) ("sentRequestBody: FD %d: size %d: errflag %d.\n", + fd, (int) size, errflag); + debugs(32,3,HERE << "sentRequestBody called"); + + requestSender = NULL; + + if (size > 0) { + fd_bytes(fd, size, FD_WRITE); + kb_incr(&statCounter.server.all.kbytes_out, size); + // kids should increment their counters + } + + if (errflag == COMM_ERR_CLOSING) + return; + + if (!requestBodySource) { + debugs(9,3, HERE << "detected while-we-were-sending abort"); + return; // do nothing; + } + + if (errflag) { + debug(11, 1) ("sentRequestBody error: FD %d: %s\n", fd, xstrerr(errno)); + ErrorState *err; + err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request); + err->xerrno = errno; + fwd->fail(err); + abortTransaction("I/O error while sending request body"); + return; + } + + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { + abortTransaction("store entry aborted while sending request body"); + return; + } + + if (requestBodySource->exhausted()) + doneSendingRequestBody(); + else + sendMoreRequestBody(); } +void +ServerStateData::sendMoreRequestBody() +{ + assert(requestBodySource != NULL); + assert(!requestSender); + MemBuf buf; + if (requestBodySource->getMoreData(buf)) { + debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes"); + requestSender = &ServerStateData::sentRequestBodyWrapper; + comm_write_mbuf(dataDescriptor(), &buf, requestSender, this); + } else { + debugs(9,3, HERE << "will wait for more request body bytes or eof"); + requestSender = NULL; + } +} + +// called by noteIcapHeadersAdapted(), HTTP server overwrites this +void +ServerStateData::haveParsedReplyHeaders() +{ + // default does nothing +} + + #if ICAP_CLIENT /* * Initiate an ICAP transaction. Return true on success. @@ -77,7 +298,7 @@ * or take other action. */ bool -ServerStateData::startIcap(ICAPServiceRep::Pointer service) +ServerStateData::startIcap(ICAPServiceRep::Pointer service, HttpRequest *cause) { debug(11,5)("ServerStateData::startIcap() called\n"); if (!service) { @@ -88,9 +309,177 @@ debug(11,3)("ServerStateData::startIcap fails: broken service\n"); return false; } - assert(NULL == icap); - icap = new ICAPClientRespmodPrecache(service); + + // check whether we should be sending a body as well + assert(!virginBodyDestination); + assert(!reply->body_pipe); + // start body pipe to feed ICAP transaction if needed + ssize_t size = 0; + if (reply->expectingBody(cause->method, size) && size) { + virginBodyDestination = new BodyPipe(this); + reply->body_pipe = virginBodyDestination; + debugs(93, 6, HERE << "will send virgin reply body to " << + virginBodyDestination << "; size: " << size); + } + + adaptedHeadSource = new ICAPModXact(this, reply, cause, service); + ICAPModXact::AsyncStart(adaptedHeadSource.getRaw()); return true; } +// properly cleans up ICAP-related state +// may be called multiple times +void ServerStateData::cleanIcap() { + debugs(11,5, HERE << "cleaning ICAP"); + + if (virginBodyDestination != NULL) + stopProducingFor(virginBodyDestination, false); + + if (adaptedHeadSource != NULL) { + AsyncCall(11,5, adaptedHeadSource.getRaw(), ICAPModXact::noteInitiatorAborted); + adaptedHeadSource = NULL; + } + + if (adaptedBodySource != NULL) + stopConsumingFrom(adaptedBodySource); + + assert(doneWithIcap()); // make sure the two methods are in sync +} + +bool +ServerStateData::doneWithIcap() const { + return !virginBodyDestination && !adaptedHeadSource && !adaptedBodySource; +} + +// can supply more virgin response body data +void +ServerStateData::noteMoreBodySpaceAvailable(BodyPipe &) +{ + maybeReadVirginBody(); +} + +// the consumer of our virgin response body aborted, we should too +void +ServerStateData::noteBodyConsumerAborted(BodyPipe &bp) +{ + stopProducingFor(virginBodyDestination, false); + handleIcapAborted(); +} + +// received adapted response headers (body may follow) +void +ServerStateData::noteIcapHeadersAdapted() +{ + // extract and lock reply before (adaptedHeadSource = NULL) can destroy it + HttpReply *rep = dynamic_cast(adaptedHeadSource->adapted.header); + HTTPMSGLOCK(rep); + adaptedHeadSource = NULL; // we do not expect any more messages from it + + if (abortOnBadEntry("entry went bad while waiting for adapted headers")) { + HTTPMSGUNLOCK(rep); // hopefully still safe, even if "this" is deleted + return; + } + + assert(rep); + entry->replaceHttpReply(rep); + HTTPMSGUNLOCK(reply); + + reply = rep; // already HTTPMSGLOCKed above + + haveParsedReplyHeaders(); + + assert(!adaptedBodySource); + if (reply->body_pipe != NULL) { + // subscribe to receive adapted body + adaptedBodySource = reply->body_pipe; + adaptedBodySource->setConsumer(this); + } else { + // no body + handleIcapCompleted(); + } + +} + +// will not receive adapted response headers (and, hence, body) +void +ServerStateData::noteIcapHeadersAborted() +{ + adaptedHeadSource = NULL; + handleIcapAborted(); +} + +// more adapted response body is available +void +ServerStateData::handleMoreAdaptedBodyAvailable() +{ + const size_t contentSize = adaptedBodySource->buf().contentSize(); + + debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " << + "response body at offset " << adaptedBodySource->consumedSize()); + + if (abortOnBadEntry("entry refuses adapted body")) + return; + + assert(entry); + BodyPipeCheckout bpc(*adaptedBodySource); + const StoreIOBuffer ioBuf(&bpc.buf, bpc.offset); + entry->write(ioBuf); + bpc.buf.consume(contentSize); + bpc.checkIn(); +} + +// the entire adapted response body was produced, successfully +void +ServerStateData::handleAdaptedBodyProductionEnded() +{ + stopConsumingFrom(adaptedBodySource); + + if (abortOnBadEntry("entry went bad while waiting for adapted body eof")) + return; + + handleIcapCompleted(); +} + +// premature end of the adapted response body +void ServerStateData::handleAdaptedBodyProducerAborted() +{ + stopConsumingFrom(adaptedBodySource); + handleIcapAborted(); +} + +// common part of noteIcapHeadersAdapted and handleAdaptedBodyProductionEnded +void +ServerStateData::handleIcapCompleted() +{ + debugs(11,5, HERE << "handleIcapCompleted"); + cleanIcap(); + completeForwarding(); + quitIfAllDone(); +} + +// common part of noteIcap*Aborted and noteBodyConsumerAborted methods +void +ServerStateData::handleIcapAborted() +{ + debugs(11,5, HERE << "handleIcapAborted; entry empty: " << entry->isEmpty()); + + if (abortOnBadEntry("entry went bad while ICAP aborted")) + return; + + if (entry->isEmpty()) { + debugs(11,9, HERE << "creating ICAP error entry after ICAP failure"); + ErrorState *err = + errorCon(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request); + err->xerrno = errno; + fwd->fail(err); + fwd->dontRetry(true); + } + + debugs(11,5, HERE << "bailing after ICAP failure"); + + cleanIcap(); + closeServer(); + quitIfAllDone(); +} + #endif Index: squid3/src/Server.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Server.h,v retrieving revision 1.1.12.4 retrieving revision 1.1.12.5 diff -u -r1.1.12.4 -r1.1.12.5 --- squid3/src/Server.h 26 Oct 2006 05:38:57 -0000 1.1.12.4 +++ squid3/src/Server.h 14 Feb 2007 06:33:46 -0000 1.1.12.5 @@ -1,6 +1,6 @@ /* - * $Id: Server.h,v 1.1.12.4 2006/10/26 05:38:57 rousskov Exp $ + * $Id: Server.h,v 1.1.12.5 2007/02/14 06:33:46 rousskov Exp $ * * AUTHOR: Duane Wessels * @@ -33,55 +33,132 @@ */ /* - * ServerStateData is a class for common elements of Server-side modules - * such as http.cc and ftp.cc. It was invented to make ICAP code simpler. + * ServerStateData is a common base for server-side classes such as + * HttpStateData and FtpStateData. All such classes must be able to + * consume request bodies from the client-side or ICAP producer, adapt + * virgin responses using ICAP, and provide the client-side consumer with + * responses. + * + * TODO: Rename to ServerStateDataInfoRecordHandler. */ + #ifndef SQUID_SERVER_H #define SQUID_SERVER_H #include "StoreIOBuffer.h" #include "forward.h" +#include "BodyPipe.h" #if ICAP_CLIENT #include "ICAP/ICAPServiceRep.h" - -class ICAPClientRespmodPrecache; +#include "ICAP/ICAPInitiator.h" +#include "ICAP/ICAPModXact.h" class ICAPAccessCheck; #endif -class ServerStateData +class ServerStateData: +#if ICAP_CLIENT + public ICAPInitiator, + public BodyProducer, +#endif + public BodyConsumer { public: ServerStateData(FwdState *); virtual ~ServerStateData(); + // returns primary or "request data connection" fd + virtual int dataDescriptor() const = 0; + + // BodyConsumer: consume request body or adapted response body. + // The implementation just calls the corresponding HTTP or ICAP handle*() + // method, depending on the pipe. + virtual void noteMoreBodyDataAvailable(BodyPipe &); + virtual void noteBodyProductionEnded(BodyPipe &); + virtual void noteBodyProducerAborted(BodyPipe &); + + // read response data from the network + virtual void maybeReadVirginBody() = 0; + + // abnormal transaction termination; reason is for debugging only + virtual void abortTransaction(const char *reason) = 0; + #if ICAP_CLIENT - virtual bool takeAdaptedHeaders(HttpReply *) = 0; - virtual bool takeAdaptedBody(MemBuf *) = 0; - virtual void finishAdapting() = 0; - virtual void abortAdapting() = 0; - virtual void icapSpaceAvailable() = 0; virtual void icapAclCheckDone(ICAPServiceRep::Pointer) = 0; + + // ICAPInitiator: start an ICAP transaction and receive adapted headers. + virtual void noteIcapHeadersAdapted(); + virtual void noteIcapHeadersAborted(); + + // BodyProducer: provide virgin response body to ICAP. + virtual void noteMoreBodySpaceAvailable(BodyPipe &); + virtual void noteBodyConsumerAborted(BodyPipe &); #endif -public: - // should be protected +public: // should be protected + void serverComplete(); // call when no server communication is expected + +protected: + // kids customize these + virtual void haveParsedReplyHeaders(); // default does nothing + virtual void completeForwarding(); // default calls fwd->complete() + + // BodyConsumer for HTTP: consume request body. + void handleMoreRequestBodyAvailable(); + void handleRequestBodyProductionEnded(); + virtual void handleRequestBodyProducerAborted() = 0; + + // sending of the request body to the server + void sendMoreRequestBody(); + // has body; kids overwrite to increment I/O stats counters + virtual void sentRequestBody(int fd, size_t size, comm_err_t errflag) = 0; + virtual void doneSendingRequestBody() = 0; + static IOCB sentRequestBodyWrapper; + + virtual void closeServer() = 0; // end communication with the server + virtual bool doneWithServer() const = 0; // did we end communication? + + // Entry-dependent callbacks use this check to quit if the entry went bad + bool abortOnBadEntry(const char *abortReason); + +#if ICAP_CLIENT + bool startIcap(ICAPServiceRep::Pointer, HttpRequest *cause); + void cleanIcap(); + virtual bool doneWithIcap() const; // did we end ICAP communication? + + // BodyConsumer for ICAP: consume adapted response body. + void handleMoreAdaptedBodyAvailable(); + void handleAdaptedBodyProductionEnded(); + void handleAdaptedBodyProducerAborted(); + + void handleIcapCompleted(); + void handleIcapAborted(); +#endif + +public: // should not be StoreEntry *entry; FwdState::Pointer fwd; HttpRequest *request; HttpReply *reply; protected: + BodyPipe::Pointer requestBodySource; // to consume request body + IOCB *requestSender; // set if we are expecting comm_write to call us back + #if ICAP_CLIENT + BodyPipe::Pointer virginBodyDestination; // to provide virgin response body + ICAPModXact::Pointer adaptedHeadSource; // to get adapted response headers + BodyPipe::Pointer adaptedBodySource; // to consume adated response body - ICAPClientRespmodPrecache *icap; bool icapAccessCheckPending; - bool startIcap(ICAPServiceRep::Pointer); #endif +private: + void quitIfAllDone(); // successful termination + }; #endif /* SQUID_SERVER_H */ Index: squid3/src/ftp.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/ftp.cc,v retrieving revision 1.29.2.11 retrieving revision 1.29.2.12 diff -u -r1.29.2.11 -r1.29.2.12 --- squid3/src/ftp.cc 25 Jan 2007 21:09:07 -0000 1.29.2.11 +++ squid3/src/ftp.cc 14 Feb 2007 06:33:45 -0000 1.29.2.12 @@ -1,6 +1,6 @@ /* - * $Id: ftp.cc,v 1.29.2.11 2007/01/25 21:09:07 rousskov Exp $ + * $Id: ftp.cc,v 1.29.2.12 2007/02/14 06:33:45 rousskov Exp $ * * DEBUG: section 9 File Transfer Protocol (FTP) * AUTHOR: Harvest Derived @@ -56,8 +56,8 @@ #include "URLScheme.h" #if ICAP_CLIENT -#include "ICAP/ICAPClientRespmodPrecache.h" #include "ICAP/ICAPConfig.h" +#include "ICAP/ICAPModXact.h" extern ICAPConfig TheICAPConfig; static void icapAclCheckDoneWrapper(ICAPServiceRep::Pointer service, void *data); #endif @@ -107,6 +107,7 @@ bool put_mkdir; bool listformat_unknown; bool listing_started; + bool completed_forwarding; }; class FtpStateData; @@ -191,6 +192,7 @@ void listingFinish(); void scheduleReadControlReply(int); void handleControlReply(); + void readStor(); char *htmlifyListEntry(const char *line); void parseListing(); void dataComplete(); @@ -200,10 +202,11 @@ void buildTitleUrl(); void writeReplyBody(const char *, int len); void printfReplyBody(const char *fmt, ...); - void maybeReadData(); - void transactionComplete(); - void transactionForwardComplete(); - void transactionAbort(); + virtual int dataDescriptor() const; + virtual void maybeReadVirginBody(); + virtual void closeServer(); + virtual void completeForwarding(); + virtual void abortTransaction(const char *reason); void processReplyBody(); void writeCommand(const char *buf); @@ -211,27 +214,27 @@ static CNCB ftpPasvCallback; static IOCB dataReadWrapper; static PF ftpDataWrite; - static IOCB ftpDataWriteCallback; static PF ftpTimeout; static IOCB ftpReadControlReply; static IOCB ftpWriteCommandCallback; static HttpReply *ftpAuthRequired(HttpRequest * request, const char *realm); - static CBCB ftpRequestBody; static wordlist *ftpParseControlReply(char *, size_t, int *, int *); -#if ICAP_CLIENT + // sending of the request body to the server + virtual void sentRequestBody(int fd, size_t size, comm_err_t errflag); + virtual void doneSendingRequestBody(); + + virtual bool doneWithServer() const; + +private: + // BodyConsumer for HTTP: consume request body. + virtual void handleRequestBodyProducerAborted(); +#if ICAP_CLIENT public: void icapAclCheckDone(ICAPServiceRep::Pointer); - virtual bool takeAdaptedHeaders(HttpReply *); - virtual bool takeAdaptedBody(MemBuf *); - virtual void finishAdapting(); - virtual void abortAdapting(); - virtual void icapSpaceAvailable(); + bool icapAccessCheckPending; -private: - void backstabAdapter(); - void endAdapting(); #endif }; @@ -450,7 +453,7 @@ safe_free(dirpath); safe_free(data.host); - /* XXX this is also set to NULL in transactionForwardComplete */ + fwd = NULL; // refcounted } @@ -1152,15 +1155,14 @@ assert(t != NULL); #if ICAP_CLIENT - - if (icap) { - if ((int)strlen(t) > icap->potentialSpaceSize()) { + if (virginBodyDestination != NULL) { + // XXX: There are other places where writeReplyBody may overflow! + if ((int)strlen(t) > virginBodyDestination->buf().potentialSpaceSize()) { debugs(0,0,HERE << "WARNING avoid overwhelming ICAP with data!"); usable = s - sbuf; break; } } - #endif writeReplyBody(t, strlen(t)); @@ -1171,6 +1173,11 @@ xfree(sbuf); } +int +FtpStateData::dataDescriptor() const { + return data.fd; +} + void FtpStateData::dataComplete() { @@ -1199,7 +1206,7 @@ } void -FtpStateData::maybeReadData() +FtpStateData::maybeReadVirginBody() { if (data.fd < 0) return; @@ -1210,14 +1217,8 @@ int read_sz = data.readBuf->spaceSize(); #if ICAP_CLIENT - - if (icap) { - int icap_space = icap->potentialSpaceSize(); - - if (icap_space < read_sz) - read_sz = icap_space; - } - + // See HttpStateData::maybeReadVirginBody() for a size-limiting piece of + // code that used to be there. Hopefully, it is not really needed. #endif debugs(11,9, HERE << "FTP may read up to " << read_sz << " bytes"); @@ -1259,7 +1260,7 @@ #endif if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { - transactionAbort(); + abortTransaction("entry aborted during dataRead"); return; } @@ -1293,7 +1294,7 @@ if (ignoreErrno(xerrno)) { commSetTimeout(fd, Config.Timeout.read, ftpTimeout, this); - maybeReadData(); + maybeReadVirginBody(); } else { if (!flags.http_header_sent && !fwd->ftpPasvFailed() && flags.pasv_supported) { fwd->dontRetry(false); /* this is a retryable error */ @@ -1339,7 +1340,7 @@ storeBufferFlush(entry); - maybeReadData(); + maybeReadVirginBody(); } /* @@ -1485,7 +1486,7 @@ entry->replaceHttpReply(reply); - transactionComplete(); + serverComplete(); return; } @@ -1710,7 +1711,7 @@ return; if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { - ftpState->transactionAbort(); + ftpState->abortTransaction("entry aborted during control reply read"); return; } @@ -1742,8 +1743,8 @@ return; } - /* XXX this may end up having to be transactionComplete() .. */ - ftpState->transactionAbort(); + /* XXX this may end up having to be serverComplete() .. */ + ftpState->abortTransaction("zero control reply read"); return; } @@ -2194,7 +2195,7 @@ */ if (!EBIT_TEST(ftpState->entry->flags, ENTRY_ABORTED)) - ftpState->transactionForwardComplete(); + ftpState->completeForwarding(); ftpSendQuit(ftpState); @@ -2475,7 +2476,7 @@ return; if (EBIT_TEST(ftpState->entry->flags, ENTRY_ABORTED)) { - ftpState->transactionAbort(); + ftpState->abortTransaction("entry aborted when accepting data conn"); return; } @@ -2567,33 +2568,38 @@ static void ftpReadStor(FtpStateData * ftpState) { - int code = ftpState->ctrl.replycode; + ftpState->readStor(); +} + +void FtpStateData::readStor() { + int code = ctrl.replycode; debug(9, 3) ("This is ftpReadStor\n"); - if (code == 125 || (code == 150 && ftpState->data.host)) { + if (code == 125 || (code == 150 && data.host)) { /* Begin data transfer */ debug(9, 3) ("ftpReadStor: starting data transfer\n"); - commSetSelect(ftpState->data.fd, - COMM_SELECT_WRITE, - FtpStateData::ftpDataWrite, - ftpState, - Config.Timeout.read); + sendMoreRequestBody(); /* * Cancel the timeout on the Control socket and * establish one on the data socket. */ - commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL); - commSetTimeout(ftpState->data.fd, Config.Timeout.read, FtpStateData::ftpTimeout, - ftpState); - ftpState->state = WRITING_DATA; + commSetTimeout(ctrl.fd, -1, NULL, NULL); + commSetTimeout(data.fd, Config.Timeout.read, FtpStateData::ftpTimeout, + this); + + // register to receive body data + assert(request->body_pipe != NULL); + request->body_pipe->setConsumer(this); + + state = WRITING_DATA; debug(9, 3) ("ftpReadStor: writing data channel\n"); } else if (code == 150) { /* Accept data channel */ debug(9, 3) ("ftpReadStor: accepting data channel\n"); - comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState); + comm_accept(data.fd, ftpAcceptDataConnection, this); } else { debug(9, 3) ("ftpReadStor: Unexpected reply code %03d\n", code); - ftpFail(ftpState); + ftpFail(this); } } @@ -2684,7 +2690,7 @@ if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ /* XXX what about Config.Timeout.read? */ - ftpState->maybeReadData(); + ftpState->maybeReadVirginBody(); ftpState->state = READING_DATA; /* * Cancel the timeout on the Control socket and establish one @@ -2729,7 +2735,7 @@ /* Begin data transfer */ debug(9, 3) ("ftpReadRetr: reading data channel\n"); /* XXX what about Config.Timeout.read? */ - ftpState->maybeReadData(); + ftpState->maybeReadVirginBody(); ftpState->state = READING_DATA; /* * Cancel the timeout on the Control socket and establish one @@ -2780,55 +2786,22 @@ } } -/* This will be called when there is data available to put */ +// premature end of the request body void -FtpStateData::ftpRequestBody(MemBuf &mb, void *data) +FtpStateData::handleRequestBodyProducerAborted() { - FtpStateData *ftpState = (FtpStateData *) data; - debugs(9, 3, HERE << "ftpRequestBody: size=" << mb.contentSize() << " ftpState=%p" << data); - - if (mb.contentSize() > 0) { - /* DataWrite */ - comm_write(ftpState->data.fd, mb.content(), mb.contentSize(), FtpStateData::ftpDataWriteCallback, ftpState, NULL); - } else if (mb.contentSize() < 0) { - /* Error */ - debug(9, 1) ("ftpRequestBody: request aborted"); - ftpState->failed(ERR_READ_ERROR, 0); - } else if (mb.contentSize() == 0) { - /* End of transfer */ - ftpState->dataComplete(); - } + ServerStateData::handleRequestBodyProducerAborted(); + debugs(9, 3, HERE << "noteBodyProducerAborted: ftpState=" << this); + failed(ERR_READ_ERROR, 0); } /* This will be called when the put write is completed */ void -FtpStateData::ftpDataWriteCallback(int fd, char *buf, size_t size, comm_err_t err, int xerrno, void *data) +FtpStateData::sentRequestBody(int fd, size_t size, comm_err_t errflag) { - FtpStateData *ftpState = (FtpStateData *) data; - - if (err == COMM_ERR_CLOSING) - return; - - if (!err) { - /* Schedule the rest of the request */ - commSetSelect(fd, - COMM_SELECT_WRITE, - ftpDataWrite, - ftpState, - Config.Timeout.read); - } else { - debug(9, 1) ("ftpDataWriteCallback: write error: %s\n", xstrerr(xerrno)); - ftpState->failed(ERR_WRITE_ERROR, xerrno); - } -} - -void -FtpStateData::ftpDataWrite(int ftp, void *data) -{ - FtpStateData *ftpState = (FtpStateData *) data; - debug(9, 3) ("ftpDataWrite\n"); - /* This starts the body transfer */ - ftpState->request->body_reader->read(ftpRequestBody, ftpState); + if (size > 0) + kb_incr(&statCounter.server.ftp.kbytes_out, size); + ServerStateData::sentRequestBody(fd, size, errflag); } static void @@ -2860,8 +2833,8 @@ static void ftpReadQuit(FtpStateData * ftpState) { - /* XXX should this just be a case of transactionAbort? */ - ftpState->transactionComplete(); + /* XXX should this just be a case of abortTransaction? */ + ftpState->serverComplete(); } static void @@ -2954,7 +2927,7 @@ if (entry->isEmpty()) failedErrorMessage(error, xerrno); - transactionComplete(); + serverComplete(); } void @@ -3257,13 +3230,17 @@ FtpStateData::writeReplyBody(const char *data, int len) { #if ICAP_CLIENT - - if (icap) { + if (virginBodyDestination != NULL) { debugs(9,5,HERE << "writing " << len << " bytes to ICAP"); - icap->sendMoreData (StoreIOBuffer(len, 0, (char*)data)); + const size_t putSize = virginBodyDestination->putMoreData(data, len); + if (putSize != (size_t)len) { + // XXX: FTP writing should be rewritten to avoid temporary buffers + // because temporary buffers cannot handle overflows. + debugs(0,0,HERE << "ICAP cannot keep up with FTP; lost " << + (len - putSize) << '/' << len << " bytes."); + } return; } - #endif debugs(9,5,HERE << "writing " << len << " bytes to StoreEntry"); @@ -3273,48 +3250,36 @@ storeAppend(entry, data, len); } -/* - * We've completed with the forwardstate - finish up if necessary. - * This is a simple hack to ensure we don't double-complete on the - * forward entry. - */ +// called after we wrote the last byte of the request body void -FtpStateData::transactionForwardComplete() +FtpStateData::doneSendingRequestBody() { - debugs(9,5,HERE << "transactionForwardComplete FD " << ctrl.fd << ", Data FD " << data.fd << ", this " << this); - if (fwd == NULL) { - fwd->complete(); - /* XXX this is also set to NULL in the destructor, but we need to do it as early as possible.. -adrian */ - fwd = NULL; // refcounted - } - + debugs(9,3,HERE << "doneSendingRequestBody"); + ftpWriteTransferDone(this); } -/* - * Quickly abort a connection. - * This will, for now, just call comm_close(). That'll unravel everything - * properly (I hope!) by using abort handlers. This all has to change soon - * enough! - */ +// a hack to ensure we do not double-complete on the forward entry. +// TODO: FtpStateData logic should probably be rewritten to avoid +// double-completion or FwdState should be rewritten to allow it. void -FtpStateData::transactionAbort() +FtpStateData::completeForwarding() { - debugs(9,5,HERE << "transactionAbort FD " << ctrl.fd << ", Data FD " << data.fd << ", this " << this); - assert(ctrl.fd != -1); + if (fwd == NULL || flags.completed_forwarding) { + debugs(9,2,HERE << "completeForwarding avoids " << + "double-complete on FD " << ctrl.fd << ", Data FD " << data.fd << + ", this " << this << ", fwd " << fwd); + return; + } - comm_close(ctrl.fd); - /* We could have had our state data freed from underneath us here.. */ + flags.completed_forwarding = true; + ServerStateData::completeForwarding(); } -/* - * Done with the FTP server, so close those sockets. May not be - * done with ICAP yet though. Don't free ftpStateData if ICAP is - * still around. - */ +// Close the FTP server connection(s). Used by serverComplete(). void -FtpStateData::transactionComplete() +FtpStateData::closeServer() { - debugs(9,5,HERE << "transactionComplete FD " << ctrl.fd << ", Data FD " << data.fd << ", this " << this); + debugs(9,5, HERE << "closing FTP server FD " << ctrl.fd << ", Data FD " << data.fd << ", this " << this); if (ctrl.fd > -1) { fwd->unregister(ctrl.fd); @@ -3327,19 +3292,27 @@ comm_close(data.fd); data.fd = -1; } +} -#if ICAP_CLIENT - - if (icap) { - icap->doneSending(); - return; - } - -#endif - - transactionForwardComplete(); +// Did we close all FTP server connection(s)? +bool +FtpStateData::doneWithServer() const +{ + return ctrl.fd < 0 && data.fd < 0; +} - ftpSocketClosed(-1, this); +// Quickly abort the transaction +// TODO: destruction should be sufficient as the destructor should cleanup, +// including canceling close handlers +void +FtpStateData::abortTransaction(const char *reason) +{ + debugs(9,5,HERE << "aborting transaction for " << reason << + "; FD " << ctrl.fd << ", Data FD " << data.fd << ", this " << this); + if (ctrl.fd >= 0) + comm_close(ctrl.fd); + else + delete this; } #if ICAP_CLIENT @@ -3351,12 +3324,13 @@ ftpState->icapAclCheckDone(service); } +// TODO: merge with http.cc and move to Server.cc? void FtpStateData::icapAclCheckDone(ICAPServiceRep::Pointer service) { icapAccessCheckPending = false; - const bool startedIcap = startIcap(service); + const bool startedIcap = startIcap(service, request); if (!startedIcap && (!service || service->bypass)) { // handle ICAP start failure when no service was selected @@ -3375,109 +3349,7 @@ return; } - icap->startRespMod(this, request, reply); processReplyBody(); } -/* - * Called by ICAPClientRespmodPrecache when it has space available for us. - */ -void -FtpStateData::icapSpaceAvailable() -{ - debug(11,5)("FtpStateData::icapSpaceAvailable() called\n"); - maybeReadData(); -} - -bool -FtpStateData::takeAdaptedHeaders(HttpReply *rep) -{ - debug(11,5)("FtpStateData::takeAdaptedHeaders() called\n"); - - if (!entry->isAccepting()) { - debug(11,5)("\toops, entry is not Accepting!\n"); - backstabAdapter(); - return false; - } - - assert (rep); - entry->replaceHttpReply(rep); - HTTPMSGUNLOCK(reply); - - reply = HTTPMSGLOCK(rep); - - debug(11,5)("FtpStateData::takeAdaptedHeaders() finished\n"); - return true; -} - -bool -FtpStateData::takeAdaptedBody(MemBuf *buf) -{ - debug(11,5)("FtpStateData::takeAdaptedBody() called\n"); - debug(11,5)("\t%d bytes\n", (int) buf->contentSize()); - - if (!entry->isAccepting()) { - debug(11,5)("\toops, entry is not Accepting!\n"); - backstabAdapter(); - return false; - } - - storeAppend(entry, buf->content(), buf->contentSize()); - buf->consume(buf->contentSize()); // consume everything written - return true; -} - -void -FtpStateData::finishAdapting() -{ - debug(11,5)("FtpStateData::doneAdapting() called\n"); - - if (!entry->isAccepting()) { - debug(11,5)("\toops, entry is not Accepting!\n"); - backstabAdapter(); - } else { - transactionForwardComplete(); - endAdapting(); - } -} - -void -FtpStateData::abortAdapting() -{ - debug(11,5)("FtpStateData::abortAdapting() called\n"); - - if (entry->isEmpty()) { - ErrorState *err; - err = errorCon(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request); - err->xerrno = errno; - fwd->fail(err); - fwd->dontRetry(true); - } - - endAdapting(); -} - -// internal helper to terminate adotation when called by the adapter -void -FtpStateData::backstabAdapter() -{ - debug(11,5)("HttpStateData::backstabAdapter() called for %p\n", icap); - assert(icap); - icap->ownerAbort(); - endAdapting(); -} - -void -FtpStateData::endAdapting() -{ - delete icap; - icap = NULL; - - if (ctrl.fd >= 0) - comm_close(ctrl.fd); - else - delete this; -} - - #endif Index: squid3/src/http.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/http.cc,v retrieving revision 1.49.2.60 retrieving revision 1.49.2.61 diff -u -r1.49.2.60 -r1.49.2.61 --- squid3/src/http.cc 26 Oct 2006 05:38:56 -0000 1.49.2.60 +++ squid3/src/http.cc 14 Feb 2007 06:33:45 -0000 1.49.2.61 @@ -1,6 +1,6 @@ /* - * $Id: http.cc,v 1.49.2.60 2006/10/26 05:38:56 rousskov Exp $ + * $Id: http.cc,v 1.49.2.61 2007/02/14 06:33:45 rousskov Exp $ * * DEBUG: section 11 Hypertext Transfer Protocol (HTTP) * AUTHOR: Harvest Derived @@ -56,7 +56,6 @@ #include "DelayPools.h" #endif #if ICAP_CLIENT -#include "ICAP/ICAPClientRespmodPrecache.h" #include "ICAP/ICAPConfig.h" extern ICAPConfig TheICAPConfig; #endif @@ -75,7 +74,8 @@ static void icapAclCheckDoneWrapper(ICAPServiceRep::Pointer service, void *data); #endif -HttpStateData::HttpStateData(FwdState *theFwdState) : ServerStateData(theFwdState) +HttpStateData::HttpStateData(FwdState *theFwdState) : ServerStateData(theFwdState), + header_bytes_read(0), reply_bytes_read(0) { debugs(11,5,HERE << "HttpStateData " << this << " created"); ignoreCacheControl = false; @@ -141,11 +141,6 @@ * don't forget that ~ServerStateData() gets called automatically */ - if (orig_request->body_reader != NULL) { - orig_request->body_reader = NULL; - debugs(32,3,HERE << "setting body_reader = NULL for request " << orig_request); - } - if (!readBuf->isNull()) readBuf->clean(); @@ -153,7 +148,12 @@ HTTPMSGUNLOCK(orig_request); - debugs(11,5,HERE << "HttpStateData " << this << " destroyed"); + debugs(11,5, HERE << "HttpStateData " << this << " destroyed; FD " << fd); +} + +int +HttpStateData::dataDescriptor() const { + return fd; } static void @@ -161,9 +161,7 @@ { HttpStateData *httpState = static_cast(data); debug(11,5)("httpStateFree: FD %d, httpState=%p\n", fd, data); - - if (httpState) - delete httpState; + delete httpState; } int @@ -389,8 +387,8 @@ * condition */ #define REFRESH_OVERRIDE(flag) \ - ((R = (R ? R : refreshLimits(entry->mem_obj->url))) , \ - (R && R->flags.flag)) + ((R = (R ? R : refreshLimits(entry->mem_obj->url))) , \ + (R && R->flags.flag)) #else #define REFRESH_OVERRIDE(flag) 0 #endif @@ -468,9 +466,9 @@ */ if (!refreshIsCachable(entry)) { - debug(22, 3) ("refreshIsCachable() returned non-cacheable..\n"); + debug(22, 3) ("refreshIsCachable() returned non-cacheable..\n"); return 0; - } + } /* don't cache objects from peers w/o LMT, Date, or Expires */ /* check that is it enough to check headers @?@ */ @@ -645,7 +643,7 @@ entry->replaceHttpReply(reply); if (eof == 1) { - transactionComplete(); + serverComplete(); } } @@ -724,7 +722,8 @@ debug(11, 9) ("GOT HTTP REPLY HDR:\n---------\n%s\n----------\n", readBuf->content()); - readBuf->consume(headersEnd(readBuf->content(), readBuf->contentSize())); + header_bytes_read = headersEnd(readBuf->content(), readBuf->contentSize()); + readBuf->consume(header_bytes_read); flags.headers_parsed = 1; @@ -757,16 +756,14 @@ haveParsedReplyHeaders(); if (eof == 1) { - transactionComplete(); + serverComplete(); } ctx_exit(ctx); } -/* - * This function used to be joined with processReplyHeader(), but - * we split it for ICAP. - */ +// Called when we parsed (and possibly adapted) the headers but +// had not starting storing (a.k.a., sending) the body yet. void HttpStateData::haveParsedReplyHeaders() { @@ -845,11 +842,11 @@ EBIT_SET(entry->flags, ENTRY_REVALIDATE); } - ctx_exit(ctx); #if HEADERS_LOG - headersLog(1, 0, request->method, getReply()); #endif + + ctx_exit(ctx); } HttpStateData::ConnectionStatus @@ -899,41 +896,36 @@ HttpStateData::ConnectionStatus HttpStateData::persistentConnStatus() const { - int clen; debug(11, 3) ("persistentConnStatus: FD %d\n", fd); - ConnectionStatus result = statusIfComplete(); debug(11, 5) ("persistentConnStatus: content_length=%d\n", reply->content_length); - /* If we haven't seen the end of reply headers, we are not done */ + /* If we haven't seen the end of reply headers, we are not done */ debug(11,5)("persistentConnStatus: flags.headers_parsed=%d\n", flags.headers_parsed); - if (!flags.headers_parsed) return INCOMPLETE_MSG; - clen = reply->bodySize(request->method); - + const int clen = reply->bodySize(request->method); debug(11,5)("persistentConnStatus: clen=%d\n", clen); - /* If there is no message body, we can be persistent */ - if (0 == clen) - return result; - /* If the body size is unknown we must wait for EOF */ if (clen < 0) return INCOMPLETE_MSG; - /* If the body size is known, we must wait until we've gotten all of it. */ - /* old technique: - * if (entry->mem_obj->endOffset() < reply->content_length + reply->hdr_sz) */ - debug(11,5)("persistentConnStatus: body_bytes_read=%d, content_length=%d\n", - body_bytes_read, reply->content_length); + /* If the body size is known, we must wait until we've gotten all of it. */ + if (clen > 0) { + // old technique: + // if (entry->mem_obj->endOffset() < reply->content_length + reply->hdr_sz) + const int body_bytes_read = reply_bytes_read - header_bytes_read; + debugs(11,5, "persistentConnStatus: body_bytes_read=" << + body_bytes_read << " content_length=" << reply->content_length); - if (body_bytes_read < reply->content_length) - return INCOMPLETE_MSG; + if (body_bytes_read < reply->content_length) + return INCOMPLETE_MSG; + } - /* We got it all */ - return result; + /* If there is no message body or we got it all, we can be persistent */ + return statusIfComplete(); } /* @@ -968,7 +960,7 @@ } if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { - maybeReadData(); + maybeReadVirginBody(); return; } @@ -979,6 +971,7 @@ if (flag == COMM_OK && len > 0) { readBuf->appended(len); + reply_bytes_read += len; #if DELAY_POOLS DelayId delayId = entry->mem_obj->mostBytesAllowed(); @@ -1011,7 +1004,7 @@ /* Continue to read... */ /* Timeout NOT increased. This whitespace was from previous reply */ flags.do_next_read = 1; - maybeReadData(); + maybeReadVirginBody(); return; } } @@ -1048,10 +1041,10 @@ * definately at EOF, so we want to process the reply * headers. */ - PROF_start(HttpStateData_processReplyHeader); + PROF_start(HttpStateData_processReplyHeader); processReplyHeader(); - PROF_stop(HttpStateData_processReplyHeader); - } + PROF_stop(HttpStateData_processReplyHeader); + } else if (getReply()->sline.status == HTTP_INVALID_HEADER && HttpVersion(0,9) != getReply()->sline.version) { fwd->fail(errorCon(ERR_INVALID_RESP, HTTP_BAD_GATEWAY, fwd->request)); flags.do_next_read = 0; @@ -1063,14 +1056,14 @@ flags.do_next_read = 0; comm_close(fd); } else { - transactionComplete(); + serverComplete(); } } } else { if (!flags.headers_parsed) { - PROF_start(HttpStateData_processReplyHeader); + PROF_start(HttpStateData_processReplyHeader); processReplyHeader(); - PROF_stop(HttpStateData_processReplyHeader); + PROF_stop(HttpStateData_processReplyHeader); if (flags.headers_parsed) { bool fail = reply == NULL; @@ -1102,19 +1095,27 @@ * which should be sent to either StoreEntry, or to ICAP... */ void -HttpStateData::writeReplyBody(const char *data, int len) +HttpStateData::writeReplyBody() { -#if ICAP_CLIENT + const char *data = readBuf->content(); + int len = readBuf->contentSize(); - if (icap) { - icap->sendMoreData (StoreIOBuffer(len, 0, (char*)data)); +#if ICAP_CLIENT + if (virginBodyDestination != NULL) { + const size_t putSize = virginBodyDestination->putMoreData(data, len); + readBuf->consume(putSize); + return; + } + // Even if we are done with sending the virgin body to ICAP, we may still + // be waiting for adapted headers. We need them before writing to store. + if (adaptedHeadSource != NULL) { + debugs(11,5, HERE << "need adapted head from " << adaptedHeadSource); return; } - #endif entry->write (StoreIOBuffer(len, currentOffset, (char*)data)); - + readBuf->consume(len); currentOffset += len; } @@ -1130,14 +1131,13 @@ { if (!flags.headers_parsed) { flags.do_next_read = 1; - maybeReadData(); + maybeReadVirginBody(); return; } #if ICAP_CLIENT if (icapAccessCheckPending) return; - #endif /* @@ -1145,11 +1145,7 @@ * That means header content has been removed from readBuf and * it contains only body data. */ - writeReplyBody(readBuf->content(), readBuf->contentSize()); - - body_bytes_read += readBuf->contentSize(); - - readBuf->consume(readBuf->contentSize()); + writeReplyBody(); if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { /* @@ -1194,33 +1190,36 @@ fd = -1; - transactionComplete(); + serverComplete(); return; case COMPLETE_NONPERSISTENT_MSG: debug(11,5)("processReplyBody: COMPLETE_NONPERSISTENT_MSG\n"); - transactionComplete(); + serverComplete(); return; } - maybeReadData(); + maybeReadVirginBody(); } void -HttpStateData::maybeReadData() +HttpStateData::maybeReadVirginBody() { int read_sz = readBuf->spaceSize(); -#if ICAP_CLIENT - if (icap) { +#if ICAP_CLIENT +#if RE_ENABLE_THIS_IF_NEEDED_OR_DELETE + // This code is not broken, but is probably not needed because we + // probably can read more than will fit into the BodyPipe buffer. + if (virginBodyDestination != NULL) { /* - * Our ICAP message pipes have a finite size limit. We + * BodyPipe buffer has a finite size limit. We * should not read more data from the network than will fit * into the pipe buffer. If totally full, don't register * the read handler at all. The ICAP side will call our * icapSpaceAvailable() method when it has free space again. */ - int icap_space = icap->potentialSpaceSize(); + int icap_space = virginBodyDestination->buf().potentialSpaceSize(); debugs(11,9, "HttpStateData may read up to min(" << icap_space << ", " << read_sz << ") bytes"); @@ -1228,10 +1227,11 @@ if (icap_space < read_sz) read_sz = icap_space; } - +#endif #endif - debugs(11,9, "HttpStateData may read up to " << read_sz << " bytes"); + debugs(11,9, HERE << (flags.do_next_read ? "may" : "wont") << + " read up to " << read_sz << " bytes from FD " << fd); /* * why <2? Because delayAwareRead() won't actually read if @@ -1295,36 +1295,23 @@ httpState->flags.request_sent = 1; } -/* - * Calling this function marks the end of the HTTP transaction. - * i.e., done talking to the HTTP server. With ICAP, however, that - * does not mean that we're done with HttpStateData and the StoreEntry. - * We'll be expecting adapted data to come back from the ICAP - * routines. - */ +// Close the HTTP server connection. Used by serverComplete(). void -HttpStateData::transactionComplete() +HttpStateData::closeServer() { - debugs(11,5,HERE << "transactionComplete FD " << fd << " this " << this); - + debugs(11,5, HERE << "closing HTTP server FD " << fd << " this " << this); if (fd >= 0) { fwd->unregister(fd); comm_remove_close_handler(fd, httpStateFree, this); comm_close(fd); fd = -1; } +} -#if ICAP_CLIENT - if (icap) { - icap->doneSending(); - return; - } - -#endif - - fwd->complete(); - - httpStateFree(-1, this); +bool +HttpStateData::doneWithServer() const +{ + return fd < 0; } /* @@ -1736,20 +1723,22 @@ HttpStateData::sendRequest() { MemBuf mb; - IOCB *sendHeaderDone; - debug(11, 5) ("httpSendRequest: FD %d: this %p.\n", fd, this); + debug(11, 5) ("httpSendRequest: FD %d, request %p, this %p.\n", fd, request, this); commSetTimeout(fd, Config.Timeout.lifetime, httpTimeout, this); flags.do_next_read = 1; - maybeReadData(); - - debugs(32,3,HERE<< "request " << request << " body_reader = " << orig_request->body_reader.getRaw()); + maybeReadVirginBody(); - if (orig_request->body_reader != NULL) - sendHeaderDone = HttpStateData::SendRequestEntityWrapper; - else - sendHeaderDone = HttpStateData::SendComplete; + if (orig_request->body_pipe != NULL) { + requestBodySource = orig_request->body_pipe; + requestBodySource->setConsumer(this); + requestSender = HttpStateData::sentRequestBodyWrapper; + debugs(32,3, HERE << "expecting request body on pipe " << requestBodySource); + } else { + assert(!requestBodySource); + requestSender = HttpStateData::SendComplete; + } if (_peer != NULL) { if (_peer->options.originserver) { @@ -1788,7 +1777,7 @@ mb.init(); buildRequestPrefix(request, orig_request, entry, &mb, flags); debug(11, 6) ("httpSendRequest: FD %d:\n%s\n", fd, mb.buf); - comm_write_mbuf(fd, &mb, sendHeaderDone, this); + comm_write_mbuf(fd, &mb, requestSender, this); } void @@ -1813,10 +1802,10 @@ } void -HttpStateData::sendRequestEntityDone() +HttpStateData::doneSendingRequestBody() { ACLChecklist ch; - debug(11, 5) ("httpSendRequestEntityDone: FD %d\n", fd); + debugs(11,5, HERE << "doneSendingRequestBody: FD " << fd); ch.request = HTTPMSGLOCK(request); if (Config.accessList.brokenPosts) @@ -1825,42 +1814,35 @@ /* cbdataReferenceDone() happens in either fastCheck() or ~ACLCheckList */ if (!Config.accessList.brokenPosts) { - debug(11, 5) ("httpSendRequestEntityDone: No brokenPosts list\n"); + debug(11, 5) ("doneSendingRequestBody: No brokenPosts list\n"); HttpStateData::SendComplete(fd, NULL, 0, COMM_OK, 0, this); } else if (!ch.fastCheck()) { - debug(11, 5) ("httpSendRequestEntityDone: didn't match brokenPosts\n"); + debug(11, 5) ("doneSendingRequestBody: didn't match brokenPosts\n"); HttpStateData::SendComplete(fd, NULL, 0, COMM_OK, 0, this); } else { - debug(11, 2) ("httpSendRequestEntityDone: matched brokenPosts\n"); + debug(11, 2) ("doneSendingRequestBody: matched brokenPosts\n"); comm_write(fd, "\r\n", 2, HttpStateData::SendComplete, this, NULL); } } -/* - * RequestBodyHandlerWrapper - * - * BodyReader calls this when it has some body data for us. - * It is of type CBCB. - */ -void -HttpStateData::RequestBodyHandlerWrapper(MemBuf &mb, void *data) -{ - HttpStateData *httpState = static_cast(data); - httpState->requestBodyHandler(mb); -} - +// more origin request body data is available void -HttpStateData::requestBodyHandler(MemBuf &mb) +HttpStateData::handleMoreRequestBodyAvailable() { if (eof || fd < 0) { + // XXX: we should check this condition in other callbacks then! + // TODO: Check whether this can actually happen: We should unsubscribe + // as a body consumer when the above condition(s) are detected. debugs(11, 1, HERE << "Transaction aborted while reading HTTP body"); return; } - if (mb.contentSize() > 0) { + assert(requestBodySource != NULL); + if (requestBodySource->buf().hasContent()) { + // XXX: why does not this trigger a debug message on every request? if (flags.headers_parsed && !flags.abuse_detected) { flags.abuse_detected = 1; - debug(11, 1) ("httpSendRequestEntryDone: Likely proxy abuse detected '%s' -> '%s'\n", + debug(11, 1) ("http handleMoreRequestBodyAvailable: Likely proxy abuse detected '%s' -> '%s'\n", inet_ntoa(orig_request->client_addr), storeUrl(entry)); @@ -1869,98 +1851,41 @@ return; } } - - /* - * mb's content will be consumed in the SendRequestEntityWrapper - * callback after comm_write is done. - */ - flags.consume_body_data = 1; - - comm_write(fd, mb.content(), mb.contentSize(), SendRequestEntityWrapper, this, NULL); - } else if (orig_request->body_reader == NULL) { - /* Failed to get whole body, probably aborted */ - SendComplete(fd, NULL, 0, COMM_ERR_CLOSING, 0, this); - } else if (orig_request->body_reader->remaining() == 0) { - /* End of body */ - sendRequestEntityDone(); - } else { - /* Failed to get whole body, probably aborted */ - SendComplete(fd, NULL, 0, COMM_ERR_CLOSING, 0, this); } + + HttpStateData::handleMoreRequestBodyAvailable(); } +// premature end of the request body void -HttpStateData::SendRequestEntityWrapper(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data) +HttpStateData::handleRequestBodyProducerAborted() { - HttpStateData *httpState = static_cast(data); - httpState->sendRequestEntity(fd, size, errflag); + ServerStateData::handleRequestBodyProducerAborted(); + // XXX: SendComplete(COMM_ERR_CLOSING) does little. Is it enough? + SendComplete(fd, NULL, 0, COMM_ERR_CLOSING, 0, this); } +// called when we wrote request headers(!) or a part of the body void -HttpStateData::sendRequestEntity(int fd, size_t size, comm_err_t errflag) +HttpStateData::sentRequestBody(int fd, size_t size, comm_err_t errflag) { - debug(11, 5) ("httpSendRequestEntity: FD %d: size %d: errflag %d.\n", - fd, (int) size, errflag); - debugs(32,3,HERE << "httpSendRequestEntity called"); - - /* - * This used to be an assertion for body_reader != NULL. - * Currently there are cases where body_reader may become NULL - * before reaching this point in the code. This can happen - * because body_reader is attached to HttpRequest and other - * modules (client_side, ICAP) have access to HttpRequest->body - * reader. An aborted transaction may cause body_reader to - * become NULL between the time sendRequestEntity was registered - * and actually called. For now we'll abort the whole transaction, - * but this should be fixed so that the client/icap/server sides - * are cleaned up independently. - */ - - if (orig_request->body_reader == NULL) { - debugs(32,1,HERE << "sendRequestEntity body_reader became NULL, aborting transaction"); - comm_close(fd); - return; - } - - if (size > 0) { - fd_bytes(fd, size, FD_WRITE); - kb_incr(&statCounter.server.all.kbytes_out, size); + if (size > 0) kb_incr(&statCounter.server.http.kbytes_out, size); + ServerStateData::sentRequestBody(fd, size, errflag); +} - if (flags.consume_body_data) { - orig_request->body_reader->consume(size); - orig_request->body_reader->bytes_read += size; - debugs(32,3," HTTP server body bytes_read=" << orig_request->body_reader->bytes_read); - } - } - - if (errflag == COMM_ERR_CLOSING) - return; - - if (errflag) { - ErrorState *err; - err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request); - err->xerrno = errno; - fwd->fail(err); - comm_close(fd); - return; - } - - if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { +// Quickly abort the transaction +// TODO: destruction should be sufficient as the destructor should cleanup, +// including canceling close handlers +void +HttpStateData::abortTransaction(const char *reason) +{ + debugs(11,5, HERE << "aborting transaction for " << reason << + "; FD " << fd << ", this " << this); + if (fd >= 0) comm_close(fd); - return; - } - - size_t r = orig_request->body_reader->remaining(); - debugs(32,3,HERE << "body remaining = " << r); - - if (r) { - debugs(32,3,HERE << "reading more body data"); - orig_request->body_reader->read(RequestBodyHandlerWrapper, this); - } else { - debugs(32,3,HERE << "done reading body data"); - sendRequestEntityDone(); - } + else + delete this; } void @@ -1984,7 +1909,7 @@ { icapAccessCheckPending = false; - const bool startedIcap = startIcap(service); + const bool startedIcap = startIcap(service, orig_request); if (!startedIcap && (!service || service->bypass)) { // handle ICAP start failure when no service was selected @@ -1995,7 +1920,7 @@ processReplyBody(); if (eof == 1) - transactionComplete(); + serverComplete(); return; } @@ -2009,120 +1934,7 @@ return; } - icap->startRespMod(this, orig_request, reply); processReplyBody(); } -/* - * Called by ICAPClientRespmodPrecache when it has space available for us. - */ -void -HttpStateData::icapSpaceAvailable() -{ - debug(11,5)("HttpStateData::icapSpaceAvailable() called\n"); - maybeReadData(); -} - -bool -HttpStateData::takeAdaptedHeaders(HttpReply *rep) -{ - debug(11,5)("HttpStateData::takeAdaptedHeaders() called\n"); - - if (!entry->isAccepting()) { - debug(11,5)("\toops, entry is not Accepting!\n"); - backstabAdapter(); - return false; - } - - assert (rep); - entry->replaceHttpReply(rep); - HTTPMSGUNLOCK(reply); - - reply = HTTPMSGLOCK(rep); - - haveParsedReplyHeaders(); - - debug(11,5)("HttpStateData::takeAdaptedHeaders() finished\n"); - return true; -} - -bool -HttpStateData::takeAdaptedBody(MemBuf *buf) -{ - debug(11,5)("HttpStateData::takeAdaptedBody() called\n"); - debug(11,5)("\t%d bytes\n", (int) buf->contentSize()); - debug(11,5)("\t%d is current offset\n", (int)currentOffset); - - if (!entry->isAccepting()) { - debug(11,5)("\toops, entry is not Accepting!\n"); - backstabAdapter(); - return false; - } - - entry->write(StoreIOBuffer(buf, currentOffset)); // write everything - currentOffset += buf->contentSize(); - buf->consume(buf->contentSize()); // consume everything written - return true; -} - -// called when ICAP adaptation is about to finish successfully, destroys icap -// must be called by the ICAP code -void -HttpStateData::finishAdapting() -{ - debug(11,5)("HttpStateData::finishAdapting() called by %p\n", icap); - - if (!entry->isAccepting()) { // XXX: do we need this check here? - debug(11,5)("\toops, entry is not Accepting!\n"); - backstabAdapter(); - } else { - fwd->complete(); - endAdapting(); - } -} - -// called when there was an ICAP error, destroys icap -// must be called by the ICAP code -void -HttpStateData::abortAdapting() -{ - debug(11,5)("HttpStateData::abortAdapting() called by %p\n", icap); - - if (entry->isEmpty()) { - ErrorState *err; - err = errorCon(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request); - err->xerrno = errno; - fwd->fail( err); - fwd->dontRetry(true); - flags.do_next_read = 0; - } - - endAdapting(); -} - -// internal helper to terminate adotation when called by the adapter -void -HttpStateData::backstabAdapter() -{ - debug(11,5)("HttpStateData::backstabAdapter() called for %p\n", icap); - assert(icap); - icap->ownerAbort(); - endAdapting(); -} - -// internal helper to delete icap and close the HTTP connection -void -HttpStateData::endAdapting() -{ - debug(11,5)("HttpStateData::endAdapting() called, deleting %p\n", icap); - - delete icap; - icap = NULL; - - if (fd >= 0) - comm_close(fd); - else - httpStateFree(fd, this); // deletes us -} - #endif Index: squid3/src/http.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/http.h,v retrieving revision 1.11.4.19 retrieving revision 1.11.4.20 diff -u -r1.11.4.19 -r1.11.4.20 --- squid3/src/http.h 7 Oct 2006 05:05:55 -0000 1.11.4.19 +++ squid3/src/http.h 14 Feb 2007 06:33:45 -0000 1.11.4.20 @@ -1,6 +1,6 @@ /* - * $Id: http.h,v 1.11.4.19 2006/10/07 05:05:55 rousskov Exp $ + * $Id: http.h,v 1.11.4.20 2007/02/14 06:33:45 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -38,13 +38,10 @@ #include "comm.h" #include "forward.h" #include "Server.h" -#include "BodyReader.h" #if ICAP_CLIENT #include "ICAP/ICAPServiceRep.h" -class ICAPClientRespmodPrecache; - class ICAPAccessCheck; #endif @@ -56,30 +53,22 @@ ~HttpStateData(); static IOCB SendComplete; - static IOCB SendRequestEntityWrapper; static IOCB ReadReplyWrapper; - static CBCB RequestBodyHandlerWrapper; static void httpBuildRequestHeader(HttpRequest * request, HttpRequest * orig_request, StoreEntry * entry, HttpHeader * hdr_out, http_state_flags flags); + + virtual int dataDescriptor() const; /* should be private */ void sendRequest(); void processReplyHeader(); void processReplyBody(); void readReply(size_t len, comm_err_t flag, int xerrno); - void maybeReadData(); + virtual void maybeReadVirginBody(); // read response data from the network int cacheableReply(); -#if ICAP_CLIENT - virtual bool takeAdaptedHeaders(HttpReply *); - virtual bool takeAdaptedBody(MemBuf *); - virtual void finishAdapting(); // deletes icap - virtual void abortAdapting(); // deletes icap - virtual void icapSpaceAvailable(); -#endif - peer *_peer; /* peer request made to */ int eof; /* reached end-of-object? */ HttpRequest *orig_request; @@ -87,13 +76,15 @@ http_state_flags flags; off_t currentOffset; size_t read_sz; - int body_bytes_read; /* to find end of response, independent of StoreEntry */ + int header_bytes_read; // to find end of response, + int reply_bytes_read; // without relying on StoreEntry MemBuf *readBuf; bool ignoreCacheControl; bool surrogateNoStore; + void processSurrogateControl(HttpReply *); -#if ICAP_CLIENT +#if ICAP_CLIENT void icapAclCheckDone(ICAPServiceRep::Pointer); bool icapAccessCheckPending; #endif @@ -121,12 +112,20 @@ void failReply (HttpReply *reply, http_status const &status); void keepaliveAccounting(HttpReply *); void checkDateSkew(HttpReply *); - void haveParsedReplyHeaders(); - void transactionComplete(); - void writeReplyBody(const char *data, int len); - void sendRequestEntityDone(); + + virtual void haveParsedReplyHeaders(); + virtual void closeServer(); // end communication with the server + virtual bool doneWithServer() const; // did we end communication? + virtual void abortTransaction(const char *reason); // abnormal termination + + // consuming request body + virtual void handleMoreRequestBodyAvailable(); + virtual void handleRequestBodyProducerAborted(); + + void writeReplyBody(); + void doneSendingRequestBody(); void requestBodyHandler(MemBuf &); - void sendRequestEntity(int fd, size_t size, comm_err_t errflag); + virtual void sentRequestBody(int fd, size_t size, comm_err_t errflag); mb_size_t buildRequestPrefix(HttpRequest * request, HttpRequest * orig_request, StoreEntry * entry, @@ -135,8 +134,6 @@ static bool decideIfWeDoRanges (HttpRequest * orig_request); #if ICAP_CLIENT - void backstabAdapter(); - void endAdapting(); #endif private: