Home Download Docs Code Community
     1	/*
     2	Copyright 2013 The Perkeep Authors.
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	// Package kvfile provides an implementation of sorted.KeyValue
    18	// on top of a single mutable database file on disk using
    19	// modernc.org/kv.
    20	package kvfile // import "perkeep.org/pkg/sorted/kvfile"
    21	
    22	import (
    23		"bytes"
    24		"errors"
    25		"fmt"
    26		"io"
    27		"log"
    28		"os"
    29		"sync"
    30	
    31		"go4.org/jsonconfig"
    32		"modernc.org/kv"
    33		"perkeep.org/pkg/kvutil"
    34		"perkeep.org/pkg/sorted"
    35	)
    36	
// Compile-time assertion that *kvis implements sorted.Wiper.
var _ sorted.Wiper = (*kvis)(nil)
    38	
// init registers newKeyValueFromJSONConfig with the sorted package
// under the name "kv".
func init() {
	sorted.RegisterKeyValue("kv", newKeyValueFromJSONConfig)
}
    42	
    43	// NewStorage is a convenience that calls newKeyValueFromJSONConfig
    44	// with file as the kv storage file.
    45	func NewStorage(file string) (sorted.KeyValue, error) {
    46		return newKeyValueFromJSONConfig(jsonconfig.Obj{"file": file})
    47	}
    48	
    49	// newKeyValueFromJSONConfig returns a KeyValue implementation on top of a
    50	// modernc.org/kv file.
    51	func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
    52		file := cfg.RequiredString("file")
    53		if err := cfg.Validate(); err != nil {
    54			return nil, err
    55		}
    56		opts := &kv.Options{}
    57		db, err := kvutil.Open(file, opts)
    58		if err != nil {
    59			return nil, err
    60		}
    61		is := &kvis{
    62			db:   db,
    63			opts: opts,
    64			path: file,
    65		}
    66		return is, nil
    67	}
    68	
// kvis is the sorted.KeyValue implementation backed by a single
// modernc.org/kv database file.
type kvis struct {
	path string      // database file path; Wipe removes and recreates this file
	db   *kv.DB      // open database handle; replaced by Wipe
	opts *kv.Options // options the database was opened with; reused by Wipe
	txmu sync.Mutex  // held by CommitBatch for the duration of a batch commit
}
    75	
    76	// TODO: use bytepool package.
    77	func getBuf(n int) []byte { return make([]byte, n) }
    78	func putBuf([]byte)       {}
    79	
    80	func (is *kvis) Get(key string) (string, error) {
    81		buf := getBuf(200)
    82		defer putBuf(buf)
    83		val, err := is.db.Get(buf, []byte(key))
    84		if err != nil {
    85			return "", err
    86		}
    87		if val == nil {
    88			return "", sorted.ErrNotFound
    89		}
    90		return string(val), nil
    91	}
    92	
    93	func (is *kvis) Set(key, value string) error {
    94		if err := sorted.CheckSizes(key, value); err != nil {
    95			log.Printf("Skipping storing (%q:%q): %v", key, value, err)
    96			return nil
    97		}
    98		return is.db.Set([]byte(key), []byte(value))
    99	}
   100	
   101	func (is *kvis) Delete(key string) error {
   102		return is.db.Delete([]byte(key))
   103	}
   104	
   105	func (is *kvis) Find(start, end string) sorted.Iterator {
   106		it := &iter{
   107			db:       is.db,
   108			startKey: start,
   109			endKey:   []byte(end),
   110		}
   111		it.enum, _, it.err = it.db.Seek([]byte(start))
   112		return it
   113	}
   114	
   115	func (is *kvis) BeginBatch() sorted.BatchMutation {
   116		return sorted.NewBatchMutation()
   117	}
   118	
   119	func (is *kvis) Wipe() error {
   120		// Unlock the already open DB.
   121		if err := is.db.Close(); err != nil {
   122			return err
   123		}
   124		if err := os.Remove(is.path); err != nil {
   125			return err
   126		}
   127	
   128		db, err := kv.Create(is.path, is.opts)
   129		if err != nil {
   130			return fmt.Errorf("error creating %s: %v", is.path, err)
   131		}
   132		is.db = db
   133		return nil
   134	}
   135	
// batch is the view of a sorted.BatchMutation that CommitBatch needs:
// access to the list of mutations to apply.
type batch interface {
	Mutations() []sorted.Mutation
}
   139	
   140	func (is *kvis) CommitBatch(bm sorted.BatchMutation) error {
   141		b, ok := bm.(batch)
   142		if !ok {
   143			return errors.New("invalid batch type")
   144		}
   145		is.txmu.Lock()
   146		defer is.txmu.Unlock()
   147	
   148		good := false
   149		defer func() {
   150			if !good {
   151				is.db.Rollback()
   152			}
   153		}()
   154	
   155		if err := is.db.BeginTransaction(); err != nil {
   156			return err
   157		}
   158		for _, m := range b.Mutations() {
   159			if m.IsDelete() {
   160				if err := is.db.Delete([]byte(m.Key())); err != nil {
   161					return err
   162				}
   163			} else {
   164				if err := sorted.CheckSizes(m.Key(), m.Value()); err != nil {
   165					log.Printf("Skipping storing (%q:%q): %v", m.Key(), m.Value(), err)
   166					continue
   167				}
   168				if err := is.db.Set([]byte(m.Key()), []byte(m.Value())); err != nil {
   169					return err
   170				}
   171			}
   172		}
   173	
   174		good = true
   175		return is.db.Commit()
   176	}
   177	
   178	func (is *kvis) Close() error {
   179		log.Printf("Closing kvfile database %s", is.path)
   180		return is.db.Close()
   181	}
   182	
   183	type iter struct {
   184		db       *kv.DB
   185		startKey string
   186		endKey   []byte
   187	
   188		enum *kv.Enumerator
   189	
   190		valid      bool
   191		key, val   []byte
   192		skey, sval *string // non-nil if valid
   193	
   194		err    error
   195		closed bool
   196	}
   197	
   198	func (it *iter) Close() error {
   199		it.closed = true
   200		return it.err
   201	}
   202	
   203	func (it *iter) KeyBytes() []byte {
   204		if !it.valid {
   205			panic("not valid")
   206		}
   207		return it.key
   208	}
   209	
   210	func (it *iter) Key() string {
   211		if !it.valid {
   212			panic("not valid")
   213		}
   214		if it.skey != nil {
   215			return *it.skey
   216		}
   217		str := string(it.key)
   218		it.skey = &str
   219		return str
   220	}
   221	
   222	func (it *iter) ValueBytes() []byte {
   223		if !it.valid {
   224			panic("not valid")
   225		}
   226		return it.val
   227	}
   228	
   229	func (it *iter) Value() string {
   230		if !it.valid {
   231			panic("not valid")
   232		}
   233		if it.sval != nil {
   234			return *it.sval
   235		}
   236		str := string(it.val)
   237		it.sval = &str
   238		return str
   239	}
   240	
   241	func (it *iter) end() bool {
   242		it.valid = false
   243		it.closed = true
   244		return false
   245	}
   246	
   247	func (it *iter) Next() bool {
   248		if it.err != nil {
   249			return false
   250		}
   251		if it.closed {
   252			panic("Next called after Next returned value")
   253		}
   254		it.skey, it.sval = nil, nil
   255		var err error
   256		it.key, it.val, err = it.enum.Next()
   257		if err == io.EOF {
   258			it.err = nil
   259			return it.end()
   260		}
   261		if err != nil {
   262			it.err = err
   263			return it.end()
   264		}
   265		if len(it.endKey) > 0 && bytes.Compare(it.key, it.endKey) >= 0 {
   266			return it.end()
   267		}
   268		it.valid = true
   269		return true
   270	}
Website layout inspired by memcached.
Content by the authors.