Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	package files
    18	
    19	import (
    20		"context"
    21		"fmt"
    22		"io"
    23		"log"
    24	
    25		"perkeep.org/pkg/blob"
    26	)
    27	
    28	func (ds *Storage) startGate() {
    29		if ds.tmpFileGate == nil {
    30			return
    31		}
    32		ds.tmpFileGate.Start()
    33	}
    34	
    35	func (ds *Storage) doneGate() {
    36		if ds.tmpFileGate == nil {
    37			return
    38		}
    39		ds.tmpFileGate.Done()
    40	}
    41	
    42	func (ds *Storage) ReceiveBlob(ctx context.Context, blobRef blob.Ref, source io.Reader) (blob.SizedRef, error) {
    43		ds.dirLockMu.RLock()
    44		defer ds.dirLockMu.RUnlock()
    45	
    46		hashedDirectory := ds.blobDirectory(blobRef)
    47		err := ds.fs.MkdirAll(hashedDirectory, 0700)
    48		if err != nil {
    49			return blob.SizedRef{}, err
    50		}
    51	
    52		// TODO(mpl): warn when we hit the gate, and at a limited rate, like maximum once a minute.
    53		// Deferring to another CL, since it requires modifications to syncutil.Gate first.
    54		ds.startGate()
    55		tempFile, err := ds.fs.TempFile(hashedDirectory, blobFileBaseName(blobRef)+".tmp")
    56		if err != nil {
    57			ds.doneGate()
    58			return blob.SizedRef{}, err
    59		}
    60	
    61		success := false // set true later
    62		defer func() {
    63			if !success {
    64				log.Println("Removing temp file: ", tempFile.Name())
    65				ds.fs.Remove(tempFile.Name())
    66			}
    67			ds.doneGate()
    68		}()
    69	
    70		written, err := io.Copy(tempFile, source)
    71		if err != nil {
    72			return blob.SizedRef{}, err
    73		}
    74		if err = tempFile.Sync(); err != nil {
    75			return blob.SizedRef{}, err
    76		}
    77		if err = tempFile.Close(); err != nil {
    78			return blob.SizedRef{}, err
    79		}
    80		stat, err := ds.fs.Lstat(tempFile.Name())
    81		if err != nil {
    82			return blob.SizedRef{}, err
    83		}
    84		if stat.Size() != written {
    85			return blob.SizedRef{}, fmt.Errorf("temp file %q size %d didn't match written size %d", tempFile.Name(), stat.Size(), written)
    86		}
    87	
    88		fileName := ds.blobPath(blobRef)
    89		if err := ds.fs.Rename(tempFile.Name(), fileName); err != nil {
    90			return blob.SizedRef{}, err
    91		}
    92	
    93		stat, err = ds.fs.Lstat(fileName)
    94		if err != nil {
    95			return blob.SizedRef{}, err
    96		}
    97		if stat.Size() != written {
    98			return blob.SizedRef{}, fmt.Errorf("files: wrote %d bytes but stat after said %d bytes", written, stat.Size())
    99		}
   100	
   101		success = true // used in defer above
   102		return blob.SizedRef{Ref: blobRef, Size: uint32(stat.Size())}, nil
   103	}
Website layout inspired by memcached.
Content by the authors.