--------------------- PatchSet 1593 Date: 2001/02/17 23:24:10 Author: hno Branch: eventio Tag: (none) Log: Renamed comm_server.c to ncomm.c Members: src/Makefile.in:1.7.8.1->1.7.8.2 src/comm_server.c:1.1.2.5->1.1.2.6(DEAD) src/ncomm.c:1.1->1.1.2.1 Index: squid/src/Makefile.in =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/Makefile.in,v retrieving revision 1.7.8.1 retrieving revision 1.7.8.2 diff -u -r1.7.8.1 -r1.7.8.2 --- squid/src/Makefile.in 15 Feb 2001 15:53:37 -0000 1.7.8.1 +++ squid/src/Makefile.in 17 Feb 2001 23:24:10 -0000 1.7.8.2 @@ -1,7 +1,7 @@ # # Makefile for the Squid Object Cache server # -# $Id: Makefile.in,v 1.7.8.1 2001/02/15 15:53:37 adri Exp $ +# $Id: Makefile.in,v 1.7.8.2 2001/02/17 23:24:10 hno Exp $ # # Uncomment and customize the following to suit your needs: # @@ -103,7 +103,7 @@ client_side.o \ comm.o \ comm_select.o \ - comm_server.o \ + ncomm.o \ debug.o \ @DELAY_OBJS@ \ disk.o \ --- squid/src/comm_server.c Wed Feb 14 00:50:29 2007 +++ /dev/null Wed Feb 14 00:50:05 2007 @@ -1,401 +0,0 @@ -/* - * Adrian's todo list with this code - * - * - make the callback list code use a dlink - * - convert things to use memory pools - * - add debugging statements - * - merge in the "new" stuff with fd.c, and use fd.c right here - * (ie use fd_open/close to clear the struct completely, call - * fd_open whenever we create something..) - * - Make sure we call the comm.c routines after accept() to set - * non-blocking, etc. - */ - -#include "squid.h" - -#define UNUSED __attribute__((unused)) - - -static struct pollfd pollfds[FD_SETSIZE]; -static unsigned int pollnfds = 0; -static unsigned int pollfirstfree = 1; /* 0 is skipped to simplify logics */ - -dlink_list callbacklist; - -static void -comm_callback(int fd, void *buf, ssize_t len, int error, int errno_nr, - COMMCB *callback, void *data, FREE *freefunc) -{ - comm_callback_entry_t *cbent = xcalloc(1, sizeof(comm_callback_entry_t)); - cbent->callback = callback; - cbent->fd = fd; - cbent->buf = buf; - cbent->len = len; - cbent->error = error; - cbent->errno_nr = errno_nr; - cbent->data = data; - cbent->freefunc = freefunc; - - dlinkAdd(cbent, &cbent->node, &callbacklist); -} - -static inline void -cleanup_poll(int fd, unsigned int poll_index) -{ - if (!pollfds[poll_index].events) { - fd_table[fd].new.poll_index = 0; - pollfds[poll_index].fd = -1; - if (poll_index < pollfirstfree) - pollfirstfree = poll_index; - if (poll_index == pollnfds) { - /* Shrink the active set */ - while(pollnfds > 0 && pollfds[pollnfds].fd == -1) - pollnfds--; - } - } -} - -static inline void -comm_read_event(int fd) -{ - int poll_index = fd_table[fd].new.poll_index; - int done = fd_table[fd].new.read.handler(fd, &fd_table[fd].new.read); - - if (done) { - pollfds[poll_index].events &= ~POLLIN; - cleanup_poll(fd, (unsigned int)poll_index); - } -} - - -static inline void -comm_write_event(int fd) -{ - int poll_index = fd_table[fd].new.poll_index; - int done = fd_table[fd].new.write.handler(fd, &fd_table[fd].new.write); - - if (done) { - pollfds[poll_index].events &= ~POLLOUT; - cleanup_poll(fd, (unsigned int)poll_index); - } -} - -static void -ncomm_call_callbacks(void) -{ - /* Do callbacks */ - while(callbacklist.head) { - comm_callback_entry_t *cb = callbacklist.head->data; - - /* I hate this. It'll be replaced with a cbdataLock/Unlock soon .. */ - if (cb->freefunc) { - cb->freefunc(cb->buf); - cb->buf = NULL; - } - - cb->callback(cb->fd, cb->buf, cb->len, cb->error, cb->errno_nr, - cb->data); - dlinkDelete(&cb->node, &callbacklist); - xfree(cb); - } -} - -#define CALLBACK_MAGIC1 16 -void -ncomm_handle_events(int timeout) -{ - int poll_index; - int fds = poll(pollfds+1 /* skip position 0 */, pollnfds, timeout); - int callback_limit = CALLBACK_MAGIC1; - for (poll_index=0; poll_index < (int)pollnfds && fds > 0; poll_index++) { - short revents = pollfds[poll_index].revents; - int fd = pollfds[poll_index].fd; - if(revents) { - fds--; - if (revents & POLLIN) { - comm_read_event(fd); - } else if (revents & POLLOUT) { - comm_write_event(fd); - } else if (revents & (POLLHUP | POLLERR | POLLNVAL)) { - short events = pollfds[poll_index].events; - /* I am pretty sure there is better ways to handle errors.. */ - if (events & POLLOUT) - comm_write_event(fd); - else if (events & POLLIN) - comm_read_event(fd); - } - } - if (--callback_limit == 0) { - ncomm_call_callbacks(); - callback_limit = CALLBACK_MAGIC1; - } - } - if (callback_limit) - ncomm_call_callbacks(); -} - -static void -comm_register_for_event(int fd, enum comm_event_type event) -{ - int poll_index = fd_table[fd].new.poll_index; - short events = 0; - switch (event) { - case EVENT_READ: - events = POLLIN; - break; - case EVENT_WRITE: - events = POLLOUT; - break; - default: - /* ERROR! */ - return; - } - if (!poll_index) { - poll_index = pollfirstfree; - while (pollfds[poll_index].fd >= 0 && poll_index < (int)pollnfds) - poll_index++; - if (poll_index > (int)pollnfds) - pollnfds = (unsigned int)poll_index; - pollfirstfree = poll_index + 1; - pollfds[poll_index].fd = fd; - fd_table[fd].new.poll_index = poll_index; - } - pollfds[poll_index].events |= events; -} - -static int -comm_do_read(int fd, fh_read_t *fhr) -{ - /* Read in some data */ - const ssize_t len = read(fd, fhr->buf, fhr->size); - statCounter.syscalls.sock.reads++; - fd_bytes(fd, len, FD_READ); - if (len >= 0) { - /* We got some data to deliver to the application */ - comm_callback(fd, fhr->buf, len, COMM_OK, 0, fhr->callback, fhr->data, - NULL); - return 1; - } - if (len == -1 && !ignoreErrno(errno)) { - /* Error */ - comm_callback(fd, fhr->buf, len, COMM_ERROR, errno, fhr->callback, - fhr->data, NULL); - return 1; - } - /* Try again */ - return 0; -} - -void -ncomm_read(int fd, void *buf, size_t size, COMMCB *callback, void *data) -{ - /* Read in some data */ - const ssize_t len = read(fd, buf, size); - statCounter.syscalls.sock.reads++; - fd_bytes(fd, len, FD_READ); - if (len >= 0) { - /* We immediately got some data to deliver to the application */ - comm_callback(fd, buf, len, COMM_OK, 0, callback, data, NULL); - return; - } - if (len == -1 && !ignoreErrno(errno)) { - /* Error */ - comm_callback(fd, buf, len, COMM_ERROR, errno, callback, data, NULL); - return; - } - /* Oops.. we could not get any data. Register for read event */ - fd_table[fd].new.read.buf = buf; - fd_table[fd].new.read.size = size; - fd_table[fd].new.read.callback = callback; - fd_table[fd].new.read.data = data; - fd_table[fd].new.read.handler = comm_do_read; - comm_register_for_event(fd, EVENT_READ); -} - -static int -comm_do_write(int fd, fh_write_t *fhw) -{ - /* Write out some more data */ - const void *buf = (void *)((char *)fhw->buf + fhw->done); - const size_t size = fhw->size - fhw->done; - ssize_t len = write(fd, buf, size); - statCounter.syscalls.sock.writes++; - fd_bytes(fd, len, FD_WRITE); - if ((size_t)len == size) { - /* finished */ - comm_callback(fd, fhw->buf, (ssize_t)fhw->size, COMM_OK, 0, - fhw->callback, fhw->data, fhw->freefunc); - return 1; - } - if (len == -1 && !ignoreErrno(errno)) { - /* error */ - comm_callback(fd, fhw->buf, fhw->done, COMM_ERROR, errno, - fhw->callback, fhw->data, fhw->freefunc); - return 1; - } - /* try again */ - if (len > 0) - fhw->done += len; - return 0; -} - -void -ncomm_write(int fd, const void *buf, size_t size, COMMCB *callback, void *data, - FREE *freefunc) -{ - ssize_t len = write(fd, buf, size); - statCounter.syscalls.sock.writes++; - fd_bytes(fd, len, FD_WRITE); - if ((size_t)len == size) { - /* Finished immediately */ - comm_callback(fd, (void *)buf, len, COMM_OK, 0, callback, data, - freefunc); - return; - } - if (len == -1 && !ignoreErrno(errno)) { - /* error */ - comm_callback(fd, (void *)buf, len, COMM_ERROR, errno, callback, data, - freefunc); - return; - } - /* Oops.. we could not write all data. Register for write event */ - fd_table[fd].new.write.buf = (void *)buf; - fd_table[fd].new.write.size = size; - if (len != -1) - fd_table[fd].new.write.done = len; - else - fd_table[fd].new.write.done = 0; - fd_table[fd].new.write.callback = callback; - fd_table[fd].new.write.data = data; - fd_table[fd].new.write.handler = comm_do_write; - comm_register_for_event(fd, EVENT_WRITE); -} - -/* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap -*/ -void -ncomm_write_mbuf(int fd, MemBuf mb, COMMCB *handler, void *handler_data) -{ - ncomm_write(fd, mb.buf, mb.size, handler, handler_data, - memBufFreeFunc(&mb) ); -} - -static int -comm_do_accept(int fd, fh_read_t *fhr) -{ - int nfd; - struct sockaddr saddr; - socklen_t addrlen; - fde *F; - - while(1) { - addrlen = sizeof(struct sockaddr); - nfd = accept(fd, &saddr, &addrlen); - statCounter.syscalls.sock.accepts++; - if (nfd < 0) - break; - /* Update the FD state */ - F = &fd_table[nfd]; - - memset(&F->new, 0, sizeof(fd_table[nfd].new)); - fd_open(nfd, FD_SOCKET, "Incoming request"); - - /* Update the socket information */ - addrlen = sizeof(struct sockaddr); - getsockname(nfd, (struct sockaddr *) &F->new.local, &addrlen); - xstrncpy(F->ipaddr, inet_ntoa(F->new.peer.sin_addr), 16); - F->remote_port = htons(F->new.peer.sin_port); - F->local_port = htons(F->new.local.sin_port); - - /* Set the flags */ - commSetCloseOnExec(nfd); - commSetNonBlocking(nfd); - - /* Finally, hand over the socket */ - comm_callback(nfd, &fd_table[nfd].new.peer, addrlen, COMM_OK, 0, - fhr->callback, fhr->data, NULL); - } - if (!ignoreErrno(errno)) { - perror("accept:"); - } - /* continue listening */ - return 0; -} - -int -ncomm_listen(int fd, int backlog, COMMCB *callback, void *data) -{ - int err = listen(fd, backlog); - if (err != 0) - return err; - if (err != 0) - return err; - fd_table[fd].new.read.buf = NULL; - fd_table[fd].new.read.size = 0; - fd_table[fd].new.read.callback = callback; - fd_table[fd].new.read.data = data; - fd_table[fd].new.read.handler = comm_do_accept; - comm_register_for_event(fd, EVENT_READ); - comm_do_accept(fd, &fd_table[fd].new.read); - return 0; -} - -void -ncomm_init(void) -{ - /* nothing for now */ -} - - -/* This will eventually be merged with comm.c when the old comm code goes */ - -/* - * Here's a problem - we can have pending callbacks in the callback queue, *and* - * we can have a write callback. We don't want to call the write callback if its in - * the queue. So, if its in the queue, we mark it and don't call it at the end. - * - * XXX yes this is evil, I know. We'll tidy it up once the old comm code goes away. - * -- adrian - */ -void -ncomm_close(int fd) -{ - dlink_node *node, *tnode; - comm_callback_entry_t *cb; - int found_write_cb = 0; - fde *F = &fd_table[fd]; - COMMCB *callback; - - /* Flush any pending callbacks in the queue that are for us */ - /* XXX this is obviously O(N) rather than O(1) which needs changing! */ - node = callbacklist.head; - - while (node != NULL) { - tnode = node->next; - cb = node->data; - if (cb->fd == fd) { - if ((cb->data == F->new.write.data) && (cb->callback == F->new.write.callback) && - (cb->buf == F->new.write.buf)) { - found_write_cb = 1; - cb->callback(cb->fd, cb->buf, cb->len, COMM_ERR_CLOSING, cb->errno_nr, cb->data); - } else { - cb->callback(cb->fd, cb->buf, cb->len, cb->error, cb->errno_nr, cb->data); - } - - dlinkDelete(&cb->node, &callbacklist); - xfree(cb); - } - node = tnode; - } - - /* Now, kill the existing write callback */ - if (found_write_cb == 0) { - if (F->new.write.data && F->new.write.callback) { - callback = F->new.write.callback; - F->new.write.callback = NULL; - callback(fd, F->new.write.buf, 0, COMM_ERR_CLOSING, 0, F->new.write.data); - } - } - - /* Done! */ -} --- /dev/null Wed Feb 14 00:50:05 2007 +++ squid/src/ncomm.c Wed Feb 14 00:50:29 2007 @@ -0,0 +1,401 @@ +/* + * Adrian's todo list with this code + * + * - make the callback list code use a dlink + * - convert things to use memory pools + * - add debugging statements + * - merge in the "new" stuff with fd.c, and use fd.c right here + * (ie use fd_open/close to clear the struct completely, call + * fd_open whenever we create something..) + * - Make sure we call the comm.c routines after accept() to set + * non-blocking, etc. + */ + +#include "squid.h" + +#define UNUSED __attribute__((unused)) + + +static struct pollfd pollfds[FD_SETSIZE]; +static unsigned int pollnfds = 0; +static unsigned int pollfirstfree = 1; /* 0 is skipped to simplify logics */ + +dlink_list callbacklist; + +static void +comm_callback(int fd, void *buf, ssize_t len, int error, int errno_nr, + COMMCB *callback, void *data, FREE *freefunc) +{ + comm_callback_entry_t *cbent = xcalloc(1, sizeof(comm_callback_entry_t)); + cbent->callback = callback; + cbent->fd = fd; + cbent->buf = buf; + cbent->len = len; + cbent->error = error; + cbent->errno_nr = errno_nr; + cbent->data = data; + cbent->freefunc = freefunc; + + dlinkAdd(cbent, &cbent->node, &callbacklist); +} + +static inline void +cleanup_poll(int fd, unsigned int poll_index) +{ + if (!pollfds[poll_index].events) { + fd_table[fd].new.poll_index = 0; + pollfds[poll_index].fd = -1; + if (poll_index < pollfirstfree) + pollfirstfree = poll_index; + if (poll_index == pollnfds) { + /* Shrink the active set */ + while(pollnfds > 0 && pollfds[pollnfds].fd == -1) + pollnfds--; + } + } +} + +static inline void +comm_read_event(int fd) +{ + int poll_index = fd_table[fd].new.poll_index; + int done = fd_table[fd].new.read.handler(fd, &fd_table[fd].new.read); + + if (done) { + pollfds[poll_index].events &= ~POLLIN; + cleanup_poll(fd, (unsigned int)poll_index); + } +} + + +static inline void +comm_write_event(int fd) +{ + int poll_index = fd_table[fd].new.poll_index; + int done = fd_table[fd].new.write.handler(fd, &fd_table[fd].new.write); + + if (done) { + pollfds[poll_index].events &= ~POLLOUT; + cleanup_poll(fd, (unsigned int)poll_index); + } +} + +static void +ncomm_call_callbacks(void) +{ + /* Do callbacks */ + while(callbacklist.head) { + comm_callback_entry_t *cb = callbacklist.head->data; + + /* I hate this. It'll be replaced with a cbdataLock/Unlock soon .. */ + if (cb->freefunc) { + cb->freefunc(cb->buf); + cb->buf = NULL; + } + + cb->callback(cb->fd, cb->buf, cb->len, cb->error, cb->errno_nr, + cb->data); + dlinkDelete(&cb->node, &callbacklist); + xfree(cb); + } +} + +#define CALLBACK_MAGIC1 16 +void +ncomm_handle_events(int timeout) +{ + int poll_index; + int fds = poll(pollfds+1 /* skip position 0 */, pollnfds, timeout); + int callback_limit = CALLBACK_MAGIC1; + for (poll_index=0; poll_index < (int)pollnfds && fds > 0; poll_index++) { + short revents = pollfds[poll_index].revents; + int fd = pollfds[poll_index].fd; + if(revents) { + fds--; + if (revents & POLLIN) { + comm_read_event(fd); + } else if (revents & POLLOUT) { + comm_write_event(fd); + } else if (revents & (POLLHUP | POLLERR | POLLNVAL)) { + short events = pollfds[poll_index].events; + /* I am pretty sure there is better ways to handle errors.. */ + if (events & POLLOUT) + comm_write_event(fd); + else if (events & POLLIN) + comm_read_event(fd); + } + } + if (--callback_limit == 0) { + ncomm_call_callbacks(); + callback_limit = CALLBACK_MAGIC1; + } + } + if (callback_limit) + ncomm_call_callbacks(); +} + +static void +comm_register_for_event(int fd, enum comm_event_type event) +{ + int poll_index = fd_table[fd].new.poll_index; + short events = 0; + switch (event) { + case EVENT_READ: + events = POLLIN; + break; + case EVENT_WRITE: + events = POLLOUT; + break; + default: + /* ERROR! */ + return; + } + if (!poll_index) { + poll_index = pollfirstfree; + while (pollfds[poll_index].fd >= 0 && poll_index < (int)pollnfds) + poll_index++; + if (poll_index > (int)pollnfds) + pollnfds = (unsigned int)poll_index; + pollfirstfree = poll_index + 1; + pollfds[poll_index].fd = fd; + fd_table[fd].new.poll_index = poll_index; + } + pollfds[poll_index].events |= events; +} + +static int +comm_do_read(int fd, fh_read_t *fhr) +{ + /* Read in some data */ + const ssize_t len = read(fd, fhr->buf, fhr->size); + statCounter.syscalls.sock.reads++; + fd_bytes(fd, len, FD_READ); + if (len >= 0) { + /* We got some data to deliver to the application */ + comm_callback(fd, fhr->buf, len, COMM_OK, 0, fhr->callback, fhr->data, + NULL); + return 1; + } + if (len == -1 && !ignoreErrno(errno)) { + /* Error */ + comm_callback(fd, fhr->buf, len, COMM_ERROR, errno, fhr->callback, + fhr->data, NULL); + return 1; + } + /* Try again */ + return 0; +} + +void +ncomm_read(int fd, void *buf, size_t size, COMMCB *callback, void *data) +{ + /* Read in some data */ + const ssize_t len = read(fd, buf, size); + statCounter.syscalls.sock.reads++; + fd_bytes(fd, len, FD_READ); + if (len >= 0) { + /* We immediately got some data to deliver to the application */ + comm_callback(fd, buf, len, COMM_OK, 0, callback, data, NULL); + return; + } + if (len == -1 && !ignoreErrno(errno)) { + /* Error */ + comm_callback(fd, buf, len, COMM_ERROR, errno, callback, data, NULL); + return; + } + /* Oops.. we could not get any data. Register for read event */ + fd_table[fd].new.read.buf = buf; + fd_table[fd].new.read.size = size; + fd_table[fd].new.read.callback = callback; + fd_table[fd].new.read.data = data; + fd_table[fd].new.read.handler = comm_do_read; + comm_register_for_event(fd, EVENT_READ); +} + +static int +comm_do_write(int fd, fh_write_t *fhw) +{ + /* Write out some more data */ + const void *buf = (void *)((char *)fhw->buf + fhw->done); + const size_t size = fhw->size - fhw->done; + ssize_t len = write(fd, buf, size); + statCounter.syscalls.sock.writes++; + fd_bytes(fd, len, FD_WRITE); + if ((size_t)len == size) { + /* finished */ + comm_callback(fd, fhw->buf, (ssize_t)fhw->size, COMM_OK, 0, + fhw->callback, fhw->data, fhw->freefunc); + return 1; + } + if (len == -1 && !ignoreErrno(errno)) { + /* error */ + comm_callback(fd, fhw->buf, fhw->done, COMM_ERROR, errno, + fhw->callback, fhw->data, fhw->freefunc); + return 1; + } + /* try again */ + if (len > 0) + fhw->done += len; + return 0; +} + +void +ncomm_write(int fd, const void *buf, size_t size, COMMCB *callback, void *data, + FREE *freefunc) +{ + ssize_t len = write(fd, buf, size); + statCounter.syscalls.sock.writes++; + fd_bytes(fd, len, FD_WRITE); + if ((size_t)len == size) { + /* Finished immediately */ + comm_callback(fd, (void *)buf, len, COMM_OK, 0, callback, data, + freefunc); + return; + } + if (len == -1 && !ignoreErrno(errno)) { + /* error */ + comm_callback(fd, (void *)buf, len, COMM_ERROR, errno, callback, data, + freefunc); + return; + } + /* Oops.. we could not write all data. Register for write event */ + fd_table[fd].new.write.buf = (void *)buf; + fd_table[fd].new.write.size = size; + if (len != -1) + fd_table[fd].new.write.done = len; + else + fd_table[fd].new.write.done = 0; + fd_table[fd].new.write.callback = callback; + fd_table[fd].new.write.data = data; + fd_table[fd].new.write.handler = comm_do_write; + comm_register_for_event(fd, EVENT_WRITE); +} + +/* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap +*/ +void +ncomm_write_mbuf(int fd, MemBuf mb, COMMCB *handler, void *handler_data) +{ + ncomm_write(fd, mb.buf, mb.size, handler, handler_data, + memBufFreeFunc(&mb) ); +} + +static int +comm_do_accept(int fd, fh_read_t *fhr) +{ + int nfd; + struct sockaddr saddr; + socklen_t addrlen; + fde *F; + + while(1) { + addrlen = sizeof(struct sockaddr); + nfd = accept(fd, &saddr, &addrlen); + statCounter.syscalls.sock.accepts++; + if (nfd < 0) + break; + /* Update the FD state */ + F = &fd_table[nfd]; + + memset(&F->new, 0, sizeof(fd_table[nfd].new)); + fd_open(nfd, FD_SOCKET, "Incoming request"); + + /* Update the socket information */ + addrlen = sizeof(struct sockaddr); + getsockname(nfd, (struct sockaddr *) &F->new.local, &addrlen); + xstrncpy(F->ipaddr, inet_ntoa(F->new.peer.sin_addr), 16); + F->remote_port = htons(F->new.peer.sin_port); + F->local_port = htons(F->new.local.sin_port); + + /* Set the flags */ + commSetCloseOnExec(nfd); + commSetNonBlocking(nfd); + + /* Finally, hand over the socket */ + comm_callback(nfd, &fd_table[nfd].new.peer, addrlen, COMM_OK, 0, + fhr->callback, fhr->data, NULL); + } + if (!ignoreErrno(errno)) { + perror("accept:"); + } + /* continue listening */ + return 0; +} + +int +ncomm_listen(int fd, int backlog, COMMCB *callback, void *data) +{ + int err = listen(fd, backlog); + if (err != 0) + return err; + if (err != 0) + return err; + fd_table[fd].new.read.buf = NULL; + fd_table[fd].new.read.size = 0; + fd_table[fd].new.read.callback = callback; + fd_table[fd].new.read.data = data; + fd_table[fd].new.read.handler = comm_do_accept; + comm_register_for_event(fd, EVENT_READ); + comm_do_accept(fd, &fd_table[fd].new.read); + return 0; +} + +void +ncomm_init(void) +{ + /* nothing for now */ +} + + +/* This will eventually be merged with comm.c when the old comm code goes */ + +/* + * Here's a problem - we can have pending callbacks in the callback queue, *and* + * we can have a write callback. We don't want to call the write callback if its in + * the queue. So, if its in the queue, we mark it and don't call it at the end. + * + * XXX yes this is evil, I know. We'll tidy it up once the old comm code goes away. + * -- adrian + */ +void +ncomm_close(int fd) +{ + dlink_node *node, *tnode; + comm_callback_entry_t *cb; + int found_write_cb = 0; + fde *F = &fd_table[fd]; + COMMCB *callback; + + /* Flush any pending callbacks in the queue that are for us */ + /* XXX this is obviously O(N) rather than O(1) which needs changing! */ + node = callbacklist.head; + + while (node != NULL) { + tnode = node->next; + cb = node->data; + if (cb->fd == fd) { + if ((cb->data == F->new.write.data) && (cb->callback == F->new.write.callback) && + (cb->buf == F->new.write.buf)) { + found_write_cb = 1; + cb->callback(cb->fd, cb->buf, cb->len, COMM_ERR_CLOSING, cb->errno_nr, cb->data); + } else { + cb->callback(cb->fd, cb->buf, cb->len, cb->error, cb->errno_nr, cb->data); + } + + dlinkDelete(&cb->node, &callbacklist); + xfree(cb); + } + node = tnode; + } + + /* Now, kill the existing write callback */ + if (found_write_cb == 0) { + if (F->new.write.data && F->new.write.callback) { + callback = F->new.write.callback; + F->new.write.callback = NULL; + callback(fd, F->new.write.buf, 0, COMM_ERR_CLOSING, 0, F->new.write.data); + } + } + + /* Done! */ +}