"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/tcp.c" between
NetPIPE_4.x.tar.gz and NetPIPE-3.7.2.tar.gz

About: NetPIPE - a Network Protocol Independent Performance Evaluator

tcp.c  (NetPIPE_4.x):tcp.c  (NetPIPE-3.7.2)
/*****************************************************************************/ /*****************************************************************************/
/* "NetPIPE" -- Network Protocol Independent Performance Evaluator. */ /* "NetPIPE" -- Network Protocol Independent Performance Evaluator. */
/* Copyright 1997, 1998 Iowa State University Research Foundation, Inc. */ /* Copyright 1997, 1998 Iowa State University Research Foundation, Inc. */
/* */ /* */
/* This program is free software; you can redistribute it and/or modify */ /* This program is free software; you can redistribute it and/or modify */
/* it under the terms of the GNU General Public License as published by */ /* it under the terms of the GNU General Public License as published by */
/* the Free Software Foundation. You should have received a copy of the */ /* the Free Software Foundation. You should have received a copy of the */
/* GNU General Public License along with this program; if not, write to the */ /* GNU General Public License along with this program; if not, write to the */
/* Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ /* Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
/* */ /* */
/* * tcp.c ---- TCP calls source */
/* * tcp.h ---- Include file for TCP calls and data structs */
/*****************************************************************************/ /*****************************************************************************/
#include "netpipe.h" #include "netpipe.h"
#include <fcntl.h>
#if defined (MPLITE) #if defined (MPLITE)
#include "mplite.h" #include "mplite.h"
#endif #endif
int doing_reset = 0; int doing_reset = 0;
void Init(ArgStruct *p, int* pargc, char*** pargv) void Init(ArgStruct *p, int* pargc, char*** pargv)
{ {
p->reset_conn = 0; /* Default to not resetting connection */ p->reset_conn = 0; /* Default to not resetting connection */
p->prot.sndbufsz = p->prot.rcvbufsz = 0; p->prot.sndbufsz = p->prot.rcvbufsz = 0;
p->tr = 0; /* The transmitter will be set using the -h host flag. */ p->tr = 0; /* The transmitter will be set using the -h host flag. */
p->rcv = 1; p->rcv = 1;
} }
void Setup(ArgStruct *p) void Setup(ArgStruct *p)
{ {
int i, j, bound; int one = 1;
FILE *fd; int sockfd;
struct sockaddr_in *lsdout; /* ptr to sockaddr_in in ArgStruct */ struct sockaddr_in *lsin1, *lsin2; /* ptr to sockaddr_in in ArgStruct */
char *myhostname, *otherhostname, *dot; char *host;
struct hostent *addr;
/* allocate space for the list of hosts and their port numbers */ struct protoent *proto;
int send_size, recv_size, sizeofint = sizeof(int);
p->host = (char **) malloc( p->nprocs * sizeof(char *) ); int socket_family = AF_INET;
for( i=0; i<p->nprocs; i++ )
p->host[i] = (char *) malloc( 100 * sizeof(char) ); host = p->host; /* copy ptr to hostname */
p->port = (int *) malloc( p->nprocs * sizeof( int ) ); if (p->use_sdp){
printf("Using AF_INET_SDP (27) socket family\n");
/* Read the hostfile */ socket_family = 27;
}
if( p->hostfile == NULL ) { /* No hostfile */
lsin1 = &(p->prot.sin1);
fprintf(stderr, "You must use <-H hostfile> to specify a hostfile\n"); lsin2 = &(p->prot.sin2);
exit(0);
bzero((char *) lsin1, sizeof(*lsin1));
bzero((char *) lsin2, sizeof(*lsin2));
if ( (sockfd = socket(socket_family, SOCK_STREAM, 0)) < 0){
printf("NetPIPE: can't open stream socket! errno=%d\n", errno);
exit(-4);
}
if(!(proto = getprotobyname("tcp"))){
printf("NetPIPE: protocol 'tcp' unknown!\n");
exit(555);
}
/* Attempt to set TCP_NODELAY */
if(setsockopt(sockfd, proto->p_proto, TCP_NODELAY, &one, sizeof(one)) < 0)
{
printf("NetPIPE: setsockopt: TCP_NODELAY failed! errno=%d\n", errno);
exit(556);
}
/* If requested, set the send and receive buffer sizes */
if(p->prot.sndbufsz > 0)
{
if(setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &(p->prot.sndbufsz),
sizeof(p->prot.sndbufsz)) < 0)
{
printf("NetPIPE: setsockopt: SO_SNDBUF failed! errno=%d\n", errno);
printf("You may have asked for a buffer larger than the system can han
dle\n");
exit(556);
}
if(setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &(p->prot.rcvbufsz),
sizeof(p->prot.rcvbufsz)) < 0)
{
printf("NetPIPE: setsockopt: SO_RCVBUF failed! errno=%d\n", errno);
printf("You may have asked for a buffer larger than the system can han
dle\n");
exit(556);
}
}
getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF,
(char *) &send_size, (void *) &sizeofint);
getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF,
(char *) &recv_size, (void *) &sizeofint);
if(!doing_reset) {
fprintf(stderr,"Send and receive buffers are %d and %d bytes\n",
send_size, recv_size);
fprintf(stderr, "(A bug in Linux doubles the requested buffer sizes)\n");
}
if( p->tr ) { /* Primary transmitter */
if (atoi(host) > 0) { /* Numerical IP address */
lsin1->sin_family = AF_INET;
lsin1->sin_addr.s_addr = inet_addr(host);
} else { } else {
if( (fd = fopen( p->hostfile, "r")) == NULL ) { if ((addr = gethostbyname(host)) == NULL){
fprintf(stderr, "%d Could not open the hostfile |%s| (errno=%d)\n", printf("NetPIPE: invalid hostname '%s'\n", host);
p->myproc, p->hostfile, errno ); exit(-5);
exit(0); }
}
for( i=0; i<p->nprocs; i++ ) {
fscanf( fd, "%s", p->host[i] );
/* Set the port uniquely for each process on an SMP box.
* For SMP procs, myproc must be set on input using a comma
* after the hostfilename <-H hostfile,proc#>. The nplaunch
* script does this automatically. */
p->port[i] = DEFPORT;
for( j=0; j<i; j++ )
if( ! strcmp( p->host[j], p->host[i] ) ) p->port[i]++;
if( p->port[i] > MAXPORT ) {
fprintf(stderr, "%d NetPIPE: Port for node %d is above MAXPORT=%d\n"
,
p->myproc, i, MAXPORT);
fprintf(stderr, " Change MAXPORT in src/netpipe.h and recompile\n
");
exit(0);
}
if( p->myproc < 0 ) { /* myproc not set yet, hope this is not SMP
*/
myhostname = (char *) malloc( 100 );
gethostname( myhostname, 100);
if( (dot=strstr( myhostname, "." )) != NULL ) *dot = '\0';
otherhostname = (char *) malloc( 100 );
strcpy( otherhostname, p->host[i] );
if( (dot=strstr( otherhostname, "." )) != NULL ) *dot = '\0';
if( ! strcmp( myhostname, otherhostname ) ) p->myproc = i;
if( p->port[i] != DEFPORT ) {
fprintf(stderr, "%d NetPIPE: The process number must be specified
for SMP nodes\n", p->myproc);
fprintf(stderr, " using <-H hostfile,proc#>. nplaunch
does this automatically\n");
exit(0);
}
}
}
}
if( p->myproc == 0 ) p->master = 1;
if( p->myproc < 0 || p-> myproc >= p->nprocs ) {
fprintf(stderr, "NetPIPE: myproc=%d was not set properly\n", p->myproc);
exit(0);
}
/* Set the first half to be transmitters */
p->tr = p->rcv = 0;
if( p->myproc < p->nprocs/2 ) p->tr = 1;
else p->rcv = 1;
p->dest = p->source = p->nprocs - 1 - p->myproc;
/* Now initialize the socket structures */
p->comm_sd = malloc( p->nprocs * sizeof( int ) );
for( i=0; i<p->nprocs; i++ ) p->comm_sd[i] = 0;
/* Create the listen socket, for any interface on my port */
lsdout = &(p->prot.sdout);
bzero((char *) lsdout, sizeof(*lsdout));
lsdout->sin_family = AF_INET;
lsdout->sin_addr.s_addr = htonl(INADDR_ANY);
lsdout->sin_port = htons(p->port[p->myproc]);
p->serv_sd = create_socket( p ); /* This will be the listening socket */
bound = bind(p->serv_sd, (struct sockaddr *) lsdout, sizeof(*lsdout));
if( bound != 0 ) {
fprintf(stderr,"%d NetPIPE: bind on local address failed! errno=%d\n",
p->myproc, errno);
exit(0);
}
listen(p->serv_sd, 5); /* Set the socket to listen for incoming connections *
/
establish(p); /* Establish connections */
}
/* Create a generic socket */
int create_socket(ArgStruct *p)
{
int sd, one=1, sizeofint=sizeof(int), send_size, recv_size;
static int print_once = 1;
struct protoent *proto;
if ( (sd = socket(AF_INET, SOCK_STREAM, 0)) < 0 ){ lsin1->sin_family = addr->h_addrtype;
fprintf(stderr,"NetPIPE: can't open the stream socket! errno=%d\n", errno) bcopy(addr->h_addr, (char*) &(lsin1->sin_addr.s_addr), addr->h_length);
;
exit(0);
} }
if( ! (proto=getprotobyname("tcp")) ){ lsin1->sin_port = htons(p->port);
fprintf(stderr,"NetPIPE: protocol 'tcp' unknown!\n");
exit(0);
}
/* Attempt to set TCP_NODELAY */ p->commfd = sockfd;
if( setsockopt(sd, proto->p_proto, TCP_NODELAY, (const void *) &one, sizeof(o } else if( p->rcv ) { /* we are the receiver */
ne)) < 0) {
fprintf(stderr,"NetPIPE: setsockopt: TCP_NODELAY failed! errno=%d\n", errn
o);
exit(0);
}
if(setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (const void *) &one, sizeof(one)) if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int))) {
< 0) { printf("NetPIPE: server: unable to setsockopt -- errno %d\n", errno);
printf("NetPIPE: setsockopt: SO_REUSEADDR failed! errno=%d\n", errno); exit(557);
exit(0);
} }
/* If requested, set the send and receive buffer sizes */ bzero((char *) lsin1, sizeof(*lsin1));
lsin1->sin_family = AF_INET;
if(p->prot.sndbufsz > 0) { lsin1->sin_addr.s_addr = htonl(INADDR_ANY);
lsin1->sin_port = htons(p->port);
if(setsockopt(sd, SOL_SOCKET, SO_SNDBUF,
(const void *) &(p->prot.sndbufsz), sizeof(p->prot.sndbufsz)) < 0) {
fprintf(stderr,"NetPIPE: setsockopt: SO_SNDBUF failed! errno=%d\n", err
no);
fprintf(stderr,"You may have asked for a buffer larger than the system
can handle\n");
exit(0);
}
if(setsockopt(sd, SOL_SOCKET, SO_RCVBUF,
(const void *) &(p->prot.rcvbufsz), sizeof(p->prot.rcvbufsz)) < 0) {
fprintf(stderr,"NetPIPE: setsockopt: SO_RCVBUF failed! errno=%d\n", err if (bind(sockfd, (struct sockaddr *) lsin1, sizeof(*lsin1)) < 0){
no); printf("NetPIPE: server: bind on local address failed! errno=%d\n", errno);
fprintf(stderr,"You may have asked for a buffer larger than the system exit(-6);
can handle\n");
exit(0);
}
} }
getsockopt(sd, SOL_SOCKET, SO_SNDBUF, p->servicefd = sockfd;
(char *) &send_size, (void *) &sizeofint); }
getsockopt(sd, SOL_SOCKET, SO_RCVBUF, p->upper = send_size + recv_size;
(char *) &recv_size, (void *) &sizeofint);
if(!doing_reset && p->master && print_once) {
fprintf(stderr,"Send and receive buffers are %d and %d bytes\n",
send_size, recv_size);
fprintf(stderr, "(A bug in Linux doubles the requested buffer sizes)\n");
print_once = 0;
}
p->upper = send_size + recv_size; establish(p); /* Establish connections */
return sd;
} }
/* Create a socket and connect it to the node specified */ static int
readFully(int fd, void *obuf, int len)
static char buf[] = " ";
void connect_to( ArgStruct *p, int node )
{ {
int one = 1; int bytesLeft = len;
socklen_t clen; char *buf = (char *) obuf;
struct protoent *proto; int bytesRead = 0;
struct sockaddr_in *lsdin; /* ptr to sockaddr_in in ArgStruct */
struct hostent *addr;
double When(), t0 = When();
/* Set the socket info for initiating connections */
lsdin = &(p->prot.sdin);
bzero((char *) lsdin, sizeof(*lsdin));
if (atoi(p->host[node]) > 0) { /* Numerical IP address */
lsdin->sin_family = AF_INET;
lsdin->sin_addr.s_addr = inet_addr(p->host[node]);
} else {
if ((addr = gethostbyname(p->host[node])) == NULL){
fprintf(stderr,"%d NetPIPE: invalid hostname '%s'\n",
p->myproc, p->host[node]);
exit(0);
}
lsdin->sin_family = addr->h_addrtype;
bcopy(addr->h_addr, (char*) &(lsdin->sin_addr.s_addr), addr->h_length);
}
lsdin->sin_port = htons(p->port[node]);
clen = (socklen_t) sizeof(p->prot.sdin);
p->comm_sd[node] = create_socket( p );
while( connect(p->comm_sd[node], (struct sockaddr *) &(p->prot.sdin), while (bytesLeft > 0 &&
sizeof(p->prot.sdin)) < 0 ) { (bytesRead = read(fd, (void *) buf, bytesLeft)) > 0)
{
/* If we are doing a reset and we get a connection refused from bytesLeft -= bytesRead;
* the connect() call, assume that the other node has not yet buf += bytesRead;
* gotten to its corresponding accept() call and keep trying until }
* we have success. if (bytesRead <= 0) return bytesRead;
*/ return len;
if( (!doing_reset || errno != ECONNREFUSED) && When()-t0 > 20.0 ) {
fprintf(stderr,"%d NetPIPE: Cannot connect for 20 seconds! errno=%d\n",
p->myproc, errno);
exit(0);
}
sleep(1);
}
/* Do a read here to make sure the socket is completely connected. UNIX
* socket implementations can potentially queue up socket connections on the
* accepting end, even if the process there has not yet called accept().
* Thus, we can get into some nasty race conditions if we're connecting a
* bunch of sockets on different nodes. For example, running NetPIPE with 4
* nodes, it's possible for some socket descriptors to get mixed up in the
* establish function. */
readFully(p, node, buf, strlen(buf));
if(strcmp(buf, "SyncMe")!=0) {
fprintf(stderr, "Error reading sync buffer in connect_to()\n");
exit(-1);
}
} }
/* Accept a connect from node on the listen socket */ void Sync(ArgStruct *p)
void accept_from( ArgStruct *p, int node )
{ {
int one = 1; char s[] = "SyncMe", response[] = " ";
socklen_t clen;
struct protoent *proto;
char buf[] = "SyncMe";
/*fprintf(stderr,"%d accept_from( %d %s )\n", p->myproc, node, p->host[node]);*/
clen = (socklen_t) sizeof(p->prot.sdout);
p->comm_sd[node] = accept(p->serv_sd, (struct sockaddr *) &(p->prot.sdout), &
clen);
/*fprintf(stderr,"%d after accept()\n", p->myproc);*/
if(p->comm_sd[node] < 0){
fprintf(stderr,"%d NetPIPE: Accept Failed! errno=%d\n",p->myproc,errno);
exit(0);
}
/* Attempt to set TCP_NODELAY. TCP_NODELAY may or may not be propagated
* to accepted sockets. */
if( ! (proto = getprotobyname("tcp")) ){
fprintf(stderr,"%d NetPIPE: unknown protocol!\n", p->myproc);
exit(0);
}
if(setsockopt(p->comm_sd[node], proto->p_proto, TCP_NODELAY,
(const void *) &one, sizeof(one)) < 0) {
fprintf(stderr,"setsockopt: TCP_NODELAY failed! errno=%d\n", errno);
exit(0);
}
if(setsockopt(p->comm_sd[node], SOL_SOCKET, SO_REUSEADDR, (const void *) &one if (write(p->commfd, s, strlen(s)) < 0 || /* Write to nbor */
, sizeof(one)) < 0) { readFully(p->commfd, response, strlen(s)) < 0) /* Read from nbor */
printf("NetPIPE: setsockopt: SO_REUSEADDR failed! errno=%d\n", errno); {
exit(0); perror("NetPIPE: error writing or reading synchronization string");
} exit(3);
}
/* If requested, set the send and receive buffer sizes */ if (strncmp(s, response, strlen(s)))
{
if(p->prot.sndbufsz > 0) { fprintf(stderr, "NetPIPE: Synchronization string incorrect! |%s|\n", res
ponse);
if(setsockopt(p->comm_sd[node], SOL_SOCKET, SO_SNDBUF, exit(3);
(const void *) &(p->prot.sndbufsz), sizeof(p->prot.sndbufsz)) < 0) {
fprintf(stderr,"setsockopt: SO_SNDBUF failed! errno=%d\n", errno);
exit(0);
}
if(setsockopt(p->comm_sd[node], SOL_SOCKET, SO_RCVBUF,
(const void *) &(p->prot.rcvbufsz), sizeof(p->prot.rcvbufsz)) < 0) {
fprintf(stderr,"setsockopt: SO_RCVBUF failed! errno=%d\n", errno);
exit(0);
} }
}
writeFully(p, node, buf, strlen(buf));
} }
void establish(ArgStruct *p) void PrepareToReceive(ArgStruct *p)
{ {
int i, r; /*
long socket_flags; The Berkeley sockets interface doesn't have a method to pre-post
a buffer for reception of data.
/* First connect to my communication partner */ */
if( p->tr ){
connect_to( p, p->dest );
} else {
accept_from( p, p->source );
}
/* Now connect all to the master proc 0 */
if( p->master ) {
for( i=1; i<p->nprocs; i++ ) {
if( i != p->dest ) connect_to( p, i );
}
} else if( p->dest != 0 ) {
accept_from( p, 0 );
}
/* Make sockets non-blocking for CPU workload measurements */
if( p->workload ) {
socket_flags = fcntl(p->comm_sd[p->dest], F_GETFL, 0);
#if defined (FNONBLK)
socket_flags = socket_flags + FNONBLK;
#elif defined (FNONBLOCK)
socket_flags = socket_flags + FNONBLOCK;
#else
socket_flags = socket_flags + O_NONBLOCK;
#endif
r = fcntl(p->comm_sd[p->dest], F_SETFL, socket_flags);
}
} }
/* Read once from the socket (used only for TestForCompletion) */ void SendData(ArgStruct *p)
int readOnce(ArgStruct *p)
{ {
int bytesRead, errno; int bytesWritten, bytesLeft;
char *q;
bytesRead = read(p->comm_sd[p->source], (void *) p->r_ptr, p->bytesLeft); bytesLeft = p->bufflen;
bytesWritten = 0;
/*fprintf(stderr,"%d readOnce %d of %d bytes\n", p->myproc, bytesRead, p->buffle q = p->s_ptr;
n);*/ while (bytesLeft > 0 &&
(bytesWritten = write(p->commfd, q, bytesLeft)) > 0)
if( bytesRead < 0 && errno == EWOULDBLOCK ) bytesRead = 0; {
bytesLeft -= bytesWritten;
if( bytesRead < 0 ) { q += bytesWritten;
fprintf(stderr,"%d NetPIPE: ReadOnce() error, read %d of %d (errno=%d)\n", }
p->myproc, p->bufflen - p->bytesLeft, p->bufflen, errno); if (bytesWritten == -1)
exit(0); {
} printf("NetPIPE: write: error encountered, errno=%d\n", errno);
exit(401);
p->bytesLeft -= bytesRead; }
p->r_ptr += bytesRead;
return bytesRead;
} }
/* Read p->bufflen from the socket until complete */ void RecvData(ArgStruct *p)
int readFully( ArgStruct *p, int node, void *buf, int nbytes )
{ {
int bytesLeft = nbytes; int bytesLeft;
char *ptr = (char *) buf; int bytesRead;
int bytesRead, errno; char *q;
while (bytesLeft > 0 ) {
bytesRead = read(p->comm_sd[node], (void *) ptr, bytesLeft); bytesLeft = p->bufflen;
bytesRead = 0;
if( bytesRead < 0 && errno == EWOULDBLOCK ) bytesRead = 0; q = p->r_ptr;
while (bytesLeft > 0 &&
if( bytesRead < 0 ) { (bytesRead = read(p->commfd, q, bytesLeft)) > 0)
fprintf(stderr,"%d NetPIPE: ReadFully() error, read %d of %d (errno=%d) {
\n", bytesLeft -= bytesRead;
p->myproc, nbytes - bytesLeft, nbytes, errno); q += bytesRead;
exit(0); }
if (bytesLeft > 0 && bytesRead == 0)
{
printf("NetPIPE: \"end of file\" encountered on reading from socket\n");
}
else if (bytesRead == -1)
{
printf("NetPIPE: read: error encountered, errno=%d\n", errno);
exit(401);
} }
bytesLeft -= bytesRead;
ptr += bytesRead;
}
/*fprintf(stderr,"%d readFully %d bytes\n", p->myproc, nbytes);*/
return nbytes;
} }
/* Write nbytes to the socket until complete */ /* uint32_t is used to insure that the integer size is the same even in tests
* between 64-bit and 32-bit architectures. */
int writeFully(ArgStruct *p, int node, void *buf, int nbytes) void SendTime(ArgStruct *p, double *t)
{ {
int bytesLeft = nbytes; uint32_t ltime, ntime;
char *ptr = (char *) buf;
int bytesSent, errno;
/*fprintf(stderr,"%d writing %d to %d\n", p->myproc, nbytes, node);*/
while (bytesLeft > 0 ) {
bytesSent = write(p->comm_sd[node], (void *) ptr, bytesLeft);
/*fprintf(stderr,"%d wrote %d of %d to %d\n", p->myproc, bytesSent, nbytes, node
);*/
if( bytesSent < 0 && errno == EWOULDBLOCK ) bytesSent = 0; /*
Multiply the number of seconds by 1e8 to get time in 0.01 microseconds
and convert value to an unsigned 32-bit integer.
*/
ltime = (uint32_t)(*t * 1.e8);
if(bytesSent < 0) { /* Send time in network order */
fprintf(stderr,"%d NetPIPE: write to %d error, errno=%d\n", ntime = htonl(ltime);
p->myproc, node, errno); if (write(p->commfd, (char *)&ntime, sizeof(uint32_t)) < 0)
exit(0); {
printf("NetPIPE: write failed in SendTime: errno=%d\n", errno);
exit(301);
} }
bytesLeft -= bytesSent;
ptr += bytesSent;
}
return nbytes;
} }
/* Global sync with the master node, then local sync with your comm pair */ void RecvTime(ArgStruct *p, double *t)
static char s[] = "SyncMe", response[] = " ";
void Sync(ArgStruct *p)
{ {
int i; uint32_t ltime, ntime;
int bytesRead;
/* First do a global synchronization with the master proc 0 bytesRead = readFully(p->commfd, (void *)&ntime, sizeof(uint32_t));
* if there are multiple pairs. */ if (bytesRead < 0)
{
if( p->master && p->nprocs > 2 ) { printf("NetPIPE: read failed in RecvTime: errno=%d\n", errno);
exit(302);
/* Read from each proc first */
for( i=1; i<p->nprocs; i++ ) {
strcpy( response, " " );
readFully(p, i, response, strlen(s)); /* Read from nbor */
if( strncmp(s, response, strlen(s)) ) {
fprintf(stderr,"NetPIPE: error reading synchronization string");
exit(0);
}
} }
else if (bytesRead != sizeof(uint32_t))
/* Write to each proc */ {
fprintf(stderr, "NetPIPE: partial read in RecvTime of %d bytes\n",
for( i=1; i<p->nprocs; i++ ) { bytesRead);
exit(303);
writeFully(p, i, s, strlen(s));
}
} else if( p->nprocs > 2 ) {
writeFully(p, 0, s, strlen(s));
readFully( p, 0, response, strlen(s));
if( strncmp(s, response, strlen(s)) ) {
fprintf(stderr,"%d NetPIPE: error writing or reading sync string\n", p-
>myproc);
exit(0);
} }
} ltime = ntohl(ntime);
/* Now do a local sync with your comm pair. */
strcpy( response, " " );
writeFully(p, p->dest, s, strlen(s)); /* Result is ltime (in microseconds) divided by 1.0e8 to get seconds */
readFully( p, p->dest, response, strlen(s));
if( strncmp(s, response, strlen(s)) ) { *t = (double)ltime / 1.0e8;
fprintf(stderr,"%d NetPIPE: error writing or reading sync string\n", p->my
proc);
exit(0);
}
} }
void PrepareToReceive(ArgStruct *p) void SendRepeat(ArgStruct *p, int rpt)
{ {
/* uint32_t lrpt, nrpt;
The Berkeley sockets interface doesn't have a method to pre-post
a buffer for reception of data.
*/
}
/* Send the data segment to the proc I am paired with */ lrpt = rpt;
/* Send repeat count as a long in network order */
void SendData(ArgStruct *p) nrpt = htonl(lrpt);
{ if (write(p->commfd, (void *) &nrpt, sizeof(uint32_t)) < 0)
writeFully(p, p->dest, p->s_ptr, p->bufflen); {
printf("NetPIPE: write failed in SendRepeat: errno=%d\n", errno);
exit(304);
}
} }
/* This function reads the socket buffers once then tests for message void RecvRepeat(ArgStruct *p, int *rpt)
* completion. It may need to be adapted to read multiple times until
* 0 bytes is read.
*/
int TestForCompletion( ArgStruct *p )
{ {
readOnce( p ); uint32_t lrpt, nrpt;
int bytesRead;
if( p->bytesLeft == 0 ) return 1; /* The message is complete */ bytesRead = readFully(p->commfd, (void *)&nrpt, sizeof(uint32_t));
else return 0; /* The message is incomplete */ if (bytesRead < 0)
} {
printf("NetPIPE: read failed in RecvRepeat: errno=%d\n", errno);
exit(305);
}
else if (bytesRead != sizeof(uint32_t))
{
fprintf(stderr, "NetPIPE: partial read in RecvRepeat of %d bytes\n",
bytesRead);
exit(306);
}
lrpt = ntohl(nrpt);
void RecvData(ArgStruct *p) *rpt = lrpt;
{
if( p->bytesLeft > 0 ) /* protect against re-read in CPU workload measureme
nts */
readFully( p, p->source, p->r_ptr, p->bufflen);
else
p->r_ptr = p->r_ptr_saved;
} }
/* Generic Gather routine (used to gather times) */ void establish(ArgStruct *p)
void Gather( ArgStruct *p, double *buf)
{
int proc;
if( p->master ) { /* Gather the data from the other procs */
for( proc=1; proc<p->nprocs; proc++ ) {
buf++;
readFully(p, proc, buf, sizeof(double));
}
} else { /* Write my section to the master proc 0 */
buf++;
writeFully(p, 0, buf, sizeof(double));
}
}
/* Broadcast from master 0 to all other procs (used for nrepeat) */
void Broadcast(ArgStruct *p, unsigned int *buf)
{ {
int proc; int one = 1;
socklen_t clen;
struct protoent *proto;
if( p->master ) { clen = (socklen_t) sizeof(p->prot.sin2);
for( proc=1; proc<p->nprocs; proc++ ) { if( p->tr ){
writeFully(p, proc, buf, sizeof(int)); while( connect(p->commfd, (struct sockaddr *) &(p->prot.sin1),
sizeof(p->prot.sin1)) < 0 ) {
/* If we are doing a reset and we get a connection refused from
* the connect() call, assume that the other node has not yet
* gotten to its corresponding accept() call and keep trying until
* we have success.
*/
if(!doing_reset || errno != ECONNREFUSED) {
printf("Client: Cannot Connect! errno=%d\n",errno);
exit(-10);
}
}
} else if( p->rcv ) {
/* SERVER */
listen(p->servicefd, 5);
p->commfd = accept(p->servicefd, (struct sockaddr *) &(p->prot.sin2), &clen)
;
if(p->commfd < 0){
printf("Server: Accept Failed! errno=%d\n",errno);
exit(-12);
}
/*
Attempt to set TCP_NODELAY. TCP_NODELAY may or may not be propagated
to accepted sockets.
*/
if(!(proto = getprotobyname("tcp"))){
printf("unknown protocol!\n");
exit(555);
}
if(setsockopt(p->commfd, proto->p_proto, TCP_NODELAY,
&one, sizeof(one)) < 0)
{
printf("setsockopt: TCP_NODELAY failed! errno=%d\n", errno);
exit(556);
}
/* If requested, set the send and receive buffer sizes */
if(p->prot.sndbufsz > 0)
{
/* printf("Send and Receive Buffers on accepted socket set to %d bytes\n",*
/
/* p->prot.sndbufsz);*/
if(setsockopt(p->commfd, SOL_SOCKET, SO_SNDBUF, &(p->prot.sndbufsz),
sizeof(p->prot.sndbufsz)) < 0)
{
printf("setsockopt: SO_SNDBUF failed! errno=%d\n", errno);
exit(556);
}
if(setsockopt(p->commfd, SOL_SOCKET, SO_RCVBUF, &(p->prot.rcvbufsz),
sizeof(p->prot.rcvbufsz)) < 0)
{
printf("setsockopt: SO_RCVBUF failed! errno=%d\n", errno);
exit(556);
} }
}
} else { }
readFully(p, 0, buf, sizeof(int));
}
} }
void CleanUp(ArgStruct *p) void CleanUp(ArgStruct *p)
{ {
int proc; char quit[] = "QUIT";
Sync( p ); if (p->tr) {
for( proc=0; proc<p->nprocs; proc++) { write(p->commfd,quit, 5);
read(p->commfd, quit, 5);
close(p->commfd);
if( p->comm_sd[proc] != 0 ) close(p->comm_sd[proc]); } else if( p->rcv ) {
} read(p->commfd,quit, 5);
write(p->commfd,quit,5);
close(p->commfd);
close(p->servicefd);
close(p->serv_sd); }
} }
void Reset(ArgStruct *p) void Reset(ArgStruct *p)
{ {
/* Reset sockets */ /* Reset sockets */
if(p->reset_conn) { if(p->reset_conn) {
doing_reset = 1; doing_reset = 1;
/* Close the sockets */ /* Close the sockets */
CleanUp(p); CleanUp(p);
/* Now open and connect new sockets */ /* Now open and connect new sockets */
Setup(p); Setup(p);
} }
}
void AfterAlignmentInit(ArgStruct *p)
{
} }
 End of changes. 67 change blocks. 
552 lines changed or deleted 316 lines changed or added

Home  |  About  |  All  |  Newest  |  Fossies Dox  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTPS