"Fossies" - the Fresh Open Source Software Archive

Member "lessfs-1.7.0/lessfs.c" (16 Nov 2013, 65926 Bytes) of package /linux/misc/old/lessfs-1.7.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "lessfs.c", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.5.13_vs_1.7.0.

    1 /*
    2  *   Lessfs: A data deduplicating filesystem.
    3  *   Copyright (C) 2008 Mark Ruijter <mruijter@lessfs.com>
    4  *
    5  *   This program is free software.
    6  *   You can redistribute lessfs and/or modify it under the terms of either
    7  *   (1) the GNU General Public License; either version 3 of the License,
    8  *   or (at your option) any later version as published by
    9  *   the Free Software Foundation; or (2) obtain a commercial license
   10  *   by contacting the Author.
   11  *
   12  *   This program is distributed in the hope that it will be useful,
   13  *   but WITHOUT ANY WARRANTY;  without even the implied warranty of
   14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See
   15  *   the GNU General Public License for more details.
   16  *
   17  *   You should have received a copy of the GNU General Public License
   18  *   along with this program;  if not, write to the Free Software
   19  *   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
   20  */
   21 
   22 #define FUSE_USE_VERSION 26
   23 #ifdef HAVE_CONFIG_H
   24 #include <config.h>
   25 #endif
   26 
   27 #ifndef LFATAL
   28 #include "lib_log.h"
   29 #endif
   30 
   31 #define LFSVERSION "1.7.0"
   32 
   33 #include <fuse.h>
   34 #include <stdio.h>
   35 #include <string.h>
   36 #include <unistd.h>
   37 #include <fcntl.h>
   38 #include <dirent.h>
   39 #include <errno.h>
   40 #include <malloc.h>
   41 #include <sys/time.h>
   42 #include <sys/stat.h>
   43 #include <sys/types.h>
   44 #include <sys/socket.h>
   45 #include <sys/file.h>
   46 #include <sys/un.h>
   47 #include <stdio.h>
   48 #include <stdlib.h>
   49 #include <pthread.h>
   50 #include <signal.h>
   51 #include <libgen.h>
   52 #include <sys/utsname.h>
   53 #include <sys/vfs.h>
   54 #include <sys/time.h>
   55 #include <sys/resource.h>
   56 #include <sys/types.h>
   57 #include <sys/wait.h>
   58 #include <mhash.h>
   59 #include <tcutil.h>
   60 #include <tchdb.h>
   61 #include <tcbdb.h>
   62 #include <stdlib.h>
   63 #include <stdbool.h>
   64 #include "lib_safe.h"
   65 #include "lib_common.h"
   66 #ifdef LZO
   67 #include "lib_lzo.h"
   68 #endif
   69 #include "lib_qlz.h"
   70 #include "lib_qlz15.h"
   71 #ifdef BERKELEYDB
   72 #include <db.h>
   73 #include "lib_bdb.h"
   74 #else
   75 #ifndef HAMSTERDB
   76 #include "lib_tc.h"
   77 #else
   78 #include "lib_hamster.h"
   79 #endif
   80 #endif
   81 #include "lib_net.h"
   82 #include "file_io.h"
   83 
   84 #ifdef HAVE_SETXATTR
   85 #include <sys/xattr.h>
   86 #endif
   87 #include "lib_cfg.h"
   88 #include "lib_str.h"
   89 #include "lib_repl.h"
   90 #include "retcodes.h"
   91 #ifdef ENABLE_CRYPTO
   92 #include "lib_crypto.h"
   93 #endif
   94 
   95 #include "commons.h"
   96 #ifdef BERKELEYDB
   97 extern char *bdb_lockedby;
   98 #endif
   99 
  100 unsigned long working = 0;      /* file-scope counter; not referenced in the visible part of this file -- purpose TODO confirm */
  101 unsigned long sequence = 0;     /* file-scope value; NOTE(review): lessfs_getattr and lessfs_read declare locals of the same name that shadow it */
  102 
  103 void segvExit()
  104 {
  105     LFATAL("Exit caused by segfault!, exitting\n");
  106     db_close(0);
  107     exit(EXIT_OK);
  108 }
  109 
  110 void normalExit()
  111 {
  112     LFATAL("Exit signal received, exitting\n");
  113     db_close(0);
  114     exit(EXIT_OK);
  115 }
  116 
  117 void libSafeExit()
  118 {
  119     db_close(0);
  120     LFATAL("Exit signal received from lib_safe, exitting\n");
  121     exit(EXIT_OK);
  122 }
  123 
  124 int check_path_sanity(const char *path)
  125 {
  126     char *name;
  127     char *p;
  128     char *q;
  129     int retcode = 0;
  130 
  131     FUNC;
  132     LDEBUG("check_path_sanity : %s", path);
  133 
  134     name = s_strdup((char *) path);
  135     p = name;
  136     q = p;
  137     while (1) {
  138         p = strrchr(q, '/');
  139         if (p == NULL)
  140             break;
  141         q = p;
  142         q++;
  143         p[0] = 0;
  144         if (q) {
  145             if (strlen(q) > 255) {
  146                 LDEBUG("check_path_sanity : error q>255");
  147                 retcode = -ENAMETOOLONG;
  148             }
  149         }
  150         q = p--;
  151     }
  152     s_free(name);
  153     EFUNC;
  154     return (retcode);
  155 }
  156 
  157 void dbsync()
  158 {
  159 #ifndef BERKELEYDB
  160 #ifndef HAMSTERDB
  161     tcbdbsync(dbdirent);
  162     tchdbsync(dbu);
  163     tchdbsync(dbb);
  164     tchdbsync(dbp);
  165 #endif
  166 #endif
  167     if ( config->blockdata_io_type == FILE_IO ) {
  168         fsync(fdbdta);
  169     }
  170     if (config->replication == 1 && config->replication_role == 0) {
  171         fsync(frepl);
  172     }
  173 }
  174 
  175 static int lessfs_getattr(const char *path, struct stat *stbuf)
  176 {
  177     int res = 0;
  178     char *bname, *sqstr;
  179     unsigned long sequence;
  180     static int last_sequence = 1;
  181 
  182     FUNC;
  183     LDEBUG("lessfs_getattr %s", path);
  184     res = check_path_sanity(path);
  185     if (res == -ENAMETOOLONG)
  186         return (res);
  187 
  188     get_global_lock((char *) __PRETTY_FUNCTION__);
  189     if (0 == strncmp("/.lessfs/locks/", path, strlen("/.lessfs/locks/"))) {
  190         bname = s_basename((char *) path);
  191         get_inode_lock((char *) __PRETTY_FUNCTION__);
  192         res = lckname_to_stat(bname, stbuf);
  193         release_inode_lock();
  194         s_free(bname);
  195         goto end;
  196     }
  197     res = dbstat(path, stbuf, 1);
  198     if (!inode_isnot_locked(stbuf->st_ino))
  199         stbuf->st_mode ^= S_ISVTX;
  200     LDEBUG("lessfs_getattr : %s size %llu : result %i", path,
  201            (unsigned long long) stbuf->st_size, res);
  202     if (0 == strcmp("/.lessfs/lessfs_stats", path)) {
  203         s_free(config->lfsstats);
  204         config->lfsstats = lessfs_stats();
  205         stbuf->st_size = strlen(config->lfsstats);
  206     }
  207     if (0 == strcmp("/.lessfs/replication/enabled", path))
  208         stbuf->st_size = 2;
  209     if (0 == strcmp("/.lessfs/replication/backlog", path))
  210         stbuf->st_size = 2;
  211     if (0 == strcmp("/.lessfs/replication/rotate_replog", path))
  212         stbuf->st_size = 2;
  213     if (0 == strcmp("/.lessfs/replication/sequence", path)) {
  214         sequence = get_sequence();
  215         sqstr = as_sprintf(__FILE__, __LINE__, "%lu", sequence);
  216         // There is no such thing as invalidate cache for the
  217         // High level API. This is a crude hack to make sure that
  218         // the content is not read from cache.
  219         stbuf->st_size = strlen(sqstr) + last_sequence;
  220         last_sequence++;
  221         if (last_sequence > 2)
  222             last_sequence = 1;
  223         s_free(sqstr);
  224     }
  225   end:
  226     release_global_lock();
  227     return (res);
  228 }
  229 
  230 static int lessfs_access(const char *path, int mask)
  231 {
  232     int res = 0;
  233     FUNC;
  234 // Empty stub
  235     return (res);
  236 }
  237 
  238 static int lessfs_readlink(const char *path, char *buf, size_t size)
  239 {
  240     int res = 0;
  241 
  242     FUNC;
  243 
  244     LDEBUG("lessfs_readlink : size = %i", (int) size);
  245     get_global_lock((char *) __PRETTY_FUNCTION__);
  246     res = fs_readlink(path, buf, size);
  247     release_global_lock();
  248     return (res);
  249 }
  250 
  251 
  252 static int lessfs_readdir(const char *path, void *buf,
  253                           fuse_fill_dir_t filler, off_t offset,
  254                           struct fuse_file_info *fi)
  255 {
  256     int retcode;
  257     get_global_lock((char *) __PRETTY_FUNCTION__);
  258     retcode = fs_readdir(path, buf, filler, offset, fi);
  259     release_global_lock();
  260     return (retcode);
  261 }
  262 
  263 static int lessfs_mknod(const char *path, mode_t mode, dev_t rdev)
  264 {
  265     int retcode = 0;
  266     char *dname;
  267     struct stat stbuf;
  268     time_t thetime;
  269 
  270     FUNC;
  271     if (config->replication == 1 && config->replication_role == 1) {
  272         LFATAL("lessfs_mknod");
  273         return (-EPERM);
  274     }
  275     get_global_lock((char *) __PRETTY_FUNCTION__);
  276     thetime = time(NULL);
  277     dname = s_dirname((char *) path);
  278     retcode = dbstat(dname, &stbuf, 0);
  279     LDEBUG("lessfs_mknod : dbstat returns %i", retcode);
  280     if (0 == retcode) {
  281         dbmknod(path, mode, NULL, rdev);
  282         stbuf.st_ctim.tv_sec = thetime;
  283         stbuf.st_ctim.tv_nsec = 0;
  284         stbuf.st_mtim.tv_sec = thetime;
  285         stbuf.st_mtim.tv_nsec = 0;
  286         retcode = update_stat(dname, &stbuf);
  287         LDEBUG("lessfs_mknod : update_stat returns %i", retcode);
  288         s_free(dname);
  289     }
  290     release_global_lock();
  291     return (retcode);
  292 }
  293 
  294 static int lessfs_mkdir(const char *path, mode_t mode)
  295 {
  296     int ret;
  297     if (config->replication == 1 && config->replication_role == 1) {
  298         return (-EPERM);
  299     }
  300     get_global_lock((char *) __PRETTY_FUNCTION__);
  301     ret = fs_mkdir(path, mode);
  302     release_global_lock();
  303     return (ret);
  304 }
  305 
  306 static int lessfs_unlink(const char *path)
  307 {
  308     int res;
  309     struct stat *stbuf;
  310     FUNC;
  311 
  312     LDEBUG("lessfs_unlink : %s", path);
  313     if (config->replication == 1 && config->replication_role == 1) {
  314         return (-EPERM);
  315     }
  316 // /.lessfs_stats can not be deleted
  317     if (0 == strcmp(path, "/.lessfs/replication/rotate_replog"))
  318         return (0);
  319     if (0 == strcmp(path, "/.lessfs/replication/enabled"))
  320         return (0);
  321     if (0 == strcmp(path, "/.lessfs/replication/backlog"))
  322         return (0);
  323     if (0 == strcmp(path, "/.lessfs/lessfs_stats"))
  324         return (0);
  325     if (0 == strncmp(path, "/.lessfs/locks", strlen("/.lessfs/locks")))
  326         return (0);
  327     stbuf = s_malloc(sizeof(struct stat));
  328     get_global_lock((char *) __PRETTY_FUNCTION__);
  329     res = dbstat(path, stbuf, 0);
  330     if (res != 0) {
  331         release_global_lock();
  332         s_free(stbuf);
  333         return (res);
  334     }
  335     write_lock((char *) __PRETTY_FUNCTION__);
  336     flush_wait(stbuf->st_ino);
  337     purge_read_cache(stbuf->st_ino, 1, (char *) __PRETTY_FUNCTION__);
  338 
  339     meta_lock((char *) __PRETTY_FUNCTION__);
  340     update_filesize_onclose(stbuf->st_ino);
  341     tctreeout(metatree, &stbuf->st_ino, sizeof(unsigned long long));
  342     release_meta_lock();
  343 
  344     if (config->blockdatabs) {
  345         res = db_unlink_file(path);
  346     } else {
  347         res = file_unlink_file(path);
  348     }
  349     if (config->relax == 0)
  350         dbsync();
  351     invalidate_p2i((char *) path);
  352     release_write_lock();
  353     release_global_lock();
  354     s_free(stbuf);
  355     if (config->nospace == ENOSPC)
  356         config->nospace = -ENOSPC;
  357     EFUNC;
  358     return res;
  359 }
  360 
  361 static int lessfs_rmdir(const char *path)
  362 {
  363     int res;
  364     if (config->replication == 1 && config->replication_role == 1) {
  365         return (-EPERM);
  366     }
  367     if (0 == strncmp(path, "/.lessfs/locks", strlen("/.lessfs/locks")))
  368         return (0);
  369     get_global_lock((char *) __PRETTY_FUNCTION__);
  370     res = fs_rmdir(path);
  371     invalidate_p2i((char *) path);
  372     release_global_lock();
  373     return (res);
  374 
  375 }
  376 
  377 static int lessfs_symlink(const char *from, const char *to)
  378 {
  379     int res = 0;
  380 
  381     FUNC;
  382     get_global_lock((char *) __PRETTY_FUNCTION__);
  383     res = fs_symlink((char *) from, (char *) to);
  384     invalidate_p2i((char *) from);
  385     release_global_lock();
  386     return (res);
  387 }
  388 
  389 static int lessfs_rename(const char *from, const char *to)
  390 {
  391     int res = 0;
  392     struct stat stbuf;
  393 
  394     FUNC;
  395     LDEBUG("lessfs_rename : from %s , to %s", from, to);
  396     if (config->replication == 1 && config->replication_role == 1) {
  397         return (-EPERM);
  398     }
  399     get_global_lock((char *) __PRETTY_FUNCTION__);
  400     res = dbstat(from, &stbuf, 0);
  401     if (res == 0) {
  402         update_filesize_onclose(stbuf.st_ino);
  403         if (stbuf.st_nlink > 1 && !S_ISDIR(stbuf.st_mode)) {
  404             res = fs_rename_link(from, to, stbuf);
  405         } else {
  406             res = fs_rename(from, to, stbuf);
  407         }
  408     }
  409     if (S_ISDIR(stbuf.st_mode)) {
  410         erase_p2i();
  411     } else {
  412        invalidate_p2i((char *) from);
  413        invalidate_p2i((char *) to);
  414     }
  415     release_global_lock();
  416     return (res);
  417 }
  418 
  419 static int lessfs_link(const char *from, const char *to)
  420 {
  421     int res = 0;
  422 
  423     FUNC;
  424     if (config->replication == 1 && config->replication_role == 1) {
  425         return (-EPERM);
  426     }
  427     get_global_lock((char *) __PRETTY_FUNCTION__);
  428     res = fs_link((char *) from, (char *) to);
  429     release_global_lock();
  430     return (res);
  431 }
  432 
  433 static int lessfs_chmod(const char *path, mode_t mode)
  434 {
  435     int res;
  436     time_t thetime;
  437     struct stat stbuf;
  438     const char *data;
  439     int vsize;
  440     MEMDDSTAT *memddstat;
  441     DAT *ddbuf;
  442 
  443     FUNC;
  444     if (config->replication == 1 && config->replication_role == 1) {
  445         LFATAL("lessfs_chmod");
  446         return (-EPERM);
  447     }
  448     LDEBUG("lessfs_chmod : %s", path);
  449     get_global_lock((char *) __PRETTY_FUNCTION__);
  450     thetime = time(NULL);
  451     res = dbstat(path, &stbuf, 0);
  452     if (res != 0)
  453         return (res);
  454 
  455 //  Since we use the stickybit to indicate that the file is locked
  456 //  during truncation and deletion we do not allow
  457 //  to be set on a regular file in this situation.
  458     if (config->sticky_on_locked && S_ISREG(stbuf.st_mode)) {
  459         if (S_ISVTX == (S_ISVTX & stbuf.st_mode)) {
  460             stbuf.st_mode = stbuf.st_mode ^ S_ISVTX;
  461         }
  462     }
  463 
  464     meta_lock((char *) __PRETTY_FUNCTION__);
  465     data = tctreeget(metatree, &stbuf.st_ino,
  466                      sizeof(unsigned long long), &vsize);
  467     if (data) {
  468         memddstat = (MEMDDSTAT *) data;
  469         memddstat->stbuf.st_ctim.tv_sec = thetime;
  470         memddstat->stbuf.st_ctim.tv_nsec = 0;
  471         memddstat->stbuf.st_mode = mode;
  472         memddstat->updated = 1;
  473         ddbuf = create_mem_ddbuf(memddstat);
  474         tctreeput(metatree, &stbuf.st_ino,
  475                   sizeof(unsigned long long), (void *) ddbuf->data,
  476                   ddbuf->size);
  477         cache_p2i((char *) path, &memddstat->stbuf);
  478         release_meta_lock();
  479         DATfree(ddbuf);
  480     } else {
  481         stbuf.st_mode = mode;
  482         stbuf.st_ctim.tv_sec = thetime;
  483         stbuf.st_ctim.tv_nsec = 0;
  484         release_meta_lock();
  485         res = update_stat((char *) path, &stbuf);
  486     }
  487     release_global_lock();
  488     return (res);
  489 }
  490 
  491 static int lessfs_chown(const char *path, uid_t uid, gid_t gid)
  492 {
  493     int res;
  494     time_t thetime;
  495     struct stat stbuf;
  496     const char *data;
  497     int vsize;
  498     MEMDDSTAT *memddstat;
  499     DAT *ddbuf;
  500 
  501     FUNC;
  502     if (config->replication == 1 && config->replication_role == 1) {
  503         return (-EPERM);
  504     }
  505     get_global_lock((char *) __PRETTY_FUNCTION__);
  506     thetime = time(NULL);
  507     invalidate_p2i((char *) path);
  508     res = dbstat(path, &stbuf, 0);
  509     if (res != 0)
  510         goto end_unlock;
  511     meta_lock((char *) __PRETTY_FUNCTION__);
  512     data = tctreeget(metatree, &stbuf.st_ino,
  513                      sizeof(unsigned long long), &vsize);
  514     if (data) {
  515         memddstat = (MEMDDSTAT *) data;
  516         memddstat->stbuf.st_ctim.tv_sec = thetime;
  517         memddstat->stbuf.st_ctim.tv_nsec = 0;
  518         memddstat->stbuf.st_uid = uid;
  519         memddstat->stbuf.st_gid = gid;
  520         memddstat->updated = 1;
  521         ddbuf = create_mem_ddbuf(memddstat);
  522         tctreeput(metatree, &stbuf.st_ino,
  523                   sizeof(unsigned long long), (void *) ddbuf->data,
  524                   ddbuf->size);
  525         cache_p2i((char *) path, &memddstat->stbuf);
  526         release_meta_lock();
  527         DATfree(ddbuf);
  528     } else {
  529         if (-1 != uid)
  530             stbuf.st_uid = uid;
  531         if (-1 != gid)
  532             stbuf.st_gid = gid;
  533         stbuf.st_ctim.tv_sec = thetime;
  534         stbuf.st_ctim.tv_nsec = 0;
  535         release_meta_lock();
  536         res = update_stat((char *) path, &stbuf);
  537     }
  538   end_unlock:
  539     release_global_lock();
  540     return (res);
  541 }
  542 
  543 static int lessfs_truncate(const char *path, off_t size)
  544 {
  545     int res = 0;
  546     struct stat *stbuf;
  547     char *bname;
  548     if (config->replication == 1 && config->replication_role == 1) {
  549         if (0 != strcmp(path, "/.lessfs/replication/enabled"))
  550             return (-EPERM);
  551     }
  552     if (0 == strcmp(path, "/.lessfs/replication/sequence"))
  553         return (-EPERM);
  554     LDEBUG("lessfs_truncate : %s truncate to %llu", path, size);
  555     get_global_lock((char *) __PRETTY_FUNCTION__);
  556     bname = s_basename((char *) path);
  557     stbuf = s_malloc(sizeof(struct stat));
  558     res = dbstat(path, stbuf, 0);
  559     if (res != 0) {
  560         release_global_lock();
  561         s_free(stbuf);
  562         return (res);
  563     }
  564     if (S_ISDIR(stbuf->st_mode)) {
  565         release_global_lock();
  566         s_free(stbuf);
  567         return (-EISDIR);
  568     }
  569     /* Flush the blockcache before we continue. */
  570     create_inode_note(stbuf->st_ino);
  571     write_lock((char *) __PRETTY_FUNCTION__);
  572     flush_wait(stbuf->st_ino);
  573     purge_read_cache(stbuf->st_ino, 1, (char *) __PRETTY_FUNCTION__);
  574     if (size < stbuf->st_size) {
  575         if (config->blockdatabs) {
  576             res = db_fs_truncate(stbuf, size, bname, 0);
  577         } else {
  578             res = file_fs_truncate(stbuf, size, bname, 0);
  579         }
  580     } else {
  581         LDEBUG("lessfs_truncate : %s only change size to %llu", path,
  582                size);
  583         update_filesize_cache(stbuf, size);
  584         delete_inode_note(stbuf->st_ino);
  585     }
  586     invalidate_p2i((char *) path);
  587     release_write_lock();
  588     s_free(stbuf);
  589     s_free(bname);
  590     release_global_lock();
  591     if (config->nospace == ENOSPC)
  592         config->nospace = -ENOSPC;
  593     return (res);
  594 }
  595 
  596 static int lessfs_utimens(const char *path, const struct timespec ts[2])
  597 {
  598     int res = 0;
  599     struct stat stbuf;
  600     DDSTAT *ddstat;
  601     DAT *ddbuf;
  602     DAT *dskdata;
  603     const char *data;
  604     int vsize;
  605     MEMDDSTAT *memddstat;
  606 
  607     FUNC;
  608     get_global_lock((char *) __PRETTY_FUNCTION__);
  609     res = dbstat(path, &stbuf, 0);
  610     if (res != 0)
  611         goto out;
  612     data = tctreeget(metatree, &stbuf.st_ino,
  613                      sizeof(unsigned long long), &vsize);
  614     if (data) {
  615         memddstat = (MEMDDSTAT *) data;
  616         memddstat->stbuf.st_atim.tv_sec = ts[0].tv_sec;
  617         memddstat->stbuf.st_atim.tv_nsec = ts[0].tv_nsec;
  618         memddstat->stbuf.st_mtim.tv_sec = ts[1].tv_sec;
  619         memddstat->stbuf.st_mtim.tv_nsec = ts[1].tv_nsec;
  620         memddstat->updated = 1;
  621         ddbuf = create_mem_ddbuf(memddstat);
  622         tctreeput(metatree, &stbuf.st_ino,
  623                   sizeof(unsigned long long), (void *) ddbuf->data,
  624                   ddbuf->size);
  625         cache_p2i((char *) path, &memddstat->stbuf);
  626         DATfree(ddbuf);
  627     } else {
  628         dskdata =
  629             search_dbdata(DBP, &stbuf.st_ino, sizeof(unsigned long long),
  630                           LOCK);
  631         if (dskdata == NULL) {
  632 #ifdef x86_64
  633             die_dataerr("Failed to find file %lu", stbuf.st_ino);
  634 #else
  635             die_dataerr("Failed to find file %llu", stbuf.st_ino);
  636 #endif
  637         }
  638         ddstat = value_to_ddstat(dskdata);
  639         ddstat->stbuf.st_atim.tv_sec = ts[0].tv_sec;
  640         ddstat->stbuf.st_atim.tv_nsec = ts[0].tv_nsec;
  641         ddstat->stbuf.st_mtim.tv_sec = ts[1].tv_sec;
  642         ddstat->stbuf.st_mtim.tv_nsec = ts[1].tv_nsec;
  643         ddbuf =
  644             create_ddbuf(ddstat->stbuf, ddstat->filename,
  645                          ddstat->real_size);
  646         bin_write_dbdata(DBP, &stbuf.st_ino, sizeof(unsigned long long),
  647                          (void *) ddbuf->data, ddbuf->size);
  648         cache_p2i((char *) path, &ddstat->stbuf);
  649         DATfree(dskdata);
  650         DATfree(ddbuf);
  651         ddstatfree(ddstat);
  652     }
  653   out:
  654     release_global_lock();
  655     return (res);
  656 }
  657 
  658 unsigned long long get_real_size(unsigned long long inode)
  659 {
  660     DAT *data;
  661     DDSTAT *ddstat;
  662     unsigned long long real_size = 0;
  663 
  664     data = search_dbdata(DBP, &inode, sizeof(unsigned long long), LOCK);
  665     if (NULL == data) {
  666         return (real_size);
  667     }
  668     ddstat = value_to_ddstat(data);
  669     DATfree(data);
  670     real_size = ddstat->real_size;
  671     ddstatfree(ddstat);
  672     return (real_size);
  673 }
  674 
  675 static int lessfs_open(const char *path, struct fuse_file_info *fi)
  676 {
  677     struct stat stbuf;
  678     char *bname;
  679     DAT *ddbuf;
  680     const char *dataptr;
  681     unsigned long long blocknr = 0;
  682     MEMDDSTAT *memddstat;
  683     unsigned long long inode;
  684     int vsize;
  685 
  686     int res = 0;
  687 
  688     FUNC;
  689     LDEBUG("lessfs_open : %s strlen %i : uid %u", path,
  690            strlen((char *) path), fuse_get_context()->uid);
  691 
  692     get_global_lock((char *) __PRETTY_FUNCTION__);
  693     res = dbstat(path, &stbuf, 0);
  694     if (res == -ENOENT) {
  695         fi->fh = 0;
  696         stbuf.st_mode = fi->flags;
  697         stbuf.st_uid = fuse_get_context()->uid;
  698         stbuf.st_gid = fuse_get_context()->gid;
  699         LDEBUG("lessfs_open %s is a new file", path);
  700     } else {
  701         fi->fh = stbuf.st_ino;
  702         inode = stbuf.st_ino;
  703         bname = s_basename((char *) path);
  704 //  Check if we have already something in cache for this inode
  705         meta_lock((char *) __PRETTY_FUNCTION__);
  706         dataptr =
  707             tctreeget(metatree, &stbuf.st_ino,
  708                       sizeof(unsigned long long), &vsize);
  709         if (dataptr == NULL) {
  710             blocknr--;          /* Invalid block in cache */
  711 //  Initialize the cache
  712             memddstat = s_malloc(sizeof(MEMDDSTAT));
  713             memcpy(&memddstat->stbuf, &stbuf, sizeof(struct stat));
  714             memcpy(&memddstat->filename, bname, strlen(bname) + 1);
  715             memddstat->blocknr = blocknr;
  716             memddstat->updated = 0;
  717             memddstat->real_size = get_real_size(stbuf.st_ino);
  718             memddstat->opened = 1;
  719             memddstat->stbuf.st_atim.tv_sec = time(NULL);
  720             memddstat->stbuf.st_atim.tv_nsec = 0;
  721             LDEBUG("lessfs_open : initial open memddstat->opened = %u",
  722                    memddstat->opened);
  723             LDEBUG("memddstat->stbuf.st_atim.tv_sec = %lu",
  724                    memddstat->stbuf.st_atim.tv_sec);
  725             ddbuf = create_mem_ddbuf(memddstat);
  726             tctreeput(metatree, &inode, sizeof(unsigned long long),
  727                       (void *) ddbuf->data, ddbuf->size);
  728             DATfree(ddbuf);
  729             memddstatfree(memddstat);
  730         } else {
  731             memddstat = (MEMDDSTAT *) dataptr;
  732             memddstat->opened++;
  733             memddstat->stbuf.st_atim.tv_sec = time(NULL);
  734             memddstat->stbuf.st_atim.tv_nsec = 0;
  735             LDEBUG("lessfs_open : repeated open ddstat->opened = %u %lu",
  736                    memddstat->opened, memddstat->stbuf.st_atim.tv_sec);
  737             ddbuf = create_mem_ddbuf(memddstat);
  738             tctreeput(metatree, &inode, sizeof(unsigned long long),
  739                       (void *) ddbuf->data, ddbuf->size);
  740             DATfree(ddbuf);
  741         }
  742         release_meta_lock();
  743         s_free(bname);
  744     }
  745     release_global_lock();
  746     EFUNC;
  747     return (res);
  748 }
  749 
  750 static int lessfs_read(const char *path, char *buf, size_t size,
  751                        off_t offset, struct fuse_file_info *fi)
  752 {
  753     unsigned long long blocknr;
  754     size_t done = 0;
  755     size_t got = 0;
  756     size_t block_offset = 0;
  757     struct stat stbuf;
  758     unsigned long sequence;
  759 
  760     FUNC;
  761 
  762     get_global_lock((char *) __PRETTY_FUNCTION__);
  763     write_lock((char *) __PRETTY_FUNCTION__);
  764     memset(buf, 0, size);
  765     LDEBUG("lessfs_read called offset : %llu, size bytes %llu",
  766            (unsigned long long) offset, (unsigned long long) size);
  767     if (fi->fh == 10) {
  768         strncpy(buf, &config->lfsstats[offset], size);
  769         done = size;
  770         goto end_error;
  771     }
  772     if (fi->fh == 14) {
  773         sprintf(buf, "%i\n", config->replication_enabled);
  774         done = size;
  775         goto end_error;
  776     }
  777     if (fi->fh == 15) {
  778         sprintf(buf, "%i\n", config->replication_backlog);
  779         done = size;
  780         goto end_error;
  781     }
  782     if (fi->fh == 16) {
  783         sequence = get_sequence();
  784         memset(buf, 0, size);
  785         sprintf(buf, "%lu\n", sequence);
  786         done = size;
  787         goto end_error;
  788     }
  789     if (fi->fh == 17) {
  790         sprintf(buf, "%u\n", 0);
  791         done = size;
  792         goto end_error;
  793     }
  794 // Change this to creating a buffer that is allocated once.
  795 // Disable reads beyond EOF
  796     if (get_realsize_fromcache(fi->fh, &stbuf)) {
  797         LDEBUG("lessfs_read : get_realsize_fromcache returns %llu",
  798                (unsigned long long) stbuf.st_size);
  799         if (offset + size > stbuf.st_size) {
  800             if (offset > stbuf.st_size)
  801                 goto end_error;
  802             size = stbuf.st_size - offset;
  803         }
  804     }
  805     while (done < size) {
  806         blocknr = (offset + done) / BLKSIZE;
  807         block_offset = (done + offset) - (blocknr * BLKSIZE);
  808         if (config->blockdatabs) {
  809             got =
  810                 readBlock(blocknr, buf + done, size - done, block_offset,
  811                           fi->fh);
  812         } else {
  813             got =
  814                 file_read_block(blocknr, buf + done, size - done,
  815                                 block_offset, fi->fh);
  816         }
  817         done = done + BLKSIZE - block_offset;
  818         if (done > size)
  819             done = size;
  820     }
  821   end_error:
  822     release_write_lock();
  823     release_global_lock();
  824     return (done);
  825 }
  826 
  827 CCACHEDTA *update_stored(unsigned char *hash, INOBNO * inobno,
  828                          off_t offsetblock)
  829 {
  830     DAT *data;
  831     DAT *uncompdata;
  832     CCACHEDTA *ccachedta;
  833 
  834     ccachedta = s_zmalloc(sizeof(CCACHEDTA));
  835     ccachedta->creationtime = time(NULL);
  836     ccachedta->dirty = 1;
  837     ccachedta->pending = 0;
  838     ccachedta->newblock = 0;
  839 
  840     data = search_dbdata(DBDTA, hash, config->hashlen, LOCK);
  841     if (NULL == data) {
  842         s_free(ccachedta);
  843         return NULL;
  844     }
  845     uncompdata = lfsdecompress(data);
  846     LDEBUG("got uncompsize : %lu", uncompdata->size);
  847     memcpy(&ccachedta->data, uncompdata->data, uncompdata->size);
  848     ccachedta->datasize = uncompdata->size;
  849     ccachedta->updated = data->size;
  850     DATfree(uncompdata);
  851     DATfree(data);
  852     db_delete_stored(inobno);
  853     EFUNC;
  854     return ccachedta;
  855 }
  856 
  857 void add2cache(INOBNO * inobno, const char *buf, off_t offsetblock,
  858                size_t bsize)
  859 {
  860     CCACHEDTA *ccachedta;
  861     uintptr_t p;
  862     int size;
  863     char *key;
  864     DAT *data;
  865     bool found = 0;
  866 
  867 
  868   pending:
  869     write_lock((char *) __PRETTY_FUNCTION__);
  870     key =
  871         (char *) tctreeget(readcachetree, (void *) inobno, sizeof(INOBNO),
  872                            &size);
  873     if (key) {
  874         memcpy(&p, key, size);
  875         ccachedta = get_ccachedta(p);
  876         while (ccachedta->pending == 1) {
  877             release_write_lock();
  878             goto pending;
  879         }
  880         ccachedta->creationtime = time(NULL);
  881         ccachedta->dirty = 1;
  882         memcpy((void *) &ccachedta->data[offsetblock], buf, bsize);
  883         if (ccachedta->datasize < offsetblock + bsize)
  884             ccachedta->datasize = offsetblock + bsize;
  885         update_filesize(inobno->inode, bsize, offsetblock,
  886                         inobno->blocknr);
  887         release_write_lock();
  888         return;
  889     }
  890     update_filesize(inobno->inode, bsize, offsetblock, inobno->blocknr);
  891     if (bsize < BLKSIZE) {
  892         data = check_block_exists(inobno);
  893         if (data) {
  894             create_hash_note(data->data);
  895             if (NULL == config->blockdatabs) {
  896                 ccachedta =
  897                     file_update_stored(data->data, inobno, offsetblock);
  898             } else {
  899                 ccachedta =
  900                     update_stored((unsigned char *) data->data, inobno,
  901                                   offsetblock);
  902             }
  903             delete_hash_note(data->data);
  904             if (ccachedta) {
  905                 memcpy(&ccachedta->data[offsetblock], buf, bsize);
  906                 if (ccachedta->datasize < offsetblock + bsize)
  907                     ccachedta->datasize = offsetblock + bsize;
  908                 found = 1;
  909             }
  910             DATfree(data);
  911         }
  912     }
  913 
  914     if (!found) {
  915         ccachedta = s_zmalloc(sizeof(CCACHEDTA));
  916         memcpy(&ccachedta->data[offsetblock], buf, bsize);
  917         ccachedta->datasize = offsetblock + bsize;
  918         ccachedta->creationtime = time(NULL);
  919         ccachedta->dirty = 1;
  920         ccachedta->pending = 0;
  921         ccachedta->newblock = 1;
  922     }
  923 
  924     if (tctreernum(workqtree) * 2 > config->cachesize ||
  925         tctreernum(readcachetree) * 2 > config->cachesize) {
  926         flush_wait(0);
  927         purge_read_cache(0, 1, (char *) __PRETTY_FUNCTION__);
  928     }
  929     p = (uintptr_t) ccachedta;
  930 // A full block can be processed right away.
  931     if (bsize == BLKSIZE) {
  932         ccachedta->pending = 1;
  933         tctreeput(workqtree, (void *) inobno, sizeof(INOBNO), (void *) &p,
  934                   sizeof(p));
  935     }
  936 // When this is not a full block a separate thread is used.
  937     tctreeput(readcachetree, (void *) inobno, sizeof(INOBNO), (void *) &p,
  938               sizeof(p));
  939     release_write_lock();
  940     return;
  941 }
  942 
  943 void flush_rotate_replog()
  944 {
  945    write_lock((char *) __PRETTY_FUNCTION__);
  946    flush_wait(0);
  947    purge_read_cache(0, 1, (char *) __PRETTY_FUNCTION__);
  948    if ( config->blockdata_io_type == CHUNK_IO ) sync();
  949    start_flush_commit();
  950    end_flush_commit();
  951    config->replication_last_rotated = time(NULL);
  952    rotate_replog();
  953    release_repl_lock();
  954    release_write_lock();
  955 }
  956 
  957 static int lessfs_write(const char *path, const char *buf, size_t size,
  958                         off_t offset, struct fuse_file_info *fi)
  959 {
  960     off_t offsetblock;
  961     size_t bsize;
  962     size_t done = 0;
  963     INOBNO inobno;
  964 
  965     FUNC;
  966     if (config->nospace == ENOSPC)
  967         return (-ENOSPC);
  968     wait_inode_pending2(fi->fh);
  969     get_global_lock((char *) __PRETTY_FUNCTION__);
  970     if (fi->fh == 14) {
  971         if (0 == memcmp(buf, "0", 1)) {
  972             LINFO("Replication is now disabled");
  973             config->replication_enabled = 0;
  974         }
  975         if (0 == memcmp(buf, "1", 1)) {
  976             LINFO("Replication is now enabled");
  977             config->replication_enabled = 1;
  978         }
  979         goto end_exit;
  980     }
  981     if (fi->fh == 16) {
  982         size = -EPERM;
  983         goto end_exit;
  984     }
  985     if (fi->fh == 17) {
  986         if (config->replication == 1 && config->replication_role == 0) {
  987           if (0 == memcmp(buf, "1", 1)) {
  988             LINFO("Rotated replog after receiving rotation request"); 
  989             flush_rotate_replog();   
  990           }
  991         } else size = -EPERM;
  992         goto end_exit;
  993     }
  994     if (config->replication == 1 && config->replication_role == 1) {
  995         size = -EPERM;
  996         goto end_exit;
  997     }
  998     LDEBUG("lessfs_write offset %llu size %lu", offset,
  999            (unsigned long) size);
 1000     inobno.inode = fi->fh;
 1001     while (1) {
 1002         inobno.blocknr = (offset + done) / BLKSIZE;
 1003         offsetblock = offset + done - (inobno.blocknr * BLKSIZE);
 1004         bsize = size - done;
 1005         if (bsize + offsetblock > BLKSIZE)
 1006             bsize = BLKSIZE - offsetblock;
 1007         //Just put it in cache. We might need to fetch the block.
 1008         //This is done by add2cache.
 1009         add2cache(&inobno, buf + done, offsetblock, bsize);
 1010         done = done + bsize;
 1011         bsize = size - done;
 1012         if (done >= size)
 1013             break;
 1014     }
 1015   end_exit:
 1016     release_global_lock();
 1017     EFUNC;
 1018     return (size);
 1019 }
 1020 
 1021 static int lessfs_statfs(const char *path, struct statvfs *stbuf)
 1022 {
 1023     int res;
 1024     char *blockdatadir;
 1025 
 1026     if (config->blockdatabs) {
 1027         res = statvfs(config->blockdata, stbuf);
 1028     } else {
 1029         blockdatadir = s_dirname(config->blockdata);
 1030         res = statvfs(blockdatadir, stbuf);
 1031         s_free(blockdatadir);
 1032     }
 1033     if (res == -1)
 1034         return -errno;
 1035     return 0;
 1036 }
 1037 
 1038 static int lessfs_release(const char *path, struct fuse_file_info *fi)
 1039 {
 1040     const char *dataptr;
 1041     int vsize;
 1042     MEMDDSTAT *memddstat;
 1043     DAT *ddbuf;
 1044 
 1045     FUNC;
 1046     LDEBUG("lessfs_release");
 1047     get_global_lock((char *) __PRETTY_FUNCTION__);
 1048     meta_lock((char *) __PRETTY_FUNCTION__);
 1049     dataptr =
 1050         tctreeget(metatree, &fi->fh, sizeof(unsigned long long), &vsize);
 1051     if (dataptr) {
 1052         memddstat = (MEMDDSTAT *) dataptr;
 1053         if (memddstat->opened == 1) {
 1054             // Update the filesize when needed.
 1055             update_filesize_onclose(fi->fh);
 1056             tctreeout(metatree, &fi->fh, sizeof(unsigned long long));
 1057             invalidate_p2i((char *) path);
 1058             LDEBUG("lessfs_release : Delete cache for %llu", fi->fh);
 1059         } else {
 1060             memddstat->opened--;
 1061             ddbuf = create_mem_ddbuf(memddstat);
 1062             LDEBUG("lessfs_release : %llu is now %u open", fi->fh,
 1063                    memddstat->opened);
 1064             tctreeput(metatree, &fi->fh, sizeof(unsigned long long),
 1065                       (void *) ddbuf->data, ddbuf->size);
 1066             DATfree(ddbuf);
 1067         }
 1068     }
 1069     (void) path;
 1070     (void) fi;
 1071     release_meta_lock();
 1072     release_global_lock();
 1073     EFUNC;
 1074     return 0;
 1075 }
 1076 
 1077 static int lessfs_fsync(const char *path, int isdatasync,
 1078                         struct fuse_file_info *fi)
 1079 {
 1080     FUNC;
 1081     (void) path;
 1082     (void) isdatasync;
 1083     get_global_lock((char *) __PRETTY_FUNCTION__);
 1084     write_lock((char *) __PRETTY_FUNCTION__);
 1085     flush_wait(fi->fh);
 1086     purge_read_cache(fi->fh,0,(char *)__PRETTY_FUNCTION__);
 1087     release_write_lock();
 1088     /* Living on the edge, wait for pending I/O but do not flush the caches. */
 1089     if (config->relax < 2) {
 1090         update_filesize_onclose(fi->fh);
 1091     }
 1092     /* When config->relax == 1 dbsync is not called but the caches within lessfs
 1093        are flushed making this a safer option. */
 1094     if (config->relax == 0)
 1095         dbsync();
 1096     release_global_lock();
 1097     EFUNC;
 1098     return 0;
 1099 }
 1100 
 1101 static void lessfs_destroy(void *unused __attribute__ ((unused)))
 1102 {
 1103     FUNC;
 1104     LINFO("Lessfs is going down, please wait");
 1105     config->shutdown = 1;
 1106     if (config->replication_enabled) {
 1107         LFATAL("Wait for active replication threads to shutdown");
 1108         while (!config->safe_down) {
 1109             sleep(1);
 1110         }
 1111         LFATAL("No active replication threads, going down");
 1112         sleep(1);
 1113     }
 1114     get_global_lock((char *) __PRETTY_FUNCTION__);
 1115     write_lock((char *) __PRETTY_FUNCTION__);
 1116     purge_read_cache(0, 1, (char *) __PRETTY_FUNCTION__);
 1117     flush_wait(0);
 1118     if (0 != tctreernum(workqtree) || 0 != tctreernum(readcachetree))
 1119         LFATAL("Going down with data in queue, this should never happen");
 1120     release_write_lock();
 1121     release_global_lock();
 1122     config->replication = 0;
 1123     if (config->transactions)
 1124         lessfs_trans_stamp();
 1125     clear_dirty();
 1126     db_close(0);
 1127     s_free(config->lfsstats);
 1128 #ifdef ENABLE_CRYPTO
 1129     if (config->encryptdata) {
 1130         s_free(config->passwd);
 1131         s_free(config->iv);
 1132     }
 1133 #endif
 1134     if (config->transactions)
 1135         free(config->commithash);
 1136     if (config->replication_watchdir)
 1137         s_free(config->replication_watchdir);
 1138     s_free(config);
 1139 #ifdef MEMTRACE
 1140     leak_report();
 1141 #endif
 1142     LFATAL("Lessfs is down");
 1143     EFUNC;
 1144     return;
 1145 }
 1146 
 1147 /*  Return 0 when enough space s_free
 1148     Return 1 at MIN_SPACE_FREE
 1149     Return 2 at MIN_SPACE_CLEAN 
 1150     Return 3 at MIN_SPACE_FREE+3% 
 1151 */
 1152 int check_free_space(char *dbpath)
 1153 {
 1154     float dfree;
 1155     int mf, mc, sr;
 1156     char *minfree;
 1157     char *minclean;
 1158     char *startreclaim;
 1159     struct statfs sb;
 1160 
 1161     if (-1 == statfs(dbpath, &sb))
 1162         die_dataerr("Failed to stat : %s", dbpath);
 1163     dfree = (float) sb.f_blocks / (float) sb.f_bfree;
 1164     dfree = 100 / dfree;
 1165     minfree = getenv("MIN_SPACE_FREE");
 1166     minclean = getenv("MIN_SPACE_CLEAN");
 1167     if (minfree) {
 1168         mf = atoi(minfree);
 1169         if (mf > 100 || mf < 1)
 1170             mf = 10;
 1171     } else {
 1172         mf = 10;
 1173     }
 1174     startreclaim=getenv("START_RECLAIM");
 1175     if (startreclaim ) {
 1176         sr = atoi(startreclaim);
 1177         if (sr > 100 || sr < 1)
 1178             sr = 15;
 1179     }  else sr=1;
 1180     if ( 100-dfree >= sr ) {
 1181       config->reclaim=1;
 1182     } else config->reclaim=0;
 1183     /* Freeze has a higher prio then clean */
 1184     if (dfree <= mf) {
 1185         return (1);
 1186     }
 1187     if (minclean) {
 1188         mc = atoi(minclean);
 1189         if (mc > 100 || mc < 1)
 1190             mc = 15;
 1191     } else {
 1192         mc = 0;
 1193     }
 1194     if (dfree <= mc) {
 1195         return (2);
 1196     }
 1197     if (dfree <= mf + 1)
 1198         return (3);
 1199     return (0);
 1200 }
 1201 
 1202 void freeze_nospace(char *dbpath)
 1203 {
 1204     config->frozen = 1;
 1205     get_global_lock((char *) __PRETTY_FUNCTION__);
 1206     write_lock((char *) __PRETTY_FUNCTION__);
 1207     flush_wait(0);
 1208     purge_read_cache(0, 1, (char *) __PRETTY_FUNCTION__);
 1209     if (config->nospace != 0) {
 1210         release_write_lock();
 1211         release_global_lock();
 1212         if (config->nospace == 1) {
 1213             config->nospace = -ENOSPC;
 1214             LFATAL
 1215                 ("Filesystem for database %s has insufficient space to continue, ENOSPC",
 1216                  dbpath);
 1217             LINFO
 1218                 ("Writes will return ENOSPC when new blocks need to be allocated.");
 1219         }
 1220         return;
 1221     } else {
 1222         LFATAL
 1223             ("Filesystem for database %s has insufficient space to continue, freezing I/O",
 1224              dbpath);
 1225         LFATAL("All IO is now suspended until space comes available.");
 1226         LFATAL
 1227             ("If no other options are available it should be safe to kill lessfs.");
 1228     }
 1229     while (1) {
 1230         if (0 == check_free_space(dbpath)) {
 1231             if (config->nospace == 0) {
 1232                 config->frozen = 0;
 1233                 release_write_lock();
 1234                 release_global_lock();
 1235             }
 1236             break;
 1237         }
 1238         sleep(1);
 1239     }
 1240     if (config->nospace == 0)
 1241         LFATAL("Resuming IO : sufficient space available.");
 1242 }
 1243 
 1244 void exec_clean_program()
 1245 {
 1246     char *program;
 1247     pid_t rpid;
 1248     int res = 1;
 1249     /* We can read and log output from the program. */
 1250     int commpipe[2];
 1251 
 1252     FUNC;
 1253 
 1254     program = getenv("CLEAN_PROGRAM");
 1255     if (NULL == program) {
 1256         LINFO("CLEAN_PROGRAM is not defined");
 1257         return;
 1258     }
 1259 
 1260     LINFO("exec_clean_program : execute %s", program);
 1261     if (pipe(commpipe) == -1) {
 1262         LFATAL("Failed to open pipe.");
 1263         return;
 1264     }
 1265     /* Attempt to fork and check for errors */
 1266     if ((rpid = fork()) == -1) {
 1267         LFATAL("Fork error. Exiting.\n");       /* something went wrong */
 1268         return;
 1269     }
 1270     if (rpid) {
 1271         /* A positive (non-negative) PID indicates the parent process */
 1272         dup2(commpipe[0], 0);   /* Replace stdout with out side of the pipe */
 1273         close(commpipe[1]);     /* Close unused side of pipe (in side) */
 1274         setvbuf(stdout, (char *) NULL, _IONBF, 0);      /* Set non-buffered output on stdout */
 1275         wait(&res);
 1276     } else {
 1277         /* A zero PID indicates that this is the child process */
 1278         dup2(commpipe[1], 1);   /* Replace stdin with the in side of the pipe */
 1279         close(commpipe[0]);     /* Close unused side of pipe (out side) */
 1280         /* Replace the child fork with a new process */
 1281         if (execl(program, program, NULL) == -1) {
 1282             LFATAL("exec_clean_program : execl %s failed", program);
 1283             exit(-1);
 1284         }
 1285     }
 1286     return;
 1287     EFUNC;
 1288 }
 1289 
 1290 /* This thread does general housekeeping.
 1291    For now it checks that there is enough diskspace s_free for
 1292    the databases. If not it syncs the database, 
 1293    closes them and freezes I/O. */
 1294 void *housekeeping_worker(void *arg)
 1295 {
 1296     char *dbpath = NULL;
 1297     int result;
 1298     int count = 0;
 1299 
 1300     while (1) {
 1301         while (count <= 6) {
 1302             switch (count) {
 1303 #ifdef BERKELEYDB
 1304             case 0:
 1305                 dbpath =
 1306                     as_sprintf(__FILE__, __LINE__, "%s/metadata.db",
 1307                                config->meta);
 1308                 count = 7;
 1309                 break;
 1310             case 1:
 1311                 if ( config->blockdata_io_type == FILE_IO ) {
 1312                     dbpath = s_strdup(config->blockdata);
 1313                 }
 1314                 count = 7;
 1315                 break;
 1316 #else
 1317 #ifdef HAMSTERDB
 1318             case 0:
 1319                 dbpath =
 1320                     as_sprintf(__FILE__, __LINE__, "%s/lessfs.db",
 1321                                config->meta);
 1322                 count = 7;
 1323                 break;
 1324             case 1:
 1325                 if ( config->blockdata_io_type == FILE_IO ) {
 1326                     dbpath = s_strdup(config->blockdata);
 1327                 }
 1328                 count = 7;
 1329                 break;
 1330 #else
 1331             case 0:
 1332                 dbpath =
 1333                     as_sprintf(__FILE__, __LINE__, "%s/fileblock.tch",
 1334                                config->fileblock);
 1335                 break;
 1336             case 1:
 1337                 dbpath =
 1338                     as_sprintf(__FILE__, __LINE__, "%s/blockusage.tch",
 1339                                config->blockusage);
 1340                 break;
 1341             case 2:
 1342                 dbpath =
 1343                     as_sprintf(__FILE__, __LINE__, "%s/metadata.tcb",
 1344                                config->meta);
 1345                 break;
 1346             case 3:
 1347                 if (config->blockdatabs) {
 1348                     dbpath =
 1349                         as_sprintf(__FILE__, __LINE__, "%s/blockdata.tch",
 1350                                    config->blockdata);
 1351                 } else {
 1352                     dbpath = s_strdup(config->blockdata);
 1353                 }
 1354                 break;
 1355             case 4:
 1356                 dbpath =
 1357                     as_sprintf(__FILE__, __LINE__, "%s/symlink.tch",
 1358                                config->symlink);
 1359                 break;
 1360             case 5:
 1361                 dbpath =
 1362                     as_sprintf(__FILE__, __LINE__, "%s/dirent.tcb",
 1363                                config->dirent);
 1364                 break;
 1365             case 6:
 1366                 dbpath =
 1367                     as_sprintf(__FILE__, __LINE__, "%s/hardlink.tcb",
 1368                                config->hardlink);
 1369                 break;
 1370 #endif
 1371 #endif
 1372             default:
 1373                 break;
 1374             }
 1375             result = check_free_space(dbpath);
 1376             switch (result) {
 1377             case 1:
 1378                 freeze_nospace(dbpath);
 1379                 break;
 1380             case 2:
 1381                 exec_clean_program();
 1382                 break;
 1383             case 3:
 1384                 config->nospace = RECLAIM_AGRESSIVE;
 1385             default:
 1386                 break;
 1387             }
 1388             s_free(dbpath);
 1389             count++;
 1390             sleep(config->inspectdiskinterval);
 1391         }
 1392         count = 0;
 1393         repl_lock((char *) __PRETTY_FUNCTION__);
 1394         release_repl_lock();
 1395     }
 1396     return NULL;
 1397 }
 1398 
 1399 void show_lock_status(int csocket)
 1400 {
 1401     char *msg;
 1402     timeoutWrite(3, csocket,
 1403                  "---------------------\n",
 1404                  strlen("---------------------\n"));
 1405     timeoutWrite(3, csocket,
 1406                  "normally unset\n\n", strlen("normally unset\n\n"));
 1407     if (0 != try_global_lock()) {
 1408         msg =
 1409             as_sprintf(__FILE__, __LINE__, "global_lock : 1 (set) by %s\n",
 1410                        global_lockedby);
 1411         timeoutWrite(3, csocket, msg, strlen(msg));
 1412         s_free(msg);
 1413     } else {
 1414         release_global_lock();
 1415         timeoutWrite(3, csocket,
 1416                      "global_lock : 0 (not set)\n",
 1417                      strlen("global_lock : 0 (not set)\n"));
 1418     }
 1419     if (0 != try_meta_lock()) {
 1420         msg =
 1421             as_sprintf(__FILE__, __LINE__, "meta_lock : 1 (set) by %s\n",
 1422                        meta_lockedby);
 1423         timeoutWrite(3, csocket, msg, strlen(msg));
 1424         s_free(msg);
 1425     } else {
 1426         release_meta_lock();
 1427         timeoutWrite(3, csocket,
 1428                      "meta_lock : 0 (not set)\n",
 1429                      strlen("meta_lock : 0 (not set)\n"));
 1430     }
 1431     if (0 != try_write_lock()) {
 1432         msg =
 1433             as_sprintf(__FILE__, __LINE__, "write_lock : 1 (set) by %s\n",
 1434                        write_lockedby);
 1435         timeoutWrite(3, csocket, msg, strlen(msg));
 1436         s_free(msg);
 1437     } else {
 1438         release_write_lock();
 1439         timeoutWrite(3, csocket,
 1440                      "write_lock : 0 (not set)\n",
 1441                      strlen("write_lock : 0 (not set)\n"));
 1442     }
 1443 #ifdef BERKELEYDB
 1444     if (0 != try_bdb_lock()) {
 1445         msg =
 1446             as_sprintf(__FILE__, __LINE__, "bdb_lock : 1 (set) by %s\n",
 1447                        bdb_lockedby);
 1448         timeoutWrite(3, csocket, msg, strlen(msg));
 1449         s_free(msg);
 1450     } else {
 1451         release_bdb_lock();
 1452         timeoutWrite(3, csocket,
 1453                      "bdb_lock : 0 (not set)\n",
 1454                      strlen("bdb_lock : 0 (not set)\n"));
 1455     }
 1456 #endif
 1457     if (0 != try_offset_lock()) {
 1458         msg =
 1459             as_sprintf(__FILE__, __LINE__, "offset_lock : 1 (set) by %s\n",
 1460                        offset_lockedby);
 1461         timeoutWrite(3, csocket, msg, strlen(msg));
 1462         s_free(msg);
 1463     } else {
 1464         release_offset_lock();
 1465         timeoutWrite(3, csocket,
 1466                      "offset_lock : 0 (not set)\n",
 1467                      strlen("offset_lock : 0 (not set)\n"));
 1468     }
 1469     if (0 != try_hash_lock()) {
 1470         msg =
 1471             as_sprintf(__FILE__, __LINE__, "hash_lock : 1 (set) by %s\n",
 1472                        hash_lockedby);
 1473         timeoutWrite(3, csocket, msg, strlen(msg));
 1474         s_free(msg);
 1475     } else {
 1476         release_hash_lock();
 1477         timeoutWrite(3, csocket,
 1478                      "hash_lock : 0 (not set)\n",
 1479                      strlen("hash_lock : 0 (not set)\n"));
 1480     }
 1481     if (0 != try_cachep2i_lock()) {
 1482         msg =
 1483             as_sprintf(__FILE__, __LINE__,
 1484                        "cachep2i_lock : 1 (set) by %s\n",
 1485                        cachep2i_lockedby);
 1486         timeoutWrite(3, csocket, msg, strlen(msg));
 1487         s_free(msg);
 1488     } else {
 1489         release_cachep2i_lock();
 1490         timeoutWrite(3, csocket,
 1491                      "cachep2i_lock : 0 (not set)\n",
 1492                      strlen("cachep2i_lock : 0 (not set)\n"));
 1493     }
 1494 #ifdef HAMSTERDB
 1495     if (0 != try_ham_lock()) {
 1496         msg =
 1497             as_sprintf(__FILE__, __LINE__, "ham_lock : 1 (set) by %s\n",
 1498                        ham_lockedby);
 1499         timeoutWrite(3, csocket, msg, strlen(msg));
 1500         s_free(msg);
 1501     } else {
 1502         release_ham_lock();
 1503         timeoutWrite(3, csocket,
 1504                      "ham_lock : 0 (not set)\n",
 1505                      strlen("ham_lock : 0 (not set)\n"));
 1506     }
 1507 #endif
 1508     timeoutWrite(3, csocket,
 1509                  "---------------------\n",
 1510                  strlen("---------------------\n"));
 1511     timeoutWrite(3, csocket,
 1512                  "normally set\n\n", strlen("normally set\n\n"));
 1513 }
 1514 
 1515 void *ioctl_worker(void *arg)
 1516 {
 1517     int msocket;
 1518     int csocket;
 1519     const char *port;
 1520     const char *proto = "tcp";
 1521     struct sockaddr_un client_address;
 1522     socklen_t client_len;
 1523     char buf[1028];
 1524     char *addr;
 1525     int err = 0;
 1526     char *result;
 1527     char *message = NULL;
 1528     bool isfrozen = 0;
 1529 
 1530     msocket = -1;
 1531     while (1) {
 1532         addr = getenv("LISTEN_IP");
 1533         port = getenv("LISTEN_PORT");
 1534         if (NULL == port)
 1535             port = "100";
 1536         if (NULL == addr)
 1537             LWARNING
 1538                 ("The administration session is not limited to localhost, this is not recommended.");
 1539         msocket = serverinit(addr, port, proto);
 1540         if (msocket != -1)
 1541             break;
 1542         sleep(1);
 1543         close(msocket);
 1544     }
 1545 
 1546     client_len = sizeof(client_address);
 1547     while (1) {
 1548         csocket =
 1549             accept(msocket, (struct sockaddr *) &client_address,
 1550                    &client_len);
 1551         while (1) {
 1552             result = NULL;
 1553             if (-1 == timeoutWrite(3, csocket, ">", 1))
 1554                 break;
 1555             if (-1 == readnlstring(10, csocket, buf, 1024)) {
 1556                 result = "timeout";
 1557                 err = 1;
 1558                 break;
 1559             }
 1560             if (0 == strncmp(buf, "\r", strlen("\r")))
 1561                 continue;
 1562             if (0 == strncmp(buf, "quit\r", strlen("quit\r"))
 1563                 || 0 == strncmp(buf, "exit\r", strlen("exit\r"))) {
 1564                 err = 0;
 1565                 result = "bye";
 1566                 break;
 1567             }
 1568             if (0 == strncmp(buf, "freeze\r", strlen("freeze\r"))) {
 1569                 if (0 == isfrozen) {
 1570                     result = "All i/o is now suspended.";
 1571                     err = 0;
 1572                     get_global_lock((char *) __PRETTY_FUNCTION__);
 1573                     start_flush_commit();
 1574                     isfrozen = 1;
 1575                 } else {
 1576                     result = "i/o is already suspended.";
 1577                     err = 1;
 1578                 }
 1579             }
 1580 #ifdef BERKELEYDB
 1581             if (0 == strncmp(buf, "bdb_status\r", strlen("bdb_status\r"))) {
 1582                bdb_stat();
 1583                result = "Please look at the lessfs bdb error log.";
 1584                err=0;
 1585             }
 1586 #endif
 1587             if (0 == strncmp(buf, "defrost\r", strlen("defrost\r"))) {
 1588                 if (1 == isfrozen) {
 1589                     result = "Resuming i/o.";
 1590                     err = 0;
 1591                     end_flush_commit();
 1592                     release_global_lock();
 1593                     isfrozen = 0;
 1594                 } else {
 1595                     result = "i/o is not suspended.";
 1596                     err = 1;
 1597                 }
 1598             }
 1599 #ifndef BERKELEYDB
 1600 #ifndef HAMSTERDB
 1601             if (0 == strncmp(buf, "defrag\r", strlen("defrag\r"))) {
 1602                 result = "Resuming i/o after defragmentation.";
 1603                 err = 1;
 1604                 if (-1 ==
 1605                     timeoutWrite(3, csocket,
 1606                                  "Suspending i/o for defragmentation\n",
 1607                                  strlen
 1608                                  ("Suspending i/o for defragmentation\n")))
 1609                     break;
 1610                 err = 0;
 1611                 get_global_lock((char *) __PRETTY_FUNCTION__);
 1612                 tc_defrag();
 1613                 db_close(1);
 1614                 tc_open(1, 0, 0);
 1615                 release_global_lock();
 1616             }
 1617 #endif
 1618 #endif
 1619             if (0 == strncmp(buf, "help\r", strlen("help\r"))
 1620                 || 0 == strncmp(buf, "h\r", strlen("h\r"))) {
 1621 #ifdef BERKELEYDB
 1622                 result =
 1623                     "valid commands: bdb_status defrag defrost freeze help lockstatus quit|exit";
 1624 #else
 1625                 result =
 1626                     "valid commands: defrag defrost freeze help lockstatus quit|exit";
 1627 #endif
 1628                 err = 0;
 1629             }
 1630             if (0 == strncmp(buf, "lockstatus\r", strlen("lockstatus\r"))) {
 1631                 show_lock_status(csocket);
 1632                 result = "lockstatus listed";
 1633                 err = 0;
 1634             }
 1635             if (NULL == result) {
 1636                 err = -1;
 1637                 result = "unknown command";
 1638             }
 1639             if (err == 0) {
 1640                 message =
 1641                     as_sprintf(__FILE__, __LINE__, "+OK %s\n", result);
 1642                 if (-1 ==
 1643                     timeoutWrite(3, csocket, message, strlen(message)))
 1644                     break;
 1645             } else {
 1646                 message =
 1647                     as_sprintf(__FILE__, __LINE__, "-ERR %s\n", result);
 1648                 if (-1 ==
 1649                     timeoutWrite(3, csocket, message, strlen(message)))
 1650                     break;
 1651             }
 1652             s_free(message);
 1653             message = NULL;
 1654         }
 1655         if (message)
 1656             s_free(message);
 1657         if (err == 0) {
 1658             message = as_sprintf(__FILE__, __LINE__, "+OK %s\n", result);
 1659             timeoutWrite(3, csocket, message, strlen(message));
 1660         } else {
 1661             message = as_sprintf(__FILE__, __LINE__, "-ERR %s\n", result);
 1662             timeoutWrite(3, csocket, message, strlen(message));
 1663         }
 1664         s_free(message);
 1665         message = NULL;
 1666         close(csocket);
 1667     }
 1668     return NULL;
 1669 }
 1670 
 1671 void *init_worker(void *arg)
 1672 {
 1673     int count;
 1674     char *a;
 1675     uintptr_t p;
 1676     CCACHEDTA *ccachedta;
 1677     char *key;
 1678     int size;
 1679     int vsize;
 1680     int found[max_threads];
 1681     char *dupkey;
 1682     int index;
 1683     TCLIST *keylist;
 1684 
 1685     memcpy(&count, arg, sizeof(int));   // count is thread number.
 1686     s_free(arg);
 1687     found[count] = 0;
 1688     while (1) {
 1689         if (found[count] == 0) {
 1690             usleep(50000);
 1691             if (config->replication && config->replication_role == 0
 1692                 && 0 != strcmp(config->replication_partner_ip, "-1")) {
 1693                 if (0 == try_replbl_lock((char *) __PRETTY_FUNCTION__)) {
 1694                     send_backlog();
 1695                     release_replbl_lock();
 1696                 }
 1697             } else if (config->shutdown
 1698                        || config->replication_enabled == 0)
 1699                 config->safe_down = 1;
 1700         }
 1701         found[count] = 0;
 1702         write_lock((char *) __PRETTY_FUNCTION__);
 1703         keylist = tctreekeys(workqtree);
 1704         index = 0;
 1705         if (0 != tclistnum(keylist)) {
 1706             inobnolistsort(keylist);
 1707             key = (char *) tclistval(keylist, 0, &size);
 1708             a = (char *) tctreeget(workqtree, (void *) key, size, &vsize);
 1709             if (a) {
 1710                 memcpy(&p, a, vsize);
 1711                 ccachedta = get_ccachedta(p);
 1712                 dupkey = s_malloc(size);
 1713                 memcpy(dupkey, key, size);
 1714                 tctreeout(workqtree, key, size);
 1715                 found[count]++;
 1716                 tclistdel(keylist);
 1717                 release_write_lock();
 1718                 worker_lock((char *) __PRETTY_FUNCTION__);
 1719                 working++;
 1720                 sequence++;
 1721                 release_worker_lock();
 1722                 cook_cache(dupkey, size, ccachedta, sequence);
 1723                 s_free(dupkey);
 1724                 worker_lock((char *) __PRETTY_FUNCTION__);
 1725                 working--;
 1726                 release_worker_lock();
 1727             } else
 1728                 die_dataerr("Key without value in workers");
 1729         } else
 1730             tclistdel(keylist);
 1731         if (0 == found[count])
 1732             release_write_lock();
 1733     }
 1734     LFATAL("Thread %u exits", count);
 1735     pthread_exit(NULL);
 1736 }
 1737 
 1738 /* Write the hash for string LESSFS_DIRTY to DBU
 1739    When this hash is present during mount the filesystem
 1740    has not been unmounted cleanly previously.*/
 1741 void mark_dirty()
 1742 {
 1743     unsigned char *stiger;
 1744     char *brand;
 1745     INUSE finuse;
 1746     time_t thetime;
 1747     unsigned long long inuse;
 1748 
 1749     FUNC;
 1750     brand = as_sprintf(__FILE__, __LINE__, "LESSFS_DIRTY");
 1751     stiger = thash((unsigned char *) brand, strlen(brand));
 1752     thetime = time(NULL);
 1753     inuse = thetime;
 1754     if ( config->blockdata_io_type == TOKYOCABINET ) {
 1755         update_inuse(stiger, inuse);
 1756     } else {
 1757         finuse.inuse = BLKSIZE;
 1758         finuse.size = inuse;
 1759         finuse.offset = 0;
 1760         file_update_inuse(stiger, &finuse);
 1761     }
 1762     s_free(brand);
 1763     free(stiger);
 1764 #ifndef BERKELEYDB
 1765 #ifndef HAMSTERDB
 1766     tchdbsync(dbu);
 1767 #endif
 1768 #endif
 1769     EFUNC;
 1770     return;
 1771 }
 1772 
 1773 /* Return 1 when filesystem is dirty */
 1774 int check_dirty()
 1775 {
 1776     unsigned char *stiger;
 1777     char *brand;
 1778     unsigned long long inuse;
 1779     INUSE *finuse;
 1780     int dirty = 0;
 1781     brand = as_sprintf(__FILE__, __LINE__, "LESSFS_DIRTY");
 1782     stiger = thash((unsigned char *) brand, strlen(brand));
 1783     if (NULL == config->blockdatabs) {
 1784         finuse = file_get_inuse(stiger);
 1785         if (finuse) {
 1786             s_free(finuse);
 1787             dirty = 1;
 1788         }
 1789     } else {
 1790         inuse = getInUse(stiger);
 1791         if (0 != inuse)
 1792             dirty = 1;
 1793     }
 1794     free(stiger);
 1795     s_free(brand);
 1796     return (dirty);
 1797 }
 1798 
 1799 void check_blocksize()
 1800 {
 1801     int blksize;
 1802 
 1803     blksize = get_blocksize();
 1804     if (blksize != BLKSIZE)
 1805         die_dataerr
 1806             ("Not allowed to mount lessfs with blocksize %u when previously used with blocksize %i",
 1807              BLKSIZE, blksize);
 1808     return;
 1809 }
 1810 
 1811 // Flush data every flushtime seconds.
 1812 void *lessfs_flush(void *arg)
 1813 {
 1814     //static int masterslave = 0;
 1815     struct stat stbuf;
 1816     time_t curtime;
 1817 
 1818 
 1819     while (1) {
 1820         curtime = time(NULL);
 1821         if (config->replication) {
 1822             sleep(config->flushtime / 2);
 1823         } else
 1824             sleep(config->flushtime);
 1825 // When in slave mode the master tells when to commit.
 1826         if (config->replication != 1 || config->replication_role != 1) {
 1827             get_global_lock((char *) __PRETTY_FUNCTION__);
 1828             write_lock((char *) __PRETTY_FUNCTION__);
 1829             flush_wait(0);
 1830             purge_read_cache(0, 1, (char *) __PRETTY_FUNCTION__);
 1831             if ( config->blockdata_io_type == CHUNK_IO ) sync();
 1832             start_flush_commit();
 1833             end_flush_commit();
 1834             if (0 == strcmp(config->replication_partner_ip, "-1")
 1835                 && config->replication && config->replication_role == 0) {
 1836                 repl_lock((char *) __PRETTY_FUNCTION__);
 1837                 if (-1 == fstat(frepl, &stbuf))
 1838                     die_syserr();
 1839                 if (stbuf.st_size > config->rotate_replog_size || (curtime - config->replication_last_rotated) > REPLOG_DELAY) {
 1840                     config->replication_last_rotated = time(NULL);
 1841                     rotate_replog();
 1842                 } else fsync(frepl);
 1843                 release_repl_lock(); 
 1844             }
 1845             release_write_lock();
 1846             release_global_lock();
 1847             if (config->replication == 1 && config->replication_role == 0) {
 1848                  if (-1 == fstat(frepl, &stbuf))
 1849                      die_syserr();
 1850                  if (0 != config->max_backlog_size
 1851                      && stbuf.st_size > config->max_backlog_size) {
 1852                      LINFO("Waiting for the replication log to drain");
 1853                      trunc_lock((char *) __PRETTY_FUNCTION__);
 1854                      while (1) {
 1855                          if (-1 == fstat(frepl, &stbuf))
 1856                              die_syserr();
 1857                          if (stbuf.st_size < config->max_backlog_size)
 1858                              break;
 1859                          usleep(50000);
 1860                      }
 1861                      release_trunc_lock();
 1862                  }
 1863                  write_repl_data(WRITTEN, TRANSACTIONCOMMIT, " ", 1,
 1864                                  NULL, 0, MAX_ALLOWED_THREADS - 2);
 1865             }
 1866         }
 1867     }
 1868     pthread_exit(NULL);
 1869 }
 1870 
 1871 static void *lessfs_init()
 1872 {
 1873     unsigned int count = 0;
 1874     unsigned int *cnt;
 1875     int ret;
 1876     unsigned char *stiger;
 1877     char *hashstr;
 1878     INUSE *finuse;
 1879     struct tm *timeinfo = NULL;
 1880 
 1881     FUNC;
 1882 #ifdef LZO
 1883     initlzo();
 1884 #endif
 1885     if (getenv("MAX_THREADS"))
 1886         max_threads = atoi(getenv("MAX_THREADS"));
 1887     if (max_threads > MAX_ALLOWED_THREADS - 1)
 1888         die_dataerr
 1889             ("Configuration error : MAX_ALLOWED_THREADS should be less then %i",
 1890              MAX_ALLOWED_THREADS - 1);
 1891     pthread_t worker_thread[max_threads];
 1892     pthread_t ioctl_thread;
 1893     pthread_t replication_thread;
 1894     pthread_t housekeeping_thread;
 1895     pthread_t flush_thread;
 1896 
 1897     for (count = 0; count < max_threads; count++) {
 1898         cnt = s_malloc(sizeof(int));
 1899         memcpy(cnt, &count, sizeof(int));
 1900         ret =
 1901             pthread_create(&(worker_thread[count]), NULL, init_worker,
 1902                            (void *) cnt);
 1903         if (ret != 0)
 1904             die_syserr();
 1905         if (0 != pthread_detach(worker_thread[count]))
 1906             die_syserr();
 1907     }
 1908     ret = pthread_create(&ioctl_thread, NULL, ioctl_worker, (void *) NULL);
 1909     if (ret != 0)
 1910         die_syserr();
 1911     if (0 != pthread_detach(ioctl_thread))
 1912         die_syserr();
 1913 
 1914     if (config->replication) {
 1915         if (config->replication_role == 1) {
 1916 // A slave needs a listener.
 1917             ret =
 1918                 pthread_create(&replication_thread, NULL,
 1919                                replication_worker, (void *) NULL);
 1920             if (ret != 0)
 1921                 die_syserr();
 1922             if (0 != pthread_detach(replication_thread))
 1923                 die_syserr();
 1924         }
 1925     }
 1926 
 1927     ret =
 1928         pthread_create(&housekeeping_thread, NULL, housekeeping_worker,
 1929                        (void *) NULL);
 1930     if (ret != 0)
 1931         die_syserr();
 1932     if (0 != pthread_detach(housekeeping_thread))
 1933         die_syserr();
 1934 
 1935     ret = pthread_create(&flush_thread, NULL, lessfs_flush, (void *) NULL);
 1936     if (ret != 0)
 1937         die_syserr();
 1938     if (0 != pthread_detach(flush_thread))
 1939         die_syserr();
 1940     check_blocksize();
 1941     hashstr =
 1942         as_sprintf(__FILE__, __LINE__, "%s%i", config->hash,
 1943                    config->hashlen);
 1944     stiger = thash((unsigned char *) hashstr, strlen(hashstr));
 1945     if (NULL == config->blockdatabs) {
 1946         if (NULL == (finuse = file_get_inuse(stiger))) {
 1947             die_dataerr
 1948                 ("Invalid hashsize or hash found, do not change hash or hashsize after formatting lessfs.");
 1949         } else
 1950             s_free(finuse);
 1951     } else {
 1952         if (0 == getInUse(stiger))
 1953             die_dataerr
 1954                 ("Invalid hashsize or hash found, do not change hash or hashsize after formatting lessfs.");
 1955     }
 1956     s_free(hashstr);
 1957     free(stiger);
 1958 
 1959     timeinfo = init_transactions();
 1960 
 1961     if (check_dirty()) {
 1962         LINFO("Lessfs has not been unmounted cleanly.");
 1963         if (config->transactions) {
 1964             LINFO("Rollback to : %s", asctime(timeinfo));
 1965             write_repl_data(WRITTEN, TRANSACTIONABORT, " ", 1,
 1966                             NULL, 0, MAX_ALLOWED_THREADS - 2);
 1967         } else
 1968             LINFO
 1969                 ("Lessfs has not been unmounted cleanly, you are advised to run lessfsck.");
 1970     } else {
 1971         LINFO("The filesystem is clean.");
 1972         if (config->transactions) {
 1973             LINFO("Last used at : %s", asctime(timeinfo));
 1974         }
 1975         mark_dirty();
 1976     }
 1977     config->lfsstats = s_zmalloc(2);
 1978     if (config->replication != 1 || config->replication_role != 1) {
 1979 #ifdef BERKELEYDB
 1980        bdb_restart_truncation();
 1981 #else
 1982 #ifdef HAMSTERDB
 1983        hm_restart_truncation();
 1984 #else
 1985        tc_restart_truncation();
 1986 #endif
 1987 #endif
 1988     }
 1989     EFUNC;
 1990     return NULL;
 1991 }
 1992 
 1993 static struct fuse_operations lessfs_oper = {
 1994     .getattr = lessfs_getattr,
 1995     .access = lessfs_access,
 1996     .readlink = lessfs_readlink,
 1997     .readdir = lessfs_readdir,
 1998     .mknod = lessfs_mknod,
 1999     .mkdir = lessfs_mkdir,
 2000     .symlink = lessfs_symlink,
 2001     .unlink = lessfs_unlink,
 2002     .rmdir = lessfs_rmdir,
 2003     .rename = lessfs_rename,
 2004     .link = lessfs_link,
 2005     .chmod = lessfs_chmod,
 2006     .chown = lessfs_chown,
 2007     .truncate = lessfs_truncate,
 2008     .utimens = lessfs_utimens,
 2009     .open = lessfs_open,
 2010     .read = lessfs_read,
 2011     .write = lessfs_write,
 2012     .statfs = lessfs_statfs,
 2013     .release = lessfs_release,
 2014     .fsync = lessfs_fsync,
 2015     .destroy = lessfs_destroy,
 2016     .init = lessfs_init,
 2017 };
 2018 
 2019 void usage(char *appName)
 2020 {
 2021     char **argv = (char **) s_malloc(2 * sizeof(char *));
 2022     argv[0] = appName;
 2023     argv[1] = (char *) s_malloc(3 * sizeof(char));
 2024     memcpy(argv[1], "-h\0", 3);
 2025     fuse_main(2, argv, &lessfs_oper, NULL);
 2026     FUNC;
 2027     printf("\n"
 2028            "-------------------------------------------------------\n"
 2029            "lessfs %s\n"
 2030            "\n"
 2031            "Usage: %s [/path_to_config.cfg] [mount_point] <FUSE OPTIONS>\n"
 2032            "\n"
 2033            "Example :\nmklessfs /etc/lessfs.cfg \nlessfs   /etc/lessfs.cfg /mnt\n\n"
 2034            "A high performance example with big_writes.\n(Requires kernel 2.6.26 or higher and a recent version of fuse.)\n"
 2035            "lessfs /etc/lessfs.cfg /fuse -o use_ino,readdir_ino,default_permissions,\\\n       allow_other,big_writes,max_read=131072,max_write=131072\n"
 2036            "-------------------------------------------------------\n",
 2037            LFSVERSION, appName);
 2038     exit(EXIT_USAGE);
 2039 }
 2040 
 /*
  * Compare a kernel release string such as "2.6.32-431.el6" with the
  * minimum 2.6.26, field by field.  A field that is missing from the
  * string counts as 0, so a short string like "2.6" is handled without
  * touching a NULL pointer.
  *
  * Returns 0 when the release is new enough, otherwise -1, -2 or -3 to
  * tell which field (major, minor, patch) was too low.
  */
 static int kernel_release_meets_minimum(const char *release)
 {
     static const long minimum[3] = { 2, 6, 26 };
     const char *field = release;
     int idx;

     for (idx = 0; idx < 3; idx++) {
         long value = 0;

         if (field != NULL) {
             const char *dot = strchr(field, '.');

             /* strtol stops at the first non-digit, so the field does
                not have to be terminated in place. */
             value = strtol(field, NULL, 10);
             field = (dot != NULL) ? dot + 1 : NULL;
         }
         if (value < minimum[idx]) {
             return -(idx + 1);
         }
         if (value > minimum[idx]) {
             /* A higher leading field makes the remaining ones irrelevant. */
             break;
         }
     }
     return 0;
 }

 /*
  * Check that the running kernel is at least 2.6.26.
  *
  * Returns 0 when it is.  Returns -1, -2 or -3 when the major, minor or
  * patch field is too low.  A failing uname() call is reported as -1, so
  * callers treat an unknown kernel as too old.
  */
 int verify_kernel_version()
 {
     struct utsname un;

     if (uname(&un) != 0) {
         return (-1);
     }
     return kernel_release_meets_minimum(un.release);
 }
 2071 
 2072 int main(int argc, char *argv[])
 2073 {
 2074     int res;
 2075     char *p=NULL, *maxwrite, *maxread;
 2076     char **argv_new = (char **) s_malloc(argc * sizeof(char *));
 2077     int argc_new = argc - 1;
 2078     struct rlimit lessfslimit;
 2079 
 2080     FUNC;
 2081 
 2082     if ((argc > 1) && (strcmp(argv[1], "-h") == 0)) {
 2083         usage(argv[0]);
 2084     }
 2085 
 2086     if (argc < 3) {
 2087         usage(argv[0]);
 2088     }
 2089 
 2090     if (-1 == r_env_cfg(argv[1]))
 2091         usage(argv[0]);
 2092 
 2093     argv_new[0] = argv[0];
 2094     int i;
 2095     for (i = 1; i < argc - 1; i++) {
 2096         argv_new[i] = argv[i + 1];
 2097         if (strstr(argv[i + 1], "big_writes")) {
 2098             maxwrite = strstr(argv[i + 1], "max_write=");
 2099             maxread = strstr(argv[i + 1], "max_read=");
 2100             if (maxwrite && maxread) {
 2101                 p = strchr(maxwrite, '=');
 2102                 p++;
 2103                 BLKSIZE = atoi(p);
 2104                 p = strchr(maxread, '=');
 2105                 p++;
 2106                 if (atoi(p) != BLKSIZE) {
 2107                     LFATAL
 2108                         ("lessfs : Supplied values for max_read and max_write must match.");
 2109                     fprintf(stderr,
 2110                             "Supplied values for max_read and max_write must match.\n");
 2111                     exit(EXIT_SYSTEM);
 2112                 }
 2113                 if (BLKSIZE > 4096 && 0 != verify_kernel_version()) {
 2114                     LFATAL
 2115                         ("The kernel used is to old for larger then 4k blocksizes, kernel >= 2.6.26 is required.");
 2116                     exit(EXIT_SYSTEM);
 2117                 }
 2118             } else {
 2119                 LFATAL
 2120                     ("lessfs : big_writes specified without max_write or max_read.");
 2121                 fprintf(stderr,
 2122                         "big_writes specified without max_write or max_read.\n");
 2123                 exit(EXIT_SYSTEM);
 2124             }
 2125         }
 2126     }
 2127 // Enable dumping of core for debugging.
 2128    if (getenv("COREDUMPSIZE")) {
 2129         lessfslimit.rlim_cur = lessfslimit.rlim_max =
 2130             atoi(getenv("COREDUMPSIZE"));
 2131         if (0 != setrlimit(RLIMIT_CORE, &lessfslimit)) {
 2132             fprintf(stderr, "Failed to set COREDUMPSIZE to %i : error %s",
 2133                     atoi(getenv("COREDUMPSIZE")), strerror(errno));
 2134             exit(EXIT_SYSTEM);
 2135         }
 2136     } else {
 2137         signal(SIGSEGV, segvExit);
 2138     }
 2139     LDEBUG("lessfs : blocksize is set to %u", BLKSIZE);
 2140     signal(SIGHUP, normalExit);
 2141     signal(SIGTERM, normalExit);
 2142     signal(SIGALRM, normalExit);
 2143     signal(SIGINT, normalExit);
 2144     signal(SIGUSR1, libSafeExit);
 2145     if (getenv("DEBUG"))
 2146         debug = atoi(getenv("DEBUG"));
 2147     parseconfig(0, 0);
 2148 
 2149     struct fuse_args args = FUSE_ARGS_INIT(argc_new, argv_new);
 2150     fuse_opt_parse(&args, NULL, NULL, NULL);
 2151     p = as_sprintf(__FILE__, __LINE__, "-omax_readahead=128,max_write=%u,max_read=%u",
 2152                    BLKSIZE, BLKSIZE);
 2153     fuse_opt_add_arg(&args, p);
 2154     s_free(p);
 2155     if (BLKSIZE > 4096) {
 2156         if (0 != verify_kernel_version()) {
 2157             LFATAL
 2158                 ("The kernel used is to old for larger then 4k blocksizes, kernel >= 2.6.26 is required.");
 2159             exit(EXIT_SYSTEM);
 2160         }
 2161         fuse_opt_add_arg(&args, "-obig_writes");
 2162     }
 2163     fuse_opt_add_arg(&args,
 2164                      "-ohard_remove,kernel_cache,negative_timeout=0,entry_timeout=0,attr_timeout=1,use_ino,readdir_ino,default_permissions,allow_other");
 2165     s_free(argv_new);
 2166     return fuse_main(args.argc, args.argv, &lessfs_oper, NULL);
 2167     return (res);
 2168 }