--------------------- PatchSet 542 Date: 2000/08/19 04:47:18 Author: hno Branch: hno-2_3 Tag: (none) Log: Backed out a big chunk of AIO changes which I do not know how they got here in the first place. Should only be on hno-devel.. Members: src/aiops.c:1.1.1.3.4.1.4.3->1.1.1.3.4.1.4.4 Index: squid/src/aiops.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/aiops.c,v retrieving revision 1.1.1.3.4.1.4.3 retrieving revision 1.1.1.3.4.1.4.4 diff -u -r1.1.1.3.4.1.4.3 -r1.1.1.3.4.1.4.4 --- squid/src/aiops.c 1 Aug 2000 00:48:07 -0000 1.1.1.3.4.1.4.3 +++ squid/src/aiops.c 19 Aug 2000 04:47:18 -0000 1.1.1.3.4.1.4.4 @@ -1,5 +1,5 @@ /* - * $Id: aiops.c,v 1.1.1.3.4.1.4.3 2000/08/01 00:48:07 hno Exp $ + * $Id: aiops.c,v 1.1.1.3.4.1.4.4 2000/08/19 04:47:18 hno Exp $ * * DEBUG: section 43 AIOPS * AUTHOR: Stewart Forster @@ -11,10 +11,10 @@ * Internet community. Development is led by Duane Wessels of the * National Laboratory for Applied Network Research and funded by the * National Science Foundation. Squid is Copyrighted (C) 1998 by - * Duane Wessels and the University of California San Diego. Please - * see the COPYRIGHT file for full details. Squid incorporates - * software developed and/or copyrighted by other sources. Please see - * the CREDITS file for full details. + * the Regents of the University of California. Please see the + * COPYRIGHT file for full details. Squid incorporates software + * developed and/or copyrighted by other sources. Please see the + * CREDITS file for full details. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -48,8 +48,19 @@ #define RIDICULOUS_LENGTH 4096 -#define NR_AIO_REQUEST_QUEUES 2 -#define NR_AIO_DONE_QUEUES 2 +#if defined(_SQUID_LINUX_) +/* Linux requires proper use of mutexes or it will segfault deep in the + * thread libraries. Observed on Alpha SMP Linux 2.2.10-ac12. 
+ */ +#define USE_PROPER_MUTEX 1 +#endif + +#if defined(_SQUID_LINUX_) +/* Linux requires proper use of mutexes or it will segfault deep in the + * thread libraries. Observed on Alpha SMP Linux 2.2.10-ac12. + */ +#define AIO_PROPER_MUTEX 1 +#endif enum _aio_thread_status { _THREAD_STARTING = 0, @@ -71,7 +82,6 @@ }; typedef struct aio_request_t { - struct aio_request_t *next; enum _aio_request_type request_type; int cancelled; char *path; @@ -88,26 +98,21 @@ struct stat *tmpstatp; struct stat *statp; aio_result_t *resultp; - dlink_node node; + struct aio_request_t *next; } aio_request_t; -typedef struct aio_request_queue_t { - pthread_mutex_t mutex; - pthread_cond_t cond; - aio_request_t *head; - aio_request_t **tailp; - unsigned long requests; - unsigned long blocked; /* main failed to lock the queue */ -} aio_request_queue_t; typedef struct aio_thread_t { pthread_t thread; enum _aio_thread_status status; - struct aio_request_t *current_req; + pthread_mutex_t mutex; /* Mutex for testing condition variable */ + pthread_cond_t cond; /* Condition variable */ + struct aio_request_t *volatile req; /* set by main, cleared by thread */ + struct aio_request_t *processed_req; /* reminder to main */ struct aio_thread_t *next; - unsigned long requests; } aio_thread_t; + int aio_cancel(aio_result_t *); int aio_open(const char *, int, mode_t, aio_result_t *); int aio_read(int, char *, int, off_t, int, aio_result_t *); @@ -120,6 +125,7 @@ static void aio_init(void); static void aio_queue_request(aio_request_t *); +static void aio_process_request_queue(void); static void aio_cleanup_request(aio_request_t *); static void *aio_thread_loop(void *); static void aio_do_open(aio_request_t *); @@ -132,17 +138,20 @@ static void *aio_do_opendir(aio_request_t *); #endif static void aio_debug(aio_request_t *); -static void aio_poll_queues(void); +static void aio_poll_threads(void); -static aio_thread_t threads[NUMTHREADS]; +static aio_thread_t *threads; static int aio_initialised = 0; 
-static dlink_list requests; static int request_queue_len = 0; static MemPool *aio_request_pool = NULL; -static aio_request_queue_t request_queue[NR_AIO_REQUEST_QUEUES + 1]; -static aio_request_queue_t done_queue[NR_AIO_DONE_QUEUES]; -static struct { aio_request_t *head, **tailp; } done_requests = { NULL, &done_requests.head }; +static aio_request_t *request_queue_head = NULL; +static aio_request_t *request_queue_tail = NULL; +static aio_request_t *request_done_head = NULL; +static aio_request_t *request_done_tail = NULL; +static aio_thread_t *wait_threads = NULL; +static aio_thread_t *busy_threads_head = NULL; +static aio_thread_t *busy_threads_tail = NULL; static pthread_attr_t globattr; static struct sched_param globsched; static pthread_t main_thread; @@ -158,8 +167,16 @@ pthread_attr_init(&globattr); #if HAVE_PTHREAD_ATTR_SETSCOPE +#if defined(_SQUID_SGI_) + /* + * Erik Hofman suggests PTHREAD_SCOPE_PROCESS + * instead of PTHREAD_SCOPE_SYSTEM, esp for IRIX. + */ + pthread_attr_setscope(&globattr, PTHREAD_SCOPE_PROCESS); +#else pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM); #endif +#endif globsched.sched_priority = 1; main_thread = pthread_self(); #if HAVE_PTHREAD_SETSCHEDPARAM @@ -169,44 +186,36 @@ #if HAVE_PTHREAD_ATTR_SETSCHEDPARAM pthread_attr_setschedparam(&globattr, &globsched); #endif - - /* Initialize request queues */ - for (i = 0; i < NR_AIO_REQUEST_QUEUES + 1; i++) { - aio_request_queue_t *queue = &request_queue[i]; - if (pthread_mutex_init(&(queue->mutex), NULL)) - fatal("Failed to create mutex"); - if (pthread_cond_init(&(queue->cond), NULL)) - fatal("Failed to create condition variable"); - queue->head = NULL; - queue->tailp = &queue->head; - queue->requests = 0; - queue->blocked = 0; - } - /* Initialize done queues */ - for (i = 0; i < NR_AIO_DONE_QUEUES; i++) { - aio_request_queue_t *queue = &done_queue[i]; - if (pthread_mutex_init(&(queue->mutex), NULL)) - fatal("Failed to create mutex"); - if (pthread_cond_init(&(queue->cond), NULL)) 
- fatal("Failed to create condition variable"); - queue->head = NULL; - queue->tailp = &queue->head; - queue->requests = 0; - queue->blocked = 0; - } +#if defined(_SQUID_SGI_) + pthread_setconcurrency(NUMTHREADS + 1); +#endif /* Create threads and get them to sit in their wait loop */ + threads = xcalloc(NUMTHREADS, sizeof(aio_thread_t)); + for (i = 0; i < NUMTHREADS; i++) { threadp = &threads[i]; threadp->status = _THREAD_STARTING; - threadp->current_req = NULL; - threadp->next = NULL; - threadp->requests = 0; + if (pthread_mutex_init(&(threadp->mutex), NULL)) { + threadp->status = _THREAD_FAILED; + continue; + } + if (pthread_cond_init(&(threadp->cond), NULL)) { + threadp->status = _THREAD_FAILED; + continue; + } + threadp->req = NULL; + threadp->processed_req = NULL; if (pthread_create(&threadp->thread, &globattr, aio_thread_loop, threadp)) { fprintf(stderr, "Thread creation failed\n"); threadp->status = _THREAD_FAILED; continue; } + threadp->next = wait_threads; + wait_threads = threadp; +#if AIO_PROPER_MUTEX + pthread_mutex_lock(&threadp->mutex); +#endif } /* Create request pool */ @@ -222,9 +231,9 @@ aio_thread_t *threadp = ptr; aio_request_t *request; sigset_t new; - int i; +#if !AIO_PROPER_MUTEX struct timespec wait_time; - aio_request_queue_t *queue; +#endif /* * Make sure to ignore signals which may possibly get sent to @@ -257,50 +266,29 @@ sigthreadmask(SIG_BLOCK, &new, NULL); #endif + pthread_mutex_lock(&threadp->mutex); while (1) { - threadp->current_req = request = NULL; - wait_time.tv_sec = current_time.tv_sec + 1; +#if AIO_PROPER_MUTEX + while (threadp->req == NULL) { + threadp->status = _THREAD_WAITING; + pthread_cond_wait(&threadp->cond, &threadp->mutex); + } +#else + /* The timeout is used to unlock the race condition where + * ->req is set between the check and pthread_cond_wait. 
+ * The thread steps it's own clock on each timeout, to avoid a CPU + * spin situation if the main thread is suspended (paging), and + * squid_curtime is not being updated timely. + */ + wait_time.tv_sec = squid_curtime + 1; /* little quicker first time */ wait_time.tv_nsec = 0; - request = NULL; - while(!request) { - /* poll the overflow queues */ - for (i=NR_AIO_REQUEST_QUEUES-1; !request && i > 0; i--) { - queue = &request_queue[i]; - if (queue->head && !pthread_mutex_trylock(&queue->mutex)) { - request = queue->head; - if (request) { - queue->head = request->next; - if (!queue->head) - queue->tailp = &queue->head; - } - pthread_mutex_unlock(&queue->mutex); - } - } - if (!request) { - /* poll the main queue */ - queue = request_queue; - threadp->status = _THREAD_WAITING; - pthread_mutex_lock(&queue->mutex); - if (!queue->head) { - /* no request here, go to sleep with a timeout to catch - * some low load deadlocks caused by the multiple - * queue design or pthread_cond_signal.. - */ - pthread_cond_timedwait(&queue->cond, &queue->mutex, &wait_time); - wait_time.tv_sec += 5; /* idle some when idle */ - } - /* Get the request if any */ - request = queue->head; - if (request) - queue->head = request->next; - if (!queue->head) - queue->tailp = &queue->head; - pthread_mutex_unlock(&queue->mutex); - } + while (threadp->req == NULL) { + threadp->status = _THREAD_WAITING; + pthread_cond_timedwait(&threadp->cond, &threadp->mutex, &wait_time); + wait_time.tv_sec += 3; /* then wait 3 seconds between each check */ } - threadp->status = _THREAD_BUSY; - request->next = NULL; - threadp->current_req = request; +#endif + request = threadp->req; errno = 0; if (!request->cancelled) { switch (request->request_type) { @@ -336,69 +324,51 @@ request->ret = -1; request->err = EINTR; } - threadp->status = _THREAD_DONE; - /* find a done queue */ - for (i=0; i < NR_AIO_DONE_QUEUES; i++) { - queue = &done_queue[i]; - if (i != NR_AIO_DONE_QUEUES-1) { - if 
(!pthread_mutex_trylock(&queue->mutex)) - break; - } else { - pthread_mutex_lock(&queue->mutex); - break; - } - } - *queue->tailp = request; - queue->tailp = &request->next; - pthread_mutex_unlock(&queue->mutex); - threadp->requests++; - } /* while forever */ + threadp->req = NULL; /* tells main thread that we are done */ + } /* while */ return NULL; } /* aio_thread_loop */ static void -aio_queue_request(aio_request_t * request) +aio_do_request(aio_request_t * requestp) +{ + if (wait_threads == NULL && busy_threads_head == NULL) { + fprintf(stderr, "PANIC: No threads to service requests with!\n"); + exit(-1); + } + aio_queue_request(requestp); +} /* aio_do_request */ + + +static void +aio_queue_request(aio_request_t * requestp) { + aio_request_t *rp; static int last_warn = 0; static int high_start = 0; static int queue_high, queue_low; int i; - aio_request_queue_t *queue; /* Mark it as not executed (failing result, no error) */ - request->ret = -1; - request->err = 0; - dlinkAdd(request,&request->node,&requests); - /* Find a available request queue */ - for (i=0; i < NR_AIO_REQUEST_QUEUES+1; i++) { - queue = &request_queue[i]; - if (!pthread_mutex_trylock(&queue->mutex)) - break; - else { - queue->blocked+=1; - queue = NULL; - } - } - assert(queue); - request->next = NULL; - *queue->tailp = request; - queue->tailp = &request->next; - /* kick requests in the last overflow (main only) queue */ - if (i < NR_AIO_REQUEST_QUEUES && - request_queue[NR_AIO_REQUEST_QUEUES].head) { - *queue->tailp = request_queue[NR_AIO_REQUEST_QUEUES].head; - queue->tailp = request_queue[NR_AIO_REQUEST_QUEUES].tailp; - request_queue[NR_AIO_REQUEST_QUEUES].head = NULL; - request_queue[NR_AIO_REQUEST_QUEUES].tailp = &request_queue[NR_AIO_REQUEST_QUEUES].head; - } - if (i==0) - pthread_cond_signal(&queue->cond); - pthread_mutex_unlock(&queue->mutex); - queue->requests += 1; + requestp->ret = -1; + requestp->err = 0; + /* Queue it on the request queue */ + if (request_queue_head == NULL) { + 
request_queue_head = requestp; + request_queue_tail = requestp; + } else { + request_queue_tail->next = requestp; + request_queue_tail = requestp; + } + requestp->next = NULL; request_queue_len += 1; + /* Poll done threads if needed */ + if (wait_threads == NULL) + aio_poll_threads(); + /* Kick it rolling */ + aio_process_request_queue(); /* Warn if out of threads */ - if (request_queue_len > (NUMTHREADS << 1)) { - aio_poll_queues(); + if (request_queue_len > (NUMTHREADS >> 1)) { if (high_start == 0) { high_start = squid_curtime; queue_high = request_queue_len; @@ -409,14 +379,40 @@ if (request_queue_len < queue_low) queue_low = request_queue_len; if (squid_curtime >= (last_warn + 15) && - squid_curtime >= (high_start + 5)) { - debug(43, 1) ("aio_queue_request: WARNING - Disk I/O overloading\n"); - if (squid_curtime >= (high_start + 15)) { - debug(43, 1) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n", + squid_curtime >= (high_start + 3)) { + debug(43, 1) ("aio_queue_request: WARNING - Running out of I/O threads\n"); + debug(43, 2) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n", request_queue_len, queue_high, queue_low, squid_curtime - high_start); - debug(43, 1) ("aio_queue_request: Perhaps you should increase NUMTHREADS,\n"); - debug(43, 1) ("aio_queue_request: install more disks to share the loadr\n"); - debug(43, 1) ("aio_queue_request: or add a second CPU for the I/O\n"); + debug(43, 1) ("aio_queue_request: Perhaps you should increase NUMTHREADS\n"); + debug(43, 1) ("aio_queue_request: Or install more disks to share the load\n"); + debug(43, 3) ("aio_queue_request: First %d items on request queue\n", NUMTHREADS); + rp = request_queue_head; + for (i = 1; i <= NUMTHREADS; i++) { + switch (rp->request_type) { + case _AIO_OP_OPEN: + debug(43, 3) ("aio_queue_request: %d : open -> %s\n", i, rp->path); + break; + case _AIO_OP_READ: + debug(43, 3) ("aio_queue_request: %d : read -> FD = %d\n", i, rp->fd); + 
break; + case _AIO_OP_WRITE: + debug(43, 3) ("aio_queue_request: %d : write -> FD = %d\n", i, rp->fd); + break; + case _AIO_OP_CLOSE: + debug(43, 3) ("aio_queue_request: %d : close -> FD = %d\n", i, rp->fd); + break; + case _AIO_OP_UNLINK: + debug(43, 3) ("aio_queue_request: %d : unlink -> %s\n", i, rp->path); + break; + case _AIO_OP_STAT: + debug(43, 3) ("aio_queue_request: %d : stat -> %s\n", i, rp->path); + break; + default: + debug(43, 1) ("aio_queue_request: %d : Unimplemented request type: %d\n", i, rp->request_type); + break; + } + if ((rp = rp->next) == NULL) + break; } last_warn = squid_curtime; } @@ -431,66 +427,116 @@ } } /* aio_queue_request */ + +static void +aio_process_request_queue(void) +{ + aio_thread_t *threadp; + aio_request_t *requestp; + + for (;;) { + if (wait_threads == NULL || request_queue_head == NULL) + return; + + requestp = request_queue_head; + if ((request_queue_head = requestp->next) == NULL) + request_queue_tail = NULL; + requestp->next = NULL; + request_queue_len--; + + if (requestp->cancelled) { + aio_cleanup_request(requestp); + continue; + } + threadp = wait_threads; + wait_threads = threadp->next; + threadp->next = NULL; + + if (busy_threads_head != NULL) + busy_threads_tail->next = threadp; + else + busy_threads_head = threadp; + busy_threads_tail = threadp; + + threadp->status = _THREAD_BUSY; + threadp->req = threadp->processed_req = requestp; + pthread_cond_signal(&(threadp->cond)); +#if AIO_PROPER_MUTEX + pthread_mutex_unlock(&threadp->mutex); +#endif + } +} /* aio_process_request_queue */ + + static void -aio_cleanup_request(aio_request_t * request) +aio_cleanup_request(aio_request_t * requestp) { - aio_result_t *resultp = request->resultp; - int cancelled = request->cancelled; + aio_result_t *resultp = requestp->resultp; + int cancelled = requestp->cancelled; /* Free allocated structures and copy data back to user space if the */ /* request hasn't been cancelled */ - switch (request->request_type) { + switch 
(requestp->request_type) { case _AIO_OP_STAT: - if (!cancelled && request->ret == 0) - xmemcpy(request->statp, request->tmpstatp, sizeof(struct stat)); - xfree(request->tmpstatp); + if (!cancelled && requestp->ret == 0) + xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat)); + xfree(requestp->tmpstatp); case _AIO_OP_OPEN: - if (cancelled && request->ret >= 0) + if (cancelled && requestp->ret >= 0) /* The open() was cancelled but completed */ - close(request->ret); - xfree(request->path); + close(requestp->ret); + xfree(requestp->path); break; case _AIO_OP_CLOSE: - if (cancelled && request->ret < 0) + if (cancelled && requestp->ret < 0) /* The close() was cancelled and never got executed */ - close(request->fd); + close(requestp->fd); break; case _AIO_OP_UNLINK: case _AIO_OP_OPENDIR: - xfree(request->path); + xfree(requestp->path); break; case _AIO_OP_READ: - if (!cancelled && request->ret > 0) - xmemcpy(request->bufferp, request->tmpbufp, request->ret); - xfree(request->tmpbufp); - break; + if (!cancelled && requestp->ret > 0) + xmemcpy(requestp->bufferp, requestp->tmpbufp, requestp->ret); case _AIO_OP_WRITE: + xfree(requestp->tmpbufp); break; default: break; } if (resultp != NULL && !cancelled) { - resultp->aio_return = request->ret; - resultp->aio_errno = request->err; + resultp->aio_return = requestp->ret; + resultp->aio_errno = requestp->err; } - memPoolFree(aio_request_pool, request); + memPoolFree(aio_request_pool, requestp); } /* aio_cleanup_request */ int aio_cancel(aio_result_t * resultp) { - aio_request_t *request; - dlink_node *node; + aio_thread_t *threadp; + aio_request_t *requestp; - for (node = requests.head; node != NULL; node = node->next) { - request = node->data; - if (request->resultp == resultp) { - request->cancelled = 1; - request->resultp = NULL; + for (threadp = busy_threads_head; threadp != NULL; threadp = threadp->next) + if (threadp->processed_req->resultp == resultp) { + threadp->processed_req->cancelled = 1; + 
threadp->processed_req->resultp = NULL; + return 0; + } + for (requestp = request_queue_head; requestp != NULL; requestp = requestp->next) + if (requestp->resultp == resultp) { + requestp->cancelled = 1; + requestp->resultp = NULL; + return 0; + } + for (requestp = request_done_head; requestp != NULL; requestp = requestp->next) + if (requestp->resultp == resultp) { + requestp->cancelled = 1; + requestp->resultp = NULL; return 0; } - } return 1; } /* aio_cancel */ @@ -498,234 +544,240 @@ int aio_open(const char *path, int oflag, mode_t mode, aio_result_t * resultp) { - aio_request_t *request; + aio_request_t *requestp; int len; if (!aio_initialised) aio_init(); - if ((request = memPoolAlloc(aio_request_pool)) == NULL) { + if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } len = strlen(path) + 1; - if ((request->path = (char *) xmalloc(len)) == NULL) { - memPoolFree(aio_request_pool, request); + if ((requestp->path = (char *) xmalloc(len)) == NULL) { + memPoolFree(aio_request_pool, requestp); errno = ENOMEM; return -1; } - strncpy(request->path, path, len); - request->oflag = oflag; - request->mode = mode; - request->resultp = resultp; - request->request_type = _AIO_OP_OPEN; - request->cancelled = 0; + strncpy(requestp->path, path, len); + requestp->oflag = oflag; + requestp->mode = mode; + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_OPEN; + requestp->cancelled = 0; - aio_queue_request(request); + aio_do_request(requestp); return 0; } static void -aio_do_open(aio_request_t * request) +aio_do_open(aio_request_t * requestp) { - request->ret = open(request->path, request->oflag, request->mode); - request->err = errno; + requestp->ret = open(requestp->path, requestp->oflag, requestp->mode); + requestp->err = errno; } int aio_read(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * resultp) { - aio_request_t *request; + aio_request_t *requestp; if (!aio_initialised) aio_init(); - if ((request = 
memPoolAlloc(aio_request_pool)) == NULL) { + if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } - request->fd = fd; - request->bufferp = bufp; - if ((request->tmpbufp = (char *) xmalloc(bufs)) == NULL) { - memPoolFree(aio_request_pool, request); + requestp->fd = fd; + requestp->bufferp = bufp; + if ((requestp->tmpbufp = (char *) xmalloc(bufs)) == NULL) { + memPoolFree(aio_request_pool, requestp); errno = ENOMEM; return -1; } - request->buflen = bufs; - request->offset = offset; - request->whence = whence; - request->resultp = resultp; - request->request_type = _AIO_OP_READ; - request->cancelled = 0; + requestp->buflen = bufs; + requestp->offset = offset; + requestp->whence = whence; + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_READ; + requestp->cancelled = 0; - aio_queue_request(request); + aio_do_request(requestp); return 0; } static void -aio_do_read(aio_request_t * request) +aio_do_read(aio_request_t * requestp) { - lseek(request->fd, request->offset, request->whence); - request->ret = read(request->fd, request->tmpbufp, request->buflen); - request->err = errno; + lseek(requestp->fd, requestp->offset, requestp->whence); + requestp->ret = read(requestp->fd, requestp->tmpbufp, requestp->buflen); + requestp->err = errno; } int aio_write(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * resultp) { - aio_request_t *request; + aio_request_t *requestp; if (!aio_initialised) aio_init(); - if ((request = memPoolAlloc(aio_request_pool)) == NULL) { + if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { + errno = ENOMEM; + return -1; + } + requestp->fd = fd; + if ((requestp->tmpbufp = (char *) xmalloc(bufs)) == NULL) { + memPoolFree(aio_request_pool, requestp); errno = ENOMEM; return -1; } - request->fd = fd; - request->tmpbufp = NULL; - request->bufferp = bufp; - request->buflen = bufs; - request->offset = offset; - request->whence = whence; - request->resultp = resultp; - 
request->request_type = _AIO_OP_WRITE; - request->cancelled = 0; + xmemcpy(requestp->tmpbufp, bufp, bufs); + requestp->buflen = bufs; + requestp->offset = offset; + requestp->whence = whence; + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_WRITE; + requestp->cancelled = 0; - aio_queue_request(request); + aio_do_request(requestp); return 0; } static void -aio_do_write(aio_request_t * request) +aio_do_write(aio_request_t * requestp) { - request->ret = write(request->fd, request->bufferp, request->buflen); - request->err = errno; + requestp->ret = write(requestp->fd, requestp->tmpbufp, requestp->buflen); + requestp->err = errno; } int aio_close(int fd, aio_result_t * resultp) { - aio_request_t *request; + aio_request_t *requestp; if (!aio_initialised) aio_init(); - if ((request = memPoolAlloc(aio_request_pool)) == NULL) { + if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } - request->fd = fd; - request->resultp = resultp; - request->request_type = _AIO_OP_CLOSE; - request->cancelled = 0; + requestp->fd = fd; + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_CLOSE; + requestp->cancelled = 0; - aio_queue_request(request); + aio_do_request(requestp); return 0; } static void -aio_do_close(aio_request_t * request) +aio_do_close(aio_request_t * requestp) { - request->ret = close(request->fd); - request->err = errno; + requestp->ret = close(requestp->fd); + requestp->err = errno; } int aio_stat(const char *path, struct stat *sb, aio_result_t * resultp) { - aio_request_t *request; + aio_request_t *requestp; int len; if (!aio_initialised) aio_init(); - if ((request = memPoolAlloc(aio_request_pool)) == NULL) { + if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } len = strlen(path) + 1; - if ((request->path = (char *) xmalloc(len)) == NULL) { - memPoolFree(aio_request_pool, request); + if ((requestp->path = (char *) xmalloc(len)) == NULL) { + memPoolFree(aio_request_pool, 
requestp); errno = ENOMEM; return -1; } - strncpy(request->path, path, len); - request->statp = sb; - if ((request->tmpstatp = (struct stat *) xmalloc(sizeof(struct stat))) == NULL) { - xfree(request->path); - memPoolFree(aio_request_pool, request); + strncpy(requestp->path, path, len); + requestp->statp = sb; + if ((requestp->tmpstatp = (struct stat *) xmalloc(sizeof(struct stat))) == NULL) { + xfree(requestp->path); + memPoolFree(aio_request_pool, requestp); errno = ENOMEM; return -1; } - request->resultp = resultp; - request->request_type = _AIO_OP_STAT; - request->cancelled = 0; + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_STAT; + requestp->cancelled = 0; - aio_queue_request(request); + aio_do_request(requestp); return 0; } static void -aio_do_stat(aio_request_t * request) +aio_do_stat(aio_request_t * requestp) { - request->ret = stat(request->path, request->tmpstatp); - request->err = errno; + requestp->ret = stat(requestp->path, requestp->tmpstatp); + requestp->err = errno; } int aio_unlink(const char *path, aio_result_t * resultp) { - aio_request_t *request; + aio_request_t *requestp; int len; if (!aio_initialised) aio_init(); - if ((request = memPoolAlloc(aio_request_pool)) == NULL) { + if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } len = strlen(path) + 1; - if ((request->path = (char *) xmalloc(len)) == NULL) { - memPoolFree(aio_request_pool, request); + if ((requestp->path = (char *) xmalloc(len)) == NULL) { + memPoolFree(aio_request_pool, requestp); errno = ENOMEM; return -1; } - strncpy(request->path, path, len); - request->resultp = resultp; - request->request_type = _AIO_OP_UNLINK; - request->cancelled = 0; + strncpy(requestp->path, path, len); + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_UNLINK; + requestp->cancelled = 0; - aio_queue_request(request); + aio_do_request(requestp); return 0; } static void -aio_do_unlink(aio_request_t * request) 
+aio_do_unlink(aio_request_t * requestp) { - request->ret = unlink(request->path); - request->err = errno; + requestp->ret = unlink(requestp->path); + requestp->err = errno; + /* assume that postincrement is an atomic operation. */ + Counter.unlink.requests++; } #if AIO_OPENDIR -/* XXX aio_opendir NOT implemented yet.. */ +/* XXX aio_opendir NOT implemented? */ int aio_opendir(const char *path, aio_result_t * resultp) { - aio_request_t *request; + aio_request_t *requestp; int len; if (!aio_initialised) aio_init(); - if ((request = memPoolAlloc(aio_request_pool)) == NULL) { + if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { errno = ENOMEM; return -1; } @@ -733,78 +785,89 @@ } static void -aio_do_opendir(aio_request_t * request) +aio_do_opendir(aio_request_t * requestp) { /* NOT IMPLEMENTED */ } #endif -static void -aio_poll_queues(void) + +void +aio_poll_threads(void) { - int i; - aio_request_queue_t *queue; - aio_request_t *request; - /* kick "last-overflow" request queue */ - if (request_queue[NR_AIO_REQUEST_QUEUES].head) { - for(i=0; i<NR_AIO_REQUEST_QUEUES; i++) { - queue = &request_queue[i]; - if (!pthread_mutex_trylock(&queue->mutex)) + aio_thread_t *prev; + aio_thread_t *threadp; + aio_request_t *requestp; + + do { /* while found completed thread */ + prev = NULL; + threadp = busy_threads_head; + while (threadp) { + debug(43, 9) ("aio_poll_threads: %p: request type %d -> status %d\n", + threadp, + threadp->processed_req->request_type, + threadp->status); +#if AIO_PROPER_MUTEX + if (threadp->req == NULL) + if (pthread_mutex_trylock(&threadp->mutex) == 0) + break; +#else + if (threadp->req == NULL) break; - queue=NULL; - } - if (queue) { - *queue->tailp=request_queue[NR_AIO_REQUEST_QUEUES].head; - queue->tailp=request_queue[NR_AIO_REQUEST_QUEUES].tailp; - request_queue[NR_AIO_REQUEST_QUEUES].head=NULL; - request_queue[NR_AIO_REQUEST_QUEUES].tailp=&request_queue[NR_AIO_REQUEST_QUEUES].head; - if (i==0) - pthread_cond_signal(&queue->cond); - pthread_mutex_unlock(&queue->mutex); - } - } - /* poll done queues */ - for(i=0; i<NR_AIO_DONE_QUEUES; i++) { - queue = &done_queue[i]; - if (queue->head &&
!pthread_mutex_trylock(&queue->mutex)) { - for(request=queue->head;request;request = request->next) { - request_queue_len -= 1; - queue->requests += 1; - } - *done_requests.tailp = queue->head; - done_requests.tailp = queue->tailp; - queue->head=NULL; - queue->tailp = &queue->head; - pthread_mutex_unlock(&queue->mutex); +#endif + prev = threadp; + threadp = threadp->next; } - } -} + if (threadp == NULL) + break; + + if (prev == NULL) + busy_threads_head = busy_threads_head->next; + else + prev->next = threadp->next; + + if (busy_threads_tail == threadp) + busy_threads_tail = prev; + + requestp = threadp->processed_req; + threadp->processed_req = NULL; + + threadp->next = wait_threads; + wait_threads = threadp; + + if (request_done_tail != NULL) + request_done_tail->next = requestp; + else + request_done_head = requestp; + request_done_tail = requestp; + } while (threadp); + + aio_process_request_queue(); +} /* aio_poll_threads */ aio_result_t * aio_poll_done(void) { - aio_request_t *request; + aio_request_t *requestp; aio_result_t *resultp; int cancelled; - if (done_requests.head == NULL) { - aio_poll_queues(); - } AIO_REPOLL: - if (done_requests.head == NULL) { + aio_poll_threads(); + if (request_done_head == NULL) { return NULL; } - request = done_requests.head; - done_requests.head = request->next; - if (!done_requests.head) - done_requests.tailp = &done_requests.head; - resultp = request->resultp; - cancelled = request->cancelled; - aio_debug(request); - debug(43, 5) ("DONE: %d -> %d\n", request->ret, request->err); - aio_cleanup_request(request); - dlinkDelete(&request->node, &requests); + requestp = request_done_head; + request_done_head = requestp->next; + if (!request_done_head) + request_done_tail = NULL; + + resultp = requestp->resultp; + cancelled = requestp->cancelled; + aio_debug(requestp); + debug(43, 5) ("DONE: %d -> %d\n", requestp->ret, requestp->err); + aio_cleanup_request(requestp); if (cancelled) goto AIO_REPOLL; return resultp; @@ -813,7 +876,7 
@@ int aio_operations_pending(void) { - return requests.head != NULL; + return request_queue_len + (request_done_head != NULL) + (busy_threads_head != NULL); } int @@ -833,9 +896,10 @@ int aio_sync(void) { - /* XXX This might take a while if the queue is large.. */ + int loop_count = 0; do { - aio_poll_queues(); + aio_poll_threads(); + assert(++loop_count < 10); } while (request_queue_len > 0); return aio_operations_pending(); } @@ -847,23 +911,23 @@ } static void -aio_debug(aio_request_t * request) +aio_debug(aio_request_t * requestp) { - switch (request->request_type) { + switch (requestp->request_type) { case _AIO_OP_OPEN: - debug(43, 5) ("OPEN of %s to FD %d\n", request->path, request->ret); + debug(43, 5) ("OPEN of %s to FD %d\n", requestp->path, requestp->ret); break; case _AIO_OP_READ: - debug(43, 5) ("READ on fd: %d\n", request->fd); + debug(43, 5) ("READ on fd: %d\n", requestp->fd); break; case _AIO_OP_WRITE: - debug(43, 5) ("WRITE on fd: %d\n", request->fd); + debug(43, 5) ("WRITE on fd: %d\n", requestp->fd); break; case _AIO_OP_CLOSE: - debug(43, 5) ("CLOSE of fd: %d\n", request->fd); + debug(43, 5) ("CLOSE of fd: %d\n", requestp->fd); break; case _AIO_OP_UNLINK: - debug(43, 5) ("UNLINK of %s\n", request->path); + debug(43, 5) ("UNLINK of %s\n", requestp->path); break; default: break;