Home Download Docs Code Community
     1	/*
     2	Copyright 2013 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	// Package storagetest tests blobserver.Storage implementations
    18	package storagetest // import "perkeep.org/pkg/blobserver/storagetest"
    19	
    20	import (
    21		"context"
    22		"errors"
    23		"fmt"
    24		"io"
    25		"reflect"
    26		"sort"
    27		"strconv"
    28		"strings"
    29		"testing"
    30		"time"
    31	
    32		"perkeep.org/pkg/blob"
    33		"perkeep.org/pkg/blobserver"
    34		"perkeep.org/pkg/test"
    35	
    36		"go4.org/syncutil"
    37	)
    38	
// Opts configures a storage test run started with TestOpt.
type Opts struct {
	// New is required and must return the storage server to test, along with a func to
	// clean it up. The cleanup may be nil.
	New func(*testing.T) (sto blobserver.Storage, cleanup func())

	// Retries specifies how long to wait to retry after each failure
	// that may be an eventual consistency issue (enumerate, stat), etc.
	// Each entry allows one more attempt after sleeping for that duration;
	// with no entries a failing operation is not retried.
	Retries []time.Duration

	SkipEnum bool // for when EnumerateBlobs is not implemented
}
    50	
    51	func Test(t *testing.T, fn func(*testing.T) (sto blobserver.Storage, cleanup func())) {
    52		TestOpt(t, Opts{New: fn})
    53	}
    54	
// run bundles the state shared by the helper methods of a single
// TestOpt invocation.
type run struct {
	t   *testing.T         // test to report to
	opt Opts               // options the run was started with
	sto blobserver.Storage // storage under test, as returned by opt.New
}
    60	
// TestOpt runs the storage test suite against the storage returned by
// opt.New.
//
// In order, it checks: enumeration of the empty storage, stat of a
// missing blob, receiving blobs (with enumerate checks after 1, 5 and
// 100 blobs), fetching, stat, enumeration with the 'limit' and 'after'
// parameters, removal, and finally SubFetch.
//
// The cleanup func returned by opt.New is only called if the test did
// not fail, so that the storage state can be inspected after a failure.
func TestOpt(t *testing.T, opt Opts) {
	sto, cleanup := opt.New(t)
	defer func() {
		// Deliberately leave the storage in place on failure.
		if t.Failed() {
			t.Logf("test %T FAILED, skipping cleanup!", sto)
		} else {
			if cleanup != nil {
				cleanup()
			}
		}
	}()
	r := &run{
		t:   t,
		opt: opt,
		sto: sto,
	}
	t.Logf("Testing blobserver storage %T", sto)

	t.Logf("Testing Enumerate for empty")
	r.testEnumerate(nil)

	t.Logf("Test stat of blob not existing")
	{
		// This blob is never uploaded, so no stat result is wanted for it.
		b := &test.Blob{Contents: "not exist"}
		blobRefs := []blob.Ref{b.BlobRef()}
		testStat(t, sto, blobRefs, nil)
	}

	var blobs []*test.Blob
	var blobRefs []blob.Ref
	var blobSizedRefs []blob.SizedRef

	// 5 blobs in short mode; 100 blobs otherwise.
	contents := []string{"foo", "quux", "asdf", "qwerty", "0123456789"}
	if !testing.Short() {
		for i := 0; i < 95; i++ {
			contents = append(contents, "foo-"+strconv.Itoa(i))
		}
	}

	t.Logf("Testing receive")
	for i, x := range contents {
		b1 := &test.Blob{Contents: x}
		if testing.Short() {
			t.Logf("blob[%d] = %s: %q", i, b1.BlobRef(), x)
		}
		b1s, err := sto.ReceiveBlob(context.Background(), b1.BlobRef(), b1.Reader())
		if err != nil {
			t.Fatalf("ReceiveBlob of %s: %v", b1, err)
		}
		if b1s != b1.SizedRef() {
			t.Fatalf("Received %v; want %v", b1s, b1.SizedRef())
		}
		blobs = append(blobs, b1)
		blobRefs = append(blobRefs, b1.BlobRef())
		blobSizedRefs = append(blobSizedRefs, b1.SizedRef())

		// Check enumeration at a few sizes while uploading. The 100 case
		// is only reached when not in short mode.
		switch len(blobSizedRefs) {
		case 1, 5, 100:
			t.Logf("Testing Enumerate for %d blobs", len(blobSizedRefs))
			r.testEnumerate(blobSizedRefs)
		}
	}

	t.Logf("Testing Fetch")
	for i, b2 := range blobs {
		rc, size, err := sto.Fetch(context.Background(), b2.BlobRef())
		if err != nil {
			t.Fatalf("error fetching %d. %s: %v", i, b2, err)
		}
		testSizedBlob(t, rc, b2.BlobRef(), int64(size))
		rc.Close()
	}

	t.Logf("Testing Stat")
	testStat(t, sto, blobRefs, blobSizedRefs)

	// Enumerate tests.
	// Sort so that the slices taken below line up with enumeration order.
	sort.Sort(blob.SizedByRef(blobSizedRefs))

	t.Logf("Testing Enumerate on all")
	r.testEnumerate(blobSizedRefs)

	t.Logf("Testing Enumerate 'limit' param")
	r.testEnumerate(blobSizedRefs[:3], 3)

	// Enumerate 'after'
	{
		after := blobSizedRefs[2].Ref.String()
		t.Logf("Testing Enumerate 'after' param; after %q", after)
		r.testEnumerate(blobSizedRefs[3:], after)
	}

	// Enumerate 'after' + limit
	{
		after := blobSizedRefs[2].Ref.String()
		t.Logf("Testing Enumerate 'after' + 'limit' param; after %q, limit 1", after)
		r.testEnumerate(blobSizedRefs[3:4], after, 1)
	}

	// Enumerate 'after' with prefix of a blobref + limit
	{
		// NOTE(review): expects the first sorted blob back, so this
		// presumably relies on every blobref string sorting after "a".
		after := "a"
		t.Logf("Testing Enumerate 'after' + 'limit' param; after %q, limit 1", after)
		r.testEnumerate(blobSizedRefs[:1], after, 1)
	}

	r.testRemove(blobRefs)

	r.testSubFetcher()
}
   171	
   172	func (r *run) testRemove(blobRefs []blob.Ref) {
   173		ctx := context.Background()
   174		t, sto := r.t, r.sto
   175		t.Logf("Testing Remove")
   176		if err := sto.RemoveBlobs(ctx, blobRefs); err != nil {
   177			if strings.Contains(err.Error(), "not implemented") {
   178				t.Logf("RemoveBlobs: %v", err)
   179				return
   180			}
   181			t.Fatalf("RemoveBlobs: %v", err)
   182		}
   183		r.testEnumerate(nil) // verify they're all gone
   184		if len(blobRefs) > 0 {
   185			t.Logf("Testing double-delete")
   186			if err := sto.RemoveBlobs(ctx, []blob.Ref{blobRefs[0]}); err != nil {
   187				t.Fatalf("Double RemoveBlobs: %v", err)
   188			}
   189		}
   190	}
   191	
   192	func (r *run) testSubFetcher() {
   193		t, sto := r.t, r.sto
   194		sf, ok := sto.(blob.SubFetcher)
   195		if !ok {
   196			t.Logf("%T is not a SubFetcher", sto)
   197			return
   198		}
   199		t.Logf("Testing SubFetch")
   200		big := &test.Blob{Contents: "Some big blob"}
   201		if _, err := sto.ReceiveBlob(context.Background(), big.BlobRef(), big.Reader()); err != nil {
   202			t.Fatal(err)
   203		}
   204		regions := []struct {
   205			off, limit int64
   206			want       string
   207			errok      bool
   208		}{
   209			{5, 3, "big", false},
   210			{5, 8, "big blob", false},
   211			{5, 100, "big blob", true},
   212		}
   213		for _, tt := range regions {
   214			r, err := sf.SubFetch(context.Background(), big.BlobRef(), tt.off, tt.limit)
   215			if err == blob.ErrUnimplemented {
   216				t.Logf("%T implements SubFetcher but its wrapped value doesn't", sto)
   217				return
   218			}
   219			if err != nil {
   220				t.Fatalf("Error fetching big blob for SubFetch: %v", err)
   221			}
   222			if r == nil {
   223				t.Fatal("SubFetch returned nil, nil")
   224			}
   225			all, err := io.ReadAll(r)
   226			r.Close()
   227			if err != nil && !tt.errok {
   228				t.Errorf("Unexpected error reading SubFetch region %+v: %v", tt, err)
   229			}
   230			if string(all) != tt.want {
   231				t.Errorf("SubFetch region %+v got %q; want %q", tt, all, tt.want)
   232			}
   233		}
   234	
   235		// test invalid offsets
   236		invalids := []struct {
   237			off, limit int64
   238		}{
   239			{int64(len(big.Contents)) + 1, 1},
   240			{-1, 1},
   241			{1, -1},
   242		}
   243		for _, tt := range invalids {
   244			r, err := sf.SubFetch(context.Background(), big.BlobRef(), tt.off, tt.limit)
   245			if err == blob.ErrUnimplemented {
   246				t.Logf("%T implements SubFetcher but its wrapped value doesn't", sto)
   247				return
   248			}
   249			if err == nil {
   250				r.Close()
   251				t.Errorf("No error fetching with off=%d limit=%d; wanted an error", tt.off, tt.limit)
   252				continue
   253			}
   254			if err != blob.ErrNegativeSubFetch && err != blob.ErrOutOfRangeOffsetSubFetch {
   255				t.Errorf("Unexpected error fetching with off=%d limit=%d: %v", tt.off, tt.limit, err)
   256			}
   257		}
   258	}
   259	
   260	func testSizedBlob(t *testing.T, r io.Reader, b1 blob.Ref, size int64) {
   261		h := b1.Hash()
   262		n, err := io.Copy(h, r)
   263		if err != nil {
   264			t.Fatalf("error reading from %s: %v", r, err)
   265		}
   266		if n != size {
   267			t.Fatalf("read %d bytes from %s, metadata said %d!", n, r, size)
   268		}
   269		b2 := blob.RefFromHash(h)
   270		if b2 != b1 {
   271			t.Fatalf("content mismatch (awaited %s, got %s)", b1, b2)
   272		}
   273	}
   274	
   275	func CheckEnumerate(sto blobserver.Storage, wantUnsorted []blob.SizedRef, opts ...interface{}) error {
   276		var after string
   277		var n = 1000
   278		for _, opt := range opts {
   279			switch v := opt.(type) {
   280			case string:
   281				after = v
   282			case int:
   283				n = v
   284			default:
   285				panic("bad option of type " + fmt.Sprintf("%T", v))
   286			}
   287		}
   288	
   289		want := append([]blob.SizedRef(nil), wantUnsorted...)
   290		sort.Sort(blob.SizedByRef(want))
   291	
   292		sbc := make(chan blob.SizedRef, 10)
   293	
   294		var got []blob.SizedRef
   295		var grp syncutil.Group
   296		sawEnd := make(chan bool, 1)
   297		grp.Go(func() error {
   298			ctx, cancel := context.WithCancel(context.TODO())
   299			defer cancel()
   300			if err := sto.EnumerateBlobs(ctx, sbc, after, n); err != nil {
   301				return fmt.Errorf("EnumerateBlobs(%q, %d): %v", after, n, err)
   302			}
   303			return nil
   304		})
   305		grp.Go(func() error {
   306			var lastRef blob.Ref
   307			for sb := range sbc {
   308				if !sb.Valid() {
   309					return fmt.Errorf("invalid blobref %#v received in enumerate", sb)
   310				}
   311				got = append(got, sb)
   312				if lastRef.Valid() && sb.Ref.Less(lastRef) {
   313					return fmt.Errorf("blobs appearing out of order")
   314				}
   315				lastRef = sb.Ref
   316			}
   317			sawEnd <- true
   318			return nil
   319	
   320		})
   321		grp.Go(func() error {
   322			select {
   323			case <-sawEnd:
   324				return nil
   325			case <-time.After(10 * time.Second):
   326				return errors.New("timeout waiting for EnumerateBlobs to close its channel")
   327			}
   328	
   329		})
   330		if err := grp.Err(); err != nil {
   331			return fmt.Errorf("Enumerate error: %v", err)
   332		}
   333		if len(got) == 0 && len(want) == 0 {
   334			return nil
   335		}
   336		var gotSet = map[blob.SizedRef]bool{}
   337		for _, sb := range got {
   338			if gotSet[sb] {
   339				return fmt.Errorf("duplicate blob %v returned in enumerate", sb)
   340			}
   341			gotSet[sb] = true
   342		}
   343	
   344		if !reflect.DeepEqual(got, want) {
   345			return fmt.Errorf("enumerate mismatch. Got %d; want %d.\n Got: %v\nWant: %v\n",
   346				len(got), len(want), got, want)
   347		}
   348		return nil
   349	}
   350	
   351	func (r *run) testEnumerate(wantUnsorted []blob.SizedRef, opts ...interface{}) {
   352		if r.opt.SkipEnum {
   353			r.t.Log("Skipping enum test")
   354			return
   355		}
   356		if err := r.withRetries(func() error {
   357			return CheckEnumerate(r.sto, wantUnsorted, opts...)
   358		}); err != nil {
   359			r.t.Fatalf("%v", err)
   360		}
   361	}
   362	
   363	func (r *run) withRetries(fn func() error) error {
   364		delays := r.opt.Retries
   365		for {
   366			err := fn()
   367			if err == nil || len(delays) == 0 {
   368				return err
   369			}
   370			r.t.Logf("(operation failed; retrying after %v)", delays[0])
   371			time.Sleep(delays[0])
   372			delays = delays[1:]
   373		}
   374	}
   375	
   376	func testStat(t *testing.T, sto blobserver.BlobStatter, blobs []blob.Ref, want []blob.SizedRef) {
   377		// blobs may arrive in ANY order
   378		pos := make(map[blob.Ref]int) // wanted ref => its position in want
   379		need := make(map[blob.Ref]bool)
   380		for i, sb := range want {
   381			pos[sb.Ref] = i
   382			need[sb.Ref] = true
   383		}
   384	
   385		err := sto.StatBlobs(context.Background(), blobs, func(sb blob.SizedRef) error {
   386			if !sb.Valid() {
   387				t.Errorf("StatBlobs func called with invalid/zero blob.SizedRef")
   388				return nil
   389			}
   390			wantPos, ok := pos[sb.Ref]
   391			if !ok {
   392				t.Errorf("StatBlobs func called with unrequested ref %v (size %d)", sb.Ref, sb.Size)
   393				return nil
   394			}
   395			if !need[sb.Ref] {
   396				t.Errorf("StatBlobs func called with ref %v multiple times", sb.Ref)
   397				return nil
   398			}
   399			delete(need, sb.Ref)
   400			w := want[wantPos]
   401			if sb != w {
   402				t.Errorf("StatBlobs returned %v; want %v", sb, w)
   403			}
   404			return nil
   405		})
   406		for br := range need {
   407			t.Errorf("StatBlobs never returned results for %v", br)
   408		}
   409		if err != nil {
   410			t.Errorf("StatBlobs: %v", err)
   411		}
   412	}
   413	
// StreamerTestOpt is an expectation passed to TestStreamer. Its method is
// unexported, so only types in this package (WantN and WantSizedRefs) can
// implement it.
type StreamerTestOpt interface {
	// verify returns an error if got, the blobs yielded by the first
	// uninterrupted StreamBlobs pass, does not meet the expectation.
	verify(got []blob.SizedRef) error
}
   417	
   418	// WantN is a wanted condition, that the caller wants N of the items.
   419	type WantN int
   420	
   421	func (want WantN) verify(got []blob.SizedRef) error {
   422		if int(want) != len(got) {
   423			return fmt.Errorf("got %d streamed blobs; want %d", len(got), int(want))
   424		}
   425		return nil
   426	}
   427	
   428	type WantSizedRefs []blob.SizedRef
   429	
   430	func (s WantSizedRefs) verify(got []blob.SizedRef) error {
   431		want := []blob.SizedRef(s)
   432		if !reflect.DeepEqual(got, want) {
   433			return fmt.Errorf("mismatch:\n got %d blobs: %q\nwant %d blobs: %q\n", len(got), got, len(want), want)
   434		}
   435		return nil
   436	}
   437	
   438	// TestStreamer tests that the BlobStreamer bs implements all of the
   439	// promised interface behavior and ultimately yields the provided
   440	// blobs.
   441	//
   442	// If bs also implements BlobEnumerator, the two are compared for
   443	// consistency.
   444	func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTestOpt) {
   445	
   446		var sawEnum map[blob.SizedRef]bool
   447		if enumer, ok := bs.(blobserver.BlobEnumerator); ok {
   448			sawEnum = make(map[blob.SizedRef]bool)
   449			// First do an enumerate over all blobs as a baseline. The Streamer should
   450			// yield the same blobs, even if it's in a different order.
   451			enumCtx, cancel := context.WithCancel(context.TODO())
   452			defer cancel()
   453			if err := blobserver.EnumerateAll(enumCtx, enumer, func(sb blob.SizedRef) error {
   454				sawEnum[sb] = true
   455				return nil
   456			}); err != nil {
   457				t.Fatalf("Enumerate: %v", err)
   458			}
   459		}
   460	
   461		// See if, without cancellation, it yields the right
   462		// result and without errors.
   463		ch := make(chan blobserver.BlobAndToken)
   464		errCh := make(chan error, 1)
   465		go func() {
   466			ctx, cancel := context.WithCancel(context.TODO())
   467			defer cancel()
   468			errCh <- bs.StreamBlobs(ctx, ch, "")
   469		}()
   470		var gotRefs []blob.SizedRef
   471		sawStreamed := map[blob.Ref]int{}
   472		for b := range ch {
   473			sawStreamed[b.Ref()]++
   474			sbr := b.SizedRef()
   475			if sawEnum != nil {
   476				if _, ok := sawEnum[sbr]; ok {
   477					delete(sawEnum, sbr)
   478				} else {
   479					t.Errorf("Streamer yielded blob not returned by Enumerate: %v", sbr)
   480				}
   481			}
   482			gotRefs = append(gotRefs, sbr)
   483		}
   484		if err := <-errCh; err != nil {
   485			t.Errorf("initial uninterrupted StreamBlobs error: %v", err)
   486		}
   487		for br, n := range sawStreamed {
   488			if n > 1 {
   489				t.Errorf("Streamed returned duplicate %v, %d times", br, n)
   490			}
   491		}
   492		nMissing := 0
   493		for sbr := range sawEnum {
   494			t.Errorf("Enumerate found %v but Streamer didn't return it", sbr)
   495			nMissing++
   496			if nMissing == 10 && len(sawEnum) > 10 {
   497				t.Errorf("... etc ...")
   498				break
   499			}
   500		}
   501		for _, opt := range opts {
   502			if err := opt.verify(gotRefs); err != nil {
   503				t.Errorf("error after first uninterrupted StreamBlobs pass: %v", err)
   504			}
   505		}
   506		if t.Failed() {
   507			return
   508		}
   509	
   510		// Next, the "complex pass": test a cancellation at each point,
   511		// to test that resume works properly.
   512		//
   513		// Basic strategy:
   514		// -- receive 1 blob, note the blobref, cancel.
   515		// -- start again with that blobref, receive 2, cancel. first should be same,
   516		//    second should be new. note its blobref.
   517		// Each iteration should yield 1 new unique blob and all but
   518		// the first and last will return 2 blobs.
   519		wantRefs := append([]blob.SizedRef(nil), gotRefs...) // copy
   520		sawStreamed = map[blob.Ref]int{}
   521		gotRefs = gotRefs[:0]
   522		contToken := ""
   523		for i := 0; i < len(wantRefs); i++ {
   524			ctx, cancel := context.WithCancel(context.TODO())
   525			defer cancel()
   526			ch := make(chan blobserver.BlobAndToken)
   527			errc := make(chan error, 1)
   528			go func() {
   529				errc <- bs.StreamBlobs(ctx, ch, contToken)
   530			}()
   531			nrecv := 0
   532			nextToken := ""
   533			for bt := range ch {
   534				nrecv++
   535				sbr := bt.Blob.SizedRef()
   536				isNew := len(gotRefs) == 0 || sbr != gotRefs[len(gotRefs)-1]
   537				if isNew {
   538					if sawStreamed[sbr.Ref] > 0 {
   539						t.Fatalf("In complex pass, returned duplicate blob %v\n\nSo far, before interrupting:\n%v\n\nWant:\n%v", sbr, gotRefs, wantRefs)
   540					}
   541					sawStreamed[sbr.Ref]++
   542					gotRefs = append(gotRefs, sbr)
   543					nextToken = bt.Token
   544					cancel()
   545					break
   546				} else if i == 0 {
   547					t.Fatalf("first iteration should receive a new value")
   548				} else if nrecv == 2 {
   549					t.Fatalf("at cut point %d of testStream, Streamer received 2 values, both not unique. Looping?", i)
   550				}
   551			}
   552			err := <-errc
   553			if err != nil && err != context.Canceled {
   554				t.Fatalf("StreamBlobs on iteration %d (token %q) returned error: %v", i, contToken, err)
   555			}
   556			if err == nil {
   557				break
   558			}
   559			contToken = nextToken
   560		}
   561		if !reflect.DeepEqual(gotRefs, wantRefs) {
   562			t.Errorf("Mismatch on complex pass (got %d, want %d):\n got %q\nwant %q\n", len(gotRefs), len(wantRefs), gotRefs, wantRefs)
   563			wantMap := map[blob.SizedRef]bool{}
   564			for _, sbr := range wantRefs {
   565				wantMap[sbr] = true
   566			}
   567			for _, sbr := range gotRefs {
   568				if _, ok := wantMap[sbr]; ok {
   569					delete(wantMap, sbr)
   570				} else {
   571					t.Errorf("got has unwanted: %v", sbr)
   572				}
   573			}
   574			missing := wantMap // found stuff has been deleted
   575			for sbr := range missing {
   576				t.Errorf("got is missing: %v", sbr)
   577			}
   578	
   579			t.FailNow()
   580		}
   581	}
Website layout inspired by memcached.
Content by the authors.