"Fossies" - the Fresh Open Source Software Archive

Member "lrzip-0.641/stream.c" (5 Mar 2021, 56493 Bytes) of package /linux/misc/lrzip-0.641.tar.xz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "stream.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 0.640_vs_0.641.

    1 /*
    2    Copyright (C) 2006-2016,2018,2021 Con Kolivas
    3    Copyright (C) 2011 Serge Belyshev
    4    Copyright (C) 2011 Peter Hyman
    5    Copyright (C) 1998 Andrew Tridgell
    6 
    7    This program is free software; you can redistribute it and/or modify
    8    it under the terms of the GNU General Public License as published by
    9    the Free Software Foundation; either version 2 of the License, or
   10    (at your option) any later version.
   11 
   12    This program is distributed in the hope that it will be useful,
   13    but WITHOUT ANY WARRANTY; without even the implied warranty of
   14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   15    GNU General Public License for more details.
   16 
   17    You should have received a copy of the GNU General Public License
   18    along with this program. If not, see <http://www.gnu.org/licenses/>.
   19 */
   20 /* multiplex N streams into a file - the streams are passed
   21    through different compressors */
   22 
   23 #ifdef HAVE_CONFIG_H
   24 # include "config.h"
   25 #endif
   26 
   27 #ifdef HAVE_SYS_TIME_H
   28 # include <sys/time.h>
   29 #endif
   30 #ifdef HAVE_SYS_TYPES_H
   31 # include <sys/types.h>
   32 #endif
   33 #ifdef HAVE_SYS_RESOURCE_H
   34 # include <sys/resource.h>
   35 #endif
   36 #ifdef HAVE_UNISTD_H
   37 # include <unistd.h>
   38 #endif
   39 #include <sys/statvfs.h>
   40 #include <pthread.h>
   41 #include <bzlib.h>
   42 #include <zlib.h>
   43 #include <lzo/lzoconf.h>
   44 #include <lzo/lzo1x.h>
   45 #include <lz4.h>
   46 #ifdef HAVE_ERRNO_H
   47 # include <errno.h>
   48 #endif
   49 #ifdef HAVE_ENDIAN_H
   50 # include <endian.h>
   51 #elif HAVE_SYS_ENDIAN_H
   52 # include <sys/endian.h>
   53 #endif
   54 #ifdef HAVE_ARPA_INET_H
   55 # include <arpa/inet.h>
   56 #endif
   57 
   58 /* LZMA C Wrapper */
   59 #include "lzma/C/LzmaLib.h"
   60 
   61 #include "util.h"
   62 #include "lrzip_core.h"
   63 
   64 #define STREAM_BUFSIZE (1024 * 1024 * 10)
   65 
/* Per-thread state for the compression back ends. One entry per worker
 * thread; s_buf starts holding the uncompressed data and is swapped for
 * the compressed buffer when a back end succeeds. */
static struct compress_thread {
    uchar *s_buf;   /* Uncompressed buffer -> Compressed buffer */
    uchar c_type;   /* Compression type */
    i64 s_len;  /* Data length uncompressed */
    i64 c_len;  /* Data length compressed */
    cksem_t cksem;  /* This thread's semaphore */
    struct stream_info *sinfo;  /* Stream set this block belongs to */
    int streamno;   /* Index of the stream within sinfo */
    uchar salt[SALT_LEN];   /* per-block salt - presumably for encryption; confirm in encryption code */
} *cthreads;

/* Argument bundle handed to each stream worker thread on creation. */
typedef struct stream_thread_struct {
    int i;      /* thread index */
    rzip_control *control;
    struct stream_info *sinfo;
} stream_thread_struct;

/* Index of the next thread whose output must be written, plus the
 * lock/condvar pair that serialise writers so output stays in order. */
static long output_thread;
static pthread_mutex_t output_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t output_cond = PTHREAD_COND_INITIALIZER;
   86 
   87 bool init_mutex(rzip_control *control, pthread_mutex_t *mutex)
   88 {
   89     if (unlikely(pthread_mutex_init(mutex, NULL)))
   90         fatal_return(("Failed to pthread_mutex_init\n"), false);
   91     return true;
   92 }
   93 
   94 bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex)
   95 {
   96     if (unlikely(pthread_mutex_unlock(mutex)))
   97         fatal_return(("Failed to pthread_mutex_unlock\n"), false);
   98     return true;
   99 }
  100 
  101 bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex)
  102 {
  103     if (unlikely(pthread_mutex_lock(mutex)))
  104         fatal_return(("Failed to pthread_mutex_lock\n"), false);
  105     return true;
  106 }
  107 
  108 static bool cond_wait(rzip_control *control, pthread_cond_t *cond, pthread_mutex_t *mutex)
  109 {
  110     if (unlikely(pthread_cond_wait(cond, mutex)))
  111         fatal_return(("Failed to pthread_cond_wait\n"), false);
  112     return true;
  113 }
  114 
  115 static bool cond_broadcast(rzip_control *control, pthread_cond_t *cond)
  116 {
  117     if (unlikely(pthread_cond_broadcast(cond)))
  118         fatal_return(("Failed to pthread_cond_broadcast\n"), false);
  119     return true;
  120 }
  121 
  122 bool create_pthread(rzip_control *control, pthread_t *thread, pthread_attr_t * attr,
  123     void * (*start_routine)(void *), void *arg)
  124 {
  125     if (unlikely(pthread_create(thread, attr, start_routine, arg)))
  126         fatal_return(("Failed to pthread_create\n"), false);
  127     return true;
  128 }
  129 
  130 bool detach_pthread(rzip_control *control, pthread_t *thread)
  131 {
  132     if (unlikely(pthread_detach(*thread)))
  133         fatal_return(("Failed to pthread_detach\n"), false);
  134     return true;
  135 }
  136 
  137 bool join_pthread(rzip_control *control, pthread_t th, void **thread_return)
  138 {
  139     if (pthread_join(th, thread_return))
  140         fatal_return(("Failed to pthread_join\n"), false);
  141     return true;
  142 }
  143 
  144 /* just to keep things clean, declare function here
  145  * but move body to the end since it's a work function
  146 */
  147 static int lz4_compresses(rzip_control *control, uchar *s_buf, i64 s_len);
  148 
  149 /*
  150   ***** COMPRESSION FUNCTIONS *****
  151 
  152   ZPAQ, BZIP, GZIP, LZMA, LZO
  153 
  154   try to compress a buffer. If compression fails for whatever reason then
  155   leave uncompressed. Return the compression type in c_type and resulting
  156   length in c_len
  157 */
  158 
/* Compress one thread's buffer with the ZPAQ back end. Returns 0 on
 * success or harmless failure (block left as CTYPE_NONE), -1 on
 * allocation failure. On success, s_buf is swapped for the compressed
 * buffer and c_type/c_len are updated. */
static int zpaq_compress_buf(rzip_control *control, struct compress_thread *cthread, long thread)
{
    i64 c_len, c_size;
    uchar *c_buf;

    /* Cheap lz4 probe first: if lz4 cannot shrink the data, the (very
     * slow) zpaq pass is skipped entirely. */
    if (!lz4_compresses(control, cthread->s_buf, cthread->s_len))
        return 0;

    /* Extra headroom so zpaq metadata fits even on incompressible data */
    c_size = round_up_page(control, cthread->s_len + 10000);
    c_buf = malloc(c_size);
    if (!c_buf) {
        print_err("Unable to allocate c_buf in zpaq_compress_buf\n");
        return -1;
    }

    c_len = 0;

    zpaq_compress(c_buf, &c_len, cthread->s_buf, cthread->s_len, control->compression_level / 4 + 1,
              control->msgout, SHOW_PROGRESS ? true: false, thread);

    /* cthread->c_len still holds the uncompressed length here; if we
     * made no gain, keep the block as CTYPE_NONE. */
    if (unlikely(c_len >= cthread->c_len)) {
        print_maxverbose("Incompressible block\n");
        /* Incompressible, leave as CTYPE_NONE */
        dealloc(c_buf);
        return 0;
    }

    /* Success: swap in the compressed buffer */
    cthread->c_len = c_len;
    dealloc(cthread->s_buf);
    cthread->s_buf = c_buf;
    cthread->c_type = CTYPE_ZPAQ;
    return 0;
}
  192 
/* Compress one thread's buffer with libbz2. Returns 0 on success or
 * when the block is incompressible (left as CTYPE_NONE), -1 on error.
 * Also used as the fallback when lzma cannot allocate any window. */
static int bzip2_compress_buf(rzip_control *control, struct compress_thread *cthread)
{
    u32 dlen = round_up_page(control, cthread->s_len);
    int bzip2_ret;
    uchar *c_buf;

    /* Cheap lz4 probe: skip the expensive bzip2 pass if the data does
     * not look compressible at all. */
    if (!lz4_compresses(control, cthread->s_buf, cthread->s_len))
        return 0;

    c_buf = malloc(dlen);
    if (!c_buf) {
        print_err("Unable to allocate c_buf in bzip2_compress_buf\n");
        return -1;
    }

    /* verbosity 0; workFactor scaled from the compression level */
    bzip2_ret = BZ2_bzBuffToBuffCompress((char *)c_buf, &dlen,
        (char *)cthread->s_buf, cthread->s_len,
        control->compression_level, 0, control->compression_level * 10);

    /* if compressed data is bigger then original data leave as
     * CTYPE_NONE */

    if (bzip2_ret == BZ_OUTBUFF_FULL) {
        print_maxverbose("Incompressible block\n");
        /* Incompressible, leave as CTYPE_NONE */
        dealloc(c_buf);
        return 0;
    }

    if (unlikely(bzip2_ret != BZ_OK)) {
        dealloc(c_buf);
        print_maxverbose("BZ2 compress failed\n");
        return -1;
    }

    if (unlikely(dlen >= cthread->c_len)) {
        print_maxverbose("Incompressible block\n");
        /* Incompressible, leave as CTYPE_NONE */
        dealloc(c_buf);
        return 0;
    }

    /* Success: swap in the compressed buffer */
    cthread->c_len = dlen;
    dealloc(cthread->s_buf);
    cthread->s_buf = c_buf;
    cthread->c_type = CTYPE_BZIP2;
    return 0;
}
  241 
  242 static int gzip_compress_buf(rzip_control *control, struct compress_thread *cthread)
  243 {
  244     unsigned long dlen = round_up_page(control, cthread->s_len);
  245     uchar *c_buf;
  246     int gzip_ret;
  247 
  248     c_buf = malloc(dlen);
  249     if (!c_buf) {
  250         print_err("Unable to allocate c_buf in gzip_compress_buf\n");
  251         return -1;
  252     }
  253 
  254     gzip_ret = compress2(c_buf, &dlen, cthread->s_buf, cthread->s_len,
  255         control->compression_level);
  256 
  257     /* if compressed data is bigger then original data leave as
  258      * CTYPE_NONE */
  259 
  260     if (gzip_ret == Z_BUF_ERROR) {
  261         print_maxverbose("Incompressible block\n");
  262         /* Incompressible, leave as CTYPE_NONE */
  263         dealloc(c_buf);
  264         return 0;
  265     }
  266 
  267     if (unlikely(gzip_ret != Z_OK)) {
  268         dealloc(c_buf);
  269         print_maxverbose("compress2 failed\n");
  270         return -1;
  271     }
  272 
  273     if (unlikely((i64)dlen >= cthread->c_len)) {
  274         print_maxverbose("Incompressible block\n");
  275         /* Incompressible, leave as CTYPE_NONE */
  276         dealloc(c_buf);
  277         return 0;
  278     }
  279 
  280     cthread->c_len = dlen;
  281     dealloc(cthread->s_buf);
  282     cthread->s_buf = c_buf;
  283     cthread->c_type = CTYPE_GZIP;
  284     return 0;
  285 }
  286 
