"Fossies" - the Fresh Open Source Software Archive 
Member "wrk-4.2.0/src/wrk.c" (7 Feb 2021, 17037 Bytes) of package /linux/www/wrk-4.2.0.tar.gz:
As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style:
standard), with prefixed line numbers and
a code folding option.
Alternatively, you can
view or
download the uninterpreted source code file here.
For more information about "wrk.c" see the
Fossies "Dox" file reference documentation.
1 // Copyright (C) 2012 - Will Glozer. All rights reserved.
2
3 #include "wrk.h"
4 #include "script.h"
5 #include "main.h"
6
7 static struct config {
8 uint64_t connections;
9 uint64_t duration;
10 uint64_t threads;
11 uint64_t timeout;
12 uint64_t pipeline;
13 bool delay;
14 bool dynamic;
15 bool latency;
16 char *host;
17 char *script;
18 SSL_CTX *ctx;
19 } cfg;
20
21 static struct {
22 stats *latency;
23 stats *requests;
24 } statistics;
25
26 static struct sock sock = {
27 .connect = sock_connect,
28 .close = sock_close,
29 .read = sock_read,
30 .write = sock_write,
31 .readable = sock_readable
32 };
33
34 static struct http_parser_settings parser_settings = {
35 .on_message_complete = response_complete
36 };
37
// Set to 1 by the SIGINT handler, and by main() once the test duration has
// elapsed; record_rate() polls it and stops the event loop when it is set.
static volatile sig_atomic_t stop = 0;

