--------------------- PatchSet 1584 Date: 2001/02/16 17:08:29 Author: adri Branch: eventio Tag: (none) Log: Some more changes to get this new code ready for testing. * change the comm_callback() and ncomm_write() routines to take a FREE function pointer. I know its bad - we really want this to support referenced buffers - but the existing codebase uses free functions, and so we'll supply it to them. When all the free funcs map to NULL or cbdataUnlock(), it'll disappear again. * Add 'ncomm_write_mbuf()' which does what comm_write_mbuf() does. * remove my stupid incrementing of bytes_read / bytes_written and call fd_bytes() to do the work for us. Its much neater. * add some REAL tasty logic in ncomm_close(), which is called by comm_close() when a fd goes away. Right now, I'm walking the callback list and completing any pending callbacks. If one of the callbacks is a pending write, we close it with COMM_ERR_CLOSING so the routine knows we're going away. The logic is *TASTY* here, and its not going to scale well because we are searching the pending request list, but again this code will only exist whilst small debugging is done, and then it'll come out and be replaced with something a little nicer. * rework the http.c code to use the new world order stuff for comm writing. httpSendComplete() and httpSendRequestEntry() became COMMCBs. As mentioned above, the code assumes that it can do evil tricks to free the data after its being written. That'll all go away after the initial conversion of routines to use this code. I haven't tested this for memory leaks yet, and we're only testing the write code (and softly at that.) Members: src/comm.c:1.7->1.7.12.1 src/comm_server.c:1.1.2.2->1.1.2.3 src/http.c:1.11->1.11.12.1 src/protos.h:1.18.8.1->1.18.8.2 src/structs.h:1.24.8.2->1.24.8.3 src/typedefs.h:1.15.8.1->1.15.8.2 Index: squid/src/comm.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/comm.c,v retrieving revision 1.7 retrieving revision 1.7.12.1 diff -u -r1.7 -r1.7.12.1 --- squid/src/comm.c 12 Jan 2001 08:20:32 -0000 1.7 +++ squid/src/comm.c 16 Feb 2001 17:08:29 -0000 1.7.12.1 @@ -1,6 +1,6 @@ /* - * $Id: comm.c,v 1.7 2001/01/12 08:20:32 hno Exp $ + * $Id: comm.c,v 1.7.12.1 2001/02/16 17:08:29 adri Exp $ * * DEBUG: section 5 Socket Functions * AUTHOR: Harvest Derived @@ -582,6 +582,7 @@ assert(F->type != FD_FILE); F->flags.closing = 1; CommWriteStateCallbackAndFree(fd, COMM_ERR_CLOSING); + ncomm_close(fd); commCallCloseHandlers(fd); if (F->uses) /* assume persistent connect count */ pconnHistCount(1, F->uses); Index: squid/src/comm_server.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/comm_server.c,v retrieving revision 1.1.2.2 retrieving revision 1.1.2.3 diff -u -r1.1.2.2 -r1.1.2.3 --- squid/src/comm_server.c 15 Feb 2001 21:17:03 -0000 1.1.2.2 +++ squid/src/comm_server.c 16 Feb 2001 17:08:30 -0000 1.1.2.3 @@ -20,22 +20,23 @@ static unsigned int pollnfds = 0; static unsigned int pollfirstfree = 0; -comm_callback_entry_t *cbhead = NULL; -comm_callback_entry_t **cbtail = &cbhead; +dlink_list callbacklist; -static void -comm_callback(int fd, void *buf, ssize_t len, int errno_nr, COMMCB callback, void *data) +void +comm_callback(int fd, void *buf, ssize_t len, int error, int errno_nr, + COMMCB *callback, void *data, FREE *freefunc) { comm_callback_entry_t *cbent = xcalloc(1, sizeof(comm_callback_entry_t)); cbent->callback = callback; cbent->fd = fd; cbent->buf = buf; cbent->len = len; + cbent->error = error; cbent->errno_nr = errno_nr; cbent->data = data; - cbent->next = NULL; - *cbtail = cbent; - cbtail = &cbent->next; + cbent->freefunc = freefunc; + + dlinkAdd(cbent, &cbent->node, &callbacklist); } static inline void @@ -59,6 +60,7 @@ { int comm_index = fd_table[fd].new.comm_index; int done = fd_table[fd].new.read.handler(fd, &fd_table[fd].new.read); + if (done) { pollfds[comm_index].events &= ~POLLIN; cleanup_poll(fd, (unsigned int)comm_index); @@ -71,6 +73,7 @@ { int comm_index = fd_table[fd].new.comm_index; int done = fd_table[fd].new.write.handler(fd, &fd_table[fd].new.write); + if (done) { pollfds[comm_index].events &= ~POLLOUT; cleanup_poll(fd, (unsigned int)comm_index); @@ -103,13 +106,20 @@ } } /* Do callbacks */ - while(cbhead) { - comm_callback_entry_t *cb = cbhead; - cb->callback(cb->fd, cb->buf, cb->len, cb->errno_nr, cb->data); - cbhead = cbhead->next; + while(callbacklist.head) { + comm_callback_entry_t *cb = callbacklist.head->data; + + /* I hate this. It'll be replaced with a cbdataLock/Unlock soon .. */ + if (cb->freefunc) { + cb->freefunc(cb->buf); + cb->buf = NULL; + } + + cb->callback(cb->fd, cb->buf, cb->len, cb->error, cb->errno_nr, + cb->data); + dlinkDelete(&cb->node, &callbacklist); xfree(cb); } - cbtail = &cbhead; } static void @@ -147,15 +157,17 @@ /* Read in some data */ const ssize_t len = read(fd, fhr->buf, fhr->size); statCounter.syscalls.sock.reads++; + fd_bytes(fd, len, FD_READ); if (len >= 0) { /* We got some data to deliver to the application */ - fd_table[fd].bytes_read += len; - comm_callback(fd, fhr->buf, len, 0, fhr->callback, fhr->data); + comm_callback(fd, fhr->buf, len, COMM_OK, 0, fhr->callback, fhr->data, + NULL); return 1; } if (len == -1 && !ignoreErrno(errno)) { /* Error */ - comm_callback(fd, fhr->buf, len, errno, fhr->callback, fhr->data); + comm_callback(fd, fhr->buf, len, COMM_ERROR, errno, fhr->callback, + fhr->data, NULL); return 1; } /* Try again */ @@ -163,20 +175,20 @@ } void -ncomm_read(int fd, void *buf, size_t size, COMMCB callback, void *data) +ncomm_read(int fd, void *buf, size_t size, COMMCB *callback, void *data) { /* Read in some data */ const ssize_t len = read(fd, buf, size); statCounter.syscalls.sock.reads++; + fd_bytes(fd, len, FD_READ); if (len >= 0) { /* We immediately got some data to deliver to the application */ - fd_table[fd].bytes_read += len; - comm_callback(fd, buf, len, 0, callback, data); + comm_callback(fd, buf, len, COMM_OK, 0, callback, data, NULL); return; } if (len == -1 && !ignoreErrno(errno)) { /* Error */ - comm_callback(fd, buf, len, errno, callback, data); + comm_callback(fd, buf, len, COMM_ERROR, errno, callback, data, NULL); return; } /* Oops.. we could not get any data. Register for read event */ @@ -196,17 +208,17 @@ const size_t size = fhw->size - fhw->done; ssize_t len = write(fd, buf, size); statCounter.syscalls.sock.writes++; + fd_bytes(fd, len, FD_WRITE); if ((size_t)len == size) { /* finished */ - fd_table[fd].bytes_written += len; - comm_callback(fd, fhw->buf, (ssize_t)fhw->size, 0, fhw->callback, - fhw->data); + comm_callback(fd, fhw->buf, (ssize_t)fhw->size, COMM_OK, 0, + fhw->callback, fhw->data, fhw->freefunc); return 1; } if (len == -1 && !ignoreErrno(errno)) { /* error */ - comm_callback(fd, fhw->buf, fhw->done, errno, fhw->callback, - fhw->data); + comm_callback(fd, fhw->buf, fhw->done, COMM_ERROR, errno, + fhw->callback, fhw->data, fhw->freefunc); return 1; } /* try again */ @@ -216,19 +228,22 @@ } void -ncomm_write(int fd, const void *buf, size_t size, COMMCB callback, void *data) +ncomm_write(int fd, const void *buf, size_t size, COMMCB *callback, void *data, + FREE *freefunc) { ssize_t len = write(fd, buf, size); statCounter.syscalls.sock.writes++; + fd_bytes(fd, len, FD_WRITE); if ((size_t)len == size) { /* Finished immediately */ - fd_table[fd].bytes_written += len; - comm_callback(fd, (void *)buf, len, 0, callback, data); + comm_callback(fd, (void *)buf, len, COMM_OK, 0, callback, data, + freefunc); return; } if (len == -1 && !ignoreErrno(errno)) { /* error */ - comm_callback(fd, (void *)buf, len, errno, callback, data); + comm_callback(fd, (void *)buf, len, COMM_ERROR, errno, callback, data, + freefunc); return; } /* Oops.. we could not write all data. Register for write event */ @@ -244,6 +259,16 @@ comm_register_for_event(fd, EVENT_WRITE); } +/* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap +*/ +void +ncomm_write_mbuf(int fd, MemBuf mb, COMMCB *handler, void *handler_data) +{ + ncomm_write(fd, mb.buf, mb.size, handler, handler_data, + memBufFreeFunc(&mb) ); +} + + void comm_new_fd(int fd) { memset(&fd_table[fd].new, 0, sizeof(fd_table[fd].new)); @@ -283,8 +308,8 @@ commSetNonBlocking(nfd); /* Finally, hand over the socket */ - comm_callback(nfd, &fd_table[nfd].new.peer, addrlen, 0, - fhr->callback, fhr->data); + comm_callback(nfd, &fd_table[nfd].new.peer, addrlen, COMM_OK, 0, + fhr->callback, fhr->data, NULL); } if (!ignoreErrno(errno)) { perror("accept:"); @@ -294,7 +319,7 @@ } int -ncomm_listen(int fd, int backlog, COMMCB callback, void *data) +ncomm_listen(int fd, int backlog, COMMCB *callback, void *data) { int err = listen(fd, backlog); if (err != 0) @@ -316,3 +341,57 @@ { /* nothing for now */ } + + +/* This will eventually be merged with comm.c when the old comm code goes */ + +/* + * Here's a problem - we can have pending callbacks in the callback queue, *and* + * we can have a write callback. We don't want to call the write callback if its in + * the queue. So, if its in the queue, we mark it and don't call it at the end. + * + * XXX yes this is evil, I know. We'll tidy it up once the old comm code goes away. + * -- adrian + */ +void +ncomm_close(int fd) +{ + dlink_node *node, *tnode; + comm_callback_entry_t *cb; + int found_write_cb = 0; + fde *F = &fd_table[fd]; + COMMCB *callback; + + /* Flush any pending callbacks in the queue that are for us */ + /* XXX this is obviously O(N) rather than O(1) which needs changing! */ + node = callbacklist.head; + + while (node != NULL) { + tnode = node->next; + cb = node->data; + if (cb->fd == fd) { + if ((cb->data == F->new.write.data) && (cb->callback == F->new.write.callback) && + (cb->buf == F->new.write.buf)) { + found_write_cb = 1; + cb->callback(cb->fd, cb->buf, cb->len, COMM_ERR_CLOSING, cb->errno_nr, cb->data); + } else { + cb->callback(cb->fd, cb->buf, cb->len, cb->error, cb->errno_nr, cb->data); + } + + dlinkDelete(&cb->node, &callbacklist); + xfree(cb); + } + node = tnode; + } + + /* Now, kill the existing write callback */ + if (found_write_cb == 0) { + if (F->new.write.data && F->new.write.callback) { + callback = F->new.write.callback; + F->new.write.callback = NULL; + callback(fd, F->new.write.buf, 0, COMM_ERR_CLOSING, 0, F->new.write.data); + } + } + + /* Done! */ +} Index: squid/src/http.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/http.c,v retrieving revision 1.11 retrieving revision 1.11.12.1 diff -u -r1.11 -r1.11.12.1 --- squid/src/http.c 12 Jan 2001 08:20:33 -0000 1.11 +++ squid/src/http.c 16 Feb 2001 17:08:30 -0000 1.11.12.1 @@ -1,6 +1,6 @@ /* - * $Id: http.c,v 1.11 2001/01/12 08:20:33 hno Exp $ + * $Id: http.c,v 1.11.12.1 2001/02/16 17:08:30 adri Exp $ * * DEBUG: section 11 Hypertext Transfer Protocol (HTTP) * AUTHOR: Harvest Derived @@ -42,8 +42,8 @@ static const char *const crlf = "\r\n"; -static CWCB httpSendComplete; -static CWCB httpSendRequestEntry; +static COMMCB httpSendComplete; +static COMMCB httpSendRequestEntry; static PF httpReadReply; static void httpSendRequest(HttpStateData *); @@ -589,7 +589,8 @@ /* This will be called when request write is complete. Schedule read of * reply. */ static void -httpSendComplete(int fd, char *bufnotused, size_t size, int errflag, void *data) +httpSendComplete(int fd, void *bufnotused, ssize_t size, int errflag, + int reterrno, void *data) { HttpStateData *httpState = data; StoreEntry *entry = httpState->entry; @@ -600,7 +601,6 @@ assert(entry->mem_obj->chksum == url_checksum(entry->mem_obj->url)); #endif if (size > 0) { - fd_bytes(fd, size, FD_WRITE); kb_incr(&statCounter.server.all.kbytes_out, size); kb_incr(&statCounter.server.http.kbytes_out, size); } @@ -872,7 +872,7 @@ StoreEntry *entry = httpState->entry; int cfd; peer *p = httpState->peer; - CWCB *sendHeaderDone; + COMMCB *sendHeaderDone; debug(11, 5) ("httpSendRequest: FD %d: httpState %p.\n", httpState->fd, httpState); @@ -915,7 +915,7 @@ cfd, httpState->flags); debug(11, 6) ("httpSendRequest: FD %d:\n%s\n", httpState->fd, mb.buf); - comm_write_mbuf(httpState->fd, mb, sendHeaderDone, httpState); + ncomm_write_mbuf(httpState->fd, mb, sendHeaderDone, httpState); } void @@ -986,13 +986,13 @@ ch.request = httpState->request; if (!Config.accessList.brokenPosts) { debug(11, 5) ("httpSendRequestEntryDone: No brokenPosts list\n"); - httpSendComplete(fd, NULL, 0, 0, data); + httpSendComplete(fd, NULL, 0, COMM_OK, 0, data); } else if (!aclCheckFast(Config.accessList.brokenPosts, &ch)) { debug(11, 5) ("httpSendRequestEntryDone: didn't match brokenPosts\n"); - httpSendComplete(fd, NULL, 0, 0, data); + httpSendComplete(fd, NULL, 0, COMM_OK, 0, data); } else { debug(11, 2) ("httpSendRequestEntryDone: matched brokenPosts\n"); - comm_write(fd, "\r\n", 2, httpSendComplete, data, NULL); + ncomm_write(fd, "\r\n", 2, httpSendComplete, data, NULL); } } @@ -1001,7 +1001,7 @@ { HttpStateData *httpState = (HttpStateData *) data; if (size > 0) { - comm_write(httpState->fd, buf, size, httpSendRequestEntry, data, memFree8K); + ncomm_write(httpState->fd, buf, size, httpSendRequestEntry, data, memFree8K); } else if (size == 0) { /* End of body */ memFree8K(buf); @@ -1009,12 +1009,13 @@ } else { /* Failed to get whole body, probably aborted */ memFree8K(buf); - httpSendComplete(httpState->fd, NULL, 0, COMM_ERR_CLOSING, data); + httpSendComplete(httpState->fd, NULL, 0, COMM_ERR_CLOSING, 0, data); } } static void -httpSendRequestEntry(int fd, char *bufnotused, size_t size, int errflag, void *data) +httpSendRequestEntry(int fd, void *bufnotused, ssize_t size, int errflag, int reterrno, + void *data) { HttpStateData *httpState = data; StoreEntry *entry = httpState->entry; @@ -1022,7 +1023,6 @@ debug(11, 5) ("httpSendRequestEntry: FD %d: size %d: errflag %d.\n", fd, size, errflag); if (size > 0) { - fd_bytes(fd, size, FD_WRITE); kb_incr(&statCounter.server.all.kbytes_out, size); kb_incr(&statCounter.server.http.kbytes_out, size); } @@ -1030,7 +1030,7 @@ return; if (errflag) { err = errorCon(ERR_WRITE_ERROR, HTTP_INTERNAL_SERVER_ERROR); - err->xerrno = errno; + err->xerrno = reterrno; err->request = requestLink(httpState->orig_request); errorAppendEntry(entry, err); comm_close(fd); Index: squid/src/protos.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/protos.h,v retrieving revision 1.18.8.1 retrieving revision 1.18.8.2 diff -u -r1.18.8.1 -r1.18.8.2 --- squid/src/protos.h 15 Feb 2001 15:53:37 -0000 1.18.8.1 +++ squid/src/protos.h 16 Feb 2001 17:08:30 -0000 1.18.8.2 @@ -1,6 +1,6 @@ /* - * $Id: protos.h,v 1.18.8.1 2001/02/15 15:53:37 adri Exp $ + * $Id: protos.h,v 1.18.8.2 2001/02/16 17:08:30 adri Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -1299,7 +1299,11 @@ */ extern void ncomm_init(void); extern void ncomm_handle_events(void); -extern void ncomm_read(int fd, void *buf, size_t size, COMMCB callback, +extern void ncomm_read(int fd, void *buf, size_t size, COMMCB *callback, void *data); -extern int ncomm_listen(int fd, int backlog, COMMCB callback, void *data); - +extern void ncomm_write(int fd, const void *buf, size_t size, COMMCB *callback, + void *data, FREE *freefunc); +extern void ncomm_write_mbuf(int fd, MemBuf mb, COMMCB *handler, + void *handler_data); +extern int ncomm_listen(int fd, int backlog, COMMCB *callback, void *data); +extern void ncomm_close(int fd); Index: squid/src/structs.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/structs.h,v retrieving revision 1.24.8.2 retrieving revision 1.24.8.3 diff -u -r1.24.8.2 -r1.24.8.3 --- squid/src/structs.h 15 Feb 2001 21:17:03 -0000 1.24.8.2 +++ squid/src/structs.h 16 Feb 2001 17:08:30 -0000 1.24.8.3 @@ -1,6 +1,6 @@ /* - * $Id: structs.h,v 1.24.8.2 2001/02/15 21:17:03 adri Exp $ + * $Id: structs.h,v 1.24.8.3 2001/02/16 17:08:30 adri Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -697,7 +697,7 @@ struct _fh_read { void *buf; size_t size; - COMMCB callback; + COMMCB *callback; void *data; int (*handler)(int fd, struct _fh_read *); }; @@ -706,8 +706,9 @@ void *buf; size_t size; size_t done; - COMMCB callback; + COMMCB *callback; void *data; + FREE *freefunc; int (*handler)(int fd, struct _fh_write *); }; @@ -2137,12 +2138,14 @@ * comm_server stuff */ struct _comm_callback_entry { - struct _comm_callback_entry *next; - COMMCB callback; + dlink_node node; + COMMCB *callback; int fd; void *buf; ssize_t len; int errno_nr; + int error; void *data; -}; + FREE *freefunc; +}; Index: squid/src/typedefs.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/typedefs.h,v retrieving revision 1.15.8.1 retrieving revision 1.15.8.2 diff -u -r1.15.8.1 -r1.15.8.2 --- squid/src/typedefs.h 15 Feb 2001 15:53:37 -0000 1.15.8.1 +++ squid/src/typedefs.h 16 Feb 2001 17:08:30 -0000 1.15.8.2 @@ -1,6 +1,6 @@ /* - * $Id: typedefs.h,v 1.15.8.1 2001/02/15 15:53:37 adri Exp $ + * $Id: typedefs.h,v 1.15.8.2 2001/02/16 17:08:30 adri Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -359,8 +359,8 @@ /* * comm_server */ -typedef void (*COMMCB)(int fd, void *buf, ssize_t len, int errno_nr, - void *data); +typedef void COMMCB(int fd, void *buf, ssize_t len, int error, + int errno_nr, void *data); #endif /* _TYPEDEFS_H_ */