Skip to content

Commit

Permalink
feat: Implement chunk support for collector (#270)
Browse files Browse the repository at this point in the history
Implement chunk support for collector. The garbage collector now deletes any ChunkRef and the underlying objects even when the parent BlobRef remains active.
  • Loading branch information
yuryu authored Jul 20, 2021
1 parent 01ea4ad commit 0b93c1e
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 18 deletions.
14 changes: 13 additions & 1 deletion deploy/terraform/gcp/datastore.tf
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,16 @@ resource "google_datastore_index" "blob_status_updated_at" {
name = "Timestamps.UpdatedAt"
direction = "ASCENDING"
}
}
}

resource "google_datastore_index" "chunk_status_updated_at" {
kind = "chunk"
properties {
name = "Status"
direction = "ASCENDING"
}
properties {
name = "Timestamps.UpdatedAt"
direction = "ASCENDING"
}
}
112 changes: 95 additions & 17 deletions internal/app/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/googleforgames/open-saves/internal/pkg/blob"
"github.com/googleforgames/open-saves/internal/pkg/cache"
"github.com/googleforgames/open-saves/internal/pkg/cache/redis"
"github.com/googleforgames/open-saves/internal/pkg/metadb"
"github.com/googleforgames/open-saves/internal/pkg/metadb/blobref"
"github.com/googleforgames/open-saves/internal/pkg/metadb/blobref/chunkref"
log "github.com/sirupsen/logrus"
"gocloud.dev/gcerrors"
"google.golang.org/api/iterator"
Expand Down Expand Up @@ -93,6 +95,77 @@ func (c *Collector) run(ctx context.Context) {
}
for _, s := range statuses {
c.deleteMatchingBlobRefs(ctx, s, c.cfg.Before)
c.deleteMatchingChunkRefs(ctx, s, c.cfg.Before)
}
}

func (c *Collector) deleteChunk(ctx context.Context, chunk *chunkref.ChunkRef) error {
if err := c.blob.Delete(ctx, chunk.ObjectPath()); err != nil {
if gcerrors.Code(err) != gcerrors.NotFound {
log.Errorf("Blob.Delete failed for chunkref key(%v): %v", chunk.Key, err)
if chunk.Status != blobref.StatusError {
chunk.Fail()
if err := c.metaDB.UpdateChunkRef(ctx, chunk); err != nil {
log.Errorf("MetaDB.UpdateChunkRef failed for key(%v): %v", chunk.Key, err)
}
}
return err
} else {
log.Warnf("Blob (%v) was not found.", chunk.ObjectPath())
}
}
return nil
}

func (c *Collector) deleteChildChunks(ctx context.Context, blobKey uuid.UUID) error {
cur := c.metaDB.GetChildChunkRefs(ctx, blobKey)
for {
chunk, err := cur.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Errorf("cursor.Next() returned error: %v", err)
return err
}
if err := c.deleteChunk(ctx, chunk); err != nil {
return err
}
}
return nil
}

func (c *Collector) markBlobFailed(ctx context.Context, blob *blobref.BlobRef) {
if blob.Status != blobref.StatusError {
blob.Fail()
_, err := c.metaDB.UpdateBlobRef(ctx, blob)
if err != nil {
log.Errorf("MetaDB.UpdateBlobRef failed for key(%v): %v", blob.Key, err)
}
}
}

func (c *Collector) deleteBlob(ctx context.Context, blob *blobref.BlobRef) {
if blob.Chunked {
if err := c.deleteChildChunks(ctx, blob.Key); err != nil {
c.markBlobFailed(ctx, blob)
return
}
} else {
if err := c.blob.Delete(ctx, blob.ObjectPath()); err != nil {
if gcerrors.Code(err) != gcerrors.NotFound {
log.Errorf("Blob.Delete failed for key(%v): %v", blob.Key, err)
c.markBlobFailed(ctx, blob)
return
} else {
log.Warnf("Blob (%v) was not found. Deleting BlobRef (%v) anyway.", blob.ObjectPath(), blob.Key)
}
}
}
if err := c.metaDB.DeleteBlobRef(ctx, blob.Key); err != nil {
log.Errorf("DeleteBlobRef failed for key(%v): %v", blob.Key, err)
} else {
log.Infof("Deleted BlobRef (%v), status = %v", blob.Key, blob.Status)
}
}

Expand All @@ -112,25 +185,30 @@ func (c *Collector) deleteMatchingBlobRefs(ctx context.Context, status blobref.S
log.Errorf("cursor.Next() returned error: %v", err)
break
}
if err := c.blob.Delete(ctx, blob.ObjectPath()); err != nil {
if gcerrors.Code(err) != gcerrors.NotFound {
log.Errorf("Blob.Delete failed for key(%v): %v", blob.Key, err)
if blob.Status != blobref.StatusError {
blob.Fail()
_, err := c.metaDB.UpdateBlobRef(ctx, blob)
if err != nil {
log.Errorf("MetaDB.UpdateBlobRef failed for key(%v): %v", blob.Key, err)
}
}
continue
} else {
log.Warnf("Blob (%v) was not found. Deleting BlobRef (%v) anyway.", blob.ObjectPath(), blob.Key)
}
c.deleteBlob(ctx, blob)
}
return nil
}

