Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Commit

Permalink
Adds iteration error support in the loser.Tree (#836)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Jul 11, 2023
1 parent 7f69298 commit 0ba6fc4
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 12 deletions.
8 changes: 4 additions & 4 deletions pkg/iter/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func lessProfile(p1, p2 Profile) bool {

type MergeIterator[P Profile] struct {
tree *loser.Tree[P, Iterator[P]]
errs multierror.MultiError
closeErrs multierror.MultiError
current P
deduplicate bool
}
Expand Down Expand Up @@ -59,7 +59,7 @@ func NewMergeIterator[P Profile](max P, deduplicate bool, iters ...Iterator[P])
},
func(s Iterator[P]) {
if err := s.Close(); err != nil {
iter.errs.Add(err)
iter.closeErrs.Add(err)
}
})
return iter
Expand Down Expand Up @@ -88,12 +88,12 @@ func (i *MergeIterator[P]) At() P {
}

func (i *MergeIterator[P]) Err() error {
return i.errs.Err()
return i.tree.Err()
}

func (i *MergeIterator[P]) Close() error {
i.tree.Close()
return i.Err()
return i.closeErrs.Err()
}

type TimeRangedIterator[T Timestamp] struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/parquet/row_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func (s *MergeRowReader) ReadRows(rows []parquet.Row) (int, error) {
}
if !s.tree.Next() {
s.tree.Close()
if err := s.tree.Err(); err != nil {
return n, err
}
return n, io.EOF
}
rows[n] = s.tree.Winner().At()
Expand Down
3 changes: 3 additions & 0 deletions pkg/querier/select_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ func skipDuplicates(ctx context.Context, its []MergeIterator) error {
}
span.LogFields(otlog.Int("duplicates", duplicates))
span.LogFields(otlog.Int("total", total))
if err := tree.Err(); err != nil {
errors.Add(err)
}

return errors.Err()
}
Expand Down
55 changes: 48 additions & 7 deletions pkg/util/loser/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,27 @@ package loser

type Sequence interface {
Next() bool // Advances and returns true if there is a value at this new position.
Err() error // Returns any error encountered while advancing.
}

