Skip to content

Commit

Permalink
Add image blobs cache (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Oct 5, 2022
1 parent 3453e40 commit e92cc8c
Show file tree
Hide file tree
Showing 12 changed files with 419 additions and 150 deletions.
88 changes: 88 additions & 0 deletions blobscache/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package blobscache

import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"time"

json "github.com/json-iterator/go"
)

var (
ErrCacheNotFound = errors.New("cache not found")
)

type Client interface {
PutBlob(ctx context.Context, key string, blob []byte) error
GetBlob(ctx context.Context, key string) ([]byte, error)
}

func NewRemoteBlobsCache(url string) Client {
return &remoteBlobsCache{
url: url,
client: &http.Client{
Timeout: 10 * time.Second,
},
}
}

type remoteBlobsCache struct {
url string
client *http.Client
}

func (c *remoteBlobsCache) PutBlob(ctx context.Context, key string, blob []byte) error {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&PubBlobRequest{
Key: key,
Blob: blob,
}); err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/PutBlob", c.url), &buf)
if err != nil {
return err
}
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if st := resp.StatusCode; st != http.StatusOK {
return fmt.Errorf("put blob failed, response status=%d", st)
}
return nil
}

func (c *remoteBlobsCache) GetBlob(ctx context.Context, key string) ([]byte, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(&GetBlobRequest{
Key: key,
}); err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/GetBlob", c.url), &buf)
if err != nil {
return nil, err
}
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if st := resp.StatusCode; st != http.StatusOK {
if st == http.StatusNotFound {
return nil, ErrCacheNotFound
}
return nil, fmt.Errorf("put blob failed, response status=%d", st)
}

var res GetBlobResponse
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
return nil, err
}
return res.Blob, nil
}
137 changes: 137 additions & 0 deletions blobscache/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package blobscache

import (
"context"
"errors"
"fmt"
"net"
"net/http"
"time"

lru "github.com/hashicorp/golang-lru"
json "github.com/json-iterator/go"
"github.com/sirupsen/logrus"
)

type ServerConfig struct {
ServePort int
}

func NewBlobsCacheServer(log logrus.FieldLogger, cfg ServerConfig) *Server {
return &Server{
log: log,
cfg: cfg,
blobsCache: newMemoryBlobsCacheStore(log),
}
}

type Server struct {
log logrus.FieldLogger
cfg ServerConfig
blobsCache blobsCacheStore
listener net.Listener
}

func (s *Server) Start(ctx context.Context) {
mux := http.NewServeMux()
mux.HandleFunc("/PutBlob", s.putBlob)
mux.HandleFunc("/GetBlob", s.getBlob)

srv := &http.Server{
Addr: fmt.Sprintf(":%d", s.cfg.ServePort),
Handler: mux,
WriteTimeout: 5 * time.Second,
ReadTimeout: 5 * time.Second,
}

go func() {
<-ctx.Done()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
s.log.Errorf("blobs cache server shutdown: %v", err)
}
}()

if s.listener == nil {
var err error
s.listener, err = net.Listen("tcp", srv.Addr)
if err != nil {
s.log.Errorf("creating blobs cache server listener: %v", err)
return
}
}

if err := srv.Serve(s.listener); err != nil && errors.Is(err, http.ErrServerClosed) {
s.log.Errorf("serving blobs cache server: %v", err)
}
}

func (s *Server) putBlob(w http.ResponseWriter, r *http.Request) {
var req PubBlobRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
s.log.Errorf("decoding put blob request: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

s.blobsCache.putBlob(req.Key, req.Blob)

w.WriteHeader(http.StatusOK)
}

func (s *Server) getBlob(w http.ResponseWriter, r *http.Request) {
var req GetBlobRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
s.log.Errorf("decoding get blob request: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

blob, found := s.blobsCache.getBlob(req.Key)
if !found {
w.WriteHeader(http.StatusNotFound)
return
}

resp := GetBlobResponse{Blob: blob}
if err := json.NewEncoder(w).Encode(resp); err != nil {
s.log.Errorf("encoding get blob response: %v", err)
w.WriteHeader(http.StatusInternalServerError)
}
}

type blobsCacheStore interface {
putBlob(key string, blob []byte)
getBlob(key string) ([]byte, bool)
}

func newMemoryBlobsCacheStore(log logrus.FieldLogger) blobsCacheStore {
// One large blob json size is around 16KB, so we should use max 32MB of extra memory.
cache, _ := lru.New(2000)
return &memoryBlobsCacheStore{
log: log,
cache: cache,
}
}

type memoryBlobsCacheStore struct {
log logrus.FieldLogger
cache *lru.Cache
}

func (c *memoryBlobsCacheStore) putBlob(key string, blob []byte) {
c.log.Debugf("adding image blob to cache, current cache size=%d", c.cache.Len())
evicted := c.cache.Add(key, blob)
if evicted {
c.log.Info("evicted old image blob cache entry")
}
}

func (c *memoryBlobsCacheStore) getBlob(key string) ([]byte, bool) {
val, ok := c.cache.Get(key)
if !ok {
return nil, false
}
return val.([]byte), true
}
48 changes: 48 additions & 0 deletions blobscache/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package blobscache

import (
"context"
"errors"
"fmt"
"net"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)

func TestBlobsCacheServer(t *testing.T) {
r := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

log := logrus.New()
log.SetLevel(logrus.DebugLevel)

srv := NewBlobsCacheServer(log, ServerConfig{})
listener, err := newLocalListener()
r.NoError(err)
srv.listener = listener
go srv.Start(ctx)

client := NewRemoteBlobsCache(fmt.Sprintf("http://%s", listener.Addr().String()))

// Wait unti server is ready
r.Eventually(func() bool {
_, err := client.GetBlob(ctx, "noop")
return errors.Is(err, ErrCacheNotFound)
}, 3*time.Second, 10*time.Millisecond)

blob := []byte(`{"some": "json"}`)
err = client.PutBlob(ctx, "b1", blob)
r.NoError(err)

addedBlob, err := client.GetBlob(ctx, "b1")
r.NoError(err)
r.Equal(blob, addedBlob)
}

func newLocalListener() (net.Listener, error) {
return net.Listen("tcp", "127.0.0.1:0")
}
16 changes: 16 additions & 0 deletions blobscache/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package blobscache

import json "github.com/json-iterator/go"

type PubBlobRequest struct {
Key string `json:"key"`
Blob json.RawMessage `json:"blob"`
}

type GetBlobRequest struct {
Key string `json:"key"`
}

type GetBlobResponse struct {
Blob json.RawMessage `json:"blob"`
}
Loading

0 comments on commit e92cc8c

Please sign in to comment.