func (c *Collector) deleteMatchingChunkRefs(ctx context.Context, status blobref.Status, olderThan time.Time) error {
log.Infof("Garbage collecting ChunkRef objects with status = %v, older than %v", status, olderThan)
cursor := c.metaDB.ListChunkRefsByStatus(ctx, status, olderThan)
for {
chunk, err := cursor.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Errorf("cursor.Next() return error: %v", err)
return err
}
if err := c.metaDB.DeleteBlobRef(ctx, blob.Key); err != nil {
log.Errorf("DeleteBlobRef failed for key(%v): %v", blob.Key, err)
if err := c.deleteChunk(ctx, chunk); err != nil {
log.Errorf("deleteChunk failed for chunk (%v): %v", chunk.Key, err)
continue
}
if err := c.metaDB.DeleteChunkRef(ctx, chunk.BlobRef, chunk.Key); err != nil {
log.Errorf("DeleteChunkRef failed for chunk (%v): %v", chunk.Key, err)
}
log.Infof("Deleted BlobRef (%v), status = %v", blob.Key, blob.Status)
}
return nil
}
70 changes: 70 additions & 0 deletions internal/app/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"cloud.google.com/go/datastore"
"github.com/google/uuid"
"github.com/googleforgames/open-saves/internal/pkg/metadb/blobref"
"github.com/googleforgames/open-saves/internal/pkg/metadb/blobref/chunkref"
"github.com/googleforgames/open-saves/internal/pkg/metadb/record"
"github.com/googleforgames/open-saves/internal/pkg/metadb/store"
"github.com/stretchr/testify/assert"
Expand All @@ -39,6 +40,7 @@ const (
storeKind = "store"
recordKind = "record"
blobKind = "blob"
chunkKind = "chunk"
testTimeThreshold = -1 * time.Hour
)

Expand Down Expand Up @@ -113,6 +115,22 @@ func setupTestBlobRef(ctx context.Context, t *testing.T, ds *datastore.Client, b
})
}

func chunkRefKey(chunk *chunkref.ChunkRef) *datastore.Key {
return datastore.NameKey(chunkKind, chunk.Key.String(),
datastore.NameKey(blobKind, chunk.BlobRef.String(), nil))
}

func setupTestChunkRef(ctx context.Context, t *testing.T, collector *Collector, ds *datastore.Client, chunk *chunkref.ChunkRef) {
t.Helper()

if err := collector.metaDB.InsertChunkRef(ctx, chunk); err != nil {
t.Fatalf("InsertChunkRef failed: %v", err)
}
t.Cleanup(func() {
ds.Delete(ctx, chunkRefKey(chunk))
})
}

func setupExternalBlob(ctx context.Context, t *testing.T, collector *Collector, path string) {
t.Helper()
err := collector.blob.Put(ctx, path, []byte{})
Expand Down Expand Up @@ -206,3 +224,55 @@ func TestCollector_DeletesUnlinkedBlobRefs(t *testing.T) {
assert.Equal(t, gcerrors.NotFound, gcerrors.Code(err))
}
}

func TestCollector_DeletesChunkedBlobs(t *testing.T) {
ctx := context.Background()
collector := newTestCollector(ctx, t)
store := setupTestStore(ctx, t, collector)
record := setupTestRecord(ctx, t, collector, store.Key)
blob := blobref.NewChunkedBlobRef(store.Key, record.Key)
ds := newDatastoreClient(ctx, t)
setupTestBlobRef(ctx, t, ds, blob)

const numChunkRefs = 5
chunks := make([]*chunkref.ChunkRef, 0, numChunkRefs)

// 0 and 2 are old, to be deleted
// 1 and 3 have the applicable statuses but new
// 4 is still initializing
for i := 0; i < numChunkRefs; i++ {
chunk := chunkref.New(blob.Key, int32(i))
chunk.Timestamps.CreatedAt = collector.cfg.Before
chunk.Timestamps.UpdatedAt = collector.cfg.Before
chunks = append(chunks, chunk)
}
chunks[0].MarkForDeletion()
chunks[0].Timestamps.UpdatedAt = collector.cfg.Before.Add(-1 * time.Microsecond)
chunks[1].MarkForDeletion()
chunks[2].Fail()
chunks[2].Timestamps.UpdatedAt = collector.cfg.Before.Add(-1 * time.Microsecond)
chunks[3].Fail()

for _, c := range chunks {
setupTestChunkRef(ctx, t, collector, ds, c)
setupExternalBlob(ctx, t, collector, c.ObjectPath())
}
collector.run(ctx)

exists := []bool{false, true, false, true, true}
for i, e := range exists {
chunk := new(chunkref.ChunkRef)
err := ds.Get(ctx, chunkRefKey(chunks[i]), chunk)
if e {
assert.NoError(t, err)
} else {
assert.ErrorIs(t, datastore.ErrNoSuchEntity, err)
}
_, err = collector.blob.Get(ctx, chunks[i].ObjectPath())
if e {
assert.NoError(t, err)
} else {
assert.Equal(t, gcerrors.NotFound, gcerrors.Code(err))
}
}
}
51 changes: 51 additions & 0 deletions internal/pkg/metadb/blobref/chunkref/cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chunkref

