improve sync2 (#1443)

This commit is contained in:
Lunny Xiao 2019-09-30 16:32:57 +08:00 committed by GitHub
parent c5ee68faa1
commit 98db3ef013
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 139 additions and 134 deletions

View File

@ -377,6 +377,32 @@ func (engine *Engine) NoAutoCondition(no ...bool) *Session {
return session.NoAutoCondition(no...) return session.NoAutoCondition(no...)
} }
func (engine *Engine) loadTableInfo(table *core.Table) error {
colSeq, cols, err := engine.dialect.GetColumns(table.Name)
if err != nil {
return err
}
for _, name := range colSeq {
table.AddColumn(cols[name])
}
indexes, err := engine.dialect.GetIndexes(table.Name)
if err != nil {
return err
}
table.Indexes = indexes
for _, index := range indexes {
for _, name := range index.Cols {
if col := table.GetColumn(name); col != nil {
col.Indexes[index.Name] = index.Type
} else {
return fmt.Errorf("Unknown col %s in index %v of table %v, columns %v", name, index.Name, table.Name, table.ColumnsSeq())
}
}
}
return nil
}
// DBMetas Retrieve all tables, columns, indexes' informations from database. // DBMetas Retrieve all tables, columns, indexes' informations from database.
func (engine *Engine) DBMetas() ([]*core.Table, error) { func (engine *Engine) DBMetas() ([]*core.Table, error) {
tables, err := engine.dialect.GetTables() tables, err := engine.dialect.GetTables()
@ -385,28 +411,9 @@ func (engine *Engine) DBMetas() ([]*core.Table, error) {
} }
for _, table := range tables { for _, table := range tables {
colSeq, cols, err := engine.dialect.GetColumns(table.Name) if err = engine.loadTableInfo(table); err != nil {
if err != nil {
return nil, err return nil, err
} }
for _, name := range colSeq {
table.AddColumn(cols[name])
}
indexes, err := engine.dialect.GetIndexes(table.Name)
if err != nil {
return nil, err
}
table.Indexes = indexes
for _, index := range indexes {
for _, name := range index.Cols {
if col := table.GetColumn(name); col != nil {
col.Indexes[index.Name] = index.Type
} else {
return nil, fmt.Errorf("Unknown col %s in index %v of table %v, columns %v", name, index.Name, table.Name, table.ColumnsSeq())
}
}
}
} }
return tables, nil return tables, nil
} }

View File

