/*
Copyright 2014 The Perkeep Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package buffer provides a sorted.KeyValue implementation that
// buffers one KeyValue implementation in front of another. It's
// used for cases such as reindexing where you need a KeyValue but it
// doesn't need to be flushed and consistent until the end.
package buffer // import "perkeep.org/pkg/sorted/buffer"

import (
	"fmt"
	"log"
	"sync"

	"perkeep.org/pkg/sorted"
)

// New returns a sorted.KeyValue implementation that adds a Flush
// method to flush the buffer to the backing storage. A flush is also
// performed automatically once more than maxBufferBytes have been
// buffered. If maxBufferBytes <= 0, no automatic flushing is performed.
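//
// Illustrative usage sketch (an assumption, not part of the original
// source): an in-memory KeyValue is buffered in front of the real
// index and flushed once the bulk work is done. The backingKV variable
// and the 64 MB threshold are hypothetical; an in-memory implementation
// such as sorted.NewMemoryKeyValue is typically used as the buffer.
//
//	kv := buffer.New(sorted.NewMemoryKeyValue(), backingKV, 64<<20)
//	// ... many Set calls, e.g. while reindexing ...
//	if err := kv.Flush(); err != nil {
//		// handle the flush error
//	}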
func New(buffer, backing sorted.KeyValue, maxBufferBytes int64) *KeyValue {
	return &KeyValue{
		buf:       buffer,
		back:      backing,
		maxBuffer: maxBufferBytes,
	}
}

var _ sorted.KeyValue = (*KeyValue)(nil)

// KeyValue is a sorted.KeyValue that writes to an in-front buffer and
// flushes it to a backing KeyValue, either on demand via Flush or
// automatically once the buffer grows past maxBuffer bytes.
type KeyValue struct {
	buf, back sorted.KeyValue
	maxBuffer int64

	bufMu    sync.Mutex
	buffered int64

	// This read lock should be held during Set/Get/Delete/CommitBatch,
	// and the write lock should be held during Flush.
	mu sync.RWMutex
}

// Flush copies all buffered entries to the backing storage and, on
// success, deletes them from the buffer.
func (kv *KeyValue) Flush() error {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	var (
		bmback = kv.back.BeginBatch()
		bmbuf  = kv.buf.BeginBatch()
		commit = false
		it     = kv.buf.Find("", "")
	)
	for it.Next() {
		bmback.Set(it.Key(), it.Value())
		bmbuf.Delete(it.Key())
		commit = true
	}
	if err := it.Close(); err != nil {
		return err
	}
	if commit {
		if err := kv.back.CommitBatch(bmback); err != nil {
			return err
		}
		if err := kv.buf.CommitBatch(bmbuf); err != nil {
			return err
		}
		kv.bufMu.Lock()
		kv.buffered = 0
		kv.bufMu.Unlock()
	}
	return nil
}

func (kv *KeyValue) Get(key string) (string, error) {
	kv.mu.RLock()
	defer kv.mu.RUnlock()
	v, err := kv.buf.Get(key)
	switch err {
	case sorted.ErrNotFound:
		break
	case nil:
		return v, nil
	default:
		return "", err
	}
	return kv.back.Get(key)
}

func (kv *KeyValue) Set(key, value string) error {
	if err := sorted.CheckSizes(key, value); err != nil {
		log.Printf("Skipping storing (%q:%q): %v", key, value, err)
		return nil
	}
	kv.mu.RLock()
	err := kv.buf.Set(key, value)
	kv.mu.RUnlock()
	if err == nil {
		kv.bufMu.Lock()
		kv.buffered += int64(len(key) + len(value))
		doFlush := kv.buffered > kv.maxBuffer
		kv.bufMu.Unlock()
		if doFlush {
			err = kv.Flush()
		}
	}
	return err
}

func (kv *KeyValue) Delete(key string) error {
	kv.mu.RLock()
	defer kv.mu.RUnlock()
	// This isn't an ideal implementation, since it synchronously
	// deletes from the backing store. But deletes aren't really
	// used, so ignoring for now.
	// Could also use a syncutil.Group to do these in parallel,
	// but the buffer should be an in-memory implementation
	// anyway, so should be fast.
	err1 := kv.buf.Delete(key)
	err2 := kv.back.Delete(key)
	if err1 != nil {
		return err1
	}
	return err2
}

func (kv *KeyValue) BeginBatch() sorted.BatchMutation {
	return new(batch)
}

func (kv *KeyValue) CommitBatch(bm sorted.BatchMutation) error {
	kv.mu.RLock()
	defer kv.mu.RUnlock()
	b, ok := bm.(*batch)
	if !ok {
		return fmt.Errorf("unexpected BatchMutation type %T", bm)
	}
	var (
		// A batch mutation for applying this mutation to the buffer.
		bmbuf = kv.buf.BeginBatch()
		// A lazily created batch mutation for deleting from the backing
		// storage; this should be rare. (See Delete above.)
		bmback sorted.BatchMutation
	)
	for _, m := range b.mods {
		if m.isDelete {
			bmbuf.Delete(m.key)
			if bmback == nil {
				bmback = kv.back.BeginBatch()
			}
			bmback.Delete(m.key)
			continue
		}
		if err := sorted.CheckSizes(m.key, m.value); err != nil {
			log.Printf("Skipping storing (%q:%q): %v", m.key, m.value, err)
			continue
		}
		bmbuf.Set(m.key, m.value)
	}
	if err := kv.buf.CommitBatch(bmbuf); err != nil {
		return err
	}
	if bmback != nil {
		return kv.back.CommitBatch(bmback)
	}
	return nil
}
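
// Illustrative sketch (an assumption, not part of the original source):
// how a caller might apply a batch of mutations through the buffer. The
// keys and values here are hypothetical.
//
//	bm := kv.BeginBatch()
//	bm.Set("example/key", "value")
//	bm.Delete("example/stale-key")
//	if err := kv.CommitBatch(bm); err != nil {
//		// handle the commit error
//	}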

func (kv *KeyValue) Close() error {
	if err := kv.Flush(); err != nil {
		return err
	}
	return kv.back.Close()
}

func (kv *KeyValue) Find(start, end string) sorted.Iterator {
	// TODO(adg): hold read lock while iterating? seems complicated
	ibuf := kv.buf.Find(start, end)
	iback := kv.back.Find(start, end)
	return &iter{
		buf:  subIter{Iterator: ibuf},
		back: subIter{Iterator: iback},
	}
}
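
// Illustrative sketch (an assumption, not part of the original source):
// iterating the merged view of buffer and backing storage. When a key
// exists in both, the buffered value is the one returned.
//
//	it := kv.Find("", "")
//	for it.Next() {
//		fmt.Printf("%s = %s\n", it.Key(), it.Value())
//	}
//	if err := it.Close(); err != nil {
//		// handle the iterator error
//	}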

// batch collects mutations in memory until CommitBatch applies them.
type batch struct {
	mu   sync.Mutex
	mods []mod
}

// mod is a single buffered mutation: either a Set of key to value, or
// a Delete of key.
type mod struct {
	isDelete   bool
	key, value string
}

func (b *batch) Set(key, value string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.mods = append(b.mods, mod{key: key, value: value})
}

func (b *batch) Delete(key string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.mods = append(b.mods, mod{key: key, isDelete: true})
}

// iter merges the buffer and backing iterators into a single
// key-ordered view.
type iter struct {
	buf, back subIter
}

// current returns the sub-iterator positioned at the smaller key. When
// both are positioned at the same key, the buffer wins, so buffered
// values shadow the backing storage.
func (it *iter) current() *subIter {
	switch {
	case it.back.eof:
		return &it.buf
	case it.buf.eof:
		return &it.back
	case it.buf.key <= it.back.key:
		return &it.buf
	default:
		return &it.back
	}
}

func (it *iter) Next() bool {
	// Call Next on both iterators for the first time, if we haven't
	// already, so that the key comparisons below are valid.
	start := false
	if it.buf.key == "" && !it.buf.eof {
		start = it.buf.next()
	}
	if it.back.key == "" && !it.back.eof {
		start = it.back.next() || start
	}
	if start {
		// We started iterating with at least one value.
		return true
	}
	// Bail if both iterators are done.
	if it.buf.eof && it.back.eof {
		return false
	}
	// If one iterator is done, advance the other.
	if it.buf.eof {
		return it.back.next()
	}
	if it.back.eof {
		return it.buf.next()
	}
	// If both iterators are still going, advance the one that is
	// further behind, or both simultaneously if they point to the
	// same key.
	switch {
	case it.buf.key < it.back.key:
		it.buf.next()
	case it.buf.key > it.back.key:
		it.back.next()
	case it.buf.key == it.back.key:
		n1, n2 := it.buf.next(), it.back.next()
		if !n1 && !n2 {
			// Both finished simultaneously.
			return false
		}
	}
	return true
}

func (it *iter) Key() string {
	return it.current().key
}

func (it *iter) Value() string {
	return it.current().Value()
}

func (it *iter) KeyBytes() []byte {
	return it.current().KeyBytes()
}

func (it *iter) ValueBytes() []byte {
	return it.current().ValueBytes()
}

func (it *iter) Close() error {
	err1 := it.buf.Close()
	err2 := it.back.Close()
	if err1 != nil {
		return err1
	}
	return err2
}

// subIter is an iterator (either the backing storage or the buffer) that
// keeps track of the current key and whether it has reached EOF.
type subIter struct {
	sorted.Iterator
	key string
	eof bool
}

func (it *subIter) next() bool {
	if it.Next() {
		it.key = it.Key()
		return true
	}
	it.eof = true
	return false
}