/* * OpenPBS (Portable Batch System) v2.3 Software License * * Copyright (c) 1999-2000 Veridian Information Solutions, Inc. * All rights reserved. * * --------------------------------------------------------------------------- * For a license to use or redistribute the OpenPBS software under conditions * other than those described below, or to purchase support for this software, * please contact Veridian Systems, PBS Products Department ("Licensor") at: * * www.OpenPBS.org +1 650 967-4675 sales@OpenPBS.org * 877 902-4PBS (US toll-free) * --------------------------------------------------------------------------- * * This license covers use of the OpenPBS v2.3 software (the "Software") at * your site or location, and, for certain users, redistribution of the * Software to other sites and locations. Use and redistribution of * OpenPBS v2.3 in source and binary forms, with or without modification, * are permitted provided that all of the following conditions are met. * After December 31, 2001, only conditions 3-6 must be met: * * 1. Commercial and/or non-commercial use of the Software is permitted * provided a current software registration is on file at www.OpenPBS.org. * If use of this software contributes to a publication, product, or * service, proper attribution must be given; see www.OpenPBS.org/credit.html * * 2. Redistribution in any form is only permitted for non-commercial, * non-profit purposes. There can be no charge for the Software or any * software incorporating the Software. Further, there can be no * expectation of revenue generated as a consequence of redistributing * the Software. * * 3. Any Redistribution of source code must retain the above copyright notice * and the acknowledgment contained in paragraph 6, this list of conditions * and the disclaimer contained in paragraph 7. * * 4. 
Any Redistribution in binary form must reproduce the above copyright * notice and the acknowledgment contained in paragraph 6, this list of * conditions and the disclaimer contained in paragraph 7 in the * documentation and/or other materials provided with the distribution. * * 5. Redistributions in any form must be accompanied by information on how to * obtain complete source code for the OpenPBS software and any * modifications and/or additions to the OpenPBS software. The source code * must either be included in the distribution or be available for no more * than the cost of distribution plus a nominal fee, and all modifications * and additions to the Software must be freely redistributable by any party * (including Licensor) without restriction. * * 6. All advertising materials mentioning features or use of the Software must * display the following acknowledgment: * * "This product includes software developed by NASA Ames Research Center, * Lawrence Livermore National Laboratory, and Veridian Information * Solutions, Inc. * Visit www.OpenPBS.org for OpenPBS software support, * products, and information." * * 7. DISCLAIMER OF WARRANTY * * THIS SOFTWARE IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND. ANY EXPRESS * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT * ARE EXPRESSLY DISCLAIMED. * * IN NO EVENT SHALL VERIDIAN CORPORATION, ITS AFFILIATED COMPANIES, OR THE * U.S. 
GOVERNMENT OR ANY OF ITS AGENCIES BE LIABLE FOR ANY DIRECT OR INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * This license will be governed by the laws of the Commonwealth of Virginia, * without reference to its choice of law rules. */ /* ** Routines to communicate with UDP packets - Reliable Packet Protocol. ** ** This package provides for sending information in "messages" ** which are complete blocks of data which will either arrive ** complete or not at all. */ #if !defined(_BSD) && defined(_AIX) /* this is needed by AIX */ #define _BSD 1 #endif #include /* the master config generated by configure */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if defined(NTOHL_NEEDS_ARPA_INET_H) && defined(HAVE_ARPA_INET_H) #include #endif #include "rpp.h" #include "log.h" #if !defined(H_ERRNO_DECLARED) && !defined(_AIX) extern int h_errno; #endif /* ** Boolean Constants */ #ifndef FALSE #define FALSE 0 #endif #ifndef TRUE #define TRUE 1 #endif /* ** Turn on for debug log dump to /tmp. 
#define RPPLOG 1 */ /* ** Size-Constants for the various parts of RPP packet */ #define RPP_PKT_SIZE 4*1024 /* max packet length, including header */ #define RPP_PKT_HEAD 26 /* Size of packet header, includes CRC */ #define RPP_HDR_SID 2 /* position of stream id in packet header */ #define RPP_HDR_SEQ 10 /* position of "sequence" in pkt header */ #define RPP_CRC_LEN 8 /* Length of CRC field in pkt header */ #define RPP_PKT_CRC (RPP_PKT_HEAD - RPP_CRC_LEN) #define RPP_PKT_DATA (RPP_PKT_SIZE - RPP_PKT_HEAD) /* ** RPP packet-header fields ** ** length field ** ======== ========= ** 2 type ** 8 stream id ** 8 sequence ** 8 crc */ /* ** Integer codes for all the valid RPP message types */ #define RPP_ACK 1 #define RPP_DATA 2 #define RPP_EOD 3 #define RPP_HELLO1 4 #define RPP_HELLO2 5 #define RPP_GOODBYE 6 /* ** Integer codes for all the valid RPP state values */ #define RPP_DEAD -1 /* only set during clear_stream() */ #define RPP_FREE 0 #define RPP_OPEN_PEND 1 #define RPP_OPEN_WAIT 2 #define RPP_CONNECT 3 #define RPP_CLOSE_PEND 4 #define RPP_LAST_ACK 5 #define RPP_CLOSE_WAIT1 6 #define RPP_CLOSE_WAIT2 7 #define RPP_STALE 99 /* set when a packet has been sent more than RPP_RETRY times */ /* ** Time in seconds; packet on the master send queue is not sent more ** often than every RPP_TIMEOUT seconds. */ #define DEFAULT_RPP_TIMEOUT 4 /* ** Default number of sendto attempts on a *packet. */ #define DEFAULT_RPP_RETRY 48 /* ** Max allowed number of outstanding pkts */ #define RPP_HIGHWATER 60 int RPPTimeOut = DEFAULT_RPP_TIMEOUT; int RPPRetry = DEFAULT_RPP_RETRY; char *server_alias = NULL; long rpp_throttle_sleeptime = 0; /* external prototypes */ void rpp_shutdown_on_exit(int, void *); /* END external prototypes */ /* ** Several kinds of linked lists hang from each RPP stream structure. ** In particular, each stream structure has a list of send_packet ** structures. 
These structures record information that's necessary for ** managing a piece of data that's being sent to the other end of the ** connection. Besides having a pointer that links it to the next ** send_packet on the stream, a send_packet posseses a pair of pointers ** (up,down) which link the send_packet to the master send list when ** connection sequencing has reached the point where it is now proper to ** attach it to the list (stream is fully opened). Once on the master ** sendlist, any invocation of the rpp_send_out routine will attempt to ** transfer the send_packet's data buffer to the other end of the stream ** connection if all of the following hold: ** ** 1) packet hasn't reached its maximum-transfer-attempt limit ** 2) RPP_TIMEOUT or more seconds have elapsed since the last attempt ** 3) the transfer has yet to be ACK'd by the other side. ** 4) less than RPP_HIGHWATER number of non-duplicate packets are ** un-ACK'd ** ** Those "send_packets" that deal with stream control ** (RPP_ACK, RPP_HELLO1, RPP_HELLO2, RPP_GOODBYE) have no ** associated data, the send_packet's data buffer is comprised ** of only the header. 
*/

/*
** One outbound packet. It is always linked on its stream's send list
** through "next"; "up"/"down" link it on the master send list.
*/
struct send_packet {
    u_char *data;               /* points to a buffer to be transferred */
    u_short type;               /* RPP_ACK, RPP_HELLO1, RPP_DATA, etc */
    u_short sent_out;           /* number of times sent; <= sp->retry */
    int len;                    /* size *data not counting the header */
    int index;                  /* other end's stream id */
    int sequence;               /* sequential value that is placed into */
                                /* data buffer's header and is returned */
                                /* in the RPP_ACK; identifies the data */
                                /* that was transferred and now needs */
                                /* to be removed from master send list */
    time_t time_sent;           /* time packet was last sent; zero if */
                                /* it has yet to be sent */
    struct send_packet *next;   /* next packet on stream's send list */
    struct send_packet *up;     /* used when the send_packet is */
                                /* on RPP's master send list */
                                /* pointer to preceding packet */
    struct send_packet *down;   /* similar to up, but pointing to */
                                /* packet following this one */
};

/*
** Hanging from a stream structure is a linked list of recv_packets.
** Each recv_packet records the information necessary for managing a
** piece of data that was received from the other end of a connection.
** With the exception of RPP_GOODBYE (a surrogate RPP_EOD), pkts that deal
** with stream control (RPP_ACK,RPP_HELLO1,RPP_HELLO2,) don't ever make it
** to the stream's receive list, they are handled directly as they come
** in. So, the only types of packets on a stream's receive list should
** be ones of type RPP_DATA, RPP_EOD, and RPP_GOODBYE.
*/
struct recv_packet {
    u_char *data;               /* pointer to byte string that was sent from */
                                /* the other end of the stream */
    u_short type;               /* RPP_DATA, RPP_EOD or RPP_GOODBYE; i.e. a */
                                /* piece of the message or the end of the */
                                /* message (RPP_EOD or RPP_GOODBYE) */
    int len;                    /* size in bytes of the received string; */
                                /* does not include the length of the header */
    int sequence;               /* decoded sequential number; it describes */
                                /* the buffer's sequential output order */
                                /* relative to the other buffers */
    struct recv_packet *next;   /* pointer to next recv_packet in */
                                /* the linked receive list for */
                                /* this stream */
};

/*
** Each stream structure has a linked list of pend structs hanging
** from it. This list of structs is used to manage a set of data buffers
** that may or may not all get 'packetized' and sent over to the other
** end of the connection. Subsequent to the creation of the list of data
** buffers, the creating program decides to either commit or not commit
** the sending of this data to the other side-- done by calling the
** interface function, rpp_wcommit. The interface function
** rpp_write doesn't send the data, it merely
** attaches the data buffer to a pending struct and links this struct to
** the stream's list of pending structs. No transfer to the other end
** gets set in motion by calling rpp_write.
**
** Doing an rpp_wcommit, on the other hand, entails adjoining a header
** to each pending struct's data buffer, attaching the buffer to a
** new send_packet struct, linking the send_packet to the master send
** list, freeing the pend struct, and doing an update of the stream's
** pend_commit variable -- a running sum of the number of bytes sent
** to the other side.
**
** Decommitting data that was written for transfer to the other end entails
** removing and freeing the pending structs and their associated data
** buffer from the stream's pend list and, updating the stream's
** pend_attempt variable back to the byte count that is stored in its
** pend_commit variable. Refer to relevant fields in struct stream.
*/

struct pending {
    u_char *data;               /* pointer to a buffer of user data */
    struct pending *next;       /* pointer to the next pending struct */
};

