lessfs  1.7.0
About: Lessfs is an inline data-deduplicating file system for Linux (implemented in user space with FUSE; especially useful for backup purposes).
  Fossies Dox: lessfs-1.7.0.tar.gz  (unofficial and still experimental Doxygen-generated source code documentation)  

lib_repl.c
Go to the documentation of this file.
1 #ifdef HAVE_CONFIG_H
2 #include <config.h>
3 #endif
4 #ifndef LFATAL
5 #include "lib_log.h"
6 #endif
7 
8 #include <unistd.h>
9 #include <sys/types.h>
10 #include <stdio.h>
11 #include <stdlib.h>
12 #include <string.h>
13 #include <errno.h>
14 #include <sys/types.h>
15 #include <sys/stat.h>
16 #include <time.h>
17 #include <sys/types.h>
18 #include <fuse.h>
19 #include <fcntl.h>
20 #include <pthread.h>
21 #include <sys/socket.h>
22 #include <sys/un.h>
23 #include <sys/file.h>
24 
25 
26 #include <tcutil.h>
27 #include <tcbdb.h>
28 #include <tchdb.h>
29 #include <stdlib.h>
30 #include <stdbool.h>
31 #include <stdint.h>
32 #include <aio.h>
33 #include <mhash.h>
34 #include <mutils/mhash.h>
35 #include <sys/types.h>
36 #include <dirent.h>
37 
38 
39 #include "lib_safe.h"
40 #include "lib_cfg.h"
41 #include "lib_net.h"
42 #include "retcodes.h"
43 #ifdef LZO
44 #include "lib_lzo.h"
45 #endif
46 #include "lib_qlz.h"
47 #include "lib_common.h"
48 #include "lib_repl.h"
49 #include "lib_crypto.h"
50 #include "file_io.h"
51 
52 #ifdef BERKELEYDB
53 #include <db.h>
54 #include "lib_bdb.h"
55 #else
56 #ifndef HAMSTERDB
57 #include "lib_tc.h"
58 #else
59 #include "lib_hamster.h"
60 #endif
61 #endif
62 
63 
64 extern TCHDB *dbb;
65 extern TCHDB *dbu;
66 extern TCHDB *dbp;
67 extern TCBDB *dbl;
68 extern TCHDB *dbs;
69 extern TCHDB *dbdta;
70 extern TCBDB *dbdirent;
71 extern TCBDB *freelist;
72 extern TCTREE *workqtree;
73 extern TCTREE *readcachetree;
74 extern TCTREE *path2inotree;
75 extern int fdbdta;
76 extern int frepl;
77 extern int freplbl;
78 int rrepl;
79 extern int BLKSIZE;
80 extern unsigned long long nextoffset;
81 
82 unsigned char *crc32(unsigned char *buf, int size, int thread_number)
83 {
84  MHASH td[MAX_ALLOWED_THREADS];
85  unsigned char *hash[thread_number];
86 
87  td[thread_number] = mhash_init(MHASH_CRC32);
88  if (td[thread_number] == MHASH_FAILED)
89  exit(1);
90 
91  mhash(td[thread_number], buf, size);
92  hash[thread_number] = mhash_end(td[thread_number]);
93  return hash[thread_number];
94 }
95 
97 {
98  int ret = OK;
99  if (config->shutdown || config->replication_enabled == 0) {
100  config->safe_down = 1;
101  ret = FAIL;
102  }
103  return (ret);
104 }
105 
107 {
108  unsigned int msgsize;
109  int result;
110  char *data;
111  struct stat stbuf;
112  unsigned long offset = 0;
113  unsigned long line = 0;
114  REPLICATIONMSG replicationmsg;
115  char *crccalc;
116  char *crcrec;
117 
118  LINFO("merge_replog");
119  if (-1 == fstat(rrepl, &stbuf))
120  die_syserr();
121  if (0 == stbuf.st_size) {
122  fprintf(stderr, "This replication logfile is empty\n");
123  return;
124  }
125  lseek(rrepl, offset, SEEK_SET);
126  while (1) {
127  line++;
128  result = fullRead(rrepl, (unsigned char *) &msgsize, sizeof(int));
129  if (sizeof(int) != result) {
130  break;
131  }
132  offset += sizeof(int);
133  if (msgsize > 2 * BLKSIZE) {
134  fprintf(stderr,
135  "create_report : corrupt replog file detected : msgsize %u",
136  msgsize);
137  die_dataerr("create_report : corrupt replog file detected");
138  }
139  data = s_zmalloc(msgsize);
140  result = fullRead(rrepl, (unsigned char *) data, msgsize);
141  if (result != msgsize)
142  die_syserr();
143  if (data[msgsize - 1] != '~') {
144  offset += msgsize;
145  s_free(data);
146  lseek(rrepl, offset, SEEK_SET);
147  printf("SKIP\n");
148  continue;
149  }
150  replicationmsg.database = data[0];
151  replicationmsg.operation = data[1];
152  memcpy(&replicationmsg.ksize, &data[2], sizeof(int));
153  memcpy(&replicationmsg.vsize, &data[2 + sizeof(int)], sizeof(int));
154  replicationmsg.key =
155  (unsigned char *) &data[2 + (2 * sizeof(int))];
156  replicationmsg.value =
157  (unsigned char *) &data[2 + (2 * sizeof(int)) +
158  replicationmsg.ksize];
159  crccalc =
160  (char *) crc32((unsigned char *) data,
161  msgsize - ((sizeof(int) + 1)), 1);
162  crcrec = &data[msgsize - (sizeof(int) + 1)];
163  if (0 != memcmp(crccalc, crcrec, sizeof(int))) {
164  LINFO("crcsend != crcrecv %02x %02x : %02x %02x",
165  data[0], data[1],
166  data[msgsize - sizeof(int) - 1],
167  data[msgsize - sizeof(int)]);
168  free(crccalc);
169  die_dataerr("CRC errors in replication logfile, abort");
170  }
171  free(crccalc);
172  if (replicationmsg.operation != TRANSACTIONCOMMIT &&
173  replicationmsg.operation != TRANSACTIONABORT) {
174  process_message(data, msgsize - (1 + sizeof(int)));
175  }
176  offset += msgsize;
177  s_free(data);
178  lseek(rrepl, offset, SEEK_SET);
179  }
180  return;
181 }
182 
183 
184 // Try to send the replication backlog.
185 // 0 if OK, 1 if failed.
187 {
188  unsigned int msgsize;
189  int ret = OK;
190  int retry = 0;
191  int result;
192  unsigned char *flagwritten = (unsigned char *) "Z";
193  char *data;
194  char *msgstart;
195  unsigned int remote;
196  struct stat stbuf;
197  unsigned long long offset;
198  bool dotrunc = 0;
199  bool connected = 0;
200 
201  FUNC;
202  if (config->frozen)
203  return (FAIL);
204  if (config->replication_enabled == 0)
205  return (FAIL);
206  lseek(freplbl, 0, SEEK_SET);
207  offset = 0;
208  if (-1 == fstat(freplbl, &stbuf))
209  die_syserr();
210  if (0 == stbuf.st_size) {
211  LDEBUG("send_backlog : replog is empty %lu, %lu", stbuf.st_size,
212  stbuf.st_ino);
213  return (ret);
214  }
215  LDEBUG("send_backlog : start sending the backlog to the slave");
216  final_run:
217  while (1) {
218  result =
219  fullRead(freplbl, (unsigned char *) &msgsize, sizeof(int));
220  if (sizeof(int) != result)
221  break;
222  offset = offset + sizeof(int);
223  LDEBUG("msgsize=%i", msgsize);
224  if ( 0 == msgsize ) die_dataerr("send_backlog : corrupt replog file detected, msgsize=0");
225  if (msgsize > 2 * BLKSIZE)
226  die_dataerr("send_backlog : corrupt replog file detected");
227  data = s_zmalloc(msgsize);
228  result = fullRead(freplbl, (unsigned char *) data, msgsize);
229  if (result != msgsize) {
230  s_free(data);
231  LFATAL("send_backlog : !msgsize");
232  return (FAIL);
233  }
234  if (data[msgsize - 1] != '~') {
235  s_free(data);
236  offset = offset + msgsize;
237  lseek(freplbl, offset, SEEK_SET);
238  LDEBUG
239  ("send_backlog : has been written before, skip (not an error)");
240  continue;
241  }
242  if (!connected) {
243  while (OK != reconnect()) {
244  sleep(1); // Make sure we don't flood the other size with SYN's
245  LINFO("send_backlog : failed to connect to the slave");
246  }
247  connected = 1;
248  }
249  msgstart =
250  as_sprintf(__FILE__, __LINE__, "START : %i", msgsize - 1);
251  while (1) {
252  if (OK != (ret = check_abort()))
253  break;
254  result =
256  (unsigned char *) msgstart,
257  strlen(msgstart) + 1);
258  if (result != strlen(msgstart) + 1) {
259  LINFO("send_backlog : write msgstart failed");
260  while (OK != reconnect()) {
261  sleep(1); // Make sure we don't flood the other size with SYN's
262  LINFO("send_backlog : failed to connect to the slave");
263  }
264  continue;
265  }
266  result =
268  (unsigned char *) &remote,
269  sizeof(unsigned int));
270  if (OK != (ret = check_abort()))
271  break;
272  if (remote != msgsize - 1) {
273  if (OK != reconnect()) {
274  ret = FAIL;
275  break;
276  }
277  LINFO("send_backlog : invalid message size");
278  continue;
279  }
280 
281  result =
283  (unsigned char *) data, msgsize - 1);
284  if (OK != (ret = check_abort()))
285  break;
286  if (result != msgsize - 1) {
287  LINFO
288  ("send_backlog : failed to write the message, reset connection");
289  while (OK != reconnect()) {
290  sleep(1); // Make sure we don't flood the other size with SYN's
291  LINFO("send_backlog : failed to connect to the slave");
292  }
293  continue;
294  }
295  result =
297  (unsigned char *) &remote,
298  sizeof(unsigned int));
299  if (OK != (ret = check_abort()))
300  break;
301  if (remote == ACK) {
302  retry = 0;
303  LDEBUG("send_backlog : remote == ACK");
304  break;
305  }
306  LINFO("send_backlog : no ACK on %02x %02x : %02x %02x",
307  (u_char) data[0], (u_char) data[1],
308  (u_char) data[msgsize - sizeof(int) - 1],
309  (u_char) data[msgsize - sizeof(int)]);
310  while (OK != reconnect()) {
311  sleep(1); // Make sure we don't flood the other size with SYN's
312  LINFO("send_backlog : failed to connect to the slave");
313  }
314  if (OK != (ret = check_abort()))
315  break;
316  }
317  retry = 0;
318  s_free(msgstart);
319  s_free(data);
320  if (ret == FAIL)
321  break;
322  lseek(freplbl, offset + msgsize - 1, SEEK_SET);
323  result = fullWrite(freplbl, flagwritten, 1);
324  if (result != sizeof(unsigned char))
326  ("send_backlog : write error on disk: this should never happen");
327  offset = offset + msgsize;
328  lseek(freplbl, offset, SEEK_SET);
329  }
330  if (ret == OK) {
331  if (dotrunc == 0)
332  repl_lock((char *) __PRETTY_FUNCTION__);
333  if (0 == dotrunc) {
334  dotrunc = 1;
335  goto final_run;
336  }
337  lseek(freplbl, 0, SEEK_SET);
338  if (-1 == ftruncate(freplbl, 0))
339  die_dataerr("Failed to truncate replog");
341  }
342  if (ret == OK) {
343  LDEBUG("send_backlog : send_backlog returns OK");
344  } else {
345  if (config->shutdown == 0) {
346  LINFO("send_backlog : return FAIL");
347  } else
348  LINFO
349  ("send_backlog : refuse request, going down after shutdown request");
350  }
351  close(config->replication_socket);
352  return (ret);
353 }
354 
356 {
357  char *newfile;
358  struct stat stbuf;
359  unsigned long sequence;
360 
361  FUNC;
363  newfile =
364  as_sprintf(__FILE__, __LINE__, "%s-%lu",
366  if (-1 != stat(newfile, &stbuf)) {
367  die_dataerr("Replication logfile with name %s already exists",
368  newfile);
369  }
370  flock(frepl, LOCK_UN);
371  fsync(frepl);
372  close(frepl);
373  close(freplbl);
374  if (0 != rename(config->replication_logfile, newfile))
375  die_dataerr("Failed to rename %s to %s",
376  config->replication_logfile, newfile);
377  if (-1 ==
378  (frepl =
379  s_open2(config->replication_logfile, O_CREAT | O_RDWR, S_IRWXU)))
380  die_syserr();
381  if (0 != flock(frepl, LOCK_EX | LOCK_NB)) {
382  LFATAL("Failed to lock the replication logfile %s",
384  exit(EXIT_USAGE);
385  }
386  if (-1 ==
387  (freplbl = s_open2(config->replication_logfile, O_RDWR, S_IRWXU)))
388  die_syserr();
389  next_sequence();
391 
392  s_free(newfile);
393  EFUNC;
394  return;
395 }
396 
397 void write_replication_data(unsigned char db, unsigned char op, char *key,
398  int ksize, char *value, int vsize,
399  int threadnr)
400 {
401  char *replicationmsg;
402  int replicationmsg_size;
403  int result;
404  char *crc;
405 
406 // replication_msg : db (char), op (char), ksize (int),vsize (int), key (char *),value (char *), crc32 (int)
407  replicationmsg_size =
408  ksize + vsize + (sizeof(unsigned char) * 2) + (sizeof(int) * 2) +
409  sizeof(int);
410  replicationmsg = s_zmalloc(replicationmsg_size + 1);
411  replicationmsg[0] = db;
412  replicationmsg[1] = op;
413  memcpy(replicationmsg + (sizeof(unsigned char) * 2), &ksize,
414  sizeof(int));
415  memcpy(replicationmsg + (sizeof(unsigned char) * 2) + sizeof(int),
416  &vsize, sizeof(int));
417  memcpy(replicationmsg + (sizeof(unsigned char) * 2) +
418  (sizeof(int) * 2), key, ksize);
419  if (op == REPLWRITE || op == REPLDUPWRITE || op == REPLDELETECURKEY) {
420  memcpy(replicationmsg + ksize + (sizeof(unsigned char) * 2) +
421  (2 * sizeof(int)), value, vsize);
422 // For safety we will calc a crc32 checksum of the message.
423 // drbd has seen cases where data received was garbled despite the
424 // use of tcp as transfer protocol.
425  }
426  crc =
427  (char *) crc32((unsigned char *) replicationmsg,
428  replicationmsg_size - sizeof(int), threadnr);
429  memcpy(replicationmsg + replicationmsg_size - sizeof(int), crc,
430  sizeof(int));
431  free(crc);
432 
434  LDEBUG("write to replog, msgsize = %u", replicationmsg_size);
435  lseek(frepl, 0, SEEK_END);
436  replicationmsg_size++;
437  result =
438  fullWrite(frepl, (unsigned char *) &replicationmsg_size,
439  sizeof(int));
440  replicationmsg[replicationmsg_size - 1] = '~';
441  // Add one byte that will be used later to flag when a message has been send succesfully.
442  result =
443  fullWrite(frepl, (unsigned char *) replicationmsg,
444  replicationmsg_size);
445  if (result != replicationmsg_size)
446  die_dataerr("Failed to write to replication log");
447  s_free(replicationmsg);
448 }
449 
450 void write_repl_data(unsigned char db, unsigned char op, char *key,
451  int ksize, char *value, int vsize, int threadnr)
452 {
453  struct stat stbuf;
454  //unsigned int curtime = time(NULL);
455 
456  FUNC;
457  repl_lock((char *) __PRETTY_FUNCTION__);
458  if (-1 == fstat(frepl, &stbuf))
459  die_syserr();
460  /*if (0 == strcmp(config->replication_partner_ip, "-1")
461  && config->replication && config->replication_role == 0) {
462  if (stbuf.st_size > config->rotate_replog_size
463  || curtime - config->replication_last_rotated > REPLOG_DELAY) {
464  config->replication_last_rotated = time(NULL);
465  rotate_replog();
466  }
467  }*/
468  if (0 != stbuf.st_size) {
470  } else
472  write_replication_data(db, op, key, ksize, value, vsize, threadnr);
474  EFUNC;
475  return;
476 }
477 
479 {
480  int retry = 0;
481  close(config->replication_socket);
482  while (1) {
486  if (config->replication_socket != -1)
487  break;
488  if (OK != check_abort())
489  return (FAIL);
490  LINFO("Retry connect to %s:%s", config->replication_partner_ip,
492  retry++;
493  sleep(retry);
494  if (retry > 3)
495  return (FAIL);
496  }
497  return (OK);
498 }
499 
500 
502 {
503  char trash;
504  while (0 <
506  (unsigned char *) &trash, sizeof(char)));
507  return;
508 }
509 
510 void watchdir()
511 {
512  struct stat stbuf;
513  DIR *dp = NULL;
514  struct dirent *entry;
515  char *name;
516  unsigned long sequence;
517  bool found = 0;
518  int sleeptime = WATCHDIR_SLEEPINTERVAL;
519 
520  LINFO("watchdir : open %s", config->replication_watchdir);
521  if (-1 == stat(config->replication_watchdir, &stbuf))
522  die_dataerr("watchdir : Failed to stat %s",
524  if (-1 == chdir(config->replication_watchdir))
525  die_dataerr("watchdir : Failed to chdir to %s",
527  while (1) {
528  if (config->shutdown)
529  break;
530  while (config->frozen) {
531  sleep(1);
532  }
533  if (NULL == (dp = (opendir(config->replication_watchdir)))) {
534  LFATAL("watchdir : Failed to open %s\n",
536  exit(EXIT_WDIR);
537  }
538  while (entry = readdir(dp)) {
540  if (-1 == stat(entry->d_name, &stbuf)) {
541  LINFO("watchdir: Error on stat %s\n", entry->d_name);
542  continue;
543  }
544  if (S_ISDIR(stbuf.st_mode)) {
545  continue;
546  }
547  name =
548  as_sprintf(__FILE__, __LINE__, "replog.dta-%lu", sequence);
549  if (0 != strcmp(name, entry->d_name)) {
550  s_free(name);
551  continue;
552  }
553  if (-1 == (rrepl = s_open2(name, O_RDWR, S_IRWXU)))
554  die_syserr();
555  merge_replog();
556  found = 1;
557  next_sequence();
560  s_free(name);
561  name =
562  as_sprintf(__FILE__, __LINE__, "replog.dta-%lu-processed",
563  sequence);
564  if (0 != rename(entry->d_name, name))
565  die_dataerr("Failed to rename %s to %s", entry->d_name,
566  name);
567  s_free(name);
568  }
569  closedir(dp);
570  if (found) {
571  sleeptime = 0;
572  } else if (sleeptime < WATCHDIR_SLEEPINTERVAL)
573  sleeptime++;
574  sleep(sleeptime);
575  found = 0;
576  }
577  config->safe_down = 1;
578 }
579 
580 void *replication_worker(void *arg)
581 {
582  int msocket;
583  const char *proto = "tcp";
584  struct sockaddr_un client_address;
585  socklen_t client_len;
586  char *message = NULL;
587  int result;
588  int msglen = 0;
589  char *p;
590  char *crcrec;
591  char *crccalc;
592  unsigned int nakcount = 0;
593 
595  watchdir();
596  pthread_exit(NULL);
597  }
598  msocket = -1;
599  if (NULL == config->replication_listen_port)
601 
602  while (1) {
603  msocket =
606  if (msocket != -1)
607  break;
608  LINFO("replication_worker : serverinit failed: retry");
609  sleep(1);
610  close(msocket);
611  }
612  message = s_zmalloc(BLKSIZE * 2); // Enough space to hold the message and the kitchen sink
613  client_len = sizeof(client_address);
614  while (1) {
615  nakcount = 0;
616  config->safe_down = 1;
618  accept(msocket, (struct sockaddr *) &client_address,
619  &client_len);
620  config->safe_down = 0;
621  LDEBUG("replication_worker : connected");
622  while (1) {
623  if (OK != check_abort()) {
624  LINFO
625  ("Refuse replication data while filesystem is frozen or shutting down");
626  close(config->replication_socket);
627  break;
628  }
629  if (config->replication_enabled == 0) {
630  LINFO
631  ("replication_worker : replication is disabled, disconnect");
632  close(config->replication_socket);
633  break;
634  }
635  if (nakcount > 3) {
636  LINFO("replication_worker : nakcount > 3, disconnect");
637  close(config->replication_socket);
638  break;
639  }
640  result =
641  readstring(300, config->replication_socket, message, 64);
642  if (result == TIMEOUT) {
643  LINFO("No START message after connect, handshake failure");
644  close(config->replication_socket);
645  break;
646  }
647  if (result == -1) {
648  LDEBUG
649  ("replication_worker : readstring %i close connection",
650  result);
651  close(config->replication_socket);
652  break;
653  }
654  if (result == 0) {
655  if (0 != strncmp("START :", message, strlen("START :"))) {
656  send_nak();
657  LDEBUG
658  ("replication_worker : readstring no valid start of message");
659  nakcount++;
660  continue;
661  }
662  p = strchr(message, ':');
663  p++;
664  msglen = atoi(p);
665  } else {
666  send_nak();
667  LINFO("replication_worker : readstring no valid message");
668  nakcount++;
669  continue;
670  }
671  result =
673  (unsigned char *) &msglen, sizeof(int));
674  if (result != sizeof(int)) {
675  LINFO
676  ("replication_worker : failed to send ACK, disconnect");
677  close(config->replication_socket);
678  break;
679  }
680  result =
682  (unsigned char *) message, msglen);
683  if (result != msglen) {
684  LINFO("replication_worker : got %i expected %i", result,
685  msglen);
686  send_nak();
687  nakcount++;
688  continue;
689  }
690  if (msglen < sizeof(int)) {
691 #ifdef x86_64
692  LINFO("replication_worker : got %i bytes expected %lu",
693  msglen, sizeof(int));
694 #else
695  LINFO("replication_worker : got %i bytes expected %u",
696  msglen, sizeof(int));
697 #endif
698  send_nak();
699  nakcount++;
700  continue;
701  }
702  crccalc =
703  (char *) crc32((unsigned char *) message,
704  msglen - (sizeof(int)), 1);
705  crcrec = &message[msglen - sizeof(int)];
706  if (0 != memcmp(crccalc, crcrec, sizeof(int))) {
707  LINFO("crcsend != crcrecv %02x %02x : %02x %02x",
708  message[0], message[1],
709  message[msglen - sizeof(int) - 1],
710  message[msglen - sizeof(int)]);
711  free(crccalc);
712  send_nak();
713  nakcount++;
714  continue;
715  }
716  free(crccalc);
717  send_ack();
718  nakcount = 0;
719  process_message(message, msglen - sizeof(int));
720  }
721  }
722 // We never get here.
723  s_free(message);
724  pthread_exit(NULL);
725 }
726 
727 void process_message(char *message, int msglen)
728 {
729 //message : db (char), op (char), ksize (int),vsize (int), key (char *),value (char *)
730  REPLICATIONMSG replicationmsg;
731  unsigned long long offset;
732  unsigned long cursequence;
733  unsigned long newsequence;
734 
735  replicationmsg.database = message[0];
736  replicationmsg.operation = message[1];
737  memcpy(&replicationmsg.ksize, &message[2], sizeof(int));
738  memcpy(&replicationmsg.vsize, &message[2 + sizeof(int)], sizeof(int));
739  replicationmsg.key = (unsigned char *) &message[2 + (2 * sizeof(int))];
740  replicationmsg.value =
741  (unsigned char *) &message[2 + (2 * sizeof(int)) +
742  replicationmsg.ksize];
743  while (config->frozen) {
744  usleep(10000);
745  }
746  if (replicationmsg.operation == REPLWRITE) {
747  LDEBUG("process_message : write keylen %i, vsize %i",
748  replicationmsg.ksize, replicationmsg.vsize);
749  if (replicationmsg.database == DBDTA) {
750  bin_write_dbdata(DBDTA, replicationmsg.key,
751  replicationmsg.ksize, replicationmsg.value,
752  replicationmsg.vsize);
753  }
754  if (replicationmsg.database == DBU) {
755  bin_write_dbdata(DBU, replicationmsg.key, replicationmsg.ksize,
756  replicationmsg.value, replicationmsg.vsize);
757  }
758  if (replicationmsg.database == DBB) {
759  bin_write_dbdata(DBB, replicationmsg.key, replicationmsg.ksize,
760  replicationmsg.value, replicationmsg.vsize);
761  }
762  if (replicationmsg.database == DBP) {
763  if (3 == replicationmsg.ksize) {
764  if (0 == memcmp(replicationmsg.key, "SEQ", 3)) {
765  cursequence = get_sequence();
766  memcpy(&newsequence, replicationmsg.value,
767  sizeof(newsequence));
768  if (newsequence != cursequence)
770  ("replication log with sequence %lu while expecting %lu",
771  newsequence, cursequence);
772  }
773  }
774  bin_write_dbdata(DBP, replicationmsg.key, replicationmsg.ksize,
775  replicationmsg.value, replicationmsg.vsize);
776  cachep2i_lock((char *) __PRETTY_FUNCTION__);
777  tctreeclear(path2inotree);
779  }
780  if (replicationmsg.database == DBS) {
781  bin_write_dbdata(DBS, replicationmsg.key, replicationmsg.ksize,
782  replicationmsg.value, replicationmsg.vsize);
783  }
784  if (replicationmsg.database == DBDIRENT) {
785  btbin_write_dbdata(DBDIRENT, replicationmsg.key,
786  replicationmsg.ksize, replicationmsg.value,
787  replicationmsg.vsize);
788  cachep2i_lock((char *) __PRETTY_FUNCTION__);
789  tctreeclear(path2inotree);
791  }
792  if (replicationmsg.database == FREELIST) {
793  btbin_write_dbdata(FREELIST, replicationmsg.key,
794  replicationmsg.ksize, replicationmsg.value,
795  replicationmsg.vsize);
796  }
797  if (replicationmsg.database == DBL) {
798  btbin_write_dbdata(DBL, replicationmsg.key,
799  replicationmsg.ksize, replicationmsg.value,
800  replicationmsg.vsize);
801  cachep2i_lock((char *) __PRETTY_FUNCTION__);
802  tctreeclear(path2inotree);
804  }
805  if (replicationmsg.database == FDBDTA) {
806 // With a write to fdbdta we use the key as store for the offset.
807  memcpy(&offset, replicationmsg.key, replicationmsg.ksize);
808  s_lckpwrite(fdbdta, replicationmsg.value, replicationmsg.vsize,
809  offset);
810  }
811  }
812  if (replicationmsg.operation == REPLDUPWRITE) {
813  if (replicationmsg.database == DBDIRENT) {
814  btbin_write_dup(DBDIRENT, replicationmsg.key,
815  replicationmsg.ksize, replicationmsg.value,
816  replicationmsg.vsize, LOCK);
817  }
818  if (replicationmsg.database == FREELIST) {
819  btbin_write_dup(FREELIST, replicationmsg.key,
820  replicationmsg.ksize, replicationmsg.value,
821  replicationmsg.vsize, LOCK);
822  }
823  if (replicationmsg.database == DBL) {
824  btbin_write_dup(DBL, replicationmsg.key, replicationmsg.ksize,
825  replicationmsg.value, replicationmsg.vsize,
826  LOCK);
827  }
828  }
829  if (replicationmsg.operation == REPLDELETE) {
830  if (replicationmsg.database == DBDTA) {
831  delete_key(DBDTA, replicationmsg.key, replicationmsg.ksize,
832  NULL);
833  }
834  if (replicationmsg.database == DBU) {
835  delete_key(DBU, replicationmsg.key, replicationmsg.ksize,
836  NULL);
837  }
838  if (replicationmsg.database == DBB) {
839  delete_key(DBB, replicationmsg.key, replicationmsg.ksize,
840  NULL);
841  }
842  if (replicationmsg.database == DBP) {
843  delete_key(DBP, replicationmsg.key, replicationmsg.ksize,
844  NULL);
845  }
846  if (replicationmsg.database == DBS) {
847  delete_key(DBS, replicationmsg.key, replicationmsg.ksize,
848  NULL);
849  }
850  }
851  if (replicationmsg.operation == TRANSACTIONCOMMIT) {
852  get_global_lock((char *) __PRETTY_FUNCTION__);
853  write_lock((char *) __PRETTY_FUNCTION__);
854  flush_wait(0);
855  purge_read_cache(0, 1, (char *) __PRETTY_FUNCTION__);
860  }
861  if (replicationmsg.operation == TRANSACTIONABORT) {
863  }
864  if (replicationmsg.operation == REPLDELETECURKEY) {
865  if (replicationmsg.database == DBDIRENT) {
866  btdelete_curkey(DBDIRENT, replicationmsg.key,
867  replicationmsg.ksize, replicationmsg.value,
868  replicationmsg.vsize,
869  (char *) __PRETTY_FUNCTION__);
870  }
871  if (replicationmsg.database == FREELIST) {
872  btdelete_curkey(FREELIST, replicationmsg.key,
873  replicationmsg.ksize, replicationmsg.value,
874  replicationmsg.vsize,
875  (char *) __PRETTY_FUNCTION__);
876  }
877  if (replicationmsg.database == DBL) {
878  btdelete_curkey(DBL, replicationmsg.key, replicationmsg.ksize,
879  replicationmsg.value, replicationmsg.vsize,
880  (char *) __PRETTY_FUNCTION__);
881  }
882  }
883  if (replicationmsg.operation == REPLSETNEXTOFFSET) {
884  memcpy(&nextoffset, replicationmsg.key, replicationmsg.ksize);
885  }
886  return;
887 }
888 
889 int send_nak()
890 {
891  int confirm = NAK;
892  int result;
893  FUNC;
895  result =
897  (unsigned char *) &confirm, sizeof(int));
898  if (result != sizeof(unsigned int)) {
899  close(config->replication_socket);
900  return (-1);
901  }
902  EFUNC;
903  return (0);
904 }
905 
906 int send_ack()
907 {
908  int confirm = ACK;
909  int result;
910  FUNC;
911  result =
913  (unsigned char *) &confirm, sizeof(int));
914  if (result != sizeof(unsigned int)) {
915  close(config->replication_socket);
916  return (-1);
917  }
918  EFUNC;
919  return (0);
920 }
REPLICATIONMSG::vsize
int vsize
Definition: lib_repl.h:8
lib_hamster.h
lib_qlz.h
next_sequence
void next_sequence()
Definition: lib_common.c:1444
commit_transactions
void commit_transactions()
Definition: lib_tc.c:1407
lib_safe.h
cachep2i_lock
void cachep2i_lock(const char *msg)
Definition: lib_common.c:701
TIMEOUT
#define TIMEOUT
Definition: lib_net.h:21
configdata::replication_socket
int replication_socket
Definition: lib_cfg.h:64
send_nak
int send_nak()
Definition: lib_repl.c:889
start_flush_commit
void start_flush_commit()
Definition: lib_tc.c:1440
lib_common.h
configdata::replication_partner_port
char * replication_partner_port
Definition: lib_cfg.h:66
DBL
#define DBL
Definition: lib_repl.h:26
readstring
int readstring(int sec, int sockid, char *buf, int maxlen)
Definition: lib_net.c:83
FUNC
#define FUNC
Definition: lib_log.h:76
MAX_ALLOWED_THREADS
#define MAX_ALLOWED_THREADS
Definition: lib_common.h:13
EFUNC
#define EFUNC
Definition: lib_log.h:77
FAIL
#define FAIL
Definition: retcodes.h:33
dbl
TCBDB * dbl
Definition: lib_common.c:101
TRANSACTIONABORT
#define TRANSACTIONABORT
Definition: lib_repl.h:19
check_abort
int check_abort()
Definition: lib_repl.c:96
configdata::replication_backlog
int replication_backlog
Definition: lib_cfg.h:71
process_message
void process_message(char *message, int msglen)
Definition: lib_repl.c:727
REPLDELETE
#define REPLDELETE
Definition: lib_repl.h:13
DBDTA
#define DBDTA
Definition: lib_repl.h:21
die_dataerr
#define die_dataerr(f...)
Definition: file_io.c:94
REPLICATIONMSG
Definition: lib_repl.h:2
lib_log.h
REPLICATIONMSG::key
unsigned char * key
Definition: lib_repl.h:5
fullWrite
int fullWrite(int fd, unsigned char *buf, int len)
Definition: lib_net.c:419
rotate_replog
void rotate_replog()
Definition: lib_repl.c:355
serverinit
int serverinit(const char *addr, const char *port, const char *proto)
Definition: lib_net.c:293
path2inotree
TCTREE * path2inotree
Definition: lib_common.c:110
dbp
TCHDB * dbp
Definition: lib_common.c:100
FDBDTA
#define FDBDTA
Definition: lib_repl.h:31
REPLICATIONMSG::database
unsigned char database
Definition: lib_repl.h:3
dbb
TCHDB * dbb
Definition: lib_common.c:98
configdata::replication_listen_ip
char * replication_listen_ip
Definition: lib_cfg.h:67
configdata::shutdown
int shutdown
Definition: lib_cfg.h:76
configdata::replication_partner_ip
char * replication_partner_ip
Definition: lib_cfg.h:65
rrepl
int rrepl
Definition: lib_repl.c:78
get_sequence
unsigned long get_sequence()
Definition: lib_common.c:1428
flush_wait
void flush_wait(unsigned long long inode)
Definition: lib_common.c:2564
release_repl_lock
void release_repl_lock()
Definition: lib_common.c:923
TRANSACTIONCOMMIT
#define TRANSACTIONCOMMIT
Definition: lib_repl.h:18
freplbl
int freplbl
Definition: lib_tc.c:94
freelist
TCBDB * freelist
Definition: lib_common.c:105
dbs
TCHDB * dbs
Definition: lib_common.c:102
dbdta
TCHDB * dbdta
Definition: lib_common.c:103
bin_write_dbdata
void bin_write_dbdata(int, void *, int, void *, int)
Definition: lib_tc.c:658
as_sprintf
void * as_sprintf(char *file, unsigned int line, const char *fmt,...)
Definition: lib_safe.c:564
release_global_lock
void release_global_lock()
Definition: lib_common.c:637
reconnect
int reconnect()
Definition: lib_repl.c:478
configdata::safe_down
int safe_down
Definition: lib_cfg.h:78
DBB
#define DBB
Definition: lib_repl.h:23
get_global_lock
void get_global_lock(const char *msg)
Definition: lib_common.c:616
release_write_lock
void release_write_lock()
Definition: lib_common.c:694
BLKSIZE
int BLKSIZE
Definition: commons.h:8
EXIT_WDIR
#define EXIT_WDIR
Definition: retcodes.h:29
merge_replog
void merge_replog()
Definition: lib_repl.c:106
s_lckpwrite
int s_lckpwrite(int fd, const void *buf, size_t len, off_t off)
Definition: lib_safe.c:198
lib_tc.h
btdelete_curkey
int btdelete_curkey(int, void *, int, void *, int, const char *)
Definition: lib_tc.c:854
LINFO
#define LINFO(f...)
Definition: lib_log.h:69
lib_bdb.h
s_zmalloc
#define s_zmalloc(size)
Definition: lib_safe.h:24
configdata::replication_watchdir
char * replication_watchdir
Definition: lib_cfg.h:44
start_transactions
void start_transactions()
Definition: lib_tc.c:1463
OK
#define OK
Definition: retcodes.h:32
LFATAL
#define LFATAL(f...)
Definition: lib_log.h:68
lib_crypto.h
lib_net.h
fulltimWrite
int fulltimWrite(int sec, int fd, unsigned char *buf, int len)
Definition: lib_net.c:371
frepl
int frepl
Definition: lib_tc.c:93
LOCK
#define LOCK
Definition: lib_common.h:5
flush_recv_buffer
void flush_recv_buffer()
Definition: lib_repl.c:501
LDEBUG
#define LDEBUG(f...)
Definition: lib_log.h:78
crc32
unsigned char * crc32(unsigned char *buf, int size, int thread_number)
Definition: lib_repl.c:82
send_backlog
int send_backlog()
Definition: lib_repl.c:186
REPLDUPWRITE
#define REPLDUPWRITE
Definition: lib_repl.h:15
file_io.h
btbin_write_dbdata
void btbin_write_dbdata(int, void *, int, void *, int)
Definition: lib_tc.c:612
replication_worker
void * replication_worker(void *arg)
Definition: lib_repl.c:580
btbin_write_dup
void btbin_write_dup(int, void *, int, void *, int, bool)
Definition: lib_tc.c:581
configdata::frozen
int frozen
Definition: lib_cfg.h:77
clientconnect
int clientconnect(const char *host, const char *port, const char *proto)
Definition: lib_net.c:253
REPLWRITE
#define REPLWRITE
Definition: lib_repl.h:12
fulltimRead
int fulltimRead(int sec, int fd, unsigned char *buf, int len)
Definition: lib_net.c:348
DBP
#define DBP
Definition: lib_repl.h:24
ACK
#define ACK
Definition: lib_repl.h:34
retcodes.h
dbu
TCHDB * dbu
Definition: lib_common.c:99
config
struct configdata * config
Definition: lib_cfg.h:91
lib_repl.h
EXIT_USAGE
#define EXIT_USAGE
Definition: retcodes.h:23
DBDIRENT
#define DBDIRENT
Definition: lib_repl.h:28
NAK
#define NAK
Definition: lib_repl.h:35
abort_transactions
void abort_transactions()
Definition: lib_tc.c:1373
configdata::replication_listen_port
char * replication_listen_port
Definition: lib_cfg.h:68
lib_cfg.h
DBS
#define DBS
Definition: lib_repl.h:25
release_cachep2i_lock
void release_cachep2i_lock()
Definition: lib_common.c:721
configdata::replication_enabled
int replication_enabled
Definition: lib_cfg.h:70
purge_read_cache
void purge_read_cache(unsigned long long inode, bool force, char *caller)
Definition: lib_common.c:2645
delete_key
void delete_key(int, void *, int, const char *)
Definition: lib_tc.c:1007
REPLSETNEXTOFFSET
#define REPLSETNEXTOFFSET
Definition: lib_repl.h:17
fdbdta
int fdbdta
Definition: lib_tc.c:92
REPLICATIONMSG::value
unsigned char * value
Definition: lib_repl.h:7
REPLICATIONMSG::ksize
int ksize
Definition: lib_repl.h:6
sequence
unsigned long sequence
Definition: lessfs.c:101
dbdirent
TCBDB * dbdirent
Definition: lib_common.c:104
watchdir
void watchdir()
Definition: lib_repl.c:510
REPLDELETECURKEY
#define REPLDELETECURKEY
Definition: lib_repl.h:16
repl_lock
void repl_lock(const char *msg)
Definition: lib_common.c:902
DBU
#define DBU
Definition: lib_repl.h:22
die_syserr
#define die_syserr()
Definition: lessfsck.c:71
fullRead
int fullRead(int fd, unsigned char *buf, int len)
Definition: lib_net.c:397
write_replication_data
void write_replication_data(unsigned char db, unsigned char op, char *key, int ksize, char *value, int vsize, int threadnr)
Definition: lib_repl.c:397
s_open2
int s_open2(const char *pathname, int flags, mode_t mode)
Definition: lib_safe.c:489
send_ack
int send_ack()
Definition: lib_repl.c:906
configdata::replication_logfile
char * replication_logfile
Definition: lib_cfg.h:43
end_flush_commit
void end_flush_commit()
Definition: lib_tc.c:1480
workqtree
TCTREE * workqtree
Definition: lib_common.c:108
lib_lzo.h
FREELIST
#define FREELIST
Definition: lib_repl.h:27
WATCHDIR_SLEEPINTERVAL
#define WATCHDIR_SLEEPINTERVAL
Definition: lib_repl.h:36
REPLICATIONMSG::operation
unsigned char operation
Definition: lib_repl.h:4
write_repl_data
void write_repl_data(unsigned char db, unsigned char op, char *key, int ksize, char *value, int vsize, int threadnr)
Definition: lib_repl.c:450
nextoffset
unsigned long long nextoffset
Definition: lib_common.c:90
s_free
#define s_free(mem_ref)
Definition: lib_safe.h:25
write_lock
void write_lock(const char *msg)
Definition: lib_common.c:674
readcachetree
TCTREE * readcachetree
Definition: lib_common.c:109