"Fossies" - the Fresh Open Source Software Archive

Member "postal-0.76/tcp.cpp" (1 Jan 2012, 11063 Bytes) of package /linux/privat/postal-0.76.tgz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "tcp.cpp" see the Fossies "Dox" file reference documentation.

    1 #define TCP_BODY
    2 
    3 #include "tcp.h"
    4 
    5 #include <sys/socket.h>
    6 #include <unistd.h>
    7 #include <arpa/inet.h>
    8 #include <fcntl.h>
    9 #include <errno.h>
   10 #include <cstring>
   11 #include "postal.h"
   12 #include "userlist.h"
   13 #include "address.h"
   14 #include "logit.h"
   15 
   16 tcp::tcp(int *exitCount, const char *addr, unsigned short default_port
   17        , Logit *log
   18 #ifdef USE_SSL
   19        , int ssl
   20 #endif
   21        , const char *sourceAddr, Logit *debug)
   22  : m_md5()
   23  , m_destAffinity(0)
   24  , m_log(log)
   25 #ifdef USE_SSL
   26  , m_canTLS(false)
   27  , m_useTLS(ssl)
   28 #endif
   29  , m_exitCount(exitCount)
   30  , m_fd(-1)
   31  , m_start(0)
   32  , m_end(0)
   33  , m_open(false)
   34  , m_addr(new address(addr, default_port))
   35  , m_sourceAddr(NULL)
   36  , m_debug(debug)
   37 #ifdef USE_SSL
   38 #ifdef USE_OPENSSL
   39  , m_sslMeth(NULL)
   40  , m_sslCtx(NULL)
   41  , m_ssl(NULL)
   42 #else
   43  , m_gnutls_session(NULL)
   44  , m_gnutls_anoncred(NULL)
   45 #endif
   46  , m_isTLS(false)
   47 #endif
   48 {
   49   if(sourceAddr)
   50     m_sourceAddr = new address(sourceAddr);
   51   m_destAffinity = getThreadNum() % m_addr->addressCount();
   52 #ifdef USE_SSL
   53   if(m_useTLS)
   54   {
   55 #ifdef USE_OPENSSL
   56 //don't seem to need this    SSL_library_init();
   57     SSLeay_add_ssl_algorithms();
   58     SSL_load_error_strings();
   59 #endif
   60   }
   61 #endif
   62 }
   63 
   64 tcp::tcp(int threadNum, const tcp *parent)
   65  : Thread(threadNum, parent)
   66  , m_md5()
   67  , m_destAffinity(parent->m_destAffinity)
   68  , m_log(parent->m_log)
   69 #ifdef USE_SSL
   70  , m_canTLS(false)
   71  , m_useTLS(parent->m_useTLS)
   72 #endif
   73  , m_exitCount(parent->m_exitCount)
   74  , m_fd(-1)
   75  , m_start(0)
   76  , m_end(0)
   77  , m_open(false)
   78  , m_addr(parent->m_addr)
   79  , m_sourceAddr(parent->m_sourceAddr)
   80  , m_debug(parent->m_debug ? new Logit(*(parent->m_debug), threadNum) : NULL)
   81 #ifdef USE_SSL
   82 #ifdef USE_OPENSSL
   83  , m_sslMeth(NULL)
   84  , m_sslCtx(NULL)
   85  , m_ssl(NULL)
   86 #else
   87  , m_gnutls_session(NULL)
   88  , m_gnutls_anoncred(NULL)
   89 #endif
   90  , m_isTLS(false)
   91 #endif
   92 {
   93 }
   94 
   95 tcp::~tcp()
   96 {
   97   disconnect();
   98   if(getThreadNum() < 1)
   99   {
  100     delete m_addr;
  101     delete m_sourceAddr;
  102   }
  103   if(m_debug)
  104     delete m_debug;
  105 }
  106 
  107 int tcp::Connect(short port)
  108 {
  109   if(*m_exitCount)
  110     return 1;
  111 #ifdef USE_SSL
  112   m_canTLS = false;
  113 #endif
  114   m_start = 0;
  115   m_end = 0;
  116 #ifdef USE_SSL
  117   m_isTLS = false;
  118 #endif
  119   sockaddr *sa;
  120   sa = m_addr->get_addr(m_destAffinity);
  121   if(!sa)
  122     return 1;
  123   m_fd = socket(PF_INET, SOCK_STREAM, 0);
  124   if(m_fd < 0)
  125   {
  126     fprintf(stderr, "Can't open socket.\n");
  127     error();
  128     return 2;
  129   }
  130   int rc;
  131   if(m_sourceAddr)
  132   {
  133     sockaddr *source;
  134     source = (sockaddr *)m_sourceAddr->get_rand_addr();
  135     rc = bind(m_fd, source, sizeof(struct sockaddr_in));
  136     if(rc)
  137     {
  138       fprintf(stderr, "Can't bind to port.\n");
  139       error();
  140       close(m_fd);
  141       return 2;
  142     }
  143   }
  144   m_poll.fd = m_fd;
  145   if(port)
  146   {
  147     struct sockaddr_in newAddr;
  148     memcpy(&newAddr, sa, sizeof(newAddr));
  149     newAddr.sin_port = htons(port);
  150     rc = connect(m_fd, (sockaddr *)&newAddr, sizeof(struct sockaddr_in));
  151   }
  152   else
  153   {
  154     rc = connect(m_fd, sa, sizeof(struct sockaddr_in));
  155   }
  156   if(rc)
  157   {
  158     fprintf(stderr, "Can't connect to %s port %d.\n"
  159                   , inet_ntoa(((sockaddr_in *)sa)->sin_addr)
  160                   , int(ntohs(((sockaddr_in *)sa)->sin_port)) );
  161     error();
  162     close(m_fd);
  163     return 1;
  164   }
  165   socklen_t namelen = sizeof(m_connectionLocalAddr);
  166   rc = getsockname(m_fd, (struct sockaddr *)&m_connectionLocalAddr, &namelen);
  167   if(rc)
  168     fprintf(stderr, "Can't getsockname!\n");
  169   if(m_debug)
  170     m_debug->reopen();
  171   m_open = true;
  172   return 0;
  173 }
  174 
  175 #ifdef USE_SSL
  176 int tcp::ConnectTLS()
  177 {
  178 #ifdef USE_OPENSSL
  179   m_sslCtx = NULL;
  180   m_ssl = NULL;
  181   m_sslMeth = SSLv2_client_method();
  182   if(m_sslMeth == NULL)
  183   {
  184     fprintf(stderr, "Can't get SSLv2_client_method.\n");
  185     error();
  186     return 2;
  187   }
  188   m_sslCtx = SSL_CTX_new(m_sslMeth);
  189   if(m_sslCtx == NULL)
  190   {
  191     fprintf(stderr, "Can't SSL_CTX_new\n");
  192     error();
  193     return 2;
  194   }
  195   if((m_ssl = SSL_new(m_sslCtx)) == NULL)
  196   {
  197     fprintf(stderr, "Can't SSL_new\n");
  198     SSL_CTX_free(m_sslCtx);
  199     error();
  200     return 2;
  201   }
  202   SSL_set_fd(m_ssl, m_fd);
  203   if(-1 == SSL_connect(m_ssl))
  204   {
  205     fprintf(stderr, "Can't SSL_CONNECT\n");
  206     SSL_free(m_ssl);
  207     SSL_CTX_free(m_sslCtx);
  208     error();
  209     return 1;
  210   }
  211 #else
  212   gnutls_anon_allocate_client_credentials(&m_gnutls_anoncred);
  213   m_gnutls_session = new gnutls_session_t;
  214   // Initialize TLS session
  215   gnutls_init (m_gnutls_session, GNUTLS_CLIENT);
  216   // Use default priorities
  217   gnutls_set_default_priority(*m_gnutls_session); // bug in gnutls interface?
  218   // Need to enable anonymous specifically
  219   gnutls_priority_set_direct(*m_gnutls_session, "NORMAL:+ANON-DH", NULL);
  220   // put the anonymous credentials to the current session
  221   gnutls_credentials_set(*m_gnutls_session, GNUTLS_CRD_ANON, m_gnutls_anoncred);
  222   gnutls_transport_set_ptr(*m_gnutls_session, (gnutls_transport_ptr_t)m_fd);
  223 
  224   // Perform the TLS handshake
  225   if(gnutls_handshake(*m_gnutls_session) < 0)
  226   {
  227     fprintf(stderr, "Can't gnutls_handshake\n");
  228     gnutls_deinit(*m_gnutls_session);
  229     gnutls_anon_free_client_credentials(m_gnutls_anoncred);
  230     error();
  231     return 1;
  232   }
  233 #endif
  234   if(*m_exitCount > 1)
  235     return 3;
  236   m_isTLS = true;
  237 
  238 #if 0
  239 // openssl debugging code that may be useful to have around
  240 // in a commented-out state.
  241   /* Following two steps are optional and not required for
  242      data exchange to be successful. */
  243  
  244   /* Get the cipher - opt */
  245  
  246   printf ("SSL connection using %s\n", SSL_get_cipher(m_ssl));
  247  
  248   /* Get server's certificate (note: beware of dynamic allocation) - opt */
  249  
  250   X509 *server_cert;
  251   server_cert = SSL_get_peer_certificate(m_ssl);
  252   if(!server_cert)
  253   {
  254     fprintf(stderr, "Can't SSL_get_peer_certificate\n");
  255     return 2;
  256   }
  257   printf ("Server certificate:\n");
  258  
  259   char *str = X509_NAME_oneline(X509_get_subject_name(server_cert),0,0);
  260   if(!str)
  261   {
  262     fprintf(stderr, "Can't X509_NAME_oneline\n");
  263     return 2;
  264   }
  265   printf ("\t subject: %s\n", str);
  266   Free (str);
  267   str = X509_NAME_oneline (X509_get_issuer_name(server_cert),0,0);
  268   if(!str)
  269   {
  270     fprintf(stderr, "Can't X509_get_issuer_name\n");
  271     return 2;
  272   }
  273   printf ("\t issuer: %s\n", str);
  274   Free (str);
  275  
  276   /* We could do all sorts of certificate verification stuff here before
  277      deallocating the certificate. */
  278  
  279   X509_free(server_cert);
  280 #endif 
  281   return 0;
  282 }
  283 #endif
  284 
  285 int tcp::disconnect()
  286 {
  287   if(m_open)
  288   {
  289 #ifdef USE_SSL
  290     if(m_isTLS)
  291     {
  292 #ifdef USE_OPENSSL
  293       SSL_shutdown(m_ssl);
  294       close(m_fd);
  295       SSL_free(m_ssl);
  296       SSL_CTX_free(m_sslCtx);
  297 #else
  298       gnutls_bye(*m_gnutls_session, GNUTLS_SHUT_RDWR);
  299       close(m_fd);
  300       gnutls_deinit(*m_gnutls_session);
  301       gnutls_anon_free_client_credentials(m_gnutls_anoncred);
  302 #endif
  303       m_isTLS = false;
  304     }
  305     else
  306 #endif
  307     {
  308       close(m_fd);
  309     }
  310   }
  311   m_open = false;
  312   return 0;
  313 }
  314 
  315 ERROR_TYPE tcp::sendData(CPCCHAR buf, int size)
  316 {
  317   if(!m_open)
  318     return eCorrupt;
  319   int sent = 0;
  320   m_poll.events = POLLOUT | POLLERR | POLLHUP | POLLNVAL;
  321   int rc;
  322   while(sent != size)
  323   {
  324     if(*m_exitCount > 1)
  325       return eCtrl_C;
  326     rc = poll(&m_poll, 1, 60000);
  327     if(rc == 0)
  328     {
  329       fprintf(stderr, "Server timed out on write.\n");
  330       error();
  331       return eTimeout;
  332     }
  333     if(rc < 0)
  334     {
  335       fprintf(stderr, "Poll error.\n");
  336       error();
  337       return eSocket;
  338     }
  339 #ifdef USE_SSL
  340     if(m_isTLS)
  341     {
  342 #ifdef USE_OPENSSL
  343       rc = SSL_write(m_ssl, &buf[sent], size - sent);
  344 #else
  345       rc = gnutls_record_send(*m_gnutls_session, &buf[sent], size - sent);
  346 #endif
  347     }
  348     else
  349 #endif
  350     {
  351       rc = write(m_fd, &buf[sent], size - sent);
  352     }
  353     if(rc < 1)
  354     {
  355       fprintf(stderr, "Can't write to socket.\n");
  356       error();
  357       return eSocket;
  358     }
  359     if(m_debug)
  360       m_debug->Write(buf, rc);
  361     sent += rc;
  362   }
  363   sentData(size);
  364   return eNoError;
  365 }
  366 
  367 // fgets() doesn't
  368 int tcp::readLine(char *buf, int bufSize)
  369 {
  370   if(!m_open)
  371     return eCorrupt;
  372   int ind = 0;
  373   if(m_start < m_end)
  374   {
  375     do
  376     {
  377       buf[ind] = m_buf[m_start];
  378       ind++;
  379       m_start++;
  380     }
  381     while(m_start < m_end && m_buf[m_start - 1] != '\n' && ind < bufSize);
  382   }
  383   if(ind == bufSize || (ind > 0 && buf[ind - 1] == '\n') )
  384   {
  385     if(ind < bufSize)
  386       buf[ind] = '\0';
  387     receivedData(ind);
  388     if(m_debug)
  389       m_debug->Write(buf, ind);
  390     return ind;
  391   }
  392   // buffer is empty
  393   m_start = 0;
  394   m_end = 0;
  395 
  396   time_t now = time(NULL);
  397   m_poll.events = POLLIN | POLLERR | POLLHUP | POLLNVAL;
  398   while(1)
  399   {
  400     if(*m_exitCount > 1)
  401       return 3;
  402     int timeout = 60 - (time(NULL) - now);
  403     int rc;
  404     if(timeout < 0 || (rc = poll(&m_poll, 1, timeout * 1000)) == 0)
  405     {
  406       fprintf(stderr, "Server timed out on read.\n");
  407       error();
  408       return eSocket;
  409     }
  410     if(rc < 0)
  411     {
  412       if(errno == EINTR)
  413         continue;
  414       fprintf(stderr, "Poll error.\n");
  415       error();
  416       return eCorrupt;
  417     }
  418 #ifdef USE_SSL
  419     if(m_isTLS)
  420     {
  421 #ifdef USE_OPENSSL
  422       rc = SSL_read(m_ssl, m_buf, sizeof(m_buf));
  423 #else
  424       rc = gnutls_record_recv(*m_gnutls_session, m_buf, sizeof(m_buf));
  425 #endif
  426     }
  427     else
  428 #endif
  429     {
  430       rc = read(m_fd, m_buf, sizeof(m_buf));
  431     }
  432     if(rc < 0)
  433     {
  434       error();
  435       return eSocket;
  436     }
  437     m_end = rc;
  438     do
  439     {
  440       buf[ind] = m_buf[m_start];
  441       ind++;
  442       m_start++;
  443     } while(m_start < m_end && m_buf[m_start - 1] != '\n' && ind < bufSize);
  444 
  445     if(ind == bufSize || (ind > 0 && buf[ind - 1] == '\n') )
  446     {
  447       if(ind < bufSize)
  448         buf[ind] = '\0';
  449       receivedData(ind);
  450       if(m_debug)
  451         m_debug->Write(buf, ind);
  452       return ind;
  453     }
  454     if(m_start == m_end)
  455     {
  456       m_start = 0;
  457       m_end = 0;
  458     }
  459   }
  460   return 0;
  461 }
  462 
  463 ERROR_TYPE tcp::sendCommandString(const string &str, bool important)
  464 {
  465   return sendCommandData(str.c_str(), str.size(), important);
  466 }
  467 
  468 ERROR_TYPE tcp::sendCommandData(const char *buf, int size, bool important)
  469 {
  470   ERROR_TYPE rc = sendData(buf, size);
  471   if(rc)
  472     return rc;
  473   return readCommandResp(important);
  474 }
  475 
  476 void tcp::endIt()
  477 {
  478   if(m_open)
  479     close(m_fd);
  480   m_open = false;
  481 }
  482 
  483 int tcp::doAllWork(int rate)
  484 {
  485   double workCount = 0.0;
  486   char data[2048];
  487   time_t lastTime = time(NULL) - RESULTS_LAG;
  488   int toSend;
  489   for(unsigned int i = 0; i < sizeof(data); i++)
  490     data[i] = 1;
  491  
  492   while(1)
  493   {
  494     int rc;
  495     if(rate)
  496     {
  497       time_t newTime = time(NULL);
  498       workCount += double(newTime - lastTime) / 60.0 * double(rate);
  499       toSend = int(workCount);
  500       if(toSend > int(sizeof(data)) )
  501       {
  502         toSend = sizeof(data);
  503         workCount = 0.0;
  504       }
  505       else
  506       {
  507         workCount -= double(toSend);
  508       }
  509       lastTime = newTime;
  510     }
  511     else
  512       toSend = sizeof(data);
  513     // NB if data can't be written then it is discarded.
  514     // Buffer size will be at least 1024 bytes, if worker threads aren't
  515     // keeping up then we don't really want more than that in the queue.
  516     if(toSend)
  517     {
  518       rc = WriteWork(data, toSend, 5);
  519       if(rc < 0)
  520         return -rc;
  521     }
  522  
  523     if(*m_exitCount > 1)
  524       return -1;
  525     rc = pollRead();
  526     if(rc)
  527       return rc;
  528   }
  529   return 0;
  530 }
  531