--------------------- PatchSet 6276 Date: 2004/08/17 21:23:02 Author: hno Branch: cerberian Tag: (none) Log: Imported last version from MARA Members: src/helper.c:1.16.2.10.4.2->1.16.2.10.4.3 src/structs.h:1.48.2.20.2.2->1.48.2.20.2.3 Index: squid/src/helper.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/helper.c,v retrieving revision 1.16.2.10.4.2 retrieving revision 1.16.2.10.4.3 diff -u -r1.16.2.10.4.2 -r1.16.2.10.4.3 --- squid/src/helper.c 17 Aug 2004 21:17:13 -0000 1.16.2.10.4.2 +++ squid/src/helper.c 17 Aug 2004 21:23:02 -0000 1.16.2.10.4.3 @@ -1,6 +1,6 @@ /* - * $Id: helper.c,v 1.16.2.10.4.2 2004/08/17 21:17:13 hno Exp $ + * $Id: helper.c,v 1.16.2.10.4.3 2004/08/17 21:23:02 hno Exp $ * * DEBUG: section 84 Helper process maintenance * AUTHOR: Harvest Derived? @@ -192,11 +192,9 @@ srv->index = k; srv->rfd = rfd; srv->wfd = wfd; - /* XXX srv->rbuf should really be a memAllocBuf(), but thisis 2.5.. */ - srv->rbuf = memAllocate(MEM_8K_BUF); - srv->rbuf_sz = 8192; - srv->roffset = 0; - srv->requests = xcalloc(hlp->concurrency ? hlp->concurrency : 1, sizeof(*srv->requests)); + srv->buf = memAllocate(MEM_8K_BUF); + srv->buf_sz = 8192; + srv->offset = 0; srv->parent = hlp; if (hlp->datapool != NULL) srv->data = memPoolAlloc(hlp->datapool); @@ -215,10 +213,6 @@ if (wfd != rfd) commSetNonBlocking(wfd); comm_add_close_handler(rfd, helperStatefulServerFree, srv); - commSetSelect(srv->rfd, - COMM_SELECT_READ, - helperStatefulHandleRead, - srv, 0); } hlp->last_restart = squid_curtime; safe_free(shortname); @@ -262,31 +256,6 @@ if (buf) r->buf = xstrdup(buf); cbdataLock(r->data); -<<<<<<< helper.c - if ((buf != NULL) && lastserver) { - debug(84, 5) ("StatefulSubmit with lastserver %p\n", lastserver); - /* the queue doesn't count for this assert because queued requests - * have already gone through here and been tested. - * It's legal to have deferred_requests == 0 and queue entries - * and status of S_HELPEER_DEFERRED. - * BUT: It's not legal to submit a new request w/lastserver in - * that state. - */ - assert(!(lastserver->deferred_requests == 0 && - lastserver->flags.reserved == S_HELPER_DEFERRED)); - if (lastserver->flags.reserved != S_HELPER_RESERVED) { - lastserver->stats.submits++; - lastserver->deferred_requests--; - } - /* XXX This needs to remember last slot */ - if (!(lastserver->stats.pending)) { - debug(84, 5) ("StatefulSubmit dispatching\n"); - helperStatefulDispatch(lastserver, r); - } else { - debug(84, 5) ("StatefulSubmit queuing\n"); - StatefulServerEnqueue(lastserver, r); - } -======= if (!srv) srv = helperStatefulGetServer(hlp); if (srv) { @@ -294,7 +263,6 @@ assert(!srv->request); assert(!srv->flags.busy); helperStatefulDispatch(srv, r); ->>>>>>> 1.16.2.13 } else { debug(84, 9) ("helperStatefulSubmit: enqueued, buf '%s'.\n", buf ? buf : "NULL"); StatefulEnqueue(hlp, r); @@ -329,15 +297,15 @@ statefulhelper *hlp = srv->parent; helper_stateful_request *r; debug(84, 5) ("helperStatefulReset: %p\n", srv); - int slot = 0; /* XXX Needs to know which slot to reset */ - r = srv->requests[slot]; + r = srv->request; if (r != NULL) { /* reset attempt DURING an outstaning request */ debug(84, 1) ("helperStatefulReset: RESET During request %s \n", hlp->id_name); - srv->stats.pending--; - srv->requests[slot] = NULL; + srv->flags.busy = 0; + srv->offset = 0; helperStatefulRequestFree(r); + srv->request = NULL; } srv->flags.busy = 0; srv->flags.reserved = 0; @@ -424,7 +392,9 @@ void helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp) { + helper_stateful_server *srv; dlink_node *link; + double tt; storeAppendPrintf(sentry, "number running: %d of %d\n", hlp->n_running, hlp->n_to_start); storeAppendPrintf(sentry, "requests sent: %d\n", @@ -436,7 +406,7 @@ storeAppendPrintf(sentry, "avg service time: %d msec\n", hlp->stats.avg_svc_time); storeAppendPrintf(sentry, "\n"); - storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\t%s\n", + storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n", "#", "FD", "PID", @@ -446,23 +416,21 @@ "Offset", "Request"); for (link = hlp->servers.head; link; link = link->next) { - helper_stateful_server *srv = link->data; - double tt = srv->requests[0] ? 0.001 * - tvSubMsec(srv->requests[0]->dispatch_time, current_time) : 0.0; + srv = link->data; + tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time); storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n", srv->index + 1, srv->rfd, srv->pid, srv->stats.uses, srv->flags.alive ? 'A' : ' ', - srv->stats.pending ? 'B' : ' ', + srv->flags.busy ? 'B' : ' ', srv->flags.closing ? 'C' : ' ', srv->flags.reserved ? 'R' : ' ', srv->flags.shutdown ? 'S' : ' ', - srv->requests[0] ? (srv->requests[0]->placeholder ? 'P' : ' ') : ' ', tt < 0.0 ? 0.0 : tt, - (int) srv->roffset, - srv->requests[0] ? log_quote(srv->requests[0]->buf) : "(none)"); + (int) srv->offset, + srv->request ? log_quote(srv->request->buf) : "(none)"); } storeAppendPrintf(sentry, "\nFlags key:\n\n"); storeAppendPrintf(sentry, " A = ALIVE\n"); @@ -523,7 +491,7 @@ continue; } srv->flags.shutdown = 1; /* request it to shut itself down */ - if (srv->stats.pending) { + if (srv->flags.busy) { debug(84, 3) ("helperStatefulShutdown: %s #%d is BUSY.\n", hlp->id_name, srv->index + 1); continue; @@ -646,26 +614,17 @@ helper_stateful_server *srv = data; statefulhelper *hlp = srv->parent; helper_stateful_request *r; - int i, concurrency = hlp->concurrency; - if (!concurrency) - concurrency = 1; assert(srv->rfd == fd); - if (srv->rbuf) { - /* XXX srv->rbuf should really be a memAllocBuf(), but thisis 2.5.. */ - memFree(srv->rbuf, MEM_8K_BUF); - srv->rbuf = NULL; + if (srv->buf) { + memFree(srv->buf, MEM_8K_BUF); + srv->buf = NULL; } - if (!memBufIsNull(&srv->wqueue)) - memBufClean(&srv->wqueue); - for (i = 0; i < concurrency; i++) { - if ((r = srv->requests[i])) { - if (cbdataValid(r->data)) - r->callback(r->data, srv, NULL); - helperStatefulRequestFree(r); - srv->requests[i] = NULL; - } + if ((r = srv->request)) { + if (cbdataValid(r->data)) + r->callback(r->data, srv, srv->buf); + helperStatefulRequestFree(r); + srv->request = NULL; } - safe_free(srv->requests); /* TODO: walk the local queue of requests and carry them all out */ if (srv->wfd != srv->rfd && srv->wfd != -1) comm_close(srv->wfd); @@ -733,7 +692,6 @@ char *msg = srv->rbuf; int i = 0; /* end of reply found */ -<<<<<<< helper.c *t++ = '\0'; debug(84, 3) ("helperHandleRead: end of reply found: %s\n", srv->rbuf); if (hlp->concurrency) { @@ -762,24 +720,6 @@ } if (srv->flags.shutdown && !srv->stats.pending) { comm_close(fd); -======= - debug(84, 3) ("helperHandleRead: end of reply found\n"); - *t = '\0'; - srv->flags.busy = 0; - srv->offset = 0; - srv->request = NULL; - hlp->stats.replies++; - srv->answer_time = current_time; - hlp->stats.avg_svc_time = - intAverage(hlp->stats.avg_svc_time, - tvSubUsec(srv->dispatch_time, current_time), - hlp->stats.replies, REDIRECT_AV_FACTOR); - if (cbdataValid(r->data)) - r->callback(r->data, srv->buf); - helperRequestFree(r); - if (!srv->flags.shutdown) - helperKickQueue(hlp); ->>>>>>> 1.16.2.13 } else { helperKickQueue(hlp); } @@ -791,126 +731,49 @@ int len; char *t = NULL; helper_stateful_server *srv = data; + helper_stateful_request *r; statefulhelper *hlp = srv->parent; assert(fd == srv->rfd); assert(cbdataValid(data)); statCounter.syscalls.sock.reads++; - /* XXX srv->rbuf should really be a memAllocBuf(), but thisis 2.5.. */ - assert(srv->roffset < srv->rbuf_sz); - len = FD_READ_METHOD(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset); + len = FD_READ_METHOD(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset); fd_bytes(fd, len, FD_READ); debug(84, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n", len, hlp->id_name, srv->index + 1); - if (len == 0) { - comm_close(fd); - return; - } - commSetSelect(fd, COMM_SELECT_READ, helperStatefulHandleRead, srv, 0); - if (len < 0) { - if (!ignoreErrno(errno)) { + if (len <= 0) { + if (len < 0) debug(84, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd, xstrerror()); - comm_close(fd); - } + comm_close(fd); return; } - srv->roffset += len; - srv->rbuf[srv->roffset] = '\0'; - if (!srv->stats.pending) { + srv->offset += len; + srv->buf[srv->offset] = '\0'; + r = srv->request; + if (r == NULL) { /* someone spoke without being spoken to */ debug(84, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n", hlp->id_name, srv->index + 1, len); - srv->roffset = 0; - srv->rbuf[srv->roffset] = '\0'; - } - while ((t = strchr(srv->rbuf, '\n'))) { - helper_stateful_request *r; - char *msg = srv->rbuf; - int i = 0; + srv->offset = 0; + } else if ((t = strchr(srv->buf, '\n'))) { + /* end of reply found */ debug(84, 3) ("helperStatefulHandleRead: end of reply found\n"); -<<<<<<< helper.c - *t++ = '\0'; - if (hlp->concurrency) { - i = strtol(msg, &msg, 10); - while (*msg && isspace(*msg)) - msg++; - } - r = srv->requests[i]; - srv->requests[i] = NULL; - if (cbdataValid(r->data)) { - switch ((r->callback(r->data, srv, msg))) { /*if non-zero reserve helper */ - case S_HELPER_UNKNOWN: - fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n"); - break; - case S_HELPER_RELEASE: /* helper finished with */ - if (!srv->deferred_requests && !srv->queue.head) { - srv->flags.reserved = S_HELPER_FREE; - if ((srv->parent->OnEmptyQueue != NULL) && (srv->data)) - srv->parent->OnEmptyQueue(srv->data); - debug(84, 5) ("StatefulHandleRead: releasing %s #%d\n", hlp->id_name, srv->index + 1); - } else { - srv->flags.reserved = S_HELPER_DEFERRED; - debug(84, 5) ("StatefulHandleRead: outstanding deferred requests on %s #%d. reserving for deferred requests.\n", hlp->id_name, srv->index + 1); - } - break; - case S_HELPER_RESERVE: /* 'pin' this helper for the caller */ - if (!srv->queue.head) { - assert(srv->deferred_requests == 0); - srv->flags.reserved = S_HELPER_RESERVED; - debug(84, 5) ("StatefulHandleRead: reserving %s #%d\n", hlp->id_name, srv->index + 1); - } else { - fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n"); - } - break; - case S_HELPER_DEFER: - /* the helper is still needed, but can - * be used for other requests in the meantime. - */ - srv->flags.reserved = S_HELPER_DEFERRED; - srv->deferred_requests++; - srv->stats.deferbycb++; - debug(84, 5) ("StatefulHandleRead: reserving %s #%d for deferred requests.\n", hlp->id_name, srv->index + 1); - break; - default: - fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n"); - } - - } else { - debug(84, 1) ("StatefulHandleRead: no callback data registered\n"); - } - srv->roffset -= (t - srv->rbuf); - memmove(srv->rbuf, t, srv->roffset); - srv->stats.pending--; -======= *t = '\0'; srv->flags.busy = 0; srv->offset = 0; srv->request = NULL; ->>>>>>> 1.16.2.13 hlp->stats.replies++; hlp->stats.avg_svc_time = intAverage(hlp->stats.avg_svc_time, - tvSubMsec(r->dispatch_time, current_time), + tvSubMsec(srv->dispatch_time, current_time), hlp->stats.replies, REDIRECT_AV_FACTOR); -<<<<<<< helper.c - helperStatefulRequestFree(r); - } - if (srv->flags.shutdown - && srv->flags.reserved == S_HELPER_FREE - && !srv->deferred_requests && !srv->stats.pending) { - comm_close(fd); -======= if (cbdataValid(r->data)) { r->callback(r->data, srv, srv->buf); } else { debug(84, 1) ("StatefulHandleRead: no callback data registered\n"); } helperStatefulRequestFree(r); ->>>>>>> 1.16.2.13 } else { - if (srv->queue.head) - helperStatefulServerKickQueue(srv); - else - helperStatefulKickQueue(hlp); + commSetSelect(srv->rfd, COMM_SELECT_READ, helperStatefulHandleRead, srv, 0); } } @@ -1028,39 +891,24 @@ StatefulGetFirstAvailable(statefulhelper * hlp) { dlink_node *n; - helper_stateful_server *srv; - helper_stateful_server *selected = NULL; + helper_stateful_server *srv = NULL; debug(84, 5) ("StatefulGetFirstAvailable: Running servers %d.\n", hlp->n_running); if (hlp->n_running == 0) return NULL; - /* Find "least" loaded helper (approx) */ for (n = hlp->servers.head; n != NULL; n = n->next) { srv = n->data; - if (selected && selected->stats.pending <= srv->stats.pending) + if (srv->flags.busy) continue; if (srv->flags.reserved) continue; if (!srv->flags.alive) continue; - if (srv->flags.shutdown) - continue; if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data))) continue; - if (selected) { - selected = srv; - break; - } - selected = srv; - if (!selected->stats.pending) - break; + return srv; } - /* Check for overload */ - if (!selected) - return NULL; - if (selected->stats.pending >= (hlp->concurrency ? hlp->concurrency : 1)) - return NULL; - - return selected; + debug(84, 5) ("StatefulGetFirstAvailable: None available.\n"); + return NULL; } @@ -1144,31 +992,9 @@ } static void -helperStatefulDispatch_done(int fd, char *buf, size_t size, int status, void *data) -{ - helper_stateful_server *srv = data; - if (status != COMM_OK) { - /* Helper server has crashed.. */ - debug(84, 0)("ERROR: Helper on fd %d has crashed!\n", fd); - } else if (!memBufIsNull(&srv->wqueue)) { - MemBuf mb = srv->wqueue; - srv->wqueue = MemBufNull; - srv->flags.writing = 1; - comm_write_mbuf(srv->wfd, - mb, - helperDispatch_done, /* Handler */ - srv); - } else { - srv->flags.writing = 0; /* done */ - } -} - -static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r) { statefulhelper *hlp = srv->parent; - helper_stateful_request **ptr = NULL; - int slot; if (!cbdataValid(r->data)) { debug(84, 1) ("helperStatefulDispatch: invalid callback data\n"); helperStatefulRequestFree(r); @@ -1181,56 +1007,8 @@ debug(84, 1) ("helperStatefulDispatch: no callback data registered\n"); } helperStatefulRequestFree(r); -<<<<<<< helper.c -#if 0 - /* and push the queue. Note that the callback may have submitted a new - * request to the helper which is why we test for the request*/ - if (srv->request == NULL) { - if (srv->flags.shutdown - && srv->flags.reserved == S_HELPER_FREE - && !srv->deferred_requests) { - int wfd = srv->wfd; - srv->wfd = -1; - comm_close(wfd); - } else { - if (srv->queue.head) - helperStatefulServerKickQueue(srv); - else - helperStatefulKickQueue(hlp); - } - } -#endif -======= ->>>>>>> 1.16.2.13 return; } -<<<<<<< helper.c - for (slot = 0; slot < (hlp->concurrency ? hlp->concurrency : 1); slot++) { - if (!srv->requests[slot]) { - ptr = &srv->requests[slot]; - break; - } - } - assert(ptr); - *ptr = r; - srv->stats.pending++; - r->dispatch_time = current_time; - if (memBufIsNull(&srv->wqueue)) - memBufDefInit(&srv->wqueue); - if (hlp->concurrency) - memBufPrintf(&srv->wqueue, "%d %s", slot, r->buf); - else - memBufAppend(&srv->wqueue, r->buf, strlen(r->buf)); - if (!srv->flags.writing) { - MemBuf mb = srv->wqueue; - srv->wqueue = MemBufNull; - srv->flags.writing = 1; - comm_write_mbuf(srv->wfd, - mb, - helperStatefulDispatch_done, - srv); - } -======= debug(84, 9) ("helperStatefulDispatch busying helper %s #%d\n", hlp->id_name, srv->index + 1); srv->flags.busy = 1; srv->request = r; @@ -1245,7 +1023,6 @@ COMM_SELECT_READ, helperStatefulHandleRead, srv, 0); ->>>>>>> 1.16.2.13 debug(84, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n", hlp->id_name, srv->index + 1, (int) strlen(r->buf)); srv->stats.uses++; Index: squid/src/structs.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/structs.h,v retrieving revision 1.48.2.20.2.2 retrieving revision 1.48.2.20.2.3 diff -u -r1.48.2.20.2.2 -r1.48.2.20.2.3 --- squid/src/structs.h 17 Aug 2004 21:17:16 -0000 1.48.2.20.2.2 +++ squid/src/structs.h 17 Aug 2004 21:23:02 -0000 1.48.2.20.2.3 @@ -1,6 +1,6 @@ /* - * $Id: structs.h,v 1.48.2.20.2.2 2004/08/17 21:17:16 hno Exp $ + * $Id: structs.h,v 1.48.2.20.2.3 2004/08/17 21:23:02 hno Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -2086,16 +2086,17 @@ int pid; int rfd; int wfd; - MemBuf wqueue; - char *rbuf; - size_t rbuf_sz; - off_t roffset; + char *buf; + size_t buf_sz; + off_t offset; + struct timeval dispatch_time; + struct timeval answer_time; dlink_node link; statefulhelper *parent; - helper_stateful_request **requests; + helper_stateful_request *request; struct _helper_stateful_flags { - unsigned int writing:1; unsigned int alive:1; + unsigned int busy:1; unsigned int closing:1; unsigned int shutdown:1; unsigned int reserved:1;