"Fossies" - the Fresh Open Source Software Archive 
Member "postal-0.76/tcp.cpp" (1 Jan 2012, 11063 Bytes) of package /linux/privat/postal-0.76.tgz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style:
standard) with prefixed line numbers and
code folding option.
Alternatively you can here
view or
download the uninterpreted source code file.
For more information about "tcp.cpp" see the
Fossies "Dox" file reference documentation.
1 #define TCP_BODY
2
3 #include "tcp.h"
4
5 #include <sys/socket.h>
6 #include <unistd.h>
7 #include <arpa/inet.h>
8 #include <fcntl.h>
9 #include <errno.h>
10 #include <cstring>
11 #include "postal.h"
12 #include "userlist.h"
13 #include "address.h"
14 #include "logit.h"
15
16 tcp::tcp(int *exitCount, const char *addr, unsigned short default_port
17 , Logit *log
18 #ifdef USE_SSL
19 , int ssl
20 #endif
21 , const char *sourceAddr, Logit *debug)
22 : m_md5()
23 , m_destAffinity(0)
24 , m_log(log)
25 #ifdef USE_SSL
26 , m_canTLS(false)
27 , m_useTLS(ssl)
28 #endif
29 , m_exitCount(exitCount)
30 , m_fd(-1)
31 , m_start(0)
32 , m_end(0)
33 , m_open(false)
34 , m_addr(new address(addr, default_port))
35 , m_sourceAddr(NULL)
36 , m_debug(debug)
37 #ifdef USE_SSL
38 #ifdef USE_OPENSSL
39 , m_sslMeth(NULL)
40 , m_sslCtx(NULL)
41 , m_ssl(NULL)
42 #else
43 , m_gnutls_session(NULL)
44 , m_gnutls_anoncred(NULL)
45 #endif
46 , m_isTLS(false)
47 #endif
48 {
49 if(sourceAddr)
50 m_sourceAddr = new address(sourceAddr);
51 m_destAffinity = getThreadNum() % m_addr->addressCount();
52 #ifdef USE_SSL
53 if(m_useTLS)
54 {
55 #ifdef USE_OPENSSL
56 //don't seem to need this SSL_library_init();
57 SSLeay_add_ssl_algorithms();
58 SSL_load_error_strings();
59 #endif
60 }
61 #endif
62 }
63
64 tcp::tcp(int threadNum, const tcp *parent)
65 : Thread(threadNum, parent)
66 , m_md5()
67 , m_destAffinity(parent->m_destAffinity)
68 , m_log(parent->m_log)
69 #ifdef USE_SSL
70 , m_canTLS(false)
71 , m_useTLS(parent->m_useTLS)
72 #endif
73 , m_exitCount(parent->m_exitCount)
74 , m_fd(-1)
75 , m_start(0)
76 , m_end(0)
77 , m_open(false)
78 , m_addr(parent->m_addr)
79 , m_sourceAddr(parent->m_sourceAddr)
80 , m_debug(parent->m_debug ? new Logit(*(parent->m_debug), threadNum) : NULL)
81 #ifdef USE_SSL
82 #ifdef USE_OPENSSL
83 , m_sslMeth(NULL)
84 , m_sslCtx(NULL)
85 , m_ssl(NULL)
86 #else
87 , m_gnutls_session(NULL)
88 , m_gnutls_anoncred(NULL)
89 #endif
90 , m_isTLS(false)
91 #endif
92 {
93 }
94
95 tcp::~tcp()
96 {
97 disconnect();
98 if(getThreadNum() < 1)
99 {
100 delete m_addr;
101 delete m_sourceAddr;
102 }
103 if(m_debug)
104 delete m_debug;
105 }
106
107 int tcp::Connect(short port)
108 {
109 if(*m_exitCount)
110 return 1;
111 #ifdef USE_SSL
112 m_canTLS = false;
113 #endif
114 m_start = 0;
115 m_end = 0;
116 #ifdef USE_SSL
117 m_isTLS = false;
118 #endif
119 sockaddr *sa;
120 sa = m_addr->get_addr(m_destAffinity);
121 if(!sa)
122 return 1;
123 m_fd = socket(PF_INET, SOCK_STREAM, 0);
124 if(m_fd < 0)
125 {
126 fprintf(stderr, "Can't open socket.\n");
127 error();
128 return 2;
129 }
130 int rc;
131 if(m_sourceAddr)
132 {
133 sockaddr *source;
134 source = (sockaddr *)m_sourceAddr->get_rand_addr();
135 rc = bind(m_fd, source, sizeof(struct sockaddr_in));
136 if(rc)
137 {
138 fprintf(stderr, "Can't bind to port.\n");
139 error();
140 close(m_fd);
141 return 2;
142 }
143 }
144 m_poll.fd = m_fd;
145 if(port)
146 {
147 struct sockaddr_in newAddr;
148 memcpy(&newAddr, sa, sizeof(newAddr));
149 newAddr.sin_port = htons(port);
150 rc = connect(m_fd, (sockaddr *)&newAddr, sizeof(struct sockaddr_in));
151 }
152 else
153 {
154 rc = connect(m_fd, sa, sizeof(struct sockaddr_in));
155 }
156 if(rc)
157 {
158 fprintf(stderr, "Can't connect to %s port %d.\n"
159 , inet_ntoa(((sockaddr_in *)sa)->sin_addr)
160 , int(ntohs(((sockaddr_in *)sa)->sin_port)) );
161 error();
162 close(m_fd);
163 return 1;
164 }
165 socklen_t namelen = sizeof(m_connectionLocalAddr);
166 rc = getsockname(m_fd, (struct sockaddr *)&m_connectionLocalAddr, &namelen);
167 if(rc)
168 fprintf(stderr, "Can't getsockname!\n");
169 if(m_debug)
170 m_debug->reopen();
171 m_open = true;
172 return 0;
173 }
174
175 #ifdef USE_SSL
176 int tcp::ConnectTLS()
177 {
178 #ifdef USE_OPENSSL
179 m_sslCtx = NULL;
180 m_ssl = NULL;
181 m_sslMeth = SSLv2_client_method();
182 if(m_sslMeth == NULL)
183 {
184 fprintf(stderr, "Can't get SSLv2_client_method.\n");
185 error();
186 return 2;
187 }
188 m_sslCtx = SSL_CTX_new(m_sslMeth);
189 if(m_sslCtx == NULL)
190 {
191 fprintf(stderr, "Can't SSL_CTX_new\n");
192 error();
193 return 2;
194 }
195 if((m_ssl = SSL_new(m_sslCtx)) == NULL)
196 {
197 fprintf(stderr, "Can't SSL_new\n");
198 SSL_CTX_free(m_sslCtx);
199 error();
200 return 2;
201 }
202 SSL_set_fd(m_ssl, m_fd);
203 if(-1 == SSL_connect(m_ssl))
204 {
205 fprintf(stderr, "Can't SSL_CONNECT\n");
206 SSL_free(m_ssl);
207 SSL_CTX_free(m_sslCtx);
208 error();
209 return 1;
210 }
211 #else
212 gnutls_anon_allocate_client_credentials(&m_gnutls_anoncred);
213 m_gnutls_session = new gnutls_session_t;
214 // Initialize TLS session
215 gnutls_init (m_gnutls_session, GNUTLS_CLIENT);
216 // Use default priorities
217 gnutls_set_default_priority(*m_gnutls_session); // bug in gnutls interface?
218 // Need to enable anonymous specifically
219 gnutls_priority_set_direct(*m_gnutls_session, "NORMAL:+ANON-DH", NULL);
220 // put the anonymous credentials to the current session
221 gnutls_credentials_set(*m_gnutls_session, GNUTLS_CRD_ANON, m_gnutls_anoncred);
222 gnutls_transport_set_ptr(*m_gnutls_session, (gnutls_transport_ptr_t)m_fd);
223
224 // Perform the TLS handshake
225 if(gnutls_handshake(*m_gnutls_session) < 0)
226 {
227 fprintf(stderr, "Can't gnutls_handshake\n");
228 gnutls_deinit(*m_gnutls_session);
229 gnutls_anon_free_client_credentials(m_gnutls_anoncred);
230 error();
231 return 1;
232 }
233 #endif
234 if(*m_exitCount > 1)
235 return 3;
236 m_isTLS = true;
237
238 #if 0
239 // openssl debugging code that may be useful to have around
240 // in a commented-out state.
241 /* Following two steps are optional and not required for
242 data exchange to be successful. */
243
244 /* Get the cipher - opt */
245
246 printf ("SSL connection using %s\n", SSL_get_cipher(m_ssl));
247
248 /* Get server's certificate (note: beware of dynamic allocation) - opt */
249
250 X509 *server_cert;
251 server_cert = SSL_get_peer_certificate(m_ssl);
252 if(!server_cert)
253 {
254 fprintf(stderr, "Can't SSL_get_peer_certificate\n");
255 return 2;
256 }
257 printf ("Server certificate:\n");
258
259 char *str = X509_NAME_oneline(X509_get_subject_name(server_cert),0,0);
260 if(!str)
261 {
262 fprintf(stderr, "Can't X509_NAME_oneline\n");
263 return 2;
264 }
265 printf ("\t subject: %s\n", str);
266 Free (str);
267 str = X509_NAME_oneline (X509_get_issuer_name(server_cert),0,0);
268 if(!str)
269 {
270 fprintf(stderr, "Can't X509_get_issuer_name\n");
271 return 2;
272 }
273 printf ("\t issuer: %s\n", str);
274 Free (str);
275
276 /* We could do all sorts of certificate verification stuff here before
277 deallocating the certificate. */
278
279 X509_free(server_cert);
280 #endif
281 return 0;
282 }
283 #endif
284
285 int tcp::disconnect()
286 {
287 if(m_open)
288 {
289 #ifdef USE_SSL
290 if(m_isTLS)
291 {
292 #ifdef USE_OPENSSL
293 SSL_shutdown(m_ssl);
294 close(m_fd);
295 SSL_free(m_ssl);
296 SSL_CTX_free(m_sslCtx);
297 #else
298 gnutls_bye(*m_gnutls_session, GNUTLS_SHUT_RDWR);
299 close(m_fd);
300 gnutls_deinit(*m_gnutls_session);
301 gnutls_anon_free_client_credentials(m_gnutls_anoncred);
302 #endif
303 m_isTLS = false;
304 }
305 else
306 #endif
307 {
308 close(m_fd);
309 }
310 }
311 m_open = false;
312 return 0;
313 }
314
315 ERROR_TYPE tcp::sendData(CPCCHAR buf, int size)
316 {
317 if(!m_open)
318 return eCorrupt;
319 int sent = 0;
320 m_poll.events = POLLOUT | POLLERR | POLLHUP | POLLNVAL;
321 int rc;
322 while(sent != size)
323 {
324 if(*m_exitCount > 1)
325 return eCtrl_C;
326 rc = poll(&m_poll, 1, 60000);
327 if(rc == 0)
328 {
329 fprintf(stderr, "Server timed out on write.\n");
330 error();
331 return eTimeout;
332 }
333 if(rc < 0)
334 {
335 fprintf(stderr, "Poll error.\n");
336 error();
337 return eSocket;
338 }
339 #ifdef USE_SSL
340 if(m_isTLS)
341 {
342 #ifdef USE_OPENSSL
343 rc = SSL_write(m_ssl, &buf[sent], size - sent);
344 #else
345 rc = gnutls_record_send(*m_gnutls_session, &buf[sent], size - sent);
346 #endif
347 }
348 else
349 #endif
350 {
351 rc = write(m_fd, &buf[sent], size - sent);
352 }
353 if(rc < 1)
354 {
355 fprintf(stderr, "Can't write to socket.\n");
356 error();
357 return eSocket;
358 }
359 if(m_debug)
360 m_debug->Write(buf, rc);
361 sent += rc;
362 }
363 sentData(size);
364 return eNoError;
365 }
366
367 // fgets() doesn't
368 int tcp::readLine(char *buf, int bufSize)
369 {
370 if(!m_open)
371 return eCorrupt;
372 int ind = 0;
373 if(m_start < m_end)
374 {
375 do
376 {
377 buf[ind] = m_buf[m_start];
378 ind++;
379 m_start++;
380 }
381 while(m_start < m_end && m_buf[m_start - 1] != '\n' && ind < bufSize);
382 }
383 if(ind == bufSize || (ind > 0 && buf[ind - 1] == '\n') )
384 {
385 if(ind < bufSize)
386 buf[ind] = '\0';
387 receivedData(ind);
388 if(m_debug)
389 m_debug->Write(buf, ind);
390 return ind;
391 }
392 // buffer is empty
393 m_start = 0;
394 m_end = 0;
395
396 time_t now = time(NULL);
397 m_poll.events = POLLIN | POLLERR | POLLHUP | POLLNVAL;
398 while(1)
399 {
400 if(*m_exitCount > 1)
401 return 3;
402 int timeout = 60 - (time(NULL) - now);
403 int rc;
404 if(timeout < 0 || (rc = poll(&m_poll, 1, timeout * 1000)) == 0)
405 {
406 fprintf(stderr, "Server timed out on read.\n");
407 error();
408 return eSocket;
409 }
410 if(rc < 0)
411 {
412 if(errno == EINTR)
413 continue;
414 fprintf(stderr, "Poll error.\n");
415 error();
416 return eCorrupt;
417 }
418 #ifdef USE_SSL
419 if(m_isTLS)
420 {
421 #ifdef USE_OPENSSL
422 rc = SSL_read(m_ssl, m_buf, sizeof(m_buf));
423 #else
424 rc = gnutls_record_recv(*m_gnutls_session, m_buf, sizeof(m_buf));
425 #endif
426 }
427 else
428 #endif
429 {
430 rc = read(m_fd, m_buf, sizeof(m_buf));
431 }
432 if(rc < 0)
433 {
434 error();
435 return eSocket;
436 }
437 m_end = rc;
438 do
439 {
440 buf[ind] = m_buf[m_start];
441 ind++;
442 m_start++;
443 } while(m_start < m_end && m_buf[m_start - 1] != '\n' && ind < bufSize);
444
445 if(ind == bufSize || (ind > 0 && buf[ind - 1] == '\n') )
446 {
447 if(ind < bufSize)
448 buf[ind] = '\0';
449 receivedData(ind);
450 if(m_debug)
451 m_debug->Write(buf, ind);
452 return ind;
453 }
454 if(m_start == m_end)
455 {
456 m_start = 0;
457 m_end = 0;
458 }
459 }
460 return 0;
461 }
462
463 ERROR_TYPE tcp::sendCommandString(const string &str, bool important)
464 {
465 return sendCommandData(str.c_str(), str.size(), important);
466 }
467
468 ERROR_TYPE tcp::sendCommandData(const char *buf, int size, bool important)
469 {
470 ERROR_TYPE rc = sendData(buf, size);
471 if(rc)
472 return rc;
473 return readCommandResp(important);
474 }
475
476 void tcp::endIt()
477 {
478 if(m_open)
479 close(m_fd);
480 m_open = false;
481 }
482
483 int tcp::doAllWork(int rate)
484 {
485 double workCount = 0.0;
486 char data[2048];
487 time_t lastTime = time(NULL) - RESULTS_LAG;
488 int toSend;
489 for(unsigned int i = 0; i < sizeof(data); i++)
490 data[i] = 1;
491
492 while(1)
493 {
494 int rc;
495 if(rate)
496 {
497 time_t newTime = time(NULL);
498 workCount += double(newTime - lastTime) / 60.0 * double(rate);
499 toSend = int(workCount);
500 if(toSend > int(sizeof(data)) )
501 {
502 toSend = sizeof(data);
503 workCount = 0.0;
504 }
505 else
506 {
507 workCount -= double(toSend);
508 }
509 lastTime = newTime;
510 }
511 else
512 toSend = sizeof(data);
513 // NB if data can't be written then it is discarded.
514 // Buffer size will be at least 1024 bytes, if worker threads aren't
515 // keeping up then we don't really want more than that in the queue.
516 if(toSend)
517 {
518 rc = WriteWork(data, toSend, 5);
519 if(rc < 0)
520 return -rc;
521 }
522
523 if(*m_exitCount > 1)
524 return -1;
525 rc = pollRead();
526 if(rc)
527 return rc;
528 }
529 return 0;
530 }
531