1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19 package mongo
20
21 import (
22 "bytes"
23 "errors"
24 "log"
25 "sync"
26 "time"
27
28 "go4.org/jsonconfig"
29 "perkeep.org/pkg/sorted"
30
31 "gopkg.in/mgo.v2"
32 "gopkg.in/mgo.v2/bson"
33 )
34
35
36
37
38
39
// Names of the MongoDB collection and of the document fields used to store
// key/value pairs.
const (
	// CollectionName is the collection that getCollection opens in the
	// configured database.
	CollectionName = "keys"

	// mgoKey is the document field holding an entry's key.
	mgoKey = "k"
	// mgoValue is the document field holding an entry's value.
	mgoValue = "v"
)
45
46 func init() {
47 sorted.RegisterKeyValue("mongo", newKeyValueFromJSONConfig)
48 }
49
50 func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
51 ins := &instance{
52 server: cfg.OptionalString("host", "localhost"),
53 database: cfg.RequiredString("database"),
54 user: cfg.OptionalString("user", ""),
55 password: cfg.OptionalString("password", ""),
56 }
57 if err := cfg.Validate(); err != nil {
58 return nil, err
59 }
60 db, err := ins.getCollection()
61 if err != nil {
62 return nil, err
63 }
64 return &keyValue{db: db, session: ins.session}, nil
65 }
66
67
68 type iter struct {
69 res bson.M
70 *mgo.Iter
71 end []byte
72 }
73
74 func (it *iter) Next() bool {
75 if !it.Iter.Next(&it.res) {
76 return false
77 }
78 if len(it.end) > 0 && bytes.Compare(it.KeyBytes(), it.end) >= 0 {
79 return false
80 }
81 return true
82 }
83
84 func (it *iter) Key() string {
85 key, ok := (it.res[mgoKey]).(string)
86 if !ok {
87 return ""
88 }
89 return key
90 }
91
92 func (it *iter) KeyBytes() []byte {
93
94
95
96 return []byte(it.Key())
97 }
98
99 func (it *iter) Value() string {
100 value, ok := (it.res[mgoValue]).(string)
101 if !ok {
102 return ""
103 }
104 return value
105 }
106
107 func (it *iter) ValueBytes() []byte {
108
109
110
111 return []byte(it.Value())
112 }
113
114 func (it *iter) Close() error {
115 return it.Iter.Close()
116 }
117
118
119 type keyValue struct {
120 session *mgo.Session
121 mu sync.Mutex
122 db *mgo.Collection
123 }
124
125 func (kv *keyValue) Get(key string) (string, error) {
126 kv.mu.Lock()
127 defer kv.mu.Unlock()
128 res := bson.M{}
129 q := kv.db.Find(&bson.M{mgoKey: key})
130 err := q.One(&res)
131 if err != nil {
132 if err == mgo.ErrNotFound {
133 return "", sorted.ErrNotFound
134 }
135 return "", err
136 }
137 return res[mgoValue].(string), err
138 }
139
140 func (kv *keyValue) Find(start, end string) sorted.Iterator {
141 kv.mu.Lock()
142 defer kv.mu.Unlock()
143 it := kv.db.Find(&bson.M{mgoKey: &bson.M{"$gte": start}}).Sort(mgoKey).Iter()
144 return &iter{res: bson.M{}, Iter: it, end: []byte(end)}
145 }
146
147 func (kv *keyValue) Set(key, value string) error {
148 if err := sorted.CheckSizes(key, value); err != nil {
149 log.Printf("Skipping storing (%q:%q): %v", key, value, err)
150 return nil
151 }
152 kv.mu.Lock()
153 defer kv.mu.Unlock()
154 _, err := kv.db.Upsert(&bson.M{mgoKey: key}, &bson.M{mgoKey: key, mgoValue: value})
155 return err
156 }
157
158
159 func (kv *keyValue) Delete(key string) error {
160 kv.mu.Lock()
161 defer kv.mu.Unlock()
162 err := kv.db.Remove(&bson.M{mgoKey: key})
163 if err == mgo.ErrNotFound {
164 return nil
165 }
166 return err
167 }
168
169
170 func (kv *keyValue) Wipe() error {
171 kv.mu.Lock()
172 defer kv.mu.Unlock()
173 _, err := kv.db.RemoveAll(nil)
174 return err
175 }
176
177 type batch interface {
178 Mutations() []sorted.Mutation
179 }
180
181 func (kv *keyValue) BeginBatch() sorted.BatchMutation {
182 return sorted.NewBatchMutation()
183 }
184
185 func (kv *keyValue) CommitBatch(bm sorted.BatchMutation) error {
186 b, ok := bm.(batch)
187 if !ok {
188 return errors.New("invalid batch type")
189 }
190
191 kv.mu.Lock()
192 defer kv.mu.Unlock()
193 for _, m := range b.Mutations() {
194 if m.IsDelete() {
195 if err := kv.db.Remove(bson.M{mgoKey: m.Key()}); err != nil && err != mgo.ErrNotFound {
196 return err
197 }
198 } else {
199 if err := sorted.CheckSizes(m.Key(), m.Value()); err != nil {
200 log.Printf("Skipping storing (%q:%q): %v", m.Key(), m.Value(), err)
201 continue
202 }
203 if _, err := kv.db.Upsert(&bson.M{mgoKey: m.Key()}, &bson.M{mgoKey: m.Key(), mgoValue: m.Value()}); err != nil {
204 return err
205 }
206 }
207 }
208 return nil
209 }
210
211 func (kv *keyValue) Close() error {
212 kv.session.Close()
213 return nil
214 }
215
216
217 func Ping(host string, timeout time.Duration) bool {
218 return (&instance{server: host}).ping(timeout)
219 }
220
221
222
223 type instance struct {
224 server string
225 database string
226 user string
227 password string
228 session *mgo.Session
229 }
230
231 func (ins *instance) url() string {
232 if ins.user == "" || ins.password == "" {
233 return ins.server
234 }
235 return ins.user + ":" + ins.password + "@" + ins.server + "/" + ins.database
236 }
237
238
239 func (ins *instance) ping(timeout time.Duration) bool {
240 session, err := mgo.DialWithTimeout(ins.url(), timeout)
241 if err != nil {
242 return false
243 }
244 defer session.Close()
245 session.SetSyncTimeout(timeout)
246 if err = session.Ping(); err != nil {
247 return false
248 }
249 return true
250 }
251
252 func (ins *instance) getConnection() (*mgo.Session, error) {
253 if ins.session != nil {
254 return ins.session, nil
255 }
256
257 session, err := mgo.Dial(ins.url())
258 if err != nil {
259 return nil, err
260 }
261 session.SetMode(mgo.Monotonic, true)
262 session.SetSafe(&mgo.Safe{})
263 ins.session = session
264 return session, nil
265 }
266
267
268
269
270
271
272
273 func (ins *instance) getCollection() (*mgo.Collection, error) {
274 session, err := ins.getConnection()
275 if err != nil {
276 return nil, err
277 }
278 session.SetSafe(&mgo.Safe{})
279 session.SetMode(mgo.Strong, true)
280 c := session.DB(ins.database).C(CollectionName)
281 return c, nil
282 }