This patch is generated from the tidyup_deferred_reads branch of s2_5 in squid Wed Dec 14 03:13:58 2005 GMT See http://devel.squid-cache.org/ Index: squid/acconfig.h diff -u squid/acconfig.h:1.13.2.4 squid/acconfig.h:1.13.2.4.12.1 --- squid/acconfig.h:1.13.2.4 Wed Jun 9 07:05:51 2004 +++ squid/acconfig.h Thu Nov 3 01:14:53 2005 @@ -402,6 +402,11 @@ */ #undef X_ACCELERATOR_VARY +/* + * Supports epoll + */ +#undef HAVE_EPOLL + @BOTTOM@ #endif /* __CONFIGURE_H__ */ Index: squid/configure.in diff -u squid/configure.in:1.42.2.77 squid/configure.in:1.42.2.59.6.2 --- squid/configure.in:1.42.2.77 Thu Sep 22 19:13:14 2005 +++ squid/configure.in Thu Nov 3 01:14:53 2005 @@ -704,6 +704,25 @@ esac ]) +dnl Enable epoll() +AC_ARG_ENABLE(epoll, +[ --enable-epoll Enable epoll(). epoll() is best where available, but + explicitly set at the moment. + --disable-epoll Disable the use of epoll().], +[ + case "$enableval" in + yes) + echo "Forcing epoll() to be enabled" + ac_cv_func_epoll='yes' + ;; + no) + echo "Forcing epoll() to be disabled" + ac_cv_func_epoll='no' + ;; + esac +]) + + dnl Disable HTTP violations AC_ARG_ENABLE(http-violations, [ --disable-http-violations @@ -1670,6 +1689,21 @@ AC_CHECK_LIB(rt, aio_read) fi +LIB_EPOLL='' +dnl Check for libepoll +if test "$ac_cv_func_epoll" = "yes"; then + AC_CHECK_FUNCS(epoll_ctl,[AC_DEFINE_UNQUOTED(HAVE_EPOLL, 1)], + [AC_CHECK_LIB(epoll, epoll_ctl,[AC_DEFINE_UNQUOTED(HAVE_EPOLL, 1)] LIB_EPOLL="-lepoll", + [echo "Error - no epoll found"; + echo "Try running 'sh ./scripts/get_epoll-lib.sh'"; + echo "then run configure again"; + exit -1], + [-L ./lib] + )] + ) +fi +AC_SUBST(LIB_EPOLL) + dnl -lintl is needed on SCO version 3.2v4.2 for strftime() dnl Robert Side dnl Mon, 18 Jan 1999 17:48:00 GMT Index: squid/scripts/get_epoll-lib.sh diff -u /dev/null squid/scripts/get_epoll-lib.sh:1.1.4.2 --- /dev/null Thu Jan 1 01:00:00 1970 +++ squid/scripts/get_epoll-lib.sh Thu Nov 3 01:14:54 2005 @@ -0,0 +1,76 @@ +#!/bin/bash + +set -e + 
+EPOLL_URL="http://www.xmailserver.org/linux-patches/epoll-lib-0.11.tar.gz" +EPOLL_FILE="epoll-lib-0.11.tar.gz" +EPOLL_DIR="epoll-lib-0.11" + +KERNELSOURCE=$1 + +if [ "$0" != "./scripts/get_epoll-lib.sh" ] +then + echo + echo "You must run this program from the root squid source directory" + echo "ie /usr/src/squid-2.5.STABLE9" + echo "to run type:" + echo + echo "sh ./scripts/get_epoll-lib.sh" + echo + echo + echo + exit -1; +fi + +if [ ! -f $EPOLL_FILE ] +then + if [ ! -x "`which wget`" ] + then + echo + echo "This script uses wget to download the source file" + echo "Please either install wget, or download libepoll from:" + echo "$EPOLL_URL" + echo + echo + echo + exit -1 + fi + + wget $EPOLL_URL +fi + +if [ ! -d $EPOLL_DIR ] +then + tar -zxvf $EPOLL_FILE +fi + +pushd $EPOLL_DIR + set +e + if [ -z "$KERNELSOURCE" ] + then + make lib/libepoll.a PREFIX=.. + else + make lib/libepoll.a PREFIX=.. KERNELDIR=$KERNELSOURCE + fi + + if [ $? -ne 0 ] + then + echo + echo "epoll make failed" + echo "You may need to run $0 /usr/src/linux-2.6" + echo "(or give the correct path to a 2.6 kernel source)" + echo + popd + exit -1 + fi + + if [ ! -d ../include/sys ] + then + set -e + mkdir ../include/sys + set +e + fi + + make install PREFIX=.. 
2>/dev/null +popd + Index: squid/src/Makefile.am diff -u squid/src/Makefile.am:1.13.2.10 squid/src/Makefile.am:1.13.2.9.12.2 --- squid/src/Makefile.am:1.13.2.10 Fri Apr 22 19:15:35 2005 +++ squid/src/Makefile.am Thu Nov 3 01:14:54 2005 @@ -227,6 +227,7 @@ @SNMPLIB@ \ @LIB_MALLOC@ \ @SSLLIB@ \ + @LIB_EPOLL@ \ -lmiscutil \ @XTRA_LIBS@ Index: squid/src/client.c diff -u squid/src/client.c:1.8.6.7 squid/src/client.c:1.8.6.5.14.2 --- squid/src/client.c:1.8.6.7 Wed Mar 30 18:22:59 2005 +++ squid/src/client.c Sun Oct 9 03:18:35 2005 @@ -54,6 +54,7 @@ static struct stat sb; int total_bytes = 0; int io_timeout = 120; +int opt_delayread = 0; static void usage(const char *progname) @@ -63,6 +64,7 @@ "Options:\n" " -P file PUT request.\n" " -a Do NOT include Accept: header.\n" + " -d Delay each read() by one second.\n" " -r Force cache to reload URL.\n" " -s Silent. Do not print data to stdout.\n" " -v Verbose. Print outgoing message to stderr.\n" @@ -128,11 +130,14 @@ url[BUFSIZ - 1] = '\0'; if (url[0] == '-') usage(argv[0]); - while ((c = getopt(argc, argv, "ah:l:P:i:km:p:rsvt:g:p:I:H:T:u:U:w:W:?")) != -1) + while ((c = getopt(argc, argv, "adh:l:P:i:km:p:rsvt:g:p:I:H:T:u:U:w:W:?")) != -1) switch (c) { case 'a': opt_noaccept = 1; break; + case 'd': + opt_delayread = 1; + break; case 'h': /* remote host */ if (optarg != NULL) hostname = optarg; @@ -494,6 +499,8 @@ myread(int fd, void *buf, size_t len) { alarm(io_timeout); + if (opt_delayread == 1) + sleep(1); return read(fd, buf, len); } Index: squid/src/client_side.c diff -u squid/src/client_side.c:1.47.2.70 squid/src/client_side.c:1.47.2.59.4.11 --- squid/src/client_side.c:1.47.2.70 Thu Sep 15 19:13:25 2005 +++ squid/src/client_side.c Wed Oct 26 02:49:59 2005 @@ -142,7 +142,6 @@ static int clientCachable(clientHttpRequest * http); static int clientHierarchical(clientHttpRequest * http); static int clientCheckContentLength(request_t * r); -static DEFER httpAcceptDefer; static log_type clientProcessRequest2(clientHttpRequest * 
http); static int clientReplyBodyTooLarge(clientHttpRequest *, squid_off_t clen); static int clientRequestBodyTooLarge(squid_off_t clen); @@ -2157,7 +2156,7 @@ ConnStateData *conn = http->conn; StoreEntry *entry; debug(33, 3) ("clientKeepaliveNextRequest: FD %d\n", conn->fd); - conn->defer.until = 0; /* Kick it to read a new request */ + commUnHalfClose(conn->fd); httpRequestFree(http); if ((http = conn->chr) == NULL) { debug(33, 5) ("clientKeepaliveNextRequest: FD %d reading next req\n", @@ -2174,7 +2173,7 @@ * blocking!. */ #ifdef _SQUID_CYGWIN_ - commSetSelect(conn->fd, COMM_SELECT_READ, clientReadRequest, conn, 0); + commSetSelect(conn->fd, COMM_SELECT_READ, clientReadRequest, conn, 0); #else clientReadRequest(conn->fd, conn); /* Read next request */ #endif @@ -2958,7 +2957,7 @@ if (conn->body.size_left && !F->flags.socket_eof) return conn->in.offset >= conn->in.size - 1; else - return conn->defer.until > squid_curtime; + return 0; } static void @@ -2977,6 +2976,11 @@ fde *F = &fd_table[fd]; int len = conn->in.size - conn->in.offset - 1; debug(33, 4) ("clientReadRequest: FD %d: reading request...\n", fd); + if (clientReadDefer(conn->fd, conn)) { + /* We're reading too fast - back off for a second */ + commDeferReadUntil(conn->fd, 1, clientReadRequest, conn); + return; + } commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, conn, 0); if (len == 0) { /* Grow the request memory area to accomodate for a large request */ @@ -3023,10 +3027,7 @@ } /* It might be half-closed, we can't tell */ debug(33, 5) ("clientReadRequest: FD %d closed?\n", fd); - F->flags.socket_eof = 1; - conn->defer.until = squid_curtime + 1; - conn->defer.n++; - fd_note(fd, "half-closed"); + commHalfClose(fd, 1, clientReadRequest, conn); /* There is one more close check at the end, to detect aborted * (partial) requests. At this point we can't tell if the request * is partial. 
@@ -3050,6 +3051,11 @@ cbdataUnlock(conn); return; } + if (clientReadDefer(conn->fd, conn)) { + /* We're reading too fast - back off for a second */ + commDeferReadUntil(conn->fd, 1, clientReadRequest, conn); + return; + } } /* Process next request */ while (conn->in.offset > 0 && conn->body.size_left == 0) { @@ -3068,7 +3074,7 @@ if (nrequests >= (Config.onoff.pipeline_prefetch ? 2 : 1)) { debug(33, 3) ("clientReadRequest: FD %d max concurrent requests reached\n", fd); debug(33, 5) ("clientReadRequest: FD %d defering new request until one is done\n", fd); - conn->defer.until = squid_curtime + 100; /* Reset when a request is complete */ + commDeferReadUntil(fd, 100, clientReadRequest, conn); break; } conn->in.buf[conn->in.offset] = '\0'; /* Terminate the string */ @@ -3215,7 +3221,7 @@ } if (request->method == METHOD_CONNECT) { /* Stop reading requests... */ - commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); + commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); if (conn->chr == http) clientAccessCheck(http); else { @@ -3458,7 +3464,7 @@ /* * Aha, but we don't want a read handler! */ - commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); + commSetSelect(conn->fd, COMM_SELECT_READ, NULL, NULL, 0); } #else /* @@ -3486,16 +3492,16 @@ } static int -httpAcceptDefer(int fdunused, void *dataunused) +httpCanAccept(int fd, void *dataunused) { static time_t last_warn = 0; if (fdNFree() >= RESERVED_FD) - return 0; + return 1; if (last_warn + 15 < squid_curtime) { debug(33, 0) ("WARNING! Your cache is running out of filedescriptors\n"); last_warn = squid_curtime; } - return 1; + return 0; } /* Handle a new connection on HTTP socket. */ @@ -3512,8 +3518,11 @@ #if USE_IDENT static aclCheck_t identChecklist; #endif - commSetSelect(sock, COMM_SELECT_READ, httpAccept, NULL, 0); - while (max-- && !httpAcceptDefer(sock, NULL)) { + while (max--) { + if (! 
httpCanAccept(sock, NULL)) { + commDeferReadUntil(sock, 1, httpAccept, NULL); + return; + } memset(&peer, '\0', sizeof(struct sockaddr_in)); memset(&me, '\0', sizeof(struct sockaddr_in)); if ((fd = comm_accept(sock, &peer, &me)) < 0) { @@ -3544,12 +3553,12 @@ if (aclCheckFast(Config.accessList.identLookup, &identChecklist)) identStart(&me, &peer, clientIdentDone, connState); #endif - commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, connState, 0); - commSetDefer(fd, clientReadDefer, connState); + commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, connState, 0); clientdbEstablished(peer.sin_addr, 1); assert(N); (*N)++; } + commSetSelect(sock, COMM_SELECT_READ, httpAccept, NULL, 0); } #if USE_SSL @@ -3590,7 +3599,6 @@ } else { debug(83, 5) ("clientNegotiateSSL: FD %d has no certificate.\n", fd); } - commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, conn, 0); } @@ -3618,8 +3626,11 @@ #if USE_IDENT static aclCheck_t identChecklist; #endif - commSetSelect(sock, COMM_SELECT_READ, httpsAccept, https_port, 0); - while (max-- && !httpAcceptDefer(sock, NULL)) { + while (max--) { + if (! httpCanAccept(sock, NULL)) { + commDeferReadUntil(sock, 1, httpsAccept, https_port); + return; + } memset(&peer, '\0', sizeof(struct sockaddr_in)); memset(&me, '\0', sizeof(struct sockaddr_in)); if ((fd = comm_accept(sock, &peer, &me)) < 0) { @@ -3663,10 +3674,10 @@ identStart(&me, &peer, clientIdentDone, connState); #endif commSetSelect(fd, COMM_SELECT_READ, clientNegotiateSSL, connState, 0); - commSetDefer(fd, clientReadDefer, connState); clientdbEstablished(peer.sin_addr, 1); (*N)++; } + commSetSelect(sock, COMM_SELECT_READ, httpsAccept, https_port, 0); } #endif /* USE_SSL */ @@ -3818,11 +3829,6 @@ continue; comm_listen(fd); commSetSelect(fd, COMM_SELECT_READ, httpAccept, NULL, 0); - /* - * We need to set a defer handler here so that we don't - * peg the CPU with select() when we hit the FD limit. 
- */ - commSetDefer(fd, httpAcceptDefer, NULL); debug(1, 1) ("Accepting HTTP connections at %s, port %d, FD %d.\n", inet_ntoa(s->s.sin_addr), (int) ntohs(s->s.sin_port), @@ -3859,7 +3865,6 @@ https_port->sslContext = sslCreateContext(s->cert, s->key, s->version, s->cipher, s->options); comm_listen(fd); commSetSelect(fd, COMM_SELECT_READ, httpsAccept, https_port, 0); - commSetDefer(fd, httpAcceptDefer, NULL); debug(1, 1) ("Accepting HTTPS connections at %s, port %d, FD %d.\n", inet_ntoa(s->s.sin_addr), (int) ntohs(s->s.sin_port), Index: squid/src/comm.c diff -u squid/src/comm.c:1.18.6.6 squid/src/comm.c:1.18.6.5.6.7 --- squid/src/comm.c:1.18.6.6 Sat Sep 10 19:13:22 2005 +++ squid/src/comm.c Thu Nov 3 01:14:54 2005 @@ -73,32 +73,32 @@ static int commRetryConnect(ConnectStateData * cs); CBDATA_TYPE(ConnectStateData); -static MemPool *comm_write_pool = NULL; static MemPool *conn_close_pool = NULL; static void CommWriteStateCallbackAndFree(int fd, int code) { - CommWriteStateData *CommWriteState = fd_table[fd].rwstate; - CWCB *callback = NULL; - void *data; - fd_table[fd].rwstate = NULL; - if (CommWriteState == NULL) - return; - if (CommWriteState->free_func) { - FREE *free_func = CommWriteState->free_func; - void *free_buf = CommWriteState->buf; - CommWriteState->free_func = NULL; - CommWriteState->buf = NULL; - free_func(free_buf); - } - callback = CommWriteState->handler; - data = CommWriteState->handler_data; - CommWriteState->handler = NULL; + CommWriteStateData CommWriteState = fd_table[fd].rwstate; + CWCB *callback = CommWriteState.handler; + void *data = CommWriteState.handler_data; + size_t offset = CommWriteState.offset; + FREE *free_func = CommWriteState.free_func; + void *buf = CommWriteState.buf; + + if (callback == NULL) + return; + + fd_table[fd].rwstate.handler = NULL; + + if (free_func != NULL) { + free_func(buf); + buf = NULL; + free_func = NULL; + } + if (callback && cbdataValid(data)) - callback(fd, CommWriteState->buf, CommWriteState->offset, code, 
data); + callback(fd, buf, offset, code, data); cbdataUnlock(data); - memPoolFree(comm_write_pool, CommWriteState); } /* Return the local port associated with fd. */ @@ -691,14 +691,7 @@ return x; } -void -commSetDefer(int fd, DEFER * func, void *data) -{ - fde *F = &fd_table[fd]; - F->defer_check = func; - F->defer_data = data; -} - +#if !HAVE_EPOLL void commSetSelect(int fd, unsigned int type, PF * handler, void *client_data, time_t timeout) { @@ -719,6 +712,7 @@ if (timeout) F->timeout = squid_curtime + timeout; } +#endif void comm_add_close_handler(int fd, PF * handler, void *data) @@ -869,7 +863,6 @@ * Since Squid_MaxFD can be as high as several thousand, don't waste them */ RESERVED_FD = XMIN(100, Squid_MaxFD / 4); CBDATA_INIT_TYPE(ConnectStateData); - comm_write_pool = memPoolCreate("CommWriteStateData", sizeof(CommWriteStateData)); conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler)); } @@ -938,15 +931,14 @@ void comm_write(int fd, const char *buf, int size, CWCB * handler, void *handler_data, FREE * free_func) { - CommWriteStateData *state = fd_table[fd].rwstate; + CommWriteStateData *state = &fd_table[fd].rwstate; debug(5, 5) ("comm_write: FD %d: sz %d: hndl %p: data %p.\n", fd, size, handler, handler_data); - if (NULL != state) { - debug(5, 1) ("comm_write: fd_table[%d].rwstate != NULL\n", fd); - memPoolFree(comm_write_pool, state); - fd_table[fd].rwstate = NULL; + + if (state->handler != NULL) { + debug(5, 1) ("comm_write: fd_table[%d].rwstate.handler != NULL\n", fd); + fd_table[fd].rwstate.handler = NULL; } - fd_table[fd].rwstate = state = memPoolAlloc(comm_write_pool); state->buf = (char *) buf; state->size = size; state->offset = 0; @@ -1015,3 +1007,47 @@ } } } + +/* + * Read half-close and defer related crack + */ +void +commHalfClose(int fd, int time, PF *handler, void *data) +{ + commDeferReadUntil(fd, time, handler, data); + fd_table[fd].defer.n++; + fd_table[fd].flags.socket_eof = 1; + fd_note(fd, "half-closed"); +} + +void 
+commDeferReadUntil(int fd, int time, PF *handler, void *data) +{ + fd_table[fd].defer.read_handler = handler; + fd_table[fd].defer.read_data = data; + fd_table[fd].defer.until = squid_curtime + time; + commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); +} + +void +commUnHalfClose(int fd) +{ + fd_table[fd].defer.until = 0; + fd_table[fd].flags.socket_eof = 0; + commSetSelect(fd, COMM_SELECT_READ, fd_table[fd].defer.read_handler, + fd_table[fd].defer.read_data, 0); + fd_table[fd].defer.read_handler = NULL; + fd_table[fd].defer.read_data = NULL; +} + +void +commCheckHalfClose(int fd) +{ + if (fd_table[fd].defer.until == 0) { + return; + } + if (fd_table[fd].defer.until < squid_curtime) { + commUnHalfClose(fd); + return; + } +} Index: squid/src/comm_select.c diff -u squid/src/comm_select.c:1.8.6.6 squid/src/comm_select.c:1.8.6.6.24.6 --- squid/src/comm_select.c:1.8.6.6 Sun May 11 19:14:21 2003 +++ squid/src/comm_select.c Thu Nov 3 19:49:10 2005 @@ -46,30 +46,33 @@ #define FD_MASK_BITS (FD_MASK_BYTES*NBBY) /* STATIC */ -#if !HAVE_POLL +#if HAVE_SELECT static int examine_select(fd_set *, fd_set *); #endif +#if HAVE_POLL || HAVE_SELECT static int fdIsHttp(int fd); static int fdIsIcp(int fd); static int fdIsDns(int fd); -static int commDeferRead(int fd); -static void checkTimeouts(void); static OBJH commIncomingStats; +#endif +static void checkTimeouts(void); #if HAVE_POLL static int comm_check_incoming_poll_handlers(int nfds, int *fds); static void comm_poll_dns_incoming(void); -#else +#elif HAVE_SELECT static int comm_check_incoming_select_handlers(int nfds, int *fds); static void comm_select_dns_incoming(void); #endif -#if !HAVE_POLL +#if HAVE_SELECT static struct timeval zero_tv; #endif +#if HAVE_POLL || HAVE_SELECT static fd_set global_readfds; static fd_set global_writefds; static int nreadfds; static int nwritefds; +#endif /* * Automatic tuning for incoming requests: @@ -122,25 +125,19 @@ #define MAX_INCOMING_INTEGER 256 #define INCOMING_FACTOR 5 #define 
MAX_INCOMING_INTERVAL (MAX_INCOMING_INTEGER << INCOMING_FACTOR) +#if HAVE_POLL || HAVE_SELECT static int icp_io_events = 0; static int dns_io_events = 0; static int http_io_events = 0; static int incoming_icp_interval = 16 << INCOMING_FACTOR; static int incoming_dns_interval = 16 << INCOMING_FACTOR; static int incoming_http_interval = 16 << INCOMING_FACTOR; +#endif #define commCheckICPIncoming (++icp_io_events > (incoming_icp_interval>> INCOMING_FACTOR)) #define commCheckDNSIncoming (++dns_io_events > (incoming_dns_interval>> INCOMING_FACTOR)) #define commCheckHTTPIncoming (++http_io_events > (incoming_http_interval>> INCOMING_FACTOR)) -static int -commDeferRead(int fd) -{ - fde *F = &fd_table[fd]; - if (F->defer_check == NULL) - return 0; - return F->defer_check(fd, F->defer_data); -} - +#if HAVE_POLL || HAVE_SELECT static int fdIsIcp(int fd) { @@ -169,32 +166,9 @@ } return 0; } - -#if DELAY_POOLS -static int slowfdcnt = 0; -static int slowfdarr[SQUID_MAXFD]; - -static void -commAddSlowFd(int fd) -{ - assert(slowfdcnt < SQUID_MAXFD); - slowfdarr[slowfdcnt++] = fd; -} - -static int -commGetSlowFd(void) -{ - int whichfd, retfd; - - if (!slowfdcnt) - return -1; - whichfd = squid_random() % slowfdcnt; - retfd = slowfdarr[whichfd]; - slowfdarr[whichfd] = slowfdarr[--slowfdcnt]; - return retfd; -} #endif + #if HAVE_POLL static int comm_check_incoming_poll_handlers(int nfds, int *fds) @@ -288,8 +262,6 @@ for (j = 0; j < NHttpSockets; j++) { if (HttpSockets[j] < 0) continue; - if (commDeferRead(HttpSockets[j])) - continue; fds[nfds++] = HttpSockets[j]; } nevents = comm_check_incoming_poll_handlers(nfds, fds); @@ -309,9 +281,6 @@ comm_poll(int msec) { struct pollfd pfds[SQUID_MAXFD]; -#if DELAY_POOLS - fd_set slowfds; -#endif PF *hdl = NULL; int fd; unsigned int i; @@ -331,9 +300,6 @@ #endif /* Handle any fs callbacks that need doing */ storeDirCallback(); -#if DELAY_POOLS - FD_ZERO(&slowfds); -#endif if (commCheckICPIncoming) comm_poll_icp_incoming(); if 
(commCheckDNSIncoming) @@ -349,21 +315,7 @@ events = 0; /* Check each open socket for a handler. */ if (fd_table[i].read_handler) { - switch (commDeferRead(i)) { - case 0: events |= POLLRDNORM; - break; - case 1: - break; -#if DELAY_POOLS - case -1: - events |= POLLRDNORM; - FD_SET(i, &slowfds); - break; -#endif - default: - fatalf("bad return value from commDeferRead(FD %d)\n", i); - } } if (fd_table[i].write_handler) events |= POLLWRNORM; @@ -436,10 +388,6 @@ debug(5, 6) ("comm_poll: FD %d ready for reading\n", fd); if (NULL == (hdl = F->read_handler)) (void) 0; -#if DELAY_POOLS - else if (FD_ISSET(fd, &slowfds)) - commAddSlowFd(fd); -#endif else { F->read_handler = NULL; hdl(fd, F->read_data); @@ -497,23 +445,6 @@ comm_poll_dns_incoming(); if (callhttp) comm_poll_http_incoming(); -#if DELAY_POOLS - while ((fd = commGetSlowFd()) != -1) { - fde *F = &fd_table[fd]; - debug(5, 6) ("comm_select: slow FD %d selected for reading\n", fd); - if ((hdl = F->read_handler)) { - F->read_handler = NULL; - hdl(fd, F->read_data); - statCounter.select_fds++; - if (commCheckICPIncoming) - comm_poll_icp_incoming(); - if (commCheckDNSIncoming) - comm_poll_dns_incoming(); - if (commCheckHTTPIncoming) - comm_poll_http_incoming(); - } - } -#endif #if !ALARM_UPDATES_TIME getCurrentTime(); statCounter.select_time += (current_dtime - start); @@ -525,7 +456,8 @@ return COMM_TIMEOUT; } -#else +#endif +#if HAVE_SELECT static int comm_check_incoming_select_handlers(int nfds, int *fds) @@ -622,8 +554,6 @@ for (j = 0; j < NHttpSockets; j++) { if (HttpSockets[j] < 0) continue; - if (commDeferRead(HttpSockets[j])) - continue; fds[nfds++] = HttpSockets[j]; } nevents = comm_check_incoming_select_handlers(nfds, fds); @@ -645,9 +575,6 @@ fd_set readfds; fd_set pendingfds; fd_set writefds; -#if DELAY_POOLS - fd_set slowfds; -#endif PF *hdl = NULL; int fd; int maxfd; @@ -704,20 +631,6 @@ continue; /* Found a set bit */ fd = (j * FD_MASK_BITS) + k; - switch (commDeferRead(fd)) { - case 0: - break; - 
case 1: - FD_CLR(fd, &readfds); - break; -#if DELAY_POOLS - case -1: - FD_SET(fd, &slowfds); - break; -#endif - default: - fatalf("bad return value from commDeferRead(FD %d)\n", fd); - } if (FD_ISSET(fd, &readfds) && fd_table[fd].flags.read_pending) { FD_SET(fd, &pendingfds); pending++; @@ -727,11 +640,7 @@ #if DEBUG_FDBITS for (i = 0; i < maxfd; i++) { /* Check each open socket for a handler. */ -#if DELAY_POOLS - if (fd_table[i].read_handler && commDeferRead(i) != 1) { -#else - if (fd_table[i].read_handler && !commDeferRead(i)) { -#endif + if (fd_table[i].read_handler) { assert(FD_ISSET(i, &readfds)); } if (fd_table[i].write_handler) { @@ -811,10 +720,6 @@ debug(5, 6) ("comm_select: FD %d ready for reading\n", fd); if (NULL == (hdl = F->read_handler)) (void) 0; -#if DELAY_POOLS - else if (FD_ISSET(fd, &slowfds)) - commAddSlowFd(fd); -#endif else { F->read_handler = NULL; commUpdateReadBits(fd, NULL); @@ -879,24 +784,6 @@ comm_select_dns_incoming(); if (callhttp) comm_select_http_incoming(); -#if DELAY_POOLS - while ((fd = commGetSlowFd()) != -1) { - F = &fd_table[fd]; - debug(5, 6) ("comm_select: slow FD %d selected for reading\n", fd); - if ((hdl = F->read_handler)) { - F->read_handler = NULL; - commUpdateReadBits(fd, NULL); - hdl(fd, F->read_data); - statCounter.select_fds++; - if (commCheckICPIncoming) - comm_select_icp_incoming(); - if (commCheckDNSIncoming) - comm_select_dns_incoming(); - if (commCheckHTTPIncoming) - comm_select_http_incoming(); - } - } -#endif #if !ALARM_UPDATES_TIME getCurrentTime(); statCounter.select_time += (current_dtime - start); @@ -907,55 +794,7 @@ debug(5, 8) ("comm_select: time out: %d\n", (int) squid_curtime); return COMM_TIMEOUT; } -#endif - -static void -#if HAVE_POLL -comm_poll_dns_incoming(void) -#else -comm_select_dns_incoming(void) -#endif -{ - int nfds = 0; - int fds[2]; - int nevents; - dns_io_events = 0; - if (DnsSocket < 0) - return; - fds[nfds++] = DnsSocket; -#if HAVE_POLL - nevents = 
comm_check_incoming_poll_handlers(nfds, fds); -#else - nevents = comm_check_incoming_select_handlers(nfds, fds); -#endif - if (nevents < 0) - return; - incoming_dns_interval += Config.comm_incoming.dns_average - nevents; - if (incoming_dns_interval < Config.comm_incoming.dns_min_poll) - incoming_dns_interval = Config.comm_incoming.dns_min_poll; - if (incoming_dns_interval > MAX_INCOMING_INTERVAL) - incoming_dns_interval = MAX_INCOMING_INTERVAL; - if (nevents > INCOMING_DNS_MAX) - nevents = INCOMING_DNS_MAX; - statHistCount(&statCounter.comm_dns_incoming, nevents); -} -void -comm_select_init(void) -{ -#if !HAVE_POLL - zero_tv.tv_sec = 0; - zero_tv.tv_usec = 0; -#endif - cachemgrRegister("comm_incoming", - "comm_incoming() stats", - commIncomingStats, 0, 1); - FD_ZERO(&global_readfds); - FD_ZERO(&global_writefds); - nreadfds = nwritefds = 0; -} - -#if !HAVE_POLL /* * examine_select - debug routine. * @@ -1023,6 +862,306 @@ } #endif +#if HAVE_POLL || HAVE_SELECT +static void +#if HAVE_POLL +comm_poll_dns_incoming(void) +#else +comm_select_dns_incoming(void) +#endif +{ + int nfds = 0; + int fds[2]; + int nevents; + dns_io_events = 0; + if (DnsSocket < 0) + return; + fds[nfds++] = DnsSocket; +#if HAVE_POLL + nevents = comm_check_incoming_poll_handlers(nfds, fds); +#else + nevents = comm_check_incoming_select_handlers(nfds, fds); +#endif + if (nevents < 0) + return; + incoming_dns_interval += Config.comm_incoming.dns_average - nevents; + if (incoming_dns_interval < Config.comm_incoming.dns_min_poll) + incoming_dns_interval = Config.comm_incoming.dns_min_poll; + if (incoming_dns_interval > MAX_INCOMING_INTERVAL) + incoming_dns_interval = MAX_INCOMING_INTERVAL; + if (nevents > INCOMING_DNS_MAX) + nevents = INCOMING_DNS_MAX; + statHistCount(&statCounter.comm_dns_incoming, nevents); +} +#endif + +#if HAVE_POLL || HAVE_SELECT +void +comm_select_init(void) +{ +#if !HAVE_POLL + zero_tv.tv_sec = 0; + zero_tv.tv_usec = 0; +#endif + cachemgrRegister("comm_incoming", + 
"comm_incoming() stats", + commIncomingStats, 0, 1); + FD_ZERO(&global_readfds); + FD_ZERO(&global_writefds); + nreadfds = nwritefds = 0; +} +#endif + +#if HAVE_SELECT +#endif + +#if HAVE_EPOLL + +/* Snarfed from Steven Wilton's epoll branch */ + +/* epoll structs */ +static int kdpfd; +static struct epoll_event *pevents; + +/* Array to keep track of backed off filedescriptors */ +static int backoff_fds[FD_SETSIZE]; + +static const char* +epolltype_atoi(int x) +{ + switch(x) { + + case EPOLL_CTL_ADD: + return "EPOLL_CTL_ADD"; + + case EPOLL_CTL_DEL: + return "EPOLL_CTL_DEL"; + + case EPOLL_CTL_MOD: + return "EPOLL_CTL_MOD"; + + default: + return "UNKNOWN_EPOLLCTL_OP"; + } +} + +void +comm_select_init() +{ + int i; + pevents = (struct epoll_event *) xmalloc(SQUID_MAXFD * sizeof(struct epoll_event)); + if (!pevents) { + fatalf("comm_select_init: xmalloc() failed: %s\n",xstrerror()); + } + + kdpfd = epoll_create(SQUID_MAXFD); + + if (kdpfd < 0) { + fatalf("comm_select_init: epoll_create(): %s\n",xstrerror()); + } + + for (i = 0; i < FD_SETSIZE; i++) { + backoff_fds[i]=0; + } +} + +void +commUpdateReadBits(int fd, PF *handler) +{ + /* Not imlpemented */ +} + +void +commUpdateWriteBits(int fd, PF *handler) +{ + /* Not imlpemented */ +} + +void +commSetSelect(int fd, unsigned int type, PF * handler, void *client_data, time_t timeout) + { + fde *F = &fd_table[fd]; + int epoll_ctl_type = 0; + struct epoll_event ev; + + assert(fd >= 0); + assert(F->flags.open); + debug(5, 8) ("commSetSelect(fd=%d,type=%u,handler=%p,client_data=%p,timeout=%ld)\n",fd,type,handler,client_data,timeout); + + ev.events = 0; + ev.data.fd = fd; + + if (type & COMM_SELECT_READ) { + if (handler) { + ev.events |= EPOLLIN; + } + + F->read_handler = handler; + F->read_data = client_data; + + // Otherwise, use previously stored value */ + } else if ((F->epoll_state & EPOLLIN) && (F->read_handler)) { + ev.events |= EPOLLIN; + } + + if (type & COMM_SELECT_WRITE) { + if (handler) { + ev.events |= EPOLLOUT; 
+ } + + F->write_handler = handler; + F->write_data = client_data; + + // Otherwise, use previously stored value + } else if ((F->epoll_state & EPOLLOUT) && (F->write_handler)){ + ev.events |= EPOLLOUT; + } + + if (ev.events) { + ev.events |= EPOLLHUP | EPOLLERR; + } + + /* If the type is 0, force adding the fd to the epoll set */ + if(!(type)) { + F->epoll_state=0; + } + + if (ev.events != F->epoll_state) { + // If the struct is already in epoll MOD or DEL, else ADD + if (F->epoll_state) { + epoll_ctl_type = ev.events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; + } + else + { + epoll_ctl_type = EPOLL_CTL_ADD; + } + + /* Update the state */ + F->epoll_state = ev.events; + + if (epoll_ctl(kdpfd, epoll_ctl_type, fd, &ev) < 0) { + debug(5, 1) ("commSetSelect: epoll_ctl(%s): failed on fd=%d: %s\n", + epolltype_atoi(epoll_ctl_type), fd, xstrerror()); + } + } + + if (timeout) + F->timeout = squid_curtime + timeout; +} + +int comm_epoll(int msec) +{ + struct timespec ts; + static time_t last_timeout = 0; + int i; + int num; + int fd; + fde *F; + PF *hdl; + struct epoll_event *cevents; + double timeout = current_dtime + (msec / 1000.0); + + if (msec > MAX_POLL_TIME) + msec = MAX_POLL_TIME; + + debug(50, 3)("comm_epoll: timeout %d\n", msec); + + do { +#if !ALARM_UPDATES_TIME + double start; + getCurrentTime(); + start = current_dtime; +#endif + ts.tv_sec = msec/1000; + ts.tv_nsec = (msec % 1000) * 1000000; + + /* Check timeouts once per second */ + if (last_timeout < squid_curtime) { + last_timeout = squid_curtime; + checkTimeouts(); + } + + /* Check for disk io callbacks */ + storeDirCallback(); + + for (;;) { + statCounter.syscalls.polls++; + num = epoll_wait(kdpfd, pevents, SQUID_MAXFD, msec); + statCounter.select_loops++; + + if (num >= 0) + break; + + if (ignoreErrno(errno)) + break; + + debug(5, 0) ("comm_epoll: epoll failure: %s\n", xstrerror()); + + return COMM_ERROR; + } + + statHistCount(&statCounter.select_fds_hist, num); + + if (num <= 0) + continue; + + for (i = 0, cevents = 
pevents; i < num; i++, cevents++) { + fd = cevents->data.fd; + F = &fd_table[fd]; + debug(5, 8) ("comm_epoll(): got fd=%d events=%x monitoring=%x F->read_handler=%p F->write_handler=%p\n" + ,fd,cevents->events,F->epoll_state,F->read_handler,F->write_handler); + if (cevents->events & (EPOLLIN|EPOLLHUP|EPOLLERR)) { + if((hdl = F->read_handler) != NULL) { + debug(5, 8) ("comm_epoll(): Calling read handler on fd=%d\n",fd); + F->read_handler = NULL; + hdl(fd, F->read_data); + statCounter.select_fds++; + if((F->read_handler == NULL) && (F->flags.open)) { + commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); + } + } else if(cevents->events & EPOLLIN) { + debug(5, 2) ("comm_epoll(): no read handler for fd=%d\n",fd); + if(F->flags.open) { + commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); + } + } + } + + if (cevents->events & (EPOLLOUT|EPOLLHUP|EPOLLERR)) { + if((hdl = F->write_handler) != NULL) { + debug(5,8) ("comm_epoll(): Calling write handler on fd=%d\n",fd); + F->write_handler = NULL; + hdl(fd, F->write_data); + statCounter.select_fds++; + if((F->write_handler == NULL) && (F->flags.open)) { + commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0); + } + } else if(cevents->events & EPOLLOUT) { + debug(5, 2) ("comm_epoll(): no write handler for fd=%d\n",fd); + if(F->flags.open) { + commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0); + } + } + } + } +#if !ALARM_UPDATES_TIME + getCurrentTime(); + statCounter.select_time += (current_dtime - start); +#endif + return COMM_OK; + } + while (timeout > current_dtime); + + debug(5, 8) ("comm_epoll: time out: %ld.\n", (long int) squid_curtime); + return COMM_TIMEOUT; +} +#endif /* HAVE_EPOLL */ + + + + +/* Common code follows */ + static void checkTimeouts(void) { @@ -1033,6 +1172,12 @@ F = &fd_table[fd]; if (!F->flags.open) continue; + /* re-kick read IO if we care about it */ + /* + * Note: this may register an IO and then have it cancelled below. + * It shouldn't be a problem, it just may be inefficient. 
+ */ + commCheckHalfClose(fd); if (F->timeout == 0) continue; if (F->timeout > squid_curtime) @@ -1050,6 +1195,7 @@ } } +#if HAVE_POLL || HAVE_SELECT static void commIncomingStats(StoreEntry * sentry) { @@ -1105,6 +1251,7 @@ nwritefds--; } } +#endif /* Called by async-io or diskd to speed up the polling */ void Index: squid/src/defines.h diff -u squid/src/defines.h:1.15.6.4 squid/src/defines.h:1.15.6.3.30.2 --- squid/src/defines.h:1.15.6.4 Fri Mar 25 19:15:56 2005 +++ squid/src/defines.h Thu Nov 3 19:49:12 2005 @@ -308,4 +308,6 @@ #define FILE_MODE(x) ((x)&(O_RDONLY|O_WRONLY|O_RDWR)) #endif +#define HAVE_SELECT ( (!HAVE_POLL) && (!HAVE_EPOLL) ) + #endif /* SQUID_DEFINES_H */ Index: squid/src/enums.h diff -u squid/src/enums.h:1.29.2.17 squid/src/enums.h:1.29.2.14.10.3 --- squid/src/enums.h:1.29.2.17 Tue Sep 20 17:59:13 2005 +++ squid/src/enums.h Thu Oct 6 20:23:11 2005 @@ -741,4 +741,11 @@ #endif +typedef enum { + STKICK_NONE, + STKICK_WAIT, + STKICK_FETCH, + STKICK_RUN +} store_kick_type_t; + #endif /* SQUID_ENUMS_H */ Index: squid/src/fd.c diff -u squid/src/fd.c:1.7.12.1 squid/src/fd.c:1.7.12.1.18.1 --- squid/src/fd.c:1.7.12.1 Sun Dec 14 19:13:47 2003 +++ squid/src/fd.c Thu Oct 13 02:14:35 2005 @@ -121,6 +121,10 @@ F->flags.open = 1; F->read_method = &default_read_method; F->write_method = &default_write_method; + F->defer.n = 0; + F->defer.until = 0; + F->defer.read_handler = NULL; + F->defer.read_data = NULL; fdUpdateBiggest(fd, 1); if (desc) xstrncpy(F->desc, desc, FD_DESC_SZ); Index: squid/src/forward.c diff -u squid/src/forward.c:1.13.6.15 squid/src/forward.c:1.13.6.13.6.2 --- squid/src/forward.c:1.13.6.15 Thu Sep 1 19:13:43 2005 +++ squid/src/forward.c Wed Oct 12 00:53:33 2005 @@ -658,49 +658,6 @@ peerSelect(r, e, fwdStartComplete, fwdState); } -int -fwdCheckDeferRead(int fd, void *data) -{ - StoreEntry *e = data; - MemObject *mem = e->mem_obj; - int rc = 0; - if (mem == NULL) - return 0; -#if URL_CHECKSUM_DEBUG - assert(e->mem_obj->chksum == 
url_checksum(e->mem_obj->url)); -#endif -#if DELAY_POOLS - if (fd < 0) - (void) 0; - else if (delayIsNoDelay(fd)) - (void) 0; - else { - int i = delayMostBytesWanted(mem, INT_MAX); - if (0 == i) - return 1; - /* was: rc = -(rc != INT_MAX); */ - else if (INT_MAX == i) - rc = 0; - else - rc = -1; - } -#endif - if (EBIT_TEST(e->flags, ENTRY_FWD_HDR_WAIT)) - return rc; - if (EBIT_TEST(e->flags, RELEASE_REQUEST)) { - /* Just a small safety cap to defer storing more data into the object - * if there already is way too much. This handles the case when there - * is disk clients pending on a too large object being fetched and a - * few other corner cases. - */ - if (mem->inmem_hi - mem->inmem_lo > SM_PAGE_SIZE + Config.Store.maxInMemObjSize + READ_AHEAD_GAP) - return 1; - } - if (mem->inmem_hi - storeLowestMemReaderOffset(e) > READ_AHEAD_GAP) - return 1; - return rc; -} - void fwdFail(FwdState * fwdState, ErrorState * errorState) { Index: squid/src/ftp.c diff -u squid/src/ftp.c:1.18.6.25 squid/src/ftp.c:1.18.6.20.4.6 --- squid/src/ftp.c:1.18.6.25 Sat Sep 10 19:13:22 2005 +++ squid/src/ftp.c Fri Oct 28 03:24:31 2005 @@ -128,6 +128,8 @@ FREE *freefunc; char *host; u_short port; + int pending_read; + store_kick_type_t pending_kick; } data; struct _ftp_flags flags; FwdState *fwd; @@ -274,6 +276,7 @@ ftpState->data.fd = -1; comm_close(fd); } + /* Don't need to call storeServerClearCallback - its done for us */ cbdataFree(ftpState); } @@ -914,6 +917,65 @@ ftpScheduleReadControlReply(ftpState, 1); } +/* + * The ftpScheduleData* functions check the data.fd and only schedule IO if + * we have a data FD. The store side doesn't know that our fd has closed + * from underneath us, only that it wants some more IO. + * The FTP code does close the data fd (and set it to -1) when the + * transfer has completed and this can result in IO 'kicking' when + * we don't have a data FD. 
+ */ +static void +ftpScheduleDataClear(FtpStateData *ftpState) +{ + debug(9, 3) ("ftpScheduleDataClear: FD %d\n", ftpState->data.fd); + ftpState->data.pending_read = 0; + if (ftpState->data.fd == -1) { + return; + } + commSetSelect(ftpState->data.fd, COMM_SELECT_READ, NULL, NULL, 0); +} + +#define ftpScheduleDataRead(a) ftpScheduleDataReadDebug((a), __FILE__, __LINE__) + +static void +ftpScheduleDataReadDebug(FtpStateData *ftpState, const char *file, int line) +{ + debug(9, 3) ("ftpScheduleDataRead: FD %d, from %s:%d\n", ftpState->data.fd, file, line); + if (ftpState->data.fd == -1) { + return; + } + ftpState->data.pending_read = 1; + commSetSelect(ftpState->data.fd, + COMM_SELECT_READ, + ftpDataRead, + ftpState, + Config.Timeout.read); +} + +static void +ftpDataReadMaybe(FtpStateData *ftpState) +{ + if (ftpState->data.pending_kick == STKICK_FETCH || ftpState->data.pending_kick == STKICK_RUN) + ftpScheduleDataRead(ftpState); +} + +void +ftpDataReadKick(StoreEntry *e, void *data, store_kick_type_t type) +{ + FtpStateData *ftpState = (FtpStateData *)data; + ftpState->data.pending_kick = type; + if (type == STKICK_WAIT) { + debug(9, 3) ("ftpDataReadKick: WAITING\n"); + ftpScheduleDataClear(ftpState); + } else if (type == STKICK_RUN || type == STKICK_FETCH) { + debug(9, 3) ("ftpDataReadKick: RUNNING\n"); + ftpScheduleDataRead(ftpState); + } else { + fatal("ftpDataReadKick: unknown IO kick type!\n"); + } +} + static void ftpDataRead(int fd, void *data) { @@ -926,6 +988,7 @@ #if DELAY_POOLS delay_id delay_id; #endif + ftpState->data.pending_read = 0; assert(fd == ftpState->data.fd); if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { comm_close(ftpState->ctrl.fd); @@ -963,11 +1026,8 @@ if (len < 0) { debug(50, ignoreErrno(errno) ? 
3 : 1) ("ftpDataRead: read error: %s\n", xstrerror()); if (ignoreErrno(errno)) { - commSetSelect(fd, - COMM_SELECT_READ, - ftpDataRead, - data, - Config.Timeout.read); + assert(fd == ftpState->data.fd); + ftpScheduleDataRead(ftpState); } else { if (!ftpState->flags.http_header_sent && !ftpState->fwd->flags.ftp_pasv_failed && ftpState->flags.pasv_supported) { ftpState->fwd->flags.dont_retry = 0; /* this is a retryable error */ @@ -987,11 +1047,9 @@ ftpState->data.offset = 0; } storeBufferFlush(entry); - commSetSelect(fd, - COMM_SELECT_READ, - ftpDataRead, - data, - Config.Timeout.read); + assert(fd == ftpState->data.fd); + /* XXX this'll be turned into 'mayberead' when everything else is right */ + ftpDataReadMaybe(ftpState); } } @@ -1113,8 +1171,11 @@ ftpState->request = requestLink(request); ftpState->ctrl.fd = fd; ftpState->data.fd = -1; + ftpState->data.pending_read = 0; + ftpState->data.pending_kick = STKICK_RUN; ftpState->size = -1; ftpState->mdtm = -1; + storeServerSetCallback(fwd->entry, ftpDataReadKick, ftpState); if (Config.Ftp.passive && !fwd->flags.ftp_pasv_failed) ftpState->flags.pasv_supported = 1; ftpState->flags.rest_supported = 1; @@ -2176,12 +2237,7 @@ debug(9, 3) ("This is ftpReadList\n"); if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpDataRead, - ftpState, - Config.Timeout.read); - commSetDefer(ftpState->data.fd, fwdCheckDeferRead, ftpState->entry); + ftpScheduleDataRead(ftpState); ftpState->state = READING_DATA; /* * Cancel the timeout on the Control socket and establish one @@ -2229,12 +2285,7 @@ if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ debug(9, 3) ("ftpReadRetr: reading data channel\n"); - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpDataRead, - ftpState, - Config.Timeout.read); - commSetDefer(ftpState->data.fd, fwdCheckDeferRead, ftpState->entry); + ftpScheduleDataRead(ftpState); 
ftpState->state = READING_DATA; /* * Cancel the timeout on the Control socket and establish one Index: squid/src/gopher.c diff -u squid/src/gopher.c:1.10.6.10 squid/src/gopher.c:1.10.6.8.10.6 --- squid/src/gopher.c:1.10.6.10 Sat Sep 10 19:13:22 2005 +++ squid/src/gopher.c Wed Oct 12 00:52:55 2005 @@ -86,6 +86,8 @@ int fd; request_t *req; FwdState *fwdState; + int pending_read; + store_kick_type_t pending_kick; } GopherStateData; static PF gopherStateFree; @@ -608,6 +610,44 @@ comm_close(fd); } + +static void +gopherScheduleRead(GopherStateData *gopherState) +{ + gopherState->pending_read = 1; + commSetSelect(gopherState->fd, COMM_SELECT_READ, gopherReadReply, gopherState, 0); +} + +static void +gopherClearRead(GopherStateData *gopherState) +{ + gopherState->pending_read = 0; + commSetSelect(gopherState->fd, COMM_SELECT_READ, NULL, NULL, 0); +} + +static void +gopherReadKick(StoreEntry *e, void *data, store_kick_type_t type) +{ + GopherStateData *gopherState = data; + gopherState->pending_kick = type; + if (type == STKICK_WAIT) { + gopherClearRead(gopherState); + } else if (type == STKICK_RUN || type == STKICK_FETCH) { + gopherScheduleRead(gopherState); + } else { + fatal("gopherReadKick: unknown IO kick type!\n"); + } +} + +static void +gopherMaybeScheduleRead(GopherStateData *gopherState) +{ + if (gopherState->pending_kick == STKICK_RUN + || gopherState->pending_kick == STKICK_FETCH) { + gopherScheduleRead(gopherState); + } +} + /* This will be called when data is ready to be read from fd. Read until * error or connection closed. 
*/ static void @@ -627,6 +667,7 @@ comm_close(fd); return; } + gopherState->pending_read = 0; errno = 0; buf = memAllocate(MEM_4K_BUF); read_sz = 4096 - 1; /* leave room for termination */ @@ -655,7 +696,7 @@ if (len < 0) { debug(50, 1) ("gopherReadReply: error reading: %s\n", xstrerror()); if (ignoreErrno(errno)) { - commSetSelect(fd, COMM_SELECT_READ, gopherReadReply, data, 0); + gopherScheduleRead(gopherState); } else { ErrorState *err; err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR); @@ -681,10 +722,7 @@ } else { storeAppend(entry, buf, len); } - commSetSelect(fd, - COMM_SELECT_READ, - gopherReadReply, - data, 0); + gopherMaybeScheduleRead(gopherState); } memFree(buf, MEM_4K_BUF); return; @@ -745,8 +783,7 @@ storeBufferFlush(entry); } /* Schedule read reply. */ - commSetSelect(fd, COMM_SELECT_READ, gopherReadReply, gopherState, 0); - commSetDefer(fd, fwdCheckDeferRead, entry); + gopherScheduleRead(gopherState); if (buf) memFree(buf, MEM_4K_BUF); /* Allocated by gopherSendRequest. */ } @@ -800,6 +837,9 @@ debug(10, 3) ("gopherStart: %s\n", storeUrl(entry)); statCounter.server.all.requests++; statCounter.server.other.requests++; + storeServerSetCallback(fwdState->entry, gopherReadKick, gopherState); + gopherState->pending_kick = STKICK_RUN; + gopherState->pending_read = 0; /* Parse url. 
*/ gopher_request_parse(fwdState->request, &gopherState->type_id, gopherState->request); Index: squid/src/http.c diff -u squid/src/http.c:1.17.6.30 squid/src/http.c:1.17.6.27.4.18 --- squid/src/http.c:1.17.6.30 Sat Sep 10 19:13:22 2005 +++ squid/src/http.c Mon Dec 12 23:16:04 2005 @@ -64,6 +64,7 @@ #endif if (httpState == NULL) return; + storeServerClearCallback(httpState->entry); if (httpState->body_buf) { requestAbortBody(httpState->orig_request); if (httpState->body_buf) { @@ -97,7 +98,7 @@ { HttpStateData *httpState = data; StoreEntry *entry = httpState->entry; - debug(11, 4) ("httpTimeout: FD %d: '%s'\n", fd, storeUrl(entry)); + debug(11, 1) ("httpTimeout: FD %d: '%s'\n", fd, storeUrl(entry)); if (entry->store_status == STORE_PENDING) { fwdFail(httpState->fwd, errorCon(ERR_READ_TIMEOUT, HTTP_GATEWAY_TIMEOUT)); @@ -550,6 +551,51 @@ return 1; } + +/* Schedule read if the store side lets us */ +void +httpMaybeReadSchedule(int fd, HttpStateData *httpState) +{ + assert(fd == httpState->fd); + if (httpState->pending_kick == STKICK_FETCH || httpState->pending_kick == STKICK_RUN) + commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); +} + +void +httpReadSchedule(int fd, HttpStateData *httpState) +{ + httpState->pending_read = 1; + assert(fd == httpState->fd); + commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); +} + +void +httpReadClear(int fd, HttpStateData *httpState) +{ + httpState->pending_read = 0; + assert(fd == httpState->fd); + commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); +} + +static void +httpReadKick(StoreEntry *e, void *data, store_kick_type_t type) +{ + HttpStateData *httpState = (HttpStateData *)data; + /* + * For now, we'll just retry scheduling IO. + * This won't be very efficient as we may already /have/ scheduled IO, + * but we can cross that bridge when we know this works. 
+ */ + httpState->pending_kick = type; + if (type == STKICK_WAIT) { + httpReadClear(httpState->fd, httpState); + } else if (type == STKICK_RUN || type == STKICK_FETCH) { + httpReadSchedule(httpState->fd, httpState); + } else { + fatal("httpReadKick: unknown IO kick type!\n"); + } +} + /* This will be called when data is ready to be read from fd. Read until * error or connection closed. */ /* XXX this function is too long! */ @@ -568,6 +614,7 @@ delay_id delay_id; #endif + httpState->pending_read = 0; if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { comm_close(fd); return; @@ -602,7 +649,7 @@ if (len == 0) { /* Continue to read... */ /* Timeout NOT increased. This whitespace was from previous reply */ - commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); + httpReadSchedule(fd, httpState); return; } } @@ -610,7 +657,7 @@ debug(50, 2) ("httpReadReply: FD %d: read failure: %s.\n", fd, xstrerror()); if (ignoreErrno(errno)) { - commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); + httpReadSchedule(fd, httpState); } else { ErrorState *err; err = errorCon(ERR_READ_ERROR, HTTP_BAD_GATEWAY); @@ -730,9 +777,8 @@ } if (keep_alive) { /* yes we have to clear all these! */ - commSetDefer(fd, NULL, NULL); commSetTimeout(fd, -1, NULL, NULL); - commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); + httpReadClear(fd, httpState); #if DELAY_POOLS delayClearNoDelay(fd); #endif @@ -755,7 +801,7 @@ } else { commSetTimeout(fd, Config.Timeout.read, NULL, NULL); } - commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); + httpMaybeReadSchedule(fd, httpState); return; case -1: /* Server is nasty on us. Shut down */ @@ -779,7 +825,6 @@ httpSendComplete(int fd, char *bufnotused, size_t size, int errflag, void *data) { HttpStateData *httpState = data; - StoreEntry *entry = httpState->entry; debug(11, 5) ("httpSendComplete: FD %d: size %d: errflag %d.\n", fd, (int) size, errflag); #if URL_CHECKSUM_DEBUG @@ -809,7 +854,6 @@ * request bodies. 
*/ commSetTimeout(fd, Config.Timeout.read, httpTimeout, httpState); - commSetDefer(fd, fwdCheckDeferRead, entry); } httpState->flags.request_sent = 1; } @@ -1074,7 +1118,9 @@ /* Schedule read reply. (but no timeout set until request fully sent) */ commSetTimeout(fd, Config.Timeout.lifetime, httpTimeout, httpState); - commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); + /* Allow free-running IO until the store hints at us otherwise */ + httpState->pending_kick = STKICK_RUN; + httpReadSchedule(fd, httpState); if (httpState->orig_request->body_reader) sendHeaderDone = httpSendRequestEntry; @@ -1110,6 +1156,11 @@ comm_write_mbuf(fd, mb, sendHeaderDone, httpState); } +/* + * [ahc] storeServerSetCallback() here is registering the callback after + * the initial storeClientCopy() by the client side. + * Hm! + */ void httpStart(FwdState * fwd) { @@ -1122,6 +1173,7 @@ storeUrl(fwd->entry)); httpState = cbdataAlloc(HttpStateData); storeLockObject(fwd->entry); + storeServerSetCallback(fwd->entry, httpReadKick, httpState); httpState->fwd = fwd; httpState->entry = fwd->entry; httpState->fd = fd; Index: squid/src/main.c diff -u squid/src/main.c:1.28.6.25 squid/src/main.c:1.28.6.19.4.2 --- squid/src/main.c:1.28.6.25 Mon Jun 27 19:16:51 2005 +++ squid/src/main.c Thu Nov 3 01:14:54 2005 @@ -741,7 +741,9 @@ eventRun(); if ((loop_delay = eventNextTime()) < 0) loop_delay = 0; -#if HAVE_POLL +#if HAVE_EPOLL + switch (comm_epoll(loop_delay)) { +#elif HAVE_POLL switch (comm_poll(loop_delay)) { #else switch (comm_select(loop_delay)) { Index: squid/src/protos.h diff -u squid/src/protos.h:1.41.6.33 squid/src/protos.h:1.41.6.22.6.7 --- squid/src/protos.h:1.41.6.33 Thu Sep 15 19:13:25 2005 +++ squid/src/protos.h Thu Nov 3 01:14:54 2005 @@ -172,16 +172,21 @@ extern void comm_write_mbuf(int fd, MemBuf mb, CWCB * handler, void *handler_data); extern void commCallCloseHandlers(int fd); extern int commSetTimeout(int fd, int, PF *, void *); -extern void commSetDefer(int fd, DEFER * 
func, void *); extern int ignoreErrno(int); extern void commCloseAllSockets(void); +extern void commDeferReadUntil(int fd, int time, PF *handler, void *data); +extern void commHalfClose(int fd, int time, PF *handler, void *data); +extern void commUnHalfClose(int fd); +extern void commCheckHalfClose(int fd); /* * comm_select.c */ extern void comm_select_init(void); -#if HAVE_POLL +#if HAVE_EPOLL +extern int comm_epoll(int); +#elif HAVE_POLL extern int comm_poll(int); #else extern int comm_select(int); @@ -706,7 +711,6 @@ /* forward.c */ extern void fwdStart(int, StoreEntry *, request_t *); -extern DEFER fwdCheckDeferRead; extern void fwdFail(FwdState *, ErrorState *); extern void fwdUnregister(int fd, FwdState *); extern void fwdComplete(FwdState * fwdState); @@ -886,10 +890,15 @@ extern void storeInit(void); extern int storeClientWaiting(const StoreEntry *); extern void storeAbort(StoreEntry *); +extern int storeCheckDefer(StoreEntry *); extern void storeAppend(StoreEntry *, const char *, int); extern void storeLockObject(StoreEntry *); extern void storeRelease(StoreEntry *); extern int storeUnlockObject(StoreEntry *); +extern void storeServerKick(StoreEntry *); +extern void storeServerWait(StoreEntry *); +extern void storeServerSetCallback(StoreEntry *, STKICK *, void *); +extern void storeServerClearCallback(StoreEntry *); extern EVH storeMaintainSwapSpace; extern void storeExpireNow(StoreEntry *); extern void storeReleaseRequest(StoreEntry *); Index: squid/src/squid.h diff -u squid/src/squid.h:1.13.6.8 squid/src/squid.h:1.13.6.7.12.2 --- squid/src/squid.h:1.13.6.8 Fri Mar 25 19:15:58 2005 +++ squid/src/squid.h Thu Nov 3 01:14:54 2005 @@ -252,6 +252,10 @@ #endif /* HAVE_POLL_H */ #endif /* HAVE_POLL */ +#if HAVE_EPOLL +#include +#endif + #if defined(HAVE_STDARG_H) #include #define HAVE_STDARGS /* let's hope that works everywhere (mj) */ Index: squid/src/ssl.c diff -u squid/src/ssl.c:1.13.6.12 squid/src/ssl.c:1.13.6.7.12.4 --- squid/src/ssl.c:1.13.6.12 Thu Sep 1 
19:13:43 2005 +++ squid/src/ssl.c Tue Oct 25 01:50:59 2005 @@ -70,9 +70,6 @@ static void sslConnected(int fd, void *); static void sslProxyConnected(int fd, void *); static void sslSetSelect(SslStateData * sslState); -#if DELAY_POOLS -static DEFER sslDeferServerRead; -#endif static void sslAbort(SslStateData * sslState) @@ -130,20 +127,6 @@ cbdataFree(sslState); } -#if DELAY_POOLS -static int -sslDeferServerRead(int fdnotused, void *data) -{ - SslStateData *s = data; - int i = delayBytesWanted(s->delay_id, 0, INT_MAX); - if (i == INT_MAX) - return 0; - if (i == 0) - return 1; - return -1; -} -#endif - static void sslSetSelect(SslStateData * sslState) { @@ -444,9 +427,6 @@ Config.Timeout.read, sslTimeout, sslState); -#if DELAY_POOLS - commSetDefer(sslState->server.fd, sslDeferServerRead, sslState); -#endif } } Index: squid/src/stat.c diff -u squid/src/stat.c:1.13.6.14 squid/src/stat.c:1.13.6.11.6.4 --- squid/src/stat.c:1.13.6.14 Tue Mar 29 18:17:46 2005 +++ squid/src/stat.c Thu Nov 3 01:14:54 2005 @@ -318,7 +318,7 @@ storeUnlockObject(state->sentry); cbdataFree(state); return; - } else if (fwdCheckDeferRead(-1, state->sentry)) { + } else if (storeCheckDefer(state->sentry)) { eventAdd("statObjects", statObjects, state, 0.1, 1); return; } @@ -835,7 +835,7 @@ storeAppendPrintf(sentry, "aborted_requests = %f/sec\n", XAVG(aborted_requests)); -#if HAVE_POLL +#if HAVE_POLL || HAVE_EPOLL storeAppendPrintf(sentry, "syscalls.polls = %f/sec\n", XAVG(syscalls.polls)); #else storeAppendPrintf(sentry, "syscalls.selects = %f/sec\n", XAVG(syscalls.selects)); @@ -1462,7 +1462,7 @@ storeAppendPrintf(s, "\tnrequests: %d\n", conn->nrequests); storeAppendPrintf(s, "\tdefer: n %d, until %ld\n", - conn->defer.n, (long int) conn->defer.until); + fd_table[fd].defer.n, (long int) fd_table[fd].defer.until); } storeAppendPrintf(s, "uri %s\n", http->uri); storeAppendPrintf(s, "log_type %s\n", log_tags[http->log_type]); Index: squid/src/store.c diff -u squid/src/store.c:1.16.6.9 
squid/src/store.c:1.16.6.7.10.10 --- squid/src/store.c:1.16.6.9 Thu Sep 1 19:13:43 2005 +++ squid/src/store.c Tue Nov 1 18:33:44 2005 @@ -121,6 +121,8 @@ #endif mem->log_url = xstrdup(log_url); mem->object_sz = -1; + mem->srv.callback = NULL; + mem->srv.data = NULL; /* XXX account log_url */ debug(20, 3) ("new_MemObject: returning %p\n", mem); return mem; @@ -149,6 +151,7 @@ #if URL_CHECKSUM_DEBUG assert(mem->chksum == url_checksum(mem->url)); #endif + storeServerClearCallback(e); e->mem_obj = NULL; if (!shutting_down) assert(mem->swapout.sio == NULL); @@ -513,6 +516,24 @@ e->expires = squid_curtime; } +int +storeCheckDefer(StoreEntry *e) +{ + MemObject *mem = e->mem_obj; + /* A StoreEntry with no MemObject can't defer, can it? */ + if (mem == NULL) + return 0; + if (EBIT_TEST(e->flags, ENTRY_FWD_HDR_WAIT)) + return 0; + if (EBIT_TEST(e->flags, RELEASE_REQUEST)) { + if (mem->inmem_hi - mem->inmem_lo > SM_PAGE_SIZE + Config.Store.maxInMemObjSize + READ_AHEAD_GAP) + return 1; + } + if (mem->inmem_hi - storeLowestMemReaderOffset(e) > READ_AHEAD_GAP) + return 1; + return 0; +} + /* Append incoming data from a primary server to an entry. */ void storeAppend(StoreEntry * e, const char *buf, int len) @@ -531,6 +552,16 @@ } if (EBIT_TEST(e->flags, DELAY_SENDING)) return; + /* + * [ahc] TODO: check if the clients are too far forward: if so, + * we slow the server for a little while + * XXX this doesn't quite yet work with delay pools! + */ + /* fwdCheckDeferRead() not used here because it needs an FD .. 
*/ + if (storeCheckDefer(e)) { + /* We're a little far in IO, so hint to the server side we should stop */ + storeServerWait(e); + } InvokeHandlers(e); storeSwapOut(e); } @@ -1380,3 +1411,111 @@ } } #endif + + +/* Store server feed related stuff */ +/** + * storeServerSetCallback - set a callback to be called whenever + * data is needed from the relevant store server + * @param e StoreEntry, related memObject must be attached + * @param cb callback to call whenever data is needed + * @param data data to pass to callback + * + * data will be cbDataLock()ed at this point and unlocked when + * the memObject is destroyed. + */ +void +storeServerSetCallback(StoreEntry *e, STKICK *cb, void *data) +{ + assert(e); + assert(e->mem_obj); + assert(cb); + debug (98, 3) ("storeServerSetCallback: %.8X: setting %.8X (%.8X)\n", e, cb, data); +// assert(e->mem_obj->srv.callback == NULL); +// assert(e->mem_obj->srv.data == NULL); + if (e->mem_obj->srv.callback != NULL) { + debug (20, 2) ("storeServerSetCallback: have new callback!\n"); + /* + * Since we're not hinting to them anymore, tell the + * about-to-be-replaced server side that it indeed + * wants to start caring about its own IO again + */ + storeServerKick(e); + cbdataUnlock(e->mem_obj->srv.data); + e->mem_obj->srv.callback = NULL; + e->mem_obj->srv.data = NULL; + } + + e->mem_obj->srv.callback = cb; + e->mem_obj->srv.data = data; + cbdataLock(data); +} + + +/** + * storeServerClearCallback - remove the callback from the + * given StoreEntry + * + * @param e StoreEntry, with memObject, to clear the callback from + * + * Now, something to remember. In the 'current-school', reads aren't + * ignored unless the server is too far forward. This does mean that if + * the server data flow finishes then the server socket read handler + * won't know about it, and be able to reuse the connection, until + * the clients have caught up enough. + * + * This needs to be mirrored here. 
The minute the callback is removed + * the client side /must/ be placed back into the 'current' setup - ie, + * the read handler gets registered so it can handle other server-related + * crap. + * + * So, give this some thought. A third state might be needed to represent + * the way squid currently handles store server IO and we'll need to + * incorporate this into whatever mechanism we choose to 'kick' store + * servers into reading IO again. + */ +void +storeServerClearCallback(StoreEntry *e) +{ + assert(e); + assert(e->mem_obj); +// assert(e->mem_obj->srv.data == data); + cbdataUnlock(e->mem_obj->srv.data); + e->mem_obj->srv.callback = NULL; + e->mem_obj->srv.data = NULL; +} + + +/** + * storeServerKick: called to 'kick' the server side into + * action to feed us some data + * + * @param e StoreEntry to kick for + */ +void +storeServerKick(StoreEntry *e) +{ + debug (98, 4) ("storeServerKick: called: %.8x, %.8x, %.8x\n", e, e->mem_obj, e->mem_obj->srv.callback); + if (e->mem_obj == NULL) + return; + if (e->mem_obj->srv.callback == NULL) + return; + if (cbdataValid(e->mem_obj->srv.data)) { + debug (98, 2) ("storeServerKick: kicking\n"); + e->mem_obj->srv.callback(e, e->mem_obj->srv.data, STKICK_RUN); + } +} + +void +storeServerWait(StoreEntry *e) +{ + debug (98, 4) ("storeServerWait: called: %.8x, %.8x, %.8x\n", e, e->mem_obj, e->mem_obj->srv.callback); + if (e->mem_obj == NULL) + return; + if (e->mem_obj->srv.callback == NULL) + return; + if (cbdataValid(e->mem_obj->srv.data)) { + debug (98, 2) ("storeServerWait: wait\n"); + e->mem_obj->srv.callback(e, e->mem_obj->srv.data, STKICK_WAIT); + } +} Index: squid/src/store_client.c diff -u squid/src/store_client.c:1.9.6.4 squid/src/store_client.c:1.9.6.2.20.4 --- squid/src/store_client.c:1.9.6.4 Tue Apr 19 19:18:33 2005 +++ squid/src/store_client.c Mon Dec 12 23:16:05 2005 @@ -270,6 +270,11 @@ } if (e->store_status == STORE_PENDING && sc->seen_offset >= mem->inmem_hi) { /* client has already seen this, wait for more */ 
+ /* + * [ahc] - this is where we 'kick' the server side + * to hint that it should resume feeding us data + */ + storeServerKick(e); debug(20, 3) ("storeClientCopy3: Waiting for more\n"); return; } @@ -508,6 +513,12 @@ debug(20, 3) ("storeUnregister: called for '%s'\n", storeKeyText(e->hash.key)); if (sc == NULL) return 0; + /* + * We kick the server side just in case we get here and + * the last client has completed but the server side is just STWAITing + * It hopefully deals with any possible timed out persistent connections + */ + storeServerKick(e); if (mem->clients.head == NULL) return 0; dlinkDelete(&sc->node, &mem->clients); Index: squid/src/store_swapout.c diff -u squid/src/store_swapout.c:1.11.2.10 squid/src/store_swapout.c:1.11.2.1.34.2 --- squid/src/store_swapout.c:1.11.2.10 Tue May 10 19:17:58 2005 +++ squid/src/store_swapout.c Wed Oct 12 00:53:33 2005 @@ -225,7 +225,7 @@ * Wait until we are below the disk FD limit, only if the * next server-side read won't be deferred. */ - if (storeTooManyDiskFilesOpen() && !fwdCheckDeferRead(-1, e)) + if (storeTooManyDiskFilesOpen() && !storeCheckDefer(e)) return; } /* Ok, we have stuff to swap out. Is there a swapout.sio open? 
*/ Index: squid/src/structs.h diff -u squid/src/structs.h:1.48.2.43 squid/src/structs.h:1.48.2.34.4.10 --- squid/src/structs.h:1.48.2.43 Sat Sep 3 19:13:28 2005 +++ squid/src/structs.h Thu Nov 3 01:14:54 2005 @@ -763,6 +763,15 @@ int weak; /* true if it is a weak validator */ }; +struct _CommWriteStateData { + char *buf; + size_t size; + size_t offset; + CWCB *handler; + void *handler_data; + FREE *free_func; +}; + struct _fde { unsigned int type; u_short local_port; @@ -788,6 +797,9 @@ squid_off_t bytes_read; squid_off_t bytes_written; int uses; /* ie # req's over persistent conn */ +#if HAVE_EPOLL + unsigned epoll_state; +#endif struct _fde_disk { DWCB *wrt_handle; void *wrt_handle_data; @@ -806,13 +818,19 @@ close_handler *close_handler; /* linked list */ DEFER *defer_check; /* check if we should defer read */ void *defer_data; - CommWriteStateData *rwstate; /* State data for comm_write */ + CommWriteStateData rwstate; /* State data for comm_write */ READ_HANDLER *read_method; WRITE_HANDLER *write_method; #if USE_SSL SSL *ssl; int ssl_shutdown:1; #endif + struct { + int until; + PF *read_handler; + void *read_data; + int n; + } defer; }; struct _fileMap { @@ -987,6 +1005,8 @@ request_t *request; MemBuf reply_hdr; int reply_hdr_state; + int pending_read; + store_kick_type_t pending_kick; peer *peer; /* peer request made to */ int eof; /* reached end-of-object? 
*/ request_t *orig_request; @@ -1128,10 +1148,6 @@ struct in_addr log_addr; char rfc931[USER_IDENT_SZ]; int nrequests; - struct { - int n; - time_t until; - } defer; }; struct _ipcache_addrs { @@ -1514,6 +1530,10 @@ STABH *callback; void *data; } abort; + struct { + STKICK *callback; + void *data; + } srv; char *log_url; RemovalPolicyNode repl; int id; @@ -1706,15 +1726,6 @@ } flags; }; -struct _CommWriteStateData { - char *buf; - size_t size; - size_t offset; - CWCB *handler; - void *handler_data; - FREE *free_func; -}; - struct _ErrorState { err_type type; int page_id; @@ -1858,7 +1869,7 @@ int recvfroms; int sendtos; } sock; -#if HAVE_POLL +#if HAVE_POLL || HAVE_EPOLL int polls; #else int selects; Index: squid/src/typedefs.h diff -u squid/src/typedefs.h:1.25.6.8 squid/src/typedefs.h:1.25.6.6.12.5 --- squid/src/typedefs.h:1.25.6.8 Sat Mar 26 18:16:17 2005 +++ squid/src/typedefs.h Tue Oct 25 21:49:31 2005 @@ -229,6 +229,7 @@ #endif typedef void CWCB(int fd, char *, size_t size, int flag, void *data); +typedef void CRCB(int fd, char *, size_t retlen, int flag, void *data); typedef void CNCB(int fd, int status, void *); typedef void FREE(void *); @@ -386,4 +387,6 @@ typedef struct _external_acl external_acl; typedef struct _external_acl_entry external_acl_entry; +typedef void STKICK(StoreEntry *, void *, store_kick_type_t); + #endif /* SQUID_TYPEDEFS_H */ Index: squid/src/wais.c diff -u squid/src/wais.c:1.8.6.2 squid/src/wais.c:1.8.70.2 --- squid/src/wais.c:1.8.6.2 Sat Sep 10 19:13:23 2005 +++ squid/src/wais.c Wed Oct 12 00:39:16 2005 @@ -43,6 +43,8 @@ char url[MAX_URL]; request_t *request; FwdState *fwd; + int pending_read; + store_kick_type_t pending_kick; } WaisStateData; static PF waisStateFree; @@ -76,6 +78,45 @@ comm_close(fd); } + +static void +waisScheduleRead(WaisStateData *waisState) +{ + waisState->pending_read = 1; + commSetSelect(waisState->fd, COMM_SELECT_READ, waisReadReply, waisState, 0); +} + +static void +waisClearRead(WaisStateData *waisState) +{ 
+ waisState->pending_read = 0; + commSetSelect(waisState->fd, COMM_SELECT_READ, NULL, NULL, 0); +} + +static void +waisReadKick(StoreEntry *e, void *data, store_kick_type_t type) +{ + WaisStateData *waisState = data; + + waisState->pending_kick = type; + if (type == STKICK_RUN || type == STKICK_FETCH) { + waisScheduleRead(data); + } else if (type == STKICK_WAIT) { + waisClearRead(data); + } else { + fatal("waisReadKick: unknown kick type!\n"); + } +} + +static void +waisMaybeScheduleRead(WaisStateData *waisState) +{ + if (waisState->pending_kick == STKICK_RUN || + waisState->pending_kick == STKICK_FETCH) { + waisScheduleRead(waisState); + } +} + /* This will be called when data is ready to be read from fd. Read until * error or connection closed. */ static void @@ -95,6 +136,7 @@ comm_close(fd); return; } + waisState->pending_read = 0; errno = 0; read_sz = 4096; #if DELAY_POOLS @@ -124,8 +166,7 @@ if (ignoreErrno(errno)) { /* reinstall handlers */ /* XXX This may loop forever */ - commSetSelect(fd, COMM_SELECT_READ, - waisReadReply, waisState, 0); + waisScheduleRead(waisState); } else { ErrorState *err; err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR); @@ -143,10 +184,7 @@ comm_close(fd); } else { storeAppend(entry, buf, len); - commSetSelect(fd, - COMM_SELECT_READ, - waisReadReply, - waisState, 0); + waisMaybeScheduleRead(waisState); } } @@ -174,11 +212,7 @@ comm_close(fd); } else { /* Schedule read reply. */ - commSetSelect(fd, - COMM_SELECT_READ, - waisReadReply, - waisState, 0); - commSetDefer(fd, fwdCheckDeferRead, entry); + waisScheduleRead(waisState); } } @@ -225,6 +259,9 @@ waisState->request_hdr = &request->header; waisState->fd = fd; waisState->entry = entry; + waisState->pending_kick = STKICK_RUN; + waisState->pending_read = 0; + storeServerSetCallback(fwd->entry, waisReadKick, waisState); xstrncpy(waisState->url, url, MAX_URL); waisState->request = requestLink(request); waisState->fwd = fwd;