Home Download Docs Code Community
     1	/*
     2	Copyright 2014 The Perkeep AUTHORS
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package blobpacked
    18	
    19	import (
    20		"context"
    21		"errors"
    22		"fmt"
    23		"io"
    24		"strconv"
    25		"strings"
    26	
    27		"perkeep.org/pkg/blob"
    28		"perkeep.org/pkg/blobserver"
    29	)
    30	
    31	// StreamBlobs impl.
    32	
    33	func (s *storage) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) {
    34		return blobserver.NewMultiBlobStreamer(
    35			smallBlobStreamer{s},
    36			largeBlobStreamer{s},
    37		).StreamBlobs(ctx, dest, contToken)
    38	}
    39	
// smallBlobStreamer streams the loose (not yet packed) blobs held in
// the storage's small blobserver.
type smallBlobStreamer struct{ sto *storage }

// largeBlobStreamer streams the blobs contained inside the zip
// archives held in the storage's large blobserver.
type largeBlobStreamer struct{ sto *storage }
    42	
    43	// stream the loose blobs
    44	func (st smallBlobStreamer) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) {
    45		small := st.sto.small
    46		if bs, ok := small.(blobserver.BlobStreamer); ok {
    47			return bs.StreamBlobs(ctx, dest, contToken)
    48		}
    49		defer close(dest)
    50		donec := ctx.Done()
    51		return blobserver.EnumerateAllFrom(ctx, small, contToken, func(sb blob.SizedRef) error {
    52			select {
    53			case dest <- blobserver.BlobAndToken{
    54				Blob: blob.NewBlob(sb.Ref, sb.Size, func(ctx context.Context) ([]byte, error) {
    55					return slurpSizedRef(ctx, small, sb)
    56				}),
    57				Token: sb.Ref.StringMinusOne(), // streamer is >=, enumerate is >
    58			}:
    59				return nil
    60			case <-donec:
    61				return ctx.Err()
    62			}
    63		})
    64	}
    65	
// errContToken is returned when a continuation token passed to
// largeBlobStreamer.StreamBlobs cannot be parsed.
var errContToken = errors.New("blobpacked: bad continuation token")

// contToken is of forms:
//
//	""                : start from beginning of zip files
//	"sha1-xxxxx:n"    : start at == (sha1-xxxx, file n), else next zip
func (st largeBlobStreamer) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAndToken, contToken string) (err error) {
	defer close(dest)
	s := st.sto
	large := s.large

	var after string // for enumerateAll
	var skipFiles int
	var firstRef blob.Ref // first we care about

	if contToken != "" {
		// Token is "<zip-blobref>:<files-to-skip-within-that-zip>".
		f := strings.SplitN(contToken, ":", 2)
		if len(f) != 2 {
			return errContToken
		}
		firstRef, _ = blob.Parse(f[0])
		skipFiles, err = strconv.Atoi(f[1])
		if !firstRef.Valid() || err != nil {
			return errContToken
		}
		// EnumerateAllFrom takes a cursor that's greater, but
		// we want to start _at_ firstRef. So start
		// enumerating right before our target.
		after = firstRef.StringMinusOne()
	}
	return blobserver.EnumerateAllFrom(ctx, large, after, func(sb blob.SizedRef) error {
		if firstRef.Valid() {
			if sb.Ref.Less(firstRef) {
				// Skip.
				return nil
			}
			if firstRef.Less(sb.Ref) {
				// We're past the zip named in the token, so its
				// per-zip skip count no longer applies.
				skipFiles = 0 // reset it.
			}
		}
		// fileN is the index of the next blob within this zip; it
		// advances on both the skip path and the send path so the
		// emitted tokens stay aligned with the zip's contents.
		fileN := 0
		return s.foreachZipBlob(ctx, sb.Ref, func(bap BlobAndPos) error {
			if skipFiles > 0 {
				// Already delivered before the resume point.
				skipFiles--
				fileN++
				return nil
			}
			select {
			case dest <- blobserver.BlobAndToken{
				Blob: blob.NewBlob(bap.Ref, bap.Size, func(ctx context.Context) ([]byte, error) {
					return slurpSizedRef(ctx, s, bap.SizedRef)
				}),
				// Resume token names exactly this (zip, index) pair.
				Token: fmt.Sprintf("%s:%d", sb.Ref, fileN),
			}:
				fileN++
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	})
}
   128	
   129	func slurpSizedRef(ctx context.Context, f blob.Fetcher, sb blob.SizedRef) ([]byte, error) {
   130		rc, size, err := f.Fetch(ctx, sb.Ref)
   131		if err != nil {
   132			return nil, err
   133		}
   134		defer rc.Close()
   135		if size != sb.Size {
   136			return nil, fmt.Errorf("blobpacked fetch of %v reported %d bytes; expected %d", sb.Ref, size, sb.Size)
   137		}
   138		slurp, err := io.ReadAll(rc)
   139		if err != nil {
   140			return nil, err
   141		}
   142		if uint32(len(slurp)) != sb.Size {
   143			return nil, fmt.Errorf("blobpacked read %d bytes of %v; expected %d", len(slurp), sb.Ref, sb.Size)
   144		}
   145		return slurp, nil
   146	}
Website layout inspired by memcached.
Content by the authors.