Home Download Docs Code Community
     1	/*
     2	Copyright 2011 The Perkeep Authors.
     3	
     4	Licensed under the Apache License, Version 2.0 (the "License");
     5	you may not use this file except in compliance with the License.
     6	You may obtain a copy of the License at
     7	
     8	     http://www.apache.org/licenses/LICENSE-2.0
     9	
    10	Unless required by applicable law or agreed to in writing, software
    11	distributed under the License is distributed on an "AS IS" BASIS,
    12	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13	See the License for the specific language governing permissions and
    14	limitations under the License.
    15	*/
    16	
    17	// Package mysql provides an implementation of sorted.KeyValue
    18	// on top of MySQL.
    19	package mysql // import "perkeep.org/pkg/sorted/mysql"
    20	
    21	import (
    22		"database/sql"
    23		"errors"
    24		"fmt"
    25		"os"
    26		"regexp"
    27		"strconv"
    28		"strings"
    29		"sync"
    30	
    31		_ "github.com/go-sql-driver/mysql"
    32		"go4.org/jsonconfig"
    33		"go4.org/syncutil"
    34		"perkeep.org/pkg/env"
    35		"perkeep.org/pkg/sorted"
    36		"perkeep.org/pkg/sorted/sqlkv"
    37	)
    38	
    39	func init() {
    40		sorted.RegisterKeyValue("mysql", newKeyValueFromJSONConfig)
    41	}
    42	
    43	// newKVDB returns an unusable KeyValue, with a database, but no tables yet. It
    44	// should be followed by Wipe or finalize.
    45	func newKVDB(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
    46		var (
    47			user     = cfg.RequiredString("user")
    48			database = cfg.RequiredString("database")
    49			host     = cfg.OptionalString("host", "")
    50			password = cfg.OptionalString("password", "")
    51		)
    52		if err := cfg.Validate(); err != nil {
    53			return nil, err
    54		}
    55		if !validDatabaseName(database) {
    56			return nil, fmt.Errorf("%q looks like an invalid database name", database)
    57		}
    58		var err error
    59		if host != "" {
    60			host, err = maybeRemapCloudSQL(host)
    61			if err != nil {
    62				return nil, err
    63			}
    64			if !strings.Contains(host, ":") {
    65				host += ":3306"
    66			}
    67			host = "tcp(" + host + ")"
    68		}
    69		// The DSN does NOT have a database name in it so it's
    70		// cacheable and can be shared between different queues & the
    71		// index, all sharing the same database server, cutting down
    72		// number of TCP connections required. We add the database
    73		// name in queries instead.
    74		dsn := fmt.Sprintf("%s:%s@%s/", user, password, host)
    75	
    76		db, err := openOrCachedDB(dsn)
    77		if err != nil {
    78			return nil, err
    79		}
    80	
    81		if err := CreateDB(db, database); err != nil {
    82			return nil, err
    83		}
    84		return &keyValue{
    85			database: database,
    86			dsn:      dsn,
    87			db:       db,
    88			KeyValue: &sqlkv.KeyValue{
    89				DB:          db,
    90				TablePrefix: database + ".",
    91				Gate:        syncutil.NewGate(20), // arbitrary limit. TODO: configurable, automatically-learned?
    92			},
    93		}, nil
    94	}
    95	
    96	// Wipe resets the KeyValue by dropping and recreating the database tables.
    97	func (kv *keyValue) Wipe() error {
    98		if _, err := kv.db.Exec("DROP TABLE IF EXISTS " + kv.database + ".rows"); err != nil {
    99			return err
   100		}
   101		if _, err := kv.db.Exec("DROP TABLE IF EXISTS " + kv.database + ".meta"); err != nil {
   102			return err
   103		}
   104		return kv.finalize()
   105	}
   106	
   107	// finalize should be called on a keyValue initialized with newKVDB.
   108	func (kv *keyValue) finalize() error {
   109		if err := createTables(kv.db, kv.database); err != nil {
   110			return err
   111		}
   112	
   113		if err := kv.ping(); err != nil {
   114			return fmt.Errorf("MySQL db unreachable: %v", err)
   115		}
   116	
   117		version, err := kv.SchemaVersion()
   118		if err != nil {
   119			return fmt.Errorf("error getting current database schema version: %v", err)
   120		}
   121		if version == 0 {
   122			// Newly created table case
   123			if _, err := kv.db.Exec(fmt.Sprintf(`REPLACE INTO %s.meta VALUES ('version', ?)`, kv.database), requiredSchemaVersion); err != nil {
   124				return fmt.Errorf("error setting schema version: %v", err)
   125			}
   126			return nil
   127		}
   128		if version != requiredSchemaVersion {
   129			if version == 20 && requiredSchemaVersion == 21 {
   130				fmt.Fprintf(os.Stderr, fixSchema20to21)
   131			}
   132			if env.IsDev() {
   133				// Good signal that we're using the devcam server, so help out
   134				// the user with a more useful tip:
   135				return sorted.NeedWipeError{
   136					Msg: fmt.Sprintf("database schema version is %d; expect %d (run \"devcam server --wipe\" to wipe both your blobs and re-populate the database schema)", version, requiredSchemaVersion),
   137				}
   138			}
   139			return sorted.NeedWipeError{
   140				Msg: fmt.Sprintf("database schema version is %d; expect %d (need to re-init/upgrade database?)",
   141					version, requiredSchemaVersion),
   142			}
   143		}
   144	
   145		return nil
   146	}
   147	
   148	func newKeyValueFromJSONConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) {
   149		kv, err := newKVDB(cfg)
   150		if err != nil {
   151			return nil, err
   152		}
   153		return kv, kv.(*keyValue).finalize()
   154	}
   155	
   156	var dbnameRx = regexp.MustCompile(`^[a-zA-Z0-9_]+$`)
   157	
   158	// validDatabaseName reports whether dbname is a valid-looking database name.
   159	func validDatabaseName(dbname string) bool {
   160		return dbnameRx.MatchString(dbname)
   161	}
   162	
   163	// CreateDB creates the named database if it does not already exist.
   164	func CreateDB(db *sql.DB, dbname string) error {
   165		if dbname == "" {
   166			return errors.New("can not create database: database name is missing")
   167		}
   168		if _, err := db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", dbname)); err != nil {
   169			return fmt.Errorf("error creating database %v: %v", dbname, err)
   170		}
   171		return nil
   172	}
   173	
   174	func createTables(db *sql.DB, database string) error {
   175		for _, tableSQL := range SQLCreateTables() {
   176			tableSQL = strings.Replace(tableSQL, "/*DB*/", database, -1)
   177			if _, err := db.Exec(tableSQL); err != nil {
   178				errMsg := "error creating table with %q: %v."
   179				createError := err
   180				sv, err := serverVersion(db)
   181				if err != nil {
   182					return err
   183				}
   184				if !hasLargeVarchar(sv) {
   185					errMsg += "\nYour MySQL server is too old (< 5.0.3) to support VARCHAR larger than 255."
   186				}
   187				return fmt.Errorf(errMsg, tableSQL, createError)
   188			}
   189		}
   190		return nil
   191	}
   192	
   193	// We keep a cache of open database handles.
   194	var (
   195		dbsmu sync.Mutex
   196		dbs   = map[string]*sql.DB{} // DSN -> db
   197	)
   198	
   199	func openOrCachedDB(dsn string) (*sql.DB, error) {
   200		dbsmu.Lock()
   201		defer dbsmu.Unlock()
   202		if db, ok := dbs[dsn]; ok {
   203			return db, nil
   204		}
   205		db, err := sql.Open("mysql", dsn)
   206		if err != nil {
   207			return nil, err
   208		}
   209		dbs[dsn] = db
   210		return db, nil
   211	}
   212	
   213	type keyValue struct {
   214		*sqlkv.KeyValue
   215	
   216		database string
   217		dsn      string
   218		db       *sql.DB
   219	}
   220	
   221	// Close overrides KeyValue.Close because we need to remove the DB from the pool
   222	// when closing.
   223	func (kv *keyValue) Close() error {
   224		dbsmu.Lock()
   225		defer dbsmu.Unlock()
   226		delete(dbs, kv.dsn)
   227		return kv.DB.Close()
   228	}
   229	
   230	func (kv *keyValue) ping() error {
   231		// TODO(bradfitz): something more efficient here?
   232		_, err := kv.SchemaVersion()
   233		return err
   234	}
   235	
   236	// SchemaVersion returns the schema version found in the meta table. If no
   237	// version is found it returns (0, nil), as the table should be considered empty.
   238	func (kv *keyValue) SchemaVersion() (version int, err error) {
   239		err = kv.db.QueryRow("SELECT value FROM " + kv.KeyValue.TablePrefix + "meta WHERE metakey='version'").Scan(&version)
   240		if err == sql.ErrNoRows {
   241			return 0, nil
   242		}
   243		return
   244	}
   245	
   246	const fixSchema20to21 = `Character set in tables changed to binary, you can fix your tables with:
   247	ALTER TABLE rows CONVERT TO CHARACTER SET binary;
   248	ALTER TABLE meta CONVERT TO CHARACTER SET binary;
   249	UPDATE meta SET value=21 WHERE metakey='version' AND value=20;
   250	`
   251	
   252	// serverVersion returns the MySQL server version as []int{major, minor, revision}.
   253	func serverVersion(db *sql.DB) ([]int, error) {
   254		versionRx := regexp.MustCompile(`([0-9]+)\.([0-9]+)\.([0-9]+)-.*`)
   255		var version string
   256		if err := db.QueryRow("SELECT VERSION()").Scan(&version); err != nil {
   257			return nil, fmt.Errorf("error getting MySQL server version: %v", err)
   258		}
   259		m := versionRx.FindStringSubmatch(version)
   260		if len(m) < 4 {
   261			return nil, fmt.Errorf("bogus MySQL server version: %v", version)
   262		}
   263		major, _ := strconv.Atoi(m[1])
   264		minor, _ := strconv.Atoi(m[2])
   265		rev, _ := strconv.Atoi(m[3])
   266		return []int{major, minor, rev}, nil
   267	}
   268	
   269	// hasLargeVarchar returns whether the given version (as []int{major, minor, revision})
   270	// supports VARCHAR larger than 255.
   271	func hasLargeVarchar(version []int) bool {
   272		if len(version) < 3 {
   273			panic(fmt.Sprintf("bogus mysql server version %v: ", version))
   274		}
   275		if version[0] < 5 {
   276			return false
   277		}
   278		if version[1] > 0 {
   279			return true
   280		}
   281		return version[0] == 5 && version[1] == 0 && version[2] >= 3
   282	}
Website layout inspired by memcached.
Content by the authors.