Home Download Docs Code Community
     1	/*
     2	Copyright 2014 The Perkeep Authors.
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	// Package leveldb provides an implementation of sorted.KeyValue
    18	// on top of a single mutable database file on disk using
    19	// github.com/syndtr/goleveldb.
    20	package leveldb // import "perkeep.org/pkg/sorted/leveldb"
    21	
    22	import (
    23		"errors"
    24		"fmt"
    25		"log"
    26		"os"
    27		"sync"
    28	
    29		"go4.org/jsonconfig"
    30		"perkeep.org/pkg/env"
    31		"perkeep.org/pkg/sorted"
    32	
    33		"github.com/syndtr/goleveldb/leveldb"
    34		"github.com/syndtr/goleveldb/leveldb/filter"
    35		"github.com/syndtr/goleveldb/leveldb/iterator"
    36		"github.com/syndtr/goleveldb/leveldb/opt"
    37		"github.com/syndtr/goleveldb/leveldb/util"
    38	)
    39	
// Compile-time assertion that *kvis satisfies sorted.Wiper.
var _ sorted.Wiper = (*kvis)(nil)
    41	
// init registers newKeyValueFromJSONConfig with the sorted package under
// the name "leveldb".
func init() {
	sorted.RegisterKeyValue("leveldb", newKeyValueFromJSONConfig)
}
    45	
    46	// NewStorage is a convenience that calls newKeyValueFromJSONConfig
    47	// with file as the leveldb storage directory.
    48	func NewStorage(file string) (sorted.KeyValue, error) {
    49		return newKeyValueFromJSONConfig(jsonconfig.Obj{"file": file})
    50	}
    51	
    52	// newKeyValueFromJSONConfig returns a KeyValue implementation on top of a
    53	// github.com/syndtr/goleveldb/leveldb file.
    54	func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
    55		file := cfg.RequiredString("file")
    56		if err := cfg.Validate(); err != nil {
    57			return nil, err
    58		}
    59		strictness := opt.DefaultStrict
    60		if env.IsDev() {
    61			// Be more strict in dev mode.
    62			strictness = opt.StrictAll
    63		}
    64		opts := &opt.Options{
    65			// The default is 10,
    66			// 8 means 2.126% or 1/47th disk check rate,
    67			// 10 means 0.812% error rate (1/2^(bits/1.44)) or 1/123th disk check rate,
    68			// 12 means 0.31% or 1/322th disk check rate.
    69			// TODO(tgulacsi): decide which number is the best here. Till that go with the default.
    70			Filter: filter.NewBloomFilter(10),
    71			Strict: strictness,
    72		}
    73		db, err := leveldb.OpenFile(file, opts)
    74		if err != nil {
    75			return nil, err
    76		}
    77		is := &kvis{
    78			db:       db,
    79			path:     file,
    80			opts:     opts,
    81			readOpts: &opt.ReadOptions{Strict: strictness},
    82			// On machine crash we want to reindex anyway, and
    83			// fsyncs may impose great performance penalty.
    84			writeOpts: &opt.WriteOptions{Sync: false},
    85		}
    86		return is, nil
    87	}
    88	
// kvis is the sorted.KeyValue implementation of this package, wrapping a
// single goleveldb database.
type kvis struct {
	// path is the location given to leveldb.OpenFile. Wipe removes it
	// and reopens a database there.
	path      string
	// db is the open database handle. Wipe replaces it.
	db        *leveldb.DB
	// opts are the options db was opened with; Wipe reuses them.
	opts      *opt.Options
	// readOpts is passed to every Get and NewIterator call.
	readOpts  *opt.ReadOptions
	// writeOpts is passed to every Put, Delete and Write call.
	writeOpts *opt.WriteOptions
}
    96	
    97	func (is *kvis) Get(key string) (string, error) {
    98		val, err := is.db.Get([]byte(key), is.readOpts)
    99		if err != nil {
   100			if err == leveldb.ErrNotFound {
   101				return "", sorted.ErrNotFound
   102			}
   103			return "", err
   104		}
   105		if val == nil {
   106			return "", sorted.ErrNotFound
   107		}
   108		return string(val), nil
   109	}
   110	
   111	func (is *kvis) Set(key, value string) error {
   112		if err := sorted.CheckSizes(key, value); err != nil {
   113			log.Printf("Skipping storing (%q:%q): %v", key, value, err)
   114			return nil
   115		}
   116		return is.db.Put([]byte(key), []byte(value), is.writeOpts)
   117	}
   118	
   119	func (is *kvis) Delete(key string) error {
   120		return is.db.Delete([]byte(key), is.writeOpts)
   121	}
   122	
   123	func (is *kvis) Find(start, end string) sorted.Iterator {
   124		var startB, endB []byte
   125		// A nil Range.Start is treated as a key before all keys in the DB.
   126		if start != "" {
   127			startB = []byte(start)
   128		}
   129		// A nil Range.Limit is treated as a key after all keys in the DB.
   130		if end != "" {
   131			endB = []byte(end)
   132		}
   133		it := &iter{
   134			it: is.db.NewIterator(
   135				&util.Range{Start: startB, Limit: endB},
   136				is.readOpts,
   137			),
   138		}
   139		return it
   140	}
   141	
   142	func (is *kvis) Wipe() error {
   143		// Close the already open DB.
   144		if err := is.db.Close(); err != nil {
   145			return err
   146		}
   147		if err := os.RemoveAll(is.path); err != nil {
   148			return err
   149		}
   150	
   151		db, err := leveldb.OpenFile(is.path, is.opts)
   152		if err != nil {
   153			return fmt.Errorf("error creating %s: %v", is.path, err)
   154		}
   155		is.db = db
   156		return nil
   157	}
   158	
   159	func (is *kvis) BeginBatch() sorted.BatchMutation {
   160		return &lvbatch{batch: new(leveldb.Batch)}
   161	}
   162	
// lvbatch is the sorted.BatchMutation handed out by BeginBatch. It
// accumulates mutations in a leveldb.Batch until CommitBatch writes them.
type lvbatch struct {
	// errMu is held by Set and CommitBatch while they read err.
	errMu sync.Mutex
	// err is intended to be set if one of the mutations had too large a
	// key or value. Sticky: once non-nil, Set becomes a no-op and
	// CommitBatch returns it.
	// NOTE(review): no code visible here ever assigns err; Set only logs
	// and skips oversized pairs — confirm whether this field is still used.
	err   error

	// batch holds the pending Put and Delete operations.
	batch *leveldb.Batch
}
   169	
   170	func (lvb *lvbatch) Set(key, value string) {
   171		lvb.errMu.Lock()
   172		defer lvb.errMu.Unlock()
   173		if lvb.err != nil {
   174			return
   175		}
   176		if err := sorted.CheckSizes(key, value); err != nil {
   177			log.Printf("Skipping storing (%q:%q): %v", key, value, err)
   178			return
   179		}
   180		lvb.batch.Put([]byte(key), []byte(value))
   181	}
   182	
   183	func (lvb *lvbatch) Delete(key string) {
   184		lvb.batch.Delete([]byte(key))
   185	}
   186	
   187	func (is *kvis) CommitBatch(bm sorted.BatchMutation) error {
   188		b, ok := bm.(*lvbatch)
   189		if !ok {
   190			return errors.New("invalid batch type")
   191		}
   192		b.errMu.Lock()
   193		defer b.errMu.Unlock()
   194		if b.err != nil {
   195			return b.err
   196		}
   197		return is.db.Write(b.batch, is.writeOpts)
   198	}
   199	
   200	func (is *kvis) Close() error {
   201		return is.db.Close()
   202	}
   203	
// iter is the sorted.Iterator returned by Find. It wraps a goleveldb
// iterator and caches the string form of the current key and value.
type iter struct {
	// it is the wrapped goleveldb iterator.
	it iterator.Iterator

	// skey and sval cache the string conversions of the current key and
	// value. Next resets both to nil.
	skey, sval *string // for caching string values

	// closed is not strictly necessary, but helps us detect programmer
	// errors like calling Next on a Closed iterator (which panics).
	closed bool
}
   213	
   214	func (it *iter) Close() error {
   215		it.closed = true
   216		it.it.Release()
   217		return it.it.Error()
   218	}
   219	
   220	func (it *iter) KeyBytes() []byte {
   221		return it.it.Key()
   222	}
   223	
   224	func (it *iter) Key() string {
   225		if it.skey != nil {
   226			return *it.skey
   227		}
   228		str := string(it.it.Key())
   229		it.skey = &str
   230		return str
   231	}
   232	
   233	func (it *iter) ValueBytes() []byte {
   234		return it.it.Value()
   235	}
   236	
   237	func (it *iter) Value() string {
   238		if it.sval != nil {
   239			return *it.sval
   240		}
   241		str := string(it.it.Value())
   242		it.sval = &str
   243		return str
   244	}
   245	
   246	func (it *iter) Next() bool {
   247		if it.closed {
   248			panic("Next called on closed iterator")
   249		}
   250		it.skey, it.sval = nil, nil
   251		return it.it.Next()
   252	}
Website layout inspired by memcached.
Content by the authors.