1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19
20
21 package buffer
22
23 import (
24 "fmt"
25 "log"
26 "sync"
27
28 "perkeep.org/pkg/sorted"
29 )
30
31
32
33
34
35 func New(buffer, backing sorted.KeyValue, maxBufferBytes int64) *KeyValue {
36 return &KeyValue{
37 buf: buffer,
38 back: backing,
39 maxBuffer: maxBufferBytes,
40 }
41 }
42
43 var _ sorted.KeyValue = (*KeyValue)(nil)
44
45 type KeyValue struct {
46 buf, back sorted.KeyValue
47 maxBuffer int64
48
49 bufMu sync.Mutex
50 buffered int64
51
52
53
54 mu sync.RWMutex
55 }
56
57 func (kv *KeyValue) Flush() error {
58 kv.mu.Lock()
59 defer kv.mu.Unlock()
60 var (
61 bmback = kv.back.BeginBatch()
62 bmbuf = kv.buf.BeginBatch()
63 commit = false
64 it = kv.buf.Find("", "")
65 )
66 for it.Next() {
67 bmback.Set(it.Key(), it.Value())
68 bmbuf.Delete(it.Key())
69 commit = true
70 }
71 if err := it.Close(); err != nil {
72 return err
73 }
74 if commit {
75 if err := kv.back.CommitBatch(bmback); err != nil {
76 return err
77 }
78 if err := kv.buf.CommitBatch(bmbuf); err != nil {
79 return err
80 }
81 kv.bufMu.Lock()
82 kv.buffered = 0
83 kv.bufMu.Unlock()
84 }
85 return nil
86 }
87
88 func (kv *KeyValue) Get(key string) (string, error) {
89 kv.mu.RLock()
90 defer kv.mu.RUnlock()
91 v, err := kv.buf.Get(key)
92 switch err {
93 case sorted.ErrNotFound:
94 break
95 case nil:
96 return v, nil
97 default:
98 return "", err
99 }
100 return kv.back.Get(key)
101 }
102
103 func (kv *KeyValue) Set(key, value string) error {
104 if err := sorted.CheckSizes(key, value); err != nil {
105 log.Printf("Skipping storing (%q:%q): %v", key, value, err)
106 return nil
107 }
108 kv.mu.RLock()
109 err := kv.buf.Set(key, value)
110 kv.mu.RUnlock()
111 if err == nil {
112 kv.bufMu.Lock()
113 kv.buffered += int64(len(key) + len(value))
114 doFlush := kv.buffered > kv.maxBuffer
115 kv.bufMu.Unlock()
116 if doFlush {
117 err = kv.Flush()
118 }
119 }
120 return err
121 }
122
123 func (kv *KeyValue) Delete(key string) error {
124 kv.mu.RLock()
125 defer kv.mu.RUnlock()
126
127
128
129
130
131
132 err1 := kv.buf.Delete(key)
133 err2 := kv.back.Delete(key)
134 if err1 != nil {
135 return err1
136 }
137 return err2
138 }
139
140 func (kv *KeyValue) BeginBatch() sorted.BatchMutation {
141 return new(batch)
142 }
143
144 func (kv *KeyValue) CommitBatch(bm sorted.BatchMutation) error {
145 kv.mu.RLock()
146 defer kv.mu.RUnlock()
147 b, ok := bm.(*batch)
148 if !ok {
149 return fmt.Errorf("unexpected BatchMutation type %T", bm)
150 }
151 var (
152
153 bmbuf = kv.buf.BeginBatch()
154
155
156 bmback sorted.BatchMutation
157 )
158 for _, m := range b.mods {
159 if m.isDelete {
160 bmbuf.Delete(m.key)
161 if bmback == nil {
162 bmback = kv.back.BeginBatch()
163 }
164 bmback.Delete(m.key)
165 continue
166 } else {
167 if err := sorted.CheckSizes(m.key, m.value); err != nil {
168 log.Printf("Skipping storing (%q:%q): %v", m.key, m.value, err)
169 continue
170 }
171 }
172 bmbuf.Set(m.key, m.value)
173 }
174 if err := kv.buf.CommitBatch(bmbuf); err != nil {
175 return err
176 }
177 if bmback != nil {
178 return kv.back.CommitBatch(bmback)
179 }
180 return nil
181 }
182
183 func (kv *KeyValue) Close() error {
184 if err := kv.Flush(); err != nil {
185 return err
186 }
187 return kv.back.Close()
188 }
189
190 func (kv *KeyValue) Find(start, end string) sorted.Iterator {
191
192 ibuf := kv.buf.Find(start, end)
193 iback := kv.back.Find(start, end)
194 return &iter{
195 buf: subIter{Iterator: ibuf},
196 back: subIter{Iterator: iback},
197 }
198 }
199
// batch is an in-memory list of pending mutations, in the order they were
// recorded. Its Set and Delete methods serialize appends with mu.
type batch struct {
	mu   sync.Mutex
	mods []mod
}

// mod is a single recorded mutation: a delete of key when isDelete is true,
// otherwise a set of key to value.
type mod struct {
	isDelete bool
	key      string
	value    string
}

// Set records a "set key to value" mutation at the end of the batch.
func (b *batch) Set(key, value string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	entry := mod{isDelete: false, key: key, value: value}
	b.mods = append(b.mods, entry)
}

// Delete records a "delete key" mutation at the end of the batch.
func (b *batch) Delete(key string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	entry := mod{isDelete: true, key: key}
	b.mods = append(b.mods, entry)
}
221
222 type iter struct {
223 buf, back subIter
224 }
225
226 func (it *iter) current() *subIter {
227 switch {
228 case it.back.eof:
229 return &it.buf
230 case it.buf.eof:
231 return &it.back
232 case it.buf.key <= it.back.key:
233 return &it.buf
234 default:
235 return &it.back
236 }
237 }
238
239 func (it *iter) Next() bool {
240
241
242 start := false
243 if it.buf.key == "" && !it.buf.eof {
244 start = it.buf.next()
245 }
246 if it.back.key == "" && !it.buf.eof {
247 start = it.back.next() || start
248 }
249 if start {
250
251 return true
252 }
253
254 if it.buf.eof && it.back.eof {
255 return false
256 }
257
258 if it.buf.eof {
259 return it.back.next()
260 }
261 if it.back.eof {
262 return it.buf.next()
263 }
264
265
266
267 switch {
268 case it.buf.key < it.back.key:
269 it.buf.next()
270 case it.buf.key > it.back.key:
271 it.back.next()
272 case it.buf.key == it.back.key:
273 n1, n2 := it.buf.next(), it.back.next()
274 if !n1 && !n2 {
275
276 return false
277 }
278 }
279 return true
280 }
281
282 func (it *iter) Key() string {
283 return it.current().key
284 }
285
286 func (it *iter) Value() string {
287 return it.current().Value()
288 }
289
290 func (it *iter) KeyBytes() []byte {
291 return it.current().KeyBytes()
292 }
293
294 func (it *iter) ValueBytes() []byte {
295 return it.current().ValueBytes()
296 }
297
298 func (it *iter) Close() error {
299 err1 := it.buf.Close()
300 err2 := it.back.Close()
301 if err1 != nil {
302 return err1
303 }
304 return err2
305 }
306
307
308
309 type subIter struct {
310 sorted.Iterator
311 key string
312 eof bool
313 }
314
315 func (it *subIter) next() bool {
316 if it.Next() {
317 it.key = it.Key()
318 return true
319 }
320 it.eof = true
321 return false
322 }