cache support all kinds of primary keys

This commit is contained in:
Lunny Xiao 2014-11-08 11:12:37 +08:00
parent a98e9405e1
commit 3a5d5aa786
6 changed files with 109 additions and 62 deletions

View File

@ -47,6 +47,11 @@ type Engine struct {
disableGlobalCache bool
}
func (engine *Engine) SetLogger(logger core.ILogger) {
engine.Logger = logger
engine.dialect.SetLogger(logger)
}
func (engine *Engine) SetDisableGlobalCache(disable bool) {
if engine.disableGlobalCache != disable {
engine.disableGlobalCache = disable
@ -200,7 +205,7 @@ func (engine *Engine) LogSQLQueryTime(sqlStr string, args interface{}, execution
b4ExecTime := time.Now()
stmt, res, err := executionBlock()
execDuration := time.Since(b4ExecTime)
engine.LogDebugf("sql [%s] - args [%v] - query took: %vns", sqlStr, args, execDuration.Nanoseconds())
engine.LogDebugf("[time] %s - args %v - query took: %vns", sqlStr, args, execDuration.Nanoseconds())
return stmt, res, err
} else {
return executionBlock()
@ -212,7 +217,7 @@ func (engine *Engine) LogSQLExecutionTime(sqlStr string, args interface{}, execu
b4ExecTime := time.Now()
res, err := executionBlock()
execDuration := time.Since(b4ExecTime)
engine.LogDebugf("sql [%s] - args [%v] - execution took: %vns", sqlStr, args, execDuration.Nanoseconds())
engine.LogDebugf("[time] %s - args %v - execution took: %vns", sqlStr, args, execDuration.Nanoseconds())
return res, err
} else {
return executionBlock()

View File

@ -296,6 +296,10 @@ func (db *mysql) GetColumns(tableName string) ([]string, map[string]*core.Column
" `COLUMN_KEY`, `EXTRA` FROM `INFORMATION_SCHEMA`.`COLUMNS` WHERE `TABLE_SCHEMA` = ? AND `TABLE_NAME` = ?"
rows, err := db.DB().Query(s, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", s, args)
}
if err != nil {
return nil, nil, err
}
@ -404,6 +408,9 @@ func (db *mysql) GetTables() ([]*core.Table, error) {
s := "SELECT `TABLE_NAME`, `ENGINE`, `TABLE_ROWS`, `AUTO_INCREMENT` from `INFORMATION_SCHEMA`.`TABLES` WHERE `TABLE_SCHEMA`=?"
rows, err := db.DB().Query(s, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", s, args)
}
if err != nil {
return nil, err
}
@ -431,6 +438,9 @@ func (db *mysql) GetIndexes(tableName string) (map[string]*core.Index, error) {
s := "SELECT `INDEX_NAME`, `NON_UNIQUE`, `COLUMN_NAME` FROM `INFORMATION_SCHEMA`.`STATISTICS` WHERE `TABLE_SCHEMA` = ? AND `TABLE_NAME` = ?"
rows, err := db.DB().Query(s, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", s, args)
}
if err != nil {
return nil, err
}

View File

@ -591,6 +591,9 @@ func (db *oracle) IsColumnExist(tableName string, col *core.Column) (bool, error
query := "SELECT column_name FROM USER_TAB_COLUMNS WHERE table_name = ?" +
" AND column_name = ?"
rows, err := db.DB().Query(query, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", query, args)
}
if err != nil {
return false, err
}
@ -608,6 +611,9 @@ func (db *oracle) GetColumns(tableName string) ([]string, map[string]*core.Colum
"nullable FROM USER_TAB_COLUMNS WHERE table_name = :1"
rows, err := db.DB().Query(s, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", s, args)
}
if err != nil {
return nil, nil, err
}
@ -672,6 +678,9 @@ func (db *oracle) GetTables() ([]*core.Table, error) {
s := "SELECT table_name FROM user_tables"
rows, err := db.DB().Query(s, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", s, args)
}
if err != nil {
return nil, err
}
@ -696,6 +705,9 @@ func (db *oracle) GetIndexes(tableName string) (map[string]*core.Index, error) {
"WHERE t.index_name = i.index_name and t.table_name = i.table_name and t.table_name =:1"
rows, err := db.DB().Query(s, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", s, args)
}
if err != nil {
return nil, err
}

View File

@ -897,6 +897,9 @@ func (db *postgres) IsColumnExist(tableName string, col *core.Column) (bool, err
query := "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE table_name = $1" +
" AND column_name = $2"
rows, err := db.DB().Query(query, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", query, args)
}
if err != nil {
return false, err
}
@ -920,6 +923,9 @@ FROM pg_attribute f
WHERE c.relkind = 'r'::char AND c.relname = $1 AND f.attnum > 0 ORDER BY f.attnum;`
rows, err := db.DB().Query(s, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", s, args)
}
if err != nil {
return nil, nil, err
}
@ -1008,6 +1014,9 @@ func (db *postgres) GetTables() ([]*core.Table, error) {
s := "SELECT tablename FROM pg_tables where schemaname = 'public'"
rows, err := db.DB().Query(s, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", s, args)
}
if err != nil {
return nil, err
}
@ -1032,6 +1041,9 @@ func (db *postgres) GetIndexes(tableName string) (map[string]*core.Index, error)
s := "SELECT indexname, indexdef FROM pg_indexes WHERE schemaname='public' AND tablename=$1"
rows, err := db.DB().Query(s, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", s, args)
}
if err != nil {
return nil, err
}

View File

@ -618,13 +618,6 @@ func (statement *Statement) convertIdSql(sqlStr string) string {
return ""
}
// *******
// TODO: this codes should be removed after multi primary key cache be supported
if len(cols) > 1 {
return ""
}
// *******
colstrs := statement.JoinColumns(cols)
sqls := splitNNoCase(sqlStr, "from", 2)
if len(sqls) != 2 {
@ -637,14 +630,16 @@ func (statement *Statement) convertIdSql(sqlStr string) string {
func (session *Session) cacheGet(bean interface{}, sqlStr string, args ...interface{}) (has bool, err error) {
// if has no reftable, then don't use cache currently
if session.Statement.RefTable == nil {
if session.Statement.RefTable == nil ||
session.Statement.JoinStr != "" ||
session.Statement.RawSQL != "" {
return false, ErrCacheFailed
}
// TODO: remove this after support multi pk cache
if len(session.Statement.RefTable.PrimaryKeys) != 1 {
/*if len(session.Statement.RefTable.PrimaryKeys) != 1 {
return false, ErrCacheFailed
}
}*/
for _, filter := range session.Engine.dialect.Filters() {
sqlStr = filter.Do(sqlStr, session.Engine.dialect, session.Statement.RefTable)
@ -656,7 +651,7 @@ func (session *Session) cacheGet(bean interface{}, sqlStr string, args ...interf
cacher := session.Engine.getCacher2(session.Statement.RefTable)
tableName := session.Statement.TableName()
session.Engine.LogDebug("[xorm:cacheGet] find sql:", newsql, args)
session.Engine.LogDebug("[cacheGet] find sql:", newsql, args)
ids, err := core.GetCacheSql(cacher, tableName, newsql, args)
table := session.Statement.RefTable
if err != nil {
@ -692,19 +687,19 @@ func (session *Session) cacheGet(bean interface{}, sqlStr string, args ...interf
}
ids = []core.PK{pk}
session.Engine.LogDebug("[xorm:cacheGet] cache ids:", newsql, ids)
session.Engine.LogDebug("[cacheGet] cache ids:", newsql, ids)
err = core.PutCacheSql(cacher, ids, tableName, newsql, args)
if err != nil {
return false, err
}
} else {
session.Engine.LogDebug("[xorm:cacheGet] cached sql:", newsql)
session.Engine.LogDebug("[cacheGet] cache hit sql:", newsql)
}
if len(ids) > 0 {
structValue := reflect.Indirect(reflect.ValueOf(bean))
id := ids[0]
session.Engine.LogDebug("[xorm:cacheGet] get bean:", tableName, id)
session.Engine.LogDebug("[cacheGet] get bean:", tableName, id)
sid, err := id.ToString()
if err != nil {
return false, err
@ -726,10 +721,10 @@ func (session *Session) cacheGet(bean interface{}, sqlStr string, args ...interf
return has, err
}
session.Engine.LogDebug("[xorm:cacheGet] cache bean:", tableName, id, cacheBean)
session.Engine.LogDebug("[cacheGet] cache bean:", tableName, id, cacheBean)
cacher.PutBean(tableName, sid, cacheBean)
} else {
session.Engine.LogDebug("[xorm:cacheGet] cached bean:", tableName, id, cacheBean)
session.Engine.LogDebug("[cacheGet] cache hit bean:", tableName, id, cacheBean)
has = true
}
structValue.Set(reflect.Indirect(reflect.ValueOf(cacheBean)))
@ -741,12 +736,16 @@ func (session *Session) cacheGet(bean interface{}, sqlStr string, args ...interf
func (session *Session) cacheFind(t reflect.Type, sqlStr string, rowsSlicePtr interface{}, args ...interface{}) (err error) {
if session.Statement.RefTable == nil ||
len(session.Statement.RefTable.PrimaryKeys) != 1 ||
indexNoCase(sqlStr, "having") != -1 ||
indexNoCase(sqlStr, "group by") != -1 {
return ErrCacheFailed
}
// TODO: remove this after multi pk supported
/*if len(session.Statement.RefTable.PrimaryKeys) != 1 {
return ErrCacheFailed
}*/
for _, filter := range session.Engine.dialect.Filters() {
sqlStr = filter.Do(sqlStr, session.Engine.dialect, session.Statement.RefTable)
}
@ -771,7 +770,7 @@ func (session *Session) cacheFind(t reflect.Type, sqlStr string, rowsSlicePtr in
for rows.Next() {
i++
if i > 500 {
session.Engine.LogDebug("[xorm:cacheFind] ids length > 500, no cache")
session.Engine.LogDebug("[cacheFind] ids length > 500, no cache")
return ErrCacheFailed
}
var res = make([]string, len(table.PrimaryKeys))
@ -800,17 +799,16 @@ func (session *Session) cacheFind(t reflect.Type, sqlStr string, rowsSlicePtr in
tableName := session.Statement.TableName()
session.Engine.LogDebug("[xorm:cacheFind] cache ids:", ids, tableName, newsql, args)
session.Engine.LogDebug("[cacheFind] cache sql:", ids, tableName, newsql, args)
err = core.PutCacheSql(cacher, ids, tableName, newsql, args)
if err != nil {
return err
}
} else {
session.Engine.LogDebug("[xorm:cacheFind] cached sql:", newsql, args)
session.Engine.LogDebug("[cacheFind] cache hit sql:", newsql, args)
}
sliceValue := reflect.Indirect(reflect.ValueOf(rowsSlicePtr))
//pkFieldName := session.Statement.RefTable.PKColumns()[0].FieldName
ididxes := make(map[string]int)
var ides []core.PK = make([]core.PK, 0)
@ -826,7 +824,7 @@ func (session *Session) cacheFind(t reflect.Type, sqlStr string, rowsSlicePtr in
ides = append(ides, id)
ididxes[sid] = idx
} else {
session.Engine.LogDebug("[xorm:cacheFind] cached bean:", tableName, id, bean)
session.Engine.LogDebug("[cacheFind] cache hit bean:", tableName, id, bean)
pk := session.Engine.IdOf(bean)
xid, err := pk.ToString()
@ -835,7 +833,7 @@ func (session *Session) cacheFind(t reflect.Type, sqlStr string, rowsSlicePtr in
}
if sid != xid {
session.Engine.LogError("[xorm:cacheFind] error cache", xid, sid, bean)
session.Engine.LogError("[cacheFind] error cache", xid, sid, bean)
return ErrCacheFailed
}
temps[idx] = bean
@ -848,20 +846,25 @@ func (session *Session) cacheFind(t reflect.Type, sqlStr string, rowsSlicePtr in
slices := reflect.New(reflect.SliceOf(t))
beans := slices.Interface()
//beans := reflect.New(sliceValue.Type()).Interface()
//err = newSession.In("(id)", ides...).OrderBy(session.Statement.OrderStr).NoCache().Find(beans)
ff := make([][]interface{}, len(table.PrimaryKeys))
for i, _ := range table.PrimaryKeys {
ff[i] = make([]interface{}, 0)
}
if len(table.PrimaryKeys) == 1 {
ff := make([]interface{}, 0)
for _, ie := range ides {
for i, _ := range table.PrimaryKeys {
ff[i] = append(ff[i], ie[i])
ff = append(ff, ie[0])
}
newSession.In(table.PrimaryKeys[0], ff...)
} else {
var kn = make([]string, 0)
for _, name := range table.PrimaryKeys {
kn = append(kn, name+" = ?")
}
condi := "(" + strings.Join(kn, " AND ") + ")"
for _, ie := range ides {
newSession.Or(condi, ie...)
}
}
for i, name := range table.PrimaryKeys {
newSession.In(name, ff[i]...)
}
err = newSession.NoCache().Find(beans)
if err != nil {
return err
@ -879,10 +882,9 @@ func (session *Session) cacheFind(t reflect.Type, sqlStr string, rowsSlicePtr in
if err != nil {
return err
}
//bean := vs.Index(i).Addr().Interface()
temps[ididxes[sid]] = bean
//temps[idxes[i]] = bean
session.Engine.LogDebug("[xorm:cacheFind] cache bean:", tableName, id, bean)
session.Engine.LogDebug("[cacheFind] cache bean:", tableName, id, bean)
cacher.PutBean(tableName, sid, bean)
}
}
@ -890,7 +892,7 @@ func (session *Session) cacheFind(t reflect.Type, sqlStr string, rowsSlicePtr in
for j := 0; j < len(temps); j++ {
bean := temps[j]
if bean == nil {
session.Engine.LogWarn("[xorm:cacheFind] cache no hit:", tableName, ides[j])
session.Engine.LogWarn("[cacheFind] cache no hit:", tableName, ides[j])
// return errors.New("cache error") // !nashtsai! no need to return error, but continue instead
continue
}
@ -1028,9 +1030,6 @@ func (session *Session) Get(bean interface{}) (bool, error) {
}
// defer stmt.Close() // !nashtsai! don't close due to stmt is cached and bounded to this session
rawRows, err = stmt.Query(args...)
if err != nil {
return false, err
}
} else {
rawRows, err = session.Tx.Query(sqlStr, args...)
}
@ -2620,7 +2619,6 @@ func (session *Session) bytes2Value(col *core.Column, fieldValue *reflect.Value,
if fieldType.Elem().Kind() == reflect.Struct {
if session.Statement.UseCascade {
structInter := reflect.New(fieldType.Elem())
//fmt.Println(structInter, fieldType.Elem())
table := session.Engine.autoMapType(structInter.Elem())
if table != nil {
x, err := strconv.ParseInt(string(data), 10, 64)
@ -3010,7 +3008,7 @@ func (statement *Statement) convertUpdateSql(sqlStr string) (string, string) {
}
func (session *Session) cacheInsert(tables ...string) error {
if session.Statement.RefTable == nil || len(session.Statement.RefTable.PrimaryKeys) != 1 {
if session.Statement.RefTable == nil {
return ErrCacheFailed
}
@ -3018,7 +3016,7 @@ func (session *Session) cacheInsert(tables ...string) error {
cacher := session.Engine.getCacher2(table)
for _, t := range tables {
session.Engine.LogDebug("cache clear:", t)
session.Engine.LogDebug("[cache] clear sql:", t)
cacher.ClearIds(t)
}
@ -3037,7 +3035,7 @@ func (session *Session) cacheUpdate(sqlStr string, args ...interface{}) error {
for _, filter := range session.Engine.dialect.Filters() {
newsql = filter.Do(newsql, session.Engine.dialect, session.Statement.RefTable)
}
session.Engine.LogDebug("[xorm:cacheUpdate] new sql", oldhead, newsql)
session.Engine.LogDebug("[cacheUpdate] new sql", oldhead, newsql)
var nStart int
if len(args) > 0 {
@ -3051,7 +3049,7 @@ func (session *Session) cacheUpdate(sqlStr string, args ...interface{}) error {
table := session.Statement.RefTable
cacher := session.Engine.getCacher2(table)
tableName := session.Statement.TableName()
session.Engine.LogDebug("[xorm:cacheUpdate] get cache sql", newsql, args[nStart:])
session.Engine.LogDebug("[cacheUpdate] get cache sql", newsql, args[nStart:])
ids, err := core.GetCacheSql(cacher, tableName, newsql, args[nStart:])
if err != nil {
rows, err := session.Db.Query(newsql, args[nStart:]...)
@ -3084,7 +3082,7 @@ func (session *Session) cacheUpdate(sqlStr string, args ...interface{}) error {
ids = append(ids, pk)
}
session.Engine.LogDebug("[xorm:cacheUpdate] find updated id", ids)
session.Engine.LogDebug("[cacheUpdate] find updated id", ids)
} /*else {
session.Engine.LogDebug("[xorm:cacheUpdate] del cached sql:", tableName, newsql, args)
cacher.DelIds(tableName, genSqlKey(newsql, args))
@ -3115,7 +3113,7 @@ func (session *Session) cacheUpdate(sqlStr string, args ...interface{}) error {
} else if strings.Contains(colName, session.Engine.QuoteStr()) {
colName = strings.TrimSpace(strings.Replace(colName, session.Engine.QuoteStr(), "", -1))
} else {
session.Engine.LogDebug("[xorm:cacheUpdate] cannot find column", tableName, colName)
session.Engine.LogDebug("[cacheUpdate] cannot find column", tableName, colName)
return ErrCacheFailed
}
@ -3124,7 +3122,7 @@ func (session *Session) cacheUpdate(sqlStr string, args ...interface{}) error {
if err != nil {
session.Engine.LogError(err)
} else {
session.Engine.LogDebug("[xorm:cacheUpdate] set bean field", bean, colName, fieldValue.Interface())
session.Engine.LogDebug("[cacheUpdate] set bean field", bean, colName, fieldValue.Interface())
if col.IsVersion && session.Statement.checkVersion {
fieldValue.SetInt(fieldValue.Int() + 1)
} else {
@ -3132,16 +3130,16 @@ func (session *Session) cacheUpdate(sqlStr string, args ...interface{}) error {
}
}
} else {
session.Engine.LogError("[xorm:cacheUpdate] ERROR: column %v is not table %v's",
session.Engine.LogErrorf("[cacheUpdate] ERROR: column %v is not table %v's",
colName, table.Name)
}
}
session.Engine.LogDebug("[xorm:cacheUpdate] update cache", tableName, id, bean)
session.Engine.LogDebug("[cacheUpdate] update cache", tableName, id, bean)
cacher.PutBean(tableName, sid, bean)
}
}
session.Engine.LogDebug("[xorm:cacheUpdate] clear cached table sql:", tableName)
session.Engine.LogDebug("[cacheUpdate] clear cached table sql:", tableName)
cacher.ClearIds(tableName)
return nil
}
@ -3319,7 +3317,6 @@ func (session *Session) Update(bean interface{}, condiBean ...interface{}) (int6
}
if cacher := session.Engine.getCacher2(table); cacher != nil && session.Statement.UseCache {
//session.cacheUpdate(sqlStr, args...)
cacher.ClearIds(session.Statement.TableName())
cacher.ClearBeans(session.Statement.TableName())
}
@ -3330,7 +3327,7 @@ func (session *Session) Update(bean interface{}, condiBean ...interface{}) (int6
closure(bean)
}
if processor, ok := interface{}(bean).(AfterUpdateProcessor); ok {
session.Engine.LogDebug(session.Statement.TableName(), " has after update processor")
session.Engine.LogDebug("[event]", session.Statement.TableName(), " has after update processor")
processor.AfterUpdate()
}
} else {
@ -3399,14 +3396,14 @@ func (session *Session) cacheDelete(sqlStr string, args ...interface{}) error {
}*/
for _, id := range ids {
session.Engine.LogDebug("[xorm:cacheDelete] delete cache obj", tableName, id)
session.Engine.LogDebug("[cacheDelete] delete cache obj", tableName, id)
sid, err := id.ToString()
if err != nil {
return err
}
cacher.DelBean(tableName, sid)
}
session.Engine.LogDebug("[xorm:cacheDelete] clear cache table", tableName)
session.Engine.LogDebug("[cacheDelete] clear cache sql", tableName)
cacher.ClearIds(tableName)
return nil
}
@ -3583,12 +3580,11 @@ func (s *Session) Sync2(beans ...interface{}) error {
if oriCol != nil {
expectedType := engine.dialect.SqlType(col)
//curType := oriCol.SQLType.Name
curType := engine.dialect.SqlType(oriCol)
if expectedType != curType {
if expectedType == core.Text &&
strings.HasPrefix(curType, core.Varchar) {
// currently only support mysql
// currently only support mysql & postgres
if engine.dialect.DBType() == core.MYSQL ||
engine.dialect.DBType() == core.POSTGRES {
engine.LogInfof("Table %s column %s change type from %s to %s\n",
@ -3685,7 +3681,7 @@ func (s *Session) Sync2(beans ...interface{}) error {
}
if oriTable == nil {
//engine.LogWarnf("Table %s has no struct to mapping it", table.Name)
engine.LogWarnf("Table %s has no struct to mapping it", table.Name)
continue
}

View File

@ -247,6 +247,9 @@ func (db *sqlite3) IsColumnExist(tableName string, col *core.Column) (bool, erro
args := []interface{}{tableName}
query := "SELECT name FROM sqlite_master WHERE type='table' and name = ? and ((sql like '%`" + col.Name + "`%') or (sql like '%[" + col.Name + "]%'))"
rows, err := db.DB().Query(query, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", query, args)
}
if err != nil {
return false, err
}
@ -263,6 +266,9 @@ func (db *sqlite3) GetColumns(tableName string) ([]string, map[string]*core.Colu
s := "SELECT sql FROM sqlite_master WHERE type='table' and name = ?"
rows, err := db.DB().Query(s, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", s, args)
}
if err != nil {
return nil, nil, err
}
@ -322,6 +328,9 @@ func (db *sqlite3) GetTables() ([]*core.Table, error) {
s := "SELECT name FROM sqlite_master WHERE type='table'"
rows, err := db.DB().Query(s, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", s, args)
}
if err != nil {
return nil, err
}
@ -347,6 +356,9 @@ func (db *sqlite3) GetIndexes(tableName string) (map[string]*core.Index, error)
s := "SELECT sql FROM sqlite_master WHERE type='index' and tbl_name = ?"
rows, err := db.DB().Query(s, args...)
if db.Logger != nil {
db.Logger.Info("[sql]", s, args)
}
if err != nil {
return nil, err
}