Optimising IO Stats Computation (#1738)
* stats computation under a flag

* reverting the interface type assertions

* reverting the interface type assertions

* - Moving the stat update (an atomic operation accessed by multiple threads)
  from a per-hit to a per-query level (at the end of collector.Collect()).
- Leveraging the DocumentMatch struct to track the bytes-read value, in
  order to update the TotBytesReadQueryTime stat at a query level.
- Removing the interface assertions, and making the existing structs
  implement the diskStatsReporter interface (the assertions otherwise cause
  a drop in performance).
- Removing the flag which controls the IO stats computation. Since the
  regression in throughput and latency is mitigated, the stats computation
  is kept on all the time.

* code cleanup; accounting for the bytes read for conjunction and disjunction searchers

* code cleanup and rebase fixes
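
The heart of the change is replacing one atomic read-modify-write per hit with a single add per query. A minimal, runnable sketch of that pattern, with hypothetical names standing in for the bleve types:

package main

import (
	"fmt"
	"sync/atomic"
)

// hit stands in for search.DocumentMatch: after this commit each hit
// carries the bytes read while producing it (the BytesRead field).
type hit struct{ bytesRead uint64 }

func main() {
	// Shared stat, e.g. stats.TotBytesReadAtQueryTime, touched by many queries.
	var totBytesReadAtQueryTime uint64

	hits := []hit{{120}, {64}, {512}}

	// Sum locally while iterating hits (no contention) ...
	var totalBytesRead uint64
	for _, h := range hits {
		totalBytesRead += h.bytesRead
	}
	// ... and touch the contended atomic exactly once per query.
	atomic.AddUint64(&totBytesReadAtQueryTime, totalBytesRead)

	fmt.Println(atomic.LoadUint64(&totBytesReadAtQueryTime)) // 696
}
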
Thejas-bhat authored Oct 20, 2022
1 parent a70b8d1 commit 20e6eb5
Showing 15 changed files with 80 additions and 68 deletions.
12 changes: 3 additions & 9 deletions index/scorch/merge.go
@@ -354,12 +354,8 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
return err
}

- switch segI := seg.(type) {
- case segment.DiskStatsReporter:
- 	totalBytesRead := segI.BytesRead() + prevBytesReadTotal
- 	segI.ResetBytesRead(totalBytesRead)
- 	seg = segI.(segment.Segment)
- }
+ totalBytesRead := seg.BytesRead() + prevBytesReadTotal
+ seg.ResetBytesRead(totalBytesRead)

oldNewDocNums = make(map[uint64][]uint64)
for i, segNewDocNums := range newDocNums {
@@ -438,9 +434,7 @@ type segmentMerge struct {
func cumulateBytesRead(sbs []segment.Segment) uint64 {
var rv uint64
for _, seg := range sbs {
- if segI, diskStatsAvailable := seg.(segment.DiskStatsReporter); diskStatsAvailable {
- 	rv += segI.BytesRead()
- }
+ rv += seg.BytesRead()
}
return rv
}
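
The hunks above also drop the segment.DiskStatsReporter type switch: per the commit message, asserting the interface on every call cost enough to show up in benchmarks, so BytesRead/ResetBytesRead are now called directly on the segment. A rough sketch of the before/after shape, under the assumption (which the commit enforces) that every segment implements these methods:

package sketch

// Segment mirrors just the slice of scorch's segment interface that
// matters here; after this commit the stats methods are part of it.
type Segment interface {
	BytesRead() uint64
	ResetBytesRead(uint64)
}

// Before (sketch): an assertion on every segment, on every merge:
//
//	if segI, ok := seg.(DiskStatsReporter); ok {
//		rv += segI.BytesRead()
//	}
//
// After: the assertion is gone from the hot path entirely.
func cumulateBytesRead(sbs []Segment) uint64 {
	var rv uint64
	for _, seg := range sbs {
		rv += seg.BytesRead()
	}
	return rv
}
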
3 changes: 3 additions & 0 deletions index/scorch/reader_test.go
@@ -153,6 +153,9 @@ func TestIndexReader(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
+ // Ignoring the BytesRead value, since it doesn't have
+ // relevance in this type of test
+ match.BytesRead = 0
if !reflect.DeepEqual(expectedMatch, match) {
t.Errorf("got %#v, expected %#v", match, expectedMatch)
}
1 change: 1 addition & 0 deletions index/scorch/scorch.go
@@ -146,6 +146,7 @@ func NewScorch(storeName string,
if ok {
rv.onAsyncError = RegistryAsyncErrorCallbacks[aecbName]
}
+
return rv, nil
}

68 changes: 27 additions & 41 deletions index/scorch/snapshot_index.go
@@ -60,8 +60,6 @@ var reflectStaticSizeIndexSnapshot int
// in the kvConfig.
var DefaultFieldTFRCacheThreshold uint64 = 10

- type diskStatsReporter segment.DiskStatsReporter
-
func init() {
var is interface{} = IndexSnapshot{}
reflectStaticSizeIndexSnapshot = int(reflect.TypeOf(is).Size())
@@ -149,18 +147,15 @@ func (i *IndexSnapshot) newIndexSnapshotFieldDict(field string,
for index, segment := range i.segment {
go func(index int, segment *SegmentSnapshot) {
var prevBytesRead uint64
- seg, diskStatsAvailable := segment.segment.(diskStatsReporter)
- if diskStatsAvailable {
- 	prevBytesRead = seg.BytesRead()
- }
+ prevBytesRead = segment.segment.BytesRead()
+
dict, err := segment.segment.Dictionary(field)
if err != nil {
results <- &asynchSegmentResult{err: err}
} else {
- if diskStatsAvailable {
- 	atomic.AddUint64(&i.parent.stats.TotBytesReadAtQueryTime,
- 		seg.BytesRead()-prevBytesRead)
- }
+ atomic.AddUint64(&i.parent.stats.TotBytesReadAtQueryTime,
+ 	segment.segment.BytesRead()-prevBytesRead)
+
if randomLookup {
results <- &asynchSegmentResult{dict: dict}
} else {
@@ -435,11 +430,8 @@ func (i *IndexSnapshot) Document(id string) (rv index.Document, err error) {
segmentIndex, localDocNum := i.segmentIndexAndLocalDocNumFromGlobal(docNum)

rvd := document.NewDocument(id)
- var prevBytesRead uint64
- seg, diskStatsAvailable := i.segment[segmentIndex].segment.(segment.DiskStatsReporter)
- if diskStatsAvailable {
- 	prevBytesRead = seg.BytesRead()
- }
+ prevBytesRead := i.segment[segmentIndex].segment.BytesRead()
+
err = i.segment[segmentIndex].VisitDocument(localDocNum, func(name string, typ byte, val []byte, pos []uint64) bool {
if name == "_id" {
return true
@@ -471,8 +463,8 @@ func (i *IndexSnapshot) Document(id string) (rv index.Document, err error) {
if err != nil {
return nil, err
}
- if diskStatsAvailable {
- 	delta := seg.BytesRead() - prevBytesRead
+
+ if delta := i.segment[segmentIndex].segment.BytesRead() - prevBytesRead; delta > 0 {
atomic.AddUint64(&i.parent.stats.TotBytesReadAtQueryTime, delta)
}
return rvd, nil
@@ -549,26 +541,22 @@ func (is *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
if rv.dicts == nil {
rv.dicts = make([]segment.TermDictionary, len(is.segment))
for i, segment := range is.segment {
- var prevBytesRead uint64
- segP, diskStatsAvailable := segment.segment.(diskStatsReporter)
- if diskStatsAvailable {
- 	prevBytesRead = segP.BytesRead()
- }
+ prevBytesRead := segment.segment.BytesRead()
dict, err := segment.segment.Dictionary(field)
if err != nil {
return nil, err
}
- if diskStatsAvailable {
- 	atomic.AddUint64(&is.parent.stats.TotBytesReadAtQueryTime, segP.BytesRead()-prevBytesRead)
+ if bytesRead := segment.segment.BytesRead(); bytesRead > prevBytesRead {
+ 	atomic.AddUint64(&is.parent.stats.TotBytesReadAtQueryTime, bytesRead-prevBytesRead)
}
rv.dicts[i] = dict
}
}

for i, segment := range is.segment {
var prevBytesReadPL uint64
- if postings, diskStatsAvailable := rv.postings[i].(diskStatsReporter); diskStatsAvailable {
- 	prevBytesReadPL = postings.BytesRead()
+ if rv.postings[i] != nil {
+ 	prevBytesReadPL = rv.postings[i].BytesRead()
}
pl, err := rv.dicts[i].PostingsList(term, segment.deleted, rv.postings[i])
if err != nil {
@@ -577,21 +565,19 @@ func (is *IndexSnapshot) TermFieldReader(term []byte, field string, includeFreq,
rv.postings[i] = pl

var prevBytesReadItr uint64
- if itr, diskStatsAvailable := rv.iterators[i].(diskStatsReporter); diskStatsAvailable {
- 	prevBytesReadItr = itr.BytesRead()
+ if rv.iterators[i] != nil {
+ 	prevBytesReadItr = rv.iterators[i].BytesRead()
}
rv.iterators[i] = pl.Iterator(includeFreq, includeNorm, includeTermVectors, rv.iterators[i])

- if postings, diskStatsAvailable := pl.(diskStatsReporter); diskStatsAvailable &&
- 	prevBytesReadPL < postings.BytesRead() {
+ if bytesRead := rv.postings[i].BytesRead(); prevBytesReadPL < bytesRead {
atomic.AddUint64(&is.parent.stats.TotBytesReadAtQueryTime,
- 		postings.BytesRead()-prevBytesReadPL)
+ 		bytesRead-prevBytesReadPL)
}

- if itr, diskStatsAvailable := rv.iterators[i].(diskStatsReporter); diskStatsAvailable &&
- 	prevBytesReadItr < itr.BytesRead() {
+ if bytesRead := rv.iterators[i].BytesRead(); prevBytesReadItr < bytesRead {
atomic.AddUint64(&is.parent.stats.TotBytesReadAtQueryTime,
- 		itr.BytesRead()-prevBytesReadItr)
+ 		bytesRead-prevBytesReadItr)
}
}
atomic.AddUint64(&is.parent.stats.TotTermSearchersStarted, uint64(1))
@@ -711,17 +697,13 @@ func (i *IndexSnapshot) documentVisitFieldTermsOnSegment(
}

if ssvOk && ssv != nil && len(vFields) > 0 {
- var prevBytesRead uint64
- ssvp, diskStatsAvailable := ssv.(segment.DiskStatsReporter)
- if diskStatsAvailable {
- 	prevBytesRead = ssvp.BytesRead()
- }
+ prevBytesRead := ss.segment.BytesRead()
dvs, err = ssv.VisitDocValues(localDocNum, fields, visitor, dvs)
if err != nil {
return nil, nil, err
}
- if diskStatsAvailable {
- 	atomic.AddUint64(&i.parent.stats.TotBytesReadAtQueryTime, ssvp.BytesRead()-prevBytesRead)
+ if delta := ss.segment.BytesRead() - prevBytesRead; delta > 0 {
+ 	atomic.AddUint64(&i.parent.stats.TotBytesReadAtQueryTime, delta)
}
}

@@ -889,6 +871,10 @@ func (i *IndexSnapshot) CopyTo(d index.Directory) error {
return copyBolt.Sync()
}

+ func (s *IndexSnapshot) UpdateIOStats(val uint64) {
+ 	atomic.AddUint64(&s.parent.stats.TotBytesReadAtQueryTime, val)
+ }
+
func (i *IndexSnapshot) GetSpatialAnalyzerPlugin(typ string) (
index.SpatialAnalyzerPlugin, error) {
var rv index.SpatialAnalyzerPlugin
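
Every call site in this file follows the same snapshot-work-delta shape. A condensed sketch of that pattern as a helper (hypothetical; the real code inlines it at each site):

package sketch

import "sync/atomic"

// bytesReader is the one method all these call sites rely on.
type bytesReader interface{ BytesRead() uint64 }

// addReadDelta records how much a single operation grew a segment's
// bytes-read counter: snapshot before, run the work, publish the growth.
func addReadDelta(stat *uint64, r bytesReader, op func() error) error {
	prev := r.BytesRead()
	if err := op(); err != nil {
		return err
	}
	if delta := r.BytesRead() - prev; delta > 0 {
		atomic.AddUint64(stat, delta)
	}
	return nil
}
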
11 changes: 3 additions & 8 deletions index/scorch/snapshot_index_tfr.go
@@ -76,11 +76,7 @@ func (i *IndexSnapshotTermFieldReader) Next(preAlloced *index.TermFieldDoc) (*in
}
// find the next hit
for i.segmentOffset < len(i.iterators) {
- prevBytesRead := uint64(0)
- itr, diskStatsAvailable := i.iterators[i.segmentOffset].(segment.DiskStatsReporter)
- if diskStatsAvailable {
- 	prevBytesRead = itr.BytesRead()
- }
+ prevBytesRead := i.iterators[i.segmentOffset].BytesRead()
next, err := i.iterators[i.segmentOffset].Next()
if err != nil {
return nil, err
@@ -98,9 +94,8 @@ func (i *IndexSnapshotTermFieldReader) Next(preAlloced *index.TermFieldDoc) (*in
// this is because there are chances of having a series of loadChunk calls,
// and they have to be added together before sending the bytesRead at this point
// upstream.
- if diskStatsAvailable {
- 	delta := itr.BytesRead() - prevBytesRead
- 	atomic.AddUint64(&i.snapshot.parent.stats.TotBytesReadAtQueryTime, uint64(delta))
+ if delta := i.iterators[i.segmentOffset].BytesRead() - prevBytesRead; delta > 0 {
+ 	rv.BytesRead = delta
}

return rv, nil
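
The comment in the hunk above is the reason the delta is computed once around Next() rather than reported per chunk: a single Next() can trigger several loadChunk calls, and one prev/post pair captures them all. A toy illustration with a hypothetical iterator:

package sketch

// fakeItr models a postings iterator whose internal counter grows by
// one chunk per load, like scorch's loadChunk path.
type fakeItr struct{ bytesRead uint64 }

func (f *fakeItr) BytesRead() uint64 { return f.bytesRead }

// Next may load more than one chunk before yielding a hit.
func (f *fakeItr) Next() {
	f.bytesRead += 4096 // first loadChunk
	f.bytesRead += 4096 // follow-up loadChunk within the same Next
}

// nextDelta shows why the prev/post pair belongs around the whole call:
// it returns 8192 here, the sum of both chunk loads, which is then
// attached to the hit via rv.BytesRead.
func nextDelta(f *fakeItr) uint64 {
	prev := f.BytesRead()
	f.Next()
	return f.BytesRead() - prev
}
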
10 changes: 10 additions & 0 deletions index_impl.go
@@ -26,6 +26,7 @@ import (
"time"

"github.com/blevesearch/bleve/v2/document"
+ "github.com/blevesearch/bleve/v2/index/scorch"
"github.com/blevesearch/bleve/v2/index/upsidedown"
"github.com/blevesearch/bleve/v2/mapping"
"github.com/blevesearch/bleve/v2/registry"
@@ -527,12 +528,21 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
}()
}
}
+ var totalBytesRead uint64
+ SendBytesRead := func(bytesRead uint64) {
+ 	totalBytesRead = bytesRead
+ }

+ ctx = context.WithValue(ctx, collector.SearchIOStatsCallbackKey,
+ 	collector.SearchIOStatsCallbackFunc(SendBytesRead))
err = coll.Collect(ctx, searcher, indexReader)
if err != nil {
return nil, err
}
+
+ if sr, ok := indexReader.(*scorch.IndexSnapshot); ok {
+ 	sr.UpdateIOStats(totalBytesRead)
+ }
hits := coll.Results()

var highlighter highlight.Highlighter
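
The callback travels through context so that only readers that care (the scorch snapshot) ever see an aggregate; upsidedown, for instance, never registers one. A standalone sketch of the same wiring with stand-in names (the real key and type live in search/collector):

package main

import (
	"context"
	"fmt"
)

// Stand-ins for collector.SearchIOStatsCallbackKey / ...CallbackFunc.
const searchIOStatsCallbackKey = "_io_stats_callback"

type searchIOStatsCallbackFunc func(uint64)

// collect plays the role of TopNCollector.Collect: sum per-hit bytes
// locally, then invoke the callback once if one was registered.
func collect(ctx context.Context, perHitBytes []uint64) {
	var total uint64
	for _, b := range perHitBytes {
		total += b
	}
	if cb := ctx.Value(searchIOStatsCallbackKey); cb != nil {
		cb.(searchIOStatsCallbackFunc)(total)
	}
}

func main() {
	var totalBytesRead uint64
	ctx := context.WithValue(context.Background(), searchIOStatsCallbackKey,
		searchIOStatsCallbackFunc(func(b uint64) { totalBytesRead = b }))

	collect(ctx, []uint64{100, 50, 25})
	fmt.Println(totalBytesRead) // 175
}
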
9 changes: 5 additions & 4 deletions index_test.go
@@ -371,6 +371,7 @@ func TestBytesRead(t *testing.T) {
typeFieldMapping := NewTextFieldMapping()
typeFieldMapping.Store = false
documentMapping.AddFieldMappingsAt("type", typeFieldMapping)
+
idx, err := NewUsing(tmpIndexPath, indexMapping, Config.DefaultIndexType, Config.DefaultMemKVStore, nil)
if err != nil {
t.Fatal(err)
@@ -579,7 +580,7 @@ func TestBytesReadStored(t *testing.T) {
stats, _ := idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead != 15792 {
- 	t.Fatalf("expected the bytes read stat to be around 15792, got %v", err)
+ 	t.Fatalf("expected the bytes read stat to be around 15792, got %v", bytesRead)
}
prevBytesRead := bytesRead

@@ -591,7 +592,7 @@
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 15 {
- 	t.Fatalf("expected the bytes read stat to be around 15, got %v", err)
+ 	t.Fatalf("expected the bytes read stat to be around 15, got %v", bytesRead-prevBytesRead)
}
prevBytesRead = bytesRead

@@ -660,7 +661,7 @@
stats, _ = idx1.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 12 {
- 	t.Fatalf("expected the bytes read stat to be around 12, got %v", err)
+ 	t.Fatalf("expected the bytes read stat to be around 12, got %v", bytesRead-prevBytesRead)
}
prevBytesRead = bytesRead

@@ -674,7 +675,7 @@
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)

if bytesRead-prevBytesRead != 646 {
- 	t.Fatalf("expected the bytes read stat to be around 646, got %v", err)
+ 	t.Fatalf("expected the bytes read stat to be around 646, got %v", bytesRead-prevBytesRead)
}
}

12 changes: 11 additions & 1 deletion search/collector/topn.go
@@ -49,6 +49,10 @@ type collectorCompare func(i, j *search.DocumentMatch) int

type collectorFixup func(d *search.DocumentMatch) error

+ const SearchIOStatsCallbackKey = "_search_io_stats_callback_key"
+
+ type SearchIOStatsCallbackFunc func(uint64)
+
// TopNCollector collects the top N hits, optionally skipping some results
type TopNCollector struct {
size int
@@ -197,14 +201,15 @@ func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher,
}

hc.needDocIds = hc.needDocIds || loadID
-
+ var totalBytesRead uint64
select {
case <-ctx.Done():
return ctx.Err()
default:
next, err = searcher.Next(searchContext)
}
for err == nil && next != nil {
+ totalBytesRead += next.BytesRead
if hc.total%CheckDoneEvery == 0 {
select {
case <-ctx.Done():
@@ -226,6 +231,11 @@
next, err = searcher.Next(searchContext)
}

+ statsCallbackFn := ctx.Value(SearchIOStatsCallbackKey)
+ if statsCallbackFn != nil {
+ 	statsCallbackFn.(SearchIOStatsCallbackFunc)(totalBytesRead)
+ }
+
// help finalize/flush the results in case
// of custom document match handlers.
err = dmHandler(nil)
9 changes: 8 additions & 1 deletion search/scorer/scorer_conjunction.go
@@ -41,7 +41,13 @@ func NewConjunctionQueryScorer(options search.SearcherOptions) *ConjunctionQuery
options: options,
}
}
-
+ func getTotalBytesRead(matches []*search.DocumentMatch) uint64 {
+ 	var rv uint64
+ 	for _, match := range matches {
+ 		rv += match.BytesRead
+ 	}
+ 	return rv
+ }
func (s *ConjunctionQueryScorer) Score(ctx *search.SearchContext, constituents []*search.DocumentMatch) *search.DocumentMatch {
var sum float64
var childrenExplanations []*search.Explanation
@@ -67,6 +73,7 @@ func (s *ConjunctionQueryScorer) Score(ctx *search.SearchContext, constituents [
rv.Expl = newExpl
rv.FieldTermLocations = search.MergeFieldTermLocations(
rv.FieldTermLocations, constituents[1:])
+ rv.BytesRead = getTotalBytesRead(constituents)

return rv
}
2 changes: 1 addition & 1 deletion search/scorer/scorer_disjunction.go
@@ -78,6 +78,6 @@ func (s *DisjunctionQueryScorer) Score(ctx *search.SearchContext, constituents [
rv.Expl = newExpl
rv.FieldTermLocations = search.MergeFieldTermLocations(
rv.FieldTermLocations, constituents[1:])
-
+ rv.BytesRead = getTotalBytesRead(constituents)
return rv
}
2 changes: 1 addition & 1 deletion search/scorer/scorer_term.go
@@ -198,6 +198,6 @@ func (s *TermQueryScorer) Score(ctx *search.SearchContext, termMatch *index.Term
})
}
}
-
+ rv.BytesRead = termMatch.BytesRead
return rv
}
2 changes: 2 additions & 0 deletions search/search.go
@@ -166,6 +166,8 @@ type DocumentMatch struct {
// be later incorporated into the Locations map when search
// results are completed
FieldTermLocations []FieldTermLocation `json:"-"`
+
+ BytesRead uint64 `json:"-"`
}

func (dm *DocumentMatch) AddFieldValue(name string, value interface{}) {
1 change: 1 addition & 0 deletions search/searcher/search_disjunction.go
@@ -16,6 +16,7 @@ package searcher

import (
"fmt"
+
"github.com/blevesearch/bleve/v2/search"
index "github.com/blevesearch/bleve_index_api"
)
1 change: 0 additions & 1 deletion search/searcher/search_disjunction_slice.go
@@ -156,7 +156,6 @@ func (s *DisjunctionSliceSearcher) updateMatches() error {
matchingIdxs = matchingIdxs[:0]
}
}
-
matching = append(matching, curr)
matchingIdxs = append(matchingIdxs, i)
}