1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19 package stats
20
21 import (
22 "context"
23 "io"
24 "sort"
25 "sync"
26
27 "perkeep.org/pkg/blob"
28 )
29
30
31
32
33
34
35
36 type Receiver struct {
37 sync.Mutex
38 Have map[blob.Ref]int64
39 }
40
41 func (sr *Receiver) NumBlobs() int {
42 sr.Lock()
43 defer sr.Unlock()
44 return len(sr.Have)
45 }
46
47
48 func (sr *Receiver) Sizes() []int {
49 sr.Lock()
50 defer sr.Unlock()
51 sizes := make([]int, 0, len(sr.Have))
52 for _, size := range sr.Have {
53 sizes = append(sizes, int(size))
54 }
55 sort.Ints(sizes)
56 return sizes
57 }
58
59 func (sr *Receiver) SumBlobSize() int64 {
60 sr.Lock()
61 defer sr.Unlock()
62 var sum int64
63 for _, v := range sr.Have {
64 sum += v
65 }
66 return sum
67 }
68
69 func (sr *Receiver) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (sb blob.SizedRef, err error) {
70 n, err := io.Copy(io.Discard, source)
71 if err != nil {
72 return
73 }
74 return sr.ReceiveRef(br, n)
75 }
76
77 func (sr *Receiver) ReceiveRef(br blob.Ref, size int64) (sb blob.SizedRef, err error) {
78 sr.Lock()
79 defer sr.Unlock()
80 if sr.Have == nil {
81 sr.Have = make(map[blob.Ref]int64)
82 }
83 sr.Have[br] = size
84 return blob.SizedRef{Ref: br, Size: uint32(size)}, nil
85 }
86
87 func (sr *Receiver) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
88 var sized []blob.SizedRef
89 sr.Lock()
90 for _, br := range blobs {
91 if size, ok := sr.Have[br]; ok {
92 sized = append(sized, blob.SizedRef{Ref: br, Size: uint32(size)})
93 }
94 }
95 sr.Unlock()
96
97
98 for _, sb := range sized {
99 if err := fn(sb); err != nil {
100 return err
101 }
102 }
103 return nil
104 }
105
106 func (sr *Receiver) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
107 sr.Lock()
108 defer sr.Unlock()
109 for _, br := range blobs {
110 delete(sr.Have, br)
111 }
112
113 return nil
114 }
115
116 func (sr *Receiver) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
117 sr.Lock()
118 defer sr.Unlock()
119 defer close(dest)
120
121 refs := blob.SizedByRef{}
122 for ref, size := range sr.Have {
123 if after != "" && ref.String() <= after {
124 continue
125 }
126 refs = append(refs, blob.SizedRef{Ref: ref, Size: uint32(size)})
127 }
128 sort.Sort(refs)
129
130 if len(refs) == 0 {
131 return nil
132 }
133
134 if len(refs) <= limit {
135 limit = len(refs)
136 }
137
138 for _, sb := range refs[:limit] {
139 dest <- sb
140 }
141
142 return nil
143 }