This patch is generated from the commloops branch of HEAD in squid Wed Sep 29 01:26:22 2004 GMT See http://devel.squid-cache.org/ Index: squid/acconfig.h diff -u squid/acconfig.h:1.21 squid/acconfig.h:removed --- squid/acconfig.h:1.21 Wed Oct 2 04:11:24 2002 +++ squid/acconfig.h Tue Sep 28 18:30:55 2004 @@ -1,408 +0,0 @@ -/* - * All configurable options are enabled by using --enable-.... - * when running configure. See configure --help for a list - * of all available options. - * - * You are free to edit this file, but it will be overwritten - * each time you run configure. You may need to edit this file - * if configure falsely picks up a library function or structure - * that doesn't really work on your system. - * - * Another way to block a function that should not be detected - * is to - * setenv ac_cv_func_ no - * before running configure, as in - * setenv ac_cv_func_setresuid no - * - * It is possible to enable some of the configurable options - * by editing this file alone, but some of them requires changes - * in the Makefiles, wich is done automatically by configure. - * - */ - -#ifndef __CONFIGURE_H__ -#define __CONFIGURE_H__ -@TOP@ -/* $Id$ */ - -/* - * configure command line used to configure Squid - */ -#undef SQUID_CONFIGURE_OPTIONS - -/********************************* - * START OF CONFIGURABLE OPTIONS * - *********************************/ -/* - * If you are upset that the cachemgr.cgi form comes up with the hostname - * field blank, then define this to getfullhostname() - */ -#undef CACHEMGR_HOSTNAME - -/* - * What default TCP port to use for HTTP listening? - */ -#ifndef CACHE_HTTP_PORT -#undef CACHE_HTTP_PORT -#endif - -/* - * What default UDP port to use for ICP listening? 
- */ -#ifndef CACHE_ICP_PORT -#undef CACHE_ICP_PORT -#endif - -/* Compile & use the malloc package by Doug Lea] */ -#undef USE_DLMALLOC - -/* Define to do simple malloc debugging */ -#undef XMALLOC_DEBUG - -/* Define for log file trace of mem alloc/free */ -#undef MEM_GEN_TRACE - -/* Define to have malloc statistics */ -#undef XMALLOC_STATISTICS - -/* Define to have a detailed trace of memory allocations */ -#undef XMALLOC_TRACE - -#undef FORW_VIA_DB - -/* Define to enable CPU profiling within Squid */ -#undef USE_XPROF_STATS - -/* Define if you have problems with memPools and want to disable Pools */ -#undef DISABLE_POOLS - -/* Defines how many threads aufs uses for I/O */ -#undef AUFS_IO_THREADS - -/* - * If you want to use Squid's ICMP features (highly recommended!) then - * define this. When USE_ICMP is defined, Squid will send ICMP pings - * to origin server sites. This information is used in numerous ways: - * - Sent in ICP replies so neighbor caches know how close - * you are to the source. - * - For finding the closest instance of a URN. - * - With the 'test_reachability' option. Squid will return - * ICP_OP_MISS_NOFETCH for sites which it cannot ping. - */ -#undef USE_ICMP - -/* - * Traffic management via "delay pools". - */ -#undef DELAY_POOLS - -/* - * If you want to log User-Agent request header values, define this. - * By default, they are written to useragent.log in the Squid log - * directory. - */ -#undef USE_USERAGENT_LOG - -/* - * If you want to log Referer request header values, define this. - * By default, they are written to referer.log in the Squid log - * directory. - */ -#undef USE_REFERER_LOG - -/* - * A dangerous feature which causes Squid to kill its parent process - * (presumably the RunCache script) upon receipt of SIGTERM or SIGINT. - * Use with caution. 
- */ -#undef KILL_PARENT_OPT - -/* Define to enable SNMP monitoring of Squid */ -#undef SQUID_SNMP - -/* - * Define to enable WCCP - */ -#define USE_WCCP 1 - -/* - * Define this to include code which lets you specify access control - * elements based on ethernet hardware addresses. This code uses - * functions found in 4.4 BSD derviations (e.g. FreeBSD, ?). - */ -#undef USE_ARP_ACL - -/* - * Define this to include code for the Hypertext Cache Protocol (HTCP) - */ -#undef USE_HTCP - -/* - * Use Cache Digests for locating objects in neighbor caches. This - * code is still semi-experimental. - */ -#undef USE_CACHE_DIGESTS - -/* - * Cache Array Routing Protocol - */ -#define USE_CARP 1 - -/* Define if NTLM is allowed to fail gracefully when a helper has problems */ -#undef NTLM_FAIL_OPEN - -/******************************** - * END OF CONFIGURABLE OPTIONS * - ********************************/ - -/* Define if struct tm has tm_gmtoff member */ -#undef HAVE_TM_GMTOFF - -/* Define if struct mallinfo has mxfast member */ -#undef HAVE_EXT_MALLINFO - -/* Default FD_SETSIZE value */ -#undef DEFAULT_FD_SETSIZE - -/* Maximum number of open filedescriptors */ -#undef SQUID_MAXFD - -/* UDP send buffer size */ -#undef SQUID_UDP_SO_SNDBUF - -/* UDP receive buffer size */ -#undef SQUID_UDP_SO_RCVBUF - -/* TCP send buffer size */ -#undef SQUID_TCP_SO_SNDBUF - -/* TCP receive buffer size */ -#undef SQUID_TCP_SO_RCVBUF - -/* Host type from configure */ -#undef CONFIG_HOST_TYPE - -/* If we need to declare sys_errlist[] as external */ -#undef NEED_SYS_ERRLIST - -/* If gettimeofday is known to take only one argument */ -#undef GETTIMEOFDAY_NO_TZP - -/* If libresolv.a has been hacked to export _dns_ttl_ */ -#undef LIBRESOLV_DNS_TTL_HACK - -/* Define if struct ip has ip_hl member */ -#undef HAVE_IP_HL - -/* Define if your compiler supports prototyping */ -#undef HAVE_ANSI_PROTOTYPES - -/* Define if we should use GNU regex */ -#undef USE_GNUREGEX - -/* signed size_t, grr */ -#undef ssize_t - 
-/* - * Yay! Another Linux brokenness. Its not good enough to know that - * setresuid() exists, because RedHat 5.0 declare setresuid() but - * doesn't implement it. - */ -#undef HAVE_SETRESUID - -/* Define if you have struct rusage */ -#undef HAVE_STRUCT_RUSAGE - -/* - * This makes warnings go away. If you have socklen_t defined in your - * /usr/include files, then this should remain undef'd. Otherwise it - * should be defined to int. - */ -#undef socklen_t - -/* - * By default (for now anyway) Squid includes options which allows - * the cache administrator to violate the HTTP protocol specification - * in terms of cache behaviour. Setting this to '0' will disable - * such code. - */ -#define HTTP_VIOLATIONS 1 - -/* - * Enable support for Transparent Proxy on systems using IP-Filter - * address redirection. This provides "masquerading" support for non - * Linux system. - */ -#undef IPF_TRANSPARENT - -/* - * Enable support for Transparent Proxy on systems using PF address - * redirection. This provides "masquerading" support for OpenBSD. - */ -#undef PF_TRANSPARENT - -/* - * Enable code for assiting in finding memory leaks. Hacker stuff only. - */ -#undef USE_LEAKFINDER - -/* - * type of fd_set array - */ -#undef fd_mask - -/* - * If _res structure has nsaddr_list member - */ -#undef HAVE_RES_NSADDR_LIST - -/* - * If _res structure has ns_list member - */ -#undef HAVE_RES_NS_LIST - -/* - * Compile in support for Ident (RFC 931) lookups? Enabled by default. - */ -#define USE_IDENT 1 - -/* - * If your system has statvfs(), and if it actually works! - */ -#undef HAVE_STATVFS - -/* - * If --disable-internal-dns was given to configure, then we'll use - * the dnsserver processes instead. - */ -#undef USE_DNSSERVERS - -/* - * we check for the existance of struct mallinfo - */ -#undef HAVE_STRUCT_MALLINFO - -/* - * Do we want to use truncate(2) or unlink(2)? 
- */ -#undef USE_TRUNCATE - -/* - * Allow underscores in host names - */ -#undef ALLOW_HOSTNAME_UNDERSCORES - -/* - * Use the heap-based replacement techniques - */ -#undef HEAP_REPLACEMENT - -/* - * message type for message queues - */ -#undef mtyp_t - -/* - * Define this to include code for SSL encryption. - */ -#undef USE_SSL - -/* - * Define this to make use of the OpenSSL libraries for - * MD5 calculation rather than Squid's own MD5 implementation - * or if building with SSL encryption (USE_SSL) - */ -#undef USE_OPENSSL - -/* Define if you want to set the COSS membuf size */ -#undef COSS_MEMBUF_SZ - -/* Print stacktraces on fatal errors */ -#undef PRINT_STACK_TRACE - -/* - * Define this if unlinkd is required - * (strongly recommended for ufs storage type) - */ -#undef USE_UNLINKD - -/* - * Enable support for Transparent Proxy on Linux 2.4 systems - */ -#undef LINUX_NETFILTER - -/* - * Enable for cbdata debug information - */ -#undef CBDATA_DEBUG - -/* - * Do we have unix sockets? (required for the winbind ntlm helper - */ -#undef HAVE_UNIXSOCKET - -/* - * Known-size integers - */ - -#undef int16_t - -#undef u_int16_t - -#undef int32_t - -#undef u_int32_t - -#undef int64_t - -#undef u_int64_t - -/* The number of bytes in a __int64. */ -#undef SIZEOF___INT64 - -/* The number of bytes in a int16_t. */ -#undef SIZEOF_INT16_T - -/* The number of bytes in a int32_t. */ -#undef SIZEOF_INT32_T - -/* The number of bytes in a int64_t. */ -#undef SIZEOF_INT64_T - -/* The number of bytes in a off_t. */ -#undef SIZEOF_OFF_T - -/* The number of bytes in a size_t. */ -#undef SIZEOF_SIZE_T - -/* The number of bytes in a u_int16_t. */ -#undef SIZEOF_U_INT16_T - -/* The number of bytes in a u_int32_t. */ -#undef SIZEOF_U_INT32_T - -/* The number of bytes in a u_int64_t. */ -#undef SIZEOF_U_INT64_T - -/* The number of bytes in a uint16_t. */ -#undef SIZEOF_UINT16_T - -/* The number of bytes in a uint32_t. */ -#undef SIZEOF_UINT32_T - -/* The number of bytes in a uint64_t. 
*/ -#undef SIZEOF_UINT64_T - -/* - * Enable support for the X-Accelerator-Vary HTTP header - */ -#undef X_ACCELERATOR_VARY - -/* Support for poll/select/etc stuff */ -#undef USE_POLL -#undef USE_SELECT -#undef USE_KQUEUE - -@BOTTOM@ - -#endif /* __CONFIGURE_H__ */ Index: squid/src/Makefile.am diff -u squid/src/Makefile.am:1.29 squid/src/Makefile.am:1.16.10.4 --- squid/src/Makefile.am:1.29 Wed Oct 2 04:10:44 2002 +++ squid/src/Makefile.am Sun Oct 6 00:12:18 2002 @@ -132,6 +132,7 @@ clientStream.c \ clientStream.h \ comm.c \ + comm.h \ comm_select.c \ comm_poll.c \ comm_kqueue.c \ @@ -155,6 +156,7 @@ helper.c \ $(HTCPSOURCE) \ http.c \ + http.h \ HttpStatusLine.c \ HttpHdrCc.c \ HttpHdrRange.c \ Index: squid/src/asn.c diff -u squid/src/asn.c:1.18 squid/src/asn.c:1.16.8.3 --- squid/src/asn.c:1.18 Tue Sep 24 03:59:13 2002 +++ squid/src/asn.c Wed Sep 25 23:08:02 2002 @@ -231,7 +231,7 @@ char *buf = asState->reqbuf; int leftoversz = -1; - debug(53, 3) ("asHandleReply: Called with size=%u\n", result.length); + debug(53, 3) ("asHandleReply: Called with size=%u\n", (unsigned int)result.length); debug(53, 3) ("asHandleReply: buffer='%s'\n", buf); /* First figure out whether we should abort the request */ @@ -243,7 +243,7 @@ asStateFree(asState); return; } else if (result.flags.error) { - debug(53, 1) ("asHandleReply: Called with Error set and size=%u\n", result.length); + debug(53, 1) ("asHandleReply: Called with Error set and size=%u\n", (unsigned int)result.length); asStateFree(asState); return; } else if (HTTP_OK != e->mem_obj->reply->sline.status) { Index: squid/src/cbdata.c diff -u squid/src/cbdata.c:1.16 squid/src/cbdata.c:1.14.22.3 --- squid/src/cbdata.c:1.16 Tue Sep 24 03:59:13 2002 +++ squid/src/cbdata.c Fri Oct 4 06:36:30 2002 @@ -132,7 +132,6 @@ CREATE_CBDATA(helper_server); CREATE_CBDATA(statefulhelper); CREATE_CBDATA(helper_stateful_server); - CREATE_CBDATA(HttpStateData); CREATE_CBDATA_FREE(peer, peerDestroy); CREATE_CBDATA(ps_state); 
CREATE_CBDATA(RemovalPolicy); Index: squid/src/client_side.c diff -u squid/src/client_side.c:1.77 squid/src/client_side.c:1.52.4.17 --- squid/src/client_side.c:1.77 Thu Oct 3 05:55:26 2002 +++ squid/src/client_side.c Fri Oct 11 21:44:55 2002 @@ -57,6 +57,8 @@ #include "squid.h" #include "clientStream.h" +#include "StoreIOBuffer.h" +#include "comm.h" #include "IPInterception.h" #if LINGERING_CLOSE @@ -90,6 +92,10 @@ struct { int deferred:1; /* This is a pipelined request waiting for the * current object to complete */ + int parsed_ok:1; /* Was this parsed correctly? */ + int mayUseConnection:1; /* This request may use the connection - + * don't read anymore requests for now + */ } flags; struct { clientStreamNode *node; @@ -108,14 +114,14 @@ /* other */ static CWCB clientWriteComplete; static CWCB clientWriteBodyComplete; -static PF clientReadRequest; +static IOCB clientReadRequest; static PF connStateFree; static PF requestTimeout; static PF clientLifetimeTimeout; static void checkFailureRatio(err_type, hier_code); static clientSocketContext *parseHttpRequestAbort(ConnStateData * conn, const char *uri); -static clientSocketContext *parseHttpRequest(ConnStateData *, method_t *, int *, +static clientSocketContext *parseHttpRequest(ConnStateData *, method_t *, char **, size_t *); #if USE_IDENT static IDCB clientIdentDone; @@ -139,7 +145,6 @@ static void clientPrepareLogWithRequestDetails(request_t *, AccessLogEntry *); static void clientLogRequest(clientHttpRequest *); static void httpRequestFreeResources(clientHttpRequest *); -static void connEmptyOSReadBuffers(int fd); static int connIsUsable(ConnStateData * conn); static clientSocketContext *connGetCurrentContext(ConnStateData * conn); static void contextDeferRecipientForLater(clientSocketContext * context, clientStreamNode * node, HttpReply * rep, StoreIOBuffer recievedData); @@ -157,7 +162,7 @@ static char *findTrailingHTTPVersion(char *uriAndHTTPVersion); static void trimTrailingSpaces(char *aString, size_t 
len); static clientSocketContext *parseURIandHTTPVersion(char **url_p, http_version_t * http_ver_p, ConnStateData * conn); -static void setLogUri(clientHttpRequest * http, char *uri); +static void setLogUri(clientHttpRequest * http, char const *uri); static void prepareInternalUrl(clientHttpRequest * http, char *url); static void prepareForwardProxyUrl(clientHttpRequest * http, char *url); static void prepareAcceleratedUrl(clientHttpRequest * http, char *url, char *req_hdr); @@ -165,12 +170,12 @@ static void connMakeSpaceAvailable(ConnStateData * conn); static void connAddContextToQueue(ConnStateData * conn, clientSocketContext * context); static int connGetConcurrentRequestCount(ConnStateData * conn); -static int connReadWasError(ConnStateData * conn, int size); +static int connReadWasError(ConnStateData * conn, comm_err_t flag, int size); static int connFinishedWithConn(ConnStateData * conn, int size); static void connNoteUseOfBuffer(ConnStateData * conn, int byteCount); static int connKeepReadingIncompleteRequest(ConnStateData * conn); static void connCancelIncompleteRequests(ConnStateData * conn); -static ConnStateData *connStateCreate(struct sockaddr_in peer, struct sockaddr_in me, int fd); +static ConnStateData *connStateCreate(struct sockaddr_in *peer, struct sockaddr_in *me, int fd); static clientStreamNode *getClientReplyContext(clientSocketContext * context); static int connAreAllContextsForThisConnection(ConnStateData * connState); static void connFreeAllContexts(ConnStateData * connState); @@ -188,6 +193,24 @@ return context->http->client_stream.tail->prev->data; } + +/* + * This routine should be called to grow the inbuf and then + * call comm_read(). 
+ */ +void +clientReadSomeData(ConnStateData *conn) +{ + size_t len; + + debug(33, 4) ("clientReadSomeData: FD %d: reading request...\n", conn->fd); + + connMakeSpaceAvailable(conn); + len = connGetAvailableBufferLength(conn) - 1; + + comm_read(conn->fd, conn->in.buf + conn->in.notYetUsed, len, clientReadRequest, conn); +} + void clientSocketRemoveThisFromConnectionList(clientSocketContext * context, ConnStateData * conn) @@ -455,16 +478,6 @@ } } -void -connEmptyOSReadBuffers(int fd) -{ -#ifdef _SQUID_LINUX_ - /* prevent those nasty RST packets */ - char buf[SQUID_TCP_SO_RCVBUF]; - while (FD_READ_METHOD(fd, buf, SQUID_TCP_SO_RCVBUF) > 0); -#endif -} - /* This is a handler normally called by comm_close() */ static void connStateFree(int fd, void *data) @@ -482,7 +495,6 @@ memFreeBuf(connState->in.allocatedSize, connState->in.buf); pconnHistCount(0, connState->nrequests); cbdataFree(connState); - connEmptyOSReadBuffers(fd); } /* @@ -711,21 +723,12 @@ */ commSetTimeout(conn->fd, Config.Timeout.persistent_request, requestTimeout, conn); - /* - * CYGWIN has a problem and is blocking on read() requests when there - * is no data present. - * This hack may hit performance a little, but it's better than - * blocking!. - */ conn->defer.until = 0; /* Kick it to read a new request */ -#ifdef _SQUID_CYGWIN_ - commSetSelect(conn->fd, COMM_SELECT_READ, clientReadRequest, conn, 0); -#else - clientReadRequest(conn->fd, conn); /* Read next request */ -#endif - /* - * Note, the FD may be closed at this point. - */ + /* Make sure we're still reading from the client side! */ + /* XXX this could take a bit of CPU time! aiee! -- adrian */ + assert(comm_has_pending_read(conn->fd)); + /* clientReadSomeData(conn); */ + /* Please don't do anything with the FD past here! 
*/ } void @@ -836,7 +839,7 @@ http->start = current_time; http->req_sz = conn->in.notYetUsed; http->uri = xstrdup(uri); - http->log_uri = xstrndup(uri, MAX_URL); + setLogUri (http, uri); context = clientSocketContextNew(http); tempBuffer.data = context->reqbuf; tempBuffer.length = HTTP_REQBUF_SZ; @@ -955,8 +958,9 @@ } void -setLogUri(clientHttpRequest * http, char *uri) +setLogUri(clientHttpRequest * http, char const *uri) { + safe_free(http->log_uri); if (!stringHasCntl(uri)) http->log_uri = xstrndup(uri, MAX_URL); else @@ -1047,11 +1051,13 @@ * parseHttpRequest() * * Returns - * NULL on error or incomplete request - * a clientHttpRequest structure on success + * NULL on incomplete requests + * a clientSocketContext structure on success or failure. + * Sets result->flags.parsed_ok to 1 if failed to parse the request. + * Sets result->flags.parsed_ok to 0 if we have a good request. */ static clientSocketContext * -parseHttpRequest(ConnStateData * conn, method_t * method_p, int *status, +parseHttpRequest(ConnStateData * conn, method_t * method_p, char **prefix_p, size_t * req_line_sz_p) { char *inbuf = NULL; @@ -1069,11 +1075,9 @@ /* pre-set these values to make aborting simpler */ *prefix_p = NULL; *method_p = METHOD_NONE; - *status = -1; if ((req_sz = headersEnd(conn->in.buf, conn->in.notYetUsed)) == 0) { debug(33, 5) ("Incomplete request, waiting for end of headers\n"); - *status = 0; return NULL; } assert(req_sz <= conn->in.notYetUsed); @@ -1098,7 +1102,6 @@ header_sz = req_sz - (req_hdr - inbuf); if (0 == header_sz) { debug(33, 3) ("parseHttpRequest: header_sz == 0\n"); - *status = 0; xfree(inbuf); return NULL; } @@ -1148,7 +1151,7 @@ setLogUri(http, http->uri); debug(33, 5) ("parseHttpRequest: Complete request received\n"); xfree(inbuf); - *status = 1; + result->flags.parsed_ok = 1; return result; } @@ -1199,8 +1202,12 @@ } int -connReadWasError(ConnStateData * conn, int size) +connReadWasError(ConnStateData * conn, comm_err_t flag, int size) { + if (flag != 
COMM_OK) { + debug(50, 2) ("connReadWasError: FD %d: got flag %d\n", conn->fd, flag); + return 1; + } if (size < 0) { if (!ignoreErrno(errno)) { debug(50, 2) ("connReadWasError: FD %d: %s\n", conn->fd, xstrerror()); @@ -1269,24 +1276,196 @@ } static void -clientReadRequest(int fd, void *data) +clientMaybeReadData(ConnStateData *conn, int do_next_read) { - ConnStateData *conn = data; - int parser_return_code = 0; + if (do_next_read) { + conn->flags.readMoreRequests = 1; + clientReadSomeData(conn); + } +} + +static void +clientAfterReadingRequests(int fd, ConnStateData *conn, int do_next_read) +{ + fde *F = &fd_table[fd]; + + /* Check if a half-closed connection was aborted in the middle */ + if (F->flags.socket_eof) { + if (conn->in.notYetUsed != conn->body.size_left) { /* != 0 when no request body */ + /* Partial request received. Abort client connection! */ + debug(33, 3) ("clientReadRequest: FD %d aborted, partial request\n", + fd); + comm_close(fd); + return; + } + } + + clientMaybeReadData (conn, do_next_read); +} + +static void +clientProcessRequest(ConnStateData *conn, clientSocketContext *context, method_t method, char *prefix, size_t req_line_sz) +{ + clientHttpRequest *http = context->http; request_t *request = NULL; - int size; + /* We have an initial client stream in place should it be needed */ + /* setup our private context */ + connNoteUseOfBuffer(conn, http->req_sz); + + connAddContextToQueue(conn, context); + + if (context->flags.parsed_ok == 0) { + clientStreamNode *node = getClientReplyContext(context); + debug(33, 1) ("clientReadRequest: Invalid Request\n"); + clientSetReplyToError(node->data, + ERR_INVALID_REQ, HTTP_BAD_REQUEST, method, NULL, + &conn->peer.sin_addr, NULL, conn->in.buf, NULL); + assert(context->http->out.offset == 0); + clientPullData(context); + conn->flags.readMoreRequests = 0; + return; + } + + if ((request = urlParse(method, http->uri)) == NULL) { + clientStreamNode *node = getClientReplyContext(context); + debug(33, 5) 
("Invalid URL: %s\n", http->uri); + clientSetReplyToError(node->data, + ERR_INVALID_URL, HTTP_BAD_REQUEST, method, http->uri, + &conn->peer.sin_addr, NULL, NULL, NULL); + assert(context->http->out.offset == 0); + clientPullData(context); + conn->flags.readMoreRequests = 0; + return; + } else { + /* compile headers */ + /* we should skip request line! */ + if (!httpRequestParseHeader(request, prefix + req_line_sz)) + debug(33, 1) ("Failed to parse request headers: %s\n%s\n", + http->uri, prefix); + /* continue anyway? */ + } + + request->flags.accelerated = http->flags.accel; + if (!http->flags.internal) { + if (internalCheck(strBuf(request->urlpath))) { + if (internalHostnameIs(request->host) && + request->port == getMyPort()) { + http->flags.internal = 1; + } else if (internalStaticCheck(strBuf(request->urlpath))) { + xstrncpy(request->host, internalHostname(), + SQUIDHOSTNAMELEN); + request->port = getMyPort(); + http->flags.internal = 1; + } + } + } + + /* + * cache the Content-length value in request_t. 
+ */ + request->content_length = httpHeaderGetInt(&request->header, + HDR_CONTENT_LENGTH); + request->flags.internal = http->flags.internal; + setLogUri (http, urlCanonicalClean(request)); + request->client_addr = conn->peer.sin_addr; + request->my_addr = conn->me.sin_addr; + request->my_port = ntohs(conn->me.sin_port); + request->http_ver = http->http_ver; + if (!urlCheckRequest(request) || + httpHeaderHas(&request->header, HDR_TRANSFER_ENCODING)) { + clientStreamNode *node = getClientReplyContext(context); + clientSetReplyToError(node->data, ERR_UNSUP_REQ, + HTTP_NOT_IMPLEMENTED, request->method, NULL, + &conn->peer.sin_addr, request, NULL, NULL); + assert(context->http->out.offset == 0); + clientPullData(context); + conn->flags.readMoreRequests = 0; + return; + } + + + if (!clientIsContentLengthValid(request)) { + clientStreamNode *node = getClientReplyContext(context); + clientSetReplyToError(node->data, ERR_INVALID_REQ, + HTTP_LENGTH_REQUIRED, request->method, NULL, + &conn->peer.sin_addr, request, NULL, NULL); + assert(context->http->out.offset == 0); + clientPullData(context); + conn->flags.readMoreRequests = 0; + return; + } + + http->request = requestLink(request); + + clientSetKeepaliveFlag(http); + /* Do we expect a request-body? */ + if (request->content_length > 0) { + conn->body.size_left = request->content_length; + request->body_connection = conn; + /* Is it too large? 
*/ + if (!clientIsRequestBodyValid(request->content_length) || + clientIsRequestBodyTooLargeForPolicy(request->content_length)) { + clientStreamNode *node = getClientReplyContext(context); + clientSetReplyToError(node->data, ERR_TOO_BIG, + HTTP_REQUEST_ENTITY_TOO_LARGE, METHOD_NONE, NULL, + &conn->peer.sin_addr, http->request, NULL, NULL); + assert(context->http->out.offset == 0); + clientPullData(context); + conn->flags.readMoreRequests = 0; + return; + } + } + + /* If this is a CONNECT, don't schedule a read - ssl.c will handle it */ + if (http->request->method == METHOD_CONNECT) + context->flags.mayUseConnection = 1; + clientAccessCheck(http); +} + +static void +connStripBufferWhitespace (ConnStateData *conn) +{ + while (conn->in.notYetUsed > 0 && xisspace(conn->in.buf[0])) { + xmemmove(conn->in.buf, conn->in.buf + 1, conn->in.notYetUsed - 1); + --conn->in.notYetUsed; + } +} + +static int +connOkToAddRequest(ConnStateData *conn) +{ + int result = connGetConcurrentRequestCount(conn) < (Config.onoff.pipeline_prefetch ? 
2 : 1); + if (!result) { + debug(33, 3) ("clientReadRequest: FD %d max concurrent requests reached\n", + conn->fd); + debug(33, 5) ("clientReadRequest: FD %d defering new request until one is done\n", + conn->fd); + } + return result; +} + +static void +connSetDefer (ConnStateData *conn, size_t milliSeconds) +{ + conn->defer.until = squid_curtime + milliSeconds; + conn->defer.n++; +} + +static void +clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) +{ + ConnStateData *conn = data; method_t method; char *prefix = NULL; - fde *F = &fd_table[fd]; - int len; clientSocketContext *context; - debug(33, 4) ("clientReadRequest: FD %d: reading request...\n", fd); - connMakeSpaceAvailable(conn); - len = connGetAvailableBufferLength(conn) - 1; - commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, conn, 0); - statCounter.syscalls.sock.reads++; - /* TODO: read should callback */ - size = FD_READ_METHOD(fd, conn->in.buf + conn->in.notYetUsed, len); + int do_next_read = 1; /* the default _is_ to read data! - adrian */ + + assert (fd == conn->fd); + + /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */ + if (flag == COMM_ERR_CLOSING) { + return; + } /* * Don't reset the timeout value here. The timeout value will be * set to Config.Timeout.request by httpAccept() and @@ -1294,184 +1473,88 @@ * whole, not individual read() calls. 
Plus, it breaks our * lame half-close detection */ - if (size > 0) { - fd_bytes(fd, size, FD_READ); - kb_incr(&statCounter.client_http.kbytes_in, size); - conn->in.notYetUsed += size; - conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string */ - } else if (size == 0) { - if (connFinishedWithConn(conn, size)) { - comm_close(fd); - return; - } - /* It might be half-closed, we can't tell */ - debug(33, 5) ("clientReadRequest: FD %d closed?\n", fd); - F->flags.socket_eof = 1; - conn->defer.until = squid_curtime + 1; - conn->defer.n++; - fd_note(fd, "half-closed"); - /* There is one more close check at the end, to detect aborted - * (partial) requests. At this point we can't tell if the request - * is partial. - */ - /* Continue to process previously read data */ - } else if (connReadWasError(conn, size)) { + if (connReadWasError(conn, flag, size)) { comm_close(fd); return; } + + if (flag == COMM_OK) { + if (size > 0) { + kb_incr(&statCounter.client_http.kbytes_in, size); + conn->in.notYetUsed += size; + conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string */ + } else if (size == 0) { + debug(33, 5) ("clientReadRequest: FD %d closed?\n", fd); + if (connFinishedWithConn(conn, size)) { + comm_close(fd); + return; + } + /* It might be half-closed, we can't tell */ + fd_table[fd].flags.socket_eof = 1; + connSetDefer (conn, 1); + fd_note(fd, "half-closed"); + /* There is one more close check at the end, to detect aborted + * (partial) requests. At this point we can't tell if the request + * is partial. 
+ */ + /* Continue to process previously read data */ + } + } + /* Process request body if any */ if (conn->in.notYetUsed > 0 && conn->body.callback != NULL) clientProcessBody(conn); + /* Process next request */ if (connGetConcurrentRequestCount(conn) == 0) fd_note(conn->fd, "Reading next request"); while (conn->in.notYetUsed > 0 && conn->body.size_left == 0) { size_t req_line_sz; - /* Skip leading ( or trail from previous request) whitespace */ - while (conn->in.notYetUsed > 0 && xisspace(conn->in.buf[0])) { - xmemmove(conn->in.buf, conn->in.buf + 1, conn->in.notYetUsed - 1); - --conn->in.notYetUsed; + connStripBufferWhitespace (conn); + if (conn->in.notYetUsed == 0) { + clientAfterReadingRequests(fd, conn, do_next_read); + return; } - conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string */ - if (conn->in.notYetUsed == 0) - break; /* Limit the number of concurrent requests to 2 */ - if (connGetConcurrentRequestCount(conn) >= (Config.onoff.pipeline_prefetch ? 2 : 1)) { - debug(33, 3) ("clientReadRequest: FD %d max concurrent requests reached\n", - fd); - debug(33, 5) ("clientReadRequest: FD %d defering new request until one is done\n", - fd); - conn->defer.until = squid_curtime + 100; /* Reset when a request is complete */ - conn->defer.n++; + if (!connOkToAddRequest(conn)) { + /* Reset when a request is complete */ + connSetDefer (conn, 100); + clientMaybeReadData (conn, do_next_read); return; } - conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string */ + /* Should not be needed anymore */ + /* Terminate the string */ + conn->in.buf[conn->in.notYetUsed] = '\0'; /* Process request */ context = parseHttpRequest(conn, - &method, &parser_return_code, &prefix, &req_line_sz); - if (!context) + &method, &prefix, &req_line_sz); + /* partial or incomplete request */ + if (!context) { safe_free(prefix); + if (!connKeepReadingIncompleteRequest(conn)) + connCancelIncompleteRequests(conn); + break; /* conn->in.notYetUsed > 0 && conn->body.size_left == 
0 */ + } + + /* status -1 or 1 */ if (context) { - clientHttpRequest *http = context->http; - /* We have an initial client stream in place should it be needed */ - /* setup our private context */ - connNoteUseOfBuffer(conn, http->req_sz); - - connAddContextToQueue(conn, context); commSetTimeout(fd, Config.Timeout.lifetime, clientLifetimeTimeout, - http); - if (parser_return_code < 0) { - clientStreamNode *node = getClientReplyContext(context); - debug(33, 1) ("clientReadRequest: FD %d Invalid Request\n", fd); - clientSetReplyToError(node->data, - ERR_INVALID_REQ, HTTP_BAD_REQUEST, method, NULL, - &conn->peer.sin_addr, NULL, conn->in.buf, NULL); - assert(context->http->out.offset == 0); - clientPullData(context); - safe_free(prefix); - break; - } - if ((request = urlParse(method, http->uri)) == NULL) { - clientStreamNode *node = getClientReplyContext(context); - debug(33, 5) ("Invalid URL: %s\n", http->uri); - clientSetReplyToError(node->data, - ERR_INVALID_URL, HTTP_BAD_REQUEST, method, http->uri, - &conn->peer.sin_addr, NULL, NULL, NULL); - assert(context->http->out.offset == 0); - clientPullData(context); - safe_free(prefix); - break; - } else { - /* compile headers */ - /* we should skip request line! */ - if (!httpRequestParseHeader(request, prefix + req_line_sz)) - debug(33, 1) ("Failed to parse request headers: %s\n%s\n", - http->uri, prefix); - /* continue anyway? */ - } - request->flags.accelerated = http->flags.accel; - if (!http->flags.internal) { - if (internalCheck(strBuf(request->urlpath))) { - if (internalHostnameIs(request->host) && - request->port == getMyPort()) { - http->flags.internal = 1; - } else if (internalStaticCheck(strBuf(request->urlpath))) { - xstrncpy(request->host, internalHostname(), - SQUIDHOSTNAMELEN); - request->port = getMyPort(); - http->flags.internal = 1; - } - } - } - /* - * cache the Content-length value in request_t. 
- */ - request->content_length = httpHeaderGetInt(&request->header, - HDR_CONTENT_LENGTH); - request->flags.internal = http->flags.internal; + context->http); + + clientProcessRequest(conn, context, method, prefix, req_line_sz); + safe_free(prefix); - safe_free(http->log_uri); - http->log_uri = xstrdup(urlCanonicalClean(request)); - request->client_addr = conn->peer.sin_addr; - request->my_addr = conn->me.sin_addr; - request->my_port = ntohs(conn->me.sin_port); - request->http_ver = http->http_ver; - if (!urlCheckRequest(request) || - httpHeaderHas(&request->header, HDR_TRANSFER_ENCODING)) { - clientStreamNode *node = getClientReplyContext(context); - clientSetReplyToError(node->data, ERR_UNSUP_REQ, - HTTP_NOT_IMPLEMENTED, request->method, NULL, - &conn->peer.sin_addr, request, NULL, NULL); - assert(context->http->out.offset == 0); - clientPullData(context); + if (!conn->flags.readMoreRequests) { + conn->flags.readMoreRequests = 1; break; } - if (!clientIsContentLengthValid(request)) { - clientStreamNode *node = getClientReplyContext(context); - clientSetReplyToError(node->data, ERR_INVALID_REQ, - HTTP_LENGTH_REQUIRED, request->method, NULL, - &conn->peer.sin_addr, request, NULL, NULL); - assert(context->http->out.offset == 0); - clientPullData(context); - break; - } - http->request = requestLink(request); - clientSetKeepaliveFlag(http); - /* Do we expect a request-body? */ - if (request->content_length > 0) { - conn->body.size_left = request->content_length; - request->body_connection = conn; - /* Is it too large? 
*/ - if (!clientIsRequestBodyValid(request->content_length) || - clientIsRequestBodyTooLargeForPolicy(request->content_length)) { - clientStreamNode *node = getClientReplyContext(context); - clientSetReplyToError(node->data, ERR_TOO_BIG, - HTTP_REQUEST_ENTITY_TOO_LARGE, METHOD_NONE, NULL, - &conn->peer.sin_addr, http->request, NULL, NULL); - assert(context->http->out.offset == 0); - clientPullData(context); - break; - } - } - clientAccessCheck(http); + if (context->flags.mayUseConnection) + do_next_read = 0; continue; /* while offset > 0 && body.size_left == 0 */ - } else if (parser_return_code == 0) { - if (!connKeepReadingIncompleteRequest(conn)) - connCancelIncompleteRequests(conn); - break; } } /* while offset > 0 && conn->body.size_left == 0 */ - /* Check if a half-closed connection was aborted in the middle */ - if (F->flags.socket_eof) { - if (conn->in.notYetUsed != conn->body.size_left) { /* != 0 when no request body */ - /* Partial request received. Abort client connection! */ - debug(33, 3) ("clientReadRequest: FD %d aborted, partial request\n", - fd); - comm_close(fd); - return; - } - } + clientAfterReadingRequests(fd, conn, do_next_read); } /* file_read like function, for reading body content */ @@ -1681,60 +1764,61 @@ } ConnStateData * -connStateCreate(struct sockaddr_in peer, struct sockaddr_in me, int fd) +connStateCreate(struct sockaddr_in *peer, struct sockaddr_in *me, int fd) { ConnStateData *result = cbdataAlloc(ConnStateData); - result->peer = peer; - result->log_addr = peer.sin_addr; + memcpy(&result->peer, peer, sizeof(struct sockaddr_in)); + memcpy(&result->log_addr, &peer->sin_addr, sizeof(result->log_addr)); result->log_addr.s_addr &= Config.Addrs.client_netmask.s_addr; - result->me = me; + memcpy(&result->me, me, sizeof(struct sockaddr_in)); result->fd = fd; result->in.buf = memAllocBuf(CLIENT_REQ_BUF_SZ, &result->in.allocatedSize); + result->flags.readMoreRequests = 1; return result; } /* Handle a new connection on HTTP socket. 
*/ void -httpAccept(int sock, void *data) +httpAccept(int sock, int newfd, struct sockaddr_in *me, struct sockaddr_in *peer, + comm_err_t flag, int xerrno, void *data) { int *N = &incoming_sockets_accepted; - int fd = -1; ConnStateData *connState = NULL; - struct sockaddr_in peer; - struct sockaddr_in me; - int max = INCOMING_HTTP_MAX; #if USE_IDENT static aclCheck_t identChecklist; #endif - commSetSelect(sock, COMM_SELECT_READ, httpAccept, NULL, 0); - while (max-- && !httpAcceptDefer(sock, NULL)) { - memset(&peer, '\0', sizeof(struct sockaddr_in)); - memset(&me, '\0', sizeof(struct sockaddr_in)); - if ((fd = comm_accept(sock, &peer, &me)) < 0) { - if (!ignoreErrno(errno)) + /* kick off another one for later */ + comm_accept(sock, httpAccept, NULL); + + /* XXX we're not considering httpAcceptDefer yet! */ + + do { + if (flag != COMM_OK) { + errno = xerrno; debug(50, 1) ("httpAccept: FD %d: accept failure: %s\n", sock, xstrerror()); - break; - } - debug(33, 4) ("httpAccept: FD %d: accepted\n", fd); - connState = connStateCreate(peer, me, fd); - comm_add_close_handler(fd, connStateFree, connState); + return; + } + + debug(33, 4) ("httpAccept: FD %d: accepted\n", newfd); + connState = connStateCreate(peer, me, newfd); + comm_add_close_handler(newfd, connStateFree, connState); if (Config.onoff.log_fqdn) - fqdncache_gethostbyaddr(peer.sin_addr, FQDN_LOOKUP_IF_MISS); - commSetTimeout(fd, Config.Timeout.request, requestTimeout, connState); + fqdncache_gethostbyaddr(peer->sin_addr, FQDN_LOOKUP_IF_MISS); + commSetTimeout(newfd, Config.Timeout.request, requestTimeout, connState); #if USE_IDENT - identChecklist.src_addr = peer.sin_addr; - identChecklist.my_addr = me.sin_addr; - identChecklist.my_port = ntohs(me.sin_port); + identChecklist.src_addr = peer->sin_addr; + identChecklist.my_addr = me->sin_addr; + identChecklist.my_port = ntohs(me->sin_port); if (aclCheckFast(Config.accessList.identLookup, &identChecklist)) - identStart(&me, &peer, clientIdentDone, connState); + 
identStart(me, peer, clientIdentDone, connState); #endif - commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, connState, 0); - commSetDefer(fd, clientReadDefer, connState); - clientdbEstablished(peer.sin_addr, 1); + clientReadSomeData(connState); + commSetDefer(newfd, clientReadDefer, connState); + clientdbEstablished(peer->sin_addr, 1); assert(N); (*N)++; - } + } while (0); } #if USE_SSL @@ -1777,7 +1861,7 @@ debug(83, 5) ("clientNegotiateSSL: FD %d has no certificate.\n", fd); } - commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, conn, 0); + clientReadSomeData(conn); } struct _https_port_data { @@ -1788,61 +1872,57 @@ /* handle a new HTTPS connection */ static void -httpsAccept(int sock, void *data) +httpsAccept(int sock, int newfd, struct sockaddr_in *me, struct sockaddr_in *peer, + comm_err_t flag, int xerrno, void *data) { int *N = &incoming_sockets_accepted; https_port_data *https_port = data; SSL_CTX *sslContext = https_port->sslContext; - int fd = -1; ConnStateData *connState = NULL; - struct sockaddr_in peer; - struct sockaddr_in me; - int max = INCOMING_HTTP_MAX; SSL *ssl; int ssl_error; #if USE_IDENT static aclCheck_t identChecklist; #endif - commSetSelect(sock, COMM_SELECT_READ, httpsAccept, https_port, 0); - while (max-- && !httpAcceptDefer(sock, NULL)) { - memset(&peer, '\0', sizeof(struct sockaddr_in)); - memset(&me, '\0', sizeof(struct sockaddr_in)); - if ((fd = comm_accept(sock, &peer, &me)) < 0) { - if (!ignoreErrno(errno)) + comm_accept(sock, httpsAccept, NULL); + do { + if (flag != COMM_OK) { + errno = xerrno; debug(50, 1) ("httpsAccept: FD %d: accept failure: %s\n", sock, xstrerror()); - break; - } + return; + } + if ((ssl = SSL_new(sslContext)) == NULL) { ssl_error = ERR_get_error(); debug(83, 1) ("httpsAccept: Error allocating handle: %s\n", ERR_error_string(ssl_error, NULL)); break; } - SSL_set_fd(ssl, fd); - fd_table[fd].ssl = ssl; - fd_table[fd].read_method = &ssl_read_method; - fd_table[fd].write_method = &ssl_write_method; - 
debug(50, 5) ("httpsAccept: FD %d accepted, starting SSL negotiation.\n", fd); + SSL_set_fd(ssl, newfd); + fd_table[newfd].ssl = ssl; + fd_table[newfd].read_method = &ssl_read_method; + fd_table[newfd].write_method = &ssl_write_method; + debug(50, 5) ("httpsAccept: FD %d accepted, starting SSL negotiation.\n", newfd); - connState = connStateCreate(peer, me, fd); + connState = connStateCreate(peer, me, newfd); /* XXX account connState->in.buf */ - comm_add_close_handler(fd, connStateFree, connState); + comm_add_close_handler(newfd, connStateFree, connState); if (Config.onoff.log_fqdn) - fqdncache_gethostbyaddr(peer.sin_addr, FQDN_LOOKUP_IF_MISS); - commSetTimeout(fd, Config.Timeout.request, requestTimeout, connState); + fqdncache_gethostbyaddr(peer->sin_addr, FQDN_LOOKUP_IF_MISS); + commSetTimeout(newfd, Config.Timeout.request, requestTimeout, connState); #if USE_IDENT - identChecklist.src_addr = peer.sin_addr; - identChecklist.my_addr = me.sin_addr; - identChecklist.my_port = ntohs(me.sin_port); + identChecklist.src_addr = peer->sin_addr; + identChecklist.my_addr = me->sin_addr; + identChecklist.my_port = ntohs(me->sin_port); if (aclCheckFast(Config.accessList.identLookup, &identChecklist)) - identStart(&me, &peer, clientIdentDone, connState); + identStart(me, peer, clientIdentDone, connState); #endif - commSetSelect(fd, COMM_SELECT_READ, clientNegotiateSSL, connState, 0); - commSetDefer(fd, clientReadDefer, connState); - clientdbEstablished(peer.sin_addr, 1); + commSetSelect(newfd, COMM_SELECT_READ, clientNegotiateSSL, connState, 0); + commSetDefer(newfd, clientReadDefer, connState); + clientdbEstablished(peer->sin_addr, 1); (*N)++; - } + } while (0); } #endif /* USE_SSL */ @@ -1911,7 +1991,7 @@ if (fd < 0) continue; comm_listen(fd); - commSetSelect(fd, COMM_SELECT_READ, httpAccept, NULL, 0); + comm_accept(fd, httpAccept, NULL); /* * We need to set a defer handler here so that we don't * peg the CPU with select() when we hit the FD limit. 
@@ -1950,7 +2030,7 @@ sslCreateContext(s->cert, s->key, s->version, s->cipher, s->options); comm_listen(fd); - commSetSelect(fd, COMM_SELECT_READ, httpsAccept, https_port, 0); + comm_accept(fd, httpsAccept, NULL); commSetDefer(fd, httpAcceptDefer, NULL); debug(1, 1) ("Accepting HTTPS connections at %s, port %d, FD %d.\n", inet_ntoa(s->s.sin_addr), (int) ntohs(s->s.sin_port), fd); Index: squid/src/client_side_reply.c diff -u squid/src/client_side_reply.c:1.10 squid/src/client_side_reply.c:1.7.2.3 --- squid/src/client_side_reply.c:1.10 Thu Oct 3 05:55:27 2002 +++ squid/src/client_side_reply.c Sun Oct 6 00:12:20 2002 @@ -484,7 +484,7 @@ StoreEntry *e = http->entry; MemObject *mem; request_t *r = http->request; - debug(88, 3) ("clientCacheHit: %s, %ud bytes\n", http->uri, result.length); + debug(88, 3) ("clientCacheHit: %s, %u bytes\n", http->uri, (unsigned int)result.length); if (http->entry == NULL) { debug(88, 3) ("clientCacheHit: request aborted\n"); return; @@ -1426,7 +1426,7 @@ context->flags.storelogiccomplete = 1; debug(88, 5) ("clientSendMoreData: %s, %d bytes (%u new bytes)\n", - http->uri, (int) size, result.length); + http->uri, (int) size, (unsigned int)result.length); assert(size <= HTTP_REQBUF_SZ || context->flags.headersSent); assert(http->request != NULL); /* ESI TODO: remove this assert once everything is stable */ @@ -1443,10 +1443,8 @@ if (fd != -1) comm_reset_close(fd); return; - } else if ( /* aborted request */ - (entry && EBIT_TEST(entry->flags, ENTRY_ABORTED)) || - /* Upstream read error */ (result.flags.error) || - /* Upstream EOF */ (body_size == 0)) { + } else if ((entry && EBIT_TEST(entry->flags, ENTRY_ABORTED)) || + (result.flags.error) || body_size == 0) { /* call clientWriteComplete so the client socket gets closed */ /* We call into the stream, because we don't know that there is a * client socket! 
@@ -1498,7 +1496,9 @@ } context->headers_sz = rep->hdr_sz; body_size = size - rep->hdr_sz; +#if 0 assert(body_size >= 0); +#endif body_buf = buf + rep->hdr_sz; debug(88, 3) Index: squid/src/comm.c diff -u squid/src/comm.c:1.27 squid/src/comm.c:1.21.4.28 --- squid/src/comm.c:1.27 Wed Oct 2 04:10:46 2002 +++ squid/src/comm.c Sat Oct 12 20:16:41 2002 @@ -34,6 +34,8 @@ */ #include "squid.h" +#include "StoreIOBuffer.h" +#include "comm.h" #if defined(_SQUID_CYGWIN_) #include @@ -74,8 +76,476 @@ static int commRetryConnect(ConnectStateData * cs); CBDATA_TYPE(ConnectStateData); + +struct _fdc_t { + int active; + dlink_list CommCallbackList; + struct { + char *buf; + int size; + IOCB *handler; + void *handler_data; + } read; + struct { + struct sockaddr_in me; + struct sockaddr_in pn; + IOACB *handler; + void *handler_data; + } accept; + struct CommFiller { + StoreIOBuffer requestedData; + size_t amountDone; + IOFCB *handler; + void *handler_data; + } fill; + +}; +typedef struct _fdc_t fdc_t; + +typedef enum { + COMM_CB_READ = 1, + COMM_CB_WRITE, + COMM_CB_ACCEPT, + COMM_CB_FILL +} comm_callback_t; + +struct _CommCallbackData { + comm_callback_t type; + dlink_node fd_node; + dlink_node h_node; + int fd; + int newfd; /* for accept() */ + char *buf; + int retval; + union { + IOCB *r_callback; + IOACB *a_callback; + IOFCB *f_callback; + } c; + void *callback_data; + comm_err_t errcode; + int xerrno; + int seqnum; + struct sockaddr_in me; + struct sockaddr_in pn; + StoreIOBuffer sb; +}; +typedef struct _CommCallbackData CommCallbackData; + +struct _fd_debug_t { + char *close_file; + int close_line; +}; +typedef struct _fd_debug_t fd_debug_t; + static MemPool *comm_write_pool = NULL; static MemPool *conn_close_pool = NULL; +static MemPool *comm_callback_pool = NULL; +fdc_t *fdc_table = NULL; +fd_debug_t *fdd_table = NULL; +dlink_list CommCallbackList; +static int CommCallbackSeqnum = 1; + + +/* New and improved stuff */ + +/* + * return whether there are entries in the callback 
queue + */ +int +comm_existsiocallback(void) +{ + return CommCallbackList.head != NULL; /* non-zero when at least one callback is queued */ +} + +/* + * add an IO callback + * + * IO callbacks are added when we want to notify someone that some IO + * has finished but we don't want to risk re-entering a non-reentrant + * code block. + */ +static void +comm_addreadcallback(int fd, IOCB *callback, char *buf, size_t retval, comm_err_t errcode, + int xerrno, void *callback_data) +{ + CommCallbackData *cio; + + assert(fdc_table[fd].active == 1); + + /* Allocate a new struct */ + cio = memPoolAlloc(comm_callback_pool); + + /* Throw our data into it */ + cio->fd = fd; + cio->retval = retval; + cio->xerrno = xerrno; + cio->errcode = errcode; + cio->c.r_callback = callback; + cio->callback_data = callback_data; + cio->seqnum = CommCallbackSeqnum; + cio->buf = buf; + cio->type = COMM_CB_READ; + + /* Add it to the end of the list */ + dlinkAddTail(cio, &(cio->h_node), &CommCallbackList); + + /* and add it to the end of the fd list */ + dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList)); + +} + + +static void +comm_addacceptcallback(int fd, int newfd, IOACB *callback, struct sockaddr_in *pn, + struct sockaddr_in *me, comm_err_t errcode, int xerrno, void *callback_data) +{ + CommCallbackData *cio; + + assert(fdc_table[fd].active == 1); + + /* Allocate a new struct */ + cio = memPoolAlloc(comm_callback_pool); + + /* Throw our data into it */ + cio->fd = fd; + cio->xerrno = xerrno; + cio->errcode = errcode; + cio->c.a_callback = callback; + cio->callback_data = callback_data; + cio->seqnum = CommCallbackSeqnum; + cio->type = COMM_CB_ACCEPT; + cio->newfd = newfd; + cio->pn = *pn; + cio->me = *me; + + /* Add it to the end of the list */ + dlinkAddTail(cio, &(cio->h_node), &CommCallbackList); + + /* and add it to the end of the fd list */ + dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList)); + +} + +static void +comm_add_fill_callback(int fd, size_t retval, comm_err_t errcode, int xerrno) +{ +
CommCallbackData *cio; + + assert(fdc_table[fd].active == 1); + + /* Allocate a new struct */ + cio = memPoolAlloc(comm_callback_pool); + + /* Throw our data into it */ + cio->fd = fd; + cio->xerrno = xerrno; + cio->errcode = errcode; + cio->c.f_callback = fdc_table[fd].fill.handler; + cio->callback_data = fdc_table[fd].fill.handler_data; + cio->seqnum = CommCallbackSeqnum; + cio->type = COMM_CB_FILL; + /* retval not used */ + cio->retval = -1; + cio->sb = fdc_table[fd].fill.requestedData; + cio->sb.length = retval; + /* Clear out fd state */ + fdc_table[fd].fill.handler = fdc_table[fd].fill.handler_data = NULL; + + /* Add it to the end of the list */ + dlinkAddTail(cio, &(cio->h_node), &CommCallbackList); + + /* and add it to the end of the fd list */ + dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList)); +} + + + + +static void +comm_call_io_callback(CommCallbackData *cio) +{ + switch(cio->type) { + case COMM_CB_READ: + cio->c.r_callback(cio->fd, cio->buf, cio->retval, cio->errcode, cio->xerrno, + cio->callback_data); + break; + case COMM_CB_WRITE: + fatal("write comm hasn't been implemented yet!"); + break; + case COMM_CB_ACCEPT: + cio->c.a_callback(cio->fd, cio->newfd, &cio->me, &cio->pn, cio->errcode, + cio->xerrno, cio->callback_data); + break; + case COMM_CB_FILL: + cio->c.f_callback(cio->fd, cio->sb, cio->errcode, + cio->xerrno, cio->callback_data); + break; + default: + fatal("unknown comm io callback type!"); + break; + }; +} + + +/* + * call the IO callbacks + * + * This should be called before comm_select() so code can attempt to + * initiate some IO. + * + * When io callbacks are added, they are added with the current + * sequence number. The sequence number is incremented in this routine - + * since callbacks are added to the _tail_ of the list, when we hit a + * callback with a seqnum _not_ what it was when we entered this routine, + * we can stop. 
+ */ +void +comm_calliocallback(void) +{ + CommCallbackData *cio; + dlink_node *node; + int oldseqnum = CommCallbackSeqnum++; /* bump so callbacks queued from here on are left for the next pass */ + + /* Call our callbacks until we hit NULL or the seqnum changes */ + while (CommCallbackList.head != NULL) { + node = CommCallbackList.head; + cio = node->data; + + /* If seqnum isn't the same, it's time to stop */ + if (cio->seqnum != oldseqnum) + break; /* we've hit newly-added events */ + + assert(fdc_table[cio->fd].active == 1); + + dlinkDelete(&cio->h_node, &CommCallbackList); + dlinkDelete(&cio->fd_node, &(fdc_table[cio->fd].CommCallbackList)); + comm_call_io_callback(cio); + memPoolFree(comm_callback_pool, cio); + } +} + + +/* + * Queue a callback + */ +static void +comm_read_callback(int fd, int retval, comm_err_t errcode, int xerrno) +{ + fdc_t *Fc = &fdc_table[fd]; + + assert(Fc->read.handler != NULL); + + comm_addreadcallback(fd, Fc->read.handler, Fc->read.buf, retval, errcode, xerrno, + Fc->read.handler_data); + Fc->read.handler = NULL; + Fc->read.handler_data = NULL; +} + +/* + * Attempt a read + * + * If the read attempt succeeds or fails, call the callback. + * Else, wait for another IO notification. + */ +static void +comm_read_try(int fd, void *data) +{ + fdc_t *Fc = &fdc_table[fd]; + int retval; + + /* make sure we actually have a callback */ + assert(Fc->read.handler != NULL); + + /* Attempt a read */ + statCounter.syscalls.sock.reads++; + retval = FD_READ_METHOD(fd, Fc->read.buf, Fc->read.size); + if (retval < 0 && !ignoreErrno(errno)) { + comm_read_callback(fd, -1, COMM_ERROR, errno); + return; + }; + + /* See if we read anything */ + /* Note - read 0 == socket EOF, which is a valid read */ + if (retval >= 0) { + fd_bytes(fd, retval, FD_READ); + comm_read_callback(fd, retval, COMM_OK, 0); + return; + } + + /* Nope, register for some more IO */ + commSetSelect(fd, COMM_SELECT_READ, comm_read_try, NULL, 0); +} + +/* + * Queue a read.
handler/handler_data are called when the read + * completes, on error, or on file descriptor close. + */ +void +comm_read(int fd, char *buf, int size, IOCB *handler, void *handler_data) +{ + /* Make sure we're not reading anything and we're not closing */ + assert(fdc_table[fd].active == 1); + assert(fdc_table[fd].read.handler == NULL); + assert(!fd_table[fd].flags.closing); + + /* Queue a read */ + fdc_table[fd].read.buf = buf; + fdc_table[fd].read.size = size; + fdc_table[fd].read.handler = handler; + fdc_table[fd].read.handler_data = handler_data; + +#if OPTIMISTIC_IO + comm_read_try(fd, NULL); +#else + /* Register intrest in a FD read */ + commSetSelect(fd, COMM_SELECT_READ, comm_read_try, NULL, 0); +#endif +} + +static void +comm_fill_read(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) +{ + /* TODO use a reference to the table entry, or use C++ :] */ + StoreIOBuffer *sb; + struct CommFiller *fill; + assert(fdc_table[fd].active == 1); + + if (flag != COMM_OK) { + /* Error! */ + comm_add_fill_callback(fd, -1, flag, xerrno); + return; + } + /* flag is COMM_OK */ + /* We handle EOFs as read lengths of 0! 
Its eww, but its consistent */ + fill = &fdc_table[fd].fill; + fill->amountDone += len; + sb = &fdc_table[fd].fill.requestedData; + assert(fill->amountDone <= sb->length); + comm_add_fill_callback(fd, fill->amountDone, COMM_OK, 0); +} + +/* + * Try filling a StoreIOBuffer with some data, and call a callback when successful + */ +void +comm_fill_immediate(int fd, StoreIOBuffer sb, IOFCB *callback, void *data) +{ + assert(fdc_table[fd].fill.handler == NULL); + /* prevent confusion */ + assert (sb.offset == 0); + + /* If we don't have any data, record details and schedule a read */ + fdc_table[fd].fill.handler = callback; + fdc_table[fd].fill.handler_data = data; + fdc_table[fd].fill.requestedData = sb; + fdc_table[fd].fill.amountDone = 0; + + comm_read(fd, sb.data, sb.length, comm_fill_read, NULL); +} + + +/* + * Empty the read buffers + * + * This is a magical routine that empties the read buffers. + * Under some platforms (Linux) if a buffer has data in it before + * you call close(), the socket will hang and take quite a while + * to timeout. + */ +static void +comm_empty_os_read_buffers(int fd) +{ +#ifdef _SQUID_LINUX_ + /* prevent those nasty RST packets */ + char buf[SQUID_TCP_SO_RCVBUF]; + if (fd_table[fd].flags.nonblocking == 1) + while (FD_READ_METHOD(fd, buf, SQUID_TCP_SO_RCVBUF) > 0); +#endif +} + + +/* + * Return whether a file descriptor has any pending read request callbacks + * + * Assumptions: the fd is open (ie, its not closing) + */ +int +comm_has_pending_read_callback(int fd) +{ + dlink_node *node; + CommCallbackData *cd; + + assert(fd_table[fd].flags.open == 1); + assert(fdc_table[fd].active == 1); + + /* + * XXX I don't like having to walk the list! + * Instead, if this routine is called often enough, we should + * also maintain a linked list of _read_ events - we can just + * check if the list head a HEAD.. 
+ * - adrian + */ + node = fdc_table[fd].CommCallbackList.head; + while (node != NULL) { + cd = node->data; + if (cd->type == COMM_CB_READ) + return 1; + node = node->next; + } + + /* Not found */ + return 0; +} + +/* + * return whether a file descriptor has a read handler + * + * Assumptions: the fd is open + */ +int +comm_has_pending_read(int fd) +{ + assert(fd_table[fd].flags.open == 1); + assert(fdc_table[fd].active == 1); + + return (fdc_table[fd].read.handler != NULL); +} + +/* + * Cancel a pending read. Assert that we have the right parameters, + * and that there are no pending read events! + */ +void +comm_read_cancel(int fd, IOCB *callback, void *data) +{ + assert(fd_table[fd].flags.open == 1); + assert(fdc_table[fd].active == 1); + + assert(fdc_table[fd].read.handler == callback); + assert(fdc_table[fd].read.handler_data == data); + + assert(!comm_has_pending_read_callback(fd)); + + /* Ok, we can be reasonably sure we won't lose any data here! */ + + /* Delete the callback */ + fdc_table[fd].read.handler = NULL; + fdc_table[fd].read.handler_data = NULL; +} + + +void +fdc_open(int fd, unsigned int type, char *desc) +{ + assert(fdc_table[fd].active == 0); + + fdc_table[fd].active = 1; + fd_open(fd, type, desc); +} + + +/* Older stuff */ static void CommWriteStateCallbackAndFree(int fd, comm_err_t code) @@ -207,6 +677,10 @@ /* update fdstat */ debug(5, 5) ("comm_open: FD %d is a new socket\n", new_socket); fd_open(new_socket, FD_SOCKET, note); + fdd_table[new_socket].close_file = NULL; + fdd_table[new_socket].close_line = 0; + assert(fdc_table[new_socket].active == 0); + fdc_table[new_socket].active = 1; F = &fd_table[new_socket]; F->local_addr = addr; F->tos = tos; @@ -243,26 +717,6 @@ return new_socket; } -/* - * NOTE: set the listen queue to Squid_MaxFD/4 and rely on the kernel to - * impose an upper limit. Solaris' listen(3n) page says it has - * no limit on this parameter, but sys/socket.h sets SOMAXCONN - * to 5. HP-UX currently has a limit of 20. 
SunOS is 5 and - * OSF 3.0 is 8. - */ -int -comm_listen(int sock) -{ - int x; - if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) { - debug(50, 0) ("comm_listen: listen(%d, %d): %s\n", - Squid_MaxFD >> 2, - sock, xstrerror()); - return x; - } - return sock; -} - void commConnectStart(int fd, const char *host, u_short port, CNCB * callback, void *data) { @@ -526,7 +980,7 @@ /* Wait for an incoming connection on FD. FD should be a socket returned * from comm_listen. */ int -comm_accept(int fd, struct sockaddr_in *pn, struct sockaddr_in *me) +comm_old_accept(int fd, struct sockaddr_in *pn, struct sockaddr_in *me) { int sock; struct sockaddr_in P; @@ -539,13 +993,13 @@ if ((sock = accept(fd, (struct sockaddr *) &P, &Slen)) < 0) { PROF_stop(comm_accept); if (ignoreErrno(errno)) { - debug(50, 5) ("comm_accept: FD %d: %s\n", fd, xstrerror()); + debug(50, 5) ("comm_old_accept: FD %d: %s\n", fd, xstrerror()); return COMM_NOMESSAGE; } else if (ENFILE == errno || EMFILE == errno) { - debug(50, 3) ("comm_accept: FD %d: %s\n", fd, xstrerror()); + debug(50, 3) ("comm_old_accept: FD %d: %s\n", fd, xstrerror()); return COMM_ERROR; } else { - debug(50, 1) ("comm_accept: FD %d: %s\n", fd, xstrerror()); + debug(50, 1) ("comm_old_accept: FD %d: %s\n", fd, xstrerror()); return COMM_ERROR; } } @@ -559,6 +1013,9 @@ commSetCloseOnExec(sock); /* fdstat update */ fd_open(sock, FD_SOCKET, "HTTP Request"); + fdd_table[sock].close_file = NULL; + fdd_table[sock].close_line = 0; + fdc_table[sock].active = 1; F = &fd_table[sock]; xstrncpy(F->ipaddr, inet_ntoa(P.sin_addr), 16); F->remote_port = htons(P.sin_port); @@ -638,21 +1095,35 @@ comm_close(fd); } + +/* + * Close the socket fd. 
+ * + * + call write handlers with ERR_CLOSING + * + call read handlers with ERR_CLOSING + * + call closing handlers + */ void -comm_close(int fd) +_comm_close(int fd, char *file, int line) { fde *F = NULL; + dlink_node *node; + CommCallbackData *cio; debug(5, 5) ("comm_close: FD %d\n", fd); assert(fd >= 0); assert(fd < Squid_MaxFD); F = &fd_table[fd]; + fdd_table[fd].close_file = file; + fdd_table[fd].close_line = line; if (F->flags.closing) return; if (shutting_down && (!F->flags.open || F->type == FD_FILE)) return; assert(F->flags.open); + /* The following fails because ipc.c is doing calls to pipe() to create sockets! */ + /* assert(fdc_table[fd].active == 1); */ assert(F->type != FD_FILE); PROF_start(comm_close); F->flags.closing = 1; @@ -662,6 +1133,19 @@ #endif commSetTimeout(fd, -1, NULL, NULL); CommWriteStateCallbackAndFree(fd, COMM_ERR_CLOSING); + + /* Delete any pending io callbacks */ + while (fdc_table[fd].CommCallbackList.head != NULL) { + node = fdc_table[fd].CommCallbackList.head; + cio = node->data; + assert(fd == cio->fd); /* just paranoid */ + dlinkDelete(&cio->h_node, &CommCallbackList); + dlinkDelete(&cio->fd_node, &(fdc_table[cio->fd].CommCallbackList)); + + comm_call_io_callback(cio); + memPoolFree(comm_callback_pool, cio); + } + commCallCloseHandlers(fd); if (F->uses) /* assume persistent connect count */ pconnHistCount(1, F->uses); @@ -671,8 +1155,11 @@ F->ssl = NULL; } #endif + comm_empty_os_read_buffers(fd); fd_close(fd); /* update fdstat */ close(fd); + fdc_table[fd].active = 0; + bzero(&fdc_table[fd], sizeof(fdc_t)); statCounter.syscalls.sock.closes++; PROF_stop(comm_close); } @@ -853,12 +1340,16 @@ comm_init(void) { fd_table = xcalloc(Squid_MaxFD, sizeof(fde)); + fdd_table = xcalloc(Squid_MaxFD, sizeof(fd_debug_t)); + fdc_table = xcalloc(Squid_MaxFD, sizeof(fdc_t)); /* XXX account fd_table */ /* Keep a few file descriptors free so that we don't run out of FD's * after accepting a client but before it opens a socket or a file. 
* Since Squid_MaxFD can be as high as several thousand, don't waste them */ RESERVED_FD = XMIN(100, Squid_MaxFD / 4); CBDATA_INIT_TYPE(ConnectStateData); + + comm_callback_pool = memPoolCreate("comm callbacks", sizeof(CommCallbackData)); comm_write_pool = memPoolCreate("CommWriteStateData", sizeof(CommWriteStateData)); conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler)); } @@ -935,6 +1426,9 @@ comm_write(int fd, const char *buf, int size, CWCB * handler, void *handler_data, FREE * free_func) { CommWriteStateData *state = fd_table[fd].rwstate; + + assert(!fd_table[fd].flags.closing); + debug(5, 5) ("comm_write: FD %d: sz %d: hndl %p: data %p.\n", fd, size, handler, handler_data); if (NULL != state) { @@ -959,6 +1453,7 @@ comm_write(fd, mb.buf, mb.size, handler, handler_data, memBufFreeFunc(&mb)); } + /* * hm, this might be too general-purpose for all the places we'd * like to use it. @@ -1048,3 +1543,96 @@ return 0; return F->defer_check(fd, F->defer_data); } + + +/* + * New-style listen and accept routines + * + * Listen simply registers our interest in an FD for listening, + * and accept takes a callback to call when an FD has been + * accept()ed. + */ +int +comm_listen(int sock) +{ + int x; + if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) { + debug(50, 0) ("comm_listen: listen(%d, %d): %s\n", + sock, /* log arguments in the same order as the listen() call */ + Squid_MaxFD >> 2, xstrerror()); + return x; + } + return sock; +} + + +/* + * This callback is called whenever a filedescriptor is ready + * to dupe itself and fob off an accept()ed connection + */ +static void +comm_accept_try(int fd, void *data) +{ + int newfd; + fdc_t *Fc; + + assert(fdc_table[fd].active == 1); + + Fc = &(fdc_table[fd]); + + /* Accept a new connection */ + newfd = comm_old_accept(fd, &Fc->accept.pn, &Fc->accept.me); + + if (newfd < 0) { + /* Issues - check them */ + if (newfd == COMM_NOMESSAGE) { + /* register interest again */ + commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0); + return; + } + /* Problem!
*/ + comm_addacceptcallback(fd, -1, Fc->accept.handler, &Fc->accept.pn, &Fc->accept.me, COMM_ERROR, errno, Fc->accept.handler_data); + Fc->accept.handler = NULL; + Fc->accept.handler_data = NULL; + return; + } + + /* setup our new filedescriptor in fd_table */ + /* and set it up in fdc_table */ + + /* queue a completed callback with the new FD */ + comm_addacceptcallback(fd, newfd, Fc->accept.handler, &Fc->accept.pn, &Fc->accept.me, COMM_OK, 0, Fc->accept.handler_data); + Fc->accept.handler = NULL; + Fc->accept.handler_data = NULL; + +} + + +/* + * Notes: + * + the current interface will queue _one_ accept per io loop. + * this isn't very optimal and should be revisited at a later date. + */ +void +comm_accept(int fd, IOACB *handler, void *handler_data) +{ + fdc_t *Fc; + + assert(fd_table[fd].flags.open == 1); + assert(fdc_table[fd].active == 1); + + /* make sure we're not pending! */ + assert(fdc_table[fd].accept.handler == NULL); + + /* Record our details */ + Fc = &fdc_table[fd]; + Fc->accept.handler = handler; + Fc->accept.handler_data = handler_data; + + /* Kick off the accept */ +#if OPTIMISTIC_IO + comm_accept_try(fd, NULL); +#else + commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0); +#endif +} Index: squid/src/comm.h diff -u /dev/null squid/src/comm.h:1.1.2.5 --- /dev/null Tue Sep 28 18:31:27 2004 +++ squid/src/comm.h Fri Oct 11 21:43:01 2002 @@ -0,0 +1,14 @@ +#ifndef __COMM_H__ +#define __COMM_H__ + + +typedef void IOFCB(int fd, StoreIOBuffer recievedData, comm_err_t flag, int xerrno, void *data); +/* fill sb with up to length data from fd */ +extern void comm_fill_immediate(int fd, StoreIOBuffer sb, IOFCB *callback, void *data); + +extern int comm_has_pending_read_callback(int fd); +extern int comm_has_pending_read(int fd); +extern void comm_read_cancel(int fd, IOCB *callback, void *data); +extern void fdc_open(int fd, unsigned int type, char *desc); + +#endif Index: squid/src/enums.h diff -u squid/src/enums.h:1.40 squid/src/enums.h:1.32.2.4 
--- squid/src/enums.h:1.40 Tue Sep 24 03:59:15 2002 +++ squid/src/enums.h Fri Oct 4 06:36:31 2002 @@ -720,7 +720,6 @@ CBDATA_helper_server, CBDATA_statefulhelper, CBDATA_helper_stateful_server, - CBDATA_HttpStateData, CBDATA_peer, CBDATA_ps_state, CBDATA_RemovalPolicy, @@ -751,6 +750,7 @@ DIGEST_READ_DONE } digest_read_state_t; + typedef enum { COMM_OK = 0, COMM_ERROR = -1, Index: squid/src/ftp.c diff -u squid/src/ftp.c:1.28 squid/src/ftp.c:1.20.10.8 --- squid/src/ftp.c:1.28 Sun Sep 15 04:06:32 2002 +++ squid/src/ftp.c Mon Oct 14 00:00:59 2002 @@ -145,12 +145,12 @@ /* Local functions */ static CNCB ftpPasvCallback; -static PF ftpDataRead; +static IOCB ftpDataRead; static PF ftpDataWrite; static CWCB ftpDataWriteCallback; static PF ftpStateFree; static PF ftpTimeout; -static PF ftpReadControlReply; +static IOCB ftpReadControlReply; static CWCB ftpWriteCommandCallback; static void ftpLoginParser(const char *, FtpStateData *, int escaped); static wordlist *ftpParseControlReply(char *, size_t, int *, int *); @@ -870,11 +870,11 @@ ftpScheduleReadControlReply(ftpState, 1); } + static void -ftpDataRead(int fd, void *data) +ftpDataRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { FtpStateData *ftpState = data; - int len; int j; int bin; StoreEntry *entry = ftpState->entry; @@ -884,20 +884,18 @@ delay_id delay_id = delayMostBytesAllowed(mem); #endif assert(fd == ftpState->data.fd); + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { comm_close(ftpState->ctrl.fd); return; } - errno = 0; - read_sz = ftpState->data.size - ftpState->data.offset; -#if DELAY_POOLS - read_sz = delayBytesWanted(delay_id, 1, read_sz); -#endif - memset(ftpState->data.buf + ftpState->data.offset, '\0', read_sz); - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, ftpState->data.buf + ftpState->data.offset, read_sz); - if (len > 0) { - 
fd_bytes(fd, len, FD_READ); + + if (flag == COMM_OK && len > 0) { #if DELAY_POOLS delayBytesIn(delay_id, len); #endif @@ -905,8 +903,8 @@ kb_incr(&statCounter.server.ftp.kbytes_in, len); ftpState->data.offset += len; } - debug(9, 5) ("ftpDataRead: FD %d, Read %d bytes\n", fd, len); - if (len > 0) { + debug(9, 5) ("ftpDataRead: FD %d, Read %d bytes\n", fd, (int)len); + if (flag == COMM_OK && len > 0) { IOStats.Ftp.reads++; for (j = len - 1, bin = 0; j; bin++) j >>= 1; @@ -915,14 +913,15 @@ if (ftpState->flags.isdir && !ftpState->flags.html_header_sent && len >= 0) { ftpListingStart(ftpState); } - if (len < 0) { + if (flag != COMM_OK || len < 0) { debug(50, ignoreErrno(errno) ? 3 : 1) ("ftpDataRead: read error: %s\n", xstrerror()); if (ignoreErrno(errno)) { - commSetSelect(fd, - COMM_SELECT_READ, - ftpDataRead, - data, - Config.Timeout.read); + /* XXX what about Config.Timeout.read? */ + read_sz = ftpState->data.size - ftpState->data.offset; +#if DELAY_POOLS + read_sz = delayBytesWanted(delay_id, 1, read_sz); +#endif + comm_read(fd, ftpState->data.buf + ftpState->data.offset, read_sz, ftpDataRead, data); } else { ftpFailed(ftpState, ERR_READ_ERROR); /* ftpFailed closes ctrl.fd and frees ftpState */ @@ -937,11 +936,12 @@ storeAppend(entry, ftpState->data.buf, len); ftpState->data.offset = 0; } - commSetSelect(fd, - COMM_SELECT_READ, - ftpDataRead, - data, - Config.Timeout.read); + /* XXX what about Config.Timeout.read? */ + read_sz = ftpState->data.size - ftpState->data.offset; +#if DELAY_POOLS + read_sz = delayBytesWanted(delay_id, 1, read_sz); +#endif + comm_read(fd, ftpState->data.buf + ftpState->data.offset, read_sz, ftpDataRead, data); } } @@ -1220,11 +1220,9 @@ /* We've already read some reply data */ ftpHandleControlReply(ftpState); } else { - commSetSelect(ftpState->ctrl.fd, - COMM_SELECT_READ, - ftpReadControlReply, - ftpState, - Config.Timeout.read); + /* XXX What about Config.Timeout.read? 
*/ + comm_read(ftpState->ctrl.fd, ftpState->ctrl.buf + ftpState->ctrl.offset, + ftpState->ctrl.size - ftpState->ctrl.offset, ftpReadControlReply, ftpState); /* * Cancel the timeout on the Data socket (if any) and * establish one on the control socket. @@ -1237,28 +1235,29 @@ } static void -ftpReadControlReply(int fd, void *data) +ftpReadControlReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { FtpStateData *ftpState = data; StoreEntry *entry = ftpState->entry; - int len; debug(9, 5) ("ftpReadControlReply\n"); + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { comm_close(ftpState->ctrl.fd); return; } assert(ftpState->ctrl.offset < ftpState->ctrl.size); - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, - ftpState->ctrl.buf + ftpState->ctrl.offset, - ftpState->ctrl.size - ftpState->ctrl.offset); - if (len > 0) { + if (flag == COMM_OK && len > 0) { fd_bytes(fd, len, FD_READ); kb_incr(&statCounter.server.all.kbytes_in, len); kb_incr(&statCounter.server.ftp.kbytes_in, len); } - debug(9, 5) ("ftpReadControlReply: FD %d, Read %d bytes\n", fd, len); - if (len < 0) { + debug(9, 5) ("ftpReadControlReply: FD %d, Read %d bytes\n", fd, (int)len); + if (flag != COMM_OK || len < 0) { debug(50, ignoreErrno(errno) ? 
3 : 1) ("ftpReadControlReply: read error: %s\n", xstrerror()); if (ignoreErrno(errno)) { ftpScheduleReadControlReply(ftpState, 0); @@ -1892,42 +1891,39 @@ /* "read" handler to accept data connection */ static void -ftpAcceptDataConnection(int fd, void *data) +ftpAcceptDataConnection(int sock, int newfd, struct sockaddr_in *me, struct sockaddr_in *my_peer, + comm_err_t flag, int xerrno, void *data) { FtpStateData *ftpState = data; - struct sockaddr_in my_peer, me; debug(9, 3) ("ftpAcceptDataConnection\n"); if (EBIT_TEST(ftpState->entry->flags, ENTRY_ABORTED)) { comm_close(ftpState->ctrl.fd); return; } - fd = comm_accept(fd, &my_peer, &me); + if (Config.Ftp.sanitycheck) { - char *ipaddr = inet_ntoa(my_peer.sin_addr); + char *ipaddr = inet_ntoa(my_peer->sin_addr); if (strcmp(fd_table[ftpState->ctrl.fd].ipaddr, ipaddr) != 0) { - debug(9, 1) ("FTP data connection from unexpected server (%s:%d), expecting %s\n", ipaddr, (int) ntohs(my_peer.sin_port), fd_table[ftpState->ctrl.fd].ipaddr); - comm_close(fd); - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpAcceptDataConnection, - ftpState, - 0); + debug(9, 1) ("FTP data connection from unexpected server (%s:%d), expecting %s\n", ipaddr, (int) ntohs(my_peer->sin_port), fd_table[ftpState->ctrl.fd].ipaddr); + comm_close(newfd); + comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState); return; } } - if (fd < 0) { - debug(9, 1) ("ftpHandleDataAccept: comm_accept(%d): %s", fd, xstrerror()); + if (flag != COMM_OK) { + errno = xerrno; + debug(9, 1) ("ftpHandleDataAccept: comm_old_accept(%d): %s", newfd, xstrerror()); /* XXX Need to set error message */ ftpFail(ftpState); return; } /* Replace the Listen socket with the accepted data socket */ comm_close(ftpState->data.fd); - debug(9, 3) ("ftpAcceptDataConnection: Connected data socket on FD %d\n", fd); - ftpState->data.fd = fd; - ftpState->data.port = ntohs(my_peer.sin_port); - ftpState->data.host = xstrdup(inet_ntoa(my_peer.sin_addr)); + debug(9, 3) 
("ftpAcceptDataConnection: Connected data socket on FD %d\n", newfd); + ftpState->data.fd = newfd; + ftpState->data.port = ntohs(my_peer->sin_port); + ftpState->data.host = xstrdup(inet_ntoa(my_peer->sin_addr)); commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL); commSetTimeout(ftpState->data.fd, Config.Timeout.read, ftpTimeout, ftpState); @@ -2006,11 +2002,7 @@ } else if (code == 150) { /* Accept data channel */ debug(9, 3) ("ftpReadStor: accepting data channel\n"); - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpAcceptDataConnection, - ftpState, - 0); + comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState); } else { debug(9, 3) ("ftpReadStor: Unexpected reply code %03d\n", code); ftpFail(ftpState); @@ -2096,11 +2088,9 @@ if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ ftpAppendSuccessHeader(ftpState); - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpDataRead, - ftpState, - Config.Timeout.read); + /* XXX what about Config.Timeout.read? 
*/ + assert(ftpState->data.offset == 0); + comm_read(ftpState->data.fd, ftpState->data.buf, ftpState->data.size, ftpDataRead, ftpState); commSetDefer(ftpState->data.fd, fwdCheckDeferRead, ftpState->entry); ftpState->state = READING_DATA; /* @@ -2112,11 +2102,7 @@ return; } else if (code == 150) { /* Accept data channel */ - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpAcceptDataConnection, - ftpState, - 0); + comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState); /* * Cancel the timeout on the Control socket and establish one * on the data socket @@ -2145,16 +2131,19 @@ ftpReadRetr(FtpStateData * ftpState) { int code = ftpState->ctrl.replycode; + int read_sz; debug(9, 3) ("This is ftpReadRetr\n"); if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ debug(9, 3) ("ftpReadRetr: reading data channel\n"); ftpAppendSuccessHeader(ftpState); - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpDataRead, - ftpState, - Config.Timeout.read); + /* XXX what about Config.Timeout.read? 
*/ + read_sz = ftpState->data.size - ftpState->data.offset; +#if DELAY_POOLS + read_sz = delayBytesWanted(delayMostBytesAllowed(ftpState->entry->mem_obj), 1, read_sz); /* ftpReadRetr has no delay_id local; look the id up here */ +#endif + comm_read(ftpState->data.fd, ftpState->data.buf + ftpState->data.offset, + read_sz, ftpDataRead, ftpState); commSetDefer(ftpState->data.fd, fwdCheckDeferRead, ftpState->entry); ftpState->state = READING_DATA; /* @@ -2166,11 +2155,7 @@ ftpState); } else if (code == 150) { /* Accept data channel */ - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpAcceptDataConnection, - ftpState, - 0); + comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState); /* * Cancel the timeout on the Control socket and establish one * on the data socket Index: squid/src/gopher.c diff -u squid/src/gopher.c:1.19 squid/src/gopher.c:1.10.22.4 --- squid/src/gopher.c:1.19 Tue Sep 24 03:59:15 2002 +++ squid/src/gopher.c Wed Oct 9 06:46:27 2002 @@ -86,6 +86,7 @@ int fd; request_t *req; FwdState *fwdState; + char replybuf[BUFSIZ]; } GopherStateData; static PF gopherStateFree; @@ -97,7 +98,7 @@ static void gopherEndHTML(GopherStateData *); static void gopherToHTML(GopherStateData *, char *inbuf, int len); static PF gopherTimeout; -static PF gopherReadReply; +static IOCB gopherReadReply; static CWCB gopherSendComplete; static PF gopherSendRequest; @@ -614,51 +615,54 @@ /* This will be called when data is ready to be read from fd. Read until * error or connection closed. 
*/ static void -gopherReadReply(int fd, void *data) +gopherReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { GopherStateData *gopherState = data; StoreEntry *entry = gopherState->entry; - char *buf = NULL; - int len; int clen; int bin; - size_t read_sz; + size_t read_sz = BUFSIZ - 1; /* replybuf holds BUFSIZ bytes; keep one for the terminating \0 */ + int do_next_read = 0; #if DELAY_POOLS delay_id delay_id = delayMostBytesAllowed(entry->mem_obj); #endif + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + + assert(buf == gopherState->replybuf); if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { comm_close(fd); return; } + errno = 0; - buf = memAllocate(MEM_4K_BUF); - read_sz = 4096 - 1; /* leave room for termination */ #if DELAY_POOLS read_sz = delayBytesWanted(delay_id, 1, read_sz); #endif + /* leave one space for \0 in gopherToHTML */ - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, buf, read_sz); - if (len > 0) { - fd_bytes(fd, len, FD_READ); + if (flag == COMM_OK && len > 0) { #if DELAY_POOLS delayBytesIn(delay_id, len); #endif kb_incr(&statCounter.server.all.kbytes_in, len); kb_incr(&statCounter.server.other.kbytes_in, len); } - debug(10, 5) ("gopherReadReply: FD %d read len=%d\n", fd, len); - if (len > 0) { + debug(10, 5) ("gopherReadReply: FD %d read len=%d\n", fd, (int)len); + if (flag == COMM_OK && len > 0) { commSetTimeout(fd, Config.Timeout.read, NULL, NULL); IOStats.Gopher.reads++; for (clen = len - 1, bin = 0; clen; bin++) clen >>= 1; IOStats.Gopher.read_hist[bin]++; } - if (len < 0) { + if (flag != COMM_OK || len < 0) { debug(50, 1) ("gopherReadReply: error reading: %s\n", xstrerror()); if (ignoreErrno(errno)) { - commSetSelect(fd, COMM_SELECT_READ, gopherReadReply, data, 0); + do_next_read = 1; } else if (entry->mem_obj->inmem_hi == 0) { ErrorState *err; err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR); @@ -666,8 +670,10 @@ err->url = xstrdup(storeUrl(entry)); errorAppendEntry(entry, err); 
comm_close(fd); + do_next_read = 0; } else { comm_close(fd); + do_next_read = 0; } } else if (len == 0 && entry->mem_obj->inmem_hi == 0) { ErrorState *err; @@ -676,6 +682,7 @@ err->url = xstrdup(gopherState->request); errorAppendEntry(entry, err); comm_close(fd); + do_next_read = 0; } else if (len == 0) { /* Connection closed; retrieval done. */ /* flush the rest of data in temp buf if there is one. */ @@ -685,18 +692,17 @@ storeBufferFlush(entry); fwdComplete(gopherState->fwdState); comm_close(fd); + do_next_read = 0; } else { if (gopherState->conversion != NORMAL) { gopherToHTML(data, buf, len); } else { storeAppend(entry, buf, len); } - commSetSelect(fd, - COMM_SELECT_READ, - gopherReadReply, - data, 0); + do_next_read = 1; } - memFree(buf, MEM_4K_BUF); + if (do_next_read) + comm_read(fd, buf, read_sz, gopherReadReply, gopherState); return; } @@ -756,7 +762,8 @@ gopherState->conversion = NORMAL; } /* Schedule read reply. */ - commSetSelect(fd, COMM_SELECT_READ, gopherReadReply, gopherState, 0); + /* XXX this read isn't being bound by delay pools! */ + comm_read(fd, gopherState->replybuf, BUFSIZ - 1, gopherReadReply, gopherState); /* keep one byte of replybuf for the \0 gopherToHTML needs */ commSetDefer(fd, fwdCheckDeferRead, entry); if (buf) memFree(buf, MEM_4K_BUF); /* Allocated by gopherSendRequest. 
*/ @@ -846,6 +853,6 @@ } gopherState->fd = fd; gopherState->fwdState = fwdState; - commSetSelect(fd, COMM_SELECT_WRITE, gopherSendRequest, gopherState, 0); + gopherSendRequest(fd, gopherState); commSetTimeout(fd, Config.Timeout.read, gopherTimeout, gopherState); } Index: squid/src/helper.c diff -u squid/src/helper.c:1.26 squid/src/helper.c:1.16.18.4 --- squid/src/helper.c:1.26 Sun Sep 29 05:39:30 2002 +++ squid/src/helper.c Wed Oct 9 06:46:28 2002 @@ -37,8 +37,8 @@ #define HELPER_MAX_ARGS 64 -static PF helperHandleRead; -static PF helperStatefulHandleRead; +static IOCB helperHandleRead; +static IOCB helperStatefulHandleRead; static PF helperServerFree; static PF helperStatefulServerFree; static void Enqueue(helper * hlp, helper_request *); @@ -696,21 +696,23 @@ static void -helperHandleRead(int fd, void *data) +helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { - int len; char *t = NULL; helper_server *srv = data; helper_request *r; helper *hlp = srv->parent; assert(fd == srv->rfd); assert(cbdataReferenceValid(data)); - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset); - fd_bytes(fd, len, FD_READ); + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + debug(84, 5) ("helperHandleRead: %d bytes from %s #%d.\n", - len, hlp->id_name, srv->index + 1); - if (len <= 0) { + (int)len, hlp->id_name, srv->index + 1); + if (flag != COMM_OK || len <= 0) { if (len < 0) debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror()); comm_close(fd); @@ -722,7 +724,7 @@ if (r == NULL) { /* someone spoke without being spoken to */ debug(84, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n", - hlp->id_name, srv->index + 1, len); + hlp->id_name, srv->index + 1, (int)len); srv->offset = 0; } else if ((t = strchr(srv->buf, '\n'))) { /* end of reply found */ @@ -751,26 +753,28 @@ } else 
helperKickQueue(hlp); } else { - commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0); + comm_read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset, helperHandleRead, data); } } static void -helperStatefulHandleRead(int fd, void *data) +helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { - int len; char *t = NULL; helper_stateful_server *srv = data; helper_stateful_request *r; statefulhelper *hlp = srv->parent; assert(fd == srv->rfd); assert(cbdataReferenceValid(data)); - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset); - fd_bytes(fd, len, FD_READ); + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + debug(84, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n", - len, hlp->id_name, srv->index + 1); - if (len <= 0) { + (int)len, hlp->id_name, srv->index + 1); + if (flag != COMM_OK || len <= 0) { if (len < 0) debug(50, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd, xstrerror()); comm_close(fd); @@ -782,7 +786,7 @@ if (r == NULL) { /* someone spoke without being spoken to */ debug(84, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n", - hlp->id_name, srv->index + 1, len); + hlp->id_name, srv->index + 1, (int)len); srv->offset = 0; } else if ((t = strchr(srv->buf, '\n'))) { /* end of reply found */ @@ -851,7 +855,8 @@ helperStatefulKickQueue(hlp); } } else { - commSetSelect(srv->rfd, COMM_SELECT_READ, helperStatefulHandleRead, srv, 0); + comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset, + helperStatefulHandleRead, srv); } } @@ -1023,10 +1028,7 @@ NULL, /* Handler */ NULL, /* Handler-data */ NULL); /* free */ - commSetSelect(srv->rfd, - COMM_SELECT_READ, - helperHandleRead, - srv, 0); + comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset, helperHandleRead, srv); debug(84, 5) 
("helperDispatch: Request sent to %s #%d, %d bytes\n", hlp->id_name, srv->index + 1, (int) strlen(r->buf)); srv->stats.uses++; @@ -1077,10 +1079,8 @@ NULL, /* Handler */ NULL, /* Handler-data */ NULL); /* free */ - commSetSelect(srv->rfd, - COMM_SELECT_READ, - helperStatefulHandleRead, - srv, 0); + comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset, + helperStatefulHandleRead, srv); debug(84, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n", hlp->id_name, srv->index + 1, (int) strlen(r->buf)); srv->stats.uses++; Index: squid/src/http.c diff -u squid/src/http.c:1.26 squid/src/http.c:1.20.8.7 --- squid/src/http.c:1.26 Fri Oct 4 14:45:32 2002 +++ squid/src/http.c Wed Oct 9 06:46:28 2002 @@ -39,13 +39,15 @@ */ #include "squid.h" +#include "http.h" +CBDATA_TYPE(HttpStateData); static const char *const crlf = "\r\n"; static CWCB httpSendComplete; static CWCB httpSendRequestEntity; -static PF httpReadReply; +static IOCB httpReadReply; static void httpSendRequest(HttpStateData *); static PF httpStateFree; static PF httpTimeout; @@ -542,16 +544,15 @@ * error or connection closed. */ /* XXX this function is too long! 
*/ static void -httpReadReply(int fd, void *data) +httpReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { HttpStateData *httpState = data; - LOCAL_ARRAY(char, buf, SQUID_TCP_SO_RCVBUF); StoreEntry *entry = httpState->entry; const request_t *request = httpState->request; - int len; int bin; int clen; - size_t read_sz; + size_t read_sz = SQUID_TCP_SO_RCVBUF; + int do_next_read = 0; #if DELAY_POOLS delay_id delay_id; @@ -561,21 +562,28 @@ else delay_id = delayMostBytesAllowed(entry->mem_obj); #endif + + assert(buf == httpState->buf); + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { comm_close(fd); - return; + do_next_read = 0; + goto finish; } - /* check if we want to defer reading */ + errno = 0; - read_sz = SQUID_TCP_SO_RCVBUF; + /* prepare the read size for the next read (if any) */ #if DELAY_POOLS read_sz = delayBytesWanted(delay_id, 1, read_sz); #endif - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, buf, read_sz); - debug(11, 5) ("httpReadReply: FD %d: len %d.\n", fd, len); - if (len > 0) { - fd_bytes(fd, len, FD_READ); + + debug(11, 5) ("httpReadReply: FD %d: len %d.\n", fd, (int)len); + if (flag == COMM_OK && len > 0) { #if DELAY_POOLS delayBytesIn(delay_id, len); #endif @@ -587,40 +595,43 @@ clen >>= 1; IOStats.Http.read_hist[bin]++; } - if (!httpState->reply_hdr && len > 0) { + if (!httpState->reply_hdr && flag == COMM_OK && len > 0) { /* Skip whitespace */ while (len > 0 && xisspace(*buf)) xmemmove(buf, buf + 1, len--); if (len == 0) { /* Continue to read... 
*/ - commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); - return; + do_next_read = 1; + goto finish; } } - if (len < 0) { + if (flag != COMM_OK || len < 0) { debug(50, 2) ("httpReadReply: FD %d: read failure: %s.\n", fd, xstrerror()); if (ignoreErrno(errno)) { - commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); + do_next_read = 1; } else if (entry->mem_obj->inmem_hi == 0) { ErrorState *err; err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR); err->request = requestLink((request_t *) request); err->xerrno = errno; fwdFail(httpState->fwd, err); + do_next_read = 0; comm_close(fd); } else { + do_next_read = 0; comm_close(fd); } - } else if (len == 0 && entry->mem_obj->inmem_hi == 0) { + } else if (flag == COMM_OK && len == 0 && entry->mem_obj->inmem_hi == 0) { ErrorState *err; err = errorCon(ERR_ZERO_SIZE_OBJECT, HTTP_SERVICE_UNAVAILABLE); err->xerrno = errno; err->request = requestLink((request_t *) request); fwdFail(httpState->fwd, err); httpState->eof = 1; + do_next_read = 0; comm_close(fd); - } else if (len == 0) { + } else if (flag == COMM_OK && len == 0) { /* Connection closed; retrieval done. */ httpState->eof = 1; if (httpState->reply_hdr_state < 2) @@ -632,6 +643,7 @@ */ httpProcessReplyHeader(httpState, buf, len); fwdComplete(httpState->fwd); + do_next_read = 0; comm_close(fd); } else { if (httpState->reply_hdr_state < 2) { @@ -661,7 +673,7 @@ /* yes we have to clear all these! */ commSetDefer(fd, NULL, NULL); commSetTimeout(fd, -1, NULL, NULL); - commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); + do_next_read = 0; #if DELAY_POOLS delayClearNoDelay(fd); #endif @@ -673,9 +685,13 @@ httpStateFree(fd, httpState); } else { /* Wait for EOF condition */ - commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); + do_next_read = 1; } } + +finish: + if (do_next_read) + comm_read(fd, httpState->buf, read_sz, httpReadReply, httpState); } /* This will be called when request write is complete. 
Schedule read of @@ -696,18 +712,20 @@ kb_incr(&statCounter.server.all.kbytes_out, size); kb_incr(&statCounter.server.http.kbytes_out, size); } - if (errflag == COMM_ERR_CLOSING) - return; + if (errflag == COMM_ERR_CLOSING) { + return; + } if (errflag) { err = errorCon(ERR_WRITE_ERROR, HTTP_INTERNAL_SERVER_ERROR); err->xerrno = errno; err->request = requestLink(httpState->orig_request); errorAppendEntry(entry, err); comm_close(fd); - return; + return; } else { /* Schedule read reply. */ - commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); + /* XXX we're not taking into account delay pools on this read! */ + comm_read(fd, httpState->buf, SQUID_TCP_SO_RCVBUF, httpReadReply, httpState); /* * Set the read timeout here because it hasn't been set yet. * We only set the read timeout after the request has been @@ -988,6 +1006,7 @@ debug(11, 3) ("httpStart: \"%s %s\"\n", RequestMethodStr[orig_req->method], storeUrl(fwd->entry)); + CBDATA_INIT_TYPE(HttpStateData); httpState = cbdataAlloc(HttpStateData); storeLockObject(fwd->entry); httpState->fwd = fwd; Index: squid/src/http.h diff -u /dev/null squid/src/http.h:1.1.12.2 --- /dev/null Tue Sep 28 18:31:27 2004 +++ squid/src/http.h Fri Oct 4 23:33:08 2002 @@ -0,0 +1,54 @@ + +/* + * $Id$ + * + * + * SQUID Web Proxy Cache http://www.squid-cache.org/ + * ---------------------------------------------------------- + * + * Squid is the result of efforts by numerous individuals from + * the Internet community; see the CONTRIBUTORS file for full + * details. Many organizations have provided support for Squid's + * development; see the SPONSORS file for full details. Squid is + * Copyrighted (C) 2001 by the Regents of the University of + * California; see the COPYRIGHT file for full details. Squid + * incorporates software developed and/or copyrighted by other + * sources; see the CREDITS file for full details. 
+ * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. + * + */ + +#ifndef SQUID_HTTP_H +#define SQUID_HTTP_H + +#include "StoreIOBuffer.h" + +struct _HttpStateData { + StoreEntry *entry; + request_t *request; + char *reply_hdr; + size_t reply_hdr_size; + int reply_hdr_state; + peer *_peer; /* peer request made to */ + int eof; /* reached end-of-object? 
*/ + request_t *orig_request; + int fd; + http_state_flags flags; + FwdState *fwd; + char buf[SQUID_TCP_SO_RCVBUF]; +}; + +#endif /* SQUID_HTTP_H */ Index: squid/src/ident.c diff -u squid/src/ident.c:1.10 squid/src/ident.c:1.8.32.3 --- squid/src/ident.c:1.10 Sun Sep 15 04:06:32 2002 +++ squid/src/ident.c Sat Oct 5 03:34:26 2002 @@ -52,9 +52,10 @@ struct sockaddr_in me; struct sockaddr_in my_peer; IdentClient *clients; + char buf[BUFSIZ]; } IdentStateData; -static PF identReadReply; +static IOCB identReadReply; static PF identClose; static PF identTimeout; static CNCB identConnectDone; @@ -125,23 +126,22 @@ ntohs(state->my_peer.sin_port), ntohs(state->me.sin_port)); comm_write_mbuf(fd, mb, NULL, state); - commSetSelect(fd, COMM_SELECT_READ, identReadReply, state, 0); + comm_read(fd, state->buf, BUFSIZ, identReadReply, state); commSetTimeout(fd, Config.Timeout.ident, identTimeout, state); } static void -identReadReply(int fd, void *data) +identReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { IdentStateData *state = data; - LOCAL_ARRAY(char, buf, BUFSIZ); char *ident = NULL; char *t = NULL; - int len = -1; + + assert(buf == state->buf); + buf[0] = '\0'; - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, buf, BUFSIZ - 1); - fd_bytes(fd, len, FD_READ); - if (len <= 0) { + + if (flag != COMM_OK || len <= 0) { comm_close(fd); return; } Index: squid/src/ipc.c diff -u squid/src/ipc.c:1.9 squid/src/ipc.c:1.8.10.2 --- squid/src/ipc.c:1.9 Sat Apr 6 03:34:50 2002 +++ squid/src/ipc.c Wed Oct 9 02:56:05 2002 @@ -34,6 +34,8 @@ */ #include "squid.h" +#include "StoreIOBuffer.h" +#include "comm.h" static const char *hello_string = "hi there\n"; #define HELLO_BUF_SZ 32 @@ -119,10 +121,10 @@ debug(50, 0) ("ipcCreate: pipe: %s\n", xstrerror()); return -1; } - fd_open(prfd = p2c[0], FD_PIPE, "IPC FIFO Parent Read"); - fd_open(cwfd = p2c[1], FD_PIPE, "IPC FIFO Child Write"); - fd_open(crfd = c2p[0], FD_PIPE, "IPC FIFO Child Read"); - 
fd_open(pwfd = c2p[1], FD_PIPE, "IPC FIFO Parent Write"); + fdc_open(prfd = p2c[0], FD_PIPE, "IPC FIFO Parent Read"); + fdc_open(cwfd = p2c[1], FD_PIPE, "IPC FIFO Child Write"); + fdc_open(crfd = c2p[0], FD_PIPE, "IPC FIFO Child Read"); + fdc_open(pwfd = c2p[1], FD_PIPE, "IPC FIFO Parent Write"); #if HAVE_SOCKETPAIR && defined(AF_UNIX) } else if (type == IPC_UNIX_STREAM) { int fds[2]; @@ -135,16 +137,16 @@ setsockopt(fds[0], SOL_SOCKET, SO_RCVBUF, (void *) &buflen, sizeof(buflen)); setsockopt(fds[1], SOL_SOCKET, SO_SNDBUF, (void *) &buflen, sizeof(buflen)); setsockopt(fds[1], SOL_SOCKET, SO_RCVBUF, (void *) &buflen, sizeof(buflen)); - fd_open(prfd = pwfd = fds[0], FD_PIPE, "IPC UNIX STREAM Parent"); - fd_open(crfd = cwfd = fds[1], FD_PIPE, "IPC UNIX STREAM Parent"); + fdc_open(prfd = pwfd = fds[0], FD_PIPE, "IPC UNIX STREAM Parent"); + fdc_open(crfd = cwfd = fds[1], FD_PIPE, "IPC UNIX STREAM Parent"); } else if (type == IPC_UNIX_DGRAM) { int fds[2]; if (socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) < 0) { debug(50, 0) ("ipcCreate: socketpair: %s\n", xstrerror()); return -1; } - fd_open(prfd = pwfd = fds[0], FD_PIPE, "IPC UNIX DGRAM Parent"); - fd_open(crfd = cwfd = fds[1], FD_PIPE, "IPC UNIX DGRAM Parent"); + fdc_open(prfd = pwfd = fds[0], FD_PIPE, "IPC UNIX DGRAM Parent"); + fdc_open(crfd = cwfd = fds[1], FD_PIPE, "IPC UNIX DGRAM Parent"); #endif } else { assert(IPC_NONE); Index: squid/src/main.c diff -u squid/src/main.c:1.37 squid/src/main.c:1.30.10.3 --- squid/src/main.c:1.37 Wed Oct 2 04:10:47 2002 +++ squid/src/main.c Sun Oct 6 00:12:21 2002 @@ -714,6 +714,7 @@ eventRun(); if ((loop_delay = eventNextTime()) < 0) loop_delay = 0; + comm_calliocallback(); switch (comm_select(loop_delay)) { case COMM_OK: errcount = 0; /* reset if successful */ Index: squid/src/mem.c diff -u squid/src/mem.c:1.21 squid/src/mem.c:1.16.8.3 --- squid/src/mem.c:1.21 Sun Sep 15 04:06:33 2002 +++ squid/src/mem.c Wed Sep 25 23:08:06 2002 @@ -264,7 +264,7 @@ { /* XXX This can be optimized on very 
large buffers to use realloc() */ /* TODO: if the existing gross size is >= new gross size, do nothing */ - int new_gross_size; + size_t new_gross_size; void *newbuf = memAllocBuf(net_size, &new_gross_size); if (oldbuf) { int data_size = *gross_size; Index: squid/src/pconn.c diff -u squid/src/pconn.c:1.7 squid/src/pconn.c:1.6.32.5 --- squid/src/pconn.c:1.7 Sat Apr 13 16:09:17 2002 +++ squid/src/pconn.c Fri Oct 11 21:43:01 2002 @@ -34,12 +34,15 @@ */ #include "squid.h" +#include "StoreIOBuffer.h" +#include "comm.h" struct _pconn { hash_link hash; /* must be first */ int *fds; int nfds_alloc; int nfds; + char buf[BUFSIZ]; }; typedef struct _pconn pconn; @@ -48,7 +51,7 @@ int client_pconn_hist[PCONN_HIST_SZ]; int server_pconn_hist[PCONN_HIST_SZ]; -static PF pconnRead; +static IOCB pconnRead; static PF pconnTimeout; static const char *pconnKey(const char *host, u_short port); static hash_table *table = NULL; @@ -96,18 +99,37 @@ cbdataFree(p); } -static void -pconnRemoveFD(struct _pconn *p, int fd) +static int +pconnFindFDIndex (struct _pconn *p, int fd) { - int i; - for (i = 0; i < p->nfds; i++) { - if (p->fds[i] == fd) - break; + int result; + for (result = 0; result < p->nfds; ++result) { + if (p->fds[result] == fd) + return result; } + return p->nfds; +} + +static void +pconnRemoveFDByIndex (struct _pconn *p, int index) +{ + for (; index < p->nfds - 1; index++) + p->fds[index] = p->fds[index + 1]; +} + +static void +pconnPreventHandingOutFD(struct _pconn *p, int fd) +{ + int i = pconnFindFDIndex (p, fd); assert(i < p->nfds); debug(48, 3) ("pconnRemoveFD: found FD %d at index %d\n", fd, i); - for (; i < p->nfds - 1; i++) - p->fds[i] = p->fds[i + 1]; + pconnRemoveFDByIndex(p, i); +} + +static void +pconnRemoveFD(struct _pconn *p, int fd) +{ + pconnPreventHandingOutFD(p, fd); if (--p->nfds == 0) pconnDelete(p); } @@ -123,15 +145,15 @@ } static void -pconnRead(int fd, void *data) +pconnRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { - 
LOCAL_ARRAY(char, buf, 256); struct _pconn *p = data; - int n; assert(table != NULL); - statCounter.syscalls.sock.reads++; - n = FD_READ_METHOD(fd, buf, 256); - debug(48, 3) ("pconnRead: %d bytes from FD %d, %s\n", n, fd, + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + debug(48, 3) ("pconnRead: %d bytes from FD %d, %s\n", (int)len, fd, hashKeyStr(&p->hash)); pconnRemoveFD(p, fd); comm_close(fd); @@ -219,32 +241,52 @@ xfree(old); } p->fds[p->nfds++] = fd; - commSetSelect(fd, COMM_SELECT_READ, pconnRead, p, 0); + comm_read(fd, p->buf, BUFSIZ, pconnRead, p); commSetTimeout(fd, Config.Timeout.pconn, pconnTimeout, p); snprintf(desc, FD_DESC_SZ, "%s idle connection", host); fd_note(fd, desc); debug(48, 3) ("pconnPush: pushed FD %d for %s\n", fd, key); } + +/* + * return a pconn fd for host:port, or -1 if none are available + * + * XXX this routine isn't terribly efficient - if there's a pending + * read event (which signifies the fd will close in the next IO loop!) + * we ignore the FD and move onto the next one. This means, as an example, + * if we have a lot of FDs open to a very popular server and we get a bunch + * of requests JUST as they timeout (say, it shuts down) we'll be wasting + * quite a bit of CPU. Just keep it in mind. + */ int pconnPop(const char *host, u_short port) { struct _pconn *p; hash_link *hptr; int fd = -1; + int i; LOCAL_ARRAY(char, key, SQUIDHOSTNAMELEN + 10); assert(table != NULL); strcpy(key, pconnKey(host, port)); hptr = hash_lookup(table, key); + if (hptr != NULL) { p = (struct _pconn *) hptr; assert(p->nfds > 0); - fd = p->fds[0]; - pconnRemoveFD(p, fd); - commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); - commSetTimeout(fd, -1, NULL, NULL); + for (i = 0; i < p->nfds; i++) { + fd = p->fds[i]; /* examine each idle FD in turn, not only the first slot */ + /* If there are pending read callbacks - we're about to close it, so don't issue it! 
*/ + if (!comm_has_pending_read_callback(fd)) { + pconnRemoveFD(p, fd); + comm_read_cancel(fd, pconnRead, p); + commSetTimeout(fd, -1, NULL, NULL); + return fd; + } + } } - return fd; + /* Nothing (valid!) found */ + return -1; } void Index: squid/src/protos.h diff -u squid/src/protos.h:1.65 squid/src/protos.h:1.49.4.14 --- squid/src/protos.h:1.65 Fri Oct 4 14:45:32 2002 +++ squid/src/protos.h Mon Oct 14 00:01:18 2002 @@ -151,11 +151,20 @@ extern int clientHttpRequestStatus(int fd, clientHttpRequest const *http); extern void clientSetReplyToError(void *, err_type, http_status, method_t, char const *, struct in_addr *, request_t *, char *, auth_user_request_t * auth_user_request); + +/* comm.c */ +extern int comm_existsiocallback(void); +extern void comm_calliocallback(void); +extern void comm_read(int fd, char *buf, int len, IOCB *handler, void *data); + +extern void comm_accept(int fd, IOACB *handler, void *handler_data); +extern int comm_listen(int fd); extern int commSetNonBlocking(int fd); extern int commUnsetNonBlocking(int fd); extern void commSetCloseOnExec(int fd); -extern int comm_accept(int fd, struct sockaddr_in *, struct sockaddr_in *); -extern void comm_close(int fd); +extern int comm_old_accept(int fd, struct sockaddr_in *, struct sockaddr_in *); +extern void _comm_close(int fd, char *file, int line); +#define comm_close(fd) (_comm_close((fd), __FILE__, __LINE__)) extern void comm_reset_close(int fd); #if LINGERING_CLOSE extern void comm_lingering_close(int fd); @@ -163,7 +172,6 @@ extern void commConnectStart(int fd, const char *, u_short, CNCB *, void *); extern int comm_connect_addr(int sock, const struct sockaddr_in *); extern void comm_init(void); -extern int comm_listen(int sock); extern int comm_open(int, int, struct in_addr, u_short port, int, const char *note); extern int comm_openex(int, int, struct in_addr, u_short, int, unsigned char TOS, const char *); extern u_short comm_local_port(int fd); @@ -532,7 +540,6 @@ extern int icpUdpSend(int, 
const struct sockaddr_in *, icp_common_t *, log_type, int); extern PF icpHandleUdp; extern PF icpUdpSendQueue; -extern PF httpAccept; #ifdef SQUID_SNMP extern PF snmpHandleUdp; Index: squid/src/ssl.c diff -u squid/src/ssl.c:1.17 squid/src/ssl.c:1.13.22.10 --- squid/src/ssl.c:1.17 Fri Oct 4 14:45:32 2002 +++ squid/src/ssl.c Mon Oct 14 00:46:21 2002 @@ -59,16 +59,15 @@ static ERCB sslErrorComplete; static PF sslServerClosed; static PF sslClientClosed; -static PF sslReadClient; -static PF sslReadServer; +static IOCB sslReadClient; +static IOCB sslReadServer; static PF sslTimeout; -static PF sslWriteClient; -static PF sslWriteServer; +static CWCB sslWriteClientDone; +static CWCB sslWriteServerDone; static PSC sslPeerSelectComplete; static void sslStateFree(SslStateData * sslState); static void sslConnected(int fd, void *); static void sslProxyConnected(int fd, void *); -static void sslSetSelect(SslStateData * sslState); #if DELAY_POOLS static DEFER sslDeferServerRead; #endif @@ -129,81 +128,24 @@ } #endif -static void -sslSetSelect(SslStateData * sslState) -{ - size_t read_sz = SQUID_TCP_SO_RCVBUF; - assert(sslState->server.fd > -1 || sslState->client.fd > -1); - if (sslState->client.fd > -1) { - if (sslState->server.len > 0) { - commSetSelect(sslState->client.fd, - COMM_SELECT_WRITE, - sslWriteClient, - sslState, - 0); - } - if (sslState->client.len < read_sz) { - commSetSelect(sslState->client.fd, - COMM_SELECT_READ, - sslReadClient, - sslState, - Config.Timeout.read); - } - } else if (sslState->client.len == 0) { - comm_close(sslState->server.fd); - } - if (sslState->server.fd > -1) { - if (sslState->client.len > 0) { - commSetSelect(sslState->server.fd, - COMM_SELECT_WRITE, - sslWriteServer, - sslState, - 0); - } -#if DELAY_POOLS - /* - * If this was allowed to return 0, there would be a possibility - * of the socket becoming "hung" with data accumulating but no - * write handler (server.len==0) and no read handler (!(0<0)) and - * no data flowing in the other 
direction. Hence the argument of - * 1 as min. - */ - read_sz = delayBytesWanted(sslState->delay_id, 1, read_sz); -#endif - if (sslState->server.len < read_sz) { - /* Have room to read more */ - commSetSelect(sslState->server.fd, - COMM_SELECT_READ, - sslReadServer, - sslState, - Config.Timeout.read); - } - } else if (sslState->client.fd == -1) { - /* client already closed, nothing more to do */ - } else if (sslState->server.len == 0) { - comm_close(sslState->client.fd); - } -} /* Read from server side and queue it for writing to the client */ static void -sslReadServer(int fd, void *data) +sslReadServer(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { SslStateData *sslState = data; - int len; - size_t read_sz = SQUID_TCP_SO_RCVBUF - sslState->server.len; + assert(fd == sslState->server.fd); - debug(26, 3) ("sslReadServer: FD %d, reading %d bytes at offset %d\n", - fd, (int) read_sz, sslState->server.len); errno = 0; #if DELAY_POOLS read_sz = delayBytesWanted(sslState->delay_id, 1, read_sz); #endif - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, sslState->server.buf + sslState->server.len, read_sz); - debug(26, 3) ("sslReadServer: FD %d, read %d bytes\n", fd, len); + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (errcode == COMM_ERR_CLOSING) { + return; + } + debug(26, 3) ("sslReadServer: FD %d, read %d bytes\n", fd, (int)len); if (len > 0) { - fd_bytes(fd, len, FD_READ); #if DELAY_POOLS delayBytesIn(sslState->delay_id, len); #endif @@ -219,29 +161,29 @@ comm_close(fd); } else if (len == 0) { comm_close(sslState->server.fd); - } - if (cbdataReferenceValid(sslState)) - sslSetSelect(sslState); + /* Only close the remote end if we've finished queueing data to it */ + if (sslState->server.len == 0 && sslState->client.fd != -1) { + comm_close(sslState->client.fd); + } + } else if (cbdataReferenceValid(sslState)) + comm_write(sslState->client.fd, sslState->server.buf, len, 
sslWriteClientDone, sslState, NULL); cbdataInternalUnlock(sslState); /* ??? */ } /* Read from client side and queue it for writing to the server */ static void -sslReadClient(int fd, void *data) +sslReadClient(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { SslStateData *sslState = data; - int len; assert(fd == sslState->client.fd); - debug(26, 3) ("sslReadClient: FD %d, reading %d bytes at offset %d\n", - fd, SQUID_TCP_SO_RCVBUF - sslState->client.len, - sslState->client.len); - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, - sslState->client.buf + sslState->client.len, - SQUID_TCP_SO_RCVBUF - sslState->client.len); - debug(26, 3) ("sslReadClient: FD %d, read %d bytes\n", fd, len); + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (errcode == COMM_ERR_CLOSING) { + return; + } + + debug(26, 3) ("sslReadClient: FD %d, read %d bytes\n", fd, (int) len); if (len > 0) { - fd_bytes(fd, len, FD_READ); kb_incr(&statCounter.client_http.kbytes_in, len); sslState->client.len += len; } @@ -249,100 +191,106 @@ if (len < 0) { int level = 1; #ifdef ECONNRESET - if (errno == ECONNRESET) + if (xerrno == ECONNRESET) level = 2; #endif - if (ignoreErrno(errno)) + if (ignoreErrno(xerrno)) level = 3; + /* XXX xstrerror() should be changed to take errno as an arg! 
*/ + errno = xerrno; debug(50, level) ("sslReadClient: FD %d: read failure: %s\n", fd, xstrerror()); - if (!ignoreErrno(errno)) + if (!ignoreErrno(xerrno)) comm_close(fd); } else if (len == 0) { - comm_close(fd); - } - if (cbdataReferenceValid(sslState)) - sslSetSelect(sslState); + comm_close(sslState->client.fd); + /* Only close the remote end if we've finished queueing data to it */ + if (sslState->client.len == 0 && sslState->server.fd != -1) { + comm_close(sslState->server.fd); + } + } else if (cbdataReferenceValid(sslState)) + comm_write(sslState->server.fd, sslState->client.buf, len, sslWriteServerDone, sslState, NULL); cbdataInternalUnlock(sslState); /* ??? */ } /* Writes data from the client buffer to the server side */ static void -sslWriteServer(int fd, void *data) +sslWriteServerDone(int fd, char *buf, size_t len, comm_err_t flag, void *data) { SslStateData *sslState = data; - int len; assert(fd == sslState->server.fd); - debug(26, 3) ("sslWriteServer: FD %d, %d bytes to write\n", - fd, sslState->client.len); - statCounter.syscalls.sock.writes++; - len = FD_WRITE_METHOD(fd, - sslState->client.buf, - sslState->client.len); - debug(26, 3) ("sslWriteServer: FD %d, %d bytes written\n", fd, len); + debug(26, 3) ("sslWriteServer: FD %d, %d bytes written\n", fd, (int)len); + /* Valid data */ if (len > 0) { - fd_bytes(fd, len, FD_WRITE); kb_incr(&statCounter.server.all.kbytes_out, len); kb_incr(&statCounter.server.other.kbytes_out, len); - assert(len <= sslState->client.len); - sslState->client.len -= len; - if (sslState->client.len > 0) { - /* we didn't write the whole thing */ - xmemmove(sslState->client.buf, - sslState->client.buf + len, - sslState->client.len); - } + assert(len == sslState->client.len); + sslState->client.len = 0; + } + /* EOF */ + if (len == 0) { + comm_close(sslState->server.fd); + return; + } + /* If the other end has closed, so should we */ + if (sslState->client.fd == -1) { + comm_close(sslState->server.fd); + return; } 
cbdataInternalLock(sslState); /* ??? should be locked by the caller... */ + /* Error? */ if (len < 0) { debug(50, ignoreErrno(errno) ? 3 : 1) ("sslWriteServer: FD %d: write failure: %s.\n", fd, xstrerror()); if (!ignoreErrno(errno)) comm_close(fd); } - if (cbdataReferenceValid(sslState)) - sslSetSelect(sslState); + if (cbdataReferenceValid(sslState)) { + assert(sslState->client.len == 0); + comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF, + sslReadClient, sslState); + } cbdataInternalUnlock(sslState); /* ??? */ } /* Writes data from the server buffer to the client side */ static void -sslWriteClient(int fd, void *data) +sslWriteClientDone(int fd, char *buf, size_t len, comm_err_t flag, void *data) { SslStateData *sslState = data; - int len; assert(fd == sslState->client.fd); - debug(26, 3) ("sslWriteClient: FD %d, %d bytes to write\n", - fd, sslState->server.len); - statCounter.syscalls.sock.writes++; - len = FD_WRITE_METHOD(fd, - sslState->server.buf, - sslState->server.len); - debug(26, 3) ("sslWriteClient: FD %d, %d bytes written\n", fd, len); + debug(26, 3) ("sslWriteClient: FD %d, %d bytes written\n", fd, (int)len); if (len > 0) { - fd_bytes(fd, len, FD_WRITE); kb_incr(&statCounter.client_http.kbytes_out, len); - assert(len <= sslState->server.len); - sslState->server.len -= len; + assert(len == sslState->server.len); + sslState->server.len =0; /* increment total object size */ if (sslState->size_ptr) *sslState->size_ptr += len; - if (sslState->server.len > 0) { - /* we didn't write the whole thing */ - xmemmove(sslState->server.buf, - sslState->server.buf + len, - sslState->server.len); - } + } + /* EOF */ + if (len == 0) { + comm_close(sslState->client.fd); + return; + } + /* If the other end has closed, so should we */ + if (sslState->server.fd == -1) { + comm_close(sslState->client.fd); + return; } cbdataInternalLock(sslState); /* ??? should be locked by the caller... */ + /* Error? */ if (len < 0) { debug(50, ignoreErrno(errno) ? 
3 : 1) ("sslWriteClient: FD %d: write failure: %s.\n", fd, xstrerror()); if (!ignoreErrno(errno)) comm_close(fd); } - if (cbdataReferenceValid(sslState)) - sslSetSelect(sslState); + if (cbdataReferenceValid(sslState)) { + assert(sslState->server.len == 0); + comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF, + sslReadServer, sslState); + } cbdataInternalUnlock(sslState); /* ??? */ } @@ -358,14 +306,51 @@ } static void +sslConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, void *data) +{ + SslStateData *sslState = data; + if (flag != COMM_OK) { + sslErrorComplete(fd, data, 0); + return; + } + if (cbdataReferenceValid(sslState)) { + assert(sslState->server.len == 0); + comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF, + sslReadServer, sslState); + comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF, + sslReadClient, sslState); + } +} + + +/* + * handle the write completion from a proxy request to an upstream proxy + */ +static void +sslProxyConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, void *data) +{ + SslStateData *sslState = data; + if (flag != COMM_OK) { + sslErrorComplete(fd, data, 0); + return; + } + if (cbdataReferenceValid(sslState)) { + assert(sslState->server.len == 0); + comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF, + sslReadServer, sslState); + comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF, + sslReadClient, sslState); + } +} + +static void sslConnected(int fd, void *data) { SslStateData *sslState = data; debug(26, 3) ("sslConnected: FD %d sslState=%p\n", fd, sslState); *sslState->status_ptr = HTTP_OK; - xstrncpy(sslState->server.buf, conn_established, SQUID_TCP_SO_RCVBUF); - sslState->server.len = strlen(conn_established); - sslSetSelect(sslState); + comm_write(sslState->client.fd, conn_established, strlen(conn_established), + sslConnectedWriteDone, sslState, NULL); } static void @@ -417,8 +402,9 
@@ } else { if (sslState->servers->_peer) sslProxyConnected(sslState->server.fd, sslState); - else + else { sslConnected(sslState->server.fd, sslState); + } commSetTimeout(sslState->server.fd, Config.Timeout.read, sslTimeout, @@ -549,15 +535,13 @@ httpHeaderClean(&hdr_out); packerClean(&p); memBufAppend(&mb, "\r\n", 2); - xstrncpy(sslState->client.buf, mb.buf, SQUID_TCP_SO_RCVBUF); - debug(26, 3) ("sslProxyConnected: Sending {%s}\n", sslState->client.buf); - sslState->client.len = mb.size; - memBufClean(&mb); + + comm_write_mbuf(sslState->server.fd, mb, sslProxyConnectedWriteDone, sslState); + commSetTimeout(sslState->server.fd, Config.Timeout.read, sslTimeout, sslState); - sslSetSelect(sslState); } static void Index: squid/src/stmem.c diff -u squid/src/stmem.c:1.8 squid/src/stmem.c:1.7.22.2 --- squid/src/stmem.c:1.8 Sun Sep 15 04:06:33 2002 +++ squid/src/stmem.c Wed Sep 25 23:08:08 2002 @@ -133,7 +133,7 @@ char *ptr_to_buf = NULL; int bytes_from_this_packet = 0; int bytes_into_this_packet = 0; - debug(19, 6) ("memCopy: offset %ld: size %u\n", (long int) offset, size); + debug(19, 6) ("memCopy: offset %ld: size %d\n", (long int) offset, (int)size); if (p == NULL) return 0; /* RC: the next assert is useless */ Index: squid/src/store_client.c diff -u squid/src/store_client.c:1.16 squid/src/store_client.c:1.11.4.6 --- squid/src/store_client.c:1.16 Thu Sep 26 14:46:06 2002 +++ squid/src/store_client.c Wed Oct 9 01:10:55 2002 @@ -192,7 +192,7 @@ debug(20, 3) ("storeClientCopy: %s, from %lu, for length %d, cb %p, cbdata %p\n", storeKeyText(e->hash.key), (unsigned long) copyInto.offset, - copyInto.length, + (int) copyInto.length, callback, data); assert(sc != NULL); Index: squid/src/structs.h diff -u squid/src/structs.h:1.70 squid/src/structs.h:1.51.4.6 --- squid/src/structs.h:1.70 Thu Sep 26 14:46:06 2002 +++ squid/src/structs.h Sun Oct 6 03:20:29 2002 @@ -973,20 +973,6 @@ unsigned int only_if_cached:1; }; -struct _HttpStateData { - StoreEntry *entry; - request_t 
*request; - char *reply_hdr; - size_t reply_hdr_size; - int reply_hdr_state; - peer *_peer; /* peer request made to */ - int eof; /* reached end-of-object? */ - request_t *orig_request; - int fd; - http_state_flags flags; - FwdState *fwd; -}; - struct _icpUdpData { struct sockaddr_in address; void *msg; @@ -1115,6 +1101,9 @@ int n; time_t until; } defer; + struct { + int readMoreRequests:1; + } flags; }; struct _ipcache_addrs { Index: squid/src/typedefs.h diff -u squid/src/typedefs.h:1.29 squid/src/typedefs.h:1.25.22.5 --- squid/src/typedefs.h:1.29 Tue Sep 24 03:59:17 2002 +++ squid/src/typedefs.h Thu Oct 3 00:29:21 2002 @@ -200,6 +200,9 @@ typedef void CWCB(int fd, char *, size_t size, comm_err_t flag, void *data); typedef void CNCB(int fd, comm_err_t status, void *); +typedef void IOCB(int fd, char *, size_t size, comm_err_t flag, int xerrno, void *data); +typedef void IOACB(int fd, int nfd, struct sockaddr_in *me, struct sockaddr_in *pn, + comm_err_t flag, int xerrno, void *data); typedef void FREE(void *); typedef void CBDUNL(void *); Index: squid/src/urn.c diff -u squid/src/urn.c:1.18 squid/src/urn.c:1.12.8.3 --- squid/src/urn.c:1.18 Tue Sep 24 03:59:17 2002 +++ squid/src/urn.c Wed Sep 25 23:08:08 2002 @@ -197,7 +197,7 @@ char *buf = urnState->reqbuf; StoreIOBuffer tempBuffer; - debug(52, 3) ("urnHandleReply: Called with size=%u.\n", result.length); + debug(52, 3) ("urnHandleReply: Called with size=%u.\n", (unsigned int)result.length); if (EBIT_TEST(urlres_e->flags, ENTRY_ABORTED)) { goto error; } Index: squid/src/wais.c diff -u squid/src/wais.c:1.9 squid/src/wais.c:1.8.22.4 --- squid/src/wais.c:1.9 Sun Sep 15 04:06:34 2002 +++ squid/src/wais.c Sat Oct 12 20:11:56 2002 @@ -43,11 +43,12 @@ char url[MAX_URL]; request_t *request; FwdState *fwd; + char buf[BUFSIZ]; } WaisStateData; static PF waisStateFree; static PF waisTimeout; -static PF waisReadReply; +static IOCB waisReadReply; static CWCB waisSendComplete; static PF waisSendRequest; @@ -81,53 +82,53 @@ /* 
This will be called when data is ready to be read from fd. Read until * error or connection closed. */ static void -waisReadReply(int fd, void *data) +waisReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { WaisStateData *waisState = data; - LOCAL_ARRAY(char, buf, 4096); StoreEntry *entry = waisState->entry; - int len; int clen; int bin; size_t read_sz; #if DELAY_POOLS delay_id delay_id = delayMostBytesAllowed(entry->mem_obj); #endif + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { comm_close(fd); return; } errno = 0; - read_sz = 4096; -#if DELAY_POOLS - read_sz = delayBytesWanted(delay_id, 1, read_sz); -#endif - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, buf, read_sz); - if (len > 0) { - fd_bytes(fd, len, FD_READ); + read_sz = BUFSIZ; + if (flag == COMM_OK && len > 0) { #if DELAY_POOLS delayBytesIn(delay_id, len); #endif kb_incr(&statCounter.server.all.kbytes_in, len); kb_incr(&statCounter.server.other.kbytes_in, len); } - debug(24, 5) ("waisReadReply: FD %d read len:%d\n", fd, len); - if (len > 0) { +#if DELAY_POOLS + read_sz = delayBytesWanted(delay_id, 1, read_sz); +#endif + debug(24, 5) ("waisReadReply: FD %d read len:%d\n", fd, (int)len); + if (flag == COMM_OK && len > 0) { commSetTimeout(fd, Config.Timeout.read, NULL, NULL); IOStats.Wais.reads++; for (clen = len - 1, bin = 0; clen; bin++) clen >>= 1; IOStats.Wais.read_hist[bin]++; } - if (len < 0) { + if (flag != COMM_OK || len < 0) { debug(50, 1) ("waisReadReply: FD %d: read failure: %s.\n", fd, xstrerror()); - if (ignoreErrno(errno)) { + if (ignoreErrno(xerrno)) { /* reinstall handlers */ /* XXX This may loop forever */ - commSetSelect(fd, COMM_SELECT_READ, - waisReadReply, waisState, 0); + comm_read(fd, waisState->buf, read_sz, waisReadReply, waisState); } else { ErrorState *err; EBIT_CLR(entry->flags, ENTRY_CACHABLE); @@ 
-138,24 +139,21 @@ errorAppendEntry(entry, err); comm_close(fd); } - } else if (len == 0 && entry->mem_obj->inmem_hi == 0) { + } else if (flag == COMM_OK && len == 0 && entry->mem_obj->inmem_hi == 0) { ErrorState *err; err = errorCon(ERR_ZERO_SIZE_OBJECT, HTTP_SERVICE_UNAVAILABLE); err->xerrno = errno; err->request = requestLink(waisState->request); errorAppendEntry(entry, err); comm_close(fd); - } else if (len == 0) { + } else if (flag == COMM_OK && len == 0) { /* Connection closed; retrieval done. */ entry->expires = squid_curtime; fwdComplete(waisState->fwd); comm_close(fd); } else { storeAppend(entry, buf, len); - commSetSelect(fd, - COMM_SELECT_READ, - waisReadReply, - waisState, 0); + comm_read(fd, waisState->buf, read_sz, waisReadReply, waisState); } } @@ -184,10 +182,7 @@ comm_close(fd); } else { /* Schedule read reply. */ - commSetSelect(fd, - COMM_SELECT_READ, - waisReadReply, - waisState, 0); + comm_read(fd, waisState->buf, BUFSIZ, waisReadReply, waisState); commSetDefer(fd, fwdCheckDeferRead, entry); } } @@ -240,6 +235,6 @@ waisState->fwd = fwd; comm_add_close_handler(waisState->fd, waisStateFree, waisState); storeLockObject(entry); - commSetSelect(fd, COMM_SELECT_WRITE, waisSendRequest, waisState, 0); commSetTimeout(fd, Config.Timeout.read, waisTimeout, waisState); + waisSendRequest(fd, waisState); } Index: squid/src/whois.c diff -u squid/src/whois.c:1.9 squid/src/whois.c:1.7.32.3 --- squid/src/whois.c:1.9 Thu Aug 22 05:30:09 2002 +++ squid/src/whois.c Wed Oct 9 06:46:29 2002 @@ -41,11 +41,12 @@ StoreEntry *entry; request_t *request; FwdState *fwd; + char buf[BUFSIZ]; } WhoisState; static PF whoisClose; static PF whoisTimeout; -static PF whoisReadReply; +static IOCB whoisReadReply; /* PUBLIC */ @@ -69,7 +70,7 @@ buf = xmalloc(l); snprintf(buf, l, "%s\r\n", strBuf(p->request->urlpath) + 1); comm_write(fd, buf, strlen(buf), NULL, p, xfree); - commSetSelect(fd, COMM_SELECT_READ, whoisReadReply, p, 0); + comm_read(fd, p->buf, BUFSIZ, whoisReadReply, p); 
commSetTimeout(fd, Config.Timeout.read, whoisTimeout, p); } @@ -84,39 +85,43 @@ } static void -whoisReadReply(int fd, void *data) +whoisReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { WhoisState *p = data; StoreEntry *entry = p->entry; - char *buf = memAllocate(MEM_4K_BUF); MemObject *mem = entry->mem_obj; - int len; - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, buf, 4095); + int do_next_read = 0; + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + buf[len] = '\0'; - debug(75, 3) ("whoisReadReply: FD %d read %d bytes\n", fd, len); + debug(75, 3) ("whoisReadReply: FD %d read %d bytes\n", fd, (int)len); debug(75, 5) ("{%s}\n", buf); - if (len > 0) { + if (flag == COMM_OK && len > 0) { if (0 == mem->inmem_hi) mem->reply->sline.status = HTTP_OK; - fd_bytes(fd, len, FD_READ); kb_incr(&statCounter.server.all.kbytes_in, len); kb_incr(&statCounter.server.http.kbytes_in, len); storeAppend(entry, buf, len); - commSetSelect(fd, COMM_SELECT_READ, whoisReadReply, p, Config.Timeout.read); - } else if (len < 0) { + do_next_read = 1; + } else if (flag != COMM_OK || len < 0) { debug(50, 2) ("whoisReadReply: FD %d: read failure: %s.\n", fd, xstrerror()); if (ignoreErrno(errno)) { - commSetSelect(fd, COMM_SELECT_READ, whoisReadReply, p, Config.Timeout.read); + do_next_read = 1; } else if (mem->inmem_hi == 0) { ErrorState *err; err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR); err->xerrno = errno; fwdFail(p->fwd, err); comm_close(fd); + do_next_read = 0; } else { comm_close(fd); + do_next_read = 0; } } else { storeTimestampsSet(entry); @@ -126,8 +131,10 @@ fwdComplete(p->fwd); debug(75, 3) ("whoisReadReply: Done: %s\n", storeUrl(entry)); comm_close(fd); + do_next_read = 0; } - memFree(buf, MEM_4K_BUF); + if (do_next_read) + comm_read(fd, p->buf, BUFSIZ, whoisReadReply, p); } static void