Added NewDB(), DB(), Clone() and removed pool object and other improvements

This commit is contained in:
Lunny Xiao 2014-04-15 11:39:29 +08:00
parent 38616e71fd
commit feb1f45e72
5 changed files with 66 additions and 342 deletions

View File

@ -18,23 +18,32 @@ import (
// Engine is the major struct of xorm, it means a database manager.
// Commonly, an application only need one engine
type Engine struct {
ColumnMapper core.IMapper
TableMapper core.IMapper
TagIdentifier string
DriverName string
DataSourceName string
dialect core.Dialect
Tables map[reflect.Type]*core.Table
db *core.DB
dialect core.Dialect
ColumnMapper core.IMapper
TableMapper core.IMapper
TagIdentifier string
Tables map[reflect.Type]*core.Table
mutex *sync.RWMutex
Cacher core.Cacher
mutex *sync.RWMutex
ShowSQL bool
ShowErr bool
ShowDebug bool
ShowWarn bool
Pool IConnectPool
Filters []core.Filter
Logger ILogger // io.Writer
Cacher core.Cacher
//Pool IConnectPool
//Filters []core.Filter
Logger ILogger // io.Writer
}
func (engine *Engine) DriverName() string {
return engine.dialect.DriverName()
}
func (engine *Engine) DataSourceName() string {
return engine.dialect.DataSourceName()
}
func (engine *Engine) SetMapper(mapper core.IMapper) {
@ -80,19 +89,19 @@ func (engine *Engine) AutoIncrStr() string {
}
// Set engine's pool, the pool default is Go's standard library's connection pool.
func (engine *Engine) SetPool(pool IConnectPool) error {
/*func (engine *Engine) SetPool(pool IConnectPool) error {
engine.Pool = pool
return engine.Pool.Init(engine)
}
}*/
// SetMaxConns is only available for go 1.2+
func (engine *Engine) SetMaxConns(conns int) {
engine.Pool.SetMaxConns(conns)
engine.db.SetMaxOpenConns(conns)
}
// SetMaxIdleConns
func (engine *Engine) SetMaxIdleConns(conns int) {
engine.Pool.SetMaxIdleConns(conns)
engine.db.SetMaxIdleConns(conns)
}
// SetDefaltCacher set the default cacher. Xorm's default not enable cacher.
@ -122,8 +131,12 @@ func (engine *Engine) MapCacher(bean interface{}, cacher core.Cacher) {
}
// OpenDB provides a interface to operate database directly.
func (engine *Engine) OpenDB() (*core.DB, error) {
return core.Open(engine.DriverName, engine.DataSourceName)
func (engine *Engine) NewDB() (*core.DB, error) {
return core.OpenDialect(engine.dialect)
}
func (engine *Engine) DB() *core.DB {
return engine.db
}
// New a session
@ -135,7 +148,7 @@ func (engine *Engine) NewSession() *Session {
// Close the engine
func (engine *Engine) Close() error {
return engine.Pool.Close(engine)
return engine.db.Close()
}
// Ping tests if database is alive.

285
pool.go
View File

@ -1,285 +0,0 @@
package xorm
import (
"container/list"
"reflect"
"sync"
"time"
"github.com/go-xorm/core"
)
// Interface IConnecPool is a connection pool interface, all implements should implement
// Init, RetrieveDB, ReleaseDB and Close methods.
// Init for init when engine be created or invoke SetPool
// RetrieveDB for requesting a connection to db;
// ReleaseDB for releasing a db connection;
// Close for invoking when engine.Close
type IConnectPool interface {
Init(engine *Engine) error
RetrieveDB(engine *Engine) (*core.DB, error)
ReleaseDB(engine *Engine, db *core.DB)
Close(engine *Engine) error
SetMaxIdleConns(conns int)
MaxIdleConns() int
SetMaxConns(conns int)
MaxConns() int
}
// Struct NoneConnectPool is a implement for IConnectPool. It provides directly invoke driver's
// open and release connection function
type NoneConnectPool struct {
}
// NewNoneConnectPool new a NoneConnectPool.
func NewNoneConnectPool() IConnectPool {
return &NoneConnectPool{}
}
// Init do nothing
func (p *NoneConnectPool) Init(engine *Engine) error {
return nil
}
// RetrieveDB directly open a connection
func (p *NoneConnectPool) RetrieveDB(engine *Engine) (db *core.DB, err error) {
db, err = engine.OpenDB()
return
}
// ReleaseDB directly close a connection
func (p *NoneConnectPool) ReleaseDB(engine *Engine, db *core.DB) {
db.Close()
}
// Close do nothing
func (p *NoneConnectPool) Close(engine *Engine) error {
return nil
}
func (p *NoneConnectPool) SetMaxIdleConns(conns int) {
}
func (p *NoneConnectPool) MaxIdleConns() int {
return 0
}
// not implemented
func (p *NoneConnectPool) SetMaxConns(conns int) {
}
// not implemented
func (p *NoneConnectPool) MaxConns() int {
return -1
}
// Struct SysConnectPool is a simple wrapper for using system default connection pool.
// About the system connection pool, you can review the code database/sql/sql.go
// It's currently default Pool implments.
type SysConnectPool struct {
db *core.DB
maxIdleConns int
maxConns int
curConns int
mutex *sync.Mutex
queue *list.List
}
// NewSysConnectPool new a SysConnectPool.
func NewSysConnectPool() IConnectPool {
return &SysConnectPool{}
}
// Init create a db immediately and keep it util engine closed.
func (s *SysConnectPool) Init(engine *Engine) error {
db, err := engine.OpenDB()
if err != nil {
return err
}
s.db = db
s.maxIdleConns = 2
s.maxConns = -1
s.curConns = 0
s.mutex = &sync.Mutex{}
s.queue = list.New()
return nil
}
type node struct {
mutex sync.Mutex
cond *sync.Cond
}
func newCondNode() *node {
n := &node{}
n.cond = sync.NewCond(&n.mutex)
return n
}
// RetrieveDB just return the only db
func (s *SysConnectPool) RetrieveDB(engine *Engine) (db *core.DB, err error) {
/*if s.maxConns > 0 {
fmt.Println("before retrieve")
s.mutex.Lock()
for s.curConns >= s.maxConns {
fmt.Println("before waiting...", s.curConns, s.queue.Len())
s.mutex.Unlock()
n := NewNode()
n.cond.L.Lock()
s.queue.PushBack(n)
n.cond.Wait()
n.cond.L.Unlock()
s.mutex.Lock()
fmt.Println("after waiting...", s.curConns, s.queue.Len())
}
s.curConns += 1
s.mutex.Unlock()
fmt.Println("after retrieve")
}*/
return s.db, nil
}
// ReleaseDB do nothing
func (s *SysConnectPool) ReleaseDB(engine *Engine, db *core.DB) {
/*if s.maxConns > 0 {
s.mutex.Lock()
fmt.Println("before release", s.queue.Len())
s.curConns -= 1
if e := s.queue.Front(); e != nil {
n := e.Value.(*node)
//n.cond.L.Lock()
n.cond.Signal()
fmt.Println("signaled...")
s.queue.Remove(e)
//n.cond.L.Unlock()
}
fmt.Println("after released", s.queue.Len())
s.mutex.Unlock()
}*/
}
// Close closed the only db
func (p *SysConnectPool) Close(engine *Engine) error {
return p.db.Close()
}
func (p *SysConnectPool) SetMaxIdleConns(conns int) {
p.db.SetMaxIdleConns(conns)
p.maxIdleConns = conns
}
func (p *SysConnectPool) MaxIdleConns() int {
return p.maxIdleConns
}
// not implemented
func (p *SysConnectPool) SetMaxConns(conns int) {
p.maxConns = conns
// if support SetMaxOpenConns, go 1.2+, then set
if reflect.ValueOf(p.db).MethodByName("SetMaxOpenConns").IsValid() {
reflect.ValueOf(p.db).MethodByName("SetMaxOpenConns").Call([]reflect.Value{reflect.ValueOf(conns)})
}
//p.db.SetMaxOpenConns(conns)
}
// not implemented
func (p *SysConnectPool) MaxConns() int {
return p.maxConns
}
// NewSimpleConnectPool new a SimpleConnectPool
func NewSimpleConnectPool() IConnectPool {
return &SimpleConnectPool{releasedConnects: make([]*core.DB, 10),
usingConnects: map[*core.DB]time.Time{},
cur: -1,
maxWaitTimeOut: 14400,
maxIdleConns: 10,
mutex: &sync.Mutex{},
}
}
// Struct SimpleConnectPool is a simple implementation for IConnectPool.
// It's a custom connection pool and not use system connection pool.
// Opening or Closing a database connection must be enter a lock.
// This implements will be improved in furture.
type SimpleConnectPool struct {
releasedConnects []*core.DB
cur int
usingConnects map[*core.DB]time.Time
maxWaitTimeOut int
mutex *sync.Mutex
maxIdleConns int
}
func (s *SimpleConnectPool) Init(engine *Engine) error {
return nil
}
// RetrieveDB get a connection from connection pool
func (p *SimpleConnectPool) RetrieveDB(engine *Engine) (*core.DB, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
var db *core.DB = nil
var err error = nil
//fmt.Printf("%x, rbegin - released:%v, using:%v\n", &p, p.cur+1, len(p.usingConnects))
if p.cur < 0 {
db, err = engine.OpenDB()
if err != nil {
return nil, err
}
p.usingConnects[db] = time.Now()
} else {
db = p.releasedConnects[p.cur]
p.usingConnects[db] = time.Now()
p.releasedConnects[p.cur] = nil
p.cur = p.cur - 1
}
//fmt.Printf("%x, rend - released:%v, using:%v\n", &p, p.cur+1, len(p.usingConnects))
return db, nil
}
// ReleaseDB release a db from connection pool
func (p *SimpleConnectPool) ReleaseDB(engine *Engine, db *core.DB) {
p.mutex.Lock()
defer p.mutex.Unlock()
//fmt.Printf("%x, lbegin - released:%v, using:%v\n", &p, p.cur+1, len(p.usingConnects))
if p.cur >= p.maxIdleConns-1 {
db.Close()
} else {
p.cur = p.cur + 1
p.releasedConnects[p.cur] = db
}
delete(p.usingConnects, db)
//fmt.Printf("%x, lend - released:%v, using:%v\n", &p, p.cur+1, len(p.usingConnects))
}
// Close release all db
func (p *SimpleConnectPool) Close(engine *Engine) error {
p.mutex.Lock()
defer p.mutex.Unlock()
for len(p.releasedConnects) > 0 {
p.releasedConnects[0].Close()
p.releasedConnects = p.releasedConnects[1:]
}
return nil
}
func (p *SimpleConnectPool) SetMaxIdleConns(conns int) {
p.maxIdleConns = conns
}
func (p *SimpleConnectPool) MaxIdleConns() int {
return p.maxIdleConns
}
// not implemented
func (p *SimpleConnectPool) SetMaxConns(conns int) {
}
// not implemented
func (p *SimpleConnectPool) MaxConns() int {
return -1
}

View File

@ -42,7 +42,7 @@ func newRows(session *Session, bean interface{}) (*Rows, error) {
args = rows.session.Statement.RawParams
}
for _, filter := range rows.session.Engine.Filters {
for _, filter := range rows.session.Engine.dialect.Filters() {
sqlStr = filter.Do(sqlStr, session.Engine.dialect, rows.session.Statement.RefTable)
}

View File

@ -61,7 +61,7 @@ func (session *Session) Close() {
}
if session.Db != nil {
session.Engine.Pool.ReleaseDB(session.Engine, session.Db)
//session.Engine.Pool.ReleaseDB(session.Engine, session.Db)
session.Db = nil
session.Tx = nil
session.stmtCache = nil
@ -267,11 +267,11 @@ func (session *Session) Having(conditions string) *Session {
func (session *Session) newDb() error {
if session.Db == nil {
db, err := session.Engine.Pool.RetrieveDB(session.Engine)
/*db, err := session.Engine.Pool.RetrieveDB(session.Engine)
if err != nil {
return err
}
session.Db = db
}*/
session.Db = session.Engine.db
session.stmtCache = make(map[uint32]*core.Stmt, 0)
}
return nil
@ -426,7 +426,7 @@ func (session *Session) innerExec(sqlStr string, args ...interface{}) (sql.Resul
}
func (session *Session) exec(sqlStr string, args ...interface{}) (sql.Result, error) {
for _, filter := range session.Engine.Filters {
for _, filter := range session.Engine.dialect.Filters() {
sqlStr = filter.Do(sqlStr, session.Engine.dialect, session.Statement.RefTable)
}
@ -612,7 +612,7 @@ func (session *Session) cacheGet(bean interface{}, sqlStr string, args ...interf
if session.Statement.RefTable == nil || len(session.Statement.RefTable.PrimaryKeys) != 1 {
return false, ErrCacheFailed
}
for _, filter := range session.Engine.Filters {
for _, filter := range session.Engine.dialect.Filters() {
sqlStr = filter.Do(sqlStr, session.Engine.dialect, session.Statement.RefTable)
}
newsql := session.Statement.convertIdSql(sqlStr)
@ -699,7 +699,7 @@ func (session *Session) cacheFind(t reflect.Type, sqlStr string, rowsSlicePtr in
return ErrCacheFailed
}
for _, filter := range session.Engine.Filters {
for _, filter := range session.Engine.dialect.Filters() {
sqlStr = filter.Do(sqlStr, session.Engine.dialect, session.Statement.RefTable)
}
@ -1709,7 +1709,7 @@ func (session *Session) row2Bean(rows *core.Rows, fields []string, fieldsCount i
}
func (session *Session) queryPreprocess(sqlStr *string, paramStr ...interface{}) {
for _, filter := range session.Engine.Filters {
for _, filter := range session.Engine.dialect.Filters() {
*sqlStr = filter.Do(*sqlStr, session.Engine.dialect, session.Statement.RefTable)
}
@ -2263,7 +2263,7 @@ func (session *Session) bytes2Value(col *core.Column, fieldValue *reflect.Value,
var err error
// for mysql, when use bit, it returned \x01
if col.SQLType.Name == core.Bit &&
strings.Contains(session.Engine.DriverName, "mysql") {
strings.Contains(session.Engine.DriverName(), "mysql") {
if len(data) == 1 {
x = int64(data[0])
} else {
@ -2289,7 +2289,7 @@ func (session *Session) bytes2Value(col *core.Column, fieldValue *reflect.Value,
var err error
// for mysql, when use bit, it returned \x01
if col.SQLType.Name == core.Bit &&
strings.Contains(session.Engine.DriverName, "mysql") {
strings.Contains(session.Engine.DriverName(), "mysql") {
if len(data) == 1 {
x = int(data[0])
} else {
@ -2347,7 +2347,7 @@ func (session *Session) bytes2Value(col *core.Column, fieldValue *reflect.Value,
var err error
// for mysql, when use bit, it returned \x01
if col.SQLType.Name == core.Bit &&
strings.Contains(session.Engine.DriverName, "mysql") {
strings.Contains(session.Engine.DriverName(), "mysql") {
if len(data) == 1 {
x = int8(data[0])
} else {
@ -2376,7 +2376,7 @@ func (session *Session) bytes2Value(col *core.Column, fieldValue *reflect.Value,
var err error
// for mysql, when use bit, it returned \x01
if col.SQLType.Name == core.Bit &&
strings.Contains(session.Engine.DriverName, "mysql") {
strings.Contains(session.Engine.DriverName(), "mysql") {
if len(data) == 1 {
x = int16(data[0])
} else {
@ -2583,7 +2583,7 @@ func (session *Session) innerInsert(bean interface{}) (int64, error) {
// for postgres, many of them didn't implement lastInsertId, so we should
// implemented it ourself.
if session.Engine.DriverName != core.POSTGRES || table.AutoIncrement == "" {
if session.Engine.DriverName() != core.POSTGRES || table.AutoIncrement == "" {
res, err := session.exec(sqlStr, args...)
if err != nil {
return 0, err
@ -2782,7 +2782,7 @@ func (session *Session) cacheUpdate(sqlStr string, args ...interface{}) error {
if newsql == "" {
return ErrCacheFailed
}
for _, filter := range session.Engine.Filters {
for _, filter := range session.Engine.dialect.Filters() {
newsql = filter.Do(newsql, session.Engine.dialect, session.Statement.RefTable)
}
session.Engine.LogDebug("[xorm:cacheUpdate] new sql", oldhead, newsql)
@ -3085,7 +3085,7 @@ func (session *Session) cacheDelete(sqlStr string, args ...interface{}) error {
return ErrCacheFailed
}
for _, filter := range session.Engine.Filters {
for _, filter := range session.Engine.dialect.Filters() {
sqlStr = filter.Do(sqlStr, session.Engine.dialect, session.Statement.RefTable)
}

40
xorm.go
View File

@ -69,39 +69,35 @@ func NewEngine(driverName string, dataSourceName string) (*Engine, error) {
return nil, err
}
db, err := core.OpenDialect(dialect)
if err != nil {
return nil, err
}
engine := &Engine{
DriverName: driverName,
DataSourceName: dataSourceName,
dialect: dialect,
db: db,
dialect: dialect,
Tables: make(map[reflect.Type]*core.Table),
mutex: &sync.RWMutex{},
TagIdentifier: "xorm",
Logger: NewSimpleLogger(os.Stdout),
}
engine.SetMapper(core.NewCacheMapper(new(core.SnakeMapper)))
engine.Filters = dialect.Filters()
engine.Tables = make(map[reflect.Type]*core.Table)
engine.mutex = &sync.RWMutex{}
engine.TagIdentifier = "xorm"
engine.Logger = NewSimpleLogger(os.Stdout)
//engine.Pool = NewSimpleConnectPool()
//engine.Pool = NewNoneConnectPool()
//engine.Filters = dialect.Filters()
//engine.Cacher = NewLRUCacher()
err = engine.SetPool(NewSysConnectPool())
//err = engine.SetPool(NewSysConnectPool())
runtime.SetFinalizer(engine, close)
return engine, err
}
// func NewLRUCacher(store core.CacheStore, max int) *LRUCacher {
// return NewLRUCacher(store, core.CacheExpired, core.CacheMaxMemory, max)
// }
// clone an engine
func (engine *Engine) Clone() (*Engine, error) {
return NewEngine(engine.dialect.DriverName(), engine.dialect.DataSourceName())
}
func NewLRUCacher2(store core.CacheStore, expired time.Duration, max int) *LRUCacher {
return NewLRUCacher(store, expired, 0, max)
}
// func NewMemoryStore() *MemoryStore {
// return NewMemoryStore()
// }