"Fossies" - the Fresh Open Source Software Archive 
Member "libextractor-1.11/src/main/extractor_ipc_gnu.c" (30 Jan 2021, 14893 Bytes) of package /linux/privat/libextractor-1.11.tar.gz:
As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) C and C++ source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option.
Alternatively, you can view or download the uninterpreted source code file here.
For more information about "extractor_ipc_gnu.c", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 1.10_vs_1.11.
1 /*
2 This file is part of libextractor.
3 Copyright (C) 2012 Vidyut Samanta and Christian Grothoff
4
5 libextractor is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 libextractor is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with libextractor; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
19 */
20 /**
21 * @file main/extractor_ipc_gnu.c
22 * @brief IPC with plugin for GNU/POSIX systems
23 * @author Christian Grothoff
24 */
25 #include "platform.h"
26 #include "extractor.h"
27 #include "extractor_datasource.h"
28 #include "extractor_logging.h"
29 #include "extractor_plugin_main.h"
30 #include "extractor_plugins.h"
31 #include "extractor_ipc.h"
32 #include <dirent.h>
33 #include <sys/types.h>
34 #include <sys/wait.h>
35 #include <sys/shm.h>
36 #include <signal.h>
37 #if HAVE_SYS_APPARMOR_H
38 #include <sys/apparmor.h>
39 #endif
40
41 /**
42 * A shared memory resource (often shared with several
43 * other processes).
44 */
45 struct EXTRACTOR_SharedMemory
46 {
47 /**
48 * Pointer to the mapped region of the shm (covers the whole shm)
49 */
50 void *shm_ptr;
51
52 /**
53 * Allocated size of the shm
54 */
55 size_t shm_size;
56
57 /**
58 * POSIX id of the shm into which data is uncompressed
59 */
60 int shm_id;
61
62 /**
63 * Name of the shm
64 */
65 char shm_name[MAX_SHM_NAME + 1];
66
67 /**
68 * Reference counter describing how many references share this SHM.
69 */
70 unsigned int rc;
71
72 };
73
74
/**
 * State of one IPC channel connecting this process with the
 * child process that runs a plugin.
 */
struct EXTRACTOR_Channel
{

  /**
   * Receive buffer for bytes read from the plugin.
   */
  char *mdata;

  /**
   * Allocated capacity of @e mdata, in bytes.
   */
  size_t mdata_size;

  /**
   * Shared memory segment that is shared with the plugin process.
   */
  struct EXTRACTOR_SharedMemory *shm;

  /**
   * Plugin on whose behalf this channel exists.
   */
  struct EXTRACTOR_PluginList *plugin;

  /**
   * File descriptor of the pipe used to send information to the
   * plugin child process.  (This is an int descriptor, not a pointer;
   * presumably -1 when not initialized, like @e cpipe_out -- confirm.)
   */
  int cpipe_in;

  /**
   * How many bytes at the start of @e mdata hold valid data.
   */
  size_t size;

  /**
   * File descriptor of the pipe from which extracted meta data is
   * read from the plugin child process.  -1 if not initialized.
   */
  int cpipe_out;

