1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19
20 package kvfile
21
22 import (
23 "bytes"
24 "errors"
25 "fmt"
26 "io"
27 "log"
28 "os"
29 "sync"
30
31 "go4.org/jsonconfig"
32 "modernc.org/kv"
33 "perkeep.org/pkg/kvutil"
34 "perkeep.org/pkg/sorted"
35 )
36
37 var _ sorted.Wiper = (*kvis)(nil)
38
39 func init() {
40 sorted.RegisterKeyValue("kv", newKeyValueFromJSONConfig)
41 }
42
43
44
45 func NewStorage(file string) (sorted.KeyValue, error) {
46 return newKeyValueFromJSONConfig(jsonconfig.Obj{"file": file})
47 }
48
49
50
51 func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
52 file := cfg.RequiredString("file")
53 if err := cfg.Validate(); err != nil {
54 return nil, err
55 }
56 opts := &kv.Options{}
57 db, err := kvutil.Open(file, opts)
58 if err != nil {
59 return nil, err
60 }
61 is := &kvis{
62 db: db,
63 opts: opts,
64 path: file,
65 }
66 return is, nil
67 }
68
69 type kvis struct {
70 path string
71 db *kv.DB
72 opts *kv.Options
73 txmu sync.Mutex
74 }
75
76
77 func getBuf(n int) []byte { return make([]byte, n) }
78 func putBuf([]byte) {}
79
80 func (is *kvis) Get(key string) (string, error) {
81 buf := getBuf(200)
82 defer putBuf(buf)
83 val, err := is.db.Get(buf, []byte(key))
84 if err != nil {
85 return "", err
86 }
87 if val == nil {
88 return "", sorted.ErrNotFound
89 }
90 return string(val), nil
91 }
92
93 func (is *kvis) Set(key, value string) error {
94 if err := sorted.CheckSizes(key, value); err != nil {
95 log.Printf("Skipping storing (%q:%q): %v", key, value, err)
96 return nil
97 }
98 return is.db.Set([]byte(key), []byte(value))
99 }
100
101 func (is *kvis) Delete(key string) error {
102 return is.db.Delete([]byte(key))
103 }
104
105 func (is *kvis) Find(start, end string) sorted.Iterator {
106 it := &iter{
107 db: is.db,
108 startKey: start,
109 endKey: []byte(end),
110 }
111 it.enum, _, it.err = it.db.Seek([]byte(start))
112 return it
113 }
114
115 func (is *kvis) BeginBatch() sorted.BatchMutation {
116 return sorted.NewBatchMutation()
117 }
118
119 func (is *kvis) Wipe() error {
120
121 if err := is.db.Close(); err != nil {
122 return err
123 }
124 if err := os.Remove(is.path); err != nil {
125 return err
126 }
127
128 db, err := kv.Create(is.path, is.opts)
129 if err != nil {
130 return fmt.Errorf("error creating %s: %v", is.path, err)
131 }
132 is.db = db
133 return nil
134 }
135
136 type batch interface {
137 Mutations() []sorted.Mutation
138 }
139
140 func (is *kvis) CommitBatch(bm sorted.BatchMutation) error {
141 b, ok := bm.(batch)
142 if !ok {
143 return errors.New("invalid batch type")
144 }
145 is.txmu.Lock()
146 defer is.txmu.Unlock()
147
148 good := false
149 defer func() {
150 if !good {
151 is.db.Rollback()
152 }
153 }()
154
155 if err := is.db.BeginTransaction(); err != nil {
156 return err
157 }
158 for _, m := range b.Mutations() {
159 if m.IsDelete() {
160 if err := is.db.Delete([]byte(m.Key())); err != nil {
161 return err
162 }
163 } else {
164 if err := sorted.CheckSizes(m.Key(), m.Value()); err != nil {
165 log.Printf("Skipping storing (%q:%q): %v", m.Key(), m.Value(), err)
166 continue
167 }
168 if err := is.db.Set([]byte(m.Key()), []byte(m.Value())); err != nil {
169 return err
170 }
171 }
172 }
173
174 good = true
175 return is.db.Commit()
176 }
177
178 func (is *kvis) Close() error {
179 log.Printf("Closing kvfile database %s", is.path)
180 return is.db.Close()
181 }
182
183 type iter struct {
184 db *kv.DB
185 startKey string
186 endKey []byte
187
188 enum *kv.Enumerator
189
190 valid bool
191 key, val []byte
192 skey, sval *string
193
194 err error
195 closed bool
196 }
197
198 func (it *iter) Close() error {
199 it.closed = true
200 return it.err
201 }
202
203 func (it *iter) KeyBytes() []byte {
204 if !it.valid {
205 panic("not valid")
206 }
207 return it.key
208 }
209
210 func (it *iter) Key() string {
211 if !it.valid {
212 panic("not valid")
213 }
214 if it.skey != nil {
215 return *it.skey
216 }
217 str := string(it.key)
218 it.skey = &str
219 return str
220 }
221
222 func (it *iter) ValueBytes() []byte {
223 if !it.valid {
224 panic("not valid")
225 }
226 return it.val
227 }
228
229 func (it *iter) Value() string {
230 if !it.valid {
231 panic("not valid")
232 }
233 if it.sval != nil {
234 return *it.sval
235 }
236 str := string(it.val)
237 it.sval = &str
238 return str
239 }
240
241 func (it *iter) end() bool {
242 it.valid = false
243 it.closed = true
244 return false
245 }
246
247 func (it *iter) Next() bool {
248 if it.err != nil {
249 return false
250 }
251 if it.closed {
252 panic("Next called after Next returned value")
253 }
254 it.skey, it.sval = nil, nil
255 var err error
256 it.key, it.val, err = it.enum.Next()
257 if err == io.EOF {
258 it.err = nil
259 return it.end()
260 }
261 if err != nil {
262 it.err = err
263 return it.end()
264 }
265 if len(it.endKey) > 0 && bytes.Compare(it.key, it.endKey) >= 0 {
266 return it.end()
267 }
268 it.valid = true
269 return true
270 }