support params > 65535 for postgres.

This commit is contained in:
zhangmj 2020-05-18 16:36:21 +08:00
parent 55594d1dbe
commit f5711b4552
2 changed files with 141 additions and 94 deletions

View File

@ -7,8 +7,10 @@ package integrations
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"strings"
"testing" "testing"
"time" "time"
"xorm.io/xorm/schemas"
"xorm.io/xorm" "xorm.io/xorm"
@ -45,6 +47,26 @@ func TestInsertMulti(t *testing.T) {
append([]TestMulti{}, TestMulti{1, "test1"}, TestMulti{2, "test2"}, TestMulti{3, "test3"})) append([]TestMulti{}, TestMulti{1, "test1"}, TestMulti{2, "test2"}, TestMulti{3, "test3"}))
assert.NoError(t, err) assert.NoError(t, err)
assert.EqualValues(t, 3, num) assert.EqualValues(t, 3, num)
if schemas.DBType(strings.ToLower(dbType)) == schemas.POSTGRES {
type TestMultiPG struct {
Id int64 `xorm:"int(11) pk"`
Name string `xorm:"varchar(255)"`
}
assert.NoError(t, testEngine.Sync2(new(TestMultiPG)))
var data []TestMultiPG
for i := 1; i < 655360; i++ {
data = append(data, TestMultiPG{
Id: int64(i),
Name: fmt.Sprintf("test %d", i),
})
}
num, err := insertMultiDatas(655359, data)
assert.NoError(t, err)
assert.EqualValues(t, 655359, num)
}
} }
func insertMultiDatas(step int, datas interface{}) (num int64, err error) { func insertMultiDatas(step int, datas interface{}) (num int64, err error) {

View File

@ -19,6 +19,9 @@ import (
// ErrNoElementsOnSlice represents an error there is no element when insert // ErrNoElementsOnSlice represents an error there is no element when insert
var ErrNoElementsOnSlice = errors.New("No element on slice when insert") var ErrNoElementsOnSlice = errors.New("No element on slice when insert")
// maxPgParams pg only support max 65535 placeholder params
const maxPgParams = 65535
// Insert insert one or more beans // Insert insert one or more beans
func (session *Session) Insert(beans ...interface{}) (int64, error) { func (session *Session) Insert(beans ...interface{}) (int64, error) {
var affected int64 var affected int64
@ -112,113 +115,135 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error
} }
var ( var (
table = session.statement.RefTable table = session.statement.RefTable
size = sliceValue.Len() size = sliceValue.Len()
colNames []string cols []*schemas.Column
colMultiPlaces []string colNames []string
args []interface{} step = size
cols []*schemas.Column affectRows int64
) )
for i := 0; i < size; i++ { if session.engine.dialect.URI().DBType == schemas.POSTGRES {
v := sliceValue.Index(i) step = maxPgParams / len(session.statement.RefTable.Columns())
var vv reflect.Value }
switch v.Kind() {
case reflect.Interface:
vv = reflect.Indirect(v.Elem())
default:
vv = reflect.Indirect(v)
}
elemValue := v.Interface()
var colPlaces []string
// handle BeforeInsertProcessor for i := 0; i < size; i += step {
// !nashtsai! does user expect it's same slice to passed closure when using Before()/After() when insert multi?? var colMultiPlaces []string
for _, closure := range session.beforeClosures { var args []interface{}
closure(elemValue)
stepSize := i + step
if stepSize > size {
stepSize = size
} }
if processor, ok := interface{}(elemValue).(BeforeInsertProcessor); ok { for j := i; j < stepSize; j++ {
processor.BeforeInsert() v := sliceValue.Index(j)
} var vv reflect.Value
// -- switch v.Kind() {
case reflect.Interface:
vv = reflect.Indirect(v.Elem())
default:
vv = reflect.Indirect(v)
}
elemValue := v.Interface()
var colPlaces []string
for _, col := range table.Columns() { // handle BeforeInsertProcessor
ptrFieldValue, err := col.ValueOfV(&vv) // !nashtsai! does user expect it's same slice to passed closure when using Before()/After() when insert multi??
if err != nil { for _, closure := range session.beforeClosures {
return 0, err closure(elemValue)
} }
fieldValue := *ptrFieldValue
if col.IsAutoIncrement && utils.IsZero(fieldValue.Interface()) {
continue
}
if col.MapType == schemas.ONLYFROMDB {
continue
}
if col.IsDeleted {
continue
}
if session.statement.OmitColumnMap.Contain(col.Name) {
continue
}
if len(session.statement.ColumnMap) > 0 && !session.statement.ColumnMap.Contain(col.Name) {
continue
}
if (col.IsCreated || col.IsUpdated) && session.statement.UseAutoTime {
val, t := session.engine.nowTime(col)
args = append(args, val)
var colName = col.Name if processor, ok := interface{}(elemValue).(BeforeInsertProcessor); ok {
session.afterClosures = append(session.afterClosures, func(bean interface{}) { processor.BeforeInsert()
col := table.GetColumn(colName) }
setColumnTime(bean, col, t) // --
})
} else if col.IsVersion && session.statement.CheckVersion { for _, col := range table.Columns() {
args = append(args, 1) ptrFieldValue, err := col.ValueOfV(&vv)
var colName = col.Name
session.afterClosures = append(session.afterClosures, func(bean interface{}) {
col := table.GetColumn(colName)
setColumnInt(bean, col, 1)
})
} else {
arg, err := session.statement.Value2Interface(col, fieldValue)
if err != nil { if err != nil {
return 0, err return affectRows, err
} }
args = append(args, arg) fieldValue := *ptrFieldValue
if col.IsAutoIncrement && utils.IsZero(fieldValue.Interface()) {
continue
}
if col.MapType == schemas.ONLYFROMDB {
continue
}
if col.IsDeleted {
continue
}
if session.statement.OmitColumnMap.Contain(col.Name) {
continue
}
if len(session.statement.ColumnMap) > 0 && !session.statement.ColumnMap.Contain(col.Name) {
continue
}
if (col.IsCreated || col.IsUpdated) && session.statement.UseAutoTime {
val, t := session.engine.nowTime(col)
args = append(args, val)
var colName = col.Name
session.afterClosures = append(session.afterClosures, func(bean interface{}) {
col := table.GetColumn(colName)
setColumnTime(bean, col, t)
})
} else if col.IsVersion && session.statement.CheckVersion {
args = append(args, 1)
var colName = col.Name
session.afterClosures = append(session.afterClosures, func(bean interface{}) {
col := table.GetColumn(colName)
setColumnInt(bean, col, 1)
})
} else {
arg, err := session.statement.Value2Interface(col, fieldValue)
if err != nil {
return affectRows, err
}
args = append(args, arg)
}
if j == 0 {
colNames = append(colNames, col.Name)
cols = append(cols, col)
}
colPlaces = append(colPlaces, "?")
} }
if i == 0 { colMultiPlaces = append(colMultiPlaces, strings.Join(colPlaces, ", "))
colNames = append(colNames, col.Name)
cols = append(cols, col)
}
colPlaces = append(colPlaces, "?")
} }
cleanupProcessorsClosures(&session.beforeClosures)
colMultiPlaces = append(colMultiPlaces, strings.Join(colPlaces, ", ")) quoter := session.engine.dialect.Quoter()
} var sql string
cleanupProcessorsClosures(&session.beforeClosures) colStr := quoter.Join(colNames, ",")
if session.engine.dialect.URI().DBType == schemas.ORACLE {
quoter := session.engine.dialect.Quoter() temp := fmt.Sprintf(") INTO %s (%v) VALUES (",
var sql string quoter.Quote(tableName),
colStr := quoter.Join(colNames, ",") colStr)
if session.engine.dialect.URI().DBType == schemas.ORACLE { sql = fmt.Sprintf("INSERT ALL INTO %s (%v) VALUES (%v) SELECT 1 FROM DUAL",
temp := fmt.Sprintf(") INTO %s (%v) VALUES (", quoter.Quote(tableName),
quoter.Quote(tableName), colStr,
colStr) strings.Join(colMultiPlaces, temp))
sql = fmt.Sprintf("INSERT ALL INTO %s (%v) VALUES (%v) SELECT 1 FROM DUAL", } else {
quoter.Quote(tableName), sql = fmt.Sprintf("INSERT INTO %s (%v) VALUES (%v)",
colStr, quoter.Quote(tableName),
strings.Join(colMultiPlaces, temp)) colStr,
} else { strings.Join(colMultiPlaces, "),("))
sql = fmt.Sprintf("INSERT INTO %s (%v) VALUES (%v)", }
quoter.Quote(tableName), res, err := session.exec(sql, args...)
colStr, if err != nil {
strings.Join(colMultiPlaces, "),(")) return affectRows, err
} }
res, err := session.exec(sql, args...) af, err := res.RowsAffected()
if err != nil { if err != nil {
return 0, err return affectRows, err
}
affectRows += af
if stepSize == size {
break
}
} }
session.cacheInsert(tableName) session.cacheInsert(tableName)
@ -254,7 +279,7 @@ func (session *Session) innerInsertMulti(rowsSlicePtr interface{}) (int64, error
} }
cleanupProcessorsClosures(&session.afterClosures) cleanupProcessorsClosures(&session.afterClosures)
return res.RowsAffected() return affectRows, nil
} }
// InsertMulti insert multiple records // InsertMulti insert multiple records