--------------------- PatchSet 575 Date: 2000/09/16 20:47:58 Author: hno Branch: hno-devel Tag: (none) Log: Major cleanup of the threads optimisations, and corrected how the I/O load is calculated and used. Now there are two limits: /* Queue limit where swapouts are deferred (load calculation) */ #define MAGIC1 (NUMTHREADS*Config.cacheSwap.n_configured*5) /* Queue limit where swapins are deferred (open/create fails) */ #define MAGIC2 (NUMTHREADS*Config.cacheSwap.n_configured*20) Members: src/fs/aufs/Makefile.in:1.1.6.3.2.1->1.1.6.3.2.2 src/fs/aufs/aiops.c:1.1.6.1.2.3->1.1.6.1.2.4 src/fs/aufs/store_asyncufs.h:1.1.6.3.2.4->1.1.6.3.2.5 src/fs/aufs/store_dir_aufs.c:1.1.6.6.2.5->1.1.6.6.2.6 src/fs/aufs/store_io_aufs.c:1.1.6.6.2.4->1.1.6.6.2.5 Index: squid/src/fs/aufs/Makefile.in =================================================================== RCS file: /cvsroot/squid-sf//squid/src/fs/aufs/Attic/Makefile.in,v retrieving revision 1.1.6.3.2.1 retrieving revision 1.1.6.3.2.2 diff -u -r1.1.6.3.2.1 -r1.1.6.3.2.2 --- squid/src/fs/aufs/Makefile.in 2 May 2000 22:50:29 -0000 1.1.6.3.2.1 +++ squid/src/fs/aufs/Makefile.in 16 Sep 2000 20:47:58 -0000 1.1.6.3.2.2 @@ -36,6 +36,7 @@ $(RANLIB) $(OUT) $(OBJS): $(top_srcdir)/include/version.h ../../../include/autoconf.h +$(OBJS): store_asyncufs.h .c.o: @rm -f ../stamp Index: squid/src/fs/aufs/aiops.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/fs/aufs/aiops.c,v retrieving revision 1.1.6.1.2.3 retrieving revision 1.1.6.1.2.4 diff -u -r1.1.6.1.2.3 -r1.1.6.1.2.4 --- squid/src/fs/aufs/aiops.c 1 Jul 2000 14:12:52 -0000 1.1.6.1.2.3 +++ squid/src/fs/aufs/aiops.c 16 Sep 2000 20:48:01 -0000 1.1.6.1.2.4 @@ -49,9 +49,6 @@ #define RIDICULOUS_LENGTH 4096 -#define NR_AIO_REQUEST_QUEUES 2 -#define NR_AIO_DONE_QUEUES 2 - enum _aio_thread_status { _THREAD_STARTING = 0, _THREAD_WAITING, @@ -90,7 +87,6 @@ struct stat *tmpstatp; struct stat *statp; aio_result_t *resultp; - dlink_node node; } 
aio_request_t; typedef struct aio_request_queue_t { @@ -141,11 +137,11 @@ static aio_thread_t threads[NUMTHREADS]; static int aio_initialised = 0; -static dlink_list requests; static int request_queue_len = 0; static MemPool *aio_request_pool = NULL; -static aio_request_queue_t request_queue[NR_AIO_REQUEST_QUEUES + 1]; -static aio_request_queue_t done_queue[NR_AIO_DONE_QUEUES]; +static aio_request_queue_t request_queue; +static struct { aio_request_t *head, **tailp; } request_queue2 = { NULL, &request_queue2.head }; +static aio_request_queue_t done_queue; static struct { aio_request_t *head, **tailp; } done_requests = { NULL, &done_requests.head }; static pthread_attr_t globattr; static struct sched_param globsched; @@ -174,30 +170,31 @@ pthread_attr_setschedparam(&globattr, &globsched); #endif - /* Initialize request queues */ - for (i = 0; i < NR_AIO_REQUEST_QUEUES + 1; i++) { - aio_request_queue_t *queue = &request_queue[i]; - if (pthread_mutex_init(&(queue->mutex), NULL)) - fatal("Failed to create mutex"); - if (pthread_cond_init(&(queue->cond), NULL)) - fatal("Failed to create condition variable"); - queue->head = NULL; - queue->tailp = &queue->head; - queue->requests = 0; - queue->blocked = 0; - } - /* Initialize done queues */ - for (i = 0; i < NR_AIO_DONE_QUEUES; i++) { - aio_request_queue_t *queue = &done_queue[i]; - if (pthread_mutex_init(&(queue->mutex), NULL)) - fatal("Failed to create mutex"); - if (pthread_cond_init(&(queue->cond), NULL)) - fatal("Failed to create condition variable"); - queue->head = NULL; - queue->tailp = &queue->head; - queue->requests = 0; - queue->blocked = 0; - } + /* Initialize request queue */ + if (pthread_mutex_init(&(request_queue.mutex), NULL)) + fatal("Failed to create mutex"); + if (pthread_cond_init(&(request_queue.cond), NULL)) + fatal("Failed to create condition variable"); + request_queue.head = NULL; + request_queue.tailp = &request_queue.head; + request_queue.requests = 0; + request_queue.blocked = 0; + + /* 
Initialize request2 queue for temporary storage + * when request_queue is blocked + */ + request_queue2.head = NULL; + request_queue2.tailp = &request_queue.head; + + /* Initialize done queue */ + if (pthread_mutex_init(&(done_queue.mutex), NULL)) + fatal("Failed to create mutex"); + if (pthread_cond_init(&(done_queue.cond), NULL)) + fatal("Failed to create condition variable"); + done_queue.head = NULL; + done_queue.tailp = &done_queue.head; + done_queue.requests = 0; + done_queue.blocked = 0; /* Create threads and get them to sit in their wait loop */ for (i = 0; i < NUMTHREADS; i++) { @@ -226,9 +223,6 @@ aio_thread_t *threadp = ptr; aio_request_t *request; sigset_t new; - int i; - struct timespec wait_time; - aio_request_queue_t *queue; /* * Make sure to ignore signals which may possibly get sent to @@ -254,45 +248,20 @@ while (1) { threadp->current_req = request = NULL; - wait_time.tv_sec = current_time.tv_sec + 1; - wait_time.tv_nsec = 0; request = NULL; - while(!request) { - /* poll the overflow queues */ - for (i=NR_AIO_REQUEST_QUEUES-1; !request && i > 0; i--) { - queue = &request_queue[i]; - if (queue->head && !pthread_mutex_trylock(&queue->mutex)) { - request = queue->head; - if (request) { - queue->head = request->next; - if (!queue->head) - queue->tailp = &queue->head; - } - pthread_mutex_unlock(&queue->mutex); - } - } - if (!request) { - /* poll the main queue */ - queue = request_queue; - threadp->status = _THREAD_WAITING; - pthread_mutex_lock(&queue->mutex); - if (!queue->head) { - /* no request here, go to sleep with a timeout to catch - * some low load deadlocks caused by the multiple - * queue design or pthread_cond_signal.. 
- */ - pthread_cond_timedwait(&queue->cond, &queue->mutex, &wait_time); - wait_time.tv_sec += 5; /* idle some when idle */ - } - /* Get the request if any */ - request = queue->head; - if (request) - queue->head = request->next; - if (!queue->head) - queue->tailp = &queue->head; - pthread_mutex_unlock(&queue->mutex); - } - } + /* Get a request to process */ + threadp->status = _THREAD_WAITING; + pthread_mutex_lock(&request_queue.mutex); + while(!request_queue.head) { + pthread_cond_wait(&request_queue.cond, &request_queue.mutex); + } + request = request_queue.head; + if (request) + request_queue.head = request->next; + if (!request_queue.head) + request_queue.tailp = &request_queue.head; + pthread_mutex_unlock(&request_queue.mutex); + /* process the request */ threadp->status = _THREAD_BUSY; request->next = NULL; threadp->current_req = request; @@ -335,20 +304,11 @@ request->err = EINTR; } threadp->status = _THREAD_DONE; - /* find a done queue */ - for (i=0; i < NR_AIO_DONE_QUEUES; i++) { - queue = &done_queue[i]; - if (i != NR_AIO_DONE_QUEUES-1) { - if (!pthread_mutex_trylock(&queue->mutex)) - break; - } else { - pthread_mutex_lock(&queue->mutex); - break; - } - } - *queue->tailp = request; - queue->tailp = &request->next; - pthread_mutex_unlock(&queue->mutex); + /* put the request in the done queue */ + pthread_mutex_lock(&done_queue.mutex); + *done_queue.tailp = request; + done_queue.tailp = &request->next; + pthread_mutex_unlock(&done_queue.mutex); threadp->requests++; } /* while forever */ return NULL; @@ -360,40 +320,48 @@ static int last_warn = 0; static int high_start = 0; static int queue_high, queue_low; - int i; - aio_request_queue_t *queue; + debug(41, 9) ("aio_queue_request: %p type=%d result=%p\n", + request, request->request_type, request->resultp); /* Mark it as not executed (failing result, no error) */ request->ret = -1; request->err = 0; - dlinkAdd(request,&request->node,&requests); - /* Find a available request queue */ - for (i=0; i < 
NR_AIO_REQUEST_QUEUES+1; i++) { - queue = &request_queue[i]; - if (!pthread_mutex_trylock(&queue->mutex)) - break; - else { - queue->blocked+=1; - queue = NULL; + /* Internal housekeeping */ + request_queue_len += 1; + request->resultp->_data = request; + /* Play some tricks with the request_queue2 queue */ + request->next = NULL; + if (!request_queue2.head) { + if (pthread_mutex_trylock(&request_queue.mutex) == 0) { + /* Normal path */ + *request_queue.tailp = request; + request_queue.tailp = &request->next; + pthread_cond_signal(&request_queue.cond); + pthread_mutex_unlock(&request_queue.mutex); + } else { + /* Oops, the request queue is blocked, use request_queue2 */ + *request_queue2.tailp = request; + request_queue2.tailp = &request->next; + } + } else { + /* Secondary path. We have blocked requests to deal with */ + /* add the request to the chain */ + *request_queue2.tailp = request; + if (pthread_mutex_trylock(&request_queue.mutex) == 0) { + /* Ok, the queue is no longer blocked */ + *request_queue.tailp = request_queue2.head; + request_queue.tailp = &request->next; + pthread_cond_signal(&request_queue.cond); + pthread_mutex_unlock(&request_queue.mutex); + request_queue2.head = NULL; + request_queue2.tailp = &request_queue2.head; + } else { + /* still blocked, bump the blocked request chain */ + request_queue2.tailp = &request->next; } } - assert(queue); - request->next = NULL; - *queue->tailp = request; - queue->tailp = &request->next; - /* kick requests in the last overflow (main only) queue */ - if (i < NR_AIO_REQUEST_QUEUES && - request_queue[NR_AIO_REQUEST_QUEUES].head) { - *queue->tailp = request_queue[NR_AIO_REQUEST_QUEUES].head; - queue->tailp = request_queue[NR_AIO_REQUEST_QUEUES].tailp; - request_queue[NR_AIO_REQUEST_QUEUES].head = NULL; - request_queue[NR_AIO_REQUEST_QUEUES].tailp = &request_queue[NR_AIO_REQUEST_QUEUES].head; - } - if (i==0) - pthread_cond_signal(&queue->cond); - pthread_mutex_unlock(&queue->mutex); - queue->requests += 1; - 
request_queue_len += 1; + if (request_queue2.head) + debug(43, 1) ("aio_queue_request: WARNING - Queue congestion\n"); /* Warn if out of threads */ if (request_queue_len > (NUMTHREADS << 1)) { aio_poll_queues(); @@ -479,16 +447,15 @@ int aio_cancel(aio_result_t * resultp) { - aio_request_t *request; - dlink_node *node; + aio_request_t *request = resultp->_data; - for (node = requests.head; node != NULL; node = node->next) { - request = node->data; - if (request->resultp == resultp) { - request->cancelled = 1; - request->resultp = NULL; - return 0; - } + if (request && request->resultp == resultp) { + debug(41, 9) ("aio_cancel: %p type=%d result=%p\n", + request, request->request_type, request->resultp); + request->cancelled = 1; + request->resultp = NULL; + resultp->_data = NULL; + return 0; } return 1; } /* aio_cancel */ @@ -778,41 +745,29 @@ static void aio_poll_queues(void) { - int i; - aio_request_queue_t *queue; - aio_request_t *request; - /* kick "last-overflow" request queue */ - if (request_queue[NR_AIO_REQUEST_QUEUES].head) { - for(i=0; i<NR_AIO_REQUEST_QUEUES; i++) { - queue = &request_queue[i]; - if (!pthread_mutex_trylock(&queue->mutex)) - break; - queue=NULL; - } - if (queue) { - *queue->tailp=request_queue[NR_AIO_REQUEST_QUEUES].head; - queue->tailp=request_queue[NR_AIO_REQUEST_QUEUES].tailp; - request_queue[NR_AIO_REQUEST_QUEUES].head=NULL; - request_queue[NR_AIO_REQUEST_QUEUES].tailp=&request_queue[NR_AIO_REQUEST_QUEUES].head; - if (i==0) - pthread_cond_signal(&queue->cond); - pthread_mutex_unlock(&queue->mutex); - } - } - /* poll done queues */ - for(i=0; i<NR_AIO_DONE_QUEUES; i++) { - queue = &done_queue[i]; - if (queue->head && !pthread_mutex_trylock(&queue->mutex)) { - for(request=queue->head;request;request = request->next) { - request_queue_len -= 1; - queue->requests += 1; - } - *done_requests.tailp = queue->head; - done_requests.tailp = queue->tailp; - queue->head=NULL; - queue->tailp = &queue->head; - pthread_mutex_unlock(&queue->mutex); + /* kick "overflow" request queue */ + if (request_queue2.head && + pthread_mutex_trylock(&request_queue.mutex) == 0) { + *request_queue.tailp = request_queue2.head; 
+ request_queue.tailp = request_queue2.tailp; + pthread_cond_signal(&request_queue.cond); + pthread_mutex_unlock(&request_queue.mutex); + request_queue2.head = NULL; + request_queue2.tailp = &request_queue2.head; + } + /* poll done queue */ + if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) { + struct aio_request_t *requests = done_queue.head; + done_queue.head = NULL; + done_queue.tailp = &done_queue.head; + pthread_mutex_unlock(&done_queue.mutex); + *done_requests.tailp = requests; + request_queue_len -= 1; + while(requests->next) { + requests = requests->next; + request_queue_len -= 1; } + done_requests.tailp = &requests->next; } } @@ -822,15 +777,20 @@ aio_request_t *request; aio_result_t *resultp; int cancelled; + int polled = 0; - if (done_requests.head == NULL) { + AIO_REPOLL: + request = done_requests.head; + if (request == NULL && !polled) { aio_poll_queues(); + polled = 1; + request = done_requests.head; } - AIO_REPOLL: - if (done_requests.head == NULL) { + if (!request) { return NULL; } - request = done_requests.head; + debug(41, 9) ("aio_poll_done: %p type=%d result=%p\n", + request, request->request_type, request->resultp); done_requests.head = request->next; if (!done_requests.head) done_requests.tailp = &done_requests.head; @@ -839,7 +799,6 @@ aio_debug(request); debug(43, 5) ("DONE: %d -> %d\n", request->ret, request->err); aio_cleanup_request(request); - dlinkDelete(&request->node, &requests); if (cancelled) goto AIO_REPOLL; return resultp; @@ -848,7 +807,7 @@ int aio_operations_pending(void) { - return requests.head != NULL; + return request_queue_len > 0 || done_requests.head; } int Index: squid/src/fs/aufs/store_asyncufs.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/fs/aufs/store_asyncufs.h,v retrieving revision 1.1.6.3.2.4 retrieving revision 1.1.6.3.2.5 diff -u -r1.1.6.3.2.4 -r1.1.6.3.2.5 --- squid/src/fs/aufs/store_asyncufs.h 1 Jul 2000 12:20:15 -0000 
1.1.6.3.2.4 +++ squid/src/fs/aufs/store_asyncufs.h 16 Sep 2000 20:48:01 -0000 1.1.6.3.2.5 @@ -13,11 +13,15 @@ #define NUMTHREADS 16 #endif -#define MAGIC1 (NUMTHREADS/Config.cacheSwap.n_configured/2) +/* Queue limit where swapouts are deferred (load calculation) */ +#define MAGIC1 (NUMTHREADS*Config.cacheSwap.n_configured*5) +/* Queue limit where swapins are deferred (open/create fails) */ +#define MAGIC2 (NUMTHREADS*Config.cacheSwap.n_configured*20) struct _aio_result_t { int aio_return; int aio_errno; + void *_data; /* Internal housekeeping */ }; typedef struct _aio_result_t aio_result_t; Index: squid/src/fs/aufs/store_dir_aufs.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/fs/aufs/store_dir_aufs.c,v retrieving revision 1.1.6.6.2.5 retrieving revision 1.1.6.6.2.6 diff -u -r1.1.6.6.2.5 -r1.1.6.6.2.6 --- squid/src/fs/aufs/store_dir_aufs.c 1 Jul 2000 12:20:15 -0000 1.1.6.6.2.5 +++ squid/src/fs/aufs/store_dir_aufs.c 16 Sep 2000 20:48:01 -0000 1.1.6.6.2.6 @@ -1344,10 +1344,8 @@ ql = aioQueueSize(); if (ql == 0) loadav = 0; - else if (ql >= MAGIC1) /* Queue is too long, don't even consider it */ - loadav = -1; - else - loadav = MAGIC1 * 1000 / ql; + loadav = ql * 1000 / MAGIC1; + debug(41, 9) ("storeAufsDirCheckObj: load=%d\n", loadav); return loadav; } Index: squid/src/fs/aufs/store_io_aufs.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/fs/aufs/store_io_aufs.c,v retrieving revision 1.1.6.6.2.4 retrieving revision 1.1.6.6.2.5 diff -u -r1.1.6.6.2.4 -r1.1.6.6.2.5 --- squid/src/fs/aufs/store_io_aufs.c 30 May 2000 09:56:05 -0000 1.1.6.6.2.4 +++ squid/src/fs/aufs/store_io_aufs.c 16 Sep 2000 20:48:01 -0000 1.1.6.6.2.5 @@ -44,8 +44,10 @@ * we should detect some 'too many files open' condition and return * NULL here. 
*/ - while (aioQueueSize() > MAGIC1) +#ifdef MAGIC2 + if (aioQueueSize() > MAGIC2) return NULL; +#endif sio = memAllocate(MEM_STORE_IO); cbdataAdd(sio, storeAufsIOFreeEntry, MEM_STORE_IO); sio->fsstate = memPoolAlloc(aio_state_pool); @@ -83,8 +85,10 @@ * we should detect some 'too many files open' condition and return * NULL here. */ - while (aioQueueSize() > MAGIC1) +#ifdef MAGIC2 + if (aioQueueSize() > MAGIC2) return NULL; +#endif sio = memAllocate(MEM_STORE_IO); cbdataAdd(sio, storeAufsIOFreeEntry, MEM_STORE_IO); sio->fsstate = memPoolAlloc(aio_state_pool);