// Signal handler installed for SIGINT by main(): requests shutdown.
static void handler(int sig) {
    (void) sig;     // the signal number is not needed
    stop = 1;
}
43
// Print the command-line help text to stdout.
static void usage() {
    // The text contains no conversion specifiers, so it is written verbatim.
    static const char help[] =
        "Usage: wrk <options> <url> \n"
        " Options: \n"
        " -c, --connections <N> Connections to keep open \n"
        " -d, --duration <T> Duration of test \n"
        " -t, --threads <N> Number of threads to use \n"
        " \n"
        " -s, --script <S> Load Lua script file \n"
        " -H, --header <H> Add header to request \n"
        " --latency Print latency statistics \n"
        " --timeout <T> Socket/request timeout \n"
        " -v, --version Print version details \n"
        " \n"
        " Numeric arguments may include a SI unit (1k, 1M, 1G)\n"
        " Time arguments may include a time unit (2s, 2m, 2h)\n";

    fputs(help, stdout);
}
60
61 int main(int argc, char **argv) {
62 char *url, **headers = zmalloc(argc * sizeof(char *));
63 struct http_parser_url parts = {};
64
65 if (parse_args(&cfg, &url, &parts, headers, argc, argv)) {
66 usage();
67 exit(1);
68 }
69
70 char *schema = copy_url_part(url, &parts, UF_SCHEMA);
71 char *host = copy_url_part(url, &parts, UF_HOST);
72 char *port = copy_url_part(url, &parts, UF_PORT);
73 char *service = port ? port : schema;
74
75 if (!strncmp("https", schema, 5)) {
76 if ((cfg.ctx = ssl_init()) == NULL) {
77 fprintf(stderr, "unable to initialize SSL\n");
78 ERR_print_errors_fp(stderr);
79 exit(1);
80 }
81 sock.connect = ssl_connect;
82 sock.close = ssl_close;
83 sock.read = ssl_read;
84 sock.write = ssl_write;
85 sock.readable = ssl_readable;
86 }
87
88 signal(SIGPIPE, SIG_IGN);
89 signal(SIGINT, SIG_IGN);
90
91 statistics.latency = stats_alloc(cfg.timeout * 1000);
92 statistics.requests = stats_alloc(MAX_THREAD_RATE_S);
93 thread *threads = zcalloc(cfg.threads * sizeof(thread));
94
95 lua_State *L = script_create(cfg.script, url, headers);
96 if (!script_resolve(L, host, service)) {
97 char *msg = strerror(errno);
98 fprintf(stderr, "unable to connect to %s:%s %s\n", host, service, msg);
99 exit(1);
100 }
101
102 cfg.host = host;
103
104 for (uint64_t i = 0; i < cfg.threads; i++) {
105 thread *t = &threads[i];
106 t->loop = aeCreateEventLoop(10 + cfg.connections * 3);
107 t->connections = cfg.connections / cfg.threads;
108
109 t->L = script_create(cfg.script, url, headers);
110 script_init(L, t, argc - optind, &argv[optind]);
111
112 if (i == 0) {
113 cfg.pipeline = script_verify_request(t->L);
114 cfg.dynamic = !script_is_static(t->L);
115 cfg.delay = script_has_delay(t->L);
116 if (script_want_response(t->L)) {
117 parser_settings.on_header_field = header_field;
118 parser_settings.on_header_value = header_value;
119 parser_settings.on_body = response_body;
120 }
121 }
122
123 if (!t->loop || pthread_create(&t->thread, NULL, &thread_main, t)) {
124 char *msg = strerror(errno);
125 fprintf(stderr, "unable to create thread %"PRIu64": %s\n", i, msg);
126 exit(2);
127 }
128 }
129
130 struct sigaction sa = {
131 .sa_handler = handler,
132 .sa_flags = 0,
133 };
134 sigfillset(&sa.sa_mask);
135 sigaction(SIGINT, &sa, NULL);
136
137 char *time = format_time_s(cfg.duration);
138 printf("Running %s test @ %s\n", time, url);
139 printf(" %"PRIu64" threads and %"PRIu64" connections\n", cfg.threads, cfg.connections);
140
141 uint64_t start = time_us();
142 uint64_t complete = 0;
143 uint64_t bytes = 0;
144 errors errors = { 0 };
145
146 sleep(cfg.duration);
147 stop = 1;
148
149 for (uint64_t i = 0; i < cfg.threads; i++) {
150 thread *t = &threads[i];
151 pthread_join(t->thread, NULL);
152
153 complete += t->complete;
154 bytes += t->bytes;
155
156 errors.connect += t->errors.connect;
157 errors.read += t->errors.read;
158 errors.write += t->errors.write;
159 errors.timeout += t->errors.timeout;
160 errors.status += t->errors.status;
161 }
162
163 uint64_t runtime_us = time_us() - start;
164 long double runtime_s = runtime_us / 1000000.0;
165 long double req_per_s = complete / runtime_s;
166 long double bytes_per_s = bytes / runtime_s;
167
168 if (complete / cfg.connections > 0) {
169 int64_t interval = runtime_us / (complete / cfg.connections);
170 stats_correct(statistics.latency, interval);
171 }
172
173 print_stats_header();
174 print_stats("Latency", statistics.latency, format_time_us);
175 print_stats("Req/Sec", statistics.requests, format_metric);
176 if (cfg.latency) print_stats_latency(statistics.latency);
177
178 char *runtime_msg = format_time_us(runtime_us);
179
180 printf(" %"PRIu64" requests in %s, %sB read\n", complete, runtime_msg, format_binary(bytes));
181 if (errors.connect || errors.read || errors.write || errors.timeout) {
182 printf(" Socket errors: connect %d, read %d, write %d, timeout %d\n",
183 errors.connect, errors.read, errors.write, errors.timeout);
184 }
185
186 if (errors.status) {
187 printf(" Non-2xx or 3xx responses: %d\n", errors.status);
188 }
189
190 printf("Requests/sec: %9.2Lf\n", req_per_s);
191 printf("Transfer/sec: %10sB\n", format_binary(bytes_per_s));
192
193 if (script_has_done(L)) {
194 script_summary(L, runtime_us, complete, bytes);
195 script_errors(L, &errors);
196 script_done(L, statistics.latency, statistics.requests);
197 }
198
199 return 0;
200 }
201
202 void *thread_main(void *arg) {
203 thread *thread = arg;
204
205 char *request = NULL;
206 size_t length = 0;
207
208 if (!cfg.dynamic) {
209 script_request(thread->L, &request, &length);
210 }
211
212 thread->cs = zcalloc(thread->connections * sizeof(connection));
213 connection *c = thread->cs;
214
215 for (uint64_t i = 0; i < thread->connections; i++, c++) {
216 c->thread = thread;
217 c->ssl = cfg.ctx ? SSL_new(cfg.ctx) : NULL;
218 c->request = request;
219 c->length = length;
220 c->delayed = cfg.delay;
221 connect_socket(thread, c);
222 }
223
224 aeEventLoop *loop = thread->loop;
225 aeCreateTimeEvent(loop, RECORD_INTERVAL_MS, record_rate, thread, NULL);
226
227 thread->start = time_us();
228 aeMain(loop);
229
230 aeDeleteEventLoop(loop);
231 zfree(thread->cs);
232
233 return NULL;
234 }
235
236 static int connect_socket(thread *thread, connection *c) {
237 struct addrinfo *addr = thread->addr;
238 struct aeEventLoop *loop = thread->loop;
239 int fd, flags;
240
241 fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
242
243 flags = fcntl(fd, F_GETFL, 0);
244 fcntl(fd, F_SETFL, flags | O_NONBLOCK);
245
246 if (connect(fd, addr->ai_addr, addr->ai_addrlen) == -1) {
247 if (errno != EINPROGRESS) goto error;
248 }
249
250 flags = 1;
251 setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
252
253 flags = AE_READABLE | AE_WRITABLE;
254 if (aeCreateFileEvent(loop, fd, flags, socket_connected, c) == AE_OK) {
255 c->parser.data = c;
256 c->fd = fd;
257 return fd;
258 }
259
260 error:
261 thread->errors.connect++;
262 close(fd);
263 return -1;
264 }
265
266 static int reconnect_socket(thread *thread, connection *c) {
267 aeDeleteFileEvent(thread->loop, c->fd, AE_WRITABLE | AE_READABLE);
268 sock.close(c);
269 close(c->fd);
270 return connect_socket(thread, c);
271 }
272
273 static int record_rate(aeEventLoop *loop, long long id, void *data) {
274 thread *thread = data;
275
276 if (thread->requests > 0) {
277 uint64_t elapsed_ms = (time_us() - thread->start) / 1000;
278 uint64_t requests = (thread->requests / (double) elapsed_ms) * 1000;
279
280 stats_record(statistics.requests, requests);
281
282 thread->requests = 0;
283 thread->start = time_us();
284 }
285
286 if (stop) aeStop(loop);
287
288 return RECORD_INTERVAL_MS;
289 }
290
291 static int delay_request(aeEventLoop *loop, long long id, void *data) {
292 connection *c = data;
293 c->delayed = false;
294 aeCreateFileEvent(loop, c->fd, AE_WRITABLE, socket_writeable, c);
295 return AE_NOMORE;
296 }
297
298 static int header_field(http_parser *parser, const char *at, size_t len) {
299 connection *c = parser->data;
300 if (c->state == VALUE) {
301 *c->headers.cursor++ = '\0';
302 c->state = FIELD;
303 }
304 buffer_append(&c->headers, at, len);
305 return 0;
306 }
307
308 static int header_value(http_parser *parser, const char *at, size_t len) {
309 connection *c = parser->data;
310 if (c->state == FIELD) {
311 *c->headers.cursor++ = '\0';
312 c->state = VALUE;
313 }
314 buffer_append(&c->headers, at, len);
315 return 0;
316 }
317
318 static int response_body(http_parser *parser, const char *at, size_t len) {
319 connection *c = parser->data;
320 buffer_append(&c->body, at, len);
321 return 0;
322 }
323
324 static int response_complete(http_parser *parser) {
325 connection *c = parser->data;
326 thread *thread = c->thread;
327 uint64_t now = time_us();
328 int status = parser->status_code;
329
330 thread->complete++;
331 thread->requests++;
332
333 if (status > 399) {
334 thread->errors.status++;
335 }
336
337 if (c->headers.buffer) {
338 *c->headers.cursor++ = '\0';
339 script_response(thread->L, status, &c->headers, &c->body);
340 c->state = FIELD;
341 }
342
343 if (--c->pending == 0) {
344 if (!stats_record(statistics.latency, now - c->start)) {
345 thread->errors.timeout++;
346 }
347 c->delayed = cfg.delay;
348 aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
349 }
350
351 if (!http_should_keep_alive(parser)) {
352 reconnect_socket(thread, c);
353 goto done;
354 }
355
356 http_parser_init(parser, HTTP_RESPONSE);
357
358 done:
359 return 0;
360 }
361
362 static void socket_connected(aeEventLoop *loop, int fd, void *data, int mask) {
363 connection *c = data;
364
365 switch (sock.connect(c, cfg.host)) {
366 case OK: break;
367 case ERROR: goto error;
368 case RETRY: return;
369 }
370
371 http_parser_init(&c->parser, HTTP_RESPONSE);
372 c->written = 0;
373
374 aeCreateFileEvent(c->thread->loop, fd, AE_READABLE, socket_readable, c);
375 aeCreateFileEvent(c->thread->loop, fd, AE_WRITABLE, socket_writeable, c);
376
377 return;
378
379 error:
380 c->thread->errors.connect++;
381 reconnect_socket(c->thread, c);
382 }
383
384 static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) {
385 connection *c = data;
386 thread *thread = c->thread;
387
388 if (c->delayed) {
389 uint64_t delay = script_delay(thread->L);
390 aeDeleteFileEvent(loop, fd, AE_WRITABLE);
391 aeCreateTimeEvent(loop, delay, delay_request, c, NULL);
392 return;
393 }
394
395 if (!c->written) {
396 if (cfg.dynamic) {
397 script_request(thread->L, &c->request, &c->length);
398 }
399 c->start = time_us();
400 c->pending = cfg.pipeline;
401 }
402
403 char *buf = c->request + c->written;
404 size_t len = c->length - c->written;
405 size_t n;
406
407 switch (sock.write(c, buf, len, &n)) {
408 case OK: break;
409 case ERROR: goto error;
410 case RETRY: return;
411 }
412
413 c->written += n;
414 if (c->written == c->length) {
415 c->written = 0;
416 aeDeleteFileEvent(loop, fd, AE_WRITABLE);
417 }
418
419 return;
420
421 error:
422 thread->errors.write++;
423 reconnect_socket(thread, c);
424 }
425
426 static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
427 connection *c = data;
428 size_t n;
429
430 do {
431 switch (sock.read(c, &n)) {
432 case OK: break;
433 case ERROR: goto error;
434 case RETRY: return;
435 }
436
437 if (http_parser_execute(&c->parser, &parser_settings, c->buf, n) != n) goto error;
438 if (n == 0 && !http_body_is_final(&c->parser)) goto error;
439
440 c->thread->bytes += n;
441 } while (n == RECVBUF && sock.readable(c) > 0);
442
443 return;
444
445 error:
446 c->thread->errors.read++;
447 reconnect_socket(c->thread, c);
448 }
449
450 static uint64_t time_us() {
451 struct timeval t;
452 gettimeofday(&t, NULL);
453 return (t.tv_sec * 1000000) + t.tv_usec;
454 }
455
456 static char *copy_url_part(char *url, struct http_parser_url *parts, enum http_parser_url_fields field) {
457 char *part = NULL;
458
459 if (parts->field_set & (1 << field)) {
460 uint16_t off = parts->field_data[field].off;
461 uint16_t len = parts->field_data[field].len;
462 part = zcalloc(len + 1 * sizeof(char));
463 memcpy(part, &url[off], len);
464 }
465
466 return part;
467 }
468
// Long option table for getopt_long(). Each entry maps to the character
// that parse_args() switches on.
static struct option longopts[] = {
    { "connections", required_argument, NULL, 'c' },
    { "duration",    required_argument, NULL, 'd' },
    { "threads",     required_argument, NULL, 't' },
    { "script",      required_argument, NULL, 's' },
    { "header",      required_argument, NULL, 'H' },
    { "latency",     no_argument,       NULL, 'L' },
    { "timeout",     required_argument, NULL, 'T' },
    { "help",        no_argument,       NULL, 'h' },
    { "version",     no_argument,       NULL, 'v' },
    { NULL,          0,                 NULL,  0  }     // end-of-table marker
};
481
482 static int parse_args(struct config *cfg, char **url, struct http_parser_url *parts, char **headers, int argc, char **argv) {
483 char **header = headers;
484 int c;
485
486 memset(cfg, 0, sizeof(struct config));
487 cfg->threads = 2;
488 cfg->connections = 10;
489 cfg->duration = 10;
490 cfg->timeout = SOCKET_TIMEOUT_MS;
491
492 while ((c = getopt_long(argc, argv, "t:c:d:s:H:T:Lrv?", longopts, NULL)) != -1) {
493 switch (c) {
494 case 't':
495 if (scan_metric(optarg, &cfg->threads)) return -1;
496 break;
497 case 'c':
498 if (scan_metric(optarg, &cfg->connections)) return -1;
499 break;
500 case 'd':
501 if (scan_time(optarg, &cfg->duration)) return -1;
502 break;
503 case 's':
504 cfg->script = optarg;
505 break;
506 case 'H':
507 *header++ = optarg;
508 break;
509 case 'L':
510 cfg->latency = true;
511 break;
512 case 'T':
513 if (scan_time(optarg, &cfg->timeout)) return -1;
514 cfg->timeout *= 1000;
515 break;
516 case 'v':
517 printf("wrk %s [%s] ", VERSION, aeGetApiName());
518 printf("Copyright (C) 2012 Will Glozer\n");
519 break;
520 case 'h':
521 case '?':
522 case ':':
523 default:
524 return -1;
525 }
526 }
527
528 if (optind == argc || !cfg->threads || !cfg->duration) return -1;
529
530 if (!script_parse_url(argv[optind], parts)) {
531 fprintf(stderr, "invalid URL: %s\n", argv[optind]);
532 return -1;
533 }
534
535 if (!cfg->connections || cfg->connections < cfg->threads) {
536 fprintf(stderr, "number of connections must be >= threads\n");
537 return -1;
538 }
539
540 *url = argv[optind];
541 *header = NULL;
542
543 return 0;
544 }
545
// Print the column titles for the per-thread statistics table.
static void print_stats_header() {
    const char *avg    = "Avg";
    const char *stdev  = "Stdev";
    const char *max    = "Max";
    const char *within = "+/- Stdev";

    printf(" Thread Stats%6s%11s%8s%12s\n", avg, stdev, max, within);
}
549
// Print one value, formatted by fmt, inside a column `width` characters wide.
//
// n     - value to print
// fmt   - formatter returning a string that this function releases with free()
// width - total column width
//
// The text is right-aligned and followed by two, one or no spaces depending
// on whether its last two, one or no characters are letters, so that values
// with shorter unit suffixes still line up.
static void print_units(long double n, char *(*fmt)(long double), int width) {
    char *msg = fmt(n);
    size_t len = strlen(msg);
    int pad = 2;

    // Check the length before looking at the trailing characters, and pass
    // them to isalpha() as unsigned char as <ctype.h> requires.
    if (len >= 1 && isalpha((unsigned char) msg[len - 1])) pad--;
    if (len >= 2 && isalpha((unsigned char) msg[len - 2])) pad--;
    width -= pad;

    // Pad via a field width on an empty string, so exactly `pad` spaces are
    // written instead of being capped by the length of a literal.
    printf("%*.*s%*s", width, width, msg, pad, "");

    free(msg);
}
562
563 static void print_stats(char *name, stats *stats, char *(*fmt)(long double)) {
564 uint64_t max = stats->max;
565 long double mean = stats_mean(stats);
566 long double stdev = stats_stdev(stats, mean);
567
568 printf(" %-10s", name);
569 print_units(mean, fmt, 8);
570 print_units(stdev, fmt, 10);
571 print_units(max, fmt, 9);
572 printf("%8.2Lf%%\n", stats_within_stdev(stats, mean, stdev, 1));
573 }
574
575 static void print_stats_latency(stats *stats) {
576 long double percentiles[] = { 50.0, 75.0, 90.0, 99.0 };
577 printf(" Latency Distribution\n");
578 for (size_t i = 0; i < sizeof(percentiles) / sizeof(long double); i++) {
579 long double p = percentiles[i];
580 uint64_t n = stats_percentile(stats, p);
581 printf("%7.0Lf%%", p);
582 print_units(n, format_time_us, 10);
583 printf("\n");
584 }
585 }