"Fossies" - the Fresh Open Source Software Archive

Member "munin-2.0.67/master/lib/Munin/Master/Node.pm" (22 Feb 2021, 23038 Bytes) of package /linux/misc/munin-2.0.67.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML format using (guessed) Perl source code syntax highlighting (style: standard) with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. For more information about "Node.pm", see the Fossies "Dox" file reference documentation and the latest Fossies "Diffs" side-by-side code changes report: 2.0.66_vs_2.0.67.

    1 package Munin::Master::Node;
    2 
    3 # This module is used by UpdateWorker to keep in touch with a node and
    4 # parse some of the output.
    5 
    6 use warnings;
    7 use strict;
    8 
    9 use Carp;
   10 use Munin::Master::Config;
   11 use Munin::Common::Timeout;
   12 use Munin::Common::TLSClient;
   13 use Data::Dumper;
   14 use Log::Log4perl qw( :easy );
   15 use Time::HiRes qw( gettimeofday tv_interval );
   16 use IO::Socket::INET6;
   17 
# Sentinel used as a timestamp value: it declares that no timestamp was
# found.  parse_service_config() stores it as the "when" of a dirtyconfig
# value that carries no "<epoch>:" prefix.
use constant NO_TIMESTAMP => -1;

# Master configuration hash, read once when this module is loaded.
my $config = Munin::Master::Config->instance()->{config};

# Quick version, to enable "DEBUG ... if $debug" constructs
my $debug = $config->{debug};
   25 
