--------------------- PatchSet 4052 Date: 2007/02/14 07:19:43 Author: rousskov Branch: squid3-icap Tag: (none) Log: - WARNING: The changes below are significant and probably introduce new bugs. The code passed some Web Polygraph and manual surfing tests, with and without ICAP. More testing is needed. New known problems are marked with XXXs. - ClientHttpRequest: Support the new ICAPInitiator API and talk to ICAPModXact directly instead of using ICAPClient* classes, which are now gone. See the last ICAPClient* CVS log message for the rationale. - ConnStateData: Use BodyPipe for delivering virgin request bodies to the server or ICAP side. Implement the BodyProducer interface. ClientHttpRequest: Use BodyPipe instead of BodyReader when receiving request bodies (from client side or ICAP). Implement the BodyConsumer interface. See the first BodyPipe CVS log message for the rationale. - Use BodyPipe for maintaining the "closing" state of a connection instead of in.abortedSize. This change "removes" a few memory leaks and an assertion, but does need more work, especially when the regular BodyPipe consumer leaves early and does not consume the request body. - The client stream code sometimes marks the "closing" connection as STREAM_UNPLANNED_COMPLETE, leading to a double-close. I do not yet understand why. There is now code to ignore multiple attempts to enter the "closing" state. - The new BodyPipe approach should make support for HTTP/1.1 chunked requests easier. Only a few places in the pipe-related code assume that the request size is known. - Handle REQMOD transaction failures where we cannot proceed with the normal request flow. Members: src/client_side.cc:1.74.2.16->1.74.2.17 src/client_side.h:1.8.4.4->1.8.4.5 src/client_side_request.cc:1.34.4.23->1.34.4.24 src/client_side_request.h:1.17.12.8->1.17.12.9 Index: squid3/src/client_side.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/client_side.cc,v retrieving revision 1.74.2.16 retrieving revision 1.74.2.17 diff -u -r1.74.2.16 -r1.74.2.17 --- squid3/src/client_side.cc 25 Oct 2006 14:44:04 -0000 1.74.2.16 +++ squid3/src/client_side.cc 14 Feb 2007 07:19:43 -0000 1.74.2.17 @@ -1,6 +1,6 @@ /* - * $Id: client_side.cc,v 1.74.2.16 2006/10/25 14:44:04 rousskov Exp $ + * $Id: client_side.cc,v 1.74.2.17 2007/02/14 07:19:43 rousskov Exp $ * * DEBUG: section 33 Client-side Routines * AUTHOR: Duane Wessels @@ -134,8 +134,6 @@ #if USE_IDENT static IDCB clientIdentDone; #endif -static BodyReadFunc clientReadBody; -static BodyAbortFunc clientAbortBody; static CSCB clientSocketRecipient; static CSD clientSocketDetach; static void clientSetKeepaliveFlag(ClientHttpRequest *); @@ -494,6 +492,8 @@ al.http.content_type = loggingEntry()->mem_obj->getReply()->content_type.buf(); } + debug(33, 9) ("clientLogRequest: http.code='%d'\n", al.http.code); + if (loggingEntry() && loggingEntry()->mem_obj) al.cache.objectSize = contentLen(loggingEntry()); @@ -577,23 +577,6 @@ return true; } -BodyReader * -ConnStateData::body_reader() -{ - return body_reader_.getRaw(); -} - -void -ConnStateData::body_reader(BodyReader::Pointer reader) -{ - body_reader_ = reader; - - if (reader == NULL) - fd_note(fd, "Waiting for next request"); - else - fd_note(fd, "Reading request body"); -} - void ConnStateData::freeAllContexts() { @@ -653,7 +636,10 @@ cbdataReferenceDone(port); - body_reader(NULL); // refcounted + if (bodyPipe != NULL) { + bodyPipe->clearProducer(false); + bodyPipe = NULL; // refcounted + } } /* @@ -1560,14 +1546,24 @@ } void -ClientSocketContext::initiateClose() +ClientSocketContext::initiateClose(const char *reason) { + debugs(33, 5, HERE << "initiateClose: closing for " << reason); if (http != NULL) { ConnStateData::Pointer conn = http->getConn(); if (conn != NULL) { - if (conn->bodySizeLeft() > 0) { - debug(33, 5) ("ClientSocketContext::initiateClose: closing, but first we need to read the rest of the request\n"); + if (const ssize_t expecting = conn->bodySizeLeft()) { + debugs(33, 5, HERE << "ClientSocketContext::initiateClose: " << + "closing, but first " << conn << " needs to read " << + expecting << " request body bytes with " << + conn->in.notYetUsed << " notYetUsed"); + + if (conn->closing()) { + debugs(33, 2, HERE << "avoiding double-closing " << conn); + return; + } + /* * XXX We assume the reply fits in the TCP transmit * window. If not the connection may stall while sending @@ -1576,20 +1572,7 @@ * As of yet we have not received any complaints indicating * this may be an issue. */ - conn->closing(true); - /* any unread body becomes abortedSize at this point. */ - conn->in.abortedSize = conn->bodySizeLeft(); - /* - * Trigger the BodyReader abort handler, if necessary, - * by destroying it. It is a refcounted pointer, so - * set it to NULL and let the destructor be called when - * all references are gone. - * - * This seems to be flawed: theres no way this can trigger - * if conn->body_reader is not NULL. Perhaps it works for - * ICAP but not real requests ? - */ - http->request->body_reader = NULL; // refcounted + conn->startClosing(reason); return; } } @@ -1610,9 +1593,12 @@ clientUpdateSocketStats(http->logType, size); assert (this->fd() == fd); + /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */ + if (errflag == COMM_ERR_CLOSING) + return; + if (errflag || clientHttpRequestStatus(fd, http)) { - debug (33,5)("clientWriteComplete: FD %d, closing connection due to failure, or true requeststatus\n", fd); - initiateClose(); + initiateClose("failure or true request status"); /* Do we leak here ? */ return; } @@ -1632,7 +1618,7 @@ /* fallthrough */ case STREAM_FAILED: - initiateClose(); + initiateClose("STREAM_UNPLANNED_COMPLETE|STREAM_FAILED"); return; default: @@ -1857,7 +1843,7 @@ r = HttpParserParseReqLine(hp); if (r == 0) { debug(33, 5) ("Incomplete request, waiting for end of request line\n"); - return NULL; + return NULL; } if (r == -1) { return parseHttpRequestAbort(conn, "error:invalid-request"); @@ -1894,9 +1880,9 @@ /* Set method_p */ *method_p = HttpRequestMethod(&hp->buf[hp->m_start], &hp->buf[hp->m_end]); if (*method_p == METHOD_NONE) { - /* XXX need a way to say "this many character length string" */ + /* XXX need a way to say "this many character length string" */ debug(33, 1) ("clientParseRequestMethod: Unsupported method in request '%s'\n", hp->buf); - /* XXX where's the method set for this error? */ + /* XXX where's the method set for this error? */ return parseHttpRequestAbort(conn, "error:unsupported-request-method"); } @@ -1921,7 +1907,7 @@ if (strstr(req_hdr, "\r\r\n")) { debug(33, 1) ("WARNING: suspicious HTTP request contains double CR\n"); - xfree(url); + xfree(url); return parseHttpRequestAbort(conn, "error:double-CR"); } @@ -2138,6 +2124,8 @@ { ClientHttpRequest *http = context->http; HttpRequest *request = NULL; + bool notedUseOfBuffer = false; + /* We have an initial client stream in place should it be needed */ /* setup our private context */ context->registerWithConn(); @@ -2152,7 +2140,7 @@ assert(context->http->out.offset == 0); context->pullData(); conn->flags.readMoreRequests = false; - goto finish; + goto finish; } if ((request = HttpRequest::CreateFromUrlAndMethod(http->uri, method)) == NULL) { @@ -2166,7 +2154,7 @@ assert(context->http->out.offset == 0); context->pullData(); conn->flags.readMoreRequests = false; - goto finish; + goto finish; } /* compile headers */ @@ -2183,7 +2171,7 @@ assert(context->http->out.offset == 0); context->pullData(); conn->flags.readMoreRequests = false; - goto finish; + goto finish; } request->flags.accelerated = http->flags.accel; @@ -2226,7 +2214,7 @@ assert(context->http->out.offset == 0); context->pullData(); conn->flags.readMoreRequests = false; - goto finish; + goto finish; } @@ -2240,29 +2228,22 @@ assert(context->http->out.offset == 0); context->pullData(); conn->flags.readMoreRequests = false; - goto finish; + goto finish; } http->request = HTTPMSGLOCK(request); clientSetKeepaliveFlag(http); - /* Do we expect a request-body? */ + /* Do we expect a request-body? */ if (request->content_length > 0) { - request->body_reader = new BodyReader(request->content_length, - clientReadBody, - clientAbortBody, - NULL, - conn.getRaw()); - conn->body_reader(request->body_reader); - /* - * NOTE: We haven't called connNoteUseOfBuffer() yet. It gets - * done at finish: below. So here we have to subtract off - * req_sz from notYetUsed, or else the BodyReader thinks it - * has more data than it really does, and will get confused. - */ - request->body_reader->notify(conn->in.notYetUsed - http->req_sz); + request->body_pipe = conn->expectRequestBody(request->content_length); - if (request->body_reader->remaining()) + // consume header early so that body pipe gets just the body + connNoteUseOfBuffer(conn.getRaw(), http->req_sz); + notedUseOfBuffer = true; + + conn->handleRequestBodyData(); + if (!request->body_pipe->exhausted()) conn->readSomeData(); /* Is it too large? */ @@ -2278,7 +2259,7 @@ assert(context->http->out.offset == 0); context->pullData(); conn->flags.readMoreRequests = false; - goto finish; + goto finish; } context->mayUseConnection(true); @@ -2293,8 +2274,8 @@ http->doCallouts(); finish: - /* Consume request buffer */ - connNoteUseOfBuffer(conn.getRaw(), http->req_sz); + if (!notedUseOfBuffer) + connNoteUseOfBuffer(conn.getRaw(), http->req_sz); } static void @@ -2330,8 +2311,9 @@ ssize_t ConnStateData::bodySizeLeft() { - if (body_reader_ != NULL) - return body_reader_->remaining(); + // XXX: this logic will not work for chunked requests with unknown sizes + if (bodyPipe != NULL) + return bodyPipe->unproducedSize(); return 0; } @@ -2357,9 +2339,9 @@ while (conn->in.notYetUsed > 0 && conn->bodySizeLeft() == 0) { connStripBufferWhitespace (conn); - /* Don't try to parse if the buffer is empty */ - if (conn->in.notYetUsed == 0) - break; + /* Don't try to parse if the buffer is empty */ + if (conn->in.notYetUsed == 0) + break; /* Limit the number of concurrent requests to 2 */ @@ -2371,13 +2353,13 @@ /* Terminate the string */ conn->in.buf[conn->in.notYetUsed] = '\0'; - /* Begin the parsing */ - HttpParserInit(&hp, conn->in.buf, conn->in.notYetUsed); + /* Begin the parsing */ + HttpParserInit(&hp, conn->in.buf, conn->in.notYetUsed); /* Process request */ - PROF_start(parseHttpRequest); + PROF_start(parseHttpRequest); context = parseHttpRequest(conn, &hp, &method, &http_ver); - PROF_stop(parseHttpRequest); + PROF_stop(parseHttpRequest); /* partial or incomplete request */ if (!context) { @@ -2450,45 +2432,8 @@ if (size > 0) { kb_incr(&statCounter.client_http.kbytes_in, size); - char *current_buf = conn->in.addressToReadInto(); - - if (buf != current_buf) - xmemmove(current_buf, buf, size); - - conn->in.notYetUsed += size; - - conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string */ - - /* if there is available non-aborted data, give it to the - * BodyReader - */ - if (conn->body_reader() != NULL) - conn->body_reader()->notify(conn->in.notYetUsed); + conn->handleReadData(buf, size); - /* there is some aborted body to remove - * could we? should we? use BodyReader to eliminate this via an - * abort() api. - * - * This is not the most optimal path: ideally we would: - * - optimise the memmove above to not move data we're discarding - * - discard notYetUsed earlier - */ - if (conn->in.abortedSize) { - size_t discardSize = XMIN(conn->in.abortedSize, conn->in.notYetUsed); - /* these figures must match */ - assert(conn->in.abortedSize == (size_t)conn->bodySizeLeft()); - conn->body_reader()->reduce_remaining(discardSize); - connNoteUseOfBuffer(conn.getRaw(), discardSize); - conn->in.abortedSize -= discardSize; - - if (!conn->in.abortedSize) - /* we've finished reading like good clients, - * now do the close that initiateClose initiated. - * - * XXX: do we have to close? why not check keepalive et. - */ - comm_close(fd); - } } else if (size == 0) { debug(33, 5) ("clientReadRequest: FD %d closed?\n", fd); @@ -2539,67 +2484,65 @@ } } -/* - * clientReadBody - * - * A request to receive some HTTP request body data. This is a - * 'read_func' of BodyReader class. Feels to me like this function - * belongs to ConnStateData class. - * - * clientReadBody is of type 'BodyReadFunc' - */ -size_t -clientReadBody(void *data, MemBuf &mb, size_t size) +// called when new request data has been read from the socket +void +ConnStateData::handleReadData(char *buf, size_t size) { - ConnStateData *conn = (ConnStateData *) data; - assert(conn); - debugs(33,3,HERE << "clientReadBody requested size " << size); - debugs(33,3,HERE << "clientReadBody FD " << conn->fd); - debugs(33,3,HERE << "clientReadBody in.notYetUsed " << conn->in.notYetUsed); + char *current_buf = in.addressToReadInto(); - if (size > conn->in.notYetUsed) - size = conn->in.notYetUsed; // may make size zero + if (buf != current_buf) + xmemmove(current_buf, buf, size); - debugs(33,3,HERE << "clientReadBody actual size " << size); + in.notYetUsed += size; + in.buf[in.notYetUsed] = '\0'; /* Terminate the string */ - if (size > 0) { - mb.append(conn->in.buf, size); - connNoteUseOfBuffer(conn, size); - } - - return size; + // if we are reading a body, stuff data into the body pipe + if (bodyPipe != NULL) + handleRequestBodyData(); } -/* - * clientAbortBody - * - * A dummy callback that consumes the remains of a request - * body for an aborted transaction. - * - * clientAbortBody is of type 'BodyAbortFunc' - */ -static void -clientAbortBody(void *data, size_t remaining) +// called when new request body data has been buffered in in.buf +// may close the connection if we were closing and piped everything out +void +ConnStateData::handleRequestBodyData() { - ConnStateData *conn = (ConnStateData *) data; - debugs(33,3,HERE << "clientAbortBody FD " << conn->fd); - debugs(33,3,HERE << "clientAbortBody in.notYetUsed " << conn->in.notYetUsed); - debugs(33,3,HERE << "clientAbortBody remaining " << remaining); - conn->in.abortedSize += remaining; - - if (conn->in.notYetUsed) { - size_t to_discard = XMIN(conn->in.notYetUsed, conn->in.abortedSize); - debugs(33,3,HERE << "to_discard " << to_discard); - conn->in.abortedSize -= to_discard; - connNoteUseOfBuffer(conn, to_discard); + assert(bodyPipe != NULL); + + if (const size_t putSize = bodyPipe->putMoreData(in.buf, in.notYetUsed)) + connNoteUseOfBuffer(this, putSize); + + if (!bodyPipe->mayNeedMoreData()) { + // BodyPipe will clear us automagically when we produced everything + bodyPipe = NULL; + + debugs(33,5, HERE << "produced entire request body for FD " << fd); + + if (closing()) { + /* we've finished reading like good clients, + * now do the close that initiateClose initiated. + * + * XXX: do we have to close? why not check keepalive et. + * + * XXX: To support chunked requests safely, we need to handle + * the case of an endless request. This if-statement does not, + * because mayNeedMoreData is true if request size is not known. + */ + comm_close(fd); + } } +} - /* - * This assertion exists to make sure that there is never a - * case where this function should be responsible for closing - * the file descriptor. - */ - assert(!conn->isOpen()); +void +ConnStateData::noteMoreBodySpaceAvailable(BodyPipe &) +{ + handleRequestBodyData(); +} + +void +ConnStateData::noteBodyConsumerAborted(BodyPipe &) +{ + if (!closing()) + startClosing("body consumer aborted"); } /* general lifetime handler for HTTP requests */ @@ -3234,17 +3177,40 @@ reading_ = newBool; } + +BodyPipe::Pointer +ConnStateData::expectRequestBody(size_t size) +{ + bodyPipe = new BodyPipe(this); + bodyPipe->setBodySize(size); + return bodyPipe; +} + bool ConnStateData::closing() const { return closing_; } +// Called by ClientSocketContext to give the connection a chance to read +// the entire body before closing the socket. void -ConnStateData::closing(bool const newBool) +ConnStateData::startClosing(const char *reason) { - assert (closing() != newBool); - closing_ = newBool; + debugs(33, 5, HERE << "startClosing " << this << " for " << reason); + assert(!closing()); + closing_ = true; + + assert(bodyPipe != NULL); + assert(bodySizeLeft() > 0); + + // We do not have to abort the body pipeline because we are going to + // read the entire body anyway. + // Perhaps an ICAP server wants to log the complete request. + + // XXX: but it could be consumer abort that caused this closing, + // which means we may get stuck as nobody is consuming our data! + // BodyPipe should probably implement a consumer-less sink mode. } char * Index: squid3/src/client_side.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/client_side.h,v retrieving revision 1.8.4.4 retrieving revision 1.8.4.5 diff -u -r1.8.4.4 -r1.8.4.5 --- squid3/src/client_side.h 30 Oct 2006 18:35:58 -0000 1.8.4.4 +++ squid3/src/client_side.h 14 Feb 2007 07:19:43 -0000 1.8.4.5 @@ -1,6 +1,6 @@ /* - * $Id: client_side.h,v 1.8.4.4 2006/10/30 18:35:58 rousskov Exp $ + * $Id: client_side.h,v 1.8.4.5 2007/02/14 07:19:43 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -36,7 +36,7 @@ #include "comm.h" #include "StoreIOBuffer.h" -#include "BodyReader.h" +#include "BodyPipe.h" #include "RefCount.h" class ConnStateData; @@ -119,14 +119,14 @@ void packRange(StoreIOBuffer const &, MemBuf * mb); void deRegisterWithConn(); void doClose(); - void initiateClose(); + void initiateClose(const char *reason); bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */ bool connRegistered_; }; /* A connection to a socket */ -class ConnStateData : public RefCountable +class ConnStateData : public BodyProducer, public RefCountable { public: @@ -157,15 +157,6 @@ char *buf; size_t notYetUsed; size_t allocatedSize; - /* - * abortedSize is the amount of data that should be read - * from the socket and immediately discarded. It may be - * set when there is a request body and that transaction - * gets aborted. The client side should read the remaining - * body content and just discard it, if the connection - * will be staying open. - */ - size_t abortedSize; } in; ssize_t bodySizeLeft(); @@ -206,16 +197,16 @@ void transparent(bool const); bool reading() const; void reading(bool const); + bool closing() const; - void closing(bool const); + void startClosing(const char *reason); - /* get the body reader that has been attached to the client - * request - */ - BodyReader * body_reader(); - /* set a body reader that should read data from the request - */ - void body_reader(BodyReader::Pointer); + BodyPipe::Pointer expectRequestBody(size_t size); + virtual void noteMoreBodySpaceAvailable(BodyPipe &); + virtual void noteBodyConsumerAborted(BodyPipe &); + + void handleReadData(char *buf, size_t size); + void handleRequestBodyData(); private: CBDATA_CLASS2(ConnStateData); @@ -223,7 +214,7 @@ bool reading_; bool closing_; Pointer openReference; - BodyReader::Pointer body_reader_; + BodyPipe::Pointer bodyPipe; // set when we are reading request body }; /* convenience class while splitting up body handling */ Index: squid3/src/client_side_request.cc =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/client_side_request.cc,v retrieving revision 1.34.4.23 retrieving revision 1.34.4.24 diff -u -r1.34.4.23 -r1.34.4.24 --- squid3/src/client_side_request.cc 29 Jan 2007 20:00:26 -0000 1.34.4.23 +++ squid3/src/client_side_request.cc 14 Feb 2007 07:19:43 -0000 1.34.4.24 @@ -1,6 +1,6 @@ /* - * $Id: client_side_request.cc,v 1.34.4.23 2007/01/29 20:00:26 rousskov Exp $ + * $Id: client_side_request.cc,v 1.34.4.24 2007/02/14 07:19:43 rousskov Exp $ * * DEBUG: section 85 Client-side Request Routines * AUTHOR: Robert Collins (Originally Duane Wessels in client_side.c) @@ -59,7 +59,7 @@ #include "wordlist.h" #if ICAP_CLIENT -#include "ICAP/ICAPClientReqmodPrecache.h" +#include "ICAP/ICAPModXact.h" #include "ICAP/ICAPElements.h" #include "ICAP/ICAPConfig.h" static void icapAclCheckDoneWrapper(ICAPServiceRep::Pointer service, void *data); @@ -120,7 +120,7 @@ } } - debugs(85,3, HERE << this << " ClientHttpRequest destructed"); + debugs(85,3, HERE << this << " ClientRequestContext destructed"); } ClientRequestContext::ClientRequestContext(ClientHttpRequest *anHttp) : http(cbdataReference(anHttp)), acl_checklist (NULL), redirect_state (REDIRECT_NONE) @@ -156,8 +156,6 @@ setConn(aConn); dlinkAdd(this, &active, &ClientActiveRequests); #if ICAP_CLIENT - icap = NULL; - icapDone = false; request_satisfaction_mode = false; #endif } @@ -239,14 +237,10 @@ { debug(33, 3) ("httpRequestFree: %s\n", uri); PROF_start(httpRequestFree); - /* if body_connection !NULL, then ProcessBody has not - * found the end of the body yet - */ - if (request && request->body_reader != NULL) { - debugs(32, 3, HERE << "setting body_reader " << request->body_reader.getRaw() << " to NULL for request " << request); - request->body_reader = NULL; // refcounted, triggers abort if needed. - } + // Even though freeResources() below may destroy the request, + // we no longer set request->body_pipe to NULL here + // because we did not initiate that pipe (ConnStateData did) /* the ICP check here was erroneous * - storeReleaseRequest was always called if entry was valid @@ -263,11 +257,12 @@ freeResources(); #if ICAP_CLIENT - if (icap) { - if (!icapDone) - icap->ownerAbort(); - delete icap; + if (icapHeadSource != NULL) { + icapHeadSource->noteInitiatorAborted(); + icapHeadSource = NULL; } + if (icapBodySource != NULL) + stopConsumingFrom(icapBodySource); #endif if (calloutContext) @@ -541,7 +536,7 @@ fatal("Fix this case in ClientRequestContext::icapAclCheckDone()"); // And when fixed, check whether the service is down in doIcap and - // if it is, abort early, without creating ICAPClientReqmodPrecache. + // if it is, abort early, without creating ICAPModXact. // See Server::startIcap() and its use. http->doCallouts(); @@ -856,10 +851,10 @@ ; } - if (old_request->body_reader != NULL) { - new_request->body_reader = old_request->body_reader; - old_request->body_reader = NULL; - debugs(0,0,HERE << "setting body_reader = NULL for request " << old_request); + if (old_request->body_pipe != NULL) { + new_request->body_pipe = old_request->body_pipe; + old_request->body_pipe = NULL; + debugs(0,0,HERE << "redirecting body_pipe " << new_request->body_pipe << " from request " << old_request << " to " << new_request); } new_request->content_length = old_request->content_length; @@ -872,7 +867,7 @@ /* FIXME PIPELINE: This is innacurate during pipelining */ - if (http->getConn().getRaw() != NULL) + if (http->getConn() != NULL) fd_note(http->getConn()->fd, http->uri); assert(http->uri); @@ -1101,132 +1096,24 @@ ClientHttpRequest::doIcap(ICAPServiceRep::Pointer service) { debugs(85, 3, HERE << this << " ClientHttpRequest::doIcap() called"); - assert(NULL == icap); - icap = new ICAPClientReqmodPrecache(service); - icap->startReqMod(this, request); - - if (request->body_reader == NULL) { - debugs(32, 3, HERE << "client request hasnt body..."); - icap->doneSending(); - - } - + assert(!icapHeadSource); + assert(!icapBodySource); + icapHeadSource = new ICAPModXact(this, request, NULL, service); + ICAPModXact::AsyncStart(icapHeadSource.getRaw()); return 0; } -/* - * icapSendRequestBodyWrapper - * - * A callback wrapper for ::icapSendRequestBody() - * - * icapSendRequestBodyWrapper is of type CBCB - */ -void -ClientHttpRequest::icapSendRequestBodyWrapper(MemBuf &mb, void *data) -{ - ClientHttpRequest *chr = static_cast(data); - chr->icapSendRequestBody(mb); -} - - -/* - * icapSendRequestBody - * - * Sends some chunk of a request body to the ICAP side. Must make sure - * that the ICAP-side can accept the data we have. If there is more - * body data to read, then schedule another BodyReader callback. - */ void -ClientHttpRequest::icapSendRequestBody(MemBuf &mb) +ClientHttpRequest::noteIcapHeadersAdapted() { - ssize_t size_to_send = mb.contentSize(); - debugs(32,3,HERE << "have " << mb.contentSize() << " bytes in mb"); - - if (size_to_send == 0) { - /* - * An error occurred during this transaction. Tell ICAP that we're done. - */ - - if (icap) - icap->doneSending(); - - return; - } - - debugs(32,3,HERE << "icap->potentialSpaceSize() = " << icap->potentialSpaceSize()); - - if (size_to_send > icap->potentialSpaceSize()) - size_to_send = icap->potentialSpaceSize(); - - if (size_to_send) { - debugs(32,3,HERE << "sending " << size_to_send << " body bytes to ICAP"); - StoreIOBuffer sbuf(size_to_send, 0, mb.content()); - icap->sendMoreData(sbuf); - icap->body_reader->consume(size_to_send); - icap->body_reader->bytes_read += size_to_send; - debugs(32,3," HTTP client body bytes_read=" << icap->body_reader->bytes_read); - } else { - debugs(32,2,HERE << "cannot send body data to ICAP"); - debugs(32,2,HERE << "\tBodyReader MemBuf has " << mb.contentSize()); - debugs(32,2,HERE << "\tbut icap->potentialSpaceSize() is " << icap->potentialSpaceSize()); - return; - } - - /* - * If we sent some data this time, and there is more data to - * read, then schedule another read request via BodyReader. - */ - if (size_to_send && icap->body_reader->remaining()) { - debugs(32,3,HERE << "calling body_reader->read()"); - icap->body_reader->read(icapSendRequestBodyWrapper, this); - } else { - debugs(32,3,HERE << "No more request body bytes to send"); - icap->doneSending(); - } -} - -/* - * Called by ICAPAnchor when it has space available for us. - */ -void -ClientHttpRequest::icapSpaceAvailable() -{ - debugs(85,3,HERE << this << " ClientHttpRequest::icapSpaceAvailable() called\n"); - - if (request->body_reader != NULL && icap->body_reader == NULL) { - debugs(32,3,HERE << "reassigning HttpRequest->body_reader to ICAP"); - /* - * ICAP hooks on to the BodyReader that gets data from - * ConnStateData. We'll make a new BodyReader that - * HttpStateData can use if the adapted response has a - * request body. See ICAPClientReqmodPrecache::noteSourceStart() - */ - icap->body_reader = request->body_reader; - request->body_reader = NULL; - } - - if (icap->body_reader == NULL) - return; - - if (icap->body_reader->callbackPending()) - return; - - debugs(32,3,HERE << "Calling read() for body data"); - - icap->body_reader->read(icapSendRequestBodyWrapper, this); -} - -void -ClientHttpRequest::takeAdaptedHeaders(HttpMsg *msg) -{ - debug(85,3)("ClientHttpRequest::takeAdaptedHeaders() called\n"); assert(cbdataReferenceValid(this)); // indicates bug + HttpMsg *msg = icapHeadSource->adapted.header; + assert(msg); + if (HttpRequest *new_req = dynamic_cast(msg)) { /* - * Replace the old request with the new request. First, - * Move the "body_connection" over, then unlink old and - * link new to the http state. + * Replace the old request with the new request. */ HTTPMSGUNLOCK(request); request = HTTPMSGLOCK(new_req); @@ -1240,6 +1127,12 @@ } else if (HttpReply *new_rep = dynamic_cast(msg)) { debugs(85,3,HERE << "REQMOD reply is HTTP reply"); + // subscribe to receive reply body + if (new_rep->body_pipe != NULL) { + icapBodySource = new_rep->body_pipe; + icapBodySource->setConsumer(this); + } + clientStreamNode *node = (clientStreamNode *)client_stream.tail->prev->data; clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); repContext->createStoreEntry(request->method, request->flags); @@ -1251,79 +1144,108 @@ clientGetMoreData(node, this); } + // we are done with getting headers (but may be receiving body) + icapHeadSource = NULL; + if (!request_satisfaction_mode) doCallouts(); +} - debug(85,3)("ClientHttpRequest::takeAdaptedHeaders() finished\n"); +void +ClientHttpRequest::noteIcapHeadersAborted() +{ + icapHeadSource = NULL; + assert(!icapBodySource); + handleIcapFailure(); } void -ClientHttpRequest::takeAdaptedBody(MemBuf *buf) +ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe &) { - debug(85,3)("ClientHttpRequest::takeAdaptedBody() called\n"); + assert(request_satisfaction_mode); + assert(icapBodySource != NULL); - if (request_satisfaction_mode) { - storeEntry()->write(StoreIOBuffer(buf, request_satisfaction_offset)); - request_satisfaction_offset += buf->contentSize(); - buf->consume(buf->contentSize()); // consume everything written - } else { - debug(85,0)("Unexpected call to takeAdaptedBody when " - "not in request_satisfaction_mode"); + if (const size_t contentSize = icapBodySource->buf().contentSize()) { + BodyPipeCheckout bpc(*icapBodySource); + const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset); + storeEntry()->write(ioBuf); + // assume can write everything + request_satisfaction_offset += contentSize; + bpc.buf.consume(contentSize); + bpc.checkIn(); } + + if (icapBodySource->exhausted()) + endRequestSatisfaction(); + // else wait for more body data } void -ClientHttpRequest::doneAdapting() +ClientHttpRequest::noteBodyProductionEnded(BodyPipe &) { - debug(85,3)("ClientHttpRequest::doneAdapting() called\n"); - // Do not delete icap here because the http (server) side - // may still need body_reader and body_reader may call - // ICAPClientReqmodPrecache for more body. - // Hopefully, something will trigger our destruction and we - // we will delete icap then. - icapDone = true; + assert(!icapHeadSource); + if (icapBodySource != NULL) { // did not end request satisfaction yet + // We do not expect more because noteMoreBodyDataAvailable always + // consumes everything. We do not even have a mechanism to consume + // leftovers after noteMoreBodyDataAvailable notifications seize. + assert(icapBodySource->exhausted()); + endRequestSatisfaction(); + } } void -ClientHttpRequest::abortAdapting() -{ - debug(85,3)("ClientHttpRequest::abortAdapting() called\n"); - delete icap; // we do not mimic doneAdapting here because icap is aborting - icap = NULL; - - if ((NULL == storeEntry()) || storeEntry()->isEmpty()) { - debug(85,3)("WARNING: ICAP REQMOD callout failed, proceeding with original request\n"); +ClientHttpRequest::endRequestSatisfaction() { + assert(request_satisfaction_mode); + stopConsumingFrom(icapBodySource); - if (calloutContext) - doCallouts(); - -#if ICAP_HARD_ERROR - - clientStreamNode *node = (clientStreamNode *)client_stream.tail->prev->data; - - clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); - - assert (repContext); + // XXX: how to end store entry formation correctly? + debugs(85,2, HERE << this << " does not seem to end request satisfaction"); +} - // Note if this code is ever used, clientBuildError() should be modified to - // accept an errno arg - repContext->setReplyToError(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, - request->method, NULL, - getConn().getRaw() != NULL ? &getConn()->peer.sin_addr : &no_addr, request, - NULL, getConn().getRaw() != NULL - && getConn()->auth_user_request ? getConn()-> - auth_user_request : request->auth_user_request, errno); +void +ClientHttpRequest::noteBodyProducerAborted(BodyPipe &) +{ + assert(!icapHeadSource); + stopConsumingFrom(icapBodySource); + handleIcapFailure(); +} - node = (clientStreamNode *)client_stream.tail->data; +void +ClientHttpRequest::handleIcapFailure() +{ + debugs(85,3, HERE << "handleIcapFailure"); - clientStreamRead(node, this, node->readBuffer); + const bool usedStore = !storeEntry() || storeEntry()->isEmpty(); + const bool usedPipe = !request->body_pipe || + !request->body_pipe->consumedSize(); -#endif + // XXX: we must not try to recover if the ICAP service is not bypassable! + if (!usedStore && !usedPipe) { + debug(85,2)("WARNING: ICAP REQMOD callout failed, proceeding with original request\n"); + if (calloutContext) + doCallouts(); return; } - debug(0,0)("write me at %s:%d\n", __FILE__,__LINE__); + debugs(85,3, HERE << "ICAP REQMOD callout failed, responding with error"); + + clientStreamNode *node = (clientStreamNode *)client_stream.tail->prev->data; + clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); + assert(repContext); + + // The original author of the code also wanted to pass an errno to + // setReplyToError, but it seems unlikely that the errno reflects the + // true cause of the error at this point, so I did not pass it. + ConnStateData::Pointer c = getConn(); + repContext->setReplyToError(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, + request->method, NULL, + (c != NULL ? &c->peer.sin_addr : &no_addr), request, NULL, + (c != NULL && c->auth_user_request ? + c->auth_user_request : request->auth_user_request)); + + node = (clientStreamNode *)client_stream.tail->data; + clientStreamRead(node, this, node->readBuffer); } #endif Index: squid3/src/client_side_request.h =================================================================== RCS file: /cvsroot/squid-sf//squid3/src/client_side_request.h,v retrieving revision 1.17.12.8 retrieving revision 1.17.12.9 diff -u -r1.17.12.8 -r1.17.12.9 --- squid3/src/client_side_request.h 29 Jan 2007 17:34:22 -0000 1.17.12.8 +++ squid3/src/client_side_request.h 14 Feb 2007 07:19:43 -0000 1.17.12.9 @@ -1,6 +1,6 @@ /* - * $Id: client_side_request.h,v 1.17.12.8 2007/01/29 17:34:22 rousskov Exp $ + * $Id: client_side_request.h,v 1.17.12.9 2007/02/14 07:19:43 rousskov Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -44,8 +44,8 @@ #if ICAP_CLIENT #include "ICAP/ICAPServiceRep.h" - -class ICAPClientReqmodPrecache; +#include "ICAP/ICAPInitiator.h" +#include "ICAP/ICAPModXact.h" class HttpMsg; #endif @@ -60,6 +60,10 @@ class ClientRequestContext; class ClientHttpRequest +#if ICAP_CLIENT + : public ICAPInitiator, // to start ICAP transactions + public BodyConsumer // to receive reply bodies in request satisf. mode +#endif { public: @@ -158,18 +162,27 @@ #if ICAP_CLIENT public: - ICAPClientReqmodPrecache *icap; int doIcap(ICAPServiceRep::Pointer); - void icapSendRequestBody(MemBuf&); - static void icapSendRequestBodyWrapper(MemBuf&, void*); - void icapSpaceAvailable(); - void takeAdaptedHeaders(HttpMsg *); - void takeAdaptedBody(MemBuf *); - void doneAdapting(); - void abortAdapting(); + +private: + // ICAPInitiator API, called by ICAPXaction + virtual void noteIcapHeadersAdapted(); + virtual void noteIcapHeadersAborted(); + + // BodyConsumer API, called by BodyPipe + virtual void noteMoreBodyDataAvailable(BodyPipe &); + virtual void noteBodyProductionEnded(BodyPipe &); + virtual void noteBodyProducerAborted(BodyPipe &); + + void endRequestSatisfaction(); + void handleIcapFailure(); + +private: + ICAPModXact::Pointer icapHeadSource; + BodyPipe::Pointer icapBodySource; + bool request_satisfaction_mode; off_t request_satisfaction_offset; - bool icapDone; // ICAPClientVector has told us that it is finished. #endif };