"Fossies" - the Fresh Open Source Software Archive 
Member "libextractor-1.11/src/main/extractor_ipc_w32.c" (30 Jan 2021, 23109 Bytes) of package /linux/privat/libextractor-1.11.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) C and C++ source code syntax highlighting (style:
standard) with prefixed line numbers and
code folding option.
Alternatively you can here
view or
download the uninterpreted source code file.
For more information about "extractor_ipc_w32.c" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
1.10_vs_1.11.
1 /*
2 This file is part of libextractor.
3 Copyright (C) 2002, 2003, 2004, 2005, 2006, 2009, 2012 Vidyut Samanta and Christian Grothoff
4
5 libextractor is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 libextractor is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with libextractor; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
19 */
20
21 #include "platform.h"
22 #include "plibc.h"
23 #include "extractor.h"
24 #include "extractor_datasource.h"
25 #include "extractor_plugin_main.h"
26 #include "extractor_plugins.h"
27 #include "extractor_ipc.h"
28 #include "extractor_logging.h"
29
30 /**
31 */
32 struct EXTRACTOR_SharedMemory
33 {
34
35 /**
36 * W32 handle of the shm into which data is uncompressed
37 */
38 HANDLE map;
39
40 /**
41 * Name of the shm
42 */
43 char shm_name[MAX_SHM_NAME + 1];
44
45 /**
46 * Pointer to the mapped region of the shm (covers the whole shm)
47 */
48 void *ptr;
49
50 /**
51 * Position within shm
52 */
53 int64_t pos;
54
55 /**
56 * Allocated size of the shm
57 */
58 int64_t shm_size;
59
60 /**
61 * Number of bytes in shm (<= shm_size)
62 */
63 size_t shm_buf_size;
64
65 size_t shm_map_size;
66
67 /**
68 * Reference counter describing how many references share this SHM.
69 */
70 unsigned int rc;
71 };
72
73
74 /**
75 * Definition of an IPC communication channel with
76 * some plugin.
77 */
78 struct EXTRACTOR_Channel
79 {
80
81 /**
82 * Process ID of the child process for this plugin. 0 for none.
83 */
84 HANDLE hProcess;
85
86 /**
87 * Pipe used to communicate information to the plugin child process.
88 * NULL if not initialized.
89 */
90 HANDLE cpipe_in;
91
92 /**
93 * Handle of the shm object
94 */
95 HANDLE map_handle;
96
97 /**
98 * Pipe used to read information about extracted meta data from
99 * the plugin child process. -1 if not initialized.
100 */
101 HANDLE cpipe_out;
102
103 /**
104 * A structure for overlapped reads on W32.
105 */
106 OVERLAPPED ov_read;
107
108 /**
109 * A structure for overlapped writes on W32.
110 */
111 OVERLAPPED ov_write;
112
113 /**
114 * A write buffer for overlapped writes on W32
115 */
116 unsigned char *ov_write_buffer;
117
118 /**
119 * The plugin this channel is to communicate with.
120 */
121 struct EXTRACTOR_PluginList *plugin;
122
123 /**
124 * Memory segment shared with this process.
125 */
126 struct EXTRACTOR_SharedMemory *shm;
127
128 void *old_buf;
129
130 /**
131 * Buffer for reading data from the plugin.
132 */
133 char *mdata;
134
135 /**
136 * Size of the 'mdata' buffer.
137 */
138 size_t mdata_size;
139
140 /**
141 * Number of valid bytes in the channel's buffer.
142 */
143 size_t size;
144 };
145
146
147 /**
148 * Create a shared memory area.
149 *
150 * @param size size of the shared area
151 * @return NULL on error
152 */
153 struct EXTRACTOR_SharedMemory *
154 EXTRACTOR_IPC_shared_memory_create_ (size_t size)
155 {
156 struct EXTRACTOR_SharedMemory *shm;
157 const char *tpath = "Local\\";
158
159 if (NULL == (shm = malloc (sizeof (struct EXTRACTOR_SharedMemory))))
160 return NULL;
161
162 snprintf (shm->shm_name, MAX_SHM_NAME,
163 "%slibextractor-shm-%u-%u",
164 tpath, getpid (),
165 (unsigned int) random ());
166 shm->map = CreateFileMapping (INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE, 0,
167 size, shm->shm_name);
168 shm->ptr = MapViewOfFile (shm->map, FILE_MAP_WRITE, 0, 0, size);
169 if (shm->ptr == NULL)
170 {
171 CloseHandle (shm->map);
172 free (shm);
173 return NULL;
174 }
175 shm->shm_size = size;
176 shm->rc = 0;
177 return shm;
178 }
179
180
181 /**
182 * Change the reference counter for this shm instance.
183 *
184 * @param shm instance to update
185 * @param delta value to change RC by
186 * @return new RC
187 */
188 unsigned int
189 EXTRACTOR_IPC_shared_memory_change_rc_ (struct EXTRACTOR_SharedMemory *shm,
190 int delta)
191 {
192 shm->rc += delta;
193 return shm->rc;
194 }
195
196
197 /**
198 * Destroy shared memory area.
199 *
200 * @param shm memory area to destroy
201 * @return NULL on error
202 */
203 void
204 EXTRACTOR_IPC_shared_memory_destroy_ (struct EXTRACTOR_SharedMemory *shm)
205 {
206 if (shm->ptr != NULL)
207 UnmapViewOfFile (shm->ptr);
208 if (shm->map != 0)
209 CloseHandle (shm->map);
210 free (shm);
211 }
212
213
214 /**
215 * Initialize shared memory area from data source.
216 *
217 * @param shm memory area to initialize
218 * @param ds data source to use for initialization
219 * @param off offset to use in data source
220 * @param size number of bytes to copy
221 * @return -1 on error, otherwise number of bytes copied
222 */
223 ssize_t
224 EXTRACTOR_IPC_shared_memory_set_ (struct EXTRACTOR_SharedMemory *shm,
225 struct EXTRACTOR_Datasource *ds,
226 uint64_t off,
227 size_t size)
228 {
229 if (-1 ==
230 EXTRACTOR_datasource_seek_ (ds, off, SEEK_SET))
231 {
232 LOG ("Failed to set IPC memory due to seek error\n");
233 return -1;
234 }
235 if (size > shm->shm_size)
236 size = shm->shm_size;
237 return EXTRACTOR_datasource_read_ (ds,
238 shm->ptr,
239 size);
240 }
241
242
243 /**
244 * Query datasource for current position
245 *
246 * @param ds data source to query
247 * @return current position in the datasource or UINT_MAX on error
248 */
249 uint64_t
250 EXTRACTOR_datasource_get_pos_ (struct EXTRACTOR_Datasource *ds)
251 {
252 int64_t pos = EXTRACTOR_datasource_seek_ (ds, 0, SEEK_CUR);
253 if (-1 == pos)
254 return UINT_MAX;
255 return pos;
256 }
257
258
259 #ifndef PIPE_BUF
260 #define PIPE_BUF 512
261 #endif
262
263 /* Copyright Bob Byrnes <byrnes <at> curl.com>
264 http://permalink.gmane.org/gmane.os.cygwin.patches/2121
265 */
266 /* Create a pipe, and return handles to the read and write ends,
267 just like CreatePipe, but ensure that the write end permits
268 FILE_READ_ATTRIBUTES access, on later versions of win32 where
269 this is supported. This access is needed by NtQueryInformationFile,
270 which is used to implement select and nonblocking writes.
271 Note that the return value is either NO_ERROR or GetLastError,
272 unlike CreatePipe, which returns a bool for success or failure. */
273 static int
274 create_selectable_pipe (PHANDLE read_pipe_ptr, PHANDLE write_pipe_ptr,
275 LPSECURITY_ATTRIBUTES sa_ptr, DWORD psize,
276 DWORD dwReadMode, DWORD dwWriteMode)
277 {
278 /* Default to error. */
279 *read_pipe_ptr = *write_pipe_ptr = INVALID_HANDLE_VALUE;
280
281 HANDLE read_pipe = INVALID_HANDLE_VALUE, write_pipe = INVALID_HANDLE_VALUE;
282
283 /* Ensure that there is enough pipe buffer space for atomic writes. */
284 if (psize < PIPE_BUF)
285 psize = PIPE_BUF;
286
287 char pipename[MAX_PATH];
288
289 /* Retry CreateNamedPipe as long as the pipe name is in use.
290 * Retrying will probably never be necessary, but we want
291 * to be as robust as possible. */
292 while (1)
293 {
294 static volatile LONG pipe_unique_id;
295
296 snprintf (pipename, sizeof pipename, "\\\\.\\pipe\\gnunet-%d-%ld",
297 getpid (), InterlockedIncrement ((LONG *) &pipe_unique_id));
298 /* Use CreateNamedPipe instead of CreatePipe, because the latter
299 * returns a write handle that does not permit FILE_READ_ATTRIBUTES
300 * access, on versions of win32 earlier than WinXP SP2.
301 * CreatePipe also stupidly creates a full duplex pipe, which is
302 * a waste, since only a single direction is actually used.
303 * It's important to only allow a single instance, to ensure that
304 * the pipe was not created earlier by some other process, even if
305 * the pid has been reused. We avoid FILE_FLAG_FIRST_PIPE_INSTANCE
306 * because that is only available for Win2k SP2 and WinXP. */read_pipe = CreateNamedPipeA (pipename, PIPE_ACCESS_INBOUND | dwReadMode,
307 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, 1, /* max instances */
308 psize, /* output buffer size */
309 psize, /* input buffer size */
310 NMPWAIT_USE_DEFAULT_WAIT, sa_ptr);
311
312 if (read_pipe != INVALID_HANDLE_VALUE)
313 {
314 break;
315 }
316
317 DWORD err = GetLastError ();
318
319 switch (err)
320 {
321 case ERROR_PIPE_BUSY:
322 /* The pipe is already open with compatible parameters.
323 * Pick a new name and retry. */
324 continue;
325 case ERROR_ACCESS_DENIED:
326 /* The pipe is already open with incompatible parameters.
327 * Pick a new name and retry. */
328 continue;
329 case ERROR_CALL_NOT_IMPLEMENTED:
330 /* We are on an older Win9x platform without named pipes.
331 * Return an anonymous pipe as the best approximation. */
332 if (CreatePipe (read_pipe_ptr, write_pipe_ptr, sa_ptr, psize))
333 {
334 return 0;
335 }
336 err = GetLastError ();
337 return err;
338 default:
339 return err;
340 }
341 /* NOTREACHED */
342 }
343
344 /* Open the named pipe for writing.
345 * Be sure to permit FILE_READ_ATTRIBUTES access. */
346 write_pipe = CreateFileA (pipename, GENERIC_WRITE | FILE_READ_ATTRIBUTES, 0, /* share mode */
347 sa_ptr, OPEN_EXISTING, dwWriteMode, /* flags and attributes */
348 0); /* handle to template file */
349
350 if (write_pipe == INVALID_HANDLE_VALUE)
351 {
352 /* Failure. */
353 DWORD err = GetLastError ();
354
355 CloseHandle (read_pipe);
356 return err;
357 }
358
359 /* Success. */
360 *read_pipe_ptr = read_pipe;
361 *write_pipe_ptr = write_pipe;
362 return 0;
363 }
364
365
366 /**
367 * Communicates plugin data (library name, options) to the plugin
368 * process. This is only necessary on W32, where this information
369 * is not inherited by the plugin, because it is not forked.
370 *
371 * @param plugin plugin context
372 *
373 * @return 0 on success, -1 on failure
374 */
375 static int
376 write_plugin_data (struct EXTRACTOR_PluginList *plugin,
377 struct EXTRACTOR_Channel *channel)
378 {
379 size_t libname_len, shortname_len, opts_len;
380 DWORD len;
381 char *str;
382 size_t total_len = 0;
383 unsigned char *buf, *ptr;
384 ssize_t write_result;
385
386 libname_len = strlen (plugin->libname) + 1;
387 total_len += sizeof (size_t) + libname_len;
388 shortname_len = strlen (plugin->short_libname) + 1;
389 total_len += sizeof (size_t) + shortname_len;
390 if (plugin->plugin_options != NULL)
391 {
392 opts_len = strlen (plugin->plugin_options) + 1;
393 total_len += opts_len;
394 }
395 else
396 {
397 opts_len = 0;
398 }
399 total_len += sizeof (size_t);
400
401 buf = malloc (total_len);
402 if (buf == NULL)
403 return -1;
404 ptr = buf;
405 memcpy (ptr, &libname_len, sizeof (size_t));
406 ptr += sizeof (size_t);
407 memcpy (ptr, plugin->libname, libname_len);
408 ptr += libname_len;
409 memcpy (ptr, &shortname_len, sizeof (size_t));
410 ptr += sizeof (size_t);
411 memcpy (ptr, plugin->short_libname, shortname_len);
412 ptr += shortname_len;
413 memcpy (ptr, &opts_len, sizeof (size_t));
414 ptr += sizeof (size_t);
415 if (opts_len > 0)
416 {
417 memcpy (ptr, plugin->plugin_options, opts_len);
418 ptr += opts_len;
419 }
420 write_result = EXTRACTOR_IPC_channel_send_ (channel, buf, total_len);
421 free (buf);
422 return total_len == write_result;
423 }
424
425
426 /**
427 * Create a channel to communicate with a process wrapping
428 * the plugin of the given name. Starts the process as well.
429 *
430 * @param plugin the plugin
431 * @param shm memory to share with the process
432 * @return NULL on error, otherwise IPC channel
433 */
434 struct EXTRACTOR_Channel *
435 EXTRACTOR_IPC_channel_create_ (struct EXTRACTOR_PluginList *plugin,
436 struct EXTRACTOR_SharedMemory *shm)
437 {
438 struct EXTRACTOR_Channel *channel;
439 HANDLE p1[2];
440 HANDLE p2[2];
441 struct InitMessage *init;
442 size_t slen;
443
444 STARTUPINFOA startup;
445 PROCESS_INFORMATION proc;
446 char cmd[MAX_PATH + 1];
447 char arg1[10], arg2[10];
448 HANDLE p10_os_inh = INVALID_HANDLE_VALUE;
449 HANDLE p21_os_inh = INVALID_HANDLE_VALUE;
450 SECURITY_ATTRIBUTES sa;
451
452 if (NULL == (channel = malloc (sizeof (struct EXTRACTOR_Channel))))
453 {
454 LOG_STRERROR ("malloc");
455 return NULL;
456 }
457 memset (channel, 0, sizeof (struct EXTRACTOR_Channel));
458 channel->mdata_size = 1024;
459 if (NULL == (channel->mdata = malloc (channel->mdata_size)))
460 {
461 LOG_STRERROR ("malloc");
462 free (channel);
463 return NULL;
464 }
465 channel->shm = shm;
466 channel->plugin = plugin;
467 channel->size = 0;
468
469 sa.nLength = sizeof (sa);
470 sa.lpSecurityDescriptor = NULL;
471 sa.bInheritHandle = FALSE;
472
473 if (0 != create_selectable_pipe (&p1[0], &p1[1], &sa, 1024,
474 FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED))
475 {
476 LOG_STRERROR ("pipe");
477 free (channel);
478 return NULL;
479 }
480 if (0 != create_selectable_pipe (&p2[0], &p2[1], &sa, 1024,
481 FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED))
482 {
483 LOG_STRERROR ("pipe");
484 CloseHandle (p1[0]);
485 CloseHandle (p1[1]);
486 free (channel);
487 return NULL;
488 }
489
490 if (! DuplicateHandle (GetCurrentProcess (), p1[0], GetCurrentProcess (),
491 &p10_os_inh, 0, TRUE, DUPLICATE_SAME_ACCESS)
492 || ! DuplicateHandle (GetCurrentProcess (), p2[1], GetCurrentProcess (),
493 &p21_os_inh, 0, TRUE, DUPLICATE_SAME_ACCESS))
494 {
495 LOG_STRERROR ("DuplicateHandle");
496 if (p10_os_inh != INVALID_HANDLE_VALUE)
497 CloseHandle (p10_os_inh);
498 if (p21_os_inh != INVALID_HANDLE_VALUE)
499 CloseHandle (p21_os_inh);
500 CloseHandle (p1[0]);
501 CloseHandle (p1[1]);
502 CloseHandle (p2[0]);
503 CloseHandle (p2[1]);
504 CloseHandle (p1[0]);
505 CloseHandle (p1[1]);
506 free (channel);
507 return NULL;
508 }
509
510 memset (&proc, 0, sizeof (PROCESS_INFORMATION));
511 memset (&startup, 0, sizeof (STARTUPINFOA));
512
513 /* TODO: write our own plugin-hosting executable? rundll32, for once, has smaller than usual stack size.
514 * Also, users might freak out seeing over 9000 rundll32 processes (seeing over 9000 processes named
515 * "libextractor_plugin_helper" is probably less confusing).
516 */
517 snprintf (cmd, MAX_PATH,
518 "rundll32.exe libextractor-3.dll,RundllEntryPoint@16 %lu %lu",
519 p10_os_inh, p21_os_inh);
520 cmd[MAX_PATH] = '\0';
521 startup.cb = sizeof (STARTUPINFOA);
522 if (CreateProcessA (NULL, cmd, NULL, NULL, TRUE, CREATE_SUSPENDED, NULL, NULL,
523 &startup, &proc))
524 {
525 channel->hProcess = proc.hProcess;
526 ResumeThread (proc.hThread);
527 CloseHandle (proc.hThread);
528 }
529 else
530 {
531 LOG_STRERROR ("CreateProcess");
532 CloseHandle (p1[0]);
533 CloseHandle (p1[1]);
534 CloseHandle (p2[0]);
535 CloseHandle (p2[1]);
536 free (channel);
537 return NULL;
538 }
539 CloseHandle (p1[0]);
540 CloseHandle (p2[1]);
541 CloseHandle (p10_os_inh);
542 CloseHandle (p21_os_inh);
543
544 channel->cpipe_in = p1[1];
545 channel->cpipe_out = p2[0];
546
547 memset (&channel->ov_read, 0, sizeof (OVERLAPPED));
548 memset (&channel->ov_write, 0, sizeof (OVERLAPPED));
549
550 channel->ov_write_buffer = NULL;
551
552 channel->ov_write.hEvent = CreateEvent (NULL, TRUE, TRUE, NULL);
553 channel->ov_read.hEvent = CreateEvent (NULL, TRUE, TRUE, NULL);
554
555 if (! write_plugin_data (plugin, channel))
556 {
557 LOG_STRERROR ("write_plugin_data");
558 EXTRACTOR_IPC_channel_destroy_ (channel);
559 return NULL;
560 }
561
562 slen = strlen (shm->shm_name) + 1;
563 if (NULL == (init = malloc (sizeof (struct InitMessage) + slen)))
564 {
565 LOG_STRERROR ("malloc");
566 EXTRACTOR_IPC_channel_destroy_ (channel);
567 return NULL;
568 }
569 init->opcode = MESSAGE_INIT_STATE;
570 init->reserved = 0;
571 init->reserved2 = 0;
572 init->shm_name_length = slen;
573 init->shm_map_size = shm->shm_size;
574 memcpy (&init[1], shm->shm_name, slen);
575 if (sizeof (struct InitMessage) + slen !=
576 EXTRACTOR_IPC_channel_send_ (channel, init,
577 sizeof (struct InitMessage) + slen))
578 {
579 LOG ("Failed to send INIT_STATE message to plugin\n");
580 EXTRACTOR_IPC_channel_destroy_ (channel);
581 free (init);
582 return NULL;
583 }
584 free (init);
585 return channel;
586 }
587
588
589 /**
590 * Destroy communication channel with a plugin/process. Also
591 * destroys the process.
592 *
593 * @param channel channel to communicate with the plugin
594 */
595 void
596 EXTRACTOR_IPC_channel_destroy_ (struct EXTRACTOR_Channel *channel)
597 {
598 int status;
599
600 CloseHandle (channel->cpipe_out);
601 CloseHandle (channel->cpipe_in);
602 CloseHandle (channel->ov_read.hEvent);
603 CloseHandle (channel->ov_write.hEvent);
604 if (channel->ov_write_buffer != NULL)
605 {
606 free (channel->ov_write_buffer);
607 channel->ov_write_buffer = NULL;
608 }
609 if (NULL != channel->plugin)
610 channel->plugin->channel = NULL;
611 free (channel->mdata);
612 WaitForSingleObject (channel->hProcess, 1000);
613 TerminateProcess (channel->hProcess, 0);
614 CloseHandle (channel->hProcess);
615 free (channel);
616 }
617
618
619 /**
620 * Send data via the given IPC channel (blocking).
621 *
622 * @param channel channel to communicate with the plugin
623 * @param buf data to send
624 * @param size number of bytes in buf to send
625 * @return -1 on error, number of bytes sent on success
626 * (never does partial writes)
627 */
628 ssize_t
629 EXTRACTOR_IPC_channel_send_ (struct EXTRACTOR_Channel *channel,
630 const void *data,
631 size_t size)
632 {
633 DWORD written;
634 DWORD err;
635 BOOL bresult;
636 const char *cdata = data;
637
638 if (WAIT_OBJECT_0 != WaitForSingleObject (channel->ov_write.hEvent, INFINITE))
639 return -1;
640
641 ResetEvent (channel->ov_write.hEvent);
642
643 if (channel->old_buf != NULL)
644 free (channel->old_buf);
645
646 channel->old_buf = malloc (size);
647 if (channel->old_buf == NULL)
648 return -1;
649
650 memcpy (channel->old_buf, data, size);
651 written = 0;
652 channel->ov_write.Offset = 0;
653 channel->ov_write.OffsetHigh = 0;
654 channel->ov_write.Pointer = 0;
655 channel->ov_write.Internal = 0;
656 channel->ov_write.InternalHigh = 0;
657 bresult = WriteFile (channel->cpipe_in, channel->old_buf, size, &written,
658 &channel->ov_write);
659
660 if (bresult == TRUE)
661 {
662 SetEvent (channel->ov_write.hEvent);
663 free (channel->old_buf);
664 channel->old_buf = NULL;
665 return written;
666 }
667
668 err = GetLastError ();
669 if (err == ERROR_IO_PENDING)
670 return size;
671 SetEvent (channel->ov_write.hEvent);
672 free (channel->old_buf);
673 channel->old_buf = NULL;
674 SetLastError (err);
675 return -1;
676 }
677
678
679 /**
680 * Receive data from any of the given IPC channels (blocking).
681 * Wait for one of the plugins to reply.
682 * Selects on plugin output pipes, runs 'receive_reply'
683 * on each activated pipe until it gets a seek request
684 * or a done message. Called repeatedly by the user until all pipes are dry or
685 * broken.
686 *
687 * @param channels array of channels, channels that break may be set to NULL
688 * @param num_channels length of the 'channels' array
689 * @param proc function to call to process messages (may be called
690 * more than once)
691 * @param proc_cls closure for 'proc'
692 * @return -1 on error, 1 on success
693 */
694 int
695 EXTRACTOR_IPC_channel_recv_ (struct EXTRACTOR_Channel **channels,
696 unsigned int num_channels,
697 EXTRACTOR_ChannelMessageProcessor proc,
698 void *proc_cls)
699 {
700 DWORD ms;
701 DWORD first_ready;
702 DWORD dwresult;
703 DWORD bytes_read;
704 BOOL bresult;
705 unsigned int i;
706 unsigned int c;
707 char *ndata;
708 HANDLE events[MAXIMUM_WAIT_OBJECTS];
709 int closed_channel;
710
711 c = 0;
712 for (i = 0; i < num_channels; i++)
713 {
714 if (NULL == channels[i])
715 continue;
716 if (MAXIMUM_WAIT_OBJECTS == c)
717 return -1;
718 if (WaitForSingleObject (channels[i]->ov_read.hEvent, 0) == WAIT_OBJECT_0)
719 {
720 ResetEvent (channels[i]->ov_read.hEvent);
721 bresult = ReadFile (channels[i]->cpipe_out, &i, 0, &bytes_read,
722 &channels[i]->ov_read);
723 if (bresult == TRUE)
724 {
725 SetEvent (channels[i]->ov_read.hEvent);
726 }
727 else
728 {
729 DWORD err = GetLastError ();
730 if (err != ERROR_IO_PENDING)
731 SetEvent (channels[i]->ov_read.hEvent);
732 }
733 }
734 events[c] = channels[i]->ov_read.hEvent;
735 c++;
736 }
737
738 if (c == 0)
739 return 1; /* nothing left to do! */
740
741 ms = 500;
742 first_ready = WaitForMultipleObjects (c, events, FALSE, ms);
743 if ((first_ready == WAIT_TIMEOUT) || (first_ready == WAIT_FAILED))
744 {
745 /* an error or timeout -> something's wrong or all plugins hung up */
746 closed_channel = 0;
747 for (i = 0; i < num_channels; i++)
748 {
749 struct EXTRACTOR_Channel *channel = channels[i];
750 if (NULL == channel)
751 continue;
752 if (-1 == channel->plugin->seek_request)
753 {
754 /* plugin blocked for too long, kill the channel */
755 LOG ("Channel blocked, closing channel to %s\n",
756 channel->plugin->libname);
757 channel->plugin->channel = NULL;
758 channel->plugin->round_finished = 1;
759 EXTRACTOR_IPC_channel_destroy_ (channel);
760 channels[i] = NULL;
761 closed_channel = 1;
762 }
763 }
764 if (1 == closed_channel)
765 return 1;
766 LOG_STRERROR ("WaitForMultipleObjects");
767 return -1;
768 }
769
770 i = 0;
771 for (i = 0; i < num_channels; i++)
772 {
773 if (NULL == channels[i])
774 continue;
775 dwresult = WaitForSingleObject (channels[i]->ov_read.hEvent, 0);
776 if (dwresult == WAIT_OBJECT_0)
777 {
778 int ret;
779 if (channels[i]->mdata_size == channels[i]->size)
780 {
781 /* not enough space, need to grow allocation (if allowed) */
782 if (MAX_META_DATA == channels[i]->mdata_size)
783 {
784 LOG ("Inbound message from channel too large, aborting\n");
785 EXTRACTOR_IPC_channel_destroy_ (channels[i]);
786 channels[i] = NULL;
787 }
788 channels[i]->mdata_size *= 2;
789 if (channels[i]->mdata_size > MAX_META_DATA)
790 channels[i]->mdata_size = MAX_META_DATA;
791 if (NULL == (ndata = realloc (channels[i]->mdata,
792 channels[i]->mdata_size)))
793 {
794 LOG_STRERROR ("realloc");
795 EXTRACTOR_IPC_channel_destroy_ (channels[i]);
796 channels[i] = NULL;
797 }
798 channels[i]->mdata = ndata;
799 }
800 bresult = ReadFile (channels[i]->cpipe_out,
801 &channels[i]->mdata[channels[i]->size],
802 channels[i]->mdata_size - channels[i]->size,
803 &bytes_read, NULL);
804 if (bresult)
805 ret = EXTRACTOR_IPC_process_reply_ (channels[i]->plugin,
806 channels[i]->mdata,
807 channels[i]->size + bytes_read,
808 proc, proc_cls);
809 if (! bresult || (-1 == ret))
810 {
811 DWORD error = GetLastError ();
812 SetErrnoFromWinError (error);
813 if (! bresult)
814 LOG_STRERROR ("ReadFile");
815 EXTRACTOR_IPC_channel_destroy_ (channels[i]);
816 channels[i] = NULL;
817 }
818 else
819 {
820 memmove (channels[i]->mdata, &channels[i]->mdata[ret],
821 channels[i]->size + bytes_read - ret);
822 channels[i]->size = channels[i]->size + bytes_read - ret;
823 }
824 }
825 }
826 return 1;
827 }