"Fossies" - the Fresh Open Source Software Archive

Member "libextractor-1.11/src/main/extractor_ipc_w32.c" (30 Jan 2021, 23109 Bytes) of package /linux/privat/libextractor-1.11.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "extractor_ipc_w32.c" see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.10_vs_1.11.

    1 /*
    2      This file is part of libextractor.
    3      Copyright (C) 2002, 2003, 2004, 2005, 2006, 2009, 2012 Vidyut Samanta and Christian Grothoff
    4 
    5      libextractor is free software; you can redistribute it and/or modify
    6      it under the terms of the GNU General Public License as published
    7      by the Free Software Foundation; either version 3, or (at your
    8      option) any later version.
    9 
   10      libextractor is distributed in the hope that it will be useful, but
   11      WITHOUT ANY WARRANTY; without even the implied warranty of
   12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   13      General Public License for more details.
   14 
   15      You should have received a copy of the GNU General Public License
   16      along with libextractor; see the file COPYING.  If not, write to the
   17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
   18      Boston, MA 02110-1301, USA.
   19  */
   20 
   21 #include "platform.h"
   22 #include "plibc.h"
   23 #include "extractor.h"
   24 #include "extractor_datasource.h"
   25 #include "extractor_plugin_main.h"
   26 #include "extractor_plugins.h"
   27 #include "extractor_ipc.h"
   28 #include "extractor_logging.h"
   29 
   30 /**
   31  */
   32 struct EXTRACTOR_SharedMemory
   33 {
   34 
   35   /**
   36    * W32 handle of the shm into which data is uncompressed
   37    */
   38   HANDLE map;
   39 
   40   /**
   41    * Name of the shm
   42    */
   43   char shm_name[MAX_SHM_NAME + 1];
   44 
   45   /**
   46    * Pointer to the mapped region of the shm (covers the whole shm)
   47    */
   48   void *ptr;
   49 
   50   /**
   51    * Position within shm
   52    */
   53   int64_t pos;
   54 
   55   /**
   56    * Allocated size of the shm
   57    */
   58   int64_t shm_size;
   59 
   60   /**
   61    * Number of bytes in shm (<= shm_size)
   62    */
   63   size_t shm_buf_size;
   64 
   65   size_t shm_map_size;
   66 
   67   /**
   68    * Reference counter describing how many references share this SHM.
   69    */
   70   unsigned int rc;
   71 };
   72 
   73 
   74 /**
   75  * Definition of an IPC communication channel with
   76  * some plugin.
   77  */
   78 struct EXTRACTOR_Channel
   79 {
   80 
   81   /**
   82    * Process ID of the child process for this plugin. 0 for none.
   83    */
   84   HANDLE hProcess;
   85 
   86   /**
   87    * Pipe used to communicate information to the plugin child process.
   88    * NULL if not initialized.
   89    */
   90   HANDLE cpipe_in;
   91 
   92   /**
   93    * Handle of the shm object
   94    */
   95   HANDLE map_handle;
   96 
   97   /**
   98    * Pipe used to read information about extracted meta data from
   99    * the plugin child process.  -1 if not initialized.
  100    */
  101   HANDLE cpipe_out;
  102 
  103   /**
  104    * A structure for overlapped reads on W32.
  105    */
  106   OVERLAPPED ov_read;
  107 
  108   /**
  109    * A structure for overlapped writes on W32.
  110    */
  111   OVERLAPPED ov_write;
  112 
  113   /**
  114    * A write buffer for overlapped writes on W32
  115    */
  116   unsigned char *ov_write_buffer;
  117 
  118   /**
  119    * The plugin this channel is to communicate with.
  120    */
  121   struct EXTRACTOR_PluginList *plugin;
  122 
  123   /**
  124    * Memory segment shared with this process.
  125    */
  126   struct EXTRACTOR_SharedMemory *shm;
  127 
  128   void *old_buf;
  129 
  130   /**
  131    * Buffer for reading data from the plugin.
  132    */
  133   char *mdata;
  134 
  135   /**
  136    * Size of the 'mdata' buffer.
  137    */
  138   size_t mdata_size;
  139 
  140   /**
  141    * Number of valid bytes in the channel's buffer.
  142    */
  143   size_t size;
  144 };
  145 
  146 
  147 /**
  148  * Create a shared memory area.
  149  *
  150  * @param size size of the shared area
  151  * @return NULL on error
  152  */
  153 struct EXTRACTOR_SharedMemory *
  154 EXTRACTOR_IPC_shared_memory_create_ (size_t size)
  155 {
  156   struct EXTRACTOR_SharedMemory *shm;
  157   const char *tpath = "Local\\";
  158 
  159   if (NULL == (shm = malloc (sizeof (struct EXTRACTOR_SharedMemory))))
  160     return NULL;
  161 
  162   snprintf (shm->shm_name, MAX_SHM_NAME,
  163             "%slibextractor-shm-%u-%u",
  164             tpath, getpid (),
  165             (unsigned int) random ());
  166   shm->map = CreateFileMapping (INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0,
  167                                 size, shm->shm_name);
  168   shm->ptr = MapViewOfFile (shm->map, FILE_MAP_WRITE, 0, 0, size);
  169   if (shm->ptr == NULL)
  170   {
  171     CloseHandle (shm->map);
  172     free (shm);
  173     return NULL;
  174   }
  175   shm->shm_size = size;
  176   shm->rc = 0;
  177   return shm;
  178 }
  179 
  180 
  181 /**
  182  * Change the reference counter for this shm instance.
  183  *
  184  * @param shm instance to update
  185  * @param delta value to change RC by
  186  * @return new RC
  187  */
  188 unsigned int
  189 EXTRACTOR_IPC_shared_memory_change_rc_ (struct EXTRACTOR_SharedMemory *shm,
  190                                         int delta)
  191 {
  192   shm->rc += delta;
  193   return shm->rc;
  194 }
  195 
  196 
  197 /**
  198  * Destroy shared memory area.
  199  *
  200  * @param shm memory area to destroy
  201  * @return NULL on error
  202  */
  203 void
  204 EXTRACTOR_IPC_shared_memory_destroy_ (struct EXTRACTOR_SharedMemory *shm)
  205 {
  206   if (shm->ptr != NULL)
  207     UnmapViewOfFile (shm->ptr);
  208   if (shm->map != 0)
  209     CloseHandle (shm->map);
  210   free (shm);
  211 }
  212 
  213 
  214 /**
  215  * Initialize shared memory area from data source.
  216  *
  217  * @param shm memory area to initialize
  218  * @param ds data source to use for initialization
  219  * @param off offset to use in data source
  220  * @param size number of bytes to copy
  221  * @return -1 on error, otherwise number of bytes copied
  222  */
  223 ssize_t
  224 EXTRACTOR_IPC_shared_memory_set_ (struct EXTRACTOR_SharedMemory *shm,
  225                                   struct EXTRACTOR_Datasource *ds,
  226                                   uint64_t off,
  227                                   size_t size)
  228 {
  229   if (-1 ==
  230       EXTRACTOR_datasource_seek_ (ds, off, SEEK_SET))
  231   {
  232     LOG ("Failed to set IPC memory due to seek error\n");
  233     return -1;
  234   }
  235   if (size > shm->shm_size)
  236     size = shm->shm_size;
  237   return EXTRACTOR_datasource_read_ (ds,
  238                                      shm->ptr,
  239                                      size);
  240 }
  241 
  242 
  243 /**
  244  * Query datasource for current position
  245  *
  246  * @param ds data source to query
  247  * @return current position in the datasource or UINT_MAX on error
  248  */
  249 uint64_t
  250 EXTRACTOR_datasource_get_pos_ (struct EXTRACTOR_Datasource *ds)
  251 {
  252   int64_t pos = EXTRACTOR_datasource_seek_ (ds, 0, SEEK_CUR);
  253   if (-1 == pos)
  254     return UINT_MAX;
  255   return pos;
  256 }
  257 
  258 
  259 #ifndef PIPE_BUF
  260 #define PIPE_BUF 512
  261 #endif
  262 
  263 /* Copyright Bob Byrnes  <byrnes <at> curl.com>
  264    http://permalink.gmane.org/gmane.os.cygwin.patches/2121
  265 */
  266 /* Create a pipe, and return handles to the read and write ends,
  267    just like CreatePipe, but ensure that the write end permits
  268    FILE_READ_ATTRIBUTES access, on later versions of win32 where
  269    this is supported.  This access is needed by NtQueryInformationFile,
  270    which is used to implement select and nonblocking writes.
  271    Note that the return value is either NO_ERROR or GetLastError,
  272    unlike CreatePipe, which returns a bool for success or failure.  */
  273 static int
  274 create_selectable_pipe (PHANDLE read_pipe_ptr, PHANDLE write_pipe_ptr,
  275                         LPSECURITY_ATTRIBUTES sa_ptr, DWORD psize,
  276                         DWORD dwReadMode, DWORD dwWriteMode)
  277 {
  278   /* Default to error. */
  279   *read_pipe_ptr = *write_pipe_ptr = INVALID_HANDLE_VALUE;
  280 
  281   HANDLE read_pipe = INVALID_HANDLE_VALUE, write_pipe = INVALID_HANDLE_VALUE;
  282 
  283   /* Ensure that there is enough pipe buffer space for atomic writes.  */
  284   if (psize < PIPE_BUF)
  285     psize = PIPE_BUF;
  286 
  287   char pipename[MAX_PATH];
  288 
  289   /* Retry CreateNamedPipe as long as the pipe name is in use.
  290    * Retrying will probably never be necessary, but we want
  291    * to be as robust as possible.  */
  292   while (1)
  293   {
  294     static volatile LONG pipe_unique_id;
  295 
  296     snprintf (pipename, sizeof pipename, "\\\\.\\pipe\\gnunet-%d-%ld",
  297               getpid (), InterlockedIncrement ((LONG *) &pipe_unique_id));
  298     /* Use CreateNamedPipe instead of CreatePipe, because the latter
  299      * returns a write handle that does not permit FILE_READ_ATTRIBUTES
  300      * access, on versions of win32 earlier than WinXP SP2.
  301      * CreatePipe also stupidly creates a full duplex pipe, which is
  302      * a waste, since only a single direction is actually used.
  303      * It's important to only allow a single instance, to ensure that
  304      * the pipe was not created earlier by some other process, even if
  305      * the pid has been reused.  We avoid FILE_FLAG_FIRST_PIPE_INSTANCE
  306      * because that is only available for Win2k SP2 and WinXP.  */read_pipe = CreateNamedPipeA (pipename, PIPE_ACCESS_INBOUND | dwReadMode,
  307                                   PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, 1,                                               /* max instances */
  308                                   psize,        /* output buffer size */
  309                                   psize,        /* input buffer size */
  310                                   NMPWAIT_USE_DEFAULT_WAIT, sa_ptr);
  311 
  312     if (read_pipe != INVALID_HANDLE_VALUE)
  313     {
  314       break;
  315     }
  316 
  317     DWORD err = GetLastError ();
  318 
  319     switch (err)
  320     {
  321     case ERROR_PIPE_BUSY:
  322       /* The pipe is already open with compatible parameters.
  323        * Pick a new name and retry.  */
  324       continue;
  325     case ERROR_ACCESS_DENIED:
  326       /* The pipe is already open with incompatible parameters.
  327        * Pick a new name and retry.  */
  328       continue;
  329     case ERROR_CALL_NOT_IMPLEMENTED:
  330       /* We are on an older Win9x platform without named pipes.
  331        * Return an anonymous pipe as the best approximation.  */
  332       if (CreatePipe (read_pipe_ptr, write_pipe_ptr, sa_ptr, psize))
  333       {
  334         return 0;
  335       }
  336       err = GetLastError ();
  337       return err;
  338     default:
  339       return err;
  340     }
  341     /* NOTREACHED */
  342   }
  343 
  344   /* Open the named pipe for writing.
  345    * Be sure to permit FILE_READ_ATTRIBUTES access.  */
  346   write_pipe = CreateFileA (pipename, GENERIC_WRITE | FILE_READ_ATTRIBUTES, 0,  /* share mode */
  347                             sa_ptr, OPEN_EXISTING, dwWriteMode, /* flags and attributes */
  348                             0); /* handle to template file */
  349 
  350   if (write_pipe == INVALID_HANDLE_VALUE)
  351   {
  352     /* Failure. */
  353     DWORD err = GetLastError ();
  354 
  355     CloseHandle (read_pipe);
  356     return err;
  357   }
  358 
  359   /* Success. */
  360   *read_pipe_ptr = read_pipe;
  361   *write_pipe_ptr = write_pipe;
  362   return 0;
  363 }
  364 
  365 
  366 /**
  367  * Communicates plugin data (library name, options) to the plugin
  368  * process. This is only necessary on W32, where this information
  369  * is not inherited by the plugin, because it is not forked.
  370  *
  371  * @param plugin plugin context
  372  *
  373  * @return 0 on success, -1 on failure
  374  */
  375 static int
  376 write_plugin_data (struct EXTRACTOR_PluginList *plugin,
  377                    struct EXTRACTOR_Channel *channel)
  378 {
  379   size_t libname_len, shortname_len, opts_len;
  380   DWORD len;
  381   char *str;
  382   size_t total_len = 0;
  383   unsigned char *buf, *ptr;
  384   ssize_t write_result;
  385 
  386   libname_len = strlen (plugin->libname) + 1;
  387   total_len += sizeof (size_t) + libname_len;
  388   shortname_len = strlen (plugin->short_libname) + 1;
  389   total_len += sizeof (size_t) + shortname_len;
  390   if (plugin->plugin_options != NULL)
  391   {
  392     opts_len = strlen (plugin->plugin_options) + 1;
  393     total_len += opts_len;
  394   }
  395   else
  396   {
  397     opts_len = 0;
  398   }
  399   total_len += sizeof (size_t);
  400 
  401   buf = malloc (total_len);
  402   if (buf == NULL)
  403     return -1;
  404   ptr = buf;
  405   memcpy (ptr, &libname_len, sizeof (size_t));
  406   ptr += sizeof (size_t);
  407   memcpy (ptr, plugin->libname, libname_len);
  408   ptr += libname_len;
  409   memcpy (ptr, &shortname_len, sizeof (size_t));
  410   ptr += sizeof (size_t);
  411   memcpy (ptr, plugin->short_libname, shortname_len);
  412   ptr += shortname_len;
  413   memcpy (ptr, &opts_len, sizeof (size_t));
  414   ptr += sizeof (size_t);
  415   if (opts_len > 0)
  416   {
  417     memcpy (ptr, plugin->plugin_options, opts_len);
  418     ptr += opts_len;
  419   }
  420   write_result = EXTRACTOR_IPC_channel_send_ (channel, buf, total_len);
  421   free (buf);
  422   return total_len == write_result;
  423 }
  424 
  425 
  426 /**
  427  * Create a channel to communicate with a process wrapping
  428  * the plugin of the given name.  Starts the process as well.
  429  *
  430  * @param plugin the plugin
  431  * @param shm memory to share with the process
  432  * @return NULL on error, otherwise IPC channel
  433  */
  434 struct EXTRACTOR_Channel *
  435 EXTRACTOR_IPC_channel_create_ (struct EXTRACTOR_PluginList *plugin,
  436                                struct EXTRACTOR_SharedMemory *shm)
  437 {
  438   struct EXTRACTOR_Channel *channel;
  439   HANDLE p1[2];
  440   HANDLE p2[2];
  441   struct InitMessage *init;
  442   size_t slen;
  443 
  444   STARTUPINFOA startup;
  445   PROCESS_INFORMATION proc;
  446   char cmd[MAX_PATH + 1];
  447   char arg1[10], arg2[10];
  448   HANDLE p10_os_inh = INVALID_HANDLE_VALUE;
  449   HANDLE p21_os_inh = INVALID_HANDLE_VALUE;
  450   SECURITY_ATTRIBUTES sa;
  451 
  452   if (NULL == (channel = malloc (sizeof (struct EXTRACTOR_Channel))))
  453   {
  454     LOG_STRERROR ("malloc");
  455     return NULL;
  456   }
  457   memset (channel, 0, sizeof (struct EXTRACTOR_Channel));
  458   channel->mdata_size = 1024;
  459   if (NULL == (channel->mdata = malloc (channel->mdata_size)))
  460   {
  461     LOG_STRERROR ("malloc");
  462     free (channel);
  463     return NULL;
  464   }
  465   channel->shm = shm;
  466   channel->plugin = plugin;
  467   channel->size = 0;
  468 
  469   sa.nLength = sizeof (sa);
  470   sa.lpSecurityDescriptor = NULL;
  471   sa.bInheritHandle = FALSE;
  472 
  473   if (0 != create_selectable_pipe (&p1[0], &p1[1], &sa, 1024,
  474                                    FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED))
  475   {
  476     LOG_STRERROR ("pipe");
  477     free (channel);
  478     return NULL;
  479   }
  480   if (0 != create_selectable_pipe (&p2[0], &p2[1], &sa, 1024,
  481                                    FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED))
  482   {
  483     LOG_STRERROR ("pipe");
  484     CloseHandle (p1[0]);
  485     CloseHandle (p1[1]);
  486     free (channel);
  487     return NULL;
  488   }
  489 
  490   if (! DuplicateHandle (GetCurrentProcess (), p1[0], GetCurrentProcess (),
  491                          &p10_os_inh, 0, TRUE, DUPLICATE_SAME_ACCESS)
  492       || ! DuplicateHandle (GetCurrentProcess (), p2[1], GetCurrentProcess (),
  493                             &p21_os_inh, 0, TRUE, DUPLICATE_SAME_ACCESS))
  494   {
  495     LOG_STRERROR ("DuplicateHandle");
  496     if (p10_os_inh != INVALID_HANDLE_VALUE)
  497       CloseHandle (p10_os_inh);
  498     if (p21_os_inh != INVALID_HANDLE_VALUE)
  499       CloseHandle (p21_os_inh);
  500     CloseHandle (p1[0]);
  501     CloseHandle (p1[1]);
  502     CloseHandle (p2[0]);
  503     CloseHandle (p2[1]);
  504     CloseHandle (p1[0]);
  505     CloseHandle (p1[1]);
  506     free (channel);
  507     return NULL;
  508   }
  509 
  510   memset (&proc, 0, sizeof (PROCESS_INFORMATION));
  511   memset (&startup, 0, sizeof (STARTUPINFOA));
  512 
  513   /* TODO: write our own plugin-hosting executable? rundll32, for once, has smaller than usual stack size.
  514    * Also, users might freak out seeing over 9000 rundll32 processes (seeing over 9000 processes named
  515    * "libextractor_plugin_helper" is probably less confusing).
  516    */
  517   snprintf (cmd, MAX_PATH,
  518             "rundll32.exe libextractor-3.dll,RundllEntryPoint@16 %lu %lu",
  519             p10_os_inh, p21_os_inh);
  520   cmd[MAX_PATH] = '\0';
  521   startup.cb = sizeof (STARTUPINFOA);
  522   if (CreateProcessA (NULL, cmd, NULL, NULL, TRUE, CREATE_SUSPENDED, NULL, NULL,
  523                       &startup, &proc))
  524   {
  525     channel->hProcess = proc.hProcess;
  526     ResumeThread (proc.hThread);
  527     CloseHandle (proc.hThread);
  528   }
  529   else
  530   {
  531     LOG_STRERROR ("CreateProcess");
  532     CloseHandle (p1[0]);
  533     CloseHandle (p1[1]);
  534     CloseHandle (p2[0]);
  535     CloseHandle (p2[1]);
  536     free (channel);
  537     return NULL;
  538   }
  539   CloseHandle (p1[0]);
  540   CloseHandle (p2[1]);
  541   CloseHandle (p10_os_inh);
  542   CloseHandle (p21_os_inh);
  543 
  544   channel->cpipe_in = p1[1];
  545   channel->cpipe_out = p2[0];
  546 
  547   memset (&channel->ov_read, 0, sizeof (OVERLAPPED));
  548   memset (&channel->ov_write, 0, sizeof (OVERLAPPED));
  549 
  550   channel->ov_write_buffer = NULL;
  551 
  552   channel->ov_write.hEvent = CreateEvent (NULL, TRUE, TRUE, NULL);
  553   channel->ov_read.hEvent = CreateEvent (NULL, TRUE, TRUE, NULL);
  554 
  555   if (! write_plugin_data (plugin, channel))
  556   {
  557     LOG_STRERROR ("write_plugin_data");
  558     EXTRACTOR_IPC_channel_destroy_ (channel);
  559     return NULL;
  560   }
  561 
  562   slen = strlen (shm->shm_name) + 1;
  563   if (NULL == (init = malloc (sizeof (struct InitMessage) + slen)))
  564   {
  565     LOG_STRERROR ("malloc");
  566     EXTRACTOR_IPC_channel_destroy_ (channel);
  567     return NULL;
  568   }
  569   init->opcode = MESSAGE_INIT_STATE;
  570   init->reserved = 0;
  571   init->reserved2 = 0;
  572   init->shm_name_length = slen;
  573   init->shm_map_size = shm->shm_size;
  574   memcpy (&init[1], shm->shm_name, slen);
  575   if (sizeof (struct InitMessage) + slen !=
  576       EXTRACTOR_IPC_channel_send_ (channel, init,
  577                                    sizeof (struct InitMessage) + slen))
  578   {
  579     LOG ("Failed to send INIT_STATE message to plugin\n");
  580     EXTRACTOR_IPC_channel_destroy_ (channel);
  581     free (init);
  582     return NULL;
  583   }
  584   free (init);
  585   return channel;
  586 }
  587 
  588 
  589 /**
  590  * Destroy communication channel with a plugin/process.  Also
  591  * destroys the process.
  592  *
  593  * @param channel channel to communicate with the plugin
  594  */
  595 void
  596 EXTRACTOR_IPC_channel_destroy_ (struct EXTRACTOR_Channel *channel)
  597 {
  598   int status;
  599 
  600   CloseHandle (channel->cpipe_out);
  601   CloseHandle (channel->cpipe_in);
  602   CloseHandle (channel->ov_read.hEvent);
  603   CloseHandle (channel->ov_write.hEvent);
  604   if (channel->ov_write_buffer != NULL)
  605   {
  606     free (channel->ov_write_buffer);
  607     channel->ov_write_buffer = NULL;
  608   }
  609   if (NULL != channel->plugin)
  610     channel->plugin->channel = NULL;
  611   free (channel->mdata);
  612   WaitForSingleObject (channel->hProcess, 1000);
  613   TerminateProcess (channel->hProcess, 0);
  614   CloseHandle (channel->hProcess);
  615   free (channel);
  616 }
  617 
  618 
  619 /**
  620  * Send data via the given IPC channel (blocking).
  621  *
  622  * @param channel channel to communicate with the plugin
  623  * @param buf data to send
  624  * @param size number of bytes in buf to send
  625  * @return -1 on error, number of bytes sent on success
  626  *           (never does partial writes)
  627  */
  628 ssize_t
  629 EXTRACTOR_IPC_channel_send_ (struct EXTRACTOR_Channel *channel,
  630                              const void *data,
  631                              size_t size)
  632 {
  633   DWORD written;
  634   DWORD err;
  635   BOOL bresult;
  636   const char *cdata = data;
  637 
  638   if (WAIT_OBJECT_0 != WaitForSingleObject (channel->ov_write.hEvent, INFINITE))
  639     return -1;
  640 
  641   ResetEvent (channel->ov_write.hEvent);
  642 
  643   if (channel->old_buf != NULL)
  644     free (channel->old_buf);
  645 
  646   channel->old_buf = malloc (size);
  647   if (channel->old_buf == NULL)
  648     return -1;
  649 
  650   memcpy (channel->old_buf, data, size);
  651   written = 0;
  652   channel->ov_write.Offset = 0;
  653   channel->ov_write.OffsetHigh = 0;
  654   channel->ov_write.Pointer = 0;
  655   channel->ov_write.Internal = 0;
  656   channel->ov_write.InternalHigh = 0;
  657   bresult = WriteFile (channel->cpipe_in, channel->old_buf, size, &written,
  658                        &channel->ov_write);
  659 
  660   if (bresult == TRUE)
  661   {
  662     SetEvent (channel->ov_write.hEvent);
  663     free (channel->old_buf);
  664     channel->old_buf = NULL;
  665     return written;
  666   }
  667 
  668   err = GetLastError ();
  669   if (err == ERROR_IO_PENDING)
  670     return size;
  671   SetEvent (channel->ov_write.hEvent);
  672   free (channel->old_buf);
  673   channel->old_buf = NULL;
  674   SetLastError (err);
  675   return -1;
  676 }
  677 
  678 
  679 /**
  680  * Receive data from any of the given IPC channels (blocking).
  681  * Wait for one of the plugins to reply.
  682  * Selects on plugin output pipes, runs 'receive_reply'
  683  * on each activated pipe until it gets a seek request
  684  * or a done message. Called repeatedly by the user until all pipes are dry or
  685  * broken.
  686  *
  687  * @param channels array of channels, channels that break may be set to NULL
  688  * @param num_channels length of the 'channels' array
  689  * @param proc function to call to process messages (may be called
  690  *             more than once)
  691  * @param proc_cls closure for 'proc'
  692  * @return -1 on error, 1 on success
  693  */
  694 int
  695 EXTRACTOR_IPC_channel_recv_ (struct EXTRACTOR_Channel **channels,
  696                              unsigned int num_channels,
  697                              EXTRACTOR_ChannelMessageProcessor proc,
  698                              void *proc_cls)
  699 {
  700   DWORD ms;
  701   DWORD first_ready;
  702   DWORD dwresult;
  703   DWORD bytes_read;
  704   BOOL bresult;
  705   unsigned int i;
  706   unsigned int c;
  707   char *ndata;
  708   HANDLE events[MAXIMUM_WAIT_OBJECTS];
  709   int closed_channel;
  710 
  711   c = 0;
  712   for (i = 0; i < num_channels; i++)
  713   {
  714     if (NULL == channels[i])
  715       continue;
  716     if (MAXIMUM_WAIT_OBJECTS == c)
  717       return -1;
  718     if (WaitForSingleObject (channels[i]->ov_read.hEvent, 0) == WAIT_OBJECT_0)
  719     {
  720       ResetEvent (channels[i]->ov_read.hEvent);
  721       bresult = ReadFile (channels[i]->cpipe_out, &i, 0, &bytes_read,
  722                           &channels[i]->ov_read);
  723       if (bresult == TRUE)
  724       {
  725         SetEvent (channels[i]->ov_read.hEvent);
  726       }
  727       else
  728       {
  729         DWORD err = GetLastError ();
  730         if (err != ERROR_IO_PENDING)
  731           SetEvent (channels[i]->ov_read.hEvent);
  732       }
  733     }
  734     events[c] = channels[i]->ov_read.hEvent;
  735     c++;
  736   }
  737 
  738   if (c == 0)
  739     return 1; /* nothing left to do! */
  740 
  741   ms = 500;
  742   first_ready = WaitForMultipleObjects (c, events, FALSE, ms);
  743   if ((first_ready == WAIT_TIMEOUT) || (first_ready == WAIT_FAILED))
  744   {
  745     /* an error or timeout -> something's wrong or all plugins hung up */
  746     closed_channel = 0;
  747     for (i = 0; i < num_channels; i++)
  748     {
  749       struct EXTRACTOR_Channel *channel = channels[i];
  750       if (NULL == channel)
  751         continue;
  752       if (-1 == channel->plugin->seek_request)
  753       {
  754         /* plugin blocked for too long, kill the channel */
  755         LOG ("Channel blocked, closing channel to %s\n",
  756              channel->plugin->libname);
  757         channel->plugin->channel = NULL;
  758         channel->plugin->round_finished = 1;
  759         EXTRACTOR_IPC_channel_destroy_ (channel);
  760         channels[i] = NULL;
  761         closed_channel = 1;
  762       }
  763     }
  764     if (1 == closed_channel)
  765       return 1;
  766     LOG_STRERROR ("WaitForMultipleObjects");
  767     return -1;
  768   }
  769 
  770   i = 0;
  771   for (i = 0; i < num_channels; i++)
  772   {
  773     if (NULL == channels[i])
  774       continue;
  775     dwresult = WaitForSingleObject (channels[i]->ov_read.hEvent, 0);
  776     if (dwresult == WAIT_OBJECT_0)
  777     {
  778       int ret;
  779       if (channels[i]->mdata_size == channels[i]->size)
  780       {
  781         /* not enough space, need to grow allocation (if allowed) */
  782         if (MAX_META_DATA == channels[i]->mdata_size)
  783         {
  784           LOG ("Inbound message from channel too large, aborting\n");
  785           EXTRACTOR_IPC_channel_destroy_ (channels[i]);
  786           channels[i] = NULL;
  787         }
  788         channels[i]->mdata_size *= 2;
  789         if (channels[i]->mdata_size > MAX_META_DATA)
  790           channels[i]->mdata_size = MAX_META_DATA;
  791         if (NULL == (ndata = realloc (channels[i]->mdata,
  792                                       channels[i]->mdata_size)))
  793         {
  794           LOG_STRERROR ("realloc");
  795           EXTRACTOR_IPC_channel_destroy_ (channels[i]);
  796           channels[i] = NULL;
  797         }
  798         channels[i]->mdata = ndata;
  799       }
  800       bresult = ReadFile (channels[i]->cpipe_out,
  801                           &channels[i]->mdata[channels[i]->size],
  802                           channels[i]->mdata_size - channels[i]->size,
  803                           &bytes_read, NULL);
  804       if (bresult)
  805         ret = EXTRACTOR_IPC_process_reply_ (channels[i]->plugin,
  806                                             channels[i]->mdata,
  807                                             channels[i]->size + bytes_read,
  808                                             proc, proc_cls);
  809       if (! bresult || (-1 == ret))
  810       {
  811         DWORD error = GetLastError ();
  812         SetErrnoFromWinError (error);
  813         if (! bresult)
  814           LOG_STRERROR ("ReadFile");
  815         EXTRACTOR_IPC_channel_destroy_ (channels[i]);
  816         channels[i] = NULL;
  817       }
  818       else
  819       {
  820         memmove (channels[i]->mdata, &channels[i]->mdata[ret],
  821                  channels[i]->size + bytes_read - ret);
  822         channels[i]->size = channels[i]->size + bytes_read - ret;
  823       }
  824     }
  825   }
  826   return 1;
  827 }