--------------------- PatchSet 4668 Date: 2002/08/14 11:16:05 Author: rbcollins Branch: esi Tag: (none) Log: snapshotting clientStream changes - not fully functional Members: src/ESI.c:1.1.2.3->1.1.2.4 src/clientStream.c:1.1.2.2->1.1.2.3 src/client_side.c:1.65.2.3->1.65.2.4 src/client_side_reply.c:1.1.2.2->1.1.2.3 src/client_side_request.c:1.1.2.3->1.1.2.4 src/protos.h:1.59.2.3->1.59.2.4 src/store_client.c:1.13->1.13.4.1 src/structs.h:1.61.2.1->1.61.2.2 src/tools.c:1.24->1.24.4.1 src/typedefs.h:1.27.2.1->1.27.2.2 Index: squid/src/ESI.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/ESI.c,v retrieving revision 1.1.2.3 retrieving revision 1.1.2.4 diff -u -r1.1.2.3 -r1.1.2.4 --- squid/src/ESI.c 11 Aug 2002 09:51:14 -0000 1.1.2.3 +++ squid/src/ESI.c 14 Aug 2002 11:16:05 -0000 1.1.2.4 @@ -1,6 +1,6 @@ /* - * $Id: ESI.c,v 1.1.2.3 2002/08/11 09:51:14 rbcollins Exp $ + * $Id: ESI.c,v 1.1.2.4 2002/08/14 11:16:05 rbcollins Exp $ * * DEBUG: section 86 ESI processing * AUTHOR: Robert Collins @@ -35,23 +35,158 @@ #include "squid.h" +/* quick reference on behaviour here. + * The ESI specification 1.0 requires the ESI processor to be able to + * return an error code at any point in the processing. To that end + * we buffer the incoming esi body until we know we will be able to + * satisfy the request. At that point we start streaming the queued + * data downstream. + * + */ + +/* TODO: Factor the store memory segment management into a reusable code block */ +typedef struct _esiSegment { + char buf[HTTP_REQBUF_SZ]; + size_t len; /* how much data has been pushed into this */ + struct _esiSegment *next; +} esiSegment; +CBDATA_TYPE(esiSegment); + typedef struct _esiContext { int remaining_esi_requests; + int completed_requests; + struct { + int passthrough:1; + int oktosend:1; + int finished:1; + } flags; + HttpReply *rep; /* buffered until we pass data downstream */ + esiSegment *buffered; /* unprocessed data - for whatever reason */ + esiSegment *incoming; + esiSegment *outbound; /* processed data we are waiting to send, or for + * potential errors to be resolved + */ + size_t outbound_offset; /* the offset to the next character to send - + * non zero if we haven't sent the entire segment + * for some reason + */ + off_t readpos; /* the logical position we are reading from */ + off_t pos; /* the logical position of outbound_offset in the data stream */ } esiContext; CBDATA_TYPE(esiContext); +typedef struct _esiStreamContext { + clientHttpRequest *http; + esiContext *esi; +} esiStreamContext; + +CBDATA_TYPE (esiStreamContext); + /* Local functions */ static FREE esiContextFree; -//static void esiStartSub (void); - -CSR esiProcessStream; +static esiContext *esiContextNew(HttpReply *); +static FREE esiStreamContextFree; +static esiStreamContext *esiStreamContextNew (clientHttpRequest *, esiContext *); +static CSCB esiBufferRecipient; +/* static void esiStartSub (void); */ +static void esiFail (esiContext *context, clientHttpRequest *http); /* ESI TODO: * 1. retry failed upstream requests */ +/* request from downstream for more data + */ +void +esiStreamRead (clientStreamNode *this, clientHttpRequest *http) +{ + clientStreamNode *next; + esiContext *context; + /* Test preconditions */ + assert (this != NULL); + assert (cbdataReferenceValid (this)); + /* we are not in the chain until ESI is detected on a data callback */ + assert (this->data != NULL); + assert (this->node.prev != NULL); + assert (this->node.next != NULL); + + context = cbdataReference (this->data); + if (context->flags.passthrough) { + /* passthru mode - read into supplied buffers */ + next = this->node.next->data; + clientStreamRead (this, http, next->readoff, next->readlen, next->readbuf); + cbdataReferenceDone (context); + return; + } + + /* Ok, not passing through. Is there data to send? */ + if (context->flags.oktosend && context->outbound && + context->outbound_offset < context->outbound->len) { + /* Yes! Send it without asking for more upstream */ + /* memcopying because the client provided the buffer */ + /* TODO: skip data until pos == next->readoff; */ + clientStreamNode *next = this->node.next->data; + size_t len = next->readlen < context->outbound->len - context->outbound_offset ? + next->readlen : context->outbound->len - context->outbound_offset; + assert (context->pos == next->readoff); + xmemcpy (next->readbuf, &context->outbound->buf[context->outbound_offset], len); + if (len + context->outbound_offset == context->outbound->len) { + esiSegment *temp = context->outbound->next; + /* remove the used buffer */ + context->outbound_offset = 0; + cbdataReferenceDone (context->outbound); + context->outbound = temp; + } + context->pos += len; + clientStreamCallback (this, http, context->rep, next->readbuf, len); + cbdataReferenceDone (context); + return; + } + + if (context->flags.oktosend && context->flags.finished) { + /* We've finished process, and there is no more data buffered */ + debug (0,0)("Telling recipient EOF\n"); + clientStreamCallback (this, http, NULL, NULL, 0); + cbdataReferenceDone (context); + return; + } + + /* no data that is ready to send? well, lets get some */ + /* secure a buffer */ + if (!context->incoming) { + /* create a new buffer segment */ + CBDATA_INIT_TYPE(esiSegment); + context->buffered = cbdataAlloc (esiSegment); + context->incoming = context->buffered; + } + assert (context->incoming && context->incoming->len != HTTP_REQBUF_SZ); + + clientStreamRead (this, http, context->readpos, context->incoming->len - HTTP_REQBUF_SZ, + &context->incoming->buf[context->incoming->len]); + cbdataReferenceDone (context); +} + +static int +esiAlwaysPassthrough(http_status sline) +{ + switch (sline) { + case HTTP_CONTINUE: /* Should never reach us... but squid needs to alter to accomodate this */ + case HTTP_SWITCHING_PROTOCOLS: /* Ditto */ + case HTTP_PROCESSING: /* Unknown - some extension */ + case HTTP_NO_CONTENT: /* no body, no esi */ + case HTTP_NOT_MODIFIED: /* ESI does not affect assembled page headers, so 304s are valid */ + return 1; + /* unreached */ + break; + default: + return 0; + } +} + + + /* Process incoming data for ESI tags */ /* ESI TODO: Long term: we should have a framework to parse html/xml and * callback to a set of processors like this, to prevent multiple parsing @@ -63,41 +198,207 @@ * There is context data or a reply structure */ void -esiProcessStream (clientStreamNode *node, clientHttpRequest *http, HttpReply *rep, const char *body_data, ssize_t body_size) +esiProcessStream (clientStreamNode *this, clientHttpRequest *http, HttpReply *rep, const char *body_data, ssize_t body_size) { esiContext *context; /* test preconditions */ - assert (node != NULL); + assert (this != NULL); /* ESI TODO: handle this rather than asserting - it should only ever * happen if we cause an abort and the callback chain * loops back to here, so we can simply return. However, that itself * shouldn't happen, so it stays as an assert for now. */ - assert (cbdataReferenceValid (node)); + assert (cbdataReferenceValid (this)); /* * if data is NULL this is the first entrance. If rep is also NULL, * something is wrong. * */ - assert (node->data != NULL || rep); - assert (node->next != NULL); - if (!node->data) - { + assert (this->data != NULL || rep); + assert (this->node.next != NULL); + if (!this->data) /* setup ESI context from reply headers */ - CBDATA_INIT_TYPE_FREECB(esiContext, esiContextFree); - node->data = cbdataAlloc(esiContext); - context = node->data; - /* ESI TODO: remove this hardcoded test parameter */ - context->remaining_esi_requests = 2; - } - context = cbdataReference(node->data); - debug(86, 0) ("clientSendMoreData: Will be checking for ESI content here - %d sub requests remaining\n", context->remaining_esi_requests); + this->data = esiContextNew(rep); + context = cbdataReference(this->data); + /* Finished all ESI processing. All remaining data gets untouched. + * Mainly used when an error has been detected to prevent ESI processing the error body + */ + if (context->flags.passthrough) { + cbdataReferenceDone(context); + clientStreamCallback (this, http, rep, body_data, body_size); + return; + } + + /* Can we generate any data ?*/ + + if (body_data) { + assert (body_size <= HTTP_REQBUF_SZ); + /* secure the data for later use */ + if (!context->incoming) { + /* create a new buffer segment */ + CBDATA_INIT_TYPE(esiSegment); + context->buffered = cbdataAlloc (esiSegment); + context->incoming = context->buffered; + } + if (body_data != &context->incoming->buf[context->incoming->len]) { + /* We have to copy the data out because we didn't supply this buffer */ + size_t space = HTTP_REQBUF_SZ - context->incoming->len; + size_t len = space < body_size ? space : body_size; + xmemcpy (&context->incoming->buf[context->incoming->len], body_data, len); + context->incoming->len += len; + if (context->incoming->len == HTTP_REQBUF_SZ) { + /* append another buffer */ + context->incoming->next = cbdataAlloc (esiSegment); + context->incoming = context->incoming->next; + } + if (len != body_size) { + /* capture the remnants */ + xmemcpy (context->incoming->buf, &body_data[len], body_size - len); + context->incoming->len = body_size - len; + } + /* and note where we are up to */ + context->readpos += body_size; + } else { + /* update our position counters, and if needed assign a new buffer */ + context->incoming->len += body_size; + assert (context->incoming->len <= HTTP_REQBUF_SZ); + if (context->incoming->len > HTTP_REQBUF_SZ * 3 / 4) { + /* allocate a new buffer - to stop us asking for ridiculously small amounts */ + context->incoming->next = cbdataAlloc (esiSegment); + context->incoming = context->incoming->next; + } + context->readpos += body_size; + } + } + + debug(86, 0) ("esiProcessStream: Will be checking for ESI content here - %d sub requests remaining\n", context->remaining_esi_requests); + + /* HACK to test buffering */ + /* EOF / Read error / aborted entry */ + if (rep == NULL && body_data == NULL && body_size == 0 && !context->flags.finished) { + /* Bit test the entry for aborts */ + /* flush the esi processor */ + debug (0,0)("Flushing now \n"); + context->flags.oktosend = 1; + context->flags.finished = 1; + context->outbound = context->buffered; + context->buffered = NULL; + context->incoming = NULL; + cbdataReferenceDone(context); + esiProcessStream (this, http, rep, body_data, body_size); + return; + } + + /* send any processed data */ + if (context->flags.oktosend && context->outbound && + context->outbound_offset < context->outbound->len) { + /* Yes! Send it without asking for more upstream */ + /* memcopying because the client provided the buffer */ + /* TODO: skip data until pos == next->readoff; */ + clientStreamNode *next = this->node.next->data; + size_t len = next->readlen < context->outbound->len - context->outbound_offset ? + next->readlen : context->outbound->len - context->outbound_offset; + /* prevent corruption on range requests, even thought we don't support them yet */ + assert (context->pos == next->readoff); + xmemcpy (next->readbuf, &context->outbound->buf[context->outbound_offset], len); + if (len + context->outbound_offset == context->outbound->len) { + esiSegment *temp = context->outbound->next; + /* remove the used buffer */ + context->outbound_offset = 0; + cbdataFree (context->outbound); + context->outbound = temp; + } + context->pos += len; + clientStreamCallback (this, http, context->rep, next->readbuf, len); + cbdataReferenceDone (context); + return; + } + + /* ok.. no data sent, try to pull more data in from upstream. + * FIXME: Don't try this if we have finished reading the template + */ + assert (context->incoming && context->incoming->len != HTTP_REQBUF_SZ); + clientStreamRead (this, http, context->readpos, context->incoming->len - HTTP_REQBUF_SZ, + &context->incoming->buf[context->incoming->len]); + + + cbdataReferenceDone (context); +#if 0 + + /* no data to send. Try to generate some */ + if (body_data) { + assert (body_size <= HTTP_REQBUF_SZ); + /* secure the data for later use */ + if (!context->incoming) { + /* create a new buffer segment */ + CBDATA_INIT_TYPE(esiSegment); + context->buffered = cbdataAlloc (esiSegment); + context->incoming = context->buffered; + } + if (body_data != &context->incoming->buf[context->incoming->len]) { + /* We have to copy the data out because we didn't supply this buffer */ + size_t space = HTTP_REQBUF_SZ - context->incoming->len; + xmemcpy (&context->incoming->buf[context->incoming->len], body_data, space); + context->incoming->len += space; + if (context->incoming->len == HTTP_REQBUF_SZ) { + /* append another buffer */ + context->incoming->next = cbdataAlloc (esiSegment); + context->incoming = context->incoming->next; + } + if (space != body_size) { + /* capture the remnants */ + xmemcpy (context->incoming->buf, &body_data[space], body_size - space); + context->incoming->len = body_size - space; + } + } else { + /* update our position counters, and if needed assign a new buffer */ + context->incoming->len += body_size; + assert (context->incoming->len <= HTTP_REQBUF_SZ); + if (context->incoming->len > HTTP_REQBUF_SZ / 2) { + /* allocate a new buffer */ + context->incoming->next = cbdataAlloc (esiSegment); + context->incoming = context->incoming->next; + } + } + } + + debug(86, 0) ("clientSendMoreData: Will be checking for ESI content here - %d sub requests remaining\n", context->remaining_esi_requests); + + /* HACK to test buffering */ + /* EOF / Read error / aborted entry */ + if (rep == NULL && body_data == NULL && body_size == 0) { + /* Bit test the entry for aborts */ + /* flush the esi processor */ + context->flags.oktosend = 1; + cbdataReferenceDone(context); + esiProcessStream (this, http, rep, body_data, body_size); + return; + } + + clientStreamRead(this, + /* Read more data, we can't do anything else here */ + storeClientCopy(http->sc, http->entry, + http->out.offset, + HTTP_REQBUF_SZ - context->incoming->len, + context->incoming, + clientSendMoreData, + http); + cbdataReferenceDone(context); + return; + +#endif + + + if (0) /* Kick off a request */ + if (context->remaining_esi_requests--) { HttpHeader tempheaders; + esiStreamContext *esiStream = esiStreamContextNew (http, context); + assert (esiStream != NULL); httpHeaderInit (&tempheaders, hoRequest); httpHeaderPutStr (&tempheaders, HDR_SURROGATE_CAPABILITY, "Surrogate/1.0"); - if (context->remaining_esi_requests-- && clientBeginRequest(METHOD_GET, "http://192.168.0.2/ESI_fragment_1.txt", esiBufferRecipient, NULL, &tempheaders)) { + if (clientBeginRequest(METHOD_GET, "http://192.168.0.2/ESI_fragment_1.txt", esiBufferRecipient, esiStream, &tempheaders)) { debug (86,0 ) ("starting new ESI subrequest failed\n"); } httpHeaderClean (&tempheaders); @@ -108,9 +409,15 @@ /* Bit test the entry for aborts */ /* flush the esi processor */ } - - cbdataReferenceDone(context); - node->next->func (node->next, http, rep, body_data, body_size); + + if (0) + // send an error + { + esiFail (context, http); + cbdataReferenceDone(context); + return; + } +// cbdataReferenceDone(context); } void @@ -120,26 +427,74 @@ return; } +esiContext * +esiContextNew (HttpReply *rep) +{ + esiContext *rv; + CBDATA_INIT_TYPE_FREECB(esiContext, esiContextFree); + rv = cbdataAlloc(esiContext); + assert (rep); + /* ESI TODO: remove this hardcoded test parameter */ + rv->remaining_esi_requests = 2; + rv->rep = rep; + if (esiAlwaysPassthrough(rep->sline.status)) { + rv->flags.passthrough = 1; + } else { + /* TODO: remove Etag, content-length headers */ + } + return rv; +} + +void esiFail (esiContext *context, clientHttpRequest *http) +{ + ErrorState *err; + /* Stop altering this request */ + context->flags.passthrough = 1; + /* create an error object */ + err = clientBuildError(ERR_TOO_BIG, HTTP_FORBIDDEN, NULL, + http->conn ? &http->conn->peer.sin_addr : &no_addr, http->request); + /* cancel the current request data */ + storeUnregister(http->sc, http->entry, http); + http->sc = NULL; + + storeUnlockObject(http->entry); + /* create a NEW reply. */ + { + clientStreamNode *node = http->stream_head.tail->prev->data; + http->entry = clientCreateStoreEntry(node->data, http->request->method, + + null_request_flags); + } + errorAppendEntry(http->entry, err); + if (context->rep) { + httpReplyDestroy(context->rep); + context->rep = NULL; + } +} + /* - * Write a chunk of data to a client socket. If the reply is present, send the reply headers down the wire too, + * Write a chunk of data to a client 'socket'. + * If the reply is present, send the reply headers down the wire too, * and clean them up when finished. * Pre-condition: - * The request is one backed by a connection, not an internal request. - * data context is null + * The request is an internal ESI subrequest. + * data context is not NULL * There are no more entries in the stream chain. */ void esiBufferRecipient (clientStreamNode *node, clientHttpRequest *http, HttpReply *rep, const char *body_data, ssize_t body_size) { + esiStreamContext *esiStream; /* Test preconditions */ assert (node != NULL); /* ESI TODO: handle this rather than asserting - it should only ever happen if we cause an abort and the callback chain * loops back to here, so we can simply return. However, that itself shouldn't happen, so it stays as an assert for now. */ assert (cbdataReferenceValid (node)); - /* ESI TODO: give this routine it's context */ - assert (node->data == NULL); - assert (node->next == NULL); + assert (node->data != NULL); + assert (node->node.next == NULL); assert (http->conn == NULL); + + esiStream = cbdataReference (node->data); debug (86,0) ("esiBufferRecipient\n"); @@ -169,7 +524,7 @@ #endif if (rep) { //mb = httpReplyPack(rep); - http->out.offset += rep->hdr_sz; +// http->out.offset += rep->hdr_sz; #if HEADERS_LOG // should be done in the store rather than ever recipient? headersLog(0, 0, http->request->method, rep); @@ -195,27 +550,27 @@ case 1: /* ok */ // usertell request finished debug (86,0)("ESI subrequest finished OK\n"); + cbdataReferenceDone (esiStream); + httpRequestFree (http); return; case 2: // usertell request failed debug (86,0)("ESI subrequest failed transfer\n"); + cbdataReferenceDone (esiStream); + httpRequestFree (http); return; case 3: /* More data will be coming from primary server; register with * storage manager. */ - http->reqofs = 0; - storeClientCopy(http->sc, http->entry, - http->out.offset, - HTTP_REQBUF_SZ, - http->reqbuf, - clientSendMoreData, - http); - debug (86,0)("Requesting more data for ESI subrequest\n"); + clientStreamRead (node->node.prev->data, + http, http->out.offset, + HTTP_REQBUF_SZ, + http->reqbuf); + debug (86,0)("Requested more data for ESI subrequest\n"); break; default:fatal ("Hit unreachable code in esiBufferRecipient\n"); } - - + cbdataReferenceDone (esiStream); } #if 0 @@ -232,3 +587,24 @@ } #endif +/* esiStream functions */ +void +esiStreamContextFree (void *data) +{ + esiStreamContext *esiStream = data; + assert (esiStream); + cbdataReferenceDone (esiStream->http); + cbdataReferenceDone (esiStream->esi); + debug (0,0)("Freeing stream context\n"); +} + +esiStreamContext * +esiStreamContextNew (clientHttpRequest *http, esiContext *esi) +{ + esiStreamContext *rv = NULL; + CBDATA_INIT_TYPE_FREECB(esiStreamContext, esiStreamContextFree); + rv = cbdataAlloc(esiStreamContext); + rv->http = cbdataReference (http); + rv->esi = cbdataReference (esi); + return rv; +} Index: squid/src/clientStream.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/clientStream.c,v retrieving revision 1.1.2.2 retrieving revision 1.1.2.3 diff -u -r1.1.2.2 -r1.1.2.3 --- squid/src/clientStream.c 9 Aug 2002 11:58:45 -0000 1.1.2.2 +++ squid/src/clientStream.c 14 Aug 2002 11:16:05 -0000 1.1.2.3 @@ -1,6 +1,6 @@ /* - * $Id: clientStream.c,v 1.1.2.2 2002/08/09 11:58:45 rbcollins Exp $ + * $Id: clientStream.c,v 1.1.2.3 2002/08/14 11:16:05 rbcollins Exp $ * * DEBUG: section 87 Client-side Strean routines. * AUTHOR: Robert Collins @@ -33,34 +33,100 @@ * */ +/* a client Stream is a uni directional pipe, with the usual non-blocking + * asynchronous approach present elsewhere in squid. + * + * Each pipe node has a data push function, and a data request function. + * This limits flexability - the data flow is no longer assembled at each + * step. + * + * An alternative approach is to pass each node in the pipe the call- + * back to use on each IO call. This allows the callbacks to be changed + * very easily by a participating node, but requires more maintenance + * in each node (store the call back to the msot recent IO request in + * the nodes context.) Such an approach also prevents dynamically + * changing the pipeline from outside without an additional interface + * method to extract the callback and context from the next node. + * + * One important characteristic of the stream is that the readfunc + * on the terminating node, and the callback on the first node + * will be NULL, and never used. + */ + #include "squid.h" CBDATA_TYPE(clientStreamNode); +/* TODO: rather than each node undeleting the next, have a clientStreamDelete that walks the list + */ /* Local functions */ static FREE clientStreamFree; clientStreamNode * -clientStreamNew (CSR *func, void *data) +clientStreamNew (CSR *readfunc, CSCB *callback, void *data) { clientStreamNode *temp; CBDATA_INIT_TYPE_FREECB(clientStreamNode, clientStreamFree); temp = cbdataAlloc (clientStreamNode); - temp->func = func; + temp->readfunc = readfunc; + temp->callback = callback; temp->data = data; return temp; } +/* Initialise a client Stream. + */ void -clientStreamInsertHead (clientStreamNode **head, CSR *func, void *data) +clientStreamInit (dlink_list *list, CSR *func, void *readdata, CSCB*callback, void *callbackdata) +{ + clientStreamNode *temp = clientStreamNew(func, NULL, readdata); + dlinkAdd (temp, &temp->node, list); + /* FIXME: clientStreamDelete */ + cbdataReference (temp); + temp->head = list; + clientStreamInsertHead (list, NULL, callback, callbackdata); +} + +/* Doesn't actually insert at head. Instead it inserts one *after* + * head. This is because HEAD is a special node, as is tail + * This function is not suitable for inserting the real HEAD. + */ +void +clientStreamInsertHead (dlink_list *list, CSR *func, CSCB *callback, void *data) { clientStreamNode *temp; /* test preconditions */ - assert (head != NULL); - temp = clientStreamNew(func, data); - temp->next = *head; - *head = cbdataReference (temp); + assert (list != NULL); + assert (list->head); + temp = clientStreamNew(func, callback, data); + temp->head = list; + dlinkAddAfter (temp, &temp->node, list->head, list); + /* FIXME (clientStreamDelete */ + cbdataReference (temp); +} + +/* Callback the next node the in chain with it's requested data + */ +void +clientStreamCallback (clientStreamNode *this, clientHttpRequest *http, HttpReply *rep, const char *body_data, ssize_t body_size) +{ + /* No asserts for speed. This could even be a #define if needed */ + clientStreamNode *next = this->node.next->data; + next->callback (next, http, rep, body_data, body_size); +} + +/* Call the previous node in the chain to read some data */ +void +clientStreamRead (clientStreamNode *this, clientHttpRequest *http, off_t readoff, size_t readlen, char *readbuf) +{ + /* no asserts for speed. This could even be a #define if needed */ + /* place the parameters on the 'stack' */ + clientStreamNode *prev = this->node.prev->data; + this->readoff = readoff; + this->readlen = readlen; + this->readbuf = readbuf; + prev->readfunc (prev,http); } @@ -68,16 +134,17 @@ void clientStreamFree (void *foo) { - clientStreamNode *node = foo; - debug (87, 9) ("Freeing clientStreamNode %p\n", node); + clientStreamNode *this = foo; + debug (87, 9) ("Freeing clientStreamNode %p\n", this); /* ESI TODO: push refcount class through to head */ - if (node->data) { - cbdataFree (node->data); + if (this->data) { + cbdataFree (this->data); } - if (node->next) { - clientStreamNode *temp = node->next; - cbdataReferenceDone (temp); - cbdataFree (node->next); + if (this->node.next) { + clientStreamNode *temp = this->node.next->data; + cbdataReferenceDone (this->node.next->data); + dlinkDelete (&this->node, this->head); + cbdataFree (temp); } } Index: squid/src/client_side.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/client_side.c,v retrieving revision 1.65.2.3 retrieving revision 1.65.2.4 diff -u -r1.65.2.3 -r1.65.2.4 --- squid/src/client_side.c 11 Aug 2002 10:16:35 -0000 1.65.2.3 +++ squid/src/client_side.c 14 Aug 2002 11:16:05 -0000 1.65.2.4 @@ -1,6 +1,6 @@ /* - * $Id: client_side.c,v 1.65.2.3 2002/08/11 10:16:35 rbcollins Exp $ + * $Id: client_side.c,v 1.65.2.4 2002/08/14 11:16:05 rbcollins Exp $ * * DEBUG: section 33 Client-side Routines * AUTHOR: Duane Wessels @@ -33,6 +33,26 @@ * */ +/* Errors and client side + * + * Problem the first: the store entry is no longer authoritative on the + * reply status. EBITTEST (E_ABORT) is no longer valid. + * Problem the second: resources are wasted if we delay in cleaning up. + * Problem the third we can't depend on a connection close to clean up. + * + * Nice thing the first: Any step in the stream can callback with data + * representing an error. + * Nice thing the second: once you stop requesting reads from upstream, + * upstream stops too. + * + * Solution #1: Error has a callback mechanism to hand over a membuf + * with the error content. The failing node pushes that back as the + * reply. + * How to deal with pre-stream errors? + * Tell client_side_reply that we *want* an error page before any + * stream calls occur. + */ + #include "squid.h" #if IPF_TRANSPARENT @@ -85,6 +105,24 @@ #define FAILURE_MODE_TIME 300 +/* Persistent connection logic: + * + * requests (httpClientRequest structs) get added to the connection + * list, with the current one being chr + * + * The request is *immediately* kicked off, and data flows through + * to clientSocketRecipient. + * + * If the data that arrives at clientSocketRecipient is not for the current + * request, clientSocketRecipient simply returns, without requesting more + * data, or sending it. + * + * ClientKeepAliveNextRequest will then detect the presence of data in + * the next clientHttpRequest, and will send it, restablishing the + * data flow. + */ + + /* Local functions */ static CWCB clientWriteComplete; @@ -99,13 +137,22 @@ #if USE_IDENT static IDCB clientIdentDone; #endif -static CSR clientSocketRecipient; +static CSCB clientSocketRecipient; static void clientSetKeepaliveFlag(clientHttpRequest *); static int clientCheckContentLength(request_t * r); static DEFER httpAcceptDefer; static int clientRequestBodyTooLarge(int clen); static void clientProcessBody(ConnStateData * conn); +/* Move to protos.h - it's used anywhere on the request pipeline, before + * any reply data flows + */ +void +clientSetReplyToError (void *data, + err_type err, http_status status, method_t method, char const *uri, + struct in_addr *addr, request_t *failedrequest, char *unparsedrequest) +; + #if USE_IDENT static void clientIdentDone(const char *ident, void *data) @@ -180,16 +227,15 @@ } } -static void +void httpRequestFree(void *data) { clientHttpRequest *http = data; - clientHttpRequest **H; ConnStateData *conn = http->conn; StoreEntry *e; request_t *request = http->request; MemObject *mem = NULL; - debug(33, 3) ("httpRequestFree: %s\n", storeUrl(http->entry)); + debug(33, 3) ("httpRequestFree: %s\n", http->uri); if (!clientCheckTransferDone(http)) { if (request && request->body_connection) clientAbortBody(request); /* abort body transter */ @@ -210,7 +256,7 @@ http->al.http.code = mem->reply->sline.status; http->al.http.content_type = strBuf(mem->reply->content_type); } - http->al.cache.caddr = conn->log_addr; + http->al.cache.caddr = conn ? conn->log_addr : no_addr; http->al.cache.size = http->out.size; http->al.cache.code = http->log_type; http->al.cache.msec = tvSubMsec(http->start, current_time); @@ -229,14 +275,15 @@ authenticateAuthUserRequestUnlock(request->auth_user_request); request->auth_user_request = NULL; } - if (conn->rfc931[0]) + if (conn && conn->rfc931[0]) http->al.cache.rfc931 = conn->rfc931; packerClean(&p); memBufClean(&mb); } accessLogLog(&http->al); clientUpdateCounters(http); - clientdbUpdate(conn->peer.sin_addr, http->log_type, PROTO_HTTP, http->out.size); + if (conn) + clientdbUpdate(conn->peer.sin_addr, http->log_type, PROTO_HTTP, http->out.size); } if (http->acl_checklist) aclChecklistFree(http->acl_checklist); @@ -265,22 +312,25 @@ requestUnlink(http->request); /* ESI TODO: push refcount class through to head */ { - clientStreamNode *temp = http->stream_head; + clientStreamNode *temp = http->stream_head.head->data; cbdataReferenceDone (temp); - cbdataFree (http->stream_head); + cbdataFree (http->stream_head.head->data); } assert(http != http->next); - assert(http->conn->chr != NULL); - /* Unlink us from the clients request list */ - H = &http->conn->chr; - while (*H) { - if (*H == http) - break; - H = &(*H)->next; + if (conn) { + clientHttpRequest **H; + assert(http->conn->chr != NULL); + /* Unlink us from the clients request list */ + H = &http->conn->chr; + while (*H) { + if (*H == http) + break; + H = &(*H)->next; + } + assert(*H != NULL); + *H = http->next; + http->next = NULL; } - assert(*H != NULL); - *H = http->next; - http->next = NULL; dlinkDelete(&http->active, &ClientActiveRequests); cbdataFree(http); } @@ -393,7 +443,7 @@ * and clean them up when finished. * Pre-condition: * The request is one backed by a connection, not an internal request. - * data context is null + * data context is NULL * There are no more entries in the stream chain. */ static void @@ -402,13 +452,25 @@ int fd; /* Test preconditions */ assert (node != NULL); - /* ESI TODO: handle this rather than asserting - it should only ever happen if we cause an abort and the callback chain - * loops back to here, so we can simply return. However, that itself shouldn't happen, so it stays as an assert for now. */ + /* ESI TODO: handle this rather than asserting - it should only ever happen if we cause an abort and + * the callback chain loops back to here, so we can simply return. + * However, that itself shouldn't happen, so it stays as an assert for now. + */ assert (cbdataReferenceValid (node)); assert (node->data == NULL); - assert (node->next == NULL); + assert (node->node.next == NULL); assert (http->conn && http->conn->fd != -1); fd = http->conn->fd; + if (http->conn->chr != http) { + /* there is another object in progress, defer this one */ + debug(33, 2) ("clientSocketRecipient: Deferring %s\n", http->uri); + http->flags.deferred = 1; + http->deferredparams.node = node; + http->deferredparams.rep = rep; + http->deferredparams.body_data = body_data; + http->deferredparams.body_size = body_size; + return; + } /* EOF / Read error / aborted entry */ if (rep == NULL && body_data == NULL && body_size == 0) { clientWriteComplete(fd, NULL, 0, COMM_OK, http); @@ -429,7 +491,7 @@ /* init mb; put status line and headers if any */ if (rep) { mb = httpReplyPack(rep); - http->out.offset += rep->hdr_sz; +/* http->out.offset += rep->hdr_sz; */ #if HEADERS_LOG headersLog(0, 0, http->request->method, rep); #endif @@ -468,7 +530,6 @@ clientKeepaliveNextRequest(clientHttpRequest * http) { ConnStateData *conn = http->conn; - StoreEntry *entry; debug(33, 3) ("clientKeepaliveNextRequest: FD %d\n", conn->fd); conn->defer.until = 0; /* Kick it to read a new request */ @@ -495,7 +556,7 @@ /* * Note, the FD may be closed at this point. */ - } else if ((entry = http->entry) == NULL) { + } else if (http->entry == NULL) { /* * this request is in progress, maybe doing an ACL or a redirect, * execution will resume after the operation completes. @@ -503,32 +564,20 @@ } else { debug(33, 2) ("clientKeepaliveNextRequest: FD %d Sending next\n", conn->fd); - assert(entry); - if (0 == storeClientCopyPending(http->sc, entry, http)) { - if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) + assert(http->entry); + if (0 == storeClientCopyPending(http->sc, http->entry, http)) { + if (EBIT_TEST(http->entry->flags, ENTRY_ABORTED)) debug(33, 0) ("clientKeepaliveNextRequest: ENTRY_ABORTED\n"); - /* If we have any data in our reqbuf, use it */ - if (http->reqsize > 0) { - /* - * We can pass in reqbuf/size here, since clientSendMoreData ignores what - * is passed and uses them itself.. :-) - * -- adrian - */ - clientSendMoreData(http, http->reqbuf, http->reqsize); - } else { - assert(http->out.offset == 0); - /* - * here - have no data (don't ever think we get here..) - * so lets start copying.. - * -- adrian - */ - storeClientCopy(http->sc, entry, - http->out.offset, - HTTP_REQBUF_SZ, - http->reqbuf, - clientSendMoreData, - http); + /* If the client stream is waiting on a socket write to occured, thenm */ + if (http->flags.deferred) { + /* NO data is allowed to have been sent */ + assert (http->out.size == 0); + clientSocketRecipient (http->deferredparams.node, http, + http->deferredparams.rep, + http->deferredparams.body_data, + http->deferredparams.body_size); } + /* otherwise, the request is still active, and we are done */ } } } @@ -544,7 +593,7 @@ StoreEntry *entry = http->entry; http->out.size += size; debug(33, 5) ("clientWriteComplete: FD %d, sz %ld, err %d, off %ld, len %d\n", - fd, (long int) size, errflag, (long int) http->out.offset, entry ? objectLen(entry) : 0); + fd, (long int) size, errflag, (long int) http->out.size, entry ? objectLen(entry) : 0); if (size > 0 && fd > -1) { kb_incr(&statCounter.client_http.kbytes_out, size); if (isTcpHit(http->log_type)) @@ -570,20 +619,25 @@ comm_close (fd); return; case 3: - /* More data will be coming from primary server; register with - * storage manager. */ + /* More data will be coming from the stream. */ http->reqofs = 0; - storeClientCopy(http->sc, entry, +/* storeClientCopy(http->sc, entry, http->out.offset, HTTP_REQBUF_SZ, http->reqbuf, clientSendMoreData, - http); + http); */ + clientStreamRead (http->stream_head.tail->data, http, http->out.offset, + HTTP_REQBUF_SZ, + http->reqbuf); + break; default:fatal ("Hit unreachable code in clientWriteComplete\n"); } } +extern CSR clientGetMoreData; + static clientHttpRequest * parseHttpRequestAbort(ConnStateData * conn, const char *uri) { @@ -595,9 +649,7 @@ http->uri = xstrdup(uri); http->log_uri = xstrndup(uri, MAX_URL); http->reqbuf = http->norm_reqbuf; - /* default socket stream terminator */ - http->stream_head = NULL; - clientStreamInsertHead (&http->stream_head, clientSocketRecipient, NULL); + clientStreamInit (&http->stream_head, clientGetMoreData, clientReplyNewContext (http), clientSocketRecipient, NULL); dlinkAdd(http, &http->active, &ClientActiveRequests); return http; } @@ -755,9 +807,7 @@ http->start = current_time; http->req_sz = prefix_sz; http->reqbuf = http->norm_reqbuf; - /* default socket based stream terminator */ - http->stream_head = NULL; - clientStreamInsertHead (&http->stream_head, clientSocketRecipient, NULL); + clientStreamInit (&http->stream_head, clientGetMoreData, clientReplyNewContext (http), clientSocketRecipient, NULL); *prefix_p = xmalloc(prefix_sz + 1); xmemcpy(*prefix_p, conn->in.buf, prefix_sz); *(*prefix_p + prefix_sz) = '\0'; @@ -1067,6 +1117,7 @@ if (!http) safe_free(prefix); if (http) { + /* We have an initial client stream in place should it be needed */ assert(http->req_sz > 0); conn->in.offset -= http->req_sz; assert(conn->in.offset >= 0); @@ -1083,22 +1134,41 @@ conn->nrequests++; commSetTimeout(fd, Config.Timeout.lifetime, clientLifetimeTimeout, http); if (parser_return_code < 0) { + clientStreamNode *node = http->stream_head.tail->prev->data; debug(33, 1) ("clientReadRequest: FD %d Invalid Request\n", fd); + clientSetReplyToError (node->data, + ERR_INVALID_REQ, HTTP_BAD_REQUEST, method, NULL, &conn->peer.sin_addr, + NULL, conn->in.buf); +#if 0 err = clientBuildError(ERR_INVALID_REQ, HTTP_BAD_REQUEST, NULL, &conn->peer.sin_addr, NULL); err->request_hdrs = xstrdup(conn->in.buf); - http->entry = clientCreateStoreEntry(http, method, null_request_flags); + /* FIXME */ + http->entry = clientCreateStoreEntry(node->data, method, null_request_flags); errorAppendEntry(http->entry, err); +#endif + clientStreamRead (http->stream_head.tail->data, http, 0, + HTTP_REQBUF_SZ, + http->reqbuf); safe_free(prefix); break; } if ((request = urlParse(method, http->uri)) == NULL) { + clientStreamNode *node = http->stream_head.tail->prev->data; debug(33, 5) ("Invalid URL: %s\n", http->uri); + clientSetReplyToError (node->data, + ERR_INVALID_URL, HTTP_BAD_REQUEST, method, http->uri, + &conn->peer.sin_addr, NULL, NULL); + clientStreamRead (http->stream_head.tail->data, http, 0, + HTTP_REQBUF_SZ, + http->reqbuf); +#if 0 err = clientBuildError(ERR_INVALID_URL, HTTP_BAD_REQUEST, http->uri, &conn->peer.sin_addr, NULL); http->al.http.code = err->http_status; - http->entry = clientCreateStoreEntry(http, method, null_request_flags); + http->entry = clientCreateStoreEntry(node->data, method, null_request_flags); errorAppendEntry(http->entry, err); +#endif safe_free(prefix); break; } else { @@ -1137,19 +1207,21 @@ request->http_ver = http->http_ver; if (!urlCheckRequest(request) || httpHeaderHas(&request->header, HDR_TRANSFER_ENCODING)) { + clientStreamNode *node = http->stream_head.tail->prev->data; err = clientBuildError(ERR_UNSUP_REQ, HTTP_NOT_IMPLEMENTED, NULL, &conn->peer.sin_addr, request); request->flags.proxy_keepalive = 0; http->al.http.code = err->http_status; - http->entry = clientCreateStoreEntry(http, request->method, null_request_flags); + http->entry = clientCreateStoreEntry(node->data, request->method, null_request_flags); errorAppendEntry(http->entry, err); break; } if (!clientCheckContentLength(request)) { + clientStreamNode *node = http->stream_head.tail->prev->data; err = clientBuildError(ERR_INVALID_REQ, HTTP_LENGTH_REQUIRED, NULL, &conn->peer.sin_addr, request); http->al.http.code = err->http_status; - http->entry = clientCreateStoreEntry(http, request->method, null_request_flags); + http->entry = clientCreateStoreEntry(node->data, request->method, null_request_flags); errorAppendEntry(http->entry, err); break; } @@ -1161,9 +1233,10 @@ request->body_connection = conn; /* Is it too large? */ if (clientRequestBodyTooLarge(request->content_length)) { + clientStreamNode *node = http->stream_head.tail->prev->data; err = clientBuildError(ERR_TOO_BIG, HTTP_REQUEST_ENTITY_TOO_LARGE, NULL, &conn->peer.sin_addr, http->request); - http->entry = clientCreateStoreEntry(http, + http->entry = clientCreateStoreEntry(node->data, METHOD_NONE, null_request_flags); errorAppendEntry(http->entry, err); break; @@ -1177,6 +1250,7 @@ * is happy with the input */ if (conn->in.offset >= Config.maxRequestHeaderSize) { + clientStreamNode *node = http->stream_head.tail->prev->data; /* The request is too large to handle */ debug(33, 1) ("Request header is too large (%d bytes)\n", (int) conn->in.offset); @@ -1188,7 +1262,7 @@ /* add to the client request queue */ for (H = &conn->chr; *H; H = &(*H)->next); *H = http; - http->entry = clientCreateStoreEntry(http, METHOD_NONE, null_request_flags); + http->entry = clientCreateStoreEntry(node->data, METHOD_NONE, null_request_flags); errorAppendEntry(http->entry, err); return; } Index: squid/src/client_side_reply.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/client_side_reply.c,v retrieving revision 1.1.2.2 retrieving revision 1.1.2.3 diff -u -r1.1.2.2 -r1.1.2.3 --- squid/src/client_side_reply.c 9 Aug 2002 12:18:53 -0000 1.1.2.2 +++ squid/src/client_side_reply.c 14 Aug 2002 11:16:06 -0000 1.1.2.3 @@ -1,6 +1,6 @@ /* - * $Id: client_side_reply.c,v 1.1.2.2 2002/08/09 12:18:53 rbcollins Exp $ + * $Id: client_side_reply.c,v 1.1.2.3 2002/08/14 11:16:06 rbcollins Exp $ * * DEBUG: section 88 Client-side Reply Routines * AUTHOR: Robert Collins (Originally Duane Wessels in client_side.c) @@ -35,28 +35,78 @@ #include "squid.h" +typedef struct _clientReplyContext { + clientHttpRequest *http; + int headers_sz; + struct { + int storelogiccomplete:1; + } flags; + clientStreamNode *ourNode; /* This will go away if/when this file gets refactored some more */ +} clientReplyContext; +CBDATA_TYPE (clientReplyContext); + static const char *const crlf = "\r\n"; /* Local functions */ -extern aclCheck_t * clientAclChecklistCreate(const acl_access * acl, const clientHttpRequest * http); /* used in all client_* source files */ static int clientGotNotEnough(clientHttpRequest const *); -/* static */ int clientReplyBodyTooLarge(HttpReply *, ssize_t); -/* This will go away when clientProcessRequest gets split */ -extern void clientProcessRequest(clientHttpRequest *); +static int clientReplyBodyTooLarge(HttpReply *, ssize_t); static int clientOnlyIfCached(clientHttpRequest * http); -static void clientProcessExpired(void *data); -/* used from the request side */ void clientProcessMiss(clientHttpRequest *); - STCB clientCacheHit; -static void clientProcessOnlyIfCachedMiss(clientHttpRequest *); +static void clientProcessExpired(clientReplyContext *); +static void clientProcessMiss(clientReplyContext *); +static STCB clientCacheHit; +static void clientProcessOnlyIfCachedMiss(clientReplyContext *); static int clientGetsOldEntry(StoreEntry * new, StoreEntry * old, request_t * request); static STCB clientHandleIMSReply; static int modifiedSince(StoreEntry *, request_t *); +static log_type clientIdentifyStoreObject(clientHttpRequest * http); +/* The clientReply clean interface */ +/* privates */ +static FREE clientReplyFree; +void +clientReplyFree (void *data) +{ + clientReplyContext *this = data; + cbdataReferenceDone (this->http); +} + +void * +clientReplyNewContext (clientHttpRequest *clientContext) +{ + clientReplyContext *context; + CBDATA_INIT_TYPE_FREECB (clientReplyContext, clientReplyFree); + context = cbdataAlloc (clientReplyContext); + context->http = cbdataReference (clientContext); + return context; +} + +void clientSetReplyToError (void *,err_type,http_status,method_t,char const *,struct in_addr *, request_t *, char *); + + +/* create an error in the store awaiting the client side to read it. */ +void +clientSetReplyToError (void *data, + err_type err, http_status status, method_t method, char const *uri, + struct in_addr *addr, request_t *failedrequest, char *unparsedrequest) +{ + clientReplyContext *context = data; + ErrorState *errstate = clientBuildError(err, status, uri, addr, failedrequest); + if (unparsedrequest) + errstate->request_hdrs = xstrdup (unparsedrequest); + + if (status == HTTP_NOT_IMPLEMENTED) + /* prevent confusion over whether we default to persistent or not */ + context->http->request->flags.proxy_keepalive = 0; + context->http->al.http.code = errstate->http_status; + + context->http->entry = clientCreateStoreEntry(context, method, null_request_flags); + errorAppendEntry (context->http->entry, errstate); +} static void -clientProcessExpired(void *data) +clientProcessExpired(clientReplyContext *context) { - clientHttpRequest *http = data; + clientHttpRequest *http = context->http; char *url = http->uri; StoreEntry *entry = NULL; debug(88, 3) ("clientProcessExpired: '%s'\n", http->uri); @@ -67,7 +117,7 @@ * a stale entry *if* it matches client requirements */ if (clientOnlyIfCached(http)) { - clientProcessOnlyIfCachedMiss(http); + clientProcessOnlyIfCachedMiss(context); return; } http->request->flags.refresh = 1; @@ -184,7 +234,8 @@ void clientHandleIMSReply(void *data, char *buf, ssize_t size) { - clientHttpRequest *http = data; + clientReplyContext *context = data; + clientHttpRequest *http = context->http; StoreEntry *entry = http->entry; MemObject *mem; const char *url = storeUrl(entry); @@ -285,9 +336,10 @@ http->old_reqsize = 0; assert(!EBIT_TEST(entry->flags, ENTRY_ABORTED)); - clientSendMoreData(data, http->reqbuf, http->reqsize); + clientSendMoreData(context, http->reqbuf, http->reqsize); } +CSR clientGetMoreData; /* * clientCacheHit should only be called until the HTTP reply headers @@ -299,7 +351,8 @@ void clientCacheHit(void *data, char *buf, ssize_t size) { - clientHttpRequest *http = data; + clientReplyContext *context = data; + clientHttpRequest *http = context->http; StoreEntry *e = http->entry; MemObject *mem; request_t *r = http->request; @@ -317,7 +370,7 @@ http->sc = NULL; storeUnlockObject(e); } - clientProcessMiss(http); + clientProcessMiss(context); return; } assert(size > 0); @@ -331,9 +384,9 @@ * punt to clientProcessMiss. */ if (e->mem_status == IN_MEMORY || e->store_status == STORE_OK) { - clientProcessMiss(http); + clientProcessMiss(context); } else if (size + http->reqofs >= HTTP_REQBUF_SZ && http->out.offset == 0) { - clientProcessMiss(http); + clientProcessMiss(context); } else { debug(88, 3) ("clientCacheHit: waiting for HTTP reply headers\n"); http->reqofs += size; @@ -343,7 +396,7 @@ HTTP_REQBUF_SZ, http->reqbuf + http->reqofs, clientCacheHit, - http); + context); } return; } @@ -371,12 +424,13 @@ * so we only get here once. (it also takes care of cancelling loops) */ debug(88, 2) ("clientProcessHit: Vary detected!\n"); - clientProcessRequest(http); + clientGetMoreData (context->ourNode, http); +// clientProcessRequest(http); return; case VARY_CANCEL: /* varyEvaluateMatch found a object loop. Process as miss */ debug(88, 1) ("clientProcessHit: Vary object loop!\n"); - clientProcessMiss(http); + clientProcessMiss(context); return; } if (r->method == METHOD_PURGE) { @@ -384,12 +438,12 @@ storeUnregister(http->sc, e, http); http->sc = NULL; storeUnlockObject(e); - clientPurgeRequest(http); + clientPurgeRequest(context); return; } if (storeCheckNegativeHit(e)) { http->log_type = LOG_TCP_NEGATIVE_HIT; - clientSendMoreData(data, buf, size); + clientSendMoreData(context, buf, size); } else if (r->method == METHOD_HEAD) { /* * RFC 2068 seems to indicate there is no "conditional HEAD" @@ -398,7 +452,7 @@ */ if (e->mem_status == IN_MEMORY) http->log_type = LOG_TCP_MEM_HIT; - clientSendMoreData(data, buf, size); + clientSendMoreData(context, buf, size); } else if (refreshCheckHTTP(e, r) && !http->flags.internal) { debug(88, 5) ("clientCacheHit: in refreshCheck() block\n"); /* @@ -418,28 +472,28 @@ * we cannot revalidate it. */ http->log_type = LOG_TCP_MISS; - clientProcessMiss(http); + clientProcessMiss(context); } else if (r->flags.nocache) { /* * This did not match a refresh pattern that overrides no-cache * we should honour the client no-cache header. */ http->log_type = LOG_TCP_CLIENT_REFRESH_MISS; - clientProcessMiss(http); + clientProcessMiss(context); } else if (r->protocol == PROTO_HTTP) { /* * Object needs to be revalidated * XXX This could apply to FTP as well, if Last-Modified is known. */ http->log_type = LOG_TCP_REFRESH_MISS; - clientProcessExpired(http); + clientProcessExpired(context); } else { /* * We don't know how to re-validate other protocols. Handle * them as if the object has expired. */ http->log_type = LOG_TCP_MISS; - clientProcessMiss(http); + clientProcessMiss(context); } } else if (r->flags.ims) { /* @@ -450,10 +504,10 @@ debug(88, 4) ("clientCacheHit: Reply code %d != 200\n", mem->reply->sline.status); http->log_type = LOG_TCP_MISS; - clientProcessMiss(http); + clientProcessMiss(context); } else if (modifiedSince(e, http->request)) { http->log_type = LOG_TCP_IMS_HIT; - clientSendMoreData(data, buf, size); + clientSendMoreData(context, buf, size); } else { time_t timestamp = e->timestamp; MemBuf mb = httpPacked304Reply(e->mem_obj->reply); @@ -461,7 +515,7 @@ storeUnregister(http->sc, e, http); http->sc = NULL; storeUnlockObject(e); - e = clientCreateStoreEntry(http, http->request->method, null_request_flags); + e = clientCreateStoreEntry(context, http->request->method, null_request_flags); /* * Copy timestamp from the original entry so the 304 * reply has a meaningful Age: header. @@ -481,7 +535,7 @@ http->log_type = LOG_TCP_MEM_HIT; else if (Config.onoff.offline) http->log_type = LOG_TCP_OFFLINE_HIT; - clientSendMoreData(data, buf, size); + clientSendMoreData(context, buf, size); } } @@ -489,8 +543,9 @@ * Prepare to fetch the object as it's a cache miss of some kind. */ void -clientProcessMiss(clientHttpRequest * http) +clientProcessMiss(clientReplyContext *context) { + clientHttpRequest * http=context->http; char *url = http->uri; request_t *r = http->request; ErrorState *err = NULL; @@ -512,11 +567,11 @@ http->entry = NULL; } if (r->method == METHOD_PURGE) { - clientPurgeRequest(http); + clientPurgeRequest(context); return; } if (clientOnlyIfCached(http)) { - clientProcessOnlyIfCachedMiss(http); + clientProcessOnlyIfCachedMiss(context); return; } /* @@ -526,12 +581,12 @@ http->al.http.code = HTTP_FORBIDDEN; err = clientBuildError(ERR_ACCESS_DENIED, HTTP_FORBIDDEN, NULL, &http->conn->peer.sin_addr, http->request); - http->entry = clientCreateStoreEntry(http, r->method, null_request_flags); + http->entry = clientCreateStoreEntry(context, r->method, null_request_flags); errorAppendEntry(http->entry, err); return; } assert(http->out.offset == 0); - http->entry = clientCreateStoreEntry(http, r->method, r->flags); + http->entry = clientCreateStoreEntry(context, r->method, r->flags); if (http->redirect.status) { HttpReply *rep = httpReplyCreate(); #if LOG_TCP_REDIRECTS @@ -556,8 +611,9 @@ * respond with a 504 (Gateway Timeout) as suggested in [RFC 2068] */ static void -clientProcessOnlyIfCachedMiss(clientHttpRequest * http) +clientProcessOnlyIfCachedMiss(clientReplyContext *context) { + clientHttpRequest * http = context->http; char *url = http->uri; request_t *r = http->request; ErrorState *err = NULL; @@ -571,13 +627,15 @@ http->sc = NULL; storeUnlockObject(http->entry); } - http->entry = clientCreateStoreEntry(http, r->method, null_request_flags); + http->entry = clientCreateStoreEntry(context, r->method, null_request_flags); errorAppendEntry(http->entry, err); } void -clientPurgeRequest(clientHttpRequest * http) +clientPurgeRequest(void *data) { + clientReplyContext *context = data; + clientHttpRequest * http = context->http; StoreEntry *entry; ErrorState *err = NULL; HttpReply *r; @@ -588,7 +646,7 @@ http->log_type = LOG_TCP_DENIED; err = clientBuildError(ERR_ACCESS_DENIED, HTTP_FORBIDDEN, NULL, &http->conn->peer.sin_addr, http->request); - http->entry = clientCreateStoreEntry(http, http->request->method, null_request_flags); + http->entry = clientCreateStoreEntry(context, http->request->method, null_request_flags); errorAppendEntry(http->entry, err); return; } @@ -618,7 +676,7 @@ HTTP_REQBUF_SZ, http->reqbuf, clientCacheHit, - http); + context); return; } } @@ -659,7 +717,7 @@ * Make a new entry to hold the reply to be written * to the client. */ - http->entry = clientCreateStoreEntry(http, http->request->method, null_request_flags); + http->entry = clientCreateStoreEntry(context, http->request->method, null_request_flags); httpReplyReset(r = http->entry->mem_obj->reply); httpBuildVersion(&version, 1, 0); httpReplySetHeaders(r, version, status, NULL, NULL, 0, 0, -1); @@ -761,6 +819,9 @@ * fd is either -1, or an open fd. * * TODO: enumify this + * + * This function is used by any http request sink, to determine the status + * of the object. */ int clientDetermineActionAfterWrite(int fd, clientHttpRequest const *http, StoreEntry *entry, size_t size) @@ -965,8 +1026,133 @@ rep = NULL; } return rep; -} +} + +static log_type +clientIdentifyStoreObject(clientHttpRequest * http) +{ + request_t *r = http->request; + StoreEntry *e; + if (r->flags.cachable || r->flags.internal) + e = http->entry = storeGetPublicByRequest(r); + else + e = http->entry = NULL; + /* Release negatively cached IP-cache entries on reload */ + if (r->flags.nocache) + ipcacheInvalidate(r->host); +#if HTTP_VIOLATIONS + else if (r->flags.nocache_hack) + ipcacheInvalidate(r->host); +#endif +#if USE_CACHE_DIGESTS + http->lookup_type = e ? "HIT" : "MISS"; +#endif + if (NULL == e) { + /* this object isn't in the cache */ + debug(85, 3) ("clientProcessRequest2: storeGet() MISS\n"); + return LOG_TCP_MISS; + } + if (Config.onoff.offline) { + debug(85, 3) ("clientProcessRequest2: offline HIT\n"); + http->entry = e; + return LOG_TCP_HIT; + } + if (http->redirect.status) { + /* force this to be a miss */ + http->entry = NULL; + return LOG_TCP_MISS; + } + if (!storeEntryValidToSend(e)) { + debug(85, 3) ("clientProcessRequest2: !storeEntryValidToSend MISS\n"); + http->entry = NULL; + return LOG_TCP_MISS; + } + if (EBIT_TEST(e->flags, ENTRY_SPECIAL)) { + /* Special entries are always hits, no matter what the client says */ + debug(85, 3) ("clientProcessRequest2: ENTRY_SPECIAL HIT\n"); + http->entry = e; + return LOG_TCP_HIT; + } +#if HTTP_VIOLATIONS + if (e->store_status == STORE_PENDING) { + if (r->flags.nocache || r->flags.nocache_hack) { + debug(85, 3) ("Clearing no-cache for STORE_PENDING request\n\t%s\n", + storeUrl(e)); + r->flags.nocache = 0; + r->flags.nocache_hack = 0; + } + } +#endif + if (r->flags.nocache) { + debug(85, 3) ("clientProcessRequest2: no-cache REFRESH MISS\n"); + http->entry = NULL; + return LOG_TCP_CLIENT_REFRESH_MISS; + } + /* We don't cache any range requests (for now!) -- adrian */ + if (r->flags.range) { + http->entry = NULL; + return LOG_TCP_MISS; + } + debug(85, 3) ("clientProcessRequest2: default HIT\n"); + http->entry = e; + return LOG_TCP_HIT; +} +/* Request more data from the store for the client Stream + * This is *the* entry point to this module. + * + * Preconditions: + * This is the head of the list. + * There is at least one more node. + * data context is not null + */ +void +clientGetMoreData (clientStreamNode *this, clientHttpRequest *http) +{ + clientStreamNode *next; + clientReplyContext *context; + /* Test preconditions */ + assert (this != NULL); + assert (cbdataReferenceValid (this)); + assert (this->data != NULL); + assert (this->node.prev == NULL); + assert (this->node.next != NULL); + + context = this->data; + next = this->node.next->data; + if (!context->ourNode) + context->ourNode = this; /* no cbdatareference, this is only used once, and safely */ + if (!context->flags.storelogiccomplete) { + http->log_type = clientIdentifyStoreObject(http); + /* We still have to do store logic processing - vary, cache hit etc */ + if (context->http->entry != NULL) { + /* someone found the object in the cache for us */ + storeLockObject(context->http->entry); + storeCreateMemObject(context->http->entry, context->http->uri, context->http->log_uri); + context->http->entry->mem_obj->method = context->http->request->method; + context->http->sc = storeClientListAdd(context->http->entry, context->http); +#if DELAY_POOLS + delaySetStoreClient(context->http->sc, delayClient(context->http)); +#endif + assert(context->http->log_type == LOG_TCP_HIT); + context->http->reqofs = 0; + assert (http->out.offset == http->out.size && http->out.offset == 0); + storeClientCopy(http->sc, http->entry, + http->out.offset, + HTTP_REQBUF_SZ, + http->reqbuf, + clientCacheHit, + context); + } else { + /* MISS CASE, http->log_type is already set! */ + clientProcessMiss(context); + } + } + else + storeClientCopy(http->sc, http->entry, next->readoff + context->headers_sz, + next->readlen, next->readbuf, + clientSendMoreData, context); +} /* * accepts chunk of a http message in buf, parses prefix, filters headers and @@ -975,7 +1161,8 @@ void clientSendMoreData(void *data, char *retbuf, ssize_t retsize) { - clientHttpRequest *http = data; + clientReplyContext *context = data; + clientHttpRequest *http = context->http; StoreEntry *entry = http->entry; ConnStateData *conn = http->conn; int fd = conn ? conn->fd : -1; @@ -984,24 +1171,22 @@ const char *body_buf = buf; ssize_t size = http->reqofs + retsize; ssize_t body_size = size; + + /* We've got the final data to start pushing... */ + context->flags.storelogiccomplete = 1; debug(88, 5) ("clientSendMoreData: %s, %d bytes (%d new bytes)\n", http->uri, (int) size, retsize); assert(size <= HTTP_REQBUF_SZ); assert(http->request != NULL); /* ESI TODO: remove this assert once everything is stable */ - assert(http->stream_head && cbdataReferenceValid(http->stream_head)); + assert(http->stream_head.head->data && cbdataReferenceValid(http->stream_head.head->data)); dlinkDelete(&http->active, &ClientActiveRequests); dlinkAdd(http, &http->active, &ClientActiveRequests); debug(88, 5) ("clientSendMoreData: FD %d '%s', out.offset=%ld \n", fd, storeUrl(entry), (long int) http->out.offset); /* update size of the request */ http->reqsize = size; - /* ESI sub-requests go through regardless */ - if (conn && conn->chr != http && !http->flags.esi) { - /* there is another object in progress, defer this one */ - debug(88, 2) ("clientSendMoreData: Deferring %s\n", storeUrl(entry)); - return; - } else if (http->request->flags.reset_tcp) { + if (http->request->flags.reset_tcp) { if (fd != -1) comm_reset_close(fd); return; @@ -1013,7 +1198,7 @@ /* We call into the stream, because we don't know that there is a * client socket! */ - http->stream_head->func (http->stream_head, http, NULL, NULL, 0); + clientStreamCallback (http->stream_head.head->data, http, NULL, NULL, 0); /* clientWriteComplete(fd, NULL, 0, COMM_OK, http); */ return; } @@ -1021,7 +1206,7 @@ * HEAD and may not be true for pipelining. * */ if (http->out.offset != 0) { - http->stream_head->func (http->stream_head, http, NULL, buf, size); + clientStreamCallback (http->stream_head.head->data, http, NULL, buf, size); return; } /* handle headers */ @@ -1044,12 +1229,13 @@ storeUnregister(http->sc, http->entry, http); http->sc = NULL; storeUnlockObject(http->entry); - http->entry = clientCreateStoreEntry(http, http->request->method, + http->entry = clientCreateStoreEntry(context, http->request->method, null_request_flags); errorAppendEntry(http->entry, err); httpReplyDestroy(rep); return; } + context->headers_sz = rep->hdr_sz; body_size = size - rep->hdr_sz; assert(body_size >= 0); body_buf = buf + rep->hdr_sz; @@ -1075,16 +1261,17 @@ storeUnregister(http->sc, http->entry, http); http->sc = NULL; storeUnlockObject(http->entry); - http->entry = clientCreateStoreEntry(http, http->request->method, + http->entry = clientCreateStoreEntry(context, http->request->method, null_request_flags); errorAppendEntry(http->entry, err); httpReplyDestroy(rep); return; } /* ESI TODO: replace me with a true ESI header check */ - if (http->flags.accel && strstr (http->uri, "esi")) { + if (http->flags.accel && rep->sline.status != HTTP_FORBIDDEN && + !clientAlwaysAllowResponse(rep->sline.status) && strstr (http->uri, "esi")) { debug(88, 0) ("Enabling ESI processing for %s\n", http->uri); - clientStreamInsertHead (&http->stream_head, esiProcessStream, NULL); + clientStreamInsertHead (&http->stream_head, esiStreamRead, esiProcessStream, NULL); } } else if (size < HTTP_REQBUF_SZ && entry->store_status == STORE_PENDING) { /* wait for more to arrive */ @@ -1095,7 +1282,7 @@ HTTP_REQBUF_SZ - http->reqofs, http->reqbuf + http->reqofs, clientSendMoreData, - http); + context); return; } if (http->request->method == METHOD_HEAD) { @@ -1117,7 +1304,7 @@ } } assert(rep || (body_buf && body_size)); - http->stream_head->func (http->stream_head, http, rep, body_buf, body_size); + clientStreamCallback (http->stream_head.head->data, http, rep, body_buf, body_size); } int @@ -1145,10 +1332,15 @@ EBIT_TEST(r->cache_control->mask, CC_ONLY_IF_CACHED); } +/* Using this breaks the client layering just a little! + */ StoreEntry * -clientCreateStoreEntry(clientHttpRequest * h, method_t m, request_flags flags) +clientCreateStoreEntry(void *data, method_t m, request_flags flags) { + clientReplyContext *context = data; + clientHttpRequest *h = context->http; StoreEntry *e; + assert (h != NULL); /* * For erroneous requests, we might not have a h->request, * so make a fake one. @@ -1165,8 +1357,17 @@ /* I don't think this is actually needed! -- adrian */ /* h->reqbuf = h->norm_reqbuf; */ assert(h->reqbuf == h->norm_reqbuf); - storeClientCopy(h->sc, e, 0, HTTP_REQBUF_SZ, h->reqbuf, - clientSendMoreData, h); + /* The next line is illegal because we don't know if the client stream + * buffers have been set up + */ +// storeClientCopy(h->sc, e, 0, HTTP_REQBUF_SZ, h->reqbuf, +// clientSendMoreData, context); + /* So, we mark the store logic as complete */ + context->flags.storelogiccomplete = 1; + /* and get the caller to request a read, from whereever they are */ + /* NOTE: after ANY data flows down the pipe, even one step, + * this function CAN NOT be used to manage errors + */ return e; } Index: squid/src/client_side_request.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/client_side_request.c,v retrieving revision 1.1.2.3 retrieving revision 1.1.2.4 diff -u -r1.1.2.3 -r1.1.2.4 --- squid/src/client_side_request.c 11 Aug 2002 09:51:14 -0000 1.1.2.3 +++ squid/src/client_side_request.c 14 Aug 2002 11:16:06 -0000 1.1.2.4 @@ -1,6 +1,6 @@ /* - * $Id: client_side_request.c,v 1.1.2.3 2002/08/11 09:51:14 rbcollins Exp $ + * $Id: client_side_request.c,v 1.1.2.4 2002/08/14 11:16:06 rbcollins Exp $ * * DEBUG: section 85 Client-side Request Routines * AUTHOR: Robert Collins (Originall Duane Wessels in client_side.c) @@ -33,6 +33,14 @@ * */ + +/* General logic of request processing: + * + * We run a series of tests to determine if access will be permitted, + * and to do any redirection. Then we call into the result clientStream + * to retrieve data. From that point on it's up to reply management. + */ + #include "squid.h" #if LINGERING_CLOSE @@ -50,16 +58,12 @@ static RH clientRedirectDone; static void clientCheckNoCache(clientHttpRequest *); static void clientCheckNoCacheDone(int answer, void *data); -static log_type clientProcessRequest2(clientHttpRequest * http); void clientProcessRequest(clientHttpRequest *); - -/* client reply functions - TODO: put in protos.h? */ -extern STCB clientCacheHit; -extern void clientProcessMiss(clientHttpRequest *); +extern CSR clientGetMoreData; /* Create a request and kick it off */ int /* returns nonzero on failure */ -clientBeginRequest (method_t method, char const *url, CSR *streamfunc, void *streamdata, HttpHeader const *header) +clientBeginRequest (method_t method, char const *url, CSCB *streamcallback, void *streamdata, HttpHeader const *header) { size_t url_sz; http_version_t http_ver = {1,0}; @@ -71,9 +75,8 @@ /* this is only used to adjust the connection offset in client_side.c */ http->req_sz = 0; http->reqbuf = http->norm_reqbuf; - /* stream terminator */ - http->stream_head = NULL; - clientStreamInsertHead (&http->stream_head, streamfunc, streamdata); + /* client stream setup */ + clientStreamInit (&http->stream_head, clientGetMoreData, clientReplyNewContext (http), streamcallback, streamdata); /* make it visible in the 'current acctive requests list' */ dlinkAdd(http, &http->active, &ClientActiveRequests); /* Set flags */ @@ -196,6 +199,7 @@ http->redirect_state = REDIRECT_PENDING; redirectStart(http, clientRedirectDone, http); } else { + clientStreamNode *node = http->stream_head.tail->prev->data; debug(85, 5) ("Access Denied: %s\n", http->uri); debug(85, 5) ("AclMatchedName = %s\n", AclMatchedName ? AclMatchedName : ""); @@ -209,7 +213,7 @@ */ page_id = aclGetDenyInfoPage(&Config.denyInfoList, AclMatchedName); http->log_type = LOG_TCP_DENIED; - http->entry = clientCreateStoreEntry(http, http->request->method, + http->entry = clientCreateStoreEntry(node->data, http->request->method, null_request_flags); if (answer == ACCESS_REQ_PROXY_AUTH || aclIsProxyAuth(AclMatchedName)) { if (!http->flags.accel) { @@ -494,76 +498,6 @@ clientProcessRequest(http); } -static log_type -clientProcessRequest2(clientHttpRequest * http) -{ - request_t *r = http->request; - StoreEntry *e; - if (r->flags.cachable || r->flags.internal) - e = http->entry = storeGetPublicByRequest(r); - else - e = http->entry = NULL; - /* Release negatively cached IP-cache entries on reload */ - if (r->flags.nocache) - ipcacheInvalidate(r->host); -#if HTTP_VIOLATIONS - else if (r->flags.nocache_hack) - ipcacheInvalidate(r->host); -#endif -#if USE_CACHE_DIGESTS - http->lookup_type = e ? "HIT" : "MISS"; -#endif - if (NULL == e) { - /* this object isn't in the cache */ - debug(85, 3) ("clientProcessRequest2: storeGet() MISS\n"); - return LOG_TCP_MISS; - } - if (Config.onoff.offline) { - debug(85, 3) ("clientProcessRequest2: offline HIT\n"); - http->entry = e; - return LOG_TCP_HIT; - } - if (http->redirect.status) { - /* force this to be a miss */ - http->entry = NULL; - return LOG_TCP_MISS; - } - if (!storeEntryValidToSend(e)) { - debug(85, 3) ("clientProcessRequest2: !storeEntryValidToSend MISS\n"); - http->entry = NULL; - return LOG_TCP_MISS; - } - if (EBIT_TEST(e->flags, ENTRY_SPECIAL)) { - /* Special entries are always hits, no matter what the client says */ - debug(85, 3) ("clientProcessRequest2: ENTRY_SPECIAL HIT\n"); - http->entry = e; - return LOG_TCP_HIT; - } -#if HTTP_VIOLATIONS - if (e->store_status == STORE_PENDING) { - if (r->flags.nocache || r->flags.nocache_hack) { - debug(85, 3) ("Clearing no-cache for STORE_PENDING request\n\t%s\n", - storeUrl(e)); - r->flags.nocache = 0; - r->flags.nocache_hack = 0; - } - } -#endif - if (r->flags.nocache) { - debug(85, 3) ("clientProcessRequest2: no-cache REFRESH MISS\n"); - http->entry = NULL; - return LOG_TCP_CLIENT_REFRESH_MISS; - } - /* We don't cache any range requests (for now!) -- adrian */ - if (r->flags.range) { - http->entry = NULL; - return LOG_TCP_MISS; - } - debug(85, 3) ("clientProcessRequest2: default HIT\n"); - http->entry = e; - return LOG_TCP_HIT; -} - void clientProcessRequest(clientHttpRequest * http) { @@ -579,11 +513,13 @@ sslStart(http, &http->out.size, &http->al.http.code); return; } else if (r->method == METHOD_PURGE) { - clientPurgeRequest(http); + clientStreamNode *node = http->stream_head.tail->prev->data; + clientPurgeRequest(node->data); return; } else if (r->method == METHOD_TRACE) { if (r->max_forwards == 0) { - http->entry = clientCreateStoreEntry(http, r->method, null_request_flags); + clientStreamNode *node = http->stream_head.tail->prev->data; + http->entry = clientCreateStoreEntry(node->data, r->method, null_request_flags); storeReleaseRequest(http->entry); storeBuffer(http->entry); rep = httpReplyCreate(); @@ -599,36 +535,16 @@ /* yes, continue */ http->log_type = LOG_TCP_MISS; } else { - http->log_type = clientProcessRequest2(http); + http->log_type = LOG_TAG_NONE; } debug(85, 4) ("clientProcessRequest: %s for '%s'\n", log_tags[http->log_type], http->uri); - /* Here is where we start requesting data - from the cache or from the network - * TODO: Split the function here, and this point on goes to client_side_reply.c - */ - http->out.offset = 0; - if (NULL != http->entry) { - storeLockObject(http->entry); - storeCreateMemObject(http->entry, http->uri, http->log_uri); - http->entry->mem_obj->method = r->method; - http->sc = storeClientListAdd(http->entry, http); -#if DELAY_POOLS - delaySetStoreClient(http->sc, delayClient(http)); -#endif - assert(http->log_type == LOG_TCP_HIT); - http->reqofs = 0; - debug(0, 0) ("clientProcessRequest: Will be checking for ESI flags here\n"); - storeClientCopy(http->sc, http->entry, - http->out.offset, - HTTP_REQBUF_SZ, - http->reqbuf, - clientCacheHit, - http); - } else { - /* MISS CASE, http->log_type is already set! */ - clientProcessMiss(http); - } + /* no one should have touched this */ + assert (http->out.offset == 0); + /* Use the Stream Luke */ + clientStreamRead (http->stream_head.tail->data, http, 0, HTTP_REQBUF_SZ, + http->reqbuf); } /* Widely used utility functions */ Index: squid/src/protos.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/protos.h,v retrieving revision 1.59.2.3 retrieving revision 1.59.2.4 diff -u -r1.59.2.3 -r1.59.2.4 --- squid/src/protos.h 11 Aug 2002 09:51:14 -0000 1.59.2.3 +++ squid/src/protos.h 14 Aug 2002 11:16:06 -0000 1.59.2.4 @@ -1,6 +1,6 @@ /* - * $Id: protos.h,v 1.59.2.3 2002/08/11 09:51:14 rbcollins Exp $ + * $Id: protos.h,v 1.59.2.4 2002/08/14 11:16:06 rbcollins Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -137,26 +137,33 @@ extern void clientAccessCheck(void *); extern void clientAccessCheckDone(int, void *); extern char *clientConstructTraceEcho(clientHttpRequest *); -extern void clientPurgeRequest(clientHttpRequest *); +/* The first parameter is supplied by clientReplyNewContext () */ +extern void clientPurgeRequest(void *); extern void clientOpenListenSockets(void); extern void clientHttpConnectionsClose(void); -extern StoreEntry *clientCreateStoreEntry(clientHttpRequest *, method_t, request_flags); +/* The first parameter is supplied by clientReplyNewContext () */ +extern StoreEntry *clientCreateStoreEntry(void *, method_t, request_flags); extern int isTcpHit(log_type); extern void clientReadBody(request_t * req, char *buf, size_t size, CBCB * callback, void *data); extern int clientAbortBody(request_t * req); +extern void httpRequestFree (void *); /* client_side_request.c - client side request related routines (pure logic) */ extern int clientDetermineActionAfterWrite(int fd, clientHttpRequest const *http, StoreEntry *entry, size_t size); extern STCB clientSendMoreData; extern ErrorState *clientBuildError (err_type, http_status, char const *, struct in_addr *, request_t *); -extern int clientBeginRequest (method_t, char const *, CSR *, void *, HttpHeader const *); +extern int clientBeginRequest (method_t, char const *, CSCB *, void *, HttpHeader const *); /* client_side_reply.c - client side reply related routines (pure logic, no comms) */ extern int clientCheckTransferDone(clientHttpRequest const *); +extern void *clientReplyNewContext (clientHttpRequest *); /* clientStream.c */ -extern void clientStreamInsertHead (clientStreamNode **, CSR *, void *); -extern clientStreamNode *clientStreamNew (CSR *, void *); +extern void clientStreamInit (dlink_list *, CSR *, void *, CSCB *, void *); +extern void clientStreamInsertHead (dlink_list *, CSR *, CSCB *, void *); +extern clientStreamNode *clientStreamNew (CSR *, CSCB *, void *); +extern void clientStreamCallback (clientStreamNode *, clientHttpRequest *, HttpReply *, const char *, ssize_t); +extern void clientStreamRead (clientStreamNode *, clientHttpRequest *, off_t, size_t, char *); extern int commSetNonBlocking(int fd); extern int commUnsetNonBlocking(int fd); @@ -311,8 +318,8 @@ extern void whoisStart(FwdState *); /* ESI.c */ -extern CSR esiProcessStream; -extern CSR esiBufferRecipient; +extern CSR esiStreamRead; +extern CSCB esiProcessStream; /* http.c */ extern int httpCachable(method_t); @@ -1162,6 +1169,7 @@ /* tools.c */ extern void dlinkAdd(void *data, dlink_node *, dlink_list *); +extern void dlinkAddAfter (void *, dlink_node *, dlink_node *, dlink_list *); extern void dlinkAddTail(void *data, dlink_node *, dlink_list *); extern void dlinkDelete(dlink_node * m, dlink_list * list); extern void dlinkNodeDelete(dlink_node * m); Index: squid/src/store_client.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/store_client.c,v retrieving revision 1.13 retrieving revision 1.13.4.1 diff -u -r1.13 -r1.13.4.1 --- squid/src/store_client.c 21 Apr 2002 21:56:26 -0000 1.13 +++ squid/src/store_client.c 14 Aug 2002 11:16:06 -0000 1.13.4.1 @@ -1,6 +1,6 @@ /* - * $Id: store_client.c,v 1.13 2002/04/21 21:56:26 squidadm Exp $ + * $Id: store_client.c,v 1.13.4.1 2002/08/14 11:16:06 rbcollins Exp $ * * DEBUG: section 20 Storage Manager Client-Side Interface * AUTHOR: Duane Wessels @@ -492,9 +492,14 @@ #if STORE_CLIENT_LIST_DEBUG assert(sc == storeClientListSearch(e->mem_obj, data)); #endif +#ifndef SILLY_CODE + assert(sc); +#endif assert(sc->entry == e); +#if SILLY_CODE if (sc == NULL) return 0; +#endif if (sc->callback == NULL) return 0; return 1; Index: squid/src/structs.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/structs.h,v retrieving revision 1.61.2.1 retrieving revision 1.61.2.2 diff -u -r1.61.2.1 -r1.61.2.2 --- squid/src/structs.h 9 Aug 2002 11:49:45 -0000 1.61.2.1 +++ squid/src/structs.h 14 Aug 2002 11:16:06 -0000 1.61.2.2 @@ -1,6 +1,6 @@ /* - * $Id: structs.h,v 1.61.2.1 2002/08/09 11:49:45 rbcollins Exp $ + * $Id: structs.h,v 1.61.2.2 2002/08/14 11:16:06 rbcollins Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -1047,9 +1047,14 @@ }; struct _clientStreamNode { - CSR *func; - void *data; - clientStreamNode *next; + dlink_node node; + dlink_list *head; /* sucks I know, but hey, the interface is limited */ + CSR *readfunc; + CSCB *callback; + void *data; /* Context for the node */ + char *readbuf; /* where *this* node wants its data returned; */ + size_t readlen; /* how much data *this* node can handle */ + off_t readoff; /* where *this* node wants it's data read from in the stream */ }; struct _clientHttpRequest { @@ -1085,8 +1090,16 @@ unsigned int done_copying:1; unsigned int purging:1; unsigned int esi:1; /* Invoke the ESI processor on the body */ + unsigned int deferred:1; /* This is a pipelined request waiting for the + * current object to complete */ } flags; struct { + clientStreamNode *node; + HttpReply *rep; + const char *body_data; + ssize_t body_size; + } deferredparams; + struct { http_status status; char *location; } redirect; @@ -1098,7 +1111,7 @@ int reqsize; /* ESI TODO: build an appropriate management struct and link here */ int remaining_esi_requests; - clientStreamNode *stream_head; + dlink_list stream_head; }; struct _ConnStateData { Index: squid/src/tools.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/tools.c,v retrieving revision 1.24 retrieving revision 1.24.4.1 diff -u -r1.24 -r1.24.4.1 --- squid/src/tools.c 27 Apr 2002 22:56:33 -0000 1.24 +++ squid/src/tools.c 14 Aug 2002 11:16:06 -0000 1.24.4.1 @@ -1,6 +1,6 @@ /* - * $Id: tools.c,v 1.24 2002/04/27 22:56:33 squidadm Exp $ + * $Id: tools.c,v 1.24.4.1 2002/08/14 11:16:06 rbcollins Exp $ * * DEBUG: section 21 Misc Functions * AUTHOR: Harvest Derived @@ -807,6 +807,21 @@ } void +dlinkAddAfter (void *data, dlink_node *m, dlink_node *n, dlink_list *list) +{ + m->data = data; + m->prev = n; + m->next = n->next; + if (n->next) + n->next->prev = m; + else { + assert (list->tail == n); + list->tail = m; + } + n->next = m; +} + +void dlinkAddTail(void *data, dlink_node * m, dlink_list * list) { m->data = data; Index: squid/src/typedefs.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/typedefs.h,v retrieving revision 1.27.2.1 retrieving revision 1.27.2.2 diff -u -r1.27.2.1 -r1.27.2.2 --- squid/src/typedefs.h 9 Aug 2002 11:49:45 -0000 1.27.2.1 +++ squid/src/typedefs.h 14 Aug 2002 11:16:06 -0000 1.27.2.2 @@ -1,6 +1,6 @@ /* - * $Id: typedefs.h,v 1.27.2.1 2002/08/09 11:49:45 rbcollins Exp $ + * $Id: typedefs.h,v 1.27.2.2 2002/08/14 11:16:06 rbcollins Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -199,8 +199,10 @@ #endif /* client_side.c callbacks and callforwards */ -/* client stream recipient */ -typedef void CSR (clientStreamNode *, clientHttpRequest *, HttpReply *, const char *, ssize_t); +/* client stream read callback */ +typedef void CSCB (clientStreamNode *, clientHttpRequest *, HttpReply *, const char *, ssize_t); +/* client stream read */ +typedef void CSR (clientStreamNode *, clientHttpRequest *); typedef void CWCB(int fd, char *, size_t size, int flag, void *data); typedef void CNCB(int fd, int status, void *);