# Note: The "io_timeout" set in new() governs both small commands and
# waiting for the total output of a plugin.  It is applied to each single
# read or write, so it is effectively reset for each read.
   28 
   29 sub new {
   30     my ($class, $address, $port, $host, $configref) = @_;
   31 
   32     my $self = {
   33         address => $address,
   34         port    => $port,
   35         host    => $host,
   36         tls     => undef,
   37         reader  => undef,
   38         pid     => undef,
   39         writer  => undef,
   40         master_capabilities => "multigraph dirtyconfig",
   41         io_timeout => 120,
   42     configref => $configref,
   43     };
   44 
   45     return bless $self, $class;
   46 }
   47 
   48 
   49 sub do_in_session {
   50     my ($self, $block) = @_;
   51 
   52     if ($self->_do_connect()) {
   53     $self->_run_starttls_if_required();
   54     my $exit_value = $block->();
   55     $self->_do_close();
   56     return { exit_value => $exit_value }; # If we're still here
   57     }
   58     return 0;  # _do_connect failed.
   59 }
   60 
   61 
   62 sub _do_connect {
   63     # Connect to a munin node.  Return false if not, true otherwise.
   64     my ($self) = @_;
   65 
   66     LOGCROAK("[FATAL] No address!  Did you forget to set 'update no' or to set 'address <IP>' ?")
   67     if !defined($self->{address});
   68 
   69     # Check if it's an URI or a plain host
   70     use URI;
   71 
   72     # Parameters are space-separated from the main address
   73     my ($url, $params) = split(/ +/, $self->{address}, 2);
   74     my $uri = new URI($url);
   75 
   76     # If address is only "ssh://host/" $params will not get set
   77     $params = "" unless defined $params;
   78 
   79     # If the scheme is not defined, it's a plain host. 
   80     # Prefix it with munin:// to be able to parse it like others
   81     $uri = new URI("munin://" . $url) unless $uri->scheme;
   82     LOGCROAK("[FATAL] '$url' is not a valid address!") unless $uri->scheme;
   83 
   84     if ($uri->scheme eq "munin") {
   85         $self->{reader} = $self->{writer} = IO::Socket::INET6->new(
   86         PeerAddr  => $uri->host,
   87         PeerPort  => $self->{port} || 4949,
   88         LocalAddr => $config->{local_address},
   89         Proto     => 'tcp', 
   90         MultiHomed => 1,
   91         Timeout   => $config->{timeout}
   92     );
   93     unless ($self->{reader} && defined $self->{reader}->connected()) {
   94         ERROR "Failed to connect to node $self->{address}:$self->{port}/tcp : $!";
   95         return 0;
   96     }
   97     } elsif ($uri->scheme eq "ssh") {
   98         my $ssh_command = sprintf("%s %s", $config->{ssh_command}, $config->{ssh_options});
   99         my $user_part = ($uri->user) ? ($uri->user . "@") : "";
  100         my $remote_cmd = ($uri->path ne '/') ? $uri->path : "";
  101 
  102         # Add any parameter to the cmd
  103         my $remote_connection_cmd = $ssh_command . " -p " . $uri->port . " " . $user_part . $uri->host . " " . $remote_cmd . " " . $params;
  104 
  105         # Open a triple pipe
  106         use IPC::Open3;
  107 
  108         $self->{reader} = new IO::Handle();
  109         $self->{writer} = new IO::Handle();
  110         $self->{stderr} = new IO::Handle();
  111 
  112         DEBUG "[DEBUG] open3($remote_connection_cmd)";
  113         $self->{pid} = open3($self->{writer}, $self->{reader}, $self->{stderr}, $remote_connection_cmd);
  114             ERROR "Failed to connect to node $self->{address} : $!" unless $self->{pid};
  115     } elsif ($uri->scheme eq "cmd") {
  116         # local commands should ignore the username, url and host
  117         my $local_cmd = $uri->path;
  118         my $local_pipe_cmd = "$local_cmd $params";
  119 
  120         # Open a triple pipe
  121         use IPC::Open3;
  122 
  123         $self->{reader} = new IO::Handle();
  124         $self->{writer} = new IO::Handle();
  125         $self->{stderr} = new IO::Handle();
  126 
  127         DEBUG "[DEBUG] open3($local_pipe_cmd)";
  128         $self->{pid} = open3($self->{writer}, $self->{reader}, $self->{stderr}, $local_pipe_cmd);
  129             ERROR "Failed to execute local command: $!" unless $self->{pid};
  130     } else {
  131         ERROR "Unknown scheme : " . $uri->scheme;
  132         return 0;
  133     }
  134 
  135     # check all the lines until we find one that matches the expected
  136     # greeting; ignore anything that doesn't look like it as long as
  137     # there is output. This allows to accept SSH connections where
  138     # lastlog or motd is used.
  139     until(defined($self->{node_name})) {
  140     my $greeting = $self->_node_read_single();
  141     if (!$greeting) {
  142         die "[ERROR] Got unknown reply from node ".$self->{host}."\n";
  143     }
  144 
  145     if ($greeting =~ /\#.*(?:lrrd|munin) (?:client|node) at (\S+)/i) {
  146         $self->{node_name} = $1;
  147     }
  148     };
  149 
  150     INFO "[INFO] node $self->{host} advertised itself as $self->{node_name} instead." if $self->{node_name} && $self->{node_name} ne $self->{host};
  151 
  152     return 1;
  153 }
  154 
  155 
  156 sub _get_node_or_global_setting {
  157     my ($self, $key) = @_;
  158     return exists $self->{configref}->{$key} ? $self->{configref}->{$key} : $config->{$key};
  159 }
  160 
  161 
  162 sub _run_starttls_if_required {
  163     my ($self) = @_;
  164 
  165     # TLS should only be attempted if explicitly enabled. The default
  166     # value is therefore "disabled" (and not "auto" as before).
  167     my $tls_requirement = $self->_get_node_or_global_setting("tls");
  168     DEBUG "TLS set to \"$tls_requirement\".";
  169     return if $tls_requirement eq 'disabled';
  170     my $logger = Log::Log4perl->get_logger("Munin::Master");
  171     $self->{tls} = Munin::Common::TLSClient->new({
  172         DEBUG        => $config->{debug},
  173         logger       => sub { $logger->warn(@_) },
  174         read_fd      => fileno($self->{reader}),
  175         read_func    => sub { _node_read_single($self) },
  176         tls_ca_cert  => $self->_get_node_or_global_setting("tls_ca_certificate"),
  177         tls_cert     => $self->_get_node_or_global_setting("tls_certificate"),
  178         tls_paranoia => $tls_requirement,
  179         tls_priv     => $self->_get_node_or_global_setting("tls_private_key"),
  180         tls_vdepth   => $self->_get_node_or_global_setting("tls_verify_depth"),
  181         tls_verify   => $self->_get_node_or_global_setting("tls_verify_certificate"),
  182         tls_match    => $self->_get_node_or_global_setting("tls_match"),
  183         write_fd     => fileno($self->{writer}),
  184         write_func   => sub { _node_write_single($self, @_) },
  185     });
  186 
  187     if (!$self->{tls}->start_tls()) {
  188         $self->{tls} = undef;
  189         if ($tls_requirement eq "paranoid" or $tls_requirement eq "enabled") {
  190         die "[ERROR] Could not establish TLS connection to '$self->{address}'. Skipping.\n";
  191         }
  192     }
  193 }
  194 
  195 
  196 sub _do_close {
  197     my ($self) = @_;
  198 
  199     close $self->{reader};
  200     close $self->{writer};
  201     $self->{reader} = undef;
  202     $self->{writer} = undef;
  203 
  204     # Close stderr if needed
  205     close $self->{stderr} if $self->{stderr};
  206     $self->{stderr} = undef if $self->{stderr};
  207 
  208     # Reap the underlying process
  209     waitpid($self->{pid}, 0) if (defined $self->{pid});
  210 }
  211 
  212 
  213 sub negotiate_capabilities {
  214     my ($self) = @_;
  215     # Please note: Sone of the capabilities are asymetrical.  Each
  216     # side simply announces which capabilities they have, and then the
  217     # other takes advantage of the capabilities it understands (or
  218     # dumbs itself down to the counterparts level of sophistication).
  219 
  220     DEBUG "[DEBUG] Negotiating capabilities\n";
  221 
  222     $self->_node_write_single("cap $self->{master_capabilities}\n");
  223     my $cap = $self->_node_read_single();
  224 
  225     if (index($cap, 'cap ') == -1) {
  226         return ('NA');
  227     }
  228 
  229     my @node_capabilities = split(/\s+/,$cap);
  230     shift @node_capabilities ; # Get rid of leading "cap".
  231 
  232     DEBUG "[DEBUG] Node says /$cap/\n";
  233 
  234     return @node_capabilities;
  235 }
  236 
  237 
  238 sub list_plugins {
  239     my ($self) = @_;
  240 
  241     # Check for one on this node- if not, use the global one
  242     my $use_node_name = defined($self->{configref}{use_node_name})
  243         ? $self->{configref}{use_node_name}
  244         : $config->{use_node_name};
  245     my $host = $use_node_name
  246         ? $self->{node_name}
  247         : $self->{host};
  248 
  249     my $use_default_node = defined($self->{configref}{use_default_node})
  250         ? $self->{configref}{use_default_node}
  251         : $config->{use_default_node};
  252 
  253     if (! $use_default_node && ! $host) {
  254     die "[ERROR] Couldn't find out which host to list on $host.\n";
  255     }
  256 
  257     my $list_host = $use_default_node ? "" : $host;
  258     $self->_node_write_single("list $list_host\n");
  259     my $list = $self->_node_read_single();
  260 
  261     if (not $list) {
  262         WARN "[WARNING] Config node $self->{host} listed no services for $host, (advertised as $self->{node_name}).  Please see http://munin-monitoring.org/wiki/FAQ_no_graphs for further information.";
  263     }
  264 
  265     return split / /, $list;
  266 }
  267 
  268 
  269 sub parse_service_config {
  270     my ($self, $service, $lines) = @_;
  271 
  272     my $errors;
  273     my $correct;
  274 
  275     my $plugin = $service;
  276 
  277     my $nodedesignation = $self->{host}."/".$self->{address}."/".$self->{port};
  278 
  279     my $global_config = {
  280     multigraph => [],
  281     };
  282     my $data_source_config = {};
  283     my @graph_order = ( );
  284 
  285     # Pascal style nested subroutine
  286     local *new_service = sub {
  287     push @{$global_config->{multigraph}}, $service;
  288     $global_config->{$service} = [];
  289     $data_source_config->{$service} = {};
  290     };
  291 
  292 
  293    local *push_graphorder = sub {
  294         my ($oldservice) = @_;
  295 
  296         # We always appends the field names in config order to any
  297         # graph_order given.
  298         # Note that this results in duplicates in the internal state
  299         # for @graph_order but munin_get_field_order() will eliminate
  300         # them before graphing.
  301 
  302         if (@graph_order) {
  303             foreach (@{$global_config->{$oldservice}}) {
  304                 if ( $_->[0] eq 'graph_order' ) {
  305                     # append to a given graph_order
  306                     $_->[1] .= join(' ', '', @graph_order);
  307                     @graph_order = ( );
  308                     return;
  309                 }
  310             }
  311             push @{$global_config->{$oldservice}}, ['graph_order', join(' ', @graph_order)];
  312         }
  313         @graph_order = ( );
  314     };
  315 
  316 
  317     DEBUG "[DEBUG] Now parsing config output from plugin $plugin on "
  318     .$self->{host};
  319 
  320     new_service($service);
  321 
  322     for my $line (@$lines) {
  323 
  324     DEBUG "[CONFIG from $plugin] $line" if $debug;
  325 
  326     if ($line =~ /\# timeout/) {
  327         die "[ERROR] Timeout error on $nodedesignation during fetch of $plugin. \n";
  328     }
  329 
  330         next unless $line;
  331         next if $line =~ /^\#/;
  332 
  333     if ($line =~ m{\A multigraph \s+ (.+) }xms) {
  334         push_graphorder($service);
  335 
  336         $service = $1;
  337 
  338         if ($service eq 'multigraph') {
  339         ERROR "[ERROR] SERVICE can't be named \"$service\" in plugin $plugin on ".$self->{host}."/".$self->{address}."/".$self->{port};
  340                 $errors++;
  341                 last;
  342         }
  343             if ($service =~ /(^\.|\.$|\.\.)/) {
  344                 ERROR "[ERROR] SERVICE \"$service\" contains dots in wrong places in plugin $plugin on ".$self->{host}."/".$self->{address}."/".$self->{port};
  345                 $errors++;
  346                 last;
  347             }
  348             if ($service !~ m/^[-\w.:.]+$/) {
  349                 ERROR "[ERROR] SERVICE \"$service\" contains weird characters in plugin $plugin on ".$self->{host}."/".$self->{address}."/".$self->{port};
  350                 $errors++;
  351                 last;
  352             }
  353         new_service($service) unless $global_config->{$service};
  354         DEBUG "[CONFIG multigraph $plugin] Service is now $service";
  355         $correct++;
  356     }
  357     elsif ($line =~ m{\A ([^\s\.]+) \s+ (.+?) \s* $}xms) {
  358         $correct++;
  359 
  360         my $label = $self->_sanitise_fieldname($1);
  361 
  362         # add to config if not already here
  363         push @{$global_config->{$service}}, [$label, $2]
  364             unless grep { $_->[0] eq $label }  @{$global_config->{$service}};
  365             DEBUG "[CONFIG graph global $plugin] $service->$label = $2" if $debug;
  366         } elsif ($line =~ m{\A ([^\.]+)\.value \s+ (.+?) \s* $}xms) {
  367         $correct++;
  368         # Special case for dirtyconfig
  369             my ($ds_name, $value, $when) = ($1, $2, NO_TIMESTAMP);
  370             
  371         $ds_name = $self->_sanitise_fieldname($ds_name);
  372         if ($value =~ /^(\d+):(.+)$/) {
  373         $when = $1;
  374         $value = $2;
  375         }
  376             DEBUG "[CONFIG dirtyconfig $plugin] Storing $value from $when in $ds_name";
  377 
  378         # Creating the datastructure if not created already
  379             $data_source_config->{$service}{$ds_name} ||= {};
  380             $data_source_config->{$service}{$ds_name}{when} ||= [];
  381             $data_source_config->{$service}{$ds_name}{value} ||= [];
  382     
  383         # Saving the timed value in the datastructure
  384         push @{$data_source_config->{$service}{$ds_name}{when}}, $when;
  385         push @{$data_source_config->{$service}{$ds_name}{value}}, $value;
  386         }
  387     elsif ($line =~ m{\A ([^\.]+)\.([^\s]+) \s+ (.+?) \s* $}xms) {
  388         $correct++;
  389         
  390             my ($ds_name, $ds_var, $ds_val) = ($1, $2, $3);
  391             $ds_name = $self->_sanitise_fieldname($ds_name);
  392             $data_source_config->{$service}{$ds_name} ||= {};
  393             $data_source_config->{$service}{$ds_name}{$ds_var} = $ds_val;
  394             DEBUG "[CONFIG dataseries $plugin] $service->$ds_name.$ds_var = $ds_val" if $debug;
  395             push ( @graph_order, $ds_name ) if $ds_var eq 'label';
  396         }
  397         elsif ($line =~ m{\A ([^\.]+)\.([^\s]+) \s* $}xms) {
  398             # the field value is empty - ignore it
  399             # see https://github.com/munin-monitoring/contrib/issues/1156#issuecomment-746884950
  400             # For example the meminfo "slab_size" graph may contain empty fields.  These should not
  401             # end up as log noise, but can be safely ignored instead.
  402             $correct++;
  403             DEBUG "[DEBUG] Ignoring field without value ('$line') from $plugin on $nodedesignation.\n";
  404         }
  405     else {
  406         $errors++;
  407         DEBUG "[DEBUG] Protocol exception: unrecognized line '$line' from $plugin on $nodedesignation.\n";
  408         }
  409     }
  410 
  411     if ($errors) {
  412     WARN "[WARNING] $errors lines had errors while $correct lines were correct in data from 'config $plugin' on $nodedesignation";
  413     }
  414 
  415     $self->_validate_data_sources($data_source_config);
  416 
  417     push_graphorder($service);
  418 
  419     return (global => $global_config, data_source => $data_source_config);
  420 }
  421 
  422 
  423 sub fetch_service_config {
  424     my ($self, $service) = @_;
  425 
  426     my $t0 = [gettimeofday];
  427 
  428     DEBUG "[DEBUG] Fetching service configuration for '$service'";
  429     $self->_node_write_single("config $service\n");
  430 
  431     # The whole config in one fell swoop.
  432     my $lines = $self->_node_read();
  433 
  434     my $elapsed = tv_interval($t0);
  435 
  436     my $nodedesignation = $self->{host}."/".$self->{address}."/".$self->{port};
  437     DEBUG "[DEBUG] config: $elapsed sec for '$service' on $nodedesignation";
  438 
  439     $service = $self->_sanitise_plugin_name($service);
  440 
  441     return $self->parse_service_config($service, $lines);
  442 }
  443 
  444 sub spoolfetch {
  445     my ($self, $timestamp) = @_;
  446 
  447     DEBUG "[DEBUG] Fetching spooled services since $timestamp (" . localtime($timestamp) . ")";
  448     $self->_node_write_single("spoolfetch $timestamp\n");
  449 
  450     # The whole stuff in one fell swoop.
  451     my $lines = $self->_node_read();
  452 
  453     # using the multigraph parsing. 
  454     # Using "__root__" as a special plugin name. 
  455     return $self->parse_service_config("__root__", $lines);
  456 }
  457 
  458 sub _validate_data_sources {
  459     my ($self, $all_data_source_config) = @_;
  460 
  461     my $nodedesignation = $self->{host}."/".$self->{address}.":".$self->{port};
  462 
  463     for my $service (keys %$all_data_source_config) {
  464     my $data_source_config = $all_data_source_config->{$service};
  465 
  466     for my $ds (keys %$data_source_config) {
  467         if (!defined $data_source_config->{$ds}{label}) {
  468         ERROR "Missing required attribute 'label' for data source '$ds' in service $service on $nodedesignation";
  469         $data_source_config->{$ds}{label} = 'No .label provided';
  470         $data_source_config->{$ds}{extinfo} = "NOTE: The plugin did not provide any label for the data source $ds.  It is in need of fixing.";
  471         }
  472     }
  473     }
  474 }
  475 
  476 
  477 sub parse_service_data {
  478     my ($self, $service, $lines) = @_;
  479 
  480     my $plugin = $service;
  481     my $errors = 0;
  482     my $correct = 0;
  483 
  484     my $nodedesignation = $self->{host}."/".$self->{address}.":".$self->{port};
  485 
  486     my %values = (
  487     $service => {},
  488     );
  489 
  490     DEBUG "[DEBUG] Now parsing fetch output from plugin $plugin on ".
  491     $nodedesignation;
  492 
  493     # every 'N' has the same value. Should not take parsing time into the equation
  494     my $now = time;
  495 
  496     for my $line (@$lines) {
  497 
  498     DEBUG "[FETCH from $plugin] $line";
  499 
  500     if ($line =~ /\# timeout/) {
  501         die "[WARNING] Timeout in fetch from '$plugin' on ".
  502         $nodedesignation;
  503     }
  504 
  505         next unless $line;
  506         next if $line =~ /^\#/;
  507 
  508     if ($line =~ m{\A multigraph \s+ (.+) }xms) {
  509         $service = $1;
  510             if ($service =~ /(^\.|\.$|\.\.)/) {
  511                 ERROR "[ERROR] SERVICE \"$service\" contains dots in wrong places in plugin $plugin on ".$self->{host}."/".$self->{address}."/".$self->{port};
  512                 $errors++;
  513                 last;
  514             }
  515             if ($service !~ m/^[-\w.:.]+$/) {
  516                 ERROR "[ERROR] SERVICE \"$service\" contains weird characters in plugin $plugin on ".$self->{host}."/".$self->{address}."/".$self->{port};
  517                 $errors++;
  518                 last;
  519             }
  520         $values{$service} = {};
  521 
  522         if ($service eq 'multigraph') {
  523                 $errors++;
  524         ERROR "[ERROR] SERVICE can't be named \"$service\" in plugin $plugin on ".
  525             $nodedesignation;
  526                 last;
  527         }
  528         $correct++;
  529     }
  530     elsif ($line =~ m{\A ([^\.]+)\.value \s+ ([\S:]+) }xms) {
  531             my ($data_source, $value, $when) = ($1, $2, $now);
  532 
  533         $correct++;
  534 
  535             $data_source = $self->_sanitise_fieldname($data_source);
  536 
  537         DEBUG "[FETCH from $plugin] Storing $value in $data_source";
  538 
  539         if ($value =~ /^(\d+):(.+)$/) {
  540         $when = $1;
  541         $value = $2;
  542         }
  543 
  544         $values{$service}{$data_source} ||= { when => [], value => [], };
  545 
  546         push @{$values{$service}{$data_source}{when}}, $when;
  547         push @{$values{$service}{$data_source}{value}}, $value;
  548         }
  549     elsif ($line =~ m{\A ([^\.]+)\.extinfo \s+ (.+?) \s* $}xms) {
  550         # Extinfo is used in munin-limits
  551             my ($data_source, $value) = ($1, $2);
  552         
  553         $correct++;
  554 
  555             $data_source = $self->_sanitise_fieldname($data_source);
  556 
  557         $values{$service}{$data_source} ||= {};
  558 
  559         $values{$service}{$data_source}{extinfo} = $value;
  560 
  561     }
  562         else {
  563         $errors++;
  564             DEBUG "[DEBUG] Protocol exception while fetching '$service' from $plugin on $nodedesignation: unrecognized line '$line'";
  565         next;
  566         }
  567     }
  568     if ($errors) {
  569     my $percent = ($errors / ($errors + $correct)) * 100; 
  570     $percent = sprintf("%.2f", $percent);
  571     WARN "[WARNING] $errors lines had errors while $correct lines were correct ($percent%) in data from 'fetch $plugin' on $nodedesignation";
  572     }
  573 
  574     return %values;
  575 }
  576 
  577 
  578 sub fetch_service_data {
  579     my ($self, $plugin) = @_;
  580 
  581     my $t0 = [gettimeofday];
  582 
  583     $self->_node_write_single("fetch $plugin\n");
  584 
  585     my $lines = $self->_node_read_fast();
  586     
  587     my $elapsed = tv_interval($t0);
  588     my $nodedesignation = $self->{host}."/".$self->{address}."/".$self->{port};
  589     DEBUG "[DEBUG] data: $elapsed sec for '$plugin' on $nodedesignation";
  590 
  591     $plugin = $self->_sanitise_plugin_name($plugin);
  592 
  593     return $self->parse_service_data($plugin, $lines);
  594 }
  595 
  596 sub quit {
  597     my ($self) = @_;
  598 
  599     my $t0 = [gettimeofday];
  600     $self->_node_write_single("quit \n");
  601     my $elapsed = tv_interval($t0);
  602     my $nodedesignation = $self->{host}."/".$self->{address}."/".$self->{port};
  603     DEBUG "[DEBUG] quit: $elapsed sec on $nodedesignation";
  604 
  605     return 1;
  606 }
  607 
  608 
  609 sub _sanitise_plugin_name {
  610     my ($self, $name) = @_;
  611 
  612     $name =~ s/[^_A-Za-z0-9]/_/g;
  613     
  614     return $name;
  615 }
  616 
  617 
  618 sub _sanitise_fieldname {
  619     # http://munin-monitoring.org/wiki/notes_on_datasource_names
  620     my ($self, $name) = @_;
  621 
  622     $name =~ s/^[^A-Za-z_]/_/;
  623     $name =~ s/[^A-Za-z0-9_]/_/g;
  624 
  625     return $name;
  626 }
  627 
  628 
  629 sub _node_write_single {
  630     my ($self, $text) = @_;
  631 
  632     DEBUG "[DEBUG] Writing to socket: \"$text\".";
  633     my $timed_out = !do_with_timeout($self->{io_timeout}, sub {
  634         if ($self->{tls} && $self->{tls}->session_started()) {
  635             $self->{tls}->write($text) or exit 9;
  636         } else {
  637             print { $self->{writer} } $text;
  638         }
  639     return 1;
  640     });
  641     if ($timed_out) {
  642         LOGCROAK "[FATAL] Socket write timed out to ".$self->{host}.
  643         ".  Terminating process.";
  644     }
  645     return 1;
  646 }
  647 
  648 
  649 sub _node_read_single {
  650     my ($self) = @_;
  651     my $res = undef;
  652 
  653     my $timed_out = !do_with_timeout($self->{io_timeout}, sub {
  654       if ($self->{tls} && $self->{tls}->session_started()) {
  655           $res = $self->{tls}->read();
  656       }
  657       else {
  658           $res = readline $self->{reader};
  659       }
  660       # Remove \r *and* \n. Normally only one, since we read line per line.
  661       $res =~ tr/\x{d}\x{a}//d if defined $res;
  662       return 1;
  663     });
  664     if ($timed_out) {
  665         LOGCROAK "[FATAL] Socket read timed out to ".$self->{host}.
  666         ".  Terminating process.";
  667     }
  668     if (!defined($res)) {
  669     # Probable socket not open.  Why are we here again then?
  670     # aren't we supposed to be in "do in session"?
  671     LOGCROAK "[FATAL] Socket read from ".$self->{host}." failed.  Terminating process.";
  672     }
  673     DEBUG "[DEBUG] Reading from socket to ".$self->{host}.": \"$res\"." if $debug;
  674     return $res;
  675 }
  676 
  677 sub _node_read_fast {
  678     my ($self) = @_;
  679 
  680     # We cannot bypass the IO if using TLS
  681     # so just reverting to normal mode.
  682     return _node_read(@_) if $self->{tls};
  683 
  684     # Disable Buffering here, to be able to use sysread()
  685     local $| = 1;
  686 
  687     my $io_src = $self->{reader};
  688         my $buf;
  689         my $offset = 0;
  690         while (my $read_len = sysread($io_src, $buf, 4096, $offset)) {
  691         $offset += $read_len;
  692 
  693         # Stop when we read a \n.\n
  694         # ... No need to have a full regex : simple index()
  695         my $start_offset = $offset - $read_len - 3;
  696         $start_offset = 0 if $start_offset < 0;
  697         last if index($buf, "\n.\n", $start_offset) >= 0;
  698 
  699         # if empty, the client only sends a plain ".\n"
  700         last if $buf eq ".\n";
  701         }
  702 
  703     # Remove the last line that only contains ".\n"
  704     $buf =~ s/\.\n$//;
  705 
  706     return [ split(/\n/, $buf) ];
  707 }
  708 
  709 sub _node_read {
  710     my ($self) = @_;
  711     my @array = ();
  712 
  713     my $line = $self->_node_read_single();
  714     while($line ne ".") {
  715         push @array, $line;
  716         $line = $self->_node_read_single();
  717     }
  718 
  719     DEBUG "[DEBUG] Reading from socket: \"".(join ("\\n",@array))."\".";
  720     return \@array;
  721 }
  722 
  723 # Defines the URL::scheme for munin
  724 package URI::munin;
  725 
  726 # We are like a generic server
  727 require URI::_server;
  728 @URI::munin::ISA=qw(URI::_server);
  729 
  730 # munin://HOST[:PORT]
  731 
  732 sub default_port { return 4949; }
  733 
  734 1;
  735 
  736 __END__
  737 
  738 =head1 NAME
  739 
  740 Munin::Master::Node - Provides easy access to the munin node
  741 
  742 =head1 SYNOPSIS
  743 
  744  use Munin::Master::Node;
  745  my $node = Munin::Master::Node->new('localhost', '4949', 'foo');
  746  $node->do_in_session(sub{
  747      ... # Call misc. methods on $node
  748  });
  749 
  750 =head1 METHODS
  751 
  752 =over
  753 
  754 =item B<new>
  755 
  756 FIX
  757 
  758 =item B<do_in_session>
  759 
  760 FIX
  761 
  762 =item B<negotiate_capabilities>
  763 
  764 FIX
  765 
=item B<list_plugins>
  767 
  768 FIX
  769 
  770 =item B<fetch_service_config>
  771 
  772 FIX
  773 
  774 =item B<fetch_service_data>
  775 
  776 FIX
  777 
  778 =back
  779