// New returns a new loser tree that merges the given sequences.
// The sequences must be sorted according to the less function already.
// The maxVal is used to initialize the tree.
// The at function returns the current value of the sequence.
// The less function compares two values.
// The close function is called on each sequence when the tree is closed or when a sequence returns false from Next().
// If any sequence returns an error from Err() after Next() = false, the tree will stop and return that error in Err().
// Examples:
//
// tree := loser.New(...)
// defer tree.Close()
// for tree.Next() {
// value := tree.Winner().At()
// ...
// }
// if err := tree.Err(); err != nil {
// ...
// }
func New[E any, S Sequence](sequences []S, maxVal E, at func(S) E, less func(E, E) bool, close func(S)) *Tree[E, S] {
nSequences := len(sequences)
t := Tree[E, S]{
Expand All @@ -17,7 +36,11 @@ func New[E any, S Sequence](sequences []S, maxVal E, at func(S) E, less func(E,
}
for i, s := range sequences {
t.nodes[i+nSequences].items = s
t.moveNext(i + nSequences) // Must call Next on each item so that At() has a value.
if !t.moveNext(i + nSequences) { // Must call Next on each item so that At() has a value.
if t.err != nil {
break
}
}
}
if nSequences > 0 {
t.nodes[0].index = -1 // flag to be initialized on first call to Next().
Expand All @@ -44,6 +67,8 @@ type Tree[E any, S Sequence] struct {
less func(E, E) bool
close func(S) // Called when Next() returns false.
nodes []node[E, S]

err error
}

type node[E any, S Sequence] struct {
Expand All @@ -59,6 +84,7 @@ func (t *Tree[E, S]) moveNext(index int) bool {
return true
}
t.close(n.items) // Next() returned false; close it and mark as finished.
t.err = n.items.Err()
n.value = t.maxVal
n.index = -1
return false
Expand All @@ -69,7 +95,7 @@ func (t *Tree[E, S]) Winner() S {
}

func (t *Tree[E, S]) Next() bool {
if len(t.nodes) == 0 {
if len(t.nodes) == 0 || t.err != nil {
return false
}
if t.nodes[0].index == -1 { // If tree has not been initialized yet, do that.
Expand All @@ -79,11 +105,17 @@ func (t *Tree[E, S]) Next() bool {
if t.nodes[t.nodes[0].index].index == -1 { // already exhausted
return false
}
t.moveNext(t.nodes[0].index)
if !t.moveNext(t.nodes[0].index) && t.err != nil {
return false
}
t.replayGames(t.nodes[0].index)
return t.nodes[t.nodes[0].index].index != -1
}

func (t *Tree[E, S]) Err() error {
return t.err
}

func (t *Tree[E, S]) initialize() {
winners := make([]int, len(t.nodes))
// Initialize leaf nodes as winners to start.
Expand Down Expand Up @@ -131,15 +163,19 @@ func (t *Tree[E, S]) playGame(a, b int) (loser, winner int) {
func parent(i int) int { return i / 2 }

// Add a new sequence to the merge set
func (t *Tree[E, S]) Push(sequence S) {
func (t *Tree[E, S]) Push(sequence S) error {
// First, see if we can replace one that was previously finished.
for newPos := len(t.nodes) / 2; newPos < len(t.nodes); newPos++ {
if t.nodes[newPos].index == -1 {
t.nodes[newPos].index = newPos
t.nodes[newPos].items = sequence
t.moveNext(newPos)
if !t.moveNext(newPos) {
if t.err != nil {
return t.err
}
}
t.nodes[0].index = -1 // flag for re-initialize on next call to Next()
return
return nil
}
}
// We need to expand the tree. Pick the next biggest power of 2 to amortise resizing cost.
Expand All @@ -161,6 +197,11 @@ func (t *Tree[E, S]) Push(sequence S) {
t.nodes[i].index = -1
t.nodes[i].value = t.maxVal
}
t.moveNext(newPos)
if !t.moveNext(newPos) {
if t.err != nil {
return t.err
}
}
t.nodes[0].index = -1 // flag for re-initialize on next call to Next()
return nil
}
42 changes: 41 additions & 1 deletion pkg/util/loser/tree_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package loser_test

import (
"errors"
"math"
"testing"

Expand All @@ -10,6 +11,8 @@ import (
type List struct {
list []uint64
cur uint64

err error
}

func NewList(list ...uint64) *List {
Expand All @@ -20,6 +23,8 @@ func (it *List) At() uint64 {
return it.cur
}

func (it *List) Err() error { return it.err }

func (it *List) Next() bool {
if len(it.list) > 0 {
it.cur = it.list[0]
Expand Down Expand Up @@ -132,7 +137,9 @@ func TestPush(t *testing.T) {
}
lt := loser.New(nil, math.MaxUint64, at, less, close)
for _, s := range tt.args {
lt.Push(s)
if err := lt.Push(s); err != nil {
t.Fatalf("Push failed: %v", err)
}
}
checkIterablesEqual(t, tt.want, lt, at, at2, less)
if numCloses != len(tt.args) {
Expand All @@ -141,3 +148,36 @@ func TestPush(t *testing.T) {
})
}
}

func TestInitWithErr(t *testing.T) {
l := NewList()
l.err = errors.New("test")
l2 := NewList(5, 6, 7, 8)
tree := loser.New([]*List{l, l2}, math.MaxUint64, func(s *List) uint64 { return s.At() }, func(a, b uint64) bool { return a < b }, func(s *List) {})

if tree.Next() {
t.Errorf("Next() should have returned false")
}
if tree.Err() != l.err {
t.Errorf("Err() should have returned %v, got %v", l.err, tree.Err())
}
}

func TestErrDuringNext(t *testing.T) {
l := NewList(5)
l.err = errors.New("test")
tree := loser.New([]*List{l}, math.MaxUint64, func(s *List) uint64 { return s.At() }, func(a, b uint64) bool { return a < b }, func(s *List) {})

if !tree.Next() {
t.Errorf("Next() should have returned true")
}
if tree.Next() {
t.Errorf("Next() should have returned false")
}
if tree.Err() != l.err {
t.Errorf("Err() should have returned %v, got %v", l.err, tree.Err())
}
if tree.Next() {
t.Errorf("Next() should have returned false")
}
}

0 comments on commit 0ba6fc4

Please sign in to comment.