"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/gm.c" between
NetPIPE_4.x.tar.gz and NetPIPE-3.7.2.tar.gz

About: NetPIPE - a Network Protocol Independent Performance Evaluator

gm.c (NetPIPE_4.x) vs. gm.c (NetPIPE-3.7.2), shown change block by change block;
blocks that are identical in both versions appear once.
NetPIPE_4.x:

/* This is the GM module for NetPIPE
 * Originally written by Xuehua Chen (I think)
 * Modified for multiple pairwise interactions by Dave Turner
 */

#include "netpipe.h"

struct gm_port *gm_p;
int  sizeof_workspace;
char *workspace;
int  *r, *s;        /* incoming and outgoing node numbers */
int  stokens;
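Both versions implement NetPIPE's module interface, so the diff is easiest to
follow against the full set of entry points. The prototypes below are
reconstructed from the definitions in this file; they are not copied from
netpipe.h, and the ArgStruct layout is assumed from usage:

/* Entry points common to both versions: */
void Init(ArgStruct *p, int *pargc, char ***pargv);
void Setup(ArgStruct *p);
void Sync(ArgStruct *p);
void PrepareToReceive(ArgStruct *p);
void SendData(ArgStruct *p);
void RecvData(ArgStruct *p);
void CleanUp(ArgStruct *p);
void Reset(ArgStruct *p);
void FreeBuff(char *buff1, char *buff2);

/* NetPIPE_4.x only: */
int  writeFully(ArgStruct *p, int node, void *buf, int nbytes, unsigned int priority);
int  readFully(ArgStruct *p, int node, void *buf, int nbytes, unsigned int priority);
int  readOnce(ArgStruct *p);
int  TestForCompletion(ArgStruct *p);
void Gather(ArgStruct *p, double *buf);
void Broadcast(ArgStruct *p, unsigned int *buf);
void MyMalloc(ArgStruct *p, int bufflen);

/* NetPIPE-3.7.2 only: */
void establish(ArgStruct *p);
int  readFully(void *buff, int len);           /* different signature */
void SendTime(ArgStruct *p, double *t);
void RecvTime(ArgStruct *p, double *t);
void SendRepeat(ArgStruct *p, int rpt);
void RecvRepeat(ArgStruct *p, int *rpt);
void AfterAlignmentInit(ArgStruct *p);
void MyMalloc(ArgStruct *p, int bufflen, int soffset, int roffset);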
NetPIPE_4.x:

void Init(ArgStruct *p, int* pargc, char*** pargv)
{
}

NetPIPE-3.7.2:

void Init(ArgStruct *p, int* pargc, char*** pargv)
{
   p->tr  = 0;
   p->rcv = 1;
}
NetPIPE_4.x:

void Setup(ArgStruct *p)
{
   FILE *fd;
   int i;
   char *myhostname, *short_hostname, *dot;

   /* Open port 2 on the Myrinet card */

   if( gm_open( &gm_p, 0, 5, "port2", (enum gm_api_version) GM_API_VERSION )
         != GM_SUCCESS) {
      printf(" Couldn't open board 0 port 2\n");
      exit(-1);
   } else {
      /* printf("Opened board 0 port2\n");*/
   }

   /* Open the hostfile, read in the host names and get the host_ids */

   myhostname = (char *) malloc( 100 );
   gethostname( myhostname, 100);
   if( (dot=strstr( myhostname, "." )) != NULL ) *dot = '\0';

   if( (fd = fopen( p->hostfile, "r")) == NULL ) {
      fprintf(stderr, "%d Could not open the hostfile %s, (errno=%d)\n",
              p->myproc, p->hostfile, errno );
      exit(0);
   }

   /* allocate space for the list of hosts and their host id's */

   p->prot.host_id = (short int *) malloc( p->nprocs * sizeof(short int) );
   p->host = (char **) malloc( p->nprocs * sizeof(char *) );
   for( i=0; i<p->nprocs; i++ )
      p->host[i] = (char *) malloc( 100 * sizeof(char) );

   for( i=0; i<p->nprocs; i++ ) {
      fscanf( fd, "%s", p->host[i] );
      if( (dot=strstr( p->host[i], "." )) != NULL ) *dot = '\0';
      p->prot.host_id[i] = gm_host_name_to_node_id(gm_p, p->host[i]);
      if( p->prot.host_id[i] == GM_NO_SUCH_NODE_ID ) {
         fprintf(stderr,"GM ERROR: %d failed to connect to host %s\n",
                 p->myproc, p->host[i]);
         exit(0);
      }
      /* Set myproc if this is my host and it was not set by nplaunch */
      if( p->myproc < 0 && !strcmp(p->host[i],myhostname) ) p->myproc = i;
   }

   /* The first half will pair up with the second half in overlapping fashion */

   p->tr = p->rcv = 0;
   if( p->myproc < p->nprocs/2 ) p->tr = 1;
   else p->rcv = 1;
   p->dest = p->source = p->nprocs - 1 - p->myproc;
   if( p->myproc == 0 ) p->master = 1;

   gm_free_send_tokens(gm_p, GM_LOW_PRIORITY,  gm_num_send_tokens(gm_p)/2);
   gm_free_send_tokens(gm_p, GM_HIGH_PRIORITY, gm_num_send_tokens(gm_p)/2);

   /* Set up pinned workspace for Sync, Broadcast and Gather */

   r = (int *) gm_dma_malloc( gm_p, 3*sizeof(int) );
   s = (int *) gm_dma_malloc( gm_p, 3*sizeof(int) );
   for(i=0; i<3; i++) s[i] = p->myproc + 1000;
   sizeof_workspace = p->nprocs * ( sizeof( double ) + sizeof(int) );
   workspace = (char *) gm_dma_malloc( gm_p, sizeof_workspace );

   Sync(p);   /* Just does a sync of all procs to test */
   p->prot.num_stokens = stokens = gm_num_send_tokens(gm_p);
}

NetPIPE-3.7.2:

void Setup(ArgStruct *p)
{
   if(gm_open(&gm_p,0,5,"port2",(enum gm_api_version) GM_API_VERSION) != GM_SUCCESS)
   {
      printf(" Couldn't open board 0 port 2\n");
      exit(-1);
   }
   else
      printf("Opened board 0 port2\n");

   if( p->tr )
      p->prot.host_id = gm_host_name_to_node_id(gm_p, p->host);

   gm_free_send_tokens(gm_p, GM_LOW_PRIORITY, gm_num_send_tokens(gm_p));

   ltime = gm_dma_malloc(gm_p, sizeof(unsigned long));
   lrpt  = gm_dma_malloc(gm_p, sizeof(unsigned long));
   sync  = gm_dma_malloc(gm_p, 64);
   sync1 = gm_dma_malloc(gm_p, 64);
   sprintf(sync, "Syncme");

   establish(p);
   p->prot.num_stokens = gm_num_send_tokens(gm_p);
}
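The 4.x Setup reads one hostname per whitespace-separated token from
p->hostfile and strips any domain suffix before the gm_host_name_to_node_id()
lookup. A hypothetical four-host file (names invented for illustration) could
look like:

   myri0.cluster.example
   myri1
   myri2.cluster.example
   myri3

With nprocs = 4 this pairs procs 0 and 1 (the transmitters) with procs 3 and 2
respectively, since dest = source = nprocs - 1 - myproc.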
NetPIPE_4.x:

void my_low_send_callback(struct gm_port *port, void *context,
                          gm_status_t status)
{
   if (status != GM_SUCCESS && status != GM_SEND_DROPPED) {
      gm_perror("low send completed with error", status);
   } else {
      gm_free_send_token( port, GM_LOW_PRIORITY);
      stokens++;
   }
}

void my_high_send_callback(struct gm_port *port, void *context,
                           gm_status_t status)
{
   if (status != GM_SUCCESS && status != GM_SEND_DROPPED) {
      gm_perror("high send completed with error", status);
   } else {
      gm_free_send_token( port, GM_HIGH_PRIORITY);
      stokens++;
   }
}

/* Write all the data to the remote node. buf must be DMA-able. */

int writeFully(ArgStruct *p, int node, void *buf, int nbytes,
               unsigned int priority)
{
   stokens--;
   /*fprintf(stderr, "%d has %d send tokens\n", p->myproc, stokens);*/

   if( priority == GM_LOW_PRIORITY ) {
      gm_send_to_peer_with_callback(gm_p, buf,
         gm_min_size_for_length( nbytes ), nbytes, GM_LOW_PRIORITY,
         p->prot.host_id[node], my_low_send_callback, NULL);
   } else {
      gm_send_to_peer_with_callback(gm_p, buf,
         gm_min_size_for_length( nbytes ), nbytes, GM_HIGH_PRIORITY,
         p->prot.host_id[node], my_high_send_callback, NULL);
   }

   return nbytes;
}

NetPIPE-3.7.2:

void my_send_callback (struct gm_port *port, void *context, gm_status_t status)
{
   if (status != GM_SUCCESS)
   {
      if (status != GM_SEND_DROPPED)
      {
         gm_perror ("send completed with error", status);
      }
   }
}
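In 4.x the send-token accounting is explicit: writeFully() decrements stokens
before posting, and the matching callback returns the token with
gm_free_send_token() and increments stokens once GM reports completion. The
callbacks only run while the event queue is being drained, so a caller that
bursts many sends could wait for tokens first. A minimal sketch (this guard is
not in the original source):

   /* Hypothetical back-pressure guard before a burst of sends;
    * gm_unknown() dispatches completed-send events to the callbacks. */
   while( stokens <= 0 ) {
      gm_recv_event_t *e = gm_receive(gm_p);
      gm_unknown(gm_p, e);
   }
   writeFully(p, p->dest, p->s_ptr, p->bufflen, GM_LOW_PRIORITY);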
NetPIPE_4.x:

/* Read all the data from the remote node. buf must be DMA-able.
 * NOTE: readFully() will accept any message matching the size
 * and priority regardless of the source.
 */

int readFully(ArgStruct *p, int node, void *buf, int nbytes,
              unsigned int priority)
{
   int bytesRead = 0, bytesLeft = nbytes;
   char *ptr = buf;
   gm_recv_event_t *e;

   gm_provide_receive_buffer(gm_p, buf,
        gm_min_size_for_length(nbytes), priority);

   while( bytesLeft > 0 ){

      e = gm_receive(gm_p);

      switch( gm_ntoh_u8(e->recv.type) ){

         case GM_FAST_RECV_EVENT:
         case GM_FAST_HIGH_RECV_EVENT:
         case GM_FAST_PEER_RECV_EVENT:
         case GM_FAST_HIGH_PEER_RECV_EVENT:

            /* DDT - This mode is used for messages below 128 bytes */

            bytesRead = (int) gm_ntoh_u32 (e->recv.length);
            bcopy( gm_ntohp(e->recv.message), ptr, bytesRead);
            bytesLeft -= bytesRead;
            ptr += bytesRead;
            break;

         case GM_RECV_EVENT:
         case GM_PEER_RECV_EVENT:
         case GM_HIGH_RECV_EVENT:
         case GM_HIGH_PEER_RECV_EVENT:

            /* DDT - This mode is used for large messages above 127 bytes */

            bytesRead = (int) gm_ntoh_u32 (e->recv.length);
            bytesLeft -= bytesRead;
            break;

         case GM_NO_RECV_EVENT:
            break;

         default:
            gm_unknown(gm_p,e);
      }
   }
   return nbytes;
}

NetPIPE-3.7.2:

void establish(ArgStruct *p)
{
   gm_recv_event_t *e;
   int bytesRead, recv_sz;
   char temp[60];
   int todo = 1;

   if((p->r_buff = gm_dma_calloc(gm_p, 1, 64)) == 0) {
      printf("Couldn't allocate memory \n");
      exit(0);
   }
   if((p->s_buff = gm_dma_calloc(gm_p, 1, 64)) == 0) {
      printf("Couldn't allocate memory \n");
      exit(0);
   }

   if(p->tr){
      sprintf(p->s_buff, "this is the sender!!");
      gm_send_to_peer_with_callback(gm_p, p->s_buff, 7,
          (unsigned long) strlen(p->s_buff), GM_LOW_PRIORITY,
          p->prot.host_id, my_send_callback, NULL);
   }
   else
   {
      gm_provide_receive_buffer(gm_p, p->r_buff, 7, GM_LOW_PRIORITY);
      while (todo) {
         e = gm_receive(gm_p);
         switch(gm_ntoh_u8(e->recv.type)) {
            case GM_RECV_EVENT:
            case GM_PEER_RECV_EVENT:
            case GM_FAST_PEER_RECV_EVENT:
               /*
               printf("[recv] Received: \"%s\"\n",
                      (char *) gm_ntohp (e->recv.message));
               */
               recv_sz = (int) gm_ntoh_u32 (e->recv.length);
               p->prot.host_id = gm_ntoh_u16(e->recv.sender_node_id);
               todo--;
               break;
            case GM_NO_RECV_EVENT:
               break;
            default:
               gm_unknown(gm_p,e);
               break;
         }
      }
   }
}
NetPIPE_4.x:

int readOnce(ArgStruct *p)
{
   int bytesRead = 0;
   gm_recv_event_t *e;

   e = gm_receive(gm_p);

   switch( gm_ntoh_u8(e->recv.type) ){

      case GM_FAST_RECV_EVENT:
      case GM_FAST_HIGH_RECV_EVENT:
      case GM_FAST_PEER_RECV_EVENT:
      case GM_FAST_HIGH_PEER_RECV_EVENT:

         /* DDT - This mode is used for messages below 128 bytes */

         bytesRead = (int) gm_ntoh_u32 (e->recv.length);
         bcopy( gm_ntohp(e->recv.message), p->r_ptr, bytesRead);
         p->bytesLeft -= bytesRead;
         p->r_ptr += bytesRead;
         break;

      case GM_RECV_EVENT:
      case GM_PEER_RECV_EVENT:
      case GM_HIGH_RECV_EVENT:
      case GM_HIGH_PEER_RECV_EVENT:

         /* DDT - This mode is used for large messages above 127 bytes */

         bytesRead = (int) gm_ntoh_u32 (e->recv.length);
         p->bytesLeft -= bytesRead;
         p->r_ptr += bytesRead;
         break;

      case GM_NO_RECV_EVENT:
         break;

      default:
         gm_unknown(gm_p,e);
   }
   return bytesRead;
}

NetPIPE-3.7.2:

int readFully(void *buff, int len)
{
   int bytesRead, bytesLeft;
   gm_recv_event_t *e;

   bytesLeft = len;
   bytesRead = 0;

   while (bytesLeft > 0) {
      e = gm_receive(gm_p);
      switch(gm_ntoh_u8(e->recv.type)) {
         case GM_FAST_PEER_RECV_EVENT:
            bytesRead = (int) gm_ntoh_u32 (e->recv.length);
            bytesLeft -= bytesRead;
            /* gm_memorize_message(buff, gm_ntohp (e->recv.message), gm_ntohl (e->recv.length)); */
            bcopy(gm_ntohp(e->recv.message), buff, bytesRead);
            /* strncpy(buff, (char *) gm_ntohp (e->recv.message), bytesRead); */
            break;
         case GM_RECV_EVENT:
         case GM_PEER_RECV_EVENT:
            bytesRead = (int) gm_ntoh_u32 (e->recv.length);
            bytesLeft -= bytesRead;
            break;
         case GM_NO_RECV_EVENT:
            break;
         default:
            gm_unknown(gm_p,e);
      }
   }
   return 1;
}
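Both readers branch on the GM event type, and the distinction matters: fast
receive events (used for messages under 128 bytes, per the DDT comments) carry
the payload inside the event itself, so it must be copied out with bcopy(),
while regular receive events mean GM has already DMAed the data into the
buffer handed to gm_provide_receive_buffer() and only the byte counters need
updating. Distilled from the loops above (dest is a stand-in for the receive
pointer):

   e = gm_receive(gm_p);
   switch( gm_ntoh_u8(e->recv.type) ) {
      case GM_FAST_PEER_RECV_EVENT:   /* payload is inline: copy it out */
         bytesRead = (int) gm_ntoh_u32(e->recv.length);
         bcopy( gm_ntohp(e->recv.message), dest, bytesRead );
         break;
      case GM_PEER_RECV_EVENT:        /* data already DMAed into dest   */
         bytesRead = (int) gm_ntoh_u32(e->recv.length);
         break;
      case GM_NO_RECV_EVENT:          /* poll returned nothing          */
         break;
      default:
         gm_unknown(gm_p, e);         /* incl. send-completion events   */
   }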
NetPIPE_4.x:

/* Global sync with the master node, then local sync with your comm pair.
 * The global sync is done at GM_HIGH_PRIORITY to prevent confusion
 * with ping-pong traffic, then the local sync and following ping-pong
 * traffic is done at GM_LOW_PRIORITY.
 */

void Sync(ArgStruct *p)
{
   int i;
   static int* sa;   /* sync array */

   if( sa == NULL ) sa = (int *) malloc( p->nprocs * sizeof(int) );

   for( i=0; i<p->nprocs; i++ ) sa[i] = 0;

   /* First do a global synchronization with the master proc 0 */

   if( p->master ) {

      /* Read from procs in the order that they arrive */

      for( i=1; i<p->nprocs; i++ ) {
         readFully( p, i, r, sizeof(int), GM_HIGH_PRIORITY );
         *r -= 1000;
         if( *r < 1 || *r > p->nprocs || sa[*r] != 0 ) {
            fprintf(stderr,"%d NetPIPE: error reading global sync value"
                    " from proc %d (%d)\n", p->myproc, i, *r);
            exit(0);
         }
         sa[*r]++;   /* mark that node as having responded */
      }

      /* Write back to each proc */

      for( i=1; i<p->nprocs; i++ ) {
         writeFully(p, i, s, sizeof(int), GM_HIGH_PRIORITY);
      }

   } else {

      writeFully(p, 0, s, sizeof(int), GM_HIGH_PRIORITY);
      readFully( p, 0, r, sizeof(int), GM_HIGH_PRIORITY);
      if( *r-1000 != 0 ) {
         fprintf(stderr,"%d NetPIPE: error reading global sync value"
                 " from proc 0 (%d)\n", p->myproc, *r);
         exit(0);
      }
   }

   /* Now do a local sync with your comm pair. The size of 12 bytes
    * is chosen to seperate these messages from the global syncs. */

   if( p->myproc != 0 && p->myproc != p->nprocs-1 ) {
      writeFully(p, p->dest, s, 3*sizeof(int), GM_HIGH_PRIORITY);
      readFully( p, p->dest, r, 3*sizeof(int), GM_HIGH_PRIORITY);
      if( *r-1000 != p->dest ) {
         fprintf(stderr,"%d NetPIPE: error reading pair sync value"
                 " from proc %d (%d)\n", p->myproc, p->dest, *r);
         exit(0);
      }
   }
}

NetPIPE-3.7.2:

void Sync(ArgStruct *p)
{
   int len;

   len = strlen(sync);
   gm_send_to_peer_with_callback(gm_p, sync, gm_min_size_for_length(len),
       (unsigned long) (len+1), GM_LOW_PRIORITY,
       p->prot.host_id, my_send_callback, NULL);

   gm_provide_receive_buffer(gm_p, sync1, gm_min_size_for_length(len),
       GM_LOW_PRIORITY);
   readFully(sync1, len+1);
}
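As a concrete illustration of the 4.x protocol, a hypothetical run with
nprocs = 4 proceeds as follows:

   /* Hypothetical trace, nprocs = 4 (pairs are 0<->3 and 1<->2):
    *   procs 1-3:  writeFully(p, 0, s, sizeof(int), GM_HIGH_PRIORITY)
    *               with s[0] = myproc + 1000, i.e. 1001..1003
    *   proc  0:    reads the three values in arrival order, subtracts
    *               1000, ticks sa[1..3], then writes its own s back
    *   procs 1,2:  exchange the 12-byte (3*sizeof(int)) pair sync; the
    *               different size keeps it distinct from the 4-byte
    *               global syncs
    */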
NetPIPE_4.x:

/* This is used only for CPU workload measurements. PrepareToReceive sets
 * up the receive buffer, then readOnce receives data in chunks.
 */

void PrepareToReceive(ArgStruct *p)
{
   gm_provide_receive_buffer(gm_p, p->r_ptr,
        gm_min_size_for_length(p->bufflen), GM_LOW_PRIORITY);
}

NetPIPE-3.7.2:

void PrepareToReceive(ArgStruct *p)
{
   /*
      The GM interface doesn't have a method to pre-post
      a buffer for reception of data.
   */
}
NetPIPE_4.x:

void SendData(ArgStruct *p)
{
   writeFully( p, p->dest, p->s_ptr, p->bufflen, GM_LOW_PRIORITY);
}

NetPIPE-3.7.2:

void SendData(ArgStruct *p)
{
   gm_send_to_peer_with_callback(gm_p, p->s_ptr,
                                 gm_min_size_for_length(p->bufflen),
                                 p->bufflen, GM_LOW_PRIORITY,
                                 p->prot.host_id, my_send_callback, NULL);
}
NetPIPE_4.x:

/* This function reads the GM buffer once then tests for message
 * completion. It may need to be adapted to read multiple times until
 * 0 bytes is read.
 */

int TestForCompletion( ArgStruct *p )
{
   readOnce( p );

   if( p->bytesLeft == 0 ) return 1;   /* The message is complete */
   else return 0;                      /* The message is incomplete */
}

NetPIPE-3.7.2:

void RecvData(ArgStruct *p)
{
   gm_provide_receive_buffer(gm_p, p->r_ptr,
                             gm_min_size_for_length(p->bufflen),
                             GM_LOW_PRIORITY);
   readFully(p->r_ptr, p->bufflen);
}
NetPIPE_4.x:

void RecvData(ArgStruct *p)
{
   /* Note: readFully will accept any message matching the size,
    * regardless of the source */

   if( p->bytesLeft > 0 ) /* protect against re-read in CPU workload measurements */
      readFully( p, p->source, p->r_ptr, p->bufflen, GM_LOW_PRIORITY);
   else
      p->r_ptr = p->r_ptr_saved;
}

NetPIPE-3.7.2:

void SendTime(ArgStruct *p, double *t)
{
   /*
      Multiply the number of seconds by 1e6 to get time in microseconds
      and convert value to an unsigned 32-bit integer.
   */
   *ltime = (unsigned long)(*t * 1.e6);

   gm_send_to_peer_with_callback(gm_p, ltime,
       gm_min_size_for_length(sizeof(unsigned long)), sizeof(unsigned long),
       GM_LOW_PRIORITY, p->prot.host_id, my_send_callback, NULL);
}
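The 4.x trio PrepareToReceive() / TestForCompletion() (which polls via
readOnce()) / RecvData() exists for NetPIPE's CPU-workload measurements. The
actual driver lives in netpipe.c, which is not part of this diff; a minimal
sketch of how these calls compose (do_useful_work() is invented for
illustration):

   PrepareToReceive(p);             /* post the receive buffer           */
   SendData(p);                     /* start the ping                    */
   while( !TestForCompletion(p) )   /* reads the GM queue once per call  */
      do_useful_work();             /* hypothetical background workload  */
   RecvData(p);                     /* bytesLeft == 0: just resets r_ptr */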
NetPIPE_4.x:

/* Pass myproc with the data to guarantee node order */

void Gather( ArgStruct *p, double *buf)
{
   int i, proc, nbytes = sizeof(double);
   char *pbuf = (char *) buf;
   char *pwork = (char *) workspace;

   Sync( p );   /* Needed to seperate from ping-pong traffic */

   if( p->nprocs * (nbytes+sizeof(int)) > sizeof_workspace ) {
      fprintf(stderr,"%d pinned workspace is too small in Gather\n",p->myproc);
      exit(-1);
   }

   if( p->master ) {   /* Gather the data from the other procs */

      bcopy( &p->myproc, pwork, sizeof(int) );
      bcopy( pbuf, pwork+sizeof(int), nbytes );   /* copy proc 0's data */

      for( proc=1; proc<p->nprocs; proc++ ) {
         pwork += (sizeof(int) + nbytes);
         readFully( p, proc, pwork, sizeof(int) + nbytes, GM_LOW_PRIORITY);
      }

      pwork = workspace;
      for( i=0; i<p->nprocs; i++ ) {
         bcopy( pwork, &proc, sizeof(int));
         bcopy( pwork+sizeof(int), buf + proc * nbytes, nbytes );
         pwork += sizeof(int) + nbytes;
      }

   } else {   /* Write my section to the master proc 0 */

      pwork += p->myproc * (sizeof(int) + nbytes);
      pbuf  += p->myproc * (sizeof(int) + nbytes);
      bcopy( &p->myproc, pwork, sizeof(int) );
      bcopy( pbuf, pwork+sizeof(int), nbytes );   /* copy to pinned memory */
      writeFully( p, 0, pwork, sizeof(int)+nbytes, GM_LOW_PRIORITY);
   }
}

NetPIPE-3.7.2:

void RecvTime(ArgStruct *p, double *t)
{
   gm_provide_receive_buffer(gm_p, ltime,
       gm_min_size_for_length(sizeof(unsigned long)), GM_LOW_PRIORITY);
   readFully(ltime, sizeof(unsigned long));
   /* ltime = ntohl(p->buff1); */

   /* Result is ltime (in microseconds) divided by 1.0e6 to get seconds */
   *t = (double)(*ltime) / 1.0e6;
}
NetPIPE_4.x:

void Broadcast(ArgStruct *p, unsigned int *buf)
{
   int i, nbytes = sizeof(int);

   Sync( p );   /* Needed to seperate from ping-pong traffic */

   if( nbytes > sizeof_workspace ) {
      fprintf(stderr,"%d pinned workspace is too small in Broadcast\n",p->myproc);
      exit(-1);
   }

   if( p->master ) {
      bcopy( buf, workspace, nbytes );   /* copy to pinned memory */
      for( i=1; i<p->nprocs; i++ ) {
         writeFully( p, i, workspace, nbytes, GM_LOW_PRIORITY);
      }
   } else {
      readFully( p, 0, workspace, nbytes, GM_LOW_PRIORITY);
      bcopy( workspace, buf, nbytes );   /* copy from pinned memory */
   }
}

NetPIPE-3.7.2:

void SendRepeat(ArgStruct *p, int rpt)
{
   *lrpt = (int)rpt;

   /* Send repeat count as a long in network order */
   gm_send_to_peer_with_callback(gm_p, lrpt,
       gm_min_size_for_length(sizeof(unsigned long)), sizeof(unsigned long),
       GM_LOW_PRIORITY, p->prot.host_id, my_send_callback, NULL);
}

void RecvRepeat(ArgStruct *p, int *rpt)
{
   gm_provide_receive_buffer(gm_p, lrpt,
       gm_min_size_for_length(sizeof(unsigned long)), GM_LOW_PRIORITY);
   readFully(lrpt, sizeof(unsigned long));
   *rpt = (int)*lrpt;
}
Both versions:

void CleanUp(ArgStruct *p)
{
   sleep(2);
   gm_close(gm_p);
   gm_exit(GM_SUCCESS);
   gm_finalize();
}
NetPIPE_4.x:

void Reset(ArgStruct *p) { }

void MyMalloc(ArgStruct *p, int bufflen)
{
   if((p->r_buff = (char *)gm_dma_malloc(gm_p, bufflen+MAX(p->soffset,p->roffset)))==(char *)NULL)
   {
      fprintf(stderr,"couldn't allocate memory\n");
      exit(-1);
   }

   if(!p->cache)
      if((p->s_buff = (char *)gm_dma_malloc(gm_p, bufflen+p->soffset))==(char *)NULL)
      {
         fprintf(stderr,"Couldn't allocate memory\n");
         exit(-1);
      }
}

NetPIPE-3.7.2:

void Reset(ArgStruct *p)
{
}

void AfterAlignmentInit(ArgStruct *p)
{
}

void MyMalloc(ArgStruct *p, int bufflen, int soffset, int roffset)
{
   if((p->r_buff = (char *)gm_dma_malloc(gm_p, bufflen+MAX(soffset,roffset)))==(char *)NULL)
   {
      fprintf(stderr,"couldn't allocate memory\n");
      exit(-1);
   }

   if(!p->cache)
      if((p->s_buff = (char *)gm_dma_malloc(gm_p, bufflen+soffset))==(char *)NULL)
      {
         fprintf(stderr,"Couldn't allocate memory\n");
         exit(-1);
      }
}
Both versions (the page truncates inside this function; the symmetric buff2
branch and closing brace follow from the signature and the buff1 guard):

void FreeBuff(char *buff1, char *buff2)
{
   if(buff1 != NULL)
      gm_dma_free(gm_p, buff1);

   if(buff2 != NULL)
      gm_dma_free(gm_p, buff2);
}
End of changes: 53 change blocks; 385 lines changed or deleted, 154 lines changed or added.
