"Fossies" - the Fresh Open Source Software Archive  

Source code changes of the file "src/parallel" between
parallel-20210122.tar.bz2 and parallel-20210222.tar.bz2

About: GNU Parallel is a shell tool for executing jobs in parallel using multiple CPU cores and/or multiple computers.

parallel  (parallel-20210122.tar.bz2):parallel  (parallel-20210222.tar.bz2)
skipping to change at line 59 skipping to change at line 59
for my $fh (@$input_source_fh_ref) { for my $fh (@$input_source_fh_ref) {
my $line = <$fh>; my $line = <$fh>;
chomp($line); chomp($line);
$line =~ s/\r$//; $line =~ s/\r$//;
::debug("init", "Delimiter: '$delimiter'"); ::debug("init", "Delimiter: '$delimiter'");
for my $s (split /$delimiter/o, $line) { for my $s (split /$delimiter/o, $line) {
::debug("init", "Colname: '$s'"); ::debug("init", "Colname: '$s'");
# Replace {colname} with {2} # Replace {colname} with {2}
for(@$command_ref, @Global::ret_files, for(@$command_ref, @Global::ret_files,
@Global::transfer_files, $opt::tagstring, @Global::transfer_files, $opt::tagstring,
$opt::workdir, $opt::results, $opt::retries) { $opt::workdir, $opt::results, $opt::retries,
@Global::template_contents, @Global::template_names,
@opt::filter) {
# Skip if undefined # Skip if undefined
$_ or next; $_ or next;
s:\{$s(|/|//|\.|/\.)\}:\{$id$1\}:g; s:\{$s(|/|//|\.|/\.)\}:\{$id$1\}:g;
# {=header1 ... =} => {=1 ... =} # {=header1 ... =} => {=1 ... =}
s:$left $s (.*?) $right:$l$id$1$r:gx; s:$left $s (.*?) $right:$l$id$1$r:gx;
} }
$Global::input_source_header{$id} = $s; $Global::input_source_header{$id} = $s;
$id++; $id++;
} }
} }
skipping to change at line 1545 skipping to change at line 1547
"sshloginfile|slf=s" => \@opt::sshloginfile, "sshloginfile|slf=s" => \@opt::sshloginfile,
"controlmaster|M" => \$opt::controlmaster, "controlmaster|M" => \$opt::controlmaster,
"ssh=s" => \$opt::ssh, "ssh=s" => \$opt::ssh,
"transfer-file|transferfile|transfer-files|transferfiles|tf=s" "transfer-file|transferfile|transfer-files|transferfiles|tf=s"
=> \@opt::transfer_files, => \@opt::transfer_files,
"return=s" => \@opt::return, "return=s" => \@opt::return,
"trc=s" => \@opt::trc, "trc=s" => \@opt::trc,
"transfer" => \$opt::transfer, "transfer" => \$opt::transfer,
"cleanup" => \$opt::cleanup, "cleanup" => \$opt::cleanup,
"basefile|bf=s" => \@opt::basefile, "basefile|bf=s" => \@opt::basefile,
"template|tmpl=s" => \%opt::template,
"B=s" => \$opt::retired, "B=s" => \$opt::retired,
"ctrlc|ctrl-c" => \$opt::retired, "ctrlc|ctrl-c" => \$opt::retired,
"noctrlc|no-ctrlc|no-ctrl-c" => \$opt::retired, "noctrlc|no-ctrlc|no-ctrl-c" => \$opt::retired,
"workdir|work-dir|wd=s" => \$opt::workdir, "workdir|work-dir|wd=s" => \$opt::workdir,
"W=s" => \$opt::retired, "W=s" => \$opt::retired,
"rsync-opts|rsyncopts=s" => \$opt::rsync_opts, "rsync-opts|rsyncopts=s" => \$opt::rsync_opts,
"tmpdir|tempdir=s" => \$opt::tmpdir, "tmpdir|tempdir=s" => \$opt::tmpdir,
"use-compress-program|compress-program=s" => \$opt::compress_program, "use-compress-program|compress-program=s" => \$opt::compress_program,
"use-decompress-program|decompress-program=s" "use-decompress-program|decompress-program=s"
=> \$opt::decompress_program, => \$opt::decompress_program,
skipping to change at line 1572 skipping to change at line 1575
"bar" => \$opt::bar, "bar" => \$opt::bar,
"shuf" => \$opt::shuf, "shuf" => \$opt::shuf,
"arg-sep|argsep=s" => \$opt::arg_sep, "arg-sep|argsep=s" => \$opt::arg_sep,
"arg-file-sep|argfilesep=s" => \$opt::arg_file_sep, "arg-file-sep|argfilesep=s" => \$opt::arg_file_sep,
"trim=s" => \$opt::trim, "trim=s" => \$opt::trim,
"env=s" => \@opt::env, "env=s" => \@opt::env,
"recordenv|record-env" => \$opt::record_env, "recordenv|record-env" => \$opt::record_env,
"session" => \$opt::session, "session" => \$opt::session,
"plain" => \$opt::plain, "plain" => \$opt::plain,
"profile|J=s" => \@opt::profile, "profile|J=s" => \@opt::profile,
"pipe|spreadstdin" => \$opt::pipe,
"robin|round-robin|roundrobin" => \$opt::roundrobin,
"recstart=s" => \$opt::recstart,
"recend=s" => \$opt::recend,
"regexp|regex" => \$opt::regexp,
"remove-rec-sep|removerecsep|rrs" => \$opt::remove_rec_sep,
"files|output-as-files|outputasfiles" => \$opt::files,
"block|block-size|blocksize=s" => \$opt::blocksize,
"blocktimeout|block-timeout|bt=s" => \$opt::blocktimeout,
"tollef" => \$opt::tollef, "tollef" => \$opt::tollef,
"gnu" => \$opt::gnu, "gnu" => \$opt::gnu,
"link|xapply" => \$opt::link, "link|xapply" => \$opt::link,
"linkinputsource|xapplyinputsource=i" => \@opt::linkinputsource, "linkinputsource|xapplyinputsource=i" => \@opt::linkinputsource,
# Before changing this line, please read # Before changing this line, please read
# https://www.gnu.org/software/parallel/parallel_design.html#Citation-no tice # https://www.gnu.org/software/parallel/parallel_design.html#Citation-no tice
# https://git.savannah.gnu.org/cgit/parallel.git/tree/doc/citation-notic e-faq.txt # https://git.savannah.gnu.org/cgit/parallel.git/tree/doc/citation-notic e-faq.txt
"bibtex|citation" => \$opt::citation, "bibtex|citation" => \$opt::citation,
"wc|willcite|will-cite|nn|nonotice|no-notice" => \$opt::willcite, "wc|willcite|will-cite|nn|nonotice|no-notice" => \$opt::willcite,
# Termination and retries # Termination and retries
skipping to change at line 1634 skipping to change at line 1628
"fg" => \$opt::fg, "fg" => \$opt::fg,
"bg" => \$opt::bg, "bg" => \$opt::bg,
"wait" => \$opt::wait, "wait" => \$opt::wait,
# Shebang #!/usr/bin/parallel --shebang # Shebang #!/usr/bin/parallel --shebang
"shebang|hashbang" => \$opt::shebang, "shebang|hashbang" => \$opt::shebang,
"internal-pipe-means-argfiles" "internal-pipe-means-argfiles"
=> \$opt::internal_pipe_means_argfiles, => \$opt::internal_pipe_means_argfiles,
"Y" => \$opt::retired, "Y" => \$opt::retired,
"skip-first-line" => \$opt::skip_first_line, "skip-first-line" => \$opt::skip_first_line,
"bug" => \$opt::bug, "bug" => \$opt::bug,
# --pipe
"pipe|spreadstdin" => \$opt::pipe,
"robin|round-robin|roundrobin" => \$opt::roundrobin,
"recstart=s" => \$opt::recstart,
"recend=s" => \$opt::recend,
"regexp|regex" => \$opt::regexp,
"remove-rec-sep|removerecsep|rrs" => \$opt::remove_rec_sep,
"files|output-as-files|outputasfiles" => \$opt::files,
"block|block-size|blocksize=s" => \$opt::blocksize,
"blocktimeout|block-timeout|bt=s" => \$opt::blocktimeout,
"header=s" => \$opt::header, "header=s" => \$opt::header,
"cat" => \$opt::cat, "cat" => \$opt::cat,
"fifo" => \$opt::fifo, "fifo" => \$opt::fifo,
"pipepart|pipe-part" => \$opt::pipepart, "pipepart|pipe-part" => \$opt::pipepart,
"tee" => \$opt::tee, "tee" => \$opt::tee,
"shard=s" => \$opt::shard, "shard=s" => \$opt::shard,
"bin=s" => \$opt::bin, "bin=s" => \$opt::bin,
"groupby|group-by=s" => \$opt::groupby, "groupby|group-by=s" => \$opt::groupby,
#
"hgrp|hostgrp|hostgroup|hostgroups" => \$opt::hostgroups, "hgrp|hostgrp|hostgroup|hostgroups" => \$opt::hostgroups,
"embed" => \$opt::embed, "embed" => \$opt::embed,
"filter=s" => \@opt::filter,
); );
} }
sub get_options_from_array($@) { sub get_options_from_array($@) {
# Run GetOptions on @array # Run GetOptions on @array
# Input: # Input:
# $array_ref = ref to @ARGV to parse # $array_ref = ref to @ARGV to parse
# @keep_only = Keep only these options # @keep_only = Keep only these options
# Uses: # Uses:
# @ARGV # @ARGV
skipping to change at line 1802 skipping to change at line 1808
if(defined $opt::version) { version(); wait_and_exit(0); } if(defined $opt::version) { version(); wait_and_exit(0); }
if(defined $opt::record_env) { record_env(); wait_and_exit(0); } if(defined $opt::record_env) { record_env(); wait_and_exit(0); }
if(defined $opt::show_limits) { show_limits(); } if(defined $opt::show_limits) { show_limits(); }
if(@opt::sshlogin) { @Global::sshlogin = @opt::sshlogin; } if(@opt::sshlogin) { @Global::sshlogin = @opt::sshlogin; }
if(@opt::sshloginfile) { read_sshloginfiles(@opt::sshloginfile); } if(@opt::sshloginfile) { read_sshloginfiles(@opt::sshloginfile); }
if(@opt::return) { push @Global::ret_files, @opt::return; } if(@opt::return) { push @Global::ret_files, @opt::return; }
if($opt::transfer) { if($opt::transfer) {
push @Global::transfer_files, $opt::i || $opt::I || "{}"; push @Global::transfer_files, $opt::i || $opt::I || "{}";
} }
push @Global::transfer_files, @opt::transfer_files; push @Global::transfer_files, @opt::transfer_files;
if(%opt::template) {
while (my ($source, $template_name) = each %opt::template) {
if(open(my $tmpl, "<", $source)) {
local $/; # $/ = undef => slurp whole file
my $content = <$tmpl>;
push @Global::template_names, $template_name;
push @Global::template_contents, $content;
::debug("tmpl","Name: $template_name\n$content\n");
} else {
::error("Cannot open '$source'.");
wait_and_exit(255);
}
}
}
if(not defined $opt::recstart and if(not defined $opt::recstart and
not defined $opt::recend) { $opt::recend = "\n"; } not defined $opt::recend) { $opt::recend = "\n"; }
$Global::blocksize = multiply_binary_prefix($opt::blocksize || "1M"); $Global::blocksize = multiply_binary_prefix($opt::blocksize || "1M");
if($Global::blocksize > 2**31-1 and not $opt::pipepart) { if($Global::blocksize > 2**31-1 and not $opt::pipepart) {
warning("--blocksize >= 2G causes problems. Using 2G-1."); warning("--blocksize >= 2G causes problems. Using 2G-1.");
$Global::blocksize = 2**31-1; $Global::blocksize = 2**31-1;
} }
if($^O eq "cygwin" and if($^O eq "cygwin" and
($opt::pipe or $opt::pipepart or $opt::roundrobin) ($opt::pipe or $opt::pipepart or $opt::roundrobin)
and $Global::blocksize > 65535) { and $Global::blocksize > 65535) {
skipping to change at line 2071 skipping to change at line 2091
if(not defined $opt::jobs) { $opt::jobs = "100%"; } if(not defined $opt::jobs) { $opt::jobs = "100%"; }
open_joblog(); open_joblog();
open_json_csv(); open_json_csv();
if($opt::sqlmaster or $opt::sqlworker) { if($opt::sqlmaster or $opt::sqlworker) {
$Global::sql = SQL->new($opt::sqlmaster || $opt::sqlworker); $Global::sql = SQL->new($opt::sqlmaster || $opt::sqlworker);
} }
if($opt::sqlworker) { $Global::membuffer ||= 1; } if($opt::sqlworker) { $Global::membuffer ||= 1; }
# The sqlmaster groups the arguments, so the should just read one # The sqlmaster groups the arguments, so the should just read one
if($opt::sqlworker and not $opt::sqlmaster) { $Global::max_number_of_args = 1; } if($opt::sqlworker and not $opt::sqlmaster) { $Global::max_number_of_args = 1; }
} }
sub check_invalid_option_combinations() { sub check_invalid_option_combinations() {
if(defined $opt::timeout and if(defined $opt::timeout and
$opt::timeout !~ /^\d+(\.\d+)?%?$|^(\d+(\.\d+)?[dhms])+$/i) { $opt::timeout !~ /^\d+(\.\d+)?%?$|^(\d+(\.\d+)?[dhms])+$/i) {
::error("--timeout must be seconds or percentage."); ::error("--timeout must be seconds or percentage.");
wait_and_exit(255); wait_and_exit(255);
} }
if(defined $opt::fifo and defined $opt::cat) { if(defined $opt::fifo and defined $opt::cat) {
::error("--fifo cannot be combined with --cat."); ::error("--fifo cannot be combined with --cat.");
skipping to change at line 2146 skipping to change at line 2165
} }
if($opt::recend ne "\n") { if($opt::recend ne "\n") {
::error("--recend is not compatible with --groupby"); ::error("--recend is not compatible with --groupby");
::wait_and_exit(255); ::wait_and_exit(255);
} }
} }
} }
sub init_globals() { sub init_globals() {
# Defaults: # Defaults:
$Global::version = 20210122; $Global::version = 20210222;
$Global::progname = 'parallel'; $Global::progname = 'parallel';
$::name = "GNU Parallel"; $::name = "GNU Parallel";
$Global::infinity = 2**31; $Global::infinity = 2**31;
$Global::debug = 0; $Global::debug = 0;
$Global::verbose = 0; $Global::verbose = 0;
# Don't quote every part of the command line # Don't quote every part of the command line
$Global::quoting = 0; $Global::quoting = 0;
# Quote replacement strings # Quote replacement strings
$Global::quote_replace = 1; $Global::quote_replace = 1;
$Global::total_completed = 0; $Global::total_completed = 0;
skipping to change at line 2188 skipping to change at line 2207
'{+.}' => 's:.*\.::', '{+.}' => 's:.*\.::',
'{+..}' => 's:.*\.([^.]*\.):$1:', '{+..}' => 's:.*\.([^.]*\.):$1:',
'{+...}' => 's:.*\.([^.]*\.[^.]*\.):$1:', '{+...}' => 's:.*\.([^.]*\.[^.]*\.):$1:',
'{..}' => 's:\.[^/.]+$::; s:\.[^/.]+$::', '{..}' => 's:\.[^/.]+$::; s:\.[^/.]+$::',
'{...}' => 's:\.[^/.]+$::; s:\.[^/.]+$::; s:\.[^/.]+$::', '{...}' => 's:\.[^/.]+$::; s:\.[^/.]+$::; s:\.[^/.]+$::',
'{/..}' => 's:.*/::; s:\.[^/.]+$::; s:\.[^/.]+$::', '{/..}' => 's:.*/::; s:\.[^/.]+$::; s:\.[^/.]+$::',
'{/...}' => 's:.*/::; s:\.[^/.]+$::; s:\.[^/.]+$::; s:\.[^/.]+$::', '{/...}' => 's:.*/::; s:\.[^/.]+$::; s:\.[^/.]+$::; s:\.[^/.]+$::',
'{choose_k}' => 'for $t (2..$#arg){ if($arg[$t-1] ge $arg[$t]) { skip() } }', '{choose_k}' => 'for $t (2..$#arg){ if($arg[$t-1] ge $arg[$t]) { skip() } }',
# {##} = number of jobs # {##} = number of jobs
'{##}' => '$_=total_jobs()', '{##}' => '$_=total_jobs()',
# {0%} = 0-padded jobslot
'{0%}' => '$f=1+int((log($Global::max_jobs_running||1)/log(10))); $_=spr
intf("%0${f}d",slot())',
# {0%} = 0-padded seq
'{0#}' => '$f=1+int((log(total_jobs())/log(10))); $_=sprintf("%0${f}d",s
eq())',
## Bash inspired replacement strings
# Bash ${a:-myval} # Bash ${a:-myval}
'{:-([^}]+?)}' => '$_ ||= $$1', '{:-([^}]+?)}' => '$_ ||= $$1',
# Bash ${a:2} # Bash ${a:2}
'{:(\d+?)}' => 'substr($_,0,$$1) = ""', '{:(\d+?)}' => 'substr($_,0,$$1) = ""',
# Bash ${a:2:3} # Bash ${a:2:3}
'{:(\d+?):(\d+?)}' => '$_ = substr($_,$$1,$$2);', '{:(\d+?):(\d+?)}' => '$_ = substr($_,$$1,$$2);',
# Bash ${a#bc} # Bash ${a#bc}
'{#([^#}][^}]*?)}' => 's/^$$1//;', '{#([^#}][^}]*?)}' => 's/^$$1//;',
# Bash ${a%def} # Bash ${a%def}
'{%([^}]+?)}' => 's/$$1$//;', '{%([^}]+?)}' => 's/$$1$//;',
# Bash ${a/def/ghi} ${a/def/} # Bash ${a/def/ghi} ${a/def/}
'{/([^}]+?)/([^}]*?)}' => 's/$$1/$$2/;', '{/([^}]+?)/([^}]*?)}' => 's/$$1/$$2/;',
# Bash ${a^a} # Bash ${a^a}
'{^([^}]+?)}' => 's/^($$1)/uc($1)/e;', '{^([^}]+?)}' => 's/^($$1)/uc($1)/e;',
# Bash ${a^^a} # Bash ${a^^a}
'{^^([^}]+?)}' => 's/($$1)/uc($1)/eg;', '{^^([^}]+?)}' => 's/($$1)/uc($1)/eg;',
# Bash ${a,A} # Bash ${a,A}
'{,([^}]+?)}' => 's/^($$1)/lc($1)/e;', '{,([^}]+?)}' => 's/^($$1)/lc($1)/e;',
# Bash ${a,,A} # Bash ${a,,A}
'{,,([^}]+?)}' => 's/($$1)/lc($1)/eg;', '{,,([^}]+?)}' => 's/($$1)/lc($1)/eg;',
# {slot} = $PARALLEL_JOBSLOT # {slot} = $PARALLEL_JOBSLOT
'{slot}' => '1 $_="\${PARALLEL_JOBSLOT}";uq()', '{slot}' => '1 $_="\${PARALLEL_JOBSLOT}";uq()',
# {host} = ssh host # {host} = ssh host
'{host}' => '1 $_="\${PARALLEL_SSHHOST}";uq()', '{host}' => '1 $_="\${PARALLEL_SSHHOST}";uq()',
# {sshlogin} = sshlogin # {sshlogin} = sshlogin
'{sshlogin}' => '1 $_="\${PARALLEL_SSHLOGIN}";uq()', '{sshlogin}' => '1 $_="\${PARALLEL_SSHLOGIN}";uq()',
# {hgrp} = hostgroups of the host # {hgrp} = hostgroups of the host
'{hgrp}' => '1 $_="\${PARALLEL_HOSTGROUPS}";uq()', '{hgrp}' => '1 $_="\${PARALLEL_HOSTGROUPS}";uq()',
# {agrp} = hostgroups of the argument # {agrp} = hostgroups of the argument
'{agrp}' => '1 $_="\${PARALLEL_ARGHOSTGROUPS}";uq()', '{agrp}' => '1 $_="\${PARALLEL_ARGHOSTGROUPS}";uq()',
skipping to change at line 3450 skipping to change at line 3476
push @arg, map { $_->orig() } @$record; push @arg, map { $_->orig() } @$record;
} }
::error("No more processes: cannot run a single job. Some thing is wrong at @arg."); ::error("No more processes: cannot run a single job. Some thing is wrong at @arg.");
::wait_and_exit(255); ::wait_and_exit(255);
} }
$sshlogin->set_max_jobs_running($max); $sshlogin->set_max_jobs_running($max);
# Sleep up to 300 ms to give other processes time to die # Sleep up to 300 ms to give other processes time to die
::usleep(rand()*300); ::usleep(rand()*300);
::warning("No more processes: ". ::warning("No more processes: ".
"Decreasing number of running jobs to $max.", "Decreasing number of running jobs to $max.",
"Raising ulimit -u or /etc/security/limits.conf may "Try increasing 'ulimit -u' (try: ulimit -u `ulimit
help."); -Hu`)",
"or increasing 'nproc' in /etc/security/limits.conf
",
"or increasing /proc/sys/kernel/pid_max");
return 0; return 0;
} }
} }
} else { } else {
# No more file handles # No more file handles
$no_more_file_handles_warned++ or $no_more_file_handles_warned++ or
::warning("No more file handles. ", ::warning("No more file handles. ",
"Raising ulimit -n or /etc/security/limits.conf may hel "Try running 'parallel -j0 -N 100 --pipe parallel -j0'"
p."); ,
"or increasing 'ulimit -n' (try: ulimit -n `ulimit -Hn`
)",
"or increasing 'nofile' in /etc/security/limits.conf",
"or increasing /proc/sys/fs/file-max");
debug("start", "No more file handles. "); debug("start", "No more file handles. ");
return 0; return 0;
} }
} }
} }
sub init_progress() { sub init_progress() {
# Uses: # Uses:
# $opt::bar # $opt::bar
# Returns: # Returns:
skipping to change at line 4063 skipping to change at line 4094
if(not remote_hosts()) { if(not remote_hosts()) {
# There are no remote hosts # There are no remote hosts
if(@opt::trc) { if(@opt::trc) {
::warning("--trc ignored as there are no remote --sshlogin."); ::warning("--trc ignored as there are no remote --sshlogin.");
} elsif (defined $opt::transfer) { } elsif (defined $opt::transfer) {
::warning("--transfer ignored as there are no remote --sshlogin." ); ::warning("--transfer ignored as there are no remote --sshlogin." );
} elsif (@opt::transfer_files) { } elsif (@opt::transfer_files) {
::warning("--transferfile ignored as there are no remote --sshlog in."); ::warning("--transferfile ignored as there are no remote --sshlog in.");
} elsif (@opt::return) { } elsif (@opt::return) {
::warning("--return ignored as there are no remote --sshlogin.") ; ::warning("--return ignored as there are no remote --sshlogin.") ;
} elsif (defined $opt::cleanup) { } elsif (defined $opt::cleanup and not %opt::template) {
::warning("--cleanup ignored as there are no remote --sshlogin.") ; ::warning("--cleanup ignored as there are no remote --sshlogin.") ;
} elsif (@opt::basefile) { } elsif (@opt::basefile) {
::warning("--basefile ignored as there are no remote --sshlogin. "); ::warning("--basefile ignored as there are no remote --sshlogin. ");
} }
} }
} }
} }
sub remote_hosts() { sub remote_hosts() {
# Return sshlogins that are not ':' # Return sshlogins that are not ':'
skipping to change at line 4099 skipping to change at line 4130
my $rsync_destdir; my $rsync_destdir;
my $workdir; my $workdir;
for my $sshlogin (values %Global::host) { for my $sshlogin (values %Global::host) {
if($sshlogin->string() eq ":") { next } if($sshlogin->string() eq ":") { next }
for my $file (@opt::basefile) { for my $file (@opt::basefile) {
if($file !~ m:^/: and $opt::workdir eq "...") { if($file !~ m:^/: and $opt::workdir eq "...") {
::error("Work dir '...' will not work with relative basefiles."); ::error("Work dir '...' will not work with relative basefiles.");
::wait_and_exit(255); ::wait_and_exit(255);
} }
if(not $workdir) { if(not $workdir) {
my $dummycmdline = CommandLine->new(1,["true"],{},0,0,[],[],{},{},{}) ; my $dummycmdline = CommandLine->new(1,["true"],{},0,0,[],[],[],[],{}, {});
my $dummyjob = Job->new($dummycmdline); my $dummyjob = Job->new($dummycmdline);
$workdir = $dummyjob->workdir(); $workdir = $dummyjob->workdir();
} }
push @cmd, $sshlogin->rsync_transfer_cmd($file,$workdir); push @cmd, $sshlogin->rsync_transfer_cmd($file,$workdir);
} }
} }
debug("init", "basesetup: @cmd\n"); debug("init", "basesetup: @cmd\n");
my ($exitstatus,$stdout_ref,$stderr_ref) = my ($exitstatus,$stdout_ref,$stderr_ref) =
run_gnu_parallel((join "\n",@cmd),"-j0","--retries",5); run_gnu_parallel((join "\n",@cmd),"-j0","--retries",5);
if($exitstatus) { if($exitstatus) {
skipping to change at line 4126 skipping to change at line 4157
sub cleanup_basefile() { sub cleanup_basefile() {
# Remove the basefiles transferred # Remove the basefiles transferred
# Uses: # Uses:
# %Global::host # %Global::host
# @opt::basefile # @opt::basefile
# Returns: N/A # Returns: N/A
my @cmd; my @cmd;
my $workdir; my $workdir;
if(not $workdir) { if(not $workdir) {
my $dummycmdline = CommandLine->new(1,"true",0,0,0,0,0,{},{},{}); my $dummycmdline = CommandLine->new(1,["true"],{},0,0,[],[],[],[],{},{});
my $dummyjob = Job->new($dummycmdline); my $dummyjob = Job->new($dummycmdline);
$workdir = $dummyjob->workdir(); $workdir = $dummyjob->workdir();
} }
for my $sshlogin (values %Global::host) { for my $sshlogin (values %Global::host) {
if($sshlogin->string() eq ":") { next } if($sshlogin->string() eq ":") { next }
for my $file (@opt::basefile) { for my $file (@opt::basefile) {
push @cmd, $sshlogin->cleanup_cmd($file,$workdir); push @cmd, $sshlogin->cleanup_cmd($file,$workdir);
} }
} }
debug("init", "basecleanup: @cmd\n"); debug("init", "basecleanup: @cmd\n");
skipping to change at line 4329 skipping to change at line 4360
if(not $nsockets{$col[0]}) { if(not $nsockets{$col[0]}) {
$nsockets{$col[0]} = $col[1]; $nsockets{$col[0]} = $col[1];
} elsif(not $ncores{$col[0]}) { } elsif(not $ncores{$col[0]}) {
$ncores{$col[0]} = $col[1]; $ncores{$col[0]} = $col[1];
} elsif(not $nthreads{$col[0]}) { } elsif(not $nthreads{$col[0]}) {
$nthreads{$col[0]} = $col[1]; $nthreads{$col[0]} = $col[1];
} elsif(not $maxlen{$col[0]}) { } elsif(not $maxlen{$col[0]}) {
$maxlen{$col[0]} = $col[1]; $maxlen{$col[0]} = $col[1];
} elsif(not $echo{$col[0]}) { } elsif(not $echo{$col[0]}) {
$echo{$col[0]} = $col[1]; $echo{$col[0]} = $col[1];
} elsif(m/perl: warning:|LANGUAGE =|LC_ALL =|LANG =|are supported and installed/) { } elsif(m/perl: warning:|LANGUAGE =|LC_ALL =|LANG =|are supported and installed|Disconnected from|Received disconnect from/) {
# Skip these: # Skip these:
# perl: warning: Setting locale failed. # perl: warning: Setting locale failed.
# perl: warning: Please check that your locale settings: # perl: warning: Please check that your locale settings:
# LANGUAGE = (unset), # LANGUAGE = (unset),
# LC_ALL = (unset), # LC_ALL = (unset),
# LANG = "en_US.UTF-8" # LANG = "en_US.UTF-8"
# are supported and installed on your system. # are supported and installed on your system.
# perl: warning: Falling back to the standard locale ("C"). # perl: warning: Falling back to the standard locale ("C").
# Disconnected from 127.0.0.1 port 22
} else { } else {
::die_bug("host check too many col0: $_"); ::die_bug("host check too many col0: $_");
} }
} else { } else {
::die_bug("host check unmatched short jobline ($col[0]): $_"); ::die_bug("host check unmatched short jobline ($col[0]): $_");
} }
} }
@down_hosts = uniq(@down_hosts); @down_hosts = uniq(@down_hosts);
return(\%nsockets, \%ncores, \%nthreads, \%time_to_login, return(\%nsockets, \%ncores, \%nthreads, \%time_to_login,
\%maxlen, \%echo, \@down_hosts); \%maxlen, \%echo, \@down_hosts);
skipping to change at line 4372 skipping to change at line 4404
# Wrap with ssh and --env # Wrap with ssh and --env
# Return $default_value if command fails # Return $default_value if command fails
my $sshlogin = shift; my $sshlogin = shift;
my $command = shift; my $command = shift;
my $default_value = shift; my $default_value = shift;
# wrapper that returns $default_value if the command fails: # wrapper that returns $default_value if the command fails:
# bug #57886: Errors when using different version on remote # bug #57886: Errors when using different version on remote
# perl -e '$a=`$command`; print $? ? "$default_value" : $a' # perl -e '$a=`$command`; print $? ? "$default_value" : $a'
my $wcmd = q(perl -e '$a=`).$command.q(`;). my $wcmd = q(perl -e '$a=`).$command.q(`;).
q(print $? ? ").::pQ($default_value).q(" : $a'); q(print $? ? ").::pQ($default_value).q(" : $a');
my $commandline = CommandLine->new(1,[$wcmd],{},0,0,[],[],{},{},{}); my $commandline = CommandLine->new(1,[$wcmd],{},0,0,[],[],[],[],{},{});
my $job = Job->new($commandline); my $job = Job->new($commandline);
$job->set_sshlogin($sshlogin); $job->set_sshlogin($sshlogin);
$job->wrapped(); $job->wrapped();
return($job->{'wrapped'}); return($job->{'wrapped'});
} }
my(@sockets, @cores, @threads, @maxline, @echo); my(@sockets, @cores, @threads, @maxline, @echo);
while (my ($host, $sshlogin) = each %Global::host) { while (my ($host, $sshlogin) = each %Global::host) {
if($host eq ":") { next } if($host eq ":") { next }
# The 'true' is used to get the $host out later # The 'true' is used to get the $host out later
skipping to change at line 4678 skipping to change at line 4710
} }
} }
sub start_no_new_jobs() { sub start_no_new_jobs() {
# Start no more jobs # Start no more jobs
# Uses: # Uses:
# %Global::original_sig # %Global::original_sig
# %Global::unlink # %Global::unlink
# $Global::start_no_new_jobs # $Global::start_no_new_jobs
# Returns: N/A # Returns: N/A
# $SIG{TERM} = $Global::original_sig{TERM};
unlink keys %Global::unlink; unlink keys %Global::unlink;
::status ::status
("$Global::progname: SIGHUP received. No new jobs will be started.", ("$Global::progname: SIGHUP received. No new jobs will be started.",
"$Global::progname: Waiting for these ".(keys %Global::running). "$Global::progname: Waiting for these ".(keys %Global::running).
" jobs to finish. Send SIGTERM to stop now."); " jobs to finish. Send SIGTERM to stop now.");
list_running_jobs(); list_running_jobs();
$Global::start_no_new_jobs ||= 1; $Global::start_no_new_jobs ||= 1;
} }
sub reapers() { sub reapers() {
skipping to change at line 4917 skipping to change at line 4948
"--pipe Split stdin (standard input) to multiple jobs.", "--pipe Split stdin (standard input) to multiple jobs.",
"--recend str Record end separator for --pipe.", "--recend str Record end separator for --pipe.",
"--recstart str Record start separator for --pipe.", "--recstart str Record start separator for --pipe.",
"", "",
"GNU Parallel can do much more. See 'man $Global::progname' for details" , "GNU Parallel can do much more. See 'man $Global::progname' for details" ,
"", "",
"Academic tradition requires you to cite works you base your article on. ", "Academic tradition requires you to cite works you base your article on. ",
"If you use programs that use GNU Parallel to process data for an articl e in a", "If you use programs that use GNU Parallel to process data for an articl e in a",
"scientific publication, please cite:", "scientific publication, please cite:",
"", "",
" Tange, O. (2021, January 22). GNU Parallel 20210122 ('Capitol Riots' " Tange, O. (2021, February 22). GNU Parallel 20210222 ('AngSangSuKyi'
).", ).",
" Zenodo. https://doi.org/10.5281/zenodo.4454976", " Zenodo. https://doi.org/10.5281/zenodo.4554342",
"", "",
# Before changing this line, please read # Before changing this line, please read
# https://www.gnu.org/software/parallel/parallel_design.html#Citation-n otice # https://www.gnu.org/software/parallel/parallel_design.html#Citation-n otice
# https://git.savannah.gnu.org/cgit/parallel.git/tree/doc/citation-notic e-faq.txt # https://git.savannah.gnu.org/cgit/parallel.git/tree/doc/citation-notic e-faq.txt
"This helps funding further development; AND IT WON'T COST YOU A CENT.", "This helps funding further development; AND IT WON'T COST YOU A CENT.",
"If you pay 10000 EUR you should feel free to use GNU Parallel without c iting.", "If you pay 10000 EUR you should feel free to use GNU Parallel without c iting.",
"", "",
"",); "",);
} }
skipping to change at line 4948 skipping to change at line 4979
not -t $Global::original_stderr not -t $Global::original_stderr
or or
grep { -e "$_/will-cite" } @Global::config_dirs) { grep { -e "$_/will-cite" } @Global::config_dirs) {
# skip # skip
} else { } else {
::status ::status
("Academic tradition requires you to cite works you base your article on.", ("Academic tradition requires you to cite works you base your article on.",
"If you use programs that use GNU Parallel to process data for an ar ticle in a", "If you use programs that use GNU Parallel to process data for an ar ticle in a",
"scientific publication, please cite:", "scientific publication, please cite:",
"", "",
" Tange, O. (2021, January 22). GNU Parallel 20210122 ('Capitol Rio " Tange, O. (2021, February 22). GNU Parallel 20210222 ('AngSangSuK
ts').", yi').",
" Zenodo. https://doi.org/10.5281/zenodo.4454976", " Zenodo. https://doi.org/10.5281/zenodo.4554342",
"", "",
# Before changing this line, please read # Before changing this line, please read
# https://www.gnu.org/software/parallel/parallel_design.html#Citatio n-notice and # https://www.gnu.org/software/parallel/parallel_design.html#Citatio n-notice and
# https://git.savannah.gnu.org/cgit/parallel.git/tree/doc/citation-n otice-faq.txt # https://git.savannah.gnu.org/cgit/parallel.git/tree/doc/citation-n otice-faq.txt
"This helps funding further development; AND IT WON'T COST YOU A CEN T.", "This helps funding further development; AND IT WON'T COST YOU A CEN T.",
"If you pay 10000 EUR you should feel free to use GNU Parallel witho ut citing.", "If you pay 10000 EUR you should feel free to use GNU Parallel witho ut citing.",
"", "",
"More about funding GNU Parallel and the citation notice:", "More about funding GNU Parallel and the citation notice:",
"https://www.gnu.org/software/parallel/parallel_design.html#Citation -notice", "https://www.gnu.org/software/parallel/parallel_design.html#Citation -notice",
"", "",
skipping to change at line 5062 skipping to change at line 5093
if($all_argv ne $no_opts) { if($all_argv ne $no_opts) {
::warning("--citation ignores all other options and arguments."); ::warning("--citation ignores all other options and arguments.");
::status(""); ::status("");
} }
::status( ::status(
"Academic tradition requires you to cite works you base your article on." , "Academic tradition requires you to cite works you base your article on." ,
"If you use programs that use GNU Parallel to process data for an article in a", "If you use programs that use GNU Parallel to process data for an article in a",
"scientific publication, please cite:", "scientific publication, please cite:",
"", "",
"\@software{tange_2021_4454976,", "\@software{tange_2021_4554342,",
" author = {Tange, Ole},", " author = {Tange, Ole},",
" title = {GNU Parallel 20210122 ('Capitol Riots')},", " title = {GNU Parallel 20210222 ('AngSangSuKyi')},",
" month = Jan,", " month = Feb,",
" year = 2020,", " year = 2020,",
" note = {{GNU Parallel is a general parallelizer to run", " note = {{GNU Parallel is a general parallelizer to run",
" multiple serial command line programs in paralle l", " multiple serial command line programs in paralle l",
" without changing them.}},", " without changing them.}},",
" publisher = {Zenodo},", " publisher = {Zenodo},",
" doi = {10.5281/zenodo.4454976},", " doi = {10.5281/zenodo.4554342},",
" url = {https://doi.org/10.5281/zenodo.4454976}", " url = {https://doi.org/10.5281/zenodo.4554342}",
"}", "}",
"", "",
"(Feel free to use \\nocite{tange_2021_4454976})", "(Feel free to use \\nocite{tange_2021_4554342})",
"", "",
# Before changing this line, please read # Before changing this line, please read
# https://www.gnu.org/software/parallel/parallel_design.html#Citation-not ice and # https://www.gnu.org/software/parallel/parallel_design.html#Citation-not ice and
# https://git.savannah.gnu.org/cgit/parallel.git/tree/doc/citation-notice -faq.txt # https://git.savannah.gnu.org/cgit/parallel.git/tree/doc/citation-notice -faq.txt
"This helps funding further development; AND IT WON'T COST YOU A CENT.", "This helps funding further development; AND IT WON'T COST YOU A CENT.",
"If you pay 10000 EUR you should feel free to use GNU Parallel without ci ting.", "If you pay 10000 EUR you should feel free to use GNU Parallel without ci ting.",
"", "",
"More about funding GNU Parallel and the citation notice:", "More about funding GNU Parallel and the citation notice:",
"https://lists.gnu.org/archive/html/parallel/2013-11/msg00006.html", "https://lists.gnu.org/archive/html/parallel/2013-11/msg00006.html",
"https://www.gnu.org/software/parallel/parallel_design.html#Citation-noti ce", "https://www.gnu.org/software/parallel/parallel_design.html#Citation-noti ce",
skipping to change at line 6253 skipping to change at line 6284
$self->{'jobs_completed'}++; $self->{'jobs_completed'}++;
$Global::total_completed++; $Global::total_completed++;
} }
sub set_max_jobs_running($$) { sub set_max_jobs_running($$) {
my $self = shift; my $self = shift;
if(defined $self->{'max_jobs_running'}) { if(defined $self->{'max_jobs_running'}) {
$Global::max_jobs_running -= $self->{'max_jobs_running'}; $Global::max_jobs_running -= $self->{'max_jobs_running'};
} }
$self->{'max_jobs_running'} = shift; $self->{'max_jobs_running'} = shift;
if(defined $self->{'max_jobs_running'}) { if(defined $self->{'max_jobs_running'}) {
# max_jobs_running could be resat if -j is a changed file # max_jobs_running could be resat if -j is a changed file
$Global::max_jobs_running += $self->{'max_jobs_running'}; $Global::max_jobs_running += $self->{'max_jobs_running'};
} }
# Initialize orig to the first non-zero value that comes around # Initialize orig to the first non-zero value that comes around
$self->{'orig_max_jobs_running'} ||= $self->{'max_jobs_running'}; $self->{'orig_max_jobs_running'} ||= $self->{'max_jobs_running'};
} }
sub memfree() { sub memfree() {
# Returns: # Returns:
skipping to change at line 7022 skipping to change at line 7054
sub cleanup() { sub cleanup() {
# Cleanup: Close the files # Cleanup: Close the files
for (values %fh) { close $_ } for (values %fh) { close $_ }
# Cleanup: Kill the children # Cleanup: Kill the children
for my $pid (@children) { for my $pid (@children) {
kill 9, $pid; kill 9, $pid;
waitpid($pid,0); waitpid($pid,0);
delete $Global::unkilled_children{$pid}; delete $Global::unkilled_children{$pid};
} }
# Cleanup: Unget the command_lines or the @args # Cleanup: Unget the command_lines or the @args
$Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->unget(@args); $Global::JobQueue->{'commandlinequeue'}{'arg_queue'}->unget(@args);
@args = (); @args = ();
$Global::JobQueue->unget(@jobs); $Global::JobQueue->unget(@jobs);
@jobs = (); @jobs = ();
} }
sub processes_available_by_system_limit($) { sub processes_available_by_system_limit($) {
# If the wanted number of processes is bigger than the system limits: # If the wanted number of processes is bigger than the system limits:
# Limit them to the system limits # Limit them to the system limits
# Limits are: File handles, number of input lines, processes, # Limits are: File handles, number of input lines, processes,
# and taking > 1 second to spawn 10 extra processes # and taking > 1 second to spawn 10 extra processes
skipping to change at line 7983 skipping to change at line 8015
package JobQueue; package JobQueue;
sub new($) { sub new($) {
my $class = shift; my $class = shift;
my $commandref = shift; my $commandref = shift;
my $read_from = shift; my $read_from = shift;
my $context_replace = shift; my $context_replace = shift;
my $max_number_of_args = shift; my $max_number_of_args = shift;
my $transfer_files = shift; my $transfer_files = shift;
my $return_files = shift; my $return_files = shift;
my $template_names = shift;
my $template_contents = shift;
my $commandlinequeue = CommandLineQueue->new my $commandlinequeue = CommandLineQueue->new
($commandref, $read_from, $context_replace, $max_number_of_args, ($commandref, $read_from, $context_replace, $max_number_of_args,
$transfer_files, $return_files); $transfer_files, $return_files, $template_names, $template_contents);
my @unget = (); my @unget = ();
return bless { return bless {
'unget' => \@unget, 'unget' => \@unget,
'commandlinequeue' => $commandlinequeue, 'commandlinequeue' => $commandlinequeue,
'this_job_no' => 0, 'this_job_no' => 0,
'total_jobs' => undef, 'total_jobs' => undef,
}, ref($class) || $class; }, ref($class) || $class;
} }
sub get($) { sub get($) {
my $self = shift; my $self = shift;
$self->{'this_job_no'}++; $self->{'this_job_no'}++;
if(@{$self->{'unget'}}) { if(@{$self->{'unget'}}) {
return shift @{$self->{'unget'}}; my $job = shift @{$self->{'unget'}};
# {%} may have changed, so flush computed values
$job && $job->flush_cache();
return $job;
} else { } else {
my $commandline = $self->{'commandlinequeue'}->get(); my $commandline = $self->{'commandlinequeue'}->get();
if(defined $commandline) { if(defined $commandline) {
return Job->new($commandline); return Job->new($commandline);
} else { } else {
$self->{'this_job_no'}--; $self->{'this_job_no'}--;
return undef; return undef;
} }
} }
} }
skipping to change at line 8128 skipping to change at line 8165
'exitsignal' => undef, 'exitsignal' => undef,
# Timestamp for timeout if any # Timestamp for timeout if any
'timeout' => undef, 'timeout' => undef,
'virgin' => 1, 'virgin' => 1,
# Output used for SQL and CSV-output # Output used for SQL and CSV-output
'output' => { 1 => [], 2 => [] }, 'output' => { 1 => [], 2 => [] },
'halfline' => { 1 => [], 2 => [] }, 'halfline' => { 1 => [], 2 => [] },
}, ref($class) || $class; }, ref($class) || $class;
} }
sub flush_cache($) {
my $self = shift;
$self->{'commandline'}->flush_cache();
}
sub replaced($) { sub replaced($) {
my $self = shift; my $self = shift;
$self->{'commandline'} or ::die_bug("commandline empty"); $self->{'commandline'} or ::die_bug("commandline empty");
return $self->{'commandline'}->replaced(); return $self->{'commandline'}->replaced();
} }
sub seq($) { sub seq($) {
my $self = shift; my $self = shift;
return $self->{'commandline'}->seq(); return $self->{'commandline'}->seq();
} }
skipping to change at line 9383 skipping to change at line 9425
($pre ($pre
. "$sshcmd $serverlogin -- exec " . "$sshcmd $serverlogin -- exec "
. $quoted_remote_command . $quoted_remote_command
. ";" . ";"
. $post); . $post);
} }
} }
return $self->{'sshlogin_wrap'}{$command}; return $self->{'sshlogin_wrap'}{$command};
} }
sub fill_templates($) {
# Replace replacement strings in template(s)
# Returns:
# @templates - File names of replaced templates
my $self = shift;
if(%opt::template) {
my @template_name =
map { $self->{'commandline'}->replace_placeholders([$_],0,0) }
@{$self->{'commandline'}{'template_names'}};
::debug("tmpl","Names: @template_name\n");
for(my $i = 0; $i <= $#template_name; $i++) {
open(my $fh, ">", $template_name[$i]) || die;
print $fh $self->{'commandline'}->
replace_placeholders([$self->{'commandline'}{'template_contents'}
[$i]],0,0);
close $fh;
}
if($opt::cleanup) {
$self->add_rm(@template_name);
}
}
}
sub filter($) {
# Replace replacement strings in filter(s) and evaluate them
# Returns:
# $run - 1=yes, undef=no
my $self = shift;
my $run = 1;
if(@opt::filter) {
for my $eval ($self->{'commandline'}->
replace_placeholders(\@opt::filter,0,0)) {
$run &&= eval $eval;
}
}
return $run;
}
sub transfer($) { sub transfer($) {
# Files to transfer # Files to transfer
# Non-quoted and with {...} substituted # Non-quoted and with {...} substituted
# Returns: # Returns:
# @transfer - File names of files to transfer # @transfer - File names of files to transfer
my $self = shift; my $self = shift;
my $transfersize = 0; my $transfersize = 0;
my @transfer = $self->{'commandline'}-> my @transfer = $self->{'commandline'}->
replace_placeholders($self->{'commandline'}{'transfer_files'},0,0); replace_placeholders($self->{'commandline'}{'transfer_files'},0,0);
skipping to change at line 9759 skipping to change at line 9839
my $job = shift; my $job = shift;
# Get the shell command to be executed (possibly with ssh infront). # Get the shell command to be executed (possibly with ssh infront).
my $command = $job->wrapped(); my $command = $job->wrapped();
my $pid; my $pid;
if($Global::interactive or $Global::stderr_verbose) { if($Global::interactive or $Global::stderr_verbose) {
$job->interactive_start(); $job->interactive_start();
} }
# Must be run after $job->interactive_start(): # Must be run after $job->interactive_start():
# $job->interactive_start() may call $job->skip() # $job->interactive_start() may call $job->skip()
if($job->{'commandline'}{'skip'}) { if($job->{'commandline'}{'skip'}
# $job->skip() was called or
not $job->filter()) {
# $job->skip() was called or job filtered
$command = "true"; $command = "true";
} }
$job->openoutputfiles(); $job->openoutputfiles();
$job->print_verbose_dryrun(); $job->print_verbose_dryrun();
my($stdout_fh,$stderr_fh) = ($job->fh(1,"w"),$job->fh(2,"w")); my($stdout_fh,$stderr_fh) = ($job->fh(1,"w"),$job->fh(2,"w"));
if($opt::dryrun or $opt::sqlmaster) { $command = "true"; } if($opt::dryrun or $opt::sqlmaster) { $command = "true"; }
$ENV{'PARALLEL_SEQ'} = $job->seq(); $ENV{'PARALLEL_SEQ'} = $job->seq();
$ENV{'PARALLEL_PID'} = $$; $ENV{'PARALLEL_PID'} = $$;
$ENV{'PARALLEL_JOBSLOT'} = $job->slot(); $ENV{'PARALLEL_JOBSLOT'} = $job->slot();
$ENV{'PARALLEL_TMP'} = ::tmpname("par"); $ENV{'PARALLEL_TMP'} = ::tmpname("par");
$job->add_rm($ENV{'PARALLEL_TMP'}); $job->add_rm($ENV{'PARALLEL_TMP'});
$job->fill_templates();
::debug("run", $Global::total_running, " processes . Starting (", ::debug("run", $Global::total_running, " processes . Starting (",
$job->seq(), "): $command\n"); $job->seq(), "): $command\n");
if($opt::pipe) { if($opt::pipe) {
my ($stdin_fh) = ::gensym(); my ($stdin_fh) = ::gensym();
$pid = open3_setpgrp($stdin_fh,$stdout_fh,$stderr_fh,$command); $pid = open3_setpgrp($stdin_fh,$stdout_fh,$stderr_fh,$command);
if($opt::roundrobin and not $opt::keeporder) { if($opt::roundrobin and not $opt::keeporder) {
# --keep-order will make sure the order will be reproducible # --keep-order will make sure the order will be reproducible
::set_fh_non_blocking($stdin_fh); ::set_fh_non_blocking($stdin_fh);
} }
$job->set_fh(0,"w",$stdin_fh); $job->set_fh(0,"w",$stdin_fh);
skipping to change at line 10674 skipping to change at line 10757
sub new($) { sub new($) {
my $class = shift; my $class = shift;
my $seq = shift; my $seq = shift;
my $commandref = shift; my $commandref = shift;
$commandref || die; $commandref || die;
my $arg_queue = shift; my $arg_queue = shift;
my $context_replace = shift; my $context_replace = shift;
my $max_number_of_args = shift; # for -N and normal (-n1) my $max_number_of_args = shift; # for -N and normal (-n1)
my $transfer_files = shift; my $transfer_files = shift;
my $return_files = shift; my $return_files = shift;
my $template_names = shift;
my $template_contents = shift;
my $replacecount_ref = shift; my $replacecount_ref = shift;
my $len_ref = shift; my $len_ref = shift;
my %replacecount = %$replacecount_ref; my %replacecount = %$replacecount_ref;
my %len = %$len_ref; my %len = %$len_ref;
for (keys %$replacecount_ref) { for (keys %$replacecount_ref) {
# Total length of this replacement string {} replaced with all args # Total length of this replacement string {} replaced with all args
$len{$_} = 0; $len{$_} = 0;
} }
return bless { return bless {
'command' => $commandref, 'command' => $commandref,
skipping to change at line 10695 skipping to change at line 10780
'len' => \%len, 'len' => \%len,
'arg_list' => [], 'arg_list' => [],
'arg_list_flat' => [], 'arg_list_flat' => [],
'arg_list_flat_orig' => [undef], 'arg_list_flat_orig' => [undef],
'arg_queue' => $arg_queue, 'arg_queue' => $arg_queue,
'max_number_of_args' => $max_number_of_args, 'max_number_of_args' => $max_number_of_args,
'replacecount' => \%replacecount, 'replacecount' => \%replacecount,
'context_replace' => $context_replace, 'context_replace' => $context_replace,
'transfer_files' => $transfer_files, 'transfer_files' => $transfer_files,
'return_files' => $return_files, 'return_files' => $return_files,
'template_names' => $template_names,
'template_contents' => $template_contents,
'replaced' => undef, 'replaced' => undef,
}, ref($class) || $class; }, ref($class) || $class;
} }
sub flush_cache() {
my $self = shift;
for my $arglist (@{$self->{'arg_list'}}) {
for my $arg (@$arglist) {
$arg->flush_cache();
}
}
$self->{'arg_queue'}->flush_cache();
$self->{'replaced'} = undef;
}
sub seq($) { sub seq($) {
my $self = shift; my $self = shift;
return $self->{'seq'}; return $self->{'seq'};
} }
sub set_seq($$) { sub set_seq($$) {
my $self = shift; my $self = shift;
$self->{'seq'} = shift; $self->{'seq'} = shift;
} }
skipping to change at line 11383 skipping to change at line 11481
package CommandLineQueue; package CommandLineQueue;
sub new($) { sub new($) {
my $class = shift; my $class = shift;
my $commandref = shift; my $commandref = shift;
my $read_from = shift; my $read_from = shift;
my $context_replace = shift || 0; my $context_replace = shift || 0;
my $max_number_of_args = shift; my $max_number_of_args = shift;
my $transfer_files = shift; my $transfer_files = shift;
my $return_files = shift; my $return_files = shift;
my $template_names = shift;
my $template_contents = shift;
my @unget = (); my @unget = ();
my $posrpl; my $posrpl;
my ($replacecount_ref, $len_ref); my ($replacecount_ref, $len_ref);
my @command = @$commandref; my @command = @$commandref;
my $seq = 1; my $seq = 1;
# Replace replacement strings with {= perl expr =} # Replace replacement strings with {= perl expr =}
# '{=' 'perlexpr' '=}' => '{= perlexpr =}' # '{=' 'perlexpr' '=}' => '{= perlexpr =}'
@command = merge_rpl_parts(@command); @command = merge_rpl_parts(@command);
# Protect matching inside {= perl expr =} # Protect matching inside {= perl expr =}
# by replacing {= and =} with \257< and \257> # by replacing {= and =} with \257< and \257>
# in options that can contain replacement strings: # in options that can contain replacement strings:
# @command, --transferfile, --return, # @command, --transferfile, --return,
# --tagstring, --workdir, --results # --tagstring, --workdir, --results
for(@command, @$transfer_files, @$return_files, for(@command, @$transfer_files, @$return_files,
$opt::tagstring, $opt::workdir, $opt::results, $opt::retries) { @$template_names, @$template_contents,
$opt::tagstring, $opt::workdir, $opt::results, $opt::retries,
@opt::filter) {
# Skip if undefined # Skip if undefined
$_ or next; $_ or next;
# Escape \257 => \257\256 # Escape \257 => \257\256
$Global::escape_string_present += s/\257/\257\256/g; $Global::escape_string_present += s/\257/\257\256/g;
# Needs to match rightmost left parens (Perl defaults to leftmost) # Needs to match rightmost left parens (Perl defaults to leftmost)
# to deal with: {={==} and {={==}=} # to deal with: {={==} and {={==}=}
# Replace {= -> \257< and =} -> \257> # Replace {= -> \257< and =} -> \257>
# #
# Complex way to do: # Complex way to do:
# s/{=(.*)=}/\257<$1\257>/g # s/{=(.*)=}/\257<$1\257>/g
skipping to change at line 11500 skipping to change at line 11602
for(my $i = 1;defined $grp[$i]; $i++) { for(my $i = 1;defined $grp[$i]; $i++) {
$set_args .= "\$_pAr_gRp$i = \"" . $set_args .= "\$_pAr_gRp$i = \"" .
::perl_quote_scalar($grp[$i]) . "\";"; ::perl_quote_scalar($grp[$i]) . "\";";
} }
$unchanged . "\257<" . $position . $set_args . $rv . "\257> " $unchanged . "\257<" . $position . $set_args . $rv . "\257> "
}gxes) { }gxes) {
} }
} }
} }
} }
# Add {} if no replacement strings in @command # Add {} if no replacement strings in @command
($replacecount_ref, $len_ref, @command) = ($replacecount_ref, $len_ref, @command) =
replacement_counts_and_lengths($transfer_files,$return_files,@command); replacement_counts_and_lengths($transfer_files, $return_files,
$template_names, $template_contents,
@command);
if("@command" =~ /^[^ \t\n=]*\257</) { if("@command" =~ /^[^ \t\n=]*\257</) {
# Replacement string is (part of) the command (and not just # Replacement string is (part of) the command (and not just
# argument or variable definition V1={}) # argument or variable definition V1={})
# E.g. parallel {}, parallel my_{= s/_//=}, parallel {2} # E.g. parallel {}, parallel my_{= s/_//=}, parallel {2}
# Do no quote (Otherwise it will fail if the input contains spaces) # Do no quote (Otherwise it will fail if the input contains spaces)
$Global::quote_replace = 0; $Global::quote_replace = 0;
} }
if($opt::sqlmaster and $Global::sql->append()) { if($opt::sqlmaster and $Global::sql->append()) {
$seq = $Global::sql->max_seq() + 1; $seq = $Global::sql->max_seq() + 1;
} }
return bless { return bless {
'unget' => \@unget, ('unget' => \@unget,
'command' => \@command, 'command' => \@command,
'replacecount' => $replacecount_ref, 'replacecount' => $replacecount_ref,
'arg_queue' => RecordQueue->new($read_from,$opt::colsep), 'arg_queue' => RecordQueue->new($read_from,$opt::colsep),
'context_replace' => $context_replace, 'context_replace' => $context_replace,
'len' => $len_ref, 'len' => $len_ref,
'max_number_of_args' => $max_number_of_args, 'max_number_of_args' => $max_number_of_args,
'size' => undef, 'size' => undef,
'transfer_files' => $transfer_files, 'transfer_files' => $transfer_files,
'return_files' => $return_files, 'return_files' => $return_files,
'seq' => $seq, 'template_names' => $template_names,
'template_contents' => $template_contents,
'seq' => $seq,
)
}, ref($class) || $class; }, ref($class) || $class;
} }
sub merge_rpl_parts($) { sub merge_rpl_parts($) {
# '{=' 'perlexpr' '=}' => '{= perlexpr =}' # '{=' 'perlexpr' '=}' => '{= perlexpr =}'
# Input: # Input:
# @in = the @command as given by the user # @in = the @command as given by the user
# Uses: # Uses:
# $Global::parensleft # $Global::parensleft
# $Global::parensright # $Global::parensright
skipping to change at line 11575 skipping to change at line 11681
sub replacement_counts_and_lengths($$@) { sub replacement_counts_and_lengths($$@) {
# Count the number of different replacement strings. # Count the number of different replacement strings.
# Find the lengths of context for context groups and non-context # Find the lengths of context for context groups and non-context
# groups. # groups.
# If no {} found in @command: add it to @command # If no {} found in @command: add it to @command
# #
# Input: # Input:
# \@transfer_files = array of filenames to transfer # \@transfer_files = array of filenames to transfer
# \@return_files = array of filenames to return # \@return_files = array of filenames to return
# \@template_names = array of names to copy to
# \@template_contents = array of contents to write
# @command = command template # @command = command template
# Output: # Output:
# \%replacecount, \%len, @command # \%replacecount, \%len, @command
my $transfer_files = shift; my $transfer_files = shift;
my $return_files = shift; my $return_files = shift;
my $template_names = shift;
my $template_contents = shift;
my @command = @_; my @command = @_;
my (%replacecount,%len); my (%replacecount,%len);
my $sum = 0; my $sum = 0;
while($sum == 0) { while($sum == 0) {
# Count how many times each replacement string is used # Count how many times each replacement string is used
my @cmd = @command; my @cmd = @command;
my $contextlen = 0; my $contextlen = 0;
my $noncontextlen = 0; my $noncontextlen = 0;
my $contextgroups = 0; my $contextgroups = 0;
for my $c (@cmd) { for my $c (@cmd) {
skipping to change at line 11609 skipping to change at line 11719
while($c =~ s/ (\S*\000\S*) //xs) { while($c =~ s/ (\S*\000\S*) //xs) {
my $w = $1; my $w = $1;
$w =~ tr/\000//d; # Remove all \000's $w =~ tr/\000//d; # Remove all \000's
$contextlen += length($w); $contextlen += length($w);
$contextgroups++; $contextgroups++;
} }
# All {= perl expr =} have been removed: The rest is non-context # All {= perl expr =} have been removed: The rest is non-context
$noncontextlen += length $c; $noncontextlen += length $c;
} }
for(@$transfer_files, @$return_files, for(@$transfer_files, @$return_files,
@$template_names, @$template_contents,
@opt::filter,
$opt::tagstring, $opt::workdir, $opt::results, $opt::retries) { $opt::tagstring, $opt::workdir, $opt::results, $opt::retries) {
# Options that can contain replacement strings # Options that can contain replacement strings
$_ or next; $_ or next;
my $t = $_; my $t = $_;
while($t =~ s/ \257<( (?: [^\257]*|[\257][^<>] )* )\257> //xs) { while($t =~ s/ \257<( (?: [^\257]*|[\257][^<>] )* )\257> //xs) {
# %replacecount = { "perlexpr" => number of times seen } # %replacecount = { "perlexpr" => number of times seen }
# e.g { "$_++" => 2 } # e.g { "$_++" => 2 }
# But for tagstring we just need to mark it as seen # But for tagstring we just need to mark it as seen
$replacecount{$1} ||= 1; $replacecount{$1} ||= 1;
} }
skipping to change at line 11667 skipping to change at line 11779
# Get the sequence number from the SQL table # Get the sequence number from the SQL table
$self->set_seq($SQL::next_seq); $self->set_seq($SQL::next_seq);
# Get the command from the SQL table # Get the command from the SQL table
$self->{'command'} = $SQL::command_ref; $self->{'command'} = $SQL::command_ref;
my @command; my @command;
# Recompute replace counts based on the read command # Recompute replace counts based on the read command
($self->{'replacecount'}, ($self->{'replacecount'},
$self->{'len'}, @command) = $self->{'len'}, @command) =
replacement_counts_and_lengths($self->{'transfer_files'}, replacement_counts_and_lengths($self->{'transfer_files'},
$self->{'return_files'}, $self->{'return_files'},
$self->{'template_name'},
$self->{'template_contents'},
@$SQL::command_ref); @$SQL::command_ref);
if("@command" =~ /^[^ \t\n=]*\257</) { if("@command" =~ /^[^ \t\n=]*\257</) {
# Replacement string is (part of) the command (and not just # Replacement string is (part of) the command (and not just
# argument or variable definition V1={}) # argument or variable definition V1={})
# E.g. parallel {}, parallel my_{= s/_//=}, parallel {2} # E.g. parallel {}, parallel my_{= s/_//=}, parallel {2}
# Do no quote (Otherwise it will fail if the input contains space s) # Do no quote (Otherwise it will fail if the input contains space s)
$Global::quote_replace = 0; $Global::quote_replace = 0;
} }
} }
my $cmd_line = CommandLine->new($self->seq(), my $cmd_line = CommandLine->new($self->seq(),
$self->{'command'}, $self->{'command'},
$self->{'arg_queue'}, $self->{'arg_queue'},
$self->{'context_replace'}, $self->{'context_replace'},
$self->{'max_number_of_args'}, $self->{'max_number_of_args'},
$self->{'transfer_files'}, $self->{'transfer_files'},
$self->{'return_files'}, $self->{'return_files'},
$self->{'template_names'},
$self->{'template_contents'},
$self->{'replacecount'}, $self->{'replacecount'},
$self->{'len'}, $self->{'len'},
); );
$cmd_line->populate(); $cmd_line->populate();
::debug("run","cmd_line->number_of_args ", ::debug("run","cmd_line->number_of_args ",
$cmd_line->number_of_args(), "\n"); $cmd_line->number_of_args(), "\n");
if(not $Global::no_more_input and ($opt::pipe or $opt::pipepart)) { if(not $Global::no_more_input and ($opt::pipe or $opt::pipepart)) {
if($cmd_line->replaced() eq "") { if($cmd_line->replaced() eq "") {
# Empty command - pipe requires a command # Empty command - pipe requires a command
::error("--pipe/--pipepart must have a command to pipe into ". ::error("--pipe/--pipepart must have a command to pipe into ".
skipping to change at line 11964 skipping to change at line 12080
} }
sub empty($) { sub empty($) {
my $self = shift; my $self = shift;
my $empty = (not @{$self->{'unget'}}) && my $empty = (not @{$self->{'unget'}}) &&
$self->{'arg_sub_queue'}->empty(); $self->{'arg_sub_queue'}->empty();
::debug("run", "RecordQueue->empty $empty"); ::debug("run", "RecordQueue->empty $empty");
return $empty; return $empty;
} }
sub flush_cache($) {
my $self = shift;
for my $record (@{$self->{'unget'}}) {
for my $arg (@$record) {
$arg->flush_cache();
}
}
$self->{'arg_sub_queue'}->flush_cache();
}
sub arg_number($) { sub arg_number($) {
my $self = shift; my $self = shift;
return $self->{'arg_number'}; return $self->{'arg_number'};
} }
package RecordColQueue; package RecordColQueue;
sub new($) { sub new($) {
my $class = shift; my $class = shift;
my $fhs = shift; my $fhs = shift;
skipping to change at line 11989 skipping to change at line 12115
}, ref($class) || $class; }, ref($class) || $class;
} }
sub get($) { sub get($) {
# Returns: # Returns:
# reference to array of Arg-objects # reference to array of Arg-objects
my $self = shift; my $self = shift;
if(@{$self->{'unget'}}) { if(@{$self->{'unget'}}) {
return shift @{$self->{'unget'}}; return shift @{$self->{'unget'}};
} }
my $unget_ref = $self->{'unget'};
if($self->{'arg_sub_queue'}->empty()) { if($self->{'arg_sub_queue'}->empty()) {
return undef; return undef;
} }
my $in_record = $self->{'arg_sub_queue'}->get(); my $in_record = $self->{'arg_sub_queue'}->get();
if(defined $in_record) { if(defined $in_record) {
my @out_record = (); my @out_record = ();
for my $arg (@$in_record) { for my $arg (@$in_record) {
::debug("run", "RecordColQueue::arg $arg\n"); ::debug("run", "RecordColQueue::arg $arg\n");
my $line = $arg->orig(); my $line = $arg->orig();
::debug("run", "line='$line'\n"); ::debug("run", "line='$line'\n");
if($line ne "") { if($line ne "") {
if($opt::csv) { if($opt::csv) {
# Parse CSV # Parse CSV and put it into a record
chomp $line; chomp $line;
if(not $Global::csv->parse($line)) { if(not $Global::csv->parse($line)) {
die "CSV has unexpected format: ^$line^"; die "CSV has unexpected format: ^$line^";
} }
for($Global::csv->fields()) { for($Global::csv->fields()) {
push @out_record, Arg->new($_); push @out_record, Arg->new($_);
} }
} else { } else {
# Split --colsep into record
for my $s (split /$opt::colsep/o, $line, -1) { for my $s (split /$opt::colsep/o, $line, -1) {
push @out_record, Arg->new($s); push @out_record, Arg->new($s);
} }
} }
} else { } else {
push @out_record, Arg->new(""); push @out_record, Arg->new("");
} }
} }
return \@out_record; return \@out_record;
} else { } else {
skipping to change at line 12039 skipping to change at line 12165
} }
sub empty($) { sub empty($) {
my $self = shift; my $self = shift;
my $empty = (not @{$self->{'unget'}}) && my $empty = (not @{$self->{'unget'}}) &&
$self->{'arg_sub_queue'}->empty(); $self->{'arg_sub_queue'}->empty();
::debug("run", "RecordColQueue->empty $empty"); ::debug("run", "RecordColQueue->empty $empty");
return $empty; return $empty;
} }
sub flush_cache($) {
my $self = shift;
for my $arg (@{$self->{'unget'}}) {
$arg->flush_cache();
}
$self->{'arg_sub_queue'}->flush_cache();
}
package SQLRecordQueue; package SQLRecordQueue;
sub new($) { sub new($) {
my $class = shift; my $class = shift;
my @unget = (); my @unget = ();
return bless { return bless {
'unget' => \@unget, 'unget' => \@unget,
}, ref($class) || $class; }, ref($class) || $class;
} }
skipping to change at line 12077 skipping to change at line 12211
if(@{$self->{'unget'}}) { return 0; } if(@{$self->{'unget'}}) { return 0; }
my $get = $self->get(); my $get = $self->get();
if(defined $get) { if(defined $get) {
$self->unget($get); $self->unget($get);
} }
my $empty = not $get; my $empty = not $get;
::debug("run", "SQLRecordQueue->empty $empty"); ::debug("run", "SQLRecordQueue->empty $empty");
return $empty; return $empty;
} }
sub flush_cache($) {
my $self = shift;
for my $record (@{$self->{'unget'}}) {
for my $arg (@$record) {
$arg->flush_cache();
}
}
}
package MultifileQueue; package MultifileQueue;
@Global::unget_argv=(); @Global::unget_argv=();
sub new($$) { sub new($$) {
my $class = shift; my $class = shift;
my $fhs = shift; my $fhs = shift;
for my $fh (@$fhs) { for my $fh (@$fhs) {
if(-t $fh and -t ($Global::status_fd || *STDERR)) { if(-t $fh and -t ($Global::status_fd || *STDERR)) {
::warning("Input is read from the terminal. You are either an expert" , ::warning("Input is read from the terminal. You are either an expert" ,
skipping to change at line 12126 skipping to change at line 12269
my $self = shift; my $self = shift;
my $empty = (not @Global::unget_argv) && my $empty = (not @Global::unget_argv) &&
not @{$self->{'unget'}}; not @{$self->{'unget'}};
for my $fh (@{$self->{'fhs'}}) { for my $fh (@{$self->{'fhs'}}) {
$empty &&= eof($fh); $empty &&= eof($fh);
} }
::debug("run", "MultifileQueue->empty $empty "); ::debug("run", "MultifileQueue->empty $empty ");
return $empty; return $empty;
} }
sub flush_cache($) {
my $self = shift;
for my $record (@{$self->{'unget'}}, @{$self->{'arg_matrix'}}) {
for my $arg (@$record) {
$arg->flush_cache();
}
}
}
sub link_get($) { sub link_get($) {
my $self = shift; my $self = shift;
if(@{$self->{'unget'}}) { if(@{$self->{'unget'}}) {
return shift @{$self->{'unget'}}; return shift @{$self->{'unget'}};
} }
my @record = (); my @record = ();
my $prepend; my $prepend;
my $empty = 1; my $empty = 1;
for my $fh (@{$self->{'fhs'}}) { for my $i (0..$#{$self->{'fhs'}}) {
my $fh = $self->{'fhs'}[$i];
my $arg = read_arg_from_fh($fh); my $arg = read_arg_from_fh($fh);
if(defined $arg) { if(defined $arg) {
# Record $arg for recycling at end of file # Record $arg for recycling at end of file
push @{$self->{'arg_matrix'}{$fh}}, $arg; push @{$self->{'arg_matrix'}[$i]}, $arg;
push @record, $arg; push @record, $arg;
$empty = 0; $empty = 0;
} else { } else {
::debug("run", "EOA "); ::debug("run", "EOA ");
# End of file: Recycle arguments # End of file: Recycle arguments
push @{$self->{'arg_matrix'}{$fh}}, shift @{$self->{'arg_matrix'}{$fh }}; push @{$self->{'arg_matrix'}[$i]}, shift @{$self->{'arg_matrix'}[$i]} ;
# return last @{$args->{'args'}{$fh}}; # return last @{$args->{'args'}{$fh}};
push @record, @{$self->{'arg_matrix'}{$fh}}[-1]; push @record, @{$self->{'arg_matrix'}[$i]}[-1];
} }
} }
if($empty) { if($empty) {
return undef; return undef;
} else { } else {
return \@record; return \@record;
} }
} }
sub nest_get($) { sub nest_get($) {
skipping to change at line 12238 skipping to change at line 12391
# get the first # get the first
if(@mapped) { if(@mapped) {
return shift @{$self->{'unget'}}; return shift @{$self->{'unget'}};
} }
} }
} }
# all are eof or at EOF string; return from the unget queue # all are eof or at EOF string; return from the unget queue
return shift @{$self->{'unget'}}; return shift @{$self->{'unget'}};
} }
sub read_arg_from_fh($) { {
# Read one Arg from filehandle my $cr_count = 0;
# Returns: my $nl_count = 0;
# Arg-object with one read line my $dos_crnl_determined;
# undef if end of file sub read_arg_from_fh($) {
my $fh = shift; # Read one Arg from filehandle
my $prepend; # Returns:
my $arg; # Arg-object with one read line
my $half_record = 0; # undef if end of file
do {{ my $fh = shift;
# This makes 10% faster my $prepend;
if(not defined ($arg = <$fh>)) { my $arg;
if(defined $prepend) { my $half_record = 0;
return Arg->new($prepend); do {{
} else { # This makes 10% faster
return undef; if(not defined ($arg = <$fh>)) {
if(defined $prepend) {
return Arg->new($prepend);
} else {
return undef;
}
} }
} if(not $dos_crnl_determined and not $opt::d) {
if($opt::csv) { # Warn if input has CR-NL and -d is not set
# We need to read a full CSV line. if($arg =~ /\r$/) {
if(($arg =~ y/"/"/) % 2 ) { $cr_count++;
# The number of " on the line is uneven: } else {
# If we were in a half_record => we have a full record now $nl_count++;
# If we were ouside a half_record => we are in a half record now }
$half_record = not $half_record; if($cr_count == 3 or $nl_count == 3) {
} $dos_crnl_determined = 1;
if($half_record) { if($nl_count == 0 and $cr_count == 3) {
# CSV half-record with quoting: ::warning('The first three values end in CR-NL. Consider
# col1,"col2 2""x3"" board newline <-this one using -d "\r\n"');
# cont",col3 }
$prepend .= $arg; }
redo; }
} else { if($opt::csv) {
# Now we have a full CSV record # We need to read a full CSV line.
if(($arg =~ y/"/"/) % 2 ) {
# The number of " on the line is uneven:
# If we were in a half_record => we have a full record now
# If we were ouside a half_record => we are in a half record
now
$half_record = not $half_record;
}
if($half_record) {
# CSV half-record with quoting:
# col1,"col2 2""x3"" board newline <-this one
# cont",col3
$prepend .= $arg;
redo;
} else {
# Now we have a full CSV record
}
}
# Remove delimiter
chomp $arg;
if($Global::end_of_file_string and
$arg eq $Global::end_of_file_string) {
# Ignore the rest of input file
close $fh;
::debug("run", "EOF-string ($arg) met\n");
if(defined $prepend) {
return Arg->new($prepend);
} else {
return undef;
}
} }
}
# Remove delimiter
chomp $arg;
if($Global::end_of_file_string and
$arg eq $Global::end_of_file_string) {
# Ignore the rest of input file
close $fh;
::debug("run", "EOF-string ($arg) met\n");
if(defined $prepend) { if(defined $prepend) {
return Arg->new($prepend); $arg = $prepend.$arg; # For line continuation
} else { undef $prepend;
return undef;
} }
} if($Global::ignore_empty) {
if(defined $prepend) { if($arg =~ /^\s*$/) {
$arg = $prepend.$arg; # For line continuation redo; # Try the next line
undef $prepend; }
}
if($Global::ignore_empty) {
if($arg =~ /^\s*$/) {
redo; # Try the next line
} }
} if($Global::max_lines) {
if($Global::max_lines) { if($arg =~ /\s$/) {
if($arg =~ /\s$/) { # Trailing space => continued on next line
# Trailing space => continued on next line $prepend = $arg;
$prepend = $arg; redo;
redo; }
} }
}} while (1 == 0); # Dummy loop {{}} for redo
if(defined $arg) {
return Arg->new($arg);
} else {
::die_bug("multiread arg undefined");
} }
}} while (1 == 0); # Dummy loop {{}} for redo
if(defined $arg) {
return Arg->new($arg);
} else {
::die_bug("multiread arg undefined");
} }
} }
# Prototype forwarding # Prototype forwarding
sub expand_combinations(@); sub expand_combinations(@);
sub expand_combinations(@) { sub expand_combinations(@) {
# Input: # Input:
# ([xmin,xmax], [ymin,ymax], ...) # ([xmin,xmax], [ymin,ymax], ...)
# Returns: ([x,y,...],[x,y,...]) # Returns: ([x,y,...],[x,y,...])
# where xmin <= x <= xmax and ymin <= y <= ymax # where xmin <= x <= xmax and ymin <= y <= ymax
skipping to change at line 13411 skipping to change at line 13583
$Global::start_sqlworker = 1; $Global::start_sqlworker = 1;
$opt::sqlworker = undef; $opt::sqlworker = undef;
} }
if($opt::nonall or $opt::onall) { if($opt::nonall or $opt::onall) {
onall(\@input_source_fh,@command); onall(\@input_source_fh,@command);
wait_and_exit(min(undef_as_zero($Global::exitstatus),254)); wait_and_exit(min(undef_as_zero($Global::exitstatus),254));
} }
$Global::JobQueue = JobQueue->new( $Global::JobQueue = JobQueue->new(
\@command,\@input_source_fh,$Global::ContextReplace, \@command, \@input_source_fh, $Global::ContextReplace,
$number_of_args,\@Global::transfer_files,\@Global::ret_files); $number_of_args, \@Global::transfer_files, \@Global::ret_files,
\@Global::template_names, \@Global::template_contents
);
if($opt::sqlmaster) { if($opt::sqlmaster) {
# Create SQL table to hold joblog + output # Create SQL table to hold joblog + output
# Figure out how many arguments are in a job # Figure out how many arguments are in a job
# (It is affected by --colsep, -N, $number_source_fh) # (It is affected by --colsep, -N, $number_source_fh)
my $record_queue = $Global::JobQueue->{'commandlinequeue'}{'arg_queue'}; my $record_queue = $Global::JobQueue->{'commandlinequeue'}{'arg_queue'};
my $record = $record_queue->get(); my $record = $record_queue->get();
my $no_of_values = $number_of_args * (1+$#{$record}); my $no_of_values = $number_of_args * (1+$#{$record});
$record_queue->unget($record); $record_queue->unget($record);
$Global::sql->create_table($no_of_values); $Global::sql->create_table($no_of_values);
skipping to change at line 13450 skipping to change at line 13624
} }
if($opt::eta or $opt::bar or $opt::shuf or $Global::halt_pct) { if($opt::eta or $opt::bar or $opt::shuf or $Global::halt_pct) {
# Count the number of jobs or shuffle all jobs # Count the number of jobs or shuffle all jobs
# before starting any. # before starting any.
# Must be done after ungetting any --pipepart jobs. # Must be done after ungetting any --pipepart jobs.
$Global::JobQueue->total_jobs(); $Global::JobQueue->total_jobs();
} }
# Compute $Global::max_jobs_running # Compute $Global::max_jobs_running
# Must be done after ungetting any --pipepart jobs. # Must be done after ungetting any --pipepart jobs.
max_jobs_running(); max_jobs_running();
init_run_jobs(); init_run_jobs();
my $sem; my $sem;
if($Global::semaphore) { if($Global::semaphore) {
$sem = acquire_semaphore(); $sem = acquire_semaphore();
} }
$SIG{TERM} = $Global::original_sig{TERM}; $SIG{TERM} = $Global::original_sig{TERM};
$SIG{HUP} = \&start_no_new_jobs; $SIG{HUP} = \&start_no_new_jobs;
if($opt::tee or $opt::shard or $opt::bin) { if($opt::tee or $opt::shard or $opt::bin) {
# All jobs must be running in parallel for --tee/--shard/--bin # All jobs must be running in parallel for --tee/--shard/--bin
skipping to change at line 13481 skipping to change at line 13656
} else { } else {
::die_bug("--bin/--shard/--tee should not get here"); ::die_bug("--bin/--shard/--tee should not get here");
} }
::wait_and_exit(255); ::wait_and_exit(255);
} }
} elsif($opt::pipe and not $opt::pipepart) { } elsif($opt::pipe and not $opt::pipepart) {
# Fill all jobslots # Fill all jobslots
while(start_more_jobs()) {} while(start_more_jobs()) {}
spreadstdin(); spreadstdin();
} else { } else {
# Reap the finished jobs - start more # Reap the finished jobs and start more
while(reapers() + start_more_jobs()) {} while(reapers() + start_more_jobs()) {}
} }
::debug("init", "Start draining\n"); ::debug("init", "Start draining\n");
drain_job_queue(@command); drain_job_queue(@command);
::debug("init", "Done draining\n"); ::debug("init", "Done draining\n");
reapers(); reapers();
::debug("init", "Done reaping\n"); ::debug("init", "Done reaping\n");
if($Global::semaphore) { if($Global::semaphore) {
$sem->release(); $sem->release();
} }
 End of changes. 70 change blocks. 
125 lines changed or deleted 307 lines changed or added

Home  |  About  |  Features  |  All  |  Newest  |  Dox  |  Diffs  |  RSS Feeds  |  Screenshots  |  Comments  |  Imprint  |  Privacy  |  HTTP(S)