"Fossies" - the Fresh Open Source Software Archive

Member "prometheus-2.15.2/tsdb/db_test.go" (6 Jan 2020, 81223 Bytes) of package /linux/misc/prometheus-2.15.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Go source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "db_test.go": 2.15.1_vs_2.15.2.

    1 // Copyright 2017 The Prometheus Authors
    2 // Licensed under the Apache License, Version 2.0 (the "License");
    3 // you may not use this file except in compliance with the License.
    4 // You may obtain a copy of the License at
    5 //
    6 // http://www.apache.org/licenses/LICENSE-2.0
    7 //
    8 // Unless required by applicable law or agreed to in writing, software
    9 // distributed under the License is distributed on an "AS IS" BASIS,
   10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   11 // See the License for the specific language governing permissions and
   12 // limitations under the License.
   13 
   14 package tsdb
   15 
   16 import (
   17     "encoding/binary"
   18     "fmt"
   19     "hash/crc32"
   20     "io/ioutil"
   21     "math"
   22     "math/rand"
   23     "os"
   24     "path"
   25     "path/filepath"
   26     "sort"
   27     "strconv"
   28     "sync"
   29     "testing"
   30     "time"
   31 
   32     "github.com/prometheus/prometheus/tsdb/fileutil"
   33 
   34     "github.com/go-kit/kit/log"
   35     "github.com/oklog/ulid"
   36     "github.com/pkg/errors"
   37     "github.com/prometheus/client_golang/prometheus"
   38     prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
   39     "github.com/prometheus/prometheus/pkg/labels"
   40     "github.com/prometheus/prometheus/tsdb/chunks"
   41     "github.com/prometheus/prometheus/tsdb/index"
   42     "github.com/prometheus/prometheus/tsdb/record"
   43     "github.com/prometheus/prometheus/tsdb/tombstones"
   44     "github.com/prometheus/prometheus/tsdb/tsdbutil"
   45     "github.com/prometheus/prometheus/tsdb/wal"
   46     "github.com/prometheus/prometheus/util/testutil"
   47 )
   48 
   49 func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
   50     tmpdir, err := ioutil.TempDir("", "test")
   51     testutil.Ok(t, err)
   52 
   53     db, err = Open(tmpdir, nil, nil, opts)
   54     testutil.Ok(t, err)
   55 
   56     // Do not close the test database by default as it will deadlock on test failures.
   57     return db, func() {
   58         testutil.Ok(t, os.RemoveAll(tmpdir))
   59     }
   60 }
   61 
   62 // query runs a matcher query against the querier and fully expands its data.
   63 func query(t testing.TB, q Querier, matchers ...*labels.Matcher) map[string][]tsdbutil.Sample {
   64     ss, err := q.Select(matchers...)
   65     defer func() {
   66         testutil.Ok(t, q.Close())
   67     }()
   68     testutil.Ok(t, err)
   69 
   70     result := map[string][]tsdbutil.Sample{}
   71 
   72     for ss.Next() {
   73         series := ss.At()
   74 
   75         samples := []tsdbutil.Sample{}
   76         it := series.Iterator()
   77         for it.Next() {
   78             t, v := it.At()
   79             samples = append(samples, sample{t: t, v: v})
   80         }
   81         testutil.Ok(t, it.Err())
   82 
   83         name := series.Labels().String()
   84         result[name] = samples
   85     }
   86     testutil.Ok(t, ss.Err())
   87 
   88     return result
   89 }
   90 
   91 // Ensure that blocks are held in memory in their time order
   92 // and not in ULID order as they are read from the directory.
   93 func TestDB_reloadOrder(t *testing.T) {
   94     db, delete := openTestDB(t, nil)
   95     defer func() {
   96         testutil.Ok(t, db.Close())
   97         delete()
   98     }()
   99 
  100     metas := []BlockMeta{
  101         {MinTime: 90, MaxTime: 100},
  102         {MinTime: 70, MaxTime: 80},
  103         {MinTime: 100, MaxTime: 110},
  104     }
  105     for _, m := range metas {
  106         createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime))
  107     }
  108 
  109     testutil.Ok(t, db.reload())
  110     blocks := db.Blocks()
  111     testutil.Equals(t, 3, len(blocks))
  112     testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime)
  113     testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime)
  114     testutil.Equals(t, metas[0].MinTime, blocks[1].Meta().MinTime)
  115     testutil.Equals(t, metas[0].MaxTime, blocks[1].Meta().MaxTime)
  116     testutil.Equals(t, metas[2].MinTime, blocks[2].Meta().MinTime)
  117     testutil.Equals(t, metas[2].MaxTime, blocks[2].Meta().MaxTime)
  118 }
  119 
  120 func TestDataAvailableOnlyAfterCommit(t *testing.T) {
  121     db, delete := openTestDB(t, nil)
  122     defer func() {
  123         testutil.Ok(t, db.Close())
  124         delete()
  125     }()
  126 
  127     app := db.Appender()
  128 
  129     _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
  130     testutil.Ok(t, err)
  131 
  132     querier, err := db.Querier(0, 1)
  133     testutil.Ok(t, err)
  134     seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
  135     testutil.Equals(t, map[string][]tsdbutil.Sample{}, seriesSet)
  136 
  137     err = app.Commit()
  138     testutil.Ok(t, err)
  139 
  140     querier, err = db.Querier(0, 1)
  141     testutil.Ok(t, err)
  142     defer querier.Close()
  143 
  144     seriesSet = query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
  145 
  146     testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{t: 0, v: 0}}}, seriesSet)
  147 }
  148 
  149 func TestDataNotAvailableAfterRollback(t *testing.T) {
  150     db, delete := openTestDB(t, nil)
  151     defer func() {
  152         testutil.Ok(t, db.Close())
  153         delete()
  154     }()
  155 
  156     app := db.Appender()
  157     _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0)
  158     testutil.Ok(t, err)
  159 
  160     err = app.Rollback()
  161     testutil.Ok(t, err)
  162 
  163     querier, err := db.Querier(0, 1)
  164     testutil.Ok(t, err)
  165     defer querier.Close()
  166 
  167     seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
  168 
  169     testutil.Equals(t, map[string][]tsdbutil.Sample{}, seriesSet)
  170 }
  171 
  172 func TestDBAppenderAddRef(t *testing.T) {
  173     db, delete := openTestDB(t, nil)
  174     defer func() {
  175         testutil.Ok(t, db.Close())
  176         delete()
  177     }()
  178 
  179     app1 := db.Appender()
  180 
  181     ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
  182     testutil.Ok(t, err)
  183 
  184     // Reference should already work before commit.
  185     err = app1.AddFast(ref1, 124, 1)
  186     testutil.Ok(t, err)
  187 
  188     err = app1.Commit()
  189     testutil.Ok(t, err)
  190 
  191     app2 := db.Appender()
  192 
  193     // first ref should already work in next transaction.
  194     err = app2.AddFast(ref1, 125, 0)
  195     testutil.Ok(t, err)
  196 
  197     ref2, err := app2.Add(labels.FromStrings("a", "b"), 133, 1)
  198     testutil.Ok(t, err)
  199 
  200     testutil.Assert(t, ref1 == ref2, "")
  201 
  202     // Reference must be valid to add another sample.
  203     err = app2.AddFast(ref2, 143, 2)
  204     testutil.Ok(t, err)
  205 
  206     err = app2.AddFast(9999999, 1, 1)
  207     testutil.Equals(t, ErrNotFound, errors.Cause(err))
  208 
  209     testutil.Ok(t, app2.Commit())
  210 
  211     q, err := db.Querier(0, 200)
  212     testutil.Ok(t, err)
  213 
  214     res := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
  215 
  216     testutil.Equals(t, map[string][]tsdbutil.Sample{
  217         labels.FromStrings("a", "b").String(): {
  218             sample{t: 123, v: 0},
  219             sample{t: 124, v: 1},
  220             sample{t: 125, v: 0},
  221             sample{t: 133, v: 1},
  222             sample{t: 143, v: 2},
  223         },
  224     }, res)
  225 }
  226 
  227 func TestAppendEmptyLabelsIgnored(t *testing.T) {
  228     db, delete := openTestDB(t, nil)
  229     defer func() {
  230         testutil.Ok(t, db.Close())
  231         delete()
  232     }()
  233 
  234     app1 := db.Appender()
  235 
  236     ref1, err := app1.Add(labels.FromStrings("a", "b"), 123, 0)
  237     testutil.Ok(t, err)
  238 
  239     // Construct labels manually so there is an empty label.
  240     ref2, err := app1.Add(labels.Labels{labels.Label{Name: "a", Value: "b"}, labels.Label{Name: "c", Value: ""}}, 124, 0)
  241     testutil.Ok(t, err)
  242 
  243     // Should be the same series.
  244     testutil.Equals(t, ref1, ref2)
  245 
  246     err = app1.Commit()
  247     testutil.Ok(t, err)
  248 }
  249 
  250 func TestDeleteSimple(t *testing.T) {
  251     numSamples := int64(10)
  252 
  253     cases := []struct {
  254         Intervals tombstones.Intervals
  255         remaint   []int64
  256     }{
  257         {
  258             Intervals: tombstones.Intervals{{Mint: 0, Maxt: 3}},
  259             remaint:   []int64{4, 5, 6, 7, 8, 9},
  260         },
  261         {
  262             Intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}},
  263             remaint:   []int64{0, 4, 5, 6, 7, 8, 9},
  264         },
  265         {
  266             Intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}},
  267             remaint:   []int64{0, 8, 9},
  268         },
  269         {
  270             Intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 700}},
  271             remaint:   []int64{0},
  272         },
  273         { // This case is to ensure that labels and symbols are deleted.
  274             Intervals: tombstones.Intervals{{Mint: 0, Maxt: 9}},
  275             remaint:   []int64{},
  276         },
  277     }
  278 
  279 Outer:
  280     for _, c := range cases {
  281         db, delete := openTestDB(t, nil)
  282         defer func() {
  283             testutil.Ok(t, db.Close())
  284             delete()
  285         }()
  286 
  287         app := db.Appender()
  288 
  289         smpls := make([]float64, numSamples)
  290         for i := int64(0); i < numSamples; i++ {
  291             smpls[i] = rand.Float64()
  292             app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
  293         }
  294 
  295         testutil.Ok(t, app.Commit())
  296 
  297         // TODO(gouthamve): Reset the tombstones somehow.
  298         // Delete the ranges.
  299         for _, r := range c.Intervals {
  300             testutil.Ok(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
  301         }
  302 
  303         // Compare the result.
  304         q, err := db.Querier(0, numSamples)
  305         testutil.Ok(t, err)
  306 
  307         res, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
  308         testutil.Ok(t, err)
  309 
  310         expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
  311         for _, ts := range c.remaint {
  312             expSamples = append(expSamples, sample{ts, smpls[ts]})
  313         }
  314 
  315         expss := newMockSeriesSet([]Series{
  316             newSeries(map[string]string{"a": "b"}, expSamples),
  317         })
  318 
  319         lns, err := q.LabelNames()
  320         testutil.Ok(t, err)
  321         lvs, err := q.LabelValues("a")
  322         testutil.Ok(t, err)
  323         if len(expSamples) == 0 {
  324             testutil.Equals(t, 0, len(lns))
  325             testutil.Equals(t, 0, len(lvs))
  326             testutil.Assert(t, res.Next() == false, "")
  327             continue
  328         } else {
  329             testutil.Equals(t, 1, len(lns))
  330             testutil.Equals(t, 1, len(lvs))
  331             testutil.Equals(t, "a", lns[0])
  332             testutil.Equals(t, "b", lvs[0])
  333         }
  334 
  335         for {
  336             eok, rok := expss.Next(), res.Next()
  337             testutil.Equals(t, eok, rok)
  338 
  339             if !eok {
  340                 continue Outer
  341             }
  342             sexp := expss.At()
  343             sres := res.At()
  344 
  345             testutil.Equals(t, sexp.Labels(), sres.Labels())
  346 
  347             smplExp, errExp := expandSeriesIterator(sexp.Iterator())
  348             smplRes, errRes := expandSeriesIterator(sres.Iterator())
  349 
  350             testutil.Equals(t, errExp, errRes)
  351             testutil.Equals(t, smplExp, smplRes)
  352         }
  353     }
  354 }
  355 
  356 func TestAmendDatapointCausesError(t *testing.T) {
  357     db, delete := openTestDB(t, nil)
  358     defer func() {
  359         testutil.Ok(t, db.Close())
  360         delete()
  361     }()
  362 
  363     app := db.Appender()
  364     _, err := app.Add(labels.Labels{}, 0, 0)
  365     testutil.Ok(t, err)
  366     testutil.Ok(t, app.Commit())
  367 
  368     app = db.Appender()
  369     _, err = app.Add(labels.Labels{}, 0, 1)
  370     testutil.Equals(t, ErrAmendSample, err)
  371     testutil.Ok(t, app.Rollback())
  372 }
  373 
  374 func TestDuplicateNaNDatapointNoAmendError(t *testing.T) {
  375     db, delete := openTestDB(t, nil)
  376     defer func() {
  377         testutil.Ok(t, db.Close())
  378         delete()
  379     }()
  380 
  381     app := db.Appender()
  382     _, err := app.Add(labels.Labels{}, 0, math.NaN())
  383     testutil.Ok(t, err)
  384     testutil.Ok(t, app.Commit())
  385 
  386     app = db.Appender()
  387     _, err = app.Add(labels.Labels{}, 0, math.NaN())
  388     testutil.Ok(t, err)
  389 }
  390 
  391 func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) {
  392     db, delete := openTestDB(t, nil)
  393     defer func() {
  394         testutil.Ok(t, db.Close())
  395         delete()
  396     }()
  397     app := db.Appender()
  398     _, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001))
  399     testutil.Ok(t, err)
  400     testutil.Ok(t, app.Commit())
  401 
  402     app = db.Appender()
  403     _, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000002))
  404     testutil.Equals(t, ErrAmendSample, err)
  405 }
  406 
  407 func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
  408     db, delete := openTestDB(t, nil)
  409     defer func() {
  410         testutil.Ok(t, db.Close())
  411         delete()
  412     }()
  413 
  414     // Append AmendedValue.
  415     app := db.Appender()
  416     _, err := app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 1)
  417     testutil.Ok(t, err)
  418     _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 0, 2)
  419     testutil.Ok(t, err)
  420     testutil.Ok(t, app.Commit())
  421 
  422     // Make sure the right value is stored.
  423     q, err := db.Querier(0, 10)
  424     testutil.Ok(t, err)
  425 
  426     ssMap := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
  427 
  428     testutil.Equals(t, map[string][]tsdbutil.Sample{
  429         labels.New(labels.Label{Name: "a", Value: "b"}).String(): {sample{0, 1}},
  430     }, ssMap)
  431 
  432     // Append Out of Order Value.
  433     app = db.Appender()
  434     _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 10, 3)
  435     testutil.Ok(t, err)
  436     _, err = app.Add(labels.Labels{{Name: "a", Value: "b"}}, 7, 5)
  437     testutil.Ok(t, err)
  438     testutil.Ok(t, app.Commit())
  439 
  440     q, err = db.Querier(0, 10)
  441     testutil.Ok(t, err)
  442 
  443     ssMap = query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
  444 
  445     testutil.Equals(t, map[string][]tsdbutil.Sample{
  446         labels.New(labels.Label{Name: "a", Value: "b"}).String(): {sample{0, 1}, sample{10, 3}},
  447     }, ssMap)
  448 }
  449 
  450 func TestDB_Snapshot(t *testing.T) {
  451     db, delete := openTestDB(t, nil)
  452     defer delete()
  453 
  454     // append data
  455     app := db.Appender()
  456     mint := int64(1414141414000)
  457     for i := 0; i < 1000; i++ {
  458         _, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
  459         testutil.Ok(t, err)
  460     }
  461     testutil.Ok(t, app.Commit())
  462     testutil.Ok(t, app.Rollback())
  463 
  464     // create snapshot
  465     snap, err := ioutil.TempDir("", "snap")
  466     testutil.Ok(t, err)
  467 
  468     defer func() {
  469         testutil.Ok(t, os.RemoveAll(snap))
  470     }()
  471     testutil.Ok(t, db.Snapshot(snap, true))
  472     testutil.Ok(t, db.Close())
  473 
  474     // reopen DB from snapshot
  475     db, err = Open(snap, nil, nil, nil)
  476     testutil.Ok(t, err)
  477     defer func() { testutil.Ok(t, db.Close()) }()
  478 
  479     querier, err := db.Querier(mint, mint+1000)
  480     testutil.Ok(t, err)
  481     defer func() { testutil.Ok(t, querier.Close()) }()
  482 
  483     // sum values
  484     seriesSet, err := querier.Select(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
  485     testutil.Ok(t, err)
  486 
  487     sum := 0.0
  488     for seriesSet.Next() {
  489         series := seriesSet.At().Iterator()
  490         for series.Next() {
  491             _, v := series.At()
  492             sum += v
  493         }
  494         testutil.Ok(t, series.Err())
  495     }
  496     testutil.Ok(t, seriesSet.Err())
  497     testutil.Equals(t, 1000.0, sum)
  498 }
  499 
  500 // TestDB_Snapshot_ChunksOutsideOfCompactedRange ensures that a snapshot removes chunks samples
  501 // that are outside the set block time range.
  502 // See https://github.com/prometheus/prometheus/issues/5105
  503 func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
  504     db, delete := openTestDB(t, nil)
  505     defer delete()
  506 
  507     app := db.Appender()
  508     mint := int64(1414141414000)
  509     for i := 0; i < 1000; i++ {
  510         _, err := app.Add(labels.FromStrings("foo", "bar"), mint+int64(i), 1.0)
  511         testutil.Ok(t, err)
  512     }
  513     testutil.Ok(t, app.Commit())
  514     testutil.Ok(t, app.Rollback())
  515 
  516     snap, err := ioutil.TempDir("", "snap")
  517     testutil.Ok(t, err)
  518 
  519     // Hackingly introduce "race", by having lower max time then maxTime in last chunk.
  520     db.head.maxTime = db.head.maxTime - 10
  521 
  522     defer func() {
  523         testutil.Ok(t, os.RemoveAll(snap))
  524     }()
  525     testutil.Ok(t, db.Snapshot(snap, true))
  526     testutil.Ok(t, db.Close())
  527 
  528     // Reopen DB from snapshot.
  529     db, err = Open(snap, nil, nil, nil)
  530     testutil.Ok(t, err)
  531     defer func() { testutil.Ok(t, db.Close()) }()
  532 
  533     querier, err := db.Querier(mint, mint+1000)
  534     testutil.Ok(t, err)
  535     defer func() { testutil.Ok(t, querier.Close()) }()
  536 
  537     // Sum values.
  538     seriesSet, err := querier.Select(labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
  539     testutil.Ok(t, err)
  540 
  541     sum := 0.0
  542     for seriesSet.Next() {
  543         series := seriesSet.At().Iterator()
  544         for series.Next() {
  545             _, v := series.At()
  546             sum += v
  547         }
  548         testutil.Ok(t, series.Err())
  549     }
  550     testutil.Ok(t, seriesSet.Err())
  551 
  552     // Since we snapshotted with MaxTime - 10, so expect 10 less samples.
  553     testutil.Equals(t, 1000.0-10, sum)
  554 }
  555 
  556 func TestDB_SnapshotWithDelete(t *testing.T) {
  557     numSamples := int64(10)
  558 
  559     db, delete := openTestDB(t, nil)
  560     defer delete()
  561 
  562     app := db.Appender()
  563 
  564     smpls := make([]float64, numSamples)
  565     for i := int64(0); i < numSamples; i++ {
  566         smpls[i] = rand.Float64()
  567         app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
  568     }
  569 
  570     testutil.Ok(t, app.Commit())
  571     cases := []struct {
  572         intervals tombstones.Intervals
  573         remaint   []int64
  574     }{
  575         {
  576             intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}},
  577             remaint:   []int64{0, 8, 9},
  578         },
  579     }
  580 
  581 Outer:
  582     for _, c := range cases {
  583         // TODO(gouthamve): Reset the tombstones somehow.
  584         // Delete the ranges.
  585         for _, r := range c.intervals {
  586             testutil.Ok(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
  587         }
  588 
  589         // create snapshot
  590         snap, err := ioutil.TempDir("", "snap")
  591         testutil.Ok(t, err)
  592 
  593         defer func() {
  594             testutil.Ok(t, os.RemoveAll(snap))
  595         }()
  596         testutil.Ok(t, db.Snapshot(snap, true))
  597         testutil.Ok(t, db.Close())
  598 
  599         // reopen DB from snapshot
  600         db, err = Open(snap, nil, nil, nil)
  601         testutil.Ok(t, err)
  602         defer func() { testutil.Ok(t, db.Close()) }()
  603 
  604         // Compare the result.
  605         q, err := db.Querier(0, numSamples)
  606         testutil.Ok(t, err)
  607         defer func() { testutil.Ok(t, q.Close()) }()
  608 
  609         res, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
  610         testutil.Ok(t, err)
  611 
  612         expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
  613         for _, ts := range c.remaint {
  614             expSamples = append(expSamples, sample{ts, smpls[ts]})
  615         }
  616 
  617         expss := newMockSeriesSet([]Series{
  618             newSeries(map[string]string{"a": "b"}, expSamples),
  619         })
  620 
  621         if len(expSamples) == 0 {
  622             testutil.Assert(t, res.Next() == false, "")
  623             continue
  624         }
  625 
  626         for {
  627             eok, rok := expss.Next(), res.Next()
  628             testutil.Equals(t, eok, rok)
  629 
  630             if !eok {
  631                 continue Outer
  632             }
  633             sexp := expss.At()
  634             sres := res.At()
  635 
  636             testutil.Equals(t, sexp.Labels(), sres.Labels())
  637 
  638             smplExp, errExp := expandSeriesIterator(sexp.Iterator())
  639             smplRes, errRes := expandSeriesIterator(sres.Iterator())
  640 
  641             testutil.Equals(t, errExp, errRes)
  642             testutil.Equals(t, smplExp, smplRes)
  643         }
  644     }
  645 }
  646 
  647 func TestDB_e2e(t *testing.T) {
  648     const (
  649         numDatapoints = 1000
  650         numRanges     = 1000
  651         timeInterval  = int64(3)
  652     )
  653     // Create 8 series with 1000 data-points of different ranges and run queries.
  654     lbls := []labels.Labels{
  655         {
  656             {Name: "a", Value: "b"},
  657             {Name: "instance", Value: "localhost:9090"},
  658             {Name: "job", Value: "prometheus"},
  659         },
  660         {
  661             {Name: "a", Value: "b"},
  662             {Name: "instance", Value: "127.0.0.1:9090"},
  663             {Name: "job", Value: "prometheus"},
  664         },
  665         {
  666             {Name: "a", Value: "b"},
  667             {Name: "instance", Value: "127.0.0.1:9090"},
  668             {Name: "job", Value: "prom-k8s"},
  669         },
  670         {
  671             {Name: "a", Value: "b"},
  672             {Name: "instance", Value: "localhost:9090"},
  673             {Name: "job", Value: "prom-k8s"},
  674         },
  675         {
  676             {Name: "a", Value: "c"},
  677             {Name: "instance", Value: "localhost:9090"},
  678             {Name: "job", Value: "prometheus"},
  679         },
  680         {
  681             {Name: "a", Value: "c"},
  682             {Name: "instance", Value: "127.0.0.1:9090"},
  683             {Name: "job", Value: "prometheus"},
  684         },
  685         {
  686             {Name: "a", Value: "c"},
  687             {Name: "instance", Value: "127.0.0.1:9090"},
  688             {Name: "job", Value: "prom-k8s"},
  689         },
  690         {
  691             {Name: "a", Value: "c"},
  692             {Name: "instance", Value: "localhost:9090"},
  693             {Name: "job", Value: "prom-k8s"},
  694         },
  695     }
  696 
  697     seriesMap := map[string][]tsdbutil.Sample{}
  698     for _, l := range lbls {
  699         seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{}
  700     }
  701 
  702     db, delete := openTestDB(t, nil)
  703     defer func() {
  704         testutil.Ok(t, db.Close())
  705         delete()
  706     }()
  707 
  708     app := db.Appender()
  709 
  710     for _, l := range lbls {
  711         lset := labels.New(l...)
  712         series := []tsdbutil.Sample{}
  713 
  714         ts := rand.Int63n(300)
  715         for i := 0; i < numDatapoints; i++ {
  716             v := rand.Float64()
  717 
  718             series = append(series, sample{ts, v})
  719 
  720             _, err := app.Add(lset, ts, v)
  721             testutil.Ok(t, err)
  722 
  723             ts += rand.Int63n(timeInterval) + 1
  724         }
  725 
  726         seriesMap[lset.String()] = series
  727     }
  728 
  729     testutil.Ok(t, app.Commit())
  730 
  731     // Query each selector on 1000 random time-ranges.
  732     queries := []struct {
  733         ms []*labels.Matcher
  734     }{
  735         {
  736             ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "b")},
  737         },
  738         {
  739             ms: []*labels.Matcher{
  740                 labels.MustNewMatcher(labels.MatchEqual, "a", "b"),
  741                 labels.MustNewMatcher(labels.MatchEqual, "job", "prom-k8s"),
  742             },
  743         },
  744         {
  745             ms: []*labels.Matcher{
  746                 labels.MustNewMatcher(labels.MatchEqual, "a", "c"),
  747                 labels.MustNewMatcher(labels.MatchEqual, "instance", "localhost:9090"),
  748                 labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"),
  749             },
  750         },
  751         // TODO: Add Regexp Matchers.
  752     }
  753 
  754     for _, qry := range queries {
  755         matched := labels.Slice{}
  756         for _, ls := range lbls {
  757             s := labels.Selector(qry.ms)
  758             if s.Matches(ls) {
  759                 matched = append(matched, ls)
  760             }
  761         }
  762 
  763         sort.Sort(matched)
  764 
  765         for i := 0; i < numRanges; i++ {
  766             mint := rand.Int63n(300)
  767             maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints))
  768 
  769             expected := map[string][]tsdbutil.Sample{}
  770 
  771             // Build the mockSeriesSet.
  772             for _, m := range matched {
  773                 smpls := boundedSamples(seriesMap[m.String()], mint, maxt)
  774                 if len(smpls) > 0 {
  775                     expected[m.String()] = smpls
  776                 }
  777             }
  778 
  779             q, err := db.Querier(mint, maxt)
  780             testutil.Ok(t, err)
  781 
  782             ss, err := q.Select(qry.ms...)
  783             testutil.Ok(t, err)
  784 
  785             result := map[string][]tsdbutil.Sample{}
  786 
  787             for ss.Next() {
  788                 x := ss.At()
  789 
  790                 smpls, err := expandSeriesIterator(x.Iterator())
  791                 testutil.Ok(t, err)
  792 
  793                 if len(smpls) > 0 {
  794                     result[x.Labels().String()] = smpls
  795                 }
  796             }
  797 
  798             testutil.Ok(t, ss.Err())
  799             testutil.Equals(t, expected, result)
  800 
  801             q.Close()
  802         }
  803     }
  804 }
  805 
  806 func TestWALFlushedOnDBClose(t *testing.T) {
  807     db, delete := openTestDB(t, nil)
  808     defer delete()
  809 
  810     dirDb := db.Dir()
  811 
  812     lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
  813 
  814     app := db.Appender()
  815     _, err := app.Add(lbls, 0, 1)
  816     testutil.Ok(t, err)
  817     testutil.Ok(t, app.Commit())
  818 
  819     testutil.Ok(t, db.Close())
  820 
  821     db, err = Open(dirDb, nil, nil, nil)
  822     testutil.Ok(t, err)
  823     defer func() { testutil.Ok(t, db.Close()) }()
  824 
  825     q, err := db.Querier(0, 1)
  826     testutil.Ok(t, err)
  827 
  828     values, err := q.LabelValues("labelname")
  829     testutil.Ok(t, err)
  830     testutil.Equals(t, []string{"labelvalue"}, values)
  831 }
  832 
  833 func TestWALSegmentSizeOptions(t *testing.T) {
  834     tests := map[int]func(dbdir string, segmentSize int){
  835         // Default Wal Size.
  836         0: func(dbDir string, segmentSize int) {
  837             files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
  838             testutil.Ok(t, err)
  839             for _, f := range files[:len(files)-1] {
  840                 testutil.Equals(t, int64(DefaultOptions.WALSegmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
  841             }
  842             lastFile := files[len(files)-1]
  843             testutil.Assert(t, int64(DefaultOptions.WALSegmentSize) > lastFile.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", lastFile.Name())
  844         },
  845         // Custom Wal Size.
  846         2 * 32 * 1024: func(dbDir string, segmentSize int) {
  847             files, err := ioutil.ReadDir(filepath.Join(dbDir, "wal"))
  848             testutil.Assert(t, len(files) > 1, "current WALSegmentSize should result in more than a single WAL file.")
  849             testutil.Ok(t, err)
  850             for _, f := range files[:len(files)-1] {
  851                 testutil.Equals(t, int64(segmentSize), f.Size(), "WAL file size doesn't match WALSegmentSize option, filename: %v", f.Name())
  852             }
  853             lastFile := files[len(files)-1]
  854             testutil.Assert(t, int64(segmentSize) > lastFile.Size(), "last WAL file size is not smaller than the WALSegmentSize option, filename: %v", lastFile.Name())
  855         },
  856         // Wal disabled.
  857         -1: func(dbDir string, segmentSize int) {
  858             if _, err := os.Stat(filepath.Join(dbDir, "wal")); !os.IsNotExist(err) {
  859                 t.Fatal("wal directory is present when the wal is disabled")
  860             }
  861         },
  862     }
  863     for segmentSize, testFunc := range tests {
  864         t.Run(fmt.Sprintf("WALSegmentSize %d test", segmentSize), func(t *testing.T) {
  865             options := *DefaultOptions
  866             options.WALSegmentSize = segmentSize
  867             db, delete := openTestDB(t, &options)
  868             defer delete()
  869             app := db.Appender()
  870             for i := int64(0); i < 155; i++ {
  871                 _, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64())
  872                 testutil.Ok(t, err)
  873                 testutil.Ok(t, app.Commit())
  874             }
  875 
  876             dbDir := db.Dir()
  877             db.Close()
  878             testFunc(dbDir, options.WALSegmentSize)
  879         })
  880     }
  881 }
  882 
  883 func TestTombstoneClean(t *testing.T) {
  884     numSamples := int64(10)
  885 
  886     db, delete := openTestDB(t, nil)
  887     defer delete()
  888 
  889     app := db.Appender()
  890 
  891     smpls := make([]float64, numSamples)
  892     for i := int64(0); i < numSamples; i++ {
  893         smpls[i] = rand.Float64()
  894         app.Add(labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i])
  895     }
  896 
  897     testutil.Ok(t, app.Commit())
  898     cases := []struct {
  899         intervals tombstones.Intervals
  900         remaint   []int64
  901     }{
  902         {
  903             intervals: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}},
  904             remaint:   []int64{0, 8, 9},
  905         },
  906     }
  907 
  908     for _, c := range cases {
  909         // Delete the ranges.
  910 
  911         // create snapshot
  912         snap, err := ioutil.TempDir("", "snap")
  913         testutil.Ok(t, err)
  914 
  915         defer func() {
  916             testutil.Ok(t, os.RemoveAll(snap))
  917         }()
  918         testutil.Ok(t, db.Snapshot(snap, true))
  919         testutil.Ok(t, db.Close())
  920 
  921         // reopen DB from snapshot
  922         db, err = Open(snap, nil, nil, nil)
  923         testutil.Ok(t, err)
  924         defer db.Close()
  925 
  926         for _, r := range c.intervals {
  927             testutil.Ok(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
  928         }
  929 
  930         // All of the setup for THIS line.
  931         testutil.Ok(t, db.CleanTombstones())
  932 
  933         // Compare the result.
  934         q, err := db.Querier(0, numSamples)
  935         testutil.Ok(t, err)
  936         defer q.Close()
  937 
  938         res, err := q.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
  939         testutil.Ok(t, err)
  940 
  941         expSamples := make([]tsdbutil.Sample, 0, len(c.remaint))
  942         for _, ts := range c.remaint {
  943             expSamples = append(expSamples, sample{ts, smpls[ts]})
  944         }
  945 
  946         expss := newMockSeriesSet([]Series{
  947             newSeries(map[string]string{"a": "b"}, expSamples),
  948         })
  949 
  950         if len(expSamples) == 0 {
  951             testutil.Assert(t, res.Next() == false, "")
  952             continue
  953         }
  954 
  955         for {
  956             eok, rok := expss.Next(), res.Next()
  957             testutil.Equals(t, eok, rok)
  958 
  959             if !eok {
  960                 break
  961             }
  962             sexp := expss.At()
  963             sres := res.At()
  964 
  965             testutil.Equals(t, sexp.Labels(), sres.Labels())
  966 
  967             smplExp, errExp := expandSeriesIterator(sexp.Iterator())
  968             smplRes, errRes := expandSeriesIterator(sres.Iterator())
  969 
  970             testutil.Equals(t, errExp, errRes)
  971             testutil.Equals(t, smplExp, smplRes)
  972         }
  973 
  974         for _, b := range db.Blocks() {
  975             testutil.Equals(t, tombstones.NewMemTombstones(), b.tombstones)
  976         }
  977     }
  978 }
  979 
  980 // TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind.
  981 // When TombstoneClean errors the original block that should be rebuilt doesn't get deleted so
  982 // if TombstoneClean leaves any blocks behind these will overlap.
  983 func TestTombstoneCleanFail(t *testing.T) {
  984 
  985     db, delete := openTestDB(t, nil)
  986     defer func() {
  987         testutil.Ok(t, db.Close())
  988         delete()
  989     }()
  990 
  991     var expectedBlockDirs []string
  992 
  993     // Create some empty blocks pending for compaction.
  994     // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
  995     totalBlocks := 2
  996     for i := 0; i < totalBlocks; i++ {
  997         blockDir := createBlock(t, db.Dir(), genSeries(1, 1, 0, 1))
  998         block, err := OpenBlock(nil, blockDir, nil)
  999         testutil.Ok(t, err)
 1000         // Add some some fake tombstones to trigger the compaction.
 1001         tomb := tombstones.NewMemTombstones()
 1002         tomb.AddInterval(0, tombstones.Interval{Mint: 0, Maxt: 1})
 1003         block.tombstones = tomb
 1004 
 1005         db.blocks = append(db.blocks, block)
 1006         expectedBlockDirs = append(expectedBlockDirs, blockDir)
 1007     }
 1008 
 1009     // Initialize the mockCompactorFailing with a room for a single compaction iteration.
 1010     // mockCompactorFailing will fail on the second iteration so we can check if the cleanup works as expected.
 1011     db.compactor = &mockCompactorFailing{
 1012         t:      t,
 1013         blocks: db.blocks,
 1014         max:    totalBlocks + 1,
 1015     }
 1016 
 1017     // The compactor should trigger a failure here.
 1018     testutil.NotOk(t, db.CleanTombstones())
 1019 
 1020     // Now check that the CleanTombstones didn't leave any blocks behind after a failure.
 1021     actualBlockDirs, err := blockDirs(db.dir)
 1022     testutil.Ok(t, err)
 1023     testutil.Equals(t, expectedBlockDirs, actualBlockDirs)
 1024 }
 1025 
 1026 // mockCompactorFailing creates a new empty block on every write and fails when reached the max allowed total.
 1027 type mockCompactorFailing struct {
 1028     t      *testing.T
 1029     blocks []*Block
 1030     max    int
 1031 }
 1032 
 1033 func (*mockCompactorFailing) Plan(dir string) ([]string, error) {
 1034     return nil, nil
 1035 }
 1036 func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
 1037     if len(c.blocks) >= c.max {
 1038         return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
 1039     }
 1040 
 1041     block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil)
 1042     testutil.Ok(c.t, err)
 1043     testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere.
 1044     c.blocks = append(c.blocks, block)
 1045 
 1046     // Now check that all expected blocks are actually persisted on disk.
 1047     // This way we make sure that the we have some blocks that are supposed to be removed.
 1048     var expectedBlocks []string
 1049     for _, b := range c.blocks {
 1050         expectedBlocks = append(expectedBlocks, filepath.Join(dest, b.Meta().ULID.String()))
 1051     }
 1052     actualBlockDirs, err := blockDirs(dest)
 1053     testutil.Ok(c.t, err)
 1054 
 1055     testutil.Equals(c.t, expectedBlocks, actualBlockDirs)
 1056 
 1057     return block.Meta().ULID, nil
 1058 }
 1059 
 1060 func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) {
 1061     return ulid.ULID{}, nil
 1062 
 1063 }
 1064 
 1065 func TestTimeRetention(t *testing.T) {
 1066     db, delete := openTestDB(t, &Options{
 1067         BlockRanges: []int64{1000},
 1068     })
 1069     defer func() {
 1070         testutil.Ok(t, db.Close())
 1071         delete()
 1072     }()
 1073 
 1074     blocks := []*BlockMeta{
 1075         {MinTime: 500, MaxTime: 900}, // Oldest block
 1076         {MinTime: 1000, MaxTime: 1500},
 1077         {MinTime: 1500, MaxTime: 2000}, // Newest Block
 1078     }
 1079 
 1080     for _, m := range blocks {
 1081         createBlock(t, db.Dir(), genSeries(10, 10, m.MinTime, m.MaxTime))
 1082     }
 1083 
 1084     testutil.Ok(t, db.reload())                       // Reload the db to register the new blocks.
 1085     testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
 1086 
 1087     db.opts.RetentionDuration = uint64(blocks[2].MaxTime - blocks[1].MinTime)
 1088     testutil.Ok(t, db.reload())
 1089 
 1090     expBlocks := blocks[1:]
 1091     actBlocks := db.Blocks()
 1092 
 1093     testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.metrics.timeRetentionCount)), "metric retention count mismatch")
 1094     testutil.Equals(t, len(expBlocks), len(actBlocks))
 1095     testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime)
 1096     testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime)
 1097 }
 1098 
 1099 func TestSizeRetention(t *testing.T) {
 1100     db, delete := openTestDB(t, &Options{
 1101         BlockRanges: []int64{100},
 1102     })
 1103     defer func() {
 1104         testutil.Ok(t, db.Close())
 1105         delete()
 1106     }()
 1107 
 1108     blocks := []*BlockMeta{
 1109         {MinTime: 100, MaxTime: 200}, // Oldest block
 1110         {MinTime: 200, MaxTime: 300},
 1111         {MinTime: 300, MaxTime: 400},
 1112         {MinTime: 400, MaxTime: 500},
 1113         {MinTime: 500, MaxTime: 600}, // Newest Block
 1114     }
 1115 
 1116     for _, m := range blocks {
 1117         createBlock(t, db.Dir(), genSeries(100, 10, m.MinTime, m.MaxTime))
 1118     }
 1119 
 1120     headBlocks := []*BlockMeta{
 1121         {MinTime: 700, MaxTime: 800},
 1122     }
 1123 
 1124     // Add some data to the WAL.
 1125     headApp := db.Head().Appender()
 1126     for _, m := range headBlocks {
 1127         series := genSeries(100, 10, m.MinTime, m.MaxTime)
 1128         for _, s := range series {
 1129             it := s.Iterator()
 1130             for it.Next() {
 1131                 tim, v := it.At()
 1132                 _, err := headApp.Add(s.Labels(), tim, v)
 1133                 testutil.Ok(t, err)
 1134             }
 1135             testutil.Ok(t, it.Err())
 1136         }
 1137     }
 1138     testutil.Ok(t, headApp.Commit())
 1139 
 1140     // Test that registered size matches the actual disk size.
 1141     testutil.Ok(t, db.reload())                                         // Reload the db to register the new db size.
 1142     testutil.Equals(t, len(blocks), len(db.Blocks()))                   // Ensure all blocks are registered.
 1143     blockSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the the actual internal metrics.
 1144     walSize, err := db.Head().wal.Size()
 1145     testutil.Ok(t, err)
 1146     // Expected size should take into account block size + WAL size
 1147     expSize := blockSize + walSize
 1148     actSize, err := fileutil.DirSize(db.Dir())
 1149     testutil.Ok(t, err)
 1150     testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
 1151 
 1152     // Create a WAL checkpoint, and compare sizes.
 1153     first, last, err := db.Head().wal.Segments()
 1154     testutil.Ok(t, err)
 1155     _, err = wal.Checkpoint(db.Head().wal, first, last-1, func(x uint64) bool { return false }, 0)
 1156     testutil.Ok(t, err)
 1157     blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the the actual internal metrics.
 1158     walSize, err = db.Head().wal.Size()
 1159     testutil.Ok(t, err)
 1160     expSize = blockSize + walSize
 1161     actSize, err = fileutil.DirSize(db.Dir())
 1162     testutil.Ok(t, err)
 1163     testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
 1164 
 1165     // Decrease the max bytes limit so that a delete is triggered.
 1166     // Check total size, total count and check that the oldest block was deleted.
 1167     firstBlockSize := db.Blocks()[0].Size()
 1168     sizeLimit := actSize - firstBlockSize
 1169     db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size.
 1170     testutil.Ok(t, db.reload())  // Reload the db to register the new db size.
 1171 
 1172     expBlocks := blocks[1:]
 1173     actBlocks := db.Blocks()
 1174     blockSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes))
 1175     walSize, err = db.Head().wal.Size()
 1176     testutil.Ok(t, err)
 1177     // Expected size should take into account block size + WAL size
 1178     expSize = blockSize + walSize
 1179     actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount))
 1180     actSize, err = fileutil.DirSize(db.Dir())
 1181     testutil.Ok(t, err)
 1182 
 1183     testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch")
 1184     testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size")
 1185     testutil.Assert(t, expSize <= sizeLimit, "actual size (%v) is expected to be less than or equal to limit (%v)", expSize, sizeLimit)
 1186     testutil.Equals(t, len(blocks)-1, len(actBlocks), "new block count should be decreased from:%v to:%v", len(blocks), len(blocks)-1)
 1187     testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime, "maxT mismatch of the first block")
 1188     testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime, "maxT mismatch of the last block")
 1189 }
 1190 
 1191 func TestSizeRetentionMetric(t *testing.T) {
 1192     cases := []struct {
 1193         maxBytes    int64
 1194         expMaxBytes int64
 1195     }{
 1196         {maxBytes: 1000, expMaxBytes: 1000},
 1197         {maxBytes: 0, expMaxBytes: 0},
 1198         {maxBytes: -1000, expMaxBytes: 0},
 1199     }
 1200 
 1201     for _, c := range cases {
 1202         db, delete := openTestDB(t, &Options{
 1203             BlockRanges: []int64{100},
 1204             MaxBytes:    c.maxBytes,
 1205         })
 1206 
 1207         actMaxBytes := int64(prom_testutil.ToFloat64(db.metrics.maxBytes))
 1208         testutil.Equals(t, actMaxBytes, c.expMaxBytes, "metric retention limit bytes mismatch")
 1209 
 1210         testutil.Ok(t, db.Close())
 1211         delete()
 1212     }
 1213 }
 1214 
 1215 func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
 1216     db, delete := openTestDB(t, nil)
 1217     defer func() {
 1218         testutil.Ok(t, db.Close())
 1219         delete()
 1220     }()
 1221 
 1222     labelpairs := []labels.Labels{
 1223         labels.FromStrings("a", "abcd", "b", "abcde"),
 1224         labels.FromStrings("labelname", "labelvalue"),
 1225     }
 1226 
 1227     app := db.Appender()
 1228     for _, lbls := range labelpairs {
 1229         _, err := app.Add(lbls, 0, 1)
 1230         testutil.Ok(t, err)
 1231     }
 1232     testutil.Ok(t, app.Commit())
 1233 
 1234     cases := []struct {
 1235         selector labels.Selector
 1236         series   []labels.Labels
 1237     }{{
 1238         selector: labels.Selector{
 1239             labels.MustNewMatcher(labels.MatchNotEqual, "lname", "lvalue"),
 1240         },
 1241         series: labelpairs,
 1242     }, {
 1243         selector: labels.Selector{
 1244             labels.MustNewMatcher(labels.MatchEqual, "a", "abcd"),
 1245             labels.MustNewMatcher(labels.MatchNotEqual, "b", "abcde"),
 1246         },
 1247         series: []labels.Labels{},
 1248     }, {
 1249         selector: labels.Selector{
 1250             labels.MustNewMatcher(labels.MatchEqual, "a", "abcd"),
 1251             labels.MustNewMatcher(labels.MatchNotEqual, "b", "abc"),
 1252         },
 1253         series: []labels.Labels{labelpairs[0]},
 1254     }, {
 1255         selector: labels.Selector{
 1256             labels.MustNewMatcher(labels.MatchNotRegexp, "a", "abd.*"),
 1257         },
 1258         series: labelpairs,
 1259     }, {
 1260         selector: labels.Selector{
 1261             labels.MustNewMatcher(labels.MatchNotRegexp, "a", "abc.*"),
 1262         },
 1263         series: labelpairs[1:],
 1264     }, {
 1265         selector: labels.Selector{
 1266             labels.MustNewMatcher(labels.MatchNotRegexp, "c", "abd.*"),
 1267         },
 1268         series: labelpairs,
 1269     }, {
 1270         selector: labels.Selector{
 1271             labels.MustNewMatcher(labels.MatchNotRegexp, "labelname", "labelvalue"),
 1272         },
 1273         series: labelpairs[:1],
 1274     }}
 1275 
 1276     q, err := db.Querier(0, 10)
 1277     testutil.Ok(t, err)
 1278     defer func() { testutil.Ok(t, q.Close()) }()
 1279 
 1280     for _, c := range cases {
 1281         ss, err := q.Select(c.selector...)
 1282         testutil.Ok(t, err)
 1283 
 1284         lres, err := expandSeriesSet(ss)
 1285         testutil.Ok(t, err)
 1286 
 1287         testutil.Equals(t, c.series, lres)
 1288     }
 1289 }
 1290 
 1291 func expandSeriesSet(ss SeriesSet) ([]labels.Labels, error) {
 1292     result := []labels.Labels{}
 1293     for ss.Next() {
 1294         result = append(result, ss.At().Labels())
 1295     }
 1296 
 1297     return result, ss.Err()
 1298 }
 1299 
 1300 func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
 1301     // Create 10 blocks that does not overlap (0-10, 10-20, ..., 100-110) but in reverse order to ensure our algorithm
 1302     // will handle that.
 1303     var metas = make([]BlockMeta, 11)
 1304     for i := 10; i >= 0; i-- {
 1305         metas[i] = BlockMeta{MinTime: int64(i * 10), MaxTime: int64((i + 1) * 10)}
 1306     }
 1307 
 1308     testutil.Assert(t, len(OverlappingBlocks(metas)) == 0, "we found unexpected overlaps")
 1309 
 1310     // Add overlapping blocks. We've to establish order again since we aren't interested
 1311     // in trivial overlaps caused by unorderedness.
 1312     add := func(ms ...BlockMeta) []BlockMeta {
 1313         repl := append(append([]BlockMeta{}, metas...), ms...)
 1314         sort.Slice(repl, func(i, j int) bool {
 1315             return repl[i].MinTime < repl[j].MinTime
 1316         })
 1317         return repl
 1318     }
 1319 
 1320     // o1 overlaps with 10-20.
 1321     o1 := BlockMeta{MinTime: 15, MaxTime: 17}
 1322     testutil.Equals(t, Overlaps{
 1323         {Min: 15, Max: 17}: {metas[1], o1},
 1324     }, OverlappingBlocks(add(o1)))
 1325 
 1326     // o2 overlaps with 20-30 and 30-40.
 1327     o2 := BlockMeta{MinTime: 21, MaxTime: 31}
 1328     testutil.Equals(t, Overlaps{
 1329         {Min: 21, Max: 30}: {metas[2], o2},
 1330         {Min: 30, Max: 31}: {o2, metas[3]},
 1331     }, OverlappingBlocks(add(o2)))
 1332 
 1333     // o3a and o3b overlaps with 30-40 and each other.
 1334     o3a := BlockMeta{MinTime: 33, MaxTime: 39}
 1335     o3b := BlockMeta{MinTime: 34, MaxTime: 36}
 1336     testutil.Equals(t, Overlaps{
 1337         {Min: 34, Max: 36}: {metas[3], o3a, o3b},
 1338     }, OverlappingBlocks(add(o3a, o3b)))
 1339 
 1340     // o4 is 1:1 overlap with 50-60.
 1341     o4 := BlockMeta{MinTime: 50, MaxTime: 60}
 1342     testutil.Equals(t, Overlaps{
 1343         {Min: 50, Max: 60}: {metas[5], o4},
 1344     }, OverlappingBlocks(add(o4)))
 1345 
 1346     // o5 overlaps with 60-70, 70-80 and 80-90.
 1347     o5 := BlockMeta{MinTime: 61, MaxTime: 85}
 1348     testutil.Equals(t, Overlaps{
 1349         {Min: 61, Max: 70}: {metas[6], o5},
 1350         {Min: 70, Max: 80}: {o5, metas[7]},
 1351         {Min: 80, Max: 85}: {o5, metas[8]},
 1352     }, OverlappingBlocks(add(o5)))
 1353 
 1354     // o6a overlaps with 90-100, 100-110 and o6b, o6b overlaps with 90-100 and o6a.
 1355     o6a := BlockMeta{MinTime: 92, MaxTime: 105}
 1356     o6b := BlockMeta{MinTime: 94, MaxTime: 99}
 1357     testutil.Equals(t, Overlaps{
 1358         {Min: 94, Max: 99}:   {metas[9], o6a, o6b},
 1359         {Min: 100, Max: 105}: {o6a, metas[10]},
 1360     }, OverlappingBlocks(add(o6a, o6b)))
 1361 
 1362     // All together.
 1363     testutil.Equals(t, Overlaps{
 1364         {Min: 15, Max: 17}: {metas[1], o1},
 1365         {Min: 21, Max: 30}: {metas[2], o2}, {Min: 30, Max: 31}: {o2, metas[3]},
 1366         {Min: 34, Max: 36}: {metas[3], o3a, o3b},
 1367         {Min: 50, Max: 60}: {metas[5], o4},
 1368         {Min: 61, Max: 70}: {metas[6], o5}, {Min: 70, Max: 80}: {o5, metas[7]}, {Min: 80, Max: 85}: {o5, metas[8]},
 1369         {Min: 94, Max: 99}: {metas[9], o6a, o6b}, {Min: 100, Max: 105}: {o6a, metas[10]},
 1370     }, OverlappingBlocks(add(o1, o2, o3a, o3b, o4, o5, o6a, o6b)))
 1371 
 1372     // Additional case.
 1373     var nc1 []BlockMeta
 1374     nc1 = append(nc1, BlockMeta{MinTime: 1, MaxTime: 5})
 1375     nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
 1376     nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
 1377     nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
 1378     nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3})
 1379     nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 6})
 1380     nc1 = append(nc1, BlockMeta{MinTime: 3, MaxTime: 5})
 1381     nc1 = append(nc1, BlockMeta{MinTime: 5, MaxTime: 7})
 1382     nc1 = append(nc1, BlockMeta{MinTime: 7, MaxTime: 10})
 1383     nc1 = append(nc1, BlockMeta{MinTime: 8, MaxTime: 9})
 1384     testutil.Equals(t, Overlaps{
 1385         {Min: 2, Max: 3}: {nc1[0], nc1[1], nc1[2], nc1[3], nc1[4], nc1[5]}, // 1-5, 2-3, 2-3, 2-3, 2-3, 2,6
 1386         {Min: 3, Max: 5}: {nc1[0], nc1[5], nc1[6]},                         // 1-5, 2-6, 3-5
 1387         {Min: 5, Max: 6}: {nc1[5], nc1[7]},                                 // 2-6, 5-7
 1388         {Min: 8, Max: 9}: {nc1[8], nc1[9]},                                 // 7-10, 8-9
 1389     }, OverlappingBlocks(nc1))
 1390 }
 1391 
 1392 // Regression test for https://github.com/prometheus/prometheus/tsdb/issues/347
 1393 func TestChunkAtBlockBoundary(t *testing.T) {
 1394     db, delete := openTestDB(t, nil)
 1395     defer func() {
 1396         testutil.Ok(t, db.Close())
 1397         delete()
 1398     }()
 1399 
 1400     app := db.Appender()
 1401 
 1402     blockRange := DefaultOptions.BlockRanges[0]
 1403     label := labels.FromStrings("foo", "bar")
 1404 
 1405     for i := int64(0); i < 3; i++ {
 1406         _, err := app.Add(label, i*blockRange, 0)
 1407         testutil.Ok(t, err)
 1408         _, err = app.Add(label, i*blockRange+1000, 0)
 1409         testutil.Ok(t, err)
 1410     }
 1411 
 1412     err := app.Commit()
 1413     testutil.Ok(t, err)
 1414 
 1415     err = db.compact()
 1416     testutil.Ok(t, err)
 1417 
 1418     for _, block := range db.Blocks() {
 1419         r, err := block.Index()
 1420         testutil.Ok(t, err)
 1421         defer r.Close()
 1422 
 1423         meta := block.Meta()
 1424 
 1425         k, v := index.AllPostingsKey()
 1426         p, err := r.Postings(k, v)
 1427         testutil.Ok(t, err)
 1428 
 1429         var (
 1430             lset labels.Labels
 1431             chks []chunks.Meta
 1432         )
 1433 
 1434         chunkCount := 0
 1435 
 1436         for p.Next() {
 1437             err = r.Series(p.At(), &lset, &chks)
 1438             testutil.Ok(t, err)
 1439             for _, c := range chks {
 1440                 testutil.Assert(t, meta.MinTime <= c.MinTime && c.MaxTime <= meta.MaxTime,
 1441                     "chunk spans beyond block boundaries: [block.MinTime=%d, block.MaxTime=%d]; [chunk.MinTime=%d, chunk.MaxTime=%d]",
 1442                     meta.MinTime, meta.MaxTime, c.MinTime, c.MaxTime)
 1443                 chunkCount++
 1444             }
 1445         }
 1446         testutil.Assert(t, chunkCount == 1, "expected 1 chunk in block %s, got %d", meta.ULID, chunkCount)
 1447     }
 1448 }
 1449 
 1450 func TestQuerierWithBoundaryChunks(t *testing.T) {
 1451     db, delete := openTestDB(t, nil)
 1452     defer func() {
 1453         testutil.Ok(t, db.Close())
 1454         delete()
 1455     }()
 1456 
 1457     app := db.Appender()
 1458 
 1459     blockRange := DefaultOptions.BlockRanges[0]
 1460     label := labels.FromStrings("foo", "bar")
 1461 
 1462     for i := int64(0); i < 5; i++ {
 1463         _, err := app.Add(label, i*blockRange, 0)
 1464         testutil.Ok(t, err)
 1465     }
 1466 
 1467     err := app.Commit()
 1468     testutil.Ok(t, err)
 1469 
 1470     err = db.compact()
 1471     testutil.Ok(t, err)
 1472 
 1473     testutil.Assert(t, len(db.blocks) >= 3, "invalid test, less than three blocks in DB")
 1474 
 1475     q, err := db.Querier(blockRange, 2*blockRange)
 1476     testutil.Ok(t, err)
 1477     defer q.Close()
 1478 
 1479     // The requested interval covers 2 blocks, so the querier should contain 2 blocks.
 1480     count := len(q.(*querier).blocks)
 1481     testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
 1482 }
 1483 
 1484 // TestInitializeHeadTimestamp ensures that the h.minTime is set properly.
 1485 //  - no blocks no WAL: set to the time of the first  appended sample
 1486 //  - no blocks with WAL: set to the smallest sample from the WAL
 1487 //  - with blocks no WAL: set to the last block maxT
 1488 //  - with blocks with WAL: same as above
 1489 func TestInitializeHeadTimestamp(t *testing.T) {
 1490     t.Run("clean", func(t *testing.T) {
 1491         dir, err := ioutil.TempDir("", "test_head_init")
 1492         testutil.Ok(t, err)
 1493         defer func() {
 1494             testutil.Ok(t, os.RemoveAll(dir))
 1495         }()
 1496 
 1497         db, err := Open(dir, nil, nil, nil)
 1498         testutil.Ok(t, err)
 1499         defer db.Close()
 1500 
 1501         // Should be set to init values if no WAL or blocks exist so far.
 1502         testutil.Equals(t, int64(math.MaxInt64), db.head.MinTime())
 1503         testutil.Equals(t, int64(math.MinInt64), db.head.MaxTime())
 1504 
 1505         // First added sample initializes the writable range.
 1506         app := db.Appender()
 1507         _, err = app.Add(labels.FromStrings("a", "b"), 1000, 1)
 1508         testutil.Ok(t, err)
 1509 
 1510         testutil.Equals(t, int64(1000), db.head.MinTime())
 1511         testutil.Equals(t, int64(1000), db.head.MaxTime())
 1512     })
 1513     t.Run("wal-only", func(t *testing.T) {
 1514         dir, err := ioutil.TempDir("", "test_head_init")
 1515         testutil.Ok(t, err)
 1516         defer func() {
 1517             testutil.Ok(t, os.RemoveAll(dir))
 1518         }()
 1519 
 1520         testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
 1521         w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
 1522         testutil.Ok(t, err)
 1523 
 1524         var enc record.Encoder
 1525         err = w.Log(
 1526             enc.Series([]record.RefSeries{
 1527                 {Ref: 123, Labels: labels.FromStrings("a", "1")},
 1528                 {Ref: 124, Labels: labels.FromStrings("a", "2")},
 1529             }, nil),
 1530             enc.Samples([]record.RefSample{
 1531                 {Ref: 123, T: 5000, V: 1},
 1532                 {Ref: 124, T: 15000, V: 1},
 1533             }, nil),
 1534         )
 1535         testutil.Ok(t, err)
 1536         testutil.Ok(t, w.Close())
 1537 
 1538         db, err := Open(dir, nil, nil, nil)
 1539         testutil.Ok(t, err)
 1540         defer db.Close()
 1541 
 1542         testutil.Equals(t, int64(5000), db.head.MinTime())
 1543         testutil.Equals(t, int64(15000), db.head.MaxTime())
 1544     })
 1545     t.Run("existing-block", func(t *testing.T) {
 1546         dir, err := ioutil.TempDir("", "test_head_init")
 1547         testutil.Ok(t, err)
 1548         defer func() {
 1549             testutil.Ok(t, os.RemoveAll(dir))
 1550         }()
 1551 
 1552         createBlock(t, dir, genSeries(1, 1, 1000, 2000))
 1553 
 1554         db, err := Open(dir, nil, nil, nil)
 1555         testutil.Ok(t, err)
 1556         defer db.Close()
 1557 
 1558         testutil.Equals(t, int64(2000), db.head.MinTime())
 1559         testutil.Equals(t, int64(2000), db.head.MaxTime())
 1560     })
 1561     t.Run("existing-block-and-wal", func(t *testing.T) {
 1562         dir, err := ioutil.TempDir("", "test_head_init")
 1563         testutil.Ok(t, err)
 1564         defer func() {
 1565             testutil.Ok(t, os.RemoveAll(dir))
 1566         }()
 1567 
 1568         createBlock(t, dir, genSeries(1, 1, 1000, 6000))
 1569 
 1570         testutil.Ok(t, os.MkdirAll(path.Join(dir, "wal"), 0777))
 1571         w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
 1572         testutil.Ok(t, err)
 1573 
 1574         var enc record.Encoder
 1575         err = w.Log(
 1576             enc.Series([]record.RefSeries{
 1577                 {Ref: 123, Labels: labels.FromStrings("a", "1")},
 1578                 {Ref: 124, Labels: labels.FromStrings("a", "2")},
 1579             }, nil),
 1580             enc.Samples([]record.RefSample{
 1581                 {Ref: 123, T: 5000, V: 1},
 1582                 {Ref: 124, T: 15000, V: 1},
 1583             }, nil),
 1584         )
 1585         testutil.Ok(t, err)
 1586         testutil.Ok(t, w.Close())
 1587 
 1588         r := prometheus.NewRegistry()
 1589 
 1590         db, err := Open(dir, nil, r, nil)
 1591         testutil.Ok(t, err)
 1592         defer db.Close()
 1593 
 1594         testutil.Equals(t, int64(6000), db.head.MinTime())
 1595         testutil.Equals(t, int64(15000), db.head.MaxTime())
 1596         // Check that old series has been GCed.
 1597         testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.series))
 1598     })
 1599 }
 1600 
 1601 func TestNoEmptyBlocks(t *testing.T) {
 1602     db, delete := openTestDB(t, &Options{
 1603         BlockRanges: []int64{100},
 1604     })
 1605     defer func() {
 1606         testutil.Ok(t, db.Close())
 1607         delete()
 1608     }()
 1609     db.DisableCompactions()
 1610 
 1611     rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1
 1612     defaultLabel := labels.FromStrings("foo", "bar")
 1613     defaultMatcher := labels.MustNewMatcher(labels.MatchRegexp, "", ".*")
 1614 
 1615     t.Run("Test no blocks after compact with empty head.", func(t *testing.T) {
 1616         testutil.Ok(t, db.compact())
 1617         actBlocks, err := blockDirs(db.Dir())
 1618         testutil.Ok(t, err)
 1619         testutil.Equals(t, len(db.Blocks()), len(actBlocks))
 1620         testutil.Equals(t, 0, len(actBlocks))
 1621         testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "no compaction should be triggered here")
 1622     })
 1623 
 1624     t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) {
 1625         app := db.Appender()
 1626         _, err := app.Add(defaultLabel, 1, 0)
 1627         testutil.Ok(t, err)
 1628         _, err = app.Add(defaultLabel, 2, 0)
 1629         testutil.Ok(t, err)
 1630         _, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0)
 1631         testutil.Ok(t, err)
 1632         testutil.Ok(t, app.Commit())
 1633         testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
 1634         testutil.Ok(t, db.compact())
 1635         testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
 1636 
 1637         actBlocks, err := blockDirs(db.Dir())
 1638         testutil.Ok(t, err)
 1639         testutil.Equals(t, len(db.Blocks()), len(actBlocks))
 1640         testutil.Equals(t, 0, len(actBlocks))
 1641 
 1642         app = db.Appender()
 1643         _, err = app.Add(defaultLabel, 1, 0)
 1644         testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")
 1645 
 1646         // Adding new blocks.
 1647         currentTime := db.Head().MaxTime()
 1648         _, err = app.Add(defaultLabel, currentTime, 0)
 1649         testutil.Ok(t, err)
 1650         _, err = app.Add(defaultLabel, currentTime+1, 0)
 1651         testutil.Ok(t, err)
 1652         _, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0)
 1653         testutil.Ok(t, err)
 1654         testutil.Ok(t, app.Commit())
 1655 
 1656         testutil.Ok(t, db.compact())
 1657         testutil.Equals(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
 1658         actBlocks, err = blockDirs(db.Dir())
 1659         testutil.Ok(t, err)
 1660         testutil.Equals(t, len(db.Blocks()), len(actBlocks))
 1661         testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples")
 1662     })
 1663 
 1664     t.Run(`When no new block is created from head, and there are some blocks on disk
 1665     compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) {
 1666         oldBlocks := db.Blocks()
 1667         app := db.Appender()
 1668         currentTime := db.Head().MaxTime()
 1669         _, err := app.Add(defaultLabel, currentTime, 0)
 1670         testutil.Ok(t, err)
 1671         _, err = app.Add(defaultLabel, currentTime+1, 0)
 1672         testutil.Ok(t, err)
 1673         _, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0)
 1674         testutil.Ok(t, err)
 1675         testutil.Ok(t, app.Commit())
 1676         testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
 1677         testutil.Ok(t, db.compact())
 1678         testutil.Equals(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
 1679         testutil.Equals(t, oldBlocks, db.Blocks())
 1680     })
 1681 
 1682     t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) {
 1683         currentTime := db.Head().MaxTime()
 1684         blocks := []*BlockMeta{
 1685             {MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]},
 1686             {MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]},
 1687         }
 1688         for _, m := range blocks {
 1689             createBlock(t, db.Dir(), genSeries(2, 2, m.MinTime, m.MaxTime))
 1690         }
 1691 
 1692         oldBlocks := db.Blocks()
 1693         testutil.Ok(t, db.reload())                                      // Reload the db to register the new blocks.
 1694         testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered.
 1695         testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
 1696         testutil.Ok(t, db.compact())
 1697         testutil.Equals(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here once for each block that have tombstones")
 1698 
 1699         actBlocks, err := blockDirs(db.Dir())
 1700         testutil.Ok(t, err)
 1701         testutil.Equals(t, len(db.Blocks()), len(actBlocks))
 1702         testutil.Equals(t, 1, len(actBlocks), "All samples are deleted. Only the most recent block should remain after compaction.")
 1703     })
 1704 }
 1705 
 1706 func TestDB_LabelNames(t *testing.T) {
 1707     tests := []struct {
 1708         // Add 'sampleLabels1' -> Test Head -> Compact -> Test Disk ->
 1709         // -> Add 'sampleLabels2' -> Test Head+Disk
 1710 
 1711         sampleLabels1 [][2]string // For checking head and disk separately.
 1712         // To test Head+Disk, sampleLabels2 should have
 1713         // at least 1 unique label name which is not in sampleLabels1.
 1714         sampleLabels2 [][2]string // // For checking head and disk together.
 1715         exp1          []string    // after adding sampleLabels1.
 1716         exp2          []string    // after adding sampleLabels1 and sampleLabels2.
 1717     }{
 1718         {
 1719             sampleLabels1: [][2]string{
 1720                 {"name1", "1"},
 1721                 {"name3", "3"},
 1722                 {"name2", "2"},
 1723             },
 1724             sampleLabels2: [][2]string{
 1725                 {"name4", "4"},
 1726                 {"name1", "1"},
 1727             },
 1728             exp1: []string{"name1", "name2", "name3"},
 1729             exp2: []string{"name1", "name2", "name3", "name4"},
 1730         },
 1731         {
 1732             sampleLabels1: [][2]string{
 1733                 {"name2", "2"},
 1734                 {"name1", "1"},
 1735                 {"name2", "2"},
 1736             },
 1737             sampleLabels2: [][2]string{
 1738                 {"name6", "6"},
 1739                 {"name0", "0"},
 1740             },
 1741             exp1: []string{"name1", "name2"},
 1742             exp2: []string{"name0", "name1", "name2", "name6"},
 1743         },
 1744     }
 1745 
 1746     blockRange := DefaultOptions.BlockRanges[0]
 1747     // Appends samples into the database.
 1748     appendSamples := func(db *DB, mint, maxt int64, sampleLabels [][2]string) {
 1749         t.Helper()
 1750         app := db.Appender()
 1751         for i := mint; i <= maxt; i++ {
 1752             for _, tuple := range sampleLabels {
 1753                 label := labels.FromStrings(tuple[0], tuple[1])
 1754                 _, err := app.Add(label, i*blockRange, 0)
 1755                 testutil.Ok(t, err)
 1756             }
 1757         }
 1758         err := app.Commit()
 1759         testutil.Ok(t, err)
 1760     }
 1761     for _, tst := range tests {
 1762         db, delete := openTestDB(t, nil)
 1763         defer func() {
 1764             testutil.Ok(t, db.Close())
 1765             delete()
 1766         }()
 1767 
 1768         appendSamples(db, 0, 4, tst.sampleLabels1)
 1769 
 1770         // Testing head.
 1771         headIndexr, err := db.head.Index()
 1772         testutil.Ok(t, err)
 1773         labelNames, err := headIndexr.LabelNames()
 1774         testutil.Ok(t, err)
 1775         testutil.Equals(t, tst.exp1, labelNames)
 1776         testutil.Ok(t, headIndexr.Close())
 1777 
 1778         // Testing disk.
 1779         err = db.compact()
 1780         testutil.Ok(t, err)
 1781         // All blocks have same label names, hence check them individually.
 1782         // No need to aggregate and check.
 1783         for _, b := range db.Blocks() {
 1784             blockIndexr, err := b.Index()
 1785             testutil.Ok(t, err)
 1786             labelNames, err = blockIndexr.LabelNames()
 1787             testutil.Ok(t, err)
 1788             testutil.Equals(t, tst.exp1, labelNames)
 1789             testutil.Ok(t, blockIndexr.Close())
 1790         }
 1791 
 1792         // Adding more samples to head with new label names
 1793         // so that we can test (head+disk).LabelNames() (the union).
 1794         appendSamples(db, 5, 9, tst.sampleLabels2)
 1795 
 1796         // Testing DB (union).
 1797         q, err := db.Querier(math.MinInt64, math.MaxInt64)
 1798         testutil.Ok(t, err)
 1799         labelNames, err = q.LabelNames()
 1800         testutil.Ok(t, err)
 1801         testutil.Ok(t, q.Close())
 1802         testutil.Equals(t, tst.exp2, labelNames)
 1803     }
 1804 }
 1805 
 1806 func TestCorrectNumTombstones(t *testing.T) {
 1807     db, delete := openTestDB(t, nil)
 1808     defer func() {
 1809         testutil.Ok(t, db.Close())
 1810         delete()
 1811     }()
 1812 
 1813     blockRange := DefaultOptions.BlockRanges[0]
 1814     defaultLabel := labels.FromStrings("foo", "bar")
 1815     defaultMatcher := labels.MustNewMatcher(labels.MatchEqual, defaultLabel[0].Name, defaultLabel[0].Value)
 1816 
 1817     app := db.Appender()
 1818     for i := int64(0); i < 3; i++ {
 1819         for j := int64(0); j < 15; j++ {
 1820             _, err := app.Add(defaultLabel, i*blockRange+j, 0)
 1821             testutil.Ok(t, err)
 1822         }
 1823     }
 1824     testutil.Ok(t, app.Commit())
 1825 
 1826     err := db.compact()
 1827     testutil.Ok(t, err)
 1828     testutil.Equals(t, 1, len(db.blocks))
 1829 
 1830     testutil.Ok(t, db.Delete(0, 1, defaultMatcher))
 1831     testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)
 1832 
 1833     // {0, 1} and {2, 3} are merged to form 1 tombstone.
 1834     testutil.Ok(t, db.Delete(2, 3, defaultMatcher))
 1835     testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)
 1836 
 1837     testutil.Ok(t, db.Delete(5, 6, defaultMatcher))
 1838     testutil.Equals(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones)
 1839 
 1840     testutil.Ok(t, db.Delete(9, 11, defaultMatcher))
 1841     testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
 1842 }
 1843 
 1844 func TestVerticalCompaction(t *testing.T) {
 1845     cases := []struct {
 1846         blockSeries          [][]Series
 1847         expSeries            map[string][]tsdbutil.Sample
 1848         expBlockNum          int
 1849         expOverlappingBlocks int
 1850     }{
 1851         // Case 0
 1852         // |--------------|
 1853         //        |----------------|
 1854         {
 1855             blockSeries: [][]Series{
 1856                 {
 1857                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 1858                         sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
 1859                         sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
 1860                     }),
 1861                 },
 1862                 {
 1863                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 1864                         sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
 1865                         sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
 1866                         sample{12, 99}, sample{13, 99}, sample{14, 99},
 1867                     }),
 1868                 },
 1869             },
 1870             expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
 1871                 sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
 1872                 sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
 1873                 sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
 1874                 sample{12, 99}, sample{13, 99}, sample{14, 99},
 1875             }},
 1876             expBlockNum:          1,
 1877             expOverlappingBlocks: 1,
 1878         },
 1879         // Case 1
 1880         // |-------------------------------|
 1881         //        |----------------|
 1882         {
 1883             blockSeries: [][]Series{
 1884                 {
 1885                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 1886                         sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
 1887                         sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
 1888                         sample{11, 0}, sample{13, 0}, sample{17, 0},
 1889                     }),
 1890                 },
 1891                 {
 1892                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 1893                         sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
 1894                         sample{8, 99}, sample{9, 99}, sample{10, 99},
 1895                     }),
 1896                 },
 1897             },
 1898             expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
 1899                 sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
 1900                 sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
 1901                 sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 0},
 1902                 sample{13, 0}, sample{17, 0},
 1903             }},
 1904             expBlockNum:          1,
 1905             expOverlappingBlocks: 1,
 1906         },
 1907         // Case 2
 1908         // |-------------------------------|
 1909         //        |------------|
 1910         //                           |--------------------|
 1911         {
 1912             blockSeries: [][]Series{
 1913                 {
 1914                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 1915                         sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
 1916                         sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
 1917                         sample{11, 0}, sample{13, 0}, sample{17, 0},
 1918                     }),
 1919                 },
 1920                 {
 1921                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 1922                         sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
 1923                         sample{8, 99}, sample{9, 99},
 1924                     }),
 1925                 },
 1926                 {
 1927                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 1928                         sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
 1929                         sample{21, 59}, sample{22, 59},
 1930                     }),
 1931                 },
 1932             },
 1933             expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
 1934                 sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
 1935                 sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
 1936                 sample{8, 99}, sample{9, 99}, sample{11, 0}, sample{13, 0},
 1937                 sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
 1938                 sample{21, 59}, sample{22, 59},
 1939             }},
 1940             expBlockNum:          1,
 1941             expOverlappingBlocks: 1,
 1942         },
 1943         // Case 3
 1944         // |-------------------|
 1945         //                           |--------------------|
 1946         //               |----------------|
 1947         {
 1948             blockSeries: [][]Series{
 1949                 {
 1950                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 1951                         sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
 1952                         sample{5, 0}, sample{8, 0}, sample{9, 0},
 1953                     }),
 1954                 },
 1955                 {
 1956                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 1957                         sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59},
 1958                         sample{21, 59}, sample{22, 59},
 1959                     }),
 1960                 },
 1961                 {
 1962                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 1963                         sample{5, 99}, sample{6, 99}, sample{7, 99}, sample{8, 99},
 1964                         sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
 1965                         sample{16, 99}, sample{17, 99},
 1966                     }),
 1967                 },
 1968             },
 1969             expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
 1970                 sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
 1971                 sample{5, 99}, sample{6, 99}, sample{7, 99}, sample{8, 99},
 1972                 sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{14, 59},
 1973                 sample{15, 59}, sample{16, 99}, sample{17, 59}, sample{20, 59},
 1974                 sample{21, 59}, sample{22, 59},
 1975             }},
 1976             expBlockNum:          1,
 1977             expOverlappingBlocks: 1,
 1978         },
 1979         // Case 4
 1980         // |-------------------------------------|
 1981         //            |------------|
 1982         //      |-------------------------|
 1983         {
 1984             blockSeries: [][]Series{
 1985                 {
 1986                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 1987                         sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
 1988                         sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
 1989                         sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
 1990                         sample{20, 0}, sample{22, 0},
 1991                     }),
 1992                 },
 1993                 {
 1994                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 1995                         sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
 1996                         sample{11, 59},
 1997                     }),
 1998                 },
 1999                 {
 2000                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 2001                         sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
 2002                         sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
 2003                         sample{16, 99}, sample{17, 99},
 2004                     }),
 2005                 },
 2006             },
 2007             expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
 2008                 sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
 2009                 sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
 2010                 sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
 2011                 sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
 2012                 sample{20, 0}, sample{22, 0},
 2013             }},
 2014             expBlockNum:          1,
 2015             expOverlappingBlocks: 1,
 2016         },
 2017         // Case 5: series are merged properly when there are multiple series.
 2018         // |-------------------------------------|
 2019         //            |------------|
 2020         //      |-------------------------|
 2021         {
 2022             blockSeries: [][]Series{
 2023                 {
 2024                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 2025                         sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
 2026                         sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
 2027                         sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
 2028                         sample{20, 0}, sample{22, 0},
 2029                     }),
 2030                     newSeries(map[string]string{"b": "c"}, []tsdbutil.Sample{
 2031                         sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
 2032                         sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
 2033                         sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
 2034                         sample{20, 0}, sample{22, 0},
 2035                     }),
 2036                     newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
 2037                         sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
 2038                         sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
 2039                         sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
 2040                         sample{20, 0}, sample{22, 0},
 2041                     }),
 2042                 },
 2043                 {
 2044                     newSeries(map[string]string{"__name__": "a"}, []tsdbutil.Sample{
 2045                         sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
 2046                         sample{11, 59},
 2047                     }),
 2048                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 2049                         sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
 2050                         sample{11, 59},
 2051                     }),
 2052                     newSeries(map[string]string{"aa": "bb"}, []tsdbutil.Sample{
 2053                         sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
 2054                         sample{11, 59},
 2055                     }),
 2056                     newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
 2057                         sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
 2058                         sample{11, 59},
 2059                     }),
 2060                 },
 2061                 {
 2062                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 2063                         sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
 2064                         sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
 2065                         sample{16, 99}, sample{17, 99},
 2066                     }),
 2067                     newSeries(map[string]string{"aa": "bb"}, []tsdbutil.Sample{
 2068                         sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
 2069                         sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
 2070                         sample{16, 99}, sample{17, 99},
 2071                     }),
 2072                     newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{
 2073                         sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99},
 2074                         sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99},
 2075                         sample{16, 99}, sample{17, 99},
 2076                     }),
 2077                 },
 2078             },
 2079             expSeries: map[string][]tsdbutil.Sample{
 2080                 `{__name__="a"}`: {
 2081                     sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59},
 2082                     sample{11, 59},
 2083                 },
 2084                 `{a="b"}`: {
 2085                     sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
 2086                     sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
 2087                     sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
 2088                     sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
 2089                     sample{20, 0}, sample{22, 0},
 2090                 },
 2091                 `{aa="bb"}`: {
 2092                     sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 59},
 2093                     sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
 2094                     sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
 2095                 },
 2096                 `{b="c"}`: {
 2097                     sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
 2098                     sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0},
 2099                     sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0},
 2100                     sample{20, 0}, sample{22, 0},
 2101                 },
 2102                 `{c="d"}`: {
 2103                     sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
 2104                     sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59},
 2105                     sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59},
 2106                     sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99},
 2107                     sample{20, 0}, sample{22, 0},
 2108                 },
 2109             },
 2110             expBlockNum:          1,
 2111             expOverlappingBlocks: 1,
 2112         },
 2113         // Case 6
 2114         // |--------------|
 2115         //        |----------------|
 2116         //                                         |--------------|
 2117         //                                                  |----------------|
 2118         {
 2119             blockSeries: [][]Series{
 2120                 {
 2121                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 2122                         sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0},
 2123                         sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0},
 2124                     }),
 2125                 },
 2126                 {
 2127                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 2128                         sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99},
 2129                         sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
 2130                         sample{12, 99}, sample{13, 99}, sample{14, 99},
 2131                     }),
 2132                 },
 2133                 {
 2134                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 2135                         sample{20, 0}, sample{21, 0}, sample{22, 0}, sample{24, 0},
 2136                         sample{25, 0}, sample{27, 0}, sample{28, 0}, sample{29, 0},
 2137                     }),
 2138                 },
 2139                 {
 2140                     newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{
 2141                         sample{23, 99}, sample{25, 99}, sample{26, 99}, sample{27, 99},
 2142                         sample{28, 99}, sample{29, 99}, sample{30, 99}, sample{31, 99},
 2143                     }),
 2144                 },
 2145             },
 2146             expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: {
 2147                 sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99},
 2148                 sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99},
 2149                 sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99},
 2150                 sample{12, 99}, sample{13, 99}, sample{14, 99},
 2151                 sample{20, 0}, sample{21, 0}, sample{22, 0}, sample{23, 99},
 2152                 sample{24, 0}, sample{25, 99}, sample{26, 99}, sample{27, 99},
 2153                 sample{28, 99}, sample{29, 99}, sample{30, 99}, sample{31, 99},
 2154             }},
 2155             expBlockNum:          2,
 2156             expOverlappingBlocks: 2,
 2157         },
 2158     }
 2159 
 2160     defaultMatcher := labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")
 2161     for _, c := range cases {
 2162         if ok := t.Run("", func(t *testing.T) {
 2163 
 2164             tmpdir, err := ioutil.TempDir("", "data")
 2165             testutil.Ok(t, err)
 2166             defer func() {
 2167                 testutil.Ok(t, os.RemoveAll(tmpdir))
 2168             }()
 2169 
 2170             for _, series := range c.blockSeries {
 2171                 createBlock(t, tmpdir, series)
 2172             }
 2173             opts := *DefaultOptions
 2174             opts.AllowOverlappingBlocks = true
 2175             db, err := Open(tmpdir, nil, nil, &opts)
 2176             testutil.Ok(t, err)
 2177             defer func() {
 2178                 testutil.Ok(t, db.Close())
 2179             }()
 2180             db.DisableCompactions()
 2181             testutil.Assert(t, len(db.blocks) == len(c.blockSeries), "Wrong number of blocks [before compact].")
 2182 
 2183             // Vertical Query Merging test.
 2184             querier, err := db.Querier(0, 100)
 2185             testutil.Ok(t, err)
 2186             actSeries := query(t, querier, defaultMatcher)
 2187             testutil.Equals(t, c.expSeries, actSeries)
 2188 
 2189             // Vertical compaction.
 2190             lc := db.compactor.(*LeveledCompactor)
 2191             testutil.Equals(t, 0, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count should be still 0 here")
 2192             err = db.compact()
 2193             testutil.Ok(t, err)
 2194             testutil.Equals(t, c.expBlockNum, len(db.Blocks()), "Wrong number of blocks [after compact]")
 2195 
 2196             testutil.Equals(t, c.expOverlappingBlocks, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count mismatch")
 2197 
 2198             // Query test after merging the overlapping blocks.
 2199             querier, err = db.Querier(0, 100)
 2200             testutil.Ok(t, err)
 2201             actSeries = query(t, querier, defaultMatcher)
 2202             testutil.Equals(t, c.expSeries, actSeries)
 2203         }); !ok {
 2204             return
 2205         }
 2206     }
 2207 }
 2208 
 2209 // TestBlockRanges checks the following use cases:
 2210 //  - No samples can be added with timestamps lower than the last block maxt.
 2211 //  - The compactor doesn't create overlapping blocks
 2212 // even when the last blocks is not within the default boundaries.
 2213 //  - Lower boundary is based on the smallest sample in the head and
 2214 // upper boundary is rounded to the configured block range.
 2215 //
 2216 // This ensures that a snapshot that includes the head and creates a block with a custom time range
 2217 // will not overlap with the first block created by the next compaction.
 2218 func TestBlockRanges(t *testing.T) {
 2219     logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
 2220 
 2221     dir, err := ioutil.TempDir("", "test_storage")
 2222     if err != nil {
 2223         t.Fatalf("Opening test dir failed: %s", err)
 2224     }
 2225 
 2226     rangeToTriggercompaction := DefaultOptions.BlockRanges[0]/2*3 + 1
 2227 
 2228     // Test that the compactor doesn't create overlapping blocks
 2229     // when a non standard block already exists.
 2230     firstBlockMaxT := int64(3)
 2231     createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT))
 2232     db, err := Open(dir, logger, nil, DefaultOptions)
 2233     if err != nil {
 2234         t.Fatalf("Opening test storage failed: %s", err)
 2235     }
 2236     defer func() {
 2237         os.RemoveAll(dir)
 2238     }()
 2239     app := db.Appender()
 2240     lbl := labels.Labels{{Name: "a", Value: "b"}}
 2241     _, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64())
 2242     if err == nil {
 2243         t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible")
 2244     }
 2245     _, err = app.Add(lbl, firstBlockMaxT+1, rand.Float64())
 2246     testutil.Ok(t, err)
 2247     _, err = app.Add(lbl, firstBlockMaxT+2, rand.Float64())
 2248     testutil.Ok(t, err)
 2249     secondBlockMaxt := firstBlockMaxT + rangeToTriggercompaction
 2250     _, err = app.Add(lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction
 2251 
 2252     testutil.Ok(t, err)
 2253     testutil.Ok(t, app.Commit())
 2254     for x := 0; x < 100; x++ {
 2255         if len(db.Blocks()) == 2 {
 2256             break
 2257         }
 2258         time.Sleep(100 * time.Millisecond)
 2259     }
 2260     testutil.Equals(t, 2, len(db.Blocks()), "no new block created after the set timeout")
 2261 
 2262     if db.Blocks()[0].Meta().MaxTime > db.Blocks()[1].Meta().MinTime {
 2263         t.Fatalf("new block overlaps  old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta())
 2264     }
 2265 
 2266     // Test that wal records are skipped when an existing block covers the same time ranges
 2267     // and compaction doesn't create an overlapping block.
 2268     db.DisableCompactions()
 2269     _, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64())
 2270     testutil.Ok(t, err)
 2271     _, err = app.Add(lbl, secondBlockMaxt+2, rand.Float64())
 2272     testutil.Ok(t, err)
 2273     _, err = app.Add(lbl, secondBlockMaxt+3, rand.Float64())
 2274     testutil.Ok(t, err)
 2275     _, err = app.Add(lbl, secondBlockMaxt+4, rand.Float64())
 2276     testutil.Ok(t, err)
 2277     testutil.Ok(t, app.Commit())
 2278     testutil.Ok(t, db.Close())
 2279 
 2280     thirdBlockMaxt := secondBlockMaxt + 2
 2281     createBlock(t, dir, genSeries(1, 1, secondBlockMaxt+1, thirdBlockMaxt))
 2282 
 2283     db, err = Open(dir, logger, nil, DefaultOptions)
 2284     if err != nil {
 2285         t.Fatalf("Opening test storage failed: %s", err)
 2286     }
 2287     defer db.Close()
 2288     testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks")
 2289     testutil.Equals(t, db.Blocks()[2].Meta().MaxTime, thirdBlockMaxt, "unexpected maxt of the last block")
 2290 
 2291     app = db.Appender()
 2292     _, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggercompaction, rand.Float64()) // Trigger a compaction
 2293     testutil.Ok(t, err)
 2294     testutil.Ok(t, app.Commit())
 2295     for x := 0; x < 100; x++ {
 2296         if len(db.Blocks()) == 4 {
 2297             break
 2298         }
 2299         time.Sleep(100 * time.Millisecond)
 2300     }
 2301 
 2302     testutil.Equals(t, 4, len(db.Blocks()), "no new block created after the set timeout")
 2303 
 2304     if db.Blocks()[2].Meta().MaxTime > db.Blocks()[3].Meta().MinTime {
 2305         t.Fatalf("new block overlaps  old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
 2306     }
 2307 }
 2308 
 2309 // TestDBReadOnly ensures that opening a DB in readonly mode doesn't modify any files on the disk.
 2310 // It also checks that the API calls return equivalent results as a normal db.Open() mode.
 2311 func TestDBReadOnly(t *testing.T) {
 2312     var (
 2313         dbDir          string
 2314         logger         = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
 2315         expBlocks      []*Block
 2316         expSeries      map[string][]tsdbutil.Sample
 2317         expSeriesCount int
 2318         expDBHash      []byte
 2319         matchAll       = labels.MustNewMatcher(labels.MatchEqual, "", "")
 2320         err            error
 2321     )
 2322 
 2323     // Bootstrap the db.
 2324     {
 2325         dbDir, err = ioutil.TempDir("", "test")
 2326         testutil.Ok(t, err)
 2327 
 2328         defer func() {
 2329             testutil.Ok(t, os.RemoveAll(dbDir))
 2330         }()
 2331 
 2332         dbBlocks := []*BlockMeta{
 2333             {MinTime: 10, MaxTime: 11},
 2334             {MinTime: 11, MaxTime: 12},
 2335             {MinTime: 12, MaxTime: 13},
 2336         }
 2337 
 2338         for _, m := range dbBlocks {
 2339             createBlock(t, dbDir, genSeries(1, 1, m.MinTime, m.MaxTime))
 2340         }
 2341         expSeriesCount++
 2342     }
 2343 
 2344     // Open a normal db to use for a comparison.
 2345     {
 2346         dbWritable, err := Open(dbDir, logger, nil, nil)
 2347         testutil.Ok(t, err)
 2348         dbWritable.DisableCompactions()
 2349 
 2350         dbSizeBeforeAppend, err := fileutil.DirSize(dbWritable.Dir())
 2351         testutil.Ok(t, err)
 2352         app := dbWritable.Appender()
 2353         _, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0)
 2354         testutil.Ok(t, err)
 2355         testutil.Ok(t, app.Commit())
 2356         expSeriesCount++
 2357 
 2358         expBlocks = dbWritable.Blocks()
 2359         expDbSize, err := fileutil.DirSize(dbWritable.Dir())
 2360         testutil.Ok(t, err)
 2361         testutil.Assert(t, expDbSize > dbSizeBeforeAppend, "db size didn't increase after an append")
 2362 
 2363         q, err := dbWritable.Querier(math.MinInt64, math.MaxInt64)
 2364         testutil.Ok(t, err)
 2365         expSeries = query(t, q, matchAll)
 2366 
 2367         testutil.Ok(t, dbWritable.Close()) // Close here to allow getting the dir hash for windows.
 2368         expDBHash = testutil.DirHash(t, dbWritable.Dir())
 2369     }
 2370 
 2371     // Open a read only db and ensure that the API returns the same result as the normal DB.
 2372     {
 2373         dbReadOnly, err := OpenDBReadOnly(dbDir, logger)
 2374         testutil.Ok(t, err)
 2375         defer func() {
 2376             testutil.Ok(t, dbReadOnly.Close())
 2377         }()
 2378         blocks, err := dbReadOnly.Blocks()
 2379         testutil.Ok(t, err)
 2380         testutil.Equals(t, len(expBlocks), len(blocks))
 2381 
 2382         for i, expBlock := range expBlocks {
 2383             testutil.Equals(t, expBlock.Meta(), blocks[i].Meta(), "block meta mismatch")
 2384         }
 2385 
 2386         q, err := dbReadOnly.Querier(math.MinInt64, math.MaxInt64)
 2387         testutil.Ok(t, err)
 2388         readOnlySeries := query(t, q, matchAll)
 2389         readOnlyDBHash := testutil.DirHash(t, dbDir)
 2390 
 2391         testutil.Equals(t, expSeriesCount, len(readOnlySeries), "total series mismatch")
 2392         testutil.Equals(t, expSeries, readOnlySeries, "series mismatch")
 2393         testutil.Equals(t, expDBHash, readOnlyDBHash, "after all read operations the db hash should remain the same")
 2394     }
 2395 }
 2396 
 2397 // TestDBReadOnlyClosing ensures that after closing the db
 2398 // all api methods return an ErrClosed.
 2399 func TestDBReadOnlyClosing(t *testing.T) {
 2400     dbDir, err := ioutil.TempDir("", "test")
 2401     testutil.Ok(t, err)
 2402 
 2403     defer func() {
 2404         testutil.Ok(t, os.RemoveAll(dbDir))
 2405     }()
 2406     db, err := OpenDBReadOnly(dbDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)))
 2407     testutil.Ok(t, err)
 2408     testutil.Ok(t, db.Close())
 2409     testutil.Equals(t, db.Close(), ErrClosed)
 2410     _, err = db.Blocks()
 2411     testutil.Equals(t, err, ErrClosed)
 2412     _, err = db.Querier(0, 1)
 2413     testutil.Equals(t, err, ErrClosed)
 2414 }
 2415 
 2416 func TestDBReadOnly_FlushWAL(t *testing.T) {
 2417     var (
 2418         dbDir  string
 2419         logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
 2420         err    error
 2421         maxt   int
 2422     )
 2423 
 2424     // Bootstrap the db.
 2425     {
 2426         dbDir, err = ioutil.TempDir("", "test")
 2427         testutil.Ok(t, err)
 2428 
 2429         defer func() {
 2430             testutil.Ok(t, os.RemoveAll(dbDir))
 2431         }()
 2432 
 2433         // Append data to the WAL.
 2434         db, err := Open(dbDir, logger, nil, nil)
 2435         testutil.Ok(t, err)
 2436         db.DisableCompactions()
 2437         app := db.Appender()
 2438         maxt = 1000
 2439         for i := 0; i < maxt; i++ {
 2440             _, err := app.Add(labels.FromStrings(defaultLabelName, "flush"), int64(i), 1.0)
 2441             testutil.Ok(t, err)
 2442         }
 2443         testutil.Ok(t, app.Commit())
 2444         defer func() { testutil.Ok(t, db.Close()) }()
 2445     }
 2446 
 2447     // Flush WAL.
 2448     db, err := OpenDBReadOnly(dbDir, logger)
 2449     testutil.Ok(t, err)
 2450 
 2451     flush, err := ioutil.TempDir("", "flush")
 2452     testutil.Ok(t, err)
 2453 
 2454     defer func() {
 2455         testutil.Ok(t, os.RemoveAll(flush))
 2456     }()
 2457     testutil.Ok(t, db.FlushWAL(flush))
 2458     testutil.Ok(t, db.Close())
 2459 
 2460     // Reopen the DB from the flushed WAL block.
 2461     db, err = OpenDBReadOnly(flush, logger)
 2462     testutil.Ok(t, err)
 2463     defer func() { testutil.Ok(t, db.Close()) }()
 2464     blocks, err := db.Blocks()
 2465     testutil.Ok(t, err)
 2466     testutil.Equals(t, len(blocks), 1)
 2467 
 2468     querier, err := db.Querier(0, int64(maxt)-1)
 2469     testutil.Ok(t, err)
 2470     defer func() { testutil.Ok(t, querier.Close()) }()
 2471 
 2472     // Sum the values.
 2473     seriesSet, err := querier.Select(labels.MustNewMatcher(labels.MatchEqual, defaultLabelName, "flush"))
 2474     testutil.Ok(t, err)
 2475 
 2476     sum := 0.0
 2477     for seriesSet.Next() {
 2478         series := seriesSet.At().Iterator()
 2479         for series.Next() {
 2480             _, v := series.At()
 2481             sum += v
 2482         }
 2483         testutil.Ok(t, series.Err())
 2484     }
 2485     testutil.Ok(t, seriesSet.Err())
 2486     testutil.Equals(t, 1000.0, sum)
 2487 }
 2488 
 2489 // TestChunkWriter_ReadAfterWrite ensures that chunk segment are cut at the set segment size and
 2490 // that the resulted segments includes the expected chunks data.
 2491 func TestChunkWriter_ReadAfterWrite(t *testing.T) {
 2492     chk1 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}})
 2493     chk2 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}})
 2494     chk3 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}})
 2495     chk4 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 4}})
 2496     chk5 := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 5}})
 2497     chunkSize := len(chk1.Chunk.Bytes()) + chunks.MaxChunkLengthFieldSize + chunks.ChunkEncodingSize + crc32.Size
 2498 
 2499     tests := []struct {
 2500         chks [][]chunks.Meta
 2501         segmentSize,
 2502         expSegmentsCount int
 2503         expSegmentSizes []int
 2504     }{
 2505         // 0:Last chunk ends at the segment boundary so
 2506         // all chunks should fit in a single segment.
 2507         {
 2508             chks: [][]chunks.Meta{
 2509                 []chunks.Meta{
 2510                     chk1,
 2511                     chk2,
 2512                     chk3,
 2513                 },
 2514             },
 2515             segmentSize:      3 * chunkSize,
 2516             expSegmentSizes:  []int{3 * chunkSize},
 2517             expSegmentsCount: 1,
 2518         },
 2519         // 1:Two chunks can fit in a single segment so the last one should result in a new segment.
 2520         {
 2521             chks: [][]chunks.Meta{
 2522                 []chunks.Meta{
 2523                     chk1,
 2524                     chk2,
 2525                     chk3,
 2526                     chk4,
 2527                     chk5,
 2528                 },
 2529             },
 2530             segmentSize:      2 * chunkSize,
 2531             expSegmentSizes:  []int{2 * chunkSize, 2 * chunkSize, chunkSize},
 2532             expSegmentsCount: 3,
 2533         },
 2534         // 2:When the segment size is smaller than the size of 2 chunks
 2535         // the last segment should still create a new segment.
 2536         {
 2537             chks: [][]chunks.Meta{
 2538                 []chunks.Meta{
 2539                     chk1,
 2540                     chk2,
 2541                     chk3,
 2542                 },
 2543             },
 2544             segmentSize:      2*chunkSize - 1,
 2545             expSegmentSizes:  []int{chunkSize, chunkSize, chunkSize},
 2546             expSegmentsCount: 3,
 2547         },
 2548         // 3:When the segment is smaller than a single chunk
 2549         // it should still be written by ignoring the max segment size.
 2550         {
 2551             chks: [][]chunks.Meta{
 2552                 []chunks.Meta{
 2553                     chk1,
 2554                 },
 2555             },
 2556             segmentSize:      chunkSize - 1,
 2557             expSegmentSizes:  []int{chunkSize},
 2558             expSegmentsCount: 1,
 2559         },
 2560         // 4:All chunks are bigger than the max segment size, but
 2561         // these should still be written even when this will result in bigger segment than the set size.
 2562         // Each segment will hold a single chunk.
 2563         {
 2564             chks: [][]chunks.Meta{
 2565                 []chunks.Meta{
 2566                     chk1,
 2567                     chk2,
 2568                     chk3,
 2569                 },
 2570             },
 2571             segmentSize:      1,
 2572             expSegmentSizes:  []int{chunkSize, chunkSize, chunkSize},
 2573             expSegmentsCount: 3,
 2574         },
 2575         // 5:Adding multiple batches of chunks.
 2576         {
 2577             chks: [][]chunks.Meta{
 2578                 []chunks.Meta{
 2579                     chk1,
 2580                     chk2,
 2581                     chk3,
 2582                 },
 2583                 []chunks.Meta{
 2584                     chk4,
 2585                     chk5,
 2586                 },
 2587             },
 2588             segmentSize:      3 * chunkSize,
 2589             expSegmentSizes:  []int{3 * chunkSize, 2 * chunkSize},
 2590             expSegmentsCount: 2,
 2591         },
 2592         // 6:Adding multiple batches of chunks.
 2593         {
 2594             chks: [][]chunks.Meta{
 2595                 []chunks.Meta{
 2596                     chk1,
 2597                 },
 2598                 []chunks.Meta{
 2599                     chk2,
 2600                     chk3,
 2601                 },
 2602                 []chunks.Meta{
 2603                     chk4,
 2604                 },
 2605             },
 2606             segmentSize:      2 * chunkSize,
 2607             expSegmentSizes:  []int{2 * chunkSize, 2 * chunkSize},
 2608             expSegmentsCount: 2,
 2609         },
 2610     }
 2611 
 2612     for i, test := range tests {
 2613         t.Run(strconv.Itoa(i), func(t *testing.T) {
 2614 
 2615             tempDir, err := ioutil.TempDir("", "test_chunk_writer")
 2616             testutil.Ok(t, err)
 2617             defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }()
 2618 
 2619             chunkw, err := chunks.NewWriterWithSegSize(tempDir, chunks.SegmentHeaderSize+int64(test.segmentSize))
 2620             testutil.Ok(t, err)
 2621 
 2622             for _, chks := range test.chks {
 2623                 testutil.Ok(t, chunkw.WriteChunks(chks...))
 2624             }
 2625             testutil.Ok(t, chunkw.Close())
 2626 
 2627             files, err := ioutil.ReadDir(tempDir)
 2628             testutil.Ok(t, err)
 2629             testutil.Equals(t, test.expSegmentsCount, len(files), "expected segments count mismatch")
 2630 
 2631             // Verify that all data is written to the segments.
 2632             sizeExp := 0
 2633             sizeAct := 0
 2634 
 2635             for _, chks := range test.chks {
 2636                 for _, chk := range chks {
 2637                     l := make([]byte, binary.MaxVarintLen32)
 2638                     sizeExp += binary.PutUvarint(l, uint64(len(chk.Chunk.Bytes()))) // The length field.
 2639                     sizeExp += chunks.ChunkEncodingSize
 2640                     sizeExp += len(chk.Chunk.Bytes()) // The data itself.
 2641                     sizeExp += crc32.Size             // The 4 bytes of crc32
 2642                 }
 2643             }
 2644             sizeExp += test.expSegmentsCount * chunks.SegmentHeaderSize // The segment header bytes.
 2645 
 2646             for i, f := range files {
 2647                 size := int(f.Size())
 2648                 // Verify that the segment is the same or smaller than the expected size.
 2649                 testutil.Assert(t, chunks.SegmentHeaderSize+test.expSegmentSizes[i] >= size, "Segment:%v should NOT be bigger than:%v actual:%v", i, chunks.SegmentHeaderSize+test.expSegmentSizes[i], size)
 2650 
 2651                 sizeAct += size
 2652             }
 2653             testutil.Equals(t, sizeExp, sizeAct)
 2654 
 2655             // Check the content of the chunks.
 2656             r, err := chunks.NewDirReader(tempDir, nil)
 2657             testutil.Ok(t, err)
 2658             defer func() { testutil.Ok(t, r.Close()) }()
 2659 
 2660             for _, chks := range test.chks {
 2661                 for _, chkExp := range chks {
 2662                     chkAct, err := r.Chunk(chkExp.Ref)
 2663                     testutil.Ok(t, err)
 2664                     testutil.Equals(t, chkExp.Chunk.Bytes(), chkAct.Bytes())
 2665                 }
 2666             }
 2667         })
 2668     }
 2669 }
 2670 
 2671 // TestChunkReader_ConcurrentReads checks that the chunk result can be read concurrently.
 2672 // Regression test for https://github.com/prometheus/prometheus/pull/6514.
 2673 func TestChunkReader_ConcurrentReads(t *testing.T) {
 2674     chks := []chunks.Meta{
 2675         tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 1}}),
 2676         tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 2}}),
 2677         tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 3}}),
 2678         tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 4}}),
 2679         tsdbutil.ChunkFromSamples([]tsdbutil.Sample{sample{1, 5}}),
 2680     }
 2681 
 2682     tempDir, err := ioutil.TempDir("", "test_chunk_writer")
 2683     testutil.Ok(t, err)
 2684     defer func() { testutil.Ok(t, os.RemoveAll(tempDir)) }()
 2685 
 2686     chunkw, err := chunks.NewWriter(tempDir)
 2687     testutil.Ok(t, err)
 2688 
 2689     testutil.Ok(t, chunkw.WriteChunks(chks...))
 2690     testutil.Ok(t, chunkw.Close())
 2691 
 2692     r, err := chunks.NewDirReader(tempDir, nil)
 2693     testutil.Ok(t, err)
 2694 
 2695     var wg sync.WaitGroup
 2696     for _, chk := range chks {
 2697         for i := 0; i < 100; i++ {
 2698             wg.Add(1)
 2699             go func(chunk chunks.Meta) {
 2700                 defer wg.Done()
 2701 
 2702                 chkAct, err := r.Chunk(chunk.Ref)
 2703                 testutil.Ok(t, err)
 2704                 testutil.Equals(t, chunk.Chunk.Bytes(), chkAct.Bytes())
 2705             }(chk)
 2706         }
 2707         wg.Wait()
 2708     }
 2709     testutil.Ok(t, r.Close())
 2710 }