Skip to content

Commit

Permalink
Fix goroutine leak in BufferedIterator (#2168)
Browse files Browse the repository at this point in the history
* Add tests for a goroutine leak

* Fixes goroutine leak in BufferedIterator

* Fixes context cancellation propagation issue

* Do not returns cancellation error on Err()
  • Loading branch information
cyriltovena authored Jul 25, 2023
1 parent 7c35179 commit d4d32e2
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 6 deletions.
31 changes: 30 additions & 1 deletion pkg/iter/iter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package iter

import (
"context"
"sort"
"sync"

"github.com/samber/lo"
"golang.org/x/exp/constraints"
Expand Down Expand Up @@ -229,22 +231,38 @@ type BufferedIterator[T any] struct {
Iterator[T]
buff chan T
at T

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

// NewBufferedIterator returns an iterator that reads asynchronously from the given iterator and buffers up to size elements.
func NewBufferedIterator[T any](it Iterator[T], size int) Iterator[T] {
ctx, cancel := context.WithCancel(context.Background())
buffered := &BufferedIterator[T]{
Iterator: it,
buff: make(chan T, size),
ctx: ctx,
cancel: cancel,
}

buffered.wg.Add(1)
go buffered.fill()
return buffered
}

func (it *BufferedIterator[T]) fill() {
defer it.wg.Done()
defer close(it.buff)

for it.Iterator.Next() {
it.buff <- it.Iterator.At()
select {
case <-it.ctx.Done():
return
case it.buff <- it.Iterator.At():
continue
}
}
}

Expand All @@ -256,6 +274,17 @@ func (it *BufferedIterator[T]) Next() bool {
return ok
}

func (it *BufferedIterator[T]) Err() error {
return it.Iterator.Err()
}

func (it *BufferedIterator[T]) At() T {
return it.at
}

func (it *BufferedIterator[T]) Close() error {
err := it.Iterator.Close()
it.cancel()
it.wg.Wait()
return err
}
9 changes: 9 additions & 0 deletions pkg/iter/profiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

phlaremodel "github.com/grafana/pyroscope/pkg/model"
)
Expand Down Expand Up @@ -171,6 +172,14 @@ func Test_BufferedIterator(t *testing.T) {
}
}

func Test_BufferedIteratorClose(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

it := NewBufferedIterator(
NewSliceIterator(generatesProfiles(t, 100)), 10)
require.NoError(t, it.Close())
}

func generatesProfiles(t *testing.T, n int) []profile {
t.Helper()
profiles := make([]profile, n)
Expand Down
9 changes: 6 additions & 3 deletions pkg/phlaredb/block_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,11 @@ func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfile
}

if err := g.Wait(); err != nil {
for _, it := range iters {
if it != nil {
runutil.CloseWithLogOnErr(util.Logger, it, "closing buffered iterator")
}
}
return nil, err
}
return iters, nil
Expand Down Expand Up @@ -933,9 +938,7 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
}
}

var (
buf [][]parquet.Value
)
var buf [][]parquet.Value

pIt := query.NewBinaryJoinIterator(
0,
Expand Down
34 changes: 32 additions & 2 deletions pkg/phlaredb/block_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package phlaredb
import (
"context"
"fmt"
"math/rand"
"strings"
"testing"
"time"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

ingestv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
Expand Down Expand Up @@ -134,7 +136,6 @@ func (p *profileCounter) Next() bool {
}

return r

}

func TestBlockCompatability(t *testing.T) {
Expand All @@ -148,7 +149,6 @@ func TestBlockCompatability(t *testing.T) {

for _, meta := range metas {
t.Run(fmt.Sprintf("block-v%d-%s", meta.Version, meta.ULID.String()), func(t *testing.T) {

q := NewSingleBlockQuerierFromMeta(ctx, bucket, meta)
require.NoError(t, q.Open(ctx))

Expand Down Expand Up @@ -186,6 +186,36 @@ func TestBlockCompatability(t *testing.T) {

require.Equal(t, int(meta.Stats.NumProfiles), profileCount)
})
}
}

type fakeQuerier struct {
Querier
doErr bool
}

func (f *fakeQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
// add some jitter
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
if f.doErr {
return nil, fmt.Errorf("fake error")
}
profiles := []Profile{}
for i := 0; i < 100000; i++ {
profiles = append(profiles, BlockProfile{})
}
return iter.NewSliceIterator(profiles), nil
}

func TestSelectMatchingProfilesCleanUp(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

_, err := SelectMatchingProfiles(context.Background(), &ingestv1.SelectProfilesRequest{}, Queriers{
&fakeQuerier{},
&fakeQuerier{},
&fakeQuerier{},
&fakeQuerier{},
&fakeQuerier{doErr: true},
})
require.Error(t, err)
}

0 comments on commit d4d32e2

Please sign in to comment.