1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19
20
21 package archiver
22
23 import (
24 "archive/zip"
25 "bytes"
26 "context"
27 "errors"
28 "io"
29
30 "perkeep.org/pkg/blob"
31 "perkeep.org/pkg/blobserver"
32 )
33
34
// DefaultMinZipSize is the zip size threshold, in bytes (16 MiB), used
// when an Archiver's MinZipSize field is zero or negative.
const DefaultMinZipSize = 16 << 20
36
37
38
39 type Archiver struct {
40
41
42 Source blobserver.Storage
43
44
45
46 MinZipSize int64
47
48
49
50
51
52
53
54 Store func(zip []byte, blobs []blob.SizedRef) error
55
56
57
58
59
60
61
62 DeleteSourceAfterStore bool
63 }
64
65
66
// ErrSourceTooSmall is returned by RunOnce when enumerating the source
// did not yield a zip larger than the configured minimum size.
var ErrSourceTooSmall = errors.New("archiver: not enough blob data on source to warrant a new zip archive")
68
69 func (a *Archiver) zipSize() int64 {
70 if a.MinZipSize > 0 {
71 return a.MinZipSize
72 }
73 return DefaultMinZipSize
74 }
75
// errStopEnumerate is returned from RunOnce's enumeration callback to end
// the enumeration early once the zip is big enough. RunOnce does not treat
// it as a failure and never returns it to the caller.
var errStopEnumerate = errors.New("sentinel return value")
77
78
79
80 func (a *Archiver) RunOnce(ctx context.Context) error {
81 if a.Source == nil {
82 return errors.New("archiver: nil Source")
83 }
84 if a.Store == nil {
85 return errors.New("archiver: nil Store func")
86 }
87 pz := &potentialZip{a: a}
88 err := blobserver.EnumerateAll(ctx, a.Source, func(sb blob.SizedRef) error {
89 if err := pz.addBlob(ctx, sb); err != nil {
90 return err
91 }
92 if pz.bigEnough() {
93 return errStopEnumerate
94 }
95 return nil
96 })
97 if err == errStopEnumerate {
98 err = nil
99 }
100 if err != nil {
101 return err
102 }
103 if err := pz.condClose(); err != nil {
104 return err
105 }
106 if !pz.bigEnough() {
107 return ErrSourceTooSmall
108 }
109 if err := a.Store(pz.buf.Bytes(), pz.blobs); err != nil {
110 return err
111 }
112 if a.DeleteSourceAfterStore {
113 blobs := make([]blob.Ref, 0, len(pz.blobs))
114 for _, sb := range pz.blobs {
115 blobs = append(blobs, sb.Ref)
116 }
117 if err := a.Source.RemoveBlobs(ctx, blobs); err != nil {
118 return err
119 }
120 }
121 return nil
122 }
123
124 type potentialZip struct {
125 a *Archiver
126 blobs []blob.SizedRef
127 zw *zip.Writer
128 buf bytes.Buffer
129 sumSize int64
130 closed bool
131 }
132
133 func (z *potentialZip) bigEnough() bool {
134 return int64(z.buf.Len()) > z.a.zipSize()
135 }
136
137 func (z *potentialZip) condClose() error {
138 if z.closed || z.zw == nil {
139 return nil
140 }
141 z.closed = true
142 return z.zw.Close()
143 }
144
145 func (z *potentialZip) addBlob(ctx context.Context, sb blob.SizedRef) error {
146 if z.bigEnough() {
147 return nil
148 }
149 z.sumSize += int64(sb.Size)
150 if z.zw == nil && z.sumSize > z.a.zipSize() {
151 z.zw = zip.NewWriter(&z.buf)
152 for _, sb := range z.blobs {
153 if err := z.writeZipBlob(ctx, sb); err != nil {
154 return err
155 }
156 }
157 }
158 z.blobs = append(z.blobs, sb)
159 if z.zw != nil {
160 return z.writeZipBlob(ctx, sb)
161 }
162 return nil
163 }
164
165 func (z *potentialZip) writeZipBlob(ctx context.Context, sb blob.SizedRef) error {
166 w, err := z.zw.CreateHeader(&zip.FileHeader{
167 Name: sb.Ref.String(),
168 Method: zip.Deflate,
169 })
170 if err != nil {
171 return err
172 }
173 blobSrc, _, err := z.a.Source.Fetch(ctx, sb.Ref)
174 if err != nil {
175 return err
176 }
177 defer blobSrc.Close()
178 _, err = io.Copy(w, blobSrc)
179 return err
180 }