--------------------- PatchSet 6156 Date: 2007/11/20 15:40:34 Author: rousskov Branch: ecap Tag: (none) Log: Added sketches of eCAP service and transaction classes using USE_ECAP_AS_ICAP_HACK. This code compiles but does not link because EcapXaction implementation is missing. Members: src/eCAP/Makefile.am:1.1->1.1.2.1 src/eCAP/ServiceRep.cc:1.1->1.1.2.1 src/eCAP/ServiceRep.h:1.1->1.1.2.1 src/eCAP/Xaction.cc:1.1->1.1.2.1 src/eCAP/Xaction.h:1.1->1.1.2.1 --- /dev/null Wed Nov 21 01:21:49 2007 +++ squid3/src/eCAP/Makefile.am Wed Nov 21 01:21:49 2007 @@ -0,0 +1,33 @@ +# Makefile for the eCAP library +# +# $Id: Makefile.am,v 1.1.2.1 2007/11/20 15:40:34 rousskov Exp $ +# + +AM_CFLAGS = @SQUID_CFLAGS@ +AM_CXXFLAGS = @SQUID_CXXFLAGS@ + +noinst_LTLIBRARIES = libeCAP.la + +libeCAP_la_SOURCES = \ + Registry.h \ + ContentAdapter.cc \ + ContentAdapter.h \ + \ + ServiceRep.cc \ + ServiceRep.h \ + Xaction.cc \ + Xaction.h + +INCLUDES = -I. -I$(top_builddir)/include -I$(top_srcdir)/include \ + -I$(top_srcdir)/src + + +# Sample adapter section. + +EXTRA_DIST = \ + MinimalAdapter.cc + +lib_LTLIBRARIES = MinimalAdapter.la +MinimalAdapter_la_SOURCES = MinimalAdapter.cc +MinimalAdapter_la_LDFLAGS = -module -avoid-version +MinimalAdapter_la_LIBADD = ./libeCAP.la --- /dev/null Wed Nov 21 01:21:49 2007 +++ squid3/src/eCAP/ServiceRep.cc Wed Nov 21 01:21:49 2007 @@ -0,0 +1,539 @@ +/* + * DEBUG: section 93 ICAP (RFC 3507) Client + */ + +#include "squid.h" +#include "eCAP/ServiceRep.h" + +#include "ICAP/TextException.h" +#include "HttpReply.h" +#include "eCAP/ServiceRep.h" +#include "ICAP/ICAPOptions.h" +#include "ConfigParser.h" +#include "ICAP/ICAPConfig.h" +#include "SquidTime.h" + +CBDATA_CLASS_INIT(EcapServiceRep); + +EcapServiceRep::EcapServiceRep(): method(ICAP::methodNone), + point(ICAP::pointNone), bypass(false), + theOptions(NULL), + theSessionFailures(0), isSuspended(0), notifying(false), + self(NULL), + wasAnnouncedUp(true) // do not announce an "up" service at startup +{} + +EcapServiceRep::~EcapServiceRep() +{ + changeOptions(0); +} + +const char * +EcapServiceRep::methodStr() const +{ + return ICAP::methodStr(method); +} + +ICAP::Method +EcapServiceRep::parseMethod(const char *str) const +{ + if (!strncasecmp(str, "REQMOD", 6)) + return ICAP::methodReqmod; + + if (!strncasecmp(str, "RESPMOD", 7)) + return ICAP::methodRespmod; + + return ICAP::methodNone; +} + + +const char * +EcapServiceRep::vectPointStr() const +{ + return ICAP::vectPointStr(point); +} + +ICAP::VectPoint +EcapServiceRep::parseVectPoint(const char *service) const +{ + const char *t = service; + const char *q = strchr(t, '_'); + + if (q) + t = q + 1; + + if (!strcasecmp(t, "precache")) + return ICAP::pointPreCache; + + if (!strcasecmp(t, "postcache")) + return ICAP::pointPostCache; + + return ICAP::pointNone; +} + +bool +EcapServiceRep::configure(Pointer &aSelf) +{ + assert(!self && aSelf != NULL); + self = aSelf; + + char *service_type = NULL; + + ConfigParser::ParseString(&key); + ConfigParser::ParseString(&service_type); + ConfigParser::ParseBool(&bypass); + ConfigParser::ParseString(&uri); + + debugs(3, 5, "eCAPService::parseConfigLine (line " << config_lineno << "): " << key.buf() << " " << service_type << " " << bypass); + + method = parseMethod(service_type); + point = parseVectPoint(service_type); + + debugs(3, 5, "eCAPService::parseConfigLine (line " << config_lineno << "): service is " << methodStr() << "_" << vectPointStr()); + + if (uri.cmp("ecap://", 7) != 0) { + debugs(3, 0, "eCAPService::parseConfigLine (line " << config_lineno << "): wrong uri: " << uri.buf()); + return false; + } + + const char *s = uri.buf() + 7; + + const char *e; + + bool have_port = false; + + if ((e = strchr(s, ':')) != NULL) { + have_port = true; + } else if ((e = strchr(s, '/')) != NULL) { + have_port = false; + } else { + return false; + } + + int len = e - s; + host.limitInit(s, len); + s = e; + + s++; + e = strchr(s, '\0'); + len = e - s; + + if (len > 1024) { + debugs(3, 0, "ecap_service_process (line " << config_lineno << "): long resource name (>1024), probably wrong"); + } + + resource.limitInit(s, len + 1); + + if ((bypass != 0) && (bypass != 1)) { + return false; + } + + return true; + +}; + +void EcapServiceRep::invalidate() +{ + assert(self != NULL); + Pointer savedSelf = self; // to prevent destruction when we nullify self + self = NULL; + + announceStatusChange("invalidated by reconfigure", false); + + savedSelf = NULL; // may destroy us and, hence, invalidate cbdata(this) + // TODO: it would be nice to invalidate cbdata(this) when not destroyed +} + +void EcapServiceRep::noteFailure() { + ++theSessionFailures; + debugs(93,4, theSessionFailures << " ICAPService failures, out of " << + TheICAPConfig.service_failure_limit << " allowed " << status()); + + if (isSuspended) + return; + + if (TheICAPConfig.service_failure_limit >= 0 && + theSessionFailures > TheICAPConfig.service_failure_limit) + suspend("too many failures"); + + // TODO: Should bypass setting affect how much Squid tries to talk to + // the ICAP service that is currently unusable and is likely to remain + // so for some time? The current code says "no". Perhaps the answer + // should be configurable. +} + +void EcapServiceRep::suspend(const char *reason) { + if (isSuspended) { + debugs(93,4, "keeping ICAPService suspended, also for " << reason); + } else { + isSuspended = reason; + debugs(93,1, "suspending ICAPService for " << reason); + scheduleUpdate(squid_curtime + TheICAPConfig.service_revival_delay); + announceStatusChange("suspended", true); + } +} + +bool EcapServiceRep::probed() const +{ + return true; // theLastUpdate != 0; +} + +bool EcapServiceRep::hasOptions() const { + return theOptions && theOptions->valid() && theOptions->fresh(); +} + +bool EcapServiceRep::up() const +{ + return self != NULL && !isSuspended && hasOptions(); +} + +bool EcapServiceRep::broken() const +{ + return probed() && !up(); +} + +bool EcapServiceRep::wantsUrl(const String &urlPath) const +{ + Must(hasOptions()); + return theOptions->transferKind(urlPath) != ICAPOptions::xferIgnore; +} + +bool EcapServiceRep::wantsPreview(const String &urlPath, size_t &wantedSize) const +{ + Must(hasOptions()); + + if (theOptions->preview < 0) + return false; + + if (theOptions->transferKind(urlPath) != ICAPOptions::xferPreview) + return false; + + wantedSize = theOptions->preview; + + return true; +} + +bool EcapServiceRep::allows204() const +{ + Must(hasOptions()); + return true; // in the future, we may have ACLs to prevent 204s +} + + +static +void EcapServiceRep_noteTimeToUpdate(void *data) +{ + EcapServiceRep *service = static_cast(data); + Must(service); + service->noteTimeToUpdate(); +} + +void EcapServiceRep::noteTimeToUpdate() +{ + if (self != NULL) + updateScheduled = false; + + if (!self) { + debugs(93,5, "ICAPService ignores options update " << status()); + return; + } + + debugs(93,5, "ICAPService performs a regular options update " << status()); + startGettingOptions(); +} + +static +void EcapServiceRep_noteTimeToNotify(void *data) +{ + EcapServiceRep *service = static_cast(data); + Must(service); + service->noteTimeToNotify(); +} + +void EcapServiceRep::noteTimeToNotify() +{ + Must(!notifying); + notifying = true; + debugs(93,7, "ICAPService notifies " << theClients.size() << " clients " << + status()); + + // note: we must notify even if we are invalidated + + Pointer us = NULL; + + while (!theClients.empty()) { + Client i = theClients.pop_back(); + us = i.service; // prevent callbacks from destroying us while we loop + + if (cbdataReferenceValid(i.data)) + (*i.callback)(i.data, us); + + cbdataReferenceDone(i.data); + } + + notifying = false; +} + +void EcapServiceRep::callWhenReady(Callback *cb, void *data) +{ + debugs(93,5, HERE << "ICAPService is asked to call " << data << + " when ready " << status()); + + Must(cb); + Must(self != NULL); + Must(!broken()); // we do not wait for a broken service + + Client i; + i.service = self; + i.callback = cb; + i.data = cbdataReference(data); + theClients.push_back(i); + + if (theOptionsFetcher || notifying) + return; // do nothing, we will be picked up in noteTimeToNotify() + + if (needNewOptions()) + startGettingOptions(); + else + scheduleNotification(); +} + +void EcapServiceRep::scheduleNotification() +{ + debugs(93,7, "ICAPService will notify " << theClients.size() << " clients"); + eventAdd("EcapServiceRep::noteTimeToNotify", &EcapServiceRep_noteTimeToNotify, this, 0, 0, true); +} + +bool EcapServiceRep::needNewOptions() const +{ + return self != NULL && !up(); +} + +void EcapServiceRep::changeOptions(ICAPOptions *newOptions) +{ + debugs(93,8, "ICAPService changes options from " << theOptions << " to " << + newOptions << ' ' << status()); + + delete theOptions; + theOptions = newOptions; + theSessionFailures = 0; + isSuspended = 0; + theLastUpdate = squid_curtime; + + checkOptions(); + announceStatusChange("down after an options fetch failure", true); +} + +void EcapServiceRep::checkOptions() +{ + if (theOptions == NULL) + return; + + if (!theOptions->valid()) { + debugs(93,1, "WARNING: Squid got an invalid ICAP OPTIONS response " << + "from service " << uri << "; error: " << theOptions->error); + return; + } + + /* + * Issue a warning if the ICAP server returned methods in the + * options response that don't match the method from squid.conf. + */ + + if (!theOptions->methods.empty()) { + bool method_found = false; + String method_list; + Vector ::iterator iter = theOptions->methods.begin(); + + while (iter != theOptions->methods.end()) { + + if (*iter == method) { + method_found = true; + break; + } + + method_list.append(ICAP::methodStr(*iter)); + method_list.append(" ", 1); + iter++; + } + + if (!method_found) { + debugs(93,1, "WARNING: Squid is configured to use ICAP method " << + ICAP::methodStr(method) << + " for service " << uri.buf() << + " but OPTIONS response declares the methods are " << method_list.buf()); + } + } + + + /* + * Check the ICAP server's date header for clock skew + */ + const int skew = (int)(theOptions->timestamp() - squid_curtime); + if (abs(skew) > theOptions->ttl()) { + // TODO: If skew is negative, the option will be considered down + // because of stale options. We should probably change this. + debugs(93, 1, "ICAP service's clock is skewed by " << skew << + " seconds: " << uri.buf()); + } +} + +void EcapServiceRep::announceStatusChange(const char *downPhrase, bool important) const +{ + if (wasAnnouncedUp == up()) // no significant changes to announce + return; + + const char *what = bypass ? "optional" : "essential"; + const char *state = wasAnnouncedUp ? downPhrase : "up"; + const int level = important ? 1 : 2; + debugs(93,level, what << " ICAP service is " << state << ": " << uri << + ' ' << status()); + + wasAnnouncedUp = !wasAnnouncedUp; +} + +static +void EcapServiceRep_noteGenerateOptions(void *data) +{ + EcapServiceRep *service = static_cast(data); + Must(service); + service->noteGenerateOptions(); +} + +// we are receiving ICAP OPTIONS response headers here or NULL on failures +void EcapServiceRep::noteGenerateOptions() +{ + Must(theOptionsFetcher); + theOptionsFetcher = NULL; + + debugs(93,5, "ICAPService is generating new options " << status()); + + ICAPOptions *newOptions = new ICAPOptions; + throw TexcHere("configure eCAP options"); + + handleNewOptions(newOptions); +} + +void EcapServiceRep::handleNewOptions(ICAPOptions *newOptions) +{ + // new options may be NULL + changeOptions(newOptions); + + debugs(93,3, "ICAPService got new options and is now " << status()); + + scheduleUpdate(optionsFetchTime()); + scheduleNotification(); +} + +void EcapServiceRep::startGettingOptions() +{ + Must(!theOptionsFetcher); + debugs(93,6, "ICAPService will generate new options " << status()); + + theOptionsFetcher = &EcapServiceRep_noteGenerateOptions; + eventAdd("EcapServiceRep::GenerateOptions", + theOptionsFetcher, this, 0, 0, true); +} + +void EcapServiceRep::scheduleUpdate(time_t when) +{ + if (updateScheduled) { + debugs(93,7, "ICAPService reschedules update"); + // XXX: check whether the event is there because AR saw + // an unreproducible eventDelete assertion on 2007/06/18 + if (eventFind(&EcapServiceRep_noteTimeToUpdate, this)) + eventDelete(&EcapServiceRep_noteTimeToUpdate, this); + else + debugs(93,1, "XXX: ICAPService lost an update event."); + updateScheduled = false; + } + + debugs(93,7, HERE << "raw OPTIONS fetch at " << when << " or in " << + (when - squid_curtime) << " sec"); + debugs(93,9, HERE << "last fetched at " << theLastUpdate << " or " << + (squid_curtime - theLastUpdate) << " sec ago"); + + /* adjust update time to prevent too-frequent updates */ + + if (when < squid_curtime) + when = squid_curtime; + + // XXX: move hard-coded constants from here to TheICAPConfig + const int minUpdateGap = 30; // seconds + if (when < theLastUpdate + minUpdateGap) + when = theLastUpdate + minUpdateGap; + + const int delay = when - squid_curtime; + debugs(93,5, "ICAPService will fetch OPTIONS in " << delay << " sec"); + + eventAdd("EcapServiceRep::noteTimeToUpdate", + &EcapServiceRep_noteTimeToUpdate, this, delay, 0, true); + updateScheduled = true; +} + +// returns absolute time when OPTIONS should be fetched +time_t +EcapServiceRep::optionsFetchTime() const +{ + if (theOptions && theOptions->valid()) { + const time_t expire = theOptions->expire(); + debugs(93,7, "ICAPService options expire on " << expire << " >= " << squid_curtime); + + // conservative estimate of how long the OPTIONS transaction will take + // XXX: move hard-coded constants from here to TheICAPConfig + const int expectedWait = 20; // seconds + + // Unknown or invalid (too small) expiration times should not happen. + // ICAPOptions should use the default TTL, and ICAP servers should not + // send invalid TTLs, but bugs and attacks happen. + if (expire < expectedWait) + return squid_curtime; + else + return expire - expectedWait; // before the current options expire + } + + // use revival delay as "expiration" time for a service w/o valid options + return squid_curtime + TheICAPConfig.service_revival_delay; +} + +// returns a temporary string depicting service status, for debugging +const char *EcapServiceRep::status() const +{ + static MemBuf buf; + + buf.reset(); + buf.append("[", 1); + + if (up()) + buf.append("up", 2); + else { + buf.append("down", 4); + if (!self) + buf.append(",gone", 5); + if (isSuspended) + buf.append(",susp", 5); + + if (!theOptions) + buf.append(",!opt", 5); + else + if (!theOptions->valid()) + buf.append(",!valid", 7); + else + if (!theOptions->fresh()) + buf.append(",stale", 6); + } + + if (theOptionsFetcher) + buf.append(",fetch", 6); + + if (notifying) + buf.append(",notif", 6); + + if (theSessionFailures > 0) + buf.Printf(",fail%d", theSessionFailures); + + buf.append("]", 1); + buf.terminate(); + + return buf.content(); +} --- /dev/null Wed Nov 21 01:21:49 2007 +++ squid3/src/eCAP/ServiceRep.h Wed Nov 21 01:21:49 2007 @@ -0,0 +1,122 @@ + +/* + * $Id: ServiceRep.h,v 1.1.2.1 2007/11/20 15:40:34 rousskov Exp $ + * + */ + +#ifndef SQUID_ECAP_SERVICEREP_H +#define SQUID_ECAP_SERVICEREP_H + +#include "cbdata.h" +#include "event.h" +#include "ICAP/ICAPInitiator.h" +#include "ICAP/ICAPElements.h" + +#if USE_ECAP_AS_ICAP_HACK +#define EcapServiceRep ICAPServiceRep +#endif + +class ICAPOptions; +class ICAPOptXact; + +class EcapServiceRep : public RefCountable +{ + +public: + typedef RefCount Pointer; + +public: + EcapServiceRep(); + virtual ~EcapServiceRep(); + + bool configure(Pointer &aSelf); // needs self pointer for ICAPOptXact + void invalidate(); // call when the service is no longer needed or valid + + const char *methodStr() const; + const char *vectPointStr() const; + + bool probed() const; // see comments above + bool broken() const; // see comments above + bool up() const; // see comments above + + typedef void Callback(void *data, Pointer &service); + void callWhenReady(Callback *cb, void *data); + + // the methods below can only be called on an up() service + bool wantsUrl(const String &urlPath) const; + bool wantsPreview(const String &urlPath, size_t &wantedSize) const; + bool allows204() const; + + void noteFailure(); // called by transactions to report service failure + +public: + String key; + ICAP::Method method; + ICAP::VectPoint point; + String uri; // service URI + + // URI components + String host; + int port; + String resource; + + // XXX: use it when selecting a service and handling ICAP errors! + bool bypass; + +public: // treat these as private, they are for callbacks only + void noteTimeToUpdate(); + void noteTimeToNotify(); + void noteGenerateOptions(); + +private: + // stores Prepare() callback info + + struct Client + { + Pointer service; // one for each client to preserve service + Callback *callback; + void *data; + }; + + typedef Vector Clients; + Clients theClients; // all clients waiting for a call back + + ICAPOptions *theOptions; + EVH *theOptionsFetcher; // pending ICAP OPTIONS transaction + time_t theLastUpdate; // time the options were last updated + + static const int TheSessionFailureLimit; + int theSessionFailures; + const char *isSuspended; // also stores suspension reason for debugging + + bool notifying; // may be true in any state except for the initial + bool updateScheduled; // time-based options update has been scheduled + +private: + ICAP::Method parseMethod(const char *) const; + ICAP::VectPoint parseVectPoint(const char *) const; + + void suspend(const char *reason); + + bool hasOptions() const; + bool needNewOptions() const; + time_t optionsFetchTime() const; + + void scheduleUpdate(time_t when); + void scheduleNotification(); + + void startGettingOptions(); + void handleNewOptions(ICAPOptions *newOptions); + void changeOptions(ICAPOptions *newOptions); + void checkOptions(); + + void announceStatusChange(const char *downPhrase, bool important) const; + + const char *status() const; + + Pointer self; + mutable bool wasAnnouncedUp; // prevent sequential same-state announcements + CBDATA_CLASS2(EcapServiceRep); +}; + +#endif /* SQUID_ECAP_SERVICEREP_H */ --- /dev/null Wed Nov 21 01:21:49 2007 +++ squid3/src/eCAP/Xaction.cc Wed Nov 21 01:21:49 2007 @@ -0,0 +1,508 @@ +/* + * DEBUG: section 93 ICAP (RFC 3507) Client + */ + +#include "squid.h" +#include "HttpMsg.h" +#include "eCAP/Xaction.h" + +int EcapXaction::TheLastId = 0; + +CBDATA_CLASS_INIT(EcapXaction); + +#if 0 + +/* comm module handlers (wrappers around corresponding ICAPXaction methods */ + +// TODO: Teach comm module to call object methods directly + +static +ICAPXaction &ICAPXaction_fromData(void *data) +{ + ICAPXaction *x = static_cast(data); + assert(x); + return *x; +} + +static +void ICAPXaction_noteCommTimedout(int, void *data) +{ + ICAPXaction_fromData(data).noteCommTimedout(); +} + +static +void ICAPXaction_noteCommClosed(int, void *data) +{ + ICAPXaction_fromData(data).noteCommClosed(); +} + +static +void ICAPXaction_noteCommConnected(int, comm_err_t status, int xerrno, void *data) +{ + ICAPXaction_fromData(data).noteCommConnected(status); +} + +static +void ICAPXaction_noteCommWrote(int, char *, size_t size, comm_err_t status, int xerrno, void *data) +{ + ICAPXaction_fromData(data).noteCommWrote(status, size); +} + +static +void ICAPXaction_noteCommRead(int, char *, size_t size, comm_err_t status, int xerrno, void *data) +{ + debugs(93,3,HERE << data << " read returned " << size); + ICAPXaction_fromData(data).noteCommRead(status, size); +} + +ICAPXaction::ICAPXaction(const char *aTypeName, ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService): + ICAPInitiate(aTypeName, anInitiator, aService), + id(++TheLastId), + connection(-1), + commBuf(NULL), commBufSize(0), + commEof(false), + reuseConnection(true), + isRetriable(true), + ignoreLastWrite(false), + connector(NULL), reader(NULL), writer(NULL), closer(NULL) +{ + debugs(93,3, typeName << " constructed, this=" << this << + " [icapx" << id << ']'); // we should not call virtual status() here +} + +ICAPXaction::~ICAPXaction() +{ + debugs(93,3, typeName << " destructed, this=" << this << + " [icapx" << id << ']'); // we should not call virtual status() here +} + +void ICAPXaction::disableRetries() { + debugs(93,5, typeName << (isRetriable ? " becomes" : " remains") << + " final" << status()); + isRetriable = false; +} + +void ICAPXaction::start() +{ + ICAPInitiate::start(); + + readBuf.init(SQUID_TCP_SO_RCVBUF, SQUID_TCP_SO_RCVBUF); + commBuf = (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF, &commBufSize); + // make sure maximum readBuf space does not exceed commBuf size + Must(static_cast(readBuf.potentialSpaceSize()) <= commBufSize); +} + +// TODO: obey service-specific, OPTIONS-reported connection limit +void ICAPXaction::openConnection() +{ + Must(connection < 0); + + const ICAPServiceRep &s = service(); + + if (!TheICAPConfig.reuse_connections) + disableRetries(); // this will also safely drain pconn pool + + // TODO: check whether NULL domain is appropriate here + connection = icapPconnPool->pop(s.host.buf(), s.port, NULL, NULL, isRetriable); + if (connection >= 0) { + debugs(93,3, HERE << "reused pconn FD " << connection); + connector = &ICAPXaction_noteCommConnected; // make doneAll() false + eventAdd("ICAPXaction::reusedConnection", + reusedConnection, + this, + 0.0, + 0, + true); + return; + } + + disableRetries(); // we only retry pconn failures + + connection = comm_open(SOCK_STREAM, 0, getOutgoingAddr(NULL), 0, + COMM_NONBLOCKING, s.uri.buf()); + + if (connection < 0) + dieOnConnectionFailure(); // throws + + debugs(93,3, typeName << " opens connection to " << s.host.buf() << ":" << s.port); + + // TODO: service bypass status may differ from that of a transaction + commSetTimeout(connection, TheICAPConfig.connect_timeout(service().bypass), + &ICAPXaction_noteCommTimedout, this); + + closer = &ICAPXaction_noteCommClosed; + comm_add_close_handler(connection, closer, this); + + connector = &ICAPXaction_noteCommConnected; + commConnectStart(connection, s.host.buf(), s.port, connector, this); +} + +/* + * This event handler is necessary to work around the no-rentry policy + * of ICAPXaction::callStart() + */ +void +ICAPXaction::reusedConnection(void *data) +{ + debugs(93, 5, "ICAPXaction::reusedConnection"); + ICAPXaction *x = (ICAPXaction*)data; + x->noteCommConnected(COMM_OK); +} + +void ICAPXaction::closeConnection() +{ + if (connection >= 0) { + + if (closer) { + comm_remove_close_handler(connection, closer, this); + closer = NULL; + } + + cancelRead(); // may not work + + if (reuseConnection && !doneWithIo()) { + debugs(93,5, HERE << "not reusing pconn due to pending I/O" << status()); + reuseConnection = false; + } + + if (reuseConnection) { + debugs(93,3, HERE << "pushing pconn" << status()); + commSetTimeout(connection, -1, NULL, NULL); + icapPconnPool->push(connection, theService->host.buf(), theService->port, NULL, NULL); + disableRetries(); + } else { + debugs(93,3, HERE << "closing pconn" << status()); + // comm_close will clear timeout + comm_close(connection); + } + + writer = NULL; + reader = NULL; + connector = NULL; + connection = -1; + } +} + +// connection with the ICAP service established +void ICAPXaction::noteCommConnected(comm_err_t commStatus) +{ + ICAPXaction_Enter(noteCommConnected); + + Must(connector); + connector = NULL; + + if (commStatus != COMM_OK) + dieOnConnectionFailure(); // throws + + fd_table[connection].noteUse(icapPconnPool); + + handleCommConnected(); + + ICAPXaction_Exit(); +} + +void ICAPXaction::dieOnConnectionFailure() { + debugs(93, 2, HERE << typeName << + " failed to connect to " << service().uri); + theService->noteFailure(); + throw TexcHere("cannot connect to the ICAP service"); +} + +void ICAPXaction::scheduleWrite(MemBuf &buf) +{ + // comm module will free the buffer + writer = &ICAPXaction_noteCommWrote; + comm_write_mbuf(connection, &buf, writer, this); + updateTimeout(); +} + +void ICAPXaction::noteCommWrote(comm_err_t commStatus, size_t size) +{ + ICAPXaction_Enter(noteCommWrote); + + Must(writer); + writer = NULL; + + if (ignoreLastWrite) { + // a hack due to comm inability to cancel a pending write + ignoreLastWrite = false; + debugs(93, 7, HERE << "ignoring last write; status: " << commStatus); + } else { + Must(commStatus == COMM_OK); + updateTimeout(); + handleCommWrote(size); + } + + ICAPXaction_Exit(); +} + +// communication timeout with the ICAP service +void ICAPXaction::noteCommTimedout() +{ + ICAPXaction_Enter(noteCommTimedout); + + handleCommTimedout(); + + ICAPXaction_Exit(); +} + +void ICAPXaction::handleCommTimedout() +{ + debugs(93, 2, HERE << typeName << " failed: timeout with " << + theService->methodStr() << " " << theService->uri.buf() << status()); + reuseConnection = false; + service().noteFailure(); + + throw TexcHere(connector ? + "timed out while connecting to the ICAP service" : + "timed out while talking to the ICAP service"); +} + +// unexpected connection close while talking to the ICAP service +void ICAPXaction::noteCommClosed() +{ + closer = NULL; + ICAPXaction_Enter(noteCommClosed); + + handleCommClosed(); + + ICAPXaction_Exit(); +} + +void ICAPXaction::handleCommClosed() +{ + mustStop("ICAP service connection externally closed"); +} + +void ICAPXaction::callEnd() +{ + if (doneWithIo()) { + debugs(93, 5, HERE << typeName << " done with I/O" << status()); + closeConnection(); + } + ICAPInitiate::callEnd(); // may destroy us +} + +bool ICAPXaction::doneAll() const +{ + return !connector && !reader && !writer && ICAPInitiate::doneAll(); +} + +void ICAPXaction::updateTimeout() { + if (reader || writer) { + // restart the timeout before each I/O + // XXX: why does Config.Timeout lacks a write timeout? + // TODO: service bypass status may differ from that of a transaction + commSetTimeout(connection, TheICAPConfig.io_timeout(service().bypass), + &ICAPXaction_noteCommTimedout, this); + } else { + // clear timeout when there is no I/O + // Do we need a lifetime timeout? + commSetTimeout(connection, -1, NULL, NULL); + } +} + +void ICAPXaction::scheduleRead() +{ + Must(connection >= 0); + Must(!reader); + Must(readBuf.hasSpace()); + + reader = &ICAPXaction_noteCommRead; + /* + * See comments in ICAPXaction.h about why we use commBuf + * here instead of reading directly into readBuf.buf. + */ + + comm_read(connection, commBuf, readBuf.spaceSize(), reader, this); + updateTimeout(); +} + +// comm module read a portion of the ICAP response for us +void ICAPXaction::noteCommRead(comm_err_t commStatus, size_t sz) +{ + ICAPXaction_Enter(noteCommRead); + + Must(reader); + reader = NULL; + + Must(commStatus == COMM_OK); + Must(sz >= 0); + + updateTimeout(); + + debugs(93, 3, HERE << "read " << sz << " bytes"); + + /* + * See comments in ICAPXaction.h about why we use commBuf + * here instead of reading directly into readBuf.buf. + */ + + if (sz > 0) { + readBuf.append(commBuf, sz); + disableRetries(); // because pconn did not fail + } else { + reuseConnection = false; + commEof = true; + } + + handleCommRead(sz); + + ICAPXaction_Exit(); +} + +void ICAPXaction::cancelRead() +{ + if (reader) { + // check callback presence because comm module removes + // fdc_table[].read.callback after the actual I/O but + // before we get the callback via a queued event. + // These checks try to mimic the comm_read_cancel() assertions. + + if (comm_has_pending_read(connection) && + !comm_has_pending_read_callback(connection)) { + comm_read_cancel(connection, reader, this); + reader = NULL; + } + } +} + +bool ICAPXaction::parseHttpMsg(HttpMsg *msg) +{ + debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse"); + + http_status error = HTTP_STATUS_NONE; + const bool parsed = msg->parse(&readBuf, commEof, &error); + Must(parsed || !error); // success or need more data + + if (!parsed) { // need more data + Must(mayReadMore()); + msg->reset(); + return false; + } + + readBuf.consume(msg->hdr_sz); + return true; +} + +bool ICAPXaction::mayReadMore() const +{ + return !doneReading() && // will read more data + readBuf.hasSpace(); // have space for more data +} + +bool ICAPXaction::doneReading() const +{ + return commEof; +} + +bool ICAPXaction::doneWriting() const +{ + return !writer; +} + +bool ICAPXaction::doneWithIo() const +{ + return connection >= 0 && // or we could still be waiting to open it + !connector && !reader && !writer && // fast checks, some redundant + doneReading() && doneWriting(); +} + +// initiator aborted +void ICAPXaction::noteInitiatorAborted() +{ + ICAPXaction_Enter(noteInitiatorAborted); + + if (theInitiator) { + clearInitiator(); + mustStop("initiator aborted"); + } + + ICAPXaction_Exit(); +} + +// This 'last chance' method is called before a 'done' transaction is deleted. +// It is wrong to call virtual methods from a destructor. Besides, this call +// indicates that the transaction will terminate as planned. +void ICAPXaction::swanSong() +{ + // kids should sing first and then call the parent method. + + closeConnection(); // TODO: rename because we do not always close + + if (!readBuf.isNull()) + readBuf.clean(); + + if (commBuf) + memFreeBuf(commBufSize, commBuf); + + if (theInitiator) + tellQueryAborted(!isRetriable); + + ICAPInitiate::swanSong(); +} + +// returns a temporary string depicting transaction status, for debugging +const char *ICAPXaction::status() const +{ + static MemBuf buf; + buf.reset(); + + buf.append(" [", 2); + + fillPendingStatus(buf); + buf.append("/", 1); + fillDoneStatus(buf); + + buf.Printf(" icapx%d]", id); + + buf.terminate(); + + return buf.content(); +} + +void ICAPXaction::fillPendingStatus(MemBuf &buf) const +{ + if (connection >= 0) { + buf.Printf("FD %d", connection); + + if (writer) + buf.append("w", 1); + + if (reader) + buf.append("r", 1); + + buf.append(";", 1); + } +} + +void ICAPXaction::fillDoneStatus(MemBuf &buf) const +{ + if (connection >= 0 && commEof) + buf.Printf("Comm(%d)", connection); + + if (stopReason != NULL) + buf.Printf("Stopped"); +} + +bool ICAPXaction::fillVirginHttpHeader(MemBuf &buf) const +{ + return false; +} + +#endif /* 0 */ + + +/* ICAPModXactLauncher */ + +ICAPModXactLauncher::ICAPModXactLauncher(ICAPInitiator *anInitiator, + HttpMsg *aHeader, HttpRequest *aCause, + ICAPServiceRep::Pointer &aService): + EcapXaction("EcapXaction", anInitiator, aService) +{ + virgin.setHeader(aHeader); + virgin.setCause(aCause); +} + + + --- /dev/null Wed Nov 21 01:21:49 2007 +++ squid3/src/eCAP/Xaction.h Wed Nov 21 01:21:49 2007 @@ -0,0 +1,84 @@ + +/* + * $Id: Xaction.h,v 1.1.2.1 2007/11/20 15:40:34 rousskov Exp $ + * + */ + +#ifndef SQUID_ECAP_XACTION_H +#define SQUID_ECAP_XACTION_H + +#include "MemBuf.h" +#include "eCAP/ServiceRep.h" +#include "ICAP/ICAPInitiate.h" +#include "ICAP/ICAPInOut.h" + +class HttpMsg; + +#if USE_ECAP_AS_ICAP_HACK +#define EcapXaction ICAPXaction +#endif + + +class EcapXaction: public ICAPInitiate +{ + +public: + EcapXaction(const char *aTypeName, ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService); + virtual ~EcapXaction(); + +public: + ICAPInOut virgin; + ICAPInOut adapted; + +protected: + virtual void start(); + virtual void noteInitiatorAborted(); // TODO: move to ICAPInitiate + + void updateTimeout(); + + virtual bool doneAll() const; + + // called just before the 'done' transaction is deleted + virtual void swanSong(); + + // returns a temporary string depicting transaction status, for debugging + virtual const char *status() const; + virtual void fillPendingStatus(MemBuf &buf) const; + virtual void fillDoneStatus(MemBuf &buf) const; + + // useful for debugging + virtual bool fillVirginHttpHeader(MemBuf&) const; + + // custom end-of-call checks + virtual void callEnd(); + +protected: + const int id; // transaction ID for debugging, unique across ICAP xactions + + const char *stopReason; + +private: + static int TheLastId; + + CBDATA_CLASS2(EcapXaction); +}; + +// call guards for all "asynchronous" note*() methods +// If we move EcapXaction_* macros to core, they can use these generic names: +#define EcapXaction_Enter(method) AsyncCallEnter(method) +#define EcapXaction_Exit() AsyncCallExit() + +#if USE_ECAP_AS_ICAP_HACK + +// An ICAPLauncher that stores ICAPModXact construction info and +// creates ICAPModXact when needed +class ICAPModXactLauncher: public ICAPXaction +{ +public: + ICAPModXactLauncher(ICAPInitiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, ICAPServiceRep::Pointer &s); +}; + +#endif /* USE_ECAP_AS_ICAP_HACK */ + + +#endif /* SQUID_ECAP_XACTION_H */