From 916646b1e2e6fa95095abcd32aa0a7474f61e5ca Mon Sep 17 00:00:00 2001 From: mantaohuang Date: Thu, 9 Apr 2020 23:30:08 -0400 Subject: [PATCH] add dynamic weight loading to load balancer --- go-socks-lb/config.yml | 5 ++- go-socks-lb/dw.json | 3 ++ go-socks-lb/main.go | 100 +++++++++++++++++++++++++++++++++++------ go-socks-lb/request.go | 8 +++- 4 files changed, 101 insertions(+), 15 deletions(-) create mode 100644 go-socks-lb/dw.json diff --git a/go-socks-lb/config.yml b/go-socks-lb/config.yml index d6ead0e..4fb97e6 100644 --- a/go-socks-lb/config.yml +++ b/go-socks-lb/config.yml @@ -5,5 +5,8 @@ proxy: weight: 5 - url: "socks5://192.168.122.128:1082" weight: 5 -sticky: true + - url: "socks5://192.168.122.128:1083" + weight: 5 +load-balance-mode: "cached-shuffle" cache-clean-interval: 10 +dynamic-weight-file: "dw.json" diff --git a/go-socks-lb/dw.json b/go-socks-lb/dw.json new file mode 100644 index 0000000..c79ba13 --- /dev/null +++ b/go-socks-lb/dw.json @@ -0,0 +1,3 @@ +{ + "weights": [0, 0, 0, 2] +} \ No newline at end of file diff --git a/go-socks-lb/main.go b/go-socks-lb/main.go index 2f5f9f2..adf4e18 100644 --- a/go-socks-lb/main.go +++ b/go-socks-lb/main.go @@ -1,13 +1,18 @@ package main import ( + "encoding/json" "flag" "fmt" + "io/ioutil" "math/rand" "net/http" "net/url" "os" + "os/signal" + "sort" "sync" + "syscall" "time" wr "github.com/mroth/weightedrand" @@ -88,7 +93,9 @@ type ProxyManager struct { Cache map[string]int LoadBalanceMode string CacheCleanInterval int + DynamicWeightFile string Chooser wr.Chooser + enabledIndics []int mux sync.Mutex } type proxyConf struct { @@ -98,6 +105,10 @@ type proxyConf struct { } `yaml:"proxy"` LoadBalanceMode string `yaml:"load-balance-mode"` CacheCleanInterval int `yaml:"cache-clean-interval"` + DynamicWeightFile string `yaml:"dynamic-weight-file"` +} +type dynamicWeight struct { + Weights []int `json:"weights"` } func NewPM(confFp string) (*ProxyManager, error) { @@ -114,51 +125,79 @@ func NewPM(confFp string) (*ProxyManager, error) { if err != nil { return &ProxyManager{}, err } - var chooseArr []wr.Choice - for idx, pc := range cfg.Proxys { + for _, pc := range cfg.Proxys { var pi proxyInst pi.URL = pc.URL pi.Weight = pc.Weight pm.Proxys = append(pm.Proxys, pi) - chooseArr = append(chooseArr, wr.Choice{Item: idx, Weight: uint(pi.Weight)}) } + pm.applyWeight() pm.LoadBalanceMode = cfg.LoadBalanceMode pm.CacheCleanInterval = cfg.CacheCleanInterval + pm.DynamicWeightFile = cfg.DynamicWeightFile pm.Cache = make(map[string]int) - pm.Chooser = wr.NewChooser(chooseArr...) go pm.keepClearingCache() + go pm.dynamicWeightLoader() return &pm, nil } - -func (pm *ProxyManager) Get(addr string) string { +func (pm *ProxyManager) applyWeight() { + // update Chooser and enabledIndics + var chooseArr []wr.Choice + previousEnabledIndics := pm.enabledIndics + pm.enabledIndics = nil + for idx, p := range pm.Proxys { + if p.Weight > 0 { + pm.enabledIndics = append(pm.enabledIndics, idx) + chooseArr = append(chooseArr, wr.Choice{Item: len(pm.enabledIndics) - 1, Weight: uint(p.Weight)}) + } + } + fmt.Println("enabled indices:", pm.enabledIndics) + pm.Chooser = wr.NewChooser(chooseArr...) + // update cache to remove disabled proxys + for addr, idx := range pm.Cache { + i := previousEnabledIndics[idx] + pos := sort.SearchInts(pm.enabledIndics, i) + if !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i { + // found in current enabled list + pm.Cache[addr] = pos + } else { + // not found in current enabled list + pm.Cache[addr] = pm.Chooser.Pick().(int) + } + } +} +func (pm *ProxyManager) Get(addr string) (string, bool) { pm.mux.Lock() defer pm.mux.Unlock() // fmt.Println(pm.Cache) + if len(pm.enabledIndics) == 0 { + // return error if no enabled proxy + return "", false + } if pm.LoadBalanceMode == sticky { idx, ok := pm.Cache[addr] if ok { // fmt.Println("match addr", addr, "using:", idx) - return pm.Proxys[idx].URL + return pm.Proxys[pm.enabledIndics[idx]].URL, true } else { idx := pm.Chooser.Pick().(int) pm.Cache[addr] = idx - return pm.Proxys[idx].URL + return pm.Proxys[pm.enabledIndics[idx]].URL, true } } else if pm.LoadBalanceMode == cachedShuffle { idx, ok := pm.Cache[addr] if ok { - pm.Cache[addr] = (idx + 1) % (len(pm.Proxys)) - return pm.Proxys[pm.Cache[addr]].URL + pm.Cache[addr] = (idx + 1) % (len(pm.enabledIndics)) + return pm.Proxys[pm.enabledIndics[pm.Cache[addr]]].URL, true } else { idx := pm.Chooser.Pick().(int) pm.Cache[addr] = idx - return pm.Proxys[idx].URL + return pm.Proxys[pm.enabledIndics[idx]].URL, true } } else { idx := pm.Chooser.Pick().(int) - return pm.Proxys[idx].URL + return pm.Proxys[pm.enabledIndics[idx]].URL, true } - //return "" } func (pm *ProxyManager) ClearCache() { @@ -180,6 +219,40 @@ func (pm *ProxyManager) keepClearingCache() { } } +func (pm *ProxyManager) dynamicWeightLoader() { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGTERM) + go func() { + for { + <-sigs + fmt.Println("Loading new weights") + file, err := ioutil.ReadFile(pm.DynamicWeightFile) + if err != nil { + fmt.Println("Error loading new weights, error:", err) + continue + } + dw := dynamicWeight{} + err = json.Unmarshal([]byte(file), &dw) + if err != nil { + fmt.Println("Error loading new weights, json parsing error:", err) + continue + } + // check if number of weights is the same as the number of proxys configured + if len(dw.Weights) != len(pm.Proxys) { + fmt.Println("Error loading new weights, unexpected number of weights") + continue + } + // apply the weights + pm.mux.Lock() + for idx, weight := range dw.Weights { + pm.Proxys[idx].Weight = weight + } + pm.applyWeight() + pm.mux.Unlock() + } + }() +} + func main() { // testFn := func() bool { // status := proxyTestStatusCode("socks5://127.0.0.1:1080", URL, 204) @@ -203,6 +276,7 @@ func main() { var pm *ProxyManager pm, err := NewPM(*configPath) + fmt.Println("pm:", pm) if err != nil { return } diff --git a/go-socks-lb/request.go b/go-socks-lb/request.go index 1913330..8ec0cd9 100644 --- a/go-socks-lb/request.go +++ b/go-socks-lb/request.go @@ -1,6 +1,7 @@ package main import ( + "errors" "fmt" "io" "net" @@ -173,7 +174,12 @@ func (s *Server) handleConnect(ctx context.Context, conn conn, req *Request) err // dial := s.config.Dial dial := func(ctx context.Context, net_, addr string) (net.Conn, error) { var dp proxy.Dialer - u, _ := url.Parse(s.config.PM.Get(addr)) + proxyAddr, got := s.config.PM.Get(addr) + if !got { + var nullDial net.Conn + return nullDial, errors.New("Cannot get an proxy from proxy manager") + } + u, _ := url.Parse(proxyAddr) dp, _ = proxy.FromURL(u, proxy.Direct) return dp.Dial(net_, addr) }