--------------------- PatchSet 1579 Date: 2001/02/15 15:53:37 Author: adri Branch: eventio Tag: (none) Log: Initial import of Henrik's draft eventio-poll code. I have only brought it into the build - a lot more work needs to be done before I can start converting bits of the code to use this new IO methods. Members: src/Makefile.in:1.7->1.7.8.1 src/comm_server.c:1.1->1.1.2.1 src/enums.h:1.14->1.14.8.1 src/protos.h:1.18->1.18.8.1 src/structs.h:1.24->1.24.8.1 src/typedefs.h:1.15->1.15.8.1 Index: squid/src/Makefile.in =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/Makefile.in,v retrieving revision 1.7 retrieving revision 1.7.8.1 diff -u -r1.7 -r1.7.8.1 --- squid/src/Makefile.in 10 Feb 2001 16:49:03 -0000 1.7 +++ squid/src/Makefile.in 15 Feb 2001 15:53:37 -0000 1.7.8.1 @@ -1,7 +1,7 @@ # # Makefile for the Squid Object Cache server # -# $Id: Makefile.in,v 1.7 2001/02/10 16:49:03 hno Exp $ +# $Id: Makefile.in,v 1.7.8.1 2001/02/15 15:53:37 adri Exp $ # # Uncomment and customize the following to suit your needs: # @@ -103,6 +103,7 @@ client_side.o \ comm.o \ comm_select.o \ + comm_server.o \ debug.o \ @DELAY_OBJS@ \ disk.o \ --- /dev/null Wed Feb 14 00:50:05 2007 +++ squid/src/comm_server.c Wed Feb 14 00:50:28 2007 @@ -0,0 +1,285 @@ +/* + * Adrian's todo list with this code + * + * - make the callback list code use a dlink + * - convert things to use memory pools + * - add debugging statements + * - merge in the "new" stuff with fd.c, and use fd.c right here + * (ie use fd_open/close to clear the struct completely, call + * fd_open whenever we create something..) + * - Make sure we call the comm.c routines after accept() to set + * non-blocking, etc. + */ + +#include "squid.h" + +#define UNUSED __attribute__((unused)) + + +static struct pollfd pollfds[FD_SETSIZE]; +static unsigned int pollnfds = 0; +static unsigned int pollfirstfree = 0; + +comm_callback_entry_t *cbhead = NULL; +comm_callback_entry_t **cbtail = &cbhead; + +static void +comm_callback(int fd, void *buf, ssize_t len, int errno_nr, COMMCB callback, void *data) +{ + comm_callback_entry_t *cbent = xcalloc(1, sizeof(comm_callback_entry_t)); + cbent->callback = callback; + cbent->fd = fd; + cbent->buf = buf; + cbent->len = len; + cbent->errno_nr = errno_nr; + cbent->data = data; + cbent->next = NULL; + *cbtail = cbent; + cbtail = &cbent->next; +} + +static inline void +cleanup_poll(int fd, unsigned int comm_index) +{ + if (!pollfds[comm_index].events) { + fd_table[fd].new.comm_index = -1; + pollfds[comm_index].fd = -1; + if (comm_index < pollfirstfree) + pollfirstfree = comm_index; + if (comm_index == pollnfds - 1) { + /* Shrink the active set */ + while(pollnfds > 0 && pollfds[pollnfds-1].fd == -1) + pollnfds--; + } + } +} + +static inline void +comm_read_event(int fd) +{ + int comm_index = fd_table[fd].new.comm_index; + int done = fd_table[fd].new.read.handler(fd, &fd_table[fd].new.read); + if (done) { + pollfds[comm_index].events &= ~POLLIN; + cleanup_poll(fd, (unsigned int)comm_index); + } +} + + +static inline void +comm_write_event(int fd) +{ + int comm_index = fd_table[fd].new.comm_index; + int done = fd_table[fd].new.write.handler(fd, &fd_table[fd].new.write); + if (done) { + pollfds[comm_index].events &= ~POLLOUT; + cleanup_poll(fd, (unsigned int)comm_index); + } +} + +void +ncomm_handle_events(void) +{ + int comm_index; + int fds = poll(pollfds, pollnfds, -1); + for (comm_index=0; comm_index < (int)pollnfds && fds > 0; comm_index++) { + short revents = pollfds[comm_index].revents; + int fd = pollfds[comm_index].fd; + if(revents) { + fds--; + if (revents & POLLIN) { + comm_read_event(fd); + } else if (revents & POLLOUT) { + comm_write_event(fd); + } else if (revents & (POLLHUP | POLLERR | POLLNVAL)) { + short events = pollfds[comm_index].events; + /* I am pretty sure there is better ways to handle errors.. */ + if (events & POLLOUT) + comm_write_event(fd); + else if (events & POLLIN) + comm_read_event(fd); + } + } + } + /* Do callbacks */ + while(cbhead) { + comm_callback_entry_t *cb = cbhead; + cb->callback(cb->fd, cb->buf, cb->len, cb->errno_nr, cb->data); + cbhead = cbhead->next; + xfree(cb); + } + cbtail = &cbhead; +} + +static void +comm_register_for_event(int fd, enum comm_event_type event) +{ + int comm_index = fd_table[fd].new.comm_index; + short events = 0; + switch (event) { + case EVENT_READ: + events = POLLIN; + break; + case EVENT_WRITE: + events = POLLOUT; + break; + default: + /* ERROR! */ + return; + } + if (comm_index < 0) { + comm_index = pollfirstfree; + while (pollfds[comm_index].fd >= 0 && comm_index < (int)pollnfds) + comm_index++; + if (comm_index >= (int)pollnfds) + pollnfds = (unsigned int)comm_index + 1; + pollfirstfree = comm_index + 1; + pollfds[comm_index].fd = fd; + fd_table[fd].new.comm_index = comm_index; + } + pollfds[comm_index].events |= events; +} + +static int +comm_do_read(int fd, fh_read_t *fhr) +{ + /* Read in some data */ + const ssize_t len = read(fd, fhr->buf, fhr->size); + if (len >= 0) { + /* We got some data to deliver to the application */ + comm_callback(fd, fhr->buf, len, 0, fhr->callback, fhr->data); + return 1; + } + if (len == -1 && !ignoreErrno(errno)) { + /* Error */ + comm_callback(fd, fhr->buf, len, errno, fhr->callback, fhr->data); + return 1; + } + /* Try again */ + return 0; +} + +void +ncomm_read(int fd, void *buf, size_t size, COMMCB callback, void *data) +{ + /* Read in some data */ + const ssize_t len = read(fd, buf, size); + if (len >= 0) { + /* We immediately got some data to deliver to the application */ + comm_callback(fd, buf, len, 0, callback, data); + return; + } + if (len == -1 && !ignoreErrno(errno)) { + /* Error */ + comm_callback(fd, buf, len, errno, callback, data); + return; + } + /* Oops.. we could not get any data. Register for read event */ + fd_table[fd].new.read.buf = buf; + fd_table[fd].new.read.size = size; + fd_table[fd].new.read.callback = callback; + fd_table[fd].new.read.data = data; + fd_table[fd].new.read.handler = comm_do_read; + comm_register_for_event(fd, EVENT_READ); +} + +static int +comm_do_write(int fd, fh_write_t *fhw) +{ + /* Write out some more data */ + const void *buf = (void *)((char *)fhw->buf + fhw->done); + const size_t size = fhw->size - fhw->done; + ssize_t len = write(fd, buf, size); + if ((size_t)len == size) { + /* finished */ + comm_callback(fd, fhw->buf, (ssize_t)fhw->size, 0, fhw->callback, fhw->data); + return 1; + } + if (len == -1 && !ignoreErrno(errno)) { + /* error */ + comm_callback(fd, fhw->buf, fhw->done, errno, fhw->callback, fhw->data); + return 1; + } + /* try again */ + if (len > 0) + fhw->done += len; + return 0; +} + +void +ncomm_write(int fd, const void *buf, size_t size, COMMCB callback, void *data) +{ + ssize_t len = write(fd, buf, size); + if ((size_t)len == size) { + /* Finished immediately */ + comm_callback(fd, (void *)buf, len, 0, callback, data); + return; + } + if (len == -1 && !ignoreErrno(errno)) { + /* error */ + comm_callback(fd, (void *)buf, len, errno, callback, data); + return; + } + /* Oops.. we could not write all data. Register for write event */ + fd_table[fd].new.write.buf = (void *)buf; + fd_table[fd].new.write.size = size; + if (len != -1) + fd_table[fd].new.write.done = len; + else + fd_table[fd].new.write.done = 0; + fd_table[fd].new.write.callback = callback; + fd_table[fd].new.write.data = data; + fd_table[fd].new.write.handler = comm_do_write; + comm_register_for_event(fd, EVENT_WRITE); +} + +void comm_new_fd(int fd) +{ + memset(&fd_table[fd].new, 0, sizeof(fd_table[fd].new)); + fd_table[fd].new.comm_index = -1; + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); +} + +static int +comm_do_accept(int fd, fh_read_t *fhr) +{ + int nfd; + struct sockaddr saddr; + socklen_t addrlen = sizeof(saddr); + + while((nfd = accept(fd, &saddr, &addrlen)) != -1) { + memset(&fd_table[nfd].new, 0, sizeof(fd_table[nfd].new)); + comm_new_fd(nfd); + memcpy(&fd_table[nfd].new.peer, &saddr, sizeof(saddr)); + comm_callback(nfd, &fd_table[nfd].new.peer, addrlen, 0, fhr->callback, fhr->data); + } + if (!ignoreErrno(errno)) { + perror("accept:"); + } + /* continue listening */ + return 0; +} + +int +ncomm_listen(int fd, int backlog, COMMCB callback, void *data) +{ + int err = listen(fd, backlog); + if (err != 0) + return err; + if (err != 0) + return err; + fd_table[fd].new.read.buf = NULL; + fd_table[fd].new.read.size = 0; + fd_table[fd].new.read.callback = callback; + fd_table[fd].new.read.data = data; + fd_table[fd].new.read.handler = comm_do_accept; + comm_register_for_event(fd, EVENT_READ); + comm_do_accept(fd, &fd_table[fd].new.read); + return 0; +} + +void +ncomm_init(void) +{ + /* nothing for now */ +} Index: squid/src/enums.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/enums.h,v retrieving revision 1.14 retrieving revision 1.14.8.1 diff -u -r1.14 -r1.14.8.1 --- squid/src/enums.h 9 Feb 2001 19:52:05 -0000 1.14 +++ squid/src/enums.h 15 Feb 2001 15:53:37 -0000 1.14.8.1 @@ -1,6 +1,6 @@ /* - * $Id: enums.h,v 1.14 2001/02/09 19:52:05 hno Exp $ + * $Id: enums.h,v 1.14.8.1 2001/02/15 15:53:37 adri Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -749,3 +749,13 @@ CBDATA_storeIOState, CBDATA_FIRST_CUSTOM_TYPE = 1000 } cbdata_type; + +/* + * comm_server stuff + */ +enum comm_event_type { + EVENT_NONE, + EVENT_READ, + EVENT_WRITE +}; + Index: squid/src/protos.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/protos.h,v retrieving revision 1.18 retrieving revision 1.18.8.1 diff -u -r1.18 -r1.18.8.1 --- squid/src/protos.h 10 Feb 2001 16:49:04 -0000 1.18 +++ squid/src/protos.h 15 Feb 2001 15:53:37 -0000 1.18.8.1 @@ -1,6 +1,6 @@ /* - * $Id: protos.h,v 1.18 2001/02/10 16:49:04 hno Exp $ + * $Id: protos.h,v 1.18.8.1 2001/02/15 15:53:37 adri Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -1293,3 +1293,13 @@ * hack to allow snmp access to the statistics counters */ extern StatCounters *snmpStatGet(int); + +/* + * comm_server.c + */ +extern void ncomm_init(void); +extern void ncomm_handle_events(void); +extern void ncomm_read(int fd, void *buf, size_t size, COMMCB callback, + void *data); +extern int ncomm_listen(int fd, int backlog, COMMCB callback, void *data); + Index: squid/src/structs.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/structs.h,v retrieving revision 1.24 retrieving revision 1.24.8.1 diff -u -r1.24 -r1.24.8.1 --- squid/src/structs.h 10 Feb 2001 16:49:04 -0000 1.24 +++ squid/src/structs.h 15 Feb 2001 15:53:37 -0000 1.24.8.1 @@ -1,6 +1,6 @@ /* - * $Id: structs.h,v 1.24 2001/02/10 16:49:04 hno Exp $ + * $Id: structs.h,v 1.24.8.1 2001/02/15 15:53:37 adri Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -693,6 +693,24 @@ int weak; /* true if it is a weak validator */ }; + +struct _fh_read { + void *buf; + size_t size; + COMMCB callback; + void *data; + int (*handler)(int fd, struct _fh_read *); +}; + +struct _fh_write { + void *buf; + size_t size; + size_t done; + COMMCB callback; + void *data; + int (*handler)(int fd, struct _fh_write *); +}; + struct _fde { unsigned int type; u_short local_port; @@ -732,6 +750,14 @@ DEFER *defer_check; /* check if we should defer read */ void *defer_data; CommWriteStateData *rwstate; /* State data for comm_write */ + + /* new comm_server stuff */ + struct { + struct _fh_read read; + struct _fh_write write; + int comm_index; + struct sockaddr peer; + } new; }; struct _fileMap { @@ -2105,3 +2131,17 @@ char *name; void (*parse) (SwapDir * sd, const char *option, const char *value, int reconfiguring); }; + +/* + * comm_server stuff + */ +struct _comm_callback_entry { + struct _comm_callback_entry *next; + COMMCB callback; + int fd; + void *buf; + ssize_t len; + int errno_nr; + void *data; +}; + Index: squid/src/typedefs.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/typedefs.h,v retrieving revision 1.15 retrieving revision 1.15.8.1 diff -u -r1.15 -r1.15.8.1 --- squid/src/typedefs.h 7 Feb 2001 19:11:48 -0000 1.15 +++ squid/src/typedefs.h 15 Feb 2001 15:53:37 -0000 1.15.8.1 @@ -1,6 +1,6 @@ /* - * $Id: typedefs.h,v 1.15 2001/02/07 19:11:48 hno Exp $ + * $Id: typedefs.h,v 1.15.8.1 2001/02/15 15:53:37 adri Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -88,6 +88,8 @@ typedef struct _dnsStatData dnsStatData; typedef struct _dwrite_q dwrite_q; typedef struct _ETag ETag; +typedef struct _fh_read fh_read_t; +typedef struct _fh_write fh_write_t; typedef struct _fde fde; typedef struct _fileMap fileMap; typedef struct _HttpReply http_reply; @@ -190,6 +192,7 @@ typedef struct _RemovalPolicySettings RemovalPolicySettings; typedef struct _http_version_t http_version_t; +typedef struct _comm_callback_entry comm_callback_entry_t; #if SQUID_SNMP typedef variable_list *(oid_ParseFn) (variable_list *, snint *); @@ -353,4 +356,11 @@ typedef int STDIRSELECT(const StoreEntry *); +/* + * comm_server + */ +typedef void (*COMMCB)(int fd, void *buf, ssize_t len, int errno_nr, + void *data); + + #endif /* _TYPEDEFS_H_ */