--------------------- PatchSet 1529 Date: 2005/08/19 20:11:55 Author: rousskov Branch: squid3-icap Tag: (none) Log: - Added ICAPXaction which receives virgin HTTP messages, communicates with the ICAP server, and sends the adapted messages back. Current implementation does not form or parse ICAP messages. Members: src/ICAPXaction.cc:1.1->1.1.2.1 src/ICAPXaction.h:1.1->1.1.2.1 --- /dev/null Wed Feb 14 13:33:00 2007 +++ squid3/src/ICAPXaction.cc Wed Feb 14 13:34:51 2007 @@ -0,0 +1,391 @@ +/* + * DEBUG: section 93 ICAP (RFC 3507) Client + */ + +#include "squid.h" +#include "comm.h" +#include "MsgPipe.h" +#include "MsgPipeData.h" +#include "HttpRequest.h" +#include "HttpReply.h" +#include "ICAPServiceRep.h" +#include "ICAPXaction.h" +#include "ICAPClient.h" +#include "TextException.h" + +// flow and terminology: +// HTTP| --> receive --> encode --> write --> |network +// end | <-- send <-- parse <-- read <-- |end + +/* comm module handlers (wrappers around corresponding ICAPXaction methods */ +// TODO: Teach comm module to call object methods directly + +CBDATA_CLASS_INIT(ICAPXaction); + +static +ICAPXaction &ICAPXaction_fromData(void *data) { + ICAPXaction *x = static_cast(data); + assert(x); + return *x; +} + +static +void ICAPXaction_noteCommTimeout(int, void *data) { + ICAPXaction_fromData(data).noteCommTimeout(); +} + +static +void ICAPXaction_noteCommClose(int, void *data) { + ICAPXaction_fromData(data).noteCommClose(); +} + +static +void ICAPXaction_noteCommConnected(int, comm_err_t status, int xerrno, void *data) { + ICAPXaction_fromData(data).noteCommConnected(status); +} + +static +void ICAPXaction_noteCommWroteHeaders(int, char *, size_t size, comm_err_t status, void *data) { + ICAPXaction_fromData(data).noteCommWroteHeaders(status); +} + +static +void ICAPXaction_noteCommWroteBody(int, char *, size_t size, comm_err_t status, void *data) { + ICAPXaction_fromData(data).noteCommWroteBody(status, size); +} + +static +void ICAPXaction_noteCommRead(int, char *, size_t size, comm_err_t status, int xerrno, void *data) { + ICAPXaction_fromData(data).noteCommRead(status, size); +} + +// call guards for all "asynchronous" note*() methods that are being called +// from message pipes or the comm module +#define ICAPXaction_Enter(method) \ + try { \ + callStart(#method); + +#define ICAPXaction_Exit(method) \ + } \ + catch (const TextException &e) { \ + callException(#method, e); \ + } \ + callEnd(#method); + + +ICAPXaction::State::State() { + memset(this, sizeof(*this), 0); +} + +ICAPXaction::ICAPXaction(): self(NULL), virgin(NULL), adapted(NULL), + service(0), connection(-1), notify(notifyUnknown) { +} + +ICAPXaction::~ICAPXaction() { + notify = notifyNone; + doStop(); +} + +void ICAPXaction::init(MsgPipe *aVirgin, MsgPipe *anAdapted, Pointer &aSelf) { + assert(!self.getRaw() && !virgin && !adapted); + assert(aSelf.getRaw() && aVirgin && anAdapted); + + self = aSelf; + virgin = aVirgin; + adapted = anAdapted; + + // receiving end + virgin->sink = this; + // virgin pipe data is initiated by the source + + // sending end + adapted->sink = this; + adapted->data = new MsgPipeData; + adapted->data->body = new MemBuf; // XXX: make body a non-pointer? + memBufInit(adapted->data->body, ICAPMsgPipeBufSize, ICAPMsgPipeBufSize); + // headers are initialized when we parse them + + // writing end + // nothing to do because we are using temporary write buffers + + // reading end + memBufInit(&readBuf, MAX_CLIENT_BUF_SZ, MAX_CLIENT_BUF_SZ); + + // encoding + memBufInit(&requestBuf, MAX_CLIENT_BUF_SZ, MAX_CLIENT_BUF_SZ); + + // XXX: make sure stop() cleans all buffers +} + +// HTTP side starts sending virgin data +void ICAPXaction::noteSourceStart(MsgPipe *p) { + pickService(); + openConnection(); + // put nothing here as openConnection calls commConnectStart + // and that may call us back without waiting for next select loop +} + +// TODO: obey service-specific, OPTIONS-reported connection limit +void ICAPXaction::openConnection() { + // TODO: check whether NULL domain is appropriate here + connection = pconnPop(service->host, service->port, NULL); + + if (connection < 0) { + connection = comm_open(SOCK_STREAM, 0, getOutgoingAddr(NULL), 0, + COMM_NONBLOCKING, service->uri); + if (connection < 0) + throw TexcHere("cannot connect to ICAP service " /* + uri */); + } + + commSetTimeout(connection, Config.Timeout.connect, + &ICAPXaction_noteCommTimeout, this); + comm_add_close_handler(connection, + &ICAPXaction_noteCommClose, this); + commConnectStart(connection, service->host, service->port, + &ICAPXaction_noteCommConnected, this); +} + +void ICAPXaction::closeConnection() { + if (connection >= 0) { + commSetTimeout(connection, -1, NULL, NULL); + comm_remove_close_handler(connection, + &ICAPXaction_noteCommClose, this); + comm_close(connection); + connection = -1; + } +} + +// connection with the ICAP service established +void ICAPXaction::noteCommConnected(comm_err_t status) { + if (status != COMM_OK) { + mustStop(notifyHttp); + return; + } + + startReading(); + + makeRequestHeaders(); + // write headers only; comm module will free the requestBuf + comm_old_write_mbuf(connection, requestBuf, + &ICAPXaction_noteCommWroteHeaders, this); +} + +void ICAPXaction::noteCommWroteHeaders(comm_err_t status) { + Must(state.isWriting); + state.isWriting = false; + + if (status != COMM_OK) { + mustStop(notifyHttp); + return; + } + + writeMoreBody(); +} + +void ICAPXaction::writeMoreBody() { + if (state.isWriting || state.doneWriting) + return; + + MsgPipeData::Body *body = virgin->data->body; + if (body->hasContent()) { + state.isWriting = true; + + MemBuf chunk; + memBufPrintf(&chunk, "%x\r\n", body->contentSize()); + chunk.append(body->content(), body->contentSize()); + body->consume(body->contentSize()); + virgin->sendSinkNeed(); + + // will free the chunk + comm_old_write_mbuf(connection, chunk, + &ICAPXaction_noteCommWroteBody, this); + } else { + state.doneWriting = state.doneReceiving; + } +} + +void ICAPXaction::noteCommWroteBody(comm_err_t status, size_t sz) { + state.isWriting = false; + if (status != COMM_OK) { + mustStop(notifyHttp); + return; + } + writeMoreBody(); +} + +// communication timeout with the ICAP service +void ICAPXaction::noteCommTimeout() { + mustStop(notifyHttp); +} + +// unexpected connection close while talking to the ICAP service +void ICAPXaction::noteCommClose() { + mustStop(notifyHttp); +} + +bool ICAPXaction::done() const { + if (notify != notifyUnknown) // mustStop() has been called + return true; + + return state.doneReceiving && state.doneSending && + state.doneReading() && state.doneWriting; +} + +void ICAPXaction::startReading() { + Must(connection >= 0 && !state.isReading && + adapted && adapted->data && adapted->data->body); + + readMore(); +} + +void ICAPXaction::readMore() { + if (state.isReading || state.doneReading()) + return; + + // we use the same buffer for headers and body and then consume headers + if (readBuf.hasSpace()) { + state.isReading = true; + comm_read(connection, readBuf.space(), readBuf.spaceSize(), + &ICAPXaction_noteCommRead, this); + } +} + +// comm module read a portion of the ICAP response for us +void ICAPXaction::noteCommRead(comm_err_t status, size_t sz) { + Must(state.isReading); + Must(!state.doneParsing()); + + state.isReading = false; + + if (status != COMM_OK) { + mustStop(notifyHttp); + return; + } + + if (sz > 0) { + readBuf.appended(sz); + parseMore(); + } + + if (!state.doneReading()) + readMore(); +} + +void ICAPXaction::parseMore() { + if (state.parsing == State::psHeaders) + parseHeaders(); + + if (state.parsing == State::psBody) + parseBody(); +} + +void ICAPXaction::parseHeaders() { + Must(state.parsing == State::psHeaders); + Must(false); // implement me +} + +void ICAPXaction::parseBody() { + Must(state.parsing == State::psBody); + Must(false); // implement me +} + +// HTTP side added virgin body data +void ICAPXaction::noteSourceProgress(MsgPipe *p) { + Must(!state.doneReceiving); + writeMoreBody(); +} + +// HTTP side sent us all virgin info +void ICAPXaction::noteSourceFinish(MsgPipe *p) { + Must(!state.doneReceiving); + state.doneReceiving = true; +} + +// HTTP side is aborting +void ICAPXaction::noteSourceAbort(MsgPipe *p) { + Must(!state.doneReceiving); + state.doneReceiving = true; +} + +// HTTP side wants more adapted data and possibly freed some buffer space +void ICAPXaction::noteSinkNeed(MsgPipe *p) { + Must(!state.doneSending); + parseMore(); +} + +// HTTP side aborted +void ICAPXaction::noteSinkAbort(MsgPipe *p) { + mustStop(notifyService); +} + +void ICAPXaction::pickService() { + // TODO: select the service based on config, ACLs, etc. + static ICAPServiceRep TheService; + service = &TheService; +} + +void ICAPXaction::mustStop(Notify who) { + Must(state.inCall); // otherwise nobody will call doStop() + Must(notify == notifyUnknown); + notify = who; + debugs(93, 5, "ICAPXaction will stop and notify " << notify << "\n"); +} + +// internal cleanup +void ICAPXaction::doStop() { + debugs(93, 5, "ICAPXaction::doStop\n"); + + memBufClean(&readBuf); + memBufClean(&requestBuf); + closeConnection(); + + if (virgin) { + if (notify == notifyHttp) + virgin->sendSinkAbort(); + // this is the place to decrement refcount ptr + virgin = NULL; + } + if (adapted) { + if (notify == notifyHttp) + adapted->sendSourceAbort(); + // this is the place to decrement refcount ptr + adapted = NULL; + } + + if (self != NULL) { + Pointer s = self; + self = NULL; + ICAPNoteXactionDone(s); + /* this object may be destroyed when 's' is cleared */ + } +} + +void ICAPXaction::makeRequestHeaders() { + Must(false); // implement me +} + +void ICAPXaction::callStart(const char *method) { + debugs(93, 5, "ICAPXaction::" << method << " called\n"); + state.inCall = true; +} + +void ICAPXaction::callException(const char *method, const TextException &e, Notify defaultWho) { + debugs(93, 4, "ICAPXaction::" << method << " caught an exception: " << + e.message << "\n"); + if (!done()) + mustStop(defaultWho); +} + +void ICAPXaction::callEnd(const char *method) { + debugs(93, 5, "ICAPXaction::" << method << " ending\n"); + + if (done()) { + doStop(); + return; + } + + debugs(93, 5, "ICAPXaction continues after " << method << "\n"); + + debugs(93, 6, "ICAPXaction::" << method << " ended.\n"); + state.inCall = false; +} --- /dev/null Wed Feb 14 13:33:00 2007 +++ squid3/src/ICAPXaction.h Wed Feb 14 13:34:51 2007 @@ -0,0 +1,142 @@ + +/* + * $Id: ICAPXaction.h,v 1.1.2.1 2005/08/19 20:11:55 rousskov Exp $ + * + * + * SQUID Web Proxy Cache http://www.squid-cache.org/ + * ---------------------------------------------------------- + * + * Squid is the result of efforts by numerous individuals from + * the Internet community; see the CONTRIBUTORS file for full + * details. Many organizations have provided support for Squid's + * development; see the SPONSORS file for full details. Squid is + * Copyrighted (C) 2001 by the Regents of the University of + * California; see the COPYRIGHT file for full details. Squid + * incorporates software developed and/or copyrighted by other + * sinks; see the CREDITS file for full details. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. + * + */ + +#ifndef SQUID_ICAPXACTION_H +#define SQUID_ICAPXACTION_H + +#include "MemBuf.h" +#include "MsgPipeSource.h" +#include "MsgPipeSink.h" + +/* The ICAP Xaction implements message pipe sink and source interfaces. It + * receives virgin HTTP messages, communicates with the ICAP server, and sends + * the adapted messages back. ICAPClient is the "owner" of the ICAPXaction. */ + +class ICAPServiceRep; +class TextException; + +class ICAPXaction: public MsgPipeSource, public MsgPipeSink, public RefCountable { +public: + typedef RefCount Pointer; + CBDATA_CLASS2(ICAPXaction); + +public: + ICAPXaction(); + virtual ~ICAPXaction(); + + // called by ICAPClient + void init(MsgPipe *aVirgin, MsgPipe *anAdapted, Pointer &aSelf); + void ownerAbort(); + + // pipe source methods; called by Anchor while receiving the adapted msg + virtual void noteSinkNeed(MsgPipe *p); + virtual void noteSinkAbort(MsgPipe *p); + + // pipe sink methods; called by ICAP while sending the virgin message + virtual void noteSourceStart(MsgPipe *p); + virtual void noteSourceProgress(MsgPipe *p); + virtual void noteSourceFinish(MsgPipe *p); + virtual void noteSourceAbort(MsgPipe *p); + + // comm handlers + void noteCommConnected(comm_err_t status); + void noteCommWroteHeaders(comm_err_t status); + void noteCommWroteBody(comm_err_t status, size_t sz); + void noteCommRead(comm_err_t status, size_t sz); + void noteCommTimeout(); + void noteCommClose(); + +private: + void openConnection(); + void closeConnection(); + void writeMoreBody(); + void startReading(); + void readMore(); + void sendMoreData(); + + void makeRequestHeaders(); + + void parseMore(); + void parseHeaders(); + void parseBody(); + + void pickService(); + + bool done() const; + + typedef enum { notifyUnknown, notifyNone, notifyService, notifyHttp } Notify; + void mustStop(Notify who); + void doStop(); + + void callStart(const char *method); + void callException(const char *method, const TextException &e, Notify who); + void callEnd(const char *method); + +private: + Pointer self; + MsgPipe *virgin; + MsgPipe *adapted; + + ICAPServiceRep *service; + int connection; // FD of the ICAP server connection + + MemBuf requestBuf; + MemBuf readBuf; + + class State { + public: + State(); + + public: + // XXX: document each + unsigned inCall: 1; + unsigned isWriting: 1; + unsigned isReading: 1; + + unsigned doneReceiving: 1; + unsigned doneSending: 1; + unsigned doneWriting: 1; + + bool doneParsing() const { return parsing == psDone; } + bool doneReading() const { return doneParsing(); } + + enum Parsing { psInit, psHeaders, psBody, psDone } parsing; + } state; + + Notify notify; +}; + +// destroys (or pools) the transaction; implemented in ICAPClient.cc (ick?) +extern void ICAPNoteXactionDone(ICAPXaction::Pointer x); + +#endif /* SQUID_ICAPXACTION_H */