1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19
20 package android
21
22 import (
23 "bufio"
24 "context"
25 "crypto/tls"
26 "crypto/x509"
27 "fmt"
28 "io"
29 "log"
30 "net"
31 "net/http"
32 "os"
33 "os/exec"
34 "path/filepath"
35 "regexp"
36 "runtime"
37 "runtime/debug"
38 "strconv"
39 "sync"
40 "time"
41
42 "perkeep.org/internal/osutil"
43 "perkeep.org/pkg/blob"
44 "perkeep.org/pkg/blobserver"
45 "perkeep.org/pkg/schema"
46 )
47
48
// androidOutput holds the boolean value of the CAMPUT_ANDROID_OUTPUT
// environment variable, read once at package initialization. It is false
// when the variable is unset or does not parse as a boolean.
var androidOutput = func() bool {
	// The parse error is deliberately dropped: anything unparsable means "off".
	enabled, _ := strconv.ParseBool(os.Getenv("CAMPUT_ANDROID_OUTPUT"))
	return enabled
}()
50
51 func PreExit() {
52 if !IsChild() {
53 return
54 }
55 Printf("STAT exit 1\n")
56 }
57
// androidOutMu serializes writes made through Printf so that concurrently
// emitted lines are not interleaved.
var androidOutMu sync.Mutex

// Printf formats according to format and writes the result to standard
// output while holding androidOutMu.
func Printf(format string, args ...interface{}) {
	androidOutMu.Lock()
	defer androidOutMu.Unlock()
	fmt.Fprintf(os.Stdout, format, args...)
}
65
var (
	// detectOnce is a sync.Once; presumably it guards the single call to
	// initOnAndroid (its use is outside this view; confirm).
	detectOnce sync.Once

	// onAndroidCache holds the result computed by initOnAndroid.
	onAndroidCache bool
)
68
// dirExists reports whether f can be stat'ed and is a directory. Any error
// from os.Stat (missing path, permission problem, ...) yields false.
func dirExists(f string) bool {
	info, err := os.Stat(f)
	if err != nil {
		return false
	}
	return info.IsDir()
}
73
74 func initOnAndroid() {
75
76 onAndroidCache = dirExists("/data/data") && dirExists("/system/etc")
77 }
78
// pingAddrPattern matches the first parenthesized, non-empty run of
// characters in a string and captures the text between the parentheses.
const pingAddrPattern = `\((.+?)\)`