/* Compress one thread's buffer with the LZMA back end. Returns 0 on
 * success or when the block is incompressible (left as CTYPE_NONE),
 * -1 on unrecoverable error. On SZ_ERROR_MEM the level is reduced and
 * the call retried; if no window can be allocated at all the block
 * falls back to bzip2 compression. */
static int lzma_compress_buf(rzip_control *control, struct compress_thread *cthread)
{
    unsigned char lzma_properties[5]; /* lzma properties, encoded */
    int lzma_level, lzma_ret;
    size_t prop_size = 5; /* return value for lzma_properties */
    uchar *c_buf;
    size_t dlen;

    /* Cheap lz4 probe: skip the slow lzma pass on incompressible data */
    if (!lz4_compresses(control, cthread->s_buf, cthread->s_len))
        return 0;

    /* only 7 levels with lzma, scale them */
    lzma_level = control->compression_level * 7 / 9;
    if (!lzma_level)
        lzma_level = 1;

    print_maxverbose("Starting lzma back end compression thread...\n");
retry:
    /* Re-entered with a smaller lzma_level after SZ_ERROR_MEM */
    dlen = round_up_page(control, cthread->s_len);
    c_buf = malloc(dlen);
    if (!c_buf) {
        print_err("Unable to allocate c_buf in lzma_compress_buf\n");
        return -1;
    }

    /* with LZMA SDK 4.63, we pass compression level and threads only
     * and receive properties in lzma_properties */

    lzma_ret = LzmaCompress(c_buf, &dlen, cthread->s_buf,
        (size_t)cthread->s_len, lzma_properties, &prop_size,
                lzma_level,
                0, /* dict size. set default, choose by level */
                -1, -1, -1, -1, /* lc, lp, pb, fb */
                control->threads > 1 ? 2: 1);
                /* LZMA spec has threads = 1 or 2 only. */
    if (lzma_ret != SZ_OK) {
        switch (lzma_ret) {
            case SZ_ERROR_MEM:
                /* handled below: shrink the level and retry */
                break;
            case SZ_ERROR_PARAM:
                print_err("LZMA Parameter ERROR: %d. This should not happen.\n", SZ_ERROR_PARAM);
                break;
            case SZ_ERROR_OUTPUT_EOF:
                /* Output buffer (sized same as input) overflowed: the
                 * block simply did not compress. Harmless. */
                print_maxverbose("Harmless LZMA Output Buffer Overflow error: %d. Incompressible block.\n", SZ_ERROR_OUTPUT_EOF);
                break;
            case SZ_ERROR_THREAD:
                print_err("LZMA Multi Thread ERROR: %d. This should not happen.\n", SZ_ERROR_THREAD);
                break;
            default:
                print_err("Unidentified LZMA ERROR: %d. This should not happen.\n", lzma_ret);
                break;
        }
        /* can pass -1 if not compressible! Thanks Lasse Collin */
        dealloc(c_buf);
        if (lzma_ret == SZ_ERROR_MEM) {
            if (lzma_level > 1) {
                lzma_level--;
                print_verbose("LZMA Warning: %d. Can't allocate enough RAM for compression window, trying smaller.\n", SZ_ERROR_MEM);
                /* NOTE(review): prop_size is not reset to 5 before the
                 * retry - presumably LzmaCompress leaves it untouched
                 * on failure; confirm against the SDK. */
                goto retry;
            }
            /* lzma compress can be fragile on 32 bit. If it fails,
             * fall back to bzip2 compression so the block doesn't
             * remain uncompressed */
            print_verbose("Unable to allocate enough RAM for any sized compression window, falling back to bzip2 compression.\n");
            return bzip2_compress_buf(control, cthread);
        } else if (lzma_ret == SZ_ERROR_OUTPUT_EOF)
            return 0;
        return -1;
    }

    if (unlikely((i64)dlen >= cthread->c_len)) {
        /* Incompressible, leave as CTYPE_NONE */
        print_maxverbose("Incompressible block\n");
        dealloc(c_buf);
        return 0;
    }

    /* Make sure multiple threads don't race on writing lzma_properties */
    lock_mutex(control, &control->control_lock);
    if (!control->lzma_prop_set) {
        memcpy(control->lzma_properties, lzma_properties, 5);
        control->lzma_prop_set = true;
        /* Reset the magic written flag so we write it again if we
         * get lzma properties and haven't written them yet. */
        if (TMP_OUTBUF)
            control->magic_written = 0;
    }
    unlock_mutex(control, &control->control_lock);

    /* Success: swap in the compressed buffer */
    cthread->c_len = dlen;
    dealloc(cthread->s_buf);
    cthread->s_buf = c_buf;
    cthread->c_type = CTYPE_LZMA;
    return 0;
}
  382 
  383 static int lzo_compress_buf(rzip_control *control, struct compress_thread *cthread)
  384 {
  385     lzo_uint in_len = cthread->s_len;
  386     lzo_uint dlen = round_up_page(control, in_len + in_len / 16 + 64 + 3);
  387     lzo_bytep wrkmem;
  388     uchar *c_buf;
  389     int ret = -1;
  390 
  391     wrkmem = (lzo_bytep) calloc(1, LZO1X_1_MEM_COMPRESS);
  392     if (unlikely(wrkmem == NULL)) {
  393         print_maxverbose("Failed to malloc wkmem\n");
  394         return ret;
  395     }
  396 
  397     c_buf = malloc(dlen);
  398     if (!c_buf) {
  399         print_err("Unable to allocate c_buf in lzo_compress_buf");
  400         goto out_free;
  401     }
  402 
  403     /* lzo1x_1_compress does not return anything but LZO_OK so we ignore
  404      * the return value */
  405     lzo1x_1_compress(cthread->s_buf, in_len, c_buf, &dlen, wrkmem);
  406     ret = 0;
  407 
  408     if (dlen >= in_len){
  409         /* Incompressible, leave as CTYPE_NONE */
  410         print_maxverbose("Incompressible block\n");
  411         dealloc(c_buf);
  412         goto out_free;
  413     }
  414 
  415     cthread->c_len = dlen;
  416     dealloc(cthread->s_buf);
  417     cthread->s_buf = c_buf;
  418     cthread->c_type = CTYPE_LZO;
  419 out_free:
  420     dealloc(wrkmem);
  421     return ret;
  422 }
  423 
  424 /*
  425   ***** DECOMPRESSION FUNCTIONS *****
  426 
  427   ZPAQ, BZIP, GZIP, LZMA, LZO
  428 
  429   try to decompress a buffer. Return 0 on success and -1 on failure.
  430 */
  431 static int zpaq_decompress_buf(rzip_control *control __UNUSED__, struct uncomp_thread *ucthread, long thread)
  432 {
  433     i64 dlen = ucthread->u_len;
  434     uchar *c_buf;
  435     int ret = 0;
  436 
  437     c_buf = ucthread->s_buf;
  438     ucthread->s_buf = malloc(round_up_page(control, dlen));
  439     if (unlikely(!ucthread->s_buf)) {
  440         print_err("Failed to allocate %ld bytes for decompression\n", dlen);
  441         ret = -1;
  442         goto out;
  443     }
  444 
  445     dlen = 0;
  446     zpaq_decompress(ucthread->s_buf, &dlen, c_buf, ucthread->c_len,
  447             control->msgout, SHOW_PROGRESS ? true: false, thread);
  448 
  449     if (unlikely(dlen != ucthread->u_len)) {
  450         print_err("Inconsistent length after decompression. Got %ld bytes, expected %lld\n", dlen, ucthread->u_len);
  451         ret = -1;
  452     } else
  453         dealloc(c_buf);
  454 out:
  455     if (ret == -1) {
  456         dealloc(ucthread->s_buf);
  457         ucthread->s_buf = c_buf;
  458     }
  459     return ret;
  460 }
  461 
  462 static int bzip2_decompress_buf(rzip_control *control __UNUSED__, struct uncomp_thread *ucthread)
  463 {
  464     u32 dlen = ucthread->u_len;
  465     int ret = 0, bzerr;
  466     uchar *c_buf;
  467 
  468     c_buf = ucthread->s_buf;
  469     ucthread->s_buf = malloc(round_up_page(control, dlen));
  470     if (unlikely(!ucthread->s_buf)) {
  471         print_err("Failed to allocate %d bytes for decompression\n", dlen);
  472         ret = -1;
  473         goto out;
  474     }
  475 
  476     bzerr = BZ2_bzBuffToBuffDecompress((char*)ucthread->s_buf, &dlen, (char*)c_buf, ucthread->c_len, 0, 0);
  477     if (unlikely(bzerr != BZ_OK)) {
  478         print_err("Failed to decompress buffer - bzerr=%d\n", bzerr);
  479         ret = -1;
  480         goto out;
  481     }
  482 
  483     if (unlikely(dlen != ucthread->u_len)) {
  484         print_err("Inconsistent length after decompression. Got %d bytes, expected %lld\n", dlen, ucthread->u_len);
  485         ret = -1;
  486     } else
  487         dealloc(c_buf);
  488 out:
  489     if (ret == -1) {
  490         dealloc(ucthread->s_buf);
  491         ucthread->s_buf = c_buf;
  492     }
  493     return ret;
  494 }
  495 
  496 static int gzip_decompress_buf(rzip_control *control __UNUSED__, struct uncomp_thread *ucthread)
  497 {
  498     unsigned long dlen = ucthread->u_len;
  499     int ret = 0, gzerr;
  500     uchar *c_buf;
  501 
  502     c_buf = ucthread->s_buf;
  503     ucthread->s_buf = malloc(round_up_page(control, dlen));
  504     if (unlikely(!ucthread->s_buf)) {
  505         print_err("Failed to allocate %ld bytes for decompression\n", dlen);
  506         ret = -1;
  507         goto out;
  508     }
  509 
  510     gzerr = uncompress(ucthread->s_buf, &dlen, c_buf, ucthread->c_len);
  511     if (unlikely(gzerr != Z_OK)) {
  512         print_err("Failed to decompress buffer - gzerr=%d\n", gzerr);
  513         ret = -1;
  514         goto out;
  515     }
  516 
  517     if (unlikely((i64)dlen != ucthread->u_len)) {
  518         print_err("Inconsistent length after decompression. Got %ld bytes, expected %lld\n", dlen, ucthread->u_len);
  519         ret = -1;
  520     } else
  521         dealloc(c_buf);
  522 out:
  523     if (ret == -1) {
  524         dealloc(ucthread->s_buf);
  525         ucthread->s_buf = c_buf;
  526     }
  527     return ret;
  528 }
  529 
  530 static int lzma_decompress_buf(rzip_control *control, struct uncomp_thread *ucthread)
  531 {
  532     size_t dlen = ucthread->u_len;
  533     int ret = 0, lzmaerr;
  534     uchar *c_buf;
  535     SizeT c_len = ucthread->c_len;
  536 
  537     c_buf = ucthread->s_buf;
  538     ucthread->s_buf = malloc(round_up_page(control, dlen));
  539     if (unlikely(!ucthread->s_buf)) {
  540         print_err("Failed to allocate %lld bytes for decompression\n", (i64)dlen);
  541         ret = -1;
  542         goto out;
  543     }
  544 
  545     /* With LZMA SDK 4.63 we pass control->lzma_properties
  546      * which is needed for proper uncompress */
  547     lzmaerr = LzmaUncompress(ucthread->s_buf, &dlen, c_buf, &c_len, control->lzma_properties, 5);
  548     if (unlikely(lzmaerr)) {
  549         print_err("Failed to decompress buffer - lzmaerr=%d\n", lzmaerr);
  550         ret = -1;
  551         goto out;
  552     }
  553 
  554     if (unlikely((i64)dlen != ucthread->u_len)) {
  555         print_err("Inconsistent length after decompression. Got %lld bytes, expected %lld\n", (i64)dlen, ucthread->u_len);
  556         ret = -1;
  557     } else
  558         dealloc(c_buf);
  559 out:
  560     if (ret == -1) {
  561         dealloc(ucthread->s_buf);
  562         ucthread->s_buf = c_buf;
  563     }
  564     return ret;
  565 }
  566 
  567 static int lzo_decompress_buf(rzip_control *control __UNUSED__, struct uncomp_thread *ucthread)
  568 {
  569     lzo_uint dlen = ucthread->u_len;
  570     int ret = 0, lzerr;
  571     uchar *c_buf;
  572 
  573     c_buf = ucthread->s_buf;
  574     ucthread->s_buf = malloc(round_up_page(control, dlen));
  575     if (unlikely(!ucthread->s_buf)) {
  576         print_err("Failed to allocate %lu bytes for decompression\n", (unsigned long)dlen);
  577         ret = -1;
  578         goto out;
  579     }
  580 
  581     lzerr = lzo1x_decompress_safe((uchar*)c_buf, ucthread->c_len, (uchar*)ucthread->s_buf, &dlen, NULL);
  582     if (unlikely(lzerr != LZO_E_OK)) {
  583         print_err("Failed to decompress buffer - lzerr=%d\n", lzerr);
  584         ret = -1;
  585         goto out;
  586     }
  587 
  588     if (unlikely((i64)dlen != ucthread->u_len)) {
  589         print_err("Inconsistent length after decompression. Got %lu bytes, expected %lld\n", (unsigned long)dlen, ucthread->u_len);
  590         ret = -1;
  591     } else
  592         dealloc(c_buf);
  593 out:
  594     if (ret == -1) {
  595         dealloc(ucthread->s_buf);
  596         ucthread->s_buf = c_buf;
  597     }
  598     return ret;
  599 }
  600 
  601 /* WORK FUNCTIONS */
  602 
  603 /* Look at whether we're writing to a ram location or physical files and write
  604  * the data accordingly. */
