Home Download Docs Code Community
     1	/*
     2	Copyright 2016 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	// Package encrypt registers the "encrypt" blobserver storage type
    18	// which stores all blobs and metadata with age encryption into other
    19	// wrapped storage targets (e.g. localdisk, s3, remote, google).
    20	//
    21	// An encrypt storage target is configured with two other storage targets:
    22	// one to hold encrypted blobs, and one to hold encrypted metadata about
    23	// the encrypted blobs. On start-up, all the metadata blobs are read
    24	// to discover the plaintext blobrefs.
    25	//
    26	// Encryption is currently always age. See code for metadata
    27	// formats and configuration details, which are currently subject to change.
    28	//
    29	// The low-level config requires 'keyFile' to be set.
    30	//
    31	// Example low-level config:
    32	//
    33	//	"/storage-encrypted/": {
    34	//	    "handler": "storage-encrypt",
    35	//	    "handlerArgs": {
    36	//	        "I_AGREE": "that encryption support hasn't been peer-reviewed, isn't finished, and its format might change.",
    37	//	        "keyFile": "/path/to/keyfile",
    38	//	        "blobs": "/blobs-storage/",
    39	//	        "meta": "/meta-storage/",
    40	//	        "metaIndex": {
    41	//	            "file": "/path/to/index.leveldb",
    42	//	            "type": "leveldb"
    43	//	        }
    44	//	    }
    45	//	},
    46	package encrypt // import "perkeep.org/pkg/blobserver/encrypt"
    47	
    48	import (
    49		"bufio"
    50		"bytes"
    51		"context"
    52		"errors"
    53		"fmt"
    54		"io"
    55		"log"
    56		"os"
    57		"time"
    58	
    59		"filippo.io/age"
    60		"go4.org/jsonconfig"
    61		"go4.org/syncutil"
    62		"perkeep.org/internal/pools"
    63		"perkeep.org/pkg/blob"
    64		"perkeep.org/pkg/blobserver"
    65		"perkeep.org/pkg/sorted"
    66	)
    67	
    68	type storage struct {
    69		// index is the meta index, populated at startup from the blobs in storage.meta.
    70		// key: plaintext blob.Ref
    71		// value: <plaintext length>/<encrypted blob.Ref>
    72		index sorted.KeyValue
    73	
    74		// identity is used to encrypt and decrypt the blobs.
    75		identity *age.X25519Identity
    76	
    77		// blobs holds encrypted versions of all plaintext blobs.
    78		blobs blobserver.Storage
    79	
    80		// meta holds metadata mapping between the names of plaintext blobs and
    81		// their original size and after-encryption name. Each blob in meta contains
    82		// 1 or more blob descriptions. All new insertions generate both a new
    83		// encrypted blob in 'blobs' and one single-meta blob in
    84		// 'meta'. The small metadata blobs are occasionally rolled up
    85		// into bigger blobs with multiple blob descriptions.
    86		meta blobserver.Storage
    87	
    88		// smallMeta tracks a heap of meta blobs smaller than the target size.
    89		smallMeta *metaBlobHeap
    90	}
    91	
    92	// Format of encrypted blobs:
    93	// versionByte (0x02) || age_v1(plaintext)
    94	
    95	const version = 2
    96	
    97	// encryptBlob encrypts plaintext and appends the result to ciphertext,
    98	func (s *storage) encryptBlob(ciphertext, plaintext *bytes.Buffer) error {
    99		if err := ciphertext.WriteByte(version); err != nil {
   100			return fmt.Errorf("unable to write version byte: %w", err)
   101		}
   102		enc, err := age.Encrypt(ciphertext, s.identity.Recipient())
   103		if err != nil {
   104			return fmt.Errorf("unable to encrypt plaintext: %w", err)
   105		}
   106		if _, err := io.Copy(enc, plaintext); err != nil {
   107			return fmt.Errorf("unable to encrypt plaintext: %w", err)
   108		}
   109		if err := enc.Close(); err != nil {
   110			return fmt.Errorf("unable to encrypt plaintext: %w", err)
   111		}
   112		return nil
   113	}
   114	
   115	// decryptBlob decrypts ciphertext and appends the result to plaintext,
   116	func (s *storage) decryptBlob(plaintext, ciphertext *bytes.Buffer) error {
   117		if versionByte, err := ciphertext.ReadByte(); err != nil {
   118			return fmt.Errorf("unable to read version byte: %w", err)
   119		} else if versionByte != version {
   120			return fmt.Errorf("unknown encrypted blob version: %d", versionByte)
   121		}
   122	
   123		dec, err := age.Decrypt(ciphertext, s.identity)
   124		if err != nil {
   125			return fmt.Errorf("unable to decrypt ciphertext: %w", err)
   126		}
   127		if _, err := io.Copy(plaintext, dec); err != nil {
   128			return fmt.Errorf("unable to decrypt plaintext: %w", err)
   129		}
   130		return nil
   131	}
   132	
   133	func (s *storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
   134		return blobserver.ErrNotImplemented // TODO
   135	}
   136	
   137	var statGate = syncutil.NewGate(20) // arbitrary
   138	
   139	func (s *storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
   140		return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(br blob.Ref) (sb blob.SizedRef, err error) {
   141			plainSize, _, err := s.fetchMeta(ctx, br)
   142			switch err {
   143			case nil:
   144				return blob.SizedRef{Ref: br, Size: plainSize}, nil
   145			case os.ErrNotExist:
   146				return sb, nil
   147			default:
   148				return sb, err
   149			}
   150		})
   151	}
   152	
   153	func (s *storage) ReceiveBlob(ctx context.Context, plainBR blob.Ref, source io.Reader) (sb blob.SizedRef, err error) {
   154		// Aggressively check for duplicates since there's nothing else to ensure we don't store blobs twice
   155		if plainSize, _, err := s.fetchMeta(ctx, plainBR); err == nil {
   156			log.Println("encrypt: duplicated blob received", plainBR)
   157			return blob.SizedRef{Ref: plainBR, Size: uint32(plainSize)}, nil
   158		}
   159	
   160		plainBytes := pools.BytesBuffer()
   161		defer pools.PutBuffer(plainBytes)
   162	
   163		hash := plainBR.Hash()
   164		plainSize, err := io.Copy(io.MultiWriter(plainBytes, hash), source)
   165		if err != nil {
   166			return sb, err
   167		}
   168		if !plainBR.HashMatches(hash) {
   169			return sb, blobserver.ErrCorruptBlob
   170		}
   171	
   172		encBytes := pools.BytesBuffer()
   173		defer pools.PutBuffer(encBytes)
   174	
   175		if err := s.encryptBlob(encBytes, plainBytes); err != nil {
   176			return sb, fmt.Errorf("encrypt: error encrypting blob: %w", err)
   177		}
   178		encBR := blob.RefFromBytes(encBytes.Bytes())
   179	
   180		if _, err = blobserver.ReceiveNoHash(ctx, s.blobs, encBR, encBytes); err != nil {
   181			return sb, fmt.Errorf("encrypt: error writing encrypted blob %v (plaintext %v): %v", encBR, plainBR, err)
   182		}
   183	
   184		metaBytes, err := s.makeSingleMetaBlob(plainBR, encBR, uint32(plainSize))
   185		if err != nil {
   186			return sb, fmt.Errorf("encrypt: error making meta blob: %w", err)
   187		}
   188	
   189		metaBR := blob.RefFromBytes(metaBytes)
   190		metaSB, err := blobserver.ReceiveNoHash(ctx, s.meta, metaBR, bytes.NewReader(metaBytes))
   191		if err != nil {
   192			return sb, fmt.Errorf("encrypt: error writing encrypted meta for plaintext %v (encrypted blob %v): %v", plainBR, encBR, err)
   193		}
   194		s.recordMeta(&metaBlob{br: metaSB.Ref, plains: []blob.Ref{plainBR}})
   195	
   196		err = s.index.Set(plainBR.String(), packIndexEntry(uint32(plainSize), encBR))
   197		if err != nil {
   198			return sb, fmt.Errorf("encrypt: error updating index for encrypted %v (plaintext %v): %v", encBR, plainBR, err)
   199		}
   200	
   201		return blob.SizedRef{Ref: plainBR, Size: uint32(plainSize)}, nil
   202	}
   203	
   204	func (s *storage) Fetch(ctx context.Context, plainBR blob.Ref) (io.ReadCloser, uint32, error) {
   205		plainSize, encBR, err := s.fetchMeta(ctx, plainBR)
   206		if err != nil {
   207			return nil, 0, err
   208		}
   209		encData, _, err := s.blobs.Fetch(ctx, encBR)
   210		if err != nil {
   211			return nil, 0, fmt.Errorf("encrypt: error fetching plaintext %s's encrypted %v blob: %v", plainBR, encBR, err)
   212		}
   213		defer encData.Close()
   214	
   215		encBytes := pools.BytesBuffer()
   216		defer pools.PutBuffer(encBytes)
   217	
   218		encHash := encBR.Hash()
   219		_, err = io.Copy(io.MultiWriter(encBytes, encHash), encData)
   220		if err != nil {
   221			return nil, 0, err
   222		}
   223	
   224		// We have a signed statement in the meta blob that attests that the
   225		// ciphertext hash corresponds to the plaintext hash, so no need to check
   226		// the latter.  However, check the former to make sure the encrypted blob
   227		// was not swapped for another.
   228		if !encBR.HashMatches(encHash) {
   229			return nil, 0, blobserver.ErrCorruptBlob
   230		}
   231	
   232		// Using the pool here would be racy since the caller will read this asynchronously
   233		plainBytes := bytes.NewBuffer(nil)
   234		if err := s.decryptBlob(plainBytes, encBytes); err != nil {
   235			return nil, 0, fmt.Errorf("encrypt: encrypted blob %s failed validation: %s", encBR, err)
   236		}
   237	
   238		return io.NopCloser(plainBytes), plainSize, nil
   239	}
   240	
   241	func (s *storage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
   242		defer close(dest)
   243		iter := s.index.Find(after, "")
   244		n := 0
   245		for iter.Next() {
   246			if iter.Key() == after {
   247				continue
   248			}
   249			// Both ReceiveBlob and processEncryptedMetaBlob validate this
   250			br := blob.MustParse(iter.Key())
   251			plainSize, _, err := unpackIndexEntry(iter.Value())
   252			if err != nil {
   253				return fmt.Errorf("bogus encrypt index value %q: %s", iter.Value(), err)
   254			}
   255			select {
   256			case dest <- blob.SizedRef{Ref: br, Size: plainSize}:
   257			case <-ctx.Done():
   258				return ctx.Err()
   259			}
   260			n++
   261			if limit != 0 && n >= limit {
   262				break
   263			}
   264		}
   265		return iter.Close()
   266	}
   267	
   268	func init() {
   269		blobserver.RegisterStorageConstructor("encrypt", blobserver.StorageConstructor(newFromConfig))
   270	}
   271	
   272	func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (bs blobserver.Storage, err error) {
   273		sto := &storage{}
   274		agreement := config.RequiredString("I_AGREE")
   275		const wantAgreement = "that encryption support hasn't been peer-reviewed, isn't finished, and its format might change."
   276		if agreement != wantAgreement {
   277			return nil, errors.New("use of the 'encrypt' target without the proper I_AGREE value")
   278		}
   279	
   280		keyFile := config.RequiredString("keyFile")
   281		blobStorage := config.RequiredString("blobs")
   282		metaStorage := config.RequiredString("meta")
   283		metaConf := config.RequiredObject("metaIndex")
   284		if err := config.Validate(); err != nil {
   285			return nil, err
   286		}
   287	
   288		sto.index, err = sorted.NewKeyValueMaybeWipe(metaConf)
   289		if err != nil {
   290			return
   291		}
   292	
   293		sto.blobs, err = ld.GetStorage(blobStorage)
   294		if err != nil {
   295			return
   296		}
   297		sto.meta, err = ld.GetStorage(metaStorage)
   298		if err != nil {
   299			return
   300		}
   301	
   302		keyData, err := readKeyFile(keyFile)
   303		if err != nil {
   304			return nil, fmt.Errorf("error reading key file '%s': %w", keyFile, err)
   305		}
   306	
   307		identity, err := age.ParseX25519Identity(keyData)
   308		if err != nil {
   309			return nil, fmt.Errorf("error parsing x25519 identity: %w", err)
   310		}
   311		sto.identity = identity
   312	
   313		start := time.Now()
   314		log.Printf("Reading encryption metadata...")
   315		sto.smallMeta = &metaBlobHeap{}
   316		if err := sto.readAllMetaBlobs(); err != nil {
   317			return nil, fmt.Errorf("error scanning metadata on start-up: %v", err)
   318		}
   319		log.Printf("Read all encryption metadata in %.3f seconds", time.Since(start).Seconds())
   320	
   321		return sto, nil
   322	}
   323	
   324	func readKeyFile(keyFile string) (string, error) {
   325		if err := checkKeyFilePermissions(keyFile); err != nil {
   326			return "", fmt.Errorf("error checking key file permissions: %w", err)
   327		}
   328		f, err := os.Open(keyFile)
   329		if err != nil {
   330			return "", fmt.Errorf("error opening key file: %w", err)
   331		}
   332		defer f.Close()
   333	
   334		keyScanner := bufio.NewScanner(f)
   335		if !keyScanner.Scan() {
   336			return "", errors.New("empty key file")
   337		}
   338		keyData := keyScanner.Text()
   339	
   340		if keyScanner.Scan() {
   341			return "", errors.New("key file contained multiple lines")
   342		}
   343	
   344		return keyData, keyScanner.Err()
   345	}
Website layout inspired by memcached.
Content by the authors.