Home Download Docs Code Community
     1	/*
     2	Copyright 2018 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package blobserver
    18	
    19	import (
    20		"context"
    21		"os"
    22		"sync"
    23	
    24		"go4.org/syncutil"
    25	
    26		"perkeep.org/pkg/blob"
    27	)
    28	
    29	// StatBlob calls bs.StatBlobs to stat a single blob.
    30	// If the blob is not found, the error is os.ErrNotExist.
    31	func StatBlob(ctx context.Context, bs BlobStatter, br blob.Ref) (blob.SizedRef, error) {
    32		var ret blob.SizedRef
    33		err := bs.StatBlobs(ctx, []blob.Ref{br}, func(sb blob.SizedRef) error {
    34			ret = sb
    35			return nil
    36		})
    37		if err == nil && !ret.Ref.Valid() {
    38			err = os.ErrNotExist
    39		}
    40		return ret, err
    41	}
    42	
    43	// StatBlobs stats multiple blobs and returns a map
    44	// of the found refs to their sizes.
    45	func StatBlobs(ctx context.Context, bs BlobStatter, blobs []blob.Ref) (map[blob.Ref]blob.SizedRef, error) {
    46		var m map[blob.Ref]blob.SizedRef
    47		err := bs.StatBlobs(ctx, blobs, func(sb blob.SizedRef) error {
    48			if m == nil {
    49				m = make(map[blob.Ref]blob.SizedRef)
    50			}
    51			m[sb.Ref] = sb
    52			return nil
    53		})
    54		return m, err
    55	}
    56	
    57	// StatBlobsParallelHelper is for use by blobserver implementations
    58	// that want to issue stats in parallel.  This runs worker in multiple
    59	// goroutines (bounded by gate), but calls fn in serial, per the
    60	// BlobStatter contract, and stops once there's a failure.
    61	//
    62	// The worker func should return two zero values to signal that a blob
    63	// doesn't exist. (This is different than the StatBlob func, which
    64	// returns os.ErrNotExist)
    65	func StatBlobsParallelHelper(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error,
    66		gate *syncutil.Gate, worker func(blob.Ref) (blob.SizedRef, error)) error {
    67		if len(blobs) == 0 {
    68			return nil
    69		}
    70	
    71		ctx, cancel := context.WithCancel(ctx)
    72		defer cancel()
    73	
    74		var fnMu sync.Mutex // serializes calls to fn
    75	
    76		var wg syncutil.Group
    77		for i := range blobs {
    78			gate.Start()
    79			b := blobs[i]
    80	
    81			select {
    82			case <-ctx.Done():
    83				// If a previous failed, stop.
    84				break
    85			default:
    86			}
    87	
    88			wg.Go(func() error {
    89				defer gate.Done()
    90	
    91				sb, err := worker(b)
    92				if err != nil {
    93					cancel()
    94					return err
    95				}
    96				if !sb.Valid() {
    97					// not found.
    98					return nil
    99				}
   100	
   101				fnMu.Lock()
   102				defer fnMu.Unlock()
   103	
   104				select {
   105				case <-ctx.Done():
   106					// If a previous failed, stop.
   107					return ctx.Err()
   108				default:
   109				}
   110	
   111				if err := fn(sb); err != nil {
   112					cancel() // stop others from running
   113					return err
   114				}
   115				return nil
   116			})
   117		}
   118	
   119		if err := wg.Err(); err != nil {
   120			return err
   121		}
   122		select {
   123		case <-ctx.Done():
   124			return ctx.Err()
   125		default:
   126			return nil
   127		}
   128	}
Website layout inspired by memcached.
Content by the authors.