/* Look at whether we're writing to a ram location or physical files and write
 * the data accordingly. Returns the number of bytes written or -1 on
 * failure. If the in-ram output buffer would overflow, everything
 * buffered so far is flushed to the output fd and ram buffering is
 * abandoned for the rest of the run. */
ssize_t put_fdout(rzip_control *control, void *offset_buf, ssize_t ret)
{
    if (!TMP_OUTBUF)
        return write(control->fd_out, offset_buf, (size_t)ret);

    if (unlikely(control->out_ofs + ret > control->out_maxlen)) {
        /* The data won't fit in a temporary output buffer so we have
         * to fall back to temporary files. */
        print_verbose("Unable to decompress entirely in ram, will use physical files\n");
        if (unlikely(control->fd_out == -1))
            failure("Was unable to decompress entirely in ram and no temporary file creation was possible\n");
        /* Flush what is already buffered first, then the new data */
        if (unlikely(!write_fdout(control, control->tmp_outbuf, control->out_len))) {
            print_err("Unable to write_fdout tmpoutbuf in put_fdout\n");
            return -1;
        }
        close_tmpoutbuf(control);
        if (unlikely(!write_fdout(control, offset_buf, ret))) {
            print_err("Unable to write_fdout offset_buf in put_fdout\n");
            return -1;
        }
        return ret;
    }
    memcpy(control->tmp_outbuf + control->out_ofs, offset_buf, ret);
    control->out_ofs += ret;
    /* Track the high-water mark of valid data; out_ofs can be rewound
     * by seekto() so it may be behind out_len. */
    if (likely(control->out_ofs > control->out_len))
        control->out_len = control->out_ofs;
    return ret;
}
  633 
  634 /* This is a custom version of write() which writes in 1GB chunks to avoid
  635    the overflows at the >= 2GB mark thanks to 32bit fuckage. This should help
  636    even on the rare occasion write() fails to write 1GB as well. */
  637 ssize_t write_1g(rzip_control *control, void *buf, i64 len)
  638 {
  639     uchar *offset_buf = buf;
  640     ssize_t ret;
  641     i64 total;
  642 
  643     total = 0;
  644     while (len > 0) {
  645         ret = MIN(len, one_g);
  646         ret = put_fdout(control, offset_buf, (size_t)ret);
  647         if (unlikely(ret <= 0))
  648             return ret;
  649         len -= ret;
  650         offset_buf += ret;
  651         total += ret;
  652     }
  653     return total;
  654 }
  655 
  656 /* Should be called only if we know the buffer will be large enough, otherwise
  657  * we must dump_stdin first */
  658 static bool read_fdin(struct rzip_control *control, i64 len)
  659 {
  660     int tmpchar;
  661     i64 i;
  662 
  663     for (i = 0; i < len; i++) {
  664         tmpchar = getchar();
  665         if (unlikely(tmpchar == EOF))
  666             failure_return(("Reached end of file on STDIN prematurely on read_fdin, asked for %lld got %lld\n",
  667                 len, i), false);
  668         control->tmp_inbuf[control->in_ofs + i] = (char)tmpchar;
  669     }
  670     control->in_len = control->in_ofs + len;
  671     return true;
  672 }
  673 
  674 /* Dump STDIN into a temporary file */
  675 static int dump_stdin(rzip_control *control)
  676 {
  677     if (unlikely(!write_fdin(control)))
  678         return -1;
  679     if (unlikely(!read_tmpinfile(control, control->fd_in)))
  680         return -1;
  681     close_tmpinbuf(control);
  682     return 0;
  683 }
  684 
  685 /* Ditto for read */
  686 ssize_t read_1g(rzip_control *control, int fd, void *buf, i64 len)
  687 {
  688     uchar *offset_buf = buf;
  689     ssize_t ret;
  690     i64 total;
  691 
  692     if (TMP_INBUF && fd == control->fd_in) {
  693         /* We're decompressing from STDIN */
  694         if (unlikely(control->in_ofs + len > control->in_maxlen)) {
  695             /* We're unable to fit it all into the temp buffer */
  696             if (dump_stdin(control))
  697                 failure_return(("Inadequate ram to %compress from STDIN and unable to create in tmpfile"), -1);
  698             goto read_fd;
  699         }
  700         if (control->in_ofs + len > control->in_len) {
  701             if (unlikely(!read_fdin(control, control->in_ofs + len - control->in_len)))
  702                 return false;
  703         }
  704         memcpy(buf, control->tmp_inbuf + control->in_ofs, len);
  705         control->in_ofs += len;
  706         return len;
  707     }
  708 
  709     if (TMP_OUTBUF && fd == control->fd_out) {
  710         if (unlikely(control->out_ofs + len > control->out_maxlen))
  711             failure_return(("Trying to read beyond out_ofs in tmpoutbuf\n"), -1);
  712         memcpy(buf, control->tmp_outbuf + control->out_ofs, len);
  713         control->out_ofs += len;
  714         return len;
  715     }
  716 
  717 read_fd:
  718     total = 0;
  719     while (len > 0) {
  720         ret = MIN(len, one_g);
  721         ret = read(fd, offset_buf, (size_t)ret);
  722         if (unlikely(ret <= 0))
  723             return ret;
  724         len -= ret;
  725         offset_buf += ret;
  726         total += ret;
  727     }
  728     return total;
  729 }
  730 
  731 /* write to a file, return 0 on success and -1 on failure */
  732 static int write_buf(rzip_control *control, uchar *p, i64 len)
  733 {
  734     ssize_t ret;
  735 
  736     ret = write_1g(control, p, (size_t)len);
  737     if (unlikely(ret == -1)) {
  738         print_err("Write of length %lld failed - %s\n", len, strerror(errno));
  739         return -1;
  740     }
  741     if (unlikely(ret != (ssize_t)len)) {
  742         print_err("Partial write!? asked for %lld bytes but got %lld\n", len, (i64)ret);
  743         return -1;
  744     }
  745     return 0;
  746 }
  747 
  748 /* write a byte */
  749 static inline int write_u8(rzip_control *control, uchar v)
  750 {
  751     return write_buf(control, &v, 1);
  752 }
  753 
