From b107101bd52aa06411a387967d75b050200d11af Mon Sep 17 00:00:00 2001 From: Nash Tsai Date: Tue, 3 Dec 2013 09:09:21 +0800 Subject: [PATCH 1/2] fixed transactional based after processors handling bug --- session.go | 133 +++++++++++++++++++++++++++-------------------------- 1 file changed, 67 insertions(+), 66 deletions(-) diff --git a/session.go b/session.go index 17c744a2..6fe16e4b 100644 --- a/session.go +++ b/session.go @@ -22,15 +22,18 @@ type Session struct { IsCommitedOrRollbacked bool TransType string IsAutoClose bool - - // !nashtsai! storing these beans due to yet committed tx - afterInsertBeans []interface{} - afterUpdateBeans []interface{} - afterDeleteBeans []interface{} + + // !nashtsai! storing these beans due to yet committed tx + // afterInsertBeans []interface{} + // afterUpdateBeans []interface{} + // afterDeleteBeans []interface{} + afterInsertBeans map[interface{}][]func(interface{}) + afterUpdateBeans map[interface{}][]func(interface{}) + afterDeleteBeans map[interface{}][]func(interface{}) // -- - + beforeClosures []func(interface{}) - afterClosures []func(interface{}) + afterClosures []func(interface{}) } // Method Init reset the session as the init status. @@ -40,11 +43,11 @@ func (session *Session) Init() { session.IsAutoCommit = true session.IsCommitedOrRollbacked = false session.IsAutoClose = false - + // !nashtsai! is lazy init better? - session.afterInsertBeans = make([]interface{}, 0) - session.afterUpdateBeans = make([]interface{}, 0) - session.afterDeleteBeans = make([]interface{}, 0) + session.afterInsertBeans = make(map[interface{}][]func(interface{}), 0) + session.afterUpdateBeans = make(map[interface{}][]func(interface{}), 0) + session.afterDeleteBeans = make(map[interface{}][]func(interface{}), 0) session.beforeClosures = make([]func(interface{}), 0) session.afterClosures = make([]func(interface{}), 0) } @@ -98,7 +101,7 @@ func (session *Session) Before(closures func(interface{})) *Session { session.beforeClosures = append(session.beforeClosures, closures) } return session -} +} // Apply after Processor, affected bean is passed to closure arg func (session *Session) After(closures func(interface{})) *Session { @@ -284,55 +287,55 @@ func (session *Session) Commit() error { session.Engine.LogSQL("COMMIT") session.IsCommitedOrRollbacked = true var err error - if err = session.Tx.Commit(); err == nil { + if err = session.Tx.Commit(); err == nil { // handle processors after tx committed - for _, elem := range session.afterInsertBeans { - for _, closure := range session.afterClosures { - closure(elem) + for bean, closures := range session.afterInsertBeans { + for _, closure := range closures { + closure(bean) } - - if processor, ok := interface{}(elem).(AfterInsertProcessor); ok { + + if processor, ok := interface{}(bean).(AfterInsertProcessor); ok { processor.AfterInsert() } } - for _, elem := range session.afterUpdateBeans { - for _, closure := range session.afterClosures { - closure(elem) + for bean, closures := range session.afterUpdateBeans { + for _, closure := range closures { + closure(bean) } - if processor, ok := interface{}(elem).(AfterUpdateProcessor); ok { + + if processor, ok := interface{}(bean).(AfterUpdateProcessor); ok { processor.AfterUpdate() } } - for _, elem := range session.afterDeleteBeans { - for _, closure := range session.afterClosures { - closure(elem) - } - if processor, ok := interface{}(elem).(AfterDeleteProcessor); ok { + for bean, closures := range session.afterDeleteBeans { + for _, closure := range closures { + closure(bean) + } + + if processor, ok := interface{}(bean).(AfterDeleteProcessor); ok { processor.AfterDelete() } } - cleanUpFunc := func(slices *[]interface{}) { + cleanUpFunc := func(slices *map[interface{}][]func(interface{})) { if len(*slices) > 0 { - *slices = make([]interface{}, 0) - } - } - cleanUpProcessorsFunc := func(slices *[]func(interface{})) { - if len(*slices) > 0 { - *slices = make([]func(interface{}), 0) + *slices = make(map[interface{}][]func(interface{}), 0) } } cleanUpFunc(&session.afterInsertBeans) cleanUpFunc(&session.afterUpdateBeans) cleanUpFunc(&session.afterDeleteBeans) - - // !nash! should session based processors get cleanup? - cleanUpProcessorsFunc(&session.afterClosures) - } + } return err } return nil } +func cleanupProcessorsClosures(slices *[]func(interface{})) { + if len(*slices) > 0 { + *slices = make([]func(interface{}), 0) + } +} + func (session *Session) scanMapIntoStruct(obj interface{}, objMap map[string][]byte) error { dataStruct := reflect.Indirect(reflect.ValueOf(obj)) if dataStruct.Kind() != reflect.Struct { @@ -1410,7 +1413,7 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error for i := 0; i < size; i++ { elemValue := sliceValue.Index(i).Interface() colPlaces := make([]string, 0) - + // handle BeforeInsertProcessor for _, closure := range session.beforeClosures { closure(elemValue) @@ -1420,7 +1423,6 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error processor.BeforeInsert() } // -- - if i == 0 { for _, col := range table.Columns { @@ -1479,6 +1481,7 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error } colMultiPlaces = append(colMultiPlaces, strings.Join(colPlaces, ", ")) } + cleanupProcessorsClosures(&session.beforeClosures) statement := fmt.Sprintf("INSERT INTO %v%v%v (%v%v%v) VALUES (%v)", session.Engine.QuoteStr(), @@ -1497,8 +1500,8 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error if table.Cacher != nil && session.Statement.UseCache { session.cacheInsert(session.Statement.TableName()) } - - hasAfterClosures := len(session.afterClosures) > 0 + + hasAfterClosures := len(session.afterClosures) > 0 for i := 0; i < size; i++ { elemValue := sliceValue.Index(i).Interface() // handle AfterInsertProcessor @@ -1511,15 +1514,15 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error } } else { if hasAfterClosures { - session.afterInsertBeans = append(session.afterInsertBeans, elemValue) + session.afterInsertBeans[bean] = session.afterClosures } else { if _, ok := interface{}(elemValue).(AfterInsertProcessor); ok { - session.afterInsertBeans = append(session.afterInsertBeans, elemValue) + session.afterInsertBeans[bean] = nil } } } } - + cleanupProcessorsClosures(&session.afterClosures) return res.RowsAffected() } @@ -1796,13 +1799,12 @@ func (session *Session) innerInsert(bean interface{}) (int64, error) { for _, closure := range session.beforeClosures { closure(bean) } + cleanupProcessorsClosures(&session.beforeClosures) // cleanup after used + if processor, ok := interface{}(bean).(BeforeInsertProcessor); ok { - session.Engine.LogDebug(session.Statement.TableName(), " has before insert processor") processor.BeforeInsert() - } else { - session.Engine.LogDebug(session.Statement.TableName(), " has no before insert processor") } - // -- + // -- colNames, args, err := table.genCols(session, bean, false, false) if err != nil { @@ -1821,27 +1823,26 @@ func (session *Session) innerInsert(bean interface{}) (int64, error) { session.Engine.QuoteStr(), colPlaces) - handleAfterInsertProcessorFunc := func(bean interface{}) { if session.IsAutoCommit { for _, closure := range session.afterClosures { closure(bean) } - if processor, ok := interface{}(bean).(AfterInsertProcessor); ok { - session.Engine.LogDebug(session.Statement.TableName(), " has after insert processor") + if processor, ok := interface{}(bean).(AfterInsertProcessor); ok { processor.AfterInsert() } } else { if len(session.afterClosures) > 0 { - session.afterInsertBeans = append(session.afterInsertBeans, bean) + session.afterInsertBeans[bean] = session.afterClosures } else { - if _, ok := interface{}(bean).(AfterInsertProcessor); ok { - session.afterInsertBeans = append(session.afterInsertBeans, bean) + if _, ok := interface{}(bean).(AfterInsertProcessor); ok { + session.afterInsertBeans = nil } } } - } + cleanupProcessorsClosures(&session.afterClosures) // cleanup after used + } // for postgres, many of them didn't implement lastInsertId, so we should // implemented it ourself. @@ -2127,12 +2128,11 @@ func (session *Session) Update(bean interface{}, condiBean ...interface{}) (int6 var args []interface{} var table *Table - // handle before update processors for _, closure := range session.beforeClosures { closure(bean) } - + cleanupProcessorsClosures(&session.beforeClosures) // cleanup after used if processor, ok := interface{}(bean).(BeforeUpdateProcessor); ok { processor.BeforeUpdate() } @@ -2247,13 +2247,14 @@ func (session *Session) Update(bean interface{}, condiBean ...interface{}) (int6 } } else { if len(session.afterClosures) > 0 { - session.afterUpdateBeans = append(session.afterUpdateBeans, bean) + session.afterUpdateBeans[bean] = session.afterClosures } else { if _, ok := interface{}(bean).(AfterUpdateProcessor); ok { - session.afterUpdateBeans = append(session.afterUpdateBeans, bean) + session.afterUpdateBeans[bean] = nil } } - } + } + cleanupProcessorsClosures(&session.afterClosures) // cleanup after used // -- return res.RowsAffected() @@ -2321,11 +2322,11 @@ func (session *Session) Delete(bean interface{}) (int64, error) { defer session.Close() } - // handle before delete processors for _, closure := range session.beforeClosures { closure(bean) } + cleanupProcessorsClosures(&session.beforeClosures) if processor, ok := interface{}(bean).(BeforeDeleteProcessor); ok { processor.BeforeDelete() @@ -2370,18 +2371,18 @@ func (session *Session) Delete(bean interface{}) (int64, error) { closure(bean) } if processor, ok := interface{}(bean).(AfterDeleteProcessor); ok { - session.Engine.LogDebug(session.Statement.TableName(), " has after update processor") processor.AfterDelete() } } else { if len(session.afterClosures) > 0 { - session.afterDeleteBeans = append(session.afterDeleteBeans, bean) + session.afterDeleteBeans[bean] = session.afterClosures } else { if _, ok := interface{}(bean).(AfterDeleteProcessor); ok { - session.afterDeleteBeans = append(session.afterDeleteBeans, bean) + session.afterDeleteBeans = nil } } - } + } + cleanupProcessorsClosures(&session.afterClosures) // -- return res.RowsAffected() From 0182e87906f0378a16dd9b604f2d1a1857e1c662 Mon Sep 17 00:00:00 2001 From: Nash Tsai Date: Tue, 3 Dec 2013 11:49:30 +0800 Subject: [PATCH 2/2] fixed transactional after processors handling if same bean has been operated more than once --- session.go | 103 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 70 insertions(+), 33 deletions(-) diff --git a/session.go b/session.go index 0a0104a8..197773db 100644 --- a/session.go +++ b/session.go @@ -27,9 +27,9 @@ type Session struct { // afterInsertBeans []interface{} // afterUpdateBeans []interface{} // afterDeleteBeans []interface{} - afterInsertBeans map[interface{}][]func(interface{}) - afterUpdateBeans map[interface{}][]func(interface{}) - afterDeleteBeans map[interface{}][]func(interface{}) + afterInsertBeans map[interface{}]*[]func(interface{}) + afterUpdateBeans map[interface{}]*[]func(interface{}) + afterDeleteBeans map[interface{}]*[]func(interface{}) // -- beforeClosures []func(interface{}) @@ -45,9 +45,9 @@ func (session *Session) Init() { session.IsAutoClose = false // !nashtsai! is lazy init better? - session.afterInsertBeans = make(map[interface{}][]func(interface{}), 0) - session.afterUpdateBeans = make(map[interface{}][]func(interface{}), 0) - session.afterDeleteBeans = make(map[interface{}][]func(interface{}), 0) + session.afterInsertBeans = make(map[interface{}]*[]func(interface{}), 0) + session.afterUpdateBeans = make(map[interface{}]*[]func(interface{}), 0) + session.afterDeleteBeans = make(map[interface{}]*[]func(interface{}), 0) session.beforeClosures = make([]func(interface{}), 0) session.afterClosures = make([]func(interface{}), 0) } @@ -289,36 +289,40 @@ func (session *Session) Commit() error { var err error if err = session.Tx.Commit(); err == nil { // handle processors after tx committed - for bean, closures := range session.afterInsertBeans { - for _, closure := range closures { - closure(bean) + + closureCallFunc := func(closuresPtr *[]func(interface{}), bean interface{}) { + + if closuresPtr != nil { + for _, closure := range *closuresPtr { + closure(bean) + } } + } + + for bean, closuresPtr := range session.afterInsertBeans { + closureCallFunc(closuresPtr, bean) if processor, ok := interface{}(bean).(AfterInsertProcessor); ok { processor.AfterInsert() } } - for bean, closures := range session.afterUpdateBeans { - for _, closure := range closures { - closure(bean) - } + for bean, closuresPtr := range session.afterUpdateBeans { + closureCallFunc(closuresPtr, bean) if processor, ok := interface{}(bean).(AfterUpdateProcessor); ok { processor.AfterUpdate() } } - for bean, closures := range session.afterDeleteBeans { - for _, closure := range closures { - closure(bean) - } + for bean, closuresPtr := range session.afterDeleteBeans { + closureCallFunc(closuresPtr, bean) if processor, ok := interface{}(bean).(AfterDeleteProcessor); ok { processor.AfterDelete() } } - cleanUpFunc := func(slices *map[interface{}][]func(interface{})) { + cleanUpFunc := func(slices *map[interface{}]*[]func(interface{})) { if len(*slices) > 0 { - *slices = make(map[interface{}][]func(interface{}), 0) + *slices = make(map[interface{}]*[]func(interface{}), 0) } } cleanUpFunc(&session.afterInsertBeans) @@ -1428,6 +1432,7 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error colPlaces := make([]string, 0) // handle BeforeInsertProcessor + // !nashtsai! does user expect it's same slice to passed closure when using Before()/After() when insert multi?? for _, closure := range session.beforeClosures { closure(elemValue) } @@ -1514,11 +1519,12 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error session.cacheInsert(session.Statement.TableName()) } - hasAfterClosures := len(session.afterClosures) > 0 + lenAfterClosures := len(session.afterClosures) for i := 0; i < size; i++ { elemValue := sliceValue.Index(i).Interface() // handle AfterInsertProcessor if session.IsAutoCommit { + // !nashtsai! does user expect it's same slice to passed closure when using Before()/After() when insert multi?? for _, closure := range session.afterClosures { closure(elemValue) } @@ -1526,11 +1532,18 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error processor.AfterInsert() } } else { - if hasAfterClosures { - session.afterInsertBeans[bean] = session.afterClosures + if lenAfterClosures > 0 { + if value, has := session.afterInsertBeans[elemValue]; has && value != nil { + *value = append(*value, session.afterClosures...) + } else { + afterClosures := make([]func(interface{}), lenAfterClosures) + copy(afterClosures, session.afterClosures) + session.afterInsertBeans[elemValue] = &afterClosures + } + } else { if _, ok := interface{}(elemValue).(AfterInsertProcessor); ok { - session.afterInsertBeans[bean] = nil + session.afterInsertBeans[elemValue] = nil } } } @@ -1846,11 +1859,19 @@ func (session *Session) innerInsert(bean interface{}) (int64, error) { processor.AfterInsert() } } else { - if len(session.afterClosures) > 0 { - session.afterInsertBeans[bean] = session.afterClosures + lenAfterClosures := len(session.afterClosures) + if lenAfterClosures > 0 { + if value, has := session.afterInsertBeans[bean]; has && value != nil { + *value = append(*value, session.afterClosures...) + } else { + afterClosures := make([]func(interface{}), lenAfterClosures) + copy(afterClosures, session.afterClosures) + session.afterInsertBeans[bean] = &afterClosures + } + } else { if _, ok := interface{}(bean).(AfterInsertProcessor); ok { - session.afterInsertBeans = nil + session.afterInsertBeans[bean] = nil } } } @@ -2259,10 +2280,18 @@ func (session *Session) Update(bean interface{}, condiBean ...interface{}) (int6 processor.AfterUpdate() } } else { - if len(session.afterClosures) > 0 { - session.afterUpdateBeans[bean] = session.afterClosures + lenAfterClosures := len(session.afterClosures) + if lenAfterClosures > 0 { + if value, has := session.afterUpdateBeans[bean]; has && value != nil { + *value = append(*value, session.afterClosures...) + } else { + afterClosures := make([]func(interface{}), lenAfterClosures) + copy(afterClosures, session.afterClosures) + session.afterUpdateBeans[bean] = &afterClosures + } + } else { - if _, ok := interface{}(bean).(AfterUpdateProcessor); ok { + if _, ok := interface{}(bean).(AfterInsertProcessor); ok { session.afterUpdateBeans[bean] = nil } } @@ -2387,11 +2416,19 @@ func (session *Session) Delete(bean interface{}) (int64, error) { processor.AfterDelete() } } else { - if len(session.afterClosures) > 0 { - session.afterDeleteBeans[bean] = session.afterClosures + lenAfterClosures := len(session.afterClosures) + if lenAfterClosures > 0 { + if value, has := session.afterDeleteBeans[bean]; has && value != nil { + *value = append(*value, session.afterClosures...) + } else { + afterClosures := make([]func(interface{}), lenAfterClosures) + copy(afterClosures, session.afterClosures) + session.afterDeleteBeans[bean] = &afterClosures + } + } else { - if _, ok := interface{}(bean).(AfterDeleteProcessor); ok { - session.afterDeleteBeans = nil + if _, ok := interface{}(bean).(AfterInsertProcessor); ok { + session.afterDeleteBeans[bean] = nil } } }