/*
** Every stream that comes into existence during the life of the
** process gets realized on each end of the connection by a stream
** struct. All the stream structs generated by a process are in a
** dynamic array in the process' heap area.
** Each stream is a finite state machine.
*/
struct stream {
    int state;                      /* state of this end of the */
                                    /* connection; one of the RPP_* */
                                    /* state codes (RPP_FREE, */
                                    /* RPP_CONNECT, etc) */
    struct sockaddr_in addr;        /* address of the other end; */
                                    /* port/family/IP address */
    struct in_addr *addr_array;     /* array of alternate network */
                                    /* addresses for other end */
                                    /* of the connection; the list */
                                    /* ends at the first entry */
                                    /* whose s_addr is zero */
    int fd;                         /* must be in rpp_fd_array */
    int stream_id;                  /* id of other end of the */
                                    /* connection; array position */
                                    /* of stream struct on the */
                                    /* other end; -1 while the */
                                    /* open has not completed */
    int retry;                      /* sendto retry limit */
    int open_key;                   /* unique bit pattern created */
                                    /* by the end issuing the */
                                    /* rpp_open. It's the same */
                                    /* for each end of the */
                                    /* connection; used in setting */
                                    /* up the stream connection */
    int msg_cnt;                    /* size in bytes of current */
                                    /* DATA/EOD/GOODBYE message */
    int send_sequence;              /* initialized to value of 1 */
                                    /* and incremented by 1 for */
                                    /* each packet that's added */
                                    /* to the master send list */
    struct pending *pend_head;      /* head and tail pointers for */
    struct pending *pend_tail;      /* stream's pend list; see */
                                    /* struct pending definition */
    int pend_commit;                /* total number of data bytes */
                                    /* sent to other end connect */
    int pend_attempt;               /* total number bytes that */
                                    /* reside in the list of */
                                    /* pending struct buffers. */
                                    /* pend_commit<=pend_attempt */
    struct send_packet *send_head;  /* head and tail pointers for */
    struct send_packet *send_tail;  /* stream's master send list */
                                    /* see struct send_packet */
    int recv_sequence;              /* monotonic, increasing, by 1 */
                                    /* starts from zero; A packet */
                                    /* on the stream's recv list */
                                    /* having a sequence number */
                                    /* less than this value is a */
                                    /* packet of an earlier mesg */
    struct recv_packet *recv_head;  /* head and tail pointers for */
    struct recv_packet *recv_tail;  /* the stream's recv list; */
                                    /* see struct recv_packet */
    int recv_commit;                /* number of bytes from */
                                    /* start of current message */
                                    /* that have been accepted */
                                    /* by the reader on this end */
    int recv_attempt;               /* number bytes, from start */
                                    /* of current message, that */
                                    /* have been read */
};

/*
** Static Variables
*/
static struct stream *stream_array = NULL;  /* pointer to stream struct */
                                            /* dynamic array */
static int stream_num = 0;                  /* current number of stream */
                                            /* structs in stream_array */
static int pkts_sent = 0;                   /* range: 0 - RPP_HIGHWATER; */
                                            /* incremented with each new */
                                            /* pkt sent; retransmissions */
                                            /* are not counted */
static int open_key = 0;                    /* monotonically increasing */
                                            /* value stored in stream */
                                            /* struct on rpp_open and */
                                            /* passed to other end to */
                                            /* be recorded in */
                                            /* corresponding stream */
                                            /* struct there */
static struct send_packet *top = NULL;      /* ptrs to beginning and end */
static struct send_packet *bottom = NULL;   /* of master send list; */
                                            /* All sent data is linked */
                                            /* between top and bottom. */

/*
** Global Variables
*/
int rpp_dbprt = 0;                          /* controls debug printing */

/*
** Current file descriptor. Any call to rpp_open will use this
** for the returned stream.
*/
int rpp_fd = -1;