/* Write the low len bytes of v in little endian order. htole64 fixes
 * the in-memory byte layout so that truncating to len bytes always
 * yields the least significant bytes first, regardless of host
 * endianness. */
static inline int write_val(rzip_control *control, i64 v, int len)
{
    v = htole64(v);
    return write_buf(control, (uchar *)&v, len);
}
  759 
  760 static int read_buf(rzip_control *control, int f, uchar *p, i64 len)
  761 {
  762     ssize_t ret;
  763 
  764     ret = read_1g(control, f, p, (size_t)len);
  765     if (unlikely(ret == -1)) {
  766         print_err("Read of length %lld failed - %s\n", len, strerror(errno));
  767         return -1;
  768     }
  769     if (unlikely(ret != (ssize_t)len)) {
  770         print_err("Partial read!? asked for %lld bytes but got %lld\n", len, (i64)ret);
  771         return -1;
  772     }
  773     return 0;
  774 }
  775 
  776 static inline int read_u8(rzip_control *control, int f, uchar *v)
  777 {
  778     return read_buf(control, f, v, 1);
  779 }
  780 
  781 static inline int read_u32(rzip_control *control, int f, u32 *v)
  782 {
  783     int ret = read_buf(control, f, (uchar *)v, 4);
  784 
  785     *v = le32toh(*v);
  786     return ret;
  787 }
  788 
/* Read len (< 8) bytes into the low-order end of *v. v is zeroed first
 * because fewer than all 8 bytes may be filled in.
 * NOTE(review): no le64toh conversion happens here, unlike read_u32 -
 * presumably handled by the on-disk format or callers; confirm before
 * relying on this for cross-endian archives. */
static inline int read_val(rzip_control *control, int f, i64 *v, int len)
{
    int ret;

    /* We only partially read all 8 bytes so have to zero v here */
    *v = 0;
    ret = read_buf(control, f, (uchar *)v, len);
    return ret;
}
  798 
  799 static int fd_seekto(rzip_control *control, struct stream_info *sinfo, i64 spos, i64 pos)
  800 {
  801     if (unlikely(lseek(sinfo->fd, spos, SEEK_SET) != spos)) {
  802         print_err("Failed to seek to %lld in stream\n", pos);
  803         return -1;
  804     }
  805     return 0;
  806 }
  807 
  808 /* seek to a position within a set of streams - return -1 on failure */
  809 static int seekto(rzip_control *control, struct stream_info *sinfo, i64 pos)
  810 {
  811     i64 spos = pos + sinfo->initial_pos;
  812 
  813     if (TMP_OUTBUF) {
  814         spos -= control->out_relofs;
  815         control->out_ofs = spos;
  816         if (unlikely(spos > control->out_len || spos < 0)) {
  817             print_err("Trying to seek to %lld outside tmp outbuf in seekto\n", spos);
  818             return -1;
  819         }
  820         return 0;
  821     }
  822 
  823     return fd_seekto(control, sinfo, spos, pos);
  824 }
  825 
/* Seek to a read position within the streams, pulling more STDIN data
 * into the temporary input buffer (or spilling everything to a temp
 * file) as needed. Returns 0 on success, -1 on failure. */
