irods
4.2.8
About: iRODS (the integrated Rule Oriented Data System) is a distributed data-management system for creating data grids, digital libraries, persistent archives, and real-time data systems. Fossies Dox: irods-4.2.8.tar.gz ("unofficial" and yet experimental doxygen-generated source code documentation) 
|
Go to the documentation of this file.
9 #include <boost/thread/thread.hpp>
10 #include <boost/thread/mutex.hpp>
11 #include <boost/thread/condition.hpp>
65 if ( xmsg ==
NULL || xmsgQue ==
NULL ) {
67 "addXmsgToQue: input xmsg or xmsgQue is NULL" );
74 xmsgQue->
head = xmsgQue->
tail = xmsg;
88 if ( xmsg ==
NULL || xmsgQue ==
NULL ) {
90 "addXmsgToQue: input xmsg or xmsgQue is NULL" );
117 if ( xmsg ==
NULL || xmsgQue ==
NULL ) {
119 "addXmsgToQue: input xmsg or xmsgQue is NULL" );
147 if ( xmsg ==
NULL || ticketMsgStruct ==
NULL ) {
149 "addXmsgToTicketMsgStruct: input xmsg or ticketMsgStruct is NULL" );
179 if ( msgCond ==
NULL || strlen( msgCond ) == 0 ) {
183 strcpy( condStr, msgCond );
193 if ( strcmp( condStr,
"" ) == 0 ) {
213 if ( outIrodsXmsg ==
NULL ) {
215 "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL" );
234 tmpIrodsXmsg = tmpIrodsXmsg->
tnext;
237 *outIrodsXmsg = tmpIrodsXmsg;
239 if ( tmpIrodsXmsg ==
NULL ) {
252 if ( outIrodsXmsg ==
NULL ) {
254 "getIrodsXmsgByMsgNum: input outIrodsXmsg is NULL" );
271 while ( tmpIrodsXmsg !=
NULL ) {
275 tmpIrodsXmsg = tmpIrodsXmsg->
tnext;
278 *outIrodsXmsg = tmpIrodsXmsg;
279 if ( tmpIrodsXmsg ==
NULL ) {
293 if ( ticket ==
NULL || ticketHQue ==
NULL ) {
295 "addTicketToHQue: input ticket or ticketHQue is NULL" );
303 tmpTicketMsgStruct->
ticket = *ticket;
307 free( tmpTicketMsgStruct );
318 if ( ticketMsgStruct ==
NULL || ticketHQue ==
NULL ) {
320 "addTicketMsgStructToHQue: ticketMsgStruct or ticketHQue is NULL" );
329 ticketHQue->
head = ticketHQue->
tail = ticketMsgStruct;
335 tmpTicketMsgStruct = ticketHQue->
head;
336 while ( tmpTicketMsgStruct !=
NULL ) {
346 tmpTicketMsgStruct = tmpTicketMsgStruct->
hnext;
349 if ( tmpTicketMsgStruct ==
NULL ) {
351 ticketHQue->
tail->
hnext = ticketMsgStruct;
352 ticketMsgStruct->
hprev = ticketHQue->
tail;
353 ticketHQue->
tail = ticketMsgStruct;
355 else if ( tmpTicketMsgStruct == ticketHQue->
head ) {
357 ticketHQue->
head->
hprev = ticketMsgStruct;
358 ticketMsgStruct->
hnext = ticketHQue->
head;
359 ticketHQue->
head = ticketMsgStruct;
363 ticketMsgStruct->
hprev = tmpTicketMsgStruct->
hprev;
364 ticketMsgStruct->
hnext = tmpTicketMsgStruct;
365 tmpTicketMsgStruct->
hprev->
hnext = ticketMsgStruct;
366 tmpTicketMsgStruct->
hprev = tmpTicketMsgStruct;
375 if ( ticketMsgStruct ==
NULL || ticketHQue ==
NULL ) {
377 "rmTicketMsgStructFromHQue: ticketMsgStruct or ticketHQue is NULL" );
383 ticketHQue->
head = ticketMsgStruct->
hnext;
391 ticketHQue->
tail = ticketMsgStruct->
hprev;
410 myXmsgReq->
sock = sock;
434 while ( myXmsgReq ==
NULL ) {
473 catch (
const boost::thread_resource_error& ) {
483 struct timeval msgTimeout;
484 memset( &msgTimeout, 0,
sizeof( msgTimeout ) );
489 if ( myXmsgReq ==
NULL ) {
495 memset( &rsComm, 0,
sizeof( rsComm ) );
510 "procReqRoutine: readStartupPack error, status = %d", ret.
code() );
524 FD_ZERO( &sockMask );
528 FD_SET( rsComm.
sock, &sockMask );
529 while ( ( numSock = select( rsComm.
sock + 1, &sockMask,
530 ( fd_set * )
NULL, ( fd_set * )
NULL, &msgTimeout ) ) <= 0 ) {
531 if ( errno == EINTR ) {
533 "procReqRoutine: select() interrupted" );
534 FD_SET( rsComm.
sock, &sockMask );
548 close( rsComm.
sock );
575 thisTime = time(
NULL );
581 outXmsgTicketInfo->
flag = 1;
584 free( outXmsgTicketInfo );
589 outXmsgTicketInfo->
flag = 1;
592 free( outXmsgTicketInfo );
598 outXmsgTicketInfo->
flag = 1;
601 free( outXmsgTicketInfo );
607 outXmsgTicketInfo->
flag = 1;
610 free( outXmsgTicketInfo );
616 outXmsgTicketInfo->
flag = 1;
619 free( outXmsgTicketInfo );
644 while ( tmpTicketMsgStruct !=
NULL ) {
646 *outTicketMsgStruct = tmpTicketMsgStruct;
650 *outTicketMsgStruct =
NULL;
654 tmpTicketMsgStruct = tmpTicketMsgStruct->
hnext;
659 *outTicketMsgStruct =
NULL;
668 if ( irodsXmsg ==
NULL || rcvXmsgOut ==
NULL ) {
670 "_rsRcvXmsg: input irodsXmsg or rcvXmsgOut is NULL" );
685 if ( sendXmsgInfo->
numRcv <= 0 && sendXmsgInfo->
numDeli <= 0 ) {
687 rcvXmsgOut->
msg = sendXmsgInfo->
msg;
699 free( sendXmsgInfo );
703 rcvXmsgOut->
msg = strdup( sendXmsgInfo->
msg );
723 while ( tmpIrodsXmsg !=
NULL ) {
724 if ( (
int ) tmpIrodsXmsg->
seqNumber == seqNum ) {
729 free( tmpIrodsXmsg );
732 tmpIrodsXmsg = tmpIrodsXmsg->
tnext;
745 while ( tmpIrodsXmsg !=
NULL ) {
746 tmpIrodsXmsg2 = tmpIrodsXmsg->
tnext;
750 free( tmpIrodsXmsg );
751 tmpIrodsXmsg = tmpIrodsXmsg2;
void rodsLog(int level, const char *formatStr,...)
int addXmsgToTicketMsgStruct(irodsXmsg_t *xmsg, ticketMsgStruct_t *ticketMsgStruct)
int clearSendXmsgInfo(sendXmsgInfo_t *sendXmsgInfo)
irods::error sendVersion(irods::network_object_ptr, int, int, char *, int)
int clearOneXMessage(ticketMsgStruct_t *ticketMsgStruct, int seqNum)
@ SYS_INTERNAL_NULL_INPUT_ERR
@ SYS_NO_XMSG_FOR_MSG_NUMBER
static boost::mutex ReqQueCondMutex
int addTicketMsgStructToHQue(ticketMsgStruct_t *ticketMsgStruct, ticketHashQue_t *ticketHQue)
#define PASS(prev_error_)
#define REQ_MSG_TIMEOUT_TIME
int getIrodsXmsgByMsgNum(int rcvTicket, int msgNumber, irodsXmsg_t **outIrodsXmsg)
char sendUserName[NAME_LEN]
int getTicketMsgStructByTicket(uint rcvTicket, ticketMsgStruct_t **outTicketMsgStruct)
@ SYS_DUPLICATE_XMSG_TICKET
int rmXmsgFromXmsgQue(irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
char msgType[HEADER_TYPE_LEN]
int rmTicketMsgStructFromHQue(ticketMsgStruct_t *ticketMsgStruct, ticketHashQue_t *ticketHQue)
static boost::condition_variable ReqQueCond
irods::error network_factory(rcComm_t *, irods::network_object_ptr &)
int _rsRcvXmsg(irodsXmsg_t *irodsXmsg, rcvXmsgOut_t *rcvXmsgOut)
struct TicketMsgStruct * hprev
int clearAllXMessages(ticketMsgStruct_t *ticketMsgStruct)
int addXmsgToXmsgQue(irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
int addReqToQue(int sock)
static xmsgReq_t * XmsgReqTail
int readAndProcClientMsg(rsComm_t *rsComm, int retApiStatus)
static boost::mutex MessQueCondMutex
sendXmsgInfo_t * sendXmsgInfo
@ SYS_UNMATCHED_XMSG_TICKET
char msgType[HEADER_TYPE_LEN]
ticketHashQue_t XmsgHashQue[47]
int getIrodsXmsg(rcvXmsgInp_t *rcvXmsgInp, irodsXmsg_t **outIrodsXmsg)
boost::shared_ptr< network_object > network_object_ptr
int addXmsgToQues(irodsXmsg_t *irodsXmsg, ticketMsgStruct_t *ticketMsgStruct)
static xmsgReq_t * XmsgReqHead
int addIntParamToArray(msParamArray_t *msParamArray, char *label, int inpInt)
int rmXmsgFromXmsgTcketQue(irodsXmsg_t *xmsg, xmsgQue_t *xmsgQue)
int addTicketToHQue(xmsgTicketInfo_t *ticket, ticketHashQue_t *ticketHQue)
int initRsCommWithStartupPack(rsComm_t *rsComm, startupPack_t *startupPack)
irods::error readStartupPack(irods::network_object_ptr _ptr, startupPack_t **startupPack, struct timeval *tv)
xmsgReq_t * getReqFromQue()
int addMsParam(msParamArray_t *msParamArray, const char *label, const char *packInstruct, void *inOutStruct, bytesBuf_t *inpOutBuf)
char * rstrcpy(char *dest, const char *src, int maxLen)
int checkMsgCondition(irodsXmsg_t *irodsXmsg, char *msgCond)
static boost::thread * ProcReqThread[40]
static msParamArray_t XMsgMsParamArray
char sendUserName[NAME_LEN]
@ SYS_THREAD_RESOURCE_ERR
int ticketHashFunc(uint rcvTicket)
char msgCondition[MAX_NAME_LEN]
struct TicketMsgStruct * hnext