Home Download Docs Code Community
     1	/*
     2	Copyright 2018 The Perkeep Authors.
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package server
    18	
    19	import (
    20		"bytes"
    21		"context"
    22		"errors"
    23		"fmt"
    24		"io"
    25		"log"
    26		"net/http"
    27		"net/url"
    28		"strconv"
    29		"sync"
    30	
    31		"perkeep.org/internal/httputil"
    32	
    33		"perkeep.org/pkg/blob"
    34		"perkeep.org/pkg/blobserver"
    35		"perkeep.org/pkg/client"
    36		"perkeep.org/pkg/index"
    37		"perkeep.org/pkg/schema"
    38		"perkeep.org/pkg/types/camtypes"
    39	
    40		"go4.org/types"
    41	)
    42	
// numWorkers is the number of "long-lived" goroutines that run during the
// importing process; they are the ones reading work items from
// shareImporter.workc. In addition, a short-lived goroutine is created for
// each directory (static-set) visited, to hand out the work for its members.
const numWorkers = 10
    47	
// shareImporter imports shared blobs from src into dest.
// A shareImporter is not meant to be ephemeral, but it only handles one
// import at a time: while running is set, ServeHTTP refuses new POST requests.
type shareImporter struct {
	// shareURL is the URL of the share being imported, as received in the
	// "shareurl" request parameter.
	shareURL string
	// src is the client used to fetch the shared blobs; importAll sets it.
	src *client.Client
	// dest is the storage receiving the imported blobs. ServeHTTP refuses to
	// serve if it is nil.
	dest blobserver.Storage

	// mu protects the fields of this group, which are updated by the import
	// goroutines while serveProgress may be reading them.
	mu        sync.RWMutex
	seen      int      // files seen. for statistics, as web UI feedback.
	copied    int      // files actually copied. for statistics, as web UI feedback.
	running   bool     // whether an import is currently going on.
	assembled bool     // whether the shared blob is of an assembled file.
	br        blob.Ref // the resulting imported file or directory schema.
	err       error    // any error encountered by one of the importing workers.

	wg    sync.WaitGroup // to wait for all the work assigners, spawned for each directory, to finish.
	workc chan work      // workers read from it in order to do the importing work.
}
    67	
// work is the unit of work to be done, sent on workc to one of the workers.
// The worker calls imprt on br and reports the outcome on errc.
type work struct {
	br   blob.Ref     // blob to import
	errc chan<- error // chan on which the worker sends back any error (or nil)
}
    73	
    74	// imprt fetches and copies br and recurses on the contents or children of br.
    75	func (si *shareImporter) imprt(ctx context.Context, br blob.Ref) error {
    76		// A little less than the sniffer will take, so we don't truncate.
    77		const sniffSize = 900 * 1024
    78		select {
    79		case <-ctx.Done():
    80			return ctx.Err()
    81		default:
    82		}
    83		src := si.src
    84		dest := si.dest
    85		rc, _, err := src.Fetch(ctx, br)
    86		if err != nil {
    87			return err
    88		}
    89		rcc := types.NewOnceCloser(rc)
    90		defer rcc.Close()
    91	
    92		sniffer := index.NewBlobSniffer(br)
    93		_, err = io.CopyN(sniffer, rc, sniffSize)
    94		if err != nil && err != io.EOF {
    95			return err
    96		}
    97		sniffer.Parse()
    98		b, ok := sniffer.SchemaBlob()
    99		if !ok {
   100			return fmt.Errorf("%q: not a Perkeep schema.", br)
   101		}
   102		body, err := sniffer.Body()
   103		if err != nil {
   104			return err
   105		}
   106		rc = io.NopCloser(io.MultiReader(bytes.NewReader(body), rc))
   107	
   108		switch b.Type() {
   109		case "directory":
   110			if _, err := blobserver.Receive(ctx, dest, br, rc); err != nil {
   111				return err
   112			}
   113			ssbr, ok := b.DirectoryEntries()
   114			if !ok {
   115				return fmt.Errorf("%q not actually a directory", br)
   116			}
   117			rcc.Close()
   118			return si.imprt(ctx, ssbr)
   119		case "static-set":
   120			if _, err := blobserver.Receive(ctx, dest, br, rc); err != nil {
   121				return err
   122			}
   123			rcc.Close()
   124			si.wg.Add(1)
   125			// asynchronous work assignment through w.workc
   126			go func() {
   127				defer si.wg.Done()
   128				si.mu.RLock()
   129				// do not pile in more work to do if there's already been an error
   130				if si.err != nil {
   131					si.mu.RUnlock()
   132					return
   133				}
   134				si.mu.RUnlock()
   135				var errcs []<-chan error
   136				for _, mref := range b.StaticSetMembers() {
   137					errc := make(chan error, 1)
   138					errcs = append(errcs, errc)
   139					si.workc <- work{mref, errc}
   140				}
   141				for _, errc := range errcs {
   142					if err := <-errc; err != nil {
   143						si.mu.Lock()
   144						si.err = err
   145						si.mu.Unlock()
   146					}
   147				}
   148			}()
   149			return nil
   150		case "file":
   151			rcc.Close()
   152			fr, err := schema.NewFileReader(ctx, src, br)
   153			if err != nil {
   154				return fmt.Errorf("NewFileReader: %v", err)
   155			}
   156			defer fr.Close()
   157			si.mu.Lock()
   158			si.seen++
   159			si.mu.Unlock()
   160			if _, err := schema.WriteFileMap(ctx, dest, b.Builder(), fr); err != nil {
   161				return err
   162			}
   163			si.mu.Lock()
   164			si.copied++
   165			si.mu.Unlock()
   166			return nil
   167		// TODO(mpl): other camliTypes, at least symlink.
   168		default:
   169			return errors.New("unknown blob type: " + string(b.Type()))
   170		}
   171	}
   172	
   173	// importAll imports all the shared contents transitively reachable under
   174	// si.shareURL.
   175	func (si *shareImporter) importAll(ctx context.Context) error {
   176		src, shared, err := client.NewFromShareRoot(ctx, si.shareURL, client.OptionNoExternalConfig())
   177		if err != nil {
   178			return err
   179		}
   180		si.src = src
   181		si.br = shared
   182		si.workc = make(chan work, 2*numWorkers)
   183		defer close(si.workc)
   184	
   185		// fan out over a pool of numWorkers workers overall
   186		for i := 0; i < numWorkers; i++ {
   187			go func() {
   188				for wi := range si.workc {
   189					wi.errc <- si.imprt(ctx, wi.br)
   190				}
   191			}()
   192		}
   193		// work assignment is done asynchronously, so imprt returns before all the work is finished.
   194		err = si.imprt(ctx, shared)
   195		si.wg.Wait()
   196		if err == nil {
   197			si.mu.RLock()
   198			err = si.err
   199			si.mu.RUnlock()
   200		}
   201		log.Print("share importer: all workers done")
   202		if err != nil {
   203			return err
   204		}
   205		return nil
   206	}
   207	
   208	// importAssembled imports the assembled file shared at si.shareURL.
   209	func (si *shareImporter) importAssembled(ctx context.Context) {
   210		res, err := http.Get(si.shareURL)
   211		if err != nil {
   212			return
   213		}
   214		defer res.Body.Close()
   215		br, err := schema.WriteFileFromReader(ctx, si.dest, "", res.Body)
   216		if err != nil {
   217			return
   218		}
   219		si.mu.Lock()
   220		si.br = br
   221		si.mu.Unlock()
   222		return
   223	}
   224	
   225	// isAssembled reports whether si.shareURL is of a shared assembled file.
   226	func (si *shareImporter) isAssembled() (bool, error) {
   227		u, err := url.Parse(si.shareURL)
   228		if err != nil {
   229			return false, err
   230		}
   231		isAs, _ := strconv.ParseBool(u.Query().Get("assemble"))
   232		return isAs, nil
   233	}
   234	
   235	// ServeHTTP answers the following queries:
   236	//
   237	// POST:
   238	//
   239	//	?shareurl=https://yourfriendserver/share/sha224-shareclaim
   240	//
   241	// Imports all the contents transitively reachable under that shared URL.
   242	//
   243	// GET:
   244	// Serves as JSON the state of the currently running import process, encoded
   245	// from a camtypes.ShareImportProgress.
   246	//
   247	// If an import is already running, POST requests are served a 503.
   248	func (si *shareImporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   249		if si.dest == nil {
   250			http.Error(w, "shareImporter without a dest", 500)
   251			return
   252		}
   253		if r.Method == "GET" {
   254			si.serveProgress(w, r)
   255			return
   256		}
   257		if r.Method != "POST" {
   258			http.Error(w, "Not a POST", http.StatusBadRequest)
   259			return
   260		}
   261		si.mu.Lock()
   262		if si.running {
   263			http.Error(w, "an import is already in progress", http.StatusServiceUnavailable)
   264			si.mu.Unlock()
   265			return
   266		}
   267		si.running = true
   268		si.seen = 0
   269		si.copied = 0
   270		si.assembled = false
   271		si.br = blob.Ref{}
   272		si.mu.Unlock()
   273	
   274		shareURL := r.FormValue("shareurl")
   275		if shareURL == "" {
   276			http.Error(w, "No shareurl parameter", http.StatusBadRequest)
   277			return
   278		}
   279		si.shareURL = shareURL
   280		isAs, err := si.isAssembled()
   281		if err != nil {
   282			http.Error(w, "Could not parse shareurl", http.StatusInternalServerError)
   283			return
   284		}
   285	
   286		go func() {
   287			defer func() {
   288				si.mu.Lock()
   289				si.running = false
   290				si.mu.Unlock()
   291			}()
   292			if isAs {
   293				si.mu.Lock()
   294				si.assembled = true
   295				si.mu.Unlock()
   296				si.importAssembled(context.Background())
   297				return
   298			}
   299			si.importAll(context.Background())
   300		}()
   301		w.WriteHeader(200)
   302	}
   303	
   304	// serveProgress serves the state of the currently running importing process
   305	func (si *shareImporter) serveProgress(w http.ResponseWriter, r *http.Request) {
   306		si.mu.RLock()
   307		defer si.mu.RUnlock()
   308		httputil.ReturnJSON(w, camtypes.ShareImportProgress{
   309			FilesSeen:   si.seen,
   310			FilesCopied: si.copied,
   311			Running:     si.running,
   312			Assembled:   si.assembled,
   313			BlobRef:     si.br,
   314		})
   315	}
Website layout inspired by memcached.
Content by the authors.