--------------------- PatchSet 1656 Date: 2001/02/25 18:20:18 Author: hno Branch: eventio Tag: (none) Log: ncomm_write and ncomm_connect debugged * ncomm_write broke when IOBuf was implemented * ncomm_connect forgot about non-blocking I/O, and some other errors * NULL callbacks was not handled Members: src/ncomm.c:1.1.2.8->1.1.2.9 src/ncomm_test.c:1.1.2.3->1.1.2.4 Index: squid/src/ncomm.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/ncomm.c,v retrieving revision 1.1.2.8 retrieving revision 1.1.2.9 diff -u -r1.1.2.8 -r1.1.2.9 --- squid/src/ncomm.c 25 Feb 2001 12:22:06 -0000 1.1.2.8 +++ squid/src/ncomm.c 25 Feb 2001 18:20:18 -0000 1.1.2.9 @@ -147,14 +147,13 @@ switch(cb->type) { case COMMCB_io: /* NOT YET DONE */ - if (valid) + if (valid && cb->cb.io.callback) cb->cb.io.callback(fh, cb->cb.io.buf, cb->cb.io.offset, cb->cb.io.len, cb->error, cb->cbdata); cbdataUnreference(cb->cb.io.buf); break; case COMMCB_new: - if (valid) + if (valid && cb->cb.new.callback) cb->cb.new.callback(fh, cb->error, &fh->local, &fh->peer, fh->addrsize, cb->cbdata); - /* NOT YET DONE */ break; case COMMCB_close: /* NOT YET DONE */ @@ -203,13 +202,15 @@ const ssize_t len = read(fd, inbuf, fhr->size); statCounter.syscalls.sock.reads++; fh_bytes(fh, len, FD_READ); - if (len >= 0) { + if (len > 0) { /* We got some data to deliver to the application */ fhr->buf = IOBufCreate(inbuf, len); comm_read_callback(fh, fhr, len, 0); return 1; - } - if (len == -1 && !ncommIgnoreErrno(errno)) { + } else if (len == 0) { + /* EOF */ + comm_read_callback(fh, fhr, 0, 0); + } else if (len == -1 && !ncommIgnoreErrno(errno)) { /* Error */ comm_read_callback(fh, fhr, len, errno); return 1; @@ -258,7 +259,7 @@ fh_write_t *fhw = fh->write; int fd = fh->fd; /* Write out some more data */ - const void *buf = (void *)((char *)fhw->buf + fhw->done); + const void *buf = (void *)(*fhw->buf->bufp + fhw->done); const size_t size = fhw->size - fhw->done; ssize_t len; assert(size > 0); @@ -408,6 +409,8 @@ if (fd < 0) return NULL; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)); + ncommSetCloseOnExec(fd); + ncommSetNonBlocking(fd); rc = bind(fd, where, addrsize); statCounter.syscalls.sock.binds++; if (rc < 0) { @@ -431,17 +434,25 @@ fh->connect.callback = callback; fh->connect.cbdata = cbdataReference(cbdata); fh_open(fh, "Listening socket"); - ncommSetCloseOnExec(fd); - ncommSetNonBlocking(fd); comm_register_for_read_event(fh, comm_do_listen); return fh; } +static void +destroy_filehandle(void *data) +{ + filehandle *fh = data; + if (fh->fd) { + close(fh->fd); + fh->fd = -1; + } +} + void ncomm_module_init(void) { CBDATA_INIT_TYPE(comm_callback_entry_t); - CBDATA_INIT_TYPE(filehandle); + CBDATA_INIT_TYPE_FREECB(filehandle, destroy_filehandle); CBDATA_INIT_TYPE_FREECB(IOBuf, IOBufIsFreed); CBDATA_INIT_TYPE(fh_write_t); } @@ -521,9 +532,16 @@ } if (done) { socklen_t addrsize = sizeof(fh->local); + err = errno; /* just in case */ getsockname(fh->fd, &fh->local, &addrsize); if (addrsize < fh->addrsize) fh->addrsize = addrsize; + if (err) { + comm_new_callback(fh, err, fh->connect.callback, fh->connect.cbdata); + cbdataFree(fh); + } else { + comm_new_callback(fh, 0, fh->connect.callback, fh->connect.cbdata); + } } return done; } @@ -545,8 +563,12 @@ debug(5, 1) ("comm_connet: Cannot create new socket: %s\n", xstrerror()); return NULL; } + ncommSetCloseOnExec(sock); + ncommSetNonBlocking(sock); /* Bind to local endpoint */ if (local) { + int on = 1; + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)); x = bind(sock, local, addrsize); statCounter.syscalls.sock.binds++; if (x != 0) @@ -564,10 +586,13 @@ fh->connect.cbdata = cbdataReference(cbdata); /* Establish connection. */ + cbdataLock(fh); done = comm_do_connect(fh); if (!done) comm_register_for_write_event(fh, comm_do_connect); + else if (!cbdataValid(fh)) + cbdataUnreference(fh); return fh; } Index: squid/src/ncomm_test.c =================================================================== RCS file: /cvsroot/squid-sf//squid/src/Attic/ncomm_test.c,v retrieving revision 1.1.2.3 retrieving revision 1.1.2.4 diff -u -r1.1.2.3 -r1.1.2.4 --- squid/src/ncomm_test.c 25 Feb 2001 12:22:06 -0000 1.1.2.3 +++ squid/src/ncomm_test.c 25 Feb 2001 18:20:18 -0000 1.1.2.4 @@ -5,24 +5,63 @@ { } +typedef struct { + filehandle *client; + filehandle *server; +} connection; + +CBDATA_TYPE(connection); + void fatal(const char *message) { - fprintf(stderr, "FATAL: %s\n", message); + fprintf(stderr, "FATAL: %s (%s)\n", message, strerror(errno)); abort(); } static void -got_data(filehandle *fh, IOBuf *buf, int offset, int len, int error, void *data) +got_server_data(filehandle *fh, IOBuf *buf, int offset, int len, int error, void *cbdata) +{ + connection *conn = cbdata; + printf("Got %d server bytes\n", len); + ncomm_write(conn->client, buf, NULL, NULL); + ncomm_read(fh, 65536, got_server_data, conn); +} + +static void +got_client_data(filehandle *fh, IOBuf *buf, int offset, int len, int error, void *cbdata) { - printf("Got %d bytes\n", len); - ncomm_read(fh, 65536, got_data, NULL); + connection *conn = cbdata; + printf("Got %d client bytes\n", len); + ncomm_write(conn->server, buf, NULL, NULL); + ncomm_read(fh, 65536, got_client_data, conn); } static void -new_connection(filehandle *fh, int error, struct sockaddr *local, struct sockaddr *remote, void *cbdata) +new_server_connection(filehandle *fh, int error, struct sockaddr *local, struct sockaddr *remote, int addrsize, void *cbdata) { + connection *conn = cbdata; + if (error) { + errno = error; + fatal("server connect failed"); + } + conn->server = fh; + ncomm_read(conn->client, 65536, got_client_data, conn); + ncomm_read(conn->server, 65536, got_server_data, conn); +} + +static void +new_client_connection(filehandle *fh, int error, struct sockaddr *local, struct sockaddr *remote_client, int addrsize, void *cbdata) +{ + connection *conn; + struct sockaddr_in remote; printf("New connection!\n"); - ncomm_read(fh, 65536, got_data, NULL); + conn = cbdataAlloc(connection); + conn->client = fh; + memset(&remote, 0, sizeof remote); + remote.sin_family = AF_INET; + remote.sin_port = htons(21); + inet_aton("127.0.0.1", &remote.sin_addr); + ncomm_connect(SOCK_STREAM, 0, NULL, (struct sockaddr *)&remote, sizeof remote, new_server_connection, conn); } int main(int argc, char **argv) @@ -34,10 +73,12 @@ memBuf_module_init(); ncomm_module_init(); + CBDATA_INIT_TYPE(connection); + memset(&me, 0, sizeof(me)); me.sin_family = AF_INET; me.sin_port = htons(45678); - lfh = ncomm_listen(SOCK_STREAM, 0, (struct sockaddr *)&me, sizeof(me), 8192, new_connection, NULL); + lfh = ncomm_listen(SOCK_STREAM, 0, (struct sockaddr *)&me, sizeof(me), 8192, new_client_connection, NULL); while(1) ncomm_handle_events(1000);