"Fossies" - the Fresh Open Source Software Archive

Member "fd-8.1.1/src/walk.rs" (25 May 2020, 18308 Bytes) of package /linux/privat/fd-8.1.1.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Rust source code syntax highlighting (style: standard), with prefixed line numbers and a code-folding option. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code-changes report for "walk.rs": 8.0.0_vs_8.1.0.

    1 use std::borrow::Cow;
    2 use std::ffi::OsStr;
    3 use std::fs::{FileType, Metadata};
    4 use std::io;
    5 use std::path::{Path, PathBuf};
    6 use std::process;
    7 use std::sync::atomic::{AtomicBool, Ordering};
    8 use std::sync::mpsc::{channel, Receiver, Sender};
    9 use std::sync::{Arc, Mutex};
   10 use std::thread;
   11 use std::time;
   12 
   13 use anyhow::{anyhow, Result};
   14 use ignore::overrides::OverrideBuilder;
   15 use ignore::{self, WalkBuilder};
   16 use regex::bytes::Regex;
   17 
   18 use crate::error::print_error;
   19 use crate::exec;
   20 use crate::exit_codes::{merge_exitcodes, ExitCode};
   21 use crate::filesystem;
   22 use crate::options::Options;
   23 use crate::output;
   24 
   25 /// The receiver thread can either be buffering results or directly streaming to the console.
   26 enum ReceiverMode {
   27     /// Receiver is still buffering in order to sort the results, if the search finishes fast
   28     /// enough.
   29     Buffering,
   30 
   31     /// Receiver is directly printing results to the output.
   32     Streaming,
   33 }
   34 
   35 /// The Worker threads can result in a valid entry having PathBuf or an error.
   36 pub enum WorkerResult {
   37     Entry(PathBuf),
   38     Error(ignore::Error),
   39 }
   40 
   41 /// Maximum size of the output buffer before flushing results to the console
   42 pub const MAX_BUFFER_LENGTH: usize = 1000;
   43 
   44 /// Recursively scan the given search path for files / pathnames matching the pattern.
   45 ///
   46 /// If the `--exec` argument was supplied, this will create a thread pool for executing
   47 /// jobs in parallel from a given command line and the discovered paths. Otherwise, each
   48 /// path will simply be written to standard output.
   49 pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<Options>) -> Result<ExitCode> {
   50     let mut path_iter = path_vec.iter();
   51     let first_path_buf = path_iter
   52         .next()
   53         .expect("Error: Path vector can not be empty");
   54     let (tx, rx) = channel();
   55 
   56     let mut override_builder = OverrideBuilder::new(first_path_buf.as_path());
   57 
   58     for pattern in &config.exclude_patterns {
   59         override_builder
   60             .add(pattern)
   61             .map_err(|e| anyhow!("Malformed exclude pattern: {}", e))?;
   62     }
   63     let overrides = override_builder
   64         .build()
   65         .map_err(|_| anyhow!("Mismatch in exclude patterns"))?;
   66 
   67     let mut walker = WalkBuilder::new(first_path_buf.as_path());
   68     walker
   69         .hidden(config.ignore_hidden)
   70         .ignore(config.read_fdignore)
   71         .parents(config.read_fdignore || config.read_vcsignore)
   72         .git_ignore(config.read_vcsignore)
   73         .git_global(config.read_vcsignore)
   74         .git_exclude(config.read_vcsignore)
   75         .overrides(overrides)
   76         .follow_links(config.follow_links)
   77         // No need to check for supported platforms, option is unavailable on unsupported ones
   78         .same_file_system(config.one_file_system)
   79         .max_depth(config.max_depth);
   80 
   81     if config.read_fdignore {
   82         walker.add_custom_ignore_filename(".fdignore");
   83     }
   84 
   85     if config.read_global_ignore {
   86         #[cfg(target_os = "macos")]
   87         let config_dir_op = std::env::var_os("XDG_CONFIG_HOME")
   88             .map(PathBuf::from)
   89             .filter(|p| p.is_absolute())
   90             .or_else(|| dirs::home_dir().map(|d| d.join(".config")));
   91 
   92         #[cfg(not(target_os = "macos"))]
   93         let config_dir_op = dirs::config_dir();
   94 
   95         if let Some(global_ignore_file) = config_dir_op
   96             .map(|p| p.join("fd").join("ignore"))
   97             .filter(|p| p.is_file())
   98         {
   99             let result = walker.add_ignore(global_ignore_file);
  100             match result {
  101                 Some(ignore::Error::Partial(_)) => (),
  102                 Some(err) => {
  103                     print_error(format!(
  104                         "Malformed pattern in global ignore file. {}.",
  105                         err.to_string()
  106                     ));
  107                 }
  108                 None => (),
  109             }
  110         }
  111     }
  112 
  113     for ignore_file in &config.ignore_files {
  114         let result = walker.add_ignore(ignore_file);
  115         match result {
  116             Some(ignore::Error::Partial(_)) => (),
  117             Some(err) => {
  118                 print_error(format!(
  119                     "Malformed pattern in custom ignore file. {}.",
  120                     err.to_string()
  121                 ));
  122             }
  123             None => (),
  124         }
  125     }
  126 
  127     for path_entry in path_iter {
  128         walker.add(path_entry.as_path());
  129     }
  130 
  131     let parallel_walker = walker.threads(config.threads).build_parallel();
  132 
  133     let wants_to_quit = Arc::new(AtomicBool::new(false));
  134     if config.ls_colors.is_some() && config.command.is_none() {
  135         let wq = Arc::clone(&wants_to_quit);
  136         ctrlc::set_handler(move || {
  137             if wq.load(Ordering::Relaxed) {
  138                 // Ctrl-C has been pressed twice, exit NOW
  139                 process::exit(ExitCode::KilledBySigint.into());
  140             } else {
  141                 wq.store(true, Ordering::Relaxed);
  142             }
  143         })
  144         .unwrap();
  145     }
  146 
  147     // Spawn the thread that receives all results through the channel.
  148     let receiver_thread = spawn_receiver(&config, &wants_to_quit, rx);
  149 
  150     // Spawn the sender threads.
  151     spawn_senders(&config, &wants_to_quit, pattern, parallel_walker, tx);
  152 
  153     // Wait for the receiver thread to print out all results.
  154     let exit_code = receiver_thread.join().unwrap();
  155 
  156     if wants_to_quit.load(Ordering::Relaxed) {
  157         Ok(ExitCode::KilledBySigint)
  158     } else {
  159         Ok(exit_code)
  160     }
  161 }
  162 
  163 fn spawn_receiver(
  164     config: &Arc<Options>,
  165     wants_to_quit: &Arc<AtomicBool>,
  166     rx: Receiver<WorkerResult>,
  167 ) -> thread::JoinHandle<ExitCode> {
  168     let config = Arc::clone(config);
  169     let wants_to_quit = Arc::clone(wants_to_quit);
  170 
  171     let show_filesystem_errors = config.show_filesystem_errors;
  172     let threads = config.threads;
  173 
  174     thread::spawn(move || {
  175         // This will be set to `Some` if the `--exec` argument was supplied.
  176         if let Some(ref cmd) = config.command {
  177             if cmd.in_batch_mode() {
  178                 exec::batch(rx, cmd, show_filesystem_errors)
  179             } else {
  180                 let shared_rx = Arc::new(Mutex::new(rx));
  181 
  182                 let out_perm = Arc::new(Mutex::new(()));
  183 
  184                 // Each spawned job will store it's thread handle in here.
  185                 let mut handles = Vec::with_capacity(threads);
  186                 for _ in 0..threads {
  187                     let rx = Arc::clone(&shared_rx);
  188                     let cmd = Arc::clone(cmd);
  189                     let out_perm = Arc::clone(&out_perm);
  190 
  191                     // Spawn a job thread that will listen for and execute inputs.
  192                     let handle =
  193                         thread::spawn(move || exec::job(rx, cmd, out_perm, show_filesystem_errors));
  194 
  195                     // Push the handle of the spawned thread into the vector for later joining.
  196                     handles.push(handle);
  197                 }
  198 
  199                 // Wait for all threads to exit before exiting the program.
  200                 let mut results: Vec<ExitCode> = Vec::new();
  201                 for h in handles {
  202                     results.push(h.join().unwrap());
  203                 }
  204 
  205                 merge_exitcodes(&results)
  206             }
  207         } else {
  208             let start = time::Instant::now();
  209 
  210             let mut buffer = vec![];
  211 
  212             // Start in buffering mode
  213             let mut mode = ReceiverMode::Buffering;
  214 
  215             // Maximum time to wait before we start streaming to the console.
  216             let max_buffer_time = config
  217                 .max_buffer_time
  218                 .unwrap_or_else(|| time::Duration::from_millis(100));
  219 
  220             let stdout = io::stdout();
  221             let mut stdout = stdout.lock();
  222 
  223             let mut num_results = 0;
  224 
  225             for worker_result in rx {
  226                 match worker_result {
  227                     WorkerResult::Entry(value) => {
  228                         match mode {
  229                             ReceiverMode::Buffering => {
  230                                 buffer.push(value);
  231 
  232                                 // Have we reached the maximum buffer size or maximum buffering time?
  233                                 if buffer.len() > MAX_BUFFER_LENGTH
  234                                     || time::Instant::now() - start > max_buffer_time
  235                                 {
  236                                     // Flush the buffer
  237                                     for v in &buffer {
  238                                         output::print_entry(
  239                                             &mut stdout,
  240                                             v,
  241                                             &config,
  242                                             &wants_to_quit,
  243                                         );
  244                                     }
  245                                     buffer.clear();
  246 
  247                                     // Start streaming
  248                                     mode = ReceiverMode::Streaming;
  249                                 }
  250                             }
  251                             ReceiverMode::Streaming => {
  252                                 output::print_entry(&mut stdout, &value, &config, &wants_to_quit);
  253                             }
  254                         }
  255 
  256                         num_results += 1;
  257                     }
  258                     WorkerResult::Error(err) => {
  259                         if show_filesystem_errors {
  260                             print_error(err.to_string());
  261                         }
  262                     }
  263                 }
  264 
  265                 if let Some(max_results) = config.max_results {
  266                     if num_results >= max_results {
  267                         break;
  268                     }
  269                 }
  270             }
  271 
  272             // If we have finished fast enough (faster than max_buffer_time), we haven't streamed
  273             // anything to the console, yet. In this case, sort the results and print them:
  274             if !buffer.is_empty() {
  275                 buffer.sort();
  276                 for value in buffer {
  277                     output::print_entry(&mut stdout, &value, &config, &wants_to_quit);
  278                 }
  279             }
  280 
  281             ExitCode::Success
  282         }
  283     })
  284 }
  285 
  286 pub enum DirEntry {
  287     Normal(ignore::DirEntry),
  288     BrokenSymlink(PathBuf),
  289 }
  290 
  291 impl DirEntry {
  292     pub fn path(&self) -> &Path {
  293         match self {
  294             DirEntry::Normal(e) => e.path(),
  295             DirEntry::BrokenSymlink(pathbuf) => pathbuf.as_path(),
  296         }
  297     }
  298 
  299     pub fn file_type(&self) -> Option<FileType> {
  300         match self {
  301             DirEntry::Normal(e) => e.file_type(),
  302             DirEntry::BrokenSymlink(pathbuf) => {
  303                 pathbuf.symlink_metadata().map(|m| m.file_type()).ok()
  304             }
  305         }
  306     }
  307 
  308     pub fn metadata(&self) -> Option<Metadata> {
  309         match self {
  310             DirEntry::Normal(e) => e.metadata().ok(),
  311             DirEntry::BrokenSymlink(_) => None,
  312         }
  313     }
  314 
  315     pub fn depth(&self) -> Option<usize> {
  316         match self {
  317             DirEntry::Normal(e) => Some(e.depth()),
  318             DirEntry::BrokenSymlink(_) => None,
  319         }
  320     }
  321 }
  322 
  323 fn spawn_senders(
  324     config: &Arc<Options>,
  325     wants_to_quit: &Arc<AtomicBool>,
  326     pattern: Arc<Regex>,
  327     parallel_walker: ignore::WalkParallel,
  328     tx: Sender<WorkerResult>,
  329 ) {
  330     parallel_walker.run(|| {
  331         let config = Arc::clone(config);
  332         let pattern = Arc::clone(&pattern);
  333         let tx_thread = tx.clone();
  334         let wants_to_quit = Arc::clone(wants_to_quit);
  335 
  336         Box::new(move |entry_o| {
  337             if wants_to_quit.load(Ordering::Relaxed) {
  338                 return ignore::WalkState::Quit;
  339             }
  340 
  341             let entry = match entry_o {
  342                 Ok(ref e) if e.depth() == 0 => {
  343                     // Skip the root directory entry.
  344                     return ignore::WalkState::Continue;
  345                 }
  346                 Ok(e) => DirEntry::Normal(e),
  347                 Err(ignore::Error::WithPath {
  348                     path,
  349                     err: inner_err,
  350                 }) => match inner_err.as_ref() {
  351                     ignore::Error::Io(io_error)
  352                         if io_error.kind() == io::ErrorKind::NotFound
  353                             && path
  354                                 .symlink_metadata()
  355                                 .ok()
  356                                 .map_or(false, |m| m.file_type().is_symlink()) =>
  357                     {
  358                         DirEntry::BrokenSymlink(path)
  359                     }
  360                     _ => {
  361                         tx_thread
  362                             .send(WorkerResult::Error(ignore::Error::WithPath {
  363                                 path,
  364                                 err: inner_err,
  365                             }))
  366                             .unwrap();
  367                         return ignore::WalkState::Continue;
  368                     }
  369                 },
  370                 Err(err) => {
  371                     tx_thread.send(WorkerResult::Error(err)).unwrap();
  372                     return ignore::WalkState::Continue;
  373                 }
  374             };
  375 
  376             if let Some(min_depth) = config.min_depth {
  377                 if entry.depth().map_or(true, |d| d < min_depth) {
  378                     return ignore::WalkState::Continue;
  379                 }
  380             }
  381 
  382             // Check the name first, since it doesn't require metadata
  383             let entry_path = entry.path();
  384 
  385             let search_str: Cow<OsStr> = if config.search_full_path {
  386                 let path_abs_buf = filesystem::path_absolute_form(entry_path)
  387                     .expect("Retrieving absolute path succeeds");
  388                 Cow::Owned(path_abs_buf.as_os_str().to_os_string())
  389             } else {
  390                 match entry_path.file_name() {
  391                     Some(filename) => Cow::Borrowed(filename),
  392                     None => unreachable!(
  393                         "Encountered file system entry without a file name. This should only \
  394                          happen for paths like 'foo/bar/..' or '/' which are not supposed to \
  395                          appear in a file system traversal."
  396                     ),
  397                 }
  398             };
  399 
  400             if !pattern.is_match(&filesystem::osstr_to_bytes(search_str.as_ref())) {
  401                 return ignore::WalkState::Continue;
  402             }
  403 
  404             // Filter out unwanted extensions.
  405             if let Some(ref exts_regex) = config.extensions {
  406                 if let Some(path_str) = entry_path.file_name() {
  407                     if !exts_regex.is_match(&filesystem::osstr_to_bytes(path_str)) {
  408                         return ignore::WalkState::Continue;
  409                     }
  410                 } else {
  411                     return ignore::WalkState::Continue;
  412                 }
  413             }
  414 
  415             // Filter out unwanted file types.
  416             if let Some(ref file_types) = config.file_types {
  417                 if let Some(ref entry_type) = entry.file_type() {
  418                     if (!file_types.files && entry_type.is_file())
  419                         || (!file_types.directories && entry_type.is_dir())
  420                         || (!file_types.symlinks && entry_type.is_symlink())
  421                         || (!file_types.sockets && filesystem::is_socket(entry_type))
  422                         || (!file_types.pipes && filesystem::is_pipe(entry_type))
  423                         || (file_types.executables_only
  424                             && !entry
  425                                 .metadata()
  426                                 .map(|m| filesystem::is_executable(&m))
  427                                 .unwrap_or(false))
  428                         || (file_types.empty_only && !filesystem::is_empty(&entry))
  429                         || !(entry_type.is_file()
  430                             || entry_type.is_dir()
  431                             || entry_type.is_symlink()
  432                             || filesystem::is_socket(entry_type)
  433                             || filesystem::is_pipe(entry_type))
  434                     {
  435                         return ignore::WalkState::Continue;
  436                     }
  437                 } else {
  438                     return ignore::WalkState::Continue;
  439                 }
  440             }
  441 
  442             #[cfg(unix)]
  443             {
  444                 if let Some(ref owner_constraint) = config.owner_constraint {
  445                     if let Ok(ref metadata) = entry_path.metadata() {
  446                         if !owner_constraint.matches(&metadata) {
  447                             return ignore::WalkState::Continue;
  448                         }
  449                     } else {
  450                         return ignore::WalkState::Continue;
  451                     }
  452                 }
  453             }
  454 
  455             // Filter out unwanted sizes if it is a file and we have been given size constraints.
  456             if !config.size_constraints.is_empty() {
  457                 if entry_path.is_file() {
  458                     if let Ok(metadata) = entry_path.metadata() {
  459                         let file_size = metadata.len();
  460                         if config
  461                             .size_constraints
  462                             .iter()
  463                             .any(|sc| !sc.is_within(file_size))
  464                         {
  465                             return ignore::WalkState::Continue;
  466                         }
  467                     } else {
  468                         return ignore::WalkState::Continue;
  469                     }
  470                 } else {
  471                     return ignore::WalkState::Continue;
  472                 }
  473             }
  474 
  475             // Filter out unwanted modification times
  476             if !config.time_constraints.is_empty() {
  477                 let mut matched = false;
  478                 if let Ok(metadata) = entry_path.metadata() {
  479                     if let Ok(modified) = metadata.modified() {
  480                         matched = config
  481                             .time_constraints
  482                             .iter()
  483                             .all(|tf| tf.applies_to(&modified));
  484                     }
  485                 }
  486                 if !matched {
  487                     return ignore::WalkState::Continue;
  488                 }
  489             }
  490 
  491             let send_result = tx_thread.send(WorkerResult::Entry(entry_path.to_owned()));
  492 
  493             if send_result.is_err() {
  494                 return ignore::WalkState::Quit;
  495             }
  496 
  497             ignore::WalkState::Continue
  498         })
  499     });
  500 }