--------------------- PatchSet 4044 Date: 2007/02/14 06:10:21 Author: rousskov Branch: squid3-icap Tag: (none) Log: - Use BodyPipe instead of MsgPipe for receiving virgin and sending adapted message bodies. BodyPipe is not much different from MsgPipeBody, but it is better to use a "universal" class that the rest of Squid code now uses. One complication is that BodyPipes are currently not created for messages with zero-size bodies. The code had to be changed to not assume that a zero-size body comes with a pipe. - Use ICAPInitiator API to send "success" or "abort" messages to ICAP transaction initiator. Store virgin and adapted metadata as public fields (if the newly added ICAPInOut type) that the initiator can access when receiving our "successful adaptation" message. This keeps messages simple. - Using ICAPInitiator API and a "universal" BodyPipe API makes it possible to exchange bodies directly with client- or server-side code without ICAPClient* translators, which are now gone along with the ICAPInitXaction function in ICAPClient. - Use asynchronous start and swan song methods that are now supported by ICAPXaction. This should make transaction initiation and completion more robust in the presense of transaction errors and exceptions. - Do not send last-chunk in ICAP Preview with a null-body. I am consulting with other ICAP folks whether this behavior is correct, but it seems logical. It is possible that the old code would send the last-chunk under some Preview conditions with null-body, but I am not sure. - Polished debugging. Members: src/ICAP/ICAPClient.cc:1.1.2.1->1.1.2.2 src/ICAP/ICAPClient.h:1.1.2.2->1.1.2.3 src/ICAP/ICAPInOut.h:1.1->1.1.2.1 src/ICAP/ICAPModXact.cc:1.1.2.19->1.1.2.20 src/ICAP/ICAPModXact.h:1.1.2.8->1.1.2.9 Index: squid3/src/ICAP/ICAPClient.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/ICAP/ICAPClient.cc,v retrieving revision 1.1.2.1 retrieving revision 1.1.2.2 diff -u -r1.1.2.1 -r1.1.2.2 --- squid3/src/ICAP/ICAPClient.cc 21 Nov 2005 21:05:51 -0000 1.1.2.1 +++ squid3/src/ICAP/ICAPClient.cc 14 Feb 2007 06:10:22 -0000 1.1.2.2 @@ -1,37 +1,11 @@ #include "squid.h" -#include "ICAPModXact.h" #include "ICAPClient.h" -#include "http.h" void ICAPInitModule() { - /* - * ICAP's MsgPipe buffer needs to be at least as large - * as the HTTP read buffer. Otherwise HTTP may take - * data from the network that won't fit into the MsgPipe, - * which leads to a runtime assertion. - */ - assert(ICAP::MsgPipeBufSizeMax >= SQUID_TCP_SO_RCVBUF); + debugs(93,2, "ICAP Client module enabled."); } void ICAPCleanModule() -{} - -// initialize ICAP-specific ends of message pipes -void ICAPInitXaction(ICAPServiceRep::Pointer service, MsgPipe::Pointer virgin, MsgPipe::Pointer adapted) -{ - ICAPModXact::Pointer x = new ICAPModXact; - debugs(93,5, "ICAPInitXaction: " << x.getRaw()); - x->init(service, virgin, adapted, x); - // if we want to do something to the transaction after it is done, - // we need to keep a pointer to it -} - -// declared in ICAPModXact.h (ick?) -void ICAPNoteXactionDone(ICAPModXact::Pointer x) { - // nothing to be done here? - // refcounting will delete the transaction - // as soon as the last pointer to it is gone - debugs(93,5, "ICAPNoteXactionDone: " << x.getRaw()); } Index: squid3/src/ICAP/ICAPClient.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/ICAP/ICAPClient.h,v retrieving revision 1.1.2.2 retrieving revision 1.1.2.3 diff -u -r1.1.2.2 -r1.1.2.3 --- squid3/src/ICAP/ICAPClient.h 29 Sep 2006 23:27:15 -0000 1.1.2.2 +++ squid3/src/ICAP/ICAPClient.h 14 Feb 2007 06:10:22 -0000 1.1.2.3 @@ -1,6 +1,6 @@ /* - * $Id: ICAPClient.h,v 1.1.2.2 2006/09/29 23:27:15 dwsquid Exp $ + * $Id: ICAPClient.h,v 1.1.2.3 2007/02/14 06:10:22 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -34,17 +34,9 @@ #ifndef SQUID_ICAPCLIENT_H #define SQUID_ICAPCLIENT_H -#include "MsgPipe.h" // TODO: move; needed for ICAPInitXaction() -#include "ICAPServiceRep.h" // TODO: move; needed for ICAPInitXaction() - // ICAP-related things needed by code unaware of ICAP internals. extern void ICAPInitModule(); extern void ICAPCleanModule(); -// let ICAP initialize ICAP-specific ends of message pipes - -class MsgPipe; -extern void ICAPInitXaction(ICAPServiceRep::Pointer, MsgPipe::Pointer virgin, MsgPipe::Pointer adapted); - #endif /* SQUID_ICAPCLIENT_H */ --- /dev/null Wed Feb 14 13:37:19 2007 +++ squid3/src/ICAP/ICAPInOut.h Wed Feb 14 13:38:37 2007 @@ -0,0 +1,90 @@ + +/* + * $Id: ICAPInOut.h,v 1.1.2.1 2007/02/14 06:10:22 rousskov Exp $ + * + * + * SQUID Web Proxy Cache http://www.squid-cache.org/ + * ---------------------------------------------------------- + * + * Squid is the result of efforts by numerous individuals from + * the Internet community; see the CONTRIBUTORS file for full + * details. Many organizations have provided support for Squid's + * development; see the SPONSORS file for full details. Squid is + * Copyrighted (C) 2001 by the Regents of the University of + * California; see the COPYRIGHT file for full details. Squid + * incorporates software developed and/or copyrighted by other + * sources; see the CREDITS file for full details. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. + * + */ + +#ifndef SQUID_ICAPINOUT_H +#define SQUID_ICAPINOUT_H + +#include "HttpMsg.h" +#include "HttpRequest.h" +#include "HttpReply.h" + +// IcapInOut manages a pointer to the HTTP message being worked on. +// For HTTP responses, request header information is also available +// as the "cause". ICAP transactions use this class to store virgin +// and adapted HTTP messages. + +class ICAPInOut +{ + +public: + typedef HttpMsg Header; + + ICAPInOut(): header(0), cause(0) {} + + ~ICAPInOut() + { + HTTPMSGUNLOCK(cause); + HTTPMSGUNLOCK(header); + } + + void setCause(HttpRequest *r) + { + if (r) { + HTTPMSGUNLOCK(cause); + cause = HTTPMSGLOCK(r); + } else { + assert(!cause); + } + } + + void setHeader(Header *h) + { + HTTPMSGUNLOCK(header); + header = HTTPMSGLOCK(h); + body_pipe = header->body_pipe; + } + +public: + // virgin or adapted message being worked on + Header *header; // parsed HTTP status/request line and headers + + // HTTP request header for HTTP responses (the cause of the response) + HttpRequest *cause; + + // Copy of header->body_pipe, in case somebody moves the original. + BodyPipe::Pointer body_pipe; +}; + +// TODO: s/Header/Message/i ? + +#endif /* SQUID_ICAPINOUT_H */ Index: squid3/src/ICAP/ICAPModXact.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/ICAP/ICAPModXact.cc,v retrieving revision 1.1.2.19 retrieving revision 1.1.2.20 diff -u -r1.1.2.19 -r1.1.2.20 --- squid3/src/ICAP/ICAPModXact.cc 14 Dec 2006 05:17:53 -0000 1.1.2.19 +++ squid3/src/ICAP/ICAPModXact.cc 14 Feb 2007 06:10:21 -0000 1.1.2.20 @@ -4,11 +4,11 @@ #include "squid.h" #include "comm.h" -#include "MsgPipe.h" -#include "MsgPipeData.h" +#include "HttpMsg.h" #include "HttpRequest.h" #include "HttpReply.h" #include "ICAPServiceRep.h" +#include "ICAPInitiator.h" #include "ICAPModXact.h" #include "ICAPClient.h" #include "ChunkedCodingParser.h" @@ -21,14 +21,11 @@ // HTTP| --> receive --> encode --> write --> |network // end | <-- send <-- parse <-- read <-- |end -// TODO: doneSending()/doneReceving() data members should probably be in sync -// with this->adapted/virgin pointers. Make adapted/virgin methods? - // TODO: replace gotEncapsulated() with something faster; we call it often CBDATA_CLASS_INIT(ICAPModXact); -static const size_t TheBackupLimit = ICAP::MsgPipeBufSizeMax; +static const size_t TheBackupLimit = BodyPipe::MaxCapacity; extern ICAPConfig TheICAPConfig; @@ -38,34 +35,22 @@ memset(this, sizeof(*this), 0); } -ICAPModXact::ICAPModXact(): ICAPXaction("ICAPModXact"), - self(NULL), virgin(NULL), adapted(NULL), - icapReply(NULL), virginConsumed(0), - bodyParser(NULL) -{} - -void ICAPModXact::init(ICAPServiceRep::Pointer &aService, MsgPipe::Pointer &aVirgin, MsgPipe::Pointer &anAdapted, Pointer &aSelf) +ICAPModXact::ICAPModXact(ICAPInitiator *anInitiator, HttpMsg *virginHeader, + HttpRequest *virginCause, ICAPServiceRep::Pointer &aService): + ICAPXaction("ICAPModXact"), + initiator(cbdataReference(anInitiator)), + icapReply(NULL), + virginConsumed(0), + bodyParser(NULL) { - assert(!self.getRaw() && !virgin.getRaw() && !adapted.getRaw()); - assert(aSelf.getRaw() && aVirgin.getRaw() && anAdapted.getRaw()); + assert(virginHeader); - self = aSelf; service(aService); - virgin = aVirgin; - adapted = anAdapted; + virgin.setHeader(virginHeader); // sets virgin.body_pipe if needed + virgin.setCause(virginCause); // may be NULL - // receiving end - virgin->sink = this; // should be 'self' and refcounted - // virgin pipe data is initiated by the source - - // sending end - adapted->source = this; // should be 'self' and refcounted - adapted->data = new MsgPipeData; - - adapted->data->body = new MemBuf; // XXX: make body a non-pointer? - adapted->data->body->init(ICAP::MsgPipeBufSizeMin, ICAP::MsgPipeBufSizeMax); - // headers are initialized when we parse them + // adapted header and body are initialized when we parse them // writing and reading ends are handled by ICAPXaction @@ -76,16 +61,15 @@ icapReply = new HttpReply; icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class? - // XXX: make sure stop() cleans all buffers + debugs(93,7, "ICAPModXact initialized." << status()); } -// HTTP side starts sending virgin data -void ICAPModXact::noteSourceStart(MsgPipe *p) +// initiator wants us to start +void ICAPModXact::start() { - ICAPXaction_Enter(noteSourceStart); + ICAPXaction_Enter(start); - // make sure TheBackupLimit is in-sync with the buffer size - Must(TheBackupLimit <= static_cast(virgin->data->body->max_capacity)); + ICAPXaction::start(); estimateVirginBody(); // before virgin disappears! @@ -99,6 +83,7 @@ // XXX: but this has to be here to catch other errors. Thus, if // commConnectStart in startWriting fails, we may get here //_after_ the object got destroyed. Somebody please fix commConnectStart! + // XXX: Is the above comment still valid? ICAPXaction_Exit(); } @@ -113,7 +98,7 @@ void ICAPModXact::waitForService() { Must(!state.serviceWaiting); - debugs(93, 7, "ICAPModXact will wait for the ICAP service " << status()); + debugs(93, 7, "ICAPModXact will wait for the ICAP service" << status()); state.serviceWaiting = true; service().callWhenReady(&ICAPModXact_noteServiceReady, this); } @@ -151,13 +136,12 @@ requestBuf.init(); makeRequestHeaders(requestBuf); - debugs(93, 9, "ICAPModXact ICAP status " << status() << " will write:\n" << + debugs(93, 9, "ICAPModXact ICAP will write" << status() << ":\n" << (requestBuf.terminate(), requestBuf.content())); // write headers state.writing = State::writingHeaders; scheduleWrite(requestBuf); - virgin->sendSinkNeed(); } void ICAPModXact::handleCommWrote(size_t sz) @@ -174,18 +158,24 @@ { Must(state.writing == State::writingHeaders); - if (virginBody.expected()) { - state.writing = preview.enabled() ? - State::writingPreview : State::writingPrime; - virginWriteClaim.protectAll(); - writeMore(); - } else { + // determine next step + if (preview.enabled()) + state.writing = preview.done() ? State::writingPaused : State::writingPreview; + else + if (virginBody.expected()) + state.writing = State::writingPrime; + else { stopWriting(true); + return; } + + writeMore(); } void ICAPModXact::writeMore() { + debugs(93, 5, HERE << "checking whether to write more" << status()); + if (writer) // already writing something return; @@ -208,7 +198,7 @@ return; case State::writingPreview: - writePriviewBody(); + writePreviewBody(); return; case State::writingPrime: @@ -220,19 +210,21 @@ } } -void ICAPModXact::writePriviewBody() +void ICAPModXact::writePreviewBody() { - debugs(93, 8, "ICAPModXact will write Preview body " << status()); + debugs(93, 8, HERE << "will write Preview body from " << + virgin.body_pipe << status()); Must(state.writing == State::writingPreview); + Must(virgin.body_pipe != NULL); - MsgPipeData::Body *body = virgin->data->body; - const size_t size = XMIN(preview.debt(), (size_t)body->contentSize()); + const size_t sizeMax = (size_t)virgin.body_pipe->buf().contentSize(); + const size_t size = XMIN(preview.debt(), sizeMax); writeSomeBody("preview body", size); // change state once preview is written if (preview.done()) { - debugs(93, 7, "ICAPModXact wrote entire Preview body " << status()); + debugs(93, 7, "ICAPModXact wrote entire Preview body" << status()); if (preview.ieof()) stopWriting(true); @@ -246,12 +238,11 @@ Must(state.writing == State::writingPrime); Must(virginWriteClaim.active()); - MsgPipeData::Body *body = virgin->data->body; - const size_t size = body->contentSize(); + const size_t size = (size_t)virgin.body_pipe->buf().contentSize(); writeSomeBody("prime virgin body", size); - if (state.doneReceiving && claimSize(virginWriteClaim) <= 0) { - debugs(93, 5, HERE << "state.doneReceiving is set and wrote all"); + if (doneWithClaim(virginWriteClaim)) { + debugs(93, 5, HERE << "wrote entire body"); stopWriting(true); } } @@ -259,6 +250,7 @@ void ICAPModXact::writeSomeBody(const char *label, size_t size) { Must(!writer && state.writing < state.writingAlmostDone); + Must(virgin.body_pipe != NULL); debugs(93, 8, HERE << "will write up to " << size << " bytes of " << label); @@ -276,13 +268,13 @@ debugs(93, 7, "ICAPModXact has no writable " << label << " content"); } - moveRequestChunk(writeBuf, chunkSize); + moveRequestChunk(writeBuf, chunkSize); // even if chunkSize == 0 const bool lastChunk = (state.writing == State::writingPreview && preview.done()) || - (state.doneReceiving && claimSize(virginWriteClaim) <= 0); + doneWithClaim(virginWriteClaim); - if (lastChunk && virginBody.expected()) { + if (lastChunk) { debugs(93, 8, HERE << "will write last-chunk of " << label); addLastRequestChunk(writeBuf); } @@ -309,9 +301,10 @@ } if (state.writing == State::writingPreview) { - // even if we are doneReceiving, we may not have written everything - const bool wroteEof = state.doneReceiving && - claimSize(virginWriteClaim) <= 0; + // if there is no active virginWriteClaim, + // then there is no body left to write, + // so we must have written everything + const bool wroteEof = !virginWriteClaim.active(); preview.wrote(chunkSize, wroteEof); // even if wrote nothing } } @@ -333,11 +326,21 @@ buf.append(ICAP::crlf, 2); // chunk-terminating CRLF } +// can we stop the activity (i.e., writing or sending) protected by the claim? +bool ICAPModXact::doneWithClaim(const MemBufClaim &claim) const +{ + // handled all (assuming the claim was originally enabled) OR + // handled everything we will ever had + return + !claim.active() || + (virgin.body_pipe->productionEnded() && claimSize(claim) <= 0); +} + size_t ICAPModXact::claimSize(const MemBufClaim &claim) const { Must(claim.active()); const size_t start = claim.offset(); - const size_t end = virginConsumed + virgin->data->body->contentSize(); + const size_t end = virginConsumed + virgin.body_pipe->buf().contentSize(); Must(virginConsumed <= start && start <= end); return end - start; } @@ -347,13 +350,16 @@ Must(claim.active()); const size_t start = claim.offset(); Must(virginConsumed <= start); - return virgin->data->body->content() + (start - virginConsumed); + return virgin.body_pipe->buf().content() + (start-virginConsumed); } void ICAPModXact::virginConsume() { - MemBuf &buf = *virgin->data->body; - const size_t have = static_cast(buf.contentSize()); + if (!virgin.body_pipe) + return; + + BodyPipe &bp = *virgin.body_pipe; + const size_t have = static_cast(bp.buf().contentSize()); const size_t end = virginConsumed + have; size_t offset = end; @@ -368,11 +374,8 @@ if (const size_t size = offset - virginConsumed) { debugs(93, 8, HERE << "consuming " << size << " out of " << have << " virgin body bytes"); - buf.consume(size); + bp.consume(size); virginConsumed += size; - - if (!state.doneReceiving) - virgin->sendSinkNeed(); } } @@ -391,26 +394,28 @@ if (writer) { if (nicely) { - debugs(93, 7, HERE << "will wait for the last write " << status()); + debugs(93, 7, HERE << "will wait for the last write" << status()); state.writing = State::writingAlmostDone; // may already be set + checkConsuming(); return; } - debugs(93, 2, HERE << "will NOT wait for the last write " << status()); + debugs(93, 2, HERE << "will NOT wait for the last write" << status()); // Comm does not have an interface to clear the writer callback nicely, // but without clearing the writer we cannot recycle the connection. // We prevent connection reuse and hope that we can handle a callback - // call at any time. Somebody should either fix this code or add - // comm_remove_write_handler() to comm API. + // call at any time, usually in the middle of the destruction sequence! + // Somebody should add comm_remove_write_handler() to comm API. reuseConnection = false; } - debugs(93, 7, HERE << "will no longer write " << status()); + debugs(93, 7, HERE << "will no longer write" << status()); state.writing = State::writingReallyDone; - virginWriteClaim.disable(); - - virginConsume(); + if (virginWriteClaim.active()) { + virginWriteClaim.disable(); + virginConsume(); + } } void ICAPModXact::stopBackup() @@ -418,17 +423,15 @@ if (!virginSendClaim.active()) return; - debugs(93, 7, "ICAPModXact will no longer backup " << status()); - + debugs(93, 7, "ICAPModXact will no longer backup" << status()); virginSendClaim.disable(); - virginConsume(); } bool ICAPModXact::doneAll() const { return ICAPXaction::doneAll() && !state.serviceWaiting && - state.doneReceiving && doneSending() && + doneSending() && doneReading() && state.doneWriting(); } @@ -436,9 +439,8 @@ { Must(connection >= 0); Must(!reader); - Must(adapted.getRaw()); - Must(adapted->data); - Must(adapted->data->body); + Must(!adapted.header); + Must(!adapted.body_pipe); // we use the same buffer for headers and body and then consume headers readMore(); @@ -452,8 +454,9 @@ } // do not fill readBuf if we have no space to store the result - if (!adapted->data->body->hasPotentialSpace()) { - debugs(93,3,HERE << "not reading because ICAP reply buffer is full"); + if (adapted.body_pipe != NULL && + !adapted.body_pipe->buf().hasPotentialSpace()) { + debugs(93,3,HERE << "not reading because ICAP reply pipe is full"); return; } @@ -474,37 +477,34 @@ void ICAPModXact::echoMore() { Must(state.sending == State::sendingVirgin); + Must(adapted.body_pipe != NULL); Must(virginSendClaim.active()); - MemBuf &from = *virgin->data->body; - MemBuf &to = *adapted->data->body; - const size_t sizeMax = claimSize(virginSendClaim); - const size_t size = XMIN(static_cast(to.potentialSpaceSize()), - sizeMax); - debugs(93, 5, "ICAPModXact echos " << size << " out of " << sizeMax << - " bytes"); + debugs(93,5, HERE << "will echo up to " << sizeMax << " bytes from " << + virgin.body_pipe->status() << " to " << adapted.body_pipe->status()); - if (size > 0) { - to.append(claimContent(virginSendClaim), size); + if (sizeMax > 0) { + const size_t size = adapted.body_pipe->putMoreData(claimContent(virginSendClaim), sizeMax); + debugs(93,5, HERE << "echoed " << size << " out of " << sizeMax << + " bytes"); virginSendClaim.release(size); virginConsume(); - adapted->sendSourceProgress(); } - if (state.doneReceiving && claimSize(virginSendClaim) <= 0) { - debugs(93, 5, "ICAPModXact echoed all " << status()); + if (doneWithClaim(virginSendClaim)) { + debugs(93, 5, "ICAPModXact echoed all" << status()); stopSending(true); } else { - debugs(93, 5, "ICAPModXact has " << from.contentSize() << " bytes " << - "and expects more to echo " << status()); - virgin->sendSinkNeed(); // TODO: timeout if sink is broken + debugs(93, 5, "ICAPModXact has " << + virgin.body_pipe->buf().contentSize() << " bytes " << + "and expects more to echo" << status()); + // TODO: timeout if virgin or adapted pipes are broken } } bool ICAPModXact::doneSending() const { - Must((state.sending == State::sendingDone) == (!adapted)); return state.sending == State::sendingDone; } @@ -514,38 +514,32 @@ return; if (state.sending != State::sendingUndecided) { - debugs(93, 7, "ICAPModXact will no longer send " << status()); - - if (nicely) - adapted->sendSourceFinish(); - else - adapted->sendSourceAbort(); + debugs(93, 7, "ICAPModXact will no longer send" << status()); + if (adapted.body_pipe != NULL) { + virginSendClaim.disable(); + // we may leave debts if we were echoing and the virgin + // body_pipe got exhausted before we echoed all planned bytes + const bool leftDebts = adapted.body_pipe->needsMoreData(); + stopProducingFor(adapted.body_pipe, nicely && !leftDebts); + } } else { - debugs(93, 7, "ICAPModXact will not start sending " << status()); - adapted->sendSourceAbort(); // or the sink may wait forever + debugs(93, 7, "ICAPModXact will not start sending" << status()); + Must(!adapted.body_pipe); } state.sending = State::sendingDone; - - adapted = NULL; // refcounted + checkConsuming(); } -void ICAPModXact::stopReceiving() +// should be called after certain state.writing or state.sending changes +void ICAPModXact::checkConsuming() { - // stopSending NULLifies adapted but we do not NULLify virgin. - // This is assymetric because we want to keep virgin->data even - // though we are not expecting any more virgin->data->body. - // TODO: can we cache just the needed headers info instead? - - // If they closed first, there is not point (or means) to notify them. - - if (state.doneReceiving) + // quit if we already stopped or are still using the pipe + if (!virgin.body_pipe || !state.doneConsumingVirgin()) return; - // There is no sendSinkFinished() to notify the other side. - debugs(93, 7, "ICAPModXact will not receive " << status()); - - state.doneReceiving = true; + debugs(93, 7, HERE << "will stop consuming" << status()); + stopConsumingFrom(virgin.body_pipe); } void ICAPModXact::parseMore() @@ -564,13 +558,13 @@ // note that allocation for echoing is done in handle204NoContent() void ICAPModXact::maybeAllocateHttpMsg() { - if (adapted->data->header) // already allocated + if (adapted.header) // already allocated return; if (gotEncapsulated("res-hdr")) { - adapted->data->setHeader(new HttpReply); + adapted.setHeader(new HttpReply); } else if (gotEncapsulated("req-hdr")) { - adapted->data->setHeader(new HttpRequest); + adapted.setHeader(new HttpRequest); } else throw TexcHere("Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()"); } @@ -594,7 +588,8 @@ return; } - adapted->sendSourceStart(); + AsyncCall(93,5, initiator, ICAPInitiator::noteIcapHeadersAdapted); + cbdataReferenceDone(initiator); if (state.sending == State::sendingVirgin) echoMore(); @@ -668,10 +663,10 @@ void ICAPModXact::handle100Continue() { Must(state.writing == State::writingPaused); + // server must not respond before the end of preview: we may send ieof Must(preview.enabled() && preview.done() && !preview.ieof()); - Must(virginSendClaim.active()); - if (virginSendClaim.limited()) // preview only + if (virginSendClaim.active() && virginSendClaim.limited()) // preview only stopBackup(); state.parsing = State::psIcapHeader; // eventually @@ -687,21 +682,19 @@ state.parsing = State::psHttpHeader; state.sending = State::sendingAdapted; stopBackup(); + checkConsuming(); } void ICAPModXact::handle204NoContent() { stopParsing(); - Must(virginSendClaim.active()); - virginSendClaim.protectAll(); // extends protection if needed - state.sending = State::sendingVirgin; // We want to clone the HTTP message, but we do not want // to copy non-HTTP state parts that HttpMsg kids carry in them. // Thus, we cannot use a smart pointer, copy constructor, or equivalent. // Instead, we simply write the HTTP message and "clone" it by parsing. - HttpMsg *oldHead = virgin->data->header; + HttpMsg *oldHead = virgin.header; debugs(93, 7, "ICAPModXact cloning virgin message " << oldHead); MemBuf httpBuf; @@ -711,7 +704,7 @@ packHead(httpBuf, oldHead); // allocate the adapted message and copy metainfo - Must(!adapted->data->header); + Must(!adapted.header); HttpMsg *newHead = NULL; if (const HttpRequest *oldR = dynamic_cast(oldHead)) { HttpRequest *newR = new HttpRequest; @@ -722,7 +715,7 @@ newHead = new HttpReply; Must(newHead); - adapted->data->setHeader(newHead); + adapted.setHeader(newHead); // parse the buffer back http_status error = HTTP_STATUS_NONE; @@ -733,7 +726,28 @@ httpBuf.clean(); - debugs(93, 7, "ICAPModXact cloned virgin message " << oldHead << " to " << newHead); + debugs(93, 7, "ICAPModXact cloned virgin message " << oldHead << " to " << + newHead); + + // setup adapted body pipe if needed + if (oldHead->body_pipe != NULL) { + debugs(93, 7, HERE << "will echo virgin body from " << + oldHead->body_pipe); + state.sending = State::sendingVirgin; + checkConsuming(); + Must(virginSendClaim.active()); + virginSendClaim.protectAll(); // extends protection if needed + // TODO: optimize: is it possible to just use the oldHead pipe and + // remove ICAP from the loop? This echoing is probably a common case! + makeAdaptedBodyPipe("echoed virgin response"); + if (oldHead->body_pipe->bodySizeKnown()) + adapted.body_pipe->setBodySize(oldHead->body_pipe->bodySize()); + debugs(93, 7, HERE << "will echo virgin body to " << + adapted.body_pipe); + } else { + debugs(93, 7, HERE << "no virgin body to echo"); + stopSending(true); + } } void ICAPModXact::handleUnknownScode() @@ -751,11 +765,11 @@ if (gotEncapsulated("res-hdr") || gotEncapsulated("req-hdr")) { maybeAllocateHttpMsg(); - if (!parseHead(adapted->data->header)) + if (!parseHead(adapted.header)) return; // need more header data } - state.parsing = State::psBody; + decideOnParsingBody(); } // parses both HTTP and ICAP headers @@ -780,39 +794,40 @@ return true; } -void ICAPModXact::parseBody() -{ - Must(state.parsing == State::psBody); - - debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes to parse"); - +void ICAPModXact::decideOnParsingBody() { if (gotEncapsulated("res-body") || gotEncapsulated("req-body")) { - if (!parsePresentBody()) // need more body data - return; + debugs(93, 5, HERE << "expecting a body"); + state.parsing = State::psBody; + bodyParser = new ChunkedCodingParser; + makeAdaptedBodyPipe("adapted response from the ICAP server"); + Must(state.sending == State::sendingAdapted); } else { debugs(93, 5, HERE << "not expecting a body"); + stopParsing(); + stopSending(true); } - - stopParsing(); - stopSending(true); } -// returns true iff complete body was parsed -bool ICAPModXact::parsePresentBody() +void ICAPModXact::parseBody() { - if (!bodyParser) - bodyParser = new ChunkedCodingParser; + Must(state.parsing == State::psBody); + Must(bodyParser); - // the parser will throw on errors - const bool parsed = bodyParser->parse(&readBuf, adapted->data->body); + debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes to parse"); - adapted->sendSourceProgress(); // TODO: do not send if parsed nothing + // the parser will throw on errors + BodyPipeCheckout bpc(*adapted.body_pipe); + const bool parsed = bodyParser->parse(&readBuf, &bpc.buf); + bpc.checkIn(); debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes after " << "parse; parsed all: " << parsed); - if (parsed) - return true; + if (parsed) { + stopParsing(); + stopSending(true); // the parser succeeds only if all parsed data fits + return; + } debugs(93,3,HERE << this << " needsMoreData = " << bodyParser->needsMoreData()); @@ -824,11 +839,10 @@ if (bodyParser->needsMoreSpace()) { Must(!doneSending()); // can hope for more space - Must(adapted->data->body->hasContent()); // paranoid - // TODO: there should be a timeout in case the sink is broken. + Must(adapted.body_pipe->buf().contentSize() > 0); // paranoid + // TODO: there should be a timeout in case the sink is broken + // or cannot consume partial content (while we need more space) } - - return false; } void ICAPModXact::stopParsing() @@ -836,7 +850,7 @@ if (state.parsing == State::psDone) return; - debugs(93, 7, "ICAPModXact will no longer parse " << status()); + debugs(93, 7, "ICAPModXact will no longer parse" << status()); delete bodyParser; @@ -846,11 +860,10 @@ } // HTTP side added virgin body data -void ICAPModXact::noteSourceProgress(MsgPipe *p) +void ICAPModXact::noteMoreBodyDataAvailable(BodyPipe &) { - ICAPXaction_Enter(noteSourceProgress); + ICAPXaction_Enter(noteMoreBodyDataAvailable); - Must(!state.doneReceiving); writeMore(); if (state.sending == State::sendingVirgin) @@ -860,12 +873,11 @@ } // HTTP side sent us all virgin info -void ICAPModXact::noteSourceFinish(MsgPipe *p) +void ICAPModXact::noteBodyProductionEnded(BodyPipe &) { - ICAPXaction_Enter(noteSourceFinish); + ICAPXaction_Enter(noteBodyProductionEnded); - Must(!state.doneReceiving); - stopReceiving(); + Must(virgin.body_pipe->productionEnded()); // push writer and sender in case we were waiting for the last-chunk writeMore(); @@ -876,22 +888,34 @@ ICAPXaction_Exit(); } -// HTTP side is aborting -void ICAPModXact::noteSourceAbort(MsgPipe *p) +// body producer aborted +void ICAPModXact::noteBodyProducerAborted(BodyPipe &) +{ + ICAPXaction_Enter(noteBodyProducerAborted); + + mustStop("virgin HTTP body producer aborted"); + + ICAPXaction_Exit(); +} + +// initiator aborted +void ICAPModXact::noteInitiatorAborted() { - ICAPXaction_Enter(noteSourceAbort); + ICAPXaction_Enter(noteInitiatorAborted); - Must(!state.doneReceiving); - stopReceiving(); - mustStop("HTTP source quit"); + if (initiator) { + cbdataReferenceDone(initiator); + mustStop("initiator aborted"); + } ICAPXaction_Exit(); } -// HTTP side wants more adapted data and possibly freed some buffer space -void ICAPModXact::noteSinkNeed(MsgPipe *p) +// adapted body consumer wants more adapted data and +// possibly freed some buffer space +void ICAPModXact::noteMoreBodySpaceAvailable(BodyPipe &) { - ICAPXaction_Enter(noteSinkNeed); + ICAPXaction_Enter(noteMoreBodySpaceAvailable); if (state.sending == State::sendingVirgin) echoMore(); @@ -903,21 +927,26 @@ ICAPXaction_Exit(); } -// HTTP side aborted -void ICAPModXact::noteSinkAbort(MsgPipe *p) +// adapted body consumer aborted +void ICAPModXact::noteBodyConsumerAborted(BodyPipe &) { - ICAPXaction_Enter(noteSinkAbort); + ICAPXaction_Enter(noteBodyConsumerAborted); - mustStop("HTTP sink quit"); + mustStop("adapted body consumer aborted"); ICAPXaction_Exit(); } // internal cleanup -void ICAPModXact::doStop() +void ICAPModXact::swanSong() { - debugs(93, 5, HERE << "doStop() called"); - ICAPXaction::doStop(); + debugs(93, 5, HERE << "swan sings" << status()); + + if (initiator) { +debugs(93, 2, HERE << "swan sings for " << stopReason << status()); + AsyncCall(93,5, initiator, ICAPInitiator::noteIcapHeadersAborted); + cbdataReferenceDone(initiator); + } stopWriting(false); stopBackup(); @@ -929,23 +958,7 @@ stopSending(false); - // see stopReceiving() for reasons it cannot NULLify virgin there - - if (virgin != NULL) { - if (!state.doneReceiving) - virgin->sendSinkAbort(); - else - virgin->sink = NULL; - - virgin = NULL; // refcounted - } - - if (self != NULL) { - Pointer s = self; - self = NULL; - ICAPNoteXactionDone(s); - /* this object may be destroyed when 's' is cleared */ - } + ICAPXaction::swanSong(); } void ICAPModXact::makeRequestHeaders(MemBuf &buf) @@ -970,11 +983,11 @@ // build HTTP request header, if any ICAP::Method m = s.method; - const HttpRequest *request = virgin->data->cause ? - virgin->data->cause : - dynamic_cast(virgin->data->header); + const HttpRequest *request = virgin.cause ? + virgin.cause : + dynamic_cast(virgin.header); - // to simplify, we could we assume that request is always available + // to simplify, we could assume that request is always available String urlPath; if (request) { @@ -983,11 +996,11 @@ encapsulateHead(buf, "req-hdr", httpBuf, request); else if (ICAP::methodReqmod == m) - encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->header); + encapsulateHead(buf, "req-hdr", httpBuf, virgin.header); } if (ICAP::methodRespmod == m) - if (const MsgPipeData::Header *prime = virgin->data->header) + if (const HttpMsg *prime = virgin.header) encapsulateHead(buf, "res-hdr", httpBuf, prime); if (!virginBody.expected()) @@ -1001,13 +1014,17 @@ if (shouldPreview(urlPath)) { buf.Printf("Preview: %d\r\n", (int)preview.ad()); - virginSendClaim.protectUpTo(preview.ad()); + if (virginBody.expected()) // there is a body to preview + virginSendClaim.protectUpTo(preview.ad()); + else + finishNullOrEmptyBodyPreview(httpBuf); } if (shouldAllow204()) { buf.Printf("Allow: 204\r\n"); // be robust: do not rely on the expected body size - virginSendClaim.protectAll(); + if (virginBody.expected()) // there is a body to protect + virginSendClaim.protectAll(); } if (TheICAPConfig.send_client_ip && request) @@ -1059,13 +1076,13 @@ // decides whether to offer a preview and calculates its size bool ICAPModXact::shouldPreview(const String &urlPath) { - size_t wantedSize; - if (!TheICAPConfig.preview_enable) { debugs(93, 5, HERE << "preview disabled by squid.conf"); return false; } + size_t wantedSize; + if (!service().wantsPreview(urlPath, wantedSize)) { debugs(93, 5, "ICAPModXact should not offer preview for " << urlPath); return false; @@ -1077,7 +1094,7 @@ size_t ad = XMIN(wantedSize, TheBackupLimit); if (!virginBody.expected()) - ad = 0; // nothing to preview but headers + ad = 0; else if (virginBody.knownSize()) ad = XMIN(ad, virginBody.size()); // not more than we have @@ -1086,8 +1103,9 @@ "(service wanted " << wantedSize << ")"); preview.enable(ad); + Must(preview.enabled()); - return preview.enabled(); + return true; } // decides whether to allow 204 responses @@ -1109,6 +1127,26 @@ return virginBody.size() < TheBackupLimit; } +// Normally, the body-writing code handles preview body. It can deal with +// bodies of unexpected size, including those that turn out to be empty. +// However, that code assumes that the body was expected and body control +// structures were initialized. This is not the case when there is no body +// or the body is known to be empty, because the virgin message will lack a +// body_pipe. So we handle preview of null-body and zero-size bodies here. +void ICAPModXact::finishNullOrEmptyBodyPreview(MemBuf &buf) +{ + Must(!virginWriteClaim.active()); // one reason we handle it here + Must(!virgin.body_pipe); // another reason we handle it here + Must(!preview.ad()); + + // do not add last-chunk because our Encapsulated header says null-body + // addLastRequestChunk(buf); + preview.wrote(0, true); + + Must(preview.done()); + Must(!preview.ieof()); +} + void ICAPModXact::fillPendingStatus(MemBuf &buf) const { ICAPXaction::fillPendingStatus(buf); @@ -1116,10 +1154,10 @@ if (state.serviceWaiting) buf.append("U", 1); - if (!state.doneReceiving) + if (virgin.body_pipe != NULL) buf.append("R", 1); - if (!doneReading()) + if (connection > 0 && !doneReading()) buf.append("r", 1); if (!state.doneWriting() && state.writing != State::writingInit) @@ -1144,7 +1182,7 @@ { ICAPXaction::fillDoneStatus(buf); - if (state.doneReceiving) + if (!virgin.body_pipe) buf.append("R", 1); if (state.doneWriting()) @@ -1173,33 +1211,61 @@ // calculate whether there is a virgin HTTP body and // whether its expected size is known +// TODO: rename because we do not just estimate void ICAPModXact::estimateVirginBody() { - // note: defaults should be fine but will disable previews and 204s + // note: lack of size info may disable previews and 204s - Must(virgin != NULL && virgin->data->header); + HttpMsg *msg = virgin.header; + Must(msg); method_t method; - if (virgin->data->cause) - method = virgin->data->cause->method; + if (virgin.cause) + method = virgin.cause->method; else - if (HttpRequest *req = dynamic_cast(virgin->data-> - header)) - method = req->method; - else - return; + if (HttpRequest *req = dynamic_cast(msg)) + method = req->method; + else + method = METHOD_NONE; ssize_t size; - if (virgin->data->header->expectingBody(method, size)) { - virginBody.expect(size) - ; - debugs(93, 6, "ICAPModXact expects virgin body; size: " << size); + // expectingBody returns true for zero-sized bodies, but we will not + // get a pipe for that body, so we treat the message as bodyless + if (method != METHOD_NONE && msg->expectingBody(method, size) && size) { + debugs(93, 6, "ICAPModXact expects virgin body from " << + virgin.body_pipe << "; size: " << size); + + virginBody.expect(size); + + if (size < 0) // unknown size + virginWriteClaim.protectAll(); + else + virginWriteClaim.protectUpTo(size); + + // sign up as a body consumer + Must(msg->body_pipe != NULL); + Must(msg->body_pipe == virgin.body_pipe); + virgin.body_pipe->setConsumer(this); + + // make sure TheBackupLimit is in-sync with the buffer size + Must(TheBackupLimit <= static_cast(msg->body_pipe->buf().max_capacity)); } else { debugs(93, 6, "ICAPModXact does not expect virgin body"); + Must(msg->body_pipe == NULL); + checkConsuming(); } } +void ICAPModXact::makeAdaptedBodyPipe(const char *what) { + Must(!adapted.body_pipe); + Must(!adapted.header->body_pipe); + adapted.header->body_pipe = new BodyPipe(this); + adapted.body_pipe = adapted.header->body_pipe; + debugs(93, 7, HERE << "will supply " << what << " via " << + adapted.body_pipe << " pipe"); +} + // TODO: Move SizedEstimate, MemBufBackup, and ICAPPreview elsewhere @@ -1336,16 +1402,10 @@ bool ICAPModXact::fillVirginHttpHeader(MemBuf &mb) const { - if (virgin == NULL) - return false; - - if (virgin->data == NULL) - return false; - - if (virgin->data->header == NULL) + if (virgin.header == NULL) return false; - virgin->data->header->firstLineBuf(mb); + virgin.header->firstLineBuf(mb); return true; } Index: squid3/src/ICAP/ICAPModXact.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/ICAP/ICAPModXact.h,v retrieving revision 1.1.2.8 retrieving revision 1.1.2.9 diff -u -r1.1.2.8 -r1.1.2.9 --- squid3/src/ICAP/ICAPModXact.h 14 Dec 2006 05:17:53 -0000 1.1.2.8 +++ squid3/src/ICAP/ICAPModXact.h 14 Feb 2007 06:10:22 -0000 1.1.2.9 @@ -1,6 +1,6 @@ /* - * $Id: ICAPModXact.h,v 1.1.2.8 2006/12/14 05:17:53 rousskov Exp $ + * $Id: ICAPModXact.h,v 1.1.2.9 2007/02/14 06:10:22 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -35,15 +35,19 @@ #define SQUID_ICAPMODXACT_H #include "ICAPXaction.h" -#include "MsgPipe.h" -#include "MsgPipeSource.h" -#include "MsgPipeSink.h" - -/* ICAPModXact implements ICAP REQMOD and RESPMOD transaction using ICAPXaction - * as the base. It implements message pipe sink and source interfaces for - * communication with various HTTP "anchors" and "hooks". ICAPModXact receives - * virgin HTTP messages, communicates with the ICAP server, and sends the - * adapted messages back. ICAPClient is the "owner" of the ICAPModXact. */ +#include "ICAPInOut.h" +#include "BodyPipe.h" + +/* + * ICAPModXact implements ICAP REQMOD and RESPMOD transaction using + * ICAPXaction as the base. The ICAPModXact receives a virgin HTTP message + * from an ICAP vecoring point, (a.k.a., initiator), communicates with the + * ICAP server, and sends the adapted HTTP message headers back. + * Virgin/adapted HTTP message body is reveived/sent using BodyPipe + * interface. The initiator (or its associate) is expected to send and/or + * receive the HTTP body. + */ + class ChunkedCodingParser; @@ -118,27 +122,29 @@ enum State { stDisabled, stWriting, stIeof, stDone } theState; }; -class ICAPModXact: public ICAPXaction, public MsgPipeSource, public MsgPipeSink +class ICAPInitiator; + +class ICAPModXact: public ICAPXaction, public BodyProducer, public BodyConsumer { public: typedef RefCount Pointer; public: - ICAPModXact(); - - // called by ICAPClient - void init(ICAPServiceRep::Pointer&, MsgPipe::Pointer &aVirgin, MsgPipe::Pointer &anAdapted, Pointer &aSelf); + ICAPModXact(ICAPInitiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, ICAPServiceRep::Pointer &s); - // pipe source methods; called by Anchor while receiving the adapted msg - virtual void noteSinkNeed(MsgPipe *p); - virtual void noteSinkAbort(MsgPipe *p); - - // pipe sink methods; called by ICAP while sending the virgin message - virtual void noteSourceStart(MsgPipe *p); - virtual void noteSourceProgress(MsgPipe *p); - virtual void noteSourceFinish(MsgPipe *p); - virtual void noteSourceAbort(MsgPipe *p); + // communication with the initiator + void noteInitiatorAborted(); + AsyncCallWrapper(93,3, ICAPModXact, noteInitiatorAborted) + + // BodyProducer methods + virtual void noteMoreBodySpaceAvailable(BodyPipe &); + virtual void noteBodyConsumerAborted(BodyPipe &); + + // BodyConsumer methods + virtual void noteMoreBodyDataAvailable(BodyPipe &); + virtual void noteBodyProductionEnded(BodyPipe &); + virtual void noteBodyProducerAborted(BodyPipe &); // comm handlers virtual void handleCommConnected(); @@ -150,8 +156,15 @@ // service waiting void noteServiceReady(); +public: + ICAPInOut virgin; + ICAPInOut adapted; + private: + virtual void start(); + void estimateVirginBody(); + void makeAdaptedBodyPipe(const char *what); void waitForService(); @@ -160,7 +173,7 @@ void startWriting(); void writeMore(); - void writePriviewBody(); + void writePreviewBody(); void writePrimeBody(); void writeSomeBody(const char *label, size_t size); @@ -171,6 +184,8 @@ size_t claimSize(const MemBufClaim &claim) const; const char *claimContent(const MemBufClaim &claim) const; + bool doneWithClaim(const MemBufClaim &claim) const; + void makeRequestHeaders(MemBuf &buf); void makeUsernameHeader(const HttpRequest *request, MemBuf &buf); void moveRequestChunk(MemBuf &buf, size_t chunkSize); @@ -178,6 +193,7 @@ void openChunk(MemBuf &buf, size_t chunkSize, bool ieof); void closeChunk(MemBuf &buf); void virginConsume(); + void finishNullOrEmptyBodyPreview(MemBuf &buf); bool shouldPreview(const String &urlPath); bool shouldAllow204(); @@ -191,8 +207,8 @@ void parseHttpHead(); bool parseHead(HttpMsg *head); + void decideOnParsingBody(); void parseBody(); - bool parsePresentBody(); void maybeAllocateHttpMsg(); void handle100Continue(); @@ -204,8 +220,8 @@ void echoMore(); virtual bool doneAll() const; + virtual void swanSong(); - virtual void doStop(); void stopReceiving(); void stopSending(bool nicely); void stopWriting(bool nicely); @@ -220,17 +236,16 @@ void packHead(MemBuf &httpBuf, const HttpMsg *head); void encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const HttpMsg *head); bool gotEncapsulated(const char *section) const; + void checkConsuming(); - Pointer self; - MsgPipe::Pointer virgin; - MsgPipe::Pointer adapted; + ICAPInitiator *initiator; HttpReply *icapReply; SizedEstimate virginBody; MemBufClaim virginWriteClaim; // preserve virgin data buffer for writing - MemBufClaim virginSendClaim; // ... for sending (previe and 204s) - size_t virginConsumed; // virgin data consumed so far + MemBufClaim virginSendClaim; // ... for sending (preview and 204s) + size_t virginConsumed; // virgin data consumed so far ICAPPreview preview; // use for creating (writing) the preview ChunkedCodingParser *bodyParser; // ICAP response body parser @@ -243,15 +258,15 @@ public: - unsigned serviceWaiting: - 1; // waiting for the ICAPServiceRep preparing the ICAP service - - unsigned doneReceiving: - 1; // expect no new virgin info (from the virgin pipe) + bool serviceWaiting; // waiting for ICAP service options // will not write anything [else] to the ICAP server connection bool doneWriting() const { return writing == writingReallyDone; } + // will not use virgin.body_pipe + bool doneConsumingVirgin() const { return writing >= writingAlmostDone + && (sending == sendingAdapted || sending == sendingDone); } + // parsed entire ICAP response from the ICAP server bool doneParsing() const { return parsing == psDone; }