Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package blobserver
    18	
    19	import (
    20		"sync"
    21		"time"
    22	
    23		"perkeep.org/pkg/blob"
    24	
    25		"go4.org/syncutil"
    26	)
    27	
    28	type BlobHub interface {
    29		// TODO(bradfitz): no need for this to be an interface anymore. make GetHub return
    30		// a concrete type instead? also, maybe remove all the async listener stuff, or
    31		// audit its users and remove anything that is now unnecessary.
    32	
    33		// NotifyBlobReceived notes that a storage target has successfully received
    34		// a blob and asynchronously notifies registered listeners.
    35		//
    36		// If any synchronous receive hooks are registered, they're run before
    37		// NotifyBlobReceived returns and their error is returned.
    38		NotifyBlobReceived(blob.SizedRef) error
    39	
    40		// AddReceiveHook adds a hook that is synchronously run
    41		// whenever blobs are received.  All registered hooks are run
    42		// on each blob upload but if more than one returns an error,
    43		// NotifyBlobReceived will only return one of the errors.
    44		AddReceiveHook(func(blob.SizedRef) error)
    45	
    46		RegisterListener(ch chan<- blob.Ref)
    47		UnregisterListener(ch chan<- blob.Ref)
    48	
    49		RegisterBlobListener(blob blob.Ref, ch chan<- blob.Ref)
    50		UnregisterBlobListener(blob blob.Ref, ch chan<- blob.Ref)
    51	}
    52	
    53	var (
    54		hubmu  sync.RWMutex
    55		stohub = map[interface{}]BlobHub{} // Storage -> hub
    56	)
    57	
    58	// GetHub return a BlobHub for the given storage implementation.
    59	func GetHub(storage interface{}) BlobHub {
    60		if h, ok := getHub(storage); ok {
    61			return h
    62		}
    63		hubmu.Lock()
    64		defer hubmu.Unlock()
    65		h, ok := stohub[storage]
    66		if ok {
    67			return h
    68		}
    69		h = new(memHub)
    70		stohub[storage] = h
    71		return h
    72	}
    73	
    74	func getHub(storage interface{}) (BlobHub, bool) {
    75		hubmu.RLock()
    76		defer hubmu.RUnlock()
    77		h, ok := stohub[storage]
    78		return h, ok
    79	}
    80	
    81	// canLongPoll is set to false on App Engine. (multi-process environment...)
    82	var canLongPoll = true
    83	
    84	// WaitForBlob waits until deadline for blobs to arrive. If blobs is empty, any
    85	// blobs are waited on.  Otherwise, those specific blobs are waited on.
    86	// When WaitForBlob returns, nothing may have happened.
    87	func WaitForBlob(storage interface{}, deadline time.Time, blobs []blob.Ref) {
    88		hub := GetHub(storage)
    89		ch := make(chan blob.Ref, 1)
    90		if len(blobs) == 0 {
    91			hub.RegisterListener(ch)
    92			defer hub.UnregisterListener(ch)
    93		}
    94		for _, br := range blobs {
    95			hub.RegisterBlobListener(br, ch)
    96			defer hub.UnregisterBlobListener(br, ch)
    97		}
    98		var tc <-chan time.Time
    99		if !canLongPoll {
   100			tc = time.After(2 * time.Second)
   101		}
   102	
   103		t := time.NewTimer(time.Until(deadline))
   104		defer t.Stop()
   105	
   106		select {
   107		case <-ch:
   108		case <-tc:
   109		case <-t.C:
   110		}
   111	}
   112	
   113	type memHub struct {
   114		mu sync.RWMutex
   115	
   116		hooks         []func(blob.SizedRef) error
   117		listeners     map[chan<- blob.Ref]bool
   118		blobListeners map[blob.Ref]map[chan<- blob.Ref]bool
   119	}
   120	
   121	func (h *memHub) NotifyBlobReceived(sb blob.SizedRef) error {
   122		h.mu.RLock()
   123		defer h.mu.RUnlock()
   124	
   125		br := sb.Ref
   126	
   127		// Synchronous hooks. If error, prevents notifying other
   128		// subscribers.
   129		var grp syncutil.Group
   130		for i := range h.hooks {
   131			hook := h.hooks[i]
   132			grp.Go(func() error { return hook(sb) })
   133		}
   134		if err := grp.Err(); err != nil {
   135			return err
   136		}
   137	
   138		// Global listeners
   139		for ch := range h.listeners {
   140			ch := ch
   141			go func() { ch <- br }()
   142		}
   143	
   144		// Blob-specific listeners
   145		for ch := range h.blobListeners[br] {
   146			ch := ch
   147			go func() { ch <- br }()
   148		}
   149		return nil
   150	}
   151	
   152	func (h *memHub) RegisterListener(ch chan<- blob.Ref) {
   153		h.mu.Lock()
   154		defer h.mu.Unlock()
   155		if h.listeners == nil {
   156			h.listeners = make(map[chan<- blob.Ref]bool)
   157		}
   158		h.listeners[ch] = true
   159	}
   160	
   161	func (h *memHub) UnregisterListener(ch chan<- blob.Ref) {
   162		h.mu.Lock()
   163		defer h.mu.Unlock()
   164		if h.listeners == nil {
   165			panic("blobhub: UnregisterListener called without RegisterListener")
   166		}
   167		delete(h.listeners, ch)
   168	}
   169	
   170	func (h *memHub) RegisterBlobListener(br blob.Ref, ch chan<- blob.Ref) {
   171		h.mu.Lock()
   172		defer h.mu.Unlock()
   173		if h.blobListeners == nil {
   174			h.blobListeners = make(map[blob.Ref]map[chan<- blob.Ref]bool)
   175		}
   176		_, ok := h.blobListeners[br]
   177		if !ok {
   178			h.blobListeners[br] = make(map[chan<- blob.Ref]bool)
   179		}
   180		h.blobListeners[br][ch] = true
   181	}
   182	
   183	func (h *memHub) UnregisterBlobListener(br blob.Ref, ch chan<- blob.Ref) {
   184		h.mu.Lock()
   185		defer h.mu.Unlock()
   186		if h.blobListeners == nil {
   187			panic("blobhub: UnregisterBlobListener called without RegisterBlobListener")
   188		}
   189		set, ok := h.blobListeners[br]
   190		if !ok {
   191			panic("blobhub: UnregisterBlobListener called without RegisterBlobListener for " + br.String())
   192		}
   193		delete(set, ch)
   194		if len(set) == 0 {
   195			delete(h.blobListeners, br)
   196		}
   197	}
   198	
   199	func (h *memHub) AddReceiveHook(hook func(blob.SizedRef) error) {
   200		h.mu.Lock()
   201		defer h.mu.Unlock()
   202		h.hooks = append(h.hooks, hook)
   203	}
Website layout inspired by memcached.
Content by the authors.