changed load balancer behavior

This commit is contained in:
mantaohuang 2020-04-14 17:36:58 -04:00
parent a1377a5962
commit 6cf38cd7f3
4 changed files with 126 additions and 60 deletions

View File

@ -3,5 +3,5 @@ proxy:{% for i in instances %}
weight: {{i["weight"]}} weight: {{i["weight"]}}
{% end %} {% end %}
load-balance-mode: "cached-shuffle" load-balance-mode: "cached-shuffle"
cache-clean-interval: 0 cache-max-length: 256
dynamic-weight-file: "{{dynamic_weight_fp}}" dynamic-weight-file: "{{dynamic_weight_fp}}"

View File

@ -1,12 +1,12 @@
proxy: proxy:
- url: "socks5://192.168.122.128:1080"
weight: 5
- url: "socks5://192.168.122.128:1081" - url: "socks5://192.168.122.128:1081"
weight: 5 weight: 5
- url: "socks5://192.168.122.128:1082" - url: "socks5://192.168.122.128:1082"
weight: 5 weight: 5
- url: "socks5://192.168.122.128:1083" - url: "socks5://192.168.122.128:1083"
weight: 5 weight: 5
load-balance-mode: "cached-shuffle" - url: "socks5://192.168.122.128:1084"
cache-clean-interval: 10 weight: 5
load-balance-mode: "sticky"
cache-max-length: 256
dynamic-weight-file: "dw.json" dynamic-weight-file: "dw.json"

View File

