added maxconnect limit support
This commit is contained in:
parent
f817b30f28
commit
f0e87becd2
|
@ -42,7 +42,7 @@ type Engine struct {
|
|||
Tables map[reflect.Type]*Table
|
||||
mutex *sync.Mutex
|
||||
ShowSQL bool
|
||||
pool IConnectPool
|
||||
Pool IConnectPool
|
||||
CacheMapping bool
|
||||
Filters []Filter
|
||||
Logger io.Writer
|
||||
|
@ -69,8 +69,8 @@ func (engine *Engine) AutoIncrStr() string {
|
|||
}
|
||||
|
||||
func (engine *Engine) SetPool(pool IConnectPool) error {
|
||||
engine.pool = pool
|
||||
return engine.pool.Init(engine)
|
||||
engine.Pool = pool
|
||||
return engine.Pool.Init(engine)
|
||||
}
|
||||
|
||||
func Type(bean interface{}) reflect.Type {
|
||||
|
@ -96,7 +96,7 @@ func (engine *Engine) NewSession() *Session {
|
|||
}
|
||||
|
||||
func (engine *Engine) Close() error {
|
||||
return engine.pool.Close(engine)
|
||||
return engine.Pool.Close(engine)
|
||||
}
|
||||
|
||||
func (engine *Engine) Test() error {
|
||||
|
|
|
@ -34,7 +34,7 @@ func test(engine *xorm.Engine) {
|
|||
return
|
||||
}
|
||||
|
||||
size := 10
|
||||
size := 500
|
||||
queue := make(chan int, size)
|
||||
|
||||
for i := 0; i < size; i++ {
|
||||
|
@ -84,7 +84,7 @@ func test(engine *xorm.Engine) {
|
|||
}
|
||||
|
||||
func main() {
|
||||
fmt.Println("create engine")
|
||||
fmt.Println("-----start sqlite go routines-----")
|
||||
engine, err := sqliteEngine()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
|
@ -96,6 +96,7 @@ func main() {
|
|||
fmt.Println("test end")
|
||||
engine.Close()
|
||||
|
||||
fmt.Println("-----start mysql go routines-----")
|
||||
engine, err = mysqlEngine()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
//xorm "github.com/lunny/xorm"
|
||||
"fmt"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"os"
|
||||
//"time"
|
||||
//"sync/atomic"
|
||||
xorm "xorm"
|
||||
)
|
||||
|
||||
type User struct {
|
||||
Id int64
|
||||
Name string
|
||||
}
|
||||
|
||||
func sqliteEngine() (*xorm.Engine, error) {
|
||||
os.Remove("./test.db")
|
||||
return xorm.NewEngine("sqlite3", "./goroutine.db")
|
||||
}
|
||||
|
||||
func mysqlEngine() (*xorm.Engine, error) {
|
||||
return xorm.NewEngine("mysql", "root:@/test?charset=utf8")
|
||||
}
|
||||
|
||||
var u *User = &User{}
|
||||
|
||||
func test(engine *xorm.Engine) {
|
||||
err := engine.CreateTables(u)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
engine.Pool.SetMaxConns(5)
|
||||
size := 10
|
||||
queue := make(chan int, size)
|
||||
|
||||
for i := 0; i < size; i++ {
|
||||
go func(x int) {
|
||||
//x := i
|
||||
err := engine.Test()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
} else {
|
||||
err = engine.Map(u)
|
||||
if err != nil {
|
||||
fmt.Println("Map user failed")
|
||||
} else {
|
||||
for j := 0; j < 10; j++ {
|
||||
if x+j < 2 {
|
||||
_, err = engine.Get(u)
|
||||
} else if x+j < 4 {
|
||||
users := make([]User, 0)
|
||||
err = engine.Find(&users)
|
||||
} else if x+j < 8 {
|
||||
_, err = engine.Count(u)
|
||||
} else if x+j < 16 {
|
||||
_, err = engine.Insert(&User{Name: "xlw"})
|
||||
} else if x+j < 32 {
|
||||
_, err = engine.Id(1).Delete(u)
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
queue <- x
|
||||
return
|
||||
}
|
||||
}
|
||||
fmt.Printf("%v success!\n", x)
|
||||
}
|
||||
}
|
||||
queue <- x
|
||||
}(i)
|
||||
}
|
||||
|
||||
for i := 0; i < size; i++ {
|
||||
<-queue
|
||||
}
|
||||
|
||||
//conns := atomic.LoadInt32(&xorm.ConnectionNum)
|
||||
//fmt.Println("connection number:", conns)
|
||||
fmt.Println("end")
|
||||
}
|
||||
|
||||
func main() {
|
||||
fmt.Println("create engine")
|
||||
engine, err := sqliteEngine()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
engine.ShowSQL = true
|
||||
fmt.Println(engine)
|
||||
test(engine)
|
||||
fmt.Println("test end")
|
||||
engine.Close()
|
||||
|
||||
engine, err = mysqlEngine()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
defer engine.Close()
|
||||
test(engine)
|
||||
}
|
58
pool.go
58
pool.go
|
@ -32,6 +32,8 @@ type IConnectPool interface {
|
|||
Close(engine *Engine) error
|
||||
SetMaxIdleConns(conns int)
|
||||
MaxIdleConns() int
|
||||
SetMaxConns(conns int)
|
||||
MaxConns() int
|
||||
}
|
||||
|
||||
// Struct NoneConnectPool is a implement for IConnectPool. It provides directly invoke driver's
|
||||
|
@ -72,12 +74,25 @@ func (p *NoneConnectPool) MaxIdleConns() int {
|
|||
return 0
|
||||
}
|
||||
|
||||
// not implemented
|
||||
func (p *NoneConnectPool) SetMaxConns(conns int) {
|
||||
}
|
||||
|
||||
// not implemented
|
||||
func (p *NoneConnectPool) MaxConns() int {
|
||||
return -1
|
||||
}
|
||||
|
||||
// Struct SysConnectPool is a simple wrapper for using system default connection pool.
|
||||
// About the system connection pool, you can review the code database/sql/sql.go
|
||||
// It's currently default Pool implments.
|
||||
type SysConnectPool struct {
|
||||
db *sql.DB
|
||||
maxIdleConns int
|
||||
maxConns int
|
||||
curConns int
|
||||
mutex *sync.Mutex
|
||||
cond *sync.Cond
|
||||
}
|
||||
|
||||
// NewSysConnectPool new a SysConnectPool.
|
||||
|
@ -93,16 +108,40 @@ func (s *SysConnectPool) Init(engine *Engine) error {
|
|||
}
|
||||
s.db = db
|
||||
s.maxIdleConns = 2
|
||||
s.maxConns = -1
|
||||
s.curConns = 0
|
||||
s.mutex = &sync.Mutex{}
|
||||
s.cond = sync.NewCond(s.mutex)
|
||||
return nil
|
||||
}
|
||||
|
||||
// RetrieveDB just return the only db
|
||||
func (p *SysConnectPool) RetrieveDB(engine *Engine) (db *sql.DB, err error) {
|
||||
if p.maxConns != -1 {
|
||||
p.cond.L.Lock()
|
||||
defer p.cond.L.Unlock()
|
||||
//fmt.Println("before retrieve - current connections:", p.curConns, p.maxConns)
|
||||
for p.curConns >= p.maxConns-1 {
|
||||
//fmt.Println("waiting...")
|
||||
p.cond.Wait()
|
||||
}
|
||||
p.curConns += 1
|
||||
}
|
||||
return p.db, nil
|
||||
}
|
||||
|
||||
// ReleaseDB do nothing
|
||||
func (p *SysConnectPool) ReleaseDB(engine *Engine, db *sql.DB) {
|
||||
if p.maxConns != -1 {
|
||||
p.cond.L.Lock()
|
||||
defer p.cond.L.Unlock()
|
||||
//fmt.Println("before release - current connections:", p.curConns, p.maxConns)
|
||||
if p.curConns >= p.maxConns-1 {
|
||||
//fmt.Println("signaling...")
|
||||
p.cond.Signal()
|
||||
}
|
||||
p.curConns -= 1
|
||||
}
|
||||
}
|
||||
|
||||
// Close closed the only db
|
||||
|
@ -119,6 +158,16 @@ func (p *SysConnectPool) MaxIdleConns() int {
|
|||
return p.maxIdleConns
|
||||
}
|
||||
|
||||
// not implemented
|
||||
func (p *SysConnectPool) SetMaxConns(conns int) {
|
||||
p.maxConns = conns
|
||||
}
|
||||
|
||||
// not implemented
|
||||
func (p *SysConnectPool) MaxConns() int {
|
||||
return p.maxConns
|
||||
}
|
||||
|
||||
// NewSimpleConnectPool new a SimpleConnectPool
|
||||
func NewSimpleConnectPool() IConnectPool {
|
||||
return &SimpleConnectPool{releasedConnects: make([]*sql.DB, 10),
|
||||
|
@ -205,3 +254,12 @@ func (p *SimpleConnectPool) SetMaxIdleConns(conns int) {
|
|||
func (p *SimpleConnectPool) MaxIdleConns() int {
|
||||
return p.maxIdleConns
|
||||
}
|
||||
|
||||
// not implemented
|
||||
func (p *SimpleConnectPool) SetMaxConns(conns int) {
|
||||
}
|
||||
|
||||
// not implemented
|
||||
func (p *SimpleConnectPool) MaxConns() int {
|
||||
return -1
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ func (session *Session) Init() {
|
|||
func (session *Session) Close() {
|
||||
defer func() {
|
||||
if session.Db != nil {
|
||||
session.Engine.pool.ReleaseDB(session.Engine, session.Db)
|
||||
session.Engine.Pool.ReleaseDB(session.Engine, session.Db)
|
||||
session.Db = nil
|
||||
session.Tx = nil
|
||||
session.Init()
|
||||
|
@ -125,7 +125,7 @@ func (session *Session) Having(conditions string) *Session {
|
|||
|
||||
func (session *Session) newDb() error {
|
||||
if session.Db == nil {
|
||||
db, err := session.Engine.pool.RetrieveDB(session.Engine)
|
||||
db, err := session.Engine.Pool.RetrieveDB(session.Engine)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
3
xorm.go
3
xorm.go
|
@ -47,8 +47,7 @@ func NewEngine(driverName string, dataSourceName string) (*Engine, error) {
|
|||
|
||||
//engine.Pool = NewSimpleConnectPool()
|
||||
//engine.Pool = NewNoneConnectPool()
|
||||
engine.pool = NewSysConnectPool()
|
||||
err := engine.pool.Init(engine)
|
||||
err := engine.SetPool(NewSysConnectPool())
|
||||
|
||||
return engine, err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue