"Fossies" - the Fresh Open Source Software Archive

Member "lessfs-1.7.0/lib_repl.c" (16 Nov 2013, 31024 Bytes) of package /linux/misc/old/lessfs-1.7.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "lib_repl.c", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.5.13_vs_1.7.0.

    1 #ifdef HAVE_CONFIG_H
    2 #include <config.h>
    3 #endif
    4 #ifndef LFATAL
    5 #include "lib_log.h"
    6 #endif
    7 
    8 #include <unistd.h>
    9 #include <sys/types.h>
   10 #include <stdio.h>
   11 #include <stdlib.h>
   12 #include <string.h>
   13 #include <errno.h>
   14 #include <sys/types.h>
   15 #include <sys/stat.h>
   16 #include <time.h>
   17 #include <sys/types.h>
   18 #include <fuse.h>
   19 #include <fcntl.h>
   20 #include <pthread.h>
   21 #include <sys/socket.h>
   22 #include <sys/un.h>
   23 #include <sys/file.h>
   24 
   25 
   26 #include <tcutil.h>
   27 #include <tcbdb.h>
   28 #include <tchdb.h>
   29 #include <stdlib.h>
   30 #include <stdbool.h>
   31 #include <stdint.h>
   32 #include <aio.h>
   33 #include <mhash.h>
   34 #include <mutils/mhash.h>
   35 #include <sys/types.h>
   36 #include <dirent.h>
   37 
   38 
   39 #include "lib_safe.h"
   40 #include "lib_cfg.h"
   41 #include "lib_net.h"
   42 #include "retcodes.h"
   43 #ifdef LZO
   44 #include "lib_lzo.h"
   45 #endif
   46 #include "lib_qlz.h"
   47 #include "lib_common.h"
   48 #include "lib_repl.h"
   49 #include "lib_crypto.h"
   50 #include "file_io.h"
   51 
   52 #ifdef BERKELEYDB
   53 #include <db.h>
   54 #include "lib_bdb.h"
   55 #else
   56 #ifndef HAMSTERDB
   57 #include "lib_tc.h"
   58 #else
   59 #include "lib_hamster.h"
   60 #endif
   61 #endif
   62 
   63 
/*
 * Shared state used by the replication code. Everything declared extern is
 * defined in another translation unit; only rrepl is defined in this file.
 */
