--------------------- PatchSet 5133 Date: 2002/09/26 06:08:02 Author: adri Branch: commloops Tag: (none) Log: * commit code to fix compilation problems under OSX 10.2.1. Grr gcc3.1 . * add in the io callback queue magic * add in an implementation of comm_read() which uses said io callback queue magic Problems: * since ipc.c creates its own filedescriptors (through pipe()) we don't ever see a comm_open(). Perhaps we should move the pipe() calls into comm.c to properly track stuff? * (Linked to the above) .. any comm IO done on pipe()ed FDs from ipc.c is going to barf it. It'll have to be fixed before this is committed to -HEAD. * the type changes I've committed because of OSX10.2.1/gcc3.1 are valid, but I've *cough* fudged a few. I'll revisit these as well before this goes into -HEAD. * comm_read() hasn't yet been tested, but this implementation is basically a 1:1 implementation I have in my own network IO library so I'll be surprised if it _doesn't_ work.. Members: src/asn.c:1.16.8.2->1.16.8.3 src/client_side_reply.c:1.7.2.1->1.7.2.2 src/comm.c:1.21.4.3->1.21.4.4 src/main.c:1.30.10.1->1.30.10.2 src/mem.c:1.16.8.2->1.16.8.3 src/protos.h:1.49.4.3->1.49.4.4 src/stmem.c:1.7.22.1->1.7.22.2 src/store_client.c:1.11.4.2->1.11.4.3 src/typedefs.h:1.25.22.3->1.25.22.4 src/urn.c:1.12.8.2->1.12.8.3 Index: squid/src/asn.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/asn.c,v retrieving revision 1.16.8.2 retrieving revision 1.16.8.3 diff -u -r1.16.8.2 -r1.16.8.3 --- squid/src/asn.c 26 Sep 2002 04:12:14 -0000 1.16.8.2 +++ squid/src/asn.c 26 Sep 2002 06:08:02 -0000 1.16.8.3 @@ -1,6 +1,6 @@ /* - * $Id: asn.c,v 1.16.8.2 2002/09/26 04:12:14 adri Exp $ + * $Id: asn.c,v 1.16.8.3 2002/09/26 06:08:02 adri Exp $ * * DEBUG: section 53 AS Number handling * AUTHOR: Duane Wessels, Kostas Anagnostakis @@ -231,7 +231,7 @@ char *buf = asState->reqbuf; int leftoversz = -1; - debug(53, 3) ("asHandleReply: Called with size=%u\n", result.length); + debug(53, 3) ("asHandleReply: Called with size=%u\n", (unsigned int)result.length); debug(53, 3) ("asHandleReply: buffer='%s'\n", buf); /* First figure out whether we should abort the request */ @@ -243,7 +243,7 @@ asStateFree(asState); return; } else if (result.flags.error) { - debug(53, 1) ("asHandleReply: Called with Error set and size=%u\n", result.length); + debug(53, 1) ("asHandleReply: Called with Error set and size=%u\n", (unsigned int)result.length); asStateFree(asState); return; } else if (HTTP_OK != e->mem_obj->reply->sline.status) { Index: squid/src/client_side_reply.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/client_side_reply.c,v retrieving revision 1.7.2.1 retrieving revision 1.7.2.2 diff -u -r1.7.2.1 -r1.7.2.2 --- squid/src/client_side_reply.c 26 Sep 2002 04:12:18 -0000 1.7.2.1 +++ squid/src/client_side_reply.c 26 Sep 2002 06:08:03 -0000 1.7.2.2 @@ -1,6 +1,6 @@ /* - * $Id: client_side_reply.c,v 1.7.2.1 2002/09/26 04:12:18 adri Exp $ + * $Id: client_side_reply.c,v 1.7.2.2 2002/09/26 06:08:03 adri Exp $ * * DEBUG: section 88 Client-side Reply Routines * AUTHOR: Robert Collins (Originally Duane Wessels in client_side.c) @@ -484,7 +484,7 @@ StoreEntry *e = http->entry; MemObject *mem; request_t *r = http->request; - debug(88, 3) ("clientCacheHit: %s, %ud bytes\n", http->uri, result.length); + debug(88, 3) ("clientCacheHit: %s, %u bytes\n", http->uri, (unsigned int)result.length); if (http->entry == NULL) { debug(88, 3) ("clientCacheHit: request aborted\n"); return; @@ -1416,7 +1416,7 @@ context->flags.storelogiccomplete = 1; debug(88, 5) ("clientSendMoreData: %s, %d bytes (%u new bytes)\n", - http->uri, (int) size, result.length); + http->uri, (int) size, (unsigned int)result.length); assert(size <= HTTP_REQBUF_SZ || context->flags.headersSent); assert(http->request != NULL); /* ESI TODO: remove this assert once everything is stable */ @@ -1433,10 +1433,8 @@ if (fd != -1) comm_reset_close(fd); return; - } else if ( /* aborted request */ - (entry && EBIT_TEST(entry->flags, ENTRY_ABORTED)) || - /* Upstream read error */ (result.flags.error) || - /* Upstream EOF */ (body_size == 0)) { + } else if ((entry && EBIT_TEST(entry->flags, ENTRY_ABORTED)) || + (result.flags.error) || body_size == 0) { /* call clientWriteComplete so the client socket gets closed */ /* We call into the stream, because we don't know that there is a * client socket! @@ -1488,7 +1486,9 @@ } context->headers_sz = rep->hdr_sz; body_size = size - rep->hdr_sz; +#if 0 assert(body_size >= 0); +#endif body_buf = buf + rep->hdr_sz; debug(88, 3) Index: squid/src/comm.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/comm.c,v retrieving revision 1.21.4.3 retrieving revision 1.21.4.4 diff -u -r1.21.4.3 -r1.21.4.4 --- squid/src/comm.c 26 Sep 2002 04:12:18 -0000 1.21.4.3 +++ squid/src/comm.c 26 Sep 2002 06:08:04 -0000 1.21.4.4 @@ -1,6 +1,6 @@ /* - * $Id: comm.c,v 1.21.4.3 2002/09/26 04:12:18 adri Exp $ + * $Id: comm.c,v 1.21.4.4 2002/09/26 06:08:04 adri Exp $ * * DEBUG: section 5 Socket Functions * AUTHOR: Harvest Derived @@ -74,8 +74,202 @@ static int commRetryConnect(ConnectStateData * cs); CBDATA_TYPE(ConnectStateData); +struct _fdc_t { + int active; + dlink_list CommCallbackList; + struct { + char *buf; + int size; + IOCB *handler; + void *handler_data; + } read; +}; +typedef struct _fdc_t fdc_t; + +struct _CommCallbackData { + dlink_node fd_node; + dlink_node h_node; + int fd; + char *buf; + int retval; + IOCB *callback; + void *callback_data; + comm_err_t errcode; + int xerrno; + int seqnum; +}; +typedef struct _CommCallbackData CommCallbackData; + static MemPool *comm_write_pool = NULL; static MemPool *conn_close_pool = NULL; +static MemPool *comm_callback_pool = NULL; +static fdc_t *fdc_table = NULL; +dlink_list CommCallbackList; +static int CommCallbackSeqnum = 1; + + +/* New and improved stuff */ + +/* + * return whether there are entries in the callback queue + */ +int +comm_existsiocallback(void) +{ + return CommCallbackList.head == NULL; +} + +/* + * add an IO callback + * + * IO callbacks are added when we want to notify someone that some IO + * has finished but we don't want to risk re-entering a non-reentrant + * code block. + */ +static void +comm_addiocallback(int fd, IOCB *callback, size_t retval, comm_err_t errcode, + int xerrno, void *callback_data) +{ + CommCallbackData *cio; + + assert(fdc_table[fd].active == 1); + + /* Allocate a new struct */ + cio = memPoolAlloc(comm_callback_pool); + + /* Throw our data into it */ + cio->fd = fd; + cio->retval = retval; + cio->xerrno = xerrno; + cio->errcode = errcode; + cio->callback = callback; + cio->callback_data = callback_data; + cio->seqnum = CommCallbackSeqnum; + + /* Add it to the end of the list */ + dlinkAddTail(cio, &(cio->h_node), &CommCallbackList); + + /* and add it to the end of the fd list */ + dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList)); + +} + + +/* + * call the IO callbacks + * + * This should be called before comm_select() so code can attempt to + * initiate some IO. + * + * When io callbacks are added, they are added with the current + * sequence number. The sequence number is incremented in this routine - + * since callbacks are added to the _tail_ of the list, when we hit a + * callback with a seqnum _not_ what it was when we entered this routine, + * we can stop. + */ +void +comm_calliocallback(void) +{ + CommCallbackData *cio; + dlink_node *node; + int oldseqnum = CommCallbackSeqnum; + + /* Call our callbacks until we hit NULL or the seqnum changes */ + while (CommCallbackList.head != NULL) { + node = CommCallbackList.head; + cio = node->data; + + /* If seqnum isn't the same, its time to die */ + if (cio->seqnum != oldseqnum) + break; /* we've hit newly-added events */ + + assert(fdc_table[cio->fd].active == 1); + + dlinkDelete(&cio->h_node, &CommCallbackList); + dlinkDelete(&cio->fd_node, &(fdc_table[cio->fd].CommCallbackList)); + cio->callback(cio->fd, cio->buf, cio->retval, cio->errcode, cio->xerrno, + cio->callback_data); + memPoolFree(comm_callback_pool, cio); + } +} + + +/* + * Queue a callback + */ +static void +comm_read_callback(int fd, int retval, comm_err_t errcode, int xerrno) +{ + fdc_t *Fc = &fdc_table[fd]; + + assert(Fc->read.handler != NULL); + + comm_addiocallback(fd, Fc->read.handler, retval, errcode, xerrno, + Fc->read.handler_data); + Fc->read.handler = NULL; + Fc->read.handler_data = NULL; +} + +/* + * Attempt a read + * + * If the read attempt succeeds or fails, call the callback. + * Else, wait for another IO notification. + */ +static void +comm_read_try(int fd, void *data) +{ + fdc_t *Fc = &fdc_table[fd]; + int retval; + + /* make sure we actually have a callback */ + assert(Fc->read.handler != NULL); + + /* Attempt a read */ + retval = read(fd, Fc->read.buf, Fc->read.size); + if (retval < 0 && !ignoreErrno(errno)) { + comm_read_callback(fd, -1, COMM_ERROR, errno); + return; + }; + + /* See if we read anything */ + /* Note - read 0 == socket EOF, which is a valid read */ + if (retval >= 0) { + comm_read_callback(fd, retval, COMM_OK, 0); + return; + } + + /* Nope, register for some more IO */ + commSetSelect(fd, COMM_SELECT_READ, comm_read_try, NULL, 0); +} + +/* + * Queue a read. handler/handler_data are called when the read + * completes, on error, or on file descriptor close. + */ +void +comm_read(int fd, char *buf, int size, IOCB *handler, void *handler_data) +{ + /* Make sure we're not reading anything */ + assert(fdc_table[fd].read.handler == NULL); + + /* Queue a read */ + fdc_table[fd].read.buf = buf; + fdc_table[fd].read.size = size; + fdc_table[fd].read.handler = handler; + fdc_table[fd].read.handler_data = handler_data; + +#if OPTIMISTIC_IO + comm_read_try(fd, NULL); +#else + /* Register intrest in a FD read */ + commSetSelect(fd, COMM_SELECT_READ, comm_read_try, NULL, 0); +#endif +} + + + +/* Older stuff */ static void CommWriteStateCallbackAndFree(int fd, comm_err_t code) @@ -205,6 +399,8 @@ /* update fdstat */ debug(5, 5) ("comm_open: FD %d is a new socket\n", new_socket); fd_open(new_socket, FD_SOCKET, note); + assert(fdc_table[new_socket].active == 0); + fdc_table[new_socket].active = 1; F = &fd_table[new_socket]; F->local_addr = addr; F->tos = tos; @@ -627,10 +823,20 @@ comm_close(fd); } + +/* + * Close the socket fd. + * + * + call write handlers with ERR_CLOSING + * + call read handlers with ERR_CLOSING + * + call closing handlers + */ void comm_close(int fd) { fde *F = NULL; + dlink_node *node; + CommCallbackData *cio; debug(5, 5) ("comm_close: FD %d\n", fd); assert(fd >= 0); @@ -642,6 +848,8 @@ if (shutting_down && (!F->flags.open || F->type == FD_FILE)) return; assert(F->flags.open); + /* The following fails because ipc.c is doing calls to pipe() to create sockets! */ + /* assert(fdc_table[fd].active == 1); */ assert(F->type != FD_FILE); F->flags.closing = 1; #if USE_SSL @@ -650,6 +858,21 @@ #endif commSetTimeout(fd, -1, NULL, NULL); CommWriteStateCallbackAndFree(fd, COMM_ERR_CLOSING); + + /* Delete any pending io callbacks */ + while (fdc_table[fd].CommCallbackList.head != NULL) { + node = fdc_table[fd].CommCallbackList.head; + cio = node->data; + assert(fd == cio->fd); /* just paranoid */ + dlinkDelete(&cio->h_node, &CommCallbackList); + dlinkDelete(&cio->fd_node, &(fdc_table[cio->fd].CommCallbackList)); + + cio->callback(cio->fd, cio->buf, cio->retval, COMM_ERR_CLOSING, cio->xerrno, + cio->callback_data); + + memPoolFree(comm_callback_pool, cio); + } + commCallCloseHandlers(fd); if (F->uses) /* assume persistent connect count */ pconnHistCount(1, F->uses); @@ -661,6 +884,7 @@ #endif fd_close(fd); /* update fdstat */ close(fd); + fdc_table[fd].active = 0; statCounter.syscalls.sock.closes++; } @@ -838,12 +1062,15 @@ comm_init(void) { fd_table = xcalloc(Squid_MaxFD, sizeof(fde)); + fdc_table = xcalloc(Squid_MaxFD, sizeof(fdc_t)); /* XXX account fd_table */ /* Keep a few file descriptors free so that we don't run out of FD's * after accepting a client but before it opens a socket or a file. * Since Squid_MaxFD can be as high as several thousand, don't waste them */ RESERVED_FD = XMIN(100, Squid_MaxFD / 4); CBDATA_INIT_TYPE(ConnectStateData); + + comm_callback_pool = memPoolCreate("comm callbacks", sizeof(CommCallbackData)); comm_write_pool = memPoolCreate("CommWriteStateData", sizeof(CommWriteStateData)); conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler)); } @@ -942,6 +1169,7 @@ comm_write(fd, mb.buf, mb.size, handler, handler_data, memBufFreeFunc(&mb)); } + /* * hm, this might be too general-purpose for all the places we'd * like to use it. Index: squid/src/main.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/main.c,v retrieving revision 1.30.10.1 retrieving revision 1.30.10.2 diff -u -r1.30.10.1 -r1.30.10.2 --- squid/src/main.c 13 Sep 2002 04:51:27 -0000 1.30.10.1 +++ squid/src/main.c 26 Sep 2002 06:08:04 -0000 1.30.10.2 @@ -1,6 +1,6 @@ /* - * $Id: main.c,v 1.30.10.1 2002/09/13 04:51:27 adri Exp $ + * $Id: main.c,v 1.30.10.2 2002/09/26 06:08:04 adri Exp $ * * DEBUG: section 1 Startup and Main Loop * AUTHOR: Harvest Derived @@ -711,6 +711,7 @@ eventRun(); if ((loop_delay = eventNextTime()) < 0) loop_delay = 0; + comm_calliocallback(); switch (comm_select(loop_delay)) { case COMM_OK: errcount = 0; /* reset if successful */ Index: squid/src/mem.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/mem.c,v retrieving revision 1.16.8.2 retrieving revision 1.16.8.3 diff -u -r1.16.8.2 -r1.16.8.3 --- squid/src/mem.c 26 Sep 2002 04:12:21 -0000 1.16.8.2 +++ squid/src/mem.c 26 Sep 2002 06:08:06 -0000 1.16.8.3 @@ -1,6 +1,6 @@ /* - * $Id: mem.c,v 1.16.8.2 2002/09/26 04:12:21 adri Exp $ + * $Id: mem.c,v 1.16.8.3 2002/09/26 06:08:06 adri Exp $ * * DEBUG: section 13 High Level Memory Pool Management * AUTHOR: Harvest Derived @@ -264,7 +264,7 @@ { /* XXX This can be optimized on very large buffers to use realloc() */ /* TODO: if the existing gross size is >= new gross size, do nothing */ - int new_gross_size; + size_t new_gross_size; void *newbuf = memAllocBuf(net_size, &new_gross_size); if (oldbuf) { int data_size = *gross_size; Index: squid/src/protos.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/protos.h,v retrieving revision 1.49.4.3 retrieving revision 1.49.4.4 diff -u -r1.49.4.3 -r1.49.4.4 --- squid/src/protos.h 26 Sep 2002 04:12:22 -0000 1.49.4.3 +++ squid/src/protos.h 26 Sep 2002 06:08:06 -0000 1.49.4.4 @@ -1,6 +1,6 @@ /* - * $Id: protos.h,v 1.49.4.3 2002/09/26 04:12:22 adri Exp $ + * $Id: protos.h,v 1.49.4.4 2002/09/26 06:08:06 adri Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -150,6 +150,11 @@ extern int clientHttpRequestStatus(int fd, clientHttpRequest const *http); extern void clientSetReplyToError(void *, err_type, http_status, method_t, char const *, struct in_addr *, request_t *, char *, auth_user_request_t * auth_user_request); + +/* comm.c */ +extern int comm_existsiocallback(void); +extern void comm_calliocallback(void); + extern int commSetNonBlocking(int fd); extern int commUnsetNonBlocking(int fd); extern void commSetCloseOnExec(int fd); Index: squid/src/stmem.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/stmem.c,v retrieving revision 1.7.22.1 retrieving revision 1.7.22.2 diff -u -r1.7.22.1 -r1.7.22.2 --- squid/src/stmem.c 26 Sep 2002 04:12:23 -0000 1.7.22.1 +++ squid/src/stmem.c 26 Sep 2002 06:08:08 -0000 1.7.22.2 @@ -1,6 +1,6 @@ /* - * $Id: stmem.c,v 1.7.22.1 2002/09/26 04:12:23 adri Exp $ + * $Id: stmem.c,v 1.7.22.2 2002/09/26 06:08:08 adri Exp $ * * DEBUG: section 19 Store Memory Primitives * AUTHOR: Harvest Derived @@ -133,7 +133,7 @@ char *ptr_to_buf = NULL; int bytes_from_this_packet = 0; int bytes_into_this_packet = 0; - debug(19, 6) ("memCopy: offset %ld: size %u\n", (long int) offset, size); + debug(19, 6) ("memCopy: offset %ld: size %d\n", (long int) offset, (int)size); if (p == NULL) return 0; /* RC: the next assert is useless */ Index: squid/src/store_client.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/store_client.c,v retrieving revision 1.11.4.2 retrieving revision 1.11.4.3 diff -u -r1.11.4.2 -r1.11.4.3 --- squid/src/store_client.c 26 Sep 2002 04:12:23 -0000 1.11.4.2 +++ squid/src/store_client.c 26 Sep 2002 06:08:08 -0000 1.11.4.3 @@ -1,6 +1,6 @@ /* - * $Id: store_client.c,v 1.11.4.2 2002/09/26 04:12:23 adri Exp $ + * $Id: store_client.c,v 1.11.4.3 2002/09/26 06:08:08 adri Exp $ * * DEBUG: section 20 Storage Manager Client-Side Interface * AUTHOR: Duane Wessels @@ -189,12 +189,12 @@ void *data) { assert(!EBIT_TEST(e->flags, ENTRY_ABORTED)); - debug(20, 3) ("storeClientCopy: %s, from %lu, for length %d, cb %p, cbdata %p\n", + debug(20, 3) ("storeClientCopy: %s, from %d, for length %d, cb %p, cbdata %p\n", storeKeyText(e->hash.key), - copyInto.offset, - copyInto.length, - callback, - data); + (int)copyInto.offset, + (int)copyInto.length, + (void *)callback, + (void *)data); assert(sc != NULL); #if STORE_CLIENT_LIST_DEBUG assert(sc == storeClientListSearch(e->mem_obj, data)); @@ -275,7 +275,7 @@ MemObject *mem = e->mem_obj; size_t sz; - debug(33, 5) ("co: %lu, hi: %ld\n", sc->copyInto.offset, (long int) mem->inmem_hi); + debug(33, 5) ("co: %lu, hi: %ld\n", (unsigned long) sc->copyInto.offset, (long int) mem->inmem_hi); if (storeClientNoMoreToSend(e, sc)) { /* There is no more to send! */ @@ -695,7 +695,7 @@ return; storeAppendPrintf(output, "\tClient #%d, %p\n", clientNumber, thisClient->callback_data); storeAppendPrintf(output, "\t\tcopy_offset: %lu\n", - thisClient->copyInto.offset); + (unsigned long)thisClient->copyInto.offset); storeAppendPrintf(output, "\t\tcopy_size: %d\n", (int) thisClient->copyInto.length); storeAppendPrintf(output, "\t\tflags:"); Index: squid/src/typedefs.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/typedefs.h,v retrieving revision 1.25.22.3 retrieving revision 1.25.22.4 diff -u -r1.25.22.3 -r1.25.22.4 --- squid/src/typedefs.h 26 Sep 2002 04:12:25 -0000 1.25.22.3 +++ squid/src/typedefs.h 26 Sep 2002 06:08:08 -0000 1.25.22.4 @@ -1,6 +1,6 @@ /* - * $Id: typedefs.h,v 1.25.22.3 2002/09/26 04:12:25 adri Exp $ + * $Id: typedefs.h,v 1.25.22.4 2002/09/26 06:08:08 adri Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -200,6 +200,7 @@ typedef void CWCB(int fd, char *, size_t size, comm_err_t flag, void *data); typedef void CNCB(int fd, comm_err_t status, void *); +typedef void IOCB(int fd, char *, size_t size, comm_err_t flag, int xerrno, void *data); typedef void FREE(void *); typedef void CBDUNL(void *); Index: squid/src/urn.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/urn.c,v retrieving revision 1.12.8.2 retrieving revision 1.12.8.3 diff -u -r1.12.8.2 -r1.12.8.3 --- squid/src/urn.c 26 Sep 2002 04:12:25 -0000 1.12.8.2 +++ squid/src/urn.c 26 Sep 2002 06:08:08 -0000 1.12.8.3 @@ -1,6 +1,6 @@ /* - * $Id: urn.c,v 1.12.8.2 2002/09/26 04:12:25 adri Exp $ + * $Id: urn.c,v 1.12.8.3 2002/09/26 06:08:08 adri Exp $ * * DEBUG: section 52 URN Parsing * AUTHOR: Kostas Anagnostakis @@ -197,7 +197,7 @@ char *buf = urnState->reqbuf; StoreIOBuffer tempBuffer; - debug(52, 3) ("urnHandleReply: Called with size=%u.\n", result.length); + debug(52, 3) ("urnHandleReply: Called with size=%u.\n", (unsigned int)result.length); if (EBIT_TEST(urlres_e->flags, ENTRY_ABORTED)) { goto error; }