--------------------- PatchSet 1938 Date: 2005/10/05 22:34:56 Author: rousskov Branch: squid3-icap Tag: (none) Log: - Wrote initial support for ICAP Preview and 204s. Squid will try to supply Preview if the ICAP service asks for it. Regardless of the Preview, Squid will claim support for 204 responses outside of Preview if the HTTP body is not present or is known to fit into transaction's "backup" buffer. There is no code for dealing with 100 Continue and 204 No Content ICAP responses yet; only the writing side is implemented. Untested. Members: src/ICAPXaction.cc:1.1.2.44->1.1.2.45 src/ICAPXaction.h:1.1.2.21->1.1.2.22 Index: squid3/src/ICAPXaction.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPXaction.cc,v retrieving revision 1.1.2.44 retrieving revision 1.1.2.45 diff -u -r1.1.2.44 -r1.1.2.45 --- squid3/src/ICAPXaction.cc 5 Oct 2005 18:02:08 -0000 1.1.2.44 +++ squid3/src/ICAPXaction.cc 5 Oct 2005 22:34:56 -0000 1.1.2.45 @@ -183,12 +183,13 @@ throw TexcHere("cannot connect to ICAP service " /* + uri */); } + debug(93,3)("ICAPXaction::openConnection() to %s %d\n", service->host.buf(), service->port); + commSetTimeout(connection, Config.Timeout.connect, &ICAPXaction_noteCommTimeout, this); closer = &ICAPXaction_noteCommClose; comm_add_close_handler(connection, closer, this); - debug(93,3)("ICAPXaction::openConnection() to %s %d\n", service->host.buf(), service->port); - state.wroteHeaders = 0; + state.writing = State::writingConnect; commConnectStart(connection, service->host.buf(), service->port, &ICAPXaction_noteCommConnected, this); } @@ -226,7 +227,6 @@ makeRequestHeaders(&requestBuf); // write headers only; comm module will free the requestBuf writer = &ICAPXaction_noteCommWroteHeaders; - //debug(0,0)("{%s}\n", requestBuf.content()); comm_old_write_mbuf(connection, &requestBuf, writer, this); ICAPXaction_Exit(noteCommConnected); @@ -240,45 +240,103 @@ writer = NULL; Must(status == COMM_OK); - state.wroteHeaders = 1; - writeMoreBody(); + + Must(state.writing == State::writingHeaders); + + if (virginBody.expected()) { + state.writing = preview.enabled() ? + State::writingPreview : State::writingPrime; + writeMore(); + } else { + Must(!preview.enabled()); // no preview if no body + state.writing = State::writingDone; + } ICAPXaction_Exit(noteCommWroteHeaders); } -void ICAPXaction::writeMoreBody() +void ICAPXaction::writeMore() { - if (writer || state.doneWriting || !state.wroteHeaders) - return; + while (!writer) { // already writing something - if (!expectVirginBody()) { - state.doneWriting = true; - return; + switch (state.writing) { + + case State::writingConnect: // waiting for the connection to establish + + case State::writingHeaders: // waiting for the headers to be written + + case State::writingPaused: // waiting for the ICAP server response + + case State::writingDone: // nothing more to write + return; + + case State::writingPreview: + writePriviewBody(); + break; + + case State::writingPrime: + writePrimeBody(); + break; + + default: + debugs(93, 1, "ICAPXaction bad writing state: " << state.writing); + Must(false); + } } +} + +void ICAPXaction::writePriviewBody() +{ + Must(state.writing == State::writingPreview); MsgPipeData::Body *body = virgin->data->body; + const size_t size = XMIN(preview.debt(), (size_t)body->contentSize()); + preview.wrote(size, state.doneReceiving); // assumes writeSomeBody success + writeSomeBody("preview body", size, preview.done()); - MemBuf writeBuf; // TODO: suggest a min size based on body and lastChunk + // change state once preview is written - writeBuf.init(); + if (preview.done()) { + state.writing = preview.ieof() ? + State::writingDone : State::writingPaused; + } +} - if (body->hasContent()) { - debugs(93, 7, "ICAPXaction will write " << body->contentSize() << - "-byte chunk"); - moveRequestChunk(&writeBuf); - Must(!body->hasContent()); // we should have moved all of it above +void ICAPXaction::writePrimeBody() +{ + Must(state.writing == State::writingPrime); - if (!state.doneReceiving) - virgin->sendSinkNeed(); + MsgPipeData::Body *body = virgin->data->body; + const size_t size = body->contentSize(); + writeSomeBody("prime virgin body", size, false); + + if (state.doneReceiving) + state.writing = State::writingDone; +} + +void ICAPXaction::writeSomeBody(const char *label, size_t size, bool forceLast) +{ + Must(!writer && !state.doneWriting()); + + MsgPipeData::Body *body = virgin->data->body; + + MemBuf writeBuf; // TODO: suggest a min size based on size and lastChunk + + writeBuf.init(); + + if (body->hasContent() && size > 0) { + debugs(93, 7, "ICAPXaction will write " << size << "-byte chunk of " + << label); + moveRequestChunk(&writeBuf, size, false); } - if (state.doneReceiving && !state.doneWriting) { - debugs(93, 7, "ICAPXaction will write last-chunk"); - moveRequestChunk(&writeBuf); // will add zero-size chunk - state.doneWriting = true; + if (state.doneReceiving || forceLast) { + debugs(93, 7, "ICAPXaction will write last-chunk of " << label); + moveRequestChunk(&writeBuf, 0, forceLast); // adds zero-size chunk } - debugs(93, 7, "ICAPXaction will write " << writeBuf.contentSize() << " bytes"); + debugs(93, 7, "ICAPXaction will write " << writeBuf.contentSize() + << " raw bytes of " << label); if (writeBuf.hasContent()) { // comm will free the chunk @@ -289,17 +347,27 @@ } } -void ICAPXaction::moveRequestChunk(MemBuf *buf) +void ICAPXaction::moveRequestChunk(MemBuf *buf, size_t chunkSize, bool ieof) { - MsgPipeData::Body *body = virgin->data->body; - const mb_size_t chunkSize = body->contentSize(); // may be zero buf->Printf("%x\r\n", chunkSize); if (chunkSize > 0) { + MsgPipeData::Body *body = virgin->data->body; + + if (virginBackup.enabled() && !virginBackup.done()) + virginBackup.update(*body); + buf->append(body->content(), chunkSize); + body->consume(chunkSize); + + if (!state.doneReceiving) + virgin->sendSinkNeed(); } + if (ieof) + buf->append("; ieof", 6); + buf->append(crlf, 2); // chunk-terminating CRLF } @@ -309,7 +377,7 @@ writer = NULL; Must(status == COMM_OK); - writeMoreBody(); + writeMore(); ICAPXaction_Exit(noteCommWroteBody); } @@ -341,7 +409,7 @@ return true; return state.doneReceiving && state.doneSending && - state.doneReading && state.doneWriting; + state.doneReading && state.doneWriting(); } void ICAPXaction::startReading() @@ -489,7 +557,8 @@ http_status error = HTTP_STATUS_NONE; const bool parsed = head->parse(&readBuf, state.doneReading, &error); - // TODO: replace the if below with Must(parsed || !error); + // TODO: replace the if below with Must(parsed || !error); + if (!parsed && error > 0) { // unrecoverable parsing error debugs(11, 1, "ICAPXaction::parseHead: failed to parse ICAP header: '" << readBuf.content() << "'"); notify = notifyHttp; @@ -563,7 +632,7 @@ ICAPXaction_Enter(noteSourceProgress); Must(!state.doneReceiving); - writeMoreBody(); // TODO: add writeMore(); we may still be writing headers + writeMore(); ICAPXaction_Exit(noteSourceProgress); } @@ -575,7 +644,7 @@ Must(!state.doneReceiving); state.doneReceiving = true; - writeMoreBody(); // in case we were waiting to send the last-chunk + writeMore(); // in case we were waiting to send the last-chunk ICAPXaction_Exit(noteSourceFinish); } @@ -701,6 +770,16 @@ buf->append(crlf, 2); // terminate Encapsulated line + if (shouldPreview()) { + buf->Printf("Preview: %d\r\n", (int)preview.ad()); + virginBackup.enable(preview.ad()); + } + + if (shouldAllow204()) { + buf->Printf("Allow: 204\r\n"); + virginBackup.enable(MemBufBackup::Limit); + } + buf->append(crlf, 2); // terminate ICAP header // start ICAP request body with encapsulated HTTP headers @@ -721,6 +800,68 @@ packerClean(&p); } +// decides whether to offer a preview and calculates its size +bool ICAPXaction::shouldPreview() +{ + size_t wantedSize; + + if (!service->wantsPreview(wantedSize)) + return false; + + // cannot preview more than we can backup + size_t ad = XMIN(wantedSize, MemBufBackup::Limit); + + if (virginBody.expected() && virginBody.knownSize()) + ad = XMIN(ad, virginBody.size()); // not more than we have + else + ad = 0; // questionable optimization? + + preview.enable(ad); + + return preview.enabled(); +} + +// decides whether to allow 204 responses +bool ICAPXaction::shouldAllow204() +{ + if (!service->allows204()) + return false; + + if (!virginBody.expected()) + return true; // no body means no problems with supporting 204s. + + // if there is a body, make sure we can backup it all + + if (!virginBody.knownSize()) + return false; + + return virginBody.size() <= MemBufBackup::Limit; +} + +#if 0 +// maybe called several times +void ICAPXaction::prepBackup(size_t maxSize) +{ + Must(0 <= maxSize && maxSize <= TheBackupMax); + theBackupGoal = XMAX(theBackupGoal, maxSize); + + if (!state.backingUp) { + state.backingUp = true; + virginBackup.init(maxSize, TheBackupMax); + } +} + +void ICAPXaction::backup(const MemBuf &buf) +{ + Must(state.backingUp); + const size_t debt = theBackupGoal - virginBackup.contentSize(); + const size_t size = XMIN((size_t)buf.contentSize(), debt); + virginBackup.append(buf.content(), size); + state.backingUp = ((size_t)virginBackup.contentSize()) < theBackupGoal; +} + +#endif + bool ICAPXaction::callStart(const char *method) { debugs(93, 5, "ICAPXaction::" << method << " called " << status()); @@ -778,7 +919,7 @@ if (state.doneReading) status.append("r", 1); - if (state.doneWriting) + if (state.doneWriting()) status.append("w", 1); status.append("]", 1); @@ -816,3 +957,166 @@ return virgin->data->header->expectingBody(method, unused); } + +// calculate whether there is a virgin HTTP body and +// whether its expected size is known +void ICAPXaction::estimateVirginBody() +{ + // note: defaults should be fine but will disable previews and 204s + + if (virgin->data->header == NULL) + return; + + method_t method; + + if (virgin->data->cause) + method = virgin->data->cause->method; + else + if (HttpRequest *req= dynamic_cast(virgin->data-> + header)) + method = req->method; + else + return; + + ssize_t size; + if (virgin->data->header->expectingBody(method, size)) + virginBody.expect(size); +} + + +// TODO: Move SizedEstimate, MemBufBackup, and ICAPPreview elsewhere + +SizedEstimate::SizedEstimate() + : theData(dtUnexpected) +{} + +void SizedEstimate::expect(ssize_t aSize) +{ + theData = (aSize >= 0) ? aSize : (ssize_t)dtUnknown; +} + +bool SizedEstimate::expected() const +{ + return theData != dtUnexpected; +} + +bool SizedEstimate::knownSize() const +{ + Must(expected()); + return theData != dtUnknown; +} + +size_t SizedEstimate::size() const +{ + Must(knownSize()); + return static_cast(theData); +} + + +const size_t MemBufBackup::Limit = ICAPMsgPipeBufSizeMax; + +MemBufBackup::MemBufBackup(): theGoal(-1) +{} + +MemBufBackup::~MemBufBackup() +{ + theBuf.clean(); +} + +// maybe called many times +void MemBufBackup::enable(size_t aGoal) +{ + Must(0 <= aGoal && aGoal <= Limit); + Must(aGoal >= 0); + + if (enabled()) { + Must(theBuf.isNull()); // not started + theGoal = XMAX(theGoal, static_cast(aGoal)); + } else { + theGoal = static_cast(aGoal); + } +} + +bool MemBufBackup::enabled() const +{ + return theGoal >= 0; +} + +bool MemBufBackup::done() const +{ + Must(enabled()); + return theBuf.contentSize() >= theGoal; +} + +void MemBufBackup::update(const MemBuf &source) +{ + Must(enabled()); + + if (theBuf.isNull()) // first update + theBuf.init(theGoal, Limit); + + const ssize_t debt = theGoal - theBuf.contentSize(); + + const ssize_t size = XMIN(source.contentSize(), debt); + + theBuf.append(source.content(), size); +} + +MemBuf &MemBufBackup::buf() +{ + Must(enabled()); + return theBuf; +} + +ICAPPreview::ICAPPreview(): theWritten(0), theAd(0), theState(stDisabled) +{} + +void ICAPPreview::enable(size_t anAd) +{ + // TODO: check for anAd not exceeding preview size limit + Must(anAd >= 0); + Must(!enabled()); + theAd = anAd; +} + +bool ICAPPreview::enabled() const +{ + return theState != stDisabled; +} + +size_t ICAPPreview::ad() const +{ + Must(enabled()); + return theAd; +} + +bool ICAPPreview::done() const +{ + Must(enabled()); + return theState >= stIeof; +} + +bool ICAPPreview::ieof() const +{ + Must(enabled()); + return theState == stIeof; +} + +size_t ICAPPreview::debt() const +{ + Must(enabled()); + return done() ? 0 : (theAd - theWritten); +} + +void ICAPPreview::wrote(size_t size, bool sawEof) +{ + Must(enabled()); + theWritten += size; + + if (theWritten >= theAd) + theState = stDone; // sawEof is irrelevant + else + if (sawEof) + theState = stIeof; +} + Index: squid3/src/ICAPXaction.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/ICAPXaction.h,v retrieving revision 1.1.2.21 retrieving revision 1.1.2.22 diff -u -r1.1.2.21 -r1.1.2.22 --- squid3/src/ICAPXaction.h 1 Oct 2005 04:57:08 -0000 1.1.2.21 +++ squid3/src/ICAPXaction.h 5 Oct 2005 22:34:56 -0000 1.1.2.22 @@ -1,6 +1,6 @@ /* - * $Id: ICAPXaction.h,v 1.1.2.21 2005/10/01 04:57:08 dwsquid Exp $ + * $Id: ICAPXaction.h,v 1.1.2.22 2005/10/05 22:34:56 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -53,6 +53,76 @@ class HttpMsg; +// estimated future presence and size of something (e.g., HTTP body) + +class SizedEstimate +{ + +public: + SizedEstimate(); // not expected by default + void expect(ssize_t aSize); // expect with any, even unknown size + bool expected() const; + + /* other members can be accessed iff expected() */ + + bool knownSize() const; + size_t size() const; // can be accessed iff knownSize() + +private: + enum { dtUnexpected = -2, dtUnknown = -1 }; + ssize_t theData; // combines expectation and size info to save RAM +}; + +// optional data backup + +class MemBufBackup +{ + +public: + MemBufBackup(); // disabled by default + ~MemBufBackup(); + void enable(size_t aGoal); // enable with known non-negative goal + bool enabled() const; + + /* other members can be accessed iff enabled() */ + + bool done() const; + void update(const MemBuf &source); + MemBuf &buf(); + +public: + static const size_t Limit; // cannot backup more than that + +private: + MemBuf theBuf; + ssize_t theGoal; +}; + +// maintains preview-related sizes + +class ICAPPreview +{ + +public: + ICAPPreview(); // disabled + void enable(size_t anAd); // enabled with advertised size + bool enabled() const; + + /* other members can be accessed iff enabled() */ + + size_t ad() const; // advertised preview size + size_t debt() const; // remains to write + bool done() const; // wrote everything + bool ieof() const; // premature EOF + + void wrote(size_t size, bool sawEof); + +private: + size_t theWritten; + size_t theAd; + enum State { stDisabled, stWriting, stIeof, stDone } theState; +}; + class ICAPXaction: public MsgPipeSource, public MsgPipeSink { @@ -86,14 +156,25 @@ void noteCommClose(); private: + void estimateVirginBody(); void openConnection(); void closeConnection(); - void writeMoreBody(); + + void writeMore(); + void writePriviewBody(); + void writePrimeBody(); + void writeSomeBody(const char *label, size_t size, bool forceLast); + void startReading(); void readMore(); void makeRequestHeaders(MemBuf *buf); - void moveRequestChunk(MemBuf *buf); + void moveRequestChunk(MemBuf *buf, size_t chunkSize, bool ieof); + + bool shouldPreview(); + bool shouldAllow204(); + void prepBackup(size_t expectedSize); + void backup(const MemBuf &buf); void parseMore(); void parseHeaders(); @@ -145,6 +226,10 @@ MemBuf readBuf; char *commBuf; + SizedEstimate virginBody; + MemBufBackup virginBackup; + ICAPPreview preview; + // active (pending) comm callbacks for the ICAP server connection IOCB *reader; CWCB *writer; @@ -160,9 +245,6 @@ public: - unsigned wroteHeaders: - 1; // ICAP headers have been written - unsigned inCall: 1; // processing an asynchronous call (e.g., comm read callback) @@ -175,8 +257,8 @@ unsigned doneReading: 1; // will not read from the ICAP server connection - unsigned doneWriting: - 1; // will not write to the ICAP server connection + // will not write anything [else] to the ICAP server connection + bool doneWriting() const { return writing == writingDone; } // parsed entire ICAP response from the ICAP server bool doneParsing() const { return parsing == psDone; } @@ -189,6 +271,10 @@ } enum Parsing { psIcapHeader, psHttpHeader, psBody, psDone } parsing; + + // measures ICAP request writing progress + enum Writing { writingNone, writingConnect, writingHeaders, + writingPreview, writingPaused, writingPrime, writingDone } writing; } state;