fixed transactional based after processors handling bug

This commit is contained in:
Nash Tsai 2013-12-03 09:09:21 +08:00
parent 40dc45f8e4
commit b107101bd5
1 changed files with 67 additions and 66 deletions

View File

@ -22,15 +22,18 @@ type Session struct {
IsCommitedOrRollbacked bool IsCommitedOrRollbacked bool
TransType string TransType string
IsAutoClose bool IsAutoClose bool
// !nashtsai! storing these beans due to yet committed tx // !nashtsai! storing these beans due to yet committed tx
afterInsertBeans []interface{} // afterInsertBeans []interface{}
afterUpdateBeans []interface{} // afterUpdateBeans []interface{}
afterDeleteBeans []interface{} // afterDeleteBeans []interface{}
afterInsertBeans map[interface{}][]func(interface{})
afterUpdateBeans map[interface{}][]func(interface{})
afterDeleteBeans map[interface{}][]func(interface{})
// -- // --
beforeClosures []func(interface{}) beforeClosures []func(interface{})
afterClosures []func(interface{}) afterClosures []func(interface{})
} }
// Method Init reset the session as the init status. // Method Init reset the session as the init status.
@ -40,11 +43,11 @@ func (session *Session) Init() {
session.IsAutoCommit = true session.IsAutoCommit = true
session.IsCommitedOrRollbacked = false session.IsCommitedOrRollbacked = false
session.IsAutoClose = false session.IsAutoClose = false
// !nashtsai! is lazy init better? // !nashtsai! is lazy init better?
session.afterInsertBeans = make([]interface{}, 0) session.afterInsertBeans = make(map[interface{}][]func(interface{}), 0)
session.afterUpdateBeans = make([]interface{}, 0) session.afterUpdateBeans = make(map[interface{}][]func(interface{}), 0)
session.afterDeleteBeans = make([]interface{}, 0) session.afterDeleteBeans = make(map[interface{}][]func(interface{}), 0)
session.beforeClosures = make([]func(interface{}), 0) session.beforeClosures = make([]func(interface{}), 0)
session.afterClosures = make([]func(interface{}), 0) session.afterClosures = make([]func(interface{}), 0)
} }
@ -98,7 +101,7 @@ func (session *Session) Before(closures func(interface{})) *Session {
session.beforeClosures = append(session.beforeClosures, closures) session.beforeClosures = append(session.beforeClosures, closures)
} }
return session return session
} }
// Apply after Processor, affected bean is passed to closure arg // Apply after Processor, affected bean is passed to closure arg
func (session *Session) After(closures func(interface{})) *Session { func (session *Session) After(closures func(interface{})) *Session {
@ -284,55 +287,55 @@ func (session *Session) Commit() error {
session.Engine.LogSQL("COMMIT") session.Engine.LogSQL("COMMIT")
session.IsCommitedOrRollbacked = true session.IsCommitedOrRollbacked = true
var err error var err error
if err = session.Tx.Commit(); err == nil { if err = session.Tx.Commit(); err == nil {
// handle processors after tx committed // handle processors after tx committed
for _, elem := range session.afterInsertBeans { for bean, closures := range session.afterInsertBeans {
for _, closure := range session.afterClosures { for _, closure := range closures {
closure(elem) closure(bean)
} }
if processor, ok := interface{}(elem).(AfterInsertProcessor); ok { if processor, ok := interface{}(bean).(AfterInsertProcessor); ok {
processor.AfterInsert() processor.AfterInsert()
} }
} }
for _, elem := range session.afterUpdateBeans { for bean, closures := range session.afterUpdateBeans {
for _, closure := range session.afterClosures { for _, closure := range closures {
closure(elem) closure(bean)
} }
if processor, ok := interface{}(elem).(AfterUpdateProcessor); ok {
if processor, ok := interface{}(bean).(AfterUpdateProcessor); ok {
processor.AfterUpdate() processor.AfterUpdate()
} }
} }
for _, elem := range session.afterDeleteBeans { for bean, closures := range session.afterDeleteBeans {
for _, closure := range session.afterClosures { for _, closure := range closures {
closure(elem) closure(bean)
} }
if processor, ok := interface{}(elem).(AfterDeleteProcessor); ok {
if processor, ok := interface{}(bean).(AfterDeleteProcessor); ok {
processor.AfterDelete() processor.AfterDelete()
} }
} }
cleanUpFunc := func(slices *[]interface{}) { cleanUpFunc := func(slices *map[interface{}][]func(interface{})) {
if len(*slices) > 0 { if len(*slices) > 0 {
*slices = make([]interface{}, 0) *slices = make(map[interface{}][]func(interface{}), 0)
}
}
cleanUpProcessorsFunc := func(slices *[]func(interface{})) {
if len(*slices) > 0 {
*slices = make([]func(interface{}), 0)
} }
} }
cleanUpFunc(&session.afterInsertBeans) cleanUpFunc(&session.afterInsertBeans)
cleanUpFunc(&session.afterUpdateBeans) cleanUpFunc(&session.afterUpdateBeans)
cleanUpFunc(&session.afterDeleteBeans) cleanUpFunc(&session.afterDeleteBeans)
}
// !nash! should session based processors get cleanup?
cleanUpProcessorsFunc(&session.afterClosures)
}
return err return err
} }
return nil return nil
} }
func cleanupProcessorsClosures(slices *[]func(interface{})) {
if len(*slices) > 0 {
*slices = make([]func(interface{}), 0)
}
}
func (session *Session) scanMapIntoStruct(obj interface{}, objMap map[string][]byte) error { func (session *Session) scanMapIntoStruct(obj interface{}, objMap map[string][]byte) error {
dataStruct := reflect.Indirect(reflect.ValueOf(obj)) dataStruct := reflect.Indirect(reflect.ValueOf(obj))
if dataStruct.Kind() != reflect.Struct { if dataStruct.Kind() != reflect.Struct {
@ -1410,7 +1413,7 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
elemValue := sliceValue.Index(i).Interface() elemValue := sliceValue.Index(i).Interface()
colPlaces := make([]string, 0) colPlaces := make([]string, 0)
// handle BeforeInsertProcessor // handle BeforeInsertProcessor
for _, closure := range session.beforeClosures { for _, closure := range session.beforeClosures {
closure(elemValue) closure(elemValue)
@ -1420,7 +1423,6 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error
processor.BeforeInsert() processor.BeforeInsert()
} }
// -- // --
if i == 0 { if i == 0 {
for _, col := range table.Columns { for _, col := range table.Columns {
@ -1479,6 +1481,7 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error
} }
colMultiPlaces = append(colMultiPlaces, strings.Join(colPlaces, ", ")) colMultiPlaces = append(colMultiPlaces, strings.Join(colPlaces, ", "))
} }
cleanupProcessorsClosures(&session.beforeClosures)
statement := fmt.Sprintf("INSERT INTO %v%v%v (%v%v%v) VALUES (%v)", statement := fmt.Sprintf("INSERT INTO %v%v%v (%v%v%v) VALUES (%v)",
session.Engine.QuoteStr(), session.Engine.QuoteStr(),
@ -1497,8 +1500,8 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error
if table.Cacher != nil && session.Statement.UseCache { if table.Cacher != nil && session.Statement.UseCache {
session.cacheInsert(session.Statement.TableName()) session.cacheInsert(session.Statement.TableName())
} }
hasAfterClosures := len(session.afterClosures) > 0 hasAfterClosures := len(session.afterClosures) > 0
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
elemValue := sliceValue.Index(i).Interface() elemValue := sliceValue.Index(i).Interface()
// handle AfterInsertProcessor // handle AfterInsertProcessor
@ -1511,15 +1514,15 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error
} }
} else { } else {
if hasAfterClosures { if hasAfterClosures {
session.afterInsertBeans = append(session.afterInsertBeans, elemValue) session.afterInsertBeans[bean] = session.afterClosures
} else { } else {
if _, ok := interface{}(elemValue).(AfterInsertProcessor); ok { if _, ok := interface{}(elemValue).(AfterInsertProcessor); ok {
session.afterInsertBeans = append(session.afterInsertBeans, elemValue) session.afterInsertBeans[bean] = nil
} }
} }
} }
} }
cleanupProcessorsClosures(&session.afterClosures)
return res.RowsAffected() return res.RowsAffected()
} }
@ -1796,13 +1799,12 @@ func (session *Session) innerInsert(bean interface{}) (int64, error) {
for _, closure := range session.beforeClosures { for _, closure := range session.beforeClosures {
closure(bean) closure(bean)
} }
cleanupProcessorsClosures(&session.beforeClosures) // cleanup after used
if processor, ok := interface{}(bean).(BeforeInsertProcessor); ok { if processor, ok := interface{}(bean).(BeforeInsertProcessor); ok {
session.Engine.LogDebug(session.Statement.TableName(), " has before insert processor")
processor.BeforeInsert() processor.BeforeInsert()
} else {
session.Engine.LogDebug(session.Statement.TableName(), " has no before insert processor")
} }
// -- // --
colNames, args, err := table.genCols(session, bean, false, false) colNames, args, err := table.genCols(session, bean, false, false)
if err != nil { if err != nil {
@ -1821,27 +1823,26 @@ func (session *Session) innerInsert(bean interface{}) (int64, error) {
session.Engine.QuoteStr(), session.Engine.QuoteStr(),
colPlaces) colPlaces)
handleAfterInsertProcessorFunc := func(bean interface{}) { handleAfterInsertProcessorFunc := func(bean interface{}) {
if session.IsAutoCommit { if session.IsAutoCommit {
for _, closure := range session.afterClosures { for _, closure := range session.afterClosures {
closure(bean) closure(bean)
} }
if processor, ok := interface{}(bean).(AfterInsertProcessor); ok { if processor, ok := interface{}(bean).(AfterInsertProcessor); ok {
session.Engine.LogDebug(session.Statement.TableName(), " has after insert processor")
processor.AfterInsert() processor.AfterInsert()
} }
} else { } else {
if len(session.afterClosures) > 0 { if len(session.afterClosures) > 0 {
session.afterInsertBeans = append(session.afterInsertBeans, bean) session.afterInsertBeans[bean] = session.afterClosures
} else { } else {
if _, ok := interface{}(bean).(AfterInsertProcessor); ok { if _, ok := interface{}(bean).(AfterInsertProcessor); ok {
session.afterInsertBeans = append(session.afterInsertBeans, bean) session.afterInsertBeans = nil
} }
} }
} }
} cleanupProcessorsClosures(&session.afterClosures) // cleanup after used
}
// for postgres, many of them didn't implement lastInsertId, so we should // for postgres, many of them didn't implement lastInsertId, so we should
// implemented it ourself. // implemented it ourself.
@ -2127,12 +2128,11 @@ func (session *Session) Update(bean interface{}, condiBean ...interface{}) (int6
var args []interface{} var args []interface{}
var table *Table var table *Table
// handle before update processors // handle before update processors
for _, closure := range session.beforeClosures { for _, closure := range session.beforeClosures {
closure(bean) closure(bean)
} }
cleanupProcessorsClosures(&session.beforeClosures) // cleanup after used
if processor, ok := interface{}(bean).(BeforeUpdateProcessor); ok { if processor, ok := interface{}(bean).(BeforeUpdateProcessor); ok {
processor.BeforeUpdate() processor.BeforeUpdate()
} }
@ -2247,13 +2247,14 @@ func (session *Session) Update(bean interface{}, condiBean ...interface{}) (int6
} }
} else { } else {
if len(session.afterClosures) > 0 { if len(session.afterClosures) > 0 {
session.afterUpdateBeans = append(session.afterUpdateBeans, bean) session.afterUpdateBeans[bean] = session.afterClosures
} else { } else {
if _, ok := interface{}(bean).(AfterUpdateProcessor); ok { if _, ok := interface{}(bean).(AfterUpdateProcessor); ok {
session.afterUpdateBeans = append(session.afterUpdateBeans, bean) session.afterUpdateBeans[bean] = nil
} }
} }
} }
cleanupProcessorsClosures(&session.afterClosures) // cleanup after used
// -- // --
return res.RowsAffected() return res.RowsAffected()
@ -2321,11 +2322,11 @@ func (session *Session) Delete(bean interface{}) (int64, error) {
defer session.Close() defer session.Close()
} }
// handle before delete processors // handle before delete processors
for _, closure := range session.beforeClosures { for _, closure := range session.beforeClosures {
closure(bean) closure(bean)
} }
cleanupProcessorsClosures(&session.beforeClosures)
if processor, ok := interface{}(bean).(BeforeDeleteProcessor); ok { if processor, ok := interface{}(bean).(BeforeDeleteProcessor); ok {
processor.BeforeDelete() processor.BeforeDelete()
@ -2370,18 +2371,18 @@ func (session *Session) Delete(bean interface{}) (int64, error) {
closure(bean) closure(bean)
} }
if processor, ok := interface{}(bean).(AfterDeleteProcessor); ok { if processor, ok := interface{}(bean).(AfterDeleteProcessor); ok {
session.Engine.LogDebug(session.Statement.TableName(), " has after update processor")
processor.AfterDelete() processor.AfterDelete()
} }
} else { } else {
if len(session.afterClosures) > 0 { if len(session.afterClosures) > 0 {
session.afterDeleteBeans = append(session.afterDeleteBeans, bean) session.afterDeleteBeans[bean] = session.afterClosures
} else { } else {
if _, ok := interface{}(bean).(AfterDeleteProcessor); ok { if _, ok := interface{}(bean).(AfterDeleteProcessor); ok {
session.afterDeleteBeans = append(session.afterDeleteBeans, bean) session.afterDeleteBeans = nil
} }
} }
} }
cleanupProcessorsClosures(&session.afterClosures)
// -- // --
return res.RowsAffected() return res.RowsAffected()