/*
** A dynamic array of socket descriptors bound to a network address.
** More than one call to rpp_bind can result in this having multiple
** entries. The value of rpp_fd will be contained in this array.
*/ int *rpp_fd_array = NULL; /* ** Number of elements in rpp_fd_array */ int rpp_fd_num = 0; /* ** Tables used by the macros I2TOH, HTOI2, I8TOH, HTOI8 ** to convert 2 and 8 digit hexidecimal strings to integer and back. */ char cval[] = { -1, -1, -1, -1, -1, -1, -1, -1, /* nul .. bel */ -1, -1, -1, -1, -1, -1, -1, -1, /* bs .. si */ -1, -1, -1, -1, -1, -1, -1, -1, /* dle .. etb */ -1, -1, -1, -1, -1, -1, -1, -1, /* can .. us */ -1, -1, -1, -1, -1, -1, -1, -1, /* sp .. ' */ -1, -1, -1, -1, -1, -1, -1, -1, /* ( .. / */ 0, 1, 2, 3, 4, 5, 6, 7, /* 0 .. 7 */ 8, 9, -1, -1, -1, -1, -1, -1, /* 8 .. ? */ -1, 10, 11, 12, 13, 14, 15, -1, /* @ .. G */ -1, -1, -1, -1, -1, -1, -1, -1, /* H .. O */ -1, -1, -1, -1, -1, -1, -1, -1, /* P .. W */ -1, -1, -1, -1, -1, -1, -1, -1, /* X .. _ */ -1, 10, 11, 12, 13, 14, 15, -1, /* ` .. g */ -1, -1, -1, -1, -1, -1, -1, -1, /* h .. o */ -1, -1, -1, -1, -1, -1, -1, -1, /* p .. w */ -1, -1, -1, -1, -1, -1, -1, -1 /* x .. del */ }; char ival[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' }; /* ** Conversion macros */ #define I2TOH(i, h) \ { \ int num = i; \ char *str = h; \ str[1] = ival[num & 0xF]; num >>= 4 ; \ str[0] = ival[num & 0xF]; \ } #define I8TOH(i, h) \ { \ u_long num = i; \ char *str = h; \ str[7] = ival[num & 0xF]; num >>= 4; \ str[6] = ival[num & 0xF]; num >>= 4; \ str[5] = ival[num & 0xF]; num >>= 4; \ str[4] = ival[num & 0xF]; num >>= 4; \ str[3] = ival[num & 0xF]; num >>= 4; \ str[2] = ival[num & 0xF]; num >>= 4; \ str[1] = ival[num & 0xF]; num >>= 4; \ str[0] = ival[num & 0xF]; \ } #define HTOI2(h, i) \ { \ char *str = h; \ int num = 0; \ num = cval[str[0] & 0xFF]; num <<= 4; \ num |= cval[str[1] & 0xFF]; \ i = num; \ } #define HTOI8(h, i) \ { \ char *str = h; \ u_long num; \ num = (long)cval[str[0] & 0x7F]; num <<= 4; \ num |= (long)cval[str[1] & 0x7F]; num <<= 4; \ num |= (long)cval[str[2] & 0x7F]; num <<= 4; \ num |= (long)cval[str[3] & 0x7F]; num <<= 4; \ num |= (long)cval[str[4] & 
0x7F]; num <<= 4; \ num |= (long)cval[str[5] & 0x7F]; num <<= 4; \ num |= (long)cval[str[6] & 0x7F]; num <<= 4; \ num |= (long)cval[str[7] & 0x7F]; \ i = num; \ } /* ** Different print macros for use in debugging. */ #ifdef DEBUG #define DBTO stdout #define LOCAL_DBPRT(x) \ if (rpp_dbprt) { \ int err = errno; \ fprintf(DBTO, "%lX: ", time(0)); \ fprintf x; \ errno = err; \ } #define DOID(x) static char id[] = x; #elif defined(RPPLOG) static char *blog_buf = NULL; static int blog_buflen = 0; static int blog_head = 0; void blog_init(char *s, int len) { if (blog_buf != NULL || len <= 0) return; if (s == NULL) blog_buf = (char *)malloc(len); else blog_buf = s; blog_buflen = len; } int blog_write(char *s) { int i, len; if (s == NULL || *s == '\0') return 0; if (blog_buf == NULL) blog_init(NULL, 64*1024); len = strlen(s) + 1; if (len > blog_buflen) return -1; for (i = 0; i < len; i++) { blog_buf[blog_head++] = *s++; blog_head %= blog_buflen; } return len -1; } void blog_out(char *filename) { FILE *f; int btrail; int c; if (blog_buf == NULL) return; if ((f = fopen(filename, "a")) == NULL) return; for (btrail = blog_head; blog_buf[btrail] != '\0'; btrail++) btrail %= blog_buflen; for (btrail++; btrail != blog_head; btrail++) { btrail %= blog_buflen; c = (int)blog_buf[btrail]; if (c == '\0') continue; putc(c, f); } fclose(f); return; } static char logbuf[4096], *logp; #define DBTO logp #define LOCAL_DBPRT(x) \ sprintf(logbuf, "%lX: ", time(0)); \ logp = logbuf + strlen(logbuf); \ sprintf x; \ blog_write(logbuf); \ if (rpp_dbprt) { \ int err = errno; \ sprintf(logbuf, "/tmp/rpp_log.%d", getpid()); \ blog_out(logbuf); \ rpp_dbprt = 0; \ errno = err; \ } #define DOID(x) static char id[] = x; #else #define DBTO xxx #define LOCAL_DBPRT(x) #define DOID(x) #endif #ifndef MIN #define MIN(x, y) (((x) < (y)) ? (x) : (y)) #endif #ifndef MAX #define MAX(x, y) (((x) > (y)) ? 
(x) : (y)) #endif /* ** BEGIN included source */ /*- * Copyright (c) 1991, 1993 * The Regents of the University of California. All rights reserved. * * This code is derived from software contributed to Berkeley by * James W. Williams of NASA Goddard Space Flight Center. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. All advertising materials mentioning features or use of this software * must display the following acknowledgment: * This product includes software developed by the University of * California, Berkeley and its contributors. * 4. Neither the name of the University nor the names of its contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. 
*/ static u_long crctab[] = { 0x0, 0x04c11db7, 0x09823b6e, 0x0d4326d9, 0x130476dc, 0x17c56b6b, 0x1a864db2, 0x1e475005, 0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61, 0x350c9b64, 0x31cd86d3, 0x3c8ea00a, 0x384fbdbd, 0x4c11db70, 0x48d0c6c7, 0x4593e01e, 0x4152fda9, 0x5f15adac, 0x5bd4b01b, 0x569796c2, 0x52568b75, 0x6a1936c8, 0x6ed82b7f, 0x639b0da6, 0x675a1011, 0x791d4014, 0x7ddc5da3, 0x709f7b7a, 0x745e66cd, 0x9823b6e0, 0x9ce2ab57, 0x91a18d8e, 0x95609039, 0x8b27c03c, 0x8fe6dd8b, 0x82a5fb52, 0x8664e6e5, 0xbe2b5b58, 0xbaea46ef, 0xb7a96036, 0xb3687d81, 0xad2f2d84, 0xa9ee3033, 0xa4ad16ea, 0xa06c0b5d, 0xd4326d90, 0xd0f37027, 0xddb056fe, 0xd9714b49, 0xc7361b4c, 0xc3f706fb, 0xceb42022, 0xca753d95, 0xf23a8028, 0xf6fb9d9f, 0xfbb8bb46, 0xff79a6f1, 0xe13ef6f4, 0xe5ffeb43, 0xe8bccd9a, 0xec7dd02d, 0x34867077, 0x30476dc0, 0x3d044b19, 0x39c556ae, 0x278206ab, 0x23431b1c, 0x2e003dc5, 0x2ac12072, 0x128e9dcf, 0x164f8078, 0x1b0ca6a1, 0x1fcdbb16, 0x018aeb13, 0x054bf6a4, 0x0808d07d, 0x0cc9cdca, 0x7897ab07, 0x7c56b6b0, 0x71159069, 0x75d48dde, 0x6b93dddb, 0x6f52c06c, 0x6211e6b5, 0x66d0fb02, 0x5e9f46bf, 0x5a5e5b08, 0x571d7dd1, 0x53dc6066, 0x4d9b3063, 0x495a2dd4, 0x44190b0d, 0x40d816ba, 0xaca5c697, 0xa864db20, 0xa527fdf9, 0xa1e6e04e, 0xbfa1b04b, 0xbb60adfc, 0xb6238b25, 0xb2e29692, 0x8aad2b2f, 0x8e6c3698, 0x832f1041, 0x87ee0df6, 0x99a95df3, 0x9d684044, 0x902b669d, 0x94ea7b2a, 0xe0b41de7, 0xe4750050, 0xe9362689, 0xedf73b3e, 0xf3b06b3b, 0xf771768c, 0xfa325055, 0xfef34de2, 0xc6bcf05f, 0xc27dede8, 0xcf3ecb31, 0xcbffd686, 0xd5b88683, 0xd1799b34, 0xdc3abded, 0xd8fba05a, 0x690ce0ee, 0x6dcdfd59, 0x608edb80, 0x644fc637, 0x7a089632, 0x7ec98b85, 0x738aad5c, 0x774bb0eb, 0x4f040d56, 0x4bc510e1, 0x46863638, 0x42472b8f, 0x5c007b8a, 0x58c1663d, 0x558240e4, 0x51435d53, 0x251d3b9e, 0x21dc2629, 0x2c9f00f0, 0x285e1d47, 0x36194d42, 0x32d850f5, 0x3f9b762c, 0x3b5a6b9b, 0x0315d626, 0x07d4cb91, 0x0a97ed48, 0x0e56f0ff, 0x1011a0fa, 0x14d0bd4d, 0x19939b94, 0x1d528623, 0xf12f560e, 0xf5ee4bb9, 0xf8ad6d60, 0xfc6c70d7, 
0xe22b20d2, 0xe6ea3d65, 0xeba91bbc, 0xef68060b, 0xd727bbb6, 0xd3e6a601, 0xdea580d8, 0xda649d6f, 0xc423cd6a, 0xc0e2d0dd, 0xcda1f604, 0xc960ebb3, 0xbd3e8d7e, 0xb9ff90c9, 0xb4bcb610, 0xb07daba7, 0xae3afba2, 0xaafbe615, 0xa7b8c0cc, 0xa379dd7b, 0x9b3660c6, 0x9ff77d71, 0x92b45ba8, 0x9675461f, 0x8832161a, 0x8cf30bad, 0x81b02d74, 0x857130c3, 0x5d8a9099, 0x594b8d2e, 0x5408abf7, 0x50c9b640, 0x4e8ee645, 0x4a4ffbf2, 0x470cdd2b, 0x43cdc09c, 0x7b827d21, 0x7f436096, 0x7200464f, 0x76c15bf8, 0x68860bfd, 0x6c47164a, 0x61043093, 0x65c52d24, 0x119b4be9, 0x155a565e, 0x18197087, 0x1cd86d30, 0x029f3d35, 0x065e2082, 0x0b1d065b, 0x0fdc1bec, 0x3793a651, 0x3352bbe6, 0x3e119d3f, 0x3ad08088, 0x2497d08d, 0x2056cd3a, 0x2d15ebe3, 0x29d4f654, 0xc5a92679, 0xc1683bce, 0xcc2b1d17, 0xc8ea00a0, 0xd6ad50a5, 0xd26c4d12, 0xdf2f6bcb, 0xdbee767c, 0xe3a1cbc1, 0xe760d676, 0xea23f0af, 0xeee2ed18, 0xf0a5bd1d, 0xf464a0aa, 0xf9278673, 0xfde69bc4, 0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0, 0x9abc8bd5, 0x9e7d9662, 0x933eb0bb, 0x97ffad0c, 0xafb010b1, 0xab710d06, 0xa6322bdf, 0xa2f33668, 0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4 }; /* * Compute a POSIX 1003.2 checksum. This routine has been broken out so that * other programs can use it. It takes a char pointer and length. * It ruturns the crc value for the data in buf. */ u_long crc(buf, clen) u_char *buf; u_long clen; { register u_char *p; register u_long crc, len; #define COMPUTE(var, ch) (var) = (((var) << 8) ^ \ crctab[(((var) >> 24) & 0xff) ^ (ch)]) & 0xffffffff for (crc = 0, len = clen, p = buf; len--; ++p) { COMPUTE(crc, *p); } /* Include the length of the file. */ for (; clen != 0; clen >>= 8) { COMPUTE(crc, clen & 0xff); } return (~crc & 0xffffffff); } /* ** END of included source */ /* ** Generate a sequence number for a packet. */ static int next_seq(int *seq) { (*seq)++; if (*seq < 0) /* had a rollover */ { errno = EFBIG; return -1; } return 0; } /* ** Put a human readable representation of a network addres into ** a staticly allocated string. 
*/ char * netaddr(struct sockaddr_in *ap) { static char out[80]; u_long ipadd; if (ap == NULL) return "unknown"; ipadd = ntohl(ap->sin_addr.s_addr); sprintf(out, "%ld.%ld.%ld.%ld:%d", (ipadd & 0xff000000) >> 24, (ipadd & 0x00ff0000) >> 16, (ipadd & 0x0000ff00) >> 8, (ipadd & 0x000000ff), ntohs(ap->sin_port)); return out; } /* ** set_rpp_throttle_sleep_time sets the throttle sleep time ** for rpp. This is an option set in the MOM config file. ** By default the value of rpp_throttle_sleeptime is 0 ** which means false. Other wise the value is in micro seconds ** to be used with the usleep function */ void set_rpp_throttle_sleep_time(long sleep_time) { rpp_throttle_sleeptime = sleep_time; return; } /* ** Create a packet of the given type, fill in the sequence and ** index number. If buf is NULL, malloc an area for just ** a header. If buf is not NULL, it should contain space ** for len+RPP_PKT_HEADER bytes. */ static void rpp_form_pkt( int index, int type, int seq, u_char *buf, int len) { DOID("form_pkt") struct send_packet *pktp; struct stream *sp; LOCAL_DBPRT((DBTO, "%s: index %d type %d seq %d len %d\n", id, index, type, seq, len)) sp = &stream_array[index]; pktp = (struct send_packet *)malloc(sizeof(struct send_packet)); assert(pktp != NULL); pktp->type = type; pktp->sequence = seq; pktp->time_sent = 0; pktp->sent_out = 0; pktp->len = len; pktp->index = index; if (buf) pktp->data = (u_char *)realloc(buf, len + RPP_PKT_HEAD); else pktp->data = (u_char *)malloc(RPP_PKT_HEAD); assert(pktp->data != NULL); /* ** Put on stream send list */ if (sp->send_head == NULL) sp->send_head = pktp; else sp->send_tail->next = pktp; sp->send_tail = pktp; pktp->next = NULL; pktp->down = NULL; /* ** if open has not completed yet, hold off putting on send queue */ if (sp->stream_id == -1) { pktp->up = NULL; return; } /* ** if the stream is fully open, format and put on the send queue */ LOCAL_DBPRT((DBTO, "%s: idx %d link %d seq %d len %d to sendq\n", id, index, type, seq, len)) 
I2TOH(type, (char *)&pktp->data[len]) I8TOH(sp->stream_id, (char *)&pktp->data[len+RPP_HDR_SID]) I8TOH(seq, (char *)&pktp->data[len+RPP_HDR_SEQ]) I8TOH(crc(pktp->data, (u_long)(len + RPP_PKT_CRC)), (char *)&pktp->data[len+RPP_PKT_CRC]) if (bottom) bottom->down = pktp; pktp->up = bottom; if (top == NULL) /* first one */ top = pktp; bottom = pktp; return; } /* ** Check to make sure an incoming packet goes with one of the ** streams we have. */ static struct stream *rpp_check_pkt( int index, struct sockaddr_in *addrp) { DOID("check_pkt") struct stream *sp; struct in_addr *addrs; int i; if ((index < 0) || (index >= stream_num)) { LOCAL_DBPRT((DBTO, "%s: BAD INDEX %d outside limit %d\n", id, index, stream_num)) return(NULL); } sp = &stream_array[index]; if (sp->state <= RPP_FREE) { LOCAL_DBPRT((DBTO, "%s: FREE STREAM\n", id)) return(NULL); } if ((addrp->sin_family == 0) || (addrp->sin_family >= AF_MAX)) { /* * Some systems have a buggy recvfrom() that doesn't set * sin_family for UDP sockets. This was directly observed * on Sandia's Tru64 system -garrick */ LOCAL_DBPRT((DBTO, "%s: buggy recvfrom(), fixing bogus sin_family value: %d\n", id, addrp->sin_family)); addrp->sin_family = sp->addr.sin_family; } if (addrp->sin_port != sp->addr.sin_port) goto bad; if (addrp->sin_family != sp->addr.sin_family) goto bad; if (addrp->sin_addr.s_addr == sp->addr.sin_addr.s_addr) { return(sp); } if ((addrs = sp->addr_array) != NULL) { for (i = 0;addrs[i].s_addr;i++) { if (addrs[i].s_addr == addrp->sin_addr.s_addr) { return(sp); } } } bad: LOCAL_DBPRT((DBTO, "%s: ADDRESS MISMATCH\n", id)) LOCAL_DBPRT((DBTO, "\tstream %d addr %s\n", index, netaddr(&sp->addr))) LOCAL_DBPRT((DBTO, "\tpkt addr %s\n", netaddr(addrp))) return(NULL); } /* END rpp_check_pkt() */ /* ** Send outstanding information starting with top and working ** down to bottom. Will not cause state change. 
*/

/*
** For each packet on the master send list, in order:
**   - skip it if it was sent less than RPPTimeOut seconds ago;
**   - stop the whole pass if it has never been sent and pkts_sent has
**     already reached RPP_HIGHWATER;
**   - otherwise sendto() the data plus header to the stream's address.
**
** A failed sendto() is logged and counted in sent_out, but time_sent is
** left unchanged so the packet is tried again on the next call.
** pkts_sent is incremented only the first time a packet goes out.
** If rpp_throttle_sleeptime is positive, usleep() is called after each
** successful send.
*/
static void rpp_send_out(void)
{
    struct send_packet *pp;
    struct stream *sp;
    time_t curr;
    torque_socklen_t len;
    char *id = "rpp_send_out";
    int saved_errno;

    curr = time(NULL);

    for (pp = top; pp != NULL; pp = pp->down) {
        if ((curr - pp->time_sent) < RPPTimeOut)
            continue;

        if ((pp->time_sent == 0) && (pkts_sent >= RPP_HIGHWATER))
            break;

        sp = &stream_array[pp->index];

        LOCAL_DBPRT((DBTO, "%s index %d type %d sent %d seq %d to %s crc %8.8s\n",
                     id,
                     pp->index,
                     pp->type,
                     pp->sent_out,
                     pp->sequence,
                     netaddr(&sp->addr),
                     (char *)&pp->data[pp->len+RPP_PKT_CRC]))

        len = sizeof(struct sockaddr_in);

        if (sendto(
                    sp->fd,
                    (char *)pp->data,
                    RPP_PKT_HEAD + pp->len,
                    0,
                    (struct sockaddr *)&sp->addr,
                    len) == -1) {
            /*
             * Capture errno before anything else runs: the debug macro
             * may call library routines that overwrite it before
             * log_err() gets to report it.
             */
            saved_errno = errno;

            LOCAL_DBPRT((DBTO, "%s: SENDTO errno %d (%s)\n",
                         id, saved_errno, strerror(saved_errno)))

            log_err(saved_errno, id, "Error in sendto\n");

            pp->sent_out++;

            continue;
        }

        /* has the rpp_throttle option been set? If it is
         * not 0 it is set. */
        if (rpp_throttle_sleeptime > 0) {
            usleep(rpp_throttle_sleeptime);
        }

        if (pp->time_sent == 0) /* new one */
            pkts_sent++;

        pp->time_sent = curr;
        pp->sent_out++;
    } /* END for (pp) */

    /* NOTE: failure cannot be detected */

    return;
} /* END rpp_send_out */

/*
** Create or reuse a position in stream_array.
*/ static int rpp_create_sp(void) { int i; struct stream *sp = NULL; if (stream_array == NULL) { stream_array = (struct stream *)malloc(sizeof(struct stream)); if (stream_array == NULL) { return(-1); } memset(stream_array, '\0', sizeof(struct stream)); stream_num = 1; } for (i = 0;i < stream_num;i++) { sp = &stream_array[i]; if (sp->state == RPP_FREE) break; } if (i == stream_num) { for (i = 0;i < stream_num;i++) { sp = &stream_array[i]; if (sp->state == RPP_DEAD) break; } } if (i == stream_num) { /* no free streams available */ sp = (struct stream *)realloc( (void *)stream_array, (stream_num * 2) * sizeof(struct stream)); if (sp == NULL) { sp = (struct stream *)realloc( (void *)stream_array, (stream_num + 1) * sizeof(struct stream)); if (sp == NULL) { return(-1); } stream_num++; } else { stream_num *= 2; } stream_array = sp; sp = &stream_array[i]; memset((void *)sp, '\0', (stream_num - i) * sizeof(struct stream)); } else { memset((void *)sp, '\0', sizeof(struct stream)); } LOCAL_DBPRT((DBTO, "rpp_create_sp: new index %d\n", i)) return(i); } /* END rpp_create_sp() */ /* ** Look up the "canonical" name for the host by ** calling gethostbyaddr with an IP address. */ static struct hostent *rpp_get_cname( struct sockaddr_in *addr) { DOID("get_cname") struct hostent *hp; char *hname; if ((hp = gethostbyaddr( (void *)&addr->sin_addr, sizeof(struct in_addr), addr->sin_family)) == NULL) { LOCAL_DBPRT((DBTO, "%s: addr not found, h_errno=%d errno=%d (%s)\n", id, h_errno, errno, strerror(errno))) return(NULL); } if ((hname = (char *)strdup(hp->h_name)) == NULL) { return(NULL); } if ((hp = gethostbyname(hname)) == NULL) { LOCAL_DBPRT((DBTO, "%s: canonical name %s not found, h_errno=%d errno=%d (%s)\n", id, hname, h_errno, errno, strerror(errno))) } free(hname); /* SUCCESS */ return(hp); } /* ** Allocate a list of alternate address for a host and save ** them in the stream structure. 
*/ static void rpp_alist( struct hostent *hp, struct stream *sp) { int i, j; for (i = 1;hp->h_addr_list[i];i++); if (i == 1) { return; } sp->addr_array = (struct in_addr *)calloc(i, sizeof(struct in_addr)); if (sp->addr_array == NULL) { return; } j = 0; for (i = 0;hp->h_addr_list[i];i++) { if (memcmp(&sp->addr.sin_addr, hp->h_addr_list[i], hp->h_length) == 0) continue; memcpy(&sp->addr_array[j++], hp->h_addr_list[i], hp->h_length); } sp->addr_array[j].s_addr = 0; return; } static int rpp_send_ack( struct stream *sp, int seq) { DOID("send_ack") char buf[RPP_PKT_HEAD]; u_long xcrc; if (sp->stream_id < 0) { /* can't send yet */ LOCAL_DBPRT((DBTO, "%s: STREAM NOT OPEN seq %d\n", id, seq)) return(0); } I2TOH(RPP_ACK, buf) I8TOH(sp->stream_id, &buf[2]) I8TOH(seq, &buf[10]) xcrc = crc((u_char *)buf, (u_long)RPP_PKT_CRC); I8TOH(xcrc, &buf[RPP_PKT_CRC]) LOCAL_DBPRT((DBTO, "%s: seq %d to %s crc %lX\n", id, seq, netaddr(&sp->addr), xcrc)) if (sendto( sp->fd, buf, RPP_PKT_HEAD, 0, (struct sockaddr *)&sp->addr, sizeof(struct sockaddr_in)) == -1) { LOCAL_DBPRT((DBTO, "%s: ACK error %d (%s)\n", id, errno, strerror(errno))) if ((errno != EWOULDBLOCK) && (errno != ENOBUFS)) { return(-1); } } return(0); } /* END rpp_send_ack() */ /* ** Take a packet off the send queue and free it. */ static void dqueue( struct send_packet *pp) { if (pp->down == NULL) bottom = pp->up; else pp->down->up = pp->up; if (pp->up == NULL) top = pp->down; else pp->up->down = pp->down; if (--pkts_sent < 0) pkts_sent = 0; free(pp->data); free(pp); return; } /* ** Get rid of anything on the pend and send queue for a stream. 
*/

/*
** clear_send - free every pending (not yet packetised) buffer and every
** send packet of the stream and reset the pend/send bookkeeping.
*/
static void clear_send(
  struct stream *sp)
{
  struct pending *ppp, *pprev;
  struct send_packet *spp, *sprev;

  /* pprev/sprev actually hold the NEXT element while the current one is freed */
  for (ppp = sp->pend_head;ppp;ppp = pprev) {
    pprev = ppp->next;
    free(ppp->data);
    free(ppp);
  }

  sp->pend_head = NULL;
  sp->pend_tail = NULL;
  sp->pend_commit = 0;
  sp->pend_attempt = 0;

  for (spp = sp->send_head;spp;spp = sprev) {
    sprev = spp->next;

    if (sp->stream_id == -1) { /* not open yet */
      struct send_packet *look; /* might not be */
      /* on send queue */

      /* search the global top..bottom queue for this packet */
      for (look = top;look;look = look->down) {
        if (look == spp)
          break;
      }

      if (look == NULL) {
        /* never queued globally: free directly, dqueue() must not touch it */
        free(spp->data);
        free(spp);
        continue;
      }
    }

    dqueue(spp);
  }

  sp->send_head = NULL;
  sp->send_tail = NULL;

  return;
} /* END clear_send() */

/*
** Remove packets from receive, pending and send queues for
** a stream, free all the memory and mark the stream_array
** entry RPP_DEAD.  (The entry itself is not zeroed here.)
*/
static void clear_stream(
  struct stream *sp)
{
  struct recv_packet *rpp, *rprev;

  /* NOTE(review): the argument is a size_t-typed quotient printed with %ld */
  LOCAL_DBPRT((DBTO, "CLEAR stream %ld\n", ((long)sp - (long)stream_array) / sizeof(struct stream)))

  for (rpp = sp->recv_head;rpp;rpp = rprev) {
    rprev = rpp->next;

    if (rpp->data)
      free(rpp->data);

    free(rpp);
  }

  sp->recv_head = NULL;
  sp->recv_tail = NULL;

  clear_send(sp);

  if (sp->addr_array) {
    free(sp->addr_array);
    sp->addr_array = NULL;
  }

  sp->state = RPP_DEAD;

  return;
} /* END clear_stream() */

/*
** Do a recvfrom() call to get one packet off the given file descriptor
** and act on it according to its type.
**
** Return values, as implemented below:
**   >= 0  the stream id carried in the packet header, for an ACK that
**         was processed on a live stream and for a new DATA/EOD packet
**         on a stream that is not in RPP_OPEN_PEND;
**   -1    recvfrom failed with an error other than those listed for -3,
**         rpp_send_ack failed, or rpp_create_sp failed;
**   -2    a packet was read but there is nothing for the caller (bad
**         size or crc, unknown stream, duplicate, control packet, ...);
**   -3    recvfrom reported EWOULDBLOCK, EAGAIN or ECONNREFUSED
**         (errno is reset to 0 in that case).
** MAY CAUSE STATE CHANGE!
*/
static int rpp_recv_pkt(
  int fd) /* I */
{
  DOID("recv_pkt")

  torque_socklen_t flen;
  int len;

  struct sockaddr_in addr;

  struct hostent *hp;
  int i, streamid;

  struct send_packet *spp, *sprev;
  struct recv_packet *rpp, *rprev;
  struct recv_packet *pkt;
  struct stream *sp;
  char *data;
  int type;
  int sequence;
  u_long pktcrc;

  /* NOTE(review): allocation failure is only caught by assert() */
  data = malloc(RPP_PKT_SIZE);

  assert(data != NULL);

  flen = sizeof(struct sockaddr_in);

  /*
  ** Loop so we can avoid failing on EINTR. Thanks to
  ** Pete Wyckoff for finding this.
  */
  for (;;) {
    len = recvfrom(
            fd,
            data,
            RPP_PKT_SIZE,
            0,
            (struct sockaddr *) & addr,
            &flen);

    if (len != -1)
      break;

    if (errno == EINTR)
      continue;

    /* NOTE(review): errno is examined after free(); confirm free() cannot disturb it here */
    free(data);

    if ((errno == EWOULDBLOCK) ||
        (errno == EAGAIN) ||
        (errno == ECONNREFUSED)) {
      errno = 0;

      return(-3);
    }

    return(-1);
  } /* END for (;;) */

  LOCAL_DBPRT((DBTO, "%s: addr %s len %d\n", id, netaddr(&addr), len))

  if (len < RPP_PKT_HEAD) /* less than minimum size */
    goto err_out;

  /* the header (type, stream id, sequence, crc) sits at the END of the packet */
  HTOI8(&data[len-RPP_CRC_LEN], pktcrc)

  if (pktcrc != crc((u_char *)data, (u_long)(len - RPP_CRC_LEN))) {
    LOCAL_DBPRT((DBTO, "%s: packet crc %08lX failed\n", id, pktcrc))

    goto err_out;
  }

  HTOI2(&data[len-RPP_PKT_HEAD], type)

  HTOI8(&data[len-RPP_PKT_HEAD+RPP_HDR_SID], streamid)
  HTOI8(&data[len-RPP_PKT_HEAD+RPP_HDR_SEQ], sequence)

  switch (type) {

    case RPP_ACK:

      LOCAL_DBPRT((DBTO, "%s: ACK stream %d sequence %d crc %08lX\n", id, streamid, sequence, pktcrc))

      /* an ACK carries no payload, the buffer is not needed any more */
      free(data);

      if ((sp = rpp_check_pkt(streamid, &addr)) == NULL) {
        return(-2);
      }

      if (sp->state == RPP_OPEN_PEND) {
        /* while the open is pending the ACK must echo our open key */
        if (sequence != sp->open_key) {
          LOCAL_DBPRT((DBTO, "%s: WILD ACK in RPP_OPEN_PEND %d\n", id, streamid))

          return(-2);
        }

        /*
        ** NOTE(review): send_head is dereferenced without a NULL check;
        ** the code relies on exactly one HELLO2 packet being queued.
        */
        spp = sp->send_head;

        assert(spp->type == RPP_HELLO2);
        assert(spp->next == NULL);

        sp->state = RPP_CONNECT;
        sp->send_head = NULL;
        sp->send_tail = NULL;

        dqueue(spp);

        /* SUCCESS */

        return(streamid);
      } else if (sp->stream_id == -1) {
        LOCAL_DBPRT((DBTO, "%s: ACK for closed stream %d\n", id, streamid))

        return(-2);
      }

      /* find the acknowledged packet on the stream's send list */
      for (spp = sp->send_head, sprev = NULL;spp;sprev = spp, spp = spp->next) {
        if (spp->sequence == sequence)
          break;
      }

      if (spp != NULL) {
        LOCAL_DBPRT((DBTO, "%s: stream %d seq %d took %ld\n", id, streamid, sequence, (long)(time(NULL) - spp->time_sent)))

        /* our GOODBYE was acknowledged */
        if ((sp->state == RPP_CLOSE_WAIT1) &&
            (spp->type == RPP_GOODBYE))
          sp->state = RPP_CLOSE_WAIT2;

        /* unlink from the per-stream list, then from the global queue */
        if (sprev == NULL)
          sp->send_head = spp->next;
        else
          sprev->next = spp->next;

        if (sp->send_tail == spp)
          sp->send_tail = sprev;

        dqueue(spp);

        /* last outstanding packet of a closing stream: stream is finished */
        if ((sp->state == RPP_LAST_ACK) &&
            (sp->send_head == NULL)) {
          clear_stream(sp);

          return(-2);
        }
      }

      return(streamid);

      /*NOTREACHED*/

      break;

    case RPP_GOODBYE:

      LOCAL_DBPRT((DBTO, "%s: GOODBYE stream %d sequence %d crc %08lX\n", id, streamid, sequence, pktcrc))

      free(data);

      if ((sp = rpp_check_pkt(streamid, &addr)) == NULL) {
        return(-2);
      }

      if (rpp_send_ack(sp, sequence) == -1) {
        return(-1);
      }

      switch (sp->state) {

        case RPP_OPEN_PEND:

        case RPP_OPEN_WAIT:

        case RPP_CLOSE_PEND:

        case RPP_LAST_ACK:

          /* nothing further to do in these states */
          return(-2);

          /*NOTREACHED*/

          break;

        case RPP_CLOSE_WAIT1:

          /* both sides are closing; wait for the ACK of our GOODBYE */
          sp->state = RPP_LAST_ACK;

          return(-2);

          /*NOTREACHED*/

          break;

        case RPP_CLOSE_WAIT2:

          /* our GOODBYE was already acknowledged: stream is finished */
          clear_stream(sp);

          return(-2);

          /*NOTREACHED*/

          break;

        default:

          /* NO-OP */

          break;
      } /* END switch (sp->state) */

      sp->state = RPP_CLOSE_PEND;

      clear_send(sp); /* other side not reading now */

      /* find the insertion point that keeps the receive list ordered by sequence */
      for (rpp = sp->recv_head, rprev = NULL;rpp != NULL;rprev = rpp, rpp = rpp->next) {
        if (rpp->sequence >= sequence)
          break;
      }

      if ((rpp == NULL) || (rpp->sequence > sequence)) {
        LOCAL_DBPRT((DBTO, "%s: GOOD seq %d\n", id, sequence))

        /* queue a zero-length marker packet for the GOODBYE */
        pkt = (struct recv_packet *)malloc(sizeof(struct recv_packet));

        assert(pkt != NULL);

        pkt->type = type;
        pkt->sequence = sequence;
        pkt->len = 0;
        pkt->data = NULL;

        if (rprev == NULL) {
          pkt->next = sp->recv_head;
          sp->recv_head = pkt;
        } else {
          pkt->next = rprev->next;
          rprev->next = pkt;
        }

        /* appended at the end (or list was empty): new tail */
        if (sp->recv_tail == rprev)
          sp->recv_tail = pkt;
      } else {
        LOCAL_DBPRT((DBTO, "%s: DUPLICATE seq %d MAX seen %d\n", id, sequence, rpp->sequence))
      }

      return(-2);

      /*NOTREACHED*/

      break;

    case RPP_DATA:

    case RPP_EOD:

      LOCAL_DBPRT((DBTO, "%s: DATA stream %d sequence %d crc %08lX len %d\n", id, streamid, sequence, pktcrc, len))

      /* data is still owned here: every exit below must free or queue it */
      if ((sp = rpp_check_pkt(streamid, &addr)) == NULL)
        goto err_out;

      if (rpp_send_ack(sp, sequence) == -1) {
        free(data);

        return(-1);
      }

      switch (sp->state) {

        case RPP_OPEN_WAIT:

          LOCAL_DBPRT((DBTO, "INPUT on unconnected stream %d\n", streamid))

          free(data);

          return(-2);

        case RPP_CLOSE_WAIT1:

        case RPP_CLOSE_WAIT2:

        case RPP_LAST_ACK:

          LOCAL_DBPRT((DBTO, "INPUT on closed stream %d\n", streamid))

          free(data);

          return(-2);

          break;

        default:

          /* NO-OP */

          break;
      }

      /* already consumed by the reader: acknowledged above, then dropped */
      if (sequence < sp->recv_sequence) {
        LOCAL_DBPRT((DBTO, "%s: OLD seq %d\n", id, sequence))

        free(data);

        return(-2);
      }

      /* find the insertion point that keeps the receive list ordered by sequence */
      for (rpp = sp->recv_head, rprev = NULL;rpp != NULL;rprev = rpp, rpp = rpp->next) {
        if (rpp->sequence >= sequence)
          break;
      }

      if ((rpp == NULL) || (rpp->sequence > sequence)) {
        LOCAL_DBPRT((DBTO, "%s: GOOD seq %d\n", id, sequence))

        /* shrink the receive buffer to the bytes actually read */
        /* NOTE(review): realloc result overwrites data; failure is only caught by assert() */
        data = realloc(data, len);

        assert(data != NULL);

        pkt = (struct recv_packet *)malloc(sizeof(struct recv_packet));

        assert(pkt != NULL);

        pkt->type = type;
        pkt->sequence = sequence;
        pkt->len = len - RPP_PKT_HEAD; /* payload length without the trailing header */
        pkt->data = (u_char *)data; /* ownership of the buffer moves to the packet */

        if (rprev == NULL) {
          pkt->next = sp->recv_head;
          sp->recv_head = pkt;
        } else {
          pkt->next = rprev->next;
          rprev->next = pkt;
        }

        if (sp->recv_tail == rprev)
          sp->recv_tail = pkt;

        /* data queued on a stream whose open is still pending is not reported yet */
        if (sp->state == RPP_OPEN_PEND)
          return -2;
        else
          return streamid;
      } else {
        LOCAL_DBPRT((DBTO, "%s: DUPLICATE seq %d MAX seen %d\n", id, sequence, rpp->sequence))

        free(data);
      }

      break;

    case RPP_HELLO1:

      /*
      ** HELLO1 packets have the remote side's stream index
      ** in the "streamid" field and open key in the sequence.
      */

      LOCAL_DBPRT((DBTO, "%s: HELLO1 stream %d sequence %d\n", id, streamid, sequence))

      free(data);

      for (i = 0; i < stream_num; i++) {
        sp = &stream_array[i];

        if (sp->state <= RPP_FREE)
          continue;

        /* NOTE(review): compares the whole sockaddr_in, padding included */
        if (memcmp(&sp->addr, &addr, sizeof(addr)))
          continue;

        /* same peer and same open key: a retransmitted HELLO1 */
        if (sp->open_key == sequence) {
          rpp_send_out();
          return -2;
        }

        LOCAL_DBPRT((DBTO, "OLD STREAM state %d reopened %d %d\n", sp->state, sp->open_key, sequence))

        clear_stream(sp); /* old stream */
      }

      i = rpp_create_sp();

      if (i == -1)
        return -1;

      sp = &stream_array[i];

      sp->state = RPP_OPEN_PEND;

      sp->fd = fd;

      sp->retry = RPPRetry;

      memcpy(&sp->addr, &addr, sizeof(addr));

      if ((hp = rpp_get_cname(&addr)) != NULL)
        rpp_alist(hp, sp);

      sp->stream_id = streamid;

      sp->open_key = sequence;

      /* keep the file-level open_key at or above any key seen from a peer */
      open_key = MAX(open_key, sequence);

      /* answer with HELLO2 carrying our own stream index */
      rpp_form_pkt(i, RPP_HELLO2, i, NULL, 0);

      rpp_send_out();

      break;

    case RPP_HELLO2:

      /*
      ** HELLO2 packet has this side's stream index in
      ** "streamid" as usual and the remote side's
      ** stream index overloaded in the "sequence" field.
      */

      LOCAL_DBPRT((DBTO, "%s: HELLO2 stream %d sequence %d\n", id, streamid, sequence))

      free(data);

      if ((sp = rpp_check_pkt(streamid, &addr)) == NULL)
        return -2;

      switch (sp->state) {

        case RPP_OPEN_WAIT:
          sp->state = RPP_CONNECT;
          break;

        case RPP_CLOSE_WAIT1: /* called close before open done */

        case RPP_LAST_ACK:
          break;

        default:

          if (sp->stream_id == sequence) {
            /* duplicate of a HELLO2 already handled: just ack it again */
            LOCAL_DBPRT((DBTO, "%s: stream %d got DUP HELLO2 %d\n", id, streamid, sp->state))

            if (rpp_send_ack(sp, sp->open_key) == -1)
              return -1;
          } else {
            LOCAL_DBPRT((DBTO, "%s: NON-DUP HELLO2\n", id))
          }

          return -2;
      }

      /* the remote stream index is now known */
      sp->stream_id = sequence;

      if (rpp_send_ack(sp, sp->open_key) == -1)
        return -1;

      if ((spp = sp->send_head) == NULL) {
        LOCAL_DBPRT((DBTO, "%s: stream %d got HELLO2 but sendq NULL\n", id, streamid))

        return -2;
      }

      if (spp->type != RPP_HELLO1) {
        LOCAL_DBPRT((DBTO, "%s: stream %d sendq %d rather than HELLO1\n", id, streamid, spp->type))

        return -2;
      }

      sp->send_head = spp->next; /* remove HELLO1 pkt */

      if (sp->send_tail == spp)
        sp->send_tail = NULL;

      dqueue(spp);

      /*
      ** Put any waiting packets onto the send queue
      */

      for (spp = sp->send_head; spp; spp = spp->next) {
        int len = spp->len;

        LOCAL_DBPRT((DBTO, "%s: idx %d link %d seq %d len %d to sendq\n", id, streamid, spp->type, spp->sequence, len))

        /* write the trailing header now that the remote stream id is known */
        I2TOH(spp->type, (char *)&spp->data[len])

        I8TOH(sp->stream_id, (char *)&spp->data[len+RPP_HDR_SID])

        I8TOH(spp->sequence, (char *)&spp->data[len+RPP_HDR_SEQ])

        I8TOH(crc(spp->data, (u_long)(len + RPP_PKT_CRC)), (char *)&spp->data[len+RPP_PKT_CRC])

        /* append to the bottom of the global send queue */
        if (bottom)
          bottom->down = spp;

        spp->up = bottom;

        spp->down = NULL;

        if (top == NULL) /* first one */
          top = spp;

        bottom = spp;
      }

      break;

    default:

      LOCAL_DBPRT((DBTO, "%s: UNKNOWN packet type %d stream %d sequence %d\n", id, type, streamid, sequence))

      free(data);

      break;
  } /* END switch (type) */

  return(-2);

err_out:

  /* packet rejected: release the receive buffer */

  free(data);

  return(-2);
} /* END rpp_recv_pkt() */

/*
** Call rpp_recv_pkt() once for every descriptor in rpp_fd_array.
** Returns the largest value any call returned (starting from -3),
** stopping early with that value if a call returns -1.
*/
static int rpp_recv_all(void)
{
  int i, ret;
  int rc = -3;

  LOCAL_DBPRT((DBTO, "entered rpp_recv_all\n"))

  for (i = 0;i < rpp_fd_num;i++) {
    ret = rpp_recv_pkt(rpp_fd_array[i]);

    /* NOTE(review): rc keeps the maximum, so a -1 seen after an earlier
    ** -2 or stream id is not what the caller gets back - confirm intent */
    rc = MAX(ret, rc);

    if (ret == -1) {
      /* failure detected */

      break;
    }
  } /* END for (i) */

  return(rc);
} /* rpp_recv_all() */

/*
** Check to see if any packet being sent out on a stream has
** been sent more than a reasonable number of times.
*/ static void rpp_stale( struct stream *sp) /* I (cleared on corruption) */ { struct send_packet *pp; int counter; if ((sp->state <= RPP_FREE) || (sp->state == RPP_STALE)) { return; } counter = 0; for (pp = sp->send_head;pp != NULL;pp = pp->next) { counter++; if (pp->sent_out >= sp->retry) break; if (counter > 1024) { if (pp->next == pp) { LOCAL_DBPRT((DBTO, "RPP PACKET is corrupt - seq %d sent %d of %d - fixing linked list\n", pp->sequence, pp->sent_out, sp->retry)) pp->next = NULL; } else { /* stream is corrupt - destroy it */ LOCAL_DBPRT((DBTO, "RPP PACKET is corrupt - seq %d sent %d of %d - destroying stream\n", pp->sequence, pp->sent_out, sp->retry)) clear_stream(sp); return; } } } /* END for (pp) */ if (pp != NULL) { LOCAL_DBPRT((DBTO, "STALE PACKET seq %d sent %d of %d\n", pp->sequence, pp->sent_out, sp->retry)) switch (sp->state) { case RPP_OPEN_PEND: case RPP_CLOSE_WAIT1: case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: /* what do these states indicate? */ clear_stream(sp); break; default: sp->state = RPP_STALE; break; } } /* END if (pp != NULL) */ return; } /* END rpp_stale() */ /* ** Form data packets for any pending data. If flag is true, ** create an EOD packet too. */ static int rpp_dopending( int index, /* I */ int flag) /* I */ { DOID("dopending") struct stream *sp; struct pending *pp; LOCAL_DBPRT((DBTO, "%s: entered index %d\n", id, index)) sp = &stream_array[index]; for (pp = sp->pend_head;pp != sp->pend_tail;pp = sp->pend_head) { rpp_form_pkt(index, RPP_DATA, sp->send_sequence, pp->data, RPP_PKT_DATA); sp->pend_head = pp->next; free(pp); sp->pend_attempt -= RPP_PKT_DATA; if (next_seq(&sp->send_sequence) == -1) { return(-1); } } if (flag) { rpp_form_pkt( index, RPP_EOD, sp->send_sequence, (pp != NULL) ? 
pp->data : NULL, sp->pend_attempt); if (pp != NULL) { free(pp); sp->pend_head = NULL; sp->pend_tail = NULL; } sp->pend_attempt = 0; if (next_seq(&sp->send_sequence) == -1) { return(-1); } } sp->pend_commit = sp->pend_attempt; return(0); } /* END rpp_dopending() */ /* ** Flush all data out of a stream -- do an end of message. ** Return 0 if it all went well, -1 on error. */ int rpp_flush( int index) { DOID("flush") struct stream *sp; LOCAL_DBPRT((DBTO, "%s: entered index %d\n", id, index)) if ((index < 0) || (index >= stream_num)) { errno = EINVAL; return(-1); } sp = &stream_array[index]; switch (sp->state) { case RPP_CLOSE_PEND: errno = EPIPE; return(-1); /*NOTREACHED*/ break; case RPP_DEAD: case RPP_FREE: case RPP_OPEN_PEND: case RPP_CLOSE_WAIT1: case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: errno = ENOTCONN; return(-1); /*NOTREACHED*/ break; default: /* NO-OP */ break; } /* ** if something is pending or we need to return a zero len EOM, ** call rpp_dopending(). */ if ((sp->pend_head != NULL) || (sp->send_head == NULL)) { if (rpp_dopending(index, TRUE)) { return(-1); } } if (rpp_recv_all() == -1) { return(-1); } rpp_send_out(); return(0); } /* END rpp_flush() */ /* ** Create a new socket if needed and bind a local port. ** If port is 0, pick a free port number. 
*/ int rpp_bind( uint port) { struct sockaddr_in from; int flags; if (rpp_fd == -1) { if ((rpp_fd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) { return(-1); } /* set close on exec */ if ((flags = fcntl(rpp_fd, F_GETFD)) == -1) { close(rpp_fd); rpp_fd = -1; return(-1); } flags |= FD_CLOEXEC; if (fcntl(rpp_fd, F_SETFD, flags) == -1) { close(rpp_fd); rpp_fd = -1; return(-1); } /* set no delay */ if ((flags = fcntl(rpp_fd, F_GETFL)) == -1) { close(rpp_fd); rpp_fd = -1; return(-1); } #if defined(FNDELAY) && !defined(__hpux) flags |= FNDELAY; #else flags |= O_NONBLOCK; #endif if (fcntl(rpp_fd, F_SETFL, flags) == -1) { close(rpp_fd); rpp_fd = -1; return(-1); } } /* END if (rpp_fd == -1) */ if (rpp_fd_array != NULL) { int i; for (i = 0;i < rpp_fd_num;i++) { if (rpp_fd_array[i] == rpp_fd) { return(rpp_fd); } } } memset(&from, '\0', sizeof(from)); from.sin_family = AF_INET; from.sin_addr.s_addr = htonl(INADDR_ANY); from.sin_port = htons((u_short)port); if (bind(rpp_fd, (struct sockaddr *)&from, sizeof(from)) == -1) { return(-1); } LOCAL_DBPRT((DBTO, "bind to port %d\n", ntohs(from.sin_port))) if (rpp_fd_array == NULL) { rpp_fd_array = (int *)malloc(sizeof(int)); rpp_fd_num = 1; #if defined(HAVE_ATEXIT) atexit(rpp_shutdown); #elif defined(HAVE_ON_EXIT) on_exit(rpp_shutdown_on_exit, NULL); #else /* atexit() or on_exit() must be defined */ abort compile #endif /* HAVE_ATEXIT */ } else { rpp_fd_num++; rpp_fd_array = (int *)realloc(rpp_fd_array, sizeof(int) * rpp_fd_num); } assert(rpp_fd_array); rpp_fd_array[rpp_fd_num-1] = rpp_fd; return(rpp_fd); } /** * Allocate a communication stream - return -1 on FAILURE */ int rpp_open( char *name, /* I */ uint port, /* I */ char *EMsg) /* O (optional,minsize=1024) */ { DOID("rpp_open") int i, stream; struct hostent *hp; struct stream *sp; LOCAL_DBPRT((DBTO, "%s: entered %s:%d\n", id, name, port)) if (EMsg != NULL) EMsg[0] = '\0'; if (rpp_bind(0) == -1) /* bind if we need to */ { if (EMsg != NULL) sprintf(EMsg, "cannot bind rpp socket"); /* 
FAILURE */ return(-1); } /* First, we look up the IP address for this name. */ if ((hp = gethostbyname(name)) == NULL) { LOCAL_DBPRT((DBTO, "%s: host %s not found\n", id, name)) errno = ENOENT; if (EMsg != NULL) { sprintf(EMsg, "hostname resolution for '%s' failed, errno=%d", name, h_errno); } /* FAILURE */ return(-1); } /* ** Look for previously existant stream to the given ** host. If one is found in an open state, just ** return it. */ for (i = 0;i < stream_num;i++) { sp = &stream_array[i]; if (sp->state <= RPP_FREE) continue; if (memcmp(&sp->addr.sin_addr, hp->h_addr, hp->h_length)) continue; if (sp->addr.sin_port != htons((unsigned short)port)) continue; if (sp->addr.sin_family != hp->h_addrtype) continue; if (sp->state > RPP_CLOSE_PEND) { LOCAL_DBPRT((DBTO, "%s: OLD STREAM state %d reopened %d\n", id, sp->state, sp->open_key)) clear_stream(sp); /* old stream */ } else { LOCAL_DBPRT((DBTO, "%s: reopen of %s, sp->retry %d, global %d\n", id, netaddr(&sp->addr), sp->retry, RPPRetry)) sp->retry = RPPRetry; /* SUCCESS */ return(i); } } stream = rpp_create_sp(); if (stream == -1) { if (EMsg != NULL) { sprintf(EMsg, "cannot create new stream"); } /* FAILURE */ return(-1); } sp = &stream_array[stream]; if (open_key == 0) open_key = (int)time(0) & 0x0fff; /* ** We save the address returned for the name given so we ** can send out on the preferred interface. 
*/ memcpy(&sp->addr.sin_addr, hp->h_addr, hp->h_length); sp->addr.sin_port = htons((unsigned short)port); sp->addr.sin_family = hp->h_addrtype; sp->fd = rpp_fd; sp->retry = RPPRetry; if (hp->h_addr_list[1] == NULL) { if ((hp = rpp_get_cname(&sp->addr)) == NULL) { errno = ENOENT; if (EMsg != NULL) { sprintf(EMsg, "cannot lookup cname for host '%s'", name); } /* FAILURE */ return(-1); } } rpp_alist(hp, sp); if(server_alias) { hp = gethostbyname(server_alias); if(hp) { if(sp->addr_array == NULL) { sp->addr_array = (struct in_addr *)calloc(1, sizeof(struct in_addr)); if(sp->addr_array) { memcpy(&sp->addr_array[0], hp->h_addr_list[0], hp->h_length); } } else { struct in_addr *tmp_array; int j; for(i = 0; &sp->addr_array[i] != NULL; i++); tmp_array = ( struct in_addr *)calloc(i, sizeof(struct in_addr)); if(tmp_array) { for(j = 0; j < i; j++) { memcpy(&tmp_array[j], &sp->addr_array[j], hp->h_length); } memcpy(&tmp_array[i], hp->h_addr_list[0], hp->h_length); } } } } sp->stream_id = stream; /* use my streamid for HELLO1 */ sp->state = RPP_OPEN_WAIT; sp->open_key = open_key++; rpp_form_pkt(stream, RPP_HELLO1, sp->open_key, NULL, 0); sp->stream_id = -1; /* don't know his stream id yet */ if (rpp_recv_all() == -1) { if (EMsg != NULL) { sprintf(EMsg, "rpp_recv_all failed"); } /* FAILURE */ return(-1); } /* SUCCESS */ rpp_send_out(); return(stream); } /* END rpp_open() */ /* ** Return the network address for a stream. */ struct sockaddr_in *rpp_getaddr( int index) { DOID("getaddr") struct stream *sp; LOCAL_DBPRT((DBTO, "%s: entered index %d\n", id, index)) if ((index < 0) || (index >= stream_num)) { errno = EINVAL; return(NULL); } sp = &stream_array[index]; if (sp->state <= RPP_FREE) { errno = ENOTCONN; return(NULL); } return(&sp->addr); } /* ** Free all memory and close the socket. 
*/ void rpp_terminate(void) { struct stream *sp; struct send_packet *spp; struct pending *ppp; struct recv_packet *rpp; int i; for (i = 0;i < rpp_fd_num;i++) close(rpp_fd_array[i]); if (rpp_fd_array) { free(rpp_fd_array); rpp_fd_array = NULL; rpp_fd_num = 0; } for (i = 0;i < stream_num;i++) { sp = &stream_array[i]; if (sp->state == RPP_DEAD) continue; for (ppp = sp->pend_head;ppp != NULL;ppp = sp->pend_head) { free(ppp->data); sp->pend_head = ppp->next; free(ppp); } for (rpp = sp->recv_head;rpp != NULL;rpp = sp->recv_head) { if (rpp->data != NULL) free(rpp->data); sp->recv_head = rpp->next; free(rpp); } for (spp = sp->send_head;spp != NULL;spp = sp->send_head) { free(spp->data); sp->send_head = spp->next; free(spp); } } top = NULL; bottom = NULL; if (stream_array) free(stream_array); stream_num = 0; stream_array = NULL; rpp_fd = -1; return; } /* END rpp_terminate() */ /* ** Trampoline for on_exit */ void rpp_shutdown_on_exit( int foo, void *bar) { rpp_shutdown(); return; } /* END rpp_shutdown_on_exit() */ /* ** Shutdown the library. Flush and close all open streams ** and call rpp_terminate(). */ void rpp_shutdown(void) { int timeouts, num, i; fd_set fdset; struct timeval tv; FD_ZERO(&fdset); for (i = 0;i < stream_num;i++) { rpp_close(i); } for (timeouts = 0;timeouts < 3;) { for (i = 0;i < stream_num;i++) { if (stream_array[i].state > RPP_FREE) break; } if (i == stream_num) break; LOCAL_DBPRT((DBTO, "shutdown: stream %d state %d\n", i, stream_array[i].state)) if ((num = rpp_recv_all()) == -1) break; rpp_send_out(); if (num == -3) { /* got nothing -- wait a bit */ tv.tv_sec = RPPTimeOut; tv.tv_usec = 0; for (i = 0;i < rpp_fd_num;i++) FD_SET(rpp_fd_array[i], &fdset); i = select(FD_SETSIZE, &fdset, NULL, NULL, &tv); if (i == 0) timeouts++; if (i == -1) break; } } /* END for (timeouts) */ rpp_terminate(); return; } /* END rpp_shutdown() */ /* ** Terminate a connection stream. ** Return 0 if it all went well, -1 on error. 
*/ int rpp_close( int index) { DOID("close") struct stream *sp; LOCAL_DBPRT((DBTO, "%s: entered index %d\n", id, index)) if ((index < 0) || (index >= stream_num)) { errno = EINVAL; return(-1); } sp = &stream_array[index]; switch (sp->state) { case RPP_STALE: clear_stream(sp); return(0); /*NOTREACHED*/ break; case RPP_CLOSE_PEND: sp->state = RPP_LAST_ACK; break; case RPP_OPEN_WAIT: case RPP_CONNECT: if (sp->pend_head != NULL) { if (rpp_dopending(index, TRUE)) { return(-1); } } sp->state = RPP_CLOSE_WAIT1; break; default: errno = ENOTCONN; return(-1); /* stream closed */ /*NOTREACHED*/ break; } rpp_form_pkt(index, RPP_GOODBYE, sp->send_sequence, NULL, 0); if (rpp_recv_all() == -1) { return(-1); } rpp_send_out(); return(0); } /* END rpp_close() */ /* ** Add information to the stream given by index. ** Return -1 on error, otherwise number of bytes written. */ int rpp_write( int index, void *buf, int len) { DOID("write") struct stream *sp; struct pending *pp; int hold, residual, more; LOCAL_DBPRT((DBTO, "%s: entered index %d size %d\n", id, index, len)) if ((index < 0) || (index >= stream_num) || (len < 0)) { errno = EINVAL; return(-1); } if (len == 0) { return(0); } sp = &stream_array[index]; rpp_stale(sp); /* check freshness */ switch (sp->state) { case RPP_STALE: errno = ETIMEDOUT; return(-1); /*NOTREACHED*/ break; case RPP_CLOSE_PEND: errno = EPIPE; return(-1); /*NOTREACHED*/ break; case RPP_OPEN_PEND: /* shouldn't happen */ case RPP_DEAD: case RPP_FREE: case RPP_CLOSE_WAIT1: /* stream closed */ case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: errno = ENOTCONN; return(-1); /*NOTREACHED*/ break; default: /* NO-OP */ break; } residual = 0; while (residual < len) { hold = sp->pend_attempt % RPP_PKT_DATA; if (((pp = sp->pend_tail) == NULL) || (hold == 0)) { pp = (struct pending *)malloc(sizeof(struct pending)); if (sp->pend_tail == NULL) sp->pend_head = pp; else sp->pend_tail->next = pp; sp->pend_tail = pp; pp->data = (u_char *)malloc(RPP_PKT_SIZE); assert(pp->data != NULL); 
pp->next = NULL; } more = MIN(len - residual, RPP_PKT_DATA - hold); memcpy(&pp->data[hold], (char *)buf + residual, more); residual += more; sp->pend_attempt += more; } if (rpp_recv_all() == -1) { return(-1); } rpp_send_out(); return(residual); } /* END rpp_write() */ /* ** Check a stream to see if it needs attention. */ static int rpp_attention( int index) /* I */ { DOID("attention") int mesg, count; int seq; struct stream *sp; struct recv_packet *pp; sp = &stream_array[index]; LOCAL_DBPRT((DBTO, "%s: stream %d in state %d addr %s\n", id, index, sp->state, netaddr(&sp->addr))) rpp_stale(sp); switch (sp->state) { case RPP_STALE: /* need to report error */ return(TRUE); /*NOTREACHED*/ break; case RPP_CLOSE_PEND: /* we haven't closed yet */ case RPP_CONNECT: /* check for message */ /* NO-OP */ break; default: return(FALSE); /*NOTREACHED*/ break; } /* END switch (sp->state) */ if ((sp->msg_cnt > 0) && (sp->recv_attempt <= sp->msg_cnt)) { return(TRUE); /* message to read */ } mesg = FALSE; count = 0; for (pp = sp->recv_head, seq = sp->recv_sequence;pp != NULL;pp = pp->next, seq++) { count += pp->len; if (pp->sequence != seq) break; if (pp->type != RPP_DATA) { /* end of message */ mesg = TRUE; break; } } if (mesg == TRUE) sp->msg_cnt = count; return(mesg); } /* END rpp_attention() */ /* ** Check some state before reading or skipping. If it is ** okay to continue, return 1. Otherwise, return <= 0. 
*/ static int rpp_okay( int index) /* I */ { struct stream *sp; fd_set fdset; struct timeval tv; FD_ZERO(&fdset); while (rpp_attention(index) == FALSE) { int i; tv.tv_sec = RPPTimeOut; tv.tv_usec = 0; for (i = 0;i < rpp_fd_num;i++) FD_SET(rpp_fd_array[i], &fdset); i = select(FD_SETSIZE, &fdset, NULL, NULL, &tv); if ((i == -1) || (i == 0)) { /* if tiemout or select failure, return FAILURE (CRI) */ return(-1); } if (rpp_recv_all() == -1) { return(-1); } rpp_send_out(); } sp = &stream_array[index]; if (sp->state == RPP_STALE) { /* stale output */ errno = ETIMEDOUT; return(-1); } if (sp->recv_attempt == sp->msg_cnt) { /* end of message */ if (sp->state == RPP_CLOSE_PEND) { return(-2); } /* SUCCESS */ return(0); } /* okay to return */ return(1); } /* END rpp_okay() */ /* ** Read a message. Return data up to the end of a message ** or the end of the provided buffer. ** Return -1 on error, -2 if other side has closed, otherwise ** number of bytes read. */ int rpp_read( int index, /* I */ void *buf, /* O */ int len) /* I */ { DOID("read") int hiwater, cpylen, hold, ret, xlen; struct recv_packet *pp; struct stream *sp; LOCAL_DBPRT((DBTO, "%s: entered index %d\n", id, index)) errno = 0; if ((index < 0) || (index >= stream_num) || (len < 0)) { errno = EINVAL; return(-1); } if (len == 0) { /* no data to read, return SUCCESS */ return(0); } sp = &stream_array[index]; switch (sp->state) { case RPP_DEAD: case RPP_FREE: case RPP_CLOSE_WAIT1: case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: errno = ENOTCONN; return(-1); /* stream closed */ /*NOTREACHED*/ break; default: /* NO-OP */ break; } if ((ret = rpp_okay(index)) <= 0) { return(ret); } sp = &stream_array[index]; cpylen = 0; /* find packet to copy from */ for (pp = sp->recv_head;pp != NULL;pp = pp->next) { int bump = cpylen + pp->len; if (sp->recv_attempt < bump) break; cpylen = bump; } /* END for (pp) */ hiwater = 0; xlen = MIN(len, sp->msg_cnt); hold = sp->recv_attempt - cpylen; /* start point in pkt data */ while ((pp != NULL) && 
(xlen > hiwater)) { /* got room */ cpylen = MIN(pp->len - hold, xlen - hiwater); memcpy((char *)buf + hiwater, &pp->data[hold], cpylen); hiwater += cpylen; sp->recv_attempt += cpylen; hold = 0; pp = pp->next; } /* END while () */ return(hiwater); } /* END rpp_read() */ /* ** Commit data which has been read up to recv_attempt if flag ** is TRUE. Otherwise, set recv_attempt back to the previous ** commit point recv_commit. ** Return -1 on error, FALSE on decommit or if end-of-message has ** not been reached, TRUE if end-of-message has been reached. */ int rpp_rcommit( int index, /* I */ int flag) /* I */ { DOID("rcommit") struct stream *sp; LOCAL_DBPRT((DBTO, "%s: entered index %d\n", id, index)) if ((index < 0) || (index >= stream_num)) { errno = EINVAL; return(-1); } sp = &stream_array[index]; switch (sp->state) { case RPP_CLOSE_WAIT1: /* stream closed */ case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: case RPP_OPEN_PEND: /* shouldn't happen */ case RPP_FREE: case RPP_DEAD: errno = ENOTCONN; return(-1); /*NOTREACHED*/ break; default: /* NO-OP */ break; } if (flag == FALSE) { /* no commit */ sp->recv_attempt = sp->recv_commit; return(0); } sp->recv_commit = sp->recv_attempt; if (sp->recv_commit == sp->msg_cnt) { /* return TRUE, end of message reached */ return(1); } return(0); } /* END rpp_rcommit() */ /* ** Reset end-of-message condition on a stream. Any packets ** on the receive queue are freed. ** Return -1 on error, 0 otherwise. 
*/

/*
** rpp_eom - dismiss the current message.
**
** Packets are removed from the head of the receive queue for as long
** as they fit inside the remaining message count; a GOODBYE packet
** stops the removal and stays queued.  Both read positions are reset.
** errno is EINVAL for a bad index, ENOTCONN for an unusable stream.
*/
int rpp_eom(
  int index)
{
  DOID("eom")

  struct stream      *sp;

  struct recv_packet *head;

  LOCAL_DBPRT((DBTO, "%s: entered index %d\n", id, index))

  if ((index < 0) || (index >= stream_num)) {
    errno = EINVAL;

    return(-1);
  }

  sp = &stream_array[index];

  if ((sp->state == RPP_CLOSE_WAIT1) ||  /* stream closed */
      (sp->state == RPP_CLOSE_WAIT2) ||
      (sp->state == RPP_LAST_ACK) ||
      (sp->state == RPP_OPEN_PEND) ||    /* shouldn't happen */
      (sp->state == RPP_FREE) ||
      (sp->state == RPP_DEAD)) {
    errno = ENOTCONN;

    return(-1);
  }

  /*
  ** work through recv packets
  */
  while ((head = sp->recv_head) != NULL) {
    if (head->type == RPP_GOODBYE)    /* stream finished */
      break;

    if (sp->msg_cnt < head->len)      /* belongs to a later message */
      break;

    sp->recv_sequence++;

    sp->msg_cnt -= head->len;

    sp->recv_head = head->next;

    free(head->data);                 /* free(NULL) is harmless */
    free(head);
  }

  if (sp->recv_head == NULL)
    sp->recv_tail = NULL;

  sp->recv_attempt = 0;
  sp->recv_commit  = 0;

  return(0);
}  /* END rpp_eom() */

/*
** Commit data which has been written up to pend_attempt if flag
** is TRUE.  Otherwise, set pend_attempt back to the previous
** commit point pend_commit.
** Return -1 on error, 0 otherwise.
*/ int rpp_wcommit( int index, int flag) { DOID("wcommit") struct pending *pp, *next; struct stream *sp; LOCAL_DBPRT((DBTO, "%s: entered index %d\n", id, index)) if ((index < 0) || (index >= stream_num)) { errno = EINVAL; return(-1); } sp = &stream_array[index]; switch (sp->state) { case RPP_CLOSE_PEND: errno = EPIPE; return(-1); /*NOTREACHED*/ break; case RPP_STALE: errno = ETIMEDOUT; return(-1); /*NOTREACHED*/ break; case RPP_CLOSE_WAIT1: /* stream closed */ case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: case RPP_OPEN_PEND: /* shouldn't happen */ case RPP_FREE: case RPP_DEAD: errno = ENOTCONN; return(-1); break; default: /* NO-OP */ break; } /* END switch (sp->state) */ if (flag) { /* commit */ if (rpp_dopending(index, FALSE)) { return(-1); } if (rpp_recv_all() == -1) { return(-1); } rpp_send_out(); return(0); } sp->pend_attempt = sp->pend_commit; if (sp->pend_head == NULL) { return(0); } for (pp = sp->pend_head->next;pp;pp = next) { free(pp->data); next = pp->next; free(pp); } /* for (pp) */ sp->pend_head->next = NULL; sp->pend_tail = sp->pend_head; return(0); } /* rpp_wcommit() */ /* ** Skip len characters of a message. */ int rpp_skip( int index, int len) { DOID("skip") struct stream *sp; int ret, hiwater; LOCAL_DBPRT((DBTO, "%s: entered index %d\n", id, index)) if ((index < 0) || (index >= stream_num)) { errno = EINVAL; return(-1); } sp = &stream_array[index]; switch (sp->state) { case RPP_DEAD: case RPP_FREE: case RPP_CLOSE_WAIT1: case RPP_CLOSE_WAIT2: case RPP_LAST_ACK: errno = ENOTCONN; return(-1); /* stream closed */ /*NOTREACHED*/ break; default: /* NO-OP */ break; } if ((ret = rpp_okay(index)) <= 0) { /* FAILURE - may be 0, -1, or -2 */ return(ret); } sp = &stream_array[index]; hiwater = MIN(sp->msg_cnt - sp->recv_attempt, len); sp->recv_attempt += hiwater; return(hiwater); } /* END rpp_skip() */ /* ** Check for any stream with a message waiting and ** return the stream number or a -1 if there are none. 
*/ int rpp_poll(void) { DOID("poll") int i; LOCAL_DBPRT((DBTO, "%s: entered streams %d\n", id, stream_num)) /* ** Read socket to get any packets */ for (;;) { i = rpp_recv_all(); if ((i == -1) || (i == -3)) break; } if (i == -1) { return(-1); } /* see if any stream has a message waiting */ for (i = 0;i < stream_num;i++) { if (rpp_attention(i)) break; } if (i < stream_num) /* found one */ { return(i); } rpp_send_out(); /* unknown stream identifier */ return(-2); } /* ** Process any stream i/o. ** Return 0 or a -1 if there was an error. */ int rpp_io(void) { DOID("io") int i; LOCAL_DBPRT((DBTO, "%s: entered streams %d\n", id, stream_num)) /* ** Read socket to get any packets */ for (;;) { i = rpp_recv_all(); if (i == -1 || i == -3) break; } if (i == -1) { return(-1); } rpp_send_out(); return(0); } /* END rpp_io() */ /* ** Read a character. ** Returns >=0 the char read ** -1 error or EOD ** -2 EOF */ int rpp_getc( int index) { int ret; u_char c; if ((ret = rpp_read(index, &c, 1)) == 1) { /* SUCCESS */ return((int)c); } if (ret == -2) { /* FAILURE: type is ??? */ return(-2); } /* FAILURE */ return(-1); } /* END rpp_getc() */ /* ** Write a character. */ int rpp_putc( int index, int c) { u_char x = (u_char)c; if (rpp_write(index, &x, 1) != 1) { return(-1); } return(0); } int RPPConfigure( int SRPPTimeOut, /* I */ int SRPPRetry) /* I */ { if (SRPPTimeOut > 0) RPPTimeOut = SRPPTimeOut; if (SRPPRetry > 1) /* always need an "extra" retry to invalidate existing conns */ RPPRetry = SRPPRetry; return(0); } int RPPReset(void) { RPPTimeOut = DEFAULT_RPP_TIMEOUT; RPPRetry = DEFAULT_RPP_RETRY; return(0); } int rpp_get_stream_state(int index) { struct stream *sp; sp = &stream_array[index]; if(sp) { return(sp->state); } return(-1); } /* END rpp.c */