1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package schema
18
19 import (
20 "context"
21 "encoding/json"
22 "errors"
23 "fmt"
24 "io"
25
26 "go4.org/syncutil"
27
28 "perkeep.org/pkg/blob"
29 )
30
31
32
33 type DirReader struct {
34 fetcher blob.Fetcher
35 ss *superset
36
37 staticSet []blob.Ref
38 current int
39 }
40
41
42
43 func NewDirReader(ctx context.Context, fetcher blob.Fetcher, dirBlobRef blob.Ref) (*DirReader, error) {
44 ss := new(superset)
45 err := ss.setFromBlobRef(ctx, fetcher, dirBlobRef)
46 if err != nil {
47 return nil, err
48 }
49 if ss.Type != "directory" {
50 return nil, fmt.Errorf("schema/dirreader: expected \"directory\" schema blob for %s, got %q", dirBlobRef, ss.Type)
51 }
52 dr, err := ss.NewDirReader(fetcher)
53 if err != nil {
54 return nil, fmt.Errorf("schema/dirreader: creating DirReader for %s: %v", dirBlobRef, err)
55 }
56 dr.current = 0
57 return dr, nil
58 }
59
60 func (b *Blob) NewDirReader(ctx context.Context, fetcher blob.Fetcher) (*DirReader, error) {
61 return b.ss.NewDirReader(fetcher)
62 }
63
64 func (ss *superset) NewDirReader(fetcher blob.Fetcher) (*DirReader, error) {
65 if ss.Type != "directory" {
66 return nil, fmt.Errorf("Superset not of type \"directory\"")
67 }
68 return &DirReader{fetcher: fetcher, ss: ss}, nil
69 }
70
71 func (ss *superset) setFromBlobRef(ctx context.Context, fetcher blob.Fetcher, blobRef blob.Ref) error {
72 if !blobRef.Valid() {
73 return errors.New("schema/dirreader: blobref invalid")
74 }
75 ss.BlobRef = blobRef
76 rc, _, err := fetcher.Fetch(ctx, blobRef)
77 if err != nil {
78 return fmt.Errorf("schema/dirreader: fetching schema blob %s: %v", blobRef, err)
79 }
80 defer rc.Close()
81 if err := json.NewDecoder(rc).Decode(ss); err != nil {
82 return fmt.Errorf("schema/dirreader: decoding schema blob %s: %v", blobRef, err)
83 }
84 return nil
85 }
86
87
88 func (dr *DirReader) StaticSet(ctx context.Context) ([]blob.Ref, error) {
89 if dr.staticSet != nil {
90 return dr.staticSet, nil
91 }
92 staticSetBlobref := dr.ss.Entries
93 if !staticSetBlobref.Valid() {
94 return nil, errors.New("schema/dirreader: Invalid blobref")
95 }
96 members, err := staticSet(ctx, staticSetBlobref, dr.fetcher)
97 if err != nil {
98 return nil, err
99 }
100 dr.staticSet = members
101 return dr.staticSet, nil
102 }
103
104 func staticSet(ctx context.Context, staticSetBlobref blob.Ref, fetcher blob.Fetcher) ([]blob.Ref, error) {
105 rsc, _, err := fetcher.Fetch(ctx, staticSetBlobref)
106 if err != nil {
107 return nil, fmt.Errorf("schema/dirreader: fetching schema blob %s: %v", staticSetBlobref, err)
108 }
109 defer rsc.Close()
110 ss, err := parseSuperset(rsc)
111 if err != nil {
112 return nil, fmt.Errorf("schema/dirreader: decoding schema blob %s: %v", staticSetBlobref, err)
113 }
114 if ss.Type != "static-set" {
115 return nil, fmt.Errorf("schema/dirreader: expected \"static-set\" schema blob for %s, got %q", staticSetBlobref, ss.Type)
116 }
117 var members []blob.Ref
118 if len(ss.Members) > 0 {
119
120
121 for _, member := range ss.Members {
122 if !member.Valid() {
123 return nil, fmt.Errorf("schema/dirreader: invalid (static-set member) blobref referred by \"static-set\" schema blob %v", staticSetBlobref)
124 }
125 members = append(members, member)
126 }
127 return members, nil
128 }
129
130
131 for _, toMerge := range ss.MergeSets {
132 if !toMerge.Valid() {
133 return nil, fmt.Errorf("schema/dirreader: invalid (static-set subset) blobref referred by \"static-set\" schema blob %v", staticSetBlobref)
134 }
135
136 subset, err := staticSet(ctx, toMerge, fetcher)
137 if err != nil {
138 return nil, fmt.Errorf("schema/dirreader: could not get members of %q, subset of %v: %v", toMerge, staticSetBlobref, err)
139 }
140 members = append(members, subset...)
141 }
142 return members, nil
143 }
144
145
146 func (dr *DirReader) Readdir(ctx context.Context, n int) (entries []DirectoryEntry, err error) {
147 sts, err := dr.StaticSet(ctx)
148 if err != nil {
149 return nil, fmt.Errorf("schema/dirreader: can't get StaticSet: %v", err)
150 }
151 up := dr.current + n
152 if n <= 0 {
153 dr.current = 0
154 up = len(sts)
155 } else {
156 if n > (len(sts) - dr.current) {
157 err = io.EOF
158 up = len(sts)
159 }
160 }
161
162
163
164
165
166
167
168
169
170 type res struct {
171 ent DirectoryEntry
172 err error
173 }
174 var cs []chan res
175
176
177 gate := syncutil.NewGate(20)
178 for _, entRef := range sts[dr.current:up] {
179 c := make(chan res, 1)
180 cs = append(cs, c)
181 gate.Start()
182 go func(entRef blob.Ref) {
183 defer gate.Done()
184 entry, err := NewDirectoryEntryFromBlobRef(ctx, dr.fetcher, entRef)
185 c <- res{entry, err}
186 }(entRef)
187 }
188
189 for _, c := range cs {
190 res := <-c
191 if res.err != nil {
192 return nil, fmt.Errorf("schema/dirreader: can't create dirEntry: %v", res.err)
193 }
194 entries = append(entries, res.ent)
195 }
196 return entries, nil
197 }