Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Add first draft of block compaction #818

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions pkg/iter/tree.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package iter

import (
"github.com/grafana/phlare/pkg/util/loser"
)

// Compile-time check that *TreeIterator satisfies the Iterator interface.
// The typed nil-pointer conversion avoids allocating a value just for the check.
var _ Iterator[any] = (*TreeIterator[any])(nil)

// TreeIterator exposes a loser.Tree of Iterators as a single Iterator.
// Next is promoted from the embedded tree, while At and Err delegate to
// the tree's current winner (see the methods below).
type TreeIterator[T any] struct {
	*loser.Tree[T, Iterator[T]]
}

// NewTreeIterator wraps the given loser tree so that it can be consumed
// through the Iterator interface.
func NewTreeIterator[T any](tree *loser.Tree[T, Iterator[T]]) *TreeIterator[T] {
	return &TreeIterator[T]{tree}
}

// At returns the element the tree's current winner is positioned at.
// The receiver is a pointer for consistency with Err and Close; Go
// convention is to use the same receiver kind across a type's methods.
func (it *TreeIterator[T]) At() T {
	return it.Tree.Winner().At()
}

// Err reports any error encountered by the iterator currently winning the tree.
func (it *TreeIterator[T]) Err() error {
	winner := it.Tree.Winner()
	return winner.Err()
}

// Close releases the resources held by the underlying loser tree.
// It always returns nil; any errors from closing the tree's sequences
// are not surfaced here.
func (it *TreeIterator[T]) Close() error {
	it.Tree.Close()
	return nil
}
43 changes: 27 additions & 16 deletions pkg/parquet/row_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
var (
_ parquet.RowReader = (*emptyRowReader)(nil)
_ parquet.RowReader = (*ErrRowReader)(nil)
_ parquet.RowReader = (*MergeRowReader)(nil)
_ parquet.RowReader = (*IteratorRowReader)(nil)
_ iter.Iterator[parquet.Row] = (*BufferedRowReaderIterator)(nil)

EmptyRowReader = &emptyRowReader{}
Expand All @@ -32,10 +32,6 @@ func NewErrRowReader(err error) *ErrRowReader { return &ErrRowReader{err: err} }

// ReadRows implements parquet.RowReader by always returning the reader's
// configured error without reading any rows.
func (e ErrRowReader) ReadRows(rows []parquet.Row) (int, error) { return 0, e.err }

type MergeRowReader struct {
tree *loser.Tree[parquet.Row, iter.Iterator[parquet.Row]]
}

// NewMergeRowReader returns a RowReader that k-way merges the given readers using the less function.
// Each reader must be sorted according to the less function already.
func NewMergeRowReader(readers []parquet.RowReader, maxValue parquet.Row, less func(parquet.Row, parquet.Row) bool) parquet.RowReader {
Expand All @@ -50,18 +46,31 @@ func NewMergeRowReader(readers []parquet.RowReader, maxValue parquet.Row, less f
its[i] = NewBufferedRowReaderIterator(readers[i], defaultRowBufferSize)
}

return &MergeRowReader{
tree: loser.New(
its,
maxValue,
func(it iter.Iterator[parquet.Row]) parquet.Row { return it.At() },
less,
func(it iter.Iterator[parquet.Row]) { it.Close() },
return NewIteratorRowReader(
iter.NewTreeIterator[parquet.Row](
loser.New(
its,
maxValue,
func(it iter.Iterator[parquet.Row]) parquet.Row { return it.At() },
less,
func(it iter.Iterator[parquet.Row]) { _ = it.Close() },
),
),
)
}

// IteratorRowReader adapts an iter.Iterator of parquet rows to the
// parquet.RowReader interface.
type IteratorRowReader struct {
	iter.Iterator[parquet.Row]
}

// NewIteratorRowReader wraps it so the rows it yields can be consumed
// through the parquet.RowReader interface.
func NewIteratorRowReader(it iter.Iterator[parquet.Row]) *IteratorRowReader {
	reader := &IteratorRowReader{it}
	return reader
}

func (s *MergeRowReader) ReadRows(rows []parquet.Row) (int, error) {
func (it *IteratorRowReader) ReadRows(rows []parquet.Row) (int, error) {
var n int
if len(rows) == 0 {
return 0, nil
Expand All @@ -70,11 +79,13 @@ func (s *MergeRowReader) ReadRows(rows []parquet.Row) (int, error) {
if n == len(rows) {
break
}
if !s.tree.Next() {
s.tree.Close()
if !it.Next() {
if err := it.Close(); err != nil {
return n, err
}
return n, io.EOF
}
rows[n] = s.tree.Winner().At()
rows[n] = it.At()
n++
}
return n, nil
Expand Down
23 changes: 23 additions & 0 deletions pkg/parquet/row_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,26 @@ func TestNewMergeRowReader(t *testing.T) {
})
}
}

// TestIteratorRowReader verifies that rows produced by a buffered row-reader
// iterator come back complete and in order through IteratorRowReader.
func TestIteratorRowReader(t *testing.T) {
	batches := [][]parquet.Row{
		{{parquet.Int32Value(1)}, {parquet.Int32Value(2)}, {parquet.Int32Value(3)}},
		{{parquet.Int32Value(4)}, {parquet.Int32Value(5)}, {parquet.Int32Value(6)}},
		{{parquet.Int32Value(7)}, {parquet.Int32Value(8)}, {parquet.Int32Value(9)}},
	}
	reader := NewIteratorRowReader(
		NewBufferedRowReaderIterator(NewBatchReader(batches), 4),
	)

	actual, err := ReadAllWithBufferSize(reader, 3)
	require.NoError(t, err)

	expected := make([]parquet.Row, 0, 9)
	for i := int32(1); i <= 9; i++ {
		expected = append(expected, parquet.Row{parquet.Int32Value(i)})
	}
	require.Equal(t, expected, actual)
}
4 changes: 2 additions & 2 deletions pkg/parquet/row_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (
"github.com/segmentio/parquet-go"
)

type RowGroupWriter interface {
// RowWriterFlusher is a parquet.RowWriter that can additionally flush
// buffered rows, e.g. to start a new row group.
type RowWriterFlusher interface {
	parquet.RowWriter
	Flush() error
}

// CopyAsRowGroups copies row groups to dst from src and flush a rowgroup per rowGroupNumCount read.
// It returns the total number of rows copied and the number of row groups written.
// Flush is called to create a new row group.
func CopyAsRowGroups(dst RowGroupWriter, src parquet.RowReader, rowGroupNumCount int) (total uint64, rowGroupCount uint64, err error) {
func CopyAsRowGroups(dst RowWriterFlusher, src parquet.RowReader, rowGroupNumCount int) (total uint64, rowGroupCount uint64, err error) {
if rowGroupNumCount <= 0 {
panic("rowGroupNumCount must be positive")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/parquet/row_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/stretchr/testify/require"
)

var _ RowGroupWriter = (*TestRowGroupWriter)(nil)
// Compile-time check that *TestRowGroupWriter implements RowWriterFlusher.
var _ RowWriterFlusher = (*TestRowGroupWriter)(nil)

type TestRowGroupWriter struct {
RowGroups [][]parquet.Row
Expand Down
Loading
Loading