From f0e87becd2d44cd6efb0a6523c38ac5e3e8ac5b9 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Thu, 29 Aug 2013 17:26:33 +0800 Subject: [PATCH] added maxconnect limit support --- engine.go | 8 +-- examples/goroutine.go | 5 +- examples/maxconnect.go | 107 +++++++++++++++++++++++++++++++++++++++++ pool.go | 58 ++++++++++++++++++++++ session.go | 4 +- xorm.go | 3 +- 6 files changed, 175 insertions(+), 10 deletions(-) create mode 100644 examples/maxconnect.go diff --git a/engine.go b/engine.go index 8d75dfe3..b74132e6 100644 --- a/engine.go +++ b/engine.go @@ -42,7 +42,7 @@ type Engine struct { Tables map[reflect.Type]*Table mutex *sync.Mutex ShowSQL bool - pool IConnectPool + Pool IConnectPool CacheMapping bool Filters []Filter Logger io.Writer @@ -69,8 +69,8 @@ func (engine *Engine) AutoIncrStr() string { } func (engine *Engine) SetPool(pool IConnectPool) error { - engine.pool = pool - return engine.pool.Init(engine) + engine.Pool = pool + return engine.Pool.Init(engine) } func Type(bean interface{}) reflect.Type { @@ -96,7 +96,7 @@ func (engine *Engine) NewSession() *Session { } func (engine *Engine) Close() error { - return engine.pool.Close(engine) + return engine.Pool.Close(engine) } func (engine *Engine) Test() error { diff --git a/examples/goroutine.go b/examples/goroutine.go index 70f8c5ab..23c57a76 100644 --- a/examples/goroutine.go +++ b/examples/goroutine.go @@ -34,7 +34,7 @@ func test(engine *xorm.Engine) { return } - size := 10 + size := 500 queue := make(chan int, size) for i := 0; i < size; i++ { @@ -84,7 +84,7 @@ func test(engine *xorm.Engine) { } func main() { - fmt.Println("create engine") + fmt.Println("-----start sqlite go routines-----") engine, err := sqliteEngine() if err != nil { fmt.Println(err) @@ -96,6 +96,7 @@ func main() { fmt.Println("test end") engine.Close() + fmt.Println("-----start mysql go routines-----") engine, err = mysqlEngine() if err != nil { fmt.Println(err) diff --git a/examples/maxconnect.go b/examples/maxconnect.go new file mode 100644 index 00000000..afbcc837 --- /dev/null +++ b/examples/maxconnect.go @@ -0,0 +1,107 @@ +package main + +import ( + //xorm "github.com/lunny/xorm" + "fmt" + _ "github.com/go-sql-driver/mysql" + _ "github.com/mattn/go-sqlite3" + "os" + //"time" + //"sync/atomic" + xorm "xorm" +) + +type User struct { + Id int64 + Name string +} + +func sqliteEngine() (*xorm.Engine, error) { + os.Remove("./test.db") + return xorm.NewEngine("sqlite3", "./goroutine.db") +} + +func mysqlEngine() (*xorm.Engine, error) { + return xorm.NewEngine("mysql", "root:@/test?charset=utf8") +} + +var u *User = &User{} + +func test(engine *xorm.Engine) { + err := engine.CreateTables(u) + if err != nil { + fmt.Println(err) + return + } + + engine.Pool.SetMaxConns(5) + size := 10 + queue := make(chan int, size) + + for i := 0; i < size; i++ { + go func(x int) { + //x := i + err := engine.Test() + if err != nil { + fmt.Println(err) + } else { + err = engine.Map(u) + if err != nil { + fmt.Println("Map user failed") + } else { + for j := 0; j < 10; j++ { + if x+j < 2 { + _, err = engine.Get(u) + } else if x+j < 4 { + users := make([]User, 0) + err = engine.Find(&users) + } else if x+j < 8 { + _, err = engine.Count(u) + } else if x+j < 16 { + _, err = engine.Insert(&User{Name: "xlw"}) + } else if x+j < 32 { + _, err = engine.Id(1).Delete(u) + } + if err != nil { + fmt.Println(err) + queue <- x + return + } + } + fmt.Printf("%v success!\n", x) + } + } + queue <- x + }(i) + } + + for i := 0; i < size; i++ { + <-queue + } + + //conns := atomic.LoadInt32(&xorm.ConnectionNum) + //fmt.Println("connection number:", conns) + fmt.Println("end") +} + +func main() { + fmt.Println("create engine") + engine, err := sqliteEngine() + if err != nil { + fmt.Println(err) + return + } + engine.ShowSQL = true + fmt.Println(engine) + test(engine) + fmt.Println("test end") + engine.Close() + + engine, err = mysqlEngine() + if err != nil { + fmt.Println(err) + return + } + defer engine.Close() + test(engine) +} diff --git a/pool.go b/pool.go index 8fcd7255..b8caa273 100644 --- a/pool.go +++ b/pool.go @@ -32,6 +32,8 @@ type IConnectPool interface { Close(engine *Engine) error SetMaxIdleConns(conns int) MaxIdleConns() int + SetMaxConns(conns int) + MaxConns() int } // Struct NoneConnectPool is a implement for IConnectPool. It provides directly invoke driver's @@ -72,12 +74,25 @@ func (p *NoneConnectPool) MaxIdleConns() int { return 0 } +// not implemented +func (p *NoneConnectPool) SetMaxConns(conns int) { +} + +// not implemented +func (p *NoneConnectPool) MaxConns() int { + return -1 +} + // Struct SysConnectPool is a simple wrapper for using system default connection pool. // About the system connection pool, you can review the code database/sql/sql.go // It's currently default Pool implments. type SysConnectPool struct { db *sql.DB maxIdleConns int + maxConns int + curConns int + mutex *sync.Mutex + cond *sync.Cond } // NewSysConnectPool new a SysConnectPool. @@ -93,16 +108,40 @@ func (s *SysConnectPool) Init(engine *Engine) error { } s.db = db s.maxIdleConns = 2 + s.maxConns = -1 + s.curConns = 0 + s.mutex = &sync.Mutex{} + s.cond = sync.NewCond(s.mutex) return nil } // RetrieveDB just return the only db func (p *SysConnectPool) RetrieveDB(engine *Engine) (db *sql.DB, err error) { + if p.maxConns != -1 { + p.cond.L.Lock() + defer p.cond.L.Unlock() + //fmt.Println("before retrieve - current connections:", p.curConns, p.maxConns) + for p.curConns >= p.maxConns-1 { + //fmt.Println("waiting...") + p.cond.Wait() + } + p.curConns += 1 + } return p.db, nil } // ReleaseDB do nothing func (p *SysConnectPool) ReleaseDB(engine *Engine, db *sql.DB) { + if p.maxConns != -1 { + p.cond.L.Lock() + defer p.cond.L.Unlock() + //fmt.Println("before release - current connections:", p.curConns, p.maxConns) + if p.curConns >= p.maxConns-1 { + //fmt.Println("signaling...") + p.cond.Signal() + } + p.curConns -= 1 + } } // Close closed the only db @@ -119,6 +158,16 @@ func (p *SysConnectPool) MaxIdleConns() int { return p.maxIdleConns } +// not implemented +func (p *SysConnectPool) SetMaxConns(conns int) { + p.maxConns = conns +} + +// not implemented +func (p *SysConnectPool) MaxConns() int { + return p.maxConns +} + // NewSimpleConnectPool new a SimpleConnectPool func NewSimpleConnectPool() IConnectPool { return &SimpleConnectPool{releasedConnects: make([]*sql.DB, 10), @@ -205,3 +254,12 @@ func (p *SimpleConnectPool) SetMaxIdleConns(conns int) { func (p *SimpleConnectPool) MaxIdleConns() int { return p.maxIdleConns } + +// not implemented +func (p *SimpleConnectPool) SetMaxConns(conns int) { +} + +// not implemented +func (p *SimpleConnectPool) MaxConns() int { + return -1 +} diff --git a/session.go b/session.go index 1c6a96b7..fc7db522 100644 --- a/session.go +++ b/session.go @@ -37,7 +37,7 @@ func (session *Session) Init() { func (session *Session) Close() { defer func() { if session.Db != nil { - session.Engine.pool.ReleaseDB(session.Engine, session.Db) + session.Engine.Pool.ReleaseDB(session.Engine, session.Db) session.Db = nil session.Tx = nil session.Init() @@ -125,7 +125,7 @@ func (session *Session) Having(conditions string) *Session { func (session *Session) newDb() error { if session.Db == nil { - db, err := session.Engine.pool.RetrieveDB(session.Engine) + db, err := session.Engine.Pool.RetrieveDB(session.Engine) if err != nil { return err } diff --git a/xorm.go b/xorm.go index 0e5a3958..a965f713 100644 --- a/xorm.go +++ b/xorm.go @@ -47,8 +47,7 @@ func NewEngine(driverName string, dataSourceName string) (*Engine, error) { //engine.Pool = NewSimpleConnectPool() //engine.Pool = NewNoneConnectPool() - engine.pool = NewSysConnectPool() - err := engine.pool.Init(engine) + err := engine.SetPool(NewSysConnectPool()) return engine, err }