irods  4.2.8
About: iRODS (the integrated Rule Oriented Data System) is a distributed data-management system for creating data grids, digital libraries, persistent archives, and real-time data systems.
  Fossies Dox: irods-4.2.8.tar.gz  ("unofficial" and yet experimental doxygen-generated source code documentation)  

xmsgLib.cpp
Go to the documentation of this file.
1 
4 /* xmsgLib.c - library routines for irodsXmsg
5  */
6 
7 // =-=-=-=-=-=-=-
8 // boost includes
9 #include <boost/thread/thread.hpp>
10 #include <boost/thread/mutex.hpp>
11 #include <boost/thread/condition.hpp>
12 
13 // =-=-=-=-=-=-=-
14 // irods includes
15 #include "rsApiHandler.hpp"
16 //#include "reGlobalsExtern.hpp"
17 #include "miscServerFunct.hpp"
18 #include "xmsgLib.hpp"
19 //#include "reFuncDefs.hpp"
21 #include "initServer.hpp"
22 
24 
25 static boost::mutex ReqQueCondMutex;
26 static boost::mutex MessQueCondMutex;
27 static boost::thread* ProcReqThread[ NUM_XMSG_THR ];
28 static boost::condition_variable ReqQueCond;
29 
32 static xmsgReq_t* XmsgReqTail = NULL; /* points to last item in queue */
34 
35 // =-=-=-=-=-=-=-
36 // globally referenced variable
38 
39 
40 
41 int
43  return 0;
44 }
45 
46 
47 int
48 addXmsgToQues( irodsXmsg_t *irodsXmsg, ticketMsgStruct_t *ticketMsgStruct ) {
49 
50 
51  MessQueCondMutex.lock();
52 
53  addXmsgToXmsgQue( irodsXmsg, &XmsgQue );
54  int status = addXmsgToTicketMsgStruct( irodsXmsg, ticketMsgStruct );
55 
56  MessQueCondMutex.unlock();
57 
58  return status;
59 
60 }
61 
62 int
64 
65  if ( xmsg == NULL || xmsgQue == NULL ) {
67  "addXmsgToQue: input xmsg or xmsgQue is NULL" );
69  }
70 
71  xmsg->next = xmsg->prev = NULL;
72 
73  if ( xmsgQue->head == NULL ) {
74  xmsgQue->head = xmsgQue->tail = xmsg;
75  }
76  else {
77  /* que it on top */
78  xmsgQue->head->prev = xmsg;
79  xmsg->next = xmsgQue->head;
80  xmsgQue->head = xmsg;
81  }
82 
83  return 0;
84 }
85 
86 int
88  if ( xmsg == NULL || xmsgQue == NULL ) {
90  "addXmsgToQue: input xmsg or xmsgQue is NULL" );
92  }
93 
94  if ( xmsg->prev == NULL ) {
95  /* at head */
96  xmsgQue->head = xmsg->next;
97  }
98  else {
99  xmsg->prev->next = xmsg->next;
100  }
101 
102  if ( xmsg->next == NULL ) {
103  /* at tail */
104  xmsgQue->tail = xmsg->prev;
105  }
106  else {
107  xmsg->next->prev = xmsg->prev;
108  }
109 
110  xmsg->prev = xmsg->next = NULL;
111 
112  return 0;
113 }
114 
115 int
117  if ( xmsg == NULL || xmsgQue == NULL ) {
119  "addXmsgToQue: input xmsg or xmsgQue is NULL" );
121  }
122 
123  if ( xmsg->tprev == NULL ) {
124  /* at head */
125  xmsgQue->head = xmsg->tnext;
126  }
127  else {
128  xmsg->tprev->tnext = xmsg->tnext;
129  }
130 
131  if ( xmsg->tnext == NULL ) {
132  /* at tail */
133  xmsgQue->tail = xmsg->tprev;
134  }
135  else {
136  xmsg->tnext->tprev = xmsg->tprev;
137  }
138 
139  xmsg->tprev = xmsg->tnext = NULL;
140 
141  return 0;
142 }
143 
144 int
146  ticketMsgStruct_t *ticketMsgStruct ) {
147  if ( xmsg == NULL || ticketMsgStruct == NULL ) {
149  "addXmsgToTicketMsgStruct: input xmsg or ticketMsgStruct is NULL" );
151  }
152 
153  /* up the expire time */
154  if ( xmsg->sendTime + INC_EXPIRE_INT > ticketMsgStruct->ticket.expireTime ) {
155  ticketMsgStruct->ticket.expireTime = xmsg->sendTime + INC_EXPIRE_INT;
156  }
157 
158  if ( ticketMsgStruct->xmsgQue.head == NULL ) {
159  ticketMsgStruct->xmsgQue.head = ticketMsgStruct->xmsgQue.tail = xmsg;
160  xmsg->tnext = xmsg->tprev = NULL;
161  }
162  else {
163  /* link it to the end */
164  ticketMsgStruct->xmsgQue.tail->tnext = xmsg;
165  xmsg->tprev = ticketMsgStruct->xmsgQue.tail;
166  ticketMsgStruct->xmsgQue.tail = xmsg;
167  xmsg->tnext = NULL;
168  }
169  xmsg->ticketMsgStruct = ticketMsgStruct;
170  xmsg->seqNumber = ticketMsgStruct->nxtSeqNumber;
171  ticketMsgStruct->nxtSeqNumber = ticketMsgStruct->nxtSeqNumber + 1;
172 
173  return xmsg->seqNumber;
174 }
175 
176 int checkMsgCondition( irodsXmsg_t *irodsXmsg, char *msgCond ) {
177  char condStr[MAX_NAME_LEN * 2];//, res[MAX_NAME_LEN * 2];
178 
179  if ( msgCond == NULL || strlen( msgCond ) == 0 ) {
180  return 0;
181  }
182 
183  strcpy( condStr, msgCond );
184 
185  XMsgMsParamArray.msParam[0]->inOutStruct = ( char * ) irodsXmsg->sendXmsgInfo->msgType; /* *XHDR*/
186  XMsgMsParamArray.msParam[1]->inOutStruct = ( char * ) irodsXmsg->sendUserName; /* *XUSER*/
187  XMsgMsParamArray.msParam[2]->inOutStruct = ( char * ) irodsXmsg->sendAddr; /* *XADDR*/
188  XMsgMsParamArray.msParam[3]->inOutStruct = ( char * ) irodsXmsg->sendXmsgInfo->miscInfo; /* *XMISC*/
189  * ( int * ) XMsgMsParamArray.msParam[4]->inOutStruct = ( int ) irodsXmsg->sendXmsgInfo->msgNumber; /* *XMSGNUM*/
190  * ( int * ) XMsgMsParamArray.msParam[5]->inOutStruct = ( int ) irodsXmsg->seqNumber; /* *XSEQNUM*/
191  * ( int * ) XMsgMsParamArray.msParam[6]->inOutStruct = ( int ) irodsXmsg->sendTime; /* *XTIME*/
192 
193  if ( strcmp( condStr, "" ) == 0 ) {
194  return 0;
195  }
196  //int ret = 1;
197  //int grdf[2];
198  // disableReDebugger( grdf );
199  // ret = !( computeExpression( condStr, &XMsgMsParamArray, NULL, 0, res ) == 0 );
200  // enableReDebugger( grdf );
201  return 0;//ret;
202 
203 }
204 
205 
206 
207 
208 
209 int getIrodsXmsg( rcvXmsgInp_t *rcvXmsgInp, irodsXmsg_t **outIrodsXmsg ) {
210  int rcvTicket = rcvXmsgInp->rcvTicket;
211  char *msgCond = rcvXmsgInp->msgCondition;
212 
213  if ( outIrodsXmsg == NULL ) {
215  "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL" );
217  }
218 
219  /* locate the ticketMsgStruct_t */
220 
221  ticketMsgStruct_t *ticketMsgStruct;
222  int status = getTicketMsgStructByTicket( rcvTicket, &ticketMsgStruct );
223 
224  if ( status < 0 ) {
225  return status;
226  }
227 
228  /* now locate the irodsXmsg_t */
229 
230  MessQueCondMutex.lock();
231  irodsXmsg_t *tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
232 
233  while ( tmpIrodsXmsg != NULL && checkMsgCondition( tmpIrodsXmsg, msgCond ) != 0 ) {
234  tmpIrodsXmsg = tmpIrodsXmsg->tnext;
235  }
236 
237  *outIrodsXmsg = tmpIrodsXmsg;
238  MessQueCondMutex.unlock();
239  if ( tmpIrodsXmsg == NULL ) {
241  }
242  return 0;
243 }
244 
245 int
246 getIrodsXmsgByMsgNum( int rcvTicket, int msgNumber,
247  irodsXmsg_t **outIrodsXmsg ) {
248  int status;
249  irodsXmsg_t *tmpIrodsXmsg;
250  ticketMsgStruct_t *ticketMsgStruct;
251 
252  if ( outIrodsXmsg == NULL ) {
254  "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL" );
256  }
257 
258  /* locate the ticketMsgStruct_t */
259 
260  status = getTicketMsgStructByTicket( rcvTicket, &ticketMsgStruct );
261 
262  if ( status < 0 ) {
263  return status;
264  }
265 
266  /* now locate the irodsXmsg_t */
267 
268  tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
269 
270  if ( msgNumber != ANY_MSG_NUMBER ) {
271  while ( tmpIrodsXmsg != NULL ) {
272  if ( ( int ) tmpIrodsXmsg->sendXmsgInfo->msgNumber == msgNumber ) {
273  break;
274  }
275  tmpIrodsXmsg = tmpIrodsXmsg->tnext;
276  }
277  }
278  *outIrodsXmsg = tmpIrodsXmsg;
279  if ( tmpIrodsXmsg == NULL ) {
281  }
282  else {
283  return 0;
284  }
285 }
286 
287 int
289  int status;
290 
291  ticketMsgStruct_t *tmpTicketMsgStruct;
292 
293  if ( ticket == NULL || ticketHQue == NULL ) {
295  "addTicketToHQue: input ticket or ticketHQue is NULL" );
297  }
298 
299  tmpTicketMsgStruct = ( ticketMsgStruct_t* )calloc( 1, sizeof( ticketMsgStruct_t ) );
300 
301  /* copy the content of the ticket */
302 
303  tmpTicketMsgStruct->ticket = *ticket;
304  status = addTicketMsgStructToHQue( tmpTicketMsgStruct, ticketHQue );
305 
306  if ( status < 0 ) {
307  free( tmpTicketMsgStruct );
308  }
309 
310  return status;
311 }
312 
313 int
315  ticketHashQue_t *ticketHQue ) {
316  ticketMsgStruct_t *tmpTicketMsgStruct;
317 
318  if ( ticketMsgStruct == NULL || ticketHQue == NULL ) {
320  "addTicketMsgStructToHQue: ticketMsgStruct or ticketHQue is NULL" );
322  }
323 
324  ticketMsgStruct->hnext = ticketMsgStruct->hprev = NULL;
325  ticketMsgStruct->nxtSeqNumber = 0;
326  ticketMsgStruct->ticketHashQue = ticketHQue;
327 
328  if ( ticketHQue->head == NULL ) {
329  ticketHQue->head = ticketHQue->tail = ticketMsgStruct;
330  return 0;
331  }
332 
333 
334  /* que in descending order of rcvTicket */
335  tmpTicketMsgStruct = ticketHQue->head;
336  while ( tmpTicketMsgStruct != NULL ) {
337  if ( ticketMsgStruct->ticket.rcvTicket ==
338  tmpTicketMsgStruct->ticket.rcvTicket ) {
340  }
341  else if ( ticketMsgStruct->ticket.rcvTicket >
342  tmpTicketMsgStruct->ticket.rcvTicket ) {
343  break;
344  }
345  else {
346  tmpTicketMsgStruct = tmpTicketMsgStruct->hnext;
347  }
348  }
349  if ( tmpTicketMsgStruct == NULL ) {
350  /* reached the end */
351  ticketHQue->tail->hnext = ticketMsgStruct;
352  ticketMsgStruct->hprev = ticketHQue->tail;
353  ticketHQue->tail = ticketMsgStruct;
354  }
355  else if ( tmpTicketMsgStruct == ticketHQue->head ) {
356  /* need to put ticketMsgStruct at the head */
357  ticketHQue->head->hprev = ticketMsgStruct;
358  ticketMsgStruct->hnext = ticketHQue->head;
359  ticketHQue->head = ticketMsgStruct;
360  }
361  else {
362  /* in the middle */
363  ticketMsgStruct->hprev = tmpTicketMsgStruct->hprev;
364  ticketMsgStruct->hnext = tmpTicketMsgStruct;
365  tmpTicketMsgStruct->hprev->hnext = ticketMsgStruct;
366  tmpTicketMsgStruct->hprev = tmpTicketMsgStruct;
367  }
368 
369  return 0;
370 }
371 
372 int
374  ticketHashQue_t *ticketHQue ) {
375  if ( ticketMsgStruct == NULL || ticketHQue == NULL ) {
377  "rmTicketMsgStructFromHQue: ticketMsgStruct or ticketHQue is NULL" );
379  }
380 
381  if ( ticketMsgStruct->hprev == NULL ) {
382  /* at head */
383  ticketHQue->head = ticketMsgStruct->hnext;
384  }
385  else {
386  ticketMsgStruct->hprev->hnext = ticketMsgStruct->hnext;
387  }
388 
389  if ( ticketMsgStruct->hnext == NULL ) {
390  /* at tail */
391  ticketHQue->tail = ticketMsgStruct->hprev;
392  }
393  else {
394  ticketMsgStruct->hnext->hprev = ticketMsgStruct->hprev;
395  }
396 
397  ticketMsgStruct->hprev = ticketMsgStruct->hnext = NULL;
398 
399  return 0;
400 }
401 
402 /* add incoming request to the bottom of the link list */
403 
404 int
405 addReqToQue( int sock ) {
406  xmsgReq_t *myXmsgReq;
407 
408  myXmsgReq = ( xmsgReq_t* )calloc( 1, sizeof( xmsgReq_t ) );
409 
410  myXmsgReq->sock = sock;
411 
412  ReqQueCondMutex.lock();
413 
414  if ( XmsgReqHead == NULL ) {
415  XmsgReqHead = myXmsgReq;
416  XmsgReqTail = myXmsgReq; /* points to last item in queue */
417  }
418  else {
419  XmsgReqTail->next = myXmsgReq;
420  XmsgReqTail = myXmsgReq;
421  }
422 
423  ReqQueCondMutex.unlock();
424 
425  ReqQueCond.notify_all();
426 
427  return 0;
428 }
429 
430 xmsgReq_t *
432  xmsgReq_t *myXmsgReq = NULL;
433 
434  while ( myXmsgReq == NULL ) {
435 
436  ReqQueCondMutex.lock();
437 
438  if ( XmsgReqHead != NULL ) {
439  myXmsgReq = XmsgReqHead;
441  ReqQueCondMutex.unlock();
442 
443  break;
444  }
445 
446  ReqQueCondMutex.unlock();
447 
448  boost::unique_lock<boost::mutex> boost_lock( ReqQueCondMutex );
449  ReqQueCond.wait( boost_lock );
450  if ( XmsgReqHead == NULL ) {
451  boost_lock.unlock();
452 
453  continue;
454  }
455  else {
456  myXmsgReq = XmsgReqHead;
458  boost_lock.unlock();
459 
460  break;
461  }
462  }
463 
464  return myXmsgReq;
465 }
466 
467 int
469  for ( int i = 0; i < NUM_XMSG_THR; i++ ) {
470  try {
471  ProcReqThread[i] = new boost::thread( procReqRoutine );
472  }
473  catch ( const boost::thread_resource_error& ) {
475  }
476  }
477 
478  return 0;
479 }
480 
481 void
483  struct timeval msgTimeout;
484  memset( &msgTimeout, 0, sizeof( msgTimeout ) );
485  msgTimeout.tv_sec = REQ_MSG_TIMEOUT_TIME;
486 
487  while ( 1 ) {
488  xmsgReq_t * myXmsgReq = getReqFromQue();
489  if ( myXmsgReq == NULL ) {
490  /* someone else took care of it */
491  continue;
492  }
493 
494  rsComm_t rsComm;
495  memset( &rsComm, 0, sizeof( rsComm ) );
496  rsComm.sock = myXmsgReq->sock;
497 
498  // =-=-=-=-=-=-=-
499  // manufacture a network object
501  irods::error ret = irods::network_factory( &rsComm, net_obj );
502  if ( !ret.ok() ) {
503  irods::log( PASS( ret ) );
504  }
505 
507  ret = readStartupPack( net_obj, &startupPack, NULL );
508  if ( !ret.ok() ) {
510  "procReqRoutine: readStartupPack error, status = %d", ret.code() );
511  free( myXmsgReq );
512  continue;
513  }
515  free( startupPack );
516 
517  ret = sendVersion( net_obj, 0, 0, NULL, 0 );
518  if ( !ret.ok() ) {
519  sendVersion( net_obj, SYS_AGENT_INIT_ERR, 0, NULL, 0 );
520  free( myXmsgReq );
521  continue;
522  }
523  fd_set sockMask;
524  FD_ZERO( &sockMask );
525  while ( 1 ) {
526  int numSock;
527 
528  FD_SET( rsComm.sock, &sockMask );
529  while ( ( numSock = select( rsComm.sock + 1, &sockMask,
530  ( fd_set * ) NULL, ( fd_set * ) NULL, &msgTimeout ) ) <= 0 ) {
531  if ( errno == EINTR ) {
533  "procReqRoutine: select() interrupted" );
534  FD_SET( rsComm.sock, &sockMask );
535  continue;
536  }
537  else {
538  break; /* timedout or something */
539  }
540  }
541  if ( numSock < 0 ) {
542  break;
543  }
544  if ( readAndProcClientMsg( &rsComm, 0 ) < 0 ) {
545  break;
546  }
547  }
548  close( rsComm.sock );
549  free( myXmsgReq );
550  }
551 }
552 
553 /* The hash function which use rcvTicket as the key. It take the modulo of
554  * rcvTicket/NUM_HASH_SLOT
555  */
556 
557 int
558 ticketHashFunc( uint rcvTicket ) {
559  const int mySlot = rcvTicket % NUM_HASH_SLOT;
560  return mySlot;
561 }
562 
563 int
565 
566  xmsgTicketInfo_t *outXmsgTicketInfo;
567  time_t thisTime;
568  int hashSlotNum;
569 
570  memset( XmsgHashQue, 0, NUM_HASH_SLOT * sizeof( ticketHashQue_t ) );
571  memset( &XmsgQue, 0, sizeof( XmsgQue ) );
572 
573 
575  thisTime = time( NULL );
576 
577  outXmsgTicketInfo = ( xmsgTicketInfo_t* )calloc( 1, sizeof( xmsgTicketInfo_t ) );
578  outXmsgTicketInfo->expireTime = thisTime + ( MAX_EXPIRE_INT * 500 );
579  outXmsgTicketInfo->rcvTicket = 1;
580  outXmsgTicketInfo->sendTicket = 1;
581  outXmsgTicketInfo->flag = 1;
582  hashSlotNum = ticketHashFunc( outXmsgTicketInfo->rcvTicket );
583  addTicketToHQue( outXmsgTicketInfo, &XmsgHashQue[hashSlotNum] );
584  free( outXmsgTicketInfo ); // JMC cppcheck - leak
585  outXmsgTicketInfo = ( xmsgTicketInfo_t* )calloc( 1, sizeof( xmsgTicketInfo_t ) );
586  outXmsgTicketInfo->expireTime = thisTime + ( MAX_EXPIRE_INT * 500 );
587  outXmsgTicketInfo->rcvTicket = 2;
588  outXmsgTicketInfo->sendTicket = 2;
589  outXmsgTicketInfo->flag = 1;
590  hashSlotNum = ticketHashFunc( outXmsgTicketInfo->rcvTicket );
591  addTicketToHQue( outXmsgTicketInfo, &XmsgHashQue[hashSlotNum] );
592  free( outXmsgTicketInfo ); // JMC cppcheck - leak
593 
594  outXmsgTicketInfo = ( xmsgTicketInfo_t* )calloc( 1, sizeof( xmsgTicketInfo_t ) );
595  outXmsgTicketInfo->expireTime = thisTime + ( MAX_EXPIRE_INT * 500 );
596  outXmsgTicketInfo->rcvTicket = 3;
597  outXmsgTicketInfo->sendTicket = 3;
598  outXmsgTicketInfo->flag = 1;
599  hashSlotNum = ticketHashFunc( outXmsgTicketInfo->rcvTicket );
600  addTicketToHQue( outXmsgTicketInfo, &XmsgHashQue[hashSlotNum] );
601  free( outXmsgTicketInfo ); // JMC cppcheck - leak
602 
603  outXmsgTicketInfo = ( xmsgTicketInfo_t* )calloc( 1, sizeof( xmsgTicketInfo_t ) );
604  outXmsgTicketInfo->expireTime = thisTime + ( MAX_EXPIRE_INT * 500 );
605  outXmsgTicketInfo->rcvTicket = 4;
606  outXmsgTicketInfo->sendTicket = 4;
607  outXmsgTicketInfo->flag = 1;
608  hashSlotNum = ticketHashFunc( outXmsgTicketInfo->rcvTicket );
609  addTicketToHQue( outXmsgTicketInfo, &XmsgHashQue[hashSlotNum] );
610  free( outXmsgTicketInfo ); // JMC cppcheck - leak
611 
612  outXmsgTicketInfo = ( xmsgTicketInfo_t* )calloc( 1, sizeof( xmsgTicketInfo_t ) );
613  outXmsgTicketInfo->expireTime = thisTime + ( MAX_EXPIRE_INT * 500 );
614  outXmsgTicketInfo->rcvTicket = 5;
615  outXmsgTicketInfo->sendTicket = 5;
616  outXmsgTicketInfo->flag = 1;
617  hashSlotNum = ticketHashFunc( outXmsgTicketInfo->rcvTicket );
618  addTicketToHQue( outXmsgTicketInfo, &XmsgHashQue[hashSlotNum] );
619  free( outXmsgTicketInfo ); // JMC cppcheck - leak
620 
621  addMsParam( &XMsgMsParamArray, "*XHDR", STR_MS_T, NULL, NULL );
622  addMsParam( &XMsgMsParamArray, "*XUSER", STR_MS_T, NULL, NULL );
623  addMsParam( &XMsgMsParamArray, "*XADDR", STR_MS_T, NULL, NULL );
624  addMsParam( &XMsgMsParamArray, "*XMISC", STR_MS_T, NULL, NULL );
625  addIntParamToArray( &XMsgMsParamArray, "*XMSGNUM", 0 );
626  addIntParamToArray( &XMsgMsParamArray, "*XSEQNUM", 0 );
627  addIntParamToArray( &XMsgMsParamArray, "*XTIME", 0 );
628 
629 
631  return 0;
632 }
633 
634 int
636  ticketMsgStruct_t **outTicketMsgStruct ) {
637  int hashSlotNum;
638  ticketMsgStruct_t *tmpTicketMsgStruct;
639 
640  hashSlotNum = ticketHashFunc( rcvTicket );
641 
642  tmpTicketMsgStruct = XmsgHashQue[hashSlotNum].head;
643 
644  while ( tmpTicketMsgStruct != NULL ) {
645  if ( rcvTicket == tmpTicketMsgStruct->ticket.rcvTicket ) {
646  *outTicketMsgStruct = tmpTicketMsgStruct;
647  return 0;
648  }
649  else if ( rcvTicket > tmpTicketMsgStruct->ticket.rcvTicket ) {
650  *outTicketMsgStruct = NULL;
652  }
653  else {
654  tmpTicketMsgStruct = tmpTicketMsgStruct->hnext;
655  }
656  }
657 
658  /* no match */
659  *outTicketMsgStruct = NULL;
661 }
662 
663 int
664 _rsRcvXmsg( irodsXmsg_t *irodsXmsg, rcvXmsgOut_t *rcvXmsgOut ) {
665  sendXmsgInfo_t *sendXmsgInfo;
666  ticketMsgStruct_t *ticketMsgStruct;
667 
668  if ( irodsXmsg == NULL || rcvXmsgOut == NULL ) {
670  "_rsRcvXmsg: input irodsXmsg or rcvXmsgOut is NULL" );
671  MessQueCondMutex.unlock();
673  }
674 
675  sendXmsgInfo = irodsXmsg->sendXmsgInfo;
676  ticketMsgStruct = ( ticketMsgStruct_t* )irodsXmsg->ticketMsgStruct;
677 
678  /* rodsLog (LOG_NOTICE,
679  "_rsRcvXmsg: SEQNum=%d, numRcv=%d", irodsXmsg->seqNumber,
680  sendXmsgInfo->numRcv); */
681  sendXmsgInfo = irodsXmsg->sendXmsgInfo;
682 
683  sendXmsgInfo->numRcv--;
684 
685  if ( sendXmsgInfo->numRcv <= 0 && sendXmsgInfo->numDeli <= 0 ) {
686  /* done with this msg */
687  rcvXmsgOut->msg = sendXmsgInfo->msg;
688  rcvXmsgOut->seqNumber = irodsXmsg->seqNumber;
689  rcvXmsgOut->msgNumber = sendXmsgInfo->msgNumber;
690  sendXmsgInfo->msg = NULL;
691  rstrcpy( rcvXmsgOut->msgType, sendXmsgInfo->msgType, HEADER_TYPE_LEN );
692  rstrcpy( rcvXmsgOut->sendUserName, irodsXmsg->sendUserName,
693  NAME_LEN );
694  rstrcpy( rcvXmsgOut->sendAddr, irodsXmsg->sendAddr,
695  NAME_LEN );
696  rmXmsgFromXmsgQue( irodsXmsg, &XmsgQue );
697  rmXmsgFromXmsgTcketQue( irodsXmsg, &ticketMsgStruct->xmsgQue );
698  clearSendXmsgInfo( sendXmsgInfo );
699  free( sendXmsgInfo );
700  free( irodsXmsg );
701  }
702  else {
703  rcvXmsgOut->msg = strdup( sendXmsgInfo->msg );
704  rcvXmsgOut->seqNumber = irodsXmsg->seqNumber;
705  rcvXmsgOut->msgNumber = sendXmsgInfo->msgNumber;
706  rstrcpy( rcvXmsgOut->msgType, sendXmsgInfo->msgType, HEADER_TYPE_LEN );
707  rstrcpy( rcvXmsgOut->sendUserName, irodsXmsg->sendUserName,
708  NAME_LEN );
709  rstrcpy( rcvXmsgOut->sendAddr, irodsXmsg->sendAddr,
710  NAME_LEN );
711  }
712  MessQueCondMutex.unlock();
713  return 0;
714 }
715 
716 int
717 clearOneXMessage( ticketMsgStruct_t *ticketMsgStruct, int seqNum ) {
718 
719 
720  irodsXmsg_t *tmpIrodsXmsg;
721 
722  tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
723  while ( tmpIrodsXmsg != NULL ) {
724  if ( ( int ) tmpIrodsXmsg->seqNumber == seqNum ) {
725  rmXmsgFromXmsgQue( tmpIrodsXmsg, &XmsgQue );
726  rmXmsgFromXmsgTcketQue( tmpIrodsXmsg, &ticketMsgStruct->xmsgQue );
727  clearSendXmsgInfo( tmpIrodsXmsg->sendXmsgInfo );
728  free( tmpIrodsXmsg->sendXmsgInfo );
729  free( tmpIrodsXmsg );
730  return 0;
731  }
732  tmpIrodsXmsg = tmpIrodsXmsg->tnext;
733  }
734 
735 
736  return 0;
737 }
738 
739 int
741 
742  irodsXmsg_t *tmpIrodsXmsg, *tmpIrodsXmsg2;
743 
744  tmpIrodsXmsg = ticketMsgStruct->xmsgQue.head;
745  while ( tmpIrodsXmsg != NULL ) {
746  tmpIrodsXmsg2 = tmpIrodsXmsg->tnext;
747  rmXmsgFromXmsgQue( tmpIrodsXmsg, &XmsgQue );
748  clearSendXmsgInfo( tmpIrodsXmsg->sendXmsgInfo );
749  free( tmpIrodsXmsg->sendXmsgInfo );
750  free( tmpIrodsXmsg );
751  tmpIrodsXmsg = tmpIrodsXmsg2;
752  }
753 
754  ticketMsgStruct->xmsgQue.head = NULL;
755  ticketMsgStruct->xmsgQue.tail = NULL;
756  return 0;
757 }
rodsLog
void rodsLog(int level, const char *formatStr,...)
Definition: rodsLog.cpp:86
addXmsgToTicketMsgStruct
int addXmsgToTicketMsgStruct(irodsXmsg_t *xmsg, ticketMsgStruct_t *ticketMsgStruct)
Definition: xmsgLib.cpp:145
NULL
#define NULL
Definition: rodsDef.h:70
clearSendXmsgInfo
int clearSendXmsgInfo(sendXmsgInfo_t *sendXmsgInfo)
Definition: rcMisc.cpp:3473
rsComm_t
Definition: rcConnect.h:145
XmsgQue
Definition: rodsXmsg.h:63
sendVersion
irods::error sendVersion(irods::network_object_ptr, int, int, char *, int)
Definition: sockComm.cpp:1168
clearOneXMessage
int clearOneXMessage(ticketMsgStruct_t *ticketMsgStruct, int seqNum)
Definition: xmsgLib.cpp:717
SYS_INTERNAL_NULL_INPUT_ERR
@ SYS_INTERNAL_NULL_INPUT_ERR
Definition: rodsErrorTable.h:92
SYS_NO_XMSG_FOR_MSG_NUMBER
@ SYS_NO_XMSG_FOR_MSG_NUMBER
Definition: rodsErrorTable.h:130
IrodsXmsg
Definition: rodsXmsg.h:50
startXmsgThreads
int startXmsgThreads()
Definition: xmsgLib.cpp:468
IrodsXmsg::sendTime
uint sendTime
Definition: rodsXmsg.h:52
SYS_AGENT_INIT_ERR
@ SYS_AGENT_INIT_ERR
Definition: rodsErrorTable.h:105
ReqQueCondMutex
static boost::mutex ReqQueCondMutex
Definition: xmsgLib.cpp:25
XmsgQue
static xmsgQue_t XmsgQue
Definition: xmsgLib.cpp:30
SendXmsgInfo::numRcv
uint numRcv
Definition: rodsXmsg.h:33
RcvXmsgInp
Definition: rodsXmsg.h:88
TicketMsgStruct::ticket
xmsgTicketInfo_t ticket
Definition: rodsXmsg.h:69
SendXmsgInfo
Definition: rodsXmsg.h:30
addTicketMsgStructToHQue
int addTicketMsgStructToHQue(ticketMsgStruct_t *ticketMsgStruct, ticketHashQue_t *ticketHQue)
Definition: xmsgLib.cpp:314
HEADER_TYPE_LEN
#define HEADER_TYPE_LEN
Definition: rodsDef.h:53
PASS
#define PASS(prev_error_)
Definition: irods_error.hpp:118
XmsgQue::tail
irodsXmsg_t * tail
Definition: rodsXmsg.h:65
NUM_HASH_SLOT
#define NUM_HASH_SLOT
Definition: xmsgLib.hpp:24
REQ_MSG_TIMEOUT_TIME
#define REQ_MSG_TIMEOUT_TIME
Definition: xmsgLib.hpp:22
TicketMsgStruct::xmsgQue
xmsgQue_t xmsgQue
Definition: rodsXmsg.h:70
getIrodsXmsgByMsgNum
int getIrodsXmsgByMsgNum(int rcvTicket, int msgNumber, irodsXmsg_t **outIrodsXmsg)
Definition: xmsgLib.cpp:246
TicketHashQue::head
ticketMsgStruct_t * head
Definition: rodsXmsg.h:79
XmsgTicketInfo::flag
uint flag
Definition: rodsXmsg.h:27
MAX_EXPIRE_INT
#define MAX_EXPIRE_INT
Definition: rodsXmsg.h:9
TicketHashQue
Definition: rodsXmsg.h:78
initThreadEnv
int initThreadEnv()
Definition: xmsgLib.cpp:42
RcvXmsgOut::sendUserName
char sendUserName[NAME_LEN]
Definition: rodsXmsg.h:97
getTicketMsgStructByTicket
int getTicketMsgStructByTicket(uint rcvTicket, ticketMsgStruct_t **outTicketMsgStruct)
Definition: xmsgLib.cpp:635
RcvXmsgOut
Definition: rodsXmsg.h:95
XmsgReq
Definition: rodsXmsg.h:83
SYS_DUPLICATE_XMSG_TICKET
@ SYS_DUPLICATE_XMSG_TICKET
Definition: rodsErrorTable.h:128
XmsgTicketInfo::expireTime
uint expireTime
Definition: rodsXmsg.h:26
rmXmsgFromXmsgQue
int rmXmsgFromXmsgQue(irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
Definition: xmsgLib.cpp:87
LOG_ERROR
#define LOG_ERROR
Definition: rodsLog.h:43
XmsgReq::next
struct XmsgReq * next
Definition: rodsXmsg.h:85
RcvXmsgOut::msgType
char msgType[HEADER_TYPE_LEN]
Definition: rodsXmsg.h:96
rmTicketMsgStructFromHQue
int rmTicketMsgStructFromHQue(ticketMsgStruct_t *ticketMsgStruct, ticketHashQue_t *ticketHQue)
Definition: xmsgLib.cpp:373
STR_MS_T
#define STR_MS_T
Definition: msParam.h:21
ReqQueCond
static boost::condition_variable ReqQueCond
Definition: xmsgLib.cpp:28
SendXmsgInfo::miscInfo
char * miscInfo
Definition: rodsXmsg.h:39
INC_EXPIRE_INT
#define INC_EXPIRE_INT
Definition: rodsXmsg.h:10
IrodsXmsg::seqNumber
uint seqNumber
Definition: rodsXmsg.h:59
SendXmsgInfo::msgNumber
uint msgNumber
Definition: rodsXmsg.h:31
SendXmsgInfo::msg
char * msg
Definition: rodsXmsg.h:35
irods_network_factory.hpp
XmsgTicketInfo::sendTicket
uint sendTicket
Definition: rodsXmsg.h:24
irods::error::code
long long code() const
Definition: irods_error.cpp:194
TicketMsgStruct
Definition: rodsXmsg.h:68
irods::network_factory
irods::error network_factory(rcComm_t *, irods::network_object_ptr &)
Definition: irods_network_factory.cpp:8
IrodsXmsg::tnext
struct IrodsXmsg * tnext
Definition: rodsXmsg.h:57
_rsRcvXmsg
int _rsRcvXmsg(irodsXmsg_t *irodsXmsg, rcvXmsgOut_t *rcvXmsgOut)
Definition: xmsgLib.cpp:664
XmsgReq::sock
int sock
Definition: rodsXmsg.h:84
TicketMsgStruct::hprev
struct TicketMsgStruct * hprev
Definition: rodsXmsg.h:71
MsParamArray::msParam
msParam_t ** msParam
Definition: msParam.h:87
clearAllXMessages
int clearAllXMessages(ticketMsgStruct_t *ticketMsgStruct)
Definition: xmsgLib.cpp:740
XmsgQue::head
irodsXmsg_t * head
Definition: rodsXmsg.h:64
MAX_NAME_LEN
#define MAX_NAME_LEN
Definition: rodsDef.h:61
addXmsgToXmsgQue
int addXmsgToXmsgQue(irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
Definition: xmsgLib.cpp:63
addReqToQue
int addReqToQue(int sock)
Definition: xmsgLib.cpp:405
XmsgReqTail
static xmsgReq_t * XmsgReqTail
Definition: xmsgLib.cpp:32
readAndProcClientMsg
int readAndProcClientMsg(rsComm_t *rsComm, int retApiStatus)
Definition: rsApiHandler.cpp:488
MessQueCondMutex
static boost::mutex MessQueCondMutex
Definition: xmsgLib.cpp:26
IrodsXmsg::sendXmsgInfo
sendXmsgInfo_t * sendXmsgInfo
Definition: rodsXmsg.h:51
SYS_UNMATCHED_XMSG_TICKET
@ SYS_UNMATCHED_XMSG_TICKET
Definition: rodsErrorTable.h:129
NUM_XMSG_THR
#define NUM_XMSG_THR
Definition: xmsgLib.hpp:25
irods.pypyodbc.status
status
Definition: pypyodbc.py:467
irods::log
void log(const error &)
Definition: irods_log.cpp:13
LOG_NOTICE
#define LOG_NOTICE
Definition: rodsLog.h:33
xmsgLib.hpp
IrodsXmsg::sendAddr
char sendAddr[NAME_LEN]
Definition: rodsXmsg.h:54
irods::error
Definition: irods_error.hpp:23
miscServerFunct.hpp
SendXmsgInfo::msgType
char msgType[HEADER_TYPE_LEN]
Definition: rodsXmsg.h:32
RcvXmsgOut::msgNumber
uint msgNumber
Definition: rodsXmsg.h:99
XmsgHashQue
ticketHashQue_t XmsgHashQue[47]
Definition: xmsgLib.cpp:37
XmsgTicketInfo
Definition: rodsXmsg.h:23
IrodsXmsg::ticketMsgStruct
void * ticketMsgStruct
Definition: rodsXmsg.h:60
getIrodsXmsg
int getIrodsXmsg(rcvXmsgInp_t *rcvXmsgInp, irodsXmsg_t **outIrodsXmsg)
Definition: xmsgLib.cpp:209
XmsgTicketInfo::rcvTicket
uint rcvTicket
Definition: rodsXmsg.h:25
irods::network_object_ptr
boost::shared_ptr< network_object > network_object_ptr
Definition: irods_network_object.hpp:78
procReqRoutine
void procReqRoutine()
Definition: xmsgLib.cpp:482
startupPack
Definition: rodsDef.h:234
IrodsXmsg::prev
struct IrodsXmsg * prev
Definition: rodsXmsg.h:55
addXmsgToQues
int addXmsgToQues(irodsXmsg_t *irodsXmsg, ticketMsgStruct_t *ticketMsgStruct)
Definition: xmsgLib.cpp:48
TicketMsgStruct::nxtSeqNumber
uint nxtSeqNumber
Definition: rodsXmsg.h:74
XmsgReqHead
static xmsgReq_t * XmsgReqHead
Definition: xmsgLib.cpp:31
addIntParamToArray
int addIntParamToArray(msParamArray_t *msParamArray, char *label, int inpInt)
Definition: msParam.cpp:24
rmXmsgFromXmsgTcketQue
int rmXmsgFromXmsgTcketQue(irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
Definition: xmsgLib.cpp:116
RcvXmsgOut::msg
char * msg
Definition: rodsXmsg.h:101
addTicketToHQue
int addTicketToHQue(xmsgTicketInfo_t *ticket, ticketHashQue_t *ticketHQue)
Definition: xmsgLib.cpp:288
initServer.hpp
initRsCommWithStartupPack
int initRsCommWithStartupPack(rsComm_t *rsComm, startupPack_t *startupPack)
Definition: initServer.cpp:834
TicketHashQue::tail
ticketMsgStruct_t * tail
Definition: rodsXmsg.h:80
IrodsXmsg::next
struct IrodsXmsg * next
Definition: rodsXmsg.h:56
readStartupPack
irods::error readStartupPack(irods::network_object_ptr _ptr, startupPack_t **startupPack, struct timeval *tv)
Definition: miscServerFunct.cpp:2716
getReqFromQue
xmsgReq_t * getReqFromQue()
Definition: xmsgLib.cpp:431
MsParam::inOutStruct
void * inOutStruct
Definition: msParam.h:80
addMsParam
int addMsParam(msParamArray_t *msParamArray, const char *label, const char *packInstruct, void *inOutStruct, bytesBuf_t *inpOutBuf)
Definition: msParam.cpp:17
rstrcpy
char * rstrcpy(char *dest, const char *src, int maxLen)
Definition: stringOpr.cpp:51
initXmsgHashQue
int initXmsgHashQue()
Definition: xmsgLib.cpp:564
rsApiHandler.hpp
MsParamArray
Definition: msParam.h:84
checkMsgCondition
int checkMsgCondition(irodsXmsg_t *irodsXmsg, char *msgCond)
Definition: xmsgLib.cpp:176
IrodsXmsg::tprev
struct IrodsXmsg * tprev
Definition: rodsXmsg.h:58
ProcReqThread
static boost::thread * ProcReqThread[40]
Definition: xmsgLib.cpp:27
ANY_MSG_NUMBER
#define ANY_MSG_NUMBER
Definition: rodsXmsg.h:12
TicketMsgStruct::ticketHashQue
void * ticketHashQue
Definition: rodsXmsg.h:73
NAME_LEN
#define NAME_LEN
Definition: rodsDef.h:55
RcvXmsgOut::seqNumber
uint seqNumber
Definition: rodsXmsg.h:100
XMsgMsParamArray
static msParamArray_t XMsgMsParamArray
Definition: xmsgLib.cpp:33
sockCommNetworkInterface.hpp
irods::error::ok
bool ok()
Definition: irods_error.cpp:258
RcvXmsgOut::sendAddr
char sendAddr[NAME_LEN]
Definition: rodsXmsg.h:98
IrodsXmsg::sendUserName
char sendUserName[NAME_LEN]
Definition: rodsXmsg.h:53
SYS_THREAD_RESOURCE_ERR
@ SYS_THREAD_RESOURCE_ERR
Definition: rodsErrorTable.h:214
rsComm_t::sock
int sock
Definition: rcConnect.h:147
RcvXmsgInp::rcvTicket
uint rcvTicket
Definition: rodsXmsg.h:89
ticketHashFunc
int ticketHashFunc(uint rcvTicket)
Definition: xmsgLib.cpp:558
RcvXmsgInp::msgCondition
char msgCondition[MAX_NAME_LEN]
Definition: rodsXmsg.h:92
TicketMsgStruct::hnext
struct TicketMsgStruct * hnext
Definition: rodsXmsg.h:72
SendXmsgInfo::numDeli
int numDeli
Definition: rodsXmsg.h:36