Home Download Docs Code Community
     1	/*
     2	Copyright 2016 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package encrypt
    18	
    19	import (
    20		"bytes"
    21		"container/heap"
    22		"context"
    23		"errors"
    24		"fmt"
    25		"io"
    26		"log"
    27		"os"
    28		"sort"
    29		"strconv"
    30		"strings"
    31		"sync"
    32	
    33		"perkeep.org/internal/pools"
    34		"perkeep.org/pkg/blob"
    35		"perkeep.org/pkg/blobserver"
    36		"perkeep.org/pkg/sorted"
    37	)
    38	
    39	// Encrypted meta format:
    40	//    #camlistore/encmeta=2
    41	// Then sorted lines, each ending in a newline, like:
    42	//    sha1-plain/<plain size>/sha1-encrypted
    43	
const (
	// FullMetaBlobSize is the number of lines at which we stop compacting a meta blob.
	// recordMeta does not track meta blobs holding more entries than this, and
	// dispatches a batch for packing as soon as the batch grows past it.
	FullMetaBlobSize = 10 * 1000 // ~ 512kB
	// SmallMetaCountLimit is the number of small meta that triggers compaction:
	// recordMeta repacks the tracked meta blobs once it holds more than this many.
	SmallMetaCountLimit = 100 // 100 rounds to make a full = ~ 26MB bw waste
)
    50	
// metaBlob describes one stored meta blob: its own ref and the refs of the
// plaintext blobs it has entries for.
type metaBlob struct {
	br     blob.Ref   // of meta blob
	plains []blob.Ref // plaintext refs, one per entry line of the meta blob
}
    55	
// metaBlobHeap is a heap of metaBlobs.
// heap.Pop returns the metaBlob with the LEAST entries.
//
// The heap methods do not take the embedded Mutex themselves; callers such
// as recordMeta lock it around their heap.Push and heap.Pop calls.
type metaBlobHeap struct {
	sync.Mutex
	s []*metaBlob
}

// Compile-time check that *metaBlobHeap implements heap.Interface.
var _ heap.Interface = (*metaBlobHeap)(nil)
    64	
    65	func (h *metaBlobHeap) Push(x interface{}) {
    66		h.s = append(h.s, x.(*metaBlob))
    67	}
    68	
    69	func (h *metaBlobHeap) Pop() interface{} {
    70		l := len(h.s)
    71		v := h.s[l-1]
    72		h.s = h.s[:l-1]
    73		return v
    74	}
    75	
    76	func (h *metaBlobHeap) Less(i, j int) bool {
    77		return len(h.s[i].plains) < len(h.s[j].plains)
    78	}
    79	
    80	func (h *metaBlobHeap) Len() int      { return len(h.s) }
    81	func (h *metaBlobHeap) Swap(i, j int) { h.s[i], h.s[j] = h.s[j], h.s[i] }
    82	
    83	func (s *storage) recordMeta(b *metaBlob) {
    84		if len(b.plains) > FullMetaBlobSize {
    85			return
    86		}
    87	
    88		s.smallMeta.Lock()
    89		defer s.smallMeta.Unlock()
    90		heap.Push(s.smallMeta, b)
    91	
    92		// If the heap is full, pop and group the entries under the lock,
    93		// then schedule upload, deletion and reinserion in parallel.
    94		if s.smallMeta.Len() > SmallMetaCountLimit {
    95			var plains, toDelete []blob.Ref
    96			for s.smallMeta.Len() > 0 {
    97				meta := heap.Pop(s.smallMeta).(*metaBlob)
    98				plains = append(plains, meta.plains...)
    99				toDelete = append(toDelete, meta.br)
   100				if len(plains) > FullMetaBlobSize {
   101					go s.makePackedMetaBlob(plains, toDelete)
   102					plains, toDelete = nil, nil
   103				}
   104			}
   105			if len(toDelete) == 1 {
   106				heap.Push(s.smallMeta, &metaBlob{br: toDelete[0], plains: plains})
   107			} else if len(toDelete) > 1 {
   108				go s.makePackedMetaBlob(plains, toDelete)
   109			}
   110		}
   111	}
   112	
   113	func (s *storage) makePackedMetaBlob(plains, toDelete []blob.Ref) {
   114		ctx := context.Background() // TODO
   115		// We lose track of the small blobs in case of error, but they will be packed at next start.
   116		sort.Sort(blob.ByRef(plains))
   117	
   118		metaBytes := pools.BytesBuffer()
   119		defer pools.PutBuffer(metaBytes)
   120	
   121		metaBytes.WriteString("#camlistore/encmeta=2\n")
   122		metaBytes.Grow(len(plains[0].String()) * len(plains) * 2)
   123		for _, plain := range plains {
   124			p := plain.String()
   125			metaBytes.WriteString(p)
   126			metaBytes.WriteString("/")
   127			v, err := s.index.Get(p)
   128			if err != nil {
   129				log.Printf("encrypt: failed to find the index entry for %s while packing: %v", p, err)
   130				return
   131			}
   132			metaBytes.WriteString(v)
   133			metaBytes.WriteString("\n")
   134		}
   135	
   136		encBytes := pools.BytesBuffer()
   137		defer pools.PutBuffer(encBytes)
   138	
   139		if err := s.encryptBlob(encBytes, metaBytes); err != nil {
   140			log.Printf("encrypt: failed to encrypt meta: %v", err)
   141			return
   142		}
   143	
   144		metaBR := blob.RefFromBytes(encBytes.Bytes())
   145		metaSB, err := blobserver.ReceiveNoHash(ctx, s.meta, metaBR, encBytes)
   146		if err != nil {
   147			log.Printf("encrypt: failed to upload a packed meta: %v", err)
   148			return
   149		}
   150		if len(plains) < FullMetaBlobSize {
   151			s.recordMeta(&metaBlob{br: metaSB.Ref, plains: plains})
   152		}
   153		if err := s.meta.RemoveBlobs(ctx, toDelete); err != nil {
   154			log.Printf("encrypt: failed to delete small meta blobs: %v", err)
   155		}
   156		log.Printf("encrypt: packed %d small meta blobs into one (%d refs)", len(toDelete), len(plains))
   157	}
   158	
   159	// makeSingleMetaBlob makes and encrypts a metaBlob with one entry.
   160	func (s *storage) makeSingleMetaBlob(plainBR, encBR blob.Ref, plainSize uint32) ([]byte, error) {
   161		// would be racy to use the pool and return Bytes() here, just use a fresh buffer
   162		encBytes := bytes.NewBuffer(nil)
   163	
   164		plain := fmt.Sprintf("#camlistore/encmeta=2\n%s/%d/%s\n", plainBR, plainSize, encBR)
   165		if err := s.encryptBlob(encBytes, bytes.NewBufferString(plain)); err != nil {
   166			return nil, err
   167		}
   168	
   169		return encBytes.Bytes(), nil
   170	}
   171	
   172	func packIndexEntry(plainSize uint32, encBR blob.Ref) string {
   173		return fmt.Sprintf("%d/%s", plainSize, encBR)
   174	}
   175	
   176	func unpackIndexEntry(s string) (plainSize uint32, encBR blob.Ref, err error) {
   177		parts := strings.Split(s, "/")
   178		if len(parts) != 2 {
   179			err = fmt.Errorf("malformed index entry %q", s)
   180			return
   181		}
   182		size, err := strconv.ParseUint(parts[0], 10, 32)
   183		if err != nil {
   184			err = fmt.Errorf("malformed index entry %q: %s", s, err)
   185			return
   186		}
   187		plainSize = uint32(size)
   188		encBR = blob.ParseOrZero(parts[1])
   189		if !encBR.Valid() {
   190			err = fmt.Errorf("malformed index entry %q: %s", s, err)
   191		}
   192		return
   193	}
   194	
   195	// fetchMeta returns os.ErrNotExist if the plaintext blob is not in the index.
   196	func (s *storage) fetchMeta(ctx context.Context, b blob.Ref) (plainSize uint32, encBR blob.Ref, err error) {
   197		v, err := s.index.Get(b.String())
   198		if err == sorted.ErrNotFound {
   199			err = os.ErrNotExist
   200		}
   201		if err != nil {
   202			return 0, blob.Ref{}, err
   203		}
   204		return unpackIndexEntry(v)
   205	}
   206	
   207	// processEncryptedMetaBlob decrypts dat (the data for the br meta blob) and parses
   208	// its meta lines, updating the index.
   209	//
   210	// processEncryptedMetaBlob is not thread-safe.
   211	func (s *storage) processEncryptedMetaBlob(br blob.Ref, dat []byte) error {
   212		plainBytes := pools.BytesBuffer()
   213		defer pools.PutBuffer(plainBytes)
   214	
   215		if err := s.decryptBlob(plainBytes, bytes.NewBuffer(dat)); err != nil {
   216			return err
   217		}
   218	
   219		header, err := plainBytes.ReadString('\n')
   220		if err != nil {
   221			return errors.New("No first line")
   222		}
   223		if header != "#camlistore/encmeta=2\n" {
   224			if len(header) > 80 {
   225				header = header[:80]
   226			}
   227			return fmt.Errorf("unsupported first line %q", header)
   228		}
   229		var plains []blob.Ref
   230		for {
   231			line, err := plainBytes.ReadString('\n')
   232			if err != nil && len(line) != 0 {
   233				return io.ErrUnexpectedEOF
   234			} else if err != nil {
   235				break
   236			}
   237			parts := strings.Split(strings.TrimRight(line, "\n"), "/")
   238			if len(parts) != 3 {
   239				if len(line) > 80 {
   240					line = line[:80]
   241				}
   242				return fmt.Errorf("malformed line %q", line)
   243			}
   244			// We do very limited checking here, as we signed the blob and we check
   245			// the value anyway on s.index.Get.
   246			plainBR, ok := blob.ParseKnown(parts[0])
   247			if !ok {
   248				if len(line) > 80 {
   249					line = line[:80]
   250				}
   251				return fmt.Errorf("malformed line %q", line)
   252			}
   253			plains = append(plains, plainBR)
   254			if err := s.index.Set(parts[0], parts[1]+"/"+parts[2]); err != nil {
   255				return err
   256			}
   257		}
   258		s.recordMeta(&metaBlob{br: br, plains: plains})
   259		return nil
   260	}
   261	
   262	func (s *storage) readAllMetaBlobs() error {
   263		type encMB struct {
   264			br  blob.Ref
   265			dat []byte // encrypted blob
   266			err error
   267		}
   268		metac := make(chan encMB, 16)
   269	
   270		const maxInFlight = 5 // arbitrary
   271		var gate = make(chan bool, maxInFlight)
   272	
   273		var stopEnumerate = make(chan bool) // closed on error
   274		enumErrc := make(chan error, 1)
   275		go func() {
   276			var wg sync.WaitGroup
   277			ctx := context.TODO()
   278			enumErrc <- blobserver.EnumerateAll(ctx, s.meta, func(sb blob.SizedRef) error {
   279				select {
   280				case <-stopEnumerate:
   281					return errors.New("enumeration stopped")
   282				default:
   283				}
   284	
   285				wg.Add(1)
   286				gate <- true
   287				go func() {
   288					defer wg.Done()
   289					defer func() { <-gate }()
   290					rc, _, err := s.meta.Fetch(ctx, sb.Ref)
   291					if err != nil {
   292						metac <- encMB{sb.Ref, nil, fmt.Errorf("fetch failed: %v", err)}
   293						return
   294					}
   295					defer rc.Close()
   296					all, err := io.ReadAll(rc)
   297					if err != nil {
   298						metac <- encMB{sb.Ref, nil, fmt.Errorf("read failed: %v", err)}
   299						return
   300					}
   301					metac <- encMB{sb.Ref, all, nil}
   302				}()
   303				return nil
   304			})
   305			wg.Wait()
   306			close(metac)
   307		}()
   308	
   309		for mi := range metac {
   310			err := mi.err
   311			if err == nil {
   312				err = s.processEncryptedMetaBlob(mi.br, mi.dat)
   313			}
   314			if err != nil {
   315				close(stopEnumerate)
   316				go func() {
   317					for range metac {
   318					}
   319				}()
   320				// TODO: advertise in this error message a new option or environment variable
   321				// to skip a certain or all meta blobs, to allow partial recovery, if some
   322				// are corrupt. For now, require all to be correct.
   323				return fmt.Errorf("error with meta blob %v: %v", mi.br, err)
   324			}
   325		}
   326	
   327		return <-enumErrc
   328	}
Website layout inspired by memcached.
Content by the authors.