  /**
   * Process ID of the plugin's child process.  0 for none.
   */
  pid_t cpid;

};
125
126
127 /**
128 * Create a shared memory area.
129 *
130 * @param size size of the shared area
131 * @return NULL on error
132 */
133 struct EXTRACTOR_SharedMemory *
134 EXTRACTOR_IPC_shared_memory_create_ (size_t size)
135 {
136 struct EXTRACTOR_SharedMemory *shm;
137 const char *tpath;
138
139 if (NULL == (shm = malloc (sizeof (struct EXTRACTOR_SharedMemory))))
140 {
141 LOG_STRERROR ("malloc");
142 return NULL;
143 }
144 #if SOMEBSD
145 /* this works on FreeBSD, not sure about others... */
146 tpath = getenv ("TMPDIR");
147 if (NULL == tpath)
148 tpath = "/tmp/";
149 #else
150 tpath = "/"; /* Linux */
151 #endif
152 snprintf (shm->shm_name,
153 MAX_SHM_NAME,
154 "%sLE-%u-%u",
155 tpath,
156 getpid (),
157 (unsigned int) random ());
158 if (-1 == (shm->shm_id = shm_open (shm->shm_name,
159 O_RDWR | O_CREAT,
160 S_IRUSR | S_IWUSR)))
161 {
162 LOG_STRERROR_FILE ("shm_open",
163 shm->shm_name);
164 free (shm);
165 return NULL;
166 }
167 if ( (0 != ftruncate (shm->shm_id, size)) ||
168 (NULL == (shm->shm_ptr = mmap (NULL,
169 size,
170 PROT_WRITE,
171 MAP_SHARED,
172 shm->shm_id,
173 0))) ||
174 (((void*) -1) == shm->shm_ptr) )
175 {
176 LOG_STRERROR ("ftruncate/mmap");
177 (void) close (shm->shm_id);
178 (void) shm_unlink (shm->shm_name);
179 free (shm);
180 return NULL;
181 }
182 shm->shm_size = size;
183 shm->rc = 0;
184 return shm;
185 }
186
187
188 /**
189 * Change the reference counter for this shm instance.
190 *
191 * @param shm instance to update
192 * @param delta value to change RC by
193 * @return new RC
194 */
195 unsigned int
196 EXTRACTOR_IPC_shared_memory_change_rc_ (struct EXTRACTOR_SharedMemory *shm,
197 int delta)
198 {
199 shm->rc += delta;
200 return shm->rc;
201 }
202
203
204 /**
205 * Destroy shared memory area.
206 *
207 * @param shm memory area to destroy
208 * @return NULL on error
209 */
210 void
211 EXTRACTOR_IPC_shared_memory_destroy_ (struct EXTRACTOR_SharedMemory *shm)
212 {
213 munmap (shm->shm_ptr,
214 shm->shm_size);
215 (void) close (shm->shm_id);
216 (void) shm_unlink (shm->shm_name);
217 free (shm);
218 }
219
220
221 /**
222 * Initialize shared memory area from data source.
223 *
224 * @param shm memory area to initialize
225 * @param ds data source to use for initialization
226 * @param off offset to use in data source
227 * @param size number of bytes to copy
228 * @return -1 on error, otherwise number of bytes copied
229 */
230 ssize_t
231 EXTRACTOR_IPC_shared_memory_set_ (struct EXTRACTOR_SharedMemory *shm,
232 struct EXTRACTOR_Datasource *ds,
233 uint64_t off,
234 size_t size)
235 {
236 if (-1 ==
237 EXTRACTOR_datasource_seek_ (ds,
238 off,
239 SEEK_SET))
240 {
241 LOG ("Failed to set IPC memory due to seek error\n");
242 return -1;
243 }
244 if (size > shm->shm_size)
245 size = shm->shm_size;
246 return EXTRACTOR_datasource_read_ (ds,
247 shm->shm_ptr,
248 size);
249 }
250
251
/**
 * Ask a data source where its read position currently is.
 *
 * Implemented as a seek by zero bytes relative to the current
 * position.
 *
 * @param ds data source to query
 * @return current position in the datasource or UINT_MAX on error
 */
