refactor Group Policy

This commit is contained in:
Lunny Xiao 2017-09-26 11:26:06 +08:00
parent 196e520b4b
commit 5f5e7735c0
No known key found for this signature in database
GPG Key ID: C3B7C91B632F738A
2 changed files with 120 additions and 178 deletions

View File

@ -25,22 +25,29 @@ type EngineGroup struct {
p int p int
} }
func NewGroup(args1 interface{}, args2 interface{}, policy ...Policy) (*EngineGroup, error) { func NewGroup(args1 interface{}, args2 interface{}, policies ...Policy) (*EngineGroup, error) {
var policy Policy
if len(policies) > 0 {
policy = policies[0]
} else {
policy = NewRandomPolicy()
}
driverName, ok1 := args1.(string) driverName, ok1 := args1.(string)
dataSourceNames, ok2 := args2.(string) dataSourceNames, ok2 := args2.(string)
if ok1 && ok2 { if ok1 && ok2 {
return newGroup1(driverName, dataSourceNames, policy...) return newGroup1(driverName, dataSourceNames, policy)
} }
Master, ok3 := args1.(*Engine) Master, ok3 := args1.(*Engine)
Slaves, ok4 := args2.([]*Engine) Slaves, ok4 := args2.([]*Engine)
if ok3 && ok4 { if ok3 && ok4 {
return newGroup2(Master, Slaves, policy...) return newGroup2(Master, Slaves, policy)
} }
return nil, ErrParamsType return nil, ErrParamsType
} }
func newGroup1(driverName string, dataSourceNames string, policy ...Policy) (*EngineGroup, error) { func newGroup1(driverName string, dataSourceNames string, policy Policy) (*EngineGroup, error) {
conns := strings.Split(dataSourceNames, ";") conns := strings.Split(dataSourceNames, ";")
engines := make([]*Engine, len(conns)) engines := make([]*Engine, len(conns))
for i, _ := range conns { for i, _ := range conns {
@ -51,58 +58,23 @@ func newGroup1(driverName string, dataSourceNames string, policy ...Policy) (*En
engines[i] = engine engines[i] = engine
} }
n := len(policy) return &EngineGroup{
if n > 1 { master: engines[0],
return nil, ErrParamsType slaves: engines[1:],
} else if n == 1 { count: len(engines),
eg := &EngineGroup{ s_count: len(engines[1:]),
master: engines[0], policy: policy,
slaves: engines[1:], }, nil
count: len(engines),
s_count: len(engines[1:]),
policy: policy[0],
}
return eg, nil
} else {
xPolicy := new(XormEngineGroupPolicy)
eg := &EngineGroup{
master: engines[0],
slaves: engines[1:],
count: len(engines),
s_count: len(engines[1:]),
policy: xPolicy,
}
xPolicy.Init()
return eg, nil
}
} }
func newGroup2(Master *Engine, Slaves []*Engine, policy ...Policy) (*EngineGroup, error) { func newGroup2(Master *Engine, Slaves []*Engine, policy Policy) (*EngineGroup, error) {
n := len(policy) return &EngineGroup{
if n > 1 { master: Master,
return nil, ErrParamsType slaves: Slaves,
} else if n == 1 { count: 1 + len(Slaves),
eg := &EngineGroup{ s_count: len(Slaves),
master: Master, policy: policy,
slaves: Slaves, }, nil
count: 1 + len(Slaves),
s_count: len(Slaves),
policy: policy[0],
}
return eg, nil
} else {
xPolicy := new(XormEngineGroupPolicy)
eg := &EngineGroup{
master: Master,
slaves: Slaves,
count: 1 + len(Slaves),
s_count: len(Slaves),
policy: xPolicy,
}
xPolicy.Init()
return eg, nil
}
} }
func (eg *EngineGroup) SetPolicy(policy Policy) *EngineGroup { func (eg *EngineGroup) SetPolicy(policy Policy) *EngineGroup {
@ -110,34 +82,6 @@ func (eg *EngineGroup) SetPolicy(policy Policy) *EngineGroup {
return eg return eg
} }
func (eg *EngineGroup) UsePolicy(policy int) *EngineGroup {
eg.p = policy
return eg
}
func (eg *EngineGroup) SetWeight(weight ...interface{}) *EngineGroup {
l := len(weight)
if l == 1 {
switch weight[0].(type) {
case []int:
eg.weight = weight[0].([]int)
}
} else if l > 1 {
s := make([]int, 0)
for i, _ := range weight {
switch weight[i].(type) {
case int:
s = append(s, weight[i].(int))
default:
s = append(s, 1)
}
}
eg.weight = s
}
return eg
}
func (eg *EngineGroup) Master() *Engine { func (eg *EngineGroup) Master() *Engine {
return eg.master return eg.master
} }
@ -858,11 +802,6 @@ func (eg *EngineGroup) Import(r io.Reader) ([]sql.Result, error) {
return eg.Master().Import(r) return eg.Master().Import(r)
} }
// NowTime2 return current time
func (eg *EngineGroup) NowTime2(sqlTypeName string) (interface{}, time.Time) {
return eg.Master().NowTime2(sqlTypeName)
}
// Unscoped always disable struct tag "deleted" // Unscoped always disable struct tag "deleted"
func (eg *EngineGroup) Unscoped() *EGSession { func (eg *EngineGroup) Unscoped() *EGSession {
egs := eg.NewEGSession() egs := eg.NewEGSession()

View File

@ -6,128 +6,131 @@ package xorm
import ( import (
"math/rand" "math/rand"
"sync"
"time" "time"
) )
const (
ENGINE_GROUP_POLICY_RANDOM = iota
ENGINE_GROUP_POLICY_WEIGHTRANDOM
ENGINE_GROUP_POLICY_ROUNDROBIN
ENGINE_GROUP_POLICY_WEIGHTROUNDROBIN
ENGINE_GROUP_POLICY_LEASTCONNECTIONS
)
type Policy interface { type Policy interface {
Slave(*EngineGroup) *Engine Slave(*EngineGroup) *Engine
} }
type XormEngineGroupPolicy struct { type RandomPolicy struct {
pos int r *rand.Rand
slaves []int
eg *EngineGroup
r *rand.Rand
} }
func (xgep *XormEngineGroupPolicy) Init() { func NewRandomPolicy() *RandomPolicy {
xgep.r = rand.New(rand.NewSource(time.Now().UnixNano())) return &RandomPolicy{
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
} }
func (xgep *XormEngineGroupPolicy) Slave(eg *EngineGroup) *Engine { func (policy *RandomPolicy) Slave(g *EngineGroup) *Engine {
xgep.eg = eg return g.Slaves()[policy.r.Intn(len(g.Slaves()))]
return eg.slaves[xgep.slave()]
} }
func (xgep *XormEngineGroupPolicy) SetWeight() { type WeightRandomPolicy struct {
for i, _ := range xgep.eg.weight { weights []int
w := xgep.eg.weight[i] rands []int
for n := 0; n < w; n++ { r *rand.Rand
xgep.slaves = append(xgep.slaves, i) }
func NewWeightRandomPolicy(weights []int) *WeightRandomPolicy {
var rands = make([]int, 0, len(weights))
for i := 0; i < len(weights); i++ {
for n := 0; n < weights[i]; n++ {
rands = append(rands, i)
} }
} }
return &WeightRandomPolicy{
weights: weights,
rands: rands,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
} }
func (xgep *XormEngineGroupPolicy) slave() int { func (policy *WeightRandomPolicy) Slave(g *EngineGroup) *Engine {
switch xgep.eg.p { var slaves = g.Slaves()
case ENGINE_GROUP_POLICY_RANDOM: idx := policy.rands[policy.r.Intn(len(policy.rands))]
return xgep.Random() if idx >= len(slaves) {
case ENGINE_GROUP_POLICY_WEIGHTRANDOM: idx = len(slaves) - 1
return xgep.WeightRandom()
case ENGINE_GROUP_POLICY_ROUNDROBIN:
return xgep.RoundRobin()
case ENGINE_GROUP_POLICY_WEIGHTROUNDROBIN:
return xgep.WeightRoundRobin()
case ENGINE_GROUP_POLICY_LEASTCONNECTIONS:
return xgep.LeastConnections()
default:
return xgep.Random()
} }
return slaves[idx]
} }
func (xgep *XormEngineGroupPolicy) Random() int { type RoundRobinPolicy struct {
if xgep.eg.s_count <= 1 { pos int
return 0 lock sync.Mutex
}
rnd := xgep.r.Intn(xgep.eg.s_count)
return rnd
} }
func (xgep *XormEngineGroupPolicy) WeightRandom() int { func NewRoundRobinPolicy() *RoundRobinPolicy {
if xgep.eg.s_count <= 1 { return &RoundRobinPolicy{pos: -1}
return 0
}
xgep.SetWeight()
s := len(xgep.slaves)
rnd := xgep.r.Intn(s)
return xgep.slaves[rnd]
} }
func (xgep *XormEngineGroupPolicy) RoundRobin() int { func (policy *RoundRobinPolicy) Slave(g *EngineGroup) *Engine {
if xgep.eg.s_count <= 1 { var pos int
return 0 policy.lock.Lock()
policy.pos++
if policy.pos >= len(g.Slaves()) {
policy.pos = 0
} }
pos = policy.pos
policy.lock.Unlock()
if xgep.pos >= xgep.eg.s_count { return g.Slaves()[pos]
xgep.pos = 0
}
xgep.pos++
return xgep.pos - 1
} }
func (xgep *XormEngineGroupPolicy) WeightRoundRobin() int { type WeightRoundRobin struct {
if xgep.eg.s_count <= 1 { weights []int
return 0 rands []int
} r *rand.Rand
lock sync.Mutex
xgep.SetWeight() pos int
count := len(xgep.slaves)
if xgep.pos >= count {
xgep.pos = 0
}
xgep.pos++
return xgep.slaves[xgep.pos-1]
} }
func (xgep *XormEngineGroupPolicy) LeastConnections() int { func NewWeightRoundRobin(weights []int) *WeightRoundRobin {
if xgep.eg.s_count <= 1 { var rands = make([]int, 0, len(weights))
return 0 for i := 0; i < len(weights); i++ {
} for n := 0; n < weights[i]; n++ {
connections := 0 rands = append(rands, i)
slave := 0
for i, _ := range xgep.eg.slaves {
open_connections := xgep.eg.slaves[i].Stats()
if i == 0 {
connections = open_connections
slave = i
} else if open_connections <= connections {
slave = i
connections = open_connections
} }
} }
return slave
return &WeightRoundRobin{
weights: weights,
rands: rands,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
pos: -1,
}
}
func (policy *WeightRoundRobin) Slave(g *EngineGroup) *Engine {
var slaves = g.Slaves()
var pos int
policy.lock.Lock()
policy.pos++
if policy.pos >= len(g.Slaves()) {
policy.pos = 0
}
pos = policy.pos
policy.lock.Unlock()
idx := policy.rands[pos]
if idx >= len(slaves) {
idx = len(slaves) - 1
}
return slaves[idx]
}
type LeastConnPolicy struct {
}
func NewLeastConnPolicy() *LeastConnPolicy {
return &LeastConnPolicy{}
}
func (policy *LeastConnPolicy) Slave(g *EngineGroup) *Engine {
panic("not implementation")
return nil
} }