"Fossies" - the Fresh Open Source Software Archive

Member "redis-5.0.6/tests/unit/type/stream.tcl" (25 Sep 2019, 13857 Bytes) of package /linux/misc/redis-5.0.6.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Tcl/Tk source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here.

    1 # return value is like strcmp() and similar.
    2 proc streamCompareID {a b} {
    3     if {$a eq $b} {return 0}
    4     lassign [split $a -] a_ms a_seq
    5     lassign [split $b -] b_ms b_seq
    6     if {$a_ms > $b_ms} {return 1}
    7     if {$a_ms < $b_ms} {return -1}
    8     # Same ms case, compare seq.
    9     if {$a_seq > $b_seq} {return 1}
   10     if {$a_seq < $b_seq} {return -1}
   11 }
   12 
   13 # return the ID immediately greater than the specified one.
   14 # Note that this function does not care to handle 'seq' overflow
   15 # since it's a 64 bit value.
   16 proc streamNextID {id} {
   17     lassign [split $id -] ms seq
   18     incr seq
   19     join [list $ms $seq] -
   20 }
   21 
   22 # Generate a random stream entry ID with the ms part between min and max
   23 # and a low sequence number (0 - 999 range), in order to stress test
   24 # XRANGE against a Tcl implementation implementing the same concept
   25 # with Tcl-only code in a linear array.
   26 proc streamRandomID {min_id max_id} {
   27     lassign [split $min_id -] min_ms min_seq
   28     lassign [split $max_id -] max_ms max_seq
   29     set delta [expr {$max_ms-$min_ms+1}]
   30     set ms [expr {$min_ms+[randomInt $delta]}]
   31     set seq [randomInt 1000]
   32     return $ms-$seq
   33 }
   34 
   35 # Tcl-side implementation of XRANGE to perform fuzz testing in the Redis
   36 # XRANGE implementation.
   37 proc streamSimulateXRANGE {items start end} {
   38     set res {}
   39     foreach i $items  {
   40         set this_id [lindex $i 0]
   41         if {[streamCompareID $this_id $start] >= 0} {
   42             if {[streamCompareID $this_id $end] <= 0} {
   43                 lappend res $i
   44             }
   45         }
   46     }
   47     return $res
   48 }
   49 
   50 set content {} ;# Will be populated with Tcl side copy of the stream content.
   51 
   52 start_server {
   53     tags {"stream"}
   54 } {
   55     test {XADD can add entries into a stream that XRANGE can fetch} {
   56         r XADD mystream * item 1 value a
   57         r XADD mystream * item 2 value b
   58         assert_equal 2 [r XLEN mystream]
   59         set items [r XRANGE mystream - +]
   60         assert_equal [lindex $items 0 1] {item 1 value a}
   61         assert_equal [lindex $items 1 1] {item 2 value b}
   62     }
   63 
   64     test {XADD IDs are incremental} {
   65         set id1 [r XADD mystream * item 1 value a]
   66         set id2 [r XADD mystream * item 2 value b]
   67         set id3 [r XADD mystream * item 3 value c]
   68         assert {[streamCompareID $id1 $id2] == -1}
   69         assert {[streamCompareID $id2 $id3] == -1}
   70     }
   71 
   72     test {XADD IDs are incremental when ms is the same as well} {
   73         r multi
   74         r XADD mystream * item 1 value a
   75         r XADD mystream * item 2 value b
   76         r XADD mystream * item 3 value c
   77         lassign [r exec] id1 id2 id3
   78         assert {[streamCompareID $id1 $id2] == -1}
   79         assert {[streamCompareID $id2 $id3] == -1}
   80     }
   81 
   82     test {XADD with MAXLEN option} {
   83         r DEL mystream
   84         for {set j 0} {$j < 1000} {incr j} {
   85             if {rand() < 0.9} {
   86                 r XADD mystream MAXLEN 5 * xitem $j
   87             } else {
   88                 r XADD mystream MAXLEN 5 * yitem $j
   89             }
   90         }
   91         set res [r xrange mystream - +]
   92         set expected 995
   93         foreach r $res {
   94             assert {[lindex $r 1 1] == $expected}
   95             incr expected
   96         }
   97     }
   98 
   99     test {XADD mass insertion and XLEN} {
  100         r DEL mystream
  101         r multi
  102         for {set j 0} {$j < 10000} {incr j} {
  103             # From time to time insert a field with a different set
  104             # of fields in order to stress the stream compression code.
  105             if {rand() < 0.9} {
  106                 r XADD mystream * item $j
  107             } else {
  108                 r XADD mystream * item $j otherfield foo
  109             }
  110         }
  111         r exec
  112 
  113         set items [r XRANGE mystream - +]
  114         for {set j 0} {$j < 10000} {incr j} {
  115             assert {[lrange [lindex $items $j 1] 0 1] eq [list item $j]}
  116         }
  117         assert {[r xlen mystream] == $j}
  118     }
  119 
  120     test {XRANGE COUNT works as expected} {
  121         assert {[llength [r xrange mystream - + COUNT 10]] == 10}
  122     }
  123 
  124     test {XREVRANGE COUNT works as expected} {
  125         assert {[llength [r xrevrange mystream + - COUNT 10]] == 10}
  126     }
  127 
  128     test {XRANGE can be used to iterate the whole stream} {
  129         set last_id "-"
  130         set j 0
  131         while 1 {
  132             set elements [r xrange mystream $last_id + COUNT 100]
  133             if {[llength $elements] == 0} break
  134             foreach e $elements {
  135                 assert {[lrange [lindex $e 1] 0 1] eq [list item $j]}
  136                 incr j;
  137             }
  138             set last_id [streamNextID [lindex $elements end 0]]
  139         }
  140         assert {$j == 10000}
  141     }
  142 
  143     test {XREVRANGE returns the reverse of XRANGE} {
  144         assert {[r xrange mystream - +] == [lreverse [r xrevrange mystream + -]]}
  145     }
  146 
  147     test {XREAD with non empty stream} {
  148         set res [r XREAD COUNT 1 STREAMS mystream 0-0]
  149         assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}
  150     }
  151 
  152     test {Non blocking XREAD with empty streams} {
  153         set res [r XREAD STREAMS s1 s2 0-0 0-0]
  154         assert {$res eq {}}
  155     }
  156 
  157     test {XREAD with non empty second stream} {
  158         set res [r XREAD COUNT 1 STREAMS nostream mystream 0-0 0-0]
  159         assert {[lindex $res 0 0] eq {mystream}}
  160         assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}
  161     }
  162 
  163     test {Blocking XREAD waiting new data} {
  164         r XADD s2 * old abcd1234
  165         set rd [redis_deferring_client]
  166         $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ $ $
  167         r XADD s2 * new abcd1234
  168         set res [$rd read]
  169         assert {[lindex $res 0 0] eq {s2}}
  170         assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
  171     }
  172 
  173     test {Blocking XREAD waiting old data} {
  174         set rd [redis_deferring_client]
  175         $rd XREAD BLOCK 20000 STREAMS s1 s2 s3 $ 0-0 $
  176         r XADD s2 * foo abcd1234
  177         set res [$rd read]
  178         assert {[lindex $res 0 0] eq {s2}}
  179         assert {[lindex $res 0 1 0 1] eq {old abcd1234}}
  180     }
  181 
  182     test "XREAD: XADD + DEL should not awake client" {
  183         set rd [redis_deferring_client]
  184         r del s1
  185         $rd XREAD BLOCK 20000 STREAMS s1 $
  186         r multi
  187         r XADD s1 * old abcd1234
  188         r DEL s1
  189         r exec
  190         r XADD s1 * new abcd1234
  191         set res [$rd read]
  192         assert {[lindex $res 0 0] eq {s1}}
  193         assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
  194     }
  195 
  196     test "XREAD: XADD + DEL + LPUSH should not awake client" {
  197         set rd [redis_deferring_client]
  198         r del s1
  199         $rd XREAD BLOCK 20000 STREAMS s1 $
  200         r multi
  201         r XADD s1 * old abcd1234
  202         r DEL s1
  203         r LPUSH s1 foo bar
  204         r exec
  205         r DEL s1
  206         r XADD s1 * new abcd1234
  207         set res [$rd read]
  208         assert {[lindex $res 0 0] eq {s1}}
  209         assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
  210     }
  211 
  212     test {XREAD with same stream name multiple times should work} {
  213         r XADD s2 * old abcd1234
  214         set rd [redis_deferring_client]
  215         $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
  216         r XADD s2 * new abcd1234
  217         set res [$rd read]
  218         assert {[lindex $res 0 0] eq {s2}}
  219         assert {[lindex $res 0 1 0 1] eq {new abcd1234}}
  220     }
  221 
  222     test {XREAD + multiple XADD inside transaction} {
  223         r XADD s2 * old abcd1234
  224         set rd [redis_deferring_client]
  225         $rd XREAD BLOCK 20000 STREAMS s2 s2 s2 $ $ $
  226         r MULTI
  227         r XADD s2 * field one
  228         r XADD s2 * field two
  229         r XADD s2 * field three
  230         r EXEC
  231         set res [$rd read]
  232         assert {[lindex $res 0 0] eq {s2}}
  233         assert {[lindex $res 0 1 0 1] eq {field one}}
  234         assert {[lindex $res 0 1 1 1] eq {field two}}
  235     }
  236 
  237     test {XDEL basic test} {
  238         r del somestream
  239         r xadd somestream * foo value0
  240         set id [r xadd somestream * foo value1]
  241         r xadd somestream * foo value2
  242         r xdel somestream $id
  243         assert {[r xlen somestream] == 2}
  244         set result [r xrange somestream - +]
  245         assert {[lindex $result 0 1 1] eq {value0}}
  246         assert {[lindex $result 1 1 1] eq {value2}}
  247     }
  248 
  249     # Here the idea is to check the consistency of the stream data structure
  250     # as we remove all the elements down to zero elements.
  251     test {XDEL fuzz test} {
  252         r del somestream
  253         set ids {}
  254         set x 0; # Length of the stream
  255         while 1 {
  256             lappend ids [r xadd somestream * item $x]
  257             incr x
  258             # Add enough elements to have a few radix tree nodes inside the stream.
  259             if {[dict get [r xinfo stream somestream] radix-tree-keys] > 20} break
  260         }
  261 
  262         # Now remove all the elements till we reach an empty stream
  263         # and after every deletion, check that the stream is sane enough
  264         # to report the right number of elements with XRANGE: this will also
  265         # force accessing the whole data structure to check sanity.
  266         assert {[r xlen somestream] == $x}
  267 
  268         # We want to remove elements in random order to really test the
  269         # implementation in a better way.
  270         set ids [lshuffle $ids]
  271         foreach id $ids {
  272             assert {[r xdel somestream $id] == 1}
  273             incr x -1
  274             assert {[r xlen somestream] == $x}
  275             # The test would be too slow calling XRANGE for every iteration.
  276             # Do it every 100 removal.
  277             if {$x % 100 == 0} {
  278                 set res [r xrange somestream - +]
  279                 assert {[llength $res] == $x}
  280             }
  281         }
  282     }
  283 
  284     test {XRANGE fuzzing} {
  285         set low_id [lindex $items 0 0]
  286         set high_id [lindex $items end 0]
  287         for {set j 0} {$j < 100} {incr j} {
  288             set start [streamRandomID $low_id $high_id]
  289             set end [streamRandomID $low_id $high_id]
  290             set range [r xrange mystream $start $end]
  291             set tcl_range [streamSimulateXRANGE $items $start $end]
  292             if {$range ne $tcl_range} {
  293                 puts "*** WARNING *** - XRANGE fuzzing mismatch: $start - $end"
  294                 puts "---"
  295                 puts "XRANGE: '$range'"
  296                 puts "---"
  297                 puts "TCL: '$tcl_range'"
  298                 puts "---"
  299                 fail "XRANGE fuzzing failed, check logs for details"
  300             }
  301         }
  302     }
  303 
  304     test {XREVRANGE regression test for issue #5006} {
  305         # Add non compressed entries
  306         r xadd teststream 1234567891230 key1 value1
  307         r xadd teststream 1234567891240 key2 value2
  308         r xadd teststream 1234567891250 key3 value3
  309 
  310         # Add SAMEFIELD compressed entries
  311         r xadd teststream2 1234567891230 key1 value1
  312         r xadd teststream2 1234567891240 key1 value2
  313         r xadd teststream2 1234567891250 key1 value3
  314 
  315         assert_equal [r xrevrange teststream 1234567891245 -] {{1234567891240-0 {key2 value2}} {1234567891230-0 {key1 value1}}}
  316 
  317         assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}}
  318     }
  319 }
  320 
  321 start_server {tags {"stream"} overrides {appendonly yes}} {
  322     test {XADD with MAXLEN > xlen can propagate correctly} {
  323         for {set j 0} {$j < 100} {incr j} {
  324             r XADD mystream * xitem v
  325         }
  326         r XADD mystream MAXLEN 200 * xitem v
  327         incr j
  328         assert {[r xlen mystream] == $j}
  329         r debug loadaof
  330         r XADD mystream * xitem v
  331         incr j
  332         assert {[r xlen mystream] == $j}
  333     }
  334 }
  335 
  336 start_server {tags {"stream"} overrides {appendonly yes}} {
  337     test {XADD with ~ MAXLEN can propagate correctly} {
  338         for {set j 0} {$j < 100} {incr j} {
  339             r XADD mystream * xitem v
  340         }
  341         r XADD mystream MAXLEN ~ $j * xitem v
  342         incr j
  343         assert {[r xlen mystream] == $j}
  344         r config set stream-node-max-entries 1
  345         r debug loadaof
  346         r XADD mystream * xitem v
  347         incr j
  348         assert {[r xlen mystream] == $j}
  349     }
  350 }
  351 
  352 start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
  353     test {XTRIM with ~ MAXLEN can propagate correctly} {
  354         for {set j 0} {$j < 100} {incr j} {
  355             r XADD mystream * xitem v
  356         }
  357         r XTRIM mystream MAXLEN ~ 85
  358         assert {[r xlen mystream] == 89}
  359         r config set stream-node-max-entries 1
  360         r debug loadaof
  361         r XADD mystream * xitem v
  362         incr j
  363         assert {[r xlen mystream] == 90}
  364     }
  365 }
  366 
  367 start_server {tags {"xsetid"}} {
  368     test {XADD can CREATE an empty stream} {
  369         r XADD mystream MAXLEN 0 * a b
  370         assert {[dict get [r xinfo stream mystream] length] == 0}
  371     }
  372 
  373     test {XSETID can set a specific ID} {
  374         r XSETID mystream "200-0"
  375         assert {[dict get [r xinfo stream mystream] last-generated-id] == "200-0"}
  376     }
  377 
  378     test {XSETID cannot SETID with smaller ID} {
  379         r XADD mystream * a b
  380         catch {r XSETID mystream "1-1"} err
  381         r XADD mystream MAXLEN 0 * a b
  382         set err
  383     } {ERR*smaller*}
  384 
  385     test {XSETID cannot SETID on non-existent key} {
  386         catch {r XSETID stream 1-1} err
  387         set _ $err
  388     } {ERR no such key}
  389 }
  390 
  391 start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} {
  392     test {Empty stream can be rewrite into AOF correctly} {
  393         r XADD mystream MAXLEN 0 * a b
  394         assert {[dict get [r xinfo stream mystream] length] == 0}
  395         r bgrewriteaof
  396         waitForBgrewriteaof r
  397         r debug loadaof
  398         assert {[dict get [r xinfo stream mystream] length] == 0}
  399     }
  400 
  401     test {Stream can be rewrite into AOF correctly after XDEL lastid} {
  402         r XSETID mystream 0-0
  403         r XADD mystream 1-1 a b
  404         r XADD mystream 2-2 a b
  405         assert {[dict get [r xinfo stream mystream] length] == 2}
  406         r XDEL mystream 2-2
  407         r bgrewriteaof
  408         waitForBgrewriteaof r
  409         r debug loadaof
  410         assert {[dict get [r xinfo stream mystream] length] == 1}
  411         assert {[dict get [r xinfo stream mystream] last-generated-id] == "2-2"}
  412     }
  413 }