uint64_t
EXTRACTOR_datasource_get_pos_ (struct EXTRACTOR_Datasource *ds)
{
  const int64_t where = EXTRACTOR_datasource_seek_ (ds, 0, SEEK_CUR);

  if (-1 != where)
    return (uint64_t) where;
  return UINT_MAX;
}
269
270
271 /**
272 * Create a channel to communicate with a process wrapping
273 * the plugin of the given name. Starts the process as well.
274 *
275 * @param plugin the plugin
276 * @param shm memory to share with the process
277 * @return NULL on error, otherwise IPC channel
278 */
279 struct EXTRACTOR_Channel *
280 EXTRACTOR_IPC_channel_create_ (struct EXTRACTOR_PluginList *plugin,
281 struct EXTRACTOR_SharedMemory *shm)
282 {
283 struct EXTRACTOR_Channel *channel;
284 int p1[2];
285 int p2[2];
286 pid_t pid;
287 struct InitMessage *init;
288 size_t slen;
289
290 if (NULL == (channel = malloc (sizeof (struct EXTRACTOR_Channel))))
291 {
292 LOG_STRERROR ("malloc");
293 return NULL;
294 }
295 channel->mdata_size = 1024;
296 if (NULL == (channel->mdata = malloc (channel->mdata_size)))
297 {
298 LOG_STRERROR ("malloc");
299 free (channel);
300 return NULL;
301 }
302 channel->shm = shm;
303 channel->plugin = plugin;
304 channel->size = 0;
305 if (0 != pipe (p1))
306 {
307 LOG_STRERROR ("pipe");
308 free (channel->mdata);
309 free (channel);
310 return NULL;
311 }
312 if (0 != pipe (p2))
313 {
314 LOG_STRERROR ("pipe");
315 (void) close (p1[0]);
316 (void) close (p1[1]);
317 free (channel->mdata);
318 free (channel);
319 return NULL;
320 }
321 pid = fork ();
322 if (pid == -1)
323 {
324 LOG_STRERROR ("fork");
325 (void) close (p1[0]);
326 (void) close (p1[1]);
327 (void) close (p2[0]);
328 (void) close (p2[1]);
329 free (channel->mdata);
330 free (channel);
331 return NULL;
332 }
333 if (0 == pid)
334 {
335 (void) close (p1[1]);
336 (void) close (p2[0]);
337 free (channel->mdata);
338 free (channel);
339 #if HAVE_SYS_APPARMOR_H
340 #if HAVE_APPARMOR
341 if (0 > aa_change_profile ("libextractor"))
342 {
343 int eno = errno;
344
345 if ( (EINVAL != eno) &&
346 (ENOENT != eno) )
347 {
348 fprintf (stderr,
349 "Failure changing AppArmor profile: %s\n",
350 strerror (errno));
351 _exit (1);
352 }
353 }
354 #endif
355 #endif
356 EXTRACTOR_plugin_main_ (plugin, p1[0], p2[1]);
357 _exit (0);
358 }
359 (void) close (p1[0]);
360 (void) close (p2[1]);
361 channel->cpipe_in = p1[1];
362 channel->cpipe_out = p2[0];
363 channel->cpid = pid;
364 slen = strlen (shm->shm_name) + 1;
365 if (NULL == (init = malloc (sizeof (struct InitMessage) + slen)))
366 {
367 LOG_STRERROR ("malloc");
368 EXTRACTOR_IPC_channel_destroy_ (channel);
369 return NULL;
370 }
371 init->opcode = MESSAGE_INIT_STATE;
372 init->reserved = 0;
373 init->reserved2 = 0;
374 init->shm_name_length = slen;
375 init->shm_map_size = shm->shm_size;
376 memcpy (&init[1], shm->shm_name, slen);
377 if (sizeof (struct InitMessage) + slen !=
378 EXTRACTOR_IPC_channel_send_ (channel,
379 init,
380 sizeof (struct InitMessage) + slen) )
381 {
382 LOG ("Failed to send INIT_STATE message to plugin\n");
383 EXTRACTOR_IPC_channel_destroy_ (channel);
384 free (init);
385 return NULL;
386 }
387 free (init);
388 return channel;
389 }
390
391
392 /**
393 * Destroy communication channel with a plugin/process. Also
394 * destroys the process.
395 *
396 * @param channel channel to communicate with the plugin
397 */
398 void
399 EXTRACTOR_IPC_channel_destroy_ (struct EXTRACTOR_Channel *channel)
400 {
401 int status;
402
403 if (0 != kill (channel->cpid, SIGKILL))
404 LOG_STRERROR ("kill");
405 if (-1 == waitpid (channel->cpid, &status, 0))
406 LOG_STRERROR ("waitpid");
407 if (0 != close (channel->cpipe_out))
408 LOG_STRERROR ("close");
409 if (0 != close (channel->cpipe_in))
410 LOG_STRERROR ("close");
411 if (NULL != channel->plugin)
412 channel->plugin->channel = NULL;
413 free (channel->mdata);
414 free (channel);
415 }
416
417
418 /**
419 * Send data via the given IPC channel (blocking).
420 *
421 * @param channel channel to communicate with the plugin
422 * @param buf data to send
423 * @param size number of bytes in buf to send
424 * @return -1 on error, number of bytes sent on success
425 * (never does partial writes)
426 */
427 ssize_t
428 EXTRACTOR_IPC_channel_send_ (struct EXTRACTOR_Channel *channel,
429 const void *data,
430 size_t size)
431 {
432 const char *cdata = data;
433 size_t off = 0;
434 ssize_t ret;
435
436 while (off < size)
437 {
438 ret = write (channel->cpipe_in, &cdata[off], size - off);
439 if (ret <= 0)
440 {
441 if (-1 == ret)
442 LOG_STRERROR ("write");
443 return -1;
444 }
445 off += ret;
446 }
447 return size;
448 }
449
450
451 /**
452 * Receive data from any of the given IPC channels (blocking).
453 * Wait for one of the plugins to reply.
454 * Selects on plugin output pipes, runs 'receive_reply'
455 * on each activated pipe until it gets a seek request
456 * or a done message. Called repeatedly by the user until all pipes are dry or
457 * broken.
458 *
459 * @param channels array of channels, channels that break may be set to NULL
460 * @param num_channels length of the @a channels array
461 * @param proc function to call to process messages (may be called
462 * more than once)
463 * @param proc_cls closure for @a proc
464 * @return -1 on error, 1 on success
465 */
466 int
467 EXTRACTOR_IPC_channel_recv_ (struct EXTRACTOR_Channel **channels,
468 unsigned int num_channels,
469 EXTRACTOR_ChannelMessageProcessor proc,
470 void *proc_cls)
471 {
472 struct timeval tv;
473 fd_set to_check;
474 int max;
475 unsigned int i;
476 struct EXTRACTOR_Channel *channel;
477 ssize_t ret;
478 ssize_t iret;
479 char *ndata;
480 int closed_channel;
481
482 FD_ZERO (&to_check);
483 max = -1;
484 for (i = 0; i<num_channels; i++)
485 {
486 channel = channels[i];
487 if (NULL == channel)
488 continue;
489 FD_SET (channel->cpipe_out, &to_check);
490 if (max < channel->cpipe_out)
491 max = channel->cpipe_out;
492 }
493 if (-1 == max)
494 {
495 return 1; /* nothing left to do! */
496 }
497 tv.tv_sec = 0;
498 tv.tv_usec = 500000; /* 500 ms */
499 if (0 >= select (max + 1, &to_check, NULL, NULL, &tv))
500 {
501 /* an error or timeout -> something's wrong or all plugins hung up */
502 closed_channel = 0;
503 for (i = 0; i<num_channels; i++)
504 {
505 channel = channels[i];
506 if (NULL == channel)
507 continue;
508 if (-1 == channel->plugin->seek_request)
509 {
510 /* plugin blocked for too long, kill channel */
511 LOG ("Channel blocked, closing channel to %s\n",
512 channel->plugin->libname);
513 channel->plugin->channel = NULL;
514 channel->plugin->round_finished = 1;
515 EXTRACTOR_IPC_channel_destroy_ (channel);
516 channels[i] = NULL;
517 closed_channel = 1;
518 }
519 }
520 if (1 == closed_channel)
521 return 1;
522 /* strange, no channel is to blame, let's die just to be safe */
523 if ((EINTR != errno) && (0 != errno))
524 LOG_STRERROR ("select");
525 return -1;
526 }
527 for (i = 0; i<num_channels; i++)
528 {
529 channel = channels[i];
530 if (NULL == channel)
531 continue;
532 if (! FD_ISSET (channel->cpipe_out, &to_check))
533 continue;
534 if (channel->mdata_size == channel->size)
535 {
536 /* not enough space, need to grow allocation (if allowed) */
537 if (MAX_META_DATA == channel->mdata_size)
538 {
539 LOG ("Inbound message from channel too large, aborting\n");
540 EXTRACTOR_IPC_channel_destroy_ (channel);
541 channels[i] = NULL;
542 continue;
543 }
544 channel->mdata_size *= 2;
545 if (channel->mdata_size > MAX_META_DATA)
546 channel->mdata_size = MAX_META_DATA;
547 if (NULL == (ndata = realloc (channel->mdata,
548 channel->mdata_size)))
549 {
550 LOG_STRERROR ("realloc");
551 EXTRACTOR_IPC_channel_destroy_ (channel);
552 channels[i] = NULL;
553 continue;
554 }
555 channel->mdata = ndata;
556 }
557 if ( (-1 == (iret = read (channel->cpipe_out,
558 &channel->mdata[channel->size],
559 channel->mdata_size - channel->size)) ) ||
560 (0 == iret) ||
561 (-1 == (ret = EXTRACTOR_IPC_process_reply_ (channel->plugin,
562 channel->mdata,
563 channel->size + iret,
564 proc, proc_cls)) ) )
565 {
566 if (-1 == iret)
567 LOG_STRERROR ("read");
568 LOG ("Read error from channel, closing channel %s\n",
569 channel->plugin->libname);
570 EXTRACTOR_IPC_channel_destroy_ (channel);
571 channels[i] = NULL;
572 continue;
573 }
574 else
575 {
576 channel->size = channel->size + iret - ret;
577 memmove (channel->mdata,
578 &channel->mdata[ret],
579 channel->size);
580 }
581 }
582 return 1;
583 }
584
585
586 /* end of extractor_ipc_gnu.c */