Home Download Docs Code Community
     1	/*
     2	Copyright 2013 The Perkeep Authors.
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	// Package mongo provides an implementation of sorted.KeyValue
    18	// using MongoDB.
    19	package mongo // import "perkeep.org/pkg/sorted/mongo"
    20	
    21	import (
    22		"bytes"
    23		"errors"
    24		"log"
    25		"sync"
    26		"time"
    27	
    28		"go4.org/jsonconfig"
    29		"perkeep.org/pkg/sorted"
    30	
    31		"gopkg.in/mgo.v2"
    32		"gopkg.in/mgo.v2/bson"
    33	)
    34	
// We explicitly separate the key and the value in a document,
// instead of simply storing as key:value, to avoid problems
// such as "." being an illegal char in a key name. Also because
// there is no way to do partial matching for key names (one can
// only check for their existence with bson.M{$exists: true}).
const (
	// CollectionName is the name of the MongoDB collection (the
	// equivalent of an SQL table) in which the documents are stored.
	CollectionName = "keys" // MongoDB collection, equiv. to SQL table
	// mgoKey is the document field under which the key is stored.
	mgoKey         = "k"
	// mgoValue is the document field under which the value is stored.
	mgoValue       = "v"
)
    45	
// init registers newKeyValueFromJSONConfig with the sorted package
// under the type name "mongo".
func init() {
	sorted.RegisterKeyValue("mongo", newKeyValueFromJSONConfig)
}
    49	
    50	func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
    51		ins := &instance{
    52			server:   cfg.OptionalString("host", "localhost"),
    53			database: cfg.RequiredString("database"),
    54			user:     cfg.OptionalString("user", ""),
    55			password: cfg.OptionalString("password", ""),
    56		}
    57		if err := cfg.Validate(); err != nil {
    58			return nil, err
    59		}
    60		db, err := ins.getCollection()
    61		if err != nil {
    62			return nil, err
    63		}
    64		return &keyValue{db: db, session: ins.session}, nil
    65	}
    66	
// iter is the sorted.Iterator returned by keyValue.Find. It wraps an
// mgo.Iter and decodes each document into res.
type iter struct {
	res bson.M // document most recently decoded by Next
	*mgo.Iter
	end []byte // if non-empty, Next reports false at the first key >= end
}
    73	
    74	func (it *iter) Next() bool {
    75		if !it.Iter.Next(&it.res) {
    76			return false
    77		}
    78		if len(it.end) > 0 && bytes.Compare(it.KeyBytes(), it.end) >= 0 {
    79			return false
    80		}
    81		return true
    82	}
    83	
    84	func (it *iter) Key() string {
    85		key, ok := (it.res[mgoKey]).(string)
    86		if !ok {
    87			return ""
    88		}
    89		return key
    90	}
    91	
// KeyBytes returns the current key as a byte slice. The slice is a fresh
// conversion of the string returned by Key.
func (it *iter) KeyBytes() []byte {
	// TODO(bradfitz,mpl): this is less efficient than the string way. we should
	// do better here, somehow, like all the other KeyValue iterators.
	// For now:
	return []byte(it.Key())
}
    98	
    99	func (it *iter) Value() string {
   100		value, ok := (it.res[mgoValue]).(string)
   101		if !ok {
   102			return ""
   103		}
   104		return value
   105	}
   106	
// ValueBytes returns the current value as a byte slice. The slice is a fresh
// conversion of the string returned by Value.
func (it *iter) ValueBytes() []byte {
	// TODO(bradfitz,mpl): this is less efficient than the string way. we should
	// do better here, somehow, like all the other KeyValue iterators.
	// For now:
	return []byte(it.Value())
}
   113	
// Close closes the underlying mgo iterator and returns the error it reports.
func (it *iter) Close() error {
	return it.Iter.Close()
}
   117	
// keyValue is the sorted.KeyValue implementation backed by a single
// MongoDB collection.
type keyValue struct {
	session *mgo.Session // so we can close it
	mu      sync.Mutex   // guards db
	db      *mgo.Collection
}
   124	
   125	func (kv *keyValue) Get(key string) (string, error) {
   126		kv.mu.Lock()
   127		defer kv.mu.Unlock()
   128		res := bson.M{}
   129		q := kv.db.Find(&bson.M{mgoKey: key})
   130		err := q.One(&res)
   131		if err != nil {
   132			if err == mgo.ErrNotFound {
   133				return "", sorted.ErrNotFound
   134			}
   135			return "", err
   136		}
   137		return res[mgoValue].(string), err
   138	}
   139	
   140	func (kv *keyValue) Find(start, end string) sorted.Iterator {
   141		kv.mu.Lock()
   142		defer kv.mu.Unlock()
   143		it := kv.db.Find(&bson.M{mgoKey: &bson.M{"$gte": start}}).Sort(mgoKey).Iter()
   144		return &iter{res: bson.M{}, Iter: it, end: []byte(end)}
   145	}
   146	
   147	func (kv *keyValue) Set(key, value string) error {
   148		if err := sorted.CheckSizes(key, value); err != nil {
   149			log.Printf("Skipping storing (%q:%q): %v", key, value, err)
   150			return nil
   151		}
   152		kv.mu.Lock()
   153		defer kv.mu.Unlock()
   154		_, err := kv.db.Upsert(&bson.M{mgoKey: key}, &bson.M{mgoKey: key, mgoValue: value})
   155		return err
   156	}
   157	
   158	// Delete removes the document with the matching key.
   159	func (kv *keyValue) Delete(key string) error {
   160		kv.mu.Lock()
   161		defer kv.mu.Unlock()
   162		err := kv.db.Remove(&bson.M{mgoKey: key})
   163		if err == mgo.ErrNotFound {
   164			return nil
   165		}
   166		return err
   167	}
   168	
   169	// Wipe removes all documents from the collection.
   170	func (kv *keyValue) Wipe() error {
   171		kv.mu.Lock()
   172		defer kv.mu.Unlock()
   173		_, err := kv.db.RemoveAll(nil)
   174		return err
   175	}
   176	
// batch is the view of a sorted.BatchMutation that CommitBatch needs:
// access to the list of queued mutations.
type batch interface {
	Mutations() []sorted.Mutation
}
   180	
// BeginBatch returns a new batch mutation created by
// sorted.NewBatchMutation, to be applied later with CommitBatch.
func (kv *keyValue) BeginBatch() sorted.BatchMutation {
	return sorted.NewBatchMutation()
}
   184	
   185	func (kv *keyValue) CommitBatch(bm sorted.BatchMutation) error {
   186		b, ok := bm.(batch)
   187		if !ok {
   188			return errors.New("invalid batch type")
   189		}
   190	
   191		kv.mu.Lock()
   192		defer kv.mu.Unlock()
   193		for _, m := range b.Mutations() {
   194			if m.IsDelete() {
   195				if err := kv.db.Remove(bson.M{mgoKey: m.Key()}); err != nil && err != mgo.ErrNotFound {
   196					return err
   197				}
   198			} else {
   199				if err := sorted.CheckSizes(m.Key(), m.Value()); err != nil {
   200					log.Printf("Skipping storing (%q:%q): %v", m.Key(), m.Value(), err)
   201					continue
   202				}
   203				if _, err := kv.db.Upsert(&bson.M{mgoKey: m.Key()}, &bson.M{mgoKey: m.Key(), mgoValue: m.Value()}); err != nil {
   204					return err
   205				}
   206			}
   207		}
   208		return nil
   209	}
   210	
// Close closes the MongoDB session held by kv. It always returns nil.
func (kv *keyValue) Close() error {
	kv.session.Close()
	return nil
}
   215	
// Ping tests if MongoDB on host can be dialed.
// It dials host without credentials, using timeout for both the dial and
// the ping, and reports whether both succeeded.
func Ping(host string, timeout time.Duration) bool {
	return (&instance{server: host}).ping(timeout)
}
   220	
// instance helps with the low level details about
// the connection to MongoDB.
type instance struct {
	server   string // address passed to mgo when dialing
	database string // database name; also appended to the URL when credentials are set
	user     string // optional; used only if password is also set
	password string // optional; used only if user is also set
	session  *mgo.Session // session cached by getConnection; nil until first dial
}
   230	
   231	func (ins *instance) url() string {
   232		if ins.user == "" || ins.password == "" {
   233			return ins.server
   234		}
   235		return ins.user + ":" + ins.password + "@" + ins.server + "/" + ins.database
   236	}
   237	
   238	// ping won't work with old (1.2) mongo servers.
   239	func (ins *instance) ping(timeout time.Duration) bool {
   240		session, err := mgo.DialWithTimeout(ins.url(), timeout)
   241		if err != nil {
   242			return false
   243		}
   244		defer session.Close()
   245		session.SetSyncTimeout(timeout)
   246		if err = session.Ping(); err != nil {
   247			return false
   248		}
   249		return true
   250	}
   251	
   252	func (ins *instance) getConnection() (*mgo.Session, error) {
   253		if ins.session != nil {
   254			return ins.session, nil
   255		}
   256		// TODO(mpl): do some "client caching" as in mysql, to avoid systematically dialing?
   257		session, err := mgo.Dial(ins.url())
   258		if err != nil {
   259			return nil, err
   260		}
   261		session.SetMode(mgo.Monotonic, true)
   262		session.SetSafe(&mgo.Safe{}) // so we get an ErrNotFound error when deleting an absent key
   263		ins.session = session
   264		return session, nil
   265	}
   266	
   267	// TODO(mpl): I'm only calling getCollection at the beginning, and
   268	// keeping the collection around and reusing it everywhere, instead
   269	// of calling getCollection everytime, because that's the easiest.
   270	// But I can easily change that. Gustavo says it does not make
   271	// much difference either way.
   272	// Brad, what do you think?
   273	func (ins *instance) getCollection() (*mgo.Collection, error) {
   274		session, err := ins.getConnection()
   275		if err != nil {
   276			return nil, err
   277		}
   278		session.SetSafe(&mgo.Safe{})
   279		session.SetMode(mgo.Strong, true)
   280		c := session.DB(ins.database).C(CollectionName)
   281		return c, nil
   282	}
Website layout inspired by memcached.
Content by the authors.