--------------------- PatchSet 1732 Date: 2001/03/15 06:04:18 Author: rbcollins Branch: cygwin Tag: (none) Log: awin32 native async ufs fs for win32. It needed a new autoconf test thus the acinclude.m4 and aclocal.m4 Members: acinclude.m4:1.1->1.1.2.1 aclocal.m4:1.1->1.1.2.1 configure.in:1.8.2.1->1.8.2.2 include/snmp-internal.h:1.2->1.2.28.1 src/fs/awin32/Makefile.in:1.1->1.1.2.1 src/fs/awin32/aiops.c:1.1->1.1.2.1 src/fs/awin32/async_io.c:1.1->1.1.2.1 src/fs/awin32/store_asyncufs.h:1.1->1.1.2.1 src/fs/awin32/store_dir_aufs.c:1.1->1.1.2.1 src/fs/awin32/store_io_aufs.c:1.1->1.1.2.1 --- /dev/null Wed Feb 14 00:50:05 2007 +++ squid/acinclude.m4 Wed Feb 14 00:51:28 2007 @@ -0,0 +1,43 @@ +AC_DEFUN(AC_PROG_CC_WIN32, [ +dnl figure out how to run CC with access to the win32 api if present +dnl configure that as the CC program, and set ac_cc_win32 to yes if found +dnl or no if not found. We don't simply abort because +dnl some folk may just want to test their environment and fallback with #defines +dnl in their code. +dnl WIN32 may be present with WINE, under cygwin, or under mingw, +dnl or cross compilers targeting those same three targets. +dnl as it happens, I can only test cygwin, so extra input here will be appreciated +dnl send bug reports to Robert Collins + +dnl logic: is CC already configured? if not, call AC_PROG_CC. +dnl if so - try it. If that doesn't work ,try -mwin32. If that doesn't work, fail +dnl +dnl 2001-03-15 - Changed from yes/no to true/false -suggested by Lars J Aas +dnl + +AC_REQUIRE([AC_PROG_CC]) +echo $ECHO_N "checking how to access the Win32 API..." >&6 +AC_TRY_CPP([#include ], [ +dnl found windows.h with the current config. +echo "${ECHO_T}Win32 API found by default" >&6 +ac_cc_win32=true +], [ +dnl try -mwin32 +save_cpp="$ac_cpp" +ac_cpp="$ac_cpp -mwin32" +AC_TRY_CPP([#include ], [ +dnl found windows.h using -mwin32 +echo "${ECHO_T}Win32 API found via -mwin32" >&6 +ac_cc_win32=true +ac_compile="$ac_compile -mwin32" +ac_link="$ac_link -mwin32" +CC="$CC -mwin32" +], [ +ac_cc_win32=false +ac_cpp="$save_cpp" +echo "${ECHO_T}Win32 API Not found" >&6 +]) +]) + +AC_PROVIDE([$0]) +]) --- /dev/null Wed Feb 14 00:50:05 2007 +++ squid/aclocal.m4 Wed Feb 14 00:51:28 2007 @@ -0,0 +1,56 @@ +dnl aclocal.m4 generated automatically by aclocal 1.4 + +dnl Copyright (C) 1994, 1995-8, 1999 Free Software Foundation, Inc. +dnl This file is free software; the Free Software Foundation +dnl gives unlimited permission to copy and/or distribute it, +dnl with or without modifications, as long as this notice is preserved. + +dnl This program is distributed in the hope that it will be useful, +dnl but WITHOUT ANY WARRANTY, to the extent permitted by law; without +dnl even the implied warranty of MERCHANTABILITY or FITNESS FOR A +dnl PARTICULAR PURPOSE. + +AC_DEFUN(AC_PROG_CC_WIN32, [ +dnl figure out how to run CC with access to the win32 api if present +dnl configure that as the CC program, and set ac_cc_win32 to yes if found +dnl or no if not found. We don't simply abort because +dnl some folk may just want to test their environment and fallback with #defines +dnl in their code. +dnl WIN32 may be present with WINE, under cygwin, or under mingw, +dnl or cross compilers targeting those same three targets. +dnl as it happens, I can only test cygwin, so extra input here will be appreciated +dnl send bug reports to Robert Collins + +dnl logic: is CC already configured? if not, call AC_PROG_CC. +dnl if so - try it. If that doesn't work ,try -mwin32. If that doesn't work, fail +dnl +dnl 2001-03-15 - Changed from yes/no to true/false -suggested by Lars J Aas +dnl + +AC_REQUIRE([AC_PROG_CC]) +echo $ECHO_N "checking how to access the Win32 API..." >&6 +AC_TRY_CPP([#include ], [ +dnl found windows.h with the current config. +echo "${ECHO_T}Win32 API found by default" >&6 +ac_cc_win32=true +], [ +dnl try -mwin32 +save_cpp="$ac_cpp" +ac_cpp="$ac_cpp -mwin32" +AC_TRY_CPP([#include ], [ +dnl found windows.h using -mwin32 +echo "${ECHO_T}Win32 API found via -mwin32" >&6 +ac_cc_win32=true +ac_compile="$ac_compile -mwin32" +ac_link="$ac_link -mwin32" +CC="$CC -mwin32" +], [ +ac_cc_win32=false +ac_cpp="$save_cpp" +echo "${ECHO_T}Win32 API Not found" >&6 +]) +]) + +AC_PROVIDE([$0]) +]) + Index: squid/configure.in =================================================================== RCS file: /cvsroot/squid-sf//squid/configure.in,v retrieving revision 1.8.2.1 retrieving revision 1.8.2.2 diff -u -r1.8.2.1 -r1.8.2.2 --- squid/configure.in 14 Mar 2001 22:17:30 -0000 1.8.2.1 +++ squid/configure.in 15 Mar 2001 06:04:18 -0000 1.8.2.2 @@ -3,13 +3,13 @@ dnl dnl Duane Wessels, wessels@nlanr.net, February 1996 (autoconf v2.9) dnl -dnl $Id: configure.in,v 1.8.2.1 2001/03/14 22:17:30 rbcollins Exp $ +dnl $Id: configure.in,v 1.8.2.2 2001/03/15 06:04:18 rbcollins Exp $ dnl dnl dnl AC_INIT(src/main.c) AC_CONFIG_HEADER(include/autoconf.h) -AC_REVISION($Revision: 1.8.2.1 $)dnl +AC_REVISION($Revision: 1.8.2.2 $)dnl AC_PREFIX_DEFAULT(/usr/local/squid) AC_CONFIG_AUX_DIR(cfgaux) @@ -63,6 +63,9 @@ dnl Check for GNU cc AC_PROG_CC +dnl Do we have the Win32 API available to the cc? +AC_PROG_CC_WIN32 + dnl Gerben Wierda case "$host" in mab-next-nextstep3) Index: squid/include/snmp-internal.h =================================================================== RCS file: /cvsroot/squid-sf//squid/include/snmp-internal.h,v retrieving revision 1.2 retrieving revision 1.2.28.1 diff -u -r1.2 -r1.2.28.1 --- squid/include/snmp-internal.h 23 Oct 2000 15:04:18 -0000 1.2 +++ squid/include/snmp-internal.h 15 Mar 2001 06:04:19 -0000 1.2.28.1 @@ -24,7 +24,7 @@ * ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS * SOFTWARE. * - * $Id: snmp-internal.h,v 1.2 2000/10/23 15:04:18 hno Exp $ + * $Id: snmp-internal.h,v 1.2.28.1 2001/03/15 06:04:19 rbcollins Exp $ * **********************************************************************/ @@ -33,9 +33,9 @@ #define SNMP_MAX_LEN 484 #ifdef DEBUG -#define ERROR(string) printf("%s(%d): %s\n",__FILE__, __LINE__, string); +#define SNMP_ERROR(string) printf("%s(%d): %s\n",__FILE__, __LINE__, string); #else -#define ERROR(string) +#define SNMP_ERROR(string) #endif #endif /* _SNMP_INTERNAL_H_ */ --- /dev/null Wed Feb 14 00:50:05 2007 +++ squid/src/fs/awin32/Makefile.in Wed Feb 14 00:51:28 2007 @@ -0,0 +1,59 @@ +# +# Makefile for the AUFS storage driver for the Squid Object Cache server +# +# $Id: Makefile.in,v 1.1.2.1 2001/03/15 06:04:19 rbcollins Exp $ +# + +FS = awin32 + +top_srcdir = @top_srcdir@ +VPATH = @srcdir@ + +CC = @CC@ +MAKEDEPEND = @MAKEDEPEND@ +AR_R = @AR_R@ +RANLIB = @RANLIB@ +AC_CFLAGS = @CFLAGS@ +SHELL = /bin/sh + +INCLUDE = -I../../../include -I$(top_srcdir)/include -I$(top_srcdir)/src/ +CFLAGS = $(AC_CFLAGS) $(INCLUDE) $(DEFINES) + +OUT = ../$(FS).a + +OBJS = \ + aiops.o \ + async_io.o \ + store_dir_aufs.o \ + store_io_aufs.o + + +all: $(OUT) + +$(OUT): $(OBJS) + @rm -f ../stamp + $(AR_R) $(OUT) $(OBJS) + $(RANLIB) $(OUT) + +$(OBJS): $(top_srcdir)/include/version.h ../../../include/autoconf.h +$(OBJS): store_asyncufs.h + +.c.o: + @rm -f ../stamp + $(CC) $(CFLAGS) -c $< + +clean: + -rm -rf *.o *pure_* core ../$(FS).a + +distclean: clean + -rm -f Makefile + -rm -f Makefile.bak + -rm -f tags + +install: + +tags: + ctags *.[ch] $(top_srcdir)/src/*.[ch] $(top_srcdir)/include/*.h $(top_srcdir)/lib/*.[ch] + +depend: + $(MAKEDEPEND) $(INCLUDE) -fMakefile *.c --- /dev/null Wed Feb 14 00:50:05 2007 +++ squid/src/fs/awin32/aiops.c Wed Feb 14 00:51:28 2007 @@ -0,0 +1,1086 @@ +/* + * $Id: aiops.c,v 1.1.2.1 2001/03/15 06:04:19 rbcollins Exp $ + * + * DEBUG: section 43 AIOPS + * AUTHOR: Stewart Forster + * + * SQUID Web Proxy Cache http://www.squid-cache.org/ + * ---------------------------------------------------------- + * + * Squid is the result of efforts by numerous individuals from + * the Internet community; see the CONTRIBUTORS file for full + * details. Many organizations have provided support for Squid's + * development; see the SPONSORS file for full details. Squid is + * Copyrighted (C) 2001 by the Regents of the University of + * California; see the COPYRIGHT file for full details. Squid + * incorporates software developed and/or copyrighted by other + * sources; see the CREDITS file for full details. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. + * + */ + +#include "squid.h" +#include +#include "store_asyncufs.h" + + +#include +#include +#include +#include +//#include +#include +#include +#include +//#if HAVE_SCHED_H +//#include +//#endif + +#define RIDICULOUS_LENGTH 4096 + +enum _aio_thread_status { + _THREAD_STARTING = 0, + _THREAD_WAITING, + _THREAD_BUSY, + _THREAD_FAILED, + _THREAD_DONE +}; + +enum _aio_request_type { + _AIO_OP_NONE = 0, + _AIO_OP_OPEN, + _AIO_OP_READ, + _AIO_OP_WRITE, + _AIO_OP_CLOSE, + _AIO_OP_UNLINK, + _AIO_OP_TRUNCATE, + _AIO_OP_OPENDIR, + _AIO_OP_STAT +}; + +typedef struct aio_request_t { + struct aio_request_t *next; + enum _aio_request_type request_type; + int cancelled; + char *path; + int oflag; + mode_t mode; + int fd; + char *bufferp; + char *tmpbufp; + int buflen; + off_t offset; + int whence; + int ret; + int err; + struct stat *tmpstatp; + struct stat *statp; + aio_result_t *resultp; +} aio_request_t; + +typedef struct aio_request_queue_t { + HANDLE mutex; /* pthread_mutex_t mutex; */ + HANDLE cond; /*pthread_cond_t cond; */ /* See Event objects */ + aio_request_t *volatile head; + aio_request_t *volatile *volatile tailp; + unsigned long requests; + unsigned long blocked; /* main failed to lock the queue */ +} aio_request_queue_t; + +typedef struct aio_thread_t aio_thread_t; +struct aio_thread_t { + aio_thread_t *next; + HANDLE thread; /* pthread_t thread; */ + DWORD dwThreadId; /* thread ID */ + enum _aio_thread_status status; + struct aio_request_t *current_req; + unsigned long requests; +}; + +int aio_cancel(aio_result_t *); +int aio_open(const char *, int, mode_t, aio_result_t *); +int aio_read(int, char *, int, off_t, int, aio_result_t *); +int aio_write(int, char *, int, off_t, int, aio_result_t *); +int aio_close(int, aio_result_t *); +int aio_unlink(const char *, aio_result_t *); +int aio_truncate(const char *, off_t length, aio_result_t *); +int aio_opendir(const char *, aio_result_t *); +aio_result_t *aio_poll_done(); +int aio_sync(void); + +static void aio_init(void); +static void aio_queue_request(aio_request_t *); +static void aio_cleanup_request(aio_request_t *); +/* static void *aio_thread_loop(void *); */ +static DWORD WINAPI aio_thread_loop( LPVOID lpParam ); +static void aio_do_open(aio_request_t *); +static void aio_do_read(aio_request_t *); +static void aio_do_write(aio_request_t *); +static void aio_do_close(aio_request_t *); +static void aio_do_stat(aio_request_t *); +static void aio_do_unlink(aio_request_t *); +static void aio_do_truncate(aio_request_t *); +#if AIO_OPENDIR +static void *aio_do_opendir(aio_request_t *); +#endif +static void aio_debug(aio_request_t *); +static void aio_poll_queues(void); + +static aio_thread_t *threads = NULL; +static int aio_initialised = 0; + + +#define AIO_LARGE_BUFS 16384 +#define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1 +#define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2 +#define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3 +#define AIO_MICRO_BUFS 128 + +static MemPool *aio_large_bufs = NULL; /* 16K */ +static MemPool *aio_medium_bufs = NULL; /* 8K */ +static MemPool *aio_small_bufs = NULL; /* 4K */ +static MemPool *aio_tiny_bufs = NULL; /* 2K */ +static MemPool *aio_micro_bufs = NULL; /* 128K */ + +static int request_queue_len = 0; +static MemPool *aio_request_pool = NULL; +static MemPool *aio_thread_pool = NULL; +static aio_request_queue_t request_queue; +static struct { + aio_request_t *head, **tailp; +} request_queue2 = { + + NULL, &request_queue2.head +}; +static aio_request_queue_t done_queue; +static struct { + aio_request_t *head, **tailp; +} done_requests = { + + NULL, &done_requests.head +}; +/* static pthread_attr_t globattr; The attributes of a created thread */ +/*static struct sched_param globsched; */ +/* static pthread_t main_thread; */ +static HANDLE main_thread; + +static MemPool * +aio_get_pool(int size) +{ + MemPool *p; + if (size <= AIO_LARGE_BUFS) { + if (size <= AIO_MICRO_BUFS) + p = aio_micro_bufs; + else if (size <= AIO_TINY_BUFS) + p = aio_tiny_bufs; + else if (size <= AIO_SMALL_BUFS) + p = aio_small_bufs; + else if (size <= AIO_MEDIUM_BUFS) + p = aio_medium_bufs; + else + p = aio_large_bufs; + } else + p = NULL; + return p; +} + +static void * +aio_xmalloc(int size) +{ + void *p; + MemPool *pool; + + if ((pool = aio_get_pool(size)) != NULL) { + p = memPoolAlloc(pool); + } else + p = xmalloc(size); + + return p; +} + +static char * +aio_xstrdup(const char *str) +{ + char *p; + int len = strlen(str) + 1; + + p = aio_xmalloc(len); + strncpy(p, str, len); + + return p; +} + +static void +aio_xfree(void *p, int size) +{ + MemPool *pool; + + if ((pool = aio_get_pool(size)) != NULL) { + memPoolFree(pool, p); + } else + xfree(p); +} + +static void +aio_xstrfree(char *str) +{ + MemPool *pool; + int len = strlen(str) + 1; + + if ((pool = aio_get_pool(len)) != NULL) { + memPoolFree(pool, str); + } else + xfree(str); +} + +static void +aio_init(void) +{ + int i; + aio_thread_t *threadp; + + if (aio_initialised) + return; + + /* pthread_attr_init(&globattr); */ +/* contend for resources system wide */ +#if HAVE_PTHREAD_ATTR_SETSCOPE + pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM); +#endif +/* globsched.sched_priority = 1; */ +/* + main_thread = pthread_self(); +*/ + if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */ + GetCurrentThread(), /* pseudo handle to copy */ + GetCurrentProcess(), /* pseudo handle, don't close */ + &main_thread, + 0, /* required access */ + FALSE, /* child process's don't inherit the handle */ + DUPLICATE_SAME_ACCESS)) { + /* spit errors */ + fatal("couldn't get current thread handle\n"); + } + /* FIXME: where do we CloseHandle the main_thread ?*/ +/* FIXME set the current thread to priority 1 */ +#if HAVE_PTHREAD_SETSCHEDPARAM + pthread_setschedparam(main_thread, SCHED_OTHER, &globsched); +#endif +/* FIXME set new threads to priority 2 + globsched.sched_priority = 2; +#if HAVE_PTHREAD_ATTR_SETSCHEDPARAM + pthread_attr_setschedparam(&globattr, &globsched); +#endif +*/ + + /* Initialize request queue */ + /*if (pthread_mutex_init(&(request_queue.mutex), NULL)) + fatal("Failed to create mutex");*/ + if ((request_queue.mutex=CreateMutex( NULL, /* no inheritance */ + FALSE, /* start unowned (as per mutex_init) */ + NULL /* no name */ ) + )==NULL) { + fatal("failed to create mutex\n"); + } + /* if (pthread_cond_init(&(request_queue.cond), NULL)) + fatal("Failed to create condition variable"); */ + if ((request_queue.cond=CreateEvent( NULL, /* no inheritance */ + FALSE, /* auto signal reset - which I think is pthreads like ? */ + FALSE, /* start non signaled */ + NULL /* no name */ ) + )==NULL) { + fatal("failed to create event\n"); + } + request_queue.head = NULL; + request_queue.tailp = &request_queue.head; + request_queue.requests = 0; + request_queue.blocked = 0; + + /* Initialize done queue */ + /* if (pthread_mutex_init(&(done_queue.mutex), NULL)) + fatal("Failed to create mutex"); + if (pthread_cond_init(&(done_queue.cond), NULL)) + fatal("Failed to create condition variable"); */ + + if ((done_queue.mutex=CreateMutex( NULL, /* no inheritance */ + FALSE, /* start unowned (as per mutex_init) */ + NULL /* no name */ ) + )==NULL) { + fatal("failed to create mutex\n"); + } + if ((done_queue.cond=CreateEvent( NULL, /* no inheritance */ + TRUE, /* manually signaled - which I think is pthreads like ? */ + FALSE, /* start non signaled */ + NULL /* no name */ ) + )==NULL) { + fatal("failed to create event\n"); + } + done_queue.head = NULL; + done_queue.tailp = &done_queue.head; + done_queue.requests = 0; + done_queue.blocked = 0; + + /* Create threads and get them to sit in their wait loop */ + aio_thread_pool = memPoolCreate("aio_thread", sizeof(aio_thread_t)); + for (i = 0; i < NUMTHREADS; i++) { + threadp = memPoolAlloc(aio_thread_pool); + threadp->status = _THREAD_STARTING; + threadp->current_req = NULL; + threadp->requests = 0; + threadp->next = threads; + threads = threadp; +/* if (pthread_create(&threadp->thread, &globattr, aio_thread_loop, threadp)) { + fprintf(stderr, "Thread creation failed\n"); + threadp->status = _THREAD_FAILED; + continue; + }*/ + if ((threadp->thread = CreateThread( + NULL, // no security attributes + 0, // use default stack size + aio_thread_loop, // thread function + threadp, // argument to thread function + 0, // use default creation flags + &(threadp->dwThreadId)) // returns the thread identifier + )==NULL) { + fprintf(stderr, "Thread creation failed\n"); + threadp->status = _THREAD_FAILED; + continue; + } + } + + /* Create request pool */ + aio_request_pool = memPoolCreate("aio_request", sizeof(aio_request_t)); + aio_large_bufs = memPoolCreate("aio_large_bufs", AIO_LARGE_BUFS); + aio_medium_bufs = memPoolCreate("aio_medium_bufs", AIO_MEDIUM_BUFS); + aio_small_bufs = memPoolCreate("aio_small_bufs", AIO_SMALL_BUFS); + aio_tiny_bufs = memPoolCreate("aio_tiny_bufs", AIO_TINY_BUFS); + aio_micro_bufs = memPoolCreate("aio_micro_bufs", AIO_MICRO_BUFS); + + aio_initialised = 1; +} + + +/* static void * +aio_thread_loop(void *ptr) */ +static DWORD WINAPI +aio_thread_loop( LPVOID lpParam ) +{ + aio_thread_t *threadp = lpParam; + aio_request_t *request; + HANDLE cond; /* local copy of the event queue */ +/* sigset_t new; */ + + /* + * Does WIN32 have this problem? + */ +#if 0 + + /* + * Make sure to ignore signals which may possibly get sent to + * the parent squid thread. Causes havoc with mutex's and + * condition waits otherwise + */ + + sigemptyset(&new); + sigaddset(&new, SIGPIPE); + sigaddset(&new, SIGCHLD); +#ifdef _SQUID_LINUX_THREADS_ + sigaddset(&new, SIGQUIT); + sigaddset(&new, SIGTRAP); +#else + sigaddset(&new, SIGUSR1); + sigaddset(&new, SIGUSR2); +#endif + sigaddset(&new, SIGHUP); + sigaddset(&new, SIGTERM); + sigaddset(&new, SIGINT); + sigaddset(&new, SIGALRM); + pthread_sigmask(SIG_BLOCK, &new, NULL); +#endif + + /* lock the thread info */ + if (WAIT_FAILED == WaitForSingleObject( + request_queue.mutex, // handle to object + INFINITE // time-out interval + )) { + /* wait failed.... what to do ? */ + } + /* duplicate the handle */ + if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */ + request_queue.cond, /* handle to copy */ + GetCurrentProcess(), /* pseudo handle, don't close */ + &cond, + 0, /* required access */ + FALSE, /* child process's don't inherit the handle */ + DUPLICATE_SAME_ACCESS)) + exit(1); + if (!ReleaseMutex(request_queue.mutex)) { + CloseHandle(cond); + exit(1); + } + + + while (1) { + DWORD rv; + threadp->current_req = request = NULL; + request = NULL; + /* Get a request to process */ + threadp->status = _THREAD_WAITING; + /* pthread_mutex_lock(&request_queue.mutex);*/ + rv = WaitForSingleObject( + request_queue.mutex, // handle to object + INFINITE // time-out interval + ); + if (rv == WAIT_FAILED) { + /* wait failed.... what to do ? */ + } + + + while (!request_queue.head) { + /* + pthread_cond_wait(&request_queue.cond, &request_queue.mutex); + */ + if (!ReleaseMutex(request_queue.mutex)) { + /* unexpected error */ + } + rv = WaitForSingleObject( + cond, // handle to object + INFINITE // time-out interval + ); + if (rv == WAIT_FAILED) { + /* wait failed.... what to do ? */ + } + rv = WaitForSingleObject( + request_queue.mutex, // handle to object + INFINITE // time-out interval + ); + if (rv == WAIT_FAILED) { + /* wait failed.... what to do ? */ + } + } + request = request_queue.head; + if (request) + request_queue.head = request->next; + if (!request_queue.head) + request_queue.tailp = &request_queue.head; + /* + pthread_mutex_unlock(&request_queue.mutex); + */ + if (!ReleaseMutex(request_queue.mutex)) { + /* unexpected error */ + } + /* process the request */ + threadp->status = _THREAD_BUSY; + request->next = NULL; + threadp->current_req = request; + errno = 0; + if (!request->cancelled) { + switch (request->request_type) { + case _AIO_OP_OPEN: + aio_do_open(request); + break; + case _AIO_OP_READ: + aio_do_read(request); + break; + case _AIO_OP_WRITE: + aio_do_write(request); + break; + case _AIO_OP_CLOSE: + aio_do_close(request); + break; + case _AIO_OP_UNLINK: + aio_do_unlink(request); + break; + case _AIO_OP_TRUNCATE: + aio_do_truncate(request); + break; +#if AIO_OPENDIR /* Opendir not implemented yet */ + case _AIO_OP_OPENDIR: + aio_do_opendir(request); + break; +#endif + case _AIO_OP_STAT: + aio_do_stat(request); + break; + default: + request->ret = -1; + request->err = EINVAL; + break; + } + } else { /* cancelled */ + request->ret = -1; + request->err = EINTR; + } + threadp->status = _THREAD_DONE; + /* put the request in the done queue */ + /* + pthread_mutex_lock(&done_queue.mutex); + */ + rv = WaitForSingleObject( + done_queue.mutex, // handle to object + INFINITE // time-out interval + ); + if (rv == WAIT_FAILED) { + /* wait failed.... what to do ? */ + } + *done_queue.tailp = request; + done_queue.tailp = &request->next; + /* + pthread_mutex_unlock(&done_queue.mutex); + */ + if (!ReleaseMutex(done_queue.mutex)) { + /* unexpected error */ + } + threadp->requests++; + } /* while forever */ + CloseHandle(cond); + return 0; +} /* aio_thread_loop */ + +static void +aio_queue_request(aio_request_t * request) +{ + static int high_start = 0; + debug(41, 9) ("aio_queue_request: %p type=%d result=%p\n", + request, request->request_type, request->resultp); + /* Mark it as not executed (failing result, no error) */ + request->ret = -1; + request->err = 0; + /* Internal housekeeping */ + request_queue_len += 1; + request->resultp->_data = request; + /* Play some tricks with the request_queue2 queue */ + request->next = NULL; + if (!request_queue2.head) { + /* If queue2 is empty, insert into queue 1 and try to "push" the queue + If the "push" fails, queue it via queue2 */ +/* + if (pthread_mutex_trylock(&request_queue.mutex) == 0) { +*/ + if (WaitForSingleObject(request_queue.mutex, 0)==WAIT_OBJECT_0) { + /* Normal path */ + *request_queue.tailp = request; + request_queue.tailp = &request->next; + /* this unblocks the next waiting thread we should be the current owner + pthread_cond_signal(&request_queue.cond); + */ + if (!SetEvent(request_queue.cond)) fatal("couldn't push queue\n"); + + + /* pthread_mutex_unlock(&request_queue.mutex); */ + if (!ReleaseMutex(request_queue.mutex)) { + /* unexpected error */ + } + } else { + /* Oops, the request queue is blocked, use request_queue2 */ + *request_queue2.tailp = request; + request_queue2.tailp = &request->next; + } + } else { + /* Secondary path. We have blocked requests to deal with */ + /* add the request to the chain */ + *request_queue2.tailp = request; +/* + if (pthread_mutex_trylock(&request_queue.mutex) == 0) { +*/ + if (WaitForSingleObject(request_queue.mutex, 0)==WAIT_OBJECT_0) { + /* Ok, the queue is no longer blocked */ + *request_queue.tailp = request_queue2.head; + request_queue.tailp = &request->next; + /* + pthread_cond_signal(&request_queue.cond); + pthread_mutex_unlock(&request_queue.mutex); +*/ + if (!SetEvent(request_queue.cond)) fatal("couldn't push queue\n"); + if (!ReleaseMutex(request_queue.mutex)) { + /* unexpected error */ + } + request_queue2.head = NULL; + request_queue2.tailp = &request_queue2.head; + } else { + /* still blocked, bump the blocked request chain */ + request_queue2.tailp = &request->next; + } + } + if (request_queue2.head) { + static int filter = 0; + static int filter_limit = 8; + if (++filter >= filter_limit) { + filter_limit += filter; + filter = 0; + debug(43, 1) ("aio_queue_request: WARNING - Queue congestion\n"); + } + } + /* Warn if out of threads */ + if (request_queue_len > MAGIC1) { + static int last_warn = 0; + static int queue_high, queue_low; + if (high_start == 0) { + high_start = squid_curtime; + queue_high = request_queue_len; + queue_low = request_queue_len; + } + if (request_queue_len > queue_high) + queue_high = request_queue_len; + if (request_queue_len < queue_low) + queue_low = request_queue_len; + if (squid_curtime >= (last_warn + 15) && + squid_curtime >= (high_start + 5)) { + debug(43, 1) ("aio_queue_request: WARNING - Disk I/O overloading\n"); + if (squid_curtime >= (high_start + 15)) + debug(43, 1) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n", + request_queue_len, queue_high, queue_low, squid_curtime - high_start); + last_warn = squid_curtime; + } + } else { + high_start = 0; + } + /* Warn if seriously overloaded */ + if (request_queue_len > RIDICULOUS_LENGTH) { + debug(43, 0) ("aio_queue_request: Async request queue growing uncontrollably!\n"); + debug(43, 0) ("aio_queue_request: Syncing pending I/O operations.. (blocking)\n"); + aio_sync(); + debug(43, 0) ("aio_queue_request: Synced\n"); + } +} /* aio_queue_request */ + +static void +aio_cleanup_request(aio_request_t * requestp) +{ + aio_result_t *resultp = requestp->resultp; + int cancelled = requestp->cancelled; + + /* Free allocated structures and copy data back to user space if the */ + /* request hasn't been cancelled */ + switch (requestp->request_type) { + case _AIO_OP_STAT: + if (!cancelled && requestp->ret == 0) + xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat)); + aio_xfree(requestp->tmpstatp, sizeof(struct stat)); + aio_xstrfree(requestp->path); + break; + case _AIO_OP_OPEN: + if (cancelled && requestp->ret >= 0) + /* The open() was cancelled but completed */ + close(requestp->ret); + aio_xstrfree(requestp->path); + break; + case _AIO_OP_CLOSE: + if (cancelled && requestp->ret < 0) + /* The close() was cancelled and never got executed */ + close(requestp->fd); + break; + case _AIO_OP_UNLINK: + case _AIO_OP_TRUNCATE: + case _AIO_OP_OPENDIR: + aio_xstrfree(requestp->path); + break; + case _AIO_OP_READ: + if (!cancelled && requestp->ret > 0) + xmemcpy(requestp->bufferp, requestp->tmpbufp, requestp->ret); + aio_xfree(requestp->tmpbufp, requestp->buflen); + break; + case _AIO_OP_WRITE: + aio_xfree(requestp->tmpbufp, requestp->buflen); + break; + default: + break; + } + if (resultp != NULL && !cancelled) { + resultp->aio_return = requestp->ret; + resultp->aio_errno = requestp->err; + } + memPoolFree(aio_request_pool, requestp); +} /* aio_cleanup_request */ + + +int +aio_cancel(aio_result_t * resultp) +{ + aio_request_t *request = resultp->_data; + + if (request && request->resultp == resultp) { + debug(41, 9) ("aio_cancel: %p type=%d result=%p\n", + request, request->request_type, request->resultp); + request->cancelled = 1; + request->resultp = NULL; + resultp->_data = NULL; + return 0; + } + return 1; +} /* aio_cancel */ + + +int +aio_open(const char *path, int oflag, mode_t mode, aio_result_t * resultp) +{ + aio_request_t *requestp; + + if (!aio_initialised) + aio_init(); + requestp = memPoolAlloc(aio_request_pool); + requestp->path = (char *) aio_xstrdup(path); + requestp->oflag = oflag; + requestp->mode = mode; + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_OPEN; + requestp->cancelled = 0; + + aio_queue_request(requestp); + return 0; +} + + +static void +aio_do_open(aio_request_t * requestp) +{ + requestp->ret = open(requestp->path, requestp->oflag, requestp->mode); + requestp->err = errno; +} + + +int +aio_read(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * resultp) +{ + aio_request_t *requestp; + + if (!aio_initialised) + aio_init(); + requestp = memPoolAlloc(aio_request_pool); + requestp->fd = fd; + requestp->bufferp = bufp; + requestp->tmpbufp = (char *) aio_xmalloc(bufs); + requestp->buflen = bufs; + requestp->offset = offset; + requestp->whence = whence; + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_READ; + requestp->cancelled = 0; + + aio_queue_request(requestp); + return 0; +} + + +static void +aio_do_read(aio_request_t * requestp) +{ + lseek(requestp->fd, requestp->offset, requestp->whence); + requestp->ret = read(requestp->fd, requestp->tmpbufp, requestp->buflen); + requestp->err = errno; +} + + +int +aio_write(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * resultp) +{ + aio_request_t *requestp; + + if (!aio_initialised) + aio_init(); + requestp = memPoolAlloc(aio_request_pool); + requestp->fd = fd; + requestp->tmpbufp = (char *) aio_xmalloc(bufs); + xmemcpy(requestp->tmpbufp, bufp, bufs); + requestp->buflen = bufs; + requestp->offset = offset; + requestp->whence = whence; + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_WRITE; + requestp->cancelled = 0; + + aio_queue_request(requestp); + return 0; +} + + +static void +aio_do_write(aio_request_t * requestp) +{ + requestp->ret = write(requestp->fd, requestp->tmpbufp, requestp->buflen); + requestp->err = errno; +} + + +int +aio_close(int fd, aio_result_t * resultp) +{ + aio_request_t *requestp; + + if (!aio_initialised) + aio_init(); + requestp = memPoolAlloc(aio_request_pool); + requestp->fd = fd; + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_CLOSE; + requestp->cancelled = 0; + + aio_queue_request(requestp); + return 0; +} + + +static void +aio_do_close(aio_request_t * requestp) +{ + requestp->ret = close(requestp->fd); + requestp->err = errno; +} + + +int +aio_stat(const char *path, struct stat *sb, aio_result_t * resultp) +{ + aio_request_t *requestp; + + if (!aio_initialised) + aio_init(); + requestp = memPoolAlloc(aio_request_pool); + requestp->path = (char *) aio_xstrdup(path); + requestp->statp = sb; + requestp->tmpstatp = (struct stat *) aio_xmalloc(sizeof(struct stat)); + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_STAT; + requestp->cancelled = 0; + + aio_queue_request(requestp); + return 0; +} + + +static void +aio_do_stat(aio_request_t * requestp) +{ + requestp->ret = stat(requestp->path, requestp->tmpstatp); + requestp->err = errno; +} + + +int +aio_unlink(const char *path, aio_result_t * resultp) +{ + aio_request_t *requestp; + + if (!aio_initialised) + aio_init(); + requestp = memPoolAlloc(aio_request_pool); + requestp->path = aio_xstrdup(path); + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_UNLINK; + requestp->cancelled = 0; + + aio_queue_request(requestp); + return 0; +} + + +static void +aio_do_unlink(aio_request_t * requestp) +{ + requestp->ret = unlink(requestp->path); + requestp->err = errno; +} + +int +aio_truncate(const char *path, off_t length, aio_result_t * resultp) +{ + aio_request_t *requestp; + + if (!aio_initialised) + aio_init(); + requestp = memPoolAlloc(aio_request_pool); + requestp->path = (char *) aio_xstrdup(path); + requestp->offset = length; + requestp->resultp = resultp; + requestp->request_type = _AIO_OP_TRUNCATE; + requestp->cancelled = 0; + + aio_queue_request(requestp); + return 0; +} + + +static void +aio_do_truncate(aio_request_t * requestp) +{ + requestp->ret = truncate(requestp->path, requestp->offset); + requestp->err = errno; +} + + +#if AIO_OPENDIR +/* XXX aio_opendir NOT implemented yet.. */ + +int +aio_opendir(const char *path, aio_result_t * resultp) +{ + aio_request_t *requestp; + int len; + + if (!aio_initialised) + aio_init(); + requestp = memPoolAlloc(aio_request_pool); + return -1; +} + +static void +aio_do_opendir(aio_request_t * requestp) +{ + /* NOT IMPLEMENTED */ +} + +#endif + +static void +aio_poll_queues(void) +{ + /* kick "overflow" request queue */ + if (request_queue2.head && +/* + pthread_mutex_trylock(&request_queue.mutex) == 0) { +*/ + (WaitForSingleObject(request_queue.mutex, 0)==WAIT_OBJECT_0)) { + *request_queue.tailp = request_queue2.head; + request_queue.tailp = request_queue2.tailp; + /* + pthread_cond_signal(&request_queue.cond); + pthread_mutex_unlock(&request_queue.mutex); + */ + if (!SetEvent(request_queue.cond)) fatal("couldn't push queue\n"); + if (!ReleaseMutex(request_queue.mutex)) { + /* unexpected error */ + } + request_queue2.head = NULL; + request_queue2.tailp = &request_queue2.head; + } + /* poll done queue */ + if (done_queue.head && +/* pthread_mutex_trylock(&done_queue.mutex) == 0) { +*/ + (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) { + struct aio_request_t *requests = done_queue.head; + done_queue.head = NULL; + done_queue.tailp = &done_queue.head; + /* + pthread_mutex_unlock(&done_queue.mutex); + */ + if (!ReleaseMutex(done_queue.mutex)) { + /* unexpected error */ + } + *done_requests.tailp = requests; + request_queue_len -= 1; + while (requests->next) { + requests = requests->next; + request_queue_len -= 1; + } + done_requests.tailp = &requests->next; + } + /* Give up the CPU to allow the threads to do their work */ + /* + * For Andres thoughts about yield(), see + * http://www.squid-cache.org/mail-archive/squid-dev/200012/0001.html + */ +#if 0 + if (done_queue.head || request_queue.head) +#ifndef _SQUID_SOLARIS_ + sched_yield(); +#else + yield(); +#endif +#endif +} + +aio_result_t * +aio_poll_done(void) +{ + aio_request_t *request; + aio_result_t *resultp; + int cancelled; + int polled = 0; + + AIO_REPOLL: + request = done_requests.head; + if (request == NULL && !polled) { + aio_poll_queues(); + polled = 1; + request = done_requests.head; + } + if (!request) { + return NULL; + } + debug(41, 9) ("aio_poll_done: %p type=%d result=%p\n", + request, request->request_type, request->resultp); + done_requests.head = request->next; + if (!done_requests.head) + done_requests.tailp = &done_requests.head; + resultp = request->resultp; + cancelled = request->cancelled; + aio_debug(request); + debug(43, 5) ("DONE: %d -> %d\n", request->ret, request->err); + aio_cleanup_request(request); + if (cancelled) + goto AIO_REPOLL; + return resultp; +} /* aio_poll_done */ + +int +aio_operations_pending(void) +{ + return request_queue_len + (done_requests.head ? 1 : 0); +} + +int +aio_sync(void) +{ + /* XXX This might take a while if the queue is large.. */ + do { + aio_poll_queues(); + } while (request_queue_len > 0); + return aio_operations_pending(); +} + +int +aio_get_queue_len(void) +{ + return request_queue_len; +} + +static void +aio_debug(aio_request_t * request) +{ + switch (request->request_type) { + case _AIO_OP_OPEN: + debug(43, 5) ("OPEN of %s to FD %d\n", request->path, request->ret); + break; + case _AIO_OP_READ: + debug(43, 5) ("READ on fd: %d\n", request->fd); + break; + case _AIO_OP_WRITE: + debug(43, 5) ("WRITE on fd: %d\n", request->fd); + break; + case _AIO_OP_CLOSE: + debug(43, 5) ("CLOSE of fd: %d\n", request->fd); + break; + case _AIO_OP_UNLINK: + debug(43, 5) ("UNLINK of %s\n", request->path); + break; + case _AIO_OP_TRUNCATE: + debug(43, 5) ("UNLINK of %s\n", request->path); + break; + default: + break; + } +} --- /dev/null Wed Feb 14 00:50:05 2007 +++ squid/src/fs/awin32/async_io.c Wed Feb 14 00:51:28 2007 @@ -0,0 +1,365 @@ + +/* + * $Id: async_io.c,v 1.1.2.1 2001/03/15 06:04:19 rbcollins Exp $ + * + * DEBUG: section 32 Asynchronous Disk I/O + * AUTHOR: Pete Bentley + * AUTHOR: Stewart Forster + * + * SQUID Web Proxy Cache http://www.squid-cache.org/ + * ---------------------------------------------------------- + * + * Squid is the result of efforts by numerous individuals from + * the Internet community; see the CONTRIBUTORS file for full + * details. Many organizations have provided support for Squid's + * development; see the SPONSORS file for full details. Squid is + * Copyrighted (C) 2001 by the Regents of the University of + * California; see the COPYRIGHT file for full details. Squid + * incorporates software developed and/or copyrighted by other + * sources; see the CREDITS file for full details. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. + * + */ + +#include "squid.h" +#include "store_asyncufs.h" + +#define _AIO_OPEN 0 +#define _AIO_READ 1 +#define _AIO_WRITE 2 +#define _AIO_CLOSE 3 +#define _AIO_UNLINK 4 +#define _AIO_TRUNCATE 4 +#define _AIO_OPENDIR 5 +#define _AIO_STAT 6 + +typedef struct aio_ctrl_t { + struct aio_ctrl_t *next; + int fd; + int operation; + AIOCB *done_handler; + void *done_handler_data; + aio_result_t result; + char *bufp; + FREE *free_func; + dlink_node node; +} aio_ctrl_t; + +struct { + int open; + int close; + int cancel; + int write; + int read; + int stat; + int unlink; + int check_callback; +} aio_counts; + +typedef struct aio_unlinkq_t { + char *path; + struct aio_unlinkq_t *next; +} aio_unlinkq_t; + +static dlink_list used_list; +static int initialised = 0; +static OBJH aioStats; +static MemPool *aio_ctrl_pool; +static void aioFDWasClosed(int fd); + +static void +aioFDWasClosed(int fd) +{ + if (fd_table[fd].flags.closing) + fd_close(fd); +} + +void +aioInit(void) +{ + if (initialised) + return; + aio_ctrl_pool = memPoolCreate("aio_ctrl", sizeof(aio_ctrl_t)); + cachemgrRegister("aio_counts", "Async IO Function Counters", + aioStats, 0, 1); + initialised = 1; + comm_quick_poll_required(); +} + +void +aioDone(void) +{ + memPoolDestroy(aio_ctrl_pool); + initialised = 0; +} + +void +aioOpen(const char *path, int oflag, mode_t mode, AIOCB * callback, void *callback_data) +{ + aio_ctrl_t *ctrlp; + + assert(initialised); + aio_counts.open++; + ctrlp = memPoolAlloc(aio_ctrl_pool); + ctrlp->fd = -2; + ctrlp->done_handler = callback; + ctrlp->done_handler_data = callback_data; + ctrlp->operation = _AIO_OPEN; + cbdataLock(callback_data); + ctrlp->result.data = ctrlp; + aio_open(path, oflag, mode, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); + return; +} + +void +aioClose(int fd) +{ + aio_ctrl_t *ctrlp; + + assert(initialised); + aio_counts.close++; + aioCancel(fd); + ctrlp = memPoolAlloc(aio_ctrl_pool); + ctrlp->fd = fd; + ctrlp->done_handler = NULL; + ctrlp->done_handler_data = NULL; + ctrlp->operation = _AIO_CLOSE; + ctrlp->result.data = ctrlp; + aio_close(fd, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); + return; +} + +void +aioCancel(int fd) +{ + aio_ctrl_t *curr; + AIOCB *done_handler; + void *their_data; + dlink_node *m, *next; + + assert(initialised); + aio_counts.cancel++; + for (m = used_list.head; m; m = next) { + while (m) { + curr = m->data; + if (curr->fd == fd) + break; + m = m->next; + } + if (m == NULL) + break; + + aio_cancel(&curr->result); + + if ((done_handler = curr->done_handler)) { + their_data = curr->done_handler_data; + curr->done_handler = NULL; + curr->done_handler_data = NULL; + debug(0, 0) ("this be aioCancel\n"); + if (cbdataValid(their_data)) + done_handler(fd, their_data, -2, -2); + cbdataUnlock(their_data); + } + next = m->next; + dlinkDelete(m, &used_list); + memPoolFree(aio_ctrl_pool, curr); + } +} + + +void +aioWrite(int fd, int offset, char *bufp, int len, AIOCB * callback, void *callback_data, FREE * free_func) +{ + aio_ctrl_t *ctrlp; + int seekmode; + + assert(initialised); + aio_counts.write++; + ctrlp = memPoolAlloc(aio_ctrl_pool); + ctrlp->fd = fd; + ctrlp->done_handler = callback; + ctrlp->done_handler_data = callback_data; + ctrlp->operation = _AIO_WRITE; + ctrlp->bufp = bufp; + ctrlp->free_func = free_func; + if (offset >= 0) + seekmode = SEEK_SET; + else { + seekmode = SEEK_END; + offset = 0; + } + cbdataLock(callback_data); + ctrlp->result.data = ctrlp; + aio_write(fd, bufp, len, offset, seekmode, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); +} /* aioWrite */ + + +void +aioRead(int fd, int offset, char *bufp, int len, AIOCB * callback, void *callback_data) +{ + aio_ctrl_t *ctrlp; + int seekmode; + + assert(initialised); + aio_counts.read++; + ctrlp = memPoolAlloc(aio_ctrl_pool); + ctrlp->fd = fd; + ctrlp->done_handler = callback; + ctrlp->done_handler_data = callback_data; + ctrlp->operation = _AIO_READ; + if (offset >= 0) + seekmode = SEEK_SET; + else { + seekmode = SEEK_CUR; + offset = 0; + } + cbdataLock(callback_data); + ctrlp->result.data = ctrlp; + aio_read(fd, bufp, len, offset, seekmode, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); + return; +} /* aioRead */ + +void +aioStat(char *path, struct stat *sb, AIOCB * callback, void *callback_data) +{ + aio_ctrl_t *ctrlp; + + assert(initialised); + aio_counts.stat++; + ctrlp = memPoolAlloc(aio_ctrl_pool); + ctrlp->fd = -2; + ctrlp->done_handler = callback; + ctrlp->done_handler_data = callback_data; + ctrlp->operation = _AIO_STAT; + cbdataLock(callback_data); + ctrlp->result.data = ctrlp; + aio_stat(path, sb, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); + return; +} /* aioStat */ + +void +aioUnlink(const char *path, AIOCB * callback, void *callback_data) +{ + aio_ctrl_t *ctrlp; + assert(initialised); + aio_counts.unlink++; + ctrlp = memPoolAlloc(aio_ctrl_pool); + ctrlp->fd = -2; + ctrlp->done_handler = callback; + ctrlp->done_handler_data = callback_data; + ctrlp->operation = _AIO_UNLINK; + cbdataLock(callback_data); + ctrlp->result.data = ctrlp; + aio_unlink(path, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); +} /* aioUnlink */ + +void +aioTruncate(const char *path, off_t length, AIOCB * callback, void *callback_data) +{ + aio_ctrl_t *ctrlp; + assert(initialised); + aio_counts.unlink++; + ctrlp = memPoolAlloc(aio_ctrl_pool); + ctrlp->fd = -2; + ctrlp->done_handler = callback; + ctrlp->done_handler_data = callback_data; + ctrlp->operation = _AIO_TRUNCATE; + cbdataLock(callback_data); + ctrlp->result.data = ctrlp; + aio_truncate(path, length, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); +} /* aioTruncate */ + + +int +aioCheckCallbacks(SwapDir * SD) +{ + aio_result_t *resultp; + aio_ctrl_t *ctrlp; + AIOCB *done_handler; + void *their_data; + int retval = 0; + + assert(initialised); + aio_counts.check_callback++; + for (;;) { + if ((resultp = aio_poll_done()) == NULL) + break; + ctrlp = (aio_ctrl_t *) resultp->data; + if (ctrlp == NULL) + continue; /* XXX Should not happen */ + dlinkDelete(&ctrlp->node, &used_list); + if ((done_handler = ctrlp->done_handler)) { + their_data = ctrlp->done_handler_data; + ctrlp->done_handler = NULL; + ctrlp->done_handler_data = NULL; + if (cbdataValid(their_data)) { + retval = 1; /* Return that we've actually done some work */ + done_handler(ctrlp->fd, their_data, + ctrlp->result.aio_return, ctrlp->result.aio_errno); + } + cbdataUnlock(their_data); + } + /* free data if requested to aioWrite() */ + if (ctrlp->free_func) + ctrlp->free_func(ctrlp->bufp); + if (ctrlp->operation == _AIO_CLOSE) + aioFDWasClosed(ctrlp->fd); + memPoolFree(aio_ctrl_pool, ctrlp); + } + return retval; +} + +void +aioStats(StoreEntry * sentry) +{ + storeAppendPrintf(sentry, "ASYNC IO Counters:\n"); + storeAppendPrintf(sentry, "open\t%d\n", aio_counts.open); + storeAppendPrintf(sentry, "close\t%d\n", aio_counts.close); + storeAppendPrintf(sentry, "cancel\t%d\n", aio_counts.cancel); + storeAppendPrintf(sentry, "write\t%d\n", aio_counts.write); + storeAppendPrintf(sentry, "read\t%d\n", aio_counts.read); + storeAppendPrintf(sentry, "stat\t%d\n", aio_counts.stat); + storeAppendPrintf(sentry, "unlink\t%d\n", aio_counts.unlink); + storeAppendPrintf(sentry, "check_callback\t%d\n", aio_counts.check_callback); + storeAppendPrintf(sentry, "queue\t%d\n", aio_get_queue_len()); +} + +/* Flush all pending I/O */ +void +aioSync(SwapDir * SD) +{ + if (!initialised) + return; /* nothing to do then */ + /* Flush all pending operations */ + debug(32, 1) ("aioSync: flushing pending I/O operations\n"); + do { + aioCheckCallbacks(SD); + } while (aio_sync()); + debug(32, 1) ("aioSync: done\n"); +} + +int +aioQueueSize(void) +{ + return memPoolInUseCount(aio_ctrl_pool); +} --- /dev/null Wed Feb 14 00:50:05 2007 +++ squid/src/fs/awin32/store_asyncufs.h Wed Feb 14 00:51:28 2007 @@ -0,0 +1,132 @@ +/* + * store_aufs.h + * + * Internal declarations for the aufs routines + */ + +#ifndef __STORE_ASYNCUFS_H__ +#define __STORE_ASYNCUFS_H__ + +#ifdef ASYNC_IO_THREADS +#define NUMTHREADS ASYNC_IO_THREADS +#else +#define NUMTHREADS (Config.cacheSwap.n_configured*16) +#endif + +/* Queue limit where swapouts are deferred (load calculation) */ +#define MAGIC1 (NUMTHREADS*Config.cacheSwap.n_configured*5) +/* Queue limit where swapins are deferred (open/create fails) */ +#define MAGIC2 (NUMTHREADS*Config.cacheSwap.n_configured*20) + +/* Which operations to run async */ +#define ASYNC_OPEN 1 +#define ASYNC_CLOSE 0 +#define ASYNC_CREATE 1 +#define ASYNC_WRITE 0 +#define ASYNC_READ 1 + +struct _aio_result_t { + int aio_return; + int aio_errno; + void *_data; /* Internal housekeeping */ + void *data; /* Available to the caller */ +}; + +typedef struct _aio_result_t aio_result_t; + +typedef void AIOCB(int fd, void *, int aio_return, int aio_errno); + +int aio_cancel(aio_result_t *); +int aio_open(const char *, int, mode_t, aio_result_t *); +int aio_read(int, char *, int, off_t, int, aio_result_t *); +int aio_write(int, char *, int, off_t, int, aio_result_t *); +int aio_close(int, aio_result_t *); +int aio_stat(const char *, struct stat *, aio_result_t *); +int aio_unlink(const char *, aio_result_t *); +int aio_truncate(const char *, off_t length, aio_result_t *); +int aio_opendir(const char *, aio_result_t *); +aio_result_t *aio_poll_done(void); +int aio_operations_pending(void); +int aio_sync(void); +int aio_get_queue_len(void); + +void aioInit(void); +void aioDone(void); +void aioCancel(int); +void aioOpen(const char *, int, mode_t, AIOCB *, void *); +void aioClose(int); +void aioWrite(int, int offset, char *, int size, AIOCB *, void *, FREE *); +void aioRead(int, int offset, char *, int size, AIOCB *, void *); +void aioStat(char *, struct stat *, AIOCB *, void *); +void aioUnlink(const char *, AIOCB *, void *); +void aioTruncate(const char *, off_t length, AIOCB *, void *); +int aioCheckCallbacks(SwapDir *); +void aioSync(SwapDir *); +int aioQueueSize(void); + +struct _aioinfo_t { + int swaplog_fd; + int l1; + int l2; + fileMap *map; + int suggest; +}; + +struct _aiostate_t { + int fd; + struct { + unsigned int close_request:1; + unsigned int reading:1; + unsigned int writing:1; + unsigned int opening:1; + unsigned int write_kicking:1; + unsigned int read_kicking:1; + unsigned int inreaddone:1; + } flags; + const char *read_buf; + link_list *pending_writes; + link_list *pending_reads; +}; + +struct _queued_write { + char *buf; + size_t size; + off_t offset; + FREE *free_func; +}; + +struct _queued_read { + char *buf; + size_t size; + off_t offset; + STRCB *callback; + void *callback_data; +}; + +typedef struct _aioinfo_t aioinfo_t; +typedef struct _aiostate_t aiostate_t; + +/* The aio_state memory pools */ +extern MemPool *aio_state_pool; +extern MemPool *aio_qread_pool; +extern MemPool *aio_qwrite_pool; + +extern void storeAufsDirMapBitReset(SwapDir *, sfileno); +extern int storeAufsDirMapBitAllocate(SwapDir *); + +extern char *storeAufsDirFullPath(SwapDir * SD, sfileno filn, char *fullpath); +extern void storeAufsDirUnlinkFile(SwapDir *, sfileno); +extern void storeAufsDirReplAdd(SwapDir * SD, StoreEntry *); +extern void storeAufsDirReplRemove(StoreEntry *); + +/* + * Store IO stuff + */ +extern STOBJCREATE storeAufsCreate; +extern STOBJOPEN storeAufsOpen; +extern STOBJCLOSE storeAufsClose; +extern STOBJREAD storeAufsRead; +extern STOBJWRITE storeAufsWrite; +extern STOBJUNLINK storeAufsUnlink; + +#endif --- /dev/null Wed Feb 14 00:50:05 2007 +++ squid/src/fs/awin32/store_dir_aufs.c Wed Feb 14 00:51:28 2007 @@ -0,0 +1,1718 @@ + +/* + * $Id: store_dir_aufs.c,v 1.1.2.1 2001/03/15 06:04:19 rbcollins Exp $ + * + * DEBUG: section 47 Store Directory Routines + * AUTHOR: Duane Wessels + * + * SQUID Web Proxy Cache http://www.squid-cache.org/ + * ---------------------------------------------------------- + * + * Squid is the result of efforts by numerous individuals from + * the Internet community; see the CONTRIBUTORS file for full + * details. Many organizations have provided support for Squid's + * development; see the SPONSORS file for full details. Squid is + * Copyrighted (C) 2001 by the Regents of the University of + * California; see the COPYRIGHT file for full details. Squid + * incorporates software developed and/or copyrighted by other + * sources; see the CREDITS file for full details. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. + * + */ + +#include "squid.h" + +#include "store_asyncufs.h" + +#define DefaultLevelOneDirs 16 +#define DefaultLevelTwoDirs 256 +#define STORE_META_BUFSZ 4096 + +typedef struct _RebuildState RebuildState; +struct _RebuildState { + SwapDir *sd; + int n_read; + FILE *log; + int speed; + int curlvl1; + int curlvl2; + struct { + unsigned int need_to_validate:1; + unsigned int clean:1; + unsigned int init:1; + } flags; + int done; + int in_dir; + int fn; + struct dirent *entry; + DIR *td; + char fullpath[SQUID_MAXPATHLEN]; + char fullfilename[SQUID_MAXPATHLEN]; + struct _store_rebuild_data counts; +}; + +static int n_asyncufs_dirs = 0; +static int *asyncufs_dir_index = NULL; +MemPool *aio_state_pool = NULL; +MemPool *aio_qread_pool = NULL; +MemPool *aio_qwrite_pool = NULL; +static int asyncufs_initialised = 0; + +static char *storeAufsDirSwapSubDir(SwapDir *, int subdirn); +static int storeAufsDirCreateDirectory(const char *path, int); +static int storeAufsDirVerifyCacheDirs(SwapDir *); +static int storeAufsDirVerifyDirectory(const char *path); +static void storeAufsDirCreateSwapSubDirs(SwapDir *); +static char *storeAufsDirSwapLogFile(SwapDir *, const char *); +static EVH storeAufsDirRebuildFromDirectory; +static EVH storeAufsDirRebuildFromSwapLog; +static int storeAufsDirGetNextFile(RebuildState *, int *sfileno, int *size); +static StoreEntry *storeAufsDirAddDiskRestore(SwapDir * SD, const cache_key * key, + int file_number, + size_t swap_file_sz, + time_t expires, + time_t timestamp, + time_t lastref, + time_t lastmod, + u_num32 refcount, + u_short flags, + int clean); +static void storeAufsDirRebuild(SwapDir * sd); +static void storeAufsDirCloseTmpSwapLog(SwapDir * sd); +static FILE *storeAufsDirOpenTmpSwapLog(SwapDir *, int *, int *); +static STLOGOPEN storeAufsDirOpenSwapLog; +static STINIT storeAufsDirInit; +static STFREE storeAufsDirFree; +static STLOGCLEANSTART storeAufsDirWriteCleanStart; +static STLOGCLEANNEXTENTRY storeAufsDirCleanLogNextEntry; +static STLOGCLEANWRITE storeAufsDirWriteCleanEntry; +static STLOGCLEANDONE storeAufsDirWriteCleanDone; +static STLOGCLOSE storeAufsDirCloseSwapLog; +static STLOGWRITE storeAufsDirSwapLog; +static STNEWFS storeAufsDirNewfs; +static STDUMP storeAufsDirDump; +static STMAINTAINFS storeAufsDirMaintain; +static STCHECKOBJ storeAufsDirCheckObj; +static STREFOBJ storeAufsDirRefObj; +static STUNREFOBJ storeAufsDirUnrefObj; +static QS rev_int_sort; +static int storeAufsDirClean(int swap_index); +static EVH storeAufsDirCleanEvent; +static int storeAufsDirIs(SwapDir * sd); +static int storeAufsFilenoBelongsHere(int fn, int F0, int F1, int F2); +static int storeAufsCleanupDoubleCheck(SwapDir *, StoreEntry *); +static void storeAufsDirStats(SwapDir *, StoreEntry *); +static void storeAufsDirInitBitmap(SwapDir *); +static int storeAufsDirValidFileno(SwapDir *, sfileno, int); + +/* The MAIN externally visible function */ +STSETUP storeFsSetup_awin32; + +/* + * These functions were ripped straight out of the heart of store_dir.c. + * They assume that the given filenum is on a asyncufs partiton, which may or + * may not be true.. + * XXX this evilness should be tidied up at a later date! + */ + +static int +storeAufsDirMapBitTest(SwapDir * SD, int fn) +{ + sfileno filn = fn; + aioinfo_t *aioinfo; + aioinfo = (aioinfo_t *) SD->fsdata; + return file_map_bit_test(aioinfo->map, filn); +} + +static void +storeAufsDirMapBitSet(SwapDir * SD, int fn) +{ + sfileno filn = fn; + aioinfo_t *aioinfo; + aioinfo = (aioinfo_t *) SD->fsdata; + file_map_bit_set(aioinfo->map, filn); +} + +void +storeAufsDirMapBitReset(SwapDir * SD, int fn) +{ + sfileno filn = fn; + aioinfo_t *aioinfo; + aioinfo = (aioinfo_t *) SD->fsdata; + /* + * We have to test the bit before calling file_map_bit_reset. + * file_map_bit_reset doesn't do bounds checking. It assumes + * filn is a valid file number, but it might not be because + * the map is dynamic in size. Also clearing an already clear + * bit puts the map counter of-of-whack. + */ + if (file_map_bit_test(aioinfo->map, filn)) + file_map_bit_reset(aioinfo->map, filn); +} + +int +storeAufsDirMapBitAllocate(SwapDir * SD) +{ + aioinfo_t *aioinfo = (aioinfo_t *) SD->fsdata; + int fn; + fn = file_map_allocate(aioinfo->map, aioinfo->suggest); + file_map_bit_set(aioinfo->map, fn); + aioinfo->suggest = fn + 1; + return fn; +} + +/* + * Initialise the asyncufs bitmap + * + * If there already is a bitmap, and the numobjects is larger than currently + * configured, we allocate a new bitmap and 'grow' the old one into it. + */ +static void +storeAufsDirInitBitmap(SwapDir * sd) +{ + aioinfo_t *aioinfo = (aioinfo_t *) sd->fsdata; + + if (aioinfo->map == NULL) { + /* First time */ + aioinfo->map = file_map_create(); + } else if (aioinfo->map->max_n_files) { + /* it grew, need to expand */ + /* XXX We don't need it anymore .. */ + } + /* else it shrunk, and we leave the old one in place */ +} + +static char * +storeAufsDirSwapSubDir(SwapDir * sd, int subdirn) +{ + aioinfo_t *aioinfo = (aioinfo_t *) sd->fsdata; + + LOCAL_ARRAY(char, fullfilename, SQUID_MAXPATHLEN); + assert(0 <= subdirn && subdirn < aioinfo->l1); + snprintf(fullfilename, SQUID_MAXPATHLEN, "%s/%02X", sd->path, subdirn); + return fullfilename; +} + +static int +storeAufsDirCreateDirectory(const char *path, int should_exist) +{ + int created = 0; + struct stat st; + getCurrentTime(); + if (0 == stat(path, &st)) { + if (S_ISDIR(st.st_mode)) { + debug(20, should_exist ? 3 : 1) ("%s exists\n", path); + } else { + fatalf("Swap directory %s is not a directory.", path); + } + } else if (0 == mkdir(path, 0755)) { + debug(20, should_exist ? 1 : 3) ("%s created\n", path); + created = 1; + } else { + fatalf("Failed to make swap directory %s: %s", + path, xstrerror()); + } + return created; +} + +static int +storeAufsDirVerifyDirectory(const char *path) +{ + struct stat sb; + if (stat(path, &sb) < 0) { + debug(20, 0) ("%s: %s\n", path, xstrerror()); + return -1; + } + if (S_ISDIR(sb.st_mode) == 0) { + debug(20, 0) ("%s is not a directory\n", path); + return -1; + } + return 0; +} + +/* + * This function is called by storeAufsDirInit(). If this returns < 0, + * then Squid exits, complains about swap directories not + * existing, and instructs the admin to run 'squid -z' + */ +static int +storeAufsDirVerifyCacheDirs(SwapDir * sd) +{ + aioinfo_t *aioinfo = (aioinfo_t *) sd->fsdata; + int j; + const char *path = sd->path; + + if (storeAufsDirVerifyDirectory(path) < 0) + return -1; + for (j = 0; j < aioinfo->l1; j++) { + path = storeAufsDirSwapSubDir(sd, j); + if (storeAufsDirVerifyDirectory(path) < 0) + return -1; + } + return 0; +} + +static void +storeAufsDirCreateSwapSubDirs(SwapDir * sd) +{ + aioinfo_t *aioinfo = (aioinfo_t *) sd->fsdata; + int i, k; + int should_exist; + LOCAL_ARRAY(char, name, MAXPATHLEN); + for (i = 0; i < aioinfo->l1; i++) { + snprintf(name, MAXPATHLEN, "%s/%02X", sd->path, i); + if (storeAufsDirCreateDirectory(name, 0)) + should_exist = 0; + else + should_exist = 1; + debug(47, 1) ("Making directories in %s\n", name); + for (k = 0; k < aioinfo->l2; k++) { + snprintf(name, MAXPATHLEN, "%s/%02X/%02X", sd->path, i, k); + storeAufsDirCreateDirectory(name, should_exist); + } + } +} + +static char * +storeAufsDirSwapLogFile(SwapDir * sd, const char *ext) +{ + LOCAL_ARRAY(char, path, SQUID_MAXPATHLEN); + LOCAL_ARRAY(char, pathtmp, SQUID_MAXPATHLEN); + LOCAL_ARRAY(char, digit, 32); + char *pathtmp2; + if (Config.Log.swap) { + xstrncpy(pathtmp, sd->path, SQUID_MAXPATHLEN - 64); + while (index(pathtmp, '/')) + *index(pathtmp, '/') = '.'; + while (strlen(pathtmp) && pathtmp[strlen(pathtmp) - 1] == '.') + pathtmp[strlen(pathtmp) - 1] = '\0'; + for (pathtmp2 = pathtmp; *pathtmp2 == '.'; pathtmp2++); + snprintf(path, SQUID_MAXPATHLEN - 64, Config.Log.swap, pathtmp2); + if (strncmp(path, Config.Log.swap, SQUID_MAXPATHLEN - 64) == 0) { + strcat(path, "."); + snprintf(digit, 32, "%02d", sd->index); + strncat(path, digit, 3); + } + } else { + xstrncpy(path, sd->path, SQUID_MAXPATHLEN - 64); + strcat(path, "/swap.state"); + } + if (ext) + strncat(path, ext, 16); + return path; +} + +static void +storeAufsDirOpenSwapLog(SwapDir * sd) +{ + aioinfo_t *aioinfo = (aioinfo_t *) sd->fsdata; + char *path; + int fd; + path = storeAufsDirSwapLogFile(sd, NULL); + fd = file_open(path, O_WRONLY | O_CREAT); + if (fd < 0) { + debug(50, 1) ("%s: %s\n", path, xstrerror()); + fatal("storeAufsDirOpenSwapLog: Failed to open swap log."); + } + debug(47, 3) ("Cache Dir #%d log opened on FD %d\n", sd->index, fd); + aioinfo->swaplog_fd = fd; + if (0 == n_asyncufs_dirs) + assert(NULL == asyncufs_dir_index); + n_asyncufs_dirs++; + assert(n_asyncufs_dirs <= Config.cacheSwap.n_configured); +} + +static void +storeAufsDirCloseSwapLog(SwapDir * sd) +{ + aioinfo_t *aioinfo = (aioinfo_t *) sd->fsdata; + if (aioinfo->swaplog_fd < 0) /* not open */ + return; + file_close(aioinfo->swaplog_fd); + debug(47, 3) ("Cache Dir #%d log closed on FD %d\n", + sd->index, aioinfo->swaplog_fd); + aioinfo->swaplog_fd = -1; + n_asyncufs_dirs--; + assert(n_asyncufs_dirs >= 0); + if (0 == n_asyncufs_dirs) + safe_free(asyncufs_dir_index); +} + +static void +storeAufsDirInit(SwapDir * sd) +{ + static int started_clean_event = 0; + static const char *errmsg = + "\tFailed to verify one of the swap directories, Check cache.log\n" + "\tfor details. Run 'squid -z' to create swap directories\n" + "\tif needed, or if running Squid for the first time."; + storeAufsDirInitBitmap(sd); + if (storeAufsDirVerifyCacheDirs(sd) < 0) + fatal(errmsg); + storeAufsDirOpenSwapLog(sd); + storeAufsDirRebuild(sd); + if (!started_clean_event) { + eventAdd("storeDirClean", storeAufsDirCleanEvent, NULL, 15.0, 1); + started_clean_event = 1; + } + (void) storeDirGetBlkSize(sd->path, &sd->fs.blksize); +} + +static void +storeAufsDirRebuildFromDirectory(void *data) +{ + RebuildState *rb = data; + SwapDir *SD = rb->sd; + LOCAL_ARRAY(char, hdr_buf, SM_PAGE_SIZE); + StoreEntry *e = NULL; + StoreEntry tmpe; + cache_key key[MD5_DIGEST_CHARS]; + int sfileno = 0; + int count; + int size; + struct stat sb; + int swap_hdr_len; + int fd = -1; + tlv *tlv_list; + tlv *t; + assert(rb != NULL); + debug(20, 3) ("storeAufsDirRebuildFromDirectory: DIR #%d\n", rb->sd->index); + for (count = 0; count < rb->speed; count++) { + assert(fd == -1); + fd = storeAufsDirGetNextFile(rb, &sfileno, &size); + if (fd == -2) { + debug(20, 1) ("Done scanning %s swaplog (%d entries)\n", + rb->sd->path, rb->n_read); + store_dirs_rebuilding--; + storeAufsDirCloseTmpSwapLog(rb->sd); + storeRebuildComplete(&rb->counts); + cbdataFree(rb); + return; + } else if (fd < 0) { + continue; + } + assert(fd > -1); + /* lets get file stats here */ + if (fstat(fd, &sb) < 0) { + debug(20, 1) ("storeAufsDirRebuildFromDirectory: fstat(FD %d): %s\n", + fd, xstrerror()); + file_close(fd); + store_open_disk_fd--; + fd = -1; + continue; + } + if ((++rb->counts.scancount & 0xFFFF) == 0) + debug(20, 3) (" %s %7d files opened so far.\n", + rb->sd->path, rb->counts.scancount); + debug(20, 9) ("file_in: fd=%d %08X\n", fd, sfileno); + statCounter.syscalls.disk.reads++; + if (read(fd, hdr_buf, SM_PAGE_SIZE) < 0) { + debug(20, 1) ("storeAufsDirRebuildFromDirectory: read(FD %d): %s\n", + fd, xstrerror()); + file_close(fd); + store_open_disk_fd--; + fd = -1; + continue; + } + file_close(fd); + store_open_disk_fd--; + fd = -1; + swap_hdr_len = 0; +#if USE_TRUNCATE + if (sb.st_size == 0) + continue; +#endif + tlv_list = storeSwapMetaUnpack(hdr_buf, &swap_hdr_len); + if (tlv_list == NULL) { + debug(20, 1) ("storeAufsDirRebuildFromDirectory: failed to get meta data\n"); + /* XXX shouldn't this be a call to storeAufsUnlink ? */ + storeAufsDirUnlinkFile(SD, sfileno); + continue; + } + debug(20, 3) ("storeAufsDirRebuildFromDirectory: successful swap meta unpacking\n"); + memset(key, '\0', MD5_DIGEST_CHARS); + memset(&tmpe, '\0', sizeof(StoreEntry)); + for (t = tlv_list; t; t = t->next) { + switch (t->type) { + case STORE_META_KEY: + assert(t->length == MD5_DIGEST_CHARS); + xmemcpy(key, t->value, MD5_DIGEST_CHARS); + break; + case STORE_META_STD: + assert(t->length == STORE_HDR_METASIZE); + xmemcpy(&tmpe.timestamp, t->value, STORE_HDR_METASIZE); + break; + default: + break; + } + } + storeSwapTLVFree(tlv_list); + tlv_list = NULL; + if (storeKeyNull(key)) { + debug(20, 1) ("storeAufsDirRebuildFromDirectory: NULL key\n"); + storeAufsDirUnlinkFile(SD, sfileno); + continue; + } + tmpe.hash.key = key; + /* check sizes */ + if (tmpe.swap_file_sz == 0) { + tmpe.swap_file_sz = sb.st_size; + } else if (tmpe.swap_file_sz == sb.st_size - swap_hdr_len) { + tmpe.swap_file_sz = sb.st_size; + } else if (tmpe.swap_file_sz != sb.st_size) { + debug(20, 1) ("storeAufsDirRebuildFromDirectory: SIZE MISMATCH %d!=%d\n", + tmpe.swap_file_sz, (int) sb.st_size); + storeAufsDirUnlinkFile(SD, sfileno); + continue; + } + if (EBIT_TEST(tmpe.flags, KEY_PRIVATE)) { + storeAufsDirUnlinkFile(SD, sfileno); + rb->counts.badflags++; + continue; + } + e = storeGet(key); + if (e && e->lastref >= tmpe.lastref) { + /* key already exists, current entry is newer */ + /* keep old, ignore new */ + rb->counts.dupcount++; + continue; + } else if (NULL != e) { + /* URL already exists, this swapfile not being used */ + /* junk old, load new */ + storeRelease(e); /* release old entry */ + rb->counts.dupcount++; + } + rb->counts.objcount++; + storeEntryDump(&tmpe, 5); + e = storeAufsDirAddDiskRestore(SD, key, + sfileno, + tmpe.swap_file_sz, + tmpe.expires, + tmpe.timestamp, + tmpe.lastref, + tmpe.lastmod, + tmpe.refcount, /* refcount */ + tmpe.flags, /* flags */ + (int) rb->flags.clean); + storeDirSwapLog(e, SWAP_LOG_ADD); + } + eventAdd("storeRebuild", storeAufsDirRebuildFromDirectory, rb, 0.0, 1); +} + +static void +storeAufsDirRebuildFromSwapLog(void *data) +{ + RebuildState *rb = data; + SwapDir *SD = rb->sd; + StoreEntry *e = NULL; + storeSwapLogData s; + size_t ss = sizeof(storeSwapLogData); + int count; + int used; /* is swapfile already in use? */ + int disk_entry_newer; /* is the log entry newer than current entry? */ + double x; + assert(rb != NULL); + /* load a number of objects per invocation */ + for (count = 0; count < rb->speed; count++) { + if (fread(&s, ss, 1, rb->log) != 1) { + debug(20, 1) ("Done reading %s swaplog (%d entries)\n", + rb->sd->path, rb->n_read); + fclose(rb->log); + rb->log = NULL; + store_dirs_rebuilding--; + storeAufsDirCloseTmpSwapLog(rb->sd); + storeRebuildComplete(&rb->counts); + cbdataFree(rb); + return; + } + rb->n_read++; + if (s.op <= SWAP_LOG_NOP) + continue; + if (s.op >= SWAP_LOG_MAX) + continue; + /* + * BC: during 2.4 development, we changed the way swap file + * numbers are assigned and stored. The high 16 bits used + * to encode the SD index number. There used to be a call + * to storeDirProperFileno here that re-assigned the index + * bits. Now, for backwards compatibility, we just need + * to mask it off. + */ + s.swap_filen &= 0x00FFFFFF; + debug(20, 3) ("storeAufsDirRebuildFromSwapLog: %s %s %08X\n", + swap_log_op_str[(int) s.op], + storeKeyText(s.key), + s.swap_filen); + if (s.op == SWAP_LOG_ADD) { + (void) 0; + } else if (s.op == SWAP_LOG_DEL) { + if ((e = storeGet(s.key)) != NULL) { + /* + * Make sure we don't unlink the file, it might be + * in use by a subsequent entry. Also note that + * we don't have to subtract from store_swap_size + * because adding to store_swap_size happens in + * the cleanup procedure. + */ + storeExpireNow(e); + storeReleaseRequest(e); + if (e->swap_filen > -1) { + storeAufsDirReplRemove(e); + storeAufsDirMapBitReset(SD, e->swap_filen); + e->swap_filen = -1; + e->swap_dirn = -1; + } + storeRelease(e); + rb->counts.objcount--; + rb->counts.cancelcount++; + } + continue; + } else { + x = log(++rb->counts.bad_log_op) / log(10.0); + if (0.0 == x - (double) (int) x) + debug(20, 1) ("WARNING: %d invalid swap log entries found\n", + rb->counts.bad_log_op); + rb->counts.invalid++; + continue; + } + if ((++rb->counts.scancount & 0xFFF) == 0) { + struct stat sb; + if (0 == fstat(fileno(rb->log), &sb)) + storeRebuildProgress(SD->index, + (int) sb.st_size / ss, rb->n_read); + } + if (!storeAufsDirValidFileno(SD, s.swap_filen, 0)) { + rb->counts.invalid++; + continue; + } + if (EBIT_TEST(s.flags, KEY_PRIVATE)) { + rb->counts.badflags++; + continue; + } + e = storeGet(s.key); + used = storeAufsDirMapBitTest(SD, s.swap_filen); + /* If this URL already exists in the cache, does the swap log + * appear to have a newer entry? Compare 'lastref' from the + * swap log to e->lastref. */ + disk_entry_newer = e ? (s.lastref > e->lastref ? 1 : 0) : 0; + if (used && !disk_entry_newer) { + /* log entry is old, ignore it */ + rb->counts.clashcount++; + continue; + } else if (used && e && e->swap_filen == s.swap_filen && e->swap_dirn == SD->index) { + /* swapfile taken, same URL, newer, update meta */ + if (e->store_status == STORE_OK) { + e->lastref = s.timestamp; + e->timestamp = s.timestamp; + e->expires = s.expires; + e->lastmod = s.lastmod; + e->flags = s.flags; + e->refcount += s.refcount; + storeAufsDirUnrefObj(SD, e); + } else { + debug_trap("storeAufsDirRebuildFromSwapLog: bad condition"); + debug(20, 1) ("\tSee %s:%d\n", __FILE__, __LINE__); + } + continue; + } else if (used) { + /* swapfile in use, not by this URL, log entry is newer */ + /* This is sorta bad: the log entry should NOT be newer at this + * point. If the log is dirty, the filesize check should have + * caught this. If the log is clean, there should never be a + * newer entry. */ + debug(20, 1) ("WARNING: newer swaplog entry for dirno %d, fileno %08X\n", + SD->index, s.swap_filen); + /* I'm tempted to remove the swapfile here just to be safe, + * but there is a bad race condition in the NOVM version if + * the swapfile has recently been opened for writing, but + * not yet opened for reading. Because we can't map + * swapfiles back to StoreEntrys, we don't know the state + * of the entry using that file. */ + /* We'll assume the existing entry is valid, probably because + * were in a slow rebuild and the the swap file number got taken + * and the validation procedure hasn't run. */ + assert(rb->flags.need_to_validate); + rb->counts.clashcount++; + continue; + } else if (e && !disk_entry_newer) { + /* key already exists, current entry is newer */ + /* keep old, ignore new */ + rb->counts.dupcount++; + continue; + } else if (e) { + /* key already exists, this swapfile not being used */ + /* junk old, load new */ + storeExpireNow(e); + storeReleaseRequest(e); + if (e->swap_filen > -1) { + storeAufsDirReplRemove(e); + /* Make sure we don't actually unlink the file */ + storeAufsDirMapBitReset(SD, e->swap_filen); + e->swap_filen = -1; + e->swap_dirn = -1; + } + storeRelease(e); + rb->counts.dupcount++; + } else { + /* URL doesnt exist, swapfile not in use */ + /* load new */ + (void) 0; + } + /* update store_swap_size */ + rb->counts.objcount++; + e = storeAufsDirAddDiskRestore(SD, s.key, + s.swap_filen, + s.swap_file_sz, + s.expires, + s.timestamp, + s.lastref, + s.lastmod, + s.refcount, + s.flags, + (int) rb->flags.clean); + storeDirSwapLog(e, SWAP_LOG_ADD); + } + eventAdd("storeRebuild", storeAufsDirRebuildFromSwapLog, rb, 0.0, 1); +} + +static int +storeAufsDirGetNextFile(RebuildState * rb, int *sfileno, int *size) +{ + SwapDir *SD = rb->sd; + aioinfo_t *aioinfo = (aioinfo_t *) SD->fsdata; + int fd = -1; + int used = 0; + int dirs_opened = 0; + debug(20, 3) ("storeAufsDirGetNextFile: flag=%d, %d: /%02X/%02X\n", + rb->flags.init, + rb->sd->index, + rb->curlvl1, + rb->curlvl2); + if (rb->done) + return -2; + while (fd < 0 && rb->done == 0) { + fd = -1; + if (0 == rb->flags.init) { /* initialize, open first file */ + rb->done = 0; + rb->curlvl1 = 0; + rb->curlvl2 = 0; + rb->in_dir = 0; + rb->flags.init = 1; + assert(Config.cacheSwap.n_configured > 0); + } + if (0 == rb->in_dir) { /* we need to read in a new directory */ + snprintf(rb->fullpath, SQUID_MAXPATHLEN, "%s/%02X/%02X", + rb->sd->path, + rb->curlvl1, rb->curlvl2); + if (rb->flags.init && rb->td != NULL) + closedir(rb->td); + rb->td = NULL; + if (dirs_opened) + return -1; + rb->td = opendir(rb->fullpath); + dirs_opened++; + if (rb->td == NULL) { + debug(50, 1) ("storeAufsDirGetNextFile: opendir: %s: %s\n", + rb->fullpath, xstrerror()); + } else { + rb->entry = readdir(rb->td); /* skip . and .. */ + rb->entry = readdir(rb->td); + if (rb->entry == NULL && errno == ENOENT) + debug(20, 1) ("storeAufsDirGetNextFile: directory does not exist!.\n"); + debug(20, 3) ("storeAufsDirGetNextFile: Directory %s\n", rb->fullpath); + } + } + if (rb->td != NULL && (rb->entry = readdir(rb->td)) != NULL) { + rb->in_dir++; + if (sscanf(rb->entry->d_name, "%x", &rb->fn) != 1) { + debug(20, 3) ("storeAufsDirGetNextFile: invalid %s\n", + rb->entry->d_name); + continue; + } + if (!storeAufsFilenoBelongsHere(rb->fn, rb->sd->index, rb->curlvl1, rb->curlvl2)) { + debug(20, 3) ("storeAufsDirGetNextFile: %08X does not belong in %d/%d/%d\n", + rb->fn, rb->sd->index, rb->curlvl1, rb->curlvl2); + continue; + } + used = storeAufsDirMapBitTest(SD, rb->fn); + if (used) { + debug(20, 3) ("storeAufsDirGetNextFile: Locked, continuing with next.\n"); + continue; + } + snprintf(rb->fullfilename, SQUID_MAXPATHLEN, "%s/%s", + rb->fullpath, rb->entry->d_name); + debug(20, 3) ("storeAufsDirGetNextFile: Opening %s\n", rb->fullfilename); + fd = file_open(rb->fullfilename, O_RDONLY); + if (fd < 0) + debug(50, 1) ("storeAufsDirGetNextFile: %s: %s\n", rb->fullfilename, xstrerror()); + else + store_open_disk_fd++; + continue; + } + rb->in_dir = 0; + if (++rb->curlvl2 < aioinfo->l2) + continue; + rb->curlvl2 = 0; + if (++rb->curlvl1 < aioinfo->l1) + continue; + rb->curlvl1 = 0; + rb->done = 1; + } + *sfileno = rb->fn; + return fd; +} + +/* Add a new object to the cache with empty memory copy and pointer to disk + * use to rebuild store from disk. */ +static StoreEntry * +storeAufsDirAddDiskRestore(SwapDir * SD, const cache_key * key, + int file_number, + size_t swap_file_sz, + time_t expires, + time_t timestamp, + time_t lastref, + time_t lastmod, + u_num32 refcount, + u_short flags, + int clean) +{ + StoreEntry *e = NULL; + debug(20, 5) ("storeAufsAddDiskRestore: %s, fileno=%08X\n", storeKeyText(key), file_number); + /* if you call this you'd better be sure file_number is not + * already in use! */ + e = new_StoreEntry(STORE_ENTRY_WITHOUT_MEMOBJ, NULL, NULL); + e->store_status = STORE_OK; + storeSetMemStatus(e, NOT_IN_MEMORY); + e->swap_status = SWAPOUT_DONE; + e->swap_filen = file_number; + e->swap_dirn = SD->index; + e->swap_file_sz = swap_file_sz; + e->lock_count = 0; + e->lastref = lastref; + e->timestamp = timestamp; + e->expires = expires; + e->lastmod = lastmod; + e->refcount = refcount; + e->flags = flags; + EBIT_SET(e->flags, ENTRY_CACHABLE); + EBIT_CLR(e->flags, RELEASE_REQUEST); + EBIT_CLR(e->flags, KEY_PRIVATE); + e->ping_status = PING_NONE; + EBIT_CLR(e->flags, ENTRY_VALIDATED); + storeAufsDirMapBitSet(SD, e->swap_filen); + storeHashInsert(e, key); /* do it after we clear KEY_PRIVATE */ + storeAufsDirReplAdd(SD, e); + return e; +} + +CBDATA_TYPE(RebuildState); + +static void +storeAufsDirRebuild(SwapDir * sd) +{ + RebuildState *rb; + int clean = 0; + int zero = 0; + FILE *fp; + EVH *func = NULL; + CBDATA_INIT_TYPE(RebuildState); + rb = cbdataAlloc(RebuildState); + rb->sd = sd; + rb->speed = opt_foreground_rebuild ? 1 << 30 : 50; + /* + * If the swap.state file exists in the cache_dir, then + * we'll use storeAufsDirRebuildFromSwapLog(), otherwise we'll + * use storeAufsDirRebuildFromDirectory() to open up each file + * and suck in the meta data. + */ + fp = storeAufsDirOpenTmpSwapLog(sd, &clean, &zero); + if (fp == NULL || zero) { + if (fp != NULL) + fclose(fp); + func = storeAufsDirRebuildFromDirectory; + } else { + func = storeAufsDirRebuildFromSwapLog; + rb->log = fp; + rb->flags.clean = (unsigned int) clean; + } + if (!clean) + rb->flags.need_to_validate = 1; + debug(20, 1) ("Rebuilding storage in %s (%s)\n", + sd->path, clean ? "CLEAN" : "DIRTY"); + store_dirs_rebuilding++; + eventAdd("storeRebuild", func, rb, 0.0, 1); +} + +static void +storeAufsDirCloseTmpSwapLog(SwapDir * sd) +{ + aioinfo_t *aioinfo = (aioinfo_t *) sd->fsdata; + char *swaplog_path = xstrdup(storeAufsDirSwapLogFile(sd, NULL)); + char *new_path = xstrdup(storeAufsDirSwapLogFile(sd, ".new")); + int fd; + file_close(aioinfo->swaplog_fd); +#ifdef _SQUID_OS2_ + if (unlink(swaplog_path) < 0) { + debug(50, 0) ("%s: %s\n", swaplog_path, xstrerror()); + fatal("storeAufsDirCloseTmpSwapLog: unlink failed"); + } +#endif + if (xrename(new_path, swaplog_path) < 0) { + fatal("storeAufsDirCloseTmpSwapLog: rename failed"); + } + fd = file_open(swaplog_path, O_WRONLY | O_CREAT); + if (fd < 0) { + debug(50, 1) ("%s: %s\n", swaplog_path, xstrerror()); + fatal("storeAufsDirCloseTmpSwapLog: Failed to open swap log."); + } + safe_free(swaplog_path); + safe_free(new_path); + aioinfo->swaplog_fd = fd; + debug(47, 3) ("Cache Dir #%d log opened on FD %d\n", sd->index, fd); +} + +static FILE * +storeAufsDirOpenTmpSwapLog(SwapDir * sd, int *clean_flag, int *zero_flag) +{ + aioinfo_t *aioinfo = (aioinfo_t *) sd->fsdata; + char *swaplog_path = xstrdup(storeAufsDirSwapLogFile(sd, NULL)); + char *clean_path = xstrdup(storeAufsDirSwapLogFile(sd, ".last-clean")); + char *new_path = xstrdup(storeAufsDirSwapLogFile(sd, ".new")); + struct stat log_sb; + struct stat clean_sb; + FILE *fp; + int fd; + if (stat(swaplog_path, &log_sb) < 0) { + debug(47, 1) ("Cache Dir #%d: No log file\n", sd->index); + safe_free(swaplog_path); + safe_free(clean_path); + safe_free(new_path); + return NULL; + } + *zero_flag = log_sb.st_size == 0 ? 1 : 0; + /* close the existing write-only FD */ + if (aioinfo->swaplog_fd >= 0) + file_close(aioinfo->swaplog_fd); + /* open a write-only FD for the new log */ + fd = file_open(new_path, O_WRONLY | O_CREAT | O_TRUNC); + if (fd < 0) { + debug(50, 1) ("%s: %s\n", new_path, xstrerror()); + fatal("storeDirOpenTmpSwapLog: Failed to open swap log."); + } + aioinfo->swaplog_fd = fd; + /* open a read-only stream of the old log */ + fp = fopen(swaplog_path, "r"); + if (fp == NULL) { + debug(50, 0) ("%s: %s\n", swaplog_path, xstrerror()); + fatal("Failed to open swap log for reading"); + } + memset(&clean_sb, '\0', sizeof(struct stat)); + if (stat(clean_path, &clean_sb) < 0) + *clean_flag = 0; + else if (clean_sb.st_mtime < log_sb.st_mtime) + *clean_flag = 0; + else + *clean_flag = 1; + safeunlink(clean_path, 1); + safe_free(swaplog_path); + safe_free(clean_path); + safe_free(new_path); + return fp; +} + +struct _clean_state { + char *cur; + char *new; + char *cln; + char *outbuf; + off_t outbuf_offset; + int fd; + RemovalPolicyWalker *walker; +}; + +#define CLEAN_BUF_SZ 16384 +/* + * Begin the process to write clean cache state. For AUFS this means + * opening some log files and allocating write buffers. Return 0 if + * we succeed, and assign the 'func' and 'data' return pointers. + */ +static int +storeAufsDirWriteCleanStart(SwapDir * sd) +{ + struct _clean_state *state = xcalloc(1, sizeof(*state)); + struct stat sb; + sd->log.clean.write = NULL; + sd->log.clean.state = NULL; + state->new = xstrdup(storeAufsDirSwapLogFile(sd, ".clean")); + state->fd = file_open(state->new, O_WRONLY | O_CREAT | O_TRUNC); + if (state->fd < 0) { + xfree(state->new); + xfree(state); + return -1; + } + state->cur = xstrdup(storeAufsDirSwapLogFile(sd, NULL)); + state->cln = xstrdup(storeAufsDirSwapLogFile(sd, ".last-clean")); + state->outbuf = xcalloc(CLEAN_BUF_SZ, 1); + state->outbuf_offset = 0; + state->walker = sd->repl->WalkInit(sd->repl); + unlink(state->new); + unlink(state->cln); + debug(20, 3) ("storeDirWriteCleanLogs: opened %s, FD %d\n", + state->new, state->fd); +#if HAVE_FCHMOD + if (stat(state->cur, &sb) == 0) + fchmod(state->fd, sb.st_mode); +#endif + sd->log.clean.write = storeAufsDirWriteCleanEntry; + sd->log.clean.state = state; + return 0; +} + +/* + * Get the next entry that is a candidate for clean log writing + */ +const StoreEntry * +storeAufsDirCleanLogNextEntry(SwapDir * sd) +{ + const StoreEntry *entry = NULL; + struct _clean_state *state = sd->log.clean.state; + if (state->walker) + entry = state->walker->Next(state->walker); + return entry; +} + +/* + * "write" an entry to the clean log file. + */ +static void +storeAufsDirWriteCleanEntry(SwapDir * sd, const StoreEntry * e) +{ + storeSwapLogData s; + static size_t ss = sizeof(storeSwapLogData); + struct _clean_state *state = sd->log.clean.state; + memset(&s, '\0', ss); + s.op = (char) SWAP_LOG_ADD; + s.swap_filen = e->swap_filen; + s.timestamp = e->timestamp; + s.lastref = e->lastref; + s.expires = e->expires; + s.lastmod = e->lastmod; + s.swap_file_sz = e->swap_file_sz; + s.refcount = e->refcount; + s.flags = e->flags; + xmemcpy(&s.key, e->hash.key, MD5_DIGEST_CHARS); + xmemcpy(state->outbuf + state->outbuf_offset, &s, ss); + state->outbuf_offset += ss; + /* buffered write */ + if (state->outbuf_offset + ss > CLEAN_BUF_SZ) { + if (write(state->fd, state->outbuf, state->outbuf_offset) < 0) { + debug(50, 0) ("storeDirWriteCleanLogs: %s: write: %s\n", + state->new, xstrerror()); + debug(20, 0) ("storeDirWriteCleanLogs: Current swap logfile not replaced.\n"); + file_close(state->fd); + state->fd = -1; + unlink(state->new); + safe_free(state); + sd->log.clean.state = NULL; + sd->log.clean.write = NULL; + } + state->outbuf_offset = 0; + } +} + +static void +storeAufsDirWriteCleanDone(SwapDir * sd) +{ + struct _clean_state *state = sd->log.clean.state; + if (NULL == state) + return; + if (state->fd < 0) + return; + state->walker->Done(state->walker); + if (write(state->fd, state->outbuf, state->outbuf_offset) < 0) { + debug(50, 0) ("storeDirWriteCleanLogs: %s: write: %s\n", + state->new, xstrerror()); + debug(20, 0) ("storeDirWriteCleanLogs: Current swap logfile " + "not replaced.\n"); + file_close(state->fd); + state->fd = -1; + unlink(state->new); + } + safe_free(state->outbuf); + /* + * You can't rename open files on Microsoft "operating systems" + * so we have to close before renaming. + */ + storeAufsDirCloseSwapLog(sd); + /* rename */ + if (state->fd >= 0) { +#ifdef _SQUID_OS2_ + file_close(state->fd); + state->fd = -1; + if (unlink(cur) < 0) + debug(50, 0) ("storeDirWriteCleanLogs: unlinkd failed: %s, %s\n", + xstrerror(), cur); +#endif + xrename(state->new, state->cur); + } + /* touch a timestamp file if we're not still validating */ + if (store_dirs_rebuilding) + (void) 0; + else if (state->fd < 0) + (void) 0; + else + file_close(file_open(state->cln, O_WRONLY | O_CREAT | O_TRUNC)); + /* close */ + safe_free(state->cur); + safe_free(state->new); + safe_free(state->cln); + if (state->fd >= 0) + file_close(state->fd); + state->fd = -1; + safe_free(state); + sd->log.clean.state = NULL; + sd->log.clean.write = NULL; +} + +static void +storeSwapLogDataFree(void *s) +{ + memFree(s, MEM_SWAP_LOG_DATA); +} + +static void +storeAufsDirSwapLog(const SwapDir * sd, const StoreEntry * e, int op) +{ + aioinfo_t *aioinfo = (aioinfo_t *) sd->fsdata; + storeSwapLogData *s = memAllocate(MEM_SWAP_LOG_DATA); + s->op = (char) op; + s->swap_filen = e->swap_filen; + s->timestamp = e->timestamp; + s->lastref = e->lastref; + s->expires = e->expires; + s->lastmod = e->lastmod; + s->swap_file_sz = e->swap_file_sz; + s->refcount = e->refcount; + s->flags = e->flags; + xmemcpy(s->key, e->hash.key, MD5_DIGEST_CHARS); + file_write(aioinfo->swaplog_fd, + -1, + s, + sizeof(storeSwapLogData), + NULL, + NULL, + (FREE *) storeSwapLogDataFree); +} + +static void +storeAufsDirNewfs(SwapDir * sd) +{ + debug(47, 3) ("Creating swap space in %s\n", sd->path); + storeAufsDirCreateDirectory(sd->path, 0); + storeAufsDirCreateSwapSubDirs(sd); +} + +static int +rev_int_sort(const void *A, const void *B) +{ + const int *i1 = A; + const int *i2 = B; + return *i2 - *i1; +} + +static int +storeAufsDirClean(int swap_index) +{ + DIR *dp = NULL; + struct dirent *de = NULL; + LOCAL_ARRAY(char, p1, MAXPATHLEN + 1); + LOCAL_ARRAY(char, p2, MAXPATHLEN + 1); +#if USE_TRUNCATE + struct stat sb; +#endif + int files[20]; + int swapfileno; + int fn; /* same as swapfileno, but with dirn bits set */ + int n = 0; + int k = 0; + int N0, N1, N2; + int D0, D1, D2; + SwapDir *SD; + aioinfo_t *aioinfo; + N0 = n_asyncufs_dirs; + D0 = asyncufs_dir_index[swap_index % N0]; + SD = &Config.cacheSwap.swapDirs[D0]; + aioinfo = (aioinfo_t *) SD->fsdata; + N1 = aioinfo->l1; + D1 = (swap_index / N0) % N1; + N2 = aioinfo->l2; + D2 = ((swap_index / N0) / N1) % N2; + snprintf(p1, SQUID_MAXPATHLEN, "%s/%02X/%02X", + Config.cacheSwap.swapDirs[D0].path, D1, D2); + debug(36, 3) ("storeDirClean: Cleaning directory %s\n", p1); + dp = opendir(p1); + if (dp == NULL) { + if (errno == ENOENT) { + debug(36, 0) ("storeDirClean: WARNING: Creating %s\n", p1); + if (mkdir(p1, 0777) == 0) + return 0; + } + debug(50, 0) ("storeDirClean: %s: %s\n", p1, xstrerror()); + safeunlink(p1, 1); + return 0; + } + while ((de = readdir(dp)) != NULL && k < 20) { + if (sscanf(de->d_name, "%X", &swapfileno) != 1) + continue; + fn = swapfileno; /* XXX should remove this cruft ! */ + if (storeAufsDirValidFileno(SD, fn, 1)) + if (storeAufsDirMapBitTest(SD, fn)) + if (storeAufsFilenoBelongsHere(fn, D0, D1, D2)) + continue; +#if USE_TRUNCATE + if (!stat(de->d_name, &sb)) + if (sb.st_size == 0) + continue; +#endif + files[k++] = swapfileno; + } + closedir(dp); + if (k == 0) + return 0; + qsort(files, k, sizeof(int), rev_int_sort); + if (k > 10) + k = 10; + for (n = 0; n < k; n++) { + debug(36, 3) ("storeDirClean: Cleaning file %08X\n", files[n]); + snprintf(p2, MAXPATHLEN + 1, "%s/%08X", p1, files[n]); +#if USE_TRUNCATE + truncate(p2, 0); +#else + safeunlink(p2, 0); +#endif + statCounter.swap.files_cleaned++; + } + debug(36, 3) ("Cleaned %d unused files from %s\n", k, p1); + return k; +} + +static void +storeAufsDirCleanEvent(void *unused) +{ + static int swap_index = 0; + int i; + int j = 0; + int n = 0; + /* + * Assert that there are AUFS cache_dirs configured, otherwise + * we should never be called. + */ + assert(n_asyncufs_dirs); + if (NULL == asyncufs_dir_index) { + SwapDir *sd; + aioinfo_t *aioinfo; + /* + * Initialize the little array that translates AUFS cache_dir + * number into the Config.cacheSwap.swapDirs array index. + */ + asyncufs_dir_index = xcalloc(n_asyncufs_dirs, sizeof(*asyncufs_dir_index)); + for (i = 0, n = 0; i < Config.cacheSwap.n_configured; i++) { + sd = &Config.cacheSwap.swapDirs[i]; + if (!storeAufsDirIs(sd)) + continue; + asyncufs_dir_index[n++] = i; + aioinfo = (aioinfo_t *) sd->fsdata; + j += (aioinfo->l1 * aioinfo->l2); + } + assert(n == n_asyncufs_dirs); + /* + * Start the storeAufsDirClean() swap_index with a random + * value. j equals the total number of AUFS level 2 + * swap directories + */ + swap_index = (int) (squid_random() % j); + } + if (0 == store_dirs_rebuilding) { + n = storeAufsDirClean(swap_index); + swap_index++; + } + eventAdd("storeDirClean", storeAufsDirCleanEvent, NULL, + 15.0 * exp(-0.25 * n), 1); +} + +static int +storeAufsDirIs(SwapDir * sd) +{ + if (strncmp(sd->type, "awin32", 3) == 0) + return 1; + return 0; +} + +/* + * Does swapfile number 'fn' belong in cachedir #F0, + * level1 dir #F1, level2 dir #F2? + */ +static int +storeAufsFilenoBelongsHere(int fn, int F0, int F1, int F2) +{ + int D1, D2; + int L1, L2; + int filn = fn; + aioinfo_t *aioinfo; + assert(F0 < Config.cacheSwap.n_configured); + aioinfo = (aioinfo_t *) Config.cacheSwap.swapDirs[F0].fsdata; + L1 = aioinfo->l1; + L2 = aioinfo->l2; + D1 = ((filn / L2) / L2) % L1; + if (F1 != D1) + return 0; + D2 = (filn / L2) % L2; + if (F2 != D2) + return 0; + return 1; +} + +int +storeAufsDirValidFileno(SwapDir * SD, sfileno filn, int flag) +{ + aioinfo_t *aioinfo = (aioinfo_t *) SD->fsdata; + if (filn < 0) + return 0; + /* + * If flag is set it means out-of-range file number should + * be considered invalid. + */ + if (flag) + if (filn > aioinfo->map->max_n_files) + return 0; + return 1; +} + +void +storeAufsDirMaintain(SwapDir * SD) +{ + StoreEntry *e = NULL; + int removed = 0; + int max_scan; + int max_remove; + double f; + RemovalPurgeWalker *walker; + /* We can't delete objects while rebuilding swap */ + if (store_dirs_rebuilding) { + return; + } else { + f = (double) (SD->cur_size - SD->low_size) / (SD->max_size - SD->low_size); + f = f < 0.0 ? 0.0 : f > 1.0 ? 1.0 : f; + max_scan = (int) (f * 400.0 + 100.0); + max_remove = (int) (f * 70.0 + 10.0); + /* + * This is kinda cheap, but so we need this priority hack? + */ + } + debug(20, 3) ("storeMaintainSwapSpace: f=%f, max_scan=%d, max_remove=%d\n", + f, max_scan, max_remove); + walker = SD->repl->PurgeInit(SD->repl, max_scan); + while (1) { + if (SD->cur_size < SD->low_size) + break; + if (removed >= max_remove) + break; + e = walker->Next(walker); + if (!e) + break; /* no more objects */ + removed++; + storeRelease(e); + } + walker->Done(walker); + debug(20, (removed ? 2 : 3)) ("storeUfsDirMaintain: %s removed %d/%d f=%.03f max_scan=%d\n", + SD->path, removed, max_remove, f, max_scan); +} + +/* + * storeAufsDirCheckObj + * + * This routine is called by storeDirSelectSwapDir to see if the given + * object is able to be stored on this filesystem. AUFS filesystems will + * happily store anything as long as the LRU time isn't too small. + */ +int +storeAufsDirCheckObj(SwapDir * SD, const StoreEntry * e) +{ + int loadav; + int ql; + +#if OLD_UNUSED_CODE + if (storeAufsDirExpiredReferenceAge(SD) < 300) { + debug(20, 3) ("storeAufsDirCheckObj: NO: LRU Age = %d\n", + storeAufsDirExpiredReferenceAge(SD)); + /* store_check_cachable_hist.no.lru_age_too_low++; */ + return -1; + } +#endif + ql = aioQueueSize(); + if (ql == 0) + loadav = 0; + loadav = ql * 1000 / MAGIC1; + debug(41, 9) ("storeAufsDirCheckObj: load=%d\n", loadav); + return loadav; +} + +/* + * storeAufsDirRefObj + * + * This routine is called whenever an object is referenced, so we can + * maintain replacement information within the storage fs. + */ +void +storeAufsDirRefObj(SwapDir * SD, StoreEntry * e) +{ + debug(1, 3) ("storeAufsDirRefObj: referencing %p %d/%d\n", e, e->swap_dirn, + e->swap_filen); + if (SD->repl->Referenced) + SD->repl->Referenced(SD->repl, e, &e->repl); +} + +/* + * storeAufsDirUnrefObj + * This routine is called whenever the last reference to an object is + * removed, to maintain replacement information within the storage fs. + */ +void +storeAufsDirUnrefObj(SwapDir * SD, StoreEntry * e) +{ + debug(1, 3) ("storeAufsDirUnrefObj: referencing %p %d/%d\n", e, e->swap_dirn, + e->swap_filen); + if (SD->repl->Dereferenced) + SD->repl->Dereferenced(SD->repl, e, &e->repl); +} + +/* + * storeAufsDirUnlinkFile + * + * This routine unlinks a file and pulls it out of the bitmap. + * It used to be in storeAufsUnlink(), however an interface change + * forced this bit of code here. Eeek. + */ +void +storeAufsDirUnlinkFile(SwapDir * SD, sfileno f) +{ + debug(79, 3) ("storeAufsDirUnlinkFile: unlinking fileno %08X\n", f); + /* storeAufsDirMapBitReset(SD, f); */ +#if USE_TRUNCATE_NOT_UNLINK + aioTruncate(storeAufsDirFullPath(SD, f, NULL), NULL, NULL); +#else + aioUnlink(storeAufsDirFullPath(SD, f, NULL), NULL, NULL); +#endif +} + +/* + * Add and remove the given StoreEntry from the replacement policy in + * use. + */ + +void +storeAufsDirReplAdd(SwapDir * SD, StoreEntry * e) +{ + debug(20, 4) ("storeAufsDirReplAdd: added node %p to dir %d\n", e, + SD->index); + SD->repl->Add(SD->repl, e, &e->repl); +} + + +void +storeAufsDirReplRemove(StoreEntry * e) +{ + SwapDir *SD = INDEXSD(e->swap_dirn); + debug(20, 4) ("storeAufsDirReplRemove: remove node %p from dir %d\n", e, + SD->index); + SD->repl->Remove(SD->repl, e, &e->repl); +} + + + +/* ========== LOCAL FUNCTIONS ABOVE, GLOBAL FUNCTIONS BELOW ========== */ + +void +storeAufsDirStats(SwapDir * SD, StoreEntry * sentry) +{ + aioinfo_t *aioinfo = SD->fsdata; + int totl_kb = 0; + int free_kb = 0; + int totl_in = 0; + int free_in = 0; + int x; + storeAppendPrintf(sentry, "First level subdirectories: %d\n", aioinfo->l1); + storeAppendPrintf(sentry, "Second level subdirectories: %d\n", aioinfo->l2); + storeAppendPrintf(sentry, "Maximum Size: %d KB\n", SD->max_size); + storeAppendPrintf(sentry, "Current Size: %d KB\n", SD->cur_size); + storeAppendPrintf(sentry, "Percent Used: %0.2f%%\n", + 100.0 * SD->cur_size / SD->max_size); + storeAppendPrintf(sentry, "Filemap bits in use: %d of %d (%d%%)\n", + aioinfo->map->n_files_in_map, aioinfo->map->max_n_files, + percent(aioinfo->map->n_files_in_map, aioinfo->map->max_n_files)); + x = storeDirGetUFSStats(SD->path, &totl_kb, &free_kb, &totl_in, &free_in); + if (0 == x) { + storeAppendPrintf(sentry, "Filesystem Space in use: %d/%d KB (%d%%)\n", + totl_kb - free_kb, + totl_kb, + percent(totl_kb - free_kb, totl_kb)); + storeAppendPrintf(sentry, "Filesystem Inodes in use: %d/%d (%d%%)\n", + totl_in - free_in, + totl_in, + percent(totl_in - free_in, totl_in)); + } + storeAppendPrintf(sentry, "Flags:"); + if (SD->flags.selected) + storeAppendPrintf(sentry, " SELECTED"); + if (SD->flags.read_only) + storeAppendPrintf(sentry, " READ-ONLY"); + storeAppendPrintf(sentry, "\n"); +#if OLD_UNUSED_CODE +#if !HEAP_REPLACEMENT + storeAppendPrintf(sentry, "LRU Expiration Age: %6.2f days\n", + (double) storeAufsDirExpiredReferenceAge(SD) / 86400.0); +#else + storeAppendPrintf(sentry, "Storage Replacement Threshold:\t%f\n", + heap_peepminkey(sd.repl.heap.heap)); +#endif +#endif /* OLD_UNUSED_CODE */ +} + +static struct cache_dir_option options[] = +{ +#if NOT_YET_DONE + {"L1", storeAufsDirParseL1}, + {"L2", storeAufsDirParseL2}, +#endif + {NULL, NULL} +}; + +/* + * storeAufsDirReconfigure + * + * This routine is called when the given swapdir needs reconfiguring + */ +static void +storeAufsDirReconfigure(SwapDir * sd, int index, char *path) +{ + int i; + int size; + int l1; + int l2; + + i = GetInteger(); + size = i << 10; /* Mbytes to kbytes */ + if (size <= 0) + fatal("storeAufsDirReconfigure: invalid size value"); + i = GetInteger(); + l1 = i; + if (l1 <= 0) + fatal("storeAufsDirReconfigure: invalid level 1 directories value"); + i = GetInteger(); + l2 = i; + if (l2 <= 0) + fatal("storeAufsDirReconfigure: invalid level 2 directories value"); + + /* just reconfigure it */ + if (size == sd->max_size) + debug(3, 1) ("Cache dir '%s' size remains unchanged at %d KB\n", + path, size); + else + debug(3, 1) ("Cache dir '%s' size changed to %d KB\n", + path, size); + sd->max_size = size; + + parse_cachedir_options(sd, options, 0); + + return; +} + +void +storeAufsDirDump(StoreEntry * entry, const char *name, SwapDir * s) +{ + aioinfo_t *aioinfo = (aioinfo_t *) s->fsdata; + storeAppendPrintf(entry, "%s %s %s %d %d %d\n", + name, + "awin32", + s->path, + s->max_size >> 10, + aioinfo->l1, + aioinfo->l2); +} + +/* + * Only "free" the filesystem specific stuff here + */ +static void +storeAufsDirFree(SwapDir * s) +{ + aioinfo_t *aioinfo = (aioinfo_t *) s->fsdata; + if (aioinfo->swaplog_fd > -1) { + file_close(aioinfo->swaplog_fd); + aioinfo->swaplog_fd = -1; + } + filemapFreeMemory(aioinfo->map); + xfree(aioinfo); + s->fsdata = NULL; /* Will aid debugging... */ +} + +char * +storeAufsDirFullPath(SwapDir * SD, sfileno filn, char *fullpath) +{ + LOCAL_ARRAY(char, fullfilename, SQUID_MAXPATHLEN); + aioinfo_t *aioinfo = (aioinfo_t *) SD->fsdata; + int L1 = aioinfo->l1; + int L2 = aioinfo->l2; + if (!fullpath) + fullpath = fullfilename; + fullpath[0] = '\0'; + snprintf(fullpath, SQUID_MAXPATHLEN, "%s/%02X/%02X/%08X", + SD->path, + ((filn / L2) / L2) % L1, + (filn / L2) % L2, + filn); + return fullpath; +} + +/* + * storeAufsCleanupDoubleCheck + * + * This is called by storeCleanup() if -S was given on the command line. + */ +static int +storeAufsCleanupDoubleCheck(SwapDir * sd, StoreEntry * e) +{ + struct stat sb; + if (stat(storeAufsDirFullPath(sd, e->swap_filen, NULL), &sb) < 0) { + debug(20, 0) ("storeAufsCleanupDoubleCheck: MISSING SWAP FILE\n"); + debug(20, 0) ("storeAufsCleanupDoubleCheck: FILENO %08X\n", e->swap_filen); + debug(20, 0) ("storeAufsCleanupDoubleCheck: PATH %s\n", + storeAufsDirFullPath(sd, e->swap_filen, NULL)); + storeEntryDump(e, 0); + return -1; + } + if (e->swap_file_sz != sb.st_size) { + debug(20, 0) ("storeAufsCleanupDoubleCheck: SIZE MISMATCH\n"); + debug(20, 0) ("storeAufsCleanupDoubleCheck: FILENO %08X\n", e->swap_filen); + debug(20, 0) ("storeAufsCleanupDoubleCheck: PATH %s\n", + storeAufsDirFullPath(sd, e->swap_filen, NULL)); + debug(20, 0) ("storeAufsCleanupDoubleCheck: ENTRY SIZE: %d, FILE SIZE: %d\n", + e->swap_file_sz, (int) sb.st_size); + storeEntryDump(e, 0); + return -1; + } + return 0; +} + +/* + * storeAufsDirParse * + * Called when a *new* fs is being setup. + */ +static void +storeAufsDirParse(SwapDir * sd, int index, char *path) +{ + int i; + int size; + int l1; + int l2; + aioinfo_t *aioinfo; + + i = GetInteger(); + size = i << 10; /* Mbytes to kbytes */ + if (size <= 0) + fatal("storeAufsDirParse: invalid size value"); + i = GetInteger(); + l1 = i; + if (l1 <= 0) + fatal("storeAufsDirParse: invalid level 1 directories value"); + i = GetInteger(); + l2 = i; + if (l2 <= 0) + fatal("storeAufsDirParse: invalid level 2 directories value"); + + aioinfo = xmalloc(sizeof(aioinfo_t)); + if (aioinfo == NULL) + fatal("storeAufsDirParse: couldn't xmalloc() aioinfo_t!\n"); + + sd->index = index; + sd->path = xstrdup(path); + sd->max_size = size; + sd->fsdata = aioinfo; + aioinfo->l1 = l1; + aioinfo->l2 = l2; + aioinfo->swaplog_fd = -1; + aioinfo->map = NULL; /* Debugging purposes */ + aioinfo->suggest = 0; + sd->init = storeAufsDirInit; + sd->newfs = storeAufsDirNewfs; + sd->dump = storeAufsDirDump; + sd->freefs = storeAufsDirFree; + sd->dblcheck = storeAufsCleanupDoubleCheck; + sd->statfs = storeAufsDirStats; + sd->maintainfs = storeAufsDirMaintain; + sd->checkobj = storeAufsDirCheckObj; + sd->refobj = storeAufsDirRefObj; + sd->unrefobj = storeAufsDirUnrefObj; + sd->callback = aioCheckCallbacks; + sd->sync = aioSync; + sd->obj.create = storeAufsCreate; + sd->obj.open = storeAufsOpen; + sd->obj.close = storeAufsClose; + sd->obj.read = storeAufsRead; + sd->obj.write = storeAufsWrite; + sd->obj.unlink = storeAufsUnlink; + sd->log.open = storeAufsDirOpenSwapLog; + sd->log.close = storeAufsDirCloseSwapLog; + sd->log.write = storeAufsDirSwapLog; + sd->log.clean.start = storeAufsDirWriteCleanStart; + sd->log.clean.nextentry = storeAufsDirCleanLogNextEntry; + sd->log.clean.done = storeAufsDirWriteCleanDone; + + parse_cachedir_options(sd, options, 0); + + /* Initialise replacement policy stuff */ + sd->repl = createRemovalPolicy(Config.replPolicy); +} + +/* + * Initial setup / end destruction + */ +static void +storeAufsDirDone(void) +{ + aioDone(); + memPoolDestroy(aio_state_pool); + memPoolDestroy(aio_qread_pool); + memPoolDestroy(aio_qwrite_pool); + asyncufs_initialised = 0; +} + +void +storeFsSetup_awin32(storefs_entry_t * storefs) +{ + assert(!asyncufs_initialised); + storefs->parsefunc = storeAufsDirParse; + storefs->reconfigurefunc = storeAufsDirReconfigure; + storefs->donefunc = storeAufsDirDone; + aio_state_pool = memPoolCreate("AUFS IO State data", sizeof(aiostate_t)); + aio_qread_pool = memPoolCreate("AUFS Queued read data", + sizeof(queued_read)); + aio_qwrite_pool = memPoolCreate("AUFS Queued write data", + sizeof(queued_write)); + + asyncufs_initialised = 1; + aioInit(); +} --- /dev/null Wed Feb 14 00:50:05 2007 +++ squid/src/fs/awin32/store_io_aufs.c Wed Feb 14 00:51:28 2007 @@ -0,0 +1,445 @@ + +/* + * DEBUG 78 + */ + +#include "squid.h" +#include "store_asyncufs.h" + +#if ASYNC_READ +static AIOCB storeAufsReadDone; +#else +static DRCB storeAufsReadDone; +#endif +#if ASYNC_WRITE +static AIOCB storeAufsWriteDone; +#else +static DWCB storeAufsWriteDone; +#endif +static void storeAufsIOCallback(storeIOState * sio, int errflag); +static AIOCB storeAufsOpenDone; +static int storeAufsSomethingPending(storeIOState *); +static int storeAufsKickWriteQueue(storeIOState * sio); +static CBDUNL storeAufsIOFreeEntry; + +CBDATA_TYPE(storeIOState); + +/* === PUBLIC =========================================================== */ + +/* open for reading */ +storeIOState * +storeAufsOpen(SwapDir * SD, StoreEntry * e, STFNCB * file_callback, + STIOCB * callback, void *callback_data) +{ + sfileno f = e->swap_filen; + char *path = storeAufsDirFullPath(SD, f, NULL); + storeIOState *sio; +#if !ASYNC_OPEN + int fd; +#endif + debug(78, 3) ("storeAufsOpen: fileno %08X\n", f); + /* + * we should detect some 'too many files open' condition and return + * NULL here. + */ +#ifdef MAGIC2 + if (aioQueueSize() > MAGIC2) + return NULL; +#endif +#if !ASYNC_OPEN + fd = file_open(path, O_RDONLY); + if (fd < 0) { + debug(78, 3) ("storeAufsOpen: got failude (%d)\n", errno); + return NULL; + } +#endif + CBDATA_INIT_TYPE_FREECB(storeIOState, storeAufsIOFreeEntry); + sio = cbdataAlloc(storeIOState); + sio->fsstate = memPoolAlloc(aio_state_pool); + ((aiostate_t *) (sio->fsstate))->fd = -1; + ((aiostate_t *) (sio->fsstate))->flags.opening = 1; + sio->swap_filen = f; + sio->swap_dirn = SD->index; + sio->mode = O_RDONLY; + sio->callback = callback; + sio->callback_data = callback_data; + sio->e = e; + cbdataLock(callback_data); + Opening_FD++; +#if ASYNC_OPEN + aioOpen(path, O_RDONLY, 0644, storeAufsOpenDone, sio); +#else + storeAufsOpenDone(fd, sio, fd, 0); +#endif + store_open_disk_fd++; + return sio; +} + +/* open for creating */ +storeIOState * +storeAufsCreate(SwapDir * SD, StoreEntry * e, STFNCB * file_callback, STIOCB * callback, void *callback_data) +{ + char *path; + storeIOState *sio; + sfileno filn; + sdirno dirn; +#if !ASYNC_CREATE + int fd; +#endif + + /* Allocate a number */ + dirn = SD->index; + filn = storeAufsDirMapBitAllocate(SD); + path = storeAufsDirFullPath(SD, filn, NULL); + + debug(78, 3) ("storeAufsCreate: fileno %08X\n", filn); + /* + * we should detect some 'too many files open' condition and return + * NULL here. + */ +#ifdef MAGIC2 + if (aioQueueSize() > MAGIC2) + return NULL; +#endif +#if !ASYNC_CREATE + fd = file_open(path, O_WRONLY | O_CREAT | O_TRUNC); + if (fd < 0) { + debug(78, 3) ("storeAufsCreate: got failude (%d)\n", errno); + return NULL; + } +#endif + CBDATA_INIT_TYPE_FREECB(storeIOState, storeAufsIOFreeEntry); + sio = cbdataAlloc(storeIOState); + sio->fsstate = memPoolAlloc(aio_state_pool); + ((aiostate_t *) (sio->fsstate))->fd = -1; + ((aiostate_t *) (sio->fsstate))->flags.opening = 1; + sio->swap_filen = filn; + sio->swap_dirn = dirn; + sio->mode = O_WRONLY; + sio->callback = callback; + sio->callback_data = callback_data; + sio->e = (StoreEntry *) e; + cbdataLock(callback_data); + Opening_FD++; +#if ASYNC_CREATE + aioOpen(path, O_WRONLY | O_CREAT | O_TRUNC, 0644, storeAufsOpenDone, sio); +#else + storeAufsOpenDone(fd, sio, fd, 0); +#endif + store_open_disk_fd++; + + /* now insert into the replacement policy */ + storeAufsDirReplAdd(SD, e); + return sio; + +} + + + +/* Close */ +void +storeAufsClose(SwapDir * SD, storeIOState * sio) +{ + aiostate_t *aiostate = (aiostate_t *) sio->fsstate; + debug(78, 3) ("storeAufsClose: dirno %d, fileno %08X, FD %d\n", + sio->swap_dirn, sio->swap_filen, aiostate->fd); + if (storeAufsSomethingPending(sio)) { + aiostate->flags.close_request = 1; + return; + } + storeAufsIOCallback(sio, DISK_OK); +} + + +/* Read */ +void +storeAufsRead(SwapDir * SD, storeIOState * sio, char *buf, size_t size, off_t offset, STRCB * callback, void *callback_data) +{ + aiostate_t *aiostate = (aiostate_t *) sio->fsstate; + assert(sio->read.callback == NULL); + assert(sio->read.callback_data == NULL); + assert(!aiostate->flags.reading); + if (aiostate->fd < 0) { + struct _queued_read *q; + debug(78, 3) ("storeAufsRead: queueing read because FD < 0\n"); + assert(aiostate->flags.opening); + assert(aiostate->pending_reads == NULL); + q = memPoolAlloc(aio_qread_pool); + q->buf = buf; + q->size = size; + q->offset = offset; + q->callback = callback; + q->callback_data = callback_data; + linklistPush(&(aiostate->pending_reads), q); + return; + } + sio->read.callback = callback; + sio->read.callback_data = callback_data; + aiostate->read_buf = buf; + cbdataLock(callback_data); + debug(78, 3) ("storeAufsRead: dirno %d, fileno %08X, FD %d\n", + sio->swap_dirn, sio->swap_filen, aiostate->fd); + sio->offset = offset; + aiostate->flags.reading = 1; +#if ASYNC_READ + aioRead(aiostate->fd, offset, buf, size, storeAufsReadDone, sio); +#else + file_read(aiostate->fd, offset, buf, size, storeAufsReadDone, sio); +#endif +} + + +/* Write */ +void +storeAufsWrite(SwapDir * SD, storeIOState * sio, char *buf, size_t size, off_t offset, FREE * free_func) +{ + aiostate_t *aiostate = (aiostate_t *) sio->fsstate; + debug(78, 3) ("storeAufsWrite: dirno %d, fileno %08X, FD %d\n", + sio->swap_dirn, sio->swap_filen, aiostate->fd); + if (aiostate->fd < 0) { + /* disk file not opened yet */ + struct _queued_write *q; + assert(aiostate->flags.opening); + q = memPoolAlloc(aio_qwrite_pool); + q->buf = buf; + q->size = size; + q->offset = offset; + q->free_func = free_func; + linklistPush(&(aiostate->pending_writes), q); + return; + } +#if ASYNC_WRITE + if (aiostate->flags.writing) { + struct _queued_write *q; + debug(78, 3) ("storeAufsWrite: queuing write\n"); + q = memPoolAlloc(aio_qwrite_pool); + q->buf = buf; + q->size = size; + q->offset = offset; + q->free_func = free_func; + linklistPush(&(aiostate->pending_writes), q); + return; + } + aiostate->flags.writing = 1; + aioWrite(aiostate->fd, offset, buf, size, storeAufsWriteDone, sio, + free_func); +#else + file_write(aiostate->fd, offset, buf, size, storeAufsWriteDone, sio, + free_func); +#endif +} + +/* Unlink */ +void +storeAufsUnlink(SwapDir * SD, StoreEntry * e) +{ + debug(78, 3) ("storeAufsUnlink: dirno %d, fileno %08X\n", SD->index, e->swap_filen); + storeAufsDirReplRemove(e); + storeAufsDirMapBitReset(SD, e->swap_filen); + storeAufsDirUnlinkFile(SD, e->swap_filen); +} + +/* === STATIC =========================================================== */ + +static int +storeAufsKickWriteQueue(storeIOState * sio) +{ + aiostate_t *aiostate = (aiostate_t *) sio->fsstate; + struct _queued_write *q = linklistShift(&aiostate->pending_writes); + if (NULL == q) + return 0; + debug(78, 3) ("storeAufsKickWriteQueue: writing queued chunk of %d bytes\n", + q->size); + storeAufsWrite(INDEXSD(sio->swap_dirn), sio, q->buf, q->size, q->offset, q->free_func); + memPoolFree(aio_qwrite_pool, q); + return 1; +} + +static int +storeAufsKickReadQueue(storeIOState * sio) +{ + aiostate_t *aiostate = (aiostate_t *) sio->fsstate; + struct _queued_read *q = linklistShift(&(aiostate->pending_reads)); + if (NULL == q) + return 0; + debug(78, 3) ("storeAufsKickReadQueue: reading queued request of %d bytes\n", + q->size); + storeAufsRead(INDEXSD(sio->swap_dirn), sio, q->buf, q->size, q->offset, q->callback, q->callback_data); + memPoolFree(aio_qread_pool, q); + return 1; +} + +static void +storeAufsOpenDone(int unused, void *my_data, int fd, int errflag) +{ + storeIOState *sio = my_data; + aiostate_t *aiostate = (aiostate_t *) sio->fsstate; + debug(78, 3) ("storeAufsOpenDone: FD %d, errflag %d\n", fd, errflag); + Opening_FD--; + aiostate->flags.opening = 0; + if (errflag || fd < 0) { + errno = errflag; + debug(78, 0) ("storeAufsOpenDone: %s\n", xstrerror()); + debug(78, 1) ("\t%s\n", storeAufsDirFullPath(INDEXSD(sio->swap_dirn), sio->swap_filen, NULL)); + storeAufsIOCallback(sio, DISK_ERROR); + return; + } + aiostate->fd = fd; + commSetCloseOnExec(fd); + fd_open(fd, FD_FILE, storeAufsDirFullPath(INDEXSD(sio->swap_dirn), sio->swap_filen, NULL)); + if (sio->mode == O_WRONLY) + storeAufsKickWriteQueue(sio); + else if (sio->mode == O_RDONLY) + storeAufsKickReadQueue(sio); + debug(78, 3) ("storeAufsOpenDone: exiting\n"); +} + +#if ASYNC_READ +static void +storeAufsReadDone(int fd, void *my_data, int len, int errflag) +#else +static void +storeAufsReadDone(int fd, int errflag, size_t len, void *my_data) +#endif +{ + storeIOState *sio = my_data; + aiostate_t *aiostate = (aiostate_t *) sio->fsstate; + STRCB *callback = sio->read.callback; + void *their_data = sio->read.callback_data; + ssize_t rlen; + debug(78, 3) ("storeAufsReadDone: dirno %d, fileno %08X, FD %d, len %d\n", + sio->swap_dirn, sio->swap_filen, fd, len); + aiostate->flags.inreaddone = 1; + aiostate->flags.reading = 0; + if (errflag) { + debug(78, 3) ("storeAufsReadDone: got failure (%d)\n", errflag); + rlen = -1; + } else { + rlen = (ssize_t) len; + sio->offset += len; + } +#if ASYNC_READ + /* translate errflag from errno to Squid disk error */ + errno = errflag; + if (errflag) + errflag = DISK_ERROR; + else + errflag = DISK_OK; +#else + if (errflag == DISK_EOF) + errflag = DISK_OK; /* EOF is signalled by len == 0, not errors... */ +#endif + assert(callback); + assert(their_data); + sio->read.callback = NULL; + sio->read.callback_data = NULL; + if (cbdataValid(their_data)) + callback(their_data, aiostate->read_buf, rlen); + cbdataUnlock(their_data); + aiostate->flags.inreaddone = 0; + if (aiostate->flags.close_request) + storeAufsIOCallback(sio, errflag); +} + +#if ASYNC_WRITE +static void +storeAufsWriteDone(int fd, void *my_data, int len, int errflag) +#else +static void +storeAufsWriteDone(int fd, int errflag, size_t len, void *my_data) +#endif +{ + static int loop_detect = 0; + storeIOState *sio = my_data; + aiostate_t *aiostate = (aiostate_t *) sio->fsstate; + debug(78, 3) ("storeAufsWriteDone: dirno %d, fileno %08X, FD %d, len %d, err=%d\n", + sio->swap_dirn, sio->swap_filen, fd, len, errflag); +#if ASYNC_WRITE + /* Translate from errno to Squid disk error */ + errno = errflag; + if (errflag) + errflag = errno == ENOSP ? DISK_NO_SPACE_LEFT : DISK_ERROR; + else + errflag = DISK_OK; +#endif + assert(++loop_detect < 10); + aiostate->flags.writing = 0; + if (errflag) { + debug(78, 0) ("storeAufsWriteDone: got failure (%d)\n", errflag); + storeAufsIOCallback(sio, errflag); + loop_detect--; + return; + } + sio->offset += len; +#if ASYNC_WRITE + if (!storeAufsKickWriteQueue(sio)) + 0; + else if (aiostate->flags.close_request) + storeAufsIOCallback(sio, errflag); +#else + if (!aiostate->flags.write_kicking) { + aiostate->flags.write_kicking = 1; + while (storeAufsKickWriteQueue(sio)) + (void) 0; + aiostate->flags.write_kicking = 0; + if (aiostate->flags.close_request) + storeAufsIOCallback(sio, errflag); + } +#endif + loop_detect--; +} + +static void +storeAufsIOCallback(storeIOState * sio, int errflag) +{ + STIOCB *callback = sio->callback; + void *their_data = sio->callback_data; + aiostate_t *aiostate = (aiostate_t *) sio->fsstate; + int fd = aiostate->fd; + debug(78, 3) ("storeAufsIOCallback: errflag=%d\n", errflag); + sio->callback = NULL; + sio->callback_data = NULL; + debug(78, 3) ("%s:%d\n", __FILE__, __LINE__); + if (callback) + if (NULL == their_data || cbdataValid(their_data)) + callback(their_data, errflag, sio); + debug(78, 3) ("%s:%d\n", __FILE__, __LINE__); + cbdataUnlock(their_data); + aiostate->fd = -1; + cbdataFree(sio); + if (fd < 0) + return; + debug(78, 3) ("%s:%d\n", __FILE__, __LINE__); + aioClose(fd); + fd_close(fd); + store_open_disk_fd--; + debug(78, 3) ("%s:%d\n", __FILE__, __LINE__); +} + + +static int +storeAufsSomethingPending(storeIOState * sio) +{ + aiostate_t *aiostate = (aiostate_t *) sio->fsstate; + if (aiostate->flags.reading) + return 1; + if (aiostate->flags.writing) + return 1; + if (aiostate->flags.opening) + return 1; + if (aiostate->flags.inreaddone) + return 1; + return 0; +} + + +/* + * Clean up references from the SIO before it gets released. + * The actuall SIO is managed by cbdata so we do not need + * to bother with that. + */ +static void +storeAufsIOFreeEntry(void *sio) +{ + memPoolFree(aio_state_pool, ((storeIOState *) sio)->fsstate); +}