libextractor  1.11
About: GNU libextractor is a library used to extract meta-data from files of arbitrary type.
  Fossies Dox: libextractor-1.11.tar.gz  ("unofficial" and yet experimental doxygen-generated source code documentation)  

extractor_ipc_gnu.c
Go to the documentation of this file.
1 /*
2  This file is part of libextractor.
3  Copyright (C) 2012 Vidyut Samanta and Christian Grothoff
4 
5  libextractor is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published
7  by the Free Software Foundation; either version 3, or (at your
8  option) any later version.
9 
10  libextractor is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  General Public License for more details.
14 
15  You should have received a copy of the GNU General Public License
16  along with libextractor; see the file COPYING. If not, write to the
17  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18  Boston, MA 02110-1301, USA.
19  */
20 /**
21  * @file main/extractor_ipc_gnu.c
22  * @brief IPC with plugin for GNU/POSIX systems
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "extractor.h"
27 #include "extractor_datasource.h"
28 #include "extractor_logging.h"
29 #include "extractor_plugin_main.h"
30 #include "extractor_plugins.h"
31 #include "extractor_ipc.h"
32 #include <dirent.h>
33 #include <sys/types.h>
34 #include <sys/wait.h>
35 #include <sys/shm.h>
36 #include <signal.h>
37 #if HAVE_SYS_APPARMOR_H
38 #include <sys/apparmor.h>
39 #endif
40 
41 /**
42  * A shared memory resource (often shared with several
43  * other processes).
 *
 * NOTE(review): listing line 45 (the `struct` header) is absent from
 * this rendering; the code below refers to this type as
 * `struct EXTRACTOR_SharedMemory` -- confirm against the original file.
44  */
46 {
47  /**
48  * Pointer to the mapped region of the shm (covers the whole shm);
 * this is the address returned by mmap() when the area is created.
49  */
50  void *shm_ptr;
51 
52  /**
53  * Allocated size of the shm in bytes (the size requested at creation).
54  */
55  size_t shm_size;
56 
57  /**
58  * Descriptor of the shm object as returned by shm_open(); the shm
 * is the area into which data is uncompressed.
59  */
60  int shm_id;
61 
62  /**
63  * Name of the shm (passed to shm_open() and shm_unlink()).
 *
 * NOTE(review): the member declaration (listing line 65) is absent
 * from this rendering. The code uses it as `shm_name`, a character
 * buffer written by snprintf() with a bound of MAX_SHM_NAME --
 * confirm the exact declaration against the original file.
64  */
66 
67  /**
68  * Reference counter describing how many references share this SHM.
 * Starts at 0 when the area is created.
69  */
70  unsigned int rc;
71 
72 };
73 
74 
75 /**
76  * Definition of an IPC communication channel with
77  * some plugin.
 *
 * NOTE(review): listing line 79 (the `struct` header) is absent from
 * this rendering; the code below refers to this type as
 * `struct EXTRACTOR_Channel` -- confirm against the original file.
78  */
80 {
81 
82  /**
83  * Buffer for reading data from the plugin. Allocated with
 * malloc() and grown with realloc() by the receive function.
84  */
85  char *mdata;
86 
87  /**
88  * Size of the @e mdata buffer (allocated bytes).
89  */
90  size_t mdata_size;
91 
92  /**
93  * Memory segment shared with this process.
 *
 * NOTE(review): the member declaration (listing line 95) is absent
 * from this rendering; the code assigns a
 * `struct EXTRACTOR_SharedMemory *` to `channel->shm` -- confirm.
94  */
96 
97  /**
98  * The plugin this channel is to communicate with.
 *
 * NOTE(review): the member declaration (listing line 100) is absent
 * from this rendering; the code assigns a
 * `struct EXTRACTOR_PluginList *` to `channel->plugin` -- confirm.
99  */
101 
102  /**
103  * Pipe (write end) used to communicate information to the plugin
 * child process.
104  * NOTE(review): the original text said "NULL if not initialized",
 * but this is an `int` descriptor; presumably -1 is meant, as for
 * the read pipe below -- confirm.
105  */
106  int cpipe_in;
107 
108  /**
109  * Number of valid bytes in the channel's buffer (@e mdata).
110  */
111  size_t size;
112 
113  /**
114  * Pipe used to read information about extracted meta data from
115  * the plugin child process. -1 if not initialized.
 *
 * NOTE(review): the member declaration (listing line 117) is absent
 * from this rendering; the code uses it as the descriptor
 * `channel->cpipe_out` -- confirm.
116  */
118 
119  /**
120  * Process ID of the child process for this plugin. 0 for none.
121  */
122  pid_t cpid;
123 
124 };
125 
126 
127 /**
128  * Create a shared memory area.
129  *
130  * @param size size of the shared area
131  * @return NULL on error
132  */
133 struct EXTRACTOR_SharedMemory *
135 {
136  struct EXTRACTOR_SharedMemory *shm;
137  const char *tpath;
138 
139  if (NULL == (shm = malloc (sizeof (struct EXTRACTOR_SharedMemory))))
140  {
141  LOG_STRERROR ("malloc");
142  return NULL;
143  }
144 #if SOMEBSD
145  /* this works on FreeBSD, not sure about others... */
146  tpath = getenv ("TMPDIR");
147  if (NULL == tpath)
148  tpath = "/tmp/";
149 #else
150  tpath = "/"; /* Linux */
151 #endif
152  snprintf (shm->shm_name,
153  MAX_SHM_NAME,
154  "%sLE-%u-%u",
155  tpath,
156  getpid (),
157  (unsigned int) random ());
158  if (-1 == (shm->shm_id = shm_open (shm->shm_name,
159  O_RDWR | O_CREAT,
160  S_IRUSR | S_IWUSR)))
161  {
162  LOG_STRERROR_FILE ("shm_open",
163  shm->shm_name);
164  free (shm);
165  return NULL;
166  }
167  if ( (0 != ftruncate (shm->shm_id, size)) ||
168  (NULL == (shm->shm_ptr = mmap (NULL,
169  size,
170  PROT_WRITE,
171  MAP_SHARED,
172  shm->shm_id,
173  0))) ||
174  (((void*) -1) == shm->shm_ptr) )
175  {
176  LOG_STRERROR ("ftruncate/mmap");
177  (void) close (shm->shm_id);
178  (void) shm_unlink (shm->shm_name);
179  free (shm);
180  return NULL;
181  }
182  shm->shm_size = size;
183  shm->rc = 0;
184  return shm;
185 }
186 
187 
188 /**
189  * Change the reference counter for this shm instance.
190  *
191  * @param shm instance to update
192  * @param delta value to change RC by
193  * @return new RC
194  */
195 unsigned int
197  int delta)
198 {
199  shm->rc += delta;
200  return shm->rc;
201 }
202 
203 
204 /**
205  * Destroy shared memory area.
206  *
207  * @param shm memory area to destroy
208  * @return NULL on error
209  */
210 void
212 {
213  munmap (shm->shm_ptr,
214  shm->shm_size);
215  (void) close (shm->shm_id);
216  (void) shm_unlink (shm->shm_name);
217  free (shm);
218 }
219 
220 
221 /**
222  * Initialize shared memory area from data source.
 *
 * The data source is first positioned (SEEK_SET) at @a off; then up
 * to @a size bytes are read into the mapped region.  If @a size
 * exceeds the size of the area, it is reduced to the size of the area.
223  *
224  * @param shm memory area to initialize
225  * @param ds data source to use for initialization
226  * @param off offset to use in data source
227  * @param size number of bytes to copy
228  * @return -1 on error, otherwise number of bytes copied
 *         (the value returned by EXTRACTOR_datasource_read_())
229  */
230 ssize_t
/* NOTE(review): listing line 231 (function name and first parameter) is
   absent from this rendering; per the cross-reference index this is
   EXTRACTOR_IPC_shared_memory_set_ (struct EXTRACTOR_SharedMemory *shm, ... */
