1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package blob
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "io"
24 "sync/atomic"
25 "unicode/utf8"
26
27 "perkeep.org/pkg/constants"
28 )
29
30
31
32 type Blob struct {
33 ref Ref
34 size uint32
35 readAll func(context.Context) ([]byte, error)
36 mem atomic.Value
37 }
38
39
40
41
42
43 func NewBlob(ref Ref, size uint32, readAll func(ctx context.Context) ([]byte, error)) *Blob {
44 return &Blob{
45 ref: ref,
46 size: size,
47 readAll: readAll,
48 }
49 }
50
51
52 func (b *Blob) Size() uint32 {
53 return b.size
54 }
55
56
57 func (b *Blob) SizedRef() SizedRef {
58 return SizedRef{b.ref, b.size}
59 }
60
61
62 func (b *Blob) Ref() Ref { return b.ref }
63
64
65
66
67
68
69 func (b *Blob) ReadAll(ctx context.Context) (*bytes.Reader, error) {
70 mem, err := b.getMem(ctx)
71 if err != nil {
72 return nil, err
73 }
74 return bytes.NewReader(mem), nil
75 }
76
77 func (b *Blob) getMem(ctx context.Context) ([]byte, error) {
78 mem, ok := b.mem.Load().([]byte)
79 if ok {
80 return mem, nil
81 }
82 mem, err := b.readAll(ctx)
83 if err != nil {
84 return nil, err
85 }
86 if uint32(len(mem)) != b.size {
87 return nil, fmt.Errorf("Blob.ReadAll read %d bytes of %v; expected %d", len(mem), b.ref, b.size)
88 }
89 b.mem.Store(mem)
90 return mem, nil
91 }
92
93
94
95 func (b *Blob) ValidContents(ctx context.Context) error {
96 mem, err := b.getMem(ctx)
97 if err != nil {
98 return err
99 }
100 h := b.ref.Hash()
101 h.Write(mem)
102 if !b.ref.HashMatches(h) {
103 return fmt.Errorf("blob contents don't match digest for ref %v", b.ref)
104 }
105 return nil
106 }
107
108
109 func (b *Blob) IsUTF8(ctx context.Context) (bool, error) {
110 mem, err := b.getMem(ctx)
111 if err != nil {
112 return false, err
113 }
114 return utf8.Valid(mem), nil
115 }
116
117
118
119
120 func FromFetcher(ctx context.Context, fetcher Fetcher, br Ref) (*Blob, error) {
121 rc, size, err := fetcher.Fetch(ctx, br)
122 if err != nil {
123 return nil, err
124 }
125 defer rc.Close()
126 return FromReader(ctx, br, rc, size)
127 }
128
129
130
131
132 func FromReader(ctx context.Context, br Ref, r io.Reader, size uint32) (*Blob, error) {
133 if size > constants.MaxBlobSize {
134 return nil, fmt.Errorf("blob: %v with reported size %d is over max size of %d", br, size, constants.MaxBlobSize)
135 }
136 buf := make([]byte, size)
137
138 if n, err := io.ReadFull(r, buf); err != nil {
139 return nil, fmt.Errorf("blob: after reading %d bytes of %v: %v", n, br, err)
140 }
141 n, _ := io.CopyN(io.Discard, r, 1)
142 if n > 0 {
143 return nil, fmt.Errorf("blob: %v had more than reported %d bytes", br, size)
144 }
145 b := NewBlob(br, uint32(size), func(context.Context) ([]byte, error) {
146 return buf, nil
147 })
148 b.mem.Store(buf)
149 return b, nil
150 }