1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19
20 package leveldb
21
22 import (
23 "errors"
24 "fmt"
25 "log"
26 "os"
27 "sync"
28
29 "go4.org/jsonconfig"
30 "perkeep.org/pkg/env"
31 "perkeep.org/pkg/sorted"
32
33 "github.com/syndtr/goleveldb/leveldb"
34 "github.com/syndtr/goleveldb/leveldb/filter"
35 "github.com/syndtr/goleveldb/leveldb/iterator"
36 "github.com/syndtr/goleveldb/leveldb/opt"
37 "github.com/syndtr/goleveldb/leveldb/util"
38 )
39
40 var _ sorted.Wiper = (*kvis)(nil)
41
42 func init() {
43 sorted.RegisterKeyValue("leveldb", newKeyValueFromJSONConfig)
44 }
45
46
47
48 func NewStorage(file string) (sorted.KeyValue, error) {
49 return newKeyValueFromJSONConfig(jsonconfig.Obj{"file": file})
50 }
51
52
53
54 func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
55 file := cfg.RequiredString("file")
56 if err := cfg.Validate(); err != nil {
57 return nil, err
58 }
59 strictness := opt.DefaultStrict
60 if env.IsDev() {
61
62 strictness = opt.StrictAll
63 }
64 opts := &opt.Options{
65
66
67
68
69
70 Filter: filter.NewBloomFilter(10),
71 Strict: strictness,
72 }
73 db, err := leveldb.OpenFile(file, opts)
74 if err != nil {
75 return nil, err
76 }
77 is := &kvis{
78 db: db,
79 path: file,
80 opts: opts,
81 readOpts: &opt.ReadOptions{Strict: strictness},
82
83
84 writeOpts: &opt.WriteOptions{Sync: false},
85 }
86 return is, nil
87 }
88
89 type kvis struct {
90 path string
91 db *leveldb.DB
92 opts *opt.Options
93 readOpts *opt.ReadOptions
94 writeOpts *opt.WriteOptions
95 }
96
97 func (is *kvis) Get(key string) (string, error) {
98 val, err := is.db.Get([]byte(key), is.readOpts)
99 if err != nil {
100 if err == leveldb.ErrNotFound {
101 return "", sorted.ErrNotFound
102 }
103 return "", err
104 }
105 if val == nil {
106 return "", sorted.ErrNotFound
107 }
108 return string(val), nil
109 }
110
111 func (is *kvis) Set(key, value string) error {
112 if err := sorted.CheckSizes(key, value); err != nil {
113 log.Printf("Skipping storing (%q:%q): %v", key, value, err)
114 return nil
115 }
116 return is.db.Put([]byte(key), []byte(value), is.writeOpts)
117 }
118
119 func (is *kvis) Delete(key string) error {
120 return is.db.Delete([]byte(key), is.writeOpts)
121 }
122
123 func (is *kvis) Find(start, end string) sorted.Iterator {
124 var startB, endB []byte
125
126 if start != "" {
127 startB = []byte(start)
128 }
129
130 if end != "" {
131 endB = []byte(end)
132 }
133 it := &iter{
134 it: is.db.NewIterator(
135 &util.Range{Start: startB, Limit: endB},
136 is.readOpts,
137 ),
138 }
139 return it
140 }
141
142 func (is *kvis) Wipe() error {
143
144 if err := is.db.Close(); err != nil {
145 return err
146 }
147 if err := os.RemoveAll(is.path); err != nil {
148 return err
149 }
150
151 db, err := leveldb.OpenFile(is.path, is.opts)
152 if err != nil {
153 return fmt.Errorf("error creating %s: %v", is.path, err)
154 }
155 is.db = db
156 return nil
157 }
158
159 func (is *kvis) BeginBatch() sorted.BatchMutation {
160 return &lvbatch{batch: new(leveldb.Batch)}
161 }
162
163 type lvbatch struct {
164 errMu sync.Mutex
165 err error
166
167 batch *leveldb.Batch
168 }
169
170 func (lvb *lvbatch) Set(key, value string) {
171 lvb.errMu.Lock()
172 defer lvb.errMu.Unlock()
173 if lvb.err != nil {
174 return
175 }
176 if err := sorted.CheckSizes(key, value); err != nil {
177 log.Printf("Skipping storing (%q:%q): %v", key, value, err)
178 return
179 }
180 lvb.batch.Put([]byte(key), []byte(value))
181 }
182
183 func (lvb *lvbatch) Delete(key string) {
184 lvb.batch.Delete([]byte(key))
185 }
186
187 func (is *kvis) CommitBatch(bm sorted.BatchMutation) error {
188 b, ok := bm.(*lvbatch)
189 if !ok {
190 return errors.New("invalid batch type")
191 }
192 b.errMu.Lock()
193 defer b.errMu.Unlock()
194 if b.err != nil {
195 return b.err
196 }
197 return is.db.Write(b.batch, is.writeOpts)
198 }
199
200 func (is *kvis) Close() error {
201 return is.db.Close()
202 }
203
204 type iter struct {
205 it iterator.Iterator
206
207 skey, sval *string
208
209
210
211 closed bool
212 }
213
214 func (it *iter) Close() error {
215 it.closed = true
216 it.it.Release()
217 return it.it.Error()
218 }
219
220 func (it *iter) KeyBytes() []byte {
221 return it.it.Key()
222 }
223
224 func (it *iter) Key() string {
225 if it.skey != nil {
226 return *it.skey
227 }
228 str := string(it.it.Key())
229 it.skey = &str
230 return str
231 }
232
233 func (it *iter) ValueBytes() []byte {
234 return it.it.Value()
235 }
236
237 func (it *iter) Value() string {
238 if it.sval != nil {
239 return *it.sval
240 }
241 str := string(it.it.Value())
242 it.sval = &str
243 return str
244 }
245
246 func (it *iter) Next() bool {
247 if it.closed {
248 panic("Next called on closed iterator")
249 }
250 it.skey, it.sval = nil, nil
251 return it.it.Next()
252 }