232  struct EXTRACTOR_Datasource *ds,
233  uint64_t off,
234  size_t size)
235 {
236  if (-1 ==
/* NOTE(review): listing line 237 is absent from this rendering; judging by
   the arguments and the log message it is presumably the start of a call
   to EXTRACTOR_datasource_seek_ (ds, ... -- confirm against the original. */
238  off,
239  SEEK_SET))
240  {
241  LOG ("Failed to set IPC memory due to seek error\n");
242  return -1;
243  }
/* never copy more than the shared area can hold */
244  if (size > shm->shm_size)
245  size = shm->shm_size;
246  return EXTRACTOR_datasource_read_ (ds,
247  shm->shm_ptr,
248  size);
249 }
250 
251 
/**
 * Query datasource for current position.
 *
 * Implemented as a zero-length relative seek (SEEK_CUR) on the
 * data source.
 *
 * @param ds data source to query
 * @return current position in the datasource or UINT_MAX on error
 */
uint64_t
EXTRACTOR_datasource_get_pos_ (struct EXTRACTOR_Datasource *ds)
{
  const int64_t where = EXTRACTOR_datasource_seek_ (ds, 0, SEEK_CUR);

  return (-1 == where) ? UINT_MAX : (uint64_t) where;
}
269 
270 
271 /**
272  * Create a channel to communicate with a process wrapping
273  * the plugin of the given name. Starts the process as well.
 *
 * Two pipes are created and the process is forked.  The child runs
 * EXTRACTOR_plugin_main_() on its ends of the pipes and then calls
 * _exit().  The parent keeps the other two ends, records the child's
 * PID and sends a MESSAGE_INIT_STATE message that carries the name
 * (including the 0-terminator) and the size of @a shm.
274  *
275  * @param plugin the plugin
276  * @param shm memory to share with the process
277  * @return NULL on error, otherwise IPC channel
278  */
279 struct EXTRACTOR_Channel *
/* NOTE(review): listing line 280 (function name and first parameter) is
   absent from this rendering; per the cross-reference index this is
   EXTRACTOR_IPC_channel_create_ (struct EXTRACTOR_PluginList *plugin, ... */
281  struct EXTRACTOR_SharedMemory *shm)
282 {
283  struct EXTRACTOR_Channel *channel;
284  int p1[2];
285  int p2[2];
286  pid_t pid;
287  struct InitMessage *init;
288  size_t slen;
289 
290  if (NULL == (channel = malloc (sizeof (struct EXTRACTOR_Channel))))
291  {
292  LOG_STRERROR ("malloc");
293  return NULL;
294  }
/* initial size of the receive buffer */
295  channel->mdata_size = 1024;
296  if (NULL == (channel->mdata = malloc (channel->mdata_size)))
297  {
298  LOG_STRERROR ("malloc");
299  free (channel);
300  return NULL;
301  }
302  channel->shm = shm;
303  channel->plugin = plugin;
304  channel->size = 0;
/* p1: parent writes, child reads */
305  if (0 != pipe (p1))
306  {
307  LOG_STRERROR ("pipe");
308  free (channel->mdata);
309  free (channel);
310  return NULL;
311  }
/* p2: child writes, parent reads */
312  if (0 != pipe (p2))
313  {
314  LOG_STRERROR ("pipe");
315  (void) close (p1[0]);
316  (void) close (p1[1]);
317  free (channel->mdata);
318  free (channel);
319  return NULL;
320  }
321  pid = fork ();
322  if (pid == -1)
323  {
324  LOG_STRERROR ("fork");
325  (void) close (p1[0]);
326  (void) close (p1[1]);
327  (void) close (p2[0]);
328  (void) close (p2[1]);
329  free (channel->mdata);
330  free (channel);
331  return NULL;
332  }
333  if (0 == pid)
334  {
/* child: close the parent's pipe ends and release the child's copy
   of the channel bookkeeping before running the plugin main loop */
335  (void) close (p1[1]);
336  (void) close (p2[0]);
337  free (channel->mdata);
338  free (channel);
339 #if HAVE_SYS_APPARMOR_H
340 #if HAVE_APPARMOR
/* try to switch to the "libextractor" AppArmor profile; EINVAL and
   ENOENT are tolerated, any other failure terminates the child */
341  if (0 > aa_change_profile ("libextractor"))
342  {
343  int eno = errno;
344 
345  if ( (EINVAL != eno) &&
346  (ENOENT != eno) )
347  {
348  fprintf (stderr,
349  "Failure changing AppArmor profile: %s\n",
350  strerror (errno));
351  _exit (1);
352  }
353  }
354 #endif
355 #endif
356  EXTRACTOR_plugin_main_ (plugin, p1[0], p2[1]);
357  _exit (0);
358  }
/* parent: close the child's pipe ends and remember ours */
359  (void) close (p1[0]);
360  (void) close (p2[1]);
361  channel->cpipe_in = p1[1];
362  channel->cpipe_out = p2[0];
363  channel->cpid = pid;
/* the name is transmitted including its 0-terminator */
364  slen = strlen (shm->shm_name) + 1;
365  if (NULL == (init = malloc (sizeof (struct InitMessage) + slen)))
366  {
367  LOG_STRERROR ("malloc");
/* NOTE(review): listing line 368 is absent from this rendering; presumably
   it tears down the already running channel (child process, pipes,
   buffers) before returning -- confirm against the original file. */
369  return NULL;
370  }
371  init->opcode = MESSAGE_INIT_STATE;
372  init->reserved = 0;
373  init->reserved2 = 0;
374  init->shm_name_length = slen;
375  init->shm_map_size = shm->shm_size;
/* the name is stored directly behind the fixed-size message header */
376  memcpy (&init[1], shm->shm_name, slen);
377  if (sizeof (struct InitMessage) + slen !=
/* NOTE(review): listing line 378 is absent from this rendering; presumably
   the start of a call to EXTRACTOR_IPC_channel_send_ (channel, ... whose
   result is compared with the message size -- confirm. */
379  init,
380  sizeof (struct InitMessage) + slen) )
381  {
382  LOG ("Failed to send INIT_STATE message to plugin\n");
/* NOTE(review): listing line 383 is absent from this rendering; presumably
   the same channel teardown as at listing line 368 -- confirm. */
384  free (init);
385  return NULL;
386  }
387  free (init);
388  return channel;
389 }
390 
391 
392 /**
393  * Destroy communication channel with a plugin/process. Also
394  * destroys the process.
395  *
396  * @param channel channel to communicate with the plugin
397  */
398 void
400 {
401  int status;
402 
403  if (0 != kill (channel->cpid, SIGKILL))
404  LOG_STRERROR ("kill");
405  if (-1 == waitpid (channel->cpid, &status, 0))
406  LOG_STRERROR ("waitpid");
407  if (0 != close (channel->cpipe_out))
408  LOG_STRERROR ("close");
409  if (0 != close (channel->cpipe_in))
410  LOG_STRERROR ("close");
411  if (NULL != channel->plugin)
412  channel->plugin->channel = NULL;
413  free (channel->mdata);
414  free (channel);
415 }
416 
417 
418 /**
419  * Send data via the given IPC channel (blocking).
420  *
421  * @param channel channel to communicate with the plugin
422  * @param buf data to send
423  * @param size number of bytes in buf to send
424  * @return -1 on error, number of bytes sent on success
425  * (never does partial writes)
426  */
427 ssize_t
429  const void *data,
430  size_t size)
431 {
432  const char *cdata = data;
433  size_t off = 0;
434  ssize_t ret;
435 
436  while (off < size)
437  {
438  ret = write (channel->cpipe_in, &cdata[off], size - off);
439  if (ret <= 0)
440  {
441  if (-1 == ret)
442  LOG_STRERROR ("write");
443  return -1;
444  }
445  off += ret;
446  }
447  return size;
448 }
449 
450 
451 /**
452  * Receive data from any of the given IPC channels (blocking).
453  * Wait for one of the plugins to reply.
454  * Selects on plugin output pipes (with a timeout of 500 ms), runs
 * EXTRACTOR_IPC_process_reply_()
455  * on each activated pipe until it gets a seek request
456  * or a done message. Called repeatedly by the user until all pipes are dry or
457  * broken.
 *
 * On timeout or select() failure, every channel whose plugin has a
 * seek_request of -1 is dropped from @a channels; if no channel was
 * dropped that way the call fails.
458  *
459  * @param channels array of channels, channels that break may be set to NULL
460  * @param num_channels length of the @a channels array
461  * @param proc function to call to process messages (may be called
462  * more than once)
463  * @param proc_cls closure for @a proc
464  * @return -1 on error, 1 on success (including the cases where no
 *         channel is left or where blocked channels were dropped)
465  */
466 int
/* NOTE(review): listing lines 467 and 469 (function name, the `channels`
   parameter and the `proc` parameter) are absent from this rendering; per
   the cross-reference index the signature is
   EXTRACTOR_IPC_channel_recv_ (struct EXTRACTOR_Channel **channels,
   unsigned int num_channels, EXTRACTOR_ChannelMessageProcessor proc,
   void *proc_cls). */
468  unsigned int num_channels,
470  void *proc_cls)
471 {
472  struct timeval tv;
473  fd_set to_check;
474  int max;
475  unsigned int i;
476  struct EXTRACTOR_Channel *channel;
477  ssize_t ret;
478  ssize_t iret;
479  char *ndata;
480  int closed_channel;
481 
/* build the read set from the output pipes of all live channels */
482  FD_ZERO (&to_check);
483  max = -1;
484  for (i = 0; i<num_channels; i++)
485  {
486  channel = channels[i];
487  if (NULL == channel)
488  continue;
489  FD_SET (channel->cpipe_out, &to_check);
490  if (max < channel->cpipe_out)
491  max = channel->cpipe_out;
492  }
493  if (-1 == max)
494  {
495  return 1; /* nothing left to do! */
496  }
497  tv.tv_sec = 0;
498  tv.tv_usec = 500000; /* 500 ms */
499  if (0 >= select (max + 1, &to_check, NULL, NULL, &tv))
500  {
501  /* an error or timeout -> something's wrong or all plugins hung up */
502  closed_channel = 0;
503  for (i = 0; i<num_channels; i++)
504  {
505  channel = channels[i];
506  if (NULL == channel)
507  continue;
508  if (-1 == channel->plugin->seek_request)
509  {
510  /* plugin blocked for too long, kill channel */
511  LOG ("Channel blocked, closing channel to %s\n",
512  channel->plugin->libname);
513  channel->plugin->channel = NULL;
514  channel->plugin->round_finished = 1;
/* NOTE(review): listing line 515 is absent from this rendering; presumably
   it destroys the channel before the slot is cleared -- confirm against
   the original file. */
516  channels[i] = NULL;
517  closed_channel = 1;
518  }
519  }
520  if (1 == closed_channel)
521  return 1;
522  /* strange, no channel is to blame, let's die just to be safe */
523  if ((EINTR != errno) && (0 != errno))
524  LOG_STRERROR ("select");
525  return -1;
526  }
/* at least one pipe is readable: service every ready channel */
527  for (i = 0; i<num_channels; i++)
528  {
529  channel = channels[i];
530  if (NULL == channel)
531  continue;
532  if (! FD_ISSET (channel->cpipe_out, &to_check))
533  continue;
534  if (channel->mdata_size == channel->size)
535  {
536  /* not enough space, need to grow allocation (if allowed) */
537  if (MAX_META_DATA == channel->mdata_size)
538  {
539  LOG ("Inbound message from channel too large, aborting\n");
/* NOTE(review): listing line 540 is absent from this rendering; presumably
   it destroys the channel before the slot is cleared -- confirm. */
541  channels[i] = NULL;
542  continue;
543  }
/* double the buffer, but never beyond MAX_META_DATA */
544  channel->mdata_size *= 2;
545  if (channel->mdata_size > MAX_META_DATA)
546  channel->mdata_size = MAX_META_DATA;
547  if (NULL == (ndata = realloc (channel->mdata,
548  channel->mdata_size)))
549  {
550  LOG_STRERROR ("realloc");
/* NOTE(review): listing line 551 is absent from this rendering; presumably
   it destroys the channel before the slot is cleared -- confirm. */
552  channels[i] = NULL;
553  continue;
554  }
555  channel->mdata = ndata;
556  }
/* append to the bytes already buffered, then hand the whole buffer to
   the reply processor; read error (-1), end of file (0) or a processing
   error (-1) all close the channel */
557  if ( (-1 == (iret = read (channel->cpipe_out,
558  &channel->mdata[channel->size],
559  channel->mdata_size - channel->size)) ) ||
560  (0 == iret) ||
561  (-1 == (ret = EXTRACTOR_IPC_process_reply_ (channel->plugin,
562  channel->mdata,
563  channel->size + iret,
564  proc, proc_cls)) ) )
565  {
566  if (-1 == iret)
567  LOG_STRERROR ("read");
568  LOG ("Read error from channel, closing channel %s\n",
569  channel->plugin->libname);
/* NOTE(review): listing line 570 is absent from this rendering; presumably
   it destroys the channel before the slot is cleared -- confirm. */
571  channels[i] = NULL;
572  continue;
573  }
574  else
575  {
/* `ret` is used here as the number of bytes consumed from the front of
   the buffer; the unprocessed remainder is moved to the start */
576  channel->size = channel->size + iret - ret;
577  memmove (channel->mdata,
578  &channel->mdata[ret],
579  channel->size);
580  }
581  }
582  return 1;
583 }
584 
585 
586 /* end of extractor_ipc_gnu.c */
ssize_t EXTRACTOR_datasource_read_(void *cls, void *data, size_t size)
int64_t EXTRACTOR_datasource_seek_(void *cls, int64_t pos, int whence)
random access and possibly decompression of data from buffer in memory or file on disk
ssize_t EXTRACTOR_IPC_process_reply_(struct EXTRACTOR_PluginList *plugin, const void *data, size_t size, EXTRACTOR_ChannelMessageProcessor proc, void *proc_cls)
Definition: extractor_ipc.c:42
IPC with plugin (OS-independent API)
#define MAX_SHM_NAME
Definition: extractor_ipc.h:82
#define MAX_META_DATA
Definition: extractor_ipc.h:77
void(* EXTRACTOR_ChannelMessageProcessor)(void *cls, struct EXTRACTOR_PluginList *plugin, enum EXTRACTOR_MetaType meta_type, enum EXTRACTOR_MetaFormat meta_format, const char *mime, const void *value, size_t value_len)
#define MESSAGE_INIT_STATE
Definition: extractor_ipc.h:87
struct EXTRACTOR_SharedMemory * EXTRACTOR_IPC_shared_memory_create_(size_t size)
struct EXTRACTOR_Channel * EXTRACTOR_IPC_channel_create_(struct EXTRACTOR_PluginList *plugin, struct EXTRACTOR_SharedMemory *shm)
unsigned int EXTRACTOR_IPC_shared_memory_change_rc_(struct EXTRACTOR_SharedMemory *shm, int delta)
ssize_t EXTRACTOR_IPC_channel_send_(struct EXTRACTOR_Channel *channel, const void *data, size_t size)
void EXTRACTOR_IPC_shared_memory_destroy_(struct EXTRACTOR_SharedMemory *shm)
ssize_t EXTRACTOR_IPC_shared_memory_set_(struct EXTRACTOR_SharedMemory *shm, struct EXTRACTOR_Datasource *ds, uint64_t off, size_t size)
int EXTRACTOR_IPC_channel_recv_(struct EXTRACTOR_Channel **channels, unsigned int num_channels, EXTRACTOR_ChannelMessageProcessor proc, void *proc_cls)
void EXTRACTOR_IPC_channel_destroy_(struct EXTRACTOR_Channel *channel)
uint64_t EXTRACTOR_datasource_get_pos_(struct EXTRACTOR_Datasource *ds)
logging API for GNU libextractor
#define LOG(...)
#define LOG_STRERROR(syscall)
#define LOG_STRERROR_FILE(syscall, filename)
void EXTRACTOR_plugin_main_(struct EXTRACTOR_PluginList *plugin, int in, int out)
code to load plugins
#define NULL
Definition: getopt1.c:60
char * getenv()
platform specifics
struct EXTRACTOR_SharedMemory * shm
struct EXTRACTOR_PluginList * plugin
struct EXTRACTOR_Channel * channel
uint16_t reserved2
unsigned char reserved
uint32_t shm_map_size
unsigned char opcode
Definition: extractor_ipc.h:97
uint32_t shm_name_length