Home Download Docs Code Community
     1	/*
     2	Copyright 2014 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	// Package archiver zips lots of little blobs into bigger zip files
    18	// and stores them somewhere. While generic, it was designed to
    19	// incrementally create Amazon Glacier archives from many little
    20	// blobs, rather than creating millions of Glacier archives.
    21	package archiver // import "perkeep.org/pkg/blobserver/archiver"
    22	
    23	import (
    24		"archive/zip"
    25		"bytes"
    26		"context"
    27		"errors"
    28		"io"
    29	
    30		"perkeep.org/pkg/blob"
    31		"perkeep.org/pkg/blobserver"
    32	)
    33	
// DefaultMinZipSize is the default value of Archiver.MinZipSize,
// in bytes (16 MiB). It is used whenever Archiver.MinZipSize is
// not positive.
const DefaultMinZipSize = 16 << 20
    36	
    37	// An Archiver specifies the parameters of the job that copies from
    38	// one blobserver Storage (the Source) to long-term storage.
    39	type Archiver struct {
    40		// Source is where the blobs should come from.
    41		// (and be deleted from, if DeleteSourceAfterStore)
    42		Source blobserver.Storage
    43	
    44		// MinZipSize is the minimum size of zip files to create.
    45		// If zero, DefaultMinZipSize is used.
    46		MinZipSize int64
    47	
    48		// Store specifies a function that writes the zip file
    49		// (encoded in the byte slice) to permanent storage
    50		// (e.g. Amazon Glacier) and notes somewhere (a database) that
    51		// it contains the listed blobs. The blobs are redundant with
    52		// the filenames in the zip file, which will be named by
    53		// their blobref string, with no extension.
    54		Store func(zip []byte, blobs []blob.SizedRef) error
    55	
    56		// DeleteSourceAfterStore, if true, deletes the blobs from Source
    57		// after Store returns success.
    58		// This should pretty much always be set true, otherwise subsequent
    59		// calls to Run/RunOnce will generate the same archives. Wrap
    60		// the Source in a "namespace" storage if you don't actually
    61		// want to delete things locally.
    62		DeleteSourceAfterStore bool
    63	}
    64	
// ErrSourceTooSmall is returned by RunOnce if there aren't enough blobs on Source
// to warrant a new zip archive, i.e. the zip built from everything
// enumerated does not exceed the configured minimum zip size.
var ErrSourceTooSmall = errors.New("archiver: not enough blob data on source to warrant a new zip archive")
    68	
    69	func (a *Archiver) zipSize() int64 {
    70		if a.MinZipSize > 0 {
    71			return a.MinZipSize
    72		}
    73		return DefaultMinZipSize
    74	}
    75	
// errStopEnumerate is an internal sentinel. RunOnce's enumeration
// callback returns it to stop enumerating early once the zip is big
// enough; RunOnce then treats it as success rather than a failure.
var errStopEnumerate = errors.New("sentinel return value")
    77	
    78	// RunOnce scans a.Source and conditionally creates a new zip.
    79	// It returns ErrSourceTooSmall if there aren't enough blobs on Source.
    80	func (a *Archiver) RunOnce(ctx context.Context) error {
    81		if a.Source == nil {
    82			return errors.New("archiver: nil Source")
    83		}
    84		if a.Store == nil {
    85			return errors.New("archiver: nil Store func")
    86		}
    87		pz := &potentialZip{a: a}
    88		err := blobserver.EnumerateAll(ctx, a.Source, func(sb blob.SizedRef) error {
    89			if err := pz.addBlob(ctx, sb); err != nil {
    90				return err
    91			}
    92			if pz.bigEnough() {
    93				return errStopEnumerate
    94			}
    95			return nil
    96		})
    97		if err == errStopEnumerate {
    98			err = nil
    99		}
   100		if err != nil {
   101			return err
   102		}
   103		if err := pz.condClose(); err != nil {
   104			return err
   105		}
   106		if !pz.bigEnough() {
   107			return ErrSourceTooSmall
   108		}
   109		if err := a.Store(pz.buf.Bytes(), pz.blobs); err != nil {
   110			return err
   111		}
   112		if a.DeleteSourceAfterStore {
   113			blobs := make([]blob.Ref, 0, len(pz.blobs))
   114			for _, sb := range pz.blobs {
   115				blobs = append(blobs, sb.Ref)
   116			}
   117			if err := a.Source.RemoveBlobs(ctx, blobs); err != nil {
   118				return err
   119			}
   120		}
   121		return nil
   122	}
   123	
   124	type potentialZip struct {
   125		a       *Archiver
   126		blobs   []blob.SizedRef
   127		zw      *zip.Writer  // nil until actually writing
   128		buf     bytes.Buffer // of the zip file
   129		sumSize int64        // of uncompressed bytes of blobs
   130		closed  bool
   131	}
   132	
   133	func (z *potentialZip) bigEnough() bool {
   134		return int64(z.buf.Len()) > z.a.zipSize()
   135	}
   136	
   137	func (z *potentialZip) condClose() error {
   138		if z.closed || z.zw == nil {
   139			return nil
   140		}
   141		z.closed = true
   142		return z.zw.Close()
   143	}
   144	
   145	func (z *potentialZip) addBlob(ctx context.Context, sb blob.SizedRef) error {
   146		if z.bigEnough() {
   147			return nil
   148		}
   149		z.sumSize += int64(sb.Size)
   150		if z.zw == nil && z.sumSize > z.a.zipSize() {
   151			z.zw = zip.NewWriter(&z.buf)
   152			for _, sb := range z.blobs {
   153				if err := z.writeZipBlob(ctx, sb); err != nil {
   154					return err
   155				}
   156			}
   157		}
   158		z.blobs = append(z.blobs, sb)
   159		if z.zw != nil {
   160			return z.writeZipBlob(ctx, sb)
   161		}
   162		return nil
   163	}
   164	
   165	func (z *potentialZip) writeZipBlob(ctx context.Context, sb blob.SizedRef) error {
   166		w, err := z.zw.CreateHeader(&zip.FileHeader{
   167			Name:   sb.Ref.String(),
   168			Method: zip.Deflate,
   169		})
   170		if err != nil {
   171			return err
   172		}
   173		blobSrc, _, err := z.a.Source.Fetch(ctx, sb.Ref)
   174		if err != nil {
   175			return err
   176		}
   177		defer blobSrc.Close()
   178		_, err = io.Copy(w, blobSrc)
   179		return err
   180	}
Website layout inspired by memcached.
Content by the authors.