"Fossies" - the Fresh Open Source Software Archive

Member "amavisd-new-2.11.1/TinyRedis.pm" (19 Nov 2014, 13864 Bytes) of package /linux/misc/amavisd-new-2.11.1.tar.bz2:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Perl source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file. For more information about "TinyRedis.pm" see the Fossies "Dox" file reference documentation and the last Fossies "Diffs" side-by-side code changes report: 2.10.1_vs_2.11.0-rc1.

    1 #
    2 # Copyright (c) 2013-2014 Mark Martinec
    3 # All rights reserved.
    4 #
    5 # See LICENSE AND COPYRIGHT section in POD text below for usage
    6 # and distribution rights.
    7 #
    8 
    9 package Redis::TinyRedis;
   10 
   11 use strict;
   12 use re 'taint';
   13 use warnings;
   14 
   15 use Errno qw(EINTR EAGAIN EPIPE ENOTCONN ECONNRESET ECONNABORTED);
   16 use IO::Socket::UNIX;
   17 use Time::HiRes ();
   18 
   use vars qw($VERSION $io_socket_module_name);
   BEGIN {
     $VERSION = '1.001';

     # Choose the class used for TCP connections: the first module in this
     # preference order that can be loaded wins.  If none of them loads,
     # $io_socket_module_name is left undefined.
     for my $candidate (qw(IO::Socket::IP IO::Socket::INET6 IO::Socket::INET)) {
       (my $module_file = "$candidate.pm") =~ s{::}{/}g;
       next if !eval { require $module_file };
       $io_socket_module_name = $candidate;
       last;
     }
   }
   30 
   # Constructor: builds the object and immediately connects to the server.
   #
   # Recognized arguments (all optional):
   #   server     - socket specification; 'sock' is accepted as an alternative
   #                key; defaults to '127.0.0.1:6379'
   #   on_connect - code reference invoked by connect() after a successful
   #                connection
   # A copy of all given arguments is kept in $self->{args}.
   #
   # Returns the new object, or nothing (undef in scalar context) if the
   # connection attempt failed.
   sub new {
     my($class, %args) = @_;

     my $pending = '';  # output buffer shared by b_call / b_results
     my $self = bless {
       args       => { %args },
       outbuf     => \$pending,
       batch_size => 0,
       server     => $args{server} || $args{sock} || '127.0.0.1:6379',
       on_connect => $args{on_connect},
     }, $class;

     return $self->connect ? $self : ();
   }
   41 
   # Destructor: drops the socket reference held by the object.
   # $@, $! and $_ are localized so that object destruction does not
   # disturb the error state or topic variable of the surrounding code.
   sub DESTROY {
     my($self) = @_;
     local($@, $!, $_);
     $self->{sock} = undef;
   }
   47 
   # Drops the socket reference held by the object, if any.
   # $@ and $! are localized so that a caller can still report the
   # error which made it disconnect in the first place.
   sub disconnect {
     my($self) = @_;
     local($@, $!);
     $self->{sock} = undef;
   }
   53 
   # (Re)connects to the server named by $self->{server}.
   #
   # A specification starting with '/' is taken as a path to a Unix socket,
   # otherwise it must have the form 'host:port' or '[host]:port' and is
   # connected to over TCP using the socket class chosen at load time.
   # Any other form is a fatal error.
   #
   # On success the socket, its file descriptor and a matching select() bit
   # mask are stored in the object, and the on_connect callback (if any) is
   # invoked with the object as its only argument.
   #
   # Returns the socket object, or a false value if connecting failed.
   sub connect {
     my($self) = @_;

     $self->disconnect;  # start from a clean state

     my $spec = $self->{server};
     my $handle;
     if ($spec =~ m{^/}) {
       $handle = IO::Socket::UNIX->new(Peer => $spec, Type => SOCK_STREAM);
     } elsif ($spec =~ /^(?: \[ ([^\]]+) \] | ([^:]+) ) : ([^:]+) \z/xs) {
       my($host, $port) = (defined $1 ? $1 : $2, $3);
       $handle = $io_socket_module_name->new(
                   PeerAddr => $host, PeerPort => $port, Proto => 'tcp');
     } else {
       die "Invalid 'server:port' specification: $spec";
     }
     return $handle  if !$handle;

     my $fd = $handle->fileno;
     my $mask = '';
     vec($mask, $fd, 1) = 1;
     @$self{qw(sock sock_fd fd_mask)} = ($handle, $fd, $mask);

     # an on_connect() callback must not use batched calls!
     my $callback = $self->{on_connect};
     $callback->($self)  if $callback;

     $handle;
   }
   81 
   # Receive and parse $cnt consecutive redis replies.
   #
   # Arguments:
   #   $cnt     - number of replies to read from the socket
   #   $err_ref - internal use only: passed on recursive calls (multi-bulk
   #              replies) so that nested levels share one error slot;
   #              external callers must omit it
   #
   # Returns a reference to an array with one element per reply: a string
   # for a bulk or status reply, undef for a null bulk or null multi-bulk
   # reply, a number for an integer reply, and a nested array reference
   # for a multi-bulk reply.
   #
   # Dies with the text of the first error reply (newline-terminated), but
   # only after all $cnt replies have been consumed, so that the remaining
   # replies of a batch or multi-bulk are not left unread on the connection
   # where they would be mistaken for replies to later commands.
   # Also dies on a read failure, on a closed connection, or on an
   # unrecognized reply type; in these cases the connection is dropped first.
   #
   # The caller is expected to have set $/ to CR LF.
   #
   sub _response {
     my($self, $cnt, $err_ref) = @_;

     my $sock = $self->{sock};
     if (!$sock) {
       $self->connect  or die "Connect failed: $!";
       $sock = $self->{sock};
     }

     # only the outermost invocation owns the error slot and reports from it
     my $is_outermost = !$err_ref;
     my $first_error;
     $err_ref = \$first_error  if $is_outermost;

     my @list;

     for (1 .. $cnt) {

       my $result = <$sock>;
       if (!defined $result) {
         $self->disconnect;
         die "Error reading from Redis server: $!";
       }
       chomp $result;
       my $resp_type = substr($result, 0, 1, '');  # strip the type marker

       if ($resp_type eq '$') {  # bulk reply
         if ($result < 0) {
           push(@list, undef);  # null bulk reply
         } else {
           # payload is followed by a CR LF, hence two extra bytes
           my $data = ''; my $ofs = 0; my $len = $result + 2;
           while ($len > 0) {
             my $nbytes = read($sock, $data, $len, $ofs);
             if (!$nbytes) {
               $self->disconnect;
               defined $nbytes  or die "Error reading from Redis server: $!";
               die "Redis server closed connection";
             }
             $ofs += $nbytes; $len -= $nbytes;
           }
           chomp $data;
           push(@list, $data);
         }

       } elsif ($resp_type eq ':') {  # integer reply
         push(@list, 0+$result);

       } elsif ($resp_type eq '+') {  # status reply
         push(@list, $result);

       } elsif ($resp_type eq '*') {  # multi-bulk reply
         push(@list,
              $result < 0 ? undef : $self->_response(0+$result, $err_ref) );

       } elsif ($resp_type eq '-') {  # error reply
         # remember the first error, but keep reading the remaining replies
         $$err_ref = $result  if !defined $$err_ref;
         push(@list, undef);  # placeholder, keeps reply positions aligned

       } else {
         # position within the reply stream is unknown from here on,
         # the connection can not be reused safely
         $self->disconnect;
         die "Unknown Redis reply: $resp_type ($result)";
       }
     }

     die "$$err_ref\n"  if $is_outermost && defined $$err_ref;
     \@list;
   }
  141 
  # Write the entire buffer referenced by $bufref to the server socket,
  # connecting first if there is no socket yet.
  #
  # Arguments:
  #   $bufref - reference to a string holding one or more complete commands
  #
  # If the connection turns out to be closed (end-of-file seen on the
  # socket, or a write fails with ENOTCONN, EPIPE, ECONNRESET or
  # ECONNABORTED), a new connection is made and the buffer is sent again
  # from its beginning: continuing from the old offset would hand the new
  # connection the tail of a partially sent command.
  #
  # Returns 1. Dies if select() fails (other than being interrupted by
  # a signal), if a reconnect fails, or on any other write error.
  #
  sub _write_buff {
    my($self, $bufref) = @_;

    if (!$self->{sock}) { $self->connect or die "Connect failed: $!" };

    my $ofs = 0;
    while ($ofs < length($$bufref)) {
      # to reliably detect a disconnect we need to check for an input event
      # using a select; checking status of syswrite is not sufficient
      my($rout, $wout, $inbuff); my $fd_mask = $self->{fd_mask};
      my $nfound = select($rout=$fd_mask, $wout=$fd_mask, undef, undef);
      if (!defined $nfound || $nfound < 0) {
        next  if $! == EINTR;  # interrupted by a signal, just try again
        die "Select failed: $!";
      }
      if (vec($rout, $self->{sock_fd}, 1) &&
          !sysread($self->{sock}, $inbuff, 1024)) {
        # eof, try reconnecting
        $self->connect  or die "Connect failed: $!";
        $ofs = 0;  # fresh connection, resend the whole buffer
      }

      local $SIG{PIPE} = 'IGNORE';  # don't signal on a write to a widowed pipe
      my $nwrite =
        syswrite($self->{sock}, $$bufref, length($$bufref)-$ofs, $ofs);
      if (defined $nwrite) {
        $ofs += $nwrite;
      } elsif ($! == EINTR || $! == EAGAIN) {  # no big deal, try again
        Time::HiRes::sleep(0.1);  # slow down, just in case
      } else {
        $self->disconnect;  # localizes $!, so it is still valid below
        if ($! == ENOTCONN   || $! == EPIPE ||
            $! == ECONNRESET || $! == ECONNABORTED) {
          $self->connect  or die "Connect failed: $!";
          $ofs = 0;  # fresh connection, resend the whole buffer
        } else {
          die "Error writing to redis socket: $!";
        }
      }
    }
    1;
  }
  176 
  # Send one redis command (command name followed by its arguments) and
  # return the server's reply to it.
  #
  # The request is encoded as a multi-bulk request: an element count
  # followed by each argument prefixed with its length.
  #
  sub call {
    my($self, @words) = @_;

    my $eol = "\015\012";
    my $request = '*' . scalar(@words) . $eol;
    foreach my $word (@words) {
      $request .= '$' . length($word) . $eol . $word . $eol;
    }
    $self->_write_buff(\$request);

    local $/ = $eol;  # replies are CR LF terminated
    my $replies = $self->_response(1);
    return $replies && $replies->[0];
  }
  190 
  # Queue one redis command (command name followed by its arguments) in
  # the object's output buffer without sending it, and count it as one
  # more pending command. Returns the new number of pending commands.
  #
  sub b_call {
    my($self, @words) = @_;

    my $eol = "\015\012";
    my $encoded = '*' . scalar(@words) . $eol;
    foreach my $word (@words) {
      $encoded .= '$' . length($word) . $eol . $word . $eol;
    }
    ${ $self->{outbuf} } .= $encoded;

    return ++$self->{batch_size};
  }
  201 
  # Send all commands queued by b_call() and collect their replies.
  #
  # Returns a reference to an array of replies, one element per queued
  # command, or nothing if no commands are queued. The queue is emptied
  # once the commands have been written, before replies are read.
  #
  sub b_results {
    my($self) = @_;

    my $expected = $self->{batch_size}  or return;

    my $pending = $self->{outbuf};
    $self->_write_buff($pending);
    ($$pending, $self->{batch_size}) = ('', 0);

    local $/ = "\015\012";  # replies are CR LF terminated
    return $self->_response($expected);
  }
  215 
  216 1;
  217 
  218 __END__
  219 =head1 NAME
  220 
  221 Redis::TinyRedis - client side of the Redis protocol
  222 
  223 =head1 SYNOPSIS
  224 
  225 EXAMPLE:
  226 
  227   use Redis::TinyRedis;
  228 
  229   sub on_connect {
  230     my($r) = @_;
  231   # $r->call('AUTH', 'xyz');
  232     $r->call('SELECT', 3);
  233     $r->call('CLIENT', 'SETNAME', "test[$$]");
  234     1;
  235   }
  236 
  237 # my $server = '/tmp/redis.sock';
  238   my $server = '[::1]:6379';
  239 
  240   my $r = Redis::TinyRedis->new(server => $server,
  241                                 on_connect => \&on_connect);
  242   $r or die "Error connecting to a Redis server: $!";
  243 
  244   $r->call('SET', 'key123', 'val123');  # will die on error
  245   $r->call('SET', 'key456', 'val456');  # will die on error
  246 
  247   my $v = $r->call('GET', 'key123');
  248   if (defined $v) { printf("got %s\n", $v) }
  249   else { printf("key not in a database\n") }
  250 
  251   my @keys = ('key123', 'key456', 'keynone');
  252   my $values = $r->call('MGET', @keys);
  253   printf("got %s => %s\n", $_, shift @$values // 'UNDEF') for @keys;
  254 
  255   # batching (pipelining) multiple commands saves on round-trips
  256   $r->b_call('DEL',     'keyxxx');
  257   $r->b_call('HINCRBY', 'keyxxx', 'cnt1', 5);
  258   $r->b_call('HINCRBY', 'keyxxx', 'cnt2', 1);
  259   $r->b_call('HINCRBY', 'keyxxx', 'cnt2', 2);
  260   $r->b_call('EXPIRE',  'keyxxx', 120);
  261   $r->b_results;  # collect response ignoring results, dies on error
  262 
  263   my $counts = $r->call('HMGET', 'keyxxx', 'cnt1', 'cnt2', 'cnt3');
  264   printf("count %s\n", $_ // 'UNDEF') for @$counts;
  265 
  266   # Lua server side scripting
  267   my $lua_results = $r->call('EVAL',
  268     'return redis.call("HGETALL", KEYS[1])', 1, 'keyxxx');
  269   printf("%s\n", join(', ', @$lua_results));
  270 
  271   # traversing all keys
  272   for (my $cursor = 0; ; ) {
  273     my $pair = $r->call('SCAN', $cursor, 'COUNT', 20);
  274     ($cursor, my $elements) = @$pair;
  275     printf("key: %s\n", $_) for @$elements;
  276     last if !$cursor;
  277   }
  278 
  279   # another batch of commands
  280   $r->b_call('DEL', $_) for @keys;
  281   my $results = $r->b_results;  # collect response
  282   printf("delete status for %s: %d\n", $_, shift @$results) for @keys;
  283 
  284   # monitor activity on a database through Redis keyspace notifications
  285   $r->call('CONFIG', 'SET', 'notify-keyspace-events', 'KEA');
  286   $r->call('PSUBSCRIBE', '__key*__:*');
  287   for (1..20) {
  288     my $msg = $r->call;  # collect one message at a time
  289     printf("%s\n", join(", ",@$msg));
  290   }
  291   $r->call('UNSUBSCRIBE');
  292   $r->call('CONFIG', 'SET', 'notify-keyspace-events', '');
  293 
  294   undef $r;  # DESTROY cleanly closes a connection to a redis server
  295 
  296 =head1 DESCRIPTION
  297 
  298 This is a Perl module Redis::TinyRedis implementing a client side of
  299 the Redis protocol, i.e. a unified request protocol as introduced
  300 in Redis 1.2. Design priorities were speed, simplicity, error checking.
  301 
  302 =head1 METHODS
  303 
  304 =head2 new
  305 
  306 Initializes a Redis::TinyRedis object and establishes a connection
  307 to a Redis server. Returns a Redis::TinyRedis object if the connection
  308 was successfully established (by calling a connect() method implicitly),
  309 or false otherwise, leaving a failure reason in $! .
  310 
  311 =over 4
  312 
  313 =item B<server>
  314 
  315 Specifies a socket where a Redis server is listening. If a string
  316 starts with a '/' an absolute path to a Unix socket is assumed,
  317 otherwise it is interpreted as an INET or INET6 socket specification
  318 in a syntax as recognized by a C<PeerAddr> option of the underlying
  319 socket module (IO::Socket::IP, or IO::Socket::INET6, or IO::Socket::INET),
  320 e.g. '127.0.0.1:6379' or '[::1]:6379' or 'localhost:6379'.
  321 Port number must be explicitly specified.
  322 
  323 A default is '127.0.0.1:6379'.
  324 
  325 =item B<on_connect>
  326 
  327 Specifies an optional callback subroutine, to be called by a connect()
  328 method after each successful establishment of a connection to a redis server.
  329 Useful as a provider of a Redis client authentication or for database
  330 index selection.
  331 
  332 The on_connect() callback is given a Redis::TinyRedis object as its
  333 argument. This object also carries all arguments that were given in
  334 a call to new() when it was created, including any additional options
  335 unrecognized and ignored by Redis::TinyRedis->new().
  336 
  337 An on_connect() callback subroutine must not use batched calls
  338 (b_call / b_results), but may use the call() method.
  339 
  340 =back
  341 
  342 =head2 connect
  343 
  344 Establishes a connection to a Redis server. Returns a socket object,
  345 or undef if the connection failed, leaving error status in $! .
  346 The connect() method is called implicitly by new(), or by call() or
  347 b_results() if a connection was dropped due to some previous failure.
  348 It may be called explicitly by an application, possibly combined with
  349 a disconnect() method, to give more control to the application.
  350 
  351 =head2 disconnect
  352 
  353 Closes a connection to a Redis server if it is established,
  354 does nothing if a connection is not established. The connection
  355 will be re-established by subsequent calls to connect() or call()
  356 or b_results().
  357 
  358 Closing a connection is implied by a DESTROY method, so dropping
  359 references to a Redis::TinyRedis object also cleanly disconnects
  360 an established session with a Redis server.
  361 
  362 =head2 call
  363 
  364 Sends a redis command with arguments, returning a redis reply.
  365 The first argument is expected to be a name of a Redis command,
  366 followed by its arguments according to Redis documentation.
  367 
  368 The command will die if a Redis server returns an error reply.
  369 It may also die if it needs to implicitly re-establish a connection
  370 and the connect() call fails.
  371 
  372 The returned value is an integer if a Redis server returns an integer
  373 reply, is a status string if a status reply is returned, is undef if a
  374 null bulk reply is returned, is a string in case of a bulk reply, and
  375 is a reference to an array of results in case of a multi-bulk reply.
  376 
  377 =head2 b_call
  378 
  379 Appends a redis command with arguments to a batch.
  380 The first argument is expected to be a name of a Redis command,
  381 followed by its arguments according to Redis documentation.
  382 
  383 =head2 b_results
  384 
  385 Sends a batch of commands, then resets the batch. Returns a reference
  386 to an array of redis replies, each array element corresponding to one
  387 command in a batch.
  388 
  389 The command will die if a Redis server returns an error reply.
  390 It may also die if it needs to implicitly re-establish a connection
  391 and the connect() call fails.
  392 
  393 Returns a reference to an array of results, each one corresponding
  394 to one command of a batch. A data type of each array element is the
  395 same as described in a call() method.
  396 
  397 =head1 AUTHOR
  398 
  399 Mark Martinec, C<< <Mark.Martinec@ijs.si> >>
  400 
  401 =head1 BUGS
  402 
  403 Please send bug reports to the author.
  404 
  405 =head1 LICENSE AND COPYRIGHT
  406 
  407 Copyright (c) 2013-2014 Mark Martinec
  408 All rights reserved.
  409 
  410 Redistribution and use in source and binary forms, with or without
  411 modification, are permitted provided that the following conditions
  412 are met:
  413 1. Redistributions of source code must retain the above copyright notice,
  414    this list of conditions and the following disclaimer.
  415 2. Redistributions in binary form must reproduce the above copyright notice,
  416    this list of conditions and the following disclaimer in the documentation
  417    and/or other materials provided with the distribution.
  418 
  419 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  420 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  421 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  422 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
  423 BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  424 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  425 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  426 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  427 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  428 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  429 POSSIBILITY OF SUCH DAMAGE.
  430 
  431 The views and conclusions contained in the software and documentation are
  432 those of the authors and should not be interpreted as representing official
  433 policies, either expressed or implied, of the Jozef Stefan Institute.
  434 
  435 (the above license is the 2-clause BSD license, also known as
  436  a "Simplified BSD License", and pertains to this program only)
  437 
  438 =cut