--------------------- PatchSet 3347 Date: 2001/11/20 22:54:48 Author: jkay Branch: push Tag: (none) Log: PUT message support. Members: src/put.c:1.1->1.1.2.1 --- /dev/null Wed Feb 14 00:55:47 2007 +++ squid/src/put.c Wed Feb 14 00:56:33 2007 @@ -0,0 +1,412 @@ +/* + * $Id: put.c,v 1.1.2.1 2001/11/20 22:54:48 jkay Exp $ + * + * DEBUG: section 81 PUT transmission and reception + * AUTHOR: Jon Kay, 1997 + * + * SQUID Internet Object Cache http://squid.nlanr.net/Squid/ + * -------------------------------------------------------- + * + * Squid is the result of efforts by numerous individuals from the + * Internet community. Development is led by Duane Wessels of the + * National Laboratory for Applied Network Research and funded by + * the National Science Foundation. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +#include "squid.h" + +#define PUT_PAGE_SIZE (CLIENT_SOCK_SZ - sizeof(Pusher)) + +#define PUT_REPLY_HDR() "HTTP/1.0 200 OK\r\n" + +#define PUT_DELETE_GAP (1<<18) + +static PF putRecvFree; +static void putReply(int fd, StoreEntry *entry); +static void putProcessHeader(HttpStateData *, clientHttpRequest *); +static void putHandleStore(void *data, char *buf, ssize_t len); +static CNCB putConnected; +static CWCB putSendData; + +/* + * Code for dealing with incoming PUT requests + */ +void +putRecv(clientHttpRequest *http) +{ + request_t *request = http->request; + ConnStateData *conn = http->conn; + HttpStateData *httpState; + StoreEntry *entry; + const cache_key *pubkey; + HttpReply *reply; + + pubkey = storeKeyPublic(http->uri, request->method); + http->request->flags.cachable = 1; + if (http->log_type != LOG_UDP_PUT) + http->log_type = LOG_TCP_PUT; + http->out.offset = 0; + + entry = storeGet(pubkey); + + if (entry) { + storeSetPrivateKey(entry); + storeRelease(entry); + } + + entry = storeCreateEntry(http->uri, http->uri, + request->flags, request->method); + assert(entry->lock_count == 1 && entry->lock_count < 200); + + http->entry = entry; + + /* Need a request pointer in the mem_obj */ + requestLink(http->request); + entry->mem_obj->request = request; + + debug(81, 4) ("putRecv: %s for '%s'\n", log_tags[http->log_type], + http->uri); + + /* Update timestamp */ + storeTimestampsSet(entry); + + /* Create an HTTP context from client_side info */ + assert(http->entry->lock_count >= 1 && http->entry->lock_count < 200); + httpState = httpStateFromClient(http); + assert(http->entry->lock_count >= 2 && http->entry->lock_count < 200); + httpState->entry = entry; + /* Prepare to reply and free httpState if PUT succeeds. */ + comm_add_close_handler(conn->fd, putRecvFree, httpState); + + assert(http->entry->lock_count >= 2 && http->entry->lock_count < 200); + /* Parse the headers */ + putProcessHeader(httpState, http); + reply = entry->mem_obj->reply; + + /* Update the timestamp */ + if (entry->mem_obj->reply->last_modified) + entry->lastmod = entry->mem_obj->reply->last_modified; + else + /* Make it clear the version has changed even if no lastmod */ + ++entry->lastmod; + + /* Make sure dist knows about the sender, and distribute the update */ + if (request->dport) + distNewUpdatee(entry, conn->peer.sin_addr, request->dport); + distEntryUpdate(entry, request, &conn->peer); + +#ifdef USE_ICPDATA + /* fwdpush support */ + if (http->log_type == LOG_TCP_PUT) { + for (p = FwdPushLinks; p; p = p->fwdpush) { + ++ndistrib; + if (p->options & NEIGHBOR_ICPDATA) { + putSend(entry, &p->in_addr, NULL, NULL, P_ICPDATA, p->icp_data_q); + } + else { + sin.sin_family = AF_INET; + sin.sin_addr = p->in_addr.sin_addr; + sin.sin_port = p->http_port; + putSend(entry, &sin, NULL, NULL, 0, 0); + } + } + } +#endif + + /* XXX - Can we use httpReplyParse to simplify this? */ + + /* httpReadReply reads msgs with bodies */ + if (entry->mem_obj->inmem_hi < reply->content_length + reply->hdr_sz) { + httpReadReply(conn->fd, httpState); + } + else + putReplyAndClose(conn->fd, entry); +} + + +/* Process PUT header. */ +static void +putProcessHeader(HttpStateData *httpState, clientHttpRequest *client) +{ + static char replline[] = "HTTP/1.0 200 OK"; + char *hdr = client->hdr_str; + ConnStateData *conn = client->conn; + int hdrlen; + int bodylen = conn->in.offset; /* !yep! */ + char *eol; + + /* First change the PUT first line to a reply first line */ + eol = strchr(hdr, '\r'); + eol[0] = '\0'; + strcpy(hdr, replline); + memset(hdr + sizeof(replline) - 1, ' ', eol - hdr - sizeof(replline) + 1); + eol[0] = '\r'; + + hdrlen = strlen(hdr); + + httpProcessReplyHeader(httpState, hdr, hdrlen); + debug(81, 6) ("putProcessHeader: need %d bytes\n", httpState->body_remain); + + /* store header and start of data */ + storeAppend(client->entry, hdr, hdrlen); + if (bodylen > 0) + storeAppend(client->entry, conn->in.buf, bodylen); + + httpState->body_remain -= client->entry->mem_obj->inmem_hi; + debug(81, 6) ("putProcessHeader: got %d bytes, %d remain\n", + hdrlen + bodylen, httpState->body_remain); +} + +void +putReplyAndClose(int fd, StoreEntry *entry) +{ + storeComplete(entry); + putReply(fd, entry); + comm_close(fd); /* close fd and call putRecvFree */ +} + +static void +putRecvFree(int fd, void *data) +{ + HttpStateData *httpState = data; + StoreEntry *entry = httpState->entry; + + putReply(fd, entry); + httpStateFree(fd, httpState); + assert(entry->lock_count >= 1); + storeUnlockObject(entry); +} + +static void +putReply(int fd, StoreEntry *entry) +{ + if (entry->mem_obj->reply->sline.status != 200) { + debug(81, 2) ("putReply: not replying - code = %d\n", + entry->mem_obj->reply->sline.status); + /* Already aborted, hopefully with error msg */ + return; + } + + /* + * PUT succeeded, acknowledge this. + * Note: comm_write is inappropriate here because this + * is a close handler. Using bare write() is a bit of a hack; + * it is 99.9% likely to work because this is the only thing + * we're writing back and it is of strictly limited length. + */ + debug(81, 6) ("putReply: replying on %d\n", fd); + write(fd, PUT_REPLY_HDR(), sizeof(PUT_REPLY_HDR())); +} + + +/* Output PUTs */ + +/* + * Make up and send a PUT to a given location. + * + * The sockaddr_in does not have to persist when + * the call completes. + */ +Pusher * +putSend(StoreEntry *entry, struct sockaddr_in *saddr, + PF *handler, void *arg, int dodist) +{ + struct in_addr nobody; + Pusher *p; + char *buf; + int fd; + + /* Make sure entry stays inmem until operation complete */ + storeLockObject(entry); + + /* Allocate memory for buffer and state tracking */ + p = (Pusher *) memAllocate(MEM_CLIENT_SOCK_BUF); + buf = (char *) &p[1]; + p->sc = storeClientListAdd(entry, p); + p->flags.connecting = 1; + p->flags.disting = dodist; + p->flags.active = 1; + p->off = 0; + p->objlen = 0; + p->entry = entry; + p->handler = handler; + p->arg = arg; +#ifdef USE_ICPDATA + if (flags & P_ICPDATA) { + p->fd = comm_virtual_fd(); + p->q = q; + q->dst = *saddr; + icpSendData(p->fd, entry, p); + return(p); + } + else + p->q = 0; +#endif + + /* Need to create and connect a PUT socket */ + nobody.s_addr = INADDR_ANY; + p->fd = fd = comm_open(SOCK_STREAM, 0, nobody, 0, COMM_NONBLOCKING, "put"); + if (fd == COMM_ERROR) { + debug(11, 1) ("putSend: Could not create new socket.\n"); + storeUnlockObject(entry); + return(0); + } + + /* Deal with socket closing */ + comm_add_close_handler(fd, (PF *) putSendDone, entry); + + /* Connect to this updatee */ + strcpy(p->hname, inet_ntoa(saddr->sin_addr)); + commConnectStart(fd, p->hname, saddr->sin_port, putConnected, p); + + return(p); +} + +/* Send the object's data */ +void +putSendDone(int fd, Pusher *p) +{ + StoreEntry *entry = p->entry; + + if (!(p->flags.active)) + return; + p->flags.active = 0; + + if (p->handler) + (*p->handler)(fd, p->arg); + + storeUnregister(p->sc, entry, (void *) fd); + storeUnlockObject(entry); + + /* Free 8k buffer and pusher state along with it */ + xfree(p); + + /* Don't close if ICP - ICP sends via theIcpOutConnection, + * which we cannot afford to close. */ + if (fd != -1) + comm_close(fd); +} + +/* xlate storeRegister calling conventions to comm_write ones */ +static void +putHandleStore(void *data, char *buf, ssize_t len) +{ + Pusher *p = (Pusher *) data; + + /* Do the work. */ + putSendData(p->fd, 0, 0, 0, p); +} + +/* Xfer data from mem_obj to updatee */ +/* XXX - duplication with icpSendMoreData, etc. */ +static void +putSendData(int fd, char *obuf, size_t size, int errflag, void *data) +{ + Pusher *p = (Pusher *) data; + StoreEntry *entry = p->entry; + char *buf, *tbuf; + + debug(53, 5) ("putSendData: FD %d '%s'\n", fd, entry->mem_obj->url); + + if (errflag) { + debug(53, 3) ("putSendData: write failed, errno %d\n", errno); + putSendDone(fd, p); + return; + } + + if (size > 0) + /* We must have been called by successful comm_write */ + p->flags.writing = 0; + + if (!(p->flags.connected) || (p->flags.writing)) + return; + + p->off += size; + if (p->off >= entry->mem_obj->inmem_hi) { + /* We've sent everything we've seen so far */ + /* note: object_len == inmem_hi iff storeComplete been called */ + if (entry->mem_obj->object_sz == entry->mem_obj->inmem_hi) + /* We've seen all we're gonna see of this version */ + putSendDone(fd, p); + return; + } + + buf = (char *) &p[1]; + storeClientCopy(p->sc, entry, + p->off + size, p->off, + PUT_PAGE_SIZE, + buf, + putHandleStore, p); + p->flags.writing = 1; + + /* + * Skip over the first line, if this is the first call to putSendData + * and that first line starts with "HTTP" - that would be the first line + * in a server reply, and is incorrect in a PUT request. + */ + if (p->off == 0 && !strncasecmp(buf, "HTTP", 4)) { + tbuf = strchr(buf, '\n'); + if (!tbuf) + /* Malformatted or not enough data present yet. + * Either way, don't propagate this message. */ + return; + + while (*++tbuf == '\r') + ; + size -= tbuf - buf; + p->off += tbuf - buf; + buf = tbuf; + } + + comm_write(fd, buf, size, putSendData, p, 0); +} + +/* Update given updatee on contents of entry */ +static void +putConnected(int fd, int commstat, void *data) +{ + Pusher *p = (Pusher *) data; + StoreEntry *entry = p->entry; + char *buf; + char *url; + + if (commstat == COMM_ERROR) { + /* XXX - this should be more sophisticated */ + debug(11, 4) ("putConnected: Could not connect to %s\n", p->hname); + comm_close(fd); + return; + } + + p->flags.connecting = 0; + p->flags.connected = 1; + + /* Figure out what URL to use */ + url = entry->mem_obj->url; + if (p->flags.disting) { + url = distifyUrl(url); + } + + /* Compose header */ + buf = (char *) &p[1]; + snprintf(buf, PUT_PAGE_SIZE, "PUT %s HTTP/1.0\r\n", url); + p->off = -strlen(buf); + + /* Write object asynchronously, starting with header */ + comm_write(fd, buf, strlen(buf), putSendData, p, 0); +} +