Home Download Docs Code Community
     1	/*
     2	Copyright 2014 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	// Package memory registers the "memory" blobserver storage type, storing blobs
    18	// in an in-memory map.
    19	package memory // import "perkeep.org/pkg/blobserver/memory"
    20	
    21	import (
    22		"bytes"
    23		"context"
    24		"fmt"
    25		"io"
    26		"os"
    27		"sort"
    28		"sync"
    29		"sync/atomic"
    30	
    31		"perkeep.org/internal/lru"
    32		"perkeep.org/pkg/blob"
    33		"perkeep.org/pkg/blobserver"
    34	
    35		"go4.org/jsonconfig"
    36		"go4.org/types"
    37	)
    38	
    39	// Storage is an in-memory implementation of the blobserver Storage
    40	// interface. It also includes other convenience methods used by
    41	// tests.
    42	//
    43	// Its zero value is usable.
    44	type Storage struct {
    45		maxSize int64 // or zero if no limit
    46	
    47		mu   sync.RWMutex        // guards following 2 fields.
    48		m    map[blob.Ref][]byte // maps blob ref to its contents
    49		size int64               // sum of len(values(m))
    50	
    51		// lru is non-nil if we're in cache mode.
    52		// Else it maps blobref.String() to a nil value.
    53		lru *lru.Cache
    54	
    55		blobsFetched int64 // atomic
    56		bytesFetched int64 // atomic
    57	}
    58	
    59	var _ blobserver.BlobStreamer = (*Storage)(nil)
    60	
    61	func init() {
    62		blobserver.RegisterStorageConstructor("memory", blobserver.StorageConstructor(newFromConfig))
    63	}
    64	
    65	func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
    66		if err := config.Validate(); err != nil {
    67			return nil, err
    68		}
    69		return &Storage{}, nil
    70	}
    71	
    72	// NewCache returns a cache that won't store more than size bytes.
    73	// Blobs are evicted in LRU order.
    74	func NewCache(size int64) *Storage {
    75		return &Storage{
    76			maxSize: size,
    77			lru:     lru.New(0), // infinite items; we evict by size, not count
    78		}
    79	}
    80	
    81	func (s *Storage) Fetch(ctx context.Context, ref blob.Ref) (file io.ReadCloser, size uint32, err error) {
    82		s.mu.RLock()
    83		defer s.mu.RUnlock()
    84		if s.lru != nil {
    85			s.lru.Get(ref.String()) // force to head
    86		}
    87		if s.m == nil {
    88			err = os.ErrNotExist
    89			return
    90		}
    91		b, ok := s.m[ref]
    92		if !ok {
    93			err = os.ErrNotExist
    94			return
    95		}
    96		size = uint32(len(b))
    97		atomic.AddInt64(&s.blobsFetched, 1)
    98		atomic.AddInt64(&s.bytesFetched, int64(len(b)))
    99	
   100		return struct {
   101			*io.SectionReader
   102			io.Closer
   103		}{
   104			io.NewSectionReader(bytes.NewReader(b), 0, int64(size)),
   105			types.NopCloser,
   106		}, size, nil
   107	}
   108	
   109	func (s *Storage) SubFetch(ctx context.Context, ref blob.Ref, offset, length int64) (io.ReadCloser, error) {
   110		if offset < 0 || length < 0 {
   111			return nil, blob.ErrNegativeSubFetch
   112		}
   113		s.mu.RLock()
   114		defer s.mu.RUnlock()
   115		b, ok := s.m[ref]
   116		if !ok {
   117			return nil, os.ErrNotExist
   118		}
   119		if offset > int64(len(b)) {
   120			return nil, blob.ErrOutOfRangeOffsetSubFetch
   121		}
   122		atomic.AddInt64(&s.blobsFetched, 1)
   123		atomic.AddInt64(&s.bytesFetched, length)
   124	
   125		return struct {
   126			*io.SectionReader
   127			io.Closer
   128		}{
   129			io.NewSectionReader(bytes.NewReader(b), offset, int64(length)),
   130			types.NopCloser,
   131		}, nil
   132	}
   133	
   134	func (s *Storage) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (blob.SizedRef, error) {
   135		sb := blob.SizedRef{}
   136		h := br.Hash()
   137		if h == nil {
   138			return sb, fmt.Errorf("Unsupported blobref hash for %s", br)
   139		}
   140		all, err := io.ReadAll(io.TeeReader(source, h))
   141		if err != nil {
   142			return sb, err
   143		}
   144		if !br.HashMatches(h) {
   145			// This is a somewhat redundant check, since
   146			// blobserver.Receive now does it. But for testing code,
   147			// it's worth the cost.
   148			return sb, fmt.Errorf("Hash mismatch receiving blob %s", br)
   149		}
   150		s.mu.Lock()
   151		defer s.mu.Unlock()
   152		if s.m == nil {
   153			s.m = make(map[blob.Ref][]byte)
   154		}
   155		_, had := s.m[br]
   156		if !had {
   157			s.m[br] = all
   158			if s.lru != nil {
   159				s.lru.Add(br.String(), nil)
   160			}
   161			s.size += int64(len(all))
   162			for s.maxSize != 0 && s.size > s.maxSize {
   163				if key, _ := s.lru.RemoveOldest(); key != "" {
   164					s.removeBlobLocked(blob.MustParse(key))
   165				} else {
   166					break // shouldn't happen
   167				}
   168			}
   169		}
   170		return blob.SizedRef{Ref: br, Size: uint32(len(all))}, nil
   171	}
   172	
   173	func (s *Storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
   174		for _, br := range blobs {
   175			s.mu.RLock()
   176			b, ok := s.m[br]
   177			s.mu.RUnlock()
   178			if ok {
   179				if err := fn(blob.SizedRef{Ref: br, Size: uint32(len(b))}); err != nil {
   180					return err
   181				}
   182			}
   183		}
   184		return nil
   185	}
   186	
   187	func (s *Storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
   188		defer close(dest)
   189		s.mu.RLock()
   190		defer s.mu.RUnlock()
   191	
   192		// TODO(bradfitz): care about keeping this sorted like we used
   193		// to? I think it was more expensive than it was worth before,
   194		// since maintaining it was more costly than how often it was
   195		// used. But perhaps it'd make sense to maintain it lazily:
   196		// construct it on EnumerateBlobs but invalidate it everywhere
   197		// else.  Probably doesn't matter much.
   198		sorted := make([]blob.Ref, 0, len(s.m))
   199		for br := range s.m {
   200			sorted = append(sorted, br)
   201		}
   202		sort.Sort(blob.ByRef(sorted))
   203	
   204		n := 0
   205		for _, br := range sorted {
   206			if after != "" && br.String() <= after {
   207				continue
   208			}
   209			select {
   210			case dest <- blob.SizedRef{Ref: br, Size: uint32(len(s.m[br]))}:
   211			case <-ctx.Done():
   212				return ctx.Err()
   213			}
   214			n++
   215			if limit > 0 && n == limit {
   216				break
   217			}
   218		}
   219		return nil
   220	}
   221	
   222	func (s *Storage) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAndToken, contToken string) error {
   223		// for this impl, contToken is >= blobref.String()
   224		defer close(dest)
   225		s.mu.RLock()
   226		defer s.mu.RUnlock()
   227	
   228		sorted := make([]blob.Ref, 0, len(s.m))
   229		for br := range s.m {
   230			sorted = append(sorted, br)
   231		}
   232		sort.Sort(blob.ByRef(sorted))
   233	
   234		for _, br := range sorted {
   235			if br.String() < contToken {
   236				continue
   237			}
   238			contents := s.m[br]
   239			select {
   240			case <-ctx.Done():
   241				return ctx.Err()
   242			case dest <- blobserver.BlobAndToken{
   243				Blob: blob.NewBlob(br, uint32(len(contents)), func(ctx context.Context) ([]byte, error) {
   244					return contents, nil
   245				}),
   246				Token: br.String(),
   247			}:
   248			}
   249		}
   250		return nil
   251	}
   252	
   253	func (s *Storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
   254		s.mu.Lock()
   255		defer s.mu.Unlock()
   256		for _, br := range blobs {
   257			s.removeBlobLocked(br)
   258		}
   259		return nil
   260	}
   261	
   262	func (s *Storage) removeBlobLocked(br blob.Ref) {
   263		v, had := s.m[br]
   264		if !had {
   265			return
   266		}
   267		s.size -= int64(len(v))
   268		delete(s.m, br)
   269	}
   270	
   271	// TODO(mpl): remove or move BlobContents
   272	// See comment in https://camlistore-review.googlesource.com/#/c/3986/24/pkg/blobserver/localdisk/localdisk.go
   273	
   274	// BlobContents returns as a string the contents of the blob br.
   275	func (s *Storage) BlobContents(br blob.Ref) (contents string, ok bool) {
   276		s.mu.RLock()
   277		defer s.mu.RUnlock()
   278		b, ok := s.m[br]
   279		if !ok {
   280			return
   281		}
   282		return string(b), true
   283	}
   284	
   285	// NumBlobs returns the number of blobs stored in s.
   286	func (s *Storage) NumBlobs() int {
   287		s.mu.RLock()
   288		defer s.mu.RUnlock()
   289		return len(s.m)
   290	}
   291	
   292	// SumBlobSize returns the total size in bytes of all the blobs in s.
   293	func (s *Storage) SumBlobSize() int64 {
   294		s.mu.RLock()
   295		defer s.mu.RUnlock()
   296		return s.size
   297	}
   298	
   299	// BlobrefStrings returns the sorted stringified blobrefs stored in s.
   300	func (s *Storage) BlobrefStrings() []string {
   301		s.mu.RLock()
   302		defer s.mu.RUnlock()
   303		sorted := make([]string, 0, len(s.m))
   304		for br := range s.m {
   305			sorted = append(sorted, br.String())
   306		}
   307		sort.Strings(sorted)
   308		return sorted
   309	}
   310	
   311	// Stats returns the number of blobs and number of bytes that were fetched from s.
   312	func (s *Storage) Stats() (blobsFetched, bytesFetched int64) {
   313		return atomic.LoadInt64(&s.blobsFetched), atomic.LoadInt64(&s.bytesFetched)
   314	}
Website layout inspired by memcached.
Content by the authors.