static int read_seekto(rzip_control *control, struct stream_info *sinfo, i64 pos)
{
    i64 spos = pos + sinfo->initial_pos;

    if (TMP_INBUF) {
        if (spos > control->in_len) {
            /* Target lies beyond what has been read so far */
            i64 len = spos - control->in_len;

            if (control->in_ofs + len > control->in_maxlen) {
                /* Won't fit in ram: dump STDIN to a temp file and
                 * seek in that instead. */
                if (unlikely(dump_stdin(control)))
                    return -1;
                goto fd_seek;
            } else {
                if (unlikely(!read_fdin(control, len)))
                    return -1;
            }
        }
        control->in_ofs = spos;
        if (unlikely(spos < 0)) {
            print_err("Trying to seek to %lld outside tmp inbuf in read_seekto\n", spos);
            return -1;
        }
        return 0;
    }
fd_seek:	/* also reached by fallthrough when not buffering input in ram */
    return fd_seekto(control, sinfo, spos, pos);
}
  853 
  854 static i64 get_seek(rzip_control *control, int fd)
  855 {
  856     i64 ret;
  857 
  858     if (TMP_OUTBUF)
  859         return control->out_relofs + control->out_ofs;
  860     ret = lseek(fd, 0, SEEK_CUR);
  861     if (unlikely(ret == -1))
  862         fatal_return(("Failed to lseek in get_seek\n"), -1);
  863     return ret;
  864 }
  865 
  866 i64 get_readseek(rzip_control *control, int fd)
  867 {
  868     i64 ret;
  869 
  870     if (TMP_INBUF)
  871         return control->in_ofs;
  872     ret = lseek(fd, 0, SEEK_CUR);
  873     if (unlikely(ret == -1))
  874         fatal_return(("Failed to lseek in get_seek\n"), -1);
  875     return ret;
  876 }
  877 
  878 bool prepare_streamout_threads(rzip_control *control)
  879 {
  880     pthread_t *threads;
  881     int i;
  882 
  883     /* As we serialise the generation of threads during the rzip
  884      * pre-processing stage, it's faster to have one more thread available
  885      * to keep all CPUs busy. There is no point splitting up the chunks
  886      * into multiple threads if there will be no compression back end. */
  887     if (control->threads > 1)
  888         ++control->threads;
  889     if (NO_COMPRESS)
  890         control->threads = 1;
  891     threads = control->pthreads = calloc(sizeof(pthread_t), control->threads);
  892     if (unlikely(!threads))
  893         fatal_return(("Unable to calloc threads in prepare_streamout_threads\n"), false);
  894 
  895     cthreads = calloc(sizeof(struct compress_thread), control->threads);
  896     if (unlikely(!cthreads)) {
  897         dealloc(threads);
  898         fatal_return(("Unable to calloc cthreads in prepare_streamout_threads\n"), false);
  899     }
  900 
  901     for (i = 0; i < control->threads; i++) {
  902         cksem_init(control, &cthreads[i].cksem);
  903         cksem_post(control, &cthreads[i].cksem);
  904     }
  905     return true;
  906 }
  907 
  908 
  909 bool close_streamout_threads(rzip_control *control)
  910 {
  911     int i, close_thread = output_thread;
  912 
  913     /* Wait for the threads in the correct order in case they end up
  914      * serialised */
  915     for (i = 0; i < control->threads; i++) {
  916         cksem_wait(control, &cthreads[close_thread].cksem);
  917 
  918         if (++close_thread == control->threads)
  919             close_thread = 0;
  920     }
  921     dealloc(cthreads);
  922     dealloc(control->pthreads);
  923     return true;
  924 }
  925 
  926 /* open a set of output streams, compressing with the given
  927    compression level and algorithm */
  928 void *open_stream_out(rzip_control *control, int f, unsigned int n, i64 chunk_limit, char cbytes)
  929 {
  930     struct stream_info *sinfo;
  931     i64 testsize, limit;
  932     uchar *testmalloc;
  933     unsigned int i, testbufs;
  934 
  935     sinfo = calloc(sizeof(struct stream_info), 1);
  936     if (unlikely(!sinfo))
  937         return NULL;
  938     if (chunk_limit < control->page_size)
  939         chunk_limit = control->page_size;
  940     sinfo->bufsize = sinfo->size = limit = chunk_limit;
  941 
  942     sinfo->chunk_bytes = cbytes;
  943     sinfo->num_streams = n;
  944     sinfo->fd = f;
  945 
  946     sinfo->s = calloc(sizeof(struct stream), n);
  947     if (unlikely(!sinfo->s)) {
  948         dealloc(sinfo);
  949         return NULL;
  950     }
  951 
  952     /* Find the largest we can make the window based on ability to malloc
  953      * ram. We need 2 buffers for each compression thread and the overhead
  954      * of each compression back end. No 2nd buf is required when there is
  955      * no back end compression. We limit the total regardless to 1/3 ram
  956      * for when the OS lies due to heavy overcommit. */
  957     if (NO_COMPRESS)
  958         testbufs = 1;
  959     else
  960         testbufs = 2;
  961 
  962     testsize = (limit * testbufs) + (control->overhead * control->threads);
  963     if (testsize > control->usable_ram)
  964         limit = (control->usable_ram - (control->overhead * control->threads)) / testbufs;
  965 
  966     /* If we don't have enough ram for the number of threads, decrease the
  967      * number of threads till we do, or only have one thread. */
  968     while (limit < STREAM_BUFSIZE && limit < chunk_limit) {
  969         if (control->threads > 1)
  970             --control->threads;
  971         else
  972             break;
  973         limit = (control->usable_ram - (control->overhead * control->threads)) / testbufs;
  974         limit = MIN(limit, chunk_limit);
  975     }
  976     if (BITS32) {
  977         limit = MIN(limit, one_g);
  978         if (limit + (control->overhead * control->threads) > one_g)
  979             limit = one_g - (control->overhead * control->threads);
  980     }
  981     /* Use a nominal minimum size should we fail all previous shrinking */
  982     limit = MAX(limit, STREAM_BUFSIZE);
  983     limit = MIN(limit, chunk_limit);
  984 retest_malloc:
  985     testsize = limit + (control->overhead * control->threads);
  986     testmalloc = malloc(testsize);
  987     if (!testmalloc) {
  988         limit = limit / 10 * 9;
  989         if (limit < 100000000) {
  990             /* If we can't even allocate 100MB then we'll never
  991              * succeed */
  992             print_err("Unable to allocate enough memory for operation\n");
  993             dealloc(sinfo->s);
  994             dealloc(sinfo);
  995             return NULL;
  996         }
  997         goto retest_malloc;
  998     }
  999     if (!NO_COMPRESS) {
 1000         char *testmalloc2 = malloc(limit);
 1001 
 1002         if (!testmalloc2) {
 1003             dealloc(testmalloc);
 1004             limit = limit / 10 * 9;
 1005             goto retest_malloc;
 1006         }
 1007         dealloc(testmalloc2);
 1008     }
 1009     dealloc(testmalloc);
 1010     print_maxverbose("Succeeded in testing %lld sized malloc for back end compression\n", testsize);
 1011 
 1012     /* Make the bufsize no smaller than STREAM_BUFSIZE. Round up the
 1013      * bufsize to fit X threads into it */
 1014     sinfo->bufsize = MIN(limit, MAX((limit + control->threads - 1) / control->threads,
 1015                     STREAM_BUFSIZE));
 1016 
 1017     if (control->threads > 1)
 1018         print_maxverbose("Using up to %d threads to compress up to %lld bytes each.\n",
 1019             control->threads, sinfo->bufsize);
 1020     else
 1021         print_maxverbose("Using only 1 thread to compress up to %lld bytes\n",
 1022             sinfo->bufsize);
 1023 
 1024     for (i = 0; i < n; i++) {
 1025         sinfo->s[i].buf = calloc(sinfo->bufsize , 1);
 1026         if (unlikely(!sinfo->s[i].buf)) {
 1027             fatal("Unable to malloc buffer of size %lld in open_stream_out\n", sinfo->bufsize);
 1028             dealloc(sinfo->s);
 1029             dealloc(sinfo);
 1030             return NULL;
 1031         }
 1032     }
 1033 
 1034     return (void *)sinfo;
 1035 }
 1036 
 1037 /* The block headers are all encrypted so we read the data and salt associated
 1038  * with them, decrypt the data, then return the decrypted version of the
 1039  * values */
 1040 static bool decrypt_header(rzip_control *control, uchar *head, uchar *c_type,
 1041                i64 *c_len, i64 *u_len, i64 *last_head)
 1042 {
 1043     uchar *buf = head + SALT_LEN;
 1044 
 1045     memcpy(buf, c_type, 1);
 1046     memcpy(buf + 1, c_len, 8);
 1047     memcpy(buf + 9, u_len, 8);
 1048     memcpy(buf + 17, last_head, 8);
 1049 
 1050     if (unlikely(!lrz_decrypt(control, buf, 25, head)))
 1051         return false;
 1052 
 1053     memcpy(c_type, buf, 1);
 1054     memcpy(c_len, buf + 1, 8);
 1055     memcpy(u_len, buf + 9, 8);
 1056     memcpy(last_head, buf + 17, 8);
 1057     return true;
 1058 }
 1059 
 1060 /* prepare a set of n streams for reading on file descriptor f */
 1061 void *open_stream_in(rzip_control *control, int f, int n, char chunk_bytes)
 1062 {
 1063     struct uncomp_thread *ucthreads;
 1064     struct stream_info *sinfo;
 1065     int total_threads, i;
 1066     pthread_t *threads;
 1067     i64 header_length;
 1068 
 1069     sinfo = calloc(sizeof(struct stream_info), 1);
 1070     if (unlikely(!sinfo))
 1071         return NULL;
 1072 
 1073     /* We have one thread dedicated to stream 0, and one more thread than
 1074      * CPUs to keep them busy, unless we're running single-threaded. */
 1075     if (control->threads > 1)
 1076         total_threads = control->threads + 2;
 1077     else
 1078         total_threads = control->threads + 1;
 1079     threads = control->pthreads = calloc(sizeof(pthread_t), total_threads);
 1080     if (unlikely(!threads))
 1081         return NULL;
 1082 
 1083     sinfo->ucthreads = ucthreads = calloc(sizeof(struct uncomp_thread), total_threads);
 1084     if (unlikely(!ucthreads)) {
 1085         dealloc(sinfo);
 1086         dealloc(threads);
 1087         fatal_return(("Unable to calloc ucthreads in open_stream_in\n"), NULL);
 1088     }
 1089 
 1090     sinfo->num_streams = n;
 1091     sinfo->fd = f;
 1092     sinfo->chunk_bytes = chunk_bytes;
 1093 
 1094     sinfo->s = calloc(sizeof(struct stream), n);
 1095     if (unlikely(!sinfo->s)) {
 1096         dealloc(sinfo);
 1097         dealloc(threads);
 1098         dealloc(ucthreads);
 1099         return NULL;
 1100     }
 1101 
 1102     sinfo->s[0].total_threads = 1;
 1103     sinfo->s[1].total_threads = total_threads - 1;
 1104 
 1105     if (control->major_version == 0 && control->minor_version > 5) {
 1106         /* Read in flag that tells us if there are more chunks after
 1107          * this. Ignored if we know the final file size */
 1108         print_maxverbose("Reading eof flag at %lld\n", get_readseek(control, f));
 1109         if (unlikely(read_u8(control, f, &control->eof))) {
 1110             print_err("Failed to read eof flag in open_stream_in\n");
 1111             goto failed;
 1112         }
 1113         print_maxverbose("EOF: %d\n", control->eof);
 1114 
 1115         /* Read in the expected chunk size */
 1116         if (!ENCRYPT) {
 1117             print_maxverbose("Reading expected chunksize at %lld\n", get_readseek(control, f));
 1118             if (unlikely(read_val(control, f, &sinfo->size, sinfo->chunk_bytes))) {
 1119                 print_err("Failed to read in chunk size in open_stream_in\n");
 1120                 goto failed;
 1121             }
 1122             sinfo->size = le64toh(sinfo->size);
 1123             print_maxverbose("Chunk size: %lld\n", sinfo->size);
 1124             control->st_size += sinfo->size;
 1125             if (unlikely(sinfo->chunk_bytes < 1 || sinfo->chunk_bytes > 8 || sinfo->size < 0)) {
 1126                 print_err("Invalid chunk data size %d bytes %lld\n", sinfo->size, sinfo->chunk_bytes);
 1127                 goto failed;
 1128             }
 1129         }
 1130     }
 1131     sinfo->initial_pos = get_readseek(control, f);
 1132     if (unlikely(sinfo->initial_pos == -1))
 1133         goto failed;
 1134 
 1135     for (i = 0; i < n; i++) {
 1136         uchar c, enc_head[25 + SALT_LEN];
 1137         i64 v1, v2;
 1138 
 1139         sinfo->s[i].base_thread = i;
 1140         sinfo->s[i].uthread_no = sinfo->s[i].base_thread;
 1141         sinfo->s[i].unext_thread = sinfo->s[i].base_thread;
 1142 
 1143         if (unlikely(ENCRYPT && read_buf(control, f, enc_head, SALT_LEN)))
 1144             goto failed;
 1145 again:
 1146         if (unlikely(read_u8(control, f, &c)))
 1147             goto failed;
 1148 
 1149         /* Compatibility crap for versions < 0.40 */
 1150         if (control->major_version == 0 && control->minor_version < 4) {
 1151             u32 v132, v232, last_head32;
 1152 
 1153             if (unlikely(read_u32(control, f, &v132)))
 1154                 goto failed;
 1155             if (unlikely(read_u32(control, f, &v232)))
 1156                 goto failed;
 1157             if (unlikely(read_u32(control, f, &last_head32)))
 1158                 goto failed;
 1159 
 1160             v1 = v132;
 1161             v2 = v232;
 1162             sinfo->s[i].last_head = last_head32;
 1163             header_length = 13;
 1164         } else {
 1165             int read_len;
 1166 
 1167             print_maxverbose("Reading stream %d header at %lld\n", i, get_readseek(control, f));
 1168             if ((control->major_version == 0 && control->minor_version < 6) ||
 1169                 ENCRYPT)
 1170                     read_len = 8;
 1171             else
 1172                 read_len = sinfo->chunk_bytes;
 1173             if (unlikely(read_val(control, f, &v1, read_len)))
 1174                 goto failed;
 1175             if (unlikely(read_val(control, f, &v2, read_len)))
 1176                 goto failed;
 1177             if (unlikely(read_val(control, f, &sinfo->s[i].last_head, read_len)))
 1178                 goto failed;
 1179             header_length = 1 + (read_len * 3);
 1180         }
 1181         sinfo->total_read += header_length;
 1182 
 1183         if (ENCRYPT) {
 1184             if (unlikely(!decrypt_header(control, enc_head, &c, &v1, &v2, &sinfo->s[i].last_head)))
 1185                 goto failed;
 1186             sinfo->total_read += SALT_LEN;
 1187         }
 1188 
 1189         v1 = le64toh(v1);
 1190         v2 = le64toh(v2);
 1191         sinfo->s[i].last_head = le64toh(sinfo->s[i].last_head);
 1192 
 1193         if (unlikely(c == CTYPE_NONE && v1 == 0 && v2 == 0 && sinfo->s[i].last_head == 0 && i == 0)) {
 1194             print_err("Enabling stream close workaround\n");
 1195             sinfo->initial_pos += header_length;
 1196             goto again;
 1197         }
 1198 
 1199         if (unlikely(c != CTYPE_NONE)) {
 1200             print_err("Unexpected initial tag %d in streams\n", c);
 1201             if (ENCRYPT)
 1202                 print_err("Wrong password?\n");
 1203             goto failed;
 1204         }
 1205         if (unlikely(v1)) {
 1206             print_err("Unexpected initial c_len %lld in streams %lld\n", v1, v2);
 1207             goto failed;
 1208         }
 1209         if (unlikely(v2)) {
 1210             print_err("Unexpected initial u_len %lld in streams\n", v2);
 1211             goto failed;
 1212         }
 1213     }
 1214 
 1215     return (void *)sinfo;
 1216 
 1217 failed:
 1218     dealloc(sinfo->s);
 1219     dealloc(sinfo);
 1220     dealloc(threads);
 1221     dealloc(ucthreads);
 1222     return NULL;
 1223 }
 1224 
 1225 #define MIN_SIZE (ENCRYPT ? CBC_LEN : 0)
 1226 
/* Once the final data has all been written to the block header, we go back
 * and write SALT_LEN bytes of salt before it, and encrypt the header in place
 * by reading what has been written, encrypting it, and writing back over it.
 * This is very convoluted depending on whether a last_head value is written
 * to this block or not. See the callers of this function.
 * Returns true on success, false on any seek/read/write/crypto failure. */
