"Fossies" - the Fresh Open Source Software Archive

Member "wrk-4.2.0/src/wrk.c" (7 Feb 2021, 17037 Bytes) of package /linux/www/wrk-4.2.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "wrk.c", see the Fossies "Dox" file reference documentation.

    1 // Copyright (C) 2012 - Will Glozer.  All rights reserved.
    2 
    3 #include "wrk.h"
    4 #include "script.h"
    5 #include "main.h"
    6 
    7 static struct config {
    8     uint64_t connections;
    9     uint64_t duration;
   10     uint64_t threads;
   11     uint64_t timeout;
   12     uint64_t pipeline;
   13     bool     delay;
   14     bool     dynamic;
   15     bool     latency;
   16     char    *host;
   17     char    *script;
   18     SSL_CTX *ctx;
   19 } cfg;
   20 
   21 static struct {
   22     stats *latency;
   23     stats *requests;
   24 } statistics;
   25 
   26 static struct sock sock = {
   27     .connect  = sock_connect,
   28     .close    = sock_close,
   29     .read     = sock_read,
   30     .write    = sock_write,
   31     .readable = sock_readable
   32 };
   33 
   34 static struct http_parser_settings parser_settings = {
   35     .on_message_complete = response_complete
   36 };
   37 
   38 static volatile sig_atomic_t stop = 0;
   39 
   40 static void handler(int sig) {
   41     stop = 1;
   42 }
   43 
   44 static void usage() {
   45     printf("Usage: wrk <options> <url>                            \n"
   46            "  Options:                                            \n"
   47            "    -c, --connections <N>  Connections to keep open   \n"
   48            "    -d, --duration    <T>  Duration of test           \n"
   49            "    -t, --threads     <N>  Number of threads to use   \n"
   50            "                                                      \n"
   51            "    -s, --script      <S>  Load Lua script file       \n"
   52            "    -H, --header      <H>  Add header to request      \n"
   53            "        --latency          Print latency statistics   \n"
   54            "        --timeout     <T>  Socket/request timeout     \n"
   55            "    -v, --version          Print version details      \n"
   56            "                                                      \n"
   57            "  Numeric arguments may include a SI unit (1k, 1M, 1G)\n"
   58            "  Time arguments may include a time unit (2s, 2m, 2h)\n");
   59 }
   60 
   61 int main(int argc, char **argv) {
   62     char *url, **headers = zmalloc(argc * sizeof(char *));
   63     struct http_parser_url parts = {};
   64 
   65     if (parse_args(&cfg, &url, &parts, headers, argc, argv)) {
   66         usage();
   67         exit(1);
   68     }
   69 
   70     char *schema  = copy_url_part(url, &parts, UF_SCHEMA);
   71     char *host    = copy_url_part(url, &parts, UF_HOST);
   72     char *port    = copy_url_part(url, &parts, UF_PORT);
   73     char *service = port ? port : schema;
   74 
   75     if (!strncmp("https", schema, 5)) {
   76         if ((cfg.ctx = ssl_init()) == NULL) {
   77             fprintf(stderr, "unable to initialize SSL\n");
   78             ERR_print_errors_fp(stderr);
   79             exit(1);
   80         }
   81         sock.connect  = ssl_connect;
   82         sock.close    = ssl_close;
   83         sock.read     = ssl_read;
   84         sock.write    = ssl_write;
   85         sock.readable = ssl_readable;
   86     }
   87 
   88     signal(SIGPIPE, SIG_IGN);
   89     signal(SIGINT,  SIG_IGN);
   90 
   91     statistics.latency  = stats_alloc(cfg.timeout * 1000);
   92     statistics.requests = stats_alloc(MAX_THREAD_RATE_S);
   93     thread *threads     = zcalloc(cfg.threads * sizeof(thread));
   94 
   95     lua_State *L = script_create(cfg.script, url, headers);
   96     if (!script_resolve(L, host, service)) {
   97         char *msg = strerror(errno);
   98         fprintf(stderr, "unable to connect to %s:%s %s\n", host, service, msg);
   99         exit(1);
  100     }
  101 
  102     cfg.host = host;
  103 
  104     for (uint64_t i = 0; i < cfg.threads; i++) {
  105         thread *t      = &threads[i];
  106         t->loop        = aeCreateEventLoop(10 + cfg.connections * 3);
  107         t->connections = cfg.connections / cfg.threads;
  108 
  109         t->L = script_create(cfg.script, url, headers);
  110         script_init(L, t, argc - optind, &argv[optind]);
  111 
  112         if (i == 0) {
  113             cfg.pipeline = script_verify_request(t->L);
  114             cfg.dynamic  = !script_is_static(t->L);
  115             cfg.delay    = script_has_delay(t->L);
  116             if (script_want_response(t->L)) {
  117                 parser_settings.on_header_field = header_field;
  118                 parser_settings.on_header_value = header_value;
  119                 parser_settings.on_body         = response_body;
  120             }
  121         }
  122 
  123         if (!t->loop || pthread_create(&t->thread, NULL, &thread_main, t)) {
  124             char *msg = strerror(errno);
  125             fprintf(stderr, "unable to create thread %"PRIu64": %s\n", i, msg);
  126             exit(2);
  127         }
  128     }
  129 
  130     struct sigaction sa = {
  131         .sa_handler = handler,
  132         .sa_flags   = 0,
  133     };
  134     sigfillset(&sa.sa_mask);
  135     sigaction(SIGINT, &sa, NULL);
  136 
  137     char *time = format_time_s(cfg.duration);
  138     printf("Running %s test @ %s\n", time, url);
  139     printf("  %"PRIu64" threads and %"PRIu64" connections\n", cfg.threads, cfg.connections);
  140 
  141     uint64_t start    = time_us();
  142     uint64_t complete = 0;
  143     uint64_t bytes    = 0;
  144     errors errors     = { 0 };
  145 
  146     sleep(cfg.duration);
  147     stop = 1;
  148 
  149     for (uint64_t i = 0; i < cfg.threads; i++) {
  150         thread *t = &threads[i];
  151         pthread_join(t->thread, NULL);
  152 
  153         complete += t->complete;
  154         bytes    += t->bytes;
  155 
  156         errors.connect += t->errors.connect;
  157         errors.read    += t->errors.read;
  158         errors.write   += t->errors.write;
  159         errors.timeout += t->errors.timeout;
  160         errors.status  += t->errors.status;
  161     }
  162 
  163     uint64_t runtime_us = time_us() - start;
  164     long double runtime_s   = runtime_us / 1000000.0;
  165     long double req_per_s   = complete   / runtime_s;
  166     long double bytes_per_s = bytes      / runtime_s;
  167 
  168     if (complete / cfg.connections > 0) {
  169         int64_t interval = runtime_us / (complete / cfg.connections);
  170         stats_correct(statistics.latency, interval);
  171     }
  172 
  173     print_stats_header();
  174     print_stats("Latency", statistics.latency, format_time_us);
  175     print_stats("Req/Sec", statistics.requests, format_metric);
  176     if (cfg.latency) print_stats_latency(statistics.latency);
  177 
  178     char *runtime_msg = format_time_us(runtime_us);
  179 
  180     printf("  %"PRIu64" requests in %s, %sB read\n", complete, runtime_msg, format_binary(bytes));
  181     if (errors.connect || errors.read || errors.write || errors.timeout) {
  182         printf("  Socket errors: connect %d, read %d, write %d, timeout %d\n",
  183                errors.connect, errors.read, errors.write, errors.timeout);
  184     }
  185 
  186     if (errors.status) {
  187         printf("  Non-2xx or 3xx responses: %d\n", errors.status);
  188     }
  189 
  190     printf("Requests/sec: %9.2Lf\n", req_per_s);
  191     printf("Transfer/sec: %10sB\n", format_binary(bytes_per_s));
  192 
  193     if (script_has_done(L)) {
  194         script_summary(L, runtime_us, complete, bytes);
  195         script_errors(L, &errors);
  196         script_done(L, statistics.latency, statistics.requests);
  197     }
  198 
  199     return 0;
  200 }
  201 
  202 void *thread_main(void *arg) {
  203     thread *thread = arg;
  204 
  205     char *request = NULL;
  206     size_t length = 0;
  207 
  208     if (!cfg.dynamic) {
  209         script_request(thread->L, &request, &length);
  210     }
  211 
  212     thread->cs = zcalloc(thread->connections * sizeof(connection));
  213     connection *c = thread->cs;
  214 
  215     for (uint64_t i = 0; i < thread->connections; i++, c++) {
  216         c->thread = thread;
  217         c->ssl     = cfg.ctx ? SSL_new(cfg.ctx) : NULL;
  218         c->request = request;
  219         c->length  = length;
  220         c->delayed = cfg.delay;
  221         connect_socket(thread, c);
  222     }
  223 
  224     aeEventLoop *loop = thread->loop;
  225     aeCreateTimeEvent(loop, RECORD_INTERVAL_MS, record_rate, thread, NULL);
  226 
  227     thread->start = time_us();
  228     aeMain(loop);
  229 
  230     aeDeleteEventLoop(loop);
  231     zfree(thread->cs);
  232 
  233     return NULL;
  234 }
  235 
  236 static int connect_socket(thread *thread, connection *c) {
  237     struct addrinfo *addr = thread->addr;
  238     struct aeEventLoop *loop = thread->loop;
  239     int fd, flags;
  240 
  241     fd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
  242 
  243     flags = fcntl(fd, F_GETFL, 0);
  244     fcntl(fd, F_SETFL, flags | O_NONBLOCK);
  245 
  246     if (connect(fd, addr->ai_addr, addr->ai_addrlen) == -1) {
  247         if (errno != EINPROGRESS) goto error;
  248     }
  249 
  250     flags = 1;
  251     setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
  252 
  253     flags = AE_READABLE | AE_WRITABLE;
  254     if (aeCreateFileEvent(loop, fd, flags, socket_connected, c) == AE_OK) {
  255         c->parser.data = c;
  256         c->fd = fd;
  257         return fd;
  258     }
  259 
  260   error:
  261     thread->errors.connect++;
  262     close(fd);
  263     return -1;
  264 }
  265 
  266 static int reconnect_socket(thread *thread, connection *c) {
  267     aeDeleteFileEvent(thread->loop, c->fd, AE_WRITABLE | AE_READABLE);
  268     sock.close(c);
  269     close(c->fd);
  270     return connect_socket(thread, c);
  271 }
  272 
  273 static int record_rate(aeEventLoop *loop, long long id, void *data) {
  274     thread *thread = data;
  275 
  276     if (thread->requests > 0) {
  277         uint64_t elapsed_ms = (time_us() - thread->start) / 1000;
  278         uint64_t requests = (thread->requests / (double) elapsed_ms) * 1000;
  279 
  280         stats_record(statistics.requests, requests);
  281 
  282         thread->requests = 0;
  283         thread->start    = time_us();
  284     }
  285 
  286     if (stop) aeStop(loop);
  287 
  288     return RECORD_INTERVAL_MS;
  289 }
  290 
  291 static int delay_request(aeEventLoop *loop, long long id, void *data) {
  292     connection *c = data;
  293     c->delayed = false;
  294     aeCreateFileEvent(loop, c->fd, AE_WRITABLE, socket_writeable, c);
  295     return AE_NOMORE;
  296 }
  297 
  298 static int header_field(http_parser *parser, const char *at, size_t len) {
  299     connection *c = parser->data;
  300     if (c->state == VALUE) {
  301         *c->headers.cursor++ = '\0';
  302         c->state = FIELD;
  303     }
  304     buffer_append(&c->headers, at, len);
  305     return 0;
  306 }
  307 
  308 static int header_value(http_parser *parser, const char *at, size_t len) {
  309     connection *c = parser->data;
  310     if (c->state == FIELD) {
  311         *c->headers.cursor++ = '\0';
  312         c->state = VALUE;
  313     }
  314     buffer_append(&c->headers, at, len);
  315     return 0;
  316 }
  317 
  318 static int response_body(http_parser *parser, const char *at, size_t len) {
  319     connection *c = parser->data;
  320     buffer_append(&c->body, at, len);
  321     return 0;
  322 }
  323 
  324 static int response_complete(http_parser *parser) {
  325     connection *c = parser->data;
  326     thread *thread = c->thread;
  327     uint64_t now = time_us();
  328     int status = parser->status_code;
  329 
  330     thread->complete++;
  331     thread->requests++;
  332 
  333     if (status > 399) {
  334         thread->errors.status++;
  335     }
  336 
  337     if (c->headers.buffer) {
  338         *c->headers.cursor++ = '\0';
  339         script_response(thread->L, status, &c->headers, &c->body);
  340         c->state = FIELD;
  341     }
  342 
  343     if (--c->pending == 0) {
  344         if (!stats_record(statistics.latency, now - c->start)) {
  345             thread->errors.timeout++;
  346         }
  347         c->delayed = cfg.delay;
  348         aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
  349     }
  350 
  351     if (!http_should_keep_alive(parser)) {
  352         reconnect_socket(thread, c);
  353         goto done;
  354     }
  355 
  356     http_parser_init(parser, HTTP_RESPONSE);
  357 
  358   done:
  359     return 0;
  360 }
  361 
  362 static void socket_connected(aeEventLoop *loop, int fd, void *data, int mask) {
  363     connection *c = data;
  364 
  365     switch (sock.connect(c, cfg.host)) {
  366         case OK:    break;
  367         case ERROR: goto error;
  368         case RETRY: return;
  369     }
  370 
  371     http_parser_init(&c->parser, HTTP_RESPONSE);
  372     c->written = 0;
  373 
  374     aeCreateFileEvent(c->thread->loop, fd, AE_READABLE, socket_readable, c);
  375     aeCreateFileEvent(c->thread->loop, fd, AE_WRITABLE, socket_writeable, c);
  376 
  377     return;
  378 
  379   error:
  380     c->thread->errors.connect++;
  381     reconnect_socket(c->thread, c);
  382 }
  383 
  384 static void socket_writeable(aeEventLoop *loop, int fd, void *data, int mask) {
  385     connection *c = data;
  386     thread *thread = c->thread;
  387 
  388     if (c->delayed) {
  389         uint64_t delay = script_delay(thread->L);
  390         aeDeleteFileEvent(loop, fd, AE_WRITABLE);
  391         aeCreateTimeEvent(loop, delay, delay_request, c, NULL);
  392         return;
  393     }
  394 
  395     if (!c->written) {
  396         if (cfg.dynamic) {
  397             script_request(thread->L, &c->request, &c->length);
  398         }
  399         c->start   = time_us();
  400         c->pending = cfg.pipeline;
  401     }
  402 
  403     char  *buf = c->request + c->written;
  404     size_t len = c->length  - c->written;
  405     size_t n;
  406 
  407     switch (sock.write(c, buf, len, &n)) {
  408         case OK:    break;
  409         case ERROR: goto error;
  410         case RETRY: return;
  411     }
  412 
  413     c->written += n;
  414     if (c->written == c->length) {
  415         c->written = 0;
  416         aeDeleteFileEvent(loop, fd, AE_WRITABLE);
  417     }
  418 
  419     return;
  420 
  421   error:
  422     thread->errors.write++;
  423     reconnect_socket(thread, c);
  424 }
  425 
  426 static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
  427     connection *c = data;
  428     size_t n;
  429 
  430     do {
  431         switch (sock.read(c, &n)) {
  432             case OK:    break;
  433             case ERROR: goto error;
  434             case RETRY: return;
  435         }
  436 
  437         if (http_parser_execute(&c->parser, &parser_settings, c->buf, n) != n) goto error;
  438         if (n == 0 && !http_body_is_final(&c->parser)) goto error;
  439 
  440         c->thread->bytes += n;
  441     } while (n == RECVBUF && sock.readable(c) > 0);
  442 
  443     return;
  444 
  445   error:
  446     c->thread->errors.read++;
  447     reconnect_socket(c->thread, c);
  448 }
  449 
  450 static uint64_t time_us() {
  451     struct timeval t;
  452     gettimeofday(&t, NULL);
  453     return (t.tv_sec * 1000000) + t.tv_usec;
  454 }
  455 
  456 static char *copy_url_part(char *url, struct http_parser_url *parts, enum http_parser_url_fields field) {
  457     char *part = NULL;
  458 
  459     if (parts->field_set & (1 << field)) {
  460         uint16_t off = parts->field_data[field].off;
  461         uint16_t len = parts->field_data[field].len;
  462         part = zcalloc(len + 1 * sizeof(char));
  463         memcpy(part, &url[off], len);
  464     }
  465 
  466     return part;
  467 }
  468 
  469 static struct option longopts[] = {
  470     { "connections", required_argument, NULL, 'c' },
  471     { "duration",    required_argument, NULL, 'd' },
  472     { "threads",     required_argument, NULL, 't' },
  473     { "script",      required_argument, NULL, 's' },
  474     { "header",      required_argument, NULL, 'H' },
  475     { "latency",     no_argument,       NULL, 'L' },
  476     { "timeout",     required_argument, NULL, 'T' },
  477     { "help",        no_argument,       NULL, 'h' },
  478     { "version",     no_argument,       NULL, 'v' },
  479     { NULL,          0,                 NULL,  0  }
  480 };
  481 
  482 static int parse_args(struct config *cfg, char **url, struct http_parser_url *parts, char **headers, int argc, char **argv) {
  483     char **header = headers;
  484     int c;
  485 
  486     memset(cfg, 0, sizeof(struct config));
  487     cfg->threads     = 2;
  488     cfg->connections = 10;
  489     cfg->duration    = 10;
  490     cfg->timeout     = SOCKET_TIMEOUT_MS;
  491 
  492     while ((c = getopt_long(argc, argv, "t:c:d:s:H:T:Lrv?", longopts, NULL)) != -1) {
  493         switch (c) {
  494             case 't':
  495                 if (scan_metric(optarg, &cfg->threads)) return -1;
  496                 break;
  497             case 'c':
  498                 if (scan_metric(optarg, &cfg->connections)) return -1;
  499                 break;
  500             case 'd':
  501                 if (scan_time(optarg, &cfg->duration)) return -1;
  502                 break;
  503             case 's':
  504                 cfg->script = optarg;
  505                 break;
  506             case 'H':
  507                 *header++ = optarg;
  508                 break;
  509             case 'L':
  510                 cfg->latency = true;
  511                 break;
  512             case 'T':
  513                 if (scan_time(optarg, &cfg->timeout)) return -1;
  514                 cfg->timeout *= 1000;
  515                 break;
  516             case 'v':
  517                 printf("wrk %s [%s] ", VERSION, aeGetApiName());
  518                 printf("Copyright (C) 2012 Will Glozer\n");
  519                 break;
  520             case 'h':
  521             case '?':
  522             case ':':
  523             default:
  524                 return -1;
  525         }
  526     }
  527 
  528     if (optind == argc || !cfg->threads || !cfg->duration) return -1;
  529 
  530     if (!script_parse_url(argv[optind], parts)) {
  531         fprintf(stderr, "invalid URL: %s\n", argv[optind]);
  532         return -1;
  533     }
  534 
  535     if (!cfg->connections || cfg->connections < cfg->threads) {
  536         fprintf(stderr, "number of connections must be >= threads\n");
  537         return -1;
  538     }
  539 
  540     *url    = argv[optind];
  541     *header = NULL;
  542 
  543     return 0;
  544 }
  545 
  546 static void print_stats_header() {
  547     printf("  Thread Stats%6s%11s%8s%12s\n", "Avg", "Stdev", "Max", "+/- Stdev");
  548 }
  549 
  550 static void print_units(long double n, char *(*fmt)(long double), int width) {
  551     char *msg = fmt(n);
  552     int len = strlen(msg), pad = 2;
  553 
  554     if (isalpha(msg[len-1])) pad--;
  555     if (isalpha(msg[len-2])) pad--;
  556     width -= pad;
  557 
  558     printf("%*.*s%.*s", width, width, msg, pad, "  ");
  559 
  560     free(msg);
  561 }
  562 
  563 static void print_stats(char *name, stats *stats, char *(*fmt)(long double)) {
  564     uint64_t max = stats->max;
  565     long double mean  = stats_mean(stats);
  566     long double stdev = stats_stdev(stats, mean);
  567 
  568     printf("    %-10s", name);
  569     print_units(mean,  fmt, 8);
  570     print_units(stdev, fmt, 10);
  571     print_units(max,   fmt, 9);
  572     printf("%8.2Lf%%\n", stats_within_stdev(stats, mean, stdev, 1));
  573 }
  574 
  575 static void print_stats_latency(stats *stats) {
  576     long double percentiles[] = { 50.0, 75.0, 90.0, 99.0 };
  577     printf("  Latency Distribution\n");
  578     for (size_t i = 0; i < sizeof(percentiles) / sizeof(long double); i++) {
  579         long double p = percentiles[i];
  580         uint64_t n = stats_percentile(stats, p);
  581         printf("%7.0Lf%%", p);
  582         print_units(n, format_time_us, 10);
  583         printf("\n");
  584     }
  585 }