Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	/*
    18	Package cond registers the "cond" conditional blobserver storage type
    19	to select routing of get/put operations on blobs to other storage
    20	targets as a function of their content.
    21	
    22	Currently only the "isSchema" predicate is defined.
    23	
    24	Example usage:
    25	
    26		  "/bs-and-maybe-also-index/": {
    27			"handler": "storage-cond",
    28			"handlerArgs": {
    29				"write": {
    30					"if": "isSchema",
    31					"then": "/bs-and-index/",
    32					"else": "/bs/"
    33				},
    34				"read": "/bs/"
    35			}
    36		  }
    37	*/
    38	package cond // import "perkeep.org/pkg/blobserver/cond"
    39	
    40	import (
    41		"bytes"
    42		"context"
    43		"errors"
    44		"fmt"
    45		"io"
    46		"time"
    47	
    48		"go4.org/jsonconfig"
    49		"perkeep.org/pkg/blob"
    50		"perkeep.org/pkg/blobserver"
    51		"perkeep.org/pkg/schema"
    52	)
    53	
    54	// A storageFunc selects a destination for a given blob. It may consume from src but must always return
    55	// a newSrc that is identical to the original src passed in.
    56	type storageFunc func(br blob.Ref, src io.Reader) (dest blobserver.Storage, newSrc io.Reader, err error)
    57	
    58	type condStorage struct {
    59		storageForReceive storageFunc
    60		read              blobserver.Storage
    61		remove            blobserver.Storage
    62	}
    63	
    64	func (sto *condStorage) StorageGeneration() (initTime time.Time, random string, err error) {
    65		if gener, ok := sto.read.(blobserver.Generationer); ok {
    66			return gener.StorageGeneration()
    67		}
    68		err = blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.read))
    69		return
    70	}
    71	
    72	func (sto *condStorage) ResetStorageGeneration() error {
    73		if gener, ok := sto.read.(blobserver.Generationer); ok {
    74			return gener.ResetStorageGeneration()
    75		}
    76		return blobserver.GenerationNotSupportedError(fmt.Sprintf("blobserver.Generationer not implemented on %T", sto.read))
    77	}
    78	
    79	func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (storage blobserver.Storage, err error) {
    80		sto := &condStorage{}
    81	
    82		receive := conf.OptionalStringOrObject("write")
    83		read := conf.RequiredString("read")
    84		remove := conf.OptionalString("remove", "")
    85		if err := conf.Validate(); err != nil {
    86			return nil, err
    87		}
    88	
    89		if receive != nil {
    90			sto.storageForReceive, err = buildStorageForReceive(ld, receive)
    91			if err != nil {
    92				return
    93			}
    94		}
    95	
    96		sto.read, err = ld.GetStorage(read)
    97		if err != nil {
    98			return
    99		}
   100	
   101		if remove != "" {
   102			sto.remove, err = ld.GetStorage(remove)
   103			if err != nil {
   104				return
   105			}
   106		}
   107		return sto, nil
   108	}
   109	
   110	func buildStorageForReceive(ld blobserver.Loader, confOrString interface{}) (storageFunc, error) {
   111		// Static configuration from a string
   112		if s, ok := confOrString.(string); ok {
   113			sto, err := ld.GetStorage(s)
   114			if err != nil {
   115				return nil, err
   116			}
   117			f := func(br blob.Ref, src io.Reader) (blobserver.Storage, io.Reader, error) {
   118				return sto, src, nil
   119			}
   120			return f, nil
   121		}
   122	
   123		conf := jsonconfig.Obj(confOrString.(map[string]interface{}))
   124	
   125		ifStr := conf.RequiredString("if")
   126		// TODO: let 'then' and 'else' point to not just strings but either
   127		// a string or a JSON object with another condition, and then
   128		// call buildStorageForReceive on it recursively
   129		thenTarget := conf.RequiredString("then")
   130		elseTarget := conf.RequiredString("else")
   131		if err := conf.Validate(); err != nil {
   132			return nil, err
   133		}
   134		thenSto, err := ld.GetStorage(thenTarget)
   135		if err != nil {
   136			return nil, err
   137		}
   138		elseSto, err := ld.GetStorage(elseTarget)
   139		if err != nil {
   140			return nil, err
   141		}
   142	
   143		switch ifStr {
   144		case "isSchema":
   145			return isSchemaPicker(thenSto, elseSto), nil
   146		}
   147		return nil, fmt.Errorf("cond: unsupported 'if' type of %q", ifStr)
   148	}
   149	
   150	func isSchemaPicker(thenSto, elseSto blobserver.Storage) storageFunc {
   151		return func(br blob.Ref, src io.Reader) (dest blobserver.Storage, newSrc io.Reader, err error) {
   152			var buf bytes.Buffer
   153			blob, err := schema.BlobFromReader(br, io.TeeReader(src, &buf))
   154			newSrc = io.MultiReader(bytes.NewReader(buf.Bytes()), src)
   155			if err != nil || blob.Type() == "" {
   156				return elseSto, newSrc, nil
   157			}
   158			return thenSto, newSrc, nil
   159		}
   160	}
   161	
   162	func (sto *condStorage) ReceiveBlob(ctx context.Context, br blob.Ref, src io.Reader) (sb blob.SizedRef, err error) {
   163		destSto, src, err := sto.storageForReceive(br, src)
   164		if err != nil {
   165			return
   166		}
   167		return blobserver.Receive(ctx, destSto, br, src)
   168	}
   169	
   170	func (sto *condStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
   171		if sto.remove != nil {
   172			return sto.remove.RemoveBlobs(ctx, blobs)
   173		}
   174		return errors.New("cond: Remove not configured")
   175	}
   176	
   177	func (sto *condStorage) Fetch(ctx context.Context, b blob.Ref) (file io.ReadCloser, size uint32, err error) {
   178		if sto.read != nil {
   179			return sto.read.Fetch(ctx, b)
   180		}
   181		err = errors.New("cond: Read not configured")
   182		return
   183	}
   184	
   185	func (sto *condStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
   186		if sto.read != nil {
   187			return sto.read.StatBlobs(ctx, blobs, fn)
   188		}
   189		return errors.New("cond: Read not configured")
   190	}
   191	
   192	func (sto *condStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
   193		if sto.read != nil {
   194			return sto.read.EnumerateBlobs(ctx, dest, after, limit)
   195		}
   196		return errors.New("cond: Read not configured")
   197	}
   198	
   199	func init() {
   200		blobserver.RegisterStorageConstructor("cond", blobserver.StorageConstructor(newFromConfig))
   201	}
Website layout inspired by memcached.
Content by the authors.