1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
16
17
18
19 package mysql
20
21 import (
22 "database/sql"
23 "errors"
24 "fmt"
25 "os"
26 "regexp"
27 "strconv"
28 "strings"
29 "sync"
30
31 _ "github.com/go-sql-driver/mysql"
32 "go4.org/jsonconfig"
33 "go4.org/syncutil"
34 "perkeep.org/pkg/env"
35 "perkeep.org/pkg/sorted"
36 "perkeep.org/pkg/sorted/sqlkv"
37 )
38
39 func init() {
40 sorted.RegisterKeyValue("mysql", newKeyValueFromJSONConfig)
41 }
42
43
44
45 func newKVDB(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
46 var (
47 user = cfg.RequiredString("user")
48 database = cfg.RequiredString("database")
49 host = cfg.OptionalString("host", "")
50 password = cfg.OptionalString("password", "")
51 )
52 if err := cfg.Validate(); err != nil {
53 return nil, err
54 }
55 if !validDatabaseName(database) {
56 return nil, fmt.Errorf("%q looks like an invalid database name", database)
57 }
58 var err error
59 if host != "" {
60 host, err = maybeRemapCloudSQL(host)
61 if err != nil {
62 return nil, err
63 }
64 if !strings.Contains(host, ":") {
65 host += ":3306"
66 }
67 host = "tcp(" + host + ")"
68 }
69
70
71
72
73
74 dsn := fmt.Sprintf("%s:%s@%s/", user, password, host)
75
76 db, err := openOrCachedDB(dsn)
77 if err != nil {
78 return nil, err
79 }
80
81 if err := CreateDB(db, database); err != nil {
82 return nil, err
83 }
84 return &keyValue{
85 database: database,
86 dsn: dsn,
87 db: db,
88 KeyValue: &sqlkv.KeyValue{
89 DB: db,
90 TablePrefix: database + ".",
91 Gate: syncutil.NewGate(20),
92 },
93 }, nil
94 }
95
96
97 func (kv *keyValue) Wipe() error {
98 if _, err := kv.db.Exec("DROP TABLE IF EXISTS " + kv.database + ".rows"); err != nil {
99 return err
100 }
101 if _, err := kv.db.Exec("DROP TABLE IF EXISTS " + kv.database + ".meta"); err != nil {
102 return err
103 }
104 return kv.finalize()
105 }
106
107
108 func (kv *keyValue) finalize() error {
109 if err := createTables(kv.db, kv.database); err != nil {
110 return err
111 }
112
113 if err := kv.ping(); err != nil {
114 return fmt.Errorf("MySQL db unreachable: %v", err)
115 }
116
117 version, err := kv.SchemaVersion()
118 if err != nil {
119 return fmt.Errorf("error getting current database schema version: %v", err)
120 }
121 if version == 0 {
122
123 if _, err := kv.db.Exec(fmt.Sprintf(`REPLACE INTO %s.meta VALUES ('version', ?)`, kv.database), requiredSchemaVersion); err != nil {
124 return fmt.Errorf("error setting schema version: %v", err)
125 }
126 return nil
127 }
128 if version != requiredSchemaVersion {
129 if version == 20 && requiredSchemaVersion == 21 {
130 fmt.Fprintf(os.Stderr, fixSchema20to21)
131 }
132 if env.IsDev() {
133
134
135 return sorted.NeedWipeError{
136 Msg: fmt.Sprintf("database schema version is %d; expect %d (run \"devcam server --wipe\" to wipe both your blobs and re-populate the database schema)", version, requiredSchemaVersion),
137 }
138 }
139 return sorted.NeedWipeError{
140 Msg: fmt.Sprintf("database schema version is %d; expect %d (need to re-init/upgrade database?)",
141 version, requiredSchemaVersion),
142 }
143 }
144
145 return nil
146 }
147
148 func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
149 kv, err := newKVDB(cfg)
150 if err != nil {
151 return nil, err
152 }
153 return kv, kv.(*keyValue).finalize()
154 }
155
156 var dbnameRx = regexp.MustCompile(`^[a-zA-Z0-9_]+$`)
157
158
159 func validDatabaseName(dbname string) bool {
160 return dbnameRx.MatchString(dbname)
161 }
162
163
164 func CreateDB(db *sql.DB, dbname string) error {
165 if dbname == "" {
166 return errors.New("can not create database: database name is missing")
167 }
168 if _, err := db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", dbname)); err != nil {
169 return fmt.Errorf("error creating database %v: %v", dbname, err)
170 }
171 return nil
172 }
173
174 func createTables(db *sql.DB, database string) error {
175 for _, tableSQL := range SQLCreateTables() {
176 tableSQL = strings.Replace(tableSQL, "/*DB*/", database, -1)
177 if _, err := db.Exec(tableSQL); err != nil {
178 errMsg := "error creating table with %q: %v."
179 createError := err
180 sv, err := serverVersion(db)
181 if err != nil {
182 return err
183 }
184 if !hasLargeVarchar(sv) {
185 errMsg += "\nYour MySQL server is too old (< 5.0.3) to support VARCHAR larger than 255."
186 }
187 return fmt.Errorf(errMsg, tableSQL, createError)
188 }
189 }
190 return nil
191 }
192
193
194 var (
195 dbsmu sync.Mutex
196 dbs = map[string]*sql.DB{}
197 )
198
199 func openOrCachedDB(dsn string) (*sql.DB, error) {
200 dbsmu.Lock()
201 defer dbsmu.Unlock()
202 if db, ok := dbs[dsn]; ok {
203 return db, nil
204 }
205 db, err := sql.Open("mysql", dsn)
206 if err != nil {
207 return nil, err
208 }
209 dbs[dsn] = db
210 return db, nil
211 }
212
213 type keyValue struct {
214 *sqlkv.KeyValue
215
216 database string
217 dsn string
218 db *sql.DB
219 }
220
221
222
223 func (kv *keyValue) Close() error {
224 dbsmu.Lock()
225 defer dbsmu.Unlock()
226 delete(dbs, kv.dsn)
227 return kv.DB.Close()
228 }
229
230 func (kv *keyValue) ping() error {
231
232 _, err := kv.SchemaVersion()
233 return err
234 }
235
236
237
238 func (kv *keyValue) SchemaVersion() (version int, err error) {
239 err = kv.db.QueryRow("SELECT value FROM " + kv.KeyValue.TablePrefix + "meta WHERE metakey='version'").Scan(&version)
240 if err == sql.ErrNoRows {
241 return 0, nil
242 }
243 return
244 }
245
246 const fixSchema20to21 = `Character set in tables changed to binary, you can fix your tables with:
247 ALTER TABLE rows CONVERT TO CHARACTER SET binary;
248 ALTER TABLE meta CONVERT TO CHARACTER SET binary;
249 UPDATE meta SET value=21 WHERE metakey='version' AND value=20;
250 `
251
252
253 func serverVersion(db *sql.DB) ([]int, error) {
254 versionRx := regexp.MustCompile(`([0-9]+)\.([0-9]+)\.([0-9]+)-.*`)
255 var version string
256 if err := db.QueryRow("SELECT VERSION()").Scan(&version); err != nil {
257 return nil, fmt.Errorf("error getting MySQL server version: %v", err)
258 }
259 m := versionRx.FindStringSubmatch(version)
260 if len(m) < 4 {
261 return nil, fmt.Errorf("bogus MySQL server version: %v", version)
262 }
263 major, _ := strconv.Atoi(m[1])
264 minor, _ := strconv.Atoi(m[2])
265 rev, _ := strconv.Atoi(m[3])
266 return []int{major, minor, rev}, nil
267 }
268
269
270
271 func hasLargeVarchar(version []int) bool {
272 if len(version) < 3 {
273 panic(fmt.Sprintf("bogus mysql server version %v: ", version))
274 }
275 if version[0] < 5 {
276 return false
277 }
278 if version[1] > 0 {
279 return true
280 }
281 return version[0] == 5 && version[1] == 0 && version[2] >= 3
282 }