Home Download Docs Code Community
     1	/*
     2	Copyright 2018 The Perkeep Authors
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	/*
    18	Package sftp registers the "sftp" blobserver storage type, storing
    19	blobs one-per-file in a forest of sharded directories to a remote SFTP
    20	server over an SSH connection. It uses the same directory & file
    21	structure as the "localdisk" storage type.
    22	
    23	Example low-level config:
    24	
    25		"/storage/": {
    26		    "handler": "storage-sftp",
    27		    "handlerArgs": {
    28		         "user": "alice",
    29		         "addr": "10.1.2.3",
    30		         "dir": "/remote/path/to/store/blobs/in",
    31		         "serverFingerprint": "SHA256:fBkTSuUzQVnVMJ9+e74XNTCnQKSHldbfFiOI9kBMemc",
    32	
    33		         "pass": "s3cr3thunteR1!",
    34		         "passFile": "/home/alice/keys/sftp.password"
    35		     }
    36		},
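Only one of "pass" or "passFile" may be set (the example lists both purely
for illustration); configuring both is an error. "serverFingerprint" may be
the literal "insecure-skip-verify" to accept any host key, which is not
recommended.
*/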
    38	package sftp // import "perkeep.org/pkg/blobserver/sftp"
    39	
    40	import (
    41		"crypto/rand"
    42		"errors"
    43		"fmt"
    44		"io"
    45		"log"
    46		"net"
    47		"os"
    48		"os/exec"
    49		"path"
    50		"path/filepath"
    51		"strings"
    52		"sync"
    53		"time"
    54	
    55		"github.com/pkg/sftp"
    56	
    57		"go4.org/jsonconfig"
    58		"go4.org/syncutil"
    59		"go4.org/syncutil/singleflight"
    60		"go4.org/wkfs"
    61		"golang.org/x/crypto/ssh"
    62		"perkeep.org/internal/osutil"
    63		"perkeep.org/pkg/blob"
    64		"perkeep.org/pkg/blobserver"
    65		"perkeep.org/pkg/blobserver/files"
    66	)
    67	
    68	// Storage implements the blobserver.Storage interface using an SFTP server.
    69	type Storage struct {
    70		blobserver.Storage
    71		blob.SubFetcher
    72	
    73		addr, root string
    74		cc         *ssh.ClientConfig
    75	
    76		getClientGroup singleflight.Group
    77	
    78		mu         sync.Mutex
    79		lastGet    time.Time // time last fetched
    80		sc         *sftp.Client
    81		connCloser io.Closer // ssh.Conn or net.Conn
    82	}
    83	
    84	// Validate we implement expected interfaces.
    85	var (
    86		_ blobserver.Storage = (*Storage)(nil)
    87		_ blob.SubFetcher    = (*Storage)(nil)
    88	)
    89	
    90	func (s *Storage) String() string {
    91		return fmt.Sprintf("\"sftp\" file-per-blob at %s@%s, dir %s", s.cc.User, s.addr, s.root)
    92	}
    93	
    94	const (
    95		// We refuse to create a Storage when the user's ulimit is lower than
    96		// minFDLimit. 100 is ridiculously low, but the default value on OSX is 256, and we
    97		// don't want to fail by default, so our min value has to be lower than 256.
    98		minFDLimit         = 100
    99		recommendedFDLimit = 1024
   100	)
   101	
   102	// NewStorage returns a new SFTP storage implementation at the provided
   103	// TCP addr (host:port) in the named directory. An empty dir means ".".
   104	// The provided SSH client configured is required.
   105	func NewStorage(addr, dir string, cc *ssh.ClientConfig) (*Storage, error) {
   106		if dir == "" {
   107			dir = "."
   108		}
   109		s := &Storage{
   110			addr: addr,
   111			root: dir,
   112			cc:   cc,
   113		}
   114		fs := files.NewStorage(sftpFS{s}, dir)
   115		s.Storage = fs
   116		s.SubFetcher = fs
   117	
   118		if err := s.adjustFDLimit(fs); err != nil {
   119			return nil, err
   120		}
   121		return s, nil
   122	}
   123	
   124	func (s *Storage) adjustFDLimit(fs *files.Storage) error {
   125		ul, err := osutil.MaxFD()
   126		if errors.Is(err, osutil.ErrNotSupported) {
   127			return nil
   128		}
   129		if err != nil {
   130			return fmt.Errorf("sftp failed to determine system's file descriptor limit: %w", err)
   131		}
   132		if ul < minFDLimit {
   133			return fmt.Errorf("the max number of open file descriptors on your system (ulimit -n) is too low. Please fix it with 'ulimit -S -n X' with X being at least %d", recommendedFDLimit)
   134		}
   135		// Setting the gate to 80% of the ulimit, to leave a bit of room for other file ops happening in Perkeep.
   136		// TODO(mpl): make this used and enforced Perkeep-wide. Issue #837.
   137		fs.SetNewFileGate(syncutil.NewGate(int(ul * 80 / 100)))
   138		return nil
   139	}
   140	
   141	func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
   142		user := config.RequiredString("user")
   143		dir := config.RequiredString("dir")
   144		addr := config.RequiredString("addr")
   145		pass := config.OptionalString("pass", "")
   146		passFile := config.OptionalString("passFile", "")
   147		wantFingerprint := config.RequiredString("serverFingerprint")
   148	
   149		if err := config.Validate(); err != nil {
   150			return nil, err
   151		}
   152	
   153		if pass != "" && passFile != "" {
   154			return nil, errors.New(`the "pass" and "passFile" options are mutually exclusive`)
   155		}
   156		if passFile != "" {
   157			slurp, err := wkfs.ReadFile(passFile)
   158			if err != nil {
   159				return nil, err
   160			}
   161			pass = strings.TrimSpace(string(slurp))
   162		}
   163	
   164		if _, _, err := net.SplitHostPort(addr); err != nil {
   165			addr = net.JoinHostPort(addr, "22")
   166		}
   167		cc := &ssh.ClientConfig{
   168			User: user,
   169			HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
   170				keyPrint := ssh.FingerprintSHA256(key)
   171				if keyPrint == wantFingerprint {
   172					log.Printf("sftp: connected to %s at %v@%v, with expected fingerprint %v",
   173						hostname, user, remote, ssh.FingerprintSHA256(key))
   174					return nil
   175				}
   176				if wantFingerprint == "insecure-skip-verify" {
   177					log.Printf(`sftp: WARNNING: using "insecure-skip-verify", connected to %s at %v@%v, with untrusted fingerprint %v`,
   178						hostname, user, remote, ssh.FingerprintSHA256(key))
   179					return nil
   180				}
   181				return fmt.Errorf(`sftp: unexpected fingerprint %q connecting to %v/%v; want %q (or "insecure-skip-verify")`,
   182					keyPrint, hostname, remote, wantFingerprint)
   183			},
   184			Timeout: 10 * time.Second,
   185		}
   186		if pass != "" {
   187			cc.Auth = []ssh.AuthMethod{ssh.Password(pass)}
   188		}
   189		return NewStorage(addr, dir, cc)
   190	}
   191	
   192	// markConnDead clears the cached SFTP connection after the caller detects
   193	// a connection failure.
   194	func (s *Storage) markConnDead() {
   195		s.mu.Lock()
   196		defer s.mu.Unlock()
   197		s.markConnDeadLocked()
   198	}
   199	
   200	func (s *Storage) markConnDeadLocked() {
   201		if s.connCloser != nil {
   202			go s.connCloser.Close()
   203		}
   204		s.sc = nil
   205		s.connCloser = nil
   206	}
   207	
   208	func (s *Storage) monitorSSHConn(wait func() error) {
   209		err := wait()
   210		log.Printf("sftp: marking SSH connection dead: %v", err)
   211		s.markConnDead()
   212	}
   213	
   214	func (s *Storage) dialSFTP() (sc *sftp.Client, waiter func() error, toClose io.Closer, err error) {
   215		// Special case for testing:
   216		if s.cc.User == "RAWSFTPNOSSH" {
   217			var c net.Conn
   218			c, err = net.Dial("tcp", s.addr)
   219			if err != nil {
   220				return
   221			}
   222			sc, err = sftp.NewClientPipe(c, c)
   223			if err != nil {
   224				go c.Close()
   225				return
   226			}
   227			toClose = c
   228			return
   229		}
   230	
   231		var pw io.WriteCloser
   232		var pr io.Reader
   233	
   234		// Another special case for testing:
   235		user := s.cc.User
   236		const sysPrefix = "use-system-ssh:"
   237		if strings.HasPrefix(user, sysPrefix) {
   238			user = strings.TrimPrefix(user, sysPrefix)
   239			cmd := exec.Command("ssh", user+"@"+strings.TrimSuffix(s.addr, ":22"), "-s", "sftp")
   240			cmd.Stderr = os.Stderr
   241			// get stdin and stdout
   242			pw, err = cmd.StdinPipe()
   243			if err != nil {
   244				err = fmt.Errorf("%v: %w", cmd.Args, err)
   245				return
   246			}
   247			pr, err = cmd.StdoutPipe()
   248			if err != nil {
   249				err = fmt.Errorf("%v: %w", cmd.Args, err)
   250				return
   251			}
   252	
   253			// start the process
   254			if err = cmd.Start(); err != nil {
   255				err = fmt.Errorf("%v: %w", cmd.Args, err)
   256				return
   257			}
   258			log.Printf("using sftp directly")
   259			sc, err = sftp.NewClientPipe(pr, pw)
   260			return
   261		}
   262	
   263		var sshc *ssh.Client
   264		sshc, err = ssh.Dial("tcp", s.addr, s.cc)
   265		if err != nil {
   266			log.Printf("sftp: Dial: %v", err)
   267			return
   268		}
   269	
   270		var sess *ssh.Session
   271		sess, err = sshc.NewSession()
   272		if err != nil {
   273			log.Printf("sftp: ssh NewSession: %v", err)
   274			go sshc.Close()
   275			return
   276		}
   277		if err = sess.RequestSubsystem("sftp"); err != nil {
   278			log.Printf("sftp: RequestSubsystem: %v", err)
   279			go sshc.Close()
   280			return
   281		}
   282		pw, err = sess.StdinPipe()
   283		if err != nil {
   284			go sshc.Close()
   285			return
   286		}
   287		pr, err = sess.StdoutPipe()
   288		if err != nil {
   289			go sshc.Close()
   290			return
   291		}
   292	
   293		sc, err = sftp.NewClientPipe(pr, pw)
   294		if err != nil {
   295			go sshc.Close()
   296			return
   297		}
   298		toClose = sshc
   299		waiter = sshc.Wait
   300		return
   301	}
   302	
   303	// sftp returns the *sftp.Client to the server, handling reconnects and coalesced dialing
   304	// for concurrent callers.
   305	func (s *Storage) sftp() (*sftp.Client, error) {
   306		s.mu.Lock()
   307		if s.sc != nil {
   308			defer s.mu.Unlock()
   309			// TODO: remove all this "lastGet" stuff once the wait
   310			// mechanism has test coverage and we're sending
   311			// periodic heartbeats over the SSH channel.
   312			if now := time.Now(); s.lastGet.After(now.Add(-30 * time.Second)) {
   313				s.lastGet = now
   314				return s.sc, nil
   315			} else {
   316				// It's been awhile. Let's see if it's still good.
   317				_, err := s.sc.Stat(".")
   318				if err != nil {
   319					s.markConnDeadLocked()
   320				} else {
   321					s.lastGet = now
   322					return s.sc, nil
   323				}
   324			}
   325		}
   326		s.mu.Unlock()
   327		ci, err := s.getClientGroup.Do("", func() (interface{}, error) {
   328			sc, waiter, toClose, err := s.dialSFTP()
   329			if err != nil {
   330				return nil, err
   331			}
   332			if waiter != nil {
   333				go s.monitorSSHConn(waiter)
   334			}
   335			s.mu.Lock()
   336			defer s.mu.Unlock()
   337			s.connCloser = toClose
   338			s.sc = sc
   339			s.lastGet = time.Now()
   340			return sc, nil
   341		})
   342		if err != nil {
   343			return nil, err
   344		}
   345		return ci.(*sftp.Client), nil
   346	}
   347	
   348	func init() {
   349		blobserver.RegisterStorageConstructor("sftp", blobserver.StorageConstructor(newFromConfig))
   350	}
   351	
   352	type sftpFS struct {
   353		*Storage
   354	}
   355	
   356	func (s sftpFS) Remove(file string) error {
   357		sc, err := s.sftp()
   358		if err != nil {
   359			return err
   360		}
   361		return sc.Remove(filepath.ToSlash(file))
   362	}
   363	
   364	func (s sftpFS) RemoveDir(dir string) error {
   365		sc, err := s.sftp()
   366		if err != nil {
   367			return err
   368		}
   369		return sc.RemoveDirectory(filepath.ToSlash(dir))
   370	}
   371	
   372	func (s sftpFS) Open(file string) (files.ReadableFile, error) {
   373		sc, err := s.sftp()
   374		if err != nil {
   375			return nil, err
   376		}
   377		return sc.Open(filepath.ToSlash(file))
   378	}
   379	
   380	func (s sftpFS) Rename(oldname, newname string) error {
   381		sc, err := s.sftp()
   382		if err != nil {
   383			return err
   384		}
   385		return sc.PosixRename(filepath.ToSlash(oldname), filepath.ToSlash(newname))
   386	}
   387	
   388	func (s sftpFS) TempFile(dir, prefix string) (files.WritableFile, error) {
   389		sc, err := s.sftp()
   390		if err != nil {
   391			return nil, err
   392		}
   393		dir = filepath.ToSlash(dir)
   394		for tries := 0; tries < 5; tries++ {
   395			sufRand := make([]byte, 5)
   396			_, _ = rand.Read(sufRand)
   397			suffix := fmt.Sprintf("%x", sufRand)
   398			name := path.Join(dir, prefix+suffix)
   399			f, err := sc.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR)
   400			if err == nil {
   401				return writableFile{
   402					name:        name,
   403					WriteCloser: f,
   404				}, nil
   405			}
   406		}
   407		return nil, fmt.Errorf("sftp: failed to open temp file in %s:%s/%s with prefix %s", s.addr, s.root, dir, prefix)
   408	}
   409	
   410	type writableFile struct {
   411		io.WriteCloser
   412		name string
   413	}
   414	
   415	func (f writableFile) Name() string { return f.name }
   416	func (f writableFile) Sync() error  { return nil } // TODO: send fsync
   417	
   418	func (s sftpFS) ReadDirNames(dir string) ([]string, error) {
   419		sc, err := s.sftp()
   420		if err != nil {
   421			return nil, err
   422		}
   423		fis, err := sc.ReadDir(filepath.ToSlash(dir))
   424		if err != nil {
   425			return nil, err
   426		}
   427		// TODO: it's wasteful that we throw all this info away and
   428		// have the files package restat each file. Change the
   429		// interface or add an optional one that sftp can implement
   430		// and make the files package smarter about not asking for
   431		// duplicate work when possible.
   432		names := make([]string, len(fis))
   433		for i, fi := range fis {
   434			names[i] = fi.Name()
   435		}
   436		return names, nil
   437	}
   438	
   439	func (s sftpFS) Stat(path string) (os.FileInfo, error) {
   440		sc, err := s.sftp()
   441		if err != nil {
   442			return nil, err
   443		}
   444		return sc.Stat(filepath.ToSlash(path))
   445	}
   446	
   447	func (s sftpFS) Lstat(dir string) (os.FileInfo, error) {
   448		sc, err := s.sftp()
   449		if err != nil {
   450			return nil, err
   451		}
   452		return sc.Lstat(filepath.ToSlash(dir))
   453	}
   454	
   455	func (s sftpFS) MkdirAll(dir string, perm os.FileMode) error {
   456		sc, err := s.sftp()
   457		if err != nil {
   458			return err
   459		}
   460	
   461		return sc.MkdirAll(filepath.ToSlash(dir))
   462	}
Website layout inspired by memcached.
Content by the authors.