Home Download Docs Code Community
     1	/*
     2	Copyright 2013 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
// Package storagetest tests blobserver.Storage implementations.
    18	package storagetest // import "perkeep.org/pkg/blobserver/storagetest"
    19	
    20	import (
    21		"context"
    22		"errors"
    23		"fmt"
    24		"io"
    25		"reflect"
    26		"sort"
    27		"strconv"
    28		"strings"
    29		"testing"
    30		"time"
    31	
    32		"perkeep.org/pkg/blob"
    33		"perkeep.org/pkg/blobserver"
    34		"perkeep.org/pkg/test"
    35	
    36		"go4.org/syncutil"
    37	)
    38	
    39	type Opts struct {
    40		// New is required and must return the storage server to test.
    41		New func(*testing.T) blobserver.Storage
    42	
    43		// Retries specifies how long to wait to retry after each failure
    44		// that may be an eventual consistency issue (enumerate, stat), etc.
    45		Retries []time.Duration
    46	
    47		SkipEnum bool // for when EnumerateBlobs is not implemented
    48	}
    49	
    50	func Test(t *testing.T, fn func(*testing.T) blobserver.Storage) {
    51		TestOpt(t, Opts{New: fn})
    52	}
    53	
    54	type run struct {
    55		t   *testing.T
    56		opt Opts
    57		sto blobserver.Storage
    58	}
    59	
    60	func TestOpt(t *testing.T, opt Opts) {
    61		sto := opt.New(t)
    62		r := &run{
    63			t:   t,
    64			opt: opt,
    65			sto: sto,
    66		}
    67		t.Logf("Testing blobserver storage %T", sto)
    68	
    69		t.Logf("Testing Enumerate for empty")
    70		r.testEnumerate(nil)
    71	
    72		t.Logf("Test stat of blob not existing")
    73		{
    74			b := &test.Blob{Contents: "not exist"}
    75			blobRefs := []blob.Ref{b.BlobRef()}
    76			testStat(t, sto, blobRefs, nil)
    77		}
    78	
    79		var blobs []*test.Blob
    80		var blobRefs []blob.Ref
    81		var blobSizedRefs []blob.SizedRef
    82	
    83		contents := []string{"foo", "quux", "asdf", "qwerty", "0123456789"}
    84		if !testing.Short() {
    85			for i := 0; i < 95; i++ {
    86				contents = append(contents, "foo-"+strconv.Itoa(i))
    87			}
    88		}
    89	
    90		t.Logf("Testing receive")
    91		for i, x := range contents {
    92			b1 := &test.Blob{Contents: x}
    93			if testing.Short() {
    94				t.Logf("blob[%d] = %s: %q", i, b1.BlobRef(), x)
    95			}
    96			b1s, err := sto.ReceiveBlob(context.Background(), b1.BlobRef(), b1.Reader())
    97			if err != nil {
    98				t.Fatalf("ReceiveBlob of %s: %v", b1, err)
    99			}
   100			if b1s != b1.SizedRef() {
   101				t.Fatalf("Received %v; want %v", b1s, b1.SizedRef())
   102			}
   103			blobs = append(blobs, b1)
   104			blobRefs = append(blobRefs, b1.BlobRef())
   105			blobSizedRefs = append(blobSizedRefs, b1.SizedRef())
   106	
   107			switch len(blobSizedRefs) {
   108			case 1, 5, 100:
   109				t.Logf("Testing Enumerate for %d blobs", len(blobSizedRefs))
   110				r.testEnumerate(blobSizedRefs)
   111			}
   112		}
   113	
   114		t.Logf("Testing Fetch")
   115		for i, b2 := range blobs {
   116			rc, size, err := sto.Fetch(context.Background(), b2.BlobRef())
   117			if err != nil {
   118				t.Fatalf("error fetching %d. %s: %v", i, b2, err)
   119			}
   120			testSizedBlob(t, rc, b2.BlobRef(), int64(size))
   121			rc.Close()
   122		}
   123	
   124		t.Logf("Testing Stat")
   125		testStat(t, sto, blobRefs, blobSizedRefs)
   126	
   127		// Enumerate tests.
   128		sort.Sort(blob.SizedByRef(blobSizedRefs))
   129	
   130		t.Logf("Testing Enumerate on all")
   131		r.testEnumerate(blobSizedRefs)
   132	
   133		t.Logf("Testing Enumerate 'limit' param")
   134		r.testEnumerate(blobSizedRefs[:3], 3)
   135	
   136		// Enumerate 'after'
   137		{
   138			after := blobSizedRefs[2].Ref.String()
   139			t.Logf("Testing Enumerate 'after' param; after %q", after)
   140			r.testEnumerate(blobSizedRefs[3:], after)
   141		}
   142	
   143		// Enumerate 'after' + limit
   144		{
   145			after := blobSizedRefs[2].Ref.String()
   146			t.Logf("Testing Enumerate 'after' + 'limit' param; after %q, limit 1", after)
   147			r.testEnumerate(blobSizedRefs[3:4], after, 1)
   148		}
   149	
   150		// Enumerate 'after' with prefix of a blobref + limit
   151		{
   152			after := "a"
   153			t.Logf("Testing Enumerate 'after' + 'limit' param; after %q, limit 1", after)
   154			r.testEnumerate(blobSizedRefs[:1], after, 1)
   155		}
   156	
   157		r.testRemove(blobRefs)
   158	
   159		r.testSubFetcher()
   160	}
   161	
   162	func (r *run) testRemove(blobRefs []blob.Ref) {
   163		ctx := context.Background()
   164		t, sto := r.t, r.sto
   165		t.Logf("Testing Remove")
   166		if err := sto.RemoveBlobs(ctx, blobRefs); err != nil {
   167			if strings.Contains(err.Error(), "not implemented") {
   168				t.Logf("RemoveBlobs: %v", err)
   169				return
   170			}
   171			t.Fatalf("RemoveBlobs: %v", err)
   172		}
   173		r.testEnumerate(nil) // verify they're all gone
   174		if len(blobRefs) > 0 {
   175			t.Logf("Testing double-delete")
   176			if err := sto.RemoveBlobs(ctx, []blob.Ref{blobRefs[0]}); err != nil {
   177				t.Fatalf("Double RemoveBlobs: %v", err)
   178			}
   179		}
   180	}
   181	
   182	func (r *run) testSubFetcher() {
   183		t, sto := r.t, r.sto
   184		sf, ok := sto.(blob.SubFetcher)
   185		if !ok {
   186			t.Logf("%T is not a SubFetcher", sto)
   187			return
   188		}
   189		t.Logf("Testing SubFetch")
   190		big := &test.Blob{Contents: "Some big blob"}
   191		if _, err := sto.ReceiveBlob(context.Background(), big.BlobRef(), big.Reader()); err != nil {
   192			t.Fatal(err)
   193		}
   194		regions := []struct {
   195			off, limit int64
   196			want       string
   197			errok      bool
   198		}{
   199			{5, 3, "big", false},
   200			{5, 8, "big blob", false},
   201			{5, 100, "big blob", true},
   202		}
   203		for _, tt := range regions {
   204			r, err := sf.SubFetch(context.Background(), big.BlobRef(), tt.off, tt.limit)
   205			if errors.Is(err, blob.ErrUnimplemented) {
   206				t.Logf("%T implements SubFetcher but its wrapped value doesn't", sto)
   207				return
   208			}
   209			if err != nil {
   210				t.Fatalf("Error fetching big blob for SubFetch: %v", err)
   211			}
   212			if r == nil {
   213				t.Fatal("SubFetch returned nil, nil")
   214			}
   215			all, err := io.ReadAll(r)
   216			r.Close()
   217			if err != nil && !tt.errok {
   218				t.Errorf("Unexpected error reading SubFetch region %+v: %v", tt, err)
   219			}
   220			if string(all) != tt.want {
   221				t.Errorf("SubFetch region %+v got %q; want %q", tt, all, tt.want)
   222			}
   223		}
   224	
   225		// test invalid offsets
   226		invalids := []struct {
   227			off, limit int64
   228		}{
   229			{int64(len(big.Contents)) + 1, 1},
   230			{-1, 1},
   231			{1, -1},
   232		}
   233		for _, tt := range invalids {
   234			r, err := sf.SubFetch(context.Background(), big.BlobRef(), tt.off, tt.limit)
   235			if errors.Is(err, blob.ErrUnimplemented) {
   236				t.Logf("%T implements SubFetcher but its wrapped value doesn't", sto)
   237				return
   238			}
   239			if err == nil {
   240				r.Close()
   241				t.Errorf("No error fetching with off=%d limit=%d; wanted an error", tt.off, tt.limit)
   242				continue
   243			}
   244			if !errors.Is(err, blob.ErrNegativeSubFetch) && !errors.Is(err, blob.ErrOutOfRangeOffsetSubFetch) {
   245				t.Errorf("Unexpected error fetching with off=%d limit=%d: %v", tt.off, tt.limit, err)
   246			}
   247		}
   248	}
   249	
   250	func testSizedBlob(t *testing.T, r io.Reader, b1 blob.Ref, size int64) {
   251		h := b1.Hash()
   252		n, err := io.Copy(h, r)
   253		if err != nil {
   254			t.Fatalf("error reading from %s: %v", r, err)
   255		}
   256		if n != size {
   257			t.Fatalf("read %d bytes from %s, metadata said %d!", n, r, size)
   258		}
   259		b2 := blob.RefFromHash(h)
   260		if b2 != b1 {
   261			t.Fatalf("content mismatch (awaited %s, got %s)", b1, b2)
   262		}
   263	}
   264	
   265	func CheckEnumerate(sto blobserver.Storage, wantUnsorted []blob.SizedRef, opts ...interface{}) error {
   266		var after string
   267		var n = 1000
   268		for _, opt := range opts {
   269			switch v := opt.(type) {
   270			case string:
   271				after = v
   272			case int:
   273				n = v
   274			default:
   275				panic("bad option of type " + fmt.Sprintf("%T", v))
   276			}
   277		}
   278	
   279		want := append([]blob.SizedRef(nil), wantUnsorted...)
   280		sort.Sort(blob.SizedByRef(want))
   281	
   282		sbc := make(chan blob.SizedRef, 10)
   283	
   284		var got []blob.SizedRef
   285		var grp syncutil.Group
   286		sawEnd := make(chan bool, 1)
   287		grp.Go(func() error {
   288			ctx, cancel := context.WithCancel(context.TODO())
   289			defer cancel()
   290			if err := sto.EnumerateBlobs(ctx, sbc, after, n); err != nil {
   291				return fmt.Errorf("EnumerateBlobs(%q, %d): %w", after, n, err)
   292			}
   293			return nil
   294		})
   295		grp.Go(func() error {
   296			var lastRef blob.Ref
   297			for sb := range sbc {
   298				if !sb.Valid() {
   299					return fmt.Errorf("invalid blobref %#v received in enumerate", sb)
   300				}
   301				got = append(got, sb)
   302				if lastRef.Valid() && sb.Ref.Less(lastRef) {
   303					return fmt.Errorf("blobs appearing out of order")
   304				}
   305				lastRef = sb.Ref
   306			}
   307			sawEnd <- true
   308			return nil
   309	
   310		})
   311		grp.Go(func() error {
   312			select {
   313			case <-sawEnd:
   314				return nil
   315			case <-time.After(10 * time.Second):
   316				return errors.New("timeout waiting for EnumerateBlobs to close its channel")
   317			}
   318	
   319		})
   320		if err := grp.Err(); err != nil {
   321			return fmt.Errorf("Enumerate error: %w", err)
   322		}
   323		if len(got) == 0 && len(want) == 0 {
   324			return nil
   325		}
   326		var gotSet = map[blob.SizedRef]bool{}
   327		for _, sb := range got {
   328			if gotSet[sb] {
   329				return fmt.Errorf("duplicate blob %v returned in enumerate", sb)
   330			}
   331			gotSet[sb] = true
   332		}
   333	
   334		if !reflect.DeepEqual(got, want) {
   335			return fmt.Errorf("enumerate mismatch. Got %d; want %d.\n Got: %v\nWant: %v\n",
   336				len(got), len(want), got, want)
   337		}
   338		return nil
   339	}
   340	
   341	func (r *run) testEnumerate(wantUnsorted []blob.SizedRef, opts ...interface{}) {
   342		if r.opt.SkipEnum {
   343			r.t.Log("Skipping enum test")
   344			return
   345		}
   346		if err := r.withRetries(func() error {
   347			return CheckEnumerate(r.sto, wantUnsorted, opts...)
   348		}); err != nil {
   349			r.t.Fatalf("%v", err)
   350		}
   351	}
   352	
   353	func (r *run) withRetries(fn func() error) error {
   354		delays := r.opt.Retries
   355		for {
   356			err := fn()
   357			if err == nil || len(delays) == 0 {
   358				return err
   359			}
   360			r.t.Logf("(operation failed; retrying after %v)", delays[0])
   361			time.Sleep(delays[0])
   362			delays = delays[1:]
   363		}
   364	}
   365	
   366	func testStat(t *testing.T, sto blobserver.BlobStatter, blobs []blob.Ref, want []blob.SizedRef) {
   367		// blobs may arrive in ANY order
   368		pos := make(map[blob.Ref]int) // wanted ref => its position in want
   369		need := make(map[blob.Ref]bool)
   370		for i, sb := range want {
   371			pos[sb.Ref] = i
   372			need[sb.Ref] = true
   373		}
   374	
   375		err := sto.StatBlobs(context.Background(), blobs, func(sb blob.SizedRef) error {
   376			if !sb.Valid() {
   377				t.Errorf("StatBlobs func called with invalid/zero blob.SizedRef")
   378				return nil
   379			}
   380			wantPos, ok := pos[sb.Ref]
   381			if !ok {
   382				t.Errorf("StatBlobs func called with unrequested ref %v (size %d)", sb.Ref, sb.Size)
   383				return nil
   384			}
   385			if !need[sb.Ref] {
   386				t.Errorf("StatBlobs func called with ref %v multiple times", sb.Ref)
   387				return nil
   388			}
   389			delete(need, sb.Ref)
   390			w := want[wantPos]
   391			if sb != w {
   392				t.Errorf("StatBlobs returned %v; want %v", sb, w)
   393			}
   394			return nil
   395		})
   396		for br := range need {
   397			t.Errorf("StatBlobs never returned results for %v", br)
   398		}
   399		if err != nil {
   400			t.Errorf("StatBlobs: %v", err)
   401		}
   402	}
   403	
   404	type StreamerTestOpt interface {
   405		verify(got []blob.SizedRef) error
   406	}
   407	
   408	// WantN is a wanted condition, that the caller wants N of the items.
   409	type WantN int
   410	
   411	func (want WantN) verify(got []blob.SizedRef) error {
   412		if int(want) != len(got) {
   413			return fmt.Errorf("got %d streamed blobs; want %d", len(got), int(want))
   414		}
   415		return nil
   416	}
   417	
   418	type WantSizedRefs []blob.SizedRef
   419	
   420	func (s WantSizedRefs) verify(got []blob.SizedRef) error {
   421		want := []blob.SizedRef(s)
   422		if !reflect.DeepEqual(got, want) {
   423			return fmt.Errorf("mismatch:\n got %d blobs: %q\nwant %d blobs: %q\n", len(got), got, len(want), want)
   424		}
   425		return nil
   426	}
   427	
   428	// TestStreamer tests that the BlobStreamer bs implements all of the
   429	// promised interface behavior and ultimately yields the provided
   430	// blobs.
   431	//
   432	// If bs also implements BlobEnumerator, the two are compared for
   433	// consistency.
   434	func TestStreamer(t *testing.T, bs blobserver.BlobStreamer, opts ...StreamerTestOpt) {
   435	
   436		var sawEnum map[blob.SizedRef]bool
   437		if enumer, ok := bs.(blobserver.BlobEnumerator); ok {
   438			sawEnum = make(map[blob.SizedRef]bool)
   439			// First do an enumerate over all blobs as a baseline. The Streamer should
   440			// yield the same blobs, even if it's in a different order.
   441			enumCtx, cancel := context.WithCancel(context.TODO())
   442			defer cancel()
   443			if err := blobserver.EnumerateAll(enumCtx, enumer, func(sb blob.SizedRef) error {
   444				sawEnum[sb] = true
   445				return nil
   446			}); err != nil {
   447				t.Fatalf("Enumerate: %v", err)
   448			}
   449		}
   450	
   451		// See if, without cancellation, it yields the right
   452		// result and without errors.
   453		ch := make(chan blobserver.BlobAndToken)
   454		errCh := make(chan error, 1)
   455		go func() {
   456			ctx, cancel := context.WithCancel(context.TODO())
   457			defer cancel()
   458			errCh <- bs.StreamBlobs(ctx, ch, "")
   459		}()
   460		var gotRefs []blob.SizedRef
   461		sawStreamed := map[blob.Ref]int{}
   462		for b := range ch {
   463			sawStreamed[b.Ref()]++
   464			sbr := b.SizedRef()
   465			if sawEnum != nil {
   466				if _, ok := sawEnum[sbr]; ok {
   467					delete(sawEnum, sbr)
   468				} else {
   469					t.Errorf("Streamer yielded blob not returned by Enumerate: %v", sbr)
   470				}
   471			}
   472			gotRefs = append(gotRefs, sbr)
   473		}
   474		if err := <-errCh; err != nil {
   475			t.Errorf("initial uninterrupted StreamBlobs error: %v", err)
   476		}
   477		for br, n := range sawStreamed {
   478			if n > 1 {
   479				t.Errorf("Streamed returned duplicate %v, %d times", br, n)
   480			}
   481		}
   482		nMissing := 0
   483		for sbr := range sawEnum {
   484			t.Errorf("Enumerate found %v but Streamer didn't return it", sbr)
   485			nMissing++
   486			if nMissing == 10 && len(sawEnum) > 10 {
   487				t.Errorf("... etc ...")
   488				break
   489			}
   490		}
   491		for _, opt := range opts {
   492			if err := opt.verify(gotRefs); err != nil {
   493				t.Errorf("error after first uninterrupted StreamBlobs pass: %v", err)
   494			}
   495		}
   496		if t.Failed() {
   497			return
   498		}
   499	
   500		// Next, the "complex pass": test a cancellation at each point,
   501		// to test that resume works properly.
   502		//
   503		// Basic strategy:
   504		// -- receive 1 blob, note the blobref, cancel.
   505		// -- start again with that blobref, receive 2, cancel. first should be same,
   506		//    second should be new. note its blobref.
   507		// Each iteration should yield 1 new unique blob and all but
   508		// the first and last will return 2 blobs.
   509		wantRefs := append([]blob.SizedRef(nil), gotRefs...) // copy
   510		sawStreamed = map[blob.Ref]int{}
   511		gotRefs = gotRefs[:0]
   512		contToken := ""
   513		for i := 0; i < len(wantRefs); i++ {
   514			ctx, cancel := context.WithCancel(context.TODO())
   515			defer cancel()
   516			ch := make(chan blobserver.BlobAndToken)
   517			errc := make(chan error, 1)
   518			go func() {
   519				errc <- bs.StreamBlobs(ctx, ch, contToken)
   520			}()
   521			nrecv := 0
   522			nextToken := ""
   523			for bt := range ch {
   524				nrecv++
   525				sbr := bt.Blob.SizedRef()
   526				isNew := len(gotRefs) == 0 || sbr != gotRefs[len(gotRefs)-1]
   527				if isNew {
   528					if sawStreamed[sbr.Ref] > 0 {
   529						t.Fatalf("In complex pass, returned duplicate blob %v\n\nSo far, before interrupting:\n%v\n\nWant:\n%v", sbr, gotRefs, wantRefs)
   530					}
   531					sawStreamed[sbr.Ref]++
   532					gotRefs = append(gotRefs, sbr)
   533					nextToken = bt.Token
   534					cancel()
   535					break
   536				} else if i == 0 {
   537					t.Fatalf("first iteration should receive a new value")
   538				} else if nrecv == 2 {
   539					t.Fatalf("at cut point %d of testStream, Streamer received 2 values, both not unique. Looping?", i)
   540				}
   541			}
   542			err := <-errc
   543			if err != nil && err != context.Canceled {
   544				t.Fatalf("StreamBlobs on iteration %d (token %q) returned error: %v", i, contToken, err)
   545			}
   546			if err == nil {
   547				break
   548			}
   549			contToken = nextToken
   550		}
   551		if !reflect.DeepEqual(gotRefs, wantRefs) {
   552			t.Errorf("Mismatch on complex pass (got %d, want %d):\n got %q\nwant %q\n", len(gotRefs), len(wantRefs), gotRefs, wantRefs)
   553			wantMap := map[blob.SizedRef]bool{}
   554			for _, sbr := range wantRefs {
   555				wantMap[sbr] = true
   556			}
   557			for _, sbr := range gotRefs {
   558				if _, ok := wantMap[sbr]; ok {
   559					delete(wantMap, sbr)
   560				} else {
   561					t.Errorf("got has unwanted: %v", sbr)
   562				}
   563			}
   564			missing := wantMap // found stuff has been deleted
   565			for sbr := range missing {
   566				t.Errorf("got is missing: %v", sbr)
   567			}
   568	
   569			t.FailNow()
   570		}
   571	}
Website layout inspired by memcached.
Content by the authors.