1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 package blobserver
18
19 import (
20 "sync"
21 "time"
22
23 "perkeep.org/pkg/blob"
24
25 "go4.org/syncutil"
26 )
27
28 type BlobHub interface {
29
30
31
32
33
34
35
36
37
38 NotifyBlobReceived(blob.SizedRef) error
39
40
41
42
43
44 AddReceiveHook(func(blob.SizedRef) error)
45
46 RegisterListener(ch chan<- blob.Ref)
47 UnregisterListener(ch chan<- blob.Ref)
48
49 RegisterBlobListener(blob blob.Ref, ch chan<- blob.Ref)
50 UnregisterBlobListener(blob blob.Ref, ch chan<- blob.Ref)
51 }
52
53 var (
54 hubmu sync.RWMutex
55 stohub = map[interface{}]BlobHub{}
56 )
57
58
59 func GetHub(storage interface{}) BlobHub {
60 if h, ok := getHub(storage); ok {
61 return h
62 }
63 hubmu.Lock()
64 defer hubmu.Unlock()
65 h, ok := stohub[storage]
66 if ok {
67 return h
68 }
69 h = new(memHub)
70 stohub[storage] = h
71 return h
72 }
73
74 func getHub(storage interface{}) (BlobHub, bool) {
75 hubmu.RLock()
76 defer hubmu.RUnlock()
77 h, ok := stohub[storage]
78 return h, ok
79 }
80
81
82 var canLongPoll = true
83
84
85
86
87 func WaitForBlob(storage interface{}, deadline time.Time, blobs []blob.Ref) {
88 hub := GetHub(storage)
89 ch := make(chan blob.Ref, 1)
90 if len(blobs) == 0 {
91 hub.RegisterListener(ch)
92 defer hub.UnregisterListener(ch)
93 }
94 for _, br := range blobs {
95 hub.RegisterBlobListener(br, ch)
96 defer hub.UnregisterBlobListener(br, ch)
97 }
98 var tc <-chan time.Time
99 if !canLongPoll {
100 tc = time.After(2 * time.Second)
101 }
102
103 t := time.NewTimer(time.Until(deadline))
104 defer t.Stop()
105
106 select {
107 case <-ch:
108 case <-tc:
109 case <-t.C:
110 }
111 }
112
113 type memHub struct {
114 mu sync.RWMutex
115
116 hooks []func(blob.SizedRef) error
117 listeners map[chan<- blob.Ref]bool
118 blobListeners map[blob.Ref]map[chan<- blob.Ref]bool
119 }
120
121 func (h *memHub) NotifyBlobReceived(sb blob.SizedRef) error {
122 h.mu.RLock()
123 defer h.mu.RUnlock()
124
125 br := sb.Ref
126
127
128
129 var grp syncutil.Group
130 for i := range h.hooks {
131 hook := h.hooks[i]
132 grp.Go(func() error { return hook(sb) })
133 }
134 if err := grp.Err(); err != nil {
135 return err
136 }
137
138
139 for ch := range h.listeners {
140 ch := ch
141 go func() { ch <- br }()
142 }
143
144
145 for ch := range h.blobListeners[br] {
146 ch := ch
147 go func() { ch <- br }()
148 }
149 return nil
150 }
151
152 func (h *memHub) RegisterListener(ch chan<- blob.Ref) {
153 h.mu.Lock()
154 defer h.mu.Unlock()
155 if h.listeners == nil {
156 h.listeners = make(map[chan<- blob.Ref]bool)
157 }
158 h.listeners[ch] = true
159 }
160
161 func (h *memHub) UnregisterListener(ch chan<- blob.Ref) {
162 h.mu.Lock()
163 defer h.mu.Unlock()
164 if h.listeners == nil {
165 panic("blobhub: UnregisterListener called without RegisterListener")
166 }
167 delete(h.listeners, ch)
168 }
169
170 func (h *memHub) RegisterBlobListener(br blob.Ref, ch chan<- blob.Ref) {
171 h.mu.Lock()
172 defer h.mu.Unlock()
173 if h.blobListeners == nil {
174 h.blobListeners = make(map[blob.Ref]map[chan<- blob.Ref]bool)
175 }
176 _, ok := h.blobListeners[br]
177 if !ok {
178 h.blobListeners[br] = make(map[chan<- blob.Ref]bool)
179 }
180 h.blobListeners[br][ch] = true
181 }
182
183 func (h *memHub) UnregisterBlobListener(br blob.Ref, ch chan<- blob.Ref) {
184 h.mu.Lock()
185 defer h.mu.Unlock()
186 if h.blobListeners == nil {
187 panic("blobhub: UnregisterBlobListener called without RegisterBlobListener")
188 }
189 set, ok := h.blobListeners[br]
190 if !ok {
191 panic("blobhub: UnregisterBlobListener called without RegisterBlobListener for " + br.String())
192 }
193 delete(set, ch)
194 if len(set) == 0 {
195 delete(h.blobListeners, br)
196 }
197 }
198
199 func (h *memHub) AddReceiveHook(hook func(blob.SizedRef) error) {
200 h.mu.Lock()
201 defer h.mu.Unlock()
202 h.hooks = append(h.hooks, hook)
203 }