--------------------- PatchSet 6240 Date: 2007/12/05 04:42:48 Author: rousskov Branch: async-calls Tag: (none) Log: Simplified BodyProducerCall/BodyConsumerCall classes using revised AsyncCall/Dialer API. These classes are dialers now, which removes one level of complexity. Besides the mandatory constructor, they overwrite canDial method to ignore late calls. Replaced ignoreLateProducer/ConsumerNote with stillProducing/stillConsuming methods that seem more intuitive. Members: src/BodyPipe.cc:1.7.4.7->1.7.4.8 src/BodyPipe.h:1.5.4.2->1.5.4.3 Index: squid3/src/BodyPipe.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/BodyPipe.cc,v retrieving revision 1.7.4.7 retrieving revision 1.7.4.8 diff -u -r1.7.4.7 -r1.7.4.8 --- squid3/src/BodyPipe.cc 29 Nov 2007 20:39:37 -0000 1.7.4.7 +++ squid3/src/BodyPipe.cc 5 Dec 2007 04:42:48 -0000 1.7.4.8 @@ -30,54 +30,68 @@ CBDATA_CLASS_INIT(BodySink); -// The BodyProducerCall is an AsyncCall class which used to schedule BodyProducer calls. +// The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls. // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of // the BodyPipe passed as argument -class BodyProducerCall: - public JobCallT< UnaryMemFunT > +class BodyProducerDialer: public UnaryMemFunT { public: - typedef JobCallT< UnaryMemFunT > Parent; - typedef void (BodyProducer::*noteConsumerHandler)(BodyPipe::Pointer ); + typedef UnaryMemFunT Parent; - BodyProducerCall(BodyProducer *aProducer, noteConsumerHandler aHandler, - BodyPipe::Pointer bp, const char *callName): - Parent(MemFun(aProducer, aHandler, bp), 93, 5, callName){} - - virtual bool fire() { - BodyPipe::Pointer thePipe = theDialer.arg1(); - BodyProducer *theProducer = theDialer.object(); - assert(theProducer); - if(!cbdataReferenceValid(theCbdata)) - return false; - return !theProducer->ignoreLateConsumerNote(thePipe) && Parent::fire(); - } + BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler, + BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {} + + virtual bool canDial(); }; -// The BodyConsumerCall is an AsyncCall class which used to schedule BodyConsumer calls. +// The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls. // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient // of the BodyPipe passed as argument -class BodyConsumerCall: - public JobCallT< UnaryMemFunT > +class BodyConsumerDialer: public UnaryMemFunT { public: - typedef JobCallT< UnaryMemFunT > Parent; - typedef void (BodyConsumer::*noteProducerHandler)(BodyPipe::Pointer ); + typedef UnaryMemFunT Parent; + + BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler, + BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {} - BodyConsumerCall(BodyConsumer *aConsumer, noteProducerHandler aHandler, - BodyPipe::Pointer bp, const char *callName): - Parent(MemFun(aConsumer, aHandler, bp), 93, 5, callName){} - - virtual bool fire() { - BodyPipe::Pointer thePipe = theDialer.arg1(); - BodyConsumer *theConsumer = theDialer.object(); - assert(theConsumer); - if(!cbdataReferenceValid(theCbdata)) - return false; - return !theConsumer->ignoreLateProducerNote(thePipe) && Parent::fire(); - } + virtual bool canDial(); }; +bool +BodyProducerDialer::canDial() { + if (!Parent::canDial()) + return false; + + BodyProducer *producer = object; + BodyPipe::Pointer pipe = arg1; + if (!pipe->stillProducing(producer)) { + debugs(call->debugSection, call->debugLevel, HERE << producer << + " no longer producing for " << pipe->status()); + return call->cancel("no longer producing"); + } + + return true; +} + +bool +BodyConsumerDialer::canDial() { + if (!Parent::canDial()) + return false; + + BodyConsumer *consumer = object; + BodyPipe::Pointer pipe = arg1; + if (!pipe->stillConsuming(consumer)) { + debugs(call->debugSection, call->debugLevel, HERE << consumer << + " no longer consuming from " << pipe->status()); + return call->cancel("no longer consuming"); + } + + return true; +} + + +/* BodyProducer */ // inform the pipe that we are done and clear the Pointer void BodyProducer::stopProducingFor(RefCount &pipe, bool atEof) @@ -89,11 +103,9 @@ pipe = NULL; } -bool BodyProducer::ignoreLateConsumerNote(RefCount &bp) -{ - assert(bp!=NULL); - return !bp->checkProducer(this); -} + + +/* BodyConsumer */ // inform the pipe that we are done and clear the Pointer void BodyConsumer::stopConsumingFrom(RefCount &pipe) @@ -104,11 +116,6 @@ pipe = NULL; } -bool BodyConsumer::ignoreLateProducerNote(RefCount &bp) -{ - assert(bp!=NULL); - return !bp->checkConsumer(this); -} /* BodyPipe */ @@ -240,9 +247,12 @@ debugs(91,7, HERE << "clearing consumer" << status()); theConsumer = NULL; theCCallsToSkip = theCCallsPending; // skip all pending consumer calls - if (consumedSize() && !exhausted()){ - AsyncCall *call= new BodyProducerCall(theProducer, &BodyProducer::noteBodyConsumerAborted, this, "BodyProducer::noteBodyConsumerAborted"); - ScheduleCallHere(call); + if (consumedSize() && !exhausted()) { + AsyncCall *call= asyncCall(91, 7, + "BodyProducer::noteBodyConsumerAborted", + BodyProducerDialer(theProducer, + &BodyProducer::noteBodyConsumerAborted, this)); + ScheduleCallHere(call); } } } @@ -276,7 +286,7 @@ mustAutoConsume = true; debugs(91,5, HERE << "enabled auto consumption" << status()); if (!theConsumer && theBuf.hasContent()){ - theConsumer = new BodySink; + theConsumer = new BodySink; scheduleBodyDataNotification(); } } @@ -323,8 +333,11 @@ theGetSize += size; debugs(91,7, HERE << "consumed " << size << " bytes" << status()); if (mayNeedMoreData()){ - AsyncCall *call= new BodyProducerCall(theProducer, &BodyProducer::noteMoreBodySpaceAvailable, this, "BodyProducer::noteMoreBodySpaceAvailable"); - ScheduleCallHere(call); + AsyncCall *call= asyncCall(91, 7, + "BodyProducer::noteMoreBodySpaceAvailable", + BodyProducerDialer(theProducer, + &BodyProducer::noteMoreBodySpaceAvailable, this)); + ScheduleCallHere(call); } } @@ -348,7 +361,10 @@ { if (theConsumer || mustAutoConsume) { ++theCCallsPending; - AsyncCall *call= new BodyConsumerCall(theConsumer, &BodyConsumer::noteMoreBodyDataAvailable, this, "BodyConsumer::noteMoreBodyDataAvailable"); + AsyncCall *call = asyncCall(91, 7, + "BodyConsumer::noteMoreBodyDataAvailable", + BodyConsumerDialer(theConsumer, + &BodyConsumer::noteMoreBodyDataAvailable, this)); ScheduleCallHere(call); } } @@ -359,12 +375,18 @@ if (theConsumer) { ++theCCallsPending; if (bodySizeKnown() && bodySize() == thePutSize) { - AsyncCall *call= new BodyConsumerCall(theConsumer, &BodyConsumer::noteBodyProductionEnded, this, "BodyConsumer::noteBodyProductionEnded"); - ScheduleCallHere(call); + AsyncCall *call = asyncCall(91, 7, + "BodyConsumer::noteBodyProductionEnded", + BodyConsumerDialer(theConsumer, + &BodyConsumer::noteBodyProductionEnded, this)); + ScheduleCallHere(call); } - else{ - AsyncCall *call= new BodyConsumerCall(theConsumer, &BodyConsumer::noteBodyProducerAborted, this, "BodyConsumer::noteBodyProducerAborted"); - ScheduleCallHere(call); + else { + AsyncCall *call = asyncCall(91, 7, + "BodyConsumer::noteBodyProducerAborted", + BodyConsumerDialer(theConsumer, + &BodyConsumer::noteBodyProducerAborted, this)); + ScheduleCallHere(call); } } } Index: squid3/src/BodyPipe.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/BodyPipe.h,v retrieving revision 1.5.4.2 retrieving revision 1.5.4.3 diff -u -r1.5.4.2 -r1.5.4.3 --- squid3/src/BodyPipe.h 21 Nov 2007 21:59:38 -0000 1.5.4.2 +++ squid3/src/BodyPipe.h 5 Dec 2007 04:42:48 -0000 1.5.4.3 @@ -19,7 +19,6 @@ virtual void noteMoreBodySpaceAvailable(RefCount bp) = 0; virtual void noteBodyConsumerAborted(RefCount bp) = 0; - bool ignoreLateConsumerNote(RefCount &bp); protected: void stopProducingFor(RefCount &pipe, bool atEof); }; @@ -37,7 +36,6 @@ virtual void noteBodyProductionEnded(RefCount bp) = 0; virtual void noteBodyProducerAborted(RefCount bp) = 0; - bool ignoreLateProducerNote(RefCount &bp); protected: void stopConsumingFrom(RefCount &pipe); }; @@ -98,7 +96,7 @@ bool mayNeedMoreData() const { return !bodySizeKnown() || needsMoreData(); } bool needsMoreData() const { return bodySizeKnown() && unproducedSize() > 0; } uint64_t unproducedSize() const; // size of still unproduced data - bool checkProducer(Producer *aProducer) { return aProducer == theProducer;} + bool stillProducing(Producer *producer) const { return theProducer == producer; } // called by consumers bool setConsumerIfNotLate(Consumer *aConsumer); @@ -107,7 +105,7 @@ void consume(size_t size); bool expectMoreAfter(uint64_t offset) const; bool exhausted() const; // saw eof/abort and all data consumed - bool checkConsumer(Consumer *aConsumer) { return aConsumer == theConsumer;} + bool stillConsuming(Consumer *consumer) const { return theConsumer == consumer; } // start or continue consuming when there is no consumer void enableAutoConsumption();