@ -228,7 +228,7 @@ func (session *Session) Sync2(beans ...interface{}) error {
defer session.Close() defer session.Close()
} }
tables, err := engine.DBMetas() tables, err := engine.dialect.GetTables()
if err != nil { if err != nil {
return err return err
} }
@ -239,15 +239,12 @@ func (session *Session) Sync2(beans ...interface{}) error {
session.resetStatement() session.resetStatement()
}() }()
var structTables []*core.Table
for _, bean := range beans { for _, bean := range beans {
v := rValue(bean) v := rValue(bean)
table, err := engine.mapType(v) table, err := engine.mapType(v)
if err != nil { if err != nil {
return err return err
} }
structTables = append(structTables, table)
tbName := engine.TableName(bean) tbName := engine.TableName(bean)
tbNameWithSchema := engine.TableName(tbName, true) tbNameWithSchema := engine.TableName(tbName, true)
@ -259,6 +256,7 @@ func (session *Session) Sync2(beans ...interface{}) error {
} }
} }
// this is a new table
if oriTable == nil { if oriTable == nil {
err = session.StoreEngine(session.statement.StoreEngine).createTable(bean) err = session.StoreEngine(session.statement.StoreEngine).createTable(bean)
if err != nil { if err != nil {
@ -274,148 +272,148 @@ func (session *Session) Sync2(beans ...interface{}) error {
if err != nil { if err != nil {
return err return err
} }
} else { continue
for _, col := range table.Columns() { }
var oriCol *core.Column
for _, col2 := range oriTable.Columns() {
if strings.EqualFold(col.Name, col2.Name) {
oriCol = col2
break
}
}
if oriCol != nil { // this will modify an old table
expectedType := engine.dialect.SqlType(col) if err = engine.loadTableInfo(oriTable); err != nil {
curType := engine.dialect.SqlType(oriCol) return err
if expectedType != curType { }
if expectedType == core.Text &&
strings.HasPrefix(curType, core.Varchar) { // check columns
// currently only support mysql & postgres for _, col := range table.Columns() {
if engine.dialect.DBType() == core.MYSQL || var oriCol *core.Column
engine.dialect.DBType() == core.POSTGRES { for _, col2 := range oriTable.Columns() {
engine.logger.Infof("Table %s column %s change type from %s to %s\n", if strings.EqualFold(col.Name, col2.Name) {
tbNameWithSchema, col.Name, curType, expectedType) oriCol = col2
_, err = session.exec(engine.dialect.ModifyColumnSql(tbNameWithSchema, col)) break
} else {
engine.logger.Warnf("Table %s column %s db type is %s, struct type is %s\n",
tbNameWithSchema, col.Name, curType, expectedType)
}
} else if strings.HasPrefix(curType, core.Varchar) && strings.HasPrefix(expectedType, core.Varchar) {
if engine.dialect.DBType() == core.MYSQL {
if oriCol.Length < col.Length {
engine.logger.Infof("Table %s column %s change type from varchar(%d) to varchar(%d)\n",
tbNameWithSchema, col.Name, oriCol.Length, col.Length)
_, err = session.exec(engine.dialect.ModifyColumnSql(tbNameWithSchema, col))
}
}
} else {
if !(strings.HasPrefix(curType, expectedType) && curType[len(expectedType)] == '(') {
engine.logger.Warnf("Table %s column %s db type is %s, struct type is %s",
tbNameWithSchema, col.Name, curType, expectedType)
}
}
} else if expectedType == core.Varchar {
if engine.dialect.DBType() == core.MYSQL {
if oriCol.Length < col.Length {
engine.logger.Infof("Table %s column %s change type from varchar(%d) to varchar(%d)\n",
tbNameWithSchema, col.Name, oriCol.Length, col.Length)
_, err = session.exec(engine.dialect.ModifyColumnSql(tbNameWithSchema, col))
}
}
}
if col.Default != oriCol.Default {
engine.logger.Warnf("Table %s Column %s db default is %s, struct default is %s",
tbName, col.Name, oriCol.Default, col.Default)
}
if col.Nullable != oriCol.Nullable {
engine.logger.Warnf("Table %s Column %s db nullable is %v, struct nullable is %v",
tbName, col.Name, oriCol.Nullable, col.Nullable)
}
} else {
session.statement.RefTable = table
session.statement.tableName = tbNameWithSchema
err = session.addColumn(col.Name)
} }
if err != nil { }
// column is not exist on table
if oriCol == nil {
session.statement.RefTable = table
session.statement.tableName = tbNameWithSchema
if err = session.addColumn(col.Name); err != nil {
return err return err
} }
continue
} }
var foundIndexNames = make(map[string]bool) err = nil
var addedNames = make(map[string]*core.Index) expectedType := engine.dialect.SqlType(col)
curType := engine.dialect.SqlType(oriCol)
for name, index := range table.Indexes { if expectedType != curType {
var oriIndex *core.Index if expectedType == core.Text &&
for name2, index2 := range oriTable.Indexes { strings.HasPrefix(curType, core.Varchar) {
if index.Equal(index2) { // currently only support mysql & postgres
oriIndex = index2 if engine.dialect.DBType() == core.MYSQL ||
foundIndexNames[name2] = true engine.dialect.DBType() == core.POSTGRES {
break engine.logger.Infof("Table %s column %s change type from %s to %s\n",
tbNameWithSchema, col.Name, curType, expectedType)
_, err = session.exec(engine.dialect.ModifyColumnSql(tbNameWithSchema, col))
} else {
engine.logger.Warnf("Table %s column %s db type is %s, struct type is %s\n",
tbNameWithSchema, col.Name, curType, expectedType)
} }
} } else if strings.HasPrefix(curType, core.Varchar) && strings.HasPrefix(expectedType, core.Varchar) {
if engine.dialect.DBType() == core.MYSQL {
if oriIndex != nil { if oriCol.Length < col.Length {
if oriIndex.Type != index.Type { engine.logger.Infof("Table %s column %s change type from varchar(%d) to varchar(%d)\n",
sql := engine.dialect.DropIndexSql(tbNameWithSchema, oriIndex) tbNameWithSchema, col.Name, oriCol.Length, col.Length)
_, err = session.exec(sql) _, err = session.exec(engine.dialect.ModifyColumnSql(tbNameWithSchema, col))
if err != nil {
return err
} }
oriIndex = nil }
} else {
if !(strings.HasPrefix(curType, expectedType) && curType[len(expectedType)] == '(') {
engine.logger.Warnf("Table %s column %s db type is %s, struct type is %s",
tbNameWithSchema, col.Name, curType, expectedType)
} }
} }
} else if expectedType == core.Varchar {
if engine.dialect.DBType() == core.MYSQL {
if oriCol.Length < col.Length {
engine.logger.Infof("Table %s column %s change type from varchar(%d) to varchar(%d)\n",
tbNameWithSchema, col.Name, oriCol.Length, col.Length)
_, err = session.exec(engine.dialect.ModifyColumnSql(tbNameWithSchema, col))
}
}
}
if col.Default != oriCol.Default {
engine.logger.Warnf("Table %s Column %s db default is %s, struct default is %s",
tbName, col.Name, oriCol.Default, col.Default)
}
if col.Nullable != oriCol.Nullable {
engine.logger.Warnf("Table %s Column %s db nullable is %v, struct nullable is %v",
tbName, col.Name, oriCol.Nullable, col.Nullable)
}
if oriIndex == nil { if err != nil {
addedNames[name] = index return err
}
}
var foundIndexNames = make(map[string]bool)
var addedNames = make(map[string]*core.Index)
for name, index := range table.Indexes {
var oriIndex *core.Index
for name2, index2 := range oriTable.Indexes {
if index.Equal(index2) {
oriIndex = index2
foundIndexNames[name2] = true
break
} }
} }
for name2, index2 := range oriTable.Indexes { if oriIndex != nil {
if _, ok := foundIndexNames[name2]; !ok { if oriIndex.Type != index.Type {
sql := engine.dialect.DropIndexSql(tbNameWithSchema, index2) sql := engine.dialect.DropIndexSql(tbNameWithSchema, oriIndex)
_, err = session.exec(sql) _, err = session.exec(sql)
if err != nil { if err != nil {
return err return err
} }
oriIndex = nil
} }
} }
for name, index := range addedNames { if oriIndex == nil {
if index.Type == core.UniqueType { addedNames[name] = index
session.statement.RefTable = table }
session.statement.tableName = tbNameWithSchema }
err = session.addUnique(tbNameWithSchema, name)
} else if index.Type == core.IndexType { for name2, index2 := range oriTable.Indexes {
session.statement.RefTable = table if _, ok := foundIndexNames[name2]; !ok {
session.statement.tableName = tbNameWithSchema sql := engine.dialect.DropIndexSql(tbNameWithSchema, index2)
err = session.addIndex(tbNameWithSchema, name) _, err = session.exec(sql)
}
if err != nil { if err != nil {
return err return err
} }
} }
} }
}
for _, table := range tables { for name, index := range addedNames {
var oriTable *core.Table if index.Type == core.UniqueType {
for _, structTable := range structTables { session.statement.RefTable = table
if strings.EqualFold(table.Name, session.tbNameNoSchema(structTable)) { session.statement.tableName = tbNameWithSchema
oriTable = structTable err = session.addUnique(tbNameWithSchema, name)
break } else if index.Type == core.IndexType {
session.statement.RefTable = table
session.statement.tableName = tbNameWithSchema
err = session.addIndex(tbNameWithSchema, name)
}
if err != nil {
return err
} }
} }
if oriTable == nil { // check all the columns which removed from struct fields but left on database tables.
//engine.LogWarnf("Table %s has no struct to mapping it", table.Name) for _, colName := range oriTable.ColumnsSeq() {
continue if table.GetColumn(colName) == nil {
} engine.logger.Warnf("Table %s has column %s but struct has not related field", engine.TableName(oriTable.Name, true), colName)
for _, colName := range table.ColumnsSeq() {
if oriTable.GetColumn(colName) == nil {
engine.logger.Warnf("Table %s has column %s but struct has not related field", engine.TableName(table.Name, true), colName)
} }
} }
} }
return nil return nil
} }