openmpi  3.1.6
About: Open MPI is a high performance Message Passing Interface (MPI) library project combining technologies and resources from several other projects (FT-MPI, LA-MPI, LAM/MPI, and PACX-MPI) in order to build the best MPI library available. 3.x series.
  Fossies Dox: openmpi-3.1.6.tar.bz2  ("unofficial" and yet experimental doxygen-generated source code documentation)  

dfs_app.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2012-2013 Los Alamos National Security, LLC.
3  * All rights reserved.
4  * Copyright (c) 2014-2017 Intel, Inc. All rights reserved.
5  * Copyright (c) 2014 Research Organization for Information Science
6  * and Technology (RIST). All rights reserved.
7  * $COPYRIGHT$
8  *
9  * Additional copyrights may follow
10  *
11  * $HEADER$
12  */
13 
14 #include "orte_config.h"
15 
16 #include <sys/types.h>
17 #ifdef HAVE_UNISTD_H
18 #include <unistd.h>
19 #endif /* HAVE_UNISTD_H */
20 #include <string.h>
21 #ifdef HAVE_FCNTL_H
22 #include <fcntl.h>
23 #endif
24 #include <sys/stat.h>
25 
26 #include "opal/util/if.h"
27 #include "opal/util/output.h"
28 #include "opal/util/uri.h"
29 #include "opal/dss/dss.h"
30 #include "opal/mca/pmix/pmix.h"
31 
33 #include "orte/util/name_fns.h"
34 #include "orte/util/proc_info.h"
35 #include "orte/util/show_help.h"
36 #include "orte/util/threads.h"
38 #include "orte/mca/errmgr/errmgr.h"
39 #include "orte/mca/rml/rml.h"
40 
41 #include "orte/mca/dfs/base/base.h"
42 #include "dfs_app.h"
43 
44 /*
45  * Module functions: Global
46  */
47 static int init(void);
48 static int finalize(void);
49 
50 static void dfs_open(char *uri,
52  void *cbdata);
53 static void dfs_close(int fd,
55  void *cbdata);
56 static void dfs_get_file_size(int fd,
58  void *cbdata);
59 static void dfs_seek(int fd, long offset, int whence,
61  void *cbdata);
62 static void dfs_read(int fd, uint8_t *buffer,
63  long length,
65  void *cbdata);
66 static void dfs_post_file_map(opal_buffer_t *bo,
68  void *cbdata);
69 static void dfs_get_file_map(orte_process_name_t *target,
71  void *cbdata);
73  opal_buffer_t *bo,
75  void *cbdata);
78  void *cbdata);
79 
80 
84  init,
85  finalize,
86  dfs_open,
87  dfs_close,
89  dfs_seek,
90  dfs_read,
95 };
96 
98 static int local_fd = 0;
99 static uint64_t req_id = 0;
100 static void recv_dfs(int status, orte_process_name_t* sender,
101  opal_buffer_t* buffer, orte_rml_tag_t tag,
102  void* cbdata);
103 
104 static int init(void)
105 {
111  recv_dfs,
112  NULL);
113  return ORTE_SUCCESS;
114 }
115 
116 static int finalize(void)
117 {
118  opal_list_item_t *item;
119 
121  while (NULL != (item = opal_list_remove_first(&requests))) {
122  OBJ_RELEASE(item);
123  }
125  while (NULL != (item = opal_list_remove_first(&active_files))) {
126  OBJ_RELEASE(item);
127  }
129  return ORTE_SUCCESS;
130 }
131 
132 /* receives take place in an event, so we are free to process
133  * the request list without fear of getting things out-of-order
134  */
135 static void recv_dfs(int status, orte_process_name_t* sender,
136  opal_buffer_t* buffer, orte_rml_tag_t tag,
137  void* cbdata)
138 {
139  orte_dfs_cmd_t cmd;
140  int32_t cnt;
141  orte_dfs_request_t *dfs, *dptr;
142  opal_list_item_t *item;
143  int remote_fd, rc;
144  int64_t i64;
145  uint64_t rid;
146  orte_dfs_tracker_t *trk;
147 
148  /* unpack the command this message is responding to */
149  cnt = 1;
150  if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &cmd, &cnt, ORTE_DFS_CMD_T))) {
151  ORTE_ERROR_LOG(rc);
152  return;
153  }
154 
156  "%s recvd cmd %d from sender %s",
158  ORTE_NAME_PRINT(sender));
159 
160  switch (cmd) {
161  case ORTE_DFS_OPEN_CMD:
162  /* unpack the request id */
163  cnt = 1;
164  if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
165  ORTE_ERROR_LOG(rc);
166  return;
167  }
168  /* unpack the remote fd */
169  cnt = 1;
170  if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &remote_fd, &cnt, OPAL_INT))) {
171  ORTE_ERROR_LOG(rc);
172  return;
173  }
174  /* search our list of requests to find the matching one */
175  dfs = NULL;
176  for (item = opal_list_get_first(&requests);
177  item != opal_list_get_end(&requests);
178  item = opal_list_get_next(item)) {
179  dptr = (orte_dfs_request_t*)item;
180  if (dptr->id == rid) {
181  /* as the request has been fulfilled, remove it */
183  dfs = dptr;
184  break;
185  }
186  }
187  if (NULL == dfs) {
189  "%s recvd open file - no corresponding request found for local fd %d",
192  return;
193  }
194 
195  /* if the remote_fd < 0, then we had an error, so return
196  * the error value to the caller
197  */
198  if (remote_fd < 0) {
200  "%s recvd open file response error file %s [error: %d]",
202  dfs->uri, remote_fd);
203  if (NULL != dfs->open_cbfunc) {
204  dfs->open_cbfunc(remote_fd, dfs->cbdata);
205  }
206  /* release the request */
207  OBJ_RELEASE(dfs);
208  return;
209  }
210  /* otherwise, create a tracker for this file */
212  trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
213  trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
214  trk->host_daemon.jobid = sender->jobid;
215  trk->host_daemon.vpid = sender->vpid;
216  trk->uri = strdup(dfs->uri);
217  /* break the uri down into scheme and filename */
218  trk->scheme = opal_uri_get_scheme(dfs->uri);
219  trk->filename = opal_filename_from_uri(dfs->uri, NULL);
220  /* define the local fd */
221  trk->local_fd = local_fd++;
222  /* record the remote file descriptor */
223  trk->remote_fd = remote_fd;
224  /* add it to our list of active files */
226  /* return the local_fd to the caller for
227  * subsequent operations
228  */
230  "%s recvd open file completed for file %s [local fd: %d remote fd: %d]",
232  dfs->uri, trk->local_fd, remote_fd);
233  if (NULL != dfs->open_cbfunc) {
234  dfs->open_cbfunc(trk->local_fd, dfs->cbdata);
235  }
236  /* release the request */
237  OBJ_RELEASE(dfs);
238  break;
239 
240  case ORTE_DFS_SIZE_CMD:
241  /* unpack the request id for this request */
242  cnt = 1;
243  if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
244  ORTE_ERROR_LOG(rc);
245  return;
246  }
247  /* search our list of requests to find the matching one */
248  dfs = NULL;
249  for (item = opal_list_get_first(&requests);
250  item != opal_list_get_end(&requests);
251  item = opal_list_get_next(item)) {
252  dptr = (orte_dfs_request_t*)item;
253  if (dptr->id == rid) {
254  /* request was fulfilled, so remove it */
256  dfs = dptr;
257  break;
258  }
259  }
260  if (NULL == dfs) {
262  "%s recvd size - no corresponding request found for local fd %d",
265  return;
266  }
267  /* get the size */
268  cnt = 1;
269  if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
270  ORTE_ERROR_LOG(rc);
271  OBJ_RELEASE(dfs);
272  return;
273  }
274  /* pass it back to the original caller */
275  if (NULL != dfs->size_cbfunc) {
276  dfs->size_cbfunc(i64, dfs->cbdata);
277  }
278  /* release the request */
279  OBJ_RELEASE(dfs);
280  break;
281 
282  case ORTE_DFS_SEEK_CMD:
283  /* unpack the request id for this read */
284  cnt = 1;
285  if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
286  ORTE_ERROR_LOG(rc);
287  return;
288  }
289  /* search our list of requests to find the matching one */
290  dfs = NULL;
291  for (item = opal_list_get_first(&requests);
292  item != opal_list_get_end(&requests);
293  item = opal_list_get_next(item)) {
294  dptr = (orte_dfs_request_t*)item;
295  if (dptr->id == rid) {
296  /* request was fulfilled, so remove it */
298  dfs = dptr;
299  break;
300  }
301  }
302  if (NULL == dfs) {
304  "%s recvd seek - no corresponding request found for local fd %d",
307  return;
308  }
309  /* get the returned offset/status */
310  cnt = 1;
311  if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
312  ORTE_ERROR_LOG(rc);
313  OBJ_RELEASE(dfs);
314  return;
315  }
316  /* pass it back to the original caller */
317  if (NULL != dfs->seek_cbfunc) {
318  dfs->seek_cbfunc(i64, dfs->cbdata);
319  }
320  /* release the request */
321  OBJ_RELEASE(dfs);
322  break;
323 
324  case ORTE_DFS_READ_CMD:
325  /* unpack the request id for this read */
326  cnt = 1;
327  if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
328  ORTE_ERROR_LOG(rc);
329  return;
330  }
331  /* search our list of requests to find the matching one */
332  dfs = NULL;
333  for (item = opal_list_get_first(&requests);
334  item != opal_list_get_end(&requests);
335  item = opal_list_get_next(item)) {
336  dptr = (orte_dfs_request_t*)item;
337  if (dptr->id == rid) {
338  /* request was fulfilled, so remove it */
340  dfs = dptr;
341  break;
342  }
343  }
344  if (NULL == dfs) {
346  "%s recvd read - no corresponding request found for local fd %d",
349  return;
350  }
351  /* get the bytes read */
352  cnt = 1;
353  if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
354  ORTE_ERROR_LOG(rc);
355  OBJ_RELEASE(dfs);
356  return;
357  }
358  if (0 < i64) {
359  cnt = i64;
360  if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, dfs->read_buffer, &cnt, OPAL_UINT8))) {
361  ORTE_ERROR_LOG(rc);
362  OBJ_RELEASE(dfs);
363  return;
364  }
365  }
366  /* pass them back to the original caller */
367  if (NULL != dfs->read_cbfunc) {
368  dfs->read_cbfunc(i64, dfs->read_buffer, dfs->cbdata);
369  }
370  /* release the request */
371  OBJ_RELEASE(dfs);
372  break;
373 
374  case ORTE_DFS_POST_CMD:
375  /* unpack the request id for this read */
376  cnt = 1;
377  if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
378  ORTE_ERROR_LOG(rc);
379  return;
380  }
381  /* search our list of requests to find the matching one */
382  dfs = NULL;
383  for (item = opal_list_get_first(&requests);
384  item != opal_list_get_end(&requests);
385  item = opal_list_get_next(item)) {
386  dptr = (orte_dfs_request_t*)item;
387  if (dptr->id == rid) {
388  /* request was fulfilled, so remove it */
390  dfs = dptr;
391  break;
392  }
393  }
394  if (NULL == dfs) {
396  "%s recvd post - no corresponding request found",
399  return;
400  }
401  if (NULL != dfs->post_cbfunc) {
402  dfs->post_cbfunc(dfs->cbdata);
403  }
404  OBJ_RELEASE(dfs);
405  break;
406 
407  case ORTE_DFS_GETFM_CMD:
408  /* unpack the request id for this read */
409  cnt = 1;
410  if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
411  ORTE_ERROR_LOG(rc);
412  return;
413  }
414  /* search our list of requests to find the matching one */
415  dfs = NULL;
416  for (item = opal_list_get_first(&requests);
417  item != opal_list_get_end(&requests);
418  item = opal_list_get_next(item)) {
419  dptr = (orte_dfs_request_t*)item;
420  if (dptr->id == rid) {
421  /* request was fulfilled, so remove it */
423  dfs = dptr;
424  break;
425  }
426  }
427  if (NULL == dfs) {
429  "%s recvd getfm - no corresponding request found",
432  return;
433  }
434  /* return it to caller */
435  if (NULL != dfs->fm_cbfunc) {
436  dfs->fm_cbfunc(buffer, dfs->cbdata);
437  }
438  OBJ_RELEASE(dfs);
439  break;
440 
441  default:
442  opal_output(0, "APP:DFS:RECV WTF");
443  break;
444  }
445 }
446 
448 {
449  char *filename;
450  orte_dfs_tracker_t *trk;
451 
452  /* extract the filename from the uri */
453  if (NULL == (filename = opal_filename_from_uri(dfs->uri, NULL))) {
454  /* something wrong - error was reported, so just get out */
455  if (NULL != dfs->open_cbfunc) {
456  dfs->open_cbfunc(-1, dfs->cbdata);
457  }
458  OBJ_RELEASE(dfs);
459  return;
460  }
462  "%s opening local file %s",
464  filename);
465  /* attempt to open the file */
466  if (0 > (dfs->remote_fd = open(filename, O_RDONLY))) {
468  if (NULL != dfs->open_cbfunc) {
469  dfs->open_cbfunc(dfs->remote_fd, dfs->cbdata);
470  }
471  return;
472  }
473  /* otherwise, create a tracker for this file */
475  trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
476  trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
477  trk->uri = strdup(dfs->uri);
478  /* break the uri down into scheme and filename */
479  trk->scheme = opal_uri_get_scheme(dfs->uri);
480  trk->filename = strdup(filename);
481  /* define the local fd */
482  trk->local_fd = local_fd++;
483  /* record the remote file descriptor */
484  trk->remote_fd = dfs->remote_fd;
485  /* add it to our list of active files */
487  /* the file is locally hosted */
488  trk->host_daemon.jobid = ORTE_PROC_MY_DAEMON->jobid;
489  trk->host_daemon.vpid = ORTE_PROC_MY_DAEMON->vpid;
491  "%s local file %s mapped localfd %d to remotefd %d",
493  filename, trk->local_fd, trk->remote_fd);
494  /* let the caller know */
495  if (NULL != dfs->open_cbfunc) {
496  dfs->open_cbfunc(trk->local_fd, dfs->cbdata);
497  }
498  /* request will be released by the calling routing */
499 }
500 
501 static void process_opens(int fd, short args, void *cbdata)
502 {
504  int rc;
505  opal_buffer_t *buffer;
506  char *scheme, *host, *filename;
507  orte_process_name_t daemon;
508  opal_list_t lt;
509  opal_namelist_t *nm;
510 
512 
513  /* get the scheme to determine if we can process locally or not */
514  if (NULL == (scheme = opal_uri_get_scheme(dfs->uri))) {
516  goto complete;
517  }
518 
519  if (0 == strcmp(scheme, "nfs")) {
521  /* the callback was done in the above function */
522  OBJ_RELEASE(dfs);
523  return;
524  }
525 
526  if (0 != strcmp(scheme, "file")) {
527  /* not yet supported */
528  orte_show_help("orte_dfs_help.txt", "unsupported-filesystem",
529  true, dfs->uri);
530  goto complete;
531  }
532 
533  /* dissect the uri to extract host and filename/path */
534  if (NULL == (filename = opal_filename_from_uri(dfs->uri, &host))) {
535  goto complete;
536  }
537  if (NULL == host) {
538  host = strdup(orte_process_info.nodename);
539  }
540 
541  /* if the host is our own, then treat it as a local file */
542  if (orte_ifislocal(host)) {
544  "%s file %s on local host",
546  filename);
548  /* the callback was done in the above function */
549  OBJ_RELEASE(dfs);
550  return;
551  }
552 
553  /* ident the daemon on that host */
554  daemon.jobid = ORTE_PROC_MY_DAEMON->jobid;
555  /* fetch the daemon for this hostname */
557  "%s looking for daemon on host %s",
560  if (ORTE_SUCCESS != (rc = opal_pmix.resolve_peers(host, daemon.jobid, &lt))) {
561  ORTE_ERROR_LOG(rc);
562  OBJ_DESTRUCT(&lt);
563  goto complete;
564  }
566  daemon.vpid = nm->name.vpid;
567  OPAL_LIST_DESTRUCT(&lt);
568 
570  "%s file %s on host %s daemon %s",
572  filename, host, ORTE_NAME_PRINT(&daemon));
573 
574  /* double-check: if it is our local daemon, then we
575  * treat this as local
576  */
577  if (daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
579  "%s local file %s on same daemon",
581  filename);
583  /* the callback was done in the above function */
584  OBJ_RELEASE(dfs);
585  return;
586  }
587 
588  /* add this request to our local list so we can
589  * match it with the returned response when it comes
590  */
591  dfs->id = req_id++;
592  opal_list_append(&requests, &dfs->super);
593 
594  /* setup a message for the daemon telling
595  * them what file we want to access
596  */
597  buffer = OBJ_NEW(opal_buffer_t);
598  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
599  ORTE_ERROR_LOG(rc);
601  goto complete;
602  }
603  /* pass the request id */
604  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
605  ORTE_ERROR_LOG(rc);
607  goto complete;
608  }
609  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &filename, 1, OPAL_STRING))) {
610  ORTE_ERROR_LOG(rc);
612  goto complete;
613  }
614 
616  "%s sending open file request to %s file %s",
618  ORTE_NAME_PRINT(&daemon),
619  filename);
620  /* send it */
622  &daemon, buffer,
625  ORTE_ERROR_LOG(rc);
626  OBJ_RELEASE(buffer);
628  goto complete;
629  }
630  /* don't release it */
631  return;
632 
633  complete:
634  /* we get here if an error occurred - execute any
635  * pending callback so the proc doesn't hang
636  */
637  if (NULL != dfs->open_cbfunc) {
638  dfs->open_cbfunc(-1, dfs->cbdata);
639  }
640  OBJ_RELEASE(dfs);
641 }
642 
643 
644 /* in order to handle the possible opening/reading of files by
645  * multiple threads, we have to ensure that all operations are
646  * carried out in events - so the "open" cmd simply posts an
647  * event containing the required info, and then returns
648  */
649 static void dfs_open(char *uri,
651  void *cbdata)
652 {
654 
656  "%s opening file %s",
658 
659  /* setup the request */
661  dfs->cmd = ORTE_DFS_OPEN_CMD;
662  dfs->uri = strdup(uri);
663  dfs->open_cbfunc = cbfunc;
664  dfs->cbdata = cbdata;
665 
666  /* post it for processing */
668 }
669 
670 static void process_close(int fd, short args, void *cbdata)
671 {
672  orte_dfs_request_t *close_dfs = (orte_dfs_request_t*)cbdata;
673  orte_dfs_tracker_t *tptr, *trk;
674  opal_list_item_t *item;
675  opal_buffer_t *buffer;
676  int rc;
677 
678  ORTE_ACQUIRE_OBJECT(close_dfs);
679 
681  "%s closing fd %d",
683  close_dfs->local_fd);
684 
685  /* look in our local records for this fd */
686  trk = NULL;
687  for (item = opal_list_get_first(&active_files);
689  item = opal_list_get_next(item)) {
690  tptr = (orte_dfs_tracker_t*)item;
691  if (tptr->local_fd == close_dfs->local_fd) {
692  trk = tptr;
693  break;
694  }
695  }
696  if (NULL == trk) {
698  if (NULL != close_dfs->close_cbfunc) {
699  close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
700  }
701  OBJ_RELEASE(close_dfs);
702  return;
703  }
704 
705  /* if the file is local, close it */
706  if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
707  close(trk->remote_fd);
708  goto complete;
709  }
710 
711  /* setup a message for the daemon telling
712  * them what file to close
713  */
714  buffer = OBJ_NEW(opal_buffer_t);
715  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &close_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
716  ORTE_ERROR_LOG(rc);
717  goto complete;
718  }
719  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
720  ORTE_ERROR_LOG(rc);
721  goto complete;
722  }
723 
725  "%s sending close file request to %s for fd %d",
728  trk->local_fd);
729  /* send it */
731  &trk->host_daemon, buffer,
734  ORTE_ERROR_LOG(rc);
735  OBJ_RELEASE(buffer);
736  goto complete;
737  }
738 
739  complete:
741  OBJ_RELEASE(trk);
742  if (NULL != close_dfs->close_cbfunc) {
743  close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
744  }
745  OBJ_RELEASE(close_dfs);
746 }
747 
748 static void dfs_close(int fd,
750  void *cbdata)
751 {
753 
755  "%s close called on fd %d",
757 
759  dfs->cmd = ORTE_DFS_CLOSE_CMD;
760  dfs->local_fd = fd;
761  dfs->close_cbfunc = cbfunc;
762  dfs->cbdata = cbdata;
763 
764  /* post it for processing */
766 }
767 
768 static void process_sizes(int fd, short args, void *cbdata)
769 {
770  orte_dfs_request_t *size_dfs = (orte_dfs_request_t*)cbdata;
771  orte_dfs_tracker_t *tptr, *trk;
772  opal_list_item_t *item;
773  opal_buffer_t *buffer;
774  int rc;
775  struct stat buf;
776 
777  ORTE_ACQUIRE_OBJECT(size_dfs);
778 
780  "%s processing get_size on fd %d",
782  size_dfs->local_fd);
783 
784  /* look in our local records for this fd */
785  trk = NULL;
786  for (item = opal_list_get_first(&active_files);
788  item = opal_list_get_next(item)) {
789  tptr = (orte_dfs_tracker_t*)item;
790  if (tptr->local_fd == size_dfs->local_fd) {
791  trk = tptr;
792  break;
793  }
794  }
795  if (NULL == trk) {
797  OBJ_RELEASE(size_dfs);
798  return;
799  }
800 
801  /* if the file is local, execute the seek on it - we
802  * stuck the "whence" value in the remote_fd
803  */
804  if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
805  /* stat the file and get its size */
806  if (0 > stat(trk->filename, &buf)) {
807  /* cannot stat file */
809  "%s could not stat %s",
811  trk->filename);
812  if (NULL != size_dfs->size_cbfunc) {
813  size_dfs->size_cbfunc(-1, size_dfs->cbdata);
814  }
815  } else {
816  if (NULL != size_dfs->size_cbfunc) {
817  size_dfs->size_cbfunc(buf.st_size, size_dfs->cbdata);
818  }
819  }
820  goto complete;
821  }
822  /* add this request to our local list so we can
823  * match it with the returned response when it comes
824  */
825  size_dfs->id = req_id++;
826  opal_list_append(&requests, &size_dfs->super);
827 
828  /* setup a message for the daemon telling
829  * them what file we want to access
830  */
831  buffer = OBJ_NEW(opal_buffer_t);
832  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
833  ORTE_ERROR_LOG(rc);
834  opal_list_remove_item(&requests, &size_dfs->super);
835  goto complete;
836  }
837  /* pass the request id */
838  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->id, 1, OPAL_UINT64))) {
839  ORTE_ERROR_LOG(rc);
840  opal_list_remove_item(&requests, &size_dfs->super);
841  goto complete;
842  }
843  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
844  ORTE_ERROR_LOG(rc);
845  opal_list_remove_item(&requests, &size_dfs->super);
846  goto complete;
847  }
848 
850  "%s sending get_size request to %s for fd %d",
853  trk->local_fd);
854  /* send it */
856  &trk->host_daemon, buffer,
859  ORTE_ERROR_LOG(rc);
860  OBJ_RELEASE(buffer);
861  opal_list_remove_item(&requests, &size_dfs->super);
862  if (NULL != size_dfs->size_cbfunc) {
863  size_dfs->size_cbfunc(-1, size_dfs->cbdata);
864  }
865  goto complete;
866  }
867  /* leave the request there */
868  return;
869 
870  complete:
871  OBJ_RELEASE(size_dfs);
872 }
873 
874 static void dfs_get_file_size(int fd,
876  void *cbdata)
877 {
879 
881  "%s get_size called on fd %d",
883 
885  dfs->cmd = ORTE_DFS_SIZE_CMD;
886  dfs->local_fd = fd;
887  dfs->size_cbfunc = cbfunc;
888  dfs->cbdata = cbdata;
889 
890  /* post it for processing */
892 }
893 
894 
895 static void process_seeks(int fd, short args, void *cbdata)
896 {
897  orte_dfs_request_t *seek_dfs = (orte_dfs_request_t*)cbdata;
898  orte_dfs_tracker_t *tptr, *trk;
899  opal_list_item_t *item;
900  opal_buffer_t *buffer;
901  int64_t i64;
902  int rc;
903  struct stat buf;
904 
905  ORTE_ACQUIRE_OBJECT(seek_dfs);
906 
908  "%s processing seek on fd %d",
910  seek_dfs->local_fd);
911 
912  /* look in our local records for this fd */
913  trk = NULL;
914  for (item = opal_list_get_first(&active_files);
916  item = opal_list_get_next(item)) {
917  tptr = (orte_dfs_tracker_t*)item;
918  if (tptr->local_fd == seek_dfs->local_fd) {
919  trk = tptr;
920  break;
921  }
922  }
923  if (NULL == trk) {
925  OBJ_RELEASE(seek_dfs);
926  return;
927  }
928 
929  /* if the file is local, execute the seek on it - we
930  * stuck the "whence" value in the remote_fd
931  */
932  if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
934  "%s local seek on fd %d",
936  seek_dfs->local_fd);
937  /* stat the file and get its size */
938  if (0 > stat(trk->filename, &buf)) {
939  /* cannot stat file */
941  "%s could not stat %s",
943  trk->filename);
944  if (NULL != seek_dfs->seek_cbfunc) {
945  seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
946  }
947  } else if (buf.st_size < seek_dfs->read_length &&
948  SEEK_SET == seek_dfs->remote_fd) {
949  /* seek would take us past EOF */
950  if (NULL != seek_dfs->seek_cbfunc) {
951  seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
952  }
953  } else if (buf.st_size < (off_t)(trk->location + seek_dfs->read_length) &&
954  SEEK_CUR == seek_dfs->remote_fd) {
955  /* seek would take us past EOF */
956  if (NULL != seek_dfs->seek_cbfunc) {
957  seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
958  }
959  } else {
960  lseek(trk->remote_fd, seek_dfs->read_length, seek_dfs->remote_fd);
961  if (SEEK_SET == seek_dfs->remote_fd) {
962  trk->location = seek_dfs->read_length;
963  } else {
964  trk->location += seek_dfs->read_length;
965  }
966  if (NULL != seek_dfs->seek_cbfunc) {
967  seek_dfs->seek_cbfunc(seek_dfs->read_length, seek_dfs->cbdata);
968  }
969  }
970  goto complete;
971  }
972  /* add this request to our local list so we can
973  * match it with the returned response when it comes
974  */
975  seek_dfs->id = req_id++;
976  opal_list_append(&requests, &seek_dfs->super);
977 
978  /* setup a message for the daemon telling
979  * them what file to seek
980  */
981  buffer = OBJ_NEW(opal_buffer_t);
982  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
983  ORTE_ERROR_LOG(rc);
984  goto complete;
985  }
986  /* pass the request id */
987  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->id, 1, OPAL_UINT64))) {
988  ORTE_ERROR_LOG(rc);
989  opal_list_remove_item(&requests, &seek_dfs->super);
990  goto complete;
991  }
992  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
993  ORTE_ERROR_LOG(rc);
994  goto complete;
995  }
996  i64 = (int64_t)seek_dfs->read_length;
997  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
998  ORTE_ERROR_LOG(rc);
999  goto complete;
1000  }
1001  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->remote_fd, 1, OPAL_INT))) {
1002  ORTE_ERROR_LOG(rc);
1003  goto complete;
1004  }
1005 
1007  "%s sending seek file request to %s for fd %d",
1010  trk->local_fd);
1011  /* send it */
1013  &trk->host_daemon, buffer,
1016  ORTE_ERROR_LOG(rc);
1017  OBJ_RELEASE(buffer);
1018  goto complete;
1019  }
1020  /* leave the request */
1021  return;
1022 
1023  complete:
1024  OBJ_RELEASE(seek_dfs);
1025 }
1026 
1027 
1028 static void dfs_seek(int fd, long offset, int whence,
1030  void *cbdata)
1031 {
1033 
1035  "%s seek called on fd %d",
1037 
1039  dfs->cmd = ORTE_DFS_SEEK_CMD;
1040  dfs->local_fd = fd;
1041  dfs->read_length = offset;
1042  dfs->remote_fd = whence;
1043  dfs->seek_cbfunc = cbfunc;
1044  dfs->cbdata = cbdata;
1045 
1046  /* post it for processing */
1048 }
1049 
1050 static void process_reads(int fd, short args, void *cbdata)
1051 {
1052  orte_dfs_request_t *read_dfs = (orte_dfs_request_t*)cbdata;
1053  orte_dfs_tracker_t *tptr, *trk;
1054  long nbytes;
1055  opal_list_item_t *item;
1056  opal_buffer_t *buffer;
1057  int64_t i64;
1058  int rc;
1059 
1060  ORTE_ACQUIRE_OBJECT(read_dfs);
1061 
1062  /* look in our local records for this fd */
1063  trk = NULL;
1064  for (item = opal_list_get_first(&active_files);
1065  item != opal_list_get_end(&active_files);
1066  item = opal_list_get_next(item)) {
1067  tptr = (orte_dfs_tracker_t*)item;
1068  if (tptr->local_fd == read_dfs->local_fd) {
1069  trk = tptr;
1070  break;
1071  }
1072  }
1073  if (NULL == trk) {
1075  OBJ_RELEASE(read_dfs);
1076  return;
1077  }
1078 
1079  /* if the file is local, read the desired bytes */
1080  if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
1081  nbytes = read(trk->remote_fd, read_dfs->read_buffer, read_dfs->read_length);
1082  if (0 < nbytes) {
1083  /* update our location */
1084  trk->location += nbytes;
1085  }
1086  /* pass them back to the caller */
1087  if (NULL != read_dfs->read_cbfunc) {
1088  read_dfs->read_cbfunc(nbytes, read_dfs->read_buffer, read_dfs->cbdata);
1089  }
1090  /* request is complete */
1091  OBJ_RELEASE(read_dfs);
1092  return;
1093  }
1094  /* add this request to our pending list */
1095  read_dfs->id = req_id++;
1096  opal_list_append(&requests, &read_dfs->super);
1097 
1098  /* setup a message for the daemon telling
1099  * them what file to read
1100  */
1101  buffer = OBJ_NEW(opal_buffer_t);
1102  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &read_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1103  ORTE_ERROR_LOG(rc);
1104  goto complete;
1105  }
1106  /* include the request id */
1107  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &read_dfs->id, 1, OPAL_UINT64))) {
1108  ORTE_ERROR_LOG(rc);
1109  goto complete;
1110  }
1111  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
1112  ORTE_ERROR_LOG(rc);
1113  goto complete;
1114  }
1115  i64 = (int64_t)read_dfs->read_length;
1116  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
1117  ORTE_ERROR_LOG(rc);
1118  goto complete;
1119  }
1120 
1122  "%s sending read file request to %s for fd %d",
1125  trk->local_fd);
1126  /* send it */
1128  &trk->host_daemon, buffer,
1131  ORTE_ERROR_LOG(rc);
1132  OBJ_RELEASE(buffer);
1133  }
1134  /* don't release the request */
1135  return;
1136 
1137  complete:
1138  /* don't need to hang on to this request */
1139  opal_list_remove_item(&requests, &read_dfs->super);
1140  OBJ_RELEASE(read_dfs);
1141 }
1142 
1143 static void dfs_read(int fd, uint8_t *buffer,
1144  long length,
1146  void *cbdata)
1147 {
1149 
1151  dfs->cmd = ORTE_DFS_READ_CMD;
1152  dfs->local_fd = fd;
1153  dfs->read_buffer = buffer;
1154  dfs->read_length = length;
1155  dfs->read_cbfunc = cbfunc;
1156  dfs->cbdata = cbdata;
1157 
1158  /* post it for processing */
1160 }
1161 
1162 static void process_posts(int fd, short args, void *cbdata)
1163 {
1165  opal_buffer_t *buffer;
1166  int rc;
1167 
1169 
1170  /* we will get confirmation in our receive function, so
1171  * add this request to our list */
1172  dfs->id = req_id++;
1173  opal_list_append(&requests, &dfs->super);
1174 
1175  /* Send the buffer's contents to our local daemon for storage */
1176  buffer = OBJ_NEW(opal_buffer_t);
1177  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1178  ORTE_ERROR_LOG(rc);
1179  goto error;
1180  }
1181  /* include the request id */
1182  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
1183  ORTE_ERROR_LOG(rc);
1184  goto error;
1185  }
1186  /* add my name */
1187  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
1188  ORTE_ERROR_LOG(rc);
1189  goto error;
1190  }
1191  /* pack the payload */
1192  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->bptr, 1, OPAL_BUFFER))) {
1193  ORTE_ERROR_LOG(rc);
1194  goto error;
1195  }
1196  /* send it */
1198  ORTE_PROC_MY_DAEMON, buffer,
1201  ORTE_ERROR_LOG(rc);
1202  goto error;
1203  }
1204  return;
1205 
1206  error:
1207  OBJ_RELEASE(buffer);
1208  opal_list_remove_item(&requests, &dfs->super);
1209  if (NULL != dfs->post_cbfunc) {
1210  dfs->post_cbfunc(dfs->cbdata);
1211  }
1212  OBJ_RELEASE(dfs);
1213 }
1214 
1217  void *cbdata)
1218 {
1220 
1222  dfs->cmd = ORTE_DFS_POST_CMD;
1223  dfs->bptr = bo;
1224  dfs->post_cbfunc = cbfunc;
1225  dfs->cbdata = cbdata;
1226 
1227  /* post it for processing */
1229 }
1230 
1231 static void process_getfm(int fd, short args, void *cbdata)
1232 {
1234  opal_buffer_t *buffer;
1235  int rc;
1236 
1238 
1239  /* we will get confirmation in our receive function, so
1240  * add this request to our list */
1241  dfs->id = req_id++;
1242  opal_list_append(&requests, &dfs->super);
1243 
1244  /* Send the request to our local daemon */
1245  buffer = OBJ_NEW(opal_buffer_t);
1246  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
1247  ORTE_ERROR_LOG(rc);
1248  goto error;
1249  }
1250  /* include the request id */
1251  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
1252  ORTE_ERROR_LOG(rc);
1253  goto error;
1254  }
1255  /* and the target */
1256  if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->target, 1, ORTE_NAME))) {
1257  ORTE_ERROR_LOG(rc);
1258  goto error;
1259  }
1260  /* send it */
1262  ORTE_PROC_MY_DAEMON, buffer,
1265  ORTE_ERROR_LOG(rc);
1266  goto error;
1267  }
1268  return;
1269 
1270  error:
1271  OBJ_RELEASE(buffer);
1272  opal_list_remove_item(&requests, &dfs->super);
1273  if (NULL != dfs->fm_cbfunc) {
1274  dfs->fm_cbfunc(NULL, dfs->cbdata);
1275  }
1276  OBJ_RELEASE(dfs);
1277 }
1278 
1281  void *cbdata)
1282 {
1284 
1286  dfs->cmd = ORTE_DFS_GETFM_CMD;
1287  dfs->target.jobid = target->jobid;
1288  dfs->target.vpid = target->vpid;
1289  dfs->fm_cbfunc = cbfunc;
1290  dfs->cbdata = cbdata;
1291 
1292  /* post it for processing */
1294 }
1295 
1297  opal_buffer_t *bo,
1299  void *cbdata)
1300 {
1301  /* apps don't store file maps */
1302  if (NULL != cbfunc) {
1303  cbfunc(cbdata);
1304  }
1305 }
1306 
1309  void *cbdata)
1310 {
1311  /* apps don't store file maps */
1312  if (NULL != cbfunc) {
1313  cbfunc(cbdata);
1314  }
1315 }
orte_dfs_fm_callback_fn_t
void(* orte_dfs_fm_callback_fn_t)(opal_buffer_t *fmaps, void *cbdata)
Definition: dfs_types.h:68
ORTE_DFS_GETFM_CMD
#define ORTE_DFS_GETFM_CMD
Definition: dfs_types.h:33
finalize
static int finalize(void)
Definition: dfs_app.c:116
orte_dfs_size_callback_fn_t
void(* orte_dfs_size_callback_fn_t)(long size, void *cbdata)
Definition: dfs_types.h:58
OPAL_BUFFER
#define OPAL_BUFFER
pack the remaining contents of a buffer as an object
Definition: dss_types.h:98
orte_mgmt_conduit
int orte_mgmt_conduit
Definition: orte_globals.c:77
orte_dfs_purge_callback_fn_t
void(* orte_dfs_purge_callback_fn_t)(void *cbdata)
Definition: dfs_types.h:72
name_fns.h
opal_buffer_t
Structure for holding a buffer to be used with the RML or OOB subsystems.
Definition: dss_types.h:267
uri.h
OBJ_CONSTRUCT
#define OBJ_CONSTRUCT(object, type)
Construct (initialize) objects that are not dynamically allocated.
Definition: opal_object.h:356
ORTE_NAME_PRINT
#define ORTE_NAME_PRINT(n)
Definition: name_fns.h:54
orte_dfs_base_module_1_0_0_t
Definition: dfs.h:144
opal_list_get_next
#define opal_list_get_next(item)
Get the next item in a list.
Definition: opal_list.h:132
opal_dss_t::pack
opal_dss_pack_fn_t pack
Definition: dss.h:492
ORTE_RML_TAG_DFS_CMD
#define ORTE_RML_TAG_DFS_CMD
Definition: rml_types.h:142
orte_dfs_tracker_t::remote_fd
int remote_fd
Definition: base.h:48
show_help.h
orte_dfs_request_t::read_cbfunc
orte_dfs_read_callback_fn_t read_cbfunc
Definition: base.h:71
ORTE_ERR_FILE_OPEN_FAILURE
@ ORTE_ERR_FILE_OPEN_FAILURE
Definition: constants.h:59
base.h
proc_info.h
orte_dfs_post_callback_fn_t
void(* orte_dfs_post_callback_fn_t)(void *cbdata)
Definition: dfs_types.h:66
process_seeks
static void process_seeks(int fd, short args, void *cbdata)
Definition: dfs_app.c:895
orte_globals.h
ORTE_ERROR_LOG
#define ORTE_ERROR_LOG(n)
Definition: errmgr.h:75
ORTE_NAME_WILDCARD
#define ORTE_NAME_WILDCARD
Definition: orte_globals.h:90
process_opens
static void process_opens(int fd, short args, void *cbdata)
Definition: dfs_app.c:501
ORTE_DFS_SIZE_CMD
#define ORTE_DFS_SIZE_CMD
Definition: dfs_types.h:29
opal_list_get_first
static opal_list_item_t * opal_list_get_first(opal_list_t *list)
Return the first item on the list (does not remove it).
Definition: opal_list.h:324
process_sizes
static void process_sizes(int fd, short args, void *cbdata)
Definition: dfs_app.c:768
dfs_purge_file_maps
static void dfs_purge_file_maps(orte_jobid_t jobid, orte_dfs_purge_callback_fn_t cbfunc, void *cbdata)
Definition: dfs_app.c:1307
dfs_close
static void dfs_close(int fd, orte_dfs_close_callback_fn_t cbfunc, void *cbdata)
Definition: dfs_app.c:748
orte_dfs_request_t
Definition: base.h:54
opal_pmix_base_module_t::resolve_peers
opal_pmix_base_module_resolve_peers_fn_t resolve_peers
Definition: pmix.h:900
dfs_seek
static void dfs_seek(int fd, long offset, int whence, orte_dfs_seek_callback_fn_t cbfunc, void *cbdata)
Definition: dfs_app.c:1028
opal_list_remove_first
static opal_list_item_t * opal_list_remove_first(opal_list_t *list)
Remove the first item from the list and return it.
Definition: opal_list.h:651
orte_dfs_load_callback_fn_t
void(* orte_dfs_load_callback_fn_t)(void *cbdata)
Definition: dfs_types.h:70
threads.h
dfs_app.h
orte_dfs_app_module
orte_dfs_base_module_t orte_dfs_app_module
APP module.
Definition: dfs_app.c:83
OBJ_DESTRUCT
#define OBJ_DESTRUCT(object)
Destruct (finalize) an object that is not dynamically allocated.
Definition: opal_object.h:388
cbfunc
static void cbfunc(int status, void *cbdata)
Definition: show_help.c:619
opal_namelist_t
Definition: proc.h:101
opal_process_name_t::jobid
opal_jobid_t jobid
Definition: dss_types.h:47
errmgr.h
opal_uri_get_scheme
char * opal_uri_get_scheme(const char *uri)
Parse a uri to retrieve the scheme.
Definition: uri.c:30
ORTE_DFS_POST_CMD
#define ORTE_DFS_POST_CMD
Definition: dfs_types.h:32
orte_event_base
opal_event_base_t * orte_event_base
Definition: orte_init.c:115
jobid
static uint32_t jobid(void)
Definition: coll_hcoll_rte.c:330
opal_output_verbose
void opal_output_verbose(int level, int output_id, const char *format,...)
Definition: output.c:381
orte_dfs_request_t::read_buffer
uint8_t * read_buffer
Definition: base.h:63
opal_pmix
opal_pmix_base_module_t opal_pmix
Definition: pmix_base_frame.c:35
orte_rml_base_API_t::recv_buffer_nb
orte_rml_API_recv_buffer_nb_fn_t recv_buffer_nb
Receive non-blocking buffer message.
Definition: rml.h:454
process_close
static void process_close(int fd, short args, void *cbdata)
Definition: dfs_app.c:670
OPAL_UINT8
#define OPAL_UINT8
an 8-bit unsigned integer
Definition: dss_types.h:80
orte_dfs_tracker_t::requestor
orte_process_name_t requestor
Definition: base.h:42
orte_jobid_t
opal_jobid_t orte_jobid_t
Definition: types.h:68
OPAL_INT64
#define OPAL_INT64
a 64-bit integer
Definition: dss_types.h:77
orte_config.h
ORTE_DFS_CMD_T
#define ORTE_DFS_CMD_T
Definition: dfs_types.h:25
orte_dfs_request_t::read_length
long read_length
Definition: base.h:64
if.h
ORTE_NAME
#define ORTE_NAME
an orte_process_name_t
Definition: types.h:153
output.h
dfs_read
static void dfs_read(int fd, uint8_t *buffer, long length, orte_dfs_read_callback_fn_t cbfunc, void *cbdata)
Definition: dfs_app.c:1143
opal_list_get_end
static opal_list_item_t * opal_list_get_end(opal_list_t *list)
Return the end of the list; an invalid list entry suitable for comparison only.
Definition: opal_list.h:403
req_id
static uint64_t req_id
Definition: dfs_app.c:99
rml.h
ORTE_DFS_SEEK_CMD
#define ORTE_DFS_SEEK_CMD
Definition: dfs_types.h:30
orte_dfs_tracker_t::uri
char * uri
Definition: base.h:44
orte_process_info
orte_proc_info_t orte_process_info
Global process info descriptor.
Definition: proc_info.c:55
requests
static opal_list_t requests
Definition: dfs_app.c:97
init
static int init(void)
Definition: dfs_app.c:104
process_posts
static void process_posts(int fd, short args, void *cbdata)
Definition: dfs_app.c:1162
dfs_open
static void dfs_open(char *uri, orte_dfs_open_callback_fn_t cbfunc, void *cbdata)
Definition: dfs_app.c:649
dfs_get_file_map
static void dfs_get_file_map(orte_process_name_t *target, orte_dfs_fm_callback_fn_t cbfunc, void *cbdata)
Definition: dfs_app.c:1279
orte_dfs_request_t::id
uint64_t id
Definition: base.h:57
opal_dss
OPAL_DECLSPEC opal_dss_t opal_dss
Definition: dss_open_close.c:53
open_local_file
static void open_local_file(orte_dfs_request_t *dfs)
Definition: dfs_app.c:447
OPAL_INT
#define OPAL_INT
generic integer
Definition: dss_types.h:73
orte_dfs_cmd_t
BEGIN_C_DECLS typedef uint8_t orte_dfs_cmd_t
Definition: dfs_types.h:24
ORTE_PROC_MY_DAEMON
#define ORTE_PROC_MY_DAEMON
Definition: orte_globals.h:104
orte_rml_base_API_t::send_buffer_nb
orte_rml_API_send_buffer_nb_fn_t send_buffer_nb
Send non-blocking buffer message.
Definition: rml.h:448
active_files
static opal_list_t active_files
Definition: dfs_app.c:97
local_fd
static int local_fd
Definition: dfs_app.c:98
ORTE_ERR_BAD_PARAM
@ ORTE_ERR_BAD_PARAM
Definition: constants.h:43
ORTE_DFS_OPEN_CMD
#define ORTE_DFS_OPEN_CMD
Definition: dfs_types.h:27
opal_namelist_t::name
opal_process_name_t name
Definition: proc.h:103
orte_dfs_request_t::super
opal_list_item_t super
Definition: base.h:55
opal_process_name_t
Definition: dss_types.h:46
OBJ_NEW
#define OBJ_NEW(type)
Definition: opal_object.h:272
orte_dfs_request_t::seek_cbfunc
orte_dfs_seek_callback_fn_t seek_cbfunc
Definition: base.h:70
opal_dss_t::unpack
opal_dss_unpack_fn_t unpack
Definition: dss.h:493
opal_list_append
#define opal_list_append(l, i)
Append an item to the end of the list.
Definition: opal_list.h:539
orte_dfs_request_t::local_fd
int local_fd
Definition: base.h:61
error_strings.h
opal_list_item_t
Definition: opal_list.h:103
ORTE_THREADSHIFT
#define ORTE_THREADSHIFT(x, eb, f, p)
Definition: threads.h:37
orte_dfs_tracker_t::local_fd
int local_fd
Definition: base.h:47
OPAL_LIST_DESTRUCT
#define OPAL_LIST_DESTRUCT(list)
Cleanly destruct a list.
Definition: opal_list.h:173
opal_output
void opal_output(int output_id, const char *format,...)
Definition: output.c:367
opal_filename_from_uri
char * opal_filename_from_uri(const char *uri, char **hostname)
Extract the filename (and hostname) from a uri.
Definition: uri.c:104
orte_dfs_request_t::cmd
orte_dfs_cmd_t cmd
Definition: base.h:58
orte_rml_tag_t
uint32_t orte_rml_tag_t
Message matching tag.
Definition: rml_types.h:197
OPAL_STRING
#define OPAL_STRING
a NULL terminated string
Definition: dss_types.h:69
OPAL_UINT64
#define OPAL_UINT64
a 64-bit unsigned integer
Definition: dss_types.h:83
OPAL_SUCCESS
@ OPAL_SUCCESS
Definition: constants.h:29
ORTE_ERR_NOT_FOUND
@ ORTE_ERR_NOT_FOUND
Definition: constants.h:51
orte_dfs_request_t::cbdata
void * cbdata
Definition: base.h:76
orte_dfs_request_t::remote_fd
int remote_fd
Definition: base.h:62
orte_rml_send_callback
void orte_rml_send_callback(int status, orte_process_name_t *peer, opal_buffer_t *buffer, orte_rml_tag_t tag, void *cbdata)
Definition: rml_base_frame.c:221
orte_dfs_tracker_t::host_daemon
orte_process_name_t host_daemon
Definition: base.h:43
mca_base_framework_t::framework_output
int framework_output
Opal output for this framework (or -1)
Definition: mca_base_framework.h:154
orte_proc_info_t::nodename
char * nodename
string name for this node
Definition: proc_info.h:98
orte_dfs_tracker_t::filename
char * filename
Definition: base.h:46
ORTE_SUCCESS
@ ORTE_SUCCESS
Definition: constants.h:37
pmix.h
ORTE_RML_TAG_DFS_DATA
#define ORTE_RML_TAG_DFS_DATA
Definition: rml_types.h:143
opal_process_name_t::vpid
opal_vpid_t vpid
Definition: dss_types.h:48
dfs
void dfs(int, int, int, double *, double *, int, int)
Definition: tm_bucket.c:225
orte_dfs_request_t::size_cbfunc
orte_dfs_size_callback_fn_t size_cbfunc
Definition: base.h:69
orte_dfs_tracker_t::super
opal_list_item_t super
Definition: base.h:41
ORTE_PROC_MY_NAME
#define ORTE_PROC_MY_NAME
instantiated in orte/runtime/orte_init.c
Definition: orte_globals.h:95
orte_dfs_seek_callback_fn_t
void(* orte_dfs_seek_callback_fn_t)(long offset, void *cbdata)
Definition: dfs_types.h:60
dfs_load_file_maps
static void dfs_load_file_maps(orte_jobid_t jobid, opal_buffer_t *bo, orte_dfs_load_callback_fn_t cbfunc, void *cbdata)
Definition: dfs_app.c:1296
ORTE_ACQUIRE_OBJECT
#define ORTE_ACQUIRE_OBJECT(o)
Definition: threads.h:28
opal_list_t
Definition: opal_list.h:152
ORTE_RML_PERSISTENT
#define ORTE_RML_PERSISTENT
Definition: rml_types.h:49
dfs_get_file_size
static void dfs_get_file_size(int fd, orte_dfs_size_callback_fn_t cbfunc, void *cbdata)
Definition: dfs_app.c:874
orte_dfs_base_framework
mca_base_framework_t orte_dfs_base_framework
dfs_post_file_map
static void dfs_post_file_map(opal_buffer_t *bo, orte_dfs_post_callback_fn_t cbfunc, void *cbdata)
Definition: dfs_app.c:1215
orte_dfs_close_callback_fn_t
void(* orte_dfs_close_callback_fn_t)(int fd, void *cbdata)
Definition: dfs_types.h:56
orte_dfs_tracker_t
Definition: base.h:40
dss.h
orte_dfs_tracker_t::location
size_t location
Definition: base.h:49
orte_dfs_request_t::close_cbfunc
orte_dfs_close_callback_fn_t close_cbfunc
Definition: base.h:68
process_getfm
static void process_getfm(int fd, short args, void *cbdata)
Definition: dfs_app.c:1231
process_reads
static void process_reads(int fd, short args, void *cbdata)
Definition: dfs_app.c:1050
ORTE_SYS_PRI
#define ORTE_SYS_PRI
Definition: orte_globals.h:131
opal_list_remove_item
static opal_list_item_t * opal_list_remove_item(opal_list_t *list, opal_list_item_t *item)
Remove an item from a list.
Definition: opal_list.h:477
orte_show_help
int orte_show_help(const char *filename, const char *topic, bool want_error_header,...)
Show help.
Definition: show_help.c:593
orte_dfs_tracker_t::scheme
char * scheme
Definition: base.h:45
orte_dfs_open_callback_fn_t
void(* orte_dfs_open_callback_fn_t)(int fd, void *cbdata)
Definition: dfs_types.h:54
recv_dfs
static void recv_dfs(int status, orte_process_name_t *sender, opal_buffer_t *buffer, orte_rml_tag_t tag, void *cbdata)
Definition: dfs_app.c:135
filename
static char * filename
Definition: sync.c:26
ORTE_DFS_CLOSE_CMD
#define ORTE_DFS_CLOSE_CMD
Definition: dfs_types.h:28
orte_ifislocal
bool orte_ifislocal(const char *hostname)
Definition: proc_info.c:336
orte_rml_base_API_t::recv_cancel
orte_rml_API_recv_cancel_fn_t recv_cancel
Cancel posted non-blocking receive.
Definition: rml.h:457
ORTE_DFS_READ_CMD
#define ORTE_DFS_READ_CMD
Definition: dfs_types.h:31
OBJ_RELEASE
#define OBJ_RELEASE(object)
Release an object (by decrementing its reference count).
Definition: opal_object.h:338
NULL
#define NULL
Copyright (C) 2000-2004 by Etnus, LLC.
Definition: ompi_msgq_dll.c:136
orte_rml
orte_rml_base_API_t orte_rml
Interface for RML communication.
Definition: rml_base_frame.c:43
orte_dfs_read_callback_fn_t
void(* orte_dfs_read_callback_fn_t)(long status, uint8_t *buffer, void *cbdata)
Definition: dfs_types.h:62