--------------------- PatchSet 1954 Date: 2005/10/17 22:58:29 Author: rousskov Branch: squid3-icap Tag: (none) Log: - Big change: Split ICAPXaction into ICAPXaction, ICAPModXact, and ICAPOptXact to avoid code duplication. ICAPModXact implements ICAP REQMOD and RESPMOD transactions, using ICAPXaction as a base. ICAPModXact uses MsgPipes to communicate with the "HTTP side". ICAPOptXact implements ICAP OPTIONS transaction, using ICAPXaction as a base. ICAPModXact is using a simple callback to communicate with the caller. ICAPXaction contains code needed/shared by *MOD and OPTIONS transactions. For example, it implements communication with the ICAP service (via the comm module). It also contains async call wrapper macros and related per-call check/termination logic. ICAPXaction is not a CBDATA_CLASS but assumes its kids are. The primary drawback of the split is the increased difficulty of finding the async call wrappers and handlers. As a rule of thumb, comm-handling wrappers are in ICAPXaction. Everything else is in the derived classes. ICAPXaction knows the name of the derived class and tries to use that in most debugging messages. It is not yet clear whether that leads to more confusion because some kid::method names in the debug messages are inherited and do not really exist in the kid. ICAPModXact now makes sure (via ICAPServiceRep) that ICAP service details (a.k.a., options) are known and fresh before sending an ICAP *MOD request. RFC 3507 requires that. The original ICAPOptXact code was unused/unusable and got deleted in the refactoring process. Members: src/ICAPModXact.cc:1.1->1.1.2.1 src/ICAPModXact.h:1.1->1.1.2.1 src/ICAPOptXact.cc:1.1.2.4->1.1.2.5 src/ICAPOptXact.h:1.1.2.2->1.1.2.3 src/ICAPXaction.cc:1.1.2.50->1.1.2.51 src/ICAPXaction.h:1.1.2.26->1.1.2.27 --- /dev/null Wed Feb 14 13:33:00 2007 +++ squid3/src/ICAPModXact.cc Wed Feb 14 13:35:22 2007 @@ -0,0 +1,1201 @@ +/* + * DEBUG: section 93 ICAP (RFC 3507) Client + */ + +#include "squid.h" +#include "comm.h" +#include "MsgPipe.h" +#include "MsgPipeData.h" +#include "HttpRequest.h" +#include "HttpReply.h" +#include "ICAPServiceRep.h" +#include "ICAPModXact.h" +#include "ICAPClient.h" +#include "ChunkedCodingParser.h" +#include "TextException.h" + +// flow and terminology: +// HTTP| --> receive --> encode --> write --> |network +// end | <-- send <-- parse <-- read <-- |end + +// TODO: doneSending()/doneReceving() data members should probably be in sync +// with this->adapted/virgin pointers. Make adapted/virgin methods? + +// TODO: replace gotEncapsulated() with something faster; we call it often + +CBDATA_CLASS_INIT(ICAPModXact); + +static const size_t TheBackupLimit = ICAP::MsgPipeBufSizeMax; + + +ICAPModXact::State::State() +{ + memset(this, sizeof(*this), 0); +} + +ICAPModXact::ICAPModXact(): ICAPXaction("ICAPModXact"), + self(NULL), virgin(NULL), adapted(NULL), + icapReply(NULL), virginConsumed(0), + bodyParser(NULL) +{} + +void ICAPModXact::init(ICAPServiceRep::Pointer &aService, MsgPipe::Pointer &aVirgin, MsgPipe::Pointer &anAdapted, Pointer &aSelf) +{ + assert(!self.getRaw() && !virgin.getRaw() && !adapted.getRaw()); + assert(aSelf.getRaw() && aVirgin.getRaw() && anAdapted.getRaw()); + + self = aSelf; + service(aService); + + virgin = aVirgin; + adapted = anAdapted; + + // receiving end + virgin->sink = this; // should be 'self' and refcounted + // virgin pipe data is initiated by the source + + // sending end + adapted->source = this; // should be 'self' and refcounted + adapted->data = new MsgPipeData; + + adapted->data->body = new MemBuf; // XXX: make body a non-pointer? + adapted->data->body->init(ICAP::MsgPipeBufSizeMin, ICAP::MsgPipeBufSizeMax); + // headers are initialized when we parse them + + // writing and reading ends are handled by ICAPXaction + + // encoding + // nothing to do because we are using temporary buffers + + // parsing + icapReply = httpReplyCreate(); + icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class? + + // XXX: make sure stop() cleans all buffers +} + +// HTTP side starts sending virgin data +void ICAPModXact::noteSourceStart(MsgPipe *p) +{ + ICAPXaction_Enter(noteSourceStart); + + // make sure TheBackupLimit is in-sync with the buffer size + Must(TheBackupLimit <= static_cast(virgin->data->body->max_capacity)); + + estimateVirginBody(); // before virgin disappears! + + // it is an ICAP violation to send request to a service w/o known OPTIONS + + if (service().up()) + startWriting(); + else + waitForService(); + + // XXX: but this has to be here to catch other errors. Thus, if + // commConnectStart in startWriting fails, we may get here + //_after_ the object got destroyed. Somebody please fix commConnectStart! + ICAPXaction_Exit(); +} + +static +void ICAPModXact_noteServiceReady(void *data, ICAPServiceRep::Pointer &) +{ + ICAPModXact *x = static_cast(data); + assert(x); + x->noteServiceReady(); +} + +void ICAPModXact::waitForService() +{ + Must(!state.serviceWaiting); + debugs(93, 7, "ICAPModXact will wait for the ICAP service " << status()); + state.serviceWaiting = true; + service().callWhenReady(&ICAPModXact_noteServiceReady, this); +} + +void ICAPModXact::noteServiceReady() +{ + ICAPXaction_Enter(noteServiceReady); + + Must(state.serviceWaiting); + state.serviceWaiting = false; + startWriting(); // will throw if service is not up + + ICAPXaction_Exit(); +} + +void ICAPModXact::startWriting() +{ + Must(service().up()); + + state.writing = State::writingConnect; + openConnection(); + // put nothing here as openConnection calls commConnectStart + // and that may call us back without waiting for the next select loop +} + +// connection with the ICAP service established +void ICAPModXact::handleCommConnected() +{ + Must(state.writing == State::writingConnect); + + startReading(); // wait for early errors from the ICAP server + + MemBuf requestBuf; + requestBuf.init(); + + makeRequestHeaders(requestBuf); + debugs(93, 9, "ICAPModXact ICAP request prefix " << status() << ":\n" << + (requestBuf.terminate(), requestBuf.content())); + + // write headers + state.writing = State::writingHeaders; + scheduleWrite(requestBuf); +} + +void ICAPModXact::handleCommWrote(size_t) +{ + if (state.writing == State::writingHeaders) + handleCommWroteHeaders(); + else + handleCommWroteBody(); +} + +void ICAPModXact::handleCommWroteHeaders() +{ + Must(state.writing == State::writingHeaders); + + if (virginBody.expected()) { + state.writing = preview.enabled() ? + State::writingPreview : State::writingPrime; + virginWriteClaim.protectAll(); + writeMore(); + } else { + stopWriting(); + } +} + +void ICAPModXact::writeMore() +{ + if (writer) // already writing something + return; + + switch (state.writing) { + + case State::writingInit: // waiting for service OPTIONS + Must(state.serviceWaiting); + + case State::writingConnect: // waiting for the connection to establish + + case State::writingHeaders: // waiting for the headers to be written + + case State::writingPaused: // waiting for the ICAP server response + + case State::writingDone: // nothing more to write + return; + + case State::writingPreview: + writePriviewBody(); + return; + + case State::writingPrime: + writePrimeBody(); + return; + + default: + throw TexcHere("ICAPModXact in bad writing state"); + } +} + +void ICAPModXact::writePriviewBody() +{ + debugs(93, 8, "ICAPModXact will write Preview body " << status()); + Must(state.writing == State::writingPreview); + + MsgPipeData::Body *body = virgin->data->body; + const size_t size = XMIN(preview.debt(), (size_t)body->contentSize()); + writeSomeBody("preview body", size); + + // change state once preview is written + + if (preview.done()) { + debugs(93, 7, "ICAPModXact wrote entire Preview body " << status()); + + if (preview.ieof()) + stopWriting(); + else + state.writing = State::writingPaused; + } +} + +void ICAPModXact::writePrimeBody() +{ + Must(state.writing == State::writingPrime); + Must(virginWriteClaim.active()); + + MsgPipeData::Body *body = virgin->data->body; + const size_t size = body->contentSize(); + writeSomeBody("prime virgin body", size); + + if (state.doneReceiving) + stopWriting(); +} + +void ICAPModXact::writeSomeBody(const char *label, size_t size) +{ + Must(!writer && !state.doneWriting()); + debugs(93, 8, "ICAPModXact will write up to " << size << " bytes of " << + label); + + MemBuf writeBuf; // TODO: suggest a min size based on size and lastChunk + + writeBuf.init(); // note: we assume that last-chunk will fit + + const size_t writeableSize = claimSize(virginWriteClaim); + const size_t chunkSize = XMIN(writeableSize, size); + + if (chunkSize) { + debugs(93, 7, "ICAPModXact will write " << chunkSize << + "-byte chunk of " << label); + } else { + debugs(93, 7, "ICAPModXact has no writeable " << label << " content"); + } + + moveRequestChunk(writeBuf, chunkSize); + + const bool lastChunk = + (state.writing == State::writingPreview && preview.done()) || + (state.doneReceiving && claimSize(virginWriteClaim) <= 0); + + if (lastChunk && virginBody.expected()) { + debugs(93, 8, "ICAPModXact will write last-chunk of " << label); + addLastRequestChunk(writeBuf); + } + + debugs(93, 7, "ICAPModXact will write " << writeBuf.contentSize() + << " raw bytes of " << label); + + if (writeBuf.hasContent()) { + scheduleWrite(writeBuf); // comm will free the chunk + } else { + writeBuf.clean(); + } +} + +void ICAPModXact::moveRequestChunk(MemBuf &buf, size_t chunkSize) +{ + if (chunkSize > 0) { + openChunk(buf, chunkSize); + buf.append(claimContent(virginWriteClaim), chunkSize); + closeChunk(buf, false); + + virginWriteClaim.release(chunkSize); + virginConsume(); + } + + if (state.writing == State::writingPreview) + preview.wrote(chunkSize, state.doneReceiving); // even if wrote nothing +} + +void ICAPModXact::addLastRequestChunk(MemBuf &buf) +{ + openChunk(buf, 0); + closeChunk(buf, state.writing == State::writingPreview && preview.ieof()); +} + +void ICAPModXact::openChunk(MemBuf &buf, size_t chunkSize) +{ + buf.Printf("%x\r\n", chunkSize); +} + +void ICAPModXact::closeChunk(MemBuf &buf, bool ieof) +{ + if (ieof) + buf.append("; ieof", 6); + + buf.append(ICAP::crlf, 2); // chunk-terminating CRLF +} + +size_t ICAPModXact::claimSize(const MemBufClaim &claim) const +{ + Must(claim.active()); + const size_t start = claim.offset(); + const size_t end = virginConsumed + virgin->data->body->contentSize(); + Must(virginConsumed <= start && start <= end); + return end - start; +} + +const char *ICAPModXact::claimContent(const MemBufClaim &claim) const +{ + Must(claim.active()); + const size_t start = claim.offset(); + Must(virginConsumed <= start); + return virgin->data->body->content() + (start - virginConsumed); +} + +void ICAPModXact::virginConsume() +{ + MemBuf &buf = *virgin->data->body; + const size_t have = static_cast(buf.contentSize()); + const size_t end = virginConsumed + have; + size_t offset = end; + + if (virginWriteClaim.active()) + offset = XMIN(virginWriteClaim.offset(), offset); + + if (virginSendClaim.active()) + offset = XMIN(virginSendClaim.offset(), offset); + + Must(virginConsumed <= offset && offset <= end); + + if (const size_t size = offset - virginConsumed) { + debugs(93, 8, "ICAPModXact consumes " << size << " out of " << have << + " virgin body bytes"); + buf.consume(size); + virginConsumed += size; + + if (!state.doneReceiving) + virgin->sendSinkNeed(); + } +} + +void ICAPModXact::handleCommWroteBody() +{ + writeMore(); +} + +void ICAPModXact::stopWriting() +{ + if (state.writing == State::writingDone) + return; + + debugs(93, 7, "ICAPModXact will no longer write " << status()); + + state.writing = State::writingDone; + + virginWriteClaim.disable(); + + virginConsume(); + + // Comm does not have an interface to clear the writer, but + // writeMore() will not write if our write callback is called + // when state.writing == State::writingDone; +} + +void ICAPModXact::stopBackup() +{ + if (!virginSendClaim.active()) + return; + + debugs(93, 7, "ICAPModXact will no longer backup " << status()); + + virginSendClaim.disable(); + + virginConsume(); +} + +bool ICAPModXact::doneAll() const +{ + return ICAPXaction::doneAll() && !state.serviceWaiting && + state.doneReceiving && doneSending() && + doneReading() && state.doneWriting(); +} + +void ICAPModXact::startReading() +{ + Must(connection >= 0); + Must(!reader); + Must(adapted.getRaw()); + Must(adapted->data); + Must(adapted->data->body); + + // we use the same buffer for headers and body and then consume headers + readMore(); +} + +void ICAPModXact::readMore() +{ + if (reader || doneReading()) + return; + + // do not fill readBuf if we have no space to store the result + if (!adapted->data->body->hasPotentialSpace()) + return; + + if (readBuf.hasSpace()) + scheduleRead(); +} + +// comm module read a portion of the ICAP response for us +void ICAPModXact::handleCommRead(size_t) +{ + Must(!state.doneParsing()); + parseMore(); + readMore(); +} + +void ICAPModXact::echoMore() +{ + Must(state.sending == State::sendingVirgin); + Must(virginSendClaim.active()); + + MemBuf &from = *virgin->data->body; + MemBuf &to = *adapted->data->body; + + const size_t sizeMax = claimSize(virginSendClaim); + const size_t size = XMIN(static_cast(to.potentialSpaceSize()), + sizeMax); + debugs(93, 5, "ICAPModXact echos " << size << " out of " << sizeMax << + " bytes"); + + if (size > 0) { + to.append(claimContent(virginSendClaim), size); + virginSendClaim.release(size); + virginConsume(); + adapted->sendSourceProgress(); + } + + if (!from.hasContent() && state.doneReceiving) { + debugs(93, 5, "ICAPModXact echoed all " << status()); + stopSending(true); + } else { + debugs(93, 5, "ICAPModXact has " << from.contentSize() << " bytes " << + "and expects more to echo " << status()); + virgin->sendSinkNeed(); // TODO: timeout if sink is broken + } +} + +bool ICAPModXact::doneSending() const +{ + Must((state.sending == State::sendingDone) == (!adapted)); + return state.sending == State::sendingDone; +} + +void ICAPModXact::stopSending(bool nicely) +{ + if (doneSending()) + return; + + if (state.sending != State::sendingUndecided) { + debugs(93, 7, "ICAPModXact will no longer send " << status()); + + if (nicely) + adapted->sendSourceFinish(); + else + adapted->sendSourceAbort(); + } else { + debugs(93, 7, "ICAPModXact will not start sending " << status()); + adapted->sendSourceAbort(); // or the sink may wait forever + } + + state.sending = State::sendingDone; + + /* + * Note on adapted->data->header: we created the header, but allow the + * other side (ICAPAnchor) to take control of it. We won't touch it here + * and instead rely on the Anchor-side to make sure it is properly freed. + */ + adapted = NULL; // refcounted +} + +void ICAPModXact::stopReceiving() +{ + // stopSending NULLifies adapted but we do not NULLify virgin. + // This is assymetric because we want to keep virgin->data even + // though we are not expecting any more virgin->data->body. + // TODO: can we cache just the needed headers info instead? + + // If they closed first, there is not point (or means) to notify them. + + if (state.doneReceiving) + return; + + // There is no sendSinkFinished() to notify the other side. + debugs(93, 7, "ICAPModXact will not receive " << status()); + + state.doneReceiving = true; +} + +void ICAPModXact::parseMore() +{ + debugs(93, 5, "have " << readBuf.contentSize() << " bytes to parse" << + status()); + + if (state.parsingHeaders()) + parseHeaders(); + + if (state.parsing == State::psBody) + parseBody(); +} + +// note that allocation for echoing is done in handle204NoContent() +void ICAPModXact::maybeAllocateHttpMsg() +{ + if (adapted->data->header) // already allocated + return; + + if (gotEncapsulated("res-hdr")) { + adapted->data->header = httpReplyCreate(); + } else if (gotEncapsulated("req-hdr")) { + adapted->data->header = new HttpRequest; + } else + throw TexcHere("Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()"); +} + +void ICAPModXact::parseHeaders() +{ + Must(state.parsingHeaders()); + + if (state.parsing == State::psIcapHeader) + parseIcapHead(); + + if (state.parsing == State::psHttpHeader) + parseHttpHead(); + + if (state.parsingHeaders()) { // need more data + Must(mayReadMore()); + return; + } + + adapted->sendSourceStart(); + + if (state.sending == State::sendingVirgin) + echoMore(); +} + +void ICAPModXact::parseIcapHead() +{ + Must(state.sending == State::sendingUndecided); + + if (!parseHead(icapReply)) + return; + + switch (icapReply->sline.status) { + + case 100: + handle100Continue(); + break; + + case 200: + handle200Ok(); + break; + + case 204: + handle204NoContent(); + break; + + default: + handleUnknownScode(); + break; + } + + // handle100Continue() manages state.writing on its own. + // Non-100 status means the server needs no postPreview data from us. + if (state.writing == State::writingPaused) + stopWriting(); + + // TODO: Consider applying a Squid 2.5 patch to recognize 201 responses +} + +void ICAPModXact::handle100Continue() +{ + Must(state.writing == State::writingPaused); + Must(preview.enabled() && preview.done() && !preview.ieof()); + Must(virginSendClaim.active()); + + if (virginSendClaim.limited()) // preview only + stopBackup(); + + state.parsing = State::psHttpHeader; // eventually + + state.writing = State::writingPrime; + + writeMore(); +} + +void ICAPModXact::handle200Ok() +{ + state.parsing = State::psHttpHeader; + state.sending = State::sendingAdapted; + stopBackup(); +} + +void ICAPModXact::handle204NoContent() +{ + stopParsing(); + Must(virginSendClaim.active()); + virginSendClaim.protectAll(); // extends protection if needed + state.sending = State::sendingVirgin; + + // We want to clone the HTTP message, but we do not want + // to copy non-HTTP state parts that HttpMsg kids carry in them. + // Thus, we cannot use a smart pointer, copy constructor, or equivalent. + // Instead, we simply write the HTTP message and "clone" it by parsing. + + HttpMsg *oldHead = virgin->data->header; + debugs(93, 7, "ICAPModXact cloning virgin message " << oldHead); + + MemBuf httpBuf; + + // write the virgin message into a memory buffer + httpBuf.init(); + packHead(httpBuf, oldHead); + + // allocate the adapted message + HttpMsg *&newHead = adapted->data->header; + Must(!newHead); + + if (dynamic_cast(oldHead)) + newHead = new HttpRequest; + else + if (dynamic_cast(oldHead)) + newHead = httpReplyCreate(); + + Must(newHead); + + // parse the buffer back + http_status error = HTTP_STATUS_NONE; + + Must(newHead->parse(&httpBuf, true, &error)); + + Must(newHead->hdr_sz == httpBuf.contentSize()); // no leftovers + + httpBuf.clean(); + + debugs(93, 7, "ICAPModXact cloned virgin message " << oldHead << " to " << newHead); +} + +void ICAPModXact::handleUnknownScode() +{ + stopParsing(); + stopBackup(); + // TODO: mark connection as "bad" + + // Terminate the transaction; we do not know how to handle this response. + throw TexcHere("Unsupported ICAP status code"); +} + +void ICAPModXact::parseHttpHead() +{ + if (gotEncapsulated("res-hdr") || gotEncapsulated("req-hdr")) { + maybeAllocateHttpMsg(); + + if (!parseHead(adapted->data->header)) + return; + } + + state.parsing = State::psBody; +} + +bool ICAPModXact::parseHead(HttpMsg *head) +{ + assert(head); + debugs(93, 5, "have " << readBuf.contentSize() << " head bytes to parse" << + "; state: " << state.parsing); + + http_status error = HTTP_STATUS_NONE; + const bool parsed = head->parse(&readBuf, commEof, &error); + Must(parsed || !error); // success or need more data + + if (!parsed) { // need more data + head->reset(); + return false; + } + + readBuf.consume(head->hdr_sz); + return true; +} + +void ICAPModXact::parseBody() +{ + Must(state.parsing == State::psBody); + + debugs(93, 5, "have " << readBuf.contentSize() << " body bytes to parse"); + + if (gotEncapsulated("res-body")) { + if (!parsePresentBody()) // need more body data + return; + } else { + debugs(93, 5, "not expecting a body"); + } + + stopParsing(); + stopSending(true); +} + +// returns true iff complete body was parsed +bool ICAPModXact::parsePresentBody() +{ + if (!bodyParser) + bodyParser = new ChunkedCodingParser; + + // the parser will throw on errors + const bool parsed = bodyParser->parse(&readBuf, adapted->data->body); + + adapted->sendSourceProgress(); // TODO: do not send if parsed nothing + + debugs(93, 5, "have " << readBuf.contentSize() << " body bytes after " << + "parse; parsed all: " << parsed); + + if (parsed) + return true; + + if (bodyParser->needsMoreData()) + Must(mayReadMore()); + + if (bodyParser->needsMoreSpace()) { + Must(!doneSending()); // can hope for more space + Must(adapted->data->body->hasContent()); // paranoid + // TODO: there should be a timeout in case the sink is broken. + } + + return false; +} + +void ICAPModXact::stopParsing() +{ + if (state.parsing == State::psDone) + return; + + debugs(93, 7, "ICAPModXact will no longer parse " << status()); + + delete bodyParser; + + bodyParser = NULL; + + state.parsing = State::psDone; +} + +// HTTP side added virgin body data +void ICAPModXact::noteSourceProgress(MsgPipe *p) +{ + ICAPXaction_Enter(noteSourceProgress); + + Must(!state.doneReceiving); + writeMore(); + + if (state.sending == State::sendingVirgin) + echoMore(); + + ICAPXaction_Exit(); +} + +// HTTP side sent us all virgin info +void ICAPModXact::noteSourceFinish(MsgPipe *p) +{ + ICAPXaction_Enter(noteSourceFinish); + + Must(!state.doneReceiving); + stopReceiving(); + + // push writer and sender in case we were waiting for the last-chunk + writeMore(); + + if (state.sending == State::sendingVirgin) + echoMore(); + + ICAPXaction_Exit(); +} + +// HTTP side is aborting +void ICAPModXact::noteSourceAbort(MsgPipe *p) +{ + ICAPXaction_Enter(noteSourceAbort); + + Must(!state.doneReceiving); + stopReceiving(); + mustStop("HTTP source quit"); + + ICAPXaction_Exit(); +} + +// HTTP side wants more adapted data and possibly freed some buffer space +void ICAPModXact::noteSinkNeed(MsgPipe *p) +{ + ICAPXaction_Enter(noteSinkNeed); + + if (state.sending == State::sendingVirgin) + echoMore(); + else + if (state.sending == State::sendingAdapted) + parseMore(); + else + Must(state.sending == State::sendingUndecided); + + ICAPXaction_Exit(); +} + +// HTTP side aborted +void ICAPModXact::noteSinkAbort(MsgPipe *p) +{ + ICAPXaction_Enter(noteSinkAbort); + + mustStop("HTTP sink quit"); + + ICAPXaction_Exit(); +} + +// internal cleanup +void ICAPModXact::doStop() +{ + ICAPXaction::doStop(); + + stopWriting(); + stopBackup(); + + if (icapReply) { + httpReplyDestroy(icapReply); + icapReply = NULL; + } + + stopSending(false); + + // see stopReceiving() for reasons it cannot NULLify virgin there + + if (virgin != NULL) { + if (!state.doneReceiving) + virgin->sendSinkAbort(); + else + virgin->sink = NULL; + + virgin = NULL; // refcounted + } + + if (self != NULL) { + Pointer s = self; + self = NULL; + ICAPNoteXactionDone(s); + /* this object may be destroyed when 's' is cleared */ + } +} + +void ICAPModXact::makeRequestHeaders(MemBuf &buf) +{ + const ICAPServiceRep &s = service(); + buf.Printf("%s %s ICAP/1.0\r\n", s.methodStr(), s.uri.buf()); + buf.Printf("Host: %s:%d\r\n", s.host.buf(), s.port); + buf.Printf("Encapsulated: "); + + MemBuf httpBuf; + httpBuf.init(); + + // build HTTP request header, if any + ICAP::Method m = s.method; + + if (ICAP::methodRespmod == m && virgin->data->cause) + encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->cause); + else if (ICAP::methodReqmod == m) + encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->header); + + if (ICAP::methodRespmod == m) + if (const MsgPipeData::Header *prime = virgin->data->header) + encapsulateHead(buf, "res-hdr", httpBuf, prime); + + if (!virginBody.expected()) + buf.Printf("null-body=%d", httpBuf.contentSize()); + else if (ICAP::methodReqmod == m) + buf.Printf("req-body=%d", httpBuf.contentSize()); + else + buf.Printf("res-body=%d", httpBuf.contentSize()); + + buf.append(ICAP::crlf, 2); // terminate Encapsulated line + + if (shouldPreview()) { + buf.Printf("Preview: %d\r\n", (int)preview.ad()); + virginSendClaim.protectUpTo(preview.ad()); + } + + if (shouldAllow204()) { + buf.Printf("Allow: 204\r\n"); + // be robust: do not rely on the expected body size + virginSendClaim.protectAll(); + } + + buf.append(ICAP::crlf, 2); // terminate ICAP header + + // start ICAP request body with encapsulated HTTP headers + buf.append(httpBuf.content(), httpBuf.contentSize()); + + httpBuf.clean(); +} + +void ICAPModXact::encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const HttpMsg *head) +{ + // update ICAP header + icapBuf.Printf("%s=%d,", section, httpBuf.contentSize()); + + // pack HTTP head + packHead(httpBuf, head); +} + +void ICAPModXact::packHead(MemBuf &httpBuf, const HttpMsg *head) +{ + Packer p; + packerToMemInit(&p, &httpBuf); + head->packInto(&p, true); + packerClean(&p); +} + +// decides whether to offer a preview and calculates its size +bool ICAPModXact::shouldPreview() +{ + size_t wantedSize; + + if (!service().wantsPreview(wantedSize)) { + debugs(93, 5, "ICAPModXact should not offer preview"); + return false; + } + + Must(wantedSize >= 0); + + // cannot preview more than we can backup + size_t ad = XMIN(wantedSize, TheBackupLimit); + + if (virginBody.expected() && virginBody.knownSize()) + ad = XMIN(ad, virginBody.size()); // not more than we have + else + ad = 0; // questionable optimization? + + debugs(93, 5, "ICAPModXact should offer " << ad << "-byte preview " << + "(service wanted " << wantedSize << ")"); + + preview.enable(ad); + + return preview.enabled(); +} + +// decides whether to allow 204 responses +bool ICAPModXact::shouldAllow204() +{ + if (!service().allows204()) + return false; + + if (!virginBody.expected()) + return true; // no body means no problems with supporting 204s. + + // if there is a body, make sure we can backup it all + + if (!virginBody.knownSize()) + return false; + + // or should we have a different backup limit? + // note that '<' allows for 0-termination of the "full" backup buffer + return virginBody.size() < TheBackupLimit; +} + +// returns a temporary string depicting transaction status, for debugging +void ICAPModXact::fillPendingStatus(MemBuf &buf) const +{ + if (state.serviceWaiting) + buf.append("U", 1); + + if (!state.doneWriting() && state.writing != State::writingInit) + buf.Printf("w(%d)", state.writing); + + if (preview.enabled()) { + if (!preview.done()) + buf.Printf("P(%d)", preview.debt()); + } + + if (virginSendClaim.active()) + buf.append("B", 1); + + if (!state.doneParsing() && state.parsing != State::psIcapHeader) + buf.Printf("p(%d)", state.parsing); + + if (!doneSending() && state.sending != State::sendingUndecided) + buf.Printf("S(%d)", state.sending); +} + +void ICAPModXact::fillDoneStatus(MemBuf &buf) const +{ + if (state.doneReceiving) + buf.append("R", 1); + + if (state.doneWriting()) + buf.append("w", 1); + + if (preview.enabled()) { + if (preview.done()) + buf.Printf("P%s", preview.ieof() ? "(ieof)" : ""); + } + + if (doneReading()) + buf.append("r", 1); + + if (state.doneParsing()) + buf.append("p", 1); + + if (doneSending()) + buf.append("S", 1); +} + +bool ICAPModXact::gotEncapsulated(const char *section) const +{ + return httpHeaderGetByNameListMember(&icapReply->header, "Encapsulated", + section, ',').size() > 0; +} + +// calculate whether there is a virgin HTTP body and +// whether its expected size is known +void ICAPModXact::estimateVirginBody() +{ + // note: defaults should be fine but will disable previews and 204s + + Must(virgin != NULL && virgin->data->header); + + method_t method; + + if (virgin->data->cause) + method = virgin->data->cause->method; + else + if (HttpRequest *req= dynamic_cast(virgin->data-> + header)) + method = req->method; + else + return; + + ssize_t size; + if (virgin->data->header->expectingBody(method, size)) { + virginBody.expect(size) + ; + debugs(93, 6, "ICAPModXact expects virgin body; size: " << size); + } else { + debugs(93, 6, "ICAPModXact does not expect virgin body"); + } +} + + +// TODO: Move SizedEstimate, MemBufBackup, and ICAPPreview elsewhere + +SizedEstimate::SizedEstimate() + : theData(dtUnexpected) +{} + +void SizedEstimate::expect(ssize_t aSize) +{ + theData = (aSize >= 0) ? aSize : (ssize_t)dtUnknown; +} + +bool SizedEstimate::expected() const +{ + return theData != dtUnexpected; +} + +bool SizedEstimate::knownSize() const +{ + Must(expected()); + return theData != dtUnknown; +} + +size_t SizedEstimate::size() const +{ + Must(knownSize()); + return static_cast(theData); +} + + + +MemBufClaim::MemBufClaim(): theStart(-1), theGoal(-1) +{} + +void MemBufClaim::protectAll() +{ + if (theStart < 0) + theStart = 0; + + theGoal = -1; // no specific goal +} + +void MemBufClaim::protectUpTo(size_t aGoal) +{ + if (theStart < 0) + theStart = 0; + + Must(aGoal >= 0); + + theGoal = (theGoal < 0) ? static_cast(aGoal) : + XMIN(static_cast(aGoal), theGoal); +} + +void MemBufClaim::disable() +{ + theStart = -1; +} + +void MemBufClaim::release(size_t size) +{ + Must(active()); + Must(size >= 0); + theStart += static_cast(size); + + if (limited() && theStart >= theGoal) + disable(); +} + +size_t MemBufClaim::offset() const +{ + Must(active()); + return static_cast(theStart); +} + +bool MemBufClaim::limited() const +{ + Must(active()); + return theGoal >= 0; +} + + +ICAPPreview::ICAPPreview(): theWritten(0), theAd(0), theState(stDisabled) +{} + +void ICAPPreview::enable(size_t anAd) +{ + // TODO: check for anAd not exceeding preview size limit + Must(anAd >= 0); + Must(!enabled()); + theAd = anAd; + theState = stWriting; +} + +bool ICAPPreview::enabled() const +{ + return theState != stDisabled; +} + +size_t ICAPPreview::ad() const +{ + Must(enabled()); + return theAd; +} + +bool ICAPPreview::done() const +{ + Must(enabled()); + return theState >= stIeof; +} + +bool ICAPPreview::ieof() const +{ + Must(enabled()); + return theState == stIeof; +} + +size_t ICAPPreview::debt() const +{ + Must(enabled()); + return done() ? 0 : (theAd - theWritten); +} + +void ICAPPreview::wrote(size_t size, bool sawEof) +{ + Must(enabled()); + theWritten += size; + + if (theWritten >= theAd) + theState = stDone; // sawEof is irrelevant + else + if (sawEof) + theState = stIeof; +} + --- /dev/null Wed Feb 14 13:33:00 2007 +++ squid3/src/ICAPModXact.h Wed Feb 14 13:35:22 2007 @@ -0,0 +1,279 @@ + +/* + * $Id: ICAPModXact.h,v 1.1.2.1 2005/10/17 22:58:29 rousskov Exp $ + * + * + * SQUID Web Proxy Cache http://www.squid-cache.org/ + * ---------------------------------------------------------- + * + * Squid is the result of efforts by numerous individuals from + * the Internet community; see the CONTRIBUTORS file for full + * details. Many organizations have provided support for Squid's + * development; see the SPONSORS file for full details. Squid is + * Copyrighted (C) 2001 by the Regents of the University of + * California; see the COPYRIGHT file for full details. Squid + * incorporates software developed and/or copyrighted by other + * sinks; see the CREDITS file for full details. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. + * + */ + +#ifndef SQUID_ICAPMODXACT_H +#define SQUID_ICAPMODXACT_H + +#include "ICAPXaction.h" +#include "MsgPipe.h" +#include "MsgPipeSource.h" +#include "MsgPipeSink.h" + +/* ICAPModXact implements ICAP REQMOD and RESPMOD transaction using ICAPXaction + * as the base. It implements message pipe sink and source interfaces for + * communication with various HTTP "anchors" and "hooks". ICAPModXact receives + * virgin HTTP messages, communicates with the ICAP server, and sends the + * adapted messages back. ICAPClient is the "owner" of the ICAPModXact. */ + +class ChunkedCodingParser; + +// estimated future presence and size of something (e.g., HTTP body) + +class SizedEstimate +{ + +public: + SizedEstimate(); // not expected by default + void expect(ssize_t aSize); // expect with any, even unknown size + bool expected() const; + + /* other members can be accessed iff expected() */ + + bool knownSize() const; + size_t size() const; // can be accessed iff knownSize() + +private: + enum { dtUnexpected = -2, dtUnknown = -1 }; + ssize_t theData; // combines expectation and size info to save RAM +}; + +// Protects buffer area. If area size is unknown, protects buffer suffix. +// Only "released" data can be consumed by the caller. Used to maintain +// write, preview, and 204 promises for ICAPModXact virgin->data-body buffer. + +class MemBufClaim +{ + +public: + MemBufClaim(); + + void protectAll(); + void protectUpTo(size_t aGoal); + void disable(); + bool active() const { return theStart >= 0; } + + // methods below require active() + + void release(size_t size); // stop protecting size more bytes + size_t offset() const; // protected area start + bool limited() const; // protects up to a known size goal + +private: + ssize_t theStart; // left area border + ssize_t theGoal; // "end" maximum, if any +}; + +// maintains preview-related sizes + +class ICAPPreview +{ + +public: + ICAPPreview(); // disabled + void enable(size_t anAd); // enabled with advertised size + bool enabled() const; + + /* other members can be accessed iff enabled() */ + + size_t ad() const; // advertised preview size + size_t debt() const; // remains to write + bool done() const; // wrote everything + bool ieof() const; // premature EOF + + void wrote(size_t size, bool sawEof); + +private: + size_t theWritten; + size_t theAd; + enum State { stDisabled, stWriting, stIeof, stDone } theState; +}; + +class ICAPModXact: public ICAPXaction, public MsgPipeSource, public MsgPipeSink +{ + +public: + typedef RefCount Pointer; + +public: + ICAPModXact(); + + // called by ICAPClient + void init(ICAPServiceRep::Pointer&, MsgPipe::Pointer &aVirgin, MsgPipe::Pointer &anAdapted, Pointer &aSelf); + + // pipe source methods; called by Anchor while receiving the adapted msg + virtual void noteSinkNeed(MsgPipe *p); + virtual void noteSinkAbort(MsgPipe *p); + + // pipe sink methods; called by ICAP while sending the virgin message + virtual void noteSourceStart(MsgPipe *p); + virtual void noteSourceProgress(MsgPipe *p); + virtual void noteSourceFinish(MsgPipe *p); + virtual void noteSourceAbort(MsgPipe *p); + + // comm handlers + virtual void handleCommConnected(); + virtual void handleCommWrote(size_t size); + virtual void handleCommRead(size_t size); + void handleCommWroteHeaders(); + void handleCommWroteBody(); + + // service waiting + void noteServiceReady(); + +private: + void estimateVirginBody(); + + void waitForService(); + + // will not send anything [else] on the adapted pipe + bool doneSending() const; + + void startWriting(); + void writeMore(); + void writePriviewBody(); + void writePrimeBody(); + void writeSomeBody(const char *label, size_t size); + + void startReading(); + void readMore(); + virtual bool doneReading() const { return commEof || state.doneParsing(); } + + size_t claimSize(const MemBufClaim &claim) const; + const char *claimContent(const MemBufClaim &claim) const; + void makeRequestHeaders(MemBuf &buf); + void moveRequestChunk(MemBuf &buf, size_t chunkSize); + void addLastRequestChunk(MemBuf &buf); + void openChunk(MemBuf &buf, size_t chunkSize); + void closeChunk(MemBuf &buf, bool ieof); + void virginConsume(); + + bool shouldPreview(); + bool shouldAllow204(); + void prepBackup(size_t expectedSize); + void backup(const MemBuf &buf); + + void parseMore(); + + void parseHeaders(); + void parseIcapHead(); + void parseHttpHead(); + bool parseHead(HttpMsg *head); + + void parseBody(); + bool parsePresentBody(); + void maybeAllocateHttpMsg(); + + void handle100Continue(); + void handle200Ok(); + void handle204NoContent(); + void handleUnknownScode(); + + void echoMore(); + + virtual bool doneAll() const; + + virtual void doStop(); + void stopReceiving(); + void stopSending(bool nicely); + void stopWriting(); + void stopParsing(); + void stopBackup(); + + virtual void fillPendingStatus(MemBuf &buf) const; + virtual void fillDoneStatus(MemBuf &buf) const; + +private: + void packHead(MemBuf &httpBuf, const HttpMsg *head); + void encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const HttpMsg *head); + bool gotEncapsulated(const char *section) const; + + Pointer self; + MsgPipe::Pointer virgin; + MsgPipe::Pointer adapted; + + HttpReply *icapReply; + + SizedEstimate virginBody; + MemBufClaim virginWriteClaim; // preserve virgin data buffer for writing + MemBufClaim virginSendClaim; // ... for sending (previe and 204s) + size_t virginConsumed; // virgin data consumed so far + ICAPPreview preview; // use for creating (writing) the preview + + ChunkedCodingParser *bodyParser; // ICAP response body parser + + class State + { + + public: + State(); + + public: + + unsigned serviceWaiting: + 1; // waiting for the ICAPServiceRep preparing the ICAP service + + unsigned doneReceiving: + 1; // expect no new virgin info (from the virgin pipe) + + // will not write anything [else] to the ICAP server connection + bool doneWriting() const { return writing == writingDone; } + + // parsed entire ICAP response from the ICAP server + bool doneParsing() const { return parsing == psDone; } + + // is parsing ICAP or HTTP headers read from the ICAP server + bool parsingHeaders() const + { + return parsing == psIcapHeader || + parsing == psHttpHeader; + } + + enum Parsing { psIcapHeader, psHttpHeader, psBody, psDone } parsing; + + // measures ICAP request writing progress + enum Writing { writingInit, writingConnect, writingHeaders, + writingPreview, writingPaused, writingPrime, writingDone } writing; + + enum Sending { sendingUndecided, sendingVirgin, sendingAdapted, + sendingDone } sending; + } + + state; + + CBDATA_CLASS2(ICAPModXact); +}; + +// destroys the transaction; implemented in ICAPClient.cc (ick?) +extern void ICAPNoteXactionDone(ICAPModXact::Pointer x); + +#endif /* SQUID_ICAPMOD_XACT_H */ Index: squid3/src/ICAPOptXact.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPOptXact.cc,v retrieving revision 1.1.2.4 retrieving revision 1.1.2.5 diff -u -r1.1.2.4 -r1.1.2.5 --- squid3/src/ICAPOptXact.cc 1 Oct 2005 05:09:11 -0000 1.1.2.4 +++ squid3/src/ICAPOptXact.cc 17 Oct 2005 22:58:29 -0000 1.1.2.5 @@ -1,619 +1,108 @@ +/* + * DEBUG: section 93 ICAP (RFC 3507) Client + */ + #include "squid.h" #include "comm.h" -#include "client_side_request.h" -#include "MsgPipe.h" -#include "MsgPipeData.h" -#include "MsgPipeSource.h" -#include "MsgPipeSink.h" -#include "HttpRequest.h" +#include "HttpReply.h" -#include "ICAPServiceRep.h" #include "ICAPOptXact.h" -#include "ICAPClient.h" -#include "ChunkedCodingParser.h" +#include "ICAPOptions.h" #include "TextException.h" -#include "LeakFinder.h" - -extern LeakFinder *MsgPipeLeaker; - CBDATA_CLASS_INIT(ICAPOptXact); -static const char *const crlf = "\r\n"; - -static -ICAPOptXact &ICAPOptXact_fromData(void *data) -{ - ICAPOptXact *x = static_cast(data); - assert(x); - return *x; -} - -static -void ICAPOptXact_noteCommTimeout(int, void *data) -{ - ICAPOptXact_fromData(data).noteCommTimeout(); -} - -static -void ICAPOptXact_noteCommClose(int, void *data) -{ - ICAPOptXact_fromData(data).noteCommClose(); -} - -static -void ICAPOptXact_noteCommConnected(int, comm_err_t status, int xerrno, void *data) -{ - ICAPOptXact_fromData(data).noteCommConnected(status); -} - - - -static -void ICAPOptXact_noteCommWroteHeaders(int, char *, size_t size, comm_err_t status, void *data) -{ - ICAPOptXact_fromData(data).noteCommWroteHeaders(status); -} - +ICAPOptXact::ICAPOptXact(): ICAPXaction("ICAPOptXact"), options(NULL), + cb(NULL), cbData(NULL) -static -void ICAPOptXact_noteCommRead(int, char *, size_t size, comm_err_t status, int xerrno, void *data) { - ICAPOptXact_fromData(data).noteCommRead(status, size); -} - -#define ICAPOptXact_Enter(method) \ - const Notify defaultNotify = notifyAll; \ - try { \ - if (!callStart(#method)) \ - return; -#define ICAPOptXact_Exit(method) \ - } \ - catch (const TextException &e) { \ - callException(#method, e, defaultNotify); \ - } \ - callEnd(#method); - -ICAPOptXact::State::State() -{ - memset(this, sizeof(*this), 0); -} - -const char *ICAPOptXact::status() const -{ - static MemBuf status; - memBufReset(&status); - - status.append("[", 1); - - if (notify != notifyUnknown) - memBufPrintf(&status, "N(%d)", notify); - - if (state.doneReceiving) - status.append("R", 1); - - if (state.doneSending) - status.append("S", 1); - - if (state.doneReading) - status.append("r", 1); - - if (state.doneWriting) - status.append("w", 1); - - status.append("]", 1); - - status.terminate(); - - return status.content(); -} - - -ICAPOptXact::ICAPOptXact(): service(NULL), virgin(NULL), adapted(NULL) -{ - debug(93,0)("ICAPOptXact constructed, this=%p\n", this); + debug(93,9)("ICAPOptXact constructed, this=%p\n", this); } ICAPOptXact::~ICAPOptXact() { - stop(notifyNone); - // cbdataReferenceDone(service); - debug(93,0)("ICAPOptXact destructed, this=%p\n", this); - - if (virgin != NULL) - freeVirgin(); - - if (adapted != NULL) - freeAdapted(); + Must(!options); // the caller must set to NULL + debug(93,9)("ICAPOptXact destructed, this=%p\n", this); } -void ICAPOptXact::init(MsgPipe::Pointer aVirgin, MsgPipe::Pointer anAdapted) -{ - assert(aVirgin.getRaw() && anAdapted.getRaw()); - virgin = aVirgin; - adapted = anAdapted; - virgin->sink = this; - adapted->source = this; - adapted->data = new MsgPipeData; - adapted->data->body = new MemBuf; - memBufInit(adapted->data->body, ICAPMsgPipeBufSizeMin, ICAPMsgPipeBufSizeMax); - memBufDefInit(&readBuf); -}; - -void ICAPOptXact::startOptMod(ICAPServiceRep *anService) +void ICAPOptXact::start(ICAPServiceRep::Pointer &aService, Callback *aCb, void *aCbData) { - debug(93,0)("ICAPOptXact::startOptMod() called\n"); - service = cbdataReference(anService); - // service = anService; - - virgin = new MsgPipe("virgin"); - leakTouch(virgin.getRaw(), MsgPipeLeaker); - - virgin->source = this; - virgin->data = new MsgPipeData; - virgin->data->cause = NULL; - virgin->data->body = new MemBuf; - - memBufInit(virgin->data->body, ICAPMsgPipeBufSizeMin, ICAPMsgPipeBufSizeMax); - - adapted = new MsgPipe("adapted"); - leakTouch(adapted.getRaw(), MsgPipeLeaker); - adapted->sink = this; - - init(virgin, adapted); - - virgin->sendSourceStart(); - adapted->sendSinkNeed(); -} - -void ICAPOptXact::startOptMod() -{ - debug(93,0)("ICAPOptXact::startOptMod() called\n"); - // service = cbdataReference(anService); - service = new ICAPServiceRep(); - - virgin = new MsgPipe("virgin"); - leakTouch(virgin.getRaw(), MsgPipeLeaker); + service(aService); - virgin->source = this; - virgin->data = new MsgPipeData; - virgin->data->cause = NULL; - virgin->data->body = new MemBuf; - - memBufInit(virgin->data->body, ICAPMsgPipeBufSizeMin, ICAPMsgPipeBufSizeMax); - - adapted = new MsgPipe("adapted"); - leakTouch(adapted.getRaw(), MsgPipeLeaker); - adapted->sink = this; - - init(virgin, adapted); - - virgin->sendSourceStart(); - adapted->sendSinkNeed(); -} - - - -void ICAPOptXact::openConnection() -{ - connection = pconnPop(service->host, service->port, NULL); - - if (connection < 0) { - connection = comm_open(SOCK_STREAM, 0, getOutgoingAddr(NULL), 0, - COMM_NONBLOCKING, service->uri); - - if (connection < 0) - throw TexcHere("cannot connect to ICAP service " /* + uri */); - } - - commSetTimeout(connection, Config.Timeout.connect, - &ICAPOptXact_noteCommTimeout, this); - - state.closer = &ICAPOptXact_noteCommClose; - comm_add_close_handler(connection, state.closer, this); - commConnectStart(connection, service->host, service->port, - &ICAPOptXact_noteCommConnected, this); -} - -void ICAPOptXact::closeConnection() -{ - if (connection >= 0) { - commSetTimeout(connection, -1, NULL, NULL); - - if (state.closer) { - comm_remove_close_handler(connection, state.closer, this); - state.closer = NULL; - } - - stopReading(); - comm_close(connection); - connection = -1; - } -} - - -void ICAPOptXact::noteSourceStart(MsgPipe *p) -{ - ICAPOptXact_Enter(noteSourceStart); + Must(!cb && aCb && aCbData); + cb = aCb; + cbData = cbdataReference(aCbData); openConnection(); - - ICAPOptXact_Exit(noteSourceStart); -} - - - -void ICAPOptXact::sendNext() -{ - debug(93,0)("ICAPOptXact::sendNext() called\n"); - leakTouch(virgin.getRaw(), MsgPipeLeaker); - virgin->sendSourceProgress(); -} - -// ClientHttpRequest says we have the entire HTTP message -void ICAPOptXact::doneSending() -{ - debugs(93, 3, "ICAPOptXact::doneSending() called\n"); - leakTouch(virgin.getRaw(), MsgPipeLeaker); - -#if ICAP_CLIENTSIDEHOOK_LOOPBACK - - return; -#else - - virgin->sendSourceFinish(); - -#endif -} - -// ClientHttpRequest tells us to abort -void ICAPOptXact::ownerAbort() -{ - debug(93,0)("ICAPOptXact::ownerAbort() called\n"); - stop(notifyIcap); -} - -// ICAP client needs more virgin response data -void ICAPOptXact::noteSinkNeed(MsgPipe *p) -{ - debug(93,0)("ICAPOptXact::noteSinkNeed() called\n"); - - leakTouch(virgin.getRaw(), MsgPipeLeaker); - - // if (virgin->data->body->potentialSpaceSize()) - // chr->icapSpaceAvailable(); -} - -// ICAP client aborting -void ICAPOptXact::noteSinkAbort(MsgPipe *p) -{ - debug(93,0)("ICAPOptXact::noteSinkAbort() called\n"); - stop(notifyOwner); -} - -// ICAP client sends more data -void ICAPOptXact::noteSourceProgress(MsgPipe *p) -{ - debug(93,0)("ICAPOptXact::noteSourceProgress() called\n"); - leakTouch(p, MsgPipeLeaker); - // if (p->data->body->hasContent()) { - // chr->takeAdaptedBody(p->data->body); - // } -} - -// ICAP client is done sending adapted response -void ICAPOptXact::noteSourceFinish(MsgPipe *p) -{ - ICAPOptXact_Enter(noteSourceFinish); - - Must(!state.doneReceiving); - state.doneReceiving = true; - ICAPOptXact_Exit(noteSourceFinish); -} - -// ICAP client is aborting -void ICAPOptXact::noteSourceAbort(MsgPipe *p) -{ - debug(93,0)("ICAPOptXact::noteSourceAbort() called\n"); - leakTouch(p, MsgPipeLeaker); - stop(notifyOwner); -} - -// internal cleanup -void ICAPOptXact::stop(Notify notify) -{ - if (virgin != NULL) { - leakTouch(virgin.getRaw(), MsgPipeLeaker); - - if (notify == notifyIcap) - virgin->sendSourceAbort(); - else - virgin->source = NULL; - - freeVirgin(); - } - - if (adapted != NULL) { - leakTouch(adapted.getRaw(), MsgPipeLeaker); - - if (notify == notifyIcap) - adapted->sendSinkAbort(); - else - adapted->sink = NULL; - - freeAdapted(); - } - - /* - if (chr) { - if (notify == notifyOwner) - // tell ClientHttpRequest that we are aborting prematurely - chr->abortAdapting(); - cbdataReferenceDone(chr); - // chr is now NULL, will not call it any more - } - */ -} - - -bool ICAPOptXact::callStart(const char *method) -{ - debugs(93, 5, "ICAPOptXact::" << method << " called " << status()); - - if (state.inCall) { - // this may happen when we have bugs or when arguably buggy - // comm interface calls us while we are closing the connection - debugs(93, 5, "ICAPOptXact::" << method << " cancels reentry."); - return false; - } - - state.inCall = true; - return true; } -void ICAPOptXact::callException(const char *method, const TextException &e, Notify defaultWho) +void ICAPOptXact::handleCommConnected() { - debugs(93, 4, "ICAPOptXact::" << method << " caught an exception: " << - e.message << ' ' << status()); - - if (!done()) - mustStop(defaultWho); -} + scheduleRead(); -void ICAPOptXact::callEnd(const char *method) -{ - if (done()) { - debugs(93, 5, "ICAPOptXact::" << method << " ends xaction " << status()); - - service->icap_options = new ICAPOptions(); - - if (!service->icap_options->parseResponse()) { - debugs(93, 3, "ICAPOptXact::" << method << "error OPTIONS response parsing"); - } - - doStop(); - return; - } - - debugs(93, 6, "ICAPOptXact::" << method << " ended " << status()); - state.inCall = false; -} - - -void ICAPOptXact::freeVirgin() -{ - requestUnlink(virgin->data->cause); - virgin->data->cause = NULL; - virgin->data->header = NULL; - leakTouch(virgin.getRaw(), MsgPipeLeaker); - virgin = NULL; // refcounted -} - -void ICAPOptXact::freeAdapted() -{ - adapted->data->header = NULL; // we don't own it - leakTouch(adapted.getRaw(), MsgPipeLeaker); - adapted = NULL; // refcounted -} - -void ICAPOptXact::stopReading() -{ - if (state.isReading) { - // check callback presence because comm module removes - // fdc_table[].read.callback after the actual I/O but - // before we get the callback via a queued event. - // These checks try to mimic the comm_read_cancel() assertions. - - if (comm_has_pending_read(connection) && - !comm_has_pending_read_callback(connection)) - comm_read_cancel(connection, state.isReading, this); - - state.isReading = NULL; - } - - state.doneReading = true; -} - -bool ICAPOptXact::done() const -{ - - if (notify != notifyUnknown) // mustStop() has been called - return true; + MemBuf requestBuf; + requestBuf.init(); + makeRequest(requestBuf); + debugs(93, 9, "ICAPOptXact request " << status() << ":\n" << + (requestBuf.terminate(), requestBuf.content())); - return state.doneReceiving && state.doneSending && - state.doneReading && state.doneWriting; + scheduleWrite(requestBuf); } -void ICAPOptXact::mustStop(Notify who) -{ - Must(state.inCall); // otherwise nobody will call doStop() - Must(notify == notifyUnknown); - notify = who; - debugs(93, 5, "ICAPOptXact will stop and notify " << notify); -} - -// internal cleanup void ICAPOptXact::doStop() { - debugs(93, 5, "ICAPOptXacty::doStop"); - - memBufClean(&readBuf); - - closeConnection(); // TODO: pconn support: close unless notifyService ... - - if (virgin != NULL) { - if (notify == notifyAll) - virgin->sendSinkAbort(); - else - virgin->sink = NULL; - - virgin = NULL; // refcounted - - state.doneReceiving = true; - } + ICAPXaction::doStop(); - if (adapted != NULL) { - if (notify == notifyAll) - adapted->sendSourceAbort(); - else - adapted->source = NULL; - - adapted = NULL; // refcounted - - state.doneSending = true; + if (Callback *call = cb) { + cb = NULL; + void *data = NULL; + + if (cbdataReferenceValidDone(cbData, &data)) { + (*call)(this, data); // will delete us + return; + } } -} - -void ICAPOptXact::makeRequestHeaders(MemBuf *buf) -{ - memBufPrintf(buf, "OPTIONS %s ICAP/1.0\r\n", service->uri); - memBufPrintf(buf, "Host: %s:%d\r\n", service->host, service->port); - - buf->append(crlf, 2); -} - - - -// communication timeout with the ICAP service -void ICAPOptXact::noteCommTimeout() -{ - ICAPOptXact_Enter(noteCommTimeout); - - // mustStop(notifyHttp); - - ICAPOptXact_Exit(noteCommTimeout); -} - -// unexpected connection close while talking to the ICAP service -void ICAPOptXact::noteCommClose() -{ - state.closer = NULL; - ICAPOptXact_Enter(noteCommClose); - - // mustStop(notifyHttp); - - ICAPOptXact_Exit(noteCommClose); -} - -// connection with the ICAP service established -void ICAPOptXact::noteCommConnected(comm_err_t status) -{ - ICAPOptXact_Enter(noteCommConnected); - - Must(status == COMM_OK); - startReading(); - - MemBuf requestBuf; - memBufDefInit(&requestBuf); - makeRequestHeaders(&requestBuf); - - // write headers only; comm module will free the requestBuf - state.isWriting = &ICAPOptXact_noteCommWroteHeaders; - comm_old_write_mbuf(connection, &requestBuf, state.isWriting, this); - ICAPOptXact_Exit(noteCommConnected); -} - -void ICAPOptXact::startReading() -{ - Must(connection >= 0); - Must(!state.isReading); - Must(adapted.getRaw()); - Must(adapted->data); - Must(adapted->data->body); + // get rid of options if we did call the callback + delete options; - readMore(); + options = NULL; } -void ICAPOptXact::readMore() +void ICAPOptXact::makeRequest(MemBuf &buf) { - if (state.isReading || state.doneReading) - return; - - // do not fill readBuf if we have no space to store the result - if (!adapted->data->body->hasPotentialSpace()) - return; - - // we use the same buffer for headers and body and then consume headers - if (readBuf.hasSpace()) { - state.isReading = &ICAPOptXact_noteCommRead; - comm_read(connection, readBuf.space(), readBuf.spaceSize(), - state.isReading, this); - } + const ICAPServiceRep &s = service(); + buf.Printf("OPTIONS %s ICAP/1.0\r\n", s.uri.buf()); + buf.Printf("Host: %s:%d\r\n", s.host.buf(), s.port); + buf.append(ICAP::crlf, 2); } -void ICAPOptXact::noteCommWroteHeaders(comm_err_t status) +void ICAPOptXact::handleCommWrote(size_t size) { - ICAPOptXact_Enter(noteCommWroteHeaders); - - Must(state.isWriting); - state.isWriting = NULL; - state.doneWriting = true; - state.doneSending = true; - - ICAPOptXact_Exit(noteCommWroteHeaders); + debugs(93, 9, "ICAPOptXact finished writing " << size << + "-byte request " << status()); } // comm module read a portion of the ICAP response for us -void ICAPOptXact::noteCommRead(comm_err_t status, size_t sz) +void ICAPOptXact::handleCommRead(size_t) { - ICAPOptXact_Enter(noteCommRead); - - Must(state.isReading); - state.isReading = NULL; - - // Must(!state.doneParsing()); - Must(status == COMM_OK); - - if (sz == 0) - stopReading(); + if (parseResponse()) + Must(done()); // there should be nothing else to do else - if (sz > 0) - readBuf.appended(sz); - - stopReading(); - - state.parsing = State::psDone; - - state.doneSending = true; + scheduleRead(); +} +bool ICAPOptXact::parseResponse() +{ + HttpReply r; + r.protoPrefix = "ICAP/"; // TODO: make an IcapReply class? - readMore(); + if (!parseHttpMsg(r)) + return false; - ICAPOptXact_Exit(noteCommRead); -} + options = new ICAPOptions; -void ICAPOptXact::moveRequestChunk(MemBuf *buf) -{ - MsgPipeData::Body *body = virgin->data->body; - const mb_size_t chunkSize = body->contentSize(); // may be zero - memBufPrintf(buf, "%x\r\n", chunkSize); - - if (chunkSize > 0) { - buf->append(body->content(), chunkSize); - body->consume(chunkSize); - } + options->configure(r); - buf->append(crlf, 2); // chunk-terminating CRLF + return true; } - Index: squid3/src/ICAPOptXact.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPOptXact.h,v retrieving revision 1.1.2.2 retrieving revision 1.1.2.3 diff -u -r1.1.2.2 -r1.1.2.3 --- squid3/src/ICAPOptXact.h 21 Sep 2005 20:35:08 -0000 1.1.2.2 +++ squid3/src/ICAPOptXact.h 17 Oct 2005 22:58:29 -0000 1.1.2.3 @@ -1,5 +1,5 @@ /* - * $Id: ICAPOptXact.h,v 1.1.2.2 2005/09/21 20:35:08 dwsquid Exp $ + * $Id: ICAPOptXact.h,v 1.1.2.3 2005/10/17 22:58:29 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -33,132 +33,43 @@ #ifndef SQUID_ICAPOPTXACT_H #define SQUID_ICAPOPTXACT_H -#include "MsgPipe.h" -#include "MsgPipeSource.h" -#include "MsgPipeSink.h" +#include "ICAPXaction.h" -#include "ICAPServiceRep.h" +class ICAPOptions; -class TextException; +/* ICAPOptXact sends an ICAP OPTIONS request to the ICAP service, + * converts the response into ICAPOptions object, and notifies + * the caller via the callback. NULL options objects means the + * ICAP service could not be contacted or did not return any response */ -class ICAPOptXact: public MsgPipeSource, public MsgPipeSink +class ICAPOptXact: public ICAPXaction { public: - typedef RefCount Pointer; + typedef void Callback(ICAPOptXact*, void *data); - -public: ICAPOptXact(); - ~ICAPOptXact(); + virtual ~ICAPOptXact(); - void init(MsgPipe::Pointer, MsgPipe::Pointer); + void start(ICAPServiceRep::Pointer &aService, Callback *aCb, void *aCbData); - void startOptMod(ICAPServiceRep *); - void startOptMod(); - void sendNext(); - void doneSending(); - void ownerAbort(); - - // int potentialSpaceSize(); - - // pipe source methods; called by ICAP while receiving the virgin message - virtual void noteSinkNeed(MsgPipe *p); - virtual void noteSinkAbort(MsgPipe *p); - - // pipe sink methods; called by ICAP while sending the adapted message - virtual void noteSourceStart(MsgPipe *p); - virtual void noteSourceProgress(MsgPipe *p); - virtual void noteSourceFinish(MsgPipe *p); - virtual void noteSourceAbort(MsgPipe *p); - - void noteCommConnected(comm_err_t status); - void noteCommWroteHeaders(comm_err_t status); - void noteCommRead(comm_err_t status, size_t sz); + ICAPOptions *options; // result for the caller to take/handle - void noteCommTimeout(); - void noteCommClose(); +protected: + virtual void handleCommConnected(); + virtual void handleCommWrote(size_t size); + virtual void handleCommRead(size_t size); - void startReading(); - void readMore(); + void makeRequest(MemBuf &buf); + bool parseResponse(); -public: - ICAPServiceRep *service; - Pointer self; + void startReading(); - MsgPipe::Pointer virgin; - MsgPipe::Pointer adapted; + virtual void doStop(); private: - typedef enum { notifyUnknown, notifyNone, notifyOwner, notifyIcap, notifyAll } Notify; - int connection; // FD of the ICAP server connection - - MemBuf readBuf; - const char *status() const; - - void openConnection(); - void closeConnection(); - - void moveRequestChunk(MemBuf *buf); - - void makeRequestHeaders(MemBuf *buf); - - bool callStart(const char *method); - void callException(const char *method, const TextException &e, Notify who); - void callEnd(const char *method); - void stopReading(); - - void stop(Notify notify); - bool done() const; - void mustStop(Notify who); - void doStop(); - - void freeVirgin(); - void freeAdapted(); - - class State - { - - public: - State(); - - public: - // XXX: document each - - unsigned inCall: - 1; - - unsigned doneReceiving: - 1; - - unsigned doneSending: - 1; - - unsigned doneReading: - 1; - - unsigned doneWriting: - 1; - - bool doneParsing() const { return parsing == psDone; } - - bool parsingHeaders() const - { - return parsing == psIcapHeader || - parsing == psHttpHeader; - } - - CWCB *isWriting; - IOCB *isReading; - PF *closer; - - enum Parsing { psIcapHeader, psHttpHeader, psBody, psDone } parsing; - } - - state; - - Notify notify; - + Callback *cb; + void *cbData; CBDATA_CLASS2(ICAPOptXact); }; Index: squid3/src/ICAPXaction.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPXaction.cc,v retrieving revision 1.1.2.50 retrieving revision 1.1.2.51 diff -u -r1.1.2.50 -r1.1.2.51 --- squid3/src/ICAPXaction.cc 11 Oct 2005 22:28:12 -0000 1.1.2.50 +++ squid3/src/ICAPXaction.cc 17 Oct 2005 22:58:29 -0000 1.1.2.51 @@ -4,32 +4,16 @@ #include "squid.h" #include "comm.h" -#include "MsgPipe.h" -#include "MsgPipeData.h" -#include "HttpRequest.h" #include "HttpReply.h" -#include "ICAPServiceRep.h" #include "ICAPXaction.h" #include "ICAPClient.h" -#include "ChunkedCodingParser.h" #include "TextException.h" -// flow and terminology: -// HTTP| --> receive --> encode --> write --> |network -// end | <-- send <-- parse <-- read <-- |end - -// TODO: state.doneSending/Receving data members should probably be in sync -// with this->adapted/virgin pointers. Make doneSending/Receving methods? - -// TODO: replace gotEncapsulated() with something faster; we call it often - /* comm module handlers (wrappers around corresponding ICAPXaction methods */ -// TODO: Teach comm module to call object methods directly -CBDATA_CLASS_INIT(ICAPXaction); +// TODO: Teach comm module to call object methods directly -static const char *const crlf = "\r\n"; -static const size_t TheBackupLimit = ICAPMsgPipeBufSizeMax; +//CBDATA_CLASS_INIT(ICAPXaction); static ICAPXaction &ICAPXaction_fromData(void *data) @@ -40,15 +24,15 @@ } static -void ICAPXaction_noteCommTimeout(int, void *data) +void ICAPXaction_noteCommTimedout(int, void *data) { - ICAPXaction_fromData(data).noteCommTimeout(); + ICAPXaction_fromData(data).noteCommTimedout(); } static -void ICAPXaction_noteCommClose(int, void *data) +void ICAPXaction_noteCommClosed(int, void *data) { - ICAPXaction_fromData(data).noteCommClose(); + ICAPXaction_fromData(data).noteCommClosed(); } static @@ -58,15 +42,9 @@ } static -void ICAPXaction_noteCommWroteHeaders(int, char *, size_t size, comm_err_t status, void *data) -{ - ICAPXaction_fromData(data).noteCommWroteHeaders(status); -} - -static -void ICAPXaction_noteCommWroteBody(int, char *, size_t size, comm_err_t status, void *data) +void ICAPXaction_noteCommWrote(int, char *, size_t size, comm_err_t status, void *data) { - ICAPXaction_fromData(data).noteCommWroteBody(status, size); + ICAPXaction_fromData(data).noteCommWrote(status, size); } static @@ -75,130 +53,53 @@ ICAPXaction_fromData(data).noteCommRead(status, size); } -// call guards for all "asynchronous" note*() methods that are being called -// from message pipes or the comm module - -// asynchronous call entry: -// - remember default notify address (currently hard-coded to notifyAll); -// - open the try clause; -// - call callStart(). -#define ICAPXaction_Enter(method) \ - const Notify defaultNotify = notifyAll; \ - try { \ - if (!callStart(#method)) \ - return; -// ... -// asynchronous call exit: -// - close the try clause; -// - catch exceptions; -// - let callEnd() handle transaction termination conditions -#define ICAPXaction_Exit(method) \ - } \ - catch (const TextException &e) { \ - callException(#method, e, defaultNotify); \ - } \ - callEnd(#method); - -ICAPXaction::State::State() -{ - memset(this, sizeof(*this), 0); -} - -ICAPXaction::ICAPXaction(): self(NULL), service(NULL), virgin(NULL), adapted(NULL), - icapReply(NULL), connection(-1), - commBuf(NULL), virginConsumed(0), - reader(NULL), writer(NULL), closer(NULL), - bodyParser(NULL), - notify(notifyUnknown) -{} +ICAPXaction::ICAPXaction(const char *aTypeName): + connection(-1), + commBuf(NULL), commBufSize(0), + commEof(false), + connector(NULL), reader(NULL), writer(NULL), closer(NULL), + typeName(aTypeName), + theService(NULL), + inCall(NULL) +{ + readBuf.init(SQUID_TCP_SO_RCVBUF, SQUID_TCP_SO_RCVBUF); + commBuf = (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF, &commBufSize); + // make sure maximum readBuf space does not exceed commBuf size + Must(static_cast(readBuf.potentialSpaceSize()) <= commBufSize); +} ICAPXaction::~ICAPXaction() { - notify = notifyNone; doStop(); -} - -void ICAPXaction::init(ICAPServiceRep::Pointer aService, MsgPipe::Pointer aVirgin, MsgPipe::Pointer anAdapted, Pointer &aSelf) -{ - assert(!self.getRaw() && !virgin.getRaw() && !adapted.getRaw()); - assert(aSelf.getRaw() && aVirgin.getRaw() && anAdapted.getRaw()); - - self = aSelf; - service = aService; - virgin = aVirgin; - adapted = anAdapted; - - // receiving end - virgin->sink = this; // should be 'self' and refcounted - // virgin pipe data is initiated by the source - - // sending end - adapted->source = this; // should be 'self' and refcounted - adapted->data = new MsgPipeData; - - adapted->data->body = new MemBuf; // XXX: make body a non-pointer? - adapted->data->body->init(ICAPMsgPipeBufSizeMin, ICAPMsgPipeBufSizeMax); - // headers are initialized when we parse them - - // writing end - // nothing to do because we are using temporary write buffers - - // reading end - readBuf.init(); - - // encoding - // nothing to do because we are using temporary buffers - - // parsing - icapReply = httpReplyCreate(); - icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class? - - // XXX: make sure stop() cleans all buffers -} - -// HTTP side starts sending virgin data -void ICAPXaction::noteSourceStart(MsgPipe *p) -{ - ICAPXaction_Enter(noteSourceStart); - - // make sure TheBackupLimit is in-sync with the buffer size - Must(TheBackupLimit <= static_cast(virgin->data->body->max_capacity)); - - estimateVirginBody(); - - openConnection(); - // put nothing here as openConnection calls commConnectStart - // and that may call us back without waiting for the next select loop - - // XXX: but this has to be here to catch other errors. Thus, if - // commConnectStart fails (see the comment above), we may get here - // _after_ the object got destroyed. Somebody please fix commConnectStart! - ICAPXaction_Exit(noteSourceStart); + readBuf.clean(); + memFreeBuf(commBufSize, commBuf); } // TODO: obey service-specific, OPTIONS-reported connection limit void ICAPXaction::openConnection() { + const ICAPServiceRep &s = service(); // TODO: check whether NULL domain is appropriate here - connection = pconnPop(service->host.buf(), service->port, NULL); + connection = pconnPop(s.host.buf(), s.port, NULL); if (connection < 0) { connection = comm_open(SOCK_STREAM, 0, getOutgoingAddr(NULL), 0, - COMM_NONBLOCKING, service->uri.buf()); + COMM_NONBLOCKING, s.uri.buf()); if (connection < 0) throw TexcHere("cannot connect to ICAP service " /* + uri */); } - debug(93,3)("ICAPXaction::openConnection() to %s %d\n", service->host.buf(), service->port); + debugs(93,3, typeName << " opens connection to " << s.host.buf() << ":" << s.port); commSetTimeout(connection, Config.Timeout.connect, - &ICAPXaction_noteCommTimeout, this); - closer = &ICAPXaction_noteCommClose; + &ICAPXaction_noteCommTimedout, this); + + closer = &ICAPXaction_noteCommClosed; comm_add_close_handler(connection, closer, this); - state.writing = State::writingConnect; - commConnectStart(connection, service->host.buf(), service->port, - &ICAPXaction_noteCommConnected, this); + + connector = &ICAPXaction_noteCommConnected; + commConnectStart(connection, s.host.buf(), s.port, connector, this); } void ICAPXaction::closeConnection() @@ -211,11 +112,11 @@ closer = NULL; } - stopReading(); - stopWriting(); + cancelRead(); comm_close(connection); + connector = NULL; connection = -1; } } @@ -225,336 +126,93 @@ { ICAPXaction_Enter(noteCommConnected); - Must(state.writing == State::writingConnect); + Must(connector); + connector = NULL; Must(commStatus == COMM_OK); - startReading(); // wait for early errors from the ICAP server - - MemBuf requestBuf; - requestBuf.init(); - makeRequestHeaders(requestBuf); - debugs(93, 9, "ICAPXaction ICAP request prefix " << status() << ":\n" << - (requestBuf.terminate(), requestBuf.content())); + handleCommConnected(); - // write headers only; comm module will free the requestBuf - state.writing = State::writingHeaders; - writer = &ICAPXaction_noteCommWroteHeaders; - comm_old_write_mbuf(connection, &requestBuf, writer, this); + ICAPXaction_Exit(); +} - ICAPXaction_Exit(noteCommConnected); +void ICAPXaction::scheduleWrite(MemBuf &buf) +{ + // comm module will free the buffer + writer = &ICAPXaction_noteCommWrote; + comm_old_write_mbuf(connection, &buf, writer, this); } -void ICAPXaction::noteCommWroteHeaders(comm_err_t commStatus) +void ICAPXaction::noteCommWrote(comm_err_t commStatus, size_t size) { - ICAPXaction_Enter(noteCommWroteHeaders); + ICAPXaction_Enter(noteCommWrote); Must(writer); writer = NULL; Must(commStatus == COMM_OK); - Must(state.writing == State::writingHeaders); - - if (virginBody.expected()) { - state.writing = preview.enabled() ? - State::writingPreview : State::writingPrime; - virginWriteClaim.protectAll(); - writeMore(); - } else { - stopWriting(); - } - - ICAPXaction_Exit(noteCommWroteHeaders); -} - -void ICAPXaction::writeMore() -{ - if (writer) // already writing something - return; - - switch (state.writing) { - - case State::writingConnect: // waiting for the connection to establish - - case State::writingHeaders: // waiting for the headers to be written - - case State::writingPaused: // waiting for the ICAP server response - - case State::writingDone: // nothing more to write - return; - - case State::writingPreview: - writePriviewBody(); - return; - - case State::writingPrime: - writePrimeBody(); - return; - - default: - throw TexcHere("ICAPXaction in bad writing state"); - } -} - -void ICAPXaction::writePriviewBody() -{ - debugs(93, 8, "ICAPXaction will write Preview body " << status()); - Must(state.writing == State::writingPreview); - - MsgPipeData::Body *body = virgin->data->body; - const size_t size = XMIN(preview.debt(), (size_t)body->contentSize()); - writeSomeBody("preview body", size); - - // change state once preview is written - - if (preview.done()) { - debugs(93, 7, "ICAPXaction wrote entire Preview body " << status()); - - if (preview.ieof()) - stopWriting(); - else - state.writing = State::writingPaused; - } -} - -void ICAPXaction::writePrimeBody() -{ - Must(state.writing == State::writingPrime); - Must(virginWriteClaim.active()); - - MsgPipeData::Body *body = virgin->data->body; - const size_t size = body->contentSize(); - writeSomeBody("prime virgin body", size); + handleCommWrote(size); - if (state.doneReceiving) - stopWriting(); + ICAPXaction_Exit(); } -void ICAPXaction::writeSomeBody(const char *label, size_t size) -{ - Must(!writer && !state.doneWriting()); - debugs(93, 8, "ICAPXaction will write up to " << size << " bytes of " << - label); - - MemBuf writeBuf; // TODO: suggest a min size based on size and lastChunk - - writeBuf.init(); // note: we assume that last-chunk will fit - - const size_t writeableSize = claimSize(virginWriteClaim); - const size_t chunkSize = XMIN(writeableSize, size); - - if (chunkSize) { - debugs(93, 7, "ICAPXaction will write " << chunkSize << - "-byte chunk of " << label); - } else { - debugs(93, 7, "ICAPXaction has no writeable " << label << " content"); - } - - moveRequestChunk(writeBuf, chunkSize); - - const bool lastChunk = - (state.writing == State::writingPreview && preview.done()) || - (state.doneReceiving && claimSize(virginWriteClaim) <= 0); - - if (lastChunk && virginBody.expected()) { - debugs(93, 8, "ICAPXaction will write last-chunk of " << label); - addLastRequestChunk(writeBuf); - } - - debugs(93, 7, "ICAPXaction will write " << writeBuf.contentSize() - << " raw bytes of " << label); - - if (writeBuf.hasContent()) { - // comm will free the chunk - writer = &ICAPXaction_noteCommWroteBody; - comm_old_write_mbuf(connection, &writeBuf, writer, this); - } else { - writeBuf.clean(); - } -} - -void ICAPXaction::moveRequestChunk(MemBuf &buf, size_t chunkSize) -{ - if (chunkSize > 0) { - openChunk(buf, chunkSize); - buf.append(claimContent(virginWriteClaim), chunkSize); - closeChunk(buf, false); - - virginWriteClaim.release(chunkSize); - virginConsume(); - } - - if (state.writing == State::writingPreview) - preview.wrote(chunkSize, state.doneReceiving); // even if wrote nothing -} - -void ICAPXaction::addLastRequestChunk(MemBuf &buf) -{ - openChunk(buf, 0); - closeChunk(buf, state.writing == State::writingPreview && preview.ieof()); -} - -void ICAPXaction::openChunk(MemBuf &buf, size_t chunkSize) -{ - buf.Printf("%x\r\n", chunkSize); -} - -void ICAPXaction::closeChunk(MemBuf &buf, bool ieof) -{ - if (ieof) - buf.append("; ieof", 6); - - buf.append(crlf, 2); // chunk-terminating CRLF -} - -size_t ICAPXaction::claimSize(const MemBufClaim &claim) const -{ - Must(claim.active()); - const size_t start = claim.offset(); - const size_t end = virginConsumed + virgin->data->body->contentSize(); - Must(virginConsumed <= start && start <= end); - return end - start; -} - -const char *ICAPXaction::claimContent(const MemBufClaim &claim) const -{ - Must(claim.active()); - const size_t start = claim.offset(); - Must(virginConsumed <= start); - return virgin->data->body->content() + (start - virginConsumed); -} - -void ICAPXaction::virginConsume() -{ - MemBuf &buf = *virgin->data->body; - const size_t have = static_cast(buf.contentSize()); - const size_t end = virginConsumed + have; - size_t offset = end; - - if (virginWriteClaim.active()) - offset = XMIN(virginWriteClaim.offset(), offset); - - if (virginSendClaim.active()) - offset = XMIN(virginSendClaim.offset(), offset); - - Must(virginConsumed <= offset && offset <= end); - - if (const size_t size = offset - virginConsumed) { - debugs(93, 8, "ICAPXaction consumes " << size << " out of " << have << - " virgin body bytes"); - buf.consume(size); - virginConsumed += size; - - if (!state.doneReceiving) - virgin->sendSinkNeed(); - } -} - -void ICAPXaction::noteCommWroteBody(comm_err_t commStatus, size_t sz) +// communication timeout with the ICAP service +void ICAPXaction::noteCommTimedout() { - ICAPXaction_Enter(noteCommWroteBody); + ICAPXaction_Enter(noteCommTimedout); - writer = NULL; - Must(commStatus == COMM_OK); - writeMore(); + handleCommTimedout(); - ICAPXaction_Exit(noteCommWroteBody); + ICAPXaction_Exit(); } -void ICAPXaction::stopWriting() +void ICAPXaction::handleCommTimedout() { - if (state.writing == State::writingDone) - return; - - debugs(93, 7, "ICAPXaction will no longer write " << status()); - - state.writing = State::writingDone; - - virginWriteClaim.disable(); - - virginConsume(); - - // Comm does not have an interface to clear the writer, but - // writeMore() will not write if our write callback is called - // when state.writing == State::writingDone; + mustStop("connection with ICAP service timed out"); } -void ICAPXaction::stopBackup() +// unexpected connection close while talking to the ICAP service +void ICAPXaction::noteCommClosed() { - if (!virginSendClaim.active()) - return; - - debugs(93, 7, "ICAPXaction will no longer backup " << status()); + closer = NULL; + ICAPXaction_Enter(noteCommClosed); - virginSendClaim.disable(); + handleCommClosed(); - virginConsume(); + ICAPXaction_Exit(); } -// communication timeout with the ICAP service -void ICAPXaction::noteCommTimeout() +void ICAPXaction::handleCommClosed() { - ICAPXaction_Enter(noteCommTimeout); - - mustStop(notifyHttp); - - ICAPXaction_Exit(noteCommTimeout); -} - -// unexpected connection close while talking to the ICAP service -void ICAPXaction::noteCommClose() -{ - closer = NULL; - ICAPXaction_Enter(noteCommClose); - - mustStop(notifyHttp); - - ICAPXaction_Exit(noteCommClose); + mustStop("ICAP service connection externally closed"); } bool ICAPXaction::done() const { - if (notify != notifyUnknown) // mustStop() has been called + if (stopReason != NULL) // mustStop() has been called return true; - return state.doneReceiving && state.doneSending() && - state.doneReading && state.doneWriting(); + return doneAll(); } -void ICAPXaction::startReading() +bool ICAPXaction::doneAll() const { - Must(connection >= 0); - Must(!reader); - Must(adapted.getRaw()); - Must(adapted->data); - Must(adapted->data->body); - - readMore(); + return !connector && !reader && !writer; } -void ICAPXaction::readMore() +void ICAPXaction::scheduleRead() { - if (reader || state.doneReading) - return; - - // do not fill readBuf if we have no space to store the result - if (!adapted->data->body->hasPotentialSpace()) - return; - - // we use the same buffer for headers and body and then consume headers - if (readBuf.hasSpace()) { - reader = &ICAPXaction_noteCommRead; - /* - * See comments in ICAPXaction.h about why we use commBuf - * here instead of reading directly into readBuf.buf. - */ + Must(connection >= 0); + Must(!reader); + Must(readBuf.hasSpace()); - if (NULL == commBuf) - commBuf = (char*)memAllocate(MEM_64K_BUF); + reader = &ICAPXaction_noteCommRead; + /* + * See comments in ICAPXaction.h about why we use commBuf + * here instead of reading directly into readBuf.buf. + */ - comm_read(connection, commBuf, readBuf.spaceSize(), - reader, this); - } + comm_read(connection, commBuf, readBuf.spaceSize(), reader, this); } // comm module read a portion of the ICAP response for us @@ -565,36 +223,28 @@ Must(reader); reader = NULL; - Must(!state.doneParsing()); Must(commStatus == COMM_OK); + Must(sz >= 0); debugs(93, 5, "read " << sz << " bytes"); - if (sz == 0) - stopReading(); - else - if (sz > 0) - readBuf.append(commBuf, sz); - /* * See comments in ICAPXaction.h about why we use commBuf * here instead of reading directly into readBuf.buf. */ - parseMore(); + if (sz > 0) + readBuf.append(commBuf, sz); + else + commEof = true; - readMore(); + handleCommRead(sz); - ICAPXaction_Exit(noteCommRead); + ICAPXaction_Exit(); } -void ICAPXaction::stopReading() +void ICAPXaction::cancelRead() { - if (state.doneReading) - return; - - debugs(93, 7, "ICAPXaction will no longer read " << status()); - if (reader) { // check callback presence because comm module removes // fdc_table[].read.callback after the actual I/O but @@ -607,589 +257,103 @@ reader = NULL; } - - if (commBuf) { - memFree(commBuf, MEM_64K_BUF); - commBuf = NULL; - } - - state.doneReading = true; } -void ICAPXaction::echoMore() +bool ICAPXaction::parseHttpMsg(HttpMsg &msg) { - Must(state.sending == State::sendingVirgin); - Must(virginSendClaim.active()); - - MemBuf &from = *virgin->data->body; - MemBuf &to = *adapted->data->body; + debugs(93, 5, "have " << readBuf.contentSize() << " head bytes to parse"); - const size_t sizeMax = claimSize(virginSendClaim); - const size_t size = XMIN(static_cast(to.potentialSpaceSize()), - sizeMax); - debugs(93, 5, "ICAPXaction echos " << size << " out of " << sizeMax << - " bytes"); - - if (size > 0) { - to.append(claimContent(virginSendClaim), size); - virginSendClaim.release(size); - virginConsume(); - adapted->sendSourceProgress(); - } - - if (!from.hasContent() && state.doneReceiving) { - debugs(93, 5, "ICAPXaction echoed all " << status()); - stopSending(); - } else { - debugs(93, 5, "ICAPXaction has " << from.contentSize() << " bytes " << - "and expects more to echo " << status()); - virgin->sendSinkNeed(); // TODO: timeout if sink is broken - } -} - -void ICAPXaction::stopSending() -{ - debugs(93, 7, "ICAPXaction will no longer send " << status()); - - if (state.sending != State::sendingUndecided) - adapted->sendSourceFinish(); - - state.sending = State::sendingDone; -} - -void ICAPXaction::parseMore() -{ - debugs(93, 5, "have " << readBuf.contentSize() << " bytes to parse" << - status()); - - if (state.parsingHeaders()) - parseHeaders(); - - if (state.parsing == State::psBody) - parseBody(); -} - -// note that allocation for echoing is done in handle204NoContent() -void ICAPXaction::maybeAllocateHttpMsg() -{ - if (adapted->data->header) // already allocated - return; - - if (gotEncapsulated("res-hdr")) { - adapted->data->header = httpReplyCreate(); - } else if (gotEncapsulated("req-hdr")) { - adapted->data->header = new HttpRequest; - } else - throw TexcHere("Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()"); -} - -void ICAPXaction::parseHeaders() -{ - Must(state.parsingHeaders()); - - if (state.parsing == State::psIcapHeader) - parseIcapHead(); - - if (state.parsing == State::psHttpHeader) - parseHttpHead(); - - if (state.parsingHeaders()) { // need more data - Must(!state.doneReading); - return; - } - - adapted->sendSourceStart(); - - if (state.sending == State::sendingVirgin) - echoMore(); -} - -void ICAPXaction::parseIcapHead() -{ - Must(state.sending == State::sendingUndecided); - - if (!parseHead(icapReply)) - return; - - switch (icapReply->sline.status) { - - case 100: - handle100Continue(); - break; - - case 200: - handle200Ok(); - break; - - case 204: - handle204NoContent(); - break; - - default: - handleUnknownScode(); - break; - } - - // handle100Continue() manages state.writing on its own. - // Non-100 status means the server needs no postPreview data from us. - if (state.writing == State::writingPaused) - stopWriting(); - - // TODO: Consider applying a Squid 2.5 patch to recognize 201 responses -} - -void ICAPXaction::handle100Continue() -{ - Must(state.writing == State::writingPaused); - Must(preview.enabled() && preview.done() && !preview.ieof()); - Must(virginSendClaim.active()); - - if (virginSendClaim.limited()) // preview only - stopBackup(); - - state.parsing = State::psHttpHeader; // eventually - - state.writing = State::writingPrime; - - writeMore(); -} - -void ICAPXaction::handle200Ok() -{ - state.parsing = State::psHttpHeader; - state.sending = State::sendingAdapted; - stopBackup(); -} - -void ICAPXaction::handle204NoContent() -{ - stopParsing(); - Must(virginSendClaim.active()); - virginSendClaim.protectAll(); // extends protection if needed - state.sending = State::sendingVirgin; - - // We want to clone the HTTP message, but we do not want - // to copy non-HTTP state parts that HttpMsg kids carry in them. - // Thus, we cannot use a smart pointer, copy constructor, or equivalent. - // Instead, we simply write the HTTP message and "clone" it by parsing. - - HttpMsg *oldHead = virgin->data->header; - debugs(93, 7, "ICAPXaction cloning virgin message " << oldHead); - - MemBuf httpBuf; - - // write the virgin message into a memory buffer - httpBuf.init(); - packHead(httpBuf, oldHead); - - // allocate the adapted message - HttpMsg *&newHead = adapted->data->header; - Must(!newHead); - - if (dynamic_cast(oldHead)) - newHead = new HttpRequest; - else - if (dynamic_cast(oldHead)) - newHead = httpReplyCreate(); - - Must(newHead); - - // parse the buffer back http_status error = HTTP_STATUS_NONE; - - Must(newHead->parse(&httpBuf, true, &error)); - - Must(newHead->hdr_sz == httpBuf.contentSize()); // no leftovers - - httpBuf.clean(); - - debugs(93, 7, "ICAPXaction cloned virgin message " << oldHead << " to " << newHead); -} - -void ICAPXaction::handleUnknownScode() -{ - stopParsing(); - stopBackup(); - // TODO: mark connection as "bad" - - // Terminate the transaction; we do not know how to handle this response. - throw TexcHere("Unsupported ICAP status code"); -} - -void ICAPXaction::parseHttpHead() -{ - if (gotEncapsulated("res-hdr") || gotEncapsulated("req-hdr")) { - maybeAllocateHttpMsg(); - - if (!parseHead(adapted->data->header)) - return; - } - - state.parsing = State::psBody; -} - -bool ICAPXaction::parseHead(HttpMsg *head) -{ - assert(head); - debugs(93, 5, "have " << readBuf.contentSize() << " head bytes to parse" << - "; state: " << state.parsing); - - http_status error = HTTP_STATUS_NONE; - const bool parsed = head->parse(&readBuf, state.doneReading, &error); + const bool parsed = msg.parse(&readBuf, commEof, &error); Must(parsed || !error); // success or need more data if (!parsed) { // need more data - head->reset(); + Must(mayReadMore()); + msg.reset(); return false; } - readBuf.consume(head->hdr_sz); + readBuf.consume(msg.hdr_sz); return true; } -void ICAPXaction::parseBody() +bool ICAPXaction::mayReadMore() const { - Must(state.parsing == State::psBody); - - debugs(93, 5, "have " << readBuf.contentSize() << " body bytes to parse"); - - if (gotEncapsulated("res-body")) { - if (!parsePresentBody()) // need more body data - return; - } else { - debugs(93, 5, "not expecting a body"); - } - - stopReading(); - stopParsing(); - stopSending(); + return !doneReading() && // will read more data + readBuf.hasSpace(); // have space for more data } -// returns true iff complete body was parsed -bool ICAPXaction::parsePresentBody() +bool ICAPXaction::doneReading() const { - if (!bodyParser) - bodyParser = new ChunkedCodingParser; - - // the parser will throw on errors - const bool parsed = bodyParser->parse(&readBuf, adapted->data->body); - - adapted->sendSourceProgress(); // TODO: do not send if parsed nothing - - debugs(93, 5, "have " << readBuf.contentSize() << " body bytes after " << - "parse; parsed all: " << parsed); - - if (parsed) - return true; - - if (bodyParser->needsMoreData()) { - Must(!state.doneReading); // will get more data - Must(readBuf.hasSpace()); // have space for more data - } - - if (bodyParser->needsMoreSpace()) { - Must(!state.doneSending()); // can hope for more space - Must(adapted->data->body->hasContent()); // paranoid - // TODO: there should be a timeout in case the sink is broken. - } - - return false; + return commEof; } -void ICAPXaction::stopParsing() +void ICAPXaction::mustStop(const char *aReason) { - if (state.parsing == State::psDone) - return; - - debugs(93, 7, "ICAPXaction will no longer parse " << status()); - - delete bodyParser; - - bodyParser = NULL; - - state.parsing = State::psDone; - - if (!state.doneReading) - stopReading(); -} - -// HTTP side added virgin body data -void ICAPXaction::noteSourceProgress(MsgPipe *p) -{ - ICAPXaction_Enter(noteSourceProgress); - - Must(!state.doneReceiving); - writeMore(); - - if (state.sending == State::sendingVirgin) - echoMore(); - - ICAPXaction_Exit(noteSourceProgress); -} - -// HTTP side sent us all virgin info -void ICAPXaction::noteSourceFinish(MsgPipe *p) -{ - ICAPXaction_Enter(noteSourceFinish); - - Must(!state.doneReceiving); - state.doneReceiving = true; - - // push writer and sender in case we were waiting for the last-chunk - writeMore(); - - if (state.sending == State::sendingVirgin) - echoMore(); - - ICAPXaction_Exit(noteSourceFinish); -} - -// HTTP side is aborting -void ICAPXaction::noteSourceAbort(MsgPipe *p) -{ - ICAPXaction_Enter(noteSourceAbort); - - Must(!state.doneReceiving); - state.doneReceiving = true; - mustStop(notifyService); - - ICAPXaction_Exit(noteSourceAbort); -} - -// HTTP side wants more adapted data and possibly freed some buffer space -void ICAPXaction::noteSinkNeed(MsgPipe *p) -{ - ICAPXaction_Enter(noteSinkNeed); - - if (state.sending == State::sendingVirgin) - echoMore(); - else - if (state.sending == State::sendingAdapted) - parseMore(); - else - Must(state.sending == State::sendingUndecided); - - ICAPXaction_Exit(noteSinkNeed); -} - -// HTTP side aborted -void ICAPXaction::noteSinkAbort(MsgPipe *p) -{ - ICAPXaction_Enter(noteSinkAbort); - - mustStop(notifyService); - - ICAPXaction_Exit(noteSinkAbort); -} - -void ICAPXaction::mustStop(Notify who) -{ - Must(state.inCall); // otherwise nobody will call doStop() - Must(notify == notifyUnknown); - notify = who; - debugs(93, 5, "ICAPXaction will stop and notify " << notify); + Must(inCall); // otherwise nobody will call doStop() + Must(!stopReason); + Must(aReason); + stopReason = aReason; + debugs(93, 5, typeName << " will stop, reason: " << stopReason); } // internal cleanup void ICAPXaction::doStop() { - debugs(93, 5, "ICAPXaction::doStop " << status()); - - readBuf.clean(); - - closeConnection(); // TODO: pconn support: close unless notifyService ... - stopBackup(); - - if (icapReply) - httpReplyDestroy(icapReply); - - icapReply = NULL; - - if (virgin != NULL) { - if (notify == notifyHttp || notify == notifyAll) - virgin->sendSinkAbort(); - else - virgin->sink = NULL; - - virgin = NULL; // refcounted - - state.doneReceiving = true; - } - - if (adapted != NULL) { - if (notify == notifyHttp || notify == notifyAll) - adapted->sendSourceAbort(); - else - adapted->source = NULL; - - /* - * Note on adapted->data->header: - * we created the header, but allow the other side (ICAPAnchor) - * to take control of it. We won't touch it here and instead - * rely on the Anchor-side to make sure it is properly - * freed. - */ - adapted = NULL; // refcounted - - state.sending = State::sendingDone; - } - - if (self != NULL) { // even if notify is notifyNone - Pointer s = self; - self = NULL; - ICAPNoteXactionDone(s); - /* this object may be destroyed when 's' is cleared */ - } -} - -void ICAPXaction::makeRequestHeaders(MemBuf &buf) -{ - ICAPServiceRep::IcapMethod m = service->method; - buf.Printf("%s %s ICAP/1.0\r\n", service->methodStr(), service->uri.buf()); - buf.Printf("Host: %s:%d\r\n", service->host.buf(), service->port); - buf.Printf("Encapsulated: "); - - MemBuf httpBuf; - httpBuf.init(); - - // build HTTP request header, if any - - if (ICAPServiceRep::respmod == m && virgin->data->cause) - encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->cause); - else if (ICAPServiceRep::reqmod == m) - encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->header); - - if (ICAPServiceRep::respmod == m) - if (const MsgPipeData::Header *prime = virgin->data->header) - encapsulateHead(buf, "res-hdr", httpBuf, prime); - - if (!virginBody.expected()) - buf.Printf("null-body=%d", httpBuf.contentSize()); - else if (m == ICAPServiceRep::reqmod) - buf.Printf("req-body=%d", httpBuf.contentSize()); - else - buf.Printf("res-body=%d", httpBuf.contentSize()); - - buf.append(crlf, 2); // terminate Encapsulated line - - if (shouldPreview()) { - buf.Printf("Preview: %d\r\n", (int)preview.ad()); - virginSendClaim.protectUpTo(preview.ad()); - } - - if (shouldAllow204()) { - buf.Printf("Allow: 204\r\n"); - // be robust: do not rely on the expected body size - virginSendClaim.protectAll(); - } - - buf.append(crlf, 2); // terminate ICAP header - - // start ICAP request body with encapsulated HTTP headers - buf.append(httpBuf.content(), httpBuf.contentSize()); - - httpBuf.clean(); -} - -void ICAPXaction::encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const HttpMsg *head) -{ - // update ICAP header - icapBuf.Printf("%s=%d,", section, httpBuf.contentSize()); + debugs(93, 5, typeName << "::doStop " << status()); - // pack HTTP head - packHead(httpBuf, head); + closeConnection(); // TODO: pconn support: close iff bad connection } -void ICAPXaction::packHead(MemBuf &httpBuf, const HttpMsg *head) +void ICAPXaction::service(ICAPServiceRep::Pointer &aService) { - Packer p; - packerToMemInit(&p, &httpBuf); - head->packInto(&p, true); - packerClean(&p); + Must(!theService); + Must(aService != NULL); + theService = aService; } -// decides whether to offer a preview and calculates its size -bool ICAPXaction::shouldPreview() +ICAPServiceRep &ICAPXaction::service() { - size_t wantedSize; - - if (!service->wantsPreview(wantedSize)) { - debugs(93, 5, "ICAPXaction should not offer preview"); - return false; - } - - Must(wantedSize >= 0); - - // cannot preview more than we can backup - size_t ad = XMIN(wantedSize, TheBackupLimit); - - if (virginBody.expected() && virginBody.knownSize()) - ad = XMIN(ad, virginBody.size()); // not more than we have - else - ad = 0; // questionable optimization? - - debugs(93, 5, "ICAPXaction should offer " << ad << "-byte preview " << - "(service wanted " << wantedSize << ")"); - - preview.enable(ad); - - return preview.enabled(); -} - -// decides whether to allow 204 responses -bool ICAPXaction::shouldAllow204() -{ - if (!service->allows204()) - return false; - - if (!virginBody.expected()) - return true; // no body means no problems with supporting 204s. - - // if there is a body, make sure we can backup it all - - if (!virginBody.knownSize()) - return false; - - // or should we have a different backup limit? - // note that '<' allows for 0-termination of the "full" backup buffer - return virginBody.size() < TheBackupLimit; + Must(theService != NULL); + return *theService; } bool ICAPXaction::callStart(const char *method) { - debugs(93, 5, "ICAPXaction::" << method << " called " << status()); + debugs(93, 5, typeName << "::" << method << " called " << status()); - if (state.inCall) { + if (inCall) { // this may happen when we have bugs or when arguably buggy // comm interface calls us while we are closing the connection - debugs(93, 5, "ICAPXaction::" << method << " cancels reentry."); + debugs(93, 5, typeName << "::" << inCall << " is in progress; " << + typeName << "::" << method << " cancels reentry."); return false; } - state.inCall = true; + inCall = method; return true; } -void ICAPXaction::callException(const char *method, const TextException &e, Notify defaultWho) +void ICAPXaction::callException(const TextException &e) { - debugs(93, 4, "ICAPXaction::" << method << " caught an exception: " << + debugs(93, 4, typeName << "::" << inCall << " caught an exception: " << e.message << ' ' << status()); if (!done()) - mustStop(defaultWho); + mustStop("exception"); } -void ICAPXaction::callEnd(const char *method) +void ICAPXaction::callEnd() { if (done()) { - debugs(93, 5, "ICAPXaction::" << method << " ends xaction " << + debugs(93, 5, "ICAPXaction::" << inCall << " ends xaction " << status()); - doStop(); + doStop(); // may delete us return; } - debugs(93, 6, "ICAPXaction::" << method << " ended " << status()); - state.inCall = false; + debugs(93, 6, typeName << "::" << inCall << " ended " << status()); + inCall = NULL; } // returns a temporary string depicting transaction status, for debugging @@ -1200,47 +364,9 @@ buf.append("[", 1); - if (!state.doneWriting() && state.writing != State::writingInit) - buf.Printf("w(%d)", state.writing); - - if (preview.enabled()) { - if (!preview.done()) - buf.Printf("P(%d)", preview.debt()); - } - - if (virginSendClaim.active()) - buf.append("B", 1); - - if (!state.doneParsing() && state.parsing != State::psIcapHeader) - buf.Printf("p(%d)", state.parsing); - - if (!state.doneSending() && state.sending != State::sendingUndecided) - buf.Printf("S(%d)", state.sending); - + fillPendingStatus(buf); buf.append("/", 1); - - if (state.doneReceiving) - buf.append("R", 1); - - if (state.doneWriting()) - buf.append("w", 1); - - if (preview.enabled()) { - if (preview.done()) - buf.Printf("P%s", preview.ieof() ? "(ieof)" : ""); - } - - if (state.doneReading) - buf.append("r", 1); - - if (state.doneParsing()) - buf.append("p", 1); - - if (state.doneSending()) - buf.append("S", 1); - - if (notify != notifyUnknown) - buf.Printf("N(%d)", notify); + fillDoneStatus(buf); buf.append("]", 1); @@ -1249,173 +375,26 @@ return buf.content(); } -bool ICAPXaction::gotEncapsulated(const char *section) const -{ - return httpHeaderGetByNameListMember(&icapReply->header, "Encapsulated", - section, ',').size() > 0; -} - -// calculate whether there is a virgin HTTP body and -// whether its expected size is known -void ICAPXaction::estimateVirginBody() +void ICAPXaction::fillPendingStatus(MemBuf &buf) const { - // note: defaults should be fine but will disable previews and 204s + if (connection >= 0) { + buf.Printf("Comm(%d", connection); - if (virgin->data->header == NULL) - return; + if (writer) + buf.append("w", 1); - method_t method; + if (reader) + buf.append("r", 1); - if (virgin->data->cause) - method = virgin->data->cause->method; - else - if (HttpRequest *req= dynamic_cast(virgin->data-> - header)) - method = req->method; - else - return; - - ssize_t size; - if (virgin->data->header->expectingBody(method, size)) { - virginBody.expect(size) - ; - debugs(93, 6, "ICAPXaction expects virgin body; size: " << size); - } else { - debugs(93, 6, "ICAPXaction does not expect virgin body"); + buf.append(")", 1); } } - -// TODO: Move SizedEstimate, MemBufBackup, and ICAPPreview elsewhere - -SizedEstimate::SizedEstimate() - : theData(dtUnexpected) -{} - -void SizedEstimate::expect(ssize_t aSize) +void ICAPXaction::fillDoneStatus(MemBuf &buf) const { - theData = (aSize >= 0) ? aSize : (ssize_t)dtUnknown; -} + if (connection >= 0 && commEof) + buf.Printf("Comm(%d)", connection); -bool SizedEstimate::expected() const -{ - return theData != dtUnexpected; -} - -bool SizedEstimate::knownSize() const -{ - Must(expected()); - return theData != dtUnknown; + if (stopReason != NULL) + buf.Printf("Stopped"); } - -size_t SizedEstimate::size() const -{ - Must(knownSize()); - return static_cast(theData); -} - - - -MemBufClaim::MemBufClaim(): theStart(-1), theGoal(-1) -{} - -void MemBufClaim::protectAll() -{ - if (theStart < 0) - theStart = 0; - - theGoal = -1; // no specific goal -} - -void MemBufClaim::protectUpTo(size_t aGoal) -{ - if (theStart < 0) - theStart = 0; - - Must(aGoal >= 0); - - theGoal = (theGoal < 0) ? static_cast(aGoal) : - XMIN(static_cast(aGoal), theGoal); -} - -void MemBufClaim::disable() -{ - theStart = -1; -} - -void MemBufClaim::release(size_t size) -{ - Must(active()); - Must(size >= 0); - theStart += static_cast(size); - - if (limited() && theStart >= theGoal) - disable(); -} - -size_t MemBufClaim::offset() const -{ - Must(active()); - return static_cast(theStart); -} - -bool MemBufClaim::limited() const -{ - Must(active()); - return theGoal >= 0; -} - - -ICAPPreview::ICAPPreview(): theWritten(0), theAd(0), theState(stDisabled) -{} - -void ICAPPreview::enable(size_t anAd) -{ - // TODO: check for anAd not exceeding preview size limit - Must(anAd >= 0); - Must(!enabled()); - theAd = anAd; - theState = stWriting; -} - -bool ICAPPreview::enabled() const -{ - return theState != stDisabled; -} - -size_t ICAPPreview::ad() const -{ - Must(enabled()); - return theAd; -} - -bool ICAPPreview::done() const -{ - Must(enabled()); - return theState >= stIeof; -} - -bool ICAPPreview::ieof() const -{ - Must(enabled()); - return theState == stIeof; -} - -size_t ICAPPreview::debt() const -{ - Must(enabled()); - return done() ? 0 : (theAd - theWritten); -} - -void ICAPPreview::wrote(size_t size, bool sawEof) -{ - Must(enabled()); - theWritten += size; - - if (theWritten >= theAd) - theState = stDone; // sawEof is irrelevant - else - if (sawEof) - theState = stIeof; -} - Index: squid3/src/ICAPXaction.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPXaction.h,v retrieving revision 1.1.2.26 retrieving revision 1.1.2.27 diff -u -r1.1.2.26 -r1.1.2.27 --- squid3/src/ICAPXaction.h 11 Oct 2005 22:00:20 -0000 1.1.2.26 +++ squid3/src/ICAPXaction.h 17 Oct 2005 22:58:29 -0000 1.1.2.27 @@ -1,6 +1,6 @@ /* - * $Id: ICAPXaction.h,v 1.1.2.26 2005/10/11 22:00:20 rousskov Exp $ + * $Id: ICAPXaction.h,v 1.1.2.27 2005/10/17 22:58:29 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -35,203 +35,69 @@ #define SQUID_ICAPXACTION_H #include "MemBuf.h" -#include "MsgPipe.h" -#include "MsgPipeSource.h" -#include "MsgPipeSink.h" - #include "ICAPServiceRep.h" -/* The ICAP Xaction implements message pipe sink and source interfaces. It - * receives virgin HTTP messages, communicates with the ICAP server, and sends - * the adapted messages back. ICAPClient is the "owner" of the ICAPXaction. */ - -class ICAPServiceRep; - -class TextException; - -class ChunkedCodingParser; - class HttpMsg; -// estimated future presence and size of something (e.g., HTTP body) - -class SizedEstimate -{ - -public: - SizedEstimate(); // not expected by default - void expect(ssize_t aSize); // expect with any, even unknown size - bool expected() const; - - /* other members can be accessed iff expected() */ - - bool knownSize() const; - size_t size() const; // can be accessed iff knownSize() - -private: - enum { dtUnexpected = -2, dtUnknown = -1 }; - ssize_t theData; // combines expectation and size info to save RAM -}; - -// Protects buffer area. If area size is unknown, protects buffer suffix. -// Only "released" data can be consumed by the caller. Used to maintain -// write, preview, and 204 promises for ICAPXaction virgin->data-body buffer. - -class MemBufClaim -{ - -public: - MemBufClaim(); - - void protectAll(); - void protectUpTo(size_t aGoal); - void disable(); - bool active() const { return theStart >= 0; } - - // methods below require active() - - void release(size_t size); // stop protecting size more bytes - size_t offset() const; // protected area start - bool limited() const; // protects up to a known size goal - -private: - ssize_t theStart; // left area border - ssize_t theGoal; // "end" maximum, if any -}; - -// maintains preview-related sizes - -class ICAPPreview -{ - -public: - ICAPPreview(); // disabled - void enable(size_t anAd); // enabled with advertised size - bool enabled() const; - - /* other members can be accessed iff enabled() */ - - size_t ad() const; // advertised preview size - size_t debt() const; // remains to write - bool done() const; // wrote everything - bool ieof() const; // premature EOF +class TextException; - void wrote(size_t size, bool sawEof); +/* The ICAP Xaction implements message pipe sink and source interfaces. It + * receives virgin HTTP messages, communicates with the ICAP server, and sends + * the adapted messages back. ICAPClient is the "owner" of the ICAPXaction. */ -private: - size_t theWritten; - size_t theAd; - enum State { stDisabled, stWriting, stIeof, stDone } theState; -}; +// Note: ICAPXaction must be the first parent for object-unaware cbdata to work -class ICAPXaction: public MsgPipeSource, public MsgPipeSink +class ICAPXaction: public RefCountable { public: typedef RefCount Pointer; public: - ICAPXaction(); + ICAPXaction(const char *aTypeName); virtual ~ICAPXaction(); - // called by ICAPClient - void init(ICAPServiceRep::Pointer, MsgPipe::Pointer aVirgin, MsgPipe::Pointer anAdapted, Pointer &aSelf); - void ownerAbort(); - - // pipe source methods; called by Anchor while receiving the adapted msg - virtual void noteSinkNeed(MsgPipe *p); - virtual void noteSinkAbort(MsgPipe *p); - - // pipe sink methods; called by ICAP while sending the virgin message - virtual void noteSourceStart(MsgPipe *p); - virtual void noteSourceProgress(MsgPipe *p); - virtual void noteSourceFinish(MsgPipe *p); - virtual void noteSourceAbort(MsgPipe *p); - - // comm handlers + // comm handler wrappers, treat as private void noteCommConnected(comm_err_t status); - void noteCommWroteHeaders(comm_err_t status); - void noteCommWroteBody(comm_err_t status, size_t sz); + void noteCommWrote(comm_err_t status, size_t sz); void noteCommRead(comm_err_t status, size_t sz); - void noteCommTimeout(); - void noteCommClose(); + void noteCommTimedout(); + void noteCommClosed(); + +protected: + // Set or get service pointer; ICAPXaction cbdata-locks it. + void service(ICAPServiceRep::Pointer &aService); + ICAPServiceRep &service(); + + // comm hanndlers; called by comm handler wrappers + virtual void handleCommConnected() = 0; + virtual void handleCommWrote(size_t sz) = 0; + virtual void handleCommRead(size_t sz) = 0; + virtual void handleCommTimedout(); + virtual void handleCommClosed(); -private: - void estimateVirginBody(); void openConnection(); void closeConnection(); + void scheduleRead(); + void scheduleWrite(MemBuf &buf); - void writeMore(); - void writePriviewBody(); - void writePrimeBody(); - void writeSomeBody(const char *label, size_t size); - - void startReading(); - void readMore(); - - size_t claimSize(const MemBufClaim &claim) const; - const char *claimContent(const MemBufClaim &claim) const; - void makeRequestHeaders(MemBuf &buf); - void moveRequestChunk(MemBuf &buf, size_t chunkSize); - void addLastRequestChunk(MemBuf &buf); - void openChunk(MemBuf &buf, size_t chunkSize); - void closeChunk(MemBuf &buf, bool ieof); - void virginConsume(); - - bool shouldPreview(); - bool shouldAllow204(); - void prepBackup(size_t expectedSize); - void backup(const MemBuf &buf); - - void parseMore(); - - void parseHeaders(); - void parseIcapHead(); - void parseHttpHead(); - bool parseHead(HttpMsg *head); - - void parseBody(); - bool parsePresentBody(); - void maybeAllocateHttpMsg(); - - void handle100Continue(); - void handle200Ok(); - void handle204NoContent(); - void handleUnknownScode(); + void cancelRead(); - void echoMore(); + bool parseHttpMsg(HttpMsg &msg); // true=success; false=needMore; throw=err + bool mayReadMore() const; + virtual bool doneReading() const; bool done() const; + virtual bool doneAll() const; + virtual void doStop(); + void mustStop(const char *reason); - typedef enum { notifyUnknown, notifyNone, notifyService, notifyHttp, - notifyAll } Notify; - void mustStop(Notify who); - void doStop(); - void stopReading(); - void stopWriting(); - void stopParsing(); - void stopSending(); - void stopBackup(); - - bool callStart(const char *method); - void callException(const char *method, const TextException &e, Notify who); - void callEnd(const char *method); - -private: // returns a temporary string depicting transaction status, for debugging const char *status() const; + virtual void fillPendingStatus(MemBuf &buf) const; + virtual void fillDoneStatus(MemBuf &buf) const; - void packHead(MemBuf &httpBuf, const HttpMsg *head); - void encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const HttpMsg *head); - bool gotEncapsulated(const char *section) const; - - Pointer self; - ICAPServiceRep::Pointer service; - MsgPipe::Pointer virgin; - MsgPipe::Pointer adapted; - - HttpReply *icapReply; - +protected: int connection; // FD of the ICAP server connection /* @@ -247,71 +113,52 @@ */ MemBuf readBuf; char *commBuf; + size_t commBufSize; + bool commEof; - SizedEstimate virginBody; - MemBufClaim virginWriteClaim; // preserve virgin data buffer for writing - MemBufClaim virginSendClaim; // ... for sending (previe and 204s) - size_t virginConsumed; // virgin data consumed so far - ICAPPreview preview; // use for creating (writing) the preview + const char *stopReason; + + // asynchronous call maintenance + bool callStart(const char *method); + void callException(const TextException &e); + void callEnd(); // active (pending) comm callbacks for the ICAP server connection + CNCB *connector; IOCB *reader; CWCB *writer; PF *closer; - ChunkedCodingParser *bodyParser; // ICAP response body parser - - class State - { - - public: - State(); - - public: - - unsigned inCall: - 1; // processing an asynchronous call (e.g., comm read callback) - - unsigned doneReceiving: - 1; // expect no new virgin info (from virgin pipe) - - unsigned doneReading: - 1; // will not read from the ICAP server connection - - // will not write anything [else] to the ICAP server connection - bool doneWriting() const { return writing == writingDone; } + const char *typeName; // the type of the final class (child), for debugging - // parsed entire ICAP response from the ICAP server - bool doneParsing() const { return parsing == psDone; } - - // will not send anything [else] on the adapted pipe - bool doneSending() const { return sending == sendingDone; } - - // is parsing ICAP or HTTP headers read from the ICAP server - bool parsingHeaders() const - { - return parsing == psIcapHeader || - parsing == psHttpHeader; - } - - enum Parsing { psIcapHeader, psHttpHeader, psBody, psDone } parsing; - - // measures ICAP request writing progress - enum Writing { writingInit, writingConnect, writingHeaders, - writingPreview, writingPaused, writingPrime, writingDone } writing; +private: + ICAPServiceRep::Pointer theService; - enum Sending { sendingUndecided, sendingVirgin, sendingAdapted, - sendingDone } sending; - } + const char *inCall; // name of the asynchronous call being executed, if any - state; + //CBDATA_CLASS2(ICAPXaction); +}; - Notify notify; +// call guards for all "asynchronous" note*() methods - CBDATA_CLASS2(ICAPXaction); -}; +// asynchronous call entry: +// - open the try clause; +// - call callStart(). +#define ICAPXaction_Enter(method) \ + try { \ + if (!callStart(#method)) \ + return; + +// asynchronous call exit: +// - close the try clause; +// - catch exceptions; +// - let callEnd() handle transaction termination conditions +#define ICAPXaction_Exit() \ + } \ + catch (const TextException &e) { \ + callException(e); \ + } \ + callEnd(); -// destroys (or pools) the transaction; implemented in ICAPClient.cc (ick?) -extern void ICAPNoteXactionDone(ICAPXaction::Pointer x); #endif /* SQUID_ICAPXACTION_H */