1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
37 package remote
38
39 import (
40 "context"
41 "io"
42
43 "go4.org/jsonconfig"
44 "perkeep.org/pkg/blob"
45 "perkeep.org/pkg/blobserver"
46 "perkeep.org/pkg/client"
47 )
48
49
50
51 type remoteStorage struct {
52 client *client.Client
53 }
54
55 var (
56 _ blobserver.Storage = (*remoteStorage)(nil)
57 _ io.Closer = (*remoteStorage)(nil)
58 )
59
60
61
62 func NewFromClient(c *client.Client) blobserver.Storage {
63 return &remoteStorage{client: c}
64 }
65
66 func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
67 url := config.RequiredString("url")
68 auth := config.RequiredString("auth")
69 skipStartupCheck := config.OptionalBool("skipStartupCheck", false)
70 trustedCert := config.OptionalString("trustedCert", "")
71 if err := config.Validate(); err != nil {
72 return nil, err
73 }
74
75 client, err := client.New(
76 client.OptionServer(url),
77 client.OptionTrustedCert(trustedCert),
78 )
79 if err != nil {
80 return nil, err
81 }
82 if err = client.SetupAuthFromString(auth); err != nil {
83 return nil, err
84 }
85 sto := &remoteStorage{
86 client: client,
87 }
88 if !skipStartupCheck {
89
90
91
92 c := make(chan blob.SizedRef, 1)
93 err = sto.EnumerateBlobs(context.TODO(), c, "", 1)
94 if err != nil {
95 return nil, err
96 }
97 }
98 return sto, nil
99 }
100
101 func (sto *remoteStorage) Close() error {
102 return sto.client.Close()
103 }
104
105 func (sto *remoteStorage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
106 return sto.client.RemoveBlobs(ctx, blobs)
107 }
108
109 func (sto *remoteStorage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
110
111
112
113
114 return sto.client.StatBlobs(ctx, blobs, fn)
115 }
116
117 func (sto *remoteStorage) ReceiveBlob(ctx context.Context, blob blob.Ref, source io.Reader) (outsb blob.SizedRef, outerr error) {
118 h := &client.UploadHandle{
119 BlobRef: blob,
120 Size: 0,
121 Contents: source,
122 }
123 pr, err := sto.client.Upload(ctx, h)
124 if err != nil {
125 outerr = err
126 return
127 }
128 return pr.SizedBlobRef(), nil
129 }
130
131 func (sto *remoteStorage) Fetch(ctx context.Context, b blob.Ref) (file io.ReadCloser, size uint32, err error) {
132 return sto.client.Fetch(ctx, b)
133 }
134
135 func (sto *remoteStorage) MaxEnumerate() int { return 1000 }
136
137 func (sto *remoteStorage) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
138 return sto.client.EnumerateBlobsOpts(ctx, dest, client.EnumerateOpts{
139 After: after,
140 Limit: limit,
141 })
142 }
143
144 func init() {
145 blobserver.RegisterStorageConstructor("remote", blobserver.StorageConstructor(newFromConfig))
146 }