static bool rewrite_encrypted(rzip_control *control, struct stream_info *sinfo, i64 ofs)
{
    uchar *buf, *head;
    i64 cur_ofs;

    /* Record where we are, relative to the stream start, so the position
     * can be restored after the in-place rewrite */
    cur_ofs = get_seek(control, sinfo->fd) - sinfo->initial_pos;
    if (unlikely(cur_ofs == -1))
        return false;
    /* head holds SALT_LEN bytes of salt followed by the 25 byte header */
    head = malloc(25 + SALT_LEN);
    if (unlikely(!head))
        fatal_return(("Failed to malloc head in rewrite_encrypted\n"), false);
    buf = head + SALT_LEN;
    if (unlikely(!get_rand(control, head, SALT_LEN)))
        goto error;
    /* Write the fresh salt into the SALT_LEN gap left before ofs */
    if (unlikely(seekto(control, sinfo, ofs - SALT_LEN)))
        failure_goto(("Failed to seekto buf ofs in rewrite_encrypted\n"), error);
    if (unlikely(write_buf(control, head, SALT_LEN)))
        failure_goto(("Failed to write_buf head in rewrite_encrypted\n"), error);
    /* Read back the 25 byte plaintext header that was written earlier */
    if (unlikely(read_buf(control, sinfo->fd, buf, 25)))
        failure_goto(("Failed to read_buf buf in rewrite_encrypted\n"), error);

    if (unlikely(!lrz_encrypt(control, buf, 25, head)))
        goto error;

    /* Overwrite the plaintext header with its encrypted form */
    if (unlikely(seekto(control, sinfo, ofs)))
        failure_goto(("Failed to seek back to ofs in rewrite_encrypted\n"), error);
    if (unlikely(write_buf(control, buf, 25)))
        failure_goto(("Failed to write_buf encrypted buf in rewrite_encrypted\n"), error);
    dealloc(head);
    /* Restore the original position; the return value is deliberately
     * not checked here */
    seekto(control, sinfo, cur_ofs);
    return true;
error:
    dealloc(head);
    return false;
}
 1267 
 1268 /* Enter with s_buf allocated,s_buf points to the compressed data after the
 1269  * backend compression and is then freed here */
 1270 static void *compthread(void *data)
 1271 {
 1272     stream_thread_struct *s = data;
 1273     rzip_control *control = s->control;
 1274     long i = s->i;
 1275     struct compress_thread *cti;
 1276     struct stream_info *ctis;
 1277     int waited = 0, ret = 0;
 1278     i64 padded_len;
 1279     int write_len;
 1280 
 1281     /* Make sure this thread doesn't already exist */
 1282 
 1283     dealloc(data);
 1284     cti = &cthreads[i];
 1285     ctis = cti->sinfo;
 1286 
 1287     if (unlikely(setpriority(PRIO_PROCESS, 0, control->nice_val) == -1)) {
 1288         print_err("Warning, unable to set thread nice value %d...Resetting to %d\n", control->nice_val, control->current_priority);
 1289         setpriority(PRIO_PROCESS, 0, (control->nice_val=control->current_priority));
 1290     }
 1291     cti->c_type = CTYPE_NONE;
 1292     cti->c_len = cti->s_len;
 1293 
 1294     /* Flushing writes to disk frees up any dirty ram, improving chances
 1295      * of succeeding in allocating more ram */
 1296     fsync(ctis->fd);
 1297 
 1298     /* This is a cludge in case we are compressing to stdout and our first
 1299      * stream is not compressed, but subsequent ones are compressed by
 1300      * lzma and we can no longer seek back to the beginning of the file
 1301      * to write the lzma properties which are effectively always starting
 1302      * with 93. */
 1303     if (TMP_OUTBUF && LZMA_COMPRESS)
 1304         control->lzma_properties[0] = 93;
 1305 retry:
 1306     /* Very small buffers have issues to do with minimum amounts of ram
 1307      * allocatable to a buffer combined with the MINIMUM_MATCH of rzip
 1308      * being 31 bytes so don't bother trying to compress anything less
 1309      * than 64 bytes. */
 1310     if (!NO_COMPRESS && cti->c_len >= 64) {
 1311         if (LZMA_COMPRESS)
 1312             ret = lzma_compress_buf(control, cti);
 1313         else if (LZO_COMPRESS)
 1314             ret = lzo_compress_buf(control, cti);
 1315         else if (BZIP2_COMPRESS)
 1316             ret = bzip2_compress_buf(control, cti);
 1317         else if (ZLIB_COMPRESS)
 1318             ret = gzip_compress_buf(control, cti);
 1319         else if (ZPAQ_COMPRESS)
 1320             ret = zpaq_compress_buf(control, cti, i);
 1321         else failure_goto(("Dunno wtf compression to use!\n"), error);
 1322     }
 1323 
 1324     padded_len = cti->c_len;
 1325     if (!ret && padded_len < MIN_SIZE) {
 1326         /* We need to pad out each block to at least be CBC_LEN bytes
 1327          * long or encryption cannot work. We pad it with random
 1328          * data */
 1329         padded_len = MIN_SIZE;
 1330         cti->s_buf = realloc(cti->s_buf, MIN_SIZE);
 1331         if (unlikely(!cti->s_buf))
 1332             fatal_goto(("Failed to realloc s_buf in compthread\n"), error);
 1333         if (unlikely(!get_rand(control, cti->s_buf + cti->c_len, MIN_SIZE - cti->c_len)))
 1334             goto error;
 1335     }
 1336 
 1337     /* If compression fails for whatever reason multithreaded, then wait
 1338      * for the previous thread to finish, serialising the work to decrease
 1339      * the memory requirements, increasing the chance of success */
 1340     if (unlikely(ret && waited))
 1341         failure_goto(("Failed to compress in compthread\n"), error);
 1342 
 1343     if (!waited) {
 1344         lock_mutex(control, &output_lock);
 1345         while (output_thread != i)
 1346             cond_wait(control, &output_cond, &output_lock);
 1347         unlock_mutex(control, &output_lock);
 1348         waited = 1;
 1349     }
 1350     if (unlikely(ret)) {
 1351         print_maxverbose("Unable to compress in parallel, waiting for previous thread to complete before trying again\n");
 1352         goto retry;
 1353     }
 1354 
 1355     /* Need to be big enough to fill one CBC_LEN */
 1356     if (ENCRYPT)
 1357         write_len = 8;
 1358     else
 1359         write_len = ctis->chunk_bytes;
 1360 
 1361     if (!ctis->chunks++) {
 1362         int j;
 1363 
 1364         if (TMP_OUTBUF) {
 1365             lock_mutex(control, &control->control_lock);
 1366             if (!control->magic_written)
 1367                 write_magic(control);
 1368             unlock_mutex(control, &control->control_lock);
 1369 
 1370             if (unlikely(!flush_tmpoutbuf(control))) {
 1371                 print_err("Failed to flush_tmpoutbuf in compthread\n");
 1372                 goto error;
 1373             }
 1374         }
 1375 
 1376         print_maxverbose("Writing initial chunk bytes value %d at %lld\n",
 1377                  ctis->chunk_bytes, get_seek(control, ctis->fd));
 1378         /* Write chunk bytes of this block */
 1379         write_u8(control, ctis->chunk_bytes);
 1380 
 1381         /* Write whether this is the last chunk, followed by the size
 1382          * of this chunk */
 1383         print_maxverbose("Writing EOF flag as %d\n", control->eof);
 1384         write_u8(control, control->eof);
 1385         if (!ENCRYPT)
 1386             write_val(control, ctis->size, ctis->chunk_bytes);
 1387 
 1388         /* First chunk of this stream, write headers */
 1389         ctis->initial_pos = get_seek(control, ctis->fd);
 1390         if (unlikely(ctis->initial_pos == -1))
 1391             goto error;
 1392 
 1393         print_maxverbose("Writing initial header at %lld\n", ctis->initial_pos);
 1394         for (j = 0; j < ctis->num_streams; j++) {
 1395             /* If encrypting, we leave SALT_LEN room to write in salt
 1396             * later */
 1397             if (ENCRYPT) {
 1398                 if (unlikely(write_val(control, 0, SALT_LEN)))
 1399                     fatal_goto(("Failed to write_buf blank salt in compthread %d\n", i), error);
 1400                 ctis->cur_pos += SALT_LEN;
 1401             }
 1402             ctis->s[j].last_head = ctis->cur_pos + 1 + (write_len * 2);
 1403             write_u8(control, CTYPE_NONE);
 1404             write_val(control, 0, write_len);
 1405             write_val(control, 0, write_len);
 1406             write_val(control, 0, write_len);
 1407             ctis->cur_pos += 1 + (write_len * 3);
 1408         }
 1409     }
 1410 
 1411     print_maxverbose("Compthread %ld seeking to %lld to store length %d\n", i, ctis->s[cti->streamno].last_head, write_len);
 1412 
 1413     if (unlikely(seekto(control, ctis, ctis->s[cti->streamno].last_head)))
 1414         fatal_goto(("Failed to seekto in compthread %d\n", i), error);
 1415 
 1416     if (unlikely(write_val(control, ctis->cur_pos, write_len)))
 1417         fatal_goto(("Failed to write_val cur_pos in compthread %d\n", i), error);
 1418 
 1419     if (ENCRYPT)
 1420         rewrite_encrypted(control, ctis, ctis->s[cti->streamno].last_head - 17);
 1421 
 1422     ctis->s[cti->streamno].last_head = ctis->cur_pos + 1 + (write_len * 2) + (ENCRYPT ? SALT_LEN : 0);
 1423 
 1424     print_maxverbose("Compthread %ld seeking to %lld to write header\n", i, ctis->cur_pos);
 1425 
 1426     if (unlikely(seekto(control, ctis, ctis->cur_pos)))
 1427         fatal_goto(("Failed to seekto cur_pos in compthread %d\n", i), error);
 1428 
 1429     print_maxverbose("Thread %ld writing %lld compressed bytes from stream %d\n", i, padded_len, cti->streamno);
 1430 
 1431     if (ENCRYPT) {
 1432         if (unlikely(write_val(control, 0, SALT_LEN)))
 1433             fatal_goto(("Failed to write_buf header salt in compthread %d\n", i), error);
 1434         ctis->cur_pos += SALT_LEN;
 1435         ctis->s[cti->streamno].last_headofs = ctis->cur_pos;
 1436     }
 1437     /* We store the actual c_len even though we might pad it out */
 1438     if (unlikely(write_u8(control, cti->c_type) ||
 1439         write_val(control, cti->c_len, write_len) ||
 1440         write_val(control, cti->s_len, write_len) ||
 1441         write_val(control, 0, write_len))) {
 1442             fatal_goto(("Failed write in compthread %d\n", i), error);
 1443     }
 1444     ctis->cur_pos += 1 + (write_len * 3);
 1445 
 1446     if (ENCRYPT) {
 1447         if (unlikely(!get_rand(control, cti->salt, SALT_LEN)))
 1448             goto error;
 1449         if (unlikely(write_buf(control, cti->salt, SALT_LEN)))
 1450             fatal_goto(("Failed to write_buf block salt in compthread %d\n", i), error);
 1451         if (unlikely(!lrz_encrypt(control, cti->s_buf, padded_len, cti->salt)))
 1452             goto error;
 1453         ctis->cur_pos += SALT_LEN;
 1454     }
 1455 
 1456     print_maxverbose("Compthread %ld writing data at %lld\n", i, ctis->cur_pos);
 1457 
 1458     if (unlikely(write_buf(control, cti->s_buf, padded_len)))
 1459         fatal_goto(("Failed to write_buf s_buf in compthread %d\n", i), error);
 1460 
 1461     ctis->cur_pos += padded_len;
 1462     dealloc(cti->s_buf);
 1463 
 1464     lock_mutex(control, &output_lock);
 1465     if (++output_thread == control->threads)
 1466         output_thread = 0;
 1467     cond_broadcast(control, &output_cond);
 1468     unlock_mutex(control, &output_lock);
 1469 
 1470 error:
 1471     cksem_post(control, &cti->cksem);
 1472 
 1473     return NULL;
 1474 }
 1475 
 1476 static void clear_buffer(rzip_control *control, struct stream_info *sinfo, int streamno, int newbuf)
 1477 {
 1478     pthread_t *threads = control->pthreads;
 1479     stream_thread_struct *s;
 1480     static int i = 0;
 1481 
 1482     /* Make sure this thread doesn't already exist */
 1483     cksem_wait(control, &cthreads[i].cksem);
 1484 
 1485     cthreads[i].sinfo = sinfo;
 1486     cthreads[i].streamno = streamno;
 1487     cthreads[i].s_buf = sinfo->s[streamno].buf;
 1488     cthreads[i].s_len = sinfo->s[streamno].buflen;
 1489 
 1490     print_maxverbose("Starting thread %ld to compress %lld bytes from stream %d\n",
 1491              i, cthreads[i].s_len, streamno);
 1492 
 1493     s = malloc(sizeof(stream_thread_struct));
 1494     if (unlikely(!s)) {
 1495         cksem_post(control, &cthreads[i].cksem);
 1496         failure("Unable to malloc in clear_buffer");
 1497     }
 1498     s->i = i;
 1499     s->control = control;
 1500     if (unlikely((!create_pthread(control, &threads[i], NULL, compthread, s)) ||
 1501                  (!detach_pthread(control, &threads[i]))))
 1502         failure("Unable to create compthread in clear_buffer");
 1503 
 1504     if (newbuf) {
 1505         /* The stream buffer has been given to the thread, allocate a
 1506          * new one. */
 1507         sinfo->s[streamno].buf = malloc(sinfo->bufsize);
 1508         if (unlikely(!sinfo->s[streamno].buf))
 1509             failure("Unable to malloc buffer of size %lld in flush_buffer\n", sinfo->bufsize);
 1510         sinfo->s[streamno].buflen = 0;
 1511     }
 1512 
 1513     if (++i == control->threads)
 1514         i = 0;
 1515 }
 1516 
