--------------------- PatchSet 1942 Date: 2005/10/07 23:11:38 Author: rousskov Branch: squid3-icap Tag: (none) Log: - Fixed support for ICAP 200 responses and for 204s outside of Preview. 204s are handled by writing the HTTP message head to a memory buffer and then creating a "similar" message by parsing the buffer. This inefficient operation is done because (a) HttMsg kids do not have a clone() method and (b) it is probably incorrect to clone entire HttpMsg kids because their structures contain members unrelated to HTTP (some other code will probably set those parts for adapted HTTP messages retrurned by ICAPXaction, and we do not want that code to distinguish between cloned and adapted HTTP messages). This code passes simple tests, but is likely to have bugs. Members: src/ICAPXaction.cc:1.1.2.46->1.1.2.47 src/ICAPXaction.h:1.1.2.23->1.1.2.24 Index: squid3/src/ICAPXaction.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPXaction.cc,v retrieving revision 1.1.2.46 retrieving revision 1.1.2.47 diff -u -r1.1.2.46 -r1.1.2.47 --- squid3/src/ICAPXaction.cc 6 Oct 2005 05:31:26 -0000 1.1.2.46 +++ squid3/src/ICAPXaction.cc 7 Oct 2005 23:11:38 -0000 1.1.2.47 @@ -29,6 +29,7 @@ CBDATA_CLASS_INIT(ICAPXaction); static const char *const crlf = "\r\n"; +static const size_t TheBackupLimit = ICAPMsgPipeBufSizeMax; static ICAPXaction &ICAPXaction_fromData(void *data) @@ -159,6 +160,11 @@ { ICAPXaction_Enter(noteSourceStart); + // make sure TheBackupLimit is in-sync with the buffer size + Must(TheBackupLimit <= static_cast(virgin->data->body->max_capacity)); + + estimateVirginBody(); + openConnection(); // put nothing here as openConnection calls commConnectStart // and that may call us back without waiting for the next select loop @@ -205,6 +211,7 @@ } stopReading(); + stopWriting(); comm_close(connection); @@ -213,19 +220,21 @@ } // connection with the ICAP service established -void ICAPXaction::noteCommConnected(comm_err_t status) +void ICAPXaction::noteCommConnected(comm_err_t commStatus) { - debug(93,3)("ICAPXaction::noteCommConnected() called\n"); ICAPXaction_Enter(noteCommConnected); Must(state.writing == State::writingConnect); - Must(status == COMM_OK); + Must(commStatus == COMM_OK); startReading(); // wait for early errors from the ICAP server MemBuf requestBuf; requestBuf.init(); makeRequestHeaders(&requestBuf); + debugs(93, 9, "ICAPXaction ICAP request prefix: " << status() << "\n" << + (requestBuf.terminate(), requestBuf.content())); + // write headers only; comm module will free the requestBuf state.writing = State::writingHeaders; writer = &ICAPXaction_noteCommWroteHeaders; @@ -234,14 +243,14 @@ ICAPXaction_Exit(noteCommConnected); } -void ICAPXaction::noteCommWroteHeaders(comm_err_t status) +void ICAPXaction::noteCommWroteHeaders(comm_err_t commStatus) { ICAPXaction_Enter(noteCommWroteHeaders); Must(writer); writer = NULL; - Must(status == COMM_OK); + Must(commStatus == COMM_OK); Must(state.writing == State::writingHeaders); @@ -251,7 +260,7 @@ writeMore(); } else { Must(!preview.enabled()); // no preview if no body - state.writing = State::writingDone; + stopWriting(); } ICAPXaction_Exit(noteCommWroteHeaders); @@ -259,36 +268,36 @@ void ICAPXaction::writeMore() { - while (!writer) { // already writing something + if (writer) // already writing something + return; - switch (state.writing) { + switch (state.writing) { - case State::writingConnect: // waiting for the connection to establish + case State::writingConnect: // waiting for the connection to establish - case State::writingHeaders: // waiting for the headers to be written + case State::writingHeaders: // waiting for the headers to be written - case State::writingPaused: // waiting for the ICAP server response + case State::writingPaused: // waiting for the ICAP server response - case State::writingDone: // nothing more to write - return; + case State::writingDone: // nothing more to write + return; - case State::writingPreview: - writePriviewBody(); - break; - - case State::writingPrime: - writePrimeBody(); - break; - - default: - debugs(93, 1, "ICAPXaction bad writing state: " << state.writing); - Must(false); - } + case State::writingPreview: + writePriviewBody(); + return; + + case State::writingPrime: + writePrimeBody(); + return; + + default: + throw TexcHere("ICAPXaction in bad writing state"); } } void ICAPXaction::writePriviewBody() { + debugs(93, 8, "ICAPXaction will write Preview body " << status()); Must(state.writing == State::writingPreview); MsgPipeData::Body *body = virgin->data->body; @@ -299,9 +308,14 @@ // change state once preview is written if (preview.done()) { - state.writing = preview.ieof() ? - State::writingDone : State::writingPaused; + debugs(93, 7, "ICAPXaction wrote Preview body " << status()); + + if (preview.ieof()) + stopWriting(); + else + state.writing = State::writingPaused; } + } void ICAPXaction::writePrimeBody() @@ -313,26 +327,31 @@ writeSomeBody("prime virgin body", size, false); if (state.doneReceiving) - state.writing = State::writingDone; + stopWriting(); } void ICAPXaction::writeSomeBody(const char *label, size_t size, bool forceLast) { Must(!writer && !state.doneWriting()); - MsgPipeData::Body *body = virgin->data->body; MemBuf writeBuf; // TODO: suggest a min size based on size and lastChunk writeBuf.init(); - if (body->hasContent() && size > 0) { - debugs(93, 7, "ICAPXaction will write " << size << "-byte chunk of " - << label); - moveRequestChunk(&writeBuf, size, false); + MsgPipeData::Body *body = virgin->data->body; + const size_t writeableSize = virginDataMgr.writeableSize(*body); + + if (const size_t chunkSize = XMIN(writeableSize, size)) { + debugs(93, 7, "ICAPXaction will write " << chunkSize << + "-byte chunk of " << label); + moveRequestChunk(&writeBuf, chunkSize, false); } - if (state.doneReceiving || forceLast) { + const bool exhausted = state.doneReceiving && + virginDataMgr.writeableSize(*body) <= 0; + + if (exhausted || forceLast) { debugs(93, 7, "ICAPXaction will write last-chunk of " << label); moveRequestChunk(&writeBuf, 0, forceLast); // adds zero-size chunk } @@ -355,13 +374,7 @@ if (chunkSize > 0) { MsgPipeData::Body *body = virgin->data->body; - - if (virginBackup.enabled() && !virginBackup.done()) - virginBackup.update(*body); - - buf->append(body->content(), chunkSize); - - body->consume(chunkSize); + (void)virginDataMgr.write(*body, *buf, chunkSize); if (!state.doneReceiving) virgin->sendSinkNeed(); @@ -373,17 +386,53 @@ buf->append(crlf, 2); // chunk-terminating CRLF } -void ICAPXaction::noteCommWroteBody(comm_err_t status, size_t sz) +void ICAPXaction::noteCommWroteBody(comm_err_t commStatus, size_t sz) { ICAPXaction_Enter(noteCommWroteBody); writer = NULL; - Must(status == COMM_OK); + Must(commStatus == COMM_OK); writeMore(); ICAPXaction_Exit(noteCommWroteBody); } +void ICAPXaction::stopWriting() +{ + if (state.writing == State::writingDone) + return; + + debugs(93, 7, "ICAPXaction will no longer write " << status()); + + state.writing = State::writingDone; + + bool freedSpace = false; + + virginDataMgr.disableWriting(*virgin->data->body, &freedSpace); + + if (freedSpace && !state.doneReceiving) + virgin->sendSinkNeed(); + + // Comm does not have an interface to clear the writer, but + // writeMore() will not write if our write callback is called + // when state.writing == State::writingDone; +} + +void ICAPXaction::stopBackup() +{ + if (!virginDataMgr.backupEnabled()) + return; + + debugs(93, 7, "ICAPXaction will no longer backup " << status()); + + bool freedSpace = false; + + virginDataMgr.disableBackup(*virgin->data->body, &freedSpace); + + if (freedSpace && !state.doneReceiving) + virgin->sendSinkNeed(); +} + // communication timeout with the ICAP service void ICAPXaction::noteCommTimeout() { @@ -410,7 +459,7 @@ if (notify != notifyUnknown) // mustStop() has been called return true; - return state.doneReceiving && state.doneSending && + return state.doneReceiving && state.doneSending() && state.doneReading && state.doneWriting(); } @@ -451,7 +500,7 @@ } // comm module read a portion of the ICAP response for us -void ICAPXaction::noteCommRead(comm_err_t status, size_t sz) +void ICAPXaction::noteCommRead(comm_err_t commStatus, size_t sz) { ICAPXaction_Enter(noteCommRead); @@ -459,7 +508,7 @@ reader = NULL; Must(!state.doneParsing()); - Must(status == COMM_OK); + Must(commStatus == COMM_OK); debugs(93, 5, "read " << sz << " bytes"); @@ -483,6 +532,8 @@ void ICAPXaction::stopReading() { + debugs(93, 7, "ICAPXaction will no longer read " << status()); + if (reader) { // check callback presence because comm module removes // fdc_table[].read.callback after the actual I/O but @@ -504,10 +555,50 @@ state.doneReading = true; } +void ICAPXaction::echoMore() +{ + Must(state.sending == State::sendingVirgin); + Must(virginDataMgr.backupEnabled()); + MemBuf &to = *adapted->data->body; + + if (!to.hasPotentialSpace()) { + debugs(93, 5, "cannot echo into a full buffer"); + return; // TODO: timeout if sink is broken + } + + MemBuf &from = *virgin->data->body; + const size_t size = virginDataMgr.restore(from, to); + + debugs(93, 5, "ICAPXaction echoed " << size << " bytes"); + + if (size) { + adapted->sendSourceProgress(); + return; + } + + if (state.doneReceiving) { + debugs(93, 5, "ICAPXaction echoed all"); + stopSending(); + } else { + debugs(93, 5, "ICAPXaction expects more to echo"); + virgin->sendSinkNeed(); + } +} + +void ICAPXaction::stopSending() +{ + debugs(93, 7, "ICAPXaction will no longer send " << status()); + + if (state.sending != State::sendingUndecided) + adapted->sendSourceFinish(); + + state.sending = State::sendingDone; +} + void ICAPXaction::parseMore() { debugs(93, 5, "have " << readBuf.contentSize() << " bytes to parse" << - "; state: " << state.parsing); + status()); if (state.parsingHeaders()) parseHeaders(); @@ -516,6 +607,7 @@ parseBody(); } +// note that allocation for echoing is done in handle204NoContent() void ICAPXaction::maybeAllocateHttpMsg() { if (adapted->data->header) // already allocated @@ -539,14 +631,21 @@ if (state.parsing == State::psHttpHeader) parseHttpHead(); - if (state.parsingHeaders()) // need more data + if (state.parsingHeaders()) { // need more data Must(!state.doneReading); - else - adapted->sendSourceStart(); + return; + } + + adapted->sendSourceStart(); + + if (state.sending == State::sendingVirgin) + echoMore(); } void ICAPXaction::parseIcapHead() { + Must(state.sending == State::sendingUndecided); + if (!parseHead(icapReply)) return; @@ -570,9 +669,12 @@ } // handle100Continue() manages state.writing on its own. - // Non-100 status means the server needs no more data from us. + // Non-100 status means the server needs no postPreview data from us. if (state.writing == State::writingPaused) - state.writing = State::writingDone; + stopWriting(); + + if (!virginDataMgr.backupAll()) + stopBackup(); // TODO: Consider applying a Squid 2.5 patch to recognize 201 responses } @@ -590,21 +692,60 @@ void ICAPXaction::handle200Ok() { state.parsing = State::psHttpHeader; + state.sending = State::sendingAdapted; } void ICAPXaction::handle204NoContent() { - state.parsing = State::psDone; - Must(false); // implement! + Must(virginDataMgr.backupEnabled()); + stopParsing(); + state.sending = State::sendingVirgin; + + // We want to clone the HTTP message, but we do not want + // to copy non-HTTP state parts that HttpMsg kids carry in them. + // Thus, we cannot use a smart pointer, copy constructor, or equivalent. + // Instead, we simply write the HTTP message and "clone" it by parsing. + + HttpMsg *oldHead = virgin->data->header; + debugs(93, 7, "cloning virgin message " << oldHead); + + MemBuf httpBuf; + + // write the virgin message into a memory buffer + httpBuf.init(); + packHead(&httpBuf, oldHead); + + // allocate the adapted message + HttpMsg *&newHead = adapted->data->header; + Must(!newHead); + + if (dynamic_cast(oldHead)) + newHead = new HttpRequest; + else + if (dynamic_cast(oldHead)) + newHead = httpReplyCreate(); + + Must(newHead); + + // parse the buffer back + http_status error = HTTP_STATUS_NONE; + + Must(newHead->parse(&httpBuf, true, &error)); + + Must(newHead->hdr_sz == httpBuf.contentSize()); // no leftovers + + httpBuf.clean(); + + debugs(93, 7, "cloned virgin message " << oldHead << " to " << newHead); } void ICAPXaction::handleUnknownScode() { - state.parsing = State::psDone; + stopParsing(); // TODO: mark connection as "bad" // Terminate the transaction; we do not know how to handle this response. - Must(false); + throw TexcHere("Unsupported ICAP status code"); } void ICAPXaction::parseHttpHead() @@ -645,20 +786,18 @@ debugs(93, 5, "have " << readBuf.contentSize() << " body bytes to parse"); if (gotEncapsulated("res-body")) { - if (!parsePresentBody()) + if (!parsePresentBody()) // need more body data return; } else { debugs(93, 5, "not expecting a body"); } stopReading(); - state.parsing = State::psDone; - state.doneSending = true; - adapted->sendSourceFinish(); - delete bodyParser; - bodyParser = NULL; + stopParsing(); + stopSending(); } +// returns true iff complete body was parsed bool ICAPXaction::parsePresentBody() { if (!bodyParser) @@ -667,7 +806,7 @@ // the parser will throw on errors const bool parsed = bodyParser->parse(&readBuf, adapted->data->body); - adapted->sendSourceProgress(); + adapted->sendSourceProgress(); // TODO: do not send if parsed nothing debugs(93, 5, "have " << readBuf.contentSize() << " body bytes after " << "parse; parsed all: " << parsed); @@ -681,7 +820,7 @@ } if (bodyParser->needsMoreSpace()) { - Must(!state.doneSending); // can hope for more space + Must(!state.doneSending()); // can hope for more space Must(adapted->data->body->hasContent()); // paranoid // TODO: there should be a timeout in case the sink is broken. } @@ -689,6 +828,23 @@ return false; } +void ICAPXaction::stopParsing() +{ + if (state.parsing == State::psDone) + return; + + debugs(93, 7, "ICAPXaction will no longer parse " << status()); + + delete bodyParser; + + bodyParser = NULL; + + state.parsing = State::psDone; + + if (!state.doneReading) + stopReading(); +} + // HTTP side added virgin body data void ICAPXaction::noteSourceProgress(MsgPipe *p) { @@ -697,6 +853,9 @@ Must(!state.doneReceiving); writeMore(); + if (state.sending == State::sendingVirgin) + echoMore(); + ICAPXaction_Exit(noteSourceProgress); } @@ -707,7 +866,7 @@ Must(!state.doneReceiving); state.doneReceiving = true; - writeMore(); // in case we were waiting to send the last-chunk + writeMore(); // in case we were waiting to write the last-chunk ICAPXaction_Exit(noteSourceFinish); } @@ -728,8 +887,13 @@ { ICAPXaction_Enter(noteSinkNeed); - Must(!state.doneSending); - parseMore(); + if (state.sending == State::sendingVirgin) + echoMore(); + else + if (state.sending == State::sendingAdapted) + parseMore(); + else + Must(state.sending == State::sendingUndecided); ICAPXaction_Exit(noteSinkNeed); } @@ -759,13 +923,14 @@ readBuf.clean(); + closeConnection(); // TODO: pconn support: close unless notifyService ... + stopBackup(); + if (icapReply) httpReplyDestroy(icapReply); icapReply = NULL; - closeConnection(); // TODO: pconn support: close unless notifyService ... - if (virgin != NULL) { if (notify == notifyHttp || notify == notifyAll) virgin->sendSinkAbort(); @@ -792,7 +957,7 @@ */ adapted = NULL; // refcounted - state.doneSending = true; + state.sending = State::sendingDone; } if (self != NULL) { // even if notify is notifyNone @@ -835,12 +1000,12 @@ if (shouldPreview()) { buf->Printf("Preview: %d\r\n", (int)preview.ad()); - virginBackup.enable(preview.ad()); + virginDataMgr.enableBackup(false); } if (shouldAllow204()) { buf->Printf("Allow: 204\r\n"); - virginBackup.enable(MemBufBackup::Limit); + virginDataMgr.enableBackup(true); } buf->append(crlf, 2); // terminate ICAP header @@ -857,6 +1022,11 @@ icapBuf->Printf("%s=%d,", section, httpBuf->contentSize()); // pack HTTP head + packHead(httpBuf, head); +} + +void ICAPXaction::packHead(MemBuf *httpBuf, const HttpMsg *head) +{ Packer p; packerToMemInit(&p, httpBuf); head->packInto(&p, true); @@ -872,7 +1042,7 @@ return false; // cannot preview more than we can backup - size_t ad = XMIN(wantedSize, MemBufBackup::Limit); + size_t ad = XMIN(wantedSize, TheBackupLimit); if (virginBody.expected() && virginBody.knownSize()) ad = XMIN(ad, virginBody.size()); // not more than we have @@ -898,33 +1068,11 @@ if (!virginBody.knownSize()) return false; - return virginBody.size() <= MemBufBackup::Limit; + // or should we have a different backup limit? + // note that '<' allows for 0-termination of the "full" backup buffer + return virginBody.size() < TheBackupLimit; } -#if 0 -// maybe called several times -void ICAPXaction::prepBackup(size_t maxSize) -{ - Must(0 <= maxSize && maxSize <= TheBackupMax); - theBackupGoal = XMAX(theBackupGoal, maxSize); - - if (!state.backingUp) { - state.backingUp = true; - virginBackup.init(maxSize, TheBackupMax); - } -} - -void ICAPXaction::backup(const MemBuf &buf) -{ - Must(state.backingUp); - const size_t debt = theBackupGoal - virginBackup.contentSize(); - const size_t size = XMIN((size_t)buf.contentSize(), debt); - virginBackup.append(buf.content(), size); - state.backingUp = ((size_t)virginBackup.contentSize()) < theBackupGoal; -} - -#endif - bool ICAPXaction::callStart(const char *method) { debugs(93, 5, "ICAPXaction::" << method << " called " << status()); @@ -965,31 +1113,55 @@ // returns a temporary string depicting transaction status, for debugging const char *ICAPXaction::status() const { - static MemBuf status; - status.reset(); + static MemBuf buf; + buf.reset(); - status.append("[", 1); + buf.append("[", 1); - if (notify != notifyUnknown) - status.Printf("N(%d)", notify); + if (!state.doneWriting() && state.writing != State::writingInit) + buf.Printf("w(%d)", state.writing); + + if (preview.enabled()) { + if (!preview.done()) + buf.Printf("P(%d)", preview.debt()); + else + buf.Printf("P(%s)", preview.ieof() ? "ieof" : ""); + } + + if (virginDataMgr.backupEnabled()) + buf.append("B", 1); + + if (!state.doneParsing() && state.parsing != State::psIcapHeader) + buf.Printf("p(%d)", state.parsing); + + if (!state.doneSending() && state.sending != State::sendingUndecided) + buf.Printf("S(%d)", state.sending); + + buf.append("/", 1); if (state.doneReceiving) - status.append("R", 1); + buf.append("R", 1); - if (state.doneSending) - status.append("S", 1); + if (state.doneWriting()) + buf.append("w", 1); if (state.doneReading) - status.append("r", 1); + buf.append("r", 1); - if (state.doneWriting()) - status.append("w", 1); + if (state.doneParsing()) + buf.append("p", 1); - status.append("]", 1); + if (state.doneSending()) + buf.append("S", 1); - status.terminate(); + if (notify != notifyUnknown) + buf.Printf("N(%d)", notify); - return status.content(); + buf.append("]", 1); + + buf.terminate(); + + return buf.content(); } bool ICAPXaction::gotEncapsulated(const char *section) const @@ -1042,8 +1214,13 @@ return; ssize_t size; - if (virgin->data->header->expectingBody(method, size)) - virginBody.expect(size); + if (virgin->data->header->expectingBody(method, size)) { + virginBody.expect(size) + ; + debugs(93, 6, "ICAPXaction expects virgin body; size: " << size); + } else { + debugs(93, 6, "ICAPXaction does not expect virgin body"); + } } @@ -1076,61 +1253,121 @@ } -const size_t MemBufBackup::Limit = ICAPMsgPipeBufSizeMax; - -MemBufBackup::MemBufBackup(): theGoal(-1) +ICAPXactionVirginDataMgr::ICAPXactionVirginDataMgr(): + theWriteOffset(0), theBakupOffset(-1), storeAll(false) {} -MemBufBackup::~MemBufBackup() +size_t ICAPXactionVirginDataMgr::writeableSize(MemBuf &from) { - theBuf.clean(); + Must(writeEnabled()); + Must(theWriteOffset <= from.contentSize()); + debugs(93, 9, "ICAPXactionVirginDataMgr write offset: " << + theWriteOffset << "; " << " content: " << from.contentSize()); + return from.contentSize() - theWriteOffset; } -// maybe called many times -void MemBufBackup::enable(size_t aGoal) +// TODO: isolate code common with restore() +size_t ICAPXactionVirginDataMgr::write(MemBuf &from, MemBuf &to, size_t chunkSize) { - Must(0 <= aGoal && aGoal <= Limit); - Must(aGoal >= 0); + const size_t dataSize = writeableSize(from); + const size_t size = XMIN(chunkSize, + XMIN(dataSize, static_cast(to.potentialSpaceSize()))); - if (enabled()) { - Must(theBuf.isNull()); // not started - theGoal = XMAX(theGoal, static_cast(aGoal)); - } else { - theGoal = static_cast(aGoal); + if (size > 0) { + to.append(from.content() + theWriteOffset, size); + theWriteOffset += size; + consume(from); } + + return size; } -bool MemBufBackup::enabled() const +size_t ICAPXactionVirginDataMgr::restore(MemBuf &from, MemBuf &to) { - return theGoal >= 0; + Must(backupEnabled()); + Must(theBakupOffset <= from.contentSize()); + const size_t dataSize = from.contentSize() - theBakupOffset; + const size_t size = + XMIN(dataSize, static_cast(to.potentialSpaceSize())); + + if (size > 0) { + to.append(from.content() + theBakupOffset, size); + theBakupOffset += size; + consume(from); + } + + return size; } -bool MemBufBackup::done() const +bool ICAPXactionVirginDataMgr::consume(MemBuf &buf) { - Must(enabled()); - return theBuf.contentSize() >= theGoal; + const size_t maxOffset = buf.contentSize(); + const size_t bakOffset = backupEnabled() ? theBakupOffset : maxOffset; + const size_t wrOffset = writeEnabled() ? theWriteOffset : maxOffset; + const size_t dataOffset = XMIN(bakOffset, wrOffset); + + debugs(93, 9, "ICAPXactionVirginDataMgr consumes " << dataOffset << "B"); + + if (dataOffset <= 0) + return false; + + buf.consume(dataOffset); + + if (writeEnabled()) + theWriteOffset -= dataOffset; + + if (backupEnabled()) + theBakupOffset -= dataOffset; + + return true; } -void MemBufBackup::update(const MemBuf &source) +bool ICAPXactionVirginDataMgr::writeEnabled() const { - Must(enabled()); + return theWriteOffset >= 0; +} + +void ICAPXactionVirginDataMgr::disableWriting(MemBuf &buf, bool *consumedPtr) +{ + Must(writeEnabled()); + theWriteOffset = -1; + const bool consumed = consume(buf); + + if (consumedPtr) + *consumedPtr = consumed; +} - if (theBuf.isNull()) // first update - theBuf.init(theGoal, Limit); +// maybe called many times +void ICAPXactionVirginDataMgr::enableBackup(bool all) +{ + if (!backupEnabled()) + theBakupOffset = 0; - const ssize_t debt = theGoal - theBuf.contentSize(); + if (!storeAll) + storeAll = all; +} - const ssize_t size = XMIN(source.contentSize(), debt); +bool ICAPXactionVirginDataMgr::backupEnabled() const +{ + return theBakupOffset >= 0; +} - theBuf.append(source.content(), size); +bool ICAPXactionVirginDataMgr::backupAll() const +{ + return backupEnabled() && storeAll; } -MemBuf &MemBufBackup::buf() +void ICAPXactionVirginDataMgr::disableBackup(MemBuf &buf, bool *consumedPtr) { - Must(enabled()); - return theBuf; + Must(backupEnabled()); + theBakupOffset = -1; + const bool consumed = consume(buf); + + if (consumedPtr) + *consumedPtr = consumed; } + ICAPPreview::ICAPPreview(): theWritten(0), theAd(0), theState(stDisabled) {} Index: squid3/src/ICAPXaction.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPXaction.h,v retrieving revision 1.1.2.23 retrieving revision 1.1.2.24 diff -u -r1.1.2.23 -r1.1.2.24 --- squid3/src/ICAPXaction.h 6 Oct 2005 05:31:26 -0000 1.1.2.23 +++ squid3/src/ICAPXaction.h 7 Oct 2005 23:11:38 -0000 1.1.2.24 @@ -1,6 +1,6 @@ /* - * $Id: ICAPXaction.h,v 1.1.2.23 2005/10/06 05:31:26 rousskov Exp $ + * $Id: ICAPXaction.h,v 1.1.2.24 2005/10/07 23:11:38 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -73,29 +73,42 @@ ssize_t theData; // combines expectation and size info to save RAM }; -// optional data backup +// Optional data backup. Start() to enable and call update() to +// accumulate data. Stop() (and do not call update()!) to stop accumulating. +// Accumulated data can be consumed any time after start(). + +// Manages ICAPXaction's virgin data buffer. +// Supports backup for Previews and 204s. +// To simplify code, the manager backups everything or nothing. This +// crude approach works OK because it does not cost us more to backup +// more bytes than is really needed (e.g., more than Preview size). -class MemBufBackup +class ICAPXactionVirginDataMgr { public: - MemBufBackup(); // disabled by default - ~MemBufBackup(); - void enable(size_t aGoal); // enable with known non-negative goal - bool enabled() const; + ICAPXactionVirginDataMgr(); - /* other members can be accessed iff enabled() */ + bool writeEnabled() const; // disableWriting() was not called + // how much data is available for writing + size_t writeableSize(MemBuf &from); + // prep write buffer for the I/O, may consume if backup allows + size_t write(MemBuf &from, MemBuf &to, size_t chunkSize); + void disableWriting(MemBuf &buf, bool *consumedPtr); // consume if needed + + void enableBackup(bool all); // start preserving [all] data + void disableBackup(MemBuf &buf, bool *consumedPtr); // stop and consume + bool backupEnabled() const; + bool backupAll() const; + size_t restore(MemBuf &from, MemBuf &to); // restores&consumes max possible - bool done() const; - void update(const MemBuf &source); - MemBuf &buf(); - -public: - static const size_t Limit; // cannot backup more than that +private: + bool consume(MemBuf &buf); private: - MemBuf theBuf; - ssize_t theGoal; + ssize_t theWriteOffset; // start of unwritten data + ssize_t theBakupOffset; // start of unrestored data or -1 + bool storeAll; }; // maintains preview-related sizes @@ -193,6 +206,8 @@ void handle204NoContent(); void handleUnknownScode(); + void echoMore(); + bool done() const; typedef enum { notifyUnknown, notifyNone, notifyService, notifyHttp, @@ -200,6 +215,10 @@ void mustStop(Notify who); void doStop(); void stopReading(); + void stopWriting(); + void stopParsing(); + void stopSending(); + void stopBackup(); bool callStart(const char *method); void callException(const char *method, const TextException &e, Notify who); @@ -209,6 +228,7 @@ // returns a temporary string depicting transaction status, for debugging const char *status() const; + void packHead(MemBuf *httpBuf, const HttpMsg *head); void encapsulateHead(MemBuf *icapBuf, const char *section, MemBuf *httpBuf, const HttpMsg *head); bool gotEncapsulated(const char *section) const; @@ -236,7 +256,7 @@ char *commBuf; SizedEstimate virginBody; - MemBufBackup virginBackup; + ICAPXactionVirginDataMgr virginDataMgr; ICAPPreview preview; // active (pending) comm callbacks for the ICAP server connection @@ -260,9 +280,6 @@ unsigned doneReceiving: 1; // expect no new virgin info (from virgin pipe) - unsigned doneSending: - 1; // will not produce new adapted info (for adapted pipe) - unsigned doneReading: 1; // will not read from the ICAP server connection @@ -272,6 +289,9 @@ // parsed entire ICAP response from the ICAP server bool doneParsing() const { return parsing == psDone; } + // will not send anything [else] on the adapted pipe + bool doneSending() const { return sending == sendingDone; } + // is parsing ICAP or HTTP headers read from the ICAP server bool parsingHeaders() const { @@ -282,8 +302,11 @@ enum Parsing { psIcapHeader, psHttpHeader, psBody, psDone } parsing; // measures ICAP request writing progress - enum Writing { writingNone, writingConnect, writingHeaders, + enum Writing { writingInit, writingConnect, writingHeaders, writingPreview, writingPaused, writingPrime, writingDone } writing; + + enum Sending { sendingUndecided, sendingVirgin, sendingAdapted, + sendingDone } sending; } state;