--------------------- PatchSet 182 Date: 2000/04/24 22:12:40 Author: hno Branch: hno-devel Tag: (none) Log: Reworked the aiops thread IPC interface to use a variant of normal thread signalling and mutexes instead of one signal per thread. This should scale a whole lot better CPU wise when the number of threads increases. Also removed the data copying performed by aioWrite(). Members: src/fs/aufs/aiops.c:1.1.6.1->1.1.6.1.2.1 Index: squid/src/fs/aufs/aiops.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/fs/aufs/aiops.c,v retrieving revision 1.1.6.1 retrieving revision 1.1.6.1.2.1 diff -u -r1.1.6.1 -r1.1.6.1.2.1 --- squid/src/fs/aufs/aiops.c 4 Feb 2000 18:28:13 -0000 1.1.6.1 +++ squid/src/fs/aufs/aiops.c 24 Apr 2000 22:12:40 -0000 1.1.6.1.2.1 @@ -49,19 +49,8 @@ #define RIDICULOUS_LENGTH 4096 -#if defined(_SQUID_LINUX_) -/* Linux requires proper use of mutexes or it will segfault deep in the - * thread libraries. Observed on Alpha SMP Linux 2.2.10-ac12. - */ -#define USE_PROPER_MUTEX 1 -#endif - -#if defined(_SQUID_LINUX_) -/* Linux requires proper use of mutexes or it will segfault deep in the - * thread libraries. Observed on Alpha SMP Linux 2.2.10-ac12. 
- */ -#define AIO_PROPER_MUTEX 1 -#endif +#define NR_AIO_REQUEST_QUEUES 2 +#define NR_AIO_DONE_QUEUES 2 enum _aio_thread_status { _THREAD_STARTING = 0, @@ -83,6 +72,7 @@ }; typedef struct aio_request_t { + struct aio_request_t *next; enum _aio_request_type request_type; int cancelled; char *path; @@ -99,21 +89,26 @@ struct stat *tmpstatp; struct stat *statp; aio_result_t *resultp; - struct aio_request_t *next; + dlink_node node; } aio_request_t; +typedef struct aio_request_queue_t { + pthread_mutex_t mutex; + pthread_cond_t cond; + aio_request_t *head; + aio_request_t **tailp; + unsigned long requests; + unsigned long blocked; /* main failed to lock the queue */ +} aio_request_queue_t; typedef struct aio_thread_t { pthread_t thread; enum _aio_thread_status status; - pthread_mutex_t mutex; /* Mutex for testing condition variable */ - pthread_cond_t cond; /* Condition variable */ - struct aio_request_t *volatile req; /* set by main, cleared by thread */ - struct aio_request_t *processed_req; /* reminder to main */ + struct aio_request_t *current_req; struct aio_thread_t *next; + unsigned long requests; } aio_thread_t; - int aio_cancel(aio_result_t *); int aio_open(const char *, int, mode_t, aio_result_t *); int aio_read(int, char *, int, off_t, int, aio_result_t *); @@ -126,7 +121,6 @@ static void aio_init(void); static void aio_queue_request(aio_request_t *); -static void aio_process_request_queue(void); static void aio_cleanup_request(aio_request_t *); static void *aio_thread_loop(void *); static void aio_do_open(aio_request_t *); @@ -139,20 +133,17 @@ static void *aio_do_opendir(aio_request_t *); #endif static void aio_debug(aio_request_t *); -static void aio_poll_threads(void); +static void aio_poll_queues(void); -static aio_thread_t *threads; +static aio_thread_t threads[NUMTHREADS]; static int aio_initialised = 0; +static dlink_list requests; static int request_queue_len = 0; static MemPool *aio_request_pool = NULL; -static aio_request_t *request_queue_head = 
NULL; -static aio_request_t *request_queue_tail = NULL; -static aio_request_t *request_done_head = NULL; -static aio_request_t *request_done_tail = NULL; -static aio_thread_t *wait_threads = NULL; -static aio_thread_t *busy_threads_head = NULL; -static aio_thread_t *busy_threads_tail = NULL; +static aio_request_queue_t request_queue[NR_AIO_REQUEST_QUEUES + 1]; +static aio_request_queue_t done_queue[NR_AIO_DONE_QUEUES]; +static struct { aio_request_t *head, **tailp; } done_requests = { NULL, &done_requests.head }; static pthread_attr_t globattr; static struct sched_param globsched; static pthread_t main_thread; @@ -179,33 +170,44 @@ #if HAVE_PTHREAD_ATTR_SETSCHEDPARAM pthread_attr_setschedparam(&globattr, &globsched); #endif + + /* Initialize request queues */ + for (i = 0; i < NR_AIO_REQUEST_QUEUES + 1; i++) { + aio_request_queue_t *queue = &request_queue[i]; + if (pthread_mutex_init(&(queue->mutex), NULL)) + fatal("Failed to create mutex"); + if (pthread_cond_init(&(queue->cond), NULL)) + fatal("Failed to create condition variable"); + queue->head = NULL; + queue->tailp = &queue->head; + queue->requests = 0; + queue->blocked = 0; + } + /* Initialize done queues */ + for (i = 0; i < NR_AIO_DONE_QUEUES; i++) { + aio_request_queue_t *queue = &done_queue[i]; + if (pthread_mutex_init(&(queue->mutex), NULL)) + fatal("Failed to create mutex"); + if (pthread_cond_init(&(queue->cond), NULL)) + fatal("Failed to create condition variable"); + queue->head = NULL; + queue->tailp = &queue->head; + queue->requests = 0; + queue->blocked = 0; + } /* Create threads and get them to sit in their wait loop */ - threads = xcalloc(NUMTHREADS, sizeof(aio_thread_t)); - for (i = 0; i < NUMTHREADS; i++) { threadp = &threads[i]; threadp->status = _THREAD_STARTING; - if (pthread_mutex_init(&(threadp->mutex), NULL)) { - threadp->status = _THREAD_FAILED; - continue; - } - if (pthread_cond_init(&(threadp->cond), NULL)) { - threadp->status = _THREAD_FAILED; - continue; - } - threadp->req = NULL; 
- threadp->processed_req = NULL; + threadp->current_req = NULL; + threadp->next = NULL; + threadp->requests = 0; if (pthread_create(&threadp->thread, &globattr, aio_thread_loop, threadp)) { fprintf(stderr, "Thread creation failed\n"); threadp->status = _THREAD_FAILED; continue; } - threadp->next = wait_threads; - wait_threads = threadp; -#if AIO_PROPER_MUTEX - pthread_mutex_lock(&threadp->mutex); -#endif } /* Create request pool */ @@ -221,9 +223,9 @@ aio_thread_t *threadp = ptr; aio_request_t *request; sigset_t new; -#if !AIO_PROPER_MUTEX + int i; struct timespec wait_time; -#endif + aio_request_queue_t *queue; /* * Make sure to ignore signals which may possibly get sent to @@ -247,29 +249,50 @@ sigaddset(&new, SIGALRM); pthread_sigmask(SIG_BLOCK, &new, NULL); - pthread_mutex_lock(&threadp->mutex); while (1) { -#if AIO_PROPER_MUTEX - while (threadp->req == NULL) { - threadp->status = _THREAD_WAITING; - pthread_cond_wait(&threadp->cond, &threadp->mutex); - } -#else - /* The timeout is used to unlock the race condition where - * ->req is set between the check and pthread_cond_wait. - * The thread steps it's own clock on each timeout, to avoid a CPU - * spin situation if the main thread is suspended (paging), and - * squid_curtime is not being updated timely. 
- */ - wait_time.tv_sec = squid_curtime + 1; /* little quicker first time */ + threadp->current_req = request = NULL; + wait_time.tv_sec = current_time.tv_sec + 1; wait_time.tv_nsec = 0; - while (threadp->req == NULL) { - threadp->status = _THREAD_WAITING; - pthread_cond_timedwait(&threadp->cond, &threadp->mutex, &wait_time); - wait_time.tv_sec += 3; /* then wait 3 seconds between each check */ + request = NULL; + while(!request) { + /* poll the overflow queues */ + for (i=NR_AIO_REQUEST_QUEUES-1; !request && i > 0; i--) { + queue = &request_queue[i]; + if (queue->head && !pthread_mutex_trylock(&queue->mutex)) { + request = queue->head; + if (request) { + queue->head = request->next; + if (!queue->head) + queue->tailp = &queue->head; + } + pthread_mutex_unlock(&queue->mutex); + } + } + if (!request) { + /* poll the main queue */ + queue = request_queue; + threadp->status = _THREAD_WAITING; + pthread_mutex_lock(&queue->mutex); + if (!queue->head) { + /* no request here, go to sleep with a timeout to catch + * some low load deadlocks caused by the multiple + * queue design or pthread_cond_signal.. 
+ */ + pthread_cond_timedwait(&queue->cond, &queue->mutex, &wait_time); + wait_time.tv_sec += 5; /* idle some when idle */ + } + /* Get the request if any */ + request = queue->head; + if (request) + queue->head = request->next; + if (!queue->head) + queue->tailp = &queue->head; + pthread_mutex_unlock(&queue->mutex); + } } -#endif - request = threadp->req; + threadp->status = _THREAD_BUSY; + request->next = NULL; + threadp->current_req = request; errno = 0; if (!request->cancelled) { switch (request->request_type) { @@ -305,51 +328,69 @@ request->ret = -1; request->err = EINTR; } - threadp->req = NULL; /* tells main thread that we are done */ - } /* while */ + threadp->status = _THREAD_DONE; + /* find a done queue */ + for (i=0; i < NR_AIO_DONE_QUEUES; i++) { + queue = &done_queue[i]; + if (i != NR_AIO_DONE_QUEUES-1) { + if (!pthread_mutex_trylock(&queue->mutex)) + break; + } else { + pthread_mutex_lock(&queue->mutex); + break; + } + } + *queue->tailp = request; + queue->tailp = &request->next; + pthread_mutex_unlock(&queue->mutex); + threadp->requests++; + } /* while forever */ return NULL; } /* aio_thread_loop */ static void -aio_do_request(aio_request_t * requestp) +aio_queue_request(aio_request_t * request) { - if (wait_threads == NULL && busy_threads_head == NULL) { - fprintf(stderr, "PANIC: No threads to service requests with!\n"); - exit(-1); - } - aio_queue_request(requestp); -} /* aio_do_request */ - - -static void -aio_queue_request(aio_request_t * requestp) -{ - aio_request_t *rp; static int last_warn = 0; static int high_start = 0; static int queue_high, queue_low; int i; + aio_request_queue_t *queue; /* Mark it as not executed (failing result, no error) */ - requestp->ret = -1; - requestp->err = 0; - /* Queue it on the request queue */ - if (request_queue_head == NULL) { - request_queue_head = requestp; - request_queue_tail = requestp; - } else { - request_queue_tail->next = requestp; - request_queue_tail = requestp; - } - requestp->next = NULL; + 
request->ret = -1; + request->err = 0; + dlinkAdd(request,&request->node,&requests); + /* Find a available request queue */ + for (i=0; i < NR_AIO_REQUEST_QUEUES+1; i++) { + queue = &request_queue[i]; + if (!pthread_mutex_trylock(&queue->mutex)) + break; + else { + queue->blocked+=1; + queue = NULL; + } + } + assert(queue); + request->next = NULL; + *queue->tailp = request; + queue->tailp = &request->next; + /* kick requests in the last overflow (main only) queue */ + if (i < NR_AIO_REQUEST_QUEUES && + request_queue[NR_AIO_REQUEST_QUEUES].head) { + *queue->tailp = request_queue[NR_AIO_REQUEST_QUEUES].head; + queue->tailp = request_queue[NR_AIO_REQUEST_QUEUES].tailp; + request_queue[NR_AIO_REQUEST_QUEUES].head = NULL; + request_queue[NR_AIO_REQUEST_QUEUES].tailp = &request_queue[NR_AIO_REQUEST_QUEUES].head; + } + if (i==0) + pthread_cond_signal(&queue->cond); + pthread_mutex_unlock(&queue->mutex); + queue->requests += 1; request_queue_len += 1; - /* Poll done threads if needed */ - if (wait_threads == NULL) - aio_poll_threads(); - /* Kick it rolling */ - aio_process_request_queue(); /* Warn if out of threads */ - if (request_queue_len > (NUMTHREADS >> 1)) { + if (request_queue_len > (NUMTHREADS << 1)) { + aio_poll_queues(); if (high_start == 0) { high_start = squid_curtime; queue_high = request_queue_len; @@ -360,40 +401,14 @@ if (request_queue_len < queue_low) queue_low = request_queue_len; if (squid_curtime >= (last_warn + 15) && - squid_curtime >= (high_start + 3)) { - debug(43, 1) ("aio_queue_request: WARNING - Running out of I/O threads\n"); - debug(43, 2) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n", + squid_curtime >= (high_start + 5)) { + debug(43, 1) ("aio_queue_request: WARNING - Disk I/O overloading\n"); + if (squid_curtime >= (high_start + 15)) { + debug(43, 1) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n", request_queue_len, queue_high, queue_low, squid_curtime - high_start); - debug(43, 
1) ("aio_queue_request: Perhaps you should increase NUMTHREADS\n"); - debug(43, 1) ("aio_queue_request: Or install more disks to share the load\n"); - debug(43, 3) ("aio_queue_request: First %d items on request queue\n", NUMTHREADS); - rp = request_queue_head; - for (i = 1; i <= NUMTHREADS; i++) { - switch (rp->request_type) { - case _AIO_OP_OPEN: - debug(43, 3) ("aio_queue_request: %d : open -> %s\n", i, rp->path); - break; - case _AIO_OP_READ: - debug(43, 3) ("aio_queue_request: %d : read -> FD = %d\n", i, rp->fd); - break; - case _AIO_OP_WRITE: - debug(43, 3) ("aio_queue_request: %d : write -> FD = %d\n", i, rp->fd); - break; - case _AIO_OP_CLOSE: - debug(43, 3) ("aio_queue_request: %d : close -> FD = %d\n", i, rp->fd); - break; - case _AIO_OP_UNLINK: - debug(43, 3) ("aio_queue_request: %d : unlink -> %s\n", i, rp->path); - break; - case _AIO_OP_STAT: - debug(43, 3) ("aio_queue_request: %d : stat -> %s\n", i, rp->path); - break; - default: - debug(43, 1) ("aio_queue_request: %d : Unimplemented request type: %d\n", i, rp->request_type); - break; - } - if ((rp = rp->next) == NULL) - break; + debug(43, 1) ("aio_queue_request: Perhaps you should increase NUMTHREADS,\n"); + debug(43, 1) ("aio_queue_request: install more disks to share the load\n"); + debug(43, 1) ("aio_queue_request: or add a second CPU for the I/O\n"); } last_warn = squid_curtime; } @@ -408,116 +423,66 @@ } } /* aio_queue_request */ - static void -aio_process_request_queue(void) +aio_cleanup_request(aio_request_t * request) { - aio_thread_t *threadp; - aio_request_t *requestp; - - for (;;) { - if (wait_threads == NULL || request_queue_head == NULL) - return; - - requestp = request_queue_head; - if ((request_queue_head = requestp->next) == NULL) - request_queue_tail = NULL; - requestp->next = NULL; - request_queue_len--; - - if (requestp->cancelled) { - aio_cleanup_request(requestp); - continue; - } - threadp = wait_threads; - wait_threads = threadp->next; - threadp->next = NULL; - - if 
(busy_threads_head != NULL) - busy_threads_tail->next = threadp; - else - busy_threads_head = threadp; - busy_threads_tail = threadp; - - threadp->status = _THREAD_BUSY; - threadp->req = threadp->processed_req = requestp; - pthread_cond_signal(&(threadp->cond)); -#if AIO_PROPER_MUTEX - pthread_mutex_unlock(&threadp->mutex); -#endif - } -} /* aio_process_request_queue */ - - -static void -aio_cleanup_request(aio_request_t * requestp) -{ - aio_result_t *resultp = requestp->resultp; - int cancelled = requestp->cancelled; + aio_result_t *resultp = request->resultp; + int cancelled = request->cancelled; /* Free allocated structures and copy data back to user space if the */ /* request hasn't been cancelled */ - switch (requestp->request_type) { + switch (request->request_type) { case _AIO_OP_STAT: - if (!cancelled && requestp->ret == 0) - xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat)); - xfree(requestp->tmpstatp); + if (!cancelled && request->ret == 0) + xmemcpy(request->statp, request->tmpstatp, sizeof(struct stat)); + xfree(request->tmpstatp); case _AIO_OP_OPEN: - if (cancelled && requestp->ret >= 0) + if (cancelled && request->ret >= 0) /* The open() was cancelled but completed */ - close(requestp->ret); - xfree(requestp->path); + close(request->ret); + xfree(request->path); break; case _AIO_OP_CLOSE: - if (cancelled && requestp->ret < 0) + if (cancelled && request->ret < 0) /* The close() was cancelled and never got executed */ - close(requestp->fd); + close(request->fd); break; case _AIO_OP_UNLINK: case _AIO_OP_OPENDIR: - xfree(requestp->path); + xfree(request->path); break; case _AIO_OP_READ: - if (!cancelled && requestp->ret > 0) - xmemcpy(requestp->bufferp, requestp->tmpbufp, requestp->ret); + if (!cancelled && request->ret > 0) + xmemcpy(request->bufferp, request->tmpbufp, request->ret); + xfree(request->tmpbufp); + break; case _AIO_OP_WRITE: - xfree(requestp->tmpbufp); break; default: break; } if (resultp != NULL && !cancelled) { - 
resultp->aio_return = requestp->ret; - resultp->aio_errno = requestp->err; + resultp->aio_return = request->ret; + resultp->aio_errno = request->err; } - memPoolFree(aio_request_pool, requestp); + memPoolFree(aio_request_pool, request); } /* aio_cleanup_request */ int aio_cancel(aio_result_t * resultp) { - aio_thread_t *threadp; - aio_request_t *requestp; + aio_request_t *request; + dlink_node *node; - for (threadp = busy_threads_head; threadp != NULL; threadp = threadp->next) - if (threadp->processed_req->resultp == resultp) { - threadp->processed_req->cancelled = 1; - threadp->processed_req->resultp = NULL; - return 0; - } - for (requestp = request_queue_head; requestp != NULL; requestp = requestp->next) - if (requestp->resultp == resultp) { - requestp->cancelled = 1; - requestp->resultp = NULL; - return 0; - } - for (requestp = request_done_head; requestp != NULL; requestp = requestp->next) - if (requestp->resultp == resultp) { - requestp->cancelled = 1; - requestp->resultp = NULL; + for (node = requests.head; node != NULL; node = node->next) { + request = node->data; + if (request->resultp == resultp) { + request->cancelled = 1; + request->resultp = NULL; return 0; } + } return 1; } /* aio_cancel */ @@ -525,238 +490,234 @@ int aio_open(const char *path, int oflag, mode_t mode, aio_result_t * resultp) { - aio_request_t *requestp; + aio_request_t *request; int len; if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { + if ((request = memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } len = strlen(path) + 1; - if ((requestp->path = (char *) xmalloc(len)) == NULL) { - memPoolFree(aio_request_pool, requestp); + if ((request->path = (char *) xmalloc(len)) == NULL) { + memPoolFree(aio_request_pool, request); errno = ENOMEM; return -1; } - strncpy(requestp->path, path, len); - requestp->oflag = oflag; - requestp->mode = mode; - requestp->resultp = resultp; - requestp->request_type = _AIO_OP_OPEN; - 
requestp->cancelled = 0; + strncpy(request->path, path, len); + request->oflag = oflag; + request->mode = mode; + request->resultp = resultp; + request->request_type = _AIO_OP_OPEN; + request->cancelled = 0; - aio_do_request(requestp); + aio_queue_request(request); return 0; } static void -aio_do_open(aio_request_t * requestp) +aio_do_open(aio_request_t * request) { - requestp->ret = open(requestp->path, requestp->oflag, requestp->mode); - requestp->err = errno; + request->ret = open(request->path, request->oflag, request->mode); + request->err = errno; } int aio_read(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * resultp) { - aio_request_t *requestp; + aio_request_t *request; if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { + if ((request = memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } - requestp->fd = fd; - requestp->bufferp = bufp; - if ((requestp->tmpbufp = (char *) xmalloc(bufs)) == NULL) { - memPoolFree(aio_request_pool, requestp); + request->fd = fd; + request->bufferp = bufp; + if ((request->tmpbufp = (char *) xmalloc(bufs)) == NULL) { + memPoolFree(aio_request_pool, request); errno = ENOMEM; return -1; } - requestp->buflen = bufs; - requestp->offset = offset; - requestp->whence = whence; - requestp->resultp = resultp; - requestp->request_type = _AIO_OP_READ; - requestp->cancelled = 0; + request->buflen = bufs; + request->offset = offset; + request->whence = whence; + request->resultp = resultp; + request->request_type = _AIO_OP_READ; + request->cancelled = 0; - aio_do_request(requestp); + aio_queue_request(request); return 0; } static void -aio_do_read(aio_request_t * requestp) +aio_do_read(aio_request_t * request) { - lseek(requestp->fd, requestp->offset, requestp->whence); - requestp->ret = read(requestp->fd, requestp->tmpbufp, requestp->buflen); - requestp->err = errno; + lseek(request->fd, request->offset, request->whence); + request->ret = read(request->fd, 
request->tmpbufp, request->buflen); + request->err = errno; } int aio_write(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * resultp) { - aio_request_t *requestp; + aio_request_t *request; if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { + if ((request = memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } - requestp->fd = fd; - if ((requestp->tmpbufp = (char *) xmalloc(bufs)) == NULL) { - memPoolFree(aio_request_pool, requestp); - errno = ENOMEM; - return -1; - } - xmemcpy(requestp->tmpbufp, bufp, bufs); - requestp->buflen = bufs; - requestp->offset = offset; - requestp->whence = whence; - requestp->resultp = resultp; - requestp->request_type = _AIO_OP_WRITE; - requestp->cancelled = 0; + request->fd = fd; + request->tmpbufp = NULL; + request->bufferp = bufp; + request->buflen = bufs; + request->offset = offset; + request->whence = whence; + request->resultp = resultp; + request->request_type = _AIO_OP_WRITE; + request->cancelled = 0; - aio_do_request(requestp); + aio_queue_request(request); return 0; } static void -aio_do_write(aio_request_t * requestp) +aio_do_write(aio_request_t * request) { - requestp->ret = write(requestp->fd, requestp->tmpbufp, requestp->buflen); - requestp->err = errno; + request->ret = write(request->fd, request->bufferp, request->buflen); + request->err = errno; } int aio_close(int fd, aio_result_t * resultp) { - aio_request_t *requestp; + aio_request_t *request; if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { + if ((request = memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } - requestp->fd = fd; - requestp->resultp = resultp; - requestp->request_type = _AIO_OP_CLOSE; - requestp->cancelled = 0; + request->fd = fd; + request->resultp = resultp; + request->request_type = _AIO_OP_CLOSE; + request->cancelled = 0; - aio_do_request(requestp); + aio_queue_request(request); return 0; } static 
void -aio_do_close(aio_request_t * requestp) +aio_do_close(aio_request_t * request) { - requestp->ret = close(requestp->fd); - requestp->err = errno; + request->ret = close(request->fd); + request->err = errno; } int aio_stat(const char *path, struct stat *sb, aio_result_t * resultp) { - aio_request_t *requestp; + aio_request_t *request; int len; if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { + if ((request = memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } len = strlen(path) + 1; - if ((requestp->path = (char *) xmalloc(len)) == NULL) { - memPoolFree(aio_request_pool, requestp); + if ((request->path = (char *) xmalloc(len)) == NULL) { + memPoolFree(aio_request_pool, request); errno = ENOMEM; return -1; } - strncpy(requestp->path, path, len); - requestp->statp = sb; - if ((requestp->tmpstatp = (struct stat *) xmalloc(sizeof(struct stat))) == NULL) { - xfree(requestp->path); - memPoolFree(aio_request_pool, requestp); + strncpy(request->path, path, len); + request->statp = sb; + if ((request->tmpstatp = (struct stat *) xmalloc(sizeof(struct stat))) == NULL) { + xfree(request->path); + memPoolFree(aio_request_pool, request); errno = ENOMEM; return -1; } - requestp->resultp = resultp; - requestp->request_type = _AIO_OP_STAT; - requestp->cancelled = 0; + request->resultp = resultp; + request->request_type = _AIO_OP_STAT; + request->cancelled = 0; - aio_do_request(requestp); + aio_queue_request(request); return 0; } static void -aio_do_stat(aio_request_t * requestp) +aio_do_stat(aio_request_t * request) { - requestp->ret = stat(requestp->path, requestp->tmpstatp); - requestp->err = errno; + request->ret = stat(request->path, request->tmpstatp); + request->err = errno; } int aio_unlink(const char *path, aio_result_t * resultp) { - aio_request_t *requestp; + aio_request_t *request; int len; if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { + if ((request = 
memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } len = strlen(path) + 1; - if ((requestp->path = (char *) xmalloc(len)) == NULL) { - memPoolFree(aio_request_pool, requestp); + if ((request->path = (char *) xmalloc(len)) == NULL) { + memPoolFree(aio_request_pool, request); errno = ENOMEM; return -1; } - strncpy(requestp->path, path, len); - requestp->resultp = resultp; - requestp->request_type = _AIO_OP_UNLINK; - requestp->cancelled = 0; + strncpy(request->path, path, len); + request->resultp = resultp; + request->request_type = _AIO_OP_UNLINK; + request->cancelled = 0; - aio_do_request(requestp); + aio_queue_request(request); return 0; } static void -aio_do_unlink(aio_request_t * requestp) +aio_do_unlink(aio_request_t * request) { - requestp->ret = unlink(requestp->path); - requestp->err = errno; + request->ret = unlink(request->path); + request->err = errno; } #if AIO_OPENDIR -/* XXX aio_opendir NOT implemented? */ +/* XXX aio_opendir NOT implemented yet.. */ int aio_opendir(const char *path, aio_result_t * resultp) { - aio_request_t *requestp; + aio_request_t *request; int len; if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { + if ((request = memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } @@ -764,89 +725,78 @@ } static void -aio_do_opendir(aio_request_t * requestp) +aio_do_opendir(aio_request_t * request) { /* NOT IMPLEMENTED */ } #endif - -void -aio_poll_threads(void) +static void +aio_poll_queues(void) { - aio_thread_t *prev; - aio_thread_t *threadp; - aio_request_t *requestp; - - do { /* while found completed thread */ - prev = NULL; - threadp = busy_threads_head; - while (threadp) { - debug(43, 9) ("aio_poll_threads: %p: request type %d -> status %d\n", - threadp, - threadp->processed_req->request_type, - threadp->status); -#if AIO_PROPER_MUTEX - if (threadp->req == NULL) - if (pthread_mutex_trylock(&threadp->mutex) == 0) - break; -#else - if (threadp->req == NULL) + 
int i; + aio_request_queue_t *queue; + aio_request_t *request; + /* kick "last-overflow" request queue */ + if (request_queue[NR_AIO_REQUEST_QUEUES].head) { + for(i=0; i<NR_AIO_REQUEST_QUEUES; i++) { + queue = &request_queue[i]; + if (!pthread_mutex_trylock(&queue->mutex)) break; - prev = threadp; - threadp = threadp->next; + queue=NULL; } - if (threadp == NULL) - break; - - if (prev == NULL) - busy_threads_head = busy_threads_head->next; - else - prev->next = threadp->next; - - if (busy_threads_tail == threadp) - busy_threads_tail = prev; - - requestp = threadp->processed_req; - threadp->processed_req = NULL; - - threadp->next = wait_threads; - wait_threads = threadp; - - if (request_done_tail != NULL) - request_done_tail->next = requestp; - else - request_done_head = requestp; - request_done_tail = requestp; - } while (threadp); - - aio_process_request_queue(); -} /* aio_poll_threads */ + if (queue) { + *queue->tailp=request_queue[NR_AIO_REQUEST_QUEUES].head; + queue->tailp=request_queue[NR_AIO_REQUEST_QUEUES].tailp; + request_queue[NR_AIO_REQUEST_QUEUES].head=NULL; + request_queue[NR_AIO_REQUEST_QUEUES].tailp=&request_queue[NR_AIO_REQUEST_QUEUES].head; + if (i==0) + pthread_cond_signal(&queue->cond); + pthread_mutex_unlock(&queue->mutex); + } + } + /* poll done queues */ + for(i=0; i<NR_AIO_DONE_QUEUES; i++) { + queue = &done_queue[i]; + if (queue->head && !pthread_mutex_trylock(&queue->mutex)) { + for(request=queue->head;request;request = request->next) { + request_queue_len -= 1; + queue->requests += 1; + } + *done_requests.tailp = queue->head; + done_requests.tailp = queue->tailp; + queue->head=NULL; + queue->tailp = &queue->head; + pthread_mutex_unlock(&queue->mutex); + } + } +} aio_result_t * aio_poll_done(void) { - aio_request_t *requestp; + aio_request_t *request; aio_result_t *resultp; int cancelled; + if (done_requests.head == NULL) { + aio_poll_queues(); + } AIO_REPOLL: - aio_poll_threads(); - if (request_done_head == NULL) { + if (done_requests.head == NULL) { return NULL; } - requestp = request_done_head; - request_done_head = requestp->next; - if (!request_done_head) - request_done_tail = NULL; - - 
resultp = requestp->resultp; - cancelled = requestp->cancelled; - aio_debug(requestp); - debug(43, 5) ("DONE: %d -> %d\n", requestp->ret, requestp->err); - aio_cleanup_request(requestp); + request = done_requests.head; + done_requests.head = request->next; + if (!done_requests.head) + done_requests.tailp = &done_requests.head; + resultp = request->resultp; + cancelled = request->cancelled; + aio_debug(request); + debug(43, 5) ("DONE: %d -> %d\n", request->ret, request->err); + aio_cleanup_request(request); + dlinkDelete(&request->node, &requests); if (cancelled) goto AIO_REPOLL; return resultp; @@ -855,7 +805,7 @@ int aio_operations_pending(void) { - return request_queue_len + (request_done_head != NULL) + (busy_threads_head != NULL); + return requests.head != NULL; } int @@ -875,10 +825,9 @@ int aio_sync(void) { - int loop_count = 0; + /* XXX This might take a while if the queue is large.. */ do { - aio_poll_threads(); - assert(++loop_count < 10); + aio_poll_queues(); } while (request_queue_len > 0); return aio_operations_pending(); } @@ -890,23 +839,23 @@ } static void -aio_debug(aio_request_t * requestp) +aio_debug(aio_request_t * request) { - switch (requestp->request_type) { + switch (request->request_type) { case _AIO_OP_OPEN: - debug(43, 5) ("OPEN of %s to FD %d\n", requestp->path, requestp->ret); + debug(43, 5) ("OPEN of %s to FD %d\n", request->path, request->ret); break; case _AIO_OP_READ: - debug(43, 5) ("READ on fd: %d\n", requestp->fd); + debug(43, 5) ("READ on fd: %d\n", request->fd); break; case _AIO_OP_WRITE: - debug(43, 5) ("WRITE on fd: %d\n", requestp->fd); + debug(43, 5) ("WRITE on fd: %d\n", request->fd); break; case _AIO_OP_CLOSE: - debug(43, 5) ("CLOSE of fd: %d\n", requestp->fd); + debug(43, 5) ("CLOSE of fd: %d\n", request->fd); break; case _AIO_OP_UNLINK: - debug(43, 5) ("UNLINK of %s\n", requestp->path); + debug(43, 5) ("UNLINK of %s\n", request->path); break; default: break;