--------------------- PatchSet 10650 Date: 2008/07/15 16:33:34 Author: adri Branch: delay_pool_write Tag: (none) Log: Implement the first attempt at delayed writes: * create a pair of slow write fd lists - the active and the "draining" list * schedule a per-second event to drain the lists * the list drain swaps active with draining list and drains the draining list randomly to try and give FDs a chance at doing "stuff"; it's entirely possible the FD will end up right back on the drained list * push the FD onto the slow write fd list if the delay pool has no space left for the given delay id. * try to ensure that an FD won't appear on the same list twice by marking the write as "delayed" and not rescheduling two pending write callbacks? All this is as of yet untested. Members: src/comm.c:1.60.2.1->1.60.2.2 src/structs.h:1.188.2.1->1.188.2.2 Index: squid/src/comm.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/comm.c,v retrieving revision 1.60.2.1 retrieving revision 1.60.2.2 diff -u -r1.60.2.1 -r1.60.2.2 --- squid/src/comm.c 15 Jul 2008 15:59:54 -0000 1.60.2.1 +++ squid/src/comm.c 15 Jul 2008 16:33:34 -0000 1.60.2.2 @@ -1,6 +1,6 @@ /* - * $Id: comm.c,v 1.60.2.1 2008/07/15 15:59:54 adri Exp $ + * $Id: comm.c,v 1.60.2.2 2008/07/15 16:33:34 adri Exp $ * * DEBUG: section 5 Socket Functions * AUTHOR: Harvest Derived @@ -81,6 +81,12 @@ static MemPool *comm_write_pool = NULL; static MemPool *conn_close_pool = NULL; +#if DELAY_POOLS +static int *slow_wfds = NULL, *slow_wfds_alt = NULL; +static int n_slow_wfds = 0; +static EVH comm_slow_wfds_wakeup_event; +#endif + static void CommWriteStateCallbackAndFree(int fd, int code) { @@ -1180,8 +1186,60 @@ CBDATA_INIT_TYPE(ConnectStateData); comm_write_pool = memPoolCreate("CommWriteStateData", sizeof(CommWriteStateData)); conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler)); +#if DELAY_POOLS + slow_wfds = xcalloc(sizeof(int), Squid_MaxFD); + 
slow_wfds_alt = xcalloc(sizeof(int), Squid_MaxFD);
+    /* High priority so it runs before other events */
+    eventAdd("comm_slow_wfds_wakeup_event", comm_slow_wfds_wakeup_event, NULL, 1.0, 256);
+#endif
 }
 
+#if DELAY_POOLS
+/* Append fd to the active slow-write list (each list holds Squid_MaxFD ints) */
+static void
+comm_slow_wfds_add(int fd)
+{
+    assert(n_slow_wfds < Squid_MaxFD);  /* check the slot before storing into it */
+    slow_wfds[n_slow_wfds++] = fd;
+}
+
+/* Swap the list over, set the active list to length 0 */
+static void
+comm_slow_wfds_swap_list(void)
+{
+    int *a = slow_wfds;
+
+    slow_wfds = slow_wfds_alt;
+    slow_wfds_alt = a;
+    n_slow_wfds = 0;
+}
+
+/* Timed event: retry each delayed write once, in random order, then re-arm */
+static void
+comm_slow_wfds_wakeup_event(void *notused)
+{
+    int j, n, fd;
+    fde *F;
+
+    /* Swap the lists over so additions don't end up on this list */
+    n = n_slow_wfds;
+    comm_slow_wfds_swap_list();
+    while (n) {
+        j = squid_random() % n;  /* index 0..n-1 of the entries not yet drained */
+        fd = slow_wfds_alt[j];
+        slow_wfds_alt[j] = slow_wfds_alt[--n];  /* plug the hole: each FD is drawn once */
+        F = &fd_table[fd];
+        if (!F->rwstate.valid || !F->rwstate.write_delayed)
+            continue;  /* stale entry: no delayed write is pending on this FD any more */
+        /* call the write callback attempt - this may requeue the FD for sleep */
+        F->rwstate.write_delayed = 0;  /* let the write run now */
+        commHandleWrite(fd, &F->rwstate);
+    }
+    /* High priority so it runs before other events */
+    eventAdd("comm_slow_wfds_wakeup_event", comm_slow_wfds_wakeup_event, NULL, 1.0, 256);
+}
+#endif
+
 /* Write to FD. */
 static void
 commHandleWrite(int fd, void *data)
@@ -1195,6 +1253,10 @@
 
     assert(state->valid);
 
+    /* Don't try to write if the write has been delayed - we'll be woken up shortly */
+    if (state->write_delayed)
+        return;
+
     debug(5, 5) ("commHandleWrite: FD %d: off %ld, hd %ld, sz %ld.\n",
         fd, (long int) state->offset, (long int) state->header_size, (long int) state->size);
 
@@ -1211,6 +1273,11 @@
          * If the bucket is empty then we push ourselves onto the slow write fds
          * list and worry about the write later.
*/ + if (writesz == 0) { + comm_slow_wfds_add(fd); + state->write_delayed = 1; + return; + } /* Ok we have some bytes to write; write them */ } @@ -1299,6 +1366,8 @@ if (state->valid) { debug(5, 1) ("comm_write: fd_table[%d].rwstate.valid == true!\n", fd); fd_table[fd].rwstate.valid = 0; + /* XXX If there's a delay pool involved then it may be in one of the slow write fd lists? */ + assert(state->write_delayed == 0); } state->buf = (char *) buf; state->size = size; @@ -1310,6 +1379,7 @@ state->valid = 1; #if DELAY_POOLS state->delayid = 0; /* no pool */ + state->write_delayed = 0; #endif cbdataLock(handler_data); commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, NULL, 0); Index: squid/src/structs.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/structs.h,v retrieving revision 1.188.2.1 retrieving revision 1.188.2.2 diff -u -r1.188.2.1 -r1.188.2.2 --- squid/src/structs.h 15 Jul 2008 15:59:55 -0000 1.188.2.1 +++ squid/src/structs.h 15 Jul 2008 16:33:34 -0000 1.188.2.2 @@ -1,6 +1,6 @@ /* - * $Id: structs.h,v 1.188.2.1 2008/07/15 15:59:55 adri Exp $ + * $Id: structs.h,v 1.188.2.2 2008/07/15 16:33:34 adri Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -899,6 +899,7 @@ size_t header_size; #if DELAY_POOLS delay_id delayid; + int write_delayed; #endif };