"Fossies" - the Fresh Open Source Software Archive

Member "prometheus-2.15.2/tsdb/block_test.go" (6 Jan 2020, 12336 Bytes) of package /linux/misc/prometheus-2.15.2.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Go source code syntax highlighting (style: standard), with prefixed line numbers and a code folding option. Alternatively, you can view or download the uninterpreted source code file here. See also the latest Fossies "Diffs" side-by-side code changes report for "block_test.go": 2.15.1_vs_2.15.2.

    1 // Copyright 2017 The Prometheus Authors
    2 // Licensed under the Apache License, Version 2.0 (the "License");
    3 // you may not use this file except in compliance with the License.
    4 // You may obtain a copy of the License at
    5 //
    6 // http://www.apache.org/licenses/LICENSE-2.0
    7 //
    8 // Unless required by applicable law or agreed to in writing, software
    9 // distributed under the License is distributed on an "AS IS" BASIS,
   10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   11 // See the License for the specific language governing permissions and
   12 // limitations under the License.
   13 
   14 package tsdb
   15 
   16 import (
   17     "context"
   18     "encoding/binary"
   19 
   20     "errors"
   21     "hash/crc32"
   22     "io/ioutil"
   23     "math/rand"
   24     "os"
   25     "path/filepath"
   26     "strconv"
   27     "testing"
   28 
   29     "github.com/go-kit/kit/log"
   30     "github.com/prometheus/prometheus/pkg/labels"
   31     "github.com/prometheus/prometheus/tsdb/chunks"
   32     "github.com/prometheus/prometheus/tsdb/fileutil"
   33     "github.com/prometheus/prometheus/tsdb/tsdbutil"
   34     "github.com/prometheus/prometheus/util/testutil"
   35 )
   36 
   37 // In Prometheus 2.1.0 we had a bug where the meta.json version was falsely bumped
   38 // to 2. We had a migration in place resetting it to 1 but we should move immediately to
   39 // version 3 next time to avoid confusion and issues.
   40 func TestBlockMetaMustNeverBeVersion2(t *testing.T) {
   41     dir, err := ioutil.TempDir("", "metaversion")
   42     testutil.Ok(t, err)
   43     defer func() {
   44         testutil.Ok(t, os.RemoveAll(dir))
   45     }()
   46 
   47     _, err = writeMetaFile(log.NewNopLogger(), dir, &BlockMeta{})
   48     testutil.Ok(t, err)
   49 
   50     meta, _, err := readMetaFile(dir)
   51     testutil.Ok(t, err)
   52     testutil.Assert(t, meta.Version != 2, "meta.json version must never be 2")
   53 }
   54 
   55 func TestSetCompactionFailed(t *testing.T) {
   56     tmpdir, err := ioutil.TempDir("", "test")
   57     testutil.Ok(t, err)
   58     defer func() {
   59         testutil.Ok(t, os.RemoveAll(tmpdir))
   60     }()
   61 
   62     blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
   63     b, err := OpenBlock(nil, blockDir, nil)
   64     testutil.Ok(t, err)
   65     testutil.Equals(t, false, b.meta.Compaction.Failed)
   66     testutil.Ok(t, b.setCompactionFailed())
   67     testutil.Equals(t, true, b.meta.Compaction.Failed)
   68     testutil.Ok(t, b.Close())
   69 
   70     b, err = OpenBlock(nil, blockDir, nil)
   71     testutil.Ok(t, err)
   72     testutil.Equals(t, true, b.meta.Compaction.Failed)
   73     testutil.Ok(t, b.Close())
   74 }
   75 
   76 func TestCreateBlock(t *testing.T) {
   77     tmpdir, err := ioutil.TempDir("", "test")
   78     testutil.Ok(t, err)
   79     defer func() {
   80         testutil.Ok(t, os.RemoveAll(tmpdir))
   81     }()
   82     b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil)
   83     if err == nil {
   84         testutil.Ok(t, b.Close())
   85     }
   86     testutil.Ok(t, err)
   87 }
   88 
   89 func TestCorruptedChunk(t *testing.T) {
   90     for name, test := range map[string]struct {
   91         corrFunc func(f *os.File) // Func that applies the corruption.
   92         openErr  error
   93         queryErr error
   94     }{
   95         "invalid header size": {
   96             func(f *os.File) {
   97                 err := f.Truncate(1)
   98                 testutil.Ok(t, err)
   99             },
  100             errors.New("invalid segment header in segment 0: invalid size"),
  101             nil,
  102         },
  103         "invalid magic number": {
  104             func(f *os.File) {
  105                 magicChunksOffset := int64(0)
  106                 _, err := f.Seek(magicChunksOffset, 0)
  107                 testutil.Ok(t, err)
  108 
  109                 // Set invalid magic number.
  110                 b := make([]byte, chunks.MagicChunksSize)
  111                 binary.BigEndian.PutUint32(b[:chunks.MagicChunksSize], 0x00000000)
  112                 n, err := f.Write(b)
  113                 testutil.Ok(t, err)
  114                 testutil.Equals(t, chunks.MagicChunksSize, n)
  115             },
  116             errors.New("invalid magic number 0"),
  117             nil,
  118         },
  119         "invalid chunk format version": {
  120             func(f *os.File) {
  121                 chunksFormatVersionOffset := int64(4)
  122                 _, err := f.Seek(chunksFormatVersionOffset, 0)
  123                 testutil.Ok(t, err)
  124 
  125                 // Set invalid chunk format version.
  126                 b := make([]byte, chunks.ChunksFormatVersionSize)
  127                 b[0] = 0
  128                 n, err := f.Write(b)
  129                 testutil.Ok(t, err)
  130                 testutil.Equals(t, chunks.ChunksFormatVersionSize, n)
  131             },
  132             errors.New("invalid chunk format version 0"),
  133             nil,
  134         },
  135         "chunk not enough bytes to read the chunk length": {
  136             func(f *os.File) {
  137                 // Truncate one byte after the segment header.
  138                 err := f.Truncate(chunks.SegmentHeaderSize + 1)
  139                 testutil.Ok(t, err)
  140             },
  141             nil,
  142             errors.New("segment doesn't include enough bytes to read the chunk size data field - required:13, available:9"),
  143         },
  144         "chunk not enough bytes to read the data": {
  145             func(f *os.File) {
  146                 fi, err := f.Stat()
  147                 testutil.Ok(t, err)
  148 
  149                 err = f.Truncate(fi.Size() - 1)
  150                 testutil.Ok(t, err)
  151             },
  152             nil,
  153             errors.New("segment doesn't include enough bytes to read the chunk - required:26, available:25"),
  154         },
  155         "checksum mismatch": {
  156             func(f *os.File) {
  157                 fi, err := f.Stat()
  158                 testutil.Ok(t, err)
  159 
  160                 // Get the chunk data end offset.
  161                 chkEndOffset := int(fi.Size()) - crc32.Size
  162 
  163                 // Seek to the last byte of chunk data and modify it.
  164                 _, err = f.Seek(int64(chkEndOffset-1), 0)
  165                 testutil.Ok(t, err)
  166                 n, err := f.Write([]byte("x"))
  167                 testutil.Ok(t, err)
  168                 testutil.Equals(t, n, 1)
  169             },
  170             nil,
  171             errors.New("checksum mismatch expected:cfc0526c, actual:34815eae"),
  172         },
  173     } {
  174         t.Run(name, func(t *testing.T) {
  175             tmpdir, err := ioutil.TempDir("", "test_open_block_chunk_corrupted")
  176             testutil.Ok(t, err)
  177             defer func() {
  178                 testutil.Ok(t, os.RemoveAll(tmpdir))
  179             }()
  180 
  181             series := newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{sample{1, 1}})
  182             blockDir := createBlock(t, tmpdir, []Series{series})
  183             files, err := sequenceFiles(chunkDir(blockDir))
  184             testutil.Ok(t, err)
  185             testutil.Assert(t, len(files) > 0, "No chunk created.")
  186 
  187             f, err := os.OpenFile(files[0], os.O_RDWR, 0666)
  188             testutil.Ok(t, err)
  189 
  190             // Apply corruption function.
  191             test.corrFunc(f)
  192             testutil.Ok(t, f.Close())
  193 
  194             // Check open err.
  195             b, err := OpenBlock(nil, blockDir, nil)
  196             if test.openErr != nil {
  197                 testutil.Equals(t, test.openErr.Error(), err.Error())
  198                 return
  199             }
  200             defer func() { testutil.Ok(t, b.Close()) }()
  201 
  202             querier, err := NewBlockQuerier(b, 0, 1)
  203             testutil.Ok(t, err)
  204             defer func() { testutil.Ok(t, querier.Close()) }()
  205             set, err := querier.Select(labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
  206             testutil.Ok(t, err)
  207 
  208             // Check query err.
  209             testutil.Equals(t, false, set.Next())
  210             testutil.Equals(t, test.queryErr.Error(), set.Err().Error())
  211         })
  212     }
  213 }
  214 
  215 // TestBlockSize ensures that the block size is calculated correctly.
  216 func TestBlockSize(t *testing.T) {
  217     tmpdir, err := ioutil.TempDir("", "test_blockSize")
  218     testutil.Ok(t, err)
  219     defer func() {
  220         testutil.Ok(t, os.RemoveAll(tmpdir))
  221     }()
  222 
  223     var (
  224         blockInit    *Block
  225         expSizeInit  int64
  226         blockDirInit string
  227     )
  228 
  229     // Create a block and compare the reported size vs actual disk size.
  230     {
  231         blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100))
  232         blockInit, err = OpenBlock(nil, blockDirInit, nil)
  233         testutil.Ok(t, err)
  234         defer func() {
  235             testutil.Ok(t, blockInit.Close())
  236         }()
  237         expSizeInit = blockInit.Size()
  238         actSizeInit, err := fileutil.DirSize(blockInit.Dir())
  239         testutil.Ok(t, err)
  240         testutil.Equals(t, expSizeInit, actSizeInit)
  241     }
  242 
  243     // Delete some series and check the sizes again.
  244     {
  245         testutil.Ok(t, blockInit.Delete(1, 10, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")))
  246         expAfterDelete := blockInit.Size()
  247         testutil.Assert(t, expAfterDelete > expSizeInit, "after a delete the block size should be bigger as the tombstone file should grow %v > %v", expAfterDelete, expSizeInit)
  248         actAfterDelete, err := fileutil.DirSize(blockDirInit)
  249         testutil.Ok(t, err)
  250         testutil.Equals(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")
  251 
  252         c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil)
  253         testutil.Ok(t, err)
  254         blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
  255         testutil.Ok(t, err)
  256         blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil)
  257         testutil.Ok(t, err)
  258         defer func() {
  259             testutil.Ok(t, blockAfterCompact.Close())
  260         }()
  261         expAfterCompact := blockAfterCompact.Size()
  262         actAfterCompact, err := fileutil.DirSize(blockAfterCompact.Dir())
  263         testutil.Ok(t, err)
  264         testutil.Assert(t, actAfterDelete > actAfterCompact, "after a delete and compaction the block size should be smaller %v,%v", actAfterDelete, actAfterCompact)
  265         testutil.Equals(t, expAfterCompact, actAfterCompact, "after a delete and compaction reported block size doesn't match actual disk size")
  266     }
  267 }
  268 
  269 func TestReadIndexFormatV1(t *testing.T) {
  270     /* The block here was produced at the commit
  271         706602daed1487f7849990678b4ece4599745905 used in 2.0.0 with:
  272        db, _ := Open("v1db", nil, nil, nil)
  273        app := db.Appender()
  274        app.Add(labels.FromStrings("foo", "bar"), 1, 2)
  275        app.Add(labels.FromStrings("foo", "baz"), 3, 4)
  276        app.Add(labels.FromStrings("foo", "meh"), 1000*3600*4, 4) // Not in the block.
  277        // Make sure we've enough values for the lack of sorting of postings offsets to show up.
  278        for i := 0; i < 100; i++ {
  279          app.Add(labels.FromStrings("bar", strconv.FormatInt(int64(i), 10)), 0, 0)
  280        }
  281        app.Commit()
  282        db.compact()
  283        db.Close()
  284     */
  285 
  286     blockDir := filepath.Join("testdata", "index_format_v1")
  287     block, err := OpenBlock(nil, blockDir, nil)
  288     testutil.Ok(t, err)
  289 
  290     q, err := NewBlockQuerier(block, 0, 1000)
  291     testutil.Ok(t, err)
  292     testutil.Equals(t, query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")),
  293         map[string][]tsdbutil.Sample{`{foo="bar"}`: []tsdbutil.Sample{sample{t: 1, v: 2}}})
  294 
  295     q, err = NewBlockQuerier(block, 0, 1000)
  296     testutil.Ok(t, err)
  297     testutil.Equals(t, query(t, q, labels.MustNewMatcher(labels.MatchNotRegexp, "foo", "^.?$")),
  298         map[string][]tsdbutil.Sample{
  299             `{foo="bar"}`: []tsdbutil.Sample{sample{t: 1, v: 2}},
  300             `{foo="baz"}`: []tsdbutil.Sample{sample{t: 3, v: 4}},
  301         })
  302 }
  303 
  304 // createBlock creates a block with given set of series and returns its dir.
  305 func createBlock(tb testing.TB, dir string, series []Series) string {
  306     return createBlockFromHead(tb, dir, createHead(tb, series))
  307 }
  308 
  309 func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
  310     compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil)
  311     testutil.Ok(tb, err)
  312 
  313     testutil.Ok(tb, os.MkdirAll(dir, 0777))
  314 
  315     // Add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime).
  316     // Because of this block intervals are always +1 than the total samples it includes.
  317     ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime()+1, nil)
  318     testutil.Ok(tb, err)
  319     return filepath.Join(dir, ulid.String())
  320 }
  321 
  322 func createHead(tb testing.TB, series []Series) *Head {
  323     head, err := NewHead(nil, nil, nil, 2*60*60*1000)
  324     testutil.Ok(tb, err)
  325     defer head.Close()
  326 
  327     app := head.Appender()
  328     for _, s := range series {
  329         ref := uint64(0)
  330         it := s.Iterator()
  331         for it.Next() {
  332             t, v := it.At()
  333             if ref != 0 {
  334                 err := app.AddFast(ref, t, v)
  335                 if err == nil {
  336                     continue
  337                 }
  338             }
  339             ref, err = app.Add(s.Labels(), t, v)
  340             testutil.Ok(tb, err)
  341         }
  342         testutil.Ok(tb, it.Err())
  343     }
  344     err = app.Commit()
  345     testutil.Ok(tb, err)
  346     return head
  347 }
  348 
