diff --git a/pkg/iter/tree.go b/pkg/iter/tree.go
new file mode 100644
index 000000000..4f6695087
--- /dev/null
+++ b/pkg/iter/tree.go
@@ -0,0 +1,31 @@
+package iter
+
+import (
+	"github.com/grafana/phlare/pkg/util/loser"
+)
+
+var _ Iterator[interface{}] = &TreeIterator[interface{}]{}
+
+type TreeIterator[T any] struct {
+	*loser.Tree[T, Iterator[T]]
+}
+
+// NewTreeIterator returns an Iterator over the elements of the given loser tree.
+func NewTreeIterator[T any](tree *loser.Tree[T, Iterator[T]]) *TreeIterator[T] {
+	return &TreeIterator[T]{
+		Tree: tree,
+	}
+}
+
+func (it TreeIterator[T]) At() T {
+	return it.Tree.Winner().At()
+}
+
+func (it *TreeIterator[T]) Err() error {
+	return it.Tree.Winner().Err()
+}
+
+func (it *TreeIterator[T]) Close() error {
+	it.Tree.Close()
+	return nil
+}
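TreeIterator adapts a loser.Tree (a k-way merge structure) to the package's Iterator interface: Next comes from the embedded tree, while At and Err delegate to the current winner. Below is a minimal usage sketch, assuming only the Next/At/Err/Close contract visible above; sliceIterator is a hypothetical stand-in for any sorted input:

```go
package main

import (
	"fmt"
	"math"

	"github.com/grafana/phlare/pkg/iter"
	"github.com/grafana/phlare/pkg/util/loser"
)

// sliceIterator is a hypothetical sorted input satisfying iter.Iterator[int].
type sliceIterator struct {
	values []int
	cur    int
}

func (s *sliceIterator) Next() bool {
	if len(s.values) == 0 {
		return false
	}
	s.cur, s.values = s.values[0], s.values[1:]
	return true
}

func (s *sliceIterator) At() int      { return s.cur }
func (s *sliceIterator) Err() error   { return nil }
func (s *sliceIterator) Close() error { return nil }

func main() {
	its := []iter.Iterator[int]{
		&sliceIterator{values: []int{1, 4, 7}},
		&sliceIterator{values: []int{2, 5, 8}},
	}
	// The loser tree k-way merges the sorted inputs; TreeIterator then
	// exposes the winning element through the plain Iterator interface.
	merged := iter.NewTreeIterator(loser.New(
		its,
		math.MaxInt, // sentinel that sorts after every real value
		func(it iter.Iterator[int]) int { return it.At() },
		func(a, b int) bool { return a < b },
		func(it iter.Iterator[int]) { _ = it.Close() },
	))
	for merged.Next() {
		fmt.Println(merged.At()) // 1 2 4 5 7 8
	}
}
```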
diff --git a/pkg/parquet/row_reader.go b/pkg/parquet/row_reader.go
index 89a786f6a..80e0d2551 100644
--- a/pkg/parquet/row_reader.go
+++ b/pkg/parquet/row_reader.go
@@ -16,7 +16,7 @@ const (
 var (
 	_ parquet.RowReader = (*emptyRowReader)(nil)
 	_ parquet.RowReader = (*ErrRowReader)(nil)
-	_ parquet.RowReader = (*MergeRowReader)(nil)
+	_ parquet.RowReader = (*IteratorRowReader)(nil)
 	_ iter.Iterator[parquet.Row] = (*BufferedRowReaderIterator)(nil)
 
 	EmptyRowReader = &emptyRowReader{}
@@ -32,10 +32,6 @@ func NewErrRowReader(err error) *ErrRowReader { return &ErrRowReader{err: err} }
 
 func (e ErrRowReader) ReadRows(rows []parquet.Row) (int, error) { return 0, e.err }
 
-type MergeRowReader struct {
-	tree *loser.Tree[parquet.Row, iter.Iterator[parquet.Row]]
-}
-
 // NewMergeRowReader returns a RowReader that k-way merges the given readers using the less function.
 // Each reader must already be sorted according to the less function.
 func NewMergeRowReader(readers []parquet.RowReader, maxValue parquet.Row, less func(parquet.Row, parquet.Row) bool) parquet.RowReader {
@@ -50,18 +46,31 @@ func NewMergeRowReader(readers []parquet.RowReader, maxValue parquet.Row, less f
 		its[i] = NewBufferedRowReaderIterator(readers[i], defaultRowBufferSize)
 	}
 
-	return &MergeRowReader{
-		tree: loser.New(
-			its,
-			maxValue,
-			func(it iter.Iterator[parquet.Row]) parquet.Row { return it.At() },
-			less,
-			func(it iter.Iterator[parquet.Row]) { it.Close() },
+	return NewIteratorRowReader(
+		iter.NewTreeIterator[parquet.Row](
+			loser.New(
+				its,
+				maxValue,
+				func(it iter.Iterator[parquet.Row]) parquet.Row { return it.At() },
+				less,
+				func(it iter.Iterator[parquet.Row]) { _ = it.Close() },
+			),
 		),
+	)
+}
+
+type IteratorRowReader struct {
+	iter.Iterator[parquet.Row]
+}
+
+// NewIteratorRowReader returns a RowReader that reads rows from the given iterator.
+func NewIteratorRowReader(it iter.Iterator[parquet.Row]) *IteratorRowReader {
+	return &IteratorRowReader{
+		Iterator: it,
 	}
 }
 
-func (s *MergeRowReader) ReadRows(rows []parquet.Row) (int, error) {
+func (it *IteratorRowReader) ReadRows(rows []parquet.Row) (int, error) {
 	var n int
 	if len(rows) == 0 {
 		return 0, nil
@@ -70,11 +79,13 @@ func (s *MergeRowReader) ReadRows(rows []parquet.Row) (int, error) {
 		if n == len(rows) {
 			break
 		}
-		if !s.tree.Next() {
-			s.tree.Close()
+		if !it.Next() {
+			if err := it.Close(); err != nil {
+				return n, err
+			}
 			return n, io.EOF
 		}
-		rows[n] = s.tree.Winner().At()
+		rows[n] = it.At()
 		n++
 	}
 	return n, nil
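IteratorRowReader goes the other way, turning any iter.Iterator[parquet.Row] back into a parquet.RowReader. Its ReadRows follows the usual parquet-go contract: the final call can return n > 0 together with io.EOF, so callers must consume the returned rows before acting on the error. A sketch of a consuming loop under that assumption (drainRows is a hypothetical helper; ReadAllWithBufferSize in this package plays the same role):

```go
package sketch

import (
	"io"

	parquet "github.com/segmentio/parquet-go"
)

// drainRows reads everything from r. Rows are cloned because buffered
// readers may reuse their internal row buffers between calls.
func drainRows(r parquet.RowReader) ([]parquet.Row, error) {
	var out []parquet.Row
	buf := make([]parquet.Row, 64)
	for {
		n, err := r.ReadRows(buf)
		for _, row := range buf[:n] {
			out = append(out, row.Clone())
		}
		if err == io.EOF {
			return out, nil // io.EOF still carries the final n rows
		}
		if err != nil {
			return out, err
		}
	}
}
```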
diff --git a/pkg/parquet/row_reader_test.go b/pkg/parquet/row_reader_test.go
index 075d44bc7..48d84d5bc 100644
--- a/pkg/parquet/row_reader_test.go
+++ b/pkg/parquet/row_reader_test.go
@@ -141,3 +141,26 @@ func TestNewMergeRowReader(t *testing.T) {
 		})
 	}
 }
+
+func TestIteratorRowReader(t *testing.T) {
+	it := NewIteratorRowReader(
+		NewBufferedRowReaderIterator(NewBatchReader([][]parquet.Row{
+			{{parquet.Int32Value(1)}, {parquet.Int32Value(2)}, {parquet.Int32Value(3)}},
+			{{parquet.Int32Value(4)}, {parquet.Int32Value(5)}, {parquet.Int32Value(6)}},
+			{{parquet.Int32Value(7)}, {parquet.Int32Value(8)}, {parquet.Int32Value(9)}},
+		}), 4),
+	)
+	actual, err := ReadAllWithBufferSize(it, 3)
+	require.NoError(t, err)
+	require.Equal(t, []parquet.Row{
+		{parquet.Int32Value(1)},
+		{parquet.Int32Value(2)},
+		{parquet.Int32Value(3)},
+		{parquet.Int32Value(4)},
+		{parquet.Int32Value(5)},
+		{parquet.Int32Value(6)},
+		{parquet.Int32Value(7)},
+		{parquet.Int32Value(8)},
+		{parquet.Int32Value(9)},
+	}, actual)
+}
diff --git a/pkg/parquet/row_writer.go b/pkg/parquet/row_writer.go
index eebc76590..54bf3abbd 100644
--- a/pkg/parquet/row_writer.go
+++ b/pkg/parquet/row_writer.go
@@ -6,7 +6,7 @@ import (
 	"github.com/segmentio/parquet-go"
 )
 
-type RowGroupWriter interface {
+type RowWriterFlusher interface {
 	parquet.RowWriter
 	Flush() error
 }
@@ -14,7 +14,7 @@ type RowGroupWriter interface {
 // CopyAsRowGroups copies rows from src to dst, flushing a row group every rowGroupNumCount rows read.
 // It returns the total number of rows copied and the number of row groups written.
 // Flush is called to create a new row group.
-func CopyAsRowGroups(dst RowGroupWriter, src parquet.RowReader, rowGroupNumCount int) (total uint64, rowGroupCount uint64, err error) {
+func CopyAsRowGroups(dst RowWriterFlusher, src parquet.RowReader, rowGroupNumCount int) (total uint64, rowGroupCount uint64, err error) {
 	if rowGroupNumCount <= 0 {
 		panic("rowGroupNumCount must be positive")
 	}
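The rename from RowGroupWriter to RowWriterFlusher describes the contract more precisely: anything that can write rows and flush the accumulated ones as a row group. A minimal in-memory sketch in the spirit of the TestRowGroupWriter below (memWriter and its fields are illustrative, not part of the package):

```go
package sketch

import (
	phlareparquet "github.com/grafana/phlare/pkg/parquet"
	parquet "github.com/segmentio/parquet-go"
)

var _ phlareparquet.RowWriterFlusher = (*memWriter)(nil)

// memWriter collects rows in memory; Flush cuts the current batch into a
// new row group.
type memWriter struct {
	groups  [][]parquet.Row
	current []parquet.Row
}

func (m *memWriter) WriteRows(rows []parquet.Row) (int, error) {
	for _, r := range rows {
		m.current = append(m.current, r.Clone())
	}
	return len(rows), nil
}

func (m *memWriter) Flush() error {
	m.groups = append(m.groups, m.current)
	m.current = nil
	return nil
}
```

With such a writer, phlareparquet.CopyAsRowGroups(&memWriter{}, src, 1000) would cut roughly one row group per 1000 rows read from src.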
diff --git a/pkg/parquet/row_writer_test.go b/pkg/parquet/row_writer_test.go
index 8b6ddb52e..285cc6ab9 100644
--- a/pkg/parquet/row_writer_test.go
+++ b/pkg/parquet/row_writer_test.go
@@ -7,7 +7,7 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-var _ RowGroupWriter = (*TestRowGroupWriter)(nil)
+var _ RowWriterFlusher = (*TestRowGroupWriter)(nil)
 
 type TestRowGroupWriter struct {
 	RowGroups [][]parquet.Row
diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go
new file mode 100644
index 000000000..696659556
--- /dev/null
+++ b/pkg/phlaredb/compact.go
@@ -0,0 +1,301 @@
+package phlaredb
+
+import (
+	"context"
+	"math"
+	"os"
+	"path/filepath"
+
+	"github.com/pkg/errors"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/segmentio/parquet-go"
+
+	"github.com/grafana/phlare/pkg/iter"
+	phlaremodel "github.com/grafana/phlare/pkg/model"
+	phlareparquet "github.com/grafana/phlare/pkg/parquet"
+	"github.com/grafana/phlare/pkg/phlaredb/block"
+	schemav1 "github.com/grafana/phlare/pkg/phlaredb/schemas/v1"
+	"github.com/grafana/phlare/pkg/phlaredb/tsdb/index"
+	"github.com/grafana/phlare/pkg/util"
+	"github.com/grafana/phlare/pkg/util/loser"
+)
+
+type BlockReader interface {
+	Profiles() []parquet.RowGroup
+	Index() IndexReader
+	// Symbols() SymbolReader
+}
+
+type SymbolReader interface {
+	// todo
+}
+
+func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, error) {
+	meta := block.NewMeta()
+	blockPath := filepath.Join(dst, meta.ULID.String())
+	if err := os.MkdirAll(blockPath, 0o777); err != nil {
+		return block.Meta{}, err
+	}
+	indexPath := filepath.Join(blockPath, block.IndexFilename)
+	indexw, err := prepareIndexWriter(ctx, indexPath, src)
+	if err != nil {
+		return block.Meta{}, err
+	}
+	profilePath := filepath.Join(blockPath, (&schemav1.ProfilePersister{}).Name()+block.ParquetSuffix)
+	profileFile, err := os.OpenFile(profilePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644)
+	if err != nil {
+		return block.Meta{}, err
+	}
+	profileWriter := newProfileWriter(profileFile)
+
+	// todo: new symbdb
+
+	rowsIt := newMergeRowProfileIterator(src)
+	rowsIt = newSeriesRewriter(rowsIt, indexw)
+	rowsIt = newSymbolsRewriter(rowsIt)
+	reader := phlareparquet.NewIteratorRowReader(newRowsIterator(rowsIt))
+
+	// todo: size of row groups.
+	_, _, err = phlareparquet.CopyAsRowGroups(profileWriter, reader, 1024)
+	if err != nil {
+		return block.Meta{}, err
+	}
+
+	// flush the index file.
+	if err := indexw.Close(); err != nil {
+		return block.Meta{}, err
+	}
+
+	if err := profileWriter.Close(); err != nil {
+		return block.Meta{}, err
+	}
+	// todo: block meta
+	if _, err := meta.WriteToFile(util.Logger, blockPath); err != nil {
+		return block.Meta{}, err
+	}
+	return *meta, nil
+}
+
+type profileRow struct {
+	timeNanos int64
+
+	seriesRef uint32
+	labels    phlaremodel.Labels
+	fp        model.Fingerprint
+	row       schemav1.ProfileRow
+}
+
+type profileRowIterator struct {
+	profiles iter.Iterator[parquet.Row]
+	index    IndexReader
+	err      error
+
+	currentRow profileRow
+	chunks     []index.ChunkMeta
+}
+
+func newProfileRowIterator(profiles iter.Iterator[parquet.Row], idx IndexReader) *profileRowIterator {
+	return &profileRowIterator{
+		profiles: profiles,
+		index:    idx,
+		currentRow: profileRow{
+			seriesRef: math.MaxUint32,
+		},
+		chunks: make([]index.ChunkMeta, 1),
+	}
+}
+
+func (p *profileRowIterator) At() profileRow {
+	return p.currentRow
+}
+
+func (p *profileRowIterator) Next() bool {
+	if !p.profiles.Next() {
+		return false
+	}
+	p.currentRow.row = schemav1.ProfileRow(p.profiles.At())
+	seriesIndex := p.currentRow.row.SeriesIndex()
+	p.currentRow.timeNanos = p.currentRow.row.TimeNanos()
+	// do we have a new series?
+	if seriesIndex == p.currentRow.seriesRef {
+		return true
+	}
+	p.currentRow.seriesRef = seriesIndex
+	fp, err := p.index.Series(storage.SeriesRef(p.currentRow.seriesRef), &p.currentRow.labels, &p.chunks)
+	if err != nil {
+		p.err = err
+		return false
+	}
+	p.currentRow.fp = model.Fingerprint(fp)
+	return true
+}
+
+func (p *profileRowIterator) Err() error {
+	if p.err != nil {
+		return p.err
+	}
+	return p.profiles.Err()
+}
+
+func (p *profileRowIterator) Close() error {
+	return p.profiles.Close()
+}
+
+func newMergeRowProfileIterator(src []BlockReader) iter.Iterator[profileRow] {
+	its := make([]iter.Iterator[profileRow], len(src))
+	for i, s := range src {
+		// todo: maybe we could merge row groups in parallel, but that requires locking.
+		reader := parquet.MultiRowGroup(s.Profiles()...).Rows()
+		its[i] = newProfileRowIterator(
+			phlareparquet.NewBufferedRowReaderIterator(reader, 1024),
+			s.Index(),
+		)
+	}
+	return &dedupeProfileRowIterator{
+		Iterator: iter.NewTreeIterator(loser.New(
+			its,
+			profileRow{
+				timeNanos: math.MaxInt64,
+			},
+			func(it iter.Iterator[profileRow]) profileRow { return it.At() },
+			func(r1, r2 profileRow) bool {
+				// first handle the max profileRow sentinel, if it's either r1 or r2
+				if r1.timeNanos == math.MaxInt64 {
+					return false
+				}
+				if r2.timeNanos == math.MaxInt64 {
+					return true
+				}
+				// then handle normal profileRows
+				if cmp := phlaremodel.CompareLabelPairs(r1.labels, r2.labels); cmp != 0 {
+					return cmp < 0
+				}
+				return r1.timeNanos < r2.timeNanos
+			},
+			func(it iter.Iterator[profileRow]) { _ = it.Close() },
+		)),
+	}
+}
+
+type symbolsRewriter struct {
+	iter.Iterator[profileRow]
+}
+
+// todo: remap symbols & ingest symbols
+func newSymbolsRewriter(it iter.Iterator[profileRow]) *symbolsRewriter {
+	return &symbolsRewriter{
+		Iterator: it,
+	}
+}
+
+type seriesRewriter struct {
+	iter.Iterator[profileRow]
+
+	indexw *index.Writer
+
+	seriesRef        storage.SeriesRef
+	labels           phlaremodel.Labels
+	previousFp       model.Fingerprint
+	currentChunkMeta index.ChunkMeta
+	err              error
+}
+
+func newSeriesRewriter(it iter.Iterator[profileRow], indexw *index.Writer) *seriesRewriter {
+	return &seriesRewriter{
+		Iterator: it,
+		indexw:   indexw,
+	}
+}
+
+func (s *seriesRewriter) Next() bool {
+	if !s.Iterator.Next() {
+		if s.previousFp != 0 {
+			if err := s.indexw.AddSeries(s.seriesRef, s.labels, s.previousFp, s.currentChunkMeta); err != nil {
+				s.err = err
+				return false
+			}
+		}
+		return false
+	}
+	currentProfile := s.Iterator.At()
+	if s.previousFp != currentProfile.fp {
+		// store the previous series.
+		if s.previousFp != 0 {
+			if err := s.indexw.AddSeries(s.seriesRef, s.labels, s.previousFp, s.currentChunkMeta); err != nil {
+				s.err = err
+				return false
+			}
+		}
+		s.seriesRef++
+		s.labels = currentProfile.labels.Clone()
+		s.previousFp = currentProfile.fp
+		s.currentChunkMeta.MinTime = currentProfile.timeNanos
+	}
+	s.currentChunkMeta.MaxTime = currentProfile.timeNanos
+	currentProfile.row.SetSeriesIndex(uint32(s.seriesRef))
+	return true
+}
+
+type rowsIterator struct {
+	iter.Iterator[profileRow]
+}
+
+func newRowsIterator(it iter.Iterator[profileRow]) *rowsIterator {
+	return &rowsIterator{
+		Iterator: it,
+	}
+}
+
+func (r *rowsIterator) At() parquet.Row {
+	return parquet.Row(r.Iterator.At().row)
+}
+
+type dedupeProfileRowIterator struct {
+	iter.Iterator[profileRow]
+
+	prevFP        model.Fingerprint
+	prevTimeNanos int64
+}
+
+func (it *dedupeProfileRowIterator) Next() bool {
+	for {
+		if !it.Iterator.Next() {
+			return false
+		}
+		currentProfile := it.Iterator.At()
+		if it.prevFP == currentProfile.fp && it.prevTimeNanos == currentProfile.timeNanos {
+			// skip duplicate profile
+			continue
+		}
+		it.prevFP = currentProfile.fp
+		it.prevTimeNanos = currentProfile.timeNanos
+		return true
+	}
+}
+
+func prepareIndexWriter(ctx context.Context, path string, readers []BlockReader) (*index.Writer, error) {
+	var symbols index.StringIter
+	indexw, err := index.NewWriter(ctx, path)
+	if err != nil {
+		return nil, err
+	}
+	for i, r := range readers {
+		if i == 0 {
+			symbols = r.Index().Symbols()
+		}
+		symbols = tsdb.NewMergedStringIter(symbols, r.Index().Symbols())
+	}
+
+	for symbols.Next() {
+		if err := indexw.AddSymbol(symbols.At()); err != nil {
+			return nil, errors.Wrap(err, "add symbol")
+		}
+	}
+	if symbols.Err() != nil {
+		return nil, errors.Wrap(symbols.Err(), "next symbol")
+	}
+
+	return indexw, nil
+}
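Compact merges profiles from all source blocks in (labels, timestamp) order, so duplicates of the same profile (same series fingerprint and timestamp, as produced by replicated ingesters) arrive adjacent to each other, and dedupeProfileRowIterator can drop them with a single element of lookbehind. A self-contained sketch of that rule over plain values; the row type is a simplified stand-in for profileRow:

```go
package main

import "fmt"

// row is a simplified stand-in for profileRow: just the two fields the
// dedupe rule inspects.
type row struct {
	fp        uint64 // series fingerprint
	timeNanos int64
}

// dedupe keeps the first of each (fp, timeNanos) run in an already
// merge-sorted stream, mirroring dedupeProfileRowIterator.Next.
func dedupe(sorted []row) []row {
	out := make([]row, 0, len(sorted))
	var prev row
	for i, r := range sorted {
		if i > 0 && r == prev {
			continue // duplicate of the previous profile, skip it
		}
		prev = r
		out = append(out, r)
	}
	return out
}

func main() {
	in := []row{{1, 10}, {1, 10}, {1, 20}, {2, 10}}
	fmt.Println(dedupe(in)) // [{1 10} {1 20} {2 10}]
}
```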
diff --git a/pkg/phlaredb/profile_store.go b/pkg/phlaredb/profile_store.go
index 5d21f7590..6a0648b4e 100644
--- a/pkg/phlaredb/profile_store.go
+++ b/pkg/phlaredb/profile_store.go
@@ -63,6 +63,14 @@ type profileStore struct {
 	flushBufferLbs []phlaremodel.Labels
 }
 
+func newProfileWriter(writer io.Writer) *parquet.GenericWriter[*schemav1.Profile] {
+	return parquet.NewGenericWriter[*schemav1.Profile](writer, (&schemav1.ProfilePersister{}).Schema(),
+		parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "phlaredb-parquet-buffers*")),
+		parquet.CreatedBy("github.com/grafana/phlare/", build.Version, build.Revision),
+		parquet.PageBufferSize(3*1024*1024),
+	)
+}
+
 func newProfileStore(phlarectx context.Context) *profileStore {
 	s := &profileStore{
 		logger: phlarecontext.Logger(phlarectx),
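Extracting newProfileWriter lets the head (which initially writes to io.Discard, see the next hunk) and the compactor share a single writer configuration: same schema, the same file-backed column page buffers, and a 3 MiB page buffer. A hypothetical helper showing the constructor pointed at an arbitrary destination; writeProfiles is illustrative only, while GenericWriter.Write and Close are the standard parquet-go API:

```go
package phlaredb

import (
	"io"

	schemav1 "github.com/grafana/phlare/pkg/phlaredb/schemas/v1"
)

// writeProfiles is a hypothetical example: out could just as well be the
// compaction target file that Compact opens above.
func writeProfiles(out io.Writer, profiles []*schemav1.Profile) error {
	w := newProfileWriter(out)
	if _, err := w.Write(profiles); err != nil {
		return err
	}
	// Close flushes the last row group and writes the parquet footer.
	return w.Close()
}
```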
@@ -76,11 +84,7 @@ func newProfileStore(phlarectx context.Context) *profileStore {
 	go s.cutRowGroupLoop()
 	// Initialize writer on /dev/null
 	// TODO: Reuse parquet.Writer beyond lifetime of the head.
-	s.writer = parquet.NewGenericWriter[*schemav1.Profile](io.Discard, s.persister.Schema(),
-		parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "phlaredb-parquet-buffers*")),
-		parquet.CreatedBy("github.com/grafana/phlare/", build.Version, build.Revision),
-		parquet.PageBufferSize(3*1024*1024),
-	)
+	s.writer = newProfileWriter(io.Discard)
 
 	return s
 }
diff --git a/pkg/phlaredb/schemas/v1/profiles.go b/pkg/phlaredb/schemas/v1/profiles.go
index 7fbe2288b..e18cf45ec 100644
--- a/pkg/phlaredb/schemas/v1/profiles.go
+++ b/pkg/phlaredb/schemas/v1/profiles.go
@@ -458,3 +458,24 @@ func lessProfileRows(r1, r2 parquet.Row) bool {
 	}
 	return ts1 < ts2
 }
+
+type ProfileRow parquet.Row
+
+func (p ProfileRow) SeriesIndex() uint32 {
+	return p[seriesIndexColIndex].Uint32()
+}
+
+func (p ProfileRow) TimeNanos() int64 {
+	var ts int64
+	for i := len(p) - 1; i >= 0; i-- {
+		if p[i].Column() == timeNanoColIndex {
+			ts = p[i].Int64()
+			break
+		}
+	}
+	return ts
+}
+
+func (p ProfileRow) SetSeriesIndex(v uint32) {
+	p[seriesIndexColIndex] = parquet.Int32Value(int32(v)).Level(0, 0, seriesIndexColIndex)
+}
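ProfileRow reads values directly out of the flat parquet.Row representation. Every parquet.Value carries its column index: SeriesIndex can index the slice directly because the series index sits at a fixed position, while TimeNanos has to scan, since repeated sample columns make its slice position variable; scanning backwards is cheap because that column lies near the end of the row. A standalone sketch of the same probing technique; column numbers 0 and 1 are illustrative, not phlare's real seriesIndexColIndex/timeNanoColIndex:

```go
package main

import (
	"fmt"

	parquet "github.com/segmentio/parquet-go"
)

// columnInt64 locates the value of the given column in a flat row. Each
// parquet.Value remembers its column index, so a backwards scan finds
// columns that sit near the end of the row quickly.
func columnInt64(row parquet.Row, col int) (int64, bool) {
	for i := len(row) - 1; i >= 0; i-- {
		if row[i].Column() == col {
			return row[i].Int64(), true
		}
	}
	return 0, false
}

func main() {
	// Level(repetitionLevel, definitionLevel, columnIndex) stamps the column
	// index onto each value, as SetSeriesIndex does above.
	row := parquet.Row{
		parquet.Int32Value(7).Level(0, 0, 0),
		parquet.Int64Value(1234).Level(0, 0, 1),
	}
	ts, ok := columnInt64(row, 1)
	fmt.Println(ts, ok) // 1234 true
}
```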