--------------------- PatchSet 3464 Date: 2001/12/10 22:22:24 Author: jkay Branch: push Tag: (none) Log: Created out of satcuttlefish/icp.c diffs. Members: src/icp_data.c:1.1->1.1.2.1 --- /dev/null Wed Feb 14 00:55:47 2007 +++ squid/src/icp_data.c Wed Feb 14 00:56:47 2007 @@ -0,0 +1,414 @@ +/* + * $Id: icp_data.c,v 1.1.2.1 2001/12/10 22:22:24 jkay Exp $ + * + * DEBUG: section 83 ICP data transmission + * AUTHOR: Jon Kay, 1999 + * + * SQUID Internet Object Cache http://squid.nlanr.net/Squid/ + * -------------------------------------------------------- + * + * Squid is the result of efforts by numerous individuals from the + * Internet community. Development is led by Duane Wessels of the + * National Laboratory for Applied Network Research and funded by + * the National Science Foundation. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + */ + +static icpUdpData *UdpQueueHead = NULL; +static icpUdpData *UdpQueueTail = NULL; +icpQueue UdpCtlQ; +static List_Links icpFragQ; +ICPData_Stats icpdata_stats; + +static void icpRecvPush(int fd, struct sockaddr_in *from, icp_common_t *, + char *buf, int len); +static icpFragList *icpFragNew(char *buf, int off, int nextoff, int totlen, + icp_common_t *hdr, int isfirst); + +static icpFragList *icpFragFree(icpFragList *fl); + + +#define ICP_PUTLINE "PUT %s HTTP/1.0\r\n" + +/* Fragment and encapsulate stuff to be sent */ +void +icpSendData(int vfd, StoreEntry *entry, struct _pusher *p) +{ + int fd = theOutIcpConnection; + icpQueue *q = p->q; + int len, hdrlen, curlen; + struct icp_datab_s *dbhdr; + struct _http_reply *reply; + struct icp_data_s *dhdr; + char *firstline, *lnend; + icp_common_t *hdr; + char *buf, *tbuf; + icpUdpData *pktl; + int islast = 0; + int lchg, nllen, ollen; + + ++icpdata_stats.nsends; + debug(12, 5, "icpDataSend: vfd %d fd %d '%s'\n", vfd, fd, entry->key); + + reply = entry->mem_obj->reply; + curlen = entry->mem_obj->e_current_len; + if (p->objlen == 0) + p->objlen = reply->content_length + reply->hdr_sz; + + if (p->off < p->objlen) + /* Reregister handler */ + /* excess storeRegisters deregistered in putSendDone() call */ + storeRegister(entry, vfd, (PIF) icpSendData, p); + + while (p->off < curlen) { + /* Allocate 8k page, and put AppendUdp and ICP headers at beginning */ + buf = get_free_8k_page(); + hdr = (icp_common_t *) buf; + tbuf = (char *) &hdr[1]; + + /* Fill in ICP header */ + hdr->version = ICP_VERSION_CURRENT; + hdr->flags = 0; + hdr->pad = 0; + hdr->shostid = htonl(theOutICPAddr.s_addr); + + /* Is this the last fragment? */ + /* note: object_len == e_current_len iff storeComplete been called */ + islast = (p->off + 8000 >= entry->mem_obj->e_current_len + && (entry->object_len == curlen || p->objlen <= curlen)); + + if (p->off == 0) { + /* First fragment */ + p->reqnum = htonl(storeReqnum(entry, METHOD_PUT)); + hdr->opcode = ICP_OP_DATABEG; + dbhdr = (struct icp_datab_s *) tbuf; + tbuf = (char *) &dbhdr->db_data; + dbhdr->db_ttl = htonl(8); /* XXX*/ + dbhdr->db_ts = htonl(entry->lastmod); + hdrlen = sizeof(icp_common_t) + 12; + } + else { + /* Remaining fragments */ + hdr->opcode = islast ? ICP_OP_DATAEND : ICP_OP_DATA; + dhdr = (struct icp_data_s *) tbuf; + tbuf = (char *) &dhdr->d_data; + dhdr->d_offset = htonl(p->off); + hdrlen = sizeof(icp_common_t) + 4; + } + + storeClientCopy(entry, p->off, 8000, tbuf, &len, vfd); + + if (p->off == 0) { + /* Change the first line into a PUT line from a REPLY line */ + firstline = xmalloc(sizeof(ICP_PUTLINE) + strlen(entry->url)); + sprintf(firstline, ICP_PUTLINE, entry->url); + lnend = strchr(tbuf, '\n'); + if (lnend == NULL) { + /* Not enough stuff to send. */ + safe_free(firstline); + safe_free(buf); + storeUnregister(entry, vfd); + return; + } + while (lnend[1] == '\r' || lnend[1] == '\n') + ++lnend; + ollen = lnend - tbuf + 1; + nllen = strlen(firstline); + lchg = nllen - ollen; + memmove(tbuf + nllen, tbuf + ollen, len - ollen); + xmemcpy(tbuf, firstline, nllen); + p->objlen += lchg; + len += lchg; + dbhdr->db_size = htonl(p->objlen); + } + + p->off += len; + + hdr->length = hdrlen + len; + hdr->length = htons(hdr->length); + hdr->reqnum = p->reqnum; + + pktl = (icpUdpData *) xmalloc(sizeof(*pktl)); + pktl->address = q->dst; + pktl->msg = buf; + pktl->len = hdrlen + len; + pktl->start = current_time; + pktl->logcode = LOG_TAG_NONE; + pktl->proto = PROTO_NONE; + AppendUdp(q, pktl); + if (q->maxrate) + icpUdpReply(fd, q); + else + commSetSelect(fd, COMM_SELECT_WRITE, + (PF) icpUdpReply, (void *) q, 0); + + if (islast) + putSendDone(p->fd, p); + } + + if (p->off >= p->objlen) + storeUnregister(entry, vfd); +} + +void +icpRecvData(int fd, struct sockaddr_in *src, icp_common_t *hdr, char *buf, int len) +{ + struct icp_datab_s *dbhdr; + struct icp_data_s *dhdr; + icpFragList *fl, *fl2, *efl; + int off, datalen, objlen; + int ismerged; + char *nbuf; + + if (len < sizeof(icp_common_t)) { + debug(12, 0, "icpRecvData: packet shorter than headers (%d bytes)\n", len); + return; + } + if (len < hdr->length) { + debug(12, 0, "icpRecvData: incomplete packet (%d bytes out of %d bytes)\n", + len, hdr->length); + return; + } + + if (hdr->opcode == ICP_OP_DATABEG) { + dbhdr = (struct icp_datab_s *) buf; + buf = (char *) &dbhdr->db_data; + datalen = hdr->length - sizeof(icp_common_t) - 12; + off = 0; + objlen = ntohl(dbhdr->db_size); + if (objlen <= datalen) { + /* One packet long - no defrag needed. */ + nbuf = xmalloc(objlen + 1); + xmemcpy(nbuf, buf, objlen); + icpRecvPush(fd, src, hdr, nbuf, objlen); + return; + } + } + else if (hdr->opcode == ICP_OP_DATA || hdr->opcode == ICP_OP_DATAEND) { + /* A middle fragment */ + dhdr = (struct icp_data_s *) buf; + buf = (char *) &dhdr->d_data; + datalen = hdr->length - sizeof(icp_common_t) - 4; + off = ntohl(dhdr->d_offset); + objlen = 0; + } + else { + debug(12, 0, "icpRecvData: invalid opcode %d\n", hdr->opcode); + return; + } + + /* Find first fragment, with entry */ + debug(12, 5, "icpRecvData: id 0x%x defragging %d-%d\n", + hdr->reqnum, off, off + datalen); + ++icpdata_stats.nfrags; + efl = NULL; + ismerged = 0; + if (off == 0) { + assert(objlen); + fl = icpFragNew(buf, off, off + datalen, objlen, hdr, (int) efl); + LIST_FORALL2(&fl->links, icpFragList *, fl2); { + /* Look for more fragments */ + loopstart: + if (fl2->hdr.reqnum == hdr->reqnum && fl2->off == fl->nextoff) { + /* Consolidate this fragment */ + + if (fl2->off == 0) { + /* this is a dup */ + ++icpdata_stats.ndups; + icpFragFree(fl); + return; + } + + /* merge data */ + if (fl->nextoff < fl2->nextoff) + fl->nextoff = fl2->nextoff; + xmemcpy(fl->buf + fl2->off, fl2->buf, fl2->nextoff - fl2->off); + + /* delete old fragment */ + fl2 = icpFragFree(fl); + + goto loopstart; + } + } + } + else { + LIST_FORALL2(&icpFragQ, icpFragList *, fl) { + if (fl->hdr.reqnum == hdr->reqnum && fl->off == 0) { + if (!efl) + efl = fl; + if (fl->nextoff == off) { + /* Append to this packet */ + fl->nextoff += datalen; + xmemcpy(fl->buf + off, buf, datalen); + ismerged = 1; + if (fl->off == 0 && fl->objlen > 0 && fl->nextoff >= fl->objlen) { + /* Assembly complete*/ + icpRecvPush(fd, src, &fl->hdr, fl->buf, fl->objlen); + fl->buf = 0; + icpFragFree(fl); + return; + } + } + } + } + if (!ismerged) + icpFragNew(buf, off, off + datalen, objlen, hdr, (int) efl); + } +} + +static icpFragList * +icpFragNew(char *buf, int off, int nextoff, int objlen, + icp_common_t *hdr, int isfirst) +{ + icpFragList *fl; + int len; + + fl = (icpFragList *) xmalloc(sizeof *fl); + List_Insert(&fl->links, LIST_ATREAR(&icpFragQ)); + + len = (off ? nextoff - off : objlen + 1); + fl->buf = xmalloc(len + 1); + + xmemcpy(fl->buf, buf, nextoff - off); + fl->lasttime = squid_curtime; + fl->hdr = *hdr; + fl->off = off; + fl->nextoff = nextoff; + fl->objlen = objlen; + + return(fl); +} + +static icpFragList * +icpFragFree(icpFragList *fl) +{ + icpFragList *nfl; + +#if 0 + printf("icpFragFree: id 0x%x freeing %d-%d obj at 0x%x\n", + fl->hdr.reqnum, fl->off, fl->nextoff, fl->buf); +#endif + nfl = (icpFragList *) List_Next(&fl->links); + List_Remove(&fl->links); + if (fl->buf) + safe_free(fl->buf); + safe_free(fl); + return(nfl); +} + +static void +icpFragTimer() +{ + icpFragList *fl; +#if 0 /* JSKDEBUG */ + int nfrags = 0, mem = 0; +#endif + + eventAdd("icpdata fragment timer", (EVH) icpFragTimer, NULL, + Config.icp_frag_tmo); + + LIST_FORALL2(&icpFragQ, icpFragList *, fl) { + loopstart: + if ((char *) fl != (char *) &icpFragQ) { +#if 0 /* JSKDEBUG */ + ++nfrags; + mem += fl->objlen + sizeof(*fl); +#endif + if (squid_curtime - fl->lasttime >= Config.icp_frag_tmo) { + ++icpdata_stats.ntmofrags; + fl = icpFragFree(fl); + goto loopstart; + } + } + } +#if 0 /* JSKDEBUG */ + printf("icpFragTimer: %d frags, for total of %d bytes used\n", + nfrags, mem); +#endif +} + + +static void +icpRecvPush(int fd, struct sockaddr_in *src, icp_common_t *hdr,char *buf, int len) +{ + icpStateData *icpState = xcalloc(sizeof(icpStateData), 1); + request_t *request = NULL; + int err; + + debug(12, 3, "icpRecvPush on fd %d: Hi!\n", fd); + ++icpdata_stats.nrecvs; + + /* Construct icpState */ + icpState->start = current_time; + icpState->inbuf = buf; + icpState->inbufsize = 8192; + icpState->header = *hdr; + icpState->peer = *src; + icpState->log_addr = src->sin_addr; + icpState->log_addr.s_addr &= Config.Addrs.client_netmask.s_addr; + icpState->log_type = LOG_UDP_PUT; + icpState->me = *getMyAddr(); + icpState->me.sin_port = Config.Port.icp; + icpState->entry = NULL; + icpState->in_offset = len; + icpState->fd = comm_virtual_fd(); + + fd_note(fd, inet_ntoa(icpState->log_addr)); + comm_add_close_handler(icpState->fd, icpStateFree, (void *) icpState); + + err = parseHttpRequest(icpState); + icpState->inbuf[icpState->in_offset] = '\0'; /* Terminate the string */ + if (err == 1) { + if ((request = urlParse(icpState->method, icpState->url)) == NULL) { + debug(12, 0, "icpRecvPush: Invalid URL: %s\n", icpState->url); + return; + } + safe_free(icpState->log_url); + icpState->log_url = xstrdup(urlCanonicalClean(request)); + request->http_ver = icpState->http_ver; + if (!urlCheckRequest(request)) { + debug(12, 0, "icpRecvPush: Invalid request: %s\n"); + return; + } + icpState->request = requestLink(request); + + /* XXX - Uncomment to make acls work with icpdata pushes */ + /* Currently causes core dump, needs debugging */ + /* clientAccessCheck(icpState, clientAccessCheckDone);*/ + } + else if (err == 0) { + debug(12, 0, "icpRecvPush: got partial request\n"); + return; + } + else { /* err == -1 */ + debug(12, 0, "icpRecvPush: got invalid request\n"); + return; + } + + putRecv(icpState); +} + +void +icpDataInit() +{ + List_Init(&icpFragQ); + List_Init(&UdpCtlQ.links); + UdpCtlQ.nextsend.tv_sec = UdpCtlQ.nextsend.tv_usec = 0; + UdpCtlQ.maxrate = 0; + eventAdd("icpdata fragment timer", (EVH) icpFragTimer, NULL, + Config.icp_frag_tmo); +}