@ -1,3 +1,3 @@
{ {
"weights": [1, 0, 0, 0] "weights": [1, 0, 0, 1]
} }

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"container/list"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -9,7 +10,6 @@ import (
"sort" "sort"
"sync" "sync"
"syscall" "syscall"
"time"
wr "github.com/mroth/weightedrand" wr "github.com/mroth/weightedrand"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
@ -20,29 +20,98 @@ const (
sticky = "sticky" sticky = "sticky"
) )
type queueMapElement struct {
i int
e *list.Element
}
type queueMap struct {
m map[string]*queueMapElement
l *list.List
}
func NewQueueMap() *queueMap {
var qm queueMap
qm.l = list.New()
qm.m = make(map[string]*queueMapElement)
return &qm
}
func (qm *queueMap) Set(key string, value int) {
qme, present := qm.m[key]
if present {
// key already in qm, bring it to the head
qm.l.MoveToFront(qme.e)
qme.i = value
} else {
// key not present, add to the head
e := qm.l.PushFront(key)
newQME := queueMapElement{i: value, e: e}
qm.m[key] = &newQME
}
}
func (qm *queueMap) Get(key string) (int, bool) {
value, present := qm.m[key]
if present {
return value.i, present
}
return 0, false
}
func (qm *queueMap) Trim(maxLength int) {
nToTrim := qm.l.Len() - maxLength
if nToTrim > 0 {
for i := 0; i < nToTrim; i++ {
e := qm.l.Back()
key := (e.Value).(string)
delete(qm.m, key)
qm.l.Remove(e)
}
}
}
func (qm *queueMap) SetTrim(key string, value int, maxLength int) {
qm.Set(key, value)
qm.Trim(maxLength)
}
func (qm *queueMap) Filter(f func(int) int) {
e := qm.l.Front()
for e != nil {
key := e.Value.(string)
newValue := f(qm.m[key].i)
if newValue < 0 {
// need to remove
delete(qm.m, key)
ne := e.Next()
qm.l.Remove(e)
e = ne
} else {
qm.m[key].i = newValue
e = e.Next()
}
}
}
type proxyInst struct { type proxyInst struct {
URL string URL string
Weight int Weight int
StatusHistory []bool StatusHistory []bool
} }
type ProxyManager struct { type ProxyManager struct {
Proxys []proxyInst Proxys []proxyInst
Cache map[string]int Cache *queueMap
LoadBalanceMode string LoadBalanceMode string
CacheCleanInterval int CacheMaxLength int
DynamicWeightFile string DynamicWeightFile string
Chooser wr.Chooser Chooser wr.Chooser
enabledIndics []int enabledIndics []int
mux sync.Mutex mux sync.Mutex
} }
type proxyConf struct { type proxyConf struct {
Proxys []struct { Proxys []struct {
URL string `yaml:"url"` URL string `yaml:"url"`
Weight int `yaml:"weight"` Weight int `yaml:"weight"`
} `yaml:"proxy"` } `yaml:"proxy"`
LoadBalanceMode string `yaml:"load-balance-mode"` LoadBalanceMode string `yaml:"load-balance-mode"`
CacheCleanInterval int `yaml:"cache-clean-interval"` CacheMaxLength int `yaml:"cache-max-length"`
DynamicWeightFile string `yaml:"dynamic-weight-file"` DynamicWeightFile string `yaml:"dynamic-weight-file"`
} }
type dynamicWeight struct { type dynamicWeight struct {
Weights []int `json:"weights"` Weights []int `json:"weights"`
@ -68,20 +137,20 @@ func NewPM(confFp string) (*ProxyManager, error) {
pi.Weight = pc.Weight pi.Weight = pc.Weight
pm.Proxys = append(pm.Proxys, pi) pm.Proxys = append(pm.Proxys, pi)
} }
pm.applyWeight()
pm.LoadBalanceMode = cfg.LoadBalanceMode pm.LoadBalanceMode = cfg.LoadBalanceMode
pm.CacheCleanInterval = cfg.CacheCleanInterval pm.CacheMaxLength = cfg.CacheMaxLength
pm.DynamicWeightFile = cfg.DynamicWeightFile pm.DynamicWeightFile = cfg.DynamicWeightFile
pm.Cache = make(map[string]int) pm.Cache = NewQueueMap()
go pm.keepClearingCache() pm.applyWeight()
// go pm.keepClearingCache()
go pm.dynamicWeightLoader() go pm.dynamicWeightLoader()
return &pm, nil return &pm, nil
} }
func (pm *ProxyManager) DescribeLoadBalanceMode() string { func (pm *ProxyManager) DescribeLoadBalanceMode() string {
if pm.LoadBalanceMode == sticky { if pm.LoadBalanceMode == sticky {
return fmt.Sprintf("sticky with cache clearing interval: %ds", pm.CacheCleanInterval) return fmt.Sprintf("sticky with maximum cache size of %d", pm.CacheMaxLength)
} else if pm.LoadBalanceMode == cachedShuffle { } else if pm.LoadBalanceMode == cachedShuffle {
return "cached shuffling" return fmt.Sprintf("cached shuffle with maximum cache size of %d", pm.CacheMaxLength)
} else { } else {
return "randomize every connection" return "randomize every connection"
} }
@ -99,75 +168,72 @@ func (pm *ProxyManager) applyWeight() {
} }
if len(pm.enabledIndics) == 0 { if len(pm.enabledIndics) == 0 {
fmt.Println("no enabled indice, proxy will be disabled.") fmt.Println("no enabled indice, proxy will be disabled.")
pm.Cache = NewQueueMap()
return return
} }
fmt.Println("enabled indices:", pm.enabledIndics) fmt.Println("enabled indices:", pm.enabledIndics)
pm.Chooser = wr.NewChooser(chooseArr...) pm.Chooser = wr.NewChooser(chooseArr...)
// update cache to remove disabled proxys // update cache to remove disabled proxys
for addr, idx := range pm.Cache { filterFn := func(idx int) int {
i := previousEnabledIndics[idx] i := previousEnabledIndics[idx]
pos := sort.SearchInts(pm.enabledIndics, i) pos := sort.SearchInts(pm.enabledIndics, i)
if !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i { shouldRemain := !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i
// found in current enabled list if shouldRemain {
pm.Cache[addr] = pos // found in current enabled indics, use it
return pos
} else { } else {
// not found in current enabled list return -1
pm.Cache[addr] = pm.Chooser.Pick().(int)
} }
} }
pm.Cache.Filter(filterFn)
} }
func (pm *ProxyManager) Get(addr string) (string, bool) { func (pm *ProxyManager) Get(addr string) (string, bool) {
pm.mux.Lock() pm.mux.Lock()
defer pm.mux.Unlock() defer pm.mux.Unlock()
// fmt.Println(pm.Cache) // fmt.Println(pm.Cache)IsIn
if len(pm.enabledIndics) == 0 {
// return error if no enabled proxy
return "", false
}
if pm.LoadBalanceMode == sticky { if pm.LoadBalanceMode == sticky {
idx, ok := pm.Cache[addr] idx, ok := pm.Cache.Get(addr)
if ok { if ok {
// fmt.Println("match addr", addr, "using:", idx) // fmt.Println("match addr", addr, "using:", idx)
return pm.Proxys[pm.enabledIndics[idx]].URL, true return pm.Proxys[pm.enabledIndics[idx]].URL, true
} else { } else {
idx := pm.Chooser.Pick().(int) idx := pm.Chooser.Pick().(int)
pm.Cache[addr] = idx pm.Cache.SetTrim(addr, idx, pm.CacheMaxLength)
return pm.Proxys[pm.enabledIndics[idx]].URL, true return pm.Proxys[pm.enabledIndics[idx]].URL, true
} }
} else if pm.LoadBalanceMode == cachedShuffle { } else if pm.LoadBalanceMode == cachedShuffle {
idx, ok := pm.Cache[addr] idx, ok := pm.Cache.Get(addr)
if ok { if ok {
pm.Cache[addr] = (idx + 1) % (len(pm.enabledIndics)) idx = (idx + 1) % (len(pm.enabledIndics))
return pm.Proxys[pm.enabledIndics[pm.Cache[addr]]].URL, true
} else { } else {
idx := pm.Chooser.Pick().(int) idx = pm.Chooser.Pick().(int)
pm.Cache[addr] = idx
return pm.Proxys[pm.enabledIndics[idx]].URL, true
} }
pm.Cache.SetTrim(addr, idx, pm.CacheMaxLength)
return pm.Proxys[pm.enabledIndics[idx]].URL, true
} else { } else {
idx := pm.Chooser.Pick().(int) idx := pm.Chooser.Pick().(int)
return pm.Proxys[pm.enabledIndics[idx]].URL, true return pm.Proxys[pm.enabledIndics[idx]].URL, true
} }
} }
func (pm *ProxyManager) ClearCache() { // func (pm *ProxyManager) ClearCache() {
// fmt.Println("clearing cache") // // fmt.Println("clearing cache")
pm.mux.Lock() // pm.mux.Lock()
pm.Cache = make(map[string]int) // pm.Cache = New()
pm.mux.Unlock() // pm.mux.Unlock()
// fmt.Println("after:", pm.Cache) // // fmt.Println("after:", pm.Cache)
} // }
func (pm *ProxyManager) keepClearingCache() { // func (pm *ProxyManager) keepClearingCache() {
interval := pm.CacheCleanInterval // interval := pm.CacheCleanInterval
if interval <= 0 { // if interval <= 0 {
return // return
} // }
for { // for {
time.Sleep(time.Duration(interval) * time.Second) // time.Sleep(time.Duration(interval) * time.Second)
pm.ClearCache() // pm.ClearCache()
} // }
} // }
func (pm *ProxyManager) dynamicWeightLoader() { func (pm *ProxyManager) dynamicWeightLoader() {
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)