extern TCHDB *dbb;
extern TCHDB *dbu;
extern TCHDB *dbp;
extern TCBDB *dbl;
extern TCHDB *dbs;
extern TCHDB *dbdta;
extern TCBDB *dbdirent;
extern TCBDB *freelist;
extern TCTREE *workqtree;
extern TCTREE *readcachetree;
/* Cleared by process_message() after writes to DBP, DBDIRENT and DBL. */
extern TCTREE *path2inotree;
/* Descriptor process_message() writes FDBDTA payloads to. */
extern int fdbdta;
/* Replication log descriptor that new records are appended through. */
extern int frepl;
/* Second descriptor on the replication log; send_backlog() reads from it. */
extern int freplbl;
/* Descriptor of the replication log that merge_replog() replays; set by watchdir(). */
int rrepl;
/* Replication records larger than 2 * BLKSIZE are treated as corrupt. */
extern int BLKSIZE;
extern unsigned long long nextoffset;
   81 
   82 unsigned char *crc32(unsigned char *buf, int size, int thread_number)
   83 {
   84     MHASH td[MAX_ALLOWED_THREADS];
   85     unsigned char *hash[thread_number];
   86 
   87     td[thread_number] = mhash_init(MHASH_CRC32);
   88     if (td[thread_number] == MHASH_FAILED)
   89         exit(1);
   90 
   91     mhash(td[thread_number], buf, size);
   92     hash[thread_number] = mhash_end(td[thread_number]);
   93     return hash[thread_number];
   94 }
   95 
   96 int check_abort()
   97 {
   98     int ret = OK;
   99     if (config->shutdown || config->replication_enabled == 0) {
  100         config->safe_down = 1;
  101         ret = FAIL;
  102     }
  103     return (ret);
  104 }
  105 
  106 void merge_replog()
  107 {
  108     unsigned int msgsize;
  109     int result;
  110     char *data;
  111     struct stat stbuf;
  112     unsigned long offset = 0;
  113     unsigned long line = 0;
  114     REPLICATIONMSG replicationmsg;
  115     char *crccalc;
  116     char *crcrec;
  117 
  118     LINFO("merge_replog");
  119     if (-1 == fstat(rrepl, &stbuf))
  120         die_syserr();
  121     if (0 == stbuf.st_size) {
  122         fprintf(stderr, "This replication logfile is empty\n");
  123         return;
  124     }
  125     lseek(rrepl, offset, SEEK_SET);
  126     while (1) {
  127         line++;
  128         result = fullRead(rrepl, (unsigned char *) &msgsize, sizeof(int));
  129         if (sizeof(int) != result) {
  130             break;
  131         }
  132         offset += sizeof(int);
  133         if (msgsize > 2 * BLKSIZE) {
  134             fprintf(stderr,
  135                     "create_report : corrupt replog file detected : msgsize %u",
  136                     msgsize);
  137             die_dataerr("create_report : corrupt replog file detected");
  138         }
  139         data = s_zmalloc(msgsize);
  140         result = fullRead(rrepl, (unsigned char *) data, msgsize);
  141         if (result != msgsize)
  142             die_syserr();
  143         if (data[msgsize - 1] != '~') {
  144             offset += msgsize;
  145             s_free(data);
  146             lseek(rrepl, offset, SEEK_SET);
  147             printf("SKIP\n");
  148             continue;
  149         }
  150         replicationmsg.database = data[0];
  151         replicationmsg.operation = data[1];
  152         memcpy(&replicationmsg.ksize, &data[2], sizeof(int));
  153         memcpy(&replicationmsg.vsize, &data[2 + sizeof(int)], sizeof(int));
  154         replicationmsg.key =
  155             (unsigned char *) &data[2 + (2 * sizeof(int))];
  156         replicationmsg.value =
  157             (unsigned char *) &data[2 + (2 * sizeof(int)) +
  158                                     replicationmsg.ksize];
  159         crccalc =
  160             (char *) crc32((unsigned char *) data,
  161                            msgsize - ((sizeof(int) + 1)), 1);
  162         crcrec = &data[msgsize - (sizeof(int) + 1)];
  163         if (0 != memcmp(crccalc, crcrec, sizeof(int))) {
  164             LINFO("crcsend != crcrecv %02x %02x : %02x %02x",
  165                   data[0], data[1],
  166                   data[msgsize - sizeof(int) - 1],
  167                   data[msgsize - sizeof(int)]);
  168             free(crccalc);
  169             die_dataerr("CRC errors in replication logfile, abort");
  170         }
  171         free(crccalc);
  172         if (replicationmsg.operation != TRANSACTIONCOMMIT &&
  173             replicationmsg.operation != TRANSACTIONABORT) {
  174             process_message(data, msgsize - (1 + sizeof(int)));
  175         }
  176         offset += msgsize;
  177         s_free(data);
  178         lseek(rrepl, offset, SEEK_SET);
  179     }
  180     return;
  181 }
  182 
  183 
  184 // Try to send the replication backlog.
  185 // 0 if OK, 1 if failed.
  186 int send_backlog()
  187 {
  188     unsigned int msgsize;
  189     int ret = OK;
  190     int retry = 0;
  191     int result;
  192     unsigned char *flagwritten = (unsigned char *) "Z";
  193     char *data;
  194     char *msgstart;
  195     unsigned int remote;
  196     struct stat stbuf;
  197     unsigned long long offset;
  198     bool dotrunc = 0;
  199     bool connected = 0;
  200 
  201     FUNC;
  202     if (config->frozen)
  203         return (FAIL);
  204     if (config->replication_enabled == 0)
  205         return (FAIL);
  206     lseek(freplbl, 0, SEEK_SET);
  207     offset = 0;
  208     if (-1 == fstat(freplbl, &stbuf))
  209         die_syserr();
  210     if (0 == stbuf.st_size) {
  211         LDEBUG("send_backlog : replog is empty %lu, %lu", stbuf.st_size,
  212                stbuf.st_ino);
  213         return (ret);
  214     }
  215     LDEBUG("send_backlog : start sending the backlog to the slave");
  216   final_run:
  217     while (1) {
  218         result =
  219             fullRead(freplbl, (unsigned char *) &msgsize, sizeof(int));
  220         if (sizeof(int) != result)
  221             break;
  222         offset = offset + sizeof(int);
  223         LDEBUG("msgsize=%i", msgsize);
  224         if ( 0 == msgsize ) die_dataerr("send_backlog : corrupt replog file detected, msgsize=0");
  225         if (msgsize > 2 * BLKSIZE)
  226             die_dataerr("send_backlog : corrupt replog file detected");
  227         data = s_zmalloc(msgsize);
  228         result = fullRead(freplbl, (unsigned char *) data, msgsize);
  229         if (result != msgsize) {
  230             s_free(data);
  231             LFATAL("send_backlog : !msgsize");
  232             return (FAIL);
  233         }
  234         if (data[msgsize - 1] != '~') {
  235             s_free(data);
  236             offset = offset + msgsize;
  237             lseek(freplbl, offset, SEEK_SET);
  238             LDEBUG
  239                 ("send_backlog : has been written before, skip (not an error)");
  240             continue;
  241         }
  242         if (!connected) {
  243             while (OK != reconnect()) {
  244                 sleep(1);       // Make sure we don't flood the other size with SYN's
  245                 LINFO("send_backlog : failed to connect to the slave");
  246             }
  247             connected = 1;
  248         }
  249         msgstart =
  250             as_sprintf(__FILE__, __LINE__, "START : %i", msgsize - 1);
  251         while (1) {
  252             if (OK != (ret = check_abort()))
  253                 break;
  254             result =
  255                 fulltimWrite(5, config->replication_socket,
  256                              (unsigned char *) msgstart,
  257                              strlen(msgstart) + 1);
  258             if (result != strlen(msgstart) + 1) {
  259                 LINFO("send_backlog : write msgstart failed");
  260                 while (OK != reconnect()) {
  261                     sleep(1);   // Make sure we don't flood the other size with SYN's
  262                     LINFO("send_backlog : failed to connect to the slave");
  263                 }
  264                 continue;
  265             }
  266             result =
  267                 fulltimRead(5, config->replication_socket,
  268                             (unsigned char *) &remote,
  269                             sizeof(unsigned int));
  270             if (OK != (ret = check_abort()))
  271                 break;
  272             if (remote != msgsize - 1) {
  273                 if (OK != reconnect()) {
  274                     ret = FAIL;
  275                     break;
  276                 }
  277                 LINFO("send_backlog : invalid message size");
  278                 continue;
  279             }
  280 
  281             result =
  282                 fulltimWrite(5, config->replication_socket,
  283                              (unsigned char *) data, msgsize - 1);
  284             if (OK != (ret = check_abort()))
  285                 break;
  286             if (result != msgsize - 1) {
  287                 LINFO
  288                     ("send_backlog : failed to write the message, reset connection");
  289                 while (OK != reconnect()) {
  290                     sleep(1);   // Make sure we don't flood the other size with SYN's
  291                     LINFO("send_backlog : failed to connect to the slave");
  292                 }
  293                 continue;
  294             }
  295             result =
  296                 fulltimRead(5, config->replication_socket,
  297                             (unsigned char *) &remote,
  298                             sizeof(unsigned int));
  299             if (OK != (ret = check_abort()))
  300                 break;
  301             if (remote == ACK) {
  302                 retry = 0;
  303                 LDEBUG("send_backlog : remote == ACK");
  304                 break;
  305             }
  306             LINFO("send_backlog : no ACK on %02x %02x : %02x %02x",
  307                   (u_char) data[0], (u_char) data[1],
  308                   (u_char) data[msgsize - sizeof(int) - 1],
  309                   (u_char) data[msgsize - sizeof(int)]);
  310             while (OK != reconnect()) {
  311                 sleep(1);       // Make sure we don't flood the other size with SYN's
  312                 LINFO("send_backlog : failed to connect to the slave");
  313             }
  314             if (OK != (ret = check_abort()))
  315                 break;
  316         }
  317         retry = 0;
  318         s_free(msgstart);
  319         s_free(data);
  320         if (ret == FAIL)
  321             break;
  322         lseek(freplbl, offset + msgsize - 1, SEEK_SET);
  323         result = fullWrite(freplbl, flagwritten, 1);
  324         if (result != sizeof(unsigned char))
  325             die_dataerr
  326                 ("send_backlog : write error on disk: this should never happen");
  327         offset = offset + msgsize;
  328         lseek(freplbl, offset, SEEK_SET);
  329     }
  330     if (ret == OK) {
  331         if (dotrunc == 0)
  332             repl_lock((char *) __PRETTY_FUNCTION__);
  333         if (0 == dotrunc) {
  334             dotrunc = 1;
  335             goto final_run;
  336         }
  337         lseek(freplbl, 0, SEEK_SET);
  338         if (-1 == ftruncate(freplbl, 0))
  339             die_dataerr("Failed to truncate replog");
  340         release_repl_lock();
  341     }
  342     if (ret == OK) {
  343         LDEBUG("send_backlog : send_backlog returns OK");
  344     } else {
  345         if (config->shutdown == 0) {
  346             LINFO("send_backlog : return FAIL");
  347         } else
  348             LINFO
  349                 ("send_backlog : refuse request, going down after shutdown request");
  350     }
  351     close(config->replication_socket);
  352     return (ret);
  353 }
  354 
  355 void rotate_replog()
  356 {
  357     char *newfile;
  358     struct stat stbuf;
  359     unsigned long sequence;
  360 
  361     FUNC;
  362     sequence = get_sequence();
  363     newfile =
  364         as_sprintf(__FILE__, __LINE__, "%s-%lu",
  365                    config->replication_logfile, sequence);
  366     if (-1 != stat(newfile, &stbuf)) {
  367         die_dataerr("Replication logfile with name %s already exists",
  368                     newfile);
  369     }
  370     flock(frepl, LOCK_UN);
  371     fsync(frepl);
  372     close(frepl);
  373     close(freplbl);
  374     if (0 != rename(config->replication_logfile, newfile))
  375         die_dataerr("Failed to rename %s to %s",
  376                     config->replication_logfile, newfile);
  377     if (-1 ==
  378         (frepl =
  379          s_open2(config->replication_logfile, O_CREAT | O_RDWR, S_IRWXU)))
  380         die_syserr();
  381     if (0 != flock(frepl, LOCK_EX | LOCK_NB)) {
  382         LFATAL("Failed to lock the replication logfile %s",
  383                config->replication_logfile);
  384         exit(EXIT_USAGE);
  385     }
  386     if (-1 ==
  387         (freplbl = s_open2(config->replication_logfile, O_RDWR, S_IRWXU)))
  388         die_syserr();
  389     next_sequence();
  390     sequence = get_sequence();
  391 
  392     s_free(newfile);
  393     EFUNC;
  394     return;
  395 }
  396 
  397 void write_replication_data(unsigned char db, unsigned char op, char *key,
  398                             int ksize, char *value, int vsize,
  399                             int threadnr)
  400 {
  401     char *replicationmsg;
  402     int replicationmsg_size;
  403     int result;
  404     char *crc;
  405 
  406 // replication_msg : db (char), op (char), ksize (int),vsize (int), key (char *),value (char *), crc32 (int)
  407     replicationmsg_size =
  408         ksize + vsize + (sizeof(unsigned char) * 2) + (sizeof(int) * 2) +
  409         sizeof(int);
  410     replicationmsg = s_zmalloc(replicationmsg_size + 1);
  411     replicationmsg[0] = db;
  412     replicationmsg[1] = op;
  413     memcpy(replicationmsg + (sizeof(unsigned char) * 2), &ksize,
  414            sizeof(int));
  415     memcpy(replicationmsg + (sizeof(unsigned char) * 2) + sizeof(int),
  416            &vsize, sizeof(int));
  417     memcpy(replicationmsg + (sizeof(unsigned char) * 2) +
  418            (sizeof(int) * 2), key, ksize);
  419     if (op == REPLWRITE || op == REPLDUPWRITE || op == REPLDELETECURKEY) {
  420         memcpy(replicationmsg + ksize + (sizeof(unsigned char) * 2) +
  421                (2 * sizeof(int)), value, vsize);
  422 // For safety we will calc a crc32 checksum of the message.
  423 // drbd has seen cases where data received was garbled despite the
  424 // use of tcp as transfer protocol.
  425     }
  426     crc =
  427         (char *) crc32((unsigned char *) replicationmsg,
  428                        replicationmsg_size - sizeof(int), threadnr);
  429     memcpy(replicationmsg + replicationmsg_size - sizeof(int), crc,
  430            sizeof(int));
  431     free(crc);
  432 
  433     config->replication_backlog = 1;
  434     LDEBUG("write to replog, msgsize = %u", replicationmsg_size);
  435     lseek(frepl, 0, SEEK_END);
  436     replicationmsg_size++;
  437     result =
  438         fullWrite(frepl, (unsigned char *) &replicationmsg_size,
  439                   sizeof(int));
  440     replicationmsg[replicationmsg_size - 1] = '~';
  441     // Add one byte that will be used later to flag when a message has been send succesfully.
  442     result =
  443         fullWrite(frepl, (unsigned char *) replicationmsg,
  444                   replicationmsg_size);
  445     if (result != replicationmsg_size)
  446         die_dataerr("Failed to write to replication log");
  447     s_free(replicationmsg);
  448 }
  449 
  450 void write_repl_data(unsigned char db, unsigned char op, char *key,
  451                      int ksize, char *value, int vsize, int threadnr)
  452 {
  453     struct stat stbuf;
  454     //unsigned int curtime = time(NULL);
  455 
  456     FUNC;
  457     repl_lock((char *) __PRETTY_FUNCTION__);
  458     if (-1 == fstat(frepl, &stbuf))
  459         die_syserr();
  460     /*if (0 == strcmp(config->replication_partner_ip, "-1")
  461         && config->replication && config->replication_role == 0) {
  462         if (stbuf.st_size > config->rotate_replog_size
  463             || curtime - config->replication_last_rotated > REPLOG_DELAY) {
  464             config->replication_last_rotated = time(NULL);
  465             rotate_replog();
  466         }
  467     }*/
  468     if (0 != stbuf.st_size) {
  469         config->replication_backlog = 1;
  470     } else
  471         config->replication_backlog = 0;
  472     write_replication_data(db, op, key, ksize, value, vsize, threadnr);
  473     release_repl_lock();
  474     EFUNC;
  475     return;
  476 }
  477 
  478 int reconnect()
  479 {
  480     int retry = 0;
  481     close(config->replication_socket);
  482     while (1) {
  483         config->replication_socket =
  484             clientconnect(config->replication_partner_ip,
  485                           config->replication_partner_port, "tcp");
  486         if (config->replication_socket != -1)
  487             break;
  488         if (OK != check_abort())
  489             return (FAIL);
  490         LINFO("Retry connect to %s:%s", config->replication_partner_ip,
  491               config->replication_partner_port);
  492         retry++;
  493         sleep(retry);
  494         if (retry > 3)
  495             return (FAIL);
  496     }
  497     return (OK);
  498 }
  499 
  500 
  501 void flush_recv_buffer()
  502 {
  503     char trash;
  504     while (0 <
  505            fulltimRead(1, config->replication_socket,
  506                        (unsigned char *) &trash, sizeof(char)));
  507     return;
  508 }
  509 
  510 void watchdir()
  511 {
  512     struct stat stbuf;
  513     DIR *dp = NULL;
  514     struct dirent *entry;
  515     char *name;
  516     unsigned long sequence;
  517     bool found = 0;
  518     int sleeptime = WATCHDIR_SLEEPINTERVAL;
  519 
  520     LINFO("watchdir : open %s", config->replication_watchdir);
  521     if (-1 == stat(config->replication_watchdir, &stbuf))
  522         die_dataerr("watchdir : Failed to stat %s",
  523                     config->replication_watchdir);
  524     if (-1 == chdir(config->replication_watchdir))
  525         die_dataerr("watchdir : Failed to chdir to %s",
  526                     config->replication_watchdir);
  527     while (1) {
  528         if (config->shutdown)
  529             break;
  530         while (config->frozen) {
  531             sleep(1);
  532         }
  533         if (NULL == (dp = (opendir(config->replication_watchdir)))) {
  534             LFATAL("watchdir : Failed to open %s\n",
  535                    config->replication_watchdir);
  536             exit(EXIT_WDIR);
  537         }
  538         while (entry = readdir(dp)) {
  539             sequence = get_sequence();
  540             if (-1 == stat(entry->d_name, &stbuf)) {
  541                 LINFO("watchdir: Error on stat %s\n", entry->d_name);
  542                 continue;
  543             }
  544             if (S_ISDIR(stbuf.st_mode)) {
  545                 continue;
  546             }
  547             name =
  548                 as_sprintf(__FILE__, __LINE__, "replog.dta-%lu", sequence);
  549             if (0 != strcmp(name, entry->d_name)) {
  550                 s_free(name);
  551                 continue;
  552             }
  553             if (-1 == (rrepl = s_open2(name, O_RDWR, S_IRWXU)))
  554                 die_syserr();
  555             merge_replog();
  556             found = 1;
  557             next_sequence();
  558             commit_transactions();
  559             start_transactions();
  560             s_free(name);
  561             name =
  562                 as_sprintf(__FILE__, __LINE__, "replog.dta-%lu-processed",
  563                            sequence);
  564             if (0 != rename(entry->d_name, name))
  565                 die_dataerr("Failed to rename %s to %s", entry->d_name,
  566                             name);
  567             s_free(name);
  568         }
  569         closedir(dp);
  570         if (found) {
  571             sleeptime = 0;
  572         } else if (sleeptime < WATCHDIR_SLEEPINTERVAL)
  573             sleeptime++;
  574         sleep(sleeptime);
  575         found = 0;
  576     }
  577     config->safe_down = 1;
  578 }
  579 
  580 void *replication_worker(void *arg)
  581 {
  582     int msocket;
  583     const char *proto = "tcp";
  584     struct sockaddr_un client_address;
  585     socklen_t client_len;
  586     char *message = NULL;
  587     int result;
  588     int msglen = 0;
  589     char *p;
  590     char *crcrec;
  591     char *crccalc;
  592     unsigned int nakcount = 0;
  593 
  594     if (config->replication_watchdir) {
  595         watchdir();
  596         pthread_exit(NULL);
  597     }
  598     msocket = -1;
  599     if (NULL == config->replication_listen_port)
  600         config->replication_listen_port = "101";
  601 
  602     while (1) {
  603         msocket =
  604             serverinit(config->replication_listen_ip,
  605                        config->replication_listen_port, proto);
  606         if (msocket != -1)
  607             break;
  608         LINFO("replication_worker : serverinit failed: retry");
  609         sleep(1);
  610         close(msocket);
  611     }
  612     message = s_zmalloc(BLKSIZE * 2);   // Enough space to hold the message and the kitchen sink
  613     client_len = sizeof(client_address);
  614     while (1) {
  615         nakcount = 0;
  616         config->safe_down = 1;
  617         config->replication_socket =
  618             accept(msocket, (struct sockaddr *) &client_address,
  619                    &client_len);
  620         config->safe_down = 0;
  621         LDEBUG("replication_worker : connected");
  622         while (1) {
  623             if (OK != check_abort()) {
  624                 LINFO
  625                     ("Refuse replication data while filesystem is frozen or shutting down");
  626                 close(config->replication_socket);
  627                 break;
  628             }
  629             if (config->replication_enabled == 0) {
  630                 LINFO
  631                     ("replication_worker : replication is disabled, disconnect");
  632                 close(config->replication_socket);
  633                 break;
  634             }
  635             if (nakcount > 3) {
  636                 LINFO("replication_worker : nakcount > 3, disconnect");
  637                 close(config->replication_socket);
  638                 break;
  639             }
  640             result =
  641                 readstring(300, config->replication_socket, message, 64);
  642             if (result == TIMEOUT) {
  643                 LINFO("No START message after connect, handshake failure");
  644                 close(config->replication_socket);
  645                 break;
  646             }
  647             if (result == -1) {
  648                 LDEBUG
  649                     ("replication_worker : readstring %i close connection",
  650                      result);
  651                 close(config->replication_socket);
  652                 break;
  653             }
  654             if (result == 0) {
  655                 if (0 != strncmp("START :", message, strlen("START :"))) {
  656                     send_nak();
  657                     LDEBUG
  658                         ("replication_worker : readstring no valid start of message");
  659                     nakcount++;
  660                     continue;
  661                 }
  662                 p = strchr(message, ':');
  663                 p++;
  664                 msglen = atoi(p);
  665             } else {
  666                 send_nak();
  667                 LINFO("replication_worker : readstring no valid message");
  668                 nakcount++;
  669                 continue;
  670             }
  671             result =
  672                 fulltimWrite(5, config->replication_socket,
  673                              (unsigned char *) &msglen, sizeof(int));
  674             if (result != sizeof(int)) {
  675                 LINFO
  676                     ("replication_worker : failed to send ACK, disconnect");
  677                 close(config->replication_socket);
  678                 break;
  679             }
  680             result =
  681                 fulltimRead(10, config->replication_socket,
  682                             (unsigned char *) message, msglen);
  683             if (result != msglen) {
  684                 LINFO("replication_worker : got %i expected %i", result,
  685                       msglen);
  686                 send_nak();
  687                 nakcount++;
  688                 continue;
  689             }
  690             if (msglen < sizeof(int)) {
  691 #ifdef x86_64
  692                 LINFO("replication_worker : got %i bytes expected %lu",
  693                       msglen, sizeof(int));
  694 #else
  695                 LINFO("replication_worker : got %i bytes expected %u",
  696                       msglen, sizeof(int));
  697 #endif
  698                 send_nak();
  699                 nakcount++;
  700                 continue;
  701             }
  702             crccalc =
  703                 (char *) crc32((unsigned char *) message,
  704                                msglen - (sizeof(int)), 1);
  705             crcrec = &message[msglen - sizeof(int)];
  706             if (0 != memcmp(crccalc, crcrec, sizeof(int))) {
  707                 LINFO("crcsend != crcrecv %02x %02x : %02x %02x",
  708                       message[0], message[1],
  709                       message[msglen - sizeof(int) - 1],
  710                       message[msglen - sizeof(int)]);
  711                 free(crccalc);
  712                 send_nak();
  713                 nakcount++;
  714                 continue;
  715             }
  716             free(crccalc);
  717             send_ack();
  718             nakcount = 0;
  719             process_message(message, msglen - sizeof(int));
  720         }
  721     }
  722 // We never get here.
  723     s_free(message);
  724     pthread_exit(NULL);
  725 }
  726 
  727 void process_message(char *message, int msglen)
  728 {
  729 //message : db (char), op (char), ksize (int),vsize (int), key (char *),value (char *)
  730     REPLICATIONMSG replicationmsg;
  731     unsigned long long offset;
  732     unsigned long cursequence;
  733     unsigned long newsequence;
  734 
  735     replicationmsg.database = message[0];
  736     replicationmsg.operation = message[1];
  737     memcpy(&replicationmsg.ksize, &message[2], sizeof(int));
  738     memcpy(&replicationmsg.vsize, &message[2 + sizeof(int)], sizeof(int));
  739     replicationmsg.key = (unsigned char *) &message[2 + (2 * sizeof(int))];
  740     replicationmsg.value =
  741         (unsigned char *) &message[2 + (2 * sizeof(int)) +
  742                                    replicationmsg.ksize];
  743     while (config->frozen) {
  744         usleep(10000);
  745     }
  746     if (replicationmsg.operation == REPLWRITE) {
  747         LDEBUG("process_message : write keylen %i, vsize %i",
  748                replicationmsg.ksize, replicationmsg.vsize);
  749         if (replicationmsg.database == DBDTA) {
  750             bin_write_dbdata(DBDTA, replicationmsg.key,
  751                              replicationmsg.ksize, replicationmsg.value,
  752                              replicationmsg.vsize);
  753         }
  754         if (replicationmsg.database == DBU) {
  755             bin_write_dbdata(DBU, replicationmsg.key, replicationmsg.ksize,
  756                              replicationmsg.value, replicationmsg.vsize);
  757         }
  758         if (replicationmsg.database == DBB) {
  759             bin_write_dbdata(DBB, replicationmsg.key, replicationmsg.ksize,
  760                              replicationmsg.value, replicationmsg.vsize);
  761         }
  762         if (replicationmsg.database == DBP) {
  763             if (3 == replicationmsg.ksize) {
  764                 if (0 == memcmp(replicationmsg.key, "SEQ", 3)) {
  765                     cursequence = get_sequence();
  766                     memcpy(&newsequence, replicationmsg.value,
  767                            sizeof(newsequence));
  768                     if (newsequence != cursequence)
  769                         die_dataerr
  770                             ("replication log with sequence %lu while expecting %lu",
  771                              newsequence, cursequence);
  772                 }
  773             }
  774             bin_write_dbdata(DBP, replicationmsg.key, replicationmsg.ksize,
  775                              replicationmsg.value, replicationmsg.vsize);
  776             cachep2i_lock((char *) __PRETTY_FUNCTION__);
  777             tctreeclear(path2inotree);
  778             release_cachep2i_lock();
  779         }
  780         if (replicationmsg.database == DBS) {
  781             bin_write_dbdata(DBS, replicationmsg.key, replicationmsg.ksize,
  782                              replicationmsg.value, replicationmsg.vsize);
  783         }
  784         if (replicationmsg.database == DBDIRENT) {
  785             btbin_write_dbdata(DBDIRENT, replicationmsg.key,
  786                                replicationmsg.ksize, replicationmsg.value,
  787                                replicationmsg.vsize);
  788             cachep2i_lock((char *) __PRETTY_FUNCTION__);
  789             tctreeclear(path2inotree);
  790             release_cachep2i_lock();
  791         }
  792         if (replicationmsg.database == FREELIST) {
  793             btbin_write_dbdata(FREELIST, replicationmsg.key,
  794                                replicationmsg.ksize, replicationmsg.value,
  795                                replicationmsg.vsize);
  796         }
  797         if (replicationmsg.database == DBL) {
  798             btbin_write_dbdata(DBL, replicationmsg.key,
  799                                replicationmsg.ksize, replicationmsg.value,
  800                                replicationmsg.vsize);
  801             cachep2i_lock((char *) __PRETTY_FUNCTION__);
  802             tctreeclear(path2inotree);
  803             release_cachep2i_lock();
  804         }
  805         if (replicationmsg.database == FDBDTA) {
  806 // With a write to fdbdta we use the key as store for the offset.
  807             memcpy(&offset, replicationmsg.key, replicationmsg.ksize);
  808             s_lckpwrite(fdbdta, replicationmsg.value, replicationmsg.vsize,
  809                         offset);
  810         }
  811     }
  812     if (replicationmsg.operation == REPLDUPWRITE) {
  813         if (replicationmsg.database == DBDIRENT) {
  814             btbin_write_dup(DBDIRENT, replicationmsg.key,
  815                             replicationmsg.ksize, replicationmsg.value,
  816                             replicationmsg.vsize, LOCK);
  817         }
  818         if (replicationmsg.database == FREELIST) {
  819             btbin_write_dup(FREELIST, replicationmsg.key,
  820                             replicationmsg.ksize, replicationmsg.value,
  821                             replicationmsg.vsize, LOCK);
  822         }
  823         if (replicationmsg.database == DBL) {
  824             btbin_write_dup(DBL, replicationmsg.key, replicationmsg.ksize,
  825                             replicationmsg.value, replicationmsg.vsize,
  826                             LOCK);
  827         }
  828     }
  829     if (replicationmsg.operation == REPLDELETE) {
  830         if (replicationmsg.database == DBDTA) {
  831             delete_key(DBDTA, replicationmsg.key, replicationmsg.ksize,
  832                        NULL);
  833         }
  834         if (replicationmsg.database == DBU) {
  835             delete_key(DBU, replicationmsg.key, replicationmsg.ksize,
  836                        NULL);
  837         }
  838         if (replicationmsg.database == DBB) {
  839             delete_key(DBB, replicationmsg.key, replicationmsg.ksize,
  840                        NULL);
  841         }
  842         if (replicationmsg.database == DBP) {
  843             delete_key(DBP, replicationmsg.key, replicationmsg.ksize,
  844                        NULL);
  845         }
  846         if (replicationmsg.database == DBS) {
  847             delete_key(DBS, replicationmsg.key, replicationmsg.ksize,
  848                        NULL);
  849         }
  850     }
  851     if (replicationmsg.operation == TRANSACTIONCOMMIT) {
  852         get_global_lock((char *) __PRETTY_FUNCTION__);
  853         write_lock((char *) __PRETTY_FUNCTION__);
  854         flush_wait(0);
  855         purge_read_cache(0, 1, (char *) __PRETTY_FUNCTION__);
  856         release_write_lock();
  857         start_flush_commit();
  858         end_flush_commit();
  859         release_global_lock();
  860     }
  861     if (replicationmsg.operation == TRANSACTIONABORT) {
  862         abort_transactions();
  863     }
  864     if (replicationmsg.operation == REPLDELETECURKEY) {
  865         if (replicationmsg.database == DBDIRENT) {
  866             btdelete_curkey(DBDIRENT, replicationmsg.key,
  867                             replicationmsg.ksize, replicationmsg.value,
  868                             replicationmsg.vsize,
  869                             (char *) __PRETTY_FUNCTION__);
  870         }
  871         if (replicationmsg.database == FREELIST) {
  872             btdelete_curkey(FREELIST, replicationmsg.key,
  873                             replicationmsg.ksize, replicationmsg.value,
  874                             replicationmsg.vsize,
  875                             (char *) __PRETTY_FUNCTION__);
  876         }
  877         if (replicationmsg.database == DBL) {
  878             btdelete_curkey(DBL, replicationmsg.key, replicationmsg.ksize,
  879                             replicationmsg.value, replicationmsg.vsize,
  880                             (char *) __PRETTY_FUNCTION__);
  881         }
  882     }
  883     if (replicationmsg.operation == REPLSETNEXTOFFSET) {
  884         memcpy(&nextoffset, replicationmsg.key, replicationmsg.ksize);
  885     }
  886     return;
  887 }
  888 
  889 int send_nak()
  890 {
  891     int confirm = NAK;
  892     int result;
  893     FUNC;
  894     flush_recv_buffer();
  895     result =
  896         fulltimWrite(3, config->replication_socket,
  897                      (unsigned char *) &confirm, sizeof(int));
  898     if (result != sizeof(unsigned int)) {
  899         close(config->replication_socket);
  900         return (-1);
  901     }
  902     EFUNC;
  903     return (0);
  904 }
  905 
  906 int send_ack()
  907 {
  908     int confirm = ACK;
  909     int result;
  910     FUNC;
  911     result =
  912         fulltimWrite(3, config->replication_socket,
  913                      (unsigned char *) &confirm, sizeof(int));
  914     if (result != sizeof(unsigned int)) {
  915         close(config->replication_socket);
  916         return (-1);
  917     }
  918     EFUNC;
  919     return (0);
  920 }