--------------------- PatchSet 4039 Date: 2007/02/14 04:44:10 Author: rousskov Branch: squid3-icap Tag: (none) Log: - Replaced BodyReader with BodyPipe. BodyReader was a collection of function pointers augmented with body size calculation logic. BodyReader was used to deliver request body (of a known size) from the client side to the server side. Reference counting was used to communicate abort conditions to the other side (it did not work well because decreasing the reference count does not have any side-effects if the count remains positive). Direct calls between sides sometimes resulted in call-me-when-I-am-calling-you "loops" and related bugs. BodyPipe is used to deliver request or response body (possibly of unknown size) from the body producer to the body consumer. A producer can be the client side (for virgin requests), the server side (for virgin replies), or the ICAP side (for adapted messages). A consumer can be the client side (for adapted responses, including responses in a request satisfaction mode), the server side (for adapted requests), or the ICAP side (for virgin requests and responses). BodyPipe uses asynchronous calls for communication between sides to avoid call-me-when-I-am-calling-you "loops". BodyPipe has methods to communicate normal termination and abort conditions to the other side. The use of those methods is mandatory. Reference counting is used only as a garbage collection mechanism. 
Members: src/BodyPipe.cc:1.1->1.1.2.1 src/BodyPipe.h:1.1->1.1.2.1 src/BodyReader.cc:1.5.2.3->1.5.2.4(DEAD) src/BodyReader.h:1.2.2.2->1.2.2.3(DEAD) --- /dev/null Wed Feb 14 13:37:19 2007 +++ squid3/src/BodyPipe.cc Wed Feb 14 13:38:37 2007 @@ -0,0 +1,317 @@ + +#include "squid.h" +#include "BodyPipe.h" + +CBDATA_CLASS_INIT(BodyPipe); + +// inform the pipe that we are done and clear the Pointer +void BodyProducer::stopProducingFor(RefCount &pipe, bool atEof) +{ + debugs(91,7, HERE << this << " will not produce for " << pipe << + "; atEof: " << atEof); + assert(pipe != NULL); // be strict: the caller state may depend on this + pipe->clearProducer(atEof); + pipe = NULL; +} + +// inform the pipe that we are done and clear the Pointer +void BodyConsumer::stopConsumingFrom(RefCount &pipe) +{ + debugs(91,7, HERE << this << " will not consume from " << pipe); + assert(pipe != NULL); // be strict: the caller state may depend on this + pipe->clearConsumer(); + pipe = NULL; +} + +/* BodyPipe */ + +BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1), + theProducer(aProducer), theConsumer(0), + thePutSize(0), theGetSize(0), isCheckedOut(false) +{ + // TODO: teach MemBuf to start with zero minSize + // TODO: limit maxSize by theBodySize, when known? + theBuf.init(2*1024, MaxCapacity); + debugs(91,7, HERE << "created BodyPipe" << status()); +} + +BodyPipe::~BodyPipe() +{ + debugs(91,7, HERE << "destroying BodyPipe" << status()); + assert(!theProducer); + assert(!theConsumer); + theBuf.clean(); +} + +void BodyPipe::setBodySize(size_t aBodySize) +{ + assert(!bodySizeKnown()); + assert(aBodySize >= 0); + assert(thePutSize <= aBodySize); + + // If this assert fails, we need to add code to check for eof and inform + // the consumer about the eof condition via scheduleBodyEndNotification, + // because just setting a body size limit may trigger the eof condition. 
+ assert(!theConsumer); + + theBodySize = aBodySize; +} + +size_t BodyPipe::bodySize() const +{ + assert(bodySizeKnown()); + return static_cast(theBodySize); +} + +bool BodyPipe::exhausted() const +{ + return theGetSize >= thePutSize && // no old data buffered + !mayNeedMoreData(); // no new data expected +} + +size_t BodyPipe::unproducedSize() const +{ + return bodySize() - thePutSize; // bodySize() asserts that size is known +} + +void +BodyPipe::clearProducer(bool atEof) +{ + if (theProducer) { + debugs(91,7, HERE << "clearing BodyPipe producer" << status()); + theProducer = NULL; + if (atEof) { + if (!bodySizeKnown()) + theBodySize = thePutSize; + else + if (bodySize() != thePutSize) + debugs(91,1, HERE << "aborting on premature eof" << status()); + } else { + // asserta that we can detect the abort if the consumer joins later + assert(!bodySizeKnown() || bodySize() != thePutSize); + } + scheduleBodyEndNotification(); + } +} + +size_t +BodyPipe::putMoreData(const char *buf, size_t size) +{ + const size_t spaceSize = static_cast(theBuf.potentialSpaceSize()); + if ((size = XMIN(size, spaceSize))) { + theBuf.append(buf, size); + postAppend(size); + return size; + } + return 0; +} + +void +BodyPipe::setConsumer(Consumer *aConsumer) +{ + assert(!theConsumer); + assert(aConsumer); + theConsumer = aConsumer; + debugs(91,7, HERE << "set consumer" << status()); + if (theBuf.hasContent()) + AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable); + if (!theProducer) + scheduleBodyEndNotification(); +} + +void +BodyPipe::clearConsumer() { + if (theConsumer) { + debugs(91,7, HERE << "clearing consumer" << status()); + theConsumer = NULL; + if (!exhausted()) + AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted); + } +} + +size_t +BodyPipe::getMoreData(MemBuf &buf) +{ + if (!theBuf.hasContent()) + return 0; // did not touch the possibly uninitialized buf + + if (buf.isNull()) + buf.init(); + const size_t size = XMIN(theBuf.contentSize(), buf.potentialSpaceSize()); + 
buf.append(theBuf.content(), size); + theBuf.consume(size); + postConsume(size); + return size; // cannot be zero if we called buf.init above +} + +void +BodyPipe::consume(size_t size) +{ + theBuf.consume(size); + postConsume(size); +} + +MemBuf & +BodyPipe::checkOut() { + assert(!isCheckedOut); + isCheckedOut = true; + return theBuf; +} + +void +BodyPipe::checkIn(Checkout &checkout) +{ + assert(isCheckedOut); + isCheckedOut = false; + const size_t currentSize = theBuf.contentSize(); + if (checkout.checkedOutSize > currentSize) + postConsume(checkout.checkedOutSize - currentSize); + else + if (checkout.checkedOutSize < currentSize) + postAppend(currentSize - checkout.checkedOutSize); +} + +void +BodyPipe::undoCheckOut(Checkout &checkout) +{ + assert(isCheckedOut); + const size_t currentSize = theBuf.contentSize(); + // We can only undo if size did not change, and even that carries + // some risk. If this becomes a problem, the code checking out + // raw buffers should always check them in (possibly unchanged) + // instead of relying on the automated undo mechanism of Checkout. + // The code can always use a temporary buffer to accomplish that. + assert(checkout.checkedOutSize == currentSize); +} + +// TODO: Optimize: inform consumer/producer about more data/space only if +// they used the data/space since we notified them last time. 
+ +void +BodyPipe::postConsume(size_t size) { + assert(!isCheckedOut); + theGetSize += size; + debugs(91,7, HERE << "consumed " << size << " bytes" << status()); + if (mayNeedMoreData()) + AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable); +} + +void +BodyPipe::postAppend(size_t size) { + assert(!isCheckedOut); + thePutSize += size; + debugs(91,7, HERE << "added " << size << " bytes" << status()); + AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable); + if (!mayNeedMoreData()) + clearProducer(true); // reached end-of-body +} + + +void +BodyPipe::scheduleBodyEndNotification() +{ + if (bodySizeKnown() && bodySize() == thePutSize) + AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded); + else + AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted); +} + +void BodyPipe::tellMoreBodySpaceAvailable() +{ + if (theProducer != NULL) + theProducer->noteMoreBodySpaceAvailable(*this); +} + +void BodyPipe::tellBodyConsumerAborted() +{ + if (theProducer != NULL) + theProducer->noteBodyConsumerAborted(*this); +} + +void BodyPipe::tellMoreBodyDataAvailable() +{ + if (theConsumer != NULL) + theConsumer->noteMoreBodyDataAvailable(*this); +} + +void BodyPipe::tellBodyProductionEnded() +{ + if (theConsumer != NULL) + theConsumer->noteBodyProductionEnded(*this); +} + +void BodyPipe::tellBodyProducerAborted() +{ + if (theConsumer != NULL) + theConsumer->noteBodyProducerAborted(*this); +} + +// a short temporary string describing buffer status for debugging +const char *BodyPipe::status() const +{ + static MemBuf buf; + buf.reset(); + + buf.append(" [", 2); + + buf.Printf("%d<=%d", (int)theGetSize, (int)thePutSize); + if (theBodySize >= 0) + buf.Printf("<=%d", (int)theBodySize); + else + buf.append("<=?", 3); + + buf.Printf(" %d+%d", theBuf.contentSize(), theBuf.spaceSize()); + + buf.Printf(" pipe%p", this); + if (theProducer) + buf.Printf(" prod%p", theProducer); + if (theConsumer) + buf.Printf(" cons%p", theConsumer); + + if (isCheckedOut) + buf.append(" L", 2); 
// Locked + + buf.append("]", 1); + + buf.terminate(); + + return buf.content(); +} + + +/* BodyPipeCheckout */ + +BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe), + buf(aPipe.checkOut()), offset(aPipe.consumedSize()), + checkedOutSize(buf.contentSize()), checkedIn(false) +{ +} + +BodyPipeCheckout::~BodyPipeCheckout() +{ + if (!checkedIn) + pipe.undoCheckOut(*this); +} + +void +BodyPipeCheckout::checkIn() +{ + assert(!checkedIn); + pipe.checkIn(*this); + checkedIn = true; +} + + +BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe), + buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize), + checkedIn(c.checkedIn) +{ + assert(false); // prevent copying +} + +BodyPipeCheckout & +BodyPipeCheckout::operator =(const BodyPipeCheckout &) +{ + assert(false); // prevent assignment + return *this; +} --- /dev/null Wed Feb 14 13:37:19 2007 +++ squid3/src/BodyPipe.h Wed Feb 14 13:38:37 2007 @@ -0,0 +1,153 @@ + +#ifndef SQUID_BODY_PIPE_H +#define SQUID_BODY_PIPE_H + +#include "MemBuf.h" +#include "AsyncCall.h" + +class BodyPipe; + +// Interface for those who want to produce body content for others. +// BodyProducer is expected to create the BodyPipe. +// One pipe cannot have more than one producer. +class BodyProducer { + public: + virtual ~BodyProducer() {} + + virtual void noteMoreBodySpaceAvailable(BodyPipe &bp) = 0; + virtual void noteBodyConsumerAborted(BodyPipe &bp) = 0; + + protected: + void stopProducingFor(RefCount &pipe, bool atEof); +}; + +// Interface for those who want to consume body content from others. +// BodyConsumer is expected to register with an existing BodyPipe +// by calling BodyPipe::setConsumer(). +// One pipe cannot have more than one consumer. 
+class BodyConsumer { + public: + virtual ~BodyConsumer() {} + + virtual void noteMoreBodyDataAvailable(BodyPipe &bp) = 0; + virtual void noteBodyProductionEnded(BodyPipe &bp) = 0; + virtual void noteBodyProducerAborted(BodyPipe &bp) = 0; + + protected: + void stopConsumingFrom(RefCount &pipe); +}; + +// Makes raw buffer checkin/checkout interface efficient and exception-safe. +// Either append or consume operations can be performed on a checkedout buffer. +class BodyPipeCheckout { + public: + friend class BodyPipe; + + public: + BodyPipeCheckout(BodyPipe &pipe); // checks out + ~BodyPipeCheckout(); // aborts checkout unless checkedIn + + void checkIn(); + + public: + BodyPipe &pipe; + MemBuf &buf; + const size_t offset; // of current content, relative to the body start + + protected: + const size_t checkedOutSize; + bool checkedIn; + + private: + BodyPipeCheckout(const BodyPipeCheckout &); // prevent copying + BodyPipeCheckout &operator =(const BodyPipeCheckout &); // prevent assignment +}; + +// Connects those who produces message body content with those who +// consume it. For example, connects ConnStateData with FtpStateData OR +// ICAPModXact with HttpStateData. 
+class BodyPipe: public RefCountable { + public: + typedef RefCount Pointer; + typedef BodyProducer Producer; + typedef BodyConsumer Consumer; + typedef BodyPipeCheckout Checkout; + + enum { MaxCapacity = SQUID_TCP_SO_RCVBUF }; + + friend class BodyPipeCheckout; + + public: + BodyPipe(Producer *aProducer); + ~BodyPipe(); // asserts that producer and consumer are cleared + + void setBodySize(size_t aSize); // set body size + bool bodySizeKnown() const { return theBodySize >= 0; } + size_t bodySize() const; + size_t consumedSize() const { return theGetSize; } + bool productionEnded() const { return !theProducer; } + + // called by producers + void clearProducer(bool atEof); // aborts or sends eof + size_t putMoreData(const char *buf, size_t size); + bool mayNeedMoreData() const { return !bodySizeKnown() || needsMoreData(); } + bool needsMoreData() const { return bodySizeKnown() && unproducedSize() > 0; } + size_t unproducedSize() const; // size of still unproduced data + + // called by consumers + void setConsumer(Consumer *aConsumer); + void clearConsumer(); // aborts if still piping + size_t getMoreData(MemBuf &buf); + void consume(size_t size); + bool exhausted() const; // saw eof/abort and all data consumed + + const MemBuf &buf() const { return theBuf; } + + const char *status() const; // for debugging only + + protected: + // lower-level interface used by Checkout + MemBuf &checkOut(); // obtain raw buffer + void checkIn(Checkout &checkout); // return updated raw buffer + void undoCheckOut(Checkout &checkout); // undo checkout efffect + + void scheduleBodyEndNotification(); + + // keep counters in sync and notify producer or consumer + void postConsume(size_t size); + void postAppend(size_t size); + + public: /* public to enable callbacks, but treat as private */ + + /* these methods are calling producer and sibscriber note*() + * callbacks with this BodyPipe as a parameter, which allows + * a single producer or consumer to support multiple pipes. 
*/ + + void tellMoreBodySpaceAvailable(); + void tellBodyConsumerAborted(); + void tellMoreBodyDataAvailable(); + void tellBodyProductionEnded(); + void tellBodyProducerAborted(); + + AsyncCallWrapper(91,5, BodyPipe, tellMoreBodySpaceAvailable); + AsyncCallWrapper(91,5, BodyPipe, tellBodyConsumerAborted); + AsyncCallWrapper(91,5, BodyPipe, tellMoreBodyDataAvailable); + AsyncCallWrapper(91,5, BodyPipe, tellBodyProductionEnded); + AsyncCallWrapper(91,5, BodyPipe, tellBodyProducerAborted); + + private: + ssize_t theBodySize; // expected total content length, if known + Producer *theProducer; // content producer, if any + Consumer *theConsumer; // content consumer, if any + + size_t thePutSize; // ever-increasing total + size_t theGetSize; // ever-increasing total + + MemBuf theBuf; // produced but not yet consumed content, if any + + bool isCheckedOut; // to keep track of checkout violations + + CBDATA_CLASS2(BodyPipe); +}; + +#endif /* SQUID_BODY_PIPE_H */ --- squid3/src/BodyReader.cc Wed Feb 14 13:38:37 2007 +++ /dev/null Wed Feb 14 13:37:19 2007 @@ -1,162 +0,0 @@ - - -#include "squid.h" -#include "MemBuf.h" -#include "BodyReader.h" - -BodyReader::BodyReader(size_t len, BodyReadFunc *r, BodyAbortFunc *a, BodyKickFunc *k, void *d) : - _remaining(len), _available(0), - read_func(r), abort_func(a), kick_func(k), read_func_data(d), - read_callback(NULL), read_callback_data(NULL) -{ - theBuf.init(4096, 65536); - debugs(32,3,HERE << this << " " << "created new BodyReader for content-length " << len); - bytes_read = 0; -} - -BodyReader::~BodyReader() -{ - debugs(32,3,HERE << this << " destroying with " << _remaining << - " remaining bytes and " << abort_func << " abort_func"); - - if (_remaining && abort_func) - abort_func(read_func_data, _remaining); - - if (callbackPending()) - doCallback(); - -} - -void -BodyReader::read(CBCB *callback, void *cbdata) -{ - assert(_remaining || theBuf.contentSize()); - debugs(32,3,HERE << this << " " << "remaining = " << _remaining); - 
debugs(32,3,HERE << this << " " << "available = " << _available); - - if (read_callback == NULL) { - read_callback = callback; - read_callback_data = cbdataReference(cbdata); - } else { - assert(read_callback == callback); - assert(read_callback_data == cbdata); - } - - if ((_available == 0) && (theBuf.contentSize() == 0)) { - debugs(32,3,HERE << this << " " << "read: no body data available, saving callback pointers"); - - if (kick_func) - kick_func(read_func_data); - - return; - } - - debugs(32,3,HERE << this << " " << "read_func=" << read_func); - debugs(32,3,HERE << this << " " << "data=" << read_func_data); - size_t size = theBuf.potentialSpaceSize(); - - debugs(32, 3, "BodyReader::read: available: " << _available << ", size " << size << ", remaining: " << _remaining); - - if (size > _available) - size = _available; - - if (size > _remaining) - size = _remaining; - - if (size > 0) { - debugs(32,3,HERE << this << " " << "calling read_func for " << size << " bytes"); - - size_t nread = read_func(read_func_data, theBuf, size); - - if (nread > 0) { - _available -= nread; - reduce_remaining(nread); - } else { - debugs(32,3,HERE << this << " " << "Help, read_func() ret " << nread); - } - } - - if (theBuf.contentSize() > 0) { - debugs(32,3,HERE << this << " have " << theBuf.contentSize() << " bytes in theBuf, calling back"); - doCallback(); - } -} - -void -BodyReader::notify(size_t now_available) -{ - debugs(32,3,HERE << this << " " << "old available = " << _available); - debugs(32,3,HERE << this << " " << "now_available = " << now_available); - _available = now_available; - - if (!callbackPending()) { - debugs(32,3,HERE << this << " " << "no callback pending, nothing to do"); - return; - } - - debugs(32,3,HERE << this << " " << "have data and pending callback, calling read()"); - - read(read_callback, read_callback_data); -} - -bool -BodyReader::callbackPending() -{ - return read_callback ? 
true : false; -} - -/* - * doCallback - * - * Execute the read callback if there is a function registered - * and the read_callback_data is still valid. - */ -bool -BodyReader::doCallback() -{ - CBCB *t_callback = read_callback; - void *t_cbdata; - - if (t_callback == NULL) - return false; - - read_callback = NULL; - - if (!cbdataReferenceValidDone(read_callback_data, &t_cbdata)) - return false; - - debugs(32,3,HERE << this << " doing callback, theBuf size = " << theBuf.contentSize()); - - t_callback(theBuf, t_cbdata); - - return true; -} - -bool -BodyReader::consume(size_t size) -{ - debugs(32,3,HERE << this << " BodyReader::consume consuming " << size); - - if (theBuf.contentSize() < (mb_size_t) size) { - debugs(0,0,HERE << this << "BodyReader::consume failed"); - debugs(0,0,HERE << this << "BodyReader::consume size = " << size); - debugs(0,0,HERE << this << "BodyReader::consume contentSize() = " << theBuf.contentSize()); - return false; - } - - theBuf.consume(size); - - if (callbackPending() && _available > 0) { - debugs(32,3,HERE << this << " " << "data avail and pending callback, calling read()"); - read(read_callback, read_callback_data); - } - - return true; -} - -void -BodyReader::reduce_remaining(size_t size) -{ - assert(size <= _remaining); - _remaining -= size; -} --- squid3/src/BodyReader.h Wed Feb 14 13:38:37 2007 +++ /dev/null Wed Feb 14 13:37:19 2007 @@ -1,56 +0,0 @@ - -#ifndef SQUID_BODY_READER_H -#define SQUID_BODY_READER_H - -typedef void CBCB (MemBuf &mb, void *data); -typedef size_t BodyReadFunc (void *, MemBuf &mb, size_t size); -typedef void BodyAbortFunc (void *, size_t); -typedef void BodyKickFunc (void *); - -class BodyReader : public RefCountable -{ - -public: - typedef RefCount Pointer; - BodyReader(size_t len, BodyReadFunc *r, BodyAbortFunc *a, BodyKickFunc *k, void *d); - ~BodyReader(); - void read(CBCB *, void *); - void notify(size_t now_available); - size_t remaining() { return _remaining; } - - bool callbackPending(); - bool 
consume(size_t size); - - int bytes_read; - - /* reduce the number of bytes that the BodyReader is looking for. - * Will trigger an assertion if it tries to reduce below zero - */ - void reduce_remaining(size_t size); - -private: - size_t _remaining; - size_t _available; - MemBuf theBuf; - - /* - * These are for interacting with things that - * "provide" body content. ie, ConnStateData and - * ICAPReqMod after adapation. - */ - BodyReadFunc *read_func; - BodyAbortFunc *abort_func; - BodyKickFunc *kick_func; - void *read_func_data; - - /* - * These are for interacting with things that - * "consume" body content. ie, HttpStateData and - * ICAPReqMod before adaptation. - */ - CBCB *read_callback; - void *read_callback_data; - bool doCallback(); -}; - -#endif