From 6cf38cd7f31373e802747a963a12bf7879f3340c Mon Sep 17 00:00:00 2001 From: mantaohuang Date: Tue, 14 Apr 2020 17:36:58 -0400 Subject: [PATCH] changed load balancer behavior --- go-socks-lb.yml.template | 2 +- go-socks-lb/config.yml | 8 +- go-socks-lb/dw.json | 2 +- go-socks-lb/manager.go | 174 +++++++++++++++++++++++++++------------ 4 files changed, 126 insertions(+), 60 deletions(-) diff --git a/go-socks-lb.yml.template b/go-socks-lb.yml.template index 96e73eb..d1910d7 100644 --- a/go-socks-lb.yml.template +++ b/go-socks-lb.yml.template @@ -3,5 +3,5 @@ proxy:{% for i in instances %} weight: {{i["weight"]}} {% end %} load-balance-mode: "cached-shuffle" -cache-clean-interval: 0 +cache-max-length: 256 dynamic-weight-file: "{{dynamic_weight_fp}}" diff --git a/go-socks-lb/config.yml b/go-socks-lb/config.yml index 4fb97e6..ed41adf 100644 --- a/go-socks-lb/config.yml +++ b/go-socks-lb/config.yml @@ -1,12 +1,12 @@ proxy: - - url: "socks5://192.168.122.128:1080" - weight: 5 - url: "socks5://192.168.122.128:1081" weight: 5 - url: "socks5://192.168.122.128:1082" weight: 5 - url: "socks5://192.168.122.128:1083" weight: 5 -load-balance-mode: "cached-shuffle" -cache-clean-interval: 10 + - url: "socks5://192.168.122.128:1084" + weight: 5 +load-balance-mode: "sticky" +cache-max-length: 256 dynamic-weight-file: "dw.json" diff --git a/go-socks-lb/dw.json b/go-socks-lb/dw.json index bfae623..938d736 100644 --- a/go-socks-lb/dw.json +++ b/go-socks-lb/dw.json @@ -1,3 +1,3 @@ { - "weights": [1, 0, 0, 0] + "weights": [1, 0, 0, 1] } \ No newline at end of file diff --git a/go-socks-lb/manager.go b/go-socks-lb/manager.go index d559a91..069d1e7 100644 --- a/go-socks-lb/manager.go +++ b/go-socks-lb/manager.go @@ -1,6 +1,7 @@ package main import ( + "container/list" "encoding/json" "fmt" "io/ioutil" @@ -9,7 +10,6 @@ import ( "sort" "sync" "syscall" - "time" wr "github.com/mroth/weightedrand" "gopkg.in/yaml.v2" @@ -20,29 +20,98 @@ const ( sticky = "sticky" ) +type queueMapElement struct { + i int + e *list.Element +} +type queueMap struct { + m map[string]*queueMapElement + l *list.List +} + +func NewQueueMap() *queueMap { + var qm queueMap + qm.l = list.New() + qm.m = make(map[string]*queueMapElement) + return &qm +} + +func (qm *queueMap) Set(key string, value int) { + qme, present := qm.m[key] + if present { + // key already in qm, bring it to the head + qm.l.MoveToFront(qme.e) + qme.i = value + } else { + // key not present, add to the head + e := qm.l.PushFront(key) + newQME := queueMapElement{i: value, e: e} + qm.m[key] = &newQME + } +} +func (qm *queueMap) Get(key string) (int, bool) { + value, present := qm.m[key] + if present { + return value.i, present + } + return 0, false +} +func (qm *queueMap) Trim(maxLength int) { + nToTrim := qm.l.Len() - maxLength + if nToTrim > 0 { + for i := 0; i < nToTrim; i++ { + e := qm.l.Back() + key := (e.Value).(string) + delete(qm.m, key) + qm.l.Remove(e) + } + } +} +func (qm *queueMap) SetTrim(key string, value int, maxLength int) { + qm.Set(key, value) + qm.Trim(maxLength) +} +func (qm *queueMap) Filter(f func(int) int) { + e := qm.l.Front() + for e != nil { + key := e.Value.(string) + newValue := f(qm.m[key].i) + if newValue < 0 { + // need to remove + delete(qm.m, key) + ne := e.Next() + qm.l.Remove(e) + e = ne + } else { + qm.m[key].i = newValue + e = e.Next() + } + } +} + type proxyInst struct { URL string Weight int StatusHistory []bool } type ProxyManager struct { - Proxys []proxyInst - Cache map[string]int - LoadBalanceMode string - CacheCleanInterval int - DynamicWeightFile string - Chooser wr.Chooser - enabledIndics []int - mux sync.Mutex + Proxys []proxyInst + Cache *queueMap + LoadBalanceMode string + CacheMaxLength int + DynamicWeightFile string + Chooser wr.Chooser + enabledIndics []int + mux sync.Mutex } type proxyConf struct { Proxys []struct { URL string `yaml:"url"` Weight int `yaml:"weight"` } `yaml:"proxy"` - LoadBalanceMode string `yaml:"load-balance-mode"` - CacheCleanInterval int `yaml:"cache-clean-interval"` - DynamicWeightFile string `yaml:"dynamic-weight-file"` + LoadBalanceMode string `yaml:"load-balance-mode"` + CacheMaxLength int `yaml:"cache-max-length"` + DynamicWeightFile string `yaml:"dynamic-weight-file"` } type dynamicWeight struct { Weights []int `json:"weights"` @@ -68,20 +137,20 @@ func NewPM(confFp string) (*ProxyManager, error) { pi.Weight = pc.Weight pm.Proxys = append(pm.Proxys, pi) } - pm.applyWeight() pm.LoadBalanceMode = cfg.LoadBalanceMode - pm.CacheCleanInterval = cfg.CacheCleanInterval + pm.CacheMaxLength = cfg.CacheMaxLength pm.DynamicWeightFile = cfg.DynamicWeightFile - pm.Cache = make(map[string]int) - go pm.keepClearingCache() + pm.Cache = NewQueueMap() + pm.applyWeight() + // go pm.keepClearingCache() go pm.dynamicWeightLoader() return &pm, nil } func (pm *ProxyManager) DescribeLoadBalanceMode() string { if pm.LoadBalanceMode == sticky { - return fmt.Sprintf("sticky with cache clearing interval: %ds", pm.CacheCleanInterval) + return fmt.Sprintf("sticky with maximum cache size of %d", pm.CacheMaxLength) } else if pm.LoadBalanceMode == cachedShuffle { - return "cached shuffling" + return fmt.Sprintf("cached shuffle with maximum cache size of %d", pm.CacheMaxLength) } else { return "randomize every connection" } @@ -99,75 +168,72 @@ func (pm *ProxyManager) applyWeight() { } if len(pm.enabledIndics) == 0 { fmt.Println("no enabled indice, proxy will be disabled.") + pm.Cache = NewQueueMap() return } fmt.Println("enabled indices:", pm.enabledIndics) pm.Chooser = wr.NewChooser(chooseArr...) // update cache to remove disabled proxys - for addr, idx := range pm.Cache { + filterFn := func(idx int) int { i := previousEnabledIndics[idx] pos := sort.SearchInts(pm.enabledIndics, i) - if !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i { - // found in current enabled list - pm.Cache[addr] = pos + shouldRemain := !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i + if shouldRemain { + // found in current enabled indics, use it + return pos } else { - // not found in current enabled list - pm.Cache[addr] = pm.Chooser.Pick().(int) + return -1 } } + pm.Cache.Filter(filterFn) } func (pm *ProxyManager) Get(addr string) (string, bool) { pm.mux.Lock() defer pm.mux.Unlock() - // fmt.Println(pm.Cache) - if len(pm.enabledIndics) == 0 { - // return error if no enabled proxy - return "", false - } + // fmt.Println(pm.Cache)IsIn if pm.LoadBalanceMode == sticky { - idx, ok := pm.Cache[addr] + idx, ok := pm.Cache.Get(addr) if ok { // fmt.Println("match addr", addr, "using:", idx) return pm.Proxys[pm.enabledIndics[idx]].URL, true } else { idx := pm.Chooser.Pick().(int) - pm.Cache[addr] = idx + pm.Cache.SetTrim(addr, idx, pm.CacheMaxLength) return pm.Proxys[pm.enabledIndics[idx]].URL, true } } else if pm.LoadBalanceMode == cachedShuffle { - idx, ok := pm.Cache[addr] + idx, ok := pm.Cache.Get(addr) if ok { - pm.Cache[addr] = (idx + 1) % (len(pm.enabledIndics)) - return pm.Proxys[pm.enabledIndics[pm.Cache[addr]]].URL, true + idx = (idx + 1) % (len(pm.enabledIndics)) } else { - idx := pm.Chooser.Pick().(int) - pm.Cache[addr] = idx - return pm.Proxys[pm.enabledIndics[idx]].URL, true + idx = pm.Chooser.Pick().(int) } + pm.Cache.SetTrim(addr, idx, pm.CacheMaxLength) + return pm.Proxys[pm.enabledIndics[idx]].URL, true } else { idx := pm.Chooser.Pick().(int) return pm.Proxys[pm.enabledIndics[idx]].URL, true } } -func (pm *ProxyManager) ClearCache() { - // fmt.Println("clearing cache") - pm.mux.Lock() - pm.Cache = make(map[string]int) - pm.mux.Unlock() - // fmt.Println("after:", pm.Cache) -} +// func (pm *ProxyManager) ClearCache() { +// // fmt.Println("clearing cache") +// pm.mux.Lock() +// pm.Cache = New() +// pm.mux.Unlock() +// // fmt.Println("after:", pm.Cache) +// } -func (pm *ProxyManager) keepClearingCache() { - interval := pm.CacheCleanInterval - if interval <= 0 { - return - } - for { - time.Sleep(time.Duration(interval) * time.Second) - pm.ClearCache() - } -} +// func (pm *ProxyManager) keepClearingCache() { +// interval := pm.CacheCleanInterval +// if interval <= 0 { +// return +// } +// for { +// time.Sleep(time.Duration(interval) * time.Second) +// pm.ClearCache() +// } +// } func (pm *ProxyManager) dynamicWeightLoader() { sigs := make(chan os.Signal, 1)