1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package blobpacked
18
19
20
21
22 import (
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "log"
28 "os"
29 "sort"
30
31 "perkeep.org/pkg/blob"
32 "perkeep.org/pkg/conv"
33 )
34
35
36
37 type zipPart struct {
38 idx uint32
39 zipRef blob.Ref
40 zipOff uint32
41 len uint32
42 }
43
44
45
46
47 type byZipIndex []zipPart
48
49 func (s byZipIndex) Len() int { return len(s) }
50 func (s byZipIndex) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
51 func (s byZipIndex) Less(i, j int) bool { return s[i].idx < s[j].idx }
52
53 func (s *storage) OpenWholeRef(wholeRef blob.Ref, offset int64) (rc io.ReadCloser, wholeSize int64, err error) {
54 ctx := context.TODO()
55
56
57 startKey := wholeMetaPrefix + wholeRef.String()
58 it := s.meta.Find(startKey, startKey+";")
59 if it == nil {
60 panic("nil iterator")
61 }
62 rows := 0
63 var parts []zipPart
64 var nZipWant uint32
65 for it.Next() {
66 rows++
67 k := it.KeyBytes()
68 if len(k) == len(startKey) {
69 if rows != 1 {
70
71 break
72 }
73 if err := conv.ParseFields(it.ValueBytes(), &wholeSize, &nZipWant); err != nil {
74 it.Close()
75 return nil, 0, err
76 }
77 continue
78 }
79 var zp zipPart
80 if k[len(startKey)] != ':' {
81
82 break
83 }
84 if err := conv.ParseFields(k[len(startKey)+len(":"):], &zp.idx); err != nil {
85 it.Close()
86 return nil, 0, fmt.Errorf("blobpacked: error parsing meta key %q: %v", k, err)
87 }
88
89 var ignore uint64
90 if err := conv.ParseFields(it.ValueBytes(), &zp.zipRef, &zp.zipOff, &ignore, &zp.len); err != nil {
91 it.Close()
92 return nil, 0, fmt.Errorf("blobpacked: error parsing meta key %q = %q: %v", k, it.ValueBytes(), err)
93 }
94 parts = append(parts, zp)
95 }
96 if err := it.Close(); err != nil {
97 return nil, 0, err
98 }
99 if rows == 0 || uint32(len(parts)) != nZipWant {
100 return nil, 0, os.ErrNotExist
101 }
102 sort.Sort(byZipIndex(parts))
103 for i, zp := range parts {
104 if zp.idx != uint32(i) {
105 log.Printf("blobpacked: discontiguous or overlapping index for wholeref %v", wholeRef)
106 return nil, 0, os.ErrNotExist
107 }
108 }
109 needSkip := offset
110 for len(parts) > 0 && needSkip >= int64(parts[0].len) {
111 needSkip -= int64(parts[0].len)
112 parts = parts[1:]
113 }
114 if len(parts) > 0 && needSkip > 0 {
115 parts[0].zipOff += uint32(needSkip)
116 parts[0].len -= uint32(needSkip)
117 }
118 rc = &wholeFromZips{
119 ctx: ctx,
120 src: s.large,
121 remain: wholeSize - offset,
122 zp: parts,
123 }
124 return rc, wholeSize, nil
125 }
126
127
128
129 type wholeFromZips struct {
130 ctx context.Context
131 src blob.SubFetcher
132 err error
133 closed bool
134 remain int64
135
136
137
138 cur io.ReadCloser
139 curRemain uint32
140
141
142 zp []zipPart
143 }
144
145 func (zr *wholeFromZips) Read(p []byte) (n int, err error) {
146 if zr.closed {
147 return 0, errors.New("blobpacked: Read on Closed wholeref reader")
148 }
149 zr.initCur()
150 if zr.err != nil {
151 return 0, zr.err
152 }
153 if uint32(len(p)) > zr.curRemain {
154 p = p[:zr.curRemain]
155 }
156 n, err = zr.cur.Read(p)
157 if int64(n) > int64(zr.curRemain) || n < 0 {
158 panic("Reader returned bogus number of bytes read")
159 }
160 zr.curRemain -= uint32(n)
161 zr.remain -= int64(n)
162 if zr.curRemain == 0 {
163 zr.cur.Close()
164 zr.cur = nil
165 }
166 if err == io.EOF && zr.remain > 0 {
167 err = nil
168 } else if err == nil && zr.remain == 0 {
169 err = io.EOF
170 }
171 return n, err
172 }
173
174 func (zr *wholeFromZips) initCur() {
175 if zr.err != nil || zr.cur != nil {
176 return
177 }
178 if zr.remain <= 0 {
179 zr.err = io.EOF
180 return
181 }
182 if len(zr.zp) == 0 {
183 zr.err = io.ErrUnexpectedEOF
184 return
185 }
186 zp := zr.zp[0]
187 zr.zp = zr.zp[1:]
188 rc, err := zr.src.SubFetch(zr.ctx, zp.zipRef, int64(zp.zipOff), int64(zp.len))
189 if err != nil {
190 if err == os.ErrNotExist {
191 err = fmt.Errorf("blobpacked: error opening next part of file: %v", err)
192 }
193 zr.err = err
194 return
195 }
196 zr.cur = rc
197 zr.curRemain = zp.len
198 }
199
200 func (zr *wholeFromZips) Close() error {
201 if zr.closed {
202 return nil
203 }
204 zr.closed = true
205 var err error
206 if zr.cur != nil {
207 err = zr.cur.Close()
208 zr.cur = nil
209 }
210 return err
211 }