1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package blobserver
18
19 import (
20 "context"
21 "os"
22 "sync"
23
24 "go4.org/syncutil"
25
26 "perkeep.org/pkg/blob"
27 )
28
29
30
31 func StatBlob(ctx context.Context, bs BlobStatter, br blob.Ref) (blob.SizedRef, error) {
32 var ret blob.SizedRef
33 err := bs.StatBlobs(ctx, []blob.Ref{br}, func(sb blob.SizedRef) error {
34 ret = sb
35 return nil
36 })
37 if err == nil && !ret.Ref.Valid() {
38 err = os.ErrNotExist
39 }
40 return ret, err
41 }
42
43
44
45 func StatBlobs(ctx context.Context, bs BlobStatter, blobs []blob.Ref) (map[blob.Ref]blob.SizedRef, error) {
46 var m map[blob.Ref]blob.SizedRef
47 err := bs.StatBlobs(ctx, blobs, func(sb blob.SizedRef) error {
48 if m == nil {
49 m = make(map[blob.Ref]blob.SizedRef)
50 }
51 m[sb.Ref] = sb
52 return nil
53 })
54 return m, err
55 }
56
57
58
59
60
61
62
63
64
65 func StatBlobsParallelHelper(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error,
66 gate *syncutil.Gate, worker func(blob.Ref) (blob.SizedRef, error)) error {
67 if len(blobs) == 0 {
68 return nil
69 }
70
71 ctx, cancel := context.WithCancel(ctx)
72 defer cancel()
73
74 var fnMu sync.Mutex
75
76 var wg syncutil.Group
77 for i := range blobs {
78 gate.Start()
79 b := blobs[i]
80
81 select {
82 case <-ctx.Done():
83
84 break
85 default:
86 }
87
88 wg.Go(func() error {
89 defer gate.Done()
90
91 sb, err := worker(b)
92 if err != nil {
93 cancel()
94 return err
95 }
96 if !sb.Valid() {
97
98 return nil
99 }
100
101 fnMu.Lock()
102 defer fnMu.Unlock()
103
104 select {
105 case <-ctx.Done():
106
107 return ctx.Err()
108 default:
109 }
110
111 if err := fn(sb); err != nil {
112 cancel()
113 return err
114 }
115 return nil
116 })
117 }
118
119 if err := wg.Err(); err != nil {
120 return err
121 }
122 select {
123 case <-ctx.Done():
124 return ctx.Err()
125 default:
126 return nil
127 }
128 }