import (
ds "cloud.google.com/go/datastore"
"github.com/googleforgames/open-saves/internal/pkg/metadb/blobref"
)

var (
ErrIteratorNil = blobref.ErrIteratorNil
)

// ChunkRefCursor is a database cursor for ChunkRef.
type ChunkRefCursor struct {
iter *ds.Iterator
}

func NewCursor(i *ds.Iterator) *ChunkRefCursor {
return &ChunkRefCursor{iter: i}
}

// Next advances the iterator and returns the next value.
// Returns nil and and iterator.Done at the end of the iterator.
// Returns ErrIteratorNil if the iterator is nil.
func (i *ChunkRefCursor) Next() (*ChunkRef, error) {
if i == nil {
return nil, ErrIteratorNil
}
if i.iter == nil {
return nil, ErrIteratorNil
}
var chunk ChunkRef
_, err := i.iter.Next(&chunk)
if err != nil {
return nil, err
}
return &chunk, nil
}
28 changes: 28 additions & 0 deletions internal/pkg/metadb/blobref/chunkref/cursor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chunkref

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestCursor_NextOnNil(t *testing.T) {
c := NewCursor(nil)
b, err := c.Next()
assert.Nil(t, b)
assert.ErrorIs(t, ErrIteratorNil, err)
}
33 changes: 33 additions & 0 deletions internal/pkg/metadb/metadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,24 @@ func (m *MetaDB) DeleteBlobRef(ctx context.Context, key uuid.UUID) error {
return datastoreErrToGRPCStatus(err)
}

// DeleteChunkRef deletes the ChunkRef object from the database.
// Returned errors:
// - NotFound: the chunkref object is not found.
// - FailedPrecondition: the chunkref status is Ready and can't be deleted.
func (m *MetaDB) DeleteChunkRef(ctx context.Context, blobKey, key uuid.UUID) error {
_, err := m.client.RunInTransaction(ctx, func(tx *ds.Transaction) error {
var chunk chunkref.ChunkRef
if err := tx.Get(m.createChunkRefKey(blobKey, key), &chunk); err != nil {
return err
}
if chunk.Status == blobref.StatusReady {
return status.Error(codes.FailedPrecondition, "chunk is currently marked as ready. mark it for deletion first")
}
return tx.Delete(m.createChunkRefKey(blobKey, key))
})
return datastoreErrToGRPCStatus(err)
}

// ListBlobRefsByStatus returns a cursor that iterates over BlobRefs
// where Status = status and UpdatedAt < olderThan.
func (m *MetaDB) ListBlobRefsByStatus(ctx context.Context, status blobref.Status, olderThan time.Time) (*blobref.BlobRefCursor, error) {
Expand All @@ -631,6 +649,21 @@ func (m *MetaDB) ListBlobRefsByStatus(ctx context.Context, status blobref.Status
return iter, nil
}

// ListChunkRefsByStatus returns a cursor that iterates over ChunkRefs
// where Status = status and UpdatedAt < olderThan.
func (m *MetaDB) ListChunkRefsByStatus(ctx context.Context, status blobref.Status, olderThan time.Time) *chunkref.ChunkRefCursor {
query := m.newQuery(chunkKind).Filter("Status = ", int(status)).
Filter("Timestamps.UpdatedAt <", olderThan)
return chunkref.NewCursor(m.client.Run(ctx, query))
}

// GetChildChunkRefs returns a ChunkRef cursor that iterats over child ChunkRef
// entries of the BlobRef specified by blobkey.
func (m *MetaDB) GetChildChunkRefs(ctx context.Context, blobKey uuid.UUID) *chunkref.ChunkRefCursor {
query := m.newQuery(chunkKind).Ancestor(m.createBlobKey(blobKey))
return chunkref.NewCursor(m.client.Run(ctx, query))
}

func addPropertyFilter(q *ds.Query, f *pb.QueryFilter) (*ds.Query, error) {
switch f.Operator {
case pb.FilterOperator_EQUAL:
Expand Down
Loading

0 comments on commit 0b93c1e

Please sign in to comment.