--------------------- PatchSet 1659 Date: 2001/02/25 20:55:22 Author: hno Branch: eventio Tag: (none) Log: * COMMNEWCB calling corrected to handle errors (fh might be non-existant then) * fh_read_t cleaned up. As the IOBuf is created on the fly when there is data available, there is no need to keep a IOBuf entry in the filehandle.. also offset is removed, as there never can be a existing IOBuf where the read starts.. * ncomm_close implemented. This only indicates EOF, it does not actually close the socket. The socket is closed when the last reference to the filehandle goes away after ncomm_close. * ncomm_write now handles queued writes * ncomm_closed, to query if a filehandle has been closed or not * corrected error handling and filehandle reference count in ncomm_connect * ncomm_test extended with memPool report output on port 55555 Members: src/ncomm.c:1.1.2.9->1.1.2.10 src/ncomm_internals.h:1.1.2.3->1.1.2.4 src/ncomm_test.c:1.1.2.4->1.1.2.5 src/protos.h:1.18.8.8->1.18.8.9 Index: squid/src/ncomm.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/ncomm.c,v retrieving revision 1.1.2.9 retrieving revision 1.1.2.10 diff -u -r1.1.2.9 -r1.1.2.10 --- squid/src/ncomm.c 25 Feb 2001 18:20:18 -0000 1.1.2.9 +++ squid/src/ncomm.c 25 Feb 2001 20:55:22 -0000 1.1.2.10 @@ -152,8 +152,12 @@ cbdataUnreference(cb->cb.io.buf); break; case COMMCB_new: - if (valid && cb->cb.new.callback) - cb->cb.new.callback(fh, cb->error, &fh->local, &fh->peer, fh->addrsize, cb->cbdata); + if (valid && cb->cb.new.callback) { + if (fh) + cb->cb.new.callback(fh, cb->error, &fh->local, &fh->peer, fh->addrsize, cb->cbdata); + else + cb->cb.new.callback(NULL, cb->error, NULL, NULL, 0, cb->cbdata); + } break; case COMMCB_close: /* NOT YET DONE */ @@ -169,15 +173,16 @@ } static void -comm_read_callback(filehandle *fh, fh_read_t *fhr, int len, int error) +comm_read_callback(filehandle *fh, fh_read_t *fhr, void *data, int len, int error) { COMMIOCB *callback = fhr->callback; void *cbdata; - IOBuf *buf; + IOBuf *buf = NULL; fhr->callback = NULL; + if (len > 0) + buf = IOBufCreate(data, len); cbdata = cbdataEatReference(fhr->cbdata); - buf = cbdataEatReference(fhr->buf); - comm_io_callback(fh, buf, fhr->offset, len, error, callback, cbdata); + comm_io_callback(fh, buf, 0, len, error, callback, cbdata); cbdataUnreference(cbdata); cbdataUnreference(buf); } @@ -192,6 +197,15 @@ { } +static void +comm_do_close(filehandle *fh) +{ + if (!fh->shutdown) { + shutdown(fh->fd, 1); + fh->shutdown = 1; + } +} + static int comm_do_read(filehandle *fh) { @@ -202,17 +216,13 @@ const ssize_t len = read(fd, inbuf, fhr->size); statCounter.syscalls.sock.reads++; fh_bytes(fh, len, FD_READ); - if (len > 0) { + if (len >= 0) { /* We got some data to deliver to the application */ - fhr->buf = IOBufCreate(inbuf, len); - comm_read_callback(fh, fhr, len, 0); + comm_read_callback(fh, fhr, inbuf, len, 0); return 1; - } else if (len == 0) { - /* EOF */ - comm_read_callback(fh, fhr, 0, 0); } else if (len == -1 && !ncommIgnoreErrno(errno)) { /* Error */ - comm_read_callback(fh, fhr, len, errno); + comm_read_callback(fh, fhr, NULL, len, errno); return 1; } /* Try again */ @@ -256,12 +266,16 @@ static int comm_do_write(filehandle *fh) { - fh_write_t *fhw = fh->write; + fh_write_t *fhw; int fd = fh->fd; /* Write out some more data */ - const void *buf = (void *)(*fhw->buf->bufp + fhw->done); - const size_t size = fhw->size - fhw->done; + const void *buf; + size_t size = fhw->size - fhw->done; ssize_t len; +writemore: + fhw = fh->write; + buf = (void *)(*fhw->buf->bufp + fhw->done); + size = fhw->size - fhw->done; assert(size > 0); len = write(fd, buf, size); statCounter.syscalls.sock.writes++; @@ -271,11 +285,18 @@ fh->write = fhw->next; comm_write_callback(fh, fhw, (ssize_t)fhw->size, 0); cbdataFree(fhw); + if (fh->write) + goto writemore; + /* OK. No more data. We are done */ + if (fh->closed) + comm_do_close(fh); return 1; } if (len == -1 && !ncommIgnoreErrno(errno)) { /* error */ comm_write_callback(fh, fhw, fhw->done, errno); + if (fh->closed) + comm_do_close(fh); return 1; } /* try again */ @@ -448,31 +469,46 @@ } } +static void +destroy_fh_write(void *data) +{ + fh_write_t *fhw = data; + cbdataUnreference(fhw->buf); +} + void ncomm_module_init(void) { CBDATA_INIT_TYPE(comm_callback_entry_t); CBDATA_INIT_TYPE_FREECB(filehandle, destroy_filehandle); CBDATA_INIT_TYPE_FREECB(IOBuf, IOBufIsFreed); - CBDATA_INIT_TYPE(fh_write_t); + CBDATA_INIT_TYPE_FREECB(fh_write_t, destroy_fh_write); } void ncomm_module_shutdown(void) { } -int + +void ncomm_close(filehandle *fh) { - /* NOT YET IMPLEMENTED */ - return 1; + fh->closed = 1; + if (!fh->write) + comm_do_close(fh); + cbdataFree(fh); } int +ncomm_closed(filehandle *fh) +{ + return fh->closed; +} + +void ncomm_abort(filehandle *fh) { /* NOT YET IMPLEMENTED */ - return 1; } filehandle * @@ -589,10 +625,20 @@ cbdataLock(fh); done = comm_do_connect(fh); - if (!done) + if (done) { + /* Ouch.. an error occurred while setting up the connection */ + cbdataUnlock(fh); + return NULL; + } + + if (!done) { comm_register_for_write_event(fh, comm_do_connect); - else if (!cbdataValid(fh)) - cbdataUnreference(fh); + } else if (!cbdataValid(fh)) { + /* Ouch.. an error occurred */ + cbdataUnlock(fh); + return NULL; + } + cbdataUnlock(fh); return fh; } Index: squid/src/ncomm_internals.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/ncomm_internals.h,v retrieving revision 1.1.2.3 retrieving revision 1.1.2.4 diff -u -r1.1.2.3 -r1.1.2.4 --- squid/src/ncomm_internals.h 25 Feb 2001 12:22:06 -0000 1.1.2.3 +++ squid/src/ncomm_internals.h 25 Feb 2001 20:55:22 -0000 1.1.2.4 @@ -15,8 +15,6 @@ /* internal filehandle structure */ typedef struct { - IOBuf *buf; - size_t offset; size_t size; COMMIOCB *callback; void *cbdata; @@ -46,9 +44,7 @@ struct sockaddr local; COMMEVENTREAD *read_handler; COMMEVENTWRITE *write_handler; + unsigned int closed:1; + unsigned int shutdown:1; }; -IOBuf *IOBufAlloc(size_t size); -IOBuf *IOBufCreate(void *data, size_t size); -IOBuf *IOBufCreateFromMemBuf(MemBuf *mb); - Index: squid/src/ncomm_test.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/ncomm_test.c,v retrieving revision 1.1.2.4 retrieving revision 1.1.2.5 diff -u -r1.1.2.4 -r1.1.2.5 --- squid/src/ncomm_test.c 25 Feb 2001 18:20:18 -0000 1.1.2.4 +++ squid/src/ncomm_test.c 25 Feb 2001 20:55:22 -0000 1.1.2.5 @@ -23,8 +23,14 @@ { connection *conn = cbdata; printf("Got %d server bytes\n", len); - ncomm_write(conn->client, buf, NULL, NULL); - ncomm_read(fh, 65536, got_server_data, conn); + if (len > 0) { + ncomm_write(conn->client, buf, NULL, NULL); + ncomm_read(fh, 65536, got_server_data, conn); + } else { + ncomm_close(conn->client); + if (ncomm_closed(fh)) + cbdataFree(conn); + } } static void @@ -32,8 +38,14 @@ { connection *conn = cbdata; printf("Got %d client bytes\n", len); - ncomm_write(conn->server, buf, NULL, NULL); - ncomm_read(fh, 65536, got_client_data, conn); + if (len > 0) { + ncomm_write(conn->server, buf, NULL, NULL); + ncomm_read(fh, 65536, got_client_data, conn); + } else { + ncomm_close(conn->server); + if (ncomm_closed(fh)) + cbdataFree(conn); + } } static void @@ -44,7 +56,7 @@ errno = error; fatal("server connect failed"); } - conn->server = fh; + conn->server = cbdataReference(fh); ncomm_read(conn->client, 65536, got_client_data, conn); ncomm_read(conn->server, 65536, got_server_data, conn); } @@ -56,29 +68,51 @@ struct sockaddr_in remote; printf("New connection!\n"); conn = cbdataAlloc(connection); - conn->client = fh; + conn->client = cbdataReference(fh); memset(&remote, 0, sizeof remote); remote.sin_family = AF_INET; - remote.sin_port = htons(21); + remote.sin_port = htons(80); inet_aton("127.0.0.1", &remote.sin_addr); ncomm_connect(SOCK_STREAM, 0, NULL, (struct sockaddr *)&remote, sizeof remote, new_server_connection, conn); } +static void +new_debug_connection(filehandle *fh, int error, struct sockaddr *local, struct sockaddr *remote_client, int addrsize, void *cbdata) +{ + if (fh) { + IOBuf *buf = memReport(); + ncomm_write(fh, buf, NULL, NULL); + cbdataUnreference(buf); + ncomm_close(fh); + } +} + +static void destroy_connection(void *cbdata) +{ + connection *conn = cbdata; + + cbdataUnreference(conn->server); + cbdataUnreference(conn->client); +} + int main(int argc, char **argv) { struct sockaddr_in me; - filehandle *lfh; + filehandle *lfh, *debugfh; cbdataInit(); memInitModule(); memBuf_module_init(); ncomm_module_init(); - CBDATA_INIT_TYPE(connection); + CBDATA_INIT_TYPE_FREECB(connection, destroy_connection); memset(&me, 0, sizeof(me)); me.sin_family = AF_INET; me.sin_port = htons(45678); lfh = ncomm_listen(SOCK_STREAM, 0, (struct sockaddr *)&me, sizeof(me), 8192, new_client_connection, NULL); + me.sin_port = htons(55555); + debugfh = ncomm_listen(SOCK_STREAM, 0, (struct sockaddr *)&me, sizeof(me), 8192, new_debug_connection, NULL); + while(1) ncomm_handle_events(1000); Index: squid/src/protos.h =================================================================== RCS file: /cvsroot/squid-sf//squid/src/protos.h,v retrieving revision 1.18.8.8 retrieving revision 1.18.8.9 diff -u -r1.18.8.8 -r1.18.8.9 --- squid/src/protos.h 25 Feb 2001 01:15:32 -0000 1.18.8.8 +++ squid/src/protos.h 25 Feb 2001 20:55:22 -0000 1.18.8.9 @@ -1,6 +1,6 @@ /* - * $Id: protos.h,v 1.18.8.8 2001/02/25 01:15:32 hno Exp $ + * $Id: protos.h,v 1.18.8.9 2001/02/25 20:55:22 hno Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -851,10 +851,7 @@ extern int memPoolInUseCount(const MemPool * pool); extern size_t memPoolInUseSize(const MemPool * pool); extern int memPoolUsedCount(const MemPool * pool); -extern void memPoolReport(const MemPool * pool, StoreEntry * e); - -/* Mem */ -extern void memReport(StoreEntry * e); +extern IOBuf *memReport(void); extern int stmemFreeDataUpto(mem_hdr *, int); extern void stmemAppend(mem_hdr *, const char *, int); @@ -1305,8 +1302,9 @@ void ncomm_module_init(void); void ncomm_module_shutdown(void); void ncomm_handle_events(int timeout); -int ncomm_close(filehandle *fh); -int ncomm_abort(filehandle *fh); +void ncomm_close(filehandle *fh); +int ncomm_closed(filehandle *fh); +void ncomm_abort(filehandle *fh); filehandle * ncomm_listen(int sock_type, int proto, struct sockaddr *where, int addrsize, int backlog, COMMNEWCB *callback, void *cbdata); filehandle * ncomm_accept(int sock_type, int proto, struct sockaddr *where, struct sockaddr *from, int addrsize, COMMNEWCB callback, void *cbdata); filehandle * ncomm_connect(int sock_type, int proto, const struct sockaddr *local, const struct sockaddr *remote, int addrsize, COMMNEWCB *callback, void *cbdata); @@ -1315,3 +1313,7 @@ void ncomm_write_fragment(filehandle *fh, IOBuf *buf, size_t offset, size_t len, COMMIOCB *callback, void *cbdata); void ncomm_write_mbuf(filehandle *fh, MemBuf mb, COMMIOCB *callback, void *cbdata); int ncomm_add_close_handler(filehandle *fh, COMMCLOSECB *handler, void *cbdata); +IOBuf *IOBufAlloc(size_t size); +IOBuf *IOBufCreate(void *data, size_t size); +IOBuf *IOBufCreateFromMemBuf(MemBuf *mb); +