--------------------- PatchSet 4087 Date: 2007/03/08 22:03:28 Author: rousskov Branch: squid3-icap Tag: (none) Log: - BodyPipe is used to read request bodies, including requests for which there is no consumer and the connection is in a 'closing' state. Added auto-consumption ability to BodyPipe so that a 'closing' connection does not have to rely on the body consumer presence when eating up remaining body data. If auto-consumption is turned on and the pipe starts consuming before a real consumer is attached to the pipe, the setConsumerIfNotLate call fails, and the real consumer has to handle the failure. Members: src/BodyPipe.cc:1.1.2.3->1.1.2.4 src/BodyPipe.h:1.1.2.2->1.1.2.3 Index: squid3/src/BodyPipe.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/BodyPipe.cc,v retrieving revision 1.1.2.3 retrieving revision 1.1.2.4 diff -u -r1.1.2.3 -r1.1.2.4 --- squid3/src/BodyPipe.cc 8 Mar 2007 05:23:09 -0000 1.1.2.3 +++ squid3/src/BodyPipe.cc 8 Mar 2007 22:03:28 -0000 1.1.2.4 @@ -27,7 +27,7 @@ BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1), theProducer(aProducer), theConsumer(0), - thePutSize(0), theGetSize(0), isCheckedOut(false) + thePutSize(0), theGetSize(0), mustAutoConsume(false), isCheckedOut(false) { // TODO: teach MemBuf to start with zero minSize // TODO: limit maxSize by theBodySize, when known? @@ -116,17 +116,28 @@ return 0; } -void -BodyPipe::setConsumer(Consumer *aConsumer) +bool +BodyPipe::setConsumerIfNotLate(Consumer *aConsumer) { assert(!theConsumer); assert(aConsumer); + + // TODO: convert this into an exception and remove IfNotLate suffix + // If there is something consumed already, we are in an auto-consuming mode + // and it is too late to attach a real consumer to the pipe. + if (theGetSize > 0) { + assert(mustAutoConsume); + return false; + } + theConsumer = aConsumer; debugs(91,7, HERE << "set consumer" << status()); if (theBuf.hasContent()) AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable); if (!theProducer) scheduleBodyEndNotification(); + + return true; } void @@ -161,6 +172,14 @@ postConsume(size); } +void +BodyPipe::enableAutoConsumption() { + mustAutoConsume = true; + debugs(91,5, HERE << "enabled auto consumption" << status()); + if (!theConsumer && theBuf.hasContent()) + AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable); +} + MemBuf & BodyPipe::checkOut() { assert(!isCheckedOut); @@ -211,7 +230,11 @@ assert(!isCheckedOut); thePutSize += size; debugs(91,7, HERE << "added " << size << " bytes" << status()); + + // We should not consume here even if mustAutoConsume because the + // caller may not be ready for the data to be consumed during this call. AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable); + if (!mayNeedMoreData()) clearProducer(true); // reached end-of-body } @@ -242,6 +265,9 @@ { if (theConsumer != NULL) theConsumer->noteMoreBodyDataAvailable(*this); + else + if (mustAutoConsume && theBuf.hasContent()) + consume(theBuf.contentSize()); } void BodyPipe::tellBodyProductionEnded() @@ -278,6 +304,8 @@ if (theConsumer) buf.Printf(" cons%p", theConsumer); + if (mustAutoConsume) + buf.append(" A", 2); if (isCheckedOut) buf.append(" L", 2); // Locked Index: squid3/src/BodyPipe.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/Attic/BodyPipe.h,v retrieving revision 1.1.2.2 retrieving revision 1.1.2.3 diff -u -r1.1.2.2 -r1.1.2.3 --- squid3/src/BodyPipe.h 20 Feb 2007 16:04:36 -0000 1.1.2.2 +++ squid3/src/BodyPipe.h 8 Mar 2007 22:03:28 -0000 1.1.2.3 @@ -95,13 +95,16 @@ size_t unproducedSize() const; // size of still unproduced data // called by consumers - void setConsumer(Consumer *aConsumer); + bool setConsumerIfNotLate(Consumer *aConsumer); void clearConsumer(); // aborts if still piping size_t getMoreData(MemBuf &buf); void consume(size_t size); bool expectMoreAfter(size_t offset) const; bool exhausted() const; // saw eof/abort and all data consumed + // start or continue consuming when there is no consumer + void enableAutoConsumption(); + const MemBuf &buf() const { return theBuf; } const char *status() const; // for debugging only @@ -146,6 +149,7 @@ MemBuf theBuf; // produced but not yet consumed content, if any + bool mustAutoConsume; // consume when there is no consumer bool isCheckedOut; // to keep track of checkout violations CBDATA_CLASS2(BodyPipe);