1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19
20
21
22
23 package namespace
24
25 import (
26 "bytes"
27 "context"
28 "fmt"
29 "io"
30 "log"
31 "os"
32 "strconv"
33
34 "go4.org/jsonconfig"
35 "perkeep.org/pkg/blob"
36 "perkeep.org/pkg/blobserver"
37 "perkeep.org/pkg/sorted"
38
39 "go4.org/strutil"
40 )
41
42 type nsto struct {
43 inventory sorted.KeyValue
44 master blobserver.Storage
45 }
46
47 func init() {
48 blobserver.RegisterStorageConstructor("namespace", blobserver.StorageConstructor(newFromConfig))
49 }
50
51 func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
52 sto := &nsto{}
53 invConf := config.RequiredObject("inventory")
54 masterName := config.RequiredString("storage")
55 if err := config.Validate(); err != nil {
56 return nil, err
57 }
58 sto.inventory, err = sorted.NewKeyValue(invConf)
59 if err != nil {
60 return nil, fmt.Errorf("Invalid 'inventory' configuration: %v", err)
61 }
62 sto.master, err = ld.GetStorage(masterName)
63 if err != nil {
64 return nil, fmt.Errorf("Invalid 'storage' configuration: %v", err)
65 }
66 return sto, nil
67 }
68
69 func (ns *nsto) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
70 defer close(dest)
71 done := ctx.Done()
72
73 it := ns.inventory.Find(after, "")
74 first := true
75 for limit > 0 && it.Next() {
76 if first {
77 first = false
78 if after != "" && it.Key() == after {
79 continue
80 }
81 }
82 br, ok := blob.ParseBytes(it.KeyBytes())
83 size, err := strutil.ParseUintBytes(it.ValueBytes(), 10, 32)
84 if !ok || err != nil {
85 log.Printf("Bogus namespace key %q / value %q", it.Key(), it.Value())
86 continue
87 }
88 select {
89 case dest <- blob.SizedRef{Ref: br, Size: uint32(size)}:
90 case <-done:
91 return ctx.Err()
92 }
93 limit--
94 }
95 if err := it.Close(); err != nil {
96 return err
97 }
98 return nil
99 }
100
101 func (ns *nsto) Fetch(ctx context.Context, br blob.Ref) (rc io.ReadCloser, size uint32, err error) {
102 invSizeStr, err := ns.inventory.Get(br.String())
103 if err == sorted.ErrNotFound {
104 err = os.ErrNotExist
105 return
106 }
107 if err != nil {
108 return
109 }
110 invSize, err := strconv.ParseUint(invSizeStr, 10, 32)
111 if err != nil {
112 return
113 }
114 rc, size, err = ns.master.Fetch(ctx, br)
115 if err != nil {
116 return
117 }
118 if size != uint32(invSize) {
119 log.Printf("namespace: on blob %v, unexpected inventory size %d for master size %d", br, invSize, size)
120 return nil, 0, os.ErrNotExist
121 }
122 return rc, size, nil
123 }
124
125 func (ns *nsto) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
126 var buf bytes.Buffer
127 size, err := io.Copy(&buf, src)
128 if err != nil {
129 return
130 }
131
132
133 if _, ierr := ns.inventory.Get(br.String()); ierr == nil {
134 return blob.SizedRef{Ref: br, Size: uint32(size)}, nil
135 }
136
137 sb, err = ns.master.ReceiveBlob(ctx, br, &buf)
138 if err != nil {
139 return
140 }
141
142 err = ns.inventory.Set(br.String(), strconv.Itoa(int(size)))
143 return
144 }
145
146 func (ns *nsto) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
147 for _, br := range blobs {
148 if err := ns.inventory.Delete(br.String()); err != nil {
149 return err
150 }
151 }
152 return nil
153 }
154
155 func (ns *nsto) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
156 for _, br := range blobs {
157 invSizeStr, err := ns.inventory.Get(br.String())
158 if err == sorted.ErrNotFound {
159 continue
160 }
161 if err != nil {
162 return err
163 }
164 invSize, err := strconv.ParseUint(invSizeStr, 10, 32)
165 if err != nil {
166 log.Printf("Bogus namespace key %q / value %q", br.String(), invSizeStr)
167 }
168 if err := fn(blob.SizedRef{Ref: br, Size: uint32(invSize)}); err != nil {
169 return err
170 }
171 }
172 return nil
173 }