1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
38 package sftp
39
40 import (
41 "crypto/rand"
42 "errors"
43 "fmt"
44 "io"
45 "log"
46 "net"
47 "os"
48 "os/exec"
49 "path"
50 "path/filepath"
51 "strings"
52 "sync"
53 "time"
54
55 "github.com/pkg/sftp"
56
57 "go4.org/jsonconfig"
58 "go4.org/syncutil"
59 "go4.org/syncutil/singleflight"
60 "go4.org/wkfs"
61 "golang.org/x/crypto/ssh"
62 "perkeep.org/internal/osutil"
63 "perkeep.org/pkg/blob"
64 "perkeep.org/pkg/blobserver"
65 "perkeep.org/pkg/blobserver/files"
66 )
67
68
69 type Storage struct {
70 blobserver.Storage
71 blob.SubFetcher
72
73 addr, root string
74 cc *ssh.ClientConfig
75
76 getClientGroup singleflight.Group
77
78 mu sync.Mutex
79 lastGet time.Time
80 sc *sftp.Client
81 connCloser io.Closer
82 }
83
84
85 var (
86 _ blobserver.Storage = (*Storage)(nil)
87 _ blob.SubFetcher = (*Storage)(nil)
88 )
89
90 func (s *Storage) String() string {
91 return fmt.Sprintf("\"sftp\" file-per-blob at %s@%s, dir %s", s.cc.User, s.addr, s.root)
92 }
93
94 const (
95
96
97
98 minFDLimit = 100
99 recommendedFDLimit = 1024
100 )
101
102
103
104
105 func NewStorage(addr, dir string, cc *ssh.ClientConfig) (*Storage, error) {
106 if dir == "" {
107 dir = "."
108 }
109 s := &Storage{
110 addr: addr,
111 root: dir,
112 cc: cc,
113 }
114 fs := files.NewStorage(sftpFS{s}, dir)
115 s.Storage = fs
116 s.SubFetcher = fs
117
118 if err := s.adjustFDLimit(fs); err != nil {
119 return nil, err
120 }
121 return s, nil
122 }
123
124 func (s *Storage) adjustFDLimit(fs *files.Storage) error {
125 ul, err := osutil.MaxFD()
126 if errors.Is(err, osutil.ErrNotSupported) {
127 return nil
128 }
129 if err != nil {
130 return fmt.Errorf("sftp failed to determine system's file descriptor limit: %w", err)
131 }
132 if ul < minFDLimit {
133 return fmt.Errorf("the max number of open file descriptors on your system (ulimit -n) is too low. Please fix it with 'ulimit -S -n X' with X being at least %d", recommendedFDLimit)
134 }
135
136
137 fs.SetNewFileGate(syncutil.NewGate(int(ul * 80 / 100)))
138 return nil
139 }
140
141 func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
142 user := config.RequiredString("user")
143 dir := config.RequiredString("dir")
144 addr := config.RequiredString("addr")
145 pass := config.OptionalString("pass", "")
146 passFile := config.OptionalString("passFile", "")
147 wantFingerprint := config.RequiredString("serverFingerprint")
148
149 if err := config.Validate(); err != nil {
150 return nil, err
151 }
152
153 if pass != "" && passFile != "" {
154 return nil, errors.New(`the "pass" and "passFile" options are mutually exclusive`)
155 }
156 if passFile != "" {
157 slurp, err := wkfs.ReadFile(passFile)
158 if err != nil {
159 return nil, err
160 }
161 pass = strings.TrimSpace(string(slurp))
162 }
163
164 if _, _, err := net.SplitHostPort(addr); err != nil {
165 addr = net.JoinHostPort(addr, "22")
166 }
167 cc := &ssh.ClientConfig{
168 User: user,
169 HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
170 keyPrint := ssh.FingerprintSHA256(key)
171 if keyPrint == wantFingerprint {
172 log.Printf("sftp: connected to %s at %v@%v, with expected fingerprint %v",
173 hostname, user, remote, ssh.FingerprintSHA256(key))
174 return nil
175 }
176 if wantFingerprint == "insecure-skip-verify" {
177 log.Printf(`sftp: WARNNING: using "insecure-skip-verify", connected to %s at %v@%v, with untrusted fingerprint %v`,
178 hostname, user, remote, ssh.FingerprintSHA256(key))
179 return nil
180 }
181 return fmt.Errorf(`sftp: unexpected fingerprint %q connecting to %v/%v; want %q (or "insecure-skip-verify")`,
182 keyPrint, hostname, remote, wantFingerprint)
183 },
184 Timeout: 10 * time.Second,
185 }
186 if pass != "" {
187 cc.Auth = []ssh.AuthMethod{ssh.Password(pass)}
188 }
189 return NewStorage(addr, dir, cc)
190 }
191
192
193
194 func (s *Storage) markConnDead() {
195 s.mu.Lock()
196 defer s.mu.Unlock()
197 s.markConnDeadLocked()
198 }
199
200 func (s *Storage) markConnDeadLocked() {
201 if s.connCloser != nil {
202 go s.connCloser.Close()
203 }
204 s.sc = nil
205 s.connCloser = nil
206 }
207
208 func (s *Storage) monitorSSHConn(wait func() error) {
209 err := wait()
210 log.Printf("sftp: marking SSH connection dead: %v", err)
211 s.markConnDead()
212 }
213
214 func (s *Storage) dialSFTP() (sc *sftp.Client, waiter func() error, toClose io.Closer, err error) {
215
216 if s.cc.User == "RAWSFTPNOSSH" {
217 var c net.Conn
218 c, err = net.Dial("tcp", s.addr)
219 if err != nil {
220 return
221 }
222 sc, err = sftp.NewClientPipe(c, c)
223 if err != nil {
224 go c.Close()
225 return
226 }
227 toClose = c
228 return
229 }
230
231 var pw io.WriteCloser
232 var pr io.Reader
233
234
235 user := s.cc.User
236 const sysPrefix = "use-system-ssh:"
237 if strings.HasPrefix(user, sysPrefix) {
238 user = strings.TrimPrefix(user, sysPrefix)
239 cmd := exec.Command("ssh", user+"@"+strings.TrimSuffix(s.addr, ":22"), "-s", "sftp")
240 cmd.Stderr = os.Stderr
241
242 pw, err = cmd.StdinPipe()
243 if err != nil {
244 err = fmt.Errorf("%v: %w", cmd.Args, err)
245 return
246 }
247 pr, err = cmd.StdoutPipe()
248 if err != nil {
249 err = fmt.Errorf("%v: %w", cmd.Args, err)
250 return
251 }
252
253
254 if err = cmd.Start(); err != nil {
255 err = fmt.Errorf("%v: %w", cmd.Args, err)
256 return
257 }
258 log.Printf("using sftp directly")
259 sc, err = sftp.NewClientPipe(pr, pw)
260 return
261 }
262
263 var sshc *ssh.Client
264 sshc, err = ssh.Dial("tcp", s.addr, s.cc)
265 if err != nil {
266 log.Printf("sftp: Dial: %v", err)
267 return
268 }
269
270 var sess *ssh.Session
271 sess, err = sshc.NewSession()
272 if err != nil {
273 log.Printf("sftp: ssh NewSession: %v", err)
274 go sshc.Close()
275 return
276 }
277 if err = sess.RequestSubsystem("sftp"); err != nil {
278 log.Printf("sftp: RequestSubsystem: %v", err)
279 go sshc.Close()
280 return
281 }
282 pw, err = sess.StdinPipe()
283 if err != nil {
284 go sshc.Close()
285 return
286 }
287 pr, err = sess.StdoutPipe()
288 if err != nil {
289 go sshc.Close()
290 return
291 }
292
293 sc, err = sftp.NewClientPipe(pr, pw)
294 if err != nil {
295 go sshc.Close()
296 return
297 }
298 toClose = sshc
299 waiter = sshc.Wait
300 return
301 }
302
303
304
305 func (s *Storage) sftp() (*sftp.Client, error) {
306 s.mu.Lock()
307 if s.sc != nil {
308 defer s.mu.Unlock()
309
310
311
312 if now := time.Now(); s.lastGet.After(now.Add(-30 * time.Second)) {
313 s.lastGet = now
314 return s.sc, nil
315 } else {
316
317 _, err := s.sc.Stat(".")
318 if err != nil {
319 s.markConnDeadLocked()
320 } else {
321 s.lastGet = now
322 return s.sc, nil
323 }
324 }
325 }
326 s.mu.Unlock()
327 ci, err := s.getClientGroup.Do("", func() (interface{}, error) {
328 sc, waiter, toClose, err := s.dialSFTP()
329 if err != nil {
330 return nil, err
331 }
332 if waiter != nil {
333 go s.monitorSSHConn(waiter)
334 }
335 s.mu.Lock()
336 defer s.mu.Unlock()
337 s.connCloser = toClose
338 s.sc = sc
339 s.lastGet = time.Now()
340 return sc, nil
341 })
342 if err != nil {
343 return nil, err
344 }
345 return ci.(*sftp.Client), nil
346 }
347
348 func init() {
349 blobserver.RegisterStorageConstructor("sftp", blobserver.StorageConstructor(newFromConfig))
350 }
351
352 type sftpFS struct {
353 *Storage
354 }
355
356 func (s sftpFS) Remove(file string) error {
357 sc, err := s.sftp()
358 if err != nil {
359 return err
360 }
361 return sc.Remove(filepath.ToSlash(file))
362 }
363
364 func (s sftpFS) RemoveDir(dir string) error {
365 sc, err := s.sftp()
366 if err != nil {
367 return err
368 }
369 return sc.RemoveDirectory(filepath.ToSlash(dir))
370 }
371
372 func (s sftpFS) Open(file string) (files.ReadableFile, error) {
373 sc, err := s.sftp()
374 if err != nil {
375 return nil, err
376 }
377 return sc.Open(filepath.ToSlash(file))
378 }
379
380 func (s sftpFS) Rename(oldname, newname string) error {
381 sc, err := s.sftp()
382 if err != nil {
383 return err
384 }
385 return sc.PosixRename(filepath.ToSlash(oldname), filepath.ToSlash(newname))
386 }
387
388 func (s sftpFS) TempFile(dir, prefix string) (files.WritableFile, error) {
389 sc, err := s.sftp()
390 if err != nil {
391 return nil, err
392 }
393 dir = filepath.ToSlash(dir)
394 for tries := 0; tries < 5; tries++ {
395 sufRand := make([]byte, 5)
396 _, _ = rand.Read(sufRand)
397 suffix := fmt.Sprintf("%x", sufRand)
398 name := path.Join(dir, prefix+suffix)
399 f, err := sc.OpenFile(name, os.O_CREATE|os.O_EXCL|os.O_RDWR)
400 if err == nil {
401 return writableFile{
402 name: name,
403 WriteCloser: f,
404 }, nil
405 }
406 }
407 return nil, fmt.Errorf("sftp: failed to open temp file in %s:%s/%s with prefix %s", s.addr, s.root, dir, prefix)
408 }
409
410 type writableFile struct {
411 io.WriteCloser
412 name string
413 }
414
415 func (f writableFile) Name() string { return f.name }
416 func (f writableFile) Sync() error { return nil }
417
418 func (s sftpFS) ReadDirNames(dir string) ([]string, error) {
419 sc, err := s.sftp()
420 if err != nil {
421 return nil, err
422 }
423 fis, err := sc.ReadDir(filepath.ToSlash(dir))
424 if err != nil {
425 return nil, err
426 }
427
428
429
430
431
432 names := make([]string, len(fis))
433 for i, fi := range fis {
434 names[i] = fi.Name()
435 }
436 return names, nil
437 }
438
439 func (s sftpFS) Stat(path string) (os.FileInfo, error) {
440 sc, err := s.sftp()
441 if err != nil {
442 return nil, err
443 }
444 return sc.Stat(filepath.ToSlash(path))
445 }
446
447 func (s sftpFS) Lstat(dir string) (os.FileInfo, error) {
448 sc, err := s.sftp()
449 if err != nil {
450 return nil, err
451 }
452 return sc.Lstat(filepath.ToSlash(dir))
453 }
454
455 func (s sftpFS) MkdirAll(dir string, perm os.FileMode) error {
456 sc, err := s.sftp()
457 if err != nil {
458 return err
459 }
460
461 return sc.MkdirAll(filepath.ToSlash(dir))
462 }