// Building blocks for the label sets generated by genSeries.
const (
	// defaultLabelName is the name of the first label of every generated
	// series and the prefix of the names of any additional labels.
	defaultLabelName = "labelName"
	// defaultLabelValue is the prefix of the values of the additional labels.
	defaultLabelValue = "labelValue"
)
  353 
  354 // genSeries generates series with a given number of labels and values.
  355 func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
  356     if totalSeries == 0 || labelCount == 0 {
  357         return nil
  358     }
  359 
  360     series := make([]Series, totalSeries)
  361 
  362     for i := 0; i < totalSeries; i++ {
  363         lbls := make(map[string]string, labelCount)
  364         lbls[defaultLabelName] = strconv.Itoa(i)
  365         for j := 1; len(lbls) < labelCount; j++ {
  366             lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
  367         }
  368         samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
  369         for t := mint; t < maxt; t++ {
  370             samples = append(samples, sample{t: t, v: rand.Float64()})
  371         }
  372         series[i] = newSeries(lbls, samples)
  373     }
  374     return series
  375 }
  376 
  377 // populateSeries generates series from given labels, mint and maxt.
  378 func populateSeries(lbls []map[string]string, mint, maxt int64) []Series {
  379     if len(lbls) == 0 {
  380         return nil
  381     }
  382 
  383     series := make([]Series, 0, len(lbls))
  384     for _, lbl := range lbls {
  385         if len(lbl) == 0 {
  386             continue
  387         }
  388         samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
  389         for t := mint; t <= maxt; t++ {
  390             samples = append(samples, sample{t: t, v: rand.Float64()})
  391         }
  392         series = append(series, newSeries(lbl, samples))
  393     }
  394     return series
  395 }