Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Commit

Permalink
Refactor block meta iteration (#849)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Jul 13, 2023
1 parent 7aef408 commit 55f2aea
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 109 deletions.
114 changes: 113 additions & 1 deletion pkg/phlaredb/block/list.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
package block

import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
"time"

"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"golang.org/x/sync/errgroup"

phlareobj "github.com/grafana/phlare/pkg/objstore"
"github.com/grafana/phlare/pkg/util"
)

func ListBlock(path string, ulidMinTime time.Time) (map[ulid.ULID]*Meta, error) {
func ListBlocks(path string, ulidMinTime time.Time) (map[ulid.ULID]*Meta, error) {
result := make(map[ulid.ULID]*Meta)
entries, err := os.ReadDir(path)
if err != nil {
Expand All @@ -32,6 +41,109 @@ func ListBlock(path string, ulidMinTime time.Time) (map[ulid.ULID]*Meta, error)
return result, nil
}

// IterBlockMetas lists the block directories of bkt that fall in the
// [from, to] time range, fetches the meta file of each one and hands the
// decoded meta to fn.
//
// Metas are fetched by up to 128 goroutines at a time and fn is invoked from
// those goroutines, so fn must be safe for concurrent use. No ordering of the
// calls is guaranteed.
//
// It returns the listing error if listing fails, otherwise the first error hit
// while fetching or decoding a meta file, or nil if every block was processed.
// Currently doesn't work with filesystem bucket.
func IterBlockMetas(ctx context.Context, bkt phlareobj.Bucket, from, to time.Time, fn func(*Meta)) error {
	idsByPrefix, err := listAllBlockByPrefixes(ctx, bkt, from, to)
	if err != nil {
		return err
	}

	group, gctx := errgroup.WithContext(ctx)
	group.SetLimit(128)

	// fetchMeta builds the task that downloads and decodes the meta file of a
	// single block directory, then reports it to fn.
	fetchMeta := func(blockDir string) func() error {
		return func() error {
			reader, err := bkt.Get(gctx, blockDir+block.MetaFilename)
			if err != nil {
				return err
			}

			meta, err := Read(reader)
			if err != nil {
				return err
			}
			fn(meta)
			return nil
		}
	}

	// fetch all meta.json
	for i := range idsByPrefix {
		for j := range idsByPrefix[i] {
			group.Go(fetchMeta(idsByPrefix[i][j]))
		}
	}
	return group.Wait()
}

// listAllBlockByPrefixes computes the ULID prefixes covering [from, to] (using
// a split order of 4) and lists the bucket under each prefix, at most 64
// listings running concurrently.
//
// The result has one entry per prefix, in prefix order; each entry holds the
// names reported by the bucket that block.IsBlockDir accepts. The first
// listing error aborts the whole operation.
func listAllBlockByPrefixes(ctx context.Context, bkt phlareobj.Bucket, from, to time.Time) ([][]string, error) {
	// todo: We should cache prefixes listing per tenants.
	prefixes, err := blockPrefixesFromTo(from, to, 4)
	if err != nil {
		return nil, err
	}

	group, gctx := errgroup.WithContext(ctx)
	group.SetLimit(64)

	// Each goroutine writes only to its own slot, so no locking is required.
	found := make([][]string, len(prefixes))

	listPrefix := func(slot int, prefix string) error {
		level.Debug(util.Logger).Log("msg", "listing blocks", "prefix", prefix, "i", slot)

		dirs := make([]string, 0)
		collect := func(name string) error {
			if _, isBlock := block.IsBlockDir(name); isBlock {
				dirs = append(dirs, name)
			}
			return nil
		}
		if err := bkt.Iter(gctx, prefix, collect, objstore.WithoutApendingDirDelim); err != nil {
			return err
		}

		found[slot] = dirs
		return nil
	}

	for slot := range prefixes {
		slot := slot
		group.Go(func() error {
			return listPrefix(slot, prefixes[slot])
		})
	}

	if err := group.Wait(); err != nil {
		return nil, err
	}
	return found, nil
}

// blockPrefixesFromTo returns the ULID string prefixes covering every ULID
// whose timestamp lies in [from, to], in ascending time order.
//
// orderOfSplit selects the prefix length: each prefix is the first
// orderOfSplit+1 characters of the ULID string form, and every extra character
// refines the split by 5 bits of the millisecond timestamp. The duration
// covered by one prefix is:
// 0: 1114y
// 1: 34.8y
// 2: 1y
// 3: 12.4d
// 4: 9h19m
//
// Times before the Unix epoch cannot be represented in a ULID: a pre-epoch
// from is treated as the epoch, and a pre-epoch to yields no prefixes.
// An error is returned when orderOfSplit is greater than 9 or when the range
// extends past the maximum time a ULID can hold.
// TODO: To needs to be adapted based on the MaxBlockDuration.
func blockPrefixesFromTo(from, to time.Time, orderOfSplit uint8) (prefixes []string, err error) {
	if orderOfSplit > 9 {
		return nil, fmt.Errorf("order of split must be between 0 and 9")
	}

	fromMs, toMs := from.UnixMilli(), to.UnixMilli()
	if toMs < 0 {
		// The whole range is before the epoch. Converting a negative value to
		// uint64 would wrap around to a huge upper bound.
		return nil, nil
	}
	if fromMs < 0 {
		// Same wrap-around hazard: a pre-epoch lower bound (e.g. time.Time{})
		// would otherwise compare greater than any upper bound.
		fromMs = 0
	}

	// Number of low timestamp bits that are not part of the prefix.
	bitShift := (9 - orderOfSplit) * 5
	step := uint64(1) << bitShift
	end := uint64(toMs)

	var id ulid.ULID
	// Start at the beginning of the bucket containing from, then advance one
	// bucket at a time.
	for ms := (uint64(fromMs) >> bitShift) << bitShift; ms <= end; ms += step {
		if err := id.SetTime(ms); err != nil {
			return nil, err
		}
		prefixes = append(prefixes, id.String()[:orderOfSplit+1])
	}

	return prefixes, nil
}

func SortBlocks(metas map[ulid.ULID]*Meta) []*Meta {
var blocks []*Meta

Expand Down
121 changes: 14 additions & 107 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package storegateway

import (
"context"
"fmt"
"os"
"path"
"path/filepath"
Expand All @@ -17,8 +16,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/samber/lo"
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"

phlareobj "github.com/grafana/phlare/pkg/objstore"
"github.com/grafana/phlare/pkg/phlaredb"
Expand Down Expand Up @@ -258,48 +255,25 @@ func (s *BucketStore) fetchBlocksMeta(ctx context.Context) (map[ulid.ULID]*block
to = time.Now()
from = to.Add(-time.Hour * 24 * 31) // todo make this configurable
)
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(128)

allIDs, err := s.listAllBlockByPrefixes(ctx, from, to)
if err != nil {
return nil, err
}
totalMeta := 0
for _, ids := range allIDs {
totalMeta += len(ids)
}
metas := make([]*block.Meta, totalMeta)
idx := 0
var (
metas []*block.Meta
mtx sync.Mutex
)

start := time.Now()
level.Debug(s.logger).Log("msg", "fetching blocks meta", "total", totalMeta)
level.Debug(s.logger).Log("msg", "fetching blocks meta", "from", from, "to", to)
defer func() {
level.Debug(s.logger).Log("msg", "fetched blocks meta", "total", totalMeta, "elapsed", time.Since(start))
level.Debug(s.logger).Log("msg", "fetched blocks meta", "total", len(metas), "elapsed", time.Since(start))
}()
// fetch all meta.json
for _, ids := range allIDs {
for _, id := range ids {
id := id
currentIdx := idx
idx++
g.Go(func() error {
r, err := s.bucket.Get(ctx, id+block.MetaFilename)
if err != nil {
return err
}

m, err := block.Read(r)
if err != nil {
return err
}
metas[currentIdx] = m
return nil
})
}
}
if err := g.Wait(); err != nil {
return nil, err
if err := block.IterBlockMetas(ctx, s.bucket, from, to, func(m *block.Meta) {
mtx.Lock()
defer mtx.Unlock()
metas = append(metas, m)
}); err != nil {
return nil, errors.Wrap(err, "iter block metas")
}

metaMap := lo.SliceToMap(metas, func(item *block.Meta) (ulid.ULID, *block.Meta) {
return item.ULID, item
})
Expand All @@ -316,73 +290,6 @@ func (s *BucketStore) fetchBlocksMeta(ctx context.Context) (map[ulid.ULID]*block
return metaMap, nil
}

// listAllBlockByPrefixes computes the ULID prefixes covering [from, to] (using
// a split order of 4) and lists the store's bucket under each prefix, at most
// 64 listings running concurrently.
//
// The result has one entry per prefix, in prefix order; each entry holds the
// names reported by the bucket that block.IsBlockDir accepts. The first
// listing error aborts the whole operation.
func (s *BucketStore) listAllBlockByPrefixes(ctx context.Context, from, to time.Time) ([][]string, error) {
	// todo: We should cache prefixes listing per tenants.
	prefixes, err := blockPrefixesFromTo(from, to, 4)
	if err != nil {
		return nil, err
	}

	group, gctx := errgroup.WithContext(ctx)
	group.SetLimit(64)

	// Each goroutine writes only to its own slot, so no locking is required.
	found := make([][]string, len(prefixes))

	listPrefix := func(slot int, prefix string) error {
		level.Debug(s.logger).Log("msg", "listing blocks", "prefix", prefix, "i", slot)

		dirs := make([]string, 0)
		collect := func(name string) error {
			if _, isBlock := block.IsBlockDir(name); isBlock {
				dirs = append(dirs, name)
			}
			return nil
		}
		if err := s.bucket.Iter(gctx, prefix, collect, objstore.WithoutApendingDirDelim); err != nil {
			return err
		}

		found[slot] = dirs
		return nil
	}

	for slot := range prefixes {
		slot := slot
		group.Go(func() error {
			return listPrefix(slot, prefixes[slot])
		})
	}

	if err := group.Wait(); err != nil {
		return nil, err
	}
	return found, nil
}

// blockPrefixesFromTo returns the ULID string prefixes covering every ULID
// whose timestamp lies in [from, to], in ascending time order.
//
// orderOfSplit selects the prefix length: each prefix is the first
// orderOfSplit+1 characters of the ULID string form, and every extra character
// refines the split by 5 bits of the millisecond timestamp. The duration
// covered by one prefix is:
// 0: 1114y
// 1: 34.8y
// 2: 1y
// 3: 12.4d
// 4: 9h19m
//
// Times before the Unix epoch cannot be represented in a ULID: a pre-epoch
// from is treated as the epoch, and a pre-epoch to yields no prefixes.
// An error is returned when orderOfSplit is greater than 9 or when the range
// extends past the maximum time a ULID can hold.
// TODO: To needs to be adapted based on the MaxBlockDuration.
func blockPrefixesFromTo(from, to time.Time, orderOfSplit uint8) (prefixes []string, err error) {
	if orderOfSplit > 9 {
		return nil, fmt.Errorf("order of split must be between 0 and 9")
	}

	fromMs, toMs := from.UnixMilli(), to.UnixMilli()
	if toMs < 0 {
		// The whole range is before the epoch. Converting a negative value to
		// uint64 would wrap around to a huge upper bound.
		return nil, nil
	}
	if fromMs < 0 {
		// Same wrap-around hazard: a pre-epoch lower bound (e.g. time.Time{})
		// would otherwise compare greater than any upper bound.
		fromMs = 0
	}

	// Number of low timestamp bits that are not part of the prefix.
	bitShift := (9 - orderOfSplit) * 5
	step := uint64(1) << bitShift
	end := uint64(toMs)

	var id ulid.ULID
	// Start at the beginning of the bucket containing from, then advance one
	// bucket at a time.
	for ms := (uint64(fromMs) >> bitShift) << bitShift; ms <= end; ms += step {
		if err := id.SetTime(ms); err != nil {
			return nil, err
		}
		prefixes = append(prefixes, id.String()[:orderOfSplit+1])
	}

	return prefixes, nil
}

// bucketBlockSet holds all blocks.
type bucketBlockSet struct {
mtx sync.RWMutex
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway_blocks_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request) {
}
}

metasMap, err := block.ListBlock(filepath.Join(s.gatewayCfg.BucketStoreConfig.SyncDir, tenantID), time.Time{})
metasMap, err := block.ListBlocks(filepath.Join(s.gatewayCfg.BucketStoreConfig.SyncDir, tenantID), time.Time{})
if err != nil {
util.WriteTextResponse(w, fmt.Sprintf("Failed to read block metadata: %s", err))
return
Expand Down

0 comments on commit 55f2aea

Please sign in to comment.