// pingRx is the compiled form of pingAddrPattern. androidLookupHost applies
// it to the first line of ping output and takes capture group 1 as the
// resolved address.
var pingRx = regexp.MustCompile(pingAddrPattern)
80
81 type namedInt struct {
82 name string
83 sync.Mutex
84 val int64
85 }
86
87 func (ni *namedInt) Incr(delta int64) {
88 ni.Lock()
89 ni.val += delta
90 nv := ni.val
91 ni.Unlock()
92 Printf("STAT %s %d\n", ni.name, nv)
93 }
94
95 func (ni *namedInt) Set(v int64) {
96 ni.Lock()
97 if v == ni.val {
98 ni.Unlock()
99 return
100 }
101 ni.val = v
102 ni.Unlock()
103 Printf("STAT %s %d\n", ni.name, v)
104 }
105
106 var (
107 statDNSStart = &namedInt{name: "dns_start"}
108 statDNSDone = &namedInt{name: "dns_done"}
109 statTCPStart = &namedInt{name: "tcp_start"}
110 statTCPStarted = &namedInt{name: "tcp_started"}
111 statTCPFail = &namedInt{name: "tcp_fail"}
112 statTCPDone = &namedInt{name: "tcp_done_because_close"}
113 statTCPDoneRead = &namedInt{name: "tcp_done_because_read"}
114 statTCPWrites = &namedInt{name: "tcp_write_byte"}
115 statTCPWrote = &namedInt{name: "tcp_wrote_byte"}
116 statTCPReads = &namedInt{name: "tcp_read_byte"}
117 statHTTPStart = &namedInt{name: "http_start"}
118 statHTTPResHeaders = &namedInt{name: "http_res_headers"}
119 statBlobUploaded = &namedInt{name: "blob_uploaded"}
120 statBlobExisted = &namedInt{name: "blob_existed"}
121 statFileUploaded = &namedInt{name: "file_uploaded"}
122 statFileExisted = &namedInt{name: "file_existed"}
123 statMemReleased = &namedInt{name: "mem_heap_released"}
124 statMemAlloc = &namedInt{name: "mem_alloc"}
125 statMemRSS = &namedInt{name: "mem_rss"}
126 )
127
128 type statTrackingConn struct {
129 net.Conn
130 once sync.Once
131 }
132
133 func (c *statTrackingConn) Write(p []byte) (n int, err error) {
134 statTCPWrites.Incr(int64(len(p)))
135 n, err = c.Conn.Write(p)
136 statTCPWrote.Incr(int64(n))
137 return
138 }
139
140 func (c *statTrackingConn) Read(p []byte) (n int, err error) {
141 n, err = c.Conn.Read(p)
142 statTCPReads.Incr(int64(n))
143 if err != nil {
144 c.once.Do(func() {
145 statTCPDoneRead.Incr(1)
146 })
147 }
148 return
149 }
150
151 func (c *statTrackingConn) Close() error {
152 c.once.Do(func() {
153 statTCPDone.Incr(1)
154 })
155 return nil
156 }
157
158 var (
159 dnsMu sync.Mutex
160 dnsCache = make(map[string]string)
161 )
162
163 func androidLookupHost(host string) string {
164 dnsMu.Lock()
165 v, ok := dnsCache[host]
166 dnsMu.Unlock()
167 if ok {
168 return v
169 }
170 statDNSStart.Incr(1)
171 defer statDNSDone.Incr(1)
172
173
174
175
176 c := make(chan string, 1)
177 go func() {
178 cmd := exec.Command("/system/bin/ping", "-c", "1", host)
179 stdout, err := cmd.StdoutPipe()
180 if err != nil {
181 panic(err)
182 }
183 defer stdout.Close()
184 if err := cmd.Start(); err != nil {
185 log.Printf("Error resolving %q with ping: %v", host, err)
186 c <- host
187 return
188 }
189 defer func() {
190 if p := cmd.Process; p != nil {
191 p.Kill()
192 }
193 }()
194 br := bufio.NewReader(stdout)
195 line, err := br.ReadString('\n')
196 if err != nil {
197 log.Printf("Failed to resolve %q with ping", host)
198 c <- host
199 return
200 }
201 if m := pingRx.FindStringSubmatch(line); m != nil {
202 ip := m[1]
203 dnsMu.Lock()
204 dnsCache[host] = ip
205 dnsMu.Unlock()
206 c <- ip
207 return
208 }
209 log.Printf("Failed to resolve %q with ping", host)
210 c <- host
211 }()
212 return <-c
213 }
214
215 type StatsTransport struct {
216 Rt http.RoundTripper
217 }
218
219 func (t StatsTransport) RoundTrip(req *http.Request) (res *http.Response, err error) {
220 statHTTPStart.Incr(1)
221 res, err = t.Rt.RoundTrip(req)
222 statHTTPResHeaders.Incr(1)
223
224 return
225 }
226
227 func Dial(network, addr string) (net.Conn, error) {
228
229
230
231
232
233
234
235 statTCPStart.Incr(1)
236 host, port, err := net.SplitHostPort(addr)
237 if err != nil {
238 statTCPFail.Incr(1)
239 return nil, fmt.Errorf("couldn't split %q", addr)
240 }
241 if OnAndroid() {
242
243
244
245 host = androidLookupHost(host)
246 }
247 c, err := net.Dial(network, net.JoinHostPort(host, port))
248 if err != nil {
249 statTCPFail.Incr(1)
250 return nil, err
251 }
252 statTCPStarted.Incr(1)
253 return &statTrackingConn{Conn: c}, err
254 }
255
256 func TLSConfig() (*tls.Config, error) {
257 if !OnAndroid() {
258 return nil, nil
259 }
260 certDir := "/system/etc/security/cacerts"
261 fi, err := os.Stat(certDir)
262 if err != nil {
263 return nil, err
264 }
265 if !fi.IsDir() {
266 return nil, fmt.Errorf("%q not a dir", certDir)
267 }
268 pool := x509.NewCertPool()
269 cfg := &tls.Config{RootCAs: pool}
270
271 f, err := os.Open(certDir)
272 if err != nil {
273 return nil, err
274 }
275 defer f.Close()
276 names, _ := f.Readdirnames(-1)
277 for _, name := range names {
278 pem, err := os.ReadFile(filepath.Join(certDir, name))
279 if err != nil {
280 return nil, err
281 }
282 pool.AppendCertsFromPEM(pem)
283 }
284 return cfg, nil
285 }
286
287
288
289 func NoteFileUploaded(fullPath string, uploaded bool) {
290 if !IsChild() {
291 return
292 }
293 if uploaded {
294 statFileUploaded.Incr(1)
295 } else {
296 statFileExisted.Incr(1)
297 }
298 Printf("FILE_UPLOADED %s\n", fullPath)
299 }
300
301
302
303
304 type StatusReceiver struct {
305 Sr blobserver.StatReceiver
306 Path string
307 }
308
309 func (asr StatusReceiver) noteChunkOnServer(sb blob.SizedRef) {
310 Printf("CHUNK_UPLOADED %d %s %s\n", sb.Size, sb.Ref, asr.Path)
311 }
312
313 func (asr StatusReceiver) ReceiveBlob(ctx context.Context, blob blob.Ref, source io.Reader) (blob.SizedRef, error) {
314
315
316 var buf [1024]byte
317 contents := buf[:0]
318 sb, err := asr.Sr.ReceiveBlob(ctx, blob, io.TeeReader(source, writeUntilSliceFull{&contents}))
319 if err == nil && !schema.LikelySchemaBlob(contents) {
320 statBlobUploaded.Incr(1)
321 asr.noteChunkOnServer(sb)
322 }
323 return sb, err
324 }
325
326 func (asr StatusReceiver) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
327 return asr.Sr.StatBlobs(ctx, blobs, func(sb blob.SizedRef) error {
328 asr.noteChunkOnServer(sb)
329 statBlobExisted.Incr(1)
330 return fn(sb)
331 })
332 }
333
// writeUntilSliceFull is an io.Writer that collects written bytes into *s
// until the slice reaches its capacity. The slice is never grown beyond its
// existing capacity; bytes that do not fit are discarded.
type writeUntilSliceFull struct {
	s *[]byte
}

// Write appends as much of p as still fits within the capacity of *w.s and
// drops the remainder. It always reports len(p) bytes written and a nil
// error, so a writer feeding it is never cut short.
func (w writeUntilSliceFull) Write(p []byte) (n int, err error) {
	dst := *w.s
	if room := cap(dst) - len(dst); room < len(p) {
		// Only the leading part fits; this append stays within capacity and
		// therefore reuses the existing backing array.
		*w.s = append(dst, p[:room]...)
	} else {
		*w.s = append(dst, p...)
	}
	return len(p), nil
}
350
351 var memOnce sync.Once
352
353 func startMemGoroutine() {
354 if !androidOutput {
355 return
356 }
357 go func() {
358 var ms runtime.MemStats
359 n := 0
360 for {
361 runtime.ReadMemStats(&ms)
362 statMemReleased.Set(int64(ms.HeapReleased))
363 statMemAlloc.Set(int64(ms.Alloc))
364 statMemRSS.Set(osutil.MemUsage())
365 time.Sleep(1 * time.Second)
366 n++
367 if n%5 == 0 {
368 debug.FreeOSMemory()
369 }
370 }
371 }()
372 }