Merge pull request #29 from nashtsai/processors
fixed transactional based after processors handling bug
This commit is contained in:
commit
9b89cef6f6
91
session.go
91
session.go
|
@ -24,9 +24,12 @@ type Session struct {
|
|||
IsAutoClose bool
|
||||
|
||||
// !nashtsai! storing these beans due to yet committed tx
|
||||
afterInsertBeans []interface{}
|
||||
afterUpdateBeans []interface{}
|
||||
afterDeleteBeans []interface{}
|
||||
// afterInsertBeans []interface{}
|
||||
// afterUpdateBeans []interface{}
|
||||
// afterDeleteBeans []interface{}
|
||||
afterInsertBeans map[interface{}][]func(interface{})
|
||||
afterUpdateBeans map[interface{}][]func(interface{})
|
||||
afterDeleteBeans map[interface{}][]func(interface{})
|
||||
// --
|
||||
|
||||
beforeClosures []func(interface{})
|
||||
|
@ -42,9 +45,9 @@ func (session *Session) Init() {
|
|||
session.IsAutoClose = false
|
||||
|
||||
// !nashtsai! is lazy init better?
|
||||
session.afterInsertBeans = make([]interface{}, 0)
|
||||
session.afterUpdateBeans = make([]interface{}, 0)
|
||||
session.afterDeleteBeans = make([]interface{}, 0)
|
||||
session.afterInsertBeans = make(map[interface{}][]func(interface{}), 0)
|
||||
session.afterUpdateBeans = make(map[interface{}][]func(interface{}), 0)
|
||||
session.afterDeleteBeans = make(map[interface{}][]func(interface{}), 0)
|
||||
session.beforeClosures = make([]func(interface{}), 0)
|
||||
session.afterClosures = make([]func(interface{}), 0)
|
||||
}
|
||||
|
@ -286,53 +289,53 @@ func (session *Session) Commit() error {
|
|||
var err error
|
||||
if err = session.Tx.Commit(); err == nil {
|
||||
// handle processors after tx committed
|
||||
for _, elem := range session.afterInsertBeans {
|
||||
for _, closure := range session.afterClosures {
|
||||
closure(elem)
|
||||
for bean, closures := range session.afterInsertBeans {
|
||||
for _, closure := range closures {
|
||||
closure(bean)
|
||||
}
|
||||
|
||||
if processor, ok := interface{}(elem).(AfterInsertProcessor); ok {
|
||||
if processor, ok := interface{}(bean).(AfterInsertProcessor); ok {
|
||||
processor.AfterInsert()
|
||||
}
|
||||
}
|
||||
for _, elem := range session.afterUpdateBeans {
|
||||
for _, closure := range session.afterClosures {
|
||||
closure(elem)
|
||||
for bean, closures := range session.afterUpdateBeans {
|
||||
for _, closure := range closures {
|
||||
closure(bean)
|
||||
}
|
||||
if processor, ok := interface{}(elem).(AfterUpdateProcessor); ok {
|
||||
|
||||
if processor, ok := interface{}(bean).(AfterUpdateProcessor); ok {
|
||||
processor.AfterUpdate()
|
||||
}
|
||||
}
|
||||
for _, elem := range session.afterDeleteBeans {
|
||||
for _, closure := range session.afterClosures {
|
||||
closure(elem)
|
||||
for bean, closures := range session.afterDeleteBeans {
|
||||
for _, closure := range closures {
|
||||
closure(bean)
|
||||
}
|
||||
if processor, ok := interface{}(elem).(AfterDeleteProcessor); ok {
|
||||
|
||||
if processor, ok := interface{}(bean).(AfterDeleteProcessor); ok {
|
||||
processor.AfterDelete()
|
||||
}
|
||||
}
|
||||
cleanUpFunc := func(slices *[]interface{}) {
|
||||
cleanUpFunc := func(slices *map[interface{}][]func(interface{})) {
|
||||
if len(*slices) > 0 {
|
||||
*slices = make([]interface{}, 0)
|
||||
}
|
||||
}
|
||||
cleanUpProcessorsFunc := func(slices *[]func(interface{})) {
|
||||
if len(*slices) > 0 {
|
||||
*slices = make([]func(interface{}), 0)
|
||||
*slices = make(map[interface{}][]func(interface{}), 0)
|
||||
}
|
||||
}
|
||||
cleanUpFunc(&session.afterInsertBeans)
|
||||
cleanUpFunc(&session.afterUpdateBeans)
|
||||
cleanUpFunc(&session.afterDeleteBeans)
|
||||
|
||||
// !nash! should session based processors get cleanup?
|
||||
cleanUpProcessorsFunc(&session.afterClosures)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func cleanupProcessorsClosures(slices *[]func(interface{})) {
|
||||
if len(*slices) > 0 {
|
||||
*slices = make([]func(interface{}), 0)
|
||||
}
|
||||
}
|
||||
|
||||
func (session *Session) scanMapIntoStruct(obj interface{}, objMap map[string][]byte) error {
|
||||
dataStruct := reflect.Indirect(reflect.ValueOf(obj))
|
||||
if dataStruct.Kind() != reflect.Struct {
|
||||
|
@ -1491,6 +1494,7 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error
|
|||
}
|
||||
colMultiPlaces = append(colMultiPlaces, strings.Join(colPlaces, ", "))
|
||||
}
|
||||
cleanupProcessorsClosures(&session.beforeClosures)
|
||||
|
||||
statement := fmt.Sprintf("INSERT INTO %v%v%v (%v%v%v) VALUES (%v)",
|
||||
session.Engine.QuoteStr(),
|
||||
|
@ -1523,15 +1527,15 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error
|
|||
}
|
||||
} else {
|
||||
if hasAfterClosures {
|
||||
session.afterInsertBeans = append(session.afterInsertBeans, elemValue)
|
||||
session.afterInsertBeans[bean] = session.afterClosures
|
||||
} else {
|
||||
if _, ok := interface{}(elemValue).(AfterInsertProcessor); ok {
|
||||
session.afterInsertBeans = append(session.afterInsertBeans, elemValue)
|
||||
session.afterInsertBeans[bean] = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanupProcessorsClosures(&session.afterClosures)
|
||||
return res.RowsAffected()
|
||||
}
|
||||
|
||||
|
@ -1808,11 +1812,10 @@ func (session *Session) innerInsert(bean interface{}) (int64, error) {
|
|||
for _, closure := range session.beforeClosures {
|
||||
closure(bean)
|
||||
}
|
||||
cleanupProcessorsClosures(&session.beforeClosures) // cleanup after used
|
||||
|
||||
if processor, ok := interface{}(bean).(BeforeInsertProcessor); ok {
|
||||
session.Engine.LogDebug(session.Statement.TableName(), " has before insert processor")
|
||||
processor.BeforeInsert()
|
||||
} else {
|
||||
session.Engine.LogDebug(session.Statement.TableName(), " has no before insert processor")
|
||||
}
|
||||
// --
|
||||
|
||||
|
@ -1840,18 +1843,18 @@ func (session *Session) innerInsert(bean interface{}) (int64, error) {
|
|||
closure(bean)
|
||||
}
|
||||
if processor, ok := interface{}(bean).(AfterInsertProcessor); ok {
|
||||
session.Engine.LogDebug(session.Statement.TableName(), " has after insert processor")
|
||||
processor.AfterInsert()
|
||||
}
|
||||
} else {
|
||||
if len(session.afterClosures) > 0 {
|
||||
session.afterInsertBeans = append(session.afterInsertBeans, bean)
|
||||
session.afterInsertBeans[bean] = session.afterClosures
|
||||
} else {
|
||||
if _, ok := interface{}(bean).(AfterInsertProcessor); ok {
|
||||
session.afterInsertBeans = append(session.afterInsertBeans, bean)
|
||||
session.afterInsertBeans = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
cleanupProcessorsClosures(&session.afterClosures) // cleanup after used
|
||||
}
|
||||
|
||||
// for postgres, many of them didn't implement lastInsertId, so we should
|
||||
|
@ -2142,7 +2145,7 @@ func (session *Session) Update(bean interface{}, condiBean ...interface{}) (int6
|
|||
for _, closure := range session.beforeClosures {
|
||||
closure(bean)
|
||||
}
|
||||
|
||||
cleanupProcessorsClosures(&session.beforeClosures) // cleanup after used
|
||||
if processor, ok := interface{}(bean).(BeforeUpdateProcessor); ok {
|
||||
processor.BeforeUpdate()
|
||||
}
|
||||
|
@ -2257,13 +2260,14 @@ func (session *Session) Update(bean interface{}, condiBean ...interface{}) (int6
|
|||
}
|
||||
} else {
|
||||
if len(session.afterClosures) > 0 {
|
||||
session.afterUpdateBeans = append(session.afterUpdateBeans, bean)
|
||||
session.afterUpdateBeans[bean] = session.afterClosures
|
||||
} else {
|
||||
if _, ok := interface{}(bean).(AfterUpdateProcessor); ok {
|
||||
session.afterUpdateBeans = append(session.afterUpdateBeans, bean)
|
||||
session.afterUpdateBeans[bean] = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
cleanupProcessorsClosures(&session.afterClosures) // cleanup after used
|
||||
// --
|
||||
|
||||
return res.RowsAffected()
|
||||
|
@ -2335,6 +2339,7 @@ func (session *Session) Delete(bean interface{}) (int64, error) {
|
|||
for _, closure := range session.beforeClosures {
|
||||
closure(bean)
|
||||
}
|
||||
cleanupProcessorsClosures(&session.beforeClosures)
|
||||
|
||||
if processor, ok := interface{}(bean).(BeforeDeleteProcessor); ok {
|
||||
processor.BeforeDelete()
|
||||
|
@ -2379,18 +2384,18 @@ func (session *Session) Delete(bean interface{}) (int64, error) {
|
|||
closure(bean)
|
||||
}
|
||||
if processor, ok := interface{}(bean).(AfterDeleteProcessor); ok {
|
||||
session.Engine.LogDebug(session.Statement.TableName(), " has after update processor")
|
||||
processor.AfterDelete()
|
||||
}
|
||||
} else {
|
||||
if len(session.afterClosures) > 0 {
|
||||
session.afterDeleteBeans = append(session.afterDeleteBeans, bean)
|
||||
session.afterDeleteBeans[bean] = session.afterClosures
|
||||
} else {
|
||||
if _, ok := interface{}(bean).(AfterDeleteProcessor); ok {
|
||||
session.afterDeleteBeans = append(session.afterDeleteBeans, bean)
|
||||
session.afterDeleteBeans = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
cleanupProcessorsClosures(&session.afterClosures)
|
||||
// --
|
||||
|
||||
return res.RowsAffected()
|
||||
|
|
Loading…
Reference in New Issue