1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package blobserver
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "hash"
24 "io"
25 "strings"
26
27 "perkeep.org/pkg/blob"
28 )
29
30
// ErrReadonly is the sentinel error reporting that a blobserver is read only.
// NOTE(review): presumably returned by storage implementations that reject
// writes — confirm against callers.
var ErrReadonly = errors.New("this blobserver is read only")
32
33
34
35 func ReceiveString(ctx context.Context, dst BlobReceiver, s string) (blob.SizedRef, error) {
36 return Receive(ctx, dst, blob.RefFromString(s), strings.NewReader(s))
37 }
38
39
40
41
42
43 func Receive(ctx context.Context, dst BlobReceiver, br blob.Ref, src io.Reader) (blob.SizedRef, error) {
44 return receive(ctx, dst, br, src, true)
45 }
46
47 func ReceiveNoHash(ctx context.Context, dst BlobReceiver, br blob.Ref, src io.Reader) (blob.SizedRef, error) {
48 return receive(ctx, dst, br, src, false)
49 }
50
51 func receive(ctx context.Context, dst BlobReceiver, br blob.Ref, src io.Reader, checkHash bool) (sb blob.SizedRef, err error) {
52 src = io.LimitReader(src, MaxBlobSize)
53 if checkHash {
54 h := br.Hash()
55 if h == nil {
56 return sb, fmt.Errorf("invalid blob type %v; no registered hash function", br)
57 }
58 src = &checkHashReader{h, br, src, false}
59 }
60 sb, err = dst.ReceiveBlob(ctx, br, src)
61 if err != nil {
62 if checkHash && src.(*checkHashReader).corrupt {
63 err = ErrCorruptBlob
64 }
65 return
66 }
67 err = GetHub(dst).NotifyBlobReceived(sb)
68 return
69 }
70
71
72
73
// checkHashReader is an io.Reader wrapper that hashes everything read
// through it and, when the underlying reader reports io.EOF, checks the
// accumulated hash against br.
type checkHashReader struct {
	h       hash.Hash // running hash; fed the bytes returned by each Read
	br      blob.Ref  // ref the hashed content is compared against at EOF
	src     io.Reader // underlying reader the data comes from
	corrupt bool      // set to true by Read when the hash does not match br at EOF
}
80
81 func (c *checkHashReader) Read(p []byte) (n int, err error) {
82 n, err = c.src.Read(p)
83 c.h.Write(p[:n])
84 if err == io.EOF && !c.br.HashMatches(c.h) {
85 err = ErrCorruptBlob
86 c.corrupt = true
87 }
88 return
89 }