/* flush out any data in a stream buffer.
 * Hands the buffer to a compression thread and allocates a fresh empty
 * buffer for the stream (newbuf = 1) so it can continue to fill. */
void flush_buffer(rzip_control *control, struct stream_info *sinfo, int streamno)
{
    clear_buffer(control, sinfo, streamno, 1);
}
 1522 
 1523 static void *ucompthread(void *data)
 1524 {
 1525     stream_thread_struct *sts = data;
 1526     rzip_control *control = sts->control;
 1527     int waited = 0, ret = 0, i = sts->i;
 1528     struct uncomp_thread *uci = &sts->sinfo->ucthreads[i];
 1529 
 1530     dealloc(data);
 1531 
 1532     if (unlikely(setpriority(PRIO_PROCESS, 0, control->nice_val) == -1)) {
 1533         print_err("Warning, unable to set thread nice value %d...Resetting to %d\n", control->nice_val, control->current_priority);
 1534         setpriority(PRIO_PROCESS, 0, (control->nice_val=control->current_priority));
 1535     }
 1536 
 1537 retry:
 1538     if (uci->c_type != CTYPE_NONE) {
 1539         switch (uci->c_type) {
 1540             case CTYPE_LZMA:
 1541                 ret = lzma_decompress_buf(control, uci);
 1542                 break;
 1543             case CTYPE_LZO:
 1544                 ret = lzo_decompress_buf(control, uci);
 1545                 break;
 1546             case CTYPE_BZIP2:
 1547                 ret = bzip2_decompress_buf(control, uci);
 1548                 break;
 1549             case CTYPE_GZIP:
 1550                 ret = gzip_decompress_buf(control, uci);
 1551                 break;
 1552             case CTYPE_ZPAQ:
 1553                 ret = zpaq_decompress_buf(control, uci, i);
 1554                 break;
 1555             default:
 1556                 failure_return(("Dunno wtf decompression type to use!\n"), NULL);
 1557                 break;
 1558         }
 1559     }
 1560 
 1561     /* As per compression, serialise the decompression if it fails in
 1562      * parallel */
 1563     if (unlikely(ret)) {
 1564         if (unlikely(waited))
 1565             failure_return(("Failed to decompress in ucompthread\n"), (void*)1);
 1566         print_maxverbose("Unable to decompress in parallel, waiting for previous thread to complete before trying again\n");
 1567         /* We do not strictly need to wait for this, so it's used when
 1568          * decompression fails due to inadequate memory to try again
 1569          * serialised. */
 1570         lock_mutex(control, &output_lock);
 1571         while (output_thread != i)
 1572             cond_wait(control, &output_cond, &output_lock);
 1573         unlock_mutex(control, &output_lock);
 1574         waited = 1;
 1575         goto retry;
 1576     }
 1577 
 1578     print_maxverbose("Thread %ld decompressed %lld bytes from stream %d\n", i, uci->u_len, uci->streamno);
 1579 
 1580     return NULL;
 1581 }
 1582 
 1583 /* fill a buffer from a stream - return -1 on failure */
 1584 static int fill_buffer(rzip_control *control, struct stream_info *sinfo, struct stream *s, int streamno)
 1585 {
 1586     i64 u_len, c_len, last_head, padded_len, header_length, max_len;
 1587     uchar enc_head[25 + SALT_LEN], blocksalt[SALT_LEN];
 1588     struct uncomp_thread *ucthreads = sinfo->ucthreads;
 1589     pthread_t *threads = control->pthreads;
 1590     stream_thread_struct *sts;
 1591     uchar c_type, *s_buf;
 1592     void *thr_return;
 1593 
 1594     dealloc(s->buf);
 1595     if (s->eos)
 1596         goto out;
 1597 fill_another:
 1598     if (unlikely(ucthreads[s->uthread_no].busy))
 1599         failure_return(("Trying to start a busy thread, this shouldn't happen!\n"), -1);
 1600 
 1601     if (unlikely(read_seekto(control, sinfo, s->last_head)))
 1602         return -1;
 1603 
 1604     if (ENCRYPT) {
 1605         if (unlikely(read_buf(control, sinfo->fd, enc_head, SALT_LEN)))
 1606             return -1;
 1607         sinfo->total_read += SALT_LEN;
 1608     }
 1609 
 1610     if (unlikely(read_u8(control, sinfo->fd, &c_type)))
 1611         return -1;
 1612 
 1613     /* Compatibility crap for versions < 0.4 */
 1614     if (control->major_version == 0 && control->minor_version < 4) {
 1615         u32 c_len32, u_len32, last_head32;
 1616 
 1617         if (unlikely(read_u32(control, sinfo->fd, &c_len32)))
 1618             return -1;
 1619         if (unlikely(read_u32(control, sinfo->fd, &u_len32)))
 1620             return -1;
 1621         if (unlikely(read_u32(control, sinfo->fd, &last_head32)))
 1622             return -1;
 1623         c_len = c_len32;
 1624         u_len = u_len32;
 1625         last_head = last_head32;
 1626         header_length = 13;
 1627     } else {
 1628         int read_len;
 1629 
 1630         print_maxverbose("Reading ucomp header at %lld\n", get_readseek(control, sinfo->fd));
 1631         if ((control->major_version == 0 && control->minor_version < 6) || ENCRYPT)
 1632             read_len = 8;
 1633         else
 1634             read_len = sinfo->chunk_bytes;
 1635         if (unlikely(read_val(control, sinfo->fd, &c_len, read_len)))
 1636             return -1;
 1637         if (unlikely(read_val(control, sinfo->fd, &u_len, read_len)))
 1638             return -1;
 1639         if (unlikely(read_val(control, sinfo->fd, &last_head, read_len)))
 1640             return -1;
 1641         header_length = 1 + (read_len * 3);
 1642     }
 1643     sinfo->total_read += header_length;
 1644 
 1645     if (ENCRYPT) {
 1646         if (unlikely(!decrypt_header(control, enc_head, &c_type, &c_len, &u_len, &last_head)))
 1647             return -1;
 1648         if (unlikely(read_buf(control, sinfo->fd, blocksalt, SALT_LEN)))
 1649             return -1;
 1650         sinfo->total_read += SALT_LEN;
 1651     }
 1652     c_len = le64toh(c_len);
 1653     u_len = le64toh(u_len);
 1654     last_head = le64toh(last_head);
 1655     print_maxverbose("Fill_buffer stream %d c_len %lld u_len %lld last_head %lld\n", streamno, c_len, u_len, last_head);
 1656 
 1657     /* It is possible for there to be an empty match block at the end of
 1658      * incompressible data */
 1659     if (unlikely(c_len == 0 && u_len == 0 && streamno == 1 && last_head == 0)) {
 1660         print_maxverbose("Skipping empty match block\n");
 1661         goto skip_empty;
 1662     }
 1663 
 1664     /* Check for invalid data and that the last_head is actually moving
 1665      * forward correctly. */
 1666     if (unlikely(c_len < 1 || u_len < 1 || last_head < 0 || (last_head && last_head <= s->last_head))) {
 1667         fatal_return(("Invalid data compressed len %lld uncompressed %lld last_head %lld\n",
 1668                  c_len, u_len, last_head), -1);
 1669     }
 1670 
 1671     padded_len = MAX(c_len, MIN_SIZE);
 1672     sinfo->total_read += padded_len;
 1673     fsync(control->fd_out);
 1674 
 1675     if (unlikely(u_len > control->maxram))
 1676         print_progress("Warning, attempting to malloc very large buffer for this environment of size %lld\n", u_len);
 1677     max_len = MAX(u_len, MIN_SIZE);
 1678     max_len = MAX(max_len, c_len);
 1679     s_buf = malloc(max_len);
 1680     if (unlikely(!s_buf))
 1681         fatal_return(("Unable to malloc buffer of size %lld in fill_buffer\n", u_len), -1);
 1682     sinfo->ram_alloced += u_len;
 1683 
 1684     if (unlikely(read_buf(control, sinfo->fd, s_buf, padded_len))) {
 1685         dealloc(s_buf);
 1686         return -1;
 1687     }
 1688 
 1689     if (unlikely(ENCRYPT && !lrz_decrypt(control, s_buf, padded_len, blocksalt))) {
 1690         dealloc(s_buf);
 1691         return -1;
 1692     }
 1693 
 1694     ucthreads[s->uthread_no].s_buf = s_buf;
 1695     ucthreads[s->uthread_no].c_len = c_len;
 1696     ucthreads[s->uthread_no].u_len = u_len;
 1697     ucthreads[s->uthread_no].c_type = c_type;
 1698     ucthreads[s->uthread_no].streamno = streamno;
 1699     s->last_head = last_head;
 1700 
 1701     /* List this thread as busy */
 1702     ucthreads[s->uthread_no].busy = 1;
 1703     print_maxverbose("Starting thread %ld to decompress %lld bytes from stream %d\n",
 1704              s->uthread_no, padded_len, streamno);
 1705 
 1706     sts = malloc(sizeof(stream_thread_struct));
 1707     if (unlikely(!sts))
 1708         fatal_return(("Unable to malloc in fill_buffer"), -1);
 1709     sts->i = s->uthread_no;
 1710     sts->control = control;
 1711     sts->sinfo = sinfo;
 1712     if (unlikely(!create_pthread(control, &threads[s->uthread_no], NULL, ucompthread, sts))) {
 1713         dealloc(sts);
 1714         return -1;
 1715     }
 1716 
 1717     if (++s->uthread_no == s->base_thread + s->total_threads)
 1718         s->uthread_no = s->base_thread;
 1719 skip_empty:
 1720     /* Reached the end of this stream, no more data to read in, otherwise
 1721      * see if the next thread is free to grab more data. We also check that
 1722      * we're not going to be allocating too much ram to generate all these
 1723      * threads. */
 1724     if (!last_head)
 1725         s->eos = 1;
 1726     else if (s->uthread_no != s->unext_thread && !ucthreads[s->uthread_no].busy &&
 1727          sinfo->ram_alloced < control->maxram)
 1728             goto fill_another;
 1729 out:
 1730     lock_mutex(control, &output_lock);
 1731     output_thread = s->unext_thread;
 1732     cond_broadcast(control, &output_cond);
 1733     unlock_mutex(control, &output_lock);
 1734 
 1735     /* join_pthread here will make it wait till the data is ready */
 1736     thr_return = NULL;
 1737     if (unlikely(!join_pthread(control, threads[s->unext_thread], &thr_return) || !!thr_return))
 1738         return -1;
 1739     ucthreads[s->unext_thread].busy = 0;
 1740 
 1741     print_maxverbose("Taking decompressed data from thread %ld\n", s->unext_thread);
 1742     s->buf = ucthreads[s->unext_thread].s_buf;
 1743     ucthreads[s->unext_thread].s_buf = NULL;
 1744     s->buflen = ucthreads[s->unext_thread].u_len;
 1745     sinfo->ram_alloced -= s->buflen;
 1746     s->bufp = 0;
 1747 
 1748     if (++s->unext_thread == s->base_thread + s->total_threads)
 1749         s->unext_thread = s->base_thread;
 1750 
 1751     return 0;
 1752 }
 1753 
 1754 /* write some data to a stream. Return -1 on failure */
 1755 void write_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len)
 1756 {
 1757     struct stream_info *sinfo = ss;
 1758 
 1759     while (len) {
 1760         i64 n;
 1761 
 1762         n = MIN(sinfo->bufsize - sinfo->s[streamno].buflen, len);
 1763 
 1764         memcpy(sinfo->s[streamno].buf + sinfo->s[streamno].buflen, p, n);
 1765         sinfo->s[streamno].buflen += n;
 1766         p += n;
 1767         len -= n;
 1768 
 1769         /* Flush the buffer every sinfo->bufsize into one thread */
 1770         if (sinfo->s[streamno].buflen == sinfo->bufsize)
 1771             flush_buffer(control, sinfo, streamno);
 1772     }
 1773 }
 1774 
 1775 /* read some data from a stream. Return number of bytes read, or -1
 1776    on failure */
 1777 i64 read_stream(rzip_control *control, void *ss, int streamno, uchar *p, i64 len)
 1778 {
 1779     struct stream_info *sinfo = ss;
 1780     struct stream *s = &sinfo->s[streamno];
 1781     i64 ret = 0;
 1782 
 1783     while (len) {
 1784         i64 n;
 1785 
 1786         n = MIN(s->buflen - s->bufp, len);
 1787 
 1788         if (n > 0) {
 1789             if (unlikely(!s->buf))
 1790                 failure_return(("Stream ran out prematurely, likely corrupt archive\n"), -1);
 1791             memcpy(p, s->buf + s->bufp, n);
 1792             s->bufp += n;
 1793             p += n;
 1794             len -= n;
 1795             ret += n;
 1796         }
 1797 
 1798         if (len && s->bufp == s->buflen) {
 1799             if (unlikely(fill_buffer(control, sinfo, s, streamno)))
 1800                 return -1;
 1801             if (s->bufp == s->buflen)
 1802                 break;
 1803         }
 1804     }
 1805 
 1806     return ret;
 1807 }
 1808 
 1809 /* flush and close down a stream. return -1 on failure */
 1810 int close_stream_out(rzip_control *control, void *ss)
 1811 {
 1812     struct stream_info *sinfo = ss;
 1813     int i;
 1814 
 1815     for (i = 0; i < sinfo->num_streams; i++)
 1816         clear_buffer(control, sinfo, i, 0);
 1817 
 1818     if (ENCRYPT) {
 1819         /* Last two compressed blocks do not have an offset written
 1820          * to them so we have to go back and encrypt them now, but we
 1821          * must wait till the threads return. */
 1822         int close_thread = output_thread;
 1823 
 1824         for (i = 0; i < control->threads; i++) {
 1825             cksem_wait(control, &cthreads[close_thread].cksem);
 1826             cksem_post(control, &cthreads[close_thread].cksem);
 1827             if (++close_thread == control->threads)
 1828                 close_thread = 0;
 1829         }
 1830         for (i = 0; i < sinfo->num_streams; i++)
 1831             rewrite_encrypted(control, sinfo, sinfo->s[i].last_headofs);
 1832     }
 1833 
 1834     /* Note that sinfo->s and sinfo are not released here but after compression
 1835      * has completed as they cannot be freed immediately because their values
 1836      * are read after the next stream has started.
 1837      */
 1838 
 1839     return 0;
 1840 }
 1841 
 1842 /* Add to an runzip list to safely deallocate memory after all threads have
 1843  * returned. */
 1844 static void add_to_rulist(rzip_control *control, struct stream_info *sinfo)
 1845 {
 1846     struct runzip_node *node = calloc(sizeof(struct runzip_node), 1);
 1847 
 1848     if (unlikely(!node))
 1849         failure("Failed to calloc struct node in add_rulist\n");
 1850     node->sinfo = sinfo;
 1851     node->pthreads = control->pthreads;
 1852     node->prev = control->rulist;
 1853     control->ruhead = node;
 1854 }
 1855 
 1856 /* close down an input stream */
 1857 int close_stream_in(rzip_control *control, void *ss)
 1858 {
 1859     struct stream_info *sinfo = ss;
 1860     int i;
 1861 
 1862     print_maxverbose("Closing stream at %lld, want to seek to %lld\n",
 1863              get_readseek(control, control->fd_in),
 1864              sinfo->initial_pos + sinfo->total_read);
 1865     if (unlikely(read_seekto(control, sinfo, sinfo->total_read)))
 1866         return -1;
 1867 
 1868     for (i = 0; i < sinfo->num_streams; i++)
 1869         dealloc(sinfo->s[i].buf);
 1870 
 1871     output_thread = 0;
 1872     /* We cannot safely release the sinfo and pthread data here till all
 1873      * threads are shut down. */
 1874     add_to_rulist(control, sinfo);
 1875 
 1876     return 0;
 1877 }
 1878 
 1879 /* As others are slow and lz4 very fast, it is worth doing a quick lz4 pass
 1880    to see if there is any compression at all with lz4 first. It is unlikely
 1881    that others will be able to compress if lz4 is unable to drop a single byte
 1882    so do not compress any block that is incompressible by lz4. */
 1883 static int lz4_compresses(rzip_control *control, uchar *s_buf, i64 s_len)
 1884 {
 1885     int dlen, test_len;
 1886     char *c_buf = NULL, *test_buf = (char *)s_buf;
 1887     int ret = 0;
 1888     int workcounter = 0;    /* count # of passes */
 1889     int best_dlen = INT_MAX; /* save best compression estimate */
 1890 
 1891     if (!LZ4_TEST)
 1892         return 1;
 1893 
 1894     dlen = MIN(s_len, STREAM_BUFSIZE);
 1895     test_len = MIN(dlen, STREAM_BUFSIZE >> 8);
 1896     c_buf = malloc(dlen);
 1897     if (unlikely(!c_buf))
 1898         fatal_return(("Unable to allocate c_buf in lz4_compresses\n"), 0);
 1899 
 1900     /* Test progressively larger blocks at a time and as soon as anything
 1901        compressible is found, jump out as a success */
 1902     do {
 1903         int lz4_ret;
 1904 
 1905         workcounter++;
 1906         lz4_ret = LZ4_compress_default((const char *)test_buf, c_buf, test_len, dlen);
 1907         if (!lz4_ret) // Bigger than dlen
 1908             lz4_ret = test_len;
 1909         if (lz4_ret < best_dlen)
 1910             best_dlen = lz4_ret;
 1911         if (lz4_ret < test_len) {
 1912             ret = 1;
 1913             break;
 1914         }
 1915         /* expand test length */
 1916         test_len <<= 1;
 1917     } while (test_len <= dlen);
 1918 
 1919     if (!ret)
 1920         print_maxverbose("lz4 testing FAILED for chunk %ld. %d Passes\n", workcounter);
 1921     else {
 1922         print_maxverbose("lz4 testing OK for chunk %ld. Compressed size = %5.2F%% of chunk, %d Passes\n",
 1923                 s_len, 100 * ((double) best_dlen / (double) test_len), workcounter);
 1924     }
 1925 
 1926     dealloc(c_buf);
 1927 
 1928     return ret;
 1929 }