refactor Group Policy
This commit is contained in:
parent
4d30c6865a
commit
694a7f1d94
110
engine_group.go
110
engine_group.go
|
@ -25,22 +25,29 @@ type EngineGroup struct {
|
|||
p int
|
||||
}
|
||||
|
||||
func NewGroup(args1 interface{}, args2 interface{}, policy ...Policy) (*EngineGroup, error) {
|
||||
func NewGroup(args1 interface{}, args2 interface{}, policies ...Policy) (*EngineGroup, error) {
|
||||
var policy Policy
|
||||
if len(policies) > 0 {
|
||||
policy = policies[0]
|
||||
} else {
|
||||
policy = NewRandomPolicy()
|
||||
}
|
||||
|
||||
driverName, ok1 := args1.(string)
|
||||
dataSourceNames, ok2 := args2.(string)
|
||||
if ok1 && ok2 {
|
||||
return newGroup1(driverName, dataSourceNames, policy...)
|
||||
return newGroup1(driverName, dataSourceNames, policy)
|
||||
}
|
||||
|
||||
Master, ok3 := args1.(*Engine)
|
||||
Slaves, ok4 := args2.([]*Engine)
|
||||
if ok3 && ok4 {
|
||||
return newGroup2(Master, Slaves, policy...)
|
||||
return newGroup2(Master, Slaves, policy)
|
||||
}
|
||||
return nil, ErrParamsType
|
||||
}
|
||||
|
||||
func newGroup1(driverName string, dataSourceNames string, policy ...Policy) (*EngineGroup, error) {
|
||||
func newGroup1(driverName string, dataSourceNames string, policy Policy) (*EngineGroup, error) {
|
||||
conns := strings.Split(dataSourceNames, ";")
|
||||
engines := make([]*Engine, len(conns))
|
||||
for i, _ := range conns {
|
||||
|
@ -51,60 +58,23 @@ func newGroup1(driverName string, dataSourceNames string, policy ...Policy) (*En
|
|||
engines[i] = engine
|
||||
}
|
||||
|
||||
n := len(policy)
|
||||
if n > 1 {
|
||||
return nil, ErrParamsType
|
||||
} else if n == 1 {
|
||||
eg := &EngineGroup{
|
||||
master: engines[0],
|
||||
slaves: engines[1:],
|
||||
count: len(engines),
|
||||
s_count: len(engines[1:]),
|
||||
policy: policy[0],
|
||||
}
|
||||
eg.policy.Init()
|
||||
return eg, nil
|
||||
} else {
|
||||
xPolicy := new(XormEngineGroupPolicy)
|
||||
eg := &EngineGroup{
|
||||
master: engines[0],
|
||||
slaves: engines[1:],
|
||||
count: len(engines),
|
||||
s_count: len(engines[1:]),
|
||||
policy: xPolicy,
|
||||
}
|
||||
xPolicy.Init()
|
||||
return eg, nil
|
||||
}
|
||||
|
||||
return &EngineGroup{
|
||||
master: engines[0],
|
||||
slaves: engines[1:],
|
||||
count: len(engines),
|
||||
s_count: len(engines[1:]),
|
||||
policy: policy,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newGroup2(Master *Engine, Slaves []*Engine, policy ...Policy) (*EngineGroup, error) {
|
||||
n := len(policy)
|
||||
if n > 1 {
|
||||
return nil, ErrParamsType
|
||||
} else if n == 1 {
|
||||
eg := &EngineGroup{
|
||||
master: Master,
|
||||
slaves: Slaves,
|
||||
count: 1 + len(Slaves),
|
||||
s_count: len(Slaves),
|
||||
policy: policy[0],
|
||||
}
|
||||
eg.policy.Init()
|
||||
return eg, nil
|
||||
} else {
|
||||
xPolicy := new(XormEngineGroupPolicy)
|
||||
eg := &EngineGroup{
|
||||
master: Master,
|
||||
slaves: Slaves,
|
||||
count: 1 + len(Slaves),
|
||||
s_count: len(Slaves),
|
||||
policy: xPolicy,
|
||||
}
|
||||
xPolicy.Init()
|
||||
return eg, nil
|
||||
}
|
||||
func newGroup2(Master *Engine, Slaves []*Engine, policy Policy) (*EngineGroup, error) {
|
||||
return &EngineGroup{
|
||||
master: Master,
|
||||
slaves: Slaves,
|
||||
count: 1 + len(Slaves),
|
||||
s_count: len(Slaves),
|
||||
policy: policy,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (eg *EngineGroup) SetPolicy(policy Policy) *EngineGroup {
|
||||
|
@ -112,34 +82,6 @@ func (eg *EngineGroup) SetPolicy(policy Policy) *EngineGroup {
|
|||
return eg
|
||||
}
|
||||
|
||||
func (eg *EngineGroup) UsePolicy(policy int) *EngineGroup {
|
||||
eg.p = policy
|
||||
return eg
|
||||
}
|
||||
|
||||
func (eg *EngineGroup) SetWeight(weight ...interface{}) *EngineGroup {
|
||||
l := len(weight)
|
||||
if l == 1 {
|
||||
switch weight[0].(type) {
|
||||
case []int:
|
||||
eg.weight = weight[0].([]int)
|
||||
}
|
||||
} else if l > 1 {
|
||||
s := make([]int, 0)
|
||||
for i, _ := range weight {
|
||||
switch weight[i].(type) {
|
||||
case int:
|
||||
s = append(s, weight[i].(int))
|
||||
default:
|
||||
s = append(s, 1)
|
||||
}
|
||||
}
|
||||
eg.weight = s
|
||||
}
|
||||
|
||||
return eg
|
||||
}
|
||||
|
||||
func (eg *EngineGroup) Master() *Engine {
|
||||
return eg.master
|
||||
}
|
||||
|
|
|
@ -6,129 +6,131 @@ package xorm
|
|||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync"
|
||||
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
ENGINE_GROUP_POLICY_RANDOM = iota
|
||||
ENGINE_GROUP_POLICY_WEIGHTRANDOM
|
||||
ENGINE_GROUP_POLICY_ROUNDROBIN
|
||||
ENGINE_GROUP_POLICY_WEIGHTROUNDROBIN
|
||||
ENGINE_GROUP_POLICY_LEASTCONNECTIONS
|
||||
)
|
||||
|
||||
type Policy interface {
|
||||
Init()
|
||||
Slave(*EngineGroup) *Engine
|
||||
}
|
||||
|
||||
type XormEngineGroupPolicy struct {
|
||||
pos int
|
||||
slaves []int
|
||||
eg *EngineGroup
|
||||
r *rand.Rand
|
||||
type RandomPolicy struct {
|
||||
r *rand.Rand
|
||||
}
|
||||
|
||||
func (xgep *XormEngineGroupPolicy) Init() {
|
||||
xgep.r = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
func NewRandomPolicy() *RandomPolicy {
|
||||
return &RandomPolicy{
|
||||
r: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
}
|
||||
}
|
||||
|
||||
func (xgep *XormEngineGroupPolicy) Slave(eg *EngineGroup) *Engine {
|
||||
xgep.eg = eg
|
||||
return eg.slaves[xgep.slave()]
|
||||
func (policy *RandomPolicy) Slave(g *EngineGroup) *Engine {
|
||||
return g.Slaves()[policy.r.Intn(len(g.Slaves()))]
|
||||
}
|
||||
|
||||
func (xgep *XormEngineGroupPolicy) SetWeight() {
|
||||
for i, _ := range xgep.eg.weight {
|
||||
w := xgep.eg.weight[i]
|
||||
for n := 0; n < w; n++ {
|
||||
xgep.slaves = append(xgep.slaves, i)
|
||||
type WeightRandomPolicy struct {
|
||||
weights []int
|
||||
rands []int
|
||||
r *rand.Rand
|
||||
}
|
||||
|
||||
func NewWeightRandomPolicy(weights []int) *WeightRandomPolicy {
|
||||
var rands = make([]int, 0, len(weights))
|
||||
for i := 0; i < len(weights); i++ {
|
||||
for n := 0; n < weights[i]; n++ {
|
||||
rands = append(rands, i)
|
||||
}
|
||||
}
|
||||
|
||||
return &WeightRandomPolicy{
|
||||
weights: weights,
|
||||
rands: rands,
|
||||
r: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
}
|
||||
}
|
||||
|
||||
func (xgep *XormEngineGroupPolicy) slave() int {
|
||||
switch xgep.eg.p {
|
||||
case ENGINE_GROUP_POLICY_RANDOM:
|
||||
return xgep.Random()
|
||||
case ENGINE_GROUP_POLICY_WEIGHTRANDOM:
|
||||
return xgep.WeightRandom()
|
||||
case ENGINE_GROUP_POLICY_ROUNDROBIN:
|
||||
return xgep.RoundRobin()
|
||||
case ENGINE_GROUP_POLICY_WEIGHTROUNDROBIN:
|
||||
return xgep.WeightRoundRobin()
|
||||
case ENGINE_GROUP_POLICY_LEASTCONNECTIONS:
|
||||
return xgep.LeastConnections()
|
||||
default:
|
||||
return xgep.Random()
|
||||
func (policy *WeightRandomPolicy) Slave(g *EngineGroup) *Engine {
|
||||
var slaves = g.Slaves()
|
||||
idx := policy.rands[policy.r.Intn(len(policy.rands))]
|
||||
if idx >= len(slaves) {
|
||||
idx = len(slaves) - 1
|
||||
}
|
||||
|
||||
return slaves[idx]
|
||||
}
|
||||
|
||||
func (xgep *XormEngineGroupPolicy) Random() int {
|
||||
if xgep.eg.s_count <= 1 {
|
||||
return 0
|
||||
}
|
||||
|
||||
rnd := xgep.r.Intn(xgep.eg.s_count)
|
||||
return rnd
|
||||
type RoundRobinPolicy struct {
|
||||
pos int
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func (xgep *XormEngineGroupPolicy) WeightRandom() int {
|
||||
if xgep.eg.s_count <= 1 {
|
||||
return 0
|
||||
}
|
||||
|
||||
xgep.SetWeight()
|
||||
s := len(xgep.slaves)
|
||||
rnd := xgep.r.Intn(s)
|
||||
return xgep.slaves[rnd]
|
||||
func NewRoundRobinPolicy() *RoundRobinPolicy {
|
||||
return &RoundRobinPolicy{pos: -1}
|
||||
}
|
||||
|
||||
func (xgep *XormEngineGroupPolicy) RoundRobin() int {
|
||||
if xgep.eg.s_count <= 1 {
|
||||
return 0
|
||||
func (policy *RoundRobinPolicy) Slave(g *EngineGroup) *Engine {
|
||||
var pos int
|
||||
policy.lock.Lock()
|
||||
policy.pos++
|
||||
if policy.pos >= len(g.Slaves()) {
|
||||
policy.pos = 0
|
||||
}
|
||||
pos = policy.pos
|
||||
policy.lock.Unlock()
|
||||
|
||||
if xgep.pos >= xgep.eg.s_count {
|
||||
xgep.pos = 0
|
||||
}
|
||||
xgep.pos++
|
||||
|
||||
return xgep.pos - 1
|
||||
return g.Slaves()[pos]
|
||||
}
|
||||
|
||||
func (xgep *XormEngineGroupPolicy) WeightRoundRobin() int {
|
||||
if xgep.eg.s_count <= 1 {
|
||||
return 0
|
||||
}
|
||||
|
||||
xgep.SetWeight()
|
||||
count := len(xgep.slaves)
|
||||
if xgep.pos >= count {
|
||||
xgep.pos = 0
|
||||
}
|
||||
xgep.pos++
|
||||
|
||||
return xgep.slaves[xgep.pos-1]
|
||||
type WeightRoundRobin struct {
|
||||
weights []int
|
||||
rands []int
|
||||
r *rand.Rand
|
||||
lock sync.Mutex
|
||||
pos int
|
||||
}
|
||||
|
||||
func (xgep *XormEngineGroupPolicy) LeastConnections() int {
|
||||
if xgep.eg.s_count <= 1 {
|
||||
return 0
|
||||
}
|
||||
connections := 0
|
||||
slave := 0
|
||||
for i, _ := range xgep.eg.slaves {
|
||||
open_connections := xgep.eg.slaves[i].Stats()
|
||||
if i == 0 {
|
||||
connections = open_connections
|
||||
slave = i
|
||||
} else if open_connections <= connections {
|
||||
slave = i
|
||||
connections = open_connections
|
||||
func NewWeightRoundRobin(weights []int) *WeightRoundRobin {
|
||||
var rands = make([]int, 0, len(weights))
|
||||
for i := 0; i < len(weights); i++ {
|
||||
for n := 0; n < weights[i]; n++ {
|
||||
rands = append(rands, i)
|
||||
}
|
||||
}
|
||||
return slave
|
||||
|
||||
return &WeightRoundRobin{
|
||||
weights: weights,
|
||||
rands: rands,
|
||||
r: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
pos: -1,
|
||||
}
|
||||
}
|
||||
|
||||
func (policy *WeightRoundRobin) Slave(g *EngineGroup) *Engine {
|
||||
var slaves = g.Slaves()
|
||||
var pos int
|
||||
policy.lock.Lock()
|
||||
policy.pos++
|
||||
if policy.pos >= len(g.Slaves()) {
|
||||
policy.pos = 0
|
||||
}
|
||||
pos = policy.pos
|
||||
policy.lock.Unlock()
|
||||
|
||||
idx := policy.rands[pos]
|
||||
if idx >= len(slaves) {
|
||||
idx = len(slaves) - 1
|
||||
}
|
||||
return slaves[idx]
|
||||
}
|
||||
|
||||
type LeastConnPolicy struct {
|
||||
}
|
||||
|
||||
func NewLeastConnPolicy() *LeastConnPolicy {
|
||||
return &LeastConnPolicy{}
|
||||
}
|
||||
|
||||
func (policy *LeastConnPolicy) Slave(g *EngineGroup) *Engine {
|
||||
panic("not implementation")
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue