diff --git a/engine_group.go b/engine_group.go index b8cdc9c8..39d4c37d 100644 --- a/engine_group.go +++ b/engine_group.go @@ -25,22 +25,29 @@ type EngineGroup struct { p int } -func NewGroup(args1 interface{}, args2 interface{}, policy ...Policy) (*EngineGroup, error) { +func NewGroup(args1 interface{}, args2 interface{}, policies ...Policy) (*EngineGroup, error) { + var policy Policy + if len(policies) > 0 { + policy = policies[0] + } else { + policy = NewRandomPolicy() + } + driverName, ok1 := args1.(string) dataSourceNames, ok2 := args2.(string) if ok1 && ok2 { - return newGroup1(driverName, dataSourceNames, policy...) + return newGroup1(driverName, dataSourceNames, policy) } Master, ok3 := args1.(*Engine) Slaves, ok4 := args2.([]*Engine) if ok3 && ok4 { - return newGroup2(Master, Slaves, policy...) + return newGroup2(Master, Slaves, policy) } return nil, ErrParamsType } -func newGroup1(driverName string, dataSourceNames string, policy ...Policy) (*EngineGroup, error) { +func newGroup1(driverName string, dataSourceNames string, policy Policy) (*EngineGroup, error) { conns := strings.Split(dataSourceNames, ";") engines := make([]*Engine, len(conns)) for i, _ := range conns { @@ -51,58 +58,23 @@ func newGroup1(driverName string, dataSourceNames string, policy ...Policy) (*En engines[i] = engine } - n := len(policy) - if n > 1 { - return nil, ErrParamsType - } else if n == 1 { - eg := &EngineGroup{ - master: engines[0], - slaves: engines[1:], - count: len(engines), - s_count: len(engines[1:]), - policy: policy[0], - } - return eg, nil - } else { - xPolicy := new(XormEngineGroupPolicy) - eg := &EngineGroup{ - master: engines[0], - slaves: engines[1:], - count: len(engines), - s_count: len(engines[1:]), - policy: xPolicy, - } - xPolicy.Init() - return eg, nil - } - + return &EngineGroup{ + master: engines[0], + slaves: engines[1:], + count: len(engines), + s_count: len(engines[1:]), + policy: policy, + }, nil } -func newGroup2(Master *Engine, Slaves []*Engine, policy ...Policy) (*EngineGroup, error) { - n := len(policy) - if n > 1 { - return nil, ErrParamsType - } else if n == 1 { - eg := &EngineGroup{ - master: Master, - slaves: Slaves, - count: 1 + len(Slaves), - s_count: len(Slaves), - policy: policy[0], - } - return eg, nil - } else { - xPolicy := new(XormEngineGroupPolicy) - eg := &EngineGroup{ - master: Master, - slaves: Slaves, - count: 1 + len(Slaves), - s_count: len(Slaves), - policy: xPolicy, - } - xPolicy.Init() - return eg, nil - } +func newGroup2(Master *Engine, Slaves []*Engine, policy Policy) (*EngineGroup, error) { + return &EngineGroup{ + master: Master, + slaves: Slaves, + count: 1 + len(Slaves), + s_count: len(Slaves), + policy: policy, + }, nil } func (eg *EngineGroup) SetPolicy(policy Policy) *EngineGroup { @@ -110,34 +82,6 @@ func (eg *EngineGroup) SetPolicy(policy Policy) *EngineGroup { return eg } -func (eg *EngineGroup) UsePolicy(policy int) *EngineGroup { - eg.p = policy - return eg -} - -func (eg *EngineGroup) SetWeight(weight ...interface{}) *EngineGroup { - l := len(weight) - if l == 1 { - switch weight[0].(type) { - case []int: - eg.weight = weight[0].([]int) - } - } else if l > 1 { - s := make([]int, 0) - for i, _ := range weight { - switch weight[i].(type) { - case int: - s = append(s, weight[i].(int)) - default: - s = append(s, 1) - } - } - eg.weight = s - } - - return eg -} - func (eg *EngineGroup) Master() *Engine { return eg.master } @@ -858,11 +802,6 @@ func (eg *EngineGroup) Import(r io.Reader) ([]sql.Result, error) { return eg.Master().Import(r) } -// NowTime2 return current time -func (eg *EngineGroup) NowTime2(sqlTypeName string) (interface{}, time.Time) { - return eg.Master().NowTime2(sqlTypeName) -} - // Unscoped always disable struct tag "deleted" func (eg *EngineGroup) Unscoped() *EGSession { egs := eg.NewEGSession() diff --git a/engine_group_policy.go b/engine_group_policy.go index 311ba67f..bd4ba4ae 100644 --- a/engine_group_policy.go +++ b/engine_group_policy.go @@ -6,128 +6,131 @@ package xorm import ( "math/rand" + "sync" "time" ) -const ( - ENGINE_GROUP_POLICY_RANDOM = iota - ENGINE_GROUP_POLICY_WEIGHTRANDOM - ENGINE_GROUP_POLICY_ROUNDROBIN - ENGINE_GROUP_POLICY_WEIGHTROUNDROBIN - ENGINE_GROUP_POLICY_LEASTCONNECTIONS -) - type Policy interface { Slave(*EngineGroup) *Engine } -type XormEngineGroupPolicy struct { - pos int - slaves []int - eg *EngineGroup - r *rand.Rand +type RandomPolicy struct { + r *rand.Rand } -func (xgep *XormEngineGroupPolicy) Init() { - xgep.r = rand.New(rand.NewSource(time.Now().UnixNano())) +func NewRandomPolicy() *RandomPolicy { + return &RandomPolicy{ + r: rand.New(rand.NewSource(time.Now().UnixNano())), + } } -func (xgep *XormEngineGroupPolicy) Slave(eg *EngineGroup) *Engine { - xgep.eg = eg - return eg.slaves[xgep.slave()] +func (policy *RandomPolicy) Slave(g *EngineGroup) *Engine { + return g.Slaves()[policy.r.Intn(len(g.Slaves()))] } -func (xgep *XormEngineGroupPolicy) SetWeight() { - for i, _ := range xgep.eg.weight { - w := xgep.eg.weight[i] - for n := 0; n < w; n++ { - xgep.slaves = append(xgep.slaves, i) +type WeightRandomPolicy struct { + weights []int + rands []int + r *rand.Rand +} + +func NewWeightRandomPolicy(weights []int) *WeightRandomPolicy { + var rands = make([]int, 0, len(weights)) + for i := 0; i < len(weights); i++ { + for n := 0; n < weights[i]; n++ { + rands = append(rands, i) } } + + return &WeightRandomPolicy{ + weights: weights, + rands: rands, + r: rand.New(rand.NewSource(time.Now().UnixNano())), + } } -func (xgep *XormEngineGroupPolicy) slave() int { - switch xgep.eg.p { - case ENGINE_GROUP_POLICY_RANDOM: - return xgep.Random() - case ENGINE_GROUP_POLICY_WEIGHTRANDOM: - return xgep.WeightRandom() - case ENGINE_GROUP_POLICY_ROUNDROBIN: - return xgep.RoundRobin() - case ENGINE_GROUP_POLICY_WEIGHTROUNDROBIN: - return xgep.WeightRoundRobin() - case ENGINE_GROUP_POLICY_LEASTCONNECTIONS: - return xgep.LeastConnections() - default: - return xgep.Random() +func (policy *WeightRandomPolicy) Slave(g *EngineGroup) *Engine { + var slaves = g.Slaves() + idx := policy.rands[policy.r.Intn(len(policy.rands))] + if idx >= len(slaves) { + idx = len(slaves) - 1 } - + return slaves[idx] } -func (xgep *XormEngineGroupPolicy) Random() int { - if xgep.eg.s_count <= 1 { - return 0 - } - - rnd := xgep.r.Intn(xgep.eg.s_count) - return rnd +type RoundRobinPolicy struct { + pos int + lock sync.Mutex } -func (xgep *XormEngineGroupPolicy) WeightRandom() int { - if xgep.eg.s_count <= 1 { - return 0 - } - - xgep.SetWeight() - s := len(xgep.slaves) - rnd := xgep.r.Intn(s) - return xgep.slaves[rnd] +func NewRoundRobinPolicy() *RoundRobinPolicy { + return &RoundRobinPolicy{pos: -1} } -func (xgep *XormEngineGroupPolicy) RoundRobin() int { - if xgep.eg.s_count <= 1 { - return 0 +func (policy *RoundRobinPolicy) Slave(g *EngineGroup) *Engine { + var pos int + policy.lock.Lock() + policy.pos++ + if policy.pos >= len(g.Slaves()) { + policy.pos = 0 } + pos = policy.pos + policy.lock.Unlock() - if xgep.pos >= xgep.eg.s_count { - xgep.pos = 0 - } - xgep.pos++ - - return xgep.pos - 1 + return g.Slaves()[pos] } -func (xgep *XormEngineGroupPolicy) WeightRoundRobin() int { - if xgep.eg.s_count <= 1 { - return 0 - } - - xgep.SetWeight() - count := len(xgep.slaves) - if xgep.pos >= count { - xgep.pos = 0 - } - xgep.pos++ - - return xgep.slaves[xgep.pos-1] +type WeightRoundRobin struct { + weights []int + rands []int + r *rand.Rand + lock sync.Mutex + pos int } -func (xgep *XormEngineGroupPolicy) LeastConnections() int { - if xgep.eg.s_count <= 1 { - return 0 - } - connections := 0 - slave := 0 - for i, _ := range xgep.eg.slaves { - open_connections := xgep.eg.slaves[i].Stats() - if i == 0 { - connections = open_connections - slave = i - } else if open_connections <= connections { - slave = i - connections = open_connections +func NewWeightRoundRobin(weights []int) *WeightRoundRobin { + var rands = make([]int, 0, len(weights)) + for i := 0; i < len(weights); i++ { + for n := 0; n < weights[i]; n++ { + rands = append(rands, i) } } - return slave + + return &WeightRoundRobin{ + weights: weights, + rands: rands, + r: rand.New(rand.NewSource(time.Now().UnixNano())), + pos: -1, + } +} + +func (policy *WeightRoundRobin) Slave(g *EngineGroup) *Engine { + var slaves = g.Slaves() + var pos int + policy.lock.Lock() + policy.pos++ + if policy.pos >= len(g.Slaves()) { + policy.pos = 0 + } + pos = policy.pos + policy.lock.Unlock() + + idx := policy.rands[pos] + if idx >= len(slaves) { + idx = len(slaves) - 1 + } + return slaves[idx] +} + +type LeastConnPolicy struct { +} + +func NewLeastConnPolicy() *LeastConnPolicy { + return &LeastConnPolicy{} +} + +func (policy *LeastConnPolicy) Slave(g *EngineGroup) *Engine { + panic("not implementation") + return nil }