add dynamic weight loading to load balancer

This commit is contained in:
mantaohuang 2020-04-09 23:30:08 -04:00
parent 524a4680d4
commit 916646b1e2
4 changed files with 101 additions and 15 deletions

View File

@ -5,5 +5,8 @@ proxy:
weight: 5 weight: 5
- url: "socks5://192.168.122.128:1082" - url: "socks5://192.168.122.128:1082"
weight: 5 weight: 5
sticky: true - url: "socks5://192.168.122.128:1083"
weight: 5
load-balance-mode: "cached-shuffle"
cache-clean-interval: 10 cache-clean-interval: 10
dynamic-weight-file: "dw.json"

3
go-socks-lb/dw.json Normal file
View File

@ -0,0 +1,3 @@
{
"weights": [0, 0, 0, 2]
}

View File

@ -1,13 +1,18 @@
package main package main
import ( import (
"encoding/json"
"flag" "flag"
"fmt" "fmt"
"io/ioutil"
"math/rand" "math/rand"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"os/signal"
"sort"
"sync" "sync"
"syscall"
"time" "time"
wr "github.com/mroth/weightedrand" wr "github.com/mroth/weightedrand"
@ -88,7 +93,9 @@ type ProxyManager struct {
Cache map[string]int Cache map[string]int
LoadBalanceMode string LoadBalanceMode string
CacheCleanInterval int CacheCleanInterval int
DynamicWeightFile string
Chooser wr.Chooser Chooser wr.Chooser
enabledIndics []int
mux sync.Mutex mux sync.Mutex
} }
type proxyConf struct { type proxyConf struct {
@ -98,6 +105,10 @@ type proxyConf struct {
} `yaml:"proxy"` } `yaml:"proxy"`
LoadBalanceMode string `yaml:"load-balance-mode"` LoadBalanceMode string `yaml:"load-balance-mode"`
CacheCleanInterval int `yaml:"cache-clean-interval"` CacheCleanInterval int `yaml:"cache-clean-interval"`
DynamicWeightFile string `yaml:"dynamic-weight-file"`
}
type dynamicWeight struct {
Weights []int `json:"weights"`
} }
func NewPM(confFp string) (*ProxyManager, error) { func NewPM(confFp string) (*ProxyManager, error) {
@ -114,51 +125,79 @@ func NewPM(confFp string) (*ProxyManager, error) {
if err != nil { if err != nil {
return &ProxyManager{}, err return &ProxyManager{}, err
} }
var chooseArr []wr.Choice for _, pc := range cfg.Proxys {
for idx, pc := range cfg.Proxys {
var pi proxyInst var pi proxyInst
pi.URL = pc.URL pi.URL = pc.URL
pi.Weight = pc.Weight pi.Weight = pc.Weight
pm.Proxys = append(pm.Proxys, pi) pm.Proxys = append(pm.Proxys, pi)
chooseArr = append(chooseArr, wr.Choice{Item: idx, Weight: uint(pi.Weight)})
} }
pm.applyWeight()
pm.LoadBalanceMode = cfg.LoadBalanceMode pm.LoadBalanceMode = cfg.LoadBalanceMode
pm.CacheCleanInterval = cfg.CacheCleanInterval pm.CacheCleanInterval = cfg.CacheCleanInterval
pm.DynamicWeightFile = cfg.DynamicWeightFile
pm.Cache = make(map[string]int) pm.Cache = make(map[string]int)
pm.Chooser = wr.NewChooser(chooseArr...)
go pm.keepClearingCache() go pm.keepClearingCache()
go pm.dynamicWeightLoader()
return &pm, nil return &pm, nil
} }
func (pm *ProxyManager) applyWeight() {
func (pm *ProxyManager) Get(addr string) string { // update Chooser and enabledIndics
var chooseArr []wr.Choice
previousEnabledIndics := pm.enabledIndics
pm.enabledIndics = nil
for idx, p := range pm.Proxys {
if p.Weight > 0 {
pm.enabledIndics = append(pm.enabledIndics, idx)
chooseArr = append(chooseArr, wr.Choice{Item: len(pm.enabledIndics) - 1, Weight: uint(p.Weight)})
}
}
fmt.Println("enabled indices:", pm.enabledIndics)
pm.Chooser = wr.NewChooser(chooseArr...)
// update cache to remove disabled proxys
for addr, idx := range pm.Cache {
i := previousEnabledIndics[idx]
pos := sort.SearchInts(pm.enabledIndics, i)
if !(pos == len(pm.enabledIndics)) && pm.enabledIndics[pos] == i {
// found in current enabled list
pm.Cache[addr] = pos
} else {
// not found in current enabled list
pm.Cache[addr] = pm.Chooser.Pick().(int)
}
}
}
func (pm *ProxyManager) Get(addr string) (string, bool) {
pm.mux.Lock() pm.mux.Lock()
defer pm.mux.Unlock() defer pm.mux.Unlock()
// fmt.Println(pm.Cache) // fmt.Println(pm.Cache)
if len(pm.enabledIndics) == 0 {
// return error if no enabled proxy
return "", false
}
if pm.LoadBalanceMode == sticky { if pm.LoadBalanceMode == sticky {
idx, ok := pm.Cache[addr] idx, ok := pm.Cache[addr]
if ok { if ok {
// fmt.Println("match addr", addr, "using:", idx) // fmt.Println("match addr", addr, "using:", idx)
return pm.Proxys[idx].URL return pm.Proxys[pm.enabledIndics[idx]].URL, true
} else { } else {
idx := pm.Chooser.Pick().(int) idx := pm.Chooser.Pick().(int)
pm.Cache[addr] = idx pm.Cache[addr] = idx
return pm.Proxys[idx].URL return pm.Proxys[pm.enabledIndics[idx]].URL, true
} }
} else if pm.LoadBalanceMode == cachedShuffle { } else if pm.LoadBalanceMode == cachedShuffle {
idx, ok := pm.Cache[addr] idx, ok := pm.Cache[addr]
if ok { if ok {
pm.Cache[addr] = (idx + 1) % (len(pm.Proxys)) pm.Cache[addr] = (idx + 1) % (len(pm.enabledIndics))
return pm.Proxys[pm.Cache[addr]].URL return pm.Proxys[pm.enabledIndics[pm.Cache[addr]]].URL, true
} else { } else {
idx := pm.Chooser.Pick().(int) idx := pm.Chooser.Pick().(int)
pm.Cache[addr] = idx pm.Cache[addr] = idx
return pm.Proxys[idx].URL return pm.Proxys[pm.enabledIndics[idx]].URL, true
} }
} else { } else {
idx := pm.Chooser.Pick().(int) idx := pm.Chooser.Pick().(int)
return pm.Proxys[idx].URL return pm.Proxys[pm.enabledIndics[idx]].URL, true
} }
//return ""
} }
func (pm *ProxyManager) ClearCache() { func (pm *ProxyManager) ClearCache() {
@ -180,6 +219,40 @@ func (pm *ProxyManager) keepClearingCache() {
} }
} }
func (pm *ProxyManager) dynamicWeightLoader() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGUSR1, syscall.SIGTERM)
go func() {
for {
<-sigs
fmt.Println("Loading new weights")
file, err := ioutil.ReadFile(pm.DynamicWeightFile)
if err != nil {
fmt.Println("Error loading new weights, error:", err)
continue
}
dw := dynamicWeight{}
err = json.Unmarshal([]byte(file), &dw)
if err != nil {
fmt.Println("Error loading new weights, json parsing error:", err)
continue
}
// check if number of weights is the same as the number of proxys configured
if len(dw.Weights) != len(pm.Proxys) {
fmt.Println("Error loading new weights, unexpected number of weights")
continue
}
// apply the weights
pm.mux.Lock()
for idx, weight := range dw.Weights {
pm.Proxys[idx].Weight = weight
}
pm.applyWeight()
pm.mux.Unlock()
}
}()
}
func main() { func main() {
// testFn := func() bool { // testFn := func() bool {
// status := proxyTestStatusCode("socks5://127.0.0.1:1080", URL, 204) // status := proxyTestStatusCode("socks5://127.0.0.1:1080", URL, 204)
@ -203,6 +276,7 @@ func main() {
var pm *ProxyManager var pm *ProxyManager
pm, err := NewPM(*configPath) pm, err := NewPM(*configPath)
fmt.Println("pm:", pm)
if err != nil { if err != nil {
return return
} }

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -173,7 +174,12 @@ func (s *Server) handleConnect(ctx context.Context, conn conn, req *Request) err
// dial := s.config.Dial // dial := s.config.Dial
dial := func(ctx context.Context, net_, addr string) (net.Conn, error) { dial := func(ctx context.Context, net_, addr string) (net.Conn, error) {
var dp proxy.Dialer var dp proxy.Dialer
u, _ := url.Parse(s.config.PM.Get(addr)) proxyAddr, got := s.config.PM.Get(addr)
if !got {
var nullDial net.Conn
return nullDial, errors.New("Cannot get an proxy from proxy manager")
}
u, _ := url.Parse(proxyAddr)
dp, _ = proxy.FromURL(u, proxy.Direct) dp, _ = proxy.FromURL(u, proxy.Direct)
return dp.Dial(net_, addr) return dp.Dial(net_, addr)
} }